It seems you're handling an ETL process in AWS Glue with PySpark. The issue you're facing is likely caused by accessing the schema directly on the DataFrame object, which can raise an error when a bookmarked read returns no new data. Here are some suggestions to fix the issue and improve the script:
1) Check for data, not schema: instead of checking whether node_role_employee.schema() is not None, check whether the DataFrame actually contains data. You can use the isEmpty() method for this purpose.
2) Handle schema differences: if you're dealing with multiple tables, their schemas might differ. Make sure to handle schema differences appropriately.
3) Error handling: implement error handling to catch any exceptions that might occur during the ETL process.
Here's the modified version of your script with these suggestions incorporated:
# Your previous code...

# Handle role employee
node_role_employee = glueContext.create_dynamic_frame_from_catalog(
    database=role,
    table_name="employee",
    transformation_ctx="node_role_employee",
    additional_options={"jobBookmarkKeys": ["id"], "jobBookmarkKeysSortOrder": "asc", "mergeSchema": "true"}
).toDF()

# Check if the DataFrame has data
if not node_role_employee.isEmpty():
    # Your DataFrame operations... For example:
    w = Window.partitionBy("employeeId").orderBy(node_role_employee["transaction_seq"].desc())
    dyfroleEmployeeRow = node_role_employee.withColumn("rownum", row_number().over(w))
    dyfroleEmployeeFiltered = dyfroleEmployeeRow.filter((dyfroleEmployeeRow["rownum"] == 1) & (col("transaction_seq").isNotNull())).drop("rownum")
    dyfroleEmployeeDel = dyfroleEmployeeFiltered.filter(col("op") != "D")

    dyfJoinEmployeeConfigCoalesce = dyfroleEmployeeDel.coalesce(1)
    dyfJoinEmployeeConfigConvert = DynamicFrame.fromDF(dyfJoinEmployeeConfigCoalesce, glueContext, "convert")

    # Write the data in the converted DynamicFrame to an S3 location
    glueContext.write_dynamic_frame.from_options(
        frame=dyfJoinEmployeeConfigConvert,
        connection_type="s3",
        connection_options={"path": f"s3://{curated_bucket}/{dataset_folder}/bookmark/employee/"},
        format="parquet",
        transformation_ctx="datasink1"
    )

# Handle role isoagent
node_role_agent = glueContext.create_dynamic_frame_from_catalog(
    database=role,
    table_name="isoagent",
    transformation_ctx="node_role_agent",
    additional_options={"jobBookmarkKeys": ["id"], "jobBookmarkKeysSortOrder": "asc", "mergeSchema": "true"}
).toDF()

# Check if the DataFrame has data
if not node_role_agent.isEmpty():
    # Your DataFrame operations...
    pass

# Your previous code...
job.commit()
Make sure to replace the placeholder comments with your actual DataFrame operations. With this change, the script processes each table independently and no longer fails when one of them has no new data. Additionally, consider implementing proper error handling for robustness.
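If you want to make that error handling concrete, here is a minimal sketch under a few assumptions: process_table is a hypothetical helper standing in for the per-table read/filter/write logic above, and logger is the job's Glue logger. Re-raising after logging keeps job.commit() from running, so the bookmark is not advanced past data that was never written.

# Hypothetical error-handling sketch; process_table stands in for the
# per-table logic shown above and is not part of your current script.
def process_table(table_name, bookmark_key, output_prefix):
    df = glueContext.create_dynamic_frame_from_catalog(
        database=role,
        table_name=table_name,
        transformation_ctx=f"node_role_{table_name}",
        additional_options={"jobBookmarkKeys": [bookmark_key],
                            "jobBookmarkKeysSortOrder": "asc",
                            "mergeSchema": "true"},
    ).toDF()
    if df.isEmpty():
        logger.info(f"No new rows for {table_name}; skipping")
        return
    # ... dedupe, filter, coalesce and write to
    # f"s3://{curated_bucket}/{dataset_folder}/{output_prefix}/" as shown above ...

for table_name, bookmark_key, output_prefix in [
    ("employee", "id", "bookmark/employee"),
    ("isoagent", "id", "bookmark/isoagent"),
]:
    try:
        process_table(table_name, bookmark_key, output_prefix)
    except Exception as exc:
        logger.error(f"Failed while processing {table_name}: {exc}")
        raise  # let the job fail so job.commit() does not advance the bookmark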
Thank you so much for your help. The bookmark now works fine in the script that handles a single table. However, I am also thinking about joining tables. I have the code below, and I tried adding an 'else' branch to handle the case where, say, the configuration table has no updates but the employee table does; the updated employee table should then join against the previously bookmarked configuration records, but it fails. I am not sure whether AWS job bookmarks are capable of that (a rough sketch of the 'else' idea follows the code below).
node_role_config_3 = glueContext.create_dynamic_frame_from_catalog(
database=role,
table_name="configuration",
transformation_ctx="node_role_config_3",
additional_options={"jobBookmarkKeys":["id"], "jobBookmarkKeysSortOrder":"asc", "mergeSchema": "true"}
).toDF()
if not node_role_config_3.isEmpty():
    w = Window.partitionBy("id").orderBy(node_role_config_3["transaction_seq"].desc())
    dyfroleConfigRow = node_role_config_3.withColumn("rownum", row_number().over(w))
    dyfroleConfigFiltered = dyfroleConfigRow.filter((dyfroleConfigRow["rownum"] == 1) & (col("transaction_seq").isNotNull())).drop("rownum")
    dyfroleConfigDel = dyfroleConfigFiltered.filter(col("op") != "D")
    dyfroleConfigSelect = dyfroleConfigDel.select(col("merchant_config_id"))
##################################################################################
node_role_employee_3 = glueContext.create_dynamic_frame_from_catalog(
database=role,
table_name="employee",
transformation_ctx="node_role_employee_3",
additional_options={"jobBookmarkKeys":["employeeId"], "jobBookmarkKeysSortOrder":"asc", "mergeSchema": "true"}
).toDF()
if not node_role_employee_3.isEmpty():
    logger.info("sorting employee table begins")
    w = Window.partitionBy("employeeId").orderBy(node_role_employee_3["transaction_seq"].desc())
    dyfroleEmployeeRow = node_role_employee_3.withColumn("rownum", row_number().over(w))
    dyfroleEmployeeFiltered = dyfroleEmployeeRow.filter((dyfroleEmployeeRow["rownum"] == 1) & (col("transaction_seq").isNotNull())).drop("rownum")
    dyfroleEmployeeDel = dyfroleEmployeeFiltered.filter(col("op") != "D")
######################################################################################
dyfJoinEmployeeConfig = dyfroleEmployeeSelect.join(
dyfroleConfigSelect,
dyfroleEmployeeSelect.employee_id == dyfroleConfigSelect.config_employee_id,
"left"
).drop(dyfroleConfigSelect.config_employee_id)
dyfJoinEmployeeConfigCoalesce = dyfJoinEmployeeConfig.coalesce(1)
# Convert from Spark DataFrame to Glue DynamicFrame
dyfJoinEmployeeConfigConvert = DynamicFrame.fromDF(dyfJoinEmployeeConfigCoalesce, glueContext, "convert")
# Write the data in the converted DynamicFrame to an S3 location
glueContext.write_dynamic_frame.from_options(
frame=dyfJoinEmployeeConfigConvert,
connection_type="s3",
connection_options={"path": f"s3://{curated_bucket}/{dataset_folder}/bookmark3/employee_config/"},
format="parquet",
transformation_ctx="datasink2"
)
job.commit()
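To make the 'else' case concrete, this is roughly the structure I attempted. The fallback S3 path is only a placeholder for wherever previously processed configuration rows would be kept; it is not something that exists in the job today.

# Rough sketch of the attempted 'else' fallback (placeholder fallback path):
# when the bookmarked read of "configuration" returns no new rows, load
# configuration rows kept from an earlier run so the employee join still has
# a right-hand side instead of being skipped.
if not node_role_config_3.isEmpty():
    # ... dedupe on transaction_seq and drop "D" ops, as in the code above ...
    dyfroleConfigSelect = dyfroleConfigDel.select(col("merchant_config_id"))
else:
    dyfroleConfigSelect = glueContext.spark_session.read.parquet(
        f"s3://{curated_bucket}/{dataset_folder}/bookmark3/configuration_latest/"  # placeholder location
    ).select(col("merchant_config_id"))  # must expose the same columns the join expects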