Posted on 12. December 2019

Знакомство с System.Threading.Channels

Проблемы «производитель / потребитель» существуют везде, во всех аспектах нашей жизни. Последовательность приготовления заказа в заведениях быстрого питания: повар, нарезающий помидоры, которые передаются другому который складывает гамбургер, который предают работнику выдачи вашего заказа, который вы после с удовольствием съедаете. Почтовые водители доставляют почту по всем своим маршрутам, вы либо наблюдаете за прибывшим грузовиком и забираете свои посылки, либо же проверяете ящик придя с работы. Сотрудник авиакомпании выгружает чемоданы из грузового отсека реактивного лайнера, помещая их на конвейерную ленту, где они доставляются другому работнику, который переносит сумки в фургон и доставляет их на еще один конвейер, который привезет их к вам. Счастливая пара готовится разослать приглашения на свою свадьбу, причем один партнер закрывает конверты, передает другому который заклеивает и кладет в ящик.

Как разработчики программного обеспечения, регулярно наблюдаем, как события из нашей повседневной жизни проникают в наше программное обеспечение, и проблемы «производитель/потребитель» не являются исключением. Любой, кто собирал команды в командной строке, использовал производителя/потребителя, причем стандартный вывод одной программы передается как стандартный ввод другой. Любой, кто запустил несколько загрузок данных с нескольких сайтов или вычисления дискретных задач использовал метод производитель/потребитель, с потребителем агрегируя результаты для отображения или дальнейшей обработки.  Кто пытался распараллелить конвейер, явно использовал этот метод. И так далее.

У всех этих сценариев, происходящих в реальной жизни или в жизни программного обеспечения, есть что-то общее: есть какой-то механизм для передачи результатов от производителя к потребителю. Сотрудник фаст-фуда кладет гамбургеры на полку, из которой возьмет их другой работник, для выдачи заказа клиенту. Работник почты кладет посылки в почтовый ящик. Руки помолвленной пары встречаются, чтобы передать материалы от одного к другому. В программном обеспечении такая передача требует некоторой структуры данных для облегчения транзакции, хранилища, которое может использовать производитель для передачи результата и, возможно, буферизовать больше, в то же время позволяя потребителю получать уведомление о наличии одного или нескольких результатов. Вступление в System.Threading.Channels.

Что такое Канал?

Мне часто проще понять некоторые технологии, реализовав для себя простую версию. При этом я узнаю о различных проблемах, которые разработчики этой технологии, возможно должны были устранить, и о наилучшем способе использования функционала.  Для этого давайте начнем знакомство с System.Threading.Channels, реализовав «канал» с нуля.

Канал - это просто структура данных, используемая для хранения полученных данных для извлечения потребителем, и соответствующая синхронизация, позволяющая это безопасно выполнять, одновременно с включением соответствующих уведомлений в обоих направлениях. Существует множество возможных проектных решений. Должен ли канал содержать неограниченное количество элементов? Что должно произойти, когда он заполняется? Насколько критична производительность? Нужно ли пытаться минимизировать синхронизацию?  Можем ли сделать какие-либо предположения о том, сколько производителей и потребителей может быть допущено одновременно? В целях быстрого написания простого канала, давайте сделаем упрощающие предположения, что нам не нужно применять какие-либо конкретные ограничения и что нам не нужно слишком беспокоиться о дополнительных расходах. Разработчики также составили простой API.

Для начала нам нужен наш тип, к которому добавим несколько простых методов:

public sealed class Channel<T>

{

    public void Write(T value);

    public ValueTask<T> ReadAsync(CancellationToken cancellationToken = default);

}

 

Метод Write позволяет нам его использовать для вывода данных в канал, и ReadAsync метод дает нам потреблять от него. Так как решили, что наш канал неограничен, ввод данных в него всегда завершается успешно и синхронно, так же, как запрос Add в List<T> поэтому сделали его не асинхронным и возвращающим пустоту. В противоположность, наш метод использования это ReadAsync который является асинхронным, потому что данные, которые хотим использовать, могут быть еще не доступны, и, таким образом, нам нужно будет дождаться их обновлений.  И пока начинаем разработку, не слишком обеспокоены производительностью, также не хотим иметь много ненужных дополнительных расходов. Так как ожидаем, что будем читать, данные которые уже доступны для использования, метод ­­­­­ ReadAsync возвращает ValueTask<T> а не к Task<T>, так что можем сделать это без выделения при синхронном завершении.

