Skip to content

Commit

Permalink
Finalized Azure support; added display of jobs executed on a given machine
Browse files (browse the repository at this point in the history)
  • Loading branch information
godfryd committed Oct 14, 2021
1 parent ce50cf5 commit 04c714a
Show file tree
Hide file tree
Showing 17 changed files with 794 additions and 322 deletions.
49 changes: 37 additions & 12 deletions server/kraken/server/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from .. import version
from . import kkrq
from . import utils
from . import dbutils

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -371,30 +372,47 @@ def _handle_step_result(agent, req):
job.finished = utils.utcnow()
agent.job = None

# check if aws machine should be destroyed now
aws = None
for aa in agent.agents_groups:
ag = aa.agents_group
if not ag.deployment:
continue

# check if cloud machine should be destroyed now
ag = dbutils.find_cloud_assignment_group(agent)
if ag:
# aws ec2
if ag.deployment['method'] == consts.AGENT_DEPLOYMENT_METHOD_AWS_EC2:
depl = ag.deployment['aws']

if depl and 'destruction_after_jobs' in depl and int(depl['destruction_after_jobs']) > 0:
max_jobs = int(depl['destruction_after_jobs'])
jobs_num = Job.query.filter_by(agent_used=agent).count()
q = Job.query.filter_by(agent_used=agent)
q = q.filter(Job.finished.isnot(None))
q = q.filter(Job.finished > agent.created)
jobs_num = q.count()
log.info('JOB %s, num %d, max %d', job.id, jobs_num, max_jobs)
if jobs_num >= max_jobs:
agent.disabled = True
kkrq.enq(bg_jobs.destroy_machine, agent.id)

# aws ecs fargate
elif ag.deployment['method'] == consts.AGENT_DEPLOYMENT_METHOD_AWS_ECS_FARGATE:
depl = ag.deployment['aws_ecs_fargate']
log.info('ECS FARGATE JOB %s - destroying task', job.id)
agent.disabled = True
kkrq.enq(bg_jobs.destroy_machine, agent.id)

# azure vm
elif ag.deployment['method'] == consts.AGENT_DEPLOYMENT_METHOD_AZURE_VM:
depl = ag.deployment['azure_vm']

if depl and 'destruction_after_jobs' in depl and int(depl['destruction_after_jobs']) > 0:
max_jobs = int(depl['destruction_after_jobs'])
q = Job.query.filter_by(agent_used=agent)
q = q.filter(Job.finished.isnot(None))
q = q.filter(Job.finished > agent.created)
jobs_num = q.count()
log.info('JOB %s, num %d, max %d', job.id, jobs_num, max_jobs)
if jobs_num >= max_jobs:
agent.disabled = True
kkrq.enq(bg_jobs.destroy_machine, agent.id)
log.info('*' * 300)

db.session.commit()
kkrq.enq(bg_jobs.job_completed, job.id)
log.info('job %s finished by %s', job, agent)
Expand Down Expand Up @@ -518,14 +536,22 @@ def _handle_keep_alive(agent, req): # pylint: disable=unused-argument

def _handle_unknown_agent(address, ip_address, agent):
try:
new = False
if agent:
agent.deleted = None
now = utils.utcnow()
if agent.deleted:
log.info('undeleting agent %s with address %s', agent, address)
agent.deleted = None
agent.created = now
agent.authorized = False
agent.ip_address = ip_address
agent.last_seen = utils.utcnow()
agent.last_seen = now
else:
Agent(name=address, address=address, authorized=False, ip_address=ip_address, last_seen=utils.utcnow())
agent = Agent(name=address, address=address, authorized=False, ip_address=ip_address, last_seen=utils.utcnow())
new = True
db.session.commit()
if new:
log.info('created new agent instance %s for address %s', agent, address)
except Exception as e:
log.warning('IGNORED EXCEPTION: %s', str(e))

Expand All @@ -550,7 +576,6 @@ def serve_agent_request():
return json.dumps({})

agent.last_seen = utils.utcnow()
agent.deleted = None
db.session.commit()

