Global outage event
If you're experiencing issues with your AWS services, then please refer to the AWS Health Dashboard. You can find the overall status of ongoing outages, the health of AWS services, and the latest updates from AWS engineers.
如何在 AWS Glue 关系化转换后使用转换后的数据?
我想要使用 AWS Glue 关系化转换来平展我的数据。我可以将哪些字段用作分区来将转换后的数据存储在 Amazon Simple Storage Service(Amazon S3)中?
简短描述
通过关系化转换,可以在关系数据库中使用 NoSQL 数据结构,如数组和结构体。关系转型将返回一系列 DynamicFrames(Python 中的 DynamicFrameCollection 和 Scala 中的数组)。关系转型返回的所有 DynamicFrames 均可以通过其在 Python 中的各个名称和 Scala 中的数组索引来访问。
解决方法
对数据进行关系化
本教程使用以下架构:
|-- family_name: string |-- name: string |-- gender: string |-- image: string |-- images: array | |-- element: struct | | |-- url: string
对 Python 使用以下关系化语法:
# AWS Glue Data Catalog: database and table names db_name = "us-legislators" tempDir = "s3://awsexamplebucket/temp_dir/" # Create dynamic frames from the source tables persons = glueContext.create_dynamic_frame.from_catalog(database=db_name, table_name=tbl_persons) # Relationalize transformation dfc = persons.relationalize("root", tempDir) dfc.select('root_images').printSchema() dfc.select('root_images').show()
对 Scala 使用以下关系化语法:
// AWS Glue Data Catalog: database and table names val dbName = "us-legislators" val tblPersons = "persons_json" // Output Amazon S3 temp directory val tempDir = "s3://awsexamplebucket/temp_dir" val persons: DynamicFrame = glueContext.getCatalogSource(database = dbName, tableName = tblPersons).getDynamicFrame() val personRelationalize = persons.relationalize(rootTableName = "root", stagingPath = tempDir) personRelationalize(2).printSchema() personRelationalize(2).show()
解读转换后的数据
此关系化转换产生两个架构:root 和 root_images。
root:
|-- family_name: string |-- name: string |-- gender: string |-- image: string |-- images: long
root_images:
|-- id: long |-- index: int |-- images.val.url: string
- id:数组元素的顺序(1、2 或 3)
- index:数组中每个元素的索引位置
- images.val.url:root_images 中的 images.val.url 值
它们是唯一可用作分区字段以将此转换后的数据存储在 Amazon S3 中的字段。由于这些字段不存在于 root_images 中,指定 root 表字段(如 name)不起作用。
加入关系化数据以获取标准化数据
root_images 中的 id 属性指的是数组(1、2 或 3)在数据集中的顺序。root 中的 images 属性将保留数组索引的值。这表示,您必须使用 images 和 id 来加入 root 和 root_images。您可以运行 dynamicFrame.show() 来验证数组的顺序和数组索引的值。
要加入 root 和 root_images:
Python:
joined_root_root_images = Join.apply(dfc.select('root'), dfc.select('root_images'), 'images', 'id')
Scala:
val joined_root_root_images = personRelationalize(0).join(keys1 = Seq("images"), keys2 = Seq("id"), frame2 = personRelationalize(1))
存储转换后的数据
要将转换后的数据存储在具有分区的 Amazon S3:
Python:
datasink4 = glueContext.write_dynamic_frame.from_options(frame = dfc.select('root_images'), connection_type = "s3", connection_options = {"path": outputHistoryDir,"partitionKeys":["id"]}, format = "csv",transformation_ctx = "datasink4")
Scala:
**注意:**在下面的示例中,personRelationalize(2) 是 root_images 转换后的数据表。
glueContext.getSinkWithFormat(connectionType = "s3", options = JsonOptions(Map("path" -> paths, "partitionKeys" -> List("id"))), format = "csv", transformationContext = "").writeDynamicFrame(personRelationalize(2))
要将转换后的数据存储在不具有分区的 Amazon S3:
Python:
datasink5 = glueContext.write_dynamic_frame.from_options(frame = dfc.select('root_images'), connection_type = "s3", connection_options = {"path": outputHistoryDir}, format = "csv",transformation_ctx = "datasink5"
Scala:
**注意:**在下面的示例中,personRelationalize(2) 是 root_images 转换后的数据表。
glueContext.getSinkWithFormat(connectionType = "s3", options = JsonOptions(Map("path" -> paths)), format = "csv", transformationContext = "").writeDynamicFrame(personRelationalize(2))
当您将数据写入 Amazon S3 后,在 Amazon Athena 中查询数据或使用 DynamicFrame 将数据写入关系数据库中,如 Amazon Redshift。
相关信息
Simplify querying nested JSON with the AWS Glue relationalize transform
相关内容
AWS 官方已更新 3 年前