Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set message metadata via Reactor context #24

Merged
merged 82 commits into from
Apr 2, 2021
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
82 commits
Select commit Hold shift + click to select a range
e86b592
Change version to minor release version
smcvb Sep 14, 2020
1c2d45d
Token update try 3
smcvb Sep 14, 2020
22dbec9
Revert token to original
smcvb Sep 14, 2020
797fc8a
Update secure token
smcvb Sep 15, 2020
7cd953a
Run Sonar only with JDK-11 and try different way to pass token.
bert-laverman Sep 17, 2020
cac9cb9
Just the new token then, and camelcased name of the org
bert-laverman Sep 17, 2020
4a74b7d
Regenerated project key
bert-laverman Sep 17, 2020
77a5d5f
Copy build script from working project
bert-laverman Sep 17, 2020
2c46c8a
And now with a fixed copy-paste bug
bert-laverman Sep 17, 2020
77348f9
Remove mention of user group
smcvb Sep 24, 2020
59cba91
Added dependabot to the project
lfgcampos Oct 9, 2020
8477936
Add Maven central and SonarCloud badges
sandjelkovic Oct 12, 2020
72f7323
Merge pull request #10 from AxonFramework/add-dependabot
lfgcampos Oct 13, 2020
fd2b1de
Bump maven-javadoc-plugin from 2.10.4 to 3.2.0
dependabot[bot] Oct 13, 2020
d8555c0
Bump axon-messaging from 4.4.2 to 4.4.3
dependabot[bot] Oct 13, 2020
3a3c7f8
Bump maven-source-plugin from 3.0.1 to 3.2.1
dependabot[bot] Oct 13, 2020
b2765ad
Bump junit.jupiter.version from 5.5.2 to 5.7.0
dependabot[bot] Oct 13, 2020
467606a
Bump spring.version from 5.2.5.RELEASE to 5.2.9.RELEASE
dependabot[bot] Oct 13, 2020
15898fa
Merge pull request #17 from AxonFramework/dependabot/maven/spring.ver…
schananas Oct 14, 2020
adf1f1f
Merge pull request #16 from AxonFramework/dependabot/maven/org.apache…
schananas Oct 14, 2020
21e8a3d
Merge pull request #15 from AxonFramework/dependabot/maven/junit.jupi…
schananas Oct 14, 2020
cebd060
Merge pull request #14 from AxonFramework/dependabot/maven/org.axonfr…
schananas Oct 14, 2020
a09711a
Merge pull request #13 from AxonFramework/dependabot/maven/org.apache…
schananas Oct 14, 2020
e8e54e0
Bump maven-surefire-plugin from 2.19.1 to 2.22.2
dependabot[bot] Oct 15, 2020
483b8b1
Bump mockito.version from 3.1.0 to 3.5.13
dependabot[bot] Oct 15, 2020
41b3329
Bump maven-resources-plugin from 3.0.2 to 3.2.0
dependabot[bot] Oct 15, 2020
b2ab88f
Bump projectreactor.version from 3.3.6.RELEASE to 3.3.10.RELEASE
dependabot[bot] Oct 15, 2020
e066575
Set message metadata when MetaData is attached to Reactor Context
schananas Oct 15, 2020
bb8a42f
Add issue templates and close-label config
smcvb Oct 22, 2020
971124e
Add milestone
smcvb Oct 22, 2020
9e0cb4f
Merge pull request #12 from AxonFramework/add-readme-badges
sandjelkovic Oct 24, 2020
f796c9b
Add label in progress
smcvb Oct 26, 2020
0e2b5b7
Added automatic release notes on milestone closure
lfgcampos Nov 6, 2020
a58f69a
Added label mapping to release notes
lfgcampos Nov 6, 2020
e6ef424
Merge pull request #25 from AxonFramework/automatic-release-notes
smcvb Nov 6, 2020
3ac1d1f
Add new line
smcvb Nov 6, 2020
be30d21
Replace mailing list link with discuss.axoniq link
azzazzel Nov 25, 2020
02ddabf
Merge pull request #27 from azzazzel/patch-1
smcvb Nov 26, 2020
c737b60
Introduce Build and Test GHA
smcvb Dec 18, 2020
ea5fb6c
Fix if statement
smcvb Dec 18, 2020
14fed8a
Rename badge
smcvb Dec 18, 2020
2fffbb8
Remove empty line
smcvb Dec 18, 2020
b106bce
Update GHA
smcvb Jan 20, 2021
bf19abc
Add maven wrapper
smcvb Jan 20, 2021
b074db0
Fix javadoc
smcvb Jan 20, 2021
dc4c2b1
Merge pull request #23 from AxonFramework/dependabot/maven/projectrea…
smcvb Feb 3, 2021
bef7667
Merge pull request #22 from AxonFramework/dependabot/maven/org.apache…
smcvb Feb 3, 2021
d3d958b
Bump maven-assembly-plugin from 2.6 to 3.3.0
dependabot[bot] Feb 3, 2021
e963354
Bump jacoco-maven-plugin from 0.8.5 to 0.8.6
dependabot[bot] Feb 3, 2021
cb19a27
Merge pull request #20 from AxonFramework/dependabot/maven/mockito.ve…
smcvb Feb 3, 2021
30ece11
Merge pull request #19 from AxonFramework/dependabot/maven/org.apache…
smcvb Feb 3, 2021
7a87272
Bump maven-compiler-plugin from 3.5.1 to 3.8.1
dependabot[bot] Feb 3, 2021
bc471ee
Bump axon-messaging from 4.4.3 to 4.4.6
dependabot[bot] Feb 3, 2021
12c8f81
Merge pull request #31 from AxonFramework/dependabot/maven/org.jacoco…
smcvb Feb 3, 2021
d3ec298
Merge pull request #29 from AxonFramework/dependabot/maven/org.apache…
smcvb Feb 3, 2021
b9587ab
Merge pull request #32 from AxonFramework/dependabot/maven/org.apache…
smcvb Feb 3, 2021
5a42d85
Merge pull request #33 from AxonFramework/dependabot/maven/org.axonfr…
smcvb Feb 3, 2021
e097250
Bump spring.boot.version from 2.2.6.RELEASE to 2.4.2
dependabot[bot] Feb 3, 2021
c5a38c9
Merge pull request #30 from AxonFramework/dependabot/maven/spring.boo…
smcvb Feb 3, 2021
2cd9936
Merge branch 'axon-reactor-4.4.x'
smcvb Feb 3, 2021
289e455
Bump spring-boot-starters from 2.2.6.RELEASE to 2.2.13.RELEASE
dependabot[bot] Feb 4, 2021
21789ce
Bump mockito.version from 3.5.13 to 3.7.7
dependabot[bot] Feb 4, 2021
e8ff8a1
Bump spring.version from 5.2.9.RELEASE to 5.3.3
dependabot[bot] Feb 4, 2021
79f0432
Bump maven-jar-plugin from 3.0.2 to 3.2.0
dependabot[bot] Feb 4, 2021
d16e9cd
Bump projectreactor.version from 3.3.10.RELEASE to 3.4.2
dependabot[bot] Feb 4, 2021
99d6714
Merge pull request #35 from AxonFramework/dependabot/maven/spring.ver…
smcvb Feb 4, 2021
1acd3af
Merge pull request #38 from AxonFramework/dependabot/maven/org.apache…
smcvb Feb 4, 2021
f97a430
Merge pull request #37 from AxonFramework/dependabot/maven/projectrea…
smcvb Feb 4, 2021
57ae627
Merge pull request #36 from AxonFramework/dependabot/maven/mockito.ve…
smcvb Feb 4, 2021
aafe53b
Merge pull request #34 from AxonFramework/dependabot/maven/org.spring…
smcvb Feb 4, 2021
7d55c25
Bump junit.jupiter.version from 5.7.0 to 5.7.1
dependabot[bot] Feb 5, 2021
3d5f7a8
Merge pull request #39 from AxonFramework/dependabot/maven/junit.jupi…
smcvb Feb 5, 2021
20d3ef9
Fix junit5 dependencies in order to run every test of the project.
lfgcampos Feb 9, 2021
8945bad
Bump axon-messaging from 4.4.6 to 4.4.7
dependabot[bot] Feb 11, 2021
260cfb3
Merge pull request #40 from AxonFramework/dependabot/maven/org.axonfr…
smcvb Feb 11, 2021
33b4d92
Update contribution description
smcvb Feb 12, 2021
d2a5c92
Bump projectreactor.version from 3.4.2 to 3.4.3
dependabot[bot] Feb 16, 2021
f0ef705
Merge pull request #41 from AxonFramework/dependabot/maven/projectrea…
schananas Feb 23, 2021
a132c64
Bump projectreactor.version from 3.4.3 to 3.4.4
dependabot[bot] Mar 16, 2021
4b475ce
Merge pull request #45 from AxonFramework/dependabot/maven/projectrea…
smcvb Mar 16, 2021
5e7d190
Merge branch 'master' into feature/set-metadata-via-context
schananas Apr 2, 2021
1c43a3d
Update from deprecated API
schananas Apr 2, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
version: 2
updates:
- package-ecosystem: maven
directory: "/"
schedule:
interval: daily
open-pull-requests-limit: 5
# Specify labels for pull requests
labels:
- "Type: Dependency Upgrade"
- "Priority 1: Must"
# Add reviewers
reviewers:
- "m1l4n54v1c"
- "saratry"
- "stefanvozd"
37 changes: 23 additions & 14 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,32 +1,41 @@
# We want to run this on Ubuntu 18.04 LTS
dist: bionic

