Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adds global onDiscard hook #3240

Open
wants to merge 3 commits into
base: 3.4.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -135,6 +135,7 @@ public void onSubscribe(Subscription s) {
public void onNext(T t) {
if (done) {
Operators.onNextDropped(t, actual.currentContext());
Operators.onDiscard(t, actual.currentContext());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unfortunately, unlike the more recent Operators.onDiscard, Operators.onNextDropped doesn't catch exceptions. maybe this could be added as an improvement alongside this PR?

otherwise, a throwing onNextDropped hook would prevent the onDiscard.

Reversing the order of the calls would also avoid this issue (and I think discarding is more important than signalling the malformed signal)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

one pattern that could be applied everywhere is to call actual.currentContext() only once, assigning the result to a local Context ctx variable and using ctx in both Operators calls.

return;
}

Expand Down Expand Up @@ -284,6 +285,7 @@ public void onSubscribe(Subscription s) {
public void onNext(T t) {
if (done) {
Operators.onNextDropped(t, this.ctx);
Operators.onDiscard(t, this.ctx);
return;
}

Expand Down Expand Up @@ -474,6 +476,7 @@ public void onSubscribe(Subscription s) {
public void onNext(T t) {
if (done) {
Operators.onNextDropped(t, actual.currentContext());
Operators.onDiscard(t, actual.currentContext());
return;
}

Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -173,6 +173,7 @@ public void onNext(T t) {
}

Operators.onNextDropped(t, this.ctx);
Operators.onDiscard(t, this.ctx);
}

@Override
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -206,6 +206,7 @@ public void onNext(T t) {
public boolean tryOnNext(T t) {
if (done) {
Operators.onNextDropped(t, actual.currentContext());
Operators.onDiscard(t, actual.currentContext());
return true;
}

Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2020-2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -174,6 +174,7 @@ public void onNext(T t) {
break;
case TERMINATED:
Operators.onNextDropped(t, currentContext());
Operators.onDiscard(t, currentContext());
break;
}
return;
Expand Down
Expand Up @@ -154,6 +154,7 @@ public FluxSink<T> next(T t) {
Objects.requireNonNull(t, "t is null in sink.next(t)");
if (sink.isTerminated() || done) {
Operators.onNextDropped(t, sink.currentContext());
Operators.onDiscard(t, sink.currentContext());
return this;
}
if (WIP.get(this) == 0 && WIP.compareAndSet(this, 0, 1)) {
Expand Down Expand Up @@ -629,6 +630,7 @@ static final class IgnoreSink<T> extends BaseSink<T> {
public FluxSink<T> next(T t) {
if (isTerminated()) {
Operators.onNextDropped(t, ctx);
Operators.onDiscard(t, ctx);
return this;
}
if (isCancelled()) {
Expand Down Expand Up @@ -662,6 +664,7 @@ static abstract class NoOverflowBaseAsyncSink<T> extends BaseSink<T> {
public final FluxSink<T> next(T t) {
if (isTerminated()) {
Operators.onNextDropped(t, ctx);
Operators.onDiscard(t, ctx);
return this;
}

Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2017-2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -95,6 +95,7 @@ public void onSubscribe(Subscription s) {
public void onNext(final T t) {
if (done || delayed < 0) {
Operators.onNextDropped(t, currentContext());
Operators.onDiscard(t, currentContext());
return;
}
//keep track of the number of delayed onNext so that
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -83,6 +83,7 @@ public void onNext(Signal<T> t) {
if (done) {
//TODO interpret the Signal and drop differently?
Operators.onNextDropped(t, actual.currentContext());
Operators.onDiscard(t, actual.currentContext());
return;
}

Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -134,6 +134,7 @@ public void onNext(T t) {
public boolean tryOnNext(T t) {
if (done) {
Operators.onNextDropped(t, this.ctx);
Operators.onDiscard(t, this.ctx);
return true;
}

Expand Down Expand Up @@ -262,6 +263,7 @@ public void onSubscribe(Subscription s) {
public void onNext(T t) {
if (done) {
Operators.onNextDropped(t, this.ctx);
Operators.onDiscard(t, this.ctx);
return;
}

Expand Down Expand Up @@ -301,6 +303,7 @@ public void onNext(T t) {
public boolean tryOnNext(T t) {
if (done) {
Operators.onNextDropped(t, this.ctx);
Operators.onDiscard(t, this.ctx);
return true;
}

Expand Down Expand Up @@ -444,6 +447,7 @@ public boolean tryOnNext(T t) {
}
if (done) {
Operators.onNextDropped(t, this.ctx);
Operators.onDiscard(t, this.ctx);
return true;
}

Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -108,6 +108,7 @@ public void onNext(T t) {
public boolean tryOnNext(T t) {
if (done) {
Operators.onNextDropped(t, ctx);
Operators.onDiscard(t, ctx);
return true;
}

Expand Down Expand Up @@ -244,6 +245,7 @@ public void onNext(T t) {
public boolean tryOnNext(T t) {
if (done) {
Operators.onNextDropped(t, ctx);
Operators.onDiscard(t, ctx);
return true;
}

Expand Down
Expand Up @@ -147,6 +147,7 @@ public Object scanUnsafe(Attr key) {
public void onNext(T t) {
if (state == STATE_DONE) {
Operators.onNextDropped(t, cachedContext);
Operators.onDiscard(t, cachedContext);
return;
}
try {
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -90,6 +90,7 @@ public void onSubscribe(Subscription s) {
public void onNext(T t) {
if (done) {
Operators.onNextDropped(t, this.ctx);
Operators.onDiscard(t, this.ctx);
return;
}

Expand Down Expand Up @@ -122,6 +123,7 @@ public void onNext(T t) {
public boolean tryOnNext(T t) {
if (done) {
Operators.onNextDropped(t, this.ctx);
Operators.onDiscard(t, this.ctx);
return false;
}

Expand Down Expand Up @@ -224,6 +226,7 @@ public void onSubscribe(Subscription s) {
public void onNext(T t) {
if (done) {
Operators.onNextDropped(t, this.ctx);
Operators.onDiscard(t, this.ctx);
return;
}

Expand Down Expand Up @@ -256,6 +259,7 @@ public void onNext(T t) {
public boolean tryOnNext(T t) {
if (done) {
Operators.onNextDropped(t, this.ctx);
Operators.onDiscard(t, this.ctx);
return false;
}

Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -96,6 +96,7 @@ public void onNext(T t) {
else {
if (done) {
Operators.onNextDropped(t, this.ctx);
Operators.onDiscard(t, this.currentContext());
return;
}
boolean b;
Expand Down Expand Up @@ -128,6 +129,7 @@ public void onNext(T t) {
public boolean tryOnNext(T t) {
if (done) {
Operators.onNextDropped(t, this.ctx);
Operators.onDiscard(t, this.ctx);
return false;
}

Expand Down Expand Up @@ -315,6 +317,7 @@ public void onNext(T t) {
else {
if (done) {
Operators.onNextDropped(t, this.ctx);
Operators.onDiscard(t, this.ctx);
return;
}
boolean b;
Expand Down Expand Up @@ -347,6 +350,7 @@ public void onNext(T t) {
public boolean tryOnNext(T t) {
if (done) {
Operators.onNextDropped(t, this.ctx);
Operators.onDiscard(t, this.ctx);
return false;
}

Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -377,6 +377,7 @@ public void onSubscribe(Subscription s) {
public void onNext(T t) {
if (done) {
Operators.onNextDropped(t, actual.currentContext());
Operators.onDiscard(t, actual.currentContext());
return;
}

Expand Down Expand Up @@ -973,6 +974,7 @@ public void onNext(R t) {
else {
if (done) {
Operators.onNextDropped(t, parent.currentContext());
Operators.onDiscard(t, parent.currentContext());
return;
}

Expand Down
Expand Up @@ -160,6 +160,7 @@ public CoreSubscriber<? super T> actual() {
public void next(T t) {
if (terminate) {
Operators.onNextDropped(t, actual.currentContext());
Operators.onDiscard(t, actual.currentContext());
return;
}
if (hasValue) {
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -176,6 +176,7 @@ public void onSubscribe(Subscription s) {
public void onNext(T t) {
if(done){
Operators.onNextDropped(t, actual.currentContext());
Operators.onDiscard(t, actual.currentContext());
return;
}

Expand Down
Expand Up @@ -103,6 +103,7 @@ public ContextView contextView() {
public void onNext(T t) {
if (done) {
Operators.onNextDropped(t, actual.currentContext());
Operators.onDiscard(t, actual.currentContext());
return;
}

Expand Down Expand Up @@ -156,6 +157,7 @@ private void reset() {
public boolean tryOnNext(T t) {
if (done) {
Operators.onNextDropped(t, actual.currentContext());
Operators.onDiscard(t, actual.currentContext());
return false;
}

Expand Down Expand Up @@ -317,6 +319,7 @@ public void onSubscribe(Subscription s) {
public void onNext(T t) {
if (done) {
Operators.onNextDropped(t, actual.currentContext());
Operators.onDiscard(t, actual.currentContext());
return;
}

Expand Down Expand Up @@ -371,6 +374,7 @@ private void reset() {
public boolean tryOnNext(T t) {
if (done) {
Operators.onNextDropped(t, actual.currentContext());
Operators.onDiscard(t, actual.currentContext());
return false;
}

Expand Down
Expand Up @@ -109,6 +109,7 @@ public ContextView contextView() {
public boolean tryOnNext(T t) {
if (done) {
Operators.onNextDropped(t, actual.currentContext());
Operators.onDiscard(t, actual.currentContext());
return true;
}

Expand Down Expand Up @@ -170,6 +171,7 @@ public void onNext(T t) {
else {
if (done) {
Operators.onNextDropped(t, actual.currentContext());
Operators.onDiscard(t, actual.currentContext());
return;
}
try {
Expand Down Expand Up @@ -480,6 +482,7 @@ public void onNext(T t) {
else {
if (done) {
Operators.onNextDropped(t, actual.currentContext());
Operators.onDiscard(t, actual.currentContext());
return;
}
try {
Expand Down Expand Up @@ -535,6 +538,7 @@ private void reset() {
public boolean tryOnNext(T t) {
if (done) {
Operators.onNextDropped(t, actual.currentContext());
Operators.onDiscard(t, actual.currentContext());
return true;
}

Expand Down