Skip to content

Commit

Permalink
feat(storage): change gRPC writes to use bi-directional streams (#8930)
Browse files Browse the repository at this point in the history
* feat(storage): change gRPC writes to use bi-direcional streams
  • Loading branch information
BrennaEpp committed Nov 8, 2023
1 parent cc83515 commit 3e23a36
Show file tree
Hide file tree
Showing 2 changed files with 140 additions and 82 deletions.
5 changes: 5 additions & 0 deletions storage/client_test.go
Expand Up @@ -20,6 +20,7 @@ import (
"log"
"os"
"strconv"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -790,6 +791,10 @@ func TestOpenReaderEmulated(t *testing.T) {

func TestOpenWriterEmulated(t *testing.T) {
transportClientTest(t, func(t *testing.T, project, bucket string, client storageClient) {
if strings.Contains(project, "grpc") {
t.Skip("Implementation in testbench pending: https://github.com/googleapis/storage-testbench/issues/568")
}

// Populate test data.
_, err := client.CreateBucket(context.Background(), project, bucket, &BucketAttrs{
Name: bucket,
Expand Down
217 changes: 135 additions & 82 deletions storage/grpc_client.go
Expand Up @@ -1066,7 +1066,7 @@ func (c *grpcStorageClient) OpenWriter(params *openWriterParams, opts ...storage
}
}

o, off, finalized, err := gw.uploadBuffer(recvd, offset, doneReading)
o, off, err := gw.uploadBuffer(recvd, offset, doneReading)
if err != nil {
err = checkCanceled(err)
errorf(err)
Expand All @@ -1085,9 +1085,9 @@ func (c *grpcStorageClient) OpenWriter(params *openWriterParams, opts ...storage
progress(offset)
}

// When we are done reading data and the chunk has been finalized,
// we are done.
if doneReading && finalized {
// When we are done reading data without errors, set the object and
// finish.
if doneReading {
// Build Object from server's response.
setObj(newObjectFromProto(o))
return
Expand Down Expand Up @@ -1537,7 +1537,7 @@ type gRPCWriter struct {
chunkSize int

// The gRPC client-stream used for sending buffers.
stream storagepb.Storage_WriteObjectClient
stream storagepb.Storage_BidiWriteObjectClient

// The Resumable Upload ID started by a gRPC-based Writer.
upid string
Expand Down Expand Up @@ -1581,45 +1581,56 @@ func (w *gRPCWriter) queryProgress() (int64, error) {
return persistedSize, err
}

// uploadBuffer opens a Write stream and uploads the buffer at the given offset (if
// uploading a chunk for a resumable uploadBuffer), and will mark the write as
// finished if we are done receiving data from the user. The resulting write
// offset after uploading the buffer is returned, as well as a boolean
// indicating if the Object has been finalized. If it has been finalized, the
// final Object will be returned as well. Finalizing the upload is primarily
// important for Resumable Uploads. A simple or multi-part upload will always
// be finalized once the entire buffer has been written.
func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*storagepb.Object, int64, bool, error) {
var err error
var finishWrite bool
var sent, limit int = 0, maxPerMessageWriteSize
// uploadBuffer uploads the buffer at the given offset using a bi-directional
// Write stream. It will open a new stream if necessary (on the first call or
// after resuming from failure). The resulting write offset after uploading the
// buffer is returned, as well as well as the final Object if the upload is
// completed.
//
// Returns object, persisted size, and any error that is not retriable.
func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*storagepb.Object, int64, error) {
var shouldRetry = ShouldRetry
if w.settings.retry != nil && w.settings.retry.shouldRetry != nil {
shouldRetry = w.settings.retry.shouldRetry
}
offset := start

var err error
var lastWriteOfEntireObject bool

sent := 0
writeOffset := start

toWrite := w.buf[:recvd]

// Send a request with as many bytes as possible.
// Loop until all bytes are sent.
for {
// This indicates that this is the last message and the remaining
// data fits in one message.
belowLimit := recvd-sent <= limit
if belowLimit {
limit = recvd - sent
bytesNotYetSent := recvd - sent
remainingDataFitsInSingleReq := bytesNotYetSent <= maxPerMessageWriteSize

if remainingDataFitsInSingleReq && doneReading {
lastWriteOfEntireObject = true
}
if belowLimit && doneReading {
finishWrite = true

// Send the maximum amount of bytes we can, unless we don't have that many.
bytesToSendInCurrReq := maxPerMessageWriteSize
if remainingDataFitsInSingleReq {
bytesToSendInCurrReq = bytesNotYetSent
}

// Prepare chunk section for upload.
data := toWrite[sent : sent+limit]
req := &storagepb.WriteObjectRequest{
Data: &storagepb.WriteObjectRequest_ChecksummedData{
data := toWrite[sent : sent+bytesToSendInCurrReq]

req := &storagepb.BidiWriteObjectRequest{
Data: &storagepb.BidiWriteObjectRequest_ChecksummedData{
ChecksummedData: &storagepb.ChecksummedData{
Content: data,
},
},
WriteOffset: offset,
FinishWrite: finishWrite,
WriteOffset: writeOffset,
FinishWrite: lastWriteOfEntireObject,
Flush: remainingDataFitsInSingleReq,
StateLookup: remainingDataFitsInSingleReq,
}

// Open a new stream if necessary and set the first_message field on
Expand All @@ -1628,19 +1639,20 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st
if w.stream == nil {
hds := []string{"x-goog-request-params", fmt.Sprintf("bucket=projects/_/buckets/%s", url.QueryEscape(w.bucket))}
ctx := gax.InsertMetadataIntoOutgoingContext(w.ctx, hds...)
w.stream, err = w.c.raw.WriteObject(ctx)

w.stream, err = w.c.raw.BidiWriteObject(ctx)
if err != nil {
return nil, 0, false, err
return nil, 0, err
}

if w.upid != "" {
req.FirstMessage = &storagepb.WriteObjectRequest_UploadId{UploadId: w.upid}
} else {
if w.upid != "" { // resumable upload
req.FirstMessage = &storagepb.BidiWriteObjectRequest_UploadId{UploadId: w.upid}
} else { // non-resumable
spec, err := w.writeObjectSpec()
if err != nil {
return nil, 0, false, err
return nil, 0, err
}
req.FirstMessage = &storagepb.WriteObjectRequest_WriteObjectSpec{
req.FirstMessage = &storagepb.BidiWriteObjectRequest_WriteObjectSpec{
WriteObjectSpec: spec,
}
req.CommonObjectRequestParams = toProtoCommonObjectRequestParams(w.encryptionKey)
Expand All @@ -1650,42 +1662,53 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st
// on the *last* message of the stream (instead of the first).
req.ObjectChecksums = toProtoChecksums(w.sendCRC32C, w.attrs)
}

}

err = w.stream.Send(req)
if err == io.EOF {
// err was io.EOF. The client-side of a stream only gets an EOF on Send
// when the backend closes the stream and wants to return an error
// status. Closing the stream receives the status as an error.
_, err = w.stream.CloseAndRecv()
// status.

// Receive from the stream Recv() until it returns a non-nil error
// to receive the server's status as an error. We may get multiple
// messages before the error due to buffering.
err = nil
for err == nil {
_, err = w.stream.Recv()
}
// Drop the stream reference as a new one will need to be created if
// we retry.
w.stream = nil

// Drop the stream reference as a new one will need to be created if
// we can retry the upload
w.stream = nil

// Retriable errors mean we should start over and attempt to
// resend the entire buffer via a new stream.
// If not retriable, falling through will return the error received
// from closing the stream.
// If not retriable, falling through will return the error received.
if shouldRetry(err) {
sent = 0
finishWrite = false
// TODO: Add test case for failure modes of querying progress.
offset, err = w.determineOffset(start)
if err == nil {
continue
writeOffset, err = w.determineOffset(start)
if err != nil {
return nil, 0, err
}
sent = int(writeOffset) - int(start)

// Continue sending requests, opening a new stream and resending
// any bytes not yet persisted as per QueryWriteStatus
continue
}
}
if err != nil {
return nil, 0, false, err
return nil, 0, err
}

// Update the immediate stream's sent total and the upload offset with
// the data sent.
sent += len(data)
offset += int64(len(data))
writeOffset += int64(len(data))

// Not done sending data, do not attempt to commit it yet, loop around
// and send more data.
Expand All @@ -1694,31 +1717,81 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st
}

// The buffer has been uploaded and there is still more data to be
// uploaded, but this is not a resumable upload session. Therefore
// keep the stream open and don't commit yet.
if !finishWrite && w.chunkSize == 0 {
return nil, offset, false, nil
// uploaded, but this is not a resumable upload session. Therefore,
// don't check persisted data.
if !lastWriteOfEntireObject && w.chunkSize == 0 {
return nil, writeOffset, nil
}

// Done sending data. Close the stream to "commit" the data sent.
resp, finalized, err := w.commit()
// Done sending data (remainingDataFitsInSingleReq should == true if we
// reach this code). Receive from the stream to confirm the persisted data.
resp, err := w.stream.Recv()

// Retriable errors mean we should start over and attempt to
// resend the entire buffer via a new stream.
// If not retriable, falling through will return the error received
// from closing the stream.
if shouldRetry(err) {
sent = 0
finishWrite = false
offset, err = w.determineOffset(start)
if err == nil {
continue
writeOffset, err = w.determineOffset(start)
if err != nil {
return nil, 0, err
}
sent = int(writeOffset) - int(start)

// Drop the stream reference as a new one will need to be created.
w.stream = nil

continue
}
if err != nil {
return nil, 0, false, err
return nil, 0, err
}

return resp.GetResource(), offset, finalized, nil
// Confirm the persisted data if we have not finished uploading the object.
if !lastWriteOfEntireObject {
if resp.GetPersistedSize() != writeOffset {
// Retry if not all bytes were persisted.
writeOffset = resp.GetPersistedSize()
sent = int(writeOffset) - int(start)
continue
}
} else {
// If the object is done uploading, close the send stream to signal
// to the server that we are done sending so that we can receive
// from the stream without blocking.
err = w.stream.CloseSend()
if err != nil {
// CloseSend() retries the send internally. It never returns an
// error in the current implementation, but we check it anyway in
// case that it does in the future.
return nil, 0, err
}

// Stream receives do not block once send is closed, but we may not
// receive the response with the object right away; loop until we
// receive the object or error out.
var obj *storagepb.Object
for obj == nil {
resp, err := w.stream.Recv()
if err != nil {
return nil, 0, err
}

obj = resp.GetResource()
}

// Even though we received the object response, continue reading
// until we receive a non-nil error, to ensure the stream does not
// leak even if the context isn't cancelled. See:
// https://pkg.go.dev/google.golang.org/grpc#ClientConn.NewStream
for err == nil {
_, err = w.stream.Recv()
}

return obj, writeOffset, nil
}

return nil, writeOffset, nil
}
}

Expand All @@ -1738,26 +1811,6 @@ func (w *gRPCWriter) determineOffset(offset int64) (int64, error) {
return offset, nil
}

// commit closes the stream to commit the data sent and potentially receive
// the finalized object if finished uploading. If the last request sent
// indicated that writing was finished, the Object will be finalized and
// returned. If not, then the Object will be nil, and the boolean returned will
// be false.
func (w *gRPCWriter) commit() (*storagepb.WriteObjectResponse, bool, error) {
finalized := true
resp, err := w.stream.CloseAndRecv()
if err == io.EOF {
// Closing a stream for a resumable upload finish_write = false results
// in an EOF which can be ignored, as we aren't done uploading yet.
finalized = false
err = nil
}
// Drop the stream reference as it has been closed.
w.stream = nil

return resp, finalized, err
}

// writeObjectSpec constructs a WriteObjectSpec proto using the Writer's
// ObjectAttrs and applies its Conditions. This is only used for gRPC.
func (w *gRPCWriter) writeObjectSpec() (*storagepb.WriteObjectSpec, error) {
Expand Down

0 comments on commit 3e23a36

Please sign in to comment.