How to get List<String> in RedisPubSubListener.message? #3050
-
Good afternoon, I'm trying to implement RESP2-based client-side caching support using lettuce, and currently have this code in place to test the cache invalidation events: final var client = RedisClient.create(uriBuilder.build());
client.setOptions(ClientOptions.builder().protocolVersion(ProtocolVersion.RESP2).build());
final var pubSub = client.connectPubSub();
final var tracking = TrackingArgs.Builder.enabled().redirect(pubSub.sync().clientId()).bcast();
final var data = client.connect();
data.sync().clientTracking(tracking);
pubSub.addListener(new RedisPubSubAdapter<>() {
@Override
public void message(String channel, String message) {
System.out.printf("Channel: %s, Message: %s%n", channel, message);
}
});
pubSub.sync().subscribe("__redis__:invalidate"); Everything is fine for commands that modify single keys, e.g.
Could you please advise how to get the list of all the affected keys in the listener? |
Beta Was this translation helpful? Give feedback.
Replies: 3 comments 1 reply
-
Hey @v-chernyshev , Thanks for logging this and thanks for providing a reproducer to test it. Please allow the team some time to analyse the problem and we will get back to you ASAP |
Beta Was this translation helpful? Give feedback.
-
(partial update, will update this message with the complete investigation) Some background:
I've tested your scenario both using the custom implementation and the driver built-in implementation with various results:
I hope the third option could serve as a workaround, but you probably explicitly used RESP2 for a reason. RESP2 + custom implementationPartial investigation shows that there seems to be a problem in the way the PubSubOutput works for RESP2.
... it would assume the third part of the reply, the reply message contents, is not an array, and would overwrite the first value with the second value, thus resulting in only one message being sent to the listener and one of the updates being lost. RESP3 + custom implementation final var client = RedisClient.create(local_standalone);
client.setOptions(ClientOptions.builder().protocolVersion(ProtocolVersion.RESP3).build());
final var pubSub = client.connectPubSub();
final var tracking = TrackingArgs.Builder.enabled().redirect(pubSub.sync().clientId()).bcast();
final var data = client.connect();
// prepare our connection and another party
final var otherParty = client.connect().sync();
final String[] currentMessage = { null };
data.sync().clientTracking(tracking);
pubSub.addListener(
message -> {
if (message.getType().equals("invalidate")) {
System.out.printf("Channel: %s, Message: %s%n", message.getType(), message.getContent().toString());
currentMessage[0] = message.getContent().toString();
}
}
);
pubSub.sync().subscribe("__redis__:invalidate");
Map<String, String> mset = new HashMap<>();
mset.put("foo", "bar");
mset.put("baz", "qux");
otherParty.mset(mset);
do {
Thread.sleep(1000);
} while (currentMessage[0] == null || !currentMessage[0].equalsIgnoreCase("baz")); RESP2 + built-in implementationFailing, investigating why RESP3 + built-in implementation // the client-side cache
Map<String, String> clientCache = new ConcurrentHashMap<>();
final var redisClient = RedisClient.create(redisURI);
redisClient.setOptions(ClientOptions.builder().protocolVersion(ProtocolVersion.RESP3).build());
// main connection using client-side cache
StatefulRedisConnection<String, String> thisParty = redisClient.connect();
// prepare our connection and another party (to execute change)
StatefulRedisConnection<String, String> otherParty = redisClient.connect();
RedisCommands<String, String> commands = otherParty.sync();
// Create cache-frontend through which we're going to access the cache
CacheFrontend<String, String> frontend = ClientSideCaching.enable(CacheAccessor.forMap(clientCache), thisParty,
TrackingArgs.Builder.enabled());
// make sure value exists in Redis
// client-side cache is empty
commands.set("foo", "1");
commands.set("baz", "2");
// Read-through into Redis using cached frontend
String cachedValueFoo = frontend.get("foo");
System.out.println("Cached value for foo: " + cachedValueFoo);
String cachedValueBaz = frontend.get("baz");
System.out.println("Cached value for baz: " + cachedValueBaz);
// MSET foo bar baz qux
Map<String, String> mset = new HashMap<>();
mset.put("foo", "bar");
mset.put("baz", "qux");
commands.mset(mset);
// a while later
Thread.sleep(1000);
// the expiration reflects in the client-side cache
System.out.println("---------------------------");
System.out.println(clientCache.isEmpty() ? "There are no cached values!" : "There are some cached values!" );
System.out.println(clientCache.values()); |
Beta Was this translation helpful? Give feedback.
-
Thank you for your investigation! I can confirm that the RESP3-based implementation works as expected, as long as the listener is set for push messages rather than pub/sub events. This should be enough for my current task. Admittedly, I’m not sure what to do with the pub/sub. It’s not clear whether a list may ever be received in response to the normal PUBLISH command as it only allows sending one value at a time. |
Beta Was this translation helpful? Give feedback.
(partial update, will update this message with the complete investigation)
Some background:
I've tested your scenario both using the custom implementation and the driver built-in implementation with various results: