Skip to content

Commit

Permalink
TransformAsync enhancements (#819)
Browse files Browse the repository at this point in the history
Improve implementation of TransformAsync + add max concurrency and transform on refresh overloads
  • Loading branch information
RolandPheasant committed Jan 22, 2024
1 parent 11b8b26 commit 5148d14
Show file tree
Hide file tree
Showing 8 changed files with 540 additions and 227 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1823,14 +1823,26 @@ namespace DynamicData
where TDestination : notnull
where TSource : notnull
where TKey : notnull { }
public static System.IObservable<DynamicData.IChangeSet<TDestination, TKey>> TransformAsync<TDestination, TSource, TKey>(this System.IObservable<DynamicData.IChangeSet<TSource, TKey>> source, System.Func<TSource, System.Threading.Tasks.Task<TDestination>> transformFactory, DynamicData.TransformAsyncOptions options)
where TDestination : notnull
where TSource : notnull
where TKey : notnull { }
public static System.IObservable<DynamicData.IChangeSet<TDestination, TKey>> TransformAsync<TDestination, TSource, TKey>(this System.IObservable<DynamicData.IChangeSet<TSource, TKey>> source, System.Func<TSource, System.Threading.Tasks.Task<TDestination>> transformFactory, System.IObservable<System.Func<TSource, TKey, bool>>? forceTransform = null)
where TDestination : notnull
where TSource : notnull
where TKey : notnull { }
public static System.IObservable<DynamicData.IChangeSet<TDestination, TKey>> TransformAsync<TDestination, TSource, TKey>(this System.IObservable<DynamicData.IChangeSet<TSource, TKey>> source, System.Func<TSource, TKey, System.Threading.Tasks.Task<TDestination>> transformFactory, DynamicData.TransformAsyncOptions options)
where TDestination : notnull
where TSource : notnull
where TKey : notnull { }
public static System.IObservable<DynamicData.IChangeSet<TDestination, TKey>> TransformAsync<TDestination, TSource, TKey>(this System.IObservable<DynamicData.IChangeSet<TSource, TKey>> source, System.Func<TSource, TKey, System.Threading.Tasks.Task<TDestination>> transformFactory, System.IObservable<System.Func<TSource, TKey, bool>>? forceTransform = null)
where TDestination : notnull
where TSource : notnull
where TKey : notnull { }
public static System.IObservable<DynamicData.IChangeSet<TDestination, TKey>> TransformAsync<TDestination, TSource, TKey>(this System.IObservable<DynamicData.IChangeSet<TSource, TKey>> source, System.Func<TSource, DynamicData.Kernel.Optional<TSource>, TKey, System.Threading.Tasks.Task<TDestination>> transformFactory, DynamicData.TransformAsyncOptions options)
where TDestination : notnull
where TSource : notnull
where TKey : notnull { }
public static System.IObservable<DynamicData.IChangeSet<TDestination, TKey>> TransformAsync<TDestination, TSource, TKey>(this System.IObservable<DynamicData.IChangeSet<TSource, TKey>> source, System.Func<TSource, DynamicData.Kernel.Optional<TSource>, TKey, System.Threading.Tasks.Task<TDestination>> transformFactory, System.IObservable<System.Func<TSource, TKey, bool>>? forceTransform = null)
where TDestination : notnull
where TSource : notnull
Expand Down Expand Up @@ -1879,14 +1891,26 @@ namespace DynamicData
where TDestination : notnull
where TSource : notnull
where TKey : notnull { }
public static System.IObservable<DynamicData.IChangeSet<TDestination, TKey>> TransformSafeAsync<TDestination, TSource, TKey>(this System.IObservable<DynamicData.IChangeSet<TSource, TKey>> source, System.Func<TSource, System.Threading.Tasks.Task<TDestination>> transformFactory, System.Action<DynamicData.Kernel.Error<TSource, TKey>> errorHandler, DynamicData.TransformAsyncOptions options)
where TDestination : notnull
where TSource : notnull
where TKey : notnull { }
public static System.IObservable<DynamicData.IChangeSet<TDestination, TKey>> TransformSafeAsync<TDestination, TSource, TKey>(this System.IObservable<DynamicData.IChangeSet<TSource, TKey>> source, System.Func<TSource, System.Threading.Tasks.Task<TDestination>> transformFactory, System.Action<DynamicData.Kernel.Error<TSource, TKey>> errorHandler, System.IObservable<System.Func<TSource, TKey, bool>>? forceTransform = null)
where TDestination : notnull
where TSource : notnull
where TKey : notnull { }
public static System.IObservable<DynamicData.IChangeSet<TDestination, TKey>> TransformSafeAsync<TDestination, TSource, TKey>(this System.IObservable<DynamicData.IChangeSet<TSource, TKey>> source, System.Func<TSource, TKey, System.Threading.Tasks.Task<TDestination>> transformFactory, System.Action<DynamicData.Kernel.Error<TSource, TKey>> errorHandler, DynamicData.TransformAsyncOptions options)
where TDestination : notnull
where TSource : notnull
where TKey : notnull { }
public static System.IObservable<DynamicData.IChangeSet<TDestination, TKey>> TransformSafeAsync<TDestination, TSource, TKey>(this System.IObservable<DynamicData.IChangeSet<TSource, TKey>> source, System.Func<TSource, TKey, System.Threading.Tasks.Task<TDestination>> transformFactory, System.Action<DynamicData.Kernel.Error<TSource, TKey>> errorHandler, System.IObservable<System.Func<TSource, TKey, bool>>? forceTransform = null)
where TDestination : notnull
where TSource : notnull
where TKey : notnull { }
public static System.IObservable<DynamicData.IChangeSet<TDestination, TKey>> TransformSafeAsync<TDestination, TSource, TKey>(this System.IObservable<DynamicData.IChangeSet<TSource, TKey>> source, System.Func<TSource, DynamicData.Kernel.Optional<TSource>, TKey, System.Threading.Tasks.Task<TDestination>> transformFactory, System.Action<DynamicData.Kernel.Error<TSource, TKey>> errorHandler, DynamicData.TransformAsyncOptions options)
where TDestination : notnull
where TSource : notnull
where TKey : notnull { }
public static System.IObservable<DynamicData.IChangeSet<TDestination, TKey>> TransformSafeAsync<TDestination, TSource, TKey>(this System.IObservable<DynamicData.IChangeSet<TSource, TKey>> source, System.Func<TSource, DynamicData.Kernel.Optional<TSource>, TKey, System.Threading.Tasks.Task<TDestination>> transformFactory, System.Action<DynamicData.Kernel.Error<TSource, TKey>> errorHandler, System.IObservable<System.Func<TSource, TKey, bool>>? forceTransform = null)
where TDestination : notnull
where TSource : notnull
Expand Down Expand Up @@ -2420,6 +2444,13 @@ namespace DynamicData
public void Edit(System.Action<DynamicData.IExtendedList<T>> updateAction) { }
public System.IObservable<DynamicData.IChangeSet<T>> Preview(System.Func<T, bool>? predicate = null) { }
}
public struct TransformAsyncOptions : System.IEquatable<DynamicData.TransformAsyncOptions>
{
public static readonly DynamicData.TransformAsyncOptions Default;
public TransformAsyncOptions(int? MaximumConcurrency, bool TransformOnRefresh) { }
public int? MaximumConcurrency { get; set; }
public bool TransformOnRefresh { get; set; }
}
[System.Serializable]
public class UnspecifiedIndexException : System.Exception
{
Expand Down
66 changes: 59 additions & 7 deletions src/DynamicData.Tests/Cache/TransformAsyncFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,9 @@
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;

using DynamicData.Binding;
using DynamicData.Tests.Domain;

using FluentAssertions;

using Xunit;

namespace DynamicData.Tests.Cache;
Expand Down Expand Up @@ -93,7 +91,7 @@ public void Remove()
public async Task RemoveFlowsToTheEnd()
{
var transform = 0;
var count = 500;
var count = 100;
ReadOnlyObservableCollection<Person> collection;

var cache = new SourceCache<Person, string>(p => p.Name);
Expand Down Expand Up @@ -121,9 +119,8 @@ public async Task RemoveFlowsToTheEnd()
cache.RemoveKey(p.Name);
}

while (transform != count)
await Task.Delay(100);
await Task.Delay(3000);
await collection.ToObservableChangeSet().Take(count * 2);

collection.Count.Should().Be(0);
}

