Lambda too slow to query DynamoDB with recursive promises - javascript, sharding

0

I'm trying to fetch the most recently created items in a DynamoDB table. For that I'm using a pattern described by Alex DeBrie in his DynamoDB book, plus sharding. When a new item is created in the table it also feeds a GSI whose partition key (GSI1PK) is made of the item's creation day plus a random shard number between 0 and 9. The sort key (GSI1SK) is the item's unique ID.

GSI1

  • GSI1PK: truncated timestamp#[0-9]
  • GSI1SK: item id
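For context, the write side could look something like this; `truncateTimestamp` and `makeGSI1PK` are hypothetical helper names (the question only shows the read path):

```javascript
// Hypothetical write-side helpers (names assumed; not shown in the question).
// truncateTimestamp zeroes out the time-of-day so all items created on the
// same day share the same date prefix; the random shard suffix spreads
// writes across 10 partitions.
const truncateTimestamp = (date) => {
  const d = new Date(date);
  d.setUTCHours(0, 0, 0, 0);
  return d;
};

const makeGSI1PK = (createdAt, shard = Math.floor(Math.random() * 10)) =>
  `${truncateTimestamp(createdAt).toISOString()}#${shard}`;

// e.g. makeGSI1PK(new Date('2023-02-09T15:30:00Z'), 8)
//      → '2023-02-09T00:00:00.000Z#8'
```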

There can be a few dozen recently created items, or thousands.

To fetch the most recent items I have three (3) parameters:

  • Date: The current day
  • Limit: total number of items to fetch
  • Days: number of days back to look for items

As suggested in Alex DeBrie's book, the method to retrieve the items is a recursive function using promises.

The problem that I'm facing is that my lambda function is very slow.

In the scenario where not many items were created recently, the function has to go through all the days and shards one after another to fetch items. For example, if I want to fetch the last 100 items from the last 7 days and there are fewer than 100 items spread across the shards, the function will run 70 queries (7 days x 10 shards) and takes around 10 seconds to finish.

On the contrary, if I want to fetch 100 items in the last 7 days and hundreds of items were created recently, then it will take around a second to run.

  • Items are small, around 400 bytes each.
  • I'm running an on-demand capacity DynamoDB table.
  • Lambda is configured with memorySize: 1536MB.
  • Node.js 16.x
  • Any ideas how I can make this run faster?
const getQueryParams = (createdAt, shard, limit) => {
    const params = {
        TableName : "table",
        IndexName: 'GSI1',
        KeyConditionExpression: "#gsi1pk = :gsi1pk",
        ExpressionAttributeNames: {
            "#gsi1pk": 'GSI1PK' 
        },
        ExpressionAttributeValues: {
        ":gsi1pk": `${truncateTimestamp(createdAt).toISOString()}#${shard}` //e.g. 2023-02-09T00:00:00.000Z#8
        },
        ScanIndexForward: false,
        Limit: limit
    };
    return params;

}

const getItems = async () => {

    const items = []
    const number_of_days = 3;

    const getLatestItems = async ({ createdAt = new Date(), limit = 100, days = 0, shard = 0 }) => {
        
        const query_params = getQueryParams(createdAt, shard, limit);

        let max_items_to_fetch = limit;

        return dynamoDb.query(query_params).then(
            (data) => {
                // process data.
                if (data.Items) {
                    data.Items.forEach((item) => {
                        if (items.length < limit) {
                            items.push(item);
                        }
                    })
                    max_items_to_fetch = limit - data.Items.length;
                }    
                
                if (items.length >= limit) {
                    return items;
                }

                if (shard < 9) {
                    let params = {
                        createdAt: new Date(createdAt), // same day, next shard
                        limit: max_items_to_fetch,
                        days: days,
                        shard: shard + 1,
                    }
                    return getLatestItems(params);
                
                } else if (days < number_of_days) {
                    let params = {
                        createdAt: new Date(createdAt.setDate(createdAt.getDate() - 1)),
                        limit: max_items_to_fetch,
                        days: days + 1,
                        shard: 0,
                    }
                    return getLatestItems(params);
        
                }
                return items;
            },
            (error) => {
                    throw new Error('Error getting all recent items')
            }
        );
    }

    return getLatestItems({});
};


export const main = async (event) => {
    const start = Date.now();
    const itemPromises = getItems();
    const res = await Promise.all([itemPromises]);
    const end = Date.now();
    console.log(`Execution time: ${end - start} ms`);
};

items-create-consume

mvp
asked a year ago · 722 views
3 Answers
1
Accepted Answer

Below is an example demonstrating parallel and serial shard querying.

Note: I did run into a strange problem when I set the Limit to 1. This is probably caused by the fact that there are multiple records in the index with the same exact value. Somehow the paging didn't work properly...

Note that doing this in parallel might be a "premature optimization". In my example I measured roughly 300ms serial versus 50ms parallel (excluding the cold-start run :-)).

In the example below I added the user name to the index. I think your use case does not require this.

Still, I think the table schema is not a great fit for your use case.

Maybe a design like this would be better:

GSI - Partition Key - Non-Truncated-DateTime

Then to get the most recent items you can just start a reverse scan of this index.

async function queryShard(user, date, shard, exclusiveKeyStart, results) {
  const params = {
    TableName: "UserActions",
    IndexName: 'UserDateShard-index',
    KeyConditionExpression: "UserDateShard = :s",
    ExpressionAttributeValues: {
      ":s": `${user}/${date.toISOString()}/${shard}`,
    },
    Limit: 5,
    ExclusiveStartKey: exclusiveKeyStart
  }
  const res = await ddbDocClient.send(new QueryCommand(params));
  if (res.Items) {
    res.Items.forEach((item) => {
      results.push(item)
    })
  }
  if (res.LastEvaluatedKey) {
    await queryShard(user, date, shard, res.LastEvaluatedKey, results) 
  }
}

async function queryParallel(user, date) {
  const results = []
  const queries = []
  for(let shard=0; shard<10; shard++) {
    queries.push(queryShard(user, date, shard, undefined, results));
  }
  await Promise.all(queries)
  return results;
}

async function querySerial(user, date) {
  const results = []
  const queries = []
  for(let shard=0; shard<10; shard++) {
    await queryShard(user, date, shard, undefined, results);
  }
  return results;
}

export const handler = async (event) => {
  try {
    var date = randomDate('01-01-2022', '12-31-2022');
    var user = users[Math.floor(Math.random()*users.length)];
    console.time("parallel");
    var results = await queryParallel(user, date);
    console.timeEnd("parallel");
    console.time("serial");
    var results = await querySerial(user, date);
    console.timeEnd("serial");
    return { body: 'Successfully created item!' }
  } catch (err) {
    console.log(err)
    return { error: err }
  }
};
JaccoPK
answered a year ago
  • Thanks Jacco for your detailed answer. Just one thing: I couldn't find a way in the documentation to do a reverse scan on an index; it seems it is not possible using the Scan operation.

  • Oops, indeed ScanIndexForward is only supported by Query.
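A minimal sketch of where the comments converge (table and index names are placeholders, and the single per-day partition key without a shard suffix is an assumption): Query supports ScanIndexForward: false, so an index can be read newest-first per partition, which plain Scan cannot do.

```javascript
// Build a Query that reads one day-partition of the index newest-first.
// Table name, index name, and key attribute names are placeholders.
const buildNewestFirstQuery = (day, limit) => ({
  TableName: 'table',
  IndexName: 'GSI1',
  KeyConditionExpression: '#pk = :pk',
  ExpressionAttributeNames: { '#pk': 'GSI1PK' },
  ExpressionAttributeValues: { ':pk': day },
  ScanIndexForward: false, // Query supports this; Scan does not
  Limit: limit,
});
```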

1

Sometimes the query takes more time than the 30s limit of an HTTP Lambda call. In those cases I use an async approach: the Lambda returns its result to the client through a WebSocket connection.

answered a year ago
0

Ok, I am working on corrected code, but there are two things that currently seem a bit strange:

  1. First of all, it seems a bit strange to want the first items from multiple days. I don't understand why you would want those items, since you cannot make any statistics with them. And if it were a paging thing, one day would suffice. To be clear, you want:

today: 100 of x items; yesterday: 100 of y items; the day before yesterday: 100 of z items

Of course if that really is the data you want it can be arranged but it would make more sense to want:

the newest items from you table:

  • max hundred
  • max x days back
  • (whichever comes first)
  2. Usually, when you want fast code that uses async I/O to execute parallel queries, you end up with code that waits for an array of results using const res = await Promise.all([itemPromises]); but in your code this array will only contain one item. Also, the recursive calls are made inside the then part, meaning a new query is only spawned when another one returns. The code should spawn the ten shard queries immediately and wait for all of them to return.
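A sketch of that "max hundred, max x days back, whichever comes first" shape, with a hypothetical queryShardFn standing in for the real DynamoDB call: all shards of a day are spawned at once, days are walked newest-first, and the loop stops as soon as the limit is reached.

```javascript
// queryShardFn(dayOffset, shard) is an assumed stand-in for the real
// DynamoDB query; it must resolve to an array of items with a numeric
// (or Date) createdAt attribute.
const getLatestItems = async (queryShardFn, { days = 7, shards = 10, limit = 100 } = {}) => {
  const items = [];
  for (let day = 0; day < days && items.length < limit; day++) {
    // Spawn all shard queries for one day in parallel, but still walk
    // the days serially, newest day first, stopping once we have enough.
    const perShard = await Promise.all(
      Array.from({ length: shards }, (_, shard) => queryShardFn(day, shard))
    );
    items.push(...perShard.flat());
  }
  return items
    .sort((a, b) => b.createdAt - a.createdAt) // newest first
    .slice(0, limit);
};
```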

If you clear up my concerns about the first point, I can try to make some working code for you :-)

Regards Jacco

JaccoPK
answered a year ago
  • Hi Jacco. For this use case, there will be two type of users:

    • Users that will be creating items. There is no pattern to it: one day there may be no items created at all, other days there will be dozens, hundreds, or even thousands of items.
    • Users that will log into a dashboard and see the most recently created items, sorted by creation date starting from the newest. These users may choose how many recent items the dashboard shows; I'm thinking max 100, since more than that won't add much value to the use case.

    Hot key issue: if users create, let's say, five thousand items on one specific day and I add them to GSI1 with the item creation date as PK, then all users' dashboards will hit that partition. Do I need to shard those items to avoid hot-key issues? How can I fetch those items in an orderly manner in the fastest possible way?

    max hundred, max x days back, (whichever comes first) = makes sense

    thanks

  • Adding the user id to GSI1PK will already make the hot-key problem much less likely. And why did you not include a time in the record? It would be nice to sort most recent first :-)
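A hedged sketch of an item shape along those lines (attribute names are assumptions, not from the thread): the user id in the GSI partition key spreads load across users, and storing the full ISO timestamp as the sort key makes newest-first ordering trivial.

```javascript
// Hypothetical write-side item builder: user id plus day in the GSI
// partition key, full creation timestamp in the sort key so a Query with
// ScanIndexForward: false returns that user's newest items first.
const buildItem = (userId, payload, now = new Date()) => ({
  PK: `ITEM#${payload.id}`,
  GSI1PK: `${userId}/${now.toISOString().slice(0, 10)}`, // e.g. 'u1/2023-02-09'
  GSI1SK: now.toISOString(),                             // full timestamp
  ...payload,
});
```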
