Skip to content

Commit

Permalink
internal/grpcsync: move CallbackSerializer from xdsclient/internal to…
Browse files Browse the repository at this point in the history
… here (#6153)
  • Loading branch information
my4-dev committed Mar 31, 2023
1 parent c2899dd commit e979919
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,30 +16,30 @@
*
*/

package xdsclient
package grpcsync

import (
"context"

"google.golang.org/grpc/internal/buffer"
)

// callbackSerializer provides a mechanism to schedule callbacks in a
// CallbackSerializer provides a mechanism to schedule callbacks in a
// synchronized manner. It provides a FIFO guarantee on the order of execution
// of scheduled callbacks. New callbacks can be scheduled by invoking the
// Schedule() method.
//
// This type is safe for concurrent access.
type callbackSerializer struct {
type CallbackSerializer struct {
callbacks *buffer.Unbounded
}

// newCallbackSerializer returns a new callbackSerializer instance. The provided
// NewCallbackSerializer returns a new CallbackSerializer instance. The provided
// context will be passed to the scheduled callbacks. Users should cancel the
// provided context to shutdown the callbackSerializer. It is guaranteed that no
// provided context to shutdown the CallbackSerializer. It is guaranteed that no
// callbacks will be executed once this context is canceled.
func newCallbackSerializer(ctx context.Context) *callbackSerializer {
t := &callbackSerializer{callbacks: buffer.NewUnbounded()}
func NewCallbackSerializer(ctx context.Context) *CallbackSerializer {
t := &CallbackSerializer{callbacks: buffer.NewUnbounded()}
go t.run(ctx)
return t
}
Expand All @@ -48,11 +48,11 @@ func newCallbackSerializer(ctx context.Context) *callbackSerializer {
//
// Callbacks are expected to honor the context when performing any blocking
// operations, and should return early when the context is canceled.
func (t *callbackSerializer) Schedule(f func(ctx context.Context)) {
func (t *CallbackSerializer) Schedule(f func(ctx context.Context)) {
t.callbacks.Put(f)
}

func (t *callbackSerializer) run(ctx context.Context) {
func (t *CallbackSerializer) run(ctx context.Context) {
for ctx.Err() == nil {
select {
case <-ctx.Done():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*
*/

package xdsclient
package grpcsync

import (
"context"
Expand All @@ -28,11 +28,16 @@ import (
"github.com/google/go-cmp/cmp"
)

const (
defaultTestTimeout = 5 * time.Second
defaultTestShortTimeout = 10 * time.Millisecond // For events expected to *not* happen.
)

// TestCallbackSerializer_Schedule_FIFO verifies that callbacks are executed in
// the same order in which they were scheduled.
func (s) TestCallbackSerializer_Schedule_FIFO(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
cs := newCallbackSerializer(ctx)
cs := NewCallbackSerializer(ctx)
defer cancel()

// We have two channels, one to record the order of scheduling, and the
Expand Down Expand Up @@ -100,7 +105,7 @@ func (s) TestCallbackSerializer_Schedule_FIFO(t *testing.T) {
// scheduled callbacks get executed.
func (s) TestCallbackSerializer_Schedule_Concurrent(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
cs := newCallbackSerializer(ctx)
cs := NewCallbackSerializer(ctx)
defer cancel()

// Schedule callbacks concurrently by calling Schedule() from goroutines.
Expand Down Expand Up @@ -136,7 +141,7 @@ func (s) TestCallbackSerializer_Schedule_Concurrent(t *testing.T) {
// are not executed once Close() returns.
func (s) TestCallbackSerializer_Schedule_Close(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
cs := newCallbackSerializer(ctx)
cs := NewCallbackSerializer(ctx)

// Schedule a callback which blocks until the context passed to it is
// canceled. It also closes a couple of channels to signal that it started
Expand Down
5 changes: 3 additions & 2 deletions xds/internal/xdsclient/authority.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"time"

"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
"google.golang.org/grpc/xds/internal/xdsclient/load"
"google.golang.org/grpc/xds/internal/xdsclient/transport"
Expand Down Expand Up @@ -65,7 +66,7 @@ type authority struct {
serverCfg *bootstrap.ServerConfig // Server config for this authority
bootstrapCfg *bootstrap.Config // Full bootstrap configuration
refCount int // Reference count of watches referring to this authority
serializer *callbackSerializer // Callback serializer for invoking watch callbacks
serializer *grpcsync.CallbackSerializer // Callback serializer for invoking watch callbacks
resourceTypeGetter func(string) xdsresource.Type // ResourceType registry lookup
transport *transport.Transport // Underlying xDS transport to the management server
watchExpiryTimeout time.Duration // Resource watch expiry timeout
Expand Down Expand Up @@ -99,7 +100,7 @@ type authorityArgs struct {
// the second case.
serverCfg *bootstrap.ServerConfig
bootstrapCfg *bootstrap.Config
serializer *callbackSerializer
serializer *grpcsync.CallbackSerializer
resourceTypeGetter func(string) xdsresource.Type
watchExpiryTimeout time.Duration
logger *grpclog.PrefixLogger
Expand Down
3 changes: 2 additions & 1 deletion xds/internal/xdsclient/authority_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"time"

"github.com/google/uuid"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/testutils/xds/e2e"
"google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/testutils"
Expand Down Expand Up @@ -69,7 +70,7 @@ func setupTest(ctx context.Context, t *testing.T, opts e2e.ManagementServerOptio
bootstrapCfg: &bootstrap.Config{
NodeProto: &v3corepb.Node{Id: nodeID},
},
serializer: newCallbackSerializer(ctx),
serializer: grpcsync.NewCallbackSerializer(ctx),
resourceTypeGetter: rtRegistry.get,
watchExpiryTimeout: watchExpiryTimeout,
logger: nil,
Expand Down
2 changes: 1 addition & 1 deletion xds/internal/xdsclient/client_new.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func newWithConfig(config *bootstrap.Config, watchExpiryTimeout time.Duration, i
done: grpcsync.NewEvent(),
config: config,
watchExpiryTimeout: watchExpiryTimeout,
serializer: newCallbackSerializer(ctx),
serializer: grpcsync.NewCallbackSerializer(ctx),
serializerClose: cancel,
resourceTypes: newResourceTypeRegistry(),
authorities: make(map[string]*authority),
Expand Down
2 changes: 1 addition & 1 deletion xds/internal/xdsclient/clientimpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type clientImpl struct {
config *bootstrap.Config
logger *grpclog.PrefixLogger
watchExpiryTimeout time.Duration
serializer *callbackSerializer
serializer *grpcsync.CallbackSerializer
serializerClose func()
resourceTypes *resourceTypeRegistry

Expand Down

0 comments on commit e979919

Please sign in to comment.