Comment puis-je exécuter des commandes SQL sur une table Amazon Redshift avant ou après avoir écrit des données dans une tâche AWS Glue ?

Lecture de 5 minute(s)
0

J'ai une tâche AWS Glue qui charge des données dans une table Amazon Redshift. Je souhaite exécuter des commandes SQL sur Amazon Redshift avant ou après la fin de la tâche AWS Glue.

Résolution

Transmettez le paramètre suivant à la classe DynamicFrameWriter AWS Glue à des fins d'autorisation :

  • aws_iam_role : Fournit l'autorisation d'accéder aux données d'une autre ressource AWS. Utilisez ce paramètre avec l'ARN entièrement spécifié du rôle AWS Identity and Access Management (IAM) attaché au cluster Amazon Redshift. Par exemple, utilisez arn:aws:iam::123456789012:role/redshift_iam_role. Pour plus d'informations, consultez la section Paramètres d'autorisation.

Transmettez un ou plusieurs des paramètres suivants à la classe AWS Glue DynamicFramewriter :

  • preactions : Liste des commandes SQL délimitées par une virgule qui sont exécutées avant la commande COPY. Si les commandes échouent, Amazon Redshift génère une exception.
    Remarque : Le paramètre preaction ne peut pas contenir de caractères de nouvelle ligne.
  • postactions : Liste de commandes SQL délimitées par un point virgule qui sont exécutées après l’exécution réussie d'une commande COPY. Si les commandes échouent, Amazon Redshift génère une exception.
    Remarque : Le paramètre postaction ne peut pas contenir de caractères de nouvelle ligne.
  • extracopyoptions : Liste d'options supplémentaires à ajouter à la commande COPY Amazon Redshift lors du chargement des données. Par exemple, vous pouvez utiliser TRUNCATECOLUMNS ou MAXERROR.

Exemples de scénarios

Tronquer une table Amazon Redshift avant d'insérer des enregistrements dans AWS Glue

Utilisez le paramètre preactions.

Exemple Python :

datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame= datasource0, catalog_connection = "test_red", connection_options = {"preactions":"truncate table schema.target_table;","dbtable": "schema.target_table", "database": "redshiftdb"}, redshift_tmp_dir = 's3://s3path', transformation_ctx = "datasink4")

Exemple Scala :

val options = JsonOptions(Map(   "dbtable" -> "schema.target_table",
   "database" -> "redshiftdb",
   "preactions" -> "truncate table schema.target_table;"
   ))
glueContext.getJDBCSink(catalogConnection = "test_red", options = options, redshiftTmpDir = 's3://s3path', transformationContext = "datasource0").writeDynamicFrame(datasource0)

Dans les commandes précédentes, remplacez les valeurs suivantes :

  • test_red : Connexion au catalogue à utiliser.
  • schema.target_table : Schéma de la base de données Amazon Redshift et table Amazon Redshift.
  • s3://s3path : Chemin du répertoire temporaire de la table Amazon Redshift.

Utiliser un rôle IAM dans les options de connexion

Les informations d'identification expirent au bout d'une heure. Utilisez un rôle IAM dans les options de connexion afin que vos connexions de longue durée n'échouent pas.

Exemple Python :

glueContext.create_dynamic_frame.from_catalog(database = "redshift-database-name",  table_name = "redshift-table-name", redshift_tmp_dir = args["TempDir"], additional_options = {"aws_iam_role": "arn:aws:iam::account-id:role/role-name"})

Exemple Scala :

val connectionOptions = JsonOptions(Map(      "url" -> "jdbc:redshift://your_redshift_cluster.us-west-2.redshift.amazonaws.com:5439/database",
      "dbtable" -> "schema.table",
      "user" -> "redshift_user",
      "password" -> "redshift_password",
      "tempdir" -> "s3://temp_bucket/temp",
      "aws_iam_role" -> "arn:aws:iam::your_account_id:role/your_role_name" ))

val dyf = glueContext.getSource("redshift", connectionOptions)
          .getDynamicFrame()

