Skip to content

Commit

Permalink
Merge pull request #263 from canonical/agent_status_endpoint
Browse files Browse the repository at this point in the history
Add job status endpoint to server
  • Loading branch information
val500 authored Jul 26, 2024
2 parents eaee22c + fd6c29d commit c0f59b1
Show file tree
Hide file tree
Showing 18 changed files with 552 additions and 58 deletions.
56 changes: 43 additions & 13 deletions agent/testflinger_agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
from testflinger_agent.job import TestflingerJob
from testflinger_agent.errors import TFServerError
from testflinger_agent.config import ATTACHMENTS_DIR
from testflinger_common.enums import JobState, TestPhase
from testflinger_agent.event_emitter import EventEmitter
from testflinger_common.enums import JobState, TestPhase, TestEvent


try:
# attempt importing a tarfile filter, to check if filtering is supported
Expand Down Expand Up @@ -189,6 +191,7 @@ def unpack_attachments(self, job_data: dict, cwd: Path):

def process_jobs(self):
"""Coordinate checking for new jobs and handling them if they exists"""

TEST_PHASES = [
TestPhase.SETUP,
TestPhase.PROVISION,
Expand All @@ -202,12 +205,23 @@ def process_jobs(self):
self.retry_old_results()

self.check_restart()

job_data = self.client.check_jobs()
while job_data:
try:
job = TestflingerJob(job_data, self.client)
event_emitter = EventEmitter(
job_data.get("job_queue"),
job_data.get("job_status_webhook"),
self.client,
job.job_id,
)
job_end_reason = TestEvent.NORMAL_EXIT

logger.info("Starting job %s", job.job_id)
event_emitter.emit_event(
TestEvent.JOB_START,
f"{self.client.server}/jobs/{job.job_id}",
)
rundir = os.path.join(
self.client.config.get("execution_basedir"), job.job_id
)
Expand Down Expand Up @@ -243,32 +257,48 @@ def process_jobs(self):
== JobState.CANCELLED
):
logger.info("Job cancellation was requested, exiting.")
event_emitter.emit_event(TestEvent.CANCELLED)
break

self.client.post_job_state(job.job_id, phase)
self.set_agent_state(phase)
exit_code = job.run_test_phase(phase, rundir)

event_emitter.emit_event(TestEvent(phase + "_start"))
exit_code, exit_event, exit_reason = job.run_test_phase(
phase, rundir
)
self.client.post_influx(phase, exit_code)
event_emitter.emit_event(exit_event, exit_reason)

if phase == "provision":
if exit_code:
# exit code 46 is our indication that recovery failed!
# In this case, we need to mark the device offline
if exit_code == 46:
self.mark_device_offline()
# Replace with TestEvent enum values once it lands
detail = "recovery_fail"
detail = "provision_fail" if exit_code else ""
self.client.post_provision_log(
job.job_id, exit_code, detail
)
if phase != "test" and exit_code:
logger.debug("Phase %s failed, aborting job" % phase)
break
exit_event = TestEvent.RECOVERY_FAIL
else:
exit_event = TestEvent(phase + "_fail")
event_emitter.emit_event(exit_event)
if phase == "provision":
self.client.post_provision_log(
job.job_id, exit_code, exit_event
)
if phase != "test":
logger.debug(
"Phase %s failed, aborting job" % phase
)
job_end_reason = exit_event
break
else:
event_emitter.emit_event(TestEvent(phase + "_success"))
except Exception as e:
logger.exception(e)
finally:
# Always run the cleanup, even if the job was cancelled
event_emitter.emit_event(TestEvent.CLEANUP_START)
job.run_test_phase(TestPhase.CLEANUP, rundir)
event_emitter.emit_event(TestEvent.CLEANUP_SUCCESS)
event_emitter.emit_event(TestEvent.JOB_END, job_end_reason)
# clear job id
self.client.post_agent_data({"job_id": ""})

Expand Down
45 changes: 45 additions & 0 deletions agent/testflinger_agent/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import tempfile
import time

from typing import List, Dict
from urllib.parse import urljoin
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
Expand Down Expand Up @@ -387,3 +388,47 @@ def post_provision_log(self, job_id: str, exit_code: int, detail: str):
self.session.post(agent_data_url, json=data, timeout=30)
except RequestException as exc:
logger.warning("Unable to post provision log to server: %s", exc)

