This repository has been archived by the owner on Sep 14, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
distinct_value_connectable_stream.dart
107 lines (87 loc) · 2.8 KB
/
distinct_value_connectable_stream.dart
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import 'dart:async';
import 'package:rxdart_ext/rxdart_ext.dart';
import 'distinct_value_stream.dart';
/// A [ConnectableStream] that turns a single-subscription [Stream] into a
/// broadcast [Stream] and gives synchronous access to the latest value.
///
/// Data events from the source are pushed into a seeded [ValueSubject] only
/// when [equals] reports that they differ from the subject's current value.
/// This is a combination of [ConnectableStream], [ValueStream],
/// [ValueSubject] and [Stream.distinct].
///
/// An instance can be wired up only once: a second call to any of [connect],
/// [autoConnect] or [refCount] throws a [StateError].
class DistinctValueConnectableStream<T> extends ConnectableStream<T>
    implements DistinctValueStream<T> {
  /// The upstream stream that is listened to when a connection is made.
  final Stream<T> _source;

  /// The subject that receives the distinct values; also handed to `super`.
  final ValueSubject<T> _subject;

  /// Whether [connect], [autoConnect] or [refCount] has already been called.
  var _used = false;

  /// Returns `true` when two values should be treated as equal, in which
  /// case the newer one is skipped.
  @override
  final bool Function(T, T) equals;

  /// Stores the collaborators, falling back to
  /// [DistinctValueStream.defaultEquals] when [customEquals] is `null`.
  DistinctValueConnectableStream._(
    this._source,
    this._subject,
    bool Function(T, T)? customEquals,
  )   : equals = customEquals ?? DistinctValueStream.defaultEquals,
        super(_subject);

  /// Creates a [Stream] that only begins emitting events once [connect] is
  /// called (or a listener arrives after [autoConnect] / [refCount]).
  ///
  /// The stream behaves like a [ValueSubject] seeded with [seedValue] and is
  /// distinct until changed: a data event is skipped when [equals] says it is
  /// equal to the current value. When [equals] is omitted,
  /// [DistinctValueStream.defaultEquals] is used.
  ///
  /// [sync] is forwarded unchanged to the [ValueSubject] constructor.
  factory DistinctValueConnectableStream(
    Stream<T> source,
    T seedValue, {
    bool Function(T previous, T next)? equals,
    bool sync = true,
  }) {
    final subject = ValueSubject<T>(seedValue, sync: sync);
    return DistinctValueConnectableStream<T>._(source, subject, equals);
  }

  /// Subscribes to [_source] and pairs that subscription with [_subject] in a
  /// [ConnectableStreamSubscription].
  ConnectableStreamSubscription<T> _connect() {
    final upstream = _source.listen(
      _onData,
      // Deliberately no error handler: source errors never reach the subject.
      onError: null,
      // Completion of the source closes the subject.
      onDone: _subject.close,
    );
    return ConnectableStreamSubscription<T>(upstream, _subject);
  }

  /// Marks this stream as used, throwing a [StateError] if it already was.
  void _checkUsed() {
    if (!_used) {
      _used = true;
      return;
    }
    throw StateError('Cannot reuse this stream. This causes many problems.');
  }

  /// Connects to the source each time the subject's `onListen` callback
  /// fires, and returns this stream.
  ///
  /// The subject's `onCancel` callback is cleared, so nothing here
  /// disconnects automatically. If [connection] is given, it is called with
  /// the subscription created by the connection.
  ///
  /// Throws a [StateError] if this stream was already used.
  @override
  DistinctValueStream<T> autoConnect({
    void Function(StreamSubscription<T> subscription)? connection,
  }) {
    _checkUsed();

    _subject
      ..onListen = () {
        // Connect first, unconditionally; only the notification is optional.
        final connected = _connect();
        connection?.call(connected);
      }
      ..onCancel = null;

    return this;
  }

  /// Connects to the source immediately and returns the subscription that
  /// controls the connection.
  ///
  /// Both subject callbacks are cleared before connecting.
  ///
  /// Throws a [StateError] if this stream was already used.
  @override
  StreamSubscription<T> connect() {
    _checkUsed();

    _subject.onCancel = null;
    _subject.onListen = null;

    return _connect();
  }

  /// Connects when the subject's `onListen` callback fires and cancels that
  /// connection when its `onCancel` callback fires; returns this stream.
  ///
  /// Throws a [StateError] if this stream was already used.
  @override
  DistinctValueStream<T> refCount() {
    _checkUsed();

    // Assigned by the `onListen` callback before `onCancel` reads it.
    late ConnectableStreamSubscription<T> active;

    _subject.onListen = () {
      active = _connect();
    };
    // Arrow body so the Future returned by `cancel` is handed back as-is.
    _subject.onCancel = () => active.cancel();

    return this;
  }

  /// Forwards [data] to the subject unless [equals] considers it equal to the
  /// subject's current value.
  void _onData(T data) {
    final isDuplicate = equals(_subject.requireValue, data);
    if (isDuplicate) {
      return;
    }
    _subject.add(data);
  }

  /// Always throws a [StateError]; this stream is designed to carry no error.
  @override
  Never get errorAndStackTrace {
    throw StateError('This Stream always has no error!');
  }

  /// The wrapper around the subject's current value.
  ///
  /// The non-null assertion relies on the subject always holding a value; the
  /// public factory guarantees this by seeding it.
  @override
  ValueWrapper<T> get valueWrapper {
    return _subject.valueWrapper!;
  }
}