Skip to content

Commit

Permalink
Feature: Stress Tests for Cache-to-Cache MergeManyChangeSets (#802)
Browse files Browse the repository at this point in the history
* Stress Tests for the Cache-Cache Version

* Test tuning

* Improve Stress for Cache-Cache version

* Cache-to-Cache Improvements
  • Loading branch information
dwcullop committed Dec 19, 2023
1 parent 13bd8c6 commit 5748318
Show file tree
Hide file tree
Showing 5 changed files with 158 additions and 25 deletions.
120 changes: 111 additions & 9 deletions src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheFixture.cs
@@ -1,7 +1,13 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
using Bogus;
using DynamicData.Kernel;
using DynamicData.Tests.Domain;
using DynamicData.Tests.Utilities;
Expand Down Expand Up @@ -29,16 +35,94 @@ public sealed class MergeManyChangeSetsCacheFixture : IDisposable
const decimal HighestPrice = BasePrice + PriceOffset + 1.0m;
const decimal LowestPrice = BasePrice - 1.0m;

private static readonly Random Random = new (0x21123737);

private static decimal GetRandomPrice() => MarketPrice.RandomPrice(Random, BasePrice, PriceOffset);

private readonly ISourceCache<IMarket, Guid> _marketCache = new SourceCache<IMarket, Guid>(p => p.Id);

private readonly ChangeSetAggregator<IMarket, Guid> _marketCacheResults;

private readonly Randomizer _randomizer = new (0x21123737);

public MergeManyChangeSetsCacheFixture() => _marketCacheResults = _marketCache.Connect().AsAggregator();

[Theory]
[InlineData(5, 7)]
[InlineData(10, 50)]
[InlineData(5, 100)]
[InlineData(200, 500)]
[InlineData(100, 5)]
public async Task MultiThreadedStressTest(int marketCount, int priceCount)
{
var MaxAddTime = TimeSpan.FromSeconds(0.250);
var MaxRemoveTime = TimeSpan.FromSeconds(0.100);

TimeSpan? GetRemoveTime() => _randomizer.Bool() ? _randomizer.TimeSpan(MaxRemoveTime) : null;

IObservable<Unit> AddRemoveStress(int marketCount, int priceCount, int parallel, IScheduler scheduler) =>
Observable.Create<Unit>(observer => new CompositeDisposable
{
AddRemoveMarkets(marketCount, parallel, scheduler)
.Subscribe(
onNext: _ => { },
onError: ex => observer.OnError(ex)),
_marketCache.Connect()
.MergeMany(market => AddRemovePrices((Market)market, priceCount, parallel, scheduler))
.Subscribe(
onNext: _ => { },
onError: ex => observer.OnError(ex),
onCompleted: () =>
{
observer.OnNext(Unit.Default);
observer.OnCompleted();
})
});

IObservable<IMarket> AddRemoveMarkets(int ownerCount, int parallel, IScheduler scheduler) =>
_randomizer.Interval(MaxAddTime, scheduler).Select(_ => new Market(_randomizer.Utf16String(5, 10, true)))
//.Parallelize(ownerCount, parallel, obs => obs.StressAddRemove(_marketCache, _ => GetRemoveTime(), scheduler))
.Take(ownerCount)
.StressAddRemove(_marketCache, _ => GetRemoveTime(), scheduler)
.Finally(_marketCache.Dispose);

IObservable<MarketPrice> AddRemovePrices(Market market, int priceCount, int parallel, IScheduler scheduler) =>
_randomizer.Interval(MaxAddTime, scheduler).Select(_ => market.CreateUniquePrice(_ => GetRandomPrice()))
//.Parallelize(animalCount, parallel, obs => obs.StressAddRemove(owner.Animals, _ => GetRemoveTime(), scheduler))
.Take(priceCount)
.StressAddRemove(market.PricesCache, _ => GetRemoveTime(), scheduler)
.Finally(market.PricesCache.Dispose);

var merged = _marketCache.Connect().MergeManyChangeSets(market => market.LatestPrices);
using var priceResults = merged.AsAggregator();

var adding = true;

// Start asynchrononously modifying the parent list and the child lists
using var addingSub = AddRemoveStress(marketCount, priceCount, Environment.ProcessorCount, TaskPoolScheduler.Default)
.Finally(() => adding = false)
.Subscribe();

// Subscribe / unsubscribe over and over while the collections are being modified
do
{
// Ensure items are being added asynchronously before subscribing to changes
await Task.Yield();

{
// Subscribe
var mergedSub = merged.Subscribe();

// Let other threads run
await Task.Yield();

// Unsubscribe
mergedSub.Dispose();
}
}
while (adding);

// Verify the results
CheckResultContents(_marketCacheResults, priceResults);
}

[Fact]
public void NullChecks()
{
Expand Down Expand Up @@ -136,7 +220,7 @@ public void AllExistingSubItemsPresentInResult()
// having
var markets = Enumerable.Range(0, MarketCount).Select(n => new Market(n)).ToArray();
using var results = _marketCache.Connect().MergeManyChangeSets(m => m.LatestPrices, MarketPrice.EqualityComparer).AsAggregator();
markets.Select((m, index) => new { Market = m, Index = index }).ForEach(m => m.Market.SetPrices(m.Index * ItemIdStride, (m.Index * ItemIdStride) + PricesPerMarket, GetRandomPrice));
AddUniquePrices(markets);

// when
_marketCache.AddOrUpdate(markets);
Expand All @@ -160,7 +244,7 @@ public void AllNewSubItemsPresentInResult()
_marketCache.AddOrUpdate(markets);

// when
markets.Select((m, index) => new { Market = m, Index = index }).ForEach(m => m.Market.SetPrices(m.Index * ItemIdStride, (m.Index * ItemIdStride) + PricesPerMarket, GetRandomPrice));
AddUniquePrices(markets);

// then
_marketCacheResults.Data.Count.Should().Be(MarketCount);
Expand All @@ -179,7 +263,7 @@ public void AllRefreshedSubItemsAreRefreshed()
var markets = Enumerable.Range(0, MarketCount).Select(n => new Market(n)).ToArray();
using var results = _marketCache.Connect().MergeManyChangeSets(m => m.LatestPrices, MarketPrice.EqualityComparer).AsAggregator();
_marketCache.AddOrUpdate(markets);
markets.Select((m, index) => new { Market = m, Index = index }).ForEach(m => m.Market.SetPrices(m.Index * ItemIdStride, (m.Index * ItemIdStride) + PricesPerMarket, GetRandomPrice));
AddUniquePrices(markets);

// when
markets.ForEach(m => m.RefreshAllPrices(GetRandomPrice));
Expand Down Expand Up @@ -285,7 +369,7 @@ public void AnyRemovedSubItemIsRemoved()
var markets = Enumerable.Range(0, MarketCount).Select(n => new Market(n)).ToArray();
using var results = _marketCache.Connect().MergeManyChangeSets(m => m.LatestPrices, MarketPrice.EqualityComparer).AsAggregator();
_marketCache.AddOrUpdate(markets);
markets.Select((m, index) => new { Market = m, Index = index }).ForEach(m => m.Market.SetPrices(m.Index * ItemIdStride, (m.Index * ItemIdStride) + PricesPerMarket, GetRandomPrice));
AddUniquePrices(markets);

// when
markets.ForEach(m => m.PricesCache.Edit(updater => updater.RemoveKeys(updater.Keys.Take(RemoveCount))));
Expand All @@ -306,7 +390,7 @@ public void AnySourceItemRemovedRemovesAllSourceValues()
var markets = Enumerable.Range(0, MarketCount).Select(n => new Market(n)).ToArray();
using var results = _marketCache.Connect().MergeManyChangeSets(m => m.LatestPrices, MarketPrice.EqualityComparer).AsAggregator();
_marketCache.AddOrUpdate(markets);
markets.Select((m, index) => new { Market = m, Index = index }).ForEach(m => m.Market.SetPrices(m.Index * ItemIdStride, (m.Index * ItemIdStride) + PricesPerMarket, GetRandomPrice));
AddUniquePrices(markets);

// when
_marketCache.Edit(updater => updater.RemoveKeys(updater.Keys.Take(RemoveCount)));
Expand Down Expand Up @@ -691,10 +775,28 @@ public void Dispose()
DisposeMarkets();
}

private void AddUniquePrices(Market[] markets) => markets.ForEach(m => m.AddUniquePrices(PricesPerMarket, _ => GetRandomPrice()));

private void CheckResultContents(ChangeSetAggregator<IMarket, Guid> marketResults, ChangeSetAggregator<MarketPrice, int> priceResults)
{
var expectedMarkets = _marketCache.Items.ToList();
var expectedPrices = expectedMarkets.SelectMany(market => ((Market)market).PricesCache.Items).ToList();

// These should be subsets of each other
expectedMarkets.Should().BeSubsetOf(marketResults.Data.Items);
marketResults.Data.Items.Count().Should().Be(expectedMarkets.Count);

// These should be subsets of each other
expectedPrices.Should().BeSubsetOf(priceResults.Data.Items);
priceResults.Data.Items.Count().Should().Be(expectedPrices.Count);
}

private void DisposeMarkets()
{
_marketCache.Items.ForEach(m => (m as IDisposable)?.Dispose());
_marketCache.Dispose();
_marketCache.Clear();
}

private decimal GetRandomPrice() => MarketPrice.RandomPrice(_randomizer, BasePrice, PriceOffset);
}
2 changes: 2 additions & 0 deletions src/DynamicData.Tests/Domain/Fakers.cs
Expand Up @@ -45,6 +45,8 @@ internal static class Fakers

