How do I connect to my Amazon MSK cluster using the Kafka-Kinesis-Connector?


When I try to use the Kafka-Kinesis-Connector to connect with Amazon Managed Streaming for Apache Kafka (Amazon MSK), I receive an error message. How do I connect to my Amazon MSK cluster using the Kafka-Kinesis-Connector?

Short description

To connect to your MSK cluster using the Kafka-Kinesis-Connector, your setup must meet the following requirements:

  • An active AWS account.
  • A virtual private cloud (VPC) that is visible to both the client machine and the MSK cluster. The client and the MSK cluster must reside in the same VPC.
  • Connectivity to the MSK and Apache ZooKeeper servers.
  • Two subnets associated with your VPC.
  • Topics created in MSK to send and receive messages from the server (see the example command after this list).
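
For example, assuming that the Apache Kafka client tools are installed and that ZookeeperConnectString is a placeholder for your cluster's Apache ZooKeeper connection string, you can create a topic with a command similar to the following:

bin/kafka-topics.sh --create --zookeeper ZookeeperConnectString --replication-factor 3 --partitions 1 --topic YOUR_TOPIC_NAME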

Resolution

Building your project file

1.    Clone the kafka-kinesis-connector project to download the Kafka-Kinesis-Connector.
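
For example, assuming that the project is in the awslabs repository on GitHub, the clone command looks similar to the following:

[ec2-user@ip-10-0-0-71 ~]$ git clone https://github.com/awslabs/kinesis-kafka-connector.git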

2.    Use the mvn package command to build the amazon-kinesis-kafka-connector-X.X.X.jar file in the target directory:

[ec2-user@ip-10-0-0-71 kinesis-kafka-connector]$ mvn package
..
......

[INFO] Replacing /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/target/amazon-kinesis-kafka-connector-0.0.9-SNAPSHOT.jar with /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/target/amazon-kinesis-kafka-connector-0.0.9-SNAPSHOT-shaded.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 28.822 s
[INFO] Finished at: 2020-02-19T13:01:31Z
[INFO] Final Memory: 26M/66M
[INFO] ------------------------------------------------------------------------

The Kafka-Kinesis-Connector looks for credentials in the following order: environment variables, Java system properties, and the credentials profile file.

3.    Update your configuration to use the DefaultAWSCredentialsProviderChain setting:

[ec2-user@ip-10-0-0-71 target]$ aws configure

This command stores your credentials in the credentials profile file. Make sure that the access key attached to the AWS Identity and Access Management (IAM) user has the minimum required permissions, and that a policy is available that grants access to Amazon Kinesis Data Streams or Amazon Kinesis Data Firehose. For more information about setting AWS credentials, see Working with AWS credentials.

Note: If you are using the AWS SDK for Java, you can also use the EnvironmentVariableCredentialsProvider class to provide credentials.
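
For example, the following is a minimal sketch of providing credentials through environment variables (the key values shown are AWS documentation placeholders):

export AWS_ACCESS_KEY_ID=AKIAIOSFODNN7EXAMPLE
export AWS_SECRET_ACCESS_KEY=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY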

4.    If you are using Kinesis Data Streams, then update your policy to the following:

{
     "Version": "2012-10-17",
     "Statement": [{
          "Sid": "Stmt123",
          "Effect": "Allow",
          "Action": [
               "kinesis:DescribeStream",
               "kinesis:PutRecord",
               "kinesis:PutRecords",
               "kinesis:GetShardIterator",
               "kinesis:GetRecords",
               "kinesis:ListShards",
               "kinesis:DescribeStreamSummary",
               "kinesis:RegisterStreamConsumer"
          ],
          "Resource": [
               "arn:aws:kinesis:us-west-2:123xxxxxxxxx:stream/StreamName"
          ]
     }]
}

If you are using Kinesis Data Firehose, then update your policy to look like the following example:

{
     "Version": "2012-10-17",
     "Statement": [{
          "Effect": "Allow",
          "Action": [
               "firehose:DeleteDeliveryStream",
               "firehose:PutRecord",
               "firehose:PutRecordBatch",
               "firehose:UpdateDestination"
          ],
          "Resource": [
               "arn:aws:firehose:us-west-2:123xxxxxxxxx:deliverystream/DeliveryStreamName"
          ]
     }]
}

For more information about the Kinesis Data Firehose delivery stream settings, see Configuration and credential file settings.
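
After you save either policy to a local file, you can attach it to the IAM user as an inline policy. The following is a minimal sketch, assuming a hypothetical user name of kafka-connect-user and a policy file named policy.json:

[ec2-user@ip-10-0-0-71 target]$ aws iam put-user-policy --user-name kafka-connect-user --policy-name KinesisSinkPolicy --policy-document file://policy.json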

Configuring the connector

Note: You can configure the Kafka-Kinesis-Connector to publish messages from Amazon MSK. With Kinesis Data Firehose, messages can then be delivered to destinations such as Amazon Simple Storage Service (Amazon S3), Amazon Redshift, or Amazon OpenSearch Service.

1.    If you are setting up Kinesis Data Streams, you can configure the connector with the following values:

name=YOUR_CONNECTOR_NAME
connector.class=com.amazon.kinesis.kafka.AmazonKinesisSinkConnector
tasks.max=1
topics=YOUR_TOPIC_NAME
region=us-east-1
streamName=YOUR_STREAM_NAME
usePartitionAsHashKey=false
flushSync=true
# Use new Kinesis Producer for each Partition
singleKinesisProducerPerPartition=true
# Whether to block new records from being put onto the Kinesis producer if the
# threshold for outstanding records has been reached
pauseConsumption=true
outstandingRecordsThreshold=500000
# If outstanding records on the producer exceed the threshold, sleep for the following period (in ms)
sleepPeriod=1000
# If outstanding records are not cleared, sleep for the following number of cycles before killing the tasks
sleepCycles=10
# Kinesis Producer Configuration - https://github.com/awslabs/amazon-kinesis-producer/blob/main/java/amazon-kinesis-producer-sample/default_config.properties
# Not all Kinesis Producer configuration settings are exposed
maxBufferedTime=1500
maxConnections=1
rateLimit=100
ttl=60000
metricsLevel=detailed
metricsGranuality=shard
metricsNameSpace=KafkaKinesisStreamsConnector
aggregation=true

-or-

If you are setting up Kinesis Data Firehose instead, configure the delivery stream properties as follows:

name=YOUR_CONNECTOR_NAME
connector.class=com.amazon.kinesis.kafka.FirehoseSinkConnector
tasks.max=1
topics=YOUR_TOPIC_NAME
region=us-east-1
batch=true
batchSize=500
batchSizeInBytes=3670016
deliveryStream=YOUR_DELIVERY_STREAM_NAME
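
Before you start the connector, you can verify that the target stream or delivery stream exists in the configured Region. The following checks are a sketch that assumes the same placeholder names as the preceding configurations:

aws kinesis describe-stream-summary --stream-name YOUR_STREAM_NAME --region us-east-1
aws firehose describe-delivery-stream --delivery-stream-name YOUR_DELIVERY_STREAM_NAME --region us-east-1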

2.    Configure the worker properties for either standalone or distributed mode:

bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
#internal.value.converter=org.apache.kafka.connect.storage.StringConverter
#internal.key.converter=org.apache.kafka.connect.storage.StringConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
internal.key.converter.schemas.enable=true
internal.value.converter.schemas.enable=true
offset.storage.file.filename=offset.log

For more information about Kafka-Kinesis-Connector's standalone or distributed mode, see Kafka Connect on the Apache website.
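
Note that bootstrap.servers=localhost:9092 points to a local Apache Kafka installation. To run the connector against Amazon MSK, replace this value with your cluster's bootstrap broker string, which you can retrieve with the AWS CLI (ClusterArn is a placeholder for your cluster's Amazon Resource Name):

aws kafka get-bootstrap-brokers --cluster-arn ClusterArn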

3.    Copy the amazon-kinesis-kafka-connector-0.0.X.jar file to your directory and export the classpath, as shown in the example after the following note.

Note: You can also add the amazon-kinesis-kafka-connector-0.0.X.jar file to the JAVA_HOME/lib/ext directory.
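
For example, assuming that the JAR file was copied to /home/ec2-user/kafka-kinesis-connector/, you can export the classpath like this:

export CLASSPATH=$CLASSPATH:/home/ec2-user/kafka-kinesis-connector/amazon-kinesis-kafka-connector-0.0.X.jar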

4.    Run the kafka-kinesis-connector by using the following command syntax:

[ec2-user@ip-10-0-0-71 kafka_2.12-2.2.1]$ ./bin/connect-standalone.sh /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/worker.properties /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/kinesis-kafka-streams-connecter.properties
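
To run the connector in distributed mode instead, a minimal sketch is to start the distributed worker and then register the connector through the Kafka Connect REST API (port 8083 is the Kafka Connect default, and the connector name and configuration values are placeholders):

./bin/connect-distributed.sh /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/worker.properties
curl -X POST -H "Content-Type: application/json" --data '{"name": "YOUR_CONNECTOR_NAME", "config": {"connector.class": "com.amazon.kinesis.kafka.AmazonKinesisSinkConnector", "tasks.max": "1", "topics": "YOUR_TOPIC_NAME", "region": "us-east-1", "streamName": "YOUR_STREAM_NAME"}}' http://localhost:8083/connectors

Note that distributed mode requires additional worker properties, such as group.id and the offset, config, and status storage topics, beyond the standalone example shown earlier.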

Related information

Creating an Amazon MSK cluster
