diff --git a/.github/workflows/arrow-flight-tests.yml b/.github/workflows/arrow-flight-tests.yml
new file mode 100644
index 0000000000000..ee77c122536e1
--- /dev/null
+++ b/.github/workflows/arrow-flight-tests.yml
@@ -0,0 +1,82 @@
+name: arrow flight tests
+
+on:
+ pull_request:
+
+env:
+ CONTINUOUS_INTEGRATION: true
+ MAVEN_OPTS: "-Xmx1024M -XX:+ExitOnOutOfMemoryError"
+ MAVEN_INSTALL_OPTS: "-Xmx2G -XX:+ExitOnOutOfMemoryError"
+ MAVEN_FAST_INSTALL: "-B -V --quiet -T 1C -DskipTests -Dair.check.skip-all --no-transfer-progress -Dmaven.javadoc.skip=true"
+ MAVEN_TEST: "-B -Dair.check.skip-all -Dmaven.javadoc.skip=true -DLogTestDurationListener.enabled=true --no-transfer-progress --fail-at-end"
+ RETRY: .github/bin/retry
+
+jobs:
+ changes:
+ runs-on: ubuntu-latest
+ permissions:
+ pull-requests: read
+ outputs:
+ codechange: ${{ steps.filter.outputs.codechange }}
+ steps:
+ - uses: dorny/paths-filter@v2
+ id: filter
+ with:
+ filters: |
+ codechange:
+ - '!presto-docs/**'
+ test:
+ runs-on: ubuntu-latest
+ needs: changes
+ strategy:
+ fail-fast: false
+ matrix:
+ modules:
+ - ":presto-base-arrow-flight" # Only run tests for the `presto-base-arrow-flight` module
+
+ timeout-minutes: 80
+ concurrency:
+ group: ${{ github.workflow }}-test-${{ matrix.modules }}-${{ github.event.pull_request.number }}
+ cancel-in-progress: true
+
+ steps:
+ # Checkout the code only if there are changes in the relevant files
+ - uses: actions/checkout@v4
+ if: needs.changes.outputs.codechange == 'true'
+ with:
+ show-progress: false
+
+ # Set up Java for the build environment
+ - uses: actions/setup-java@v2
+ if: needs.changes.outputs.codechange == 'true'
+ with:
+ distribution: 'temurin'
+ java-version: 8
+
+ # Cache Maven dependencies to speed up the build
+ - name: Cache local Maven repository
+ if: needs.changes.outputs.codechange == 'true'
+ id: cache-maven
+ uses: actions/cache@v2
+ with:
+ path: ~/.m2/repository
+ key: ${{ runner.os }}-maven-2-${{ hashFiles('**/pom.xml') }}
+ restore-keys: |
+ ${{ runner.os }}-maven-2-
+
+ # Resolve Maven dependencies (if cache is not found)
+ - name: Populate Maven cache
+ if: steps.cache-maven.outputs.cache-hit != 'true' && needs.changes.outputs.codechange == 'true'
+ run: ./mvnw de.qaware.maven:go-offline-maven-plugin:resolve-dependencies --no-transfer-progress && .github/bin/download_nodejs
+
+ # Install dependencies for the target module
+ - name: Maven Install
+ if: needs.changes.outputs.codechange == 'true'
+ run: |
+ export MAVEN_OPTS="${MAVEN_INSTALL_OPTS}"
+ ./mvnw install ${MAVEN_FAST_INSTALL} -am -pl ${{ matrix.modules }}
+
+ # Run Maven tests for the target module
+ - name: Maven Tests
+ if: needs.changes.outputs.codechange == 'true'
+ run: ./mvnw test ${MAVEN_TEST} -pl ${{ matrix.modules }}
diff --git a/presto-base-arrow-flight/pom.xml b/presto-base-arrow-flight/pom.xml
index fd9871e6b2490..b2d1ff04a964e 100644
--- a/presto-base-arrow-flight/pom.xml
+++ b/presto-base-arrow-flight/pom.xml
@@ -16,18 +16,14 @@
4.10.0
17.0.0
4.1.110.Final
+ 1.6.20
+ 2.23.0
com.facebook.airlift
bootstrap
-
-
- ch.qos.logback
- logback-core
-
-
@@ -39,14 +35,7 @@
com.google.guava
guava
-
- org.checkerframework
- checker-qual
-
-
- com.google.errorprone
- error_prone_annotations
-
+
com.google.j2objc
j2objc-annotations
@@ -99,6 +88,12 @@
org.apache.arrow
arrow-memory-core
${arrow.version}
+
+
+ org.slf4j
+ slf4j-api
+
+
@@ -115,37 +110,29 @@
org.apache.arrow
arrow-jdbc
${arrow.version}
-
-
-
- io.netty
- netty-codec-http2
- ${netty.version}
-
-
-
- io.netty
- netty-handler-proxy
- ${netty.version}
- io.netty
- netty-codec-http
+ org.slf4j
+ slf4j-api
-
- io.netty
- netty-tcnative-boringssl-static
- 2.0.65.Final
-
-
org.apache.arrow
arrow-vector
${arrow.version}
+
+ org.slf4j
+ slf4j-api
+
+
+
+ commons-codec
+ commons-codec
+
+
com.fasterxml.jackson.datatype
jackson-datatype-jsr310
@@ -188,13 +175,6 @@
test
-
- org.checkerframework
- checker-qual
- 3.4.1
- test
-
-
com.facebook.presto
presto-testng-services
@@ -225,6 +205,11 @@
flight-core
${arrow.version}
+
+ org.slf4j
+ slf4j-api
+
+
com.google.j2objc
j2objc-annotations
@@ -249,41 +234,10 @@
h2
test
-
-
- org.codehaus.mojo
- animal-sniffer-annotations
- 1.23
-
-
-
- com.google.j2objc
- j2objc-annotations
- 1.3
-
-
-
- com.google.errorprone
- error_prone_annotations
- 2.14.0
-
-
-
- com.google.protobuf
- protobuf-java
- 3.25.5
-
-
-
- commons-codec
- commons-codec
- 1.17.0
-
-
io.netty
netty-transport-native-unix-common
@@ -321,22 +275,36 @@
- org.objenesis
- objenesis
- 3.3
+ io.netty
+ netty-handler-proxy
+ ${netty.version}
+
+
+
+ io.netty
+ netty-codec-http
+ ${netty.version}
org.jetbrains.kotlin
kotlin-stdlib-common
- 1.6.20
+ ${kotlin.version}
+
+
+
+ com.google.errorprone
+ error_prone_annotations
+ ${error_prone_annotations}
- org.slf4j
- slf4j-api
- 2.0.13
+ org.apache.arrow
+ arrow-algorithm
+ ${arrow.version}
+ compile
+
@@ -352,26 +320,10 @@
org.apache.maven.plugins
maven-enforcer-plugin
-
-
-
-
- com.google.errorprone:error_prone_annotations
-
-
-
-
org.apache.maven.plugins
maven-dependency-plugin
-
-
- io.netty:netty-codec-http2
- io.netty:netty-handler-proxy
- io.netty:netty-tcnative-boringssl-static
-
-
org.basepom.maven
diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/AbstractArrowMetadata.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/AbstractArrowMetadata.java
index 0507a76749977..c991d3c4c1f5d 100644
--- a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/AbstractArrowMetadata.java
+++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/AbstractArrowMetadata.java
@@ -38,16 +38,20 @@
import com.facebook.presto.spi.ConnectorTableMetadata;
import com.facebook.presto.spi.Constraint;
import com.facebook.presto.spi.NotFoundException;
+import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.SchemaTablePrefix;
+import com.facebook.presto.spi.StandardErrorCode;
import com.facebook.presto.spi.connector.ConnectorMetadata;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
-import org.apache.arrow.flight.FlightInfo;
+import org.apache.arrow.flight.FlightDescriptor;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -70,76 +74,67 @@ public AbstractArrowMetadata(ArrowFlightConfig config, ArrowFlightClientHandler
this.clientHandler = requireNonNull(clientHandler);
}
- private ArrowColumnHandle createArrowColumnHandleForFloatingPointType(String columnName, ArrowType.FloatingPoint floatingPoint)
+ private Type getPrestoTypeForArrowFloatingPointType(ArrowType.FloatingPoint floatingPoint)
{
switch (floatingPoint.getPrecision()) {
case SINGLE:
- return new ArrowColumnHandle(columnName, RealType.REAL);
+ return RealType.REAL;
case DOUBLE:
- return new ArrowColumnHandle(columnName, DoubleType.DOUBLE);
+ return DoubleType.DOUBLE;
default:
throw new ArrowException(ARROW_FLIGHT_ERROR, "Invalid floating point precision " + floatingPoint.getPrecision());
}
}
- private ArrowColumnHandle createArrowColumnHandleForIntType(String columnName, ArrowType.Int intType)
+ private Type getPrestoTypeForArrowIntType(ArrowType.Int intType)
{
switch (intType.getBitWidth()) {
case 64:
- return new ArrowColumnHandle(columnName, BigintType.BIGINT);
+ return BigintType.BIGINT;
case 32:
- return new ArrowColumnHandle(columnName, IntegerType.INTEGER);
+ return IntegerType.INTEGER;
case 16:
- return new ArrowColumnHandle(columnName, SmallintType.SMALLINT);
+ return SmallintType.SMALLINT;
case 8:
- return new ArrowColumnHandle(columnName, TinyintType.TINYINT);
+ return TinyintType.TINYINT;
default:
throw new ArrowException(ARROW_FLIGHT_ERROR, "Invalid bit width " + intType.getBitWidth());
}
}
- private ColumnMetadata createIntColumnMetadata(String columnName, ArrowType.Int intType)
+ protected Type getPrestoTypeFromArrowField(Field field)
{
- switch (intType.getBitWidth()) {
- case 64:
- return new ColumnMetadata(columnName, BigintType.BIGINT);
- case 32:
- return new ColumnMetadata(columnName, IntegerType.INTEGER);
- case 16:
- return new ColumnMetadata(columnName, SmallintType.SMALLINT);
- case 8:
- return new ColumnMetadata(columnName, TinyintType.TINYINT);
- default:
- throw new ArrowException(ARROW_FLIGHT_ERROR, "Invalid bit width " + intType.getBitWidth());
- }
- }
-
- private ColumnMetadata createFloatingPointColumnMetadata(String columnName, ArrowType.FloatingPoint floatingPointType)
- {
- switch (floatingPointType.getPrecision()) {
- case SINGLE:
- return new ColumnMetadata(columnName, RealType.REAL);
- case DOUBLE:
- return new ColumnMetadata(columnName, DoubleType.DOUBLE);
+ switch (field.getType().getTypeID()) {
+ case Int:
+ ArrowType.Int intType = (ArrowType.Int) field.getType();
+ return getPrestoTypeForArrowIntType(intType);
+ case Binary:
+ case LargeBinary:
+ case FixedSizeBinary:
+ return VarbinaryType.VARBINARY;
+ case Date:
+ return DateType.DATE;
+ case Timestamp:
+ return TimestampType.TIMESTAMP;
+ case Utf8:
+ case LargeUtf8:
+ return VarcharType.VARCHAR;
+ case FloatingPoint:
+ ArrowType.FloatingPoint floatingPoint = (ArrowType.FloatingPoint) field.getType();
+ return getPrestoTypeForArrowFloatingPointType(floatingPoint);
+ case Decimal:
+ ArrowType.Decimal decimalType = (ArrowType.Decimal) field.getType();
+ return DecimalType.createDecimalType(decimalType.getPrecision(), decimalType.getScale());
+ case Bool:
+ return BooleanType.BOOLEAN;
+ case Time:
+ return TimeType.TIME;
default:
- throw new ArrowException(ARROW_FLIGHT_ERROR, "Invalid floating point precision " + floatingPointType.getPrecision());
+ throw new UnsupportedOperationException("The data type " + field.getType().getTypeID() + " is not supported.");
}
}
- /**
- * Provides the field type, which can be overridden by concrete implementations
- * with their own custom type.
- *
- * @return the field type
- */
- protected Type overrideFieldType(Field field, Type type)
- {
- return type;
- }
-
- protected abstract ArrowFlightRequest getArrowFlightRequest(ArrowFlightConfig config, Optional query, String schema, String table);
-
- protected abstract ArrowFlightRequest getArrowFlightRequest(ArrowFlightConfig config, String schema);
+ protected abstract FlightDescriptor getFlightDescriptor(Optional query, String schema, String table);
protected abstract String getDataSourceSpecificSchemaName(ArrowFlightConfig config, String schemaName);
@@ -163,11 +158,11 @@ public List getColumnsList(String schema, String table, ConnectorSession
try {
String dataSourceSpecificSchemaName = getDataSourceSpecificSchemaName(config, schema);
String dataSourceSpecificTableName = getDataSourceSpecificTableName(config, table);
- ArrowFlightRequest request = getArrowFlightRequest(clientHandler.getConfig(), Optional.empty(),
+ FlightDescriptor flightDescriptor = getFlightDescriptor(Optional.empty(),
dataSourceSpecificSchemaName, dataSourceSpecificTableName);
- FlightInfo flightInfo = clientHandler.getFlightInfo(request, connectorSession);
- List fields = flightInfo.getSchema().getFields();
+ Optional flightschema = clientHandler.getSchema(flightDescriptor, connectorSession);
+ List fields = flightschema.map(Schema::getFields).orElse(Collections.emptyList());
return fields;
}
catch (Exception e) {
@@ -178,7 +173,7 @@ public List getColumnsList(String schema, String table, ConnectorSession
@Override
public Map getColumnHandles(ConnectorSession session, ConnectorTableHandle tableHandle)
{
- Map column = new HashMap<>();
+ Map columnHandles = new HashMap<>();
String schemaValue = ((ArrowTableHandle) tableHandle).getSchema();
String tableValue = ((ArrowTableHandle) tableHandle).getTable();
@@ -190,56 +185,22 @@ public Map getColumnHandles(ConnectorSession session, Conn
String columnName = field.getName();
logger.debug("The value of the flight columnName is:- %s", columnName);
- ArrowColumnHandle handle;
- switch (field.getType().getTypeID()) {
- case Int:
- ArrowType.Int intType = (ArrowType.Int) field.getType();
- handle = createArrowColumnHandleForIntType(columnName, intType);
- break;
- case Binary:
- case LargeBinary:
- case FixedSizeBinary:
- handle = new ArrowColumnHandle(columnName, VarbinaryType.VARBINARY);
- break;
- case Date:
- handle = new ArrowColumnHandle(columnName, DateType.DATE);
- break;
- case Timestamp:
- handle = new ArrowColumnHandle(columnName, TimestampType.TIMESTAMP);
- break;
- case Utf8:
- case LargeUtf8:
- handle = new ArrowColumnHandle(columnName, VarcharType.VARCHAR);
- break;
- case FloatingPoint:
- ArrowType.FloatingPoint floatingPoint = (ArrowType.FloatingPoint) field.getType();
- handle = createArrowColumnHandleForFloatingPointType(columnName, floatingPoint);
- break;
- case Decimal:
- ArrowType.Decimal decimalType = (ArrowType.Decimal) field.getType();
- handle = new ArrowColumnHandle(columnName, DecimalType.createDecimalType(decimalType.getPrecision(), decimalType.getScale()));
- break;
- case Bool:
- handle = new ArrowColumnHandle(columnName, BooleanType.BOOLEAN);
- break;
- case Time:
- handle = new ArrowColumnHandle(columnName, TimeType.TIME);
- break;
- default:
- throw new UnsupportedOperationException("The data type " + field.getType().getTypeID() + " is not supported.");
- }
- Type type = overrideFieldType(field, handle.getColumnType());
- if (!type.equals(handle.getColumnType())) {
- handle = new ArrowColumnHandle(columnName, type);
- }
- column.put(columnName, handle);
+ Type type = getPrestoTypeFromArrowField(field);
+ columnHandles.put(columnName, new ArrowColumnHandle(columnName, type));
}
- return column;
+ return columnHandles;
}
@Override
public List getTableLayouts(ConnectorSession session, ConnectorTableHandle table, Constraint constraint, Optional> desiredColumns)
{
+ if (!(table instanceof ArrowTableHandle)) {
+ throw new PrestoException(
+ StandardErrorCode.INVALID_CAST_ARGUMENT,
+ "Invalid table handle: Expected an instance of ArrowTableHandle but received "
+ + table.getClass().getSimpleName() + "");
+ }
+
ArrowTableHandle tableHandle = (ArrowTableHandle) table;
List columns = new ArrayList<>();
@@ -266,53 +227,8 @@ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, Connect
for (Field field : columnList) {
String columnName = field.getName();
- ArrowType type = field.getType();
-
- ColumnMetadata columnMetadata;
-
- switch (type.getTypeID()) {
- case Int:
- ArrowType.Int intType = (ArrowType.Int) type;
- columnMetadata = createIntColumnMetadata(columnName, intType);
- break;
- case Binary:
- case LargeBinary:
- case FixedSizeBinary:
- columnMetadata = new ColumnMetadata(columnName, VarbinaryType.VARBINARY);
- break;
- case Date:
- columnMetadata = new ColumnMetadata(columnName, DateType.DATE);
- break;
- case Timestamp:
- columnMetadata = new ColumnMetadata(columnName, TimestampType.TIMESTAMP);
- break;
- case Utf8:
- case LargeUtf8:
- columnMetadata = new ColumnMetadata(columnName, VarcharType.VARCHAR);
- break;
- case FloatingPoint:
- ArrowType.FloatingPoint floatingPointType = (ArrowType.FloatingPoint) type;
- columnMetadata = createFloatingPointColumnMetadata(columnName, floatingPointType);
- break;
- case Decimal:
- ArrowType.Decimal decimalType = (ArrowType.Decimal) type;
- columnMetadata = new ColumnMetadata(columnName, DecimalType.createDecimalType(decimalType.getPrecision(), decimalType.getScale()));
- break;
- case Time:
- columnMetadata = new ColumnMetadata(columnName, TimeType.TIME);
- break;
- case Bool:
- columnMetadata = new ColumnMetadata(columnName, BooleanType.BOOLEAN);
- break;
- default:
- throw new UnsupportedOperationException("The data type " + type.getTypeID() + " is not supported.");
- }
-
- Type fieldType = overrideFieldType(field, columnMetadata.getType());
- if (!fieldType.equals(columnMetadata.getType())) {
- columnMetadata = new ColumnMetadata(columnName, fieldType);
- }
- meta.add(columnMetadata);
+ Type fieldType = getPrestoTypeFromArrowField(field);
+ meta.add(new ColumnMetadata(columnName, fieldType));
}
return new ConnectorTableMetadata(new SchemaTableName(((ArrowTableHandle) table).getSchema(), ((ArrowTableHandle) table).getTable()), meta);
}
diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/AbstractArrowSplitManager.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/AbstractArrowSplitManager.java
index 49d4fe91fc171..e92afe8b0a4b8 100644
--- a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/AbstractArrowSplitManager.java
+++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/AbstractArrowSplitManager.java
@@ -20,6 +20,7 @@
import com.facebook.presto.spi.FixedSplitSource;
import com.facebook.presto.spi.connector.ConnectorSplitManager;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
+import org.apache.arrow.flight.FlightDescriptor;
import org.apache.arrow.flight.FlightInfo;
import java.util.List;
@@ -37,17 +38,17 @@ public AbstractArrowSplitManager(ArrowFlightClientHandler client)
this.clientHandler = client;
}
- protected abstract ArrowFlightRequest getArrowFlightRequest(ArrowFlightConfig config, ArrowTableLayoutHandle tableLayoutHandle);
+ protected abstract FlightDescriptor getFlightDescriptor(ArrowFlightConfig config, ArrowTableLayoutHandle tableLayoutHandle);
@Override
public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorTableLayoutHandle layout, SplitSchedulingContext splitSchedulingContext)
{
ArrowTableLayoutHandle tableLayoutHandle = (ArrowTableLayoutHandle) layout;
ArrowTableHandle tableHandle = tableLayoutHandle.getTableHandle();
- ArrowFlightRequest request = getArrowFlightRequest(clientHandler.getConfig(),
+ FlightDescriptor flightDescriptor = getFlightDescriptor(clientHandler.getConfig(),
tableLayoutHandle);
- FlightInfo flightInfo = clientHandler.getFlightInfo(request, session);
+ FlightInfo flightInfo = clientHandler.getFlightInfo(flightDescriptor, session);
List splits = flightInfo.getEndpoints()
.stream()
.map(info -> new ArrowSplit(
diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowConnector.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowConnector.java
index 9523bd210faa5..1028af2414308 100644
--- a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowConnector.java
+++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowConnector.java
@@ -34,16 +34,20 @@ public class ArrowConnector
private final ConnectorPageSourceProvider pageSourceProvider;
private final ConnectorHandleResolver handleResolver;
+ private final ArrowFlightClientHandler arrowFlightClientHandler;
+
@Inject
public ArrowConnector(ConnectorMetadata metadata,
- ConnectorHandleResolver handleResolver,
- ConnectorSplitManager splitManager,
- ConnectorPageSourceProvider pageSourceProvider)
+ ConnectorHandleResolver handleResolver,
+ ConnectorSplitManager splitManager,
+ ConnectorPageSourceProvider pageSourceProvider,
+ ArrowFlightClientHandler arrowFlightClientHandler)
{
this.metadata = requireNonNull(metadata, "Metadata is null");
- this.handleResolver = requireNonNull(handleResolver, "Metadata is null");
+ this.handleResolver = requireNonNull(handleResolver, "handleResolver is null");
this.splitManager = requireNonNull(splitManager, "SplitManager is null");
this.pageSourceProvider = requireNonNull(pageSourceProvider, "PageSinkProvider is null");
+ this.arrowFlightClientHandler = requireNonNull(arrowFlightClientHandler, "arrow flight handler is null");
}
public Optional getHandleResolver()
@@ -74,4 +78,10 @@ public ConnectorPageSourceProvider getPageSourceProvider()
{
return pageSourceProvider;
}
+
+ @Override
+ public void shutdown()
+ {
+ arrowFlightClientHandler.closeRootallocator();
+ }
}
diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowConnectorFactory.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowConnectorFactory.java
index f17317dd42b3a..e070b2c4624d6 100644
--- a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowConnectorFactory.java
+++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowConnectorFactory.java
@@ -46,7 +46,7 @@ public class ArrowConnectorFactory
public ArrowConnectorFactory(String name, Module module, ClassLoader classLoader)
{
checkArgument(!isNullOrEmpty(name), "name is null or empty");
- this.name = name;
+ this.name = requireNonNull(name, "name is null");
this.module = requireNonNull(module, "module is null");
this.classLoader = requireNonNull(classLoader, "classLoader is null");
}
diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowFlightClient.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowFlightClient.java
index 1a9d964d7458d..3d12617a0839d 100644
--- a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowFlightClient.java
+++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowFlightClient.java
@@ -14,7 +14,6 @@
package com.facebook.plugin.arrow;
import org.apache.arrow.flight.FlightClient;
-import org.apache.arrow.memory.RootAllocator;
import java.io.IOException;
import java.io.InputStream;
@@ -27,13 +26,11 @@ public class ArrowFlightClient
{
private final FlightClient flightClient;
private final Optional trustedCertificate;
- private RootAllocator allocator;
- public ArrowFlightClient(FlightClient flightClient, Optional trustedCertificate, RootAllocator allocator)
+ public ArrowFlightClient(FlightClient flightClient, Optional trustedCertificate)
{
this.flightClient = requireNonNull(flightClient, "flightClient cannot be null");
- this.trustedCertificate = trustedCertificate;
- this.allocator = allocator;
+ this.trustedCertificate = requireNonNull(trustedCertificate, "trustedCertificate is null");
}
public FlightClient getFlightClient()
@@ -53,9 +50,5 @@ public void close() throws InterruptedException, IOException
if (trustedCertificate.isPresent()) {
trustedCertificate.get().close();
}
- if (allocator != null) {
- allocator.close();
- allocator = null;
- }
}
}
diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowFlightClientHandler.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowFlightClientHandler.java
index ba75cbffc088c..a85edf42358bb 100644
--- a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowFlightClientHandler.java
+++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowFlightClientHandler.java
@@ -21,6 +21,7 @@
import org.apache.arrow.flight.Location;
import org.apache.arrow.flight.grpc.CredentialCallOption;
import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.types.pojo.Schema;
import java.io.FileInputStream;
import java.io.InputStream;
@@ -33,6 +34,8 @@ public abstract class ArrowFlightClientHandler
private static final Logger logger = Logger.get(ArrowFlightClientHandler.class);
private final ArrowFlightConfig config;
+ private RootAllocator allocator;
+
public ArrowFlightClientHandler(ArrowFlightConfig config)
{
this.config = config;
@@ -41,7 +44,6 @@ public ArrowFlightClientHandler(ArrowFlightConfig config)
private ArrowFlightClient initializeClient(Optional uri)
{
try {
- RootAllocator allocator = new RootAllocator(Long.MAX_VALUE);
Optional trustedCertificate = Optional.empty();
Location location;
@@ -57,6 +59,10 @@ private ArrowFlightClient initializeClient(Optional uri)
}
}
+ if (null == allocator) {
+ initializeAllocator();
+ }
+
FlightClient.Builder flightClientBuilder = FlightClient.builder(allocator, location);
if (config.getVerifyServer() != null && !config.getVerifyServer()) {
flightClientBuilder.verifyServer(false);
@@ -67,26 +73,22 @@ else if (config.getFlightServerSSLCertificate() != null) {
}
FlightClient flightClient = flightClientBuilder.build();
- return new ArrowFlightClient(flightClient, trustedCertificate, allocator);
+ return new ArrowFlightClient(flightClient, trustedCertificate);
}
catch (Exception ex) {
throw new ArrowException(ARROW_FLIGHT_ERROR, "The flight client could not be obtained." + ex.getMessage(), ex);
}
}
- protected abstract CredentialCallOption getCallOptions(ConnectorSession connectorSession);
-
- /**
- * Connector implementations can override this method to get a FlightDescriptor
- * from command or path.
- * @param flightRequest
- * @return
- */
- protected FlightDescriptor getFlightDescriptor(ArrowFlightRequest flightRequest)
+ private synchronized void initializeAllocator()
{
- return FlightDescriptor.command(flightRequest.getCommand());
+ if (allocator == null) {
+ allocator = new RootAllocator(Long.MAX_VALUE);
+ }
}
+ protected abstract CredentialCallOption getCallOptions(ConnectorSession connectorSession);
+
public ArrowFlightConfig getConfig()
{
return config;
@@ -97,13 +99,12 @@ public ArrowFlightClient getClient(Optional uri)
return initializeClient(uri);
}
- public FlightInfo getFlightInfo(ArrowFlightRequest request, ConnectorSession connectorSession)
+ public FlightInfo getFlightInfo(FlightDescriptor flightDescriptor, ConnectorSession connectorSession)
{
try (ArrowFlightClient client = getClient(Optional.empty())) {
CredentialCallOption auth = this.getCallOptions(connectorSession);
- FlightDescriptor descriptor = getFlightDescriptor(request);
logger.debug("Fetching flight info");
- FlightInfo flightInfo = client.getFlightClient().getInfo(descriptor, auth);
+ FlightInfo flightInfo = client.getFlightClient().getInfo(flightDescriptor, auth);
logger.debug("got flight info");
return flightInfo;
}
@@ -111,4 +112,16 @@ public FlightInfo getFlightInfo(ArrowFlightRequest request, ConnectorSession con
throw new ArrowException(ARROW_FLIGHT_ERROR, "The flight information could not be obtained from the flight server." + e.getMessage(), e);
}
}
+
+ public Optional getSchema(FlightDescriptor flightDescriptor, ConnectorSession connectorSession)
+ {
+ return getFlightInfo(flightDescriptor, connectorSession).getSchemaOptional();
+ }
+
+ public void closeRootallocator()
+ {
+ if (null != allocator) {
+ allocator.close();
+ }
+ }
}
diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowFlightRequest.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowFlightRequest.java
deleted file mode 100644
index 7e04e0a6066e3..0000000000000
--- a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowFlightRequest.java
+++ /dev/null
@@ -1,19 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.facebook.plugin.arrow;
-
-public interface ArrowFlightRequest
-{
- byte[] getCommand();
-}
diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowPageSource.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowPageSource.java
index b83e791f2163e..ec51125a34848 100644
--- a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowPageSource.java
+++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowPageSource.java
@@ -16,54 +16,19 @@
import com.facebook.airlift.log.Logger;
import com.facebook.presto.common.Page;
import com.facebook.presto.common.block.Block;
-import com.facebook.presto.common.block.BlockBuilder;
-import com.facebook.presto.common.type.CharType;
-import com.facebook.presto.common.type.DateType;
-import com.facebook.presto.common.type.DecimalType;
-import com.facebook.presto.common.type.Decimals;
-import com.facebook.presto.common.type.TimeType;
-import com.facebook.presto.common.type.TimestampType;
import com.facebook.presto.common.type.Type;
-import com.facebook.presto.common.type.VarcharType;
import com.facebook.presto.spi.ConnectorPageSource;
import com.facebook.presto.spi.ConnectorSession;
-import com.google.common.base.CharMatcher;
-import io.airlift.slice.Slice;
-import io.airlift.slice.Slices;
import org.apache.arrow.flight.FlightRuntimeException;
import org.apache.arrow.flight.FlightStream;
import org.apache.arrow.flight.Ticket;
-import org.apache.arrow.vector.BigIntVector;
-import org.apache.arrow.vector.BitVector;
-import org.apache.arrow.vector.DateDayVector;
-import org.apache.arrow.vector.DateMilliVector;
-import org.apache.arrow.vector.DecimalVector;
import org.apache.arrow.vector.FieldVector;
-import org.apache.arrow.vector.Float4Vector;
-import org.apache.arrow.vector.Float8Vector;
-import org.apache.arrow.vector.IntVector;
-import org.apache.arrow.vector.NullVector;
-import org.apache.arrow.vector.SmallIntVector;
-import org.apache.arrow.vector.TimeMicroVector;
-import org.apache.arrow.vector.TimeMilliVector;
-import org.apache.arrow.vector.TimeSecVector;
-import org.apache.arrow.vector.TimeStampMicroVector;
-import org.apache.arrow.vector.TimeStampMilliTZVector;
-import org.apache.arrow.vector.TimeStampMilliVector;
-import org.apache.arrow.vector.TimeStampSecVector;
-import org.apache.arrow.vector.TinyIntVector;
-import org.apache.arrow.vector.VarBinaryVector;
-import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.dictionary.Dictionary;
-import java.math.BigDecimal;
-import java.nio.charset.StandardCharsets;
-import java.time.Duration;
-import java.time.LocalTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
-import java.util.concurrent.TimeUnit;
import static com.facebook.plugin.arrow.ArrowErrorCode.ARROW_FLIGHT_ERROR;
@@ -100,454 +65,6 @@ private void getFlightStream(ArrowFlightClientHandler clientHandler, byte[] tick
}
}
- private Block buildBlockFromVector(FieldVector vector, Type type)
- {
- if (vector instanceof BitVector) {
- return buildBlockFromBitVector((BitVector) vector, type);
- }
- else if (vector instanceof TinyIntVector) {
- return buildBlockFromTinyIntVector((TinyIntVector) vector, type);
- }
- else if (vector instanceof IntVector) {
- return buildBlockFromIntVector((IntVector) vector, type);
- }
- else if (vector instanceof SmallIntVector) {
- return buildBlockFromSmallIntVector((SmallIntVector) vector, type);
- }
- else if (vector instanceof BigIntVector) {
- return buildBlockFromBigIntVector((BigIntVector) vector, type);
- }
- else if (vector instanceof DecimalVector) {
- return buildBlockFromDecimalVector((DecimalVector) vector, type);
- }
- else if (vector instanceof NullVector) {
- return buildBlockFromNullVector((NullVector) vector, type);
- }
- else if (vector instanceof TimeStampMicroVector) {
- return buildBlockFromTimeStampMicroVector((TimeStampMicroVector) vector, type);
- }
- else if (vector instanceof TimeStampMilliVector) {
- return buildBlockFromTimeStampMilliVector((TimeStampMilliVector) vector, type);
- }
- else if (vector instanceof Float4Vector) {
- return buildBlockFromFloat4Vector((Float4Vector) vector, type);
- }
- else if (vector instanceof Float8Vector) {
- return buildBlockFromFloat8Vector((Float8Vector) vector, type);
- }
- else if (vector instanceof VarCharVector) {
- if (type instanceof CharType) {
- return buildCharTypeBlockFromVarcharVector((VarCharVector) vector, type);
- }
- else if (type instanceof TimeType) {
- return buildTimeTypeBlockFromVarcharVector((VarCharVector) vector, type);
- }
- else {
- return buildBlockFromVarCharVector((VarCharVector) vector, type);
- }
- }
- else if (vector instanceof VarBinaryVector) {
- return buildBlockFromVarBinaryVector((VarBinaryVector) vector, type);
- }
- else if (vector instanceof DateDayVector) {
- return buildBlockFromDateDayVector((DateDayVector) vector, type);
- }
- else if (vector instanceof DateMilliVector) {
- return buildBlockFromDateMilliVector((DateMilliVector) vector, type);
- }
- else if (vector instanceof TimeMilliVector) {
- return buildBlockFromTimeMilliVector((TimeMilliVector) vector, type);
- }
- else if (vector instanceof TimeSecVector) {
- return buildBlockFromTimeSecVector((TimeSecVector) vector, type);
- }
- else if (vector instanceof TimeStampSecVector) {
- return buildBlockFromTimeStampSecVector((TimeStampSecVector) vector, type);
- }
- else if (vector instanceof TimeMicroVector) {
- return buildBlockFromTimeMicroVector((TimeMicroVector) vector, type);
- }
- else if (vector instanceof TimeStampMilliTZVector) {
- return buildBlockFromTimeMilliTZVector((TimeStampMilliTZVector) vector, type);
- }
- throw new UnsupportedOperationException("Unsupported vector type: " + vector.getClass().getSimpleName());
- }
-
- private Block buildBlockFromTimeMilliTZVector(TimeStampMilliTZVector vector, Type type)
- {
- if (!(type instanceof TimestampType)) {
- throw new IllegalArgumentException("Type must be a TimestampType for TimeStampMilliTZVector");
- }
-
- BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
- for (int i = 0; i < vector.getValueCount(); i++) {
- if (vector.isNull(i)) {
- builder.appendNull();
- }
- else {
- long millis = vector.get(i);
- type.writeLong(builder, millis);
- }
- }
- return builder.build();
- }
-
- private Block buildBlockFromBitVector(BitVector vector, Type type)
- {
- BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
- for (int i = 0; i < vector.getValueCount(); i++) {
- if (vector.isNull(i)) {
- builder.appendNull();
- }
- else {
- type.writeBoolean(builder, vector.get(i) == 1);
- }
- }
- return builder.build();
- }
-
- private Block buildBlockFromIntVector(IntVector vector, Type type)
- {
- BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
- for (int i = 0; i < vector.getValueCount(); i++) {
- if (vector.isNull(i)) {
- builder.appendNull();
- }
- else {
- type.writeLong(builder, vector.get(i));
- }
- }
- return builder.build();
- }
-
- private Block buildBlockFromSmallIntVector(SmallIntVector vector, Type type)
- {
- BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
- for (int i = 0; i < vector.getValueCount(); i++) {
- if (vector.isNull(i)) {
- builder.appendNull();
- }
- else {
- type.writeLong(builder, vector.get(i));
- }
- }
- return builder.build();
- }
-
- private Block buildBlockFromTinyIntVector(TinyIntVector vector, Type type)
- {
- BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
- for (int i = 0; i < vector.getValueCount(); i++) {
- if (vector.isNull(i)) {
- builder.appendNull();
- }
- else {
- type.writeLong(builder, vector.get(i));
- }
- }
- return builder.build();
- }
-
- private Block buildBlockFromBigIntVector(BigIntVector vector, Type type)
- {
- BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
- for (int i = 0; i < vector.getValueCount(); i++) {
- if (vector.isNull(i)) {
- builder.appendNull();
- }
- else {
- type.writeLong(builder, vector.get(i));
- }
- }
- return builder.build();
- }
-
- private Block buildBlockFromDecimalVector(DecimalVector vector, Type type)
- {
- if (!(type instanceof DecimalType)) {
- throw new IllegalArgumentException("Type must be a DecimalType for DecimalVector");
- }
-
- DecimalType decimalType = (DecimalType) type;
- BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
-
- for (int i = 0; i < vector.getValueCount(); i++) {
- if (vector.isNull(i)) {
- builder.appendNull();
- }
- else {
- BigDecimal decimal = vector.getObject(i); // Get the BigDecimal value
- if (decimalType.isShort()) {
- builder.writeLong(decimal.unscaledValue().longValue());
- }
- else {
- Slice slice = Decimals.encodeScaledValue(decimal);
- decimalType.writeSlice(builder, slice, 0, slice.length());
- }
- }
- }
- return builder.build();
- }
-
- private Block buildBlockFromNullVector(NullVector vector, Type type)
- {
- BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
- for (int i = 0; i < vector.getValueCount(); i++) {
- builder.appendNull();
- }
- return builder.build();
- }
-
- private Block buildBlockFromTimeStampMicroVector(TimeStampMicroVector vector, Type type)
- {
- if (!(type instanceof TimestampType)) {
- throw new IllegalArgumentException("Expected TimestampType but got " + type.getClass().getName());
- }
-
- BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
- for (int i = 0; i < vector.getValueCount(); i++) {
- if (vector.isNull(i)) {
- builder.appendNull();
- }
- else {
- long micros = vector.get(i);
- long millis = TimeUnit.MICROSECONDS.toMillis(micros);
- type.writeLong(builder, millis);
- }
- }
- return builder.build();
- }
-
- private Block buildBlockFromTimeStampMilliVector(TimeStampMilliVector vector, Type type)
- {
- if (!(type instanceof TimestampType)) {
- throw new IllegalArgumentException("Expected TimestampType but got " + type.getClass().getName());
- }
-
- BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
- for (int i = 0; i < vector.getValueCount(); i++) {
- if (vector.isNull(i)) {
- builder.appendNull();
- }
- else {
- long millis = vector.get(i);
- type.writeLong(builder, millis);
- }
- }
- return builder.build();
- }
-
- private Block buildBlockFromFloat8Vector(Float8Vector vector, Type type)
- {
- BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
- for (int i = 0; i < vector.getValueCount(); i++) {
- if (vector.isNull(i)) {
- builder.appendNull();
- }
- else {
- type.writeDouble(builder, vector.get(i));
- }
- }
- return builder.build();
- }
-
- private Block buildBlockFromFloat4Vector(Float4Vector vector, Type type)
- {
- BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
- for (int i = 0; i < vector.getValueCount(); i++) {
- if (vector.isNull(i)) {
- builder.appendNull();
- }
- else {
- int intBits = Float.floatToIntBits(vector.get(i));
- type.writeLong(builder, intBits);
- }
- }
- return builder.build();
- }
-
- private Block buildBlockFromVarBinaryVector(VarBinaryVector vector, Type type)
- {
- BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
- for (int i = 0; i < vector.getValueCount(); i++) {
- if (vector.isNull(i)) {
- builder.appendNull();
- }
- else {
- byte[] value = vector.get(i);
- type.writeSlice(builder, Slices.wrappedBuffer(value));
- }
- }
- return builder.build();
- }
-
- private Block buildBlockFromVarCharVector(VarCharVector vector, Type type)
- {
- if (!(type instanceof VarcharType)) {
- throw new IllegalArgumentException("Expected VarcharType but got " + type.getClass().getName());
- }
-
- BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
- for (int i = 0; i < vector.getValueCount(); i++) {
- if (vector.isNull(i)) {
- builder.appendNull();
- }
- else {
- String value = new String(vector.get(i), StandardCharsets.UTF_8);
- type.writeSlice(builder, Slices.utf8Slice(value));
- }
- }
- return builder.build();
- }
-
- private Block buildBlockFromDateDayVector(DateDayVector vector, Type type)
- {
- if (!(type instanceof DateType)) {
- throw new IllegalArgumentException("Expected DateType but got " + type.getClass().getName());
- }
-
- BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
- for (int i = 0; i < vector.getValueCount(); i++) {
- if (vector.isNull(i)) {
- builder.appendNull();
- }
- else {
- type.writeLong(builder, vector.get(i));
- }
- }
- return builder.build();
- }
-
- private Block buildBlockFromDateMilliVector(DateMilliVector vector, Type type)
- {
- if (!(type instanceof DateType)) {
- throw new IllegalArgumentException("Expected DateType but got " + type.getClass().getName());
- }
-
- BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
- for (int i = 0; i < vector.getValueCount(); i++) {
- if (vector.isNull(i)) {
- builder.appendNull();
- }
- else {
- DateType dateType = (DateType) type;
- long days = TimeUnit.MILLISECONDS.toDays(vector.get(i));
- dateType.writeLong(builder, days);
- }
- }
- return builder.build();
- }
-
- private Block buildBlockFromTimeSecVector(TimeSecVector vector, Type type)
- {
- if (!(type instanceof TimeType)) {
- throw new IllegalArgumentException("Type must be a TimeType for TimeSecVector");
- }
-
- BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
- for (int i = 0; i < vector.getValueCount(); i++) {
- if (vector.isNull(i)) {
- builder.appendNull();
- }
- else {
- int value = vector.get(i);
- long millis = TimeUnit.SECONDS.toMillis(value);
- type.writeLong(builder, millis);
- }
- }
- return builder.build();
- }
-
- private Block buildBlockFromTimeMilliVector(TimeMilliVector vector, Type type)
- {
- if (!(type instanceof TimeType)) {
- throw new IllegalArgumentException("Type must be a TimeType for TimeSecVector");
- }
-
- BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
- for (int i = 0; i < vector.getValueCount(); i++) {
- if (vector.isNull(i)) {
- builder.appendNull();
- }
- else {
- long millis = vector.get(i);
- type.writeLong(builder, millis);
- }
- }
- return builder.build();
- }
-
- private Block buildBlockFromTimeMicroVector(TimeMicroVector vector, Type type)
- {
- if (!(type instanceof TimeType)) {
- throw new IllegalArgumentException("Type must be a TimeType for TimemicroVector");
- }
- BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
- for (int i = 0; i < vector.getValueCount(); i++) {
- if (vector.isNull(i)) {
- builder.appendNull();
- }
- else {
- long value = vector.get(i);
- long micro = TimeUnit.MICROSECONDS.toMillis(value);
- type.writeLong(builder, micro);
- }
- }
- return builder.build();
- }
-
- private Block buildBlockFromTimeStampSecVector(TimeStampSecVector vector, Type type)
- {
- if (!(type instanceof TimestampType)) {
- throw new IllegalArgumentException("Type must be a TimestampType for TimeStampSecVector");
- }
-
- BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
- for (int i = 0; i < vector.getValueCount(); i++) {
- if (vector.isNull(i)) {
- builder.appendNull();
- }
- else {
- long value = vector.get(i);
- long millis = TimeUnit.SECONDS.toMillis(value);
- type.writeLong(builder, millis);
- }
- }
- return builder.build();
- }
-
- private Block buildCharTypeBlockFromVarcharVector(VarCharVector vector, Type type)
- {
- BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
- for (int i = 0; i < vector.getValueCount(); i++) {
- if (vector.isNull(i)) {
- builder.appendNull();
- }
- else {
- String value = new String(vector.get(i), StandardCharsets.UTF_8);
- type.writeSlice(builder, Slices.utf8Slice(CharMatcher.is(' ').trimTrailingFrom(value)));
- }
- }
- return builder.build();
- }
-
- private Block buildTimeTypeBlockFromVarcharVector(VarCharVector vector, Type type)
- {
- BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
- for (int i = 0; i < vector.getValueCount(); i++) {
- if (vector.isNull(i)) {
- builder.appendNull();
- }
- else {
- String timeString = new String(vector.get(i), StandardCharsets.UTF_8);
- LocalTime time = LocalTime.parse(timeString);
- long millis = Duration.between(LocalTime.MIN, time).toMillis();
- type.writeLong(builder, millis);
- }
- }
- return builder.build();
- }
-
@Override
public long getCompletedBytes()
{
@@ -605,7 +122,13 @@ public Page getNextPage()
FieldVector vector = vectorSchemaRoot.get().getVector(columnIndex);
Type type = columnHandles.get(columnIndex).getColumnType();
- Block block = buildBlockFromVector(vector, type);
+ boolean isDictionaryBlock = vector.getField().getDictionary() != null;
+ Dictionary dictionary = null;
+ if (isDictionaryBlock) {
+ dictionary = flightStream.getDictionaryProvider().lookup(vector.getField().getDictionary().getId());
+ }
+ Block block = null != dictionary ? ArrowPageUtils.buildBlockFromVector(vector, type, dictionary.getVector(), isDictionaryBlock) :
+ ArrowPageUtils.buildBlockFromVector(vector, type, null, false);
blocks.add(block);
}
diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowPageUtils.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowPageUtils.java
new file mode 100644
index 0000000000000..04b20d00c24a8
--- /dev/null
+++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowPageUtils.java
@@ -0,0 +1,968 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.plugin.arrow;
+
+import com.facebook.presto.common.block.Block;
+import com.facebook.presto.common.block.BlockBuilder;
+import com.facebook.presto.common.block.DictionaryBlock;
+import com.facebook.presto.common.type.ArrayType;
+import com.facebook.presto.common.type.BigintType;
+import com.facebook.presto.common.type.BooleanType;
+import com.facebook.presto.common.type.CharType;
+import com.facebook.presto.common.type.DateType;
+import com.facebook.presto.common.type.DecimalType;
+import com.facebook.presto.common.type.Decimals;
+import com.facebook.presto.common.type.DoubleType;
+import com.facebook.presto.common.type.IntegerType;
+import com.facebook.presto.common.type.RealType;
+import com.facebook.presto.common.type.RowType;
+import com.facebook.presto.common.type.SmallintType;
+import com.facebook.presto.common.type.TimeType;
+import com.facebook.presto.common.type.TimestampType;
+import com.facebook.presto.common.type.TinyintType;
+import com.facebook.presto.common.type.Type;
+import com.facebook.presto.common.type.VarbinaryType;
+import com.facebook.presto.common.type.VarcharType;
+import com.google.common.base.CharMatcher;
+import io.airlift.slice.Slice;
+import io.airlift.slice.Slices;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.DateDayVector;
+import org.apache.arrow.vector.DateMilliVector;
+import org.apache.arrow.vector.DecimalVector;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.NullVector;
+import org.apache.arrow.vector.SmallIntVector;
+import org.apache.arrow.vector.TimeMicroVector;
+import org.apache.arrow.vector.TimeMilliVector;
+import org.apache.arrow.vector.TimeSecVector;
+import org.apache.arrow.vector.TimeStampMicroVector;
+import org.apache.arrow.vector.TimeStampMilliTZVector;
+import org.apache.arrow.vector.TimeStampMilliVector;
+import org.apache.arrow.vector.TimeStampSecVector;
+import org.apache.arrow.vector.TinyIntVector;
+import org.apache.arrow.vector.ValueVector;
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.complex.ListVector;
+import org.apache.arrow.vector.complex.impl.UnionListReader;
+import org.apache.arrow.vector.types.FloatingPointPrecision;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.util.JsonStringArrayList;
+
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.time.LocalTime;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static java.util.Objects.requireNonNull;
+
+public class ArrowPageUtils
+{
+ private ArrowPageUtils()
+ {
+ }
+
+ public static Block buildBlockFromVector(FieldVector vector, Type type, FieldVector dictionary, boolean isDictionaryVector)
+ {
+ if (isDictionaryVector) {
+ return buildBlockFromDictionaryVector(vector, dictionary);
+ }
+ else if (vector instanceof BitVector) {
+ return buildBlockFromBitVector((BitVector) vector, type);
+ }
+ else if (vector instanceof TinyIntVector) {
+ return buildBlockFromTinyIntVector((TinyIntVector) vector, type);
+ }
+ else if (vector instanceof IntVector) {
+ return buildBlockFromIntVector((IntVector) vector, type);
+ }
+ else if (vector instanceof SmallIntVector) {
+ return buildBlockFromSmallIntVector((SmallIntVector) vector, type);
+ }
+ else if (vector instanceof BigIntVector) {
+ return buildBlockFromBigIntVector((BigIntVector) vector, type);
+ }
+ else if (vector instanceof DecimalVector) {
+ return buildBlockFromDecimalVector((DecimalVector) vector, type);
+ }
+ else if (vector instanceof NullVector) {
+ return buildBlockFromNullVector((NullVector) vector, type);
+ }
+ else if (vector instanceof TimeStampMicroVector) {
+ return buildBlockFromTimeStampMicroVector((TimeStampMicroVector) vector, type);
+ }
+ else if (vector instanceof TimeStampMilliVector) {
+ return buildBlockFromTimeStampMilliVector((TimeStampMilliVector) vector, type);
+ }
+ else if (vector instanceof Float4Vector) {
+ return buildBlockFromFloat4Vector((Float4Vector) vector, type);
+ }
+ else if (vector instanceof Float8Vector) {
+ return buildBlockFromFloat8Vector((Float8Vector) vector, type);
+ }
+ else if (vector instanceof VarCharVector) {
+ if (type instanceof CharType) {
+ return buildCharTypeBlockFromVarcharVector((VarCharVector) vector, type);
+ }
+ else if (type instanceof TimeType) {
+ return buildTimeTypeBlockFromVarcharVector((VarCharVector) vector, type);
+ }
+ else {
+ return buildBlockFromVarCharVector((VarCharVector) vector, type);
+ }
+ }
+ else if (vector instanceof VarBinaryVector) {
+ return buildBlockFromVarBinaryVector((VarBinaryVector) vector, type);
+ }
+ else if (vector instanceof DateDayVector) {
+ return buildBlockFromDateDayVector((DateDayVector) vector, type);
+ }
+ else if (vector instanceof DateMilliVector) {
+ return buildBlockFromDateMilliVector((DateMilliVector) vector, type);
+ }
+ else if (vector instanceof TimeMilliVector) {
+ return buildBlockFromTimeMilliVector((TimeMilliVector) vector, type);
+ }
+ else if (vector instanceof TimeSecVector) {
+ return buildBlockFromTimeSecVector((TimeSecVector) vector, type);
+ }
+ else if (vector instanceof TimeStampSecVector) {
+ return buildBlockFromTimeStampSecVector((TimeStampSecVector) vector, type);
+ }
+ else if (vector instanceof TimeMicroVector) {
+ return buildBlockFromTimeMicroVector((TimeMicroVector) vector, type);
+ }
+ else if (vector instanceof TimeStampMilliTZVector) {
+ return buildBlockFromTimeMilliTZVector((TimeStampMilliTZVector) vector, type);
+ }
+ else if (vector instanceof ListVector) {
+ return buildBlockFromListVector((ListVector) vector, type);
+ }
+
+ throw new UnsupportedOperationException("Unsupported vector type: " + vector.getClass().getSimpleName());
+ }
+
+ public static Block buildBlockFromDictionaryVector(FieldVector fieldVector, FieldVector dictionaryVector)
+ {
+ // Validate inputs
+ requireNonNull(fieldVector, "encoded vector is null");
+ requireNonNull(dictionaryVector, "dictionary vector is null");
+
+ // Create a BlockBuilder for the decoded vector's data type
+ Type prestoType = getPrestoTypeFromArrowType(dictionaryVector.getField().getType());
+
+ Block dictionaryblock = null;
+ // Populate the block dynamically based on vector type
+ for (int i = 0; i < dictionaryVector.getValueCount(); i++) {
+ if (!dictionaryVector.isNull(i)) {
+ dictionaryblock = appendValueToBlock(dictionaryVector, prestoType);
+ }
+ }
+
+ return getDictionaryBlock(fieldVector, dictionaryblock);
+
+ // Create the Presto DictionaryBlock
+ }
+
+ private static DictionaryBlock getDictionaryBlock(FieldVector fieldVector, Block dictionaryblock)
+ {
+ if (fieldVector instanceof IntVector) {
+ // Get the Arrow indices vector
+ IntVector indicesVector = (IntVector) fieldVector;
+ int[] ids = new int[indicesVector.getValueCount()];
+ for (int i = 0; i < indicesVector.getValueCount(); i++) {
+ ids[i] = indicesVector.get(i);
+ }
+ return new DictionaryBlock(ids.length, dictionaryblock, ids);
+ }
+ else if (fieldVector instanceof SmallIntVector) {
+ // Get the SmallInt indices vector
+ SmallIntVector smallIntIndicesVector = (SmallIntVector) fieldVector;
+ int[] ids = new int[smallIntIndicesVector.getValueCount()];
+ for (int i = 0; i < smallIntIndicesVector.getValueCount(); i++) {
+ ids[i] = smallIntIndicesVector.get(i);
+ }
+ return new DictionaryBlock(ids.length, dictionaryblock, ids);
+ }
+ else if (fieldVector instanceof TinyIntVector) {
+ // Get the TinyInt indices vector
+ TinyIntVector tinyIntIndicesVector = (TinyIntVector) fieldVector;
+ int[] ids = new int[tinyIntIndicesVector.getValueCount()];
+ for (int i = 0; i < tinyIntIndicesVector.getValueCount(); i++) {
+ ids[i] = tinyIntIndicesVector.get(i);
+ }
+ return new DictionaryBlock(ids.length, dictionaryblock, ids);
+ }
+ else {
+ // Handle the case where the FieldVector is of an unsupported type
+ throw new IllegalArgumentException("Unsupported FieldVector type: " + fieldVector.getClass());
+ }
+ }
+
+ private static Type getPrestoTypeFromArrowType(ArrowType arrowType)
+ {
+ if (arrowType instanceof ArrowType.Utf8) {
+ return VarcharType.VARCHAR;
+ }
+ else if (arrowType instanceof ArrowType.Int) {
+ ArrowType.Int intType = (ArrowType.Int) arrowType;
+ if (intType.getBitWidth() == 8 || intType.getBitWidth() == 16 || intType.getBitWidth() == 32) {
+ return IntegerType.INTEGER;
+ }
+ else if (intType.getBitWidth() == 64) {
+ return BigintType.BIGINT;
+ }
+ }
+ else if (arrowType instanceof ArrowType.FloatingPoint) {
+ ArrowType.FloatingPoint fpType = (ArrowType.FloatingPoint) arrowType;
+ FloatingPointPrecision precision = fpType.getPrecision();
+
+ if (precision == FloatingPointPrecision.SINGLE) { // 32-bit float
+ return RealType.REAL;
+ }
+ else if (precision == FloatingPointPrecision.DOUBLE) { // 64-bit float
+ return DoubleType.DOUBLE;
+ }
+ else {
+ throw new UnsupportedOperationException("Unsupported FloatingPoint precision: " + precision);
+ }
+ }
+ else if (arrowType instanceof ArrowType.Bool) {
+ return BooleanType.BOOLEAN;
+ }
+ else if (arrowType instanceof ArrowType.Binary) {
+ return VarbinaryType.VARBINARY;
+ }
+ else if (arrowType instanceof ArrowType.Decimal) {
+ return DecimalType.createDecimalType();
+ }
+ throw new UnsupportedOperationException("Unsupported ArrowType: " + arrowType);
+ }
+
+ private static Block appendValueToBlock(ValueVector vector, Type prestoType)
+ {
+ if (vector instanceof VarCharVector) {
+ return buildBlockFromVarCharVector((VarCharVector) vector, prestoType);
+ }
+ else if (vector instanceof IntVector) {
+ return buildBlockFromIntVector((IntVector) vector, prestoType);
+ }
+ else if (vector instanceof BigIntVector) {
+ return buildBlockFromBigIntVector((BigIntVector) vector, prestoType);
+ }
+ else if (vector instanceof Float4Vector) {
+ return buildBlockFromFloat4Vector((Float4Vector) vector, prestoType);
+ }
+ else if (vector instanceof Float8Vector) {
+ return buildBlockFromFloat8Vector((Float8Vector) vector, prestoType);
+ }
+ else if (vector instanceof BitVector) {
+ return buildBlockFromBitVector((BitVector) vector, prestoType);
+ }
+ else if (vector instanceof VarBinaryVector) {
+ return buildBlockFromVarBinaryVector((VarBinaryVector) vector, prestoType);
+ }
+ else if (vector instanceof DecimalVector) {
+ return buildBlockFromDecimalVector((DecimalVector) vector, prestoType);
+ }
+ else if (vector instanceof TinyIntVector) {
+ return buildBlockFromTinyIntVector((TinyIntVector) vector, prestoType);
+ }
+ else if (vector instanceof SmallIntVector) {
+ return buildBlockFromSmallIntVector((SmallIntVector) vector, prestoType);
+ }
+ else if (vector instanceof DateDayVector) {
+ return buildBlockFromDateDayVector((DateDayVector) vector, prestoType);
+ }
+ else if (vector instanceof TimeStampMilliTZVector) {
+ return buildBlockFromTimeStampMicroVector((TimeStampMicroVector) vector, prestoType);
+ }
+ else {
+ throw new UnsupportedOperationException("Unsupported vector type: " + vector.getClass());
+ }
+ }
+
+ public static Block buildBlockFromTimeMilliTZVector(TimeStampMilliTZVector vector, Type type)
+ {
+ if (!(type instanceof TimestampType)) {
+ throw new IllegalArgumentException("Type must be a TimestampType for TimeStampMilliTZVector");
+ }
+
+ BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
+ for (int i = 0; i < vector.getValueCount(); i++) {
+ if (vector.isNull(i)) {
+ builder.appendNull();
+ }
+ else {
+ long millis = vector.get(i);
+ type.writeLong(builder, millis);
+ }
+ }
+ return builder.build();
+ }
+
+ public static Block buildBlockFromBitVector(BitVector vector, Type type)
+ {
+ BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
+ for (int i = 0; i < vector.getValueCount(); i++) {
+ if (vector.isNull(i)) {
+ builder.appendNull();
+ }
+ else {
+ type.writeBoolean(builder, vector.get(i) == 1);
+ }
+ }
+ return builder.build();
+ }
+
+ public static Block buildBlockFromIntVector(IntVector vector, Type type)
+ {
+ BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
+ for (int i = 0; i < vector.getValueCount(); i++) {
+ if (vector.isNull(i)) {
+ builder.appendNull();
+ }
+ else {
+ type.writeLong(builder, vector.get(i));
+ }
+ }
+ return builder.build();
+ }
+
+ public static Block buildBlockFromSmallIntVector(SmallIntVector vector, Type type)
+ {
+ BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
+ for (int i = 0; i < vector.getValueCount(); i++) {
+ if (vector.isNull(i)) {
+ builder.appendNull();
+ }
+ else {
+ type.writeLong(builder, vector.get(i));
+ }
+ }
+ return builder.build();
+ }
+
+ public static Block buildBlockFromTinyIntVector(TinyIntVector vector, Type type)
+ {
+ BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
+ for (int i = 0; i < vector.getValueCount(); i++) {
+ if (vector.isNull(i)) {
+ builder.appendNull();
+ }
+ else {
+ type.writeLong(builder, vector.get(i));
+ }
+ }
+ return builder.build();
+ }
+
+ public static Block buildBlockFromBigIntVector(BigIntVector vector, Type type)
+ {
+ BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
+ for (int i = 0; i < vector.getValueCount(); i++) {
+ if (vector.isNull(i)) {
+ builder.appendNull();
+ }
+ else {
+ type.writeLong(builder, vector.get(i));
+ }
+ }
+ return builder.build();
+ }
+
+ public static Block buildBlockFromDecimalVector(DecimalVector vector, Type type)
+ {
+ if (!(type instanceof DecimalType)) {
+ throw new IllegalArgumentException("Type must be a DecimalType for DecimalVector");
+ }
+
+ DecimalType decimalType = (DecimalType) type;
+ BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
+
+ for (int i = 0; i < vector.getValueCount(); i++) {
+ if (vector.isNull(i)) {
+ builder.appendNull();
+ }
+ else {
+ BigDecimal decimal = vector.getObject(i); // Get the BigDecimal value
+ if (decimalType.isShort()) {
+ builder.writeLong(decimal.unscaledValue().longValue());
+ }
+ else {
+ Slice slice = Decimals.encodeScaledValue(decimal);
+ decimalType.writeSlice(builder, slice, 0, slice.length());
+ }
+ }
+ }
+ return builder.build();
+ }
+
+ public static Block buildBlockFromNullVector(NullVector vector, Type type)
+ {
+ BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
+ for (int i = 0; i < vector.getValueCount(); i++) {
+ builder.appendNull();
+ }
+ return builder.build();
+ }
+
+ public static Block buildBlockFromTimeStampMicroVector(TimeStampMicroVector vector, Type type)
+ {
+ if (!(type instanceof TimestampType)) {
+ throw new IllegalArgumentException("Expected TimestampType but got " + type.getClass().getName());
+ }
+
+ BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
+ for (int i = 0; i < vector.getValueCount(); i++) {
+ if (vector.isNull(i)) {
+ builder.appendNull();
+ }
+ else {
+ long micros = vector.get(i);
+ long millis = TimeUnit.MICROSECONDS.toMillis(micros);
+ type.writeLong(builder, millis);
+ }
+ }
+ return builder.build();
+ }
+
+ public static Block buildBlockFromTimeStampMilliVector(TimeStampMilliVector vector, Type type)
+ {
+ if (!(type instanceof TimestampType)) {
+ throw new IllegalArgumentException("Expected TimestampType but got " + type.getClass().getName());
+ }
+
+ BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
+ for (int i = 0; i < vector.getValueCount(); i++) {
+ if (vector.isNull(i)) {
+ builder.appendNull();
+ }
+ else {
+ long millis = vector.get(i);
+ type.writeLong(builder, millis);
+ }
+ }
+ return builder.build();
+ }
+
+ public static Block buildBlockFromFloat8Vector(Float8Vector vector, Type type)
+ {
+ BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
+ for (int i = 0; i < vector.getValueCount(); i++) {
+ if (vector.isNull(i)) {
+ builder.appendNull();
+ }
+ else {
+ type.writeDouble(builder, vector.get(i));
+ }
+ }
+ return builder.build();
+ }
+
+ public static Block buildBlockFromFloat4Vector(Float4Vector vector, Type type)
+ {
+ BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
+ for (int i = 0; i < vector.getValueCount(); i++) {
+ if (vector.isNull(i)) {
+ builder.appendNull();
+ }
+ else {
+ int intBits = Float.floatToIntBits(vector.get(i));
+ type.writeLong(builder, intBits);
+ }
+ }
+ return builder.build();
+ }
+
+ public static Block buildBlockFromVarBinaryVector(VarBinaryVector vector, Type type)
+ {
+ BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
+ for (int i = 0; i < vector.getValueCount(); i++) {
+ if (vector.isNull(i)) {
+ builder.appendNull();
+ }
+ else {
+ byte[] value = vector.get(i);
+ type.writeSlice(builder, Slices.wrappedBuffer(value));
+ }
+ }
+ return builder.build();
+ }
+
+ public static Block buildBlockFromVarCharVector(VarCharVector vector, Type type)
+ {
+ if (!(type instanceof VarcharType)) {
+ throw new IllegalArgumentException("Expected VarcharType but got " + type.getClass().getName());
+ }
+
+ BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
+ for (int i = 0; i < vector.getValueCount(); i++) {
+ if (vector.isNull(i)) {
+ builder.appendNull();
+ }
+ else {
+ String value = new String(vector.get(i), StandardCharsets.UTF_8);
+ type.writeSlice(builder, Slices.utf8Slice(value));
+ }
+ }
+ return builder.build();
+ }
+
+ public static Block buildBlockFromDateDayVector(DateDayVector vector, Type type)
+ {
+ if (!(type instanceof DateType)) {
+ throw new IllegalArgumentException("Expected DateType but got " + type.getClass().getName());
+ }
+
+ BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
+ for (int i = 0; i < vector.getValueCount(); i++) {
+ if (vector.isNull(i)) {
+ builder.appendNull();
+ }
+ else {
+ type.writeLong(builder, vector.get(i));
+ }
+ }
+ return builder.build();
+ }
+
+ public static Block buildBlockFromDateMilliVector(DateMilliVector vector, Type type)
+ {
+ if (!(type instanceof DateType)) {
+ throw new IllegalArgumentException("Expected DateType but got " + type.getClass().getName());
+ }
+
+ BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
+ for (int i = 0; i < vector.getValueCount(); i++) {
+ if (vector.isNull(i)) {
+ builder.appendNull();
+ }
+ else {
+ DateType dateType = (DateType) type;
+ long days = TimeUnit.MILLISECONDS.toDays(vector.get(i));
+ dateType.writeLong(builder, days);
+ }
+ }
+ return builder.build();
+ }
+
+ public static Block buildBlockFromTimeSecVector(TimeSecVector vector, Type type)
+ {
+ if (!(type instanceof TimeType)) {
+ throw new IllegalArgumentException("Type must be a TimeType for TimeSecVector");
+ }
+
+ BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
+ for (int i = 0; i < vector.getValueCount(); i++) {
+ if (vector.isNull(i)) {
+ builder.appendNull();
+ }
+ else {
+ int value = vector.get(i);
+ long millis = TimeUnit.SECONDS.toMillis(value);
+ type.writeLong(builder, millis);
+ }
+ }
+ return builder.build();
+ }
+
+ public static Block buildBlockFromTimeMilliVector(TimeMilliVector vector, Type type)
+ {
+ if (!(type instanceof TimeType)) {
+ throw new IllegalArgumentException("Type must be a TimeType for TimeSecVector");
+ }
+
+ BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
+ for (int i = 0; i < vector.getValueCount(); i++) {
+ if (vector.isNull(i)) {
+ builder.appendNull();
+ }
+ else {
+ long millis = vector.get(i);
+ type.writeLong(builder, millis);
+ }
+ }
+ return builder.build();
+ }
+
+ public static Block buildBlockFromTimeMicroVector(TimeMicroVector vector, Type type)
+ {
+ if (!(type instanceof TimeType)) {
+ throw new IllegalArgumentException("Type must be a TimeType for TimemicroVector");
+ }
+ BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
+ for (int i = 0; i < vector.getValueCount(); i++) {
+ if (vector.isNull(i)) {
+ builder.appendNull();
+ }
+ else {
+ long value = vector.get(i);
+ long micro = TimeUnit.MICROSECONDS.toMillis(value);
+ type.writeLong(builder, micro);
+ }
+ }
+ return builder.build();
+ }
+
+ public static Block buildBlockFromTimeStampSecVector(TimeStampSecVector vector, Type type)
+ {
+ if (!(type instanceof TimestampType)) {
+ throw new IllegalArgumentException("Type must be a TimestampType for TimeStampSecVector");
+ }
+
+ BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
+ for (int i = 0; i < vector.getValueCount(); i++) {
+ if (vector.isNull(i)) {
+ builder.appendNull();
+ }
+ else {
+ long value = vector.get(i);
+ long millis = TimeUnit.SECONDS.toMillis(value);
+ type.writeLong(builder, millis);
+ }
+ }
+ return builder.build();
+ }
+
+ public static Block buildCharTypeBlockFromVarcharVector(VarCharVector vector, Type type)
+ {
+ BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
+ for (int i = 0; i < vector.getValueCount(); i++) {
+ if (vector.isNull(i)) {
+ builder.appendNull();
+ }
+ else {
+ String value = new String(vector.get(i), StandardCharsets.UTF_8);
+ type.writeSlice(builder, Slices.utf8Slice(CharMatcher.is(' ').trimTrailingFrom(value)));
+ }
+ }
+ return builder.build();
+ }
+
+ public static Block buildTimeTypeBlockFromVarcharVector(VarCharVector vector, Type type)
+ {
+ BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
+ for (int i = 0; i < vector.getValueCount(); i++) {
+ if (vector.isNull(i)) {
+ builder.appendNull();
+ }
+ else {
+ String timeString = new String(vector.get(i), StandardCharsets.UTF_8);
+ LocalTime time = LocalTime.parse(timeString);
+ long millis = Duration.between(LocalTime.MIN, time).toMillis();
+ type.writeLong(builder, millis);
+ }
+ }
+ return builder.build();
+ }
+
+ public static Block buildBlockFromListVector(ListVector vector, Type type)
+ {
+ if (!(type instanceof ArrayType)) {
+ throw new IllegalArgumentException("Type must be an ArrayType for ListVector");
+ }
+
+ ArrayType arrayType = (ArrayType) type;
+ Type elementType = arrayType.getElementType();
+ BlockBuilder arrayBuilder = type.createBlockBuilder(null, vector.getValueCount());
+
+ for (int i = 0; i < vector.getValueCount(); i++) {
+ if (vector.isNull(i)) {
+ arrayBuilder.appendNull();
+ }
+ else {
+ BlockBuilder elementBuilder = arrayBuilder.beginBlockEntry();
+ UnionListReader reader = vector.getReader();
+ reader.setPosition(i);
+
+ while (reader.next()) {
+ Object value = reader.readObject();
+ if (value == null) {
+ elementBuilder.appendNull();
+ }
+ else {
+ appendValueToBuilder(elementType, elementBuilder, value);
+ }
+ }
+ arrayBuilder.closeEntry();
+ }
+ }
+ return arrayBuilder.build();
+ }
+
+ public static void appendValueToBuilder(Type type, BlockBuilder builder, Object value)
+ {
+ if (value == null) {
+ builder.appendNull();
+ return;
+ }
+
+ if (type instanceof VarcharType) {
+ writeVarcharType(type, builder, value);
+ }
+ else if (type instanceof SmallintType) {
+ writeSmallintType(type, builder, value);
+ }
+ else if (type instanceof TinyintType) {
+ writeTinyintType(type, builder, value);
+ }
+ else if (type instanceof BigintType) {
+ writeBigintType(type, builder, value);
+ }
+ else if (type instanceof IntegerType) {
+ writeIntegerType(type, builder, value);
+ }
+ else if (type instanceof DoubleType) {
+ writeDoubleType(type, builder, value);
+ }
+ else if (type instanceof BooleanType) {
+ writeBooleanType(type, builder, value);
+ }
+ else if (type instanceof DecimalType) {
+ writeDecimalType((DecimalType) type, builder, value);
+ }
+ else if (type instanceof ArrayType) {
+ writeArrayType((ArrayType) type, builder, value);
+ }
+ else if (type instanceof RowType) {
+ writeRowType((RowType) type, builder, value);
+ }
+ else if (type instanceof DateType) {
+ writeDateType(type, builder, value);
+ }
+ else if (type instanceof TimestampType) {
+ writeTimestampType(type, builder, value);
+ }
+ else {
+ throw new IllegalArgumentException("Unsupported type: " + type);
+ }
+ }
+
+ public static void writeVarcharType(Type type, BlockBuilder builder, Object value)
+ {
+ Slice slice = Slices.utf8Slice(value.toString());
+ type.writeSlice(builder, slice);
+ }
+
+ public static void writeSmallintType(Type type, BlockBuilder builder, Object value)
+ {
+ if (value instanceof Number) {
+ type.writeLong(builder, ((Number) value).shortValue());
+ }
+ else if (value instanceof JsonStringArrayList) {
+ for (Object obj : (JsonStringArrayList) value) {
+ try {
+ short shortValue = Short.parseShort(obj.toString());
+ type.writeLong(builder, shortValue);
+ }
+ catch (NumberFormatException e) {
+ throw new IllegalArgumentException("Invalid number format in JsonStringArrayList for SmallintType: " + obj, e);
+ }
+ }
+ }
+ else {
+ throw new IllegalArgumentException("Unsupported type for SmallintType: " + value.getClass());
+ }
+ }
+
+ public static void writeTinyintType(Type type, BlockBuilder builder, Object value)
+ {
+ if (value instanceof Number) {
+ type.writeLong(builder, ((Number) value).byteValue());
+ }
+ else if (value instanceof JsonStringArrayList) {
+ for (Object obj : (JsonStringArrayList) value) {
+ try {
+ byte byteValue = Byte.parseByte(obj.toString());
+ type.writeLong(builder, byteValue);
+ }
+ catch (NumberFormatException e) {
+ throw new IllegalArgumentException("Invalid number format in JsonStringArrayList for TinyintType: " + obj, e);
+ }
+ }
+ }
+ else {
+ throw new IllegalArgumentException("Unsupported type for TinyintType: " + value.getClass());
+ }
+ }
+
+ public static void writeBigintType(Type type, BlockBuilder builder, Object value)
+ {
+ if (value instanceof Long) {
+ type.writeLong(builder, (Long) value);
+ }
+ else if (value instanceof Integer) {
+ type.writeLong(builder, ((Integer) value).longValue());
+ }
+ else if (value instanceof JsonStringArrayList) {
+ for (Object obj : (JsonStringArrayList) value) {
+ try {
+ long longValue = Long.parseLong(obj.toString());
+ type.writeLong(builder, longValue);
+ }
+ catch (NumberFormatException e) {
+ throw new IllegalArgumentException("Invalid number format in JsonStringArrayList: " + obj, e);
+ }
+ }
+ }
+ else {
+ throw new IllegalArgumentException("Unsupported type for BigintType: " + value.getClass());
+ }
+ }
+
+ public static void writeIntegerType(Type type, BlockBuilder builder, Object value)
+ {
+ if (value instanceof Integer) {
+ type.writeLong(builder, (Integer) value);
+ }
+ else if (value instanceof JsonStringArrayList) {
+ for (Object obj : (JsonStringArrayList) value) {
+ try {
+ int intValue = Integer.parseInt(obj.toString());
+ type.writeLong(builder, intValue);
+ }
+ catch (NumberFormatException e) {
+ throw new IllegalArgumentException("Invalid number format in JsonStringArrayList: " + obj, e);
+ }
+ }
+ }
+ else {
+ throw new IllegalArgumentException("Unsupported type for IntegerType: " + value.getClass());
+ }
+ }
+
+ public static void writeDoubleType(Type type, BlockBuilder builder, Object value)
+ {
+ if (value instanceof Double) {
+ type.writeDouble(builder, (Double) value);
+ }
+ else if (value instanceof Float) {
+ type.writeDouble(builder, ((Float) value).doubleValue());
+ }
+ else if (value instanceof JsonStringArrayList) {
+ for (Object obj : (JsonStringArrayList) value) {
+ try {
+ double doubleValue = Double.parseDouble(obj.toString());
+ type.writeDouble(builder, doubleValue);
+ }
+ catch (NumberFormatException e) {
+ throw new IllegalArgumentException("Invalid number format in JsonStringArrayList: " + obj, e);
+ }
+ }
+ }
+ else {
+ throw new IllegalArgumentException("Unsupported type for DoubleType: " + value.getClass());
+ }
+ }
+
+ public static void writeBooleanType(Type type, BlockBuilder builder, Object value)
+ {
+ if (value instanceof Boolean) {
+ type.writeBoolean(builder, (Boolean) value);
+ }
+ else {
+ throw new IllegalArgumentException("Unsupported type for BooleanType: " + value.getClass());
+ }
+ }
+
+ public static void writeDecimalType(DecimalType type, BlockBuilder builder, Object value)
+ {
+ if (value instanceof BigDecimal) {
+ BigDecimal decimalValue = (BigDecimal) value;
+ if (type.isShort()) {
+ // write ShortDecimalType
+ long unscaledValue = decimalValue.unscaledValue().longValue();
+ type.writeLong(builder, unscaledValue);
+ }
+ else {
+ // write LongDecimalType
+ Slice slice = Decimals.encodeScaledValue(decimalValue);
+ type.writeSlice(builder, slice);
+ }
+ }
+ else if (value instanceof Long) {
+ // Direct handling for ShortDecimalType using long
+ if (type.isShort()) {
+ type.writeLong(builder, (Long) value);
+ }
+ else {
+ throw new IllegalArgumentException("Long value is not supported for LongDecimalType.");
+ }
+ }
+ else {
+ throw new IllegalArgumentException("Unsupported type for DecimalType: " + value.getClass());
+ }
+ }
+
+ public static void writeArrayType(ArrayType type, BlockBuilder builder, Object value)
+ {
+ Type elementType = type.getElementType();
+ BlockBuilder arrayBuilder = builder.beginBlockEntry();
+ for (Object element : (Iterable>) value) {
+ appendValueToBuilder(elementType, arrayBuilder, element);
+ }
+ builder.closeEntry();
+ }
+
+ public static void writeRowType(RowType type, BlockBuilder builder, Object value)
+ {
+ List