Skip to content

Commit

Permalink
Add stress tests from Cache-List to List-List (#800)
Browse files Browse the repository at this point in the history
  • Loading branch information
dwcullop committed Dec 18, 2023
1 parent 49ad041 commit 13bd8c6
Show file tree
Hide file tree
Showing 9 changed files with 211 additions and 164 deletions.
198 changes: 62 additions & 136 deletions src/DynamicData.Tests/Cache/MergeManyChangeSetsListFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ public sealed class MergeManyChangeSetsListFixture : IDisposable
const int AddRangeSize = 53;
const int RemoveRangeSize = 37;
#endif
private static readonly TimeSpan s_MaxAddTime = TimeSpan.FromSeconds(0.010);
private static readonly TimeSpan s_MaxRemoveTime = TimeSpan.FromSeconds(5.0);
private static readonly TimeSpan s_MaxAddTime = TimeSpan.FromSeconds(0.250);
private static readonly TimeSpan s_MaxRemoveTime = TimeSpan.FromSeconds(0.100);

private readonly ISourceCache<AnimalOwner, Guid> _animalOwners = new SourceCache<AnimalOwner, Guid>(o => o.Id);
private readonly ChangeSetAggregator<AnimalOwner, Guid> _animalOwnerResults;
Expand All @@ -56,111 +56,73 @@ public MergeManyChangeSetsListFixture()
[InlineData(10, 1_000)]
[InlineData(200, 500)]
[InlineData(1_000, 10)]
public async Task MultiThreadedStressTest(int ownerCount, int animalCount) =>
_ = await AddRemoveAnimalsStress(ownerCount, animalCount, TaskPoolScheduler.Default)
.Finally(CheckResultContents);

[Theory]
[InlineData(5, 7)]
[InlineData(10, 50)]
[InlineData(10, 1_000)]
[InlineData(200, 500)]
[InlineData(1_000, 10)]
public void MultiThreadedExplicitChangeSetStressTest(int ownerCount, int animalCount)
public async Task MultiThreadedStressTest(int ownerCount, int animalCount)
{
IScheduler testingScheduler = TaskPoolScheduler.Default;

IObservable<IChangeSet<Animal>> AddMoreAnimals(AnimalOwner owner, int count, int parallel, IScheduler scheduler) =>
Observable.Create<IChangeSet<Animal>>(observer =>
{
var locker = new object();
var MaxAddTime = TimeSpan.FromSeconds(0.250);
var MaxRemoveTime = TimeSpan.FromSeconds(0.100);

// Forward OnNext only
var ownerSub = owner.Animals.Connect().Synchronize(locker).Subscribe(observer.OnNext);
TimeSpan? GetRemoveTime() => _randomizer.Bool() ? _randomizer.TimeSpan(MaxRemoveTime) : null;

// Forward All Rx Events to Observer
var animalSub = GenerateAnimals(scheduler)
.Take(count / parallel)
.StressAddRemoveExplicit(parallel, _ => NextRemoveTime(), scheduler)
.Synchronize(locker)
.Subscribe(observer);
IObservable<Unit> AddRemoveAnimalsStress(int ownerCount, int animalCount, int parallel, IScheduler scheduler) =>
Observable.Create<Unit>(observer => new CompositeDisposable
{
AddRemoveOwners(ownerCount, parallel, scheduler)
.Subscribe(
onNext: static _ => { },
onError: ex => observer.OnError(ex)),
return new CompositeDisposable(ownerSub, animalSub);
_animalOwners.Connect()
.MergeMany(owner => AddRemoveAnimals(owner, animalCount, parallel, scheduler))
.Subscribe(
onNext: static _ => { },
onError: ex => observer.OnError(ex),
onCompleted: () =>
{
observer.OnNext(Unit.Default);
observer.OnCompleted();
})
});

// Arrange
var merged = _animalOwners.Connect().MergeManyChangeSets(owner => AddMoreAnimals(owner, animalCount, 5, testingScheduler));
var populateOwners = Observable.Interval(TimeSpan.FromMilliseconds(1), testingScheduler)
.Select(_ => _animalOwnerFaker.Generate())
.Take(ownerCount)
.Do(owner => _animalOwners.AddOrUpdate(owner), _animalOwners.Dispose);

// Act
using var subOwners = populateOwners.Subscribe();
using var mergedResults = merged.AsAggregator();
while (!mergedResults.IsCompleted)
{
Thread.Sleep(100);
}

// Assert
mergedResults.Data.Count.Should().Be(_animalOwners.Items.Sum(owner => owner.Animals.Count));
CheckResultContents();
}

[Theory]
[InlineData(5, 7)]
[InlineData(5, 200)]
[InlineData(10, 100)]
[InlineData(20, 50)]
[InlineData(100, 10)]
public void NoDeadlockOrExceptionIfSubscribeDuringModify(int ownerCount, int animalCount)
{
// Not used so don't let it waste time
_animalResults.Dispose();

// Arrange
Func<Task> CreateTest(IScheduler sch, int owners, int animals) =>
async () =>
{
var mergeAnimals = _animalOwners.Connect().MergeManyChangeSets(owner => owner.Animals.Connect());
IObservable<AnimalOwner> AddRemoveOwners(int ownerCount, int parallel, IScheduler scheduler) =>
_animalOwnerFaker.IntervalGenerate(_randomizer, MaxAddTime, scheduler)
.Parallelize(ownerCount, parallel, obs => obs.StressAddRemove(_animalOwners, _ => GetRemoveTime(), scheduler))
.Finally(_animalOwners.Dispose);

var addingAnimals = true;
IObservable<Animal> AddRemoveAnimals(AnimalOwner owner, int animalCount, int parallel, IScheduler scheduler) =>
_animalFaker.IntervalGenerate(_randomizer, MaxAddTime, scheduler)
.Parallelize(animalCount, parallel, obs => obs.StressAddRemove(owner.Animals, _ => GetRemoveTime(), scheduler))
.Finally(owner.Animals.Dispose);

using var addOwners = GenerateOwners(sch)
.Take(owners)
.StressAddRemove(_animalOwners, _ => GetRemoveTime(), sch)
.Finally(() => _animalOwners.Dispose())
.Subscribe();
var mergeAnimals = _animalOwners.Connect().MergeManyChangeSets(owner => owner.Animals.Connect());

using var addAnimals = _animalOwners.Connect()
.MergeMany(owner => AddRemoveAnimals(owner, sch, animals))
.Finally(() => addingAnimals = false)
.Subscribe();
var addingAnimals = true;

do
{
// Ensure items are being added asynchronously before subscribing to the animal changes
await Task.Yield();
// Start asynchrononously modifying the parent list and the child lists
using var addAnimals = AddRemoveAnimalsStress(ownerCount, animalCount, Environment.ProcessorCount, TaskPoolScheduler.Default)
.Finally(() => addingAnimals = false)
.Subscribe();

{
// Subscribe
var mergedSub = mergeAnimals.Subscribe();
// Subscribe / unsubscribe over and over while the collections are being modified
do
{
// Ensure items are being added asynchronously before subscribing to the animal changes
await Task.Yield();

// Let other threads run
await Task.Yield();
{
// Subscribe
var mergedSub = mergeAnimals.Subscribe();

// Unsubscribe
mergedSub.Dispose();
}
}
while (addingAnimals);
};
// Let other threads run
await Task.Yield();

// Act
// Unsubscribe
mergedSub.Dispose();
}
}
while (addingAnimals);

// Assert
CreateTest(TaskPoolScheduler.Default, ownerCount, animalCount).Should().NotThrowAsync();
// Verify the results
CheckResultContents();
}

[Fact]
Expand Down Expand Up @@ -488,41 +450,6 @@ public void Dispose()
_animalOwners.Dispose();
}

private IObservable<Unit> AddRemoveAnimalsStress(int ownerCount, int animalCount, IScheduler scheduler) =>
Observable.Create<Unit>(observer => new CompositeDisposable
{
GenerateOwners(scheduler)
.Take(ownerCount)
.StressAddRemove(_animalOwners, _ => GetRemoveTime(), scheduler)
.Finally(() => _animalOwners.Dispose())
.Subscribe(
onNext: _ => { },
onError: ex => observer.OnError(ex)),
_animalOwners.Connect()
.MergeMany(owner => AddRemoveAnimals(owner, scheduler, animalCount))
.Subscribe(
onNext: _ => { },
onError: ex => observer.OnError(ex),
onCompleted: () =>
{
observer.OnNext(Unit.Default);
observer.OnCompleted();
})
});

private IObservable<Animal> AddRemoveAnimals(AnimalOwner owner, IScheduler sch, int addCount) =>
GenerateAnimals(sch)
.Take(addCount)
.StressAddRemove(owner.Animals, _ => GetRemoveTime(), sch)
.Finally(owner.Animals.Dispose);

private IObservable<AnimalOwner> GenerateOwners(IScheduler scheduler) =>
_randomizer.Interval(s_MaxAddTime, scheduler).Select(_ => _animalOwnerFaker.Generate());

private IObservable<Animal> GenerateAnimals(IScheduler scheduler) =>
_randomizer.Interval(s_MaxAddTime, scheduler).Select(_ => _animalFaker.Generate());

private AnimalOwner CreateWithSameId(AnimalOwner original)
{
var newOwner = _animalOwnerFaker.Generate();
Expand All @@ -536,19 +463,18 @@ private AnimalOwner CreateWithSameId(AnimalOwner original)
private static void CheckResultContents(IEnumerable<AnimalOwner> owners, ChangeSetAggregator<AnimalOwner, Guid> ownerResults, ChangeSetAggregator<Animal> animalResults)
{
var expectedOwners = owners.ToList();
var expectedAnimals = expectedOwners.SelectMany(owner => owner.Animals.Items).ToList();

// These should be subsets of each other
expectedOwners.Should().BeSubsetOf(ownerResults.Data.Items);
ownerResults.Data.Items.Should().BeSubsetOf(expectedOwners);
ownerResults.Data.Items.Count().Should().Be(expectedOwners.Count);

// These should be subsets of each other
expectedAnimals.Should().BeSubsetOf(animalResults.Data.Items);
animalResults.Data.Items.Should().BeSubsetOf(expectedAnimals);
animalResults.Data.Items.Count().Should().Be(expectedAnimals.Count);
}
// All owner animals should be in the results
foreach (var owner in owners)
{
owner.Animals.Items.Should().BeSubsetOf(animalResults.Data.Items);
}

private TimeSpan? GetRemoveTime() => _randomizer.Bool() ? NextRemoveTime() : null;
private TimeSpan NextRemoveTime() => _randomizer.TimeSpan(s_MaxRemoveTime);
// Results should not have more than the total number of animals
animalResults.Data.Count.Should().Be(owners.Sum(owner => owner.Animals.Count));
}
}
105 changes: 96 additions & 9 deletions src/DynamicData.Tests/List/MergeManyChangeSetsListFixture.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading.Tasks;
using Bogus;
using DynamicData.Kernel;
using DynamicData.Tests.Domain;
using DynamicData.Tests.Utilities;
using FluentAssertions;
using Xunit;

Expand Down Expand Up @@ -39,6 +45,81 @@ public MergeManyChangeSetsListFixture()
_animalResults = _animalOwners.Connect().MergeManyChangeSets(owner => owner.Animals.Connect()).AsAggregator();
}

