Help us improve the AWS re:Post Knowledge Center by sharing your feedback in a brief survey. Your input can influence how we create and update our content to better support your AWS journey.
如何分析 Athena 中的 AWS WAF 日志?
我想在 Amazon Athena 中查询 AWS WAF 日志。
解决方法
要在 Athena 中查询 AWS WAF 日志,请在 Amazon Simple Storage Service (Amazon S3) 中创建数据库和表架构。然后,使用示例查询从日志中获取所需的信息。
在 Amazon S3 中创建数据库和表
-
为您的 Amazon S3 存储桶开启 Web 访问控制列表 (Web ACL) 日志记录。将 Target bucket(目标存储桶)和 Target prefix(目标前缀)的值复制到文本文件中,以便在表架构中使用。这些值在您的 Athena 查询中指定 Amazon S3 的位置。
-
打开 Athena 控制台。
**注意:**在运行第一次查询之前,为您的查询结果位置创建 S3 存储桶。 -
在查询编辑器中,运行 CREATE DATABASE 以创建数据库:
CREATE DATABASE waf_logs_db**注意:**最好是在与 S3 存储桶相同的 AWS 区域中创建数据库。
-
在 Athena 中为 AWS WAF 日志创建表架构。以下示例是带有分区投影的表模板查询:
CREATE EXTERNAL TABLE `waf_logs`( `timestamp` bigint, `formatversion` int, `webaclid` string, `terminatingruleid` string, `terminatingruletype` string, `action` string, `terminatingrulematchdetails` array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > >, `httpsourcename` string, `httpsourceid` string, `rulegrouplist` array < struct < rulegroupid: string, terminatingrule: struct < ruleid: string, action: string, rulematchdetails: array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > > >, nonterminatingmatchingrules: array < struct < ruleid: string, action: string, overriddenaction: string, rulematchdetails: array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > > > >, excludedrules: string > >, `ratebasedrulelist` array < struct < ratebasedruleid: string, limitkey: string, maxrateallowed: int > >, `nonterminatingmatchingrules` array < struct < ruleid: string, action: string, rulematchdetails: array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > >, captcharesponse: struct < responsecode: string, solvetimestamp: string > > >, `requestheadersinserted` array < struct < name: string, value: string > >, `responsecodesent` string, `httprequest` struct < clientip: string, country: string, headers: array < struct < name: string, value: string > >, uri: string, args: string, httpversion: string, httpmethod: string, requestid: string >, `labels` array < struct < name: string > >, `captcharesponse` struct < responsecode: string, solvetimestamp: string, failureReason: string >, `challengeresponse` struct < responsecode: string, solvetimestamp: string, failureReason: string >, `ja3Fingerprint` string ) PARTITIONED BY ( `region` string, `date` string) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 's3://DOC-EXAMPLE-BUCKET/AWSLogs/accountID/WAFLogs/region/DOC-EXAMPLE-WEBACL/' TBLPROPERTIES( 'projection.enabled' = 'true', 'projection.region.type' = 'enum', 'projection.region.values' = 'us-east-1,us-west-2,eu-central-1,eu-west-1', 'projection.date.type' = 'date', 'projection.date.range' = '2021/01/01,NOW', 'projection.date.format' = 'yyyy/MM/dd', 'projection.date.interval' = '1', 'projection.date.interval.unit' = 'DAYS', 'storage.location.template' = 's3://DOC-EXAMPLE-BUCKET/AWSLogs/accountID/WAFLogs/${region}/DOC-EXAMPLE-WEBACL/${date}/')**注意:**将 storage.location.template、projection.region.values、projection.date.range、DOC-EXAMPLE-BUCKET 和 DOC-EXAMPLE-WEBACL 替换为您的值。
以下示例是没有分区投影的表模板查询:
CREATE EXTERNAL TABLE `waf_logs`( `timestamp` bigint, `formatversion` int, `webaclid` string, `terminatingruleid` string, `terminatingruletype` string, `action` string, `terminatingrulematchdetails` array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > >, `httpsourcename` string, `httpsourceid` string, `rulegrouplist` array < struct < rulegroupid: string, terminatingrule: struct < ruleid: string, action: string, rulematchdetails: array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > > >, nonterminatingmatchingrules: array < struct < ruleid: string, action: string, overriddenaction: string, rulematchdetails: array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > > > >, excludedrules: string > >, `ratebasedrulelist` array < struct < ratebasedruleid: string, limitkey: string, maxrateallowed: int > >, `nonterminatingmatchingrules` array < struct < ruleid: string, action: string, rulematchdetails: array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > >, captcharesponse: struct < responsecode: string, solvetimestamp: string > > >, `requestheadersinserted` array < struct < name: string, value: string > >, `responsecodesent` string, `httprequest` struct < clientip: string, country: string, headers: array < struct < name: string, value: string > >, uri: string, args: string, httpversion: string, httpmethod: string, requestid: string >, `labels` array < struct < name: string > >, `captcharesponse` struct < responsecode: string, solvetimestamp: string, failureReason: string >, `challengeresponse` struct < responsecode: string, solvetimestamp: string, failureReason: string >, `ja3Fingerprint` string ) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 's3://DOC-EXAMPLE-BUCKET/prefix/'**注意:**将 DOC-EXAMPLE-BUCKET 替换为您的 S3 存储桶名称。
-
在导航窗格的 Tables(表)下,选择 Preview table(预览表)。确认 AWS WAF 数据(例如 formatversion、webaclid、httpsourcename 和 ja3Fingerprint)都在表中。
在 Athena 中分析您的 AWS WAF 日志
要分析您的 AWS WAF 日志文件,请使用以下示例查询。您也可以创建自己的查询。
统计过去 10 天内符合排除规则的匹配的 IP 地址
运行以下命令:
WITH test_dataset AS (SELECT * FROM waf_logs CROSS JOIN UNNEST(rulegrouplist) AS t(allrulegroups)) SELECT COUNT(*) AS count, "httprequest"."clientip", "allrulegroups"."excludedrules", "allrulegroups"."ruleGroupId" FROM test_dataset WHERE allrulegroups.excludedrules IS NOT NULL AND from_unixtime(timestamp/1000) > now() - interval '10' day GROUP BY "httprequest"."clientip", "allrulegroups"."ruleGroupId", "allrulegroups"."excludedrules" ORDER BY count DESC
**注意:**将 10 替换为您的时间范围。
返回指定日期范围和 IP 地址的记录
运行以下命令:
SELECT * FROM waf_logs WHERE httprequest.clientip='192.168.0.0' AND "date" >= '2022/03/01' AND "date" < '2022/03/31'
将 192.168.0.0 替换为您的 IP 地址,将 2022/03/01 和 2022/03/31 替换为您的日期。
按特定属性分组统计请求被屏蔽的次数
运行以下命令:
SELECT COUNT(*) AS count, webaclid, terminatingruleid, httprequest.clientip, httprequest.uri FROM waf_logs WHERE action='BLOCK' GROUP BY webaclid, terminatingruleid, httprequest.clientip, httprequest.uri ORDER BY count DESC LIMIT 100;
**注意:**将 webaclid、terminatingruleid、httprequest.clientip 和 httprequest.uri 替换为您的值,将 100 替换为您想要的最大结果数。
按匹配次数对所有计数的自定义规则进行分组
运行以下命令:
SELECT count(*) AS count, httpsourceid, httprequest.clientip, t.ruleid, t.action FROM "waf_logs" CROSS JOIN UNNEST(nonterminatingmatchingrules) AS t(t) WHERE action <> 'BLOCK' AND cardinality(nonTerminatingMatchingRules) > 0 GROUP BY t.ruleid, t.action, httpsourceid, httprequest.clientip ORDER BY "count" DESC Limit 50
使用筛选器 IP 地址运行查询
运行以下命令:
SELECT * FROM "waf_logs_db"."waf_logs" where httprequest.clientip='192.168.0.0' limit 10;
**注意:**将 192.168.0.0 替换为您的 IP 地址,将 10 替换为您想要的最大结果数。
选择请求没有原始标头、浏览器用户代理字符串或 Cookie 的日期戳
运行以下命令:
SELECT datestamp, element_at(filter(httprequest.headers, headers -> lower(headers.name) = 'origin'), 1).value IS NULL AS MissingOrigin, httprequest.clientip, element_at(filter(httprequest.headers, headers -> lower(headers.name) = 'user-agent'), 1).value AS UserAgent, element_at(filter(httprequest.headers, headers -> lower(headers.name) = 'cookie'), 1).value AS Cookie from "waf_logs_db"."waf_logs" where webaclname = 'production-web' AND datestamp >= '2021/01/01' AND httprequest.uri = '/uri/path' AND httprequest.httpmethod = 'POST' order by 1 desc
**注意:**将 production-web 替换为您的 Web ACL,将 2021/01/01 替换为您的日期。
按特定列对记录进行计数和排序
以下示例查询根据 User-Agent 和 URI path 列对记录进行计数和排序。它还排除了特定 HTTP 方法和带有原始请求标头的请求。
运行以下命令:
SELECT count() AS Count, element_at(filter(httprequest.headers, headers -> lower(headers.name) = 'user-agent'), 1).value AS useragent, httprequest.uri from "db"."waf_logs" where webaclname = 'production-web' AND httprequest.httpmethod != 'GET' AND httprequest.httpmethod != 'HEAD' AND element_at(filter(httprequest.headers, headers -> lower(headers.name) = 'origin'), 1).value IS NULL AND datestamp >= '2021/01/01' group by 2,3 ORDER BY 1 desc
**注意:**将 production-web 替换为您的 Web ACL,将 2021/01/01 替换为您的日期。
有关详细信息,请参阅查询 AWS WAF 日志。
相关信息
- 语言
- 中文 (简体)

相关内容
AWS 官方已更新 9 个月前