Customize AWS Glue Job Script - MYSQL to JSON Files


Hello all,

I am creating an ETL job to transfer data from a MySQL table to an S3 bucket. I need to know how to build the mapping between the two nodes so that each row ends up in the following format:

MySQL format:

col1,col2,col3,col4

xx,yy,ss,dd

rr,ll,ff,gg

The JSON files should look like the following (each file should contain 1000 rows from the MySQL table):

{'col1':'xx', 'col2':'yy','col3':'ss' , 'col4':'dd'}

{'col1':'rr', 'col2':'ll','col3':'ff' , 'col4':'gg'}
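Note that the single quotes above are not valid JSON; JSON strings and keys must use double quotes. A minimal sketch, in plain Python and independent of Glue, of what "mapping" a row means here: the column names become the keys and each row becomes one JSON object on its own line (JSON Lines). `row_to_json_line` is a hypothetical helper, and the compact separators are chosen only to resemble the style Spark's JSON writer uses.

```python
import json


def row_to_json_line(columns, values):
    """Map one MySQL row (column names + values) to a single JSON Lines record."""
    # zip pairs each column name with the value at the same position,
    # so the column name becomes the JSON key; separators drop extra spaces.
    return json.dumps(dict(zip(columns, values)), separators=(",", ":"))


header = ["col1", "col2", "col3", "col4"]
rows = [["xx", "yy", "ss", "dd"], ["rr", "ll", "ff", "gg"]]

for r in rows:
    print(row_to_json_line(header, r))
# {"col1":"xx","col2":"yy","col3":"ss","col4":"dd"}
# {"col1":"rr","col2":"ll","col3":"ff","col4":"gg"}
```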

I appreciate your comments and help!

With many thanks

B

2 answers

Accepted answer

This can be achieved in multiple ways using Glue jobs. Here are a couple of approaches:

  • Using the Glue visual editor (Glue Studio): add the MySQL table as the source, provide the connection details, and set the S3 location where the JSON files should be stored.

  • Using a Glue PySpark job with the MySQL JDBC JAR:

  • Download the JAR for the MySQL version you need to use (the latest Connector/J version is typically backward compatible). Upload the JDBC JAR to an S3 location, then add that path under Job details > Advanced properties > Dependent JARs path.

  • A Glue connection is only needed when the job must reach the database through a VPC.

import sys
import json

import boto3
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext

# Pass parameters from the Step Functions state machine (--raw_bucket, --DB_secret, --region)
args = getResolvedOptions(sys.argv, ["JOB_NAME", "raw_bucket", "DB_secret", "region"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

raw_bucket = args["raw_bucket"]  # S3 output path, e.g. s3://my-bucket/prefix/
db_secret = args["DB_secret"]
region = args["region"]

# Fetch the database credentials from AWS Secrets Manager
session = boto3.session.Session()
sm_client = session.client(service_name="secretsmanager", region_name=region)
db_secret_response = sm_client.get_secret_value(SecretId=db_secret)  # was db2_secret (undefined)
secret = json.loads(db_secret_response["SecretString"])

# The key names must match how your secret was stored
user = secret["Username"]
password = secret["password"]
ssl_password = secret["jkspassword"]
db_url = secret["db_url"]  # e.g. jdbc:mysql://host:3306/dbname

query = "SELECT * FROM TABLE"

# Use "query" (not "dbtable") for a SELECT statement.
# The SSL settings use MySQL Connector/J property names; the keystore location is an
# assumption and the file must be readable wherever the JDBC connection is opened.
df = (
    spark.read.format("jdbc")
    .option("driver", "com.mysql.cj.jdbc.Driver")  # Connector/J 8.x; 5.x uses com.mysql.jdbc.Driver
    .option("url", db_url)
    .option("user", user)
    .option("password", password)
    .option("query", query)
    .option("sslMode", "VERIFY_CA")
    .option("trustCertificateKeyStoreUrl", "file:keystore.jks")
    .option("trustCertificateKeyStoreType", "JKS")
    .option("trustCertificateKeyStorePassword", ssl_password)
    .load()
)

outputDyf = DynamicFrame.fromDF(df, glueContext, "outputDyf")

# format="json" writes one JSON object per row, with the column names as keys
datasink = glueContext.write_dynamic_frame.from_options(
    frame=outputDyf,
    connection_type="s3",
    connection_options={"path": raw_bucket},
    format="json",
    transformation_ctx="datasink",
)

job.commit()
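The script above does not enforce the "1000 rows per file" requirement. In Spark, the DataFrame writer option `maxRecordsPerFile` caps the number of records per output file, e.g. `df.write.option("maxRecordsPerFile", 1000).json(raw_bucket)`; it is a maximum applied per partition, so some files can hold fewer rows. A minimal plain-Python sketch of the same rule, useful for checking the expected layout locally; `chunk_rows`, `write_json_parts`, and the `part-NNNNN.json` names are hypothetical and not what Spark or Glue produce.

```python
import json
import tempfile
from pathlib import Path


def chunk_rows(rows, rows_per_file=1000):
    """Yield consecutive slices of at most rows_per_file rows."""
    for start in range(0, len(rows), rows_per_file):
        yield rows[start:start + rows_per_file]


def write_json_parts(records, out_dir, rows_per_file=1000):
    """Write dict records as JSON Lines files holding at most rows_per_file lines each.

    Returns the list of paths written, in order.
    """
    out = Path(out_dir)
    out.mkdir(parents=True, exist_ok=True)
    paths = []
    for i, chunk in enumerate(chunk_rows(records, rows_per_file)):
        path = out / f"part-{i:05d}.json"
        # One compact JSON object per line.
        path.write_text("".join(json.dumps(r, separators=(",", ":")) + "\n" for r in chunk))
        paths.append(path)
    return paths


# Usage: 2500 records are split into files of 1000, 1000 and 500 lines.
records = [{"col1": f"x{n}", "col2": n} for n in range(2500)]
paths = write_json_parts(records, tempfile.mkdtemp())
print([len(p.read_text().splitlines()) for p in paths])  # [1000, 1000, 500]
```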

AWS, answered 2 years ago

Thank you, ananthtm, for your response and the suggested solutions!

I already know these options and have implemented one of them, but my current problem is mapping the MySQL columns to a JSON schema as described in my first post.

I appreciate any help on that point.

Best, Basem

answered 2 years ago
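On the mapping question: Glue's JSON writer already uses the column names as keys, so the format in the first post needs no explicit mapping. If keys must be renamed or types changed, Glue provides the `ApplyMapping` transform, called as `ApplyMapping.apply(frame=outputDyf, mappings=[("col1", "string", "col1", "string")])` with one 4-tuple `(source_column, source_type, target_column, target_type)` per column. The plain-Python sketch below only imitates that tuple shape on a single row so the idea can be tried locally; `apply_mapping` and `CASTS` are hypothetical and cover just three types, which is not how Glue implements the transform.

```python
import json

# Hypothetical local casts; Glue supports many more types than these three.
CASTS = {"string": str, "int": int, "double": float}


def apply_mapping(row, mappings):
    """Rename and cast the columns of one row dict, keeping only mapped columns."""
    out = {}
    for src, _src_type, dst, dst_type in mappings:
        out[dst] = CASTS[dst_type](row[src])
    return out


# Identity mapping for the columns in the question; change the third
# element of a tuple to rename that key in the output.
mappings = [
    ("col1", "string", "col1", "string"),
    ("col2", "string", "col2", "string"),
    ("col3", "string", "col3", "string"),
    ("col4", "string", "col4", "string"),
]
row = {"col1": "xx", "col2": "yy", "col3": "ss", "col4": "dd"}
print(json.dumps(apply_mapping(row, mappings), separators=(",", ":")))
# {"col1":"xx","col2":"yy","col3":"ss","col4":"dd"}
```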
