Skip to content

Python code deleted wrong snapshots and tried to copy wrong snapshots

0

Hi re:Post!,

Thank you for all your help with getting my python code to run/work with my lambda function, "Lambda-Function-RDS-Snapshot-Management".

With the assistance of Riku, Manvitha, and the AI support bot - we were able to test the lambda function and move past some errors I was encountering. However, the last test run shows that the python code is not running as I expected.

I am trying to get the code to delete (2) of the oldest manual generated snapshots from Oregon us-west-2 named,

"copy-rds-db-demo-(YYYY-MM-DD)" and

"copy-rds-db-drtest20240314-(YYYY-MM-DD)"

Please note the "YYYY-MM-DD" for both is actually the date that copy was made and it changes every new day.

Then I need it to copy (2) of the latest system generated snapshots from Oregon us-west-2 named for that same set:

rds:db-demo-2025-07-30-05-07-us-east-2

rds:db-drtest20240314-2025-07-30-05-05

But at the runtime, the code deleted (2) oldest snapshots but not those associated with db-demo or db-drtest:

Deleting snapshot: *copy-rds-db-amgen-2025-07-24*
Successfully deleted snapshot: *copy-rds-db-amgen-2025-07-24*
Deleting snapshot: *copy-rds-db-alpine-2025-07-24*
Successfully deleted snapshot: *copy-rds-db-alpine-2025-07-24*

It should have deleted the the (2) oldest snashots that specfically have the string "db-demo" and "db-drtest20240314" as part of the name.

Then it tried to copy over the wrong snapshots. It should have copied the latest (2) snapshots that specifically have the string "db-demo" and "db-drtest20240314" as part of the name.

Below is the runtime output:

Status: Succeeded
Test Event Name: TestEventLambdaSnap2

Response:
{
  "statusCode": 200,
  "body": "Successfully processed 2 snapshots"
}

Function Logs:
START RequestId: 637572f5-219d-49df-820e-cd4ca140251a Version: $LATEST
Deleting snapshot: copy-rds-db-amgen-2025-07-24
Successfully deleted snapshot: copy-rds-db-amgen-2025-07-24
Deleting snapshot: copy-rds-db-alpine-2025-07-24
Successfully deleted snapshot: copy-rds-db-alpine-2025-07-24
Copying snapshot rds:db-biocryst-2025-07-30-05-06-us-east-2 to copy-rds-db-demo-2025-07-30
Error copying snapshot rds:db-biocryst-2025-07-30-05-06-us-east-2: An error occurred (KMSKeyNotAccessibleFault) when calling the CopyDBSnapshot operation: The source snapshot KMS key [arn:aws:kms:us-west-2:910286192445:key/mrk-6dae29119b094afaa3b9ed67c781ab3c] does not exist, is not enabled or you do not have permissions to access it. 
Copying snapshot rds:db-aaurology-2025-07-30-05-06-us-east-2 to copy-rds-db-drtest20240314-2025-07-30
Error copying snapshot rds:db-aaurology-2025-07-30-05-06-us-east-2: An error occurred (KMSKeyNotAccessibleFault) when calling the CopyDBSnapshot operation: The source snapshot KMS key [arn:aws:kms:us-west-2:910286192445:key/mrk-6dae29119b094afaa3b9ed67c781ab3c] does not exist, is not enabled or you do not have permissions to access it. 
END RequestId: 637572f5-219d-49df-820e-cd4ca140251a
REPORT RequestId: 637572f5-219d-49df-820e-cd4ca140251a	Duration: 5448.53 ms	Billed Duration: 5449 ms	Memory Size: 128 MB	Max Memory Used: 90 MB	Init Duration: 295.68 ms

Request ID: 637572f5-219d-49df-820e-cd4ca140251a

Here is the python code:

import boto3
import os
from datetime import datetime

# Define regions
# SOURCE_REGION = "us-west-1"  # N. California
SOURCE_REGION = "us-west-2"  # Oregon
DEST_REGION = "us-west-2"    # Oregon
NUM_SNAPSHOTS = 2            # Number of snapshots to process

