- Newest
- Most votes
- Most comments
Hello.
The "describe_db_snapshots" API allows you to specify a DBInstanceIdentifier as shown below.
So, why not try filtering the snapshots to delete by specifying a DBInstanceIdentifier?
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/rds/client/describe_db_snapshots.html
def delete_oldest_snapshots(dest_rds, count):
db_names = [
"demo", "drtest20240314"
]
# Get all manual snapshots in destination region
for db_name in db_names:
response = dest_rds.describe_db_snapshots(
DBInstanceIdentifier=db_name,
SnapshotType='manual')
# Sort snapshots by creation time (oldest first)
snapshots = sorted(response['DBSnapshots'], key=lambda s: s['SnapshotCreateTime'])
# Delete the oldest 'count' snapshots
for i, snapshot in enumerate(snapshots):
if i >= count:
break
snapshot_id = snapshot['DBSnapshotIdentifier']
print(f"Deleting snapshot: {snapshot_id}")
try:
dest_rds.delete_db_snapshot(DBSnapshotIdentifier=snapshot_id)
print(f"Successfully deleted snapshot: {snapshot_id}")
except Exception as e:
print(f"Error deleting snapshot {snapshot_id}: {str(e)}")
Also, when copying a snapshot, please specify the KMS key for the destination region.
This can be specified using the KmsKeyId parameter of the "copy_db_snapshot" API.
The code has been modified so that when copying snapshots, only the DBs listed in the "db_names" list are included.
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/rds/client/copy_db_cluster_snapshot.html
def copy_latest_snapshots(source_rds, dest_rds, count):
# Define the database names based on your example
# "amgen", "alpine"
db_names = [
"demo", "drtest20240314"
]
for db_name in db_names:
# Get all automated snapshots in source region
response = source_rds.describe_db_snapshots(
DBInstanceIdentifier=db_name,
SnapshotType='automated'
)
# Sort snapshots by creation time (newest first)
snapshots = sorted(response['DBSnapshots'],
key=lambda s: s['SnapshotCreateTime'],
reverse=True
)
# Get today's date for naming
today = datetime.now().strftime("%Y-%m-%d")
# Copy the newest 'count' snapshots
for i, snapshot in enumerate(snapshots[:count]):
if i >= count:
break
source_snapshot_id = snapshot['DBSnapshotIdentifier']
# Create target snapshot name
target_snapshot_id = f"copy-rds-db-{db_name}-{today}"
print(f"Copying snapshot {source_snapshot_id} to {target_snapshot_id}")
try:
# Create ARN for source snapshot
source_arn = snapshot['DBSnapshotArn']
# Copy the snapshot
dest_rds.copy_db_snapshot(
SourceDBSnapshotIdentifier=source_arn,
TargetDBSnapshotIdentifier=target_snapshot_id,
KmsKeyId="arn:aws:kms:us-east-2:123456789012:key/12345678-asdf-ghjk-zxcv-123456789012",
SourceRegion=SOURCE_REGION
)
print(f"Successfully initiated copy of {source_snapshot_id} to {target_snapshot_id}")
except Exception as e:
print(f"Error copying snapshot {source_snapshot_id}: {str(e)}")
The whole code looks like this:
import boto3
import os
from datetime import datetime
# Define regions
# SOURCE_REGION = "us-west-1" # N. California
SOURCE_REGION = "us-west-2" # Oregon
DEST_REGION = "us-east-2" # Oregon
NUM_SNAPSHOTS = 2 # Number of snapshots to process
def lambda_handler(event, context):
# Create RDS clients for both regions
source_rds = boto3.client('rds', region_name=SOURCE_REGION)
dest_rds = boto3.client('rds', region_name=DEST_REGION)
# Step 1: Delete oldest manual snapshots in destination region
delete_oldest_snapshots(dest_rds, NUM_SNAPSHOTS)
# Step 2: Copy latest system snapshots from source to destination
copy_latest_snapshots(source_rds, dest_rds, NUM_SNAPSHOTS)
return {
'statusCode': 200,
'body': f'Successfully processed {NUM_SNAPSHOTS} snapshots'
}
def delete_oldest_snapshots(dest_rds, count):
db_names = [
"demo", "drtest20240314"
]
# Get all manual snapshots in destination region
for db_name in db_names:
response = dest_rds.describe_db_snapshots(
DBInstanceIdentifier=db_name,
SnapshotType='manual'
)
# Sort snapshots by creation time (oldest first)
snapshots = sorted(response['DBSnapshots'], key=lambda s: s['SnapshotCreateTime'])
# Delete the oldest 'count' snapshots
for i, snapshot in enumerate(snapshots):
if i >= count:
break
snapshot_id = snapshot['DBSnapshotIdentifier']
print(f"Deleting snapshot: {snapshot_id}")
try:
dest_rds.delete_db_snapshot(DBSnapshotIdentifier=snapshot_id)
print(f"Successfully deleted snapshot: {snapshot_id}")
except Exception as e:
print(f"Error deleting snapshot {snapshot_id}: {str(e)}")
def copy_latest_snapshots(source_rds, dest_rds, count):
# Define the database names based on your example
# "amgen", "alpine"
db_names = [
"demo", "drtest20240314"
]
for db_name in db_names:
# Get all automated snapshots in source region
response = source_rds.describe_db_snapshots(
DBInstanceIdentifier=db_name,
SnapshotType='automated'
)
# Sort snapshots by creation time (newest first)
snapshots = sorted(response['DBSnapshots'],
key=lambda s: s['SnapshotCreateTime'],
reverse=True
)
# Get today's date for naming
today = datetime.now().strftime("%Y-%m-%d")
# Copy the newest 'count' snapshots
for i, snapshot in enumerate(snapshots[:count]):
if i >= count:
break
source_snapshot_id = snapshot['DBSnapshotIdentifier']
# Create target snapshot name
target_snapshot_id = f"copy-rds-db-{db_name}-{today}"
print(f"Copying snapshot {source_snapshot_id} to {target_snapshot_id}")
try:
# Create ARN for source snapshot
source_arn = snapshot['DBSnapshotArn']
# Copy the snapshot
dest_rds.copy_db_snapshot(
SourceDBSnapshotIdentifier=source_arn,
TargetDBSnapshotIdentifier=target_snapshot_id,
KmsKeyId="arn:aws:kms:us-east-2:123456789012:key/12345678-asdf-ghjk-zxcv-123456789012",
SourceRegion=SOURCE_REGION
)
print(f"Successfully initiated copy of {source_snapshot_id} to {target_snapshot_id}")
except Exception as e:
print(f"Error copying snapshot {source_snapshot_id}: {str(e)}")
The issue with your code is that it's not filtering snapshots based on the database names you want to target. Let me explain the problems and provide solutions:
-
Deletion Issue: Your
delete_oldest_snapshotsfunction is simply taking the oldest snapshots regardless of their database name. You need to filter snapshots to only include those with "db-demo" and "db-drtest20240314" in their names. -
Copy Issue: Similarly, your
copy_latest_snapshotsfunction isn't filtering for specific database instances before copying.
Here's how to fix these issues:
For the deletion function, modify it to filter snapshots first:
def delete_oldest_snapshots(dest_rds, count): # Get all manual snapshots in destination region response = dest_rds.describe_db_snapshots(SnapshotType='manual') # Filter snapshots to only include those with the target database names target_dbs = ["db-demo", "db-drtest20240314"] filtered_snapshots = [] for snapshot in response['DBSnapshots']: snapshot_id = snapshot['DBSnapshotIdentifier'] # Check if snapshot name contains any of our target DB names if any(target_db in snapshot_id for target_db in target_dbs): filtered_snapshots.append(snapshot) # Sort filtered snapshots by creation time (oldest first) snapshots = sorted(filtered_snapshots, key=lambda s: s['SnapshotCreateTime']) # Delete the oldest snapshots for each target DB deleted_counts = {db: 0 for db in target_dbs} for snapshot in snapshots: snapshot_id = snapshot['DBSnapshotIdentifier'] # Determine which target DB this snapshot belongs to for target_db in target_dbs: if target_db in snapshot_id and deleted_counts[target_db] < count/len(target_dbs): print(f"Deleting snapshot: {snapshot_id}") try: dest_rds.delete_db_snapshot(DBSnapshotIdentifier=snapshot_id) print(f"Successfully deleted snapshot: {snapshot_id}") deleted_counts[target_db] += 1 break except Exception as e: print(f"Error deleting snapshot {snapshot_id}: {str(e)}")
For the copy function, modify it to filter for specific database instances:
def copy_latest_snapshots(source_rds, dest_rds, count): # Get all automated snapshots in source region response = source_rds.describe_db_snapshots(SnapshotType='automated') # Define the target database instance names target_instances = ["db-demo", "db-drtest20240314"] # Get today's date for naming today = datetime.now().strftime("%Y-%m-%d") # Process each target instance for db_name in target_instances: # Filter snapshots for this specific DB instance db_snapshots = [s for s in response['DBSnapshots'] if db_name in s['DBInstanceIdentifier']] # Sort by creation time (newest first) db_snapshots = sorted(db_snapshots, key=lambda s: s['SnapshotCreateTime'], reverse=True) # Copy the newest snapshot if available if db_snapshots: snapshot = db_snapshots[0] # Get the newest snapshot source_snapshot_id = snapshot['DBSnapshotIdentifier'] # Create target snapshot name target_snapshot_id = f"copy-rds-db-{db_name}-{today}" print(f"Copying snapshot {source_snapshot_id} to {target_snapshot_id}") try: # Use the full ARN for the source snapshot source_arn = snapshot['DBSnapshotArn'] # Copy the snapshot dest_rds.copy_db_snapshot( SourceDBSnapshotIdentifier=source_arn, TargetDBSnapshotIdentifier=target_snapshot_id, SourceRegion=SOURCE_REGION ) print(f"Successfully initiated copy of {source_snapshot_id} to {target_snapshot_id}") except Exception as e: print(f"Error copying snapshot {source_snapshot_id}: {str(e)}")
Regarding the KMS key error, this indicates that the snapshots you're trying to copy are encrypted with a KMS key that your Lambda function doesn't have access to. You'll need to:
- Ensure your Lambda execution role has permissions to use the KMS key in the source region
- If copying between regions, you may need to specify a KMS key in the destination region:
dest_rds.copy_db_snapshot( SourceDBSnapshotIdentifier=source_arn, TargetDBSnapshotIdentifier=target_snapshot_id, SourceRegion=SOURCE_REGION, KmsKeyId='destination-kms-key-id' # Add this if needed )
These changes should help your Lambda function correctly identify and process the specific snapshots you're targeting.
Sources
Use DescribeDBSnapshots with an AWS SDK or CLI - AWS SDK Code Examples
What is the difference between RDS snapshots, RDS Automated Backups, and RDS Auto Replicated Backups? | AWS re:Post
copy_db_cluster_snapshot - Boto3 1.37.31 documentation
Relevant content
- asked 8 months ago
- asked 8 months ago
- AWS OFFICIALUpdated 3 years ago

My apologies to Riku and the re:Post community for my lack of communication lately. I took some time off and also have been focused on other projects. I will review latest response today!
Awesome! It mostly worked, we just have KMS key IAM issue with lambda role that needs to get sorted out. But the python code deleted the latest two per the db_names as expected but errored on the copy over part because of the KMS key issue, which I'll set up a new re:Post to review. Thank you very much Riku! Best Regards, Donald