Skip to content

Commit

Permalink
fix fabric8io#5036: addressing the handling of non-connection errors
Browse files Browse the repository at this point in the history
  • Loading branch information
shawkins committed Apr 12, 2023
1 parent 588d1eb commit 026032a
Show file tree
Hide file tree
Showing 13 changed files with 32 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public CompletionStage<?> onClose(java.net.http.WebSocket webSocket, int statusC

@Override
public void onError(java.net.http.WebSocket webSocket, Throwable error) {
listener.onError(new JdkWebSocketImpl(queueSize, webSocket), error);
listener.onError(new JdkWebSocketImpl(queueSize, webSocket), error, false);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WebSocketListener;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.exceptions.CloseException;
import org.eclipse.jetty.websocket.api.exceptions.UpgradeException;

import java.nio.ByteBuffer;
Expand Down Expand Up @@ -147,7 +148,7 @@ public void onWebSocketError(Throwable cause) {
// - Jetty throws a ClosedChannelException
return;
}
listener.onError(this, cause);
listener.onError(this, cause, cause instanceof CloseException);
}

private void backPressure() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,8 +258,8 @@ public void onClose(WebSocket webSocket, int code, String reason) {
}

@Override
public void onError(WebSocket webSocket, Throwable error) {
events.put("onError", new Object[] { error });
public void onError(WebSocket webSocket, Throwable error, boolean codecException) {
events.put("onError", new Object[] { error, codecException });
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public void onFailure(okhttp3.WebSocket webSocket, Throwable t, Response respons
future.completeExceptionally(t);
}
} else {
listener.onError(new OkHttpWebSocketImpl(webSocket, this::request), t);
listener.onError(new OkHttpWebSocketImpl(webSocket, this::request), t, false);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public VertxHttpClient<F> build() {
options.setMaxPoolSize(MAX_CONNECTIONS);
options.setMaxWebSockets(MAX_CONNECTIONS);
options.setIdleTimeoutUnit(TimeUnit.SECONDS);
options.setMaxWebSocketFrameSize(500000);

if (this.connectTimeout != null) {
options.setConnectTimeout((int) this.connectTimeout.toMillis());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import io.fabric8.kubernetes.client.http.WebSocket;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.CodecException;
import io.vertx.core.Future;
import io.vertx.core.buffer.Buffer;

Expand Down Expand Up @@ -52,7 +53,16 @@ void init() {
ws.pongHandler(b -> ws.fetch(1));
// use end, not close, because close is processed immediately vs. end is in frame order
ws.endHandler(v -> listener.onClose(this, ws.closeStatusCode(), ws.closeReason()));
ws.exceptionHandler(err -> listener.onError(this, err));
ws.exceptionHandler(err -> {
try {
listener.onError(this, err, err instanceof CodecException);
} finally {
// onError should be terminal
if (!ws.isClosed()) {
ws.close();
}
}
});
listener.onOpen(this);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ default void onClose(WebSocket webSocket, int code, String reason) {
* Called when an error has occurred. It's a terminal event, calls to {@link WebSocket#request()}
* do nothing after this.
*/
default void onError(WebSocket webSocket, Throwable error) {
default void onError(WebSocket webSocket, Throwable error, boolean messageError) {
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ public void onOpen(WebSocket webSocket) {
}

@Override
public void onError(WebSocket webSocket, Throwable t) {
public void onError(WebSocket webSocket, Throwable t, boolean nonConnectionError) {
closed.set(true);
HttpResponse<?> response = null;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ public PortForward forward(URL resourceBaseUrl, int port, final ReadableByteChan

socket.whenComplete((w, t) -> {
if (t != null) {
listener.onError(w, t);
listener.onError(w, t, false);
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ public void onClose(WebSocket webSocket, int code, String reason) {
}

@Override
public void onError(WebSocket webSocket, Throwable t) {
public void onError(WebSocket webSocket, Throwable t, boolean nonConnectionError) {
logger.debug("{}: Throwable received from websocket", LOG_PREFIX, t);
if (alive.get()) {
serverThrowables.add(t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ protected void start(URL url, Map<String, String> headers, WatchRequestState sta
if (ready) {
// if we're not ready yet, that means we're waiting on the future and there's
// no need to invoke the reconnect logic
listener.onError(w, t);
listener.onError(w, t, false);
}
throw KubernetesClientException.launderThrowable(t);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.fabric8.kubernetes.client.dsl.internal;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.client.WatcherException;
import io.fabric8.kubernetes.client.dsl.internal.AbstractWatchManager.WatchRequestState;
import io.fabric8.kubernetes.client.http.WebSocket;
import org.slf4j.Logger;
Expand All @@ -42,9 +43,13 @@ public void onOpen(final WebSocket webSocket) {
}

@Override
public void onError(WebSocket webSocket, Throwable t) {
logger.debug("WebSocket error received", t);
manager.scheduleReconnect(state);
public void onError(WebSocket webSocket, Throwable t, boolean messageError) {
if (messageError) {
manager.close(new WatcherException("Could not process websocket message", t));
} else {
logger.debug("WebSocket error received", t);
manager.scheduleReconnect(state);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ private ExecWebSocketListener setupConnectionToPod(URI uri) {
.buildAsync(execWebSocketListener);
startedFuture.whenComplete((w, t) -> {
if (t != null) {
execWebSocketListener.onError(w, t);
execWebSocketListener.onError(w, t, false);
}
});
Utils.waitUntilReadyOrFail(startedFuture, getRequestConfig().getWebsocketTimeout(), TimeUnit.MILLISECONDS);
Expand Down

0 comments on commit 026032a

Please sign in to comment.