Add parallel buffered multipart upload
Add a multipart upload mode that buffers a number of parts in memory and fills/uploads them in parallel.

Use `ConcurrentStreamParts` to enable it and `NumThreads` to control the number of buffers. `PartSize` controls the size of each buffer.
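For reference, a minimal usage sketch; the endpoint, credentials, bucket/object names, thread count, and part size below are placeholders, not part of this commit:

```go
package main

import (
	"context"
	"log"
	"os"

	"github.com/minio/minio-go/v7"
	"github.com/minio/minio-go/v7/pkg/credentials"
)

func main() {
	// Hypothetical endpoint and credentials, for illustration only.
	client, err := minio.New("play.min.io", &minio.Options{
		Creds:  credentials.NewStaticV4("ACCESS-KEY", "SECRET-KEY", ""),
		Secure: true,
	})
	if err != nil {
		log.Fatalln(err)
	}

	// Any non-seekable stream works; stdin is used here as an example.
	// Size -1 means the total length is unknown up front.
	info, err := client.PutObject(context.Background(), "my-bucket", "my-object", os.Stdin, -1,
		minio.PutObjectOptions{
			ConcurrentStreamParts: true,     // enable the parallel buffered mode
			NumThreads:            4,        // number of part buffers filled/uploaded concurrently
			PartSize:              16 << 20, // 16 MiB per buffer
		})
	if err != nil {
		log.Fatalln(err)
	}
	log.Println("uploaded", info.Size, "bytes")
}
```

With an unknown size this replaces the serial putObjectMultipartStreamNoLength path; with a known size it is also used when enabled, as the dispatch in putObjectMultipartStream below shows.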
klauspost committed Jan 5, 2023
1 parent 4eab739 commit 5fb3b7b
Showing 2 changed files with 202 additions and 4 deletions.
196 changes: 193 additions & 3 deletions api-put-object-streaming.go
@@ -28,6 +28,7 @@ import (
"net/url"
"sort"
"strings"
"sync"

"github.com/google/uuid"
"github.com/minio/minio-go/v7/pkg/s3utils"
@@ -44,7 +45,9 @@ import (
func (c *Client) putObjectMultipartStream(ctx context.Context, bucketName, objectName string,
reader io.Reader, size int64, opts PutObjectOptions,
) (info UploadInfo, err error) {
if !isObject(reader) && isReadAt(reader) && !opts.SendContentMd5 {
if opts.ConcurrentStreamParts && opts.NumThreads > 1 {
info, err = c.putObjectMultipartStreamParallel(ctx, bucketName, objectName, reader, opts)
} else if !isObject(reader) && isReadAt(reader) && !opts.SendContentMd5 {
// Verify if the reader implements ReadAt and it is not a *minio.Object then we will use parallel uploader.
info, err = c.putObjectMultipartStreamFromReadAt(ctx, bucketName, objectName, reader.(io.ReaderAt), size, opts)
} else {
@@ -385,7 +388,7 @@ func (c *Client) putObjectMultipartStreamOptionalChecksum(ctx context.Context, b
// Update progress reader appropriately to the latest offset
// as we read from the source.
hooked := newHook(bytes.NewReader(buf[:length]), opts.Progress)
p := uploadPartParams{bucketName: bucketName, objectName: objectName, uploadID: uploadID, reader: hooked, partNumber: partNumber, md5Base64: md5Base64, size: partSize, sse: opts.ServerSideEncryption, streamSha256: !opts.DisableContentSha256, customHeader: customHeader}
p := uploadPartParams{bucketName: bucketName, objectName: objectName, uploadID: uploadID, reader: hooked, partNumber: partNumber, md5Base64: md5Base64, size: int64(length), sse: opts.ServerSideEncryption, streamSha256: !opts.DisableContentSha256, customHeader: customHeader}
objPart, uerr := c.uploadPart(ctx, p)
if uerr != nil {
return UploadInfo{}, uerr
@@ -395,7 +398,7 @@ func (c *Client) putObjectMultipartStreamOptionalChecksum(ctx context.Context, b
partsInfo[partNumber] = objPart

// Save successfully uploaded size.
totalUploadedSize += partSize
totalUploadedSize += int64(length)
}

// Verify if we uploaded all the data.
@@ -446,6 +449,193 @@ func (c *Client) putObjectMultipartStreamOptionalChecksum(ctx context.Context, b
return uploadInfo, nil
}

// putObjectMultipartStreamParallel uploads opts.NumThreads parts in parallel.
// This is expected to take opts.PartSize * opts.NumThreads * (GOGC / 100) bytes of buffer.
func (c *Client) putObjectMultipartStreamParallel(ctx context.Context, bucketName, objectName string,
reader io.Reader, opts PutObjectOptions) (info UploadInfo, err error) {
// Input validation.
if err = s3utils.CheckValidBucketName(bucketName); err != nil {
return UploadInfo{}, err
}

if err = s3utils.CheckValidObjectName(objectName); err != nil {
return UploadInfo{}, err
}

if !opts.SendContentMd5 {
if opts.UserMetadata == nil {
opts.UserMetadata = make(map[string]string, 1)
}
opts.UserMetadata["X-Amz-Checksum-Algorithm"] = "CRC32C"
}

// Cancel all when an error occurs.
ctx, cancel := context.WithCancel(ctx)
defer cancel()

// Calculate the optimal parts info for a given size.
partSize := int64(maxPartSize)
if opts.PartSize > 0 {
partSize = int64(opts.PartSize)
}
// Initiates a new multipart request
uploadID, err := c.newUploadID(ctx, bucketName, objectName, opts)
if err != nil {
return UploadInfo{}, err
}
delete(opts.UserMetadata, "X-Amz-Checksum-Algorithm")

// Aborts the multipart upload if the function returns
// any error, since we do not resume we should purge
// the parts which have been uploaded to relinquish
// storage space.
defer func() {
if err != nil {
c.abortMultipartUpload(ctx, bucketName, objectName, uploadID)
}
}()

// Create checksums
// CRC32C is ~50% faster on AMD64 @ 30GB/s
var crcBytes []byte
crc := crc32.New(crc32.MakeTable(crc32.Castagnoli))

// Total data read from the reader and written to the server.
var totalUploadedSize int64

// Initialize parts uploaded map.
partsInfo := make(map[int]ObjectPart)

// Create a pool of part-sized buffers, handed out through a channel.
nBuffers := int64(opts.NumThreads)
bufs := make(chan []byte, nBuffers)
all := make([]byte, nBuffers*partSize)
for i := int64(0); i < nBuffers; i++ {
bufs <- all[i*partSize : i*partSize+partSize]
}
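// The bufs channel doubles as a concurrency limiter: reading the next part
// blocks until a buffer is free, so at most NumThreads parts are buffered
// or in flight at any time.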

// Part number always starts with '1'.
var partNumber int
var wg sync.WaitGroup
var mu sync.Mutex // guards partsInfo, which the upload goroutines write concurrently
errCh := make(chan error, opts.NumThreads)

reader = newHook(reader, opts.Progress)

for partNumber = 1; ; partNumber++ {
// Proceed to upload the part.
var buf []byte
select {
case buf = <-bufs:
case err = <-errCh:
cancel()
wg.Wait()
return UploadInfo{}, err
}

length, rerr := readFull(reader, buf)
if rerr == io.EOF && partNumber > 1 {
// Done
break
}

if rerr != nil && rerr != io.ErrUnexpectedEOF && rerr != io.EOF {
cancel()
wg.Wait()
return UploadInfo{}, rerr
}

// Calculate md5sum.
customHeader := make(http.Header)
if !opts.SendContentMd5 {
// Add CRC32C instead.
crc.Reset()
crc.Write(buf[:length])
cSum := crc.Sum(nil)
customHeader.Set("x-amz-checksum-crc32c", base64.StdEncoding.EncodeToString(cSum))
crcBytes = append(crcBytes, cSum...)
}
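// The CRC is computed here, before the buffer is handed to a goroutine,
// so that crcBytes accumulates the per-part checksums in part-number
// order; the final hash-of-hashes below depends on that ordering.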

wg.Add(1)
go func(partNumber int) {
defer wg.Done()

// Compute the optional MD5 with a per-part hasher; sharing a single
// hasher across goroutines would be a data race.
var md5Base64 string
if opts.SendContentMd5 {
md5Hash := c.md5Hasher()
md5Hash.Write(buf[:length])
md5Base64 = base64.StdEncoding.EncodeToString(md5Hash.Sum(nil))
md5Hash.Close()
}

p := uploadPartParams{bucketName: bucketName, objectName: objectName, uploadID: uploadID, reader: bytes.NewReader(buf[:length]), partNumber: partNumber, md5Base64: md5Base64, size: int64(length), sse: opts.ServerSideEncryption, streamSha256: !opts.DisableContentSha256, customHeader: customHeader}
objPart, uerr := c.uploadPart(ctx, p)
if uerr != nil {
errCh <- uerr
return
}

// Save successfully uploaded part metadata.
mu.Lock()
partsInfo[partNumber] = objPart
mu.Unlock()

// Send buffer back so it can be reused.
bufs <- buf
}(partNumber)

// Save successfully uploaded size.
totalUploadedSize += int64(length)
}
wg.Wait()

// Collect any error
select {
case err = <-errCh:
return UploadInfo{}, err
default:
}

// Complete multipart upload.
var complMultipartUpload completeMultipartUpload

// Loop over total uploaded parts to save them in
// Parts array before completing the multipart request.
for i := 1; i < partNumber; i++ {
part, ok := partsInfo[i]
if !ok {
return UploadInfo{}, errInvalidArgument(fmt.Sprintf("Missing part number %d", i))
}
complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{
ETag: part.ETag,
PartNumber: part.PartNumber,
ChecksumCRC32: part.ChecksumCRC32,
ChecksumCRC32C: part.ChecksumCRC32C,
ChecksumSHA1: part.ChecksumSHA1,
ChecksumSHA256: part.ChecksumSHA256,
})
}

// Sort all completed parts.
sort.Sort(completedParts(complMultipartUpload.Parts))

opts = PutObjectOptions{}
if len(crcBytes) > 0 {
// Add hash of hashes.
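// This matches S3's composite ("checksum of checksums") scheme for
// multipart uploads: the CRC32C of the per-part CRC32C values
// concatenated in part-number order.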
crc.Reset()
crc.Write(crcBytes)
opts.UserMetadata = map[string]string{"X-Amz-Checksum-Crc32c": base64.StdEncoding.EncodeToString(crc.Sum(nil))}
}
uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, opts)
if err != nil {
return UploadInfo{}, err
}

uploadInfo.Size = totalUploadedSize
return uploadInfo, nil
}

// putObject is a special function used for Google Cloud Storage, since
// Google's multipart API is not S3 compatible.
func (c *Client) putObject(ctx context.Context, bucketName, objectName string, reader io.Reader, size int64, opts PutObjectOptions) (info UploadInfo, err error) {
10 changes: 9 additions & 1 deletion api-put-object.go
@@ -87,7 +87,12 @@ type PutObjectOptions struct {
SendContentMd5 bool
DisableContentSha256 bool
DisableMultipart bool
Internal AdvancedPutOptions

// ConcurrentStreamParts will create NumThreads buffers of PartSize bytes,
// fill them serially and upload them in parallel.
// This can be used for faster uploads on non-seekable or slow-to-seek input.
ConcurrentStreamParts bool
Internal AdvancedPutOptions
}

// getNumThreads - gets the number of threads to be used in the multipart
@@ -272,6 +277,9 @@ func (c *Client) putObjectCommon(ctx context.Context, bucketName, objectName str
if opts.DisableMultipart {
return UploadInfo{}, errors.New("no length provided and multipart disabled")
}
if opts.ConcurrentStreamParts && opts.NumThreads > 1 {
return c.putObjectMultipartStreamParallel(ctx, bucketName, objectName, reader, opts)
}
return c.putObjectMultipartStreamNoLength(ctx, bucketName, objectName, reader, opts)
}
