Come posso utilizzare i dati pivot dopo una trasformazione relazionale di AWS Glue?

4 minuti di lettura
0

Desidero utilizzare la trasformazione relazionale di AWS Glue per ridurre al massimo i dati. Quali campi posso usare come partizioni per archiviare i dati pivot in Amazon Simple Storage Service (Amazon S3)?

Breve descrizione

La trasformazione relazionale permette di utilizzare le strutture dati NoSQL, come array e struct, nei database relazionali. La trasformazione relazionale restituisce una raccolta di DynamicFrames (una DynamicFrameCollection in Python e un array in Scala). Tutti i DynamicFrames restituiti da una trasformazione relazionale sono accessibili tramite i nomi individuali in Python e gli indici di array in Scala.

Risoluzione

Trasformazione relazionale di dati

Questo tutorial utilizza il seguente schema:

|-- family_name: string
|-- name: string
|-- gender: string
|-- image: string
|-- images: array
|    |-- element: struct
|    |    |-- url: string

Usa la seguente sintassi di trasformazione relazionale per Python:

# AWS Glue Data Catalog: database and table names
db_name = "us-legislators"
tempDir = "s3://awsexamplebucket/temp_dir/"

# Create dynamic frames from the source tables
persons = glueContext.create_dynamic_frame.from_catalog(database=db_name, table_name=tbl_persons)

# Relationalize transformation
dfc = persons.relationalize("root", tempDir)
dfc.select('root_images').printSchema()
dfc.select('root_images').show()

Usa la seguente sintassi di trasformazione relazionale per Scala:

// AWS Glue Data Catalog: database and table names
val dbName = "us-legislators"
val tblPersons = "persons_json"

// Output Amazon S3 temp directory
val tempDir = "s3://awsexamplebucket/temp_dir"

val persons: DynamicFrame = glueContext.getCatalogSource(database = dbName, tableName = tblPersons).getDynamicFrame()
val personRelationalize = persons.relationalize(rootTableName = "root", stagingPath = tempDir)
personRelationalize(2).printSchema()
personRelationalize(2).show()

Interpretazione dei dati pivot

Questa trasformazione relazionale produce due schemi: root e root_images.

root:

|-- family_name: string
|-- name: string
|-- gender: string
|-- image: string
|-- images: long

root_images:

|-- id: long
|-- index: int
|-- images.val.url: string
  • id: ordine dell'elemento dell'array (1, 2 o 3)
  • index: posizione di indice di ogni elemento nell’array
  • images.val.url: valore di images.val.url in root_images

Questi sono gli unici campi che è possibile utilizzare come campi di partizione per archiviare i dati pivot in Amazon S3. Non è invece possibile specificare i campi della tabella root, come name, perché questi campi non esistono in root_images.

Unione dei dati relazionati per ottenere dati normalizzati

L'attributo id in root_images è l'ordine degli array (1, 2 o 3) nel set di dati. L'attributo images in root contiene il valore dell'indice dell'array. Ciò significa che è necessario utilizzare images e id per unire root e root_images. È possibile eseguire dynamicFrame.show() per verificare l'ordine degli array e il valore dell'indice dell'array.

Per unire root e root_images:

Python:

joined_root_root_images = Join.apply(dfc.select('root'), dfc.select('root_images'), 'images', 'id')

Scala:

val joined_root_root_images = personRelationalize(0).join(keys1 = Seq("images"), keys2 = Seq("id"), frame2 = personRelationalize(1))

Archiviazione dei dati pivot

Per archiviare i dati pivot in Amazon S3 con partizioni:

Python:

datasink4 = glueContext.write_dynamic_frame.from_options(frame = dfc.select('root_images'), connection_type = "s3", connection_options = {"path": outputHistoryDir,"partitionKeys":["id"]}, format = "csv",transformation_ctx = "datasink4")

Scala:

Nota: nell'esempio seguente, personRelationalize(2) è la tabella di dati pivot root_images.

glueContext.getSinkWithFormat(connectionType = "s3",
  options = JsonOptions(Map("path" -> paths, "partitionKeys" -> List("id"))),
  format = "csv", transformationContext = "").writeDynamicFrame(personRelationalize(2))

Per archiviare i dati pivot in Amazon S3 senza partizioni:

Python:

datasink5 = glueContext.write_dynamic_frame.from_options(frame = dfc.select('root_images'), connection_type = "s3", connection_options = {"path": outputHistoryDir}, format = "csv",transformation_ctx = "datasink5"

Scala:

Nota: nell'esempio seguente, personRelationalize(2) è la tabella di dati pivot root_images.

glueContext.getSinkWithFormat(connectionType = "s3",
  options = JsonOptions(Map("path" -> paths)),
  format = "csv", transformationContext = "").writeDynamicFrame(personRelationalize(2))

Dopo avere scritto i dati in Amazon S3, puoi interrogare i dati in Amazon Athena o utilizzare un DynamicFrame per scrivere i dati in un database relazionale, come Amazon Redshift.


Informazioni correlate

Simplify querying nested JSON with the AWS Glue relationalize transform

Code example: Joining and relationalizing Data

AWS UFFICIALE
AWS UFFICIALEAggiornata 3 anni fa