Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pubsublite): unload idle partition publishers #7105

Merged
merged 13 commits into from
Dec 9, 2022
103 changes: 92 additions & 11 deletions pubsublite/internal/wire/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,11 @@ type singlePartitionPublisher struct {
// singlePartitionPublisherFactory creates instances of singlePartitionPublisher
// for given partition numbers.
type singlePartitionPublisherFactory struct {
ctx context.Context
pubClient *vkit.PublisherClient
settings PublishSettings
topicPath string
ctx context.Context
pubClient *vkit.PublisherClient
settings PublishSettings
topicPath string
evictionDelay time.Duration
}

func (f *singlePartitionPublisherFactory) New(partition int) *singlePartitionPublisher {
Expand Down Expand Up @@ -274,6 +275,85 @@ func (pp *singlePartitionPublisher) unsafeCheckDone() {
}
}

// lazyPartitionPublisher lazily creates an underlying singlePartitionPublisher
// and destroys it after a period of inactivity.
type lazyPartitionPublisher struct {
// Immutable after creation.
pubFactory *singlePartitionPublisherFactory
partition int
idleTimer *streamIdleTimer

// Fields below must be guarded with mu.
publisher *singlePartitionPublisher
outstandingMessages int

compositeService
}

func newLazyPartitionPublisher(partition int, pubFactory *singlePartitionPublisherFactory) *lazyPartitionPublisher {
pub := &lazyPartitionPublisher{
pubFactory: pubFactory,
partition: partition,
}
pub.init()
pub.idleTimer = newStreamIdleTimer(pubFactory.evictionDelay, pub.onIdle)
return pub
}

func (lp *lazyPartitionPublisher) Stop() {
lp.idleTimer.Shutdown()
lp.compositeService.Stop()
}

func (lp *lazyPartitionPublisher) Publish(msg *pb.PubSubMessage, onResult PublishResultFunc) {
publisher, err := func() (*singlePartitionPublisher, error) {
lp.mu.Lock()
defer lp.mu.Unlock()

if lp.status >= serviceTerminating {
return nil, ErrServiceStopped
}
if lp.publisher == nil {
lp.publisher = lp.pubFactory.New(lp.partition)
lp.unsafeAddServices(lp.publisher)
}
lp.idleTimer.Stop() // Prevent the underlying publisher from being evicted
lp.outstandingMessages++
return lp.publisher, nil
}()
if err != nil {
onResult(nil, err)
return
}
// Publish without lock held, as the callback may be invoked inline.
publisher.Publish(msg, func(metadata *MessageMetadata, err error) {
lp.onResult()
onResult(metadata, err)
})
}

func (lp *lazyPartitionPublisher) onResult() {
lp.mu.Lock()
defer lp.mu.Unlock()

lp.outstandingMessages--
if lp.outstandingMessages == 0 {
// Schedule the underlying publisher for eviction if no new messages are
// published before the timer expires.
lp.idleTimer.Restart()
}
}

func (lp *lazyPartitionPublisher) onIdle() {
lp.mu.Lock()
defer lp.mu.Unlock()

if lp.outstandingMessages == 0 && lp.publisher != nil {
lp.unsafeRemoveService(lp.publisher)
lp.publisher = nil
}
}

// routingPublisher publishes messages to multiple topic partitions, each
// managed by a singlePartitionPublisher. It supports increasing topic partition
// count, but not decreasing.
Expand All @@ -285,7 +365,7 @@ type routingPublisher struct {

// Fields below must be guarded with mu.
msgRouter messageRouter
publishers []*singlePartitionPublisher
publishers []*lazyPartitionPublisher

compositeService
}
Expand Down Expand Up @@ -319,7 +399,7 @@ func (rp *routingPublisher) onPartitionCountChanged(partitionCount int) {

prevPartitionCount := len(rp.publishers)
for i := prevPartitionCount; i < partitionCount; i++ {
pub := rp.pubFactory.New(i)
pub := newLazyPartitionPublisher(i, rp.pubFactory)
rp.publishers = append(rp.publishers, pub)
rp.unsafeAddServices(pub)
}
Expand All @@ -335,7 +415,7 @@ func (rp *routingPublisher) Publish(msg *pb.PubSubMessage, onResult PublishResul
pub.Publish(msg, onResult)
}

func (rp *routingPublisher) routeToPublisher(msg *pb.PubSubMessage) (*singlePartitionPublisher, error) {
func (rp *routingPublisher) routeToPublisher(msg *pb.PubSubMessage) (*lazyPartitionPublisher, error) {
rp.mu.Lock()
defer rp.mu.Unlock()

Expand Down Expand Up @@ -395,10 +475,11 @@ func NewPublisher(ctx context.Context, settings PublishSettings, region, topicPa

msgRouterFactory := newMessageRouterFactory(rand.New(rand.NewSource(time.Now().UnixNano())))
pubFactory := &singlePartitionPublisherFactory{
ctx: ctx,
pubClient: pubClient,
settings: settings,
topicPath: topicPath,
ctx: ctx,
pubClient: pubClient,
settings: settings,
topicPath: topicPath,
evictionDelay: time.Minute * 5,
}
return newRoutingPublisher(allClients, adminClient, msgRouterFactory, pubFactory), nil
}