Skip to content

Commit

Permalink
feat: stream trace events (#431)
Browse files Browse the repository at this point in the history
* feat: stream trace events

* fix: runner events
  • Loading branch information
varshith15 authored Feb 25, 2025
1 parent c2067ef commit 657db85
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 13 deletions.
5 changes: 5 additions & 0 deletions runner/app/live/api/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ async def parse_request_data(request: web.Request, temp_dir: str) -> Dict:

async def handle_start_stream(request: web.Request):
try:
stream_request_timestamp = int(time.time() * 1000)
process = cast(ProcessGuardian, request.app["process"])
prev_streamer = cast(PipelineStreamer, request.app["streamer"])
if prev_streamer and prev_streamer.is_running():
Expand Down Expand Up @@ -142,6 +143,10 @@ async def handle_start_stream(request: web.Request):

await streamer.start(params.params)
request.app["streamer"] = streamer
await protocol.emit_monitoring_event({
"type": "runner_receive_stream_request",
"timestamp": stream_request_timestamp,
}, queue_event_type="stream_trace")

return web.Response(text="Stream started successfully")
except Exception as e:
Expand Down
2 changes: 1 addition & 1 deletion runner/app/live/streamer/protocol/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ async def egress_loop(self, output_frames: AsyncGenerator[Image.Image, None]):
pass

@abstractmethod
async def emit_monitoring_event(self, event: dict):
async def emit_monitoring_event(self, event: dict, queue_event_type: str = "ai_stream_events"):
    """Sends a monitoring event to the event stream if available.

    Args:
        event: JSON-serializable payload describing the event.
        queue_event_type: Routing type for the event queue. Defaults to
            "ai_stream_events"; callers in this change also pass
            "stream_trace" for stream-trace events.

    Implementations that have no event stream may make this a no-op.
    """
    pass

Expand Down
8 changes: 4 additions & 4 deletions runner/app/live/streamer/protocol/trickle.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ def __init__(self, subscribe_url: str, publish_url: str, control_url: Optional[s
async def start(self):
metadata_queue = queue.Queue[dict]() # to pass video metadata from decoder to encoder
self.subscribe_task = asyncio.create_task(
media.run_subscribe(self.subscribe_url, self.subscribe_queue.put, metadata_queue.put)
media.run_subscribe(self.subscribe_url, self.subscribe_queue.put, metadata_queue.put, self.emit_monitoring_event)
)
self.publish_task = asyncio.create_task(
media.run_publish(self.publish_url, self.publish_queue.get, metadata_queue.get)
media.run_publish(self.publish_url, self.publish_queue.get, metadata_queue.get, self.emit_monitoring_event)
)
if self.control_url and self.control_url.strip() != "":
self.control_subscriber = TrickleSubscriber(self.control_url)
Expand Down Expand Up @@ -88,11 +88,11 @@ def enqueue_bytes(frame: OutputFrame):
async for frame in output_frames:
await asyncio.to_thread(enqueue_bytes, frame)

async def emit_monitoring_event(self, event: dict):
async def emit_monitoring_event(self, event: dict, queue_event_type: str = "ai_stream_events"):
if not self.events_publisher:
return
try:
event_json = json.dumps(event)
event_json = json.dumps({"event": event, "queue_event_type": queue_event_type})
async with await self.events_publisher.next() as event:
await event.write(event_json.encode())
except Exception as e:
Expand Down
2 changes: 1 addition & 1 deletion runner/app/live/streamer/protocol/zeromq.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ async def egress_loop(self, output_frames: AsyncGenerator[Image.Image, None]):
frame_bytes = to_jpeg_bytes(frame)
await self.output_socket.send(frame_bytes)

async def emit_monitoring_event(self, event: dict):
async def emit_monitoring_event(self, event: dict, queue_event_type: str = "ai_stream_events"):
    """Intentionally does nothing: the ZeroMQ protocol has no monitoring
    event stream to publish to, so events are silently dropped."""
    pass # No-op for ZeroMQ

async def control_loop(self, done: asyncio.Event) -> AsyncGenerator[dict, None]:
Expand Down
4 changes: 2 additions & 2 deletions runner/app/live/streamer/streamer.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,13 +138,13 @@ async def report_status_loop(self):
)
self.stop_event.set()

async def _emit_monitoring_event(self, event: dict):
async def _emit_monitoring_event(self, event: dict, queue_event_type: str = "ai_stream_events"):
    """Protected method to emit monitoring event with lock.

    Stamps the event dict with the current wall-clock time in milliseconds,
    then forwards it to the protocol under ``emit_event_lock`` so concurrent
    emitters cannot interleave. Failures are logged and swallowed so that
    monitoring never breaks the stream itself.
    """
    # Timestamp is added in place; the caller's dict is mutated.
    now_ms = timestamp_to_ms(time.time())
    event["timestamp"] = now_ms
    logging.info(f"Emitting monitoring event: {event}")
    async with self.emit_event_lock:
        try:
            await self.protocol.emit_monitoring_event(event, queue_event_type)
        except Exception as e:
            # Best-effort delivery: do not propagate emit failures to callers.
            logging.error(f"Failed to emit monitoring event: {e}")

Expand Down
26 changes: 22 additions & 4 deletions runner/app/live/trickle/media.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import time
import aiohttp
import asyncio
import logging
Expand All @@ -10,12 +11,12 @@
from .decoder import decode_av
from .encoder import encode_av

async def run_subscribe(subscribe_url: str, image_callback, put_metadata):
async def run_subscribe(subscribe_url: str, image_callback, put_metadata, monitoring_callback):
# TODO add some pre-processing parameters, eg image size
try:
read_fd, write_fd = os.pipe()
parse_task = asyncio.create_task(decode_in(read_fd, image_callback, put_metadata))
subscribe_task = asyncio.create_task(subscribe(subscribe_url, await AsyncifyFdWriter(write_fd)))
subscribe_task = asyncio.create_task(subscribe(subscribe_url, await AsyncifyFdWriter(write_fd), monitoring_callback))
await asyncio.gather(subscribe_task, parse_task)
logging.info("run_subscribe complete")
except Exception as e:
Expand All @@ -24,7 +25,9 @@ async def run_subscribe(subscribe_url: str, image_callback, put_metadata):
finally:
put_metadata(None) # in case decoder quit without writing anything

async def subscribe(subscribe_url, out_pipe):
async def subscribe(subscribe_url, out_pipe, monitoring_callback):
first_segment = True

async with TrickleSubscriber(url=subscribe_url) as subscriber:
logging.info(f"launching subscribe loop for {subscribe_url}")
while True:
Expand All @@ -33,6 +36,12 @@ async def subscribe(subscribe_url, out_pipe):
segment = await subscriber.next()
if not segment:
break # complete
if first_segment:
first_segment = False
await monitoring_callback({
"type": "runner_receive_first_ingest_segment",
"timestamp": int(time.time() * 1000)
}, queue_event_type="stream_trace")
while True:
chunk = await segment.read()
if not chunk:
Expand Down Expand Up @@ -72,13 +81,16 @@ def decode_runner():
loop = asyncio.get_running_loop()
await loop.run_in_executor(None, decode_runner)

async def run_publish(publish_url: str, image_generator, get_metadata):
async def run_publish(publish_url: str, image_generator, get_metadata, monitoring_callback):
first_segment = True

publisher = None
try:
publisher = TricklePublisher(url=publish_url, mime_type="video/mp2t")

loop = asyncio.get_running_loop()
async def callback(pipe_file, pipe_name):
nonlocal first_segment
# trickle publish a segment with the contents of `pipe_file`
async with await publisher.next() as segment:
# convert pipe_fd into an asyncio friendly StreamReader
Expand All @@ -91,6 +103,12 @@ async def callback(pipe_file, pipe_name):
if not data:
break
await segment.write(data)
if first_segment:
first_segment = False
await monitoring_callback({
"type": "runner_send_first_processed_segment",
"timestamp": int(time.time() * 1000)
}, queue_event_type="stream_trace")
transport.close()

def sync_callback(pipe_file, pipe_name):
Expand Down
3 changes: 2 additions & 1 deletion runner/docker/Dockerfile.live-app-noop
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ RUN pip install --no-cache-dir \
opencv-python==4.10.0.84 \
--no-binary=av \
av==14.0.1 \
psutil==6.0.0
psutil==6.0.0 \
prometheus_client>=0.21.1

# Set environment variables
ENV MAX_WORKERS=1
Expand Down

0 comments on commit 657db85

Please sign in to comment.