Add onErrorClose to onErrorResume extensions #369

Open
shinayser opened this issue Nov 21, 2019 · 10 comments · May be fixed by #371

Comments

@shinayser

I am trying to figure out how to convert an "error" event into a "done" event, but I can't find a way.

Example: I have a timeout operator on my stream and want it to close the stream instead of emitting the error.

Is there a way of doing it?

Perhaps an onErrorClose operator could be an interesting addition? What do you think?

@shinayser changed the title from "How to close stream after error?" to "Closing stream after error" on Nov 21, 2019
@hoc081098
Collaborator

listen(cancelOnError: true)

@brianegan
Collaborator

brianegan commented Nov 22, 2019

Yep, @hoc081098 is exactly right: When you listen to the Stream, you can set cancelOnError: true and that should give you the behavior you're looking for!
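
For reference, here is a minimal sketch of what cancelOnError: true does on a plain Dart Stream. The helper names numbersWithError and demo are made up for illustration and assume null-safe Dart. Note that the listener still receives the error and the subscription is then cancelled, so neither later data events nor a "done" event are delivered.

```dart
import 'dart:async';

/// Hypothetical helper: buffers 1, an error, 2, and then a close event.
Stream<int> numbersWithError() {
  final controller = StreamController<int>();
  controller
    ..add(1)
    ..addError(Exception('boom'))
    ..add(2)
    ..close();
  return controller.stream;
}

/// Listens with cancelOnError: true and records every callback that fires.
Future<List<String>> demo() async {
  final log = <String>[];
  final sawError = Completer<void>();

  numbersWithError().listen(
    (value) => log.add('data $value'),
    onError: (Object error) {
      log.add('error');
      sawError.complete();
    },
    onDone: () => log.add('done'),
    cancelOnError: true, // cancel the subscription on the first error
  );

  await sawError.future;
  // Give any remaining events a chance to arrive (none should).
  await Future<void>.delayed(Duration.zero);
  return log;
}

Future<void> main() async {
  print(await demo()); // prints [data 1, error]
}
```

Because the behavior is chosen at the call to listen, it has to be repeated by every listener.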

@shinayser
Author

shinayser commented Nov 22, 2019

No, guys, that is not the behavior I want, because it requires the user to pass cancelOnError manually. I am developing an API and it must be concise; I don't want to depend on the user for this.
Also, sometimes the user can't pass cancelOnError because they are not listening directly, as is the case with Flutter's StreamBuilder.

@brianegan
Collaborator

brianegan commented Nov 22, 2019

@shinayser Ah, gotcha -- just as a heads up, that would go against the normal behavior for Dart Streams, which usually require the listener to define that behavior.

If you're cool living with that, you could create a custom transformer and apply it to the Streams that need to exhibit this behavior.

import 'dart:async';

import 'package:rxdart/rxdart.dart';
import 'package:test/test.dart';

extension OnErrorCancel<T> on Stream<T> {
  Stream<T> onErrorCancel() {
    return transform(
      StreamTransformer.fromHandlers(
        handleError: (err, stacktrace, sink) => sink.close(),
      ),
    );
  }
}

void main() {
  test('onErrorCancel cancels a Stream when it emits an error', () {
    final stream = ConcatStream<int>([
      Stream.fromIterable([1]),
      Stream.error(Exception()),
      Stream.fromIterable([2]),
    ]);

    expect(
      stream.onErrorCancel(),
      emitsInOrder(<dynamic>[1, emitsDone]),
    );
  });
}

@shinayser
Author

shinayser commented Nov 22, 2019

Actually, I solved it using the operators we already have, like this:

timeout(ms(timeoutMillis))
.onErrorResumeNext(Observable.empty())

But it is an ugly workaround. I was thinking of an official, API-supported way of doing it, perhaps even going further and accepting a predicate as a parameter that decides whether the stream should be closed.

@brianegan
Collaborator

Yep, that's another way to go about it. If you were designing the api, what would it look like?

onErrorCancel([bool Function(dynamic err) shouldCancel]) kind of a thing? By default, it would cancel the Stream on error. If a shouldCancel function is provided, it would use that to check if the Stream should be closed?
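
A minimal sketch of how such an operator might look, assuming null-safe Dart and building on StreamTransformer.fromHandlers from dart:async. The extension name OnErrorClose, the method onErrorClose, and the helper failingStream are hypothetical and are not part of rxdart; the optional predicate follows the shouldCancel idea described above.

```dart
import 'dart:async';

/// Hypothetical extension, not part of rxdart.
extension OnErrorClose<T> on Stream<T> {
  /// Closes the stream on error. If [shouldClose] is given, only errors for
  /// which it returns true close the stream; other errors are forwarded.
  Stream<T> onErrorClose([bool Function(Object error)? shouldClose]) {
    return transform(
      StreamTransformer<T, T>.fromHandlers(
        handleError: (error, stackTrace, sink) {
          // With no predicate, every error closes the stream.
          final close = shouldClose?.call(error) ?? true;
          if (close) {
            sink.close();
          } else {
            sink.addError(error, stackTrace);
          }
        },
      ),
    );
  }
}

/// Hypothetical helper: buffers 1, the given error, 2, and then a close event.
Stream<int> failingStream(Object error) {
  final controller = StreamController<int>();
  controller
    ..add(1)
    ..addError(error)
    ..add(2)
    ..close();
  return controller.stream;
}

Future<void> main() async {
  // A timeout matches the predicate, so the stream simply ends after 1.
  final values = await failingStream(TimeoutException('too slow'))
      .onErrorClose((error) => error is TimeoutException)
      .toList();
  print(values); // prints [1]

  // Any other error is forwarded to the listener unchanged.
  try {
    await failingStream(StateError('other'))
        .onErrorClose((error) => error is TimeoutException)
        .toList();
  } on StateError catch (error) {
    print('forwarded: $error');
  }
}
```

With this shape the decision lives in the stream pipeline itself, so a consumer such as StreamBuilder would not need to know about it.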

@brianegan reopened this on Nov 22, 2019
@shinayser
Author

shinayser commented Nov 22, 2019

> Yep, that's another way to go about it. If you were designing the api, what would it look like?
>
> onErrorCancel([bool Function(dynamic err) shouldCancel]) kind of a thing? By default, it would cancel the Stream on error. If a shouldCancel function is provided, it would use that to check if the Stream should be closed?

Yes, exactly like that!
Actually, my solution has a bit more code. Let me share it with you:

return _maxpbPackagesStream
    .where((it) => getMessageType(it) == Mts.MAXPB_CMD_POSITION)
    .map((it) => MultipleReportData.fromBuffer(removeHeader(it)))
    .doOnData((it) => sendAckForCommandId(it.packetID))
    .timeout(ms(timeoutMillis))
    .onErrorResume((error) {
      // If the error is just a timeout, "close" the stream by switching to an empty one.
      if (error is TimeoutException) {
        return Observable.empty();
      } else {
        // Any other error is rethrown unchanged.
        throw error;
      }
    });

As you can see, it is pretty similar to what you proposed.

Going beyond that, we could also have more operators that close the stream based on the data received. What do you think? Something like this:

stream.closeAfterEmitting(bool Function(T) shouldClose) // Checks whether to close; if true, emits the latest value and then closes.

stream.closeBeforeEmitting(bool Function(T) shouldClose) // Checks whether to close; if true, closes without emitting the latest value.

(The word "close" is more expressive than "cancel". What do you think?)

@shinayser
Author

(Oops, I closed it by mistake, sorry!)

@shinayser reopened this on Nov 22, 2019
@brianegan changed the title from "Closing stream after error" to "Add onErrorClose to onErrorResume extensions" on Nov 22, 2019
@brianegan
Collaborator

> (The word "close" is more expressive than "cancel". What do you think?)

Naming is hard :) I don't feel strongly one way or the other. I changed the issue to reflect the work that needs to be done to address the initial topic.

> Now going beyond that, we could also have some more operators that could close the stream based on the data received

If I understand you correctly, closeBeforeEmitting already exists, but it's called takeWhile: https://pub.dev/documentation/rxdart/latest/rx/Observable/takeWhile.html

closeAfterEmitting does not exist, but it might make sense for it to exist since other Rx implementations have something along this line: takeWhile(myPredicate, inclusive: true). Since takeWhile comes from the Stream Interface, we'd have to create a new method called takeWhileInclusive or something.

I'd say that'd be better to discuss in another issue, though, if you find it important so we can keep this issue focused on the original topic!
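
To illustrate the difference being discussed, here is a rough sketch in plain null-safe Dart. The takeWhileInclusive name comes from the comment above, but this particular implementation is only an assumption for illustration. Unlike the built-in takeWhile, this simple async* version ends the stream if the source emits an error.

```dart
/// Hypothetical extension sketching an inclusive variant of takeWhile.
extension TakeWhileInclusive<T> on Stream<T> {
  /// Emits values while [test] passes, plus the first value that fails it,
  /// and then closes the stream.
  Stream<T> takeWhileInclusive(bool Function(T value) test) async* {
    await for (final value in this) {
      yield value;
      if (!test(value)) {
        return; // close after emitting the first failing value
      }
    }
  }
}

Future<void> main() async {
  bool lessThanThree(int value) => value < 3;

  // Built-in takeWhile: closes before emitting the first failing value.
  final exclusive =
      await Stream.fromIterable([1, 2, 3, 4]).takeWhile(lessThanThree).toList();
  print(exclusive); // prints [1, 2]

  // Sketch above: closes after emitting the first failing value.
  final inclusive = await Stream.fromIterable([1, 2, 3, 4])
      .takeWhileInclusive(lessThanThree)
      .toList();
  print(inclusive); // prints [1, 2, 3]
}
```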

@shinayser
Author

> > (The word "close" is more expressive than "cancel". What do you think?)
>
> Naming is hard :) I don't feel strongly one way or the other. I changed the issue to reflect the work that needs to be done to address the initial topic.
>
> > Now going beyond that, we could also have some more operators that could close the stream based on the data received
>
> If I understand you correctly, closeBeforeEmitting already exists, but it's called takeWhile: https://pub.dev/documentation/rxdart/latest/rx/Observable/takeWhile.html
>
> closeAfterEmitting does not exist, but it might make sense for it to exist since other Rx implementations have something along this line: takeWhile(myPredicate, inclusive: true). Since takeWhile comes from the Stream Interface, we'd have to create a new method called takeWhileInclusive or something.
>
> I'd say that'd be better to discuss in another issue, though, if you find it important so we can keep this issue focused on the original topic!

Great, brianegan! I will open another issue to discuss takeWhileInclusive, then. Thanks!

@brianegan linked a pull request on Nov 22, 2019 that will close this issue

3 participants