Skip to content

Commit

Permalink
Add agent posting of status updates to the server
Browse files Browse the repository at this point in the history
  • Loading branch information
val500 committed May 13, 2024
1 parent 3ce6c84 commit 71cd870
Show file tree
Hide file tree
Showing 5 changed files with 213 additions and 3 deletions.
53 changes: 50 additions & 3 deletions agent/testflinger_agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@
import shutil
import tempfile

from datetime import timezone, datetime

from testflinger_agent.job import TestflingerJob
from testflinger_agent.errors import TFServerError
from testflinger_agent.config import ATTACHMENTS_DIR
from testflinger_agent.enums import JobState

try:
# attempt importing a tarfile filter, to check if filtering is supported
Expand Down Expand Up @@ -184,6 +187,7 @@ def unpack_attachments(self, job_data: dict, cwd: Path):

def process_jobs(self):
"""Coordinate checking for new jobs and handling them if they exists"""

TEST_PHASES = [
"setup",
"provision",
Expand All @@ -197,11 +201,19 @@ def process_jobs(self):
self.retry_old_results()

self.check_restart()

webhook = None
job_queue = ""
job_data = self.client.check_jobs()
while job_data:

try:
job = TestflingerJob(job_data, self.client)

# List of phases to send to status endpoint on the server
completed_phases = []
webhook = job_data.get("job_status_webhook")
job_queue = job_data.get("job_queue")

logger.info("Starting job %s", job.job_id)
rundir = os.path.join(
self.client.config.get("execution_basedir"), job.job_id
Expand Down Expand Up @@ -233,27 +245,62 @@ def process_jobs(self):

for phase in TEST_PHASES:
# First make sure the job hasn't been cancelled
if self.client.check_job_state(job.job_id) == "cancelled":
if (
self.client.check_job_state(job.job_id)
== JobState.CANCELLED
):
logger.info("Job cancellation was requested, exiting.")
self.client.post_status_update(
JobState.CANCELLED,
webhook,
completed_phases,
job_queue,
)
break
self.client.post_job_state(job.job_id, phase)
self.set_agent_state(phase)
self.client.post_status_update(
phase, webhook, completed_phases, job_queue
)
cur_phase_data = {
"phase_name": phase,
"start": datetime.now(timezone.utc).isoformat(),
}
exitcode = job.run_test_phase(phase, rundir)
cur_phase_data["end"] = datetime.now(
timezone.utc
).isoformat()
cur_phase_data["result"] = str(exitcode)

self.client.post_influx(phase, exitcode)

# exit code 46 is our indication that recovery failed!
# In this case, we need to mark the device offline
if exitcode == 46:
self.mark_device_offline()
cur_phase_data["reason"] = "Recovery Failed"

if phase != "test" and exitcode:
logger.debug("Phase %s failed, aborting job" % phase)
cur_phase_data["reason"] = "Phase failed"
completed_phases.append(cur_phase_data)
self.client.post_status_update(
phase, webhook, completed_phases, job_queue
)
break

completed_phases.append(cur_phase_data)
self.client.post_status_update(
phase, webhook, completed_phases, job_queue
)
except Exception as e:
logger.exception(e)
finally:
# Always run the cleanup, even if the job was cancelled
job.run_test_phase("cleanup", rundir)
job.run_test_phase(JobState.CLEANUP, rundir)
self.client.post_status_update(
JobState.CLEANUP, webhook, completed_phases, job_queue
)
# clear job id
self.client.post_agent_data({"job_id": ""})

Expand Down
38 changes: 38 additions & 0 deletions agent/testflinger_agent/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -365,3 +365,41 @@ def post_influx(self, phase, result=None):
)
except InfluxDBClientError as exc:
logger.error(exc)

def post_status_update(self, job_state, webhook, phases, job_queue):
"""
Posts status updates about the running job as long as there is a
webhook
:param job_state:
String of currently running jobstate
:param webhook:
String URL to post status update to
:param phases:
List of accumulated test phases that have run
:param job_queue:
TestFlinger queue the currently running job belongs to
"""
if webhook is None:
return

status_update_request = {
"agent_id": self.config.get("agent_id"),
"job_queue": job_queue,
"job_state": job_state,
"job_status_webhook": webhook,
"phases": phases,
}
status_update_uri = urljoin(self.server, "/v1/agents/status")
try:
job_request = self.session.post(
status_update_uri, json=status_update_request, timeout=1
)
except RequestException as exc:
logger.error(exc)
raise TFServerError("other exception") from exc
if not job_request:
logger.error(
"Unable to post results to: %s (error: %s)"
% (status_update_uri, job_request.status_code)
)
42 changes: 42 additions & 0 deletions agent/testflinger_agent/enums.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# Copyright (C) 2022 Canonical
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
"""
Job State and Test Phase Enums
"""

from enum import Enum


class JobState(str, Enum):
SETUP = "setup"
PROVISION = "provision"
FIRMWARE_UPDATE = "firmware_update"
TEST = "test"
ALLOCATE = "allocate"
ALLOCATED = "allocated"
RESERVE = "reserve"
CLEANUP = "cleanup"
CANCELLED = "cancelled"
COMPLETED = "completed"


class TestPhase(str, Enum):
SETUP = "setup"
PROVISION = "provision"
FIRMWARE_UPDATE = "firmware_update"
TEST = "test"
ALLOCATE = "allocate"
RESERVE = "reserve"
60 changes: 60 additions & 0 deletions agent/testflinger_agent/tests/test_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from testflinger_agent.errors import TFServerError
from testflinger_agent.client import TestflingerClient as _TestflingerClient
from testflinger_agent.agent import TestflingerAgent as _TestflingerAgent
from testflinger_agent.enums import TestPhase


class TestClient:
Expand Down Expand Up @@ -399,3 +400,62 @@ def test_post_agent_data(self, agent):
"provision_type": self.config["provision_type"],
}
)

