C4W4 project 1 transformation zone job is failing

AnalysisException: Column ‘session_items’ does not exist. Did you mean one of the following? [user_since, user_id, user_location, user_name, ingest_on, user_lastname, processing_timestamp];

This is the error I am getting for the “de-c4w4a1-json-transform-job”

import json
import sys
from datetime import datetime

from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql.functions import col, explode, lit, to_date

# Initialize Glue context and Spark session
# Take a look at the `args` object with the parameters required to run the Job
args = getResolvedOptions(
    sys.argv,
    [
        "JOB_NAME",
        # "lakehouse_path",
        # The Glue Catalog database name
        "catalog_database",
        # The date when data was ingested, formatted as your current date in Pacific Time (UTC -7) to match the server's timezone
        "ingest_date",
        # Object's path in the source Data Lake for the users data
        "users_source_path",
        # Object's path in the source Data Lake for the sessions data
        "sessions_source_path",
        # Object's path in the destination Data Lake
        "target_bucket_path",
        # Name of the users table inside the Glue Catalog Database
        "users_table",
        # Name of the sessions table inside the Glue Catalog Database.
        "sessions_table",
    ],
)

# The Glue Job has been configured to store data in Apache Iceberg Format
conf = SparkConf()
conf.set(
    "spark.sql.extensions",
    "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
)
conf.set(
    "spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog"
)
conf.set(
    "spark.sql.catalog.glue_catalog.warehouse",
    f"s3://{args['target_bucket_path']}/transform_zone",
)
conf.set(
    "spark.sql.catalog.glue_catalog.catalog-impl",
    "org.apache.iceberg.aws.glue.GlueCatalog",
)
conf.set(
    "spark.sql.catalog.glue_catalog.io-impl",
    "org.apache.iceberg.aws.s3.S3FileIO",
)

sc = SparkContext(conf=conf)
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
logger = glueContext.get_logger()

# Define S3 paths
ingestion_timestamp = args["ingest_date"]
s3_path_users = f"{args['users_source_path']}/{ingestion_timestamp}"
s3_path_sessions = f"{args['sessions_source_path']}/{ingestion_timestamp}"

# Read JSON data from S3
# In this Glue job, the datasets are created directly as Spark DataFrames. Pass `s3_path_users` and `s3_path_sessions` as arguments to
# the `spark.read.json()` method for the users (`users_df`) and sessions (`sessions_df`) DataFrames, respectively
users_df = spark.read.json(s3_path_users)
sessions_df = spark.read.json(s3_path_sessions)

# Add ingestion and processing timestamp metadata
# Extract date for partitioning
users_df = users_df.withColumn(
    "ingest_on", to_date(lit(ingestion_timestamp), "yyyy-MM-dd")
)

# For the `users_df` you are already provided with the `"ingest_on"` column. On the other hand, remember from your initial exploration that there
# is a field named `user_location`. This field is actually an array; you are already provided with an example of how to extract an element from
# this array to create the `"latitude"` column. Now you will create the `"longitude"`, `"place_name"`, `"country_code"` and `"timezone"` columns
# based on the other elements of the `user_location` field.
users_df = (
    users_df.withColumn("latitude", col("user_location")[0])
    .withColumn("longitude", col("user_location")[1])
    .withColumn("place_name", col("user_location")[2])
    .withColumn("country_code", col("user_location")[3])
    .withColumn("timezone", col("user_location")[4])
    .drop("user_location")
)

print("users_df schema", users_df.printSchema())

sessions_df = sessions_df.withColumn(
    "ingest_on", to_date(lit(ingestion_timestamp), "yyyy-MM-dd")
)
print("sessions_df schema", sessions_df.printSchema())


processing_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

users_df = users_df.withColumn(
    "processing_timestamp", lit(processing_timestamp)
)
sessions_df = sessions_df.withColumn(
    "processing_timestamp", lit(processing_timestamp)
)


# Unnest the session_items in sessions_df
# For the `sessions_df`, you have the `session_items` field, which is an array of JSON objects
# You will create a new column named `"session_item"` by applying the `explode()` method to the `session_items` field
sessions_df = sessions_df.withColumn("session_item", explode("session_items"))

# Then, you will apply the `select` method of the `sessions_df` Spark Dataframe with the `col()` method to select the required columns
sessions_df = sessions_df.select(
    col("user_id"),
    col("session_id"),
    col("session_item.song_id").alias("song_id"),
    col("session_item.song_name").alias("song_name"),
    col("session_item.artist_id").alias("artist_id"),
    col("session_item.artist_name").alias("artist_name"),
    col("session_item.price").alias("price"),
    col("session_item.currency").alias("currency"),
    col("session_item.liked").alias("liked"),
    col("session_item.liked_since").alias("liked_since"),
    col("user_agent"),
    col("session_start_time"),
    col("ingest_on"),
)

print("users_df schema", users_df.printSchema())
print("sessions_df schema", sessions_df.printSchema())

print("users_df schema", users_df.show(5))
print("sessions_df schema", sessions_df.show(5))


# Writing data
catalog_database = args["catalog_database"]
users_table_name = args["users_table"]
sessions_table_name = args["sessions_table"]
target_bucket_path = args["target_bucket_path"]

tables_collection = spark.catalog.listTables(catalog_database)
tables_in_db = [table.name for table in tables_collection]

users_table_exists = users_table_name in tables_in_db
sessions_table_exists = sessions_table_name in tables_in_db

print("Saving users_df")

# Users
if users_table_exists:
    # Append (equivalent to INSERT INTO)
    users_df.writeTo(
        f"glue_catalog.{catalog_database}.{users_table_name}"
    ).tableProperty("format-version", "2").partitionedBy("ingest_on").append()

else:
    # Create (equivalent to CREATE TABLE AS SELECT)
    users_df.writeTo(
        f"glue_catalog.{catalog_database}.{users_table_name}"
    ).tableProperty("format-version", "2").partitionedBy(
        "ingest_on"
    ).createOrReplace()


print("Saved users_df")
print("Saving sessions_df")

# sessions
if sessions_table_exists:
    # Append (equivalent to INSERT INTO)
    sessions_df.writeTo(
        f"glue_catalog.{catalog_database}.{sessions_table_name}"
    ).tableProperty("format-version", "2").partitionedBy("ingest_on").append()

else:
    # Create (equivalent to CREATE TABLE AS SELECT)
    sessions_df.writeTo(
        f"glue_catalog.{catalog_database}.{sessions_table_name}"
    ).tableProperty("format-version", "2").partitionedBy(
        "ingest_on"
    ).createOrReplace()


print("Saved sessions_df")


# Commit job
job.commit()

Hello @UmairRauf,

Your code looks complete. There were only two places you needed to change, and I checked the other parts, which look good. I couldn't reproduce the issue, but I had a similar AnalysisException in the same job because of using a previous date for PACIFIC-TIME-CURRENT-DATE. How does your other transform job, songs-transform-job, look? Thank you
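
As a side note, and just as an illustrative sketch (not part of the assignment code), this is roughly how the Pacific Time (UTC-7) date can be computed, so you can compare it with the value you passed as --ingest_date:

# Illustrative sketch only: compute today's date at a fixed UTC-7 offset,
# which must match the date folder written by the extraction job.
from datetime import datetime, timedelta, timezone

pacific = timezone(timedelta(hours=-7))
ingest_date = datetime.now(pacific).strftime("%Y-%m-%d")
print(ingest_date)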

songs-transform-job was running successfully. Which two places do I need to change?

@UmairRauf, your code in this file is correct; these two lines are the two places I meant, and they are already right. Perhaps your issue is in the glue.tf files. Could you check there, please? Thanks:

users_df = spark.read.json(s3_path_users)
sessions_df = spark.read.json(s3_path_sessions)
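
One way to narrow this down could be to read the sessions landing path directly and print its schema; if it shows only user_* columns, the extraction step wrote users data into the sessions prefix. The bucket name and date below are placeholders, so replace them with your own values:

# Quick diagnostic sketch (placeholder bucket and date): check whether the
# sessions landing data actually contains the `session_items` field.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("check-sessions-landing").getOrCreate()

sessions_path = "s3://<DATA-LAKE-BUCKET>/landing_zone/api/sessions/<INGEST-DATE>"
spark.read.json(sessions_path).printSchema()  # expect `session_items`; only user_* columns means the wrong data landed here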

Hello @UmairRauf,

It seems I could reproduce your issue. In line 109 of the extract_job/glue.tf file, you used /users twice. For the api_sessions_ingestion_etl_job, use /sessions instead of /users:

   "--api_end_date"        = "2020-01-31"
    # Replace the placeholder <API-ENDPOINT> with the value from the CloudFormation outputs
    "--api_url"             = "http://ec2-*****.compute-1.amazonaws.com/sessions" <---Use /sessions instead of /users
    # Notice the target path. This line of the code code is complete - no changes are required
    "--target_path"         = "s3://${var.data_lake_bucket}/landing_zone/api/sessions"