This repository has been archived by the owner on Sep 14, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
distinct_value_connectable_stream_example.dart
83 lines (69 loc) · 1.97 KB
/
distinct_value_connectable_stream_example.dart
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
// ignore_for_file: deprecated_member_use_from_same_package
import 'dart:async';
import 'package:distinct_value_connectable_stream/distinct_value_connectable_stream.dart';
import 'package:rxdart/rxdart.dart';
/// A small counter BLoC: integer deltas go in, the running total comes out.
///
/// Instances are built by the unnamed factory constructor, which creates two
/// [StreamController]s, merges their streams, folds the merged events into a
/// running sum starting at `0`, and connects the resulting stream
/// immediately. Call [dispose] when the bloc is no longer needed.
class CounterBloc {
  /// Inputs

  /// Adds the given amount to the counter.
  final void Function(int) increment;

  /// Subtracts the given amount from the counter.
  ///
  /// The value is negated before it is merged into the running total.
  final void Function(int) decrement;

  /// Outputs

  /// The running total of all increments and decrements.
  ///
  /// Created with `publishValueDistinct(0)`; presumably it exposes `0` as its
  /// initial value and skips consecutive equal values — confirm against the
  /// `distinct_value_connectable_stream` documentation.
  final DistinctValueStream<int> state$;

  /// Clean up

  /// Cancels the connection to the source streams, then closes both input
  /// controllers.
  ///
  /// Returns a future that completes once all of that work has finished, so
  /// callers can `await` teardown. The type is a subtype of
  /// `void Function()`, so call sites that ignore the result keep compiling.
  final Future<void> Function() dispose;

  CounterBloc._({
    required this.increment,
    required this.decrement,
    required this.state$,
    required this.dispose,
  });

  /// Creates a counter whose total starts at `0` and is already connected to
  /// its inputs.
  factory CounterBloc() {
    final incrementController = StreamController<int>();
    final decrementController = StreamController<int>();

    // Decrements are turned into negative deltas so that a single sum over
    // the merged stream handles both directions.
    final streams = [
      incrementController.stream,
      decrementController.stream.map((i) => -i),
    ];

    final state$ = Rx.merge(streams)
        .scan<int>((acc, e, _) => acc + e, 0)
        .publishValueDistinct(0);

    // Connect right away; the returned subscription is what `dispose`
    // cancels later.
    final subscription = state$.connect();

    return CounterBloc._(
      increment: incrementController.add,
      decrement: decrementController.add,
      state$: state$,
      dispose: () async {
        // Stop listening to the inputs first, then close both controllers
        // concurrently.
        await subscription.cancel();
        await Future.wait<void>([
          incrementController.close(),
          decrementController.close(),
        ]);
      },
    );
  }
}
/// Demonstrates [CounterBloc]: prints the initial state, attaches two
/// listeners, sends a series of increments and decrements, then tears
/// everything down.
Future<void> main() async {
  final counterBloc = CounterBloc();

  // The current value can be read synchronously, before any listener exists.
  print('[LOGGER] state=${counterBloc.state$.value}');

  final listen1 =
      counterBloc.state$.listen((i) => print('[LOGGER 1] state=$i'));
  final listen2 =
      counterBloc.state$.listen((i) => print('[LOGGER 2] state=$i'));

  // Several of these calls leave the total unchanged (the zero increments),
  // which presumably shows that the stream does not re-emit equal
  // consecutive values — confirm against the library documentation.
  counterBloc
    ..increment(0)
    ..increment(2)
    ..decrement(2)
    ..decrement(2)
    ..decrement(2)
    ..increment(2)
    ..increment(2)
    ..increment(0)
    ..increment(0)
    ..increment(0)
    ..increment(0)
    ..increment(0);

  // Give the asynchronous stream pipeline time to deliver every event.
  await Future<void>.delayed(const Duration(seconds: 1));
  print(counterBloc.state$.value);

  // The two subscriptions are independent, so cancel them concurrently.
  await Future.wait<void>([
    listen1.cancel(),
    listen2.cancel(),
  ]);

  // Wait for teardown so its errors are not silently dropped and `main` does
  // not finish while cleanup is still in flight.
  await counterBloc.dispose();
}