How to use a Glue job bookmark to read MongoDB data and track the last processed row using an id column


I have implemented AWS Glue job bookmarks to read data from MongoDB and write it to an S3 bucket, but every time we run the script it writes all of the data again to a separate file. Below is my code:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import time
import logging
import urllib
from pymongo import MongoClient
import nad_config
from datetime import date

# @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Production DB
mongo_uri = "mongodb://ip_1_2_3_4.ap-south-1.compute.internal:27017/test?replicaSet=repomongo"

collections = ['glue_bookmark']
today = date.today()
folder_name = today.strftime("%d-%m-%Y")

for i in collections:
    org_id = i[12:18]
    collection_name = i

    read_mongo_options = {
        "uri": mongo_uri,
        "database": "test",
        "collection": "test",
        "username": "test",
        "password": "test",
        "partitioner": "MongoSamplePartitioner",
        "partitionerOptions.partitionSizeMB": "10",
        "partitionerOptions.partitionKey": "id"}

    sub_folder_name = org_id
    final_folder_path = folder_name + '/test/'

    # catalogDB and catalogTable are defined elsewhere in the script
    datasource0 = glueContext.create_dynamic_frame_from_catalog(
        database=catalogDB,
        table_name=catalogTable,
        connection_type="mongodb",
        connection_options=read_mongo_options,
        transformation_ctx="datasource0",
        additional_options={"jobBookmarkKeys": ["id"], "jobBookmarkKeysSortOrder": "asc"})

    datasink1 = glueContext.write_dynamic_frame.from_options(
        frame=datasource0,
        connection_type="s3",
        connection_options={"path": "s3://aws-glue-assets-123456-ap-south-1/" + final_folder_path},
        format="json",
        transformation_ctx="datasink1")

job.commit()

Asked 2 years ago · 1047 views

2 Answers

Is there any news about Job Bookmark support for MongoDB connections?

Gon
answered a year ago

Please be advised that, per the Job Bookmark documentation [1], this feature only supports S3 and JDBC sources. Also, the way you have tried to implement job bookmarks (jobBookmarkKeys) is specific to JDBC data stores; it does not support a non-relational database as the source, which is why your data is being reprocessed on every run.

Workaround: Consider overwriting the S3 location so you don't get replicated data in the output file every time you run the job. You can do that with the commands below:

df = datasource0.toDF()  # convert the DynamicFrame from your script to a Spark DataFrame
df.write.mode('overwrite').parquet("s3://<bucket>/<folder>")
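If you truly need incremental loads keyed on the id column, one workaround is to maintain the watermark yourself, since the built-in bookmark won't do it for MongoDB. Below is a minimal sketch of that idea, not an official feature: it assumes id is a numeric, monotonically increasing field, it reuses datasource0 and final_folder_path from the question's script, and the watermark bucket/key names are hypothetical placeholders.

import boto3
from botocore.exceptions import ClientError
from pyspark.sql.functions import col

# Hypothetical location for storing the last processed id between runs
WATERMARK_BUCKET = "aws-glue-assets-123456-ap-south-1"
WATERMARK_KEY = "bookmarks/glue_bookmark_last_id.txt"

s3 = boto3.client("s3")

def read_last_id():
    # Return the watermark from the previous run, or -1 on the first run
    try:
        obj = s3.get_object(Bucket=WATERMARK_BUCKET, Key=WATERMARK_KEY)
        return int(obj["Body"].read().decode("utf-8"))
    except ClientError:
        return -1

def write_last_id(last_id):
    # Persist the new watermark for the next run
    s3.put_object(Bucket=WATERMARK_BUCKET, Key=WATERMARK_KEY,
                  Body=str(last_id).encode("utf-8"))

last_id = read_last_id()

# Keep only rows that arrived after the previous run's high-water mark
df = datasource0.toDF().filter(col("id") > last_id)

if df.head(1):  # write and advance the watermark only if there are new rows
    df.write.mode("append").json("s3://aws-glue-assets-123456-ap-south-1/" + final_folder_path)
    write_last_id(df.agg({"id": "max"}).collect()[0][0])

Note this still reads the whole collection from MongoDB and filters in Spark; pushing the id filter down into the MongoDB query (for example with pymongo, which the script already imports) would avoid the full scan, at the cost of bypassing the DynamicFrame reader.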

Experiment: You can also try simply enabling job bookmarks for the job itself (the --job-bookmark-option job-bookmark-enable job parameter, or "Job bookmark: Enable" in the console), use this code to create your DynamicFrame, and then run the job:

datasource0 = glueContext.create_dynamic_frame_from_catalog(
    database=catalogDB,
    table_name=catalogTable,
    connection_type="mongodb",
    connection_options=read_mongo_options,
    transformation_ctx="datasource0")

References: [1] See the second code example at https://docs.aws.amazon.com/glue/latest/dg/monitor-continuations.html#monitor-continuations-script

AWS
SUPPORT ENGINEER
Shubh
answered 2 years ago
AWS
EXPERT
reviewed 2 years ago
  • Any news about job bookmarks for MongoDB?
