How to limit the Maximum number of parallel tasks in c#

C#.NetAsynchronous

C# Problem Overview


I have a collection of 1000 input message to process. I'm looping the input collection and starting the new task for each message to get processed.

//Assume this messages collection contains 1000 items
var messages = new List<string>();

foreach (var msg in messages)
{
   Task.Factory.StartNew(() =>
   {
    Process(msg);
   });
 }

Can we guess how many maximum messages simultaneously get processed at the time (assuming normal Quad core processor), or can we limit the maximum number of messages to be processed at the time?

How to ensure this message get processed in the same sequence/order of the Collection?

C# Solutions


Solution 1 - C#

You could use Parallel.Foreach and rely on MaxDegreeOfParallelism instead.

Parallel.ForEach(messages, new ParallelOptions {MaxDegreeOfParallelism = 10},
msg =>
{
     // logic
     Process(msg);
});

Solution 2 - C#

SemaphoreSlim is a very good solution in this case and I higly recommend OP to try this, but @Manoj's answer has flaw as mentioned in comments.semaphore should be waited before spawning the task like this.

Updated Answer: As @Vasyl pointed out Semaphore may be disposed before completion of tasks and will raise exception when Release() method is called so before exiting the using block must wait for the completion of all created Tasks.

int maxConcurrency=10;
var messages = new List<string>();
using(SemaphoreSlim concurrencySemaphore = new SemaphoreSlim(maxConcurrency))
{
    List<Task> tasks = new List<Task>();
	foreach(var msg in messages)
    {
        concurrencySemaphore.Wait();

        var t = Task.Factory.StartNew(() =>
        {
            try
            {
                 Process(msg);
            }
            finally
            {
                concurrencySemaphore.Release();
            }
        });
		
		tasks.Add(t);
    }
	
	Task.WaitAll(tasks.ToArray());
}

Answer to Comments for those who want to see how semaphore can be disposed without Task.WaitAll Run below code in console app and this exception will be raised.

> System.ObjectDisposedException: 'The semaphore has been disposed.'

static void Main(string[] args)
{
    int maxConcurrency = 5;
    List<string> messages =  Enumerable.Range(1, 15).Select(e => e.ToString()).ToList();

    using (SemaphoreSlim concurrencySemaphore = new SemaphoreSlim(maxConcurrency))
    {
        List<Task> tasks = new List<Task>();
        foreach (var msg in messages)
        {
            concurrencySemaphore.Wait();

            var t = Task.Factory.StartNew(() =>
            {
                try
                {
                    Process(msg);
                }
                finally
                {
                    concurrencySemaphore.Release();
                }
            });

            tasks.Add(t);
        }

       // Task.WaitAll(tasks.ToArray());
    }
    Console.WriteLine("Exited using block");
    Console.ReadKey();
}

private static void Process(string msg)
{            
    Thread.Sleep(2000);
    Console.WriteLine(msg);
}

Solution 3 - C#

I think it would be better to use Parallel LINQ

  Parallel.ForEach(messages ,
     new ParallelOptions{MaxDegreeOfParallelism = 4},
            x => Process(x);
        );

where x is the MaxDegreeOfParallelism

Solution 4 - C#

With .NET 5.0 and Core 3.0 channels were introduced.
The main benefit of this producer/consumer concurrency pattern is that you can also limit the input data processing to reduce resource impact.
This is especially helpful when processing millions of data records.
Instead of reading the whole dataset at once into memory, you can now consecutively query only chunks of the data and wait for the workers to process it before querying more.

Code sample with a queue capacity of 50 messages and 5 consumer threads:

/// <exception cref="System.AggregateException">Thrown on Consumer Task exceptions.</exception>
public static async Task ProcessMessages(List<string> messages)
{
    const int producerCapacity = 10, consumerTaskLimit = 3;
    var channel = Channel.CreateBounded<string>(producerCapacity);

    _ = Task.Run(async () =>
    {
        foreach (var msg in messages)
        {
            await channel.Writer.WriteAsync(msg);
            // blocking when channel is full
            // waiting for the consumer tasks to pop messages from the queue
        }

        channel.Writer.Complete();
        // signaling the end of queue so that 
        // WaitToReadAsync will return false to stop the consumer tasks
    });

    var tokenSource = new CancellationTokenSource();
    CancellationToken ct = tokenSource.Token;

    var consumerTasks = Enumerable
    .Range(1, consumerTaskLimit)
    .Select(_ => Task.Run(async () =>
    {
        try
        {
            while (await channel.Reader.WaitToReadAsync(ct))
            {
                ct.ThrowIfCancellationRequested();
                while (channel.Reader.TryRead(out var message))
                {
                    await Task.Delay(500);
                    Console.WriteLine(message);
                }
            }
        }
        catch (OperationCanceledException) { }
        catch
        {
            tokenSource.Cancel();
            throw;
        }
    }))
    .ToArray();

    Task waitForConsumers = Task.WhenAll(consumerTasks);
    try { await waitForConsumers; }
    catch
    {
        foreach (var e in waitForConsumers.Exception.Flatten().InnerExceptions)
            Console.WriteLine(e.ToString());

        throw waitForConsumers.Exception.Flatten();
    }
}

