如何在 Amazon EMR 中连结 Parquet 文件?

1 分钟阅读
0

我使用 S3DistCp (s3-dist-cp) 并选择 --groupBy 和 --targetSize 选项以 Apache Parquet 格式串联文件。s3-dist-cp 任务完成并且未发生错误,但生成的 Parquet 文件却已损坏。当尝试在应用程序中读取 Parquet 文件时,我看到与以下内容类似的错误消息: “Expected n values in column chunk at /path/to/concatenated/parquet/file offset m but got x values instead over y pages ending at file offset z”

简短描述

S3DistCp 不支持串联 Parquet 文件。因此应使用 PySpark

解决方法

您不能在 PySpark 中指定目标文件大小,但您可以指定分区的数量。Spark 会将每个分区保存到单独的输出文件中。要估算您需要的分区数量,将数据集的大小除以单个目标文件的大小。

1.    创建安装了 Apache Spark 的 Amazon EMR 集群

2.    指定您需要的执行程序的数量。该数量由集群的容量和数据集的大小决定。如需更多信息,见 Best practices for successfully managing memory for Apache Spark applications on Amazon EMR

$  pyspark --num-executors number_of_executors

3.    将源 Parquet 文件负载至 Spark DataFrame。它可以是 Amazon Simple Storage Service (Amazon S3) 路径或 HDFS 路径。例如:

df=sqlContext.read.parquet("s3://awsdoc-example-bucket/parquet-data/")

HDFS:

df=sqlContext.read.parquet("hdfs:///tmp/parquet-data/")

4.    对 DataFrame 进行重新分区。在以下示例中,n 是分区的数量。

df_output=df.coalesce(n)

5.    将 DataFrame 保存到目标位置。它可以是 Amazon S3 路径或 HDFS 路径。例如:

df_output.write.parquet("URI:s3://awsdoc-example-bucket1/destination/")

HDFS:

df=sqlContext.write.parquet("hdfs:///tmp/destination/")

6.    验证目标目录中现在有多少文件:

hadoop fs -ls "URI:s3://awsdoc-example-bucket1/destination/ | wc -l"

文件总数应该是第 4 步的 n 值加 1。Parquet 输出提交程序会写入另一个额外的文件,名为 _SUCCESS


AWS 官方
AWS 官方已更新 2 年前