
How do I transfer data between Amazon MSK clusters in different accounts with MirrorMaker 2 that runs on MSK Connect?


I want to use MirrorMaker 2.0 (MM2) that runs on MSK Connect to transfer data between Amazon Managed Streaming for Apache Kafka (Amazon MSK) clusters in different AWS accounts.

Resolution

Set up VPC peering

When the Amazon MSK clusters are in different Amazon Virtual Private Cloud (Amazon VPC) networks, you must create a VPC peering connection between them.

The security groups that are associated with the source Amazon MSK cluster must allow all traffic from the target cluster's security groups. The target cluster's security groups must also allow all traffic from the source Amazon MSK cluster's security groups. For more information, see Update your security groups to reference peer security groups.

Note: To reference a security group in another account, include the account number in the Source or Destination field.
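For example, you might add the cross-account rule with the AWS CLI. The following is only a sketch: the security group IDs and the account number are placeholders, and the rule allows all traffic from the peer account's security group, as described above.

```shell
# Allow all traffic (IpProtocol=-1) into the target cluster's security group
# from the source cluster's security group in the peer account.
# sg-0target1111111111, sg-0source2222222222, and 111122223333 are placeholders.
aws ec2 authorize-security-group-ingress \
  --group-id sg-0target1111111111 \
  --ip-permissions 'IpProtocol=-1,UserIdGroupPairs=[{GroupId=sg-0source2222222222,UserId=111122223333}]'
```

Repeat the rule in the other direction so that the source cluster's security group also allows traffic from the target cluster's security group.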

Create a plugin with MM2 plugin information

Note: MSK Connect custom plugins accept a file or folder with a .jar or .zip format.

Complete the following steps:

  1. To create an empty placeholder folder and then compress it, run the following commands:

    mkdir mm2
    zip -r mm2.zip mm2
  2. Upload the .zip object to your Amazon Simple Storage Service (Amazon S3) bucket in the target account:

    aws s3 cp mm2.zip s3://mytestbucket/

    Note: Replace s3://mytestbucket/ with your Amazon S3 bucket URL.
    Apache Kafka and MSK Connect have the MirrorMaker libraries built in, so you don't need to add additional .jar files for this functionality.

  3. In the target account, use the .zip object to create a custom plugin, and name the plugin mm2-connect-plugin.
    Note: MSK Connect requires a custom plugin when you create a connector.

Create an MSK Connect connector in the target account

Complete the following steps:

  1. Open the Amazon MSK console.

  2. In the navigation pane, under MSK Connect, choose Connectors.

  3. Choose Create connector.

  4. Choose Use existing custom plugin.

  5. In Custom plugins, select the custom plugin that you created, and then choose Next.

  6. Enter a name for the connector.
    (Optional) Enter a description.

  7. From the list of clusters, choose the target cluster.

  8. Add a configuration similar to the following example into the connector configuration field:

    connector.class=org.apache.kafka.connect.mirror.MirrorSourceConnector  
    tasks.max=1  
      
    clusters=primary,replica  
    source.cluster.alias=primary  
    target.cluster.alias=replica  
      
    topics=example.*  
    replication.factor=2  
    topic.creation.default.replication.factor=2  
    topic.creation.default.partitions=2  
    consumer.group.id=mm2-connector  
      
    refresh.groups.interval.seconds=20  
    refresh.topics.interval.seconds=20  
      
    sync.topic.configs.interval.seconds=20  
    sync.topic.acls.interval.seconds=20  
      
    producer.enable.idempotence=true  
      
    transforms=renameTopic  
    transforms.renameTopic.type=org.apache.kafka.connect.transforms.RegexRouter  
    transforms.renameTopic.regex=primary.(.*)  
    transforms.renameTopic.replacement=$1  
      
    key.converter=org.apache.kafka.connect.converters.ByteArrayConverter  
    value.converter=org.apache.kafka.connect.converters.ByteArrayConverter  
      
    # Source cluster options  
    source.cluster.bootstrap.servers=  
    source.cluster.security.protocol=PLAINTEXT  
      
    # Target cluster options  
    target.cluster.bootstrap.servers=  
    target.cluster.security.protocol=PLAINTEXT
  9. Set the capacity for your connector.

  10. Under Worker configuration, choose Use the MSK default configuration.

  11. Under Access permissions, choose the AWS Identity and Access Management (IAM) role that provides the required permissions to MSK Connect. Then, choose Next.

  12. On the Security page, under Encryption - in transit, choose Plaintext traffic. Then, choose Next.

  13. (Optional) On the Logs page, set the log delivery. Then, choose Next.

  14. Under Review and create, choose Create connector.

Note: With the preceding configuration, MM2 creates two topics in the target cluster for each replicated source topic. For example, for the topic exampleTopic1 on the source cluster, MM2 creates the topics primary.exampleTopic1 and exampleTopic1 on the target cluster. Because of the renameTopic transform, MM2 routes the replicated messages to the exampleTopic1 topic.
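The renameTopic transform is what strips the source-cluster alias prefix back off the replicated topic names. As an illustration only, its regex rewrite behaves like the following sed expression (the transform itself uses regex primary.(.*) with replacement $1):

```shell
# Strip the "primary." source-cluster alias prefix from a replicated topic name.
echo "primary.exampleTopic1" | sed -E 's/^primary\.(.*)$/\1/'

# A topic name without the prefix passes through unchanged.
echo "exampleTopic1" | sed -E 's/^primary\.(.*)$/\1/'
```

Both commands print exampleTopic1, which is why the replicated messages end up in the unprefixed topic on the target cluster.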

Create a client instance

You must create a client instance so that you can create topics and produce and consume data.

Complete the following steps:

  1. Launch an Amazon Elastic Compute Cloud (Amazon EC2) instance, and then connect to the instance.

  2. To install Java on the client machine, run the following command:

    sudo yum -y install java-11
  3. To download and extract Apache Kafka, run the following commands:

    wget https://archive.apache.org/dist/kafka/2.8.1/kafka_2.12-2.8.1.tgz
    tar -xzf kafka_2.12-2.8.1.tgz
  4. Create the topic exampleTopic1 in the Amazon MSK cluster in the source account:

    kafka-installation-path/bin/kafka-topics.sh --create --bootstrap-server SourceMSKclusterBootstrapServerString --replication-factor 3 --partitions 1 --topic exampleTopic1

    Note: Replace kafka-installation-path with the path where you installed Kafka on your system. Replace SourceMSKclusterBootstrapServerString with your source Amazon MSK cluster's bootstrap server string.

  5. Produce data to the topic in the cluster on the source account:

    kafka-installation-path/bin/kafka-console-producer.sh --broker-list SourceMSKclusterBootstrapServerString --topic exampleTopic1

    Note: Replace kafka-installation-path with the path where you installed Kafka on your system. Replace SourceMSKclusterBootstrapServerString with your source Amazon MSK cluster's bootstrap server string.
    Expected output:

    >message 1
    >message 2
  6. List the topics in the cluster on the target account:

    kafka-installation-path/bin/kafka-topics.sh --bootstrap-server TargetMSKclusterBootstrapServerString --list

    Note: Replace kafka-installation-path with the path where you installed Kafka on your system. Replace TargetMSKclusterBootstrapServerString with your target Amazon MSK cluster's bootstrap server string.
    Example output:

    __amazon_msk_canary
    __amazon_msk_connect_configs_mm2-*****
    __amazon_msk_connect_offsets_mm2-*****
    __amazon_msk_connect_status_mm2-*****
    __consumer_offsets
    exampleTopic1
    primary.exampleTopic1
  7. Consume data from the target cluster:

    kafka-installation-path/bin/kafka-console-consumer.sh --bootstrap-server TargetMSKclusterBootstrapServerString --topic exampleTopic1 --from-beginning

    Note: Replace kafka-installation-path with the path where you installed Kafka on your system. Replace TargetMSKclusterBootstrapServerString with your target Amazon MSK cluster's bootstrap server string.
    Expected output:

    >message 1
    >message 2

Related information

Migrate to an Amazon MSK Cluster