Теперь нам просто нужно реализовать эти два метода. Для начала добавим два поля к нашему типу: один, будет служить механизмом хранения, и вотрой для координации между производителями и потребителями:

private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(0);

Используем ConcurrentQueue<T> для хранения данных, освобождая нас от необходимости делать собственную блокировку для защиты структуры данных буферизации, такую как ConcurrentQueue<T> она является потоко-безопасной для любого количества производителей/потребителей. И используем SempahoreSlim он помогает координировать между производителями и потребителей и уведомлять потребителей о поступлении дополнительных данных.

Наш метод Write прост. Ему нужно сохранить данные в очереди и увеличить счетчик SemaphoreSlim, “публикуя” его:

public void Write(T value)
{
    _queue.Enqueue(value); // store the data
    _semaphore.Release(); // notify any consumers that more data is available
}

 

Метод ReadAsync почти так же прост. Нужно подождать, пока данные будут доступны, а затем вынуть их.

public async ValueTask<T> ReadAsync(CancellationToken cancellationToken = default)
{
    await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false); // wait
    bool gotOne = _queue.TryDequeue(out T item); // retrieve the data
    Debug.Assert(gotOne);
    return item;
}

 

Обратите внимание, поскольку никакой другой код не может управлять семафором или очередью, как только мы успешно дождались на семафор, в очереди будут данные, чтобы передать их нам, вот почему мы можем просто утверждать, что метод TryDequeue успешно вернулся. 

На этом то все: у нас есть основной канал. Если все, что вам нужно, это базовые функции, такая реализация вполне разумна. Разумеется, требования часто более значимы как для производительности, так и для API, необходимых для обеспечения большего количества сценариев.

Теперь, когда понимаем основы того, что предоставляет канал, можем перейти к рассмотрению реальных API-интерфейсов System.Threading.Channel.

 

Представляем System.Threading.Channels

Основные абстракции, представленные в библиотеке System.Threading.Channels, написаны так:

public abstract class ChannelWriter<T>
{
    public abstract bool TryWrite(T item);
    public virtual ValueTask WriteAsync(T item, CancellationToken cancellationToken = default);
    public abstract ValueTask WaitToWriteAsync(CancellationToken cancellationToken = default);
    public void Complete(Exception error);
    public virtual bool TryComplete(Exception error);
}

 

и читается:

public abstract class ChannelReader<T>
{
    public abstract bool TryRead(out T item);
    public virtual ValueTask<T> ReadAsync(CancellationToken cancellationToken = default)
    public abstract ValueTask WaitToReadAsync(CancellationToken cancellationToken = default);
    public virtual IAsyncEnumerable<T> ReadAllAsync([EnumeratorCancellation] CancellationToken cancellationToken = default);
    public virtual Task Completion { get; }
}

 

Только что завершил свою собственную разработку и внедрение простого канала, большая часть поверхности API должна быть знакома. ChannelWriter<T> обеспечивает метод TryWrite, который очень похож на наш метод записи; тем не менее, это абстрактно и метод проб, который возвращает Boolean учитывать тот факт, что некоторые реализации могут быть ограничены в количестве элементов они могут на нем храниться и если канал был заполнен так, что запись не могла завершиться синхронно, TryWrite потребуется вернуть false, чтобы указать, что запись была неудачной.  Тем не менее ChannelWriter<T> также обеспечивает метод WriteAsync; в таком случае, канал переполнен и запись должна ждать (часто упоминается как «обратное воздействие»), можно использовать WriteAsync, с производителем в ожидании результата WriteAsync его можно будет использовать, только когда пространство освободиться.

Конечно, бывают ситуации, когда код может не сразу создать значение; если полученные значения будут дорогим или если значение, представляет собой дорогой ресурс, (к примеру, это большой объект, который занял бы много памяти, или, может быть, он хранит кучу открытых файлов) в таких случаях, производитель работает быстрее, чем потребитель, производитель может захотеть отложить создание значения, пока не убедиться, что запись будет успешной. Для этого, есть соответствующий сценарий WaitToWriteAsync. Создатель может ожидать WaitToWriteAsync возвращение true, и только затем создатель выбирает значение TryWrite или WriteAsync на канал.

Обратите внимание что, WriteAsync виртуальный. Некоторые ее осуществления могут выбрать более оптимизированные переходы, но с абстрактным TryWrite и WaitToWriteAsync, базовый тип может обеспечить благоразумную реализацию, что немного сложнее, чем эта:

public async ValueTask WriteAsync(T item, CancellationToken cancellationToken)
{
    while (await WaitToWriteAsync(cancellationToken).ConfigureAwait(false))
        if (TryWrite(item))
            return;
 
    throw new ChannelCompletedException();
}

 

