Como uso dados dinâmicos após uma transformação relacional do AWS Glue?

4 minuto de leitura
0

Quero usar a transformação de relacionalização do AWS Glue para nivelar meus dados. Quais campos posso usar como partições para armazenar os dados dinâmicos no Amazon Simple Storage Service (Amazon S3)?

Descrição breve

A transformação de relacionalização possibilita o uso de estruturas de dados NoSQL, como matrizes e estruturas, em bancos de dados relacionais. A transformação de relacionalização retorna uma coleção de DynamicFrames (uma DynamicFrameCollection em Python e uma matriz em Scala). Todos os DynamicFrames retornados por uma transformação de relacionalização podem ser acessados por meio de seus nomes individuais em Python e por meio de índices de matriz em Scala.

Resolução

Relacionalizar os dados

Este tutorial usa o seguinte esquema:

|-- family_name: string
|-- name: string
|-- gender: string
|-- image: string
|-- images: array
|    |-- element: struct
|    |    |-- url: string

Use a seguinte sintaxe de relacionalização para Python:

# AWS Glue Data Catalog: database and table names
db_name = "us-legislators"
tempDir = "s3://awsexamplebucket/temp_dir/"

# Create dynamic frames from the source tables
persons = glueContext.create_dynamic_frame.from_catalog(database=db_name, table_name=tbl_persons)

# Relationalize transformation
dfc = persons.relationalize("root", tempDir)
dfc.select('root_images').printSchema()
dfc.select('root_images').show()

Use a seguinte sintaxe de relacionalização para Scala:

// AWS Glue Data Catalog: database and table names
val dbName = "us-legislators"
val tblPersons = "persons_json"

// Output Amazon S3 temp directory
val tempDir = "s3://awsexamplebucket/temp_dir"

val persons: DynamicFrame = glueContext.getCatalogSource(database = dbName, tableName = tblPersons).getDynamicFrame()
val personRelationalize = persons.relationalize(rootTableName = "root", stagingPath = tempDir)
personRelationalize(2).printSchema()
personRelationalize(2).show()

Interpretar os dados dinâmicos

Essa transformação de relacionalização produz dois esquemas: root e root_images.

root:

|-- family_name: string
|-- name: string
|-- gender: string
|-- image: string
|-- images: long

root_images:

|-- id: long
|-- index: int
|-- images.val.url: string
  • id: ordem do elemento da matriz (1, 2 ou 3)
  • index: posição do índice para cada elemento em uma matriz
  • images.val.url: valor para images.val.url em root_images

Esses são os únicos campos que podem ser usados como campos de partição para armazenar esses dados dinâmicos no Amazon S3. Especificar campos da tabela root, como name, não funciona porque esses campos não existem em root_images.

Juntar os dados relacionalizados para obter os dados normalizados

O atributo id em root_images é a ordem das matrizes (1, 2 ou 3) no conjunto de dados. O atributo images em root contém o valor do índice de matrizes. Isso significa que você deve usar images e id para unir root e root_images. É possível executar dynamicFrame.show() para verificar a ordem das matrizes e o valor do índice de matrizes.

Para unir root e root_images:

Python:

joined_root_root_images = Join.apply(dfc.select('root'), dfc.select('root_images'), 'images', 'id')

Scala:

val joined_root_root_images = personRelationalize(0).join(keys1 = Seq("images"), keys2 = Seq("id"), frame2 = personRelationalize(1))

Armazenar os dados dinâmicos

Para armazenar os dados dinâmicos no Amazon S3 com partições:

Python:

datasink4 = glueContext.write_dynamic_frame.from_options(frame = dfc.select('root_images'), connection_type = "s3", connection_options = {"path": outputHistoryDir,"partitionKeys":["id"]}, format = "csv",transformation_ctx = "datasink4")

Scala:

**Observação:**no exemplo a seguir, personRelationalize(2) é a tabela de dados dinâmica root_images.

glueContext.getSinkWithFormat(connectionType = "s3",
  options = JsonOptions(Map("path" -> paths, "partitionKeys" -> List("id"))),
  format = "csv", transformationContext = "").writeDynamicFrame(personRelationalize(2))

Para armazenar os dados dinâmicos no Amazon S3 sem partições:

Python:

datasink5 = glueContext.write_dynamic_frame.from_options(frame = dfc.select('root_images'), connection_type = "s3", connection_options = {"path": outputHistoryDir}, format = "csv",transformation_ctx = "datasink5"

Scala:

**Observação:**no exemplo a seguir, personRelationalize(2) é a tabela de dados dinâmica root_images.

glueContext.getSinkWithFormat(connectionType = "s3",
  options = JsonOptions(Map("path" -> paths)),
  format = "csv", transformationContext = "").writeDynamicFrame(personRelationalize(2))

Depois de gravar os dados no Amazon S3, consulte-os no Amazon Athena ou use um DynamicFrame para gravá-los em um banco de dados relacional, como o Amazon Redshift.


Informações relacionadas

Simplificar consultas a JSON aninhado com a transformação de relacionalização do AWS Glue

Exemplo de código: Unir e relacionalizar dados

AWS OFICIAL
AWS OFICIALAtualizada há 3 anos