Athena の AWS WAF ログを分析する方法を教えてください。
Amazon Athena の AWS WAF ログをクエリしたいです。
解決方法
Athena で AWS WAF ログをクエリするには、Amazon Simple Storage Service (Amazon S3) でデータベースとテーブルスキーマを作成します。次に、サンプルクエリを使用してログから必要な情報を取得します。
Amazon S3 にデータベースとテーブルを作成する
-
Amazon S3 バケットのウェブアクセスコントロールリスト (ウェブ ACL) ログ記録を有効にします。[ターゲットバケット] と [ターゲットプレフィックス] の値を、テキストファイルにコピーし、テーブルスキーマで使用します。これらの値は、Athena クエリ内で Amazon S3 ロケーションを指定します。
-
Athena コンソールを開きます。
注: 最初のクエリを実行する前に、クエリ結果の場所用の S3 バケットを作成します。 -
クエリエディターで、CREATE DATABASE を実行してデータベースを作成します。
CREATE DATABASE waf_logs_db
注: お使いの S3 バケットと同じ AWS リージョンにデータベースを作成するのがベストプラクティスです。
-
Athena で AWS WAF ログ用のテーブルスキーマを作成します。次の例は、パーティションプロジェクションを使用したテーブルテンプレートクエリの例です。
CREATE EXTERNAL TABLE `waf_logs`( `timestamp` bigint, `formatversion` int, `webaclid` string, `terminatingruleid` string, `terminatingruletype` string, `action` string, `terminatingrulematchdetails` array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > >, `httpsourcename` string, `httpsourceid` string, `rulegrouplist` array < struct < rulegroupid: string, terminatingrule: struct < ruleid: string, action: string, rulematchdetails: array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > > >, nonterminatingmatchingrules: array < struct < ruleid: string, action: string, overriddenaction: string, rulematchdetails: array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > > > >, excludedrules: string > >, `ratebasedrulelist` array < struct < ratebasedruleid: string, limitkey: string, maxrateallowed: int > >, `nonterminatingmatchingrules` array < struct < ruleid: string, action: string, rulematchdetails: array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > >, captcharesponse: struct < responsecode: string, solvetimestamp: string > > >, `requestheadersinserted` array < struct < name: string, value: string > >, `responsecodesent` string, `httprequest` struct < clientip: string, country: string, headers: array < struct < name: string, value: string > >, uri: string, args: string, httpversion: string, httpmethod: string, requestid: string >, `labels` array < struct < name: string > >, `captcharesponse` struct < responsecode: string, solvetimestamp: string, failureReason: string >, `challengeresponse` struct < responsecode: string, solvetimestamp: string, failureReason: string >, `ja3Fingerprint` string ) PARTITIONED BY ( `region` string, `date` string) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 's3://DOC-EXAMPLE-BUCKET/AWSLogs/accountID/WAFLogs/region/DOC-EXAMPLE-WEBACL/' TBLPROPERTIES( 'projection.enabled' = 'true', 'projection.region.type' = 'enum', 'projection.region.values' = 'us-east-1,us-west-2,eu-central-1,eu-west-1', 'projection.date.type' = 'date', 'projection.date.range' = '2021/01/01,NOW', 'projection.date.format' = 'yyyy/MM/dd', 'projection.date.interval' = '1', 'projection.date.interval.unit' = 'DAYS', 'storage.location.template' = 's3://DOC-EXAMPLE-BUCKET/AWSLogs/accountID/WAFLogs/${region}/DOC-EXAMPLE-WEBACL/${date}/')
注: storage.location.template、projection.region.values、projection.date.range、DOC-EXAMPLE-BUCKET、DOC-EXAMPLE-WEBACL は、実際の値で置き換えてください。
次の例は、パーティションプロジェクションを使用しないテーブルテンプレートクエリです。
CREATE EXTERNAL TABLE `waf_logs`( `timestamp` bigint, `formatversion` int, `webaclid` string, `terminatingruleid` string, `terminatingruletype` string, `action` string, `terminatingrulematchdetails` array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > >, `httpsourcename` string, `httpsourceid` string, `rulegrouplist` array < struct < rulegroupid: string, terminatingrule: struct < ruleid: string, action: string, rulematchdetails: array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > > >, nonterminatingmatchingrules: array < struct < ruleid: string, action: string, overriddenaction: string, rulematchdetails: array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > > > >, excludedrules: string > >, `ratebasedrulelist` array < struct < ratebasedruleid: string, limitkey: string, maxrateallowed: int > >, `nonterminatingmatchingrules` array < struct < ruleid: string, action: string, rulematchdetails: array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > >, captcharesponse: struct < responsecode: string, solvetimestamp: string > > >, `requestheadersinserted` array < struct < name: string, value: string > >, `responsecodesent` string, `httprequest` struct < clientip: string, country: string, headers: array < struct < name: string, value: string > >, uri: string, args: string, httpversion: string, httpmethod: string, requestid: string >, `labels` array < struct < name: string > >, `captcharesponse` struct < responsecode: string, solvetimestamp: string, failureReason: string >, `challengeresponse` struct < responsecode: string, solvetimestamp: string, failureReason: string >, `ja3Fingerprint` string ) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 's3://DOC-EXAMPLE-BUCKET/prefix/'
注: DOC-EXAMPLE-BUCKET は、お使いの S3 バケット名に置き換えてください。
-
ナビゲーションペインの [テーブル] で、[テーブルのプレビュー] を選択します。formatversion、webaclid、httpsourcename、ja3Fingerprint などの AWS WAF データが、テーブルに含まれていることを確認します。
Athena で AWS WAF ログを分析する
AWS WAF ログファイルを分析するには、次のサンプルクエリを使用します。独自のクエリを作成することもできます。
過去 10 日間に、除外ルールと一致した IP アドレスをカウントする
次のコマンドを実行します。
WITH test_dataset AS (SELECT * FROM waf_logs CROSS JOIN UNNEST(rulegrouplist) AS t(allrulegroups)) SELECT COUNT(*) AS count, "httprequest"."clientip", "allrulegroups"."excludedrules", "allrulegroups"."ruleGroupId" FROM test_dataset WHERE allrulegroups.excludedrules IS NOT NULL AND from_unixtime(timestamp/1000) > now() - interval '10' day GROUP BY "httprequest"."clientip", "allrulegroups"."ruleGroupId", "allrulegroups"."excludedrules" ORDER BY count DESC
注: 10 は、目的の期間で置き換えてください。
指定された日付範囲と IP アドレスのレコードを返す
次のコマンドを実行します。
SELECT * FROM waf_logs WHERE httprequest.clientip='192.168.0.0' AND "date" >= '2022/03/01' AND "date" < '2022/03/31'
192.168.0.0 はお使いの IP アドレスに、2022/03/01 および 2022/03/31 は実際に使用する日付に置き換えてください。
リクエストがブロックされた回数を特定の属性別にグループ化してカウントする
次のコマンドを実行します。
SELECT COUNT(*) AS count, webaclid, terminatingruleid, httprequest.clientip, httprequest.uri FROM waf_logs WHERE action='BLOCK' GROUP BY webaclid, terminatingruleid, httprequest.clientip, httprequest.uri ORDER BY count DESC LIMIT 100;
注: webaclid, terminatingruleid, httprequest.clientip, and httprequest.uri はそれぞれお使いの値で置き換え、100 は表示する結果の最大数で置き換えます。
カウントされたすべてのカスタムルールを一致回数でグループ化する
次のコマンドを実行します。
SELECT count(*) AS count, httpsourceid, httprequest.clientip, t.ruleid, t.action FROM "waf_logs" CROSS JOIN UNNEST(nonterminatingmatchingrules) AS t(t) WHERE action <> 'BLOCK' AND cardinality(nonTerminatingMatchingRules) > 0 GROUP BY t.ruleid, t.action, httpsourceid, httprequest.clientip ORDER BY "count" DESC Limit 50
フィルター IP アドレスを使用してクエリを実行する
次のコマンドを実行します。
SELECT * FROM "waf_logs_db"."waf_logs" where httprequest.clientip='192.168.0.0' limit 10;
注: 192.168.0.0 はお使いの IPアドレスに置き換え、10 は表示する結果の最大数に置き換えます。
リクエストにオリジンヘッダー、ブラウザのユーザーエージェント文字列、またはクッキーがない日付スタンプを選択する
次のコマンドを実行します。
SELECT datestamp, element_at(filter(httprequest.headers, headers -> lower(headers.name) = 'origin'), 1).value IS NULL AS MissingOrigin, httprequest.clientip, element_at(filter(httprequest.headers, headers -> lower(headers.name) = 'user-agent'), 1).value AS UserAgent, element_at(filter(httprequest.headers, headers -> lower(headers.name) = 'cookie'), 1).value AS Cookie from "waf_logs_db"."waf_logs" where webaclname = 'production-web' AND datestamp >= '2021/01/01' AND httprequest.uri = '/uri/path' AND httprequest.httpmethod = 'POST' order by 1 desc
注: production-web はお使いのウェブ ACL に、2021/01/01 は使用する日付に置き換えます。
特定の列でレコードをカウントしてソートする
次のクエリ例では、User-Agent 列と URI path 列に基づいてレコードをカウントし、ソートします。また、オリジンリクエストヘッダーのある特定の HTTP メソッドとリクエストも除外されます。
次のコマンドを実行します。
SELECT count() AS Count, element_at(filter(httprequest.headers, headers -> lower(headers.name) = 'user-agent'), 1).value AS useragent, httprequest.uri from "db"."waf_logs" where webaclname = 'production-web' AND httprequest.httpmethod != 'GET' AND httprequest.httpmethod != 'HEAD' AND element_at(filter(httprequest.headers, headers -> lower(headers.name) = 'origin'), 1).value IS NULL AND datestamp >= '2021/01/01' group by 2,3 ORDER BY 1 desc
注: production-web はお使いのウェブ ACL に、2021/01/01 は使用する日付に置き換えます。
詳細については、「 AWS WAF ログのクエリ」を参照してください。
関連情報
関連するコンテンツ
- 質問済み 24日前lg...
- AWS公式更新しました 2ヶ月前
- AWS公式更新しました 6ヶ月前
- AWS公式更新しました 2年前