feat: cursors from clickhouse (#433)
* cursors from clickhouse

* logging

* Update k8s/cursor-initializator-clickhouse/cursor-initializator-clickhouse.py

Co-authored-by: Saša Tomić <[email protected]>

* change path/name

* remove stray f

---------

Co-authored-by: Saša Tomić <[email protected]>
Co-authored-by: Nikola Milosavljevic <[email protected]>
3 people authored May 29, 2024
1 parent 0c4fa3e commit dbd482b
Showing 1 changed file with 26 additions and 105 deletions.
131 changes: 26 additions & 105 deletions k8s/cursor-initializator-clickhouse/cursor-initializator-clickhouse.py
@@ -1,8 +1,6 @@
 import argparse
-import json
 import logging
 import os
-import sys
 
 import clickhouse_connect
 
@@ -11,19 +9,10 @@

 def parse():
     parser = argparse.ArgumentParser(description="Script to initialize cursors for vector shards")
-    parser.add_argument("clickhouse_host", help="Clickhouse host, i.e. localhost")
-    parser.add_argument("clickhouse_port", help="Clickhouse port, i.e. 8123")
+    parser.add_argument("url", help="Clickhouse DSN, i.e. https://username@host:port")
+    parser.add_argument("--password", help="Clickhouse password, i.e. default", default=os.environ.get("PASSWORD"))
     parser.add_argument("node_filter", help="Node filter for current vector shard i.e. a.*")
-    parser.add_argument(
-        "--table",
-        help="Table to look in",
-        action="append",
-        dest="tables",
-        default=["ic", "ic_boundary", "certificate_syncer", "certificate_issuer"],
-    )
     parser.add_argument("output_dir", help="Path to which to initialize cursors")
-
-    parser.add_argument("--username", help="Clickhouse username, i.e. default", default="default")
     return parser.parse_args()
 
 
@@ -33,104 +22,36 @@ def get_logger():
     return logging.getLogger(SCRIPT_NAME)
 
 
-def get_distinct_values_query(table, field, pattern):
-    return f"""
-    SELECT DISTINCT {field}
-    FROM {table}
-    WHERE {field} LIKE '{pattern}'
-    """
-
-
-def get_last_cursor_for_node(table, ch_filter, field):
-    return f"""
-    SELECT temp.{field}, temp.utc, __CURSOR, temp.job
-    FROM {table}, (
-        SELECT {field}, max(toDateTime64(timestamp_utc, 9)) as utc, job
-        FROM {table}
-        GROUP BY {field}, job
-        HAVING {field} LIKE '{ch_filter}'
-    ) as temp
-    WHERE temp.{field} = {table}.{field} AND temp.utc = {table}.timestamp_utc AND temp.job = {table}.job
-    """
-
-
 def main():
     logger = get_logger()
     args = parse()
-    logger.info("Initializing clickhouse client with host: %s and port: %s", args.clickhouse_host, args.clickhouse_port)
-    logger.info("Looking for nodes matching %s in tables %s", args.node_filter, str(args.tables))
-
-    client = clickhouse_connect.get_client(host=args.clickhouse_host, port=args.clickhouse_port, username=args.username)
-
-    tables = client.command(
-        """
-        SELECT name
-        FROM system.tables
+    logger.info("Initializing clickhouse client with URL: %s", args.url)
+    logger.info("Looking for nodes matching %s", args.node_filter)
+
+    client = clickhouse_connect.get_client(interface="https", dsn=args.url, password=args.password)
+
+    result = client.query(
+        f"""
+        SELECT
+            `ic_node_id`,
+            `cursor`
+        FROM
+            `ic_boundary_cursor_distributed` FINAL
+        WHERE
+            match(`ic_node_id`, '{args.node_filter}')
         """
     )
 
-    logger.info("Found %s tables", len(tables))
-    if not all([table in tables for table in args.tables]):
-        logger.error("Table %s not found", args.table)
-        sys.exit(1)
-
-    logger.info("Table found")
-
-    aggregated = {}
-
-    for table in args.tables:
-        logger.info("Looking for nodes in table %s", table)
-
-        field = "_HOSTNAME"
-        if table == "ic":
-            field = "ic_node"
-
-        command = get_last_cursor_for_node(table, args.node_filter, field)
-        logger.info("Executing command: \n%s", command)
-        response = client.command(command)
-
-        if not isinstance(response, list):
-            # should happen only if the result is empty
-            response = []
-
-        mapped = [item for line in response for item in line.split("\n")]
-
-        for i in range(0, len(mapped), 4):
-            node = mapped[i]
-            timestamp = mapped[i + 1]
-            cursor = mapped[i + 2]
-            job = mapped[i + 3]
-
-            if node not in aggregated:
-                aggregated[node] = {}
-            aggregated[node][job] = {
-                "cursor": cursor,
-                "timestamp": timestamp,
-            }
-
-    logger.info("Dumping aggregated cursors: \n%s", json.dumps(aggregated, indent=2, sort_keys=True))
-    created = 0
-    for node in aggregated:
-        for job in aggregated[node]:
-            file_name = node
-            if len(node.split("-")) == 2:
-                if job == "host_node_exporter":
-                    file_name = f"{file_name}-host"
-                elif job == "node_exporter":
-                    file_name = f"{file_name}-guest"
-
-            path = os.path.join(args.output_dir, f"{file_name}-{job}-source")
-            if not os.path.exists(path):
-                os.mkdir(path)
-            else:
-                logger.warning("Directory already exists, maybe this shouldn't be overriden? %s", path)
-
-            checkpointer = os.path.join(path, "checkpoint.txt")
-            with open(checkpointer, "w", encoding="utf-8") as f:
-                f.write(aggregated[node][job]["cursor"] + "\n")
-            created += 1
-
-    logger.info("Successfully initialized cursors %s on path %s", created, args.output_dir)
+    for r in result.result_rows:
+        dir = os.path.join(args.output_dir, f"{r[0]}-node_exporter")
+
+        if not os.path.exists(dir):
+            os.mkdir(dir)
+
+        with open(os.path.join(dir, "checkpoint.txt"), "w", encoding="utf-8") as f:
+            f.write(f"{r[1]}\n")
+
+    logger.info("Successfully initialized %d cursors on path %s", len(result.result_rows), args.output_dir)
 
 
 if __name__ == "__main__":
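
For context, a hypothetical invocation of the revised CLI. The DSN, node filter, password, and output path below are all placeholders, and the sketch assumes it runs inside the script's own module so parse() is in scope:

    # Hedged sketch: exercising the new argument surface in-process.
    # Every value here is an illustrative placeholder, not a real endpoint.
    import sys

    sys.argv = [
        "cursor-initializator-clickhouse.py",
        "https://default@clickhouse.example.com:8443",  # url: DSN with embedded username
        "ab1-.*",                                       # node_filter: regex fed to match()
        "/var/lib/vector/cursors",                      # output_dir: where checkpoints land
        "--password", "example-password",               # when omitted, falls back to $PASSWORD
    ]
    args = parse()
    assert args.url.startswith("https://")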
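
The substantive change is that cursors now come from a single ClickHouse table instead of being reassembled from four log tables. A minimal standalone sketch of the new lookup, using only the calls the diff itself relies on (clickhouse_connect.get_client, client.query, result.result_rows), with placeholder credentials and assuming a ReplacingMergeTree-style engine behind ic_boundary_cursor_distributed:

    import clickhouse_connect

    # Placeholder DSN and password; the real values come from argv / $PASSWORD.
    client = clickhouse_connect.get_client(
        interface="https",
        dsn="https://default@clickhouse.example.com:8443",
        password="example-password",
    )

    # FINAL makes ClickHouse collapse not-yet-merged row versions at read time,
    # so each ic_node_id resolves to its latest stored cursor.
    result = client.query(
        """
        SELECT `ic_node_id`, `cursor`
        FROM `ic_boundary_cursor_distributed` FINAL
        WHERE match(`ic_node_id`, 'ab1-.*')
        """
    )

    for node_id, cursor in result.result_rows:
        print(node_id, cursor)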

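After a successful run the output directory would look roughly like this (node IDs are illustrative); each per-node directory corresponds to a vector journald source, which presumably resumes from the cursor stored in its checkpoint.txt:

    /var/lib/vector/cursors/
    ├── ab1-node-a-node_exporter/
    │   └── checkpoint.txt   # one line: the journald cursor for that node
    └── ab1-node-b-node_exporter/
        └── checkpoint.txt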