如何使用在 MSK Connect 上執行的 MirrorMaker 2,以便在不同帳戶中的 Amazon MSK 叢集之間傳輸資料?

2 分的閱讀內容
0

我想使用在 MSK Connect 上執行的 MirrorMaker 2.0 (MM2),以便在不同帳戶中的 Amazon Managed Streaming for Apache Kafka (Amazon MSK) 叢集之間傳輸資料。

解決方法

設定 VPC 對等

由於 Amazon MSK 叢集位於不同的虛擬私有雲端 (VPC) 中,因此您必須建立 VPC 對等互連。在相同 AWS 區域或不同區域的另一個 AWS 帳戶中,與 VPC 建立此 VPC 對等互連。如需詳細資訊,請參閱建立 VPC 對等互連

與來源 Amazon MSK 叢集相關聯的安全群組必須允許來自目標叢集安全群組的所有流量。目標叢集的安全群組也必須允許來自 MSK 叢集安全群組的所有流量。如需詳細資訊,請參閱將安全群組更新為參照對等安全群組

注意:若要參照另一個帳戶中的安全群組,請在來源目的地欄位中包含帳戶號碼。範例: 123456789012/sg-1a2b3c4d

使用 MM2 外掛程式資訊建立外掛程式

MSK 連接自訂外掛程式可接受含有 .jar 或 .zip 結尾的檔案或資料夾。

完成以下步驟:

  1. 建立一個虛設資料夾或檔案,然後將其壓縮:
mkdir mm2   
zip mm2.zip mm2 
  1. 將 .zip 物件上傳到目標帳戶中的 Amazon Simple Storage Service (Amazon S3) 儲存貯體:
aws s3 cp mm2.zip s3://mytestbucket/

由於 Apache Kafka 和 MSK Connect 都內建了 MirrorMaker 程式庫,因此您不需要為此功能增加額外的 .jar 檔案。MSK Connect 有一個先決條件,即在建立連接器時必須存有自訂外掛程式。因此,您必須建立一個空的以供參照。

  1. 在目標帳戶中,使用 .zip 檔案來建立自訂外掛程式。使用 mm2-connect-plugin 做為自訂外掛程式的名稱。

建立 MSK Connect 連接器

完成下列步驟,以在目標帳戶中建立連接器:

1.    開啟 Amazon MSK 主控台

  1. 在導覽窗格的 MSK Connect 項下,選擇 Connectors(連接器)。

  2. 選擇 Create connector(建立連接器)。

  3. 在自訂外掛程式清單中,選取您所建立自訂外掛程式旁邊的外掛程式,然後選擇 Next(下一步)。

  4. 輸入連接器的名稱,可以選擇是否填入說明。

  5. 從叢集清單中選擇目標叢集。

  6. 複製下列組態,然後將其貼到連接器組態欄位。針對您的使用案例修改範例。

connector.class=org.apache.kafka.connect.mirror.MirrorSourceConnector  
tasks.max=1  

clusters=primary,replica  
source.cluster.alias=primary  
target.cluster.alias=replica  

topics=example.*  
replication.factor=2  
topic.creation.default.replication.factor=2  
topic.creation.default.partitions=2  
consumer.group.id=mm2-connector  

refresh.groups.interval.seconds=20  
refresh.topics.interval.seconds=20  

sync.topic.configs.interval.seconds=20  
sync.topic.acls.interval.seconds=20  

producer.enable.idempotence=true  

transforms=renameTopic  
transforms.renameTopic.type=org.apache.kafka.connect.transforms.RegexRouter  
transforms.renameTopic.regex=primary.(.*)  
transforms.renameTopic.replacement=$1  

key.converter=org.apache.kafka.connect.converters.ByteArrayConverter  
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter  

# Source cluster options  
source.cluster.bootstrap.servers=<Source_MSK_Bootstrap_Server_PLAINTEXT>  
source.cluster.security.protocol=PLAINTEXT  

# Destination cluster options  
target.cluster.bootstrap.servers=<Target_MSK_Bootstrap_Server_PLAINTEXT>  
target.cluster.security.protocol=PLAINTEXT
  1. 設定連接器的容量。

  2. 工作者組態項下,選擇 Use the MSK default(使用 MSK 預設組態)。

  3. 存取權限項下,選擇提供 MSK Connect 所需許可的 AWS Identity and Access Management (AWS IAM) 角色,然後選擇 Next(下一步)。

  4. 安全頁面的加密 - 傳輸中項下,選擇 Plaintext traffic(純文字流量)。然後選擇 Next(下一步)。

  5. 可以選擇是否在日誌頁面設定日誌傳遞,然後選擇 Next(下一步)。

  6. 檢閱並建立項下,選擇 Create connector(建立連接器)。

**注意:**使用此組態時,若要複寫來源叢集中的每個主題,MM2 會在目標叢集中建立兩個主題。舉例來說,如果來源叢集上有主題 exampleTopic1,則 MM2 會在目標叢集建立 primary.exampleTopic1exampleTopic1。 訊息會路由至主題 exampleTopic1

建立用戶端執行個體

您必須先建立用戶端執行個體才能建立主題,並產生或消耗主題中的資料。

  1. 根據來源或目標帳戶中的需求,啟動 Amazon Elastic Compute Cloud (Amazon EC2) 執行個體,然後連接到執行個體。

  2. 執行下列命令以便在用戶端機器上安裝 Java:

sudo yum -y install java-11
  1. 執行下列命令以便下載 Apache Kafka:
wget https://archive.apache.org/dist/kafka/2.8.1/kafka_2.12-2.8.1.tgz  

tar -xzf kafka_2.12-2.8.1.tgz
  1. 在來源帳戶的 Amazon MSK 叢集建立主題 exampletopic1
<path-to-your-kafka-installation>/bin/kafka-topics.sh --create --bootstrap-server <Source MSK cluster BootstrapServerString> --replication-factor 3 --partitions 1 --topic exampletopic1
  1. 在來源帳戶的叢集中產生資料:
<path-to-your-kafka-installation>/bin/kafka-console-producer.sh --broker-list <Source MSK cluster BootstrapServerString> --topic exampletopic1
>message 1  
>message 2
  1. 列出目標帳戶上叢集中的主題:
<path-to-your-kafka-installation>/bin/kafka-topics.sh --bootstrap-server <Target MSK cluster BootstrapServerString> --list

輸出看起來必須類似以下內容:

__amazon_msk_canary  
__amazon_msk_connect_configs_mm2-*****  
__amazon_msk_connect_offsets_mm2-*****  
__amazon_msk_connect_status_mm2-*****  
__consumer_offsets  
exampleTopic1  
primary.exampleTopic1
  1. 消耗目標叢集中的資料:
<path-to-your-kafka-installation>/bin/kafka-console-consumer.sh --bootstrap-server <Target MSK cluster BootstrapServerString> --topic exampletopic1 --from-beginning
message 1  
message 2

相關資訊

使用 Apache Kafka 的 MirrorMaker 來遷移叢集

AWS 官方
AWS 官方已更新 1 年前