
feat(storage/transfermanager): prototype #10045

Open: wants to merge 14 commits into base: main
58 changes: 57 additions & 1 deletion go.work.sum
@@ -1,4 +1,60 @@
github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/cloudmock v0.46.0/go.mod h1:V28hx+cUCZC9e3qcqszMb+Sbt8cQZtHTiXOmyDzoDOg=
github.com/Masterminds/goutils v1.1.1/go.mod h1:8cTjp+g8YejhMuvIA5y2vz3BpJxksy863GQaJW2MFNU=
github.com/Masterminds/semver/v3 v3.2.0/go.mod h1:qvl/7zhW3nngYb5+80sSMF+FG2BjYrf8m9wsX0PNOMQ=
github.com/Masterminds/sprig/v3 v3.2.3/go.mod h1:rXcFaZ2zZbLRJv/xSysmlgIM1u11eBaRMhvYXJNkGuM=
github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be/go.mod h1:ySMOLuWl6zY27l47sB3qLNK6tF2fkHG55UZxx8oIVo4=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
github.com/aws/aws-sdk-go-v2 v1.16.10/go.mod h1:WTACcleLz6VZTp7fak4EO5b9Q4foxbn+8PIz3PmyKlo=
github.com/aws/aws-sdk-go-v2/config v1.15.9/go.mod h1:rv/l/TbZo67kp99v/3Kb0qV6Fm1KEtKyruEV2GvVfgs=
github.com/aws/aws-sdk-go-v2/credentials v1.12.12/go.mod h1:vFHC2HifIWHebmoVsfpqliKuqbAY2LaVlvy03JzF4c4=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.11/go.mod h1:38Asv/UyQbDNpSXCurZRlDMjzIl6J+wUe8vY3TtUuzA=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.17/go.mod h1:6qtGip7sJEyvgsLjphRZWF9qPe3xJf1mL/MM01E35Wc=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.11/go.mod h1:cYAfnB+9ZkmZWpQWmPDsuIGm4EA+6k2ZVtxKjw/XJBY=
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.18/go.mod h1:hTHq8hL4bAxJyng364s9d4IUGXZOs7Y5LSqAhIiIQ2A=
github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.18.3/go.mod h1:BrAJyOMrnwzYVQcP5ziqlCpnEuFfkNppZLzqDyW/YTg=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.11/go.mod h1:OEofCUKF7Hri4ShOCokF6k6hGq9PCB2sywt/9rLSXjY=
github.com/aws/aws-sdk-go-v2/service/sso v1.11.15/go.mod h1:dDVD4ElJRTQXx7dOQ59EkqGyNU9tnwy1RKln+oLIOTU=
github.com/aws/aws-sdk-go-v2/service/sts v1.16.12/go.mod h1:b53qpmhHk7mTL2J/tfG6f38neZiyBQSiNXGCuNKq4+4=
github.com/aws/smithy-go v1.12.1/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA=
github.com/bufbuild/protocompile v0.4.0/go.mod h1:3v93+mbWn/v3xzN+31nwkJfrEpAUwp+BagBSZWx+TP8=
github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
github.com/chromedp/cdproto v0.0.0-20230802225258-3cf4e6d46a89/go.mod h1:GKljq0VrfU4D5yc+2qA6OVr8pmO/MBbPEWqWQ/oqGEs=
github.com/chromedp/chromedp v0.9.2/go.mod h1:LkSXJKONWTCHAfQasKFUZI+mxqS4tZqhmtGzzhLsnLs=
github.com/chromedp/sysutil v1.0.0/go.mod h1:kgWmDdq8fTzXYcKIBqIYvRRTnYb9aNS9moAV0xufSww=
github.com/chzyer/readline v1.5.1/go.mod h1:Eh+b79XXUwfKfcPLepksvw2tcLE/Ct21YObkaSkeBlk=
github.com/elazarl/goproxy v0.0.0-20230808193330-2592e75ae04a/go.mod h1:Ro8st/ElPeALwNFlcTpWmkr6IoMFfkjXAvTHpevnDsM=
github.com/fullstorydev/grpcurl v1.8.7/go.mod h1:pVtM4qe3CMoLaIzYS8uvTuDj2jVYmXqMUkZeijnXp/E=
github.com/gliderlabs/ssh v0.3.7/go.mod h1:zpHEXBstFnQYtGnB8k8kQLol82umzn/2/snG7alWVD8=
github.com/go-git/go-git-fixtures/v4 v4.3.2-0.20231010084843-55a94097c399/go.mod h1:1OCfN199q1Jm3HZlxleg+Dw/mwps2Wbk9frAWm+4FII=
github.com/gobwas/httphead v0.1.0/go.mod h1:O/RXo79gxV8G+RqlR/otEwx4Q36zl9rqC5u12GKvMCM=
github.com/gobwas/pool v0.2.1/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw=
github.com/gobwas/ws v1.2.1/go.mod h1:hRKAFb8wOxFROYNsT1bqfWnhX+b5MFeJM9r2ZSwg/KY=
github.com/google/go-jsonnet v0.20.0/go.mod h1:VbgWF9JX7ztlv770x/TolZNGGFfiHEVx9G6ca2eUmeA=
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510/go.mod h1:pupxD2MaaD3pAXIBCelhxNneeOaAeabZDe5s4K6zSpQ=
github.com/hoisie/redis v0.0.0-20160730154456-b5c6e81454e0/go.mod h1:pMYMxVaKJqCDC1JUg/XbPJ4/fSazB25zORpFzqsIGIc=
github.com/huandu/xstrings v1.3.3/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE=
github.com/ianlancetaylor/demangle v0.0.0-20240312041847-bd984b5ce465/go.mod h1:gx7rwoVhcfuVKG5uya9Hs3Sxj7EIvldVofAWIUtGouw=
github.com/imdario/mergo v0.3.11/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
github.com/itchyny/gojq v0.12.9/go.mod h1:T4Ip7AETUXeGpD+436m+UEl3m3tokRgajd5pRfsR5oE=
github.com/itchyny/timefmt-go v0.1.4/go.mod h1:nEP7L+2YmAbT2kZ2HfSs1d8Xtw9LY8D2stDBckWakZ8=
github.com/jhump/protoreflect v1.15.1/go.mod h1:jD/2GMKKE6OqX8qTjhADU1e6DShO+gavG9e0Q693nKo=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
github.com/lib/pq v1.8.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
github.com/miekg/dns v1.1.33/go.mod h1:KNUDUusw/aVsxyTYZM1oqvCicbwhgbNgztCETuNZ7xM=
github.com/mitchellh/copystructure v1.0.0/go.mod h1:SNtv71yrdKgLRyLFxmLdkAbkKEFWgYaq1OVrnRcwhnw=
github.com/mitchellh/reflectwalk v1.0.0/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw=
github.com/mmcloughlin/avo v0.5.0/go.mod h1:ChHFdoV7ql95Wi7vuq2YT1bwCJqiWdZrQ1im3VujLYM=
github.com/onsi/gomega v1.27.10/go.mod h1:RsS8tutOdbdgzbPtzzATp12yT7kM5I5aElG3evPbQ0M=
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
go.opentelemetry.io/contrib/detectors/gcp v1.24.0 h1:1Szzq5d735VbnwbEmwPUJ/FIpY9CPM9KYfCRsSNjPgw=
github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/spf13/cast v1.3.1/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.44.0/go.mod h1:U707O40ee1FpQGyhvqnzmCJm1Wh6OX6GGBVn0E6Uyyk=
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v0.44.0/go.mod h1:qcTO4xHAxZLaLxPd60TdE88rxtItPHgHWqOhOGRr0as=
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v0.44.0/go.mod h1:sTt30Evb7hJB/gEk27qLb1+l9n4Tb8HvHkR0Wx3S6CU=
golang.org/x/term v0.19.0/go.mod h1:2CuTdWZ7KHSQwUzKva0cbMg6q2DMI3Mmxp+gKJbskEk=
sigs.k8s.io/yaml v1.3.0/go.mod h1:GeOyir5tyXNByN85N/dRIT9es5UQNerPYEKK56eTBm8=
89 changes: 89 additions & 0 deletions storage/tmsample/main.go
@@ -0,0 +1,89 @@
package main

