By using AWS re:Post, you agree to the Terms of Use

Athena query results do not include AWS Config data for all accounts in the S3 bucket

0

Hi,

I have been trying to implement a solution that monitors the resources of all AWS accounts in our organization, using AWS Config, Amazon Athena, and Amazon QuickSight. I have set up all of the services; however, Athena cannot query all of the data in the S3 bucket where the AWS Config data for every account is stored. It only returns data for the current account (the one from which I run the query), even though I can see the AWS Config data for all accounts in the S3 bucket.

Athena table creation query

CREATE EXTERNAL TABLE aws_config_configuration_snapshot (
    -- Top-level fields of one AWS Config configuration snapshot file.
    fileversion        STRING,
    configSnapshotId   STRING,
    -- One element per recorded resource. The "configuration" member is kept
    -- as raw JSON text and is read with json_extract_scalar() in queries.
    configurationitems ARRAY<STRUCT<
        configurationItemVersion     : STRING,
        configurationItemCaptureTime : STRING,
        configurationStateId         : BIGINT,
        awsAccountId                 : STRING,
        configurationItemStatus      : STRING,
        resourceType                 : STRING,
        resourceId                   : STRING,
        resourceName                 : STRING,
        ARN                          : STRING,
        awsRegion                    : STRING,
        availabilityZone             : STRING,
        configurationStateMd5Hash    : STRING,
        configuration                : STRING,
        supplementaryConfiguration   : MAP<STRING, STRING>,
        tags                         : MAP<STRING, STRING>,
        resourceCreationTime         : STRING
    >>
)
-- Partitions are registered explicitly with ALTER TABLE ... ADD PARTITION.
-- The Config key layout
-- (AWSLogs/<account>/Config/<region>/<yyyy>/<m>/<d>/ConfigSnapshot/...)
-- is not in Hive key=value form.
PARTITIONED BY (accountid STRING, dt STRING, region STRING)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
LOCATION 's3://<S3_BUCKET_NAME>/AWSLogs/';

The Lambda function used to add Athena partitions per account

import datetime
import re
import boto3
import os

# Athena table and database that partitions are registered in. Either can be
# overridden with a Lambda environment variable; the defaults are the original
# hard-coded values.
# NOTE(review): the sample view in this post reads from
# default.aws_config_configuration_snapshot, but the default database here is
# 'sampledb'. Partitions added to a table in one database are not visible
# through a same-named table in another, so confirm these match.
TABLE_NAME = os.environ.get('ATHENA_TABLE_NAME', 'aws_config_configuration_snapshot')
DATABASE_NAME = os.environ.get('ATHENA_DATABASE_NAME', 'sampledb')
ACCOUNT_ID = None # Determined at runtime (set by lambda_handler from the function ARN)
# dt value of the partition that always points at the newest snapshot folder.
LATEST_PARTITION_VALUE = 'latest'

# Athena client created once at import time so that every call to
# execute_query in this module shares it.
athena = boto3.client(service_name='athena')

def lambda_handler(event, context):
    """Register Athena partitions for AWS Config snapshot objects in an S3 event.

    For every record whose object key is a Config snapshot, the handler does
    two things. It re-creates the (account, region, 'latest') partition so it
    points at the snapshot's folder. It also adds a dated
    (account, region, YYYY-MM-DD) partition for the same folder. Records for
    any other object are logged and skipped.

    Args:
        event: S3 event notification payload; each entry of event['Records']
            carries ['s3']['bucket']['name'] and ['s3']['object']['key'].
        context: Lambda context. Element 4 of invoked_function_arn (the
            account running this function) names the Athena query-result bucket.
    """
    global ACCOUNT_ID
    # Local import keeps this handler self-contained.
    from urllib.parse import unquote_plus

    ACCOUNT_ID = context.invoked_function_arn.split(':')[4]

    # The S3 event format carries a list of records; handle every entry
    # instead of only the first one.
    for record in event['Records']:
        bucket_name = record['s3']['bucket']['name']
        # Object keys in S3 event notifications are URL-encoded.
        object_key = unquote_plus(record['s3']['object']['key'])

        match = get_configuration_snapshot_object_key_match(object_key)
        if match is None:
            print('Ignoring event for non-configuration snapshot object key', object_key)
            continue
        print('Adding partitions for configuration snapshot object key', object_key)

        object_key_parent = 's3://{bucket_name}/{object_key_parent}/'.format(
            bucket_name=bucket_name,
            object_key_parent=os.path.dirname(object_key))
        configuration_snapshot_accountid = get_configuration_snapshot_accountid(match)
        configuration_snapshot_region = get_configuration_snapshot_region(match)
        configuration_snapshot_date = get_configuration_snapshot_date(match)

        # Re-point 'latest' at this snapshot's folder, then record the dated partition.
        drop_partition(configuration_snapshot_accountid, configuration_snapshot_region, LATEST_PARTITION_VALUE)
        add_partition(configuration_snapshot_accountid, configuration_snapshot_region, LATEST_PARTITION_VALUE, object_key_parent)
        add_partition(configuration_snapshot_accountid, configuration_snapshot_region, configuration_snapshot_date.strftime('%Y-%m-%d'), object_key_parent)
    
