From 029fbe29555abaf5499f8f5ec0a72dac63f37bd1 Mon Sep 17 00:00:00 2001 From: Marco Ziccardi Date: Thu, 15 Oct 2015 17:13:50 +0200 Subject: [PATCH 1/7] Remove Serializable from BlobRead and BlobWriteChannel - remove serializable from interfaces - add State interface and save method to channels - add StateImpl class to channel implementations - add tests --- .../gcloud/storage/BlobReadChannel.java | 26 ++- .../gcloud/storage/BlobReadChannelImpl.java | 147 +++++++++++++++-- .../gcloud/storage/BlobWriteChannel.java | 30 +++- .../gcloud/storage/BlobWriteChannelImpl.java | 154 ++++++++++++++++-- .../storage/BlobReadChannelImplTest.java | 40 +++++ .../storage/BlobWriteChannelImplTest.java | 45 ++++- .../gcloud/storage/SerializationTest.java | 38 ++++- 7 files changed, 443 insertions(+), 37 deletions(-) diff --git a/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobReadChannel.java b/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobReadChannel.java index ad1a385d9a83..88090a70fdb9 100644 --- a/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobReadChannel.java +++ b/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobReadChannel.java @@ -18,7 +18,6 @@ import java.io.Closeable; import java.io.IOException; -import java.io.Serializable; import java.nio.channels.ReadableByteChannel; /** @@ -28,7 +27,7 @@ * * This class is @{link Serializable}, which allows incremental reads. */ -public interface BlobReadChannel extends ReadableByteChannel, Serializable, Closeable { +public interface BlobReadChannel extends ReadableByteChannel, Closeable { /** * Overridden to remove IOException. @@ -46,4 +45,27 @@ public interface BlobReadChannel extends ReadableByteChannel, Serializable, Clos */ void chunkSize(int chunkSize); + /** + * Saves the read channel state. + * + * @return an object that contains the read channel state and can restore it afterwards. State + * object must implement {@link java.io.Serializable}. + */ + public State save(); + + /** + * A common interface for all classes that implement the internal state of a + * {@code BlobReadChannel}. + * + * Implementations of this class must implement {@link java.io.Serializable} to ensure that the + * state of a channel can be correctly serialized. + */ + public interface State { + + /** + * Returns a {@code BlobReadChannel} whose internal state reflects the one saved in the + * invocation object. + */ + public BlobReadChannel restore(); + } } diff --git a/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobReadChannelImpl.java b/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobReadChannelImpl.java index 79fe2fd1e531..8c94e84cd505 100644 --- a/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobReadChannelImpl.java +++ b/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobReadChannelImpl.java @@ -19,14 +19,15 @@ import static com.google.gcloud.RetryHelper.runWithRetries; import com.google.api.services.storage.model.StorageObject; +import com.google.common.base.MoreObjects; import com.google.gcloud.RetryHelper; import com.google.gcloud.spi.StorageRpc; import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; +import java.io.Serializable; import java.nio.ByteBuffer; import java.util.Map; +import java.util.Objects; import java.util.concurrent.Callable; /** @@ -35,7 +36,6 @@ class BlobReadChannelImpl implements BlobReadChannel { private static final int DEFAULT_CHUNK_SIZE = 2 * 1024 * 1024; - private static final long serialVersionUID = 4821762590742862669L; private final StorageOptions serviceOptions; private final BlobId blob; @@ -45,10 +45,10 @@ class BlobReadChannelImpl implements BlobReadChannel { private boolean endOfStream; private int chunkSize = DEFAULT_CHUNK_SIZE; - private transient StorageRpc storageRpc; - private transient StorageObject storageObject; - private transient int bufferPos; - private transient byte[] buffer; + private StorageRpc storageRpc; + private StorageObject storageObject; + private int bufferPos; + private byte[] buffer; BlobReadChannelImpl(StorageOptions serviceOptions, BlobId blob, Map requestOptions) { @@ -59,19 +59,18 @@ class BlobReadChannelImpl implements BlobReadChannel { initTransients(); } - private void writeObject(ObjectOutputStream out) throws IOException { + @Override + public State save() { + StateImpl.Builder builder = StateImpl.builder(serviceOptions, blob, requestOptions) + .position(position) + .isOpen(isOpen) + .endOfStream(endOfStream) + .chunkSize(chunkSize); if (buffer != null) { - position += bufferPos; - buffer = null; - bufferPos = 0; - endOfStream = false; + builder.position(position + bufferPos); + builder.endOfStream(false); } - out.defaultWriteObject(); - } - - private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { - in.defaultReadObject(); - initTransients(); + return builder.build(); } private void initTransients() { @@ -148,4 +147,116 @@ public byte[] call() { } return toWrite; } + + static class StateImpl implements BlobReadChannel.State, Serializable { + + private static final long serialVersionUID = 3889420316004453706L; + + private final StorageOptions serviceOptions; + private final BlobId blob; + private final Map requestOptions; + private final int position; + private final boolean isOpen; + private final boolean endOfStream; + private final int chunkSize; + + StateImpl(Builder builder) { + this.serviceOptions = builder.serviceOptions; + this.blob = builder.blob; + this.requestOptions = builder.requestOptions; + this.position = builder.position; + this.isOpen = builder.isOpen; + this.endOfStream = builder.endOfStream; + this.chunkSize = builder.chunkSize; + } + + public static class Builder { + private final StorageOptions serviceOptions; + private final BlobId blob; + private final Map requestOptions; + private int position; + private boolean isOpen; + private boolean endOfStream; + private int chunkSize; + + private Builder(StorageOptions options, BlobId blob, Map reqOptions) { + this.serviceOptions = options; + this.blob = blob; + this.requestOptions = reqOptions; + } + + public Builder position(int position) { + this.position = position; + return this; + } + + public Builder isOpen(boolean isOpen) { + this.isOpen = isOpen; + return this; + } + + public Builder endOfStream(boolean endOfStream) { + this.endOfStream = endOfStream; + return this; + } + + public Builder chunkSize(int chunkSize) { + this.chunkSize = chunkSize; + return this; + } + + public State build() { + return new StateImpl(this); + } + } + + public static Builder builder( + StorageOptions options, BlobId blob, Map reqOptions) { + return new Builder(options, blob, reqOptions); + } + + @Override + public BlobReadChannel restore() { + BlobReadChannelImpl channel = new BlobReadChannelImpl(serviceOptions, blob, requestOptions); + channel.position = position; + channel.isOpen = isOpen; + channel.endOfStream = endOfStream; + channel.chunkSize = chunkSize; + return channel; + } + + @Override + public int hashCode() { + return Objects.hash(serviceOptions, blob, requestOptions, position, isOpen, endOfStream, + chunkSize); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (!(obj instanceof StateImpl)) { + return false; + } + final StateImpl other = (StateImpl) obj; + return Objects.equals(this.serviceOptions, other.serviceOptions) && + Objects.equals(this.blob, other.blob) && + Objects.equals(this.requestOptions, other.requestOptions) && + this.position == other.position && + this.isOpen == other.isOpen && + this.endOfStream == other.endOfStream && + this.chunkSize == other.chunkSize; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("blob", blob) + .add("position", position) + .add("isOpen", isOpen) + .add("endOfStream", endOfStream) + .toString(); + } + } } diff --git a/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobWriteChannel.java b/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobWriteChannel.java index 20b2ce087632..97ad591d2a74 100644 --- a/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobWriteChannel.java +++ b/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobWriteChannel.java @@ -17,7 +17,6 @@ package com.google.gcloud.storage; import java.io.Closeable; -import java.io.Serializable; import java.nio.channels.WritableByteChannel; /** @@ -27,11 +26,38 @@ * data will only be visible after calling {@link #close()}. This class is serializable, to allow * incremental writes. */ -public interface BlobWriteChannel extends WritableByteChannel, Serializable, Closeable { +public interface BlobWriteChannel extends WritableByteChannel, Closeable { /** * Sets the minimum size that will be written by a single RPC. * Written data will be buffered and only flushed upon reaching this size or closing the channel. */ void chunkSize(int chunkSize); + + /** + * Saves the write channel state. + * + * @return an object that contains the write channel state and can restore it afterwards. State + * object must implement {@link java.io.Serializable}. + */ + public State save(); + + /** + * A common interface for all classes that implement the internal state of a + * {@code BlobWriteChannel}. + * + * Implementations of this class must implement {@link java.io.Serializable} to ensure that the + * state of a channel can be correctly serialized. + */ + public interface State { + + /** + * Returns a {@code BlobWriteChannel} whose internal state reflects the one saved in the + * invocation object. + * + * The original {@code BlobWriteChannel} and the restored one should not both be used. Closing + * one channel causes the other channel to close, subsequent writes will fail. + */ + public BlobWriteChannel restore(); + } } diff --git a/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobWriteChannelImpl.java b/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobWriteChannelImpl.java index 8cb95a797cf6..7d01238b0322 100644 --- a/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobWriteChannelImpl.java +++ b/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobWriteChannelImpl.java @@ -20,15 +20,16 @@ import static java.util.concurrent.Executors.callable; import com.google.api.services.storage.model.StorageObject; +import com.google.common.base.MoreObjects; import com.google.gcloud.RetryHelper; import com.google.gcloud.spi.StorageRpc; import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; +import java.io.Serializable; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Map; +import java.util.Objects; /** * Default implementation for BlobWriteChannel. @@ -48,8 +49,8 @@ class BlobWriteChannelImpl implements BlobWriteChannel { private boolean isOpen = true; private int chunkSize = DEFAULT_CHUNK_SIZE; - private transient StorageRpc storageRpc; - private transient StorageObject storageObject; + private StorageRpc storageRpc; + private StorageObject storageObject; BlobWriteChannelImpl(StorageOptions options, BlobInfo blobInfo, Map optionsMap) { @@ -59,11 +60,24 @@ class BlobWriteChannelImpl implements BlobWriteChannel { uploadId = storageRpc.open(storageObject, optionsMap); } - private void writeObject(ObjectOutputStream out) throws IOException { + BlobWriteChannelImpl(StorageOptions options, BlobInfo blobInfo, String uploadId) { + this.options = options; + this.blobInfo = blobInfo; + this.uploadId = uploadId; + initTransients(); + } + + @Override + public State save() { if (isOpen) { flush(true); } - out.defaultWriteObject(); + return StateImpl.builder(options, blobInfo, uploadId) + .position(position) + .buffer(buffer) + .limit(limit) + .isOpen(isOpen) + .chunkSize(chunkSize).build(); } private void flush(boolean compact) { @@ -87,13 +101,6 @@ public void run() { } } - private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { - in.defaultReadObject(); - if (isOpen) { - initTransients(); - } - } - private void initTransients() { storageRpc = options.storageRpc(); storageObject = blobInfo.toPb(); @@ -150,4 +157,125 @@ public void chunkSize(int chunkSize) { chunkSize = (chunkSize / MIN_CHUNK_SIZE) * MIN_CHUNK_SIZE; this.chunkSize = Math.max(MIN_CHUNK_SIZE, chunkSize); } + + static class StateImpl implements State, Serializable { + + private static final long serialVersionUID = 8541062465055125619L; + + private final StorageOptions serviceOptions; + private final BlobInfo blobInfo; + private final String uploadId; + private final int position; + private final byte[] buffer; + private final int limit; + private final boolean isOpen; + private final int chunkSize; + + StateImpl(Builder builder) { + this.serviceOptions = builder.serviceOptions; + this.blobInfo = builder.blobInfo; + this.uploadId = builder.uploadId; + this.position = builder.position; + this.buffer = builder.buffer; + this.limit = builder.limit; + this.isOpen = builder.isOpen; + this.chunkSize = builder.chunkSize; + } + + public static class Builder { + private final StorageOptions serviceOptions; + private final BlobInfo blobInfo; + private final String uploadId; + private int position; + private byte[] buffer; + private int limit; + private boolean isOpen; + private int chunkSize; + + private Builder(StorageOptions options, BlobInfo blobInfo, String uploadId) { + this.serviceOptions = options; + this.blobInfo = blobInfo; + this.uploadId = uploadId; + } + + public Builder position(int position) { + this.position = position; + return this; + } + + public Builder buffer(byte[] buffer) { + this.buffer = buffer.clone(); + return this; + } + + public Builder limit(int limit) { + this.limit = limit; + return this; + } + + public Builder isOpen(boolean isOpen) { + this.isOpen = isOpen; + return this; + } + + public Builder chunkSize(int chunkSize) { + this.chunkSize = chunkSize; + return this; + } + + public State build() { + return new StateImpl(this); + } + } + + public static Builder builder(StorageOptions options, BlobInfo blobInfo, String uploadId) { + return new Builder(options, blobInfo, uploadId); + } + + @Override + public BlobWriteChannel restore() { + BlobWriteChannelImpl channel = new BlobWriteChannelImpl(serviceOptions, blobInfo, uploadId); + channel.position = position; + channel.buffer = buffer.clone(); + channel.limit = limit; + channel.isOpen = isOpen; + channel.chunkSize = chunkSize; + return channel; + } + + @Override + public int hashCode() { + return Objects.hash(serviceOptions, blobInfo, uploadId, position, limit, isOpen, chunkSize, + Arrays.hashCode(buffer)); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (!(obj instanceof StateImpl)) { + return false; + } + final StateImpl other = (StateImpl) obj; + return Objects.equals(this.serviceOptions, other.serviceOptions) && + Objects.equals(this.blobInfo, other.blobInfo) && + Objects.equals(this.uploadId, other.uploadId) && + Objects.deepEquals(this.buffer, other.buffer) && + this.position == other.position && + this.limit == other.limit && + this.isOpen == other.isOpen && + this.chunkSize == other.chunkSize; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("blobInfo", blobInfo) + .add("uploadId", uploadId) + .add("position", position) + .add("isOpen", isOpen) + .toString(); + } + } } diff --git a/gcloud-java-storage/src/test/java/com/google/gcloud/storage/BlobReadChannelImplTest.java b/gcloud-java-storage/src/test/java/com/google/gcloud/storage/BlobReadChannelImplTest.java index c5c9a0e48612..9e6e159fe90e 100644 --- a/gcloud-java-storage/src/test/java/com/google/gcloud/storage/BlobReadChannelImplTest.java +++ b/gcloud-java-storage/src/test/java/com/google/gcloud/storage/BlobReadChannelImplTest.java @@ -180,6 +180,46 @@ public void testReadClosed() { } } + @Test + public void testSaveAndRestore() throws IOException, ClassNotFoundException { + EasyMock.expect(optionsMock.storageRpc()).andReturn(storageRpcMock).times(2); + EasyMock.expect(optionsMock.retryParams()).andReturn(RetryParams.noRetries()).times(2); + EasyMock.replay(optionsMock); + byte[] firstResult = randomByteArray(DEFAULT_CHUNK_SIZE); + byte[] secondResult = randomByteArray(DEFAULT_CHUNK_SIZE); + ByteBuffer firstReadBuffer = ByteBuffer.allocate(42); + ByteBuffer secondReadBuffer = ByteBuffer.allocate(DEFAULT_CHUNK_SIZE); + EasyMock + .expect(storageRpcMock.read(BLOB_ID.toPb(), EMPTY_RPC_OPTIONS, 0, DEFAULT_CHUNK_SIZE)) + .andReturn(firstResult); + EasyMock + .expect(storageRpcMock.read(BLOB_ID.toPb(), EMPTY_RPC_OPTIONS, 42, DEFAULT_CHUNK_SIZE)) + .andReturn(secondResult); + EasyMock.replay(storageRpcMock); + reader = new BlobReadChannelImpl(optionsMock, BLOB_ID, EMPTY_RPC_OPTIONS); + reader.read(firstReadBuffer); + BlobReadChannel.State readerState = reader.save(); + BlobReadChannel restoredReader = readerState.restore(); + restoredReader.read(secondReadBuffer); + assertArrayEquals(Arrays.copyOf(firstResult, firstReadBuffer.capacity()), + firstReadBuffer.array()); + assertArrayEquals(secondResult, secondReadBuffer.array()); + } + + @Test + public void testStateEquals() { + EasyMock.expect(optionsMock.storageRpc()).andReturn(storageRpcMock).times(2); + EasyMock.replay(optionsMock); + EasyMock.replay(storageRpcMock); + reader = new BlobReadChannelImpl(optionsMock, BLOB_ID, EMPTY_RPC_OPTIONS); + BlobReadChannel secondReader = new BlobReadChannelImpl(optionsMock, BLOB_ID, EMPTY_RPC_OPTIONS); + BlobReadChannel.State state = reader.save(); + BlobReadChannel.State secondState = secondReader.save(); + assertEquals(state, secondState); + assertEquals(state.hashCode(), secondState.hashCode()); + assertEquals(state.toString(), secondState.toString()); + } + private static byte[] randomByteArray(int size) { byte[] byteArray = new byte[size]; RANDOM.nextBytes(byteArray); diff --git a/gcloud-java-storage/src/test/java/com/google/gcloud/storage/BlobWriteChannelImplTest.java b/gcloud-java-storage/src/test/java/com/google/gcloud/storage/BlobWriteChannelImplTest.java index 54135cc9990d..7e7d537fc119 100644 --- a/gcloud-java-storage/src/test/java/com/google/gcloud/storage/BlobWriteChannelImplTest.java +++ b/gcloud-java-storage/src/test/java/com/google/gcloud/storage/BlobWriteChannelImplTest.java @@ -27,7 +27,9 @@ import com.google.gcloud.spi.StorageRpc; import org.easymock.Capture; +import org.easymock.CaptureType; import org.easymock.EasyMock; +import org.junit.After; import org.junit.Test; import org.junit.Before; @@ -36,7 +38,6 @@ import java.util.Arrays; import java.util.Map; import java.util.Random; -import org.junit.After; public class BlobWriteChannelImplTest { @@ -192,6 +193,48 @@ public void testWriteClosed() throws IOException { } } + @Test + public void testSaveAndRestore() throws IOException { + EasyMock.expect(optionsMock.storageRpc()).andReturn(storageRpcMock).times(2); + EasyMock.expect(optionsMock.retryParams()).andReturn(RetryParams.noRetries()).times(2); + EasyMock.replay(optionsMock); + EasyMock.expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID); + Capture capturedBuffer = Capture.newInstance(CaptureType.ALL); + Capture capturedPosition = Capture.newInstance(CaptureType.ALL); + storageRpcMock.write(EasyMock.eq(UPLOAD_ID), EasyMock.capture(capturedBuffer), EasyMock.eq(0), + EasyMock.eq(BLOB_INFO.toPb()), EasyMock.captureLong(capturedPosition), + EasyMock.eq(DEFAULT_CHUNK_SIZE), EasyMock.eq(false)); + EasyMock.expectLastCall().times(2); + EasyMock.replay(storageRpcMock); + ByteBuffer buffer1 = randomBuffer(DEFAULT_CHUNK_SIZE); + ByteBuffer buffer2 = randomBuffer(DEFAULT_CHUNK_SIZE); + writer = new BlobWriteChannelImpl(optionsMock, BLOB_INFO, EMPTY_RPC_OPTIONS); + assertEquals(DEFAULT_CHUNK_SIZE, writer.write(buffer1)); + assertArrayEquals(buffer1.array(), capturedBuffer.getValues().get(0)); + assertEquals(new Long(0L), capturedPosition.getValues().get(0)); + BlobWriteChannel.State writerState = writer.save(); + BlobWriteChannel restoredWriter = writerState.restore(); + assertEquals(DEFAULT_CHUNK_SIZE, restoredWriter.write(buffer2)); + assertArrayEquals(buffer2.array(), capturedBuffer.getValues().get(1)); + assertEquals(new Long(DEFAULT_CHUNK_SIZE), capturedPosition.getValues().get(1)); + } + + @Test + public void testStateEquals() { + EasyMock.expect(optionsMock.storageRpc()).andReturn(storageRpcMock).times(2); + EasyMock.replay(optionsMock); + EasyMock.expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID) + .times(2); + EasyMock.replay(storageRpcMock); + writer = new BlobWriteChannelImpl(optionsMock, BLOB_INFO, EMPTY_RPC_OPTIONS); + BlobWriteChannel writer2 = new BlobWriteChannelImpl(optionsMock, BLOB_INFO, EMPTY_RPC_OPTIONS); + BlobWriteChannel.State state = writer.save(); + BlobWriteChannel.State state2 = writer2.save(); + assertEquals(state, state2); + assertEquals(state.hashCode(), state2.hashCode()); + assertEquals(state.toString(), state2.toString()); + } + private static ByteBuffer randomBuffer(int size) { byte[] byteArray = new byte[size]; RANDOM.nextBytes(byteArray); diff --git a/gcloud-java-storage/src/test/java/com/google/gcloud/storage/SerializationTest.java b/gcloud-java-storage/src/test/java/com/google/gcloud/storage/SerializationTest.java index 2dd1b8cbf895..444d516e5530 100644 --- a/gcloud-java-storage/src/test/java/com/google/gcloud/storage/SerializationTest.java +++ b/gcloud-java-storage/src/test/java/com/google/gcloud/storage/SerializationTest.java @@ -19,8 +19,10 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotSame; +import com.google.common.collect.ImmutableMap; import com.google.gcloud.AuthCredentials; import com.google.gcloud.RetryParams; +import com.google.gcloud.spi.StorageRpc; import com.google.gcloud.storage.Acl.Project.ProjectRole; import org.junit.Test; @@ -32,6 +34,7 @@ import java.io.ObjectOutputStream; import java.io.Serializable; import java.util.Collections; +import java.util.Map; public class SerializationTest { @@ -64,6 +67,7 @@ public class SerializationTest { Storage.BucketSourceOption.metagenerationMatch(1); private static final Storage.BucketTargetOption BUCKET_TARGET_OPTIONS = Storage.BucketTargetOption.metagenerationNotMatch(); + private static final Map EMPTY_RPC_OPTIONS = ImmutableMap.of(); @Test public void testServiceOptions() throws Exception { @@ -100,8 +104,40 @@ public void testModelAndRequests() throws Exception { } } + @Test + public void testReadChannelState() throws IOException, ClassNotFoundException { + StorageOptions options = StorageOptions.builder() + .projectId("p2") + .retryParams(RetryParams.getDefaultInstance()) + .authCredentials(AuthCredentials.noCredentials()) + .build(); + BlobReadChannel reader = + new BlobReadChannelImpl(options, BlobId.of("b", "n"), EMPTY_RPC_OPTIONS); + BlobReadChannel.State state = reader.save(); + BlobReadChannel.State deserializedState = serializeAndDeserialize(state); + assertEquals(state, deserializedState); + assertEquals(state.hashCode(), deserializedState.hashCode()); + assertEquals(state.toString(), deserializedState.toString()); + } + + @Test + public void testWriteChannelState() throws IOException, ClassNotFoundException { + StorageOptions options = StorageOptions.builder() + .projectId("p2") + .retryParams(RetryParams.getDefaultInstance()) + .authCredentials(AuthCredentials.noCredentials()) + .build(); + BlobWriteChannelImpl writer = new BlobWriteChannelImpl( + options, BlobInfo.builder(BlobId.of("b", "n")).build(), "upload-id"); + BlobWriteChannel.State state = writer.save(); + BlobWriteChannel.State deserializedState = serializeAndDeserialize(state); + assertEquals(state, deserializedState); + assertEquals(state.hashCode(), deserializedState.hashCode()); + assertEquals(state.toString(), deserializedState.toString()); + } + @SuppressWarnings("unchecked") - private T serializeAndDeserialize(T obj) + private T serializeAndDeserialize(T obj) throws IOException, ClassNotFoundException { ByteArrayOutputStream bytes = new ByteArrayOutputStream(); try (ObjectOutputStream output = new ObjectOutputStream(bytes)) { From 33400dac0c853e281f5dbb9bac2a6aeecd6f5f0d Mon Sep 17 00:00:00 2001 From: Marco Ziccardi Date: Fri, 16 Oct 2015 15:11:20 +0200 Subject: [PATCH 2/7] Move State interface out of channels, move to core module and make it generic --- .../com/google/gcloud/RestorableState.java | 32 +++++++++++++++++++ .../gcloud/storage/BlobReadChannel.java | 24 +++----------- .../gcloud/storage/BlobReadChannelImpl.java | 19 +++++------ .../gcloud/storage/BlobWriteChannel.java | 31 +++++------------- .../gcloud/storage/BlobWriteChannelImpl.java | 22 ++++++------- .../storage/BlobReadChannelImplTest.java | 7 ++-- .../storage/BlobWriteChannelImplTest.java | 7 ++-- .../gcloud/storage/SerializationTest.java | 9 +++--- 8 files changed, 76 insertions(+), 75 deletions(-) create mode 100644 gcloud-java-core/src/main/java/com/google/gcloud/RestorableState.java diff --git a/gcloud-java-core/src/main/java/com/google/gcloud/RestorableState.java b/gcloud-java-core/src/main/java/com/google/gcloud/RestorableState.java new file mode 100644 index 000000000000..9cd3ee5c3c4c --- /dev/null +++ b/gcloud-java-core/src/main/java/com/google/gcloud/RestorableState.java @@ -0,0 +1,32 @@ +/* + * Copyright 2015 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.gcloud; + +/** + * A common interface for restorable states. Implementations of {@code RestorableState} are capable + * of saving the state of an object to restore it for later use. + * + * Implementations of this class must implement {@link java.io.Serializable} to ensure that the + * state of a the object can be correctly serialized. + */ +public interface RestorableState { + + /** + * Returns an object whose internal state reflects the one saved in the invocation object. + */ + T restore(); +} diff --git a/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobReadChannel.java b/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobReadChannel.java index 88090a70fdb9..b004e3d61634 100644 --- a/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobReadChannel.java +++ b/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobReadChannel.java @@ -16,6 +16,8 @@ package com.google.gcloud.storage; +import com.google.gcloud.RestorableState; + import java.io.Closeable; import java.io.IOException; import java.nio.channels.ReadableByteChannel; @@ -48,24 +50,8 @@ public interface BlobReadChannel extends ReadableByteChannel, Closeable { /** * Saves the read channel state. * - * @return an object that contains the read channel state and can restore it afterwards. State - * object must implement {@link java.io.Serializable}. - */ - public State save(); - - /** - * A common interface for all classes that implement the internal state of a - * {@code BlobReadChannel}. - * - * Implementations of this class must implement {@link java.io.Serializable} to ensure that the - * state of a channel can be correctly serialized. + * @return a {@link RestorableState} object that contains the read channel state and can restore + * it afterwards. State object must implement {@link java.io.Serializable}. */ - public interface State { - - /** - * Returns a {@code BlobReadChannel} whose internal state reflects the one saved in the - * invocation object. - */ - public BlobReadChannel restore(); - } + public RestorableState save(); } diff --git a/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobReadChannelImpl.java b/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobReadChannelImpl.java index 8c94e84cd505..fb096d1175e7 100644 --- a/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobReadChannelImpl.java +++ b/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobReadChannelImpl.java @@ -20,6 +20,7 @@ import com.google.api.services.storage.model.StorageObject; import com.google.common.base.MoreObjects; +import com.google.gcloud.RestorableState; import com.google.gcloud.RetryHelper; import com.google.gcloud.spi.StorageRpc; @@ -45,8 +46,8 @@ class BlobReadChannelImpl implements BlobReadChannel { private boolean endOfStream; private int chunkSize = DEFAULT_CHUNK_SIZE; - private StorageRpc storageRpc; - private StorageObject storageObject; + private final StorageRpc storageRpc; + private final StorageObject storageObject; private int bufferPos; private byte[] buffer; @@ -56,11 +57,12 @@ class BlobReadChannelImpl implements BlobReadChannel { this.blob = blob; this.requestOptions = requestOptions; isOpen = true; - initTransients(); + storageRpc = serviceOptions.storageRpc(); + storageObject = blob.toPb(); } @Override - public State save() { + public RestorableState save() { StateImpl.Builder builder = StateImpl.builder(serviceOptions, blob, requestOptions) .position(position) .isOpen(isOpen) @@ -73,11 +75,6 @@ public State save() { return builder.build(); } - private void initTransients() { - storageRpc = serviceOptions.storageRpc(); - storageObject = blob.toPb(); - } - @Override public boolean isOpen() { return isOpen; @@ -148,7 +145,7 @@ public byte[] call() { return toWrite; } - static class StateImpl implements BlobReadChannel.State, Serializable { + static class StateImpl implements RestorableState, Serializable { private static final long serialVersionUID = 3889420316004453706L; @@ -205,7 +202,7 @@ public Builder chunkSize(int chunkSize) { return this; } - public State build() { + public RestorableState build() { return new StateImpl(this); } } diff --git a/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobWriteChannel.java b/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobWriteChannel.java index 97ad591d2a74..be3ef2293ec3 100644 --- a/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobWriteChannel.java +++ b/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobWriteChannel.java @@ -16,6 +16,8 @@ package com.google.gcloud.storage; +import com.google.gcloud.RestorableState; + import java.io.Closeable; import java.nio.channels.WritableByteChannel; @@ -35,29 +37,12 @@ public interface BlobWriteChannel extends WritableByteChannel, Closeable { void chunkSize(int chunkSize); /** - * Saves the write channel state. - * - * @return an object that contains the write channel state and can restore it afterwards. State - * object must implement {@link java.io.Serializable}. - */ - public State save(); - - /** - * A common interface for all classes that implement the internal state of a - * {@code BlobWriteChannel}. + * Saves the write channel state so that it can be restored afterwards. The original + * {@code BlobWriteChannel} and the restored one should not both be used. Closing one channel + * causes the other channel to close, subsequent writes will fail. * - * Implementations of this class must implement {@link java.io.Serializable} to ensure that the - * state of a channel can be correctly serialized. + * @return a {@link RestorableState} object that contains the write channel state and can restore + * it afterwards. State object must implement {@link java.io.Serializable}. */ - public interface State { - - /** - * Returns a {@code BlobWriteChannel} whose internal state reflects the one saved in the - * invocation object. - * - * The original {@code BlobWriteChannel} and the restored one should not both be used. Closing - * one channel causes the other channel to close, subsequent writes will fail. - */ - public BlobWriteChannel restore(); - } + public RestorableState save(); } diff --git a/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobWriteChannelImpl.java b/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobWriteChannelImpl.java index 7d01238b0322..0471087a5078 100644 --- a/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobWriteChannelImpl.java +++ b/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobWriteChannelImpl.java @@ -21,6 +21,7 @@ import com.google.api.services.storage.model.StorageObject; import com.google.common.base.MoreObjects; +import com.google.gcloud.RestorableState; import com.google.gcloud.RetryHelper; import com.google.gcloud.spi.StorageRpc; @@ -49,14 +50,15 @@ class BlobWriteChannelImpl implements BlobWriteChannel { private boolean isOpen = true; private int chunkSize = DEFAULT_CHUNK_SIZE; - private StorageRpc storageRpc; - private StorageObject storageObject; + private final StorageRpc storageRpc; + private final StorageObject storageObject; BlobWriteChannelImpl(StorageOptions options, BlobInfo blobInfo, Map optionsMap) { this.options = options; this.blobInfo = blobInfo; - initTransients(); + storageRpc = options.storageRpc(); + storageObject = blobInfo.toPb(); uploadId = storageRpc.open(storageObject, optionsMap); } @@ -64,11 +66,12 @@ class BlobWriteChannelImpl implements BlobWriteChannel { this.options = options; this.blobInfo = blobInfo; this.uploadId = uploadId; - initTransients(); + storageRpc = options.storageRpc(); + storageObject = blobInfo.toPb(); } @Override - public State save() { + public RestorableState save() { if (isOpen) { flush(true); } @@ -101,11 +104,6 @@ public void run() { } } - private void initTransients() { - storageRpc = options.storageRpc(); - storageObject = blobInfo.toPb(); - } - private void validateOpen() throws IOException { if (!isOpen) { throw new IOException("stream is closed"); @@ -158,7 +156,7 @@ public void chunkSize(int chunkSize) { this.chunkSize = Math.max(MIN_CHUNK_SIZE, chunkSize); } - static class StateImpl implements State, Serializable { + static class StateImpl implements RestorableState, Serializable { private static final long serialVersionUID = 8541062465055125619L; @@ -223,7 +221,7 @@ public Builder chunkSize(int chunkSize) { return this; } - public State build() { + public RestorableState build() { return new StateImpl(this); } } diff --git a/gcloud-java-storage/src/test/java/com/google/gcloud/storage/BlobReadChannelImplTest.java b/gcloud-java-storage/src/test/java/com/google/gcloud/storage/BlobReadChannelImplTest.java index 9e6e159fe90e..e8ed915581b8 100644 --- a/gcloud-java-storage/src/test/java/com/google/gcloud/storage/BlobReadChannelImplTest.java +++ b/gcloud-java-storage/src/test/java/com/google/gcloud/storage/BlobReadChannelImplTest.java @@ -23,6 +23,7 @@ import static org.junit.Assert.fail; import com.google.common.collect.ImmutableMap; +import com.google.gcloud.RestorableState; import com.google.gcloud.RetryParams; import com.google.gcloud.spi.StorageRpc; @@ -198,7 +199,7 @@ public void testSaveAndRestore() throws IOException, ClassNotFoundException { EasyMock.replay(storageRpcMock); reader = new BlobReadChannelImpl(optionsMock, BLOB_ID, EMPTY_RPC_OPTIONS); reader.read(firstReadBuffer); - BlobReadChannel.State readerState = reader.save(); + RestorableState readerState = reader.save(); BlobReadChannel restoredReader = readerState.restore(); restoredReader.read(secondReadBuffer); assertArrayEquals(Arrays.copyOf(firstResult, firstReadBuffer.capacity()), @@ -213,8 +214,8 @@ public void testStateEquals() { EasyMock.replay(storageRpcMock); reader = new BlobReadChannelImpl(optionsMock, BLOB_ID, EMPTY_RPC_OPTIONS); BlobReadChannel secondReader = new BlobReadChannelImpl(optionsMock, BLOB_ID, EMPTY_RPC_OPTIONS); - BlobReadChannel.State state = reader.save(); - BlobReadChannel.State secondState = secondReader.save(); + RestorableState state = reader.save(); + RestorableState secondState = secondReader.save(); assertEquals(state, secondState); assertEquals(state.hashCode(), secondState.hashCode()); assertEquals(state.toString(), secondState.toString()); diff --git a/gcloud-java-storage/src/test/java/com/google/gcloud/storage/BlobWriteChannelImplTest.java b/gcloud-java-storage/src/test/java/com/google/gcloud/storage/BlobWriteChannelImplTest.java index 7e7d537fc119..4943aa2f9d70 100644 --- a/gcloud-java-storage/src/test/java/com/google/gcloud/storage/BlobWriteChannelImplTest.java +++ b/gcloud-java-storage/src/test/java/com/google/gcloud/storage/BlobWriteChannelImplTest.java @@ -23,6 +23,7 @@ import static org.junit.Assert.fail; import com.google.common.collect.ImmutableMap; +import com.google.gcloud.RestorableState; import com.google.gcloud.RetryParams; import com.google.gcloud.spi.StorageRpc; @@ -212,7 +213,7 @@ public void testSaveAndRestore() throws IOException { assertEquals(DEFAULT_CHUNK_SIZE, writer.write(buffer1)); assertArrayEquals(buffer1.array(), capturedBuffer.getValues().get(0)); assertEquals(new Long(0L), capturedPosition.getValues().get(0)); - BlobWriteChannel.State writerState = writer.save(); + RestorableState writerState = writer.save(); BlobWriteChannel restoredWriter = writerState.restore(); assertEquals(DEFAULT_CHUNK_SIZE, restoredWriter.write(buffer2)); assertArrayEquals(buffer2.array(), capturedBuffer.getValues().get(1)); @@ -228,8 +229,8 @@ public void testStateEquals() { EasyMock.replay(storageRpcMock); writer = new BlobWriteChannelImpl(optionsMock, BLOB_INFO, EMPTY_RPC_OPTIONS); BlobWriteChannel writer2 = new BlobWriteChannelImpl(optionsMock, BLOB_INFO, EMPTY_RPC_OPTIONS); - BlobWriteChannel.State state = writer.save(); - BlobWriteChannel.State state2 = writer2.save(); + RestorableState state = writer.save(); + RestorableState state2 = writer2.save(); assertEquals(state, state2); assertEquals(state.hashCode(), state2.hashCode()); assertEquals(state.toString(), state2.toString()); diff --git a/gcloud-java-storage/src/test/java/com/google/gcloud/storage/SerializationTest.java b/gcloud-java-storage/src/test/java/com/google/gcloud/storage/SerializationTest.java index 444d516e5530..edda4ed17e25 100644 --- a/gcloud-java-storage/src/test/java/com/google/gcloud/storage/SerializationTest.java +++ b/gcloud-java-storage/src/test/java/com/google/gcloud/storage/SerializationTest.java @@ -21,6 +21,7 @@ import com.google.common.collect.ImmutableMap; import com.google.gcloud.AuthCredentials; +import com.google.gcloud.RestorableState; import com.google.gcloud.RetryParams; import com.google.gcloud.spi.StorageRpc; import com.google.gcloud.storage.Acl.Project.ProjectRole; @@ -113,8 +114,8 @@ public void testReadChannelState() throws IOException, ClassNotFoundException { .build(); BlobReadChannel reader = new BlobReadChannelImpl(options, BlobId.of("b", "n"), EMPTY_RPC_OPTIONS); - BlobReadChannel.State state = reader.save(); - BlobReadChannel.State deserializedState = serializeAndDeserialize(state); + RestorableState state = reader.save(); + RestorableState deserializedState = serializeAndDeserialize(state); assertEquals(state, deserializedState); assertEquals(state.hashCode(), deserializedState.hashCode()); assertEquals(state.toString(), deserializedState.toString()); @@ -129,8 +130,8 @@ public void testWriteChannelState() throws IOException, ClassNotFoundException { .build(); BlobWriteChannelImpl writer = new BlobWriteChannelImpl( options, BlobInfo.builder(BlobId.of("b", "n")).build(), "upload-id"); - BlobWriteChannel.State state = writer.save(); - BlobWriteChannel.State deserializedState = serializeAndDeserialize(state); + RestorableState state = writer.save(); + RestorableState deserializedState = serializeAndDeserialize(state); assertEquals(state, deserializedState); assertEquals(state.hashCode(), deserializedState.hashCode()); assertEquals(state.toString(), deserializedState.toString()); From ab56573274b17060376043d5768e179a80f5cd15 Mon Sep 17 00:00:00 2001 From: Marco Ziccardi Date: Fri, 16 Oct 2015 15:19:55 +0200 Subject: [PATCH 3/7] Add channel write and restore integration tests --- .../google/gcloud/storage/ITStorageTest.java | 31 ++++++++++++++++++- 1 file changed, 30 insertions(+), 1 deletion(-) diff --git a/gcloud-java-storage/src/test/java/com/google/gcloud/storage/ITStorageTest.java b/gcloud-java-storage/src/test/java/com/google/gcloud/storage/ITStorageTest.java index 775d81f28fa8..7469b0bb7fb8 100644 --- a/gcloud-java-storage/src/test/java/com/google/gcloud/storage/ITStorageTest.java +++ b/gcloud-java-storage/src/test/java/com/google/gcloud/storage/ITStorageTest.java @@ -25,6 +25,7 @@ import static org.junit.Assert.fail; import com.google.common.collect.ImmutableList; +import com.google.gcloud.RestorableState; import com.google.gcloud.storage.testing.RemoteGcsHelper; import java.io.ByteArrayInputStream; @@ -35,7 +36,6 @@ import java.net.URLConnection; import java.nio.ByteBuffer; import java.util.Arrays; -import java.util.Calendar; import java.util.Iterator; import java.util.List; import java.util.concurrent.ExecutionException; @@ -399,6 +399,35 @@ public void testReadAndWriteChannels() throws UnsupportedEncodingException, IOEx assertTrue(storage.delete(bucket, blobName)); } + @Test + public void testReadAndWriteSaveChannels() throws UnsupportedEncodingException, IOException { + String blobName = "test-read-and-write-save-channels-blob"; + BlobInfo blob = BlobInfo.builder(bucket, blobName).build(); + byte[] stringBytes; + BlobWriteChannel writer = storage.writer(blob); + stringBytes = BLOB_STRING_CONTENT.getBytes(UTF_8); + writer.write(ByteBuffer.wrap(BLOB_BYTE_CONTENT)); + RestorableState writerState = writer.save(); + BlobWriteChannel secondWriter = writerState.restore(); + secondWriter.write(ByteBuffer.wrap(stringBytes)); + secondWriter.close(); + ByteBuffer readBytes; + ByteBuffer readStringBytes; + BlobReadChannel reader = storage.reader(blob.blobId()); + reader.chunkSize(BLOB_BYTE_CONTENT.length); + readBytes = ByteBuffer.allocate(BLOB_BYTE_CONTENT.length); + reader.read(readBytes); + RestorableState readerState = reader.save(); + BlobReadChannel secondReader = readerState.restore(); + readStringBytes = ByteBuffer.allocate(stringBytes.length); + secondReader.read(readStringBytes); + reader.close(); + secondReader.close(); + assertArrayEquals(BLOB_BYTE_CONTENT, readBytes.array()); + assertEquals(BLOB_STRING_CONTENT, new String(readStringBytes.array(), UTF_8)); + assertTrue(storage.delete(bucket, blobName)); + } + @Test public void testReadChannelFail() throws UnsupportedEncodingException, IOException { String blobName = "test-read-channel-blob-fail"; From dcbe37fab40c070d58155fdaa8b8fee673a40582 Mon Sep 17 00:00:00 2001 From: Marco Ziccardi Date: Mon, 19 Oct 2015 12:30:03 +0200 Subject: [PATCH 4/7] Remove public modifiers from channels StateImpl Builder --- .../gcloud/storage/BlobReadChannelImpl.java | 14 +++++++------- .../gcloud/storage/BlobWriteChannelImpl.java | 16 ++++++++-------- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobReadChannelImpl.java b/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobReadChannelImpl.java index fb096d1175e7..fc36d110e828 100644 --- a/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobReadChannelImpl.java +++ b/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobReadChannelImpl.java @@ -167,7 +167,7 @@ static class StateImpl implements RestorableState, Serializable this.chunkSize = builder.chunkSize; } - public static class Builder { + static class Builder { private final StorageOptions serviceOptions; private final BlobId blob; private final Map requestOptions; @@ -182,32 +182,32 @@ private Builder(StorageOptions options, BlobId blob, Map r this.requestOptions = reqOptions; } - public Builder position(int position) { + Builder position(int position) { this.position = position; return this; } - public Builder isOpen(boolean isOpen) { + Builder isOpen(boolean isOpen) { this.isOpen = isOpen; return this; } - public Builder endOfStream(boolean endOfStream) { + Builder endOfStream(boolean endOfStream) { this.endOfStream = endOfStream; return this; } - public Builder chunkSize(int chunkSize) { + Builder chunkSize(int chunkSize) { this.chunkSize = chunkSize; return this; } - public RestorableState build() { + RestorableState build() { return new StateImpl(this); } } - public static Builder builder( + static Builder builder( StorageOptions options, BlobId blob, Map reqOptions) { return new Builder(options, blob, reqOptions); } diff --git a/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobWriteChannelImpl.java b/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobWriteChannelImpl.java index 0471087a5078..052af6e29072 100644 --- a/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobWriteChannelImpl.java +++ b/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobWriteChannelImpl.java @@ -180,7 +180,7 @@ static class StateImpl implements RestorableState, Serializabl this.chunkSize = builder.chunkSize; } - public static class Builder { + static class Builder { private final StorageOptions serviceOptions; private final BlobInfo blobInfo; private final String uploadId; @@ -196,37 +196,37 @@ private Builder(StorageOptions options, BlobInfo blobInfo, String uploadId) { this.uploadId = uploadId; } - public Builder position(int position) { + Builder position(int position) { this.position = position; return this; } - public Builder buffer(byte[] buffer) { + Builder buffer(byte[] buffer) { this.buffer = buffer.clone(); return this; } - public Builder limit(int limit) { + Builder limit(int limit) { this.limit = limit; return this; } - public Builder isOpen(boolean isOpen) { + Builder isOpen(boolean isOpen) { this.isOpen = isOpen; return this; } - public Builder chunkSize(int chunkSize) { + Builder chunkSize(int chunkSize) { this.chunkSize = chunkSize; return this; } - public RestorableState build() { + RestorableState build() { return new StateImpl(this); } } - public static Builder builder(StorageOptions options, BlobInfo blobInfo, String uploadId) { + static Builder builder(StorageOptions options, BlobInfo blobInfo, String uploadId) { return new Builder(options, blobInfo, uploadId); } From 71604bba52031a8609b682903ada37943f158b1b Mon Sep 17 00:00:00 2001 From: Marco Ziccardi Date: Mon, 19 Oct 2015 12:52:33 +0200 Subject: [PATCH 5/7] Remove compact param from write channel flush, restore limit from array length --- .../gcloud/storage/BlobWriteChannelImpl.java | 22 +++++++------------ 1 file changed, 8 insertions(+), 14 deletions(-) diff --git a/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobWriteChannelImpl.java b/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobWriteChannelImpl.java index 052af6e29072..b0954b8d0cf8 100644 --- a/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobWriteChannelImpl.java +++ b/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobWriteChannelImpl.java @@ -73,18 +73,17 @@ class BlobWriteChannelImpl implements BlobWriteChannel { @Override public RestorableState save() { if (isOpen) { - flush(true); + flush(); } return StateImpl.builder(options, blobInfo, uploadId) .position(position) - .buffer(buffer) - .limit(limit) + .buffer(Arrays.copyOf(buffer, limit)) .isOpen(isOpen) .chunkSize(chunkSize).build(); } - private void flush(boolean compact) { - if (limit >= chunkSize || compact && limit >= MIN_CHUNK_SIZE) { + private void flush() { + if (limit >= chunkSize) { final int length = limit - limit % MIN_CHUNK_SIZE; try { runWithRetries(callable(new Runnable() { @@ -98,7 +97,7 @@ public void run() { } position += length; limit -= length; - byte[] temp = new byte[compact ? limit : chunkSize]; + byte[] temp = new byte[chunkSize]; System.arraycopy(buffer, length, temp, 0, limit); buffer = temp; } @@ -122,7 +121,7 @@ public int write(ByteBuffer byteBuffer) throws IOException { byteBuffer.get(buffer, limit, toWrite); } limit += toWrite; - flush(false); + flush(); return toWrite; } @@ -175,7 +174,7 @@ static class StateImpl implements RestorableState, Serializabl this.uploadId = builder.uploadId; this.position = builder.position; this.buffer = builder.buffer; - this.limit = builder.limit; + this.limit = this.buffer.length; this.isOpen = builder.isOpen; this.chunkSize = builder.chunkSize; } @@ -202,12 +201,7 @@ Builder position(int position) { } Builder buffer(byte[] buffer) { - this.buffer = buffer.clone(); - return this; - } - - Builder limit(int limit) { - this.limit = limit; + this.buffer = buffer; return this; } From 2060526a1469fbbf6a492594ac3dc66d06aaa51c Mon Sep 17 00:00:00 2001 From: Marco Ziccardi Date: Mon, 19 Oct 2015 20:11:00 +0200 Subject: [PATCH 6/7] Fix wrapping, remove limit from write channel StateImpl --- .../gcloud/storage/BlobReadChannelImpl.java | 14 ++++----- .../gcloud/storage/BlobWriteChannelImpl.java | 29 +++++++++---------- 2 files changed, 21 insertions(+), 22 deletions(-) diff --git a/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobReadChannelImpl.java b/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobReadChannelImpl.java index fc36d110e828..7731d04837a6 100644 --- a/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobReadChannelImpl.java +++ b/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobReadChannelImpl.java @@ -237,13 +237,13 @@ public boolean equals(Object obj) { return false; } final StateImpl other = (StateImpl) obj; - return Objects.equals(this.serviceOptions, other.serviceOptions) && - Objects.equals(this.blob, other.blob) && - Objects.equals(this.requestOptions, other.requestOptions) && - this.position == other.position && - this.isOpen == other.isOpen && - this.endOfStream == other.endOfStream && - this.chunkSize == other.chunkSize; + return Objects.equals(this.serviceOptions, other.serviceOptions) + && Objects.equals(this.blob, other.blob) + && Objects.equals(this.requestOptions, other.requestOptions) + && this.position == other.position + && this.isOpen == other.isOpen + && this.endOfStream == other.endOfStream + && this.chunkSize == other.chunkSize; } @Override diff --git a/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobWriteChannelImpl.java b/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobWriteChannelImpl.java index b0954b8d0cf8..517ed9911102 100644 --- a/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobWriteChannelImpl.java +++ b/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobWriteChannelImpl.java @@ -72,14 +72,17 @@ class BlobWriteChannelImpl implements BlobWriteChannel { @Override public RestorableState save() { + byte[] bufferToSave = null; if (isOpen) { flush(); + bufferToSave = Arrays.copyOf(buffer, limit); } return StateImpl.builder(options, blobInfo, uploadId) .position(position) - .buffer(Arrays.copyOf(buffer, limit)) + .buffer(bufferToSave) .isOpen(isOpen) - .chunkSize(chunkSize).build(); + .chunkSize(chunkSize) + .build(); } private void flush() { @@ -164,7 +167,6 @@ static class StateImpl implements RestorableState, Serializabl private final String uploadId; private final int position; private final byte[] buffer; - private final int limit; private final boolean isOpen; private final int chunkSize; @@ -174,7 +176,6 @@ static class StateImpl implements RestorableState, Serializabl this.uploadId = builder.uploadId; this.position = builder.position; this.buffer = builder.buffer; - this.limit = this.buffer.length; this.isOpen = builder.isOpen; this.chunkSize = builder.chunkSize; } @@ -185,7 +186,6 @@ static class Builder { private final String uploadId; private int position; private byte[] buffer; - private int limit; private boolean isOpen; private int chunkSize; @@ -229,7 +229,7 @@ public BlobWriteChannel restore() { BlobWriteChannelImpl channel = new BlobWriteChannelImpl(serviceOptions, blobInfo, uploadId); channel.position = position; channel.buffer = buffer.clone(); - channel.limit = limit; + channel.limit = buffer.length; channel.isOpen = isOpen; channel.chunkSize = chunkSize; return channel; @@ -237,7 +237,7 @@ public BlobWriteChannel restore() { @Override public int hashCode() { - return Objects.hash(serviceOptions, blobInfo, uploadId, position, limit, isOpen, chunkSize, + return Objects.hash(serviceOptions, blobInfo, uploadId, position, isOpen, chunkSize, Arrays.hashCode(buffer)); } @@ -250,14 +250,13 @@ public boolean equals(Object obj) { return false; } final StateImpl other = (StateImpl) obj; - return Objects.equals(this.serviceOptions, other.serviceOptions) && - Objects.equals(this.blobInfo, other.blobInfo) && - Objects.equals(this.uploadId, other.uploadId) && - Objects.deepEquals(this.buffer, other.buffer) && - this.position == other.position && - this.limit == other.limit && - this.isOpen == other.isOpen && - this.chunkSize == other.chunkSize; + return Objects.equals(this.serviceOptions, other.serviceOptions) + && Objects.equals(this.blobInfo, other.blobInfo) + && Objects.equals(this.uploadId, other.uploadId) + && Objects.deepEquals(this.buffer, other.buffer) + && this.position == other.position + && this.isOpen == other.isOpen + && this.chunkSize == other.chunkSize; } @Override From 638bd92113a30b4e1b330f941db1b02fda41039f Mon Sep 17 00:00:00 2001 From: Marco Ziccardi Date: Mon, 19 Oct 2015 22:35:16 +0200 Subject: [PATCH 7/7] Add test for save/restore closed BlobWriteChannelImpl --- .../gcloud/storage/BlobWriteChannelImpl.java | 6 +++-- .../storage/BlobWriteChannelImplTest.java | 26 +++++++++++++++++++ 2 files changed, 30 insertions(+), 2 deletions(-) diff --git a/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobWriteChannelImpl.java b/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobWriteChannelImpl.java index 517ed9911102..1c841d1dfc6a 100644 --- a/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobWriteChannelImpl.java +++ b/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobWriteChannelImpl.java @@ -227,9 +227,11 @@ static Builder builder(StorageOptions options, BlobInfo blobInfo, String uploadI @Override public BlobWriteChannel restore() { BlobWriteChannelImpl channel = new BlobWriteChannelImpl(serviceOptions, blobInfo, uploadId); + if (buffer != null) { + channel.buffer = buffer.clone(); + channel.limit = buffer.length; + } channel.position = position; - channel.buffer = buffer.clone(); - channel.limit = buffer.length; channel.isOpen = isOpen; channel.chunkSize = chunkSize; return channel; diff --git a/gcloud-java-storage/src/test/java/com/google/gcloud/storage/BlobWriteChannelImplTest.java b/gcloud-java-storage/src/test/java/com/google/gcloud/storage/BlobWriteChannelImplTest.java index 4943aa2f9d70..ab3f7a000d90 100644 --- a/gcloud-java-storage/src/test/java/com/google/gcloud/storage/BlobWriteChannelImplTest.java +++ b/gcloud-java-storage/src/test/java/com/google/gcloud/storage/BlobWriteChannelImplTest.java @@ -220,6 +220,32 @@ public void testSaveAndRestore() throws IOException { assertEquals(new Long(DEFAULT_CHUNK_SIZE), capturedPosition.getValues().get(1)); } + @Test + public void testSaveAndRestoreClosed() throws IOException { + EasyMock.expect(optionsMock.storageRpc()).andReturn(storageRpcMock).times(2); + EasyMock.expect(optionsMock.retryParams()).andReturn(RetryParams.noRetries()); + EasyMock.replay(optionsMock); + EasyMock.expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID); + Capture capturedBuffer = Capture.newInstance(); + storageRpcMock.write(EasyMock.eq(UPLOAD_ID), EasyMock.capture(capturedBuffer), EasyMock.eq(0), + EasyMock.eq(BLOB_INFO.toPb()), EasyMock.eq(0L), EasyMock.eq(0), EasyMock.eq(true)); + EasyMock.expectLastCall(); + EasyMock.replay(storageRpcMock); + writer = new BlobWriteChannelImpl(optionsMock, BLOB_INFO, EMPTY_RPC_OPTIONS); + writer.close(); + RestorableState writerState = writer.save(); + RestorableState expectedWriterState = + BlobWriteChannelImpl.StateImpl.builder(optionsMock, BLOB_INFO, UPLOAD_ID) + .buffer(null) + .chunkSize(DEFAULT_CHUNK_SIZE) + .isOpen(false) + .position(0) + .build(); + BlobWriteChannel restoredWriter = writerState.restore(); + assertArrayEquals(new byte[0], capturedBuffer.getValue()); + assertEquals(expectedWriterState, restoredWriter.save()); + } + @Test public void testStateEquals() { EasyMock.expect(optionsMock.storageRpc()).andReturn(storageRpcMock).times(2);