Replicate topics and messages between 2 MSK clusters using MirrorMaker2


Hi,

I have followed this blog post to set up MSK Connect connectors, plugins, and the other configuration needed to replicate Kafka topics and messages between two MSK clusters in the same account but in two different VPCs. The source cluster, from which data needs to be replicated, is configured with plaintext, whereas the destination cluster is configured with IAM authentication. I have created the IAM role and policy as recommended in the referenced post. I am seeing this error in the MirrorSourceConnector and MirrorCheckpointConnector logs:

org.apache.kafka.connect.errors.ConnectException: Could not look up partition metadata for offset backing store topic in allotted period. This could indicate a connectivity issue, unavailable topic partitions, or if this is your first use of the topic it may have taken too long to create

MirrorSourceConnector configuration properties:

connector.class=org.apache.kafka.connect.mirror.MirrorSourceConnector
tasks.max=4
clusters=source,target
source.cluster.alias=source
target.cluster.alias=target
# private Plaintext bootstrap servers endpoints for source MSK cluster
source.cluster.bootstrap.servers=b-1.source.gdgdfg.gdfg.com:9092,b-2.gdgdfg.gdfg.com:9092,b-3.gdgdfg.gdfg.com:9092
# private IAM bootstrap servers endpoints for target MSK cluster
target.cluster.bootstrap.servers=b-3.dest.dfdfgdfg.sfdgfdg.amazonaws.com:9098,b-2.dest.dfdfgdfg.sfdgfdg.amazonaws.com:9098,b-1.dest.dfdfgdfg.sfdgfdg.amazonaws.com:9098
target.cluster.security.protocol=SASL_SSL
target.cluster.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required awsRoleArn="arn:aws:iam::1234567678:role/iam_role_4_msk_connect" awsDebugCreds=true;
target.cluster.producer.sasl.mechanism=AWS_MSK_IAM
target.cluster.producer.security.protocol=SASL_SSL
target.cluster.producer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required awsRoleArn="arn:aws:iam::1234567678:role/iam_role_4_msk_connect" awsDebugCreds=true;
target.cluster.producer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
target.cluster.consumer.security.protocol=SASL_SSL
target.cluster.consumer.sasl.mechanism=AWS_MSK_IAM
target.cluster.consumer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
target.cluster.consumer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required awsRoleArn="arn:aws:iam::1234567678:role/iam_role_4_msk_connect" awsDebugCreds=true;
target.cluster.sasl.mechanism=AWS_MSK_IAM
target.cluster.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
refresh.groups.enabled=true
refresh.groups.interval.seconds=60
refresh.topics.interval.seconds=60
topics.exclude=.*[-.]internal,.*.replica,__.*,.*-config,.*-status,.*-offset
emit.checkpoints.enabled=true
topics=my-topic-1,my-topic-2,my-topic-3,my-topic-4
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
producer.max.block.ms=10000
producer.linger.ms=500
producer.retry.backoff.ms=1000
sync.topic.configs.enabled=true
sync.topic.configs.interval.seconds=60
refresh.topics.enabled=true
groups.exclude=console-consumer-.*,connect-.*,__.*
consumer.auto.offset.reset=earliest
replication.factor=3

I am sure I am missing something here; any help is really appreciated. Also, is it not possible to edit the connector configuration after it's created?

Thanks

asked a year ago
3 Answers

Check connectivity between the VPCs and the security group configuration:

  1. Set up an EC2 instance in the target VPC, assign it the security group of the target cluster, and try to telnet to port 9092 on any of the source brokers.
  2. If you get a connection timeout:
     a. Ensure the target cluster's security group has outbound rules allowing connections to port 9092 on the source cluster's security group.
     b. Ensure the source cluster's security group has an inbound rule allowing port 9092 from the target cluster's security group.
  3. If you can connect from the EC2 instance but the connector still gets connection timeouts, check whether the security group used for the target MSK ENIs is the same as the one used for the connector ENI in the target VPC.
  4. If they are the same, check the VPC peering, Transit Gateway (TGW), or other network-related configuration between the VPCs.
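Step 1 relies on telnet; if telnet isn't installed on the test instance, the same TCP check can be sketched with nothing but the JDK. This is a minimal sketch, not an MSK or Kafka tool: the class name BrokerReachability and the 5-second timeout are illustrative assumptions, and a successful connect only shows that security groups and routing allow the TCP handshake, not that authentication will succeed.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

public class BrokerReachability {

    /** Splits a bootstrap.servers value such as "b-1.host:9092,b-2.host:9092" into host/port pairs. */
    static List<InetSocketAddress> parseBootstrap(String bootstrapServers) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String entry : bootstrapServers.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but got: " + trimmed);
            }
            String host = trimmed.substring(0, colon);
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            // createUnresolved defers the DNS lookup until we actually connect.
            result.add(InetSocketAddress.createUnresolved(host, port));
        }
        return result;
    }

    /** Returns true if a TCP connection succeeds within timeoutMs, similar to a telnet probe. */
    static boolean canConnect(InetSocketAddress broker, int timeoutMs) {
        try (Socket socket = new Socket()) {
            // Resolving here means an unknown hostname is reported as unreachable.
            socket.connect(new InetSocketAddress(broker.getHostString(), broker.getPort()), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String bootstrap = args.length > 0 ? args[0] : "localhost:9092";
        for (InetSocketAddress broker : parseBootstrap(bootstrap)) {
            boolean ok = canConnect(broker, 5000);
            System.out.println(broker.getHostString() + ":" + broker.getPort()
                    + (ok ? " reachable" : " NOT reachable (timeout or refused)"));
        }
    }
}
```

Run it from an instance in the target VPC with the source cluster's plaintext bootstrap string as the only argument (for example `java BrokerReachability.java "<source-bootstrap-servers>"`, which needs Java 11 or later for single-file launch). A "NOT reachable" line for every broker points at steps 2 to 4 rather than at the connector configuration.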

Please let us know the results.

AWS
EdbE
answered a year ago
  • Thanks for the update. I believe you're the same Ed who wrote the blog post; thank you very much for that, as I couldn't find much information about configuring MSK Connect to work with MirrorMaker2.

    Coming to the issue: I noticed that I had an incorrect IAM policy attached to the role. After fixing the policy as shown below, I am now seeing the following error.

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kafka-cluster:*", "kafka:*" ], "Resource": [ "*" ] } ] }

    Error message when connecting to target cluster endpoint: [MirrorSourceConnectorWithFullAccess|worker] [AdminClient clientId=adminclient-10] Failed authentication with b-2.fgfhfghfhh.tryrtyrty.c3.kafka.ca-central-1.amazonaws.com/INTERNAL_IP (An error: (java.security.PrivilegedActionException: javax.security.sasl.SaslException: Failed to find AWS IAM Credentials [Caused by aws_msk_iam_auth_shadow.com.amazonaws.SdkClientException: Unable to execute HTTP request: Connect to sts.amazonaws.com:443 [sts.amazonaws.com/52.119.198.216] failed: connect timed out]) occurred when evaluating SASL token received from the Kafka Broker. Kafka Client will go to AUTHENTICATION_FAILED state.)

    Note that I have listed the target cluster's private IAM endpoints in target.cluster.bootstrap.servers. Should this be the public endpoint list instead, since the target cluster is configured with public access?

  • Forgot to mention that we do have VPC peering in place between the VPCs of the source and target clusters, so that's not the issue. I read about setting up a private STS interface endpoint in the target cluster's VPC; I did that, but I am still seeing the same error because the client is calling the global STS endpoint (sts.amazonaws.com). I am not sure how or where to configure the regional STS endpoint, as I don't recall specifying the endpoint explicitly anywhere.

  • It still doesn't seem to work: I am seeing this error even after fixing the IAM role's trust relationship policy, which had been a problem earlier.

    [Worker-031d6f985ad5997a5] org.apache.kafka.common.errors.SaslAuthenticationException: An error: (java.security.PrivilegedActionException: javax.security.sasl.SaslException: Failed to find AWS IAM Credentials [Caused by aws_msk_iam_auth_shadow.com.amazonaws.SdkClientException: Unable to execute HTTP request: Connect to sts.amazonaws.com:443 [sts.amazonaws.com/209.54.180.124] failed: connect timed out]) occurred when evaluating SASL token received from the Kafka Broker. Kafka Client will go to AUTHENTICATION_FAILED state.
    [Worker-031d6f985ad5997a5] Caused by: javax.security.sasl.SaslException: Failed to find AWS IAM Credentials [Caused by aws_msk_iam_auth_shadow.com.amazonaws.SdkClientException: Unable to execute HTTP request: Connect to sts.amazonaws.com:443 [sts.amazonaws.com] failed: connect timed out]
    [Worker-031d6f985ad5997a5] at software.amazon.msk.auth.iam.internals.IAMSaslClient.generateClientMessage(IAMSaslClient.java:149)
    [Worker-031d6f985ad5997a5] at software.amazon.msk.auth.iam.internals.IAMSaslClient.evaluateChallenge(IAMSaslClient.java:96)
    [Worker-031d6f985ad5997a5] at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.lambda$createSaslToken$1(SaslClientAuthenticator.java:524)
    [Worker-031d6f985ad5997a5] at java.base/java.security.AccessController.doPrivileged(Native Method)
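The timeouts in these comments show the IAM login module calling the global endpoint sts.amazonaws.com. The EC2 configuration posted later in this thread passes awsStsRegion in the JAAS line, which is intended to make the aws-msk-iam-auth library call a regional STS endpoint instead. Below is a minimal sketch that generates the target-side IAM settings with that option for the same target.cluster., target.cluster.producer. and target.cluster.consumer. prefixes used in the question. It assumes the aws-msk-iam-auth version packaged in your MSK Connect plugin supports awsStsRegion and that the STS interface endpoint in the connector's VPC has private DNS enabled; the class name, role ARN, and region are placeholders.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class TargetIamConfig {

    /** Builds the JAAS line for the MSK IAM login module, pinning STS to a region. */
    static String jaasConfig(String roleArn, String stsRegion) {
        return "software.amazon.msk.auth.iam.IAMLoginModule required"
                + " awsRoleArn=\"" + roleArn + "\""
                + " awsStsRegion=\"" + stsRegion + "\";";
    }

    /** Emits identical IAM settings for the target admin, producer, and consumer clients. */
    static Map<String, String> targetIamProps(String roleArn, String stsRegion) {
        String jaas = jaasConfig(roleArn, stsRegion);
        Map<String, String> props = new LinkedHashMap<>();
        String[] prefixes = {"target.cluster.", "target.cluster.producer.", "target.cluster.consumer."};
        for (String prefix : prefixes) {
            props.put(prefix + "security.protocol", "SASL_SSL");
            props.put(prefix + "sasl.mechanism", "AWS_MSK_IAM");
            props.put(prefix + "sasl.jaas.config", jaas);
            props.put(prefix + "sasl.client.callback.handler.class",
                    "software.amazon.msk.auth.iam.IAMClientCallbackHandler");
        }
        return props;
    }

    public static void main(String[] args) {
        // Placeholder ARN and region: substitute your own values.
        targetIamProps("arn:aws:iam::111111111:role/msk-full-access-role", "ca-central-1")
                .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The printed lines can be pasted into the connector configuration in place of the existing target.cluster.* IAM entries; awsDebugCreds=true from the question can be appended to the JAAS line if you still want the credential debug output.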


I have made some progress on Kafka topic replication. I noticed that running MirrorMaker2 on an EC2 instance with the configuration below successfully replicated topics from the source to the target MSK cluster, but the same configuration fails with MSK Connect. Note that the source cluster has no authentication configured, whereas the target cluster uses IAM authentication/authorization.

On a side note, when running MirrorMaker2 on the EC2 instance, I could also remove the cluster-name prefix from the replicated topics by adding replication.policy.class=com.amazonaws.kafka.samples.CustomMM2ReplicationPolicy, as described in mirrormaker2-msk-migration.

Another observation: MSK Connect appears to expect the source cluster alias to be 'source'. When I set it to a different string such as 'my-source', the connector failed to start with an error for this field.

Error:

ERROR [MirrorSourceConnectorCustom|worker] Scheduler for MirrorSourceConnector caught exception in scheduled task: loading initial set of topic-partitions (org.apache.kafka.connect.mirror.Scheduler:102)
[Worker-0ba8c4d265a06ee31] java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=listTopics, deadlineMs=1683240049825, tries=1, nextAllowedTryMs=1683240049926) timed out at 1683240049826 after 1 attempt(s)

Configuration:

# Not used in the EC2 MirrorMaker2 test, where Kafka's connect-mirror-maker.sh is pointed at this configuration file
connector.class=org.apache.kafka.connect.mirror.MirrorSourceConnector
replication.factor=3
target.cluster.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required awsRoleArn="arn:aws:iam::111111111:role/msk-full-access-role" awsDebugCreds=true awsStsRegion="ca-central-1";
offset-sync.topic.replication.factor=3
sync.topic.acls.enabled=false
tasks.max=2
source->target.emit.checkpoints.enabled=true
source->target.enabled=true
source.cluster.alias=source
target.cluster.security.protocol=SASL_SSL
clusters=source,target
topics.exclude=.*[-.]internal,.*.replica,__.*,.*-config,.*-status,.*-offset
source->target.emit.heartbeats.enabled=true
target.cluster.sasl.mechanism=AWS_MSK_IAM
topics=my-topic-1,my-topic-2,my-topic-4,my-topic-3
refresh.topics.enabled=true
target.cluster.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
groups=.*
source.cluster.bootstrap.servers=b-1.my-source-kafka.fhfghfgh.c3.kafka.ca-central-1.amazonaws.com:9092,b-2.my-source-kafka.fhfghfgh.c3.kafka.ca-central-1.amazonaws.com:9092,b-3.my-source-kafka.fhfghfgh.c3.kafka.ca-central-1.amazonaws.com:9092
target.cluster.alias=target
target.cluster.bootstrap.servers=b-3.my-target-kafka.fhfghfgh.c3.kafka.ca-central-1.amazonaws.com:9098,b-1.my-target-kafka.fhfghfgh.c3.kafka.ca-central-1.amazonaws.com:9098,b-2.my-target-kafka.fhfghfgh.c3.kafka.ca-central-1.amazonaws.com:9098
heartbeat.topic.replication.factor=3
sync.topic.configs.enabled=true
checkpoint.topic.replication.factor=3
source.cluster.security.protocol=PLAINTEXT
offset.flush.timeout.ms=50000
buffer.memory=100
answered a year ago
  • You can use names other than 'source' and 'target' for the MM2 cluster aliases in MSK Connect. Please post the configuration (and the error trace) that was failing here; the problem could be in the configuration.

    Another recommendation: if you are running on EC2 (meaning you are free to choose the Kafka version underlying MM2), you don't need CustomMM2ReplicationPolicy. Starting with Kafka 3, MM2 ships IdentityReplicationPolicy (https://github.com/apache/kafka/blob/trunk/connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/IdentityReplicationPolicy.java).

  • Thank you. I tried IdentityReplicationPolicy and noticed that the target topics still have the source cluster alias as a prefix. I will check and try again to see whether it's related to the configuration. As for MSK Connect, I used the exact configuration posted above, except for the account and the actual endpoint URLs.

  • @rePost-User-7286222, please post the configuration you used with cluster aliases other than 'source' and 'target', so that it's easier to spot a possible misconfiguration.
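On the alias question: in Apache Kafka's MirrorSourceConnector the alias is only a value (source.cluster.alias=my-source), while the per-cluster client settings keep the fixed role prefixes source.cluster. and target.cluster., as in every configuration in this thread. One plausible way a renamed alias breaks a connector is renaming the keys too (for example my-source.cluster.bootstrap.servers). Below is a minimal, JDK-only pre-flight check under that assumption; the class and method names are hypothetical, and it is a sketch for spotting that specific mistake, not a confirmed diagnosis of the error above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class MirrorConnectorConfigCheck {

    // Assumed fixed role prefixes: they name the role, not the alias.
    static final String[] REQUIRED_NON_EMPTY = {
        "source.cluster.bootstrap.servers", "target.cluster.bootstrap.servers"
    };

    static List<String> problems(Map<String, String> props) {
        List<String> issues = new ArrayList<>();
        for (String key : REQUIRED_NON_EMPTY) {
            String value = props.get(key);
            if (value == null || value.isBlank()) {
                issues.add("missing or empty: " + key);
            }
        }
        // Aliases may legitimately be empty (one answer in this thread relies on that),
        // so they are only used to look for keys that were renamed after the alias.
        for (String aliasKey : new String[] {"source.cluster.alias", "target.cluster.alias"}) {
            String alias = props.get(aliasKey);
            if (alias == null || alias.isBlank() || alias.equals("source") || alias.equals("target")) {
                continue;
            }
            String aliasPrefix = alias + ".cluster.";
            for (String key : props.keySet()) {
                if (key.startsWith(aliasPrefix)) {
                    issues.add("alias used as key prefix (expected source.cluster. / target.cluster.): " + key);
                }
            }
        }
        return issues;
    }

    public static void main(String[] args) {
        Map<String, String> props = new TreeMap<>();
        props.put("source.cluster.alias", "my-source");
        props.put("target.cluster.alias", "my-target");
        props.put("my-source.cluster.bootstrap.servers", "b-1.example.com:9092");
        props.put("target.cluster.bootstrap.servers", "b-1.example.com:9098");
        problems(props).forEach(System.out::println);
    }
}
```

For the sample in main, the check reports that source.cluster.bootstrap.servers is missing and that my-source.cluster.bootstrap.servers uses the alias as its prefix.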


I also could not get a custom replication policy set through the custom.replication.policy property to work in MSK Connect. However, I was able to work around this by setting the following properties to empty values in each of the three required connectors:

replication.policy.separator= 
source.cluster.alias= 
target.cluster.alias= 

With this change, the topics in the target cluster have the same names as in the source, and offset translation works as expected.

I got this idea from this Stack Overflow post: https://stackoverflow.com/questions/59390555/is-it-possible-to-replicate-kafka-topics-without-alias-prefix-with-mirrormaker2
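Why emptying those three properties yields identical names: MM2's DefaultReplicationPolicy builds a remote topic name by concatenating the source cluster alias, the separator ("." by default), and the original topic name. The sketch below reproduces only that naming rule in plain Java to show the effect of the workaround; the class is illustrative and does not call Kafka.

```java
public class RemoteTopicNaming {

    /** Same concatenation rule as DefaultReplicationPolicy: alias + separator + topic. */
    static String formatRemoteTopic(String sourceClusterAlias, String separator, String topic) {
        return sourceClusterAlias + separator + topic;
    }

    public static void main(String[] args) {
        // Default separator with alias "source": the replicated topic gets a prefix.
        System.out.println(formatRemoteTopic("source", ".", "my-topic-1")); // source.my-topic-1
        // Empty alias and empty separator, as in the workaround above: name is unchanged.
        System.out.println(formatRemoteTopic("", "", "my-topic-1")); // my-topic-1
    }
}
```

A likely trade-off is that replicated topics can no longer be told apart from local ones by name, so this approach is best suited to one-way replication like the setup in this thread.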

answered a year ago
