Iterating over a mongodb cursor serially (waiting for callbacks before moving to next document)

node.jsMongodbMongoskinasync.js

node.js Problem Overview


Using mongoskin, I can do a query like this, which will return a cursor:

myCollection.find({}, function(err, resultCursor) {
      resultCursor.each(function(err, result) {
            
      }
}

However, I'd like to call some async functions for each document, and only move on to the next item on the cursor after this has called back (similar to the eachSeries structure in the async.js module). E.g:

myCollection.find({}, function(err, resultCursor) {
      resultCursor.each(function(err, result) {

            externalAsyncFunction(result, function(err) {
               //externalAsyncFunction completed - now want to move to next doc
            });
            
      }
}  

How could I do this?

Thanks

UPDATE:

I don't wan't to use toArray() as this is a large batch operation, and the results might not fit in memory in one go.

node.js Solutions


Solution 1 - node.js

A more modern approach that uses async/await:

const cursor = db.collection("foo").find({});
while(await cursor.hasNext()) {
  const doc = await cursor.next();
  // process doc here
}

Notes:

  • This may be even more simple to do when async iterators arrive.
  • You'll probably want to add try/catch for error checking.
  • The containing function should be async or the code should be wrapped in (async function() { ... })() since it uses await.
  • If you want, add await new Promise(resolve => setTimeout(resolve, 1000)); (pause for 1 second) at the end of the while loop to show that it does process docs one after the other.

Solution 2 - node.js

If you don't want to load all of the results into memory using toArray, you can iterate using the cursor with something like the following.

myCollection.find({}, function(err, resultCursor) {
  function processItem(err, item) {
    if(item === null) {
      return; // All done!
    }

    externalAsyncFunction(item, function(err) {
      resultCursor.nextObject(processItem);
    });

  }
  
  resultCursor.nextObject(processItem);
}  

Solution 3 - node.js

since node.js v10.3 you can use async iterator

const cursor = db.collection('foo').find({});
for await (const doc of cursor) {
  // do your thing
  // you can even use `await myAsyncOperation()` here
}

Jake Archibald wrote a great blog post about async iterators, that I came to know after reading @user993683's answer.

Solution 4 - node.js

This works with large dataset by using setImmediate:

var cursor = collection.find({filter...}).cursor();

cursor.nextObject(function fn(err, item) {
    if (err || !item) return;
 
    setImmediate(fnAction, item, arg1, arg2, function() {
        cursor.nextObject(fn);
    });
});

function fnAction(item, arg1, arg2, callback) {
    // Here you can do whatever you want to do with your item.
    return callback();
}

Solution 5 - node.js

If someone is looking for a Promise way of doing this (as opposed to using callbacks of nextObject), here it is. I am using Node v4.2.2 and mongo driver v2.1.7. This is kind of an asyncSeries version of Cursor.forEach():

function forEachSeries(cursor, iterator) {
  return new Promise(function(resolve, reject) {
    var count = 0;
    function processDoc(doc) {
      if (doc != null) {
        count++;
        return iterator(doc).then(function() {
          return cursor.next().then(processDoc);
        });
      } else {
        resolve(count);
      }
    }
    cursor.next().then(processDoc);
  });
}

To use this, pass the cursor and an iterator that operates on each document asynchronously (like you would for Cursor.forEach). The iterator needs to return a promise, like most mongodb native driver functions do.

Say, you want to update all documents in the collection test. This is how you would do it:

var theDb;
MongoClient.connect(dbUrl).then(function(db) {
  theDb = db;     // save it, we'll need to close the connection when done.
  var cur = db.collection('test').find();

  return forEachSeries(cur, function(doc) {    // this is the iterator
    return db.collection('test').updateOne(
      {_id: doc._id},
      {$set: {updated: true}}       // or whatever else you need to change
    );
    // updateOne returns a promise, if not supplied a callback. Just return it.
  });
})
.then(function(count) {
  console.log("All Done. Processed", count, "records");
  theDb.close();
})

Solution 6 - node.js

You can do something like this using the async lib. The key point here is to check if the current doc is null. If it is, it means you are finished.

async.series([
        function (cb) {
            cursor.each(function (err, doc) {
                if (err) {
                    cb(err);
                } else if (doc === null) {
                    cb();
                } else {
                    console.log(doc);
                    array.push(doc);
                }
            });
        }
    ], function (err) {
        callback(err, array);
    });

Solution 7 - node.js

You can get the result in an Array and iterate using a recursive function, something like this.

myCollection.find({}).toArray(function (err, items) {
	var count = items.length;
	var fn = function () {
		externalAsyncFuntion(items[count], function () {
			count -= 1;
			if (count) fn();
		})
	}

    fn();
});

Edit:

This is only applicable for small datasets, for larger one's you should use cursors as mentioned in other answers.

Solution 8 - node.js

You could use a Future:

myCollection.find({}, function(err, resultCursor) {
    resultCursor.count(Meteor.bindEnvironment(function(err,count){
        for(var i=0;i<count;i++)
        {
            var itemFuture=new Future();

            resultCursor.nextObject(function(err,item)){
                itemFuture.result(item);
            }

            var item=itemFuture.wait();
            //do what you want with the item, 
            //and continue with the loop if so

        }
    }));
});

Solution 9 - node.js

You could use simple setTimeOut's. This is an example in typescript running on nodejs (I am using promises via the 'when' module but it can be done without them as well):

        import mongodb = require("mongodb");

        var dbServer = new mongodb.Server('localhost', 27017, {auto_reconnect: true}, {});
        var db =  new mongodb.Db('myDb', dbServer);

        var util = require('util');
        var when = require('when'); //npm install when

        var dbDefer = when.defer();
        db.open(function() {
            console.log('db opened...');
            dbDefer.resolve(db);
        });

        dbDefer.promise.then(function(db : mongodb.Db){
            db.collection('myCollection', function (error, dataCol){
                if(error) {
                    console.error(error); return;
                }

                var doneReading = when.defer();

                var processOneRecordAsync = function(record) : When.Promise{
                    var result = when.defer();

                    setTimeout (function() {
                        //simulate a variable-length operation
                        console.log(util.inspect(record));
                        result.resolve('record processed');
                    }, Math.random()*5);

                    return result.promise;
                }

                var runCursor = function (cursor : MongoCursor){
                    cursor.next(function(error : any, record : any){
                        if (error){
                            console.log('an error occurred: ' + error);
                            return;
                        }
                        if (record){
                            processOneRecordAsync(record).then(function(r){
                                setTimeout(function() {runCursor(cursor)}, 1);
                            });
                        }
                        else{
                            //cursor up
                            doneReading.resolve('done reading data.');
                        }
                    });
                }

                dataCol.find({}, function(error, cursor : MongoCursor){
                    if (!error)
                    {
                        setTimeout(function() {runCursor(cursor)}, 1);
                    }
                });

                doneReading.promise.then(function(message : string){
                    //message='done reading data'
                    console.log(message);
                });
            });
        });

Attributions

All content for this solution is sourced from the original question on Stackoverflow.

The content on this page is licensed under the Attribution-ShareAlike 4.0 International (CC BY-SA 4.0) license.

Content TypeOriginal AuthorOriginal Content on Stackoverflow
QuestionUpTheCreekView Question on Stackoverflow
Solution 1 - node.jsuser993683View Answer on Stackoverflow
Solution 2 - node.jsTimothy StrimpleView Answer on Stackoverflow
Solution 3 - node.jsJaydeep SolankiView Answer on Stackoverflow
Solution 4 - node.jsDaphoqueView Answer on Stackoverflow
Solution 5 - node.jsuser3392439View Answer on Stackoverflow
Solution 6 - node.jsAntoine DesboisView Answer on Stackoverflow
Solution 7 - node.jsSalmanView Answer on Stackoverflow
Solution 8 - node.jsGerard CarbóView Answer on Stackoverflow
Solution 9 - node.jsLeoView Answer on Stackoverflow