В дополнение к обзору WaitToWriteAsync и TryWrite  можно использовать, это позволит подчеркнуть несколько дополнительных интересных вещей. Во-первых, здесь присутствует цикл, потому что каналы по умолчанию могут использоваться любым количеством производителей и любым количеством потребителей одновременно. Если канал имеет верхнюю границу для количества элементов, которые он может хранить,  если несколько потоков стремятся записаться в буфер, это возможно для двух потоков, если есть место используя WaitToWriteAsync. 

В этом примере также показано, почему WaitToWriteAsync возвращает ValueTask<bool> вместо ValueTask, а также ситуации за пределами полного буфера, в которых TryWrite может возвращать false. Каналы поддерживают концепцию завершения, когда производитель может сигнализировать потребителю, что больше не будет произведено никаких товаров, что позволяет потребителю прекратить попытки потреблять. Это делается с помощью методов Complete или TryComplete , ранее показанных на ChannelWriter<T> (Complete  реализован для вызова TryComplete  и throw, если он возвращает false). Но если один продюсер помечает канал как завершенный, другим продюсерам нужно знать, что они больше не могут писать в канал; в этом случае TryWrite возвращает false, WaitToWriteAsync также возвращает false, а WriteAsync генерирует исключение ChannelCompletedException.

Большинство членов ChannelReader<T>, вероятно, тоже говорят сами за себя.TryRead  попытается синхронно извлечь следующий элемент из канала. ReadAsync также извлекает следующий элемент из канала, но если элемент не может быть получен синхронно, он возвращает задачу. И WaitToReadAsync возвращает ValueTask<bool>, который служит уведомлением о доступе элемента. Как и в случае WriteAsync для ChannelWriter<T>, ReadAsync является виртуальным, с базовой реализацией, осуществляемой в терминах абстрактных TryRead  и WaitToReadAsync; это не точная реализация в базовом классе, но очень близка:

public async ValueTask<T> ReadAsync(CancellationToken cancellationToken)
{
    while (true)
    {
        if (!await WaitToReadAsync(cancellationToken).ConfigureAwait(false))
            throw new ChannelClosedException();
 
        if (TryRead(out T item))
            return item;
    }
}

 

Существует множество типичных шаблонов того, как использовать канал ChannelReader<T>. Если он представляет собой бесконечный поток значений, один из подходов состоит в том, чтобы просто сидеть в бесконечном цикле потребителя через ReadAsync:

while (true)
{
    T item = await channelReader.ReadAsync();
    Use(item);
}

 

Конечно, если поток значений не является бесконечным и канал будет в какой-то момент помечен как завершенный, после того как потребители очистят канал от всех своих данных, последующие попытки чтения из него будут отброшены. Напротив, TryRead  вернет false, как и WaitToReadAsync. Итак, более распространенная модель потребления - через вложенный цикл:

while (await channelReader.WaitToReadAsync())
    while (channelReader.TryRead(out T item))
        Use(item);

 

Вместо этого внутреннее «while» могло бы быть простым «if», но наличие тесного внутреннего цикла позволяет экономному разработчику избегать небольших дополнительных издержек WaitToReadAsync, так что TryRead будет успешно использовать этот элемент. Фактически, это точный шаблон, используемый методом ReadAllAsync. ReadAllAsync был представлен в .NET Core 3.0 и возвращает IAsyncEnumerable<T>. Это позволяет всем данным быть прочитанными из канала, используя знакомые языковые конструкции:

await foreach (T item in channelReader.ReadAllAsync())
    Use(item);

 

А базовая реализация виртуального метода использует точный шаблон вложенного цикла, показанный ранее с WaitToReadAsync и TryRead:

public virtual async IAsyncEnumerable<T> ReadAllAsync(
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    while (await WaitToReadAsync(cancellationToken).ConfigureAwait(false))
        while (TryRead(out T item))
            yield return item;
}

 

Последняя часть ChannelReader это Completion. Возвращает Task, который будет выполнено после завершения чтения канала. Это означает, что писатель отметил, что канал завершен, и все данные были использованы.

Реализация встроенных каналов

Итак, мы знаем, как писать писателям и читать письма от читателей ... но откуда мы берем этих писателей и читателей? 

Тип Channel<TWrite, TRead> предоставляет свойство Writer и свойство Reader, которое возвращает ChannelWriter<TWrite> и ChannelReader<TRead>  соответственно:

