¿Cómo puedo utilizar datos dinámicos después de una transformación Relationalize de AWS Glue?
Quiero usar la transformación Relationalize de AWS Glue para aplanar mis datos. ¿Qué campos puedo usar como particiones para almacenar los datos dinámicos en Amazon Simple Storage Service (Amazon S3)?
Descripción corta
La transformación Relationalize permite utilizar estructuras de datos que no sean de SQL, como matrices y estructuras, en bases de datos relacionales. La transformación Relationalize devuelve una recopilación de DynamicFrames (una DynamicFrameCollection en Python y una matriz en Scala). Se puede acceder a todos los DynamicFrames devueltos por una transformación Relationalize a través de sus nombres individuales en Python y a través de índices de matrices en Scala.
Resolución
Relacionalización de los datos
En este tutorial se utiliza el siguiente esquema:
|-- family_name: string |-- name: string |-- gender: string |-- image: string |-- images: array | |-- element: struct | | |-- url: string
Utilice la siguiente sintaxis Relationalize para Python:
# AWS Glue Data Catalog: database and table names db_name = "us-legislators" tempDir = "s3://awsexamplebucket/temp_dir/" # Create dynamic frames from the source tables persons = glueContext.create_dynamic_frame.from_catalog(database=db_name, table_name=tbl_persons) # Relationalize transformation dfc = persons.relationalize("root", tempDir) dfc.select('root_images').printSchema() dfc.select('root_images').show()
Utilice la siguiente sintaxis Relationalize para Scala:
// AWS Glue Data Catalog: database and table names val dbName = "us-legislators" val tblPersons = "persons_json" // Output Amazon S3 temp directory val tempDir = "s3://awsexamplebucket/temp_dir" val persons: DynamicFrame = glueContext.getCatalogSource(database = dbName, tableName = tblPersons).getDynamicFrame() val personRelationalize = persons.relationalize(rootTableName = "root", stagingPath = tempDir) personRelationalize(2).printSchema() personRelationalize(2).show()
Interpretación de los datos dinámicos
Esta transformación Relationalize da lugar a dos esquemas: root y root_images.
root:
|-- family_name: string |-- name: string |-- gender: string |-- image: string |-- images: long
root_images:
|-- id: long |-- index: int |-- images.val.url: string
- id: orden del elemento de la matriz (1, 2 o 3)
- index: posición del índice para cada elemento de una matriz
- images.val.url: valor de images.val.url en root_images
Estos son los únicos campos que se pueden usar como campos de partición para almacenar estos datos dinámicos en Amazon S3. Especificar campos de tabla root, como name, no funciona porque esos campos no existen en root_images.
Unión de los datos resultado de Relationalize para obtener los datos normalizados
El atributo id en root_images indica el orden de las matrices (1, 2 o 3) en el conjunto de datos. El atributo images en root contiene el valor del índice de la matriz. Esto significa que debe utilizar images e id para unir root y root_images. Puede ejecutar dynamicFrame.show() para comprobar el orden de las matrices y el valor del índice de la matriz.
Para unir root y root_images:
Python:
joined_root_root_images = Join.apply(dfc.select('root'), dfc.select('root_images'), 'images', 'id')
Scala:
val joined_root_root_images = personRelationalize(0).join(keys1 = Seq("images"), keys2 = Seq("id"), frame2 = personRelationalize(1))
Almacenamiento de los datos dinámicos
Para almacenar los datos dinámicos en Amazon S3 con particiones:
Python:
datasink4 = glueContext.write_dynamic_frame.from_options(frame = dfc.select('root_images'), connection_type = "s3", connection_options = {"path": outputHistoryDir,"partitionKeys":["id"]}, format = "csv",transformation_ctx = "datasink4")
Scala:
Nota: En el siguiente ejemplo, personRelationalize(2) es la tabla de datos dinámicos root_images.
glueContext.getSinkWithFormat(connectionType = "s3", options = JsonOptions(Map("path" -> paths, "partitionKeys" -> List("id"))), format = "csv", transformationContext = "").writeDynamicFrame(personRelationalize(2))
Para almacenar los datos dinámicos en Amazon S3 sin particiones:
Python:
datasink5 = glueContext.write_dynamic_frame.from_options(frame = dfc.select('root_images'), connection_type = "s3", connection_options = {"path": outputHistoryDir}, format = "csv",transformation_ctx = "datasink5"
Scala:
Nota: En el siguiente ejemplo, personRelationalize(2) es la tabla de datos dinámicos root_images.
glueContext.getSinkWithFormat(connectionType = "s3", options = JsonOptions(Map("path" -> paths)), format = "csv", transformationContext = "").writeDynamicFrame(personRelationalize(2))
Después de escribir los datos en Amazon S3, consulte los datos en Amazon Athena o utilice DynamicFrame para escribir los datos en una base de datos relacional, como Amazon Redshift.
Información relacionada
Contenido relevante
- OFICIAL DE AWSActualizada hace 3 años
- ¿Cómo puedo resolver el error «No queda espacio en el dispositivo» en un trabajo de ETL de AWS Glue?OFICIAL DE AWSActualizada hace 2 años
- OFICIAL DE AWSActualizada hace 2 meses