How can I use Async with ForEach?

C#Async Await

C# Problem Overview


Is it possible to use Async when using ForEach? Below is the code I am trying:

using (DataContext db = new DataLayer.DataContext())
{
    db.Groups.ToList().ForEach(i => async {
        await GetAdminsFromGroup(i.Gid);
    });
}

I am getting the error:

>The name 'Async' does not exist in the current context

The method that contains the using statement is marked async.

C# Solutions


Solution 1 - C#

List<T>.ForEach doesn't play particularly well with async (neither does LINQ-to-objects, for the same reasons).

In this case, I recommend projecting each element into an asynchronous operation, and you can then (asynchronously) wait for them all to complete.

using (DataContext db = new DataLayer.DataContext())
{
    var tasks = db.Groups.ToList().Select(i => GetAdminsFromGroupAsync(i.Gid));
    var results = await Task.WhenAll(tasks);
}

The benefits of this approach over giving an async delegate to ForEach are:

  1. Error handling is more proper. Exceptions from async void cannot be caught with catch; this approach will propagate exceptions at the await Task.WhenAll line, allowing natural exception handling.
  2. You know that the tasks are complete at the end of this method, since it does an await Task.WhenAll. If you use async void, you cannot easily tell when the operations have completed.
  3. This approach has a natural syntax for retrieving the results. GetAdminsFromGroupAsync sounds like it's an operation that produces a result (the admins), and such code is more natural if such operations can return their results rather than setting a value as a side effect.
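The three points above can be illustrated with a small console sketch. It is not the asker's real code: GetAdminsFromGroupAsync is a hypothetical stub that returns invented names and throws for negative IDs, and the group IDs are made up for the demo.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class WhenAllDemo
{
    // Hypothetical stand-in for the real data access call (assumption).
    public static async Task<string[]> GetAdminsFromGroupAsync(int gid)
    {
        await Task.Delay(10);
        if (gid < 0)
            throw new ArgumentOutOfRangeException(nameof(gid));
        return new[] { $"admin-of-{gid}" };
    }

    public static async Task<string[]> GetAllAdminsAsync(IEnumerable<int> groupIds)
    {
        // Start one operation per group. ToList() runs the projection once,
        // so each task is created exactly one time.
        List<Task<string[]>> tasks = groupIds
            .Select(gid => GetAdminsFromGroupAsync(gid))
            .ToList();

        // Exceptions from any operation surface at this await.
        string[][] results = await Task.WhenAll(tasks);

        // The results come back as ordinary return values.
        return results.SelectMany(r => r).ToArray();
    }

    public static async Task Main()
    {
        string[] admins = await GetAllAdminsAsync(new[] { 1, 2, 3 });
        Console.WriteLine(string.Join(", ", admins));

        try
        {
            await GetAllAdminsAsync(new[] { 1, -1 });
        }
        catch (ArgumentOutOfRangeException ex)
        {
            // A plain catch block is enough; no async void is involved.
            Console.WriteLine($"Caught exception for parameter: {ex.ParamName}");
        }
    }
}
```

When the await on Task.WhenAll returns, every operation has finished, so the caller never has to guess whether work is still in flight.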

Solution 2 - C#

This little extension method should give you exception-safe async iteration:

public static async Task ForEachAsync<T>(this List<T> list, Func<T, Task> func)
{
    foreach (var value in list)
    {
        await func(value);
    }
}

Since we're changing the return type of the lambda from void to Task, exceptions will propagate up correctly. This will allow you to write something like this in practice:

await db.Groups.ToList().ForEachAsync(async i => {
    await GetAdminsFromGroup(i.Gid);
});
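To see the exception propagation in action, here is a minimal sketch. The extension method is repeated so the sample compiles on its own; the IDs, the failure on ID 3, and the class names are invented for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ListAsyncExtensions
{
    // Same shape as the extension method above.
    public static async Task ForEachAsync<T>(this List<T> list, Func<T, Task> func)
    {
        foreach (var value in list)
        {
            await func(value);
        }
    }
}

public static class ForEachAsyncDemo
{
    public static async Task<List<int>> RunAsync()
    {
        var processed = new List<int>();
        var ids = new List<int> { 1, 2, 3, 4 };

        try
        {
            await ids.ForEachAsync(async id =>
            {
                await Task.Delay(10);
                if (id == 3)
                    throw new InvalidOperationException($"Group {id} failed");
                processed.Add(id);
            });
        }
        catch (InvalidOperationException ex)
        {
            // The lambda returns a Task, so its exception reaches this catch.
            Console.WriteLine($"Caught: {ex.Message}");
        }

        // Items are handled one at a time, so the loop stops at the first failure.
        return processed;
    }

    public static async Task Main()
    {
        List<int> processed = await RunAsync();
        Console.WriteLine(string.Join(", ", processed));
    }
}
```

Because each item is awaited before the next one starts, this variant is sequential: a failure on one item means later items are never processed.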

Solution 3 - C#

Starting with C# 8.0, you can create and consume streams asynchronously.

    private async void button1_Click(object sender, EventArgs e)
    {
        IAsyncEnumerable<int> enumerable = GenerateSequence();

        await foreach (var i in enumerable)
        {
            Debug.WriteLine(i);
        }
    }

    public static async IAsyncEnumerable<int> GenerateSequence()
    {
        for (int i = 0; i < 20; i++)
        {
            await Task.Delay(100);
            yield return i;
        }
    }


Solution 4 - C#

The simple answer is to use the foreach keyword instead of the ForEach() method of List<T>. Inside an async method, the loop body can then await directly:

using (DataContext db = new DataLayer.DataContext())
{
    foreach(var i in db.Groups)
    {
        await GetAdminsFromGroup(i.Gid);
    }
}

Solution 5 - C#

Here is an actual working version of the above async foreach variants with sequential processing:

public static async Task ForEachAsync<T>(this List<T> enumerable, Action<T> action)
{
    foreach (var item in enumerable)
        await Task.Run(() => { action(item); }).ConfigureAwait(false);
}

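Here is how it is used:

public async Task SequentialAsync()
{
    var list = new List<Action>();

    Action action1 = () => {
        // do stuff 1
    };

    Action action2 = () => {
        // do stuff 2
    };

    list.Add(action1);
    list.Add(action2);

    // The extension method needs a delegate that says what to do with each item;
    // here each item is itself an Action, so it is simply invoked.
    await list.ForEachAsync(action => action());
}

What's the key difference? Each action runs on the thread pool via Task.Run and is awaited before the next one starts, so processing stays sequential. .ConfigureAwait(false) tells each await not to resume on the captured context (such as a UI thread), so the loop continues on a thread-pool thread instead of returning to the caller's context.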

Solution 6 - C#

The problem was that the async keyword needs to appear before the lambda, not before the body:

db.Groups.ToList().ForEach(async (i) => {
    await GetAdminsFromGroup(i.Gid);
});
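This compiles, but because List<T>.ForEach takes an Action<T>, the lambda becomes async void, which is the situation Solution 1 warns about. The following sketch, with invented delays and a counter added purely for illustration, shows that ForEach returns before the awaited work has finished:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncVoidDemo
{
    public static async Task<(int afterForEach, int afterWait)> RunAsync()
    {
        int completed = 0;
        var ids = new List<int> { 1, 2, 3 };

        // ForEach takes Action<T>, so this lambda compiles to async void.
        // Each call returns to ForEach as soon as it reaches its first await.
        ids.ForEach(async id =>
        {
            await Task.Delay(200);
            Interlocked.Increment(ref completed);
        });

        // ForEach has returned, but the lambdas are still waiting on their delays.
        int afterForEach = Volatile.Read(ref completed);

        // Crude wait used only for this demo; real code has nothing to await here.
        await Task.Delay(1000);
        int afterWait = Volatile.Read(ref completed);

        return (afterForEach, afterWait);
    }

    public static async Task Main()
    {
        var (afterForEach, afterWait) = await RunAsync();
        Console.WriteLine($"Completed right after ForEach: {afterForEach}");
        Console.WriteLine($"Completed after waiting: {afterWait}");
    }
}
```

On a typical run the first count is lower than the second, which is why the Task-returning approaches in the other solutions are usually preferable when the caller needs to know the work is done.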

Solution 7 - C#

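Add this extension method:

using System;
using System.Collections.Concurrent; // Partitioner
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class ForEachAsyncExtension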
{
    public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
    {
        return Task.WhenAll(from partition in Partitioner.Create(source).GetPartitions(dop) 
            select Task.Run(async delegate
            {
                using (partition)
                    while (partition.MoveNext())
                        await body(partition.Current).ConfigureAwait(false);
            }));
    }
}

And then use like so:

Task.Run(async () =>
{
    var s3 = new AmazonS3Client(Config.Instance.Aws.Credentials, Config.Instance.Aws.RegionEndpoint);
    var buckets = await s3.ListBucketsAsync();

    foreach (var s3Bucket in buckets.Buckets)
    {
        if (s3Bucket.BucketName.StartsWith("mybucket-"))
        {
            log.Information("Bucket => {BucketName}", s3Bucket.BucketName);

            ListObjectsResponse objects;
            try
            {
                objects = await s3.ListObjectsAsync(s3Bucket.BucketName);
            }
            catch
            {
                log.Error("Error getting objects. Bucket => {BucketName}", s3Bucket.BucketName);
                continue;
            }
            
            // ForEachAsync (4 is how many tasks you want to run in parallel)
            await objects.S3Objects.ForEachAsync(4, async s3Object =>
            {
                try
                {
                    log.Information("Bucket => {BucketName} => {Key}", s3Bucket.BucketName, s3Object.Key);
                    await s3.DeleteObjectAsync(s3Bucket.BucketName, s3Object.Key);
                }
                catch
                {
                    log.Error("Error deleting bucket {BucketName} object {Key}", s3Bucket.BucketName, s3Object.Key);
                }
            });

            try
            {
                await s3.DeleteBucketAsync(s3Bucket.BucketName);
            }
            catch
            {
                log.Error("Error deleting bucket {BucketName}", s3Bucket.BucketName);
            }
        }
    }
}).Wait();

Solution 8 - C#

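If you are using Entity Framework Core (the Microsoft.EntityFrameworkCore package), there is a ForEachAsync extension method on IQueryable<T>. It enumerates the query results asynchronously, but the callback it takes is an Action<T>, so the body should be synchronous. An async lambda passed to it compiles to async void, and ForEachAsync does not wait for its awaits. If each row needs awaited work, await foreach over AsAsyncEnumerable() (EF Core 3.0 and later) is the usual alternative.

The example usage looks like this:

using Microsoft.EntityFrameworkCore;
using System.Threading.Tasks;

public class Example
{
    private readonly DbContext _dbContext;
    public Example(DbContext dbContext)
    {
        _dbContext = dbContext;
    }

    // Returns Task rather than void so callers can await it and observe exceptions.
    public async Task LogicMethod()
    {
        // Rows are fetched asynchronously; the callback runs synchronously for each row.
        await _dbContext.Set<dbTable>().ForEachAsync(x =>
        {
            // synchronous logic
            Process(x);
        });
    }

    private void Process(object x)
    {
        // other logic
    }
}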

Solution 9 - C#

This is a method I created to handle async scenarios with ForEach.

  • If one of the tasks fails, the other tasks continue their execution.
  • You can supply a function that is executed on every exception.
  • Exceptions are collected into an AggregateException at the end and are available to you.
  • It supports a CancellationToken.

    public static class ParallelExecutor
    {
        /// <summary>
        /// Asynchronously executes the given function on all elements of the given enumerable, with a limit on the number of concurrent tasks.
        /// The executor keeps starting new tasks even if one of the tasks throws. If at least one task threw an exception, an <see cref="AggregateException"/> is thrown at the end of the method run.
        /// </summary>
        /// <typeparam name="T">Type of elements in enumerable</typeparam>
        /// <param name="maxTaskCount">The maximum task count.</param>
        /// <param name="enumerable">The enumerable.</param>
        /// <param name="asyncFunc">Asynchronous function that will be executed on every element of the enumerable. MUST be thread safe.</param>
        /// <param name="onException">Action that will be executed on every exception thrown by asyncFunc. CAN be thread unsafe.</param>
        /// <param name="cancellationToken">The cancellation token.</param>
        public static async Task ForEachAsync<T>(int maxTaskCount, IEnumerable<T> enumerable, Func<T, Task> asyncFunc, Action<Exception> onException = null, CancellationToken cancellationToken = default)
        {
            using var semaphore = new SemaphoreSlim(initialCount: maxTaskCount, maxCount: maxTaskCount);

            // This `lockObject` is used only in `catch { }` block.
            object lockObject = new object();
            var exceptions = new List<Exception>();
            var tasks = new Task[enumerable.Count()];
            int i = 0;

            try
            {
                foreach (var t in enumerable)
                {
                    await semaphore.WaitAsync(cancellationToken);
                    tasks[i++] = Task.Run(
                        async () =>
                        {
                            try
                            {
                                await asyncFunc(t);
                            }
                            catch (Exception e)
                            {
                                if (onException != null)
                                {
                                    lock (lockObject)
                                    {
                                        onException.Invoke(e);
                                    }
                                }

                                // Rethrow so the task faults; the exception is collected at the end of ForEachAsync to build the AggregateException.
                                throw;
                            }
                            finally
                            {
                                semaphore.Release();
                            }
                        }, cancellationToken);

                    if (cancellationToken.IsCancellationRequested)
                    {
                        break;
                    }
                }
            }
            catch (OperationCanceledException e)
            {
                exceptions.Add(e);
            }

            foreach (var t in tasks)
            {
                if (cancellationToken.IsCancellationRequested)
                {
                    break;
                }

                // Exception handling in this case is actually pretty fast.
                // https://gist.github.com/shoter/d943500eda37c7d99461ce3dace42141
                try
                {
                    await t;
                }
#pragma warning disable CA1031 // Do not catch general exception types - we want to throw that exception later as aggregate exception. Nothing wrong here.
                catch (Exception e)
#pragma warning restore CA1031 // Do not catch general exception types
                {
                    exceptions.Add(e);
                }
            }

            if (exceptions.Any())
            {
                throw new AggregateException(exceptions);
            }
        }
    }
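The core throttling idea in the method above can be reduced to a much smaller sketch. This is a simplified illustration, not a replacement for ParallelExecutor: it has no onException callback and no cancellation support, and the names ForEachThrottledAsync and MeasurePeakAsync are invented here.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottleDemo
{
    // Runs body for each item with at most maxConcurrency bodies in flight.
    public static async Task ForEachThrottledAsync<T>(
        IEnumerable<T> source, int maxConcurrency, Func<T, Task> body)
    {
        using var semaphore = new SemaphoreSlim(maxConcurrency, maxConcurrency);
        var tasks = new List<Task>();

        foreach (var item in source)
        {
            // Wait for a free slot before starting the next body.
            await semaphore.WaitAsync();
            tasks.Add(RunOneAsync(item));
        }

        // Surfaces failures only after every started body has finished.
        await Task.WhenAll(tasks);

        async Task RunOneAsync(T value)
        {
            try { await body(value); }
            finally { semaphore.Release(); } // free the slot even on failure
        }
    }

    // Demo helper: reports the highest number of bodies observed running at once.
    public static async Task<int> MeasurePeakAsync(int itemCount, int maxConcurrency)
    {
        int current = 0, peak = 0;
        var gate = new object();

        await ForEachThrottledAsync(Enumerable.Range(0, itemCount), maxConcurrency, async _ =>
        {
            lock (gate) { current++; peak = Math.Max(peak, current); }
            await Task.Delay(20);
            lock (gate) { current--; }
        });

        return peak;
    }

    public static async Task Main()
    {
        int peak = await MeasurePeakAsync(itemCount: 10, maxConcurrency: 3);
        Console.WriteLine($"Peak concurrency observed: {peak}");
    }
}
```

The semaphore is what enforces the limit: a new body can start only after an earlier one has released its slot, so the observed peak never exceeds maxConcurrency.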

Solution 10 - C#

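I would like to add that there is a built-in Parallel class. Note that Parallel.ForEach only takes synchronous delegates, so an async lambda passed to it becomes async void; for asynchronous bodies, .NET 6 and later provide Parallel.ForEachAsync.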
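As a rough sketch of the Parallel approach for asynchronous work, the following assumes .NET 6 or later, where Parallel.ForEachAsync is available. GetAdminsFromGroupAsync is a hypothetical stub, and the ID range and degree of parallelism are arbitrary.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ParallelForEachAsyncDemo
{
    // Hypothetical stand-in for the real data access call (assumption).
    private static async Task<string> GetAdminsFromGroupAsync(int gid, CancellationToken ct)
    {
        await Task.Delay(10, ct);
        return $"admin-of-{gid}";
    }

    public static async Task<string[]> RunAsync()
    {
        // Bodies run concurrently, so results go into a thread-safe collection.
        var results = new ConcurrentBag<string>();
        var options = new ParallelOptions { MaxDegreeOfParallelism = 4 };

        // The body returns ValueTask, so ForEachAsync waits for every await inside it.
        await Parallel.ForEachAsync(Enumerable.Range(1, 10), options, async (gid, ct) =>
        {
            results.Add(await GetAdminsFromGroupAsync(gid, ct));
        });

        return results.OrderBy(s => s).ToArray();
    }

    public static async Task Main()
    {
        string[] admins = await RunAsync();
        Console.WriteLine($"Fetched {admins.Length} results");
    }
}
```

Completion order is not guaranteed, which is why the sketch sorts the results before returning them.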

Attributions

All content for this solution is sourced from the original question on Stackoverflow.

The content on this page is licensed under the Attribution-ShareAlike 4.0 International (CC BY-SA 4.0) license.

| Content Type | Original Author | Original Content on Stackoverflow |
| --- | --- | --- |
| Question | James Jeffery | View Question on Stackoverflow |
| Solution 1 - C# | Stephen Cleary | View Answer on Stackoverflow |
| Solution 2 - C# | JD Courtoy | View Answer on Stackoverflow |
| Solution 3 - C# | Andrei Krasutski | View Answer on Stackoverflow |
| Solution 4 - C# | RubberDuck | View Answer on Stackoverflow |
| Solution 5 - C# | mrogunlana | View Answer on Stackoverflow |
| Solution 6 - C# | James Jeffery | View Answer on Stackoverflow |
| Solution 7 - C# | superlogical | View Answer on Stackoverflow |
| Solution 8 - C# | ElConrado | View Answer on Stackoverflow |
| Solution 9 - C# | Shoter | View Answer on Stackoverflow |
| Solution 10 - C# | Luk164 | View Answer on Stackoverflow |