Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Stream.multi instead of forwardStream's inner Controller #601

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
97 changes: 2 additions & 95 deletions lib/src/subjects/behavior_subject.dart
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,12 @@ import 'package:rxdart/src/utils/value_wrapper.dart';
/// subject.stream.listen(print); // prints 1
class BehaviorSubject<T> extends Subject<T> implements ValueStream<T> {
final _Wrapper<T> _wrapper;
final Stream<T> _stream;

BehaviorSubject._(
StreamController<T> controller,
this._stream,
Stream<T> stream,
this._wrapper,
) : super(controller, _stream);
) : super(controller, stream);

/// Constructs a [BehaviorSubject], optionally pass handlers for
/// [onListen], [onCancel] and a flag to handle events [sync].
Expand Down Expand Up @@ -170,98 +169,6 @@ class BehaviorSubject<T> extends Subject<T> implements ValueStream<T> {

@override
StackTrace? get stackTrace => _wrapper.errorAndStackTrace?.stackTrace;

@override
BehaviorSubject<R> createForwardingSubject<R>({
void Function()? onListen,
void Function()? onCancel,
bool sync = false,
}) =>
BehaviorSubject(
onListen: onListen,
onCancel: onCancel,
sync: sync,
);

// Override built-in operators.

@override
ValueStream<T> where(bool Function(T event) test) =>
_forwardBehaviorSubject<T>((s) => s.where(test));

@override
ValueStream<S> map<S>(S Function(T event) convert) =>
_forwardBehaviorSubject<S>((s) => s.map(convert));

@override
ValueStream<E> asyncMap<E>(FutureOr<E> Function(T event) convert) =>
_forwardBehaviorSubject<E>((s) => s.asyncMap(convert));

@override
ValueStream<E> asyncExpand<E>(Stream<E>? Function(T event) convert) =>
_forwardBehaviorSubject<E>((s) => s.asyncExpand(convert));

@override
ValueStream<T> handleError(Function onError,
{bool Function(dynamic error)? test}) =>
_forwardBehaviorSubject<T>((s) => s.handleError(onError, test: test));

@override
ValueStream<S> expand<S>(Iterable<S> Function(T element) convert) =>
_forwardBehaviorSubject<S>((s) => s.expand(convert));

@override
ValueStream<S> transform<S>(StreamTransformer<T, S> streamTransformer) =>
_forwardBehaviorSubject<S>((s) => s.transform(streamTransformer));

@override
ValueStream<R> cast<R>() => _forwardBehaviorSubject<R>((s) => s.cast<R>());

@override
ValueStream<T> take(int count) =>
_forwardBehaviorSubject<T>((s) => s.take(count));

@override
ValueStream<T> takeWhile(bool Function(T element) test) =>
_forwardBehaviorSubject<T>((s) => s.takeWhile(test));

@override
ValueStream<T> skip(int count) =>
_forwardBehaviorSubject<T>((s) => s.skip(count));

@override
ValueStream<T> skipWhile(bool Function(T element) test) =>
_forwardBehaviorSubject<T>((s) => s.skipWhile(test));

@override
ValueStream<T> distinct([bool Function(T previous, T next)? equals]) =>
_forwardBehaviorSubject<T>((s) => s.distinct(equals));

@override
ValueStream<T> timeout(Duration timeLimit,
{void Function(EventSink<T> sink)? onTimeout}) =>
_forwardBehaviorSubject<T>(
(s) => s.timeout(timeLimit, onTimeout: onTimeout));

ValueStream<R> _forwardBehaviorSubject<R>(
Stream<R> Function(Stream<T> s) transformerStream) {
late BehaviorSubject<R> subject;
late StreamSubscription<R> subscription;

final onListen = () => subscription = transformerStream(_stream).listen(
subject.add,
onError: subject.addError,
onDone: subject.close,
);

final onCancel = () => subscription.cancel();

return subject = createForwardingSubject(
onListen: onListen,
onCancel: onCancel,
sync: true,
);
}
}

class _Wrapper<T> {
Expand Down
12 changes: 0 additions & 12 deletions lib/src/subjects/publish_subject.dart
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,4 @@ class PublishSubject<T> extends Subject<T> {
controller.stream,
);
}

