如何使用在 MSK Connect 上執行的 MirrorMaker 2,以便在不同帳戶中的 Amazon MSK 叢集之間傳輸資料?
我想使用在 MSK Connect 上執行的 MirrorMaker 2.0 (MM2),以便在不同帳戶中的 Amazon Managed Streaming for Apache Kafka (Amazon MSK) 叢集之間傳輸資料。
解決方法
設定 VPC 對等
由於 Amazon MSK 叢集位於不同的虛擬私有雲端 (VPC) 中,因此您必須建立 VPC 對等互連。在相同 AWS 區域或不同區域的另一個 AWS 帳戶中,與 VPC 建立此 VPC 對等互連。如需詳細資訊,請參閱建立 VPC 對等互連。
與來源 Amazon MSK 叢集相關聯的安全群組必須允許來自目標叢集安全群組的所有流量。目標叢集的安全群組也必須允許來自 MSK 叢集安全群組的所有流量。如需詳細資訊,請參閱將安全群組更新為參照對等安全群組。
注意:若要參照另一個帳戶中的安全群組,請在來源或目的地欄位中包含帳戶號碼。範例: 123456789012/sg-1a2b3c4d
使用 MM2 外掛程式資訊建立外掛程式
MSK 連接自訂外掛程式可接受含有 .jar 或 .zip 結尾的檔案或資料夾。
完成以下步驟:
- 建立一個虛設資料夾或檔案,然後將其壓縮:
mkdir mm2 zip mm2.zip mm2
- 將 .zip 物件上傳到目標帳戶中的 Amazon Simple Storage Service (Amazon S3) 儲存貯體:
aws s3 cp mm2.zip s3://mytestbucket/
由於 Apache Kafka 和 MSK Connect 都內建了 MirrorMaker 程式庫,因此您不需要為此功能增加額外的 .jar 檔案。MSK Connect 有一個先決條件,即在建立連接器時必須存有自訂外掛程式。因此,您必須建立一個空的以供參照。
- 在目標帳戶中,使用 .zip 檔案來建立自訂外掛程式。使用 mm2-connect-plugin 做為自訂外掛程式的名稱。
建立 MSK Connect 連接器
完成下列步驟,以在目標帳戶中建立連接器:
1. 開啟 Amazon MSK 主控台。
-
在導覽窗格的 MSK Connect 項下,選擇 Connectors(連接器)。
-
選擇 Create connector(建立連接器)。
-
在自訂外掛程式清單中,選取您所建立自訂外掛程式旁邊的外掛程式,然後選擇 Next(下一步)。
-
輸入連接器的名稱,可以選擇是否填入說明。
-
從叢集清單中選擇目標叢集。
-
複製下列組態,然後將其貼到連接器組態欄位。針對您的使用案例修改範例。
connector.class=org.apache.kafka.connect.mirror.MirrorSourceConnector tasks.max=1 clusters=primary,replica source.cluster.alias=primary target.cluster.alias=replica topics=example.* replication.factor=2 topic.creation.default.replication.factor=2 topic.creation.default.partitions=2 consumer.group.id=mm2-connector refresh.groups.interval.seconds=20 refresh.topics.interval.seconds=20 sync.topic.configs.interval.seconds=20 sync.topic.acls.interval.seconds=20 producer.enable.idempotence=true transforms=renameTopic transforms.renameTopic.type=org.apache.kafka.connect.transforms.RegexRouter transforms.renameTopic.regex=primary.(.*) transforms.renameTopic.replacement=$1 key.converter=org.apache.kafka.connect.converters.ByteArrayConverter value.converter=org.apache.kafka.connect.converters.ByteArrayConverter # Source cluster options source.cluster.bootstrap.servers=<Source_MSK_Bootstrap_Server_PLAINTEXT> source.cluster.security.protocol=PLAINTEXT # Destination cluster options target.cluster.bootstrap.servers=<Target_MSK_Bootstrap_Server_PLAINTEXT> target.cluster.security.protocol=PLAINTEXT
-
設定連接器的容量。
-
在工作者組態項下,選擇 Use the MSK default(使用 MSK 預設組態)。
-
在存取權限項下,選擇提供 MSK Connect 所需許可的 AWS Identity and Access Management (AWS IAM) 角色,然後選擇 Next(下一步)。
-
在安全頁面的加密 - 傳輸中項下,選擇 Plaintext traffic(純文字流量)。然後選擇 Next(下一步)。
-
可以選擇是否在日誌頁面設定日誌傳遞,然後選擇 Next(下一步)。
-
在檢閱並建立項下,選擇 Create connector(建立連接器)。
**注意:**使用此組態時,若要複寫來源叢集中的每個主題,MM2 會在目標叢集中建立兩個主題。舉例來說,如果來源叢集上有主題 exampleTopic1,則 MM2 會在目標叢集建立 primary.exampleTopic1 和 exampleTopic1。 訊息會路由至主題 exampleTopic1。
建立用戶端執行個體
您必須先建立用戶端執行個體才能建立主題,並產生或消耗主題中的資料。
-
根據來源或目標帳戶中的需求,啟動 Amazon Elastic Compute Cloud (Amazon EC2) 執行個體,然後連接到執行個體。
-
執行下列命令以便在用戶端機器上安裝 Java:
sudo yum -y install java-11
- 執行下列命令以便下載 Apache Kafka:
wget https://archive.apache.org/dist/kafka/2.8.1/kafka_2.12-2.8.1.tgz tar -xzf kafka_2.12-2.8.1.tgz
- 在來源帳戶的 Amazon MSK 叢集建立主題 exampletopic1:
<path-to-your-kafka-installation>/bin/kafka-topics.sh --create --bootstrap-server <Source MSK cluster BootstrapServerString> --replication-factor 3 --partitions 1 --topic exampletopic1
- 在來源帳戶的叢集中產生資料:
<path-to-your-kafka-installation>/bin/kafka-console-producer.sh --broker-list <Source MSK cluster BootstrapServerString> --topic exampletopic1
>message 1 >message 2
- 列出目標帳戶上叢集中的主題:
<path-to-your-kafka-installation>/bin/kafka-topics.sh --bootstrap-server <Target MSK cluster BootstrapServerString> --list
輸出看起來必須類似以下內容:
__amazon_msk_canary __amazon_msk_connect_configs_mm2-***** __amazon_msk_connect_offsets_mm2-***** __amazon_msk_connect_status_mm2-***** __consumer_offsets exampleTopic1 primary.exampleTopic1
- 消耗目標叢集中的資料:
<path-to-your-kafka-installation>/bin/kafka-console-consumer.sh --bootstrap-server <Target MSK cluster BootstrapServerString> --topic exampletopic1 --from-beginning
message 1 message 2
相關資訊
相關內容
- 已提問 9 個月前lg...
- 已提問 2 個月前lg...
- 已提問 1 年前lg...
- AWS 官方已更新 2 年前
- AWS 官方已更新 3 年前
- AWS 官方已更新 1 年前
- AWS 官方已更新 2 年前