Skip to content

Commit

Permalink
ticdc: Use open-protocol instead of default protocol and add missing …
Browse files Browse the repository at this point in the history
…params (#8047)
  • Loading branch information
Rustin170506 authored Dec 31, 2021
1 parent 1f1c05c commit 4a67b35
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 16 deletions.
26 changes: 14 additions & 12 deletions ticdc/manage-ticdc.md
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ cdc cli changefeed create --pd=http://10.0.10.25:2379 --sink-uri="mysql://root:1
```shell
Create changefeed successfully!
ID: simple-replication-task
Info: {"sink-uri":"mysql://root:[email protected]:3306/","opts":{},"create-time":"2020-03-12T22:04:08.103600025+08:00","start-ts":415241823337054209,"target-ts":0,"admin-job-type":0,"sort-engine":"unified","sort-dir":".","config":{"case-sensitive":true,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"ddl-allow-list":null},"mounter":{"worker-num":16},"sink":{"dispatchers":null,"protocol":"default"},"scheduler":{"type":"table-number","polling-time":-1}},"state":"normal","history":null,"error":null}
Info: {"sink-uri":"mysql://root:[email protected]:3306/","opts":{},"create-time":"2020-03-12T22:04:08.103600025+08:00","start-ts":415241823337054209,"target-ts":0,"admin-job-type":0,"sort-engine":"unified","sort-dir":".","config":{"case-sensitive":true,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"ddl-allow-list":null},"mounter":{"worker-num":16},"sink":{"dispatchers":null},"scheduler":{"type":"table-number","polling-time":-1}},"state":"normal","history":null,"error":null}
```

- `--changefeed-id`:同步任务的 ID,格式需要符合正则表达式 `^[a-zA-Z0-9]+(\-[a-zA-Z0-9]+)*$`。如果不指定该 ID,TiCDC 会自动生成一个 UUID(version 4 格式)作为 ID。
Expand Down Expand Up @@ -193,7 +193,7 @@ URI 中可配置的参数如下:
{{< copyable "shell-regular" >}}

```shell
--sink-uri="kafka://127.0.0.1:9092/topic-name?kafka-version=2.4.0&partition-num=6&max-message-bytes=67108864&replication-factor=1"
--sink-uri="kafka://127.0.0.1:9092/topic-name?protocol=canal-json&kafka-version=2.4.0&partition-num=6&max-message-bytes=67108864&replication-factor=1"
```

URI 中可配置的的参数如下:
Expand All @@ -206,10 +206,12 @@ URI 中可配置的的参数如下:
| `kafka-version` | 下游 Kafka 版本号(可选,默认值 `2.4.0`,目前支持的最低版本为 `0.11.0.2`,最高版本为 `2.7.0`。该值需要与下游 Kafka 的实际版本保持一致) |
| `kafka-client-id` | 指定同步任务的 Kafka 客户端的 ID(可选,默认值为 `TiCDC_sarama_producer_同步任务的 ID`|
| `partition-num` | 下游 Kafka partition 数量(可选,不能大于实际 partition 数量,否则创建同步任务会失败,默认值 `3`|
| `max-message-bytes` | 每次向 Kafka broker 发送消息的最大数据量(可选,默认值 `1MB`|
| `replication-factor` | kafka 消息保存副本数(可选,默认值 `1`|
| `protocol` | 输出到 kafka 消息协议,可选值有 `default``canal``avro``maxwell`(默认值为 `default`|
| `max-batch-size` | 从 v4.0.9 引入。如果消息协议支持将多条变更记录输出到一条 kafka 消息,该参数指定一条 kafka 消息中变更记录的最多数量,目前仅对 Kafka 的 `protocol``default` 时有效(可选,默认值为 `16`|
| `max-message-bytes` | 每次向 Kafka broker 发送消息的最大数据量(可选,默认值 `10MB`)。从 v5.0.6 和 v4.0.6 开始,默认值分别从 64MB 和 256MB 调整至 10MB。|
| `replication-factor` | Kafka 消息保存副本数(可选,默认值 `1`|
| `protocol` | 输出到 Kafka 的消息协议,可选值有 `canal-json``open-protocol``canal``avro``maxwell` |
| `auto-create-topic` | 当传入的 `topic-name` 在 Kafka 集群不存在时,TiCDC 是否要自动创建该 topic(可选,默认值 `true`|
| `enable-tidb-extension` | 当输出协议为 `canal-json` 时,如果该值为 `true`,TiCDC 会发送 Resolved 事件,并在 Kafka 消息中添加 TiDB 扩展字段(可选,默认值 `false`|
| `max-batch-size` | 从 v4.0.9 开始引入。当消息协议支持把多条变更记录输出至一条 Kafka 消息时,该参数用于指定这一条 Kafka 消息中变更记录的最多数量。目前,仅当 Kafka 消息的 `protocol``open-protocol` 时有效(可选,默认值 `16`|
| `ca` | 连接下游 Kafka 实例所需的 CA 证书文件路径(可选) |
| `cert` | 连接下游 Kafka 实例所需的证书文件路径(可选) |
| `key` | 连接下游 Kafka 实例所需的证书密钥文件路径(可选) |
Expand All @@ -221,10 +223,11 @@ URI 中可配置的的参数如下:

* TiCDC 推荐用户自行创建 Kafka Topic,你至少需要设置该 Topic 每次向 Kafka broker 发送消息的最大数据量和下游 Kafka partition 的数量。在创建 changefeed 的时候,这两项设置分别对应 `max-message-bytes``partition-num` 参数。
* 如果你在创建 changefeed 时,使用了尚未存在的 Topic,那么 TiCDC 会尝试使用 `partition-num``replication-factor` 参数自行创建 Topic。建议明确指定这两个参数。
* 在大多数情况下,建议使用 `canal-json` 协议。

> **注意:**
>
>`protocol``default` 时,TiCDC 会尽量避免产生长度超过 `max-message-bytes` 的消息。但如果单条数据变更记录需要超过 `max-message-bytes` 个字节来表示,为了避免静默失败,TiCDC 会试图输出这条消息并在日志中输出 Warning。
>`protocol``open-protocol` 时,TiCDC 会尽量避免产生长度超过 `max-message-bytes` 的消息。但如果单条数据变更记录需要超过 `max-message-bytes` 个字节来表示,为了避免静默失败,TiCDC 会试图输出这条消息并在日志中输出 Warning。

#### TiCDC 集成 Kafka Connect (Confluent Platform)

Expand All @@ -237,7 +240,7 @@ URI 中可配置的的参数如下:
{{< copyable "shell-regular" >}}

```shell
--sink-uri="kafka://127.0.0.1:9092/topic-name?kafka-version=2.4.0&protocol=avro&partition-num=6&max-message-bytes=67108864&replication-factor=1"
--sink-uri="kafka://127.0.0.1:9092/topic-name?protocol=canal-json&kafka-version=2.4.0&protocol=avro&partition-num=6&max-message-bytes=67108864&replication-factor=1"
--opts registry="http://127.0.0.1:8081"
```

Expand Down Expand Up @@ -271,7 +274,7 @@ URI 中可配置的的参数如下:
| `maxPendingMessages` | Pending 消息队列的最大大小,例如,等待接收来自 Pulsar 的确认的消息(可选,默认值为 1000) |
| `disableBatching` | 禁止自动批量发送消息(可选) |
| `batchingMaxPublishDelay` | 设置发送消息的批处理时间(默认值为 10ms) |
| `compressionType` | 设置发送消息时使用的压缩算法(可选 `LZ4``ZLIB``ZSTD`,默认值为 `ZSTD`|
| `compressionType` | 设置发送消息时使用的压缩算法(可选 `NONE``LZ4``ZLIB``ZSTD`,默认值为 `NONE`|
| `hashingScheme` | 用于选择发送分区的哈希算法(可选 `JavaStringHash``Murmur3`,默认值为 `JavaStringHash`|
| `properties.*` | 在 TiCDC 中 Pulsar producer 上添加用户定义的属性(可选,示例 `properties.location=Hangzhou`|

Expand Down Expand Up @@ -379,7 +382,6 @@ cdc cli changefeed query --pd=http://10.0.10.25:2379 --changefeed-id=simple-repl
},
"sink": {
"dispatchers": null,
"protocol": "default"
},
"scheduler": {
"type": "table-number",
Expand Down Expand Up @@ -575,8 +577,8 @@ dispatchers = [
{matcher = ['test3.*', 'test4.*'], dispatcher = "rowid"},
]
# 对于 MQ 类的 Sink,可以指定消息的协议格式
# 目前支持 default、canal、avro 和 maxwell 四种协议。default 为 TiCDC Open Protocol
protocol = "default"
# 目前支持 canal-json、open-protocol、canal、avro 和 maxwell 五种协议。
protocol = "canal-json"
```

Expand Down
4 changes: 2 additions & 2 deletions ticdc/ticdc-open-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ curl -X GET http://127.0.0.1:8300/api/v1/health
{"matcher":["test1.*", "test2.*"], "dispatcher":"ts"},
{"matcher":["test3.*", "test4.*"], "dispatcher":"rowid"}
],
"protocal":"default"
"protocol":"canal-json"
}
```

Expand All @@ -157,7 +157,7 @@ curl -X GET http://127.0.0.1:8300/api/v1/health

`matcher`:匹配语法和过滤器规则语法相同。

`protocal`:对于 MQ 类的 Sink,可以指定消息的协议格式。目前支持 default、canalavro 和 maxwell 四种协议,默认为 TiCDC Open Protocol
`protocol`:对于 MQ 类的 Sink,可以指定消息的协议格式。目前支持 `canal-json``open-protocol``canal``avro``maxwell` 五种协议

### 使用样例

Expand Down
4 changes: 2 additions & 2 deletions ticdc/troubleshoot-ticdc.md
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ cdc cli changefeed create --pd=http://10.0.10.25:2379 --sink-uri="kafka://127.0.

## TiCDC 把数据同步到 Kafka 时,能在 TiDB 中控制单条消息大小的上限吗?

可以通过 `max-message-bytes` 控制每次向 Kafka broker 发送消息的最大数据量(可选,默认值 64MB);通过 `max-batch-size` 参数指定每条 kafka 消息中变更记录的最大数量,目前仅对 Kafka 的 protocol 为 `default` 时有效(可选,默认值为 `4096`)。
可以通过 `max-message-bytes` 控制每次向 Kafka broker 发送消息的最大数据量(可选,默认值 10MB);通过 `max-batch-size` 参数指定每条 kafka 消息中变更记录的最大数量,目前仅对 Kafka 的 `protocol``open-protocol` 时有效(可选,默认值 `16`)。

## TiCDC 把数据同步到 Kafka 时,一条消息中会不会包含多种数据变更?

Expand Down Expand Up @@ -351,7 +351,7 @@ TiCDC 对大事务(大小超过 5 GB)提供部分支持,根据场景不同

## TiCDC 集群升级到 v4.0.8 之后,changefeed 报错 `[CDC:ErrKafkaInvalidConfig]Canal requires old value to be enabled`,为什么?

自 v4.0.8 起,如果 changefeed 使用 canal 或者 maxwell 协议输出,TiCDC 会自动开启 Old Value 功能。但如果 TiCDC 是从较旧版本升级到 v4.0.8 或以上版本的,changefeed 使用 canal 或 maxwell 协议的同时 Old Value 功能被禁用,此时会出现该报错。可以按照以下步骤解决该报错:
自 v4.0.8 起,如果 changefeed 使用 `canal-json``canal` 或者 `maxwell` 协议输出,TiCDC 会自动开启 Old Value 功能。但是,当 TiCDC 是从较旧版本升级到 v4.0.8 或以上版本时,在 changefeed 使用 `canal-json``canal``maxwell` 协议的同时 TiCDC 的 Old Value 功能会被禁用。此时,会出现该报错。可以按照以下步骤解决该报错:

1. 将 changefeed 配置文件中 `enable-old-value` 的值设为 `true`
2. 使用 `cdc cli changefeed pause` 暂停同步任务。
Expand Down

0 comments on commit 4a67b35

Please sign in to comment.