public abstract class Channel<TWrite, TRead>
{
    public ChannelReader<TRead> Reader { get;  }
    public ChannelWriter<TWrite> Writer { get; }
}

 

Этот базовый абстрактный класс доступен для нишевых случаев использования, когда канал сам может преобразовать записанные данные в другой тип для потребления, но в большинстве случаев использование TWrite и TRead совпадают, поэтому использование большинства происходит через производную Тип канала, который не более чем:

public abstract class Channel<T> : Channel<T, T> { }

 

Нетипичный Channel type обеспечивает фабрики для нескольких реализаций Channel<T>:

public static class Channel
{
    public static Channel<T> CreateUnbounded<T>();
    public static Channel<T> CreateUnbounded<T>(UnboundedChannelOptions options);
 
    public static Channel<T> CreateBounded<T>(int capacity);
    public static Channel<T> CreateBounded<T>(BoundedChannelOptions options);
}

 

Метод CreateUnbounded создает канал без ограничения на количество элементов, которые могут быть сохранены (конечно, в какой-то момент он может выйти за лимит памяти, так же, как с List<T> любые другие подборки), очень похоже на канальный тип, который реализовали в начале этого поста. TryWrite всегда возвращает true, и оба его WriteAsync и WaitToWriteAsync будут всегда завершаться синхронно.

Метод CreateBounded создает канал с явным ограничением. Как и в случае CreateUnbounded, TryWrite возвращает значение true, и WriteAsync и WaitToWriteAsync завершатся синхронно. Но как только канал заполняется, TryWrite возвращает false, и WriteAsync и WriteAsync завершаются асинхронно, выполняя свои возвращенные задачи только при наличии свободного места. (Само собой разумеется, что все эти API, которые принимают CancellationToken, также могут быть прерваны запросом отмены).

В CreateUnbounded, и CreateBounded есть перегрузки, которые принимает тип ChannelOption. Эта базовая опция предоставляет параметры, которые могут контролировать поведение любого канала. Например, он предоставляет свойства SingleWriter и SingleReader, которые позволяют создателю указывать ограничения, которые они готовы принять; разработчик устанавливает для SingleWriter значение true, чтобы указать, что один производитель будет одновременно обращаться к устройству записи, и аналогичным образом устанавливает для SingleReader значение true, чтобы указать, что не более одного потребителя будет одновременно обращаться к читателю. Это позволяет методам специализировать созданную реализацию, оптимизируя ее на основе предоставленных опций; например, если параметры, переданные CreateUnbounded, указывают на SingleReader как true, он возвращает реализацию, которая также позволяет избежать операций с блокировкой при чтении, что значительно снижает накладные расходы, связанные с потреблением из канала. Базовые ChannelOptions также предоставляют свойство AllowSynchronousContinuations. Как и в случае с SingleReader и SingleWriter, по умолчанию используется значение false, а для создателя значение true означает подписку на некоторые оптимизации, которые также имеют серьезные последствия для написания кода.  В частности, AllowSynchronousContinuations в некотором смысле позволяет производителю временно стать потребителем. Допустим, в канале нет данных, и потребитель приходит и вызывает ReadAsync. Ожидая задачу, возвращенную из ReadAsync, этот потребитель эффективно подключает обратный вызов, который будет вызываться при записи данных в канал. По умолчанию этот обратный вызов будет вызываться асинхронно, при этом производитель записывает данные в канал, а затем ставит в очередь вызов этого обратного вызова, что позволяет производителю одновременно идти своим путем, пока потребитель обрабатывается каким-то другим потоком. Однако в некоторых ситуациях для производительности может быть выгодно, чтобы этот производитель, записывающий данные, также сам обрабатывал обратный вызов, например, вместо того, чтобы TryWrite ставил в очередь вызов обратного вызова, он просто вызывает сам обратный вызов. Это может значительно сократить накладные расходы, но также требует глубокого понимания окружающей среды, так как, например, если вы удерживали блокировку при вызове TryWrite, а для параметра AllowSynchronousContinuations установлено значение true, вы можете в конечном итоге вызвать обратный вызов, удерживая блокировку, который (в зависимости от того, что пытался сделать обратный вызов) можете в конечном итоге наблюдать за некоторыми сломанными инвариантами, которые пытается поддерживать ваша блокировка.

BoundedChannelOptions передается слоям CreateBounded для дополнительных опций, связанных с ограничением. В дополнение к максимальной емкости, поддерживаемой каналом, он также предоставляет перечисление BoundedChannelFullMode, которое указывает, что записи поведения должны возникать при заполнении канала:

  public enum BoundedChannelFullMode
{
    Wait,
    DropNewest,
    DropOldest,
    DropWrite
}

 

