Skip to content

Commit

Permalink
Temperature-aware data partitioning (#348)
Browse files Browse the repository at this point in the history
  • Loading branch information
hennlo authored Oct 18, 2021
1 parent 4633566 commit 3ee4dcf
Show file tree
Hide file tree
Showing 120 changed files with 6,815 additions and 2,272 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.polypheny.db.catalog.Catalog;
import org.polypheny.db.catalog.entity.CatalogColumn;
import org.polypheny.db.catalog.entity.CatalogColumnPlacement;
import org.polypheny.db.catalog.entity.CatalogPartitionPlacement;
import org.polypheny.db.catalog.entity.CatalogTable;
import org.polypheny.db.catalog.exceptions.UnknownColumnException;
import org.polypheny.db.catalog.exceptions.UnknownDatabaseException;
Expand Down Expand Up @@ -146,7 +147,8 @@ public String getPhysicalColumnName( String tableName, String logicalColumnName

public void updatePhysicalColumnName( long columnId, String updatedName, boolean updatePosition ) {
CatalogColumnPlacement placement = this.catalog.getColumnPlacement( this.storeId, columnId );
this.catalog.updateColumnPlacementPhysicalNames( this.storeId, columnId, placement.physicalTableName, placement.physicalTableName, updatedName, updatePosition );
CatalogPartitionPlacement partitionPlacement = catalog.getPartitionPlacement( this.storeId, catalog.getTable( placement.tableId ).partitionProperty.partitionIds.get( 0 ) );
this.catalog.updateColumnPlacementPhysicalNames( this.storeId, columnId, partitionPlacement.physicalTableName, updatedName, updatePosition );
}


Expand Down Expand Up @@ -193,4 +195,5 @@ public static String incrementNameRevision( String name ) {

return type + id + "r" + rev;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.polypheny.db.catalog.entity.CatalogColumnPlacement;
import org.polypheny.db.catalog.entity.CatalogIndex;
import org.polypheny.db.catalog.entity.CatalogKey;
import org.polypheny.db.catalog.entity.CatalogPartitionPlacement;
import org.polypheny.db.catalog.entity.CatalogTable;
import org.polypheny.db.docker.DockerInstance;
import org.polypheny.db.docker.DockerManager;
Expand Down Expand Up @@ -208,9 +209,9 @@ public void createNewSchema( SchemaPlus rootSchema, String name ) {


@Override
public Table createTableSchema( CatalogTable catalogTable, List<CatalogColumnPlacement> columnPlacementsOnStore ) {
String physicalTableName = currentSchema.getConvention().physicalNameProvider.getPhysicalTableName( catalogTable.id );
return new CassandraTable( this.currentSchema, catalogTable.name, physicalTableName, false );
public Table createTableSchema( CatalogTable catalogTable, List<CatalogColumnPlacement> columnPlacementsOnStore, CatalogPartitionPlacement partitionPlacement ) {
String cassandraphysicalTableName = currentSchema.getConvention().physicalNameProvider.getPhysicalTableName( catalogTable.id );
return new CassandraTable( this.currentSchema, catalogTable.name, cassandraphysicalTableName, false );
}


Expand All @@ -221,7 +222,7 @@ public Schema getCurrentSchema() {


@Override
public void createTable( Context context, CatalogTable catalogTable ) {
public void createTable( Context context, CatalogTable catalogTable, List<Long> partitionIds ) {
// This check is probably not required due to the check below it.
if ( catalogTable.primaryKey == null ) {
throw new UnsupportedOperationException( "Cannot create Cassandra Table without a primary key!" );
Expand All @@ -247,7 +248,7 @@ public void createTable( Context context, CatalogTable catalogTable ) {
CassandraPhysicalNameProvider physicalNameProvider = new CassandraPhysicalNameProvider( this.getAdapterId() );
String physicalTableName = physicalNameProvider.getPhysicalTableName( catalogTable.id );
// List<CatalogColumn> columns = combinedTable.getColumns();
List<CatalogColumnPlacement> columns = catalog.getColumnPlacementsOnAdapter( getAdapterId(), catalogTable.id );
List<CatalogColumnPlacement> columns = catalog.getColumnPlacementsOnAdapterPerTable( getAdapterId(), catalogTable.id );
CatalogColumnPlacement primaryColumnPlacement = columns.stream().filter( c -> c.columnId == primaryKeyColumnLambda ).findFirst().get();
CatalogColumn catalogColumn = catalog.getColumn( primaryColumnPlacement.columnId );

Expand Down Expand Up @@ -275,20 +276,19 @@ public void createTable( Context context, CatalogTable catalogTable ) {
context.getStatement().getTransaction().registerInvolvedAdapter( this );
this.session.execute( createTable.build() );

for ( CatalogColumnPlacement placement : catalog.getColumnPlacementsOnAdapter( getAdapterId(), catalogTable.id ) ) {
for ( CatalogColumnPlacement placement : catalog.getColumnPlacementsOnAdapterPerTable( getAdapterId(), catalogTable.id ) ) {
catalog.updateColumnPlacementPhysicalNames(
getAdapterId(),
placement.columnId,
this.dbKeyspace, // TODO MV: physical schema name
physicalTableName,
physicalNameProvider.generatePhysicalColumnName( placement.columnId ),
true );
}
}


@Override
public void dropTable( Context context, CatalogTable catalogTable ) {
public void dropTable( Context context, CatalogTable catalogTable, List<Long> partitionIds ) {
CassandraPhysicalNameProvider physicalNameProvider = new CassandraPhysicalNameProvider( this.getAdapterId() );
String physicalTableName = physicalNameProvider.getPhysicalTableName( catalogTable.id );
SimpleStatement dropTable = SchemaBuilder.dropTable( this.dbKeyspace, physicalTableName ).build();
Expand All @@ -315,7 +315,6 @@ public void addColumn( Context context, CatalogTable catalogTable, CatalogColumn
getAdapterId(),
catalogColumn.id,
this.dbKeyspace,
physicalTableName,
physicalColumnName,
false );
}
Expand All @@ -325,7 +324,10 @@ public void addColumn( Context context, CatalogTable catalogTable, CatalogColumn
public void dropColumn( Context context, CatalogColumnPlacement columnPlacement ) {
// public void dropColumn( Context context, CatalogCombinedTable catalogTable, CatalogColumn catalogColumn ) {
// CassandraPhysicalNameProvider physicalNameProvider = new CassandraPhysicalNameProvider( context.getStatement().getTransaction().getCatalog(), this.getStoreId() );
String physicalTableName = columnPlacement.physicalTableName;

CatalogPartitionPlacement partitionPlacement = catalog.getPartitionPlacement( getAdapterId(), catalog.getTable( columnPlacement.tableId ).partitionProperty.partitionIds.get( 0 ) );

String physicalTableName = partitionPlacement.physicalTableName;
String physicalColumnName = columnPlacement.physicalColumnName;

SimpleStatement dropColumn = SchemaBuilder.alterTable( this.dbKeyspace, physicalTableName )
Expand All @@ -337,14 +339,14 @@ public void dropColumn( Context context, CatalogColumnPlacement columnPlacement


@Override
public void addIndex( Context context, CatalogIndex catalogIndex ) {
public void addIndex( Context context, CatalogIndex catalogIndex, List<Long> partitionIds ) {
throw new RuntimeException( "Cassandra adapter does not support adding indexes" );
}


@Override
public void dropIndex( Context context, CatalogIndex catalogIndex ) {
throw new RuntimeException( "Cassandra adaper does not support dropping indexes" );
public void dropIndex( Context context, CatalogIndex catalogIndex, List<Long> partitionIds ) {
throw new RuntimeException( "Cassandra adapter does not support dropping indexes" );
}


Expand Down
Loading

0 comments on commit 3ee4dcf

Please sign in to comment.