[Theory]
[InlineData(5, 7)]
[InlineData(10, 50)]
[InlineData(10, 1_000)]
[InlineData(200, 500)]
[InlineData(1_000, 10)]
public async Task MultiThreadedStressTest(int ownerCount, int animalCount)
{
var MaxAddTime = TimeSpan.FromSeconds(0.250);
var MaxRemoveTime = TimeSpan.FromSeconds(0.100);

TimeSpan? GetRemoveTime() => _randomizer.Bool() ? _randomizer.TimeSpan(MaxRemoveTime) : null;

IObservable<Unit> AddRemoveAnimalsStress(int ownerCount, int animalCount, int parallel, IScheduler scheduler) =>
Observable.Create<Unit>(observer => new CompositeDisposable
{
AddRemoveOwners(ownerCount, parallel, scheduler)
.Subscribe(
onNext: static _ => { },
onError: ex => observer.OnError(ex)),
_animalOwners.Connect()
.MergeMany(owner => AddRemoveAnimals(owner, animalCount, parallel, scheduler))
.Subscribe(
onNext: static _ => { },
onError: ex => observer.OnError(ex),
onCompleted: () =>
{
observer.OnNext(Unit.Default);
observer.OnCompleted();
})
});

IObservable<AnimalOwner> AddRemoveOwners(int ownerCount, int parallel, IScheduler scheduler) =>
_animalOwnerFaker.IntervalGenerate(_randomizer, MaxAddTime, scheduler)
.Parallelize(ownerCount, parallel, obs => obs.StressAddRemove(_animalOwners, _ => GetRemoveTime(), scheduler))
.Finally(_animalOwners.Dispose);

IObservable<Animal> AddRemoveAnimals(AnimalOwner owner, int animalCount, int parallel, IScheduler scheduler) =>
_animalFaker.IntervalGenerate(_randomizer, MaxAddTime, scheduler)
.Parallelize(animalCount, parallel, obs => obs.StressAddRemove(owner.Animals, _ => GetRemoveTime(), scheduler))
.Finally(owner.Animals.Dispose);

var mergeAnimals = _animalOwners.Connect().MergeManyChangeSets(owner => owner.Animals.Connect());

var addingAnimals = true;

// Start asynchrononously modifying the parent list and the child lists
using var addAnimals = AddRemoveAnimalsStress(ownerCount, animalCount, Environment.ProcessorCount, TaskPoolScheduler.Default)
.Finally(() => addingAnimals = false)
.Subscribe();

// Subscribe / unsubscribe over and over while the collections are being modified
do
{
// Ensure items are being added asynchronously before subscribing to the animal changes
await Task.Yield();

{
// Subscribe
var mergedSub = mergeAnimals.Subscribe();

// Let other threads run
await Task.Yield();

// Unsubscribe
mergedSub.Dispose();
}
}
while (addingAnimals);

// Verify the results
CheckResultContents();
}

