
Using Pagination Tokens as Checkpoints with AWS APIs: A Resilient Pattern

5 minute read
Content level: Advanced

This article demonstrates how to use pagination tokens as checkpoints to create resilient, resumable data processing scripts. When working with AWS APIs that return large datasets, implementing a checkpoint pattern can save significant time and resources if your script fails or is interrupted.

AWS APIs paginate results to manage large datasets efficiently and prevent overwhelming clients with massive responses. This pagination is essential, but it creates challenges for long-running data processing tasks. For example:

  • AWS Health API may return thousands of events across multiple accounts
  • Resource Explorer may need to scan millions of resources
  • CloudWatch Logs can contain terabytes of log data

By persisting the pagination token after each page, your script retains progress, avoids redundant reprocessing, and saves time and compute costs when it stops, errors out, or is interrupted: it resumes where it left off instead of starting over.

Important Note about Pagination Tokens:

If nextToken is returned in the API response, there are more results available. The value of nextToken is a unique pagination token for each page. Make the call again using the returned token to retrieve the next page. Keep all other arguments unchanged.

Token Expiration: Each pagination token expires after 24 hours. Using an expired pagination token returns an HTTP 400 InvalidToken error. This means a resumed run must use its saved token within 24 hours of the checkpoint being written; otherwise you'll need to restart from the beginning.
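Because an expired token surfaces as a client error rather than an empty page, it is worth detecting explicitly so the script can fall back to a clean restart. A minimal sketch, reusing the checkpoint file name from the example below; the set of error codes is an assumption, since the exact code varies by service:

```python
import os

CHECKPOINT_FILE = "checkpoint_health_events.json"

# Error codes that signal an expired or malformed pagination token.
# The exact code varies by service; InvalidToken is the one noted above.
EXPIRED_TOKEN_CODES = {"InvalidToken", "InvalidPaginationToken"}

def is_expired_token_error(error_response):
    """Return True when an AWS error response reports a stale token."""
    code = error_response.get("Error", {}).get("Code", "")
    return code in EXPIRED_TOKEN_CODES

def handle_expired_token():
    """Drop the stale checkpoint so the next attempt restarts from page one."""
    if os.path.exists(CHECKPOINT_FILE):
        os.remove(CHECKPOINT_FILE)
    return None  # the resume token to use going forward
```

In the main loop, catch botocore.exceptions.ClientError, pass e.response to is_expired_token_error, and call handle_expired_token before retrying without a token when it returns True.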

Key Components

  1. Save checkpoint after each page - Not after each item
  2. Store the pagination token - AWS provides nextToken or similar
  3. Track processed count - For monitoring progress
  4. Auto-cleanup on success - Remove checkpoint when complete

Implementation Example

Here's a simplified example from an AWS Health event backfill script:

import boto3
import json
import os
from datetime import datetime

# Checkpoint file
CHECKPOINT_FILE = "checkpoint_health_events.json"

def save_checkpoint(next_token=None, processed_count=0):
    """Save progress to checkpoint file"""
    checkpoint = {
        'next_token': next_token,
        'processed_count': processed_count,
        'timestamp': datetime.now().isoformat()
    }
    with open(CHECKPOINT_FILE, 'w') as f:
        json.dump(checkpoint, f)
    print(f"✓ Checkpoint saved: {processed_count} events processed")

def load_checkpoint():
    """Load checkpoint if exists"""
    if os.path.exists(CHECKPOINT_FILE):
        with open(CHECKPOINT_FILE, 'r') as f:
            checkpoint = json.load(f)
        print(f"↻ Resuming from checkpoint: {checkpoint['processed_count']} events already processed")
        return checkpoint
    return None

def clear_checkpoint():
    """Remove checkpoint after successful completion"""
    if os.path.exists(CHECKPOINT_FILE):
        os.remove(CHECKPOINT_FILE)
        print("✓ Checkpoint cleared")

def process_health_events():
    """Main processing function with checkpoint support"""
    health_client = boto3.client('health', region_name='us-east-1')  # the Health API is served from the us-east-1 endpoint
    
    # Load checkpoint if exists
    checkpoint = load_checkpoint()
    next_token = checkpoint.get('next_token') if checkpoint else None
    total_processed = checkpoint.get('processed_count', 0) if checkpoint else 0
    
    # Process events page by page
    while True:
        # Fetch one page of events
        kwargs = {'maxResults': 100}
        if next_token:
            kwargs['nextToken'] = next_token
        
        response = health_client.describe_events_for_organization(
            filter={},  # an empty filter returns all organization events
            **kwargs
        )
        
        events = response.get('events', [])
        if not events:
            break
        
        # Process all events in this page
        for event in events:
            # Your processing logic here
            process_event(event)
            total_processed += 1
        
        # Get next page token
        new_next_token = response.get('nextToken')
        
        # Save checkpoint after processing this page
        save_checkpoint(new_next_token, total_processed)
        
        # Stop if no more pages
        if not new_next_token:
            break
        
        next_token = new_next_token
    
    print(f"✓ Complete: {total_processed} events processed")
    clear_checkpoint()

def process_event(event):
    """Process a single event"""
    # Your business logic here
    print(f"Processing: {event['arn']}")

if __name__ == "__main__":
    process_health_events()

How It Works

First Run

Page 1: Process 100 events → Save checkpoint (next_token=abc123, count=100)
Page 2: Process 100 events → Save checkpoint (next_token=def456, count=200)
Page 3: Process 50 events  → Save checkpoint (next_token=None, count=250)
Complete! → Delete checkpoint

After Interruption (crashed at Page 2)

Load checkpoint (next_token=abc123, count=100)
Skip to Page 2: Process 100 events → Save checkpoint (next_token=def456, count=200)
Page 3: Process 50 events → Save checkpoint (next_token=None, count=250)
Complete! → Delete checkpoint

Best Practices

1. Checkpoint Frequency

DO: Save after each page

# Good - saves after processing a page
for page in paginator.paginate():
    process_page(page)
    save_checkpoint(page.get('nextToken'))

DON'T: Save after each item

# Bad - too many checkpoint writes
for item in all_items:
    process_item(item)
    save_checkpoint()  # Too frequent!
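If you prefer boto3's built-in paginators over a manual while loop, botocore's page iterator exposes a resume_token you can checkpoint, and a StartingToken pagination config you can resume from. A sketch, assuming the service exposes a paginator for this operation (AWS Health does for describe_events_for_organization); save_checkpoint is condensed from the main example:

```python
import json

def save_checkpoint(next_token, processed_count):
    """Condensed version of the article's checkpoint helper."""
    with open("checkpoint_health_events.json", "w") as f:
        json.dump({"next_token": next_token,
                   "processed_count": processed_count}, f)

def build_pagination_config(start_token=None, page_size=100):
    """Build a boto3 PaginationConfig, adding StartingToken only on resume."""
    config = {"PageSize": page_size}
    if start_token:
        config["StartingToken"] = start_token
    return config

def backfill_with_paginator(start_token=None):
    """Iterate pages with the built-in paginator, checkpointing after each."""
    import boto3  # imported here so the helpers above work without boto3

    client = boto3.client("health", region_name="us-east-1")
    paginator = client.get_paginator("describe_events_for_organization")
    pages = paginator.paginate(
        filter={}, PaginationConfig=build_pagination_config(start_token)
    )

    processed = 0
    for page in pages:
        processed += len(page.get("events", []))
        # pages.resume_token points at the next unread page (None at the end)
        save_checkpoint(pages.resume_token, processed)
```

The paginator handles the token plumbing, so the only checkpoint-specific code left is saving resume_token once per page.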

2. Checkpoint Content

Include essential information:

checkpoint = {
    'next_token': 'abc123...',      # Required for resuming
    'processed_count': 1500,         # For progress monitoring
    'timestamp': '2024-11-14T...',   # For debugging
    'last_item_id': 'item-123'       # Optional: for verification
}
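One detail worth adding: if the process dies mid-write, the checkpoint file itself can be left truncated. Writing to a temporary file and swapping it in with os.replace avoids that. A sketch reusing the field layout above:

```python
import json
import os
import tempfile
from datetime import datetime

CHECKPOINT_FILE = "checkpoint_health_events.json"

def save_checkpoint_atomic(next_token=None, processed_count=0):
    """Write the checkpoint to a temp file, then atomically swap it in,
    so a crash mid-write never leaves a truncated JSON file behind."""
    checkpoint = {
        'next_token': next_token,
        'processed_count': processed_count,
        'timestamp': datetime.now().isoformat(),
    }
    fd, tmp_path = tempfile.mkstemp(
        dir=os.path.dirname(CHECKPOINT_FILE) or ".", suffix=".tmp"
    )
    try:
        with os.fdopen(fd, 'w') as f:
            json.dump(checkpoint, f)
        os.replace(tmp_path, CHECKPOINT_FILE)  # atomic rename
    finally:
        if os.path.exists(tmp_path):  # only present if we failed before the rename
            os.remove(tmp_path)
```

This is a drop-in replacement for the save_checkpoint function in the main example.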

3. Error Handling

Save checkpoint even on errors:

try:
    process_page(events)
    save_checkpoint(next_token, count)
except Exception as e:
    logger.error(f"Error: {e}")
    save_checkpoint(next_token, count)  # Save the failed page's token so the resume retries it
    raise

4. Idempotency

Ensure reprocessing the same page is safe:

# Track processed IDs to avoid duplicates
processed_ids = set(checkpoint.get('processed_ids', []))

for item in page:
    if item['id'] not in processed_ids:
        process_item(item)
        processed_ids.add(item['id'])
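The snippet above reads processed IDs from the checkpoint, but they also need to be written back, or the dedupe set is lost on the next crash. A sketch of both sides, with the caveat that the set grows with the dataset, so for large backfills store only the current page's IDs:

```python
import json

CHECKPOINT_FILE = "checkpoint_health_events.json"

def save_checkpoint_with_ids(next_token, processed_count, processed_ids):
    """Persist the dedupe set alongside the token. JSON has no set type,
    so the set is stored as a sorted list; for very large datasets, keep
    only the current page's IDs to bound the checkpoint size."""
    checkpoint = {
        'next_token': next_token,
        'processed_count': processed_count,
        'processed_ids': sorted(processed_ids),
    }
    with open(CHECKPOINT_FILE, 'w') as f:
        json.dump(checkpoint, f)

def load_processed_ids():
    """Rehydrate the dedupe set from the checkpoint (empty set if absent)."""
    try:
        with open(CHECKPOINT_FILE) as f:
            return set(json.load(f).get('processed_ids', []))
    except FileNotFoundError:
        return set()
```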

Conclusion

Implementing a checkpoint pattern with AWS API pagination tokens is a simple yet powerful technique for building resilient data processing scripts. By saving progress after each page, you can:

  • Resume from interruptions without data loss
  • Save time and API calls
  • Build more reliable automation

The pattern is straightforward to implement and pays dividends in production environments where long-running scripts are common.
