MSK Connect で実行されている MirrorMaker 2 を使用して、複数のアカウントの Amazon MSK クラスター間でデータを転送する方法を教えてください。next

所要時間2分
0

複数のアカウントの Amazon Managed Streaming for Apache Kafka (Amazon MSK) クラスター間でデータを転送するために、MSK Connect で実行される MirrorMaker 2.0 (MM2) を使用したいと考えています。

解決方法

VPC ピアリングをセットアップする

Amazon MSK クラスターは複数の仮想プライベートクラウド (VPC) にあるため、VPC ピアリング接続を作成する必要があります。作成する VPC ピアリング接続は、同じ AWS リージョンまたは別のリージョンにある、別の AWS アカウントの VPC との接続です。詳細については、「VPC ピアリング接続を作成する」を参照してください。

ターゲットの Amazon MSK クラスターに関連付けられているセキュリティグループは、ターゲットクラスターのセキュリティグループからのすべてのトラフィックを許可する必要があります。ターゲットクラスターのセキュリティグループが、MSK クラスターのセキュリティグループからのすべてのトラフィックを許可することも必要です。詳細については、「セキュリティグループの更新とピアセキュリティグループの参照」を参照してください。

注: 別のアカウントのセキュリティグループを参照するには、[送信元] または [送信先] フィールドにアカウント番号を含めてください。例: 123456789012/sg-1a2b3c4d

MM2 プラグイン情報を使用してプラグインを作成する

MSK Connect カスタムプラグインは、.jar または .zip で終わるファイルまたはフォルダーを受け入れます。

次の手順を実行します。

1.    ダミーフォルダーまたはファイルを作成して圧縮します。

mkdir mm2   
zip mm2.zip mm2 

2.    .zip オブジェクトをターゲットアカウントの Amazon Simple Storage Service (Amazon S3) バケットにアップロードします。

aws s3 cp mm2.zip s3://mytestbucket/

Apache Kafka と MSK Connect には MirrorMaker ライブラリが組み込まれているため、この機能のために追加の.jar ファイルを追加する必要はありません。前提条件として、コネクタの作成時に MSK Connect カスタムプラグインが存在している必要があります。そのため、参照用として空のカスタムプラグインを作成しておく必要があります。

3.    ターゲットアカウントで、.zip ファイルを使用して [カスタムプラグインを作成] します。カスタムプラグインの名前には mm2-connect-plugin を使用します。

MSK Connect コネクタを作成する

以下の手順を実行して、ターゲットアカウントにコネクタを作成します。

1.    Amazon MSK コンソールを開きます。

2.    ナビゲーションペインの [MSK Connect] で、[コネクタ]を選択します。 

3.    **[コネクタを作成]**を選択します。

4.    カスタムプラグインのリストで、作成したカスタムプラグインの横にあるプラグインを選択し、**[次へ]**を選択します。

5.    コネクタの名前を入力し、任意で説明を入力します。

6.    クラスターのリストから、ターゲットクラスターを選択します。

7.    次の設定をコピーして、コネクタ設定フィールドに貼り付けます。ユースケースに合わせて例を変更してください。

connector.class=org.apache.kafka.connect.mirror.MirrorSourceConnector  
tasks.max=1  

clusters=primary,replica  
source.cluster.alias=primary  
target.cluster.alias=replica  

topics=example.*  
replication.factor=2  
topic.creation.default.replication.factor=2  
topic.creation.default.partitions=2  
consumer.group.id=mm2-connector  

refresh.groups.interval.seconds=20  
refresh.topics.interval.seconds=20  

sync.topic.configs.interval.seconds=20  
sync.topic.acls.interval.seconds=20  

producer.enable.idempotence=true  

transforms=renameTopic  
transforms.renameTopic.type=org.apache.kafka.connect.transforms.RegexRouter  
transforms.renameTopic.regex=primary.(.*)  
transforms.renameTopic.replacement=$1  

key.converter=org.apache.kafka.connect.converters.ByteArrayConverter  
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter  

# Source cluster options  
source.cluster.bootstrap.servers=<Source_MSK_Bootstrap_Server_PLAINTEXT>  
source.cluster.security.protocol=PLAINTEXT  

# Destination cluster options  
target.cluster.bootstrap.servers=<Target_MSK_Bootstrap_Server_PLAINTEXT>  
target.cluster.security.protocol=PLAINTEXT

8.    コネクタの容量を設定します。

9.    [ワーカー設定] で **[MSK のデフォルト設定を使用]**を選択します。

10.    [アクセス許可] で、MSK Connect に必要な権限を付与する AWS Identity and Access Management (IAM) ロールを選択します。次に、**[次へ]**を選択します。

11.    [セキュリティ] ページの [暗号化 - 転送時] で、**[プレーンテキストトラフィック]を選択します。次に、[次へ]**を選択します。

12.    オプションで、[ログ] ページでログ配信を設定します。次に、**[次へ]**を選択します。

13.    [確認して作成] で **[コネクタを作成]**を選択します。

注: この設定では、ソースクラスターから各トピックを複製するために、MM2 はターゲットクラスターに 2 つのトピックを作成します。例えば、ソースクラスターに exampleTopic1 というトピックがある場合、MM2 はターゲットクラスターに primary.exampleTopic1exampleTopic1 というトピックを作成します。 メッセージは exampleTopic1 トピックにルーティングされます。

クライアントインスタンスを作成する

トピックを作成し、トピックからデータを生成または消費するためには、クライアントインスタンスを作成する必要があります。

1.    要件に応じてソースアカウントまたはターゲットアカウントのいずれかで、Amazon Elastic Compute Cloud (Amazon EC2) インスタンスを起動します。次に、インスタンスに接続します。

2.     次のコマンドを実行して、クライアントマシンに Java をインストールします。

sudo yum -y install java-11

3.    次のコマンドを実行して Apache Kafka をダウンロードします。

wget https://archive.apache.org/dist/kafka/2.8.1/kafka_2.12-2.8.1.tgz  

tar -xzf kafka_2.12-2.8.1.tgz

4.    ソースアカウントの Amazon MSK クラスター内にトピック exampletopic1 を作成します。

<path-to-your-kafka-installation>/bin/kafka-topics.sh --create --bootstrap-server <Source MSK cluster BootstrapServerString> --replication-factor 3 --partitions 1 --topic exampletopic1

5.    ソースアカウントのクラスター内にデータを生成します。

<path-to-your-kafka-installation>/bin/kafka-console-producer.sh --broker-list <Source MSK cluster BootstrapServerString> --topic exampletopic1
>message 1  
>message 2

6.    ターゲットアカウントのクラスター内のトピックを一覧表示します。

<path-to-your-kafka-installation>/bin/kafka-topics.sh --bootstrap-server <Target MSK cluster BootstrapServerString> --list

出力は次のようになります。

__amazon_msk_canary  
__amazon_msk_connect_configs_mm2-*****  
__amazon_msk_connect_offsets_mm2-*****  
__amazon_msk_connect_status_mm2-*****  
__consumer_offsets  
exampleTopic1  
primary.exampleTopic1

7.    ターゲットクラスターのデータを消費します。

<path-to-your-kafka-installation>/bin/kafka-console-consumer.sh --bootstrap-server <Target MSK cluster BootstrapServerString> --topic exampletopic1 --from-beginning
message 1  
message 2

関連情報

Apache Kafka の MirrorMaker を使ったクラスターの移行

AWS公式
AWS公式更新しました 1年前