Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

JS API should attempt to reconnect to the gRPC server #3502

Merged
merged 18 commits into from
Mar 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions go/internal/proto/console/console.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,14 @@ message GetHeapInfoResponse {
int64 free_memory = 3 [jstype=JS_STRING];
}

// Presently you get _all_ logs, not just your console. A future version might take a specific console_id to
// restrict this to a single console.
message LogSubscriptionRequest {
// presently you get _all_ logs, not just your console
// Ticket console_id = 1;
// If a non-zero value is specified, represents the timestamp in microseconds since the unix epoch when
// the client last saw a message. Technically this might skip messages if more than one message was
// logged at the same microsecond that connection was lost - to avoid this, subtract one from the last
// seen message's micros, and expect to receive some messages that have already been seen.
int64 last_seen_log_timestamp = 1 [jstype=JS_STRING];
repeated string levels = 2;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ public void record(LogBufferRecord record) {
}
// since the subscribe() method auto-replays all existing logs, filter to just once this consumer probably
// hasn't seen
if (record.getTimestampMicros() < request.getLastSeenLogTimestamp()) {
if (record.getTimestampMicros() <= request.getLastSeenLogTimestamp()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just a note, not really feedback or a request other than to consider whether or not we should change the API.

Interesting. The listener could miss log messages that occurred at the same timestamp, but I see how the alternative guarantees at least one duplicate message. Maybe a better API would count the log messages and you just provide an offset. (This is probably why Kafka uses an offset instead of timestamp for new subscriptions.)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah that's fair. Dup messages were confusing me, and the comment made it appear that this was "filtering to [ones] this consumer probably hadn't seen" meant something other than "will definitely always dup the last message seen, and maybe more". @devinrsmith any thoughts? I think you touched this last?

I know we will sometimes have gaps since this is implemented as a ring buffer, and maybe we tacitly accept that?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The "guarantees" around the log buffer API are sketchy at best - the consumer never knows if they actually missed log messages or not. I'm happy to "break" the current API w/ the change as Colin has here... all else equal, if the client wants to they can request timestamp-1, and potentially handle dubious de-duping logic if necessary themselves.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, so I'll revert this and fix the comment instead?

return;
}
// Note: we can't send record off-thread without doing a deepCopy.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ private synchronized void sendUpdateMessage(final Ticket ticket, final long size
try {
responseObserver.onNext(update.build());
} catch (final RuntimeException err) {
log.error().append(logPrefix).append("failed to notify listener of state change: ").append(err).endl();
log.debug().append(logPrefix).append("failed to notify listener of state change: ").append(err).endl();
session.removeExportListener(this);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import elemental2.dom.CustomEventInit;
import elemental2.dom.DomGlobal;
import elemental2.promise.Promise;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.session_pb.TerminationNotificationResponse;
import io.deephaven.web.client.ide.IdeSession;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.console_pb.*;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.ticket_pb.Ticket;
Expand Down Expand Up @@ -45,7 +46,12 @@ public abstract class QueryConnectable<Self extends QueryConnectable<Self>> exte
@JsProperty(namespace = "dh.QueryInfo")
public static final String EVENT_CONNECT = "connect";

/**
* Removed in favor of a proper disconnect/reconnect. Event listeners should switch to the "disconnect" and
* "reconnect" events instead.
*/
@JsProperty(namespace = "dh.IdeConnection")
@Deprecated
public static final String HACK_CONNECTION_FAILURE = "hack-connection-failure";

private final List<IdeSession> sessions = new ArrayList<>();
Expand All @@ -66,8 +72,9 @@ public QueryConnectable() {

public abstract Promise<ConnectOptions> getConnectOptions();

@Deprecated
public void notifyConnectionError(ResponseStreamWrapper.Status status) {
if (notifiedConnectionError) {
if (notifiedConnectionError || !hasListeners(HACK_CONNECTION_FAILURE)) {
return;
}
notifiedConnectionError = true;
Expand All @@ -78,6 +85,8 @@ public void notifyConnectionError(ResponseStreamWrapper.Status status) {
"details", status.getDetails(),
"metadata", status.getMetadata()));
fireEvent(HACK_CONNECTION_FAILURE, event);
JsLog.warn(
"The event dh.IdeConnection.HACK_CONNECTION_FAILURE is deprecated and will be removed in a later release");
}

@Override
Expand All @@ -104,6 +113,20 @@ public String getServerUrl() {
throw new UnsupportedOperationException();
}

/**
* Internal method to permit delegating to some orchestration tool to see if this worker can be connected to yet.
*/
@JsIgnore
public Promise<Self> onReady() {
// noinspection unchecked
return Promise.resolve((Self) this);
}

/**
* Promise that resolves when this worker instance can be connected to, or rejects if it can't be used.
*
* @return A promise that resolves with this instance.
*/
public abstract Promise<Self> running();

@JsMethod
Expand Down Expand Up @@ -203,6 +226,7 @@ public void connected() {
JsLog.debug(getClass(), " connected");

connected = true;
notifiedConnectionError = false;

fireEvent(EVENT_CONNECT);

Expand Down Expand Up @@ -243,4 +267,6 @@ public void disconnected() {
+ "Query disconnected (to prevent this log message, handle the EVENT_DISCONNECT event)");
}
}

public abstract void notifyServerShutdown(TerminationNotificationResponse success);
}
Loading