Skip to content
This repository has been archived by the owner on Sep 14, 2021. It is now read-only.

Commit

Permalink
example updated
Browse files Browse the repository at this point in the history
  • Loading branch information
hoc081098 committed Jan 8, 2021
1 parent 3e0c438 commit 8738d68
Showing 1 changed file with 10 additions and 7 deletions.
17 changes: 10 additions & 7 deletions example/distinct_value_connectable_stream_example.dart
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import 'dart:async';

import 'package:distinct_value_connectable_stream/distinct_value_connectable_stream.dart';
import 'package:rxdart/rxdart.dart';

Expand All @@ -20,27 +22,28 @@ class CounterBloc {
});

factory CounterBloc() {
final incrementController = PublishSubject<int>();
final decrementController = PublishSubject<int>();
final incrementController = StreamController<int>();
final decrementController = StreamController<int>();

final streams = [
incrementController,
decrementController.map((i) => -i),
incrementController.stream,
decrementController.stream.map((i) => -i),
];
final state$ = Rx.merge(streams)
.scan<int>((acc, e, _) => acc! + e, 0)
.publishValueDistinct(0);

final subscription = state$.connect();

return CounterBloc._(
increment: incrementController.add,
decrement: decrementController.add,
state$: state$,
dispose: () async {
await Future.wait<void>(
[incrementController, decrementController].map((c) => c.close()));
await subscription.cancel();
await Future.wait<void>([
incrementController.close(),
decrementController.close(),
]);
},
);
}
Expand Down

0 comments on commit 8738d68

Please sign in to comment.