TryWrite  на полном канале возвращает false, WriteAsync  вернет задачу, которая будет выполнена только тогда, когда освободится место, и запись может быть успешно завершена, и аналогично WaitToWriteAsync  будет завершена только тогда, когда пространство станет доступным. Вместо этого в трех других режимах запись всегда выполняется синхронно, удаляя элемент, если канал заполнен. DropOldest удалит «самый старый» элемент из очереди, что означает, что какой-либо элемент будет в дальнейшем удален потребителем. И наоборот, DropNewest удалит новый элемент, какой элемент был записан в канал в последний раз. И DropWrite удаляет элемент, который в данный момент пишется, то есть, например, TryWrite вернет true, но добавленный элемент будет немедленно удален.

Выполнение

С точки зрения API, представленные абстракции относительно просты, что является большой частью того, откуда исходит сила библиотеки. Простые абстракции и несколько конкретных реализаций, которые должны удовлетворять потребности разработчиков на 99,9%. Конечно, площадь поверхности библиотеки может указывать на простоту реализации. По правде говоря, в реализации есть приличная сложность, в основном сфокусированная на обеспечении высокой пропускной способности при одновременном использовании простых шаблонов потребления, легко используемых в потреблении кода. Реализация, например, делает все возможное, чтобы минимизировать распределение. Возможно, вы заметили, что многие методы возвращают ValueTask и ValueTask<T>, а не Task и Task<T>. Как увидели в нашем тривиальном примере реализации в начале этой статьи, мы можем использовать ValueTask<T>, чтобы избежать выделения при синхронном завершении методов, но реализация System.Threading.Channels также использует расширенные интерфейсы IValueTaskSource и IValueTaskSource<T>, чтобы избежать выделения ресурсов даже когда различные методы завершаются асинхронно и нужно возвращать задачи.

Рассмотрим этот тест:

using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Running;
using System.Threading.Channels;
using System.Threading.Tasks;
 
[MemoryDiagnoser]
public class Program
{
    static void Main() => BenchmarkRunner.Run();
 
    private readonly Channel s_channel = Channel.CreateUnbounded();
 
    [Benchmark]
    public async Task WriteThenRead()
    {
        ChannelWriter writer = s_channel.Writer;
        ChannelReader reader = s_channel.Reader;
        for (int i = 0; i < 10_000_000; i++)
        {
            writer.TryWrite(i);
            await reader.ReadAsync();
        }
    }
}

 

Здесь мы просто тестируем пропускную способность и распределение памяти на неограниченном канале при записи элемента, а затем читаем этот элемент 10 миллионов раз, это означает, что элемент всегда будет доступен для чтения, и, таким образом, чтение всегда будет завершаться синхронно, что дает следующие результаты на моем компьютере (72 байта, показанные в столбце Allocated, предназначены для одной Задачи, возвращенной из WriteThenRead):

 

Method                                          Mean                            Error StdDev Gen 0    Gen 1    Gen 2    Allocated

WriteThenRead                           527.8 ms                    2.03 ms              1.90 ms                                                       72 B

Но теперь давайте немного его изменим, сначала выполняя чтение, и только потом записывая элемент, который его удовлетворит. В этом случае чтение всегда завершается асинхронно, потому что данные для их завершения никогда не будут доступны:

using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Running;
using System.Threading.Channels;
using System.Threading.Tasks;
 
[MemoryDiagnoser]
public class Program
{
    static void Main() => BenchmarkRunner.Run();
 
    private readonly Channel s_channel = Channel.CreateUnbounded();
 
    [Benchmark]
    public async Task ReadThenWrite()
    {
        ChannelWriter writer = s_channel.Writer;
        ChannelReader reader = s_channel.Reader;
        for (int i = 0; i < 10_000_000; i++)
        {
            ValueTask vt = reader.ReadAsync();
            writer.TryWrite(i);
            await vt;
        }
    }
}

 

который на моей машине за 10 миллионов операций записи и чтения дает такие результаты:

Method                                          Mean                            Error StdDev Gen 0    Gen 1    Gen 2    Allocated

ReadThenWrite                           881.2 ms                    4.60 ms              4.30 ms                                                       72 B

 

Итак, есть некоторые дополнительные издержки, когда каждое чтение завершается асинхронно, но даже здесь мы видим нулевое распределение для 10 миллионов асинхронно завершающих чтений (опять же, 72 байта, показанные в столбце Allocated, предназначены для Задачи, возвращаемой из ReadThenWrite)!

