This repository has been archived by the owner on Sep 14, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
distinct_value_stream.dart
135 lines (114 loc) · 4.17 KB
/
distinct_value_stream.dart
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
import 'dart:async';
import 'package:rxdart_ext/rxdart_ext.dart'
show NotReplayValueStream, ValueStreamController, ValueWrapper;
/// A [Stream] that offers synchronous access to its most recently emitted
/// item and in which two consecutive values are never equal.
///
/// Whether the previous data event and the current one count as equal is
/// decided by [equals].
abstract class DistinctValueStream<T> extends NotReplayValueStream<T> {
  /// The default [equals] function.
  ///
  /// Compares [lhs] and [rhs] by invoking the `==` operator on [lhs].
  static bool defaultEquals<T>(T lhs, T rhs) {
    return lhs == rhs;
  }

  /// The predicate that decides whether the previous data event and the
  /// current data event are equal.
  bool Function(T, T) get equals;

  /// The wrapper holding the latest value; declared non-nullable here.
  @override
  ValueWrapper<T> get valueWrapper;

  /// Declared with type [Null], so this getter can only ever yield `null`.
  @override
  Null get errorAndStackTrace;
}
/// Convenience accessors for the value and error state of a
/// [DistinctValueStream].
extension DistinctValueStreamExtensions<T> on DistinctValueStream<T> {
  /// Whether a value is available.
  ///
  /// Always `true`.
  bool get hasValue {
    return true;
  }

  /// The latest value, read from the stream's `valueWrapper`.
  T get value {
    return valueWrapper.value;
  }

  /// The latest value, read from the stream's `valueWrapper`.
  ///
  /// Returns exactly what [value] returns.
  T get requireValue {
    return valueWrapper.value;
  }

  /// Whether an error event has been emitted.
  ///
  /// Always `false`.
  bool get hasError {
    return false;
  }

  /// The last emitted error.
  ///
  /// Always `null`.
  Null get error {
    return null;
  }

  /// The last emitted error.
  ///
  /// Never returns: reading this getter always throws a [StateError].
  Never get requireError {
    throw StateError('DistinctValueStream always has no error!');
  }
}
/// Adds [distinctValue], which turns any [Stream] into a
/// [DistinctValueStream].
extension ToDistinctValueStreamExtension<T> on Stream<T> {
  /// Wraps this [Stream] in a [DistinctValueStream] seeded with [value].
  ///
  /// The result behaves like [Stream.distinct], with two additions: the seed
  /// [value] takes part in the equality check, and the last emitted item can
  /// be read synchronously.
  ///
  /// A data event is dropped when it equals the previous data event.
  /// Equality is decided by [equals]; when [equals] is omitted,
  /// [DistinctValueStream.defaultEquals] (the `==` operator) is used.
  ///
  /// The returned stream is a single-subscription stream.
  DistinctValueStream<T> distinctValue(
    T value, {
    bool Function(T p1, T p2)? equals,
  }) {
    // The explicit type gives the generic `defaultEquals` tear-off a context
    // from which it is instantiated at `T`.
    final bool Function(T, T) comparator =
        equals ?? DistinctValueStream.defaultEquals;
    return _DistinctValueStream<T>(this, value, comparator);
  }
}
/// Default implementation of [DistinctValueStream].
///
/// Behaves like [Stream.distinct], with two additions: a seed value takes part
/// in the equality check, and the last emitted item can be read
/// synchronously.
///
/// A data event is dropped when it equals the previous data event, as decided
/// by [equals].
///
/// This stream is a single-subscription stream.
class _DistinctValueStream<T> extends Stream<T>
    implements DistinctValueStream<T> {
  /// Creates the stream from a [source] stream and a seed [value].
  ///
  /// The [source] is not listened to here; the subscription is created when
  /// [controller] reports its first listener.
  _DistinctValueStream(
    Stream<T> source,
    T value,
    this.equals,
  ) : controller = ValueStreamController<T>(value, sync: true) {
    // Assigned inside `startListening`, before any other callback uses it.
    late StreamSubscription<T> upstream;

    // Forwards `data` downstream unless it equals the latest value.
    void forwardIfChanged(T data) {
      if (equals(valueWrapper.value, data)) {
        return;
      }
      controller.add(data);
    }

    // Subscribes to `source` and wires pause/resume where applicable.
    void startListening() {
      upstream = source.listen(
        forwardIfChanged,
        // No error handler is attached: source errors are not forwarded to
        // `controller`.
        onError: null,
        onDone: controller.close,
      );

      // Pause and resume are only propagated to non-broadcast sources.
      if (source.isBroadcast) {
        return;
      }
      controller.onPause = upstream.pause;
      controller.onResume = upstream.resume;
    }

    controller
      ..onListen = startListening
      ..onCancel = () {
        return upstream.cancel();
      };
  }

  /// The predicate that decides whether two consecutive data events are equal.
  @override
  final bool Function(T p1, T p2) equals;

  /// The controller that holds the latest value and feeds the listener.
  final ValueStreamController<T> controller;

  /// Always `false`.
  @override
  bool get isBroadcast {
    return false;
  }

  /// Always `null`.
  @override
  Null get errorAndStackTrace {
    return null;
  }

  /// The wrapper around the latest value held by [controller].
  ///
  /// The controller's wrapper is asserted to be non-null.
  @override
  ValueWrapper<T> get valueWrapper {
    return controller.stream.valueWrapper!;
  }

  /// Subscribes to the stream of [controller], passing every argument through
  /// unchanged.
  @override
  StreamSubscription<T> listen(
    void Function(T event)? onData, {
    Function? onError,
    void Function()? onDone,
    bool? cancelOnError,
  }) {
    return controller.stream.listen(
      onData,
      onError: onError,
      onDone: onDone,
      cancelOnError: cancelOnError,
    );
  }
}