Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pubsublite): unload idle partition publishers #7105

Merged
merged 13 commits into from
Dec 9, 2022
6 changes: 3 additions & 3 deletions pubsublite/internal/test/verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,11 +250,11 @@ func (sv *streamVerifiers) Push(v *RPCVerifier) {
sv.verifiers.PushBack(v)
}

func (sv *streamVerifiers) Pop() (*RPCVerifier, error) {
func (sv *streamVerifiers) Pop(key string) (*RPCVerifier, error) {
sv.numStreams++
elem := sv.verifiers.Front()
if elem == nil {
sv.t.Errorf("stream(%d): unexpected connection with no verifiers", sv.numStreams)
sv.t.Errorf("unexpected stream index %d for key %s", sv.numStreams, key)
return nil, status.Error(codes.FailedPrecondition, "mockserver: got unexpected stream connection")
}

Expand Down Expand Up @@ -294,7 +294,7 @@ func (kv *keyedStreamVerifiers) Pop(key string) (*RPCVerifier, error) {
if !ok {
return nil, status.Error(codes.FailedPrecondition, "mockserver: unexpected connection with no configured responses")
}
return sv.Pop()
return sv.Pop(key)
}

func (kv *keyedStreamVerifiers) Flush() {
Expand Down
22 changes: 15 additions & 7 deletions pubsublite/internal/wire/publish_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ var errPublishQueueEmpty = errors.New("pubsublite: received publish response fro
// PublishResultFunc receives the result of a publish.
type PublishResultFunc func(*MessageMetadata, error)

// publishResult pairs the metadata assigned to a single published message
// (partition and offset) with the user callback that should receive it.
// Results are collected while the publisher mutex is held and the callbacks
// are invoked afterwards, without the lock held.
type publishResult struct {
	Metadata *MessageMetadata
	OnResult PublishResultFunc
}

// messageHolder stores a message to be published, with associated metadata.
type messageHolder struct {
msg *pb.PubSubMessage
Expand Down Expand Up @@ -133,26 +138,29 @@ func (b *publishMessageBatcher) AddBatch(batch *publishBatch) {
b.publishQueue.PushBack(batch)
}

func (b *publishMessageBatcher) OnPublishResponse(firstOffset int64) error {
func (b *publishMessageBatcher) OnPublishResponse(firstOffset int64) ([]*publishResult, error) {
frontElem := b.publishQueue.Front()
if frontElem == nil {
return errPublishQueueEmpty
return nil, errPublishQueueEmpty
}
if firstOffset < b.minExpectedNextOffset {
return fmt.Errorf("pubsublite: server returned publish response with inconsistent start offset = %d, expected >= %d", firstOffset, b.minExpectedNextOffset)
return nil, fmt.Errorf("pubsublite: server returned publish response with inconsistent start offset = %d, expected >= %d", firstOffset, b.minExpectedNextOffset)
}

batch, _ := frontElem.Value.(*publishBatch)
var results []*publishResult
for i, msgHolder := range batch.msgHolders {
// Messages are ordered, so the offset of each message is firstOffset + i.
mm := &MessageMetadata{Partition: b.partition, Offset: firstOffset + int64(i)}
msgHolder.onResult(mm, nil)
b.availableBufferBytes += msgHolder.size
results = append(results, &publishResult{
Metadata: &MessageMetadata{Partition: b.partition, Offset: firstOffset + int64(i)},
OnResult: msgHolder.onResult,
})
}

b.availableBufferBytes += batch.totalSize
b.minExpectedNextOffset = firstOffset + int64(len(batch.msgHolders))
b.publishQueue.Remove(frontElem)
return nil
return results, nil
}

func (b *publishMessageBatcher) OnPermanentError(err error) {
Expand Down
41 changes: 26 additions & 15 deletions pubsublite/internal/wire/publish_batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,8 @@ func TestPublishBatcherBundlerOnPublishResponse(t *testing.T) {
batcher := newPublishMessageBatcher(&DefaultPublishSettings, partition, receiver.onNewBatch)

t.Run("empty in-flight batches", func(t *testing.T) {
if gotErr, wantErr := batcher.OnPublishResponse(0), errPublishQueueEmpty; !test.ErrorEqual(gotErr, wantErr) {
_, gotErr := batcher.OnPublishResponse(0)
if wantErr := errPublishQueueEmpty; !test.ErrorEqual(gotErr, wantErr) {
t.Errorf("OnPublishResponse() got err: %v, want err: %v", gotErr, wantErr)
}
})
Expand All @@ -297,30 +298,40 @@ func TestPublishBatcherBundlerOnPublishResponse(t *testing.T) {

// Batch 2
msg3 := &pb.PubSubMessage{Data: []byte{'3'}}
pubResult1 := newTestPublishResultReceiver(t, msg1)
pubResult2 := newTestPublishResultReceiver(t, msg2)
pubResult3 := newTestPublishResultReceiver(t, msg3)

batcher.AddBatch(makePublishBatch(makeMsgHolder(msg1, pubResult1), makeMsgHolder(msg2, pubResult2)))
batcher.AddBatch(makePublishBatch(makeMsgHolder(msg3, pubResult3)))
if err := batcher.OnPublishResponse(70); err != nil {
batcher.AddBatch(makePublishBatch(makeMsgHolder(msg1), makeMsgHolder(msg2)))
batcher.AddBatch(makePublishBatch(makeMsgHolder(msg3)))

got, err := batcher.OnPublishResponse(70)
if err != nil {
t.Errorf("OnPublishResponse() got err: %v", err)
}
if err := batcher.OnPublishResponse(80); err != nil {
t.Errorf("OnPublishResponse() got err: %v", err)
want := []*publishResult{
{Metadata: &MessageMetadata{Partition: partition, Offset: 70}},
{Metadata: &MessageMetadata{Partition: partition, Offset: 71}},
}
if diff := testutil.Diff(got, want); diff != "" {
t.Errorf("Results got: -, want: +\n%s", diff)
}

pubResult1.ValidateResult(partition, 70)
pubResult2.ValidateResult(partition, 71)
pubResult3.ValidateResult(partition, 80)
got, err = batcher.OnPublishResponse(80)
if err != nil {
t.Errorf("OnPublishResponse() got err: %v", err)
}
want = []*publishResult{
{Metadata: &MessageMetadata{Partition: partition, Offset: 80}},
}
if diff := testutil.Diff(got, want); diff != "" {
t.Errorf("Results got: -, want: +\n%s", diff)
}
})

t.Run("inconsistent offset", func(t *testing.T) {
msg := &pb.PubSubMessage{Data: []byte{'4'}}
pubResult := newTestPublishResultReceiver(t, msg)
batcher.AddBatch(makePublishBatch(makeMsgHolder(msg, pubResult)))
batcher.AddBatch(makePublishBatch(makeMsgHolder(msg)))

if gotErr, wantMsg := batcher.OnPublishResponse(80), "inconsistent start offset = 80"; !test.ErrorHasMsg(gotErr, wantMsg) {
_, gotErr := batcher.OnPublishResponse(80)
if wantMsg := "inconsistent start offset = 80"; !test.ErrorHasMsg(gotErr, wantMsg) {
t.Errorf("OnPublishResponse() got err: %v, want err msg: %q", gotErr, wantMsg)
}
})
Expand Down
162 changes: 136 additions & 26 deletions pubsublite/internal/wire/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,11 @@ type singlePartitionPublisher struct {
// singlePartitionPublisherFactory creates instances of singlePartitionPublisher
// for given partition numbers.
type singlePartitionPublisherFactory struct {
ctx context.Context
pubClient *vkit.PublisherClient
settings PublishSettings
topicPath string
ctx context.Context
pubClient *vkit.PublisherClient
settings PublishSettings
topicPath string
unloadDelay time.Duration
}

func (f *singlePartitionPublisherFactory) New(partition int) *singlePartitionPublisher {
Expand Down Expand Up @@ -113,9 +114,6 @@ func (pp *singlePartitionPublisher) Stop() {

// Publish a pub/sub message.
func (pp *singlePartitionPublisher) Publish(msg *pb.PubSubMessage, onResult PublishResultFunc) {
pp.mu.Lock()
defer pp.mu.Unlock()

processMessage := func() error {
// Messages are accepted while the service is starting up or active. During
// startup, messages are queued in the batcher and will be published once
Expand All @@ -134,10 +132,17 @@ func (pp *singlePartitionPublisher) Publish(msg *pb.PubSubMessage, onResult Publ
return nil
}

pp.mu.Lock()
err := processMessage()
// If the new message cannot be published, flush pending messages and then
// terminate the stream once results are received.
if err := processMessage(); err != nil {
if err != nil {
pp.unsafeInitiateShutdown(serviceTerminating, err)
}
pp.mu.Unlock()

if err != nil {
// Invoke callback without lock held.
onResult(nil, err)
}
}
Expand Down Expand Up @@ -199,24 +204,27 @@ func (pp *singlePartitionPublisher) onNewBatch(batch *publishBatch) {
}

func (pp *singlePartitionPublisher) onResponse(response interface{}) {
pp.mu.Lock()
defer pp.mu.Unlock()

processResponse := func() error {
processResponse := func() ([]*publishResult, error) {
pubResponse, _ := response.(*pb.PublishResponse)
if pubResponse.GetMessageResponse() == nil {
return errInvalidMsgPubResponse
return nil, errInvalidMsgPubResponse
}
firstOffset := pubResponse.GetMessageResponse().GetStartCursor().GetOffset()
if err := pp.batcher.OnPublishResponse(firstOffset); err != nil {
return err
}
pp.unsafeCheckDone()
return nil
return pp.batcher.OnPublishResponse(firstOffset)
}
if err := processResponse(); err != nil {

pp.mu.Lock()
results, err := processResponse()
if err != nil {
pp.unsafeInitiateShutdown(serviceTerminated, err)
}
pp.unsafeCheckDone()
pp.mu.Unlock()

// Invoke callbacks without lock held.
for _, r := range results {
r.OnResult(r.Metadata, nil)
}
}

// unsafeInitiateShutdown must be provided a target serviceStatus, which must be
Expand Down Expand Up @@ -274,6 +282,107 @@ func (pp *singlePartitionPublisher) unsafeCheckDone() {
}
}

// lazyPartitionPublisher lazily creates an underlying singlePartitionPublisher
// on first publish and unloads it after a period of inactivity (no outstanding
// messages before the idle timer expires).
type lazyPartitionPublisher struct {
	// Immutable after creation.
	pubFactory *singlePartitionPublisherFactory
	partition  int
	idleTimer  *streamIdleTimer

	// Fields below must be guarded with mu.
	publisher *singlePartitionPublisher // nil while no underlying publisher is loaded
	outstandingMessages int // publishes whose results have not yet been delivered

	abstractService
}

// newLazyPartitionPublisher returns a publisher for the given partition that
// defers creation of its underlying singlePartitionPublisher until the first
// publish, and schedules it for unload after pubFactory.unloadDelay of
// inactivity via the idle timer.
func newLazyPartitionPublisher(partition int, pubFactory *singlePartitionPublisherFactory) *lazyPartitionPublisher {
	lp := new(lazyPartitionPublisher)
	lp.pubFactory = pubFactory
	lp.partition = partition
	lp.idleTimer = newStreamIdleTimer(pubFactory.unloadDelay, lp.onIdle)
	return lp
}

// Start marks the lazyPartitionPublisher as active. The underlying
// singlePartitionPublisher is not created here; it is lazily created by the
// first call to Publish.
func (lp *lazyPartitionPublisher) Start() {
	lp.mu.Lock()
	defer lp.mu.Unlock()
	lp.unsafeUpdateStatus(serviceActive, nil)
}

// Stop initiates shutdown of the lazyPartitionPublisher. If no underlying
// publisher is currently loaded, the service transitions directly to
// terminated; otherwise it transitions to terminating and stops the underlying
// publisher, whose terminal status change is then propagated via
// onStatusChange.
func (lp *lazyPartitionPublisher) Stop() {
	lp.mu.Lock()
	defer lp.mu.Unlock()

	// Shut down the idle timer so it cannot fire during or after shutdown.
	lp.idleTimer.Shutdown()
	if lp.publisher == nil {
		lp.unsafeUpdateStatus(serviceTerminated, nil)
	} else if lp.unsafeUpdateStatus(serviceTerminating, nil) {
		lp.publisher.Stop()
	}
}

// Publish delivers a message to the underlying partition publisher, lazily
// creating and starting it if it is not currently loaded. onResult receives
// the publish outcome, or ErrServiceStopped if the service is shutting down.
func (lp *lazyPartitionPublisher) Publish(msg *pb.PubSubMessage, onResult PublishResultFunc) {
	publisher, err := func() (*singlePartitionPublisher, error) {
		lp.mu.Lock()
		defer lp.mu.Unlock()

		if lp.status >= serviceTerminating {
			return nil, ErrServiceStopped
		}
		if lp.publisher == nil {
			// First publish since creation or since the last unload: create
			// and start the underlying publisher and subscribe to its status
			// changes.
			lp.publisher = lp.pubFactory.New(lp.partition)
			lp.publisher.AddStatusChangeReceiver(lp.Handle(), lp.onStatusChange)
			lp.publisher.Start()
		}
		lp.idleTimer.Stop() // Prevent the underlying publisher from being unloaded
		lp.outstandingMessages++
		return lp.publisher, nil
	}()
	if err != nil {
		onResult(nil, err)
		return
	}
	// Publish without lock held, as the callback may be invoked inline.
	publisher.Publish(msg, func(metadata *MessageMetadata, err error) {
		// Record the completion first (possibly restarting the idle timer),
		// then surface the result to the caller.
		lp.onResult()
		onResult(metadata, err)
	})
}

// onStatusChange receives status changes from the underlying publisher. Only
// terminal statuses (terminating/terminated) are propagated to this service;
// other transitions are ignored.
func (lp *lazyPartitionPublisher) onStatusChange(handle serviceHandle, status serviceStatus, err error) {
	if status >= serviceTerminating {
		lp.mu.Lock()
		defer lp.mu.Unlock()
		lp.unsafeUpdateStatus(status, err)
	}
}

// onResult records the completion of one outstanding publish. Once no
// publishes remain outstanding, the idle timer is restarted so that the
// underlying publisher is unloaded if it stays idle until the timer expires.
func (lp *lazyPartitionPublisher) onResult() {
	lp.mu.Lock()
	defer lp.mu.Unlock()

	lp.outstandingMessages--
	if lp.outstandingMessages == 0 {
		// Schedule the underlying publisher for unload if no new messages are
		// published before the timer expires.
		lp.idleTimer.Restart()
	}
}

// onIdle is invoked when the idle timer expires. If the publisher is loaded
// and no publishes are outstanding (a message may have arrived between the
// timer firing and this callback acquiring the lock), the underlying publisher
// is stopped and released; a later Publish will recreate it.
func (lp *lazyPartitionPublisher) onIdle() {
	lp.mu.Lock()
	defer lp.mu.Unlock()

	if lp.outstandingMessages == 0 && lp.publisher != nil {
		// Detach before stopping so the terminal status of the unloaded
		// publisher is not propagated to this (still active) service.
		lp.publisher.RemoveStatusChangeReceiver(lp.Handle())
		lp.publisher.Stop()
		lp.publisher = nil
	}
}

// routingPublisher publishes messages to multiple topic partitions, each
// managed by a singlePartitionPublisher. It supports increasing topic partition
// count, but not decreasing.
Expand All @@ -285,7 +394,7 @@ type routingPublisher struct {

// Fields below must be guarded with mu.
msgRouter messageRouter
publishers []*singlePartitionPublisher
publishers []*lazyPartitionPublisher

compositeService
}
Expand Down Expand Up @@ -319,7 +428,7 @@ func (rp *routingPublisher) onPartitionCountChanged(partitionCount int) {

prevPartitionCount := len(rp.publishers)
for i := prevPartitionCount; i < partitionCount; i++ {
pub := rp.pubFactory.New(i)
pub := newLazyPartitionPublisher(i, rp.pubFactory)
rp.publishers = append(rp.publishers, pub)
rp.unsafeAddServices(pub)
}
Expand All @@ -335,7 +444,7 @@ func (rp *routingPublisher) Publish(msg *pb.PubSubMessage, onResult PublishResul
pub.Publish(msg, onResult)
}

func (rp *routingPublisher) routeToPublisher(msg *pb.PubSubMessage) (*singlePartitionPublisher, error) {
func (rp *routingPublisher) routeToPublisher(msg *pb.PubSubMessage) (*lazyPartitionPublisher, error) {
rp.mu.Lock()
defer rp.mu.Unlock()

Expand Down Expand Up @@ -395,10 +504,11 @@ func NewPublisher(ctx context.Context, settings PublishSettings, region, topicPa

msgRouterFactory := newMessageRouterFactory(rand.New(rand.NewSource(time.Now().UnixNano())))
pubFactory := &singlePartitionPublisherFactory{
ctx: ctx,
pubClient: pubClient,
settings: settings,
topicPath: topicPath,
ctx: ctx,
pubClient: pubClient,
settings: settings,
topicPath: topicPath,
unloadDelay: time.Minute * 5,
}
return newRoutingPublisher(allClients, adminClient, msgRouterFactory, pubFactory), nil
}