[Feature][formats][ogg] Support read ogg format message #4201 (#4225)

Co-authored-by: zhilinli <[email protected]>
Co-authored-by: hailin0 <[email protected]>
3 people authored Nov 6, 2023
1 parent 38132f5 commit 7728e24
Showing 34 changed files with 1,576 additions and 1,431 deletions.
93 changes: 93 additions & 0 deletions docs/en/connector-v2/formats/ogg-json.md
@@ -0,0 +1,93 @@
# Ogg Format

[Oracle GoldenGate](https://www.oracle.com/integration/goldengate/) (a.k.a. Ogg) is a managed service providing a real-time data mesh platform, which uses replication to keep data highly available and to enable real-time analysis. Customers can design, execute, and monitor their data replication and stream data processing solutions without the need to allocate or manage compute environments. Ogg provides a format schema for changelogs and supports serializing messages using JSON.

SeaTunnel supports interpreting Ogg JSON messages as INSERT/UPDATE/DELETE messages into the SeaTunnel system. This feature is useful in many cases, such as:

- synchronizing incremental data from databases to other systems
- auditing logs
- real-time materialized views on databases
- temporal joins over the changing history of a database table, and so on

SeaTunnel also supports encoding the INSERT/UPDATE/DELETE messages in SeaTunnel as Ogg JSON messages and emitting them to storage such as Kafka. However, SeaTunnel currently cannot combine UPDATE_BEFORE and UPDATE_AFTER into a single UPDATE message. Therefore, SeaTunnel encodes UPDATE_BEFORE and UPDATE_AFTER as DELETE and INSERT Ogg messages, respectively, as sketched below.
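
To make this mapping concrete, here is a minimal sketch assuming SeaTunnel's `RowKind` enum; the `oggOpType` helper is hypothetical, not the actual `OggJsonSerializationSchema` code.

```java
import org.apache.seatunnel.api.table.type.RowKind;

public class OggOpTypeSketch {

    // Hypothetical helper: maps SeaTunnel row kinds onto Ogg op_type codes.
    static String oggOpType(RowKind kind) {
        switch (kind) {
            case INSERT:
            case UPDATE_AFTER: // the "after" half of an update is re-emitted as an INSERT
                return "I";
            case UPDATE_BEFORE: // the "before" half of an update is re-emitted as a DELETE
            case DELETE:
                return "D";
            default:
                throw new IllegalArgumentException("Unsupported row kind: " + kind);
        }
    }

    public static void main(String[] args) {
        System.out.println(oggOpType(RowKind.UPDATE_BEFORE)); // D
        System.out.println(oggOpType(RowKind.UPDATE_AFTER));  // I
    }
}
```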

# Format Options

| option | default | required | Description |
|------------------------------|---------|----------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| format | (none) | yes | Specify what format to use; here it should be 'ogg_json'. |
| ogg_json.ignore-parse-errors | false | no | Skip fields and rows with parse errors instead of failing. Fields are set to null in case of errors. |
| ogg_json.database.include | (none) | no | An optional regular expression to read only the changelog rows of specific databases, matched against the "database" meta field in the Ogg record. The pattern string is compatible with Java's Pattern. |
| ogg_json.table.include | (none) | no | An optional regular expression to read only the changelog rows of specific tables, matched against the "table" meta field in the Ogg record. The pattern string is compatible with Java's Pattern. |
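
Since both include options use Java `Pattern` semantics, their filtering behavior can be sketched directly. The regex and table names below are made-up examples, and full-string matching is assumed:

```java
import java.util.regex.Pattern;

public class IncludeFilterSketch {
    public static void main(String[] args) {
        // Hypothetical value for ogg_json.table.include, matched against the
        // "table" meta field of each incoming Ogg record.
        Pattern tableInclude = Pattern.compile("PRODUCTS|ORDERS");
        System.out.println(tableInclude.matcher("PRODUCTS").matches());  // true  -> row is read
        System.out.println(tableInclude.matcher("CUSTOMERS").matches()); // false -> row is skipped
    }
}
```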

# How to use the Ogg format

## Kafka usage example

Ogg provides a unified format for changelogs. Here is a simple example of an update operation captured from an Oracle products table:

```json
{
  "before": {
    "id": 111,
    "name": "scooter",
    "description": "Big 2-wheel scooter",
    "weight": 5.18
  },
  "after": {
    "id": 111,
    "name": "scooter",
    "description": "Big 2-wheel scooter",
    "weight": 5.15
  },
  "op_type": "U",
  "op_ts": "2020-05-13 15:40:06.000000",
  "current_ts": "2020-05-13 15:40:07.000000",
  "primary_keys": [
    "id"
  ],
  "pos": "00000000000000000000143",
  "table": "PRODUCTS"
}
```

Note: op_type is "I" for inserts, "U" for updates, and "D" for deletes; please refer to the Oracle GoldenGate documentation for the meaning of each field.

The Oracle products table has 4 columns (id, name, description, and weight).
The above JSON message is an update change event on the products table where the weight value of the row with id = 111 changed from 5.18 to 5.15.
Assuming the messages have been synchronized to the Kafka topic ogg, we can use the following SeaTunnel configuration to consume this topic and interpret the change events.

```bash
env {
  execution.parallelism = 1
  job.mode = "STREAMING"
}

source {
  Kafka {
    bootstrap.servers = "127.0.0.1:9092"
    topic = "ogg"
    result_table_name = "kafka_name"
    start_mode = earliest
    schema = {
      fields {
        id = "int"
        name = "string"
        description = "string"
        weight = "double"
      }
    }
    format = ogg_json
  }
}

sink {
  jdbc {
    url = "jdbc:mysql://127.0.0.1/test"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "root"
    password = "12345678"
    table = "ogg"
    primary_keys = ["id"]
  }
}
```

10 changes: 5 additions & 5 deletions docs/en/connector-v2/sink/Kafka.md
@@ -56,12 +56,12 @@ Currently two formats are supported:

For example, the upstream data is the following:

| name | age | data |
|------|-----|---------------|
| Jack | 16 | data-example1 |
| Mary | 23 | data-example2 |

If `${name}` is set as the topic, the first row is sent to the Jack topic and the second row is sent to the Mary topic.

### Semantics

@@ -102,6 +102,10 @@ public String getJdbcUrl(String databaseName) {
+ additionalUrlParams;
}

public void setDatabaseName(String databaseName) {
this.databaseName = databaseName;
}

@Override
public String getJdbcUrl() {
return getJdbcUrl(databaseName);
@@ -59,25 +59,38 @@ public class UniqueDatabase {
private final String username;
private final String password;

/**
* @param container mysql docker container
* @param databaseName name of the database
* @param username Connection user name
* @param password Connection password
 * @param templateName name of the DDL template file under the ddl/ directory
*/
public UniqueDatabase(
        MySqlContainer container,
        String databaseName,
        String username,
        String password,
        String templateName) {
    this(
            container,
            databaseName,
            Integer.toUnsignedString(new Random().nextInt(), 36),
            username,
            password,
            // null check first, then emptiness; fall back to databaseName to keep
            // the previous default template behavior
            (templateName != null && !templateName.isEmpty()) ? templateName : databaseName);
}

private UniqueDatabase(
        MySqlContainer container,
        String databaseName,
        final String identifier,
        String username,
        String password,
        String templateName) {
    this.container = container;
    this.databaseName = databaseName + "_" + identifier;
    this.templateName = templateName;
    this.username = username;
    this.password = password;
}
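
For illustration, the widened constructor lets tests point several logical databases at one shared DDL template. A hedged fragment (imports and the `MYSQL_CONTAINER` field are assumed to come from the surrounding test class, as in the MysqlCDCIT hunk below; the `shard_a` name is made up):

```java
// Same behavior as before: template file ddl/mysql_cdc.sql
UniqueDatabase inventory =
        new UniqueDatabase(MYSQL_CONTAINER, "mysql_cdc", "mysqluser", "mysqlpw", "mysql_cdc");

// New possibility: a differently named database reusing the same template file
UniqueDatabase shardA =
        new UniqueDatabase(MYSQL_CONTAINER, "shard_a", "mysqluser", "mysqlpw", "mysql_cdc");
```
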
@@ -23,5 +23,6 @@ public enum MessageFormat {
CANAL_JSON,
DEBEZIUM_JSON,
COMPATIBLE_DEBEZIUM_JSON,
COMPATIBLE_KAFKA_CONNECT_JSON,
OGG_JSON
}
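
For context, a config value like `format = ogg_json` can land on the new constant through ordinary enum resolution. This is a self-contained sketch using a local copy of the enum; the actual option parsing in SeaTunnel's factories may differ.

```java
public class FormatResolutionSketch {

    // Local copy of the enum so the example compiles on its own.
    enum MessageFormat {
        JSON, TEXT, CANAL_JSON, DEBEZIUM_JSON,
        COMPATIBLE_DEBEZIUM_JSON, COMPATIBLE_KAFKA_CONNECT_JSON, OGG_JSON
    }

    public static void main(String[] args) {
        // "ogg_json" from the user config upper-cases onto the enum constant.
        MessageFormat format = MessageFormat.valueOf("ogg_json".toUpperCase());
        System.out.println(format); // OGG_JSON
    }
}
```
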
@@ -30,6 +30,7 @@
import org.apache.seatunnel.format.json.canal.CanalJsonSerializationSchema;
import org.apache.seatunnel.format.json.debezium.DebeziumJsonSerializationSchema;
import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
import org.apache.seatunnel.format.json.ogg.OggJsonSerializationSchema;
import org.apache.seatunnel.format.text.TextSerializationSchema;

import org.apache.kafka.clients.producer.ProducerRecord;
@@ -220,6 +221,8 @@ private static SerializationSchema createSerializationSchema(
.build();
case CANAL_JSON:
return new CanalJsonSerializationSchema(rowType);
case OGG_JSON:
return new OggJsonSerializationSchema(rowType);
case DEBEZIUM_JSON:
return new DebeziumJsonSerializationSchema(rowType);
case COMPATIBLE_DEBEZIUM_JSON:
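
A hedged usage sketch of the new sink-side branch: serializing one row with `OggJsonSerializationSchema`. The row type and values mirror the products example from the docs above; whether the schema expects a particular row kind on the row is not shown here.

```java
import java.nio.charset.StandardCharsets;

import org.apache.seatunnel.api.serialization.SerializationSchema;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.format.json.ogg.OggJsonSerializationSchema;

public class OggSerializeSketch {
    public static void main(String[] args) {
        // Row type matching the products table from the docs above.
        SeaTunnelRowType rowType = new SeaTunnelRowType(
                new String[] {"id", "name", "description", "weight"},
                new SeaTunnelDataType<?>[] {
                    BasicType.INT_TYPE, BasicType.STRING_TYPE,
                    BasicType.STRING_TYPE, BasicType.DOUBLE_TYPE
                });
        SerializationSchema serializer = new OggJsonSerializationSchema(rowType);

        SeaTunnelRow row =
                new SeaTunnelRow(new Object[] {111, "scooter", "Big 2-wheel scooter", 5.15});
        byte[] value = serializer.serialize(row); // bytes for the Kafka record value
        System.out.println(new String(value, StandardCharsets.UTF_8));
    }
}
```
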
@@ -43,7 +43,10 @@ public OptionRule optionRule() {
.conditional(
Config.FORMAT,
Arrays.asList(
MessageFormat.JSON,
MessageFormat.CANAL_JSON,
MessageFormat.TEXT,
MessageFormat.OGG_JSON),
Config.TOPIC)
.optional(
Config.KAFKA_CONFIG,
@@ -39,6 +39,7 @@
import org.apache.seatunnel.format.json.canal.CanalJsonDeserializationSchema;
import org.apache.seatunnel.format.json.debezium.DebeziumJsonDeserializationSchema;
import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
import org.apache.seatunnel.format.json.ogg.OggJsonDeserializationSchema;
import org.apache.seatunnel.format.text.TextDeserializationSchema;
import org.apache.seatunnel.format.text.constant.TextFormatConstant;

@@ -218,6 +219,10 @@ private DeserializationSchema<SeaTunnelRow> createDeserializationSchema(
return CanalJsonDeserializationSchema.builder(seaTunnelRowType)
.setIgnoreParseErrors(true)
.build();
case OGG_JSON:
return OggJsonDeserializationSchema.builder(seaTunnelRowType)
.setIgnoreParseErrors(true)
.build();
case COMPATIBLE_KAFKA_CONNECT_JSON:
Boolean keySchemaEnable =
readonlyConfig.get(
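
Correspondingly, a hedged sketch of building the new source-side deserializer. The builder call is exactly the wiring added above; the comment describes the expected fan-out rather than calling source internals.

```java
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.format.json.ogg.OggJsonDeserializationSchema;

public class OggDeserializeSketch {
    public static void main(String[] args) {
        SeaTunnelRowType rowType = new SeaTunnelRowType(
                new String[] {"id", "name", "description", "weight"},
                new SeaTunnelDataType<?>[] {
                    BasicType.INT_TYPE, BasicType.STRING_TYPE,
                    BasicType.STRING_TYPE, BasicType.DOUBLE_TYPE
                });

        // Mirrors the wiring added in this commit.
        DeserializationSchema<SeaTunnelRow> deserializer =
                OggJsonDeserializationSchema.builder(rowType)
                        .setIgnoreParseErrors(true)
                        .build();

        // The Kafka source feeds each record value through this schema; an Ogg
        // "U" message fans out as an UPDATE_BEFORE row plus an UPDATE_AFTER row.
    }
}
```
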
@@ -70,7 +70,8 @@ public class MysqlCDCIT extends TestSuiteBase implements TestResource {
private static final MySqlContainer MYSQL_CONTAINER = createMySqlContainer(MySqlVersion.V8_0);

private final UniqueDatabase inventoryDatabase =
new UniqueDatabase(
        MYSQL_CONTAINER, MYSQL_DATABASE, "mysqluser", "mysqlpw", MYSQL_DATABASE);

// mysql source table query sql
private static final String SOURCE_SQL_TEMPLATE =
