
How to use Glue job bookmark to read mongodb data and track last processed row using id column


I have implemented an AWS Glue job bookmark to read data from MongoDB and write to an S3 bucket, but every time we run the script it writes all of the data to a separate file. Below is my code:

```python
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import time
import logging
import urllib
from pymongo import MongoClient
import nad_config
from datetime import date

# @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Production DB
mongo_uri = "mongodb://ip_1_2_3_4.ap-south-1.compute.internal:27017/test?replicaSet=repomongo"

collection_list = ['glue_bookmark']
today = date.today()
folder_name = today.strftime("%d-%m-%Y")

for i in collection_list:
    org_id = i[12:18]
    collection_name = i

    read_mongo_options = {
        "uri": mongo_uri,
        "database": "test",
        "collection": "test",
        "username": "test",
        "password": "test",
        "partitioner": "MongoSamplePartitioner",
        "partitionerOptions.partitionSizeMB": "10",
        "partitionerOptions.partitionKey": "id"}

    sub_folder_name = org_id
    final_folder_path = folder_name + '/test/'

    datasource0 = glueContext.create_dynamic_frame_from_catalog(
        database=catalogDB,
        table_name=catalogTable,
        connection_type="mongodb",
        connection_options=read_mongo_options,
        transformation_ctx="datasource0",
        additional_options={"jobBookmarkKeys": ["id"],
                            "jobBookmarkKeysSortOrder": "asc"})

    datasink1 = glueContext.write_dynamic_frame.from_options(
        frame=datasource0,
        connection_type="s3",
        connection_options={"path": "s3://aws-glue-assets-123456-ap-south-1/" + final_folder_path},
        format="json",
        transformation_ctx="datasink1")

job.commit()
```

1 Answer

Please be advised that, per the Job Bookmark documentation [1], this feature only supports Amazon S3 and JDBC sources. Additionally, the way you have tried to implement job bookmarks (with `jobBookmarkKeys`) is specific to JDBC data stores; it does not support a non-relational database as the source, which is why your data is being reprocessed on every run.

Workaround: I would also suggest overwriting the S3 location, so you won't get duplicated data in the output file every time you run the job. You can do that with the commands below:

```python
df = datasource0.toDF()
df.write.mode('overwrite').parquet("s3://<bucket>/<folder>")
```
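Since Job Bookmarks cannot track a MongoDB source, another option (a sketch that is not part of the original answer; the helper function and the idea of persisting the watermark to S3 or SSM are assumptions, not Glue features) is to keep a manual watermark: store the largest `id` processed so far, and filter each MongoDB read to rows above it with `$gt`.

```python
def build_incremental_filter(last_id):
    """Build a MongoDB query document that selects only rows after the
    last processed id (a manual 'bookmark'). Returns an empty filter on
    the first run, when no watermark exists yet."""
    if last_id is None:
        return {}
    return {"id": {"$gt": last_id}}

# In a real job you would load/save last_id from a durable store
# (e.g. an S3 object or an SSM parameter) before and after each run,
# and pass the filter to your MongoDB read. Here we only demonstrate
# the filter logic itself:
print(build_incremental_filter(None))   # → {}
print(build_incremental_filter(12345))  # → {'id': {'$gt': 12345}}
```

After a successful write, you would save the maximum `id` seen in this batch as the new watermark, so the next run starts where this one stopped.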

Experiment: You can also try simply enabling job bookmarks for your job, using the following code to create your DynamicFrame, and then run the job:

```python
datasource0 = glueContext.create_dynamic_frame_from_catalog(
    database=catalogDB,
    table_name=catalogTable,
    connection_type="mongodb",
    connection_options=read_mongo_options,
    transformation_ctx="datasource0")
```

References: [1] See the second code example at https://docs.aws.amazon.com/glue/latest/dg/monitor-continuations.html#monitor-continuations-script

SUPPORT ENGINEER
answered a month ago
EXPERT
reviewed a month ago
