diff --git a/docs/ecosystem/spark-doris-connector.md b/docs/ecosystem/spark-doris-connector.md index 0ae0cb55cdfd9..c1660b9a67c0d 100644 --- a/docs/ecosystem/spark-doris-connector.md +++ b/docs/ecosystem/spark-doris-connector.md @@ -39,7 +39,7 @@ Github: https://github.com/apache/doris-spark-connector | Connector | Spark | Doris | Java | Scala | |-----------|---------------------|-------------|------|------------| -| 24.0.0 | 3.5 ~ 3.0, 2.4 | 1.0 + | 8 | 2.12, 2.11 | +| 24.0.0 | 3.5 ~ 3.1, 2.4 | 1.0 + | 8 | 2.12, 2.11 | | 1.3.2 | 3.4 ~ 3.1, 2.4, 2.3 | 1.0 ~ 2.1.6 | 8 | 2.12, 2.11 | | 1.3.1 | 3.4 ~ 3.1, 2.4, 2.3 | 1.0 ~ 2.1.0 | 8 | 2.12, 2.11 | | 1.3.0 | 3.4 ~ 3.1, 2.4, 2.3 | 1.0 ~ 2.1.0 | 8 | 2.12, 2.11 | @@ -53,10 +53,20 @@ Github: https://github.com/apache/doris-spark-connector ``` org.apache.doris - spark-doris-connector-3.4_2.12 + spark-doris-connector-spark-3.5 24.0.0 ``` + +::: tip + +Starting from version 24.0.0, the naming rules of the Doris connector package have been adjusted: +1. No longer contains Scala version information。 +2. For Spark 2.x versions, use the package named `spark-doris-connector-spark-2` uniformly, and by default only compile based on Scala 2.11 version. If you need Scala 2.12 version, please compile it yourself. +3. For Spark 3.x versions, use the package named `spark-doris-connector-spark-3.x` according to the specific Spark version. Applications based on Spark 3.0 version can use the package `spark-doris-connector-spark-3.1`. + +::: + **Note** 1. Please replace the corresponding Connector version according to different Spark and Scala versions. @@ -67,7 +77,7 @@ Github: https://github.com/apache/doris-spark-connector When compiling, you can directly run `sh build.sh`, for details, please refer to here. -After successful compilation, the target jar package will be generated in the `dist` directory, such as: spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar. Copy this file to the `ClassPath` of `Spark` to use `Spark-Doris-Connector`. For example, for `Spark` running in `Local` mode, put this file in the `jars/` folder. For `Spark` running in `Yarn` cluster mode, put this file in the pre-deployment package. +After successful compilation, the target jar package will be generated in the `dist` directory, such as: spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar. Copy this file to the `ClassPath` of `Spark` to use `Spark-Doris-Connector`. For example, for `Spark` running in `Local` mode, put this file in the `jars/` folder. For `Spark` running in `Yarn` cluster mode, put this file in the pre-deployment package. You can also Execute in the source code directory: @@ -76,21 +86,21 @@ Execute in the source code directory: Enter the Scala and Spark versions you need to compile according to the prompts. -After successful compilation, the target jar package will be generated in the `dist` directory, such as: `spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar`. +After successful compilation, the target jar package will be generated in the `dist` directory, such as: `spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar`. Copy this file to the `ClassPath` of `Spark` to use `Spark-Doris-Connector`. For example, if `Spark` is running in `Local` mode, put this file in the `jars/` folder. If `Spark` is running in `Yarn` cluster mode, put this file in the pre-deployment package. -For example, upload `spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar` to hdfs and add the Jar package path on hdfs to the `spark.yarn.jars` parameter +For example, upload `spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar` to hdfs and add the Jar package path on hdfs to the `spark.yarn.jars` parameter ```shell -1. Upload `spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar` to hdfs. +1. Upload `spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar` to hdfs. hdfs dfs -mkdir /spark-jars/ -hdfs dfs -put /your_local_path/spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar /spark-jars/ +hdfs dfs -put /your_local_path/spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar /spark-jars/ -2. Add the `spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar` dependency in the cluster. -spark.yarn.jars=hdfs:///spark-jars/spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar +2. Add the `spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar` dependency in the cluster. +spark.yarn.jars=hdfs:///spark-jars/spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar ``` @@ -374,17 +384,17 @@ show databases; -- use databases use your_doris_db; -// show tables in test +-- show tables in test show tables; -// query table +-- query table select * from your_doris_table; -// write data +-- write data insert into your_doris_table values(xxx); insert into your_doris_table select * from your_source_table; -// access table with full name +-- access table with full name select * from your_catalog_name.your_doris_db.your_doris_table; insert into your_catalog_name.your_doris_db.your_doris_table values(xxx); insert into your_catalog_name.your_doris_db.your_doris_table select * from your_source_table; @@ -398,6 +408,8 @@ insert into your_catalog_name.your_doris_db.your_doris_table select * from your_ |----------------------------------|---------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | doris.fenodes | -- | Doris FE http address, support multiple addresses, separated by commas | | doris.table.identifier | -- | Doris table identifier, eg, db1.tbl1 | +| doris.user | -- | Doris username | +| doris.password | Empty string | Doris password | | doris.request.retries | 3 | Number of retries to send requests to Doris | | doris.request.connect.timeout.ms | 30000 | Connection timeout for sending requests to Doris | | doris.request.read.timeout.ms | 30000 | Read timeout for sending request to Doris | @@ -437,10 +449,7 @@ insert into your_catalog_name.your_doris_db.your_doris_table select * from your_ | Key | Default Value | Comment | |---------------------------------|---------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| user | -- | Doris username | -| password | -- | Doris password | | doris.filter.query.in.max.count | 100 | In the predicate pushdown, the maximum number of elements in the in expression value list. If this number is exceeded, the in-expression conditional filtering is processed on the Spark side. | -| doris.ignore-type | -- | In a temporary view, specify the field types to ignore when reading the schema.
eg: when 'doris.ignore-type'='bitmap,hll' | ### Structured Streaming Configuration diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/ecosystem/spark-doris-connector.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/ecosystem/spark-doris-connector.md index cdd82cf51dc78..6c844fe4e48fa 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/ecosystem/spark-doris-connector.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/ecosystem/spark-doris-connector.md @@ -38,7 +38,7 @@ Spark Doris Connector 可以支持通过 Spark 读取 Doris 中存储的数据 | Connector | Spark | Doris | Java | Scala | |-----------|---------------------|-------------|------|------------| -| 24.0.0 | 3.5 - 3.0, 2.4 | 1.0 + | 8 | 2.12, 2.11 | +| 24.0.0 | 3.5 - 3.1, 2.4 | 1.0 + | 8 | 2.12, 2.11 | | 1.3.2 | 3.4 - 3.1, 2.4, 2.3 | 1.0 - 2.1.6 | 8 | 2.12, 2.11 | | 1.3.1 | 3.4 - 3.1, 2.4, 2.3 | 1.0 - 2.1.0 | 8 | 2.12, 2.11 | | 1.3.0 | 3.4 - 3.1, 2.4, 2.3 | 1.0 - 2.1.0 | 8 | 2.12, 2.11 | @@ -52,11 +52,20 @@ Spark Doris Connector 可以支持通过 Spark 读取 Doris 中存储的数据 ``` org.apache.doris - spark-doris-connector-3.5_2.12 + spark-doris-connector-spark-3.5 24.0.0 ``` +::: tip + +从 24.0.0 版本开始,Doris connector 包命名规则发生调整: +1. 不再包含 Scala 版本信息 +2. 对于 Spark 2.x 版本,统一使用名称为 `spark-doris-connector-spark-2` 的包,并且默认只基于 Scala 2.11 版本编译,需要 Scala 2.12 版本的请自行编译。 +3. 对于 Spark 3.x 版本,根据具体 Spark 版本使用使用名称为 `spark-doris-connector-spark-3.x` 的包,其中 Spark 3.0 版本可以使用 `spark-doris-connector-spark-3.1` 的包。 + +::: + **备注** 1. 请根据不同的 Spark 和 Scala 版本替换相应的 Connector 版本。 @@ -67,7 +76,7 @@ Spark Doris Connector 可以支持通过 Spark 读取 Doris 中存储的数据 编译时,可直接运行 `sh build.sh`,具体可参考这里。 -编译成功后,会在 `dist` 目录生成目标 jar 包,如:spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar。 将此文件复制到 `Spark` 的 `ClassPath` 中即可使用 `Spark-Doris-Connector`。 例如,`Local` 模式运行的 `Spark`,将此文件放入 `jars/` 文件夹下。`Yarn`集群模式运行的`Spark`,则将此文件放入预部署包中。 +编译成功后,会在 `dist` 目录生成目标 jar 包,如:spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar。 将此文件复制到 `Spark` 的 `ClassPath` 中即可使用 `Spark-Doris-Connector`。 例如,`Local` 模式运行的 `Spark`,将此文件放入 `jars/` 文件夹下。`Yarn`集群模式运行的`Spark`,则将此文件放入预部署包中。 也可以 @@ -75,20 +84,20 @@ Spark Doris Connector 可以支持通过 Spark 读取 Doris 中存储的数据 `sh build.sh` 根据提示输入你需要的 Scala 与 Spark 版本进行编译。 -编译成功后,会在 `dist` 目录生成目标 jar 包,如:`spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar`。 +编译成功后,会在 `dist` 目录生成目标 jar 包,如:`spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar`。 将此文件复制到 `Spark` 的 `ClassPath` 中即可使用 `Spark-Doris-Connector`。 例如,`Local` 模式运行的 `Spark`,将此文件放入 `jars/` 文件夹下。`Yarn`集群模式运行的`Spark`,则将此文件放入预部署包中。 -例如将 `spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar` 上传到 hdfs 并在 `spark.yarn.jars` 参数上添加 hdfs 上的 Jar包路径 +例如将 `spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar` 上传到 hdfs 并在 `spark.yarn.jars` 参数上添加 hdfs 上的 Jar包路径 ```shell -1. 上传 `spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar` 到 hdfs。 +1. 上传 `spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar` 到 hdfs。 hdfs dfs -mkdir /spark-jars/ -hdfs dfs -put /your_local_path/spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar /spark-jars/ +hdfs dfs -put /your_local_path/spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar /spark-jars/ -2. 在集群中添加 `spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar` 依赖。 -spark.yarn.jars=hdfs:///spark-jars/spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar +2. 在集群中添加 `spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar` 依赖。 +spark.yarn.jars=hdfs:///spark-jars/spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar ``` @@ -375,14 +384,14 @@ use your_doris_db; // show tables in test show tables; -// query table +-- query table select * from your_doris_table; -// write data +-- write data insert into your_doris_table values(xxx); insert into your_doris_table select * from your_source_table; -// access table with full name +-- access table with full name select * from your_catalog_name.your_doris_db.your_doris_table; insert into your_catalog_name.your_doris_db.your_doris_table values(xxx); insert into your_catalog_name.your_doris_db.your_doris_table select * from your_source_table; @@ -397,52 +406,51 @@ insert into your_catalog_name.your_doris_db.your_doris_table select * from your_ ### 通用配置项 -| Key | Default Value | Comment | -|----------------------------------|---------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| doris.fenodes | -- | Doris FE http 地址,支持多个地址,使用逗号分隔 | -| doris.table.identifier | -- | Doris 表名,如:db1.tbl1 | -| doris.request.retries | 3 | 向 Doris 发送请求的重试次数 | -| doris.request.connect.timeout.ms | 30000 | 向 Doris 发送请求的连接超时时间 | -| doris.request.read.timeout.ms | 30000 | 向 Doris 发送请求的读取超时时间 | -| doris.request.query.timeout.s | 3600 | 查询 doris 的超时时间,默认值为 1 小时,-1 表示无超时限制 | -| doris.request.tablet.size | 1 | 一个 RDD Partition 对应的 Doris Tablet 个数。
此数值设置越小,则会生成越多的 Partition。从而提升 Spark 侧的并行度,但同时会对 Doris 造成更大的压力。 | -| doris.read.field | -- | 读取 Doris 表的列名列表,多列之间使用逗号分隔 | -| doris.batch.size | 4064 | 一次从 BE 读取数据的最大行数。增大此数值可减少 Spark 与 Doris 之间建立连接的次数。
从而减轻网络延迟所带来的额外时间开销。 | -| doris.exec.mem.limit | 2147483648 | 单个查询的内存限制。默认为 2GB,单位为字节 | -| doris.deserialize.arrow.async | false | 是否支持异步转换 Arrow 格式到 spark-doris-connector 迭代所需的 RowBatch | -| doris.deserialize.queue.size | 64 | 异步转换 Arrow 格式的内部处理队列,当 doris.deserialize.arrow.async 为 true 时生效 | -| doris.write.fields | -- | 指定写入 Doris 表的字段或者字段顺序,多列之间使用逗号分隔。
默认写入时要按照 Doris 表字段顺序写入全部字段。 | -| doris.sink.batch.size | 100000 | 单次写 BE 的最大行数 | -| doris.sink.max-retries | 0 | 写 BE 失败之后的重试次数,从 1.3.0 版本开始, 默认值为 0,即默认不进行重试。当设置该参数大于 0 时,会进行批次级别的失败重试,会在 Spark Executor 内存中缓存 `doris.sink.batch.size` 所配置大小的数据,可能需要适当增大内存分配。 | -| doris.sink.properties.format | csv | Stream Load 的数据格式。
共支持 3 种格式:csv,json,arrow
[更多参数详情](https://doris.apache.org/zh-CN/docs/data-operate/import/stream-load-manual/) | -| doris.sink.properties.* | -- | Stream Load 的导入参数。
例如:
指定列分隔符:`'doris.sink.properties.column_separator' = ','`等
[更多参数详情](https://doris.apache.org/zh-CN/docs/data-operate/import/stream-load-manual/) | -| doris.sink.task.partition.size | -- | Doris 写入任务对应的 Partition 个数。Spark RDD 经过过滤等操作,最后写入的 Partition 数可能会比较大,但每个 Partition 对应的记录数比较少,导致写入频率增加和计算资源浪费。
此数值设置越小,可以降低 Doris 写入频率,减少 Doris 合并压力。该参数配合 doris.sink.task.use.repartition 使用。 | -| doris.sink.task.use.repartition | false | 是否采用 repartition 方式控制 Doris 写入 Partition 数。默认值为 false,采用 coalesce 方式控制(注意:如果在写入之前没有 Spark action 算子,可能会导致整个计算并行度降低)。
如果设置为 true,则采用 repartition 方式(注意:可设置最后 Partition 数,但会额外增加 shuffle 开销)。 | -| doris.sink.batch.interval.ms | 50 | 每个批次 sink 的间隔时间,单位 ms。 | -| doris.sink.enable-2pc | false | 是否开启两阶段提交。开启后将会在作业结束时提交事务,而部分任务失败时会将所有预提交状态的事务会滚。 | -| doris.sink.auto-redirect | true | 是否重定向 StreamLoad 请求。开启后 StreamLoad 将通过 FE 写入,不再显式获取 BE 信息。 | -| doris.enable.https | false | 是否开启 FE Https 请求。 | -| doris.https.key-store-path | - | Https key store 路径。 | -| doris.https.key-store-type | JKS | Https key store 类型。 | -| doris.https.key-store-password | - | Https key store 密码。 | -| doris.sink.mode | stream_load | Doris 写入模式,可选项 `stream_load` 和 `copy_info`。 | -| doris.read.mode | thrift | Doris 读取模式,可选项 `thrift` 和 `arrow`。 | -| doris.read.arrow-flight-sql.port | - | Doris FE 的 Arrow Flight SQL 端口,当 `doris.read.mode` 为 `arrow` 时,用于通过 Arrow Flight SQL 方式读取数据。服务端配置方式参考 [基于 Arrow Flight SQL 的高速数据传输链路](https://doris.apache.org/zh-CN/docs/dev/db-connect/arrow-flight-sql-connect) | -| doris.sink.label.prefix | spark-doris | Stream Load 方式写入时的导入标签前缀。 | -| doris.thrift.max.message.size | 2147483647 | 通过 Thrift 方式读取数据时,消息的最大尺寸。 | -| doris.fe.auto.fetch | false | 是否自动获取 FE 信息,当设置为 true 时,会根据 `doris.fenodes` 配置的节点请求所有 FE 节点信息,无需额外配置多个节点以及单独配置 `doris.read.arrow-flight-sql.port` 和 `doris.query.port`。 | -| doris.read.bitmap-to-string | false | 是否将 Bitmap 类型转换为数组索引组成的字符串读取。具体结果形式参考函数定义 [BITMAP_TO_STRING](https://doris.apache.org/zh-CN/docs/dev/sql-manual/sql-functions/bitmap-functions/bitmap-to-string)。 | -| doris.read.bitmap-to-base64 | false | 是否将 Bitmap 类型转换为 Base64 编码后的字符串读取。具体结果形式参考函数定义 [BITMAP_TO_BASE64](https://doris.apache.org/zh-CN/docs/dev/sql-manual/sql-functions/bitmap-functions/bitmap-to-base64)。 | -| doris.query.port | - | Doris FE 查询端口,用于覆盖写入以及 Catalog 的元数据获取。 | +| Key | Default Value | Comment | +|----------------------------------|----------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| doris.fenodes | -- | Doris FE http 地址,支持多个地址,使用逗号分隔 | +| doris.table.identifier | -- | Doris 表名,如:db1.tbl1 | +| doris.user | -- | 访问 Doris 的用户名 | +| doris.password | 空字符串 | 访问 Doris 的密码 | +| doris.request.retries | 3 | 向 Doris 发送请求的重试次数 | +| doris.request.connect.timeout.ms | 30000 | 向 Doris 发送请求的连接超时时间 | +| doris.request.read.timeout.ms | 30000 | 向 Doris 发送请求的读取超时时间 | +| doris.request.query.timeout.s | 3600 | 查询 doris 的超时时间,默认值为 1 小时,-1 表示无超时限制 | +| doris.request.tablet.size | 1 | 一个 RDD Partition 对应的 Doris Tablet 个数。
此数值设置越小,则会生成越多的 Partition。从而提升 Spark 侧的并行度,但同时会对 Doris 造成更大的压力。 | +| doris.read.field | -- | 读取 Doris 表的列名列表,多列之间使用逗号分隔 | +| doris.batch.size | 4064 | 一次从 BE 读取数据的最大行数。增大此数值可减少 Spark 与 Doris 之间建立连接的次数。
从而减轻网络延迟所带来的额外时间开销。 | +| doris.exec.mem.limit | 2147483648 | 单个查询的内存限制。默认为 2GB,单位为字节 | +| doris.deserialize.arrow.async | false | 是否支持异步转换 Arrow 格式到 spark-doris-connector 迭代所需的 RowBatch | +| doris.deserialize.queue.size | 64 | 异步转换 Arrow 格式的内部处理队列,当 doris.deserialize.arrow.async 为 true 时生效 | +| doris.write.fields | -- | 指定写入 Doris 表的字段或者字段顺序,多列之间使用逗号分隔。
默认写入时要按照 Doris 表字段顺序写入全部字段。 | +| doris.sink.batch.size | 100000 | 单次写 BE 的最大行数 | +| doris.sink.max-retries | 0 | 写 BE 失败之后的重试次数,从 1.3.0 版本开始, 默认值为 0,即默认不进行重试。当设置该参数大于 0 时,会进行批次级别的失败重试,会在 Spark Executor 内存中缓存 `doris.sink.batch.size` 所配置大小的数据,可能需要适当增大内存分配。 | +| doris.sink.properties.format | csv | Stream Load 的数据格式。
共支持 3 种格式:csv,json,arrow
[更多参数详情](https://doris.apache.org/zh-CN/docs/data-operate/import/stream-load-manual/) | +| doris.sink.properties.* | -- | Stream Load 的导入参数。
例如:
指定列分隔符:`'doris.sink.properties.column_separator' = ','`等
[更多参数详情](https://doris.apache.org/zh-CN/docs/data-operate/import/stream-load-manual/) | +| doris.sink.task.partition.size | -- | Doris 写入任务对应的 Partition 个数。Spark RDD 经过过滤等操作,最后写入的 Partition 数可能会比较大,但每个 Partition 对应的记录数比较少,导致写入频率增加和计算资源浪费。
此数值设置越小,可以降低 Doris 写入频率,减少 Doris 合并压力。该参数配合 doris.sink.task.use.repartition 使用。 | +| doris.sink.task.use.repartition | false | 是否采用 repartition 方式控制 Doris 写入 Partition 数。默认值为 false,采用 coalesce 方式控制(注意:如果在写入之前没有 Spark action 算子,可能会导致整个计算并行度降低)。
如果设置为 true,则采用 repartition 方式(注意:可设置最后 Partition 数,但会额外增加 shuffle 开销)。 | +| doris.sink.batch.interval.ms | 50 | 每个批次 sink 的间隔时间,单位 ms。 | +| doris.sink.enable-2pc | false | 是否开启两阶段提交。开启后将会在作业结束时提交事务,而部分任务失败时会将所有预提交状态的事务会滚。 | +| doris.sink.auto-redirect | true | 是否重定向 StreamLoad 请求。开启后 StreamLoad 将通过 FE 写入,不再显式获取 BE 信息。 | +| doris.enable.https | false | 是否开启 FE Https 请求。 | +| doris.https.key-store-path | - | Https key store 路径。 | +| doris.https.key-store-type | JKS | Https key store 类型。 | +| doris.https.key-store-password | - | Https key store 密码。 | +| doris.sink.mode | stream_load | Doris 写入模式,可选项 `stream_load` 和 `copy_info`。 | +| doris.read.mode | thrift | Doris 读取模式,可选项 `thrift` 和 `arrow`。 | +| doris.read.arrow-flight-sql.port | - | Doris FE 的 Arrow Flight SQL 端口,当 `doris.read.mode` 为 `arrow` 时,用于通过 Arrow Flight SQL 方式读取数据。服务端配置方式参考 [基于 Arrow Flight SQL 的高速数据传输链路](https://doris.apache.org/zh-CN/docs/dev/db-connect/arrow-flight-sql-connect) | +| doris.sink.label.prefix | spark-doris | Stream Load 方式写入时的导入标签前缀。 | +| doris.thrift.max.message.size | 2147483647 | 通过 Thrift 方式读取数据时,消息的最大尺寸。 | +| doris.fe.auto.fetch | false | 是否自动获取 FE 信息,当设置为 true 时,会根据 `doris.fenodes` 配置的节点请求所有 FE 节点信息,无需额外配置多个节点以及单独配置 `doris.read.arrow-flight-sql.port` 和 `doris.query.port`。 | +| doris.read.bitmap-to-string | false | 是否将 Bitmap 类型转换为数组索引组成的字符串读取。具体结果形式参考函数定义 [BITMAP_TO_STRING](https://doris.apache.org/zh-CN/docs/dev/sql-manual/sql-functions/bitmap-functions/bitmap-to-string)。 | +| doris.read.bitmap-to-base64 | false | 是否将 Bitmap 类型转换为 Base64 编码后的字符串读取。具体结果形式参考函数定义 [BITMAP_TO_BASE64](https://doris.apache.org/zh-CN/docs/dev/sql-manual/sql-functions/bitmap-functions/bitmap-to-base64)。 | +| doris.query.port | - | Doris FE 查询端口,用于覆盖写入以及 Catalog 的元数据获取。 | ### SQL 和 Dataframe 专有配置 | Key | Default Value | Comment | -|---------------------------------|---------------|------------------------------------------------------------------------| -| user | -- | 访问 Doris 的用户名 | -| password | -- | 访问 Doris 的密码 | -| doris.filter.query.in.max.count | 100 | 谓词下推中,in 表达式 value 列表元素最大数量。超过此数量,则 in 表达式条件过滤在 Spark 侧处理。 | -| doris.ignore-type | -- | 指在定临时视图中,读取 schema 时要忽略的字段类型。
例如,'doris.ignore-type'='bitmap,hll' | +|---------------------------------|--------------|------------------------------------------------------------------------| +| doris.filter.query.in.max.count | 10000 | 谓词下推中,in 表达式 value 列表元素最大数量。超过此数量,则 in 表达式条件过滤在 Spark 侧处理。 | ### Structured Streaming 专有配置 diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/version-2.1/ecosystem/spark-doris-connector.md b/i18n/zh-CN/docusaurus-plugin-content-docs/version-2.1/ecosystem/spark-doris-connector.md index 7e8439b30bf5c..9bdca11f2dd03 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/version-2.1/ecosystem/spark-doris-connector.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/version-2.1/ecosystem/spark-doris-connector.md @@ -38,7 +38,7 @@ Spark Doris Connector 可以支持通过 Spark 读取 Doris 中存储的数据 | Connector | Spark | Doris | Java | Scala | |-----------|---------------------|-------------|------|------------| -| 24.0.0 | 3.5 - 3.0, 2.4 | 1.0 + | 8 | 2.12, 2.11 | +| 24.0.0 | 3.5 - 3.1, 2.4 | 1.0 + | 8 | 2.12, 2.11 | | 1.3.2 | 3.4 - 3.1, 2.4, 2.3 | 1.0 - 2.1.6 | 8 | 2.12, 2.11 | | 1.3.1 | 3.4 - 3.1, 2.4, 2.3 | 1.0 - 2.1.0 | 8 | 2.12, 2.11 | | 1.3.0 | 3.4 - 3.1, 2.4, 2.3 | 1.0 - 2.1.0 | 8 | 2.12, 2.11 | @@ -52,11 +52,20 @@ Spark Doris Connector 可以支持通过 Spark 读取 Doris 中存储的数据 ``` org.apache.doris - spark-doris-connector-3.5_2.12 + spark-doris-connector-spark-3.5 24.0.0 ``` +::: tip + +从 24.0.0 版本开始,Doris connector 包命名规则发生调整: +1. 不再包含 Scala 版本信息 +2. 对于 Spark 2.x 版本,统一使用名称为 `spark-doris-connector-spark-2` 的包,并且默认只基于 Scala 2.11 版本编译,需要 Scala 2.12 版本的请自行编译。 +3. 对于 Spark 3.x 版本,根据具体 Spark 版本使用使用名称为 `spark-doris-connector-spark-3.x` 的包,其中 Spark 3.0 版本可以使用 `spark-doris-connector-spark-3.1` 的包。 + +::: + **备注** 1. 请根据不同的 Spark 和 Scala 版本替换相应的 Connector 版本。 @@ -67,7 +76,7 @@ Spark Doris Connector 可以支持通过 Spark 读取 Doris 中存储的数据 编译时,可直接运行 `sh build.sh`,具体可参考这里。 -编译成功后,会在 `dist` 目录生成目标 jar 包,如:spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar。 将此文件复制到 `Spark` 的 `ClassPath` 中即可使用 `Spark-Doris-Connector`。 例如,`Local` 模式运行的 `Spark`,将此文件放入 `jars/` 文件夹下。`Yarn`集群模式运行的`Spark`,则将此文件放入预部署包中。 +编译成功后,会在 `dist` 目录生成目标 jar 包,如:spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar。 将此文件复制到 `Spark` 的 `ClassPath` 中即可使用 `Spark-Doris-Connector`。 例如,`Local` 模式运行的 `Spark`,将此文件放入 `jars/` 文件夹下。`Yarn`集群模式运行的`Spark`,则将此文件放入预部署包中。 也可以 @@ -75,20 +84,20 @@ Spark Doris Connector 可以支持通过 Spark 读取 Doris 中存储的数据 `sh build.sh` 根据提示输入你需要的 Scala 与 Spark 版本进行编译。 -编译成功后,会在 `dist` 目录生成目标 jar 包,如:`spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar`。 +编译成功后,会在 `dist` 目录生成目标 jar 包,如:`spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar`。 将此文件复制到 `Spark` 的 `ClassPath` 中即可使用 `Spark-Doris-Connector`。 例如,`Local` 模式运行的 `Spark`,将此文件放入 `jars/` 文件夹下。`Yarn`集群模式运行的`Spark`,则将此文件放入预部署包中。 -例如将 `spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar` 上传到 hdfs 并在 `spark.yarn.jars` 参数上添加 hdfs 上的 Jar包路径 +例如将 `spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar` 上传到 hdfs 并在 `spark.yarn.jars` 参数上添加 hdfs 上的 Jar包路径 ```shell -1. 上传 `spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar` 到 hdfs。 +1. 上传 `spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar` 到 hdfs。 hdfs dfs -mkdir /spark-jars/ -hdfs dfs -put /your_local_path/spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar /spark-jars/ +hdfs dfs -put /your_local_path/spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar /spark-jars/ -2. 在集群中添加 `spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar` 依赖。 -spark.yarn.jars=hdfs:///spark-jars/spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar +2. 在集群中添加 `spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar` 依赖。 +spark.yarn.jars=hdfs:///spark-jars/spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar ``` @@ -375,14 +384,14 @@ use your_doris_db; // show tables in test show tables; -// query table +-- query table select * from your_doris_table; -// write data +-- write data insert into your_doris_table values(xxx); insert into your_doris_table select * from your_source_table; -// access table with full name +-- access table with full name select * from your_catalog_name.your_doris_db.your_doris_table; insert into your_catalog_name.your_doris_db.your_doris_table values(xxx); insert into your_catalog_name.your_doris_db.your_doris_table select * from your_source_table; @@ -397,52 +406,51 @@ insert into your_catalog_name.your_doris_db.your_doris_table select * from your_ ### 通用配置项 -| Key | Default Value | Comment | -|----------------------------------|---------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| doris.fenodes | -- | Doris FE http 地址,支持多个地址,使用逗号分隔 | -| doris.table.identifier | -- | Doris 表名,如:db1.tbl1 | -| doris.request.retries | 3 | 向 Doris 发送请求的重试次数 | -| doris.request.connect.timeout.ms | 30000 | 向 Doris 发送请求的连接超时时间 | -| doris.request.read.timeout.ms | 30000 | 向 Doris 发送请求的读取超时时间 | -| doris.request.query.timeout.s | 3600 | 查询 doris 的超时时间,默认值为 1 小时,-1 表示无超时限制 | -| doris.request.tablet.size | 1 | 一个 RDD Partition 对应的 Doris Tablet 个数。
此数值设置越小,则会生成越多的 Partition。从而提升 Spark 侧的并行度,但同时会对 Doris 造成更大的压力。 | -| doris.read.field | -- | 读取 Doris 表的列名列表,多列之间使用逗号分隔 | -| doris.batch.size | 4064 | 一次从 BE 读取数据的最大行数。增大此数值可减少 Spark 与 Doris 之间建立连接的次数。
从而减轻网络延迟所带来的额外时间开销。 | -| doris.exec.mem.limit | 2147483648 | 单个查询的内存限制。默认为 2GB,单位为字节 | -| doris.deserialize.arrow.async | false | 是否支持异步转换 Arrow 格式到 spark-doris-connector 迭代所需的 RowBatch | -| doris.deserialize.queue.size | 64 | 异步转换 Arrow 格式的内部处理队列,当 doris.deserialize.arrow.async 为 true 时生效 | -| doris.write.fields | -- | 指定写入 Doris 表的字段或者字段顺序,多列之间使用逗号分隔。
默认写入时要按照 Doris 表字段顺序写入全部字段。 | -| doris.sink.batch.size | 100000 | 单次写 BE 的最大行数 | -| doris.sink.max-retries | 0 | 写 BE 失败之后的重试次数,从 1.3.0 版本开始, 默认值为 0,即默认不进行重试。当设置该参数大于 0 时,会进行批次级别的失败重试,会在 Spark Executor 内存中缓存 `doris.sink.batch.size` 所配置大小的数据,可能需要适当增大内存分配。 | -| doris.sink.properties.format | csv | Stream Load 的数据格式。
共支持 3 种格式:csv,json,arrow
[更多参数详情](https://doris.apache.org/zh-CN/docs/data-operate/import/stream-load-manual/) | -| doris.sink.properties.* | -- | Stream Load 的导入参数。
例如:
指定列分隔符:`'doris.sink.properties.column_separator' = ','`等
[更多参数详情](https://doris.apache.org/zh-CN/docs/data-operate/import/stream-load-manual/) | -| doris.sink.task.partition.size | -- | Doris 写入任务对应的 Partition 个数。Spark RDD 经过过滤等操作,最后写入的 Partition 数可能会比较大,但每个 Partition 对应的记录数比较少,导致写入频率增加和计算资源浪费。
此数值设置越小,可以降低 Doris 写入频率,减少 Doris 合并压力。该参数配合 doris.sink.task.use.repartition 使用。 | -| doris.sink.task.use.repartition | false | 是否采用 repartition 方式控制 Doris 写入 Partition 数。默认值为 false,采用 coalesce 方式控制(注意:如果在写入之前没有 Spark action 算子,可能会导致整个计算并行度降低)。
如果设置为 true,则采用 repartition 方式(注意:可设置最后 Partition 数,但会额外增加 shuffle 开销)。 | -| doris.sink.batch.interval.ms | 50 | 每个批次 sink 的间隔时间,单位 ms。 | -| doris.sink.enable-2pc | false | 是否开启两阶段提交。开启后将会在作业结束时提交事务,而部分任务失败时会将所有预提交状态的事务会滚。 | -| doris.sink.auto-redirect | true | 是否重定向 StreamLoad 请求。开启后 StreamLoad 将通过 FE 写入,不再显式获取 BE 信息。 | -| doris.enable.https | false | 是否开启 FE Https 请求。 | -| doris.https.key-store-path | - | Https key store 路径。 | -| doris.https.key-store-type | JKS | Https key store 类型。 | -| doris.https.key-store-password | - | Https key store 密码。 | -| doris.sink.mode | stream_load | Doris 写入模式,可选项 `stream_load` 和 `copy_info`。 | -| doris.read.mode | thrift | Doris 读取模式,可选项 `thrift` 和 `arrow`。 | -| doris.read.arrow-flight-sql.port | - | Doris FE 的 Arrow Flight SQL 端口,当 `doris.read.mode` 为 `arrow` 时,用于通过 Arrow Flight SQL 方式读取数据。服务端配置方式参考 [基于 Arrow Flight SQL 的高速数据传输链路](https://doris.apache.org/zh-CN/docs/dev/db-connect/arrow-flight-sql-connect) | -| doris.sink.label.prefix | spark-doris | Stream Load 方式写入时的导入标签前缀。 | -| doris.thrift.max.message.size | 2147483647 | 通过 Thrift 方式读取数据时,消息的最大尺寸。 | -| doris.fe.auto.fetch | false | 是否自动获取 FE 信息,当设置为 true 时,会根据 `doris.fenodes` 配置的节点请求所有 FE 节点信息,无需额外配置多个节点以及单独配置 `doris.read.arrow-flight-sql.port` 和 `doris.query.port`。 | -| doris.read.bitmap-to-string | false | 是否将 Bitmap 类型转换为数组索引组成的字符串读取。具体结果形式参考函数定义 [BITMAP_TO_STRING](https://doris.apache.org/zh-CN/docs/dev/sql-manual/sql-functions/bitmap-functions/bitmap-to-string)。 | -| doris.read.bitmap-to-base64 | false | 是否将 Bitmap 类型转换为 Base64 编码后的字符串读取。具体结果形式参考函数定义 [BITMAP_TO_BASE64](https://doris.apache.org/zh-CN/docs/dev/sql-manual/sql-functions/bitmap-functions/bitmap-to-base64)。 | -| doris.query.port | - | Doris FE 查询端口,用于覆盖写入以及 Catalog 的元数据获取。 | +| Key | Default Value | Comment | +|----------------------------------|----------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| doris.fenodes | -- | Doris FE http 地址,支持多个地址,使用逗号分隔 | +| doris.table.identifier | -- | Doris 表名,如:db1.tbl1 | +| doris.user | -- | 访问 Doris 的用户名 | +| doris.password | 空字符串 | 访问 Doris 的密码 | +| doris.request.retries | 3 | 向 Doris 发送请求的重试次数 | +| doris.request.connect.timeout.ms | 30000 | 向 Doris 发送请求的连接超时时间 | +| doris.request.read.timeout.ms | 30000 | 向 Doris 发送请求的读取超时时间 | +| doris.request.query.timeout.s | 3600 | 查询 doris 的超时时间,默认值为 1 小时,-1 表示无超时限制 | +| doris.request.tablet.size | 1 | 一个 RDD Partition 对应的 Doris Tablet 个数。
此数值设置越小,则会生成越多的 Partition。从而提升 Spark 侧的并行度,但同时会对 Doris 造成更大的压力。 | +| doris.read.field | -- | 读取 Doris 表的列名列表,多列之间使用逗号分隔 | +| doris.batch.size | 4064 | 一次从 BE 读取数据的最大行数。增大此数值可减少 Spark 与 Doris 之间建立连接的次数。
从而减轻网络延迟所带来的额外时间开销。 | +| doris.exec.mem.limit | 2147483648 | 单个查询的内存限制。默认为 2GB,单位为字节 | +| doris.deserialize.arrow.async | false | 是否支持异步转换 Arrow 格式到 spark-doris-connector 迭代所需的 RowBatch | +| doris.deserialize.queue.size | 64 | 异步转换 Arrow 格式的内部处理队列,当 doris.deserialize.arrow.async 为 true 时生效 | +| doris.write.fields | -- | 指定写入 Doris 表的字段或者字段顺序,多列之间使用逗号分隔。
默认写入时要按照 Doris 表字段顺序写入全部字段。 | +| doris.sink.batch.size | 100000 | 单次写 BE 的最大行数 | +| doris.sink.max-retries | 0 | 写 BE 失败之后的重试次数,从 1.3.0 版本开始, 默认值为 0,即默认不进行重试。当设置该参数大于 0 时,会进行批次级别的失败重试,会在 Spark Executor 内存中缓存 `doris.sink.batch.size` 所配置大小的数据,可能需要适当增大内存分配。 | +| doris.sink.properties.format | csv | Stream Load 的数据格式。
共支持 3 种格式:csv,json,arrow
[更多参数详情](https://doris.apache.org/zh-CN/docs/data-operate/import/stream-load-manual/) | +| doris.sink.properties.* | -- | Stream Load 的导入参数。
例如:
指定列分隔符:`'doris.sink.properties.column_separator' = ','`等
[更多参数详情](https://doris.apache.org/zh-CN/docs/data-operate/import/stream-load-manual/) | +| doris.sink.task.partition.size | -- | Doris 写入任务对应的 Partition 个数。Spark RDD 经过过滤等操作,最后写入的 Partition 数可能会比较大,但每个 Partition 对应的记录数比较少,导致写入频率增加和计算资源浪费。
此数值设置越小,可以降低 Doris 写入频率,减少 Doris 合并压力。该参数配合 doris.sink.task.use.repartition 使用。 | +| doris.sink.task.use.repartition | false | 是否采用 repartition 方式控制 Doris 写入 Partition 数。默认值为 false,采用 coalesce 方式控制(注意:如果在写入之前没有 Spark action 算子,可能会导致整个计算并行度降低)。
如果设置为 true,则采用 repartition 方式(注意:可设置最后 Partition 数,但会额外增加 shuffle 开销)。 | +| doris.sink.batch.interval.ms | 50 | 每个批次 sink 的间隔时间,单位 ms。 | +| doris.sink.enable-2pc | false | 是否开启两阶段提交。开启后将会在作业结束时提交事务,而部分任务失败时会将所有预提交状态的事务会滚。 | +| doris.sink.auto-redirect | true | 是否重定向 StreamLoad 请求。开启后 StreamLoad 将通过 FE 写入,不再显式获取 BE 信息。 | +| doris.enable.https | false | 是否开启 FE Https 请求。 | +| doris.https.key-store-path | - | Https key store 路径。 | +| doris.https.key-store-type | JKS | Https key store 类型。 | +| doris.https.key-store-password | - | Https key store 密码。 | +| doris.sink.mode | stream_load | Doris 写入模式,可选项 `stream_load` 和 `copy_info`。 | +| doris.read.mode | thrift | Doris 读取模式,可选项 `thrift` 和 `arrow`。 | +| doris.read.arrow-flight-sql.port | - | Doris FE 的 Arrow Flight SQL 端口,当 `doris.read.mode` 为 `arrow` 时,用于通过 Arrow Flight SQL 方式读取数据。服务端配置方式参考 [基于 Arrow Flight SQL 的高速数据传输链路](https://doris.apache.org/zh-CN/docs/dev/db-connect/arrow-flight-sql-connect) | +| doris.sink.label.prefix | spark-doris | Stream Load 方式写入时的导入标签前缀。 | +| doris.thrift.max.message.size | 2147483647 | 通过 Thrift 方式读取数据时,消息的最大尺寸。 | +| doris.fe.auto.fetch | false | 是否自动获取 FE 信息,当设置为 true 时,会根据 `doris.fenodes` 配置的节点请求所有 FE 节点信息,无需额外配置多个节点以及单独配置 `doris.read.arrow-flight-sql.port` 和 `doris.query.port`。 | +| doris.read.bitmap-to-string | false | 是否将 Bitmap 类型转换为数组索引组成的字符串读取。具体结果形式参考函数定义 [BITMAP_TO_STRING](https://doris.apache.org/zh-CN/docs/dev/sql-manual/sql-functions/bitmap-functions/bitmap-to-string)。 | +| doris.read.bitmap-to-base64 | false | 是否将 Bitmap 类型转换为 Base64 编码后的字符串读取。具体结果形式参考函数定义 [BITMAP_TO_BASE64](https://doris.apache.org/zh-CN/docs/dev/sql-manual/sql-functions/bitmap-functions/bitmap-to-base64)。 | +| doris.query.port | - | Doris FE 查询端口,用于覆盖写入以及 Catalog 的元数据获取。 | ### SQL 和 Dataframe 专有配置 | Key | Default Value | Comment | -|---------------------------------|---------------|------------------------------------------------------------------------| -| user | -- | 访问 Doris 的用户名 | -| password | -- | 访问 Doris 的密码 | -| doris.filter.query.in.max.count | 100 | 谓词下推中,in 表达式 value 列表元素最大数量。超过此数量,则 in 表达式条件过滤在 Spark 侧处理。 | -| doris.ignore-type | -- | 指在定临时视图中,读取 schema 时要忽略的字段类型。
例如,'doris.ignore-type'='bitmap,hll' | +|---------------------------------|--------------|------------------------------------------------------------------------| +| doris.filter.query.in.max.count | 10000 | 谓词下推中,in 表达式 value 列表元素最大数量。超过此数量,则 in 表达式条件过滤在 Spark 侧处理。 | ### Structured Streaming 专有配置 diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/version-3.0/ecosystem/spark-doris-connector.md b/i18n/zh-CN/docusaurus-plugin-content-docs/version-3.0/ecosystem/spark-doris-connector.md index 7e8439b30bf5c..9bdca11f2dd03 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/version-3.0/ecosystem/spark-doris-connector.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/version-3.0/ecosystem/spark-doris-connector.md @@ -38,7 +38,7 @@ Spark Doris Connector 可以支持通过 Spark 读取 Doris 中存储的数据 | Connector | Spark | Doris | Java | Scala | |-----------|---------------------|-------------|------|------------| -| 24.0.0 | 3.5 - 3.0, 2.4 | 1.0 + | 8 | 2.12, 2.11 | +| 24.0.0 | 3.5 - 3.1, 2.4 | 1.0 + | 8 | 2.12, 2.11 | | 1.3.2 | 3.4 - 3.1, 2.4, 2.3 | 1.0 - 2.1.6 | 8 | 2.12, 2.11 | | 1.3.1 | 3.4 - 3.1, 2.4, 2.3 | 1.0 - 2.1.0 | 8 | 2.12, 2.11 | | 1.3.0 | 3.4 - 3.1, 2.4, 2.3 | 1.0 - 2.1.0 | 8 | 2.12, 2.11 | @@ -52,11 +52,20 @@ Spark Doris Connector 可以支持通过 Spark 读取 Doris 中存储的数据 ``` org.apache.doris - spark-doris-connector-3.5_2.12 + spark-doris-connector-spark-3.5 24.0.0 ``` +::: tip + +从 24.0.0 版本开始,Doris connector 包命名规则发生调整: +1. 不再包含 Scala 版本信息 +2. 对于 Spark 2.x 版本,统一使用名称为 `spark-doris-connector-spark-2` 的包,并且默认只基于 Scala 2.11 版本编译,需要 Scala 2.12 版本的请自行编译。 +3. 对于 Spark 3.x 版本,根据具体 Spark 版本使用使用名称为 `spark-doris-connector-spark-3.x` 的包,其中 Spark 3.0 版本可以使用 `spark-doris-connector-spark-3.1` 的包。 + +::: + **备注** 1. 请根据不同的 Spark 和 Scala 版本替换相应的 Connector 版本。 @@ -67,7 +76,7 @@ Spark Doris Connector 可以支持通过 Spark 读取 Doris 中存储的数据 编译时,可直接运行 `sh build.sh`,具体可参考这里。 -编译成功后,会在 `dist` 目录生成目标 jar 包,如:spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar。 将此文件复制到 `Spark` 的 `ClassPath` 中即可使用 `Spark-Doris-Connector`。 例如,`Local` 模式运行的 `Spark`,将此文件放入 `jars/` 文件夹下。`Yarn`集群模式运行的`Spark`,则将此文件放入预部署包中。 +编译成功后,会在 `dist` 目录生成目标 jar 包,如:spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar。 将此文件复制到 `Spark` 的 `ClassPath` 中即可使用 `Spark-Doris-Connector`。 例如,`Local` 模式运行的 `Spark`,将此文件放入 `jars/` 文件夹下。`Yarn`集群模式运行的`Spark`,则将此文件放入预部署包中。 也可以 @@ -75,20 +84,20 @@ Spark Doris Connector 可以支持通过 Spark 读取 Doris 中存储的数据 `sh build.sh` 根据提示输入你需要的 Scala 与 Spark 版本进行编译。 -编译成功后,会在 `dist` 目录生成目标 jar 包,如:`spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar`。 +编译成功后,会在 `dist` 目录生成目标 jar 包,如:`spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar`。 将此文件复制到 `Spark` 的 `ClassPath` 中即可使用 `Spark-Doris-Connector`。 例如,`Local` 模式运行的 `Spark`,将此文件放入 `jars/` 文件夹下。`Yarn`集群模式运行的`Spark`,则将此文件放入预部署包中。 -例如将 `spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar` 上传到 hdfs 并在 `spark.yarn.jars` 参数上添加 hdfs 上的 Jar包路径 +例如将 `spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar` 上传到 hdfs 并在 `spark.yarn.jars` 参数上添加 hdfs 上的 Jar包路径 ```shell -1. 上传 `spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar` 到 hdfs。 +1. 上传 `spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar` 到 hdfs。 hdfs dfs -mkdir /spark-jars/ -hdfs dfs -put /your_local_path/spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar /spark-jars/ +hdfs dfs -put /your_local_path/spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar /spark-jars/ -2. 在集群中添加 `spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar` 依赖。 -spark.yarn.jars=hdfs:///spark-jars/spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar +2. 在集群中添加 `spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar` 依赖。 +spark.yarn.jars=hdfs:///spark-jars/spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar ``` @@ -375,14 +384,14 @@ use your_doris_db; // show tables in test show tables; -// query table +-- query table select * from your_doris_table; -// write data +-- write data insert into your_doris_table values(xxx); insert into your_doris_table select * from your_source_table; -// access table with full name +-- access table with full name select * from your_catalog_name.your_doris_db.your_doris_table; insert into your_catalog_name.your_doris_db.your_doris_table values(xxx); insert into your_catalog_name.your_doris_db.your_doris_table select * from your_source_table; @@ -397,52 +406,51 @@ insert into your_catalog_name.your_doris_db.your_doris_table select * from your_ ### 通用配置项 -| Key | Default Value | Comment | -|----------------------------------|---------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| doris.fenodes | -- | Doris FE http 地址,支持多个地址,使用逗号分隔 | -| doris.table.identifier | -- | Doris 表名,如:db1.tbl1 | -| doris.request.retries | 3 | 向 Doris 发送请求的重试次数 | -| doris.request.connect.timeout.ms | 30000 | 向 Doris 发送请求的连接超时时间 | -| doris.request.read.timeout.ms | 30000 | 向 Doris 发送请求的读取超时时间 | -| doris.request.query.timeout.s | 3600 | 查询 doris 的超时时间,默认值为 1 小时,-1 表示无超时限制 | -| doris.request.tablet.size | 1 | 一个 RDD Partition 对应的 Doris Tablet 个数。
此数值设置越小,则会生成越多的 Partition。从而提升 Spark 侧的并行度,但同时会对 Doris 造成更大的压力。 | -| doris.read.field | -- | 读取 Doris 表的列名列表,多列之间使用逗号分隔 | -| doris.batch.size | 4064 | 一次从 BE 读取数据的最大行数。增大此数值可减少 Spark 与 Doris 之间建立连接的次数。
从而减轻网络延迟所带来的额外时间开销。 | -| doris.exec.mem.limit | 2147483648 | 单个查询的内存限制。默认为 2GB,单位为字节 | -| doris.deserialize.arrow.async | false | 是否支持异步转换 Arrow 格式到 spark-doris-connector 迭代所需的 RowBatch | -| doris.deserialize.queue.size | 64 | 异步转换 Arrow 格式的内部处理队列,当 doris.deserialize.arrow.async 为 true 时生效 | -| doris.write.fields | -- | 指定写入 Doris 表的字段或者字段顺序,多列之间使用逗号分隔。
默认写入时要按照 Doris 表字段顺序写入全部字段。 | -| doris.sink.batch.size | 100000 | 单次写 BE 的最大行数 | -| doris.sink.max-retries | 0 | 写 BE 失败之后的重试次数,从 1.3.0 版本开始, 默认值为 0,即默认不进行重试。当设置该参数大于 0 时,会进行批次级别的失败重试,会在 Spark Executor 内存中缓存 `doris.sink.batch.size` 所配置大小的数据,可能需要适当增大内存分配。 | -| doris.sink.properties.format | csv | Stream Load 的数据格式。
共支持 3 种格式:csv,json,arrow
[更多参数详情](https://doris.apache.org/zh-CN/docs/data-operate/import/stream-load-manual/) | -| doris.sink.properties.* | -- | Stream Load 的导入参数。
例如:
指定列分隔符:`'doris.sink.properties.column_separator' = ','`等
[更多参数详情](https://doris.apache.org/zh-CN/docs/data-operate/import/stream-load-manual/) | -| doris.sink.task.partition.size | -- | Doris 写入任务对应的 Partition 个数。Spark RDD 经过过滤等操作,最后写入的 Partition 数可能会比较大,但每个 Partition 对应的记录数比较少,导致写入频率增加和计算资源浪费。
此数值设置越小,可以降低 Doris 写入频率,减少 Doris 合并压力。该参数配合 doris.sink.task.use.repartition 使用。 | -| doris.sink.task.use.repartition | false | 是否采用 repartition 方式控制 Doris 写入 Partition 数。默认值为 false,采用 coalesce 方式控制(注意:如果在写入之前没有 Spark action 算子,可能会导致整个计算并行度降低)。
如果设置为 true,则采用 repartition 方式(注意:可设置最后 Partition 数,但会额外增加 shuffle 开销)。 | -| doris.sink.batch.interval.ms | 50 | 每个批次 sink 的间隔时间,单位 ms。 | -| doris.sink.enable-2pc | false | 是否开启两阶段提交。开启后将会在作业结束时提交事务,而部分任务失败时会将所有预提交状态的事务会滚。 | -| doris.sink.auto-redirect | true | 是否重定向 StreamLoad 请求。开启后 StreamLoad 将通过 FE 写入,不再显式获取 BE 信息。 | -| doris.enable.https | false | 是否开启 FE Https 请求。 | -| doris.https.key-store-path | - | Https key store 路径。 | -| doris.https.key-store-type | JKS | Https key store 类型。 | -| doris.https.key-store-password | - | Https key store 密码。 | -| doris.sink.mode | stream_load | Doris 写入模式,可选项 `stream_load` 和 `copy_info`。 | -| doris.read.mode | thrift | Doris 读取模式,可选项 `thrift` 和 `arrow`。 | -| doris.read.arrow-flight-sql.port | - | Doris FE 的 Arrow Flight SQL 端口,当 `doris.read.mode` 为 `arrow` 时,用于通过 Arrow Flight SQL 方式读取数据。服务端配置方式参考 [基于 Arrow Flight SQL 的高速数据传输链路](https://doris.apache.org/zh-CN/docs/dev/db-connect/arrow-flight-sql-connect) | -| doris.sink.label.prefix | spark-doris | Stream Load 方式写入时的导入标签前缀。 | -| doris.thrift.max.message.size | 2147483647 | 通过 Thrift 方式读取数据时,消息的最大尺寸。 | -| doris.fe.auto.fetch | false | 是否自动获取 FE 信息,当设置为 true 时,会根据 `doris.fenodes` 配置的节点请求所有 FE 节点信息,无需额外配置多个节点以及单独配置 `doris.read.arrow-flight-sql.port` 和 `doris.query.port`。 | -| doris.read.bitmap-to-string | false | 是否将 Bitmap 类型转换为数组索引组成的字符串读取。具体结果形式参考函数定义 [BITMAP_TO_STRING](https://doris.apache.org/zh-CN/docs/dev/sql-manual/sql-functions/bitmap-functions/bitmap-to-string)。 | -| doris.read.bitmap-to-base64 | false | 是否将 Bitmap 类型转换为 Base64 编码后的字符串读取。具体结果形式参考函数定义 [BITMAP_TO_BASE64](https://doris.apache.org/zh-CN/docs/dev/sql-manual/sql-functions/bitmap-functions/bitmap-to-base64)。 | -| doris.query.port | - | Doris FE 查询端口,用于覆盖写入以及 Catalog 的元数据获取。 | +| Key | Default Value | Comment | +|----------------------------------|----------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| doris.fenodes | -- | Doris FE http 地址,支持多个地址,使用逗号分隔 | +| doris.table.identifier | -- | Doris 表名,如:db1.tbl1 | +| doris.user | -- | 访问 Doris 的用户名 | +| doris.password | 空字符串 | 访问 Doris 的密码 | +| doris.request.retries | 3 | 向 Doris 发送请求的重试次数 | +| doris.request.connect.timeout.ms | 30000 | 向 Doris 发送请求的连接超时时间 | +| doris.request.read.timeout.ms | 30000 | 向 Doris 发送请求的读取超时时间 | +| doris.request.query.timeout.s | 3600 | 查询 doris 的超时时间,默认值为 1 小时,-1 表示无超时限制 | +| doris.request.tablet.size | 1 | 一个 RDD Partition 对应的 Doris Tablet 个数。
此数值设置越小,则会生成越多的 Partition。从而提升 Spark 侧的并行度,但同时会对 Doris 造成更大的压力。 | +| doris.read.field | -- | 读取 Doris 表的列名列表,多列之间使用逗号分隔 | +| doris.batch.size | 4064 | 一次从 BE 读取数据的最大行数。增大此数值可减少 Spark 与 Doris 之间建立连接的次数。
从而减轻网络延迟所带来的额外时间开销。 | +| doris.exec.mem.limit | 2147483648 | 单个查询的内存限制。默认为 2GB,单位为字节 | +| doris.deserialize.arrow.async | false | 是否支持异步转换 Arrow 格式到 spark-doris-connector 迭代所需的 RowBatch | +| doris.deserialize.queue.size | 64 | 异步转换 Arrow 格式的内部处理队列,当 doris.deserialize.arrow.async 为 true 时生效 | +| doris.write.fields | -- | 指定写入 Doris 表的字段或者字段顺序,多列之间使用逗号分隔。
默认写入时要按照 Doris 表字段顺序写入全部字段。 | +| doris.sink.batch.size | 100000 | 单次写 BE 的最大行数 | +| doris.sink.max-retries | 0 | 写 BE 失败之后的重试次数,从 1.3.0 版本开始, 默认值为 0,即默认不进行重试。当设置该参数大于 0 时,会进行批次级别的失败重试,会在 Spark Executor 内存中缓存 `doris.sink.batch.size` 所配置大小的数据,可能需要适当增大内存分配。 | +| doris.sink.properties.format | csv | Stream Load 的数据格式。
共支持 3 种格式:csv,json,arrow
[更多参数详情](https://doris.apache.org/zh-CN/docs/data-operate/import/stream-load-manual/) | +| doris.sink.properties.* | -- | Stream Load 的导入参数。
例如:
指定列分隔符:`'doris.sink.properties.column_separator' = ','`等
[更多参数详情](https://doris.apache.org/zh-CN/docs/data-operate/import/stream-load-manual/) | +| doris.sink.task.partition.size | -- | Doris 写入任务对应的 Partition 个数。Spark RDD 经过过滤等操作,最后写入的 Partition 数可能会比较大,但每个 Partition 对应的记录数比较少,导致写入频率增加和计算资源浪费。
此数值设置越小,可以降低 Doris 写入频率,减少 Doris 合并压力。该参数配合 doris.sink.task.use.repartition 使用。 | +| doris.sink.task.use.repartition | false | 是否采用 repartition 方式控制 Doris 写入 Partition 数。默认值为 false,采用 coalesce 方式控制(注意:如果在写入之前没有 Spark action 算子,可能会导致整个计算并行度降低)。
如果设置为 true,则采用 repartition 方式(注意:可设置最后 Partition 数,但会额外增加 shuffle 开销)。 | +| doris.sink.batch.interval.ms | 50 | 每个批次 sink 的间隔时间,单位 ms。 | +| doris.sink.enable-2pc | false | 是否开启两阶段提交。开启后将会在作业结束时提交事务,而部分任务失败时会将所有预提交状态的事务会滚。 | +| doris.sink.auto-redirect | true | 是否重定向 StreamLoad 请求。开启后 StreamLoad 将通过 FE 写入,不再显式获取 BE 信息。 | +| doris.enable.https | false | 是否开启 FE Https 请求。 | +| doris.https.key-store-path | - | Https key store 路径。 | +| doris.https.key-store-type | JKS | Https key store 类型。 | +| doris.https.key-store-password | - | Https key store 密码。 | +| doris.sink.mode | stream_load | Doris 写入模式,可选项 `stream_load` 和 `copy_info`。 | +| doris.read.mode | thrift | Doris 读取模式,可选项 `thrift` 和 `arrow`。 | +| doris.read.arrow-flight-sql.port | - | Doris FE 的 Arrow Flight SQL 端口,当 `doris.read.mode` 为 `arrow` 时,用于通过 Arrow Flight SQL 方式读取数据。服务端配置方式参考 [基于 Arrow Flight SQL 的高速数据传输链路](https://doris.apache.org/zh-CN/docs/dev/db-connect/arrow-flight-sql-connect) | +| doris.sink.label.prefix | spark-doris | Stream Load 方式写入时的导入标签前缀。 | +| doris.thrift.max.message.size | 2147483647 | 通过 Thrift 方式读取数据时,消息的最大尺寸。 | +| doris.fe.auto.fetch | false | 是否自动获取 FE 信息,当设置为 true 时,会根据 `doris.fenodes` 配置的节点请求所有 FE 节点信息,无需额外配置多个节点以及单独配置 `doris.read.arrow-flight-sql.port` 和 `doris.query.port`。 | +| doris.read.bitmap-to-string | false | 是否将 Bitmap 类型转换为数组索引组成的字符串读取。具体结果形式参考函数定义 [BITMAP_TO_STRING](https://doris.apache.org/zh-CN/docs/dev/sql-manual/sql-functions/bitmap-functions/bitmap-to-string)。 | +| doris.read.bitmap-to-base64 | false | 是否将 Bitmap 类型转换为 Base64 编码后的字符串读取。具体结果形式参考函数定义 [BITMAP_TO_BASE64](https://doris.apache.org/zh-CN/docs/dev/sql-manual/sql-functions/bitmap-functions/bitmap-to-base64)。 | +| doris.query.port | - | Doris FE 查询端口,用于覆盖写入以及 Catalog 的元数据获取。 | ### SQL 和 Dataframe 专有配置 | Key | Default Value | Comment | -|---------------------------------|---------------|------------------------------------------------------------------------| -| user | -- | 访问 Doris 的用户名 | -| password | -- | 访问 Doris 的密码 | -| doris.filter.query.in.max.count | 100 | 谓词下推中,in 表达式 value 列表元素最大数量。超过此数量,则 in 表达式条件过滤在 Spark 侧处理。 | -| doris.ignore-type | -- | 指在定临时视图中,读取 schema 时要忽略的字段类型。
例如,'doris.ignore-type'='bitmap,hll' | +|---------------------------------|--------------|------------------------------------------------------------------------| +| doris.filter.query.in.max.count | 10000 | 谓词下推中,in 表达式 value 列表元素最大数量。超过此数量,则 in 表达式条件过滤在 Spark 侧处理。 | ### Structured Streaming 专有配置 diff --git a/versioned_docs/version-2.1/ecosystem/spark-doris-connector.md b/versioned_docs/version-2.1/ecosystem/spark-doris-connector.md index 2322c696b32c1..2069537679e99 100644 --- a/versioned_docs/version-2.1/ecosystem/spark-doris-connector.md +++ b/versioned_docs/version-2.1/ecosystem/spark-doris-connector.md @@ -1,7 +1,7 @@ --- { - "title": "Spark Doris Connector", - "language": "en" + "title": "Spark Doris Connector", + "language": "en" } --- @@ -39,7 +39,7 @@ Github: https://github.com/apache/doris-spark-connector | Connector | Spark | Doris | Java | Scala | |-----------|---------------------|-------------|------|------------| -| 24.0.0 | 3.5 ~ 3.0, 2.4 | 1.0 + | 8 | 2.12, 2.11 | +| 24.0.0 | 3.5 ~ 3.1, 2.4 | 1.0 + | 8 | 2.12, 2.11 | | 1.3.2 | 3.4 ~ 3.1, 2.4, 2.3 | 1.0 ~ 2.1.6 | 8 | 2.12, 2.11 | | 1.3.1 | 3.4 ~ 3.1, 2.4, 2.3 | 1.0 ~ 2.1.0 | 8 | 2.12, 2.11 | | 1.3.0 | 3.4 ~ 3.1, 2.4, 2.3 | 1.0 ~ 2.1.0 | 8 | 2.12, 2.11 | @@ -53,10 +53,20 @@ Github: https://github.com/apache/doris-spark-connector ``` org.apache.doris - spark-doris-connector-3.4_2.12 + spark-doris-connector-spark-3.5 24.0.0 ``` + +::: tip + +Starting from version 24.0.0, the naming rules of the Doris connector package have been adjusted: +1. No longer contains Scala version information。 +2. For Spark 2.x versions, use the package named `spark-doris-connector-spark-2` uniformly, and by default only compile based on Scala 2.11 version. If you need Scala 2.12 version, please compile it yourself. +3. For Spark 3.x versions, use the package named `spark-doris-connector-spark-3.x` according to the specific Spark version. Applications based on Spark 3.0 version can use the package `spark-doris-connector-spark-3.1`. + +::: + **Note** 1. Please replace the corresponding Connector version according to different Spark and Scala versions. @@ -67,7 +77,7 @@ Github: https://github.com/apache/doris-spark-connector When compiling, you can directly run `sh build.sh`, for details, please refer to here. -After successful compilation, the target jar package will be generated in the `dist` directory, such as: spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar. Copy this file to the `ClassPath` of `Spark` to use `Spark-Doris-Connector`. For example, for `Spark` running in `Local` mode, put this file in the `jars/` folder. For `Spark` running in `Yarn` cluster mode, put this file in the pre-deployment package. +After successful compilation, the target jar package will be generated in the `dist` directory, such as: spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar. Copy this file to the `ClassPath` of `Spark` to use `Spark-Doris-Connector`. For example, for `Spark` running in `Local` mode, put this file in the `jars/` folder. For `Spark` running in `Yarn` cluster mode, put this file in the pre-deployment package. You can also Execute in the source code directory: @@ -76,21 +86,21 @@ Execute in the source code directory: Enter the Scala and Spark versions you need to compile according to the prompts. -After successful compilation, the target jar package will be generated in the `dist` directory, such as: `spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar`. +After successful compilation, the target jar package will be generated in the `dist` directory, such as: `spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar`. Copy this file to the `ClassPath` of `Spark` to use `Spark-Doris-Connector`. For example, if `Spark` is running in `Local` mode, put this file in the `jars/` folder. If `Spark` is running in `Yarn` cluster mode, put this file in the pre-deployment package. -For example, upload `spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar` to hdfs and add the Jar package path on hdfs to the `spark.yarn.jars` parameter +For example, upload `spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar` to hdfs and add the Jar package path on hdfs to the `spark.yarn.jars` parameter ```shell -1. Upload `spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar` to hdfs. +1. Upload `spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar` to hdfs. hdfs dfs -mkdir /spark-jars/ -hdfs dfs -put /your_local_path/spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar /spark-jars/ +hdfs dfs -put /your_local_path/spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar /spark-jars/ -2. Add the `spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar` dependency in the cluster. -spark.yarn.jars=hdfs:///spark-jars/spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar +2. Add the `spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar` dependency in the cluster. +spark.yarn.jars=hdfs:///spark-jars/spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar ``` @@ -374,17 +384,17 @@ show databases; -- use databases use your_doris_db; -// show tables in test +-- show tables in test show tables; -// query table +-- query table select * from your_doris_table; -// write data +-- write data insert into your_doris_table values(xxx); insert into your_doris_table select * from your_source_table; -// access table with full name +-- access table with full name select * from your_catalog_name.your_doris_db.your_doris_table; insert into your_catalog_name.your_doris_db.your_doris_table values(xxx); insert into your_catalog_name.your_doris_db.your_doris_table select * from your_source_table; @@ -398,6 +408,8 @@ insert into your_catalog_name.your_doris_db.your_doris_table select * from your_ |----------------------------------|---------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | doris.fenodes | -- | Doris FE http address, support multiple addresses, separated by commas | | doris.table.identifier | -- | Doris table identifier, eg, db1.tbl1 | +| doris.user | -- | Doris username | +| doris.password | Empty string | Doris password | | doris.request.retries | 3 | Number of retries to send requests to Doris | | doris.request.connect.timeout.ms | 30000 | Connection timeout for sending requests to Doris | | doris.request.read.timeout.ms | 30000 | Read timeout for sending request to Doris | @@ -437,10 +449,7 @@ insert into your_catalog_name.your_doris_db.your_doris_table select * from your_ | Key | Default Value | Comment | |---------------------------------|---------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| user | -- | Doris username | -| password | -- | Doris password | | doris.filter.query.in.max.count | 100 | In the predicate pushdown, the maximum number of elements in the in expression value list. If this number is exceeded, the in-expression conditional filtering is processed on the Spark side. | -| doris.ignore-type | -- | In a temporary view, specify the field types to ignore when reading the schema.
eg: when 'doris.ignore-type'='bitmap,hll' | ### Structured Streaming Configuration diff --git a/versioned_docs/version-3.0/ecosystem/spark-doris-connector.md b/versioned_docs/version-3.0/ecosystem/spark-doris-connector.md index 2322c696b32c1..2069537679e99 100644 --- a/versioned_docs/version-3.0/ecosystem/spark-doris-connector.md +++ b/versioned_docs/version-3.0/ecosystem/spark-doris-connector.md @@ -1,7 +1,7 @@ --- { - "title": "Spark Doris Connector", - "language": "en" + "title": "Spark Doris Connector", + "language": "en" } --- @@ -39,7 +39,7 @@ Github: https://github.com/apache/doris-spark-connector | Connector | Spark | Doris | Java | Scala | |-----------|---------------------|-------------|------|------------| -| 24.0.0 | 3.5 ~ 3.0, 2.4 | 1.0 + | 8 | 2.12, 2.11 | +| 24.0.0 | 3.5 ~ 3.1, 2.4 | 1.0 + | 8 | 2.12, 2.11 | | 1.3.2 | 3.4 ~ 3.1, 2.4, 2.3 | 1.0 ~ 2.1.6 | 8 | 2.12, 2.11 | | 1.3.1 | 3.4 ~ 3.1, 2.4, 2.3 | 1.0 ~ 2.1.0 | 8 | 2.12, 2.11 | | 1.3.0 | 3.4 ~ 3.1, 2.4, 2.3 | 1.0 ~ 2.1.0 | 8 | 2.12, 2.11 | @@ -53,10 +53,20 @@ Github: https://github.com/apache/doris-spark-connector ``` org.apache.doris - spark-doris-connector-3.4_2.12 + spark-doris-connector-spark-3.5 24.0.0 ``` + +::: tip + +Starting from version 24.0.0, the naming rules of the Doris connector package have been adjusted: +1. No longer contains Scala version information。 +2. For Spark 2.x versions, use the package named `spark-doris-connector-spark-2` uniformly, and by default only compile based on Scala 2.11 version. If you need Scala 2.12 version, please compile it yourself. +3. For Spark 3.x versions, use the package named `spark-doris-connector-spark-3.x` according to the specific Spark version. Applications based on Spark 3.0 version can use the package `spark-doris-connector-spark-3.1`. + +::: + **Note** 1. Please replace the corresponding Connector version according to different Spark and Scala versions. @@ -67,7 +77,7 @@ Github: https://github.com/apache/doris-spark-connector When compiling, you can directly run `sh build.sh`, for details, please refer to here. -After successful compilation, the target jar package will be generated in the `dist` directory, such as: spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar. Copy this file to the `ClassPath` of `Spark` to use `Spark-Doris-Connector`. For example, for `Spark` running in `Local` mode, put this file in the `jars/` folder. For `Spark` running in `Yarn` cluster mode, put this file in the pre-deployment package. +After successful compilation, the target jar package will be generated in the `dist` directory, such as: spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar. Copy this file to the `ClassPath` of `Spark` to use `Spark-Doris-Connector`. For example, for `Spark` running in `Local` mode, put this file in the `jars/` folder. For `Spark` running in `Yarn` cluster mode, put this file in the pre-deployment package. You can also Execute in the source code directory: @@ -76,21 +86,21 @@ Execute in the source code directory: Enter the Scala and Spark versions you need to compile according to the prompts. -After successful compilation, the target jar package will be generated in the `dist` directory, such as: `spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar`. +After successful compilation, the target jar package will be generated in the `dist` directory, such as: `spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar`. Copy this file to the `ClassPath` of `Spark` to use `Spark-Doris-Connector`. For example, if `Spark` is running in `Local` mode, put this file in the `jars/` folder. If `Spark` is running in `Yarn` cluster mode, put this file in the pre-deployment package. -For example, upload `spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar` to hdfs and add the Jar package path on hdfs to the `spark.yarn.jars` parameter +For example, upload `spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar` to hdfs and add the Jar package path on hdfs to the `spark.yarn.jars` parameter ```shell -1. Upload `spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar` to hdfs. +1. Upload `spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar` to hdfs. hdfs dfs -mkdir /spark-jars/ -hdfs dfs -put /your_local_path/spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar /spark-jars/ +hdfs dfs -put /your_local_path/spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar /spark-jars/ -2. Add the `spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar` dependency in the cluster. -spark.yarn.jars=hdfs:///spark-jars/spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar +2. Add the `spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar` dependency in the cluster. +spark.yarn.jars=hdfs:///spark-jars/spark-doris-connector-spark-3.5-24.0.0-SNAPSHOT.jar ``` @@ -374,17 +384,17 @@ show databases; -- use databases use your_doris_db; -// show tables in test +-- show tables in test show tables; -// query table +-- query table select * from your_doris_table; -// write data +-- write data insert into your_doris_table values(xxx); insert into your_doris_table select * from your_source_table; -// access table with full name +-- access table with full name select * from your_catalog_name.your_doris_db.your_doris_table; insert into your_catalog_name.your_doris_db.your_doris_table values(xxx); insert into your_catalog_name.your_doris_db.your_doris_table select * from your_source_table; @@ -398,6 +408,8 @@ insert into your_catalog_name.your_doris_db.your_doris_table select * from your_ |----------------------------------|---------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | doris.fenodes | -- | Doris FE http address, support multiple addresses, separated by commas | | doris.table.identifier | -- | Doris table identifier, eg, db1.tbl1 | +| doris.user | -- | Doris username | +| doris.password | Empty string | Doris password | | doris.request.retries | 3 | Number of retries to send requests to Doris | | doris.request.connect.timeout.ms | 30000 | Connection timeout for sending requests to Doris | | doris.request.read.timeout.ms | 30000 | Read timeout for sending request to Doris | @@ -437,10 +449,7 @@ insert into your_catalog_name.your_doris_db.your_doris_table select * from your_ | Key | Default Value | Comment | |---------------------------------|---------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| user | -- | Doris username | -| password | -- | Doris password | | doris.filter.query.in.max.count | 100 | In the predicate pushdown, the maximum number of elements in the in expression value list. If this number is exceeded, the in-expression conditional filtering is processed on the Spark side. | -| doris.ignore-type | -- | In a temporary view, specify the field types to ignore when reading the schema.
eg: when 'doris.ignore-type'='bitmap,hll' | ### Structured Streaming Configuration