我使用 S3DistCp (s3-dist-cp) 并选择 --groupBy 和 --targetSize 选项以 Apache Parquet 格式串联文件。s3-dist-cp 任务完成并且未发生错误,但生成的 Parquet 文件却已损坏。当尝试在应用程序中读取 Parquet 文件时,我看到与以下内容类似的错误消息:
“Expected n values in column chunk at /path/to/concatenated/parquet/file offset m but got x values instead over y pages ending at file offset z”
简短描述
S3DistCp 不支持串联 Parquet 文件。因此应使用 PySpark。
解决方法
您不能在 PySpark 中指定目标文件大小,但您可以指定分区的数量。Spark 会将每个分区保存到单独的输出文件中。要估算您需要的分区数量,将数据集的大小除以单个目标文件的大小。
1. 创建安装了 Apache Spark 的 Amazon EMR 集群。
2. 指定您需要的执行程序的数量。该数量由集群的容量和数据集的大小决定。如需更多信息,见 Best practices for successfully managing memory for Apache Spark applications on Amazon EMR。
$ pyspark --num-executors number_of_executors
3. 将源 Parquet 文件负载至 Spark DataFrame。它可以是 Amazon Simple Storage Service (Amazon S3) 路径或 HDFS 路径。例如:
df=sqlContext.read.parquet("s3://awsdoc-example-bucket/parquet-data/")
HDFS:
df=sqlContext.read.parquet("hdfs:///tmp/parquet-data/")
4. 对 DataFrame 进行重新分区。在以下示例中,n 是分区的数量。
df_output=df.coalesce(n)
5. 将 DataFrame 保存到目标位置。它可以是 Amazon S3 路径或 HDFS 路径。例如:
df_output.write.parquet("URI:s3://awsdoc-example-bucket1/destination/")
HDFS:
df=sqlContext.write.parquet("hdfs:///tmp/destination/")
6. 验证目标目录中现在有多少文件:
hadoop fs -ls "URI:s3://awsdoc-example-bucket1/destination/ | wc -l"
文件总数应该是第 4 步的 n 值加 1。Parquet 输出提交程序会写入另一个额外的文件,名为 _SUCCESS。