Skip to content

Commit

Permalink
Fixed running many tests; added paging of results
Browse files Browse the repository at this point in the history
  • Loading branch information
godfryd committed Oct 14, 2019
1 parent 60f0dee commit 8b032a8
Show file tree
Hide file tree
Showing 14 changed files with 155 additions and 81 deletions.
7 changes: 3 additions & 4 deletions kraken/agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@
import server
import jobber

FILE_LOG_FMT = '%(asctime)s %(levelname)-4.4s p:%(process)5d %(module)8.8s:%(lineno)-5d %(message)s'
CONSOLE_LOG_FMT = '%(asctime)s p:%(process)d %(name)6.6s: %(levelname)-5.5s %(message)s'
LOG_FMT = '%(asctime)s %(levelname)-4.4s p:%(process)5d %(module)8.8s:%(lineno)-5d %(message)s'


log = logging.getLogger('agent')
Expand Down Expand Up @@ -39,7 +38,7 @@ def dispatch_job(srv, job):


def main():
logging.basicConfig(format=CONSOLE_LOG_FMT, level=logging.INFO)
logging.basicConfig(format=LOG_FMT, level=logging.INFO)

args = parse_args()
cfg = vars(args)
Expand All @@ -57,7 +56,7 @@ def main():
while True:
try:
job = srv.get_job()
log.info('received job: %s', job)
log.info('received job: %s', str(job)[:200])

if job:
dispatch_job(srv, job)
Expand Down
26 changes: 17 additions & 9 deletions kraken/agent/jobber.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,12 @@ def __init__(self, kk_srv, command, job_id, idx):


async def _async_pump_output(proc_coord, stream):
while True:
line = await stream.readline()
while True:
try:
line = await stream.readline()
except ValueError:
log.exception('IGNORED')
continue
if line:
line = line.decode().rstrip()
log.info(line)
Expand Down Expand Up @@ -150,13 +154,14 @@ async def _async_subprocess(proc_coord, cmd, cwd, timeout):
proc = await asyncio.create_subprocess_shell(
cmd,
cwd=cwd,
limit=1024 * 128, # 128 KiB
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT)

await _async_pump_output(proc_coord, proc.stdout)

done, pending = await asyncio.wait([proc.wait(), _async_monitor_proc(proc_coord, proc, timeout)],
timeout=timeout * 1.1)
timeout=timeout * 1.1)
#log.info('done %s', done)
#log.info('pending %s', pending)
proc_coord.end_time = datetime.datetime.now()
Expand All @@ -172,8 +177,8 @@ def __init__(self, proc_coord):

async def _async_handle_request(self, reader, writer):
addr = writer.get_extra_info('peername')
#data = await reader.read(8192)
while True:
#data = await reader.read(8192)
data = await reader.readline()
if not data:
break
Expand All @@ -188,11 +193,13 @@ async def _async_handle_request(self, reader, writer):
self.proc_coord.result = data

if self.proc_coord.command == 'run_tests':
# report partial results
srv = self.proc_coord.kk_srv
# TODO: check that the result is not sent twice: here and at the end of the process
srv.report_step_result(self.proc_coord.job_id,
self.proc_coord.idx,
self.proc_coord.result)
self.proc_coord.result = {'status': 'in-progress'}


async def _async_tcp_server(proc_coord, server):
Expand All @@ -202,7 +209,7 @@ async def _async_tcp_server(proc_coord, server):

async def _async_exec_tool(proc_coord, tool_path, command, cwd, timeout, step_file_path):
handler = RequestHandler(proc_coord)
server = await asyncio.start_server(handler._async_handle_request, '0.0.0.0', 0)
server = await asyncio.start_server(handler._async_handle_request, '0.0.0.0', 0, limit=1024 * 1280)
addr = server.sockets[0].getsockname()
return_addr = "%s:%s" % addr

Expand Down Expand Up @@ -238,7 +245,7 @@ def _write_step_file(job_dir, step, idx):

def _run_step(srv, job_dir, job_id, idx, step, tools):
tool_name = step['tool']
log.info('step %s', step)
log.info('step %s', str(step)[:200])
if tool_name not in tools:
raise Exception('No such Kraken tool: %s' % tool_name)
tool_path = tools[tool_name]
Expand All @@ -257,7 +264,7 @@ def _run_step(srv, job_dir, job_id, idx, step, tools):
if 'collect_tests' in available_commands and ('tests' not in step or step['tests'] is None or len(step['tests']) == 0):
# collect tests from tool to execute
result = _exec_tool(srv, tool_path, 'collect_tests', job_dir, 10, step_file_path, job_id, idx)
log.info('result for collect_tests: %s', result)
log.info('result for collect_tests: %s', str(result)[:200])

# check result
if not isinstance(result, dict):
Expand Down Expand Up @@ -285,13 +292,14 @@ def _run_step(srv, job_dir, job_id, idx, step, tools):