public static Faker<AnimalOwner> AnimalOwnerWithAnimals { get; } = AnimalOwner.Clone().WithInitialAnimals(Animal);

public static Faker<Market> Market { get; } = new Faker<Market>().CustomInstantiator(faker => new Market($"{faker.Commerce.ProductName()} Market Id#{faker.Random.AlphaNumeric(5)}"));

public static Faker<AnimalOwner> WithInitialAnimals(this Faker<AnimalOwner> existing, Faker<Animal> animalFaker, int minCount, int maxCount) =>
existing.FinishWith((faker, owner) => owner.Animals.AddRange(animalFaker.GenerateLazy(faker.Random.Number(minCount, maxCount))));

Expand Down
21 changes: 21 additions & 0 deletions src/DynamicData.Tests/Domain/Market.cs
Expand Up @@ -3,6 +3,7 @@
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Reactive.Linq;
using System.Threading;
using DynamicData.Kernel;
using DynamicData.Tests.Utilities;

Expand All @@ -21,6 +22,8 @@ internal interface IMarket

internal sealed class Market : IMarket, IDisposable
{
private static int s_UniquePriceId;

private readonly ISourceCache<MarketPrice, int> _latestPrices = new SourceCache<MarketPrice, int>(p => p.ItemId);

public static IComparer<IMarket> RatingCompare { get; } = new RatingComparer();
Expand All @@ -39,6 +42,10 @@ public Market(int name) : this($"Market #{name}", Guid.NewGuid())
{
}

public Market(string name) : this(name, Guid.NewGuid())
{
}

public string Name { get; }

public Guid Id { get; }
Expand All @@ -51,6 +58,12 @@ public Market(int name) : this($"Market #{name}", Guid.NewGuid())

public MarketPrice CreatePrice(int itemId, decimal price) => new(itemId, price, Id);

public MarketPrice CreateUniquePrice(Func<int, decimal> getPrice)
{
var id = Interlocked.Increment(ref s_UniquePriceId);
return CreatePrice(id, getPrice(id));
}

public Market AddRandomIdPrices(Random r, int count, int minId, int maxId, Func<decimal> randPrices)
{
_latestPrices.AddOrUpdate(Enumerable.Range(0, int.MaxValue).Select(_ => r.Next(minId, maxId)).Distinct().Take(count).Select(id => CreatePrice(id, randPrices())));
Expand Down Expand Up @@ -95,10 +108,18 @@ public Market AddRandomIdPrices(Random r, int count, int minId, int maxId, Func<

public Market SetPrices(int minId, int maxId, decimal newPrice) => SetPrices(minId, maxId, _ => newPrice);

public Market SetPrice(int id, Func<decimal> getPrice) => this.With(_ => _latestPrices.AddOrUpdate(CreatePrice(id, getPrice())));

public Market AddUniquePrices(int count, Func<int, decimal> getPrice) =>
this.With(_ => _latestPrices.AddOrUpdate(CreateUniquePrices(count, getPrice)));

public void Dispose() => _latestPrices.Dispose();

public override string ToString() => $"Market '{Name}' [{Id}] (Rating: {Rating})";

private IEnumerable<MarketPrice> CreateUniquePrices(int count, Func<int, decimal> getPrice) =>
Enumerable.Range(0, count).Select(_ => CreateUniquePrice(getPrice));

private class RatingComparer : IComparer<IMarket>
{
public int Compare([DisallowNull] IMarket x, [DisallowNull] IMarket y) =>
Expand Down
19 changes: 15 additions & 4 deletions src/DynamicData.Tests/Domain/MarketPrice.cs
Expand Up @@ -2,6 +2,7 @@
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using Bogus;

namespace DynamicData.Tests.Domain;

Expand Down Expand Up @@ -42,18 +43,28 @@ public decimal Price

public static decimal RandomPrice(Random r, decimal basePrice, decimal offset) => basePrice + (decimal)r.NextDouble() * offset;

public static decimal RandomPrice(Randomizer r, decimal basePrice, decimal offset) => r.Decimal(basePrice, basePrice + offset);

public override bool Equals(object? obj) => obj is MarketPrice price && Price == price.Price && TimeStamp.Equals(price.TimeStamp) && MarketId.Equals(price.MarketId) && ItemId == price.ItemId;

public override int GetHashCode() => HashCode.Combine(Price, TimeStamp, MarketId, ItemId);

public static bool operator ==(MarketPrice? left, MarketPrice? right) => EqualityComparer<MarketPrice>.Default.Equals(left, right);

public static bool operator !=(MarketPrice? left, MarketPrice? right) => !(left == right);

private class CurrentPriceEqualityComparer : IEqualityComparer<MarketPrice>
{
public virtual bool Equals([DisallowNull] MarketPrice x, [DisallowNull] MarketPrice y) => x.MarketId.Equals(x.MarketId) && x.ItemId == y.ItemId && x.Price == y.Price;
public int GetHashCode([DisallowNull] MarketPrice obj) => throw new NotImplementedException();
}

private class TimeStampPriceEqualityComparer : CurrentPriceEqualityComparer, IEqualityComparer<MarketPrice>
private sealed class TimeStampPriceEqualityComparer : CurrentPriceEqualityComparer, IEqualityComparer<MarketPrice>
{
public override bool Equals([DisallowNull] MarketPrice x, [DisallowNull] MarketPrice y) => base.Equals(x, y) && x.TimeStamp == y.TimeStamp;
}

private class LowestPriceComparer : IComparer<MarketPrice>
private sealed class LowestPriceComparer : IComparer<MarketPrice>
{
public int Compare([DisallowNull] MarketPrice x, [DisallowNull] MarketPrice y)
{
Expand All @@ -62,7 +73,7 @@ public int Compare([DisallowNull] MarketPrice x, [DisallowNull] MarketPrice y)
}
}

private class HighestPriceComparer : IComparer<MarketPrice>
private sealed class HighestPriceComparer : IComparer<MarketPrice>
{
public int Compare([DisallowNull] MarketPrice x, [DisallowNull] MarketPrice y)
{
Expand All @@ -71,7 +82,7 @@ public int Compare([DisallowNull] MarketPrice x, [DisallowNull] MarketPrice y)
}
}

private class LatestPriceComparer : IComparer<MarketPrice>
private sealed class LatestPriceComparer : IComparer<MarketPrice>
{
public int Compare([DisallowNull] MarketPrice x, [DisallowNull] MarketPrice y)
{
Expand Down
21 changes: 9 additions & 12 deletions src/DynamicData/Cache/Internal/MergeManyCacheChangeSets.cs
Expand Up @@ -21,29 +21,26 @@ internal sealed class MergeManyCacheChangeSets<TObject, TKey, TDestination, TDes
{
var locker = new object();
// Transform to an observable cache of merge containers.
// Transform to an observable changeset of cached changesets
var sourceCacheOfCaches = source
.IgnoreSameReferenceUpdate()
.WhereReasonsAre(ChangeReason.Add, ChangeReason.Remove, ChangeReason.Update)
.Transform((obj, key) => new ChangeSetCache<TDestination, TDestinationKey>(selector(obj, key)))
.Synchronize(locker)
.AsObservableCache();
.Transform((obj, key) => new ChangeSetCache<TDestination, TDestinationKey>(selector(obj, key).Synchronize(locker)))
.AsObservableCache();
var shared = sourceCacheOfCaches.Connect().Publish();
// this is manages all of the changes
// This is manages all of the changes
var changeTracker = new ChangeSetMergeTracker<TDestination, TDestinationKey>(() => sourceCacheOfCaches.Items, comparer, equalityComparer);
// merge the items back together
var shared = sourceCacheOfCaches.Connect().Publish();
// Merge the child changeset changes together and apply to the tracker
var allChanges = shared.MergeMany(mc => mc.Source)
.Synchronize(locker)
.Subscribe(
changes => changeTracker.ProcessChangeSet(changes, observer),
observer.OnError,
observer.OnCompleted);
// when a source item is removed, all of its sub-items need to be removed
// When a source item is removed, all of its sub-items need to be removed
var removedItems = shared
.Synchronize(locker)
.OnItemRemoved(mc => changeTracker.RemoveItems(mc.Cache.KeyValues, observer))
.OnItemUpdated((_, prev) => changeTracker.RemoveItems(prev.Cache.KeyValues, observer))
.Subscribe();
Expand Down

0 comments on commit 5748318

Please sign in to comment.