Hello, I have a Glue ETL job which runs every day. Unlike Glue DataBrew, I cannot find an easy way to overwrite the existing data, so the job creates 19 new files on each run. My workaround was an S3 Lifecycle rule to delete the objects after one day, but the minimum turnaround seems to be 3 days (expire after 1 day, then delete 1 day after expiry). This means my Data Catalog ends up with millions of duplicate rows until the old files are deleted.
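For context, the rule I created is roughly equivalent to the following boto3 sketch (the bucket name and prefix here are placeholders, not the real ones):
import boto3

# Sketch of the lifecycle rule: expire objects under the ETL output prefix
# one day after creation. Bucket name and prefix are placeholders.
s3 = boto3.client("s3")
s3.put_bucket_lifecycle_configuration(
    Bucket="my-data-bucket",
    LifecycleConfiguration={
        "Rules": [
            {
                "ID": "expire-daily-etl-output",
                "Filter": {"Prefix": "document/"},
                "Status": "Enabled",
                "Expiration": {"Days": 1},
            }
        ]
    },
)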
I might be missing something, but can anyone advise on the best way to achieve this? Either delete the files 1 day after creation, or set the Glue ETL job to overwrite the existing files instead of creating new ones.
Many thanks! The job script is below:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from recipe_transforms import *
from awsglue.dynamicframe import DynamicFrame
# Generated recipe steps for arken-document-transform-project-recipe
# Recipe version 3.0
def applyRecipe_node(inputFrame, glueContext, transformation_ctx):
    frame = inputFrame.toDF()
    gc = glueContext
    df1 = DataStructure.JsonToStructs.apply(
        data_frame=frame,
        glue_context=gc,
        transformation_ctx="transform-project-recipe-df1",
        source_columns=["data"],
        unnest_level=120,
    )
    df2 = DataStructure.UnnestStructNLevels.apply(
        data_frame=df1,
        glue_context=gc,
        transformation_ctx="transform-project-recipe-df2",
        source_columns=["data"],
        unnest_level=1,
        remove_source_column=True,
        delimiter=".",
    )
    df3 = DataCleaning.ExtractBetweenDelimiters.apply(
        data_frame=df2,
        glue_context=gc,
        transformation_ctx="transform-project-recipe-df3",
        source_column="settings",
        target_column="settings_Download",
        start_pattern='"Download": "',
        end_pattern='"',
    )
    df4 = DataCleaning.ExtractBetweenDelimiters.apply(
        data_frame=df3,
        glue_context=gc,
        transformation_ctx="transform-project-recipe-df4",
        source_column="settings",
        target_column="settings_CurrentKey",
        start_pattern='"CurrentKey": "',
        end_pattern='"',
    )
    df5 = DataCleaning.ExtractBetweenDelimiters.apply(
        data_frame=df4,
        glue_context=gc,
        transformation_ctx="transform-project-recipe-df5",
        source_column="settings",
        target_column="settings_Instruction",
        start_pattern='"Instruction": ',
        end_pattern=",",
    )
    df6 = Column.DeleteColumn.apply(
        data_frame=df5,
        glue_context=gc,
        transformation_ctx="transform-project-df6",
        source_columns=[
            "content",
            "data.AccountHashId",
            "data.ClientHashId",
            "data.Content",
            "data.CreatedByHashId",
            "data.CreatedOn",
            "data.Data",
            "data.DeletedOn",
            # ... remaining column names truncated in the original post
        ],
    )
    return DynamicFrame.fromDF(df6, gc, transformation_ctx)
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
# Script generated for node PostgreSQL table
PostgreSQLtable_node1 = glueContext.create_dynamic_frame.from_options(
    connection_type="postgresql",
    connection_options={
        "useConnectionProperties": "true",
        "dbtable": "document",
        "connectionName": "prod-conn",
    },
    transformation_ctx="PostgreSQLtable_node1",
)
# Script generated for node Data Preparation Recipe
# Adding configuration for certain Data Preparation recipe steps to run properly
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")
# Recipe name: transform-project-recipe
# Recipe version: 3.0
DataPreparationRecipe_node = applyRecipe_node(
    inputFrame=PostgreSQLtable_node1,
    glueContext=glueContext,
    transformation_ctx="DataPreparationRecipe_node",
)
# Script generated for node S3 bucket
S3bucket_node2 = glueContext.getSink(
    path="s3://data/document/",
    connection_type="s3",
    updateBehavior="UPDATE_IN_DATABASE",
    partitionKeys=[],
    enableUpdateCatalog=True,
    transformation_ctx="S3bucket_node2",
)
S3bucket_node2.setCatalogInfo(
    catalogDatabase="datalake", catalogTableName="document"
)
S3bucket_node2.setFormat("csv")
S3bucket_node2.writeFrame(DataPreparationRecipe_node)
job.commit()
Thank you for your response - I built this job using the nodes. Would I be able to edit the node script to achieve this? It currently looks like:
# Script generated for node S3 bucket
S3bucket_node2 = glueContext.getSink(
    path="s3://prod-data/document/",
    connection_type="s3",
    updateBehavior="UPDATE_IN_DATABASE",
    partitionKeys=[],
    enableUpdateCatalog=True,
    transformation_ctx="S3bucket_node2",
)
S3bucket_node2.setCatalogInfo(
    catalogDatabase="datalake", catalogTableName="document"
)
S3bucket_node2.setFormat("csv")
S3bucket_node2.writeFrame(DataPreparationRecipe_node)
job.commit()
I'm not aware of that overwrite option being available for DynamicFrame sinks, but you can do the same thing with glueContext.purge_s3_path("s3://yourbucket/yourpath/", {"retentionPeriod": 0}).
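In a job like yours it would normally go right before the S3 sink block, so the previous run's files are cleared just before the new ones are written. A rough sketch based on your snippet (not tested against your job):
# Delete everything the previous run left under the output prefix.
# retentionPeriod is in hours; 0 means purge all files immediately.
glueContext.purge_s3_path(
    "s3://prod-data/document/",  # same prefix you pass to getSink below
    options={"retentionPeriod": 0},
)

# ...then the existing sink code, unchanged:
S3bucket_node2 = glueContext.getSink(
    path="s3://prod-data/document/",
    connection_type="s3",
    updateBehavior="UPDATE_IN_DATABASE",
    partitionKeys=[],
    enableUpdateCatalog=True,
    transformation_ctx="S3bucket_node2",
)
Keep in mind the purge runs before the new data is written, so if the write fails you will be left with an empty prefix until the next successful run.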
@gonzalo Thank you for your help, but I think I'm being a bit of a noob. I have tried to add this line of code to our Glue job in a few places without luck - where exactly should I put it? I'll post the job script in the original post.