Skip to content

Commit

Permalink
feat: implement OpenStream func of OSS artifactdriver. Part of #8489 (#…
Browse files Browse the repository at this point in the history
…12908)

Signed-off-by: AlbeeSo <suyashi1321@163.com>
Co-authored-by: shuangkun <tsk2013uestc@163.com>
  • Loading branch information
AlbeeSo and shuangkun committed May 4, 2024
1 parent b4af68b commit ccb71bd
Showing 1 changed file with 39 additions and 3 deletions.
42 changes: 39 additions & 3 deletions workflow/artifacts/oss/oss.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,45 @@ func (ossDriver *ArtifactDriver) Load(inputArtifact *wfv1.Artifact, path string)
return err
}

func (ossDriver *ArtifactDriver) OpenStream(a *wfv1.Artifact) (io.ReadCloser, error) {
// todo: this is a temporary implementation which loads file to disk first
return common.LoadToStream(a, ossDriver)
// OpenStream opens a stream reader for an artifact from OSS compliant storage
func (ossDriver *ArtifactDriver) OpenStream(inputArtifact *wfv1.Artifact) (io.ReadCloser, error) {
var stream io.ReadCloser
err := waitutil.Backoff(defaultRetry,
func() (bool, error) {
log.Infof("OSS OpenStream, key: %s", inputArtifact.OSS.Key)
osscli, err := ossDriver.newOSSClient()
if err != nil {
return !isTransientOSSErr(err), err
}
bucketName := inputArtifact.OSS.Bucket
err = setBucketLogging(osscli, bucketName)
if err != nil {
return !isTransientOSSErr(err), err
}
bucket, err := osscli.Bucket(bucketName)
if err != nil {
return !isTransientOSSErr(err), err
}
s, origErr := bucket.GetObject(inputArtifact.OSS.Key)
if origErr == nil {
stream = s
return true, nil
}
if !IsOssErrCode(err, "NoSuchKey") {
return !isTransientOSSErr(origErr), fmt.Errorf("failed to get file: %w", origErr)
}
isDir, err := IsOssDirectory(bucket, inputArtifact.OSS.Key)
if err != nil {
return !isTransientOSSErr(err), fmt.Errorf("failed to test if %s/%s is a directory: %w", bucketName, inputArtifact.OSS.Key, err)
}
if !isDir {
return false, origErr
}
// directory case:
// todo: make a .tgz file which can be streamed to user
return false, errors.New(errors.CodeNotImplemented, "Directory Stream capability currently unimplemented for OSS")
})
return stream, err
}

// Save stores an artifact to OSS compliant storage, e.g., uploading a local file to OSS bucket
Expand Down

0 comments on commit ccb71bd

Please sign in to comment.