Merge pull request #438 from InterestingLab/rickyhuo.wd2.docs.spark
Rickyhuo.wd2.docs.flink
garyelephant authored Jan 18, 2020
2 parents 6567b32 + 165e613 commit bec498b
Showing 9 changed files with 93 additions and 25 deletions.
45 changes: 45 additions & 0 deletions docs/zh-cn/v2/flink/configuration/sink-plugins/Elasticsearch.md
@@ -6,8 +6,53 @@

### Description

Output data to Elasticsearch.

### Options

| name | type | required | default value |
| --- | --- | --- | --- |
| [hosts](#hosts-array) | array | yes | - |
| [index_type](#index_type-string) | string | no | log |
| [index_time_format](#index_time_format-string) | string | no | yyyy.MM.dd |
| [index](#index-string) | string | no | waterdrop |

##### hosts [array]

Elasticsearch cluster addresses, in `host:port` format; multiple hosts may be specified, e.g. `["host1:9200", "host2:9200"]`.

##### index_type [string]

Elasticsearch index type

##### index_time_format [string]

When the `index` parameter is of the form `xxxx-${now}`, `index_time_format` specifies the time format used for the index name; the default value is `yyyy.MM.dd`. Commonly used time format symbols are listed below:

| Symbol | Description |
| --- | --- |
| y | Year |
| M | Month |
| d | Day of month |
| H | Hour in day (0-23) |
| m | Minute in hour |
| s | Second in minute |

See [Java SimpleDateFormat](https://docs.oracle.com/javase/tutorial/i18n/format/simpleDateFormat.html) for the full time format syntax.


##### index [string]

The name of the Elasticsearch index. To generate an index based on time, specify a time variable such as `waterdrop-${now}`, where `now` stands for the processing time of the current record (see the time-based sketch under Examples below).


### Examples

```
elasticsearch {
    hosts = ["localhost:9200"]
    index = "waterdrop"
}
```

> Writes the results to the index named `waterdrop` in the Elasticsearch cluster.
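
As a further sketch of time-based index generation (all values illustrative, combining the `index` and `index_time_format` options documented above):

```
elasticsearch {
    hosts = ["localhost:9200"]
    index = "waterdrop-${now}"
    index_time_format = "yyyy.MM.dd"
}
```

> With this configuration, records processed on Jan 18, 2020 would land in an index named `waterdrop-2020.01.18`.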
1 change: 1 addition & 0 deletions docs/zh-cn/v2/flink/configuration/sink-plugins/README.md
@@ -1,6 +1,7 @@
## Configuring Sink (data output) plugins

### Common parameters of Sink plugins

| name | type | required | default value |
| --- | --- | --- | --- |
| [source_table_name](#source_table_name-string) | string | no | - |
14 changes: 0 additions & 14 deletions docs/zh-cn/v2/flink/configuration/source-plugins/Elasticsearch.md

This file was deleted.

2 changes: 0 additions & 2 deletions docs/zh-cn/v2/flink/configuration/source-plugins/_sidebar.md
@@ -17,8 +17,6 @@

- [Source plugin configuration](/zh-cn/v2/flink/configuration/source-plugins/)

-  - [Elasticsearch](/zh-cn/v2/flink/configuration/source-plugins/Elasticsearch.md)
-
- [Fake](/zh-cn/v2/flink/configuration/source-plugins/Fake.md)

- [File](/zh-cn/v2/flink/configuration/source-plugins/File.md)
28 changes: 27 additions & 1 deletion docs/zh-cn/v2/spark/commands/start-waterdrop-spark.sh.md
@@ -1 +1,27 @@
## How to use start-waterdrop-spark.sh


### Usage

```bash
bin/start-waterdrop-spark.sh -c config-path -m master -e deploy-mode -i city=beijing
```

> Use `-c` or `--config` to specify the path of the configuration file
> Use `-m` or `--master` to specify the cluster manager
> Use `-e` or `--deploy-mode` to specify the deploy mode
> Use `-i` or `--variable` to specify variables in the configuration file; it can be given multiple times (see the sketch after the examples below)

### Examples

```bash
# Yarn client mode
./bin/start-waterdrop-spark.sh --master yarn --deploy-mode client --config ./config/application.conf

# Yarn cluster mode
./bin/start-waterdrop-spark.sh --master yarn --deploy-mode cluster --config ./config/application.conf
```
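
A minimal sketch of passing variables with `-i` (the `city=beijing` pair comes from the usage line above; `date=20200118` is an illustrative second variable):

```bash
# Yarn client mode with two configuration-file variables; values are illustrative
./bin/start-waterdrop-spark.sh --master yarn --deploy-mode client \
    --config ./config/application.conf \
    -i city=beijing -i date=20200118
```

Each `-i name=value` pair is substituted into the configuration file wherever that variable is referenced.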
6 changes: 5 additions & 1 deletion docs/zh-cn/v2/spark/configuration/ConfigExamples.md
@@ -1 +1,5 @@
## Complete configuration file examples [Spark]

[Example 1: Streaming computation](https://github.com/InterestingLab/waterdrop/blob/wd-v2-baseline/config/spark.streaming.conf.template)

[Example 2: Batch processing](https://github.com/InterestingLab/waterdrop/blob/wd-v2-baseline/config/spark.batch.conf.template)
4 changes: 2 additions & 2 deletions docs/zh-cn/v2/spark/configuration/README.md
@@ -108,6 +108,6 @@ sink {

For other configuration examples, see:

- [Example 1: Streaming computation](https://github.com/InterestingLab/waterdrop/blob/master/config/spark.streaming.conf.template)
+ [Example 1: Streaming computation](https://github.com/InterestingLab/waterdrop/blob/wd-v2-baseline/config/spark.streaming.conf.template)

- [Example 2: Batch processing](https://github.com/InterestingLab/waterdrop/blob/master/config/spark.batch.conf.template)
+ [Example 2: Batch processing](https://github.com/InterestingLab/waterdrop/blob/wd-v2-baseline/config/spark.batch.conf.template)
2 changes: 1 addition & 1 deletion docs/zh-cn/v2/spark/deployment.md
@@ -54,4 +54,4 @@ env {
```

- For how to configure Waterdrop, see [Waterdrop general configuration](/zh-cn/v2/spark/configuration)
+ For how to configure Waterdrop, see [Waterdrop general configuration](/zh-cn/v2/spark/configuration/)
@@ -2,6 +2,7 @@
 
 import com.typesafe.config.waterdrop.Config;
 import com.typesafe.config.waterdrop.ConfigFactory;
+import io.github.interestinglab.waterdrop.common.utils.StringTemplate;
 import io.github.interestinglab.waterdrop.flink.FlinkEnvironment;
 import io.github.interestinglab.waterdrop.flink.batch.FlinkBatchSink;
 import io.github.interestinglab.waterdrop.flink.stream.FlinkStreamSink;
@@ -29,6 +30,7 @@
 public class Elasticsearch implements FlinkStreamSink<Row, Row>, FlinkBatchSink<Row, Row> {
 
     private Config config;
+    private String indexName;
 
     @Override
     public void setConfig(Config config) {
@@ -42,7 +44,11 @@ public Config getConfig() {
 
     @Override
     public CheckResult checkConfig() {
-        return null;
+        if (config.hasPath("hosts")) {
+            return new CheckResult(true, "");
+        } else {
+            return new CheckResult(false, "please specify [hosts] as a non-empty string list");
+        }
     }


@@ -52,9 +58,9 @@ public void prepare(FlinkEnvironment env) {
             {
                 put("index", "waterdrop");
                 put("index_type", "log");
+                put("index_time_format", "yyyy.MM.dd");
             }
         });
-
         config = config.withFallback(defaultConfig);
     }

@@ -69,6 +75,7 @@ public DataStreamSink<Row> outputStream(FlinkEnvironment env, DataStream<Row> dataStream) {
 
         RowTypeInfo rowTypeInfo = (RowTypeInfo) dataStream.getType();
         String[] fieldNames = rowTypeInfo.getFieldNames();
+        indexName = StringTemplate.substitute(config.getString("index"), config.getString("index_time_format"));
         ElasticsearchSink.Builder<Row> esSinkBuilder = new ElasticsearchSink.Builder<>(
                 httpHosts,
                 new ElasticsearchSinkFunction<Row>() {
@@ -80,7 +87,7 @@ public IndexRequest createIndexRequest(Row element) {
                     }
 
                     return Requests.indexRequest()
-                            .index(config.getString("index"))
+                            .index(indexName)
                             .type(config.getString("index_type"))
                             .source(json);
                 }
@@ -105,6 +112,7 @@ public DataSink<Row> outputBatch(FlinkEnvironment env, DataSet<Row> dataSet) {
 
         RowTypeInfo rowTypeInfo = (RowTypeInfo) dataSet.getType();
         String[] fieldNames = rowTypeInfo.getFieldNames();
+        indexName = StringTemplate.substitute(config.getString("index"), config.getString("index_time_format"));
         return dataSet.output(new ElasticsearchOutputFormat<>(config, new ElasticsearchSinkFunction<Row>() {
             @Override
             public void process(Row element, RuntimeContext ctx, RequestIndexer indexer) {
@@ -119,7 +127,7 @@ private IndexRequest createIndexRequest(Row element) {
             }
 
             return Requests.indexRequest()
-                    .index(config.getString("index"))
+                    .index(indexName)
                     .type(config.getString("index_type"))
                     .source(json);
         }
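
The two hunks above delegate index-name resolution to the project's `StringTemplate` helper. As a rough sketch of the substitution they rely on (a hypothetical stand-in; the real `io.github.interestinglab.waterdrop.common.utils.StringTemplate` may differ in details):

```java
import java.text.SimpleDateFormat;
import java.util.Date;

// Hypothetical stand-in for io.github.interestinglab.waterdrop.common.utils.StringTemplate;
// it only illustrates the ${now} substitution used by the sink.
public class StringTemplateSketch {

    // Replace the ${now} placeholder with the current time rendered
    // using the given date format, e.g. "yyyy.MM.dd".
    public static String substitute(String template, String timeFormat) {
        String now = new SimpleDateFormat(timeFormat).format(new Date());
        return template.replace("${now}", now);
    }

    public static void main(String[] args) {
        // Prints something like "waterdrop-2020.01.18"
        System.out.println(substitute("waterdrop-${now}", "yyyy.MM.dd"));
    }
}
```

Under this reading, a plain `index` such as `waterdrop` contains no placeholder and passes through unchanged, so the new `.index(indexName)` call stays compatible with the previous `.index(config.getString("index"))` behavior.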
