From ec5471789c6163837515574185984c1ae6253336 Mon Sep 17 00:00:00 2001 From: Tihomir Mateev Date: Mon, 27 May 2024 19:07:59 +0300 Subject: [PATCH 1/2] Add last() utility method to the XArgs.StreamOffset --- src/main/java/io/lettuce/core/XReadArgs.java | 21 ++++++- .../core/SocketOptionsIntegrationTests.java | 40 +++++++++++--- .../StreamCommandIntegrationTests.java | 55 ++++++++++++------- 3 files changed, 84 insertions(+), 32 deletions(-) diff --git a/src/main/java/io/lettuce/core/XReadArgs.java b/src/main/java/io/lettuce/core/XReadArgs.java index 13d0b4938d..c08338e2dd 100644 --- a/src/main/java/io/lettuce/core/XReadArgs.java +++ b/src/main/java/io/lettuce/core/XReadArgs.java @@ -1,11 +1,11 @@ package io.lettuce.core; -import java.time.Duration; - import io.lettuce.core.internal.LettuceAssert; import io.lettuce.core.protocol.CommandArgs; import io.lettuce.core.protocol.CommandKeyword; +import java.time.Duration; + /** * Argument list builder for the Redis XREAD and {@literal XREADGROUP} commands. * Static import the methods from {@link XReadArgs.Builder} and call the methods: {@code block(…)} . @@ -171,7 +171,7 @@ private StreamOffset(K name, String offset) { } /** - * Read all new arriving elements from the stream identified by {@code name}. + * Read all new arriving elements from the stream identified by {@code name} excluding any elements before this call * * @param name must not be {@code null}. * @return the {@link StreamOffset} object without a specific offset. @@ -183,6 +183,21 @@ public static StreamOffset latest(K name) { return new StreamOffset<>(name, "$"); } + /** + * Read all new arriving elements from the stream identified by {@code name} including the last element added before + * this call + * + * @param name must not be {@code null}. + * @return the {@link StreamOffset} object without a specific offset. + * @since 7.0 + */ + public static StreamOffset last(K name) { + + LettuceAssert.notNull(name, "Stream must not be null"); + + return new StreamOffset<>(name, "+"); + } + /** * Read all new arriving elements from the stream identified by {@code name} with ids greater than the last one consumed * by the consumer group. diff --git a/src/test/java/io/lettuce/core/SocketOptionsIntegrationTests.java b/src/test/java/io/lettuce/core/SocketOptionsIntegrationTests.java index a7f7e2a548..eedf7e03da 100644 --- a/src/test/java/io/lettuce/core/SocketOptionsIntegrationTests.java +++ b/src/test/java/io/lettuce/core/SocketOptionsIntegrationTests.java @@ -1,18 +1,16 @@ package io.lettuce.core; -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.fail; +import io.lettuce.test.LettuceExtension; +import io.netty.channel.ConnectTimeoutException; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import javax.inject.Inject; import java.net.SocketException; import java.util.concurrent.TimeUnit; -import javax.inject.Inject; - -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; - -import io.lettuce.test.LettuceExtension; -import io.netty.channel.ConnectTimeoutException; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; /** * @author Mark Paluch @@ -51,4 +49,28 @@ void testConnectTimeout() { } } +// @Test +// void testConnectTimeout() { +// +// SocketOptions socketOptions = SocketOptions.builder().connectTimeout(100, TimeUnit.MILLISECONDS).build(); +// client.setOptions(ClientOptions.builder().socketOptions(socketOptions).build()); +// +// try { +// client.connect(RedisURI.create("2:4:5:5::1", 60000)); +// fail("Missing RedisConnectionException"); +// } catch (RedisConnectionException e) { +// +// if (e.getCause() instanceof ConnectTimeoutException) { +// assertThat(e).hasRootCauseInstanceOf(ConnectTimeoutException.class); +// assertThat(e.getCause()).hasMessageContaining("connection timed out"); +// return; +// } +// +// if (e.getCause() instanceof SocketException) { +// // Network is unreachable or No route to host are OK as well. +// return; +// } +// } +// } + } diff --git a/src/test/java/io/lettuce/core/commands/StreamCommandIntegrationTests.java b/src/test/java/io/lettuce/core/commands/StreamCommandIntegrationTests.java index 2216955d4d..0a4b233a9c 100644 --- a/src/test/java/io/lettuce/core/commands/StreamCommandIntegrationTests.java +++ b/src/test/java/io/lettuce/core/commands/StreamCommandIntegrationTests.java @@ -19,25 +19,6 @@ */ package io.lettuce.core.commands; -import static io.lettuce.core.protocol.CommandType.*; -import static org.assertj.core.api.Assertions.*; - -import java.time.Duration; -import java.time.Instant; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; - -import javax.inject.Inject; - -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.TestInstance; -import org.junit.jupiter.api.extension.ExtendWith; - import io.lettuce.core.*; import io.lettuce.core.XReadArgs.StreamOffset; import io.lettuce.core.api.sync.RedisCommands; @@ -49,6 +30,23 @@ import io.lettuce.core.protocol.CommandArgs; import io.lettuce.test.LettuceExtension; import io.lettuce.test.condition.EnabledOnCommand; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.extension.ExtendWith; + +import javax.inject.Inject; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import static io.lettuce.core.protocol.CommandType.XINFO; +import static org.assertj.core.api.Assertions.assertThat; /** * Integration tests for {@link io.lettuce.core.api.sync.RedisStreamCommands}. @@ -316,7 +314,7 @@ public void xreadTransactional() { redis.multi(); redis.xadd("stream-1", Collections.singletonMap("key3", "value3")); redis.xadd("stream-2", Collections.singletonMap("key4", "value4")); - redis.xread(StreamOffset.from("stream-1", initial1), XReadArgs.StreamOffset.from("stream-2", initial2)); + redis.xread(StreamOffset.from("stream-1", initial1), StreamOffset.from("stream-2", initial2)); TransactionResult exec = redis.exec(); @@ -337,6 +335,23 @@ public void xreadTransactional() { assertThat(secondMessage.getBody()).containsEntry("key4", "value4"); } + @Test + public void xreadLastVsLatest() { + redis.xadd("stream-1", Collections.singletonMap("key1", "value1")); + redis.xadd("stream-1", Collections.singletonMap("key2", "value2")); + + List> lastMessages = redis.xread(StreamOffset.last("stream-1")); + List> latestMessages = redis.xread(StreamOffset.latest("stream-1")); + + assertThat(lastMessages).hasSize(1); + StreamMessage lastMessage = lastMessages.get(0); + + assertThat(lastMessage.getStream()).isEqualTo("stream-1"); + assertThat(lastMessage.getBody()).hasSize(1).containsEntry("key2", "value2"); + + assertThat(latestMessages).isEmpty(); + } + @Test void xinfoStream() { From 1e21d42de13da81aa50cac11affc352fe6a3e99e Mon Sep 17 00:00:00 2001 From: Tihomir Mateev Date: Mon, 27 May 2024 19:11:03 +0300 Subject: [PATCH 2/2] Submitted one more file by mistake --- .../core/SocketOptionsIntegrationTests.java | 40 +++++-------------- 1 file changed, 9 insertions(+), 31 deletions(-) diff --git a/src/test/java/io/lettuce/core/SocketOptionsIntegrationTests.java b/src/test/java/io/lettuce/core/SocketOptionsIntegrationTests.java index eedf7e03da..a7f7e2a548 100644 --- a/src/test/java/io/lettuce/core/SocketOptionsIntegrationTests.java +++ b/src/test/java/io/lettuce/core/SocketOptionsIntegrationTests.java @@ -1,16 +1,18 @@ package io.lettuce.core; -import io.lettuce.test.LettuceExtension; -import io.netty.channel.ConnectTimeoutException; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; -import javax.inject.Inject; import java.net.SocketException; import java.util.concurrent.TimeUnit; -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.fail; +import javax.inject.Inject; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +import io.lettuce.test.LettuceExtension; +import io.netty.channel.ConnectTimeoutException; /** * @author Mark Paluch @@ -49,28 +51,4 @@ void testConnectTimeout() { } } -// @Test -// void testConnectTimeout() { -// -// SocketOptions socketOptions = SocketOptions.builder().connectTimeout(100, TimeUnit.MILLISECONDS).build(); -// client.setOptions(ClientOptions.builder().socketOptions(socketOptions).build()); -// -// try { -// client.connect(RedisURI.create("2:4:5:5::1", 60000)); -// fail("Missing RedisConnectionException"); -// } catch (RedisConnectionException e) { -// -// if (e.getCause() instanceof ConnectTimeoutException) { -// assertThat(e).hasRootCauseInstanceOf(ConnectTimeoutException.class); -// assertThat(e.getCause()).hasMessageContaining("connection timed out"); -// return; -// } -// -// if (e.getCause() instanceof SocketException) { -// // Network is unreachable or No route to host are OK as well. -// return; -// } -// } -// } - }