- 最新
- 最多得票
- 最多評論
This can be achieved in multiple ways using Glue jobs. I will provide a couple of approahes below:
-
Using Glue Visual MySQL table as a source, provide connection details and S3 location to store the JSON file.
-
Using Glue Pyspark job with he MySQL JDBC JAR -
-
Download the JAR for the version of MySQL you need to use - latest version is typically backwards compatible. The JDBC JAR needs to loaded into S3 location and that can be added under JOB DETAILS > Advanced properties > Dependent JARs path
-
The Glue connection is needed in cases where you need a VPC to connect to the database
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import DataFrame, Row
import datetime
import boto3
import json
from awsglue import DynamicFrame
from pyspark.sql.functions import regexp_replace, col
args = getResolvedOptions(sys.argv, ["JOB_NAME",'raw_bucket','DB_secret','region'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
#Pass parameters from Step function
raw_bucket = str(args['raw_bucket'])
db_secret = str(args['DB_secret'])
region = str(args['region'])
session = boto3.session.Session()
sm_client = session.client(service_name = 'secretsmanager', region_name = region)
db_secret_response = sm_client.get_secret_value(SecretId = db2_secret)
secret = json.loads(db_secret_response['SecretString'])
user = secret['Username']
password = secret['password']
sslPassword = secret['jkspassword']
db_url = secret['db_url']
query = "SELECT * FROM TABLE"
df = glueContext.read.format("jdbc") \
.option("driver", "com.mysql.jdbc.Driver").option("url", db_url) \
.option("user", user).option("dbtable", query) \
.option("password", password).option("encrypt", "true") \
.option("sslConnection", "true").option("trustServerCertificate", "false") \
.option("sslTrustStoreLocation" , "keystore.jks") \
.option("sslStoreType" , "JKS") \
.option("sslTrustStorePassword" , sslPassword) \
.load()
outputDyf = DynamicFrame.fromDF(df, glueContext,"outputDyf")
datasink = glueContext.write_dynamic_frame.from_options(frame=outputDyf,connection_type ="s3",
connection_options={"path":raw_bucket}, format="json",transformation_ctx ="datasink")
job.commit()
Thank You ananthtm for your response with suggested solutions!
I really know these options and already implement one of them, but the problem that I am in right now, is to map the mysql columns to a json schema as described above in my first post.
I appricate any help in that point.
Best, Basem
相關內容
- AWS 官方已更新 2 年前
- AWS 官方已更新 3 年前
- AWS 官方已更新 1 年前