Skip to content

Commit

Permalink
Ensure users can provide metadata when invoking the streaming query
Browse files Browse the repository at this point in the history
The streaming query operation on the QueryGateway does not take into
account users can provide an implementation of Message to set
operation-specific metadata. Fix this by checking whether the given
query is a message.

#bug/wrapping-generic-streaming-query-message
  • Loading branch information
smcvb committed Aug 10, 2023
1 parent e944891 commit 3637f3f
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2010-2022. Axon Framework
* Copyright (c) 2010-2023. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -105,7 +105,7 @@ public <R, Q> CompletableFuture<R> query(@Nonnull String queryName, @Nonnull Q q

@Override
public <R, Q> Publisher<R> streamingQuery(String queryName, Q query, Class<R> responseType) {
return Mono.fromSupplier(() -> new GenericStreamingQueryMessage<>(query, queryName, responseType))
return Mono.fromSupplier(() -> new GenericStreamingQueryMessage<>(asMessage(query), queryName, responseType))
.flatMapMany(queryMessage -> queryBus.streamingQuery(processInterceptors(queryMessage)))
.map(Message::getPayload);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2010-2022. Axon Framework
* Copyright (c) 2010-2023. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -468,6 +468,21 @@ void streamingQueryPropagateErrors() {
.verify();
}

@Test
void dispatchStreamingQueryWithMetaData() {
when(mockBus.streamingQuery(any())).thenReturn(Flux.empty());

StreamingQueryMessage<String, String> testQuery =
new GenericStreamingQueryMessage<>("Query", String.class).andMetaData(MetaData.with("key", "value"));

StepVerifier.create(testSubject.streamingQuery(testQuery, String.class))
.verifyComplete();

verify(mockBus).streamingQuery(argThat(
streamingQuery -> "value".equals(streamingQuery.getMetaData().get("key"))
));
}

@SuppressWarnings({"unused", "SameParameterValue"})
private <Q, R> QueryMessage<Q, R> anyMessage(Class<Q> queryType, Class<R> responseType) {
return any();
Expand Down

0 comments on commit 3637f3f

Please sign in to comment.