Update raptor-legacy to Jdbi 3
electrum committed Oct 15, 2021
1 parent a398b4c commit f8e6a6f
Showing 63 changed files with 468 additions and 696 deletions.
7 changes: 6 additions & 1 deletion plugin/trino-raptor-legacy/pom.xml
@@ -153,7 +153,12 @@

 <dependency>
     <groupId>org.jdbi</groupId>
-    <artifactId>jdbi</artifactId>
+    <artifactId>jdbi3-core</artifactId>
 </dependency>
 
+<dependency>
+    <groupId>org.jdbi</groupId>
+    <artifactId>jdbi3-sqlobject</artifactId>
+</dependency>
+
 <dependency>
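Jdbi 3 splits the old monolithic jdbi artifact into modules, which is why the pom now needs two dependencies. A hypothetical sketch of what each artifact provides (TableCountDao is illustrative, not part of this commit): jdbi3-core supplies the fluent API (Jdbi, Handle, Query), while declarative DAO interfaces like the one below need jdbi3-sqlobject and its SqlObjectPlugin.

    import org.jdbi.v3.sqlobject.statement.SqlQuery;

    // Requires jdbi3-sqlobject; jdbi3-core alone offers only the fluent API.
    public interface TableCountDao
    {
        @SqlQuery("SELECT count(*) FROM tables")
        long tableCount();
    }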
RaptorConnector.java
@@ -31,7 +31,7 @@
 import io.trino.spi.connector.SystemTable;
 import io.trino.spi.session.PropertyMetadata;
 import io.trino.spi.transaction.IsolationLevel;
-import org.skife.jdbi.v2.IDBI;
+import org.jdbi.v3.core.Jdbi;
 
 import javax.annotation.PostConstruct;
 import javax.annotation.concurrent.GuardedBy;
@@ -92,7 +92,7 @@ public RaptorConnector(
             RaptorTableProperties tableProperties,
             Set<SystemTable> systemTables,
             ConnectorAccessControl accessControl,
-            @ForMetadata IDBI dbi)
+            @ForMetadata Jdbi dbi)
     {
         this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null");
         this.metadataFactory = requireNonNull(metadataFactory, "metadataFactory is null");
RaptorMetadata.java
@@ -60,7 +60,7 @@
 import io.trino.spi.predicate.TupleDomain;
 import io.trino.spi.statistics.ComputedStatistics;
 import io.trino.spi.type.Type;
-import org.skife.jdbi.v2.IDBI;
+import org.jdbi.v3.core.Jdbi;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -134,19 +134,19 @@ public class RaptorMetadata
     private static final JsonCodec<ConnectorViewDefinition> VIEW_CODEC =
             new JsonCodecFactory(new ObjectMapperProvider()).jsonCodec(ConnectorViewDefinition.class);
 
-    private final IDBI dbi;
+    private final Jdbi dbi;
     private final MetadataDao dao;
     private final ShardManager shardManager;
     private final LongConsumer beginDeleteForTableId;
 
     private final AtomicReference<Long> currentTransactionId = new AtomicReference<>();
 
-    public RaptorMetadata(IDBI dbi, ShardManager shardManager)
+    public RaptorMetadata(Jdbi dbi, ShardManager shardManager)
     {
         this(dbi, shardManager, tableId -> {});
     }
 
-    public RaptorMetadata(IDBI dbi, ShardManager shardManager, LongConsumer beginDeleteForTableId)
+    public RaptorMetadata(Jdbi dbi, ShardManager shardManager, LongConsumer beginDeleteForTableId)
     {
         this.dbi = requireNonNull(dbi, "dbi is null");
         this.dao = onDemandDao(dbi, MetadataDao.class);
@@ -466,7 +466,7 @@ public void dropTable(ConnectorSession session, ConnectorTableHandle tableHandle
     public void renameTable(ConnectorSession session, ConnectorTableHandle tableHandle, SchemaTableName newTableName)
     {
         RaptorTableHandle table = (RaptorTableHandle) tableHandle;
-        runTransaction(dbi, (handle, status) -> {
+        runTransaction(dbi, handle -> {
             MetadataDao dao = handle.attach(MetadataDao.class);
             dao.renameTable(table.getTableId(), newTableName.getSchemaName(), newTableName.getTableName());
             return null;
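The runTransaction callback loses its TransactionStatus argument throughout this file. A sketch of the shape change, assuming DatabaseUtil.runTransaction delegates to Jdbi's inTransaction (the class and SQL below are illustrative):

    import org.jdbi.v3.core.Jdbi;

    public class TransactionExample
    {
        // Jdbi 2 passed (handle, status) to the transaction callback;
        // the Jdbi 3 HandleCallback receives only the Handle.
        public static int rename(Jdbi dbi)
        {
            return dbi.inTransaction(handle ->
                    handle.execute("UPDATE tables SET table_name = ? WHERE table_id = ?", "t2", 1L));
        }
    }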
@@ -660,7 +660,7 @@ public Optional<ConnectorOutputMetadata> finishCreateTable(ConnectorSession sess
         long transactionId = table.getTransactionId();
         long updateTime = session.getStart().toEpochMilli();
 
-        long newTableId = runTransaction(dbi, (dbiHandle, status) -> {
+        long newTableId = runTransaction(dbi, dbiHandle -> {
             MetadataDao dao = dbiHandle.attach(MetadataDao.class);
 
             Long distributionId = table.getDistributionId().isPresent() ? table.getDistributionId().getAsLong() : null;
RaptorMetadataFactory.java
@@ -15,7 +15,7 @@

 import io.trino.plugin.raptor.legacy.metadata.ForMetadata;
 import io.trino.plugin.raptor.legacy.metadata.ShardManager;
-import org.skife.jdbi.v2.IDBI;
+import org.jdbi.v3.core.Jdbi;
 
 import javax.inject.Inject;

@@ -25,11 +25,11 @@

 public class RaptorMetadataFactory
 {
-    private final IDBI dbi;
+    private final Jdbi dbi;
     private final ShardManager shardManager;
 
     @Inject
-    public RaptorMetadataFactory(@ForMetadata IDBI dbi, ShardManager shardManager)
+    public RaptorMetadataFactory(@ForMetadata Jdbi dbi, ShardManager shardManager)
     {
         this.dbi = requireNonNull(dbi, "dbi is null");
         this.shardManager = requireNonNull(shardManager, "shardManager is null");
(Guice module providing the @ForMetadata Jdbi)
@@ -27,9 +27,9 @@
 import io.trino.spi.NodeManager;
 import io.trino.spi.connector.SystemTable;
 import io.trino.spi.type.TypeManager;
-import org.skife.jdbi.v2.DBI;
-import org.skife.jdbi.v2.IDBI;
-import org.skife.jdbi.v2.tweak.ConnectionFactory;
+import org.jdbi.v3.core.ConnectionFactory;
+import org.jdbi.v3.core.Jdbi;
+import org.jdbi.v3.sqlobject.SqlObjectPlugin;
 
 import javax.inject.Singleton;

@@ -61,11 +61,12 @@ public void configure(Binder binder)
     @ForMetadata
     @Singleton
     @Provides
-    public IDBI createDBI(@ForMetadata ConnectionFactory connectionFactory, TypeManager typeManager)
+    public static Jdbi createJdbi(@ForMetadata ConnectionFactory connectionFactory, TypeManager typeManager)
     {
-        DBI dbi = new DBI(connectionFactory);
-        dbi.registerMapper(new TableColumn.Mapper(typeManager));
-        dbi.registerMapper(new Distribution.Mapper(typeManager));
+        Jdbi dbi = Jdbi.create(connectionFactory)
+                .installPlugin(new SqlObjectPlugin())
+                .registerRowMapper(new TableColumn.Mapper(typeManager))
+                .registerRowMapper(new Distribution.Mapper(typeManager));
         createTablesWithRetry(dbi);
         return dbi;
     }
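This provider captures the core bootstrap migration: new DBI(connectionFactory) becomes the fluent Jdbi.create(...), SQL-object DAOs (attach/onDemand) now require an explicitly installed SqlObjectPlugin, and registerMapper becomes registerRowMapper. A standalone sketch, assuming an in-memory H2 URL purely for illustration:

    import org.jdbi.v3.core.Jdbi;
    import org.jdbi.v3.sqlobject.SqlObjectPlugin;

    public class JdbiBootstrap
    {
        public static Jdbi create()
        {
            // Without SqlObjectPlugin, handle.attach(SomeDao.class) fails at runtime.
            return Jdbi.create("jdbc:h2:mem:metadata")
                    .installPlugin(new SqlObjectPlugin());
        }
    }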
@@ -32,7 +32,7 @@
 import io.trino.spi.connector.ConnectorTransactionHandle;
 import io.trino.spi.connector.DynamicFilter;
 import io.trino.spi.predicate.TupleDomain;
-import org.skife.jdbi.v2.ResultIterator;
+import org.jdbi.v3.core.result.ResultIterator;
 
 import javax.annotation.PreDestroy;
 import javax.annotation.concurrent.GuardedBy;
BucketNode.java
@@ -13,11 +13,6 @@
  */
 package io.trino.plugin.raptor.legacy.metadata;
 
-import org.skife.jdbi.v2.StatementContext;
-import org.skife.jdbi.v2.tweak.ResultSetMapper;
-
-import java.sql.ResultSet;
-import java.sql.SQLException;
 import java.util.Objects;
 
 import static com.google.common.base.Preconditions.checkArgument;
@@ -70,17 +65,4 @@ public String toString()
     {
         return bucketNumber + ":" + nodeIdentifier;
     }
-
-    public static class Mapper
-            implements ResultSetMapper<BucketNode>
-    {
-        @Override
-        public BucketNode map(int index, ResultSet rs, StatementContext context)
-                throws SQLException
-        {
-            return new BucketNode(
-                    rs.getInt("bucket_number"),
-                    rs.getString("node_identifier"));
-        }
-    }
 }
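With the hand-written ResultSetMapper deleted (the ColumnMetadataRow mapper below gets the same treatment), rows need another mapping path. One Jdbi 3 option, shown as a sketch rather than what this commit necessarily substitutes, is reflective mapping via ConstructorMapper, whose default column-name matchers map snake_case columns onto constructor parameters when parameter names are compiled in (-parameters) or supplied with @ColumnName:

    import java.util.List;

    import org.jdbi.v3.core.Jdbi;
    import org.jdbi.v3.core.mapper.reflect.ConstructorMapper;

    public class BucketNodeQuery
    {
        public static List<BucketNode> load(Jdbi dbi)
        {
            // Maps bucket_number and node_identifier onto BucketNode's constructor.
            dbi.registerRowMapper(ConstructorMapper.factory(BucketNode.class));
            return dbi.withHandle(handle -> handle
                    .createQuery("SELECT bucket_number, node_identifier FROM buckets")
                    .mapTo(BucketNode.class)
                    .list());
        }
    }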
ColumnMetadataRow.java
@@ -13,14 +13,8 @@
  */
 package io.trino.plugin.raptor.legacy.metadata;
 
-import org.skife.jdbi.v2.StatementContext;
-import org.skife.jdbi.v2.tweak.ResultSetMapper;
-
-import java.sql.ResultSet;
-import java.sql.SQLException;
 import java.util.OptionalInt;
 
-import static io.trino.plugin.raptor.legacy.util.DatabaseUtil.getOptionalInt;
 import static java.util.Objects.requireNonNull;
 
 public class ColumnMetadataRow
@@ -64,20 +58,4 @@ public OptionalInt getBucketOrdinalPosition()
     {
         return bucketOrdinalPosition;
     }
-
-    public static class Mapper
-            implements ResultSetMapper<ColumnMetadataRow>
-    {
-        @Override
-        public ColumnMetadataRow map(int index, ResultSet rs, StatementContext context)
-                throws SQLException
-        {
-            return new ColumnMetadataRow(
-                    rs.getLong("table_id"),
-                    rs.getLong("column_id"),
-                    rs.getString("column_name"),
-                    getOptionalInt(rs, "sort_ordinal_position"),
-                    getOptionalInt(rs, "bucket_ordinal_position"));
-        }
-    }
 }
@@ -29,8 +29,8 @@
 import io.airlift.discovery.client.ServiceDescriptor;
 import io.airlift.discovery.client.testing.StaticServiceSelector;
 import io.trino.plugin.raptor.legacy.util.DaoSupplier;
-import org.skife.jdbi.v2.IDBI;
-import org.skife.jdbi.v2.tweak.ConnectionFactory;
+import org.jdbi.v3.core.ConnectionFactory;
+import org.jdbi.v3.core.Jdbi;
 
 import javax.inject.Inject;
 import javax.inject.Provider;
@@ -113,7 +113,7 @@ public void setInjector(Injector injector)
     public DaoSupplier<T> get()
     {
         checkState(injector != null, "injector was not set");
-        IDBI dbi = injector.getInstance(Key.get(IDBI.class, ForMetadata.class));
+        Jdbi dbi = injector.getInstance(Key.get(Jdbi.class, ForMetadata.class));
         return new DaoSupplier<>(dbi, type);
     }
 }
DatabaseShardManager.java
@@ -34,12 +34,11 @@
 import io.trino.spi.predicate.TupleDomain;
 import io.trino.spi.type.Type;
 import org.h2.jdbc.JdbcConnection;
-import org.skife.jdbi.v2.Handle;
-import org.skife.jdbi.v2.IDBI;
-import org.skife.jdbi.v2.ResultIterator;
-import org.skife.jdbi.v2.exceptions.DBIException;
-import org.skife.jdbi.v2.tweak.HandleConsumer;
-import org.skife.jdbi.v2.util.ByteArrayMapper;
+import org.jdbi.v3.core.Handle;
+import org.jdbi.v3.core.HandleConsumer;
+import org.jdbi.v3.core.Jdbi;
+import org.jdbi.v3.core.JdbiException;
+import org.jdbi.v3.core.result.ResultIterator;
 
 import javax.inject.Inject;

@@ -103,7 +102,7 @@ public class DatabaseShardManager
     private static final String INDEX_TABLE_PREFIX = "x_shards_t";
     private static final int MAX_ADD_COLUMN_ATTEMPTS = 100;
 
-    private final IDBI dbi;
+    private final Jdbi dbi;
     private final DaoSupplier<ShardDao> shardDaoSupplier;
     private final ShardDao dao;
     private final NodeSupplier nodeSupplier;
@@ -122,7 +121,7 @@ public class DatabaseShardManager

     @Inject
     public DatabaseShardManager(
-            @ForMetadata IDBI dbi,
+            @ForMetadata Jdbi dbi,
             DaoSupplier<ShardDao> shardDaoSupplier,
             NodeSupplier nodeSupplier,
             AssignmentLimiter assignmentLimiter,
@@ -133,7 +132,7 @@ public DatabaseShardManager(
     }
 
     public DatabaseShardManager(
-            IDBI dbi,
+            Jdbi dbi,
             DaoSupplier<ShardDao> shardDaoSupplier,
             NodeSupplier nodeSupplier,
             AssignmentLimiter assignmentLimiter,
@@ -210,15 +209,15 @@ public void createTable(long tableId, List<ColumnInfo> columns, boolean bucketed
         try (Handle handle = dbi.open()) {
             handle.execute(sql);
         }
-        catch (DBIException e) {
+        catch (JdbiException e) {
             throw metadataError(e);
         }
     }
 
     @Override
     public void dropTable(long tableId)
     {
-        runTransaction(dbi, (handle, status) -> {
+        runTransaction(dbi, handle -> {
             lockTable(handle, tableId);
 
             ShardDao shardDao = shardDaoSupplier.attach(handle);
@@ -239,7 +238,7 @@ public void dropTable(long tableId)
         try (Handle handle = dbi.open()) {
             handle.execute("DROP TABLE " + shardIndexTable(tableId));
         }
-        catch (DBIException e) {
+        catch (JdbiException e) {
             log.warn(e, "Failed to drop index table %s", shardIndexTable(tableId));
         }
     }
@@ -264,7 +263,7 @@ public void addColumn(long tableId, ColumnInfo column)
         try (Handle handle = dbi.open()) {
             handle.execute(sql);
         }
-        catch (DBIException e) {
+        catch (JdbiException e) {
             if (isSyntaxOrAccessError(e)) {
                 // exit when column already exists
                 return;
@@ -344,12 +343,12 @@ public void replaceShardUuids(long transactionId, long tableId, List<ColumnInfo>
         });
     }
 
-    private void runCommit(long transactionId, HandleConsumer callback)
+    private void runCommit(long transactionId, HandleConsumer<SQLException> callback)
     {
         int maxAttempts = 5;
         for (int attempt = 1; attempt <= maxAttempts; attempt++) {
             try {
-                dbi.useTransaction((handle, status) -> {
+                dbi.useTransaction(handle -> {
                     ShardDao dao = shardDaoSupplier.attach(handle);
                     if (commitTransaction(dao, transactionId)) {
                         callback.useHandle(handle);
@@ -358,7 +357,7 @@ private void runCommit(long transactionId, HandleConsumer callback)
                 });
                 return;
             }
-            catch (DBIException e) {
+            catch (JdbiException | SQLException e) {
                 if (isTransactionCacheFullError(e)) {
                     throw metadataError(e, "Transaction too large");
                 }
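The widened catch follows from an API change: Jdbi 3's HandleConsumer<X extends Exception> is generic in the checked exception its callback may throw, and useTransaction declares that it rethrows X. A sketch under that assumption (CommitExample is illustrative):

    import java.sql.SQLException;

    import org.jdbi.v3.core.HandleConsumer;
    import org.jdbi.v3.core.Jdbi;

    public class CommitExample
    {
        // With X = SQLException, useTransaction(callback) declares `throws X`,
        // so callers must handle SQLException alongside JdbiException.
        public static void commit(Jdbi dbi, HandleConsumer<SQLException> callback)
                throws SQLException
        {
            dbi.useTransaction(callback);
        }
    }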
@@ -558,7 +557,7 @@ public void replaceShardAssignment(long tableId, UUID shardUuid, String nodeIden

         int nodeId = getOrCreateNodeId(nodeIdentifier);
 
-        runTransaction(dbi, (handle, status) -> {
+        runTransaction(dbi, handle -> {
             ShardDao dao = shardDaoSupplier.attach(handle);
 
             Set<Integer> oldAssignments = new HashSet<>(fetchLockedNodeIds(handle, tableId, shardUuid));
@@ -821,8 +820,8 @@ private static Collection<Integer> fetchLockedNodeIds(Handle handle, long tableI

         byte[] nodeArray = handle.createQuery(sql)
                 .bind(0, uuidToBytes(shardUuid))
-                .map(ByteArrayMapper.FIRST)
-                .first();
+                .mapTo(byte[].class)
+                .one();
 
         return intArrayFromBytes(nodeArray);
     }
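Note the semantic shift in this hunk: Jdbi 2's first() returned the first row, or null for an empty result, while Jdbi 3's one() fails unless exactly one row comes back. Where zero rows are legitimate, findOne() is the lenient variant; a sketch (the table name is illustrative):

    import java.util.Optional;

    import org.jdbi.v3.core.Handle;

    public class FetchExample
    {
        // findOne() yields Optional.empty() for zero rows and fails on more than one.
        public static Optional<byte[]> fetchNodeIds(Handle handle, byte[] shardUuid)
        {
            return handle.createQuery("SELECT node_ids FROM shard_nodes WHERE shard_uuid = ?")
                    .bind(0, shardUuid)
                    .mapTo(byte[].class)
                    .findOne();
        }
    }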
Distribution.java
@@ -18,8 +18,8 @@
 import io.trino.spi.type.Type;
 import io.trino.spi.type.TypeId;
 import io.trino.spi.type.TypeManager;
-import org.skife.jdbi.v2.StatementContext;
-import org.skife.jdbi.v2.tweak.ResultSetMapper;
+import org.jdbi.v3.core.mapper.RowMapper;
+import org.jdbi.v3.core.statement.StatementContext;
 
 import javax.inject.Inject;

@@ -71,7 +71,7 @@ public int getBucketCount()
     }
 
     public static class Mapper
-            implements ResultSetMapper<Distribution>
+            implements RowMapper<Distribution>
     {
         private final TypeManager typeManager;

@@ -82,7 +82,7 @@ public Mapper(TypeManager typeManager)
         }
 
         @Override
-        public Distribution map(int index, ResultSet rs, StatementContext ctx)
+        public Distribution map(ResultSet rs, StatementContext ctx)
                 throws SQLException
         {
             List<Type> types = LIST_CODEC.fromJson(rs.getString("column_types")).stream()
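The mapper rename is mechanical: ResultSetMapper<T> becomes RowMapper<T> and map() drops its row-index parameter. A minimal standalone sketch (NodeCount is a hypothetical value class):

    import java.sql.ResultSet;
    import java.sql.SQLException;

    import org.jdbi.v3.core.mapper.RowMapper;
    import org.jdbi.v3.core.statement.StatementContext;

    public class NodeCountMapper
            implements RowMapper<NodeCountMapper.NodeCount>
    {
        public static class NodeCount
        {
            public final String nodeIdentifier;
            public final long count;

            public NodeCount(String nodeIdentifier, long count)
            {
                this.nodeIdentifier = nodeIdentifier;
                this.count = count;
            }
        }

        @Override
        public NodeCount map(ResultSet rs, StatementContext ctx)
                throws SQLException
        {
            // No row index: Jdbi 3 calls map() once per row with only the ResultSet.
            return new NodeCount(rs.getString("node_identifier"), rs.getLong("count"));
        }
    }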