Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update JS API to support gRPC input tables #1565

Merged
merged 12 commits into from
Nov 19, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ public void addTableToInputTable(AddTableRequest request, StreamObserver<AddTabl
// actually add the tables contents
try {
mutableInputTable.add(table);
GrpcUtil.safelyExecuteLocked(responseObserver, () -> {
responseObserver.onNext(AddTableResponse.getDefaultInstance());
responseObserver.onCompleted();
});
} catch (IOException ioException) {
throw GrpcUtil.statusRuntimeException(Code.DATA_LOSS, "Error adding table to input table");
}
Expand Down Expand Up @@ -111,6 +115,10 @@ public void deleteTableFromInputTable(DeleteTableRequest request,
// actually delete the table's contents
try {
mutableInputTable.delete(tableToDelete);
GrpcUtil.safelyExecuteLocked(responseObserver, () -> {
responseObserver.onNext(DeleteTableResponse.getDefaultInstance());
responseObserver.onCompleted();
});
} catch (IOException ioException) {
throw GrpcUtil.statusRuntimeException(Code.DATA_LOSS,
"Error deleting table from inputtable");
Expand Down
4 changes: 4 additions & 0 deletions proto/raw-js-openapi/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ require("deephaven/proto/table_pb");
require("deephaven/proto/console_pb");
require("deephaven/proto/ticket_pb");
require("deephaven/proto/application_pb");
require("deephaven/proto/inputtable_pb");
require("Flight_pb")
require("BrowserFlight_pb")
var sessionService = require("deephaven/proto/session_pb_service");
var tableService = require("deephaven/proto/table_pb_service");
var consoleService = require("deephaven/proto/console_pb_service");
var applicationService = require("deephaven/proto/application_pb_service");
var inputTableService = require("deephaven/proto/inputtable_pb_service");
var browserFlightService = require("BrowserFlight_pb_service");
var flightService = require("Flight_pb_service");

Expand All @@ -33,6 +35,8 @@ var io = { deephaven: {
ticket_pb: proto.io.deephaven.proto.backplane.grpc,
application_pb: proto.io.deephaven.proto.backplane.grpc,
application_pb_service: applicationService,
inputtable_pb: proto.io.deephaven.proto.backplane.grpc,
inputtable_pb_service: inputTableService,
},
barrage: {
"flatbuf": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public class Column {
private String constituentType;

private String description;
private final boolean isInputTableKeyColumn;

@JsMethod(namespace = "dh.Column")
public static CustomColumn formatRowColor(String expression) {
Expand All @@ -44,7 +45,8 @@ public static CustomColumn createCustomColumn(String name, String expression) {
}

public Column(int jsIndex, int index, Integer formatColumnIndex, Integer styleColumnIndex, String type, String name,
boolean isPartitionColumn, Integer formatStringColumnIndex, String description) {
boolean isPartitionColumn, Integer formatStringColumnIndex, String description,
boolean inputTableKeyColumn) {
this.jsIndex = jsIndex;
this.index = index;
this.formatColumnIndex = formatColumnIndex;
Expand All @@ -54,6 +56,7 @@ public Column(int jsIndex, int index, Integer formatColumnIndex, Integer styleCo
this.isPartitionColumn = isPartitionColumn;
this.formatStringColumnIndex = formatStringColumnIndex;
this.description = description;
this.isInputTableKeyColumn = inputTableKeyColumn;
}

@JsMethod
Expand Down Expand Up @@ -133,6 +136,10 @@ public boolean getIsPartitionColumn() {
return isPartitionColumn;
}

public boolean isInputTableKeyColumn() {
return isInputTableKeyColumn;
}

@JsMethod
public Sort sort() {
return new Sort(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import elemental2.core.Global;
import elemental2.core.JsArray;
import elemental2.core.JsString;
import elemental2.dom.CustomEventInit;
import elemental2.dom.DomGlobal;
import elemental2.promise.IThenable.ThenOnFulfilledCallbackFn;
Expand All @@ -16,6 +15,8 @@
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.table_pb.SelectDistinctRequest;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.table_pb.SnapshotTableRequest;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.table_pb.runchartdownsamplerequest.ZoomRange;
import io.deephaven.web.client.api.barrage.def.ColumnDefinition;
import io.deephaven.web.client.api.barrage.def.TableAttributesDefinition;
import io.deephaven.web.client.api.batch.RequestBatcher;
import io.deephaven.web.client.api.filter.FilterCondition;
import io.deephaven.web.client.api.input.JsInputTable;
Expand Down Expand Up @@ -217,16 +218,16 @@ public Promise<JsInputTable> inputTable() {
if (!hasInputTable) {
return Js.uncheckedCast(Promise.reject("Table is not an InputTable"));
}
return new Promise<>((resolve, reject) -> {
// workerConnection.getServer().fetchInputTable(getHeadHandle(), Callbacks.of((success, fail) -> {
// if (fail == null) {
// resolve.onInvoke(new JsInputTable(this, success.getKeys(), success.getValues()));
// } else {
// reject.onInvoke(fail);
// }
// }));
throw new UnsupportedOperationException("inputTable");
});
String[] keyCols = new String[0];
String[] valueCols = new String[0];
for (int i = 0; i < getColumns().length; i++) {
if (getColumns().getAt(i).isInputTableKeyColumn()) {
keyCols[keyCols.length] = getColumns().getAt(i).getName();
} else {
valueCols[valueCols.length] = getColumns().getAt(i).getName();
}
}
return Promise.resolve(new JsInputTable(this, keyCols, valueCols));
}

@JsMethod
Expand Down Expand Up @@ -255,31 +256,22 @@ public void close() {
public String[] getAttributes() {
TableAttributesDefinition attrs = lastVisibleState().getTableDef().getAttributes();
return Stream.concat(
attrs.getAsMap().keySet().stream(),
Stream.of(attrs.getRemainingKeys())).toArray(String[]::new);
Arrays.stream(attrs.getKeys()),
attrs.getRemainingAttributeKeys().stream()).toArray(String[]::new);
}

@JsMethod
public Object getAttribute(String attributeName) {
TableAttributesDefinition attrs = lastVisibleState().getTableDef().getAttributes();
// If the value was present as something easy to serialize, return it.
String value = attrs.getAsMap().get(attributeName);
String value = attrs.getValue(attributeName);
if (value != null) {
return value;
}

// Else check to see if it was present in the remaining keys (things that werent serialized)
boolean found = false;
for (int i = 0; i < attrs.getRemainingKeys().length; i++) {
if (attrs.getRemainingKeys()[i].equals(attributeName)) {
found = true;
break;
}
}

// If not, return the value null - this shouldn't be used to detect absence of an attribute,
// use getAttributes() for that.
if (!found) {
// Else check to see if it was present in the remaining keys (things that werent serialized).
// This shouldn't be used to detect the absence of an attribute, use getAttributes() for that
if (!attrs.getRemainingAttributeKeys().contains(attributeName)) {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.console_pb.GetConsoleTypesRequest;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.console_pb.GetConsoleTypesResponse;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.console_pb.StartConsoleRequest;
import io.deephaven.web.client.api.barrage.stream.ResponseStreamWrapper;
import io.deephaven.web.client.fu.CancellablePromise;
import io.deephaven.web.client.fu.JsLog;
import io.deephaven.web.client.fu.LazyPromise;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public class Sort {
REVERSE = "REVERSE";

private static final Column REVERSE_COLUMN =
new Column(-1, -1, null, null, "", "__REVERSE_COLUMN", false, null, null);
new Column(-1, -1, null, null, "", "__REVERSE_COLUMN", false, null, null, false);

private final Column column;
private String direction;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,21 @@
import io.deephaven.javascript.proto.dhinternal.flatbuffers.Long;
import io.deephaven.javascript.proto.dhinternal.grpcweb.Grpc;
import io.deephaven.javascript.proto.dhinternal.grpcweb.grpc.Code;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.barrage.flatbuf.barrage_generated.io.deephaven.barrage.flatbuf.*;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.barrage.flatbuf.barrage_generated.io.deephaven.barrage.flatbuf.BarrageMessageType;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.barrage.flatbuf.barrage_generated.io.deephaven.barrage.flatbuf.BarrageMessageWrapper;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.barrage.flatbuf.barrage_generated.io.deephaven.barrage.flatbuf.BarrageSubscriptionOptions;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.barrage.flatbuf.barrage_generated.io.deephaven.barrage.flatbuf.BarrageSubscriptionRequest;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.barrage.flatbuf.barrage_generated.io.deephaven.barrage.flatbuf.BarrageUpdateMetadata;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.barrage.flatbuf.barrage_generated.io.deephaven.barrage.flatbuf.ColumnConversionMode;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.application_pb.FieldInfo;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.application_pb.FieldsChangeUpdate;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.application_pb.ListFieldsRequest;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.application_pb_service.ApplicationServiceClient;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.console_pb.FetchFigureRequest;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.console_pb.LogSubscriptionData;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.console_pb.LogSubscriptionRequest;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.console_pb_service.ConsoleServiceClient;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.application_pb.FieldInfo;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.application_pb.FieldsChangeUpdate;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.application_pb.ListFieldsRequest;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.inputtable_pb_service.InputTableServiceClient;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.session_pb.HandshakeRequest;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.session_pb.HandshakeResponse;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.session_pb.ReleaseRequest;
Expand All @@ -50,13 +56,16 @@
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.table_pb_service.TableServiceClient;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.ticket_pb.Ticket;
import io.deephaven.web.client.api.barrage.BarrageUtils;
import io.deephaven.web.client.api.barrage.def.InitialTableDefinition;
import io.deephaven.web.client.api.barrage.stream.BiDiStream;
import io.deephaven.web.client.api.barrage.stream.ResponseStreamWrapper;
import io.deephaven.web.client.api.batch.RequestBatcher;
import io.deephaven.web.client.api.batch.TableConfig;
import io.deephaven.web.client.api.console.JsVariableChanges;
import io.deephaven.web.client.api.console.JsVariableDefinition;
import io.deephaven.web.client.api.csv.CsvTypeParser;
import io.deephaven.web.client.api.i18n.JsTimeZone;
import io.deephaven.web.client.api.lifecycle.HasLifecycle;
import io.deephaven.web.client.api.parse.JsDataHandler;
import io.deephaven.web.client.api.state.StateCache;
import io.deephaven.web.client.api.tree.JsTreeTable;
import io.deephaven.web.client.api.widget.plot.JsFigure;
Expand All @@ -66,13 +75,7 @@
import io.deephaven.web.client.state.ClientTableState;
import io.deephaven.web.client.state.HasTableBinding;
import io.deephaven.web.client.state.TableReviver;
import io.deephaven.web.shared.data.ConnectToken;
import io.deephaven.web.shared.data.DeltaUpdates;
import io.deephaven.web.shared.data.InitialTableDefinition;
import io.deephaven.web.shared.data.LogItem;
import io.deephaven.web.shared.data.TableMapHandle;
import io.deephaven.web.shared.data.TableSnapshot;
import io.deephaven.web.shared.data.TableSubscriptionRequest;
import io.deephaven.web.shared.data.*;
import io.deephaven.web.shared.fu.JsConsumer;
import io.deephaven.web.shared.fu.JsRunnable;
import io.deephaven.web.shared.ide.VariableType;
Expand All @@ -84,9 +87,15 @@
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -161,6 +170,7 @@ private enum State {
private ApplicationServiceClient applicationServiceClient;
private FlightServiceClient flightServiceClient;
private BrowserFlightServiceClient browserFlightServiceClient;
private InputTableServiceClient inputTableServiceClient;

private final StateCache cache = new StateCache();
private final JsWeakMap<HasTableBinding, RequestBatcher> batchers = new JsWeakMap<>();
Expand Down Expand Up @@ -201,6 +211,8 @@ public WorkerConnection(QueryConnectable<?> info, Supplier<Promise<ConnectToken>
new ApplicationServiceClient(info.getServerUrl(), JsPropertyMap.of("debug", debugGrpc));
browserFlightServiceClient =
new BrowserFlightServiceClient(info.getServerUrl(), JsPropertyMap.of("debug", debugGrpc));
inputTableServiceClient =
new InputTableServiceClient(info.getServerUrl(), JsPropertyMap.of("debug", debugGrpc));

// builder.setConnectionErrorHandler(msg -> info.failureHandled(String.valueOf(msg)));

Expand Down Expand Up @@ -837,6 +849,10 @@ public FlightServiceClient flightServiceClient() {
return flightServiceClient;
}

public InputTableServiceClient inputTableServiceClient() {
return inputTableServiceClient;
}

public BrowserHeaders metadata() {
return metadata;
}
Expand All @@ -845,13 +861,13 @@ public <ReqT, RespT> BiDiStream.Factory<ReqT, RespT> streamFactory() {
return new BiDiStream.Factory<>(this::metadata, config::newTicketInt, useWebsockets);
}

public Promise<JsTable> newTable(String[] columnNames, String[] types, String[][] data, String userTimeZone,
public Promise<JsTable> newTable(String[] columnNames, String[] types, Object[][] data, String userTimeZone,
HasEventHandling failHandler) {
// Store the ref to the data using an array we can clear out, so the data is garbage collected later
// This means the table can only be created once, but that's probably what we want in this case anyway
final String[][][] dataRef = new String[][][] {data};
final Object[][][] dataRef = new Object[][][] {data};
return newState(failHandler, (c, cts, metadata) -> {
final String[][] d = dataRef[0];
final Object[][] d = dataRef[0];
if (d == null) {
c.apply("Data already released, cannot re-create table", null);
return;
Expand All @@ -862,14 +878,14 @@ public Promise<JsTable> newTable(String[] columnNames, String[] types, String[][
Builder schema = new Builder(1024);

// while we're examining columns, build the copiers for data
List<CsvTypeParser.CsvColumn> columns = new ArrayList<>();
List<JsDataHandler> columns = new ArrayList<>();

double[] fields = new double[columnNames.length];
for (int i = 0; i < columnNames.length; i++) {
String columnName = columnNames[i];
String columnType = types[i];

CsvTypeParser.CsvColumn writer = CsvTypeParser.getColumn(columnType);
JsDataHandler writer = JsDataHandler.getHandler(columnType);
columns.add(writer);

double nameOffset = schema.createString(columnName);
Expand Down Expand Up @@ -935,9 +951,13 @@ public Promise<JsTable> newTable(String[] columnNames, String[] types, String[][

// iterate each column, building buffers and fieldnodes, as well as building the actual payload
List<Uint8Array> buffers = new ArrayList<>();
List<CsvTypeParser.Node> nodes = new ArrayList<>();
List<JsDataHandler.Node> nodes = new ArrayList<>();
JsDataHandler.ParseContext context = new JsDataHandler.ParseContext();
if (userTimeZone != null) {
context.timeZone = JsTimeZone.getTimeZone(userTimeZone);
}
for (int i = 0; i < data.length; i++) {
columns.get(i).writeColumn(data[i], nodes::add, buffers::add);
columns.get(i).write(data[i], context, nodes::add, buffers::add);
}

// write down the buffers for the RecordBatch
Expand All @@ -958,7 +978,7 @@ public Promise<JsTable> newTable(String[] columnNames, String[] types, String[][

RecordBatch.startNodesVector(bodyData, nodes.size());
for (int i = nodes.size() - 1; i >= 0; i--) {
CsvTypeParser.Node node = nodes.get(i);
JsDataHandler.Node node = nodes.get(i);
FieldNode.createFieldNode(bodyData, Long.create(node.length(), 0), Long.create(node.nullCount(), 0));
}
double nodesOffset = bodyData.endVector();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package io.deephaven.web.shared.data;
package io.deephaven.web.client.api.barrage.def;

import java.io.Serializable;

public class ColumnDefinition implements Serializable {
public class ColumnDefinition {
private int columnIndex;
private String name;
private String type;
Expand All @@ -18,6 +16,8 @@ public class ColumnDefinition implements Serializable {

// Indicates that this is a style column for the row
private boolean forRow;
private boolean isInputTableKeyColumn;
private String description;

public String getName() {
return name;
Expand Down Expand Up @@ -118,4 +118,20 @@ public String getStyleColumnName() {
public void setStyleColumnName(String styleColumn) {
this.styleColumn = styleColumn;
}

public void setInputTableKeyColumn(boolean inputTableKeyColumn) {
this.isInputTableKeyColumn = inputTableKeyColumn;
}

public boolean isInputTableKeyColumn() {
return isInputTableKeyColumn;
}

public void setDescription(String description) {
this.description = description;
}

public String getDescription() {
return description;
}
}
Loading