Glue job with incremental data load


Hello guys, I'm building a Glue job and I need an incremental load that only picks up new or modified data. I've tried the code below, but it doesn't work: it reprocesses all the data every time I start the job, so I end up with everything duplicated.

node = glueContext.create_dynamic_frame.from_catalog(
    database="db",
    table_name="table",
    transformation_ctx="node",
    additional_options = {"jobBookmarkKeys":[""], "jobBookmarkKeysSortOrder":"asc"}
)

Can someone help me with this?

lebuosi
asked 7 months ago · 469 views
2 Answers
Accepted Answer

Hello,

Looking at your script, I notice that:

additional_options = {"jobBookmarkKeys":["REQUEST", "OPENDATE"], "jobBookmarkKeysSortOrder":"asc"}

As documented at [1]:

Warning
If user-defined bookmarks keys are used, they must each be strictly monotonically increasing or decreasing. When selecting additional fields for a compound key, fields for concepts like "minor versions" or "revision numbers" do not meet this criteria, since their values are reused throughout your dataset.

Judging by the names, it seems to me that the keys "REQUEST" and "OPENDATE" do not satisfy the requirements above.

A workaround is to create an additional column in the source database, or to use the existing primary key, as long as it meets the requirements above. Then use that column for "jobBookmarkKeys", as in the sketch below.
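
For illustration, here is a minimal sketch assuming the source table has a strictly increasing, auto-generated primary key; the column name "ID" is a placeholder for whatever column in your table meets that requirement:

node = glueContext.create_dynamic_frame.from_catalog(
    database="db",
    table_name="table",
    transformation_ctx="node",  # bookmark state is tracked per transformation_ctx
    additional_options={
        "jobBookmarkKeys": ["ID"],  # placeholder: a strictly monotonic column
        "jobBookmarkKeysSortOrder": "asc",
    },
)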

References:

[1] - https://docs.aws.amazon.com/glue/latest/dg/programming-etl-connect-bookmarks.html

Thi_N (AWS), answered 7 months ago

Hello,

Glue job bookmarks have prerequisites that must be met before you use them, depending on the use case.

Looking at your code, I can't tell whether the requirements have been met, so I'm providing generic advice.

In the case of an S3 data source, please note that the supported file formats are JSON, CSV, Apache Avro, XML, Parquet, and ORC [1].
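
For example, here is a minimal sketch of an S3 source read that participates in bookmarking; the bucket, prefix, and format are placeholders:

s3_node = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={"paths": ["s3://example-bucket/example-prefix/"]},  # placeholder path
    format="parquet",  # must be one of the supported formats listed above
    transformation_ctx="s3_node",  # required so the bookmark can track processed files
)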

In the case of a JDBC data source, the following rules apply [1]:

- For each table, AWS Glue uses one or more columns as bookmark keys to determine new and processed data. The bookmark keys combine to form a single compound key.

- AWS Glue by default uses the primary key as the bookmark key, provided that it is sequentially increasing or decreasing (with no gaps).

- You can specify the columns to use as bookmark keys in your AWS Glue script. For more information about using Job bookmarks in AWS Glue scripts, see Using job bookmarks.

- AWS Glue doesn't support using columns with case-sensitive names as job bookmark keys.

Regarding your code:

additional_options = {"jobBookmarkKeys":[""], "jobBookmarkKeysSortOrder":"asc"}

please pick a suitable column for "jobBookmarkKeys" that meets the requirements above. More sample scripts and usage notes are available at [2] for your information.
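
For example, assuming the table has a suitable sequential column (the name "REQUEST_ID" below is hypothetical):

additional_options = {"jobBookmarkKeys": ["REQUEST_ID"], "jobBookmarkKeysSortOrder": "asc"}

Also make sure that job bookmarks are enabled on the job itself, and that the script calls job.init() at the start and job.commit() at the end, since the bookmark state is only persisted when the job commits.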

References:

[1] - https://docs.aws.amazon.com/glue/latest/dg/monitor-continuations.html#monitor-continuations-implement

[2] - https://docs.aws.amazon.com/glue/latest/dg/programming-etl-connect-bookmarks.html

Thi_N (AWS), answered 7 months ago
  • Hello Thi_N. Thanks for the feedback. I've read your response, but I still haven't been able to make it work. Can you take a look at my PoC script below and tell me what is wrong? It's my first time working with Glue and dynamic frames, so if you see a better way to do anything, please let me know.

  • import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job
    from pyspark.sql.functions import split
    from awsglue.dynamicframe import DynamicFrame
    from pyspark.sql import functions as F
    # -------------------------------------------------------
    # simulate the --JOB_NAME argument when running outside a regular Glue job run
    sys.argv += ["--JOB_NAME", "glue_job"]
    # -------------------------------------------------------
    args = getResolvedOptions(sys.argv, ['JOB_NAME'])
    sc = SparkContext.getOrCreate()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)
    # -------------------------------------------------------
    # %status
    
    REQUEST_node = glueContext.create_dynamic_frame.from_catalog(
        database="db_rds",
        table_name="request",
        transformation_ctx="REQUEST_node",
        additional_options = {"jobBookmarkKeys":["REQUEST", "OPENDATE"], 
                              "jobBookmarkKeysSortOrder":"asc"}
    )
    # -------------------------------------------------------
    df_req = REQUEST_node.toDF()
    # -------------------------------------------------------
    df2_req = df_req.withColumn('date', split(df_req['OPENDATE'], ' ').getItem(0)) \
                    .withColumn('time', split(df_req['OPENDATE'], ' ').getItem(1))
    # -------------------------------------------------------
    job.commit()  # required for job bookmarks: the bookmark state is only saved on commit