def lambda_handler(event, context):
# Create RDS clients for both regions
    source_rds = boto3.client('rds', region_name=SOURCE_REGION)
    dest_rds = boto3.client('rds', region_name=DEST_REGION)

# Step 1: Delete oldest manual snapshots in destination region
    delete_oldest_snapshots(dest_rds, NUM_SNAPSHOTS)

# Step 2: Copy latest system snapshots from source to destination
    copy_latest_snapshots(source_rds, dest_rds, NUM_SNAPSHOTS)

    return {
        'statusCode': 200,
        'body': f'Successfully processed {NUM_SNAPSHOTS} snapshots'
    }

def delete_oldest_snapshots(dest_rds, count):
# Get all manual snapshots in destination region
    response = dest_rds.describe_db_snapshots(SnapshotType='manual')

# Sort snapshots by creation time (oldest first)
    snapshots = sorted(response['DBSnapshots'], key=lambda s: s['SnapshotCreateTime'])

# Delete the oldest 'count' snapshots
    for i, snapshot in enumerate(snapshots):
        if i >= count:
            break

        snapshot_id = snapshot['DBSnapshotIdentifier']
        print(f"Deleting snapshot: {snapshot_id}")

        try:
            dest_rds.delete_db_snapshot(DBSnapshotIdentifier=snapshot_id)
            print(f"Successfully deleted snapshot: {snapshot_id}")
        except Exception as e:
            print(f"Error deleting snapshot {snapshot_id}: {str(e)}")

def copy_latest_snapshots(source_rds, dest_rds, count):
# Get all automated snapshots in source region
    response = source_rds.describe_db_snapshots(SnapshotType='automated')

    # Sort snapshots by creation time (newest first)
    snapshots = sorted(response['DBSnapshots'], 
        key=lambda s: s['SnapshotCreateTime'], 
        reverse=True
    )

    # Get today's date for naming
    today = datetime.now().strftime("%Y-%m-%d")

    # Define the database names based on your example
    #  "amgen", "alpine"
    db_names = [
        "demo", "drtest20240314"
    ]

    # Copy the newest 'count' snapshots
    for i, snapshot in enumerate(snapshots[:count]):
        if i >= count:
            break

        source_snapshot_id = snapshot['DBSnapshotIdentifier']
        db_instance_id = snapshot['DBInstanceIdentifier']

        # Use the corresponding db name from the list if available
        db_name = db_names[i] if i < len(db_names) else db_instance_id

        # Create target snapshot name
        target_snapshot_id = f"copy-rds-db-{db_name}-{today}"

        print(f"Copying snapshot {source_snapshot_id} to {target_snapshot_id}")

        try:
        # Create ARN for source snapshot
            source_arn = snapshot['DBSnapshotArn']

        # Copy the snapshot
            dest_rds.copy_db_snapshot(
                SourceDBSnapshotIdentifier=source_arn,
                TargetDBSnapshotIdentifier=target_snapshot_id,
                SourceRegion=SOURCE_REGION
            )
            print(f"Successfully initiated copy of {source_snapshot_id} to {target_snapshot_id}")
        except Exception as e:
            print(f"Error copying snapshot {source_snapshot_id}: {str(e)}")

Among other edits/changes needed, I think I need to replace:

source_arn = snapshot['DBSnapshotArn']

with

source_arn = f"arn:aws:rds:{SOURCE_REGION}:{snapshot['DBSnapshotArn'].split(':')[4]}:{source_snapshot_id}"

So that it picks the correct source snapshot.

Please advise!

Thank you for your time and help!

Best Regards,

Donald

asked 8 months ago111 views
2 Answers
1
Accepted Answer

Hello.

The "describe_db_snapshots" API allows you to specify a DBInstanceIdentifier as shown below.
So, why not try filtering the snapshots to delete by specifying a DBInstanceIdentifier?
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/rds/client/describe_db_snapshots.html

