Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace SessionPool with QuerySessionPool in ttl/test_ttl.py #13173

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 84 additions & 32 deletions ydb/tests/functional/ttl/test_ttl.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,23 +49,45 @@ def build_table_description(cls):
ydb.TtlSettings().with_date_type_column('expire_at')
)

# @classmethod
# def upsert(cls, session, table):
# session.transaction().execute(
# 'upsert into `{}` (id, expire_at) values'
# '(1, cast("1970-01-01T00:00:00.000000Z" as Timestamp)),'
# '(2, cast("1990-03-01T00:00:00.000000Z" as Timestamp)),'
# '(3, cast("2030-04-15T00:00:00.000000Z" as Timestamp));'.format(table),
# commit_tx=True
# )

@classmethod
def upsert(cls, session, table):
session.transaction().execute(
'upsert into `{}` (id, expire_at) values'
'(1, cast("1970-01-01T00:00:00.000000Z" as Timestamp)),'
'(2, cast("1990-03-01T00:00:00.000000Z" as Timestamp)),'
'(3, cast("2030-04-15T00:00:00.000000Z" as Timestamp));'.format(table),
commit_tx=True
def upsert(cls, pool, table):
pool.execute_with_retries(
f"""upsert into `{table}` (id, expire_at) values
(1, cast("1970-01-01T00:00:00.000000Z" as Timestamp)),
(2, cast("1990-03-01T00:00:00.000000Z" as Timestamp)),
(3, cast("2030-04-15T00:00:00.000000Z" as Timestamp));
"""
)

def _run_test(self):
with ydb.Driver(ydb.DriverConfig(self.endpoint, self.database)) as driver:
with ydb.SessionPool(driver) as pool:
with ydb.QuerySessionPool(driver) as pool:
with pool.checkout() as session:
table = os.path.join(self.database, 'table_with_ttl_column')

session.create_table(table, self.build_table_description())

result_sets = pool.execute_with_retries(
f"""
CREATE TABLE `{table}`
(
id UInt64,
expire_at Timestamp,
PRIMARY KEY (id)
) WITH (
TTL = Interval("PT0S") ON expire_at
);
"""
)

description = session.describe_table(table)
assert_that(description.ttl_settings, not_none())
Expand All @@ -74,24 +96,32 @@ def _run_test(self):
description.ttl_settings.value_since_unix_epoch
), not_none())

self.upsert(session, table)
self.upsert(pool, table)

# conditional erase runs every 60 second
for i in range(60):
time.sleep(4)

content = list(self._read_table(session, table, columns=('id',)))
# content = list(self._read_table(session, table, columns=('id',)))
content = list(self._read_table(result_sets))
if len(content) == 1:
break

content = list(self._read_table(session, table, columns=('id',)))
# content = list(self._read_table(session, table, columns=('id',)))
content = list(self._read_table(result_sets))
assert_that(content, has_length(1))
assert_that(content, has_item(has_properties(id=3)))

# @staticmethod
# def _read_table(session, *args, **kwargs):
# for chunk in iter(session.read_table(*args, **kwargs)):
# for row in chunk.rows:
# yield row

@staticmethod
def _read_table(session, *args, **kwargs):
for chunk in iter(session.read_table(*args, **kwargs)):
for row in chunk.rows:
def _read_table(result_sets, *args, **kwargs):
for result_set in result_sets:
for row in result_set.rows:
yield row


Expand Down Expand Up @@ -125,14 +155,24 @@ def build_table_description(cls):
ydb.TtlSettings().with_value_since_unix_epoch('expire_at', ydb.ColumnUnit.UNIT_SECONDS)
)

# @classmethod
# def upsert(cls, session, table):
# session.transaction().execute(
# 'upsert into `{}` (id, expire_at) values'
# '(1, 0),'
# '(2, 636249600),'
# '(3, 1902441600);'.format(table),
# commit_tx=True
# )

@classmethod
def upsert(cls, session, table):
session.transaction().execute(
'upsert into `{}` (id, expire_at) values'
'(1, 0),'
'(2, 636249600),'
'(3, 1902441600);'.format(table),
commit_tx=True
def upsert(cls, pool, table):
pool.execute_with_retries(
f"""upsert into `{table}` (id, expire_at) values
(1, 0),
(2, 636249600),
(3, 1902441600);
"""
)

def test_case(self):
Expand All @@ -153,30 +193,42 @@ def teardown_class(cls):

def test_case(self):
with ydb.Driver(ydb.DriverConfig(self.endpoint, self.database)) as driver:
with ydb.SessionPool(driver) as pool:
with ydb.QuerySessionPool(driver) as pool:
with pool.checkout() as session:
table = os.path.join(self.database, 'table_with_ttl_column')

session.create_table(
table, ydb.TableDescription()
.with_primary_keys('id')
.with_columns(
ydb.Column('id', ydb.OptionalType(ydb.PrimitiveType.Uint64)),
ydb.Column('expire_at', ydb.OptionalType(ydb.PrimitiveType.Timestamp)),
)
create_result_sets = pool.execute_with_retries(
f"""
CREATE TABLE `{table}`
(
id UInt64,
expire_at Timestamp,
PRIMARY KEY (id)
);
"""
)

description = session.describe_table(table)
assert_that(description.ttl_settings, none())

session.alter_table(table, set_ttl_settings=ydb.TtlSettings().with_date_type_column('expire_at'))

first_alter_result_sets = pool.execute_with_retries(
f"""
ALTER TABLE `{table}` SET (TTL = Interval("PT0S") ON expire_at);
"""
)

description = session.describe_table(table)
assert_that(description.ttl_settings, not_none())
assert_that(description.ttl_settings.date_type_column, not_none())
assert_that(description.ttl_settings.date_type_column, has_properties(column_name='expire_at'))

session.alter_table(table, drop_ttl_settings=True)

second_alter_result_sets = pool.execute_with_retries(
f"""
ALTER TABLE `{table}` RESET (TTL);
"""
)

description = session.describe_table(table)
assert_that(description.ttl_settings, none())
Loading