I have a Glue ETL script that I will eventually trigger from Lambda via an S3 event, but for now I am just running it on a schedule. If there is no new data, the script still runs and produces errors, since my transformations end up operating on empty DataFrames.
I am using Job Bookmarks: my ingest and export sections use transformation contexts, but my transformation code itself is not wrapped in any kind of guard clause.
So my code looks like this:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import to_timestamp, to_date, date_format
from awsglue.dynamicframe import DynamicFrame
import pyspark.sql.functions as f
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# Data Catalog: database and table name
db_name = "my-database"
tbl_name = "my-table"
# Script generated for node S3 bucket
S3bucket_node1 = glueContext.create_dynamic_frame.from_options(
    format_options={"quoteChar": '"', "withHeader": True, "separator": ","},
    connection_type="s3",
    format="csv",
    connection_options={"paths": ["s3://my-bucket/my-folder"], "recurse": True},
    transformation_ctx="S3bucket_node1",
)
df = S3bucket_node1.toDF()
df = df.drop("col6")
df = df.withColumn("time", f.unix_timestamp("time", 'dd-MM-yyyy HH:mm:ss') * 1000)
df = df.withColumn("timegmt", f.unix_timestamp("timegmt", 'dd-MM-yyyy HH:mm:ss') * 1000)
df = df.withColumn("value", df.VALUE.cast('int'))
df = df.withColumn("filename", f.split(f.input_file_name(), '/')[4])
df = df.withColumnRenamed('VID', 'vid')
df = df.withColumnRenamed('ALTID', 'altid')
df = df.withColumnRenamed('VTYPE', 'vtype')
## SPARK DF -> DYNAMIC DF
dynamic_df = DynamicFrame.fromDF(
df,
glueContext,
"convert_ctx"
)
# Data Catalog WRITE
DataCatalogtable_node2 = glueContext.write_dynamic_frame.from_catalog(
    frame=dynamic_df,
    database=db_name,
    table_name=tbl_name,
    transformation_ctx="DataCatalogtable_node2",
)
job.commit()
So I am trying to add logic so that the job skips all those transformations when there is in fact no new data (i.e. the DataFrame is empty).
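Something along these lines is what I have in mind: gate the transformation/write phase on the row count of the bookmarked read. This is only a sketch; `has_new_data` is an illustrative helper name, not a Glue API, and the Glue usage is shown in comments since it only runs inside a job:

```python
def has_new_data(row_count: int) -> bool:
    """Return True when the bookmarked read picked up at least one new row."""
    return row_count > 0

# In the Glue job this would wrap the transformation/write phase, e.g.:
#
#   if has_new_data(S3bucket_node1.count()):
#       df = S3bucket_node1.toDF()
#       ...  # transformations and the Data Catalog write
#
#   job.commit()  # commit either way, so the bookmark state still advances
```

Committing unconditionally matters: `job.commit()` is what persists the bookmark, so it should run even on an empty batch.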
Thanks, I did read about this approach and variations of it, such as checking for empty DataFrames. I figured there would be a way to gate off the bookmarks themselves: if the bookmarks determine there is no new data, that would be an easy signal to exit early as well. I will use this method for now, though.