Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adds global onDiscard hook #3240

Open
wants to merge 3 commits into
base: 3.4.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -134,7 +134,9 @@ public void onSubscribe(Subscription s) {
@Override
public void onNext(T t) {
if (done) {
Operators.onNextDropped(t, actual.currentContext());
Context ctx = actual.currentContext();
Operators.onDiscard(t, ctx);
Operators.onNextDropped(t, ctx);
return;
}

Expand Down Expand Up @@ -283,7 +285,9 @@ public void onSubscribe(Subscription s) {
@Override
public void onNext(T t) {
if (done) {
Operators.onNextDropped(t, this.ctx);
Context ctx = this.ctx;
Operators.onDiscard(t, ctx);
Operators.onNextDropped(t, ctx);
return;
}

Expand Down Expand Up @@ -473,7 +477,9 @@ public void onSubscribe(Subscription s) {
@Override
public void onNext(T t) {
if (done) {
Operators.onNextDropped(t, actual.currentContext());
Context ctx = actual.currentContext();
Operators.onDiscard(t, ctx);
Operators.onNextDropped(t, ctx);
return;
}

Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -172,7 +172,9 @@ public void onNext(T t) {
}
}

Operators.onNextDropped(t, this.ctx);
Context ctx = this.ctx;
Operators.onDiscard(t, ctx);
Operators.onNextDropped(t, ctx);
}

@Override
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -205,6 +205,7 @@ public void onNext(T t) {
@Override
public boolean tryOnNext(T t) {
if (done) {
Operators.onDiscard(t, actual.currentContext());
Operators.onNextDropped(t, actual.currentContext());
return true;
}
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2020-2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -173,6 +173,7 @@ public void onNext(T t) {
Operators.onDiscard(t, currentContext());
break;
case TERMINATED:
Operators.onDiscard(t, currentContext());
Operators.onNextDropped(t, currentContext());
break;
}
Expand Down
Expand Up @@ -153,6 +153,7 @@ public ContextView contextView() {
public FluxSink<T> next(T t) {
Objects.requireNonNull(t, "t is null in sink.next(t)");
if (sink.isTerminated() || done) {
Operators.onDiscard(t, sink.currentContext());
Operators.onNextDropped(t, sink.currentContext());
return this;
}
Expand Down Expand Up @@ -628,6 +629,7 @@ static final class IgnoreSink<T> extends BaseSink<T> {
@Override
public FluxSink<T> next(T t) {
if (isTerminated()) {
Operators.onDiscard(t, ctx);
Operators.onNextDropped(t, ctx);
return this;
}
Expand Down Expand Up @@ -661,6 +663,7 @@ static abstract class NoOverflowBaseAsyncSink<T> extends BaseSink<T> {
@Override
public final FluxSink<T> next(T t) {
if (isTerminated()) {
Operators.onDiscard(t, ctx);
Operators.onNextDropped(t, ctx);
return this;
}
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2017-2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -94,6 +94,7 @@ public void onSubscribe(Subscription s) {
@Override
public void onNext(final T t) {
if (done || delayed < 0) {
Operators.onDiscard(t, currentContext());
Operators.onNextDropped(t, currentContext());
return;
}
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -82,6 +82,7 @@ public void onSubscribe(Subscription s) {
public void onNext(Signal<T> t) {
if (done) {
//TODO interpret the Signal and drop differently?
Operators.onDiscard(t, actual.currentContext());
Operators.onNextDropped(t, actual.currentContext());
return;
}
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -133,6 +133,7 @@ public void onNext(T t) {
@Override
public boolean tryOnNext(T t) {
if (done) {
Operators.onDiscard(t, this.ctx);
Operators.onNextDropped(t, this.ctx);
return true;
}
Expand Down Expand Up @@ -261,6 +262,7 @@ public void onSubscribe(Subscription s) {
@Override
public void onNext(T t) {
if (done) {
Operators.onDiscard(t, this.ctx);
Operators.onNextDropped(t, this.ctx);
return;
}
Expand Down Expand Up @@ -300,6 +302,7 @@ public void onNext(T t) {
@Override
public boolean tryOnNext(T t) {
if (done) {
Operators.onDiscard(t, this.ctx);
Operators.onNextDropped(t, this.ctx);
return true;
}
Expand Down Expand Up @@ -443,6 +446,7 @@ public boolean tryOnNext(T t) {
return true;
}
if (done) {
Operators.onDiscard(t, this.ctx);
Operators.onNextDropped(t, this.ctx);
return true;
}
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -107,6 +107,7 @@ public void onNext(T t) {
@Override
public boolean tryOnNext(T t) {
if (done) {
Operators.onDiscard(t, ctx);
Operators.onNextDropped(t, ctx);
return true;
}
Expand Down Expand Up @@ -243,6 +244,7 @@ public void onNext(T t) {
@Override
public boolean tryOnNext(T t) {
if (done) {
Operators.onDiscard(t, ctx);
Operators.onNextDropped(t, ctx);
return true;
}
Expand Down
Expand Up @@ -146,6 +146,7 @@ public Object scanUnsafe(Attr key) {
@Override
public void onNext(T t) {
if (state == STATE_DONE) {
Operators.onDiscard(t, cachedContext);
Operators.onNextDropped(t, cachedContext);
return;
}
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -89,6 +89,7 @@ public void onSubscribe(Subscription s) {
@Override
public void onNext(T t) {
if (done) {
Operators.onDiscard(t, this.ctx);
Operators.onNextDropped(t, this.ctx);
return;
}
Expand Down Expand Up @@ -121,6 +122,7 @@ public void onNext(T t) {
@Override
public boolean tryOnNext(T t) {
if (done) {
Operators.onDiscard(t, this.ctx);
Operators.onNextDropped(t, this.ctx);
return false;
}
Expand Down Expand Up @@ -223,6 +225,7 @@ public void onSubscribe(Subscription s) {
@Override
public void onNext(T t) {
if (done) {
Operators.onDiscard(t, this.ctx);
Operators.onNextDropped(t, this.ctx);
return;
}
Expand Down Expand Up @@ -255,6 +258,7 @@ public void onNext(T t) {
@Override
public boolean tryOnNext(T t) {
if (done) {
Operators.onDiscard(t, this.ctx);
Operators.onNextDropped(t, this.ctx);
return false;
}
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -95,6 +95,7 @@ public void onNext(T t) {
}
else {
if (done) {
Operators.onDiscard(t, this.currentContext());
Operators.onNextDropped(t, this.ctx);
return;
}
Expand Down Expand Up @@ -127,6 +128,7 @@ public void onNext(T t) {
@Override
public boolean tryOnNext(T t) {
if (done) {
Operators.onDiscard(t, this.ctx);
Operators.onNextDropped(t, this.ctx);
return false;
}
Expand Down Expand Up @@ -314,6 +316,7 @@ public void onNext(T t) {
}
else {
if (done) {
Operators.onDiscard(t, this.ctx);
Operators.onNextDropped(t, this.ctx);
return;
}
Expand Down Expand Up @@ -346,6 +349,7 @@ public void onNext(T t) {
@Override
public boolean tryOnNext(T t) {
if (done) {
Operators.onDiscard(t, this.ctx);
Operators.onNextDropped(t, this.ctx);
return false;
}
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -376,6 +376,7 @@ public void onSubscribe(Subscription s) {
@Override
public void onNext(T t) {
if (done) {
Operators.onDiscard(t, actual.currentContext());
Operators.onNextDropped(t, actual.currentContext());
return;
}
Expand Down Expand Up @@ -972,6 +973,7 @@ public void onNext(R t) {
}
else {
if (done) {
Operators.onDiscard(t, parent.currentContext());
Operators.onNextDropped(t, parent.currentContext());
return;
}
Expand Down
Expand Up @@ -159,6 +159,7 @@ public CoreSubscriber<? super T> actual() {
@Override
public void next(T t) {
if (terminate) {
Operators.onDiscard(t, actual.currentContext());
Operators.onNextDropped(t, actual.currentContext());
return;
}
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -175,6 +175,7 @@ public void onSubscribe(Subscription s) {
@Override
public void onNext(T t) {
if(done){
Operators.onDiscard(t, actual.currentContext());
Operators.onNextDropped(t, actual.currentContext());
return;
}
Expand Down
Expand Up @@ -102,6 +102,7 @@ public ContextView contextView() {
@Override
public void onNext(T t) {
if (done) {
Operators.onDiscard(t, actual.currentContext());
Operators.onNextDropped(t, actual.currentContext());
return;
}
Expand Down Expand Up @@ -155,6 +156,7 @@ private void reset() {
@Override
public boolean tryOnNext(T t) {
if (done) {
Operators.onDiscard(t, actual.currentContext());
Operators.onNextDropped(t, actual.currentContext());
return false;
}
Expand Down Expand Up @@ -316,6 +318,7 @@ public void onSubscribe(Subscription s) {
@Override
public void onNext(T t) {
if (done) {
Operators.onDiscard(t, actual.currentContext());
Operators.onNextDropped(t, actual.currentContext());
return;
}
Expand Down Expand Up @@ -370,6 +373,7 @@ private void reset() {
@Override
public boolean tryOnNext(T t) {
if (done) {
Operators.onDiscard(t, actual.currentContext());
Operators.onNextDropped(t, actual.currentContext());
return false;
}
Expand Down
Expand Up @@ -108,6 +108,7 @@ public ContextView contextView() {
@Override
public boolean tryOnNext(T t) {
if (done) {
Operators.onDiscard(t, actual.currentContext());
Operators.onNextDropped(t, actual.currentContext());
return true;
}
Expand Down Expand Up @@ -169,6 +170,7 @@ public void onNext(T t) {
}
else {
if (done) {
Operators.onDiscard(t, actual.currentContext());
Operators.onNextDropped(t, actual.currentContext());
return;
}
Expand Down Expand Up @@ -479,6 +481,7 @@ public void onNext(T t) {
}
else {
if (done) {
Operators.onDiscard(t, actual.currentContext());
Operators.onNextDropped(t, actual.currentContext());
return;
}
Expand Down Expand Up @@ -534,6 +537,7 @@ private void reset() {
@Override
public boolean tryOnNext(T t) {
if (done) {
Operators.onDiscard(t, actual.currentContext());
Operators.onNextDropped(t, actual.currentContext());
return true;
}
Expand Down