Skip to content

Commit

Permalink
Merge pull request #261 from mziccard/remove-serializable-channels
Browse files Browse the repository at this point in the history
Remove Serializable from BlobReadChannel and BlobWriteChannel
  • Loading branch information
aozarov committed Oct 19, 2015
2 parents 03dbc8c + 638bd92 commit 30992ae
Show file tree
Hide file tree
Showing 9 changed files with 512 additions and 55 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2015 Google Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.gcloud;

/**
* A common interface for restorable states. Implementations of {@code RestorableState} are capable
* of saving the state of an object to restore it for later use.
*
* Implementations of this class must implement {@link java.io.Serializable} to ensure that the
* state of a the object can be correctly serialized.
*/
public interface RestorableState<T> {

/**
* Returns an object whose internal state reflects the one saved in the invocation object.
*/
T restore();
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@

package com.google.gcloud.storage;

import com.google.gcloud.RestorableState;

import java.io.Closeable;
import java.io.IOException;
import java.io.Serializable;
import java.nio.channels.ReadableByteChannel;

/**
Expand All @@ -28,7 +29,7 @@
*
* This class is @{link Serializable}, which allows incremental reads.
*/
public interface BlobReadChannel extends ReadableByteChannel, Serializable, Closeable {
public interface BlobReadChannel extends ReadableByteChannel, Closeable {

/**
* Overridden to remove IOException.
Expand All @@ -46,4 +47,11 @@ public interface BlobReadChannel extends ReadableByteChannel, Serializable, Clos
*/
void chunkSize(int chunkSize);

/**
* Saves the read channel state.
*
* @return a {@link RestorableState} object that contains the read channel state and can restore
* it afterwards. State object must implement {@link java.io.Serializable}.
*/
public RestorableState<BlobReadChannel> save();
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,16 @@
import static com.google.gcloud.RetryHelper.runWithRetries;

import com.google.api.services.storage.model.StorageObject;
import com.google.common.base.MoreObjects;
import com.google.gcloud.RestorableState;
import com.google.gcloud.RetryHelper;
import com.google.gcloud.spi.StorageRpc;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Callable;

/**
Expand All @@ -35,7 +37,6 @@
class BlobReadChannelImpl implements BlobReadChannel {

private static final int DEFAULT_CHUNK_SIZE = 2 * 1024 * 1024;
private static final long serialVersionUID = 4821762590742862669L;

private final StorageOptions serviceOptions;
private final BlobId blob;
Expand All @@ -45,38 +46,33 @@ class BlobReadChannelImpl implements BlobReadChannel {
private boolean endOfStream;
private int chunkSize = DEFAULT_CHUNK_SIZE;

private transient StorageRpc storageRpc;
private transient StorageObject storageObject;
private transient int bufferPos;
private transient byte[] buffer;
private final StorageRpc storageRpc;
private final StorageObject storageObject;
private int bufferPos;
private byte[] buffer;

BlobReadChannelImpl(StorageOptions serviceOptions, BlobId blob,
Map<StorageRpc.Option, ?> requestOptions) {
this.serviceOptions = serviceOptions;
this.blob = blob;
this.requestOptions = requestOptions;
isOpen = true;
initTransients();
storageRpc = serviceOptions.storageRpc();
storageObject = blob.toPb();
}

private void writeObject(ObjectOutputStream out) throws IOException {
@Override
public RestorableState<BlobReadChannel> save() {
StateImpl.Builder builder = StateImpl.builder(serviceOptions, blob, requestOptions)
.position(position)
.isOpen(isOpen)
.endOfStream(endOfStream)
.chunkSize(chunkSize);
if (buffer != null) {
position += bufferPos;
buffer = null;
bufferPos = 0;
endOfStream = false;
builder.position(position + bufferPos);
builder.endOfStream(false);
}
out.defaultWriteObject();
}

private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
in.defaultReadObject();
initTransients();
}

private void initTransients() {
storageRpc = serviceOptions.storageRpc();
storageObject = blob.toPb();
return builder.build();
}

@Override
Expand Down Expand Up @@ -148,4 +144,116 @@ public byte[] call() {
}
return toWrite;
}

static class StateImpl implements RestorableState<BlobReadChannel>, Serializable {

private static final long serialVersionUID = 3889420316004453706L;

private final StorageOptions serviceOptions;
private final BlobId blob;
private final Map<StorageRpc.Option, ?> requestOptions;
private final int position;
private final boolean isOpen;
private final boolean endOfStream;
private final int chunkSize;

StateImpl(Builder builder) {
this.serviceOptions = builder.serviceOptions;
this.blob = builder.blob;
this.requestOptions = builder.requestOptions;
this.position = builder.position;
this.isOpen = builder.isOpen;
this.endOfStream = builder.endOfStream;
this.chunkSize = builder.chunkSize;
}

static class Builder {
private final StorageOptions serviceOptions;
private final BlobId blob;
private final Map<StorageRpc.Option, ?> requestOptions;
private int position;
private boolean isOpen;
private boolean endOfStream;
private int chunkSize;

private Builder(StorageOptions options, BlobId blob, Map<StorageRpc.Option, ?> reqOptions) {
this.serviceOptions = options;
this.blob = blob;
this.requestOptions = reqOptions;
}

Builder position(int position) {
this.position = position;
return this;
}

Builder isOpen(boolean isOpen) {
this.isOpen = isOpen;
return this;
}

Builder endOfStream(boolean endOfStream) {
this.endOfStream = endOfStream;
return this;
}

Builder chunkSize(int chunkSize) {
this.chunkSize = chunkSize;
return this;
}

RestorableState<BlobReadChannel> build() {
return new StateImpl(this);
}
}

static Builder builder(
StorageOptions options, BlobId blob, Map<StorageRpc.Option, ?> reqOptions) {
return new Builder(options, blob, reqOptions);
}

@Override
public BlobReadChannel restore() {
BlobReadChannelImpl channel = new BlobReadChannelImpl(serviceOptions, blob, requestOptions);
channel.position = position;
channel.isOpen = isOpen;
channel.endOfStream = endOfStream;
channel.chunkSize = chunkSize;
return channel;
}

@Override
public int hashCode() {
return Objects.hash(serviceOptions, blob, requestOptions, position, isOpen, endOfStream,
chunkSize);
}

@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (!(obj instanceof StateImpl)) {
return false;
}
final StateImpl other = (StateImpl) obj;
return Objects.equals(this.serviceOptions, other.serviceOptions)
&& Objects.equals(this.blob, other.blob)
&& Objects.equals(this.requestOptions, other.requestOptions)
&& this.position == other.position
&& this.isOpen == other.isOpen
&& this.endOfStream == other.endOfStream
&& this.chunkSize == other.chunkSize;
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("blob", blob)
.add("position", position)
.add("isOpen", isOpen)
.add("endOfStream", endOfStream)
.toString();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@

package com.google.gcloud.storage;

import com.google.gcloud.RestorableState;

import java.io.Closeable;
import java.io.Serializable;
import java.nio.channels.WritableByteChannel;

/**
Expand All @@ -27,11 +28,21 @@
* data will only be visible after calling {@link #close()}. This class is serializable, to allow
* incremental writes.
*/
public interface BlobWriteChannel extends WritableByteChannel, Serializable, Closeable {
public interface BlobWriteChannel extends WritableByteChannel, Closeable {

/**
* Sets the minimum size that will be written by a single RPC.
* Written data will be buffered and only flushed upon reaching this size or closing the channel.
*/
void chunkSize(int chunkSize);

/**
* Saves the write channel state so that it can be restored afterwards. The original
* {@code BlobWriteChannel} and the restored one should not both be used. Closing one channel
* causes the other channel to close, subsequent writes will fail.
*
* @return a {@link RestorableState} object that contains the write channel state and can restore
* it afterwards. State object must implement {@link java.io.Serializable}.
*/
public RestorableState<BlobWriteChannel> save();
}
Loading

0 comments on commit 30992ae

Please sign in to comment.