Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Arrow connector reviewcomment fixes #28

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
d6c9e0d
Arrow CI job
lithinwxd Nov 18, 2024
2b758d8
added verison in property file
lithinwxd Nov 18, 2024
84d4d89
testing after removing hyphen
lithinwxd Nov 18, 2024
153312e
remove path to make sure CI is run during every build
lithinwxd Nov 18, 2024
a597913
changed yaml to run CI on evry pull requests and every update except …
lithinwxd Nov 18, 2024
d466a13
review comment fixes
lithinwxd Nov 20, 2024
ada5038
Use flight descriptor instead of ArrowFlightRequest
elbinpallimalilibm Nov 21, 2024
a5d11c4
Use flight descriptor instead of ArrowFlightRequest
elbinpallimalilibm Nov 21, 2024
d3b8063
Arrow page utils changes
lithinwxd Nov 22, 2024
72e2f07
Use flight descriptor instead of ArrowFlightRequest
elbinpallimalilibm Nov 21, 2024
780f0a3
Arrow page utils changes - fixed checkstyle issues
lithinwxd Nov 22, 2024
a2cb5c5
Review comment fixes - Root allocator and typos
lithinwxd Nov 22, 2024
bc25a2e
Remove config getter from flight client handler
elbinpallimalilibm Nov 22, 2024
40776ec
Merge pull request #30 from sabbasani/arrow-connector-workspace-elbin
elbinpallimalilibm Nov 22, 2024
0b06577
Arrow CI job
lithinwxd Nov 18, 2024
bcc101b
added verison in property file
lithinwxd Nov 18, 2024
9615f4c
testing after removing hyphen
lithinwxd Nov 18, 2024
e8250f0
remove path to make sure CI is run during every build
lithinwxd Nov 18, 2024
064b047
changed yaml to run CI on evry pull requests and every update except …
lithinwxd Nov 18, 2024
99d6a8f
review comment fixes
lithinwxd Nov 20, 2024
45f069a
Arrow page utils changes
lithinwxd Nov 22, 2024
ef7b73a
Arrow page utils changes - fixed checkstyle issues
lithinwxd Nov 22, 2024
4986319
Review comment fixes - Root allocator and typos
lithinwxd Nov 22, 2024
82b64d5
Review comment fixes
lithinwxd Nov 22, 2024
40e641c
Merge remote-tracking branch 'origin/arrow-connector-reviewcommentFix…
lithinwxd Nov 22, 2024
d9db76e
Review comment fixes
lithinwxd Nov 22, 2024
5e99355
Review comment fixes - Changed config
lithinwxd Nov 26, 2024
a4ba35c
Added support for small int tiny int date and timestamp
lithinwxd Nov 26, 2024
7170a7f
Review comment fixes - Dictionary encoding and other tests
lithinwxd Nov 27, 2024
2a1071e
Review comment fixes - Added Tests as per comments
lithinwxd Nov 28, 2024
c05c392
Removed license header unwanted place
lithinwxd Nov 28, 2024
2660322
Removed duplicate CI job
lithinwxd Nov 29, 2024
03f1bcd
Review comment fixes
lithinwxd Nov 29, 2024
fa04dde
Added more testcases
lithinwxd Nov 29, 2024
a32e967
Fixed review comments and added support for other datatypes
lithinwxd Nov 29, 2024
d42cf78
Minor fixes on method argument
lithinwxd Nov 29, 2024
a27652a
Fixed review comments - cosmetic changes and unneccessary class removal
lithinwxd Nov 29, 2024
4fb97d7
Arrow - DictionaryEncoding usecase
lithinwxd Dec 6, 2024
0138aa7
Removed description which looks like generated
lithinwxd Dec 6, 2024
b13eeb3
Added support for dictionary encoding
lithinwxd Dec 6, 2024
e2463e7
Fixed review comments
lithinwxd Dec 9, 2024
d391906
Fixed review comments -added index type
lithinwxd Dec 9, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 82 additions & 0 deletions .github/workflows/arrow-flight-tests.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
name: arrow flight tests

