如何在 AWS Glue 关系化转换后使用转换后的数据?
我想要使用 AWS Glue 关系化转换来平展我的数据。我可以将哪些字段用作分区来将转换后的数据存储在 Amazon Simple Storage Service(Amazon S3)中?
简短描述
通过关系化转换,可以在关系数据库中使用 NoSQL 数据结构,如数组和结构体。关系转型将返回一系列 DynamicFrames(Python 中的 DynamicFrameCollection 和 Scala 中的数组)。关系转型返回的所有 DynamicFrames 均可以通过其在 Python 中的各个名称和 Scala 中的数组索引来访问。
解决方法
对数据进行关系化
本教程使用以下架构:
|-- family_name: string |-- name: string |-- gender: string |-- image: string |-- images: array | |-- element: struct | | |-- url: string
对 Python 使用以下关系化语法:
# AWS Glue Data Catalog: database and table names db_name = "us-legislators" tempDir = "s3://awsexamplebucket/temp_dir/" # Create dynamic frames from the source tables persons = glueContext.create_dynamic_frame.from_catalog(database=db_name, table_name=tbl_persons) # Relationalize transformation dfc = persons.relationalize("root", tempDir) dfc.select('root_images').printSchema() dfc.select('root_images').show()
对 Scala 使用以下关系化语法:
// AWS Glue Data Catalog: database and table names val dbName = "us-legislators" val tblPersons = "persons_json" // Output Amazon S3 temp directory val tempDir = "s3://awsexamplebucket/temp_dir" val persons: DynamicFrame = glueContext.getCatalogSource(database = dbName, tableName = tblPersons).getDynamicFrame() val personRelationalize = persons.relationalize(rootTableName = "root", stagingPath = tempDir) personRelationalize(2).printSchema() personRelationalize(2).show()
解读转换后的数据
此关系化转换产生两个架构:root 和 root_images。
root:
|-- family_name: string |-- name: string |-- gender: string |-- image: string |-- images: long
root_images:
|-- id: long |-- index: int |-- images.val.url: string
- id:数组元素的顺序(1、2 或 3)
- index:数组中每个元素的索引位置
- images.val.url:root_images 中的 images.val.url 值
它们是唯一可用作分区字段以将此转换后的数据存储在 Amazon S3 中的字段。由于这些字段不存在于 root_images 中,指定 root 表字段(如 name)不起作用。
加入关系化数据以获取标准化数据
root_images 中的 id 属性指的是数组(1、2 或 3)在数据集中的顺序。root 中的 images 属性将保留数组索引的值。这表示,您必须使用 images 和 id 来加入 root 和 root_images。您可以运行 dynamicFrame.show() 来验证数组的顺序和数组索引的值。
要加入 root 和 root_images:
Python:
joined_root_root_images = Join.apply(dfc.select('root'), dfc.select('root_images'), 'images', 'id')
Scala:
val joined_root_root_images = personRelationalize(0).join(keys1 = Seq("images"), keys2 = Seq("id"), frame2 = personRelationalize(1))
存储转换后的数据
要将转换后的数据存储在具有分区的 Amazon S3:
Python:
datasink4 = glueContext.write_dynamic_frame.from_options(frame = dfc.select('root_images'), connection_type = "s3", connection_options = {"path": outputHistoryDir,"partitionKeys":["id"]}, format = "csv",transformation_ctx = "datasink4")
Scala:
**注意:**在下面的示例中,personRelationalize(2) 是 root_images 转换后的数据表。
glueContext.getSinkWithFormat(connectionType = "s3", options = JsonOptions(Map("path" -> paths, "partitionKeys" -> List("id"))), format = "csv", transformationContext = "").writeDynamicFrame(personRelationalize(2))
要将转换后的数据存储在不具有分区的 Amazon S3:
Python:
datasink5 = glueContext.write_dynamic_frame.from_options(frame = dfc.select('root_images'), connection_type = "s3", connection_options = {"path": outputHistoryDir}, format = "csv",transformation_ctx = "datasink5"
Scala:
**注意:**在下面的示例中,personRelationalize(2) 是 root_images 转换后的数据表。
glueContext.getSinkWithFormat(connectionType = "s3", options = JsonOptions(Map("path" -> paths)), format = "csv", transformationContext = "").writeDynamicFrame(personRelationalize(2))
当您将数据写入 Amazon S3 后,在 Amazon Athena 中查询数据或使用 DynamicFrame 将数据写入关系数据库中,如 Amazon Redshift。
相关信息
Simplify querying nested JSON with the AWS Glue relationalize transform
相关内容
- AWS 官方已更新 2 年前