def delete_oldest_snapshots(dest_rds, count):

    db_names = [
        "demo", "drtest20240314"
    ]

# Get all manual snapshots in destination region
    for db_name in db_names:
        response = dest_rds.describe_db_snapshots(
            DBInstanceIdentifier=db_name,
            SnapshotType='manual')

    # Sort snapshots by creation time (oldest first)
        snapshots = sorted(response['DBSnapshots'], key=lambda s: s['SnapshotCreateTime'])

    # Delete the oldest 'count' snapshots
        for i, snapshot in enumerate(snapshots):
            if i >= count:
                break

            snapshot_id = snapshot['DBSnapshotIdentifier']
            print(f"Deleting snapshot: {snapshot_id}")

            try:
                dest_rds.delete_db_snapshot(DBSnapshotIdentifier=snapshot_id)
                print(f"Successfully deleted snapshot: {snapshot_id}")
            except Exception as e:
                print(f"Error deleting snapshot {snapshot_id}: {str(e)}")

Also, when copying a snapshot, please specify the KMS key for the destination region.
This can be specified using the KmsKeyId parameter of the "copy_db_snapshot" API.
The code has been modified so that when copying snapshots, only the DBs listed in the "db_names" list are included.
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/rds/client/copy_db_cluster_snapshot.html

def copy_latest_snapshots(source_rds, dest_rds, count):
    # Define the database names based on your example
    #  "amgen", "alpine"
    db_names = [
        "demo", "drtest20240314"
    ]

    for db_name in db_names:
    # Get all automated snapshots in source region
        response = source_rds.describe_db_snapshots(
            DBInstanceIdentifier=db_name,
            SnapshotType='automated'
        )

        # Sort snapshots by creation time (newest first)
        snapshots = sorted(response['DBSnapshots'], 
            key=lambda s: s['SnapshotCreateTime'], 
            reverse=True
        )

        # Get today's date for naming
        today = datetime.now().strftime("%Y-%m-%d")

        # Copy the newest 'count' snapshots
        for i, snapshot in enumerate(snapshots[:count]):
            if i >= count:
                break

            source_snapshot_id = snapshot['DBSnapshotIdentifier']

            # Create target snapshot name
            target_snapshot_id = f"copy-rds-db-{db_name}-{today}"

            print(f"Copying snapshot {source_snapshot_id} to {target_snapshot_id}")

            try:
            # Create ARN for source snapshot
                source_arn = snapshot['DBSnapshotArn']

            # Copy the snapshot
                dest_rds.copy_db_snapshot(
                    SourceDBSnapshotIdentifier=source_arn,
                    TargetDBSnapshotIdentifier=target_snapshot_id,
                    KmsKeyId="arn:aws:kms:us-east-2:123456789012:key/12345678-asdf-ghjk-zxcv-123456789012",
                    SourceRegion=SOURCE_REGION
                )
                print(f"Successfully initiated copy of {source_snapshot_id} to {target_snapshot_id}")
            except Exception as e:
                print(f"Error copying snapshot {source_snapshot_id}: {str(e)}")

The whole code looks like this:

import boto3
import os
from datetime import datetime

# Define regions
# SOURCE_REGION = "us-west-1"  # N. California
SOURCE_REGION = "us-west-2"  # Oregon
DEST_REGION = "us-east-2"    # Oregon
NUM_SNAPSHOTS = 2            # Number of snapshots to process

def lambda_handler(event, context):
# Create RDS clients for both regions
    source_rds = boto3.client('rds', region_name=SOURCE_REGION)
    dest_rds = boto3.client('rds', region_name=DEST_REGION)

# Step 1: Delete oldest manual snapshots in destination region
    delete_oldest_snapshots(dest_rds, NUM_SNAPSHOTS)

# Step 2: Copy latest system snapshots from source to destination
    copy_latest_snapshots(source_rds, dest_rds, NUM_SNAPSHOTS)

    return {
        'statusCode': 200,
        'body': f'Successfully processed {NUM_SNAPSHOTS} snapshots'
    }

