From 04c714a6e75663caf3c13f2125902ec583820a32 Mon Sep 17 00:00:00 2001 From: Michal Nowikowski Date: Wed, 29 Sep 2021 06:58:06 +0200 Subject: [PATCH] finalized Azure support, added presenting jobs executed on given machine --- server/kraken/server/backend.py | 49 +- server/kraken/server/bg/jobs.py | 51 +- server/kraken/server/cloud.py | 481 +++++++++++++----- server/kraken/server/dbutils.py | 20 +- server/kraken/server/execution.py | 12 + server/kraken/server/kkrq.py | 1 + server/kraken/server/management.py | 41 +- server/kraken/server/models.py | 4 +- server/kraken/server/swagger.yml | 70 +++ server/kraken/server/watchdog.py | 239 +++++---- server/poetry.lock | 19 +- server/pyproject.toml | 1 + .../agents-page/agents-page.component.html | 68 ++- .../app/agents-page/agents-page.component.ts | 21 +- .../grp-cloud-cfg.component.html | 14 +- .../grp-cloud-cfg/grp-cloud-cfg.component.ts | 24 +- .../app/run-results/run-results.component.ts | 1 + 17 files changed, 794 insertions(+), 322 deletions(-) diff --git a/server/kraken/server/backend.py b/server/kraken/server/backend.py index 9075e6b5..abca0200 100644 --- a/server/kraken/server/backend.py +++ b/server/kraken/server/backend.py @@ -33,6 +33,7 @@ from .. import version from . import kkrq from . import utils +from . import dbutils log = logging.getLogger(__name__) @@ -371,30 +372,47 @@ def _handle_step_result(agent, req): job.finished = utils.utcnow() agent.job = None - # check if aws machine should be destroyed now - aws = None - for aa in agent.agents_groups: - ag = aa.agents_group - if not ag.deployment: - continue - + # check if cloud machine should be destroyed now + ag = dbutils.find_cloud_assignment_group(agent) + if ag: + # aws ec2 if ag.deployment['method'] == consts.AGENT_DEPLOYMENT_METHOD_AWS_EC2: depl = ag.deployment['aws'] if depl and 'destruction_after_jobs' in depl and int(depl['destruction_after_jobs']) > 0: max_jobs = int(depl['destruction_after_jobs']) - jobs_num = Job.query.filter_by(agent_used=agent).count() + q = Job.query.filter_by(agent_used=agent) + q = q.filter(Job.finished.isnot(None)) + q = q.filter(Job.finished > agent.created) + jobs_num = q.count() log.info('JOB %s, num %d, max %d', job.id, jobs_num, max_jobs) if jobs_num >= max_jobs: agent.disabled = True kkrq.enq(bg_jobs.destroy_machine, agent.id) + # aws ecs fargate elif ag.deployment['method'] == consts.AGENT_DEPLOYMENT_METHOD_AWS_ECS_FARGATE: depl = ag.deployment['aws_ecs_fargate'] log.info('ECS FARGATE JOB %s - destroying task', job.id) agent.disabled = True kkrq.enq(bg_jobs.destroy_machine, agent.id) + # azure vm + elif ag.deployment['method'] == consts.AGENT_DEPLOYMENT_METHOD_AZURE_VM: + depl = ag.deployment['azure_vm'] + + if depl and 'destruction_after_jobs' in depl and int(depl['destruction_after_jobs']) > 0: + max_jobs = int(depl['destruction_after_jobs']) + q = Job.query.filter_by(agent_used=agent) + q = q.filter(Job.finished.isnot(None)) + q = q.filter(Job.finished > agent.created) + jobs_num = q.count() + log.info('JOB %s, num %d, max %d', job.id, jobs_num, max_jobs) + if jobs_num >= max_jobs: + agent.disabled = True + kkrq.enq(bg_jobs.destroy_machine, agent.id) + log.info('*' * 300) + db.session.commit() kkrq.enq(bg_jobs.job_completed, job.id) log.info('job %s finished by %s', job, agent) @@ -518,14 +536,22 @@ def _handle_keep_alive(agent, req): # pylint: disable=unused-argument def _handle_unknown_agent(address, ip_address, agent): try: + new = False if agent: - agent.deleted = None + now = utils.utcnow() + if agent.deleted: + log.info('undeleting agent %s with address %s', agent, address) + agent.deleted = None + agent.created = now agent.authorized = False agent.ip_address = ip_address - agent.last_seen = utils.utcnow() + agent.last_seen = now else: - Agent(name=address, address=address, authorized=False, ip_address=ip_address, last_seen=utils.utcnow()) + agent = Agent(name=address, address=address, authorized=False, ip_address=ip_address, last_seen=utils.utcnow()) + new = True db.session.commit() + if new: + log.info('created new agent instance %s for address %s', agent, address) except Exception as e: log.warning('IGNORED EXCEPTION: %s', str(e)) @@ -550,7 +576,6 @@ def serve_agent_request(): return json.dumps({}) agent.last_seen = utils.utcnow() - agent.deleted = None db.session.commit() if not agent.authorized: diff --git a/server/kraken/server/bg/jobs.py b/server/kraken/server/bg/jobs.py index 4363d470..b9547852 100644 --- a/server/kraken/server/bg/jobs.py +++ b/server/kraken/server/bg/jobs.py @@ -40,6 +40,7 @@ from .. import kkrq from .. import cloud from .. import utils +from .. import dbutils log = logging.getLogger(__name__) @@ -928,13 +929,11 @@ def spawn_new_agents(agents_group_id): server_url = get_setting('general', 'server_url') minio_addr = get_setting('general', 'minio_addr') clickhouse_addr = get_setting('general', 'clickhouse_addr') - access_key = get_setting('cloud', 'aws_access_key') - secret_access_key = get_setting('cloud', 'aws_secret_access_key') - settings = ['server_url', 'minio_addr', 'clickhouse_addr', 'access_key', 'secret_access_key'] + settings = ['server_url', 'minio_addr', 'clickhouse_addr'] for s in settings: val = locals()[s] if not val: - log.error('%s is empty, please set it in global general or cloud settings', s) + log.error('%s is empty, please set it in global general settings', s) return ag = AgentsGroup.query.filter_by(id=agents_group_id).one_or_none() @@ -942,14 +941,15 @@ def spawn_new_agents(agents_group_id): log.warning('cannot find agents group id: %d', agents_group_id) return - # only AWS supported depl = None if ag.deployment['method'] == consts.AGENT_DEPLOYMENT_METHOD_AWS_EC2: depl = ag.deployment['aws'] elif ag.deployment['method'] == consts.AGENT_DEPLOYMENT_METHOD_AWS_ECS_FARGATE: depl = ag.deployment['aws_ecs_fargate'] + elif ag.deployment['method'] == consts.AGENT_DEPLOYMENT_METHOD_AZURE_VM: + depl = ag.deployment['azure_vm'] else: - log.warning('deployment method %d in agents group id:%d not implemented', ag.deployment['method'], agents_group_id) + log.error('deployment method %d in agents group id:%d not implemented', ag.deployment['method'], agents_group_id) return # check if limit of agents is reached @@ -1000,8 +1000,7 @@ def spawn_new_agents(agents_group_id): log.info('enough agents, avail: %d, needed: %d', agents_count, needed_count) continue - cloud.create_machines(depl, access_key, secret_access_key, - ag, system, num, + cloud.create_machines(depl, ag, system, num, server_url, minio_addr, clickhouse_addr) @@ -1014,33 +1013,29 @@ def destroy_machine(agent_id): log.error('cannot find agent id:%d', agent_id) return - agent.deleted = utils.utcnow() - agent.disabled = True - db.session.commit() - if not agent.extra_attrs: log.warning('missing extra_attrs in agent %s', agent) + dbutils.delete_agent(agent) return - # find agents group to get region for aws login + ag = dbutils.find_cloud_assignment_group(agent) + if not ag: + log.error('agent %s does not have cloud group', agent) + dbutils.delete_agent(agent) + return + + dbutils.delete_agent(agent) + depl = None - for aa in agent.agents_groups: - ag = aa.agents_group - if not ag.deployment: - continue - if ag.deployment['method'] == consts.AGENT_DEPLOYMENT_METHOD_AWS_EC2: - depl = ag.deployment['aws'] - break - if ag.deployment['method'] == consts.AGENT_DEPLOYMENT_METHOD_AWS_ECS_FARGATE: - depl = ag.deployment['aws_ecs_fargate'] - break + if ag.deployment['method'] == consts.AGENT_DEPLOYMENT_METHOD_AWS_EC2: + depl = ag.deployment['aws'] + elif ag.deployment['method'] == consts.AGENT_DEPLOYMENT_METHOD_AWS_ECS_FARGATE: + depl = ag.deployment['aws_ecs_fargate'] + elif ag.deployment['method'] == consts.AGENT_DEPLOYMENT_METHOD_AZURE_VM: + depl = ag.deployment['azure_vm'] if not depl: log.error('cannot find deployment info for agent %s', agent_id) return - access_key = get_setting('cloud', 'aws_access_key') - secret_access_key = get_setting('cloud', 'aws_secret_access_key') - - cloud.destroy_machine(access_key, secret_access_key, - ag.deployment['method'], depl, agent) + cloud.destroy_machine(ag.deployment['method'], depl, agent, ag) diff --git a/server/kraken/server/cloud.py b/server/kraken/server/cloud.py index 08f57dab..d5eb665d 100644 --- a/server/kraken/server/cloud.py +++ b/server/kraken/server/cloud.py @@ -16,6 +16,7 @@ import base64 import random import logging +import datetime # AWS import boto3 @@ -29,16 +30,60 @@ from azure.mgmt.network.models import SecurityRuleAccess, SecurityRuleDirection, SecurityRuleProtocol from azure.mgmt.compute import ComputeManagementClient from azure.mgmt.subscription import SubscriptionClient +from azure.mgmt.monitor import MonitorManagementClient import requests from sqlalchemy.orm.attributes import flag_modified +from sqlalchemy.exc import IntegrityError +from psycopg2.errors import UniqueViolation # pylint: disable=no-name-in-module from .models import db from .models import Agent, AgentAssignment, get_setting from . import utils +from . import consts + log = logging.getLogger(__name__) +azr_logger = logging.getLogger('azure') +azr_logger.setLevel(logging.WARNING) + + +def _create_agent(params, ag): + now = utils.utcnow() + params['last_seen'] = now + params['authorized'] = True + params['disabled'] = False + + try: + a = Agent(**params) + db.session.commit() + log.info('created new agent %s', a) + except Exception: + db.session.rollback() + a = Agent.query.filter_by(address=params['address']).one_or_none() + log.info('using existing agent %s', a) + if a: + a.created = now + if a.deleted: + log.info('undeleting agent %s', a) + a.deleted = None + for f, val in params.items(): + setattr(a, f, val) + db.session.commit() + else: + log.info('agent %s duplicated but cannot find it', params['address']) + raise + + try: + AgentAssignment(agent=a, agents_group=ag) + db.session.commit() + except IntegrityError as e: + db.session.rollback() + if not isinstance(e.orig, UniqueViolation): + raise + + return a # AWS EC2 ################################################################### @@ -67,12 +112,28 @@ def check_aws_settings(): return 'ok' -def create_ec2_vms(aws, access_key, secret_access_key, - ag, system, num, +def login_to_aws(): + access_key = get_setting('cloud', 'aws_access_key') + secret_access_key = get_setting('cloud', 'aws_secret_access_key') + settings = ['access_key', 'secret_access_key'] + for s in settings: + val = locals()[s] + if not val: + log.error('AWS %s is empty, please set it in global cloud settings', s) + return None + + return dict(aws_access_key_id=access_key, aws_secret_access_key=secret_access_key) + + +def create_ec2_vms(aws, ag, system, num, server_url, minio_addr, clickhouse_addr): + credential = login_to_aws() + if not credential: + return + region = aws['region'] - ec2 = boto3.client("ec2", region_name=region, aws_access_key_id=access_key, aws_secret_access_key=secret_access_key) - ec2_res = boto3.resource('ec2', region_name=region, aws_access_key_id=access_key, aws_secret_access_key=secret_access_key) + ec2 = boto3.client("ec2", region_name=region, **credential) + ec2_res = boto3.resource('ec2', region_name=region, **credential) # get key pair if not ag.extra_attrs: @@ -172,7 +233,6 @@ def create_ec2_vms(aws, access_key, secret_access_key, sys_id = system.id if system.executor == 'local' else 0 - now = utils.utcnow() for i in instances: try: i.wait_until_running() @@ -182,35 +242,21 @@ def create_ec2_vms(aws, access_key, secret_access_key, i.load() name = '.'.join(i.public_dns_name.split('.')[:2]) address = i.private_ip_address - a = None params = dict(name=name, address=address, ip_address=i.public_ip_address, - extra_attrs=dict(system=sys_id, instance_id=i.id), - authorized=True, - last_seen=now) - try: - a = Agent(**params) - db.session.commit() - except Exception: - db.session.rollback() - a = Agent.query.filter_by(deleted=None, address=address).one_or_none() - if a: - for f, val in params.items(): - setattr(a, f, val) - db.session.commit() - else: - log.info('agent %s duplicated but cannot find it', address) - raise - - AgentAssignment(agent=a, agents_group=ag) - db.session.commit() + extra_attrs=dict(system=sys_id, instance_id=i.id)) + a = _create_agent(params, ag) log.info('spawned new agent %s on EC2 instance %s', a, i) -def destroy_aws_ec2_vm(access_key, secret_access_key, depl, agent): +def destroy_aws_ec2_vm(depl, agent, group): + credential = login_to_aws() + if not credential: + return + region = depl['region'] - ec2 = boto3.resource('ec2', region_name=region, aws_access_key_id=access_key, aws_secret_access_key=secret_access_key) + ec2 = boto3.resource('ec2', region_name=region, **credential) instance_id = agent.extra_attrs['instance_id'] log.info('terminate ec2 vm %s', instance_id) @@ -221,14 +267,99 @@ def destroy_aws_ec2_vm(access_key, secret_access_key, depl, agent): log.exception('IGNORED EXCEPTION') +def aws_ec2_vm_exists(agent): + credential = cloud.login_to_aws() + if not credential: + raise Exception('wrong aws credential') + region = depl['region'] + ec2 = boto3.resource('ec2', region_name=region, **credential) + + instance_id = agent.extra_attrs['instance_id'] + try: + # try to get instance, if missing then raised exception will cause return False + i = ec2.Instance(instance_id) + i.state # pylint: disable=pointless-statement + if i.state['Name'] == 'terminated': + # if instance exists theb raising an exception and it will cause return False + raise Exception('terminated') + except Exception: + return False + + return True + + +def aws_ec2_cleanup_dangling_vms(depl, ag): + credential = login_to_aws() + if not credential: + return 0, 0, 0, 0, 0 + + region = depl['region'] + ec2 = boto3.resource('ec2', region_name=region, **credential) + + now = utils.utcnow() + + try: + vms = ec2.instances.filter(Filters=[{'Name': 'tag:kraken-group', 'Values': ['%d' % ag.id]}]) + vms = list(vms) + except Exception: + log.exception('IGNORED EXCEPTION') + return 0, 0, 0, 0, 0 + + instances = 0 + terminated_instances = 0 + assigned_instances = 0 + orphaned_instances = 0 + orphaned_terminated_instances = 0 + + for vm in vms: + instances += 1 + # if terminated then skip it + if vm.state['Name'] == 'terminated': + terminated_instances += 1 + continue + + # if assigned to some agent then skip it + assigned = False + for aa in ag.agents: + agent = aa.agent + if agent.extra_attrs and 'instance_id' in agent.extra_attrs and agent.extra_attrs['instance_id'] == vm.id: + assigned = True + break + if assigned: + assigned_instances += 1 + continue + + # instances have to be old enough to avoid race condition with + # case when instances are being created but not yet assigned to agents + lt = vm.launch_time.replace(tzinfo=pytz.utc) + if now - lt < datetime.timedelta(minutes=10): + continue + + # the instance is not terminated, not assigned, old enough + # so delete it as it seems to be a lost instance + log.info('terminating lost aws ec2 instance %s', vm.id) + orphaned_instances += 1 + try: + vm.terminate() + except Exception: + log.exception('IGNORED EXCEPTION') + + orphaned_terminated_instances += 1 + + return instances, terminated_instances, assigned_instances, orphaned_instances, orphaned_terminated_instances + + # AWS ECS FARGATE ############################################################# -def create_aws_ecs_fargate_tasks(aws, access_key, secret_access_key, - ag, system, num, +def create_aws_ecs_fargate_tasks(aws, ag, system, num, server_url, minio_addr, clickhouse_addr): + credential = login_to_aws() + if not credential: + return + region = aws['region'] - ec2 = boto3.client("ec2", region_name=region, aws_access_key_id=access_key, aws_secret_access_key=secret_access_key) - ecs = boto3.client('ecs', region_name=region, aws_access_key_id=access_key, aws_secret_access_key=secret_access_key) + ec2 = boto3.client("ec2", region_name=region, **credential) + ecs = boto3.client('ecs', region_name=region, **credential) system_norm = system.name.replace(':', '_').replace('/', '_').replace('.', '_') task_def_name = 'kraken-agent-1-%s' % system_norm @@ -357,38 +488,23 @@ def create_aws_ecs_fargate_tasks(aws, access_key, secret_access_key, sys_id = system.id if system.executor == 'local' else 0 - now = utils.utcnow() for task in tasks_ready: address = task['address'] - a = None params = dict(name=task['name'], address=address, ip_address=task['ip_address'], - extra_attrs=dict(system=sys_id, task_arn=task['task_arn']), - authorized=True, - last_seen=now) - try: - a = Agent(**params) - db.session.commit() - except Exception: - db.session.rollback() - a = Agent.query.filter_by(deleted=None, address=address).one_or_none() - if a: - for f, val in params.items(): - setattr(a, f, val) - db.session.commit() - else: - log.info('agent %s duplicated but cannot find it', address) - raise - - AgentAssignment(agent=a, agents_group=ag) - db.session.commit() + extra_attrs=dict(system=sys_id, task_arn=task['task_arn'])) + a = _create_agent(params, ag) log.info('spawned new agent %s on ECS Fargate task %s', a, task) -def destroy_aws_ecs_task(access_key, secret_access_key, depl, agent): +def destroy_aws_ecs_task(depl, agent, group): + credential = login_to_aws() + if not credential: + return + region = depl['region'] - ecs = boto3.client('ecs', region_name=region, aws_access_key_id=access_key, aws_secret_access_key=secret_access_key) + ecs = boto3.client('ecs', region_name=region, **credential) task_arn = agent.extra_attrs['task_arn'] log.info('terminate ecs task %s', task_arn) @@ -434,16 +550,41 @@ def check_azure_settings(): return 'ok' -def _create_azure_vm(group_name, location, +def login_to_azure(): + subscription_id = get_setting('cloud', 'azure_subscription_id') + tenant_id = get_setting('cloud', 'azure_tenant_id') + client_id = get_setting('cloud', 'azure_client_id') + client_secret = get_setting('cloud', 'azure_client_secret') + settings = ['subscription_id', 'tenant_id', 'client_id', 'client_secret'] + for s in settings: + val = locals()[s] + if not val: + log.error('Azure %s is empty, please set it in global cloud settings', s) + return None, None + + credential = ClientSecretCredential(tenant_id=tenant_id, client_id=client_id, client_secret=client_secret) + return credential, subscription_id + + +def _create_azure_vm(ag, depl, system, server_url, minio_addr, clickhouse_addr, credential, subscription_id): instance_id = str(random.randrange(9999999999)) - print(f"Provisioning a virtual machine...some operations might take a minute or two.") + location = depl['location'] + vm_size = depl['vm_size'] + + sys_parts = system.name.split(':') + if len(sys_parts) != 4: + log.warning('incorrect system image name %s, Azure image name should have form :::', + system.name) + return + + log.info(f"Provisioning a virtual machine...some operations might take a minute or two.") # Step 1: Provision a resource group - # Obtain the management object for resources, using the credentials from the CLI login. + # Obtain the management object for resources, using the credential from the CLI login. resource_client = ResourceManagementClient(credential, subscription_id) # Provision the resource groups. @@ -454,16 +595,16 @@ def _create_azure_vm(group_name, location, "location": location } ) - print(f"Provisioned global resource group {rg_result.name} in the {rg_result.location} region") + log.info(f"Provisioned global resource group {rg_result.name} in the {rg_result.location} region") - group_rg = "kraken-%s-rg" % group_name + group_rg = "kraken-%d-rg" % ag.id rg_result = resource_client.resource_groups.create_or_update( group_rg, { "location": location } ) - print(f"Provisioned resource group {rg_result.name} in the {rg_result.location} region") + log.info(f"Provisioned resource group {rg_result.name} in the {rg_result.location} region") # For details on the previous code, see Example: Provision a resource group @@ -506,7 +647,7 @@ def _create_azure_vm(group_name, location, vnet_result = poller.result() - print(f"Provisioned virtual network {vnet_result.name} with address prefixes {vnet_result.address_space.address_prefixes}") + log.info(f"Provisioned virtual network {vnet_result.name} with address prefixes {vnet_result.address_space.address_prefixes}") ######### security_group_name = 'kraken-nsg' @@ -565,7 +706,7 @@ def _create_azure_vm(group_name, location, ) subnet_result = poller.result() - print(f"Provisioned virtual subnet {subnet_result.name} with address prefix {subnet_result.address_prefix}") + log.info(f"Provisioned virtual subnet {subnet_result.name} with address prefix {subnet_result.address_prefix}") # Step 4: Provision an IP address and wait for completion poller = network_client.public_ip_addresses.begin_create_or_update( @@ -580,7 +721,7 @@ def _create_azure_vm(group_name, location, ) ip_address_result = poller.result() - print(f"Provisioned public IP address {ip_address_result.name} with address {ip_address_result.ip_address}") + log.info(f"Provisioned public IP address {ip_address_result.name} with address {ip_address_result.ip_address}") # Step 5: Provision the network interface client poller = network_client.network_interfaces.begin_create_or_update( @@ -599,7 +740,7 @@ def _create_azure_vm(group_name, location, nic_result = poller.result() - print(f"Provisioned network interface client {nic_result.name}") + log.info(f"Provisioned network interface client {nic_result.name}") # Step 6: Provision the virtual machine @@ -609,7 +750,7 @@ def _create_azure_vm(group_name, location, USERNAME = "kraken" PASSWORD = "kraken123!" - print(f"Provisioning virtual machine {vm_name}; this operation might take a few minutes.") + log.info(f"Provisioning virtual machine {vm_name}; this operation might take a few minutes.") # Provision the VM specifying only minimal arguments, which defaults to an Ubuntu 18.04 VM # on a Standard DS1 v2 plan with a public IP address and a default virtual network/subnet. @@ -629,14 +770,15 @@ def _create_azure_vm(group_name, location, "location": location, "storage_profile": { "image_reference": { - "publisher": 'Canonical', - "offer": "0001-com-ubuntu-server-focal", - "sku": "20_04-lts", - "version": "latest" + "publisher": sys_parts[0], # 'Canonical', + "offer": sys_parts[1], # "0001-com-ubuntu-server-focal", + "sku": sys_parts[2], # "20_04-lts", + "version": sys_parts[3] # "latest" } + #"image_reference": "Canonical:0001-com-ubuntu-server-focal:20_04-lts:20.04.202109080" }, "hardware_profile": { - "vm_size": "Standard_B1ls" + "vm_size": vm_size }, "os_profile": { "computer_name": vm_name, @@ -654,97 +796,204 @@ def _create_azure_vm(group_name, location, vm_result = poller.result() - print(f"Provisioned virtual machine {vm_result.name}") + log.info(f"Provisioned Azure virtual machine {vm_result.name}") # VM params + sys_id = system.id if system.executor == 'local' else 0 address = nic_result.ip_configurations[0].private_ip_address params = dict(name=vm_name, address=address, ip_address=ip_address_result.ip_address, extra_attrs=dict( - system='sys_id', - instance_id=instance_id), - authorized=True, - last_seen='now') - print(params) - return params + system=sys_id, + instance_id=instance_id)) + + a = _create_agent(params, ag) + log.info('spawned new agent %s on Azure VM instance %s at %s', a, vm_result.name, a.created) -def create_azure_vms(depl, access_key, secret_access_key, - ag, system, num, +def create_azure_vms(depl, ag, system, num, server_url, minio_addr, clickhouse_addr): # Acquire a credential object using service principal authentication. - subscription_id = 'a80e8684-5866-41ef-ac39-e4058f7a490f' - tenant_id = '698e4063-ab0b-469b-b755-0204bc97db4b' - client_id = 'a756f034-5c2b-4cc1-98a7-a24bff5238bb' - client_secret = '2xEtx7CHJ.jloWrX5p7p5lL-IW._y.t07U' - credential = ClientSecretCredential(tenant_id=tenant_id, client_id=client_id, client_secret=client_secret) + credential, subscription_id = login_to_azure() + if not credential: + return - _create_azure_vm(group_name, location, - server_url, minio_addr, clickhouse_addr, - credential, subscription_id) + for i in range(num): + _create_azure_vm(ag, depl, system, + server_url, minio_addr, clickhouse_addr, + credential, subscription_id) -def delete_azure_vm(group_name, vm_name, instance_id): - log.info('deleting azure vm: %s', vm_name) +def _destroy_azure_vm(rg, vm_name, instance_id, cc, nc): + try: + vm = cc.virtual_machines.get(rg, vm_name) + disk_name = vm.storage_profile.os_disk.name + cc.virtual_machines.begin_delete(rg, vm_name).wait() + cc.disks.begin_delete(rg, disk_name) + except azure.core.exceptions.ResourceNotFoundError: + log.info('azure vm %s already missing', vm_name) + + nic_name = "kraken-%s-nic" % instance_id + try: + nc.network_interfaces.begin_delete(rg, nic_name).wait() + except azure.core.exceptions.ResourceNotFoundError: + log.info('azure nic %s already missing', nic_name) + + # ip_config_name = "kraken-%s-ip-config" % instance_id + ip_name = "kraken-%s-ip" % instance_id + try: + nc.public_ip_addresses.begin_delete(rg, ip_name).wait() + except azure.core.exceptions.ResourceNotFoundError: + log.info('azure ip %s already missing', ip_name) + + log.info('deleted azure vm: %s', vm_name) - rg = 'kraken-%s-rg' % group_name + +def destroy_azure_vm(depl, agent, ag): + instance_id = agent.extra_attrs['instance_id'] + vm_name = "kraken-agent-%s-vm" % instance_id + rg = 'kraken-%d-rg' % ag.id + + log.info('deleting azure vm: %s', vm_name) # Acquire a credential object using service principal authentication. - subscription_id = 'a80e8684-5866-41ef-ac39-e4058f7a490f' - tenant_id = '698e4063-ab0b-469b-b755-0204bc97db4b' - client_id = 'a756f034-5c2b-4cc1-98a7-a24bff5238bb' - client_secret = '2xEtx7CHJ.jloWrX5p7p5lL-IW._y.t07U' - credential = ClientSecretCredential(tenant_id=tenant_id, client_id=client_id, client_secret=client_secret) + credential, subscription_id = login_to_azure() + if not credential: + return cc = ComputeManagementClient(credential, subscription_id) - vm = cc.virtual_machines.get(rg, vm_name) - disk_name = vm.storage_profile.os_disk.name - cc.virtual_machines.begin_delete(rg, vm_name).wait() - cc.disks.begin_delete(rg, disk_name) + nc = NetworkManagementClient(credential, subscription_id) - ip_config_name = "kraken-%s-ip-config" % instance_id - ip_name = "kraken-%s-ip" % instance_id - nic_name = "kraken-%s-nic" % instance_id + _destroy_azure_vm(rg, vm_name, instance_id, cc, nc) + + +def azure_vm_exists(agent): + instance_id = agent.extra_attrs['instance_id'] + ag = dbutils.find_cloud_assignment_group(agent) + if not ag: + return False + rg = 'kraken-%d-rg' % ag.id + + credential, subscription_id = login_to_azure() + if not credential: + raise Exception('wrong azure credential') + + cc = ComputeManagementClient(credential, subscription_id) + try: + vm = cc.virtual_machines.get(rg, vm_name) + except azure.core.exceptions.ResourceNotFoundError: + return False + + return True + + +def azure_vm_cleanup_dangling_vms(depl, ag): + credential, subscription_id = login_to_azure() + if not credential: + return 0, 0, 0, 0, 0 + + cc = ComputeManagementClient(credential, subscription_id) + mc = MonitorManagementClient(credential, subscription_id) nc = NetworkManagementClient(credential, subscription_id) - nc.network_interfaces.begin_delete(rg, nic_name).wait() - nc.public_ip_addresses.begin_delete(rg, ip_name).wait() - log.info('deleted azure vm: %s', vm_name) + rg = 'kraken-%d-rg' % ag.id + now = utils.utcnow() + + try: + vms = cc.virtual_machines.list(rg) + except azure.core.exceptions.ResourceNotFoundError: + return 0, 0, 0, 0, 0 + + instances = 0 + terminated_instances = 0 + assigned_instances = 0 + orphaned_instances = 0 + orphaned_terminated_instances = 0 + + for vm in vms: + instances += 1 + + # if vm is being deleted then skip it + vm2 = cc.virtual_machines.instance_view(rg, vm.name) + log.info('vm %s statuses:', vm.name) + skip = False + for s in vm2.statuses: + log.info(s.code) + if s.code == 'ProvisioningState/deleting': + skip = True + break + if skip: + continue + + # if assigned to some agent then skip it + assigned = False + for aa in ag.agents: + agent = aa.agent + if agent.extra_attrs and 'instance_id' in agent.extra_attrs: + instance_id = agent.extra_attrs['instance_id'] + vm_name = "kraken-agent-%s-vm" % instance_id + if vm_name == vm.name: + assigned = True + break + if assigned: + assigned_instances += 1 + continue + + # instances have to be old enough to avoid race condition with + # case when instances are being created but not yet assigned to agents + fltr = " and ".join([ "eventTimestamp ge '{}T00:00:00Z'".format(now.date()), + "resourceUri eq '%s'" % vm.id ]) + created_at = None + for l in mc.activity_logs.list(filter=fltr, select="eventTimestamp"): + created_at = l.event_timestamp + if not created_at or now - created_at < datetime.timedelta(minutes=10): + continue + + # the instance is not terminated, not assigned, old enough + # so delete it as it seems to be a lost instance + log.info('terminating lost azure vm instance %s', vm_name) + orphaned_instances += 1 + try: + _destroy_azure_vm(rg, vm_name, instance_id, cc, nc) + except Exception: + log.exception('IGNORED EXCEPTION') + + orphaned_terminated_instances += 1 + + return instances, terminated_instances, assigned_instances, orphaned_instances, orphaned_terminated_instances #################################################################### -def create_machines(depl, access_key, secret_access_key, - ag, system, num, +def create_machines(depl, ag, system, num, server_url, minio_addr, clickhouse_addr): method = ag.deployment['method'] if method == consts.AGENT_DEPLOYMENT_METHOD_AWS_EC2: - create_ec2_vms(depl, access_key, secret_access_key, - ag, system, num, + create_ec2_vms(depl, ag, system, num, server_url, minio_addr, clickhouse_addr) elif method == consts.AGENT_DEPLOYMENT_METHOD_AWS_ECS_FARGATE: - create_aws_ecs_fargate_tasks(depl, access_key, secret_access_key, - ag, system, num, + create_aws_ecs_fargate_tasks(depl, ag, system, num, server_url, minio_addr, clickhouse_addr) elif method == consts.AGENT_DEPLOYMENT_METHOD_AZURE_VM: - create_azure_vms(depl, access_key, secret_access_key, - ag, system, num, + create_azure_vms(depl, ag, system, num, server_url, minio_addr, clickhouse_addr) else: raise NotImplementedError('deployment method %s not supported' % str(ag.deployment['method'])) -def destroy_machine(access_key, secret_access_key, method, depl, agent): +def destroy_machine(method, depl, agent, group): if method == consts.AGENT_DEPLOYMENT_METHOD_AWS_EC2: - destroy_aws_ec2_vm(access_key, secret_access_key, depl, agent) + destroy_aws_ec2_vm(depl, agent, group) elif method == consts.AGENT_DEPLOYMENT_METHOD_AWS_ECS_FARGATE: - destroy_aws_ecs_task(access_key, secret_access_key, depl, agent) + destroy_aws_ecs_task(depl, agent, group) + elif method == consts.AGENT_DEPLOYMENT_METHOD_AZURE_VM: + destroy_azure_vm(depl, agent, group) #if __name__ == '__main__': diff --git a/server/kraken/server/dbutils.py b/server/kraken/server/dbutils.py index 2000dff2..b3cbcc3a 100644 --- a/server/kraken/server/dbutils.py +++ b/server/kraken/server/dbutils.py @@ -14,7 +14,8 @@ from sqlalchemy.sql.expression import desc -from .models import Run, Flow +from .models import Run, Flow, db +from . import utils def get_prev_run(stage_id, flow_kind): @@ -26,3 +27,20 @@ def get_prev_run(stage_id, flow_kind): q = q.order_by(desc(Flow.created)) prev_run = q.first() return prev_run + + +def find_cloud_assignment_group(agent): + for aa in agent.agents_groups: + ag = aa.agents_group + if ag.deployment: + return ag + return None + + +def delete_agent(agent): + agent.deleted = utils.utcnow() + agent.disabled = True + agent.authorized = False + for aa in agent.agents_groups: + db.session.delete(aa) + db.session.commit() diff --git a/server/kraken/server/execution.py b/server/kraken/server/execution.py index 3041413a..bc8607b5 100644 --- a/server/kraken/server/execution.py +++ b/server/kraken/server/execution.py @@ -435,3 +435,15 @@ def delete_job(job_id): exec_utils.cancel_job(job, 'canceled by user', consts.JOB_CMPLT_USER_CANCEL) return {} + + +def get_agent_jobs(agent_id, start=0, limit=10): + q = Job.query + q = q.filter_by(agent_used_id=agent_id) + total = q.count() + q = q.order_by(desc('id')) + q = q.offset(start).limit(limit) + jobs = [] + for j in q.all(): + jobs.append(j.get_json()) + return {'items': jobs, 'total': total}, 200 diff --git a/server/kraken/server/kkrq.py b/server/kraken/server/kkrq.py index 5e63eeaa..73861aca 100644 --- a/server/kraken/server/kkrq.py +++ b/server/kraken/server/kkrq.py @@ -48,6 +48,7 @@ def enq_neck(func, *args, ignore_args=None): ignore_args=ignore_args) data = json.dumps(data) rds.publish('qneck', data) + log.info('enqueued func %s with args: %s and %s', func.__name__, args, ignore_args) def get_jobs(): diff --git a/server/kraken/server/management.py b/server/kraken/server/management.py index 4596126d..e7101cde 100644 --- a/server/kraken/server/management.py +++ b/server/kraken/server/management.py @@ -29,6 +29,7 @@ import boto3 from azure.identity import ClientSecretCredential from azure.mgmt.subscription import SubscriptionClient +from azure.mgmt.compute import ComputeManagementClient from . import consts, srvcheck, kkrq from .models import db, Branch, Stage, Agent, AgentsGroup, Secret, AgentAssignment, Setting @@ -38,6 +39,7 @@ from . import notify from . import cloud from . import utils +from . import dbutils log = logging.getLogger(__name__) @@ -568,6 +570,8 @@ def delete_agent(agent_id): job.state = consts.JOB_STATE_QUEUED agent.job = None + # only mark as deleted, do not unassign from groups yet as it is needed + # in destroy_machine if it is cloud machine agent.deleted = utils.utcnow() agent.authorized = False agent.disabled = True @@ -648,17 +652,19 @@ def delete_group(group_id): def get_aws_ec2_regions(): - access_key = get_setting('cloud', 'aws_access_key') - secret_access_key = get_setting('cloud', 'aws_secret_access_key') - ec2 = boto3.client('ec2', region_name='us-east-1', aws_access_key_id=access_key, aws_secret_access_key=secret_access_key) + credential = cloud.login_to_aws() + if not credential: + abort(500, "Incorrect AWS credential, set them in global cloud settings") + ec2 = boto3.client('ec2', region_name='us-east-1', **credential) resp = ec2.describe_regions() return {'items': resp['Regions'], 'total': len(resp['Regions'])}, 200 def get_aws_ec2_instance_types(region): - access_key = get_setting('cloud', 'aws_access_key') - secret_access_key = get_setting('cloud', 'aws_secret_access_key') - ec2 = boto3.client('ec2', region_name=region, aws_access_key_id=access_key, aws_secret_access_key=secret_access_key) + credential = cloud.login_to_aws() + if not credential: + abort(500, "Incorrect AWS credential, set them in global cloud settings") + ec2 = boto3.client('ec2', region_name=region, **credential) resp = ec2.describe_instance_type_offerings(Filters=[{'Name': 'location', 'Values':[region]}]) types = resp['InstanceTypeOfferings'] types.sort(key=lambda x: x['InstanceType']) @@ -667,12 +673,9 @@ def get_aws_ec2_instance_types(region): def get_azure_locations(): # Acquire a credential object using service principal authentication. - subscription_id = get_setting('cloud', 'azure_subscription_id') - tenant_id = get_setting('cloud', 'azure_tenant_id') - client_id = get_setting('cloud', 'azure_client_id') - client_secret = get_setting('cloud', 'azure_client_secret') - - credential = ClientSecretCredential(tenant_id=tenant_id, client_id=client_id, client_secret=client_secret) + credential, subscription_id = cloud.login_to_azure() + if not credential: + abort(500, "Incorrect Azure credential, set them in global cloud settings") subscription_client = SubscriptionClient(credential) locations = subscription_client.subscriptions.list_locations(subscription_id) locs = [] @@ -683,6 +686,20 @@ def get_azure_locations(): return {'items': locs, 'total': len(locs)}, 200 +def get_azure_vm_sizes(location): + # Acquire a credential object using service principal authentication. + credential, subscription_id = cloud.login_to_azure() + if not credential: + abort(500, "Incorrect Azure credential, set them in global cloud settings") + compute_client = ComputeManagementClient(credential, subscription_id) + vm_sizes_list = compute_client.virtual_machine_sizes.list(location=location) + vm_sizes = [] + for s in vm_sizes_list: + vm_sizes.append(s.as_dict()) + vm_sizes.sort(key=lambda x: x['name']) + + return {'items': vm_sizes, 'total': len(vm_sizes)}, 200 + def get_settings(): settings = Setting.query.filter_by().all() diff --git a/server/kraken/server/models.py b/server/kraken/server/models.py index e8c831d6..7daf4265 100644 --- a/server/kraken/server/models.py +++ b/server/kraken/server/models.py @@ -25,7 +25,7 @@ log = logging.getLogger(__name__) -db = SQLAlchemy() +db = SQLAlchemy(engine_options=dict(connect_args={"options": "-c timezone=utc"})) @event.listens_for(mapper, 'init') @@ -773,7 +773,7 @@ def get_json(self): deployment['aws_ecs_fargate'] = dict(region='', instances_limit=5, cluster='', subnets='', security_groups='') if 'azure_vm' not in deployment: - deployment['azure_vm'] = dict(location='', instances_limit=5, default_image='', instance_type='', + deployment['azure_vm'] = dict(location='', instances_limit=5, default_image='', vm_size='', destruction_after_jobs=1, destruction_after_time=30) return dict(id=self.id, diff --git a/server/kraken/server/swagger.yml b/server/kraken/server/swagger.yml index 2716b039..613ebfb4 100644 --- a/server/kraken/server/swagger.yml +++ b/server/kraken/server/swagger.yml @@ -1475,6 +1475,45 @@ paths: 200: description: Successfully deleted agent content: {} + /agents/{agent_id}/jobs: + get: + tags: + - Execution + summary: List jobs executed by given agent + operationId: get_agent_jobs + parameters: + - name: agent_id + in: path + description: ID of agent + required: true + schema: + type: integer + format: int64 + - name: start + in: query + description: How many items to return at one time (max 100) + schema: + type: integer + format: int32 + - name: limit + in: query + description: How many items to return at one time (max 100) + schema: + type: integer + format: int32 + responses: + 200: + description: A paged array of runs + content: + application/json: + schema: + $ref: '#/components/schemas/Jobs' + default: + description: unexpected error + content: + application/json: + schema: + $ref: '#/components/schemas/ApiError' /groups: get: tags: @@ -1633,6 +1672,27 @@ paths: application/json: schema: $ref: '#/components/schemas/AzureLocations' + /azure-vm-sizes/{location}: + get: + tags: + - Management + summary: Get settings + description: Get settings details + operationId: get_azure_vm_sizes + parameters: + - name: location + in: path + description: ID of group + required: true + schema: + type: string + responses: + 200: + description: Settings details + content: + application/json: + schema: + $ref: '#/components/schemas/AzureVmSizes' /settings: get: tags: @@ -2375,6 +2435,16 @@ components: total: type: integer format: int64 + AzureVmSizes: + type: object + properties: + items: + type: array + items: + type: string + total: + type: integer + format: int64 Settings: type: object properties: diff --git a/server/kraken/server/watchdog.py b/server/kraken/server/watchdog.py index b987c1ff..b39c73c4 100644 --- a/server/kraken/server/watchdog.py +++ b/server/kraken/server/watchdog.py @@ -37,6 +37,8 @@ from .bg import jobs as bg_jobs from . import kkrq from . import utils +from . import dbutils +from . import cloud log = logging.getLogger('watchdog') @@ -93,7 +95,8 @@ def _check_jobs_if_expired(): exec_utils.cancel_job(job, note, consts.JOB_CMPLT_SERVER_TIMEOUT) canceled_count += 1 - log.info('canceled jobs:%d / all:%d', canceled_count, job_count) + if job_count > 0: + log.info('canceled jobs:%d / all:%d', canceled_count, job_count) def _check_jobs_if_missing_agents(): @@ -102,11 +105,16 @@ def _check_jobs_if_missing_agents(): for job in q.all(): ag = job.agents_group if (ag.deployment and - ag.deployment['method'] in [consts.AGENT_DEPLOYMENT_METHOD_AWS_EC2, consts.AGENT_DEPLOYMENT_METHOD_AWS_ECS_FARGATE]): + ag.deployment['method'] in [consts.AGENT_DEPLOYMENT_METHOD_AWS_EC2, + consts.AGENT_DEPLOYMENT_METHOD_AWS_ECS_FARGATE, + consts.AGENT_DEPLOYMENT_METHOD_AZURE_VM, + ]): groups.add(ag.id) for ag_id in groups: kkrq.enq_neck(bg_jobs.spawn_new_agents, ag_id) + if groups: + log.info('enqueued spawning new agents for groups with no agents but with waiting jobs') def _check_jobs(): @@ -164,13 +172,14 @@ def _check_agents_keep_alive(): q = q.filter(Agent.last_seen < five_mins_ago) for a in q.all(): - # in case of AWS VMs check for 10mins, not 5mins - for aa in a.agents_groups: - ag = aa.agents_group - if (ag.deployment and - ag.deployment['method'] in [consts.AGENT_DEPLOYMENT_METHOD_AWS_EC2, consts.AGENT_DEPLOYMENT_METHOD_AWS_ECS_FARGATE]): - if a.last_seen > ten_mins_ago: - continue + # in case of cloud machines check for 10mins, not 5mins + ag = dbutils.find_cloud_assignment_group(a) + if (ag and + ag.deployment['method'] in [consts.AGENT_DEPLOYMENT_METHOD_AWS_EC2, + consts.AGENT_DEPLOYMENT_METHOD_AWS_ECS_FARGATE, + consts.AGENT_DEPLOYMENT_METHOD_AZURE_VM]): + if a.last_seen > ten_mins_ago: + continue a.disabled = True a.status_line = 'agent was not seen for last 5 minutes, disabled' @@ -181,42 +190,52 @@ def _check_agents_keep_alive(): _cancel_job_and_unassign_agent(a) -def _destroy_and_delete_if_outdated(agent, ag): - aws = ag.deployment['aws'] - +def _destroy_and_delete_if_outdated(agent, depl): destroy = False # check if idle time after job passed, then destroy VM - if 'destruction_after_time' in aws and int(aws['destruction_after_time']) > 0: + if 'destruction_after_time' in depl and int(depl['destruction_after_time']) > 0: + max_idle_time = datetime.timedelta(seconds=60 * int(depl['destruction_after_time'])) + q = Job.query.filter_by(agent_used=agent) q = q.filter(Job.finished.isnot(None)) + q = q.filter(Job.finished > agent.created) q = q.order_by(desc(Job.finished)) last_job = q.first() + now = utils.utcnow() if last_job: - now = utils.utcnow() dt = now - last_job.finished - timeout = datetime.timedelta(seconds=60 * int(aws['destruction_after_time'])) - if dt >= timeout: - log.info('agent:%d, timed out %d > %d - destroying it', agent.id, dt, timeout) + if dt >= max_idle_time: + log.info('agent:%d, timed out %s > %s - destroying it, last job %s', + agent.id, dt, max_idle_time, last_job) destroy = True else: - log.info('agent:%d, not yet timed out %d < %d - skipped', agent.id, dt, timeout) + log.info('agent:%d, not yet timed out %s < %s - skipped', agent.id, dt, max_idle_time) + elif now - agent.created >= max_idle_time: + log.info('agent:%d, timed out %s - destroying it, created at %s vs now %s', agent.id, max_idle_time, agent.created, now) + destroy = True else: - log.info('agent:%d, no last job - skipped', agent.id) + log.info('agent:%d, no last job and not idle enough - skipped', agent.id) else: log.info('agent:%d, destruction_after_time is 0 - skipped', agent.id) # check if number of executed jobs on VM is reached, then destroy VM - if not destroy and 'destruction_after_jobs' in aws and int(aws['destruction_after_jobs']) > 0: - max_jobs = int(aws['destruction_after_jobs']) - jobs_num = Job.query.filter_by(agent_used=agent, state=consts.JOB_STATE_COMPLETED).count() - if jobs_num >= max_jobs: - log.info('agent:%d, max jobs reached %d/%d', agent.id, jobs_num, max_jobs) - destroy = True + if not destroy: + if 'destruction_after_jobs' in depl and int(depl['destruction_after_jobs']) > 0: + max_jobs = int(depl['destruction_after_jobs']) + q = Job.query.filter_by(agent_used=agent, state=consts.JOB_STATE_COMPLETED) + q = q.filter(Job.finished > agent.created) + jobs_num = q.count() + if jobs_num >= max_jobs: + log.info('agent:%d, max jobs reached %d/%d', agent.id, jobs_num, max_jobs) + # TODO: remove it later, for debugging only + #for j in q.all(): + # log.info(' job: %s', j) + destroy = True + else: + log.info('agent:%d, max jobs not reached yet %d/%d - skipped', agent.id, jobs_num, max_jobs) else: - log.info('agent:%d, max jobs not reached yet %d/%d - skipped', agent.id, jobs_num, max_jobs) - else: - log.info('agent:%d, destruction_after_jobs is 0 - skipped', agent.id) + log.info('agent:%d, destruction_after_jobs is 0 - skipped', agent.id) # if machine mark for destruction then schedule it if destroy: @@ -229,29 +248,22 @@ def _destroy_and_delete_if_outdated(agent, ag): return False -def _delete_if_missing_in_aws(agent, ag): +def _delete_if_missing_in_cloud(agent, depl, depl_method): if not agent.extra_attrs or 'instance_id' not in agent.extra_attrs: log.warning('agent:%d, no instance id in extra_attrs', agent.id) return False - aws = ag.deployment['aws'] - region = aws['region'] - access_key = get_setting('cloud', 'aws_access_key') - secret_access_key = get_setting('cloud', 'aws_secret_access_key') - ec2 = boto3.resource('ec2', region_name=region, aws_access_key_id=access_key, aws_secret_access_key=secret_access_key) - - instance_id = agent.extra_attrs['instance_id'] + exists = True try: - # try to get instance, if missing then raised exception will cause deleting agent - i = ec2.Instance(instance_id) - i.state # pylint: disable=pointless-statement - if i.state['Name'] == 'terminated': - # if instance exists but is terminated also trigger deleting agent by raising an exception - raise Exception('terminated') - except Exception: - agent.deleted = utils.utcnow() - agent.disabled = True - db.session.commit() + if depl_method == consts.AGENT_DEPLOYMENT_METHOD_AWS_EC2: + exists = cloud.aws_ec2_vm_exists(agent) + elif depl_method == consts.AGENT_DEPLOYMENT_METHOD_AZURE_VM: + exists = cloud.azure_vm_exists(agent) + except: + pass + + if not exists: + dbutils.delete_agent(agent) log.info('deleted dangling agent %d', agent.id) return True @@ -259,123 +271,102 @@ def _delete_if_missing_in_aws(agent, ag): def _check_agents_to_destroy(): - q = Agent.query.filter_by(deleted=None) + q = Agent.query.filter_by(deleted=None, authorized=True) q = q.join('agents_groups', 'agents_group') #q = q.filter(or_(cast(AgentsGroup.deployment['method'], Integer) == consts.AGENT_DEPLOYMENT_METHOD_AWS_EC2, # cast(AgentsGroup.deployment['method'], Integer) == consts.AGENT_DEPLOYMENT_METHOD_AWS_ECS_FARGATE)) - q = q.filter(cast(AgentsGroup.deployment['method'], Integer) == consts.AGENT_DEPLOYMENT_METHOD_AWS_EC2) - + # TODO ECS + q = q.filter(AgentsGroup.deployment.isnot(None)) + q = q.filter(or_(cast(AgentsGroup.deployment['method'], Integer) == consts.AGENT_DEPLOYMENT_METHOD_AWS_EC2, + cast(AgentsGroup.deployment['method'], Integer) == consts.AGENT_DEPLOYMENT_METHOD_AZURE_VM)) outdated_count = 0 dangling_count = 0 all_count = 0 for agent in q.all(): all_count += 1 - ag = agent.agents_groups[0].agents_group + ag = dbutils.find_cloud_assignment_group(agent) - deleted = _destroy_and_delete_if_outdated(agent, ag) + if not ag: + log.error('missing ag in agent %s', agent) + continue + + if not ag.deployment: + log.error('missing deployment in ag %s', ag) + continue + + if ag.deployment['method'] == consts.AGENT_DEPLOYMENT_METHOD_AWS_EC2: + depl = ag.deployment['aws'] + elif ag.deployment['method'] == consts.AGENT_DEPLOYMENT_METHOD_AZURE_VM: + depl = ag.deployment['azure_vm'] + else: + log.error('unsupported deployment method %s', ag.deployment['method']) + continue + deleted = _destroy_and_delete_if_outdated(agent, depl) if deleted: outdated_count += 1 if agent.job: _cancel_job_and_unassign_agent(agent) continue - deleted = _delete_if_missing_in_aws(agent, ag) + deleted = _delete_if_missing_in_cloud(agent, depl, ag.deployment['method']) if deleted: dangling_count += 1 if agent.job: _cancel_job_and_unassign_agent(agent) continue - log.info('all agents:%d, destroyed and deleted %d aws ec2 instances and agents', - all_count, outdated_count) - log.info('deleted %d dangling agents without any aws ec2 instance', dangling_count) + if outdated_count > 0: + log.info('all agents:%d, destroyed and deleted %d VM instances and agents', + all_count, outdated_count) + if dangling_count > 0: + log.info('deleted %d dangling agents without any VM instance', dangling_count) return all_count, outdated_count, dangling_count def _check_machines_with_no_agent(): - # look for AWS EC2 machines that do not have agents in database + # look for AWS EC2 or Azure VM machines that do not have agents in database # and destroy such machines - access_key = get_setting('cloud', 'aws_access_key') - secret_access_key = get_setting('cloud', 'aws_secret_access_key') - q = AgentsGroup.query q = q.filter_by(deleted=None) q = q.filter(AgentsGroup.deployment.isnot(None)) all_groups = 0 - aws_groups = 0 + aws_ec2_groups = 0 + azure_vm_groups = 0 for ag in q.all(): all_groups += 1 - if not ag.deployment or ag.deployment['method'] != consts.AGENT_DEPLOYMENT_METHOD_AWS_EC2 or 'aws' not in ag.deployment or not ag.deployment['aws']: - continue - - aws_groups += 1 - - aws = ag.deployment['aws'] - region = aws['region'] - ec2 = boto3.resource('ec2', region_name=region, aws_access_key_id=access_key, aws_secret_access_key=secret_access_key) - now = utils.utcnow() - - try: - instances = ec2.instances.filter(Filters=[{'Name': 'tag:kraken-group', 'Values': ['%d' % ag.id]}]) - instances = list(instances) - except Exception: - log.exception('IGNORED EXCEPTION') + if ag.deployment['method'] == consts.AGENT_DEPLOYMENT_METHOD_AWS_EC2: + depl = ag.deployment['aws'] + aws_ec2_groups += 1 + counts = cloud.aws_ec2_cleanup_dangling_vms(depl, ag) + elif ag.deployment['method'] == consts.AGENT_DEPLOYMENT_METHOD_AZURE_VM: + depl = ag.deployment['azure_vm'] + azure_vm_groups += 1 + counts = cloud.azure_vm_cleanup_dangling_vms(depl, ag) + else: continue - ec2_instances = 0 - ec2_terminated_instances = 0 - ec2_assigned_instances = 0 - ec2_orphaned_instances = 0 - ec2_orphaned_terminated_instances = 0 - - for i in instances: - ec2_instances += 1 - # if terminated then skip it - if i.state['Name'] == 'terminated': - ec2_terminated_instances += 1 - continue - - # if assigned to some agent then skip it - assigned = False - for aa in ag.agents: - agent = aa.agent - if agent.extra_attrs and 'instance_id' in agent.extra_attrs and agent.extra_attrs['instance_id'] == i.id: - assigned = True - break - if assigned: - ec2_assigned_instances += 1 - continue - - # instances have to be old enough to avoid race condition with - # case when instances are being created but not yet assigned to agents - lt = i.launch_time.replace(tzinfo=pytz.utc) - if now - lt < datetime.timedelta(minutes=10): - continue - - # the instance is not terminated, not assigned, old enough - # so delete it as it seems to be a lost instance - log.info('terminating lost aws ec2 instance %s', i.id) - ec2_orphaned_instances += 1 - try: - i.terminate() - except Exception: - log.exception('IGNORED EXCEPTION') - - ec2_orphaned_terminated_instances += 1 - - log.info('group:%d, aws ec2 instances:%d, already-terminated:%d, still-assigned:%d, orphaned:%d, terminated-orphaned:%d', - ag.id, - ec2_instances, - ec2_terminated_instances, - ec2_assigned_instances, - ec2_orphaned_instances, - ec2_orphaned_terminated_instances) - log.info('aws groups:%d / all:%d', aws_groups, all_groups) + instances = counts[0] + terminated_instances = counts[1] + assigned_instances = counts[2] + orphaned_instances = counts[3] + orphaned_terminated_instances = counts[4] + + if (instances + terminated_instances + assigned_instances + + orphaned_instances + orphaned_terminated_instances > 0): + log.info('group:%d, instances:%d, already-terminated:%d, still-assigned:%d, orphaned:%d, terminated-orphaned:%d', + ag.id, + instances, + terminated_instances, + assigned_instances, + orphaned_instances, + orphaned_terminated_instances) + if aws_ec2_groups + azure_vm_groups > 0: + log.info('aws groups:%d, azure vm groups:%d / all:%d', aws_ec2_groups, azure_vm_groups, all_groups) def _check_agents(): diff --git a/server/poetry.lock b/server/poetry.lock index 122b1c27..67a1077b 100644 --- a/server/poetry.lock +++ b/server/poetry.lock @@ -133,6 +133,19 @@ python-versions = "*" [package.dependencies] azure-core = ">=1.15.0,<2.0.0" +[[package]] +name = "azure-mgmt-monitor" +version = "2.0.0" +description = "Microsoft Azure Monitor Client Library for Python" +category = "main" +optional = false +python-versions = "*" + +[package.dependencies] +azure-common = ">=1.1,<2.0" +azure-mgmt-core = ">=1.2.0,<2.0.0" +msrest = ">=0.5.0" + [[package]] name = "azure-mgmt-network" version = "19.0.0" @@ -1146,7 +1159,7 @@ testing = ["pytest (>=3.5,!=3.7.3)", "pytest-checkdocs (>=1.2.3)", "pytest-flake [metadata] lock-version = "1.1" python-versions = "^3.7" -content-hash = "554e35e6586743a26c844aafc1e93d397fd81d784be1abe1c15ef33a4f9669ac" +content-hash = "8339df5a1962f54c8d08d15312c49a419729e7052384015eddda38cf430a4538" [metadata.files] alembic = [ @@ -1189,6 +1202,10 @@ azure-mgmt-core = [ {file = "azure-mgmt-core-1.3.0.zip", hash = "sha256:3ffb7352b39e5495dccc2d2b47254f4d82747aff4735e8bf3267c335b0c9bb40"}, {file = "azure_mgmt_core-1.3.0-py2.py3-none-any.whl", hash = "sha256:7b7fa952aeb9d3eaa13eff905880f3d3b62200f7be7a8ba5a50c8b2e7295322a"}, ] +azure-mgmt-monitor = [ + {file = "azure-mgmt-monitor-2.0.0.zip", hash = "sha256:e7f7943fe8f0efe98b3b1996cdec47c709765257a6e09e7940f7838a0f829e82"}, + {file = "azure_mgmt_monitor-2.0.0-py2.py3-none-any.whl", hash = "sha256:af4917df2fe685e3daf25750f3586f11ccd2e7c2da68df392ca093fc3b7b8089"}, +] azure-mgmt-network = [ {file = "azure-mgmt-network-19.0.0.zip", hash = "sha256:5e39a26ae81fa58c13c02029700f8c7b22c3fd832a294c543e3156a91b9459e8"}, {file = "azure_mgmt_network-19.0.0-py2.py3-none-any.whl", hash = "sha256:4585c5eedcb8783c2840feef1d66843849f39835d7e3f2ec8a5af834fd7b949e"}, diff --git a/server/pyproject.toml b/server/pyproject.toml index 0f9c08fc..907b1055 100644 --- a/server/pyproject.toml +++ b/server/pyproject.toml @@ -35,6 +35,7 @@ azure-mgmt-network = "^19.0.0" azure-mgmt-storage = "^18.0.0" azure-identity = "^1.6.1" azure-mgmt-subscription = "^1.0.0" +azure-mgmt-monitor = "^2.0.0" [tool.poetry.dev-dependencies] pytest = "^5.2" diff --git a/ui/src/app/agents-page/agents-page.component.html b/ui/src/app/agents-page/agents-page.component.html index 886652ff..865ca5e0 100644 --- a/ui/src/app/agents-page/agents-page.component.html +++ b/ui/src/app/agents-page/agents-page.component.html @@ -46,7 +46,7 @@ Name IP Address - Disabled + Enabled Last Seen Groups System @@ -65,7 +65,7 @@ {{agent.ip_address}} - @@ -117,7 +117,7 @@

