Skip to content

Commit

Permalink
[WIP] feat(storage): implement WriteTo in Reader
Browse files Browse the repository at this point in the history
This allows us to write directly into a user application stream,
removing one buffer copy.

WIP, just for testing now.
  • Loading branch information
tritone committed Feb 28, 2024
1 parent d130d86 commit 233ad23
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 1 deletion.
58 changes: 58 additions & 0 deletions storage/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1461,6 +1461,64 @@ func (r *gRPCReader) Read(p []byte) (int, error) {
return n, nil
}

// WriteTo writes the data remaining in the reader to w, implementing
// io.WriterTo. Any buffered leftovers are written first, followed by the
// content of every message received from the stream until the stream ends.
//
// It returns the number of bytes written to w during this call together with
// the first error encountered. Following the io.WriterTo convention, running
// out of data is reported as a nil error rather than io.EOF, so that io.Copy
// treats an exhausted reader as a successful (possibly empty) copy.
func (r *gRPCReader) WriteTo(w io.Writer) (int64, error) {
	// The entire object has already been read by this reader, so there is
	// nothing left to write. This is not an error for a WriterTo.
	if r.size == r.seen || r.zeroRange {
		return 0, nil
	}

	// No stream to read from, either never initialized or Close was called.
	// Note: There is a potential concurrency issue if multiple routines are
	// using the same reader. One encounters an error and the stream is closed
	// and then reopened while the other routine attempts to read from it.
	if r.stream == nil {
		return 0, fmt.Errorf("reader has been closed")
	}

	// Remember how many bytes had been seen before this call so that only the
	// bytes written by this call are reported to the caller.
	alreadySeen := r.seen

	// Write any leftovers to the stream. There will be some leftovers from the
	// original NewRangeReader call.
	if len(r.leftovers) > 0 {
		// Write() will write the entire leftovers slice unless there is an error.
		written, err := w.Write(r.leftovers)
		r.seen += int64(written)
		r.leftovers = nil
		if err != nil {
			return r.seen - alreadySeen, err
		}
	}

	// Loop and receive additional messages until the entire data is written.
	for {
		// Attempt to Recv the next message on the stream.
		// Will terminate with io.EOF once data has all come through.
		msg, err := r.recv()
		if err != nil {
			// End of stream means the copy completed; report success.
			if err == io.EOF {
				err = nil
			}
			return r.seen - alreadySeen, err
		}

		// TODO: Determine if we need to capture incremental CRC32C for this
		// chunk. The Object CRC32C checksum is captured when directed to read
		// the entire Object. If directed to read a range, we may need to
		// calculate the range's checksum for verification if the checksum is
		// present in the response here.
		// TODO: Figure out if we need to support decompressive transcoding
		// https://cloud.google.com/storage/docs/transcoding.
		content := msg.GetChecksummedData().GetContent()
		written, err := w.Write(content)
		r.seen += int64(written)
		if err != nil {
			return r.seen - alreadySeen, err
		}
	}
}

// Close cancels the read stream's context in order for it to be closed and
// collected.
func (r *gRPCReader) Close() error {
Expand Down
26 changes: 26 additions & 0 deletions storage/http_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1250,6 +1250,32 @@ func (r *httpReader) Close() error {
return r.body.Close()
}

// WriteTo writes the data remaining in the reader to w, implementing
// io.WriterTo. It returns the number of bytes written and the first error
// encountered; reaching the end of the body is reported as a nil error.
//
// There is no dedicated zero-copy path for HTTP reads yet, so the data is
// streamed through the reader's own Read method via io.Copy instead of
// silently writing nothing.
//
// NOTE(review): this relies on httpReader.Read (defined outside this hunk) to
// keep r.seen up to date and to reopen the body on transient failures; confirm
// against that method.
func (r *httpReader) WriteTo(w io.Writer) (int64, error) {
	// Expose only the Read method to io.Copy. Passing r directly would make
	// io.Copy detect the io.WriterTo implementation and call this method
	// again, recursing forever.
	return io.Copy(w, struct{ io.Reader }{r})
}

func setRangeReaderHeaders(h http.Header, params *newRangeReaderParams) error {
if params.readCompressed {
h.Set("Accept-Encoding", "gzip")
Expand Down
28 changes: 27 additions & 1 deletion storage/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,10 +202,15 @@ type Reader struct {
wantCRC uint32 // the CRC32c value the server sent in the header
gotCRC uint32 // running crc

reader io.ReadCloser
reader ReadCloserWriterTo
ctx context.Context
}

// ReadCloserWriterTo groups the io.ReadCloser and io.WriterTo interfaces. It
// is the contract a transport-specific reader must satisfy to back a Reader:
// it can be read incrementally, closed, and drained directly into an
// io.Writer.
type ReadCloserWriterTo interface {
	io.ReadCloser
	io.WriterTo
}

// Close closes the Reader. It must be called when done reading.
func (r *Reader) Close() error {
err := r.reader.Close()
Expand Down Expand Up @@ -233,6 +238,27 @@ func (r *Reader) Read(p []byte) (int, error) {
return n, err
}

func (r *Reader) WriteTo(w io.Writer) (int64, error) {

Check failure on line 241 in storage/reader.go

View workflow job for this annotation

GitHub Actions / vet

exported method Reader.WriteTo should have comment or be unexported
n, err := r.reader.WriteTo(w)
if r.remain != -1 {
r.remain -= int64(n)
}
// TODO: figure out how to track CRC with WriteTo
// if r.checkCRC {
// r.gotCRC = crc32.Update(r.gotCRC, crc32cTable, p[:n])
// // Check CRC here. It would be natural to check it in Close, but
// // everybody defers Close on the assumption that it doesn't return
// // anything worth looking at.
// if err == io.EOF {
// if r.gotCRC != r.wantCRC {
// return n, fmt.Errorf("storage: bad CRC on read: got %d, want %d",
// r.gotCRC, r.wantCRC)
// }
// }
// }
return n, err
}

// Size returns the size of the object in bytes.
// The returned value is always the same and is not affected by
// calls to Read or Close.
Expand Down

0 comments on commit 233ad23

Please sign in to comment.