Проблемы «производитель / потребитель» существуют везде, во всех аспектах нашей жизни. Последовательность приготовления заказа в заведениях быстрого питания: повар, нарезающий помидоры, которые передаются другому который складывает гамбургер, который предают работнику выдачи вашего заказа, который вы после с удовольствием съедаете. Почтовые водители доставляют почту по всем своим маршрутам, вы либо наблюдаете за прибывшим грузовиком и забираете свои посылки, либо же проверяете ящик придя с работы. Сотрудник авиакомпании выгружает чемоданы из грузового отсека реактивного лайнера, помещая их на конвейерную ленту, где они доставляются другому работнику, который переносит сумки в фургон и доставляет их на еще один конвейер, который привезет их к вам. Счастливая пара готовится разослать приглашения на свою свадьбу, причем один партнер закрывает конверты, передает другому который заклеивает и кладет в ящик.
Как разработчики программного обеспечения, регулярно наблюдаем, как события из нашей повседневной жизни проникают в наше программное обеспечение, и проблемы «производитель/потребитель» не являются исключением. Любой, кто собирал команды в командной строке, использовал производителя/потребителя, причем стандартный вывод одной программы передается как стандартный ввод другой. Любой, кто запустил несколько загрузок данных с нескольких сайтов или вычисления дискретных задач использовал метод производитель/потребитель, с потребителем агрегируя результаты для отображения или дальнейшей обработки. Кто пытался распараллелить конвейер, явно использовал этот метод. И так далее.
У всех этих сценариев, происходящих в реальной жизни или в жизни программного обеспечения, есть что-то общее: есть какой-то механизм для передачи результатов от производителя к потребителю. Сотрудник фаст-фуда кладет гамбургеры на полку, из которой возьмет их другой работник, для выдачи заказа клиенту. Работник почты кладет посылки в почтовый ящик. Руки помолвленной пары встречаются, чтобы передать материалы от одного к другому. В программном обеспечении такая передача требует некоторой структуры данных для облегчения транзакции, хранилища, которое может использовать производитель для передачи результата и, возможно, буферизовать больше, в то же время позволяя потребителю получать уведомление о наличии одного или нескольких результатов. Вступление в System.Threading.Channels.
Что такое Канал?
Мне часто проще понять некоторые технологии, реализовав для себя простую версию. При этом я узнаю о различных проблемах, которые разработчики этой технологии, возможно должны были устранить, и о наилучшем способе использования функционала. Для этого давайте начнем знакомство с System.Threading.Channels, реализовав «канал» с нуля.
Канал - это просто структура данных, используемая для хранения полученных данных для извлечения потребителем, и соответствующая синхронизация, позволяющая это безопасно выполнять, одновременно с включением соответствующих уведомлений в обоих направлениях. Существует множество возможных проектных решений. Должен ли канал содержать неограниченное количество элементов? Что должно произойти, когда он заполняется? Насколько критична производительность? Нужно ли пытаться минимизировать синхронизацию? Можем ли сделать какие-либо предположения о том, сколько производителей и потребителей может быть допущено одновременно? В целях быстрого написания простого канала, давайте сделаем упрощающие предположения, что нам не нужно применять какие-либо конкретные ограничения и что нам не нужно слишком беспокоиться о дополнительных расходах. Разработчики также составили простой API.
Для начала нам нужен наш тип, к которому добавим несколько простых методов:
public sealed class Channel<T>
{
public void Write(T value);
public ValueTask<T> ReadAsync(CancellationToken cancellationToken = default);
}
Метод Write
позволяет нам его использовать для вывода данных в канал, и ReadAsync
метод дает нам потреблять от него. Так как решили, что наш канал неограничен, ввод данных в него всегда завершается успешно и синхронно, так же, как запрос Add в List<T> поэтому сделали его не асинхронным и возвращающим пустоту. В противоположность, наш метод использования это ReadAsync который является асинхронным, потому что данные, которые хотим использовать, могут быть еще не доступны, и, таким образом, нам нужно будет дождаться их обновлений. И пока начинаем разработку, не слишком обеспокоены производительностью, также не хотим иметь много ненужных дополнительных расходов. Так как ожидаем, что будем читать, данные которые уже доступны для использования, метод ReadAsync
возвращает ValueTask<T> а не к Task<T>, так что можем сделать это без выделения при синхронном завершении.
Теперь нам просто нужно реализовать эти два метода. Для начала добавим два поля к нашему типу: один, будет служить механизмом хранения, и вотрой для координации между производителями и потребителями:
private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(0);
Используем ConcurrentQueue<T> для хранения данных, освобождая нас от необходимости делать собственную блокировку для защиты структуры данных буферизации, такую как ConcurrentQueue<T> она является потоко-безопасной для любого количества производителей/потребителей. И используем SempahoreSlim
он помогает координировать между производителями и потребителей и уведомлять потребителей о поступлении дополнительных данных.
Наш метод Write
прост. Ему нужно сохранить данные в очереди и увеличить счетчик SemaphoreSlim
, “публикуя” его:
public void Write(T value)
{
_queue.Enqueue(value); // store the data
_semaphore.Release(); // notify any consumers that more data is available
}
Метод ReadAsync
почти так же прост. Нужно подождать, пока данные будут доступны, а затем вынуть их.
public async ValueTask<T> ReadAsync(CancellationToken cancellationToken = default)
{
await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false); // wait
bool gotOne = _queue.TryDequeue(out T item); // retrieve the data
Debug.Assert(gotOne);
return item;
}
Обратите внимание, поскольку никакой другой код не может управлять семафором или очередью, как только мы успешно дождались на семафор, в очереди будут данные, чтобы передать их нам, вот почему мы можем просто утверждать, что метод TryDequeue
успешно вернулся.
На этом то все: у нас есть основной канал. Если все, что вам нужно, это базовые функции, такая реализация вполне разумна. Разумеется, требования часто более значимы как для производительности, так и для API, необходимых для обеспечения большего количества сценариев.
Теперь, когда понимаем основы того, что предоставляет канал, можем перейти к рассмотрению реальных API-интерфейсов System.Threading.Channel.
Представляем System.Threading.Channels
Основные абстракции, представленные в библиотеке System.Threading.Channels, написаны так:
public abstract class ChannelWriter<T>
{
public abstract bool TryWrite(T item);
public virtual ValueTask WriteAsync(T item, CancellationToken cancellationToken = default);
public abstract ValueTask WaitToWriteAsync(CancellationToken cancellationToken = default);
public void Complete(Exception error);
public virtual bool TryComplete(Exception error);
}
и читается:
public abstract class ChannelReader<T>
{
public abstract bool TryRead(out T item);
public virtual ValueTask<T> ReadAsync(CancellationToken cancellationToken = default)
public abstract ValueTask WaitToReadAsync(CancellationToken cancellationToken = default);
public virtual IAsyncEnumerable<T> ReadAllAsync([EnumeratorCancellation] CancellationToken cancellationToken = default);
public virtual Task Completion { get; }
}
Только что завершил свою собственную разработку и внедрение простого канала, большая часть поверхности API должна быть знакома. ChannelWriter<T> обеспечивает метод TryWrite, который очень похож на наш метод записи; тем не менее, это абстрактно и метод проб, который возвращает Boolean учитывать тот факт, что некоторые реализации могут быть ограничены в количестве элементов они могут на нем храниться и если канал был заполнен так, что запись не могла завершиться синхронно, TryWrite потребуется вернуть false
, чтобы указать, что запись была неудачной. Тем не менее ChannelWriter<T> также обеспечивает метод WriteAsync; в таком случае, канал переполнен и запись должна ждать (часто упоминается как «обратное воздействие»), можно использовать WriteAsync, с производителем в ожидании результата WriteAsync
его можно будет использовать, только когда пространство освободиться.
Конечно, бывают ситуации, когда код может не сразу создать значение; если полученные значения будут дорогим или если значение, представляет собой дорогой ресурс, (к примеру, это большой объект, который занял бы много памяти, или, может быть, он хранит кучу открытых файлов) в таких случаях, производитель работает быстрее, чем потребитель, производитель может захотеть отложить создание значения, пока не убедиться, что запись будет успешной. Для этого, есть соответствующий сценарий WaitToWriteAsync. Создатель может ожидать WaitToWriteAsync
возвращение true, и только затем создатель выбирает значение TryWrite
или WriteAsync
на канал.
Обратите внимание что, WriteAsync
виртуальный. Некоторые ее осуществления могут выбрать более оптимизированные переходы, но с абстрактным TryWrite
и WaitToWriteAsync, базовый тип может обеспечить благоразумную реализацию, что немного сложнее, чем эта:
public async ValueTask WriteAsync(T item, CancellationToken cancellationToken)
{
while (await WaitToWriteAsync(cancellationToken).ConfigureAwait(false))
if (TryWrite(item))
return;
throw new ChannelCompletedException();
}
В дополнение к обзору WaitToWriteAsync
и TryWrite
можно использовать, это позволит подчеркнуть несколько дополнительных интересных вещей. Во-первых, здесь присутствует цикл, потому что каналы по умолчанию могут использоваться любым количеством производителей и любым количеством потребителей одновременно. Если канал имеет верхнюю границу для количества элементов, которые он может хранить, если несколько потоков стремятся записаться в буфер, это возможно для двух потоков, если есть место используя WaitToWriteAsync.
В этом примере также показано, почему WaitToWriteAsync
возвращает ValueTask<bool> вместо ValueTask, а также ситуации за пределами полного буфера, в которых TryWrite
может возвращать false. Каналы поддерживают концепцию завершения, когда производитель может сигнализировать потребителю, что больше не будет произведено никаких товаров, что позволяет потребителю прекратить попытки потреблять. Это делается с помощью методов Complete
или TryComplete
, ранее показанных на ChannelWriter<T> (Complete
реализован для вызова TryComplete
и throw, если он возвращает false). Но если один продюсер помечает канал как завершенный, другим продюсерам нужно знать, что они больше не могут писать в канал; в этом случае TryWrite
возвращает false, WaitToWriteAsync
также возвращает false, а WriteAsync
генерирует исключение ChannelCompletedException.
Большинство членов ChannelReader
<
T
>
, вероятно, тоже говорят сами за себя.TryRead
попытается синхронно извлечь следующий элемент из канала. ReadAsync
также извлекает следующий элемент из канала, но если элемент не может быть получен синхронно, он возвращает задачу. И WaitToReadAsync
возвращает ValueTask<bool>, который служит уведомлением о доступе элемента. Как и в случае WriteAsync для ChannelWriter<T>, ReadAsync
является виртуальным, с базовой реализацией, осуществляемой в терминах абстрактных TryRead
и WaitToReadAsync; это не точная реализация в базовом классе, но очень близка:
public async ValueTask<T> ReadAsync(CancellationToken cancellationToken)
{
while (true)
{
if (!await WaitToReadAsync(cancellationToken).ConfigureAwait(false))
throw new ChannelClosedException();
if (TryRead(out T item))
return item;
}
}
Существует множество типичных шаблонов того, как использовать канал ChannelReader<T>. Если он представляет собой бесконечный поток значений, один из подходов состоит в том, чтобы просто сидеть в бесконечном цикле потребителя через ReadAsync:
while (true)
{
T item = await channelReader.ReadAsync();
Use(item);
}
Конечно, если поток значений не является бесконечным и канал будет в какой-то момент помечен как завершенный, после того как потребители очистят канал от всех своих данных, последующие попытки чтения из него будут отброшены. Напротив, TryRead
вернет false, как и WaitToReadAsync. Итак, более распространенная модель потребления - через вложенный цикл:
while (await channelReader.WaitToReadAsync())
while (channelReader.TryRead(out T item))
Use(item);
Вместо этого внутреннее «while» могло бы быть простым «if», но наличие тесного внутреннего цикла позволяет экономному разработчику избегать небольших дополнительных издержек WaitToReadAsync
, так что TryRead будет успешно использовать этот элемент. Фактически, это точный шаблон, используемый методом ReadAllAsync
. ReadAllAsync
был представлен в .NET Core 3.0 и возвращает IAsyncEnumerable<T>. Это позволяет всем данным быть прочитанными из канала, используя знакомые языковые конструкции:
await foreach (T item in channelReader.ReadAllAsync())
Use(item);
А базовая реализация виртуального метода использует точный шаблон вложенного цикла, показанный ранее с WaitToReadAsync
и TryRead:
public virtual async IAsyncEnumerable<T> ReadAllAsync(
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
while (await WaitToReadAsync(cancellationToken).ConfigureAwait(false))
while (TryRead(out T item))
yield return item;
}
Последняя часть ChannelReader это Completion. Возвращает Task
, который будет выполнено после завершения чтения канала. Это означает, что писатель отметил, что канал завершен, и все данные были использованы.
Реализация встроенных каналов
Итак, мы знаем, как писать писателям и читать письма от читателей ... но откуда мы берем этих писателей и читателей?
Тип Channel
<
TWrite
,
TRead
>
предоставляет свойство Writer и свойство Reader, которое возвращает ChannelWriter<TWrite> и ChannelReader<TRead> соответственно:
public abstract class Channel<TWrite, TRead>
{
public ChannelReader<TRead> Reader { get; }
public ChannelWriter<TWrite> Writer { get; }
}
Этот базовый абстрактный класс доступен для нишевых случаев использования, когда канал сам может преобразовать записанные данные в другой тип для потребления, но в большинстве случаев использование TWrite
и TRead
совпадают, поэтому использование большинства происходит через производную Тип канала, который не более чем:
public abstract class Channel<T> : Channel<T, T> { }
Нетипичный Channel type обеспечивает фабрики для нескольких реализаций Channel<T>:
public static class Channel
{
public static Channel<T> CreateUnbounded<T>();
public static Channel<T> CreateUnbounded<T>(UnboundedChannelOptions options);
public static Channel<T> CreateBounded<T>(int capacity);
public static Channel<T> CreateBounded<T>(BoundedChannelOptions options);
}
Метод CreateUnbounded
создает канал без ограничения на количество элементов, которые могут быть сохранены (конечно, в какой-то момент он может выйти за лимит памяти, так же, как с List<T> любые другие подборки), очень похоже на канальный тип, который реализовали в начале этого поста. TryWrite
всегда возвращает true, и оба его WriteAsync
и WaitToWriteAsync
будут всегда завершаться синхронно.
Метод CreateBounded создает канал с явным ограничением. Как и в случае CreateUnbounded, TryWrite
возвращает значение true
, и WriteAsync
и WaitToWriteAsync
завершатся синхронно. Но как только канал заполняется, TryWrite
возвращает false, и WriteAsync
и WriteAsync
завершаются асинхронно, выполняя свои возвращенные задачи только при наличии свободного места. (Само собой разумеется, что все эти API, которые принимают CancellationToken
, также могут быть прерваны запросом отмены).
В CreateUnbounded
, и CreateBounded
есть перегрузки, которые принимает тип ChannelOption. Эта базовая опция предоставляет параметры, которые могут контролировать поведение любого канала. Например, он предоставляет свойства SingleWriter
и SingleReader
, которые позволяют создателю указывать ограничения, которые они готовы принять; разработчик устанавливает для SingleWriter
значение true
, чтобы указать, что один производитель будет одновременно обращаться к устройству записи, и аналогичным образом устанавливает для SingleReader
значение true
, чтобы указать, что не более одного потребителя будет одновременно обращаться к читателю. Это позволяет методам специализировать созданную реализацию, оптимизируя ее на основе предоставленных опций; например, если параметры, переданные CreateUnbounded
, указывают на SingleReader
как true, он возвращает реализацию, которая также позволяет избежать операций с блокировкой при чтении, что значительно снижает накладные расходы, связанные с потреблением из канала. Базовые ChannelOptions
также предоставляют свойство AllowSynchronousContinuations
. Как и в случае с SingleReader
и SingleWriter, по умолчанию используется значение false, а для создателя значение true
означает подписку на некоторые оптимизации, которые также имеют серьезные последствия для написания кода. В частности, AllowSynchronousContinuations
в некотором смысле позволяет производителю временно стать потребителем. Допустим, в канале нет данных, и потребитель приходит и вызывает ReadAsync. Ожидая задачу, возвращенную из ReadAsync, этот потребитель эффективно подключает обратный вызов, который будет вызываться при записи данных в канал. По умолчанию этот обратный вызов будет вызываться асинхронно, при этом производитель записывает данные в канал, а затем ставит в очередь вызов этого обратного вызова, что позволяет производителю одновременно идти своим путем, пока потребитель обрабатывается каким-то другим потоком. Однако в некоторых ситуациях для производительности может быть выгодно, чтобы этот производитель, записывающий данные, также сам обрабатывал обратный вызов, например, вместо того, чтобы TryWrite
ставил в очередь вызов обратного вызова, он просто вызывает сам обратный вызов. Это может значительно сократить накладные расходы, но также требует глубокого понимания окружающей среды, так как, например, если вы удерживали блокировку при вызове TryWrite, а для параметра AllowSynchronousContinuations
установлено значение true, вы можете в конечном итоге вызвать обратный вызов, удерживая блокировку, который (в зависимости от того, что пытался сделать обратный вызов) можете в конечном итоге наблюдать за некоторыми сломанными инвариантами, которые пытается поддерживать ваша блокировка.
BoundedChannelOptions
передается слоям CreateBounded
для дополнительных опций, связанных с ограничением. В дополнение к максимальной емкости, поддерживаемой каналом, он также предоставляет перечисление BoundedChannelFullMode
, которое указывает, что записи поведения должны возникать при заполнении канала:
public enum BoundedChannelFullMode
{
Wait,
DropNewest,
DropOldest
,
DropWrite
}
TryWrite
на полном канале возвращает false, WriteAsync
вернет задачу, которая будет выполнена только тогда, когда освободится место, и запись может быть успешно завершена, и аналогично WaitToWriteAsync
будет завершена только тогда, когда пространство станет доступным. Вместо этого в трех других режимах запись всегда выполняется синхронно, удаляя элемент, если канал заполнен. DropOldest
удалит «самый старый» элемент из очереди, что означает, что какой-либо элемент будет в дальнейшем удален потребителем. И наоборот, DropNewest
удалит новый элемент, какой элемент был записан в канал в последний раз. И DropWrite
удаляет элемент, который в данный момент пишется, то есть, например, TryWrite
вернет true
, но добавленный элемент будет немедленно удален.
Выполнение
С точки зрения API, представленные абстракции относительно просты, что является большой частью того, откуда исходит сила библиотеки. Простые абстракции и несколько конкретных реализаций, которые должны удовлетворять потребности разработчиков на 99,9%. Конечно, площадь поверхности библиотеки может указывать на простоту реализации. По правде говоря, в реализации есть приличная сложность, в основном сфокусированная на обеспечении высокой пропускной способности при одновременном использовании простых шаблонов потребления, легко используемых в потреблении кода. Реализация, например, делает все возможное, чтобы минимизировать распределение. Возможно, вы заметили, что многие методы возвращают ValueTask
и ValueTask<T>, а не Task
и Task<T>. Как увидели в нашем тривиальном примере реализации в начале этой статьи, мы можем использовать ValueTask<T>, чтобы избежать выделения при синхронном завершении методов, но реализация System.Threading.Channels также использует расширенные интерфейсы IValueTaskSource и IValueTaskSource<T>, чтобы избежать выделения ресурсов даже когда различные методы завершаются асинхронно и нужно возвращать задачи.
Рассмотрим этот тест:
using
BenchmarkDotNet
.
Attributes
;
using BenchmarkDotNet.Running;
using System.Threading.Channels;
using System.Threading.Tasks;
[MemoryDiagnoser]
public class Program
{
static void Main() => BenchmarkRunner.Run();
private readonly Channel s_channel = Channel.CreateUnbounded();
[Benchmark]
public async Task WriteThenRead()
{
ChannelWriter writer = s_channel.Writer;
ChannelReader reader = s_channel.Reader;
for (int i = 0; i < 10_000_000; i++)
{
writer.TryWrite(i);
await
reader
.
ReadAsync
();
}
}
}
Здесь мы просто тестируем пропускную способность и распределение памяти на неограниченном канале при записи элемента, а затем читаем этот элемент 10 миллионов раз, это означает, что элемент всегда будет доступен для чтения, и, таким образом, чтение всегда будет завершаться синхронно, что дает следующие результаты на моем компьютере (72 байта, показанные в столбце Allocated, предназначены для одной Задачи, возвращенной из WriteThenRead):
Method Mean Error StdDev Gen 0 Gen 1 Gen 2 Allocated
WriteThenRead 527.8 ms 2.03 ms 1.90 ms – – – 72 B
Но теперь давайте немного его изменим, сначала выполняя чтение, и только потом записывая элемент, который его удовлетворит. В этом случае чтение всегда завершается асинхронно, потому что данные для их завершения никогда не будут доступны:
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Running;
using System.Threading.Channels;
using System.Threading.Tasks;
[MemoryDiagnoser]
public class Program
{
static void Main() => BenchmarkRunner.Run();
private readonly Channel s_channel = Channel.CreateUnbounded();
[Benchmark]
public async Task ReadThenWrite()
{
ChannelWriter writer = s_channel.Writer;
ChannelReader reader = s_channel.Reader;
for (int i = 0; i < 10_000_000; i++)
{
ValueTask vt = reader.ReadAsync();
writer.TryWrite(i);
await vt;
}
}
}
который на моей машине за 10 миллионов операций записи и чтения дает такие результаты:
Method Mean Error StdDev Gen 0 Gen 1 Gen 2 Allocated
ReadThenWrite 881.2 ms 4.60 ms 4.30 ms – – – 72 B
Итак, есть некоторые дополнительные издержки, когда каждое чтение завершается асинхронно, но даже здесь мы видим нулевое распределение для 10 миллионов асинхронно завершающих чтений (опять же, 72 байта, показанные в столбце Allocated, предназначены для Задачи, возвращаемой из ReadThenWrite)!
Комбинаторы
Обычно потребление каналов является простым, используя один из подходов, показанных ранее. Но, как и в случае с IEnumerables, можно также выполнять различные виды операций по каналам для достижения определенной цели. Например, допустим, я хочу дождаться поступления первого элемента от одного из двух предоставленных читателей; Я мог бы написать что-то вроде этого:
public static async ValueTask> WhenAny(
ChannelReader reader1, ChannelReader reader2)
{
var cts = new CancellationTokenSource();
Task t1 = reader1.WaitToReadAsync(cts.Token).AsTask();
Task t2 = reader2.WaitToReadAsync(cts.Token).AsTask();
Task completed = await Task.WhenAny(t1, t2);
cts.Cancel();
return completed == t1 ? reader1 : reader2;
}
Здесь мы просто запрашиваем WaitToReadAsync
на обоих каналах и возвращаем считыватель в зависимости от того, что завершается первым. Одна из интересных вещей, которую стоит отметить в этом примере, это то, что в то время как ChannelReader<T> имеет много общего с IEnumerator<T>, этот пример не может быть хорошо реализован поверх IEnumerator<T> (или IAsyncEnumerator<T>).
I{Async}Enumerator<T> предоставляет метод MoveNext{Async}, который перемещает курсор вперед к следующему элементу, который затем открывается из Current. Если бы мы попытались реализовать такой WhenAny
поверх IAsyncEnumerator<T>, нам нужно было бы вызвать MoveNextAsync
для каждого. При этом мы потенциально могли бы перейти к следующему пункту. Если бы мы затем использовали этот метод в цикле, мы, скорее всего, в конечном итоге пропустили бы элементы из одного или обоих перечислителей, потому что мы потенциально могли бы расширить перечислитель, который мы не вернули из метода.
Отношение к остальной части .NET Core
System.Threading.Channels является частью общей платформы .NET Core, что означает, что приложение .NET Core может начать использовать его, не устанавливая ничего дополнительного. Он также доступен в виде отдельного пакета NuGet, хотя отдельная реализация не имеет всех оптимизаций, в значительной степени потому, что встроенная реализация может использовать преимущества дополнительной среды выполнения и поддержки библиотеки. NET Core.
Он также используется множеством других систем в .NET. Например, ASP.NET использует каналы как часть SignalR, а также в своей транспортировке Kestrel на основе Libuv. Каналы также используются будущей реализацией QUIC, которая в настоящее время разрабатывается для .NET 5.
Библиотека System.Threading.Channels также выглядит немного похожей на библиотеку System.Threading.Tasks.Dataflow, которая уже много лет доступна в .NET. В некотором смысле библиотека потоков данных является расширенным набором библиотеки каналов; в частности, тип BufferBlock<T> из библиотеки потока данных предоставляет большую часть той же функциональности. Однако библиотека потоков данных также ориентирована на другую модель программирования, в которой блоки связаны между собой так, что данные автоматически передаются от одного к другому. Он также включает расширенные функциональные возможности, которые поддерживают, например, форму двухфазной фиксации, с несколькими блоками, связанными с одними и теми же потребителями, и теми пользователями, которые позволяют извлекать механизмы из нескольких блоков без блокировки, которые необходимы для его включения, гораздо более сложны, и, в то же время, более мощные и более дорогие. Это очевидно, просто написав такой же тест для BufferBlock
<
T
>
, как мы делали ранее для каналов.
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Running;
using System.Threading.Channels;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
[MemoryDiagnoser]
public class Program
{
static void Main() => BenchmarkRunner.Run();
private readonly Channel _channel = Channel.CreateUnbounded();
private readonly BufferBlock _bufferBlock = new BufferBlock();
[Benchmark]
public async Task Channel_ReadThenWrite()
{
ChannelWriter writer = _channel.Writer;
ChannelReader reader = _channel.Reader;
for (int i = 0; i < 10_000_000; i++)
{
ValueTask vt = reader.ReadAsync();
writer.TryWrite(i);
await vt;
}
}
[Benchmark]
public async Task BufferBlock_ReadThenWrite()
{
for (int i = 0; i < 10_000_000; i++)
{
Task t = _bufferBlock.ReceiveAsync();
_bufferBlock.Post(i);
await t;
}
}
}
Method Mean Error StdDev Gen 0 Gen 1 Gen 2 Allocated
Channel_ReadThenWrite 878.9 ms 0.68 ms 0.60 ms 72 B – – 72 B
BufferBlock_ReadThenWrite 20,116.4 ms 192.82 ms 180.37 ms 1184000.0000 2000.0000 – 7360000232 B
Это не означает, что библиотека System.Threading.Tasks.Dataflow не должна использоваться. Он позволяет разработчикам кратко выражать большое количество концепций и может демонстрировать очень хорошую производительность применительно к задачам, которые ему наиболее подходят. Однако, когда все, что вам нужно, - это структура передачи данных между одним или несколькими производителями и одним или несколькими потребителями, которую вы внедрили вручную, System.Threading.Channels - гораздо более компактная и экономная ставка.
Что же дальше?
Надеюсь, на данный момент вы более детально разобрались в библиотеке System.Threading.Channels и сможете с ее помощью улучшить ваши приложения. Попробуйте, и будем рады вашим отзывам, предложениям, проблемам и PR, чтобы улучшить ее
https://github.com/dotnet/runtime.
Источник