S3 Bucket: Delete Objects 1 day after creation


Hello, I have a Glue ETL job which runs every day. Unlike Glue DataBrew, I cannot find an easy way to overwrite the existing data, and the job creates 19 files on each run. My solution was to create a lifecycle rule to delete the objects after one day, but it seems the minimum amount of time ends up being 3 days (expire after 1 day, then delete 1 day after expiry). This means that my Data Catalog ends up with millions of duplicate lines until the old files are deleted.
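For illustration, roughly the rule I mean, expressed with boto3 (the bucket name and prefix here are placeholders):

import boto3

s3 = boto3.client("s3")

# Placeholder bucket and prefix; expire the Glue output 1 day after creation
s3.put_bucket_lifecycle_configuration(
    Bucket="example-bucket",
    LifecycleConfiguration={
        "Rules": [
            {
                "ID": "expire-glue-output",
                "Filter": {"Prefix": "data/document/"},
                "Status": "Enabled",
                "Expiration": {"Days": 1},
            }
        ]
    },
)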

I might be missing something, but can anyone advise on the best way to achieve this? Either delete files 1 day after creation, or set the Glue ETL job to overwrite the existing files instead of creating new ones.

Many thanks. The job script is below:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from recipe_transforms import *
from awsglue.dynamicframe import DynamicFrame


# Generated recipe steps for arken-document-transform-project-recipe
# Recipe version 3.0
def applyRecipe_node(inputFrame, glueContext, transformation_ctx):
    frame = inputFrame.toDF()
    gc = glueContext
    df1 = DataStructure.JsonToStructs.apply(
        data_frame=frame,
        glue_context=gc,
        transformation_ctx="transform-project-recipe-df1",
        source_columns=["data"],
        unnest_level=120,
    )
    df2 = DataStructure.UnnestStructNLevels.apply(
        data_frame=df1,
        glue_context=gc,
        transformation_ctx="transform-project-recipe-df2",
        source_columns=["data"],
        unnest_level=1,
        remove_source_column=True,
        delimiter=".",
    )
    df3 = DataCleaning.ExtractBetweenDelimiters.apply(
        data_frame=df2,
        glue_context=gc,
        transformation_ctx="transform-project-recipe-df3",
        source_column="settings",
        target_column="settings_Download",
        start_pattern='"Download": "',
        end_pattern='"',
    )
    df4 = DataCleaning.ExtractBetweenDelimiters.apply(
        data_frame=df3,
        glue_context=gc,
        transformation_ctx="transform-project-recipe-df4",
        source_column="settings",
        target_column="settings_CurrentKey",
        start_pattern='"CurrentKey": "',
        end_pattern='"',
    )
    df5 = DataCleaning.ExtractBetweenDelimiters.apply(
        data_frame=df4,
        glue_context=gc,
        transformation_ctx="transform-project-recipe-df5",
        source_column="settings",
        target_column="settings_Instruction",
        start_pattern='"Instruction": ',
        end_pattern=",",
    )
    df6 = Column.DeleteColumn.apply(
        data_frame=df5,
        glue_context=gc,
        transformation_ctx="transform-project-df6",
        source_columns=[
            "content",
            "data.AccountHashId",
            "data.ClientHashId",
            "data.Content",
            "data.CreatedByHashId",
            "data.CreatedOn",
            "data.Data",
            "data.DeletedOn",
            # ... (additional columns truncated)
        ],
    )
    return DynamicFrame.fromDF(df6, gc, transformation_ctx)


args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# Script generated for node PostgreSQL table
PostgreSQLtable_node1 = glueContext.create_dynamic_frame.from_options(
    connection_type="postgresql",
    connection_options={
        "useConnectionProperties": "true",
        "dbtable": "document",
        "connectionName": "prod-conn",
    },
    transformation_ctx="PostgreSQLtable_node1",
)

# Script generated for node Data Preparation Recipe
# Adding configuration for certain Data Preparation recipe steps to run properly
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")
# Recipe name: transform-project-recipe
# Recipe version: 3.0
DataPreparationRecipe_node = applyRecipe_node(
    inputFrame=PostgreSQLtable_node1,
    glueContext=glueContext,
    transformation_ctx="DataPreparationRecipe_node",
)

# Script generated for node S3 bucket
S3bucket_node2 = glueContext.getSink(
    path="s3://data/document/",
    connection_type="s3",
    updateBehavior="UPDATE_IN_DATABASE",
    partitionKeys=[],
    enableUpdateCatalog=True,
    transformation_ctx="S3bucket_node2",
)
S3bucket_node2.setCatalogInfo(
    catalogDatabase="datalake", catalogTableName="document"
)
S3bucket_node2.setFormat("csv")
S3bucket_node2.writeFrame(DataPreparationRecipe_node)
job.commit()
asked 6 months ago · 559 views
2 Answers

Hello. S3 Lifecycle policies have a minimum expiration of 3 days for deleting objects. However, if you're looking to delete objects one day after creation or overwrite existing files, there are several approaches you can take:

  1. Overwriting Existing Files with AWS Glue: AWS Glue does provide the capability to overwrite files. This is often handled via the DynamicFrame write options. When you are writing out your DynamicFrame using the write_dynamic_frame method, you can specify the behavior you want on the underlying data:
glueContext.write_dynamic_frame.from_options(
    frame=your_dynamic_frame,
    connection_type="s3",
    connection_options={"path": "s3://your_bucket/path_to_data/", "partitionKeys": []},
    format="parquet",  # or your desired format
    transformation_ctx="sink2",
    format_options={"overwrite": True},  # this line ensures overwrite
)

In the above code, the "overwrite": True option should ensure that the data is overwritten.

  2. Delete Objects Using Lambda: If overwriting isn't an option or doesn't work for your use case, you can use AWS Lambda to automatically delete objects one day after they've been created.
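As a rough sketch (the bucket and prefix below are placeholders, and the function would be triggered on a schedule, for example with EventBridge), such a cleanup function could look like:

import boto3
from datetime import datetime, timedelta, timezone

s3 = boto3.client("s3")


def lambda_handler(event, context):
    # Placeholder bucket and prefix; point these at the Glue output location
    bucket = "your_bucket"
    prefix = "path_to_data/"
    cutoff = datetime.now(timezone.utc) - timedelta(days=1)

    # List the output files and delete anything written more than one day ago
    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get("Contents", []):
            if obj["LastModified"] < cutoff:
                s3.delete_object(Bucket=bucket, Key=obj["Key"])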

Regards, Andrii

answered 6 months ago
reviewed 4 months ago
  • Thank you for your response - I built this job using the Nodes. Would I be able to edit the node script to achieve this? It currently looks like:

    Script generated for node S3 bucket:

    S3bucket_node2 = glueContext.getSink(
        path="s3://prod-data/document/",
        connection_type="s3",
        updateBehavior="UPDATE_IN_DATABASE",
        partitionKeys=[],
        enableUpdateCatalog=True,
        transformation_ctx="S3bucket_node2",
    )
    S3bucket_node2.setCatalogInfo(
        catalogDatabase="datalake", catalogTableName="document"
    )
    S3bucket_node2.setFormat("csv")
    S3bucket_node2.writeFrame(DataPreparationRecipe_node)
    job.commit()

  • I'm not aware of that overwrite option being available in DynamicFrame, but you can do the same using glueContext.purge_s3_path("s3://yourbucket/yourpath/", {"retentionPeriod": 0}) (see the placement sketch after these comments).

  • @gonzalo Thank you for your help, but I think I'm being a bit of a noob. I have tried to add this line of code to our Glue job in a few places without luck. Where exactly should I put it? I'll post the job script in the original post.
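As a rough sketch of where the suggested call could sit in the posted job script (it uses the output path from the question; retentionPeriod is in hours, so 0 removes everything under the path), it would go after job.init and before the sink writes the new files:

job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# Remove the previous run's files under the output path before writing the new ones
# (retentionPeriod is in hours; 0 removes everything under the path)
glueContext.purge_s3_path("s3://data/document/", {"retentionPeriod": 0})

# ...the PostgreSQL source, recipe, and S3 sink nodes follow unchanged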


Could you please accept the answer if it helped you? Thanks.

answered 6 months ago
  • Hi, still struggling with this issue. Could you help with where I would insert the suggested code?