on:
pull_request:

env:
CONTINUOUS_INTEGRATION: true
MAVEN_OPTS: "-Xmx1024M -XX:+ExitOnOutOfMemoryError"
MAVEN_INSTALL_OPTS: "-Xmx2G -XX:+ExitOnOutOfMemoryError"
MAVEN_FAST_INSTALL: "-B -V --quiet -T 1C -DskipTests -Dair.check.skip-all --no-transfer-progress -Dmaven.javadoc.skip=true"
MAVEN_TEST: "-B -Dair.check.skip-all -Dmaven.javadoc.skip=true -DLogTestDurationListener.enabled=true --no-transfer-progress --fail-at-end"
RETRY: .github/bin/retry

jobs:
changes:
runs-on: ubuntu-latest
permissions:
pull-requests: read
outputs:
codechange: ${{ steps.filter.outputs.codechange }}
steps:
- uses: dorny/paths-filter@v2
id: filter
with:
filters: |
codechange:
- '!presto-docs/**'
test:
runs-on: ubuntu-latest
needs: changes
strategy:
fail-fast: false
matrix:
modules:
- ":presto-base-arrow-flight" # Only run tests for the `presto-base-arrow-flight` module

timeout-minutes: 80
concurrency:
group: ${{ github.workflow }}-test-${{ matrix.modules }}-${{ github.event.pull_request.number }}
cancel-in-progress: true

steps:
# Checkout the code only if there are changes in the relevant files
- uses: actions/checkout@v4
if: needs.changes.outputs.codechange == 'true'
with:
show-progress: false

# Set up Java for the build environment
- uses: actions/setup-java@v2
if: needs.changes.outputs.codechange == 'true'
with:
distribution: 'temurin'
java-version: 8

# Cache Maven dependencies to speed up the build
- name: Cache local Maven repository
if: needs.changes.outputs.codechange == 'true'
id: cache-maven
uses: actions/cache@v2
with:
path: ~/.m2/repository
key: ${{ runner.os }}-maven-2-${{ hashFiles('**/pom.xml') }}
restore-keys: |
${{ runner.os }}-maven-2-

# Resolve Maven dependencies (if cache is not found)
- name: Populate Maven cache
if: steps.cache-maven.outputs.cache-hit != 'true' && needs.changes.outputs.codechange == 'true'
run: ./mvnw de.qaware.maven:go-offline-maven-plugin:resolve-dependencies --no-transfer-progress && .github/bin/download_nodejs

# Install dependencies for the target module
- name: Maven Install
if: needs.changes.outputs.codechange == 'true'
run: |
export MAVEN_OPTS="${MAVEN_INSTALL_OPTS}"
./mvnw install ${MAVEN_FAST_INSTALL} -am -pl ${{ matrix.modules }}

# Run Maven tests for the target module
- name: Maven Tests
if: needs.changes.outputs.codechange == 'true'
run: ./mvnw test ${MAVEN_TEST} -pl ${{ matrix.modules }}
15 changes: 12 additions & 3 deletions presto-base-arrow-flight/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
<dep.okhttp.version>4.10.0</dep.okhttp.version>
<arrow.version>17.0.0</arrow.version>
<netty.version>4.1.110.Final</netty.version>
<kotlin.version>1.6.20</kotlin.version>
<error_prone_annotations>2.23.0</error_prone_annotations>
</properties>

<dependencies>
Expand Down Expand Up @@ -232,7 +234,6 @@
<artifactId>h2</artifactId>
<scope>test</scope>
</dependency>

</dependencies>

<dependencyManagement>
Expand Down Expand Up @@ -288,14 +289,22 @@
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-stdlib-common</artifactId>
<version>1.6.20</version>
<version>${kotlin.version}</version>
</dependency>

<dependency>
<groupId>com.google.errorprone</groupId>
<artifactId>error_prone_annotations</artifactId>
<version>2.23.0</version>
<version>${error_prone_annotations}</version>
</dependency>

