Commit b605a18

Feature: Multithread Stress Test for Parent Compare version of Cache-to-Cache MergeManyChangeSets (#807)

* Some progress, not done yet

* - Added stress test to Parent Compare version of MergeManyChangeSets
- Fixed deadlock / exception bug in operator
- Added Transform function for Cache ChangeSets (List already had one)
- Replaced Transform operator with direct transformation of changeset

* Code Tweaks
dwcullop committed Dec 21, 2023
1 parent 56eda45 commit b605a18
Showing 4 changed files with 154 additions and 15 deletions.
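
The stress test added in the first file below drives its load through helpers from DynamicData.Tests.Utilities (IntervalGenerate, Parallelize, StressAddRemove) that are not part of this commit. A rough, self-contained sketch of the same idea, as described by the commit bullets: mutate a SourceCache from a background task while repeatedly subscribing to and disposing a derived stream, treating a hang or unhandled exception as the failure signal. The string/int cache and the Filter here stand in for the MergeManyChangeSets chain under test; all names and counts are illustrative.

using System;
using System.Threading.Tasks;
using DynamicData;

// Illustrative stand-ins: a cache of strings keyed by their length.
using var cache = new SourceCache<string, int>(s => s.Length);
var derived = cache.Connect().Filter(s => s.Length % 2 == 0);

var mutating = true;
var mutator = Task.Run(() =>
{
    var random = new Random(0x10012022);
    for (var i = 0; i < 10_000; i++)
    {
        // Mostly add/update, occasionally clear, to keep the cache churning.
        var item = new string('x', random.Next(1, 50));
        if (random.Next(10) == 0)
        {
            cache.Clear();
        }
        else
        {
            cache.AddOrUpdate(item);
        }
    }
    mutating = false;
});

// Subscribe / unsubscribe over and over while the cache is being mutated.
while (mutating)
{
    using var subscription = derived.Subscribe();
    await Task.Yield();
}

await mutator;
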
@@ -1,7 +1,12 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive;
using System.Reactive.Linq;
using System.Threading.Tasks;
using Bogus;
using DynamicData.Kernel;
using DynamicData.Tests.Domain;
using DynamicData.Tests.Utilities;
@@ -29,15 +34,94 @@ public sealed class MergeManyChangeSetsCacheSourceCompareFixture : IDisposable
const decimal HighestPrice = BasePrice + PriceOffset + 1.0m;
const decimal LowestPrice = BasePrice - 1.0m;

private static readonly Random Random = new (0x10012022);

private static decimal GetRandomPrice() => MarketPrice.RandomPrice(Random, BasePrice, PriceOffset);

private readonly ISourceCache<IMarket, Guid> _marketCache = new SourceCache<IMarket, Guid>(p => p.Id);

private readonly ChangeSetAggregator<IMarket, Guid> _marketCacheResults;

public MergeManyChangeSetsCacheSourceCompareFixture() => _marketCacheResults = _marketCache.Connect().AsAggregator();
private readonly Faker<Market> _marketFaker;

private readonly Randomizer _randomizer;

public MergeManyChangeSetsCacheSourceCompareFixture()
{
_randomizer = new(0x10012022);
_marketFaker = Fakers.Market.RuleFor(m => m.Rating, faker => faker.Random.Double(0, 5)).WithSeed(_randomizer);
_marketCacheResults = _marketCache.Connect().AsAggregator();
}

[Theory]
#if DEBUG
[InlineData(5, 7)]
[InlineData(10, 50)]
#else
[InlineData(10, 1_000)]
[InlineData(100, 100)]
[InlineData(1_000, 10)]
#endif
public async Task MultiThreadedStressTest(int marketCount, int priceCount)
{
const int MaxItemId = 50;
var MaxAddTime = TimeSpan.FromSeconds(0.250);
var MaxRemoveTime = TimeSpan.FromSeconds(0.100);

TimeSpan? GetRemoveTime() => _randomizer.Bool() ? _randomizer.TimeSpan(MaxRemoveTime) : null;

IObservable<Unit> AddRemoveStress(int marketCount, int priceCount, int parallel, IScheduler scheduler) =>
Observable.Create<Unit>(observer => new CompositeDisposable
{
AddRemoveMarkets(marketCount, parallel, scheduler)
.Subscribe(
onNext: static _ => { },
onError: observer.OnError),
_marketCache.Connect()
.MergeMany(market => AddRemovePrices((Market)market, priceCount, parallel, scheduler))
.Subscribe(
onNext: static _ => { },
onError: observer.OnError,
onCompleted: observer.OnCompleted)
});

IObservable<IMarket> AddRemoveMarkets(int ownerCount, int parallel, IScheduler scheduler) =>
_marketFaker.IntervalGenerate(MaxAddTime, scheduler)
.Parallelize(ownerCount, parallel, obs => obs.StressAddRemove(_marketCache, _ => GetRemoveTime(), scheduler))
.Finally(_marketCache.Dispose);

IObservable<MarketPrice> AddRemovePrices(Market market, int priceCount, int parallel, IScheduler scheduler) =>
_randomizer.Interval(MaxAddTime, scheduler).Select(_ => market.CreatePrice(_randomizer.Number(MaxItemId), GetRandomPrice()))
.Parallelize(priceCount, parallel, obs => obs.StressAddRemove(market.PricesCache, _ => GetRemoveTime(), scheduler))
.Finally(market.PricesCache.Dispose);

var merged = _marketCache.Connect().MergeManyChangeSets(market => market.LatestPrices, Market.RatingCompare);
var adding = true;
using var priceResults = merged.AsAggregator();

// Start asynchronously modifying the parent list and the child lists
using var addingSub = AddRemoveStress(marketCount, priceCount, Environment.ProcessorCount, TaskPoolScheduler.Default)
.Finally(() => adding = false)
.Subscribe();

// Subscribe / unsubscribe over and over while the collections are being modified
do
{
// Ensure items are being added asynchronously before subscribing to changes
await Task.Yield();

{
// Subscribe
var mergedSub = merged.Subscribe();

// Let other threads run
await Task.Yield();

// Unsubscribe
mergedSub.Dispose();
}
}
while (adding);

// Verify the results
CheckResultContents(_marketCacheResults, priceResults, Market.RatingCompare);
}

[Fact]
public void NullChecks()
@@ -985,10 +1069,32 @@ public void Dispose()
DisposeMarkets();
}

private void CheckResultContents(ChangeSetAggregator<IMarket, Guid> marketResults, ChangeSetAggregator<MarketPrice, int> priceResults, IComparer<IMarket> comparer)
{
var expectedMarkets = _marketCache.Items.ToList();

// These should be subsets of each other
expectedMarkets.Should().BeSubsetOf(marketResults.Data.Items);
marketResults.Data.Items.Count().Should().Be(expectedMarkets.Count);

// Pair up all the Markets/Prices, Group them by ItemId, and sort each Group by the Market comparer
// Then pull out the first value from each group, which should be the price from the best market for each ItemId
var expectedPrices = expectedMarkets.Select(m => (Market)m).SelectMany(m => m.PricesCache.Items.Select(mp => (Market: m, MarketPrice: mp)))
.GroupBy(tuple => tuple.MarketPrice.ItemId)
.Select(group => group.OrderBy(tuple => tuple.Market, comparer).Select(tuple => tuple.MarketPrice).First())
.ToList();

// These should be subsets of each other
expectedPrices.Should().BeSubsetOf(priceResults.Data.Items);
priceResults.Data.Items.Count().Should().Be(expectedPrices.Count);
}

private void DisposeMarkets()
{
_marketCache.Items.ForEach(m => (m as IDisposable)?.Dispose());
_marketCache.Dispose();
_marketCache.Clear();
}

private decimal GetRandomPrice() => MarketPrice.RandomPrice(_randomizer, BasePrice, PriceOffset);
}
2 changes: 1 addition & 1 deletion src/DynamicData.Tests/Domain/Fakers.cs
@@ -45,7 +45,7 @@ internal static class Fakers

public static Faker<AnimalOwner> AnimalOwnerWithAnimals { get; } = AnimalOwner.Clone().WithInitialAnimals(Animal);

public static Faker<Market> Market { get; } = new Faker<Market>().CustomInstantiator(faker => new Market($"{faker.Commerce.ProductName()} Market Id#{faker.Random.AlphaNumeric(5)}"));
public static Faker<Market> Market { get; } = new Faker<Market>().CustomInstantiator(faker => new Market($"{faker.Commerce.ProductName()} Id#{faker.Random.AlphaNumeric(5)}"));

public static Faker<AnimalOwner> WithInitialAnimals(this Faker<AnimalOwner> existing, Faker<Animal> animalFaker, int minCount, int maxCount) =>
existing.FinishWith((faker, owner) => owner.Animals.AddRange(animalFaker.GenerateLazy(faker.Random.Number(minCount, maxCount))));
30 changes: 30 additions & 0 deletions src/DynamicData/Cache/CacheChangeSetEx.cs
@@ -2,6 +2,8 @@
// Roland Pheasant licenses this file to you under the MIT license.
// See the LICENSE file in the project root for full license information.

using DynamicData.Kernel;

namespace DynamicData.Cache;

internal static class CacheChangeSetEx
@@ -24,4 +26,32 @@ internal static class CacheChangeSetEx
where TObject : notnull
where TKey : notnull =>
changeSet as ChangeSet<TObject, TKey> ?? throw new NotSupportedException("Dynamic Data does not support a custom implementation of IChangeSet");

/// <summary>
/// Transforms the change set into a different type using the specified transform function.
/// </summary>
/// <typeparam name="TSource">The type of the source.</typeparam>
/// <typeparam name="TDestination">The type of the destination.</typeparam>
/// <typeparam name="TKey">The type of the Key.</typeparam>
/// <param name="source">The source.</param>
/// <param name="transformer">The transformer.</param>
/// <returns>The change set.</returns>
/// <exception cref="ArgumentNullException">
/// source
/// or
/// transformer.
/// </exception>
public static IChangeSet<TDestination, TKey> Transform<TSource, TDestination, TKey>(this IChangeSet<TSource, TKey> source, Func<TSource, TDestination> transformer)
where TSource : notnull
where TDestination : notnull
where TKey : notnull
{
source.ThrowArgumentNullExceptionIfNull(nameof(source));
transformer.ThrowArgumentNullExceptionIfNull(nameof(transformer));

var changes = source.Select(change =>
new Change<TDestination, TKey>(change.Reason, change.Key, transformer(change.Current), change.Previous.Convert(transformer), change.CurrentIndex, change.PreviousIndex));

return new ChangeSet<TDestination, TKey>(changes);
}
}
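
Because CacheChangeSetEx is internal, the new Transform is only callable from inside DynamicData; the MergeManyCacheChangeSetsSourceCompare operator in the last file of this commit uses it through a Select. As a rough usage sketch under that assumption, with string values keyed by int standing in for real domain types:

using DynamicData;
using DynamicData.Cache;
using DynamicData.Kernel;

// Build a small cache changeset by hand (illustrative values only).
var source = new ChangeSet<string, int>
{
    new Change<string, int>(ChangeReason.Add, 1, "alpha"),
    new Change<string, int>(ChangeReason.Update, 2, "beta", Optional.Some("b")),
};

// Each Change is rewritten eagerly: Current (and Previous, when present) go through
// the transformer, while Reason, Key, and the indexes are carried over unchanged.
IChangeSet<int, int> lengths = source.Transform(value => value.Length);

This eager, per-changeset shape is what lets the operator below replace a full Transform operator subscription with a plain Select over each emitted changeset.
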
@@ -19,7 +19,7 @@ internal sealed class MergeManyCacheChangeSetsSourceCompare<TObject, TKey, TDest
{
private readonly Func<TObject, TKey, IObservable<IChangeSet<ParentChildEntry, TDestinationKey>>> _changeSetSelector = (obj, key) => selector(obj, key).Transform(dest => new ParentChildEntry(obj, dest));

private readonly IComparer<ParentChildEntry>? _comparer = (childCompare is null) ? new ParentOnlyCompare(parentCompare) : new ParentChildCompare(parentCompare, childCompare);
private readonly IComparer<ParentChildEntry> _comparer = (childCompare is null) ? new ParentOnlyCompare(parentCompare) : new ParentChildCompare(parentCompare, childCompare);

private readonly IEqualityComparer<ParentChildEntry>? _equalityComparer = (equalityComparer != null) ? new ParentChildEqualityCompare(equalityComparer) : null;

@@ -30,37 +30,40 @@ internal sealed class MergeManyCacheChangeSetsSourceCompare<TObject, TKey, TDest
// Transform to an observable cache of merge containers.
var sourceCacheOfCaches = source
.Transform((obj, key) => new ChangeSetCache<ParentChildEntry, TDestinationKey>(_changeSetSelector(obj, key)))
.Synchronize(locker)
.Transform((obj, key) => new ChangeSetCache<ParentChildEntry, TDestinationKey>(_changeSetSelector(obj, key).Synchronize(locker)))
.AsObservableCache();
// Share a single connection to the cache
var shared = sourceCacheOfCaches.Connect().Publish();
// this is manages all of the changes
// This manages all of the changes
var changeTracker = new ChangeSetMergeTracker<ParentChildEntry, TDestinationKey>(() => sourceCacheOfCaches.Items, _comparer, _equalityComparer);
// merge the items back together
// Merge the child changeset changes together and apply to the tracker
var allChanges = shared.MergeMany(mc => mc.Source)
.Synchronize(locker)
.Subscribe(
changes => changeTracker.ProcessChangeSet(changes, observer),
observer.OnError,
observer.OnCompleted);
// when a source item is removed, all of its sub-items need to be removed
// When a source item is removed, all of its sub-items need to be removed
var removedItems = shared
.Synchronize(locker)
.OnItemRemoved(mc => changeTracker.RemoveItems(mc.Cache.KeyValues, observer))
.OnItemUpdated((_, prev) => changeTracker.RemoveItems(prev.Cache.KeyValues, observer))
.Subscribe();
// If requested, when the source sees a refresh event, re-evaluate all the keys associated with that source because the priority may have changed
// Because the comparison is based on the parent, which has just been refreshed.
var refreshItems = reevalOnRefresh
? shared.OnItemRefreshed(mc => changeTracker.RefreshItems(mc.Cache.Keys, observer)).Subscribe()
? shared
.Synchronize(locker)
.OnItemRefreshed(mc => changeTracker.RefreshItems(mc.Cache.Keys, observer))
.Subscribe()
: Disposable.Empty;
return new CompositeDisposable(sourceCacheOfCaches, allChanges, removedItems, refreshItems, shared.Connect());
}).Transform(entry => entry.Child);
}).Select(changes => changes.Transform(entry => entry.Child));

private sealed class ParentChildEntry(TObject parent, TDestination child)
{

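The deadlock fix in the file above hinges on routing every stream that touches the shared change tracker through the same lock: the inner changeset observables are synchronized inside the selector, and the removal and refresh paths on the shared connection are synchronized on the same locker. A minimal Rx sketch of that idea, with Interval tickers standing in for the parent and child streams and a plain List for the shared state; it assumes nothing about the operator internals beyond what the diff shows.

using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading.Tasks;

var gate = new object();
var sharedLog = new List<string>();   // not thread-safe on its own

IObservable<string> Ticker(string name) =>
    Observable.Interval(TimeSpan.FromMilliseconds(1), TaskPoolScheduler.Default)
              .Take(500)
              .Select(i => $"{name}:{i}");

// Without a common gate these two subscriptions could call Add concurrently from
// different task-pool threads; synchronizing both on the same object serializes
// every notification that reaches the shared list.
using var parents = Ticker("parent").Synchronize(gate).Subscribe(sharedLog.Add);
using var children = Ticker("child").Synchronize(gate).Subscribe(sharedLog.Add);

await Task.Delay(TimeSpan.FromSeconds(2));
Console.WriteLine(sharedLog.Count);

In the operator itself, the same role is played by the single locker shared between the child changeset streams, the removal path, and the refresh path, which is what the added Synchronize calls achieve.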