Skip to content

Commit

Permalink
Merge pull request #1359 from MemReel/feature/stream-video-initiate-t…
Browse files Browse the repository at this point in the history
…us-upload

adds support to initiate streams upload using tus
  • Loading branch information
jacobbednarz committed Aug 29, 2023
2 parents 434a5d0 + 1f7bf6a commit 68ee8c1
Show file tree
Hide file tree
Showing 4 changed files with 213 additions and 2 deletions.
3 changes: 3 additions & 0 deletions .changelog/1359.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:enhancement
streams: adds support to initiate tus upload
```
116 changes: 115 additions & 1 deletion stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@ package cloudflare
import (
"bytes"
"context"
"encoding/base64"
"errors"
"fmt"
"io"
"mime/multipart"
"net/http"
"os"
"strconv"
"strings"
"time"

"github.com/goccy/go-json"
Expand All @@ -23,6 +26,22 @@ var (
ErrMissingVideoID = errors.New("required video id missing")
// ErrMissingFilePath is for when FilePath is required but missing.
ErrMissingFilePath = errors.New("required file path missing")
// ErrMissingTusResumable is for when TusResumable is required but missing.
ErrMissingTusResumable = errors.New("required tus resumable missing")
// ErrInvalidTusResumable is for when TusResumable is invalid.
ErrInvalidTusResumable = errors.New("invalid tus resumable")
// ErrMarshallingTUSMetadata is for when TUS metadata cannot be marshalled.
ErrMarshallingTUSMetadata = errors.New("error marshalling TUS metadata")
// ErrMissingUploadLength is for when UploadLength is required but missing.
ErrMissingUploadLength = errors.New("required upload length missing")
// ErrInvalidStatusCode is for when the status code is invalid.
ErrInvalidStatusCode = errors.New("invalid status code")
)

type TusProtocolVersion string

const (
TusProtocolVersion1_0_0 TusProtocolVersion = "1.0.0"
)

// StreamVideo represents a stream video.
Expand Down Expand Up @@ -171,6 +190,55 @@ type StreamSignedURLParameters struct {
AccessRules []StreamAccessRule `json:"accessRules,omitempty"`
}

type StreamInitiateTUSUploadParameters struct {
DirectUserUpload bool `url:"direct_user,omitempty"`
TusResumable TusProtocolVersion `url:"-"`
UploadLength int64 `url:"-"`
UploadCreator string `url:"-"`
Metadata TUSUploadMetadata `url:"-"`
}

type StreamInitiateTUSUploadResponse struct {
ResponseHeaders http.Header
}

type TUSUploadMetadata struct {
Name string `json:"name,omitempty"`
RequireSignedURLs bool `json:"requiresignedurls,omitempty"`
AllowedOrigins string `json:"allowedorigins,omitempty"`
ThumbnailTimestampPct float64 `json:"thumbnailtimestamppct,omitempty"`
ScheduledDeletion *time.Time `json:"scheduledDeletion,omitempty"`
Watermark string `json:"watermark,omitempty"`
}

func (t TUSUploadMetadata) ToTUSCsv() (string, error) {
var metadataValues []string
if t.Name != "" {
metadataValues = append(metadataValues, fmt.Sprintf("%s %s", "name", base64.StdEncoding.EncodeToString([]byte(t.Name))))
}
if t.RequireSignedURLs {
metadataValues = append(metadataValues, "requiresignedurls")
}
if t.AllowedOrigins != "" {
metadataValues = append(metadataValues, fmt.Sprintf("%s %s", "allowedorigins", base64.StdEncoding.EncodeToString([]byte(t.AllowedOrigins))))
}
if t.ThumbnailTimestampPct != 0 {
metadataValues = append(metadataValues, fmt.Sprintf("%s %s", "thumbnailtimestamppct", base64.StdEncoding.EncodeToString([]byte(strconv.FormatFloat(t.ThumbnailTimestampPct, 'f', -1, 64)))))
}
if t.ScheduledDeletion != nil {
metadataValues = append(metadataValues, fmt.Sprintf("%s %s", "scheduledDeletion", base64.StdEncoding.EncodeToString([]byte(t.ScheduledDeletion.Format(time.RFC3339)))))
}
if t.Watermark != "" {
metadataValues = append(metadataValues, fmt.Sprintf("%s %s", "watermark", base64.StdEncoding.EncodeToString([]byte(t.Watermark))))
}

if len(metadataValues) > 0 {
return strings.Join(metadataValues, ","), nil
}

return "", nil
}

// StreamVideoResponse represents an API response of a stream video.
type StreamVideoResponse struct {
Response
Expand Down Expand Up @@ -328,7 +396,53 @@ func (api *API) StreamListVideos(ctx context.Context, params StreamListParameter
return streamListResponse.Result, nil
}

// Skipped: https://api.cloudflare.com/#stream-videos-initiate-a-video-upload-using-tus
// StreamInitiateTUSVideoUpload generates a direct upload TUS url for a video.
//
// API Reference: https://developers.cloudflare.com/api/operations/stream-videos-initiate-video-uploads-using-tus
func (api *API) StreamInitiateTUSVideoUpload(ctx context.Context, rc *ResourceContainer, params StreamInitiateTUSUploadParameters) (StreamInitiateTUSUploadResponse, error) {
if rc.Level != AccountRouteLevel {
return StreamInitiateTUSUploadResponse{}, ErrRequiredAccountLevelResourceContainer
}

headers := http.Header{}
if params.TusResumable == "" {
return StreamInitiateTUSUploadResponse{}, ErrMissingTusResumable
} else if params.TusResumable != TusProtocolVersion1_0_0 {
return StreamInitiateTUSUploadResponse{}, ErrInvalidTusResumable
} else {
headers.Set("Tus-Resumable", string(params.TusResumable))
}

if params.UploadLength == 0 {
return StreamInitiateTUSUploadResponse{}, ErrMissingUploadLength
} else {
headers.Set("Upload-Length", strconv.FormatInt(params.UploadLength, 10))
}

if params.UploadCreator != "" {
headers.Set("Upload-Creator", params.UploadCreator)
}

metadataTusCsv, err := params.Metadata.ToTUSCsv()
if err != nil {
return StreamInitiateTUSUploadResponse{}, ErrMarshallingTUSMetadata
}
if metadataTusCsv != "" {
headers.Set("Upload-Metadata", metadataTusCsv)
}

uri := buildURI(fmt.Sprintf("/accounts/%s/stream", rc.Identifier), params)
res, err := api.makeRequestWithAuthTypeAndHeadersComplete(ctx, http.MethodPost, uri, nil, api.authType, headers)
if err != nil {
return StreamInitiateTUSUploadResponse{}, err
}

if res.StatusCode != http.StatusCreated {
return StreamInitiateTUSUploadResponse{}, ErrInvalidStatusCode
}

return StreamInitiateTUSUploadResponse{ResponseHeaders: res.Headers}, nil
}

// StreamGetVideo gets the details for a specific video.
//
Expand Down
95 changes: 95 additions & 0 deletions stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,3 +524,98 @@ func TestStream_CreateSignedURL(t *testing.T) {
assert.Equal(t, want, out, "structs not equal")
}
}

func TestStream_TUSUploadMetadataToTUSCsv(t *testing.T) {
md := TUSUploadMetadata{
Name: "test.mp4",
}
csv, err := md.ToTUSCsv()
assert.NoError(t, err)
assert.Equal(t, "name dGVzdC5tcDQ=", csv)

md.RequireSignedURLs = true
csv, err = md.ToTUSCsv()
assert.NoError(t, err)
assert.Equal(t, "name dGVzdC5tcDQ=,requiresignedurls", csv)

md.AllowedOrigins = "example.com"
csv, err = md.ToTUSCsv()
assert.NoError(t, err)
assert.Equal(t, "name dGVzdC5tcDQ=,requiresignedurls,allowedorigins ZXhhbXBsZS5jb20=", csv)

md.ThumbnailTimestampPct = 0.5
csv, err = md.ToTUSCsv()
assert.NoError(t, err)
assert.Equal(t, "name dGVzdC5tcDQ=,requiresignedurls,allowedorigins ZXhhbXBsZS5jb20=,thumbnailtimestamppct MC41", csv)

scheduleDeletion, _ := time.Parse(time.RFC3339, "2023-10-01T02:20:00Z")
md.ScheduledDeletion = &scheduleDeletion
csv, err = md.ToTUSCsv()
assert.NoError(t, err)
assert.Equal(t, "name dGVzdC5tcDQ=,requiresignedurls,allowedorigins ZXhhbXBsZS5jb20=,thumbnailtimestamppct MC41,scheduledDeletion MjAyMy0xMC0wMVQwMjoyMDowMFo=", csv)

md.Watermark = "watermark-profile-uid"
csv, err = md.ToTUSCsv()
assert.NoError(t, err)
assert.Equal(t, "name dGVzdC5tcDQ=,requiresignedurls,allowedorigins ZXhhbXBsZS5jb20=,thumbnailtimestamppct MC41,scheduledDeletion MjAyMy0xMC0wMVQwMjoyMDowMFo=,watermark d2F0ZXJtYXJrLXByb2ZpbGUtdWlk", csv)

// empty metadata should return empty string
md = TUSUploadMetadata{}
csv, err = md.ToTUSCsv()
assert.NoError(t, err)
assert.Equal(t, "", csv)
}

func TestStream_StreamInitiateTUSVideoUpload(t *testing.T) {
setup()
defer teardown()

mux.HandleFunc("/accounts/"+testAccountID+"/stream", func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, http.MethodPost, r.Method, "Expected method 'POST', got %s", r.Method)
// Make sure Tus-Resumable header is set
assert.Equal(t, "1.0.0", r.Header.Get("Tus-Resumable"))
// Make sure Upload-Length header is set
assert.Equal(t, "123", r.Header.Get("Upload-Length"))
// set the response headers
// if query param direct_user=true, then return the direct url in the header
if r.URL.Query().Get("direct_user") == "true" {
w.Header().Set("Location", "https://upload.videodelivery.net/tus/90c68cb5cd4fd5350b1962279c90bec0?tusv2=true")
} else {
w.Header().Set("Location", "https://api.cloudflare.com/client/v4/accounts/"+testAccountID+"/media/278f2a7e763c73dedc064b965d2cfbed?tusv2=true")
}

w.Header().Set("stream-media-id", "278f2a7e763c73dedc064b965d2cfbed")
w.Header().Set("Tus-Resumable", "1.0.0")
w.WriteHeader(http.StatusCreated)
})

// Make sure Tus-Resumable header is set
params := StreamInitiateTUSUploadParameters{}
_, err := client.StreamInitiateTUSVideoUpload(context.Background(), AccountIdentifier(testAccountID), params)
if assert.Error(t, err) {
assert.Equal(t, ErrMissingTusResumable, err)
}
params.TusResumable = TusProtocolVersion1_0_0

// Make sure Upload-Length header is set
_, err = client.StreamInitiateTUSVideoUpload(context.Background(), AccountIdentifier(testAccountID), params)
if assert.Error(t, err) {
assert.Equal(t, ErrMissingUploadLength, err)
}
params.UploadLength = 123

out, err := client.StreamInitiateTUSVideoUpload(context.Background(), AccountIdentifier(testAccountID), params)
if assert.NoError(t, err) {
assert.Equal(t, "https://api.cloudflare.com/client/v4/accounts/"+testAccountID+"/media/278f2a7e763c73dedc064b965d2cfbed?tusv2=true", out.ResponseHeaders.Get("Location"))
assert.Equal(t, "278f2a7e763c73dedc064b965d2cfbed", out.ResponseHeaders.Get("stream-media-id"))
assert.Equal(t, "1.0.0", out.ResponseHeaders.Get("Tus-Resumable"))
}

params.DirectUserUpload = true
out, err = client.StreamInitiateTUSVideoUpload(context.Background(), AccountIdentifier(testAccountID), params)
if assert.NoError(t, err) {
assert.Equal(t, "https://upload.videodelivery.net/tus/90c68cb5cd4fd5350b1962279c90bec0?tusv2=true", out.ResponseHeaders.Get("Location"))
assert.Equal(t, "278f2a7e763c73dedc064b965d2cfbed", out.ResponseHeaders.Get("stream-media-id"))
assert.Equal(t, "1.0.0", out.ResponseHeaders.Get("Tus-Resumable"))
}
}
1 change: 0 additions & 1 deletion workers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1388,7 +1388,6 @@ func TestUploadWorker_UnsafeBinding(t *testing.T) {
assert.Equal(t, "dynamic_dispatch", mpUpload.BindingMeta["b1"]["type"])

w.Header().Set("content-type", "application/json")
fmt.Println(workersScriptResponse(t))
fmt.Fprint(w, workersScriptResponse(t))
}
mux.HandleFunc("/accounts/"+testAccountID+"/workers/scripts/bar", handler)
Expand Down

0 comments on commit 68ee8c1

Please sign in to comment.