configure dispatch for tests #438
decc06f to 7b44cfa
@@ -20,6 +21,8 @@ type RemoteNowFunction func(context.Context) (datastore.Revision, error)
// RemoteClockRevisions handles revision calculation for datastores that provide
// their own clocks.
type RemoteClockRevisions struct {
	sync.RWMutex
Added a mutex here to protect `lastQuantizedRevision` access from multiple goroutines.
Could we use a CAS?
I pushed an implementation that uses atomic Load/Store, reasoning that we don't really care what the value was before the new one, just that the new one is set (so we don't need the C in CAS). We're already singleflight-fetching the new value, so only one goroutine will ever be updating the value at a time.
I could see an argument for a full CAS from a sanity-checking / debug-logging perspective, but let me know what you think.
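For readers following the thread, here is a minimal sketch of the atomic Load/Store idea, assuming a single writer at a time (which the comment says singleflight already guarantees). The `revisionCache` type and its field are hypothetical stand-ins, not the actual SpiceDB code.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// revisionCache is a hypothetical stand-in for the cached revision state.
type revisionCache struct {
	last int64 // accessed only through sync/atomic
}

// Store publishes a new revision. The previous value is deliberately
// ignored, which is why no compare step is needed.
func (c *revisionCache) Store(rev int64) { atomic.StoreInt64(&c.last, rev) }

// Load returns the most recently published revision without blocking.
func (c *revisionCache) Load() int64 { return atomic.LoadInt64(&c.last) }

func main() {
	c := &revisionCache{}
	c.Store(41)

	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = c.Load() // concurrent readers see either the old or the new value
		}()
	}
	c.Store(42) // the single writer
	wg.Wait()

	fmt.Println(c.Load())
}
```

A compare-and-swap would only add value if the writer needed to detect that someone else had changed the value in between, for example for debug logging.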
@@ -125,9 +125,12 @@ func NewDispatcher(nsm namespace.Manager, options ...Option) (dispatch.Dispatche
// NewClusterDispatcher takes a caching redispatcher (such as one created by
// NewDispatcher) and returns a cluster dispatcher suitable for use as the
// dispatcher for the dispatch grpc server.
-func NewClusterDispatcher(cachingRedispatch dispatch.Dispatcher, nsm namespace.Manager, config *ristretto.Config) (dispatch.Dispatcher, error) {
+func NewClusterDispatcher(cachingRedispatch dispatch.Dispatcher, nsm namespace.Manager, prometheusSubsystem string, config *ristretto.Config) (dispatch.Dispatcher, error) {
Made the subsystem configurable. These metrics are registered process-globally by Prometheus, so if we spin up a bunch of servers in parallel for tests, each one needs a unique subsystem.
Maybe we should change to a ClusterDispatcher config struct now?
^
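One way to act on the config suggestion is the functional options pattern, which lets each test server pick its own Prometheus subsystem without growing the constructor's parameter list. The sketch below is illustrative only: `clusterConfig`, `WithPrometheusSubsystem`, `WithCacheMaxCost`, and the default values are hypothetical names, not the project's API.

```go
package main

import "fmt"

// clusterConfig is a hypothetical bag of settings for a cluster dispatcher.
type clusterConfig struct {
	prometheusSubsystem string
	cacheMaxCost        int64
}

// Option mutates a clusterConfig; callers pass zero or more of them.
type Option func(*clusterConfig)

// WithPrometheusSubsystem overrides the metrics subsystem name.
func WithPrometheusSubsystem(name string) Option {
	return func(c *clusterConfig) { c.prometheusSubsystem = name }
}

// WithCacheMaxCost overrides the cache size limit.
func WithCacheMaxCost(cost int64) Option {
	return func(c *clusterConfig) { c.cacheMaxCost = cost }
}

// newClusterConfig applies the options on top of assumed defaults.
func newClusterConfig(opts ...Option) clusterConfig {
	cfg := clusterConfig{prometheusSubsystem: "dispatch", cacheMaxCost: 1 << 20}
	for _, opt := range opts {
		opt(&cfg)
	}
	return cfg
}

func main() {
	// Each parallel test server gets its own subsystem name, so
	// process-global metric registration does not collide.
	for i := 0; i < 2; i++ {
		cfg := newClusterConfig(WithPrometheusSubsystem(fmt.Sprintf("test_dispatch_%d", i)))
		fmt.Println(cfg.prometheusSubsystem, cfg.cacheMaxCost)
	}
}
```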
	dispatchv1 "github.com/authzed/spicedb/pkg/proto/dispatch/v1"
)

type dispatchServer struct {
	dispatchv1.UnimplementedDispatchServiceServer
	shared.WithServiceSpecificInterceptors
We missed dispatch when switching to the service-specific middleware
internal/testserver/cluster.go
Outdated
balancer.Register(hashbalancer.NewConsistentHashringBuilder(xxhash.Sum64, 20, 1))

// register a manual resolver that we can feed addresses during tests
resolver.Register(testResolverbuilder)
Resolver registration is not thread safe, so we register one resolver that handles many prefixes instead of many resolvers that each handle one.
Add a comment?
there's lots of comments on the object that implements it?
added more detail here though
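The "one resolver, many prefixes" approach can be sketched with the standard library alone: a single shared object, registered once, keeps a mutex-protected map from prefix to addresses, and each test cluster only touches its own entry. This is an assumption-laden illustration, not the gRPC resolver API; `prefixRegistry`, `Set`, and `Lookup` are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// prefixRegistry is a hypothetical shared lookup table that a single,
// once-registered resolver builder could consult.
type prefixRegistry struct {
	mu    sync.RWMutex
	addrs map[string][]string
}

func newPrefixRegistry() *prefixRegistry {
	return &prefixRegistry{addrs: make(map[string][]string)}
}

// Set records the addresses for one prefix; safe to call concurrently.
func (r *prefixRegistry) Set(prefix string, addrs []string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.addrs[prefix] = append([]string(nil), addrs...) // store a copy
}

// Lookup returns a copy of the addresses registered for a prefix.
func (r *prefixRegistry) Lookup(prefix string) ([]string, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	addrs, ok := r.addrs[prefix]
	return append([]string(nil), addrs...), ok
}

func main() {
	reg := newPrefixRegistry()

	// Several test clusters register their addresses in parallel.
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			prefix := fmt.Sprintf("cluster%d", i)
			reg.Set(prefix, []string{prefix + "_0", prefix + "_1"})
		}(i)
	}
	wg.Wait()

	addrs, ok := reg.Lookup("cluster2")
	fmt.Println(addrs, ok)
}
```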
lastQuantizedRevision, err, _ := rcr.updateGroup.Do("", func() (interface{}, error) {
	rcr.Lock()
	defer rcr.Unlock()
	log.Debug().Time("now", localNow).Time("valid", rcr.revisionValidThrough).Msg("computing new revision")

	nowHLC, err := rcr.NowFunc(ctx)
Does this mean the DB access is under the lock? If so, we can't have that occur.
refactored so that this is not an issue anymore
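The concern in this thread is a slow datastore round trip happening while a lock is held. A common shape for avoiding that, shown here as a sketch rather than the actual refactor, is to do the slow call first and take the lock only to publish the result. The `cachedValue` type and the `fetch` callback are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// cachedValue is a hypothetical cache entry guarded by a mutex.
type cachedValue struct {
	mu           sync.RWMutex
	value        int64
	validThrough time.Time
}

// refresh runs fetch (the stand-in for the datastore call) without holding
// the lock, then locks only long enough to store the result.
func (c *cachedValue) refresh(fetch func() (int64, error), ttl time.Duration) (int64, error) {
	v, err := fetch() // slow work happens outside the critical section
	if err != nil {
		return 0, err
	}

	c.mu.Lock()
	c.value = v
	c.validThrough = time.Now().Add(ttl)
	c.mu.Unlock()
	return v, nil
}

// get returns the cached value and whether it is still within its TTL.
func (c *cachedValue) get() (int64, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.value, time.Now().Before(c.validThrough)
}

func main() {
	c := &cachedValue{}
	slowFetch := func() (int64, error) {
		time.Sleep(10 * time.Millisecond) // simulated round trip
		return 42, nil
	}
	if _, err := c.refresh(slowFetch, time.Minute); err != nil {
		fmt.Println("refresh failed:", err)
		return
	}
	fmt.Println(c.get())
}
```

Readers calling `get` during the simulated round trip are never blocked by it, since the lock is held only for the two assignments.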
// Copyright (c) The go-grpc-middleware Authors.
// Licensed under the Apache License 2.0.

// Package logging is a copy of https://github.com/grpc-ecosystem/go-grpc-middleware/tree/v2/providers/zerolog
Did you file an issue with them as well?
grpc-ecosystem/go-grpc-middleware#487
Also removed the mutex; made copies of the logger instead
internal/testserver/cluster.go
Outdated
// TestClusterWithDispatch creates a cluster with `size` nodes
// The cluster has a real dispatch stack that uses bufconn grpc connections
func TestClusterWithDispatch(t testing.TB, size int, ds datastore.Datastore) ([]*grpc.ClientConn, func()) {
Use a uint for size
internal/testserver/cluster.go
Outdated
addresses := make([]resolver.Address, 0, size)
for i := 0; i < size; i++ {
	addresses = append(addresses, resolver.Address{
		Addr: prefix + "_" + strconv.Itoa(i),
fmt.Sprintf
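A small sketch of building the per-node address with `fmt.Sprintf` instead of string concatenation plus `strconv.Itoa`. The helper name `nodeAddr` is hypothetical and exists only for illustration.

```go
package main

import "fmt"

// nodeAddr formats the address of node i under the given prefix.
func nodeAddr(prefix string, i int) string {
	return fmt.Sprintf("%s_%d", prefix, i)
}

func main() {
	fmt.Println(nodeAddr("test", 3))
}
```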
pkg/cmd/server/server.go
Outdated
@@ -193,6 +203,7 @@ func (c *Config) Complete() (RunnableServer, error) {
v1SchemaServiceOption,
)
},
// grpc.NumStreamWorkers(uint32(runtime.NumCPU())),
Remove?
c23bf55 to 1d322c7
@@ -1,3 +1,4 @@
// go:build ci
How is this different from the line below it?
They're the same, but 1.17 `go fmt` enforces both lines during the transition from `+build` to `go:build`.
https://go.googlesource.com/proposal/+/master/design/draft-gobuild.md
@@ -2,6 +2,7 @@ package common
Can we add a test for this module that is run with the race detector in CI?
729b15c to c3272b4
internal/services/perf_test.go
Outdated
func TestBurst(t *testing.T) {
	blacklist := []string{
		"spanner", // spanner emulator doesn't support parallel transactions
Can we have the ID be pulled from the spanner package or would that create a weird dependency?
}

// SafeManualResolverBuilder is a resolver builder
type SafeManualResolverBuilder struct {
what makes it safe and manual?
It's similar to the manual resolver builder (https://pkg.go.dev/google.golang.org/grpc@v1.45.0/resolver/manual) but is thread safe.
Add that to the comment?
func NewTestDatastoreBuilder(t testing.TB, engine string) TestDatastoreBuilder {
	switch engine {
	case "memory":
		return TestDatastoreBuilderFunc(func(t testing.TB, initFunc InitFunc) datastore.Datastore {
why is memory special?
there's no external resource (docker image) that the "builder" needs to track, unlike with all of the other datastores.
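The `TestDatastoreBuilderFunc(func(...) ...)` conversion in the hunk above looks like the function-adapter pattern (the same shape as `http.HandlerFunc`): a function type that satisfies the builder interface, which suits the memory engine because it has no container state to hold. The sketch below uses simplified, hypothetical types (`Datastore`, `Builder`, `BuilderFunc`, `newBuilder`) rather than the project's real signatures.

```go
package main

import "fmt"

// Datastore is a simplified, hypothetical stand-in for the real interface.
type Datastore interface {
	Name() string
}

type memDatastore struct{}

func (memDatastore) Name() string { return "memory" }

// Builder creates fresh datastores for tests.
type Builder interface {
	NewDatastore() Datastore
}

// BuilderFunc adapts a plain function to the Builder interface, which is
// enough when there is no external resource to track.
type BuilderFunc func() Datastore

func (f BuilderFunc) NewDatastore() Datastore { return f() }

// newBuilder picks a builder by engine name. Only "memory" is sketched here;
// other engines would return a struct that owns a running container.
func newBuilder(engine string) (Builder, error) {
	switch engine {
	case "memory":
		return BuilderFunc(func() Datastore { return memDatastore{} }), nil
	default:
		return nil, fmt.Errorf("engine %q is not covered by this sketch", engine)
	}
}

func main() {
	b, err := newBuilder("memory")
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(b.NewDatastore().Name())
}
```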
de4b12e to e50dd92
internal/dispatch/cluster/cluster.go
Outdated
// NewClusterDispatcher takes a caching redispatcher (such as one created by
// combined.NewDispatcher) and returns a cluster dispatcher suitable for use as
// the dispatcher for the dispatch grpc server.
func NewClusterDispatcher(cachingRedispatch dispatch.Dispatcher, nsm namespace.Manager, options ...Option) (dispatch.Dispatcher, error) {
Should the internal dispatcher be called `cachingDispatcher`? It doesn't have to be cached, right?
internal/testserver/cluster.go
Outdated
}

func (b *SafeManualResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
	if target.URL.Scheme != "test" {
Put into a const?
This adds a test that can fire a large set of requests at the same time to see how spicedb handles the load. It also adds a way to start up a spicedb cluster configured with dispatch over bufconn.
This adds an abstraction that makes it easy to spin up a container for a backing datastore and generate test datastores for it.
internal/dispatch/cluster/cluster.go
Outdated
	}
}

// NewClusterDispatcher takes a caching redispatcher (such as one created by
Fix comment
LGTM
This adds a test that can fire a large set of requests at the same time to see how spicedb handles the load (it should likely become a benchmark).
This sets up spicedb with a custom resolver that returns `bufconn` connections to the dispatch API, meaning that we can easily test full cluster behavior in "unit" tests.
This also addresses a long-standing comment in the consistency tests (to run with dispatch) and runs the dispatch tests in parallel.