From 5aa617c584271f9c249e8eff99f7c0de30f8794a Mon Sep 17 00:00:00 2001 From: RickyHuo Date: Mon, 13 Jan 2020 19:05:27 +0800 Subject: [PATCH 1/3] Fix spell mistake --- docs/zh-cn/v2/spark/configuration/ConfigExamples.md | 6 +++++- docs/zh-cn/v2/spark/configuration/README.md | 4 ++-- docs/zh-cn/v2/spark/deployment.md | 2 +- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/docs/zh-cn/v2/spark/configuration/ConfigExamples.md b/docs/zh-cn/v2/spark/configuration/ConfigExamples.md index 21752a0fbdb..d51cf3695d3 100644 --- a/docs/zh-cn/v2/spark/configuration/ConfigExamples.md +++ b/docs/zh-cn/v2/spark/configuration/ConfigExamples.md @@ -1 +1,5 @@ -## 完整配置文件案例 [Spark] \ No newline at end of file +## 完整配置文件案例 [Spark] + +[配置示例1 : Streaming 流式计算](https://github.com/InterestingLab/waterdrop/blob/wd-v2-baseline/config/spark.streaming.conf.template) + +[配置示例2 : Batch 离线批处理](https://github.com/InterestingLab/waterdrop/blob/wd-v2-baseline/config/spark.batch.conf.template) diff --git a/docs/zh-cn/v2/spark/configuration/README.md b/docs/zh-cn/v2/spark/configuration/README.md index 08d93544845..97991561dd3 100644 --- a/docs/zh-cn/v2/spark/configuration/README.md +++ b/docs/zh-cn/v2/spark/configuration/README.md @@ -108,6 +108,6 @@ sink { 其他配置可参考: -[配置示例1 : Streaming 流式计算](https://github.com/InterestingLab/waterdrop/blob/master/config/spark.streaming.conf.template) +[配置示例1 : Streaming 流式计算](https://github.com/InterestingLab/waterdrop/blob/wd-v2-baseline/config/spark.streaming.conf.template) -[配置示例2 : Batch 离线批处理](https://github.com/InterestingLab/waterdrop/blob/master/config/spark.batch.conf.template) +[配置示例2 : Batch 离线批处理](https://github.com/InterestingLab/waterdrop/blob/wd-v2-baseline/config/spark.batch.conf.template) diff --git a/docs/zh-cn/v2/spark/deployment.md b/docs/zh-cn/v2/spark/deployment.md index e77b09b08da..d36994f77b3 100644 --- a/docs/zh-cn/v2/spark/deployment.md +++ b/docs/zh-cn/v2/spark/deployment.md @@ -54,4 +54,4 @@ env { ``` -关于如何配置 Waterdrop, 请见[Waterdrop 通用配置](/zh-cn/v2/spark/configuration) +关于如何配置 Waterdrop, 请见 [Waterdrop 通用配置](/zh-cn/v2/spark/configuration/) From 0f019a98494a50551f66ad85d438fa254cd504c8 Mon Sep 17 00:00:00 2001 From: RickyHuo Date: Tue, 14 Jan 2020 13:27:16 +0800 Subject: [PATCH 2/3] Update flink elasticsearch sink docs --- .../sink-plugins/Elasticsearch.md | 45 +++++++++++++++++++ .../configuration/sink-plugins/README.md | 1 + .../source-plugins/Elasticsearch.md | 14 ------ .../configuration/source-plugins/_sidebar.md | 2 - .../waterdrop/flink/sink/Elasticsearch.java | 16 +++++-- 5 files changed, 58 insertions(+), 20 deletions(-) delete mode 100644 docs/zh-cn/v2/flink/configuration/source-plugins/Elasticsearch.md diff --git a/docs/zh-cn/v2/flink/configuration/sink-plugins/Elasticsearch.md b/docs/zh-cn/v2/flink/configuration/sink-plugins/Elasticsearch.md index a968b0858af..373532431b1 100644 --- a/docs/zh-cn/v2/flink/configuration/sink-plugins/Elasticsearch.md +++ b/docs/zh-cn/v2/flink/configuration/sink-plugins/Elasticsearch.md @@ -6,8 +6,53 @@ ### Description +输出数据到 ElasticSearch ### Options +| name | type | required | default value | +| --- | --- | --- | --- | +| [hosts](#hosts-array) | array | yes | - | +| [index_type](#index_type-string) | string | no | log | +| [index_time_format](#index_time_format-string) | string | no | yyyy.MM.dd | +| [index](#index-string) | string | no | waterdrop | + +##### hosts [array] + +Elasticsearch集群地址,格式为host:port,允许指定多个host。如["host1:9200", "host2:9200"]。 + +##### index_type [string] + +Elasticsearch index type + +##### index_time_format [string] + +当`index`参数中的格式为`xxxx-${now}`时,`index_time_format`可以指定index名称的时间格式,默认值为 `yyyy.MM.dd`。常用的时间格式列举如下: + +| Symbol | Description | +| --- | --- | +| y | Year | +| M | Month | +| d | Day of month | +| H | Hour in day (0-23) | +| m | Minute in hour | +| s | Second in minute | + +详细的时间格式语法见[Java SimpleDateFormat](https://docs.oracle.com/javase/tutorial/i18n/format/simpleDateFormat.html)。 + + +##### index [string] + +Elasticsearch index名称,如果需要根据时间生成index,可以指定时间变量,如:`waterdrop-${now}`。`now`代表当前数据处理的时间。 + ### Examples + +``` +elasticsearch { + hosts = ["localhost:9200"] + index = "waterdrop" +} +``` + +> 将结果写入Elasticsearch集群的名称为 waterdrop 的索引中 diff --git a/docs/zh-cn/v2/flink/configuration/sink-plugins/README.md b/docs/zh-cn/v2/flink/configuration/sink-plugins/README.md index 1e0bb3ea56c..faa0a0efbf5 100644 --- a/docs/zh-cn/v2/flink/configuration/sink-plugins/README.md +++ b/docs/zh-cn/v2/flink/configuration/sink-plugins/README.md @@ -1,6 +1,7 @@ ## Sink(数据输出) 插件的配置 ### Sink插件通用参数 + | name | type | required | default value | | --- | --- | --- | --- | | [source_table_name](#source_table_name-string) | string | no | - | diff --git a/docs/zh-cn/v2/flink/configuration/source-plugins/Elasticsearch.md b/docs/zh-cn/v2/flink/configuration/source-plugins/Elasticsearch.md deleted file mode 100644 index ad20478a450..00000000000 --- a/docs/zh-cn/v2/flink/configuration/source-plugins/Elasticsearch.md +++ /dev/null @@ -1,14 +0,0 @@ -## Source plugin : Elasticsearch [Flink] - -* Author: InterestingLab -* Homepage: https://interestinglab.github.io/waterdrop -* Version: 2.0.0 - -### Description - - -### Options - - -### Examples - diff --git a/docs/zh-cn/v2/flink/configuration/source-plugins/_sidebar.md b/docs/zh-cn/v2/flink/configuration/source-plugins/_sidebar.md index 28bcca64ca0..62c34d29508 100644 --- a/docs/zh-cn/v2/flink/configuration/source-plugins/_sidebar.md +++ b/docs/zh-cn/v2/flink/configuration/source-plugins/_sidebar.md @@ -17,8 +17,6 @@ - [Source 插件配置](/zh-cn/v2/flink/configuration/source-plugins/) - - [Elasticsearch](/zh-cn/v2/flink/configuration/source-plugins/Elasticsearch.md) - - [Fake](/zh-cn/v2/flink/configuration/source-plugins/Fake.md) - [File](/zh-cn/v2/flink/configuration/source-plugins/File.md) diff --git a/plugin-flink-sink-elasticsearch/src/main/java/io/github/interestinglab/waterdrop/flink/sink/Elasticsearch.java b/plugin-flink-sink-elasticsearch/src/main/java/io/github/interestinglab/waterdrop/flink/sink/Elasticsearch.java index f7599b40119..5c3211a5835 100644 --- a/plugin-flink-sink-elasticsearch/src/main/java/io/github/interestinglab/waterdrop/flink/sink/Elasticsearch.java +++ b/plugin-flink-sink-elasticsearch/src/main/java/io/github/interestinglab/waterdrop/flink/sink/Elasticsearch.java @@ -2,6 +2,7 @@ import com.typesafe.config.waterdrop.Config; import com.typesafe.config.waterdrop.ConfigFactory; +import io.github.interestinglab.waterdrop.common.utils.StringTemplate; import io.github.interestinglab.waterdrop.flink.FlinkEnvironment; import io.github.interestinglab.waterdrop.flink.batch.FlinkBatchSink; import io.github.interestinglab.waterdrop.flink.stream.FlinkStreamSink; @@ -29,6 +30,7 @@ public class Elasticsearch implements FlinkStreamSink, FlinkBatchSink { private Config config; + private String indexName; @Override public void setConfig(Config config) { @@ -42,7 +44,11 @@ public Config getConfig() { @Override public CheckResult checkConfig() { - return null; + if (config.hasPath("hosts")) { + return new CheckResult(true, ""); + } else { + return new CheckResult(false, "please specify [hosts] as a non-empty string list"); + } } @@ -52,9 +58,9 @@ public void prepare(FlinkEnvironment env) { { put("index", "waterdrop"); put("index_type", "log"); + put("index_time_format", "yyyy.MM.dd"); } }); - config = config.withFallback(defaultConfig); } @@ -69,6 +75,7 @@ public DataStreamSink outputStream(FlinkEnvironment env, DataStream da RowTypeInfo rowTypeInfo = (RowTypeInfo) dataStream.getType(); String[] fieldNames = rowTypeInfo.getFieldNames(); + indexName = StringTemplate.substitute(config.getString("index"), config.getString("index_time_format")); ElasticsearchSink.Builder esSinkBuilder = new ElasticsearchSink.Builder<>( httpHosts, new ElasticsearchSinkFunction() { @@ -80,7 +87,7 @@ public IndexRequest createIndexRequest(Row element) { } return Requests.indexRequest() - .index(config.getString("index")) + .index(indexName) .type(config.getString("index_type")) .source(json); } @@ -105,6 +112,7 @@ public DataSink outputBatch(FlinkEnvironment env, DataSet dataSet) { RowTypeInfo rowTypeInfo = (RowTypeInfo) dataSet.getType(); String[] fieldNames = rowTypeInfo.getFieldNames(); + indexName = StringTemplate.substitute(config.getString("index"), config.getString("index_time_format")); return dataSet.output(new ElasticsearchOutputFormat<>(config, new ElasticsearchSinkFunction() { @Override public void process(Row element, RuntimeContext ctx, RequestIndexer indexer) { @@ -119,7 +127,7 @@ private IndexRequest createIndexRequest(Row element) { } return Requests.indexRequest() - .index(config.getString("index")) + .index(indexName) .type(config.getString("index_type")) .source(json); } From 165e61363a7a9d8764cd9e078354879aced3890a Mon Sep 17 00:00:00 2001 From: RickyHuo Date: Fri, 17 Jan 2020 13:16:11 +0800 Subject: [PATCH 3/3] Update docs for spark commands --- .../commands/start-waterdrop-spark.sh.md | 28 ++++++++++++++++++- 1 file changed, 27 insertions(+), 1 deletion(-) diff --git a/docs/zh-cn/v2/spark/commands/start-waterdrop-spark.sh.md b/docs/zh-cn/v2/spark/commands/start-waterdrop-spark.sh.md index 7a5dfe7a062..d8b83ebfe4d 100644 --- a/docs/zh-cn/v2/spark/commands/start-waterdrop-spark.sh.md +++ b/docs/zh-cn/v2/spark/commands/start-waterdrop-spark.sh.md @@ -1 +1,27 @@ -## start-waterdrop-spark.sh 使用方法 \ No newline at end of file +## start-waterdrop-spark.sh 使用方法 + + +### 使用说明 + +```bash + bin/start-waterdrop-spark.sh -c config-path -m master -e deploy-mode -i city=beijing +``` + +> 使用 `-c` 或者 `--config`来指定配置文件的路径 + +> 使用 `-m` 或者 `--master` 来指定集群管理器 + +> 使用 `-e` 或者 `--deploy-mode` 来指定部署模式 + +> 使用 `-i` 或者 `--variable` 来指定配置文件中的变量,可以配置多个 + + +### 使用案例 + +``` +# Yarn client 模式 +./bin/start-waterdrop-spark.sh --master yarn --deploy-mode client --config ./config/application.conf + +# Yarn cluster 模式 +./bin/start-waterdrop-spark.sh --master yarn --deploy-mode cluster --config ./config/application.conf +``` \ No newline at end of file