How to use ES8 async/await with streams?

Javascript · Node.js · Async Await · Ecmascript 2017

Javascript Problem Overview


In https://stackoverflow.com/a/18658613/779159 is an example of how to calculate the hash of a file (SHA-1 in the code below) using the built-in crypto library and streams.

var fs = require('fs');
var crypto = require('crypto');

// Open a read stream on the file whose hash we want to compute.
var fd = fs.createReadStream('/some/file/name.txt');
// NOTE(review): the question mentions md5, but this example computes SHA-1.
var hash = crypto.createHash('sha1');
hash.setEncoding('hex'); // hash.read() will yield a hex string instead of a Buffer

// When the source file has been fully read, finalize the hash and print it.
fd.on('end', function() {
    hash.end();
    console.log(hash.read()); // the desired sha1sum
});

// read all file and pipe it (write it) to the hash object
fd.pipe(hash);

But is it possible to convert this to using ES8 async/await instead of using the callback as seen above, but while still keeping the efficiency of using streams?

Javascript Solutions


Solution 1 - Javascript

async/await only works with promises, not with streams. There are ideas to make an extra stream-like data type that would get its own syntax, but those are highly experimental if at all and I won't go into details.

Anyway, your callback is only waiting for the end of the stream, which is a perfect fit for a promise. You'd just have to wrap the stream:

var fd = fs.createReadStream('/some/file/name.txt');
var hash = crypto.createHash('sha1');
hash.setEncoding('hex');
// read all file and pipe it (write it) to the hash object
fd.pipe(hash);

var end = new Promise(function(resolve, reject) {
    // Wait for the writable-side 'finish' event, which fires once pipe() has
    // called hash.end() and all input has been flushed; at that point read()
    // returns the hex digest. (The readable-side 'end' event would never fire
    // here, because nothing consumes the hash stream before this handler runs,
    // so waiting on 'end' can hang forever.)
    hash.on('finish', () => resolve(hash.read()));
    fd.on('error', reject); // or something like that. might need to close `hash`
});

Now you can await that promise:

(async () => {
    // `end` (defined above) resolves with the hex digest once hashing completes.
    console.log(await end);
})();

Solution 2 - Javascript

If you are using node version >= v10.0.0 then you can use stream.pipeline and util.promisify.

const fs = require('fs');
const crypto = require('crypto');
const util = require('util');
const stream = require('stream');

// Promisified pipeline (Node >= 10): resolves when every stream has finished,
// and destroys all streams in the chain if any of them errors.
const pipeline = util.promisify(stream.pipeline);

const hash = crypto.createHash('sha1');
hash.setEncoding('hex'); // hash.read() yields a hex string instead of a Buffer

async function run() {
  await pipeline(
    fs.createReadStream('/some/file/name.txt'),
    hash
  );
  // The hash stream has finished, so read() now returns the complete digest —
  // this is the value the question actually asks for, so print it rather than
  // a generic success message.
  console.log(hash.read());
}

run().catch(console.error);

Solution 3 - Javascript

Node v15 now has a promisified pipeline in stream/promises. This is the cleanest and most official way.

// The snippet used `fs` and `zlib` without requiring them, which throws a
// ReferenceError as written — bring them into scope.
const fs = require('fs');
const zlib = require('zlib');
// Node v15+: stream/promises exports a pipeline that already returns a
// Promise, so no util.promisify wrapper is needed.
const { pipeline } = require('stream/promises');

async function run() {
  await pipeline(
    fs.createReadStream('archive.tar'),
    zlib.createGzip(),
    fs.createWriteStream('archive.tar.gz')
  );
  console.log('Pipeline succeeded.');
}

run().catch(console.error);

We should all appreciate how much work is done here:

  • Capture errors in all the streams.
  • Destroy unfinished streams when an error is raised.
  • Only return when the last writable stream is finished.

This pipe mechanism is one of the most powerful features Node.js has. Making it fully async was not easy. Now we have it.

Solution 4 - Javascript

Something like this works:

for (const res of fetchResponses) { // node-fetch package responses
    // Append each response body to the same destination file.
    const dest = fs.createWriteStream(filePath, { flags: 'a' });
    totalBytes += Number(res.headers.get('content-length'));
    await new Promise((resolve, reject) => {
        res.body.pipe(dest);
        // Reject on failures from BOTH sides of the pipe. Without the `dest`
        // handler, a write-side error (disk full, EACCES, ...) would leave
        // this promise pending forever and surface as an unhandled 'error'.
        res.body.on('error', reject);
        dest.on('error', reject);
        dest.on('finish', resolve);
    });
}

Solution 5 - Javascript

2021 Update:

New example from Node documentation:

// Drain a readable stream through its async iterator and log the full
// decoded contents once the stream ends.
async function print(readable) {
  readable.setEncoding('utf8');
  const chunks = [];
  for await (const chunk of readable) {
    chunks.push(chunk);
  }
  console.log(chunks.join(''));
}

see https://nodejs.org/api/stream.html#stream_readable_symbol_asynciterator

Solution 6 - Javascript

I would comment, but don't have enough reputation.

A WORD OF CAUTION: If you have an application that is passing streams around AND doing async/await, be VERY CAREFUL to connect ALL pipes before you await. You can end up with streams not containing what you thought they did. Here's the minimal example

const { PassThrough } = require('stream');

// Demonstrates the pitfall: a pipe attached AFTER an `await` misses any data
// that was written before it was connected.
async function main() {
	const initialStream = new PassThrough();

	// First consumer: connected before anything is written, so it sees all data.
	const otherStream = new PassThrough();
	const data = [];
	otherStream.on('data', dat => data.push(dat));
	const resultOtherStreamPromise = new Promise(resolve => otherStream.on('end', () => { resolve(Buffer.concat(data)) }));

	// Second consumer: will only be connected after the await below.
	const yetAnotherStream = new PassThrough();
	const data2 = [];
	yetAnotherStream.on('data', dat => data2.push(dat));
	const resultYetAnotherStreamPromise = new Promise(resolve => yetAnotherStream.on('end', () => { resolve(Buffer.concat(data2)) }));

	initialStream.pipe(otherStream);
	initialStream.write('some ');

	// Yielding to the event loop lets the 'some ' chunk flow through the pipes
	// that exist right now — any pipe attached later never sees it.
	await Promise.resolve(); // Completely unrelated await

	// Attached too late: 'some ' has already passed through.
	initialStream.pipe(yetAnotherStream);
	initialStream.end('data');
	const [resultOtherStream, resultYetAnotherStream] = await Promise.all([
		resultOtherStreamPromise,
		resultYetAnotherStreamPromise,
	]);

	console.log('other stream:', resultOtherStream.toString()); // other stream: some data
	console.log('yet another stream:', resultYetAnotherStream.toString()); // yet another stream: data
}
main();

Attributions

All content for this solution is sourced from the original question on Stackoverflow.

The content on this page is licensed under the Attribution-ShareAlike 4.0 International (CC BY-SA 4.0) license.

Content TypeOriginal AuthorOriginal Content on Stackoverflow
Questionuser779159View Question on Stackoverflow
Solution 1 - JavascriptBergiView Answer on Stackoverflow
Solution 2 - JavascriptshussonView Answer on Stackoverflow
Solution 3 - JavascriptJason ChingView Answer on Stackoverflow
Solution 4 - JavascriptRonnie RoystonView Answer on Stackoverflow
Solution 5 - JavascriptDmitri R117View Answer on Stackoverflow
Solution 6 - JavascriptjdylanstewartView Answer on Stackoverflow