
SAC not working properly #106

Closed
laststem opened this issue Jul 26, 2022 · 11 comments

@laststem

laststem commented Jul 26, 2022

[Issue]

I have two consumers (A and B) on a queue with single active consumer (SAC) enabled.

Consumer A was active and consumer B was waiting.

Then I killed the active consumer A while another process was publishing 10000 messages.

I expected consumer B to become the active consumer, but it did not.

[Reproduce]

  1. Run the RabbitMQ server:
docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmqtest rabbitmq:3.10.6-management
  2. Run two consumers (consumer.go):
package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"

	amqp091 "github.com/rabbitmq/amqp091-go"
)

func main() {
	conn, err := amqp091.DialConfig("amqp://guest:guest@localhost:5672/", amqp091.Config{
		Heartbeat: time.Second * 30,
	})
	if err != nil {
		panic(err)
	}

	ch, err := conn.Channel()
	if err != nil {
		panic(err)
	}

	// Prefetch up to 200 unacknowledged messages per consumer.
	if err := ch.Qos(200, 0, false); err != nil {
		panic(err)
	}

	// Declare a durable queue with single active consumer (SAC) enabled.
	queueArgs := make(amqp091.Table)
	queueArgs["x-single-active-consumer"] = true
	_, err = ch.QueueDeclare("queue",
		true,      // durable
		false,     // auto delete
		false,     // exclusive
		false,     // noWait
		queueArgs, // queue args
	)
	if err != nil {
		panic(err)
	}

	// autoAck=false: messages are acknowledged manually below.
	msgs, err := ch.Consume("queue", "consumer", false, false, false, false, nil)
	if err != nil {
		panic(err)
	}
	d := make(chan bool)
	go func() {
		for msg := range msgs {
			fmt.Println(string(msg.Body))
			// multiple=true also acknowledges any earlier unacknowledged deliveries.
			_ = msg.Ack(true)
		}
		d <- true
	}()

	// SIGKILL cannot be caught, so only SIGINT (^C) and SIGTERM are handled.
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	<-sigs

	if err := ch.Cancel("consumer", false); err != nil {
		panic(err)
	}
	fmt.Println("cancel consume")
	if err := ch.Close(); err != nil {
		panic(err)
	}
	if err := conn.Close(); err != nil {
		panic(err)
	}
	<-d
	fmt.Println("terminate")
}
go run consumer.go (A session)
go run consumer.go (B session)
  3. Prepare the producer (publisher.go):
package main

import (
	"fmt"

	amqp091 "github.com/rabbitmq/amqp091-go"
)

func main() {
	conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/")
	if err != nil {
		panic(err)
	}

	ch, err := conn.Channel()
	if err != nil {
		panic(err)
	}

	// Publish 10000 messages to "queue" through the default exchange.
	for i := 0; i < 10000; i++ {
		err := ch.Publish("", "queue", false, false, amqp091.Publishing{
			DeliveryMode: 0, // unset: the broker treats the messages as non-persistent
			ContentType:  "text/plain",
			Body:         []byte(fmt.Sprintf("%d", i)),
		})
		if err != nil {
			panic(err)
		}
	}

	if err := ch.Close(); err != nil {
		panic(err)
	}
	if err := conn.Close(); err != nil {
		panic(err)
	}
	fmt.Println("terminate")
}
  4. Check which consumer is active:
root@2d6192fe4fed:/# rabbitmqctl list_consumers
Listing consumers in vhost / ...
queue_name	channel_pid	consumer_tag	ack_required	prefetch_count	active	arguments
queue	<rabbit@2d6192fe4fed.1659508893.895.0>	consumer	true	200	false	[]
queue	<rabbit@2d6192fe4fed.1659508893.880.0>	consumer	true	200	true	[]
  5. Run the publisher:
go run publisher.go
  6. Kill the active consumer while it is receiving messages.
    It is important that the active consumer does not receive all the messages;
    it must be stopped in the middle of consuming them.
