-
Notifications
You must be signed in to change notification settings - Fork 2.8k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
gRPC Dev UI - support streaming calls
- Loading branch information
1 parent
b54f1ca
commit 0b2aa95
Showing
14 changed files
with
687 additions
and
220 deletions.
There are no files selected for viewing
64 changes: 64 additions & 0 deletions
64
core/devmode-spi/src/main/java/io/quarkus/dev/testing/GrpcWebSocketProxy.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
package io.quarkus.dev.testing; | ||
|
||
import java.util.Map; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.concurrent.CountDownLatch; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.atomic.AtomicInteger; | ||
import java.util.function.Consumer; | ||
|
||
public class GrpcWebSocketProxy { | ||
|
||
private static final AtomicInteger connectionIdSeq = new AtomicInteger(); | ||
|
||
private static volatile WebSocketListener webSocketListener; | ||
|
||
private static final Map<Integer, Consumer<Runnable>> webSocketConnections = new ConcurrentHashMap<>(); | ||
|
||
public static Integer addWebSocket(Consumer<String> responseConsumer, | ||
Consumer<Runnable> closeHandler) { | ||
if (webSocketListener != null) { | ||
int id = connectionIdSeq.getAndIncrement(); | ||
webSocketListener.onOpen(id, responseConsumer); | ||
|
||
webSocketConnections.put(id, closeHandler); | ||
return id; | ||
} | ||
return null; | ||
} | ||
|
||
public static void closeAll() { | ||
CountDownLatch latch = new CountDownLatch(webSocketConnections.size()); | ||
for (Map.Entry<Integer, Consumer<Runnable>> connection : webSocketConnections.entrySet()) { | ||
connection.getValue().accept(latch::countDown); | ||
webSocketListener.onClose(connection.getKey()); | ||
} | ||
try { | ||
if (!latch.await(5, TimeUnit.SECONDS)) { | ||
System.err.println("Failed to close all the websockets in 5 seconds"); | ||
} | ||
} catch (InterruptedException e) { | ||
System.err.println("Interrupted while waiting for websockets to be closed"); | ||
} | ||
} | ||
|
||
public static void closeWebSocket(int id) { | ||
webSocketListener.onClose(id); | ||
} | ||
|
||
public static void setWebSocketListener(WebSocketListener listener) { | ||
webSocketListener = listener; | ||
} | ||
|
||
public static void addMessage(Integer socketId, String message) { | ||
webSocketListener.newMessage(socketId, message); | ||
} | ||
|
||
public interface WebSocketListener { | ||
void onOpen(int id, Consumer<String> responseConsumer); | ||
|
||
void newMessage(int id, String content); | ||
|
||
void onClose(int id); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.