跳至内容

如何使用 Athena 分析 Amazon VPC 流日志?

3 分钟阅读
0

我想使用 Amazon Athena 分析我的 Amazon Virtual Private Cloud (Amazon VPC) 流日志。

简短描述

您可以使用 Athena 控制台查询编辑器创建数据库、为 VPC 流日志创建表以及运行示例查询。使用流日志分析网络流量模式,并识别 Amazon VPC 网络中的威胁和风险。

解决方法

使用 Athena 控制台查询编辑器创建数据库

完成以下步骤:

  1. 打开 Athena 控制台查询编辑器
  2. 运行以下命令:
    CREATE DATABASE test_db_vpclogs;
    **注意:**请将 test_db_vpclogs 替换为您的数据库的名称。

**重要事项:**最佳做法是在与存储流日志的 Amazon Simple Storage Service (Amazon S3) 存储桶相同的 AWS 区域中创建数据库。

为数据库中的流日志创建表

完成以下步骤:

  1. Athena 控制台查询编辑器中,运行以下命令:
    CREATE EXTERNAL TABLE `vpcflow_logs`(
     `version` int,
     `resource_type` string,
     `account_id` string,
     `tgw_id` string,
     `tgw_attachment_id` string,
     `tgw_src_vpc_account_id` string,
     `tgw_dst_vpc_account_id` string,
     `tgw_src_vpc_id` string,
     `tgw_dst_vpc_id` string,
     `tgw_src_subnet_id` string,
     `tgw_dst_subnet_id` string,
     `tgw_src_eni` string,
     `tgw_dst_eni` string,
     `tgw_src_az_id` string,
     `tgw_dst_az_id` string,
     `tgw_pair_attachment_id` string,
     `srcaddr` string,
     `dstaddr` string,
     `srcport` int,
     `dstport` int,
     `protocol` bigint,
     `packets` bigint,
     `bytes` bigint,
     `start` bigint,
     `end` bigint,
     `log_status` string,
     `type` string,
     `packets_lost_no_route` bigint,
     `packets_lost_blackhole` bigint,
     `packets_lost_mtu_exceeded` bigint,
     `packets_lost_ttl_expired` bigint,
     `tcp_flags` int,
     `region` string,
     `flow_direction` string,
     `pkt_src_aws_service` string,
     `pkt_dst_aws_service` string)
    PARTITIONED BY (
     `aws_region` string,
     `log_time` string)
    ROW FORMAT DELIMITED
     FIELDS TERMINATED BY ' '
    STORED AS INPUTFORMAT
     'org.apache.hadoop.mapred.TextInputFormat'
    OUTPUTFORMAT
     'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
    LOCATION
     's3://AWS_S3_BUCKET/awsexampleprefix/AWSLogs/AWS_ACCOUNT_NUMBER/vpcflowlogs/'
    TBLPROPERTIES (
     'projection.aws_region.type'='enum',
     'projection.aws_region.values'='us-east-1,us-west-2,ap-southeast-2',
     'projection.log_time.format'='yyyy/MM/dd',
     'projection.log_time.range'='2025/01/01,NOW',
     'projection.log_time.type'='date',
     'projection.enabled'='true',
     'skip.header.line.count'='1',
     'storage.location.template'='s3://AWS_S3_BUCKET/awsexampleprefix/AWSLogs/AWS_ACCOUNT_NUMBER/vpcflowlogs/${aws_region}/${log_time}')
    **注意:**请将 test_table_vpclogs 替换为您表的名称。

修改 LOCATION 参数,以指向包含您的日志数据的 Amazon S3 存储桶。根据数据中最早的可用日志设置 projection.log_time.range 的第一个值。例如,如果您的日志从 2023 年 3 月 1 日开始,则请将时间范围设置为 '2025/03/01,NOW'。根据日志的实际开始日期修改该值,以实现准确的数据投影。前面的命令使用分区投影来创建表、对表进行分区并自动填充分区。如果在 Amazon S3 中不存在投影分区,则 Athena 仍会对该分区进行投影。最佳做法是在查询中使用分区的属性。

在表上运行 SQL 语句以获取流日志

使用 Athena 控制台查询编辑器在表上运行 SQL 语句。您可以保存查询、查看之前的查询,或以 .csv 格式下载查询结果。

查询示例

**注意:**在以下示例查询中,请将 test_table_vpclogs 替换为您表的名称。根据您的用例修改列值和其他变量。

要查看指定时间段内按时间顺序排列的前 100 个流日志条目,请运行以下查询:

SELECT *  FROM test_table_vpclogs
 WHERE day >= '2021/02/01' AND day < '2021/02/28'
 ORDER BY day ASC
 LIMIT 100;

要查看在指定时间段内收到前 10 个 HTTP 数据包的服务器,请运行以下查询:

SELECT SUM(packets) AS packetcount,
        dstaddr
FROM test_table_vpclogs
WHERE dstport = 443
  AND day >= '2021/03/01'
  AND day < '2021/03/31'
GROUP BY dstaddr
ORDER BY packetcount DESC
LIMIT 10;

**注意:**上述查询会计算在 HTTPS 端口 443 上收到的数据包数量,并按照目标 IP 地址对其进行分组。然后,该查询会返回前一周的前 10 个条目。

要查看指定时间范围内创建的日志,请运行以下查询:

SELECT interface_id,       
       srcaddr,
       action,
       protocol,
       to_iso8601(from_unixtime(start)) AS start_time,
       to_iso8601(from_unixtime("end")) AS end_time
FROM test_table_vpclogs
WHERE DAY >= '2021/04/01'
  AND DAY < '2021/04/30';

要查看指定时间范围内特定源 IP 地址的流日志,请运行以下查询:

SELECT *FROM test_table_vpclogs
WHERE srcaddr = '10.117.1.22'
  AND day >= '2021/02/01'
  AND day < '2021/02/28';

要列出指定时间范围内被拒绝的 TCP 连接,请运行以下查询:

SELECT day,       
interface_id,
       srcaddr,
       action,
       protocol
FROM test_table_vpclogs
WHERE action = 'REJECT'
    AND protocol = 6
    AND day >= '2021/02/01' AND day < '2021/02/28'
LIMIT 10;

要查看以 10.117 开头的 IP 地址范围的流日志,请运行以下查询:

SELECT *FROM test_table_vpclogs
WHERE split_part(srcaddr,'.', 1)='10'
  AND split_part(srcaddr,'.', 2) ='117'

要查看某个时间范围内特定目标 IP 地址的流日志,请运行以下查询:

SELECT *FROM test_table_vpclogs
WHERE dstaddr = '10.0.1.14'
AND day >= '2021/01/01'
AND day < '2021/01/31'

相关信息

如何使用流日志监控 VPC 中的流量?

Analyzing VPC Flow Logs using Amazon Athena, and Amazon Quick Sight(使用 Amazon Athena 和 Amazon QuickSight 分析 VPC 流日志)

AWS 官方已更新 9 个月前