Menü Schließen

A Thread Safe Queue with Multiple Worker Tasks

In this post I will explain how to create a thread safe queue that can hold multiple tasks. You will be able to add multiple tasks to the queue, while a defined number of tasks are executed in parallel.

The tasks are executed in the background while the UI of the application remains fully responsive.

Create a Project and Install Required NuGet Package

Create a new .NET project in Visual Studio, right click on the solution in the Solution Explorer and select „Manage NuGet Packages for Solution“.

Install the latest version of System.Threading.Tasks.Dataflow from Microsoft.

Create a Processing Item

The processing item contains the logic to start/stop/pause/resume the work of a single task. This is created in a versatile way that we can not only create tasks of the same type.

IProcessingItem is a simple interface that provides methods to start and stop a work. It might contain anything, a long-running calculation, loading data from a web service, and so on. It also has events that provide progress information. It is a custom implementation, not part of the Microsoft library. The interface is just shown here as reference.

public interface IProcessingItem : IEquatable<IProcessingItem>
{
  event EventHandler ProgressUpdated;
  int ID { get; }
  string TaskDescription { get; }
  double Progress { get; }
  string StatusMessage { get; }
  void Start();
  void Cancel();
  void Pause();
  void Resume();
  void IsInQueue();
}

Please take a look at attached example for a concrete implementation of a PathProcessingItem. This is just a made up example class that simulates some work by reading files in a directory.

Create a Processing Queue

Now we have to create a ProcessingQueue class that will hold the tasks executes them. This is a static class. It uses the ActionBlock<T> collection that contains the IProcessingItems.

public static class ProcessingQueue
{
  private static readonly ActionBlock<IProcessingItem> _queue = new ActionBlock<IProcessingItem>(Process, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 });

  private static void Process(IProcessingItem item)
  {
    item.Start();
  }

  public static void AddToQueue(IProcessingItem item)
  {
    item.IsInQueue();
    _queue.Post(item);
  }

  public static int ItemsInQueue => _queue.InputCount;
}

The constructor of ActionBlock<IProcessingItem> takes two parameters. A lambda expression, defining what method is triggered when working on an task starts. this is the Process(IProcessingItem) method in the example. The second parameter defines some execution options, for example, how many tasks are executed in parallel.

By calling AddToQueue(IProcessingItem), a task is added to the queue.

When an IProcessingItem is added to the queue and the number of running tasks is lower than the maximum executed task, it is started automatically by the queue by calling the Process(IProcessingItem) method. If the number of running tasks is equal to the maximum, the item will simply be added to the queue and execution starts as soon as another task is completed.

The property ItemsInQueue returns the number of items currently in the queue.

Example Project

Download the example project (source code, 7-zip file):

Ähnliche Beiträge

Schreibe einen Kommentar

Deine E-Mail-Adresse wird nicht veröffentlicht. Erforderliche Felder sind mit * markiert