EMRFS write errors


After upgrading from EMR 6.11 to 6.12 (I even tried 7.0.0), I'm seeing these errors on the exact same job with the same resources. Has something changed in how EMRFS is implemented? What is the root cause of this error?

org.apache.spark.SparkException: Job aborted due to stage failure: Authorized committer (attemptNumber=0, stage=185, partition=635) failed; but task commit success, data duplication may happen. reason=ExceptionFailure(org.apache.spark.SparkException,[TASK_WRITE_FAILED] Task failed while writing rows to s3://<<redacted_s3_path>>,[Ljava.lang.StackTraceElement;@7eb2163c,org.apache.spark.SparkException: [TASK_WRITE_FAILED] Task failed while writing rows to s3://<<redacted_s3_path>>.
        at org.apache.spark.sql.errors.QueryExecutionErrors$.taskFailedWhileWritingRowsError(QueryExecutionErrors.scala:789)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:421)
        at org.apache.spark.sql.execution.datasources.WriteFilesExec.$anonfun$doExecuteWrite$1(WriteFiles.scala:100)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:888)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:888)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
        at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
        at org.apache.spark.scheduler.Task.run(Task.scala:141)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1541)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
Caused by: com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: One or more of the specified parts could not be found. The part may not have been uploaded, or the specified entity tag may not match the part's entity tag. (Service: Amazon S3; Status Code: 400; Error Code: InvalidPart; Request ID: SZ2QF0DHZKKF6D9V; S3 Extended Request ID: M2ixKpwc5DmqtJ7FftTwhSZCrN7bcSAsy0m09c19HzF8pmQ/Z35NSUkcfX0HhgFdmfohsFJQ1wg=; Proxy: null), S3 Extended Request ID: M2ixKpwc5DmqtJ7FftTwhSZCrN7bcSAsy0m09c19HzF8pmQ/Z35NSUkcfX0HhgFdmfohsFJQ1wg=
        at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1879)
        at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1418)
        at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1387)
        at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1157)
        at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:814)
        at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:781)
        at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:755)
        at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:715)
        at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:697)
        at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:561)
        at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:541)
        at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5516)
        at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5463)
        at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.completeMultipartUpload(AmazonS3Client.java:3667)
        at com.amazon.ws.emr.hadoop.fs.s3.lite.call.CompleteMultipartUploadCall.perform(CompleteMultipartUploadCall.java:26)
        at com.amazon.ws.emr.hadoop.fs.s3.lite.call.CompleteMultipartUploadCall.perform(CompleteMultipartUploadCall.java:12)
        at com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor$CallPerformer.call(GlobalS3Executor.java:114)
        at com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.execute(GlobalS3Executor.java:141)
        at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:196)
        at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:191)
        at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.completeMultipartUpload(AmazonS3LiteClient.java:170)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)
        at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
        at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
        at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)
        at com.sun.proxy.$Proxy47.completeMultipartUpload(Unknown Source)
        at com.amazon.ws.emr.hadoop.fs.s3.upload.dispatch.DefaultMultipartUploadDispatcher.completeUpload(DefaultMultipartUploadDispatcher.java:57)
        at com.amazon.ws.emr.hadoop.fs.s3.upload.dispatch.DefaultMultipartUploadDispatcher.complete(DefaultMultipartUploadDispatcher.java:40)
        at com.amazon.ws.emr.hadoop.fs.staging.metadata.inmemory.InMemoryStagingDirectory.completeUpload(InMemoryStagingDirectory.java:227)
        at com.amazon.ws.emr.hadoop.fs.staging.metadata.inmemory.InMemoryStagingDirectory.publish(InMemoryStagingDirectory.java:109)
        at com.amazon.ws.emr.hadoop.fs.staging.metadata.inmemory.SynchronizedStagingDirectory.publish(SynchronizedStagingDirectory.java:52)
        at com.amazon.ws.emr.hadoop.fs.staging.metadata.inmemory.InMemoryStagingMetadataStore.publishStagingDirectory(InMemoryStagingMetadataStore.java:96)
        at com.amazon.ws.emr.hadoop.fs.staging.DefaultStagingMechanism.publishStagingDirectory(DefaultStagingMechanism.java:76)
        at org.apache.hadoop.mapreduce.lib.output.FileSystemOptimizedCommitter.publishStagingDirectory(FileSystemOptimizedCommitter.java:149)
        at org.apache.hadoop.mapreduce.lib.output.FileSystemOptimizedCommitter.commitTask(FileSystemOptimizedCommitter.java:138)
        at com.amazon.emr.committer.FilterParquetOutputCommitter.commitTask(FilterParquetOutputCommitter.java:119)
        at com.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter.commitTask(EmrOptimizedSparkSqlParquetOutputCommitter.java:9)
        at org.apache.spark.mapred.SparkHadoopMapRedUtil$.$anonfun$commitTask$1(SparkHadoopMapRedUtil.scala:51)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:641)
        at org.apache.spark.mapred.SparkHadoopMapRedUtil$.performCommit$1(SparkHadoopMapRedUtil.scala:51)
        at org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:78)
        at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitTask(HadoopMapReduceCommitProtocol.scala:233)
        at org.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol.commitTask(SQLEmrOptimizedCommitProtocol.scala:129)
        at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.$anonfun$commit$1(FileFormatDataWriter.scala:107)
        at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:641)
        at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:107)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:405)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1575)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:411)
        ... 15 more
Dev
asked 3 months ago · 530 views

1 Answer

Hello,

The above error occurs when CompleteMultipartUpload fails to assemble the previously uploaded parts into the final object. This can happen when the object was not fully written (committed) and a new request is made against the same object, as described in the S3 documentation.

To alleviate this issue, you can turn off speculative uploads by setting fs.s3.multipart.th.fraction.parts.completed to a high value, e.g. 0.99. This disables speculative UploadPart requests in EMRFS and retries only the part that failed during the multipart upload. The logic for uploading large files to S3 changed in recent EMR versions, which is likely why you did not observe this error on the previous version.
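For example, if you launch the cluster with a configuration classification, this setting can be applied through emrfs-site (a sketch; the value 0.99 follows the suggestion above):

```json
[
  {
    "Classification": "emrfs-site",
    "Properties": {
      "fs.s3.multipart.th.fraction.parts.completed": "0.99"
    }
  }
]
```

Alternatively, assuming your job runs on Spark and you rely on Spark's spark.hadoop.* passthrough to the Hadoop configuration, you could set the same property per job with --conf spark.hadoop.fs.s3.multipart.th.fraction.parts.completed=0.99.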

I also recommend using a larger value for fs.s3n.multipart.uploads.split.size and increasing the number of retries. This property specifies the maximum size of a part, in bytes, before EMRFS starts a new part when multipart uploads are enabled.

Finally, fs.s3.maxRetries specifies how many retries EMRFS uses in its exponential backoff strategy; the default value is 15. These settings can be specified in emrfs-site.xml.
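A sketch of how these two properties might look in emrfs-site.xml (the values here are illustrative assumptions: a 128 MB part size and 20 retries, not recommended defaults):

```xml
<configuration>
  <!-- Maximum part size in bytes before EMRFS starts a new part (128 MB here) -->
  <property>
    <name>fs.s3n.multipart.uploads.split.size</name>
    <value>134217728</value>
  </property>
  <!-- Maximum retries in EMRFS's exponential backoff strategy (default: 15) -->
  <property>
    <name>fs.s3.maxRetries</name>
    <value>20</value>
  </property>
</configuration>
```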

AWS
SUPPORT ENGINEER
answered 3 months ago
