跳至内容

如何使用 MSK Connect 上运行的 MirrorMaker 2 在不同账户的 Amazon MSK 集群之间传输数据?

3 分钟阅读
0

我想使用 MSK Connect 上运行的 MirrorMaker 2.0 (MM2) 在不同 AWS 账户的 Amazon Managed Streaming for Apache Kafka (Amazon MSK) 集群之间传输数据。

解决方法

设置 VPC 对等关系

当 Amazon MSK 集群位于不同的 Amazon Virtual Private Cloud (Amazon VPC) 中时,您必须创建 VPC 对等连接

与源 Amazon MSK 集群关联的安全组必须允许来自目标集群安全组的所有流量。目标集群的安全组还必须允许来自源 Amazon MSK 集群安全组的所有流量。有关详细信息,请参阅更新您的安全组以引用对等安全组

**注意:**要引用其他账户中的安全组,请在 Source(源)或 Destination(目标)字段中加入账号。

创建包含 MM2 插件信息的插件

**注意:**MSK Connect 自定义插件接受 .jar 或 .zip 格式的文件或文件夹。

完成以下步骤:

  1. 要创建测试文件夹,然后压缩该文件夹,请运行以下命令:

    mkdir mm2
    zip mm2.zip mm2
  2. 将 .zip 对象上传到目标账户中的 Amazon Simple Storage Service (Amazon S3) 存储桶:

    aws s3 cp mm2.zip s3://mytestbucket/

    **注意:**将 s3://mytestbucket/ 替换为你的 Amazon S3 存储桶 URL。
    Apache Kafka 和 MSK Connect 内置了 MirrorMaker 库,因此您无需为此功能添加额外的 .jar 文件。

  3. 在目标账户中,使用 .zip 文件创建自定义插件并将该文件命名为 mm2-connect-plugin
    **注意:**使用 MSK Connect 时,在创建连接器时必须有自定义插件。

在目标账户中创建 MSK Connect 连接器

完成以下步骤:

  1. 打开 Amazon MSK 控制台

  2. 在导航窗格中的 MSK Connect 下方,选择 Connectors(连接器)。

  3. 选择 Create connector(创建连接器)。

  4. 选择 Use existing custom plugin(使用现有自定义插件)。

  5. Custom plugins(自定义插件)中,选择您创建的自定义插件,然后选择 Next(下一步)。

  6. 输入连接器的名称。
    (可选)输入描述。

  7. 在集群列表中,选择目标集群。

  8. 在连接器配置字段中添加与以下示例类似的配置:

    connector.class=org.apache.kafka.connect.mirror.MirrorSourceConnector  
    tasks.max=1  
    
    clusters=primary,replica  
    source.cluster.alias=primary  
    target.cluster.alias=replica  
    
    topics=example.*  
    replication.factor=2  
    topic.creation.default.replication.factor=2  
    topic.creation.default.partitions=2  
    consumer.group.id=mm2-connector  
    
    refresh.groups.interval.seconds=20  
    refresh.topics.interval.seconds=20  
    
    sync.topic.configs.interval.seconds=20  
    sync.topic.acls.interval.seconds=20  
    
    producer.enable.idempotence=true  
    
    transforms=renameTopic  
    transforms.renameTopic.type=org.apache.kafka.connect.transforms.RegexRouter  
    transforms.renameTopic.regex=primary.(.*)  
    transforms.renameTopic.replacement=$1  
    
    key.converter=org.apache.kafka.connect.converters.ByteArrayConverter  
    value.converter=org.apache.kafka.connect.converters.ByteArrayConverter  
    
    # Source cluster options  
    source.cluster.bootstrap.servers=  
    source.cluster.security.protocol=PLAINTEXT  
    
    # Target cluster options  
    target.cluster.bootstrap.servers=  
    target.cluster.security.protocol=PLAINTEXT
  9. 设置连接器的容量。

  10. Worker configuration(Worker 配置)下,选择 Use the MSK default configuration(使用 MSK 默认配置)。

  11. Access permissions(访问权限)下,选择为 MSK Connect 提供所需权限的 AWS Identity and Access Management (IAM) 角色。然后,选择 Next(下一步)。

  12. Security(安全)页面的 Encryption - in transit(加密 - 传输中)下,选择 Plaintext traffic(纯文本流量)。然后,选择 Next(下一步)。

  13. 或者,在 Logs(日志)页面上,设置日志传输。然后,选择 Next(下一步)。

  14. Review and create(查看并创建)下,选择 Create connector(创建连接器)。

**注意:**在上述配置下,为了从源集群复制每个主题,MM2 在目标集群中创建了两个主题。例如,如果您在源集群上有一个主题 exampleTopic1,则 MM2 会在目标集群上创建 primary.exampleTopic1exampleTopic1 两个主题。然后 MM2 将消息路由到 exampleTopic1 主题。

创建客户端实例

您必须创建一个客户端实例,才能创建主题并生成或使用主题的数据。

完成以下步骤:

  1. 启动 Amazon Elastic Compute Cloud (Amazon EC2) 实例,然后连接到该实例。

  2. 要在客户端计算机上安装 Java,请运行以下命令:

    sudo yum -y install java-11
  3. 要下载 Apache Kafka,请运行以下命令:

    wget https://archive.apache.org/dist/kafka/2.8.1/kafka_2.12-2.8.1.tgz
    tar -xzf kafka_2.12-2.8.1.tgz
  4. 在源账户的 Amazon MSK 集群中创建主题 exampletopic1

    kafka-installation-path/bin/kafka-topics.sh --create --bootstrap-server SourceMSKclusterBootstrapServerString --replication-factor 3 --partitions 1 --topic exampletopic1

    **注意:**将 kafka-installation-path 替换为系统上安装 Kafka 的路径。将 SourceMSKclusterBootstrapServerString 替换为您的源 Amazon MSK 集群的 Bootstrap 服务器字符串。

  5. 在源账户的该集群中生成数据:

    Kafka-installation-path/bin/kafka-console-producer.sh --broker-list SourceMSKclusterBootstrapServerString --topic exampletopic1

    **注意:**将 kafka-installation-path 替换为系统上安装 Kafka 的路径。将 SourceMSKclusterBootstrapServerString 替换为您的源 Amazon MSK 集群的 Bootstrap 服务器字符串。
    预期输出:

    >message 1
    >message 2
  6. 在目标账户的该集群中列出主题:

    Kafka-installation-path/bin/kafka-topics.sh --bootstrap-server TargetMSKclusterBootstrapServerString --list

    **注意:**将 kafka-installation-path 替换为系统上安装 Kafka 的路径。将 TargetMSKclusterBootstrapServerString 替换为目标 Amazon MSK 集群的 Bootstrap 服务器字符串。
    输出示例:

    __amazon_msk_canary  __amazon_msk_connect_configs_mm2-*****  
    __amazon_msk_connect_offsets_mm2-*****  
    __amazon_msk_connect_status_mm2-*****  
    __consumer_offsets  
    exampleTopic1  
    primary.exampleTopic1
  7. 使用目标集群的数据:

    Kafka-installation-path/bin/kafka-console-consumer.sh --bootstrap-server TargetMSKclusterBootstrapServerString --topic exampletopic1 --from-beginning

    **注意:**将 kafka-installation-path 替换为系统上安装 Kafka 的路径。将 TargetMSKclusterBootstrapServerString 替换为目标 Amazon MSK 集群的 Bootstrap 服务器字符串。
    预期输出:

    >message 1
    >message 2

相关信息

迁移到 Amazon MSK 集群

AWS 官方已更新 4 个月前