feat: stream trace events
varshith15 committed Feb 22, 2025
1 parent c2067ef commit fe881c3
Showing 4 changed files with 22 additions and 7 deletions.
5 changes: 5 additions & 0 deletions runner/app/live/api/api.py
@@ -106,6 +106,7 @@ async def parse_request_data(request: web.Request, temp_dir: str) -> Dict:

async def handle_start_stream(request: web.Request):
try:
stream_request_timestamp = int(time.time() * 1000)
process = cast(ProcessGuardian, request.app["process"])
prev_streamer = cast(PipelineStreamer, request.app["streamer"])
if prev_streamer and prev_streamer.is_running():
@@ -142,6 +143,10 @@ async def handle_start_stream(request: web.Request):

await streamer.start(params.params)
request.app["streamer"] = streamer
await protocol.emit_monitoring_event({
"type": "runner_receive_stream_request",
"timestamp": stream_request_timestamp,
})

return web.Response(text="Stream started successfully")
except Exception as e:
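
The api.py change records the request's arrival time in epoch milliseconds before any setup work, and emits the runner_receive_stream_request trace event only after the streamer has started, using that earlier timestamp. A minimal standalone sketch of this ordering, with a hypothetical emit_monitoring_event that simply logs the event (the protocol's real event sink is not part of this diff):

import asyncio
import json
import logging
import time

async def emit_monitoring_event(event: dict):
    # Hypothetical stand-in for the protocol's event sink: log the event as JSON.
    logging.info("monitoring event: %s", json.dumps(event))

async def handle_start_stream_sketch(start_streamer):
    # Capture arrival time first, so the event reflects when the request came in.
    stream_request_timestamp = int(time.time() * 1000)
    await start_streamer()  # stand-in for streamer.start(params.params)
    # Emit the trace event only after the streamer is up, with the earlier timestamp.
    await emit_monitoring_event({
        "type": "runner_receive_stream_request",
        "timestamp": stream_request_timestamp,
    })

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    asyncio.run(handle_start_stream_sketch(lambda: asyncio.sleep(0)))
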
4 changes: 2 additions & 2 deletions runner/app/live/streamer/protocol/trickle.py
@@ -26,10 +26,10 @@ def __init__(self, subscribe_url: str, publish_url: str, control_url: Optional[s
async def start(self):
metadata_queue = queue.Queue[dict]() # to pass video metadata from decoder to encoder
self.subscribe_task = asyncio.create_task(
media.run_subscribe(self.subscribe_url, self.subscribe_queue.put, metadata_queue.put)
media.run_subscribe(self.subscribe_url, self.subscribe_queue.put, metadata_queue.put, self.emit_monitoring_event)
)
self.publish_task = asyncio.create_task(
media.run_publish(self.publish_url, self.publish_queue.get, metadata_queue.get)
media.run_publish(self.publish_url, self.publish_queue.get, metadata_queue.get, self.emit_monitoring_event)
)
if self.control_url and self.control_url.strip() != "":
self.control_subscriber = TrickleSubscriber(self.control_url)
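
In trickle.py the protocol threads its own emit_monitoring_event bound method into the media tasks, so the trickle layer can report trace events without depending on the protocol class. A minimal sketch of that callback injection, with simplified signatures (the real run_subscribe and run_publish also take the URLs and frame queues shown above):

import asyncio
from typing import Awaitable, Callable

# Shape of the injected callback: an async function taking one event dict.
MonitoringCallback = Callable[[dict], Awaitable[None]]

async def run_subscribe_sketch(monitoring_callback: MonitoringCallback):
    # The media layer only depends on the callback's shape, not on the protocol.
    await monitoring_callback({"type": "example_event", "timestamp": 0})

class ProtocolSketch:
    async def emit_monitoring_event(self, event: dict):
        print("event:", event)  # stand-in for publishing to the events channel

    async def start(self):
        # The bound method is handed down as a plain async callable.
        self.subscribe_task = asyncio.create_task(run_subscribe_sketch(self.emit_monitoring_event))
        await self.subscribe_task

if __name__ == "__main__":
    asyncio.run(ProtocolSketch().start())
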
17 changes: 13 additions & 4 deletions runner/app/live/trickle/media.py
@@ -1,3 +1,4 @@
import time
import aiohttp
import asyncio
import logging
@@ -10,12 +11,12 @@
from .decoder import decode_av
from .encoder import encode_av

async def run_subscribe(subscribe_url: str, image_callback, put_metadata):
async def run_subscribe(subscribe_url: str, image_callback, put_metadata, monitoring_callback):
# TODO add some pre-processing parameters, eg image size
try:
read_fd, write_fd = os.pipe()
parse_task = asyncio.create_task(decode_in(read_fd, image_callback, put_metadata))
subscribe_task = asyncio.create_task(subscribe(subscribe_url, await AsyncifyFdWriter(write_fd)))
subscribe_task = asyncio.create_task(subscribe(subscribe_url, await AsyncifyFdWriter(write_fd), monitoring_callback))
await asyncio.gather(subscribe_task, parse_task)
logging.info("run_subscribe complete")
except Exception as e:
@@ -24,15 +25,19 @@ async def run_subscribe(subscribe_url: str, image_callback, put_metadata):
finally:
put_metadata(None) # in case decoder quit without writing anything

async def subscribe(subscribe_url, out_pipe):
async def subscribe(subscribe_url, out_pipe, monitoring_callback):
async with TrickleSubscriber(url=subscribe_url) as subscriber:
logging.info(f"launching subscribe loop for {subscribe_url}")
first_segment = False
while True:
segment = None
try:
segment = await subscriber.next()
if not segment:
break # complete
if not first_segment:
await monitoring_callback({"type": "runner_receive_first_ingest_segment", "timestamp": int(time.time() * 1000)})
first_segment = True
while True:
chunk = await segment.read()
if not chunk:
@@ -72,14 +77,15 @@ def decode_runner():
loop = asyncio.get_running_loop()
await loop.run_in_executor(None, decode_runner)

async def run_publish(publish_url: str, image_generator, get_metadata):
async def run_publish(publish_url: str, image_generator, get_metadata, monitoring_callback):
publisher = None
try:
publisher = TricklePublisher(url=publish_url, mime_type="video/mp2t")

loop = asyncio.get_running_loop()
async def callback(pipe_file, pipe_name):
# trickle publish a segment with the contents of `pipe_file`
first_segment = False
async with await publisher.next() as segment:
# convert pipe_fd into an asyncio friendly StreamReader
reader = asyncio.StreamReader()
@@ -91,6 +97,9 @@ async def callback(pipe_file, pipe_name):
if not data:
break
await segment.write(data)
if not first_segment:
await monitoring_callback({"type": "runner_send_first_processed_segment", "timestamp": int(time.time() * 1000)})
first_segment = True
transport.close()

def sync_callback(pipe_file, pipe_name):
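
The ingest and publish paths in media.py add the same guard: a first_segment flag that fires the monitoring callback the first time data flows and then latches, producing runner_receive_first_ingest_segment and runner_send_first_processed_segment events with epoch-millisecond timestamps. A minimal sketch of this latch over a generic async segment source, independent of the trickle specifics:

import asyncio
import time
from typing import AsyncIterator, Awaitable, Callable

async def trace_first_segment(
    segments: AsyncIterator[bytes],
    monitoring_callback: Callable[[dict], Awaitable[None]],
    event_type: str,
):
    first_segment = False
    async for segment in segments:
        if not first_segment:
            # Fire the trace event once, on the first segment only.
            await monitoring_callback({"type": event_type, "timestamp": int(time.time() * 1000)})
            first_segment = True
        # ... normal segment processing continues here

async def _demo():
    async def fake_segments():
        yield b"segment-0"
        yield b"segment-1"

    async def log_event(event: dict):
        print(event)

    await trace_first_segment(fake_segments(), log_event, "runner_receive_first_ingest_segment")

if __name__ == "__main__":
    asyncio.run(_demo())
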
3 changes: 2 additions & 1 deletion runner/docker/Dockerfile.live-app-noop
@@ -35,7 +35,8 @@ RUN pip install --no-cache-dir \
opencv-python==4.10.0.84 \
--no-binary=av \
av==14.0.1 \
psutil==6.0.0
psutil==6.0.0 \
prometheus_client>=0.21.1

# Set environment variables
ENV MAX_WORKERS=1
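
The noop Dockerfile change only adds prometheus_client>=0.21.1 as a dependency; how the metrics are wired up is outside this diff. For reference, a minimal sketch of typical prometheus_client usage, with a hypothetical metric name:

import time

from prometheus_client import Counter, start_http_server

# Hypothetical metric; actual metric names and labels live elsewhere in the app.
STREAM_REQUESTS = Counter("stream_requests_total", "Number of stream start requests received")

if __name__ == "__main__":
    start_http_server(8000)  # expose /metrics on port 8000
    STREAM_REQUESTS.inc()
    time.sleep(60)  # keep the process alive briefly so the endpoint can be scraped
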
