You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
There's a bit to unpack here, so apologies if this is actually multiple features in one. I started working on a proof-of-concept application using this extension and ran into a couple roadblocks while tying it all together, so I was wondering if it was feasible longer-term if this extension could enable deeper integrations with Reactor capabilities.
Simply put, I'd like the ability for a reactive application that makes use of reactor-enabled data repositories (i.e. Redis, J2dbc, Mongo) and can return the Flux/Mono results across the pipeline directly to the web controller interface via the Query capabilities that Axon supports. For example:
I would have a query handler such as this:
@QueryHandler
fun handleQuery(query: GetAllUsersQuery): Flux<UserView> {
return repository.findAll()
}
and make use of this in a WebFlux web controller as follows:
@GetMapping
fun getUsers(): Flux<UserView> {
return queryGateway.send(GetAllUsersQuery(), ResultTypes.multipleInstancesOf(UserView::class.java))
}
This way I can easily stream projections from one reactor-enabled project to another. Currently I have to make sure my query handlers return discrete entities (either a single nullable object or a list of objects), and the query gateway ends up enabling the send/receipt of messages as reactive via Monos, but doesn't support reactive data itself.
Given the streaming nature of fluxes/monos, I'm not entirely sure how possible this is precisely, but if there is an easier way to integrate a query handler with reactor-enabled repositories, I'd be happy to learn more!
Possible Workarounds
Currently I work around this by having the QueryHandler take the Flux/Mono from the repository, collect the results into a list (or just get the first object with next()) and return either that or a default value (empty list or null). On the WebFlux side I have to do Mono<List> and Mono, the latter of which is fine but the former is very strange.
I get that internally, this probably makes sense since each message contains a list of entities. And perhaps there isn't really much gained by mapping the Mono<List<>> to a Flux<>, since the data is all already loaded anyway and you lose any potential benefit from backpressure. The ability to leverage reactive repositories with reactive queries seems like it would allow for more powerful uses with the querying capabilities in Axon, especially in scatter-gather where different services will have different load patterns.
I can bypass this for now by having the web controller return results from my projection database directly (it's part of the same service in this case), but that obviously doesn't work when I want to query from another Axon-enabled service.
The text was updated successfully, but these errors were encountered:
Thanks for reporting this feature request @MNGoldenEagle.
It's actually a duplicate of #3
Nevertheless, this feature is being planned, and we hope to release it in Axon Framework 4.6.
so I was wondering if it was feasible longer-term if this extension could enable deeper integrations with Reactor capabilities.
Definitely yes, but this will not be possible until we resolve some of the blockers, like some core components depending on ThreadLocal. This is also currently under investigation and planning.
Closing this issue, as Axon Framework as off this pull request has integrated support for Streaming Queries.
Let us know if it sufficiently fulfills your request, or if you're missing some additional implementations there, @MNGoldenEagle!
Feature Description
There's a bit to unpack here, so apologies if this is actually multiple features in one. I started working on a proof-of-concept application using this extension and ran into a couple roadblocks while tying it all together, so I was wondering if it was feasible longer-term if this extension could enable deeper integrations with Reactor capabilities.
Simply put, I'd like the ability for a reactive application that makes use of reactor-enabled data repositories (i.e. Redis, J2dbc, Mongo) and can return the Flux/Mono results across the pipeline directly to the web controller interface via the Query capabilities that Axon supports. For example:
I would have a query handler such as this:
and make use of this in a WebFlux web controller as follows:
This way I can easily stream projections from one reactor-enabled project to another. Currently I have to make sure my query handlers return discrete entities (either a single nullable object or a list of objects), and the query gateway ends up enabling the send/receipt of messages as reactive via Monos, but doesn't support reactive data itself.
Given the streaming nature of fluxes/monos, I'm not entirely sure how possible this is precisely, but if there is an easier way to integrate a query handler with reactor-enabled repositories, I'd be happy to learn more!
Possible Workarounds
Currently I work around this by having the QueryHandler take the Flux/Mono from the repository, collect the results into a list (or just get the first object with next()) and return either that or a default value (empty list or null). On the WebFlux side I have to do Mono<List> and Mono, the latter of which is fine but the former is very strange.
I get that internally, this probably makes sense since each message contains a list of entities. And perhaps there isn't really much gained by mapping the Mono<List<>> to a Flux<>, since the data is all already loaded anyway and you lose any potential benefit from backpressure. The ability to leverage reactive repositories with reactive queries seems like it would allow for more powerful uses with the querying capabilities in Axon, especially in scatter-gather where different services will have different load patterns.
I can bypass this for now by having the web controller return results from my projection database directly (it's part of the same service in this case), but that obviously doesn't work when I want to query from another Axon-enabled service.
The text was updated successfully, but these errors were encountered: