Customize AWS Glue Job Script - MySQL to JSON Files


Hello everyone,

I am creating an ETL job to transfer data from a MySQL table to an S3 bucket. I just need to know how to build the mapping between the two nodes; I need each row in the following format:

MySQL format:

col1,col2,col3,col4

xx,yy,ss,dd

rr,ll,ff,gg

The JSON files should look like the following (each file should contain 1,000 rows from the MySQL table):

{"col1": "xx", "col2": "yy", "col3": "ss", "col4": "dd"}

{"col1": "rr", "col2": "ll", "col3": "ff", "col4": "gg"}
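The requested layout is JSON Lines: one JSON object per row, keyed by column name, with a fixed number of rows per file. Strictly valid JSON uses double-quoted keys and strings, which `json.dumps` emits. The plain-Python sketch below only illustrates that target shape under the assumption that all values are strings; `rows_to_json_lines` is a hypothetical helper, not part of Glue or Spark.

```python
import json
from typing import Iterable, Iterator, List, Sequence


def rows_to_json_lines(
    columns: Sequence[str],
    rows: Iterable[Sequence[str]],
    rows_per_file: int = 1000,
) -> Iterator[str]:
    """Yield the contents of one JSON Lines file per batch of rows."""
    batch: List[str] = []
    for row in rows:
        # Pair each column name with its value, so keys mirror the table columns.
        batch.append(json.dumps(dict(zip(columns, row))))
        if len(batch) == rows_per_file:
            yield "\n".join(batch) + "\n"
            batch = []
    if batch:  # the last file may hold fewer than rows_per_file rows
        yield "\n".join(batch) + "\n"


columns = ["col1", "col2", "col3", "col4"]
rows = [("xx", "yy", "ss", "dd"), ("rr", "ll", "ff", "gg")]
for line in next(rows_to_json_lines(columns, rows)).splitlines():
    print(line)
# {"col1": "xx", "col2": "yy", "col3": "ss", "col4": "dd"}
# {"col1": "rr", "col2": "ll", "col3": "ff", "col4": "gg"}
```

With 2,500 input rows this yields three files of 1,000, 1,000 and 500 lines.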

I appreciate your comments and help!

With many thanks

B

2 Answers
Accepted Answer

This can be achieved in multiple ways using Glue jobs. I will describe a couple of approaches below:

  • Using the Glue visual editor: add the MySQL table as the source node with its connection details, and an S3 target node with the location where the JSON files should be stored.

  • Using a Glue PySpark job with the MySQL JDBC JAR:

  • Download the MySQL JDBC driver (Connector/J) JAR for the MySQL version you need to use; the latest version is typically backward compatible. The JAR needs to be uploaded to an S3 location, and that path can be added under Job details > Advanced properties > Dependent JARs path.

  • A Glue connection is only needed when the database must be reached through a VPC.
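The same job settings can also be defined programmatically. The sketch below builds the keyword arguments for boto3's Glue `create_job` call without sending anything: the console's "Dependent JARs path" corresponds to the `--extra-jars` job parameter, and the optional VPC connection goes under `Connections`. The helper name, job name, role ARN, S3 paths and Glue version are hypothetical placeholders to adapt.

```python
from typing import Any, Dict, Optional


def build_glue_job_definition(
    name: str,
    role_arn: str,
    script_s3_path: str,
    jdbc_jar_s3_path: str,
    job_args: Dict[str, str],
    connection_name: Optional[str] = None,
) -> Dict[str, Any]:
    """Return keyword arguments for boto3's Glue create_job call."""
    default_arguments = {
        # Equivalent of Job details > Advanced properties > Dependent JARs path.
        "--extra-jars": jdbc_jar_s3_path,
    }
    # Custom parameters such as raw_bucket become "--raw_bucket", the form
    # that getResolvedOptions reads in the job script.
    default_arguments.update({f"--{key}": value for key, value in job_args.items()})
    definition: Dict[str, Any] = {
        "Name": name,
        "Role": role_arn,
        "GlueVersion": "4.0",  # assumption: pick the version you actually use
        "Command": {
            "Name": "glueetl",  # Spark ETL job type
            "ScriptLocation": script_s3_path,
            "PythonVersion": "3",
        },
        "DefaultArguments": default_arguments,
    }
    if connection_name is not None:
        # Only needed when the database must be reached through a VPC.
        definition["Connections"] = {"Connections": [connection_name]}
    return definition


# Usage (requires AWS credentials; not executed here):
# import boto3
# boto3.client("glue").create_job(**build_glue_job_definition(...))
```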

import sys
import json

import boto3
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
from pyspark.context import SparkContext

args = getResolvedOptions(sys.argv, ["JOB_NAME",'raw_bucket','DB_secret','region'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
# Job parameters (e.g. passed in from a Step Functions state machine).
# raw_bucket must be a full S3 URI such as s3://<bucket>/<prefix>/
raw_bucket = args['raw_bucket']
db_secret = args['DB_secret']
region = args['region']

session = boto3.session.Session()
sm_client = session.client(service_name='secretsmanager', region_name=region)
db_secret_response = sm_client.get_secret_value(SecretId=db_secret)


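secret = json.loads(db_secret_response['SecretString'])

# Key names depend on how the secret was created; RDS-style secrets use
# lowercase 'username' and 'password'. 'jkspassword' and 'db_url' are custom keys.
user = secret['username']
password = secret['password']
sslPassword = secret['jkspassword']
db_url = secret['db_url']  # e.g. jdbc:mysql://<host>:3306/<database>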

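# Replace TABLE with your table name (TABLE itself is a reserved word in MySQL).
query = "SELECT * FROM TABLE"

# The "query" option takes a SELECT statement; "dbtable" expects a table name
# or a parenthesized subquery with an alias.
# The SSL options are MySQL Connector/J connection properties; keystore.jks must
# be shipped with the job (e.g. via the --extra-files job parameter).
df = spark.read.format("jdbc") \
        .option("driver", "com.mysql.cj.jdbc.Driver").option("url", db_url) \
        .option("user", user).option("password", password) \
        .option("query", query) \
        .option("sslMode", "VERIFY_CA") \
        .option("trustCertificateKeyStoreUrl", "file:keystore.jks") \
        .option("trustCertificateKeyStoreType", "JKS") \
        .option("trustCertificateKeyStorePassword", sslPassword) \
        .load()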

outputDyf = DynamicFrame.fromDF(df, glueContext, "outputDyf")

# Writes one JSON object per row, with keys taken from the column names.
datasink = glueContext.write_dynamic_frame.from_options(
    frame=outputDyf,
    connection_type="s3",
    connection_options={"path": raw_bucket},
    format="json",
    transformation_ctx="datasink",
)

job.commit()
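The script above does not limit each output file to 1,000 rows. One possible approach, sketched under the assumption that you write the Spark DataFrame directly with `DataFrameWriter` instead of `write_dynamic_frame`, is Spark's `maxRecordsPerFile` write option (Spark 2.2+), which rolls over to a new file once a task has written that many records. `write_json_batches` is a hypothetical helper; `df` is any Spark DataFrame, so no PySpark import is needed to define it.

```python
def write_json_batches(df, output_path: str, rows_per_file: int = 1000,
                       single_partition: bool = True,
                       mode: str = "overwrite") -> None:
    """Write a Spark DataFrame as JSON Lines files of at most rows_per_file rows."""
    if single_partition:
        # With a single partition there is one writer task, so every file
        # except the last should hold exactly rows_per_file records.
        # This trades away write parallelism, which matters for large tables.
        df = df.coalesce(1)
    (
        df.write
        .option("maxRecordsPerFile", rows_per_file)  # roll to a new file at the cap
        .mode(mode)  # "overwrite" replaces existing output under output_path
        .json(output_path)  # one JSON object per line, keys = column names
    )


# In the job script above, instead of (or in addition to) the datasink:
# write_json_batches(df, raw_bucket)
# or, starting from a DynamicFrame:
# write_json_batches(outputDyf.toDF(), raw_bucket)
```

Without `coalesce(1)`, each partition writes its own files, so the cap still holds but some files will contain fewer rows. Spark omits null-valued fields from each JSON object by default; on Spark 3.x, adding the writer option `ignoreNullFields` set to `false` keeps them as `null`.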

AWS, answered a year ago

Thank you, ananthtm, for your response with suggested solutions!

I already know these options and have already implemented one of them, but the problem I am facing right now is mapping the MySQL columns to a JSON schema as described in my first post.

I would appreciate any help on that point.
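Because the keys in the desired output are exactly the MySQL column names, no explicit column-to-key mapping should be needed: Spark's JSON writer (and Glue's JSON sink) emits each row as one JSON object per line, keyed by column name. An explicit mapping matters only when keys must be renamed or types cast; in Glue that is the `ApplyMapping` transform, whose `mappings` argument is a list of (source column, source type, target key, target type) tuples. The sketch below builds such a list; `build_apply_mapping` and its `renames` parameter are hypothetical, and it assumes every column is read as a string.

```python
from typing import Dict, List, Optional, Sequence, Tuple

# (source column, source type, target key, target type), the tuple shape
# that Glue's ApplyMapping transform takes in its mappings argument.
MappingTuple = Tuple[str, str, str, str]


def build_apply_mapping(
    columns: Sequence[str],
    renames: Optional[Dict[str, str]] = None,
    data_type: str = "string",
) -> List[MappingTuple]:
    """Map every column to itself, or to a new key name listed in renames."""
    renames = renames or {}
    return [(col, data_type, renames.get(col, col), data_type) for col in columns]


mappings = build_apply_mapping(["col1", "col2", "col3", "col4"])
# In the Glue job (not executed here):
# from awsglue.transforms import ApplyMapping
# mapped = ApplyMapping.apply(frame=outputDyf, mappings=mappings,
#                             transformation_ctx="mapped")
# then pass `mapped` to write_dynamic_frame.from_options(..., format="json")
```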

Best, Basem

answered a year ago
