Skip to content

Commit

Permalink
Added new implementations of ISourceCache<> and ISourceList<>, to…
Browse files Browse the repository at this point in the history
… allow manual injection of error and completion notifications, during tests for operators that work directly upon these interfaces, rather than `IObservable<IChangeSet>>` (#801)
  • Loading branch information
JakenVeina committed Dec 28, 2023
1 parent c31b570 commit 2217b59
Show file tree
Hide file tree
Showing 2 changed files with 198 additions and 0 deletions.
109 changes: 109 additions & 0 deletions src/DynamicData.Tests/Utilities/TestSourceCache.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

using DynamicData.Kernel;

namespace DynamicData.Tests.Utilities;

public sealed class TestSourceCache<TObject, TKey>
: ISourceCache<TObject, TKey>
where TObject : notnull
where TKey : notnull
{
private readonly IObservable<int> _countChanged;
private readonly BehaviorSubject<Exception?> _error;
private readonly BehaviorSubject<bool> _hasCompleted;
private readonly SourceCache<TObject, TKey> _source;

public TestSourceCache(Func<TObject, TKey> keySelector)
{
_error = new(null);
_hasCompleted = new(false);
_source = new(keySelector);

_countChanged = WrapStream(_source.CountChanged);
}

public int Count
=> _source.Count;

public IObservable<int> CountChanged
=> _countChanged;

public IEnumerable<TObject> Items
=> _source.Items;

public IEnumerable<TKey> Keys
=> _source.Keys;

public Func<TObject, TKey> KeySelector
=> _source.KeySelector;

public IEnumerable<KeyValuePair<TKey, TObject>> KeyValues
=> KeyValues;

public void Complete()
{
AssertCanMutate();

_hasCompleted.OnNext(true);
}

public IObservable<IChangeSet<TObject, TKey>> Connect(
Func<TObject, bool>? predicate = null,
bool suppressEmptyChangeSets = true)
=> WrapStream(_source.Connect(predicate, suppressEmptyChangeSets));

public void Dispose()
{
_error.Dispose();
_hasCompleted.Dispose();
_source.Dispose();
}

public void Edit(Action<ISourceUpdater<TObject, TKey>> updateAction)
{
AssertCanMutate();

_source.Edit(updateAction);
}

public Optional<TObject> Lookup(TKey key)
=> _source.Lookup(key);

public IObservable<IChangeSet<TObject, TKey>> Preview(Func<TObject, bool>? predicate = null)
=> WrapStream(_source.Preview(predicate));

public void SetError(Exception error)
{
AssertCanMutate();

_error.OnNext(error);
}

public IObservable<Change<TObject, TKey>> Watch(TKey key)
=> WrapStream(_source.Watch(key));

private void AssertCanMutate()
{
if (_error.Value is not null)
throw new InvalidOperationException("The source collection is in an error state and cannot be mutated.");

if (_hasCompleted.Value)
throw new InvalidOperationException("The source collection is in a completed state and cannot be mutated.");
}

private IObservable<T> WrapStream<T>(IObservable<T> sourceStream)
=> Observable
.Merge(
_error
.Select(static error => (error is not null)
? Observable.Throw<T>(error!)
: Observable.Empty<T>())
.Switch(),
sourceStream)
.TakeUntil(_hasCompleted
.Where(static hasCompleted => hasCompleted));
}
89 changes: 89 additions & 0 deletions src/DynamicData.Tests/Utilities/TestSourceList.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

namespace DynamicData.Tests.Utilities;

public sealed class TestSourceList<T>
: ISourceList<T>
where T : notnull
{
private readonly IObservable<int> _countChanged;
private readonly BehaviorSubject<Exception?> _error;
private readonly BehaviorSubject<bool> _hasCompleted;
private readonly SourceList<T> _source;

public TestSourceList()
{
_error = new(null);
_hasCompleted = new(false);
_source = new();

_countChanged = WrapStream(_source.CountChanged);
}

public int Count
=> _source.Count;

public IObservable<int> CountChanged
=> _countChanged;

public IEnumerable<T> Items
=> _source.Items;

public IObservable<IChangeSet<T>> Connect(Func<T, bool>? predicate = null)
=> WrapStream(_source.Connect(predicate));

public void Complete()
{
AssertCanMutate();

_hasCompleted.OnNext(true);
}

public void Dispose()
{
_error.Dispose();
_hasCompleted.Dispose();
_source.Dispose();
}

public void Edit(Action<IExtendedList<T>> updateAction)
{
AssertCanMutate();

_source.Edit(updateAction);
}

public IObservable<IChangeSet<T>> Preview(Func<T, bool>? predicate = null)
=> WrapStream(_source.Preview(predicate));

public void SetError(Exception error)
{
AssertCanMutate();

_error.OnNext(error);
}

private void AssertCanMutate()
{
if (_error.Value is not null)
throw new InvalidOperationException("The source collection is in an error state and cannot be mutated.");

if (_hasCompleted.Value)
throw new InvalidOperationException("The source collection is in a completed state and cannot be mutated.");
}

private IObservable<U> WrapStream<U>(IObservable<U> sourceStream)
=> Observable
.Merge(
_error
.Select(static error => (error is not null)
? Observable.Throw<U>(error!)
: Observable.Empty<U>())
.Switch(),
sourceStream)
.TakeUntil(_hasCompleted
.Where(static hasCompleted => hasCompleted));
}

0 comments on commit 2217b59

Please sign in to comment.