Комбинаторы

Обычно потребление каналов является простым, используя один из подходов, показанных ранее. Но, как и в случае с IEnumerables, можно также выполнять различные виды операций по каналам для достижения определенной цели. Например, допустим, я хочу дождаться поступления первого элемента от одного из двух предоставленных читателей; Я мог бы написать что-то вроде этого:

public static async ValueTask> WhenAny(
    ChannelReader reader1, ChannelReader reader2)
{
    var cts = new CancellationTokenSource();
    Task t1 = reader1.WaitToReadAsync(cts.Token).AsTask();
    Task t2 = reader2.WaitToReadAsync(cts.Token).AsTask();
    Task completed = await Task.WhenAny(t1, t2);
    cts.Cancel();
    return completed == t1 ? reader1 : reader2;
}

 

Здесь мы просто запрашиваем WaitToReadAsync на обоих каналах и возвращаем считыватель в зависимости от того, что завершается первым. Одна из интересных вещей, которую стоит отметить в этом примере, это то, что в то время как ChannelReader<T> имеет много общего с IEnumerator<T>, этот пример не может быть хорошо реализован поверх IEnumerator<T> (или IAsyncEnumerator<T>).

I{Async}Enumerator<T> предоставляет метод MoveNext{Async}, который перемещает курсор вперед к следующему элементу, который затем открывается из Current. Если бы мы попытались реализовать такой WhenAny поверх IAsyncEnumerator<T>, нам нужно было бы вызвать MoveNextAsync для каждого. При этом мы потенциально могли бы перейти к следующему пункту. Если бы мы затем использовали этот метод в цикле, мы, скорее всего, в конечном итоге пропустили бы элементы из одного или обоих перечислителей, потому что мы потенциально могли бы расширить перечислитель, который мы не вернули из метода.

Отношение к остальной части .NET Core

System.Threading.Channels является частью общей платформы .NET Core, что означает, что приложение .NET Core может начать использовать его, не устанавливая ничего дополнительного. Он также доступен в виде отдельного пакета NuGet, хотя отдельная реализация не имеет всех оптимизаций, в значительной степени потому, что встроенная реализация может использовать преимущества дополнительной среды выполнения и поддержки библиотеки. NET Core.

Он также используется множеством других систем в .NET. Например, ASP.NET использует каналы как часть SignalR, а также в своей транспортировке Kestrel на основе Libuv. Каналы также используются будущей реализацией QUIC, которая в настоящее время разрабатывается для .NET 5.

Библиотека System.Threading.Channels также выглядит немного похожей на библиотеку System.Threading.Tasks.Dataflow, которая уже много лет доступна в .NET. В некотором смысле библиотека потоков данных является расширенным набором библиотеки каналов; в частности, тип BufferBlock<T> из библиотеки потока данных предоставляет большую часть той же функциональности. Однако библиотека потоков данных также ориентирована на другую модель программирования, в которой блоки связаны между собой так, что данные автоматически передаются от одного к другому. Он также включает расширенные функциональные возможности, которые поддерживают, например, форму двухфазной фиксации, с несколькими блоками, связанными с одними и теми же потребителями, и теми пользователями, которые позволяют извлекать механизмы из нескольких блоков без блокировки, которые необходимы для его включения, гораздо более сложны, и, в то же время, более мощные и более дорогие. Это очевидно, просто написав такой же тест для BufferBlock<T>, как мы делали ранее для каналов.

using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Running;
using System.Threading.Channels;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
 
[MemoryDiagnoser]
public class Program
{
    static void Main() => BenchmarkRunner.Run();
 
    private readonly Channel _channel = Channel.CreateUnbounded();
    private readonly BufferBlock _bufferBlock = new BufferBlock();
 
    [Benchmark]
    public async Task Channel_ReadThenWrite()
    {
        ChannelWriter writer = _channel.Writer;
        ChannelReader reader = _channel.Reader;
        for (int i = 0; i < 10_000_000; i++)
        {
            ValueTask vt = reader.ReadAsync();
            writer.TryWrite(i);
            await vt;
        }
    }
 
    [Benchmark]
    public async Task BufferBlock_ReadThenWrite()
    {
        for (int i = 0; i < 10_000_000; i++)
        {
            Task t = _bufferBlock.ReceiveAsync();
            _bufferBlock.Post(i);
            await t;
        }
    }
}

 

 

Method                                          Mean                            Error StdDev Gen 0    Gen 1    Gen 2    Allocated

