Customize AWS Glue Job Script - MySQL to JSON Files


Hello all,

I am creating an ETL job to transfer data from a MySQL table to an S3 bucket. I just need to know how to build the mapping between the two nodes; each row should end up in the following format:

MySQL format:

col1,col2,col3,col4

xx,yy,ss,dd

rr,ll,ff,gg

JSON files should look like the following (each file should contain 1000 rows from the MySQL table):

{'col1':'xx', 'col2':'yy','col3':'ss' , 'col4':'dd'}

{'col1':'rr', 'col2':'ll','col3':'ff' , 'col4':'gg'}

I appreciate your comments and help!

With many thanks

B

2 Answers
Accepted Answer

This can be achieved in multiple ways using Glue jobs. I will outline a couple of approaches below:

  • Using Glue Studio (visual editor): add the MySQL table as a source, provide the connection details, and set the S3 location where the JSON files should be written.

  • Using a Glue PySpark job with the MySQL JDBC JAR:

  • Download the JAR for the version of MySQL you need to use - the latest version is typically backwards compatible. Upload the JDBC JAR to an S3 location and add it under Job details > Advanced properties > Dependent JARs path.

  • The Glue connection is only needed in cases where the job must connect to the database through a VPC. A sample PySpark script is shown below:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

from pyspark.sql import DataFrame, Row
import datetime
import boto3
import json
from awsglue import DynamicFrame
from pyspark.sql.functions import regexp_replace, col

args = getResolvedOptions(sys.argv, ["JOB_NAME",'raw_bucket','DB_secret','region'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
#Pass parameters from Step function
raw_bucket = str(args['raw_bucket'])
db_secret = str(args['DB_secret'])
region = str(args['region'])

session = boto3.session.Session()
sm_client = session.client(service_name = 'secretsmanager', region_name = region) 
# Fetch the database credentials stored in Secrets Manager
db_secret_response = sm_client.get_secret_value(SecretId = db_secret)


secret = json.loads(db_secret_response['SecretString'])

# Key names below must match the fields stored in your secret
user = secret['Username']
password = secret['password']
sslPassword = secret['jkspassword']
db_url = secret['db_url']

# The JDBC "dbtable" option expects a table name or a parenthesized subquery, not a bare SELECT
query = "(SELECT * FROM TABLE) AS src"

# Driver class is com.mysql.jdbc.Driver for Connector/J 5.x; use com.mysql.cj.jdbc.Driver for 8.x
df = glueContext.read.format("jdbc") \
        .option("driver", "com.mysql.jdbc.Driver").option("url", db_url) \
        .option("user", user).option("dbtable", query) \
        .option("password", password).option("encrypt", "true") \
        .option("sslConnection", "true").option("trustServerCertificate", "false") \
        .option("sslTrustStoreLocation", "keystore.jks") \
        .option("sslStoreType", "JKS") \
        .option("sslTrustStorePassword", sslPassword) \
        .load()

# Convert to a DynamicFrame and write the rows to S3 as line-delimited JSON
outputDyf = DynamicFrame.fromDF(df, glueContext, "outputDyf")

datasink = glueContext.write_dynamic_frame.from_options(frame=outputDyf, connection_type="s3",
                connection_options={"path": raw_bucket}, format="json", transformation_ctx="datasink")

job.commit()
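To get the exact row-to-JSON mapping and the 1000-rows-per-file split asked for in the question, one option is to apply an explicit column mapping and let Spark cap the number of records per output file. The snippet below is a minimal sketch that could be placed before the job.commit() call; it assumes the source table exposes the columns col1..col4 from the example, and the mapping tuples and types are placeholders to adjust for your schema.

# Explicitly map source columns to the desired JSON field names
# (source name, source type, target name, target type)
mappedDyf = ApplyMapping.apply(
    frame=outputDyf,
    mappings=[
        ("col1", "string", "col1", "string"),
        ("col2", "string", "col2", "string"),
        ("col3", "string", "col3", "string"),
        ("col4", "string", "col4", "string"),
    ],
    transformation_ctx="mappedDyf",
)

# Write line-delimited JSON, capping each output file at 1000 records
mappedDyf.toDF().write \
    .option("maxRecordsPerFile", 1000) \
    .mode("append") \
    .json(raw_bucket)

Each line in the resulting files is a JSON object such as {"col1": "xx", "col2": "yy", "col3": "ss", "col4": "dd"}, and the maxRecordsPerFile option keeps any single file from exceeding 1000 rows.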

answered 2 years ago

Thank you ananthtm for your response and the suggested solutions!

I already know these options and have implemented one of them, but the problem I am facing right now is mapping the MySQL columns to the JSON schema described in my first post.

I appreciate any help on that point.

Best, Basem

answered 2 years ago
