J'utilise S3DistCp (s3-dist-cp) pour concaténer des fichiers au format Apache Parquet avec les options --groupBy et --targetSize. La tâche s3-dist-cp s'exécute sans erreurs, mais les fichiers Parquet générés ne fonctionnent pas. Lorsque j'essaie de lire les fichiers Parquet dans les applications, je reçois un message d'erreur similaire au suivant :
« Expected n values in column chunk at /path/to/concatenated/parquet/file offset m but got x values instead over y pages ending at file offset z » « N valeurs attendues dans le segment de colonne à /path/to/concatenated/parquet/file décalage m mais obtenu x valeurs au lieu de y pages se terminant au décalage z du fichier »
Brève description
S3DistCp ne prend pas en charge la concaténation pour les fichiers Parquet. Utilisez PySpark à la place.
Solution
Vous ne pouvez pas spécifier la taille du fichier cible dans PySpark, mais vous pouvez spécifier le nombre de partitions. Spark enregistre chaque partition dans un fichier de sortie distinct. Pour estimer le nombre de partitions dont vous avez besoin, divisez la taille de l'ensemble de données par la taille du fichier individuel cible.
1. Créez un cluster Amazon EMR avec Apache Spark installé.
2. Spécifiez le nombre de programmes d'exécution dont vous avez besoin. Cela dépend de la capacité du cluster et de la taille de l'ensemble de données. Pour plus d'informations, consultez Bonnes pratiques de gestion de la mémoire pour les applications Apache Spark sur Amazon EMR.
$ pyspark --num-executors number_of_executors
3. Chargez les fichiers Parquet source dans un cadre de données Spark. Il peut s'agir d'un chemin Amazon Simple Storage Service (Amazon S3) ou d'un chemin HDFS. Par exemple :
df=sqlContext.read.parquet("s3://awsdoc-example-bucket/parquet-data/")
HDFS :
df=sqlContext.read.parquet("hdfs:///tmp/parquet-data/")
4. Repartitionnez le DataFrame. Dans l'exemple suivant, n correspond au nombre de partitions.
df_output=df.coalesce(n)
5. Enregistrez le DataFrame dans la destination. Il peut s'agir d'un chemin Amazon S3 ou d'un chemin HDFS. Par exemple :
df_output.write.parquet("URI:s3://awsdoc-example-bucket1/destination/")
HDFS :
df=sqlContext.write.parquet("hdfs:///tmp/destination/")
6. Vérifiez combien de fichiers se trouvent à présent dans le répertoire de destination :
hadoop fs -ls "URI:s3://awsdoc-example-bucket1/destination/ | wc -l"
Le nombre total de fichiers doit être la valeur n spécifiée à l'étape 4, plus un. Le validateur de sortie Parquet écrit le fichier supplémentaire, appelé _SUCCESS.