DBZ-1292 ☁️ Initial import of CloudEvents example
gunnarmorling committed Jan 20, 2020
1 parent 001af2d commit cd44c94
Showing 11 changed files with 536 additions and 0 deletions.
cloudevents/README.md (+111 lines)
# Debezium CloudEvents Demo

This demo automatically deploys the topology of services as defined in the [Debezium Tutorial](http://debezium.io/docs/tutorial/).

## Preparations

```shell
export DEBEZIUM_VERSION=1.0
docker-compose up --build
```

## CloudEvents Structured Mode with JSON for envelope and data

```shell
# Deploy Postgres connector
curl -i -X PUT -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/inventory-connector-json-json/config -d @register-postgres-json-json.json

# Consume messages from the Debezium topic
docker run --rm --tty \
  --network cloudevents-network \
  debezium/tooling \
  kafkacat -b kafka:9092 -C -o beginning -q \
  -t dbserver1.inventory.customers | jq .

# Modify records in the database via psql client
docker-compose exec postgres env PGOPTIONS="--search_path=inventory" bash -c 'psql -U $POSTGRES_USER postgres'
```
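In structured mode, each Kafka message value is a JSON CloudEvents envelope carrying the context attributes (`id`, `source`, `specversion`, `type`, ...) alongside the `data` payload. As a rough illustration (not part of the demo; the envelope values below are made up), a consumer could sanity-check that the required CloudEvents attributes are present:

```python
import json

# The four context attributes the CloudEvents spec makes mandatory
REQUIRED_ATTRIBUTES = {"id", "source", "specversion", "type"}

def is_valid_envelope(event: dict) -> bool:
    """Check that the required CloudEvents context attributes are present."""
    return REQUIRED_ATTRIBUTES.issubset(event.keys())

# Illustrative envelope shaped roughly like the demo's structured-mode output
envelope = json.loads("""{
  "id": "name:dbserver1;lsn:24023128",
  "source": "/debezium/postgresql/dbserver1",
  "specversion": "1.0",
  "type": "io.debezium.postgresql.datachangeevent",
  "datacontenttype": "application/json",
  "data": {"before": null, "after": {"id": 1001, "first_name": "Sally"}}
}""")

print(is_valid_envelope(envelope))  # → True
```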

In order to produce `data` values without the embedded JSON schema, add `"value.converter.json.schemas.enable" : "false"` to the connector configuration and `PUT` it again.
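With `schemas.enable` left at its default of `true`, the JSON converter wraps each value in an envelope of the form `{"schema": ..., "payload": ...}`; setting it to `false` emits just the payload. A minimal sketch of the difference (the `unwrap` helper is hypothetical, not part of the demo):

```python
def unwrap(value: dict) -> dict:
    """Return the payload, whether or not the JSON schema envelope is present."""
    if set(value.keys()) == {"schema", "payload"}:
        return value["payload"]
    return value

with_schema = {"schema": {"type": "struct"}, "payload": {"id": 1001}}
without_schema = {"id": 1001}

print(unwrap(with_schema))     # → {'id': 1001}
print(unwrap(without_schema))  # → {'id': 1001}
```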

## CloudEvents Structured Mode with JSON for envelope and Avro for data

```shell
# Deploy Postgres connector
curl -i -X PUT -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/inventory-connector-json-avro/config -d @register-postgres-json-avro.json

# Consume messages from the Debezium topic
docker run --rm --tty \
  --network cloudevents-network \
  debezium/tooling \
  kafkacat -b kafka:9092 -C -o beginning -q \
  -t dbserver2.inventory.customers | jq .
```

Observe how the `data` field contains base64-encoded Avro binary data.
A Kafka Streams application (see _avro-data-extractor_ directory) processes this topic and writes out the extracted Avro data to the `customers2` topic.
Examine its contents like so:

```shell
docker run --rm --tty \
  --network cloudevents-network \
  debezium/tooling \
  kafkacat -b kafka:9092 -C -o beginning -q -s value=avro -r http://schema-registry:8081 \
  -t customers2 | jq .
```
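Since the JSON envelope cannot carry raw bytes, the Avro-serialized `data` value is base64-encoded. Decoding it back into bytes is a one-liner; the input below is an illustrative value, not a real message, and real messages produced through the schema registry typically start with the Confluent wire-format framing (a zero magic byte followed by a four-byte schema id) before the Avro body:

```python
import base64

def decode_data(data_b64: str) -> bytes:
    """Decode the base64-encoded `data` attribute back into Avro bytes."""
    return base64.b64decode(data_b64)

raw = decode_data("AAAAAAE=")  # illustrative value: 0x00 magic byte + schema id 1
print(len(raw), raw[0])  # → 5 0
```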

## CloudEvents Structured Mode with Avro for envelope and data

```shell
# Deploy Postgres connector
curl -i -X PUT -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/inventory-connector-avro-avro/config -d @register-postgres-avro-avro.json

# Consume messages from the Debezium topic:
docker run --rm --tty \
  --network cloudevents-network \
  debezium/tooling \
  kafkacat -b kafka:9092 -C -o beginning -q -s value=avro -r http://schema-registry:8081 \
  -t dbserver3.inventory.customers | jq .
```

Again, the `data` field itself is Avro-encoded binary data.
The same stream processing application writes out that data to the `customers3` topic:

```shell
docker run --rm --tty \
  --network cloudevents-network \
  debezium/tooling \
  kafkacat -b kafka:9092 -C -o beginning -q -s value=avro -r http://schema-registry:8081 \
  -t customers3 | jq .
```

## CloudEvents Binary Mode

tbd.

## Clean-up

```shell
docker-compose down
```

## Debugging

Should you need to establish a remote debugging session into a deployed connector, add the following to the `environment` section of the `connect` service in the Compose file:

- KAFKA_DEBUG=true
- DEBUG_SUSPEND_FLAG=n

Also expose the debugging port 5005 under `ports`:

- 5005:5005

You can then establish a remote debugging session from your IDE on `localhost:5005`.
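For reference, the combined fragment of the `connect` service might look like this (all other settings omitted):

```yaml
connect:
  ports:
    - 5005:5005
  environment:
    - KAFKA_DEBUG=true
    - DEBUG_SUSPEND_FLAG=n
```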

## Useful Commands

Listing all topics:

```shell
docker-compose exec kafka /kafka/bin/kafka-topics.sh --zookeeper zookeeper:2181 --list
```
cloudevents/avro-data-extractor/Dockerfile (+5 lines)
FROM fabric8/java-jboss-openjdk8-jdk
ENV JAVA_OPTIONS='-Dquarkus.http.host=0.0.0.0 -Dquarkus.http.port=${PORT}'
COPY target/lib/* /deployments/lib/
COPY target/*-runner.jar /deployments/app.jar
ENTRYPOINT [ "/deployments/run-java.sh" ]
cloudevents/avro-data-extractor/pom.xml (+136 lines)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>io.debezium.examples</groupId>
    <artifactId>cloudevents-data-extractor</artifactId>
    <version>0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <failOnMissingWebXml>false</failOnMissingWebXml>

        <mvn.compiler.version>3.5.1</mvn.compiler.version>
        <mvn.dependency.version>3.0.2</mvn.dependency.version>
        <version.surefire>2.22.0</version.surefire>

        <apache.kafka.version>2.0.0</apache.kafka.version>
        <version.quarkus>1.1.1.Final</version.quarkus>
        <version.debezium>1.0.0.Final</version.debezium>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>io.quarkus</groupId>
                <artifactId>quarkus-bom</artifactId>
                <version>${version.quarkus}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>io.debezium</groupId>
                <artifactId>debezium-core</artifactId>
                <version>${version.debezium}</version>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <finalName>aggregator</finalName>
        <plugins>
            <plugin>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>${version.surefire}</version>
                <configuration>
                    <systemProperties>
                        <java.util.logging.manager>org.jboss.logmanager.LogManager</java.util.logging.manager>
                    </systemProperties>
                </configuration>
            </plugin>
            <plugin>
                <groupId>io.quarkus</groupId>
                <artifactId>quarkus-maven-plugin</artifactId>
                <version>${version.quarkus}</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>build</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <dependency>
            <groupId>io.quarkus</groupId>
            <artifactId>quarkus-smallrye-health</artifactId>
        </dependency>
        <dependency>
            <groupId>io.quarkus</groupId>
            <artifactId>quarkus-undertow-websockets</artifactId>
        </dependency>
        <dependency>
            <groupId>io.quarkus</groupId>
            <artifactId>quarkus-kafka-streams</artifactId>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.datatype</groupId>
            <artifactId>jackson-datatype-jsr310</artifactId>
        </dependency>
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-core</artifactId>
        </dependency>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-streams-avro-serde</artifactId>
            <version>5.3.2</version>
        </dependency>
    </dependencies>

    <profiles>
        <profile>
            <id>native</id>
            <build>
                <plugins>
                    <plugin>
                        <groupId>io.quarkus</groupId>
                        <artifactId>quarkus-maven-plugin</artifactId>
                        <version>${version.quarkus}</version>
                        <executions>
                            <execution>
                                <goals>
                                    <goal>native-image</goal>
                                </goals>
                                <configuration>
                                    <enableHttpUrlHandler>true</enableHttpUrlHandler>
                                    <autoServiceLoaderRegistration>false</autoServiceLoaderRegistration>
                                </configuration>
                            </execution>
                        </executions>
                    </plugin>
                </plugins>
            </build>
        </profile>
    </profiles>

    <repositories>
        <repository>
            <id>confluent</id>
            <url>http://packages.confluent.io/maven/</url>
        </repository>
    </repositories>

</project>
StreamsPipelineManager.java (+86 lines)
/*
 * Copyright Debezium Authors.
 *
 * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
 */
package io.debezium.examples.cloudevents.dataextractor;

import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Map;

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.Produces;
import javax.inject.Inject;

import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;
import io.debezium.examples.cloudevents.dataextractor.model.CloudEvent;
import io.debezium.serde.DebeziumSerdes;

/**
 * Starts up the KStreams pipeline once the source topics have been created.
 *
 * @author Gunnar Morling
 */
@ApplicationScoped
public class StreamsPipelineManager {

    private static final Logger LOG = LoggerFactory.getLogger(StreamsPipelineManager.class);

    @Inject
    @ConfigProperty(name = "json.avro.customers.topic")
    String jsonAvroCustomersTopic;

    @Inject
    @ConfigProperty(name = "json.avro.extracted.topic")
    String jsonAvroExtractedTopic;

    @Inject
    @ConfigProperty(name = "avro.avro.customers.topic")
    String avroAvroCustomersTopic;

    @Inject
    @ConfigProperty(name = "avro.avro.extracted.topic")
    String avroAvroExtractedTopic;

    @Inject
    @ConfigProperty(name = "schema.registry.url")
    String schemaRegistryUrl;

    @Produces
    Topology createStreamTopology() {
        LOG.info("Creating stream topology");

        StreamsBuilder builder = new StreamsBuilder();

        // JSON envelope with Avro data: deserialize the envelope as JSON and
        // write out the raw (Avro) bytes of its `data` attribute
        Serde<Long> longKeySerde = DebeziumSerdes.payloadJson(Long.class);
        longKeySerde.configure(Collections.emptyMap(), true);
        Serde<CloudEvent> cloudEventSerde = DebeziumSerdes.payloadJson(CloudEvent.class);
        cloudEventSerde.configure(Collections.emptyMap(), false);

        builder.stream(jsonAvroCustomersTopic, Consumed.with(longKeySerde, cloudEventSerde))
                .mapValues(ce -> ce.data)
                .to(jsonAvroExtractedTopic, Produced.with(longKeySerde, Serdes.ByteArray()));

        // Avro envelope with Avro data: read the envelope as a generic Avro
        // record and extract the bytes of its `data` field
        Serde<GenericRecord> genericAvroSerde = new GenericAvroSerde();
        Map<String, String> config = Collections.singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
        genericAvroSerde.configure(config, false);

        builder.stream(avroAvroCustomersTopic, Consumed.with(longKeySerde, genericAvroSerde))
                .mapValues(ce -> ((ByteBuffer) ce.get("data")).array())
                .to(avroAvroExtractedTopic, Produced.with(longKeySerde, Serdes.ByteArray()));

        return builder.build();
    }
}
CloudEvent.java (+25 lines)
package io.debezium.examples.cloudevents.dataextractor.model;

public class CloudEvent {

    public String id;
    public String source;
    public String specversion;
    public String type;
    public String time;
    public String datacontenttype;
    public String dataschema;
    public String iodebeziumop;
    public String iodebeziumversion;
    public String iodebeziumconnector;
    public String iodebeziumname;
    public String iodebeziumtsms;
    public boolean iodebeziumsnapshot;
    public String iodebeziumdb;
    public String iodebeziumschema;
    public String iodebeziumtable;
    public String iodebeziumtxId;
    public String iodebeziumlsn;
    public String iodebeziumxmin;
    public byte[] data;
}
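The `iodebezium*` fields are CloudEvents extension attributes: Debezium source metadata fields (e.g. `ts_ms`, `txId`) flattened into attribute names with the `iodebezium` prefix and separators removed. A hypothetical sketch of that naming convention, inferred from the field names above (not code from this repo):

```python
def extension_name(field: str, prefix: str = "iodebezium") -> str:
    """Derive the flattened extension attribute name for a Debezium source
    field, assuming the convention: prefix + field with separators removed."""
    return prefix + field.replace("_", "").replace(".", "")

print(extension_name("ts_ms"))  # → iodebeziumtsms
print(extension_name("txId"))   # → iodebeziumtxId
```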
application.properties (+21 lines)
json.avro.customers.topic=dbserver2.inventory.customers
json.avro.extracted.topic=customers2

avro.avro.customers.topic=dbserver3.inventory.customers
avro.avro.extracted.topic=customers3

schema.registry.url=http://schema-registry:8081

quarkus.kafka-streams.bootstrap-servers=localhost:9092
quarkus.kafka-streams.application-id=cloudevents-data-extractor
quarkus.kafka-streams.topics=${json.avro.customers.topic},${avro.avro.customers.topic}

# streams options
kafka-streams.cache.max.bytes.buffering=10240
kafka-streams.commit.interval.ms=1000
kafka-streams.metadata.max.age.ms=500
kafka-streams.auto.offset.reset=earliest
kafka-streams.processing.guarantee=exactly_once

quarkus.log.console.enable=true
quarkus.log.console.level=INFO