Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Display embeddings (#459) #493

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 12 additions & 21 deletions cli/cli/handlers/finder.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from typing import Optional

from cli.handlers.errors import handle_errors
from winnow.utils.logging import configure_logging_cli


class FinderCli:
Expand All @@ -13,30 +12,22 @@ def __init__(self, pipeline):
@handle_errors
def local_matches(self):
"""Find matches between local videos."""
from winnow.pipeline.generate_local_matches import generate_local_matches
from winnow.utils.files import scan_videos
import luigi

config = self._pipeline.config
configure_logging_cli(config.logging)
from winnow.pipeline.luigi.matches import MatchesReportTask

videos = scan_videos(config.sources.root, "**", extensions=config.sources.extensions)
generate_local_matches(files=videos, pipeline=self._pipeline)
luigi.build([MatchesReportTask(config=self._pipeline.config)], local_scheduler=True, workers=1)

def remote_matches(self, repo: Optional[str] = None, contributor: Optional[str] = None):
def remote_matches(self, repo: Optional[str] = None):
"""Find matches between local files and remote fingerprints."""
from winnow.pipeline.generate_remote_matches import generate_remote_matches
import logging.config
import luigi

config = self._pipeline.config
configure_logging_cli(config.logging)
from winnow.pipeline.luigi.matches import RemoteMatchesTask

if repo is not None:
repo = str(repo)

if contributor is not None:
contributor = str(contributor)

generate_remote_matches(
pipeline=self._pipeline,
repository_name=repo,
contributor_name=contributor,
logging.config.fileConfig("./logging.conf")
luigi.build(
[RemoteMatchesTask(config=self._pipeline.config, repository_name=repo)],
local_scheduler=True,
workers=1,
)
35 changes: 18 additions & 17 deletions cli/cli/handlers/pipeline.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
import os


class PipelineCli:
"""Process video files."""

Expand All @@ -9,20 +6,24 @@ def __init__(self, config):

def all(self):
"""Process all video files."""
from winnow.utils.logging import configure_logging_cli
from winnow.pipeline.detect_scenes import detect_scenes
from winnow.pipeline.generate_local_matches import generate_local_matches
from winnow.utils.files import scan_videos
from winnow.pipeline.extract_exif import extract_exif
from winnow.pipeline.pipeline_context import PipelineContext

configure_logging_cli(self._config.logging)
import luigi

# Resolve list of video files from the directory
absolute_root = os.path.abspath(self._config.sources.root)
videos = scan_videos(absolute_root, "**", extensions=self._config.sources.extensions)
from winnow.pipeline.luigi.exif import ExifTask
from winnow.pipeline.luigi.signatures import (
SignaturesTask,
DBSignaturesTask,
)
from winnow.pipeline.luigi.matches import MatchesReportTask, DBMatchesTask

pipeline_context = PipelineContext(self._config)
generate_local_matches(files=videos, pipeline=pipeline_context)
detect_scenes(files=videos, pipeline=pipeline_context)
extract_exif(videos, pipeline=pipeline_context)
luigi.build(
[
ExifTask(config=self._config),
SignaturesTask(config=self._config),
DBSignaturesTask(config=self._config),
MatchesReportTask(config=self._config),
DBMatchesTask(config=self._config),
],
local_scheduler=True,
workers=1,
)
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ services:
SECURITY_STORAGE_PATH: "/project/data/representations"
RPC_SERVER_HOST: "rpc"
RPC_SERVER_PORT: 50051
EMBEDDINGS_FOLDER: "/project/data/representations/embeddings"
volumes:
# Set the BENETECH_DATA_LOCATION environment variable to the path
# on your host machine where you placed your video files
Expand Down
1 change: 1 addition & 0 deletions environment-gpu.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ dependencies:
- trimap
- pacmap
- torch
- typing_extensions==4.1.1
- grpcio==1.43.0
- grpcio-tools==1.43.0

1 change: 1 addition & 0 deletions environment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ dependencies:
- trimap
- pacmap
- torch
- typing_extensions==4.1.1
- grpcio==1.43.0
- grpcio-tools==1.43.0

Expand Down
3 changes: 0 additions & 3 deletions process_video_url.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@
import luigi

from winnow.pipeline.luigi.download import DownloadFilesTask
from winnow.pipeline.pipeline_context import PipelineContext
from winnow.pipeline.process_urls import process_urls
from winnow.utils.config import resolve_config
from winnow.utils.logging import configure_logging_cli


@click.command()
Expand Down
110 changes: 110 additions & 0 deletions rpc/embeddings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
from threading import Lock
from typing import List, Dict, Optional, Tuple

import numpy as np
from annoy import AnnoyIndex

import rpc.rpc_pb2 as proto
from winnow.pipeline.luigi.condense import CondensedFingerprints
from winnow.pipeline.luigi.embeddings import (
EmbeddingsTask,
UmapEmbeddingsTask,
TSNEEmbeddingsTask,
TriMapEmbeddingsTask,
PaCMAPEmbeddingsTask,
)
from winnow.pipeline.luigi.embeddings_annoy_index import (
EmbeddingsAnnoyIndexTask,
PaCMAPAnnoyIndexTask,
TriMAPAnnoyIndexTask,
UMAPAnnoyIndexTask,
TSNEAnnoyIndexTask,
)
from winnow.pipeline.luigi.utils import FileKeyDF
from winnow.pipeline.pipeline_context import PipelineContext
from winnow.storage.file_key import FileKey


class EmbeddingsIndex:
def __init__(self, annoy_index: AnnoyIndex, files: List[FileKey], positions: Dict[FileKey, np.ndarray]):
self._annoy_index: AnnoyIndex = annoy_index
self._files: List[FileKey] = files
self._positions: Dict[FileKey, np.ndarray] = positions

def query(
self,
x: float,
y: float,
max_count: int = 10,
max_distance: Optional[float] = None,
) -> List[proto.FoundNeighbor]:
if max_distance <= 0:
max_distance = None
indices, distances = self._annoy_index.get_nns_by_vector([x, y], max_count, include_distances=True)
files = [self._files[i] for i in indices]
results: List[proto.FoundNeighbor] = []
for file, distance in zip(files, distances):
if max_distance is not None and distance > max_distance:
break
x, y = self._positions[file]
results.append(
proto.FoundNeighbor(
file_path=file.path,
file_hash=file.hash,
distance=distance,
x=x,
y=y,
)
)
return results


class EmbeddingLoader:
def __init__(self, pipeline: PipelineContext):
self._pipeline: PipelineContext = pipeline
self._cache: Dict[str, EmbeddingsIndex] = {}
self._lock = Lock()

def load(self, algorithm: str) -> Optional[EmbeddingsIndex]:
with self._lock:
if algorithm not in self._cache:
index = self._do_load(algorithm)
if index is not None:
self._cache[algorithm] = index
return self._cache.get(algorithm)

def _do_load(self, algorithm: str) -> Optional[EmbeddingsIndex]:
"""Do load embeddings index."""
embeddings_task, annoy_task = self._task(algorithm)
if embeddings_task is None or annoy_task is None:
return None
embeddings: CondensedFingerprints = embeddings_task.output().read()
if embeddings is None:
return None

annoy_output = annoy_task.output()
annoy_paths, _ = annoy_output.latest_result
if annoy_paths is None:
return None

annoy_index_path, annoy_files_path = annoy_paths
annoy_index = AnnoyIndex(2, "euclidean")
annoy_index.load(annoy_index_path)
annoy_files_df = FileKeyDF.read_csv(annoy_files_path)
positions: Dict[FileKey, np.ndarray] = {}
for i, file_key in enumerate(embeddings.to_file_keys()):
positions[file_key] = embeddings.fingerprints[i]
return EmbeddingsIndex(annoy_index, FileKeyDF.to_file_keys(annoy_files_df), positions)

def _task(self, algorithm: str) -> Tuple[Optional[EmbeddingsTask], Optional[EmbeddingsAnnoyIndexTask]]:
config = self._pipeline.config
if algorithm == "pacmap":
return PaCMAPEmbeddingsTask(config=config), PaCMAPAnnoyIndexTask(config=config)
elif algorithm == "trimap":
return TriMapEmbeddingsTask(config=config), TriMAPAnnoyIndexTask(config=config)
elif algorithm == "umap":
return UmapEmbeddingsTask(config=config), UMAPAnnoyIndexTask(config=config)
elif algorithm == "t-sne":
return TSNEEmbeddingsTask(config=config), TSNEAnnoyIndexTask(config=config)
else:
return None, None
31 changes: 31 additions & 0 deletions rpc/rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,35 @@ message StatusRequest {
// Service status response
message StatusResponse {
bool status = 1;
}

// Embeddings service
service Embeddings {
// Get nearest neighbors
rpc query_nearest_neighbors (NearestNeighborsRequest) returns (NearestNeighborsResults) {}
rpc get_status (EmbeddingsStatusRequest) returns (StatusResponse) {}
}

message NearestNeighborsRequest {
string algorithm = 1;
float x = 2;
float y = 3;
float max_distance = 4;
int32 max_count = 5;
}

message NearestNeighborsResults {
repeated FoundNeighbor neighbors = 1;
}

message FoundNeighbor {
string file_path = 1;
string file_hash = 2;
float x = 3;
float y = 4;
float distance = 5;
}

message EmbeddingsStatusRequest {
string algorithm = 1;
}
Loading