当没有数据时,如何不执行ETL作业脚本?

0

【以下的问题经过翻译处理】 我有一个 Glue ETL 脚本,最终将通过S3 event trigger和Lambda执行。但目前我只是定期运行它。如果没有新数据,脚本运行会生成错误,因为我的转换过程有空的 DataFrames。

我用的是Bookmarks。我的摄取和导出部分都在使用转换上下文。但我的普通转换代码没有包装在任何 if 语句或类似的东西中。

我的代码大概是这样:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import to_timestamp, to_date, date_format
from awsglue.dynamicframe import DynamicFrame
import pyspark.sql.functions as f

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# 数据目录:数据库和表名
db_name = "my-database"
tbl_name = "my-table"

# 为 S3 桶生成的脚本
S3bucket_node1 = glueContext.create_dynamic_frame.from_options(
    format_options={"quoteChar": '"', "withHeader": True, "separator": ","},
    connection_type="s3",
    format="csv",
    connection_options={"paths": ["s3://my-bucket/my-folder"], "recurse": True},
    transformation_ctx="S3bucket_node1",
)

df = S3bucket_node1.toDF()
df = df.drop("col6") 

df = df.withColumn("time", f.unix_timestamp("time", 'dd-MM-yyyy HH:mm:ss') * 1000)
df = df.withColumn("timegmt", f.unix_timestamp("timegmt", 'dd-MM-yyyy HH:mm:ss') * 1000)
df = df.withColumn("value", df.VALUE.cast('int'))
df = df.withColumn("filename", f.split(f.input_file_name
profile picture
专家
已提问 5 个月前3 查看次数
1 回答
0

【以下的回答经过翻译处理】 我不喜欢下面这种方法,因为它会引起额外的开销,但我不确定是否有更好的方式:在转换回dynamic frame之前,添加一个数据框是否为空的检查。另一种常见的方式是计算dataframe中的行数,如果不为零则继续,但是使用take的方法更加高效,因为它评估的分区/行数较少。

if df.take(1):
    <do your write command>

job.commit()
profile picture
专家
已回答 5 个月前

您未登录。 登录 发布回答。

一个好的回答可以清楚地解答问题和提供建设性反馈,并能促进提问者的职业发展。

回答问题的准则