Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

JS API Error handling #1086

Merged
merged 3 commits into from
Aug 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ public void onCompleted() {
resultTable.dropReference();
GrpcUtil.safelyExecute(observer::onCompleted);
return resultTable;
}), () -> GrpcUtil.safelyError(observer, Code.INTERNAL, "Do put could not be sealed"));
}), () -> GrpcUtil.safelyError(observer, Code.DATA_LOSS, "Do put could not be sealed"));
});
}

Expand Down Expand Up @@ -496,7 +496,7 @@ private void apply(final BarrageSubscriptionRequest subscriptionRequest) {
}

if (!subscriptionFound) {
throw GrpcUtil.statusRuntimeException(Code.INTERNAL, "Subscription was not found.");
throw GrpcUtil.statusRuntimeException(Code.NOT_FOUND, "Subscription was not found.");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,13 +231,13 @@ public void fetchTable(FetchTableRequest request, StreamObserver<ExportedTableCr
ScriptSession scriptSession = exportedConsole.get();
String tableName = request.getTableName();
if (!scriptSession.hasVariableName(tableName)) {
throw GrpcUtil.statusRuntimeException(Code.INVALID_ARGUMENT, "No value exists with name " + tableName);
throw GrpcUtil.statusRuntimeException(Code.NOT_FOUND, "No value exists with name " + tableName);
}

// Explicit typecheck to catch any wrong-type-ness right away
Object result = scriptSession.unwrapObject(scriptSession.getVariable(tableName));
if (!(result instanceof Table)) {
throw GrpcUtil.statusRuntimeException(Code.INVALID_ARGUMENT, "Value bound to name " + tableName + " is not a Table");
throw GrpcUtil.statusRuntimeException(Code.FAILED_PRECONDITION, "Value bound to name " + tableName + " is not a Table");
}

// Apply preview columns TODO core#107 move to table service
Expand Down Expand Up @@ -371,12 +371,12 @@ public void fetchFigure(FetchFigureRequest request, StreamObserver<FetchFigureRe

String figureName = request.getFigureName();
if (!scriptSession.hasVariableName(figureName)) {
throw GrpcUtil.statusRuntimeException(Code.INVALID_ARGUMENT, "No value exists with name " + figureName);
throw GrpcUtil.statusRuntimeException(Code.NOT_FOUND, "No value exists with name " + figureName);
}

Object result = scriptSession.unwrapObject(scriptSession.getVariable(figureName));
if (!(result instanceof FigureWidget)) {
throw GrpcUtil.statusRuntimeException(Code.INVALID_ARGUMENT, "Value bound to name " + figureName + " is not a FigureWidget");
throw GrpcUtil.statusRuntimeException(Code.FAILED_PRECONDITION, "Value bound to name " + figureName + " is not a FigureWidget");
}
FigureWidget widget = (FigureWidget) result;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1165,7 +1165,7 @@ public ExportBuilder<T> onError(final ExportErrorGrpcHandler errorHandler) {
final String dependentStr = dependentExportId == null ? ""
: (" (related parent export id: " + dependentExportId + ")");
errorHandler.onError(StatusProto.toStatusRuntimeException(Status.newBuilder()
.setCode(Code.INTERNAL.getNumber())
.setCode(Code.FAILED_PRECONDITION.getNumber())
.setMessage("Details Logged w/ID '" + errorContext + "'" + dependentStr)
.build()));
}));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public BrowserStream(final Mode mode, final SessionState session, final Marshall
public void onMessageReceived(T message) {
synchronized (this) {
if (halfClosedSeq != -1 && message.sequence > halfClosedSeq) {
throw GrpcUtil.statusRuntimeException(Code.INVALID_ARGUMENT, "Sequence sent after half close: closed seq=" + halfClosedSeq + " recv seq=" + message.sequence);
throw GrpcUtil.statusRuntimeException(Code.ABORTED, "Sequence sent after half close: closed seq=" + halfClosedSeq + " recv seq=" + message.sequence);
}

if (message.isHalfClosed) {
Expand All @@ -85,7 +85,7 @@ public void onMessageReceived(T message) {

if (mode == Mode.IN_ORDER) {
if (message.sequence < nextSeq) {
throw GrpcUtil.statusRuntimeException(Code.INVALID_ARGUMENT, "Duplicate sequence sent: next seq=" + nextSeq + " recv seq=" + message.sequence);
throw GrpcUtil.statusRuntimeException(Code.OUT_OF_RANGE, "Duplicate sequence sent: next seq=" + nextSeq + " recv seq=" + message.sequence);
}
boolean queueMsg = false;
if (processingMessage) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public static <T> T rpcWrapper(final Logger log, final StreamObserver<?> respons
}

public static StatusRuntimeException securelyWrapError(final Logger log, final Throwable err) {
return securelyWrapError(log, err, Code.INTERNAL);
return securelyWrapError(log, err, Code.INVALID_ARGUMENT);
}

public static StatusRuntimeException securelyWrapError(final Logger log, final Throwable err, final Code statusCode) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import elemental2.core.JsArray;
import elemental2.core.JsSet;
import elemental2.dom.CustomEventInit;
import elemental2.dom.DomGlobal;
import elemental2.promise.Promise;
import io.deephaven.ide.shared.IdeSession;
Expand All @@ -20,6 +21,7 @@
import jsinterop.annotations.JsIgnore;
import jsinterop.annotations.JsMethod;
import jsinterop.annotations.JsProperty;
import jsinterop.base.JsPropertyMap;

import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -65,6 +67,9 @@ static AuthTokenPromiseSupplier oneShot(ConnectToken initialToken) {
@JsProperty(namespace = "dh.QueryInfo")
public static final String EVENT_CONNECT = "connect";

@JsProperty(namespace = "dh.IdeConnection")
public static final String HACK_CONNECTION_FAILURE = "hack-connection-failure";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this named hack-connection-failure? Why hack-?

Copy link
Member

@nbauernfeind nbauernfeind Aug 19, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe 'hack' here is to make it blindly obvious this is a temporary work around to a bigger problem that Colin wants carte blanche to fix correctly.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is temporary, until we have reconnect (#730) which will have this being removed. I wanted to draw a distinction between normal events or internal-only ones, but I'm open to some other way to do that.

I tried to summarize in the description above (and will make it part of the actual commit message), but the old reconnect events are probably what we will use for these cases once the api is smart enough to try reconnecting streams.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay cool, I've got a change to listen for these new errors pushed on my Application Mode PR: deephaven/web-client-ui#155


private final List<IdeSession> sessions = new ArrayList<>();
private final JsSet<Ticket> cancelled = new JsSet<>();

Expand All @@ -76,6 +81,16 @@ public QueryConnectable(Supplier<Promise<ConnectToken>> authTokenPromiseSupplier
this.connection = JsLazy.of(() -> new WorkerConnection(this, authTokenPromiseSupplier));
}

public void notifyConnectionError(ResponseStreamWrapper.Status status) {
CustomEventInit event = CustomEventInit.create();
event.setDetail(JsPropertyMap.of(
"status", status.getCode(),
"details", status.getDetails(),
"metadata", status.getMetadata()
));
fireEvent(HACK_CONNECTION_FAILURE, event);
}

@Override
@JsMethod
public RemoverFn addEventListener(String name, EventFn callback) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.console_pb.LogSubscriptionData;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.console_pb.LogSubscriptionRequest;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.console_pb_service.ConsoleServiceClient;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.session_pb.ExportNotification;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.session_pb.ExportNotificationRequest;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.session_pb.HandshakeRequest;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.session_pb.HandshakeResponse;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.session_pb_service.SessionServiceClient;
Expand Down Expand Up @@ -141,6 +143,7 @@ private enum State {
private final JsSet<JsConsumer<LogItem>> logCallbacks = new JsSet<>();

private final Map<ClientTableState, ResponseStreamWrapper<FlightData>> subscriptionStreams = new HashMap<>();
private ResponseStreamWrapper<ExportedTableUpdateMessage> exportNotifications;

private Map<TableMapHandle, TableMap> tableMaps = new HashMap<>();

Expand Down Expand Up @@ -258,6 +261,7 @@ private void connectToWorker(Supplier<Promise<ConnectToken>> authTokenPromiseSup

// // start a heartbeat to check if connection is properly alive
// ping(success.getAuthSessionToken());
startExportNotificationsStream();

return Promise.resolve(handshakeResponse);
}, fail -> {
Expand Down Expand Up @@ -285,6 +289,39 @@ private void connectToWorker(Supplier<Promise<ConnectToken>> authTokenPromiseSup
});
}

public void checkStatus(ResponseStreamWrapper.Status status) {
//TODO provide simpler hooks to retry auth, restart the stream
if (status.getCode() == Code.OK) {
// success, ignore
} else if (status.getCode() == Code.Unauthenticated) {
// TODO re-create session once?
// for now treating this as fatal, UI should encourage refresh to try again
info.notifyConnectionError(status);
} else if (status.getCode() == Code.Internal || status.getCode() == Code.Unknown) {
// for now treating these as fatal also
info.notifyConnectionError(status);
} else if (status.getCode() == Code.Unavailable) {
// TODO skip re-authing for now, just backoff and try again
} // others probably are meaningful to the caller
}

private void startExportNotificationsStream() {
if (exportNotifications != null) {
exportNotifications.cancel();
}
exportNotifications = ResponseStreamWrapper.of(tableServiceClient.exportedTableUpdates(new ExportedTableUpdatesRequest(), metadata()));
exportNotifications.onData(update -> {
if (update.getUpdateFailureMessage() != null && !update.getUpdateFailureMessage().isEmpty()) {
exportedTableUpdateMessageError(new TableTicket(update.getExportId().getTicket_asU8()), update.getUpdateFailureMessage());
} else {
exportedTableUpdateMessage(new TableTicket(update.getExportId().getTicket_asU8()), java.lang.Long.parseLong(update.getSize()));
}
});

// any export notification error is bad news
exportNotifications.onStatus(this::checkStatus);
}

private void authUpdate(HandshakeResponse handshakeResponse) {
// store the token and schedule refresh calls to keep it alive
sessionToken = new String(Js.uncheckedCast(handshakeResponse.getSessionToken_asU8()), Charset.forName("UTF-8"));
Expand All @@ -300,6 +337,7 @@ private void authUpdate(HandshakeResponse handshakeResponse) {
if (fail != null) {
//TODO set a flag so others know not to try until we re-trigger initial auth
//TODO re-trigger auth
checkStatus((ResponseStreamWrapper.Status) fail);
return;
}
// mark the new token, schedule a new check
Expand Down Expand Up @@ -1148,9 +1186,7 @@ public JsRunnable subscribeToLogs(JsConsumer<LogItem> callback) {

notifyLog(logItem);
});
logStream.onEnd(status -> {
//TODO handle reconnect
});
logStream.onEnd(this::checkStatus);
} else {
pastLogs.forEach(callback::apply);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,6 @@ public Promise<Void> sendRequest() {

ResponseStreamWrapper<ExportedTableCreationResponse> batchStream = ResponseStreamWrapper.of(connection.tableServiceClient().batch(request, connection.metadata()));
batchStream.onData(response -> {
DomGlobal.console.log("onData", this, request.toObject(), response.toObject());
TableReference resultid = response.getResultId();
if (!resultid.hasTicket()) {
// thanks for telling us, but we don't at this time have a nice way to indicate this
Expand Down Expand Up @@ -292,7 +291,6 @@ public Promise<Void> sendRequest() {
});

batchStream.onEnd(status -> {
DomGlobal.console.log("onEnd", this, request.toObject(), status);
// request is complete
if (status.getCode() == Code.OK) {
resolve.onInvoke((Void) null);
Expand Down