[Fact]
public void NullChecks()
{
Expand Down Expand Up @@ -438,18 +519,24 @@ public void ResultFailsIfSourceFails()
results.Exception.Should().Be(expectedError);
}

private void CheckResultContents()
private void CheckResultContents() => CheckResultContents(_animalOwners.Items, _animalOwnerResults, _animalResults);

private static void CheckResultContents(IEnumerable<AnimalOwner> owners, ChangeSetAggregator<AnimalOwner> ownerResults, ChangeSetAggregator<Animal> animalResults)
{
var expectedOwners = _animalOwners.Items.ToList();
var expectedAnimals = expectedOwners.SelectMany(owner => owner.Animals.Items).ToList();
var expectedOwners = owners.ToList();

// These should be subsets of each other, so check one subset and the size
expectedOwners.Should().BeSubsetOf(_animalOwnerResults.Data.Items);
_animalOwnerResults.Data.Items.Count().Should().Be(expectedOwners.Count);
// These should be subsets of each other
expectedOwners.Should().BeSubsetOf(ownerResults.Data.Items);
ownerResults.Data.Items.Count().Should().Be(expectedOwners.Count);

// All owner animals should be in the results
foreach (var owner in owners)
{
owner.Animals.Items.Should().BeSubsetOf(animalResults.Data.Items);
}

// These should be subsets of each other, so check one subset and the size
expectedAnimals.Should().BeSubsetOf(_animalResults.Data.Items);
_animalResults.Data.Items.Count().Should().Be(expectedAnimals.Count);
// Results should not have more than the total number of animals
animalResults.Data.Count.Should().Be(owners.Sum(owner => owner.Animals.Count));
}

public void Dispose()
Expand Down
18 changes: 18 additions & 0 deletions src/DynamicData.Tests/Utilities/FakerExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
using System;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Concurrency;
using Bogus;

namespace DynamicData.Tests.Utilities;

internal static class FakerExtensions
{
public static IObservable<T> IntervalGenerate<T>(this Faker<T> faker, Randomizer randomizer, TimeSpan maxTime, IScheduler? scheduler = null)
where T : class =>
randomizer.Interval(maxTime, scheduler).Select(_ => faker.Generate());

public static IObservable<T> IntervalGenerate<T>(this Faker<T> faker, TimeSpan period, IScheduler? scheduler = null)
where T : class =>
Observable.Interval(period, scheduler ?? DefaultScheduler.Instance).Select(_ => faker.Generate());
}

0 comments on commit 13bd8c6

Please sign in to comment.