Glue job with incremental data load


Hello guys, I'm building a Glue job and I need an incremental load that only picks up new or modified data. I've tried the code below, but it doesn't work: it reprocesses all the data every time I start the job, so I end up with everything duplicated.

node = glueContext.create_dynamic_frame.from_catalog(
    database="db",
    table_name="table",
    transformation_ctx="node",
    additional_options = {"jobBookmarkKeys":[""], "jobBookmarkKeysSortOrder":"asc"}
)

Can someone help me with this?

lebuosi
Asked 7 months ago · 513 views
2 Answers
Accepted Answer

Hello,

Looking at your script, I notice that:

additional_options = {"jobBookmarkKeys":["REQUEST", "OPENDATE"], "jobBookmarkKeysSortOrder":"asc"}

As documented at [1]:

Warning
If user-defined bookmarks keys are used, they must each be strictly monotonically increasing or decreasing. When selecting additional fields for a compound key, fields for concepts like "minor versions" or "revision numbers" do not meet this criteria, since their values are reused throughout your dataset.

Judging by the names, it seems to me that the keys "REQUEST" and "OPENDATE" do not satisfy the requirements above.

A workaround is to create an additional column in the source database, or to use the existing primary key, as long as it meets the requirements above. Then use that column for "jobBookmarkKeys".
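Before wiring a candidate column into "jobBookmarkKeys", it can help to sanity-check a sample of its values for strict monotonicity. This is a plain-Python sketch of that check, not part of the Glue API; the example values are made up:

```python
def is_strictly_monotonic(values):
    """Return True if values are strictly increasing or strictly decreasing.

    Glue job bookmarks require user-defined bookmark keys to be strictly
    monotonic, so any column that reuses values (e.g. a "revision number")
    is disqualified.
    """
    if len(values) < 2:
        return True
    increasing = all(a < b for a, b in zip(values, values[1:]))
    decreasing = all(a > b for a, b in zip(values, values[1:]))
    return increasing or decreasing

# An auto-increment primary key qualifies (gaps are fine for this check):
print(is_strictly_monotonic([1, 2, 3, 7, 10]))  # True
# A column that repeats values does not:
print(is_strictly_monotonic([1, 2, 2, 3]))      # False
```

You could run this over, say, the first few thousand rows pulled with a plain JDBC query to catch an unsuitable key before the job ever runs.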

========== References:

[1] - https://docs.aws.amazon.com/glue/latest/dg/programming-etl-connect-bookmarks.html

AWS
Thi_N
Answered 7 months ago

Hello,

Glue job bookmarks have prerequisites that must be met before use, depending on the use case.

Looking at your code, I can't tell whether the requirements have been met, so here is some generic advice.

For an S3 data source, please note that the supported file formats are JSON, CSV, Apache Avro, XML, Parquet, and ORC [1].

For a JDBC data source, the following rules apply [1]:

- For each table, AWS Glue uses one or more columns as bookmark keys to determine new and processed data. The bookmark keys combine to form a single compound key.

- AWS Glue by default uses the primary key as the bookmark key, provided that it is sequentially increasing or decreasing (with no gaps).

- You can specify the columns to use as bookmark keys in your AWS Glue script. For more information about using Job bookmarks in AWS Glue scripts, see Using job bookmarks.

- AWS Glue doesn't support using columns with case-sensitive names as job bookmark keys.

Regarding your code:

additional_options = {"jobBookmarkKeys":[""], "jobBookmarkKeysSortOrder":"asc"}

please pick a suitable column for "jobBookmarkKeys" that meets the requirements above. More sample scripts and usage examples are available at [2], for your information.
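For illustration, `additional_options` is just a plain dictionary that gets passed through to `create_dynamic_frame.from_catalog`. A minimal sketch with a single bookmark key (the column name `id` is an assumption standing in for a strictly increasing, case-insensitive column such as an auto-increment primary key):

```python
# Hypothetical bookmark configuration: "id" is a placeholder for a strictly
# monotonic column in your own table; substitute your real column name.
additional_options = {
    "jobBookmarkKeys": ["id"],          # one column; a compound key is also allowed
    "jobBookmarkKeysSortOrder": "asc",  # "asc" or "desc"
}

# In the Glue script this dict is passed straight through, e.g.:
# node = glueContext.create_dynamic_frame.from_catalog(
#     database="db",
#     table_name="table",
#     transformation_ctx="node",
#     additional_options=additional_options,
# )
```

Leaving `jobBookmarkKeys` as `[""]`, as in the question, gives Glue an empty key name to track, which explains why nothing is filtered between runs.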

============ References:

[1] - https://docs.aws.amazon.com/glue/latest/dg/monitor-continuations.html#monitor-continuations-implement

[2] - https://docs.aws.amazon.com/glue/latest/dg/programming-etl-connect-bookmarks.html

AWS
Thi_N
Answered 7 months ago
  • Hello Thi_N. Thanks for the feedback. I've read your response, but I still haven't been able to make it work. Can you take a look at my PoC script below and tell me what is wrong? This is my first time working with Glue and DynamicFrames, so if you see a better way to do something, please tell me.

  • import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job
    from pyspark.sql.functions import split
    from awsglue.dynamicframe import DynamicFrame
    from pyspark.sql import functions as F
    # -------------------------------------------------------
    sys.argv += ["--JOB_NAME", "glue_job"]
    # -------------------------------------------------------
    args = getResolvedOptions(sys.argv, ['JOB_NAME'])
    sc = SparkContext.getOrCreate()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)
    # -------------------------------------------------------
    # %status
    
    REQUEST_node = glueContext.create_dynamic_frame.from_catalog(
        database="db_rds",
        table_name="request",
        transformation_ctx="REQUEST_node",
        additional_options = {"jobBookmarkKeys":["REQUEST", "OPENDATE"], 
                              "jobBookmarkKeysSortOrder":"asc"}
    )
    # -------------------------------------------------------
    df_req = REQUEST_node.toDF()
    # -------------------------------------------------------
    df2_req = df_req.withColumn('date', split(df_req['OPENDATE'], ' ').getItem(0)) \
                    .withColumn('time', split(df_req['OPENDATE'], ' ').getItem(1))
    # -------------------------------------------------------
    
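One thing worth checking in the script above: per the Glue documentation, bookmark state is only persisted when `job.commit()` is called at the end of the run (and the job must be started with `--job-bookmark-option job-bookmark-enable`). The posted PoC initializes the job but never commits it, which by itself would cause every run to reprocess the full dataset. A crude self-check for the commit call, as a plain-Python sketch:

```python
def calls_job_commit(script_source: str) -> bool:
    """Crude check: does a Glue script text ever call job.commit()?

    Without job.commit() at the end of the run, Glue never saves the
    bookmark state, so each run starts from scratch.
    """
    return "job.commit()" in script_source

# The posted PoC ends after the transformations, with no commit:
poc_snippet = """
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# ... transformations ...
"""
print(calls_job_commit(poc_snippet))                       # False
print(calls_job_commit(poc_snippet + "job.commit()\n"))    # True
```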
