Атомно все берет из ConcurrentQueue

У меня есть несколько потоков, генерирующих элементы и помещающих их в общий ConcurrentQueue :

private ConcurrentQueue<GeneratedItem> queuedItems = new ConcurrentQueue<GeneratedItem>();

private void BunchOfThreads () {
    // ...
    queuedItems.Enqueue(new GeneratedItem(...));
    // ...
}

У меня есть еще один отдельный потребительский поток, но способ, которым он должен работать в контексте этого приложения, заключается в том, что иногда ему просто нужно захватить все, что в данный момент находится в очереди потоков, удалив его из этой очереди, все в одном кадре. Что-то вроде:

private Queue<GeneratedItem> GetAllNewItems () {

    return queuedItems.TakeEverything(); // <-- not a real method

}

Я думаю, что я просмотрел всю документацию (для коллекции и ее реализованных интерфейсов), но я не нашел ничего похожего на «одновременное извлечение всех объектов из очереди» или даже «одновременное обмен содержимым с другой очередью».

Я мог бы сделать это без проблем, если бы я отказался от ConcurrentQueue и просто защитил обычную Queue lock , например так:

private Queue<GeneratedItem> queuedItems = new Queue<GeneratedItem>();

private void BunchOfThreads () {
    // ...
    lock (queuedItems) {
        queuedItems.Enqueue(new GeneratedItem(...));
    }
    // ...
}

private Queue<GeneratedItem> GetAllNewItems () {

    lock (queuedItems) {
        Queue<GeneratedItem> newItems = new Queue<Event>(queuedItems);
        queuedItems.Clear();
        return newItems;
    }

}

Но мне нравится удобство ConcurrentQueue а также, поскольку я только изучаю C #, мне интересно узнать об API; поэтому мой вопрос, есть ли способ сделать это с одной из параллельных коллекций?

Возможно, есть какой-нибудь способ получить доступ к какому-либо объекту синхронизации, который использует ConcurrentQueue и заблокировать его для себя в своих целях, чтобы все хорошо сочеталось? Тогда я могу заблокировать его, взять все и отпустить?

Всего 2 ответа


Это зависит от того, что вы хотите сделать. Согласно комментариям в исходном коде

//number of snapshot takers, GetEnumerator(), ToList() and ToArray() operations take snapshot.

Это работает внутренним вызовом ToList (), который, в свою очередь, работает с m_numSnapshotTakers и механизмом вращения

/// Copies the <see cref="ConcurrentQueue{T}"/> elements to a new <see
/// cref="T:System.Collections.Generic.List{T}"/>.
/// </summary>
/// <returns>A new <see cref="T:System.Collections.Generic.List{T}"/> containing a snapshot of
/// elements copied from the <see cref="ConcurrentQueue{T}"/>.</returns>
private List<T> ToList()
{
   // Increments the number of active snapshot takers. This increment must happen before the snapshot is 
   // taken. At the same time, Decrement must happen after list copying is over. Only in this way, can it
   // eliminate race condition when Segment.TryRemove() checks whether m_numSnapshotTakers == 0. 
   Interlocked.Increment(ref m_numSnapshotTakers);

   List<T> list = new List<T>();
   try
   {
       //store head and tail positions in buffer, 
       Segment head, tail;
       int headLow, tailHigh;
       GetHeadTailPositions(out head, out tail, out headLow, out tailHigh);

       if (head == tail)
       {
           head.AddToList(list, headLow, tailHigh);
       }
       else
       {
           head.AddToList(list, headLow, SEGMENT_SIZE - 1);
           Segment curr = head.Next;
           while (curr != tail)
           {
               curr.AddToList(list, 0, SEGMENT_SIZE - 1);
               curr = curr.Next;
           }
           //Add tail segment
           tail.AddToList(list, 0, tailHigh);
       }
   }
   finally
   {
       // This Decrement must happen after copying is over. 
       Interlocked.Decrement(ref m_numSnapshotTakers);
   }
   return list;
}

Если снимок - это все, что вам нужно, то вам повезло. Однако, по-видимому, нет встроенного способа получить и удалить все элементы из ConcurrentQueue потокобезопасным способом. Вам нужно будет испечь свою собственную синхронизацию, используя lock или подобное. Или сверните свое собственное (что может быть не так уж сложно, глядя на источник).


Такого метода не существует, потому что то, что на самом деле должен делать TakeEverything неоднозначно:

  1. Возьмите предмет за предметом, пока очередь не опустеет, а затем верните взятые предметы.
  2. Заблокируйте полный доступ к очереди, сделайте снимок (сделайте все элементы в цикле) = очистите очередь, разблокируйте, верните снимок.

Рассмотрим первый сценарий и представим, что другие потоки пишут в очередь в то время, когда вы TakeEverything элементы один за другим из очереди - должен TakeEverything метод TakeEverything включать их в результат?

Если да, то вы можете написать это так:

public List<GeneratedItem> TakeEverything()
{
    var list = new List<GeneratedItem>();

    while (queuedItems.TryDequeue(out var item))
    {
        list.Add(item);
    }

    return list;
}

Если нет, то я все равно использовал бы ConcurrentQueue ( поскольку все члены экземпляра - методы и свойства - из обычной Queue не являются потокобезопасными ) и реализовал настраиваемую блокировку для каждого доступа на чтение / запись, так что вы убедитесь, что не добавляете элементы во время «взятия» все "из очереди.


Есть идеи?

10000