What are the differences between ConcurrentQueue and BlockingCollection in .Net?
.NetGenericsC# 4.0.Net Problem Overview
What are the differences between ConcurrentQueue
and BlockingCollection
in .Net?
Why BlockingCollection
is best for producer-consumer operation when it can be done through ConcurrentQueue
? Do I have to improve anything in the following code?
MessageSlotMachineGameStartOrAndStatusUpdate msg;
while (!aCancellationToken.IsCancellationRequested)
{
try
{
this.isStillConsumingMsg = true;
Boolean takeResult = this.msgQueue.TryTake(out msg, this.msgConsumeTimeOut, aCancellationToken);
if (takeResult)
{
if (msg != null)
{
this.ProcessMessage(msg);
}
}
else
{
break;
}
}
catch (OperationCanceledException err)
{
EngineManager.AddExceptionLog(err, "Signal Operation Canceled");
}
catch (Exception err)
{
EngineManager.AddExceptionLog(err, "Signal exception");
}
finally
{
this.isStillConsumingMsg = false;
}
}
.Net Solutions
Solution 1 - .Net
BlockingCollection
has a Take
method that would block the consumer if there is nothing to take, and wait for a producer side to provide an item. ConcurrentQueue
lacks such method - if it is empty, the consumer would need to handle the wait, and the producer would need to provide a non-empty notification.
Solution 2 - .Net
The BlockingCollection
From Microsoft BlockingCollection:
> When you create a BlockingCollection
The BlockingCollection has a Take() blocking method (hence the name), but it also has a very interesting GetConsumingEnumerable() method which allows you to loop indefinitely : the code will enter the loop inside code only when something is added to collection. See albahari.com excellent online ebook about Threading.
Here is the code sample from this website:
public class PCQueue : IDisposable
{
BlockingCollection<Action> _taskQ = new BlockingCollection<Action>();
public PCQueue (int workerCount)
{
// Create and start a separate Task for each consumer:
for (int i = 0; i < workerCount; i++)
Task.Factory.StartNew (Consume);
}
public void Dispose() { _taskQ.CompleteAdding(); }
public void EnqueueTask (Action action) { _taskQ.Add (action); }
void Consume()
{
// This sequence that we’re enumerating will block when no elements
// are available and will end when CompleteAdding is called.
foreach (Action action in _taskQ.GetConsumingEnumerable())
action(); // Perform task.
}
}