Skip to content

Commit

Permalink
Updated
Browse files Browse the repository at this point in the history
  • Loading branch information
marcusforby committed Feb 20, 2025
1 parent e64612f commit 67c8bf5
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,12 @@

from adapta import __version__
from adapta.storage.distributed_object_store.v3.datastax_astra._models import SimilarityFunction, VectorSearchQuery
from adapta.storage.models.filter_expression import Expression, AstraFilterExpression, compile_expression
from adapta.storage.models.filter_expression import (
Expression,
AstraFilterExpression,
compile_expression,
FilterExpressionOperationAstraSuffix,
)
from adapta.utils import chunk_list, rate_limit
from adapta.utils.metaframe import MetaFrame, concat
from adapta.storage.distributed_object_store.v3.datastax_astra._model_mappers import get_mapper
Expand Down Expand Up @@ -329,14 +334,16 @@ def to_frame(
else PythonSchemaEntity(model_class).get_field_names()
)

cassandra_model = get_mapper(
cassandra_model_mapper = get_mapper(
data_model=model_class,
keyspace=keyspace,
table_name=table_name,
primary_keys=primary_keys,
partition_keys=partition_keys,
custom_indexes=custom_indexes,
).map()
)

cassandra_model = cassandra_model_mapper.map()

compiled_filter_values = (
compile_expression(key_column_filter_values, AstraFilterExpression)
Expand All @@ -351,7 +358,28 @@ def to_frame(
)

if allow_partitioning_filtering:
print("test")
astra_suffixes = [
op.value for op in FilterExpressionOperationAstraSuffix.__members__.values() if op.value != ""
]
filtering_keys = {key for fk in compiled_filter_values for key in fk.keys()}
strip_values = {key: "" for key in filtering_keys}

for key in filtering_keys:
for suffix in astra_suffixes:
if key.endswith(suffix):
strip_values[key] = suffix
break

filtering_keys_stripped = [
key[: -len(suffix)] if len(suffix) > 0 else key for key, suffix in strip_values.items()
]

missing_primary_keys = list(set(cassandra_model_mapper.primary_keys) - set(filtering_keys_stripped))

if len(missing_primary_keys) > 0:
raise ValueError(
f"All primary keys must be defined in order to allow partitioning filtering. Missing primary keys: {missing_primary_keys}"
)

if num_threads:
max_threads = (
Expand All @@ -364,7 +392,7 @@ def to_frame(
tpe.map(
lambda args: to_frame(*args),
[
(cassandra_model, key_column_filter, select_columns)
(cassandra_model, key_column_filter, select_columns, allow_partitioning_filtering)
for key_column_filter in compiled_filter_values
],
chunksize=max(int(len(compiled_filter_values) / num_threads), 1),
Expand Down
25 changes: 19 additions & 6 deletions adapta/storage/models/filter_expression.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,19 @@
TCompileResult = TypeVar("TCompileResult") # pylint: disable=invalid-name


class FilterExpressionOperationAstraSuffix(Enum):
    """
    An enumeration of the string suffixes that encode a comparison operation
    in an Astra filter key.

    Each member's value is the suffix text for one operation. Callers can
    iterate the members to detect and strip an operation suffix from a filter
    key, for example by testing ``key.endswith(member.value)``.

    NOTE(review): the suffixes look like Django/cqlengine-style lookup
    suffixes appended to a column name - confirm against the Astra client.
    """

    # Strict comparison: greater than.
    GT = "__gt"
    # Greater than or equal.
    GE = "__gte"
    # Strict comparison: less than.
    LT = "__lt"
    # Less than or equal.
    LE = "__lte"
    # Equality carries no suffix; code that strips suffixes must skip this
    # empty value, since every string ends with "".
    EQ = ""
    # Membership in a collection of values.
    IN = "__in"


# pylint: disable=E1101
class FilterExpressionOperation(Enum):
"""
Expand All @@ -26,12 +39,12 @@ class FilterExpressionOperation(Enum):
],
}
OR = {"arrow": pyarrow.compute.Expression.__or__, "astra": lambda left_exprs, right_exprs: left_exprs + right_exprs}
GT = {"arrow": pyarrow.compute.Expression.__gt__, "astra": "__gt"}
GE = {"arrow": pyarrow.compute.Expression.__ge__, "astra": "__gte"}
LT = {"arrow": pyarrow.compute.Expression.__lt__, "astra": "__lt"}
LE = {"arrow": pyarrow.compute.Expression.__le__, "astra": "__lte"}
EQ = {"arrow": pyarrow.compute.Expression.__eq__, "astra": ""}
IN = {"arrow": pyarrow.compute.Expression.isin, "astra": "__in"}
GT = {"arrow": pyarrow.compute.Expression.__gt__, "astra": FilterExpressionOperationAstraSuffix.GT.value}
GE = {"arrow": pyarrow.compute.Expression.__ge__, "astra": FilterExpressionOperationAstraSuffix.GE.value}
LT = {"arrow": pyarrow.compute.Expression.__lt__, "astra": FilterExpressionOperationAstraSuffix.LT.value}
LE = {"arrow": pyarrow.compute.Expression.__le__, "astra": FilterExpressionOperationAstraSuffix.LE.value}
EQ = {"arrow": pyarrow.compute.Expression.__eq__, "astra": FilterExpressionOperationAstraSuffix.EQ.value}
IN = {"arrow": pyarrow.compute.Expression.isin, "astra": FilterExpressionOperationAstraSuffix.IN.value}

def to_string(self):
"""
Expand Down

0 comments on commit 67c8bf5

Please sign in to comment.