Skip to content

Commit

Permalink
Add agent posting of status updates to the server
Browse files Browse the repository at this point in the history
  • Loading branch information
val500 committed Jun 7, 2024
1 parent 22c2695 commit 343ea2a
Show file tree
Hide file tree
Showing 10 changed files with 238 additions and 40 deletions.
28 changes: 24 additions & 4 deletions agent/testflinger_agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
from testflinger_agent.job import TestflingerJob
from testflinger_agent.errors import TFServerError
from testflinger_agent.config import ATTACHMENTS_DIR
from testflinger_common.enums import JobState, TestPhase
from testflinger_agent.event_emitter import EventEmitter
from testflinger_common.enums import JobState, TestPhase, TestEvent


try:
# attempt importing a tarfile filter, to check if filtering is supported
Expand Down Expand Up @@ -189,6 +191,7 @@ def unpack_attachments(self, job_data: dict, cwd: Path):

def process_jobs(self):
"""Coordinate checking for new jobs and handling them if they exist"""

TEST_PHASES = [
TestPhase.SETUP,
TestPhase.PROVISION,
Expand All @@ -202,11 +205,17 @@ def process_jobs(self):
self.retry_old_results()

self.check_restart()

event_emitter = EventEmitter(None, None, self.client)
job_data = self.client.check_jobs()
while job_data:
try:
job = TestflingerJob(job_data, self.client)
event_emitter = EventEmitter(
job_data.get("job_queue"),
job_data.get("job_status_webhook"),
self.client,
)

logger.info("Starting job %s", job.job_id)
rundir = os.path.join(
self.client.config.get("execution_basedir"), job.job_id
Expand Down Expand Up @@ -243,25 +252,36 @@ def process_jobs(self):
== JobState.CANCELLED
):
logger.info("Job cancellation was requested, exiting.")
event_emitter.emit_event(TestEvent.CANCELLED)
break

self.client.post_job_state(job.job_id, phase)
self.set_agent_state(phase)
exitcode = job.run_test_phase(phase, rundir)

event_emitter.emit_event(TestEvent("started_" + phase))
exitcode, exit_event, exit_reason = job.run_test_phase(
phase, rundir
)
self.client.post_influx(phase, exitcode)

# exit code 46 is our indication that recovery failed!
# In this case, we need to mark the device offline
if exitcode == 46:
self.mark_device_offline()
event_emitter.emit_event(TestEvent.RECOVERY_FAILED)

if phase != "test" and exitcode:
logger.debug("Phase %s failed, aborting job" % phase)
event_emitter.emit_event(TestEvent.FAILED)
break

event_emitter.emit_event(TestEvent("ended_" + phase))
except Exception as e:
logger.exception(e)
finally:
# Always run the cleanup, even if the job was cancelled
event_emitter.emit_event("started_cleanup")
job.run_test_phase(TestPhase.CLEANUP, rundir)
event_emitter.emit_event("ended_cleanup")
# clear job id
self.client.post_agent_data({"job_id": ""})

Expand Down
36 changes: 36 additions & 0 deletions agent/testflinger_agent/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -365,3 +365,39 @@ def post_influx(self, phase, result=None):
)
except InfluxDBClientError as exc:
logger.error(exc)

def post_status_update(self, job_queue, webhook, events):
    """
    Post a status update about the running job to the server.

    Nothing is sent when the job has no webhook (``webhook is None``).

    :param job_queue:
        TestFlinger queue the currently running job belongs to
    :param webhook:
        String URL to post status update to
    :param events:
        List of accumulated test events
    :raises TFServerError:
        If the POST to the server raises a RequestException
    """
    if webhook is None:
        return

    status_update_request = {
        "agent_id": self.config.get("agent_id"),
        "job_queue": job_queue,
        "job_status_webhook": webhook,
        "events": events,
    }
    status_update_uri = urljoin(self.server, "/v1/agents/status")
    try:
        job_request = self.session.post(
            status_update_uri, json=status_update_request, timeout=1
        )
    except RequestException as exc:
        logger.error(exc)
        raise TFServerError("other exception") from exc
    # NOTE(review): presumably a requests.Response, which is falsy for
    # 4xx/5xx status codes - confirm against how self.session is created.
    if not job_request:
        # Only logged, not raised: an error status from the server does not
        # interrupt the caller.
        logger.error(
            "Unable to post status update to: %s (error: %s)",
            status_update_uri,
            job_request.status_code,
        )
