Skip to content

Commit

Permalink
update(Delay): supports pause and resume (#585)
Browse files Browse the repository at this point in the history
* wip

* wip

* remove obsolete tests

* fix lint

* dart latest

* format
  • Loading branch information
hoc081098 committed May 28, 2021
1 parent 477213b commit fc0b4ab
Show file tree
Hide file tree
Showing 14 changed files with 66 additions and 98 deletions.
87 changes: 45 additions & 42 deletions .github/workflows/rxdart-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,47 +12,50 @@ jobs:
runs-on: ubuntu-latest

container:
image: google/dart:2.12
image: google/dart:latest

steps:
- uses: actions/checkout@v2

- name: Install dependencies
run: pub get

- name: Analyze
run: dartanalyzer --fatal-infos --fatal-warnings ./lib

- name: Format code
run: dartfmt -n ./lib --set-exit-if-changed

- name: Active coverage
run: pub global activate coverage

- name: Run tests
run: pub run test test/rxdart_test.dart

- name: Start Observatory
run: dart
--disable-service-auth-codes
--enable-vm-service=8111
--pause-isolates-on-exit
--enable-asserts
test/rxdart_test.dart &

- name: Collect coverage
run: nohup pub global run coverage:collect_coverage
--port=8111
--out=coverage.json
--wait-paused
--resume-isolates

- name: Format coverage
run: pub global run coverage:format_coverage
--lcov
--in=coverage.json
--out=lcov.info
--packages=.packages
--report-on=lib

- uses: codecov/codecov-action@v1
- uses: actions/checkout@v2

- name: Print Dart version
run: dart --version

- name: Install dependencies
run: pub get

- name: Analyze
run: dart analyze --fatal-infos --fatal-warnings

- name: Format code
run: dartfmt -n . --set-exit-if-changed

- name: Active coverage
run: pub global activate coverage

- name: Run tests
run: pub run test test/rxdart_test.dart

- name: Start Observatory
run: dart
--disable-service-auth-codes
--enable-vm-service=8111
--pause-isolates-on-exit
--enable-asserts
test/rxdart_test.dart &

- name: Collect coverage
run: nohup pub global run coverage:collect_coverage
--port=8111
--out=coverage.json
--wait-paused
--resume-isolates

- name: Format coverage
run: pub global run coverage:format_coverage
--lcov
--in=coverage.json
--out=lcov.info
--packages=.packages
--report-on=lib

- uses: codecov/codecov-action@v1
2 changes: 1 addition & 1 deletion lib/src/rx.dart
Original file line number Diff line number Diff line change
Expand Up @@ -948,7 +948,7 @@ abstract class Rx {
/// Rx.timer('hi', Duration(minutes: 1))
/// .listen((i) => print(i); // print 'hi' after 1 minute
static Stream<T> timer<T>(T value, Duration duration) =>
(TimerStream<T>(value, duration));
TimerStream<T>(value, duration);

/// When listener listens to it, creates a resource object from resource factory function,
/// and creates a [Stream] from the given factory function and resource as argument.
Expand Down
2 changes: 1 addition & 1 deletion lib/src/subjects/replay_subject.dart
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import 'dart:async';
import 'dart:collection';

import 'package:rxdart/rxdart.dart';
import 'package:rxdart/src/rx.dart';
import 'package:rxdart/src/streams/replay_stream.dart';
import 'package:rxdart/src/subjects/subject.dart';
import 'package:rxdart/src/transformers/start_with.dart';
import 'package:rxdart/src/transformers/start_with_error.dart';
import 'package:rxdart/src/utils/error_and_stacktrace.dart';
import 'package:rxdart/src/utils/value_wrapper.dart';
Expand Down
25 changes: 13 additions & 12 deletions lib/src/transformers/delay.dart
Original file line number Diff line number Diff line change
@@ -1,29 +1,30 @@
import 'dart:async';
import 'dart:collection';

import 'package:rxdart/src/rx.dart';
import 'package:rxdart/src/utils/forwarding_sink.dart';
import 'package:rxdart/src/utils/forwarding_stream.dart';

class _DelayStreamSink<S> implements ForwardingSink<S, S> {
final Duration _duration;
var _inputClosed = false;
final _timers = Queue<Timer>();
final _subscriptions = Queue<StreamSubscription<void>>();

_DelayStreamSink(this._duration);

@override
void add(EventSink<S> sink, S data) {
final timer = Timer(_duration, () {
_timers.removeFirst();
final subscription = Rx.timer<void>(null, _duration).listen((_) {
_subscriptions.removeFirst();

sink.add(data);

if (_inputClosed && _timers.isEmpty) {
if (_inputClosed && _subscriptions.isEmpty) {
sink.close();
}
});

_timers.addLast(timer);
_subscriptions.addLast(subscription);
}

@override
Expand All @@ -34,27 +35,27 @@ class _DelayStreamSink<S> implements ForwardingSink<S, S> {
void close(EventSink<S> sink) {
_inputClosed = true;

if (_timers.isEmpty) {
if (_subscriptions.isEmpty) {
sink.close();
}
}

@override
FutureOr onCancel(EventSink<S> sink) {
if (_timers.isNotEmpty) {
_timers.forEach((t) => t.cancel());
_timers.clear();
FutureOr<void> onCancel(EventSink<S> sink) {
if (_subscriptions.isNotEmpty) {
return Future.wait(_subscriptions.map((t) => t.cancel()))
.whenComplete(() => _subscriptions.clear());
}
}

@override
void onListen(EventSink<S> sink) {}

@override
void onPause(EventSink<S> sink) {}
void onPause(EventSink<S> sink) => _subscriptions.forEach((s) => s.pause());

@override
void onResume(EventSink<S> sink) {}
void onResume(EventSink<S> sink) => _subscriptions.forEach((s) => s.resume());
}

/// The Delay operator modifies its source Stream by pausing for
Expand Down
2 changes: 1 addition & 1 deletion lib/src/utils/forwarding_sink.dart
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,5 @@ abstract class ForwardingSink<T, R> {
void onResume(EventSink<R> sink);

/// Fires when a subscriber cancels.
FutureOr onCancel(EventSink<R> sink);
FutureOr<void> onCancel(EventSink<R> sink);
}
1 change: 0 additions & 1 deletion test/streams/never_test.dart
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import 'dart:async';

import 'package:rxdart/rxdart.dart';
import 'package:rxdart/src/streams/never.dart';
import 'package:test/test.dart';

void main() {
Expand Down
1 change: 0 additions & 1 deletion test/streams/repeat_test.dart
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import 'dart:async';

import 'package:rxdart/rxdart.dart';
import 'package:rxdart/src/streams/repeat.dart';
import 'package:test/test.dart';

void main() {
Expand Down
1 change: 0 additions & 1 deletion test/streams/retry_test.dart
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import 'dart:async';

import 'package:rxdart/rxdart.dart';
import 'package:rxdart/src/streams/retry.dart';
import 'package:test/test.dart';

void main() {
Expand Down
11 changes: 5 additions & 6 deletions test/subject/behavior_subject_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -347,8 +347,7 @@ void main() {
final subject = BehaviorSubject<void>();

unawaited(subject
.addStream(Stream<void>.error(Exception()),
cancelOnError: true)
.addStream(Stream<void>.error(Exception()), cancelOnError: true)
.whenComplete(() => subject.add(1)));

await expectLater(subject.stream,
Expand Down Expand Up @@ -874,8 +873,8 @@ void main() {
{
var behaviorSubject = BehaviorSubject.seeded(1);

var mapped = behaviorSubject
.asyncMap((event) => Future.value(event + 1));
var mapped =
behaviorSubject.asyncMap((event) => Future.value(event + 1));
expect(mapped, emitsInOrder(<int>[2, 3]));

behaviorSubject.add(2);
Expand All @@ -884,8 +883,8 @@ void main() {
{
var behaviorSubject = BehaviorSubject<int>();

var mapped = behaviorSubject
.asyncMap((event) => Future.value(event + 1));
var mapped =
behaviorSubject.asyncMap((event) => Future.value(event + 1));
expect(mapped, emitsInOrder(<int>[2, 3]));

behaviorSubject.add(1);
Expand Down
15 changes: 0 additions & 15 deletions test/transformers/delay_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -76,21 +76,6 @@ void main() {
}));
});

/// Should also throw if the current [Zone] is unable to install a [Timer]
test('Rx.delay.error.shouldThrowB', () async {
runZoned(() {
final streamWithError =
Stream.value(1).delay(const Duration(milliseconds: 200));

streamWithError.listen(null,
onError: expectAsync2(
(Exception e, StackTrace s) => expect(e, isException)));
},
zoneSpecification: ZoneSpecification(
createTimer: (self, parent, zone, duration, void Function() f) =>
throw Exception('Zone createTimer error')));
});

test('Rx.delay.pause.resume', () async {
late StreamSubscription<int> subscription;
final stream =
Expand Down
1 change: 0 additions & 1 deletion test/transformers/dematerialize_test.dart
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import 'dart:async';

import 'package:rxdart/rxdart.dart';
import 'package:rxdart/src/utils/notification.dart';
import 'package:stack_trace/stack_trace.dart';
import 'package:test/test.dart';

Expand Down
14 changes: 0 additions & 14 deletions test/transformers/interval_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -66,20 +66,6 @@ void main() {
}));
});

test('Rx.interval.error.shouldThrowB', () async {
runZoned(() {
final streamWithError =
Stream.value(1).interval(const Duration(milliseconds: 20));

streamWithError.listen(null,
onError: expectAsync2(
(Exception e, StackTrace s) => expect(e, isException)));
},
zoneSpecification: ZoneSpecification(
createTimer: (self, parent, zone, duration, void Function() f) =>
throw Exception('Zone createTimer error')));
});

test('Rx.interval accidental broadcast', () async {
final controller = StreamController<int>();

Expand Down
1 change: 0 additions & 1 deletion test/transformers/skip_last_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ void main() {
expect(count, equals(values.length));
});


test('Rx.skipLast.skipMoreThanLength', () async {
final stream = Stream.fromIterable([1, 2, 3, 4, 5]).skipLast(100);

Expand Down
1 change: 0 additions & 1 deletion test/transformers/with_latest_from_test.dart
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import 'dart:async';

import 'package:rxdart/rxdart.dart';
import 'package:rxdart/src/rx.dart';
import 'package:test/test.dart';

/// creates 5 Streams, deferred from a source Stream, so that they all emit
Expand Down

0 comments on commit fc0b4ab

Please sign in to comment.