Skip to content

Commit

Permalink
[Feature][formats][ogg] Support read ogg format message apache#4201
Browse files Browse the repository at this point in the history
  • Loading branch information
zhilinli committed Mar 3, 2023
1 parent b9935ee commit ccac757
Show file tree
Hide file tree
Showing 5 changed files with 11 additions and 4 deletions.
2 changes: 1 addition & 1 deletion docs/en/connector-v2/formats/ogg-json.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ Ogg provides a unified format for changelog, here is a simple example for an upd
}
```

Note: please refer to documentation about the meaning of each fields.
Note: please refer to documentation about the meaning of each fields.

The Oracle products table has 4 columns (id, name, description and weight).
The above JSON message is an update change event on the products table where the weight value of the row with id = 111 is changed from 5.18 to 5.15.
Expand Down
2 changes: 2 additions & 0 deletions docs/en/connector-v2/sink/Kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ Kafka distinguishes different transactions by different transactionId. This para

### format

Data format. The default format is json. Optional text format. The default field separator is ",".
If you customize the delimiter, add the "field_delimiter" option.
If you use ogg format, please refer to [ogg-json](../formats/ogg-json.md) for details.

### field_delimiter
Expand Down
2 changes: 2 additions & 0 deletions docs/en/connector-v2/source/kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ The structure of the data, including field names and field types.

## format

Data format. The default format is json. Optional text format. The default field separator is ", ".
If you customize the delimiter, add the "field_delimiter" option.
If you use ogg format, please refer to [ogg-json](../formats/ogg-json.md) for details.

## field_delimiter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,9 @@ public OggJsonDeserializationSchema(

@Override
public SeaTunnelRow deserialize(byte[] message) throws IOException {
throw new UnsupportedOperationException();
throw new SeaTunnelJsonFormatException(
CommonErrorCode.JSON_OPERATION_FAILED,
String.format("Failed to deserialize JSON '%s'.", new String(message)));
}

@Override
Expand Down Expand Up @@ -185,7 +187,7 @@ public void deserialize(byte[] message, Collector<SeaTunnelRow> out) {
private JsonNode convertBytes(byte[] message) {
try {
return jsonDeserializer.deserializeToJsonNode(message);
} catch (Throwable t) {
} catch (Exception t) {
if (ignoreParseErrors) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.io.File;
Expand Down Expand Up @@ -151,7 +152,7 @@ private OggJsonDeserializationSchema createOggJsonDeserializationSchema(

private static List<String> readLines(String resource) throws IOException {
final URL url = OggJsonSerDeSchemaTest.class.getClassLoader().getResource(resource);
assert url != null;
Assertions.assertNotNull(url);
Path path = new File(url.getFile()).toPath();
return Files.readAllLines(path);
}
Expand Down

0 comments on commit ccac757

Please sign in to comment.