Channel_ReadThenWrite       878.9 ms                    0.68 ms              0.60 ms                72 B                                 72 B

BufferBlock_ReadThenWrite                                  20,116.4 ms         192.82 ms           180.37 ms           1184000.0000    2000.0000                                                                                        7360000232 B

 

Это не означает, что библиотека System.Threading.Tasks.Dataflow не должна использоваться. Он позволяет разработчикам кратко выражать большое количество концепций и может демонстрировать очень хорошую производительность применительно к задачам, которые ему наиболее подходят. Однако, когда все, что вам нужно, - это структура передачи данных между одним или несколькими производителями и одним или несколькими потребителями, которую вы внедрили вручную, System.Threading.Channels - гораздо более компактная и экономная ставка.

Что же дальше?

Надеюсь, на данный момент вы более детально разобрались в библиотеке System.Threading.Channels и сможете с ее помощью улучшить ваши приложения. Попробуйте, и будем рады вашим отзывам, предложениям, проблемам и PR, чтобы улучшить ее

https://github.com/dotnet/runtime.

Источник

 



Posted on 15. November 2019

Анонс Windows Community Toolkit v6.0

Microsoft объявляет о следующем обновлении Windows Community Toolkit версии 6.0, которое стало возможным, благодаря помощи и вкладу нашего сообщества разработчиков. В этом выпуске реализована поддержка ARM64 для инструментария, а также обновление XAML Islands для поддержки .NET Core 3. Кроме того, у нас есть новые функции, такие как элемент управления "Eye Dropper " и новые помощники уведомлений Win32. У нас также есть обновление Microsoft Graph благодаря XAML контролам.


Смотрите более подробную информацию об этих функциях ниже.

XAML Islands приближает UWP к WPF, WinForms и Win32

XAML  Islands позволяет разработчику улучшить внешний вид и функциональность существующего приложения Win32 для WPF, Windows Forms или C ++, использовать новейшие функции пользовательского интерфейса Windows 10, которые доступны только через UWP контроллы, например, рукописный ввод:

 

В этой версии усовершенствована поддержка инструментов для .NET Core 3, что делает ее еще проще для того чтобы начать.

 

Документация для XAML Islands.

 

Поддержка ARM64

Windows Community Toolkit теперь поддерживает приложения, которые ориентированные на ARM64. Что позволяет разработчикам использовать преимущества времени автономной работы и производительности, работая на собственной архитектуре для таких устройств, как Surface Pro X. Разработчики также тесно сотрудничали с командой Win2D, чтобы убедиться, что она поддерживает ARM64. Это было важно для Lottie и других функций инструментария, основанных на Win2D.

Улучшенная анимация Lottie

Обновление предоставляет больше возможностей Adobe After Effects для Lottie-Windows, в том числе Linear и Radial Gradients, Masks, Track Mattes поддержку codegen для Image Layers . Мы надеемся, что эти дополнения позволят дизайнерам и разработчикам приложений создавать еще более привлекательные визуальные интерфейсы для пользователей Windows 10. Поскольку некоторые из этих функций основаны на более новых SDK, Lottie-Windows теперь также предлагает адаптивное управление версиями. Мы рассчитываем на то, что сообщество определит приоритеты работы с функциями, поэтому, пожалуйста, продолжайте оставлять свои ценные отзывы и предложения для Lottie-Windows!

 

Eye Dropper

Новый элемент управления “Eye Dropper” позволяет вам легко и быстро выбирать цвет приложения.

Документация для EyeDropper.

 

Превью XAML Graph Controls

Новое дополнение к Windows Community Toolkit позволяет разработчикам легко проходить проверку подлинности и получать доступ к Microsoft Graph в приложениях Windows 10 для создания обширных данных. Эти элементы управления доступны в качестве предварительного просмотра версии 6.1 и будут работать с приложениями UWP и в приложениях WPF / WinForms для Win32 через XAML Islands в .NET Core 3. Кроме того, совсем скоро с помощью Xamarin и Uno Platform у вас появится возможность использовать их на Android и iOS.

 

Читайте об этих новых элементах управления в нашем официальном заявлении или на GitHub.

Начни сегодня

Существует гораздо больше обновлений, чем мы можем описать здесь. Обязательно прочтите примечания к анонсу для получения более подробной информации обо всех исправлениях, представленных в этой версии.

