diff --git a/docs/en/connector-v2/sink/Redis.md b/docs/en/connector-v2/sink/Redis.md index 7d2ef237e1c..f91e6bc6ec0 100644 --- a/docs/en/connector-v2/sink/Redis.md +++ b/docs/en/connector-v2/sink/Redis.md @@ -20,6 +20,7 @@ Used to write data to Redis. | data_type | string | yes | - | | user | string | no | - | | auth | string | no | - | +| db_num | int | no | 0 | | mode | string | no | single | | nodes | list | yes when mode=cluster | - | | format | string | no | json | @@ -91,6 +92,10 @@ redis authentication user, you need it when you connect to an encrypted cluster Redis authentication password, you need it when you connect to an encrypted cluster +### db_num [int] + +Redis database index ID. It is connected to db 0 by default + ### mode [string] redis mode, `single` or `cluster`, default is `single` diff --git a/docs/en/connector-v2/source/Redis.md b/docs/en/connector-v2/source/Redis.md index fa4996b0e3a..3029f8061dd 100644 --- a/docs/en/connector-v2/source/Redis.md +++ b/docs/en/connector-v2/source/Redis.md @@ -25,6 +25,7 @@ Used to read data from Redis. | data_type | string | yes | - | | user | string | no | - | | auth | string | no | - | +| db_num | int | no | 0 | | mode | string | no | single | | hash_key_parse_mode | string | no | all | | nodes | list | yes when mode=cluster | - | @@ -151,6 +152,10 @@ redis authentication user, you need it when you connect to an encrypted cluster redis authentication password, you need it when you connect to an encrypted cluster +### db_num [int] + +Redis database index ID. It is connected to db 0 by default + ### mode [string] redis mode, `single` or `cluster`, default is `single` diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java index 511cbe4aa99..7b0c20cbeac 100644 --- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java +++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisConfig.java @@ -48,6 +48,13 @@ public enum HashKeyParseMode { .withDescription( "redis authentication password, you need it when you connect to an encrypted cluster"); + public static final Option DB_NUM = + Options.key("db_num") + .intType() + .defaultValue(0) + .withDescription( + "Redis database index id, it is connected to db 0 by default"); + public static final Option USER = Options.key("user") .stringType() diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java index 8954b4da2a1..b1922263cf3 100644 --- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java +++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/config/RedisParameters.java @@ -40,6 +40,7 @@ public class RedisParameters implements Serializable { private String host; private int port; private String auth = ""; + private int dbNum; private String user = ""; private String keysPattern; private String keyField; @@ -58,6 +59,10 @@ public void buildWithConfig(Config config) { if (config.hasPath(RedisConfig.AUTH.key())) { this.auth = config.getString(RedisConfig.AUTH.key()); } + // set db_num + if (config.hasPath(RedisConfig.DB_NUM.key())) { + this.dbNum = config.getInt(RedisConfig.DB_NUM.key()); + } // set user if (config.hasPath(RedisConfig.USER.key())) { this.user = config.getString(RedisConfig.USER.key()); @@ -115,6 +120,7 @@ public Jedis buildJedis() { if (StringUtils.isNotBlank(user)) { jedis.aclSetUser(user); } + jedis.select(dbNum); return jedis; case CLUSTER: HashSet nodes = new HashSet<>(); @@ -148,7 +154,9 @@ public Jedis buildJedis() { } else { jedisCluster = new JedisCluster(nodes); } - return new JedisWrapper(jedisCluster); + JedisWrapper jedisWrapper = new JedisWrapper(jedisCluster); + jedisWrapper.select(dbNum); + return jedisWrapper; default: // do nothing throw new RedisConnectorException( diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java index bd4a9063ba1..2a2feb7744f 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java @@ -98,6 +98,13 @@ private void initSourceData() { for (int i = 0; i < rows.size(); i++) { jedis.set("key_test" + i, new String(jsonSerializationSchema.serialize(rows.get(i)))); } + // db_1 init data + jedis.select(1); + for (int i = 0; i < rows.size(); i++) { + jedis.set("key_test" + i, new String(jsonSerializationSchema.serialize(rows.get(i)))); + } + // db_num backup + jedis.select(0); } private static Pair> generateTestDataSet() { @@ -203,4 +210,14 @@ public void testRedisWithExpire(TestContainer container) Thread.sleep(60 * 1000); Assertions.assertEquals(0, jedis.llen("key_list")); } + + @TestTemplate + public void restRedisDbNum(TestContainer container) throws IOException, InterruptedException { + Container.ExecResult execResult = container.executeJob("/redis-to-redis-by-db-num.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + jedis.select(2); + Assertions.assertEquals(100, jedis.llen("db_test")); + jedis.del("db_test"); + jedis.select(0); + } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-by-db-num.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-by-db-num.conf new file mode 100644 index 00000000000..a14bc2ab9ff --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis-by-db-num.conf @@ -0,0 +1,51 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + execution.parallelism = 1 + job.mode = "BATCH" + shade.identifier = "base64" + + #spark config + spark.app.name = "SeaTunnel" + spark.executor.instances = 2 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + Redis { + host = "redis-e2e" + port = 6379 + auth = "U2VhVHVubmVs" + keys = "key_test*" + data_type = key + db_num=1 + } +} + +sink { + Redis { + host = "redis-e2e" + port = 6379 + auth = "U2VhVHVubmVs" + key = "db_test" + data_type = list + db_num=2 + } +} \ No newline at end of file