language: java

sudo: required

services:
- docker

jobs:
include:
- name: "JDK 8"
jdk: openjdk8
env: MVN_BUILD_SCRIPT="clean verify"
- name: "JDK 11 with Sonar analysis"
jdk: openjdk11
env: MVN_BUILD_SCRIPT="-Pcoverage clean verify sonar:sonar -Dsonar.projectKey=AxonFramework_extension-reactor"
- docker

#jobs:
# include:
# - name: JDK 8
# jdk: openjdk8
# env: MVN_BUILD_SCRIPT="clean verify"
# - name: JDK 11 with Sonar analysis
# jdk: openjdk11
# env: MVN_BUILD_SCRIPT="-Pcoverage clean verify sonar:sonar -Dsonar.projectKey=AxonFramework_extension-reactor"
#install: true
#cache:
# directories:
# - "$HOME/.m2"
#script:
# - mvn -version
# - mvn -B -U $MVN_BUILD_SCRIPT
jdk:
- openjdk8

install: true

cache:
directories:
- $HOME/.m2

# We switch JDK by setting the JAVA_HOME environment variable. All JDKs are under the same base directory.
script:
- mvn -version
- mvn -B -U $MVN_BUILD_SCRIPT
- mvn -B clean verify
- JAVA_HOME=$(dirname ${JAVA_HOME})/openjdk11 mvn -version
- JAVA_HOME=$(dirname ${JAVA_HOME})/openjdk11 mvn -B -Pcoverage clean verify sonar:sonar -Dsonar.projectKey=AxonFramework_extension-reactor