As pointed out by Theodor Zoulias: On multiple consumer exceptions, the remaining tasks will continue to run and have to take the load of the killed tasks. To avoid this, I implemented a CancellationToken to stop all the remaining tasks and handle the exceptions combined in the AggregateException of waitForConsumers.Exception.

Side note:
The Task Parallel Library (TPL) might be good at automatically limiting the tasks based on your local resources. But when you are processing data remotely via RPC, it's necessary to manually limit your RPC calls to avoid filling the network/processing stack!

Solution 5 - C#

If your Process method is async you can't use Task.Factory.StartNew as it doesn't play well with an async delegate. Also there are some other nuances when using it (see this for example).

The proper way to do it in this case is to use Task.Run. Here's @ClearLogic answer modified for an async Process method.

static void Main(string[] args)
{
    int maxConcurrency = 5;
    List<string> messages =  Enumerable.Range(1, 15).Select(e => e.ToString()).ToList();

    using (SemaphoreSlim concurrencySemaphore = new SemaphoreSlim(maxConcurrency))
    {
        List<Task> tasks = new List<Task>();
        foreach (var msg in messages)
        {
            concurrencySemaphore.Wait();

            var t = Task.Run(async () =>
            {
                try
                {
                    await Process(msg);
                }
                finally
                {
                    concurrencySemaphore.Release();
                }
            });

            tasks.Add(t);
        }

       Task.WaitAll(tasks.ToArray());
    }
    Console.WriteLine("Exited using block");
    Console.ReadKey();
}

private static async Task Process(string msg)
{            
    await Task.Delay(2000);
    Console.WriteLine(msg);
}

Solution 6 - C#

If you need in-order queuing (processing might finish in any order), there is no need for a semaphore. Old fashioned if statements work fine:

        const int maxConcurrency = 5;
        List<Task> tasks = new List<Task>();
        foreach (var arg in args)
        {
            var t = Task.Run(() => { Process(arg); } );

            tasks.Add(t);

            if(tasks.Count >= maxConcurrency)
                Task.WaitAny(tasks.ToArray());
        }

        Task.WaitAll(tasks.ToArray());

Solution 7 - C#

You can create your own TaskScheduler and override QueueTask there.

protected virtual void QueueTask(Task task)

Then you can do anything you like.

One example here:

https://stackoverflow.com/questions/13377365/limited-concurrency-level-task-scheduler-with-task-priority-handling-wrapped-t

Solution 8 - C#

You can simply set the max concurrency degree like this way:

int maxConcurrency=10;
var messages = new List<1000>();
using(SemaphoreSlim concurrencySemaphore = new SemaphoreSlim(maxConcurrency))
{
    foreach(var msg in messages)
    {
        Task.Factory.StartNew(() =>
        {
            concurrencySemaphore.Wait();
            try
            {
                 Process(msg);
            }
            finally
            {
                concurrencySemaphore.Release();
            }
        });
    }
}

Solution 9 - C#

 public static void RunTasks(List<NamedTask> importTaskList)
    {
        List<NamedTask> runningTasks = new List<NamedTask>();

        try
        {
            foreach (NamedTask currentTask in importTaskList)
            {
                currentTask.Start();
                runningTasks.Add(currentTask);

                if (runningTasks.Where(x => x.Status == TaskStatus.Running).Count() >= MaxCountImportThread)
                {
                    Task.WaitAny(runningTasks.ToArray());
                }
            }

            Task.WaitAll(runningTasks.ToArray());
        }
        catch (Exception ex)
        {
            Log.Fatal("ERROR!", ex);
        }
    }

Solution 10 - C#

you can use the BlockingCollection, If the consume collection limit has reached, the produce will stop producing until a consume process will finish. I find this pattern more easy to understand and implement than the SemaphoreSlim.

int TasksLimit = 10;
BlockingCollection<Task> tasks = new BlockingCollection<Task>(new ConcurrentBag<Task>(), TasksLimit);

void ProduceAndConsume()
{
    var producer = Task.Factory.StartNew(RunProducer);
    var consumer = Task.Factory.StartNew(RunConsumer);

    try
    {
        Task.WaitAll(new[] { producer, consumer });
    }
    catch (AggregateException ae) { }
}

void RunConsumer()
{
    foreach (var task in tasks.GetConsumingEnumerable())
    {
        task.Start();
    }
}

void RunProducer()
{
    for (int i = 0; i < 1000; i++)
    {
        tasks.Add(new Task(() => Thread.Sleep(1000), TaskCreationOptions.AttachedToParent));
    }
}

Note that the RunProducer and RunConsumer has spawn two independent tasks.

Solution 11 - C#

I ran into a similar problem where I wanted to produce 5000 results while calling apis, etc. So, I ran some speed tests.

