diff --git a/grpc-api/src/main/java/io/deephaven/grpc_api/arrow/ArrowFlightUtil.java b/grpc-api/src/main/java/io/deephaven/grpc_api/arrow/ArrowFlightUtil.java index 13d896d3598..297dfa9bb41 100644 --- a/grpc-api/src/main/java/io/deephaven/grpc_api/arrow/ArrowFlightUtil.java +++ b/grpc-api/src/main/java/io/deephaven/grpc_api/arrow/ArrowFlightUtil.java @@ -300,7 +300,7 @@ public void onCompleted() { resultTable.dropReference(); GrpcUtil.safelyExecute(observer::onCompleted); return resultTable; - }), () -> GrpcUtil.safelyError(observer, Code.INTERNAL, "Do put could not be sealed")); + }), () -> GrpcUtil.safelyError(observer, Code.DATA_LOSS, "Do put could not be sealed")); }); } @@ -496,7 +496,7 @@ private void apply(final BarrageSubscriptionRequest subscriptionRequest) { } if (!subscriptionFound) { - throw GrpcUtil.statusRuntimeException(Code.INTERNAL, "Subscription was not found."); + throw GrpcUtil.statusRuntimeException(Code.NOT_FOUND, "Subscription was not found."); } } diff --git a/grpc-api/src/main/java/io/deephaven/grpc_api/console/ConsoleServiceGrpcImpl.java b/grpc-api/src/main/java/io/deephaven/grpc_api/console/ConsoleServiceGrpcImpl.java index bd7df4ac1e4..1deede03f23 100644 --- a/grpc-api/src/main/java/io/deephaven/grpc_api/console/ConsoleServiceGrpcImpl.java +++ b/grpc-api/src/main/java/io/deephaven/grpc_api/console/ConsoleServiceGrpcImpl.java @@ -231,13 +231,13 @@ public void fetchTable(FetchTableRequest request, StreamObserver onError(final ExportErrorGrpcHandler errorHandler) { final String dependentStr = dependentExportId == null ? "" : (" (related parent export id: " + dependentExportId + ")"); errorHandler.onError(StatusProto.toStatusRuntimeException(Status.newBuilder() - .setCode(Code.INTERNAL.getNumber()) + .setCode(Code.FAILED_PRECONDITION.getNumber()) .setMessage("Details Logged w/ID '" + errorContext + "'" + dependentStr) .build())); })); diff --git a/grpc-api/src/main/java/io/deephaven/grpc_api/util/BrowserStream.java b/grpc-api/src/main/java/io/deephaven/grpc_api/util/BrowserStream.java index 95f4e29abe7..4766086002a 100644 --- a/grpc-api/src/main/java/io/deephaven/grpc_api/util/BrowserStream.java +++ b/grpc-api/src/main/java/io/deephaven/grpc_api/util/BrowserStream.java @@ -73,7 +73,7 @@ public BrowserStream(final Mode mode, final SessionState session, final Marshall public void onMessageReceived(T message) { synchronized (this) { if (halfClosedSeq != -1 && message.sequence > halfClosedSeq) { - throw GrpcUtil.statusRuntimeException(Code.INVALID_ARGUMENT, "Sequence sent after half close: closed seq=" + halfClosedSeq + " recv seq=" + message.sequence); + throw GrpcUtil.statusRuntimeException(Code.ABORTED, "Sequence sent after half close: closed seq=" + halfClosedSeq + " recv seq=" + message.sequence); } if (message.isHalfClosed) { @@ -85,7 +85,7 @@ public void onMessageReceived(T message) { if (mode == Mode.IN_ORDER) { if (message.sequence < nextSeq) { - throw GrpcUtil.statusRuntimeException(Code.INVALID_ARGUMENT, "Duplicate sequence sent: next seq=" + nextSeq + " recv seq=" + message.sequence); + throw GrpcUtil.statusRuntimeException(Code.OUT_OF_RANGE, "Duplicate sequence sent: next seq=" + nextSeq + " recv seq=" + message.sequence); } boolean queueMsg = false; if (processingMessage) { diff --git a/grpc-api/src/main/java/io/deephaven/grpc_api/util/GrpcUtil.java b/grpc-api/src/main/java/io/deephaven/grpc_api/util/GrpcUtil.java index 0a71c5dc8f5..1b8a493e6a8 100644 --- a/grpc-api/src/main/java/io/deephaven/grpc_api/util/GrpcUtil.java +++ b/grpc-api/src/main/java/io/deephaven/grpc_api/util/GrpcUtil.java @@ -49,7 +49,7 @@ public static T rpcWrapper(final Logger log, final StreamObserver respons } public static StatusRuntimeException securelyWrapError(final Logger log, final Throwable err) { - return securelyWrapError(log, err, Code.INTERNAL); + return securelyWrapError(log, err, Code.INVALID_ARGUMENT); } public static StatusRuntimeException securelyWrapError(final Logger log, final Throwable err, final Code statusCode) { diff --git a/web/client-api/src/main/java/io/deephaven/web/client/api/QueryConnectable.java b/web/client-api/src/main/java/io/deephaven/web/client/api/QueryConnectable.java index cd7e22b7876..4c3e94d23c4 100644 --- a/web/client-api/src/main/java/io/deephaven/web/client/api/QueryConnectable.java +++ b/web/client-api/src/main/java/io/deephaven/web/client/api/QueryConnectable.java @@ -2,6 +2,7 @@ import elemental2.core.JsArray; import elemental2.core.JsSet; +import elemental2.dom.CustomEventInit; import elemental2.dom.DomGlobal; import elemental2.promise.Promise; import io.deephaven.ide.shared.IdeSession; @@ -20,6 +21,7 @@ import jsinterop.annotations.JsIgnore; import jsinterop.annotations.JsMethod; import jsinterop.annotations.JsProperty; +import jsinterop.base.JsPropertyMap; import java.util.ArrayList; import java.util.List; @@ -65,6 +67,9 @@ static AuthTokenPromiseSupplier oneShot(ConnectToken initialToken) { @JsProperty(namespace = "dh.QueryInfo") public static final String EVENT_CONNECT = "connect"; + @JsProperty(namespace = "dh.IdeConnection") + public static final String HACK_CONNECTION_FAILURE = "hack-connection-failure"; + private final List sessions = new ArrayList<>(); private final JsSet cancelled = new JsSet<>(); @@ -76,6 +81,16 @@ public QueryConnectable(Supplier> authTokenPromiseSupplier this.connection = JsLazy.of(() -> new WorkerConnection(this, authTokenPromiseSupplier)); } + public void notifyConnectionError(ResponseStreamWrapper.Status status) { + CustomEventInit event = CustomEventInit.create(); + event.setDetail(JsPropertyMap.of( + "status", status.getCode(), + "details", status.getDetails(), + "metadata", status.getMetadata() + )); + fireEvent(HACK_CONNECTION_FAILURE, event); + } + @Override @JsMethod public RemoverFn addEventListener(String name, EventFn callback) { diff --git a/web/client-api/src/main/java/io/deephaven/web/client/api/WorkerConnection.java b/web/client-api/src/main/java/io/deephaven/web/client/api/WorkerConnection.java index 8efe8785393..ee9fafc26ec 100644 --- a/web/client-api/src/main/java/io/deephaven/web/client/api/WorkerConnection.java +++ b/web/client-api/src/main/java/io/deephaven/web/client/api/WorkerConnection.java @@ -25,6 +25,8 @@ import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.console_pb.LogSubscriptionData; import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.console_pb.LogSubscriptionRequest; import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.console_pb_service.ConsoleServiceClient; +import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.session_pb.ExportNotification; +import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.session_pb.ExportNotificationRequest; import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.session_pb.HandshakeRequest; import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.session_pb.HandshakeResponse; import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.session_pb_service.SessionServiceClient; @@ -141,6 +143,7 @@ private enum State { private final JsSet> logCallbacks = new JsSet<>(); private final Map> subscriptionStreams = new HashMap<>(); + private ResponseStreamWrapper exportNotifications; private Map tableMaps = new HashMap<>(); @@ -258,6 +261,7 @@ private void connectToWorker(Supplier> authTokenPromiseSup // // start a heartbeat to check if connection is properly alive // ping(success.getAuthSessionToken()); + startExportNotificationsStream(); return Promise.resolve(handshakeResponse); }, fail -> { @@ -285,6 +289,39 @@ private void connectToWorker(Supplier> authTokenPromiseSup }); } + public void checkStatus(ResponseStreamWrapper.Status status) { + //TODO provide simpler hooks to retry auth, restart the stream + if (status.getCode() == Code.OK) { + // success, ignore + } else if (status.getCode() == Code.Unauthenticated) { + // TODO re-create session once? + // for now treating this as fatal, UI should encourage refresh to try again + info.notifyConnectionError(status); + } else if (status.getCode() == Code.Internal || status.getCode() == Code.Unknown) { + // for now treating these as fatal also + info.notifyConnectionError(status); + } else if (status.getCode() == Code.Unavailable) { + // TODO skip re-authing for now, just backoff and try again + } // others probably are meaningful to the caller + } + + private void startExportNotificationsStream() { + if (exportNotifications != null) { + exportNotifications.cancel(); + } + exportNotifications = ResponseStreamWrapper.of(tableServiceClient.exportedTableUpdates(new ExportedTableUpdatesRequest(), metadata())); + exportNotifications.onData(update -> { + if (update.getUpdateFailureMessage() != null && !update.getUpdateFailureMessage().isEmpty()) { + exportedTableUpdateMessageError(new TableTicket(update.getExportId().getTicket_asU8()), update.getUpdateFailureMessage()); + } else { + exportedTableUpdateMessage(new TableTicket(update.getExportId().getTicket_asU8()), java.lang.Long.parseLong(update.getSize())); + } + }); + + // any export notification error is bad news + exportNotifications.onStatus(this::checkStatus); + } + private void authUpdate(HandshakeResponse handshakeResponse) { // store the token and schedule refresh calls to keep it alive sessionToken = new String(Js.uncheckedCast(handshakeResponse.getSessionToken_asU8()), Charset.forName("UTF-8")); @@ -300,6 +337,7 @@ private void authUpdate(HandshakeResponse handshakeResponse) { if (fail != null) { //TODO set a flag so others know not to try until we re-trigger initial auth //TODO re-trigger auth + checkStatus((ResponseStreamWrapper.Status) fail); return; } // mark the new token, schedule a new check @@ -1148,9 +1186,7 @@ public JsRunnable subscribeToLogs(JsConsumer callback) { notifyLog(logItem); }); - logStream.onEnd(status -> { - //TODO handle reconnect - }); + logStream.onEnd(this::checkStatus); } else { pastLogs.forEach(callback::apply); } diff --git a/web/client-api/src/main/java/io/deephaven/web/client/api/batch/RequestBatcher.java b/web/client-api/src/main/java/io/deephaven/web/client/api/batch/RequestBatcher.java index 29ddc118f87..d9540a27e0c 100644 --- a/web/client-api/src/main/java/io/deephaven/web/client/api/batch/RequestBatcher.java +++ b/web/client-api/src/main/java/io/deephaven/web/client/api/batch/RequestBatcher.java @@ -236,7 +236,6 @@ public Promise sendRequest() { ResponseStreamWrapper batchStream = ResponseStreamWrapper.of(connection.tableServiceClient().batch(request, connection.metadata())); batchStream.onData(response -> { - DomGlobal.console.log("onData", this, request.toObject(), response.toObject()); TableReference resultid = response.getResultId(); if (!resultid.hasTicket()) { // thanks for telling us, but we don't at this time have a nice way to indicate this @@ -292,7 +291,6 @@ public Promise sendRequest() { }); batchStream.onEnd(status -> { - DomGlobal.console.log("onEnd", this, request.toObject(), status); // request is complete if (status.getCode() == Code.OK) { resolve.onInvoke((Void) null);