Skip to content

Commit

Permalink
check for message prefix
Browse files Browse the repository at this point in the history
  • Loading branch information
niloc132 committed Nov 5, 2021
1 parent 2b327af commit b4040cd
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,10 @@
import io.deephaven.extensions.barrage.util.BarrageUtil;
import io.deephaven.extensions.barrage.util.GrpcUtil;
import io.deephaven.grpc_api.session.SessionState;
import io.deephaven.grpc_api.util.SchemaHelper;
import io.deephaven.proto.backplane.grpc.BatchTableRequest;
import io.deephaven.proto.backplane.grpc.CreateInputTableRequest;
import io.grpc.StatusRuntimeException;
import org.apache.arrow.flatbuf.Message;
import org.apache.arrow.flatbuf.MessageHeader;
import org.apache.arrow.flatbuf.Schema;

import javax.inject.Inject;
Expand Down Expand Up @@ -57,12 +56,8 @@ public Table create(CreateInputTableRequest request, List<SessionState.ExportObj
TableDefinition tableDefinitionFromSchema;

if (request.hasSchema()) {
Message message = Message.getRootAsMessage(request.getSchema().asReadOnlyByteBuffer());
if (message.headerType() != MessageHeader.Schema) {
throw GrpcUtil.statusRuntimeException(Code.INVALID_ARGUMENT,
"Must specify schema header in schema message");
}
tableDefinitionFromSchema = BarrageUtil.convertArrowSchema((Schema) message.header(new Schema())).tableDef;
Schema schema = SchemaHelper.flatbufSchema(request.getSchema().asReadOnlyByteBuffer());
tableDefinitionFromSchema = BarrageUtil.convertArrowSchema(schema).tableDef;
} else if (request.hasSourceTableId()) {
Table sourceTable = sourceTables.get(0).get();
tableDefinitionFromSchema = sourceTable.getDefinition();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,15 @@ public static Schema schema(ExportedTableCreationResponse response) {
* @return the flatbuf schema
*/
public static org.apache.arrow.flatbuf.Schema flatbufSchema(ExportedTableCreationResponse response) {
final ByteBuffer bb = response.getSchemaHeader().asReadOnlyByteBuffer();
return flatbufSchema(response.getSchemaHeader().asReadOnlyByteBuffer());
}

/**
* Creates a flatbuf Schema from raw bytes of a Message.
* @param bb a bytebuffer that contains a schema in a message
* @return a flatbuf schema
*/
public static org.apache.arrow.flatbuf.Schema flatbufSchema(ByteBuffer bb) {
if (bb.remaining() < MESSAGE_OFFSET) {
throw new IllegalArgumentException("Not enough bytes for Message/Schema");
}
Expand Down

0 comments on commit b4040cd

Please sign in to comment.