def post_status_update(
    self,
    job_queue: str,
    webhook: str,
    events: List[Dict[str, str]],
    job_id: str,
):
    """
    Posts status updates about the running job as long as there is a
    webhook

    Request failures (RequestException) and unsuccessful responses are
    logged instead of being raised to the caller.

    :param job_queue:
        TestFlinger queue the currently running job belongs to
    :param webhook:
        String URL to post status update to; when None nothing is sent
    :param events:
        List of accumulated test events
    :param job_id:
        id for the job on which we want to post results
    """
    if webhook is None:
        return

    status_update_request = {
        "agent_id": self.config.get("agent_id"),
        "job_queue": job_queue,
        "job_status_webhook": webhook,
        "events": events,
    }
    status_update_uri = urljoin(self.server, f"/v1/job/{job_id}/events")
    try:
        job_request = self.session.post(
            status_update_uri, json=status_update_request, timeout=30
        )
    except RequestException as exc:
        # There is no response object on this path, so stop here rather
        # than falling through to the status-code report below (which
        # would fail with AttributeError on None).
        logger.error("Server Error: %s", exc)
        return
    # NOTE(review): presumably a requests Response, which is falsy for
    # 4xx/5xx status codes -- confirm against the session type.
    if not job_request:
        logger.error(
            "Unable to post status updates to: %s (error: %s)",
            status_update_uri,
            job_request.status_code,
        )
57 changes: 57 additions & 0 deletions agent/testflinger_agent/event_emitter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# Copyright (C) 2024 Canonical
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>


from datetime import timezone, datetime

from testflinger_agent.client import TestflingerClient
from testflinger_common.enums import TestEvent


class EventEmitter:
    """Accumulate the events of one job and forward them via the client."""

    def __init__(
        self,
        job_queue: str,
        webhook: str,
        client: TestflingerClient,
        job_id: str,
    ):
        """
        :param job_queue:
            String representing job_queue the running job belongs to
        :param webhook:
            String url to send status updates to
        :param client:
            TestflingerClient used to post status updates to the server
        :param job_id:
            id for the job on which we want to post updates
        """
        self.client = client
        self.job_id = job_id
        self.job_queue = job_queue
        self.webhook = webhook
        # Every event recorded so far; the whole list is handed to the
        # client on each emit, not just the newest entry.
        self.events = []

    def emit_event(self, test_event: TestEvent, detail: str = ""):
        """
        Record an event with a UTC timestamp and post the accumulated list.

        :param test_event:
            Event to record; None is ignored and nothing is posted
        :param detail:
            Optional free-form text stored alongside the event
        """
        if test_event is None:
            return

        timestamp = datetime.now(timezone.utc).isoformat()
        self.events.append(
            {
                "event_name": test_event,
                "timestamp": timestamp,
                "detail": detail,
            }
        )
        self.client.post_status_update(
            self.job_queue, self.webhook, self.events, self.job_id
        )
18 changes: 10 additions & 8 deletions agent/testflinger_agent/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,24 +59,24 @@ def run_test_phase(self, phase, rundir):
node = self.client.config.get("agent_id")
if not cmd:
logger.info("No %s_command configured, skipping...", phase)
return 0
return 0, None, None
if phase == "provision" and not self.job_data.get("provision_data"):
logger.info("No provision_data defined in job data, skipping...")
return 0
return 0, None, None
if phase == "firmware_update" and not self.job_data.get(
"firmware_update_data"
):
logger.info(
"No firmware_update_data defined in job data, skipping..."
)
return 0
return 0, None, None
if phase == "test" and not self.job_data.get("test_data"):
logger.info("No test_data defined in job data, skipping...")
return 0
return 0, None, None
if phase == "allocate" and not self.job_data.get("allocate_data"):
return 0
return 0, None, None
if phase == "reserve" and not self.job_data.get("reserve_data"):
return 0
return 0, None, None
results_file = os.path.join(rundir, "testflinger-outcome.json")
output_log = os.path.join(rundir, phase + ".log")
serial_log = os.path.join(rundir, phase + "-serial.log")
Expand Down Expand Up @@ -117,7 +117,9 @@ def run_test_phase(self, phase, rundir):
):
runner.run(f"echo '{line}'")
try:
exitcode, exit_reason = runner.run(cmd)
# Set exit_event to fail for this phase in case of an exception
exit_event = f"{phase}_fail"
exitcode, exit_event, exit_reason = runner.run(cmd)
except Exception as exc:
logger.exception(exc)
exitcode = 100
Expand All @@ -128,7 +130,7 @@ def run_test_phase(self, phase, rundir):
)
if phase == "allocate":
self.allocate_phase(rundir)
return exitcode
return exitcode, exit_event, exit_reason

