From 7beb99dc2058da220f1f69d531978db917707e8b Mon Sep 17 00:00:00 2001 From: Dmitry <58846611+dmitry-fa@users.noreply.github.com> Date: Wed, 1 Apr 2020 14:19:27 +0300 Subject: [PATCH] feat: add upload functionality (#214) feat: add upload functionality * feat: add upload functionality * fix: review comments --- .../cloud/storage/StorageOperations.java | 188 ++++++++++++++++++ .../cloud/storage/StorageOperationsTest.java | 173 ++++++++++++++++ .../cloud/storage/it/ITStorageTest.java | 44 ++++ 3 files changed, 405 insertions(+) create mode 100644 google-cloud-storage/src/main/java/com/google/cloud/storage/StorageOperations.java create mode 100644 google-cloud-storage/src/test/java/com/google/cloud/storage/StorageOperationsTest.java diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageOperations.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageOperations.java new file mode 100644 index 000000000..6ff178ff6 --- /dev/null +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageOperations.java @@ -0,0 +1,188 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.google.cloud.storage; + +import com.google.cloud.WriteChannel; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; +import java.nio.file.Files; +import java.nio.file.Path; + +/** Utility methods to perform various operations with the Storage such as upload. */ +public final class StorageOperations { + + private final Storage storage; + private static final int DEFAULT_BUFFER_SIZE = 15 * 1024 * 1024; + private static final int MIN_BUFFER_SIZE = 256 * 1024; + + /** + * Creates a new StorageOperations instance associated with the given storage. + * + * @param storage the Storage + */ + public StorageOperations(Storage storage) { + this.storage = storage; + } + + /** + * Uploads {@code path} to the blob using {@link Storage#writer}. By default any MD5 and CRC32C + * values in the given {@code blobInfo} are ignored unless requested via the {@link + * Storage.BlobWriteOption#md5Match()} and {@link Storage.BlobWriteOption#crc32cMatch()} options. + * Folder upload is not supported. + * + *

Example of uploading a file: + * + *

{@code
+   * String bucketName = "my-unique-bucket";
+   * String fileName = "readme.txt";
+   * BlobId blobId = BlobId.of(bucketName, fileName);
+   * BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType("text/plain").build();
+   * new StorageOperations(storage).upload(blobInfo, Paths.get(fileName));
+   * }
+ * + * @param blobInfo blob to create + * @param path file to upload + * @param options blob write options + * @throws IOException on I/O error + * @throws StorageException on failure + * @see #upload(BlobInfo, Path, int, Storage.BlobWriteOption...) + */ + public void upload(BlobInfo blobInfo, Path path, Storage.BlobWriteOption... options) + throws IOException { + upload(blobInfo, path, DEFAULT_BUFFER_SIZE, options); + } + + /** + * Uploads {@code path} to the blob using {@link Storage#writer} and {@code bufferSize}. By + * default any MD5 and CRC32C values in the given {@code blobInfo} are ignored unless requested + * via the {@link Storage.BlobWriteOption#md5Match()} and {@link + * Storage.BlobWriteOption#crc32cMatch()} options. Folder upload is not supported. + * + *

{@link #upload(BlobInfo, Path, Storage.BlobWriteOption...)} invokes this method with a + * buffer size of 15 MiB. Users can pass alternative values. Larger buffer sizes might improve the + * upload performance but require more memory. This can cause an OutOfMemoryError or add + * significant garbage collection overhead. Smaller buffer sizes reduce memory consumption, that + * is noticeable when uploading many objects in parallel. Buffer sizes less than 256 KiB are + * treated as 256 KiB. + * + *

Example of uploading a humongous file: + * + *

{@code
+   * BlobId blobId = BlobId.of(bucketName, blobName);
+   * BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType("video/webm").build();
+   *
+   * int largeBufferSize = 150 * 1024 * 1024;
+   * Path file = Paths.get("humongous.file");
+   * new StorageOperations(storage).upload(blobInfo, file, largeBufferSize);
+   * }
+ * + * @param blobInfo blob to create + * @param path file to upload + * @param bufferSize size of the buffer for I/O operations + * @param options blob write options + * @throws IOException on I/O error + * @throws StorageException on failure + */ + public void upload( + BlobInfo blobInfo, Path path, int bufferSize, Storage.BlobWriteOption... options) + throws IOException { + if (Files.isDirectory(path)) { + throw new StorageException(0, path + " is a directory"); + } + try (InputStream input = Files.newInputStream(path)) { + upload(blobInfo, input, bufferSize, options); + } + } + + /** + * Reads bytes from an input stream and uploads those bytes to the blob using {@link + * Storage#writer}. By default any MD5 and CRC32C values in the given {@code blobInfo} are ignored + * unless requested via the {@link Storage.BlobWriteOption#md5Match()} and {@link + * Storage.BlobWriteOption#crc32cMatch()} options. + * + *

Example of uploading data with CRC32C checksum: + * + *

{@code
+   * BlobId blobId = BlobId.of(bucketName, blobName);
+   * byte[] content = "Hello, world".getBytes(StandardCharsets.UTF_8);
+   * Hasher hasher = Hashing.crc32c().newHasher().putBytes(content);
+   * String crc32c = BaseEncoding.base64().encode(Ints.toByteArray(hasher.hash().asInt()));
+   * BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setCrc32c(crc32c).build();
+   * new StorageOperations(storage).upload(blobInfo, new ByteArrayInputStream(content),
+   *     Storage.BlobWriteOption.crc32cMatch());
+   * }
+ * + * @param blobInfo blob to create + * @param content input stream to read from + * @param options blob write options + * @throws IOException on I/O error + * @throws StorageException on failure + * @see #upload(BlobInfo, InputStream, int, Storage.BlobWriteOption...) + */ + public void upload(BlobInfo blobInfo, InputStream content, Storage.BlobWriteOption... options) + throws IOException { + upload(blobInfo, content, DEFAULT_BUFFER_SIZE, options); + } + + /** + * Reads bytes from an input stream and uploads those bytes to the blob using {@link + * Storage#writer} and {@code bufferSize}. By default any MD5 and CRC32C values in the given + * {@code blobInfo} are ignored unless requested via the {@link + * Storage.BlobWriteOption#md5Match()} and {@link Storage.BlobWriteOption#crc32cMatch()} options. + * + *

{@link #upload(BlobInfo, InputStream, Storage.BlobWriteOption...)} invokes this method + * with a buffer size of 15 MiB. Users can pass alternative values. Larger buffer sizes might + * improve the upload performance but require more memory. This can cause an OutOfMemoryError or + * add significant garbage collection overhead. Smaller buffer sizes reduce memory consumption, + * that is noticeable when uploading many objects in parallel. Buffer sizes less than 256 KiB are + * treated as 256 KiB. + * + * @param blobInfo blob to create + * @param content input stream to read from + * @param bufferSize size of the buffer for I/O operations + * @param options blob write options + * @throws IOException on I/O error + * @throws StorageException on failure + */ + public void upload( + BlobInfo blobInfo, InputStream content, int bufferSize, Storage.BlobWriteOption... options) + throws IOException { + try (WriteChannel writer = storage.writer(blobInfo, options)) { + upload(Channels.newChannel(content), writer, bufferSize); + } + } + + /* + * Uploads the given content to the storage using specified write channel and the given buffer + * size. This method does not close any channels. 
+ */ + private static void upload(ReadableByteChannel reader, WriteChannel writer, int bufferSize) + throws IOException { + bufferSize = Math.max(bufferSize, MIN_BUFFER_SIZE); + ByteBuffer buffer = ByteBuffer.allocate(bufferSize); + writer.setChunkSize(bufferSize); + + while (reader.read(buffer) >= 0) { + buffer.flip(); + writer.write(buffer); + buffer.clear(); + } + } +} diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/StorageOperationsTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/StorageOperationsTest.java new file mode 100644 index 000000000..0651c1ac1 --- /dev/null +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/StorageOperationsTest.java @@ -0,0 +1,173 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.google.cloud.storage; + +import static org.easymock.EasyMock.anyObject; +import static org.easymock.EasyMock.createStrictMock; +import static org.easymock.EasyMock.eq; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.verify; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.fail; + +import com.google.cloud.WriteChannel; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.nio.file.Path; +import java.nio.file.Paths; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class StorageOperationsTest { + private Storage storage; + private StorageOperations storageOperations; + + private static final BlobInfo BLOB_INFO = BlobInfo.newBuilder("b", "n").build(); + private static final int DEFAULT_BUFFER_SIZE = 15 * 1024 * 1024; + private static final int MIN_BUFFER_SIZE = 256 * 1024; + + @Before + public void setUp() { + storage = createStrictMock(Storage.class); + storageOperations = new StorageOperations(storage); + } + + @After + public void tearDown() throws Exception { + verify(storage); + } + + @Test + public void testUploadFromNonExistentFile() { + replay(storage); + String fileName = "non_existing_file.txt"; + try { + storageOperations.upload(BLOB_INFO, Paths.get(fileName)); + storageOperations.upload(BLOB_INFO, Paths.get(fileName), -1); + fail(); + } catch (IOException e) { + assertEquals(NoSuchFileException.class, e.getClass()); + assertEquals(fileName, e.getMessage()); + } + } + + @Test + public void testUploadFromDirectory() throws IOException { + replay(storage); + Path dir = Files.createTempDirectory("unit_"); + try { + storageOperations.upload(BLOB_INFO, dir); + storageOperations.upload(BLOB_INFO, dir, -2); 
+ fail(); + } catch (StorageException e) { + assertEquals(dir + " is a directory", e.getMessage()); + } + } + + private void prepareForUpload(BlobInfo blobInfo, byte[] bytes, Storage.BlobWriteOption... options) + throws Exception { + prepareForUpload(blobInfo, bytes, DEFAULT_BUFFER_SIZE, options); + } + + private void prepareForUpload( + BlobInfo blobInfo, byte[] bytes, int bufferSize, Storage.BlobWriteOption... options) + throws Exception { + WriteChannel channel = createStrictMock(WriteChannel.class); + ByteBuffer expectedByteBuffer = ByteBuffer.wrap(bytes, 0, bytes.length); + channel.setChunkSize(bufferSize); + expect(channel.write(expectedByteBuffer)).andReturn(bytes.length); + channel.close(); + replay(channel); + expect(storage.writer(blobInfo, options)).andReturn(channel); + replay(storage); + } + + @Test + public void testUploadFromFile() throws Exception { + byte[] dataToSend = {1, 2, 3}; + prepareForUpload(BLOB_INFO, dataToSend); + Path tempFile = Files.createTempFile("testUpload", ".tmp"); + Files.write(tempFile, dataToSend); + storageOperations.upload(BLOB_INFO, tempFile); + } + + @Test + public void testUploadFromStream() throws Exception { + byte[] dataToSend = {1, 2, 3, 4, 5}; + Storage.BlobWriteOption[] options = + new Storage.BlobWriteOption[] {Storage.BlobWriteOption.crc32cMatch()}; + prepareForUpload(BLOB_INFO, dataToSend, options); + InputStream input = new ByteArrayInputStream(dataToSend); + storageOperations.upload(BLOB_INFO, input, options); + } + + @Test + public void testUploadSmallBufferSize() throws Exception { + byte[] dataToSend = new byte[100_000]; + prepareForUpload(BLOB_INFO, dataToSend, MIN_BUFFER_SIZE); + InputStream input = new ByteArrayInputStream(dataToSend); + int smallBufferSize = 100; + storageOperations.upload(BLOB_INFO, input, smallBufferSize); + } + + @Test + public void testUploadFromIOException() throws Exception { + IOException ioException = new IOException("message"); + WriteChannel channel = 
createStrictMock(WriteChannel.class); + channel.setChunkSize(DEFAULT_BUFFER_SIZE); + expect(channel.write((ByteBuffer) anyObject())).andThrow(ioException); + replay(channel); + expect(storage.writer(eq(BLOB_INFO))).andReturn(channel); + replay(storage); + InputStream input = new ByteArrayInputStream(new byte[10]); + try { + storageOperations.upload(BLOB_INFO, input); + fail(); + } catch (IOException e) { + assertSame(e, ioException); + } + } + + @Test + public void testUploadMultiplePortions() throws Exception { + int totalSize = 400_000; + int bufferSize = 300_000; + byte[] dataToSend = new byte[totalSize]; + dataToSend[0] = 42; + dataToSend[bufferSize] = 43; + + WriteChannel channel = createStrictMock(WriteChannel.class); + channel.setChunkSize(bufferSize); + expect(channel.write(ByteBuffer.wrap(dataToSend, 0, bufferSize))).andReturn(1); + expect(channel.write(ByteBuffer.wrap(dataToSend, bufferSize, totalSize - bufferSize))) + .andReturn(2); + channel.close(); + replay(channel); + expect(storage.writer(BLOB_INFO)).andReturn(channel); + replay(storage); + + InputStream input = new ByteArrayInputStream(dataToSend); + storageOperations.upload(BLOB_INFO, input, bufferSize); + } +} diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITStorageTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITStorageTest.java index 6abc233fb..094117929 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITStorageTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITStorageTest.java @@ -76,6 +76,7 @@ import com.google.cloud.storage.StorageBatchResult; import com.google.cloud.storage.StorageClass; import com.google.cloud.storage.StorageException; +import com.google.cloud.storage.StorageOperations; import com.google.cloud.storage.StorageOptions; import com.google.cloud.storage.StorageRoles; import com.google.cloud.storage.testing.RemoteStorageHelper; @@ -103,6 +104,8 @@ import 
java.net.URL; import java.net.URLConnection; import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.nio.file.Path; import java.security.Key; import java.util.ArrayList; import java.util.Arrays; @@ -131,6 +134,7 @@ public class ITStorageTest { private static RemoteStorageHelper remoteStorageHelper; private static Storage storage; + private static StorageOperations storageOperations; private static String kmsKeyOneResourcePath; private static String kmsKeyTwoResourcePath; private static Metadata requestParamsHeader = new Metadata(); @@ -175,6 +179,7 @@ public class ITStorageTest { public static void beforeClass() throws IOException { remoteStorageHelper = RemoteStorageHelper.create(); storage = remoteStorageHelper.getOptions().getService(); + storageOperations = new StorageOperations(storage); storage.create( BucketInfo.newBuilder(BUCKET) @@ -3208,4 +3213,43 @@ public void testBucketLogging() throws ExecutionException, InterruptedException RemoteStorageHelper.forceDelete(storage, loggingBucket, 5, TimeUnit.SECONDS); } } + + @Test + public void testUploadFromDownloadTo() throws Exception { + String blobName = "test-uploadFrom-downloadTo-blob"; + BlobId blobId = BlobId.of(BUCKET, blobName); + BlobInfo blobInfo = BlobInfo.newBuilder(blobId).build(); + + Path tempFileFrom = Files.createTempFile("ITStorageTest_", ".tmp"); + Files.write(tempFileFrom, BLOB_BYTE_CONTENT); + storageOperations.upload(blobInfo, tempFileFrom); + + Path tempFileTo = Files.createTempFile("ITStorageTest_", ".tmp"); + storage.get(blobId).downloadTo(tempFileTo); + byte[] readBytes = Files.readAllBytes(tempFileTo); + assertArrayEquals(BLOB_BYTE_CONTENT, readBytes); + } + + @Test + public void testUploadWithEncryption() throws Exception { + String blobName = "test-upload-withEncryption"; + BlobId blobId = BlobId.of(BUCKET, blobName); + BlobInfo blobInfo = BlobInfo.newBuilder(blobId).build(); + + ByteArrayInputStream content = new ByteArrayInputStream(BLOB_BYTE_CONTENT); + 
storageOperations.upload(blobInfo, content, Storage.BlobWriteOption.encryptionKey(KEY)); + + Blob blob = storage.get(blobId); + try { + blob.getContent(); + fail("StorageException was expected"); + } catch (StorageException e) { + String expectedMessage = + "The target object is encrypted by a customer-supplied encryption key."; + assertTrue(e.getMessage().contains(expectedMessage)); + assertEquals(400, e.getCode()); + } + byte[] readBytes = blob.getContent(Blob.BlobSourceOption.decryptionKey(KEY)); + assertArrayEquals(BLOB_BYTE_CONTENT, readBytes); + } }