@override
PublishSubject<R> createForwardingSubject<R>({
void Function()? onListen,
void Function()? onCancel,
bool sync = false,
}) =>
PublishSubject(
onListen: onListen,
onCancel: onCancel,
sync: sync,
);
}
13 changes: 0 additions & 13 deletions lib/src/subjects/replay_subject.dart
Original file line number Diff line number Diff line change
Expand Up @@ -138,19 +138,6 @@ class ReplaySubject<T> extends Subject<T> implements ReplayStream<T> {
.where((event) => event.isError)
.map((event) => event.errorAndStackTrace!.stackTrace)
.toList(growable: false);

@override
ReplaySubject<R> createForwardingSubject<R>({
void Function()? onListen,
void Function()? onCancel,
bool sync = false,
}) =>
ReplaySubject(
maxSize: _maxSize,
onCancel: onCancel,
onListen: onListen,
sync: sync,
);
}

class _Event<T> {
Expand Down
9 changes: 0 additions & 9 deletions lib/src/subjects/subject.dart
Original file line number Diff line number Diff line change
Expand Up @@ -155,15 +155,6 @@ abstract class Subject<T> extends StreamView<T> implements StreamController<T> {

return _controller.close();
}

/// Creates a trampoline StreamController, which can forward events
/// in the same manner as the original [Subject] does.
/// e.g. replay or behavior on subscribe.
Subject<R> createForwardingSubject<R>({
void Function()? onListen,
void Function()? onCancel,
bool sync = false,
});
}