def delete_oldest_snapshots(dest_rds, count):

    db_names = [
        "demo", "drtest20240314"
    ]

# Get all manual snapshots in destination region
    for db_name in db_names:
        response = dest_rds.describe_db_snapshots(
            DBInstanceIdentifier=db_name,
            SnapshotType='manual'
        )

    # Sort snapshots by creation time (oldest first)
        snapshots = sorted(response['DBSnapshots'], key=lambda s: s['SnapshotCreateTime'])

    # Delete the oldest 'count' snapshots
        for i, snapshot in enumerate(snapshots):
            if i >= count:
                break

            snapshot_id = snapshot['DBSnapshotIdentifier']
            print(f"Deleting snapshot: {snapshot_id}")

            try:
                dest_rds.delete_db_snapshot(DBSnapshotIdentifier=snapshot_id)
                print(f"Successfully deleted snapshot: {snapshot_id}")
            except Exception as e:
                print(f"Error deleting snapshot {snapshot_id}: {str(e)}")

def copy_latest_snapshots(source_rds, dest_rds, count):
    # Define the database names based on your example
    #  "amgen", "alpine"
    db_names = [
        "demo", "drtest20240314"
    ]

    for db_name in db_names:
    # Get all automated snapshots in source region
        response = source_rds.describe_db_snapshots(
            DBInstanceIdentifier=db_name,
            SnapshotType='automated'
        )

        # Sort snapshots by creation time (newest first)
        snapshots = sorted(response['DBSnapshots'], 
            key=lambda s: s['SnapshotCreateTime'], 
            reverse=True
        )

        # Get today's date for naming
        today = datetime.now().strftime("%Y-%m-%d")

        # Copy the newest 'count' snapshots
        for i, snapshot in enumerate(snapshots[:count]):
            if i >= count:
                break

            source_snapshot_id = snapshot['DBSnapshotIdentifier']

            # Create target snapshot name
            target_snapshot_id = f"copy-rds-db-{db_name}-{today}"

            print(f"Copying snapshot {source_snapshot_id} to {target_snapshot_id}")

            try:
            # Create ARN for source snapshot
                source_arn = snapshot['DBSnapshotArn']

            # Copy the snapshot
                dest_rds.copy_db_snapshot(
                    SourceDBSnapshotIdentifier=source_arn,
                    TargetDBSnapshotIdentifier=target_snapshot_id,
                    KmsKeyId="arn:aws:kms:us-east-2:123456789012:key/12345678-asdf-ghjk-zxcv-123456789012",
                    SourceRegion=SOURCE_REGION
                )
                print(f"Successfully initiated copy of {source_snapshot_id} to {target_snapshot_id}")
            except Exception as e:
                print(f"Error copying snapshot {source_snapshot_id}: {str(e)}")
EXPERT
answered 8 months ago
  • My apologies to Riku and the re:Post community for my lack of communication lately. I took some time off and also have been focused on other projects. I will review latest response today!

  • Awesome! It mostly worked, we just have KMS key IAM issue with lambda role that needs to get sorted out. But the python code deleted the latest two per the db_names as expected but errored on the copy over part because of the KMS key issue, which I'll set up a new re:Post to review. Thank you very much Riku! Best Regards, Donald

-1

The issue with your code is that it's not filtering snapshots based on the database names you want to target. Let me explain the problems and provide solutions:

  1. Deletion Issue: Your delete_oldest_snapshots function is simply taking the oldest snapshots regardless of their database name. You need to filter snapshots to only include those with "db-demo" and "db-drtest20240314" in their names.

  2. Copy Issue: Similarly, your copy_latest_snapshots function isn't filtering for specific database instances before copying.

Here's how to fix these issues:

For the deletion function, modify it to filter snapshots first:

def delete_oldest_snapshots(dest_rds, count):
# Get all manual snapshots in destination region
response = dest_rds.describe_db_snapshots(SnapshotType='manual')

