Using a custom AWS Config rule and remediation to auto-tag noncompliant resources that are missing custom tags

I'm creating a Config rule that flags resources as noncompliant when they don't adhere to a custom tagging standard. My current challenge is automating remediation, using either Lambda or SSM, after the Config rule has identified noncompliant resources. My Lambda function successfully finds and auto-tags DynamoDB resources, but I get errors when it tries to fetch the ARN for other resource types. In addition, despite trying various triggers such as SNS topics, the Lambda function is not invoked when new resources are created, nor for other existing resources that need to be auto-tagged. The Lambda function is written in Python, the SSM document is in YAML, and the entire infrastructure is provisioned with Terraform. Any insights or help resolving these issues would be greatly appreciated.

    resource "aws_config_organization_managed_rule" "required_tags" {
      name            = local.required_tags_organization_name
      rule_identifier = regex("^[a-zA-Z0-9-_]+$", "REQUIRED_TAGS")

      input_parameters = jsonencode({
        tag1Key   = "enabled",
        tag1Value = "true",
        tag2Key   = "organization",
        tag2Value = "",
        tag3Key   = "cloud_provider",
        tag3Value = "aws",
        tag4Key   = "namespace",
        tag4Value = "mgmt",
        tag5Key   = "parent_name",
        tag5Value = "N/A",
        tag6Key   = "name",
        tag6Value = "N/A"
      })
    }

lambda_function:

    import boto3
    import logging

    ssm_client = boto3.client('ssm')
    config_client = boto3.client('config')

    logger = logging.getLogger()
    logger.setLevel(logging.INFO)

    def get_parameters_from_ssm(account_id, parameter_path):
        try:
            response = ssm_client.get_parameters_by_path(Path=parameter_path)
            return {param['Name'].split('/')[-1]: param['Value'] for param in response['Parameters']}
        except Exception as e:
            raise ValueError(f"Error fetching SSM parameters: {e}")

    def lambda_handler(event, context):
        try:
            # Extract execution context information if available
            current_account_id = None
            if 'invoked_function_arn' in context:
                current_account_id = context.invoked_function_arn.split(":")[4]

            execution_rate = event.get("ExecutionRate", "rate(1 day)")
            tagging_parameters_path = event.get("TaggingParametersPath", "/org/tagging/")

            parameters = get_parameters_from_ssm(current_account_id, tagging_parameters_path)

            org_client = boto3.client('organizations')
            accounts = []
            paginator = org_client.get_paginator('list_accounts')
            for page in paginator.paginate():
                for account in page['Accounts']:
                    accounts.append(account['Id'])

            tagged_resources = []
            for account_id in accounts:
                # Get all Config rules for the current account
                response = config_client.describe_config_rules()
                all_config_rules = response.get('ConfigRules', [])

                for rule in all_config_rules:
                    if "prod-config-auto-tags" in rule.get('ConfigRuleName', '').lower():
                        rule_name = rule['ConfigRuleName']
                        break
                else:
                    raise ValueError("No Config rule found for required tag compliance")

                response = config_client.get_compliance_details_by_config_rule(
                    ConfigRuleName=rule_name,
                    ComplianceTypes=['NON_COMPLIANT']
                )
                noncompliant_resources = response.get('EvaluationResults', [])

                for resource in noncompliant_resources:
                    resource_id = resource['EvaluationResultIdentifier']['EvaluationResultQualifier']['ResourceId']
                    resource_type = resource['EvaluationResultIdentifier']['EvaluationResultQualifier']['ResourceType']
                    resource_arn = convert_to_arn(resource_id, resource_type)
                    if resource_arn:
                        tag_resource(resource_arn, parameters)
                        tagged_resources.append({"ResourceARN": resource_arn, "Tags": parameters})
                    else:
                        logger.error(f"Error fetching ARN for resource {resource_id}")

            return {"message": "Auto-tagging completed.", "tagged_resources": tagged_resources}

        except ValueError as ve:
            return {"error": str(ve)}
        except Exception as e:
            return {"error": f"An unexpected error occurred: {e}"}

    def convert_to_arn(resource_id, resource_type):
        if resource_type == 'AWS::EC2::Instance':
            return f'arn:aws:ec2:::{resource_id}'
        elif resource_type == 'AWS::S3::Bucket':
            return f'arn:aws:s3:::{resource_id}'
        elif resource_type == 'AWS::RDS::DBCluster':
            return f'arn:aws:rds:::{resource_id}'
        elif resource_type == 'AWS::DynamoDB::Table':
            return f'arn:aws:dynamodb:::{resource_id}'
        elif resource_type.startswith('arn:aws:cloudformation'):
            return resource_id
        return None

    def tag_resource(resource_arn, parameters):
        try:
            response = config_client.put_evaluations(
                Evaluations=[
                    {
                        'ComplianceResourceType': 'AWS::Tag',
                        'ComplianceResourceId': resource_arn,
                        'ComplianceType': 'COMPLIANT',
                        'Annotation': 'Resource is compliant'
                    }
                ],
                ResultToken='string'
            )
            logger.info(f"Resource {resource_arn} successfully tagged with required tags.")
        except Exception as e:
            logger.error(f"Error tagging resource {resource_arn}: {e}")
            raise ValueError(f"Error tagging resource {resource_arn}: {e}")

######### tf

    resource "aws_ssm_document" "required_tags" {
      name            = local.ssm_document_name
      document_type   = "Automation"
      version_name    = "default"
      document_format = "YAML"
      content         = <<-CONTENT
        schemaVersion: "0.3"
        description: "Automated remediation for adding required tags to non-compliant resources"
        assumeRole: "arn:aws:iam::XXXXXXXXXX:role/prod-config-auto-tags-ssm-execution-role"
        mainSteps:
          - name: RunPythonScript
            action: "aws:executeScript"
            inputs:
              Runtime: "python3.8"
              Handler: "lambda_handler"
              Script: |
                import boto3
                import logging

                # Initialize AWS clients
                ssm_client = boto3.client('ssm')
                config_client = boto3.client('config')

                # Setup logging
                logger = logging.getLogger()
                logger.setLevel(logging.INFO)

                # Helper function to fetch parameters from SSM Parameter Store
                def get_parameters_from_ssm(account_id, parameter_path):
                    try:
                        response = ssm_client.get_parameters_by_path(Path=parameter_path)
                        return {param['Name'].split('/')[-1]: param['Value'] for param in response['Parameters']}
                    except Exception as e:
                        raise ValueError(f"Error fetching SSM parameters: {e}")

                # Main Lambda handler function
                def lambda_handler(event, context):
                    try:
                        # Extract execution context information if available
                        current_account_id = None
                        if 'invoked_function_arn' in context:
                            current_account_id = context.invoked_function_arn.split(":")[4]

                        execution_rate = event.get("ExecutionRate", "rate(1 day)")
                        tagging_parameters_path = event.get("TaggingParametersPath", "/org/tagging/")

                        # Get parameters for tagging from SSM Parameter Store
                        parameters = get_parameters_from_ssm(current_account_id, tagging_parameters_path)

                        # Get list of all AWS accounts in the organization
                        org_client = boto3.client('organizations')
                        accounts = []
                        paginator = org_client.get_paginator('list_accounts')
                        for page in paginator.paginate():
                            for account in page['Accounts']:
                                accounts.append(account['Id'])

                        # Loop through each account and region
                        tagged_resources = []
                        for account_id in accounts:
                            # Get all Config rules for the current account
                            response = config_client.describe_config_rules()
                            all_config_rules = response.get('ConfigRules', [])

                            # Find the Config rule related to required tag compliance
                            for rule in all_config_rules:
                                if "prod-config-auto-tags" in rule.get('ConfigRuleName', '').lower():
                                    rule_name = rule['ConfigRuleName']
                                    break
                            else:
                                raise ValueError("No Config rule found for required tag compliance")

                            # Get noncompliant resources for the specific rule and account
                            response = config_client.get_compliance_details_by_config_rule(
                                ConfigRuleName=rule_name,
                                ComplianceTypes=['NON_COMPLIANT']
                            )
                            noncompliant_resources = response.get('EvaluationResults', [])

                            # Auto-tag noncompliant resources
                            for resource in noncompliant_resources:
                                resource_id = resource['EvaluationResultIdentifier']['EvaluationResultQualifier']['ResourceId']
                                resource_type = resource['EvaluationResultIdentifier']['EvaluationResultQualifier']['ResourceType']
                                resource_arn = convert_to_arn(resource_id, resource_type)
                                if resource_arn:
                                    tag_resource(resource_arn, parameters)
                                    tagged_resources.append({"ResourceARN": resource_arn, "Tags": parameters})
                                else:
                                    logger.error(f"Error fetching ARN for resource {resource_id}")

                        return {"message": "Auto-tagging completed.", "tagged_resources": tagged_resources}

                    except ValueError as ve:
                        return {"error": str(ve)}
                    except Exception as e:
                        return {"error": f"An unexpected error occurred: {e}"}

                # Helper function to convert resource ID and type to ARN
                def convert_to_arn(resource_id, resource_type):
                    if resource_type == 'AWS::EC2::Instance':
                        return f'arn:aws:ec2:::{resource_id}'
                    elif resource_type == 'AWS::S3::Bucket':
                        return f'arn:aws:s3:::{resource_id}'
                    elif resource_type == 'AWS::RDS::DBCluster':
                        return f'arn:aws:rds:::{resource_id}'
                    elif resource_type == 'AWS::DynamoDB::Table':
                        return f'arn:aws:dynamodb:::{resource_id}'
                    elif resource_type.startswith('arn:aws:cloudformation'):
                        return resource_id
                    return None

                # Helper function to tag the resource with required tags
                def tag_resource(resource_arn, parameters):
                    try:
                        response = config_client.put_evaluations(
                            Evaluations=[
                                {
                                    'ComplianceResourceType': 'AWS::Tag',
                                    'ComplianceResourceId': resource_arn,
                                    'ComplianceType': 'COMPLIANT',
                                    'Annotation': 'Resource is compliant'
                                }
                            ],
                            ResultToken='string'
                        )
                        logger.info(f"Resource {resource_arn} successfully tagged with required tags.")
                    except Exception as e:
                        logger.error(f"Error tagging resource {resource_arn}: {e}")
                        raise ValueError(f"Error tagging resource {resource_arn}: {e}")
      CONTENT
    }

1 Answer
Accepted Answer

Hello

It looks like you're running into several issues in your script and Terraform configuration when automating remediation for the noncompliant resources identified by your Config rule. Here are some insights and potential solutions:

**Fetching ARNs for resources of different types:** You're seeing errors when fetching ARNs for resources other than DynamoDB tables. One likely reason is that the 'convert_to_arn' function does not cover every resource type you need to handle. Ensure that it supports all resource types evaluated by your Config rule, and double-check that the resource type strings returned by Config match the cases handled in 'convert_to_arn'. If there are discrepancies, adjust the function accordingly.

**Lambda function invocation triggers:** You mentioned that the Lambda function is not invoked when new resources are created or when existing resources need to be auto-tagged. This could be due to missing triggers. Ensure that the function is subscribed to the relevant SNS topics or configured as a target for CloudWatch Events (now Amazon EventBridge) rules that capture resource creation events, so that it is invoked whenever noncompliant resources are detected or new resources are created. Also verify that the Lambda execution role has sufficient permissions to access the necessary AWS services (e.g., Config, SSM) and to perform the required actions on resources.

**SSM automation document configuration:** Your SSM automation document is configured to run a Python script (presumably the same code as the Lambda function) using the 'aws:executeScript' action. Ensure that the SSM document is correctly associated with the Lambda function and that the execution role ('prod-config-auto-tags-ssm-execution-role') has the necessary permissions to execute the script. Double-check the 'assumeRole' parameter in the SSM document to ensure it grants the permissions the automation needs to run successfully.

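
On the ARN point above, one option is to avoid building ARN strings by hand and instead ask Config for the ARN it has recorded for each resource, then apply tags through the Resource Groups Tagging API. Note that `put_evaluations` reports a compliance result to Config; it does not change any tags. The following is a minimal sketch, not a drop-in fix: the helper names `lookup_arns` and `tag_arns` are hypothetical, the clients are passed in so the logic can be exercised without AWS access, and it assumes every resource type you care about is both recorded by Config and supported by the tagging API, which you should verify.

```python
def lookup_arns(config_client, resources):
    """Map (resource_type, resource_id) to the ARN recorded by AWS Config.

    `resources` is an iterable of (resource_type, resource_id) tuples.
    Resources Config has no item for are simply missing from the result.
    """
    keys = [{"resourceType": t, "resourceId": i} for t, i in resources]
    arns = {}
    # batch_get_resource_config accepts at most 100 keys per call.
    for start in range(0, len(keys), 100):
        response = config_client.batch_get_resource_config(
            resourceKeys=keys[start:start + 100]
        )
        # Unprocessed keys are ignored here; a fuller version would retry them.
        for item in response.get("baseConfigurationItems", []):
            if item.get("arn"):
                arns[(item["resourceType"], item["resourceId"])] = item["arn"]
    return arns


def tag_arns(tagging_client, arns, tags):
    """Apply `tags` (a dict of key -> value) to `arns`; return the failures."""
    arns = list(arns)
    failed = {}
    # tag_resources accepts at most 20 ARNs per call.
    for start in range(0, len(arns), 20):
        response = tagging_client.tag_resources(
            ResourceARNList=arns[start:start + 20], Tags=tags
        )
        failed.update(response.get("FailedResourcesMap", {}))
    return failed
```

In the Lambda function the two clients would be `boto3.client('config')` and `boto3.client('resourcegroupstaggingapi')`. The execution role would then need `config:BatchGetResourceConfig` and `tag:TagResources`, plus each service's own tagging permission (for example `ec2:CreateTags`).
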
**Debugging and logging:** Implement comprehensive logging within your Lambda function to capture any errors or unexpected behavior during execution. This will help in diagnosing issues related to resource fetching, tagging, and Lambda invocation. Use CloudWatch Logs to review the log output generated by your Lambda function and SSM automation document, and look for error messages or exceptions that may point to the root cause of the failures.
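
To make the logging advice concrete, and to check whether the function is being invoked at all, the handler can log the raw event before doing anything else. The sketch below assumes the function is the target of an EventBridge rule that matches Config compliance change events. The `EVENT_PATTERN` constant and the `detail` field names reflect my understanding of that event's shape, and `extract_noncompliant` is a hypothetical helper, so confirm both against an event you have actually logged.

```python
import json
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Assumed EventBridge rule pattern for Config compliance changes.
EVENT_PATTERN = {
    "source": ["aws.config"],
    "detail-type": ["Config Rules Compliance Change"],
}


def extract_noncompliant(event):
    """Return (resource_type, resource_id) for a NON_COMPLIANT event, else None."""
    detail = event.get("detail", {})
    result = detail.get("newEvaluationResult", {})
    if result.get("complianceType") != "NON_COMPLIANT":
        return None
    resource_type = detail.get("resourceType")
    resource_id = detail.get("resourceId")
    if not resource_type or not resource_id:
        return None
    return resource_type, resource_id


def lambda_handler(event, context):
    # Log the raw payload first: if this line never appears in CloudWatch Logs,
    # the problem is the trigger, not the tagging code.
    logger.info("Received event: %s", json.dumps(event, default=str))
    target = extract_noncompliant(event)
    if target is None:
        logger.info("Nothing to remediate for this event")
        return {"remediate": None}
    logger.info("Noncompliant resource: type=%s id=%s", target[0], target[1])
    return {"remediate": {"resourceType": target[0], "resourceId": target[1]}}
```

If the "Received event" line shows up but the resource is never extracted, compare the logged payload with the field names used in `extract_noncompliant` and adjust them to match.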

answered 11 days ago