import (
	"context"
	"log"
	"os"
	"time"

	"cloud.google.com/go/storage"
	"cloud.google.com/go/storage/transfermanager"
	"google.golang.org/api/iterator"
)

func main() {
	ctx := context.Background()

	// Pass in any client opts or set retry policy here.
	client, err := storage.NewClient(ctx) // can also use NewGRPCClient
	if err != nil {
		log.Fatalf("storage.NewClient: %v", err)
	}

	// Create Downloader with desired options, including number of workers,
	// part size, per operation timeout, etc.
	d, err := transfermanager.NewDownloader(client, transfermanager.WithWorkers(16))
	if err != nil {
		log.Fatalf("transfermanager.NewDownloader: %v", err)
	}

	// Sharded and/or parallelized download.
	// Create local file writer for output.
	f, err := os.Create("/path/to/localfile")
	if err != nil {
		log.Fatalf("os.Create: %v", err)
	}

	// Create download input.
	in := &transfermanager.DownloadObjectInput{
		Bucket:      "mybucket",
		Object:      "myblob",
		Destination: f,
		// Optionally specify params to apply to download.
		EncryptionKey: []byte("mykey"),
	}

	// Can set timeout on this download using context.
	ctx, cancel := context.WithTimeout(ctx, 1*time.Minute)
	defer cancel()

	// Add to Downloader.
	d.DownloadObject(ctx, in)

	// Repeat if desired.

	// Download many files to a local path.
	// Create query, using any GCS list objects options.
	dirIn := &transfermanager.DownloadDirectoryInput{
		Bucket:         "mybucket",
		LocalDirectory: "/path/to/dir",

		// Optional filtering within bucket.
		Prefix:    "objectprefix/",
		MatchGlob: "objectprefix/**abc**",
	}

	d.DownloadDirectory(ctx, dirIn)

	// Wait for all downloads to complete.
	d.WaitAndClose()

	// Iterate through completed downloads and process results. This can
	// also happen asynchronously in a goroutine as the downloads run.
	it := d.Results()
	for {
		out, err := it.Next() // if async, blocks until next result is available
		if err == iterator.Done {
			break
		}
		if err != nil {
			log.Fatalf("error getting next result: %v", err)
		}
		if out.Err != nil {
			log.Printf("download of %v failed with error %v", out.Object, out.Err)
		} else {
			log.Printf("download of %v succeeded", out.Object)
		}
	}
}
243 changes: 243 additions & 0 deletions storage/transfermanager/downloader.go
@@ -0,0 +1,243 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package transfermanager

