diff --git a/runner/app/live/api/api.py b/runner/app/live/api/api.py
index f3f4614e..e53871c6 100644
--- a/runner/app/live/api/api.py
+++ b/runner/app/live/api/api.py
@@ -106,6 +106,7 @@ async def parse_request_data(request: web.Request, temp_dir: str) -> Dict:
 
 async def handle_start_stream(request: web.Request):
     try:
+        stream_request_timestamp = int(time.time() * 1000)
         process = cast(ProcessGuardian, request.app["process"])
         prev_streamer = cast(PipelineStreamer, request.app["streamer"])
         if prev_streamer and prev_streamer.is_running():
@@ -142,6 +143,10 @@ async def handle_start_stream(request: web.Request):
         await streamer.start(params.params)
         request.app["streamer"] = streamer
 
+        await protocol.emit_monitoring_event({
+            "type": "runner_receive_stream_request",
+            "timestamp": stream_request_timestamp,
+        }, queue_event_type="stream_trace")
         return web.Response(text="Stream started successfully")
     except Exception as e:
 
diff --git a/runner/app/live/streamer/protocol/protocol.py b/runner/app/live/streamer/protocol/protocol.py
index 18ba2716..6df8c6c5 100644
--- a/runner/app/live/streamer/protocol/protocol.py
+++ b/runner/app/live/streamer/protocol/protocol.py
@@ -27,7 +27,7 @@ async def egress_loop(self, output_frames: AsyncGenerator[Image.Image, None]):
         pass
 
     @abstractmethod
-    async def emit_monitoring_event(self, event: dict):
+    async def emit_monitoring_event(self, event: dict, queue_event_type: str = "ai_stream_events"):
         """Sends a monitoring event to the event stream if available"""
         pass
 
