Skip to content

Commit

Permalink
refactor: augment poolRouter interface (and simpleRouter)
Browse files Browse the repository at this point in the history
This PR extends the poolRouter interface to allow writers to be
registered and removed, and augments the existing simpleRouter to
support the contract.

PR adds a basic test of the router.

A future refactor (when we wire up the new abstractions) will hook
up the functionality properly.

Towards: googleapis#7103
  • Loading branch information
shollyman committed Mar 2, 2023
1 parent fe33c7b commit f5c9a9c
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 3 deletions.
82 changes: 79 additions & 3 deletions bigquery/storage/managedwriter/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package managedwriter
import (
"context"
"fmt"
"io"
"sync"

"cloud.google.com/go/bigquery/storage/apiv1/storagepb"
Expand Down Expand Up @@ -71,6 +72,20 @@ func (pool *connectionPool) processWrite(pw *pendingWrite) error {
return conn.appendWithRetry(pw)
}

func (pool *connectionPool) addWriter(writer *ManagedStream) error {
if pool.router != nil {
return pool.router.attachWriter(writer)
}
return nil
}

func (pool *connectionPool) disconnectWriter(writer *ManagedStream) error {
if pool.router != nil {
return pool.router.detachWriter(writer)
}
return nil
}

// openWithRetry establishes a new bidi stream and channel pair. It is used by connection objects
// when (re)opening the network connection to the backend.
//
Expand Down Expand Up @@ -151,6 +166,30 @@ func newConnection(ctx context.Context, pool *connectionPool) *connection {
}
}

// close closes a connection.
func (co *connection) close() {
co.mu.Lock()
defer co.mu.Unlock()
// first, cancel the retained context.
if co.cancel != nil {
co.cancel()
co.cancel = nil
}
// close sending if we have an ARC.
if co.arc != nil {
(*co.arc).CloseSend()
co.arc = nil
}
// mark terminal error if not already set.
if co.err != nil {
co.err = io.EOF
}
// signal pending channel close.
if co.pending != nil {
close(co.pending)
}
}

// lockingAppend handles a single append request on a given connection.
func (co *connection) lockingAppend(pw *pendingWrite) error {
// Don't both calling/retrying if this append's context is already expired.
Expand Down Expand Up @@ -327,6 +366,12 @@ func connRecvProcessor(co *connection, arc storagepb.BigQueryWrite_AppendRowsCli
}

type poolRouter interface {
// attachWriter signals the router that a new writer is being used.
attachWriter(writer *ManagedStream) error

// detachWriter signals the router that a writer is going away (being closed).
detachWriter(writer *ManagedStream) error

pickConnection(pw *pendingWrite) (*connection, error)
}

Expand All @@ -336,7 +381,38 @@ type poolRouter interface {
// a connection pool and a connection for its explicit use.
type simpleRouter struct {
pool *connectionPool
conn *connection

mu sync.RWMutex
conn *connection
writers map[string]struct{}
}

func (rtr *simpleRouter) attachWriter(writer *ManagedStream) error {
if writer.id == "" {
return fmt.Errorf("writer has no ID")
}
rtr.mu.Lock()
defer rtr.mu.Unlock()
rtr.writers[writer.id] = struct{}{}
if rtr.conn == nil {
rtr.conn = newConnection(rtr.pool.ctx, rtr.pool)
}
return nil
}

func (rtr *simpleRouter) detachWriter(writer *ManagedStream) error {
if writer.id == "" {
return fmt.Errorf("writer has no ID")
}
rtr.mu.Lock()
defer rtr.mu.Unlock()
delete(rtr.writers, writer.id)
if len(rtr.writers) == 0 && rtr.conn != nil {
// no attached writers, cleanup and remove connection.
defer rtr.conn.close()
rtr.conn = nil
}
return nil
}

func (rtr *simpleRouter) pickConnection(pw *pendingWrite) (*connection, error) {
Expand All @@ -348,7 +424,7 @@ func (rtr *simpleRouter) pickConnection(pw *pendingWrite) (*connection, error) {

func newSimpleRouter(pool *connectionPool) *simpleRouter {
return &simpleRouter{
pool: pool,
conn: newConnection(pool.ctx, pool),
pool: pool,
writers: make(map[string]struct{}),
}
}
37 changes: 37 additions & 0 deletions bigquery/storage/managedwriter/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ func TestConnection_OpenWithRetry(t *testing.T) {
},
}
pool.router = newSimpleRouter(pool)
writer := &ManagedStream{id: "foo"}
pool.addWriter(writer)
conn, err := pool.router.pickConnection(nil)
if err != nil {
t.Errorf("case %s, failed to add connection: %v", tc.desc, err)
Expand All @@ -88,3 +90,38 @@ func TestConnection_OpenWithRetry(t *testing.T) {
}
}
}

func TestSimpleRouter(t *testing.T) {

ctx := context.Background()

pool := &connectionPool{
ctx: ctx,
open: func(opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
return &testAppendRowsClient{}, nil
},
}
// TODO: switch to attach semantics in future PR.
pool.router = newSimpleRouter(pool)
pw := newPendingWrite(ctx, [][]byte{[]byte("foo")})
// picking before attaching should yield error
if _, err := pool.router.pickConnection(pw); err == nil {
t.Errorf("pickConnection: expected error, got success")
}
writer := &ManagedStream{
id: "writer",
pool: pool,
}
if err := pool.addWriter(writer); err != nil {
t.Errorf("addWriter: %v", err)
}
if _, err := pool.router.pickConnection(pw); err != nil {
t.Errorf("pickConnection error: %v", err)
}
if err := pool.disconnectWriter(writer); err != nil {
t.Errorf("disconnectWriter: %v", err)
}
if _, err := pool.router.pickConnection(pw); err == nil {
t.Errorf("pickConnection: expected error, got success")
}
}

0 comments on commit f5c9a9c

Please sign in to comment.