[BEAM-13015, #21250] Optimize encoding to a ByteString (#22345)
* [BEAM-13015, #21250] Optimize encoding to a ByteString

This leverages the fact that all encoding is done in a thread-safe manner,
allowing us to drop the synchronization that ByteString.Output adds. It
also optimizes the max chunk size based upon performance measurements, and
tunes the ratio for how full a byte[] should be when making the final copy
vs concatenate decision.
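
As a rough illustration of the idea in the paragraph above (not the actual `ByteStringOutputStream` source), the sketch below shows an unsynchronized, chunked output stream that decides at the end whether to copy a sparsely filled buffer or hand it off whole. The class name `ChunkedOutput`, the initial capacity, and the `copyRatio` parameter are hypothetical; only the 256k cap comes from this commit message. `ByteBuffer` stands in for `ByteString`, and the final flattening into one `byte[]` exists only so the result is easy to check.

```java
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch of an unsynchronized, chunked byte sink; not Beam's actual class. */
public class ChunkedOutput extends OutputStream {
  static final int MAX_CHUNK_SIZE = 256 * 1024; // cap taken from the commit message
  private final double copyRatio; // assumed tunable fill ratio, not the value Beam uses
  private final List<ByteBuffer> chunks = new ArrayList<>();
  private byte[] buffer = new byte[128]; // assumed initial capacity
  private int pos = 0;

  public ChunkedOutput(double copyRatio) {
    this.copyRatio = copyRatio;
  }

  // Deliberately not synchronized: the stream is assumed to be confined to one thread.
  @Override
  public void write(int b) {
    if (pos == buffer.length) {
      chunks.add(ByteBuffer.wrap(buffer)); // hand off the full chunk without copying
      buffer = new byte[Math.min(buffer.length * 2, MAX_CHUNK_SIZE)];
      pos = 0;
    }
    buffer[pos++] = (byte) b;
  }

  /** Returns everything written so far as one array and resets the stream for reuse. */
  public byte[] toBytesAndReset() {
    if (pos > 0) {
      if (pos < buffer.length * copyRatio) {
        // Mostly empty: copy the few bytes out and keep the buffer for the next use.
        chunks.add(ByteBuffer.wrap(Arrays.copyOf(buffer, pos)));
      } else {
        // Mostly full: wrap the buffer as-is and allocate a fresh one.
        chunks.add(ByteBuffer.wrap(buffer, 0, pos));
        buffer = new byte[buffer.length];
      }
      pos = 0;
    }
    int total = 0;
    for (ByteBuffer chunk : chunks) {
      total += chunk.remaining();
    }
    byte[] out = new byte[total];
    int offset = 0;
    for (ByteBuffer chunk : chunks) {
      int n = chunk.remaining();
      chunk.get(out, offset, n);
      offset += n;
    }
    chunks.clear();
    return out;
  }
}
```

With a rope-like type such as `ByteString`, the chunks could be concatenated instead of flattened, which is where skipping the copy of a mostly full buffer would pay off.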

Below are the results of several scenarios comparing the protobuf
implementation (`testProtobuf...`) with the new solution (`testSdkCore...`):
```
Benchmark                                                                                       Mode  Cnt         Score        Error  Units
ByteStringOutputStreamBenchmark.testProtobufByteStringOutputStreamFewLargeWrites               thrpt   25   1149267.797 ±  15366.677  ops/s
ByteStringOutputStreamBenchmark.testProtobufByteStringOutputStreamFewMixedWritesWithReuse      thrpt   25    832816.697 ±   4236.341  ops/s
ByteStringOutputStreamBenchmark.testProtobufByteStringOutputStreamFewMixedWritesWithoutReuse   thrpt   25    916629.194 ±   5669.323  ops/s
ByteStringOutputStreamBenchmark.testProtobufByteStringOutputStreamFewSmallWrites               thrpt   25  14175167.566 ±  88540.030  ops/s
ByteStringOutputStreamBenchmark.testProtobufByteStringOutputStreamFewTinyWrites                thrpt   25  22471597.238 ± 186098.311  ops/s
ByteStringOutputStreamBenchmark.testProtobufByteStringOutputStreamManyLargeWrites              thrpt   25       610.218 ±      5.019  ops/s
ByteStringOutputStreamBenchmark.testProtobufByteStringOutputStreamManyMixedWritesWithReuse     thrpt   25       484.413 ±     35.194  ops/s
ByteStringOutputStreamBenchmark.testProtobufByteStringOutputStreamManyMixedWritesWithoutReuse  thrpt   25       559.983 ±      6.228  ops/s
ByteStringOutputStreamBenchmark.testProtobufByteStringOutputStreamManySmallWrites              thrpt   25     10969.839 ±     88.199  ops/s
ByteStringOutputStreamBenchmark.testProtobufByteStringOutputStreamManyTinyWrites               thrpt   25     40822.925 ±    191.402  ops/s
ByteStringOutputStreamBenchmark.testSdkCoreByteStringOutputStreamFewLargeWrites                thrpt   25   1167673.532 ±   9747.507  ops/s
ByteStringOutputStreamBenchmark.testSdkCoreByteStringOutputStreamFewMixedWritesWithReuse       thrpt   25   1576528.242 ±  15883.083  ops/s
ByteStringOutputStreamBenchmark.testSdkCoreByteStringOutputStreamFewMixedWritesWithoutReuse    thrpt   25   1009766.655 ±   8700.273  ops/s
ByteStringOutputStreamBenchmark.testSdkCoreByteStringOutputStreamFewSmallWrites                thrpt   25  33293140.679 ± 233693.771  ops/s
ByteStringOutputStreamBenchmark.testSdkCoreByteStringOutputStreamFewTinyWrites                 thrpt   25  86841328.763 ± 729741.769  ops/s
ByteStringOutputStreamBenchmark.testSdkCoreByteStringOutputStreamManyLargeWrites               thrpt   25      1058.150 ±     15.192  ops/s
ByteStringOutputStreamBenchmark.testSdkCoreByteStringOutputStreamManyMixedWritesWithReuse      thrpt   25       937.249 ±      9.264  ops/s
ByteStringOutputStreamBenchmark.testSdkCoreByteStringOutputStreamManyMixedWritesWithoutReuse   thrpt   25       959.671 ±     13.989  ops/s
ByteStringOutputStreamBenchmark.testSdkCoreByteStringOutputStreamManySmallWrites               thrpt   25     12601.065 ±     92.375  ops/s
ByteStringOutputStreamBenchmark.testSdkCoreByteStringOutputStreamManyTinyWrites                thrpt   25     65277.229 ±   3795.676  ops/s
```
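
To make the table easier to read, the scores can be turned into speedup ratios. The snippet below is a small helper written for this purpose (the class and method names are made up here); the numbers are copied from the table. By this arithmetic the new stream ranges from about 1.02x (few large writes) to about 3.86x (few tiny writes) of the protobuf throughput, ignoring the error columns.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SpeedupTable {
  /** Ratio of new throughput to baseline throughput; values above 1.0 mean faster. */
  static double speedup(double protobufOpsPerSec, double sdkCoreOpsPerSec) {
    return sdkCoreOpsPerSec / protobufOpsPerSec;
  }

  /** Scenario name -> {protobuf score, SDK core score}, copied from the table above. */
  static Map<String, double[]> scores() {
    Map<String, double[]> m = new LinkedHashMap<>();
    m.put("FewLargeWrites", new double[] {1149267.797, 1167673.532});
    m.put("FewMixedWritesWithReuse", new double[] {832816.697, 1576528.242});
    m.put("FewMixedWritesWithoutReuse", new double[] {916629.194, 1009766.655});
    m.put("FewSmallWrites", new double[] {14175167.566, 33293140.679});
    m.put("FewTinyWrites", new double[] {22471597.238, 86841328.763});
    m.put("ManyLargeWrites", new double[] {610.218, 1058.150});
    m.put("ManyMixedWritesWithReuse", new double[] {484.413, 937.249});
    m.put("ManyMixedWritesWithoutReuse", new double[] {559.983, 959.671});
    m.put("ManySmallWrites", new double[] {10969.839, 12601.065});
    m.put("ManyTinyWrites", new double[] {40822.925, 65277.229});
    return m;
  }

  public static void main(String[] args) {
    for (Map.Entry<String, double[]> e : scores().entrySet()) {
      double[] s = e.getValue();
      System.out.printf("%-28s %.2fx%n", e.getKey(), speedup(s[0], s[1]));
    }
  }
}
```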

The copy vs concatenate numbers come from the results below, which show that 256k seems to
be a pretty good chunk size since larger chunks seem to cost more per byte to allocate.
They also show the threshold at which we should currently copy the bytes rather than
concatenate a partially full buffer and allocate a new one:
```
Benchmark                                                                                              newSize       copyVsNew   Mode  Cnt         Score        Error  Units
ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray                                                    N/A        512/1024  thrpt   25  19744209.563 ± 148287.185  ops/s
ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray                                                    N/A        640/1024  thrpt   25  15738981.338 ± 103684.000  ops/s
ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray                                                    N/A        768/1024  thrpt   25  12778194.652 ± 202212.679  ops/s
ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray                                                    N/A        896/1024  thrpt   25  11053602.109 ± 103120.446  ops/s
ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray                                                    N/A       4096/8192  thrpt   25   2961435.128 ±  25895.802  ops/s
ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray                                                    N/A       5120/8192  thrpt   25   2498594.030 ±  26051.674  ops/s
ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray                                                    N/A       6144/8192  thrpt   25   2173161.031 ±  20014.569  ops/s
ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray                                                    N/A       7168/8192  thrpt   25   1917545.913 ±  21470.719  ops/s
ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray                                                    N/A     20480/65536  thrpt   25    537872.049 ±   5525.024  ops/s
ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray                                                    N/A     24576/65536  thrpt   25    371312.042 ±   4450.715  ops/s
ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray                                                    N/A     28672/65536  thrpt   25    306027.442 ±   2830.503  ops/s
ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray                                                    N/A     32768/65536  thrpt   25    263933.096 ±   1833.603  ops/s
ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray                                                    N/A   131072/262144  thrpt   25     80224.558 ±   1192.994  ops/s
ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray                                                    N/A   163840/262144  thrpt   25     65311.283 ±    775.920  ops/s
ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray                                                    N/A   196608/262144  thrpt   25     54510.877 ±    797.775  ops/s
ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray                                                    N/A   229376/262144  thrpt   25     46808.185 ±    515.039  ops/s
ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray                                                    N/A  524288/1048576  thrpt   25     17729.937 ±    301.199  ops/s
ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray                                                    N/A  655360/1048576  thrpt   25     12996.953 ±    228.552  ops/s
ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray                                                    N/A  786432/1048576  thrpt   25     11383.122 ±    384.086  ops/s
ByteStringOutputStreamBenchmark.NewVsCopy.testCopyArray                                                    N/A  917504/1048576  thrpt   25      9915.318 ±    285.995  ops/s
ByteStringOutputStreamBenchmark.NewVsCopy.testNewArray                                                    1024             N/A  thrpt   25  10023631.563 ±  61317.055  ops/s
ByteStringOutputStreamBenchmark.NewVsCopy.testNewArray                                                    8192             N/A  thrpt   25   2109120.041 ±  17482.682  ops/s
ByteStringOutputStreamBenchmark.NewVsCopy.testNewArray                                                   65536             N/A  thrpt   25    318492.630 ±   3006.827  ops/s
ByteStringOutputStreamBenchmark.NewVsCopy.testNewArray                                                  262144             N/A  thrpt   25     79228.892 ±    725.230  ops/s
ByteStringOutputStreamBenchmark.NewVsCopy.testNewArray                                                 1048576             N/A  thrpt   25     13089.221 ±     73.535  ops/s
```
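
One way to read this table, assuming `testCopyArray` measures copying the first k bytes of an N-byte buffer (the `k/N` in the `copyVsNew` column) and `testNewArray` measures allocating a fresh N-byte buffer: copying is the cheaper choice for as long as its throughput stays at or above the matching `testNewArray` score. The helper below is a hypothetical sketch of that comparison, using two buffer sizes from the table and ignoring the error columns.

```java
public class CopyVsNew {
  /**
   * Returns the largest measured fill (in bytes) at which copying still achieved at least the
   * throughput of allocating a fresh array, or -1 if copying never won.
   */
  static int largestCheaperFill(int[] fills, double[] copyOpsPerSec, double newOpsPerSec) {
    int best = -1;
    for (int i = 0; i < fills.length; i++) {
      if (copyOpsPerSec[i] >= newOpsPerSec) {
        best = Math.max(best, fills[i]);
      }
    }
    return best;
  }

  public static void main(String[] args) {
    // 8192-byte buffer: copy scores for 4096..7168 bytes vs. testNewArray at 8192.
    int fill8k =
        largestCheaperFill(
            new int[] {4096, 5120, 6144, 7168},
            new double[] {2961435.128, 2498594.030, 2173161.031, 1917545.913},
            2109120.041);
    // 262144-byte buffer: copy scores for 131072..229376 bytes vs. testNewArray at 262144.
    int fill256k =
        largestCheaperFill(
            new int[] {131072, 163840, 196608, 229376},
            new double[] {80224.558, 65311.283, 54510.877, 46808.185},
            79228.892);
    System.out.println("8192: copy wins up to " + fill8k + " bytes");
    System.out.println("262144: copy wins up to " + fill256k + " bytes");
  }
}
```

On these two rows the crossover sits between 75% and 87.5% full for the 8192-byte buffer and between 50% and 62.5% full for the 262144-byte buffer.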

The difference is minor in the `ProcessBundleBenchmark` as there is not
enough data being passed around for it to make a major difference:
```
Before
Benchmark                                        Mode  Cnt      Score     Error  Units
ProcessBundleBenchmark.testLargeBundle          thrpt   25   1156.159 ±   9.001  ops/s
ProcessBundleBenchmark.testTinyBundle           thrpt   25  29641.444 ± 125.041  ops/s

After
Benchmark                                        Mode  Cnt      Score    Error  Units
ProcessBundleBenchmark.testLargeBundle          thrpt   25   1168.977 ± 25.848  ops/s
ProcessBundleBenchmark.testTinyBundle           thrpt   25  29664.783 ± 99.791  ops/s
```
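
A quick check of the "minor difference" claim: the helper below (hypothetical names, numbers copied from the table above) computes the relative change and tests whether the before and after ranges, taken as score plus or minus the reported error, overlap. The changes work out to roughly +1.1% for `testLargeBundle` and +0.08% for `testTinyBundle`, and both pairs of ranges overlap.

```java
public class BundleDelta {
  /** Relative change from before to after, in percent. */
  static double percentChange(double before, double after) {
    return (after - before) / before * 100.0;
  }

  /** True when [a - aErr, a + aErr] and [b - bErr, b + bErr] share at least one point. */
  static boolean intervalsOverlap(double a, double aErr, double b, double bErr) {
    return a - aErr <= b + bErr && b - bErr <= a + aErr;
  }

  public static void main(String[] args) {
    System.out.printf(
        "testLargeBundle: %+.2f%%, overlap=%b%n",
        percentChange(1156.159, 1168.977),
        intervalsOverlap(1156.159, 9.001, 1168.977, 25.848));
    System.out.printf(
        "testTinyBundle: %+.2f%%, overlap=%b%n",
        percentChange(29641.444, 29664.783),
        intervalsOverlap(29641.444, 125.041, 29664.783, 99.791));
  }
}
```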

* fixup comment and address analyzeClassDependencies failures
lukecwik authored Jul 20, 2022
1 parent 9352093 commit 4821e03
Showing 49 changed files with 967 additions and 118 deletions.
@@ -34,13 +34,13 @@
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -93,7 +93,7 @@ private byte[] toStringPayloadBytes(String data) throws IOException {
.withFieldValue("data", data)
.build();

ByteString.Output outputStream = ByteString.newOutput();
ByteStringOutputStream outputStream = new ByteStringOutputStream();
try {
RowCoder.of(configRow.getSchema()).encode(configRow, outputStream);
} catch (IOException e) {

@@ -22,6 +22,7 @@
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.DoubleCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
import org.joda.time.Instant;

@@ -32,7 +33,7 @@ public class MonitoringInfoEncodings {

/** Encodes to {@link MonitoringInfoConstants.TypeUrns#DISTRIBUTION_INT64_TYPE}. */
public static ByteString encodeInt64Distribution(DistributionData data) {
ByteString.Output output = ByteString.newOutput();
ByteStringOutputStream output = new ByteStringOutputStream();
try {
VARINT_CODER.encode(data.count(), output);
VARINT_CODER.encode(data.sum(), output);
@@ -62,7 +63,7 @@ public static DistributionData decodeInt64Distribution(ByteString payload) {
// TODO(BEAM-4374): Implement decodeDoubleDistribution(...)
public static ByteString encodeDoubleDistribution(
long count, double sum, double min, double max) {
ByteString.Output output = ByteString.newOutput();
ByteStringOutputStream output = new ByteStringOutputStream();
try {
VARINT_CODER.encode(count, output);
DOUBLE_CODER.encode(sum, output);
@@ -76,7 +77,7 @@ public static ByteString encodeDoubleDistribution(

/** Encodes to {@link MonitoringInfoConstants.TypeUrns#LATEST_INT64_TYPE}. */
public static ByteString encodeInt64Gauge(GaugeData data) {
ByteString.Output output = ByteString.newOutput();
ByteStringOutputStream output = new ByteStringOutputStream();
try {
VARINT_CODER.encode(data.timestamp().getMillis(), output);
VARINT_CODER.encode(data.value(), output);
@@ -99,7 +100,7 @@ public static GaugeData decodeInt64Gauge(ByteString payload) {

/** Encodes to {@link MonitoringInfoConstants.TypeUrns#SUM_INT64_TYPE}. */
public static ByteString encodeInt64Counter(long value) {
ByteString.Output output = ByteString.newOutput();
ByteStringOutputStream output = new ByteStringOutputStream();
try {
VARINT_CODER.encode(value, output);
} catch (IOException e) {
@@ -119,7 +120,7 @@ public static long decodeInt64Counter(ByteString payload) {

/** Encodes to {@link MonitoringInfoConstants.TypeUrns#SUM_DOUBLE_TYPE}. */
public static ByteString encodeDoubleCounter(double value) {
ByteString.Output output = ByteString.newOutput();
ByteStringOutputStream output = new ByteStringOutputStream();
try {
DOUBLE_CODER.encode(value, output);
} catch (IOException e) {

@@ -32,6 +32,7 @@
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
@@ -160,11 +161,11 @@ public long add(WindowedValue<T> data) throws IOException {
if (formatted.getAttributeMap() != null) {
pubsubMessageBuilder.putAllAttributes(formatted.getAttributeMap());
}
ByteString.Output output = ByteString.newOutput();
ByteStringOutputStream output = new ByteStringOutputStream();
pubsubMessageBuilder.build().writeTo(output);
byteString = output.toByteString();
} else {
ByteString.Output stream = ByteString.newOutput();
ByteStringOutputStream stream = new ByteStringOutputStream();
coder.encode(data.getValue(), stream, Coder.Context.OUTER);
byteString = stream.toByteString();
}

@@ -35,10 +35,10 @@
import org.apache.beam.sdk.transforms.Materializations.MultimapView;
import org.apache.beam.sdk.transforms.ViewFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Optional;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Cache;
@@ -124,7 +124,7 @@ public enum SideInputState {

Coder<SideWindowT> windowCoder = sideWindowStrategy.getWindowFn().windowCoder();

ByteString.Output windowStream = ByteString.newOutput();
ByteStringOutputStream windowStream = new ByteStringOutputStream();
windowCoder.encode(sideWindow, windowStream, Coder.Context.OUTER);

@SuppressWarnings("unchecked")

@@ -54,6 +54,7 @@
import org.apache.beam.sdk.metrics.MetricsContainer;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
@@ -429,7 +430,7 @@ public Map<Long, Runnable> flushState() {
((UnboundedSource<?, UnboundedSource.CheckpointMark>) activeReader.getCurrentSource())
.getCheckpointMarkCoder();
if (checkpointCoder != null) {
ByteString.Output stream = ByteString.newOutput();
ByteStringOutputStream stream = new ByteStringOutputStream();
try {
checkpointCoder.encode(checkpointMark, stream, Coder.Context.OUTER);
} catch (IOException e) {
@@ -738,10 +739,10 @@ public <T, W extends BoundedWindow> void writePCollectionViewData(
throw new IllegalStateException("writePCollectionViewData must follow a Combine.globally");
}

ByteString.Output dataStream = ByteString.newOutput();
ByteStringOutputStream dataStream = new ByteStringOutputStream();
dataCoder.encode(data, dataStream, Coder.Context.OUTER);

ByteString.Output windowStream = ByteString.newOutput();
ByteStringOutputStream windowStream = new ByteStringOutputStream();
windowCoder.encode(window, windowStream, Coder.Context.OUTER);

ensureStateful("Tried to write view data");

@@ -45,10 +45,10 @@
import org.apache.beam.sdk.state.WatermarkHoldState;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.Parser;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
@@ -305,7 +305,7 @@ private <SideWindowT extends BoundedWindow> Windmill.GlobalDataRequest buildGlob
SideWindowT sideInputWindow =
(SideWindowT) view.getWindowMappingFn().getSideInputWindow(mainWindow);

ByteString.Output windowStream = ByteString.newOutput();
ByteStringOutputStream windowStream = new ByteStringOutputStream();
try {
sideInputWindowCoder.encode(sideInputWindow, windowStream, Coder.Context.OUTER);
} catch (IOException e) {

@@ -34,6 +34,7 @@
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.PaneInfo.PaneInfoCoder;
import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
import org.apache.beam.sdk.values.KV;
@@ -69,7 +70,7 @@ public static ByteString encodeMetadata(
Collection<? extends BoundedWindow> windows,
PaneInfo pane)
throws IOException {
ByteString.Output stream = ByteString.newOutput();
ByteStringOutputStream stream = new ByteStringOutputStream();
PaneInfoCoder.INSTANCE.encode(pane, stream);
windowsCoder.encode(windows, stream, Coder.Context.OUTER);
return stream.toByteString();
@@ -135,7 +136,7 @@ private WindmillStreamWriter(String destinationName) {
}

private <EncodeT> ByteString encode(Coder<EncodeT> coder, EncodeT object) throws IOException {
ByteString.Output stream = ByteString.newOutput();
ByteStringOutputStream stream = new ByteStringOutputStream();
coder.encode(object, stream, Coder.Context.OUTER);
return stream.toByteString();
}

@@ -80,6 +80,7 @@
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.sdk.util.CombineFnUtil;
import org.apache.beam.sdk.util.Weighted;
import org.apache.beam.sdk.values.TimestampedValue;
@@ -351,7 +352,7 @@ static ByteString encodeKey(StateNamespace namespace, StateTag<?> address) {
try {
// Use ByteString.Output rather than concatenation and String.format. We build these keys
// a lot, and this leads to better performance results. See associated benchmarks.
ByteString.Output stream = ByteString.newOutput();
ByteStringOutputStream stream = new ByteStringOutputStream();
OutputStreamWriter writer = new OutputStreamWriter(stream, StandardCharsets.UTF_8);

// stringKey starts and ends with a slash. We separate it from the
@@ -522,7 +523,7 @@ protected WorkItemCommitRequest persistDirectly(WindmillStateCache.ForKeyAndFami

ByteString encoded = null;
if (cachedSize == -1 || modified) {
ByteString.Output stream = ByteString.newOutput();
ByteStringOutputStream stream = new ByteStringOutputStream();
if (value != null) {
coder.encode(value, stream, Coder.Context.OUTER);
}
@@ -1047,7 +1048,7 @@ public WorkItemCommitRequest persistDirectly(WindmillStateCache.ForKeyAndFamily
pendingAdds,
(elem, id) -> {
try {
ByteString.Output elementStream = ByteString.newOutput();
ByteStringOutputStream elementStream = new ByteStringOutputStream();
elemCoder.encode(elem.getValue(), elementStream, Context.OUTER);
insertBuilder.addEntries(
SortedListEntry.newBuilder()
@@ -1249,7 +1250,7 @@ private K userKeyFromProtoKey(ByteString tag) throws IOException {
}

private ByteString protoKeyFromUserKey(K key) throws IOException {
ByteString.Output keyStream = ByteString.newOutput();
ByteStringOutputStream keyStream = new ByteStringOutputStream();
stateKeyPrefix.writeTo(keyStream);
keyCoder.encode(key, keyStream, Context.OUTER);
return keyStream.toByteString();
@@ -1275,7 +1276,7 @@ protected WorkItemCommitRequest persistDirectly(WindmillStateCache.ForKeyAndFami

for (K key : localAdditions) {
ByteString keyBytes = protoKeyFromUserKey(key);
ByteString.Output valueStream = ByteString.newOutput();
ByteStringOutputStream valueStream = new ByteStringOutputStream();
valueCoder.encode(cachedValues.get(key), valueStream, Context.OUTER);
ByteString valueBytes = valueStream.toByteString();

@@ -1290,7 +1291,7 @@ protected WorkItemCommitRequest persistDirectly(WindmillStateCache.ForKeyAndFami
localAdditions.clear();

for (K key : localRemovals) {
ByteString.Output keyStream = ByteString.newOutput();
ByteStringOutputStream keyStream = new ByteStringOutputStream();
stateKeyPrefix.writeTo(keyStream);
keyCoder.encode(key, keyStream, Context.OUTER);
ByteString keyBytes = keyStream.toByteString();
@@ -1304,7 +1305,7 @@ protected WorkItemCommitRequest persistDirectly(WindmillStateCache.ForKeyAndFami

V cachedValue = cachedValues.remove(key);
if (cachedValue != null) {
ByteString.Output valueStream = ByteString.newOutput();
ByteStringOutputStream valueStream = new ByteStringOutputStream();
valueCoder.encode(cachedValues.get(key), valueStream, Context.OUTER);
}
}
@@ -1555,7 +1556,7 @@ public void clear() {

private Future<V> getFutureForKey(K key) {
try {
ByteString.Output keyStream = ByteString.newOutput();
ByteStringOutputStream keyStream = new ByteStringOutputStream();
stateKeyPrefix.writeTo(keyStream);
keyCoder.encode(key, keyStream, Context.OUTER);
return reader.valueFuture(keyStream.toByteString(), stateFamily, valueCoder);
@@ -1703,7 +1704,7 @@ public WorkItemCommitRequest persistDirectly(WindmillStateCache.ForKeyAndFamily
bagUpdatesBuilder = commitBuilder.addBagUpdatesBuilder();
}
for (T value : localAdditions) {
ByteString.Output stream = ByteString.newOutput();
ByteStringOutputStream stream = new ByteStringOutputStream();
// Encode the value
elemCoder.encode(value, stream, Coder.Context.OUTER);
ByteString encoded = stream.toByteString();

@@ -71,6 +71,7 @@
import org.apache.beam.sdk.transforms.Materializations;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.sdk.util.MoreFutures;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
@@ -668,7 +669,7 @@ private ByteString concat(Iterable<ByteString> values) {
}

static ByteString encodeAndConcat(Iterable<Object> values, Coder valueCoder) throws IOException {
ByteString.Output out = ByteString.newOutput();
ByteStringOutputStream out = new ByteStringOutputStream();
if (values != null) {
for (Object value : values) {
int size = out.size();

@@ -79,6 +79,7 @@
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder;
import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
import org.apache.beam.sdk.values.PCollectionView;
@@ -216,7 +217,7 @@ public Node apply(MutableNetwork<Node, Edge> input) {

String coderId = "generatedCoder" + idGenerator.getId();
String windowingStrategyId;
try (ByteString.Output output = ByteString.newOutput()) {
try (ByteStringOutputStream output = new ByteStringOutputStream()) {
try {
Coder<?> javaCoder =
CloudObjects.coderFromCloudObject(CloudObject.fromSpec(instructionOutput.getCodec()));

@@ -72,6 +72,7 @@
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.IdGenerator;
import org.apache.beam.sdk.transforms.Materializations;
import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
@@ -234,7 +235,7 @@ public Node apply(MutableNetwork<Node, Edge> input) {

String coderId = "generatedCoder" + idGenerator.getId();
instructionOutputNodeToCoderIdBuilder.put(node, coderId);
try (ByteString.Output output = ByteString.newOutput()) {
try (ByteStringOutputStream output = new ByteStringOutputStream()) {
try {
Coder<?> javaCoder =
CloudObjects.coderFromCloudObject(CloudObject.fromSpec(instructionOutput.getCodec()));

@@ -42,6 +42,7 @@
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
@@ -73,7 +74,7 @@ public void setUp() {
public void testFetchGlobalDataBasic() throws Exception {
StateFetcher fetcher = new StateFetcher(server);

ByteString.Output stream = ByteString.newOutput();
ByteStringOutputStream stream = new ByteStringOutputStream();
ListCoder.of(StringUtf8Coder.of()).encode(Arrays.asList("data"), stream, Coder.Context.OUTER);
ByteString encodedIterable = stream.toByteString();

@@ -126,7 +127,7 @@ public void testFetchGlobalDataBasic() throws Exception {
public void testFetchGlobalDataNull() throws Exception {
StateFetcher fetcher = new StateFetcher(server);

ByteString.Output stream = ByteString.newOutput();
ByteStringOutputStream stream = new ByteStringOutputStream();
ListCoder.of(VoidCoder.of()).encode(Arrays.asList((Void) null), stream, Coder.Context.OUTER);
ByteString encodedIterable = stream.toByteString();

@@ -179,10 +180,9 @@ public void testFetchGlobalDataNull() throws Exception {
public void testFetchGlobalDataCacheOverflow() throws Exception {
Coder<List<String>> coder = ListCoder.of(StringUtf8Coder.of());

ByteString.Output stream = ByteString.newOutput();
ByteStringOutputStream stream = new ByteStringOutputStream();
coder.encode(Arrays.asList("data1"), stream, Coder.Context.OUTER);
ByteString encodedIterable1 = stream.toByteString();
stream = ByteString.newOutput();
ByteString encodedIterable1 = stream.toByteStringAndReset();
coder.encode(Arrays.asList("data2"), stream, Coder.Context.OUTER);
ByteString encodedIterable2 = stream.toByteString();
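
The last hunk swaps a second `ByteString.newOutput()` for `toByteStringAndReset()`, so a single stream serves both encodings. As a loose standard-library analogy (this is not Beam code, and `ReuseStream` is a made-up name), the same reuse pattern with `ByteArrayOutputStream` is a snapshot via `toByteArray()` followed by `reset()`:

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

public class ReuseStream {
  /** Encodes two strings with one stream, resetting it between uses. */
  static byte[][] encodeBoth(String first, String second) {
    ByteArrayOutputStream stream = new ByteArrayOutputStream();

    byte[] a = first.getBytes(StandardCharsets.UTF_8);
    stream.write(a, 0, a.length);
    byte[] encodedFirst = stream.toByteArray(); // snapshot of the first encoding
    stream.reset(); // empty the same stream instead of allocating another one

    byte[] b = second.getBytes(StandardCharsets.UTF_8);
    stream.write(b, 0, b.length);
    byte[] encodedSecond = stream.toByteArray();

    return new byte[][] {encodedFirst, encodedSecond};
  }
}
```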