<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-algorithm</artifactId>
<version>${arrow.version}</version>
<scope>compile</scope>
</dependency>

</dependencies>
</dependencyManagement>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,20 @@
import com.facebook.presto.spi.ConnectorTableMetadata;
import com.facebook.presto.spi.Constraint;
import com.facebook.presto.spi.NotFoundException;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.SchemaTablePrefix;
import com.facebook.presto.spi.StandardErrorCode;
import com.facebook.presto.spi.connector.ConnectorMetadata;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.arrow.flight.FlightInfo;
import org.apache.arrow.flight.FlightDescriptor;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -130,9 +134,7 @@ protected Type getPrestoTypeFromArrowField(Field field)
}
}

protected abstract ArrowFlightRequest getArrowFlightRequest(ArrowFlightConfig config, Optional<String> query, String schema, String table);

protected abstract ArrowFlightRequest getArrowFlightRequest(ArrowFlightConfig config, String schema);
protected abstract FlightDescriptor getFlightDescriptor(Optional<String> query, String schema, String table);

protected abstract String getDataSourceSpecificSchemaName(ArrowFlightConfig config, String schemaName);

Expand All @@ -156,11 +158,11 @@ public List<Field> getColumnsList(String schema, String table, ConnectorSession
try {
String dataSourceSpecificSchemaName = getDataSourceSpecificSchemaName(config, schema);
String dataSourceSpecificTableName = getDataSourceSpecificTableName(config, table);
ArrowFlightRequest request = getArrowFlightRequest(clientHandler.getConfig(), Optional.empty(),
FlightDescriptor flightDescriptor = getFlightDescriptor(Optional.empty(),
dataSourceSpecificSchemaName, dataSourceSpecificTableName);

FlightInfo flightInfo = clientHandler.getFlightInfo(request, connectorSession);
List<Field> fields = flightInfo.getSchema().getFields();
Optional<Schema> flightschema = clientHandler.getSchema(flightDescriptor, connectorSession);
List<Field> fields = flightschema.map(Schema::getFields).orElse(Collections.emptyList());
return fields;
}
catch (Exception e) {
Expand All @@ -171,7 +173,7 @@ public List<Field> getColumnsList(String schema, String table, ConnectorSession
@Override
public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, ConnectorTableHandle tableHandle)
{
Map<String, ColumnHandle> columns = new HashMap<>();
Map<String, ColumnHandle> columnHandles = new HashMap<>();

String schemaValue = ((ArrowTableHandle) tableHandle).getSchema();
String tableValue = ((ArrowTableHandle) tableHandle).getTable();
Expand All @@ -184,14 +186,21 @@ public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, Conn
logger.debug("The value of the flight columnName is:- %s", columnName);

Type type = getPrestoTypeFromArrowField(field);
columns.put(columnName, new ArrowColumnHandle(columnName, type));
columnHandles.put(columnName, new ArrowColumnHandle(columnName, type));
}
return columns;
return columnHandles;
}

@Override
public List<ConnectorTableLayoutResult> getTableLayouts(ConnectorSession session, ConnectorTableHandle table, Constraint<ColumnHandle> constraint, Optional<Set<ColumnHandle>> desiredColumns)
{
if (!(table instanceof ArrowTableHandle)) {
throw new PrestoException(
StandardErrorCode.INVALID_CAST_ARGUMENT,
"Invalid table handle: Expected an instance of ArrowTableHandle but received "
+ table.getClass().getSimpleName() + "");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need to append empty string

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

}

ArrowTableHandle tableHandle = (ArrowTableHandle) table;

List<ArrowColumnHandle> columns = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.facebook.presto.spi.FixedSplitSource;
import com.facebook.presto.spi.connector.ConnectorSplitManager;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
import org.apache.arrow.flight.FlightDescriptor;
import org.apache.arrow.flight.FlightInfo;

import java.util.List;
Expand All @@ -37,17 +38,17 @@ public AbstractArrowSplitManager(ArrowFlightClientHandler client)
this.clientHandler = client;
}

protected abstract ArrowFlightRequest getArrowFlightRequest(ArrowFlightConfig config, ArrowTableLayoutHandle tableLayoutHandle);
protected abstract FlightDescriptor getFlightDescriptor(ArrowFlightConfig config, ArrowTableLayoutHandle tableLayoutHandle);

@Override
public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorTableLayoutHandle layout, SplitSchedulingContext splitSchedulingContext)
{
ArrowTableLayoutHandle tableLayoutHandle = (ArrowTableLayoutHandle) layout;
ArrowTableHandle tableHandle = tableLayoutHandle.getTableHandle();
ArrowFlightRequest request = getArrowFlightRequest(clientHandler.getConfig(),
FlightDescriptor flightDescriptor = getFlightDescriptor(clientHandler.getConfig(),
tableLayoutHandle);

FlightInfo flightInfo = clientHandler.getFlightInfo(request, session);
FlightInfo flightInfo = clientHandler.getFlightInfo(flightDescriptor, session);
List<ArrowSplit> splits = flightInfo.getEndpoints()
.stream()
.map(info -> new ArrowSplit(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,20 @@ public class ArrowConnector
private final ConnectorPageSourceProvider pageSourceProvider;
private final ConnectorHandleResolver handleResolver;

private final ArrowFlightClientHandler arrowFlightClientHandler;

@Inject
public ArrowConnector(ConnectorMetadata metadata,
ConnectorHandleResolver handleResolver,
ConnectorSplitManager splitManager,
ConnectorPageSourceProvider pageSourceProvider)
ConnectorHandleResolver handleResolver,
ConnectorSplitManager splitManager,
ConnectorPageSourceProvider pageSourceProvider,
ArrowFlightClientHandler arrowFlightClientHandler)
{
this.metadata = requireNonNull(metadata, "Metadata is null");
this.handleResolver = requireNonNull(handleResolver, "Metadata is null");
this.handleResolver = requireNonNull(handleResolver, "handleResolver is null");
this.splitManager = requireNonNull(splitManager, "SplitManager is null");
this.pageSourceProvider = requireNonNull(pageSourceProvider, "PageSinkProvider is null");
this.arrowFlightClientHandler = requireNonNull(arrowFlightClientHandler, "arrow flight handler is null");
}

public Optional<ConnectorHandleResolver> getHandleResolver()
Expand Down Expand Up @@ -74,4 +78,10 @@ public ConnectorPageSourceProvider getPageSourceProvider()
{
return pageSourceProvider;
}

@Override
public void shutdown()
{
arrowFlightClientHandler.closeRootallocator();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class ArrowConnectorFactory
public ArrowConnectorFactory(String name, Module module, ClassLoader classLoader)
{
checkArgument(!isNullOrEmpty(name), "name is null or empty");
this.name = name;
this.name = requireNonNull(name, "name is null");
this.module = requireNonNull(module, "module is null");
this.classLoader = requireNonNull(classLoader, "classLoader is null");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package com.facebook.plugin.arrow;

import org.apache.arrow.flight.FlightClient;
import org.apache.arrow.memory.RootAllocator;

import java.io.IOException;
import java.io.InputStream;
Expand All @@ -27,13 +26,11 @@ public class ArrowFlightClient
{
private final FlightClient flightClient;
private final Optional<InputStream> trustedCertificate;
private RootAllocator allocator;

public ArrowFlightClient(FlightClient flightClient, Optional<InputStream> trustedCertificate, RootAllocator allocator)
public ArrowFlightClient(FlightClient flightClient, Optional<InputStream> trustedCertificate)
{
this.flightClient = requireNonNull(flightClient, "flightClient cannot be null");
this.trustedCertificate = requireNonNull(trustedCertificate, "trustedCertificate is null");
this.allocator = requireNonNull(allocator, "allocator is null");
}

public FlightClient getFlightClient()
Expand All @@ -53,6 +50,5 @@ public void close() throws InterruptedException, IOException
if (trustedCertificate.isPresent()) {
trustedCertificate.get().close();
}
allocator.close();
}
}
Loading
Loading