16 changes: 8 additions & 8 deletions agent/testflinger_agent/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,24 +59,24 @@ def run_test_phase(self, phase, rundir):
node = self.client.config.get("agent_id")
if not cmd:
logger.info("No %s_command configured, skipping...", phase)
return 0
return 0, None, None
if phase == "provision" and not self.job_data.get("provision_data"):
logger.info("No provision_data defined in job data, skipping...")
return 0
return 0, None, None
if phase == "firmware_update" and not self.job_data.get(
"firmware_update_data"
):
logger.info(
"No firmware_update_data defined in job data, skipping..."
)
return 0
return 0, None, None
if phase == "test" and not self.job_data.get("test_data"):
logger.info("No test_data defined in job data, skipping...")
return 0
return 0, None, None
if phase == "allocate" and not self.job_data.get("allocate_data"):
return 0
return 0, None, None
if phase == "reserve" and not self.job_data.get("reserve_data"):
return 0
return 0, None, None
results_file = os.path.join(rundir, "testflinger-outcome.json")
output_log = os.path.join(rundir, phase + ".log")
serial_log = os.path.join(rundir, phase + "-serial.log")
Expand Down Expand Up @@ -117,7 +117,7 @@ def run_test_phase(self, phase, rundir):
):
runner.run(f"echo '{line}'")
try:
exitcode, exit_reason = runner.run(cmd)
exitcode, exit_event, exit_reason = runner.run(cmd)
except Exception as e:
logger.exception(e)
finally:
Expand All @@ -126,7 +126,7 @@ def run_test_phase(self, phase, rundir):
)
if phase == "allocate":
self.allocate_phase(rundir)
return exitcode
return exitcode, exit_event, exit_reason