Expand Down Expand Up @@ -207,6 +204,61 @@ public void Update()
stub.Results.Messages[1].Updates.Should().Be(1, "Should be 1 update");
}




[Theory, InlineData(true), InlineData(false)]
public void TransformOnRefresh(bool transformOnRefresh)
{
using var source = new SourceCache<Person, string>(p => p.Name);
using var results = source.Connect()
.AutoRefresh()
.TransformAsync((p, key) => Task.FromResult(new PersonWithAgeGroup(p, p.Age < 18 ? "Child" : "Adult")), TransformAsyncOptions.Default with { TransformOnRefresh = transformOnRefresh }).AsAggregator();

var person = new Person("SomeOne", 16);
source.AddOrUpdate(person);

results.Data.Count.Should().Be(1);
results.Data.Lookup("SomeOne").Value.AgeGroup.Should().Be("Child");

person.Age = 21;


results.Data.Count.Should().Be(1);
results.Data.Lookup("SomeOne").Value.AgeGroup.Should().Be(transformOnRefresh ? "Adult": "Child");

}


[Theory, InlineData(10), InlineData(100)]

public async Task WithMaxConcurrency(int maxConcurrency)
{
/* We need to test whether the max concurrency has any effect.
If maxConcurrency == 100, this test takes a little more than 100 ms
If maxConcurrency = 10, this test takes a little more than 1s
So it works, but how can it be tested in a scientific way ??
*/


const int transformCount = 100;

using var source = new SourceCache<Person, string>(p => p.Name);
using var results = source.Connect()
.TransformAsync(async (p, key) =>
{
await Task.Delay(100);
return new PersonWithAgeGroup(p, p.Age < 18 ? "Child" : "Adult");
}, TransformAsyncOptions.Default with { MaximumConcurrency = maxConcurrency }).AsAggregator();

source.AddOrUpdate(Enumerable.Range(1, transformCount).Select(l => new Person("Person" + l, l)));

await results.Data.CountChanged.Where(c => c == transformCount).Take(1);
}

private class TransformStub : IDisposable
{
public TransformStub()
Expand Down

0 comments on commit 5148d14

Please sign in to comment.