import (
"context"
"errors"
"io"
"sync"
"time"

"cloud.google.com/go/storage"
"google.golang.org/api/iterator"
)

// Downloader manages parallel download operations from a Cloud Storage bucket.
type Downloader struct {
client *storage.Client
config *transferManagerConfig
work chan *DownloadObjectInput // Piece of work to be executed.
[Review thread]
Contributor: Presumably this should be send-only and output should be receive?
Author: We are sending and receiving from both channels in different places in the downloader. Unidirectional channels could be used in subcomponents, or if we were providing the channel to the user, but I don't see how we could implement this with unidirectional channels - if we only received from output, who would send us the output (and vice versa for work)?
output chan *DownloadOutput // Channel for completed downloads; used to feed results iterator.
workers *sync.WaitGroup // Keeps track of the workers that are currently running.
}

// DownloadObject queues the download of a single object. If it's larger than
// the specified part size, the download will automatically be broken up into
// multiple range reads. This will initiate the download but is non-blocking;
// call Downloader.Results to process the result.
func (d *Downloader) DownloadObject(ctx context.Context, input *DownloadObjectInput) {
input.ctx = ctx
d.work <- input
}

// DownloadDirectory downloads a set of objects to a local path. Downloader
// will create the needed directory structure locally as the operations progress.
// This will initiate the download but is non-blocking; wait on Downloader.Results
// to process the result. Results will be split into individual objects.
// NOTE: Do not use; DownloadDirectory is not implemented.
func (d *Downloader) DownloadDirectory(ctx context.Context, input *DownloadDirectoryInput) {
d.output <- &DownloadOutput{Bucket: input.Bucket, Err: errors.New("DownloadDirectory is not implemented")}
// This does obj listing/os.File calls and then converts to object inputs.
}

// WaitAndClose waits for all outstanding downloads to complete and closes the
// Downloader. The Downloader must not be used to download more objects or
// directories after this has been called.
func (d *Downloader) WaitAndClose() error {
close(d.work)
d.workers.Wait()
close(d.output)
return nil
}

// Results returns the iterator for download outputs.
func (d *Downloader) Results() *DownloadOutputIterator {
return &DownloadOutputIterator{
output: d.output,
}
}

// downloadWorker continuously processes downloads until the work channel is closed.
func (d *Downloader) downloadWorker() {
d.workers.Add(1)
for {
input, ok := <-d.work
if !ok {
break // no more work; exit
}

// TODO: break down the input into smaller pieces if necessary; maybe as follows:
// Only request partSize data to begin with. If no error and we haven't finished
// reading the object, enqueue the remaining pieces of work and mark in the
// out var the amount of shards to wait for.
d.output <- input.downloadShard(d.client, d.config.perOperationTimeout)
}
d.workers.Done()
}

