Skip to content

Commit

Permalink
Update JS API to support gRPC input tables (#1565)
Browse files Browse the repository at this point in the history
The JS InputTable.addRow method now shares an implementation with the CSV upload mechanism (using flight DoPut, exposed as IdeConnection.newTable), and both can parse from a variety of formats. DateTime objects are supported now by this as well.

In order to support this, the JS Table now handles attributes sent from the server. More changes are expected here, from #1484 etc.

This patch also fixes a bug where the server wouldn't correctly respond to an InputTableService call.

Fixes #1451
Fixes #1271 
Partial #1041
  • Loading branch information
niloc132 authored Nov 19, 2021
1 parent 98e0e39 commit 96f7035
Show file tree
Hide file tree
Showing 56 changed files with 3,471 additions and 1,428 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ public void addTableToInputTable(AddTableRequest request, StreamObserver<AddTabl
// actually add the tables contents
try {
mutableInputTable.add(table);
GrpcUtil.safelyExecuteLocked(responseObserver, () -> {
responseObserver.onNext(AddTableResponse.getDefaultInstance());
responseObserver.onCompleted();
});
} catch (IOException ioException) {
throw GrpcUtil.statusRuntimeException(Code.DATA_LOSS, "Error adding table to input table");
}
Expand Down Expand Up @@ -111,6 +115,10 @@ public void deleteTableFromInputTable(DeleteTableRequest request,
// actually delete the table's contents
try {
mutableInputTable.delete(tableToDelete);
GrpcUtil.safelyExecuteLocked(responseObserver, () -> {
responseObserver.onNext(DeleteTableResponse.getDefaultInstance());
responseObserver.onCompleted();
});
} catch (IOException ioException) {
throw GrpcUtil.statusRuntimeException(Code.DATA_LOSS,
"Error deleting table from inputtable");
Expand Down
4 changes: 4 additions & 0 deletions proto/raw-js-openapi/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ require("deephaven/proto/table_pb");
require("deephaven/proto/console_pb");
require("deephaven/proto/ticket_pb");
require("deephaven/proto/application_pb");
require("deephaven/proto/inputtable_pb");
require("Flight_pb")
require("BrowserFlight_pb")
var sessionService = require("deephaven/proto/session_pb_service");
var tableService = require("deephaven/proto/table_pb_service");
var consoleService = require("deephaven/proto/console_pb_service");
var applicationService = require("deephaven/proto/application_pb_service");
var inputTableService = require("deephaven/proto/inputtable_pb_service");
var browserFlightService = require("BrowserFlight_pb_service");
var flightService = require("Flight_pb_service");

Expand All @@ -33,6 +35,8 @@ var io = { deephaven: {
ticket_pb: proto.io.deephaven.proto.backplane.grpc,
application_pb: proto.io.deephaven.proto.backplane.grpc,
application_pb_service: applicationService,
inputtable_pb: proto.io.deephaven.proto.backplane.grpc,
inputtable_pb_service: inputTableService,
},
barrage: {
"flatbuf": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public class Column {
private String constituentType;

private String description;
private final boolean isInputTableKeyColumn;

@JsMethod(namespace = "dh.Column")
public static CustomColumn formatRowColor(String expression) {
Expand All @@ -44,7 +45,8 @@ public static CustomColumn createCustomColumn(String name, String expression) {
}

public Column(int jsIndex, int index, Integer formatColumnIndex, Integer styleColumnIndex, String type, String name,
boolean isPartitionColumn, Integer formatStringColumnIndex, String description) {
boolean isPartitionColumn, Integer formatStringColumnIndex, String description,
boolean inputTableKeyColumn) {
this.jsIndex = jsIndex;
this.index = index;
this.formatColumnIndex = formatColumnIndex;
Expand All @@ -54,6 +56,7 @@ public Column(int jsIndex, int index, Integer formatColumnIndex, Integer styleCo
this.isPartitionColumn = isPartitionColumn;
this.formatStringColumnIndex = formatStringColumnIndex;
this.description = description;
this.isInputTableKeyColumn = inputTableKeyColumn;
}

@JsMethod
Expand Down Expand Up @@ -133,6 +136,10 @@ public boolean getIsPartitionColumn() {
return isPartitionColumn;
}

public boolean isInputTableKeyColumn() {
return isInputTableKeyColumn;
}

@JsMethod
public Sort sort() {
return new Sort(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import elemental2.core.Global;
import elemental2.core.JsArray;
import elemental2.core.JsString;
import elemental2.dom.CustomEventInit;
import elemental2.dom.DomGlobal;
import elemental2.promise.IThenable.ThenOnFulfilledCallbackFn;
Expand All @@ -16,6 +15,8 @@
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.table_pb.SelectDistinctRequest;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.table_pb.SnapshotTableRequest;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.table_pb.runchartdownsamplerequest.ZoomRange;
import io.deephaven.web.client.api.barrage.def.ColumnDefinition;
import io.deephaven.web.client.api.barrage.def.TableAttributesDefinition;
import io.deephaven.web.client.api.batch.RequestBatcher;
import io.deephaven.web.client.api.filter.FilterCondition;
import io.deephaven.web.client.api.input.JsInputTable;
Expand Down Expand Up @@ -217,16 +218,16 @@ public Promise<JsInputTable> inputTable() {
if (!hasInputTable) {
return Js.uncheckedCast(Promise.reject("Table is not an InputTable"));
}
return new Promise<>((resolve, reject) -> {
// workerConnection.getServer().fetchInputTable(getHeadHandle(), Callbacks.of((success, fail) -> {
// if (fail == null) {
// resolve.onInvoke(new JsInputTable(this, success.getKeys(), success.getValues()));
// } else {
// reject.onInvoke(fail);
// }
// }));
throw new UnsupportedOperationException("inputTable");
});
String[] keyCols = new String[0];
String[] valueCols = new String[0];
for (int i = 0; i < getColumns().length; i++) {
if (getColumns().getAt(i).isInputTableKeyColumn()) {
keyCols[keyCols.length] = getColumns().getAt(i).getName();
} else {
valueCols[valueCols.length] = getColumns().getAt(i).getName();
}
}
return Promise.resolve(new JsInputTable(this, keyCols, valueCols));
}

@JsMethod
Expand Down Expand Up @@ -255,31 +256,22 @@ public void close() {
public String[] getAttributes() {
TableAttributesDefinition attrs = lastVisibleState().getTableDef().getAttributes();
return Stream.concat(
attrs.getAsMap().keySet().stream(),
Stream.of(attrs.getRemainingKeys())).toArray(String[]::new);
Arrays.stream(attrs.getKeys()),
attrs.getRemainingAttributeKeys().stream()).toArray(String[]::new);
}

@JsMethod
public Object getAttribute(String attributeName) {
TableAttributesDefinition attrs = lastVisibleState().getTableDef().getAttributes();
// If the value was present as something easy to serialize, return it.
String value = attrs.getAsMap().get(attributeName);
String value = attrs.getValue(attributeName);
if (value != null) {
return value;
}

// Else check to see if it was present in the remaining keys (things that werent serialized)
boolean found = false;
for (int i = 0; i < attrs.getRemainingKeys().length; i++) {
if (attrs.getRemainingKeys()[i].equals(attributeName)) {
found = true;
break;
}
}

// If not, return the value null - this shouldn't be used to detect absence of an attribute,
// use getAttributes() for that.
if (!found) {
// Else check to see if it was present in the remaining keys (things that werent serialized).
// This shouldn't be used to detect the absence of an attribute, use getAttributes() for that
if (!attrs.getRemainingAttributeKeys().contains(attributeName)) {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.console_pb.GetConsoleTypesRequest;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.console_pb.GetConsoleTypesResponse;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.console_pb.StartConsoleRequest;
import io.deephaven.web.client.api.barrage.stream.ResponseStreamWrapper;
import io.deephaven.web.client.fu.CancellablePromise;
import io.deephaven.web.client.fu.JsLog;
import io.deephaven.web.client.fu.LazyPromise;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public class Sort {
REVERSE = "REVERSE";

private static final Column REVERSE_COLUMN =
new Column(-1, -1, null, null, "", "__REVERSE_COLUMN", false, null, null);
new Column(-1, -1, null, null, "", "__REVERSE_COLUMN", false, null, null, false);

private final Column column;
private String direction;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,21 @@
import io.deephaven.javascript.proto.dhinternal.flatbuffers.Long;
import io.deephaven.javascript.proto.dhinternal.grpcweb.Grpc;
import io.deephaven.javascript.proto.dhinternal.grpcweb.grpc.Code;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.barrage.flatbuf.barrage_generated.io.deephaven.barrage.flatbuf.*;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.barrage.flatbuf.barrage_generated.io.deephaven.barrage.flatbuf.BarrageMessageType;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.barrage.flatbuf.barrage_generated.io.deephaven.barrage.flatbuf.BarrageMessageWrapper;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.barrage.flatbuf.barrage_generated.io.deephaven.barrage.flatbuf.BarrageSubscriptionOptions;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.barrage.flatbuf.barrage_generated.io.deephaven.barrage.flatbuf.BarrageSubscriptionRequest;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.barrage.flatbuf.barrage_generated.io.deephaven.barrage.flatbuf.BarrageUpdateMetadata;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.barrage.flatbuf.barrage_generated.io.deephaven.barrage.flatbuf.ColumnConversionMode;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.application_pb.FieldInfo;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.application_pb.FieldsChangeUpdate;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.application_pb.ListFieldsRequest;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.application_pb_service.ApplicationServiceClient;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.console_pb.FetchFigureRequest;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.console_pb.LogSubscriptionData;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.console_pb.LogSubscriptionRequest;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.console_pb_service.ConsoleServiceClient;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.application_pb.FieldInfo;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.application_pb.FieldsChangeUpdate;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.application_pb.ListFieldsRequest;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.inputtable_pb_service.InputTableServiceClient;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.session_pb.HandshakeRequest;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.session_pb.HandshakeResponse;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.session_pb.ReleaseRequest;
Expand All @@ -50,13 +56,16 @@
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.table_pb_service.TableServiceClient;
import io.deephaven.javascript.proto.dhinternal.io.deephaven.proto.ticket_pb.Ticket;
import io.deephaven.web.client.api.barrage.BarrageUtils;
import io.deephaven.web.client.api.barrage.def.InitialTableDefinition;
import io.deephaven.web.client.api.barrage.stream.BiDiStream;
import io.deephaven.web.client.api.barrage.stream.ResponseStreamWrapper;
import io.deephaven.web.client.api.batch.RequestBatcher;
import io.deephaven.web.client.api.batch.TableConfig;
import io.deephaven.web.client.api.console.JsVariableChanges;
import io.deephaven.web.client.api.console.JsVariableDefinition;
import io.deephaven.web.client.api.csv.CsvTypeParser;
import io.deephaven.web.client.api.i18n.JsTimeZone;
import io.deephaven.web.client.api.lifecycle.HasLifecycle;
import io.deephaven.web.client.api.parse.JsDataHandler;
import io.deephaven.web.client.api.state.StateCache;
import io.deephaven.web.client.api.tree.JsTreeTable;
import io.deephaven.web.client.api.widget.plot.JsFigure;
Expand All @@ -66,13 +75,7 @@
import io.deephaven.web.client.state.ClientTableState;
import io.deephaven.web.client.state.HasTableBinding;
import io.deephaven.web.client.state.TableReviver;
import io.deephaven.web.shared.data.ConnectToken;
import io.deephaven.web.shared.data.DeltaUpdates;
import io.deephaven.web.shared.data.InitialTableDefinition;
import io.deephaven.web.shared.data.LogItem;
import io.deephaven.web.shared.data.TableMapHandle;
import io.deephaven.web.shared.data.TableSnapshot;
import io.deephaven.web.shared.data.TableSubscriptionRequest;
import io.deephaven.web.shared.data.*;
import io.deephaven.web.shared.fu.JsConsumer;
import io.deephaven.web.shared.fu.JsRunnable;
import io.deephaven.web.shared.ide.VariableType;
Expand All @@ -84,9 +87,15 @@
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -161,6 +170,7 @@ private enum State {
private ApplicationServiceClient applicationServiceClient;
private FlightServiceClient flightServiceClient;
private BrowserFlightServiceClient browserFlightServiceClient;
private InputTableServiceClient inputTableServiceClient;

private final StateCache cache = new StateCache();
private final JsWeakMap<HasTableBinding, RequestBatcher> batchers = new JsWeakMap<>();
Expand Down Expand Up @@ -201,6 +211,8 @@ public WorkerConnection(QueryConnectable<?> info, Supplier<Promise<ConnectToken>
new ApplicationServiceClient(info.getServerUrl(), JsPropertyMap.of("debug", debugGrpc));
browserFlightServiceClient =
new BrowserFlightServiceClient(info.getServerUrl(), JsPropertyMap.of("debug", debugGrpc));
inputTableServiceClient =
new InputTableServiceClient(info.getServerUrl(), JsPropertyMap.of("debug", debugGrpc));

// builder.setConnectionErrorHandler(msg -> info.failureHandled(String.valueOf(msg)));

Expand Down Expand Up @@ -837,6 +849,10 @@ public FlightServiceClient flightServiceClient() {
return flightServiceClient;
}

public InputTableServiceClient inputTableServiceClient() {
return inputTableServiceClient;
}

public BrowserHeaders metadata() {
return metadata;
}
Expand All @@ -845,13 +861,13 @@ public <ReqT, RespT> BiDiStream.Factory<ReqT, RespT> streamFactory() {
return new BiDiStream.Factory<>(this::metadata, config::newTicketInt, useWebsockets);
}

public Promise<JsTable> newTable(String[] columnNames, String[] types, String[][] data, String userTimeZone,
public Promise<JsTable> newTable(String[] columnNames, String[] types, Object[][] data, String userTimeZone,
HasEventHandling failHandler) {
// Store the ref to the data using an array we can clear out, so the data is garbage collected later
// This means the table can only be created once, but that's probably what we want in this case anyway
final String[][][] dataRef = new String[][][] {data};
final Object[][][] dataRef = new Object[][][] {data};
return newState(failHandler, (c, cts, metadata) -> {
final String[][] d = dataRef[0];
final Object[][] d = dataRef[0];
if (d == null) {
c.apply("Data already released, cannot re-create table", null);
return;
Expand All @@ -862,14 +878,14 @@ public Promise<JsTable> newTable(String[] columnNames, String[] types, String[][
Builder schema = new Builder(1024);

// while we're examining columns, build the copiers for data
List<CsvTypeParser.CsvColumn> columns = new ArrayList<>();
List<JsDataHandler> columns = new ArrayList<>();

double[] fields = new double[columnNames.length];
for (int i = 0; i < columnNames.length; i++) {
String columnName = columnNames[i];
String columnType = types[i];

CsvTypeParser.CsvColumn writer = CsvTypeParser.getColumn(columnType);
JsDataHandler writer = JsDataHandler.getHandler(columnType);
columns.add(writer);

double nameOffset = schema.createString(columnName);
Expand Down Expand Up @@ -935,9 +951,13 @@ public Promise<JsTable> newTable(String[] columnNames, String[] types, String[][

// iterate each column, building buffers and fieldnodes, as well as building the actual payload
List<Uint8Array> buffers = new ArrayList<>();
List<CsvTypeParser.Node> nodes = new ArrayList<>();
List<JsDataHandler.Node> nodes = new ArrayList<>();
JsDataHandler.ParseContext context = new JsDataHandler.ParseContext();
if (userTimeZone != null) {
context.timeZone = JsTimeZone.getTimeZone(userTimeZone);
}
for (int i = 0; i < data.length; i++) {
columns.get(i).writeColumn(data[i], nodes::add, buffers::add);
columns.get(i).write(data[i], context, nodes::add, buffers::add);
}

// write down the buffers for the RecordBatch
Expand All @@ -958,7 +978,7 @@ public Promise<JsTable> newTable(String[] columnNames, String[] types, String[][

RecordBatch.startNodesVector(bodyData, nodes.size());
for (int i = nodes.size() - 1; i >= 0; i--) {
CsvTypeParser.Node node = nodes.get(i);
JsDataHandler.Node node = nodes.get(i);
FieldNode.createFieldNode(bodyData, Long.create(node.length(), 0), Long.create(node.nullCount(), 0));
}
double nodesOffset = bodyData.endVector();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package io.deephaven.web.shared.data;
package io.deephaven.web.client.api.barrage.def;

import java.io.Serializable;

public class ColumnDefinition implements Serializable {
public class ColumnDefinition {
private int columnIndex;
private String name;
private String type;
Expand All @@ -18,6 +16,8 @@ public class ColumnDefinition implements Serializable {

// Indicates that this is a style column for the row
private boolean forRow;
private boolean isInputTableKeyColumn;
private String description;

public String getName() {
return name;
Expand Down Expand Up @@ -118,4 +118,20 @@ public String getStyleColumnName() {
public void setStyleColumnName(String styleColumn) {
this.styleColumn = styleColumn;
}

public void setInputTableKeyColumn(boolean inputTableKeyColumn) {
this.isInputTableKeyColumn = inputTableKeyColumn;
}

public boolean isInputTableKeyColumn() {
return isInputTableKeyColumn;
}

public void setDescription(String description) {
this.description = description;
}

public String getDescription() {
return description;
}
}
Loading

0 comments on commit 96f7035

Please sign in to comment.