Fusionner une table Amazon Redshift dans AWS Glue (mise à jour)

Après avoir chargé les données dans une table intermédiaire, créez une requête de fusion.

Remarque : Pour que votre requête de fusion fonctionne, target_table doit déjà exister dans votre base de données Amazon Redshift.

Exemple Python :

post_query="begin;delete from schema.target_table using schema.stage_table where schema.stage_table.id = schema.target_table.id ; insert into schema.target_table select * from schema.stage_table; drop table schema.stage_table; end;"
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = datasource0, catalog_connection = "test_red", connection_options = {"preactions":"drop table if exists schema.stage_table;create table schema.stage_table as select * from schema.target_table where 1=2;","dbtable": "schema.stage_table", "database": "redshiftdb","postactions":post_query},
redshift_tmp_dir = 's3://s3path', transformation_ctx = "datasink4")

Exemple Scala :

val options = JsonOptions(Map(   "dbtable" -> "schema.stage_table",
   "database" -> "redshiftdb",
   "preactions" -> "drop table if exists schema.stage_table;create table schema.stage_table as select * from schema.target_table where 1=2;",
   "postactions" -> "begin;delete from schema.target_table using schema.stage_table where schema.stage_table.id = schema.target_table.id ; insert into schema.target_table select * from schema.stage_table; drop table schema.stage_table; end;"
   ))
glueContext.getJDBCSink(catalogConnection = "test_red", options = options, redshiftTmpDir = 's3://s3path', transformationContext = "datasink4").writeDynamicFrame(datasink4)

Dans les commandes précédentes, remplacez les valeurs suivantes :

  • schema.target_table : Schéma de la base de données Amazon Redshift et table Amazon Redshift.
  • test_red : Connexion au catalogue à utiliser.
  • schema.stage_table : Schéma de la base de données Amazon Redshift et table intermédiaire Amazon Redshift.
  • s3://s3path : Chemin du répertoire temporaire de la table Amazon Redshift.

Ignorer les lignes qui ne sont pas valides

Utilisez le paramètre extracopyoptions pour spécifier une valeur MAXERROR plus élevée.

Exemple Python :

datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = persons_DyF, catalog_connection = "test", connection_options = {"dbtable": "schema.load_table", "database": "reddb","extracopyoptions":"MAXERROR 2"},redshift_tmp_dir = 's3://s3path', transformation_ctx = "datasink4")

Exemple Scala :

val options = JsonOptions(Map(   "dbtable" -> "schema.load_table",
   "database" -> "reddb",
   "extracopyoptions" -> "MAXERROR 2"
   ))

Dans les exemples précédents, remplacez les valeurs suivantes :

  • schema.target_table : Schéma de la base de données Amazon Redshift et table Amazon Redshift.
  • schema.stage_table : Schéma de la base de données Amazon Redshift et table intermédiaire Amazon Redshift.
  • test : Connexion au catalogue à utiliser.
  • testalblog2 : Table Amazon Redshift dans laquelle charger les données.
  • reddb : Base de données Amazon Redshift.
  • emp1 : Table Amazon Redshift à partir de laquelle supprimer les données, une fois les données chargées dans testalblog2.
  • s3://s3path : Chemin du répertoire temporaire de la table Amazon Redshift.

Informations supplémentaires

Vous pouvez utiliser le connecteur Amazon Redshift Spark (redshift-jdbc42-2.1.0.9) lorsque vous utilisez les tâches ETL AWS Glue 4.0. Ce connecteur possède les propriétés suivantes :

  • Prend en charge les URL JDBC basées sur IAM.
  • Inclut des options d'amélioration des performances telles que autopushdown, autopushdown.s3_result_cache et unload_s3_format.
  • Inclut l'option de chiffrement SSE_KMS qui peut être utilisée pour les données du dossier temporaire. AWS Glue utilise ces données lorsqu'il lit des tables Amazon Redshift.

Informations connexes

COPY

TRUNCATE

Opérations de chargement de données

AWS OFFICIEL
AWS OFFICIELA mis à jour il y a 3 mois