My use case is reading Kafka messages with the segment-io Kafka driver and publishing them to watermill using GoChannel. One day a Kafka topic stopped being consumed. After reading the watermill code, I found that it was blocked at case <-msgToSend.Acked():, even though the logs show that msg.Ack() succeeded and "Message acked" in handler.handleMessage was printed.
func (s *subscriber) sendMessageToSubscriber(msg *message.Message, logFields watermill.LogFields) {
	s.sending.Lock()
	defer s.sending.Unlock()

	ctx, cancelCtx := context.WithCancel(s.ctx)
	defer cancelCtx()

SendToSubscriber:
	for {
		// copy the message to prevent ack/nack propagation to other consumers
		// also allows to make retries on a fresh copy of the original message
		msgToSend := msg.Copy()
		msgToSend.SetContext(ctx)

		s.logger.Trace("Sending msg to subscriber", logFields)

		if s.closed {
			s.logger.Info("Pub/Sub closed, discarding msg", logFields)
			return
		}

		select {
		case s.outputChannel <- msgToSend:
			s.logger.Trace("Sent message to subscriber", logFields)
		case <-s.closing:
			s.logger.Trace("Closing, message discarded", logFields)
			return
		}

		select {
		case <-msgToSend.Acked():
			s.logger.Trace("Message acked", logFields)
			return
		case <-msgToSend.Nacked():
			s.logger.Trace("Nack received, resending message", logFields)
			continue SendToSubscriber
		case <-s.closing:
			s.logger.Trace("Closing, message discarded", logFields)
			return
		}
	}
}