Skip to content

Commit

Permalink
Merge pull request #18163 from michalszynkiewicz/grpc-reactive-servic…
Browse files Browse the repository at this point in the history
…e-test-fix
  • Loading branch information
cescoffier authored Jun 25, 2021
2 parents ff5d522 + 737bf8a commit 40167a9
Showing 1 changed file with 4 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,13 @@ public Uni<Test.Empty> add(Test.Item request) {
@Override
public Multi<Test.Item> watch(Test.Empty request) {
int contextId = contextChecker.newContextId("ReactiveService#watch");
Multi<Item> cached = broadcast.cache();
cached.subscribe().with(i -> {
});
Multi<Test.Item> existing = Item.<Item> streamAll()
.map(item -> Test.Item.newBuilder().setText(item.text).build());
return Multi.createBy().concatenating()
.streams(existing, broadcast.map(i -> i.text)
.streams(existing, cached.map(i -> i.text)
.map(Test.Item.newBuilder()::setText)
.map(Test.Item.Builder::build))
.onItem().invoke(
Expand Down

0 comments on commit 40167a9

Please sign in to comment.