def get_configuration_snapshot_object_key_match(object_key):
    """Match an S3 object key against the AWS Config snapshot key layout.

    Example of a matching key:
    AWSLogs/123456789012/Config/us-east-1/2018/4/11/ConfigSnapshot/123456789012_Config_us-east-1_ConfigSnapshot_20180411T054711Z_a970aeff-cb3d-4c4e-806b-88fa14702hdb.json.gz

    Returns:
        An re.Match with groups (1) account ID, (2) region, (3) year,
        (4) month, (5) day, or None when the key does not match.
    """
    # Raw string: the previous non-raw literal relied on invalid escape
    # sequences ('\d', '\]'), which Python 3.12+ reports as SyntaxWarning.
    # The compiled pattern is identical. The final class still means
    # "one or more characters other than a backslash".
    return re.match(r'^AWSLogs/(\d+)/Config/([\w-]+)/(\d+)/(\d+)/(\d+)/ConfigSnapshot/[^\\]+$', object_key)

def get_configuration_snapshot_accountid(match):
    """Log and return the account ID captured by group 1 of a snapshot key match."""
    account_id = match.group(1)
    print('AccountId:', account_id)
    return account_id

def get_configuration_snapshot_region(match):
    """Return the AWS region captured by group 2 of a snapshot key match."""
    region = match[2]
    return region

def get_configuration_snapshot_date(match):
    """Build the snapshot's calendar date from match groups 3, 4 and 5 (year, month, day)."""
    year, month, day = (int(part) for part in match.group(3, 4, 5))
    return datetime.date(year, month, day)
    
def add_partition(accountid_partition_value, region_partition_value, dt_partition_value, partition_location):
    """Register one partition of TABLE_NAME that points at an S3 folder.

    Args:
        accountid_partition_value: value for the accountid partition column.
        region_partition_value: value for the region partition column.
        dt_partition_value: value for the dt partition column (a YYYY-MM-DD
            date string or the 'latest' marker).
        partition_location: S3 URI of the folder holding the partition's files.
    """
    # IF NOT EXISTS: the dated partition is added again for every snapshot
    # delivered on the same day (and on Lambda retries). Without it, Athena
    # rejects the duplicate partition and execute_query raises.
    execute_query('ALTER TABLE {table_name} ADD IF NOT EXISTS PARTITION {partition} location \'{partition_location}\''.format(
        table_name=TABLE_NAME,
        partition=build_partition_string(accountid_partition_value, region_partition_value, dt_partition_value),
        partition_location=partition_location))
        
def drop_partition(accountid_partition_value, region_partition_value, dt_partition_value):
    """Remove one partition of TABLE_NAME if it is registered.

    Args:
        accountid_partition_value: value for the accountid partition column.
        region_partition_value: value for the region partition column.
        dt_partition_value: value for the dt partition column.
    """
    # IF EXISTS: the first snapshot seen for an account/region has no 'latest'
    # partition yet. A plain DROP fails, execute_query raises, and the caller
    # never reaches its ADD PARTITION statements, so that account would never
    # get any partitions.
    execute_query('ALTER TABLE {table_name} DROP IF EXISTS PARTITION {partition}'.format(
        table_name=TABLE_NAME,
        partition=build_partition_string(accountid_partition_value, region_partition_value, dt_partition_value)))
        
