Regenerate and Route AWS Health Events to EventBridge
This article demonstrates how to programmatically regenerate AWS Health events and route them to EventBridge buses (default or custom), enabling integration with downstream consumers such as Lambda functions, SNS topics, SQS queues, and third-party monitoring tools.
AWS Health provides personalized information about events that can affect your AWS infrastructure and services. However, organizations often need to replay historical health events or route them to centralized event buses for downstream processing, alerting, and integration with existing monitoring workflows. This becomes particularly important when implementing new monitoring solutions, testing incident response procedures, or consolidating multi-account health data.
Organizations commonly have the following requirements when managing AWS Health events:
- Event Replay: Historical health events aren't automatically available in EventBridge for new integrations
- Centralized Routing: Multi-account environments need a way to funnel health events to a central processing location
- Downstream Integration: Existing monitoring tools and workflows require events in EventBridge format
- Testing and Validation: Teams need to replay events to test alerting and response procedures
- Data Enrichment: Raw health events often need additional context before downstream processing
Overview
By leveraging the AWS Health API and EventBridge, you can create a robust event regeneration and routing pipeline that:
- Retrieves historical or current health events from the AWS Health API
- Enriches events with additional metadata (affected resources, account information, event descriptions)
- Formats events in EventBridge-compatible structure
- Routes events to designated EventBridge buses (default or custom)
- Enables downstream consumers to process events consistently
Architecture
Event Flow
AWS Health API
↓
Event Retrieval & Enrichment
↓
Event Formatting (EventBridge Schema)
↓
EventBridge Bus (Default or Custom)
↓
Downstream Consumers
├── Lambda Functions
├── SNS Topics
├── SQS Queues
├── Step Functions
└── Third-party Integrations
Key Components
- Event Source: The AWS Health API provides comprehensive event data
- Processing Layer: A Python script retrieves, enriches, and formats events
- Event Bus: EventBridge receives and routes events based on rules
- Consumers: Downstream services process events for alerting, logging, or remediation
Implementation
Event Retrieval and Enrichment
The first step is retrieving health events with full context:
```python
import boto3
import json
from datetime import datetime

# Both clients are created against us-east-1, as in the original example
health_client = boto3.client('health', region_name='us-east-1')
eventbridge_client = boto3.client('events', region_name='us-east-1')


def get_health_events():
    """Retrieve health events with pagination"""
    events = []
    next_token = None

    while True:
        kwargs = {'filter': {}}
        if next_token:
            kwargs['nextToken'] = next_token

        response = health_client.describe_events(**kwargs)
        events.extend(response.get('events', []))

        next_token = response.get('nextToken')
        if not next_token:
            break

    return events


def get_event_details(event_arn):
    """Retrieve detailed information for a specific event"""
    response = health_client.describe_event_details(
        eventArns=[event_arn]
    )

    if response.get('successfulSet'):
        return response['successfulSet'][0]
    return None


def get_affected_entities(event_arn):
    """Retrieve affected resources for an event"""
    entities = []
    next_token = None

    while True:
        kwargs = {
            'filter': {
                'eventArns': [event_arn]
            }
        }
        if next_token:
            kwargs['nextToken'] = next_token

        response = health_client.describe_affected_entities(**kwargs)
        entities.extend(response.get('entities', []))

        next_token = response.get('nextToken')
        if not next_token:
            break

    return entities
```
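The retrieval functions above repeat the same nextToken loop. As a minimal sketch, that loop could be factored into one generator. The name `paginate` and its parameters are illustrative and not part of boto3; boto3's built-in paginators (`client.get_paginator(...)`) may be preferable where an operation supports them.

```python
def paginate(call, result_key, **kwargs):
    """Yield every item from a nextToken-paginated API call.

    `call` is any callable that accepts keyword arguments and returns a
    dict with a list under `result_key` and an optional `nextToken`.
    The helper name and signature are hypothetical.
    """
    next_token = None
    while True:
        page_kwargs = dict(kwargs)
        if next_token:
            page_kwargs['nextToken'] = next_token

        response = call(**page_kwargs)
        yield from response.get(result_key, [])

        next_token = response.get('nextToken')
        if not next_token:
            break
```

With this helper, event retrieval would reduce to `list(paginate(health_client.describe_events, 'events', filter={}))`.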
Event Formatting for EventBridge
Transform health events into EventBridge-compatible format:
```python
def format_event_for_eventbridge(event_details, affected_entities):
    """Format health event data for EventBridge"""
    event = event_details.get('event', {})
    description = event_details.get('eventDescription', {})
    metadata = event_details.get('eventMetadata', {})

    # Build enriched event payload
    event_data = {
        'eventArn': event.get('arn'),
        'service': event.get('service'),
        'eventTypeCode': event.get('eventTypeCode'),
        'eventTypeCategory': event.get('eventTypeCategory'),
        'region': event.get('region'),
        'startTime': event.get('startTime').isoformat() if event.get('startTime') else None,
        'endTime': event.get('endTime').isoformat() if event.get('endTime') else None,
        'lastUpdatedTime': event.get('lastUpdatedTime').isoformat() if event.get('lastUpdatedTime') else None,
        'statusCode': event.get('statusCode'),
        'eventDescription': description.get('latestDescription', ''),
        'eventMetadata': metadata,
        'affectedEntities': [
            {
                'entityValue': entity.get('entityValue'),
                'entityArn': entity.get('entityArn'),
                'statusCode': entity.get('statusCode'),
                'tags': entity.get('tags', {})
            }
            for entity in affected_entities
        ]
    }

    return event_data
```
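Events with many affected entities can produce a large `Detail` payload, and EventBridge enforces a size limit on each entry. The sketch below trims the `affectedEntities` list until the serialized payload fits. The function name `fit_payload`, the `truncatedEntityCount` field, and the default of 256 KiB are assumptions for illustration; check the current EventBridge quota, and leave headroom because the limit applies to the whole entry rather than to `Detail` alone.

```python
import json


def fit_payload(event_data, max_bytes=256 * 1024):
    """Return a copy of event_data whose JSON encoding fits in max_bytes.

    Halves the affectedEntities list until the payload fits and records
    how many entities were dropped. If the payload is still too large
    with no entities left, it is returned as-is for the caller to handle.
    """
    def size(payload):
        return len(json.dumps(payload, default=str).encode('utf-8'))

    payload = dict(event_data)
    entities = list(payload.get('affectedEntities', []))
    total = len(entities)
    payload['affectedEntities'] = entities
    payload['truncatedEntityCount'] = 0

    while entities and size(payload) > max_bytes:
        entities = entities[: len(entities) // 2]
        payload['affectedEntities'] = entities
        payload['truncatedEntityCount'] = total - len(entities)

    return payload
```

Consumers can inspect `truncatedEntityCount` and call the AWS Health API directly when they need the full entity list.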
Routing to EventBridge Bus
Send formatted events to the designated EventBridge bus:
```python
def send_to_eventbridge(event_data, bus_name='default', source='heidi.health'):
    """
    Send event to EventBridge bus

    Note: Use 'heidi.health' as source for manually regenerated events
    to distinguish from native AWS Health events (aws.health)
    """
    event_arn = event_data.get('eventArn')
    try:
        response = eventbridge_client.put_events(
            Entries=[
                {
                    'Source': source,
                    'DetailType': 'AWS Health Event',
                    'Detail': json.dumps(event_data, default=str),
                    'EventBusName': bus_name,
                    # Only include the ARN when it is present
                    'Resources': [event_arn] if event_arn else []
                }
            ]
        )

        if response['FailedEntryCount'] > 0:
            print(f"Failed to send event: {response['Entries']}")
            return False

        print(f"Successfully sent event {event_arn} to {bus_name}")
        return True

    except Exception as e:
        print(f"Error sending event to EventBridge: {e}")
        return False
```
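The function above sends one event per `put_events` call. Because `PutEvents` accepts up to 10 entries per request, a backfill with many events can be batched. The following is a minimal sketch, assuming a `client` object that behaves like boto3's EventBridge client; the name `send_batch` and its parameters are hypothetical.

```python
import json


def send_batch(client, events, bus_name='default', source='heidi.health',
               batch_size=10):
    """Send event payloads in batches and return the payloads that failed."""
    failed = []
    for start in range(0, len(events), batch_size):
        chunk = events[start:start + batch_size]
        entries = [
            {
                'Source': source,
                'DetailType': 'AWS Health Event',
                'Detail': json.dumps(event_data, default=str),
                'EventBusName': bus_name,
            }
            for event_data in chunk
        ]
        response = client.put_events(Entries=entries)

        # Result entries come back in request order; failed ones carry an ErrorCode
        for event_data, result in zip(chunk, response.get('Entries', [])):
            if 'ErrorCode' in result:
                failed.append(event_data)
    return failed
```

Returning the failed payloads lets the caller retry only those events instead of resending the whole batch.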
Complete Regeneration Pipeline
Putting it all together:
```python
def regenerate_and_route_events(
    bus_name='default',
    source='heidi.health',
    event_filter=None
):
    """
    Main function to regenerate and route health events

    Args:
        bus_name: Target EventBridge bus name
        source: Event source identifier (use 'heidi.health' for regenerated events)
        event_filter: Optional filter for event selection
    """
    print(f"Starting health event regeneration to bus: {bus_name}")

    # Retrieve all health events
    events = get_health_events()
    print(f"Retrieved {len(events)} health events")

    success_count = 0
    failure_count = 0

    for event in events:
        event_arn = event.get('arn')

        # Apply filter if provided
        if event_filter and not event_filter(event):
            continue

        try:
            # Get detailed information
            event_details = get_event_details(event_arn)
            if not event_details:
                print(f"No details found for {event_arn}")
                failure_count += 1
                continue

            # Get affected entities
            affected_entities = get_affected_entities(event_arn)

            # Format for EventBridge
            event_data = format_event_for_eventbridge(
                event_details,
                affected_entities
            )

            # Send to EventBridge
            if send_to_eventbridge(event_data, bus_name, source):
                success_count += 1
            else:
                failure_count += 1

        except Exception as e:
            print(f"Error processing event {event_arn}: {e}")
            failure_count += 1

    print("\nRegeneration complete:")
    print(f"  Success: {success_count}")
    print(f"  Failures: {failure_count}")
    print(f"  Total: {len(events)}")


# Example usage
if __name__ == "__main__":
    # Route to default bus
    regenerate_and_route_events(bus_name='default')

    # Or route to custom bus
    # regenerate_and_route_events(bus_name='custom-health-bus')

    # With filtering (e.g., only issues in last 30 days)
    # from datetime import timedelta
    # def recent_issues(event):
    #     if event.get('eventTypeCategory') != 'issue':
    #         return False
    #     start_time = event.get('startTime')
    #     return start_time and (datetime.now(start_time.tzinfo) - start_time) < timedelta(days=30)
    #
    # regenerate_and_route_events(
    #     bus_name='default',
    #     event_filter=recent_issues
    # )
```
Event Source Distinction
Important: Manually regenerated events use heidi.health as the source to distinguish them from native AWS Health events:
- aws.health: Native events automatically generated by the AWS Health service
- heidi.health: Manually regenerated/backfilled events from the HEIDI solution
This distinction allows you to:
- Create separate EventBridge rules for native vs. regenerated events
- Avoid duplicate processing if both native and regenerated events exist
- Track which events came from backfill operations vs. real-time feeds
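To make the source distinction concrete, the sketch below builds an EventBridge rule pattern for native events, regenerated events, or both. The function name `build_rule_pattern` and its parameters are hypothetical; the `source` and `detail-type` values are the ones used in this article.

```python
import json

NATIVE_SOURCE = 'aws.health'
REGENERATED_SOURCE = 'heidi.health'


def build_rule_pattern(include_native=True, include_regenerated=True,
                       categories=None):
    """Return an EventBridge event pattern (JSON string) for health events."""
    sources = []
    if include_native:
        sources.append(NATIVE_SOURCE)
    if include_regenerated:
        sources.append(REGENERATED_SOURCE)
    if not sources:
        raise ValueError("At least one event source must be selected")

    pattern = {'source': sources, 'detail-type': ['AWS Health Event']}
    if categories:
        # Restrict matching to specific event categories, e.g. ['issue']
        pattern['detail'] = {'eventTypeCategory': list(categories)}
    return json.dumps(pattern)
```

The returned string can be passed as the `EventPattern` argument of `put_rule`, for example with `include_native=False` for a rule that should only fire on backfilled events.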
Routing Strategies
Strategy 1: Default Bus for Simple Integration
Route events to the default EventBridge bus when:
- You have simple downstream consumers in the same account
- You want to leverage existing EventBridge rules
- You need quick setup without additional infrastructure
```python
# Events will use 'heidi.health' as source by default
regenerate_and_route_events(bus_name='default')
```
Strategy 2: Custom Bus for Isolation
Use a dedicated custom bus when:
- You need to isolate health events from other EventBridge traffic
- You want granular access control for health event consumers
- You're implementing a multi-tenant architecture
```python
# First create the custom bus
eventbridge_client.create_event_bus(Name='health-events-bus')

# Then route events to it
regenerate_and_route_events(bus_name='health-events-bus')
```
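Calling `create_event_bus` a second time for the same name raises an error, so a rerun of the snippet above would fail before routing any events. A minimal sketch of an idempotent variant follows, assuming a `client` that behaves like boto3's EventBridge client and exposes `ResourceAlreadyExistsException` under `client.exceptions`; the helper name `ensure_event_bus` is hypothetical.

```python
def ensure_event_bus(client, name):
    """Create the event bus if it is missing.

    Returns True if the bus was created and False if it already existed.
    """
    try:
        client.create_event_bus(Name=name)
        return True
    except client.exceptions.ResourceAlreadyExistsException:
        # The bus is already there, which is the desired end state
        return False
```

Usage would be `ensure_event_bus(eventbridge_client, 'health-events-bus')` followed by the routing call.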
Strategy 3: Cross-Account Routing
For centralized monitoring across multiple accounts:
```python
# In member accounts, route to the central bus.
# Placeholder values: replace with your own Region and central account ID.
region = 'us-east-1'
central_account_id = '111122223333'
central_bus_arn = f"arn:aws:events:{region}:{central_account_id}:event-bus/central-health-bus"


def send_to_central_bus(event_data):
    """Send event to cross-account EventBridge bus"""
    eventbridge_client.put_events(
        Entries=[
            {
                'Source': 'heidi.health',  # Use heidi.health for regenerated events
                'DetailType': 'AWS Health Event',
                'Detail': json.dumps(event_data, default=str),
                'EventBusName': central_bus_arn
            }
        ]
    )
```
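Cross-account delivery also depends on the central bus having a resource-based policy that allows the member accounts to call `events:PutEvents`. The sketch below builds such a policy document; the function name `build_central_bus_policy` and the statement ID format are assumptions for illustration, and you may prefer a tighter principal or an organization-based condition.

```python
import json


def build_central_bus_policy(bus_arn, member_account_ids):
    """Return a resource policy (JSON string) for the central event bus."""
    statements = [
        {
            'Sid': f'AllowPutEventsFrom{account_id}',
            'Effect': 'Allow',
            'Principal': {'AWS': f'arn:aws:iam::{account_id}:root'},
            'Action': 'events:PutEvents',
            'Resource': bus_arn,
        }
        for account_id in member_account_ids
    ]
    return json.dumps({'Version': '2012-10-17', 'Statement': statements})
```

In the central account, the result could be applied with `eventbridge_client.put_permission(EventBusName='central-health-bus', Policy=policy)`.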
Downstream Consumer Examples
Example 1: Lambda Function for Slack Notifications
EventBridge rule pattern that matches both native and regenerated events:

```json
{
  "source": ["aws.health", "heidi.health"],
  "detail-type": ["AWS Health Event"],
  "detail": {
    "eventTypeCategory": ["issue"]
  }
}
```

Or match only regenerated events:

```json
{
  "source": ["heidi.health"],
  "detail-type": ["AWS Health Event"],
  "detail": {
    "eventTypeCategory": ["issue"]
  }
}
```

Lambda handler:

```python
def lambda_handler(event, context):
    health_event = event['detail']
    event_source = event['source']  # 'aws.health' or 'heidi.health'

    # Optionally handle differently based on source
    source_label = "🔄 Backfilled" if event_source == "heidi.health" else "🔴 Live"

    message = f"""
{source_label} AWS Health Alert
Service: {health_event['service']}
Region: {health_event['region']}
Type: {health_event['eventTypeCode']}
Description: {health_event['eventDescription']}
Affected Resources: {len(health_event['affectedEntities'])}
"""

    # Send to Slack (send_slack_notification is not defined in this article;
    # supply your own implementation)
    send_slack_notification(message)
```
Example 2: SNS Topic for Email Alerts
EventBridge rule targeting SNS that matches regenerated events:

```json
{
  "source": ["heidi.health"],
  "detail-type": ["AWS Health Event"],
  "detail": {
    "eventTypeCategory": ["scheduledChange"],
    "service": ["EC2", "RDS"]
  }
}
```

Or match both native and regenerated events:

```json
{
  "source": ["aws.health", "heidi.health"],
  "detail-type": ["AWS Health Event"],
  "detail": {
    "eventTypeCategory": ["scheduledChange"],
    "service": ["EC2", "RDS"]
  }
}
```
Example 3: SQS Queue for Batch Processing
An EventBridge rule can also target an SQS queue for asynchronous processing. This is useful for high-volume events or complex processing workflows.
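As a minimal sketch of the consuming side, the function below processes an SQS batch in a Lambda function. It assumes the rule delivers the full EventBridge envelope as the message body (no input transformer) and that partial batch responses (`ReportBatchItemFailures`) are enabled on the event source mapping. The name `handle_sqs_batch` and the `process` callback are hypothetical.

```python
import json


def handle_sqs_batch(sqs_event, process):
    """Process each health event in an SQS batch.

    `process(detail, source)` is called once per message. Messages that
    raise are reported back so that only they are retried.
    """
    failures = []
    for record in sqs_event.get('Records', []):
        try:
            envelope = json.loads(record['body'])
            process(envelope['detail'], envelope.get('source'))
        except Exception:
            failures.append({'itemIdentifier': record['messageId']})
    return {'batchItemFailures': failures}
```

A Lambda handler could then be as short as `return handle_sqs_batch(event, my_processor)`.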
Best Practices
1. Event Filtering
Only regenerate and route events that are relevant to your use case. Filter by:
- Event category (issue, scheduledChange, accountNotification)
- Service (EC2, RDS, Lambda, etc.)
- Region
- Time range
- Status code
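The criteria above can be applied server-side through the `filter` argument of `describe_events`, instead of retrieving everything and filtering in Python. The sketch below builds such a filter; the function name `build_event_filter` and its parameters are hypothetical, while the dictionary keys are the ones the AWS Health event filter accepts.

```python
from datetime import datetime, timedelta, timezone


def build_event_filter(categories=None, services=None, regions=None,
                       status_codes=None, since_days=None, now=None):
    """Build a filter dict for describe_events from optional criteria."""
    event_filter = {}
    if categories:
        event_filter['eventTypeCategories'] = list(categories)
    if services:
        event_filter['services'] = list(services)
    if regions:
        event_filter['regions'] = list(regions)
    if status_codes:
        event_filter['eventStatusCodes'] = list(status_codes)
    if since_days is not None:
        # Only events that started within the last `since_days` days
        now = now or datetime.now(timezone.utc)
        event_filter['startTimes'] = [{'from': now - timedelta(days=since_days)}]
    return event_filter
```

For example, `health_client.describe_events(filter=build_event_filter(categories=['issue'], since_days=30))` would request only issues from the last 30 days.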
2. Event Source Naming
Always use heidi.health as the source for manually regenerated events:
- Distinguishes backfilled events from native AWS Health events
- Allows separate processing rules for historical vs. real-time events
- Prevents confusion when both native and regenerated events exist
- Aligns with HEIDI framework conventions
3. Rate Limiting
Implement exponential backoff when calling the AWS Health API to avoid throttling:
```python
import random
import time
from botocore.exceptions import ClientError


def call_with_backoff(func, max_retries=5):
    for attempt in range(max_retries):
        try:
            return func()
        except ClientError as e:
            if e.response['Error']['Code'] == 'ThrottlingException':
                # Exponential delay plus random jitter
                wait_time = (2 ** attempt) + random.uniform(0, 1)
                time.sleep(wait_time)
            else:
                raise
    raise RuntimeError("Max retries exceeded")
```
4. Event Deduplication
Track processed events to avoid duplicates:
```python
import hashlib


def get_event_hash(event_arn, last_updated_time):
    """Create unique hash for event state"""
    return hashlib.sha256(
        f"{event_arn}:{last_updated_time}".encode()
    ).hexdigest()


# Store in DynamoDB or similar
processed_events = set()


def is_event_processed(event_hash):
    return event_hash in processed_events
```
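An in-memory set is lost between runs, so a persistent store is needed for deduplication across backfills. Below is a minimal sketch using a DynamoDB conditional write, assuming a table whose partition key is a string attribute named `event_hash` and a `dynamodb_client` that behaves like boto3's low-level DynamoDB client. The table layout and the helper name `mark_if_new` are assumptions.

```python
def mark_if_new(dynamodb_client, table_name, event_hash):
    """Record event_hash and report whether it was seen for the first time.

    Returns True if the hash was newly written and False if it already
    existed. The check and the write happen in a single conditional put.
    """
    try:
        dynamodb_client.put_item(
            TableName=table_name,
            Item={'event_hash': {'S': event_hash}},
            ConditionExpression='attribute_not_exists(event_hash)',
        )
        return True
    except dynamodb_client.exceptions.ConditionalCheckFailedException:
        return False
```

Doing the check and the write in one call avoids the race that a separate read followed by a write would have when several workers process events in parallel.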
5. Monitoring and Logging
Implement comprehensive logging:
```python
import logging

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


def log_result(event_arn, error=None):
    """Log the outcome of processing a single event"""
    if error is None:
        logger.info("Processing event: %s", event_arn)
    else:
        logger.error("Failed to send event %s: %s", event_arn, error)
```
6. IAM Permissions
Ensure proper permissions for the execution role:
```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "health:DescribeEvents",
        "health:DescribeEventDetails",
        "health:DescribeAffectedEntities"
      ],
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "events:PutEvents"
      ],
      "Resource": [
        "arn:aws:events:*:*:event-bus/default",
        "arn:aws:events:*:*:event-bus/custom-health-bus"
      ]
    }
  ]
}
```
Use Cases
Testing Incident Response
Replay historical incidents to test and refine your incident response procedures without waiting for real events.
Onboarding New Monitoring Tools
When implementing new monitoring solutions, regenerate historical events to populate dashboards and establish baselines.
Compliance and Auditing
Route health events to compliance systems for audit trail maintenance and regulatory reporting.
Multi-Account Consolidation
Centralize health events from multiple AWS accounts into a single monitoring hub for unified visibility.
Custom Alerting Workflows
Build sophisticated alerting logic by routing events through EventBridge to Lambda functions with custom business rules.
Organizational View Support
For AWS Organizations, extend the solution to handle organization-wide events:
```python
def regenerate_org_events(bus_name='default'):
    """Regenerate organization-level health events"""
    health_client = boto3.client('health', region_name='us-east-1')

    # Get organization events
    events = []
    next_token = None

    while True:
        kwargs = {'filter': {}}
        if next_token:
            kwargs['nextToken'] = next_token

        response = health_client.describe_events_for_organization(**kwargs)
        events.extend(response.get('events', []))

        next_token = response.get('nextToken')
        if not next_token:
            break

    # Process each event with affected accounts
    for event in events:
        # Note: this call is also paginated; only the first page is read here
        affected_accounts = health_client.describe_affected_accounts_for_organization(
            eventArn=event['arn']
        )

        for account_id in affected_accounts.get('affectedAccounts', []):
            # Get account-specific details and route to bus.
            # process_org_event is a placeholder that is not defined in this article.
            process_org_event(event, account_id, bus_name)
```
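The `process_org_event` helper is left undefined above. One possible first step for it is sketched here: collecting the details and affected entities for a single event and account through the organization-level AWS Health operations. The function name `fetch_org_event_payload` and the shape of the returned dictionary are assumptions, and `health_client` is expected to behave like boto3's Health client.

```python
def fetch_org_event_payload(health_client, event_arn, account_id):
    """Collect details and affected entities for one event/account pair.

    Returns None when no details are available for the pair.
    """
    scope = {'eventArn': event_arn, 'awsAccountId': account_id}

    details = health_client.describe_event_details_for_organization(
        organizationEventDetailFilters=[scope]
    )
    if not details.get('successfulSet'):
        return None

    # Page through the affected entities for this account
    entities = []
    next_token = None
    while True:
        kwargs = {'organizationEntityFilters': [scope]}
        if next_token:
            kwargs['nextToken'] = next_token
        page = health_client.describe_affected_entities_for_organization(**kwargs)
        entities.extend(page.get('entities', []))
        next_token = page.get('nextToken')
        if not next_token:
            break

    return {
        'awsAccountId': account_id,
        'details': details['successfulSet'][0],
        'entities': entities,
    }
```

The result could then be passed through the same formatting and routing steps used for single-account events, with the account ID added to the payload.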
Conclusion
Regenerating and routing AWS Health events to EventBridge buses provides a powerful foundation for centralized monitoring, alerting, and incident response. By programmatically retrieving historical events and formatting them for EventBridge consumption, you can:
- Integrate health events with existing monitoring workflows
- Test and validate incident response procedures
- Consolidate multi-account health data
- Build custom alerting and remediation logic
- Maintain comprehensive audit trails
This approach transforms AWS Health from a passive information source into an active component of your operational excellence strategy, enabling proactive monitoring and faster incident response across your AWS infrastructure.