How do I connect to my Amazon MSK cluster using the Kafka-Kinesis-Connector?
When I try to use the Kafka-Kinesis-Connector to connect with Amazon Managed Streaming for Apache Kafka (Amazon MSK), I receive an error message. How do I connect to my Amazon MSK cluster using the Kafka-Kinesis-Connector?
Short description
To connect to your MSK cluster using the Kafka-Kinesis-Connector, your setup must meet the following requirements:
- An active AWS account.
- A virtual private cloud (VPC) that both the client machine and the MSK cluster can reach. The client and the cluster must reside in the same VPC.
- Connectivity to the MSK brokers and the Apache ZooKeeper servers.
- Two subnets associated with your VPC.
- Topics created in the MSK cluster to send and receive messages from the server.
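As a sketch of the last requirement, a topic can be created from the client machine with the Kafka CLI tools. The ZooKeeper connection string and topic name below are placeholders, not values from this article; substitute your cluster's values:

```
# Hypothetical example: create a topic on the MSK cluster from the client machine.
# The ZooKeeper connection string and topic name are placeholders.
./bin/kafka-topics.sh --create \
    --zookeeper z-1.mycluster.abc123.c2.kafka.us-east-1.amazonaws.com:2181 \
    --replication-factor 2 \
    --partitions 1 \
    --topic YOUR_TOPIC_NAME
```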
Resolution
Building your project file
1. Clone the kafka-kinesis-connector project to download the Kafka-Kinesis-Connector.
2. Use the mvn package command to build the amazon-kinesis-kafka-connector-X.X.X.jar file in the target directory:
[ec2-user@ip-10-0-0-71 kinesis-kafka-connector]$ mvn package
......
[INFO] Replacing /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/target/amazon-kinesis-kafka-connector-0.0.9-SNAPSHOT.jar with /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/target/amazon-kinesis-kafka-connector-0.0.9-SNAPSHOT-shaded.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 28.822 s
[INFO] Finished at: 2020-02-19T13:01:31Z
[INFO] Final Memory: 26M/66M
[INFO] ------------------------------------------------------------------------
The Kafka-Kinesis-Connector looks for credentials in the following order: environment variables, Java system properties, and then the credentials profile file.
3. Update your configuration to the DefaultAWSCredentialsProviderChain setting:
[ec2-user@ip-10-0-0-71 target]$ aws configure
This command sets up the credentials that the connector uses. Make sure that the access key attached to the AWS Identity and Access Management (IAM) user has the minimum required permissions, and that a policy is in place that allows access to Amazon Kinesis Data Streams or Amazon Kinesis Data Firehose. For more information about setting AWS credentials, see Working with AWS credentials.
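For reference, running aws configure writes a credentials profile file at ~/.aws/credentials (the region is stored separately in ~/.aws/config). A minimal example, with placeholder key values:

```
[default]
aws_access_key_id = AKIAEXAMPLEKEY
aws_secret_access_key = exampleSecretKey123
```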
Note: If you are using a Java Development Kit (JDK), you can also use the EnvironmentVariableCredentialsProvider class to provide credentials.
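For example, EnvironmentVariableCredentialsProvider reads the standard AWS environment variables, which can be exported on the client machine before starting the connector. The key values below are placeholders:

```shell
# Placeholder credentials - replace with your IAM user's access keys.
export AWS_ACCESS_KEY_ID="AKIAEXAMPLEKEY"
export AWS_SECRET_ACCESS_KEY="exampleSecretKey123"
# Optional, only needed for temporary credentials:
# export AWS_SESSION_TOKEN="exampleSessionToken"
```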
4. If you are using Kinesis Data Streams, then update your policy to the following:
{
  "Version": "2012-10-17",
  "Statement": [{
    "Sid": "Stmt123",
    "Effect": "Allow",
    "Action": [
      "kinesis:DescribeStream",
      "kinesis:PutRecord",
      "kinesis:PutRecords",
      "kinesis:GetShardIterator",
      "kinesis:GetRecords",
      "kinesis:ListShards",
      "kinesis:DescribeStreamSummary",
      "kinesis:RegisterStreamConsumer"
    ],
    "Resource": [
      "arn:aws:kinesis:us-west-2:123xxxxxxxxx:stream/StreamName"
    ]
  }]
}
If you are using Kinesis Data Firehose, then update your policy to look like the following example:
{
  "Version": "2012-10-17",
  "Statement": [{
    "Effect": "Allow",
    "Action": [
      "firehose:DeleteDeliveryStream",
      "firehose:PutRecord",
      "firehose:PutRecordBatch",
      "firehose:UpdateDestination"
    ],
    "Resource": [
      "arn:aws:firehose:us-west-2:123xxxxxxxxx:deliverystream/DeliveryStreamName"
    ]
  }]
}
For more information about the Kinesis Data Firehose delivery stream settings, see Configuration and credential file settings.
Configuring the connector
Note: You can configure the Kafka-Kinesis-Connector to publish messages from MSK to the following destinations: Amazon Simple Storage Service (Amazon S3), Amazon Redshift, or Amazon OpenSearch Service.
1. If you are setting up Kinesis Data Streams, you can configure the connector with the following values:
name=YOUR_CONNECTER_NAME
connector.class=com.amazon.kinesis.kafka.AmazonKinesisSinkConnector
tasks.max=1
topics=YOUR_TOPIC_NAME
region=us-east-1
streamName=YOUR_STREAM_NAME
usePartitionAsHashKey=false
flushSync=true
# Use new Kinesis Producer for each Partition
singleKinesisProducerPerPartition=true
# Whether to block new records from putting onto Kinesis Producer if
# threshold for outstanding records have reached
pauseConsumption=true
outstandingRecordsThreshold=500000
# If outstanding records on producers are beyond threshold sleep for following period (in ms)
sleepPeriod=1000
# If outstanding records on producers are not cleared sleep for following cycle before killing the tasks
sleepCycles=10
# Kinesis Producer Configuration - https://github.com/awslabs/amazon-kinesis-producer/blob/main/java/amazon-kinesis-producer-sample/default_config.properties
# All kinesis producer configuration have not been exposed
maxBufferedTime=1500
maxConnections=1
rateLimit=100
ttl=60000
metricsLevel=detailed
metricsGranuality=shard
metricsNameSpace=KafkaKinesisStreamsConnector
aggregation=true
-or-
If you are setting up Kinesis Data Firehose instead, configure the delivery stream properties like this:
name=YOUR_CONNECTER_NAME
connector.class=com.amazon.kinesis.kafka.FirehoseSinkConnector
tasks.max=1
topics=YOUR_TOPIC_NAME
region=us-east-1
batch=true
batchSize=500
batchSizeInBytes=3670016
deliveryStream=YOUR_DELIVERY_STREAM_NAME
2. Configure the worker properties for either standalone or distributed mode:
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
#internal.value.converter=org.apache.kafka.connect.storage.StringConverter
#internal.key.converter=org.apache.kafka.connect.storage.StringConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
internal.key.converter.schemas.enable=true
internal.value.converter.schemas.enable=true
offset.storage.file.filename=offset.log
For more information about Kafka-Kinesis-Connector's standalone or distributed mode, see Kafka Connect on the Apache website.
3. Copy the amazon-kinesis-kafka-connector-0.0.X.jar file to your directory and export the classpath.
Note: You can also add the amazon-kinesis-kafka-connector-0.0.X.jar file to the JAVA_HOME/lib/ext directory.
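One way to export the classpath is shown below. This is a sketch: the path assumes the build directory from the earlier mvn package step and the 0.0.9-SNAPSHOT version shown in its output; adjust both to match your environment.

```shell
# Append the connector jar to the Java classpath.
# Path and version are assumptions from the build step above - adjust to match your build.
export CLASSPATH="$CLASSPATH:/home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/target/amazon-kinesis-kafka-connector-0.0.9-SNAPSHOT.jar"
```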
4. Run the kafka-kinesis-connector by using the following command syntax:
[ec2-user@ip-10-0-0-71 kafka_2.12-2.2.1]$ ./bin/connect-standalone.sh /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/worker.properties /home/ec2-user/kafka-kinesis-connector/kafka-kinesis-connector/kinesis-kafka-streams-connecter.properties