def build_partition_string(accountid_partition_value, region_partition_value, dt_partition_value):
    """Render the partition-spec clause used by ALTER TABLE statements.

    Example: (accountid='123456789012', dt='latest', region='us-east-1').
    The column order follows the table's PARTITIONED BY (accountid, dt, region).
    """
    template = "(accountid='{0}', dt='{1}', region='{2}')"
    return template.format(accountid_partition_value, dt_partition_value, region_partition_value)

def execute_query(query, poll_interval_seconds=0.5):
    """Run a statement in Athena and block until it reaches a final state.

    Results go to s3://aws-athena-query-results-<ACCOUNT_ID>-<AWS_REGION>, so
    ACCOUNT_ID must already be set (lambda_handler sets it) before this is called.

    Args:
        query: SQL/DDL text to execute in DATABASE_NAME.
        poll_interval_seconds: pause between status checks while the query is
            QUEUED or RUNNING.

    Raises:
        Exception: if the query ends in any state other than SUCCEEDED. The
            message includes the final state and Athena's StateChangeReason.
    """
    import time  # local import keeps this function self-contained

    print('Executing query:', query)
    query_output_location = 's3://aws-athena-query-results-{account_id}-{region}'.format(
        account_id=ACCOUNT_ID,
        region=os.environ['AWS_REGION'])
    start_query_response = athena.start_query_execution(
        QueryString=query,
        QueryExecutionContext={
            'Database': DATABASE_NAME
        },
        ResultConfiguration={
            'OutputLocation': query_output_location,
        }
    )
    query_execution_id = start_query_response['QueryExecutionId']
    print('Query started')

    while True:
        status = athena.get_query_execution(
            QueryExecutionId=query_execution_id
        )['QueryExecution']['Status']
        query_state = status['State']
        if query_state not in ('RUNNING', 'QUEUED'):
            break
        # Wait between polls instead of spinning. Back-to-back
        # GetQueryExecution calls waste Lambda time and can be throttled.
        time.sleep(poll_interval_seconds)

    if query_state != 'SUCCEEDED':
        raise Exception('Query failed: {state} - {reason}'.format(
            state=query_state,
            reason=status.get('StateChangeReason', 'no reason given')))
    print('Query completed')

Sample query tried:

CREATE OR REPLACE VIEW v_config_ec2_vpcs AS
-- One row per VPC in the 'latest' configuration-snapshot partition of every
-- account and region registered on the table.
-- Fix: the original listed "accountId" "633328536665" and "region" "us-east-1".
-- The second identifier in each pair is only a column alias, so those values
-- renamed the output columns and never filtered any rows. The columns are
-- named AccountId and Region again. If a single account or region is really
-- wanted, add a WHERE predicate on accountid/region instead.
-- NOTE(review): lower-case keys ('name', '$.isdefault', ...) are kept from the
-- original. They presumably rely on the JSON SerDe lower-casing keys; confirm
-- against real data.
-- NOTE(review): the partition-loading Lambda in this post registers partitions
-- in DATABASE_NAME = 'sampledb', while this view reads from the default
-- database. Partitions added to a table in one database are not visible
-- through a same-named table in another, so confirm both use the same database.
SELECT DISTINCT
    accountid AS "AccountId",
    region AS "Region",
    config_item.resourceid AS "ResourceId",
    config_item.tags['name'] AS "TagName",
    json_extract_scalar(config_item.configuration, '$.isdefault') AS "IsDefault",
    json_extract_scalar(config_item.configuration, '$.cidrblockassociationset[0].cidrblock') AS "CidrBlock0",
    json_extract_scalar(config_item.configuration, '$.cidrblockassociationset[1].cidrblock') AS "CidrBlock1",
    json_extract_scalar(config_item.configuration, '$.cidrblockassociationset[2].cidrblock') AS "CidrBlock2",
    json_extract_scalar(config_item.configuration, '$.cidrblockassociationset[3].cidrblock') AS "CidrBlock3",
    json_extract_scalar(config_item.configuration, '$.cidrblockassociationset[4].cidrblock') AS "CidrBlock4"
FROM default.aws_config_configuration_snapshot
CROSS JOIN UNNEST(configurationitems) AS t (config_item)
WHERE dt = 'latest'
    AND config_item.resourcetype = 'AWS::EC2::VPC'

For some reason, the query does not return data from the S3 bucket for all AWS accounts; only the current account's data is returned. I have checked the S3 bucket policy, and it is set up as described in the solution below.

Solution referred:

Thanks and Regards,

Mahesh B.