Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bigtable): Client side merics #10046

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
189 changes: 142 additions & 47 deletions bigtable/bigtable.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
btopt "cloud.google.com/go/bigtable/internal/option"
"cloud.google.com/go/internal/trace"
gax "github.com/googleapis/gax-go/v2"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"google.golang.org/api/option"
"google.golang.org/api/option/internaloption"
gtransport "google.golang.org/api/transport/grpc"
Expand Down Expand Up @@ -56,6 +57,14 @@ type Client struct {
client btpb.BigtableClient
project, instance string
appProfile string
metricsConfig *metricsConfigInternal
}

// metricsConfig is used to represent user provided config.
// This is not configurable right now and only a default instance is used
type metricsConfig struct {
builtInEnabled bool
meterProviders []*sdkmetric.MeterProvider
}

// ClientConfig has configurations for the client.
Expand Down Expand Up @@ -95,12 +104,19 @@ func NewClientWithConfig(ctx context.Context, project, instance string, config C
return nil, fmt.Errorf("dialing: %w", err)
}

// Create a OpenTelemetry metrics configuration
metricsConfig, err := newMetricsConfigInternal(ctx, project, instance, nil, opts...)
if err != nil {
return nil, err
}

return &Client{
connPool: connPool,
client: btpb.NewBigtableClient(connPool),
project: project,
instance: instance,
appProfile: config.AppProfile,
connPool: connPool,
client: btpb.NewBigtableClient(connPool),
project: project,
instance: instance,
appProfile: config.AppProfile,
metricsConfig: metricsConfig,
}, nil
}

Expand Down Expand Up @@ -166,19 +182,21 @@ func init() {
}

