Параллельное программирование: обработка списка динамических очередей

У меня есть словарь предметов разных типов. Словарь изменяется путем добавления новых записей и элементов к существующим записям.

Какой эффективный способ обрабатывать записи параллельно. Когда словарная запись 1 содержит 1 элемент, а словарная запись 2 содержит 200 элементов. Когда новый элемент добавляется в словарную запись 1, он должен создать новый поток и обработать его, если старая запись уже обработана вместо ожидания завершения всего пакета.

Dictionary<ItemType, Queue<Item>> ItemsTypes = new Dictionary<Guid, Queue<Item>>();

С кодом ниже, мне нужно подождать, пока первая партия товаров будет обработана, прежде чем начинать новую партию.

result = Parallel.ForEach(ItemsTypes, Items => processor.ProcessItems(Items.Value));

Всего 2 ответа


Что вы делаете здесь с Parallel.ForEach - это параллелизм данных. В тот момент, когда начинается этот цикл ForEach, он делает (своего рода) снимок текущего состояния словаря и проходит через него. Я считаю, что не имеет значения, добавляете ли вы больше предметов в любое место после запуска forEach.

Что вам действительно нужно сделать, так это модель Pub / Sub, где код, который добавляет новые элементы / записи, публикует событие, которое вызывает создание нового потока и обработку элемента. Или вы можете добавить объект задачи к каждому элементу.


Вместо Parallel.Foreach () обработал элементы Dictionary в foreach и выделил новый поток для каждого элемента и статусы отслеживания, чтобы предотвратить параллельную обработку элементов очереди. Установка статуса в false внутри ProcessItems.

ConcurrentDictionary<Guid, bool> Status = new ConcurrentDictionary<Guid, bool>();

while (ItemsTypes.Values.Count() > 0)
{
    foreach (var item in ItemsTypes)
    {
      if (item.Value.Count > 0)
      {
        if (Status[issuer.Key] == false)
        {
          Status[issuer.Key] = true;
          Task.Run(() => ProcessItems(item.Value.Dequeue()));
         }
       }
     }
  }

Есть идеи?

10000