if not agent.authorized:
Expand Down
51 changes: 23 additions & 28 deletions server/kraken/server/bg/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from .. import kkrq
from .. import cloud
from .. import utils
from .. import dbutils

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -928,28 +929,27 @@ def spawn_new_agents(agents_group_id):
server_url = get_setting('general', 'server_url')
minio_addr = get_setting('general', 'minio_addr')
clickhouse_addr = get_setting('general', 'clickhouse_addr')
access_key = get_setting('cloud', 'aws_access_key')
secret_access_key = get_setting('cloud', 'aws_secret_access_key')
settings = ['server_url', 'minio_addr', 'clickhouse_addr', 'access_key', 'secret_access_key']
settings = ['server_url', 'minio_addr', 'clickhouse_addr']
for s in settings:
val = locals()[s]
if not val:
log.error('%s is empty, please set it in global general or cloud settings', s)
log.error('%s is empty, please set it in global general settings', s)
return

ag = AgentsGroup.query.filter_by(id=agents_group_id).one_or_none()
if ag is None:
log.warning('cannot find agents group id: %d', agents_group_id)
return

# only AWS supported
depl = None
if ag.deployment['method'] == consts.AGENT_DEPLOYMENT_METHOD_AWS_EC2:
depl = ag.deployment['aws']
elif ag.deployment['method'] == consts.AGENT_DEPLOYMENT_METHOD_AWS_ECS_FARGATE:
depl = ag.deployment['aws_ecs_fargate']
elif ag.deployment['method'] == consts.AGENT_DEPLOYMENT_METHOD_AZURE_VM:
depl = ag.deployment['azure_vm']
else:
log.warning('deployment method %d in agents group id:%d not implemented', ag.deployment['method'], agents_group_id)
log.error('deployment method %d in agents group id:%d not implemented', ag.deployment['method'], agents_group_id)
return

# check if limit of agents is reached
Expand Down Expand Up @@ -1000,8 +1000,7 @@ def spawn_new_agents(agents_group_id):
log.info('enough agents, avail: %d, needed: %d', agents_count, needed_count)
continue

cloud.create_machines(depl, access_key, secret_access_key,
ag, system, num,
cloud.create_machines(depl, ag, system, num,
server_url, minio_addr, clickhouse_addr)


Expand All @@ -1014,33 +1013,29 @@ def destroy_machine(agent_id):
log.error('cannot find agent id:%d', agent_id)
return

agent.deleted = utils.utcnow()
agent.disabled = True
db.session.commit()

if not agent.extra_attrs:
log.warning('missing extra_attrs in agent %s', agent)
dbutils.delete_agent(agent)
return

# find agents group to get region for aws login
ag = dbutils.find_cloud_assignment_group(agent)
if not ag:
log.error('agent %s does not have cloud group', agent)
dbutils.delete_agent(agent)
return

dbutils.delete_agent(agent)

depl = None
for aa in agent.agents_groups:
ag = aa.agents_group
if not ag.deployment:
continue
if ag.deployment['method'] == consts.AGENT_DEPLOYMENT_METHOD_AWS_EC2:
depl = ag.deployment['aws']
break
if ag.deployment['method'] == consts.AGENT_DEPLOYMENT_METHOD_AWS_ECS_FARGATE:
depl = ag.deployment['aws_ecs_fargate']
break
if ag.deployment['method'] == consts.AGENT_DEPLOYMENT_METHOD_AWS_EC2:
depl = ag.deployment['aws']
elif ag.deployment['method'] == consts.AGENT_DEPLOYMENT_METHOD_AWS_ECS_FARGATE:
depl = ag.deployment['aws_ecs_fargate']
elif ag.deployment['method'] == consts.AGENT_DEPLOYMENT_METHOD_AZURE_VM:
depl = ag.deployment['azure_vm']

if not depl:
log.error('cannot find deployment info for agent %s', agent_id)
return

access_key = get_setting('cloud', 'aws_access_key')
secret_access_key = get_setting('cloud', 'aws_secret_access_key')

cloud.destroy_machine(access_key, secret_access_key,
ag.deployment['method'], depl, agent)
cloud.destroy_machine(ag.deployment['method'], depl, agent, ag)
Loading

0 comments on commit 04c714a

Please sign in to comment.