S3DistCp (s3-dist-cp) を使用して、Apache Parquet 形式のファイルを --groupBy および --targetSize オプションで連結しています。s3-dist-cp ジョブはエラーなしで完了しますが、生成された Parquet ファイルは破損しています。アプリケーションで Parquet ファイルを読み取ろうとすると、次のようなエラーメッセージが表示されます。
「/path/to/concatenated/parquet/file offset m の列チャンクに n 個の値が想定されますが、file offset z で終わる y ページではなく x 個の値が取得されました」
簡単な説明
S3DistCp は Parquet ファイルの連結をサポートしていません。代わりに PySpark を使用します。
解決方法
PySpark でターゲットファイルサイズを指定することはできませんが、パーティションの数を指定できます。Spark は、各パーティションを別々の出力ファイルに保存します。必要なパーティションの数を見積もるには、データセットのサイズをターゲットの個々のファイルサイズで除します。
1. Apache Spark がインストールされた Amazon EMR クラスターを作成します。
2. 必要なエグゼキュターの数を指定します。これは、クラスターの容量とデータセットのサイズによって異なります。詳細については、Amazon EMR で Apache Spark アプリケーションのメモリを正常に管理するためのベストプラクティスを参照してください。
$ pyspark --num-executors number_of_executors
3. Spark DataFrame にソース Parquet ファイルをロードします。これは、Amazon Simple Storage Service (Amazon S3) パスまたは HDFS パスです。例:
df=sqlContext.read.parquet("s3://awsdoc-example-bucket/parquet-data/")
HDFS:
df=sqlContext.read.parquet("hdfs:///tmp/parquet-data/")
4. DataFrame を再パーティションします。次の例では、n はパーティションの数です。
df_output=df.coalesce(n)
5. DataFrame を送信先に保存します。これは、Amazon S3 パスまたは HDFS パスです。例:
df_output.write.parquet("URI:s3://awsdoc-example-bucket1/destination/")
HDFS:
df=sqlContext.write.parquet("hdfs:///tmp/destination/")
6. 送信先ディレクトリにあるファイルの数を確認します。
hadoop fs -ls "URI:s3://awsdoc-example-bucket1/destination/ | wc -l"
ファイルの合計数は、ステップ 4 の n の値に 1 を加えた値となっているはずです。Parquet 出力コミッターは、_SUCCESS という追加のファイルを書き込みます。