自定义AWS Glue作业脚本-将MYSQL转换为JSON文件。

0

【以下的问题经过翻译处理】 我正在创建一个将数据从MySQL表传输到S3桶的ETL作业,我只需要知道如何建立两个节点之间的映射,我需要每行数据以以下格式显示:

MySQL格式:

col1,col2,col3,col4 xx,yy,ss,dd rr,ll,ff,gg

JSON文件应该如下(每个文件应该包含来自MySQL表的1000行数据):

{'col1':'xx', 'col2':'yy','col3':'ss','col4':'dd'} {'col1':'rr', 'col2':'ll','col3':'ff','col4':'gg'}

1 回答
0

【以下的回答经过翻译处理】 使用Glue作业可以通过多种方式实现。下面我将提供几种方法:

  • 使用Glue可视化MySQL表作为源,提供连接详细信息和S3位置来存储JSON文件。 Enter image description here
  • 使用Glue Pyspark作业和MySQL JDBC JAR -
  • 下载您需要使用的MySQL版本的JAR - 最新版本通常是向后兼容的。 JDBC JAR需要加载到S3位置,可以在“JOB DETAILS>高级属性>依赖JAR路径”下添加
  • Glue连接在需要连接到数据库的VPC的情况下需要
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

from pyspark.sql import DataFrame, Row
import datetime
import boto3
import json
from awsglue import DynamicFrame
from pyspark.sql.functions import regexp_replace, col

args = getResolvedOptions(sys.argv, ["JOB_NAME",'raw_bucket','DB_secret','region'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
#从步骤函数传递参数
raw_bucket = str(args['raw_bucket'])
db_secret = str(args['DB_secret'])
region = str(args['region'])

session = boto3.session.Session()
sm_client = session.client(service_name = 'secretsmanager', region_name = region) 
db_secret_response = sm_client.get_secret_value(SecretId = db2_secret)


secret = json.loads(db_secret_response['SecretString'])

user = secret['Username']
password = secret['password']
sslPassword = secret['jkspassword']
db_url = secret['db_url']

query = "SELECT * FROM TABLE"

df = glueContext.read.format("jdbc") \
        .option("driver", "com.mysql.jdbc.Driver").option("url", db_url) \
        .option("user", user).option("dbtable", query

更多详情请参考:https://docs.aws.amazon.com/glue/
profile picture
专家
已回答 10 个月前

您未登录。 登录 发布回答。

一个好的回答可以清楚地解答问题和提供建设性反馈,并能促进提问者的职业发展。

回答问题的准则