AWS Glue でリレーショナル変換後にピボットされたデータを使用するにはどうすればよいですか?

所要時間2分
0

AWS Glue のリレーショナル化変換を使用してデータをフラット化したいと考えています。ピボットされたデータを Amazon Simple Storage Service (Amazon S3) に保存するためにパーティションとして使用できるフィールドはどれですか?

簡単な説明

リレーショナル変換により、リレーショナルデータベースで NoSQL データ構造 (配列や構造など) を使用できるようになります。リレーショナル変換は、DynamicFrames のコレクション (Python では DynamicFrameCollection、Scala では配列) を返します。リレーショナル変換によって返されるすべての DynamicFrames には、Python では個別の名前から、Scala では配列インデックスを使用してアクセスできます。

解決方法

データをリレーショナル化

このチュートリアルでは、次のスキーマを使用します。

|-- family_name: string
|-- name: string
|-- gender: string
|-- image: string
|-- images: array
|    |-- element: struct
|    |    |-- url: string

Python では、次のリレーショナル構文を使用します。

# AWS Glue Data Catalog: database and table names
db_name = "us-legislators"
tempDir = "s3://awsexamplebucket/temp_dir/"

# Create dynamic frames from the source tables
persons = glueContext.create_dynamic_frame.from_catalog(database=db_name, table_name=tbl_persons)

# Relationalize transformation
dfc = persons.relationalize("root", tempDir)
dfc.select('root_images').printSchema()
dfc.select('root_images').show()

Scala には、次のリレーショナル構文を使用します。

// AWS Glue Data Catalog: database and table names
val dbName = "us-legislators"
val tblPersons = "persons_json"

// Output Amazon S3 temp directory
val tempDir = "s3://awsexamplebucket/temp_dir"

val persons: DynamicFrame = glueContext.getCatalogSource(database = dbName, tableName = tblPersons).getDynamicFrame()
val personRelationalize = persons.relationalize(rootTableName = "root", stagingPath = tempDir)
personRelationalize(2).printSchema()
personRelationalize(2).show()

ピボットされたデータを解釈する

このリレーショナル変換では、rootroot_images という 2 つのスキーマが生成されます。

root:

|-- family_name: string
|-- name: string
|-- gender: string
|-- image: string
|-- images: long

root_images:

|-- id: long
|-- index: int
|-- images.val.url: string
  • id: 配列要素の順序 (1、2、または 3)
  • index: 配列内の各要素のインデックス位置
  • images.val.url: root_imagesimages.val.url

これらは、このピボットされたデータを Amazon S3 に保存するためのパーティションフィールドとして使用できる唯一のフィールドです。name などの root テーブルフィールドは root_images に存在しないため、指定できません。

正規化されたデータを取得するためにリレーショナルデータを結合する

root_imagesid 属性は、データセット内の配列 (1、2、または 3) の順序です。rootimages 属性は、配列インデックスの値を保持します。つまり、rootroot_images を結合するには、imagesid を使用する必要があります。dynamicFrame.show () を実行して、配列の順序と配列インデックスの値を確認できます。

rootroot_images を結合する方法:

Python:

joined_root_root_images = Join.apply(dfc.select('root'), dfc.select('root_images'), 'images', 'id')

Scala:

val joined_root_root_images = personRelationalize(0).join(keys1 = Seq("images"), keys2 = Seq("id"), frame2 = personRelationalize(1))

ピボットされたデータを保存

パーティションを使用してピボットされたデータを Amazon S3 に保存するには:

Python:

datasink4 = glueContext.write_dynamic_frame.from_options(frame = dfc.select('root_images'), connection_type = "s3", connection_options = {"path": outputHistoryDir,"partitionKeys":["id"]}, format = "csv",transformation_ctx = "datasink4")

Scala:

注意: 次の例で、personRelationalize(2)root_images のピボットされたデータテーブルです。

glueContext.getSinkWithFormat(connectionType = "s3",
  options = JsonOptions(Map("path" -> paths, "partitionKeys" -> List("id"))),
  format = "csv", transformationContext = "").writeDynamicFrame(personRelationalize(2))

ピボットされたデータをパーティションなしで Amazon S3 に保存する方法:

Python:

datasink5 = glueContext.write_dynamic_frame.from_options(frame = dfc.select('root_images'), connection_type = "s3", connection_options = {"path": outputHistoryDir}, format = "csv",transformation_ctx = "datasink5"

Scala:

注意: 次の例で、personRelationalize(2)root_images のピボットされたデータテーブルです。

glueContext.getSinkWithFormat(connectionType = "s3",
  options = JsonOptions(Map("path" -> paths)),
  format = "csv", transformationContext = "").writeDynamicFrame(personRelationalize(2))

データを Amazon S3 に書き込んだ後、Amazon Athena でデータをクエリするか、DynamicFrame を使用して Amazon Redshift などのリレーショナルデータベースにデータを書き込みます


関連情報

Simplify querying nested JSON with the AWS Glue relationalize transform

Code example: Joining and relationalizing Data

AWS公式
AWS公式更新しました 3年前