Usage of StreamingFileSink is throwing NoClassDefFoundError


I know this is probably my own mistake, but I have been grappling with it for a while. I am trying to run a Flink job on an AWS EMR cluster.

My setup is: time-series events from Kinesis -> Flink job -> save to S3.

    DataStream<Event> kinesis =
            env.addSource(new FlinkKinesisConsumer<>(this.streamName, new EventSchema(), kinesisConsumerConfig))
               .name("source");

    final StreamingFileSink<Event> streamingFileSink =
            StreamingFileSink.<Event>forRowFormat(
                    new org.apache.flink.core.fs.Path("s3a://" + this.bucketName + "/" + this.objectPrefix),
                    new SimpleStringEncoder<>("UTF-8"))
                    .withBucketAssignerAndPolicy(new OrgIdBucketAssigner(), DefaultRollingPolicy.create().build())
                    .build();

    DataStream<Event> eventDataStream = kinesis
            .rebalance()
            .keyBy(createKeySelectorByChoosingOrgIdFromTheEvent())
            .process(new KeyedProcessFunction<String, Event, Event>() {
                @Override
                public void processElement(Event value, Context ctx, Collector<Event> out) throws Exception {
                    out.collect(value);
                }
            });

    eventDataStream.addSink(streamingFileSink).name("streamingFileSink");
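
For reference, OrgIdBucketAssigner is essentially the sketch below (simplified here; it assumes Event exposes a getOrgId() getter, which is just an illustrative name). It routes each event into a bucket named after its org id:

    import org.apache.flink.core.io.SimpleVersionedSerializer;
    import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
    import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

    // Simplified sketch of the custom bucket assigner used above.
    // Assumes Event has a getOrgId() accessor (hypothetical name).
    public class OrgIdBucketAssigner implements BucketAssigner<Event, String> {

        @Override
        public String getBucketId(Event element, Context context) {
            // one bucket ("directory" under the S3 prefix) per organization
            return element.getOrgId();
        }

        @Override
        public SimpleVersionedSerializer<String> getSerializer() {
            return SimpleVersionedStringSerializer.INSTANCE;
        }
    }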

From a mailing-list thread, https://www.mail-archive.com/user@flink.apache.org/msg25039.html,

I learned that in order to make StreamingFileSink work with S3, one has to drop flink-s3-fs-hadoop-1.7.1.jar into the /usr/lib/flink/lib folder. The /usr/lib/flink/lib folder on my EMR master node looks like this:

-rw-r--r-- 1 root root     9924 Mar 20 01:06 slf4j-log4j12-1.7.15.jar
-rw-r--r-- 1 root root 42655628 Mar 20 01:06 flink-shaded-hadoop2-uber-1.7.1.jar
-rw-r--r-- 1 root root   483665 Mar 20 01:06 log4j-1.2.17.jar
-rw-r--r-- 1 root root   140172 Mar 20 01:06 flink-python_2.11-1.7.1.jar
-rw-r--r-- 1 root root 92070994 Mar 20 01:08 flink-dist_2.11-1.7.1.jar
-rw-r--r-- 1 root root 23451686 May  5 23:04 flink-s3-fs-hadoop-1.7.1.jar

When I run the Flink job, it throws the exception below on the EMR slave nodes.

2019-05-06 01:43:49,589 INFO  org.apache.flink.runtime.taskmanager.Task                     - KeyedProcess -> Sink: streamingFileSink (3/4) (31000a186f6ab11f0066556116c669ba) switched from RUNNING to FAILED.
java.lang.NoClassDefFoundError: Could not initialize class org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.internal.S3ErrorResponseHandler
    at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.<init>(AmazonS3Client.java:374)
    at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.<init>(AmazonS3Client.java:553)
    at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.<init>(AmazonS3Client.java:531)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.DefaultS3ClientFactory.newAmazonS3Client(DefaultS3ClientFactory.java:80)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.DefaultS3ClientFactory.createS3Client(DefaultS3ClientFactory.java:54)
    at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:256)
    at org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.create(AbstractS3FileSystemFactory.java:125)
    at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:395)
    at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.<init>(Buckets.java:112)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:242)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:327)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
    at java.lang.Thread.run(Thread.java:748)

Can you please let me know what basic thing I am missing?

Edited by: nischit on May 6, 2019 7:09 AM

nischit
asked 5 years ago · 589 views
1 Answer

Finally I found the cause, although it is very deceptive. Flink internally uses AWS SDK version 1.11.271. The class S3ErrorResponseHandler, which is the one causing the NoClassDefFoundError, has the following static fields:

public class S3ErrorResponseHandler implements
        HttpResponseHandler<AmazonServiceException> {
    /** Shared logger for profiling information */
    private static final Log log = LogFactory
            .getLog(S3ErrorResponseHandler.class);

    /** Shared factory for creating XML event readers */
    private static final XMLInputFactory xmlInputFactory = XMLInputFactory
            .newInstance();

    private static enum S3ErrorTags {
        Error, Message, Code, RequestId, HostId
    };
   ....
   ...

In AWS SDK 1.11.272, the static initialization of XMLInputFactory was removed. That gave me a clue. I rebuilt the Flink 1.7.1 library against SDK 1.11.272, and voilà, it started working.
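
For what it is worth, my reading of the error message (my interpretation, not something the Flink docs spell out): "NoClassDefFoundError: Could not initialize class ..." does not mean the class is missing from the classpath. It means the class was found, but its static initializer had already failed once; the first failure surfaces as an ExceptionInInitializerError (easy to miss in the logs), and every later use of the class gets this terser error. A minimal, self-contained sketch of that behaviour:

    // Minimal demo (not from the Flink/AWS code): a class whose static field
    // initialization throws, mimicking the XMLInputFactory field in S3ErrorResponseHandler.
    public class StaticInitFailureDemo {

        static class Fragile {
            static final Object BROKEN = init();

            private static Object init() {
                throw new IllegalStateException("simulated failure while creating the factory");
            }
        }

        public static void main(String[] args) {
            try {
                new Fragile();   // first use: ExceptionInInitializerError with the real cause
            } catch (Throwable t) {
                System.out.println("first use:  " + t);
            }
            try {
                new Fragile();   // later uses: NoClassDefFoundError "Could not initialize class ..."
            } catch (Throwable t) {
                System.out.println("second use: " + t);
            }
        }
    }
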
This still left me with a few unanswered questions. I did thorough debugging inside the Flink JVM running on EMR. The classpath clearly contains flink-s3-fs-hadoop-1.7.1.jar; I wrote code to read that jar and print all its entries, and S3ErrorResponseHandler is there. In my Flink operator, I could even initialize the factory directly:

XMLInputFactory xmlInputFactory = XMLInputFactory.newInstance();

The classloader clearly had a reference to the JRE libraries, yet Flink was unable to initialize this specific class. I wonder why. Is it because of how Flink deals with operators? Under the hood, Flink serializes the operators and ships them to the slave nodes, where they are deserialized, initialized, and run as tasks. Somewhere between those phases, the Flink classloader apparently did not have access to the JRE's default implementation of XMLInputFactory, which is very weird. I also wish the JRE were more specific about which static field it could not initialize when loading the class.
Should I call this a bug in Flink?
Has no one else reported this error when running Flink on AWS EMR?
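
To chase the classloading question, a probe along the lines of the sketch below (hypothetical code, not my exact debugging harness) can be attached to the pipeline, e.g. kinesis.map(new ClassLoaderProbe<>()). Its open() method prints which classloaders are in play and which StAX implementation XMLInputFactory.newInstance() actually resolves to inside the task:

    import javax.xml.stream.XMLInputFactory;

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.configuration.Configuration;

    // Hypothetical pass-through operator that logs classloader and StAX provider
    // details from inside the task before forwarding each element unchanged.
    public class ClassLoaderProbe<T> extends RichMapFunction<T, T> {

        @Override
        public void open(Configuration parameters) throws Exception {
            ClassLoader userCodeCl = getRuntimeContext().getUserCodeClassLoader();
            ClassLoader contextCl = Thread.currentThread().getContextClassLoader();
            System.out.println("user-code classloader: " + userCodeCl);
            System.out.println("context classloader:   " + contextCl);

            // Which StAX provider does the JRE service lookup pick inside the task?
            XMLInputFactory factory = XMLInputFactory.newInstance();
            System.out.println("StAX implementation:   " + factory.getClass().getName());
            System.out.println("loaded by:             " + factory.getClass().getClassLoader());
        }

        @Override
        public T map(T value) {
            return value; // pass-through
        }
    }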

Also note that Flink release 1.7.1 has a bug in StreamingFileSink: if your EMR cluster has two mounts, it tries to do read/write operations on both, causing it to fail. This was resolved in 1.8.0 and above. However, AWS EMR still uses the 1.7.1 release by default, so please make sure you put the 1.8.0 libraries under /usr/lib/flink/lib.

nischit
answered 5 years ago
