Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafkamanager added. #408

Merged
merged 1 commit into from
Aug 3, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 99 additions & 3 deletions simulation-system/libs/csle-cli/src/csle_cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -685,7 +685,7 @@ def stop_shell_complete(ctx, param, incomplete) -> List[str]:
@click.command("stop", help="prometheus | node_exporter | cadvisor | grafana | flask | container-name | "
"emulation-name | statsmanager | emulation_executions | pgadmin | all | nginx | postgresql "
"| docker | clustermanager | hostmanagers | hostmanager | clientmanager | snortmanagers "
"| snortmanager | elkmanager | trafficmanagers | trafficmanager")
"| snortmanager | elkmanager | trafficmanagers | trafficmanager | kafkamanager")
def stop(entity: str, name: str, id: int = -1, ip: str = "", container_ip: str = "") -> None:
"""
Stops an entity
Expand Down Expand Up @@ -745,6 +745,8 @@ def stop(entity: str, name: str, id: int = -1, ip: str = "", container_ip: str =
stop_traffic_managers(ip=ip, emulation=name, ip_first_octet=id)
elif entity == "trafficmanager":
stop_traffic_manager(ip=ip, container_ip=container_ip, emulation=name, ip_first_octet=id)
elif entity == "kafkamanager":
stop_kafka_manager(ip=ip, emulation=name, ip_first_octet=id)
else:
container_stopped = False
for node in config.cluster_config.cluster_nodes:
Expand Down Expand Up @@ -1063,6 +1065,30 @@ def stop_elk_manager(ip: str, emulation: str, ip_first_octet: int) -> None:
bold=False)


def stop_kafka_manager(ip: str, emulation: str, ip_first_octet: int) -> None:
"""
Utility function for stopping the Kafka manage

:param ip: the ip of the node to stop the Kafka manger
:param emulation: the emulation of the execution
:param ip_first_octet: the ID of the execution
:return: None
"""
import csle_common.constants.constants as constants
from csle_common.metastore.metastore_facade import MetastoreFacade
config = MetastoreFacade.get_config(id=1)
for node in config.cluster_config.cluster_nodes:
if node.ip == ip or ip == "":
stopped = ClusterController.stop_kafka_manager(
ip=ip, port=constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT, emulation=emulation,
ip_first_octet=ip_first_octet)
if stopped.outcome:
click.secho(f"Stopping Kafka manager on port:{constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT}")
else:
click.secho(f"Kafka manager is not stopped:{constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT}",
bold=False)


def stop_traffic_managers(ip: str, emulation: str, ip_first_octet: int) -> None:
"""
Utility function for stopping the traffic managers
Expand Down Expand Up @@ -1307,7 +1333,7 @@ def start_shell_complete(ctx, param, incomplete) -> List[str]:
"container-name | emulation-name | all | statsmanager | training_job "
"| system_id_job | nginx | postgresql | docker | clustermanager | hostmanagers "
"| hostmanager | clientmanager | snortmanagers | snortmanager | elkmanager "
"| trafficmanagers | trafficmanager")
"| trafficmanagers | trafficmanager | kafkamanager")
def start(entity: str, no_traffic: bool, name: str, id: int, no_clients: bool, no_network: bool, ip: str,
container_ip: str, no_beats: bool) -> None:
"""
Expand Down Expand Up @@ -1378,6 +1404,8 @@ def start(entity: str, no_traffic: bool, name: str, id: int, no_clients: bool, n
start_traffic_managers(ip=ip, emulation=name, ip_first_octet=id)
elif entity == "trafficmanager":
start_traffic_manager(ip=ip, container_ip=container_ip, emulation=name, ip_first_octet=id)
elif entity == "kafkamanager":
start_kafka_manager(ip=ip, emulation=name, ip_first_octet=id)
else:
container_started = False
for node in config.cluster_config.cluster_nodes:
Expand Down Expand Up @@ -1773,6 +1801,30 @@ def start_traffic_manager(ip: str, container_ip: str, emulation: str, ip_first_o
bold=False)


def start_kafka_manager(ip: str, emulation: str, ip_first_octet: int):
"""
Utility function for starting Kafka manager

