From 97144494846457b776d2419851514bd8ad8169f0 Mon Sep 17 00:00:00 2001 From: Ben Sidhom Date: Tue, 13 Mar 2018 16:34:16 -0700 Subject: [PATCH] Only access output coder if available --- .../FlinkExecutableStageFunction.java | 41 ++++++++++--------- 1 file changed, 21 insertions(+), 20 deletions(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java index 4dd47a82e66c..e30ff04bd74a 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java @@ -111,29 +111,30 @@ public void mapPartition(Iterable> input, Map>> outputCoders = processBundleDescriptor.getOutputTargetCoders(); BeamFnApi.Target outputTarget = null; + SdkHarnessClient.RemoteOutputReceiver> mainOutputReceiver = null; if (outputCoders.size() > 0) { outputTarget = Iterables.getOnlyElement(outputCoders.keySet()); - } - Coder outputCoder = Iterables.getOnlyElement(outputCoders.values()); - SdkHarnessClient.RemoteOutputReceiver> mainOutputReceiver = - new SdkHarnessClient.RemoteOutputReceiver>() { - @Override - public Coder> getCoder() { - @SuppressWarnings("unchecked") - Coder> result = (Coder>) outputCoder; - return result; - } + Coder outputCoder = Iterables.getOnlyElement(outputCoders.values()); + mainOutputReceiver = + new SdkHarnessClient.RemoteOutputReceiver>() { + @Override + public Coder> getCoder() { + @SuppressWarnings("unchecked") + Coder> result = (Coder>) outputCoder; + return result; + } - @Override - public FnDataReceiver> getReceiver() { - return new FnDataReceiver>() { - @Override - public void accept(WindowedValue input) throws Exception { - collector.collect(input); - } - }; - } - }; + @Override + public FnDataReceiver> getReceiver() { + return new FnDataReceiver>() { + @Override + public void accept(WindowedValue input) throws Exception { + collector.collect(input); + } + }; + } + }; + } Map> receiverMap; if (outputTarget == null) {