[WIP] feat(storage/transfermanager): prototype #9838

Draft
wants to merge 4 commits into
base: main
Changes from 2 commits
58 changes: 57 additions & 1 deletion go.work.sum
@@ -1,4 +1,60 @@
github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/cloudmock v0.46.0/go.mod h1:V28hx+cUCZC9e3qcqszMb+Sbt8cQZtHTiXOmyDzoDOg=
github.com/Masterminds/goutils v1.1.1/go.mod h1:8cTjp+g8YejhMuvIA5y2vz3BpJxksy863GQaJW2MFNU=
github.com/Masterminds/semver/v3 v3.2.0/go.mod h1:qvl/7zhW3nngYb5+80sSMF+FG2BjYrf8m9wsX0PNOMQ=
github.com/Masterminds/sprig/v3 v3.2.3/go.mod h1:rXcFaZ2zZbLRJv/xSysmlgIM1u11eBaRMhvYXJNkGuM=
github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be/go.mod h1:ySMOLuWl6zY27l47sB3qLNK6tF2fkHG55UZxx8oIVo4=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
github.com/aws/aws-sdk-go-v2 v1.16.10/go.mod h1:WTACcleLz6VZTp7fak4EO5b9Q4foxbn+8PIz3PmyKlo=
github.com/aws/aws-sdk-go-v2/config v1.15.9/go.mod h1:rv/l/TbZo67kp99v/3Kb0qV6Fm1KEtKyruEV2GvVfgs=
github.com/aws/aws-sdk-go-v2/credentials v1.12.12/go.mod h1:vFHC2HifIWHebmoVsfpqliKuqbAY2LaVlvy03JzF4c4=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.11/go.mod h1:38Asv/UyQbDNpSXCurZRlDMjzIl6J+wUe8vY3TtUuzA=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.17/go.mod h1:6qtGip7sJEyvgsLjphRZWF9qPe3xJf1mL/MM01E35Wc=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.11/go.mod h1:cYAfnB+9ZkmZWpQWmPDsuIGm4EA+6k2ZVtxKjw/XJBY=
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.18/go.mod h1:hTHq8hL4bAxJyng364s9d4IUGXZOs7Y5LSqAhIiIQ2A=
github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.18.3/go.mod h1:BrAJyOMrnwzYVQcP5ziqlCpnEuFfkNppZLzqDyW/YTg=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.11/go.mod h1:OEofCUKF7Hri4ShOCokF6k6hGq9PCB2sywt/9rLSXjY=
github.com/aws/aws-sdk-go-v2/service/sso v1.11.15/go.mod h1:dDVD4ElJRTQXx7dOQ59EkqGyNU9tnwy1RKln+oLIOTU=
github.com/aws/aws-sdk-go-v2/service/sts v1.16.12/go.mod h1:b53qpmhHk7mTL2J/tfG6f38neZiyBQSiNXGCuNKq4+4=
github.com/aws/smithy-go v1.12.1/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA=
github.com/bufbuild/protocompile v0.4.0/go.mod h1:3v93+mbWn/v3xzN+31nwkJfrEpAUwp+BagBSZWx+TP8=
github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
github.com/chromedp/cdproto v0.0.0-20230802225258-3cf4e6d46a89/go.mod h1:GKljq0VrfU4D5yc+2qA6OVr8pmO/MBbPEWqWQ/oqGEs=
github.com/chromedp/chromedp v0.9.2/go.mod h1:LkSXJKONWTCHAfQasKFUZI+mxqS4tZqhmtGzzhLsnLs=
github.com/chromedp/sysutil v1.0.0/go.mod h1:kgWmDdq8fTzXYcKIBqIYvRRTnYb9aNS9moAV0xufSww=
github.com/chzyer/readline v1.5.1/go.mod h1:Eh+b79XXUwfKfcPLepksvw2tcLE/Ct21YObkaSkeBlk=
github.com/elazarl/goproxy v0.0.0-20230808193330-2592e75ae04a/go.mod h1:Ro8st/ElPeALwNFlcTpWmkr6IoMFfkjXAvTHpevnDsM=
github.com/fullstorydev/grpcurl v1.8.7/go.mod h1:pVtM4qe3CMoLaIzYS8uvTuDj2jVYmXqMUkZeijnXp/E=
github.com/gliderlabs/ssh v0.3.7/go.mod h1:zpHEXBstFnQYtGnB8k8kQLol82umzn/2/snG7alWVD8=
github.com/go-git/go-git-fixtures/v4 v4.3.2-0.20231010084843-55a94097c399/go.mod h1:1OCfN199q1Jm3HZlxleg+Dw/mwps2Wbk9frAWm+4FII=
github.com/gobwas/httphead v0.1.0/go.mod h1:O/RXo79gxV8G+RqlR/otEwx4Q36zl9rqC5u12GKvMCM=
github.com/gobwas/pool v0.2.1/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw=
github.com/gobwas/ws v1.2.1/go.mod h1:hRKAFb8wOxFROYNsT1bqfWnhX+b5MFeJM9r2ZSwg/KY=
github.com/google/go-jsonnet v0.20.0/go.mod h1:VbgWF9JX7ztlv770x/TolZNGGFfiHEVx9G6ca2eUmeA=
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510/go.mod h1:pupxD2MaaD3pAXIBCelhxNneeOaAeabZDe5s4K6zSpQ=
github.com/hoisie/redis v0.0.0-20160730154456-b5c6e81454e0/go.mod h1:pMYMxVaKJqCDC1JUg/XbPJ4/fSazB25zORpFzqsIGIc=
github.com/huandu/xstrings v1.3.3/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE=
github.com/ianlancetaylor/demangle v0.0.0-20240312041847-bd984b5ce465/go.mod h1:gx7rwoVhcfuVKG5uya9Hs3Sxj7EIvldVofAWIUtGouw=
github.com/imdario/mergo v0.3.11/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
github.com/itchyny/gojq v0.12.9/go.mod h1:T4Ip7AETUXeGpD+436m+UEl3m3tokRgajd5pRfsR5oE=
github.com/itchyny/timefmt-go v0.1.4/go.mod h1:nEP7L+2YmAbT2kZ2HfSs1d8Xtw9LY8D2stDBckWakZ8=
github.com/jhump/protoreflect v1.15.1/go.mod h1:jD/2GMKKE6OqX8qTjhADU1e6DShO+gavG9e0Q693nKo=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
github.com/lib/pq v1.8.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
github.com/miekg/dns v1.1.33/go.mod h1:KNUDUusw/aVsxyTYZM1oqvCicbwhgbNgztCETuNZ7xM=
github.com/mitchellh/copystructure v1.0.0/go.mod h1:SNtv71yrdKgLRyLFxmLdkAbkKEFWgYaq1OVrnRcwhnw=
github.com/mitchellh/reflectwalk v1.0.0/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw=
github.com/mmcloughlin/avo v0.5.0/go.mod h1:ChHFdoV7ql95Wi7vuq2YT1bwCJqiWdZrQ1im3VujLYM=
github.com/onsi/gomega v1.27.10/go.mod h1:RsS8tutOdbdgzbPtzzATp12yT7kM5I5aElG3evPbQ0M=
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
go.opentelemetry.io/contrib/detectors/gcp v1.24.0 h1:1Szzq5d735VbnwbEmwPUJ/FIpY9CPM9KYfCRsSNjPgw=
github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/spf13/cast v1.3.1/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.44.0/go.mod h1:U707O40ee1FpQGyhvqnzmCJm1Wh6OX6GGBVn0E6Uyyk=
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v0.44.0/go.mod h1:qcTO4xHAxZLaLxPd60TdE88rxtItPHgHWqOhOGRr0as=
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v0.44.0/go.mod h1:sTt30Evb7hJB/gEk27qLb1+l9n4Tb8HvHkR0Wx3S6CU=
golang.org/x/term v0.19.0/go.mod h1:2CuTdWZ7KHSQwUzKva0cbMg6q2DMI3Mmxp+gKJbskEk=
sigs.k8s.io/yaml v1.3.0/go.mod h1:GeOyir5tyXNByN85N/dRIT9es5UQNerPYEKK56eTBm8=
89 changes: 89 additions & 0 deletions storage/tmsample/main.go
@@ -0,0 +1,89 @@
package main

import (
"context"
"log"
"os"
"time"

"cloud.google.com/go/storage"
"cloud.google.com/go/storage/transfermanager"
"google.golang.org/api/iterator"
)

func main() {
ctx := context.Background()

// Pass in any client opts or set retry policy here
client, err := storage.NewClient(ctx) // can also use NewGRPCClient
if err != nil {
// handle error
}

// Create Downloader with desired options, including number of workers,
// part size, per operation timeout, etc.
d, err := transfermanager.NewDownloader(client, transfermanager.WithWorkers(16))
if err != nil {
// handle error
}

// Sharded and/or parallelized download
// Create local file writer for output
f, err := os.Create("/path/to/localfile")
if err != nil {
// handle error
}
defer f.Close()

// Create download input
in := &transfermanager.DownloadObjectInput{
Bucket: "mybucket",
Source: "myblob",
Destination: f,
// Optionally specify params to apply to download.
// "mykey" is a placeholder; a customer-supplied encryption key must be
// a 32-byte AES-256 key.
EncryptionKey: []byte("mykey"),
}

// Can set timeout on this download using context.
ctx, cancel := context.WithTimeout(ctx, 1*time.Minute)
defer cancel()

// Add to Downloader
d.DownloadObject(ctx, in)

// Repeat if desired

// Download many files to path
// Create query, using any GCS list objects options
dirIn := &transfermanager.DownloadDirectoryInput{
Bucket: "mybucket",
LocalDirectory: "/path/to/dir",

// Optional filtering within bucket.
Prefix: "objectprefix/",
MatchGlob: "objectprefix/**abc**",
}

d.DownloadDirectory(ctx, dirIn)

// Wait for all downloads to complete.
if err := d.WaitAndClose(); err != nil {
// handle error
}

// Iterate through completed downloads and process results. This can
// also happen asynchronously in a goroutine as the downloads run.
it := d.Results()
for {
out, err := it.Next() // if async, blocks until next result is available.
if err == iterator.Done {
break
}
if err != nil {
log.Fatalf("error getting next result: %v", err)
}
if out.Err != nil {
log.Printf("download of %v failed with error %v", out.Name, out.Err)
} else {
log.Printf("download of %v succeeded", out.Name)
}
}

}
149 changes: 149 additions & 0 deletions storage/transfermanager/downloader.go
@@ -0,0 +1,149 @@
package transfermanager

import (
"context"
"io"
"time"

"cloud.google.com/go/storage"
)

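// TransferManagerOption is an option for configuring a transfer manager
// such as Downloader.
type TransferManagerOption interface {
apply(*transferManagerConfig)
}

// WithWorkers returns an option that sets the number of workers.
func WithWorkers(numWorkers int) TransferManagerOption {
return &withWorkers{numWorkers: numWorkers}
}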

type withWorkers struct {
numWorkers int
}

func (ww withWorkers) apply(tm *transferManagerConfig) {
tm.numWorkers = ww.numWorkers
}

// WithPartSize returns an option that sets the size of the shards to transfer.
func WithPartSize(partSize int) TransferManagerOption {
return &withPartSize{partSize: partSize}
}

type withPartSize struct {
partSize int
}

func (wps withPartSize) apply(tm *transferManagerConfig) {
tm.partSize = wps.partSize
}

// etc for additional options.

type transferManagerConfig struct {
// Workers in thread pool; default numCPU/2 based on previous benchmarks?
numWorkers int
// Size of shards to transfer; Python found 32 MiB to be good default for
// JSON downloads but gRPC may benefit from larger.
partSize int
// Timeout for a single operation (including all retries).
perOperationTimeout time.Duration

// others, including opts that apply to uploads.
}

// etc

type Downloader struct {
ctx context.Context
client *storage.Client
config *transferManagerConfig
objectInputs []DownloadObjectInput // Will be sent via a channel to workers
directoryInputs []DownloadDirectoryInput // This does obj listing/os.File calls and then converts to object inputs
output <-chan *DownloadOutput // Channel for completed downloads; used to feed results iterator
done <-chan bool // Used to signal completion of all downloads.
// etc
}

// NewDownloader creates a new Downloader to add operations to.
// Choice of transport, etc. is configured on the client that's passed in.
func NewDownloader(c *storage.Client, opts ...TransferManagerOption) (*Downloader, error) {

return nil, nil
}
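
One way the options above could be folded into a config inside NewDownloader, sketched under assumptions. The defaults (half the CPUs, 32 MiB parts) are taken from the open questions in the transferManagerConfig comments and are not settled. The names config, option, defaultConfig and newConfig are hypothetical stand-ins so the example compiles on its own without the storage client.

```go
package main

import (
	"fmt"
	"runtime"
)

// config is a hypothetical stand-in for transferManagerConfig.
type config struct {
	numWorkers int
	partSize   int
}

// option mirrors the shape of the TransferManagerOption interface.
type option interface {
	apply(*config)
}

type withWorkers struct{ numWorkers int }

func (w withWorkers) apply(c *config) { c.numWorkers = w.numWorkers }

type withPartSize struct{ partSize int }

func (w withPartSize) apply(c *config) { c.partSize = w.partSize }

// defaultConfig returns assumed defaults; the real values are still open
// questions in the prototype.
func defaultConfig() *config {
	workers := runtime.NumCPU() / 2
	if workers < 1 {
		workers = 1
	}
	return &config{numWorkers: workers, partSize: 32 * 1024 * 1024}
}

// newConfig starts from the defaults and applies each option in order,
// so later options override earlier ones.
func newConfig(opts ...option) *config {
	c := defaultConfig()
	for _, o := range opts {
		o.apply(c)
	}
	return c
}

func main() {
	c := newConfig(withWorkers{numWorkers: 16})
	fmt.Println(c.numWorkers, c.partSize)
}
```

Starting from defaults keeps the zero-option call usable, which seems to match how the sample calls NewDownloader with only WithWorkers.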

// DownloadObjectInput is the input for a single object to download.
type DownloadObjectInput struct {
// Required fields
Bucket string
Source string
Destination io.WriterAt

// Optional fields
Generation *int64
Conditions *storage.Conditions
EncryptionKey []byte
Offset, Length int64 // if specified, reads only a range.
Review comment (Contributor): This would differ from the regular behaviour (I believe a length of 0 normally works, at least for HTTP, but does not read the whole object). Or should I make these (at least Length) pointers as well?

}

// DownloadObject downloads a single object. If the object is larger than the
// specified part size, the operation is automatically broken up into multiple
// range reads. This initiates the download but is non-blocking; use
// Downloader.Results to process the results.
func (d *Downloader) DownloadObject(ctx context.Context, input *DownloadObjectInput) {
}
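
The sharding described in the DownloadObject comment could look roughly like the sketch below. It only covers the arithmetic of splitting an object into range reads. The byteRange type and splitRanges function are hypothetical and not part of the prototype, and the object size is assumed to be known up front (for example from the object's attributes).

```go
package main

import "fmt"

// byteRange is a hypothetical helper type describing one range read.
type byteRange struct {
	offset, length int64
}

// splitRanges divides an object of objectSize bytes into consecutive ranges
// of at most partSize bytes. The final range holds the remainder.
func splitRanges(objectSize, partSize int64) []byteRange {
	if objectSize <= 0 || partSize <= 0 {
		return nil
	}
	var ranges []byteRange
	for off := int64(0); off < objectSize; off += partSize {
		length := partSize
		if remaining := objectSize - off; remaining < length {
			length = remaining
		}
		ranges = append(ranges, byteRange{offset: off, length: length})
	}
	return ranges
}

func main() {
	// A 100-byte object with 32-byte parts yields three full parts and a
	// 4-byte tail.
	for _, r := range splitRanges(100, 32) {
		fmt.Printf("offset=%d length=%d\n", r.offset, r.length)
	}
}
```

Each range could then be fetched with a range reader and written at its own offset, which is presumably why Destination is an io.WriterAt rather than an io.Writer.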

type DownloadDirectoryInput struct {
// Required fields
Bucket string
LocalDirectory string

// Optional fields to filter objects in bucket. Selection from storage.Query.
Prefix string
StartOffset string
EndOffset string
IncludeTrailingDelimiter bool // maybe unnecessary?
MatchGlob string

// Maybe some others about local file naming.
}

// DownloadDirectory downloads a set of objects to a local path. Downloader
// resolves the query and creates the needed directory structure locally as the
// operations progress. This initiates the download but is non-blocking; use
// Downloader.Results to process the results.
func (d *Downloader) DownloadDirectory(ctx context.Context, input *DownloadDirectoryInput) {
}

// WaitAndClose waits for all outstanding downloads to complete, then closes
// the output channel.
func (d *Downloader) WaitAndClose() error {
return nil
}
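
A minimal sketch of the wait-then-close ordering that WaitAndClose describes, assuming a plain worker pool fed by a channel. The types pool and result and the functions newPool and runAll are hypothetical, and the download itself is a caller-supplied function so the example runs without GCS. In this sketch results are consumed concurrently; reading them only after waiting, as the sample does, would need the outputs to be buffered somewhere.

```go
package main

import (
	"fmt"
	"sync"
)

// result is a hypothetical stand-in for DownloadOutput.
type result struct {
	name string
	err  error
}

// pool is a hypothetical, simplified Downloader.
type pool struct {
	inputs  chan string
	outputs chan result
	wg      sync.WaitGroup
}

// newPool starts numWorkers goroutines (numWorkers must be at least 1).
// Each worker reads names from inputs and sends one result per name.
func newPool(numWorkers int, download func(name string) error) *pool {
	p := &pool{inputs: make(chan string), outputs: make(chan result)}
	for i := 0; i < numWorkers; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			for name := range p.inputs {
				p.outputs <- result{name: name, err: download(name)}
			}
		}()
	}
	return p
}

// waitAndClose stops accepting work, waits for every worker to finish, and
// only then closes outputs, so no worker can send on a closed channel.
func (p *pool) waitAndClose() {
	close(p.inputs)
	p.wg.Wait()
	close(p.outputs)
}

// runAll queues every name and collects results while the workers run.
func runAll(numWorkers int, names []string, download func(string) error) []result {
	p := newPool(numWorkers, download)
	var results []result
	done := make(chan struct{})
	go func() {
		defer close(done)
		for r := range p.outputs {
			results = append(results, r)
		}
	}()
	for _, name := range names {
		p.inputs <- name
	}
	p.waitAndClose()
	<-done // the collector has drained outputs; results is safe to read
	return results
}

func main() {
	results := runAll(4, []string{"a", "b", "c"}, func(name string) error { return nil })
	fmt.Println("completed downloads:", len(results))
}
```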

// Results returns the iterator for download outputs.
func (d *Downloader) Results() *DownloadOutputIterator {
return nil
}

// DownloadOutput provides output for a single object download, including all
// errors received while downloading object parts. If the download was successful,
// Attrs will be populated.
type DownloadOutput struct {
Name string // name of object
Err error // error occurring during download. Can use multi-error in Go 1.20+ if multiple failures.
Attrs *storage.ReaderObjectAttrs // Attributes of downloaded object, if successful.
}

// DownloadOutputIterator allows the end user to iterate through completed
// object downloads.
type DownloadOutputIterator struct {
// unexported fields including buffered DownloadOutputs
}

// Next returns the next DownloadOutput. When iteration is complete, it returns
// iterator.Done.
// DownloadOutputs become available as the downloads complete; they can
// be iterated through asynchronously or at the end of the job.
func (it *DownloadOutputIterator) Next() (*DownloadOutput, error) {
return nil, nil
}
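
For illustration, Next could be backed by the output channel along the lines below. This is a sketch under assumptions, not the prototype's implementation: errDone is a local stand-in for iterator.Done so the example has no external dependencies, and output, outputIterator and failedNames are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// errDone is a local stand-in for iterator.Done.
var errDone = errors.New("no more items in iterator")

// output is a hypothetical stand-in for DownloadOutput.
type output struct {
	name string
	err  error
}

// outputIterator is a hypothetical channel-backed iterator.
type outputIterator struct {
	ch <-chan *output
}

// next blocks until a result is available. Once the channel has been closed
// and drained, it returns errDone.
func (it *outputIterator) next() (*output, error) {
	out, ok := <-it.ch
	if !ok {
		return nil, errDone
	}
	return out, nil
}

// failedNames drains the iterator and returns the names of failed downloads.
func failedNames(it *outputIterator) ([]string, error) {
	var failed []string
	for {
		out, err := it.next()
		if err == errDone {
			return failed, nil
		}
		if err != nil {
			return nil, err
		}
		if out.err != nil {
			failed = append(failed, out.name)
		}
	}
}

func main() {
	ch := make(chan *output, 2)
	ch <- &output{name: "a"}
	ch <- &output{name: "b", err: errors.New("timeout")}
	close(ch)

	failed, err := failedNames(&outputIterator{ch: ch})
	fmt.Println(failed, err)
}
```

Because next blocks on the channel, the same loop works whether it runs alongside the downloads or after WaitAndClose, provided completed outputs are buffered in the second case.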