Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Blocking in GoChannel sendMessageToSubscriber #382

Open
qiulin opened this issue Sep 4, 2023 · 0 comments
Open

Blocking in GoChannel sendMessageToSubscriber #382

qiulin opened this issue Sep 4, 2023 · 0 comments

Comments

@qiulin
Copy link

qiulin commented Sep 4, 2023

My use case is reading kafka message using segment-io kafka driver and publishing to watermill using GoChannel. One day I got a kafka topic blocking. After read watermill code, I found it blocked at case <-msgToSend.Acked():, but logs show msg.Ack() is ok, and "Message acked" in handler.handleMessage was printed.

func (s *subscriber) sendMessageToSubscriber(msg *message.Message, logFields watermill.LogFields) {
	s.sending.Lock()
	defer s.sending.Unlock()

	ctx, cancelCtx := context.WithCancel(s.ctx)
	defer cancelCtx()

SendToSubscriber:
	for {
		// copy the message to prevent ack/nack propagation to other consumers
		// also allows to make retries on a fresh copy of the original message
		msgToSend := msg.Copy()
		msgToSend.SetContext(ctx)

		s.logger.Trace("Sending msg to subscriber", logFields)

		if s.closed {
			s.logger.Info("Pub/Sub closed, discarding msg", logFields)
			return
		}

		select {
		case s.outputChannel <- msgToSend:
			s.logger.Trace("Sent message to subscriber", logFields)
		case <-s.closing:
			s.logger.Trace("Closing, message discarded", logFields)
			return
		}

		select {
		case <-msgToSend.Acked():
			s.logger.Trace("Message acked", logFields)
			return
		case <-msgToSend.Nacked():
			s.logger.Trace("Nack received, resending message", logFields)
			continue SendToSubscriber
		case <-s.closing:
			s.logger.Trace("Closing, message discarded", logFields)
			return
		}
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant