Regenerate and Route AWS Health Events to EventBridge

This article demonstrates how to programmatically regenerate AWS Health events and route them to EventBridge buses—whether default or custom—enabling seamless integration with downstream consumers like Lambda functions, SNS topics, SQS queues, and third-party monitoring tools.

AWS Health provides personalized information about events that can affect your AWS infrastructure and services. However, organizations often need to replay historical health events or route them to centralized event buses for downstream processing, alerting, and integration with existing monitoring workflows. This becomes particularly important when implementing new monitoring solutions, testing incident response procedures, or consolidating multi-account health data.

Organizations typically face the following requirements when managing AWS Health events:

  1. Event Replay: Historical health events aren't automatically available in EventBridge for new integrations
  2. Centralized Routing: Multi-account environments need a way to funnel health events to a central processing location
  3. Downstream Integration: Existing monitoring tools and workflows require events in EventBridge format
  4. Testing and Validation: Teams need to replay events to test alerting and response procedures
  5. Data Enrichment: Raw health events often need additional context before downstream processing

Overview

By leveraging the AWS Health API and EventBridge, you can create a robust event regeneration and routing pipeline that:

  • Retrieves historical or current health events from AWS Health API
  • Enriches events with additional metadata (affected resources, account information, event descriptions)
  • Formats events in EventBridge-compatible structure
  • Routes events to designated EventBridge buses (default or custom)
  • Enables downstream consumers to process events consistently

Architecture

Event Flow

AWS Health API
    ↓
Event Retrieval & Enrichment
    ↓
Event Formatting (EventBridge Schema)
    ↓
EventBridge Bus (Default or Custom)
    ↓
Downstream Consumers
    ├── Lambda Functions
    ├── SNS Topics
    ├── SQS Queues
    ├── Step Functions
    └── Third-party Integrations

Key Components

  1. Event Source: AWS Health API provides comprehensive event data
  2. Processing Layer: Python script retrieves, enriches, and formats events
  3. Event Bus: EventBridge receives and routes events based on rules
  4. Consumers: Downstream services process events for alerting, logging, or remediation

Implementation

Event Retrieval and Enrichment

The first step is retrieving health events with full context:

import boto3
import json
from datetime import datetime

health_client = boto3.client('health', region_name='us-east-1')
eventbridge_client = boto3.client('events', region_name='us-east-1')

def get_health_events():
    """Retrieve health events with pagination"""
    events = []
    next_token = None
    
    while True:
        kwargs = {'filter': {}}
        if next_token:
            kwargs['nextToken'] = next_token
            
        response = health_client.describe_events(**kwargs)
        events.extend(response.get('events', []))
        
        next_token = response.get('nextToken')
        if not next_token:
            break
    
    return events

def get_event_details(event_arn):
    """Retrieve detailed information for a specific event"""
    response = health_client.describe_event_details(
        eventArns=[event_arn]
    )
    
    if response.get('successfulSet'):
        return response['successfulSet'][0]
    return None

def get_affected_entities(event_arn):
    """Retrieve affected resources for an event"""
    entities = []
    next_token = None
    
    while True:
        kwargs = {
            'filter': {
                'eventArns': [event_arn]
            }
        }
        if next_token:
            kwargs['nextToken'] = next_token
            
        response = health_client.describe_affected_entities(**kwargs)
        entities.extend(response.get('entities', []))
        
        next_token = response.get('nextToken')
        if not next_token:
            break
    
    return entities

Event Formatting for EventBridge

Transform health events into EventBridge-compatible format:

def format_event_for_eventbridge(event_details, affected_entities):
    """Format health event data for EventBridge"""
    event = event_details.get('event', {})
    description = event_details.get('eventDescription', {})
    metadata = event_details.get('eventMetadata', {})
    
    # Build enriched event payload
    event_data = {
        'eventArn': event.get('arn'),
        'service': event.get('service'),
        'eventTypeCode': event.get('eventTypeCode'),
        'eventTypeCategory': event.get('eventTypeCategory'),
        'region': event.get('region'),
        'startTime': event.get('startTime').isoformat() if event.get('startTime') else None,
        'endTime': event.get('endTime').isoformat() if event.get('endTime') else None,
        'lastUpdatedTime': event.get('lastUpdatedTime').isoformat() if event.get('lastUpdatedTime') else None,
        'statusCode': event.get('statusCode'),
        'eventDescription': description.get('latestDescription', ''),
        'eventMetadata': metadata,
        'affectedEntities': [
            {
                'entityValue': entity.get('entityValue'),
                'entityArn': entity.get('entityArn'),
                'statusCode': entity.get('statusCode'),
                'tags': entity.get('tags', {})
            }
            for entity in affected_entities
        ]
    }
    
    return event_data

Routing to EventBridge Bus

Send formatted events to the designated EventBridge bus:

def send_to_eventbridge(event_data, bus_name='default', source='heidi.health'):
    """
    Send event to EventBridge bus
    
    Note: Use 'heidi.health' as source for manually regenerated events
    to distinguish from native AWS Health events (aws.health)
    """
    try:
        response = eventbridge_client.put_events(
            Entries=[
                {
                    'Source': source,
                    'DetailType': 'AWS Health Event',
                    'Detail': json.dumps(event_data, default=str),
                    'EventBusName': bus_name,
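                    # Omit the resource when the ARN is missing; a None entry fails validation
                    'Resources': [event_data['eventArn']] if event_data.get('eventArn') else []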
                }
            ]
        )
        
        if response['FailedEntryCount'] > 0:
            print(f"Failed to send event: {response['Entries']}")
            return False
        
        print(f"Successfully sent event {event_data['eventArn']} to {bus_name}")
        return True
        
    except Exception as e:
        print(f"Error sending event to EventBridge: {e}")
        return False
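
For larger backfills, one PutEvents call per event can be slow. PutEvents accepts up to 10 entries per request, so the events can be grouped before sending. The following is a minimal sketch of that idea, not part of the original script: the helper names `chunked`, `build_entry`, and `send_batch` are hypothetical, and the EventBridge client is passed in as a parameter.

```python
import json


def chunked(items, size=10):
    """Yield successive lists of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def build_entry(event_data, bus_name="default", source="heidi.health"):
    """Build one PutEvents entry from an already formatted health event."""
    entry = {
        "Source": source,
        "DetailType": "AWS Health Event",
        "Detail": json.dumps(event_data, default=str),
        "EventBusName": bus_name,
    }
    if event_data.get("eventArn"):
        entry["Resources"] = [event_data["eventArn"]]
    return entry


def send_batch(client, events_data, bus_name="default", source="heidi.health"):
    """Send events in batches of 10; return the payloads whose entries failed."""
    failed = []
    for batch in chunked(events_data, 10):
        response = client.put_events(
            Entries=[build_entry(e, bus_name, source) for e in batch]
        )
        # Result entries come back in request order; failed ones carry an ErrorCode.
        for event_data, result in zip(batch, response.get("Entries", [])):
            if "ErrorCode" in result:
                failed.append(event_data)
    return failed
```

Returning the failed payloads, rather than a single boolean, lets the caller retry only the entries that were rejected.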

Complete Regeneration Pipeline

Putting it all together:

def regenerate_and_route_events(
    bus_name='default',
    source='heidi.health',
    event_filter=None
):
    """
    Main function to regenerate and route health events
    
    Args:
        bus_name: Target EventBridge bus name
        source: Event source identifier (use 'heidi.health' for regenerated events)
        event_filter: Optional filter for event selection
    """
    print(f"Starting health event regeneration to bus: {bus_name}")
    
    # Retrieve all health events
    events = get_health_events()
    print(f"Retrieved {len(events)} health events")
    
    success_count = 0
    failure_count = 0
    
    for event in events:
        event_arn = event.get('arn')
        
        # Apply filter if provided
        if event_filter and not event_filter(event):
            continue
        
        try:
            # Get detailed information
            event_details = get_event_details(event_arn)
            if not event_details:
                print(f"No details found for {event_arn}")
                failure_count += 1
                continue
            
            # Get affected entities
            affected_entities = get_affected_entities(event_arn)
            
            # Format for EventBridge
            event_data = format_event_for_eventbridge(
                event_details, 
                affected_entities
            )
            
            # Send to EventBridge
            if send_to_eventbridge(event_data, bus_name, source):
                success_count += 1
            else:
                failure_count += 1
                
        except Exception as e:
            print(f"Error processing event {event_arn}: {e}")
            failure_count += 1
    
    print(f"\nRegeneration complete:")
    print(f"  Success: {success_count}")
    print(f"  Failures: {failure_count}")
    print(f"  Total: {len(events)}")

# Example usage
if __name__ == "__main__":
    # Route to default bus
    regenerate_and_route_events(bus_name='default')
    
    # Or route to custom bus
    # regenerate_and_route_events(bus_name='custom-health-bus')
    
    # With filtering (e.g., only issues in last 30 days)
    # from datetime import timedelta
    # def recent_issues(event):
    #     if event.get('eventTypeCategory') != 'issue':
    #         return False
    #     start_time = event.get('startTime')
    #     return start_time and (datetime.now(start_time.tzinfo) - start_time) < timedelta(days=30)
    # 
    # regenerate_and_route_events(
    #     bus_name='default',
    #     event_filter=recent_issues
    # )

Event Source Distinction

Important: Manually regenerated events use heidi.health as the source to distinguish them from native AWS Health events:

  • aws.health: Native events automatically generated by AWS Health service
  • heidi.health: Manually regenerated/backfilled events from the HEIDI solution

This distinction allows you to:

  • Create separate EventBridge rules for native vs. regenerated events
  • Avoid duplicate processing if both native and regenerated events exist
  • Track which events came from backfill operations vs. real-time feeds
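
As a rough illustration of the first point, the sketch below builds an event pattern for regenerated events (optionally including native ones) and registers it with `put_rule`. The function names and the `include_native` and `categories` parameters are hypothetical, introduced only for this example; `client` is assumed to be a boto3 `events` client.

```python
import json


def build_health_pattern(include_native=False, categories=None):
    """Return an event pattern for regenerated (and optionally native) health events."""
    sources = ["heidi.health"]
    if include_native:
        sources.insert(0, "aws.health")
    pattern = {"source": sources, "detail-type": ["AWS Health Event"]}
    if categories:
        pattern["detail"] = {"eventTypeCategory": list(categories)}
    return pattern


def create_health_rule(client, rule_name, bus_name="default", **pattern_kwargs):
    """Create or update a rule on the given bus using the pattern above."""
    return client.put_rule(
        Name=rule_name,
        EventBusName=bus_name,
        EventPattern=json.dumps(build_health_pattern(**pattern_kwargs)),
        State="ENABLED",
    )
```

Keeping one rule for `heidi.health` only and another for `aws.health` makes it possible to disable the backfill rule once a replay is finished without touching the real-time path.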

Routing Strategies

Strategy 1: Default Bus for Simple Integration

Route events to the default EventBridge bus when:

  • You have simple downstream consumers in the same account
  • You want to leverage existing EventBridge rules
  • You need quick setup without additional infrastructure

# Events will use 'heidi.health' as source by default
regenerate_and_route_events(bus_name='default')

Strategy 2: Custom Bus for Isolation

Use a dedicated custom bus when:

  • You need to isolate health events from other EventBridge traffic
  • You want granular access control for health event consumers
  • You're implementing a multi-tenant architecture

# First create the custom bus
eventbridge_client.create_event_bus(Name='health-events-bus')

# Then route events to it
regenerate_and_route_events(bus_name='health-events-bus')

Strategy 3: Cross-Account Routing

For centralized monitoring across multiple accounts:

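# In member accounts, route to central bus
# Placeholder values: replace with the Region and account ID of the central bus
region = 'us-east-1'
central_account_id = '111122223333'
central_bus_arn = f"arn:aws:events:{region}:{central_account_id}:event-bus/central-health-bus"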

def send_to_central_bus(event_data):
    """Send event to cross-account EventBridge bus"""
    eventbridge_client.put_events(
        Entries=[
            {
                'Source': 'heidi.health',  # Use heidi.health for regenerated events
                'DetailType': 'AWS Health Event',
                'Detail': json.dumps(event_data, default=str),
                'EventBusName': central_bus_arn
            }
        ]
    )
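
Cross-account PutEvents calls only succeed if the central bus has a resource policy that allows them. The sketch below shows one way to build and attach such a policy in the central account. It is an assumption-laden example rather than a prescribed setup: the function names, the statement ID, and the account IDs in the usage note are hypothetical, and `client` is assumed to be a boto3 `events` client created in the central account.

```python
import json


def build_central_bus_policy(bus_arn, member_account_ids):
    """Build a resource policy letting the listed accounts call PutEvents on the bus."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "AllowMemberAccountsPutEvents",
                "Effect": "Allow",
                "Principal": {
                    "AWS": [f"arn:aws:iam::{acct}:root" for acct in member_account_ids]
                },
                "Action": "events:PutEvents",
                "Resource": bus_arn,
            }
        ],
    }


def apply_central_bus_policy(client, bus_name, bus_arn, member_account_ids):
    """Attach the policy to the central bus (run in the central account)."""
    policy = build_central_bus_policy(bus_arn, member_account_ids)
    return client.put_permission(EventBusName=bus_name, Policy=json.dumps(policy))
```

For example, `apply_central_bus_policy(client, 'central-health-bus', central_bus_arn, ['111122223333'])` would grant a single member account access. The role that runs the script in each member account also needs `events:PutEvents` on the central bus ARN in its own IAM policy.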

Downstream Consumer Examples

Example 1: Lambda Function for Slack Notifications

# EventBridge Rule Pattern - matches both native and regenerated events
{
  "source": ["aws.health", "heidi.health"],
  "detail-type": ["AWS Health Event"],
  "detail": {
    "eventTypeCategory": ["issue"]
  }
}

# Or match only regenerated events
{
  "source": ["heidi.health"],
  "detail-type": ["AWS Health Event"],
  "detail": {
    "eventTypeCategory": ["issue"]
  }
}

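# Lambda Handler
def lambda_handler(event, context):
    health_event = event['detail']
    event_source = event['source']  # 'aws.health' or 'heidi.health'
    
    # Optionally handle differently based on source
    source_label = "🔄 Backfilled" if event_source == "heidi.health" else "🔴 Live"
    
    # Field names follow the regenerated payload built earlier; native aws.health
    # events can differ in shape, so read fields defensively with .get()
    description = health_event.get('eventDescription', '')
    if isinstance(description, list):  # list of {language, latestDescription} entries
        description = description[0].get('latestDescription', '') if description else ''
    
    message = f"""
    {source_label} AWS Health Alert
    Service: {health_event.get('service')}
    Region: {health_event.get('region') or health_event.get('eventRegion')}
    Type: {health_event.get('eventTypeCode')}
    Description: {description}
    Affected Resources: {len(health_event.get('affectedEntities', []))}
    """
    
    # Send to Slack (send_slack_notification is a placeholder for your own helper)
    send_slack_notification(message)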

Example 2: SNS Topic for Email Alerts

# EventBridge Rule targeting SNS - matches regenerated events
{
  "source": ["heidi.health"],
  "detail-type": ["AWS Health Event"],
  "detail": {
    "eventTypeCategory": ["scheduledChange"],
    "service": ["EC2", "RDS"]
  }
}

# Or match both native and regenerated events
{
  "source": ["aws.health", "heidi.health"],
  "detail-type": ["AWS Health Event"],
  "detail": {
    "eventTypeCategory": ["scheduledChange"],
    "service": ["EC2", "RDS"]
  }
}

Example 3: SQS Queue for Batch Processing

# EventBridge Rule targeting SQS for async processing
# Useful for high-volume events or complex processing workflows
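
A possible shape for this example is sketched below: one function attaches an SQS queue as a rule target, and another unpacks the EventBridge envelopes that a Lambda function receives when it polls that queue. The function names and the target ID are hypothetical, `client` is assumed to be a boto3 `events` client, and the queue is assumed to already have a resource policy that lets EventBridge send messages to it.

```python
import json


def add_sqs_target(client, rule_name, queue_arn, bus_name="default"):
    """Attach an SQS queue as a target of an existing rule."""
    return client.put_targets(
        Rule=rule_name,
        EventBusName=bus_name,
        Targets=[{"Id": "health-events-queue", "Arn": queue_arn}],
    )


def extract_health_events(sqs_event):
    """Return (source, detail) pairs from a Lambda SQS batch of EventBridge events."""
    results = []
    for record in sqs_event.get("Records", []):
        envelope = json.loads(record["body"])  # full EventBridge event as JSON
        results.append((envelope.get("source"), envelope.get("detail", {})))
    return results
```

Because each SQS message body carries the full EventBridge event, the consumer can still tell `heidi.health` events from `aws.health` events after queuing.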

Best Practices

1. Event Filtering: Only regenerate and route events that are relevant to your use case. Filter by:

  • Event category (issue, scheduledChange, accountNotification)
  • Service (EC2, RDS, Lambda, etc.)
  • Region
  • Time range
  • Status code
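
These criteria can be applied server-side through the `filter` argument of `describe_events`, which avoids retrieving events that would be discarded anyway. The helper below is a minimal sketch; `build_event_filter` and its parameters are hypothetical names, and wiring it into the retrieval function shown earlier would require adding a filter parameter there.

```python
from datetime import datetime, timedelta, timezone


def build_event_filter(categories=None, services=None, regions=None,
                       status_codes=None, days_back=None, now=None):
    """Build a describe_events filter dict, omitting criteria that were not given."""
    event_filter = {}
    if categories:
        event_filter["eventTypeCategories"] = list(categories)
    if services:
        event_filter["services"] = list(services)
    if regions:
        event_filter["regions"] = list(regions)
    if status_codes:
        event_filter["eventStatusCodes"] = list(status_codes)
    if days_back is not None:
        now = now or datetime.now(timezone.utc)
        # Only events that started within the last `days_back` days
        event_filter["startTimes"] = [{"from": now - timedelta(days=days_back)}]
    return event_filter
```

A call such as `health_client.describe_events(filter=build_event_filter(categories=['issue'], days_back=30))` would then return only issues from the last 30 days.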

2. Event Source Naming: Always use heidi.health as the source for manually regenerated events:

  • Distinguishes backfilled events from native AWS Health events
  • Allows separate processing rules for historical vs. real-time events
  • Prevents confusion when both native and regenerated events exist
  • Aligns with HEIDI framework conventions

3. Rate Limiting: Implement exponential backoff when calling the AWS Health API to avoid throttling:

import random  # needed for the jitter added to each wait
import time
from botocore.exceptions import ClientError

def call_with_backoff(func, max_retries=5):
    for attempt in range(max_retries):
        try:
            return func()
        except ClientError as e:
            if e.response['Error']['Code'] == 'ThrottlingException':
                wait_time = (2 ** attempt) + random.uniform(0, 1)
                time.sleep(wait_time)
            else:
                raise
    raise Exception("Max retries exceeded")

4. Event Deduplication: Track processed events to avoid duplicates:

import hashlib

def get_event_hash(event_arn, last_updated_time):
    """Create unique hash for event state"""
    return hashlib.sha256(
        f"{event_arn}:{last_updated_time}".encode()
    ).hexdigest()

# Store in DynamoDB or similar
processed_events = set()

def is_event_processed(event_hash):
    return event_hash in processed_events
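
The in-memory set above is lost when the process exits. One way to persist it, sketched here under stated assumptions, is a DynamoDB conditional write that records a hash only if it has not been seen before. The function name, the table, and its string partition key `event_hash` are hypothetical; `dynamodb_client` is assumed to be a boto3 `dynamodb` client.

```python
def mark_event_processed(dynamodb_client, table_name, event_hash):
    """Record the hash; return True if it was new, False if already processed."""
    try:
        dynamodb_client.put_item(
            TableName=table_name,
            Item={"event_hash": {"S": event_hash}},
            # The write only succeeds when no item with this key exists yet.
            ConditionExpression="attribute_not_exists(event_hash)",
        )
        return True
    except dynamodb_client.exceptions.ConditionalCheckFailedException:
        return False
```

Checking and recording in a single conditional write avoids the race that a separate read followed by a write would have when several workers process the same event.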

5. Monitoring and Logging: Implement comprehensive logging:

import logging

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Illustrative calls; event_arn and error come from the surrounding processing code
logger.info("Processing event: %s", event_arn)
logger.error("Failed to send event: %s", error)

6. IAM Permissions: Ensure proper permissions for the execution role:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "health:DescribeEvents",
        "health:DescribeEventDetails",
        "health:DescribeAffectedEntities"
      ],
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "events:PutEvents"
      ],
      "Resource": [
        "arn:aws:events:*:*:event-bus/default",
        "arn:aws:events:*:*:event-bus/custom-health-bus"
      ]
    }
  ]
}

Use Cases

Testing Incident Response: Replay historical incidents to test and refine your incident response procedures without waiting for real events.

Onboarding New Monitoring Tools: When implementing new monitoring solutions, regenerate historical events to populate dashboards and establish baselines.

Compliance and Auditing: Route health events to compliance systems for audit trail maintenance and regulatory reporting.

Multi-Account Consolidation: Centralize health events from multiple AWS accounts into a single monitoring hub for unified visibility.

Custom Alerting Workflows: Build sophisticated alerting logic by routing events through EventBridge to Lambda functions with custom business rules.

Organizational View Support

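For AWS Organizations, extend the solution to handle organization-wide events. This requires organizational view to be enabled in AWS Health, and the script must run in the management account or a delegated administrator account: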

def regenerate_org_events(bus_name='default'):
    """Regenerate organization-level health events"""
    health_client = boto3.client('health', 'us-east-1')
    
    # Get organization events
    events = []
    next_token = None
    
    while True:
        kwargs = {'filter': {}}
        if next_token:
            kwargs['nextToken'] = next_token
            
        response = health_client.describe_events_for_organization(**kwargs)
        events.extend(response.get('events', []))
        
        next_token = response.get('nextToken')
        if not next_token:
            break
    
    # Process each event with affected accounts
    for event in events:
        affected_accounts = health_client.describe_affected_accounts_for_organization(
            eventArn=event['arn']
        )
        
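        # Only the first page of affected accounts is read here; follow nextToken for more
        for account_id in affected_accounts.get('affectedAccounts', []):
            # process_org_event is a helper you supply: fetch account-specific
            # details and route them to the bus
            process_org_event(event, account_id, bus_name)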
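
The `process_org_event` helper is left undefined above. The following is one possible sketch of it, not a definitive implementation: it uses the organization variants of the detail and entity calls, reads only the first page of affected entities, and builds a reduced payload. The optional `health`, `events`, and `source` parameters are additions for this example so the function can be exercised with stub clients.

```python
import json


def process_org_event(event, account_id, bus_name, health=None, events=None,
                      source="heidi.health"):
    """Fetch account-specific details for an organization event and send them to the bus."""
    if health is None or events is None:
        import boto3  # imported lazily so the function can be exercised with stubs
        health = health or boto3.client("health", region_name="us-east-1")
        events = events or boto3.client("events", region_name="us-east-1")

    account_filter = {"eventArn": event["arn"], "awsAccountId": account_id}
    details = health.describe_event_details_for_organization(
        organizationEventDetailFilters=[account_filter]
    )
    if not details.get("successfulSet"):
        return False
    detail = details["successfulSet"][0]

    # First page only; follow nextToken for events with many affected entities.
    entities = health.describe_affected_entities_for_organization(
        organizationEntityFilters=[account_filter]
    ).get("entities", [])

    payload = {
        "eventArn": event["arn"],
        "affectedAccount": account_id,
        "service": event.get("service"),
        "eventTypeCode": event.get("eventTypeCode"),
        "eventTypeCategory": event.get("eventTypeCategory"),
        "region": event.get("region"),
        "statusCode": event.get("statusCode"),
        "eventDescription": detail.get("eventDescription", {}).get("latestDescription", ""),
        "affectedEntities": [
            {"entityValue": e.get("entityValue"), "statusCode": e.get("statusCode")}
            for e in entities
        ],
    }
    response = events.put_events(Entries=[{
        "Source": source,
        "DetailType": "AWS Health Event",
        "Detail": json.dumps(payload, default=str),
        "EventBusName": bus_name,
        "Resources": [event["arn"]],
    }])
    return response.get("FailedEntryCount", 0) == 0
```

Including the account ID in the payload (here under the assumed key `affectedAccount`) lets downstream rules filter or group organization events per account.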

Conclusion

Regenerating and routing AWS Health events to EventBridge buses provides a powerful foundation for centralized monitoring, alerting, and incident response. By programmatically retrieving historical events and formatting them for EventBridge consumption, you can:

  • Integrate health events with existing monitoring workflows
  • Test and validate incident response procedures
  • Consolidate multi-account health data
  • Build custom alerting and remediation logic
  • Maintain comprehensive audit trails

This approach transforms AWS Health from a passive information source into an active component of your operational excellence strategy, enabling proactive monitoring and faster incident response across your AWS infrastructure.

Additional Resources