azyobuzin/BiDaFlow

BiDaFlow

Get the power of parallel processing with TPL Dataflow

API Documentation

Packages

Package                   Version  Changelog
BiDaFlow                  0.2.0    CHANGELOG.md
BiDaFlow.AsyncEnumerable  0.2.2    CHANGELOG.md

Features

Fluent API for TPL Dataflow

await Enumerable.Range(1, 100).AsSourceBlock()
    // RunWith connects to an ITargetBlock and returns a single IDataflowBlock
    .RunWith(
        new BatchBlock<int>(5, new GroupingDataflowBlockOptions { BoundedCapacity = 5 })
            // Chain makes a new IPropagatorBlock linking the upstream and downstream blocks
            .Chain(new TransformBlock<int[], int>(
                x => x.Sum(),
                new ExecutionDataflowBlockOptions
                {
                    BoundedCapacity = 4,
                    MaxDegreeOfParallelism = 4,
                    SingleProducerConstrained = true,
                }))
            // ChainToTarget makes a new ITargetBlock linking the upstream and downstream blocks
            .ChainToTarget(new ActionBlock<int>(x => Console.WriteLine(x)))
    )
    .Completion;

Need a more customized block? Use FluentDataflow.EncapsulateAsDataflowBlock. See the API documentation to learn how to build blocks of your own.
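For comparison, here is a minimal sketch of composing a custom block with stock TPL Dataflow only: link the inner blocks with completion propagation, then wrap them with DataflowBlock.Encapsulate. It does not use BiDaFlow and is not meant to show how FluentDataflow.EncapsulateAsDataflowBlock is implemented. The class and method names (PlainDataflowSketch, CreateBatchSumBlock, RunAsync) are hypothetical and exist only for this illustration.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class PlainDataflowSketch
{
    // Hypothetical helper: groups incoming ints into batches of `batchSize`
    // and emits the sum of each batch.
    public static IPropagatorBlock<int, int> CreateBatchSumBlock(int batchSize)
    {
        var batch = new BatchBlock<int>(
            batchSize,
            new GroupingDataflowBlockOptions { BoundedCapacity = batchSize });

        var sum = new TransformBlock<int[], int>(
            x => x.Sum(),
            new ExecutionDataflowBlockOptions
            {
                BoundedCapacity = 4,
                MaxDegreeOfParallelism = 4,
            });

        // Completion must be propagated explicitly when linking by hand.
        batch.LinkTo(sum, new DataflowLinkOptions { PropagateCompletion = true });

        // Expose the head as the target and the tail as the source.
        return DataflowBlock.Encapsulate<int, int>(batch, sum);
    }

    public static async Task<List<int>> RunAsync()
    {
        var block = CreateBatchSumBlock(5);
        var results = new List<int>();

        // ActionBlock runs one item at a time by default, so List<int> is safe here.
        var sink = new ActionBlock<int>(x => results.Add(x));
        block.LinkTo(sink, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var i in Enumerable.Range(1, 100))
        {
            // SendAsync waits while the bounded block is full (back pressure).
            await block.SendAsync(i);
        }

        block.Complete();
        await sink.Completion;
        return results;
    }
}
```

The manual LinkTo calls and the explicit PropagateCompletion option are the bookkeeping that a fluent chaining API is designed to hide.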

AsyncEnumerable Integration

IAsyncEnumerable is the key interface for data flow with back pressure in .NET. BiDaFlow.AsyncEnumerable lets an IAsyncEnumerable process its elements in parallel using Task-based asynchronous code.

await AsyncEnumerable.Range(1, 100)
    // Process elements in parallel with IPropagatorBlock
    .RunThroughDataflowBlock(() =>
        new TransformBlock<int, int>(
            x => x * 10,
            new ExecutionDataflowBlockOptions
            {
                BoundedCapacity = 6,
                MaxDegreeOfParallelism = 4,
                EnsureOrdered = false,
                SingleProducerConstrained = true,
            })
    )
    // The result is an IAsyncEnumerable
    // Subsequent processing can be written with System.Linq.Async
    .ForEachAsync(x => Console.WriteLine(x));
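To illustrate the general idea of feeding an IAsyncEnumerable through a bounded block, the following sketch uses only stock TPL Dataflow (SendAsync and ReceiveAllAsync, the latter available on .NET Core 3.0 and later). It is an assumption-laden illustration, not BiDaFlow's implementation of RunThroughDataflowBlock. The names AsyncEnumerablePumpSketch, PumpThrough, RangeAsync, and RunDemoAsync are hypothetical, and the sketch does not handle a consumer that stops enumerating early.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class AsyncEnumerablePumpSketch
{
    // Hypothetical helper: pushes `source` into `block` and yields the block's output.
    public static async IAsyncEnumerable<TOut> PumpThrough<TIn, TOut>(
        IAsyncEnumerable<TIn> source, TransformBlock<TIn, TOut> block)
    {
        // Producer side: SendAsync waits while the bounded block is full,
        // so the source is only pulled as fast as the block can accept items.
        var pump = Task.Run(async () =>
        {
            try
            {
                await foreach (var item in source)
                {
                    await block.SendAsync(item);
                }
                block.Complete();
            }
            catch (Exception ex)
            {
                // Fault is an explicit interface member on TransformBlock.
                ((IDataflowBlock)block).Fault(ex);
            }
        });

        // Consumer side: yield results as they become available.
        await foreach (var result in block.ReceiveAllAsync())
        {
            yield return result;
        }

        // Awaiting Completion surfaces any fault raised inside the block.
        await block.Completion;
        await pump;
    }

    // Hypothetical source used only by the demo below.
    private static async IAsyncEnumerable<int> RangeAsync(int start, int count)
    {
        for (var i = 0; i < count; i++)
        {
            await Task.Yield();
            yield return start + i;
        }
    }

    public static async Task<List<int>> RunDemoAsync()
    {
        var block = new TransformBlock<int, int>(
            x => x * 10,
            new ExecutionDataflowBlockOptions
            {
                BoundedCapacity = 6,
                MaxDegreeOfParallelism = 4,
            });

        var results = new List<int>();
        await foreach (var x in PumpThrough(RangeAsync(1, 20), block))
        {
            results.Add(x);
        }
        return results;
    }
}
```

The demo leaves EnsureOrdered at its default of true, so results arrive in source order even though up to four items are transformed concurrently.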

Roadmap

  • BiDaFlow.ReactiveStreams - integration with Reactive Streams, TPL Dataflow and AsyncEnumerable