Skip to content

Commit

Permalink
Add Error Handling to Kafka IO (#29546)
Browse files Browse the repository at this point in the history
* Update 2.50 release notes to include new Kafka topicPattern feature

* Create groovy class for io performance tests
Create gradle task and github actions config for GCS using this.

* delete unnecessary class

* fix env call

* fix call to gradle

* run on hosted runner for testing

* add additional checkout

* add destination for triggered tests

* move env variables to correct location

* try uploading against separate dataset

* try without a user

* update branch checkout, try to view the failure log

* run on failure

* update to use correct BigQuery instance

* convert to matrix

* add result reporting

* add failure clause

* remove failure clause, update to run on self-hosted

* address comments, clean up build

* clarify branching

* Add error handling base implementation & test DLQ enabled class

* Add test cases

* apply spotless

* Fix Checkstyles

* Fix Checkstyles

* make DLH serializable

* rename dead letter to bad record

* make DLH serializable

* Change bad record router name, and use multioutputreceiver instead of process context

* Refactor BadRecord to be nested

* clean up checkstyle

* Update error handler test

* Add metrics for counting error records, and for measuring feature usage

* apply spotless

* fix checkstyle

* make metric reporting static

* spotless

* Rework annotations to be an explicit label on a PTransform, instead of using java annotations

* fix checkstyle

* Address comments

* Address comments

* Fix test cases, spotless

* remove flatting without error collections

* fix nullness

* spotless + encoding issues

* spotless

* throw error when error handler isn't used

* add concrete bad record error handler class

* spotless, fix test category

* fix checkstyle

* clean up comments

* fix test case

* initial wiring of error handler into KafkaIO Read

* remove "failing transform" field on bad record, add note to CHANGES.md

* fix failing test cases

* fix failing test cases

* apply spotless

* Add tests

* Add tests

* fix test case

* add documentation

* wire error handler into kafka write

* fix failing test case

* Add tests for writing to kafka with exception handling

* fix sdf testing

* fix sdf testing

* spotless

* deflake tests

* add error handling to kafka streaming example

update error handler to be serializable to support using it as a member of an auto-value based PTransform

* apply final comments

* apply final comments

* apply final comments

* add line to CHANGES.md

* fix spotless

* fix checkstyle

* make sink transform static for serialization

* spotless

* fix typo

* fix typo

* fix spotbugs
  • Loading branch information
johnjcasey authored Dec 7, 2023
1 parent ee5e782 commit 5d11c20
Show file tree
Hide file tree
Showing 22 changed files with 659 additions and 152 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
* TextIO now supports skipping multiple header lines (Java) ([#17990](https://github.com/apache/beam/issues/17990)).
* Python GCSIO is now implemented with GCP GCS Client instead of apitools ([#25676](https://github.com/apache/beam/issues/25676))
* Adding support for LowCardinality DataType in ClickHouse (Java) ([#29533](https://github.com/apache/beam/pull/29533)).
* Added support for handling bad records to KafkaIO (Java) ([#29546](https://github.com/apache/beam/pull/29546))

## New Features / Improvements

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class KafkaTestUtilities {
'"keySizeBytes": "10",' +
'"valueSizeBytes": "90"' +
'}',
"--readTimeout=120",
"--readTimeout=60",
"--kafkaTopic=beam",
"--withTestcontainers=true",
"--kafkaContainerVersion=5.5.2",
Expand All @@ -56,6 +56,7 @@ class KafkaTestUtilities {
excludeTestsMatching "*SDFResumesCorrectly" //Kafka SDF does not work for kafka versions <2.0.1
excludeTestsMatching "*StopReadingFunction" //Kafka SDF does not work for kafka versions <2.0.1
excludeTestsMatching "*WatermarkUpdateWithSparseMessages" //Kafka SDF does not work for kafka versions <2.0.1
excludeTestsMatching "*KafkaIOSDFReadWithErrorHandler"
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,11 @@
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.errorhandling.BadRecord;
import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler.BadRecordErrorHandler;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
Expand All @@ -60,6 +63,8 @@
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
Expand Down Expand Up @@ -97,7 +102,7 @@ public interface KafkaStreamingOptions extends PipelineOptions {
* to use your own Kafka server.
*/
@Description("Kafka server host")
@Default.String("kafka_server:9092")
@Default.String("localhost:9092")
String getKafkaHost();

void setKafkaHost(String value);
Expand Down Expand Up @@ -208,15 +213,22 @@ public void run() {
// Start reading form Kafka with the latest offset
consumerConfig.put("auto.offset.reset", "latest");

PCollection<KV<String, Integer>> pCollection =
pipeline.apply(
KafkaIO.<String, Integer>read()
.withBootstrapServers(options.getKafkaHost())
.withTopic(TOPIC_NAME)
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(IntegerDeserializer.class)
.withConsumerConfigUpdates(consumerConfig)
.withoutMetadata());
// Register an error handler for any deserialization errors.
// Errors are simulated with an intentionally failing deserializer
PCollection<KV<String, Integer>> pCollection;
try (BadRecordErrorHandler<PCollection<BadRecord>> errorHandler =
pipeline.registerBadRecordErrorHandler(new LogErrors())) {
pCollection =
pipeline.apply(
KafkaIO.<String, Integer>read()
.withBootstrapServers(options.getKafkaHost())
.withTopic(TOPIC_NAME)
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(IntermittentlyFailingIntegerDeserializer.class)
.withConsumerConfigUpdates(consumerConfig)
.withBadRecordErrorHandler(errorHandler)
.withoutMetadata());
}

pCollection
// Apply a window and a trigger ourput repeatedly.
Expand Down Expand Up @@ -317,4 +329,39 @@ public void processElement(ProcessContext c, IntervalWindow w) throws Exception
c.output(c.element());
}
}

// Simple PTransform to log Error information
static class LogErrors extends PTransform<PCollection<BadRecord>, PCollection<BadRecord>> {

@Override
public PCollection<BadRecord> expand(PCollection<BadRecord> input) {
return input.apply("Log Errors", ParDo.of(new LogErrorFn()));
}

static class LogErrorFn extends DoFn<BadRecord, BadRecord> {
@ProcessElement
public void processElement(@Element BadRecord record, OutputReceiver<BadRecord> receiver) {
System.out.println(record);
receiver.output(record);
}
}
}

// Intentionally failing deserializer to simulate bad data from Kafka
public static class IntermittentlyFailingIntegerDeserializer implements Deserializer<Integer> {

public static final IntegerDeserializer INTEGER_DESERIALIZER = new IntegerDeserializer();
public int deserializeCount = 0;

public IntermittentlyFailingIntegerDeserializer() {}

@Override
public Integer deserialize(String topic, byte[] data) {
deserializeCount++;
if (deserializeCount % 10 == 0) {
throw new SerializationException("Expected Serialization Exception");
}
return INTEGER_DESERIALIZER.deserialize(topic, data);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
*/
package org.apache.beam.sdk.transforms.errorhandling;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -49,22 +52,24 @@
* <p>Simple usage with one DLQ
* <pre>{@code
* PCollection<?> records = ...;
* try (ErrorHandler<E,T> errorHandler = pipeline.registerErrorHandler(SomeSink.write())) {
* PCollection<?> results = records.apply(SomeIO.write().withDeadLetterQueue(errorHandler));
* try (BadRecordErrorHandler<T> errorHandler = pipeline.registerBadRecordErrorHandler(SomeSink.write())) {
* PCollection<?> results = records.apply(SomeIO.write().withErrorHandler(errorHandler));
* }
* results.apply(SomeOtherTransform);
* }</pre>
* Usage with multiple DLQ stages
* <pre>{@code
* PCollection<?> records = ...;
* try (ErrorHandler<E,T> errorHandler = pipeline.registerErrorHandler(SomeSink.write())) {
* PCollection<?> results = records.apply(SomeIO.write().withDeadLetterQueue(errorHandler))
* .apply(OtherTransform.builder().withDeadLetterQueue(errorHandler));
* try (BadRecordErrorHandler<T> errorHandler = pipeline.registerBadRecordErrorHandler(SomeSink.write())) {
* PCollection<?> results = records.apply(SomeIO.write().withErrorHandler(errorHandler))
* .apply(OtherTransform.builder().withErrorHandler(errorHandler));
* }
* results.apply(SomeOtherTransform);
* }</pre>
* This is marked as serializable despite never being needed on the runner, to enable it to be a
* parameter of an Autovalue configured PTransform.
*/
public interface ErrorHandler<ErrorT, OutputT extends POutput> extends AutoCloseable {
public interface ErrorHandler<ErrorT, OutputT extends POutput> extends AutoCloseable, Serializable {

void addErrorCollection(PCollection<ErrorT> errorCollection);

Expand All @@ -79,13 +84,16 @@ class PTransformErrorHandler<ErrorT, OutputT extends POutput>
private static final Logger LOG = LoggerFactory.getLogger(PTransformErrorHandler.class);
private final PTransform<PCollection<ErrorT>, OutputT> sinkTransform;

private final Pipeline pipeline;
// transient as Pipelines are not serializable
private final transient Pipeline pipeline;

private final Coder<ErrorT> coder;

private final List<PCollection<ErrorT>> errorCollections = new ArrayList<>();
// transient as PCollections are not serializable
private transient List<PCollection<ErrorT>> errorCollections = new ArrayList<>();

private @Nullable OutputT sinkOutput = null;
// transient as PCollections are not serializable
private transient @Nullable OutputT sinkOutput = null;

private boolean closed = false;

Expand All @@ -103,6 +111,12 @@ public PTransformErrorHandler(
this.coder = coder;
}

private void readObject(ObjectInputStream aInputStream)
throws ClassNotFoundException, IOException {
aInputStream.defaultReadObject();
errorCollections = new ArrayList<>();
}

@Override
public void addErrorCollection(PCollection<ErrorT> errorCollection) {
errorCollections.add(errorCollection);
Expand Down
1 change: 1 addition & 0 deletions sdks/java/io/kafka/kafka-01103/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
project.ext {
delimited="0.11.0.3"
undelimited="01103"
sdfCompatible=false
}

apply from: "../kafka-integration-test.gradle"
3 changes: 2 additions & 1 deletion sdks/java/io/kafka/kafka-100/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
project.ext {
delimited="1.0.0"
undelimited="100"
sdfCompatible=false
}

apply from: "../kafka-integration-test.gradle"
apply from: "../kafka-integration-test.gradle"
1 change: 1 addition & 0 deletions sdks/java/io/kafka/kafka-111/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
project.ext {
delimited="1.1.1"
undelimited="111"
sdfCompatible=false
}

apply from: "../kafka-integration-test.gradle"
1 change: 1 addition & 0 deletions sdks/java/io/kafka/kafka-201/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
project.ext {
delimited="2.0.1"
undelimited="201"
sdfCompatible=true
}

apply from: "../kafka-integration-test.gradle"
1 change: 1 addition & 0 deletions sdks/java/io/kafka/kafka-211/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
project.ext {
delimited="2.1.1"
undelimited="211"
sdfCompatible=true
}

apply from: "../kafka-integration-test.gradle"
1 change: 1 addition & 0 deletions sdks/java/io/kafka/kafka-222/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
project.ext {
delimited="2.2.2"
undelimited="222"
sdfCompatible=true
}

apply from: "../kafka-integration-test.gradle"
1 change: 1 addition & 0 deletions sdks/java/io/kafka/kafka-231/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
project.ext {
delimited="2.3.1"
undelimited="231"
sdfCompatible=true
}

apply from: "../kafka-integration-test.gradle"
1 change: 1 addition & 0 deletions sdks/java/io/kafka/kafka-241/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
project.ext {
delimited="2.4.1"
undelimited="241"
sdfCompatible=true
}

apply from: "../kafka-integration-test.gradle"
1 change: 1 addition & 0 deletions sdks/java/io/kafka/kafka-251/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
project.ext {
delimited="2.5.1"
undelimited="251"
sdfCompatible=true
}

apply from: "../kafka-integration-test.gradle"
2 changes: 1 addition & 1 deletion sdks/java/io/kafka/kafka-integration-test.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,4 @@ dependencies {

configurations.create("kafkaVersion$undelimited")

tasks.register("kafkaVersion${undelimited}BatchIT",KafkaTestUtilities.KafkaBatchIT, project.ext.delimited, project.ext.undelimited, false, configurations, project)
tasks.register("kafkaVersion${undelimited}BatchIT",KafkaTestUtilities.KafkaBatchIT, project.ext.delimited, project.ext.undelimited, project.ext.sdfCompatible, configurations, project)
Loading

0 comments on commit 5d11c20

Please sign in to comment.