def _update_phase_results(
self, results_file, phase, exitcode, output_log, serial_log
Expand Down
14 changes: 9 additions & 5 deletions agent/testflinger_agent/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
from enum import Enum
from typing import Callable, List, Optional, Tuple

from testflinger_common.enums import TestEvent

logger = logging.getLogger(__name__)

OutputHandlerType = Callable[[str], None]
Expand Down Expand Up @@ -78,7 +80,7 @@ def post_output(self, data: str):
def register_stop_condition_checker(self, checker: StopConditionType):
self.stop_condition_checkers.append(checker)

def check_stop_conditions(self) -> str:
def check_stop_conditions(self) -> Optional[Tuple[TestEvent, str]]:
"""
Check stop conditions and return the reason if any are met. Otherwise,
return an empty string if none are met
Expand All @@ -87,7 +89,7 @@ def check_stop_conditions(self) -> str:
output = checker()
if output:
return output
return ""
return None

def check_and_post_output(self):
raw_output = self.process.stdout.read()
Expand Down Expand Up @@ -117,9 +119,10 @@ def cleanup(self):
if self.process is not None:
self.process.kill()

def run(self, cmd: str) -> Tuple[int, str]:
def run(self, cmd: str) -> Tuple[int, Optional[TestEvent], str]:
# Ensure that the process is None before starting
self.process = None
stop_event = None
stop_reason = ""

signal.signal(signal.SIGTERM, lambda signum, frame: self.cleanup())
Expand All @@ -136,7 +139,8 @@ def run(self, cmd: str) -> Tuple[int, str]:
while self.process.poll() is None:
time.sleep(10)

if stop_reason := self.check_stop_conditions():
stop_event, stop_reason = self.check_stop_conditions()
if stop_event is not None:
self.post_output(f"\n{stop_reason}\n")
self.cleanup()
break
Expand All @@ -150,7 +154,7 @@ def run(self, cmd: str) -> Tuple[int, str]:
if stop_reason == "":
stop_reason = get_stop_reason(self.process.returncode, "")

return self.process.returncode, stop_reason
return self.process.returncode, stop_event, stop_reason


def get_stop_reason(returncode: int, stop_reason: str) -> str:
Expand Down
33 changes: 21 additions & 12 deletions agent/testflinger_agent/stop_condition_checkers.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>

import time
from typing import Optional

from typing import Optional, Tuple
from testflinger_common.enums import JobState, TestEvent
from .client import TestflingerClient


Expand All @@ -23,32 +23,41 @@ def __init__(self, client: TestflingerClient, job_id: str):
self.client = client
self.job_id = job_id

def __call__(self) -> Optional[str]:
if self.client.check_job_state(self.job_id) == "cancelled":
return "Job cancellation was requested, exiting."
return None
def __call__(self) -> Tuple[Optional[TestEvent], str]:
if self.client.check_job_state(self.job_id) == JobState.CANCELLED:
return (
TestEvent.CANCELLED,
"Job cancellation was requested, exiting.",
)
return None, ""


class GlobalTimeoutChecker:
def __init__(self, timeout: int):
self.timeout = timeout
self.start_time = time.time()

def __call__(self) -> Optional[str]:
def __call__(self) -> Tuple[Optional[TestEvent], str]:
if time.time() - self.start_time > self.timeout:
return f"ERROR: Global timeout reached! ({self.timeout}s)"
return None
return (
TestEvent.GLOBAL_TIMEOUT,
f"ERROR: Global timeout reached! ({self.timeout}s)",
)
return None, ""


class OutputTimeoutChecker:
def __init__(self, timeout: int):
self.timeout = timeout
self.last_output_time = time.time()

def __call__(self) -> Optional[str]:
def __call__(self) -> Tuple[Optional[TestEvent], str]:
if time.time() - self.last_output_time > self.timeout:
return f"ERROR: Output timeout reached! ({self.timeout}s)"
return None
return (
TestEvent.OUTPUT_TIMEOUT,
f"ERROR: Output timeout reached! ({self.timeout}s)",
)
return None, ""

def update(self):
"""Update the last output time to the current time."""
Expand Down
63 changes: 63 additions & 0 deletions agent/testflinger_agent/tests/test_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from testflinger_agent.errors import TFServerError
from testflinger_agent.client import TestflingerClient as _TestflingerClient
from testflinger_agent.agent import TestflingerAgent as _TestflingerAgent
from testflinger_common.enums import TestPhase


class TestClient:
Expand Down Expand Up @@ -399,3 +400,65 @@ def test_post_agent_data(self, agent):
"provision_type": self.config["provision_type"],
}
)

def test_post_agent_status_update(self, agent, requests_mock):
    """
    A job that carries a webhook results in two status posts per test
    phase, and the last post ends with the "ended_cleanup" event
    """
    self.config["test_command"] = "echo test1"
    status_url = "http://127.0.0.1:8000/v1/agents/status"
    requests_mock.post(status_url, status_code=200)

    job_payload = {
        "job_id": str(uuid.uuid1()),
        "job_queue": "test",
        "test_data": {"test_cmds": "foo"},
        "job_status_webhook": "https://mywebhook",
    }
    # First poll hands out the job, the second poll reports an empty queue
    poll_responses = [{"text": json.dumps(job_payload)}, {"text": "{}"}]
    requests_mock.get(
        "http://127.0.0.1:8000/v1/job?queue=test", poll_responses
    )

    with patch("shutil.rmtree"):
        agent.process_jobs()

    status_posts = [
        request
        for request in requests_mock.request_history
        if request.url == status_url
    ]
    final_payload = status_posts[-1].json()
    expected_count = 2 * len(TestPhase)
    assert len(status_posts) == expected_count
    assert final_payload["events"][-1]["event_name"] == "ended_cleanup"
    assert len(final_payload["events"]) == expected_count

def test_post_agent_status_update_cancelled(self, agent, requests_mock):
    """
    When the server reports the job state as cancelled, the third-to-last
    event in the final status post is "cancelled"
    """
    self.config["test_command"] = "echo test1"
    status_url = "http://127.0.0.1:8000/v1/agents/status"
    job_id = str(uuid.uuid1())
    job_payload = {
        "job_id": job_id,
        "job_queue": "test",
        "test_data": {"test_cmds": "foo"},
        "job_status_webhook": "https://mywebhook",
    }
    # First poll hands out the job, the second poll reports an empty queue
    poll_responses = [{"text": json.dumps(job_payload)}, {"text": "{}"}]
    requests_mock.get(
        "http://127.0.0.1:8000/v1/job?queue=test", poll_responses
    )
    requests_mock.post(status_url, status_code=200)
    # Every job state lookup answers "cancelled"
    requests_mock.get(
        "http://127.0.0.1:8000/v1/result/" + job_id,
        json={"job_state": "cancelled"},
    )

    with patch("shutil.rmtree"):
        agent.process_jobs()

    status_posts = [
        request
        for request in requests_mock.request_history
        if request.url == status_url
    ]
    final_events = status_posts[-1].json()["events"]
    assert final_events[-3]["event_name"] == "cancelled"
30 changes: 30 additions & 0 deletions agent/testflinger_agent/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,33 @@ def test_transmit_job_outcome_missing_json(self, client, tmp_path, caplog):
"""
client.transmit_job_outcome(tmp_path)
assert "Unable to read job ID" in caplog.text

def test_post_status_update(self, client, requests_mock):
    """
    Test that a status update carrying the agent id, queue, webhook and
    events is posted to the status endpoint when a webhook is given
    """
    requests_mock.post(
        "http://127.0.0.1:8000/v1/agents/status", status_code=200
    )
    webhook = "http://foo"
    events = [
        {
            "event_name": event_name,
            "timestamp": "2014-12-22T03:12:58.019077+00:00",
            "detail_msg": "",
        }
        for event_name in ("started_provision", "ended_provision")
    ]

    client.post_status_update("myjobqueue", webhook, events)

    posted_json = requests_mock.last_request.json()
    assert posted_json == {
        "agent_id": client.config.get("agent_id"),
        "job_queue": "myjobqueue",
        "job_status_webhook": webhook,
        "events": events,
    }
Loading

0 comments on commit 343ea2a

Please sign in to comment.