Skip to content

Commit

Permalink
Add option to specify Time To Live (TTL) for Astra writes (#500)
Browse files Browse the repository at this point in the history
  • Loading branch information
JanusAsmussen authored Jan 14, 2025
1 parent 4ee53ff commit 90a4a0e
Showing 1 changed file with 11 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,7 @@ def upsert_entity(
keyspace: Optional[str] = None,
table_name: Optional[str] = None,
client_rate_limit: str = "1000 per second",
time_to_live: Optional[int] = None,
) -> None:
"""
Inserts a record into existing table.
Expand All @@ -419,6 +420,7 @@ def upsert_entity(
:param: table_name: Table to insert entity into.
:param: keyspace: Optional keyspace name, if not provided in the client constructor.
:param: client_rate_limit: the limit string to parse (eg: "1 per hour"), default: "1000 per second"
:param: time_to_live: Time to live in seconds for the inserted entity.
"""

@on_exception(
Expand All @@ -429,15 +431,16 @@ def upsert_entity(
raise_on_giveup=True,
)
@rate_limit(limit=client_rate_limit)
def _save_entity(model_object: Model):
model_object.save()
def _save_entity(model_object: Model, ttl: int):
model_object.ttl(ttl).save()

cassandra_model = get_mapper(
data_model=type(entity),
table_name=table_name,
keyspace=keyspace,
).map()
_save_entity(cassandra_model(**asdict(entity)))

_save_entity(cassandra_model(**asdict(entity)), ttl=time_to_live)

def upsert_batch(
self,
Expand All @@ -447,6 +450,7 @@ def upsert_batch(
table_name: Optional[str] = None,
batch_size=1000,
client_rate_limit: str = "1000 per second",
time_to_live: Optional[int] = None,
) -> None:
"""
Inserts a batch into existing table.
Expand All @@ -457,6 +461,7 @@ def upsert_batch(
:param: table_name: Table to insert entity into.
:param: batch_size: elements per batch to upsert.
:param: client_rate_limit: the limit string to parse (eg: "1 per hour"), default: "1000 per second"
:param: time_to_live: Time to live in seconds for the inserted entities.
"""

@on_exception(
Expand All @@ -467,10 +472,10 @@ def upsert_batch(
raise_on_giveup=True,
)
@rate_limit(limit=client_rate_limit)
def _save_entities(model_class: Type[Model], values: List[dict]):
def _save_entities(model_class: Type[Model], values: List[dict], ttl: int):
with BatchQuery(batch_type=BatchType.UNLOGGED) as upsert_batch:
for value in values:
model_class.batch(upsert_batch).create(**value)
model_class.batch(upsert_batch).ttl(ttl).create(**value)

cassandra_model = get_mapper(
data_model=entity_type,
Expand All @@ -482,6 +487,7 @@ def _save_entities(model_class: Type[Model], values: List[dict]):
_save_entities(
model_class=cassandra_model,
values=chunk,
ttl=time_to_live,
)

def ann_search(
Expand Down

0 comments on commit 90a4a0e

Please sign in to comment.