class _StreamSinkWrapper<T> implements StreamSink<T> {
Expand Down
4 changes: 2 additions & 2 deletions lib/src/transformers/backpressure/backpressure.dart
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ enum WindowStrategy {
onHandler
}

class _BackpressureStreamSink<S, T> implements ForwardingSink<S, T> {
class _BackpressureStreamSink<S, T> extends ForwardingSink<S, T> {
final WindowStrategy _strategy;
final Stream<dynamic> Function(S event)? _windowStreamFactory;
final T Function(S event)? _onWindowStart;
Expand Down Expand Up @@ -352,7 +352,7 @@ class BackpressureStreamTransformer<S, T> extends StreamTransformerBase<S, T> {
dispatchOnClose,
maxLengthQueue,
);
return forwardStream(stream, sink);
return ForwardedStream(inner: stream, connectedSink: sink);
}
}

Expand Down
6 changes: 3 additions & 3 deletions lib/src/transformers/delay.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import 'package:rxdart/src/rx.dart';
import 'package:rxdart/src/utils/forwarding_sink.dart';
import 'package:rxdart/src/utils/forwarding_stream.dart';

class _DelayStreamSink<S> implements ForwardingSink<S, S> {
class _DelayStreamSink<S> extends ForwardingSink<S, S> {
final Duration _duration;
var _inputClosed = false;
final _subscriptions = Queue<StreamSubscription<void>>();
Expand Down Expand Up @@ -80,8 +80,8 @@ class DelayStreamTransformer<S> extends StreamTransformerBase<S, S> {
DelayStreamTransformer(this.duration);

@override
Stream<S> bind(Stream<S> stream) =>
forwardStream(stream, _DelayStreamSink<S>(duration));
Stream<S> bind(Stream<S> stream) => ForwardedStream(
inner: stream, connectedSink: _DelayStreamSink<S>(duration));
}

/// Extends the Stream class with the ability to delay events being emitted
Expand Down
30 changes: 16 additions & 14 deletions lib/src/transformers/do.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import 'package:rxdart/src/utils/forwarding_sink.dart';
import 'package:rxdart/src/utils/forwarding_stream.dart';
import 'package:rxdart/src/utils/notification.dart';

class _DoStreamSink<S> implements ForwardingSink<S, S> {
class _DoStreamSink<S> extends ForwardingSink<S, S> {
final FutureOr<void> Function()? _onCancel;
final void Function(S event)? _onData;
final void Function()? _onDone;
Expand All @@ -14,6 +14,9 @@ class _DoStreamSink<S> implements ForwardingSink<S, S> {
final void Function()? _onPause;
final void Function()? _onResume;

@override
bool get enforcesSingleSubscription => true;

_DoStreamSink(
this._onCancel,
this._onData,
Expand Down Expand Up @@ -169,19 +172,18 @@ class DoStreamTransformer<S> extends StreamTransformerBase<S, S> {
}

@override
Stream<S> bind(Stream<S> stream) => forwardStream<S, S>(
stream,
_DoStreamSink<S>(
onCancel,
onData,
onDone,
onEach,
onError,
onListen,
onPause,
onResume,
),
);
Stream<S> bind(Stream<S> stream) => ForwardedStream(
inner: stream,
connectedSink: _DoStreamSink<S>(
onCancel,
onData,
onDone,
onEach,
onError,
onListen,
onPause,
onResume,
));
}

/// Extends the Stream class with the ability to execute a callback function
Expand Down
6 changes: 3 additions & 3 deletions lib/src/transformers/exhaust_map.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import 'dart:async';
import 'package:rxdart/src/utils/forwarding_sink.dart';
import 'package:rxdart/src/utils/forwarding_stream.dart';

class _ExhaustMapStreamSink<S, T> implements ForwardingSink<S, T> {
class _ExhaustMapStreamSink<S, T> extends ForwardingSink<S, T> {
final Stream<T> Function(S value) _mapper;
StreamSubscription<T>? _mapperSubscription;
bool _inputClosed = false;
Expand Down Expand Up @@ -82,8 +82,8 @@ class ExhaustMapStreamTransformer<S, T> extends StreamTransformerBase<S, T> {
ExhaustMapStreamTransformer(this.mapper);

@override
Stream<T> bind(Stream<S> stream) =>
forwardStream(stream, _ExhaustMapStreamSink(mapper));
Stream<T> bind(Stream<S> stream) => ForwardedStream(
inner: stream, connectedSink: _ExhaustMapStreamSink(mapper));
}

/// Extends the Stream class with the ability to transform the Stream into
Expand Down
4 changes: 2 additions & 2 deletions lib/src/transformers/flat_map.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import 'dart:async';
import 'package:rxdart/src/utils/forwarding_sink.dart';
import 'package:rxdart/src/utils/forwarding_stream.dart';

class _FlatMapStreamSink<S, T> implements ForwardingSink<S, T> {
class _FlatMapStreamSink<S, T> extends ForwardingSink<S, T> {
final Stream<T> Function(S value) _mapper;
final List<StreamSubscription<T>> _subscriptions = <StreamSubscription<T>>[];
int _openSubscriptions = 0;
Expand Down Expand Up @@ -87,7 +87,7 @@ class FlatMapStreamTransformer<S, T> extends StreamTransformerBase<S, T> {

@override
Stream<T> bind(Stream<S> stream) =>
forwardStream(stream, _FlatMapStreamSink(mapper));
ForwardedStream(inner: stream, connectedSink: _FlatMapStreamSink(mapper));
}

/// Extends the Stream class with the ability to convert the source Stream into
Expand Down
8 changes: 3 additions & 5 deletions lib/src/transformers/on_error_resume.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import 'dart:async';
import 'package:rxdart/src/utils/forwarding_sink.dart';
import 'package:rxdart/src/utils/forwarding_stream.dart';

class _OnErrorResumeStreamSink<S> implements ForwardingSink<S, S> {
class _OnErrorResumeStreamSink<S> extends ForwardingSink<S, S> {
final Stream<S> Function(Object error, StackTrace stackTrace) _recoveryFn;
var _inRecovery = false;
final List<StreamSubscription<S>> _recoverySubscriptions = [];
Expand Down Expand Up @@ -91,10 +91,8 @@ class OnErrorResumeStreamTransformer<S> extends StreamTransformerBase<S, S> {
OnErrorResumeStreamTransformer(this.recoveryFn);

@override
Stream<S> bind(Stream<S> stream) => forwardStream(
stream,
_OnErrorResumeStreamSink<S>(recoveryFn),
);
Stream<S> bind(Stream<S> stream) => ForwardedStream(
inner: stream, connectedSink: _OnErrorResumeStreamSink<S>(recoveryFn));
}

/// Extends the Stream class with the ability to recover from errors in various
Expand Down
4 changes: 2 additions & 2 deletions lib/src/transformers/skip_last.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import 'dart:async';
import 'package:rxdart/src/utils/forwarding_sink.dart';
import 'package:rxdart/src/utils/forwarding_stream.dart';

class _SkipLastStreamSink<T> implements ForwardingSink<T, T> {
class _SkipLastStreamSink<T> extends ForwardingSink<T, T> {
_SkipLastStreamSink(this.count);

final int count;
Expand Down Expand Up @@ -59,7 +59,7 @@ class SkipLastStreamTransformer<T> extends StreamTransformerBase<T, T> {

@override
Stream<T> bind(Stream<T> stream) =>
forwardStream(stream, _SkipLastStreamSink(count));
ForwardedStream(inner: stream, connectedSink: _SkipLastStreamSink(count));
}

/// Extends the Stream class with the ability to skip the last [count] items
Expand Down
6 changes: 3 additions & 3 deletions lib/src/transformers/skip_until.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import 'dart:async';
import 'package:rxdart/src/utils/forwarding_sink.dart';
import 'package:rxdart/src/utils/forwarding_stream.dart';

class _SkipUntilStreamSink<S, T> implements ForwardingSink<S, S> {
class _SkipUntilStreamSink<S, T> extends ForwardingSink<S, S> {
final Stream<T> _otherStream;
StreamSubscription<T>? _otherSubscription;
var _canAdd = false;
Expand Down Expand Up @@ -61,8 +61,8 @@ class SkipUntilStreamTransformer<S, T> extends StreamTransformerBase<S, S> {
SkipUntilStreamTransformer(this.otherStream);

@override
Stream<S> bind(Stream<S> stream) =>
forwardStream(stream, _SkipUntilStreamSink(otherStream));
Stream<S> bind(Stream<S> stream) => ForwardedStream(
inner: stream, connectedSink: _SkipUntilStreamSink(otherStream));
}

/// Extends the Stream class with the ability to skip events until another
Expand Down
6 changes: 3 additions & 3 deletions lib/src/transformers/start_with.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import 'dart:async';
import 'package:rxdart/src/utils/forwarding_sink.dart';
import 'package:rxdart/src/utils/forwarding_stream.dart';

class _StartWithStreamSink<S> implements ForwardingSink<S, S> {
class _StartWithStreamSink<S> extends ForwardingSink<S, S> {
final S _startValue;
var _isFirstEventAdded = false;

Expand Down Expand Up @@ -72,8 +72,8 @@ class StartWithStreamTransformer<S> extends StreamTransformerBase<S, S> {
StartWithStreamTransformer(this.startValue);

@override
Stream<S> bind(Stream<S> stream) =>
forwardStream(stream, _StartWithStreamSink(startValue));
Stream<S> bind(Stream<S> stream) => ForwardedStream(
inner: stream, connectedSink: _StartWithStreamSink(startValue));
}

/// Extends the [Stream] class with the ability to emit the given value as the
Expand Down
7 changes: 4 additions & 3 deletions lib/src/transformers/start_with_error.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import 'dart:async';
import 'package:rxdart/src/utils/forwarding_sink.dart';
import 'package:rxdart/src/utils/forwarding_stream.dart';

class _StartWithErrorStreamSink<S> implements ForwardingSink<S, S> {
class _StartWithErrorStreamSink<S> extends ForwardingSink<S, S> {
final Object _e;
final StackTrace? _st;
var _isFirstEventAdded = false;
Expand Down Expand Up @@ -75,6 +75,7 @@ class StartWithErrorStreamTransformer<S> extends StreamTransformerBase<S, S> {
StartWithErrorStreamTransformer(this.error, [this.stackTrace]);

@override
Stream<S> bind(Stream<S> stream) =>
forwardStream(stream, _StartWithErrorStreamSink(error, stackTrace));
Stream<S> bind(Stream<S> stream) => ForwardedStream(
inner: stream,
connectedSink: _StartWithErrorStreamSink(error, stackTrace));
}