如何分析 Athena 中的 AWS WAF 日志?
我想在 Amazon Athena 中查询 AWS WAF 日志。
解决方法
要在 Athena 中查询 AWS WAF 日志,请在 Amazon Simple Storage Service (Amazon S3) 中创建数据库和表架构。然后,使用示例查询从日志中获取所需的信息。
在 Amazon S3 中创建数据库和表
-
为您的 Amazon S3 存储桶开启 Web 访问控制列表 (Web ACL) 日志记录。将 Target bucket(目标存储桶)和 Target prefix(目标前缀)的值复制到文本文件中,以便在表架构中使用。这些值在您的 Athena 查询中指定 Amazon S3 的位置。
-
打开 Athena 控制台。
**注意:**在运行第一次查询之前,为您的查询结果位置创建 S3 存储桶。 -
在查询编辑器中,运行 CREATE DATABASE 以创建数据库:
CREATE DATABASE waf_logs_db
**注意:**最好是在与 S3 存储桶相同的 AWS 区域中创建数据库。
-
在 Athena 中为 AWS WAF 日志创建表架构。以下示例是带有分区投影的表模板查询:
CREATE EXTERNAL TABLE `waf_logs`( `timestamp` bigint, `formatversion` int, `webaclid` string, `terminatingruleid` string, `terminatingruletype` string, `action` string, `terminatingrulematchdetails` array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > >, `httpsourcename` string, `httpsourceid` string, `rulegrouplist` array < struct < rulegroupid: string, terminatingrule: struct < ruleid: string, action: string, rulematchdetails: array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > > >, nonterminatingmatchingrules: array < struct < ruleid: string, action: string, overriddenaction: string, rulematchdetails: array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > > > >, excludedrules: string > >, `ratebasedrulelist` array < struct < ratebasedruleid: string, limitkey: string, maxrateallowed: int > >, `nonterminatingmatchingrules` array < struct < ruleid: string, action: string, rulematchdetails: array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > >, captcharesponse: struct < responsecode: string, solvetimestamp: string > > >, `requestheadersinserted` array < struct < name: string, value: string > >, `responsecodesent` string, `httprequest` struct < clientip: string, country: string, headers: array < struct < name: string, value: string > >, uri: string, args: string, httpversion: string, httpmethod: string, requestid: string >, `labels` array < struct < name: string > >, `captcharesponse` struct < responsecode: string, solvetimestamp: string, failureReason: string >, `challengeresponse` struct < responsecode: string, solvetimestamp: string, failureReason: string >, `ja3Fingerprint` string ) PARTITIONED BY ( `region` string, `date` string) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 's3://DOC-EXAMPLE-BUCKET/AWSLogs/accountID/WAFLogs/region/DOC-EXAMPLE-WEBACL/' TBLPROPERTIES( 'projection.enabled' = 'true', 'projection.region.type' = 'enum', 'projection.region.values' = 'us-east-1,us-west-2,eu-central-1,eu-west-1', 'projection.date.type' = 'date', 'projection.date.range' = '2021/01/01,NOW', 'projection.date.format' = 'yyyy/MM/dd', 'projection.date.interval' = '1', 'projection.date.interval.unit' = 'DAYS', 'storage.location.template' = 's3://DOC-EXAMPLE-BUCKET/AWSLogs/accountID/WAFLogs/${region}/DOC-EXAMPLE-WEBACL/${date}/')
**注意:**将 storage.location.template、projection.region.values、projection.date.range、DOC-EXAMPLE-BUCKET 和 DOC-EXAMPLE-WEBACL 替换为您的值。
以下示例是没有分区投影的表模板查询:
CREATE EXTERNAL TABLE `waf_logs`( `timestamp` bigint, `formatversion` int, `webaclid` string, `terminatingruleid` string, `terminatingruletype` string, `action` string, `terminatingrulematchdetails` array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > >, `httpsourcename` string, `httpsourceid` string, `rulegrouplist` array < struct < rulegroupid: string, terminatingrule: struct < ruleid: string, action: string, rulematchdetails: array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > > >, nonterminatingmatchingrules: array < struct < ruleid: string, action: string, overriddenaction: string, rulematchdetails: array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > > > >, excludedrules: string > >, `ratebasedrulelist` array < struct < ratebasedruleid: string, limitkey: string, maxrateallowed: int > >, `nonterminatingmatchingrules` array < struct < ruleid: string, action: string, rulematchdetails: array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > >, captcharesponse: struct < responsecode: string, solvetimestamp: string > > >, `requestheadersinserted` array < struct < name: string, value: string > >, `responsecodesent` string, `httprequest` struct < clientip: string, country: string, headers: array < struct < name: string, value: string > >, uri: string, args: string, httpversion: string, httpmethod: string, requestid: string >, `labels` array < struct < name: string > >, `captcharesponse` struct < responsecode: string, solvetimestamp: string, failureReason: string >, `challengeresponse` struct < responsecode: string, solvetimestamp: string, failureReason: string >, `ja3Fingerprint` string ) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 's3://DOC-EXAMPLE-BUCKET/prefix/'
**注意:**将 DOC-EXAMPLE-BUCKET 替换为您的 S3 存储桶名称。
-
在导航窗格的 Tables(表)下,选择 Preview table(预览表)。确认 AWS WAF 数据(例如 formatversion、webaclid、httpsourcename 和 ja3Fingerprint)都在表中。
在 Athena 中分析您的 AWS WAF 日志
要分析您的 AWS WAF 日志文件,请使用以下示例查询。您也可以创建自己的查询。
统计过去 10 天内符合排除规则的匹配的 IP 地址
运行以下命令:
WITH test_dataset AS (SELECT * FROM waf_logs CROSS JOIN UNNEST(rulegrouplist) AS t(allrulegroups)) SELECT COUNT(*) AS count, "httprequest"."clientip", "allrulegroups"."excludedrules", "allrulegroups"."ruleGroupId" FROM test_dataset WHERE allrulegroups.excludedrules IS NOT NULL AND from_unixtime(timestamp/1000) > now() - interval '10' day GROUP BY "httprequest"."clientip", "allrulegroups"."ruleGroupId", "allrulegroups"."excludedrules" ORDER BY count DESC
**注意:**将 10 替换为您的时间范围。
返回指定日期范围和 IP 地址的记录
运行以下命令:
SELECT * FROM waf_logs WHERE httprequest.clientip='192.168.0.0' AND "date" >= '2022/03/01' AND "date" < '2022/03/31'
将 192.168.0.0 替换为您的 IP 地址,将 2022/03/01 和 2022/03/31 替换为您的日期。
按特定属性分组统计请求被屏蔽的次数
运行以下命令:
SELECT COUNT(*) AS count, webaclid, terminatingruleid, httprequest.clientip, httprequest.uri FROM waf_logs WHERE action='BLOCK' GROUP BY webaclid, terminatingruleid, httprequest.clientip, httprequest.uri ORDER BY count DESC LIMIT 100;
**注意:**将 webaclid、terminatingruleid、httprequest.clientip 和 httprequest.uri 替换为您的值,将 100 替换为您想要的最大结果数。
按匹配次数对所有计数的自定义规则进行分组
运行以下命令:
SELECT count(*) AS count, httpsourceid, httprequest.clientip, t.ruleid, t.action FROM "waf_logs" CROSS JOIN UNNEST(nonterminatingmatchingrules) AS t(t) WHERE action <> 'BLOCK' AND cardinality(nonTerminatingMatchingRules) > 0 GROUP BY t.ruleid, t.action, httpsourceid, httprequest.clientip ORDER BY "count" DESC Limit 50
使用筛选器 IP 地址运行查询
运行以下命令:
SELECT * FROM "waf_logs_db"."waf_logs" where httprequest.clientip='192.168.0.0' limit 10;
**注意:**将 192.168.0.0 替换为您的 IP 地址,将 10 替换为您想要的最大结果数。
选择请求没有原始标头、浏览器用户代理字符串或 Cookie 的日期戳
运行以下命令:
SELECT datestamp, element_at(filter(httprequest.headers, headers -> lower(headers.name) = 'origin'), 1).value IS NULL AS MissingOrigin, httprequest.clientip, element_at(filter(httprequest.headers, headers -> lower(headers.name) = 'user-agent'), 1).value AS UserAgent, element_at(filter(httprequest.headers, headers -> lower(headers.name) = 'cookie'), 1).value AS Cookie from "waf_logs_db"."waf_logs" where webaclname = 'production-web' AND datestamp >= '2021/01/01' AND httprequest.uri = '/uri/path' AND httprequest.httpmethod = 'POST' order by 1 desc
**注意:**将 production-web 替换为您的 Web ACL,将 2021/01/01 替换为您的日期。
按特定列对记录进行计数和排序
以下示例查询根据 User-Agent 和 URI path 列对记录进行计数和排序。它还排除了特定 HTTP 方法和带有原始请求标头的请求。
运行以下命令:
SELECT count() AS Count, element_at(filter(httprequest.headers, headers -> lower(headers.name) = 'user-agent'), 1).value AS useragent, httprequest.uri from "db"."waf_logs" where webaclname = 'production-web' AND httprequest.httpmethod != 'GET' AND httprequest.httpmethod != 'HEAD' AND element_at(filter(httprequest.headers, headers -> lower(headers.name) = 'origin'), 1).value IS NULL AND datestamp >= '2021/01/01' group by 2,3 ORDER BY 1 desc
**注意:**将 production-web 替换为您的 Web ACL,将 2021/01/01 替换为您的日期。
有关详细信息,请参阅查询 AWS WAF 日志。
相关信息
相关内容
- AWS 官方已更新 4 年前
- AWS 官方已更新 8 个月前