Get Hands-on with Amazon EKS - Workshop Event Series
Whether you're taking your first steps with Kubernetes or you're an experienced practitioner looking to sharpen your skills, our Amazon EKS workshop series delivers practical, real-world experience that moves you forward. Learn directly from AWS solutions architects and EKS specialists through hands-on sessions designed to build your confidence with Kubernetes. Register now and start building with Amazon EKS!
如何使用 MSK Connect 上运行的 MirrorMaker 2 在不同账户的 Amazon MSK 集群之间传输数据?
我想使用 MSK Connect 上运行的 MirrorMaker 2.0 (MM2) 在不同 AWS 账户的 Amazon Managed Streaming for Apache Kafka (Amazon MSK) 集群之间传输数据。
解决方法
设置 VPC 对等关系
当 Amazon MSK 集群位于不同的 Amazon Virtual Private Cloud (Amazon VPC) 中时,您必须创建 VPC 对等连接。
与源 Amazon MSK 集群关联的安全组必须允许来自目标集群安全组的所有流量。目标集群的安全组还必须允许来自源 Amazon MSK 集群安全组的所有流量。有关详细信息,请参阅更新您的安全组以引用对等安全组。
**注意:**要引用其他账户中的安全组,请在 Source(源)或 Destination(目标)字段中加入账号。
创建包含 MM2 插件信息的插件
**注意:**MSK Connect 自定义插件接受 .jar 或 .zip 格式的文件或文件夹。
完成以下步骤:
-
要创建测试文件夹,然后压缩该文件夹,请运行以下命令:
mkdir mm2 zip mm2.zip mm2 -
将 .zip 对象上传到目标账户中的 Amazon Simple Storage Service (Amazon S3) 存储桶:
aws s3 cp mm2.zip s3://mytestbucket/**注意:**将 s3://mytestbucket/ 替换为你的 Amazon S3 存储桶 URL。
Apache Kafka 和 MSK Connect 内置了 MirrorMaker 库,因此您无需为此功能添加额外的 .jar 文件。 -
在目标账户中,使用 .zip 文件创建自定义插件并将该文件命名为 mm2-connect-plugin。
**注意:**使用 MSK Connect 时,在创建连接器时必须有自定义插件。
在目标账户中创建 MSK Connect 连接器
完成以下步骤:
-
打开 Amazon MSK 控制台。
-
在导航窗格中的 MSK Connect 下方,选择 Connectors(连接器)。
-
选择 Create connector(创建连接器)。
-
选择 Use existing custom plugin(使用现有自定义插件)。
-
在 Custom plugins(自定义插件)中,选择您创建的自定义插件,然后选择 Next(下一步)。
-
输入连接器的名称。
(可选)输入描述。 -
在集群列表中,选择目标集群。
-
在连接器配置字段中添加与以下示例类似的配置:
connector.class=org.apache.kafka.connect.mirror.MirrorSourceConnector tasks.max=1 clusters=primary,replica source.cluster.alias=primary target.cluster.alias=replica topics=example.* replication.factor=2 topic.creation.default.replication.factor=2 topic.creation.default.partitions=2 consumer.group.id=mm2-connector refresh.groups.interval.seconds=20 refresh.topics.interval.seconds=20 sync.topic.configs.interval.seconds=20 sync.topic.acls.interval.seconds=20 producer.enable.idempotence=true transforms=renameTopic transforms.renameTopic.type=org.apache.kafka.connect.transforms.RegexRouter transforms.renameTopic.regex=primary.(.*) transforms.renameTopic.replacement=$1 key.converter=org.apache.kafka.connect.converters.ByteArrayConverter value.converter=org.apache.kafka.connect.converters.ByteArrayConverter # Source cluster options source.cluster.bootstrap.servers= source.cluster.security.protocol=PLAINTEXT # Target cluster options target.cluster.bootstrap.servers= target.cluster.security.protocol=PLAINTEXT -
设置连接器的容量。
-
在 Worker configuration(Worker 配置)下,选择 Use the MSK default configuration(使用 MSK 默认配置)。
-
在 Access permissions(访问权限)下,选择为 MSK Connect 提供所需权限的 AWS Identity and Access Management (IAM) 角色。然后,选择 Next(下一步)。
-
在 Security(安全)页面的 Encryption - in transit(加密 - 传输中)下,选择 Plaintext traffic(纯文本流量)。然后,选择 Next(下一步)。
-
或者,在 Logs(日志)页面上,设置日志传输。然后,选择 Next(下一步)。
-
在 Review and create(查看并创建)下,选择 Create connector(创建连接器)。
**注意:**在上述配置下,为了从源集群复制每个主题,MM2 在目标集群中创建了两个主题。例如,如果您在源集群上有一个主题 exampleTopic1,则 MM2 会在目标集群上创建 primary.exampleTopic1 和 exampleTopic1 两个主题。然后 MM2 将消息路由到 exampleTopic1 主题。
创建客户端实例
您必须创建一个客户端实例,才能创建主题并生成或使用主题的数据。
完成以下步骤:
-
要在客户端计算机上安装 Java,请运行以下命令:
sudo yum -y install java-11 -
要下载 Apache Kafka,请运行以下命令:
wget https://archive.apache.org/dist/kafka/2.8.1/kafka_2.12-2.8.1.tgz tar -xzf kafka_2.12-2.8.1.tgz -
在源账户的 Amazon MSK 集群中创建主题 exampletopic1:
kafka-installation-path/bin/kafka-topics.sh --create --bootstrap-server SourceMSKclusterBootstrapServerString --replication-factor 3 --partitions 1 --topic exampletopic1**注意:**将 kafka-installation-path 替换为系统上安装 Kafka 的路径。将 SourceMSKclusterBootstrapServerString 替换为您的源 Amazon MSK 集群的 Bootstrap 服务器字符串。
-
在源账户的该集群中生成数据:
Kafka-installation-path/bin/kafka-console-producer.sh --broker-list SourceMSKclusterBootstrapServerString --topic exampletopic1**注意:**将 kafka-installation-path 替换为系统上安装 Kafka 的路径。将 SourceMSKclusterBootstrapServerString 替换为您的源 Amazon MSK 集群的 Bootstrap 服务器字符串。
预期输出:>message 1 >message 2 -
在目标账户的该集群中列出主题:
Kafka-installation-path/bin/kafka-topics.sh --bootstrap-server TargetMSKclusterBootstrapServerString --list**注意:**将 kafka-installation-path 替换为系统上安装 Kafka 的路径。将 TargetMSKclusterBootstrapServerString 替换为目标 Amazon MSK 集群的 Bootstrap 服务器字符串。
输出示例:__amazon_msk_canary __amazon_msk_connect_configs_mm2-***** __amazon_msk_connect_offsets_mm2-***** __amazon_msk_connect_status_mm2-***** __consumer_offsets exampleTopic1 primary.exampleTopic1 -
使用目标集群的数据:
Kafka-installation-path/bin/kafka-console-consumer.sh --bootstrap-server TargetMSKclusterBootstrapServerString --topic exampletopic1 --from-beginning**注意:**将 kafka-installation-path 替换为系统上安装 Kafka 的路径。将 TargetMSKclusterBootstrapServerString 替换为目标 Amazon MSK 集群的 Bootstrap 服务器字符串。
预期输出:>message 1 >message 2
相关信息
- 语言
- 中文 (简体)

相关内容
AWS 官方已更新 4 个月前