if 'run_tests' in available_commands:
result = _exec_tool(srv, tool_path, 'run_tests', job_dir, 60, step_file_path, job_id, idx)
log.info('result for run_tests: %s', result)
log.info('result for run_tests: %s', str(result)[:200])
srv.report_step_result(job_id, idx, result)

if 'run' in available_commands:
result = _exec_tool(srv, tool_path, 'run', job_dir, 60, step_file_path, job_id, idx)
log.info('result for run: %s', result)
srv.report_step_result(job_id, idx, result)

srv.report_step_result(job_id, idx, result)
return result


Expand Down
8 changes: 4 additions & 4 deletions kraken/agent/tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@
import argparse
import traceback

FILE_LOG_FMT = '%(asctime)s %(levelname)-4.4s p:%(process)5d %(module)8.8s:%(lineno)-5d %(message)s'
CONSOLE_LOG_FMT = '%(asctime)s p:%(process)d %(name)6.6s: %(levelname)-5.5s %(message)s'
LOG_FMT = '%(asctime)s %(levelname)-4.4s p:%(process)5d %(module)8.8s:%(lineno)-5d %(message)s'

log = logging.getLogger(__name__)

Expand All @@ -25,6 +24,7 @@ def report_result(self, result):
self.results.append(result)

now = datetime.datetime.now()
# report results after 100 results or after 20 seconds
if len(self.results) > 100 or (now - self.last_reported > datetime.timedelta(seconds=20)):
self.flush()

Expand All @@ -37,7 +37,7 @@ def flush(self):

def execute(sock, command, step_file_path):
try:
logging.basicConfig(format=CONSOLE_LOG_FMT, level=logging.INFO)
logging.basicConfig(format=LOG_FMT, level=logging.INFO)
log.info('started tool for step')

with open(step_file_path) as f:
Expand Down Expand Up @@ -92,8 +92,8 @@ def __init__(self, address):
self.connect((address[0], int(address[1])))

def send_json(self, data):
    """Serialize *data* to JSON and send it over the socket.

    The payload is newline-terminated so that the receiving side can
    frame messages with readline().
    """
    payload = json.dumps(data) + '\n'
    # log only a prefix - results payloads can be very large
    log.info('tool response: %s', payload[:200])
    self.sendall(payload.encode("utf-8"))


Expand Down
11 changes: 7 additions & 4 deletions kraken/server/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def _handle_get_job(executor, req):
tests.append(tcr.test_case.name)
if tests:
job['steps'][-1]['tests'] = tests
log.info('sending job: %s', job)
log.info('sending job: %s', str(job)[:200])
return {'job': job}


Expand Down Expand Up @@ -88,7 +88,7 @@ def _handle_step_result(executor, req):
job_finished = True
log.info('checking steps')
for s in job.steps:
log.info('%s: %s', s.index, s.status)
log.info('%s: %s', s.index, consts.STEP_STATUS_NAME[s.status] if s.status in consts.STEP_STATUS_NAME else s.status)
if s.status == consts.STEP_STATUS_DONE:
continue
elif s.status == consts.STEP_STATUS_ERROR:
Expand Down Expand Up @@ -116,7 +116,7 @@ def _create_test_records(step, tests):

for t in tests:
tc = tool_test_cases.get(t, None)
if tc == None:
if tc is None:
tc = TestCase(name=t, tool=step.tool)
tcr = TestCaseResult(test_case=tc, job=step.job)
db.session.commit()
Expand Down Expand Up @@ -144,6 +144,9 @@ def _handle_dispatch_tests(executor, req):
return {}

tests_cnt = len(tests)
if len(set(tests)) != tests_cnt:
log.warn('there are tests duplicates')
return {}
if tests_cnt == 0:
# TODO
raise NotImplementedError
Expand Down Expand Up @@ -173,7 +176,7 @@ def serve_agent_request():
req = request.get_json()
# log.info('request headers: %s', request.headers)
# log.info('request args: %s', request.args)
log.info('request data: %s', req)
log.info('request data: %s', str(req)[:200])

msg = req['msg']
address = req['address']
Expand Down
90 changes: 50 additions & 40 deletions kraken/server/bg/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,44 +44,54 @@ def on_failure(self, exc, task_id, args, kwargs, einfo):
log.info('PROBLEMS')


@app.task(base=BaseTask)
def job_completed(job_id):
app = create_app()

with app.app_context():

now = datetime.datetime.utcnow()

job = Job.query.filter_by(id=job_id).one_or_none()
job.completed = now
job.state = consts.JOB_STATE_COMPLETED
job.completion_status = consts.JOB_CMPLT_ALL_OK
db.session.commit()

# establish new run state
run = job.run
new_state = consts.RUN_STATE_COMPLETED
for j in run.jobs:
if j.state != consts.JOB_STATE_COMPLETED:
new_state = consts.RUN_STATE_IN_PROGRESS
break

