Dedupe when processing data within a Step Functions Map State


Anyone know of a clean way to 'dedupe' when processing data within an AWS Step Functions Map State? I need to scan an entire DynamoDB table where the hash key is UserRef and the range key is ConnectionId, and process each UserRef just once.

I'm not sure that adding user refs to an array outside of the Map State would scale, given the 256 KB payload limit between states.

My initial thought was to create a GSI with just the user ref as the primary key, until I realised GSIs don't enforce uniqueness on PKs.

Current Step Functions workflow

1 Answer

Change your Scan logic to return only distinct partition keys; it's a much more efficient process and fits your requirements perfectly. The sort key can be manipulated during the scan to jump past the maximum possible sort key value, ensuring the next read starts at the next item collection.

The steps are as follows:

  1. Use the scan operation to retrieve a single item from the table, with a Limit of 1.
  2. If the response contains a LastEvaluatedKey, modify the key to have the maximum possible sort key value for that partition key and use it as the ExclusiveStartKey in the next scan operation. This will continue the scan from right after the previous item collection, such that the next scan operation should return a new unique partition key value.
  3. Repeat Steps 1 and 2 until the response doesn’t contain a LastEvaluatedKey. That indicates the end of the table.
  4. Extract the partition keys from the responses.

An example in Python using boto3:

import argparse
import boto3
import decimal
import time
import boto3.dynamodb.types
from botocore.exceptions import ClientError

MAX_SORT_KEY_VALUE_S = chr(0x10FFFF) * 256
MAX_SORT_KEY_VALUE_N = decimal.Decimal('9.9999999999999999999999999999999999999E+125')
MAX_SORT_KEY_VALUE_B = boto3.dynamodb.types.Binary(b'\xFF' * 1024)

def print_distinct_pks(region, table_name):
    dynamodb = boto3.resource('dynamodb', region_name=region)
    table = dynamodb.Table(table_name)

    # Identify the key names from the key schema, then look up the sort key's
    # type by name (attribute_definitions order is not guaranteed to match).
    partition_key_name = next(k['AttributeName'] for k in table.key_schema if k['KeyType'] == 'HASH')
    sort_key_name = next(k['AttributeName'] for k in table.key_schema if k['KeyType'] == 'RANGE')
    sort_key_type = next(a['AttributeType'] for a in table.attribute_definitions
                         if a['AttributeName'] == sort_key_name)
    # Determine the maximum value of the sort key based on its type
    max_sort_key_value = ''
    if sort_key_type == 'S':
        max_sort_key_value = MAX_SORT_KEY_VALUE_S
    elif sort_key_type == 'N':
        max_sort_key_value = MAX_SORT_KEY_VALUE_N
    elif sort_key_type == 'B':
        max_sort_key_value = MAX_SORT_KEY_VALUE_B
    else:
        raise ValueError(f"Unsupported sort key type: {sort_key_type}")

    last_evaluated_key = None

    while True:
        try:
            scan_params = {
                'Limit': 1,
                # Project only the partition key; use a name placeholder in case
                # the attribute name is a DynamoDB reserved word.
                'ProjectionExpression': '#pk',
                'ExpressionAttributeNames': {'#pk': partition_key_name},
            }

            if last_evaluated_key:
                scan_params['ExclusiveStartKey'] = last_evaluated_key

            response = table.scan(**scan_params)
            items = response['Items']

            if len(items) > 0:
                print(items[0][partition_key_name])

            if 'LastEvaluatedKey' not in response:
                break

            last_key = response['LastEvaluatedKey']
            partition_key_value = last_key[partition_key_name]
            sort_key_value = last_key[sort_key_name]

            # Create a new key with the maximum value of the sort key
            new_key = {
                partition_key_name: partition_key_value,
                sort_key_name: max_sort_key_value
            }

            last_evaluated_key = new_key

        except ClientError as e:
            error_code = e.response['Error']['Code']
            if error_code == 'InternalServerError' or error_code == 'ThrottlingException':
                print(f"Received an error: {error_code}, retrying...")
                time.sleep(1)
            else:
                raise

if __name__ == '__main__':
    # Define CLI arguments
    parser = argparse.ArgumentParser()
    parser.add_argument('--region', required=True, help='AWS Region')
    parser.add_argument('--table-name', required=True, help='Name of the DynamoDB table')
    args = parser.parse_args()

    # Call the function with the specified table name
    print_distinct_pks(args.region, args.table_name)
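
If you'd rather drive the skip-scan from inside Step Functions instead of a standalone script, each iteration could live in a Lambda function invoked from a Task/Choice loop, passing only a single key value between states so you never approach the payload limit. This is a minimal sketch only; the handler name and event shape (TableName, PartitionKeyName, SortKeyName, LastPartitionKey) are assumptions for illustration, and it assumes a string sort key:

import boto3

# Sentinel that sorts after any real string sort key value
# (illustrative; adjust for N or B sort keys as in the script above).
MAX_SORT_KEY_VALUE_S = chr(0x10FFFF) * 256

dynamodb = boto3.resource('dynamodb')

def handler(event, context):
    # Assumed event shape (hypothetical, for illustration only):
    # {
    #     "TableName": "Connections",
    #     "PartitionKeyName": "UserRef",
    #     "SortKeyName": "ConnectionId",
    #     "LastPartitionKey": "user-123"   # absent on the first iteration
    # }
    table = dynamodb.Table(event['TableName'])
    pk_name = event['PartitionKeyName']
    sk_name = event['SortKeyName']

    scan_params = {
        'Limit': 1,
        'ProjectionExpression': '#pk',
        'ExpressionAttributeNames': {'#pk': pk_name},
    }
    if event.get('LastPartitionKey') is not None:
        # Jump past the rest of the previous item collection so this scan
        # starts at the next distinct partition key value.
        scan_params['ExclusiveStartKey'] = {
            pk_name: event['LastPartitionKey'],
            sk_name: MAX_SORT_KEY_VALUE_S,
        }

    response = table.scan(**scan_params)
    items = response.get('Items', [])

    result = {'Done': 'LastEvaluatedKey' not in response}
    if items:
        result['UserRef'] = items[0][pk_name]
    if not result['Done']:
        result['LastPartitionKey'] = response['LastEvaluatedKey'][pk_name]

    return result

A Choice state can loop back to this Task until Done is true, handing each UserRef to its processing step as it is found, so nothing larger than a single key ever travels between states.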
AWS · EXPERT · answered 9 months ago
