Skip to content

Commit

Permalink
refactor: expand pool and connection implementations (#7224)
Browse files Browse the repository at this point in the history
This PR includes much of the rewiring of the existing ManagedStream abstraction, but doesn't cut over to the new implemention yet.

We add a reference to the origin writer as part of the pendingWrite which retains information about a single write request and response. This allows us to resolve retry settings for a given write by checking if the writer has a custom retry policy. In other cases, we use the default settings of the connection pool.

We introduce internal UUID identifiers to the core abstractions (pool, connection, writer) so that we can add observability later to see which components are responsible for processing requests.

We remove the notion of adding connections to the connectionpool contract. Instead, we introduce a new interface in the pool called a poolRouter. By interface contract, it's responsible for picking the correct connection for a given write. However, this allows us to abstract away different implementations for pool behavior and make it the responsibility of an individual router.

Further, this PR adds the most simplistic router we'll use for the initial migration to multiplexing (simpleRouter): it supports a single connection, and routes all traffic to it.

This PR also moves over more internal functionality from the ManagedStream, namely appendWithRetry() and lockingAppend(). The implementations still remain on the ManagedStream implementation at this time, we'll remove most of the functionality when we cut over to using pools/connections.

Towards: #7103
  • Loading branch information
shollyman committed Jan 9, 2023
1 parent 7a1022e commit 2b7af68
Show file tree
Hide file tree
Showing 6 changed files with 196 additions and 19 deletions.
4 changes: 4 additions & 0 deletions bigquery/storage/managedwriter/appendresult.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,10 @@ func (ar *AppendResult) TotalAttempts(ctx context.Context) (int, error) {
// pendingWrite tracks state for a set of rows that are part of a single
// append request.
type pendingWrite struct {
// writer retains a reference to the origin of a pending write. Primary
// used is to inform routing decisions.
writer *ManagedStream

request *storagepb.AppendRowsRequest
// for schema evolution cases, accept a new schema
newSchema *descriptorpb.DescriptorProto
Expand Down
4 changes: 1 addition & 3 deletions bigquery/storage/managedwriter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@ import (
// does not have the project ID encoded.
const DetectProjectID = "*detect-project-id*"

const managedstreamIDPrefix = "managedstream"

// Client is a managed BigQuery Storage write client scoped to a single project.
type Client struct {
rawClient *storage.BigQueryWriteClient
Expand Down Expand Up @@ -109,7 +107,7 @@ func (c *Client) buildManagedStream(ctx context.Context, streamFunc streamClient
ctx, cancel := context.WithCancel(ctx)

ms := &ManagedStream{
id: newUUID(managedstreamIDPrefix),
id: newUUID(writerIDPrefix),
streamSettings: defaultStreamSettings(),
c: c,
ctx: ctx,
Expand Down
185 changes: 170 additions & 15 deletions bigquery/storage/managedwriter/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,21 @@ package managedwriter

import (
"context"
"fmt"
"sync"

"cloud.google.com/go/bigquery/storage/apiv1/storagepb"
"github.com/googleapis/gax-go/v2"
"go.opencensus.io/tag"
"google.golang.org/grpc/codes"
grpcstatus "google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
)

const (
poolIDPrefix string = "connectionpool"
connIDPrefix string = "connection"
writerIDPrefix string = "writer"
)

// connectionPool represents a pooled set of connections.
Expand All @@ -34,10 +42,11 @@ import (
type connectionPool struct {
id string

// the pool retains the long-lived context responsible for opening/maintaining bidi connections.
ctx context.Context
cancel context.CancelFunc
// baseFlowController isn't used directly, but is the prototype used for each connection instance.
baseFlowController *flowController

baseFlowController *flowController // template flow controller used for building connections.

// We centralize the open function on the pool, rather than having an instance of the open func on every
// connection. Opening the connection is a stateless operation.
Expand All @@ -46,21 +55,20 @@ type connectionPool struct {
// We specify one set of calloptions for the pool.
// All connections in the pool open with the same call options.
callOptions []gax.CallOption
}

// addConnection creates an additional connection associated to the connection pool.
func (cp *connectionPool) addConnection() (*connection, error) {
router poolRouter // poolManager makes the decisions about connections and routing.

coCtx, cancel := context.WithCancel(cp.ctx)
conn := &connection{
id: newUUID("connection"),
pool: cp,
fc: copyFlowController(cp.baseFlowController),
ctx: coCtx,
cancel: cancel,
retry *statelessRetryer // default retryer for the pool.
}

// processWrite is responsible for routing a write request to an appropriate connection. It's used by ManagedStream instances
// to send writes without awareness of individual connections.
func (pool *connectionPool) processWrite(pw *pendingWrite) error {
conn, err := pool.router.pickConnection(pw)
if err != nil {
return err
}
// TODO: retain a reference to the connection in the pool registry.
return conn, nil
return conn.appendWithRetry(pw)
}

// openWithRetry establishes a new bidi stream and channel pair. It is used by connection objects
Expand Down Expand Up @@ -96,6 +104,17 @@ func (cp *connectionPool) openWithRetry(co *connection) (storagepb.BigQueryWrite
}
}

// returns the stateless default retryer for the pool. If one's not set (re-enqueue retries disabled),
// it returns a retryer that only permits single attempts.
func (cp *connectionPool) defaultRetryer() *statelessRetryer {
if cp.retry != nil {
return cp.retry
}
return &statelessRetryer{
maxAttempts: 1,
}
}

// connection models the underlying AppendRows grpc bidi connection used for writing
// data and receiving acknowledgements. It is responsible for enqueing writes and processing
// responses from the backend.
Expand All @@ -104,16 +123,96 @@ type connection struct {
pool *connectionPool // each connection retains a reference to its owning pool.

fc *flowController // each connection has it's own flow controller.
ctx context.Context // retained context for maintaining the connection.
ctx context.Context // retained context for maintaining the connection, derived from the owning pool.
cancel context.CancelFunc

retry *statelessRetryer

mu sync.Mutex
arc *storagepb.BigQueryWrite_AppendRowsClient // reference to the grpc connection (send, recv, close)
reconnect bool //
err error // terminal connection error
pending chan *pendingWrite
}

func newConnection(ctx context.Context, pool *connectionPool) *connection {
// create and retain a cancellable context.
connCtx, cancel := context.WithCancel(ctx)
fc := newFlowController(0, 0)
if pool != nil {
fc = copyFlowController(pool.baseFlowController)
}
return &connection{
id: newUUID(connIDPrefix),
pool: pool,
fc: fc,
ctx: connCtx,
cancel: cancel,
}
}

// lockingAppend handles a single append request on a given connection.
func (co *connection) lockingAppend(pw *pendingWrite) error {
// Don't both calling/retrying if this append's context is already expired.
if err := pw.reqCtx.Err(); err != nil {
return err
}

var statsOnExit func()

// critical section: Things that need to happen inside the critical section:
//
// * get/open conenction
// * issue the append
// * add the pending write to the channel for the connection (ordering for the response)
co.mu.Lock()
defer func() {
co.mu.Unlock()
if statsOnExit != nil {
statsOnExit()
}
}()

var arc *storagepb.BigQueryWrite_AppendRowsClient
var ch chan *pendingWrite
var err error

arc, ch, err = co.getStream(arc, false)
if err != nil {
return err
}

// TODO: optimization logic here
// Here, we need to compare values for the previous append and compare them to the pending write.
// If they are matches, we can clear fields in the request that aren't needed (e.g. not resending descriptor,
// stream ID, etc.)
//
// Current implementation just clones the request.
req := proto.Clone(pw.request).(*storagepb.AppendRowsRequest)

pw.attemptCount = pw.attemptCount + 1
if err = (*arc).Send(req); err != nil {
if shouldReconnect(err) {
// if we think this connection is unhealthy, force a reconnect on the next send.
co.reconnect = true
}
return err
}

// Compute numRows, once we pass ownership to the channel the request may be
// cleared.
numRows := int64(len(pw.request.GetProtoRows().Rows.GetSerializedRows()))
statsOnExit = func() {
// these will get recorded once we exit the critical section.
// TODO: resolve open questions around what labels should be attached (connection, streamID, etc)
recordStat(co.ctx, AppendRequestRows, numRows)
recordStat(co.ctx, AppendRequests, 1)
recordStat(co.ctx, AppendRequestBytes, int64(pw.reqSize))
}
ch <- pw
return nil
}

// getStream returns either a valid ARC client stream or permanent error.
//
// Any calls to getStream should do so in possesion of the critical section lock.
Expand Down Expand Up @@ -148,6 +247,35 @@ func (co *connection) getStream(arc *storagepb.BigQueryWrite_AppendRowsClient, f
return co.arc, co.pending, co.err
}

// appendWithRetry handles the details of adding sending an append request on a connection. Retries here address
// problems with sending the request. The processor on the connection is responsible for retrying based on the
// response received from the service.
func (co *connection) appendWithRetry(pw *pendingWrite) error {
appendRetryer := resolveRetry(pw, co.pool)
for {
appendErr := co.lockingAppend(pw)
if appendErr != nil {
// Append yielded an error. Retry by continuing or return.
status := grpcstatus.Convert(appendErr)
if status != nil {
ctx, _ := tag.New(co.ctx, tag.Insert(keyError, status.Code().String()))
recordStat(ctx, AppendRequestErrors, 1)
}
bo, shouldRetry := appendRetryer.Retry(appendErr, pw.attemptCount)
if shouldRetry {
if err := gax.Sleep(co.ctx, bo); err != nil {
return err
}
continue
}
// Mark the pending write done. This will not be returned to the user, they'll receive the returned error.
pw.markDone(nil, appendErr, co.fc)
return appendErr
}
return nil
}
}

// connRecvProcessor is used to propagate append responses back up with the originating write requests. It
// It runs as a goroutine. A connection object allows for reconnection, and each reconnection establishes a new
// processing gorouting and backing channel.
Expand Down Expand Up @@ -197,3 +325,30 @@ func connRecvProcessor(co *connection, arc storagepb.BigQueryWrite_AppendRowsCli
}
}
}

type poolRouter interface {
pickConnection(pw *pendingWrite) (*connection, error)
}

// simpleRouter is a primitive traffic router that routes all traffic to its single connection instance.
//
// This router is appropriate for our migration case, where an single ManagedStream writer implicitly has
// a connection pool and a connection for its explicit use.
type simpleRouter struct {
pool *connectionPool
conn *connection
}

func (rtr *simpleRouter) pickConnection(pw *pendingWrite) (*connection, error) {
if rtr.conn != nil {
return rtr.conn, nil
}
return nil, fmt.Errorf("no connection available")
}

func newSimpleRouter(pool *connectionPool) *simpleRouter {
return &simpleRouter{
pool: pool,
conn: newConnection(pool.ctx, pool),
}
}
3 changes: 2 additions & 1 deletion bigquery/storage/managedwriter/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ func TestConnection_OpenWithRetry(t *testing.T) {
return nil, err
},
}
conn, err := pool.addConnection()
pool.router = newSimpleRouter(pool)
conn, err := pool.router.pickConnection(nil)
if err != nil {
t.Errorf("case %s, failed to add connection: %v", tc.desc, err)
}
Expand Down
3 changes: 3 additions & 0 deletions bigquery/storage/managedwriter/managed_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ type ManagedStream struct {
// Unique id for the managedstream instance.
id string

// pool retains a reference to the writer's pool. A writer is only associated to a single pool.
pool *connectionPool

streamSettings *streamSettings
schemaDescriptor *descriptorpb.DescriptorProto
destinationTable string
Expand Down
16 changes: 16 additions & 0 deletions bigquery/storage/managedwriter/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,22 @@ func newStatelessRetryer() *statelessRetryer {
}
}

// resolveRetry handles fetching an appropriate retry policy. If one isn't found,
// it defaults to a policy that allows a single attempt (no retries).
func resolveRetry(pw *pendingWrite, pool *connectionPool) *statelessRetryer {
if pw != nil {
if pw.writer != nil {
return pw.writer.statelessRetryer()
}
}
if pool != nil {
return pw.writer.pool.defaultRetryer()
}
return &statelessRetryer{
maxAttempts: 1,
}
}

func (sr *statelessRetryer) pause(aggressiveBackoff bool) time.Duration {
jitter := sr.jitter.Nanoseconds()
if jitter > 0 {
Expand Down

0 comments on commit 2b7af68

Please sign in to comment.