Parallel.ForEach(products.Select(x => x.KeyValue).Distinct().Take(100), id =>
{
    new ParallelOptions { MaxDegreeOfParallelism = 100 };
    GetProductMetaData(productsMetaData, client, id).GetAwaiter().GetResult();
});

produced 100 results in 30 seconds.

Parallel.ForEach(products.Select(x => x.KeyValue).Distinct().Take(100), id =>
{
    new ParallelOptions { MaxDegreeOfParallelism = 100 };
    GetProductMetaData(productsMetaData, client, id);
});

Moving the GetAwaiter().GetResult() to the individual async api calls inside GetProductMetaData resulted in 14.09 seconds to produce 100 results.

foreach (var id in ids.Take(100))
{
    GetProductMetaData(productsMetaData, client, id);
}

Complete non-async programming with the GetAwaiter().GetResult() in api calls resulted in 13.417 seconds.

var tasks = new List<Task>();
while (y < ids.Count())
{
    foreach (var id in ids.Skip(y).Take(100))
    {
        tasks.Add(GetProductMetaData(productsMetaData, client, id));
    }

    y += 100;
    Task.WhenAll(tasks).GetAwaiter().GetResult();
    Console.WriteLine($"Finished {y}, {sw.Elapsed}");
}

Forming a task list and working through 100 at a time resulted in a speed of 7.36 seconds.

            using (SemaphoreSlim cons = new SemaphoreSlim(10))
            {
                var tasks = new List<Task>();
                foreach (var id in ids.Take(100))
                {
                    cons.Wait();
                    var t = Task.Factory.StartNew(() =>
                    {
                        try
                        {
                            GetProductMetaData(productsMetaData, client, id);
                        }
                        finally
                        {
                            cons.Release();
                        }
                    });

                    tasks.Add(t);
                }

                Task.WaitAll(tasks.ToArray());
            }

Using SemaphoreSlim resulted in 13.369 seconds, but also took a moment to boot to start using it.

var throttler = new SemaphoreSlim(initialCount: take);
foreach (var id in ids)
{
    throttler.WaitAsync().GetAwaiter().GetResult();
    tasks.Add(Task.Run(async () =>
    {
        try
        {
            skip += 1;
            await GetProductMetaData(productsMetaData, client, id);

            if (skip % 100 == 0)
            {
                Console.WriteLine($"started {skip}/{count}, {sw.Elapsed}");
            }
        }
        finally
        {
            throttler.Release();
        }
    }));
}

Using Semaphore Slim with a throttler for my async task took 6.12 seconds.

The answer for me in this specific project was use a throttler with Semaphore Slim. Although the while foreach tasklist did sometimes beat the throttler, 4/6 times the throttler won for 1000 records.

I realize I'm not using the OPs code, but I think this is important and adds to this discussion because how is sometimes not the only question that should be asked, and the answer is sometimes "It depends on what you are trying to do."

Now to answer the specific questions:

  1. How to limit the maximum number of parallel tasks in c#: I showed how to limit the number of tasks that are completed at a time.
  2. Can we guess how many maximum messages simultaneously get processed at the time (assuming normal Quad core processor), or can we limit the maximum number of messages to be processed at the time? I cannot guess how many will be processed at a time unless I set an upper limit but I can set an upper limit. Obviously different computers function at different speeds due to CPU, RAM etc. and how many threads and cores the program itself has access to as well as other programs running in tandem on the same computer.
  3. How to ensure this message get processed in the same sequence/order of the Collection? If you want to process everything in a specific order, it is synchronous programming. The point of being able to run things asynchronously is ensuring that they can do everything without an order. As you can see from my code, the time difference is minimal in 100 records unless you use async code. In the event that you need an order to what you are doing, use asynchronous programming up until that point, then await and do things synchronously from there. For example, task1a.start, task2a.start, then later task1a.await, task2a.await... then later task1b.start task1b.await and task2b.start task 2b.await.

Attributions

All content for this solution is sourced from the original question on Stackoverflow.

The content on this page is licensed under the Attribution-ShareAlike 4.0 International (CC BY-SA 4.0) license.

Content TypeOriginal AuthorOriginal Content on Stackoverflow
QuestionMathiyazhaganView Question on Stackoverflow
Solution 1 - C#Hari PrasadView Answer on Stackoverflow
Solution 2 - C#ClearLogicView Answer on Stackoverflow
Solution 3 - C#Жорик ПургидзеView Answer on Stackoverflow
Solution 4 - C#5andr0View Answer on Stackoverflow
Solution 5 - C#empzView Answer on Stackoverflow
Solution 6 - C#Neil HuntView Answer on Stackoverflow
Solution 7 - C#Serve LaurijssenView Answer on Stackoverflow
Solution 8 - C#error_handlerView Answer on Stackoverflow
Solution 9 - C#DanielView Answer on Stackoverflow
Solution 10 - C#Shahar ShokraniView Answer on Stackoverflow
Solution 11 - C#Patrick KnottView Answer on Stackoverflow