// NewDownloader creates a new Downloader to add operations to.
// Choice of transport, etc is configured on the client that's passed in.
func NewDownloader(c *storage.Client, opts ...TransferManagerOption) (*Downloader, error) {
const (
chanBufferSize = 1000 // how big is it reasonable to make this?
BrennaEpp marked this conversation as resolved.
Show resolved Hide resolved
// We should probably expose this as max concurrent ops, because programs can deadlock if calling d.Waitclose before processing it.Next
)

d := &Downloader{
client: c,
config: initTransferManagerConfig(opts...),
output: make(chan *DownloadOutput, chanBufferSize),
work: make(chan *DownloadObjectInput, chanBufferSize),
workers: &sync.WaitGroup{},
}

// Start workers in background.
for i := 0; i < d.config.numWorkers; i++ {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Presumably we could optimize this by spinning up workers as needed when there are objects enqueued? Doesn't have to be in this PR though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, though I'm not sure how much that would optimize this by... I guess it depends on the num of workers.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah something we can test out later.

go d.downloadWorker()
}

return d, nil
}

// DownloadRange specifies the object range.
type DownloadRange struct {
// Offset is the starting offset (inclusive) from which the object is read.
// If offset is negative, the object is read abs(offset) bytes from the end,
// and length must also be negative to indicate all remaining bytes will be read.
Offset int64
// Length is the number of bytes to read.
// If length is negative or larger than the object size, the object is read
// until the end.
Length int64
}

// DownloadObjectInput is the input for a single object to download.
type DownloadObjectInput struct {
// Required fields
Bucket string
Object string
Destination io.WriterAt

// Optional fields
Generation *int64
Conditions *storage.Conditions
EncryptionKey []byte
Range *DownloadRange // if specified, reads only a range

ctx context.Context
}

// downloadShard will read a specific object into in.Destination.
// TODO: download a single shard instead of the entire object.
func (in *DownloadObjectInput) downloadShard(client *storage.Client, timeout time.Duration) (out *DownloadOutput) {
out = &DownloadOutput{Bucket: in.Bucket, Object: in.Object}

// Set timeout.
ctx := in.ctx
if timeout > 0 {
c, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
ctx = c
}

// Set options on the object.
o := client.Bucket(in.Bucket).Object(in.Object)

if in.Conditions != nil {
o = o.If(*in.Conditions)
}
if in.Generation != nil {
o = o.Generation(*in.Generation)
}
if len(in.EncryptionKey) > 0 {
o = o.Key(in.EncryptionKey)
}

var offset, length int64 = 0, -1 // get the entire object by default

if in.Range != nil {
offset, length = in.Range.Offset, in.Range.Length
}

// Read.
r, err := o.NewRangeReader(ctx, offset, length)
if err != nil {
out.Err = err
return
}

// TODO: write at a specific offset.
off := io.NewOffsetWriter(in.Destination, 0)
_, err = io.Copy(off, r)
if err != nil {
out.Err = err
r.Close()
return
}
[Review thread]
Contributor: We might need to look into whether we should be attempting to close the writer if possible -- seems annoying for end users to have to clean up. Though, I guess we don't guarantee that a close method exists.
Author: Do you mean the writer provided through in.Destination? The io.WriterAt interface, and for that matter io.OffsetWriter, do not have a Close method, as you say. In any case, I feel like it'd be bad practice to close a writer we don't own?
if err = r.Close(); err != nil {
out.Err = err
return
}

out.Attrs = &r.Attrs
return
}

// DownloadDirectoryInput is the input for a set of objects to download to a
// local directory.
type DownloadDirectoryInput struct {
// Required fields
Bucket string
LocalDirectory string

// Optional fields to filter objects in bucket. Selection from storage.Query.
Prefix string
StartOffset string
EndOffset string
IncludeTrailingDelimiter bool // maybe unnecessary?
MatchGlob string

// Maybe some others about local file naming.
}

// DownloadOutput provides output for a single object download, including all
// errors received while downloading object parts. If the download was successful,
// Attrs will be populated.
type DownloadOutput struct {
Bucket string
Object string
Err error // error occurring during download. Can use multi-error in Go 1.20+ if multiple failures.
Attrs *storage.ReaderObjectAttrs // Attributes of downloaded object, if successful.
}

// DownloadOutputIterator allows the end user to iterate through completed
// object downloads.
type DownloadOutputIterator struct {
output <-chan *DownloadOutput
}

// Next iterates through results. When complete, will return the iterator.Done
// error. It is considered complete once WaitAndClose() has been called on the
// Downloader.
// DownloadOutputs will be available as the downloads complete; they can
// be iterated through asynchronously or at the end of the job.
// Next will block if there are no more completed downloads (and the Downloader
// is not closed).
func (it *DownloadOutputIterator) Next() (*DownloadOutput, error) {
out, ok := <-it.output
if !ok {
return nil, iterator.Done
}
return out, nil
}