Skip to content
This repository has been archived by the owner on Sep 14, 2021. It is now read-only.

Commit

Permalink
1.3.0 (#11)
Browse files Browse the repository at this point in the history
* 1.3.0

* docs
  • Loading branch information
hoc081098 committed May 9, 2021
1 parent c973aa5 commit 5e7e8b2
Show file tree
Hide file tree
Showing 12 changed files with 82 additions and 73 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,7 @@
## 1.3.0 - May 9, 2021

- Update `rxdart` to `0.27.0`.

## 1.2.0 - Mar 27, 2021

- Update dependencies.
Expand Down
2 changes: 1 addition & 1 deletion example/distinct_value_connectable_stream_example.dart
Expand Up @@ -30,7 +30,7 @@ class CounterBloc {
decrementController.stream.map((i) => -i),
];
final state$ = Rx.merge(streams)
.scan<int>((acc, e, _) => acc! + e, 0)
.scan<int>((acc, e, _) => acc + e, 0)
.publishValueDistinct(0);

final subscription = state$.connect();
Expand Down
11 changes: 4 additions & 7 deletions lib/src/as_broadcast.dart
@@ -1,9 +1,8 @@
import 'dart:async';

import 'package:rxdart_ext/rxdart_ext.dart' show ValueWrapper;

import 'distinct_value_connectable_stream.dart';
import 'distinct_value_stream.dart';
import 'distinct_value_stream_mixin.dart';

/// Convert a [DistinctValueStream] to a broadcast [DistinctValueStream].
extension BroadcastDistinctValueStreamExtensions<T> on DistinctValueStream<T> {
Expand All @@ -19,7 +18,7 @@ extension BroadcastDistinctValueStreamExtensions<T> on DistinctValueStream<T> {
final self = this;
return self is DistinctValueConnectableStream<T>
? self
: DistinctValueConnectableStream<T>(this, requireValue,
: DistinctValueConnectableStream<T>(this, value,
equals: equals, sync: sync);
}

Expand All @@ -36,6 +35,7 @@ extension BroadcastDistinctValueStreamExtensions<T> on DistinctValueStream<T> {
}

class _AsBroadcastStream<T> extends StreamView<T>
with DistinctValueStreamMixin<T>
implements DistinctValueStream<T> {
final DistinctValueStream<T> source;

Expand All @@ -46,8 +46,5 @@ class _AsBroadcastStream<T> extends StreamView<T>
bool Function(T p1, T p2) get equals => source.equals;

@override
Null get errorAndStackTrace => null;

@override
ValueWrapper<T> get valueWrapper => source.valueWrapper;
T get value => source.value;
}
11 changes: 4 additions & 7 deletions lib/src/distinct_value_connectable_stream.dart
Expand Up @@ -6,10 +6,10 @@ import 'package:rxdart_ext/rxdart_ext.dart'
ConnectableStream,
ConnectableStreamSubscription,
ValueStream,
ValueSubject,
ValueWrapper;
ValueSubject;

import 'distinct_value_stream.dart';
import 'distinct_value_stream_mixin.dart';
import 'distinct_value_subject.dart';

/// A [ConnectableStream] that converts a single-subscription Stream into
Expand Down Expand Up @@ -52,7 +52,7 @@ abstract class DistinctValueConnectableStream<T> extends ConnectableStream<T>
}

class _DistinctValueConnectableStream<T>
extends DistinctValueConnectableStream<T> {
extends DistinctValueConnectableStream<T> with DistinctValueStreamMixin<T> {
final Stream<T> _source;
final DistinctValueSubject<T> _subject;
var _used = false;
Expand Down Expand Up @@ -119,10 +119,7 @@ class _DistinctValueConnectableStream<T>
}

@override
Null get errorAndStackTrace => null;

@override
ValueWrapper<T> get valueWrapper => _subject.valueWrapper;
T get value => _subject.value;
}

/// Provide two extension methods for [Stream]:
Expand Down
54 changes: 21 additions & 33 deletions lib/src/distinct_value_stream.dart
@@ -1,7 +1,8 @@
import 'dart:async';

import 'package:distinct_value_connectable_stream/src/distinct_value_stream_mixin.dart';
import 'package:rxdart_ext/rxdart_ext.dart'
show NotReplayValueStream, ValueStreamController, ValueWrapper;
show NotReplayValueStream, ValueStreamController;

/// An [Stream] that provides synchronous access to the last emitted item,
/// and two consecutive values are not equal.
Expand All @@ -15,36 +16,25 @@ abstract class DistinctValueStream<T> extends NotReplayValueStream<T> {
static bool defaultEquals(Object? lhs, Object? rhs) => lhs == rhs;

@override
Null get errorAndStackTrace;
T get value;

@override
ValueWrapper<T> get valueWrapper;
}

/// Extensions to easily access value and error.
extension DistinctValueStreamExtensions<T> on DistinctValueStream<T> {
/// A flag that turns true as soon as at least one event has been emitted.
/// Always returns `true`.
bool get hasValue => true;
T get valueOrNull;

/// Returns latest value.
T get value => valueWrapper.value;
@override
bool get hasValue;

/// Returns latest value.
T get requireValue => valueWrapper.value;
@override
Never get error;

/// A flag that turns true as soon as at an error event has been emitted.
/// Always returns `false`.
bool get hasError => false;
@override
Null get errorOrNull;

/// Last emitted error.
/// Always returns `null`.
Null get error => null;
@override
bool get hasError;

/// Last emitted error.
/// Always throws.
Never get requireError =>
throw StateError('DistinctValueStream always has no error!');
@override
Null get stackTrace;
}

/// Convert this [Stream] to a [DistinctValueStream].
Expand Down Expand Up @@ -77,6 +67,7 @@ extension ToDistinctValueStreamExtension<T> on Stream<T> {
///
/// This stream is a single-subscription stream.
class _DistinctValueStream<T> extends Stream<T>
with DistinctValueStreamMixin<T>
implements DistinctValueStream<T> {
@override
final bool Function(T p1, T p2) equals;
Expand All @@ -89,15 +80,15 @@ class _DistinctValueStream<T> extends Stream<T>
/// Construct a [_DistinctValueStream] with source stream, seed value.
_DistinctValueStream(
Stream<T> source,
T value,
T seedValue,
this.equals,
) : controller = ValueStreamController<T>(value, sync: true) {
) : controller = ValueStreamController<T>(seedValue, sync: true) {
late StreamSubscription<T> subscription;

controller.onListen = () {
subscription = source.listen(
(data) {
if (!equals(valueWrapper.value, data)) {
if (!equals(value, data)) {
controller.add(data);
}
},
Expand All @@ -113,12 +104,6 @@ class _DistinctValueStream<T> extends Stream<T>
controller.onCancel = () => subscription.cancel();
}

@override
Null get errorAndStackTrace => null;

@override
ValueWrapper<T> get valueWrapper => controller.stream.valueWrapper!;

@override
StreamSubscription<T> listen(
void Function(T event)? onData, {
Expand All @@ -132,4 +117,7 @@ class _DistinctValueStream<T> extends Stream<T>
onDone: onDone,
cancelOnError: cancelOnError,
);

@override
T get value => controller.stream.value;
}
24 changes: 24 additions & 0 deletions lib/src/distinct_value_stream_mixin.dart
@@ -0,0 +1,24 @@
import 'package:rxdart_ext/rxdart_ext.dart' show ValueStreamError;

import 'distinct_value_stream.dart';

/// This mixin implements all [DistinctValueStream] members except [DistinctValueStream.value].
mixin DistinctValueStreamMixin<T> implements DistinctValueStream<T> {
@override
Never get error => throw ValueStreamError.hasNoError();

@override
Null get errorOrNull => null;

@override
bool get hasError => false;

@override
Null get stackTrace => null;

@override
bool get hasValue => true;

@override
T get valueOrNull => value;
}
17 changes: 8 additions & 9 deletions lib/src/distinct_value_subject.dart
@@ -1,8 +1,9 @@
import 'dart:async';

import 'package:distinct_value_connectable_stream/src/distinct_value_stream_mixin.dart';
import 'package:meta/meta.dart';
import 'package:rxdart_ext/rxdart_ext.dart'
show PublishSubject, Subject, ValueSubject, ValueWrapper;
show PublishSubject, Subject, ValueSubject;

import 'distinct_value_stream.dart';

Expand Down Expand Up @@ -38,6 +39,7 @@ import 'distinct_value_stream.dart';
/// subject.close();
@sealed
class DistinctValueSubject<T> extends Subject<T>
with DistinctValueStreamMixin<T>
implements DistinctValueStream<T> {
final ValueSubject<T> _subject;

Expand Down Expand Up @@ -73,16 +75,10 @@ class DistinctValueSubject<T> extends Subject<T>
equals ?? DistinctValueStream.defaultEquals, subject);
}

@override
Null get errorAndStackTrace => null;

@override
ValueWrapper<T> get valueWrapper => _subject.valueWrapper!;

@nonVirtual
@override
void add(T event) {
if (!equals(valueWrapper.value, event)) {
if (!equals(value, event)) {
_subject.add(event);
}
}
Expand All @@ -91,7 +87,7 @@ class DistinctValueSubject<T> extends Subject<T>
Future<void> close() => _subject.close();

@override
void addError(Object error, [StackTrace? stackTrace]) =>
Never addError(Object error, [StackTrace? stackTrace]) =>
throw StateError('Cannot add error to DistinctValueSubject');

@override
Expand All @@ -117,4 +113,7 @@ class DistinctValueSubject<T> extends Subject<T>
onCancel: onCancel,
sync: sync,
);

@override
T get value => _subject.value;
}
6 changes: 3 additions & 3 deletions pubspec.yaml
@@ -1,7 +1,7 @@
name: distinct_value_connectable_stream
description: Distinct value connectable stream for RxDart, useful for BLoC pattern
author: Petrus Nguyen Thai Hoc <hoc081098@gmail.com>
version: 1.2.0
version: 1.3.0
homepage: https://github.com/hoc081098/distinct_value_connectable_stream.git
repository: https://github.com/hoc081098/distinct_value_connectable_stream.git
issue_tracker: https://github.com/hoc081098/distinct_value_connectable_stream/issues
Expand All @@ -10,9 +10,9 @@ environment:
sdk: '>=2.12.0 <3.0.0'

dependencies:
rxdart_ext: ^0.0.1
rxdart_ext: ^0.1.0
meta: ^1.3.0

dev_dependencies:
pedantic: ^1.11.0
test: ^1.16.8
test: ^1.17.3
9 changes: 4 additions & 5 deletions test/distinct_value_connectable_stream_test.dart
Expand Up @@ -278,15 +278,14 @@ void main() {

test('extensions', () {
final distinctValue = Stream.value(1).shareValueDistinct(0);
expect(distinctValue.valueWrapper, ValueWrapper(0));
expect(distinctValue.valueOrNull, 0);
expect(distinctValue.hasValue, true);
expect(distinctValue.value, 0);
expect(distinctValue.requireValue, 0);

expect(distinctValue.errorAndStackTrace, isNull);
expect(distinctValue.stackTrace, isNull);
expect(distinctValue.hasError, false);
expect(distinctValue.error, null);
expect(() => distinctValue.requireError, throwsStateError);
expect(distinctValue.errorOrNull, null);
expect(() => distinctValue.error, throwsA(isA<ValueStreamError>()));
});
});
}
Expand Down
3 changes: 2 additions & 1 deletion test/distinct_value_subject_test.dart
Expand Up @@ -77,7 +77,8 @@ void main() {
});

test('get error', () {
expect(DistinctValueSubject(0).errorAndStackTrace, isNull);
expect(DistinctValueSubject(0).errorOrNull, isNull);
expect(DistinctValueSubject(0).stackTrace, isNull);
});

test('Rx', () {
Expand Down
11 changes: 5 additions & 6 deletions test/distinct_value_test.dart
Expand Up @@ -14,7 +14,7 @@ void main() {
for (var i = 0; i < 10; i++) {
controller.add(i);
expect(stream.value, i);
expect(stream.error, isNull);
expect(stream.errorOrNull, isNull);
}
});

Expand Down Expand Up @@ -118,15 +118,14 @@ void main() {

test('extensions', () {
final distinctValue = Stream.value(1).distinctValue(0);
expect(distinctValue.valueWrapper, ValueWrapper(0));
expect(distinctValue.valueOrNull, 0);
expect(distinctValue.hasValue, true);
expect(distinctValue.value, 0);
expect(distinctValue.requireValue, 0);

expect(distinctValue.errorAndStackTrace, isNull);
expect(distinctValue.stackTrace, isNull);
expect(distinctValue.hasError, false);
expect(distinctValue.error, null);
expect(() => distinctValue.requireError, throwsStateError);
expect(distinctValue.errorOrNull, null);
expect(() => distinctValue.error, throwsA(isA<ValueStreamError>()));
});
});
}
3 changes: 2 additions & 1 deletion test/single_subscription_to_broadcast_test.dart
Expand Up @@ -22,7 +22,8 @@ void main() {
final stream = source.asBroadcastDistinctValueStream();

expect(identical(stream.equals, source.equals), true);
expect(stream.errorAndStackTrace, null);
expect(stream.errorOrNull, null);
expect(stream.stackTrace, null);
await _test(stream);
});

Expand Down

0 comments on commit 5e7e8b2

Please sign in to comment.