def test_post_agent_status_update(self, agent, requests_mock):
self.config["test_command"] = "echo test1"
fake_job_data = {
"job_id": str(uuid.uuid1()),
"job_queue": "test",
"test_data": {"test_cmds": "foo"},
"job_status_webhook": "https://mywebhook",
}
requests_mock.get(
"http://127.0.0.1:8000/v1/job?queue=test",
[{"text": json.dumps(fake_job_data)}, {"text": "{}"}],
)
status_url = "http://127.0.0.1:8000/v1/agents/status"
requests_mock.post(status_url, status_code=200)
with patch("shutil.rmtree"):
agent.process_jobs()

status_update_requests = list(
filter(
lambda req: req.url == status_url,
requests_mock.request_history,
)
)
last_request = status_update_requests[-1].json()
assert len(status_update_requests) == (2 * len(TestPhase) + 1)
assert last_request["job_state"] == "cleanup"
assert len(last_request["phases"]) == len(TestPhase)

def test_post_agent_status_update_cancelled(self, agent, requests_mock):
self.config["test_command"] = "echo test1"
job_id = str(uuid.uuid1())
fake_job_data = {
"job_id": job_id,
"job_queue": "test",
"test_data": {"test_cmds": "foo"},
"job_status_webhook": "https://mywebhook",
}
requests_mock.get(
"http://127.0.0.1:8000/v1/job?queue=test",
[{"text": json.dumps(fake_job_data)}, {"text": "{}"}],
)
status_url = "http://127.0.0.1:8000/v1/agents/status"
requests_mock.post(status_url, status_code=200)

requests_mock.get(
"http://127.0.0.1:8000/v1/result/" + job_id,
json={"job_state": "cancelled"},
)
with patch("shutil.rmtree"):
agent.process_jobs()

status_update_requests = list(
filter(
lambda req: req.url == status_url,
requests_mock.request_history,
)
)
assert status_update_requests[-2].json()["job_state"] == "cancelled"
23 changes: 23 additions & 0 deletions agent/testflinger_agent/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,26 @@ def test_transmit_job_outcome_missing_json(self, client, tmp_path, caplog):
"""
client.transmit_job_outcome(tmp_path)
assert "Unable to read job ID" in caplog.text

def test_post_status_update(self, client, requests_mock):
"""
Test that the agent sends a status update to the status endpoint
if there is a valid webhook
"""
webhook = "http://foo"
requests_mock.post(
"http://127.0.0.1:8000/v1/agents/status", status_code=200
)
phases = [
{"phase_name": "phase1", "result": 0},
{"phase_name": "phase2", "result": 1},
]
client.post_status_update("myjobstate", webhook, phases, "myjobqueue")
expected_json = {
"agent_id": client.config.get("agent_id"),
"job_queue": "myjobqueue",
"job_state": "myjobstate",
"job_status_webhook": webhook,
"phases": phases,
}
assert requests_mock.last_request.json() == expected_json

0 comments on commit 71cd870

Please sign in to comment.