How do I keep an ETL job script from executing when there is no data?

I have a Glue ETL script that I will eventually trigger from a Lambda via an S3 event, but for now I am just running it on a regular schedule. If there is no new data, the script still runs and produces errors, because my transformations receive empty DataFrames.

I am using Job Bookmarks, and my ingest and export steps use transformation contexts, but my transformation code itself is not wrapped in any kind of guard clause.

So my code looks like this:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import to_timestamp, to_date, date_format
from awsglue.dynamicframe import DynamicFrame
import pyspark.sql.functions as f

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Data Catalog: database and table name
db_name = "my-database"
tbl_name = "my-table"

# Script generated for node S3 bucket
S3bucket_node1 = glueContext.create_dynamic_frame.from_options(
    format_options={"quoteChar": '"', "withHeader": True, "separator": ","},
    connection_type="s3",
    format="csv",
    connection_options={"paths": ["s3://my-bucket/my-folder"], "recurse": True},
    transformation_ctx="S3bucket_node1",
)

# Convert to a Spark DataFrame and drop an unused column
df = S3bucket_node1.toDF()
df = df.drop("col6")

# Parse dd-MM-yyyy timestamps into epoch milliseconds, cast VALUE to int,
# extract a path segment from the input file name, and lowercase the
# remaining column names
df = df.withColumn("time", f.unix_timestamp("time", 'dd-MM-yyyy HH:mm:ss') * 1000)
df = df.withColumn("timegmt", f.unix_timestamp("timegmt", 'dd-MM-yyyy HH:mm:ss') * 1000)
df = df.withColumn("value", df.VALUE.cast('int'))
df = df.withColumn("filename", f.split(f.input_file_name(), '/')[4])
df = df.withColumnRenamed('VID', 'vid')
df = df.withColumnRenamed('ALTID', 'altid')
df = df.withColumnRenamed('VTYPE', 'vtype')

## SPARK DF -> DYNAMIC DF
dynamic_df = DynamicFrame.fromDF(
    df,
    glueContext,
    "convert_ctx"
)

# Data Catalog WRITE
DataCatalogtable_node2 = glueContext.write_dynamic_frame.from_catalog(
    frame = dynamic_df,
    database = db_name,
    table_name = tbl_name,
    transformation_ctx = "DataCatalogtable_node2",
)

job.commit()

So I am trying to add logic so that the job does not execute all of those transformations when there is in fact no new data (i.e. an empty DataFrame).

bfeeny
Asked 2 years ago · 1,568 views
1 Answer
Accepted Answer

I don't love this method because it adds overhead, but I'm not sure there is a better way: before you convert back to the DynamicFrame, add a check for whether the DataFrame is empty. Another common approach is to count the rows in the DataFrame and proceed only if the count is nonzero; however, take(1) is more performant because it evaluates fewer partitions/rows, stopping as soon as it finds a single row.

if df.take(1):
    glueContext.write_dynamic_frame.from_catalog(
        frame=dynamic_df, database=db_name, table_name=tbl_name,
        transformation_ctx="DataCatalogtable_node2",
    )

job.commit()
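
If you also want to skip the transformations themselves, not just the write, the same check can be moved up to just after toDF(). A minimal sketch, using the variable names from the question:

df = S3bucket_node1.toDF()

if df.take(1):
    # at least one new row arrived: run the transformations, convert
    # back to a DynamicFrame, and write to the Data Catalog here
    ...

# commit unconditionally so the job ends cleanly and the bookmark
# state is saved
job.commit()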
tjtoll
Answered 2 years ago
AWS Expert
Reviewed 2 years ago
  • Thanks, I did read about this method and variations of it, such as checking for empty DataFrames. I figured there would be a way that worked off the bookmarks: if the bookmarks determine that no new data has been seen, that would be an easy way to gate out as well. I will use this method for now, though.
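
For what it's worth, a gate that works off the bookmarks could be sketched as below. The assumption (untested) is that because bookmarks filter the source read, the DynamicFrame returned by the bookmarked read is empty whenever there is no new data; note that DynamicFrame.count() evaluates the whole bookmark-filtered input, so take(1) on the converted DataFrame remains the cheaper check:

# hypothetical gate: the bookmark filters the read, so an empty
# DynamicFrame from S3bucket_node1 means no new data was found
if S3bucket_node1.count() > 0:
    df = S3bucket_node1.toDF()
    # ... transformations, conversion, and Data Catalog write ...

job.commit()  # commit regardless, so the bookmark state is saved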
