What is the best way to limit concurrency when using ES6's Promise.all()?

javascript, node.js, es6-promise

Javascript Problem Overview


I have some code that is iterating over a list that was queried out of a database and making an HTTP request for each element in that list. That list can sometimes be a reasonably large number (in the thousands), and I would like to make sure I am not hitting a web server with thousands of concurrent HTTP requests.

An abbreviated version of this code currently looks something like this...

function getCounts() {
  return users.map(user => {
    return new Promise(resolve => {
      remoteServer.getCount(user) // makes an HTTP request
      .then(() => {
        /* snip */
        resolve();
      });
    });
  });
}

Promise.all(getCounts()).then(() => { /* snip */});

This code is running on Node 4.3.2. To reiterate, can Promise.all be managed so that only a certain number of Promises are in progress at any given time?

Javascript Solutions


Solution 1 - Javascript

P-Limit

I have compared promise concurrency limitation with a custom script, bluebird, es6-promise-pool, and p-limit. I believe that p-limit has the simplest, most stripped-down implementation for this need. See their documentation.

Requirements

Your environment needs to support async/await to run the example below.

My Example

In this example, we need to run a function for every URL in the array (like, say, an API request). Here this is called fetchData(). If we had an array of thousands of items to process, limiting concurrency would definitely be useful to save on CPU and memory resources.

const pLimit = require('p-limit');
 
// Example concurrency of 3 promises at once
const limit = pLimit(3);

let urls = [
    "http://www.exampleone.com/",
    "http://www.exampletwo.com/",
    "http://www.examplethree.com/",
    "http://www.examplefour.com/",
]
 
// Create an array of our promises using map (fetchData() returns a promise)
let promises = urls.map(url => {

    // wrap the function we are calling in the limit function we defined above
    return limit(() => fetchData(url));
});
 
(async () => {
    // Only three promises are run at once (as defined above)
    const result = await Promise.all(promises);
    console.log(result);
})();

The logged result is an array of your resolved promises' response data.

Solution 2 - Javascript

Using Array.prototype.splice

while (funcs.length) {
  // 100 at a time
  await Promise.all( funcs.splice(0, 100).map(f => f()) )
}
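Applied to the question's code, funcs would be an array of functions that each kick off one request only when called, so nothing runs before its batch (a hedged sketch reusing the question's users and remoteServer):

const funcs = users.map(user => () => remoteServer.getCount(user))

while (funcs.length) {
  // 100 at a time
  await Promise.all( funcs.splice(0, 100).map(f => f()) )
}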

Solution 3 - Javascript

Note that Promise.all() doesn't trigger the promises to start their work, creating the promise itself does.

With that in mind, one solution would be to check whenever a promise is resolved whether a new promise should be started or whether you're already at the limit.

However, there is really no need to reinvent the wheel here. One library that you could use for this purpose is es6-promise-pool. From their examples:

var PromisePool = require('es6-promise-pool')
 
var promiseProducer = function () {
  // Your code goes here. 
  // If there is work left to be done, return the next work item as a promise. 
  // Otherwise, return null to indicate that all promises have been created. 
  // See the concrete producer example below. 
}
 
// The number of promises to process simultaneously. 
var concurrency = 3
 
// Create a pool. 
var pool = new PromisePool(promiseProducer, concurrency)
 
// Start the pool. 
var poolPromise = pool.start()
 
// Wait for the pool to settle. 
poolPromise.then(function () {
  console.log('All promises fulfilled')
}, function (error) {
  console.log('Some promise rejected: ' + error.message)
})
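For the question's use case, a concrete producer could (as a hedged sketch, reusing users and remoteServer.getCount() from the question) walk the array and hand back one promise at a time:

var index = 0

var promiseProducer = function () {
  if (index >= users.length) {
    return null // no work left: tells the pool to stop asking
  }
  var user = users[index++]
  return remoteServer.getCount(user) // makes an HTTP request
}

var pool = new PromisePool(promiseProducer, 3)
pool.start().then(function () {
  console.log('All counts fetched')
})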

Solution 4 - Javascript

If you know how iterators work and how they are consumed, you won't need any extra library, since it becomes very easy to build your own concurrency limiter yourself. Let me demonstrate:

/* [Symbol.iterator]() is equivalent to .values()
const iterator = [1,2,3][Symbol.iterator]() */
const iterator = [1,2,3].values()


// loop over all items with for..of
for (const x of iterator) {
  console.log('x:', x)
  
  // notice how this loop continues the same iterator
  // and consumes the rest of it, so the outer loop
  // won't log any more x's
  for (const y of iterator) {
    console.log('y:', y)
  }
}

We can use the same iterator and share it across workers.

If you had used .entries() instead of .values() you would have gotten an iterator of [index, value] pairs, which I will demonstrate below with a concurrency of 2.

const sleep = t => new Promise(rs => setTimeout(rs, t))

async function doWork(iterator) {
  for (let [index, item] of iterator) {
    await sleep(1000)
    console.log(index + ': ' + item)
  }
}

const iterator = Array.from('abcdefghij').entries()
const workers = new Array(2).fill(iterator).map(doWork)
//    ^--- starts two workers sharing the same iterator

Promise.allSettled(workers).then(() => console.log('done'))

The benefit of this is that you can have a generator function instead of having everything ready at once.

What's even more awesome is that you can do stream.Readable.from(iterator) in Node (and eventually in WHATWG streams as well). Combined with transferable ReadableStreams, this has the potential to be very useful in the future if you are working with web workers for performance.


Note: the difference between this and the async-pool example is that it spawns two workers, so if one worker throws an error at, say, index 5, it won't stop the other worker from doing the rest; you just go from a concurrency of 2 down to 1 (it doesn't stop there). So my advice is to catch all errors inside the doWork function.
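A hedged sketch of that advice, using the same doWork() as above with the errors caught per item so the worker keeps consuming the shared iterator:

async function doWork(iterator) {
  for (let [index, item] of iterator) {
    try {
      await sleep(1000)
      console.log(index + ': ' + item)
    } catch (err) {
      // log (or collect) the failure, but keep this worker alive
      console.error('item ' + index + ' failed:', err)
    }
  }
}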

Solution 5 - Javascript

Instead of using promises for limiting http requests, use node's built-in http.Agent.maxSockets. This removes the requirement of using a library or writing your own pooling code, and has the added advantage of more control over what you're limiting.

> agent.maxSockets
>
> By default set to Infinity. Determines how many concurrent sockets the agent can have open per origin. Origin is either a 'host:port' or 'host:port:localAddress' combination.

For example:

var http = require('http');
var agent = new http.Agent({maxSockets: 5}); // 5 concurrent connections per origin
var request = http.request({..., agent: agent}, ...);

If making multiple requests to the same origin, it might also benefit you to set keepAlive to true (see docs above for more info).
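For instance, a hedged sketch that combines maxSockets with keepAlive (the host and path are just placeholders):

var http = require('http');

// Reuse connections and cap concurrency at 5 sockets per origin
var agent = new http.Agent({ keepAlive: true, maxSockets: 5 });

for (var i = 0; i < 20; i++) {
  http.get({ host: 'example.com', path: '/count', agent: agent }, function (res) {
    res.resume(); // drain the response so the socket can be reused
  });
}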

Solution 6 - Javascript

bluebird's Promise.map can take a concurrency option to control how many promises should be running in parallel. Sometimes it is easier than .all because you don't need to create the promise array.

const Promise = require('bluebird')

function getCounts() {
  return Promise.map(users, user => {
    return new Promise(resolve => {
      remoteServer.getCount(user) // makes an HTTP request
      .then(() => {
        /* snip */
        resolve();
       });
    });
  }, {concurrency: 10}); // <---- at most 10 http requests at a time
}
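Since Promise.map() already resolves whatever the mapper returns, the explicit new Promise wrapper above isn't strictly needed; a leaner sketch of the same idea (still assuming remoteServer.getCount() returns a promise, as in the question):

const Promise = require('bluebird');

function getCounts() {
  // at most 10 HTTP requests in flight at any time
  return Promise.map(users, user => remoteServer.getCount(user), { concurrency: 10 });
}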

Solution 7 - Javascript

I suggest the library async-pool: https://github.com/rxaviers/async-pool

npm install tiny-async-pool

Description:

> Run multiple promise-returning & async functions with limited concurrency using native ES6/ES7
>
> asyncPool runs multiple promise-returning & async functions in a limited concurrency pool. It rejects immediately as soon as one of the promises rejects. It resolves when all the promises complete. It calls the iterator function as soon as possible (under the concurrency limit).

Usage:

const asyncPool = require("tiny-async-pool");

const timeout = i => new Promise(resolve => setTimeout(() => resolve(i), i));
await asyncPool(2, [1000, 5000, 3000, 2000], timeout);
// Call iterator (i = 1000)
// Call iterator (i = 5000)
// Pool limit of 2 reached, wait for the quicker one to complete...
// 1000 finishes
// Call iterator (i = 3000)
// Pool limit of 2 reached, wait for the quicker one to complete...
// 3000 finishes
// Call iterator (i = 2000)
// Iteration is complete, wait until running ones complete...
// 5000 finishes
// 2000 finishes
// Resolves, results are passed in given array order `[1000, 5000, 3000, 2000]`.

Solution 8 - Javascript

Here is my ES7 solution for a copy-paste-friendly and feature-complete Promise.all()/map() alternative, with a concurrency limit.

Similar to Promise.all(), it maintains return order as well as a fallback for non-promise return values.

I also included a comparison of the different implementations, as it illustrates some aspects a few of the other solutions have missed.

Usage

const asyncFn = delay => new Promise(resolve => setTimeout(() => resolve(), delay));
const args = [30, 20, 15, 10];
await asyncPool(args, arg => asyncFn(arg), 4); // concurrency limit of 4

Implementation

async function asyncBatch(args, fn, limit = 8) {
  // Copy arguments to avoid side effects
  args = [...args];
  const outs = [];
  while (args.length) {
    const batch = args.splice(0, limit);
    const out = await Promise.all(batch.map(fn));
    outs.push(...out);
  }
  return outs;
}

async function asyncPool(args, fn, limit = 8) {
  return new Promise((resolve) => {
    // Copy arguments to avoid side effect, reverse queue as
    // pop is faster than shift
    const argQueue = [...args].reverse();
    let count = 0;
    const outs = [];
    const pollNext = () => {
      if (argQueue.length === 0 && count === 0) {
        resolve(outs);
      } else {
        while (count < limit && argQueue.length) {
          const index = args.length - argQueue.length;
          const arg = argQueue.pop();
          count += 1;
          const out = fn(arg);
          const processOut = (out, index) => {
            outs[index] = out;
            count -= 1;
            pollNext();
          };
          if (typeof out === 'object' && out.then) {
            out.then(out => processOut(out, index));
          } else {
            processOut(out, index);
          }
        }
      }
    };
    pollNext();
  });
}

Comparison

// A simple async function that returns after the given delay
// and prints its value to allow us to determine the response order
const asyncFn = delay => new Promise(resolve => setTimeout(() => {
  console.log(delay);
  resolve(delay);
}, delay));

// List of arguments to the asyncFn function
const args = [30, 20, 15, 10];

// As a comparison of the different implementations, a low concurrency
// limit of 2 is used in order to highlight the performance differences.
// If a limit greater than or equal to args.length is used the results
// would be identical.

// Vanilla Promise.all/map combo
const out1 = await Promise.all(args.map(arg => asyncFn(arg)));
// prints: 10, 15, 20, 30
// total time: 30ms

// Pooled implementation
const out2 = await asyncPool(args, arg => asyncFn(arg), 2);
// prints: 20, 30, 15, 10
// total time: 40ms

// Batched implementation
const out3 = await asyncBatch(args, arg => asyncFn(arg), 2);
// prints: 20, 30, 10, 15
// total time: 45ms

console.log(out1, out2, out3); // prints: [30, 20, 15, 10] x 3

// Conclusion: Execution order and performance are different,
// but return order is still identical

Conclusion

asyncPool() should be the best solution as it allows new requests to start as soon as any previous one finishes.

asyncBatch() is included as a comparison as its implementation is simpler to understand, but it should be slower in performance as all requests in the same batch are required to finish before the next batch can start.

In this contrived example, the non-limited vanilla Promise.all() is of course the fastest, while the others could perform more desirably in a real-world congestion scenario.

Update

The async-pool library that others have already suggested is probably a better alternative to my implementation as it works almost identically and has a more concise implementation with a clever usage of Promise.race(): https://github.com/rxaviers/async-pool/blob/master/lib/es7.js
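For reference, the Promise.race() trick it uses can be sketched roughly like this (a simplified illustration, not the library's actual code; asyncPoolSketch, items and fn are placeholder names):

async function asyncPoolSketch(limit, items, fn) {
  const results = [];
  const executing = new Set();
  for (const item of items) {
    // start the task and remember its promise
    const p = Promise.resolve().then(() => fn(item));
    results.push(p);
    executing.add(p);
    p.then(() => executing.delete(p), () => executing.delete(p));
    // once the pool is full, wait for the fastest in-flight task to settle
    if (executing.size >= limit) {
      await Promise.race(executing);
    }
  }
  return Promise.all(results);
}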

Hopefully my answer can still serve an educational value.

Solution 9 - Javascript

A Semaphore is a well-known concurrency primitive that was designed to solve exactly this kind of problem. It's a very universal construct; implementations of Semaphore exist in many languages. This is how one would use a Semaphore to solve this issue:

import { Semaphore } from "async-mutex";

async function main() {
  const s = new Semaphore(100);
  const res = await Promise.all(
    users.map((user) =>
      s.runExclusive(() => remoteServer.getCount(user))
    )
  );
  return res;
}

I'm using the Semaphore implementation from async-mutex; it has decent documentation and TypeScript support.

If you want to dig deeper into topics like this, you can take a look at the book "The Little Book of Semaphores", which is freely available as a PDF.

Solution 10 - Javascript

As all the others in this answer thread have pointed out, Promise.all() won't do the right thing if you need to limit concurrency. But ideally you shouldn't even want to wait until all of the Promises are done before processing them.

Instead, you want to process each result as soon as it becomes available, so you don't have to wait for the very last promise to finish before you start iterating over them.

So, here's a code sample that does just that, based partly on the answer by Endless and also on this answer by T.J. Crowder.

async function* raceAsyncIterators(iterators) {
    async function queueNext(iteratorResult) {
        delete iteratorResult.result; // Release previous result ASAP
        iteratorResult.result = await iteratorResult.iterator.next();
        return iteratorResult;
    };
    const iteratorResults = new Map(iterators.map(iterator => [
        iterator,
        queueNext({ iterator })
    ]));
    while (iteratorResults.size) {
        const winner = await Promise.race(iteratorResults.values());
        if (winner.result.done) {
            iteratorResults.delete(winner.iterator);
        } else {
            const { value } = winner.result;
            iteratorResults.set(winner.iterator, queueNext(winner));
            yield value;
        }
    }
}

async function* runTasks(maxConcurrency, iterator) {
    // Each worker is an async generator that polls for tasks
    // from the shared iterator.
    // Sharing the iterator ensures that each worker gets unique tasks.
    const workers = new Array(maxConcurrency);
    for (let i = 0; i < maxConcurrency; i++) {
        workers[i] = (async function*() {
            for (const task of iterator) yield await task();
        })();
    }

    yield* raceAsyncIterators(workers);
}

// example tasks that sleep and return a number
function sleep(ms) { return new Promise(r => setTimeout(r, ms)); }

const tasks = [];
for (let i = 0; i < 20; i++) {
    tasks.push(async () => {
        console.log(`start ${i}`);
        await sleep(Math.random() * 1000);
        console.log(`end ${i}`);
        return i;
    });
}

(async () => {
    for await (let value of runTasks(3, tasks.values())) {
        console.log(`output ${value}`);
    }
})()

There's a lot of magic in here; let me explain.

We start with an async generator function (an async function*()) called raceAsyncIterators(). raceAsyncIterators() is like Promise.race() but with N iterators of Promises instead of just N Promises; it returns an async iterator that yields the results of resolved Promises.

The raceAsyncIterators() keeps an iteratorResults map, mapping from iterators (as keys) to Promises of iteratorResult objects; each iteratorResult has an iterator property and a result (the result of awaiting the iterator's next() Promise).

raceAsyncIterators() calls Promise.race() to let the iteratorResult Promises race to complete their tasks. When the winning iteratorResult says that its iterator is completely done, we remove it from the map; otherwise, we replace its Promise in the iteratorResults map with the iterator's next() Promise and yield the result value.

With that out of the way, we can now define our runTasks() function.

runTasks() accepts an iterator parameter, a non-async iterator of "tasks" to perform. Each task is an async function (a regular async function(), not an async generator async function*()). It also accepts a number maxConcurrency, the number of workers we'll run.

// Each worker is an async iterator that polls for tasks
// from the shared iterator.
// Sharing the iterator ensures that each worker gets unique tasks.
const workers = new Array(maxConcurrency);
for (let i = 0; i < maxConcurrency; i++) {
    workers[i] = (async function*() {
        for (const task of iterator) yield await task();
    })();
}

Note that the workers are initially defined as async generator functions, but we immediately invoke each function, and store each resulting async iterator in the workers array.

If we had just one worker iterator, we could call for await (let result of worker()) to get a stream of results.

But, since we have N worker iterators, we want to race them with raceAsyncIterators(), processing results from whichever worker iterator yields results first.

The last line of runTasks() is:

yield* raceAsyncIterators(workers)

yield* is an uncommon JS expression in which a generator can yield the results of another generator. This yield* line yields whichever results win the race.

With runTasks(), we can use a for await loop, like this:

for await (const value of runTasks(3, tasks.values())) {
    console.log(`output ${value}`);
}

This returns each Promise's value in the order in which they're resolved.

In the example, we generate an array of 20 async tasks that sleep for a random amount of time and return a number. (In real life, you'd probably make an array of async functions that fetch URLs or something.)

The example calls runTasks with 3 concurrent workers, so no more than 3 tasks are launched at the same time. When any task completes, we immediately queue up the next task. (This is superior to "batching", where you do 3 tasks at once, await all three of them, and don't start the next batch of three until the entire previous batch has finished.)

Solution 11 - Javascript

Here is a basic example of streaming combined with 'p-limit'. It pipes an HTTP read stream to MongoDB.

const stream = require('stream');
const util = require('util');
const pLimit = require('p-limit');
const es = require('event-stream');
const JSONStream = require('JSONStream');
const streamToMongoDB = require('stream-to-mongo-db').streamToMongoDB;

const pipeline = util.promisify(stream.pipeline);

const outputDBConfig = {
    dbURL: 'yr-db-url',
    collection: 'some-collection'
};
const limit = pLimit(3);

const yrAsyncStreamingFunction = async (readStream) => {
    const mongoWriteStream = streamToMongoDB(outputDBConfig);
    const mapperStream = es.map((data, done) => {
        // each mapped chunk makes at most 3 concurrent calls thanks to p-limit
        let someDataPromise = limit(() => yr_async_call_to_somewhere());

        someDataPromise.then(
            function handleResolve(someData) {
                data.someData = someData;
                done(null, data);
            },
            function handleError(error) {
                done(error);
            }
        );
    });

    await pipeline(
        readStream,
        JSONStream.parse('*'),
        mapperStream,
        mongoWriteStream
    );
};

Solution 12 - Javascript

It can be resolved using recursion.

The idea is that initially you send maximum allowed number of requests and each of these requests should recursively continue to send itself on its completion.

function batchFetch(urls, concurrentRequestsLimit) {
    return new Promise(resolve => {
        var documents = [];
        var index = 0;

        function recursiveFetch() {
            if (index === urls.length) {
                return;
            }
            fetch(urls[index++])
                .then(r => r.text()) // wait for the body, not just the response headers
                .then(text => {
                    documents.push(text);
                    if (documents.length === urls.length) {
                        resolve(documents);
                    } else {
                        recursiveFetch();
                    }
                });
        }

        for (var i = 0; i < concurrentRequestsLimit; i++) {
            recursiveFetch();
        }
    });
}

var sources = [
    'http://www.example_1.com/',
    'http://www.example_2.com/',
    'http://www.example_3.com/',
    ...
    'http://www.example_100.com/'
];
batchFetch(sources, 5).then(documents => {
   console.log(documents);
});

Solution 13 - Javascript

  • @tcooc's answer was quite cool. Didn't know about it and will leverage it in the future.
  • I also enjoyed @MatthewRideout's answer, but it uses an external library!!

Whenever possible, I give a shot at developing this kind of thing on my own, rather than going for a library. You end up learning a lot of concepts which seemed daunting before.

class Pool {
    constructor(maxAsync) {
        this.maxAsync = maxAsync;
        this.asyncOperationsQueue = [];
        this.currentAsyncOperations = 0;
    }

    runAnother() {
        if (this.asyncOperationsQueue.length > 0 && this.currentAsyncOperations < this.maxAsync) {
            this.currentAsyncOperations += 1;
            this.asyncOperationsQueue.pop()()
                .then(() => { this.currentAsyncOperations -= 1; this.runAnother() },
                      () => { this.currentAsyncOperations -= 1; this.runAnother() });
        }
    }

    add(f) {  // the argument f is a function of signature () => Promise
        return new Promise((resolve, reject) => {
            this.asyncOperationsQueue.push(
                () => f().then(resolve).catch(reject)
            );
            // enqueue first, then try to run, so a single add() also starts
            this.runAnother();
        });
    }
}

//#######################################################
//                        TESTS
//#######################################################

function dbCall(id, timeout, fail) {
    return new Promise((resolve, reject) => {
        setTimeout(() => {
            if (fail) {
               reject(`Error for id ${id}`);
            } else {
                resolve(id);
            }
        }, timeout)
    }
    )
}


const dbQuery1 = () => dbCall(1, 5000, false);
const dbQuery2 = () => dbCall(2, 5000, false);
const dbQuery3 = () => dbCall(3, 5000, false);
const dbQuery4 = () => dbCall(4, 5000, true);
const dbQuery5 = () => dbCall(5, 5000, false);


const cappedPool = new Pool(2);

const dbQuery1Res = cappedPool.add(dbQuery1).catch(i => i).then(i => console.log(`Resolved: ${i}`))
const dbQuery2Res = cappedPool.add(dbQuery2).catch(i => i).then(i => console.log(`Resolved: ${i}`))
const dbQuery3Res = cappedPool.add(dbQuery3).catch(i => i).then(i => console.log(`Resolved: ${i}`))
const dbQuery4Res = cappedPool.add(dbQuery4).catch(i => i).then(i => console.log(`Resolved: ${i}`))
const dbQuery5Res = cappedPool.add(dbQuery5).catch(i => i).then(i => console.log(`Resolved: ${i}`))

This approach provides a nice API, similar to thread pools in Scala/Java.
After creating one instance of the pool with const cappedPool = new Pool(2), you provide promises to it with simply cappedPool.add(() => myPromise).
Obviously we must ensure that the promise does not start immediately, and that is why we must "provide it lazily" with the help of a function.

Most importantly, notice that the result of the method add is a Promise which will be completed/resolved with the value of your original promise! This makes for a very intuitive use.

const resultPromise = cappedPool.add( () => dbCall(...))
resultPromise
.then( actualResult => {
   // Do something with the result from the DB
  }
)

Solution 14 - Javascript

So I tried to make some of the examples shown here work for my code, but since this was only for an import script and not production code, using the npm package batch-promises was surely the easiest path for me.


batch-promises
Easily batch promises

NOTE: Requires runtime to support Promise or to be polyfilled.

Api
batchPromises(int: batchSize, array: Collection, i => Promise: Iteratee)
The Promise: Iteratee will be called after each batch.

Use:
import batchPromises from 'batch-promises';
 
batchPromises(2, [1,2,3,4,5], i => new Promise((resolve, reject) => {
 
  // The iteratee will fire after each batch resulting in the following behaviour:
  // @ 100ms resolve items 1 and 2 (first batch of 2)
  // @ 200ms resolve items 3 and 4 (second batch of 2)
  // @ 300ms resolve remaining item 5 (last remaining batch)
  setTimeout(() => {
    resolve(i);
  }, 100);
}))
.then(results => {
  console.log(results); // [1,2,3,4,5]
});

Solution 15 - Javascript

Recursion is the answer if you don't want to use external libraries

function downloadAll(someArrayWithData) {
  var self = this;

  var tracker = function(next) {
    return self.someExpensiveRequest(someArrayWithData[next])
    .then(function() {
      next++; // advance to the next item
      if (next < someArrayWithData.length) { // did I finish processing all my data?
        return tracker(next); // go to the next promise
      }
    });
  }

  return tracker(0);
}
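The snippet above effectively processes the items one at a time (a single chain). To get a concurrency of N with the same recursive idea, you could start several chains that share one cursor; a rough sketch, assuming someExpensiveRequest() is the same promise-returning request function used above (downloadAllConcurrent is just an illustrative name):

function downloadAllConcurrent(someArrayWithData, limit) {
  var next = 0; // shared cursor across all chains

  var worker = function() {
    if (next >= someArrayWithData.length) {
      return Promise.resolve();
    }
    var item = someArrayWithData[next++];
    return someExpensiveRequest(item).then(worker); // pick up the next item when done
  };

  // start `limit` chains in parallel and wait for all of them
  return Promise.all(Array.from({ length: limit }, worker));
}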

Solution 16 - Javascript

Unfortunately there is no way to do it with native Promise.all, so you have to be creative.

This is the quickest, most concise way I could find without using any outside libraries.

It makes use of a newer javascript feature called an iterator. The iterator basically keeps track of what items have been processed and what haven't.

In order to use it in code, you create an array of async functions. Each async function asks the same iterator for the next item that needs to be processed. Each function processes its own item asynchronously, and when done asks the iterator for a new one. Once the iterator runs out of items, all the functions complete.

Thanks to @Endless for inspiration.

const items = [
  'https://httpbin.org/bytes/2',
  'https://httpbin.org/bytes/2',
  'https://httpbin.org/bytes/2',
  'https://httpbin.org/bytes/2',
  'https://httpbin.org/bytes/2',
  'https://httpbin.org/bytes/2',
  'https://httpbin.org/bytes/2',
  'https://httpbin.org/bytes/2'
]

const concurrency = 5

Array(concurrency).fill(items.entries()).map(async (cursor) => {
  for (let [index, url] of cursor){
    console.log('getting url is ', index, url)
    // run your async task instead of this next line
    var text = await fetch(url).then(res => res.text())
    console.log('text is', text.slice(0, 20))
  }
})

Solution 17 - Javascript

So many good solutions. I started out with the elegant solution posted by @Endless and ended up with this little extension method that does not use any external libraries nor does it run in batches (although it assumes you have features like async/await, etc.):

Promise.allWithLimit = async (taskList, limit = 5) => {
    const iterator = taskList.entries();
    let results = new Array(taskList.length);
    let workerThreads = new Array(limit).fill(0).map(() => 
        new Promise(async (resolve, reject) => {
            try {
                let entry = iterator.next();
                while (!entry.done) {
                    let [index, promise] = entry.value;
                    try {
                        results[index] = await promise;
                    }
                    catch (err) {
                        results[index] = err;
                    }
                    // Always advance, even after a rejection, so the worker
                    // doesn't retry the same entry forever
                    entry = iterator.next();
                }
                // No more work to do
                resolve(true); 
            }
            catch (err) {
                // This worker is dead
                reject(err);
            }
        }));

    await Promise.all(workerThreads);
    return results;
};


const demoTasks = new Array(10).fill(0).map((v,i) => new Promise(resolve => {
   let n = (i + 1) * 5;
   setTimeout(() => {
      console.log(`Did nothing for ${n} seconds`);
      resolve(n);
   }, n * 1000);
}));

Promise.allWithLimit(demoTasks).then(results => console.log(results));

Solution 18 - Javascript

Expanding on the answer posted by @deceleratedcaviar, I created a 'batch' utility function that takes as arguments: an array of values, a concurrency limit, and a processing function. Yes, I realize that using Promise.all this way is more akin to batch processing than true concurrency, but if the goal is to limit an excessive number of HTTP calls at one time, I'd go with this approach due to its simplicity and lack of need for an external library.

async function batch(o) {
  let arr = o.arr
  let resp = []
  while (arr.length) {
    let subset = arr.splice(0, o.limit)
    let results = await Promise.all(subset.map(o.process))
    resp.push(results)
  }
  return [].concat.apply([], resp)
}

let arr = []
for (let i = 0; i < 250; i++) { arr.push(i) }

async function calc(val) { return val * 100 }

(async () => {
  let resp = await batch({
    arr: arr,
    limit: 100,
    process: calc
  })
  console.log(resp)
})();

Solution 19 - Javascript

One more solution with a custom promise library (CPromise):

    import { CPromise } from "c-promise2";
    import cpFetch from "cp-fetch";
    
    const promise = CPromise.all(
      function* () {
        const urls = [
          "https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=1",
          "https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=2",
          "https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=3",
          "https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=4",
          "https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=5",
          "https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=6",
          "https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=7"
        ];
    
        for (const url of urls) {
          yield cpFetch(url); // add a promise to the pool
          console.log(`Request [${url}] completed`);
        }
      },
      { concurrency: 2 }
    ).then(
      (v) => console.log(`Done: `, v),
      (e) => console.warn(`Failed: ${e}`)
    );
    
    // yeah, we are able to cancel the task and abort pending network requests
    // setTimeout(() => promise.cancel(), 4500);

    import { CPromise } from "c-promise2";
    import cpFetch from "cp-fetch";
    
    const promise = CPromise.all(
      [
        "https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=1",
        "https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=2",
        "https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=3",
        "https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=4",
        "https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=5",
        "https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=6",
        "https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=7"
      ],
      {
        mapper: (url) => {
          console.log(`Request [${url}]`);
          return cpFetch(url);
        },
        concurrency: 2
      }
    ).then(
      (v) => console.log(`Done: `, v),
      (e) => console.warn(`Failed: ${e}`)
    );
    
    // yeah, we are able to cancel the task and abort pending network requests
    //setTimeout(() => promise.cancel(), 4500);

Solution 20 - Javascript

Warning: this has not been benchmarked for efficiency and does a lot of array copying/creation.

If you want a more functional approach you could do something like:

import chunk from 'lodash.chunk';

const maxConcurrency = (max) => (dataArr, promiseFn) =>
  chunk(dataArr, max).reduce(
      async (agg, batch) => [
          ...(await agg),
          ...(await Promise.all(batch.map(promiseFn)))
      ],
      []
  );

and then you could use it like:

const randomFn = (data) =>
    new Promise((res) => setTimeout(
      () => res(data + 1),
        Math.random() * 1000
      ));


const result = await maxConcurrency(5)(
    [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
    randomFn
);
console.log('result+++', result);

Solution 21 - Javascript

I had been using the bottleneck library, which I actually really liked, but in my case it wasn't releasing memory and kept tanking long-running jobs... which isn't great for running the massive jobs that you likely want a throttling/concurrency library for in the first place.

I needed a simple, low-overhead, easy-to-maintain solution. I also wanted something that kept the pool topped up, rather than simply batching predefined chunks... In the case of a downloader, this will stop an nGB file from holding up your queue for minutes/hours at a time, even though the rest of the batch finished ages ago.

This is the Node.js v16+, no-dependency, async generator solution I've been using instead:

const promiseState = function( promise ) {
  // A promise could never resolve to a unique symbol unless it was in this scope
  const control = Symbol();

  // This helps us determine the state of the promise... A little heavy, but it beats a third-party promise library. The control is the second element passed to Promise.race() since it will only resolve first if the promise being tested is pending.
  return Promise
    .race([ promise, control ])
    .then( value => ( value === control ) ? 'pending' : 'fulfilled' )
    .catch( () => 'rejected' );
}

const throttle = async function* ( reservoir, promiseFunction, highWaterMark ) {
  let iterable = reservoir.splice( 0, highWaterMark ).map( item => promiseFunction( item ) );

  while ( iterable.length > 0 ) {
    // When a promise has resolved we have space to top it up to the high water mark...
    await Promise.any( iterable );

    const pending = [];
    const resolved = [];

    // This identifies the promise(s) that have resolved so that we can yield them
    for ( const currentValue of iterable ) {
      if ( await promiseState( currentValue ) === 'pending' ) {
        pending.push( currentValue );
      } else {
        resolved.push( currentValue );
      }
    }

    // Put the remaining promises back into iterable, and top it to the high water mark
    iterable = [
      ...pending,
      ...reservoir.splice( 0, highWaterMark - pending.length ).map( value => promiseFunction( value ) )
    ];

    yield Promise.allSettled( resolved );
  }
}

// This is just an example of what would get passed as "promiseFunction"... This can be the function that returns your HTTP request promises
const getTimeout = delay => new Promise( (resolve, reject) => setTimeout(resolve, delay, delay) );

// This is just the async IIFE that bootstraps this example
( async () => {

  const test = [ 1000, 2000, 3000, 4000, 5000, 6000, 1500, 2500, 3500, 4500, 5500, 6500 ];

  for await ( const timeout of throttle( test, getTimeout, 4 ) ) {
    console.log( timeout );
  }

} )();

Solution 22 - Javascript

I have a solution that creates chunks and uses the .reduce function to wait for each chunk's Promise.all to finish. I also add some delay in case the promises have rate limits on their calls.

export function delay(ms: number) {
  return new Promise<void>((resolve) => setTimeout(resolve, ms));
}

export const chunk = <T>(arr: T[], size: number): T[][] => [
  ...Array(Math.ceil(arr.length / size)),
].map((_, i) => arr.slice(size * i, size + size * i));

const myIdList = []; // all items
const groupedIdList = chunk(myIdList, 20); // grouped by 20 items

await groupedIdList.reduce(async (prev, subIdList) => {
  await prev;
  // Make sure we wait for 500 ms after processing every page to prevent overloading the calls.
  const data = await Promise.all(subIdList.map(myPromise));
  await delay(500);
}, Promise.resolve());

Solution 23 - Javascript

This solution uses an async generator to manage concurrent promises with vanilla javascript. The throttle generator takes 3 arguments:

  • An array of values to be supplied as arguments to a promise-generating function. (e.g. An array of URLs.)
  • A function that returns a promise. (e.g. Returns a promise for an HTTP request.)
  • An integer that represents the maximum concurrent promises allowed.

Promises are only instantiated as required in order to reduce memory consumption. Results can be iterated over using a for await...of statement.

The example below provides a function to check promise state, the throttle async generator, and a simple function that return a promise based on setTimeout. The async IIFE at the end defines the reservoir of timeout values, sets the async iterable returned by throttle, then iterates over the results as they resolve.

If you would like a more complete example for HTTP requests, let me know in the comments.

Please note that Node.js 16+ is required in order to run this example.

const promiseState = function( promise ) {
  const control = Symbol();

  return Promise
    .race([ promise, control ])
    .then( value => ( value === control ) ? 'pending' : 'fulfilled' )
    .catch( () => 'rejected' );
}

const throttle = async function* ( reservoir, promiseClass, highWaterMark ) {
  let iterable = reservoir.splice( 0, highWaterMark ).map( item => promiseClass( item ) );

  while ( iterable.length > 0 ) {
    await Promise.any( iterable );

    const pending = [];
    const resolved = [];

    for ( const currentValue of iterable ) {
      if ( await promiseState( currentValue ) === 'pending' ) {
        pending.push( currentValue );
      } else {
        resolved.push( currentValue );
      }
    }

    console.log({ pending, resolved, reservoir });

    iterable = [
      ...pending,
      ...reservoir.splice( 0, highWaterMark - pending.length ).map( value => promiseClass( value ) )
    ];

    yield Promise.allSettled( resolved );
  }
}

const getTimeout = delay => new Promise( ( resolve, reject ) => {
  setTimeout(resolve, delay, delay);
} );

( async () => {
  const test = [ 1100, 1200, 1300, 10000, 11000, 9000, 5000, 6000, 3000, 4000, 1000, 2000, 3500 ];

  const throttledRequests = throttle( test, getTimeout, 4 );

  for await ( const timeout of throttledRequests ) {
    console.log( timeout );
  }
} )();

Solution 24 - Javascript

A good solution for controlling the maximum number of promises/requests is to split your list of requests into pages, and produce only requests for one page at a time.

The example below makes use of iter-ops library:

import {pipe, toAsync, map, page, wait} from 'iter-ops';

const i = pipe(
    toAsync(users), // make it asynchronous
    page(10), // split into pages of 10 items in each
    map(p => Promise.all(p.map(u => remoteServer.getCount(u)))), // map into requests
    wait() // resolve each page in the pipeline
);

// below triggers processing page-by-page:

for await(const p of i) {
    //=> p = resolved page of data
}

This way it won't try to create more requests/promises than the size of one page.

Solution 25 - Javascript

Using tiny-async-pool ES9 for await...of API, you can do the following:

const asyncPool = require("tiny-async-pool");
const getCount = async (user) => ([user, await remoteServer.getCount(user)]);
const concurrency = 2;

for await (const [user, count] of asyncPool(concurrency, users, getCount)) {
  console.log(user, count);
}

The above asyncPool function returns an async iterator that yields as soon as a promise completes (under concurrency limit) and it rejects immediately as soon as one of the promises rejects.

Solution 26 - Javascript

It is possible to limit requests to the server by using https://www.npmjs.com/package/job-pipe

Basically you create a pipe and tell it how many concurrent requests you want:

const pipe = createPipe({ throughput: 6, maxQueueSize: Infinity })

Then you take your function which performs the call and force it through the pipe to create a limited amount of simultaneous calls:

const makeCall = async () => {...}
const limitedMakeCall = pipe(makeCall)

Finally, you call this method as many times as you need as if it was unchanged and it will limit itself on how many parallel executions it can handle:

await limitedMakeCall()
await limitedMakeCall()
await limitedMakeCall()
await limitedMakeCall()
await limitedMakeCall()
....
await limitedMakeCall()

Profit.

Solution 27 - Javascript

This is what I did, using Promise.race inside my code:

const identifyTransactions = async function() {
  let promises = []
  let concurrency = 0
  for (let tx of this.transactions) {
    if (concurrency > 4)
      await Promise.race(promises).then(r => { promises = []; concurrency = 0 })
    promises.push(tx.identifyTransaction())
    concurrency++
  }
  if (promises.length > 0)
    await Promise.race(promises) //resolve the rest
}

If you wanna see an example: https://jsfiddle.net/thecodermarcelo/av2tp83o/5/

Solution 28 - Javascript

If your goal is to slow down Promise.all to avoid rate limiting or overloading:

Here's my implementation

async function promiseAllGentle(arr, batchSize = 5, sleep = 50) {
  let output = [];
  while (arr.length) {
    const batchResult = await Promise.all(arr.splice(0, batchSize));
    output = [...output, ...batchResult];
    await new Promise((res) => setTimeout(res, sleep));
  }
  return output;
}

Attributions

All content for this solution is sourced from the original question on Stackoverflow.

The content on this page is licensed under the Attribution-ShareAlike 4.0 International (CC BY-SA 4.0) license.

Content Type | Original Author | Original Content on Stackoverflow
Question | Chris | View Question on Stackoverflow
Solution 1 - Javascript | Matthew Rideout | View Answer on Stackoverflow
Solution 2 - Javascript | deceleratedcaviar | View Answer on Stackoverflow
Solution 3 - Javascript | TimoStaudinger | View Answer on Stackoverflow
Solution 4 - Javascript | Endless | View Answer on Stackoverflow
Solution 5 - Javascript | tcooc | View Answer on Stackoverflow
Solution 6 - Javascript | Jingshao Chen | View Answer on Stackoverflow
Solution 7 - Javascript | Venryx | View Answer on Stackoverflow
Solution 8 - Javascript | Adelost | View Answer on Stackoverflow
Solution 9 - Javascript | Safareli | View Answer on Stackoverflow
Solution 10 - Javascript | Dan Fabulich | View Answer on Stackoverflow
Solution 11 - Javascript | gosuer1921 | View Answer on Stackoverflow
Solution 12 - Javascript | Anton Fil | View Answer on Stackoverflow
Solution 13 - Javascript | cmhteixeira | View Answer on Stackoverflow
Solution 14 - Javascript | Agusti Fernandez Pardo | View Answer on Stackoverflow
Solution 15 - Javascript | Juan | View Answer on Stackoverflow
Solution 16 - Javascript | user3413723 | View Answer on Stackoverflow
Solution 17 - Javascript | Kris Oye | View Answer on Stackoverflow
Solution 18 - Javascript | Eugene Blinn | View Answer on Stackoverflow
Solution 19 - Javascript | Dmitriy Mozgovoy | View Answer on Stackoverflow
Solution 20 - Javascript | cphoover | View Answer on Stackoverflow
Solution 21 - Javascript | Andrew Odri | View Answer on Stackoverflow
Solution 22 - Javascript | hurricane | View Answer on Stackoverflow
Solution 23 - Javascript | Andrew Odri | View Answer on Stackoverflow
Solution 24 - Javascript | vitaly-t | View Answer on Stackoverflow
Solution 25 - Javascript | Rafael Xavier | View Answer on Stackoverflow
Solution 26 - Javascript | Vytenis Urbonavičius | View Answer on Stackoverflow
Solution 27 - Javascript | Alex | View Answer on Stackoverflow
Solution 28 - Javascript | Simo | View Answer on Stackoverflow