Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SequenceBarrier.waitForSequence sometimes doesn't respect BlockingWaitStrategy #462

Open
tibco-jufernan opened this issue Oct 5, 2023 · 0 comments

Comments

@tibco-jufernan
Copy link

Describe the bug
With many producers and a blocking wait strategy, SequenceBarrier.waitForSequence(nextSequence) can return a value less than its input. Specifically, it can return nextSequence - 1.

To Reproduce

Run thisFails(). It calls testWithNumProducers, which fails if waitForSequence(nextSequence) returns a value less than its input.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;

class WaitForReproTest {

    @Test
    void thisFails() throws InterruptedException {
        testWithNumProducers(128);
    }

    @Test
    void thisWorks() throws InterruptedException {
        testWithNumProducers(3);
    }

    void testWithNumProducers(int numProducers) throws InterruptedException {
        WaitForRepro w = new WaitForRepro();

        w.startConsumer();
        w.startProducers(numProducers);

        Thread.sleep(4000);

        Assertions.assertFalse(w.failed());
    }

    static class WaitForRepro {

        private final RingBuffer<Integer> buffer;
        private final SequenceBarrier barrier;
        private volatile boolean failed;

        public WaitForRepro() {
            buffer = RingBuffer.createMultiProducer(() -> 2, 16, new BlockingWaitStrategy());
            barrier = buffer.newBarrier();
        }

        public void startConsumer() {
            Executors.newSingleThreadExecutor().submit(() -> {
                long nextSequence = 0;
                long availableSequence;
                while (true) {
                    availableSequence = barrier.waitFor(nextSequence);
                    if (nextSequence > availableSequence) {
                        fail();
                    }
                    while (nextSequence <= availableSequence) {
                        buffer.get(nextSequence++);
                    }
                    Thread.sleep(5);
                }
            });
        }

        public void startProducers(int numProducers) {
            ExecutorService executor = Executors.newFixedThreadPool(numProducers);
            for (int i = 0; i < numProducers; ++i) {
                executor.submit(() -> {
                    while (true) {
                        buffer.publishEvent((event, seq) -> {
                            // NOP
                        });
                        Thread.sleep(3);
                    }
                });
            }
        }

        private void fail() {
            failed = true;
        }

        public boolean failed() {
            return failed;
        }
    }
}

Expected behavior
I expected that with a BlockingWaitStrategy, waitFor(nextSequence) should block until it can return nextSequence or a value greater than it.

Desktop (please complete the following information):

  • OS: MacOS
  • Version 3.4.4 and, 4.0.0
  • JVM Version OpenJDK Runtime Environment Temurin-17.0.8+7 (build 17.0.8+7)

Additional context
I had opened a thread on Google Groups, but thought that this problem might be more easily addressed as a GitHub issue.

In 2.10.4, I did not observe this behavior.

When debugging, I found that the blocking wait strategy returns the a sequence greater than or equal to sequence on line 56 of waitFor. But on line 63, sequencer.getHighestPublishedSequence returns sequence - 1.
https://github.com/LMAX-Exchange/disruptor/blob/3.4.4/src/main/java/com/lmax/disruptor/ProcessingSequenceBarrier.java#L51

In 2.10.4, waitFor simply defers to the wait strategy, so sequence - 1 can never be returned with a blocking wait strategy.
https://github.com/LMAX-Exchange/disruptor/blob/2.10.4/code/src/main/com/lmax/disruptor/ProcessingSequenceBarrier.java#L40

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant