Spark Streaming on EMR Serverless with Kinesis as source does not update DynamoDB lease table


Question

Is there a limitation in EMR Serverless that prevents the spark-streaming-kinesis-asl_2.12 library from reading from / writing to the DynamoDB lease table? When I run a Kinesis streaming Spark job locally or on an EMR cluster, the DynamoDB lease table is created and syncs properly. It's only on EMR Serverless that the table gets created but is never written to.

If not, where am I going wrong with my approach to streaming? (detailed below)

What I'm trying to achieve:

I want to run a very simple Spark streaming job that counts the number of records added to a Kinesis stream every second and prints the count to the logs. Nothing fancy.

I am using the Spark Streaming + Kinesis Integration approach.

I am using the spark-streaming-kinesis-asl_2.12 library.

The issue

New records are not detected by my streaming job.

The library (built on top of the KCL) is supposed to create a DynamoDB table to keep track of the checkpoint. The table is created by the library but never written to or updated. I suspect an error in the checkpointing with DynamoDB is preventing the stream from picking up new records from Kinesis.

(Screenshot: DynamoDB lease table not updating)
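
To confirm that nothing ever writes to it, the lease table can be inspected directly. Below is a minimal boto3 sketch; the only assumption is that the KCL names the lease table after the application name (kinesisApp01), which is its default behaviour:

import boto3

# The KCL creates one lease table per application, named after the app name
# passed as kinesisAppName to KinesisUtils.createStream.
dynamodb = boto3.resource('dynamodb', region_name='eu-west-1')
lease_table = dynamodb.Table('kinesisApp01')

# Each item holds one shard's lease; 'checkpoint' should advance as records
# are processed. On EMR Serverless no items ever appear.
for item in lease_table.scan()['Items']:
    print(item.get('leaseKey'), item.get('checkpoint'), item.get('leaseCounter'))

Locally and on an EMR cluster this shows the checkpoint advancing; on EMR Serverless it never does.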

My approach (resulting in the issue):

  • Create a Kinesis stream with one shard.
  • Create a VPC with private subnets
  • Create a gateway VPC endpoint for DynamoDB and an interface VPC endpoint for Kinesis
  • Create a security group that allows all TCP traffic from anywhere (not secure, but simple for the sake of testing)
  • Create an EMR Serverless application
    • Spark & emr-6.10
    • Deployed in VPC private subnet and security group above
  • Submit job
    • --packages org.apache.spark:spark-streaming-kinesis-asl_2.12:3.3.2 main.py (see the boto3 submission sketch after main.py below)
    • the execution role has admin permissions (not secure, but simple for the sake of testing)
  • Publish records to Kinesis from a local script (sketch below)
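
For completeness, the local publisher is just a loop around put_record. A rough sketch (the stream name matches main.py; the payload is arbitrary test data):

import boto3
import json
import time

# Publish one small JSON record per second to the stream main.py reads from.
kinesis = boto3.client('kinesis', region_name='eu-west-1')

sequence = 0
while True:
    kinesis.put_record(
        StreamName='raw-data-ingestion-stream',
        Data=json.dumps({'seq': sequence}).encode('utf-8'),
        PartitionKey=str(sequence),  # one shard, so the key hardly matters
    )
    sequence += 1
    time.sleep(1)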

main.py

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
import logging

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s')

if __name__ == "__main__":

    logger.info("Testing spark streaming")
    sparkContext = SparkContext()
    # 1-second batch interval: each micro-batch covers one second of records.
    sparkStreamingContext = StreamingContext(sparkContext, 1)
    # The KCL names its DynamoDB lease table after this application name.
    applicationName = 'kinesisApp01'
    kinesisStreamName = 'raw-data-ingestion-stream'
    endpointUrl = 'https://kinesis.eu-west-1.amazonaws.com'
    regionName = 'eu-west-1'

    stream = KinesisUtils.createStream(
        ssc=sparkStreamingContext,
        kinesisAppName=applicationName,
        streamName=kinesisStreamName,
        endpointUrl=endpointUrl,
        regionName=regionName,
        initialPositionInStream=InitialPositionInStream.LATEST,
        checkpointInterval=2,  # seconds between KCL checkpoints to DynamoDB
    )

    # Count the records in each 1-second batch and print the count to the logs.
    stream.count().pprint()

    sparkStreamingContext.start()
    # Blocks until the streaming context is stopped or fails.
    sparkStreamingContext.awaitTermination()
    sparkStreamingContext.stop()
    sparkContext.stop()

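For reference, this is roughly how the job is submitted; a boto3 sketch in which the application ID, role ARN, and S3 path are placeholders, not the real values:

import boto3

emr = boto3.client('emr-serverless', region_name='eu-west-1')

# Placeholder IDs / ARNs / paths; substitute your own.
emr.start_job_run(
    applicationId='00abcdefghij0000',
    executionRoleArn='arn:aws:iam::123456789012:role/emr-serverless-job-role',
    jobDriver={
        'sparkSubmit': {
            'entryPoint': 's3://my-bucket/main.py',
            'sparkSubmitParameters': '--packages org.apache.spark:spark-streaming-kinesis-asl_2.12:3.3.2',
        }
    },
)
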
Alternative approaches (that work): I ran the same script from my local machine and on an EMR cluster. In both cases it worked as expected: the DynamoDB checkpoint table was updated correctly and the most recent records were picked up by the streaming job.
