How to create a read stream of an AWS S3 object in an async function?

0

How to create a read stream of an AWS S3 object in an async function?

If I try

    // Fails: member access binds tighter than `await`, so createReadStream() is
    // invoked on the Promise returned by .promise(), which has no such method
    // (hence "(intermediate value).createReadStream is not a function").
    exports.handler = async (event) => {
      var csvreadstream = await s3.getObject({ Bucket: bucket, Key: filename }).promise().createReadStream()
    }

or

 // Fails as well: awaiting .promise() yields the resolved getObject result, and
 // createReadStream() is then called on that result instead of on the request
 // object returned by s3.getObject().
 // NOTE(review): per the AWS SDK v2 docs createReadStream() belongs to the
 // request (AWS.Request), not to the resolved response data — confirm.
 exports.handler = async (event) => {
   var s3Object = await s3.getObject({ Bucket: bucket, Key: filename }).promise();
   var csvreadstream = s3Object.createReadStream();
}

I get

    {
      "errorType": "TypeError",
      "errorMessage": "(intermediate value).createReadStream is not a function",
      "trace": [
        "TypeError: (intermediate value).createReadStream is not a function",
        "    at Runtime.exports.handler (/var/task/app.js:29:86)",
        "    at processTicksAndRejections (internal/process/task_queues.js:94:5)"
      ]
    }

Can anyone advise how to create a read stream from an S3 object inside an async function (i.e. using async/await)?
Thank you!

Nick77
asked 5 years ago · 5572 views
1 Answer
0

Okay, here is my solution:

    // Waits for the complete getObject result, then wraps its Body in a manually
    // fed Readable stream.
    var s3object = (await s3.getObject({ Bucket: bucket, Key: filename }).promise());
    var csvreadstream = new stream.Readable();
    // No-op _read: the data is supplied by the explicit push() below, not on demand.
    csvreadstream._read = () => {};
    // NOTE(review): nothing pushes `null` afterwards, so the stream is presumably
    // never marked as ended — confirm against the Node.js stream docs.
    csvreadstream.push(s3object.Body);

However, the stream's event handlers never get triggered, for example

.on('data')

Here is the code:

const AWS = require('aws-sdk');
const utils = require('./utils');
const csv = require('fast-csv');
const stream = require('stream');
const s3 = new AWS.S3();

exports.handler = async (event) => {
    console.log("Incoming Event: ", JSON.stringify(event));
    const bucket = event.Records[0].s3.bucket.name;
    const filename = decodeURIComponent(event.Records[0].s3.object.key.replace(/\+/g, ' '));
    const message = `File is uploaded in - ${bucket} -> ${filename}`;
    console.log(message);
    
    var errors = [];

    const splittedFilename = filename.split('.');
    const reportFilename = splittedFilename[0] + "Report." + splittedFilename[1];
    const reportBucket = 'external.transactions.reports';
    
    var s3object = (await s3.getObject({ Bucket: bucket, Key: filename }).promise());
    var csvreadstream = new stream.Readable();
    csvreadstream._read = () => {};
    csvreadstream.push(s3object.Body);
   
    csvreadstream
    .pipe(csv.parse({ headers: true }))
    .on('data', async function(data){
        this.pause();
        console.log("DATA: " + data);
        await utils.filterLogic(data, errors);
        this.resume();
    })
    .on('end', async function(){
        console.log("END");
        await utils.writeErrorReport(errors, s3, reportBucket, reportFilename);
    })
};
Nick77
answered 5 years ago

You are not logged in. Log in to post an answer.

A good answer clearly answers the question, provides constructive feedback, and encourages professional growth in the question asker.

Guidelines for Answering Questions