Напоминаем, что вы можете начать работу, следуя нашему учебному пособию docs.microsoft.com tutorial, или просмотреть последние функции, установив образец приложения Windows Community Toolkit из Магазина Microsoft. Если вы хотите внести свой вклад в разработку, пожалуйста, присоединяйтесь на GitHub! Чтобы присоединиться к беседе в Twitter, используйте хештег #WindowsToolkit.

Всем удачного кодирования!

Источник



Posted on 3. August 2010

Доступен Prism 4.0 CTP

Команда разработчиков проекта Prism опубликовала CTP 4-й версии. Изменения этой версии касаются модульности и MVVM. А именно:

  • библиотека Prism для Windows Presentation Foundation и Silverlight;
  • QuickStarts:
    • новые и обновленные QuickStarts в Prism 4.0:
      • базовый MVVM QuickStart;
      • MVVM QuickStart;
      • QuickStart модульности (обновлен для использования MEF так же как и Unity);
    • адаптированные QuickStart из Prism 2.0;
      • QuickStarts по командам:
      • агрегатор событий QuickStarts;
      • Hello Worl QuickStarts;
      • QuickStarts по использованию проекта в нескольких технологиях:
      • QuckStarts по компоновке пользовательского интерфейса
    • RI адаптирован с Prism 2.0

Кроме MVVM QuickStarts остальные примеры имеют по два решения для настольных приложения и для Silverlight приложений.



Posted on 2. August 2010

Настройка Xaml FXCop правил в FXCop GUI

Lester в своем блоге опубликовал описание как настроить Xaml FXCop правила в FXCop GUI.

Для того, что бы настроить необходимо выполнить следующие шаги:

1. Установиться FXCop.

FXCop installation

2. Обновить FxCop.exe.config (находится в %ProgramFiles(x86)%\ Microsoft FxCop 1.36) для запуска на .Net 4.0.

Добавить ниже приведенный xml в нижнюю часть файла конфигурации после тега

3. Открыть FxCop.exe (находится в %ProgramFiles(x86)%\ Microsoft FxCop 1.36).

Microsoft FxCop

4. Загрузить сборку с правилами FxCop (если у вас не собственной с правилами, то можете использовать Microsoft.Xaml.Tools.FxCop.dll).

Microsoft FxCop 2

5. Загрузить Targets\сборку проекта.

Microsoft FxCop 3

6. На этом настройка завершена. Нажмите кнопку Analyze и вы увидите результаты работы установленных правил.

Microsoft FxCop 4

 



Posted on 21. July 2010

Обновился Microsoft Silverlight Analytics Framework

Michael S. Scherotter анонсировал обновление Microsoft Silverlight Analytics Frameworkписал о нем раньше). Возможности этого обнолвения:

Поддержка Windows Phone 7

Добавлено поведение TrackLocation для Windows Phone 7, которое позволяет отслеживать перемещение пользователя. При этом (по-хорошему) необходимо запрашивать разрешение у пользователя.

Silverlight Media Framework Player или Smooth Streaming Client пока что не доступны для Windows Phone 7. Но когда они появятся для этой платформы, то буду и состветствующие поведения для них.

Microsoft Silverlight Media Framework 2.0

Полностью поддерживается возможность отслеживания действий пользователя при использовании Microsoft Silverlight Media Framework 2.0.

Поддержка Webtrends

Разработчики продукта добавили поведение WebtrendsAnalytics для Silverlight 4, WPF и Windows Phone 7. Оно разработано с использованием Webtrends Data Collection API.

Добавлена поддержка Google Analytics в WPF и Windows Phone 7

Разработчики сделали несколько улучшений Google Analytics компонента для поддержки WPF и Windows Phone 7. Было исправлено несколько ошибок, которые нашли пользователи.

 



Posted on 2. July 2010

Доступен Prism 4 Drop 3

Вышло небольшое обновление Prism 4.0. Содержание этой версии:

  • QuickStart (Находится в каталоге QuickStar);
    • Model-View-ViewModel - для этого решения необходим установленый Silverlight;
    • Использование модульности через MEF - WPF и Silverlight версии. Использует .Net Framework 4.0;
    • Использование модульности через Unity - WPF и Silverlight версии. Некоторые функции аналогичны решению с MEF, но используют Unity с контейнером DI.
  • Hot to решения: простой MVVM - простые решения с использованием MVVM паттерна;
  • Stock Trader решение
  • Документация
Изменения этой версии:
  • Использования модульности через MEF: класс ModuleManager реализует асинхронный шаблон;
  • Использование модульности через Unity: добавлены примеры для Silverlight и WPF;
  • Добавлены How to по использованию MVVM;
  • Упрощен пример реализации INotifyDataErrorInfo.