def _update_phase_results(
self, results_file, phase, exitcode, output_log, serial_log
Expand Down
20 changes: 12 additions & 8 deletions agent/testflinger_agent/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
from enum import Enum
from typing import Callable, List, Optional, Tuple

from testflinger_common.enums import TestEvent

logger = logging.getLogger(__name__)

OutputHandlerType = Callable[[str], None]
Expand Down Expand Up @@ -78,16 +80,16 @@ def post_output(self, data: str):
def register_stop_condition_checker(self, checker: StopConditionType):
self.stop_condition_checkers.append(checker)

def check_stop_conditions(self) -> str:
def check_stop_conditions(self) -> Tuple[Optional[TestEvent], str]:
"""
Check stop conditions and return the reason if any are met. Otherwise,
return an empty string if none are met
"""
for checker in self.stop_condition_checkers:
output = checker()
if output:
return output
return ""
event, detail = checker()
if event is not None:
return event, detail
return None, ""

def check_and_post_output(self):
raw_output = self.process.stdout.read()
Expand Down Expand Up @@ -117,9 +119,10 @@ def cleanup(self):
if self.process is not None:
self.process.kill()

def run(self, cmd: str) -> Tuple[int, str]:
def run(self, cmd: str) -> Tuple[int, Optional[TestEvent], str]:
# Ensure that the process is None before starting
self.process = None
stop_event = None
stop_reason = ""

signal.signal(signal.SIGTERM, lambda signum, frame: self.cleanup())
Expand All @@ -136,7 +139,8 @@ def run(self, cmd: str) -> Tuple[int, str]:
while self.process.poll() is None:
time.sleep(10)

if stop_reason := self.check_stop_conditions():
stop_event, stop_reason = self.check_stop_conditions()
if stop_event is not None:
self.post_output(f"\n{stop_reason}\n")
self.cleanup()
break
Expand All @@ -150,7 +154,7 @@ def run(self, cmd: str) -> Tuple[int, str]:
if stop_reason == "":
stop_reason = get_stop_reason(self.process.returncode, "")

return self.process.returncode, stop_reason
return self.process.returncode, stop_event, stop_reason


def get_stop_reason(returncode: int, stop_reason: str) -> str:
Expand Down
33 changes: 21 additions & 12 deletions agent/testflinger_agent/stop_condition_checkers.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>

import time
from typing import Optional

from typing import Optional, Tuple
from testflinger_common.enums import JobState, TestEvent
from .client import TestflingerClient


Expand All @@ -23,32 +23,41 @@ def __init__(self, client: TestflingerClient, job_id: str):
self.client = client
self.job_id = job_id

def __call__(self) -> Optional[str]:
if self.client.check_job_state(self.job_id) == "cancelled":
return "Job cancellation was requested, exiting."
return None
def __call__(self) -> Tuple[Optional[TestEvent], str]:
if self.client.check_job_state(self.job_id) == JobState.CANCELLED:
return (
TestEvent.CANCELLED,
"Job cancellation was requested, exiting.",
)
return None, ""


class GlobalTimeoutChecker:
def __init__(self, timeout: int):
self.timeout = timeout
self.start_time = time.time()

def __call__(self) -> Optional[str]:
def __call__(self) -> Tuple[Optional[TestEvent], str]:
if time.time() - self.start_time > self.timeout:
return f"ERROR: Global timeout reached! ({self.timeout}s)"
return None
return (
TestEvent.GLOBAL_TIMEOUT,
f"ERROR: Global timeout reached! ({self.timeout}s)",
)
return None, ""


class OutputTimeoutChecker:
def __init__(self, timeout: int):
self.timeout = timeout
self.last_output_time = time.time()

def __call__(self) -> Optional[str]:
def __call__(self) -> Tuple[Optional[TestEvent], str]:
if time.time() - self.last_output_time > self.timeout:
return f"ERROR: Output timeout reached! ({self.timeout}s)"
return None
return (
TestEvent.OUTPUT_TIMEOUT,
f"ERROR: Output timeout reached! ({self.timeout}s)",
)
return None, ""

def update(self):
"""Update the last output time to the current time."""
Expand Down
Loading

0 comments on commit c0f59b1

Please sign in to comment.