如何使用 MSK Connect 上运行的 MirrorMaker 2 在不同账户的 Amazon MSK 集群之间传输数据?

2 分钟阅读
0

我想使用 MSK Connect 上运行的 MirrorMaker 2.0 (MM2) 在不同账户的 Amazon Managed Streaming for Apache Kafka (Amazon MSK) 集群之间传输数据。

解决方法

设置 VPC 对等关系

由于 Amazon MSK 集群位于不同的虚拟私有云 (VPC) 中,因此您必须创建 VPC 对等连接。创建此 VPC 对等连接,与同一 AWS 区域或不同区域的另一个 AWS 账户中的 VPC 相连。有关详细信息,请参阅创建 VPC 对等连接

与源 Amazon MSK 集群关联的安全组必须允许来自目标集群安全组的所有流量。目标集群的安全组还必须允许来自 MSK 集群安全组的所有流量。有关详细信息,请参阅更新您的安全组以引用对等安全组

注意:要引用其他账户中的安全组,请在目标字段中输入账号。示例: 123456789012/sg-1a2b3c4d

创建包含 MM2 插件信息的插件

MSK Connect 自定义插件接受以 .jar 或 .zip 结尾的文件或文件夹。

完成以下步骤:

1.    创建一个虚拟文件夹或文件,然后压缩它:

mkdir mm2   
zip mm2.zip mm2 

2.    将 .zip 对象上传到目标账户中的 Amazon Simple Storage Service (Amazon S3) 桶:

aws s3 cp mm2.zip s3://mytestbucket/

由于 Apache Kafka 和 MSK Connect 内置了 MirrorMaker 库,因此您无需为此功能添加额外的 .jar 文件。MSK Connect 有一个先决条件,即创建连接器时必须有自定义插件。因此,您必须创建一个空插件以供参考。

3.    在目标账户中,使用 .zip 文件创建一个自定义插件。使用 mm2-connect-plugin 作为自定义插件的名称。

创建 MSK Connect 连接器

完成以下步骤,在目标账户中创建一个连接器:

1.    打开 Amazon MSK 控制台

2.    在导航窗格中的 MSK Connect 下方,选择连接器

3.    选择创建连接器

4.    在自定义插件列表中,选择您创建的自定义插件旁边的插件,然后选择下一步

5.    输入连接器的名称和描述(可选)。

6.    在集群列表中,选择目标集群。

7.    复制以下配置,然后粘贴到连接器配置字段中。根据您的用例修改示例。

connector.class=org.apache.kafka.connect.mirror.MirrorSourceConnector  
tasks.max=1  

clusters=primary,replica  
source.cluster.alias=primary  
target.cluster.alias=replica  

topics=example.*  
replication.factor=2  
topic.creation.default.replication.factor=2  
topic.creation.default.partitions=2  
consumer.group.id=mm2-connector  

refresh.groups.interval.seconds=20  
refresh.topics.interval.seconds=20  

sync.topic.configs.interval.seconds=20  
sync.topic.acls.interval.seconds=20  

producer.enable.idempotence=true  

transforms=renameTopic  
transforms.renameTopic.type=org.apache.kafka.connect.transforms.RegexRouter  
transforms.renameTopic.regex=primary.(.*)  
transforms.renameTopic.replacement=$1  

key.converter=org.apache.kafka.connect.converters.ByteArrayConverter  
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter  

# Source cluster options  
source.cluster.bootstrap.servers=<Source_MSK_Bootstrap_Server_PLAINTEXT>  
source.cluster.security.protocol=PLAINTEXT  

# Destination cluster options  
target.cluster.bootstrap.servers=<Target_MSK_Bootstrap_Server_PLAINTEXT>  
target.cluster.security.protocol=PLAINTEXT

8.    设置连接器的容量。

9.    在 Worker 配置下,选择使用 MSK 默认配置

10.    在访问权限下,选择为 MSK Connect 提供所需权限的 AWS Identity and Access Management (IAM) 角色。然后,选择下一步

11.    在安全页面的加密 - 传输中下,选择纯文本流量。然后,选择下一步

12.    或者,在日志页面上,设置日志交付。然后,选择下一步

13.    在查看并创建下,选择创建连接器

**注意:**在此配置下,为了从源集群复制每个主题,MM2 在目标集群中创建了两个主题。例如,如果您在源集群上有一个主题 exampleTopic1,则 MM2 会在目标集群上创建 primary.exampleTopic1exampleTopic1 两个主题。 消息被路由到 exampleTopic1 主题。

创建客户端实例

您必须创建一个客户端实例,才能创建主题并生成或使用主题的数据。

1.    根据您的要求,在源账户或目标账户中启动一个 Amazon Elastic Compute Cloud (Amazon EC2) 实例。然后,连接到该实例。

2.     运行以下命令,在客户端计算机上安装 Java:

sudo yum -y install java-11

3.    运行以下命令以下载 Apache Kafka:

wget https://archive.apache.org/dist/kafka/2.8.1/kafka_2.12-2.8.1.tgz  

tar -xzf kafka_2.12-2.8.1.tgz

4.    在源账户的 Amazon MSK 集群中,创建主题 exampletopic1

<path-to-your-kafka-installation>/bin/kafka-topics.sh --create --bootstrap-server <Source MSK cluster BootstrapServerString> --replication-factor 3 --partitions 1 --topic exampletopic1

5.    在源账户的该集群中生成数据:

<path-to-your-kafka-installation>/bin/kafka-console-producer.sh --broker-list <Source MSK cluster BootstrapServerString> --topic exampletopic1
>message 1  
>message 2

6.    在目标账户的该集群中列出主题:

<path-to-your-kafka-installation>/bin/kafka-topics.sh --bootstrap-server <Target MSK cluster BootstrapServerString> --list

输出内容必须与以下内容类似:

__amazon_msk_canary  
__amazon_msk_connect_configs_mm2-*****  
__amazon_msk_connect_offsets_mm2-*****  
__amazon_msk_connect_status_mm2-*****  
__consumer_offsets  
exampleTopic1  
primary.exampleTopic1

7.    使用目标集群的数据:

<path-to-your-kafka-installation>/bin/kafka-console-consumer.sh --bootstrap-server <Target MSK cluster BootstrapServerString> --topic exampletopic1 --from-beginning
message 1  
message 2

相关信息

使用 Apache Kafka 的 MirrorMaker 迁移集群

AWS 官方
AWS 官方已更新 1 年前