From dbd482b6fdd8c69573114630313c2b378c522763 Mon Sep 17 00:00:00 2001 From: Igor Novgorodov Date: Wed, 29 May 2024 15:30:46 +0200 Subject: [PATCH] feat: cursors from clickhouse (#433) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * cursors from clickhouse * logging * Update k8s/cursor-initializator-clickhouse/cursor-initializator-clickhouse.py Co-authored-by: Saša Tomić * change path/name * remove stray f --------- Co-authored-by: Saša Tomić Co-authored-by: Nikola Milosavljevic <73236646+NikolaMilosa@users.noreply.github.com> --- .../cursor-initializator-clickhouse.py | 131 ++++-------------- 1 file changed, 26 insertions(+), 105 deletions(-) diff --git a/k8s/cursor-initializator-clickhouse/cursor-initializator-clickhouse.py b/k8s/cursor-initializator-clickhouse/cursor-initializator-clickhouse.py index 3a09f9da5..5b5cdf4a3 100644 --- a/k8s/cursor-initializator-clickhouse/cursor-initializator-clickhouse.py +++ b/k8s/cursor-initializator-clickhouse/cursor-initializator-clickhouse.py @@ -1,8 +1,6 @@ import argparse -import json import logging import os -import sys import clickhouse_connect @@ -11,19 +9,10 @@ def parse(): parser = argparse.ArgumentParser(description="Script to initialize cursors for vector shards") - parser.add_argument("clickhouse_host", help="Clickhouse host, i.e. localhost") - parser.add_argument("clickhouse_port", help="Clickhouse port, i.e. 8123") + parser.add_argument("url", help="Clickhouse DSN, i.e. https://username@host:port") + parser.add_argument("--password", help="Clickhouse password, i.e. default", default=os.environ.get("PASSWORD")) parser.add_argument("node_filter", help="Node filter for current vector shard i.e. 
a.*") - parser.add_argument( - "--table", - help="Table to look in", - action="append", - dest="tables", - default=["ic", "ic_boundary", "certificate_syncer", "certificate_issuer"], - ) parser.add_argument("output_dir", help="Path to which to initialize cursors") - - parser.add_argument("--username", help="Clickhouse username, i.e. default", default="default") return parser.parse_args() @@ -33,104 +22,36 @@ def get_logger(): return logging.getLogger(SCRIPT_NAME) -def get_distinct_values_query(table, field, pattern): - return f""" - SELECT DISTINCT {field} - FROM {table} - WHERE {field} LIKE '{pattern}' - """ - - -def get_last_cursor_for_node(table, ch_filter, field): - return f""" - SELECT temp.{field}, temp.utc, __CURSOR, temp.job - FROM {table}, ( - SELECT {field}, max(toDateTime64(timestamp_utc, 9)) as utc, job - FROM {table} - GROUP BY {field}, job - HAVING {field} LIKE '{ch_filter}' - ) as temp - WHERE temp.{field} = {table}.{field} AND temp.utc = {table}.timestamp_utc AND temp.job = {table}.job -""" - - def main(): logger = get_logger() args = parse() - logger.info("Initializing clickhouse client with host: %s and port: %s", args.clickhouse_host, args.clickhouse_port) - logger.info("Looking for nodes matching %s in tables %s", args.node_filter, str(args.tables)) - - client = clickhouse_connect.get_client(host=args.clickhouse_host, port=args.clickhouse_port, username=args.username) - - tables = client.command( - """ - SELECT name - FROM system.tables + logger.info("Initializing clickhouse client with URL: %s", args.url) + logger.info("Looking for nodes matching %s", args.node_filter) + + client = clickhouse_connect.get_client(interface="https", dsn=args.url, password=args.password) + + result = client.query( + f""" + SELECT + `ic_node_id`, + `cursor` + FROM + `ic_boundary_cursor_distributed` FINAL + WHERE + match(`ic_node_id`, '{args.node_filter}') """ ) - logger.info("Found %s tables", len(tables)) - if not all([table in tables for table in args.tables]): - 
logger.error("Table %s not found", args.table) - sys.exit(1) - - logger.info("Table found") - - aggregated = {} - - for table in args.tables: - logger.info("Looking for nodes in table %s", table) - - field = "_HOSTNAME" - if table == "ic": - field = "ic_node" - - command = get_last_cursor_for_node(table, args.node_filter, field) - logger.info("Executing command: \n%s", command) - response = client.command(command) - - if not isinstance(response, list): - # should happen only if the result is empty - response = [] - - mapped = [item for line in response for item in line.split("\n")] - - for i in range(0, len(mapped), 4): - node = mapped[i] - timestamp = mapped[i + 1] - cursor = mapped[i + 2] - job = mapped[i + 3] - - if node not in aggregated: - aggregated[node] = {} - aggregated[node][job] = { - "cursor": cursor, - "timestamp": timestamp, - } - - logger.info("Dumping aggregated cursors: \n%s", json.dumps(aggregated, indent=2, sort_keys=True)) - created = 0 - for node in aggregated: - for job in aggregated[node]: - file_name = node - if len(node.split("-")) == 2: - if job == "host_node_exporter": - file_name = f"{file_name}-host" - elif job == "node_exporter": - file_name = f"{file_name}-guest" - - path = os.path.join(args.output_dir, f"{file_name}-{job}-source") - if not os.path.exists(path): - os.mkdir(path) - else: - logger.warning("Directory already exists, maybe this shouldn't be overriden? 
%s", path) - - checkpointer = os.path.join(path, "checkpoint.txt") - with open(checkpointer, "w", encoding="utf-8") as f: - f.write(aggregated[node][job]["cursor"] + "\n") - created += 1 - - logger.info("Successfully initialized cursors %s on path %s", created, args.output_dir) + for r in result.result_rows: + dir = os.path.join(args.output_dir, f"{r[0]}-node_exporter") + + if not os.path.exists(dir): + os.mkdir(dir) + + with open(os.path.join(dir, "checkpoint.txt"), "w", encoding="utf-8") as f: + f.write(f"{r[1]}\n") + + logger.info("Successfully initialized %d cursors on path %s", len(result.result_rows), args.output_dir) if __name__ == "__main__":