Skip to content

Commit

Permalink
Merge pull request apache#12 from bsidhom/output-receiver-empty
Browse files Browse the repository at this point in the history
Only access output coder if available
  • Loading branch information
axelmagn authored Mar 13, 2018
2 parents 8a2d352 + 9714449 commit 6bbd8a3
Showing 1 changed file with 21 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,29 +111,30 @@ public void mapPartition(Iterable<WindowedValue<InputT>> input,
Map<BeamFnApi.Target, Coder<WindowedValue<?>>> outputCoders =
processBundleDescriptor.getOutputTargetCoders();
BeamFnApi.Target outputTarget = null;
SdkHarnessClient.RemoteOutputReceiver<WindowedValue<OutputT>> mainOutputReceiver = null;
if (outputCoders.size() > 0) {
outputTarget = Iterables.getOnlyElement(outputCoders.keySet());
}
Coder<?> outputCoder = Iterables.getOnlyElement(outputCoders.values());
SdkHarnessClient.RemoteOutputReceiver<WindowedValue<OutputT>> mainOutputReceiver =
new SdkHarnessClient.RemoteOutputReceiver<WindowedValue<OutputT>>() {
@Override
public Coder<WindowedValue<OutputT>> getCoder() {
@SuppressWarnings("unchecked")
Coder<WindowedValue<OutputT>> result = (Coder<WindowedValue<OutputT>>) outputCoder;
return result;
}
Coder<?> outputCoder = Iterables.getOnlyElement(outputCoders.values());
mainOutputReceiver =
new SdkHarnessClient.RemoteOutputReceiver<WindowedValue<OutputT>>() {
@Override
public Coder<WindowedValue<OutputT>> getCoder() {
@SuppressWarnings("unchecked")
Coder<WindowedValue<OutputT>> result = (Coder<WindowedValue<OutputT>>) outputCoder;
return result;
}

@Override
public FnDataReceiver<WindowedValue<OutputT>> getReceiver() {
return new FnDataReceiver<WindowedValue<OutputT>>() {
@Override
public void accept(WindowedValue<OutputT> input) throws Exception {
collector.collect(input);
}
};
}
};
@Override
public FnDataReceiver<WindowedValue<OutputT>> getReceiver() {
return new FnDataReceiver<WindowedValue<OutputT>>() {
@Override
public void accept(WindowedValue<OutputT> input) throws Exception {
collector.collect(input);
}
};
}
};
}
Map<BeamFnApi.Target,
SdkHarnessClient.RemoteOutputReceiver<?>> receiverMap;
if (outputTarget == null) {
Expand Down

0 comments on commit 6bbd8a3

Please sign in to comment.