Skip to content

Commit

Permalink
Revert "feat: add blob.uploadfrom(inputstream) (#162)"
Browse files Browse the repository at this point in the history
This reverts commit 1f53baa.
  • Loading branch information
frankyn committed Mar 17, 2020
1 parent b61a820 commit ce17b86
Show file tree
Hide file tree
Showing 3 changed files with 3 additions and 283 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,9 @@
import com.google.common.io.BaseEncoding;
import com.google.common.io.CountingOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.OutputStream;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.Key;
Expand Down Expand Up @@ -75,8 +73,7 @@ public Blob apply(Tuple<Storage, StorageObject> pb) {
}
};

private static final int DEFAULT_CHUNK_SIZE = 15 * 1024 * 1024;
private static final int MIN_BUFFER_SIZE = 256 * 1024;
private static final int DEFAULT_CHUNK_SIZE = 2 * 1024 * 1024;

/** Class for specifying blob source options when {@code Blob} methods are used. */
public static class BlobSourceOption extends Option {
Expand Down Expand Up @@ -263,88 +260,6 @@ public void downloadTo(Path path) {
downloadTo(path, new BlobSourceOption[0]);
}

/**
* Uploads the given file path to this blob using specified blob write options.
*
* @param path file to upload
* @param options blob write options
* @return updated blob
* @throws IOException on I/O error
* @throws StorageException on failure
*/
public Blob uploadFrom(Path path, BlobWriteOption... options) throws IOException {
if (Files.isDirectory(path)) {
throw new StorageException(0, path + " is a directory");
}
try (InputStream input = Files.newInputStream(path)) {
return uploadFrom(input, options);
}
}

/**
* Uploads the given content to this blob using specified blob write options.
*
* @param input content to upload
* @param options blob write options
* @return updated blob
* @throws IOException on I/O error
* @throws StorageException on failure
*/
public Blob uploadFrom(InputStream input, BlobWriteOption... options) throws IOException {
try (WriteChannel writer = storage.writer(this, options)) {
uploadFrom(input, writer);
}
BlobId blobId = getBlobId();
try {
return storage.get(BlobId.of(blobId.getBucket(), blobId.getName()));
} catch (StorageException e) {
throw new StorageException(
e.getCode(), "Content has been uploaded successfully. Failed to retrieve blob.", e);
}
}

static void uploadFrom(InputStream input, WriteChannel writer) throws IOException {
uploadFrom(input, writer, DEFAULT_CHUNK_SIZE);
}

/**
* Uploads the given content to the storage using specified write channel and the given buffer
* size. Other uploadFrom() methods invoke this one with a buffer size of 15 MiB. Users can pass
* alternative values. Larger buffer sizes might improve the upload performance but require more
* memory. This can cause an OutOfMemoryError or add significant garbage collection overhead.
* Smaller buffer sizes reduce memory consumption, that is noticeable when uploading many objects
* in parallel. Buffer sizes less than 256 KiB are treated as 256 KiB.
*
* <p>This method does not close either the InputStream or the WriterChannel.
*
* <p>Example of uploading:
*
* <pre>{@code
* BlobId blobId = BlobId.of(bucketName, blobName);
* BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType("video/webm").build();
* Path file = Paths.get("humongous.file");
* try (InputStream input = Files.newInputStream(file); WriteChannel writer = storage.writer(blobInfo)) {
* Blob.uploadFrom(input, writer, 150 * 1024 * 1024);
* } catch (IOException e) {
* // your handler
* }
* }</pre>
*
* @param input content to upload
* @param writer channel
* @param bufferSize size of the buffer to read from input and send over writer
* @throws IOException on I/O error
*/
public static void uploadFrom(InputStream input, WriteChannel writer, int bufferSize)
throws IOException {
bufferSize = Math.max(bufferSize, MIN_BUFFER_SIZE);
byte[] buffer = new byte[bufferSize];
int length;
while ((length = input.read(buffer)) >= 0) {
writer.write(ByteBuffer.wrap(buffer, 0, length));
}
}

/** Builder for {@code Blob}. */
public static class Builder extends BlobInfo.Builder {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,11 @@
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import com.google.api.core.ApiClock;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.services.storage.model.StorageObject;
import com.google.cloud.ReadChannel;
import com.google.cloud.WriteChannel;
import com.google.cloud.storage.Acl.Project;
import com.google.cloud.storage.Acl.Project.ProjectRole;
import com.google.cloud.storage.Acl.Role;
Expand All @@ -50,17 +48,10 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.BaseEncoding;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.Key;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -595,7 +586,7 @@ public void testBuilder() {
}

@Test
public void testDownloadTo() throws Exception {
public void testDownload() throws Exception {
final byte[] expected = {1, 2};
StorageRpc mockStorageRpc = createNiceMock(StorageRpc.class);
expect(storage.getOptions()).andReturn(mockOptions).times(1);
Expand Down Expand Up @@ -627,7 +618,7 @@ public Long answer() throws Throwable {
}

@Test
public void testDownloadToWithRetries() throws Exception {
public void testDownloadWithRetries() throws Exception {
final byte[] expected = {1, 2};
StorageRpc mockStorageRpc = createNiceMock(StorageRpc.class);
expect(storage.getOptions()).andReturn(mockOptions);
Expand Down Expand Up @@ -671,135 +662,4 @@ public Long answer() throws Throwable {
byte actual[] = Files.readAllBytes(file.toPath());
assertArrayEquals(expected, actual);
}

@Test
public void testUploadFromNonExistentFile() {
initializeExpectedBlob(1);
expect(storage.getOptions()).andReturn(mockOptions);
replay(storage);
blob = new Blob(storage, new BlobInfo.BuilderImpl(BLOB_INFO));
String fileName = "non_existing_file.txt";
try {
blob.uploadFrom(Paths.get(fileName));
fail();
} catch (IOException e) {
assertEquals(NoSuchFileException.class, e.getClass());
assertEquals(fileName, e.getMessage());
}
}

@Test
public void testUploadFromDirectory() throws IOException {
initializeExpectedBlob(1);
expect(storage.getOptions()).andReturn(mockOptions);
replay(storage);
blob = new Blob(storage, new BlobInfo.BuilderImpl(BLOB_INFO));
Path dir = Files.createTempDirectory("unit_");
try {
blob.uploadFrom(dir);
fail();
} catch (StorageException e) {
assertEquals(dir + " is a directory", e.getMessage());
}
}

private WriteChannel createWriteChannelMock(byte[] bytes) throws Exception {
WriteChannel channel = createMock(WriteChannel.class);
ByteBuffer expectedByteBuffer = ByteBuffer.wrap(bytes, 0, bytes.length);
expect(channel.write(expectedByteBuffer)).andReturn(bytes.length);
channel.close();
replay(channel);
return channel;
}

private Blob createBlobForUpload(WriteChannel channel) {
initializeExpectedBlob(1);
BlobId blobId = BlobId.of(BLOB_INFO.getBucket(), BLOB_INFO.getName());
expect(storage.getOptions()).andReturn(mockOptions);
expect(storage.writer(eq(expectedBlob))).andReturn(channel);
expect(storage.get(blobId)).andReturn(expectedBlob);
replay(storage);
return new Blob(storage, new BlobInfo.BuilderImpl(BLOB_INFO));
}

@Test
public void testUploadFromFile() throws Exception {
byte[] dataToSend = {1, 2, 3};
WriteChannel channel = createWriteChannelMock(dataToSend);
blob = createBlobForUpload(channel);
Path tempFile = Files.createTempFile("testUpload", ".tmp");
Files.write(tempFile, dataToSend);
blob = blob.uploadFrom(tempFile);
assertSame(expectedBlob, blob);
}

@Test
public void testUploadFromStream() throws Exception {
byte[] dataToSend = {1, 2, 3, 4, 5};
WriteChannel channel = createWriteChannelMock(dataToSend);
blob = createBlobForUpload(channel);
InputStream input = new ByteArrayInputStream(dataToSend);
blob = blob.uploadFrom(input);
assertSame(expectedBlob, blob);
}

@Test
public void testUploadFromStreamRetrieveFailed() throws Exception {
byte[] dataToSend = {1, 2, 3, 4, 5};
StorageException storageException = new StorageException(123, "message");
WriteChannel channel = createWriteChannelMock(dataToSend);
initializeExpectedBlob(1);
BlobId blobId = BlobId.of(BLOB_INFO.getBucket(), BLOB_INFO.getName());
expect(storage.getOptions()).andReturn(mockOptions);
expect(storage.writer(eq(expectedBlob))).andReturn(channel);
expect(storage.get(blobId)).andThrow(storageException);
replay(storage);
Blob blob = new Blob(storage, new BlobInfo.BuilderImpl(BLOB_INFO));
InputStream input = new ByteArrayInputStream(dataToSend);
try {
blob.uploadFrom(input);
fail();
} catch (StorageException e) {
assertEquals(
"Content has been uploaded successfully. Failed to retrieve blob.", e.getMessage());
assertSame(e.getCause(), storageException);
}
}

@Test
public void testUpload() throws Exception {
replay(storage);
byte[] dataToSend = {1, 2, 3, 4, 5};
WriteChannel channel = createWriteChannelMock(dataToSend);
InputStream input = new ByteArrayInputStream(dataToSend);
Blob.uploadFrom(input, channel);
}

@Test
public void testUploadSmallBufferSize() throws Exception {
replay(storage);
byte[] dataToSend = new byte[100_000];
WriteChannel channel = createWriteChannelMock(dataToSend);
InputStream input = new ByteArrayInputStream(dataToSend);
Blob.uploadFrom(input, channel, 100);
}

@Test
public void testUploadMultiplePortions() throws Exception {
replay(storage);
int totalSize = 400_000;
int bufferSize = 300_000;
byte[] dataToSend = new byte[totalSize];
dataToSend[0] = 42;
dataToSend[bufferSize] = 43;

WriteChannel channel = createMock(WriteChannel.class);
expect(channel.write(ByteBuffer.wrap(dataToSend, 0, bufferSize))).andReturn(bufferSize);
expect(channel.write(ByteBuffer.wrap(dataToSend, bufferSize, totalSize - bufferSize)))
.andReturn(bufferSize - bufferSize);
channel.close();
replay(channel);
InputStream input = new ByteArrayInputStream(dataToSend);
Blob.uploadFrom(input, channel, bufferSize);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import com.google.cloud.Policy;
import com.google.cloud.ReadChannel;
import com.google.cloud.RestorableState;
import com.google.cloud.RetryHelper;
import com.google.cloud.TransportOptions;
import com.google.cloud.WriteChannel;
import com.google.cloud.http.HttpTransportOptions;
Expand Down Expand Up @@ -104,8 +103,6 @@
import java.net.URL;
import java.net.URLConnection;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.Key;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -3211,56 +3208,4 @@ public void testBucketLogging() throws ExecutionException, InterruptedException
RemoteStorageHelper.forceDelete(storage, loggingBucket, 5, TimeUnit.SECONDS);
}
}

@Test
public void testUpload() throws Exception {
String blobName = "test-upload-static";
BlobId blobId = BlobId.of(BUCKET, blobName);
try (WriteChannel writer = storage.writer(BlobInfo.newBuilder(blobId).build())) {
Blob.uploadFrom(new ByteArrayInputStream(BLOB_STRING_CONTENT.getBytes(UTF_8)), writer, 1);
}
Blob blob = storage.get(blobId);
String readString = new String(blob.getContent(), UTF_8);
assertEquals(BLOB_STRING_CONTENT, readString);
}

@Test
public void testUploadFromDownloadTo() throws Exception {
String blobName = "test-uploadFrom-downloadTo-blob";
BlobInfo blobInfo = BlobInfo.newBuilder(BUCKET, blobName).build();

Path tempFileFrom = Files.createTempFile("ITStorageTest_", ".tmp");
Files.write(tempFileFrom, BLOB_BYTE_CONTENT);
Blob blob = storage.create(blobInfo);
blob = blob.uploadFrom(tempFileFrom);

Path tempFileTo = Files.createTempFile("ITStorageTest_", ".tmp");
blob.downloadTo(tempFileTo);
byte[] readBytes = Files.readAllBytes(tempFileTo);
assertArrayEquals(BLOB_BYTE_CONTENT, readBytes);
}

@Test
public void testUploadFromDownloadToWithEncryption() throws Exception {
String blobName = "test-uploadFrom-downloadTo-withEncryption-blob";
BlobInfo blobInfo = BlobInfo.newBuilder(BUCKET, blobName).build();

Path tempFileFrom = Files.createTempFile("ITStorageTest_", ".tmp");
Files.write(tempFileFrom, BLOB_BYTE_CONTENT);
Blob blob = storage.create(blobInfo);
blob = blob.uploadFrom(tempFileFrom, Storage.BlobWriteOption.encryptionKey(KEY));

Path tempFileTo = Files.createTempFile("ITStorageTest_", ".tmp");
try {
blob.downloadTo(tempFileTo);
} catch (RetryHelper.RetryHelperException e) {
// Expected to be StorageException
String expectedMessage =
"The target object is encrypted by a customer-supplied encryption key.";
assertTrue(e.getMessage().contains(expectedMessage));
}
blob.downloadTo(tempFileTo, Blob.BlobSourceOption.decryptionKey(KEY));
byte[] readBytes = Files.readAllBytes(tempFileTo);
assertArrayEquals(BLOB_BYTE_CONTENT, readBytes);
}
}

0 comments on commit ce17b86

Please sign in to comment.