Skip to content

Commit

Permalink
Feature: Use Flag instead of Counter for TransformManyAsync/Transform…
Browse files Browse the repository at this point in the history
…OnObservable (#844)

* Updated TransformManyAsync and TransformOnObservable to avoid using a counter and use a parent update flag instead
* Fixed handling of Exceptions for TransformManySafeAsync scenarios
* Minor code clean-up / improvements
  • Loading branch information
dwcullop committed Feb 5, 2024
1 parent fb86420 commit 5bf069d
Show file tree
Hide file tree
Showing 8 changed files with 97 additions and 76 deletions.
3 changes: 2 additions & 1 deletion src/DynamicData/Cache/Internal/MergeChangeSets.cs
Expand Up @@ -4,6 +4,7 @@

using System.Reactive.Concurrency;
using System.Reactive.Linq;
using DynamicData.Internal;

namespace DynamicData.Cache.Internal;

Expand Down Expand Up @@ -33,7 +34,7 @@ public MergeChangeSets(IEnumerable<IObservable<IChangeSet<TObject, TKey>>> sourc
.Synchronize(locker)
.Do(cache.Clone)
.MergeMany(mc => mc.Source.Do(static _ => { }, observer.OnError))
.Subscribe(
.SubscribeSafe(
changes => changeTracker.ProcessChangeSet(changes, observer),
observer.OnError,
observer.OnCompleted);
Expand Down
5 changes: 3 additions & 2 deletions src/DynamicData/Cache/Internal/MergeMany.cs
Expand Up @@ -5,6 +5,7 @@
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using DynamicData.Internal;

namespace DynamicData.Cache.Internal;

Expand Down Expand Up @@ -39,9 +40,9 @@ public MergeMany(IObservable<IChangeSet<TObject, TKey>> source, Func<TObject, IO
.SubscribeMany((t, key) =>
{
counter.Added();
return _observableSelector(t, key).Synchronize(locker).Finally(() => counter.Finally()).Subscribe(observer.OnNext, _ => { }, () => { });
return _observableSelector(t, key).Synchronize(locker).Finally(() => counter.Finally()).Subscribe(observer.OnNext, static _ => { });
})
.Subscribe(_ => { }, observer.OnError, observer.OnCompleted);
.SubscribeSafe(observer.OnError, observer.OnCompleted);
return new CompositeDisposable(disposable, counter);
});
Expand Down
20 changes: 12 additions & 8 deletions src/DynamicData/Cache/Internal/MergeManyCacheChangeSets.cs
Expand Up @@ -31,8 +31,11 @@ internal sealed class MergeManyCacheChangeSets<TObject, TKey, TDestination, TDes
var shared = source
.Transform((obj, key) => new ChangeSetCache<TDestination, TDestinationKey>(selector(obj, key).Synchronize(locker)))
.Synchronize(locker)
.Do(cache.Clone)
.Do(_ => parentUpdate = true)
.Do(changes =>
{
cache.Clone(changes);
parentUpdate = true;
})
.Publish();
// Merge the child changeset changes together and apply to the tracker
Expand All @@ -47,12 +50,13 @@ internal sealed class MergeManyCacheChangeSets<TObject, TKey, TDestination, TDes
var subRemove = shared
.OnItemRemoved(changeSetCache => changeTracker.RemoveItems(changeSetCache.Cache.KeyValues), invokeOnUnsubscribe: false)
.OnItemUpdated((_, prev) => changeTracker.RemoveItems(prev.Cache.KeyValues))
.Do(_ =>
{
changeTracker.EmitChanges(observer);
parentUpdate = false;
})
.Subscribe();
.SubscribeSafe(
_ =>
{
changeTracker.EmitChanges(observer);
parentUpdate = false;
},
observer.OnError);
return new CompositeDisposable(shared.Connect(), subMergeMany, subRemove);
});
Expand Down
Expand Up @@ -38,8 +38,11 @@ internal sealed class MergeManyCacheChangeSetsSourceCompare<TObject, TKey, TDest
var shared = source
.Transform((obj, key) => new ChangeSetCache<ParentChildEntry, TDestinationKey>(_changeSetSelector(obj, key).Synchronize(locker)))
.Synchronize(locker)
.Do(cache.Clone)
.Do(_ => parentUpdate = true)
.Do(changes =>
{
cache.Clone(changes);
parentUpdate = true;
})
.Publish();
// Merge the child changeset changes together and apply to the tracker
Expand All @@ -63,12 +66,13 @@ internal sealed class MergeManyCacheChangeSetsSourceCompare<TObject, TKey, TDest
// Subscribe to handle all the requested changes and emit them downstream
var subParent = parentObservable
.Do(_ =>
{
changeTracker.EmitChanges(observer);
parentUpdate = false;
})
.Subscribe();
.SubscribeSafe(
_ =>
{
changeTracker.EmitChanges(observer);
parentUpdate = false;
},
observer.OnError);
return new CompositeDisposable(shared.Connect(), subMergeMany, subParent);
}).TransformImmutable(entry => entry.Child);
Expand Down
13 changes: 7 additions & 6 deletions src/DynamicData/Cache/Internal/MergeManyListChangeSets.cs
Expand Up @@ -45,12 +45,13 @@ internal sealed class MergeManyListChangeSets<TObject, TKey, TDestination>(IObse
var subRemove = shared
.OnItemRemoved(clonedList => changeTracker.RemoveItems(clonedList.List), invokeOnUnsubscribe: false)
.OnItemUpdated((_, prev) => changeTracker.RemoveItems(prev.List))
.Do(_ =>
{
changeTracker.EmitChanges(observer);
parentUpdate = false;
})
.Subscribe();
.SubscribeSafe(
_ =>
{
changeTracker.EmitChanges(observer);
parentUpdate = false;
},
observer.OnError);
return new CompositeDisposable(shared.Connect(), subMergeMany, subRemove);
});
Expand Down
76 changes: 41 additions & 35 deletions src/DynamicData/Cache/Internal/TransformManyAsync.cs
Expand Up @@ -21,62 +21,68 @@ internal sealed class TransformManyAsync<TSource, TKey, TDestination, TDestinati
{
var locker = new object();
var cache = new Cache<ChangeSetCache<TDestination, TDestinationKey>, TKey>();
var updateCounter = 0;
// Transformation Function:
// Create the Child Observable by invoking the async selector, appending the counter and the synchronize
// Pass the result to a new ChangeSetCache instance.
ChangeSetCache<TDestination, TDestinationKey> Transform_(TSource obj, TKey key) => new(
Observable.Defer(() => selector(obj, key))
.Do(_ => Interlocked.Increment(ref updateCounter))
.Synchronize(locker!));
var parentUpdate = false;
// This is manages all of the changes
var changeTracker = new ChangeSetMergeTracker<TDestination, TDestinationKey>(() => cache.Items, comparer, equalityComparer);
// Transform Helper
async Task<IObservable<IChangeSet<TDestination, TDestinationKey>>> InvokeSelector(TSource obj, TKey key)
{
if (errorHandler != null)
{
try
{
return await selector(obj, key).ConfigureAwait(false);
}
catch (Exception e)
{
errorHandler.Invoke(new Error<TSource, TKey>(e, obj, key));
return Observable.Empty<IChangeSet<TDestination, TDestinationKey>>();
}
}
return await selector(obj, key).ConfigureAwait(false);
}
// Transformation Function:
// Create the Child Observable by invoking the async selector, appending the synchronize, and creating a new ChangeSetCache instance.
ChangeSetCache<TDestination, TDestinationKey> Transform_(TSource obj, TKey key) =>
new(Observable.Defer(() => InvokeSelector(obj, key)).Synchronize(locker!));
// Transform to a cache changeset of child caches, synchronize, clone changes to the local copy, and publish.
// Increment updateCounter BEFORE the lock so that incoming changesets will cause the downstream changeset to be delayed
// until all pending changesets have been handled.
var shared =
(errorHandler is null ? source.Transform(Transform_) : source.TransformSafe(Transform_, errorHandler))
.Do(_ => Interlocked.Increment(ref updateCounter))
.Synchronize(locker)
.Do(cache.Clone)
.Publish();
var shared = source
.Transform(Transform_)
.Synchronize(locker)
.Do(
changes =>
{
cache.Clone(changes);
parentUpdate = true;
})
.Publish();
// Merge the child changeset changes together and apply to the tracker
// Emit the changeset if there are no other pending changes
// Emit the changeset if not currently handling a parent stream update
var subMergeMany = shared
.MergeMany(cacheChangeSet => cacheChangeSet.Source)
.SubscribeSafe(
changes => changeTracker.ProcessChangeSet(changes, Interlocked.Decrement(ref updateCounter) == 0 ? observer : null),
changes => changeTracker.ProcessChangeSet(changes, !parentUpdate ? observer : null),
observer.OnError);
// When a source item is removed, all of its sub-items need to be removed
// Emit the changeset if there are no other pending changes
// Emit any pending changes
var subRemove = shared
.OnItemRemoved(changeSetCache => changeTracker.RemoveItems(changeSetCache.Cache.KeyValues), invokeOnUnsubscribe: false)
.OnItemUpdated((_, prev) => changeTracker.RemoveItems(prev.Cache.KeyValues))
.SubscribeSafe(
_ =>
{
if (Interlocked.Decrement(ref updateCounter) == 0)
{
changeTracker.EmitChanges(observer);
}
changeTracker.EmitChanges(observer);
parentUpdate = false;
},
observer.OnError,
() =>
{
if (Volatile.Read(ref updateCounter) == 0)
{
observer.OnCompleted();
}
else
{
changeTracker.MarkComplete();
}
});
observer.OnCompleted);
return new CompositeDisposable(shared.Connect(), subMergeMany, subRemove);
});
Expand Down
23 changes: 11 additions & 12 deletions src/DynamicData/Cache/Internal/TransformOnObservable.cs
Expand Up @@ -5,7 +5,6 @@
using System.Reactive.Disposables;
using System.Reactive.Linq;
using DynamicData.Internal;
using DynamicData.Kernel;

