feat(storage): support gzipping files before uploading them (#714)
Fixes #45

Co-authored-by: fbalboaDialpad <fernando.balboa@dialpad.com>
TheKevJames and fbalboaDialpad committed Apr 4, 2024
1 parent e89a751 commit 1aa9d67
Showing 9 changed files with 152 additions and 47 deletions.
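
Roughly, the new surface this commit adds looks as follows. This is a sketch only: the bucket name, object name, and credentials path are placeholders, and the imports assume the asyncio (gcloud-aio) build.

    import gzip

    from aiohttp import ClientSession
    from gcloud.aio.storage import Storage

    async def demo() -> None:
        async with ClientSession() as session:
            storage = Storage(service_file='creds.json', session=session)

            # New: compress the payload client-side before uploading; the
            # object is stored with contentEncoding: gzip.
            await storage.upload('my-bucket', 'hello.txt', 'hello', zipped=True)

            # Default download path: the data comes back decompressed.
            assert await storage.download('my-bucket', 'hello.txt') == b'hello'

            # Also new: request the stored (compressed) bytes and decompress
            # them manually.
            blob = await storage.get_bucket('my-bucket').get_blob('hello.txt')
            raw = await blob.download(auto_decompress=False)
            assert gzip.decompress(raw) == b'hello'
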
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -133,7 +133,7 @@ repos:
         name: mypy-storage
         additional_dependencies:
         - aiohttp
-        - gcloud-aio-auth
+        - gcloud-aio-auth>=5.3.0
         - rsa
         - types-aiofiles
         - types-requests
2 changes: 1 addition & 1 deletion auth/gcloud/aio/auth/session.py
@@ -344,7 +344,7 @@ async def get(
         if not auto_decompress and not stream:
             warnings.warn(
                 'the requests library always decompresses responses when '
-                'outside of streaming mode; when audo_decompress is '
+                'outside of streaming mode; when auto_decompress is '
                 'False, stream = True must also be set',
                 UserWarning,
             )
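
For context on the warning fixed above: in the requests-based (gcloud-rest) build, the library always decompresses response bodies outside of streaming mode, so fetching raw compressed bytes requires opting into streaming as well. A sketch of the wrong and right calls, with session and url as placeholders:

    # Triggers the UserWarning above: requests has already decompressed the
    # body by the time it is read.
    response = await session.get(url, auto_decompress=False)

    # Correct: combine auto_decompress=False with stream=True and read the
    # undecoded bytes from the raw stream.
    response = await session.get(url, auto_decompress=False, stream=True)
    raw_bytes = response.raw.read()
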
3 changes: 3 additions & 0 deletions storage/gcloud/aio/storage/blob.py
@@ -99,12 +99,15 @@ def chunk_size(self) -> int:
     async def download(
         self, timeout: int = DEFAULT_TIMEOUT,
         session: Optional[Session] = None,
+        auto_decompress: bool = True,
     ) -> Any:
+        headers = None if auto_decompress else {'accept-encoding': 'gzip'}
         return await self.bucket.storage.download(
             self.bucket.name,
             self.name,
             timeout=timeout,
             session=session,
+            headers=headers,
         )
 
     async def upload(
79 changes: 65 additions & 14 deletions storage/gcloud/aio/storage/storage.py
@@ -1,5 +1,6 @@
 import binascii
 import enum
+import gzip
 import io
 import json
 import logging
@@ -415,12 +416,20 @@ async def upload(
         metadata: Optional[Dict[str, Any]] = None,
         session: Optional[Session] = None,
         force_resumable_upload: Optional[bool] = None,
+        zipped: bool = False,
         timeout: int = 30,
     ) -> Dict[str, Any]:
         url = f'{self._api_root_write}/{bucket}/o'
 
         stream = self._preprocess_data(file_data)
 
+        parameters = parameters or {}
+        if zipped:
+            parameters['contentEncoding'] = 'gzip'
+            # Here we load the file-like object data into memory in chunks and
+            # re-write it compressed. This is implemented like this so we don't
+            # load the whole file into memory at once.
+            stream = self._compress_file_in_chunks(input_stream=stream)
+
         if BUILD_GCLOUD_REST and isinstance(stream, io.StringIO):
             # HACK: `requests` library does not accept `str` as `data` in `put`
             # HTTP request.
@@ -431,8 +440,6 @@
         # mime detection method same as in aiohttp 3.4.4
         content_type = content_type or mimetypes.guess_type(object_name)[0]
 
-        parameters = parameters or {}
-
         headers = headers or {}
         headers.update(await self._headers())
         headers.update({
@@ -498,6 +505,34 @@ def _preprocess_data(data: Any) -> IO[Any]:
 
         raise TypeError(f'unsupported upload type: "{type(data)}"')
 
+    @staticmethod
+    def _compress_file_in_chunks(input_stream: IO[AnyStr],
+                                 chunk_size: int = 8192) -> IO[bytes]:
+        """
+        Reads the contents of input_stream in chunks and returns a new
+        stream containing the gzip-compressed data. The chunk size is 8 KiB
+        by default, which is a standard filesystem block size.
+        """
+        compressed_stream = io.BytesIO()
+
+        with gzip.open(compressed_stream, 'wb') as gzipped_file:
+            chunk_bytes: bytes
+            while True:
+                chunk = input_stream.read(chunk_size)
+                if not chunk:
+                    break
+                if isinstance(chunk, str):
+                    chunk_bytes = chunk.encode('utf-8')
+                else:
+                    chunk_bytes = chunk
+
+                gzipped_file.write(chunk_bytes)
+
+        # After finishing writing, reset the buffer position so it can be read
+        compressed_stream.seek(0)
+
+        return compressed_stream
+
     @staticmethod
     def _decide_upload_type(
         force_resumable_upload: Optional[bool],
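
As a sanity check on the technique above, here is a stdlib-only round trip showing that both str and bytes input streams come back as the original bytes after decompression (the helper name is illustrative, not part of the library):

    import gzip
    import io

    def compress_in_chunks(input_stream, chunk_size=8192):
        # Same approach as _compress_file_in_chunks: read fixed-size chunks,
        # encode str chunks as UTF-8, and append them to an in-memory gzip
        # stream.
        out = io.BytesIO()
        with gzip.open(out, 'wb') as gz:
            while True:
                chunk = input_stream.read(chunk_size)
                if not chunk:
                    break
                gz.write(chunk.encode('utf-8') if isinstance(chunk, str) else chunk)
        out.seek(0)
        return out

    assert gzip.decompress(compress_in_chunks(io.StringIO('hi')).read()) == b'hi'
    assert gzip.decompress(compress_in_chunks(io.BytesIO(b'hi')).read()) == b'hi'
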
@@ -553,19 +588,35 @@ async def _download(
         headers = headers or {}
         headers.update(await self._headers())
 
+        # aiohttp and requests automatically decompress the body if this
+        # argument is not passed. We assume that if the Accept-Encoding header
+        # is present, then the client will handle the decompression
+        auto_decompress = 'accept-encoding' not in {k.lower() for k in headers}
+
         s = AioSession(session) if session else self.session
-        response = await s.get(
-            url, headers=headers, params=params or {},
-            timeout=timeout,
-        )
 
-        # N.B. the GCS API sometimes returns 'application/octet-stream' when a
-        # string was uploaded. To avoid potential weirdness, always return a
-        # bytes object.
-        try:
-            data: bytes = await response.read()
-        except (AttributeError, TypeError):
-            data = response.content  # type: ignore[assignment]
+        data: bytes
+        if not auto_decompress and BUILD_GCLOUD_REST:
+            # Requests lib has a different way of reading compressed data. We
+            # must pass the stream=True argument and read the response using
+            # the 'raw' property.
+            response = await s.get(
+                url, headers=headers, params=params or {},
+                timeout=timeout, stream=True,
+            )
+            data = response.raw.read()  # type: ignore[attr-defined]
+        else:
+            response = await s.get(
+                url, headers=headers, params=params or {},
+                timeout=timeout, auto_decompress=auto_decompress,
+            )
+            # N.B. the GCS API sometimes returns 'application/octet-stream'
+            # when a string was uploaded. To avoid potential weirdness, always
+            # return a bytes object.
+            try:
+                data = await response.read()
+            except (AttributeError, TypeError):
+                data = response.content  # type: ignore[assignment]
 
         return data
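
The branching above hinges on a single rule: if the caller supplied an Accept-Encoding header in any casing, the client skips decompression and returns the stored bytes as-is. A standalone sketch of that check:

    def wants_compressed(headers: dict) -> bool:
        # Mirrors the check in _download: normalize header names to
        # lowercase, then look for accept-encoding.
        return 'accept-encoding' in {k.lower() for k in headers}

    assert wants_compressed({'Accept-Encoding': 'gzip'})
    assert wants_compressed({'ACCEPT-ENCODING': 'gzip'})
    assert not wants_compressed({'Content-Type': 'text/plain'})
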
4 changes: 2 additions & 2 deletions storage/poetry.lock

Some generated files are not rendered by default.

38 changes: 19 additions & 19 deletions storage/poetry.rest.lock
@@ -277,7 +277,7 @@ test = ["pytest (>=6)"]
 
 [[package]]
 name = "gcloud-rest-auth"
-version = "5.2.1"
+version = "5.3.0"
 description = "Python Client for Google Cloud Auth"
 optional = false
 python-versions = ">= 3.8, < 4.0"
@@ -345,38 +345,38 @@ testing = ["pytest", "pytest-benchmark"]
 
 [[package]]
 name = "pyasn1"
-version = "0.5.1"
+version = "0.6.0"
 description = "Pure-Python implementation of ASN.1 types and DER/BER/CER codecs (X.208)"
 optional = false
-python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,>=2.7"
+python-versions = ">=3.8"
 files = [
-    {file = "pyasn1-0.5.1-py2.py3-none-any.whl", hash = "sha256:4439847c58d40b1d0a573d07e3856e95333f1976294494c325775aeca506eb58"},
-    {file = "pyasn1-0.5.1.tar.gz", hash = "sha256:6d391a96e59b23130a5cfa74d6fd7f388dbbe26cc8f1edf39fdddf08d9d6676c"},
+    {file = "pyasn1-0.6.0-py2.py3-none-any.whl", hash = "sha256:cca4bb0f2df5504f02f6f8a775b6e416ff9b0b3b16f7ee80b5a3153d9b804473"},
+    {file = "pyasn1-0.6.0.tar.gz", hash = "sha256:3a35ab2c4b5ef98e17dfdec8ab074046fbda76e281c5a706ccd82328cfc8f64c"},
 ]
 
 [[package]]
 name = "pyasn1-modules"
-version = "0.3.0"
+version = "0.4.0"
 description = "A collection of ASN.1-based protocols modules"
 optional = false
-python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,>=2.7"
+python-versions = ">=3.8"
 files = [
-    {file = "pyasn1_modules-0.3.0-py2.py3-none-any.whl", hash = "sha256:d3ccd6ed470d9ffbc716be08bd90efbd44d0734bc9303818f7336070984a162d"},
-    {file = "pyasn1_modules-0.3.0.tar.gz", hash = "sha256:5bd01446b736eb9d31512a30d46c1ac3395d676c6f3cafa4c03eb54b9925631c"},
+    {file = "pyasn1_modules-0.4.0-py3-none-any.whl", hash = "sha256:be04f15b66c206eed667e0bb5ab27e2b1855ea54a842e5037738099e8ca4ae0b"},
+    {file = "pyasn1_modules-0.4.0.tar.gz", hash = "sha256:831dbcea1b177b28c9baddf4c6d1013c24c3accd14a1873fffaa6a2e905f17b6"},
 ]
 
 [package.dependencies]
-pyasn1 = ">=0.4.6,<0.6.0"
+pyasn1 = ">=0.4.6,<0.7.0"
 
 [[package]]
 name = "pycparser"
-version = "2.21"
+version = "2.22"
 description = "C parser in Python"
 optional = false
-python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*"
+python-versions = ">=3.8"
 files = [
-    {file = "pycparser-2.21-py2.py3-none-any.whl", hash = "sha256:8ee45429555515e1f6b185e78100aea234072576aa43ab53aefcae078162fca9"},
-    {file = "pycparser-2.21.tar.gz", hash = "sha256:e644fdec12f7872f86c58ff790da456218b10f863970249516d60a5eaca77206"},
+    {file = "pycparser-2.22-py3-none-any.whl", hash = "sha256:c3702b6d3dd8c7abc1afa565d7e63d53a1d0bd86cdc24edd75470f4de499cfcc"},
+    {file = "pycparser-2.22.tar.gz", hash = "sha256:491c8be9c040f5390f5bf44a5b07752bd07f56edf992381b05c701439eec10f6"},
 ]
 
 [[package]]
@@ -420,17 +420,17 @@ testing = ["argcomplete", "attrs (>=19.2)", "hypothesis (>=3.56)", "mock", "pygm
 
 [[package]]
 name = "pytest-mock"
-version = "3.12.0"
+version = "3.14.0"
 description = "Thin-wrapper around the mock package for easier use with pytest"
 optional = false
 python-versions = ">=3.8"
 files = [
-    {file = "pytest-mock-3.12.0.tar.gz", hash = "sha256:31a40f038c22cad32287bb43932054451ff5583ff094bca6f675df2f8bc1a6e9"},
-    {file = "pytest_mock-3.12.0-py3-none-any.whl", hash = "sha256:0972719a7263072da3a21c7f4773069bcc7486027d7e8e1f81d98a47e701bc4f"},
+    {file = "pytest-mock-3.14.0.tar.gz", hash = "sha256:2719255a1efeceadbc056d6bf3df3d1c5015530fb40cf347c0f9afac88410bd0"},
+    {file = "pytest_mock-3.14.0-py3-none-any.whl", hash = "sha256:0b72c38033392a5f4621342fe11e9219ac11ec9d375f8e2a0c164539e0d70f6f"},
 ]
 
 [package.dependencies]
-pytest = ">=5.0"
+pytest = ">=6.2.5"
 
 [package.extras]
 dev = ["pre-commit", "pytest-asyncio", "tox"]
@@ -501,4 +501,4 @@ zstd = ["zstandard (>=0.18.0)"]
 [metadata]
 lock-version = "2.0"
 python-versions = ">= 3.8, < 4.0"
-content-hash = "2216a97eb9dfd593e72cb604ba23caafdd20c7ad74ac2ce1e75b5ebc70e73fe2"
+content-hash = "78c6b9c52bc39c96527df352de8d28f2c2af27c073ef9ee6dfcef5adc399dcf9"
10 changes: 5 additions & 5 deletions storage/pyproject.rest.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "gcloud-rest-storage"
-version = "9.2.0"
+version = "9.3.0"
 description = "Python Client for Google Cloud Storage"
 readme = "README.rst"
 
@@ -22,15 +22,15 @@ classifiers = [
 [tool.poetry.dependencies]
 python = ">= 3.8, < 4.0"
 # aiofiles = ">= 0.6.0, < 24.0.0"
-gcloud-rest-auth = ">= 3.6.0, < 6.0.0"
-pyasn1-modules = ">= 0.2.1, < 0.4.0"
+gcloud-rest-auth = ">= 5.3.0, < 6.0.0"
+pyasn1-modules = ">= 0.2.1, < 0.4.1"
 rsa = ">= 3.1.4, < 5.0.0"
 
 [tool.poetry.group.dev.dependencies]
 gcloud-rest-auth = { path = "../auth" }
 pytest = "8.1.1"
-# pytest-asyncio = "0.23.5"
-pytest-mock = "3.12.0"
+# pytest-asyncio = "0.23.6"
+pytest-mock = "3.14.0"
 
 [[tool.poetry.source]]
 name = "pypi"
4 changes: 2 additions & 2 deletions storage/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "gcloud-aio-storage"
-version = "9.2.0"
+version = "9.3.0"
 description = "Python Client for Google Cloud Storage"
 readme = "README.rst"
 
@@ -22,7 +22,7 @@ classifiers = [
 [tool.poetry.dependencies]
 python = ">= 3.8, < 4.0"
 aiofiles = ">= 0.6.0, < 24.0.0"
-gcloud-aio-auth = ">= 3.6.0, < 6.0.0"
+gcloud-aio-auth = ">= 5.3.0, < 6.0.0"
 pyasn1-modules = ">= 0.2.1, < 0.4.1"
 rsa = ">= 3.1.4, < 5.0.0"
57 changes: 54 additions & 3 deletions storage/tests/integration/smoke_test.py
@@ -1,3 +1,4 @@
+import gzip
 import json
 import uuid
 
@@ -40,18 +41,18 @@ async def test_object_life_cycle(
         bucket = storage.get_bucket(bucket_name)
         blob = await bucket.get_blob(object_name)
         constructed_result = await blob.download()
-        assert constructed_result == expected_data
+        _assert_expected_data(expected_data, constructed_result)
 
         direct_result = await storage.download(bucket_name, object_name)
-        assert direct_result == expected_data
+        _assert_expected_data(expected_data, direct_result)
 
         await storage.copy(
             bucket_name, object_name, bucket_name,
             new_name=copied_object_name,
         )
 
         direct_result = await storage.download(bucket_name, copied_object_name)
-        assert direct_result == expected_data
+        _assert_expected_data(expected_data, direct_result)
 
         await storage.delete(bucket_name, object_name)
         await storage.delete(bucket_name, copied_object_name)
@@ -61,3 +62,53 @@ async def test_object_life_cycle(
 
     with pytest.raises(ResponseError):
         await storage.download(bucket_name, copied_object_name)
+
+
+@pytest.mark.asyncio
+@pytest.mark.parametrize(
+    'uploaded_data,expected_data,file_extension', [
+        ('test', b'test', 'txt'),
+        (b'test', b'test', 'bin'),
+        (
+            json.dumps({'data': 1}), json.dumps(
+                {'data': 1},
+            ).encode('utf-8'), 'json',
+        ),
+    ],
+)
+async def test_zipped_upload(
+        bucket_name, creds, uploaded_data,
+        expected_data, file_extension,
+):
+    object_name = f'{uuid.uuid4().hex}/{uuid.uuid4().hex}.{file_extension}'
+
+    async with Session() as session:
+        storage = Storage(service_file=creds, session=session)
+        await storage.upload(bucket_name, object_name, uploaded_data,
+                             zipped=True)
+
+        bucket = storage.get_bucket(bucket_name)
+        blob = await bucket.get_blob(object_name)
+
+        # Download file from GCS without the Accept-Encoding: gzip. The
+        # data will be served uncompressed by GCS
+        constructed_result = await blob.download()
+        _assert_expected_data(expected_data, constructed_result)
+
+        # Specify that the file should be downloaded compressed
+        constructed_result = await blob.download(auto_decompress=False)
+        _assert_expected_data(expected_data, constructed_result,
+                              compressed=True)
+
+        # Do the same but using the storage directly
+        direct_result = await storage.download(bucket_name, object_name)
+        _assert_expected_data(expected_data, direct_result)
+
+        direct_result = await storage.download(
+            bucket_name, object_name, headers={'Accept-Encoding': 'gzip'})
+        _assert_expected_data(expected_data, direct_result, compressed=True)
+
+
+def _assert_expected_data(expected_data, actual_data, compressed=False):
+    actual_data = gzip.decompress(actual_data) if compressed else actual_data
+    assert expected_data == actual_data