// Convert error to grpc status error
func convertToGrpcStatusErr(err error) error {
if err != nil {
if errStatus, ok := status.FromError(err); ok {
return status.Error(errStatus.Code(), errStatus.Message())
}
func convertToGrpcStatusErr(err error) (codes.Code, error) {
if err == nil {
return codes.OK, nil
}

ctxStatus := status.FromContextError(err)
if ctxStatus.Code() != codes.Unknown {
return status.Error(ctxStatus.Code(), ctxStatus.Message())
}
if errStatus, ok := status.FromError(err); ok {
return errStatus.Code(), status.Error(errStatus.Code(), errStatus.Message())
}

return err
ctxStatus := status.FromContextError(err)
if ctxStatus.Code() != codes.Unknown {
return ctxStatus.Code(), status.Error(ctxStatus.Code(), ctxStatus.Message())
}

return codes.Unknown, err
}

func (c *Client) fullTableName(table string) string {
Expand Down Expand Up @@ -285,6 +303,10 @@ func (ti *tableImpl) ApplyReadModifyWrite(ctx context.Context, row string, m *Re
return ti.Table.ApplyReadModifyWrite(ctx, row, m)
}

func (ti *tableImpl) getOperationRecorder(ctx context.Context, method string, isStreaming bool) (*builtinMetricsTracer, func()) {
return ti.Table.getOperationRecorder(ctx, method, isStreaming)
}

// TODO(dsymonds): Read method that returns a sequence of ReadItems.

// ReadRows reads rows from a table. f is called for each row.
Expand All @@ -295,13 +317,22 @@ func (ti *tableImpl) ApplyReadModifyWrite(ctx context.Context, row string, m *Re
// By default, the yielded rows will contain all values in all cells.
// Use RowFilter to limit the cells returned.
func (t *Table) ReadRows(ctx context.Context, arg RowSet, f func(Row) bool, opts ...ReadOption) (err error) {
method := "cloud.google.com/go/bigtable.ReadRows"
ctx = mergeOutgoingMetadata(ctx, t.md)
ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigtable.ReadRows")
ctx = trace.StartSpan(ctx, method)
defer func() { trace.EndSpan(ctx, err) }()

metricsTracer, opRecorder := t.getOperationRecorder(ctx, method, true)
defer opRecorder()

err = t.readRows(ctx, arg, f, metricsTracer, opts...)
return metricsTracer.recordAndConvertErr(err)
}

func (t *Table) readRows(ctx context.Context, arg RowSet, f func(Row) bool, mt *builtinMetricsTracer, opts ...ReadOption) (err error) {
var prevRowKey string
attrMap := make(map[string]interface{})
err = gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error {
err = t.c.metricsConfig.getAttemptRecorder(ctx, mt, func(ctx context.Context, _ gax.CallSettings) error {
req := &btpb.ReadRowsRequest{
AppProfileId: t.c.appProfile,
}
Expand Down Expand Up @@ -340,12 +371,18 @@ func (t *Table) ReadRows(ctx context.Context, arg RowSet, f func(Row) bool, opts
cr = newChunkReader()
}

*mt.headerMD, err = stream.Header()
if err != nil {
return err
}
for {
res, err := stream.Recv()
if err == io.EOF {
*mt.trailerMD = stream.Trailer()
break
}
if err != nil {
*mt.trailerMD = stream.Trailer()
// Reset arg for next Invoke call.
if arg == nil {
// Should be lowest possible key value, an empty byte array
Expand Down Expand Up @@ -381,6 +418,7 @@ func (t *Table) ReadRows(ctx context.Context, arg RowSet, f func(Row) bool, opts
cancel()
for {
if _, err := stream.Recv(); err != nil {
*mt.trailerMD = stream.Trailer()
// The stream has ended. We don't return an error
// because the caller has intentionally interrupted the scan.
return nil
Expand All @@ -407,20 +445,23 @@ func (t *Table) ReadRows(ctx context.Context, arg RowSet, f func(Row) bool, opts
return err
}, retryOptions...)

return convertToGrpcStatusErr(err)
return err
}

// ReadRow is a convenience implementation of a single-row reader.
// A missing row will return nil for both Row and error.
func (t *Table) ReadRow(ctx context.Context, row string, opts ...ReadOption) (Row, error) {
var r Row
method := "cloud.google.com/go/bigtable.ReadRow"
metricsTracer, opRecorder := t.getOperationRecorder(ctx, method, true)
defer opRecorder()

var r Row
opts = append([]ReadOption{LimitRows(1)}, opts...)
err := t.ReadRows(ctx, SingleRow(row), func(rr Row) bool {
err := t.readRows(ctx, SingleRow(row), func(rr Row) bool {
r = rr
return true
}, opts...)
return r, err
}, metricsTracer, opts...)
return r, metricsTracer.recordAndConvertErr(err)
}

// decodeFamilyProto adds the cell data from f to the given row.
Expand Down Expand Up @@ -919,10 +960,19 @@ const maxMutations = 100000
// Apply mutates a row atomically. A mutation must contain at least one
// operation and at most 100000 operations.
func (t *Table) Apply(ctx context.Context, row string, m *Mutation, opts ...ApplyOption) (err error) {
method := "cloud.google.com/go/bigtable/Apply"
ctx = mergeOutgoingMetadata(ctx, t.md)
ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigtable/Apply")
ctx = trace.StartSpan(ctx, method)
defer func() { trace.EndSpan(ctx, err) }()

metricsTracer, opRecorder := t.getOperationRecorder(ctx, method, true)
defer opRecorder()

err = t.apply(ctx, metricsTracer, row, m, opts...)
return metricsTracer.recordAndConvertErr(err)
}

func (t *Table) apply(ctx context.Context, mt *builtinMetricsTracer, row string, m *Mutation, opts ...ApplyOption) (err error) {
after := func(res proto.Message) {
for _, o := range opts {
o.after(res)
Expand All @@ -945,15 +995,15 @@ func (t *Table) Apply(ctx context.Context, row string, m *Mutation, opts ...Appl
callOptions = retryOptions
}
var res *btpb.MutateRowResponse
err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error {
err := t.c.metricsConfig.getAttemptRecorder(ctx, mt, func(ctx context.Context, _ gax.CallSettings) error {
var err error
res, err = t.c.client.MutateRow(ctx, req)
res, err = t.c.client.MutateRow(ctx, req, grpc.Header(mt.headerMD), grpc.Trailer(mt.trailerMD))
return err
}, callOptions...)
if err == nil {
after(res)
}
return convertToGrpcStatusErr(err)
return err
}

req := &btpb.CheckAndMutateRowRequest{
Expand Down Expand Up @@ -982,15 +1032,15 @@ func (t *Table) Apply(ctx context.Context, row string, m *Mutation, opts ...Appl
callOptions = retryOptions
}
var cmRes *btpb.CheckAndMutateRowResponse
err = gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error {
err = t.c.metricsConfig.getAttemptRecorder(ctx, mt, func(ctx context.Context, _ gax.CallSettings) error {
var err error
cmRes, err = t.c.client.CheckAndMutateRow(ctx, req)
cmRes, err = t.c.client.CheckAndMutateRow(ctx, req, grpc.Header(mt.headerMD), grpc.Trailer(mt.trailerMD))
return err
}, callOptions...)
if err == nil {
after(cmRes)
}
return convertToGrpcStatusErr(err)
return err
}

// An ApplyOption is an optional argument to Apply.
Expand Down Expand Up @@ -1118,10 +1168,18 @@ type entryErr struct {
//
// Conditional mutations cannot be applied in bulk and providing one will result in an error.
func (t *Table) ApplyBulk(ctx context.Context, rowKeys []string, muts []*Mutation, opts ...ApplyOption) (errs []error, err error) {
method := "cloud.google.com/go/bigtable/Apply"
ctx = mergeOutgoingMetadata(ctx, t.md)
ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigtable/ApplyBulk")
ctx = trace.StartSpan(ctx, method)
defer func() { trace.EndSpan(ctx, err) }()

metricsTracer, opRecorder := t.getOperationRecorder(ctx, method, true)
defer opRecorder()
errs, err = t.applyBulk(ctx, metricsTracer, rowKeys, muts, opts...)
return errs, metricsTracer.recordAndConvertErr(err)
}

func (t *Table) applyBulk(ctx context.Context, metricsTracer *builtinMetricsTracer, rowKeys []string, muts []*Mutation, opts ...ApplyOption) (errs []error, err error) {
if len(rowKeys) != len(muts) {
return nil, fmt.Errorf("mismatched rowKeys and mutation array lengths: %d, %d", len(rowKeys), len(muts))
}
Expand All @@ -1137,10 +1195,10 @@ func (t *Table) ApplyBulk(ctx context.Context, rowKeys []string, muts []*Mutatio

for _, group := range groupEntries(origEntries, maxMutations) {
attrMap := make(map[string]interface{})
err = gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error {
err = t.c.metricsConfig.getAttemptRecorder(ctx, metricsTracer, func(ctx context.Context, _ gax.CallSettings) error {
attrMap["rowCount"] = len(group)
trace.TracePrintf(ctx, attrMap, "Row count in ApplyBulk")
err := t.doApplyBulk(ctx, group, opts...)
err := t.doApplyBulk(ctx, group, metricsTracer, opts...)
if err != nil {
// We want to retry the entire request with the current group
return err
Expand Down Expand Up @@ -1187,7 +1245,7 @@ func (t *Table) getApplyBulkRetries(entries []*entryErr) []*entryErr {
}

// doApplyBulk does the work of a single ApplyBulk invocation
func (t *Table) doApplyBulk(ctx context.Context, entryErrs []*entryErr, opts ...ApplyOption) error {
func (t *Table) doApplyBulk(ctx context.Context, entryErrs []*entryErr, mt *builtinMetricsTracer, opts ...ApplyOption) error {
after := func(res proto.Message) {
for _, o := range opts {
o.after(res)
Expand All @@ -1207,16 +1265,20 @@ func (t *Table) doApplyBulk(ctx context.Context, entryErrs []*entryErr, opts ...
} else {
req.AuthorizedViewName = t.c.fullAuthorizedViewName(t.table, t.authorizedView)
}

stream, err := t.c.client.MutateRows(ctx, req)
if err != nil {
return err
}
*mt.headerMD, err = stream.Header()
for {
res, err := stream.Recv()
if err == io.EOF {
*mt.trailerMD = stream.Trailer()
break
}
if err != nil {
*mt.trailerMD = stream.Trailer()
return err
}

Expand Down Expand Up @@ -1288,6 +1350,16 @@ func (ts Timestamp) TruncateToMilliseconds() Timestamp {
// It returns the newly written cells.
func (t *Table) ApplyReadModifyWrite(ctx context.Context, row string, m *ReadModifyWrite) (Row, error) {
ctx = mergeOutgoingMetadata(ctx, t.md)

method := "cloud.google.com/go/bigtable/ApplyReadModifyWrite"
metricsTracer, opRecorder := t.getOperationRecorder(ctx, method, true)
defer opRecorder()

updatedRow, err := t.applyReadModifyWrite(ctx, metricsTracer, row, m)
return updatedRow, metricsTracer.recordAndConvertErr(err)
}

func (t *Table) applyReadModifyWrite(ctx context.Context, mt *builtinMetricsTracer, row string, m *ReadModifyWrite) (Row, error) {
req := &btpb.ReadModifyWriteRowRequest{
AppProfileId: t.c.appProfile,
RowKey: []byte(row),
Expand All @@ -1298,18 +1370,23 @@ func (t *Table) ApplyReadModifyWrite(ctx context.Context, row string, m *ReadMod
} else {
req.AuthorizedViewName = t.c.fullAuthorizedViewName(t.table, t.authorizedView)
}
res, err := t.c.client.ReadModifyWriteRow(ctx, req)
if err != nil {
return nil, err
}
if res.Row == nil {
return nil, errors.New("unable to apply ReadModifyWrite: res.Row=nil")
}
r := make(Row)
for _, fam := range res.Row.Families { // res is *btpb.Row, fam is *btpb.Family
decodeFamilyProto(r, row, fam)
}
return r, nil

var r Row
err := t.c.metricsConfig.getAttemptRecorder(ctx, mt, func(ctx context.Context, _ gax.CallSettings) error {
res, err := t.c.client.ReadModifyWriteRow(ctx, req, grpc.Header(mt.headerMD), grpc.Trailer(mt.trailerMD))
if err != nil {
return err
}
if res.Row == nil {
return errors.New("unable to apply ReadModifyWrite: res.Row=nil")
}
r = make(Row)
for _, fam := range res.Row.Families { // res is *btpb.Row, fam is *btpb.Family
decodeFamilyProto(r, row, fam)
}
return nil
})
return r, err
}

// ReadModifyWrite represents a set of operations on a single row of a table.
Expand Down Expand Up @@ -1352,9 +1429,19 @@ func (m *ReadModifyWrite) Increment(family, column string, delta int64) {
// SampleRowKeys returns a sample of row keys in the table. The returned row keys will delimit contiguous sections of
// the table of approximately equal size, which can be used to break up the data for distributed tasks like mapreduces.
func (t *Table) SampleRowKeys(ctx context.Context) ([]string, error) {
method := "cloud.google.com/go/bigtable/SampleRowKeys"
ctx = mergeOutgoingMetadata(ctx, t.md)

metricsTracer, opRecorder := t.getOperationRecorder(ctx, method, true)
defer opRecorder()

rowKeys, err := t.sampleRowKeys(ctx, metricsTracer)
return rowKeys, metricsTracer.recordAndConvertErr(err)
}

func (t *Table) sampleRowKeys(ctx context.Context, mt *builtinMetricsTracer) ([]string, error) {
var sampledRowKeys []string
err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error {
err := t.c.metricsConfig.getAttemptRecorder(ctx, mt, func(ctx context.Context, _ gax.CallSettings) error {
sampledRowKeys = nil
req := &btpb.SampleRowKeysRequest{
AppProfileId: t.c.appProfile,
Expand All @@ -1371,12 +1458,15 @@ func (t *Table) SampleRowKeys(ctx context.Context) ([]string, error) {
if err != nil {
return err
}
*mt.headerMD, err = stream.Header()
for {
res, err := stream.Recv()
if err == io.EOF {
*mt.trailerMD = stream.Trailer()
break
}
if err != nil {
*mt.trailerMD = stream.Trailer()
return err
}

Expand All @@ -1389,5 +1479,10 @@ func (t *Table) SampleRowKeys(ctx context.Context) ([]string, error) {
}
return nil
}, retryOptions...)
return sampledRowKeys, convertToGrpcStatusErr(err)

return sampledRowKeys, err
}

func (t *Table) getOperationRecorder(ctx context.Context, method string, isStreaming bool) (*builtinMetricsTracer, func()) {
return t.c.metricsConfig.getOperationRecorder(ctx, t.table, t.c.appProfile, method, isStreaming)
}