Data doesn't get removed from subscribers and subscribersByTopicLock maps (GoChannel pub/sub) #391

Open
avlajcic-axilis opened this issue Sep 13, 2023 · 0 comments

avlajcic-axilis commented Sep 13, 2023

Hello,
I've been testing Watermill with one of our services and noticed a constant memory increase while using it (slight, but still there). After checking the code in my service, I looked at the Watermill implementation, and I may have found an issue in the GoChannel pub/sub implementation.

Explanation

When a new subscriber is created, an entry is added to both the subscribers map and the subscribersByTopicLock map.
When the context is done or the pub/sub is closing, the subscriber is removed from the subscribers map.
However, only that subscriber is removed from the topic's entry. If it was the last subscriber for that topic, the g.subscribers[topic] and g.subscribersByTopicLock[topic] entries themselves are not removed.

This suggests that if new topics are generated over time, the maps will keep holding entries for topics that are no longer used.

Proof

I added the following logs in pubsub.go at L214:

g.logger.Debug("Removing subscriber", watermill.LogFields{
	"lockMapSize":  g.countLockMapSize(),
	"subSizeTopic": len(g.subscribers[topic]),
	"subSizeMap":   len(g.subscribers),
	"topic":        topic,
})
g.removeSubscriber(topic, s)
g.logger.Debug("Removed subscriber", watermill.LogFields{
	"lockMapSize":  g.countLockMapSize(),
	"subSizeTopic": len(g.subscribers[topic]),
	"subSizeMap":   len(g.subscribers),
	"topic":        topic,
})

Then I ran a simple test that creates 100 new subscribers, each on a different topic, and cancels them after 500ms:

func TestSubscribe_clean_map(t *testing.T) {
	subCount := 100
	pubSub := gochannel.NewGoChannel(
		gochannel.Config{OutputChannelBuffer: int64(subCount)},
		watermill.NewStdLogger(true, false),
	)
	topicName := "test_topic"

	done := make(chan bool, subCount)

	for i := 0; i < subCount; i++ {
		ctx, cancel := context.WithCancel(context.Background())
		_, err := pubSub.Subscribe(ctx, topicName+"_index_"+strconv.Itoa(i))
		require.NoError(t, err)
		go func() {
			// Cancel this subscription after 500ms, then signal completion.
			<-time.After(500 * time.Millisecond)
			cancel()
			done <- true
		}()
	}

	for i := 0; i < subCount; i++ {
		<-done
	}
	assert.NoError(t, pubSub.Close())
}

Final debug logs produced from this test:

[watermill] 2023/09/13 15:40:11.697577 pubsub.go:214: 	level=DEBUG msg="Removing subscriber" lockMapSize=100 pubsub_uuid=icUNGxf6FMmnVjzpskg9Aj subSizeMap=100 subSizeTopic=1 topic=test_topic_index_47 
[watermill] 2023/09/13 15:40:11.697587 pubsub.go:224: 	level=DEBUG msg="Removed subscriber" lockMapSize=100 pubsub_uuid=icUNGxf6FMmnVjzpskg9Aj subSizeMap=100 subSizeTopic=0 topic=test_topic_index_47 
[watermill] 2023/09/13 15:40:11.697604 pubsub.go:364: 	level=DEBUG msg="GoChannel Pub/Sub Subscriber closed" pubsub_uuid=icUNGxf6FMmnVjzpskg9Aj 
[watermill] 2023/09/13 15:40:11.697615 pubsub.go:214: 	level=DEBUG msg="Removing subscriber" lockMapSize=100 pubsub_uuid=icUNGxf6FMmnVjzpskg9Aj subSizeMap=100 subSizeTopic=1 topic=test_topic_index_22 
[watermill] 2023/09/13 15:40:11.697630 pubsub.go:224: 	level=DEBUG msg="Removed subscriber" lockMapSize=100 pubsub_uuid=icUNGxf6FMmnVjzpskg9Aj subSizeMap=100 subSizeTopic=0 topic=test_topic_index_22 
[watermill] 2023/09/13 15:40:11.692512 pubsub.go:364: 	level=DEBUG msg="GoChannel Pub/Sub Subscriber closed" pubsub_uuid=icUNGxf6FMmnVjzpskg9Aj 
[watermill] 2023/09/13 15:40:11.697649 pubsub.go:214: 	level=DEBUG msg="Removing subscriber" lockMapSize=100 pubsub_uuid=icUNGxf6FMmnVjzpskg9Aj subSizeMap=100 subSizeTopic=1 topic=test_topic_index_29 
[watermill] 2023/09/13 15:40:11.697660 pubsub.go:224: 	level=DEBUG msg="Removed subscriber" lockMapSize=100 pubsub_uuid=icUNGxf6FMmnVjzpskg9Aj subSizeMap=100 subSizeTopic=0 topic=test_topic_index_29 
[watermill] 2023/09/13 15:40:11.697667 pubsub.go:333: 	level=INFO  msg="Pub/Sub closed" pubsub_uuid=icUNGxf6FMmnVjzpskg9Aj 

As you can see, the sizes of the subscribers and subscribersByTopicLock maps remained at 100 even though all subscribers were closed.