if run.state == new_state:
return

run.state = new_state
run.finished = now
db.session.commit()

# establish new flow state
flow = run.flow
new_state = consts.FLOW_STATE_COMPLETED
for r in flow.runs:
if r.state != consts.RUN_STATE_COMPLETED:
new_state = consts.FLOW_STATE_IN_PROGRESS
break

if flow.state != new_state:
flow.state = new_state
flow.finished = now
@app.task(base=BaseTask, bind=True)
def job_completed(self, job_id):
    """Celery task invoked when a job finishes.

    Marks the job as completed (deriving its completion status from its
    steps' statuses) and then propagates completion state upwards: first
    to the enclosing run, then to the enclosing flow.

    :param job_id: primary key of the Job to complete
    On any exception the task is rescheduled via Celery's retry mechanism.
    """
    try:
        app = create_app()

        with app.app_context():

            now = datetime.datetime.utcnow()

            log.info('completing job %s', job_id)
            job = Job.query.filter_by(id=job_id).one_or_none()
            if job is None:
                # No such job - retrying would never succeed, so bail out
                # instead of raising AttributeError and retrying forever.
                log.error('cannot find job %s to complete', job_id)
                return
            job.completed = now
            job.state = consts.JOB_STATE_COMPLETED
            job.completion_status = consts.JOB_CMPLT_ALL_OK
            log.info('checking steps')
            for step in job.steps:
                # log a readable status name when the code is known
                log.info('%s: %s', step.index, consts.STEP_STATUS_NAME.get(step.status, step.status))
                if step.status == consts.STEP_STATUS_ERROR:
                    job.completion_status = consts.JOB_CMPLT_AGENT_ERROR_RETURNED
                    break
            db.session.commit()

            # establish new run state: completed only when every job is completed
            run = job.run
            new_state = consts.RUN_STATE_COMPLETED
            for j in run.jobs:
                if j.state != consts.JOB_STATE_COMPLETED:
                    new_state = consts.RUN_STATE_IN_PROGRESS
                    break

            if run.state == new_state:
                # run state unchanged - nothing more to propagate
                return

            run.state = new_state
            run.finished = now
            db.session.commit()

            # establish new flow state: completed only when every run is completed
            flow = run.flow
            new_state = consts.FLOW_STATE_COMPLETED
            for r in flow.runs:
                if r.state != consts.RUN_STATE_COMPLETED:
                    new_state = consts.FLOW_STATE_IN_PROGRESS
                    break

            if flow.state != new_state:
                flow.state = new_state
                flow.finished = now
                db.session.commit()
    except Exception as exc:
        # reschedule the task on any failure (DB hiccup, etc.)
        raise self.retry(exc=exc)
10 changes: 7 additions & 3 deletions kraken/server/consts.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
FILE_LOG_FMT = '%(asctime)s %(levelname)-4.4s p:%(process)5d %(module)8.8s:%(lineno)-5d %(message)s'
CONSOLE_LOG_FMT = '%(asctime)s p:%(process)d %(name)6.6s: %(levelname)-5.5s %(message)s'

LOG_FMT = '%(asctime)s %(levelname)-4.4s p:%(process)5d %(module)8.8s:%(lineno)-5d %(message)s'

JOB_STATE_PREQUEUED = 1
JOB_STATE_QUEUED = 2
Expand Down Expand Up @@ -35,6 +33,12 @@
'done': STEP_STATUS_DONE,
'error': STEP_STATUS_ERROR
}
# Reverse mapping of numeric step status codes to human-readable names,
# used when logging step progress.
STEP_STATUS_NAME = {
    STEP_STATUS_NOT_STARTED: 'not-started',
    STEP_STATUS_IN_PROGRES: 'in-progress',
    STEP_STATUS_DONE: 'done',
    STEP_STATUS_ERROR: 'error'
}

TC_RESULT_NOT_RUN = 0
TC_RESULT_PASSED = 1
Expand Down
6 changes: 4 additions & 2 deletions kraken/server/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,12 +151,14 @@ def get_runs(stage_id):
return runs, 200


def get_run_results(run_id, start=0, limit=10):
    """Return one page of test case results for a run.

    :param run_id: id of the run whose results are listed
    :param start: offset of the first result in the page
    :param limit: maximum number of results in the page
    :return: tuple of ({'items': [...], 'total': total_count}, HTTP status);
             'total' is the count BEFORE paging so clients can page through
    """
    # guard against negative paging parameters (e.g. from a query string),
    # which would make the database layer raise
    start = max(start, 0)
    limit = max(limit, 0)
    q = TestCaseResult.query.join('job').filter(Job.run_id == run_id)
    total = q.count()
    q = q.offset(start).limit(limit)
    results = [r.get_json() for r in q.all()]
    return {'items': results, 'total': total}, 200


def get_run(run_id):
Expand Down
Loading

0 comments on commit 8b032a8

Please sign in to comment.