How to limit the amount of concurrent async I/O operations?

Tags: C#, Asynchronous, Async-Await, Concurrency, Task Parallel Library

C# Problem Overview


// let's say there is a list of 1000+ URLs
string[] urls = { "http://google.com", "http://yahoo.com", ... };

// now let's send HTTP requests to each of these URLs in parallel
urls.AsParallel().ForAll(async (url) => {
    var client = new HttpClient();
    var html = await client.GetStringAsync(url);
});

Here is the problem: it starts 1000+ simultaneous web requests. Is there an easy way to limit the number of concurrent async HTTP requests, so that no more than 20 web pages are downloaded at any given time? How can this be done in the most efficient manner?

C# Solutions


Solution 1 - C#

You can definitely do this in the latest versions of async for .NET, using .NET 4.5 Beta. The previous post from 'usr' points to a good article written by Stephen Toub, but the less-announced news is that the async semaphore actually made it into the Beta release of .NET 4.5.

If you look at our beloved SemaphoreSlim class (which you should be using since it's more performant than the original Semaphore), it now boasts the WaitAsync(...) series of overloads, with all of the expected arguments - timeout intervals, cancellation tokens, all of your usual scheduling friends :)
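
For illustration, here is a minimal sketch of those WaitAsync overloads (the method and variable names below are just placeholders, not part of the original answer):

public async Task WaitAsyncOverloadsSketch(CancellationToken cancellationToken)
{
    var throttler = new SemaphoreSlim(initialCount: 20);

    await throttler.WaitAsync();                                        // wait indefinitely for a slot
    bool entered = await throttler.WaitAsync(TimeSpan.FromSeconds(5));  // give up after a timeout
    await throttler.WaitAsync(cancellationToken);                       // observe a cancellation token
    bool ok = await throttler.WaitAsync(5000, cancellationToken);       // timeout (ms) + cancellation combined

    // every successful wait should eventually be paired with a throttler.Release()
}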

Stephen has also written a more recent blog post about the new .NET 4.5 goodies that came out with the Beta; see What’s New for Parallelism in .NET 4.5 Beta.

Last, here's some sample code showing how to use SemaphoreSlim for async method throttling:

public async Task MyOuterMethod()
{
    // let's say there is a list of 1000+ URLs
    string[] urls = { "http://google.com", "http://yahoo.com", /*...*/ };

    // now let's send HTTP requests to each of these URLs in parallel
    var allTasks = new List<Task>();
    var throttler = new SemaphoreSlim(initialCount: 20);
    foreach (var url in urls)
    {
        // do an async wait until we can schedule again
        await throttler.WaitAsync();

        // using Task.Run(...) to run the lambda in its own parallel
        // flow on the threadpool
        allTasks.Add(
            Task.Run(async () =>
            {
                try
                {
                    var client = new HttpClient();
                    var html = await client.GetStringAsync(url);
                }
                finally
                {
                    throttler.Release();
                }
            }));
    }

    // won't get here until all urls have been put into tasks
    await Task.WhenAll(allTasks);

    // won't get here until all tasks have completed in some way
    // (either success or exception)
}

Last, but probably worth a mention, is a solution that uses TPL-based scheduling. You can create delegate-bound tasks on the TPL that have not yet been started, and allow a custom task scheduler to limit the concurrency. In fact, there's an MSDN sample for it; see also TaskScheduler.
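
As a rough sketch of that approach - assuming you copy the LimitedConcurrencyLevelTaskScheduler class from the MSDN TaskScheduler sample into your project, and that urls is the array from the question - it could look something like this:

// LimitedConcurrencyLevelTaskScheduler comes from the MSDN TaskScheduler sample page.
var scheduler = new LimitedConcurrencyLevelTaskScheduler(20);
var factory = new TaskFactory(scheduler);

var tasks = urls.Select(url => factory.StartNew(() =>
{
    // The scheduler limits how many of these delegates run at once; blocking on the
    // async call keeps the slot occupied for the whole duration of the request.
    using (var client = new HttpClient())
    {
        return client.GetStringAsync(url).GetAwaiter().GetResult();
    }
})).ToArray();

Task.WaitAll(tasks);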

Solution 2 - C#

If you have an IEnumerable (i.e. a collection of URL strings) and you want to do an I/O-bound operation on each of them (i.e. make an async HTTP request) concurrently, and optionally you also want to set the maximum number of concurrent I/O requests in real time, here is how you can do that. This way you do not use the thread pool et al; the method uses SemaphoreSlim to control the maximum number of concurrent I/O requests, similar to a sliding-window pattern: one request completes, leaves the semaphore, and the next one gets in.

usage:

await ForEachAsync(urlStrings, YourAsyncFunc, optionalMaxDegreeOfConcurrency);
public static Task ForEachAsync<TIn>(
    IEnumerable<TIn> inputEnumerable,
    Func<TIn, Task> asyncProcessor,
    int? maxDegreeOfParallelism = null)
{
    // Fallback limit used when the caller doesn't pass one (the value here is illustrative).
    const int DefaultMaxDegreeOfParallelism = 10;

    int maxAsyncThreadCount = maxDegreeOfParallelism ?? DefaultMaxDegreeOfParallelism;
    SemaphoreSlim throttler = new SemaphoreSlim(maxAsyncThreadCount, maxAsyncThreadCount);

    IEnumerable<Task> tasks = inputEnumerable.Select(async input =>
    {
        await throttler.WaitAsync().ConfigureAwait(false);
        try
        {
            await asyncProcessor(input).ConfigureAwait(false);
        }
        finally
        {
            throttler.Release();
        }
    });

    return Task.WhenAll(tasks);
}
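
For example, a concrete (hypothetical) call that downloads the question's urls with at most 20 concurrent requests could look like this:

var client = new HttpClient();
await ForEachAsync(urls, url => client.GetStringAsync(url), 20);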

Solution 3 - C#

There are a lot of pitfalls, and direct use of a semaphore can be tricky in error cases, so I would suggest using the AsyncEnumerator NuGet package instead of re-inventing the wheel:

// let's say there is a list of 1000+ URLs
string[] urls = { "http://google.com", "http://yahoo.com", ... };

// now let's send HTTP requests to each of these URLs in parallel
await urls.ParallelForEachAsync(async (url) => {
    var client = new HttpClient();
    var html = await client.GetStringAsync(url);
}, maxDegreeOfParalellism: 20);

Solution 4 - C#

Unfortunately, the .NET Framework is missing the most important combinators for orchestrating parallel async tasks. There is no such thing built in.

What you want is called a semaphore, and you need an async version of it. Look at the AsyncSemaphore class built by the most respectable Stephen Toub.
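
For reference, a minimal async semaphore in the spirit of Toub's AsyncSemaphore could look roughly like this (a sketch, not the exact published class):

public class AsyncSemaphore
{
    private static readonly Task s_completed = Task.FromResult(true);
    private readonly Queue<TaskCompletionSource<bool>> _waiters = new Queue<TaskCompletionSource<bool>>();
    private int _currentCount;

    public AsyncSemaphore(int initialCount)
    {
        if (initialCount < 0) throw new ArgumentOutOfRangeException(nameof(initialCount));
        _currentCount = initialCount;
    }

    public Task WaitAsync()
    {
        lock (_waiters)
        {
            if (_currentCount > 0)
            {
                // A slot is free: take it and return an already-completed task.
                _currentCount--;
                return s_completed;
            }

            // No slot free: queue a waiter that Release() will complete later.
            var waiter = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
            _waiters.Enqueue(waiter);
            return waiter.Task;
        }
    }

    public void Release()
    {
        TaskCompletionSource<bool> toRelease = null;
        lock (_waiters)
        {
            if (_waiters.Count > 0)
                toRelease = _waiters.Dequeue();
            else
                _currentCount++;
        }

        // Complete the waiter outside the lock so its continuation doesn't run under it.
        toRelease?.SetResult(true);
    }
}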

Solution 5 - C#

SemaphoreSlim can be very helpful here. Here's the extension method I've created.

    /// <summary>
    /// Concurrently executes async actions for each item of <see cref="IEnumerable{T}"/>.
    /// </summary>
    /// <typeparam name="T">Type of IEnumerable</typeparam>
    /// <param name="enumerable">instance of <see cref="IEnumerable{T}"/></param>
    /// <param name="action">an async <see cref="Action"/> to execute</param>
    /// <param name="maxActionsToRunInParallel">Optional, max number of the actions to run in parallel,
    /// must be greater than 0</param>
    /// <returns>A Task representing an async operation</returns>
    /// <exception cref="ArgumentOutOfRangeException">If the maxActionsToRunInParallel is less than 1</exception>
    public static async Task ForEachAsyncConcurrent<T>(
        this IEnumerable<T> enumerable,
        Func<T, Task> action,
        int? maxActionsToRunInParallel = null)
    {
        if (maxActionsToRunInParallel.HasValue)
        {
            using (var semaphoreSlim = new SemaphoreSlim(
                maxActionsToRunInParallel.Value, maxActionsToRunInParallel.Value))
            {
                var tasksWithThrottler = new List<Task>();

                foreach (var item in enumerable)
                {
                    // Increment the number of currently running tasks and wait if they are more than limit.
                    await semaphoreSlim.WaitAsync();

                    tasksWithThrottler.Add(Task.Run(async () =>
                    {
                        try
                        {
                            await action(item);
                        }
                        finally
                        {
                            // action is completed, so decrement the number of currently running tasks
                            semaphoreSlim.Release();
                        }
                    }));
                }

                // Wait for all of the provided tasks to complete.
                await Task.WhenAll(tasksWithThrottler.ToArray());
            }
        }
        else
        {
            await Task.WhenAll(enumerable.Select(item => action(item)));
        }
    }

Sample Usage:

await enumerable.ForEachAsyncConcurrent(
	async item =>
	{
		await SomeAsyncMethod(item);
	},
	5);

Solution 6 - C#

After the release of .NET 6 (in November 2021), the recommended way of limiting the number of concurrent asynchronous I/O operations is the Parallel.ForEachAsync API, with the MaxDegreeOfParallelism configuration. Here is how it can be used in practice:

// let's say there is a list of 1000+ URLs
string[] urls = { "http://google.com", "http://yahoo.com", /*...*/ };
var client = new HttpClient();
var options = new ParallelOptions() { MaxDegreeOfParallelism = 20 };

// now let's send HTTP requests to each of these URLs in parallel
await Parallel.ForEachAsync(urls, options, async (url, cancellationToken) =>
{
    var html = await client.GetStringAsync(url, cancellationToken);
});

In the above example the Parallel.ForEachAsync task is awaited asynchronously. You can also Wait it synchronously if you need to, which will block the current thread until the completion of all asynchronous operations. The synchronous Wait has the advantage that, in case of errors, all exceptions will be propagated. By contrast, the await operator propagates by design only the first exception. In case this is a problem, you can find solutions [here][6].
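
A minimal sketch of that synchronous variant (reusing the client, urls and options from the example above) might look like this:

try
{
    Parallel.ForEachAsync(urls, options, async (url, cancellationToken) =>
    {
        var html = await client.GetStringAsync(url, cancellationToken);
    }).Wait();
}
catch (AggregateException aex)
{
    // Wait() surfaces every failure, not just the first one.
    foreach (var ex in aex.InnerExceptions)
        Console.WriteLine(ex.Message);
}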

(Note: an idiomatic implementation of a ForEachAsync extension method that also propagates the results, can be found in the [4th revision][7] of this answer)

[6]: https://stackoverflow.com/questions/18314961/i-want-await-to-throw-aggregateexception-not-just-the-first-exception "I want await to throw AggregateException, not just the first Exception"
[7]: https://stackoverflow.com/revisions/64455549/4

Solution 7 - C#

Although 1000 tasks might be queued very quickly, the Parallel Tasks library can only handle concurrent tasks equal to the number of CPU cores in the machine. That means that if you have a four-core machine, only 4 tasks will be executing at a given time (unless you lower the MaxDegreeOfParallelism).

Solution 8 - C#

This is not good practice, as it changes a global variable, and it is not a general solution for async. But it is easy for all instances of HttpClient, if that's all you're after. You can simply try:

System.Net.ServicePointManager.DefaultConnectionLimit = 20;
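
A minimal sketch of where that line would typically go (the Main shape below is just an assumption; the important part is setting the limit once, before any requests are made):

static async Task Main()
{
    // Set once at startup, before any HTTP traffic; the limit applies per host.
    System.Net.ServicePointManager.DefaultConnectionLimit = 20;

    var client = new HttpClient();
    var html = await client.GetStringAsync("http://google.com");
}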

Solution 9 - C#

Here is a handy Extension Method you can create to wrap a list of tasks such that they will be executed with a maximum degree of concurrency:

/// <summary>Allows to do any async operation in bulk while limiting the system to a number of concurrent items being processed.</summary>
private static IEnumerable<Task<T>> WithMaxConcurrency<T>(this IEnumerable<Task<T>> tasks, int maxParallelism)
{
    SemaphoreSlim maxOperations = new SemaphoreSlim(maxParallelism);
    // Each task gets wrapped in a new task that must first acquire the semaphore,
    // and holds it until the wrapped task has completed.
    return tasks.Select(async task =>
    {
        await maxOperations.WaitAsync();
        try { return await task; }
        finally { maxOperations.Release(); }
    });
}

Now instead of:

await Task.WhenAll(someTasks);

You can go

await Task.WhenAll(someTasks.WithMaxConcurrency(20));

Solution 10 - C#

Parallel computations should be used for speeding up CPU-bound operations. Here we are talking about I/O bound operations. Your implementation should be purely async, unless you're overwhelming the busy single core on your multi-core CPU.

EDIT I like the suggestion made by usr to use an "async semaphore" here.

Solution 11 - C#

Essentially you're going to want to create an Action or Task for each URL that you want to hit, put them in a List, and then process that list, limiting the number that can be processed in parallel.

My blog post shows how to do this both with Tasks and with Actions, and provides a sample project you can download and run to see both in action.

With Actions

If using Actions, you can use the built-in .Net Parallel.Invoke function. Here we limit it to running at most 20 threads in parallel.

var listOfActions = new List<Action>();
foreach (var url in urls)
{
    var localUrl = url;
    // Note that we create the Action here, but do not invoke it yet.
    listOfActions.Add(() => CallUrl(localUrl));
}

var options = new ParallelOptions {MaxDegreeOfParallelism = 20};
Parallel.Invoke(options, listOfActions.ToArray());

With Tasks

With Tasks there is no built-in function. However, you can use the one that I provide on my blog.

	/// <summary>
	/// Starts the given tasks and waits for them to complete. This will run, at most, the specified number of tasks in parallel.
	/// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para>
	/// </summary>
	/// <param name="tasksToRun">The tasks to run.</param>
	/// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param>
	/// <param name="cancellationToken">The cancellation token.</param>
	public static async Task StartAndWaitAllThrottledAsync(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, CancellationToken cancellationToken = new CancellationToken())
	{
		await StartAndWaitAllThrottledAsync(tasksToRun, maxTasksToRunInParallel, -1, cancellationToken);
	}

	/// <summary>
	/// Starts the given tasks and waits for them to complete. This will run the specified number of tasks in parallel.
	/// <para>NOTE: If a timeout is reached before the Task completes, another Task may be started, potentially running more than the specified maximum allowed.</para>
	/// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para>
	/// </summary>
	/// <param name="tasksToRun">The tasks to run.</param>
	/// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param>
	/// <param name="timeoutInMilliseconds">The maximum milliseconds we should allow the max tasks to run in parallel before allowing another task to start. Specify -1 to wait indefinitely.</param>
	/// <param name="cancellationToken">The cancellation token.</param>
	public static async Task StartAndWaitAllThrottledAsync(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, int timeoutInMilliseconds, CancellationToken cancellationToken = new CancellationToken())
	{
		// Convert to a list of tasks so that we don't enumerate over it multiple times needlessly.
		var tasks = tasksToRun.ToList();

		using (var throttler = new SemaphoreSlim(maxTasksToRunInParallel))
		{
			var postTaskTasks = new List<Task>();

			// Have each task notify the throttler when it completes so that it decrements the number of tasks currently running.
			tasks.ForEach(t => postTaskTasks.Add(t.ContinueWith(tsk => throttler.Release())));

			// Start running each task.
			foreach (var task in tasks)
			{
				// Increment the number of tasks currently running and wait if too many are running.
				await throttler.WaitAsync(timeoutInMilliseconds, cancellationToken);

				cancellationToken.ThrowIfCancellationRequested();
				task.Start();
			}

			// Wait for all of the provided tasks to complete.
			// We wait on the list of "post" tasks instead of the original tasks, otherwise there is a potential race condition where the throttler's using block is exited before some Tasks have had their "post" action completed, which references the throttler, resulting in an exception due to accessing a disposed object.
			await Task.WhenAll(postTaskTasks.ToArray());
		}
	}

And then creating your list of Tasks and calling the function to have them run, with say a maximum of 20 simultaneous at a time, you could do this:

var listOfTasks = new List<Task>();
foreach (var url in urls)
{
    var localUrl = url;
    // Note that we create the Task here, but do not start it.
    listOfTasks.Add(new Task(async () => await CallUrl(localUrl)));
}
await Tasks.StartAndWaitAllThrottledAsync(listOfTasks, 20);
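
CallUrl isn't shown in the answer; a hypothetical helper along these lines would fit the Task-based sample (for the Parallel.Invoke sample above you'd want a synchronous variant, or to block on the returned task, so that each Action occupies its slot until the download finishes):

// Hypothetical helper assumed by the samples above: download a single URL.
private static async Task CallUrl(string url)
{
    using (var client = new HttpClient())
    {
        var html = await client.GetStringAsync(url);
        // do stuff with html
    }
}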

Solution 12 - C#

Use MaxDegreeOfParallelism, which is an option you can specify in Parallel.ForEach() (https://msdn.microsoft.com/en-us/library/system.threading.tasks.parallel.foreach%28v=vs.110%29.aspx):

var options = new ParallelOptions { MaxDegreeOfParallelism = 20 };

Parallel.ForEach(urls, options,
    url =>
        {
            var client = new HttpClient();
            // Parallel.ForEach has no async support, so block for the result.
            var html = client.GetStringAsync(url).Result;
            // do stuff with html
        });

Attributions

All content for this solution is sourced from the original question on Stackoverflow.

The content on this page is licensed under the Attribution-ShareAlike 4.0 International (CC BY-SA 4.0) license.

Content Type | Original Author | Original Content on Stackoverflow
Question | Grief Coder | View Question on Stackoverflow
Solution 1 - C# | Theo Yaung | View Answer on Stackoverflow
Solution 2 - C# | Dogu Arslan | View Answer on Stackoverflow
Solution 3 - C# | Serge Semenov | View Answer on Stackoverflow
Solution 4 - C# | usr | View Answer on Stackoverflow
Solution 5 - C# | Jay Shah | View Answer on Stackoverflow
Solution 6 - C# | Theodor Zoulias | View Answer on Stackoverflow
Solution 7 - C# | scottm | View Answer on Stackoverflow
Solution 8 - C# | symbiont | View Answer on Stackoverflow
Solution 9 - C# | Alain | View Answer on Stackoverflow
Solution 10 - C# | GregC | View Answer on Stackoverflow
Solution 11 - C# | deadlydog | View Answer on Stackoverflow
Solution 12 - C# | Sean U | View Answer on Stackoverflow