Skip to content

Commit

Permalink
3.x: Fix MulticastProcessor not requesting more after limit is reached (
Browse files Browse the repository at this point in the history
#6714)

* 3.x: Fix MulticastProcessor not requesting more after limit is reached

* Test for more prefetch values and patterns.
  • Loading branch information
akarnokd committed Nov 14, 2019
1 parent e4c4903 commit 5026999
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,7 @@ void drain() {
}
}

consumed = c;
missed = wip.addAndGet(-missed);
if (missed == 0) {
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -784,4 +784,41 @@ public void noUpstream() {
assertTrue(mp.hasSubscribers());
}

@Test
public void requestUpstreamPrefetchNonFused() {
for (int j = 1; j < 12; j++) {
MulticastProcessor<Integer> mp = MulticastProcessor.create(j, true);

TestSubscriber<Integer> ts = mp.test(0).withTag("Prefetch: " + j);

Flowable.range(1, 10).hide().subscribe(mp);

ts.assertEmpty()
.requestMore(3)
.assertValuesOnly(1, 2, 3)
.requestMore(3)
.assertValuesOnly(1, 2, 3, 4, 5, 6)
.requestMore(4)
.assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
}
}

@Test
public void requestUpstreamPrefetchNonFused2() {
for (int j = 1; j < 12; j++) {
MulticastProcessor<Integer> mp = MulticastProcessor.create(j, true);

TestSubscriber<Integer> ts = mp.test(0).withTag("Prefetch: " + j);

Flowable.range(1, 10).hide().subscribe(mp);

ts.assertEmpty()
.requestMore(2)
.assertValuesOnly(1, 2)
.requestMore(2)
.assertValuesOnly(1, 2, 3, 4)
.requestMore(6)
.assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
}
}
}

0 comments on commit 5026999

Please sign in to comment.