Knowledge Center Monthly Newsletter - March 2025
Stay up to date with the latest from the Knowledge Center. See all new and updated Knowledge Center articles published in the last month and re:Post’s top contributors.
Kinesis Data Firehose から Amazon OpenSearch Service へのクロスアカウントストリーミングをセットアップするにはどうすればよいですか?
別のアカウントにある Amazon OpenSearch Service クラスターにデータを送信する Amazon Kinesis Data Firehose ストリームをセットアップしたいと考えています。
簡単な説明
Kinesis Data Firehose とその依存関係 (Amazon Simple Storage Service (Amazon S3) や Amazon CloudWatch など) を設定して、異なるアカウント間でストリーミングを行えるようにします。ストリーミングデータ配信は、きめ細かなアクセス制御 (FGAC) が有効になっているかどうかにかかわらず、パブリックにアクセスできる OpenSearch Service クラスターに対して機能します。
OpenSearch Service クラスターにデータを送信するように Kinesis Data Firehose ストリームを設定するには、以下の手順を実行します。
- アカウント A で Amazon S3 バケットを作成します。
- アカウント A で CloudWatch ロググループとログストリームを作成します。
- アカウント A で Kinesis Data Firehose ロールとポリシーを作成します。
- アカウント A で Kinesis Data Firehose ロールがデータをストリーミングする先のアカウント B でパブリックにアクセス可能な OpenSearch Service クラスターを作成します。
- (オプション) FGAC が有効になっている場合は、OpenSearch Dashboards にログインし、ロールマッピングを追加します。
- アカウント A で Kinesis Data Firehose ロールの AWS Identity and Access Management (IAM) ロールポリシーを更新して、アカウント B にデータを送信します。
- アカウント A で Kinesis Data Firehose ストリームを作成します。
- OpenSearch Service クラスターへのクロスアカウントストリーミングをテストします。
解決策
アカウント A で Amazon S3 バケットを作成する
アカウント A で S3 バケットを作成します。Amazon S3 バケットは Amazon リソースネーム (ARN) を生成します。
**注:**この Amazon S3 バケットからのレコードを取得および保存するためのアクセス許可を、後ほど Kinesis Data Firehose に付与する際に、この ARN を完全な形で使用します。
アカウント A で CloudWatch ロググループとログストリームを作成する
次の手順を実行して、CloudWatch ロググループを作成します。
- CloudWatch コンソールを開きます。
- ナビゲーションペインで [ログ] を選択し、次に [ロググループ] を選択します。
- [ロググループの作成] を選択します。
- [ログストリーム名] に名前を入力します。
- [ロググループの作成] ボタンをクリックして、新しいロググループを保存します。
- 新しく作成したロググループを検索して選択します。
以下のステップを実行して、Amazon CloudWatch ログストリームを作成します。
- [ログストリームの作成] を選択します。
- [ログストリーム名] に名前を入力します。
- [ログストリームの作成] を選択します。
**重要:**CloudWatch ロググループ名と CloudWatch ログストリーム名は、Kinesis Data Firehose ロールポリシーを作成する際に必要となります。
アカウント A で Kinesis Data Firehose ロールとポリシーを作成する
- AWS Identity and Access Management (IAM) コンソールを開きます。
- Kinesis Data Firehose に以下の操作を許可する IAM ポリシーを作成します。
ストリームログを CloudWatch に保存する
Amazon S3 に記録する
OpenSearch Service クラスターにデータをストリームする
例:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "s3:AbortMultipartUpload", "s3:GetBucketLocation", "s3:GetObject", "s3:ListBucket", "s3:ListBucketMultipartUploads", "s3:PutObject", "s3:PutObjectAcl" ], "Resource": [ "<Bucket ARN>", "<Bucket ARN>/*" ] }, { "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:<region>:<account-id>:log-group:/aws/kinesisfirehose/<Firehose Name>:log-stream:*" ] } ] }
**注:**後で OpenSearch Service クラスターポリシーにストリーミングするためのアクセス許可を追加します。ただし、最初にアカウント B でクラスターを作成する必要があります。
- 作成したポリシーを保存します。
- [ロールを作成] をクリックします。
- 作成したポリシーを Kinesis Data Firehose ロールに追加します。
アカウント A で Kinesis Data Firehose ロールがデータをストリーミングできるよう、アカウント B でパブリックにアクセス可能な OpenSearch Service クラスターを作成する
- アカウント B でパブリックにアクセス可能な OpenSearch Service クラスターを作成します。
- OpenSearch Service ドメイン ARN を記録します。後の手順で ARN を使用します。
- クラスターのセキュリティを設定します。
**重要:**アカウント A で Kinesis Data Firehose ロールが OpenSearch Service クラスターにストリーミングできるようにするには、OpenSearch Service のセキュリティの設定を行う必要があります。
次の手順を実行して、セキュリティの設定を行います。
-
OpenSearch Serviceで、[アクセスポリシー] に移動します。
-
JSON 定義のアクセスポリシーを選択します。ポリシーには、次のアクセス許可が必要です。
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "AWS": "*" }, "Action": "es:*", "Resource": "<ES Domain ARN in Account B>/*", "Condition": { "IpAddress": { "aws:SourceIp": "<Your IP Address for OpenSearch Dashboards access>" } } }, { "Effect": "Allow", "Principal": { "AWS": "<Firehose Role ARN in Account A>" }, "Action": [ "es:ESHttpPost", "es:ESHttpPut" ], "Resource": [ "<ES Domain ARN in Account B>", "<ES Domain ARN in Account B>/*" ] }, { "Effect": "Allow", "Principal": { "AWS": "<Firehose Role ARN in Account A>" }, "Action": "es:ESHttpGet", "Resource": [ "<ES Domain ARN in Account B>/_all/_settings", "<ES Domain ARN in Account B>/_cluster/stats", "<ES Domain ARN in Account B>/index-name*/_mapping/type-name", "<ES Domain ARN in Account B>/roletest*/_mapping/roletest", "<ES Domain ARN in Account B>/_nodes", "<ES Domain ARN in Account B>/_nodes/stats", "<ES Domain ARN in Account B>/_nodes/*/stats", "<ES Domain ARN in Account B>/_stats", "<ES Domain ARN in Account B>/index-name*/_stats", "<ES Domain ARN in Account B>/roletest*/_stats" ] } ] }
OpenSearch Service ポリシーにおけるアクセス許可の詳細については、「サービス送信先への OpenSearch クロスアカウント配信」を参照してください。
-
(オプション) クラスターで FGAC が有効になっている場合は、OpenSearch Dashboards にログインし、ロールマッピングを追加します。ロールマッピングにより、Kinesis Data Firehose ロールが OpenSearch Service にリクエストを送信できるようになります。
以下の手順を実行して、OpenSearch Dashboards にログインしてロールマッピングを追加します。
- ダッシュボードを開きます。
- [セキュリティ] タブを選択します。
- [ロール] を選択します。
- [all_access] ロールを選択します。
- [マッピングされたユーザー] タブを選択します。
- [マッピングを管理] を選択します。
- [バックエンドロール] セクションで、Kinesis Data Firehose ロールを入力します。
- [マップ] を選択します。
アカウント A で Kinesis Data Firehose ロールの IAM ロールポリシーを更新して、アカウント B にデータを送信する
アカウント A の Kinesis Data Firehose ロールからアカウント B の OpenSearch Service クラスターにデータを送信するには、次のようにして Kinesis Data Firehose ポリシーを更新します。
例:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "s3:AbortMultipartUpload", "s3:GetBucketLocation", "s3:GetObject", "s3:ListBucket", "s3:ListBucketMultipartUploads", "s3:PutObject", "s3:PutObjectAcl" ], "Resource": [ "<Bucket ARN>", "<Bucket ARN>/*" ] }, { "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:<region>:<account-id>:log-group:/aws/kinesisfirehose/<Firehose Name>:log-stream:*" ] }, { "Effect": "Allow", "Action": [ "es:ESHttpPost", "es:ESHttpPut", "es:DescribeDomain", "es:DescribeDomains", "es:DescribeDomainConfig" ], "Resource": [ "<Domain ARN in Account B>", "<Domain ARN in Account B>/*" ] }, { "Effect": "Allow", "Action": [ "es:ESHttpGet" ], "Resource": [ "<Domain ARN in Account B>/_all/_settings", "<Domain ARN in Account B>/_cluster/stats", "<Domain ARN in Account B>/index-name*/_mapping/superstore", "<Domain ARN in Account B>/_nodes", "<Domain ARN in Account B>/_nodes/stats", "<Domain ARN in Account B>/_nodes/*/stats", "<Domain ARN in Account B>/_stats", "<Domain ARN in Account B>/index-name*/_stats" ] } ] }
詳細については、「Amazon Data Firehose にパブリック OpenSearch サービスの送信先へのアクセス権を付与する」を参照してください。
アカウント A で Kinesis Data Firehose ストリームを作成する
OpenSearch Service クラスターへのクロスアカウントアクセスを用いて Kinesis Data Firehose ストリームを作成するには、AWS コマンドラインインターフェイス (AWS CLI) を使用して設定します。
AWS CLI が最新であることを確認してください。
aws --version
注: AWS CLI コマンドの実行時にエラーが発生する場合は、「AWS CLI エラーのトラブルシューティング」を参照してください。また、AWS CLI の最新バージョンを使用していることを確認してください。
次の内容を含む input.json という名前のファイルを作成します。
{ "DeliveryStreamName": "<Firehose Name>", "DeliveryStreamType": "DirectPut", "ElasticsearchDestinationConfiguration": { "RoleARN": "", "ClusterEndpoint": "", "IndexName": "local", "TypeName": "TypeName", "IndexRotationPeriod": "OneDay", "BufferingHints": { "IntervalInSeconds": 60, "SizeInMBs": 50 }, "RetryOptions": { "DurationInSeconds": 60 }, "S3BackupMode": "FailedDocumentsOnly", "S3Configuration": { "RoleARN": "", "BucketARN": "", "Prefix": "", "BufferingHints": { "SizeInMBs": 128, "IntervalInSeconds": 128 }, "CompressionFormat": "UNCOMPRESSED", "CloudWatchLoggingOptions": { "Enabled": true, "LogGroupName": "/aws/kinesisfirehose/<Firehose Name>", "LogStreamName": "S3Delivery" } }, "CloudWatchLoggingOptions": { "Enabled": true, "LogGroupName": "/aws/kinesisfirehose/<Firehose Name>", "LogStreamName": "ElasticsearchDelivery" } } }
ClusterEndpoint 属性フィールドに、エンドポイントの値が正しく入力されていることを確認します。
**注:**Elasticsearch バージョン 7.x ではタイプが非推奨となっているため、input.json ファイルから TypeName 属性を削除する必要があります。
その後、input.json ファイルが置かれているのと同じディレクトリで、次のコマンドを実行します。
aws firehose create-delivery-stream --cli-input-json file://input.json
このコマンド構文は、アカウント B の OpenSearch Service クラスターへの送信先を使用して、アカウント A で Kinesis Data Firehose ストリームを作成します。
OpenSearch Service クラスターへのクロスアカウントストリーミングをテストする
Kinesis Data Generator (KDG) を使用して、アカウント A で Kinesis Data Firehose ストリームにレコードをストリーミングします。
KDG は毎秒大量のレコードを生成します。この生産性レベルにより、レコード構造のマッピングについて正しく判断するのに十分なデータポイントを OpenSearch Service に送ることができます。
Kinesis Data Generator で使用されるテンプレート構造は次のとおりです。
{ "device_id": {{random.number(5)}}, "device_owner": "{{name.firstName}} {{name.lastName}}", "temperature": {{random.number( { "min":10, "max":150 } )}}, "timestamp": "{{date.now("DD/MMM/YYYY:HH:mm:ss Z")}}" }
クロスアカウントストリーミングが成功したかどうかは、クラスターの [インデックス] タブの下のインデックスエントリで確認できます。ここでは、現在の日付で接頭辞「local」を使用してインデックス名があるかどうかをチェックします。これは、OpenSearch Dashboards にレコードが存在するかどうかをチェックしても確認できます。
注: OpenSearch Service が正しいマッピングについて判断するには数分かかります。
関連情報

関連するコンテンツ
- 質問済み 4ヶ月前lg...
- 質問済み 9ヶ月前lg...
- AWS公式更新しました 3年前