Skip to content

Commit

Permalink
Merge 32a3ca8 into 1d8fe0a
Browse files Browse the repository at this point in the history
  • Loading branch information
ggerlakh authored Jan 4, 2025
2 parents 1d8fe0a + 32a3ca8 commit 235e425
Showing 1 changed file with 121 additions and 55 deletions.
176 changes: 121 additions & 55 deletions ydb/tests/functional/ttl/test_ttl.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,49 +49,81 @@ def build_table_description(cls):
ydb.TtlSettings().with_date_type_column('expire_at')
)

# @classmethod
# def upsert(cls, session, table):
# session.transaction().execute(
# 'upsert into `{}` (id, expire_at) values'
# '(1, cast("1970-01-01T00:00:00.000000Z" as Timestamp)),'
# '(2, cast("1990-03-01T00:00:00.000000Z" as Timestamp)),'
# '(3, cast("2030-04-15T00:00:00.000000Z" as Timestamp));'.format(table),
# commit_tx=True
# )

@classmethod
def upsert(cls, session, table):
session.transaction().execute(
'upsert into `{}` (id, expire_at) values'
'(1, cast("1970-01-01T00:00:00.000000Z" as Timestamp)),'
'(2, cast("1990-03-01T00:00:00.000000Z" as Timestamp)),'
'(3, cast("2030-04-15T00:00:00.000000Z" as Timestamp));'.format(table),
commit_tx=True
def upsert(cls, pool, table):
pool.execute_with_retries(
f"""upsert into `{table}` (id, expire_at) values
(1, cast("1970-01-01T00:00:00.000000Z" as Timestamp)),
(2, cast("1990-03-01T00:00:00.000000Z" as Timestamp)),
(3, cast("2030-04-15T00:00:00.000000Z" as Timestamp));
"""
)

def _run_test(self):
with ydb.Driver(ydb.DriverConfig(self.endpoint, self.database)) as driver:
with ydb.SessionPool(driver) as pool:
with pool.checkout() as session:
table = os.path.join(self.database, 'table_with_ttl_column')

session.create_table(table, self.build_table_description())

description = session.describe_table(table)
assert_that(description.ttl_settings, not_none())
assert_that(any_of(
description.ttl_settings.date_type_column,
description.ttl_settings.value_since_unix_epoch
), not_none())

self.upsert(session, table)

# conditional erase runs every 60 second
for i in range(60):
time.sleep(4)

content = list(self._read_table(session, table, columns=('id',)))
if len(content) == 1:
break

content = list(self._read_table(session, table, columns=('id',)))
assert_that(content, has_length(1))
assert_that(content, has_item(has_properties(id=3)))

# with ydb.SessionPool(driver) as pool:
with ydb.QuerySessionPool(driver) as pool:
# with pool.checkout() as session:
table = os.path.join(self.database, 'table_with_ttl_column')

# session.create_table(table, self.build_table_description())

result_sets = pool.execute_with_retries(
f"""
CREATE TABLE `{table}`
(
id UInt64,
expire_at Timestamp,
PRIMARY KEY (id)
) WITH (
TTL = Interval("PT0S") ON expire_at
);
"""
)

description = session.describe_table(table)
assert_that(description.ttl_settings, not_none())
assert_that(any_of(
description.ttl_settings.date_type_column,
description.ttl_settings.value_since_unix_epoch
), not_none())

self.upsert(pool, table)

# conditional erase runs every 60 second
for i in range(60):
time.sleep(4)

# content = list(self._read_table(session, table, columns=('id',)))
content = list(self._read_table(result_sets))
if len(content) == 1:
break

# content = list(self._read_table(session, table, columns=('id',)))
content = list(self._read_table(result_sets))
assert_that(content, has_length(1))
assert_that(content, has_item(has_properties(id=3)))

# @staticmethod
# def _read_table(session, *args, **kwargs):
# for chunk in iter(session.read_table(*args, **kwargs)):
# for row in chunk.rows:
# yield row

@staticmethod
def _read_table(session, *args, **kwargs):
for chunk in iter(session.read_table(*args, **kwargs)):
for row in chunk.rows:
def _read_table(result_sets, *args, **kwargs):
for result_set in result_sets:
for row in result_set.rows:
yield row


Expand Down Expand Up @@ -125,14 +157,24 @@ def build_table_description(cls):
ydb.TtlSettings().with_value_since_unix_epoch('expire_at', ydb.ColumnUnit.UNIT_SECONDS)
)

# @classmethod
# def upsert(cls, session, table):
# session.transaction().execute(
# 'upsert into `{}` (id, expire_at) values'
# '(1, 0),'
# '(2, 636249600),'
# '(3, 1902441600);'.format(table),
# commit_tx=True
# )

@classmethod
def upsert(cls, session, table):
session.transaction().execute(
'upsert into `{}` (id, expire_at) values'
'(1, 0),'
'(2, 636249600),'
'(3, 1902441600);'.format(table),
commit_tx=True
def upsert(cls, pool, table):
pool.execute_with_retries(
f"""upsert into `{table}` (id, expire_at) values
(1, 0),
(2, 636249600),
(3, 1902441600);
"""
)

def test_case(self):
Expand All @@ -153,30 +195,54 @@ def teardown_class(cls):

def test_case(self):
with ydb.Driver(ydb.DriverConfig(self.endpoint, self.database)) as driver:
with ydb.SessionPool(driver) as pool:
with pool.checkout() as session:
# with ydb.SessionPool(driver) as pool:
with ydb.QuerySessionPool(driver) as pool:
# with pool.checkout() as session:
table = os.path.join(self.database, 'table_with_ttl_column')

session.create_table(
table, ydb.TableDescription()
.with_primary_keys('id')
.with_columns(
ydb.Column('id', ydb.OptionalType(ydb.PrimitiveType.Uint64)),
ydb.Column('expire_at', ydb.OptionalType(ydb.PrimitiveType.Timestamp)),
)
# session.create_table(
# table, ydb.TableDescription()
# .with_primary_keys('id')
# .with_columns(
# ydb.Column('id', ydb.OptionalType(ydb.PrimitiveType.Uint64)),
# ydb.Column('expire_at', ydb.OptionalType(ydb.PrimitiveType.Timestamp)),
# )
# )

create_result_sets = pool.execute_with_retries(
f"""
CREATE TABLE `{table}`
(
id UInt64,
expire_at Timestamp,
PRIMARY KEY (id)
);
"""
)

description = session.describe_table(table)
assert_that(description.ttl_settings, none())

session.alter_table(table, set_ttl_settings=ydb.TtlSettings().with_date_type_column('expire_at'))
# session.alter_table(table, set_ttl_settings=ydb.TtlSettings().with_date_type_column('expire_at'))

first_alter_result_sets = pool.execute_with_retries(
f"""
ALTER TABLE `{table}` SET (TTL = Interval("PT0S") ON expire_at);
"""
)

description = session.describe_table(table)
assert_that(description.ttl_settings, not_none())
assert_that(description.ttl_settings.date_type_column, not_none())
assert_that(description.ttl_settings.date_type_column, has_properties(column_name='expire_at'))

session.alter_table(table, drop_ttl_settings=True)
# session.alter_table(table, drop_ttl_settings=True)

second_alter_result_sets = pool.execute_with_retries(
f"""
ALTER TABLE `{table}` RESET (TTL);
"""
)

description = session.describe_table(table)
assert_that(description.ttl_settings, none())

0 comments on commit 235e425

Please sign in to comment.