From c1fe83cdeb36a3b9945750f9077ece04ce499529 Mon Sep 17 00:00:00 2001 From: Simon Nilsson Date: Sat, 25 Nov 2023 11:32:07 -0500 Subject: [PATCH] Add files via upload --- simba/mixins/geometry_mixin.py | 899 ++++++++++++--------------------- 1 file changed, 336 insertions(+), 563 deletions(-) diff --git a/simba/mixins/geometry_mixin.py b/simba/mixins/geometry_mixin.py index fef4216ae..65947d955 100644 --- a/simba/mixins/geometry_mixin.py +++ b/simba/mixins/geometry_mixin.py @@ -1,28 +1,24 @@ -import functools -import itertools -import multiprocessing +import numpy as np +from typing import Optional, List, Union, Tuple, Iterable +from typing_extensions import Literal +import pandas as pd +from shapely.geometry import Polygon, LineString, Point, MultiPolygon, MultiLineString, GeometryCollection, MultiPoint +from shapely.ops import unary_union, linemerge, cascaded_union, split, triangulate from copy import deepcopy -from typing import Iterable, List, Optional, Tuple, Union - import cv2 import imutils -import numpy as np -from numba import njit, prange -from shapely.geometry import (GeometryCollection, LineString, MultiLineString, - MultiPolygon, Point, Polygon) -from shapely.ops import cascaded_union, linemerge, split, unary_union -from typing_extensions import Literal - -from simba.utils.checks import (check_float, check_if_valid_input, - check_instance, check_int, - check_iterable_length) -from simba.utils.enums import Defaults from simba.utils.errors import InvalidInputError -from simba.utils.lookups import get_color_dict from simba.utils.read_write import find_core_cnt, find_max_vertices_coordinates +from simba.utils.checks import check_int +import multiprocessing +import functools +import itertools +from simba.utils.checks import check_instance, check_if_valid_input, check_iterable_length, check_float +from simba.utils.enums import Defaults +from simba.utils.lookups import get_color_dict +from numba import prange, njit -CAP_STYLE_MAP = {"round": 1, "square": 2, "flat": 3} - +CAP_STYLE_MAP = {'round': 1, 'square': 2, 'flat': 3} class GeometryMixin(object): @@ -37,13 +33,11 @@ def __init__(self): pass @staticmethod - def bodyparts_to_polygon( - data: np.ndarray, - cap_style: Literal["round", "square", "flat"] = "round", - parallel_offset: int = 1, - simplify_tolerance: float = 2, - preserve_topology: bool = True, - ) -> Polygon: + def bodyparts_to_polygon(data: np.ndarray, + cap_style: Literal['round', 'square', 'flat'] = 'round', + parallel_offset: int = 1, + simplify_tolerance: float = 2, + preserve_topology: bool = True) -> Polygon: """ .. image:: _static/img/bodyparts_to_polygon.png :width: 400 @@ -54,16 +48,15 @@ def bodyparts_to_polygon( >>> GeometryMixin().bodyparts_to_polygon(data=data) """ - polygon = Polygon( - LineString(data.tolist()) - .buffer(distance=parallel_offset, cap_style=CAP_STYLE_MAP[cap_style]) - .simplify(tolerance=simplify_tolerance, preserve_topology=preserve_topology) - .convex_hull - ) + polygon = Polygon(LineString(data.tolist()).buffer(distance=parallel_offset, + cap_style=CAP_STYLE_MAP[cap_style]).simplify(tolerance=simplify_tolerance, + preserve_topology=preserve_topology).convex_hull) return polygon @staticmethod - def bodyparts_to_circle(data: np.ndarray, parallel_offset: int = 1) -> Polygon: + def bodyparts_to_circle(data: np.ndarray, + parallel_offset: int = 1) -> Polygon: + """ .. image:: _static/img/bodyparts_to_circle.png :width: 400 @@ -75,15 +68,15 @@ def bodyparts_to_circle(data: np.ndarray, parallel_offset: int = 1) -> Polygon: """ if data.shape != (2,): - raise InvalidInputError( - msg=f"Cannot create circle data is not a (2,) array: " f"{data.shape}", - source=GeometryMixin.bodyparts_to_circle.__name__, - ) + raise InvalidInputError(msg=f'Cannot create circle data is not a (2,) array: ' + f'{data.shape}', source=GeometryMixin.bodyparts_to_circle.__name__) return Point(data).buffer(parallel_offset) + @staticmethod def bodyparts_to_multistring_skeleton(data: np.ndarray) -> MultiLineString: + """ .. image:: _static/img/bodyparts_to_multistring_skeleton.png :width: 400 @@ -94,25 +87,21 @@ def bodyparts_to_multistring_skeleton(data: np.ndarray) -> MultiLineString: >>> shape_multistring = GeometryMixin().bodyparts_to_multistring_skeleton(bodyparts=skeleton, shape=True) """ + if data.ndim != 3: - raise InvalidInputError( - msg=f"Body-parts to skeleton expects a 3D array, got {data.ndim}", - source=GeometryMixin.bodyparts_to_line.__name__, - ) + raise InvalidInputError(msg=f'Body-parts to skeleton expects a 3D array, got {data.ndim}', source=GeometryMixin.bodyparts_to_line.__name__) shape_skeleton = [] - for i in data: - shape_skeleton.append(GeometryMixin().bodyparts_to_line(data=i)) + for i in data: shape_skeleton.append(GeometryMixin().bodyparts_to_line(data=i)) shape_skeleton = linemerge(MultiLineString(shape_skeleton)) return shape_skeleton @staticmethod - def buffer_shape( - shape: Union[Polygon, LineString], - size_mm: int, - pixels_per_mm: float, - cap_style: Literal["round", "square", "flat"] = "round", - ) -> Polygon: + def buffer_shape(shape: Union[Polygon, LineString], + size_mm: int, + pixels_per_mm: float, + cap_style: Literal['round', 'square', 'flat'] = 'round') -> Polygon: + """ Create a buffered shape by applying a buffer operation to the input polygon or linestring. @@ -131,19 +120,14 @@ def buffer_shape( >>> buffered_polygon = GeometryMixin().buffer_shape(shape=polygon, size_mm=-1, pixels_per_mm=1) """ - check_instance( - source=GeometryMixin.buffer_shape.__name__, - instance=shape, - accepted_types=(LineString, Polygon), - ) - check_int(name="BUFFER SHAPE size_mm", value=size_mm) - check_float(name="BUFFER SHAPE pixels_per_mm", value=pixels_per_mm, min_value=1) - return shape.buffer( - distance=int(size_mm / pixels_per_mm), cap_style=CAP_STYLE_MAP[cap_style] - ) + check_instance(source=GeometryMixin.buffer_shape.__name__, instance=shape, accepted_types=(LineString, Polygon)) + check_int(name='BUFFER SHAPE size_mm', value=size_mm) + check_float(name='BUFFER SHAPE pixels_per_mm', value=pixels_per_mm, min_value=1) + return shape.buffer(distance=int(size_mm / pixels_per_mm), cap_style=CAP_STYLE_MAP[cap_style]) @staticmethod def compute_pct_shape_overlap(shapes: List[Union[Polygon, LineString]]) -> float: + """ Compute the percentage of overlap between two shapes. @@ -162,26 +146,11 @@ def compute_pct_shape_overlap(shapes: List[Union[Polygon, LineString]]) -> float """ for shape in shapes: - check_instance( - source=GeometryMixin.compute_pct_shape_overlap.__name__, - instance=shape, - accepted_types=(LineString, Polygon), - ) - check_iterable_length( - source=GeometryMixin.compute_pct_shape_overlap.__name__, - val=len(shapes), - exact_accepted_length=2, - ) + check_instance(source=GeometryMixin.compute_pct_shape_overlap.__name__, instance=shape, accepted_types=(LineString, Polygon)) + check_iterable_length(source=GeometryMixin.compute_pct_shape_overlap.__name__, val=len(shapes), exact_accepted_length=2) if shapes[0].intersects(shapes[1]): intersection = shapes[0].intersection(shapes[1]) - return round( - ( - intersection.area - / ((shapes[0].area + shapes[1].area) - intersection.area) - * 100 - ), - 2, - ) + return round((intersection.area / ((shapes[0].area + shapes[1].area) - intersection.area) * 100), 2) else: return 0.0 @@ -204,24 +173,13 @@ def crosses(shapes: List[LineString]) -> bool: >>> True """ - check_iterable_length( - source=GeometryMixin.compute_pct_shape_overlap.__name__, - val=len(shapes), - exact_accepted_length=2, - ) + check_iterable_length(source=GeometryMixin.compute_pct_shape_overlap.__name__, val=len(shapes), exact_accepted_length=2) for shape in shapes: - check_instance( - source=GeometryMixin.compute_pct_shape_overlap.__name__, - instance=shape, - accepted_types=LineString, - ) + check_instance(source=GeometryMixin.compute_pct_shape_overlap.__name__, instance=shape, accepted_types=LineString) return shapes[0].crosses(shapes[1]) @staticmethod - def is_shape_covered( - shape: Union[LineString, Polygon, MultiPolygon], - other_shape: Union[LineString, Polygon, MultiPolygon], - ) -> bool: + def is_shape_covered(shape: Union[LineString, Polygon, MultiPolygon], other_shape: Union[LineString, Polygon, MultiPolygon]) -> bool: """ Check if one geometry fully covers another. @@ -239,25 +197,16 @@ def is_shape_covered( >>> True """ - check_instance( - source=GeometryMixin.is_shape_covered.__name__, - instance=shape, - accepted_types=(LineString, Polygon, MultiPolygon), - ) - check_instance( - source=GeometryMixin.is_shape_covered.__name__, - instance=other_shape, - accepted_types=(LineString, Polygon, MultiPolygon), - ) + check_instance(source=GeometryMixin.is_shape_covered.__name__, instance=shape, accepted_types=(LineString, Polygon, MultiPolygon)) + check_instance(source=GeometryMixin.is_shape_covered.__name__, instance=other_shape, accepted_types=(LineString, Polygon, MultiPolygon)) return shape.covers(other_shape) + @staticmethod - def shape_distance( - shapes: List[Union[LineString, Polygon]], - pixels_per_mm: float, - unit: Literal["mm", "cm", "dm", "m"] = "mm", - ) -> float: + def shape_distance(shapes: List[Union[LineString, Polygon]], + pixels_per_mm: float, + unit: Literal['mm', 'cm', 'dm', 'm'] = 'mm') -> float: """ Calculate the distance between two geometries in specified units. @@ -276,30 +225,24 @@ def shape_distance( >>> 0 """ - check_if_valid_input(name="UNIT", input=unit, options=["mm", "cm", "dm", "m"]) + check_if_valid_input(name='UNIT', input=unit, options=['mm', 'cm', 'dm', 'm']) for shape in shapes: - check_instance( - source=GeometryMixin.compute_pct_shape_overlap.__name__, - instance=shape, - accepted_types=(LineString, Polygon), - ) - check_iterable_length( - source=GeometryMixin.compute_pct_shape_overlap.__name__, - val=len(shapes), - exact_accepted_length=2, - ) + check_instance(source=GeometryMixin.compute_pct_shape_overlap.__name__, instance=shape, accepted_types=(LineString, Polygon)) + check_iterable_length(source=GeometryMixin.compute_pct_shape_overlap.__name__, val=len(shapes), exact_accepted_length=2) D = shapes[0].distance(shapes[1]) / pixels_per_mm - if unit == "cm": + if unit == 'cm': D = D / 10 - elif unit == "dm": + elif unit == 'dm': D = D / 100 - elif unit == "m": + elif unit == 'm': D = D / 1000 return D + @staticmethod def bodyparts_to_line(data: np.ndarray): + """ .. image:: _static/img/bodyparts_to_line.png @@ -312,10 +255,7 @@ def bodyparts_to_line(data: np.ndarray): """ if data.ndim != 2: - raise InvalidInputError( - msg=f"Body-parts to linestring expects a 2D array, got {data.ndim}", - source=GeometryMixin.bodyparts_to_line.__name__, - ) + raise InvalidInputError(msg=f'Body-parts to linestring expects a 2D array, got {data.ndim}', source=GeometryMixin.bodyparts_to_line.__name__) return LineString(data.tolist()) @staticmethod @@ -331,15 +271,11 @@ def get_center(shape: Union[LineString, Polygon, MultiPolygon]) -> np.ndarray: >>> [33.96969697, 62.32323232] """ - check_instance( - source=GeometryMixin.get_center.__name__, - instance=shape, - accepted_types=(MultiPolygon, LineString, Polygon), - ) + check_instance(source=GeometryMixin.get_center.__name__, instance=shape, accepted_types=(MultiPolygon, LineString, Polygon)) return np.array(shape.centroid) @staticmethod - def is_touching(shapes=List[Union[LineString, Polygon]]) -> bool: + def is_touching(shapes = List[Union[LineString, Polygon]]) -> bool: """ Check if two geometries touch each other. @@ -361,20 +297,12 @@ def is_touching(shapes=List[Union[LineString, Polygon]]) -> bool: """ for i in shapes: - check_instance( - source=GeometryMixin.is_touching.__name__, - instance=i, - accepted_types=(LineString, Polygon), - ) - check_iterable_length( - source=GeometryMixin.is_touching.__name__, - val=len(shapes), - exact_accepted_length=2, - ) + check_instance(source=GeometryMixin.is_touching.__name__, instance=i, accepted_types=(LineString, Polygon)) + check_iterable_length(source=GeometryMixin.is_touching.__name__, val=len(shapes), exact_accepted_length=2) return shapes[0].touches(shapes[1]) @staticmethod - def is_containing(shapes=List[Union[LineString, Polygon]]) -> bool: + def is_containing(shapes = List[Union[LineString, Polygon]]) -> bool: """ .. image:: _static/img/is_containing.png :width: 500 @@ -383,24 +311,14 @@ def is_containing(shapes=List[Union[LineString, Polygon]]) -> bool: :example: """ for i in shapes: - check_instance( - source=GeometryMixin.get_center.__name__, - instance=i, - accepted_types=(LineString, Polygon), - ) - check_iterable_length( - source=GeometryMixin.get_center.__name__, - val=len(shapes), - exact_accepted_length=2, - ) + check_instance(source=GeometryMixin.get_center.__name__, instance=i, accepted_types=(LineString, Polygon)) + check_iterable_length(source=GeometryMixin.get_center.__name__, val=len(shapes), exact_accepted_length=2) return shapes[0].contains(shapes[1]) @staticmethod - def difference( - shape=Union[LineString, Polygon, MultiPolygon], - overlap_shapes=List[Union[LineString, Polygon, MultiPolygon]], - ) -> Polygon: + def difference(shape = Union[LineString, Polygon, MultiPolygon], + overlap_shapes = List[Union[LineString, Polygon, MultiPolygon]]) -> Polygon: """ Calculate the difference between a shape and one or more potentially overlapping shapes. @@ -423,20 +341,10 @@ def difference( >>> difference = GeometryMixin().difference(shape = polygon_1, overlap_shapes=[polygon_2, polygon_3]) """ - check_iterable_length( - source=GeometryMixin.difference.__name__, val=len(overlap_shapes), min=1 - ) + check_iterable_length(source=GeometryMixin.difference.__name__, val=len(overlap_shapes), min=1) for overlap_shap in overlap_shapes: - check_instance( - source=GeometryMixin.difference.__name__, - instance=overlap_shap, - accepted_types=(LineString, Polygon, MultiPolygon), - ) - check_instance( - source=GeometryMixin.difference.__name__, - instance=shape, - accepted_types=(LineString, Polygon), - ) + check_instance(source=GeometryMixin.difference.__name__, instance=overlap_shap, accepted_types=(LineString, Polygon, MultiPolygon)) + check_instance(source=GeometryMixin.difference.__name__, instance=shape, accepted_types=(LineString, Polygon)) for overlap_shap in overlap_shapes: if isinstance(overlap_shap, MultiPolygon): @@ -447,9 +355,7 @@ def difference( return shape @staticmethod - def union( - shapes: List[Union[LineString, Polygon, MultiPolygon]] - ) -> Union[MultiPolygon, Polygon, MultiLineString]: + def union(shapes: List[Union[LineString, Polygon, MultiPolygon]]) -> Union[MultiPolygon, Polygon, MultiLineString]: """ Compute the union of multiple geometries. @@ -466,21 +372,12 @@ def union( >>> union = GeometryMixin().union(shape = polygon_1, overlap_shapes=[polygon_2, polygon_2]) """ - check_iterable_length( - source=GeometryMixin.union.__name__, val=len(shapes), min=2 - ) - for shape in shapes: - check_instance( - source=GeometryMixin.union.__name__, - instance=shape, - accepted_types=(LineString, Polygon, MultiPolygon), - ) + check_iterable_length(source=GeometryMixin.union.__name__, val=len(shapes), min=2) + for shape in shapes: check_instance(source=GeometryMixin.union.__name__, instance=shape, accepted_types=(LineString, Polygon, MultiPolygon)) return unary_union(shapes) @staticmethod - def symmetric_difference( - shapes: List[Union[LineString, Polygon, MultiPolygon]] - ) -> List[Union[Polygon, MultiPolygon]]: + def symmetric_difference(shapes: List[Union[LineString, Polygon, MultiPolygon]]) -> List[Union[Polygon, MultiPolygon]]: """ Computes a new geometry consisting of the parts that are exclusive to each input geometry. @@ -498,31 +395,20 @@ def symmetric_difference( >>> polygon_2 = GeometryMixin().bodyparts_to_polygon(np.array([[1, 25], [1, 75], [110, 25], [110, 75]])) >>> symmetric_difference = symmetric_difference(shapes=[polygon_1, polygon_2]) """ - check_iterable_length( - source=GeometryMixin.union.__name__, val=len(shapes), min=2 - ) - for shape in shapes: - check_instance( - source=GeometryMixin.compute_pct_shape_overlap.__name__, - instance=shape, - accepted_types=(LineString, Polygon, MultiPolygon), - ) + check_iterable_length(source=GeometryMixin.union.__name__, val=len(shapes), min=2) + for shape in shapes: check_instance(source=GeometryMixin.compute_pct_shape_overlap.__name__, instance=shape, + accepted_types=(LineString, Polygon, MultiPolygon)) results = deepcopy(shapes) for c in itertools.combinations(list(range(0, len(shapes))), 2): - results[c[0]] = results[c[0]].convex_hull.difference( - results[c[1]].convex_hull - ) - results[c[1]] = results[c[1]].convex_hull.difference( - results[c[0]].convex_hull - ) + results[c[0]] = results[c[0]].convex_hull.difference(results[c[1]].convex_hull) + results[c[1]] = results[c[1]].convex_hull.difference(results[c[0]].convex_hull) results = [geometry for geometry in results if not geometry.is_empty] return results @staticmethod - def view_shapes( - shapes: List[Union[LineString, Polygon, MultiPolygon, MultiLineString]] - ) -> np.ndarray: + def view_shapes(shapes: List[Union[LineString, Polygon, MultiPolygon, MultiLineString]]) -> np.ndarray: + """ Helper function to draw shapes on white canvas . @@ -534,86 +420,32 @@ def view_shapes( """ for i in shapes: - check_instance( - source=GeometryMixin.view_shapes.__name__, - instance=i, - accepted_types=( - LineString, - Polygon, - MultiPolygon, - MultiLineString, - Point, - ), - ) + check_instance(source=GeometryMixin.view_shapes.__name__, instance=i, accepted_types=(LineString, Polygon, MultiPolygon, MultiLineString, Point)) max_vertices = find_max_vertices_coordinates(shapes=shapes, buffer=50) img = np.ones((max_vertices[0], max_vertices[1], 3), dtype=np.uint8) * 255 colors = list(get_color_dict().values()) for shape_cnt, shape in enumerate(shapes): if isinstance(shape, Polygon): - cv2.polylines( - img, - [np.array(shape.exterior.coords).astype(np.int)], - True, - (colors[shape_cnt][::-1]), - thickness=2, - ) - interior_coords = [ - np.array(interior.coords, dtype=np.int32).reshape((-1, 1, 2)) - for interior in shape.interiors - ] + cv2.polylines(img, [np.array(shape.exterior.coords).astype(np.int)], True, (colors[shape_cnt][::-1]) , thickness=2) + interior_coords = [np.array(interior.coords, dtype=np.int32).reshape((-1, 1, 2)) for interior in shape.interiors] for interior in interior_coords: - cv2.polylines( - img, - [interior], - isClosed=True, - color=(colors[shape_cnt][::-1]), - thickness=2, - ) + cv2.polylines(img, [interior], isClosed=True, color=(colors[shape_cnt][::-1]), thickness=2) if isinstance(shape, LineString): - cv2.polylines( - img, - [np.array(shape.coords, dtype=np.int32)], - False, - (colors[shape_cnt][::-1]), - thickness=2, - ) + cv2.polylines(img, [np.array(shape.coords, dtype=np.int32)], False, (colors[shape_cnt][::-1]), thickness=2) if isinstance(shape, MultiPolygon): for polygon_cnt, polygon in enumerate(shape.geoms): - polygon_np = np.array( - (polygon.convex_hull.exterior.coords), dtype=np.int32 - ) - cv2.polylines( - img, - [polygon_np], - True, - (colors[shape_cnt + polygon_cnt + 1][::-1]), - thickness=2, - ) + polygon_np = np.array((polygon.convex_hull.exterior.coords), dtype=np.int32) + cv2.polylines(img, [polygon_np], True, (colors[shape_cnt+ polygon_cnt + 1][::-1]),thickness=2) if isinstance(shape, MultiLineString): for line_cnt, line in enumerate(shape.geoms): - cv2.polylines( - img, - [np.array(shape[line_cnt].coords, dtype=np.int32)], - False, - (colors[shape_cnt][::-1]), - thickness=2, - ) + cv2.polylines(img, [np.array(shape[line_cnt].coords, dtype=np.int32)], False, (colors[shape_cnt][::-1]), thickness=2) if isinstance(shape, Point): - cv2.circle( - img, - ( - int(np.array(shape.centroid)[0]), - int(np.array(shape.centroid)[1]), - ), - 0, - colors[shape_cnt][::-1], - 1, - ) + cv2.circle(img, (int(np.array(shape.centroid)[0]), int(np.array(shape.centroid)[1])), 0, colors[shape_cnt][::-1], 1) return imutils.resize(img, width=800) @staticmethod - def minimum_rotated_rectangle(shape=Polygon) -> bool: + def minimum_rotated_rectangle(shape = Polygon) -> bool: """ Calculate the minimum rotated rectangle that bounds a given polygon. @@ -631,19 +463,14 @@ def minimum_rotated_rectangle(shape=Polygon) -> bool: >>> rectangle = GeometryMixin().minimum_rotated_rectangle(shape=polygon) """ - check_instance( - source=GeometryMixin.get_center.__name__, - instance=shape, - accepted_types=Polygon, - ) + check_instance(source=GeometryMixin.get_center.__name__, instance=shape, accepted_types=Polygon) return shape.minimum_rotated_rectangle + @staticmethod - def length( - shape: Union[LineString, MultiLineString], - pixels_per_mm: float, - unit: Literal["mm", "cm", "dm", "m"] = "mm", - ) -> float: + def length(shape: Union[LineString, MultiLineString], + pixels_per_mm: float, + unit: Literal['mm', 'cm', 'dm', 'm'] = 'mm') -> float: """ Calculate the length of a LineString geometry. @@ -661,31 +488,25 @@ def length( >>> 50.6449510224598 """ - check_float(name="line_length pixels_per_mm", value=pixels_per_mm, min_value=0) - check_instance( - source=GeometryMixin.compute_pct_shape_overlap.__name__, - instance=shape, - accepted_types=LineString, - ) + check_float(name='line_length pixels_per_mm', value=pixels_per_mm, min_value=0) + check_instance(source=GeometryMixin.compute_pct_shape_overlap.__name__, instance=shape, accepted_types=LineString) L = shape.length - if unit == "cm": + if unit == 'cm': L = L / 10 - elif unit == "dm": + elif unit == 'dm': L = L / 100 - elif unit == "m": + elif unit == 'm': L = L / 1000 return L - def multiframe_bodyparts_to_polygon( - self, - data: np.ndarray, - cap_style: Literal["round", "square", "flat"] = "round", - parallel_offset: int = 1, - simplify_tolerance: float = 2, - preserve_topology: bool = True, - core_cnt: int = -1, - ) -> List[Polygon]: + def multiframe_bodyparts_to_polygon(self, + data: np.ndarray, + cap_style: Literal['round', 'square', 'flat'] = 'round', + parallel_offset: int = 1, + simplify_tolerance: float = 2, + preserve_topology: bool = True, + core_cnt: int = -1) -> List[Polygon]: """ Convert multidimensional NumPy array representing body part coordinates to a list of Polygons. @@ -699,68 +520,77 @@ def multiframe_bodyparts_to_polygon( >>> GeometryMixin().multiframe_bodyparts_to_polygon(data=data) """ - check_int( - name="CORE COUNT", - value=core_cnt, - min_value=-1, - max_value=find_core_cnt()[0], - raise_error=True, - ) - if core_cnt == -1: - core_cnt = find_core_cnt()[0] + check_int(name='CORE COUNT', value=core_cnt, min_value=-1, max_value=find_core_cnt()[0], raise_error=True) + if core_cnt == -1: core_cnt = find_core_cnt()[0] results = [] - with multiprocessing.Pool( - core_cnt, maxtasksperchild=Defaults.MAX_TASK_PER_CHILD.value - ) as pool: - constants = functools.partial( - GeometryMixin.bodyparts_to_polygon, - parallel_offset=parallel_offset, - cap_style=cap_style, - simplify_tolerance=simplify_tolerance, - preserve_topology=preserve_topology, - ) + with multiprocessing.Pool(core_cnt, maxtasksperchild=Defaults.MAX_TASK_PER_CHILD.value) as pool: + constants = functools.partial(GeometryMixin.bodyparts_to_polygon, + parallel_offset=parallel_offset, + cap_style=cap_style, + simplify_tolerance=simplify_tolerance, + preserve_topology=preserve_topology) for cnt, mp_return in enumerate(pool.imap(constants, data, chunksize=1)): results.append(mp_return) - pool.join() - pool.terminate() + pool.join(); pool.terminate() return results - def multiframe_bodyparts_to_circle( - self, data: np.ndarray, parallel_offset: int = 1, core_cnt: int = -1 - ) -> List[Polygon]: + + def multiframe_bodyparts_to_circle(self, + data: np.ndarray, + parallel_offset: int = 1, + core_cnt: int = -1) -> List[Polygon]: """ :example: >>> data = np.random.randint(0, 100, (100, 2)) >>> circles = GeometryMixin().multiframe_bodyparts_to_circle(data=data) """ - check_int( - name="CORE COUNT", - value=core_cnt, - min_value=-1, - max_value=find_core_cnt()[0], - raise_error=True, - ) - if core_cnt == -1: - core_cnt = find_core_cnt()[0] + check_int(name='CORE COUNT', value=core_cnt, min_value=-1, max_value=find_core_cnt()[0], raise_error=True) + if core_cnt == -1: core_cnt = find_core_cnt()[0] results = [] - with multiprocessing.Pool( - core_cnt, maxtasksperchild=Defaults.MAX_TASK_PER_CHILD.value - ) as pool: - constants = functools.partial( - GeometryMixin.bodyparts_to_circle, parallel_offset=parallel_offset - ) + with multiprocessing.Pool(core_cnt, maxtasksperchild=Defaults.MAX_TASK_PER_CHILD.value) as pool: + constants = functools.partial(GeometryMixin.bodyparts_to_circle, + parallel_offset=parallel_offset) for cnt, mp_return in enumerate(pool.imap(constants, data, chunksize=1)): results.append(mp_return) - pool.join() - pool.terminate() + pool.join(); pool.terminate() return results - def multiframe_bodyparts_to_line( - self, data: np.ndarray, core_cnt: Optional[int] = -1 - ) -> List[LineString]: + @staticmethod + def delaunay_triangulate_keypoints(data: np.ndarray) -> List[Polygon]: + """ + Triangulates a set of 2D keypoints. E.g., use to polygonize animal hull. + + This method takes a 2D numpy array representing a set of keypoints and + triangulates them using the Delaunay triangulation algorithm. The input + array should have two columns corresponding to the x and y coordinates of + the keypoints. + + .. image:: _static/img/delaunay_triangulate_keypoints.png + :width: 400 + :align: center + + .. image:: _static/img/delaunay_triangulate_keypoints.gif + :width: 450 + :align: center + + :param np.ndarray data: NumPy array of body part coordinates. Each subarray represents the coordinates of a body part. + :returns List[Polygon]: A list of `Polygon` objects representing the triangles formed by the Delaunay triangulation. + + :example: + >>> data = np.array([[126, 122],[152, 116],[136, 85],[167, 172],[161, 206],[197, 193],[191, 237]]) + >>> triangulated_hull = GeometryMixin().delaunay_triangulate_keypoints(data=data) + """ + + check_instance(source=GeometryMixin().delaunay_triangulate_keypoints.__name__, instance=data, accepted_types= np.ndarray) + if data.ndim != 2: raise InvalidInputError(msg=f'Triangulate requires 2D array, got {data.ndim}', source=GeometryMixin.delaunay_triangulate_keypoints.__name__) + return triangulate(MultiPoint(data.astype(np.int64))) + + def multiframe_bodyparts_to_line(self, + data: np.ndarray, + core_cnt: Optional[int] = -1) -> List[LineString]: """ Convert multiframe body-parts data to a list of LineString objects using multiprocessing. @@ -774,35 +604,21 @@ def multiframe_bodyparts_to_line( >>> lines = GeometryMixin().multiframe_bodyparts_to_line(data=data) """ - check_int( - name="CORE COUNT", - value=core_cnt, - min_value=-1, - max_value=find_core_cnt()[0], - raise_error=True, - ) - if core_cnt == -1: - core_cnt = find_core_cnt()[0] + check_int(name='CORE COUNT', value=core_cnt, min_value=-1, max_value=find_core_cnt()[0], raise_error=True) + if core_cnt == -1: core_cnt = find_core_cnt()[0] if data.ndim != 3: - raise InvalidInputError( - msg=f"Multiframe body-parts to linestring expects a 3D array, got {data.ndim}", - source=GeometryMixin.bodyparts_to_line.__name__, - ) + raise InvalidInputError(msg=f'Multiframe body-parts to linestring expects a 3D array, got {data.ndim}', source=GeometryMixin.bodyparts_to_line.__name__) results = [] - with multiprocessing.Pool( - core_cnt, maxtasksperchild=Defaults.MAX_TASK_PER_CHILD.value - ) as pool: - for cnt, result in enumerate( - pool.imap(GeometryMixin.bodyparts_to_line, data, chunksize=1) - ): + with multiprocessing.Pool(core_cnt, maxtasksperchild=Defaults.MAX_TASK_PER_CHILD.value) as pool: + for cnt, result in enumerate(pool.imap(GeometryMixin.bodyparts_to_line, data, chunksize=1)): results.append(result) - pool.join() - pool.terminate() + pool.join(); pool.terminate() return results - def multifrm_compute_pct_shape_overlap( - self, shape_1: List[Polygon], shape_2: List[Polygon], core_cnt=-1 - ) -> List[float]: + def multifrm_compute_pct_shape_overlap(self, + shape_1: List[Polygon], + shape_2: List[Polygon], + core_cnt = -1) -> List[float]: """ Compute the percentage overlap between corresponding Polygons in two lists. @@ -812,54 +628,27 @@ def multifrm_compute_pct_shape_overlap( :return List[float]: List of percentage overlap between corresponding Polygons. """ - check_int( - name="CORE COUNT", - value=core_cnt, - min_value=-1, - max_value=find_core_cnt()[0], - raise_error=True, - ) - if core_cnt == -1: - core_cnt = find_core_cnt()[0] - if len(shape_1) != len(shape_2): - raise InvalidInputError( - msg=f"shape_1 and shape_2 are unequal sizes: {len(shape_1)} vs {len(shape_2)}", - source=GeometryMixin.multifrm_compute_pct_shape_overlap.__name__, - ) - input_dtypes = list( - set([type(x) for x in shape_1] + [type(x) for x in shape_2]) - ) + check_int(name='CORE COUNT', value=core_cnt, min_value=-1, max_value=find_core_cnt()[0], raise_error=True) + if core_cnt == -1: core_cnt = find_core_cnt()[0] + if len(shape_1) != len(shape_2): raise InvalidInputError(msg=f'shape_1 and shape_2 are unequal sizes: {len(shape_1)} vs {len(shape_2)}', source=GeometryMixin.multifrm_compute_pct_shape_overlap.__name__) + input_dtypes = list(set([type(x) for x in shape_1] + [type(x) for x in shape_2])) if len(input_dtypes) > 1: - raise InvalidInputError( - msg=f"shape_1 and shape_2 contains more than 1 dtype {input_dtypes}", - source=GeometryMixin.multifrm_compute_pct_shape_overlap.__name__, - ) - check_instance( - source=GeometryMixin.multifrm_compute_pct_shape_overlap.__name__, - instance=shape_1[0], - accepted_types=(LineString, Polygon), - ) + raise InvalidInputError(msg=f'shape_1 and shape_2 contains more than 1 dtype {input_dtypes}', source=GeometryMixin.multifrm_compute_pct_shape_overlap.__name__) + check_instance(source=GeometryMixin.multifrm_compute_pct_shape_overlap.__name__, instance=shape_1[0], accepted_types=(LineString, Polygon)) data, results = np.column_stack((shape_1, shape_2)), [] - with multiprocessing.Pool( - core_cnt, maxtasksperchild=Defaults.MAX_TASK_PER_CHILD.value - ) as pool: - for cnt, result in enumerate( - pool.imap(GeometryMixin.compute_pct_shape_overlap, data, chunksize=1) - ): + with multiprocessing.Pool(core_cnt, maxtasksperchild=Defaults.MAX_TASK_PER_CHILD.value) as pool: + for cnt, result in enumerate(pool.imap(GeometryMixin.compute_pct_shape_overlap, data, chunksize=1)): results.append(result) - pool.join() - pool.terminate() + pool.join(); pool.terminate() return results - def multiframe_shape_distance( - self, - shape_1: List[Union[LineString, Polygon]], - shape_2: List[Union[LineString, Polygon]], - pixels_per_mm: float, - unit: Literal["mm", "cm", "dm", "m"] = "mm", - core_cnt=-1, - ) -> List[float]: + def multiframe_shape_distance(self, + shape_1: List[Union[LineString, Polygon]], + shape_2: List[Union[LineString, Polygon]], + pixels_per_mm: float, + unit: Literal['mm', 'cm', 'dm', 'm'] = 'mm', + core_cnt = -1) -> List[float]: """ Compute shape distances between corresponding shapes in two lists of LineString or Polygon geometries for multiple frames. @@ -871,40 +660,26 @@ def multiframe_shape_distance( :return List[float]: List of shape distances between corresponding shapes in passed unit. """ - check_int( - name="CORE COUNT", - value=core_cnt, - min_value=-1, - max_value=find_core_cnt()[0], - raise_error=True, - ) - check_float(name="PIXELS PER MM", value=pixels_per_mm, min_value=0.0) - check_if_valid_input(name="UNIT", input=unit, options=["mm", "cm", "dm", "m"]) - if core_cnt == -1: - core_cnt = find_core_cnt()[0] - if len(shape_1) != len(shape_2): - raise InvalidInputError( - msg=f"shape_1 and shape_2 are unequal sizes: {len(shape_1)} vs {len(shape_2)}", - source=GeometryMixin.multifrm_compute_pct_shape_overlap.__name__, - ) - check_float(name="pixels_per_mm", value=pixels_per_mm, min_value=0.0) + check_int(name='CORE COUNT', value=core_cnt, min_value=-1, max_value=find_core_cnt()[0], raise_error=True) + check_float(name='PIXELS PER MM', value=pixels_per_mm, min_value=0.0) + check_if_valid_input(name='UNIT', input=unit, options=['mm', 'cm', 'dm', 'm']) + if core_cnt == -1: core_cnt = find_core_cnt()[0] + if len(shape_1) != len(shape_2): raise InvalidInputError(msg=f'shape_1 and shape_2 are unequal sizes: {len(shape_1)} vs {len(shape_2)}', source=GeometryMixin.multifrm_compute_pct_shape_overlap.__name__) + check_float(name='pixels_per_mm', value=pixels_per_mm, min_value=0.0) data, results = np.column_stack((shape_1, shape_2)), [] - with multiprocessing.Pool( - core_cnt, maxtasksperchild=Defaults.MAX_TASK_PER_CHILD.value - ) as pool: - constants = functools.partial( - GeometryMixin.shape_distance, pixels_per_mm=pixels_per_mm, unit=unit - ) + with multiprocessing.Pool(core_cnt, maxtasksperchild=Defaults.MAX_TASK_PER_CHILD.value) as pool: + constants = functools.partial(GeometryMixin.shape_distance, + pixels_per_mm=pixels_per_mm, + unit=unit) for cnt, result in enumerate(pool.imap(constants, data, chunksize=1)): results.append(result) - pool.join() - pool.terminate() + pool.join(); pool.terminate() return results - def multiframe_minimum_rotated_rectangle( - self, shapes: List[Polygon], core_cnt: int = -1 - ) -> List[Polygon]: + def multiframe_minimum_rotated_rectangle(self, + shapes: List[Polygon], + core_cnt: int = -1) -> List[Polygon]: """ Compute the minimum rotated rectangle for each Polygon in a list using mutiprocessing. @@ -912,31 +687,21 @@ def multiframe_minimum_rotated_rectangle( :param core_cnt: Number of CPU cores to use for parallel processing. Default is -1, which uses all available cores. """ - check_int( - name="CORE COUNT", - value=core_cnt, - min_value=-1, - max_value=find_core_cnt()[0], - raise_error=True, - ) - if core_cnt == -1: - core_cnt = find_core_cnt()[0] + check_int(name='CORE COUNT', value=core_cnt, min_value=-1, max_value=find_core_cnt()[0], raise_error=True) + if core_cnt == -1: core_cnt = find_core_cnt()[0] results = [] - with multiprocessing.Pool( - core_cnt, maxtasksperchild=Defaults.MAX_TASK_PER_CHILD.value - ) as pool: - for cnt, result in enumerate( - pool.imap(GeometryMixin.minimum_rotated_rectangle, shapes, chunksize=1) - ): + with multiprocessing.Pool(core_cnt, maxtasksperchild=Defaults.MAX_TASK_PER_CHILD.value) as pool: + for cnt, result in enumerate(pool.imap(GeometryMixin.minimum_rotated_rectangle, shapes, chunksize=1)): results.append(result) - pool.join() - pool.terminate() + pool.join(); pool.terminate() return results @staticmethod - @njit("(float32[:,:,:], float64[:])") - def static_point_lineside(lines: np.ndarray, point: np.ndarray) -> np.ndarray: + @njit('(float32[:,:,:], float64[:])') + def static_point_lineside(lines: np.ndarray, + point: np.ndarray) -> np.ndarray: + """ Determine the relative position (left vs right) of a static point with respect to multiple lines. @@ -962,9 +727,8 @@ def static_point_lineside(lines: np.ndarray, point: np.ndarray) -> np.ndarray: results = np.full((lines.shape[0]), np.nan) threshhold = 1e-9 for i in prange(lines.shape[0]): - v = (lines[i][1][0] - lines[i][0][0]) * (point[1] - lines[i][0][1]) - ( - lines[i][1][1] - lines[i][0][1] - ) * (point[0] - lines[i][0][0]) + v = ((lines[i][1][0] - lines[i][0][0]) * (point[1] - lines[i][0][1]) - (lines[i][1][1] - lines[i][0][1]) * ( + point[0] - lines[i][0][0])) if v >= threshhold: results[i] = 2 elif v <= -threshhold: @@ -974,8 +738,9 @@ def static_point_lineside(lines: np.ndarray, point: np.ndarray) -> np.ndarray: return results @staticmethod - @njit("(float32[:,:,:], float32[:, :])") - def point_lineside(lines: np.ndarray, points: np.ndarray) -> np.ndarray: + @njit('(float32[:,:,:], float32[:, :])') + def point_lineside(lines: np.ndarray, + points: np.ndarray) -> np.ndarray: """ Determine the relative position of a point (left vs right) with respect to a lines in each frame. @@ -997,9 +762,8 @@ def point_lineside(lines: np.ndarray, points: np.ndarray) -> np.ndarray: threshhold = 1e-9 for i in prange(lines.shape[0]): line, point = lines[i], points[i] - v = (line[1][0] - line[0][0]) * (point[1] - line[0][1]) - ( - line[1][1] - line[0][1] - ) * (point[0] - line[0][0]) + v = ((line[1][0] - line[0][0]) * (point[1] - line[0][1]) - (line[1][1] - line[0][1]) * ( + point[0] - line[0][0])) if v >= threshhold: results[i] = 2 elif v <= -threshhold: @@ -1008,11 +772,12 @@ def point_lineside(lines: np.ndarray, points: np.ndarray) -> np.ndarray: results[i] = 0 return results + @staticmethod - @njit("(int64[:,:], int64[:])") - def extend_line_to_bounding_box_edges( - line_points: np.ndarray, bounding_box: np.ndarray - ) -> np.ndarray: + @njit('(int64[:,:], int64[:])') + def extend_line_to_bounding_box_edges(line_points: np.ndarray, + bounding_box: np.ndarray) -> np.ndarray: + """ Jitted extend a line segment defined by two points to fit within a bounding box. @@ -1036,13 +801,9 @@ def extend_line_to_bounding_box_edges( min_x, min_y, max_x, max_y = bounding_box if x1 == x2: - intersection_points = np.array( - [[x1, max(min_y, 0)], [x1, min(max_y, min_y)]] - ).astype(np.float32) + intersection_points = np.array([[x1, max(min_y, 0)], [x1, min(max_y, min_y)]]).astype(np.float32) elif y1 == y2: - intersection_points = np.array([[min_x, y1], [max_x, y1]]).astype( - np.float32 - ) + intersection_points = np.array([[min_x, y1], [max_x, y1]]).astype(np.float32) else: slope = (y2 - y1) / (x2 - x1) intercept = y1 - slope * x1 @@ -1055,16 +816,16 @@ def extend_line_to_bounding_box_edges( # x_min_intersection = np.clip(x_min_intersection, min_x, max_x) # x_max_intersection = np.clip(x_max_intersection, min_x, max_x) - intersection_points = np.array( - [[x_min_intersection, min_y], [x_max_intersection, max_y]] - ).astype(np.float32) + intersection_points = np.array([[x_min_intersection, min_y], + [x_max_intersection, max_y]]).astype(np.float32) return intersection_points + @staticmethod - def line_split_bounding_box( - intersections: np.ndarray, bounding_box: np.ndarray - ) -> GeometryCollection: + def line_split_bounding_box(intersections: np.ndarray, + bounding_box: np.ndarray) -> GeometryCollection: + """ Split a bounding box into two parts using an extended line. @@ -1091,24 +852,19 @@ def line_split_bounding_box( """ extended_line = LineString(intersections) - original_polygon = Polygon( - [ - (bounding_box[0], bounding_box[1]), - (bounding_box[2], bounding_box[1]), - (bounding_box[2], bounding_box[3]), - (bounding_box[0], bounding_box[3]), - ] - ) + original_polygon = Polygon([(bounding_box[0], bounding_box[1]), + (bounding_box[2], bounding_box[1]), + (bounding_box[2], bounding_box[3]), + (bounding_box[0], bounding_box[3])]) return split(original_polygon, extended_line) - def multiframe_length( - self, - shapes: List[Union[LineString, MultiLineString]], - pixels_per_mm: float, - core_cnt: int = -1, - unit: Literal["mm", "cm", "dm", "m"] = "mm", - ) -> List[float]: + def multiframe_length(self, + shapes: List[Union[LineString, MultiLineString]], + pixels_per_mm: float, + core_cnt: int = -1, + unit: Literal['mm', 'cm', 'dm', 'm'] = 'mm') -> List[float]: + """ :example: >>> data = np.random.randint(0, 100, (5000, 2)) @@ -1117,33 +873,23 @@ def multiframe_length( >>> lengths = GeometryMixin().multiframe_length(shapes=lines, pixels_per_mm=1.0) """ - check_int( - name="CORE COUNT", - value=core_cnt, - min_value=-1, - max_value=find_core_cnt()[0], - raise_error=True, - ) - if core_cnt == -1: - core_cnt = find_core_cnt()[0] - check_float(name="PIXELS PER MM", value=pixels_per_mm, min_value=0.0) - check_if_valid_input(name="UNIT", input=unit, options=["mm", "cm", "dm", "m"]) + check_int(name='CORE COUNT', value=core_cnt, min_value=-1, max_value=find_core_cnt()[0], raise_error=True) + if core_cnt == -1: core_cnt = find_core_cnt()[0] + check_float(name='PIXELS PER MM', value=pixels_per_mm, min_value=0.0) + check_if_valid_input(name='UNIT', input=unit, options=['mm', 'cm', 'dm', 'm']) results = [] - with multiprocessing.Pool( - core_cnt, maxtasksperchild=Defaults.MAX_TASK_PER_CHILD.value - ) as pool: - constants = functools.partial( - GeometryMixin.length, pixels_per_mm=pixels_per_mm, unit=unit - ) + with multiprocessing.Pool(core_cnt, maxtasksperchild=Defaults.MAX_TASK_PER_CHILD.value) as pool: + constants = functools.partial(GeometryMixin.length, + pixels_per_mm=pixels_per_mm, + unit=unit) for cnt, result in enumerate(pool.imap(constants, shapes, chunksize=1)): results.append(result) - pool.join() - pool.terminate() + pool.join(); pool.terminate() return results - def multiframe_union( - self, shapes: Iterable[Union[LineString, MultiLineString]], core_cnt: int = -1 - ) -> Iterable[Union[LineString, MultiLineString]]: + def multiframe_union(self, + shapes: Iterable[Union[LineString, MultiLineString]], + core_cnt: int = -1) -> Iterable[Union[LineString, MultiLineString]]: """ :example: >>> data_1 = np.random.randint(0, 100, (5000, 2)).reshape(1000,-1, 2) @@ -1154,30 +900,18 @@ def multiframe_union( >>> unions = GeometryMixin().multiframe_union(shapes=data) """ - check_int( - name="CORE COUNT", - value=core_cnt, - min_value=-1, - max_value=find_core_cnt()[0], - raise_error=True, - ) - if core_cnt == -1: - core_cnt = find_core_cnt()[0] + check_int(name='CORE COUNT', value=core_cnt, min_value=-1, max_value=find_core_cnt()[0], raise_error=True) + if core_cnt == -1: core_cnt = find_core_cnt()[0] results = [] - with multiprocessing.Pool( - core_cnt, maxtasksperchild=Defaults.MAX_TASK_PER_CHILD.value - ) as pool: - for cnt, result in enumerate( - pool.imap(GeometryMixin().union, shapes, chunksize=1) - ): + with multiprocessing.Pool(core_cnt, maxtasksperchild=Defaults.MAX_TASK_PER_CHILD.value) as pool: + for cnt, result in enumerate(pool.imap(GeometryMixin().union, shapes, chunksize=1)): results.append(result) - pool.join() - pool.terminate() + pool.join(); pool.terminate() return results - def multiframe_symmetric_difference( - self, shapes: Iterable[Union[LineString, MultiLineString]], core_cnt: int = -1 - ): + def multiframe_symmetric_difference(self, + shapes: Iterable[Union[LineString, MultiLineString]], + core_cnt: int = -1): """ Compute the symmetric differences between corresponding LineString or MultiLineString geometries usng multiprocessing. @@ -1189,28 +923,50 @@ def multiframe_symmetric_difference( >>> data = np.array([polygon_1, polygon_2]).T >>> symmetric_differences = GeometryMixin().multiframe_symmetric_difference(shapes=data) """ - check_int( - name="CORE COUNT", - value=core_cnt, - min_value=-1, - max_value=find_core_cnt()[0], - raise_error=True, - ) - if core_cnt == -1: - core_cnt = find_core_cnt()[0] + check_int(name='CORE COUNT', value=core_cnt, min_value=-1, max_value=find_core_cnt()[0], raise_error=True) + if core_cnt == -1: core_cnt = find_core_cnt()[0] results = [] - with multiprocessing.Pool( - core_cnt, maxtasksperchild=Defaults.MAX_TASK_PER_CHILD.value - ) as pool: - for cnt, result in enumerate( - pool.imap(GeometryMixin().symmetric_difference, shapes, chunksize=1) - ): + with multiprocessing.Pool(core_cnt, maxtasksperchild=Defaults.MAX_TASK_PER_CHILD.value) as pool: + for cnt, result in enumerate(pool.imap(GeometryMixin().symmetric_difference, shapes, chunksize=1)): results.append(result) - pool.join() - pool.terminate() + pool.join(); pool.terminate() return results + def multiframe_delaunay_triangulate_keypoints(self, + data: np.ndarray, + core_cnt: int = -1) -> List[List[Polygon]]: + + """ + >>> data_path = '/Users/simon/Desktop/envs/troubleshooting/Rat_NOR/project_folder/csv/machine_results/08102021_DOT_Rat7_8(2).csv' + >>> data = pd.read_csv(data_path, index_col=0).head(1000).iloc[:, 0:21] + >>> data = data[data.columns.drop(list(data.filter(regex='_p')))] + >>> animal_data = data.values.reshape(len(data), -1, 2).astype(int) + >>> tri = GeometryMixin().multiframe_delaunay_triangulate_keypoints(data=animal_data) + """ + + check_int(name='CORE COUNT', value=core_cnt, min_value=-1, max_value=find_core_cnt()[0], raise_error=True) + if core_cnt == -1: core_cnt = find_core_cnt()[0] + check_instance(source=GeometryMixin().multiframe_delaunay_triangulate_keypoints.__name__, instance=data, accepted_types=np.ndarray) + if data.ndim != 3: + raise InvalidInputError(msg=f'Multiframe delaunay triangulate keypointstriangulate keypoints expects a 3D array, got {data.ndim}', source=GeometryMixin.multiframe_delaunay_triangulate_keypoints.__name__) + results = [] + with multiprocessing.Pool(core_cnt, maxtasksperchild=Defaults.MAX_TASK_PER_CHILD.value) as pool: + for cnt, result in enumerate(pool.imap(GeometryMixin().delaunay_triangulate_keypoints, data, chunksize=1)): + results.append(result) + + pool.join(); pool.terminate() + return results + + + + + + + + + + # point_of_interest = Point(0, 0) # obstacle1 = LineString([(5, 0), (5, 10), (2, 2), (2, 1)]) # visible_space = GeometryMixin().rotational_sweep_visibility(point=point_of_interest, obstacle=[obstacle1]) @@ -1220,8 +976,8 @@ def multiframe_symmetric_difference( # cv2.imshow('ssadf', img) # cv2.waitKey(50000) -# data = np.array([[10, 10],[20, 10],[30, 10],[40, 10]]) -# line_1 = GeometryMixin().bodyparts_to_line(np.array([[10, 10],[20, 10],[30, 10],[40, 10]])) +#data = np.array([[10, 10],[20, 10],[30, 10],[40, 10]]) +#line_1 = GeometryMixin().bodyparts_to_line(np.array([[10, 10],[20, 10],[30, 10],[40, 10]])) # data_1 = np.random.randint(0, 100, (5000, 2)).reshape(1000,-1, 2) # data_2 = np.random.randint(0, 100, (5000, 2)).reshape(1000,-1, 2) # polygon_1 = GeometryMixin().multiframe_bodyparts_to_polygon(data=data_1) @@ -1231,6 +987,13 @@ def multiframe_symmetric_difference( # len(unions) + + + + + + + # data = data.reshape(1000,-1, data.shape[1]) # lines_1 = GeometryMixin().multiframe_bodyparts_to_line(data=data) # data = np.random.randint(0, 100, (5000, 2)) @@ -1245,6 +1008,10 @@ def multiframe_symmetric_difference( # + + + + # skeleton = np.array([[[5, 5], [1, 10]], # [[5, 5], [9, 10]], # [[9, 10], [1, 10]], @@ -1261,11 +1028,11 @@ def multiframe_symmetric_difference( # -# multipolygon_1 = MultiPolygon([Polygon([[200, 110],[200, 100],[200, 100],[200, 110]]), Polygon([[70, 70],[70, 60],[10, 50],[1, 70]])]) -# GeometryMixin().get_center(shape=multipolygon_1) +#multipolygon_1 = MultiPolygon([Polygon([[200, 110],[200, 100],[200, 100],[200, 110]]), Polygon([[70, 70],[70, 60],[10, 50],[1, 70]])]) +#GeometryMixin().get_center(shape=multipolygon_1) -# line_1 = GeometryMixin().bodyparts_to_line(np.array([[10, 70],[20, 60],[30, 50],[40, 70]])) +#line_1 = GeometryMixin().bodyparts_to_line(np.array([[10, 70],[20, 60],[30, 50],[40, 70]])) # buffered_polygon = GeometryMixin().buffer_shape(shape=line_1, size_mm=5, pixels_per_mm=1) # img = GeometryMixin.view_shapes(shapes=[line_1, buffered_polygon]) # # # @@ -1279,6 +1046,9 @@ def multiframe_symmetric_difference( # covered + + + # # img = GeometryMixin.view_shapes(shapes=[line_1, line_2]) # # # @@ -1287,7 +1057,10 @@ def multiframe_symmetric_difference( # -# img = GeometryMixin.view_shapes(shapes=[line_1, polygon_1, multipolygon_1]) + +#img = GeometryMixin.view_shapes(shapes=[line_1, polygon_1, multipolygon_1]) + + # line_1 = GeometryMixin().bodyparts_to_line(np.array([[10, 70],[20, 60],[30, 50],[40, 70]])) @@ -1304,15 +1077,15 @@ def multiframe_symmetric_difference( # return results -# union_geometry = ops.cascaded_union(shapes) -# result_geometry = ops.cascaded_union(shapes).symmetric_difference(union_geometry) -# #result_geometries = [geometry for geometry in result_geometry if not geometry.is_empty] -# return result_geometry -# #return result_geometry -# # union_geometry = unary_union(shapes) -# # result_geometries = [geometry.difference(union_geometry.intersection(geometry)) for geometry in shapes] -# # result_geometries = [geometry for geometry in result_geometries if not geometry.is_empty] -# # return result_geometries + # union_geometry = ops.cascaded_union(shapes) + # result_geometry = ops.cascaded_union(shapes).symmetric_difference(union_geometry) + # #result_geometries = [geometry for geometry in result_geometry if not geometry.is_empty] + # return result_geometry + # #return result_geometry + # # union_geometry = unary_union(shapes) + # # result_geometries = [geometry.difference(union_geometry.intersection(geometry)) for geometry in shapes] + # # result_geometries = [geometry for geometry in result_geometries if not geometry.is_empty] + # # return result_geometries # # #