Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adds support to initiate streams upload using tus #1359

Merged
merged 11 commits into from
Aug 29, 2023
3 changes: 3 additions & 0 deletions .changelog/1359.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:enhancement
streams: adds support to initiate tus upload
```
119 changes: 118 additions & 1 deletion stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@ package cloudflare
import (
"bytes"
"context"
"encoding/base64"
"errors"
"fmt"
"io"
"mime/multipart"
"net/http"
"os"
"strconv"
"strings"
"time"

"github.com/goccy/go-json"
Expand All @@ -23,6 +26,22 @@ var (
ErrMissingVideoID = errors.New("required video id missing")
// ErrMissingFilePath is for when FilePath is required but missing.
ErrMissingFilePath = errors.New("required file path missing")
// ErrMissingTusResumable is for when TusResumable is required but missing.
ErrMissingTusResumable = errors.New("required tus resumable missing")
// ErrInvalidTusResumable is for when TusResumable is invalid.
ErrInvalidTusResumable = errors.New("invalid tus resumable")
// ErrMarshallingTUSMetadata is for when TUS metadata cannot be marshalled.
ErrMarshallingTUSMetadata = errors.New("error marshalling TUS metadata")
// ErrMissingUploadLength is for when UploadLength is required but missing.
ErrMissingUploadLength = errors.New("required upload length missing")
// ErrInvalidStatusCode is for when the status code is invalid.
ErrInvalidStatusCode = errors.New("invalid status code")
)

type TusProtocolVersion string

const (
TusProtocolVersion1_0_0 TusProtocolVersion = "1.0.0"
)

// StreamVideo represents a stream video.
Expand Down Expand Up @@ -171,6 +190,55 @@ type StreamSignedURLParameters struct {
AccessRules []StreamAccessRule `json:"accessRules,omitempty"`
}

type StreamInitiateTUSUploadParameters struct {
DirectUserUpload bool
TusResumable TusProtocolVersion
UploadLength int64
UploadCreator string
Metadata TUSUploadMetadata
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
type StreamInitiateTUSUploadParameters struct {
DirectUserUpload bool
TusResumable TusProtocolVersion
UploadLength int64
UploadCreator string
Metadata TUSUploadMetadata
}
type StreamInitiateTUSUploadParameters struct {
DirectUserUpload bool `url:"direct_user,omitempty"`
TusResumable TusProtocolVersion `url:"-"`
UploadLength int64 `url:"-"`
UploadCreator string `url:"-"`
Metadata TUSUploadMetadata `url:"-"`
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this allows us to use the struct for query parameter building in buildURI instead of manually performing it.


type StreamInitiateTUSUploadResponse struct {
ResponseHeaders http.Header
}

type TUSUploadMetadata struct {
Name string `json:"name,omitempty"`
RequireSignedURLs bool `json:"requiresignedurls,omitempty"`
AllowedOrigins string `json:"allowedorigins,omitempty"`
ThumbnailTimestampPct float64 `json:"thumbnailtimestamppct,omitempty"`
ScheduledDeletion *time.Time `json:"scheduledDeletion,omitempty"`
Watermark string `json:"watermark,omitempty"`
}

func (t TUSUploadMetadata) ToTUSCsv() (string, error) {
var metadataValues []string
if t.Name != "" {
metadataValues = append(metadataValues, fmt.Sprintf("%s %s", "name", base64.StdEncoding.EncodeToString([]byte(t.Name))))
}
if t.RequireSignedURLs {
metadataValues = append(metadataValues, "requiresignedurls")
}
if t.AllowedOrigins != "" {
metadataValues = append(metadataValues, fmt.Sprintf("%s %s", "allowedorigins", base64.StdEncoding.EncodeToString([]byte(t.AllowedOrigins))))
}
if t.ThumbnailTimestampPct != 0 {
metadataValues = append(metadataValues, fmt.Sprintf("%s %s", "thumbnailtimestamppct", base64.StdEncoding.EncodeToString([]byte(strconv.FormatFloat(t.ThumbnailTimestampPct, 'f', -1, 64)))))
}
if t.ScheduledDeletion != nil {
metadataValues = append(metadataValues, fmt.Sprintf("%s %s", "scheduledDeletion", base64.StdEncoding.EncodeToString([]byte(t.ScheduledDeletion.Format(time.RFC3339)))))
}
if t.Watermark != "" {
metadataValues = append(metadataValues, fmt.Sprintf("%s %s", "watermark", base64.StdEncoding.EncodeToString([]byte(t.Watermark))))
}

if len(metadataValues) > 0 {
return strings.Join(metadataValues, ","), nil
}

return "", nil
}

// StreamVideoResponse represents an API response of a stream video.
type StreamVideoResponse struct {
Response
Expand Down Expand Up @@ -328,7 +396,56 @@ func (api *API) StreamListVideos(ctx context.Context, params StreamListParameter
return streamListResponse.Result, nil
}

// Skipped: https://api.cloudflare.com/#stream-videos-initiate-a-video-upload-using-tus
// StreamInitiateTUSVideoUpload generates a direct upload TUS url for a video.
//
// API Reference: https://developers.cloudflare.com/api/operations/stream-videos-initiate-video-uploads-using-tus
func (api *API) StreamInitiateTUSVideoUpload(ctx context.Context, rc *ResourceContainer, params StreamInitiateTUSUploadParameters) (StreamInitiateTUSUploadResponse, error) {
if rc.Level != AccountRouteLevel {
return StreamInitiateTUSUploadResponse{}, ErrRequiredAccountLevelResourceContainer
}

headers := http.Header{}
if params.TusResumable == "" {
return StreamInitiateTUSUploadResponse{}, ErrMissingTusResumable
} else if params.TusResumable != TusProtocolVersion1_0_0 {
return StreamInitiateTUSUploadResponse{}, ErrInvalidTusResumable
} else {
headers.Set("Tus-Resumable", string(params.TusResumable))
}

if params.UploadLength == 0 {
return StreamInitiateTUSUploadResponse{}, ErrMissingUploadLength
} else {
headers.Set("Upload-Length", strconv.FormatInt(params.UploadLength, 10))
}

if params.UploadCreator != "" {
headers.Set("Upload-Creator", params.UploadCreator)
}

metadataTusCsv, err := params.Metadata.ToTUSCsv()
if err != nil {
return StreamInitiateTUSUploadResponse{}, ErrMarshallingTUSMetadata
}
if metadataTusCsv != "" {
headers.Set("Upload-Metadata", metadataTusCsv)
}

uri := fmt.Sprintf("/accounts/%s/stream", rc.Identifier)
if params.DirectUserUpload {
uri += "?direct_user=true"
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
uri := fmt.Sprintf("/accounts/%s/stream", rc.Identifier)
if params.DirectUserUpload {
uri += "?direct_user=true"
}
uri := buildURI(fmt.Sprintf("/accounts/%s/stream", rc.Identifier), params)

res, err := api.makeRequestWithAuthTypeAndHeadersComplete(ctx, http.MethodPost, uri, nil, api.authType, headers)
if err != nil {
return StreamInitiateTUSUploadResponse{}, err
}

if res.StatusCode != http.StatusCreated {
return StreamInitiateTUSUploadResponse{}, ErrInvalidStatusCode
}

return StreamInitiateTUSUploadResponse{ResponseHeaders: res.Headers}, nil
}

// StreamGetVideo gets the details for a specific video.
//
Expand Down
95 changes: 95 additions & 0 deletions stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,3 +524,98 @@ func TestStream_CreateSignedURL(t *testing.T) {
assert.Equal(t, want, out, "structs not equal")
}
}

func TestStream_TUSUploadMetadataToTUSCsv(t *testing.T) {
md := TUSUploadMetadata{
Name: "test.mp4",
}
csv, err := md.ToTUSCsv()
assert.NoError(t, err)
assert.Equal(t, "name dGVzdC5tcDQ=", csv)

md.RequireSignedURLs = true
csv, err = md.ToTUSCsv()
assert.NoError(t, err)
assert.Equal(t, "name dGVzdC5tcDQ=,requiresignedurls", csv)

md.AllowedOrigins = "example.com"
csv, err = md.ToTUSCsv()
assert.NoError(t, err)
assert.Equal(t, "name dGVzdC5tcDQ=,requiresignedurls,allowedorigins ZXhhbXBsZS5jb20=", csv)

md.ThumbnailTimestampPct = 0.5
csv, err = md.ToTUSCsv()
assert.NoError(t, err)
assert.Equal(t, "name dGVzdC5tcDQ=,requiresignedurls,allowedorigins ZXhhbXBsZS5jb20=,thumbnailtimestamppct MC41", csv)

scheduleDeletion, _ := time.Parse(time.RFC3339, "2023-10-01T02:20:00Z")
md.ScheduledDeletion = &scheduleDeletion
csv, err = md.ToTUSCsv()
assert.NoError(t, err)
assert.Equal(t, "name dGVzdC5tcDQ=,requiresignedurls,allowedorigins ZXhhbXBsZS5jb20=,thumbnailtimestamppct MC41,scheduledDeletion MjAyMy0xMC0wMVQwMjoyMDowMFo=", csv)

md.Watermark = "watermark-profile-uid"
csv, err = md.ToTUSCsv()
assert.NoError(t, err)
assert.Equal(t, "name dGVzdC5tcDQ=,requiresignedurls,allowedorigins ZXhhbXBsZS5jb20=,thumbnailtimestamppct MC41,scheduledDeletion MjAyMy0xMC0wMVQwMjoyMDowMFo=,watermark d2F0ZXJtYXJrLXByb2ZpbGUtdWlk", csv)

// empty metadata should return empty string
md = TUSUploadMetadata{}
csv, err = md.ToTUSCsv()
assert.NoError(t, err)
assert.Equal(t, "", csv)
}

func TestStream_StreamInitiateTUSVideoUpload(t *testing.T) {
setup()
defer teardown()

mux.HandleFunc("/accounts/"+testAccountID+"/stream", func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, http.MethodPost, r.Method, "Expected method 'POST', got %s", r.Method)
// Make sure Tus-Resumable header is set
assert.Equal(t, "1.0.0", r.Header.Get("Tus-Resumable"))
// Make sure Upload-Length header is set
assert.Equal(t, "123", r.Header.Get("Upload-Length"))
// set the response headers
// if query param direct_user=true, then return the direct url in the header
if r.URL.Query().Get("direct_user") == "true" {
w.Header().Set("Location", "https://upload.videodelivery.net/tus/90c68cb5cd4fd5350b1962279c90bec0?tusv2=true")
} else {
w.Header().Set("Location", "https://production.gateway.api.cloudflare.com/client/v4/accounts/test-account-id/media/278f2a7e763c73dedc064b965d2cfbed?tusv2=true")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
w.Header().Set("Location", "https://production.gateway.api.cloudflare.com/client/v4/accounts/test-account-id/media/278f2a7e763c73dedc064b965d2cfbed?tusv2=true")
w.Header().Set("Location", "https://api.cloudflare.com/client/v4/accounts/"+testAccountID+"/media/278f2a7e763c73dedc064b965d2cfbed?tusv2=true")

}

w.Header().Set("stream-media-id", "278f2a7e763c73dedc064b965d2cfbed")
w.Header().Set("Tus-Resumable", "1.0.0")
w.WriteHeader(http.StatusCreated)
})

// Make sure Tus-Resumable header is set
params := StreamInitiateTUSUploadParameters{}
_, err := client.StreamInitiateTUSVideoUpload(context.Background(), AccountIdentifier(testAccountID), params)
if assert.Error(t, err) {
assert.Equal(t, ErrMissingTusResumable, err)
}
params.TusResumable = TusProtocolVersion1_0_0

// Make sure Upload-Length header is set
_, err = client.StreamInitiateTUSVideoUpload(context.Background(), AccountIdentifier(testAccountID), params)
if assert.Error(t, err) {
assert.Equal(t, ErrMissingUploadLength, err)
}
params.UploadLength = 123

out, err := client.StreamInitiateTUSVideoUpload(context.Background(), AccountIdentifier(testAccountID), params)
if assert.NoError(t, err) {
assert.Equal(t, "https://production.gateway.api.cloudflare.com/client/v4/accounts/test-account-id/media/278f2a7e763c73dedc064b965d2cfbed?tusv2=true", out.ResponseHeaders.Get("Location"))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
assert.Equal(t, "https://production.gateway.api.cloudflare.com/client/v4/accounts/test-account-id/media/278f2a7e763c73dedc064b965d2cfbed?tusv2=true", out.ResponseHeaders.Get("Location"))
assert.Equal(t, "https://api.cloudflare.com/client/v4/accounts/"+testAccountID+"/media/278f2a7e763c73dedc064b965d2cfbed?tusv2=true", out.ResponseHeaders.Get("Location"))

assert.Equal(t, "278f2a7e763c73dedc064b965d2cfbed", out.ResponseHeaders.Get("stream-media-id"))
assert.Equal(t, "1.0.0", out.ResponseHeaders.Get("Tus-Resumable"))
}

params.DirectUserUpload = true
out, err = client.StreamInitiateTUSVideoUpload(context.Background(), AccountIdentifier(testAccountID), params)
if assert.NoError(t, err) {
assert.Equal(t, "https://upload.videodelivery.net/tus/90c68cb5cd4fd5350b1962279c90bec0?tusv2=true", out.ResponseHeaders.Get("Location"))
assert.Equal(t, "278f2a7e763c73dedc064b965d2cfbed", out.ResponseHeaders.Get("stream-media-id"))
assert.Equal(t, "1.0.0", out.ResponseHeaders.Get("Tus-Resumable"))
}
}
1 change: 0 additions & 1 deletion workers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1388,7 +1388,6 @@ func TestUploadWorker_UnsafeBinding(t *testing.T) {
assert.Equal(t, "dynamic_dispatch", mpUpload.BindingMeta["b1"]["type"])

w.Header().Set("content-type", "application/json")
fmt.Println(workersScriptResponse(t))
fmt.Fprint(w, workersScriptResponse(t))
}
mux.HandleFunc("/accounts/"+testAccountID+"/workers/scripts/bar", handler)
Expand Down