Skip to content

Commit

Permalink
refactor: make newConnection less brittle
Browse files Browse the repository at this point in the history
This PR switches newConnection to a typed argument to make it less prone
to invoke incorrectly.  Raised during a review on a related PR.

Towards: googleapis#7103
  • Loading branch information
shollyman committed Mar 24, 2023
1 parent 439050d commit 82ff546
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 8 deletions.
4 changes: 2 additions & 2 deletions bigquery/storage/managedwriter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,9 @@ func (c *Client) buildManagedStream(ctx context.Context, streamFunc streamClient
}
// Resolve behavior for connectionPool interactions. In the multiplex case we use shared pools, in the
// default case we setup a connectionPool per writer, and that pool gets a single connection instance.
mode := ""
mode := simplexConnectionMode
if c.cfg != nil && c.cfg.useMultiplex {
mode = "MULTIPLEX"
mode = multiplexConnectionMode
}
pool, err := c.resolvePool(ctx, writer.streamSettings, streamFunc, newSimpleRouter(mode))
if err != nil {
Expand Down
20 changes: 14 additions & 6 deletions bigquery/storage/managedwriter/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,15 @@ type connection struct {
pending chan *pendingWrite
}

func newConnection(pool *connectionPool, mode string) *connection {
type connectionMode string

var (
multiplexConnectionMode connectionMode = "MULTIPLEX"
simplexConnectionMode connectionMode = "SIMPLEX"
verboseConnectionMode connectionMode = "VERBOSE"
)

func newConnection(pool *connectionPool, mode connectionMode) *connection {
if pool == nil {
return nil
}
Expand All @@ -214,11 +222,11 @@ func newConnection(pool *connectionPool, mode string) *connection {
}
}

func optimizer(mode string) sendOptimizer {
func optimizer(mode connectionMode) sendOptimizer {
switch mode {
case "MULTIPLEX":
case multiplexConnectionMode:
return &multiplexOptimizer{}
case "VERBOSE":
case verboseConnectionMode:
return &verboseOptimizer{}
default:
return &simplexOptimizer{}
Expand Down Expand Up @@ -458,7 +466,7 @@ type poolRouter interface {
// This router is designed for our migration case, where an single ManagedStream writer has as 1:1 relationship
// with a connectionPool. You can multiplex with this router, but it will never scale beyond a single connection.
type simpleRouter struct {
mode string
mode connectionMode
pool *connectionPool

mu sync.RWMutex
Expand Down Expand Up @@ -522,7 +530,7 @@ func (rtr *simpleRouter) pickConnection(pw *pendingWrite) (*connection, error) {
return nil, fmt.Errorf("no connection available")
}

func newSimpleRouter(mode string) *simpleRouter {
func newSimpleRouter(mode connectionMode) *simpleRouter {
return &simpleRouter{
// We don't add a connection until writers attach.
mode: mode,
Expand Down

0 comments on commit 82ff546

Please sign in to comment.