# active consumer log
1
2
3
4
...
^C (the consumer prints "terminate")
  7. Check the active consumer again:
root@2d6192fe4fed:/# rabbitmqctl list_consumers
Listing consumers in vhost / ..

As a result, I expected the active consumer to switch from A to B, but it did not.
All consumers are gone.

However, I found errors in the RabbitMQ server logs.
They are too long to paste here, so I'm attaching them as a file:
rabbit-error-logs.txt

It doesn't seem to be a problem with amqp091-go.

Zerpet self-assigned this Aug 2, 2022
@Zerpet
Contributor

Zerpet commented Aug 2, 2022

Hey there 👋

I can't reproduce this issue with the code you provided. Make sure that the queue is declared with x-single-active-consumer set to true.

I added the following lines to your consumer code to declare the queue. Please note that Channel.QueueDeclare() returns an error if the queue already exists and its parameters don't match the declaration; it's a good safety check to make sure your queues are configured the way your app expects.

	queueArgs := make(amqp091.Table)
	queueArgs["x-single-active-consumer"] = true

	queue, err := ch.QueueDeclare("queue",
		true, // durable
		false, // auto delete
		false, //exclusive
		false, //noWait
		queueArgs, // queue args
	)
	if err != nil {
		panic(err)
	}

Edit: I can reproduce the issue, see #106 (comment)

@laststem
Author

laststem commented Aug 3, 2022

Thanks for the reply.

I tried again and got the same problem.
I have edited the description to add more detail.

@Zerpet
Contributor

Zerpet commented Aug 5, 2022

I can confirm that the problem seems to be in RabbitMQ. The queue crashes for some reason, and the waiting consumer does not take over.

Edit: I can reproduce the queue crash when both consumers are attached to the queue before the producer starts publishing. If the producer is already running, the queue does not crash. This is a classic queue (CQ). Verified in RabbitMQ 3.10.6, the current rabbitmq:3-management image on Docker Hub.

@laststem
Author

laststem commented Aug 8, 2022

Thank you.

Are there any plans to fix it?
I want to use this feature in production.

@acogoluegnes

@laststem You can try a quorum queue (the SAC implementation differs between classic queues and quorum queues) and stick with it if the issue does not reproduce and a quorum queue is acceptable for your use case.

@acogoluegnes

@Zerpet Can you reference or create an issue on the broker repository so we can keep track of this issue? Thanks.

@laststem
Author

laststem commented Aug 8, 2022

Thanks for the reply, @acogoluegnes.

I can't use a quorum queue because we have a constraint that a queue must have only one consumer.

Currently, we use a classic queue with the exclusive flag (1 queue, 1 consumer),
but this model can't provide high availability: what happens if the consumer dies?

If we use a quorum queue, it can provide high availability, but it doesn't meet the constraint (1 queue, N consumers).
If we use a classic queue with SAC, it can provide high availability (by switching the active consumer) and it meets the constraint (1 queue, N consumers, but only one active).

@Zerpet
Contributor

Zerpet commented Aug 8, 2022

@acogoluegnes done! rabbitmq/rabbitmq-server#5460

@acogoluegnes

acogoluegnes commented Aug 8, 2022

@laststem Quorum queues support single active consumer, so you can make sure that only one consumer consumes at a time. You enable it just like with a classic queue. Sorry I was not clear in my first comment; I meant "try a quorum queue with single active consumer enabled".

@laststem
Author

laststem commented Aug 8, 2022

Good! I will try it. Thank you, @acogoluegnes

@Zerpet
Contributor

Zerpet commented Dec 7, 2022

rabbitmq/rabbitmq-server#5460 has been closed and is targeting RabbitMQ server 3.11.5. I'll mark this issue as complete for now. Feel free to reopen it if you encounter this issue in 3.11.5 or later.

Zerpet closed this as completed Dec 7, 2022