notifications:
slack:
Expand All @@ -36,4 +45,4 @@ addons:
sonarcloud:
organization: "axonframework"
token:
secure: "htYEusSkwB7bvgURS8FU8CBFmIlJcfSVhA6dZQ5uF3m3KYltT8D6zC4/O9aRw/2dId0lPMC5T6XiRLkmJsa3hLJhsbnrOW+8UXyzWq6SZfuVngRAXKeo5sqrv+7XXyIir5kbq0WSgs4UBJfxUIIDynfDupE9Y4MfZAQRA0FrupkEwwV4sp+QQKIymYFO84iPgwvlHpcn+KDMI0LNtLa6+kKJuuL5u4HsKHA4tTWBDP5aSBZL2dRNt1YM8+poO3pXH+fqTW21X7qq2GfoDfyLdfWtodT9TfREBQWgJcX6dXc3yLruPMnY7wzvNZUVF7bDDjIg9rcfARLLyBDutDGtiovqilKjes5JbsjEweRw9CpLq86UqIN1Sn1dHf40ieuvuU6hDyOCRAYwbP2/DZtN5eFyVJgUKY4NpcAewOODI3KMVTrhEeVjET8uvFsKT1Phsph3Ii/qhamED5+Yqvu+0974hQq0C3jXtDaM9xKC9ViaK1Bw1Q8P6EwGUvTEDFzsEPsSpg03G0EXQoOlikk5AbgjIEhDzeD9/UQvXv8qL5DM9N9nD/fH8ELa0ma5GHzMwK9WrkjjsYmeppjyT/wJHvb0MHAZxkJvq4izHvAZmvkusBEAOGpZHGRhDdGSR59zDMT1B7d8SLLk8OH9bFVEYh7rtFeNzwl6mzaGPFO7aWc="
secure: "Byhy5L5cGZ7dlymrXg6/5SRSmCSbBLq2DbmBDHnWKTq2TfbhCgNvkrjI7R4i4xwkvIcNEuFblULUzGSz7+lGcTxpNsMt9MZKxnH8Hj+PAWNOLdHQGty9+EXyQCF9q4x+td/Sqp/EuCFBYvDyVCVCIVbyMKe6bD9J6hfC8QWpK1Q3eDOVGI9N9AtawkDVxoRYH2Br4tkxrSAQcNA8w53oqycNPotDeIQS0K/780AULXzb7i8fYO/Uz4x+pnh348O35009WTS2eUU8yjoZ4UJmBJ1ocN3Xtj7NyllPPr9e5QZfPi+xAz2YecWLSrwpyPiS4nPmzAyPcYtohYc/HiSntZvP9mZYmCVPb4GbZAIPrgXSjtPX2RMd9FVwLH2PJ2VX4uxQ2B9maCuETj7zV8CW6zCUNJJMRtpHzIvNY/Nj+9H8n0EKepMKFxU0uWq34JJcWTvLX/fITGs6KljNivMq/w3Hue/akW2b9XoqLzfryTUYIFShZWp/rhQdset67ReBvtLFE+lTH+XoNkvd+psV37j51nmnnFUNEfITpvIi+3zStytCK213coyNU2cKCaa93Jfvgt90OJBNBf9M1RpP+oyQ++kSWkUFY8u/QMW1walR8sFlKmkw6NRaupgEc/QpbkGzSdcT5laiDrhxisNsaRCTuF42QmlT9nQhP0ZG+EQ="
7 changes: 4 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,10 @@ There are a couple of things to consider when you're traversing anything Axon:
as the majority of possible scenarios you might encounter when using Axon should be covered there.
* If the Reference Guide does not cover a specific topic you would've expected,
we'd appreciate if you could file an [issue](https://github.com/AxonIQ/reference-guide/issues) about it for us.
* There is a a [public mailing list](https://groups.google.com/forum/#!forum/axonframework) to support you in the case
the reference guide did not sufficiently answer your question.
* Next to the mailing list we also monitor Stack Overflow for any questions which are tagged with `axon`.
* There is a [forum](https://discuss.axoniq.io/) to support you in the case the reference guide did not sufficiently answer your question.
Axon Framework and Server developers will help out on a best effort basis.
Know that any support from contributors on posted question is very much appreciated on the forum.
* Next to the forum we also monitor Stack Overflow for any questions which are tagged with `axon`.

## Feature requests and issue reporting

Expand Down
2 changes: 1 addition & 1 deletion axon-reactor-spring-boot-autoconfigure/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<parent>
<artifactId>axon-reactor-parent</artifactId>
<groupId>org.axonframework.extensions.reactor</groupId>
<version>4.4.3-SNAPSHOT</version>
<version>4.5-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
4 changes: 2 additions & 2 deletions axon-reactor-spring-boot-starter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

<groupId>org.axonframework.extensions.reactor</groupId>
<artifactId>axon-reactor-spring-boot-starter</artifactId>
<version>4.4.3-SNAPSHOT</version>
<version>4.5-SNAPSHOT</version>

<name>Axon Framework Reactor Extension - Spring Boot Starter</name>
<description>
Expand Down Expand Up @@ -125,7 +125,7 @@
</plugin>
<plugin>
<artifactId>maven-javadoc-plugin</artifactId>
<version>2.10.4</version>
<version>3.2.0</version>
<executions>
<execution>
<id>attach-javadoc</id>
Expand Down
2 changes: 1 addition & 1 deletion axon-reactor/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<parent>
<artifactId>axon-reactor-parent</artifactId>
<groupId>org.axonframework.extensions.reactor</groupId>
<version>4.4.3-SNAPSHOT</version>
<version>4.5-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
package org.axonframework.extensions.reactor.commandhandling.gateway;

import org.axonframework.commandhandling.CommandBus;
import org.axonframework.commandhandling.CommandCallback;
import org.axonframework.commandhandling.CommandMessage;
import org.axonframework.commandhandling.CommandResultMessage;
import org.axonframework.commandhandling.GenericCommandMessage;
import org.axonframework.commandhandling.*;
import org.axonframework.commandhandling.gateway.RetryScheduler;
import org.axonframework.commandhandling.gateway.RetryingCallback;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.Registration;
import org.axonframework.extensions.reactor.messaging.ReactorMessageDispatchInterceptor;
import org.axonframework.extensions.reactor.messaging.ReactorResultHandlerInterceptor;
import org.axonframework.extensions.reactor.commandhandling.callbacks.ReactorCallback;
import org.axonframework.messaging.MetaData;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
Expand Down Expand Up @@ -72,13 +69,27 @@ public static Builder builder() {
@Override
public <R> Mono<R> send(Object command) {
//noinspection unchecked
return Mono.<CommandMessage<?>>just(GenericCommandMessage.asCommandMessage(command))
return createCommandMessage(command)
.transform(this::processCommandInterceptors)
.flatMap(this::dispatchCommand)
.flatMap(this::processResultsInterceptors)
.transform(this::getPayload);
}

private Mono<CommandMessage<?>> createCommandMessage(Object command) {
return Mono.just(command)
.zipWith(metaDataFromContext())
.map(commandAndMeta -> GenericCommandMessage.asCommandMessage(commandAndMeta.getT1())
.andMetaData(commandAndMeta.getT2()));
}

private Mono<MetaData> metaDataFromContext() {
return Mono.subscriberContext()
.handle((ctx,sink) -> sink.next(Objects.requireNonNull(
ctx.getOrDefault(MetaData.class, MetaData.emptyInstance())
)));
}

@Override
public Registration registerDispatchInterceptor(ReactorMessageDispatchInterceptor<CommandMessage<?>> interceptor) {
dispatchInterceptors.add(interceptor);
Expand All @@ -100,19 +111,23 @@ private Mono<CommandMessage<?>> processCommandInterceptors(Mono<CommandMessage<?

private <C, R> Mono<Tuple2<CommandMessage<C>, Flux<CommandResultMessage<? extends R>>>> dispatchCommand(
CommandMessage<C> commandMessage) {
ReactorCallback<C, R> reactorCallback = new ReactorCallback<>();
CommandCallback<C, R> callback = reactorCallback;
if (retryScheduler != null) {
callback = new RetryingCallback<>(callback, retryScheduler, commandBus);
}
commandBus.dispatch(commandMessage, callback);
return Mono.just(commandMessage).zipWith(Mono.just(Flux.from(reactorCallback)));
return Mono.defer(()-> {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move this change in another PR.

ReactorCallback<C, R> reactorCallback = new ReactorCallback<>();
CommandCallback<C, R> callback = reactorCallback;
if (retryScheduler != null) {
callback = new RetryingCallback<>(callback, retryScheduler, commandBus);
}
commandBus.dispatch(commandMessage, callback);
return Mono.just(commandMessage).zipWith(Mono.just(Flux.from(reactorCallback)));
});
}

private <C> Mono<? extends CommandResultMessage<?>> processResultsInterceptors(
Tuple2<CommandMessage<C>, Flux<CommandResultMessage<?>>> commandWithResults) {
CommandMessage<?> commandMessage = commandWithResults.getT1();
Flux<CommandResultMessage<?>> commandResultMessages = commandWithResults.getT2();
Flux<CommandResultMessage<?>> commandResultMessages = commandWithResults.getT2()
.flatMapSequential(this::mapExceptionalResult);

return Flux.fromIterable(resultInterceptors)
.reduce(commandResultMessages,
(result, interceptor) -> interceptor.intercept(commandMessage, result))
Expand All @@ -126,6 +141,10 @@ private <R> Mono<R> getPayload(Mono<? extends CommandResultMessage<?>> commandRe
.map(it -> (R) it.getPayload());
}

private Mono<? extends CommandResultMessage<?>> mapExceptionalResult(CommandResultMessage<?> response) {
return response.isExceptional() ? Mono.error(response.exceptionResult()) : Mono.just(response);
}

/**
* Builder class to instantiate {@link DefaultReactorCommandGateway}.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,5 @@ default Flux<Object> sendAll(Publisher<?> commands) {
return Flux.from(commands)
.concatMap(this::send);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.GenericEventMessage;
import org.axonframework.extensions.reactor.messaging.ReactorMessageDispatchInterceptor;
import org.axonframework.messaging.MetaData;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.List;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;

import static java.util.Arrays.asList;
Expand Down Expand Up @@ -57,11 +59,25 @@ public static Builder builder() {
@Override
public Flux<EventMessage<?>> publish(List<?> events) {
return Flux.fromIterable(events)
.map(event -> Mono.<EventMessage<?>>just(GenericEventMessage.asEventMessage(event)))
.map(this::createEventMessage)
.flatMap(this::processEventInterceptors)
.flatMap(this::publishEvent);
}

private Mono<EventMessage<?>> createEventMessage(Object event) {
return Mono.just(event)
.zipWith(metaDataFromContext())
.map(eventAndMeta -> GenericEventMessage.asEventMessage(eventAndMeta.getT1())
.andMetaData(eventAndMeta.getT2()));
}

private Mono<MetaData> metaDataFromContext() {
return Mono.subscriberContext()
.handle((ctx,sink) -> sink.next(Objects.requireNonNull(
ctx.getOrDefault(MetaData.class, MetaData.emptyInstance())
)));
}

@Override
public Registration registerDispatchInterceptor(ReactorMessageDispatchInterceptor<EventMessage<?>> interceptor) {
dispatchInterceptors.add(interceptor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,10 @@
import org.axonframework.common.Registration;
import org.axonframework.extensions.reactor.messaging.ReactorMessageDispatchInterceptor;
import org.axonframework.extensions.reactor.messaging.ReactorResultHandlerInterceptor;
import org.axonframework.messaging.MetaData;
import org.axonframework.messaging.ResultMessage;
import org.axonframework.messaging.responsetypes.ResponseType;
import org.axonframework.queryhandling.DefaultSubscriptionQueryResult;
import org.axonframework.queryhandling.GenericQueryMessage;
import org.axonframework.queryhandling.GenericSubscriptionQueryMessage;
import org.axonframework.queryhandling.QueryBus;
import org.axonframework.queryhandling.QueryMessage;
import org.axonframework.queryhandling.QueryResponseMessage;
import org.axonframework.queryhandling.SubscriptionQueryBackpressure;
import org.axonframework.queryhandling.SubscriptionQueryMessage;
import org.axonframework.queryhandling.SubscriptionQueryResult;
import org.axonframework.queryhandling.SubscriptionQueryUpdateMessage;
import org.axonframework.queryhandling.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
Expand Down Expand Up @@ -91,16 +83,29 @@ public Registration registerResultHandlerInterceptor(

@Override
public <R, Q> Mono<R> query(String queryName, Q query, ResponseType<R> responseType) {
return Mono.<QueryMessage<?, ?>>fromCallable(() -> new GenericQueryMessage<>(asMessage(query),
queryName,
responseType))
return createQueryMessage(queryName, query, responseType)
.transform(this::processDispatchInterceptors)
.flatMap(this::dispatchQuery)
.flatMapMany(this::processResultsInterceptors)
.<R>transform(this::getPayload)
.next();
}


public <R, Q> Mono<QueryMessage<?, ?>> createQueryMessage(String queryName, Q query, ResponseType<R> responseType) {
return Mono.fromCallable(() -> new GenericQueryMessage<>(asMessage(query), queryName, responseType))
.zipWith(metaDataFromContext())
.map(queryAndMeta -> queryAndMeta.getT1().andMetaData(queryAndMeta.getT2()));
}


private Mono<MetaData> metaDataFromContext() {
return Mono.subscriberContext()
.handle((ctx,sink) -> sink.next(Objects.requireNonNull(
ctx.getOrDefault(MetaData.class, MetaData.emptyInstance())
)));
}

@Override
public <R, Q> Flux<R> scatterGather(String queryName, Q query, ResponseType<R> responseType, Duration timeout) {
return Mono.<QueryMessage<?, ?>>fromCallable(() -> new GenericQueryMessage<>(asMessage(query),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@
import org.axonframework.commandhandling.gateway.RetryScheduler;
import org.axonframework.common.Registration;
import org.axonframework.messaging.MessageHandler;
import org.axonframework.messaging.MetaData;
import org.axonframework.queryhandling.QueryMessage;
import org.junit.jupiter.api.*;
import org.mockito.ArgumentCaptor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Hooks;
import reactor.core.publisher.Mono;
Expand All @@ -23,6 +26,7 @@

import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.*;
import static reactor.util.context.Context.of;

/**
* Tests for {@link DefaultReactorCommandGateway} all together with Command Bus.
Expand Down Expand Up @@ -100,7 +104,7 @@ void testSendContext() throws Exception {
Mono<String> result = reactiveCommandGateway.send("command");
verifyNoMoreInteractions(commandMessageHandler);

Context context = Context.of("k1", "v1");
Context context = of("k1", "v1");

StepVerifier.create(result.subscriberContext(context))
.expectNext("handled")
Expand All @@ -112,6 +116,33 @@ void testSendContext() throws Exception {
verifyNoMoreInteractions(mockRetryScheduler);
}

@Test
void testQuerySetMetaDataViaContext() throws Exception {
Mono<String> result = reactiveCommandGateway.send("command");
verifyNoMoreInteractions(commandMessageHandler);

Context context = of(MetaData.class, MetaData.with("k","v"));

StepVerifier.create(result.subscriberContext(context))
.expectNext("handled")
.expectAccessibleContext()
.containsOnly(context)
.then()
.verifyComplete();

ArgumentCaptor<CommandMessage> commandMessageCaptor = ArgumentCaptor.forClass(CommandMessage.class);


verify(commandBus).dispatch(commandMessageCaptor.capture(),any());
CommandMessage commandMessage = commandMessageCaptor.getValue();

assertTrue(commandMessage.getMetaData().containsKey("k"));
assertTrue(commandMessage.getMetaData().containsValue("v"));

verify(commandMessageHandler).handle(any());
verifyNoMoreInteractions(mockRetryScheduler);
}

@Test
void testSendVoidHandler() throws Exception {
Mono<String> result = reactiveCommandGateway.send(1L);
Expand Down Expand Up @@ -186,7 +217,7 @@ void testSendWithDispatchInterceptor() {

@Test
void testSendWithDispatchInterceptorWithContext() {
Context context = Context.of("security", true);
Context context = of("security", true);

reactiveCommandGateway
.registerDispatchInterceptor(cmdMono -> cmdMono
Expand All @@ -207,7 +238,7 @@ void testSendWithDispatchInterceptorWithContext() {

@Test
void testSendWithDispatchInterceptorWithContextFiltered() {
Context context = Context.of("security", false);
Context context = of("security", false);

reactiveCommandGateway
.registerDispatchInterceptor(cmdMono -> cmdMono
Expand Down
Loading