Parallel foreach with asynchronous lambda

C#, Async Await, Task Parallel Library, parallel.foreach

C# Problem Overview


I would like to handle a collection in parallel, but I'm having trouble implementing it and I'm therefore hoping for some help.

The trouble arises if I want to call a method marked async in C#, within the lambda of the parallel loop. For example:

var bag = new ConcurrentBag<object>();
Parallel.ForEach(myCollection, async item =>
{
  // some pre stuff
  var response = await GetData(item);
  bag.Add(response);
  // some post stuff
});
var count = bag.Count;

The problem is that the count comes out as 0, because all the threads created are effectively just background threads and the Parallel.ForEach call doesn't wait for completion. If I remove the async keyword, the method looks like this:

var bag = new ConcurrentBag<object>();
Parallel.ForEach(myCollection, item =>
{
  // some pre stuff
  var responseTask = GetData(item); // no await here: the lambda is no longer async
  responseTask.Wait();
  var response = responseTask.Result;
  bag.Add(response);
  // some post stuff
});
var count = bag.Count;

It works, but it completely disables the await cleverness and I have to do some manual exception handling (removed for brevity).

How can I implement a Parallel.ForEach loop that uses the await keyword within the lambda? Is it possible?

The prototype of the Parallel.ForEach method takes an Action<T> as parameter, but I want it to wait for my asynchronous lambda.
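To make the symptom concrete, here is a minimal sketch that reproduces it. It assumes .NET 6 or later with top-level statements, and Task.Delay is only a stand-in for the GetData call from the question. Because the lambda is converted to an Action<int>, the loop has no Task to wait on and returns as soon as each lambda reaches its first await.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

var bag = new ConcurrentBag<int>();

// The async lambda is converted to Action<int> (effectively "async void"),
// so Parallel.ForEach cannot observe when the awaited work finishes.
Parallel.ForEach(Enumerable.Range(1, 5), async item =>
{
    await Task.Delay(500); // stand-in for GetData(item)
    bag.Add(item);
});

// Typically 0 here: the loop has already returned while the delays are still pending.
Console.WriteLine($"Right after the loop: {bag.Count}");

// Crude wait, for demonstration only; real code should await the tasks instead.
await Task.Delay(2000);
Console.WriteLine($"After waiting: {bag.Count}");
```

The second count is only reached because the program waits long enough by hand, which is exactly what the solutions below replace with a proper await.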

C# Solutions


Solution 1 - C#

If you just want simple parallelism, you can do this:

var bag = new ConcurrentBag<object>();
var tasks = myCollection.Select(async item =>
{
  // some pre stuff
  var response = await GetData(item);
  bag.Add(response);
  // some post stuff
});
await Task.WhenAll(tasks);
var count = bag.Count;

If you need something more complex, check out Stephen Toub's ForEachAsync post.
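For reference, the following is a sketch along the lines of the partitioner-based approach described in that post, not a verbatim copy of it. The class name PartitionedForEach and the demo values are assumptions made for illustration. The idea is to split the source into a fixed number of partitions and let one task drain each partition sequentially, which caps the number of concurrent operations.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Demo: process 100 items with at most 4 running at once.
var bag = new ConcurrentBag<int>();
await Enumerable.Range(1, 100).ForEachAsync(4, async item =>
{
    await Task.Delay(10); // stand-in for GetData(item)
    bag.Add(item);
});
Console.WriteLine(bag.Count);

public static class PartitionedForEach
{
    // Runs body for every item, using one worker task per partition.
    public static Task ForEachAsync<T>(this IEnumerable<T> source, int degreeOfParallelism, Func<T, Task> body)
    {
        return Task.WhenAll(
            Partitioner.Create(source)
                .GetPartitions(degreeOfParallelism)
                .Select(partition => Task.Run(async () =>
                {
                    using (partition)
                    {
                        // Each worker awaits its items one at a time.
                        while (partition.MoveNext())
                        {
                            await body(partition.Current);
                        }
                    }
                })));
    }
}
```

If one item throws, the worker for that partition stops, while the other workers keep running until their partitions are drained; Task.WhenAll then surfaces the failure.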

Solution 2 - C#

You can use the ParallelForEachAsync extension method from the AsyncEnumerator NuGet package:

using Dasync.Collections;

var bag = new ConcurrentBag<object>();
await myCollection.ParallelForEachAsync(async item =>
{
  // some pre stuff
  var response = await GetData(item);
  bag.Add(response);
  // some post stuff
}, maxDegreeOfParallelism: 10);
var count = bag.Count;

Solution 3 - C#

One of the new .NET 6 APIs is Parallel.ForEachAsync, a way to schedule asynchronous work that allows you to control the degree of parallelism:

var urls = new [] 
{
    "https://dotnet.microsoft.com",
    "https://www.microsoft.com",
    "https://stackoverflow.com"
};

var client = new HttpClient();

var options = new ParallelOptions { MaxDegreeOfParallelism = 2 };
await Parallel.ForEachAsync(urls, options, async (url, token) =>
{
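    // A raw URL contains characters that are invalid in file names, so escape it first.
    var cacheDir = Path.Combine(Path.GetTempPath(), "http_cache");
    Directory.CreateDirectory(cacheDir);
    var targetPath = Path.Combine(cacheDir, Uri.EscapeDataString(url));

    // Pass the token along so the requests stop when the loop is cancelled.
    var response = await client.GetAsync(url, token);

    if (response.IsSuccessStatusCode)
    {
        using var target = File.OpenWrite(targetPath);

        await response.Content.CopyToAsync(target, token);
    }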
    }
});

For another example, see Scott Hanselman's blog.

Solution 4 - C#

With SemaphoreSlim you can achieve parallelism control.

var bag = new ConcurrentBag<object>();
var maxParallel = 20;
var throttler = new SemaphoreSlim(initialCount: maxParallel);
var tasks = myCollection.Select(async item =>
{
  await throttler.WaitAsync();
  try
  {
     var response = await GetData(item);
     bag.Add(response);
  }
  finally
  {
     throttler.Release();
  }
});
await Task.WhenAll(tasks);
var count = bag.Count;

Solution 5 - C#

Simplest possible extension method compiled from other answers and the article referenced by the accepted answer:

public static async Task ParallelForEachAsync<T>(this IEnumerable<T> source, Func<T, Task> asyncAction, int maxDegreeOfParallelism)
{
	var throttler = new SemaphoreSlim(initialCount: maxDegreeOfParallelism);
	var tasks = source.Select(async item =>
	{
		await throttler.WaitAsync();
		try
		{
			await asyncAction(item).ConfigureAwait(false);
		}
		finally
		{
			throttler.Release();
		}
	});
	await Task.WhenAll(tasks);
}

UPDATE: here's a simple modification that also supports a cancellation token, as requested in the comments (untested):

public static async Task ParallelForEachAsync<T>(this IEnumerable<T> source, Func<T, CancellationToken, Task> asyncAction, int maxDegreeOfParallelism, CancellationToken cancellationToken)
{
	var throttler = new SemaphoreSlim(initialCount: maxDegreeOfParallelism);
	var tasks = source.Select(async item =>
	{
		await throttler.WaitAsync(cancellationToken);

		try
		{
			// Check inside the try block so the semaphore slot is always released.
			if (cancellationToken.IsCancellationRequested) return;

			await asyncAction(item, cancellationToken).ConfigureAwait(false);
		}
		finally
		{
			throttler.Release();
		}
	});
	await Task.WhenAll(tasks);
}

Solution 6 - C#

My lightweight implementation of ParallelForEach async.

Features:

  1. Throttling (max degree of parallelism).
  2. Exception handling (an AggregateException is thrown at completion).
  3. Memory efficient (no need to store the list of tasks).

public static class AsyncEx
{
    public static async Task ParallelForEachAsync<T>(this IEnumerable<T> source, Func<T, Task> asyncAction, int maxDegreeOfParallelism = 10)
    {
        var semaphoreSlim = new SemaphoreSlim(maxDegreeOfParallelism);
        var tcs = new TaskCompletionSource<object>();
        var exceptions = new ConcurrentBag<Exception>();
        bool addingCompleted = false;

        foreach (T item in source)
        {
            await semaphoreSlim.WaitAsync();
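            asyncAction(item).ContinueWith(t =>
            {
                // Record the failure before releasing, so it is visible once the count is restored.
                if (t.Exception != null)
                {
                    exceptions.Add(t.Exception);
                }

                semaphoreSlim.Release();

                if (Volatile.Read(ref addingCompleted) && semaphoreSlim.CurrentCount == maxDegreeOfParallelism)
                {
                    tcs.TrySetResult(null);
                }
            });
        }

        Volatile.Write(ref addingCompleted, true);
        // Covers an empty source and the case where every action finished before the flag was set;
        // without this check the method would wait forever.
        if (semaphoreSlim.CurrentCount == maxDegreeOfParallelism)
        {
            tcs.TrySetResult(null);
        }
        await tcs.Task;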
        if (exceptions.Count > 0)
        {
            throw new AggregateException(exceptions);
        }
    }
}

Usage example:

await Enumerable.Range(1, 10000).ParallelForEachAsync(async (i) =>
{
    var data = await GetData(i);
}, maxDegreeOfParallelism: 100);

Solution 7 - C#

I've created an extension method for this, which makes use of SemaphoreSlim and also allows setting the maximum degree of parallelism:

    /// <summary>
    /// Concurrently executes async actions for each item of <see cref="IEnumerable{T}"/>
    /// </summary>
    /// <typeparam name="T">Type of IEnumerable</typeparam>
    /// <param name="enumerable">instance of <see cref="IEnumerable{T}"/></param>
    /// <param name="action">an async <see cref="Action" /> to execute</param>
    /// <param name="maxDegreeOfParallelism">Optional, an integer that represents the maximum degree of parallelism,
    /// must be greater than 0</param>
    /// <returns>A Task representing an async operation</returns>
    /// <exception cref="ArgumentOutOfRangeException">If maxDegreeOfParallelism is less than 1</exception>
    public static async Task ForEachAsyncConcurrent<T>(
        this IEnumerable<T> enumerable,
        Func<T, Task> action,
        int? maxDegreeOfParallelism = null)
    {
        if (maxDegreeOfParallelism.HasValue)
        {
            using (var semaphoreSlim = new SemaphoreSlim(
                maxDegreeOfParallelism.Value, maxDegreeOfParallelism.Value))
            {
                var tasksWithThrottler = new List<Task>();

                foreach (var item in enumerable)
                {
                    // Increment the number of currently running tasks and wait if they are more than limit.
                    await semaphoreSlim.WaitAsync();

                    tasksWithThrottler.Add(Task.Run(async () =>
                    {
                        try
                        {
                            await action(item);
                        }
                        finally
                        {
                            // action is completed, so decrement the number of currently running tasks.
                            // try/finally (rather than ContinueWith) lets exceptions from action propagate.
                            semaphoreSlim.Release();
                        }
                    }));
                }

                // Wait for all tasks to complete.
                await Task.WhenAll(tasksWithThrottler.ToArray());
            }
        }
        else
        {
            await Task.WhenAll(enumerable.Select(item => action(item)));
        }
    }
	

Sample Usage:

await enumerable.ForEachAsyncConcurrent(
	async item =>
	{
		await SomeAsyncMethod(item);
	},
	5);
	

Solution 8 - C#

In the accepted answer the ConcurrentBag is not required. Here's an implementation without it:

var tasks = myCollection.Select(GetData).ToList();
await Task.WhenAll(tasks);
var results = tasks.Select(t => t.Result);

Any of the "// some pre stuff" and "// some post stuff" can go into the GetData implementation (or another method that calls GetData).

Aside from being shorter, there's no use of an "async void" lambda, which is an anti-pattern.
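As a minimal sketch of the "another method that calls GetData" idea: the wrapper name ProcessAsync, the Demo class, and the stand-in GetData body below are all hypothetical and only illustrate where the pre and post steps would go.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

var myCollection = new[] { 1, 2, 3 };

// Start one ProcessAsync call per item, then wait for all of them.
var tasks = myCollection.Select(item => Demo.ProcessAsync(item)).ToList();
string[] results = await Task.WhenAll(tasks); // results are returned in input order

Console.WriteLine(string.Join(", ", results)); // prints "data-1, data-2, data-3"

public static class Demo
{
    // Hypothetical stand-in for the question's GetData.
    public static async Task<string> GetData(int item)
    {
        await Task.Delay(50);
        return $"data-{item}";
    }

    // Wrapper that holds the pre and post steps around GetData.
    public static async Task<string> ProcessAsync(int item)
    {
        // some pre stuff
        var response = await GetData(item);
        // some post stuff
        return response;
    }
}
```

Task.WhenAll returns the results as an array directly, so reading t.Result from each task afterwards is optional.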

Solution 9 - C#

With the introduction of .NET 6, Parallel.ForEachAsync is now available.

using System.Net.Http.Headers;
using System.Net.Http.Json;
 
var userHandlers = new []
{
    "users/okyrylchuk",
    "users/shanselman",
    "users/jaredpar",
    "users/davidfowl"
};
 
using HttpClient client = new()
{
    BaseAddress = new Uri("https://api.github.com"),
};
client.DefaultRequestHeaders.UserAgent.Add(new ProductInfoHeaderValue("DotNet", "6"));
 
ParallelOptions parallelOptions = new()
{
    MaxDegreeOfParallelism = 3
};
 
await Parallel.ForEachAsync(userHandlers, parallelOptions, async (uri, token) =>
{
    var user = await client.GetFromJsonAsync<GitHubUser>(uri, token);
 
    Console.WriteLine($"Name: {user.Name}\nBio: {user.Bio}\n");
});
 
public class GitHubUser
{
    public string Name { get; set; }
    public string Bio { get; set; }
}

The complete issue tracking is on GitHub, and Scott Hanselman has published some usage examples.

Solution 10 - C#

The following is set to work with IAsyncEnumerable but can be modified to use IEnumerable by just changing the type and removing the "await" on the foreach. It's far more appropriate for large sets of data than creating countless parallel tasks and then awaiting them all.

    public static async Task ForEachAsyncConcurrent<T>(this IAsyncEnumerable<T> enumerable, Func<T, Task> action, int maxDegreeOfParallelism, int? boundedCapacity = null)
    {
        ActionBlock<T> block = new ActionBlock<T>(
           action, 
           new ExecutionDataflowBlockOptions 
           { 
             MaxDegreeOfParallelism = maxDegreeOfParallelism, 
             BoundedCapacity = boundedCapacity ?? maxDegreeOfParallelism * 3 
           });

        await foreach (T item in enumerable)
        {
           await block.SendAsync(item).ConfigureAwait(false);
        }

        block.Complete();
        await block.Completion;
    }
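The IEnumerable variant mentioned above could look roughly like the sketch below. It assumes the System.Threading.Tasks.Dataflow library is available (it ships with modern .NET and is a NuGet package on .NET Framework). The class name DataflowForEach and the demo values are made up for illustration, and the early exit when SendAsync returns false is an addition that is not in the original.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Demo: 50 items, at most 4 actions running at once.
var seen = new ConcurrentBag<int>();
await Enumerable.Range(1, 50).ForEachAsyncConcurrent(async i =>
{
    await Task.Delay(10); // stand-in for real async work
    seen.Add(i);
}, maxDegreeOfParallelism: 4);
Console.WriteLine(seen.Count);

public static class DataflowForEach
{
    public static async Task ForEachAsyncConcurrent<T>(this IEnumerable<T> enumerable, Func<T, Task> action, int maxDegreeOfParallelism, int? boundedCapacity = null)
    {
        var block = new ActionBlock<T>(
            action,
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = maxDegreeOfParallelism,
                BoundedCapacity = boundedCapacity ?? maxDegreeOfParallelism * 3
            });

        foreach (T item in enumerable)
        {
            // SendAsync waits while the block is full and returns false if the
            // block declined the item (for example after a fault), so stop feeding it.
            if (!await block.SendAsync(item).ConfigureAwait(false))
            {
                break;
            }
        }

        block.Complete();
        // Rethrows any exception raised by action.
        await block.Completion.ConfigureAwait(false);
    }
}
```

The bounded capacity is what keeps memory use flat: the foreach loop only pulls the next item from the source once the block has room for it.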

Solution 11 - C#

For a simpler solution (not sure if it is the most optimal), you can simply nest Parallel.ForEach inside a Task, like so:

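var options = new ParallelOptions { MaxDegreeOfParallelism = 5 };
// Task.Run returns immediately; keep the returned Task if you need to observe completion or errors.
Task.Run(() =>
{
    Parallel.ForEach(myCollection, options, item =>
    {
        DoWork(item);
    });
});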

The ParallelOptions will do the throttling for you, out of the box.

I am using it in a real-world scenario to run very long operations in the background. These operations are triggered via HTTP, and the design goal was not to block the HTTP call while the long operation is running.

  1. An HTTP call requests the long background operation.
  2. The operation starts in the background.
  3. The user gets a status ID, which can be used to check the status with another HTTP call.
  4. The background operation updates its status.

That way, the CI/CD call does not time out because of a long HTTP operation; instead, it polls the status every x seconds without blocking the process.

Attributions

All content for this solution is sourced from the original question on Stackoverflow.

The content on this page is licensed under the Attribution-ShareAlike 4.0 International (CC BY-SA 4.0) license.

Content Type | Original Author | Original Content on Stackoverflow
Question | clausndk | View Question on Stackoverflow
Solution 1 - C# | Stephen Cleary | View Answer on Stackoverflow
Solution 2 - C# | Serge Semenov | View Answer on Stackoverflow
Solution 3 - C# | Majid Shahabfar | View Answer on Stackoverflow
Solution 4 - C# | Felipe l | View Answer on Stackoverflow
Solution 5 - C# | Alex from Jitbit | View Answer on Stackoverflow
Solution 6 - C# | nicolas2008 | View Answer on Stackoverflow
Solution 7 - C# | Jay Shah | View Answer on Stackoverflow
Solution 8 - C# | Tom | View Answer on Stackoverflow
Solution 9 - C# | Mahesh Bongani | View Answer on Stackoverflow
Solution 10 - C# | Caleb Holt | View Answer on Stackoverflow
Solution 11 - C# | Gravity API | View Answer on Stackoverflow