feat(cloud): Adding an upload proxy node #103

Draft · wants to merge 11 commits into base: main
Conversation

@mariocynicys (Collaborator) commented Nov 12, 2021

This PR adds an upload proxy node: a state-aware node that handles uploading content to a remote location on behalf of Shaka Packager. It adds no new dependencies.

  • The base node of the upload proxy is HTTPUpload, which runs as a Python thread and manages an HTTPServer that Shaka Packager sends all of its requests to (a minimal sketch follows this list).
  • The internal HTTPServer instantiates request handlers for the requests coming from Shaka Packager. These handlers forward the incoming requests to the upload location and may add more headers to them.
  • Subclasses of HTTPUpload exist to support protocols other than HTTP/HTTPS for cloud providers and to extend HTTPUpload's capabilities, but they should all use an HTTP/HTTPS API after processing the upload URL.
  • Manifest files are temporarily stored in the filesystem in case of multi-period concatenation.
  • RequestBodyAsFileIO is a class that digests a request body incrementally without writing it to disk. The request handlers use it to forward a request body that lacks an EOF to the upload location.
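For illustration, here is a minimal sketch of that proxy pattern using only the standard library. The class names (`HTTPUploadSketch`, `_ForwardingHandler`), the `UPLOAD_BASE` URL, and the in-memory buffering are assumptions made for brevity, not the PR's actual implementation:

```python
import threading
from http.client import HTTPSConnection
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib.parse import urlparse

# Hypothetical upload destination; the real node gets this from the output URL.
UPLOAD_BASE = 'https://upload.example.com/bucket'


class _ForwardingHandler(BaseHTTPRequestHandler):
  """Handles one request from Shaka Packager and forwards it upstream."""

  def do_PUT(self):
    upstream = urlparse(UPLOAD_BASE)
    conn = HTTPSConnection(upstream.netloc)
    # For brevity this buffers the whole body in memory; the PR's
    # RequestBodyAsFileIO is meant to avoid exactly that.
    length = int(self.headers.get('Content-Length', 0))
    body = self.rfile.read(length)
    conn.request('PUT', upstream.path + self.path, body,
                 headers={'Content-Length': str(length)})
    res = conn.getresponse()
    # Relay the upstream status back to Shaka Packager.
    self.send_response(res.status)
    self.end_headers()


class HTTPUploadSketch(threading.Thread):
  """A thread that runs a local HTTPServer for Shaka Packager to write to."""

  def __init__(self):
    super().__init__(daemon=True)
    # Port 0 lets the OS pick a free port; Packager is then pointed at it.
    self.server = HTTPServer(('localhost', 0), _ForwardingHandler)

  def run(self):
    self.server.serve_forever()

  def stop(self):
    self.server.shutdown()
```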

Issues:

  • The GCS upload doesn't authenticate by itself and needs a refresh token from Google's OAuth 2.0 Playground (but the implementation is simpler this way).
  • S3 upload doesn't work yet because I haven't found the counterpart of Google's OAuth 2.0 Playground for AWS (if it exists).
  • This feature crashes on Windows (the upload node crashes) because of backslashes in the URL path sent to the upload proxy. This happens when Shaka Packager produces files matching the pattern somepath\stream_xx.m3u8, which we should be able to control with the --playlist_name argument, but for some reason Shaka Packager doesn't recognize it (a possible workaround is sketched after this list).
  • The node doesn't yet handle redirects.
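One possible workaround for the backslash issue above (an assumption on my part, not the PR's fix) is to normalize the path separators before the path is used as a URL:

```python
def normalize_url_path(path: str) -> str:
  """Replaces Windows-style backslashes so the proxy receives a valid URL path."""
  return path.replace('\\', '/')


assert normalize_url_path('somepath\\stream_xx.m3u8') == 'somepath/stream_xx.m3u8'
```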

- integrated the proxy node with the rest of the project
- did some refactoring
- got rid of the connection pool, as it didn't work well with HTTPS
This commit brings support for GCS uploads. URLs using the gs:// protocol will be handled by the `GCSUpload` class.

The `GCSUpload` class uses the refresh token passed in the extra headers to periodically refresh the short-lived access token. At the moment, the user needs to provide a client ID and client secret along with the refresh token. Creating a client ID and secret is done from the Google Cloud Console, an OAuth consent screen with write access to the bucket is also needed, and a refresh token can then be obtained from Google's OAuth 2.0 Playground. These details should probably be documented.
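A minimal sketch of that refresh flow against Google's OAuth 2.0 token endpoint, using only the standard library (the credential values are placeholders and error handling is omitted; this is not the PR's exact code):

```python
import json
from urllib.parse import urlencode
from urllib.request import Request, urlopen


def refresh_access_token(client_id: str, client_secret: str,
                         refresh_token: str) -> str:
  """Exchanges a long-lived refresh token for a short-lived access token."""
  body = urlencode({
      'grant_type': 'refresh_token',
      'client_id': client_id,
      'client_secret': client_secret,
      'refresh_token': refresh_token,
  }).encode('utf-8')
  request = Request(
      'https://oauth2.googleapis.com/token', data=body,
      headers={'Content-Type': 'application/x-www-form-urlencoded'})
  with urlopen(request) as response:
    return json.load(response)['access_token']
```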
TODO: we need to accept the extra headers from the command line in a more convenient way, like curl for instance.
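One possible shape for that, sketched with argparse; the key=value syntax is an assumption, though the -H/--add-header flag name matches the diff quoted later in this review:

```python
import argparse

parser = argparse.ArgumentParser()
# Repeatable, curl-style flag: -H "Authorization=Bearer xyz" -H "x-custom=1"
parser.add_argument('-H', '--add-header', action='append', default=[],
                    metavar='KEY=VALUE',
                    help='An extra header to send with every upload request.')


def parse_headers(raw_headers):
  """Turns ['key=value', ...] into a dict, splitting on the first '=' only."""
  return dict(header.split('=', 1) for header in raw_headers)


args = parser.parse_args(['-H', 'Authorization=Bearer abc123'])
print(parse_headers(args.add_header))  # {'Authorization': 'Bearer abc123'}
```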
Let's call this a working S3 upload functionality. It is not tested, and I don't know the steps to get a client ID and an access token as in GCS. This was written relying on Amazon's docs and needs to be tested.
Some comments and edits; also changed the way we accept headers from the command line. Since I now have a functioning S3 bucket, I tested uploading to it, but got an error about the `Transfer-Encoding: chunked` header that is used; will investigate this further.
This fixes the crash of the upload node on Windows for the HLS master playlist and the DASH manifest, as they now use forward slashes when the output location is a URL. It would still crash on Windows because we can't control the media playlists' names, which also use backslashes for path joining on Windows and break the HTTPServer listening to Shaka Packager. The --playlist_name argument should fix this, but Shaka Packager doesn't recognize it :/
@joeyparrish (Member) left a comment

I'm going to play with this locally for a bit. I might be able to make some concrete suggestions or contribute to your branch.

@@ -320,6 +320,11 @@ class PipelineConfig(configuration.Base):
default=EncryptionConfig({})).cast()
"""Encryption settings."""

use_local_proxy = configuration.Field(bool, default=True).cast()
@joeyparrish (Member)

It seems like we should do without this by detecting the cloud provider from the output URL.

Alternately, we could make it a command-line argument, parallel to the output URL. It is weird to have one part of the cloud config on the command-line, and another necessary part in the pipeline config.
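For reference, a sketch of what detecting the provider from the output URL might look like; the scheme-to-class mapping is an assumption, and S3Upload is a hypothetical name since the PR's S3 class isn't quoted here:

```python
from urllib.parse import urlparse


def choose_upload_node(output_url: str) -> str:
  """Picks an upload handler class based on the URL scheme alone."""
  scheme = urlparse(output_url).scheme
  if scheme == 'gs':
    return 'GCSUpload'
  if scheme == 's3':
    return 'S3Upload'  # hypothetical class name
  if scheme in ('http', 'https'):
    return 'HTTPUpload'
  raise ValueError('Not an upload URL: ' + output_url)


assert choose_upload_node('gs://my_gcs_bucket/folder/') == 'GCSUpload'
```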

encode_chunked = True
else:
content_length = content_length and int(content_length)
body = RequestBodyAsFileIO(self.rfile, content_length)
@joeyparrish (Member)

I'm surprised that any of this is needed: either the file IO wrapper or the temporary writing of chunks to the FS. Also, writing chunks to the FS means the output is no longer being streamed to chunk-capable clouds like Amazon's. Ideally, we would not buffer a whole segment before transmitting it.

I'm going to experiment with this a bit locally and see what I can do.

@mariocynicys (Collaborator, Author)

Here we write only the manifests to the FS so we can perform period concatenation on them (note: period concatenation is carried out by the streamer, not Packager; if the manifests were pushed to the remote URL straight away, we wouldn't be able to perform period concatenation). We won't write media segments to the FS, though.

But, re-looking at this, I think I over-complicated it (to avoid introducing new dependencies, I believe). I will look into a package that supports streaming to carry this out (probably requests).
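If requests is adopted, streaming the body straight through rather than buffering a whole segment could look roughly like this (a sketch, not the eventual implementation; upload_url and headers would come from the surrounding node):

```python
import requests


def forward_put(upload_url: str, rfile, headers: dict) -> int:
  """Streams the body Shaka Packager is sending us straight to the cloud.

  rfile is the handler's incoming body stream. Passing a file-like object
  whose length requests cannot determine makes it use chunked transfer
  encoding, so no whole-segment buffering is needed.
  """
  response = requests.put(upload_url, data=rfile, headers=headers)
  return response.status_code
```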

# Respond to Shaka Packager with the response we got.
self.send_response(res.status)
self.end_headers()
# self.wfile.write(res.read())
@joeyparrish (Member)

Cruft; please drop

print("Couldn't refresh access token. ErrCode: {}, ErrMst: {!r}".format(
res.status, res.read()))
else:
print("Non sufficient info provided to refresh the access token.")
@joeyparrish (Member)

English nit: "Insufficient"

parser.add_argument('-o', '--output',
default='output_files',
help='The output folder to write files to, or an HTTP ' +
'or HTTPS URL where files will be PUT.' +
'Used even if uploading to cloud storage.')
parser.add_argument('-H', '--add-header',
@joeyparrish (Member)

I found it difficult to figure out how to get the necessary headers for a refresh token. I can enable oauth2 in GCP, get a client-id and client-secret, but I don't understand how to get an initial refresh-token in an easy way.

Here are some ideas:

  1. Streamer could get the initial refresh token for me, just based on client-id and client-secret
  2. There would be docs (maybe in the examples of docs/source/overview.rst) showing how to get a refresh-token
  3. We could use a library like google-auth to handle the authentication (haven't found the s3 equivalent yet)

I found a quick and easy way to make it work with this code, with gcloud already installed and authenticated, but I don't think it's a "best practice":

  -H "Authorization=Bearer $(gcloud auth print-access-token)" \
  -o gs://my_gcs_bucket/folder/

It seems like this might be equivalent to using "application default credentials" in google-auth:

import google.auth
credentials, project = google.auth.default()
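Continuing that thought, application default credentials can be refreshed into a bearer token that the proxy could attach to upstream requests; this is standard google-auth usage rather than code from this PR:

```python
import google.auth
from google.auth.transport.requests import Request

# Application default credentials, scoped for Cloud Storage writes.
credentials, project = google.auth.default(
    scopes=['https://www.googleapis.com/auth/devstorage.read_write'])
credentials.refresh(Request())  # populates credentials.token
authorization_header = 'Bearer ' + credentials.token
```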

@mariocynicys (Collaborator, Author)

It is indeed not user-friendly and needs a rework. I would go for the option of using a client ID + secret; the third option is ambiguous to me right now, but I will look into it.

BTW, I was getting refresh tokens from Google's OAuth 2.0 Playground.

@joeyparrish (Member)

My -H "Authorization=Bearer $(gcloud auth print-access-token)" hack failed after about one hour of running. I started getting HTTP 429 responses from Google Cloud, which makes me think it wasn't an issue with the token itself, but something more fundamental we will need to resolve.

Google's docs say:

If you run into any issues such as increased latency or error rates, pause your ramp-up or reduce the request rate temporarily in order to give Cloud Storage more time to scale your bucket. You should use exponential backoff to retry your requests when:

  • Receiving errors with 408 and 429 response codes.
  • Receiving errors with 5xx response codes.
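A minimal sketch of that retry policy for the proxy's upstream PUTs; the delays and attempt count are arbitrary choices, not values from this PR:

```python
import random
import time

RETRIABLE_STATUSES = {408, 429} | set(range(500, 600))


def put_with_backoff(send_put, max_attempts=5):
  """Retries send_put() with exponential backoff on 408/429/5xx responses."""
  for attempt in range(max_attempts):
    status = send_put()
    if status not in RETRIABLE_STATUSES:
      return status
    # 1s, 2s, 4s, ... plus jitter, per the recommendation quoted above.
    time.sleep(2 ** attempt + random.random())
  return status
```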
