Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Proper timeout handling in func (*pullConsumer) fetch() #1482

Open
yz89122 opened this issue Dec 2, 2023 · 1 comment
Open

Proper timeout handling in func (*pullConsumer) fetch() #1482

yz89122 opened this issue Dec 2, 2023 · 1 comment
Labels
proposal Enhancement idea or proposal

Comments

@yz89122
Copy link

yz89122 commented Dec 2, 2023

Proposed change

IMHO, there're something we can improve in these lines of code.

nats.go/jetstream/pull.go

Lines 770 to 772 in 8712190

case <-time.After(req.Expires + 1*time.Second):
res.done = true
return

  1. In batch request, it'll create a lot of time.Timer according to the doc. Before the timeout reached, the timer is not GC-able. For client with Higher throughput and with higher Expires, there will be more concurrent timer running, which is unnecessary.
  2. Currently, the timeout is controlled by Expires and a constant 1 * time.Second, which is the same field as the request pass to server. For client or server under stress, the timeout may easily reached. Especially for FetchNoWait().
    • I think we can separate the Expires into 2 options, one used as request that passes to the server, another serves as client receiving timeout.
    • Or we can uses context.Context.Done() for receiving timeout.
  3. Messages delivered after timeout reached AND before the server received UNSUB should be unacked explicitly (instread of waiting server ack timeout). Current implementation is just dropping the messages. For those using AckPolicy: AckAll, this behavior is dangerous, we could lose message permanently if we received later message and ack them. It's possible because we normally write a loop to batch request (Fetch()).

Use case

outer:
for {
    batch, err := stream.Fetch(batchSize)
    // ...
inner:
    var lastMessage jetstream.Msg
    for _, message := batch.Messages() {
        lastMessage = message
        // If the client or server is under stress, this inner loop may never run.
    }
    // ...
    if lastMessage == nil {
        // empty batch
        continue
    }
    err := lastMessage.Ack(ctx) // Ack entire batch of message with `AckPolicy: AckAll`
    // ...
}

Contribution

Yes

@yz89122 yz89122 added the proposal Enhancement idea or proposal label Dec 2, 2023
@YarBor
Copy link

YarBor commented Dec 8, 2023

For 1), in nats package, there is a timerPool that can reuse time.Timer, I think it is possible to make a same design , or reuse timerPool in nats package?

type timerPool struct {

var globalTimerPool timerPool
// timerPool provides GC-able pooling of *time.Timer's.
// can be used by multiple goroutines concurrently.
type timerPool struct {
	p sync.Pool
}
func (tp *timerPool) Get(d time.Duration) *time.Timer {
	if t, _ := tp.p.Get().(*time.Timer); t != nil {
		t.Reset(d)
		return t
	}

	return time.NewTimer(d)
}
func (tp *timerPool) Put(t *time.Timer) {
	if !t.Stop() {
		select {
		case <-t.C:
		default:
		}
	}

	tp.p.Put(t)
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
proposal Enhancement idea or proposal
Projects
None yet
Development

No branches or pull requests

2 participants