diff --git a/.github/workflows/JOB_tests.yml b/.github/workflows/JOB_tests.yml index f2e5a8941..1ad5afa40 100644 --- a/.github/workflows/JOB_tests.yml +++ b/.github/workflows/JOB_tests.yml @@ -48,6 +48,23 @@ jobs: pip install wheel && \ pip install --upgrade setuptools && \ pip install --editable '.[test,ml,medical,dev, ocv]'" + + - name: Install ffmpeg (Ubuntu) + if: matrix.os == 'ubuntu-latest' + shell: bash + run: | + sudo apt-get update + sudo apt-get install -y ffmpeg + + - name: Install ffmpeg (macOS) + if: matrix.os == 'macos-latest' + shell: bash + run: brew install ffmpeg + + - name: Install ffmpeg (Windows) + if: matrix.os == 'windows-latest' + shell: pwsh + run: choco install ffmpeg -y - name: Run pytest shell: bash # Stops Windows hosts from using PowerShell diff --git a/.gitignore b/.gitignore index 65ee2ae61..fccfeb8d7 100644 --- a/.gitignore +++ b/.gitignore @@ -13,7 +13,6 @@ darwin_py.egg-info/PKG-INFO *.jpg *.bpm *.mov -*.mp4 *.txt # from https://raw.githubusercontent.com/github/gitignore/master/Python.gitignore diff --git a/Dockerfile b/Dockerfile index 269220f3b..24c6e5762 100644 --- a/Dockerfile +++ b/Dockerfile @@ -23,6 +23,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ libssl-dev \ python3-dev \ curl \ + ffmpeg \ && rm -rf /var/lib/apt/lists/* # Install Poetry in a known location and add to PATH diff --git a/README.md b/README.md index 49a1c55f4..c6834637b 100644 --- a/README.md +++ b/README.md @@ -15,15 +15,16 @@ Darwin-py can both be used from the [command line](#usage-as-a-command-line-inte Main functions are (but not limited to): -- Client authentication -- Listing local and remote datasets -- Create/remove datasets -- Upload/download data to/from remote datasets -- Direct integration with PyTorch dataloaders +- Client authentication +- Listing local and remote datasets +- Create/remove datasets +- Upload/download data to/from remote datasets +- Direct integration with PyTorch dataloaders +- Extracting video artifacts Support tested for python 3.9 - 3.12 -## 🏁 Installation +## 🏁 Installation ``` pip install darwin-py @@ -43,11 +44,14 @@ If you wish to use video frame extraction, then you can use the `ocv` flag to in pip install darwin-py[ocv] ``` +If you wish to use video artifacts extraction, then you need to install [FFmpeg](https://www.ffmpeg.org/download.html) + To run test, first install the `test` extra package ``` pip install darwin-py[test] ``` + ### Development See our development and QA environment installation recommendations [here](docs/DEV.md) @@ -132,8 +136,8 @@ For videos, the frame rate extraction rate can be specified by adding `--fps None: elif args.command == "convert": f.convert(args.format, args.files, args.output_dir) + elif args.command == "extract": + if args.extract_type == "video-artifacts": + f.extract_video_artifacts( + source_file=args.source_file, + output_dir=args.output_dir, + storage_key_prefix=args.storage_key_prefix, + fps=args.fps, + segment_length=args.segment_length, + repair=args.repair, + ) + else: + parser.print_help() elif args.command == "dataset": if args.action == "remote": f.list_remote_datasets(args.all, args.team) diff --git a/darwin/cli_functions.py b/darwin/cli_functions.py index bd8ce1f91..c1954c60c 100644 --- a/darwin/cli_functions.py +++ b/darwin/cli_functions.py @@ -60,16 +60,17 @@ ) from darwin.exporter import ExporterNotFoundError, export_annotations, get_exporter from darwin.exporter.formats import supported_formats as export_formats +from darwin.extractor import video from darwin.importer import ImporterNotFoundError, get_importer, import_annotations from darwin.importer.formats import supported_formats as import_formats from darwin.item import DatasetItem from darwin.utils import ( + BLOCKED_UPLOAD_ERROR_ALREADY_EXISTS, find_files, persist_client_configuration, prompt, secure_continue_request, validate_file_against_schema, - BLOCKED_UPLOAD_ERROR_ALREADY_EXISTS, ) @@ -1468,3 +1469,41 @@ def _console_theme() -> Theme: def _has_valid_status(status: str) -> bool: return status in ["new", "annotate", "review", "complete", "archived"] + + +def extract_video_artifacts( + source_file: str, + output_dir: str, + storage_key_prefix: str, + *, + fps: float = 0.0, + segment_length: int = 2, + repair: bool = False, +) -> None: + """ + Generate video artifacts (segments, sections, thumbnail, frames manifest). + + Parameters + ---------- + source_file : str + Path to input video file + output_dir : str + Output directory for artifacts + storage_key_prefix : str + Storage key prefix for generated files + fps : float, optional + Desired output FPS (0.0 for native), by default 0.0 + segment_length : int, optional + Length of each segment in seconds, by default 2 + repair : bool, optional + Whether to attempt to repair video if errors are detected, by default False + """ + + video.extract_artifacts( + source_file=source_file, + output_dir=output_dir, + storage_key_prefix=storage_key_prefix, + fps=fps, + segment_length=segment_length, + repair=repair, + ) diff --git a/darwin/extractor/__init__.py b/darwin/extractor/__init__.py new file mode 100644 index 000000000..0ec53aa52 --- /dev/null +++ b/darwin/extractor/__init__.py @@ -0,0 +1 @@ +"""Video extraction functionality for Darwin.""" diff --git a/darwin/extractor/video.py b/darwin/extractor/video.py new file mode 100644 index 000000000..439d5e56e --- /dev/null +++ b/darwin/extractor/video.py @@ -0,0 +1,710 @@ +import gzip +import json +import os +import re +import subprocess +from pathlib import Path +from typing import Dict, List, Optional, Tuple + +from rich.console import Console + +console = Console() + + +def _check_ffmpeg_version(): + """ + Check if FFmpeg version 5 or higher is installed. + Raises RuntimeError if FFmpeg is not found or version is lower. + """ + try: + result = subprocess.run(["ffmpeg", "-version"], capture_output=True, text=True) + version_line = result.stdout.split("\n")[0] + # Extract major version number (e.g., "ffmpeg version 5.1.2" -> "5") + version_match = re.search(r"ffmpeg version (\d+)", version_line) + if not version_match: + raise RuntimeError("Could not determine FFmpeg version") + + major_version = int(version_match.group(1)) + if major_version < 5: + raise RuntimeError( + f"FFmpeg version 5 or higher required, found version {major_version}" + ) + + except FileNotFoundError: + raise RuntimeError( + "FFmpeg not found. Please install FFmpeg version 5 or higher" + ) + + +def _create_directories(base_dir: str) -> Dict[str, str]: + """Create required directory structure for artifacts""" + + paths = { + "base_dir": base_dir, + "segments": os.path.join(base_dir, "segments"), + "sections": os.path.join(base_dir, "sections"), + } + + # Create high/low quality segment dirs + paths["segments_high"] = os.path.join(paths["segments"], "high") + paths["segments_low"] = os.path.join(paths["segments"], "low") + + for path in paths.values(): + os.makedirs(path, exist_ok=True) + + return paths + + +def _maybe_repair_video(source_file: str, output_dir: str) -> Tuple[bool, str]: + """ + Attempt to repair video if errors are detected. + + Args: + source_file: Path to source video file + output_dir: Directory to store repaired video if needed + + Returns: + Tuple[bool, str]: (was_repaired, final_source_file_path) + """ + console.print(f"Checking video for errors: {source_file}") + errors = _check_video_for_errors(source_file) + if errors: + errors_list = errors.split("\n") + first_three = "\n".join(errors_list[:3]) + console.print(f"Video contains errors:\n{first_three}\n...") + console.print("Attempting to repair video...") + repaired_file = _attempt_video_repair(source_file, output_dir) + console.print(f"Video repaired successfully: {repaired_file}") + return (True, repaired_file) + else: + console.print("No errors detected, proceeding with original video") + return (False, source_file) + + +def _check_video_for_errors(source_file: str) -> str: + """ + Check if video file has any errors using FFmpeg error detection. + + Args: + source_file: Path to source video file + + Returns: + str: Error message if errors were detected, empty string otherwise + """ + cmd = [ + "ffmpeg", + "-err_detect", + "explode", + "-xerror", + "-v", + "error", + "-i", + source_file, + "-f", + "null", + "-", + ] + + try: + result = subprocess.run(cmd, capture_output=True, text=True, check=True) + return result.stderr.strip() + except subprocess.CalledProcessError as e: + return e.stderr.strip() + + +def _attempt_video_repair(source_file: str, output_dir: str) -> str: + """ + Attempt to repair corrupted video by re-encoding it using hardware acceleration if available. + + Args: + source_file: Path to source video file + output_dir: Directory to store repaired video + + Returns: + str: Path to repaired video file + """ + output_file = os.path.join(output_dir, f"repaired_{os.path.basename(source_file)}") + + try: + # First try hardware accelerated encoding (H265) + cmd = [ + "ffmpeg", + "-y", + "-fflags", + "discardcorrupt+genpts", + "-vaapi_device", + "/dev/dri/renderD128", + "-i", + source_file, + "-vf", + "format=nv12,hwupload", + "-c:v", + "hevc_vaapi", + "-c:a", + "copy", + "-vsync", + "cfr", + output_file, + ] + subprocess.run(cmd, check=True, capture_output=True) + return output_file + except subprocess.CalledProcessError: + console.print( + "Hardware acceleration with H265 failed, falling back to software encoding with H264" + ) + + cmd = [ + "ffmpeg", + "-y", + "-fflags", + "discardcorrupt+genpts", + "-i", + source_file, + "-c:v", + "libx264", + "-c:a", + "copy", + "-vsync", + "cfr", + output_file, + ] + subprocess.run(cmd, check=True, capture_output=True) + return output_file + + +def _count_frames(source_file: str) -> int: + cmd = [ + "ffprobe", + "-v", + "error", + "-select_streams", + "v:0", + "-count_frames", + "-show_entries", + "stream=nb_read_frames", + "-of", + "json", + source_file, + ] + result = subprocess.run(cmd, capture_output=True, text=True) + return int(json.loads(result.stdout)["streams"][0]["nb_read_frames"]) + + +def _get_video_metadata(source_file: str) -> Dict: + """Extract video metadata using ffprobe""" + cmd = [ + "ffprobe", + "-v", + "error", + "-select_streams", + "v:0", + "-show_entries", + "stream=width,height,avg_frame_rate,duration", + "-of", + "json", + source_file, + ] + + result = subprocess.run(cmd, capture_output=True, text=True) + data = json.loads(result.stdout)["streams"][0] + + # Try to calculate native fps from avg_frame_rate first + try: + num, den = map(int, data["avg_frame_rate"].split("/")) + if den > 0: + native_fps = num / den + else: + native_fps = 0.0 + except (ValueError, ZeroDivisionError): + native_fps = 0.0 + + # If avg_frame_rate calculation failed, try calculating from frame count and duration + if native_fps == 0.0: + duration = float(data["duration"]) + total_frames = _count_frames(source_file) + native_fps = round(total_frames / duration, 2) + + return { + "width": int(data["width"]), + "height": int(data["height"]), + "native_fps": native_fps, + } + + +def _calculate_avg_bitrate(index_data: str, segments: List[str]) -> Optional[float]: + """ + Calculate average bitrate from HLS segments. + Returns None if no valid segments found. + """ + bitrates = [] + + # Parse durations from index file + durations = [] + for line in index_data.splitlines(): + if line.startswith("#EXTINF:"): + try: + duration = float(line.split(":")[1].split(",")[0]) + durations.append(duration) + except (IndexError, ValueError): + continue + + # Calculate bitrates for each segment + for duration, segment in zip(durations, segments): + try: + if duration > 0: + size = os.path.getsize(segment) + # Convert bytes to bits and divide by duration to get bits per second + bitrate = (size * 8) / duration + bitrates.append(bitrate) + except (OSError, ZeroDivisionError): + continue + + # Calculate average bitrate if we have valid measurements + if bitrates: + return sum(bitrates) / len(bitrates) + return None + + +def _extract_segments(source_file: str, dirs: Dict, segment_length: int) -> Dict: + """ + Extract HLS segments in high and low quality + Returns segment info and frame counts per segment + """ + qualities = { + "high": {"crf": 23, "gop": 15}, + "low": { + "crf": 40, + "gop": 15, + "scale": "-2:'if(gt(ih,720),max(ceil(ih/4)*2,720),ih)'", + }, + } + + bitrates = {} + + for quality, opts in qualities.items(): + quality_dir = dirs["segments_" + quality] + segment_pattern = os.path.join(quality_dir, "%09d.ts") + index_path = os.path.join(quality_dir, "index.m3u8") + + # Build ffmpeg command + cmd = [ + "ffmpeg", + "-hide_banner", + "-v", + "error", + "-i", + source_file, + "-c:v", + "libx264", + "-crf", + str(opts["crf"]), + "-g", + str(opts["gop"]), + "-f", + "hls", + "-hls_time", + str(segment_length), + "-hls_list_size", + "0", + "-start_number", + "0", + "-hls_segment_filename", + segment_pattern, + "-vsync", + "passthrough", + "-max_muxing_queue_size", + "1024", + ] + + # Add scale filter for low quality + if "scale" in opts: + cmd.extend(["-vf", f"scale={opts['scale']}"]) + + cmd.append(index_path) + + subprocess.run(cmd, check=True) + + # Read index file and calculate bitrate + with open(index_path) as f: + index_data = f.read() + segments = sorted(Path(quality_dir).glob("*.ts")) + bitrate = _calculate_avg_bitrate(index_data, [str(s) for s in segments]) + bitrates[quality] = bitrate + + return {"bitrates": bitrates} + + +def _extract_thumbnail(source_file: str, output_path: str, total_frames: int) -> str: + """Extract thumbnail from middle frame""" + middle_frame = total_frames // 2 + + cmd = [ + "ffmpeg", + "-hide_banner", + "-v", + "error", + "-i", + source_file, + "-vf", + f"select=gte(n\\,{middle_frame}),scale=w=356:h=200:force_original_aspect_ratio=decrease", + "-vframes", + "1", + "-y", + output_path, + ] + + subprocess.run(cmd, check=True) + return output_path + + +def _get_frames_timestamps(source_file: str) -> List[float]: + """Get frame timestamps using ffmpeg showinfo filter""" + cmd = [ + "ffmpeg", + "-hide_banner", + "-v", + "info", + "-i", + source_file, + "-vsync", + "passthrough", + "-vf", + "showinfo", + "-f", + "null", + "-", + ] + + result = subprocess.run(cmd, capture_output=True, text=True) + + # Parse timestamps from stderr + frames = [] + for line in result.stderr.splitlines(): + if "pts_time:" in line: + pts_time = line.split("pts_time:")[1].split()[0] + frames.append(float(pts_time)) + + return frames + + +def _extract_frames(source_file: str, output_dir: str, downsampling_step: float): + """Extract frames using ffmpeg with optional downsampling""" + frame_pattern = os.path.join(output_dir, "%09d.png") + + if downsampling_step > 1: + # Use select filter to precisely control what frames are extracted + # This matches the frame selection logic in the manifest + select_expr = f"select='eq(trunc(trunc((n+1)/{downsampling_step})*{downsampling_step})\\,n)'" + cmd = [ + "ffmpeg", + "-hide_banner", + "-v", + "error", + "-i", + source_file, + "-start_number", + "0", + "-vsync", + "passthrough", + "-vf", + select_expr, + "-f", + "image2", + frame_pattern, + ] + else: + # Extract all frames when no downsampling needed + cmd = [ + "ffmpeg", + "-hide_banner", + "-v", + "error", + "-i", + source_file, + "-start_number", + "0", + "-vsync", + "passthrough", + "-f", + "image2", + frame_pattern, + ] + + subprocess.run(cmd, check=True) + + +def _get_segment_frame_counts(segments_dir: str) -> List[int]: + """Get frame counts for each segment in order""" + segments = sorted(Path(segments_dir).glob("*.ts")) + segment_frame_counts = [] + for segment in segments: + count = _count_frames(str(segment)) + segment_frame_counts.append(count) + return segment_frame_counts + + +def _create_frames_manifest( + source_file: str, segments_dir: str, downsampling_step: float, manifest_path: str +) -> Dict: + """ + Create frames manifest mapping frames to segments + Format: FRAME_NO_IN_SEGMENT:SEGMENT_NO:VISIBILITY_FLAG:TIMESTAMP + """ + frames_timestamps = _get_frames_timestamps(source_file) + segment_frame_counts = _get_segment_frame_counts(segments_dir) + + file_lines = [] + visible_frames = 0 + frame_no = 0 + segment_no = 0 + frame_no_segment = 0 + + for frame_timestamp in frames_timestamps: + # Check if frame should be visible based on downsampling + next_visible_frame = int(visible_frames * downsampling_step) + visibility = 1 if frame_no == next_visible_frame else 0 + + if visibility == 1: + visible_frames += 1 + + # Move to next segment if current is full + if frame_no_segment >= segment_frame_counts[segment_no]: + segment_no += 1 + frame_no_segment = 0 + + # Create manifest line + line = f"{frame_no_segment}:{segment_no}:{visibility}:{frame_timestamp}" + file_lines.append(line) + + frame_no += 1 + frame_no_segment += 1 + + with open(manifest_path, "w") as f: + f.write("\n".join(file_lines)) + + return {"visible_frames": visible_frames, "total_frames": frame_no} + + +def _get_hls_index_with_storage_urls(segments_dir: str, storage_key_prefix: str) -> str: + """ + Replaces relative paths in HLS index file by storage keys. + """ + index_path = os.path.join(segments_dir, "index.m3u8") + with open(index_path, "r") as f: + content = f.read() + return re.sub( + r"^(.*\.ts)$", + lambda m: f"{storage_key_prefix}/{m.group(1)}", + content, + flags=re.MULTILINE, + ) + + +def _maybe_extract_audio_peaks(source_file: str, output_dir: str) -> Optional[str]: + """ + Extract audio peaks from video file and gzip the result. + Returns path to the gzipped peaks file if audio stream exists, None otherwise. + """ + # First check if audio stream exists + cmd = [ + "ffprobe", + "-v", + "error", + "-select_streams", + "a", + "-show_entries", + "stream=codec_type", + "-of", + "json", + source_file, + ] + result = subprocess.run(cmd, capture_output=True, text=True) + data = json.loads(result.stdout) + + if not data.get("streams"): + console.print("No audio streams found") + return None + + raw_output_path = os.path.join(output_dir, "audio_peaks.raw") + gzipped_output_path = os.path.join(output_dir, "audio_peaks.gz") + + cmd = [ + "ffmpeg", + "-hide_banner", + "-v", + "error", + "-i", + source_file, + "-map", + "0:a:0", + "-ac", + "1", + "-af", + "aresample=1000,asetnsamples=1", + "-f", + "u8", + raw_output_path, + ] + + try: + subprocess.run(cmd, check=True) + # Gzip the output file + with open(raw_output_path, "rb") as f_in: + with gzip.open(gzipped_output_path, "wb") as f_out: + f_out.writelines(f_in) + # Remove the raw file + os.remove(raw_output_path) + return gzipped_output_path + except (subprocess.CalledProcessError, OSError): + console.print("Failed to extract audio peaks") + if os.path.exists(raw_output_path): + os.remove(raw_output_path) + if os.path.exists(gzipped_output_path): + os.remove(gzipped_output_path) + return None + + +def extract_artifacts( + source_file: str, + output_dir: str, + storage_key_prefix: str, + *, + fps: float = 0.0, + segment_length: int = 2, + repair: bool = False, +) -> Dict: + """ + Extracts video artifacts including segments, frames, thumbnail for + read-only registration in the Darwin platform. + + Args: + source_file: Path to source video file + output_dir: Directory to store generated artifacts + storage_key_prefix: Prefix for storage keys + fps: Desired frames per second (0.0 for native fps), defaults to 0.0 + segment_length: Length of each segment in seconds, defaults to 2 + repair: If True, attempt to repair video if errors are detected, defaults to False + + Returns: + Dict containing metadata and paths to generated artifacts + + Raises: + FileNotFoundError: If source_file does not exist + """ + if not os.path.exists(source_file): + raise FileNotFoundError(f"Source video file not found: {source_file}") + + _check_ffmpeg_version() + dirs = _create_directories(output_dir) + + repaired = False + if repair: + repaired, source_file = _maybe_repair_video(source_file, dirs["base_dir"]) + + storage_key_prefix = storage_key_prefix.strip("/") + + console.print("\nExtracting video metadata...") + + metadata = _get_video_metadata(source_file) + native_fps = float(metadata["native_fps"]) + downsampling_step = round(max(native_fps / fps, 1.0), 4) if fps > 0 else 1.0 + source_file_size = os.path.getsize(source_file) + + console.print(f"\nVideo resolution: {metadata['width']}x{metadata['height']}") + console.print(f"Native FPS: {native_fps}") + console.print(f"Downsampling step: {downsampling_step}") + console.print(f"Source file size: {source_file_size} bytes") + + console.print("\nExtracting video segments...") + + segments_metadata = _extract_segments( + source_file=source_file, dirs=dirs, segment_length=segment_length + ) + + console.print("\nExtracting frames...") + + _extract_frames(source_file, dirs["sections"], downsampling_step) + + console.print("\nCreating frames manifest...") + + manifest_metadata = _create_frames_manifest( + source_file=source_file, + segments_dir=dirs["segments_high"], + downsampling_step=downsampling_step, + manifest_path=os.path.join(dirs["base_dir"], "frames_manifest.txt"), + ) + + console.print("\nExtracting thumbnail...") + + _extract_thumbnail( + source_file=source_file, + output_path=os.path.join(dirs["base_dir"], "thumbnail.jpg"), + total_frames=manifest_metadata["total_frames"], + ) + + console.print("\nExtracting audio peaks...") + + _maybe_extract_audio_peaks(source_file, dirs["base_dir"]) + + console.print("\nProcessing segment indices...") + + hq_hls_index = _get_hls_index_with_storage_urls( + dirs["segments_high"], f"{storage_key_prefix}/segments/high" + ) + lq_hls_index = _get_hls_index_with_storage_urls( + dirs["segments_low"], f"{storage_key_prefix}/segments/low" + ) + + console.print("\nSaving metadata...") + + source_file_name = os.path.basename(source_file) + + # Prepare final metadata + result_metadata = { + "repaired": repaired, + "source_file": source_file, + "storage_key_prefix": storage_key_prefix, + "registration_payload": { + "type": "video", + "width": metadata["width"], + "height": metadata["height"], + "native_fps": metadata["native_fps"], + "fps": fps, + "visible_frames": manifest_metadata["visible_frames"], + "total_frames": manifest_metadata["total_frames"], + # After item registration with this payload, + # DARWIN will use storage keys from HLS indexes and fields defined below + # to create signed URLs and fetch files from storage. + # Therefore, the storage key must be correct. + "hls_segments": { + "high_quality": { + "index": hq_hls_index, + "bitrate": segments_metadata["bitrates"]["high"], + }, + "low_quality": { + "index": lq_hls_index, + "bitrate": segments_metadata["bitrates"]["low"], + }, + }, + "storage_key": f"{storage_key_prefix}/{source_file_name}", + "storage_sections_key_prefix": f"{storage_key_prefix}/sections", + "storage_frames_manifest_key": f"{storage_key_prefix}/frames_manifest.txt", + "storage_thumbnail_key": f"{storage_key_prefix}/thumbnail.jpg", + "total_size_bytes": source_file_size, + "name": source_file_name, + "path": "/", + }, + } + + # Add audio peaks key if audio was extracted + # Must be uploaded with Content-Encoding gzip + audio_peaks_path = os.path.join(dirs["base_dir"], "audio_peaks.gz") + if os.path.exists(audio_peaks_path): + result_metadata["registration_payload"][ + "storage_audio_peaks_key" + ] = f"{storage_key_prefix}/audio_peaks.gz" + + with open(os.path.join(dirs["base_dir"], "metadata.json"), "w") as f: + json.dump(result_metadata, f, indent=2) + + return result_metadata diff --git a/darwin/options.py b/darwin/options.py index f793ce74a..90b79f6e2 100644 --- a/darwin/options.py +++ b/darwin/options.py @@ -548,6 +548,59 @@ def cpu_default_types(input: Any) -> Optional[int]: # type: ignore "version", help="Check current version of the repository. " ) + # EXTRACTION + parser_extract = subparsers.add_parser( + "extract", help="Extract and process media files" + ) + extract_subparsers = parser_extract.add_subparsers(dest="extract_type") + + # Video artifacts + parser_video = extract_subparsers.add_parser( + "video-artifacts", + help="Extract video artifacts for read-only registration in the Darwin platform", + description="Process video files to generate streaming artifacts including HLS segments, " + "thumbnails, frame extracts, and manifest files required for video playback " + "in the V7 Darwin platform.", + ) + parser_video.add_argument( + "source_file", + type=str, + help="Path to input video file", + ) + parser_video.add_argument( + "-p", + "--storage-key-prefix", + type=str, + required=True, + help="Storage key prefix for generated files", + ) + parser_video.add_argument( + "-o", + "--output-dir", + type=str, + required=True, + help="Output directory for artifacts", + ) + parser_video.add_argument( + "-f", + "--fps", + type=float, + default=0.0, + help="Desired output FPS (0.0 for native)", + ) + parser_video.add_argument( + "-s", + "--segment-length", + type=int, + default=2, + help="Length of each segment in seconds", + ) + parser_video.add_argument( + "--repair", + action="store_true", + help="Checks video for errors and attempts to repair them", + ) + argcomplete.autocomplete(self.parser) def parse_args(self) -> Tuple[Namespace, ArgumentParser]: diff --git a/tests/darwin/cli_functions_test.py b/tests/darwin/cli_functions_test.py index 8e6569bfd..182aa5904 100644 --- a/tests/darwin/cli_functions_test.py +++ b/tests/darwin/cli_functions_test.py @@ -6,13 +6,18 @@ import responses from rich.console import Console -from darwin.cli_functions import delete_files, set_file_status, upload_data +from darwin.cli_functions import ( + delete_files, + extract_video_artifacts, + set_file_status, + upload_data, +) from darwin.client import Client from darwin.config import Config from darwin.dataset import RemoteDataset from darwin.dataset.remote_dataset_v2 import RemoteDatasetV2 -from tests.fixtures import * from darwin.utils import BLOCKED_UPLOAD_ERROR_ALREADY_EXISTS +from tests.fixtures import * @pytest.fixture @@ -434,3 +439,31 @@ def error_mock(): ) mock.assert_called_once() exception.assert_called_once_with(1) + + +class TestExtractVideo: + def test_extract_video(self, tmp_path): + """Test basic video extraction via CLI function""" + source_file = "test_video.mp4" + output_dir = str(tmp_path) + + with patch("darwin.extractor.video.extract_artifacts") as mock_extract: + mock_extract.return_value = {} + + extract_video_artifacts( + source_file, + output_dir, + storage_key_prefix="test/prefix", + fps=30.0, + segment_length=2, + repair=False, + ) + + mock_extract.assert_called_once_with( + source_file=source_file, + output_dir=output_dir, + fps=30.0, + segment_length=2, + repair=False, + storage_key_prefix="test/prefix", + ) diff --git a/tests/darwin/data/test_video.mp4 b/tests/darwin/data/test_video.mp4 new file mode 100644 index 000000000..12e219bf4 Binary files /dev/null and b/tests/darwin/data/test_video.mp4 differ diff --git a/tests/darwin/data/test_video_corrupted.mp4 b/tests/darwin/data/test_video_corrupted.mp4 new file mode 100644 index 000000000..dc0b9f168 Binary files /dev/null and b/tests/darwin/data/test_video_corrupted.mp4 differ diff --git a/tests/darwin/data/test_video_with_audio.mp4 b/tests/darwin/data/test_video_with_audio.mp4 new file mode 100644 index 000000000..ac6bfce85 Binary files /dev/null and b/tests/darwin/data/test_video_with_audio.mp4 differ diff --git a/tests/darwin/extractor/video_test.py b/tests/darwin/extractor/video_test.py new file mode 100644 index 000000000..eee0e7efc --- /dev/null +++ b/tests/darwin/extractor/video_test.py @@ -0,0 +1,291 @@ +import tempfile +from pathlib import Path + +import pytest + +from darwin.extractor.video import extract_artifacts + + +@pytest.fixture +def data_dir(): + return Path(__file__).parent.parent / "data" + + +@pytest.fixture +def output_dir(): + with tempfile.TemporaryDirectory() as tmpdir: + yield Path(tmpdir) + + +class TestVideoArtifactExtraction: + def test_extract_artifacts_basic_video(self, data_dir, output_dir): + """Test basic video artifact extraction without audio""" + source_file = data_dir / "test_video.mp4" + + result = extract_artifacts( + source_file=str(source_file), + output_dir=str(output_dir), + storage_key_prefix="test/prefix", + fps=30.0, + segment_length=2, + repair=False, + ) + + # Get the actual result for bitrate values + actual_payload = result["registration_payload"] + + # Store actual bitrates for assertion + actual_high_bitrate = actual_payload["hls_segments"]["high_quality"]["bitrate"] + actual_low_bitrate = actual_payload["hls_segments"]["low_quality"]["bitrate"] + + # Verify bitrates are non-negative numbers + assert actual_high_bitrate >= 0, "High quality bitrate should be non-negative" + assert actual_low_bitrate >= 0, "Low quality bitrate should be non-negative" + assert ( + actual_high_bitrate > actual_low_bitrate + ), "High quality bitrate should be higher than low quality" + + expected_payload = { + "type": "video", + "width": 1280, + "height": 720, + "native_fps": 30.0, + "fps": 30.0, + "total_frames": 150, # 5 seconds * 30 fps + "visible_frames": 150, # 5 seconds * 30 fps + "hls_segments": { + "high_quality": { + "bitrate": actual_high_bitrate, # Use actual value instead of hardcoded + "index": ( + "#EXTM3U\n" + "#EXT-X-VERSION:3\n" + "#EXT-X-TARGETDURATION:2\n" + "#EXT-X-MEDIA-SEQUENCE:0\n" + "#EXTINF:2.000000,\n" + "test/prefix/segments/high/000000000.ts\n" + "#EXTINF:2.000000,\n" + "test/prefix/segments/high/000000001.ts\n" + "#EXTINF:1.000000,\n" + "test/prefix/segments/high/000000002.ts\n" + "#EXT-X-ENDLIST\n" + ), + }, + "low_quality": { + "bitrate": actual_low_bitrate, # Use actual value instead of hardcoded + "index": ( + "#EXTM3U\n" + "#EXT-X-VERSION:3\n" + "#EXT-X-TARGETDURATION:2\n" + "#EXT-X-MEDIA-SEQUENCE:0\n" + "#EXTINF:2.000000,\n" + "test/prefix/segments/low/000000000.ts\n" + "#EXTINF:2.000000,\n" + "test/prefix/segments/low/000000001.ts\n" + "#EXTINF:1.000000,\n" + "test/prefix/segments/low/000000002.ts\n" + "#EXT-X-ENDLIST\n" + ), + }, + }, + "storage_key": "test/prefix/test_video.mp4", + "storage_frames_manifest_key": "test/prefix/frames_manifest.txt", + "storage_thumbnail_key": "test/prefix/thumbnail.jpg", + "storage_sections_key_prefix": "test/prefix/sections", + "total_size_bytes": 61859, + "name": "test_video.mp4", + "path": "/", + } + + # Verify the result structure matches exactly + assert result["registration_payload"] == expected_payload + + # Verify generated files + assert (output_dir / "metadata.json").exists() + assert (output_dir / "thumbnail.jpg").exists() + assert (output_dir / "frames_manifest.txt").exists() + + # Verify frames manifest content + with open(output_dir / "frames_manifest.txt") as f: + manifest_lines = f.readlines() + + # Verify number of frames + assert len(manifest_lines) == 150, "Should have 150 frames" + + # Parse and verify each frame entry + frame_duration = 1.0 / 30.0 # 0.0333333... seconds per frame + for i, line in enumerate(manifest_lines): + # Parse line format: frame_number:segment_number:1:timestamp + frame_num, segment_num, _, timestamp = line.strip().split(":") + frame_num = int(frame_num) + segment_num = int(segment_num) + timestamp = float(timestamp) + + # Verify frame number (0-59 within each segment) + assert frame_num == i % 60, f"Wrong frame number at line {i}" + + # Verify segment number (0-2) + assert segment_num == i // 60, f"Wrong segment number at line {i}" + + # Verify timestamp (allowing small floating point differences) + expected_time = i * frame_duration + assert ( + abs(timestamp - expected_time) < 0.0001 + ), f"Wrong timestamp at line {i}" + + # Verify generated files + assert len(list((output_dir / "sections").glob("*.png"))) == 150 + + # Verify HLS segments + assert len(list((output_dir / "segments" / "high").glob("*.ts"))) == 3 + assert len(list((output_dir / "segments" / "low").glob("*.ts"))) == 3 + + def test_extract_artifacts_with_audio(self, data_dir, output_dir): + """Test video artifact extraction with audio""" + source_file = data_dir / "test_video_with_audio.mp4" + + result = extract_artifacts( + source_file=str(source_file), + output_dir=str(output_dir), + storage_key_prefix="test/prefix", + fps=30.0, + segment_length=2, + repair=False, + ) + + # Verify audio peaks are extracted + assert ( + result["registration_payload"]["storage_audio_peaks_key"] + == "test/prefix/audio_peaks.gz" + ) + assert (output_dir / "audio_peaks.gz").exists() + + def test_extract_artifacts_with_repair(self, data_dir, output_dir): + """Test video repair functionality""" + source_file = data_dir / "test_video_corrupted.mp4" + + result = extract_artifacts( + source_file=str(source_file), + output_dir=str(output_dir), + storage_key_prefix="test/prefix", + fps=30.0, + segment_length=2, + repair=True, + ) + + repaired_file = output_dir / "repaired_test_video_corrupted.mp4" + + assert result["repaired"] is True + assert result["source_file"] == str(repaired_file) + assert repaired_file.exists() + # Verify the video was processed successfully despite corruption + assert result["registration_payload"]["type"] == "video" + + def test_extract_artifacts_file_not_found(self, output_dir): + """Test handling of non-existent video file""" + with pytest.raises(FileNotFoundError): + extract_artifacts( + source_file="nonexistent.mp4", + output_dir=str(output_dir), + storage_key_prefix="test", + ) + + def test_extract_artifacts_custom_fps(self, data_dir, output_dir): + """Test artifact extraction with custom FPS""" + source_file = data_dir / "test_video.mp4" + + result = extract_artifacts( + source_file=str(source_file), + output_dir=str(output_dir), + storage_key_prefix="test/prefix", + fps=15.0, # Half of the original FPS + segment_length=2, + repair=False, + ) + + # Verify FPS settings + assert result["registration_payload"]["native_fps"] == 30.0 + assert result["registration_payload"]["fps"] == 15.0 + # Should have half as many visible frames due to downsampling + assert result["registration_payload"]["visible_frames"] == 75 # 150/2 + + def test_extract_artifacts_native_fps(self, data_dir, output_dir): + """Test artifact extraction using native FPS""" + source_file = data_dir / "test_video.mp4" + + result = extract_artifacts( + source_file=str(source_file), + output_dir=str(output_dir), + storage_key_prefix="test/prefix", + fps=0.0, # Use native FPS + segment_length=2, + repair=False, + ) + + # Verify FPS settings + assert result["registration_payload"]["native_fps"] == 30.0 + assert result["registration_payload"]["fps"] == 0.0 + # Should have same number of visible frames as total frames + assert ( + result["registration_payload"]["visible_frames"] + == result["registration_payload"]["total_frames"] + ) + + def test_extract_artifacts_segment_length(self, data_dir, output_dir): + """Test that segment_length parameter affects HLS segment duration""" + source_file = data_dir / "test_video.mp4" + + # Test with 1-second segments + extract_artifacts( + source_file=str(source_file), + output_dir=str(output_dir / "1s"), + storage_key_prefix="test/prefix", + fps=30.0, + segment_length=1, # 1-second segments + repair=False, + ) + + # Test with 3-second segments + extract_artifacts( + source_file=str(source_file), + output_dir=str(output_dir / "3s"), + storage_key_prefix="test/prefix", + fps=30.0, + segment_length=3, # 3-second segments + repair=False, + ) + + # Read and parse HLS manifests + def get_segment_durations(manifest_path): + durations = [] + with open(manifest_path) as f: + for line in f: + if line.startswith("#EXTINF:"): + # Extract duration value (format: #EXTINF:1.0,) + duration = float(line.strip().split(":")[1].split(",")[0]) + durations.append(duration) + return durations + + # Check high quality segments + durations_1s = get_segment_durations( + output_dir / "1s" / "segments" / "high" / "index.m3u8" + ) + durations_3s = get_segment_durations( + output_dir / "3s" / "segments" / "high" / "index.m3u8" + ) + + # Print actual durations for debugging + print("\nActual 1-second segment durations:", durations_1s) + print("Actual 3-second segment durations:", durations_3s) + + # Verify segment durations (allowing small deviation due to keyframe alignment) + assert all( + 0.9 <= d <= 1.1 for d in durations_1s + ), "1-second segments should be ~1 second long" + assert all( + 1.9 <= d <= 3.1 for d in durations_3s + ), "3-second segments should be ~2-3 seconds long" + + # Verify we have more segments with 1-second duration + assert len(durations_1s) > len( + durations_3s + ), "Should have more 1-second segments than 3-second segments"