Skip to content

Commit

Permalink
[Improve][Connector-V2][Redis] Support redis cluster connection & use…
Browse files Browse the repository at this point in the history
…r authentication (#3188)

* [Feature][Connector-V2][Redis] Redis source & sink connector supports redis cluster mode connection and user authentication

* [Feature][Connector-V2][Redis] Update docs

* [Improve][Connector-V2][Redis] Support multi nodes setting in redis cluster mode

* [Improve][Connector-V2][Redis] Support parse mode for hash keys

* [Improve][Connector-V2][Redis] Update redis source doc
  • Loading branch information
TyrantLucifer authored Oct 31, 2022
1 parent 9bd076c commit c7275a4
Show file tree
Hide file tree
Showing 6 changed files with 324 additions and 26 deletions.
43 changes: 32 additions & 11 deletions docs/en/connector-v2/sink/Redis.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,18 @@ Used to write data to Redis.

## Options

| name | type | required | default value |
|-------------- |--------|----------|---------------|
| host | string | yes | - |
| port | int | yes | - |
| key | string | yes | - |
| data_type | string | yes | - |
| auth | string | no | - |
| format | string | no | json |
| common-options| | no | - |
| name | type | required | default value |
|----------------|--------|----------|---------------|
| host | string | yes | - |
| port | int | yes | - |
| key | string | yes | - |
| data_type | string | yes | - |
| user | string | no | - |
| auth | string | no | - |
| mode | string | no | - |
| auth | list | no | - |
| format | string | no | json |
| common-options | | no | - |

### host [string]

Expand Down Expand Up @@ -75,11 +78,25 @@ Redis data types, support `key` `hash` `list` `set` `zset`
- zset
> Each data from upstream will be added to the configured zset key with a weight of 1. So the order of data in zset is based on the order of data consumption.
### auth [String]
### user [string]

redis authentication user, you need it when you connect to an encrypted cluster

### auth [string]

Redis authentication password, you need it when you connect to an encrypted cluster

### format [String]
### mode [string]

redis mode, `single` or `cluster`, default is `single`

### nodes [list]

redis nodes information, used in cluster mode, must like as the following format:

[host1:port1, host2:port2]

### format [string]

The format of upstream data, now only support `json`, `text` will be supported later, default `json`.

Expand Down Expand Up @@ -121,3 +138,7 @@ simple:
### 2.2.0-beta 2022-09-26

- Add Redis Sink Connector

### next version

- [Improve] Support redis cluster mode connection and user authentication [3188](https://github.com/apache/incubator-seatunnel/pull/3188)
110 changes: 100 additions & 10 deletions docs/en/connector-v2/source/Redis.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,20 @@ Used to read data from Redis.

## Options

| name | type | required | default value |
|--------------- |--------|----------|---------------|
| host | string | yes | - |
| port | int | yes | - |
| keys | string | yes | - |
| data_type | string | yes | - |
| auth | string | No | - |
| schema | config | No | - |
| format | string | No | json |
| common-options | | no | - |
| name | type | required | default value |
|---------------------|--------|----------|---------------|
| host | string | yes | - |
| port | int | yes | - |
| keys | string | yes | - |
| data_type | string | yes | - |
| user | string | no | - |
| auth | string | no | - |
| mode | string | no | - |
| hash_key_parse_mode | string | no | all |
| nodes | list | no | - |
| schema | config | no | - |
| format | string | no | json |
| common-options | | no | - |

### host [string]

Expand All @@ -36,6 +40,74 @@ redis host

redis port

### hash_key_parse_mode [string]

hash key parse mode, support `all` `kv`, used to tell connector how to parse hash key.

when setting it to `all`, connector will treat the value of hash key as a row and use the schema config to parse it, when setting it to `kv`, connector will treat each kv in hash key as a row and use the schema config to parse it:

for example, if the value of hash key is the following shown:

```text
{
"001": {
"name": "tyrantlucifer",
"age": 26
},
"002": {
"name": "Zongwen",
"age": 26
}
}
```

if hash_key_parse_mode is `all` and schema config as the following shown, it will generate the following data:

```hocon
schema {
fields {
001 {
name = string
age = int
}
002 {
name = string
age = int
}
}
}
```

| 001 | 002 |
|---------------------------------|---------------------------|
| Row(name=tyrantlucifer, age=26) | Row(name=Zongwen, age=26) |

if hash_key_parse_mode is `kv` and schema config as the following shown, it will generate the following data:

```hocon
schema {
fields {
hash_key = string
name = string
age = int
}
}
```

| hash_key | name | age |
|----------|---------------|-----|
| 001 | tyrantlucifer | 26 |
| 002 | Zongwen | 26 |

each kv that in hash key it will be treated as a row and send it to upstream.

**Tips: connector will use the first field information of schema config as the field name of each k that in each kv**

### keys [string]

keys pattern
Expand Down Expand Up @@ -67,10 +139,24 @@ redis data types, support `key` `hash` `list` `set` `zset`
> Each element in the sorted set will be sent downstream as a single row of data
> For example, the value of sorted set is `[tyrantlucier, CalvinKirs]`, the data received downstream are `tyrantlucifer` and `CalvinKirs` and only two message will be received.
### user [string]

redis authentication user, you need it when you connect to an encrypted cluster

### auth [string]

redis authentication password, you need it when you connect to an encrypted cluster

### mode [string]

redis mode, `single` or `cluster`, default is `single`

### nodes [list]

redis nodes information, used in cluster mode, must like as the following format:

[host1:port1, host2:port2]

### format [string]

the format of upstream data, now only support `json` `text`, default `json`.
Expand Down Expand Up @@ -166,3 +252,7 @@ simple:
### 2.2.0-beta 2022-09-26

- Add Redis Source Connector

### next version

- [Improve] Support redis cluster mode connection and user authentication [3188](https://github.com/apache/incubator-seatunnel/pull/3188)
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.redis.config;

import lombok.NonNull;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;

import java.util.List;
import java.util.Map;
import java.util.Set;

public class JedisWrapper extends Jedis {
private final JedisCluster jedisCluster;

public JedisWrapper(@NonNull JedisCluster jedisCluster) {
this.jedisCluster = jedisCluster;
}

@Override
public String set(final String key, final String value) {
return jedisCluster.set(key, value);
}

@Override
public String get(final String key) {
return jedisCluster.get(key);
}

@Override
public long hset(final String key, final Map<String, String> hash) {
return jedisCluster.hset(key, hash);
}

@Override
public Map<String, String> hgetAll(final String key) {
return jedisCluster.hgetAll(key);
}

@Override
public long lpush(final String key, final String... strings) {
return jedisCluster.lpush(key, strings);
}

@Override
public List<String> lrange(final String key, final long start, final long stop) {
return jedisCluster.lrange(key, start, stop);
}

@Override
public long sadd(final String key, final String... members) {
return jedisCluster.sadd(key, members);
}

@Override
public Set<String> smembers(final String key) {
return jedisCluster.smembers(key);
}

@Override
public long zadd(final String key, final double score, final String member) {
return jedisCluster.zadd(key, score, member);
}

@Override
public List<String> zrange(final String key, final long start, final long stop) {
return jedisCluster.zrange(key, start, stop);
}

@Override
public void close() {
jedisCluster.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,22 @@ public class RedisConfig {
public static final String HOST = "host";
public static final String PORT = "port";
public static final String AUTH = "auth";
public static final String USER = "user";
public static final String KEY_PATTERN = "keys";
public static final String KEY = "key";
public static final String DATA_TYPE = "data_type";
public static final String FORMAT = "format";
public static final String MODE = "mode";
public static final String NODES = "nodes";
public static final String HASH_KEY_PARSE_MODE = "hash_key_parse_mode";

public enum RedisMode {
SINGLE,
CLUSTER;
}

public enum HashKeyParseMode {
ALL,
KV;
}
}
Loading

0 comments on commit c7275a4

Please sign in to comment.