Skip to content

Commit

Permalink
chore(weave): table row response includes absolute row indices
Browse files Browse the repository at this point in the history
  • Loading branch information
bcsherma committed Feb 6, 2025
1 parent 49dc299 commit 8ac6e07
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 12 deletions.
4 changes: 3 additions & 1 deletion weave/trace_server/clickhouse_trace_server_batched.py
Original file line number Diff line number Diff line change
Expand Up @@ -922,7 +922,9 @@ def _table_query_stream(
res = self._query_stream(query, parameters=pb.get_params())

for row in res:
yield tsi.TableRowSchema(digest=row[0], val=json.loads(row[1]))
yield tsi.TableRowSchema(
digest=row[0], val=json.loads(row[1]), original_index=row[2]
)

def table_query_stats(self, req: tsi.TableQueryStatsReq) -> tsi.TableQueryStatsRes:
parameters: dict[str, Any] = {
Expand Down
24 changes: 13 additions & 11 deletions weave/trace_server/table_query_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
# Constants for table and column names
TABLE_ROWS_ALIAS = "tr"
VAL_DUMP_COLUMN_NAME = "val_dump"
ROW_ORDER_COLUMN_NAME = "row_order"
ROW_ORDER_COLUMN_NAME = "original_index"


def make_natural_sort_table_query(
Expand Down Expand Up @@ -36,21 +36,22 @@ def make_natural_sort_table_query(
row_digests_selection = f"arraySlice({row_digests_selection}, 1 + {{{pb.add_param(offset)}: Int64}}, {{{pb.add_param(limit)}: Int64}})"

query = f"""
SELECT DISTINCT tr.digest, tr.val_dump, t.row_order
SELECT DISTINCT tr.digest, tr.val_dump, t.original_index + {{{pb.add_param(offset or 0)}: Int64}} - 1 as original_index
FROM table_rows tr
INNER JOIN (
SELECT row_digest, row_number() OVER () AS row_order
SELECT row_digest, original_index
FROM (
SELECT {row_digests_selection} as row_digests
SELECT {row_digests_selection} as row_digests,
arrayEnumerate(row_digests) as original_indices
FROM tables
WHERE project_id = {{{project_id_name}: String}}
AND digest = {{{digest_name}: String}}
LIMIT 1
)
ARRAY JOIN row_digests AS row_digest
ARRAY JOIN row_digests AS row_digest, original_indices AS original_index
) AS t ON tr.digest = t.row_digest
WHERE tr.project_id = {{{project_id_name}: String}}
ORDER BY row_order ASC
ORDER BY original_index ASC
"""

return query
Expand Down Expand Up @@ -88,20 +89,21 @@ def make_standard_table_query(
)

query = f"""
SELECT tr.digest, tr.val_dump, tr.row_order FROM
SELECT tr.digest, tr.val_dump, tr.original_index FROM
(
SELECT DISTINCT tr.digest, tr.val_dump, t.row_order
SELECT DISTINCT tr.digest, tr.val_dump, t.original_index
FROM table_rows tr
INNER JOIN (
SELECT row_digest, row_number() OVER () AS row_order
SELECT row_digest, original_index - 1 as original_index
FROM (
SELECT row_digests
SELECT row_digests,
arrayEnumerate(row_digests) as original_indices
FROM tables
WHERE project_id = {{{project_id_name}: String}}
AND digest = {{{digest_name}: String}}
LIMIT 1
)
ARRAY JOIN row_digests AS row_digest
ARRAY JOIN row_digests AS row_digest, original_indices AS original_index
) AS t ON tr.digest = t.row_digest
WHERE tr.project_id = {{{project_id_name}: String}}
{sql_safe_filter_clause}
Expand Down
1 change: 1 addition & 0 deletions weave/trace_server/trace_server_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -611,6 +611,7 @@ class TableUpdateRes(BaseModel):
class TableRowSchema(BaseModel):
digest: str
val: Any
original_index: Optional[int] = None


class TableCreateRes(BaseModel):
Expand Down

0 comments on commit 8ac6e07

Please sign in to comment.