Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support arbitrary types in partitioning columns #885

Merged
merged 9 commits into from
Jul 22, 2021
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public final void findKeys(@NotNull final Consumer<TLK> locationKeyObserver) {
boolean needToUpdateInternalPartitionValue = true;
try (final DirectoryStream<Path> columnPartitionStream = Files.newDirectoryStream(internalPartition, Files::isDirectory)) {
for (final Path columnPartition : columnPartitionStream) {
partitions.put(columnPartitionKey, columnPartition.getFileName().toFile());
partitions.put(columnPartitionKey, columnPartition.getFileName().toString());
if (needToUpdateInternalPartitionValue) {
// Partition order dictates comparison priority, so we need to insert the internal partition after the column partition.
partitions.put(INTERNAL_PARTITION_KEY, internalPartitionValue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

import io.deephaven.db.tables.ColumnDefinition;
import io.deephaven.db.v2.locations.ColumnLocation;
import io.deephaven.db.v2.locations.TableDataException;
import io.deephaven.db.v2.locations.TableLocationKey;
import io.deephaven.db.v2.sources.ColumnSourceGetDefaults;
import io.deephaven.db.v2.sources.chunk.Attributes.Values;
import org.jetbrains.annotations.NotNull;
Expand Down Expand Up @@ -78,14 +80,20 @@ static final class AsValues<DATA_TYPE> extends NativeType<DATA_TYPE, Values> imp
static final class Partitioning extends RegionedColumnSourceByte<Values> {

Partitioning() {
super(ColumnRegionByte.createNull(), Supplier::get);
super(ColumnRegionByte.createNull(), Supplier::get /* No need to interpose a deferred region in this case. */);
rcaudy marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public ColumnRegionByte<Values> makeRegion(@NotNull final ColumnDefinition<?> columnDefinition,
@NotNull final ColumnLocation columnLocation,
final int regionIndex) {
return new ColumnRegionByte.Constant<>(unbox((Byte) columnLocation.getTableLocation().getKey().getPartitionValue(columnDefinition.getName())));
final TableLocationKey locationKey = columnLocation.getTableLocation().getKey();
final Object partitioningColumnValue = locationKey.getPartitionValue(columnDefinition.getName());
if (partitioningColumnValue != null && !Byte.class.isAssignableFrom(partitioningColumnValue.getClass())) {
throw new TableDataException("Unexpected partitioning column value type for " + columnDefinition.getName()
+ ": " + partitioningColumnValue + " is not a Byte at location " + locationKey);
}
return new ColumnRegionByte.Constant<>(unbox((Byte) partitioningColumnValue));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import io.deephaven.db.tables.ColumnDefinition;
import io.deephaven.db.v2.locations.ColumnLocation;
import io.deephaven.db.v2.locations.TableDataException;
import io.deephaven.db.v2.locations.TableLocationKey;
import io.deephaven.db.v2.sources.ColumnSourceGetDefaults;
import io.deephaven.db.v2.sources.chunk.Attributes.Values;
import org.jetbrains.annotations.NotNull;
Expand Down Expand Up @@ -75,14 +77,20 @@ static final class AsValues<DATA_TYPE> extends NativeType<DATA_TYPE, Values> imp
static final class Partitioning extends RegionedColumnSourceChar<Values> {

Partitioning() {
super(ColumnRegionChar.createNull(), Supplier::get);
super(ColumnRegionChar.createNull(), Supplier::get /* No need to interpose a deferred region in this case. */);
}

@Override
public ColumnRegionChar<Values> makeRegion(@NotNull final ColumnDefinition<?> columnDefinition,
@NotNull final ColumnLocation columnLocation,
final int regionIndex) {
return new ColumnRegionChar.Constant<>(unbox((Character) columnLocation.getTableLocation().getKey().getPartitionValue(columnDefinition.getName())));
final TableLocationKey locationKey = columnLocation.getTableLocation().getKey();
final Object partitioningColumnValue = locationKey.getPartitionValue(columnDefinition.getName());
if (partitioningColumnValue != null && !Character.class.isAssignableFrom(partitioningColumnValue.getClass())) {
throw new TableDataException("Unexpected partitioning column value type for " + columnDefinition.getName()
+ ": " + partitioningColumnValue + " is not a Character at location " + locationKey);
}
return new ColumnRegionChar.Constant<>(unbox((Character) partitioningColumnValue));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

import io.deephaven.db.tables.ColumnDefinition;
import io.deephaven.db.v2.locations.ColumnLocation;
import io.deephaven.db.v2.locations.TableDataException;
import io.deephaven.db.v2.locations.TableLocationKey;
import io.deephaven.db.v2.sources.ColumnSourceGetDefaults;
import io.deephaven.db.v2.sources.chunk.Attributes.Values;
import org.jetbrains.annotations.NotNull;
Expand Down Expand Up @@ -78,14 +80,20 @@ static final class AsValues<DATA_TYPE> extends NativeType<DATA_TYPE, Values> imp
static final class Partitioning extends RegionedColumnSourceDouble<Values> {

Partitioning() {
super(ColumnRegionDouble.createNull(), Supplier::get);
super(ColumnRegionDouble.createNull(), Supplier::get /* No need to interpose a deferred region in this case. */);
}

@Override
public ColumnRegionDouble<Values> makeRegion(@NotNull final ColumnDefinition<?> columnDefinition,
@NotNull final ColumnLocation columnLocation,
final int regionIndex) {
return new ColumnRegionDouble.Constant<>(unbox((Double) columnLocation.getTableLocation().getKey().getPartitionValue(columnDefinition.getName())));
final TableLocationKey locationKey = columnLocation.getTableLocation().getKey();
final Object partitioningColumnValue = locationKey.getPartitionValue(columnDefinition.getName());
if (partitioningColumnValue != null && !Double.class.isAssignableFrom(partitioningColumnValue.getClass())) {
throw new TableDataException("Unexpected partitioning column value type for " + columnDefinition.getName()
+ ": " + partitioningColumnValue + " is not a Double at location " + locationKey);
}
return new ColumnRegionDouble.Constant<>(unbox((Double) partitioningColumnValue));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

import io.deephaven.db.tables.ColumnDefinition;
import io.deephaven.db.v2.locations.ColumnLocation;
import io.deephaven.db.v2.locations.TableDataException;
import io.deephaven.db.v2.locations.TableLocationKey;
import io.deephaven.db.v2.sources.ColumnSourceGetDefaults;
import io.deephaven.db.v2.sources.chunk.Attributes.Values;
import org.jetbrains.annotations.NotNull;
Expand Down Expand Up @@ -78,14 +80,20 @@ static final class AsValues<DATA_TYPE> extends NativeType<DATA_TYPE, Values> imp
static final class Partitioning extends RegionedColumnSourceFloat<Values> {

Partitioning() {
super(ColumnRegionFloat.createNull(), Supplier::get);
super(ColumnRegionFloat.createNull(), Supplier::get /* No need to interpose a deferred region in this case. */);
}

@Override
public ColumnRegionFloat<Values> makeRegion(@NotNull final ColumnDefinition<?> columnDefinition,
@NotNull final ColumnLocation columnLocation,
final int regionIndex) {
return new ColumnRegionFloat.Constant<>(unbox((Float) columnLocation.getTableLocation().getKey().getPartitionValue(columnDefinition.getName())));
final TableLocationKey locationKey = columnLocation.getTableLocation().getKey();
final Object partitioningColumnValue = locationKey.getPartitionValue(columnDefinition.getName());
if (partitioningColumnValue != null && !Float.class.isAssignableFrom(partitioningColumnValue.getClass())) {
throw new TableDataException("Unexpected partitioning column value type for " + columnDefinition.getName()
+ ": " + partitioningColumnValue + " is not a Float at location " + locationKey);
}
return new ColumnRegionFloat.Constant<>(unbox((Float) partitioningColumnValue));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

import io.deephaven.db.tables.ColumnDefinition;
import io.deephaven.db.v2.locations.ColumnLocation;
import io.deephaven.db.v2.locations.TableDataException;
import io.deephaven.db.v2.locations.TableLocationKey;
import io.deephaven.db.v2.sources.ColumnSourceGetDefaults;
import io.deephaven.db.v2.sources.chunk.Attributes.Values;
import org.jetbrains.annotations.NotNull;
Expand Down Expand Up @@ -78,14 +80,20 @@ static final class AsValues<DATA_TYPE> extends NativeType<DATA_TYPE, Values> imp
static final class Partitioning extends RegionedColumnSourceInt<Values> {

Partitioning() {
super(ColumnRegionInt.createNull(), Supplier::get);
super(ColumnRegionInt.createNull(), Supplier::get /* No need to interpose a deferred region in this case. */);
}

@Override
public ColumnRegionInt<Values> makeRegion(@NotNull final ColumnDefinition<?> columnDefinition,
@NotNull final ColumnLocation columnLocation,
final int regionIndex) {
return new ColumnRegionInt.Constant<>(unbox((Integer) columnLocation.getTableLocation().getKey().getPartitionValue(columnDefinition.getName())));
final TableLocationKey locationKey = columnLocation.getTableLocation().getKey();
final Object partitioningColumnValue = locationKey.getPartitionValue(columnDefinition.getName());
if (partitioningColumnValue != null && !Integer.class.isAssignableFrom(partitioningColumnValue.getClass())) {
throw new TableDataException("Unexpected partitioning column value type for " + columnDefinition.getName()
+ ": " + partitioningColumnValue + " is not a Integer at location " + locationKey);
}
return new ColumnRegionInt.Constant<>(unbox((Integer) partitioningColumnValue));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

import io.deephaven.db.tables.ColumnDefinition;
import io.deephaven.db.v2.locations.ColumnLocation;
import io.deephaven.db.v2.locations.TableDataException;
import io.deephaven.db.v2.locations.TableLocationKey;
import io.deephaven.db.v2.sources.ColumnSourceGetDefaults;
import io.deephaven.db.v2.sources.chunk.Attributes.Values;
import org.jetbrains.annotations.NotNull;
Expand Down Expand Up @@ -78,14 +80,20 @@ static final class AsValues<DATA_TYPE> extends NativeType<DATA_TYPE, Values> imp
static final class Partitioning extends RegionedColumnSourceLong<Values> {

Partitioning() {
super(ColumnRegionLong.createNull(), Supplier::get);
super(ColumnRegionLong.createNull(), Supplier::get /* No need to interpose a deferred region in this case. */);
}

@Override
public ColumnRegionLong<Values> makeRegion(@NotNull final ColumnDefinition<?> columnDefinition,
@NotNull final ColumnLocation columnLocation,
final int regionIndex) {
return new ColumnRegionLong.Constant<>(unbox((Long) columnLocation.getTableLocation().getKey().getPartitionValue(columnDefinition.getName())));
final TableLocationKey locationKey = columnLocation.getTableLocation().getKey();
final Object partitioningColumnValue = locationKey.getPartitionValue(columnDefinition.getName());
if (partitioningColumnValue != null && !Long.class.isAssignableFrom(partitioningColumnValue.getClass())) {
throw new TableDataException("Unexpected partitioning column value type for " + columnDefinition.getName()
+ ": " + partitioningColumnValue + " is not a Long at location " + locationKey);
}
return new ColumnRegionLong.Constant<>(unbox((Long) partitioningColumnValue));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,19 @@

import io.deephaven.db.tables.ColumnDefinition;
import io.deephaven.db.v2.locations.ColumnLocation;
import io.deephaven.db.v2.locations.TableDataException;
import io.deephaven.db.v2.locations.TableLocationKey;
import io.deephaven.db.v2.sources.ColumnSourceGetDefaults;
import io.deephaven.db.v2.sources.chunk.Attributes.Values;
import io.deephaven.db.v2.sources.chunk.SharedContext;
import io.deephaven.util.codec.ObjectDecoder;
import org.jetbrains.annotations.NotNull;

import javax.annotation.Nullable;

import java.util.function.Supplier;

import static io.deephaven.db.v2.utils.ReadOnlyIndex.NULL_KEY;
import static io.deephaven.util.type.TypeUtils.unbox;

abstract class RegionedColumnSourceObject<DATA_TYPE, ATTR extends Values> extends RegionedColumnSourceArray<DATA_TYPE, ATTR, ColumnRegionObject<DATA_TYPE, ATTR>>
implements ColumnSourceGetDefaults.ForObject<DATA_TYPE> {
Expand Down Expand Up @@ -73,15 +75,21 @@ public FillContext makeFillContext(final int chunkCapacity, @Nullable final Shar
static final class Partitioning<DATA_TYPE> extends RegionedColumnSourceObject<DATA_TYPE, Values> {

Partitioning(@NotNull final Class<DATA_TYPE> dataType) {
super(ColumnRegionObject.createNull(), dataType, null, Supplier::get);
super(ColumnRegionObject.createNull(), dataType, null, Supplier::get /* No need to interpose a deferred region in this case. */);
}

@Override
public ColumnRegionObject<DATA_TYPE, Values> makeRegion(@NotNull final ColumnDefinition<?> columnDefinition,
@NotNull final ColumnLocation columnLocation,
final int regionIndex) {
//noinspection Convert2Diamond
return new ColumnRegionObject.Constant<DATA_TYPE, Values>(columnLocation.getTableLocation().getKey().getPartitionValue(columnDefinition.getName()));
@NotNull final ColumnLocation columnLocation,
final int regionIndex) {
final TableLocationKey locationKey = columnLocation.getTableLocation().getKey();
final Object partitioningColumnValue = locationKey.getPartitionValue(columnDefinition.getName());
if (partitioningColumnValue != null && !getType().isAssignableFrom(partitioningColumnValue.getClass())) {
throw new TableDataException("Unexpected partitioning column value type for " + columnDefinition.getName()
+ ": " + partitioningColumnValue + " is not a " + getType() + " at location " + locationKey);
}
//noinspection unchecked
return new ColumnRegionObject.Constant<>((DATA_TYPE) partitioningColumnValue);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

import io.deephaven.db.tables.ColumnDefinition;
import io.deephaven.db.v2.locations.ColumnLocation;
import io.deephaven.db.v2.locations.TableDataException;
import io.deephaven.db.v2.locations.TableLocationKey;
import io.deephaven.db.v2.sources.ColumnSourceGetDefaults;
import io.deephaven.db.v2.sources.chunk.Attributes.Values;
import org.jetbrains.annotations.NotNull;
Expand Down Expand Up @@ -78,14 +80,20 @@ static final class AsValues<DATA_TYPE> extends NativeType<DATA_TYPE, Values> imp
static final class Partitioning extends RegionedColumnSourceShort<Values> {

Partitioning() {
super(ColumnRegionShort.createNull(), Supplier::get);
super(ColumnRegionShort.createNull(), Supplier::get /* No need to interpose a deferred region in this case. */);
}

@Override
public ColumnRegionShort<Values> makeRegion(@NotNull final ColumnDefinition<?> columnDefinition,
@NotNull final ColumnLocation columnLocation,
final int regionIndex) {
return new ColumnRegionShort.Constant<>(unbox((Short) columnLocation.getTableLocation().getKey().getPartitionValue(columnDefinition.getName())));
final TableLocationKey locationKey = columnLocation.getTableLocation().getKey();
final Object partitioningColumnValue = locationKey.getPartitionValue(columnDefinition.getName());
if (partitioningColumnValue != null && !Short.class.isAssignableFrom(partitioningColumnValue.getClass())) {
throw new TableDataException("Unexpected partitioning column value type for " + columnDefinition.getName()
+ ": " + partitioningColumnValue + " is not a Short at location " + locationKey);
}
return new ColumnRegionShort.Constant<>(unbox((Short) partitioningColumnValue));
}
}
}