diff --git a/docs/en/connector-v2/sink/Redis.md b/docs/en/connector-v2/sink/Redis.md index 405e4248429..9c0fa3e9517 100644 --- a/docs/en/connector-v2/sink/Redis.md +++ b/docs/en/connector-v2/sink/Redis.md @@ -13,15 +13,18 @@ Used to write data to Redis. ## Options -| name | type | required | default value | -|-------------- |--------|----------|---------------| -| host | string | yes | - | -| port | int | yes | - | -| key | string | yes | - | -| data_type | string | yes | - | -| auth | string | no | - | -| format | string | no | json | -| common-options| | no | - | +| name | type | required | default value | +|----------------|--------|----------|---------------| +| host | string | yes | - | +| port | int | yes | - | +| key | string | yes | - | +| data_type | string | yes | - | +| user | string | no | - | +| auth | string | no | - | +| mode | string | no | - | +| auth | list | no | - | +| format | string | no | json | +| common-options | | no | - | ### host [string] @@ -75,11 +78,25 @@ Redis data types, support `key` `hash` `list` `set` `zset` - zset > Each data from upstream will be added to the configured zset key with a weight of 1. So the order of data in zset is based on the order of data consumption. -### auth [String] +### user [string] + +redis authentication user, you need it when you connect to an encrypted cluster + +### auth [string] Redis authentication password, you need it when you connect to an encrypted cluster -### format [String] +### mode [string] + +redis mode, `single` or `cluster`, default is `single` + +### nodes [list] + +redis nodes information, used in cluster mode, must like as the following format: + +[host1:port1, host2:port2] + +### format [string] The format of upstream data, now only support `json`, `text` will be supported later, default `json`. @@ -121,3 +138,7 @@ simple: ### 2.2.0-beta 2022-09-26 - Add Redis Sink Connector + +### next version + +- [Improve] Support redis cluster mode connection and user authentication [3188](https://github.com/apache/incubator-seatunnel/pull/3188) diff --git a/docs/en/connector-v2/source/Redis.md b/docs/en/connector-v2/source/Redis.md index 974aa22aaaa..589efc01bc8 100644 --- a/docs/en/connector-v2/source/Redis.md +++ b/docs/en/connector-v2/source/Redis.md @@ -17,16 +17,20 @@ Used to read data from Redis. ## Options -| name | type | required | default value | -|--------------- |--------|----------|---------------| -| host | string | yes | - | -| port | int | yes | - | -| keys | string | yes | - | -| data_type | string | yes | - | -| auth | string | No | - | -| schema | config | No | - | -| format | string | No | json | -| common-options | | no | - | +| name | type | required | default value | +|---------------------|--------|----------|---------------| +| host | string | yes | - | +| port | int | yes | - | +| keys | string | yes | - | +| data_type | string | yes | - | +| user | string | no | - | +| auth | string | no | - | +| mode | string | no | - | +| hash_key_parse_mode | string | no | all | +| nodes | list | no | - | +| schema | config | no | - | +| format | string | no | json | +| common-options | | no | - | ### host [string] @@ -36,6 +40,74 @@ redis host redis port +### hash_key_parse_mode [string] + +hash key parse mode, support `all` `kv`, used to tell connector how to parse hash key. + +when setting it to `all`, connector will treat the value of hash key as a row and use the schema config to parse it, when setting it to `kv`, connector will treat each kv in hash key as a row and use the schema config to parse it: + +for example, if the value of hash key is the following shown: + +```text +{ + "001": { + "name": "tyrantlucifer", + "age": 26 + }, + "002": { + "name": "Zongwen", + "age": 26 + } +} + +``` + +if hash_key_parse_mode is `all` and schema config as the following shown, it will generate the following data: + +```hocon + +schema { + fields { + 001 { + name = string + age = int + } + 002 { + name = string + age = int + } + } +} + +``` + +| 001 | 002 | +|---------------------------------|---------------------------| +| Row(name=tyrantlucifer, age=26) | Row(name=Zongwen, age=26) | + +if hash_key_parse_mode is `kv` and schema config as the following shown, it will generate the following data: + +```hocon + +schema { + fields { + hash_key = string + name = string + age = int + } +} + +``` + +| hash_key | name | age | +|----------|---------------|-----| +| 001 | tyrantlucifer | 26 | +| 002 | Zongwen | 26 | + +each kv that in hash key it will be treated as a row and send it to upstream. + +**Tips: connector will use the first field information of schema config as the field name of each k that in each kv** + ### keys [string] keys pattern @@ -67,10 +139,24 @@ redis data types, support `key` `hash` `list` `set` `zset` > Each element in the sorted set will be sent downstream as a single row of data > For example, the value of sorted set is `[tyrantlucier, CalvinKirs]`, the data received downstream are `tyrantlucifer` and `CalvinKirs` and only two message will be received. +### user [string] + +redis authentication user, you need it when you connect to an encrypted cluster + ### auth [string] redis authentication password, you need it when you connect to an encrypted cluster +### mode [string] + +redis mode, `single` or `cluster`, default is `single` + +### nodes [list] + +redis nodes information, used in cluster mode, must like as the following format: + +[host1:port1, host2:port2] + ### format [string] the format of upstream data, now only support `json` `text`, default `json`. @@ -166,3 +252,7 @@ simple: ### 2.2.0-beta 2022-09-26 - Add Redis Source Connector + +### next version + +- [Improve] Support redis cluster mode connection and user authentication [3188](https://github.com/apache/incubator-seatunnel/pull/3188) diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/JedisWrapper.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/JedisWrapper.java new file mode 100644 index 00000000000..8348f3561f7 --- /dev/null +++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/JedisWrapper.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.redis.config; + +import lombok.NonNull; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisCluster; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class JedisWrapper extends Jedis { + private final JedisCluster jedisCluster; + + public JedisWrapper(@NonNull JedisCluster jedisCluster) { + this.jedisCluster = jedisCluster; + } + + @Override + public String set(final String key, final String value) { + return jedisCluster.set(key, value); + } + + @Override + public String get(final String key) { + return jedisCluster.get(key); + } + + @Override + public long hset(final String key, final Map hash) { + return jedisCluster.hset(key, hash); + } + + @Override + public Map hgetAll(final String key) { + return jedisCluster.hgetAll(key); + } + + @Override + public long lpush(final String key, final String... strings) { + return jedisCluster.lpush(key, strings); + } + + @Override + public List lrange(final String key, final long start, final long stop) { + return jedisCluster.lrange(key, start, stop); + } + + @Override + public long sadd(final String key, final String... members) { + return jedisCluster.sadd(key, members); + } + + @Override + public Set smembers(final String key) { + return jedisCluster.smembers(key); + } + + @Override + public long zadd(final String key, final double score, final String member) { + return jedisCluster.zadd(key, score, member); + } + + @Override + public List zrange(final String key, final long start, final long stop) { + return jedisCluster.zrange(key, start, stop); + } + + @Override + public void close() { + jedisCluster.close(); + } +} diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java index e158449b411..11ee665a905 100644 --- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java +++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java @@ -21,8 +21,22 @@ public class RedisConfig { public static final String HOST = "host"; public static final String PORT = "port"; public static final String AUTH = "auth"; + public static final String USER = "user"; public static final String KEY_PATTERN = "keys"; public static final String KEY = "key"; public static final String DATA_TYPE = "data_type"; public static final String FORMAT = "format"; + public static final String MODE = "mode"; + public static final String NODES = "nodes"; + public static final String HASH_KEY_PARSE_MODE = "hash_key_parse_mode"; + + public enum RedisMode { + SINGLE, + CLUSTER; + } + + public enum HashKeyParseMode { + ALL, + KV; + } } diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java index bc5147505bd..75b74d2068b 100644 --- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java +++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java @@ -21,18 +21,28 @@ import lombok.Data; import org.apache.commons.lang3.StringUtils; +import redis.clients.jedis.ConnectionPoolConfig; +import redis.clients.jedis.HostAndPort; import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisCluster; import java.io.Serializable; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; @Data public class RedisParameters implements Serializable { private String host; private int port; private String auth = ""; + private String user = ""; private String keysPattern; private String keyField; private RedisDataType redisDataType; + private RedisConfig.RedisMode mode; + private RedisConfig.HashKeyParseMode hashKeyParseMode; + private List redisNodes = Collections.emptyList(); public void buildWithConfig(Config config) { // set host @@ -43,6 +53,27 @@ public void buildWithConfig(Config config) { if (config.hasPath(RedisConfig.AUTH)) { this.auth = config.getString(RedisConfig.AUTH); } + // set user + if (config.hasPath(RedisConfig.USER)) { + this.user = config.getString(RedisConfig.USER); + } + // set mode + if (config.hasPath(RedisConfig.MODE)) { + this.mode = RedisConfig.RedisMode.valueOf(config.getString(RedisConfig.MODE)); + } else { + this.mode = RedisConfig.RedisMode.SINGLE; + } + // set hash key mode + if (config.hasPath(RedisConfig.HASH_KEY_PARSE_MODE)) { + this.hashKeyParseMode = RedisConfig.HashKeyParseMode + .valueOf(config.getString(RedisConfig.HASH_KEY_PARSE_MODE).toUpperCase()); + } else { + this.hashKeyParseMode = RedisConfig.HashKeyParseMode.ALL; + } + // set redis nodes information + if (config.hasPath(RedisConfig.NODES)) { + this.redisNodes = config.getStringList(RedisConfig.NODES); + } // set key if (config.hasPath(RedisConfig.KEY)) { this.keyField = config.getString(RedisConfig.KEY); @@ -61,10 +92,44 @@ public void buildWithConfig(Config config) { } public Jedis buildJedis() { - Jedis jedis = new Jedis(host, port); - if (StringUtils.isNotBlank(auth)) { - jedis.auth(auth); + switch (mode) { + case SINGLE: + Jedis jedis = new Jedis(host, port); + if (StringUtils.isNotBlank(auth)) { + jedis.auth(auth); + } + if (StringUtils.isNotBlank(user)) { + jedis.aclSetUser(user); + } + return jedis; + case CLUSTER: + HashSet nodes = new HashSet<>(); + HostAndPort node = new HostAndPort(host, port); + nodes.add(node); + if (!redisNodes.isEmpty()) { + for (String redisNode : redisNodes) { + String[] splits = redisNode.split(":"); + if (splits.length != 2) { + throw new IllegalArgumentException("Invalid redis node information," + + "redis node information must like as the following: [host:port]"); + } + HostAndPort hostAndPort = new HostAndPort(splits[0], Integer.parseInt(splits[1])); + nodes.add(hostAndPort); + } + } + ConnectionPoolConfig connectionPoolConfig = new ConnectionPoolConfig(); + JedisCluster jedisCluster; + if (StringUtils.isNotBlank(auth)) { + jedisCluster = new JedisCluster(nodes, JedisCluster.DEFAULT_TIMEOUT, + JedisCluster.DEFAULT_TIMEOUT, JedisCluster.DEFAULT_MAX_ATTEMPTS, + auth, connectionPoolConfig); + } else { + jedisCluster = new JedisCluster(nodes); + } + return new JedisWrapper(jedisCluster); + default: + // do nothing + throw new IllegalArgumentException("Not support this redis mode"); } - return jedis; } } diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceReader.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceReader.java index ada38fe6d19..74f862ed852 100644 --- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceReader.java +++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/source/RedisSourceReader.java @@ -19,9 +19,13 @@ import org.apache.seatunnel.api.serialization.DeserializationSchema; import org.apache.seatunnel.api.source.Collector; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.utils.JsonUtils; import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader; import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext; +import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisConfig; import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisDataType; import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters; @@ -29,6 +33,7 @@ import java.io.IOException; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Set; @@ -66,7 +71,21 @@ public void pollNext(Collector output) throws Exception { if (deserializationSchema == null) { output.collect(new SeaTunnelRow(new Object[]{value})); } else { - deserializationSchema.deserialize(value.getBytes(), output); + if (redisParameters.getHashKeyParseMode() == RedisConfig.HashKeyParseMode.KV && + redisDataType == RedisDataType.HASH) { + // Treat each key-value pair in the hash-key as one piece of data + Map recordsMap = JsonUtils.toMap(value); + for (Map.Entry entry : recordsMap.entrySet()) { + String k = entry.getKey(); + String v = entry.getValue(); + Map valuesMap = JsonUtils.toMap(v); + SeaTunnelDataType seaTunnelRowType = deserializationSchema.getProducedType(); + valuesMap.put(((SeaTunnelRowType) seaTunnelRowType).getFieldName(0), k); + deserializationSchema.deserialize(JsonUtils.toJsonString(valuesMap).getBytes(), output); + } + } else { + deserializationSchema.deserialize(value.getBytes(), output); + } } } }