General

- + @@ -158,10 +158,28 @@

General

+ + + + + + + + + + @@ -217,5 +235,45 @@

Other

- Disabled + Enabled + + {{ !agentTab.agent.disabled }} +
+ Created - {{ agentTab.agent.disabled }} + {{ agentTab.agent.created | localtime }} +
+ Deleted + + {{ agentTab.agent.deleted | localtime }}
+
+

Jobs

+ + + + ID + Name + Started + Finished + Completed + Run ID + State + Covered + + + + + {{job.id}} + {{job.name}} + {{job.started | localtime}} + {{job.finished | localtime}} + {{job.completed | localtime}} + {{job.run_id}} + {{job.state}} + {{job.covered}} + + + +
+ diff --git a/ui/src/app/agents-page/agents-page.component.ts b/ui/src/app/agents-page/agents-page.component.ts index 205464c0..2649b185 100644 --- a/ui/src/app/agents-page/agents-page.component.ts +++ b/ui/src/app/agents-page/agents-page.component.ts @@ -6,6 +6,7 @@ import { MessageService, MenuItem } from 'primeng/api' import { AuthService } from '../auth.service' import { ManagementService } from '../backend/api/management.service' +import { ExecutionService } from '../backend/api/execution.service' import { BreadcrumbsService } from '../breadcrumbs.service' @Component({ @@ -35,12 +36,18 @@ export class AgentsPageComponent implements OnInit { agentGroups: any[] = [] + // agent jobs table + agentJobs: any[] = [] + totalAgentJobs = 0 + loadingAgentJobs = false + constructor( private route: ActivatedRoute, private router: Router, public auth: AuthService, private msgSrv: MessageService, protected managementService: ManagementService, + protected executionService: ExecutionService, protected breadcrumbService: BreadcrumbsService, private titleService: Title ) {} @@ -162,7 +169,6 @@ export class AgentsPageComponent implements OnInit { } loadAgentsLazy(event) { - console.info(event) this.loadingAgents = true this.managementService .getAgents(false, event.first, event.rows) @@ -290,6 +296,17 @@ export class AgentsPageComponent implements OnInit { } changeAgentDisable(ev, ag) { - this.updateAgent(ag.id, { disabled: ag.disabled }) + this.updateAgent(ag.id, { disabled: !ag.disabled }) + } + + loadAgentJobsLazy(event) { + this.loadingAgentJobs = true + this.executionService + .getAgentJobs(this.agentTab.agent.id, event.first, event.rows) + .subscribe((data) => { + this.agentJobs = data.items + this.totalAgentJobs = data.total + this.loadingAgentJobs = false + }) } } diff --git a/ui/src/app/grp-cloud-cfg/grp-cloud-cfg.component.html b/ui/src/app/grp-cloud-cfg/grp-cloud-cfg.component.html index ae48d1a5..7fac8c51 100644 --- a/ui/src/app/grp-cloud-cfg/grp-cloud-cfg.component.html +++ b/ui/src/app/grp-cloud-cfg/grp-cloud-cfg.component.html @@ -61,7 +61,7 @@

VM Options

General

VM Options

- +
- diff --git a/ui/src/app/grp-cloud-cfg/grp-cloud-cfg.component.ts b/ui/src/app/grp-cloud-cfg/grp-cloud-cfg.component.ts index 583a3b1a..b88ee195 100644 --- a/ui/src/app/grp-cloud-cfg/grp-cloud-cfg.component.ts +++ b/ui/src/app/grp-cloud-cfg/grp-cloud-cfg.component.ts @@ -19,11 +19,11 @@ export class GrpCloudCfgComponent implements OnInit { // aws awsRegions: any[] - instanceTypes: any[] + awsInstanceTypes: any[] // azure azureLocations: any[] - azureInstanceTypes: any[] + azureVmSizes: any[] constructor( protected managementService: ManagementService) @@ -78,7 +78,7 @@ export class GrpCloudCfgComponent implements OnInit { this.managementService .getAwsEc2InstanceTypes(region) .subscribe((data) => { - this.instanceTypes = data.items + this.awsInstanceTypes = data.items }) } @@ -87,19 +87,19 @@ export class GrpCloudCfgComponent implements OnInit { this.managementService.getAzureLocations().subscribe((data) => { this.azureLocations = data.items - //if (this.deployment.aws.region) { - // this.regionChange() - //} + if (this.deployment.azure_vm.location) { + this.azureLocationChange() + } }) } } azureLocationChange() { - const location = this.deployment.azure.location - // this.managementService - // .getAwsEc2InstanceTypes(region) - // .subscribe((data) => { - // this.instanceTypes = data.items - // }) + const location = this.deployment.azure_vm.location + this.managementService + .getAzureVmSizes(location) + .subscribe((data) => { + this.azureVmSizes = data.items + }) } } diff --git a/ui/src/app/run-results/run-results.component.ts b/ui/src/app/run-results/run-results.component.ts index 3d144010..43939a5a 100644 --- a/ui/src/app/run-results/run-results.component.ts +++ b/ui/src/app/run-results/run-results.component.ts @@ -122,6 +122,7 @@ export class RunResultsComponent implements OnInit, OnDestroy { ngOnInit() { this.route.paramMap.subscribe((params) => { const runId = parseInt(params.get('id'), 10) + this.run.id = runId const tab = params.get('tab') if (tab === '') {