:param ip: the ip of the node to start Kafka manager
:param emulation: the emulation of the execution
:param ip_first_octet: the ID of the execution
:return: None
"""
import csle_common.constants.constants as constants
from csle_common.metastore.metastore_facade import MetastoreFacade
config = MetastoreFacade.get_config(id=1)
for node in config.cluster_config.cluster_nodes:
if node.ip == ip or ip == "":
operation_outcome = ClusterController.start_kafka_manager(
ip=ip, port=constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT, emulation=emulation,
ip_first_octet=ip_first_octet)
if operation_outcome.outcome:
click.secho(f"Starting Kafka manager on port:{constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT}")
else:
click.secho(f"Kafka manager are not started:{constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT}",
bold=False)


def run_image(image: str, name: str, create_network: bool = True, version: str = "0.0.1") -> bool:
"""
Runs a container with a given image
Expand Down Expand Up @@ -2060,7 +2112,7 @@ def ls_shell_complete(ctx, param, incomplete) -> List[str]:
@click.command("ls", help="containers | networks | images | emulations | all | environments | prometheus "
"| node_exporter | cadvisor | pgadmin | statsmanager | flask | "
"simulations | emulation_executions | cluster | nginx | postgresql | docker | hostmanagers | "
"clientmanager | snortmanagers | elkmanager | trafficmanagers")
"clientmanager | snortmanagers | elkmanager | trafficmanagers | kafkamanager")
@click.argument('entity', default='all', type=str, shell_complete=ls_shell_complete)
@click.option('--all', is_flag=True, help='list all')
@click.option('--running', is_flag=True, help='list running only (default)')
Expand Down Expand Up @@ -2135,6 +2187,8 @@ def ls(entity: str, all: bool, running: bool, stopped: bool, ip: str, name: str,
list_elk_manager(ip=ip, emulation=name, ip_first_octet=id)
elif entity == "trafficmanagers":
list_traffic_managers(ip=ip, emulation=name, ip_first_octet=id)
elif entity == "kafkamanager":
list_kafka_managers(ip=ip, emulation=name, ip_first_octet=id)
else:
container = get_running_container(name=entity)
if container is not None:
Expand Down Expand Up @@ -2205,6 +2259,48 @@ def list_host_managers(ip: str, emulation: str, ip_first_octet: int) -> None:
click.secho('+' + '-' * 50 + '+', fg='white')


def list_kafka_managers(ip: str, emulation: str, ip_first_octet: int) -> None:
"""
Utility function for listing Kafka managers

:param ip: the ip of the node to list Kafka manager
:param emulation: the emulation of the execution
:param ip_first_octet: the ID of the execution

:return: None
"""
import csle_common.constants.constants as constants
from csle_common.metastore.metastore_facade import MetastoreFacade
config = MetastoreFacade.get_config(id=1)
for node in config.cluster_config.cluster_nodes:
if node.ip == ip or ip == "":
kafka_manager_info = ClusterController.get_kafka_managers_info(
ip=ip, port=constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT, emulation=emulation,
ip_first_octet=ip_first_octet)
for i in range(len(kafka_manager_info.ips)):
status_color = 'green' if kafka_manager_info.kafkaManagersRunning[i] else 'red'
manager_status = 'Running' if kafka_manager_info.kafkaManagersRunning[i] else 'Stopped'
click.secho('+' + '-' * 60 + '+', fg='white')
click.secho(f'|{"Kafka manager IP":^40}', nl=False, fg='white')
click.secho('|', nl=False, fg='white')
click.secho(f'{kafka_manager_info.ips[i]:<19}', nl=False, fg=status_color)
click.secho('|', fg='white')
click.secho('+' + '-' * 60 + '+', fg='white')
click.secho(f'|{"Kafka manager status":^40}', nl=False, fg='white')
click.secho('|', nl=False, fg='white')
click.secho(f'{manager_status:<19}', nl=False, fg=status_color)
click.secho('|', fg='white')
click.secho('+' + '-' * 60 + '+', fg='white')
if manager_status == "Running":
click.secho(f'|{"Kafka topics":^60}|', fg='white')
click.secho('+' + '-' * 60 + '+', fg='white')
for topic in kafka_manager_info.kafkaManagersStatuses[0].topics:
click.secho('|', nl=False, fg='white')
click.secho(f'{topic:^60}', nl=False, fg='green')
click.secho('|', fg='white')
click.secho('+' + '-' * 60 + '+', fg='white')


def list_traffic_managers(ip: str, emulation: str, ip_first_octet: int) -> None:
"""
Utility function for listing traffic managers
Expand Down