Amazon Simple Storage Service (Amazon S3) 内に保存した Parquet または’ ORC ファイルを処理するために AWS Glue ジョブを実行すると、「Unable to infer schema」というエラーが発生します。
簡単な説明
Parquet または ORC ファイルは、Hive スタイルの key=value というパーティションパス形式に従う必要があります。ファイルがこの形式ではなく、階層パス構造を使用している場合、AWS Glue はスキーマを解釈できず、エラーが発生します。
たとえば、AWS Glue ジョブで s3://s3-bucket/parquet-data/ のファイルを処理する場合、ファイルは次のパーティション形式を使用する必要があります。
s3://s3-bucket/parquet-data/year=2018/month=10/day=10/file1.parquet
ファイルが次のパーティション化されていない形式を使用している場合、AWS Glue ジョブは失敗します。
s3://s3-bucket/parquet-data/year/month/day/file1.parquet
解決策
AWS Glue での「Unable to infer schema」エラーを解決するには、ユースケースに応じて次のいずれかの方法を使用します。
データを再構築する
ファイルを新しい S3 バケットにコピーし、Hive スタイルのパーティションパスを使用します。次に、ジョブを実行します。
パーティション列名をアスタリスクに置き換える
データを再構築できない場合は、Amazon S3 から直接 DynamicFrame を作成してください。パーティション列名の代わりに、アスタリスク (*) を使用します。AWS Glue にはパーティション列のデータは含まれず、DynamicFrame のデータのみが含まれます。
たとえば、s3://s3-bucket/parquet-data/year/month/day/files.parquet というファイルパスを使用してファイルを S3 バケットに保存する場合は、次の DynamicFrame を使用します。
dynamic_frame0 = glueContext.create_dynamic_frame_from_options(
's3',
connection_options={'paths': ['s3://s3-bucket/parquet-data/*/*/*']},
format='parquet',
transformation_ctx='dynamic_frame0'
)
マップクラス変換を使用してパーティション列を追加する
DynamicFrame にパーティション列を含めるには、データを DataFrame に読み込み、Amazon S3 ファイルパスの列を追加します。次に、マップクラス変換を適用します。
コード例
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import input_file_name
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
df = spark.read.parquet("s3://s3-bucket/parquet-data/*/*/*")
modified_df = df.withColumn('partitions_column', input_file_name())
dyf_0 = DynamicFrame.fromDF(modified_df, glueContext, "dyf_0")
def modify_col(x):
if x['partitions_column']:
new_columns = x['partitions_column'].split('/')
x['year'], x['month'], x['day'] = new_columns[4], new_columns[5], new_columns[6]
del x['partitions_column']
return x
modified_dyf = Map.apply(dyf_0, f=modify_col)
datasink2 = glueContext.write_dynamic_frame.from_options(
frame=modified_dyf,
connection_type="s3",
connection_options={
"path": "s3://my-output-bucket/output/",
"partitionKeys": ["year", "month", "day"]
},
format="parquet",
transformation_ctx="datasink2"
)
注: 例の S3 パスを実際の S3 パスに置き換え、ユースケースに合わせてパーティション列をカスタマイズします。
存在しないファイルまたはプレフィックスを解決する
パスにファイルがない場合は、ファイルを削除したりアーカイブ化したりしていないかどうかを確認します。ファイルで別のプレフィックスを使用している場合は、AWS Glue スクリプトの connection_options パラメータを更新し、正しいパスを指すようにします。カタログテーブルが参照している S3 ロケーションが存在しなかったり、古くなっていたりしないかも確認してください。テーブルが存在しないファイルを指している場合、処理するデータがないため、ジョブは失敗します。
ジョブブックマークパラメータを使用するジョブが古いファイルをスキャンしている場合の問題を解決する
ジョブブックマークを使用すると、AWS Glue は以前に処理したファイルを追跡し、タイムスタンプが古いファイルをスキップします。ジョブで新しい適格ファイルが見つからない場合、処理するデータがないため、ジョブは失敗します。
この問題を解決するには、次の手順を実行します。
- ファイルの変更タイムスタンプが想定する範囲内であることを確認します。
- ブックマークを無効にすると、すべてのファイルが再処理されます。
- ファイルの名前を変更または更新し、最終更新タイムスタンプを新しい値にすると、AWS Glue はそれらのファイルを新しいファイルとして検出し、次回の実行に含めるようになります。
関連情報
AWS Glue で ETL 出力用のパーティションを管理する