# Filter snapshots to only include those with the target database names
target_dbs = ["db-demo", "db-drtest20240314"]
filtered_snapshots = []

for snapshot in response['DBSnapshots']:
snapshot_id = snapshot['DBSnapshotIdentifier']
# Check if snapshot name contains any of our target DB names
if any(target_db in snapshot_id for target_db in target_dbs):
filtered_snapshots.append(snapshot)

# Sort filtered snapshots by creation time (oldest first)
snapshots = sorted(filtered_snapshots, key=lambda s: s['SnapshotCreateTime'])

# Delete the oldest snapshots for each target DB
deleted_counts = {db: 0 for db in target_dbs}

for snapshot in snapshots:
snapshot_id = snapshot['DBSnapshotIdentifier']

# Determine which target DB this snapshot belongs to
for target_db in target_dbs:
if target_db in snapshot_id and deleted_counts[target_db] < count/len(target_dbs):
print(f"Deleting snapshot: {snapshot_id}")

try:
dest_rds.delete_db_snapshot(DBSnapshotIdentifier=snapshot_id)
print(f"Successfully deleted snapshot: {snapshot_id}")
deleted_counts[target_db] += 1
break
except Exception as e:
print(f"Error deleting snapshot {snapshot_id}: {str(e)}")

For the copy function, modify it to filter for specific database instances:

def copy_latest_snapshots(source_rds, dest_rds, count):
# Get all automated snapshots in source region
response = source_rds.describe_db_snapshots(SnapshotType='automated')

# Define the target database instance names
target_instances = ["db-demo", "db-drtest20240314"]

# Get today's date for naming
today = datetime.now().strftime("%Y-%m-%d")

# Process each target instance
for db_name in target_instances:
# Filter snapshots for this specific DB instance
db_snapshots = [s for s in response['DBSnapshots'] if db_name in s['DBInstanceIdentifier']]

# Sort by creation time (newest first)
db_snapshots = sorted(db_snapshots, key=lambda s: s['SnapshotCreateTime'], reverse=True)

# Copy the newest snapshot if available
if db_snapshots:
snapshot = db_snapshots[0]  # Get the newest snapshot
source_snapshot_id = snapshot['DBSnapshotIdentifier']

# Create target snapshot name
target_snapshot_id = f"copy-rds-db-{db_name}-{today}"

print(f"Copying snapshot {source_snapshot_id} to {target_snapshot_id}")

try:
# Use the full ARN for the source snapshot
source_arn = snapshot['DBSnapshotArn']

# Copy the snapshot
dest_rds.copy_db_snapshot(
SourceDBSnapshotIdentifier=source_arn,
TargetDBSnapshotIdentifier=target_snapshot_id,
SourceRegion=SOURCE_REGION
)
print(f"Successfully initiated copy of {source_snapshot_id} to {target_snapshot_id}")
except Exception as e:
print(f"Error copying snapshot {source_snapshot_id}: {str(e)}")

Regarding the KMS key error, this indicates that the snapshots you're trying to copy are encrypted with a KMS key that your Lambda function doesn't have access to. You'll need to:

  1. Ensure your Lambda execution role has permissions to use the KMS key in the source region
  2. If copying between regions, you may need to specify a KMS key in the destination region:
dest_rds.copy_db_snapshot(
SourceDBSnapshotIdentifier=source_arn,
TargetDBSnapshotIdentifier=target_snapshot_id,
SourceRegion=SOURCE_REGION,
KmsKeyId='destination-kms-key-id'  # Add this if needed
)

These changes should help your Lambda function correctly identify and process the specific snapshots you're targeting.
Sources
Use DescribeDBSnapshots with an AWS SDK or CLI - AWS SDK Code Examples
What is the difference between RDS snapshots, RDS Automated Backups, and RDS Auto Replicated Backups? | AWS re:Post
copy_db_cluster_snapshot - Boto3 1.37.31 documentation

answered 8 months ago

You are not logged in. Log in to post an answer.

A good answer clearly answers the question and provides constructive feedback and encourages professional growth in the question asker.