diff --git a/runner/app/live/streamer/protocol/trickle.py b/runner/app/live/streamer/protocol/trickle.py
index 9db3a50e..46a9c496 100644
--- a/runner/app/live/streamer/protocol/trickle.py
+++ b/runner/app/live/streamer/protocol/trickle.py
@@ -26,10 +26,10 @@ def __init__(self, subscribe_url: str, publish_url: str, control_url: Optional[s
     async def start(self):
         metadata_queue = queue.Queue[dict]() # to pass video metadata from decoder to encoder
         self.subscribe_task = asyncio.create_task(
-            media.run_subscribe(self.subscribe_url, self.subscribe_queue.put, metadata_queue.put)
+            media.run_subscribe(self.subscribe_url, self.subscribe_queue.put, metadata_queue.put, self.emit_monitoring_event)
         )
         self.publish_task = asyncio.create_task(
-            media.run_publish(self.publish_url, self.publish_queue.get, metadata_queue.get)
+            media.run_publish(self.publish_url, self.publish_queue.get, metadata_queue.get, self.emit_monitoring_event)
         )
         if self.control_url and self.control_url.strip() != "":
             self.control_subscriber = TrickleSubscriber(self.control_url)
@@ -88,11 +88,11 @@ def enqueue_bytes(frame: OutputFrame):
         async for frame in output_frames:
             await asyncio.to_thread(enqueue_bytes, frame)
 
-    async def emit_monitoring_event(self, event: dict):
+    async def emit_monitoring_event(self, event: dict, queue_event_type: str = "ai_stream_events"):
         if not self.events_publisher:
             return
         try:
-            event_json = json.dumps(event)
+            event_json = json.dumps({"event": event, "queue_event_type": queue_event_type})
             async with await self.events_publisher.next() as event:
                 await event.write(event_json.encode())
         except Exception as e:
diff --git a/runner/app/live/streamer/protocol/zeromq.py b/runner/app/live/streamer/protocol/zeromq.py
index 316ac4a2..8964e0c7 100644
--- a/runner/app/live/streamer/protocol/zeromq.py
+++ b/runner/app/live/streamer/protocol/zeromq.py
@@ -37,7 +37,7 @@ async def egress_loop(self, output_frames: AsyncGenerator[Image.Image, None]):
             frame_bytes = to_jpeg_bytes(frame)
             await self.output_socket.send(frame_bytes)
 
-    async def emit_monitoring_event(self, event: dict):
+    async def emit_monitoring_event(self, event: dict, queue_event_type: str = "ai_stream_events"):
         pass # No-op for ZeroMQ
 
     async def control_loop(self, done: asyncio.Event) -> AsyncGenerator[dict, None]:
diff --git a/runner/app/live/streamer/streamer.py b/runner/app/live/streamer/streamer.py
index a9032497..f7aaea20 100644
--- a/runner/app/live/streamer/streamer.py
+++ b/runner/app/live/streamer/streamer.py
@@ -138,13 +138,13 @@ async def report_status_loop(self):
             )
             self.stop_event.set()
 
-    async def _emit_monitoring_event(self, event: dict):
+    async def _emit_monitoring_event(self, event: dict, queue_event_type: str = "ai_stream_events"):
         """Protected method to emit monitoring event with lock"""
         event["timestamp"] = timestamp_to_ms(time.time())
         logging.info(f"Emitting monitoring event: {event}")
         async with self.emit_event_lock:
             try:
-                await self.protocol.emit_monitoring_event(event)
+                await self.protocol.emit_monitoring_event(event, queue_event_type)
             except Exception as e:
                 logging.error(f"Failed to emit monitoring event: {e}")
 
diff --git a/runner/app/live/trickle/media.py b/runner/app/live/trickle/media.py
index 7ef3825f..b9a5c42f 100644
--- a/runner/app/live/trickle/media.py
+++ b/runner/app/live/trickle/media.py
@@ -1,3 +1,4 @@
+import time
 import aiohttp
 import asyncio
 import logging
@@ -10,12 +11,12 @@
 from .decoder import decode_av
 from .encoder import encode_av
 
-async def run_subscribe(subscribe_url: str, image_callback, put_metadata):
+async def run_subscribe(subscribe_url: str, image_callback, put_metadata, monitoring_callback):
     # TODO add some pre-processing parameters, eg image size
     try:
         read_fd, write_fd = os.pipe()
         parse_task = asyncio.create_task(decode_in(read_fd, image_callback, put_metadata))
-        subscribe_task = asyncio.create_task(subscribe(subscribe_url, await AsyncifyFdWriter(write_fd)))
+        subscribe_task = asyncio.create_task(subscribe(subscribe_url, await AsyncifyFdWriter(write_fd), monitoring_callback))
         await asyncio.gather(subscribe_task, parse_task)
         logging.info("run_subscribe complete")
     except Exception as e:
@@ -24,7 +25,9 @@ async def run_subscribe(subscribe_url: str, image_callback, put_metadata):
     finally:
         put_metadata(None) # in case decoder quit without writing anything
 
-async def subscribe(subscribe_url, out_pipe):
+async def subscribe(subscribe_url, out_pipe, monitoring_callback):
+    first_segment = True
+
     async with TrickleSubscriber(url=subscribe_url) as subscriber:
         logging.info(f"launching subscribe loop for {subscribe_url}")
         while True:
@@ -33,6 +36,12 @@ async def subscribe(subscribe_url, out_pipe):
             segment = await subscriber.next()
             if not segment:
                 break # complete
+            if first_segment:
+                first_segment = False
+                await monitoring_callback({
+                    "type": "runner_receive_first_ingest_segment",
+                    "timestamp": int(time.time() * 1000)
+                }, queue_event_type="stream_trace")
             while True:
                 chunk = await segment.read()
                 if not chunk:
@@ -72,13 +81,16 @@ def decode_runner():
     loop = asyncio.get_running_loop()
     await loop.run_in_executor(None, decode_runner)
 
-async def run_publish(publish_url: str, image_generator, get_metadata):
+async def run_publish(publish_url: str, image_generator, get_metadata, monitoring_callback):
+    first_segment = True
+
     publisher = None
     try:
         publisher = TricklePublisher(url=publish_url, mime_type="video/mp2t")
         loop = asyncio.get_running_loop()
 
         async def callback(pipe_file, pipe_name):
+            nonlocal first_segment
             # trickle publish a segment with the contents of `pipe_file`
             async with await publisher.next() as segment:
                 # convert pipe_fd into an asyncio friendly StreamReader
@@ -91,6 +103,12 @@ async def callback(pipe_file, pipe_name):
                     if not data:
                         break
                     await segment.write(data)
+                if first_segment:
+                    first_segment = False
+                    await monitoring_callback({
+                        "type": "runner_send_first_processed_segment",
+                        "timestamp": int(time.time() * 1000)
+                    }, queue_event_type="stream_trace")
                 transport.close()
 
         def sync_callback(pipe_file, pipe_name):
diff --git a/runner/docker/Dockerfile.live-app-noop b/runner/docker/Dockerfile.live-app-noop
index 2b88bb49..e9ad77de 100644
--- a/runner/docker/Dockerfile.live-app-noop
+++ b/runner/docker/Dockerfile.live-app-noop
@@ -35,7 +35,8 @@ RUN pip install --no-cache-dir \
     opencv-python==4.10.0.84 \
    --no-binary=av \
     av==14.0.1 \
-    psutil==6.0.0
+    psutil==6.0.0 \
+    prometheus_client>=0.21.1
 
 # Set environment variables
 ENV MAX_WORKERS=1