Dedupe when processing data within Step Functions Map State


Anyone know of a clean way to 'dedupe' when processing data within an AWS Step Functions Map state? I need to scan an entire DynamoDB table where the hash key is UserRef and the range key is ConnectionId, and process each user ref just once.

I'm not sure that accumulating user refs in an array outside of the Map state would scale, given the 256 KB limit on data passed between states.
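For scale, that payload concern can be sanity-checked with a quick estimate. A rough sketch, assuming each UserRef is a 36-character UUID string (the format is an assumption; real UserRefs may differ):

```python
import json

# Step Functions limit on data passed between states
PAYLOAD_LIMIT_BYTES = 256 * 1024

# Hypothetical UserRef value, assumed to be a 36-character UUID
sample_ref = "123e4567-e89b-12d3-a456-426614174000"
refs = [sample_ref] * 10_000
payload = json.dumps({"userRefs": refs})

# Average serialized size per ref, including JSON quoting and separators
bytes_per_ref = len(payload) / len(refs)
max_refs = int(PAYLOAD_LIMIT_BYTES / bytes_per_ref)
print(f"~{bytes_per_ref:.0f} bytes per ref, roughly {max_refs} refs per payload")
```

At roughly 40 bytes per serialized UUID, the state payload tops out in the low thousands of refs, so the concern is well founded for any sizable table.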

My initial thought was to create a GSI with just the user ref as the partition key, until I realised GSIs don't enforce uniqueness on partition keys.

Current Step Functions workflow

asked a year ago
1 Answer

Change your Scan logic to only return distinct partition keys; it's a much more efficient process and fits your requirements perfectly. The sort key in the returned key can be manipulated during the scan to jump past the maximum possible sort key value, ensuring the next read starts at the next item collection.

The steps are as follows:

  1. Use the scan operation to retrieve a single item from the table, with a Limit of 1.
  2. If the response contains a LastEvaluatedKey, modify the key to have the maximum possible sort key value for that partition key and use it as the ExclusiveStartKey in the next scan operation. This will continue the scan from right after the previous item collection, such that the next scan operation should return a new unique partition key value.
  3. Repeat Steps 1 and 2 until the response doesn’t contain a LastEvaluatedKey. That indicates the end of the table.
  4. Extract the partition keys from the responses.

An example using boto3 Python:

import argparse
import boto3
import decimal
import time
import boto3.dynamodb.types
from botocore.exceptions import ClientError

# Maximum possible sort key values for each key attribute type
MAX_SORT_KEY_VALUE_S = chr(0x10FFFF) * 256
MAX_SORT_KEY_VALUE_N = decimal.Decimal('9.9999999999999999999999999999999999999E+125')
MAX_SORT_KEY_VALUE_B = boto3.dynamodb.types.Binary(b'\xFF' * 1024)

def print_distinct_pks(region, table_name):
    dynamodb = boto3.resource('dynamodb', region_name=region)
    table = dynamodb.Table(table_name)

    partition_key_name = table.key_schema[0]['AttributeName']
    sort_key_name = table.key_schema[1]['AttributeName']
    # attribute_definitions is not guaranteed to be in key-schema order,
    # so look the sort key's type up by name
    sort_key_type = next(
        attr['AttributeType']
        for attr in table.attribute_definitions
        if attr['AttributeName'] == sort_key_name
    )
    # Determine the maximum value of the sort key based on its type
    max_sort_key_value = ''
    if sort_key_type == 'S':
        max_sort_key_value = MAX_SORT_KEY_VALUE_S
    elif sort_key_type == 'N':
        max_sort_key_value = MAX_SORT_KEY_VALUE_N
    elif sort_key_type == 'B':
        max_sort_key_value = MAX_SORT_KEY_VALUE_B
    else:
        raise ValueError(f"Unsupported sort key type: {sort_key_type}")

    last_evaluated_key = None

    while True:
        try:
            # Alias the partition key name in case it is a DynamoDB reserved word;
            # the Table resource supplies the table name itself
            scan_params = {
                'Limit': 1,
                'ProjectionExpression': '#pk',
                'ExpressionAttributeNames': {'#pk': partition_key_name},
            }

            if last_evaluated_key:
                scan_params['ExclusiveStartKey'] = last_evaluated_key

            response = table.scan(**scan_params)
            items = response['Items']

            if len(items) > 0:
                print(items[0][partition_key_name])

            if 'LastEvaluatedKey' not in response:
                break

            # Build a new ExclusiveStartKey with the maximum possible sort key
            # value, so the next scan skips the rest of this item collection
            last_key = response['LastEvaluatedKey']
            last_evaluated_key = {
                partition_key_name: last_key[partition_key_name],
                sort_key_name: max_sort_key_value
            }

        except ClientError as e:
            error_code = e.response['Error']['Code']
            if error_code in ('InternalServerError', 'ThrottlingException',
                              'ProvisionedThroughputExceededException'):
                print(f"Received an error: {error_code}, retrying...")
                time.sleep(1)
            else:
                raise

if __name__ == '__main__':
    # Define CLI arguments
    parser = argparse.ArgumentParser()
    parser.add_argument('--region', required=True, help='AWS Region')
    parser.add_argument('--table-name', required=True, help='Name of the DynamoDB table')
    args = parser.parse_args()

    # Call the function with the specified table name
    print_distinct_pks(args.region, args.table_name)
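Assuming the script above is saved as print_distinct_pks.py, it could be invoked like so (the region and table name below are placeholders, not values from the question):

```shell
# Hypothetical invocation; substitute your own region and table name
python print_distinct_pks.py --region eu-west-1 --table-name UserConnections
```

Each line printed is one distinct UserRef, which could then feed the Map state directly.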
AWS
EXPERT
answered a year ago
