Skip to content

Commit

Permalink
Implemented a variety of utilities for testing the core correctness o…
Browse files Browse the repository at this point in the history
…f RX operators and streams, including notification sequencing and synchronization. (#797)
  • Loading branch information
JakenVeina committed Dec 18, 2023
1 parent d8c3ac6 commit ff8b94a
Show file tree
Hide file tree
Showing 6 changed files with 221 additions and 0 deletions.
60 changes: 60 additions & 0 deletions src/DynamicData.Tests/Utilities/ObservableExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
using System;
using System.Linq;
using System.Reactive;
using System.Reactive.Linq;
using System.Threading;

namespace DynamicData.Tests.Utilities;

Expand All @@ -18,4 +20,62 @@ internal static class ObservableExtensions
e is not null
? source.Take(count).Concat(Observable.Throw<T>(e))
: source;

public static IObservable<T> ValidateSynchronization<T>(this IObservable<T> source)
// Using Raw observable and observer classes to bypass normal RX safeguards, which prevent out-of-sequence notifications.
// This allows the operator to be combined with TestableObserver, for correctness-testing of operators.
=> RawAnonymousObservable.Create<T>(observer =>
{
var inFlightNotification = null as Notification<T>;
var synchronizationGate = new object();
// Not using .Do() so we can track the *entire* in-flight period of a notification, including all synchronous downstream processing.
return source.SubscribeSafe(RawAnonymousObserver.Create<T>(
onNext: value => ProcessIncomingNotification(Notification.CreateOnNext(value)),
onError: error => ProcessIncomingNotification(Notification.CreateOnError<T>(error)),
onCompleted: () => ProcessIncomingNotification(Notification.CreateOnCompleted<T>())));
void ProcessIncomingNotification(Notification<T> incomingNotification)
{
try
{
var priorNotification = Interlocked.Exchange(ref inFlightNotification, incomingNotification);
if (priorNotification is not null)
throw new UnsynchronizedNotificationException<T>()
{
IncomingNotification = incomingNotification,
PriorNotification = priorNotification
};
lock (synchronizationGate)
{
switch(incomingNotification.Kind)
{
case NotificationKind.OnNext:
observer.OnNext(incomingNotification.Value);
break;
case NotificationKind.OnError:
observer.OnError(incomingNotification.Exception!);
break;
case NotificationKind.OnCompleted:
observer.OnCompleted();
break;
}
}
}
catch (Exception ex)
{
lock (synchronizationGate)
{
observer.OnError(ex);
}
}
finally
{
Interlocked.Exchange(ref inFlightNotification, null);
}
}
});
}
22 changes: 22 additions & 0 deletions src/DynamicData.Tests/Utilities/RawAnonymousObservable.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
using System;

namespace DynamicData.Tests.Utilities;

internal static class RawAnonymousObservable
{
public static RawAnonymousObservable<T> Create<T>(Func<IObserver<T>, IDisposable> onSubscribe)
=> new(onSubscribe);
}

// Allows bypassing of safeguards implemented within Observable.Create<T>(), for testing.
internal class RawAnonymousObservable<T>
: IObservable<T>
{
private readonly Func<IObserver<T>, IDisposable> _onSubscribe;

public RawAnonymousObservable(Func<IObserver<T>, IDisposable> onSubscribe)
=> _onSubscribe = onSubscribe;

public IDisposable Subscribe(IObserver<T> observer)
=> _onSubscribe.Invoke(observer);
}
43 changes: 43 additions & 0 deletions src/DynamicData.Tests/Utilities/RawAnonymousObserver.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
using System;

namespace DynamicData.Tests.Utilities;

internal static class RawAnonymousObserver
{
public static RawAnonymousObserver<T> Create<T>(
Action<T> onNext,
Action<Exception> onError,
Action onCompleted)
=> new(
onNext: onNext,
onError: onError,
onCompleted: onCompleted);
}

// Allows bypassing of safeguards implemented within Observer.Create<T>(), for testing.
internal class RawAnonymousObserver<T>
: IObserver<T>
{
private readonly Action _onCompleted;
private readonly Action<Exception> _onError;
private readonly Action<T> _onNext;

public RawAnonymousObserver(
Action<T> onNext,
Action<Exception> onError,
Action onCompleted)
{
_onNext = onNext;
_onError = onError;
_onCompleted = onCompleted;
}

public void OnCompleted()
=> _onCompleted.Invoke();

public void OnError(Exception error)
=> _onError.Invoke(error);

public void OnNext(T value)
=> _onNext.Invoke(value);
}
46 changes: 46 additions & 0 deletions src/DynamicData.Tests/Utilities/TestableObserver.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Concurrency;

using Microsoft.Reactive.Testing;

namespace DynamicData.Tests.Utilities;

public static class TestableObserver
{
public static TestableObserver<T> Create<T>(IScheduler? scheduler = null)
=> new(scheduler ?? DefaultScheduler.Instance);
}

// Not using any existing Observer class, or Observer.Create<T>() to bypass normal RX safeguards, which prevent out-of-sequence notifications.
public sealed class TestableObserver<T>
: ITestableObserver<T>
{
private readonly List<Recorded<Notification<T>>> _messages;
private readonly IScheduler _scheduler;

public TestableObserver(IScheduler scheduler)
{
_messages = new();
_scheduler = scheduler;
}

public IList<Recorded<Notification<T>>> Messages
=> _messages;

void IObserver<T>.OnCompleted()
=> _messages.Add(new(
time: _scheduler.Now.Ticks,
value: Notification.CreateOnCompleted<T>()));

void IObserver<T>.OnError(Exception error)
=> _messages.Add(new(
time: _scheduler.Now.Ticks,
value: Notification.CreateOnError<T>(error)));

void IObserver<T>.OnNext(T value)
=> _messages.Add(new(
time: _scheduler.Now.Ticks,
value: Notification.CreateOnNext(value)));
}
34 changes: 34 additions & 0 deletions src/DynamicData.Tests/Utilities/TestableObserverExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive;

using FluentAssertions;

using Microsoft.Reactive.Testing;

namespace DynamicData.Tests.Utilities;

internal static class TestableObserverExtensions
{
public static IEnumerable<Recorded<Notification<T>>> EnumerateInvalidNotifications<T>(this ITestableObserver<T> observer)
=> observer.Messages
.SkipWhile(message => message.Value.Kind is NotificationKind.OnNext)
.Skip(1);

public static IEnumerable<T> EnumerateRecordedValues<T>(this ITestableObserver<T> observer)
=> observer.Messages
.TakeWhile(message => message.Value.Kind is NotificationKind.OnNext)
.Select(message => message.Value.Value);

public static Exception? TryGetRecordedError<T>(this ITestableObserver<T> observer)
=> observer.Messages
.Where(message => message.Value.Kind is NotificationKind.OnError)
.Select(message => message.Value.Exception)
.FirstOrDefault();

public static bool TryGetRecordedCompletion<T>(this ITestableObserver<T> observer)
=> observer.Messages
.Where(message => message.Value.Kind is NotificationKind.OnCompleted)
.Any();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
using System;
using System.Reactive;

namespace DynamicData.Tests.Utilities;

public class UnsynchronizedNotificationException<T>
: Exception
{
public UnsynchronizedNotificationException()
: base("Unsynchronized notification received: Another notification is already being processed")
{ }

public required Notification<T> IncomingNotification { get; init; }

public required Notification<T> PriorNotification { get; init; }
}

0 comments on commit ff8b94a

Please sign in to comment.