namespace DynamicData.Cache.Internal;

Expand All @@ -18,18 +17,20 @@ internal sealed class TransformOnObservable<TSource, TKey, TDestination>(IObserv
{
var cache = new ChangeAwareCache<TDestination, TKey>();
var locker = new object();
var pendingUpdates = 0;
var parentUpdate = false;
// Helper to emit any pending changes when all the updates have been handled
void EmitChanges()
// Helper to emit any pending changes when appropriate
void EmitChanges(bool fromParent)
{
if (Interlocked.Decrement(ref pendingUpdates) == 0)
if (fromParent || !parentUpdate)
{
var changes = cache!.CaptureChanges();
if (changes.Count > 0)
{
observer.OnNext(changes);
}
parentUpdate = false;
}
}
Expand All @@ -38,27 +39,25 @@ void EmitChanges()
IObservable<TDestination> CreateSubObservable(TSource obj, TKey key) =>
transform(obj, key)
.DistinctUntilChanged()
.Do(_ => Interlocked.Increment(ref pendingUpdates))
.Synchronize(locker!)
.Do(val => cache!.AddOrUpdate(val, key));
// Always increment the counter OUTSIDE of the lock to signal any thread currently holding the lock
// to not emit the changeset because more changes are incoming.
// Flag a parent update is happening once inside the lock
var shared = source
.Do(_ => Interlocked.Increment(ref pendingUpdates))
.Synchronize(locker!)
.Do(_ => parentUpdate = true)
.Publish();
// Use MergeMany because it automatically handles Add/Update/Remove and OnCompleted/OnError correctly
// MergeMany automatically handles Add/Update/Remove and OnCompleted/OnError correctly
var subMerged = shared
.MergeMany(CreateSubObservable)
.SubscribeSafe(_ => EmitChanges(), observer.OnError, observer.OnCompleted);
.SubscribeSafe(_ => EmitChanges(fromParent: false), observer.OnError, observer.OnCompleted);
// Subscribe to the shared Observable to handle Remove events. MergeMany will unsubscribe from the sub-observable,
// but the corresponding key value needs to be removed from the Cache so the remove is observed downstream.
var subRemove = shared
.OnItemRemoved((_, key) => cache!.Remove(key), invokeOnUnsubscribe: false)
.SubscribeSafe(_ => EmitChanges());
.SubscribeSafe(_ => EmitChanges(fromParent: true), observer.OnError);
return new CompositeDisposable(shared.Connect(), subMerged, subRemove);
});
Expand Down
13 changes: 9 additions & 4 deletions src/DynamicData/Internal/ObservableEx.cs
Expand Up @@ -14,9 +14,14 @@ internal static class ObservableEx
public static IDisposable SubscribeSafe<T>(this IObservable<T> observable, Action<T> onNext, Action<Exception> onError) =>
observable.SubscribeSafe(Observer.Create(onNext, onError));

public static IDisposable SubscribeSafe<T>(this IObservable<T> observable, Action<T> onNext, Action onComplete) =>
observable.SubscribeSafe(Observer.Create(onNext, onComplete));
public static IDisposable SubscribeSafe<T>(this IObservable<T> observable, Action<Exception> onError, Action onComplete) =>
observable.SubscribeSafe(Observer.Create(Stub<T>.Ignore, onError, onComplete));

public static IDisposable SubscribeSafe<T>(this IObservable<T> observable, Action<T> onNext) =>
observable.SubscribeSafe(Observer.Create(onNext));
public static IDisposable SubscribeSafe<T>(this IObservable<T> observable, Action<Exception> onError) =>
observable.SubscribeSafe(Observer.Create(Stub<T>.Ignore, onError));

private static class Stub<T>
{
public static readonly Action<T> Ignore = static _ => { };
}
}

0 comments on commit 5bf069d

Please sign in to comment.