Automating AWS Network Infrastructure Documentation with Daily Snapshots

Content level: Intermediate

Maintaining up-to-date documentation of your AWS network infrastructure is crucial for security audits, troubleshooting, and architecture planning. This solution automates the process by capturing a daily snapshot of your network resource configuration as a JSON file and storing it in Amazon S3.

Why Network Snapshots Matter

For Security Audits: Network snapshots provide historical evidence of infrastructure configurations for compliance requirements. By comparing snapshots, security teams can identify unauthorized changes to security groups and network ACLs that might indicate a breach. The snapshots also let teams verify that network segmentation and security boundaries are configured as intended. During incidents, these records support forensic investigations by showing the network state at the time each snapshot was taken. The comprehensive data also simplifies audit reporting.
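
As an illustration of comparing two snapshots during an audit, the following sketch reports ingress rules that were added to or removed from each security group. It assumes both snapshots were loaded from S3 with json.load and uses the field names returned by the EC2 DescribeSecurityGroups API. For brevity it only considers IPv4 CIDR ranges (IPv6 ranges, prefix lists, and security group references are ignored), and the function names are hypothetical.

```python
"""Sketch: report ingress-rule changes between two network snapshots."""


def ingress_rules(snapshot):
    """Map each security group ID to a set of (protocol, from, to, cidr) tuples."""
    rules = {}
    for group in snapshot.get("SecurityGroups", []):
        entries = set()
        for perm in group.get("IpPermissions", []):
            # One tuple per IPv4 CIDR so a single added/removed range is visible
            for ip_range in perm.get("IpRanges", []):
                entries.add((
                    perm.get("IpProtocol"),
                    perm.get("FromPort"),
                    perm.get("ToPort"),
                    ip_range.get("CidrIp"),
                ))
        rules[group["GroupId"]] = entries
    return rules


def diff_ingress(old_snapshot, new_snapshot):
    """Return {group_id: {"added": [...], "removed": [...]}} for changed groups."""
    old = ingress_rules(old_snapshot)
    new = ingress_rules(new_snapshot)
    changes = {}
    for group_id in sorted(old.keys() | new.keys()):
        before = old.get(group_id, set())
        after = new.get(group_id, set())
        if before != after:
            # key=str because FromPort/ToPort may be absent (None) for "-1" rules
            changes[group_id] = {
                "added": sorted(after - before, key=str),
                "removed": sorted(before - after, key=str),
            }
    return changes


if __name__ == "__main__":
    yesterday = {"SecurityGroups": [{"GroupId": "sg-1", "IpPermissions": []}]}
    today = {"SecurityGroups": [{"GroupId": "sg-1", "IpPermissions": [
        {"IpProtocol": "tcp", "FromPort": 22, "ToPort": 22,
         "IpRanges": [{"CidrIp": "0.0.0.0/0"}]},
    ]}]}
    print(diff_ingress(yesterday, today))
    # {'sg-1': {'added': [('tcp', 22, 22, '0.0.0.0/0')], 'removed': []}}
```

A newly opened SSH rule to 0.0.0.0/0, as in the usage example, is exactly the kind of change this comparison is meant to surface.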

For Troubleshooting: Historical snapshots offer a reliable baseline for comparison when issues arise. Engineers can identify recent modifications that may have caused connectivity problems without relying on change logs. Because the snapshots are stored in S3, engineers can review configuration data without needing access to the production environment. The network topology information enables faster root cause analysis. For intermittent issues, comparing the snapshots taken before and after a reported problem can reveal subtle configuration changes.
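
To find the snapshots on either side of an incident, you can rely on the timestamp the Lambda function embeds in each object key (aws_network_snapshot_YYYYMMDD_HHMMSS.json, generated from the function's local time, which is UTC in Lambda). This sketch assumes you have already listed the keys, for example with the S3 ListObjectsV2 API. The helper names are hypothetical.

```python
"""Sketch: find the snapshots taken just before and just after an incident."""
from datetime import datetime

PREFIX = "aws_network_snapshot_"
SUFFIX = ".json"
TIMESTAMP_FORMAT = "%Y%m%d_%H%M%S"  # same format the Lambda function uses


def parse_key(key):
    """Return the timestamp encoded in a snapshot key, or None if it doesn't match."""
    if not (key.startswith(PREFIX) and key.endswith(SUFFIX)):
        return None
    stamp = key[len(PREFIX):-len(SUFFIX)]
    try:
        return datetime.strptime(stamp, TIMESTAMP_FORMAT)
    except ValueError:
        return None


def bracket_incident(keys, incident_time):
    """Return (last snapshot at or before incident_time, first snapshot after it).

    incident_time must be a naive datetime in UTC, like the key timestamps.
    Either element is None if no such snapshot exists.
    """
    dated = []
    for key in keys:
        taken_at = parse_key(key)
        if taken_at is not None:
            dated.append((taken_at, key))
    dated.sort()
    before = [key for taken_at, key in dated if taken_at <= incident_time]
    after = [key for taken_at, key in dated if taken_at > incident_time]
    return (before[-1] if before else None, after[0] if after else None)


if __name__ == "__main__":
    keys = [
        "aws_network_snapshot_20240101_000002.json",
        "aws_network_snapshot_20240102_000001.json",
        "aws_network_snapshot_20240103_000003.json",
    ]
    print(bracket_incident(keys, datetime(2024, 1, 2, 14, 30)))
    # ('aws_network_snapshot_20240102_000001.json', 'aws_network_snapshot_20240103_000003.json')
```

The two returned keys can then be downloaded and compared, for example with a rule-level diff like the security group comparison sketched for audits.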

For Architecture Planning: Network snapshots create a comprehensive inventory of existing resources for planning. Teams gain insights into current network design patterns by analyzing snapshots over time. The historical data supports capacity planning by showing growth trends. When planning migrations, these snapshots enable accurate dependency mapping, reducing the risk of overlooking critical connections.
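
For dependency mapping, a single snapshot already contains enough data to see which VPCs are linked to each other. The sketch below builds an adjacency map from active VPC peering connections and from available VPC attachments on transit gateways, using the field names of the EC2 DescribeVpcPeeringConnections and DescribeTransitGatewayAttachments APIs. It is a hypothetical helper that only shows attachments; it does not evaluate route tables, so a link does not prove that traffic can actually flow.

```python
"""Sketch: build a VPC connectivity map from a single network snapshot."""
from collections import defaultdict


def connectivity_map(snapshot):
    """Return {node_id: set of neighbor ids} for VPCs and transit gateways.

    Links come from active VPC peering connections and from available
    transit gateway attachments whose ResourceType is "vpc".
    """
    graph = defaultdict(set)

    for pcx in snapshot.get("VPCPeeringConnections", []):
        if pcx.get("Status", {}).get("Code") != "active":
            continue  # skip connections in any other state (pending, deleted, and so on)
        requester = pcx["RequesterVpcInfo"]["VpcId"]
        accepter = pcx["AccepterVpcInfo"]["VpcId"]
        graph[requester].add(accepter)
        graph[accepter].add(requester)

    for att in snapshot.get("TransitGatewayAttachments", []):
        if att.get("ResourceType") != "vpc" or att.get("State") != "available":
            continue  # VPN, Direct Connect, and other attachment types are skipped
        vpc_id = att["ResourceId"]
        tgw_id = att["TransitGatewayId"]
        graph[vpc_id].add(tgw_id)
        graph[tgw_id].add(vpc_id)

    return dict(graph)


if __name__ == "__main__":
    snapshot = {
        "VPCPeeringConnections": [{
            "Status": {"Code": "active"},
            "RequesterVpcInfo": {"VpcId": "vpc-a"},
            "AccepterVpcInfo": {"VpcId": "vpc-b"},
        }],
        "TransitGatewayAttachments": [{
            "ResourceType": "vpc", "State": "available",
            "ResourceId": "vpc-b", "TransitGatewayId": "tgw-1",
        }],
    }
    for node, neighbors in sorted(connectivity_map(snapshot).items()):
        print(node, "->", sorted(neighbors))
```

Before a migration, walking this map outward from the VPC being moved gives a first list of peers and transit gateways to review.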

Solution Overview

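The solution uses an AWS Lambda function, triggered daily by an Amazon EventBridge scheduled rule, to run a Python script that captures detailed information about the network resources in the AWS Region where it is deployed. Each snapshot is stored as a JSON file in an S3 bucket with versioning enabled.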

Architecture Diagram

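[Architecture diagram: an EventBridge scheduled rule invokes the Lambda function, which reads network configuration through the EC2 Describe APIs and writes a JSON snapshot to the S3 bucket.]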

CloudFormation Template

Deploy this CloudFormation template to provision all necessary resources (the S3 bucket, IAM role, Lambda function, EventBridge rule, and Lambda invoke permission):

AWSTemplateFormatVersion: '2010-09-09'
Description: 'CloudFormation template for AWS Network Snapshot Lambda scheduler'

Parameters:
  S3BucketName:
    Type: String
    Description: Name of the S3 bucket to store network snapshots (must be globally unique)
    Default: aws-network-snapshots

  ScheduleExpression:
    Type: String
    Description: Schedule expression for the Lambda function (default is daily at midnight UTC)
    Default: cron(0 0 * * ? *)

Resources:
  # S3 bucket to store network snapshots
  NetworkSnapshotBucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: !Ref S3BucketName
      VersioningConfiguration:
        Status: Enabled
      LifecycleConfiguration:
        Rules:
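          - Id: DeleteOldSnapshots
            Status: Enabled
            ExpirationInDays: 90
            # With versioning enabled, expiration only adds a delete marker and
            # keeps the old object as a noncurrent version; remove those too
            NoncurrentVersionExpiration:
              NoncurrentDays: 1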

  # IAM role for Lambda function
  LambdaExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
      Policies:
        - PolicyName: NetworkSnapshotPolicy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - ec2:Describe*
                Resource: '*'
              - Effect: Allow
                Action:
                  - s3:PutObject
                  - s3:GetObject
                Resource: !Sub ${NetworkSnapshotBucket.Arn}/*

  # Lambda function
  NetworkSnapshotLambda:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: aws-network-snapshot
      Handler: index.lambda_handler
      Role: !GetAtt LambdaExecutionRole.Arn
      Runtime: python3.12
      Timeout: 300
      MemorySize: 256
      Environment:
        Variables:
          S3_BUCKET: !Ref NetworkSnapshotBucket
      Code:
        ZipFile: |
          import boto3
          import json
          import os
          from datetime import datetime
          from botocore.exceptions import ClientError

          def get_aws_network_snapshot():
              ec2 = boto3.client('ec2')
              
              snapshot = {
                  'VPCs': [],
                  'Subnets': [],
                  'RouteTables': [],
                  'InternetGateways': [],
                  'SecurityGroups': [],
                  'VirtualPrivateGateways': [],
                  'VPCPeeringConnections': [],
                  'NATGateways': [],
                  'TransitGateways': [],
                  'TransitGatewayAttachments': [],
                  'TransitGatewayRouteTables': [],
                  'VPNConnections': [],
                  'CustomerGateways': [],
                  'NetworkACLs': [],
                  'EndpointServices': [],
                  'VPCEndpoints': [],
                  'EgressOnlyInternetGateways': [],
                  'CarrierGateways': [],
                  'LocalGateways': [],
                  'PrefixLists': [],
                  'DHCPOptions': [],
                  'NetworkInterfaces': []
              }

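              # Helper function to handle pagination. Uses the boto3 paginator when
              # one exists; otherwise follows NextToken manually, because
              # get_paginator() raises OperationNotPageableError (not a ClientError)
              # for operations that have no paginator.
              def get_all_resources(method, key):
                  try:
                      resources = []
                      if ec2.can_paginate(method):
                          for page in ec2.get_paginator(method).paginate():
                              resources.extend(page.get(key, []))
                      else:
                          kwargs = {}
                          while True:
                              page = getattr(ec2, method)(**kwargs)
                              resources.extend(page.get(key, []))
                              token = page.get('NextToken')
                              if not token:
                                  break
                              kwargs['NextToken'] = token
                      return resources
                  except ClientError as e:
                      print(f"Error fetching {method}: {e}")
                      return []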
                  
              # Helper function for non-paginated calls
              def get_resources(method, key):
                  try:
                      response = getattr(ec2, method)()
                      return response.get(key, [])
                  except ClientError as e:
                      print(f"Error fetching {method}: {e}")
                      return []
                  
              # Basic VPC Resources
              snapshot['VPCs'] = get_all_resources('describe_vpcs', 'Vpcs')
              snapshot['Subnets'] = get_all_resources('describe_subnets', 'Subnets')
              snapshot['RouteTables'] = get_all_resources('describe_route_tables', 'RouteTables')
              snapshot['SecurityGroups'] = get_all_resources('describe_security_groups', 'SecurityGroups')
              snapshot['NetworkACLs'] = get_all_resources('describe_network_acls', 'NetworkAcls')
              snapshot['DHCPOptions'] = get_all_resources('describe_dhcp_options', 'DhcpOptions')
              snapshot['NetworkInterfaces'] = get_all_resources('describe_network_interfaces', 'NetworkInterfaces')

              # Gateway Resources
              snapshot['InternetGateways'] = get_all_resources('describe_internet_gateways', 'InternetGateways')
              snapshot['VirtualPrivateGateways'] = get_resources('describe_vpn_gateways', 'VpnGateways')
              snapshot['NATGateways'] = get_all_resources('describe_nat_gateways', 'NatGateways')
              snapshot['EgressOnlyInternetGateways'] = get_all_resources('describe_egress_only_internet_gateways', 'EgressOnlyInternetGateways')
              snapshot['CarrierGateways'] = get_all_resources('describe_carrier_gateways', 'CarrierGateways')
              snapshot['LocalGateways'] = get_all_resources('describe_local_gateways', 'LocalGateways')

              # VPN and Customer Gateways
              snapshot['CustomerGateways'] = get_resources('describe_customer_gateways', 'CustomerGateways')
              snapshot['VPNConnections'] = get_resources('describe_vpn_connections', 'VpnConnections')

              # Transit Gateway Resources
              snapshot['TransitGateways'] = get_all_resources('describe_transit_gateways', 'TransitGateways')
              snapshot['TransitGatewayAttachments'] = get_all_resources('describe_transit_gateway_attachments', 'TransitGatewayAttachments')
              snapshot['TransitGatewayRouteTables'] = get_all_resources('describe_transit_gateway_route_tables', 'TransitGatewayRouteTables')

              # VPC Peering and Endpoints
              snapshot['VPCPeeringConnections'] = get_all_resources('describe_vpc_peering_connections', 'VpcPeeringConnections')
              snapshot['EndpointServices'] = get_all_resources('describe_vpc_endpoint_services', 'ServiceDetails')
              snapshot['VPCEndpoints'] = get_all_resources('describe_vpc_endpoints', 'VpcEndpoints')

              # Prefix Lists
              snapshot['PrefixLists'] = get_all_resources('describe_managed_prefix_lists', 'PrefixLists')

              return snapshot

          def save_snapshot_to_s3(snapshot, bucket_name):
              s3 = boto3.client('s3')
              timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
              filename = f"aws_network_snapshot_{timestamp}.json"
              
              # Count resources
              resource_counts = {k: len(v) for k, v in snapshot.items()}
              
              # Add resource counts to the snapshot
              snapshot['ResourceCounts'] = resource_counts
              snapshot['SnapshotTimestamp'] = timestamp
              
              # Convert to JSON string
              snapshot_json = json.dumps(snapshot, indent=2, default=str)
              
              # Upload to S3
              s3.put_object(
                  Bucket=bucket_name,
                  Key=filename,
                  Body=snapshot_json,
                  ContentType='application/json'
              )
              
              print(f"Network snapshot saved to s3://{bucket_name}/{filename}")
              print("\nResource Counts:")
              for resource, count in resource_counts.items():
                  print(f"{resource}: {count}")
              
              return filename

          def lambda_handler(event, context):
              print("Starting AWS network snapshot...")
              bucket_name = os.environ['S3_BUCKET']
              
              try:
                  snapshot = get_aws_network_snapshot()
                  filename = save_snapshot_to_s3(snapshot, bucket_name)
                  
                  return {
                      'statusCode': 200,
                      'body': json.dumps({
                          'message': 'Network snapshot completed successfully',
                          'filename': filename,
                          'bucket': bucket_name
                      })
                  }
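              except Exception as e:
                  print(f"Error: {str(e)}")
                  # Re-raise so the invocation is recorded as a failure (Errors
                  # metric, asynchronous retries). For EventBridge-triggered
                  # invocations the return value is discarded, so returning a
                  # 500 status code would make failures invisible.
                  raise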

  # EventBridge rule to schedule Lambda execution
  ScheduledRule:
    Type: AWS::Events::Rule
    Properties:
      Description: "Rule to trigger AWS Network Snapshot Lambda on schedule"
      ScheduleExpression: !Ref ScheduleExpression
      State: ENABLED
      Targets:
        - Arn: !GetAtt NetworkSnapshotLambda.Arn
          Id: "NetworkSnapshotLambdaTarget"

  # Permission for EventBridge to invoke Lambda
  PermissionForEventsToInvokeLambda:
    Type: AWS::Lambda::Permission
    Properties:
      FunctionName: !Ref NetworkSnapshotLambda
      Action: "lambda:InvokeFunction"
      Principal: "events.amazonaws.com"
      SourceArn: !GetAtt ScheduledRule.Arn

Outputs:
  NetworkSnapshotBucketName:
    Description: "Name of the S3 bucket storing network snapshots"
    Value: !Ref NetworkSnapshotBucket

  NetworkSnapshotLambdaArn:
    Description: "ARN of the Network Snapshot Lambda function"
    Value: !GetAtt NetworkSnapshotLambda.Arn

  ScheduleExpression:
    Description: "Schedule expression for the Lambda function"
    Value: !Ref ScheduleExpression
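
The function serializes the snapshot with json.dumps(..., default=str) because boto3 returns timestamp fields as Python datetime objects, which the json module cannot encode on its own. A minimal illustration, using a hand-written record rather than real API output:

```python
"""Sketch: why the Lambda function passes default=str to json.dumps."""
import json
from datetime import datetime, timezone

# boto3 returns timestamp fields (such as a NAT gateway's CreateTime) as
# datetime objects. This record is hand-written for illustration only.
record = {
    "NatGatewayId": "nat-0example",
    "CreateTime": datetime(2024, 1, 2, 3, 4, 5, tzinfo=timezone.utc),
}

try:
    json.dumps(record)
except TypeError as exc:
    # The json module has no built-in encoding for datetime objects
    print(f"Without default=str: {exc}")

# default=str calls str() on any object json cannot encode natively
print(json.dumps(record, default=str))
# {"NatGatewayId": "nat-0example", "CreateTime": "2024-01-02 03:04:05+00:00"}
```

If you prefer strict ISO 8601 strings with a T separator, a custom default such as lambda o: o.isoformat() if isinstance(o, datetime) else str(o) is one alternative.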

Deployment Steps

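  1. Save the CloudFormation template to a file named network_snapshot_cf.yaml.

  2. Deploy the stack using the AWS CLI. S3 bucket names must be globally unique, so pass your own bucket name instead of relying on the default:

    aws cloudformation create-stack \
      --stack-name network-snapshot-stack \
      --template-body file://network_snapshot_cf.yaml \
      --capabilities CAPABILITY_IAM \
      --parameters ParameterKey=S3BucketName,ParameterValue=<your-unique-bucket-name>

  3. Customize parameters (optional) by adding ParameterKey/ParameterValue pairs to --parameters:

    • S3BucketName: A globally unique name for your S3 bucket
    • ScheduleExpression: An EventBridge schedule expression; the default cron(0 0 * * ? *) runs daily at midnight UTC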
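
Because stack creation fails if the bucket name is invalid or already taken, you may want a quick local check before deploying. This hypothetical helper tests only some of the documented S3 general purpose bucket naming rules (length, allowed characters, first and last character, adjacent periods, IP-address format); it cannot tell whether another account already owns the name.

```python
"""Sketch: partial pre-flight check of an S3 bucket name."""
import re

# 3-63 characters; lowercase letters, digits, '.' and '-'; must start and
# end with a lowercase letter or digit.
BUCKET_PATTERN = re.compile(r"[a-z0-9][a-z0-9.-]{1,61}[a-z0-9]")
IPV4_PATTERN = re.compile(r"[0-9]{1,3}(\.[0-9]{1,3}){3}")


def bucket_name_problems(name):
    """Return a list of rule violations found in name (empty if none were found).

    Only a subset of the S3 naming rules is checked, and global uniqueness
    cannot be checked locally at all.
    """
    problems = []
    if not BUCKET_PATTERN.fullmatch(name):
        problems.append(
            "must be 3-63 characters of a-z, 0-9, '.' or '-', "
            "starting and ending with a letter or digit"
        )
    if ".." in name:
        problems.append("must not contain two adjacent periods")
    if IPV4_PATTERN.fullmatch(name):
        problems.append("must not be formatted as an IP address")
    return problems


if __name__ == "__main__":
    for candidate in ["aws-network-snapshots-111122223333", "Network_Snapshots", "10.0.0.1"]:
        print(candidate, bucket_name_problems(candidate) or "OK")
```

Appending your 12-digit account ID to a descriptive prefix, as in the first candidate above, usually produces a valid name that is unlikely to collide with buckets in other accounts.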

Key Features

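  • Captures VPCs, subnets, route tables, security groups, and many other network resources in the AWS Region where the stack is deployed
  • Runs daily at midnight UTC (customizable)
  • Stores snapshots in a versioned S3 bucket with a 90-day expiration lifecycle rule
  • Uses a narrowly scoped IAM role: read-only ec2:Describe* access plus object access to the snapshot bucket only
  • Should cost well under $1 per month for most AWS accounts (one short Lambda invocation per day plus S3 storage for the JSON files); actual cost depends on snapshot size and Region pricing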