diff --git a/sleap/gui/commands.py b/sleap/gui/commands.py index b4fea4ca1..7b6b44d64 100644 --- a/sleap/gui/commands.py +++ b/sleap/gui/commands.py @@ -35,7 +35,7 @@ class which inherits from `AppCommand` (or a more specialized class such as import traceback from enum import Enum from glob import glob -from itertools import product +from itertools import permutations, product from pathlib import Path, PurePath from typing import Callable, Dict, Iterator, List, Optional, Tuple, Type, Union, cast @@ -3422,9 +3422,19 @@ def do_action(cls, context: CommandContext, params: dict): video = params.get("video", None) or context.state["video"] session = params.get("session", None) or context.labels.get_session(video) instances = params["instances"] + session = cast(RecordingSession, session) # Could be None if no labels or video + + # Get best instance grouping and reprojected coords + instances_and_reprojected_coords = ( + TriangulateSession.get_instance_grouping_and_reprojected_coords( + session=session, instance_hypotheses=instances + ) + ) # Update instances - TriangulateSession.update_instances(session=session, instances=instances) + TriangulateSession.update_instances( + instances_and_coords=instances_and_reprojected_coords + ) @classmethod def ask(cls, context: CommandContext, params: dict) -> bool: @@ -3480,7 +3490,6 @@ def verify_views_and_instances(cls, context: CommandContext, params: dict) -> bo if session is None or instance is None: return - track = instance.track # TODO(LM): Replace with InstanceGroup cams_to_include = params.get("cams_to_include", None) or session.linked_cameras # If not enough `Camcorder`s available/specified, then return @@ -3492,13 +3501,12 @@ def verify_views_and_instances(cls, context: CommandContext, params: dict) -> bo ): return False - # Get all instances accross views at this frame index + # Get all instances products accross views at this frame index instances = TriangulateSession.get_and_verify_enough_instances( context=context, session=session, - frame_inds=[frame_idx], + frame_idx=frame_idx, cams_to_include=cams_to_include, - track=track, show_dialog=show_dialog, ) @@ -3514,19 +3522,18 @@ def verify_views_and_instances(cls, context: CommandContext, params: dict) -> bo @staticmethod def get_and_verify_enough_instances( session: RecordingSession, - frame_inds: List[int], + frame_idx: int, context: Optional[CommandContext] = None, cams_to_include: Optional[List[Camcorder]] = None, - track: Union[Track, int] = -1, show_dialog: bool = True, ) -> Union[Dict[int, Dict[Camcorder, List[Instance]]], bool]: - """Get all instances accross views at this frame index. + """Get all instances accross views at this frame index (and products of instances). If not enough `Instance`s are available at this frame index, then return False. Args: session: The `RecordingSession` containing the `Camcorder`s. - frame_inds: List of frame indices to get instances from (0-indexed). + frame_idx: Frame index to get instances from (0-indexed). context: The optional command context used to display a dialog. cams_to_include: List of `Camcorder`s to include. Default is all. track: `Track` object used to find instances accross views. Default is -1 @@ -3534,20 +3541,18 @@ def get_and_verify_enough_instances( show_dialog: If True, then show a warning dialog. Default is True. Returns: - Dict with frame identifier keys (does not necessarily need to be the frame - index) and values of another inner dict with `Camcorder` keys and - `List[Instance]` values if enough instances are found, False otherwise. + Dict with frame identifier keys (not the frame index) and values of another + inner dict with `Camcorder` keys and `List[Instance]` values if enough + instances are found, False otherwise. """ try: instances: Dict[ int, Dict[Camcorder, List[Instance]] - ] = TriangulateSession.get_instances_across_views_multiple_frames( + ] = TriangulateSession.get_products_of_instances( session=session, - frame_inds=frame_inds, + frame_idx=frame_idx, cams_to_include=cams_to_include, - track=track, - require_multiple_views=True, ) return instances except Exception as e: @@ -3747,38 +3752,298 @@ def get_all_views_at_frame( return views + @staticmethod + def get_instance_grouping_and_reprojected_coords( + session: RecordingSession, + instance_hypotheses: Dict[int, Dict[Camcorder, List[Instance]]], + ): + """Get instance grouping and reprojected coords with lowest reprojection error. + + Triangulation of all possible groupings needs to be performed... Thus, we return + the best grouping's triangulation in this step to then be used when updating the + instance. + + Args: + session: The `RecordingSession` containing the `Camcorder`s. + instance_hypotheses: Dict with frame identifier keys (not the frame index) + and values of another inner dict with `Camcorder` keys and + `List[Instance]` values. + + + Returns: + best_instances_and_reprojected_coords: Dict with `Camcorder` keys with + `Tuple[Instance, np.ndarray]` values. + """ + + # Calculate reprojection error for each instance grouping + ( + reprojection_error_per_frame, + instances_and_coords, + ) = TriangulateSession.calculate_error_per_frame( + session=session, + instances=instance_hypotheses, + ) + + # Just for type hinting + reprojection_error_per_frame = cast( + Dict[int, float], reprojection_error_per_frame + ) + instances_and_coords = cast( + Dict[int, Dict[Camcorder, Iterator[Tuple[Instance, np.ndarray]]]], + instances_and_coords, + ) + + # Get instance grouping with lowest reprojection error + best_instances, frame_id_min_error = TriangulateSession._get_instance_grouping( + instances=instance_hypotheses, + reprojection_error_per_frame=reprojection_error_per_frame, + ) + + # Just for type hinting + best_instances = cast(Dict[Camcorder, List[Instance]], best_instances) + instances_and_coords = cast( + Dict[int, Dict[Camcorder, Iterator[Tuple[Instance, np.ndarray]]]], + instances_and_coords, + ) + + # Get the best reprojection + best_instances_and_reprojected_coords: Dict[ + Camcorder, Iterator[Tuple[Instance, np.ndarray]] + ] = instances_and_coords[frame_id_min_error] + + return best_instances_and_reprojected_coords + + @staticmethod + def _get_instance_grouping( + instances: Dict[int, Dict[Camcorder, List[Instance]]], + reprojection_error_per_frame: Dict[int, float], + ) -> Tuple[Dict[Camcorder, List[Instance]], int]: + """Get instance grouping with lowest reprojection error. + + Args: + instances: Dict with frame identifier keys (not the frame index) and values + of another inner dict with `Camcorder` keys and `List[Instance]` values. + reprojection_error_per_frame: Dict with frame identifier keys (not the frame + index) and values of reprojection error for the frame. + + Returns: + best_instances: Dict with `Camcorder` keys and `List[Instance]` values. + frame_id_min_error: The frame identifier with the lowest reprojection + """ + + frame_id_min_error: int = min( + reprojection_error_per_frame, key=reprojection_error_per_frame.get + ) + + best_instances: Dict[Camcorder, List[Instance]] = instances[frame_id_min_error] + + return best_instances, frame_id_min_error + + @staticmethod + def _calculate_reprojection_error( + session: RecordingSession, + instances: Dict[int, Dict[Camcorder, List[Instance]]], + per_instance: bool = False, + per_view: bool = False, + ) -> Tuple[ + Dict[int, Union[float, Dict[Camcorder, List[Tuple[Instance, float]]]]], + Dict[int, Dict[Camcorder, Iterator[Tuple[Instance, np.ndarray]]]], + ]: + """Calculate reprojection error per frame or per instance. + + Args: + session: The `RecordingSession` containing the `Camcorder`s. + instances: Dict with frame identifier keys (not the frame index) and values + of another inner dict with `Camcorder` keys and `List[Instance]` values. + per_instance: If True, then return a dict with frame identifier keys and + values of another inner dict with `Camcorder` keys and + `List[Tuple[Instance, float]]` values. + per_view: If True, then return a dict with frame identifier keys and values + of another inner dict with `Camcorder` keys and + `Tuple[Tuple[str, str], float]` values. If per_instance is True, then that takes precendence. + + Returns: + reprojection_per_frame: Dict with frame identifier keys (not the frame index) and values of another + inner dict with `Camcorder` keys and `List[Tuple[Instance, float]]` values + if per_instance is True, otherwise a dict with frame identifier keys and + values of reprojection error for the frame. + instances_and_coords: Dict with frame identifier keys (not the frame index) + and values of another inner dict with `Camcorder` keys and + `Iterator[Tuple[Instance, np.ndarray]]` values that contain the instance + and the reprojected coordinates. + + """ + + reprojection_error_per_frame = {} + + # Triangulate and reproject instance coordinates. + instances_and_coords: Dict[ + int, Dict[Camcorder, Iterator[Tuple[Instance, np.ndarray]]] + ] = TriangulateSession.calculate_reprojected_points( + session=session, instances=instances + ) + for frame_id, instances_in_frame in instances_and_coords.items(): + frame_error: Union[Dict, float] = {} if per_instance or per_view else 0 + for cam, instances_in_view in instances_in_frame.items(): + # Compare instance coordinates here + instance_ids: List[Union[Track, str]] = [] + view_error: Union[List, int] = [] if per_instance else 0 + for inst, inst_coords in instances_in_view: + node_errors = np.nan_to_num(inst.numpy() - inst_coords) + instance_error = np.linalg.norm(node_errors) + + if per_instance: + view_error = cast(List, view_error) + view_error.append((inst, instance_error)) + else: + view_error = cast(int, view_error) + view_error += instance_error + + inst_id: Union[Track, str] = ( + inst.track if inst.track is not None else "None" + ) + instance_ids.append(inst_id) + + if per_instance: + frame_error = cast(Dict, frame_error) + frame_error[cam] = view_error + elif per_view: + view_error = cast(int, view_error) + frame_error = cast( + Dict[Camcorder, Tuple[Tuple[Union[Track, str], ...], int]], + frame_error, + ) + frame_error[cam] = (tuple(instance_ids), view_error) + else: + view_error = cast(int, view_error) + frame_error = cast(int, frame_error) + frame_error += view_error + + reprojection_error_per_frame[frame_id] = frame_error + + return reprojection_error_per_frame, instances_and_coords + + @staticmethod + def calculate_error_per_instance( + session: RecordingSession, instances: Dict[int, Dict[Camcorder, List[Instance]]] + ) -> Tuple[ + Dict[int, Dict[Camcorder, List[Tuple[Instance, float]]]], + Dict[int, Dict[Camcorder, Iterator[Tuple[Instance, np.ndarray]]]], + ]: + """Calculate reprojection error per instance. + + Args: + session: The `RecordingSession` containing the `Camcorder`s. + instances: Dict with frame identifier keys (not the frame index) and values + of another inner dict with `Camcorder` keys and `List[Instance]` values. + + Returns: + reprojection_error_per_instance: Dict with frame identifier keys (not the + frame index) and values of another inner dict with `Camcorder` keys and + `List[Tuple[Instance, float]]` values. + instances_and_coords: Dict with frame identifier keys (not the frame index) + and values of another inner dict with `Camcorder` keys and + `Iterator[Tuple[Instance, np.ndarray]]` values that contain the instance + and the reprojected coordinates. + """ + + ( + reprojection_error_per_instance, + instances_and_coords, + ) = TriangulateSession._calculate_reprojection_error( + session=session, instances=instances, per_instance=True + ) + + return reprojection_error_per_instance, instances_and_coords + + @staticmethod + def calculate_error_per_view( + session: RecordingSession, instances: Dict[int, Dict[Camcorder, List[Instance]]] + ) -> Tuple[ + Dict[int, Dict[Camcorder, float]], + Dict[int, Dict[Camcorder, Iterator[Tuple[Instance, np.ndarray]]]], + ]: + """Calculate reprojection error per instance. + + Args: + session: The `RecordingSession` containing the `Camcorder`s. + instances: Dict with frame identifier keys (not the frame index) and values + of another inner dict with `Camcorder` keys and `List[Instance]` values. + + Returns: + reprojection_error_per_view: Dict with frame identifier keys (not the frame + index) and values of another inner dict with `Camcorder` keys and + `float` values. + instances_and_coords: Dict with frame identifier keys (not the frame index) + and values of another inner dict with `Camcorder` keys and + `Iterator[Tuple[Instance, np.ndarray]]` values that contain the instance + and the reprojected coordinates. + """ + + ( + reprojection_error_per_view, + instances_and_coords, + ) = TriangulateSession._calculate_reprojection_error( + session=session, instances=instances, per_view=True + ) + + return reprojection_error_per_view, instances_and_coords + + @staticmethod + def calculate_error_per_frame( + session: RecordingSession, instances: Dict[int, Dict[Camcorder, List[Instance]]] + ) -> Tuple[ + Dict[int, float], + Dict[int, Dict[Camcorder, Iterator[Tuple[Instance, np.ndarray]]]], + ]: + """Calculate reprojection error per frame. + + Args: + session: The `RecordingSession` containing the `Camcorder`s. + instances: Dict with frame identifier keys (not the frame index) and values + of another inner dict with `Camcorder` keys and `List[Instance]` values. + + Returns: + reprojection_error_per_frame: Dict with frame identifier keys (not the frame + index) and values of reprojection error for the frame. + instances_and_coords: Dict with frame identifier keys (not the frame index) + and values of another inner dict with `Camcorder` keys and + `Iterator[Tuple[Instance, np.ndarray]]` values that contain the instance + and the reprojected coordinates. + """ + + ( + reprojection_error_per_frame, + instances_and_coords, + ) = TriangulateSession._calculate_reprojection_error( + session=session, instances=instances, per_instance=False + ) + + return reprojection_error_per_frame, instances_and_coords + @staticmethod def get_products_of_instances( - selected_instance: Instance, session: RecordingSession, frame_idx: int, cams_to_include: Optional[List[Camcorder]] = None, ) -> Dict[int, Dict[Camcorder, List[Instance]]]: - """Get all (single-instance) possible products of instances across views. + """Get all (multi-instance) possible products of instances across views. Args: - selected_instance: The `Instance` to add to permutations of instances in - views other than that of the `selected_instance`. session: The `RecordingSession` containing the `Camcorder`s. frame_idx: Frame index to get instances from (0-indexed). cams_to_include: List of `Camcorder`s to include. Default is all. require_multiple_views: If True, then raise and error if one or less views or instances are found. - Raises: - ValueError if one or less views or instances are found. - Returns: Dict with frame identifier keys (not the frame index) and values of another inner dict with `Camcorder` keys and `List[Instance]` values. Each - `List[Instance]` is of length 1. + `List[Instance]` is of length "max number of instances in frame set". """ - cam_selected = session.get_camera(selected_instance.video) - cam_selected = cast(Camcorder, cam_selected) # Could be None if not in session - # Get all instances accross views at this frame index, then remove selected - instances_excluding_selected: Dict[ + instances: Dict[ Camcorder, List[Instance] ] = TriangulateSession.get_instances_across_views( session=session, @@ -3787,47 +4052,52 @@ def get_products_of_instances( track=-1, # Get all instances regardless of track. require_multiple_views=True, ) - instances_excluding_selected.pop(cam_selected) + + # Get the skeleton from an example instance + skeleton = next(iter(instances.values()))[0].skeleton # Find max number of instances in other views - max_num_instances = max( - [len(instances) for instances in instances_excluding_selected.values()] - ) + max_num_instances = max([len(instances) for instances in instances.values()]) # Create a dummy instance of all nan values dummy_instance = Instance.from_numpy( np.full( - shape=(len(selected_instance.skeleton.nodes), 2), + shape=(len(skeleton.nodes), 2), fill_value=np.nan, ), - skeleton=selected_instance.skeleton, + skeleton=skeleton, ) - # Append a dummy instance to all lists of instances if less than the max length - for instances in instances_excluding_selected.values(): - num_instances = len(instances) + # Get permutations of instances from other views + instances_permutations: Dict[Camcorder, Iterator[Tuple]] = {} + for cam, instances_in_view in instances.items(): + # Append a dummy instance to all lists of instances if less than the max length + num_missing = 1 + num_instances = len(instances_in_view) if num_instances < max_num_instances: num_missing = max_num_instances - num_instances - instances.extend([dummy_instance] * num_missing) - # Permute instances from other views into all possible combos + # Extend the list first + instances_in_view.extend([dummy_instance] * num_missing) + + # Permute instances into all possible orderings w/in a view + instances_permutations[cam] = permutations(instances_in_view) + + # Get products of instances from other views into all possible groupings # Ordering of dict_values is preserved in Python 3.7+ - permutated_instances: Iterator[Tuple] = product( - *instances_excluding_selected.values() + products_of_instances: Iterator[Iterator[Tuple]] = product( + *instances_permutations.values() ) - # Reorganize permutaions by cam and add selected instance to each permutation - instances: Dict[int, Dict[Camcorder, List[Instance]]] = {} - for frame_id, perm in enumerate(permutated_instances): - instances[frame_id] = {cam_selected: [selected_instance]} - instances[frame_id].update( - { - cam: [inst] - for cam, inst in zip(instances_excluding_selected.keys(), perm) - } - ) + # Reorganize products by cam and add selected instance to each permutation + instances_hypotheses: Dict[int, Dict[Camcorder, List[Instance]]] = {} + for frame_id, prod in enumerate(products_of_instances): + instances_hypotheses[frame_id] = { + cam: [*inst] for cam, inst in zip(instances.keys(), prod) + } - return instances # Expect ! frames + # Expect "max # instances in view" ** "# views" frames (a.k.a. hypotheses) + return instances_hypotheses @staticmethod def get_instances_matrices( @@ -3956,10 +4226,7 @@ def calculate_excluded_views_multiple_frames( @staticmethod def _calculate_reprojected_points( session: RecordingSession, instances: Dict[int, Dict[Camcorder, List[Instance]]] - ) -> Tuple[ - Dict[int, Dict[Camcorder, Iterator[Tuple[Instance, np.ndarray]]]], - List[Camcorder], - ]: + ) -> Tuple[np.ndarray, List[Camcorder]]: """Triangulate and reproject instance coordinates. Note that the order of the instances in the list must match the order of the @@ -4069,6 +4336,8 @@ def group_instances_and_coords( insts_coords_in_frame: np.ndarray = insts_coords_list[frame_idx][ cam_idx ] # len(T) of N x 2 + + # TODO(LM): I think we will need a reconsumable iterator here. insts_and_coords_in_frame[cam] = zip( instances_in_frame_ordered, insts_coords_in_frame, @@ -4104,7 +4373,7 @@ def calculate_reprojected_points( session=session, instances=instances ) - # Group together instances (the reordered by cam) and the reprojected coords. + # Reorder instances (by cam) and the reprojected coords. instances_and_coords: Dict[ int, Dict[Camcorder, Iterator[Tuple[Instance, np.ndarray]]] ] = TriangulateSession.group_instances_and_coords( @@ -4117,31 +4386,23 @@ def calculate_reprojected_points( @staticmethod def update_instances( - session, instances: Dict[int, Dict[Camcorder, List[Instance]]] + instances_and_coords: Dict[Camcorder, Iterator[Tuple[Instance, np.ndarray]]] ): - """Triangulate, reproject, and update coordinates of `Instances`. + """Update coordinates of `Instances`. Args: - session: The `RecordingSession` containing the `Camcorder`s. - instances: Dict with frame identifier keys (does not necessarily need to be - the frame index) and values of another inner dict with `Camcorder` keys - and `List[Instance]` values. + instances_and_coords: Dict with `Camcorder` keys and + `Iterator[Tuple[Instance, np.ndarray]]` values containing the Instance + and it's reprojected coordinates. Returns: None """ - # Triangulate and reproject instance coordinates. - instances_and_coords = TriangulateSession.calculate_reprojected_points( - session=session, instances=instances - ) - - # TODO(LM): Since we only use the values here, is a dictionary overkill? # Update the instance coordinates. - for instances_in_frame in instances_and_coords.values(): - for instances_in_view in instances_in_frame.values(): - for inst, inst_coord in instances_in_view: - inst.update_points(points=inst_coord, exclude_complete=True) + for instances_in_view in instances_and_coords.values(): + for inst, inst_coord in instances_in_view: + inst.update_points(points=inst_coord, exclude_complete=True) def open_website(url: str): diff --git a/tests/gui/test_commands.py b/tests/gui/test_commands.py index 1329fd4c0..c6a9a279a 100644 --- a/tests/gui/test_commands.py +++ b/tests/gui/test_commands.py @@ -1063,32 +1063,29 @@ def test_triangulate_session_get_and_verify_enough_instances( labels = multiview_min_session_labels session = labels.sessions[0] lf = labels.labeled_frames[0] - track = labels.tracks[1] # Test with no cams_to_include, expect views from all linked cameras instances = TriangulateSession.get_and_verify_enough_instances( - session=session, frame_inds=[lf.frame_idx], track=track + session=session, frame_idx=lf.frame_idx ) - instances_in_frame = instances[lf.frame_idx] + instances_in_frame = instances[0] assert ( - len(instances_in_frame) == 6 - ) # Some views don't have an instance at this track + len(instances_in_frame) == 8 + ) # All views should have same number of instances (padded with dummy instance(s)) for cam in session.linked_cameras: if cam.name in ["side", "sideL"]: # The views that don't have an instance continue instances_in_view = instances_in_frame[cam] for inst in instances_in_view: assert inst.frame_idx == lf.frame_idx - assert inst.track == track assert inst.video == session[cam] # Test with cams_to_include, expect views from only those cameras cams_to_include = session.linked_cameras[-2:] instances = TriangulateSession.get_and_verify_enough_instances( session=session, - frame_inds=[lf.frame_idx], + frame_idx=lf.frame_idx, cams_to_include=cams_to_include, - track=track, ) instances_in_frame = instances[lf.frame_idx] assert len(instances_in_frame) == len(cams_to_include) @@ -1096,21 +1093,24 @@ def test_triangulate_session_get_and_verify_enough_instances( instances_in_view = instances_in_frame[cam] for inst in instances_in_view: assert inst.frame_idx == lf.frame_idx - assert inst.track == track assert inst.video == session[cam] # Test with not enough instances, expect views from only those cameras cams_to_include = session.linked_cameras[0:2] + cam = cams_to_include[0] + video = session[cam] + lfs = labels.find(video, lf.frame_idx) + lf = lfs[0] + lf.instances = [] instances = TriangulateSession.get_and_verify_enough_instances( session=session, - frame_inds=[lf.frame_idx], + frame_idx=lf.frame_idx, cams_to_include=cams_to_include, - track=None, ) assert isinstance(instances, bool) assert not instances messages = "".join([rec.message for rec in caplog.records]) - assert "One or less instances found for frame" in messages + assert "No Instances found for" in messages def test_triangulate_session_verify_enough_views( @@ -1259,7 +1259,9 @@ def test_triangulate_session_update_instances(multiview_min_session_labels: Labe # Just run for code coverage testing, do not test output here (race condition) # (see "functional core, imperative shell" pattern) - TriangulateSession.update_instances(session=session, instances=instances) + TriangulateSession.update_instances( + instances_and_coords=instances_and_coordinates[0] + ) def test_triangulate_session_do_action(multiview_min_session_labels: Labels): @@ -1332,20 +1334,19 @@ def test_triangulate_session_get_products_of_instances( selected_instance = lf.instances[0] instances = TriangulateSession.get_products_of_instances( - selected_instance=selected_instance, session=session, frame_idx=lf.frame_idx, ) views = TriangulateSession.get_all_views_at_frame(session, lf.frame_idx) max_num_instances_in_view = max([len(instances) for instances in views.values()]) - assert len(instances) == max_num_instances_in_view ** (len(views) - 1) + assert len(instances) == max_num_instances_in_view ** len(views) for frame_id in instances: instances_in_frame = instances[frame_id] for cam in instances_in_frame: instances_in_view = instances_in_frame[cam] - assert len(instances_in_view) == 1 + assert len(instances_in_view) == max_num_instances_in_view for inst in instances_in_view: try: assert inst.frame_idx == selected_instance.frame_idx @@ -1353,3 +1354,68 @@ def test_triangulate_session_get_products_of_instances( except: assert inst.frame is None assert inst.video is None + + +def test_triangulate_session_calculate_error_per_frame( + multiview_min_session_labels: Labels, +): + """Test `TriangulateSession.calculate_error_per_frame`.""" + + labels = multiview_min_session_labels + session = labels.sessions[0] + lf = labels.labeled_frames[0] + + instances = TriangulateSession.get_products_of_instances( + session=session, + frame_idx=lf.frame_idx, + ) + + ( + reprojection_error_per_frame, + instances_and_coords, + ) = TriangulateSession.calculate_error_per_frame( + session=session, instances=instances + ) + + for frame_id in instances.keys(): + assert frame_id in reprojection_error_per_frame + assert isinstance(reprojection_error_per_frame[frame_id], float) + + +def test_triangulate_session_get_instance_grouping( + multiview_min_session_labels: Labels, +): + """Test `TriangulateSession._get_instance_grouping`.""" + + labels = multiview_min_session_labels + session = labels.sessions[0] + lf = labels.labeled_frames[0] + selected_instance = lf.instances[0] + + instances = TriangulateSession.get_products_of_instances( + session=session, + frame_idx=lf.frame_idx, + ) + + ( + reprojection_error_per_frame, + instances_and_coords, + ) = TriangulateSession.calculate_error_per_frame( + session=session, instances=instances + ) + + best_instances, frame_id_min_error = TriangulateSession._get_instance_grouping( + instances=instances, reprojection_error_per_frame=reprojection_error_per_frame + ) + assert len(best_instances) == len(session.camera_cluster) + for instances_in_view in best_instances.values(): + tracks_in_view = set( + [inst.track if inst is not None else "None" for inst in instances_in_view] + ) + assert len(tracks_in_view) == len(instances_in_view) + for inst in instances_in_view: + try: + assert inst.frame_idx == selected_instance.frame_idx + except: + assert inst.frame is None + assert inst.track is None