diff --git a/src/main/java/io/lettuce/core/XReadArgs.java b/src/main/java/io/lettuce/core/XReadArgs.java index 13d0b4938d..c08338e2dd 100644 --- a/src/main/java/io/lettuce/core/XReadArgs.java +++ b/src/main/java/io/lettuce/core/XReadArgs.java @@ -1,11 +1,11 @@ package io.lettuce.core; -import java.time.Duration; - import io.lettuce.core.internal.LettuceAssert; import io.lettuce.core.protocol.CommandArgs; import io.lettuce.core.protocol.CommandKeyword; +import java.time.Duration; + /** * Argument list builder for the Redis XREAD and {@literal XREADGROUP} commands. * Static import the methods from {@link XReadArgs.Builder} and call the methods: {@code block(…)} . @@ -171,7 +171,7 @@ private StreamOffset(K name, String offset) { } /** - * Read all new arriving elements from the stream identified by {@code name}. + * Read all new arriving elements from the stream identified by {@code name} excluding any elements before this call * * @param name must not be {@code null}. * @return the {@link StreamOffset} object without a specific offset. @@ -183,6 +183,21 @@ public static StreamOffset latest(K name) { return new StreamOffset<>(name, "$"); } + /** + * Read all new arriving elements from the stream identified by {@code name} including the last element added before + * this call + * + * @param name must not be {@code null}. + * @return the {@link StreamOffset} object without a specific offset. + * @since 7.0 + */ + public static StreamOffset last(K name) { + + LettuceAssert.notNull(name, "Stream must not be null"); + + return new StreamOffset<>(name, "+"); + } + /** * Read all new arriving elements from the stream identified by {@code name} with ids greater than the last one consumed * by the consumer group. diff --git a/src/test/java/io/lettuce/core/commands/StreamCommandIntegrationTests.java b/src/test/java/io/lettuce/core/commands/StreamCommandIntegrationTests.java index 2216955d4d..0a4b233a9c 100644 --- a/src/test/java/io/lettuce/core/commands/StreamCommandIntegrationTests.java +++ b/src/test/java/io/lettuce/core/commands/StreamCommandIntegrationTests.java @@ -19,25 +19,6 @@ */ package io.lettuce.core.commands; -import static io.lettuce.core.protocol.CommandType.*; -import static org.assertj.core.api.Assertions.*; - -import java.time.Duration; -import java.time.Instant; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; - -import javax.inject.Inject; - -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.TestInstance; -import org.junit.jupiter.api.extension.ExtendWith; - import io.lettuce.core.*; import io.lettuce.core.XReadArgs.StreamOffset; import io.lettuce.core.api.sync.RedisCommands; @@ -49,6 +30,23 @@ import io.lettuce.core.protocol.CommandArgs; import io.lettuce.test.LettuceExtension; import io.lettuce.test.condition.EnabledOnCommand; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.extension.ExtendWith; + +import javax.inject.Inject; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import static io.lettuce.core.protocol.CommandType.XINFO; +import static org.assertj.core.api.Assertions.assertThat; /** * Integration tests for {@link io.lettuce.core.api.sync.RedisStreamCommands}. @@ -316,7 +314,7 @@ public void xreadTransactional() { redis.multi(); redis.xadd("stream-1", Collections.singletonMap("key3", "value3")); redis.xadd("stream-2", Collections.singletonMap("key4", "value4")); - redis.xread(StreamOffset.from("stream-1", initial1), XReadArgs.StreamOffset.from("stream-2", initial2)); + redis.xread(StreamOffset.from("stream-1", initial1), StreamOffset.from("stream-2", initial2)); TransactionResult exec = redis.exec(); @@ -337,6 +335,23 @@ public void xreadTransactional() { assertThat(secondMessage.getBody()).containsEntry("key4", "value4"); } + @Test + public void xreadLastVsLatest() { + redis.xadd("stream-1", Collections.singletonMap("key1", "value1")); + redis.xadd("stream-1", Collections.singletonMap("key2", "value2")); + + List> lastMessages = redis.xread(StreamOffset.last("stream-1")); + List> latestMessages = redis.xread(StreamOffset.latest("stream-1")); + + assertThat(lastMessages).hasSize(1); + StreamMessage lastMessage = lastMessages.get(0); + + assertThat(lastMessage.getStream()).isEqualTo("stream-1"); + assertThat(lastMessage.getBody()).hasSize(1).containsEntry("key2", "value2"); + + assertThat(latestMessages).isEmpty(); + } + @Test void xinfoStream() {