From 90b22b34e99a4458ed097eff591c41e868187d9e Mon Sep 17 00:00:00 2001 From: foroogh shahab Date: Sat, 3 Aug 2024 09:14:03 +0200 Subject: [PATCH] Kafkamanager added. --- .../libs/csle-cli/src/csle_cli/cli.py | 102 +++++++++++++++++- 1 file changed, 99 insertions(+), 3 deletions(-) diff --git a/simulation-system/libs/csle-cli/src/csle_cli/cli.py b/simulation-system/libs/csle-cli/src/csle_cli/cli.py index 1f8685767..a4a3698c5 100755 --- a/simulation-system/libs/csle-cli/src/csle_cli/cli.py +++ b/simulation-system/libs/csle-cli/src/csle_cli/cli.py @@ -685,7 +685,7 @@ def stop_shell_complete(ctx, param, incomplete) -> List[str]: @click.command("stop", help="prometheus | node_exporter | cadvisor | grafana | flask | container-name | " "emulation-name | statsmanager | emulation_executions | pgadmin | all | nginx | postgresql " "| docker | clustermanager | hostmanagers | hostmanager | clientmanager | snortmanagers " - "| snortmanager | elkmanager | trafficmanagers | trafficmanager") + "| snortmanager | elkmanager | trafficmanagers | trafficmanager | kafkamanager") def stop(entity: str, name: str, id: int = -1, ip: str = "", container_ip: str = "") -> None: """ Stops an entity @@ -745,6 +745,8 @@ def stop(entity: str, name: str, id: int = -1, ip: str = "", container_ip: str = stop_traffic_managers(ip=ip, emulation=name, ip_first_octet=id) elif entity == "trafficmanager": stop_traffic_manager(ip=ip, container_ip=container_ip, emulation=name, ip_first_octet=id) + elif entity == "kafkamanager": + stop_kafka_manager(ip=ip, emulation=name, ip_first_octet=id) else: container_stopped = False for node in config.cluster_config.cluster_nodes: @@ -1063,6 +1065,30 @@ def stop_elk_manager(ip: str, emulation: str, ip_first_octet: int) -> None: bold=False) +def stop_kafka_manager(ip: str, emulation: str, ip_first_octet: int) -> None: + """ + Utility function for stopping the Kafka manage + + :param ip: the ip of the node to stop the Kafka manger + :param emulation: the emulation of the execution + :param ip_first_octet: the ID of the execution + :return: None + """ + import csle_common.constants.constants as constants + from csle_common.metastore.metastore_facade import MetastoreFacade + config = MetastoreFacade.get_config(id=1) + for node in config.cluster_config.cluster_nodes: + if node.ip == ip or ip == "": + stopped = ClusterController.stop_kafka_manager( + ip=ip, port=constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT, emulation=emulation, + ip_first_octet=ip_first_octet) + if stopped.outcome: + click.secho(f"Stopping Kafka manager on port:{constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT}") + else: + click.secho(f"Kafka manager is not stopped:{constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT}", + bold=False) + + def stop_traffic_managers(ip: str, emulation: str, ip_first_octet: int) -> None: """ Utility function for stopping the traffic managers @@ -1307,7 +1333,7 @@ def start_shell_complete(ctx, param, incomplete) -> List[str]: "container-name | emulation-name | all | statsmanager | training_job " "| system_id_job | nginx | postgresql | docker | clustermanager | hostmanagers " "| hostmanager | clientmanager | snortmanagers | snortmanager | elkmanager " - "| trafficmanagers | trafficmanager") + "| trafficmanagers | trafficmanager | kafkamanager") def start(entity: str, no_traffic: bool, name: str, id: int, no_clients: bool, no_network: bool, ip: str, container_ip: str, no_beats: bool) -> None: """ @@ -1378,6 +1404,8 @@ def start(entity: str, no_traffic: bool, name: str, id: int, no_clients: bool, n start_traffic_managers(ip=ip, emulation=name, ip_first_octet=id) elif entity == "trafficmanager": start_traffic_manager(ip=ip, container_ip=container_ip, emulation=name, ip_first_octet=id) + elif entity == "kafkamanager": + start_kafka_manager(ip=ip, emulation=name, ip_first_octet=id) else: container_started = False for node in config.cluster_config.cluster_nodes: @@ -1773,6 +1801,30 @@ def start_traffic_manager(ip: str, container_ip: str, emulation: str, ip_first_o bold=False) +def start_kafka_manager(ip: str, emulation: str, ip_first_octet: int): + """ + Utility function for starting Kafka manager + + :param ip: the ip of the node to start Kafka manager + :param emulation: the emulation of the execution + :param ip_first_octet: the ID of the execution + :return: None + """ + import csle_common.constants.constants as constants + from csle_common.metastore.metastore_facade import MetastoreFacade + config = MetastoreFacade.get_config(id=1) + for node in config.cluster_config.cluster_nodes: + if node.ip == ip or ip == "": + operation_outcome = ClusterController.start_kafka_manager( + ip=ip, port=constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT, emulation=emulation, + ip_first_octet=ip_first_octet) + if operation_outcome.outcome: + click.secho(f"Starting Kafka manager on port:{constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT}") + else: + click.secho(f"Kafka manager are not started:{constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT}", + bold=False) + + def run_image(image: str, name: str, create_network: bool = True, version: str = "0.0.1") -> bool: """ Runs a container with a given image @@ -2060,7 +2112,7 @@ def ls_shell_complete(ctx, param, incomplete) -> List[str]: @click.command("ls", help="containers | networks | images | emulations | all | environments | prometheus " "| node_exporter | cadvisor | pgadmin | statsmanager | flask | " "simulations | emulation_executions | cluster | nginx | postgresql | docker | hostmanagers | " - "clientmanager | snortmanagers | elkmanager | trafficmanagers") + "clientmanager | snortmanagers | elkmanager | trafficmanagers | kafkamanager") @click.argument('entity', default='all', type=str, shell_complete=ls_shell_complete) @click.option('--all', is_flag=True, help='list all') @click.option('--running', is_flag=True, help='list running only (default)') @@ -2135,6 +2187,8 @@ def ls(entity: str, all: bool, running: bool, stopped: bool, ip: str, name: str, list_elk_manager(ip=ip, emulation=name, ip_first_octet=id) elif entity == "trafficmanagers": list_traffic_managers(ip=ip, emulation=name, ip_first_octet=id) + elif entity == "kafkamanager": + list_kafka_managers(ip=ip, emulation=name, ip_first_octet=id) else: container = get_running_container(name=entity) if container is not None: @@ -2205,6 +2259,48 @@ def list_host_managers(ip: str, emulation: str, ip_first_octet: int) -> None: click.secho('+' + '-' * 50 + '+', fg='white') +def list_kafka_managers(ip: str, emulation: str, ip_first_octet: int) -> None: + """ + Utility function for listing Kafka managers + + :param ip: the ip of the node to list Kafka manager + :param emulation: the emulation of the execution + :param ip_first_octet: the ID of the execution + + :return: None + """ + import csle_common.constants.constants as constants + from csle_common.metastore.metastore_facade import MetastoreFacade + config = MetastoreFacade.get_config(id=1) + for node in config.cluster_config.cluster_nodes: + if node.ip == ip or ip == "": + kafka_manager_info = ClusterController.get_kafka_managers_info( + ip=ip, port=constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT, emulation=emulation, + ip_first_octet=ip_first_octet) + for i in range(len(kafka_manager_info.ips)): + status_color = 'green' if kafka_manager_info.kafkaManagersRunning[i] else 'red' + manager_status = 'Running' if kafka_manager_info.kafkaManagersRunning[i] else 'Stopped' + click.secho('+' + '-' * 60 + '+', fg='white') + click.secho(f'|{"Kafka manager IP":^40}', nl=False, fg='white') + click.secho('|', nl=False, fg='white') + click.secho(f'{kafka_manager_info.ips[i]:<19}', nl=False, fg=status_color) + click.secho('|', fg='white') + click.secho('+' + '-' * 60 + '+', fg='white') + click.secho(f'|{"Kafka manager status":^40}', nl=False, fg='white') + click.secho('|', nl=False, fg='white') + click.secho(f'{manager_status:<19}', nl=False, fg=status_color) + click.secho('|', fg='white') + click.secho('+' + '-' * 60 + '+', fg='white') + if manager_status == "Running": + click.secho(f'|{"Kafka topics":^60}|', fg='white') + click.secho('+' + '-' * 60 + '+', fg='white') + for topic in kafka_manager_info.kafkaManagersStatuses[0].topics: + click.secho('|', nl=False, fg='white') + click.secho(f'{topic:^60}', nl=False, fg='green') + click.secho('|', fg='white') + click.secho('+' + '-' * 60 + '+', fg='white') + + def list_traffic_managers(ip: str, emulation: str, ip_first_octet: int) -> None: """ Utility function for listing traffic managers