Skip to content

Commit

Permalink
[pre-commit.ci] auto fixes from pre-commit.com hooks
Browse files Browse the repository at this point in the history
for more information, see https://pre-commit.ci
  • Loading branch information
pre-commit-ci[bot] committed Dec 25, 2024
1 parent a8cc17e commit d40f4d1
Showing 1 changed file with 72 additions and 70 deletions.
142 changes: 72 additions & 70 deletions src/squidpy/tl/_sliding_window.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
from __future__ import annotations

import math
import time
from collections import defaultdict
from itertools import product
import time

import numpy as np
import math
import pandas as pd
from anndata import AnnData
from scanpy import logging as logg
from spatialdata import SpatialData
import math
import math

from squidpy._docs import d
from squidpy.gr._utils import _save_data

__all__ = ["sliding_window"]


@d.dedent
def sliding_window(
adata: AnnData | SpatialData,
Expand All @@ -26,12 +26,12 @@ def sliding_window(
spatial_key: str = "spatial",
sliding_window_key: str = "sliding_window_assignment",
overlap: int = 0,
max_n_cells = None,
split_line: str = 'h',
n_splits = None,
max_n_cells=None,
split_line: str = "h",
n_splits=None,
drop_partial_windows: bool = False,
square: bool = False,
window_size_per_library_key: str = 'equal',
window_size_per_library_key: str = "equal",
copy: bool = False,
) -> pd.DataFrame | None:
"""
Expand All @@ -56,7 +56,7 @@ def sliding_window(
If window_size is None, either 'n_split' or 'max_n_cells' can be set.
max_n_cells sets an upper limit for the number of cells within each region.
n_splits: int
This can be used to split the entire region to some splits.
This can be used to split the entire region to some splits.
copy: bool
If True, return the result, otherwise save it to the adata object.
split_line: str
Expand All @@ -67,20 +67,22 @@ def sliding_window(
If ``copy = True``, returns the sliding window annotation(s) as pandas dataframe
Otherwise, stores the sliding window annotation(s) in .obs.
"""

if overlap < 0:
raise ValueError("Overlap must be non-negative.")

if isinstance(adata, SpatialData):
adata = adata.table

assert max_n_cells is None or n_splits is None, "You can specify only one from the parameters 'n_split' and 'max_n_cells' "

assert (
max_n_cells is None or n_splits is None
), "You can specify only one from the parameters 'n_split' and 'max_n_cells' "

# we don't want to modify the original adata in case of copy=True
if copy:
adata = adata.copy()
if 'sliding_window_assignment_colors' in adata.uns:
del adata.uns['sliding_window_assignment_colors']
if "sliding_window_assignment_colors" in adata.uns:
del adata.uns["sliding_window_assignment_colors"]
# extract coordinates of observations
x_col, y_col = coord_columns
if x_col in adata.obs and y_col in adata.obs:
Expand All @@ -95,87 +97,86 @@ def sliding_window(
raise ValueError(
f"Coordinates not found. Provide `{coord_columns}` in `adata.obs` or specify a suitable `spatial_key` in `adata.obsm`."
)

if library_key is not None and library_key not in adata.obs:
raise ValueError(f"Library key '{library_key}' not found in adata.obs")
if library_key is None and not 'fov' in adata.obs.columns:
adata.obs['fov'] = 'fov1'

if library_key is None and "fov" not in adata.obs.columns:
adata.obs["fov"] = "fov1"

libraries = adata.obs[library_key].unique()

fovs_x_range = [(adata.obs[adata.obs[library_key]==key][x_col].max(), adata.obs[adata.obs[library_key]==key][x_col].min()) for key in libraries]
fovs_y_range = [(adata.obs[adata.obs[library_key]==key][y_col].max(), adata.obs[adata.obs[library_key]==key][y_col].min()) for key in libraries]
fovs_width = [i-j for (i, j) in fovs_x_range]
fovs_height = [i-j for (i, j) in fovs_y_range]
fovs_n_cell = [adata[adata.obs[library_key]==key].shape[0] for key in libraries]
fovs_area = [i*j for i, j in zip(fovs_width, fovs_height)]
fovs_density = [i/j for i, j in zip(fovs_n_cell, fovs_area)]
fovs_x_range = [
(adata.obs[adata.obs[library_key] == key][x_col].max(), adata.obs[adata.obs[library_key] == key][x_col].min())
for key in libraries
]
fovs_y_range = [
(adata.obs[adata.obs[library_key] == key][y_col].max(), adata.obs[adata.obs[library_key] == key][y_col].min())
for key in libraries
]
fovs_width = [i - j for (i, j) in fovs_x_range]
fovs_height = [i - j for (i, j) in fovs_y_range]
fovs_n_cell = [adata[adata.obs[library_key] == key].shape[0] for key in libraries]
fovs_area = [i * j for i, j in zip(fovs_width, fovs_height)]
fovs_density = [i / j for i, j in zip(fovs_n_cell, fovs_area)]
window_sizes = []

if window_size is None:
if max_n_cells is None and n_splits is None:
n_splits = 2

if window_size_per_library_key == 'equal':
if window_size_per_library_key == "equal":
if max_n_cells is not None:
n_splits = max(2, int(max(fovs_n_cell)/max_n_cells))
n_splits = max(2, int(max(fovs_n_cell) / max_n_cells))
else:
max_n_cells = int(max(fovs_n_cell) / n_splits)
min_n_cells = int(min(fovs_n_cell) / n_splits)
maximum_region_area = max_n_cells / max(fovs_density)
minimum_region_area = min_n_cells / max(fovs_density)
window_size = _optimize_tile_size(min(fovs_width),
min(fovs_height),
minimum_region_area,
maximum_region_area,
square,
split_line)
window_sizes = [window_size]*len(libraries)
maximum_region_area = max_n_cells / max(fovs_density)
minimum_region_area = min_n_cells / max(fovs_density)
window_size = _optimize_tile_size(
min(fovs_width), min(fovs_height), minimum_region_area, maximum_region_area, square, split_line
)
window_sizes = [window_size] * len(libraries)
else:
for i, lib in enumerate(libraries):
if max_n_cells is not None:
n_splits = max(2, int(fovs_n_cell[i]/max_n_cells))
n_splits = max(2, int(fovs_n_cell[i] / max_n_cells))
else:
max_n_cells = int(fovs_n_cell[i] / n_splits)
min_n_cells = int(fovs_n_cell[i] / n_splits)
minimum_region_area = min_n_cells / max(fovs_density)
maximum_region_area = fovs_area[i]/fovs_density[i]
minimum_region_area = min_n_cells / max(fovs_density)
maximum_region_area = fovs_area[i] / fovs_density[i]
window_sizes.append(
_optimize_tile_size(fovs_width[i],
fovs_height[i],
minimum_region_area,
maximum_region_area,
square,
split_line
)
)
_optimize_tile_size(
fovs_width[i], fovs_height[i], minimum_region_area, maximum_region_area, square, split_line
)
)
else:
assert split_line is None, logg.warning("'split' ignored as window_size is specified for square regions")
assert n_splits is None, logg.warning("'n_split' ignored as window_size is specified for square regions")
assert max_n_cells is None, logg.warning("'max_n_cells' ignored as window_size is specified")
if isinstance(window_size, (int, float)):
if window_size<= 0:
if window_size <= 0:
raise ValueError("Window size must be larger than 0.")
else:
window_size = (window_size, window_size)
elif isinstance(window_size, tuple):
for i in window_size:
if i<= 0:
if i <= 0:
raise ValueError("Window size must be larger than 0.")
window_sizes = [window_size]*len(libraries)

window_sizes = [window_size] * len(libraries)

# Create a DataFrame to store the sliding window assignments
sliding_window_df = pd.DataFrame(index=adata.obs.index)
if sliding_window_key in adata.obs:
logg.warning(f"Overwriting existing column '{sliding_window_key}' in adata.obs.")
adata.obs[sliding_window_key] = 'window_0'
adata.obs[sliding_window_key] = "window_0"

for i, lib in enumerate(libraries):
lib_mask = adata.obs[library_key] == lib
lib_coords = coords.loc[lib_mask]

# precalculate windows
windows = _calculate_window_corners(
fovs_x_range[i],
Expand Down Expand Up @@ -231,7 +232,7 @@ def sliding_window(
sliding_window_df[sliding_window_key].unique(),
key=lambda x: int(x.split("_")[-1]),
),
)
)

if copy:
return sliding_window_df
Expand Down Expand Up @@ -278,10 +279,10 @@ def _calculate_window_corners(
raise ValueError("Overlap must be non-negative.")
if overlap >= x_window_size or overlap >= y_window_size:
raise ValueError("Overlap must be less than the window size.")

max_x, min_x = x_range
max_y, min_y = y_range

x_step = x_window_size - overlap
y_step = y_window_size - overlap

Expand Down Expand Up @@ -319,23 +320,24 @@ def _calculate_window_corners(
windows = windows.reset_index(drop=True)
return windows[["x_start", "x_end", "y_start", "y_end"]]

def _optimize_tile_size(L, W, A_min=None, A_max=None, square=False, split_line='v'):

def _optimize_tile_size(L, W, A_min=None, A_max=None, square=False, split_line="v"):
"""
This function optimizes the tile size for covering a rectangle of dimensions LxW.
It returns a tuple (x, y) where x and y are the dimensions of the optimal tile.
Parameters:
- L (int): Length of the rectangle.
- W (int): Width of the rectangle.
- A_min (int, optional): Minimum allowed area of each tile. If None, no minimum area limit is applied.
- A_max (int, optional): Maximum allowed area of each tile. If None, no maximum area limit is applied.
- square (bool, optional): If True, tiles will be square (x = y).
Returns:
- tuple: (x, y) representing the optimal tile dimensions.
"""
best_tile_size = None
min_uncovered_area = float('inf')
min_uncovered_area = float("inf")
if square:
# Calculate square tiles
max_side = int(math.sqrt(A_max)) if A_max else int(min(L, W))
Expand All @@ -344,39 +346,39 @@ def _optimize_tile_size(L, W, A_min=None, A_max=None, square=False, split_line='
for side in range(min_side, max_side + 1):
if (A_min and side * side < A_min) or (A_max and side * side > A_max):
continue # Skip sizes that are out of the area limits

# Calculate number of tiles that fit in the rectangle
num_tiles_x = L // side
num_tiles_y = W // side
uncovered_area = L * W - (num_tiles_x * num_tiles_y * side * side)

# Track the best tile size
if uncovered_area < min_uncovered_area:
min_uncovered_area = uncovered_area
best_tile_size = (side, side)
else:
# For non-square tiles, optimize both dimensions independently
if split_line == 'v':
max_tile_length = A_max/W if A_max else int(L)
if split_line == "v":
max_tile_length = A_max / W if A_max else int(L)
max_tile_width = W
min_tile_length = A_min/W
min_tile_length = A_min / W
min_tile_width = W
if split_line == 'h':
if split_line == "h":
max_tile_length = L
max_tile_width = A_max/L if A_max else int()
min_tile_width = A_min/L
max_tile_width = A_max / L if A_max else 0
min_tile_width = A_min / L
min_tile_length = L
# Try all combinations of width and height within the bounds
for width in range(int(min_tile_width), int(max_tile_width) + 1):
for height in range(int(min_tile_length), int(max_tile_length) + 1):
if (A_min and width * height < A_min) or (A_max and width * height > A_max):
continue # Skip sizes that are out of the area limits

# Calculate number of tiles that fit in the rectangle
num_tiles_x = L // width
num_tiles_y = W // height
uncovered_area = L * W - (num_tiles_x * num_tiles_y * width * height)

# Track the best tile size (minimizing uncovered area)
if uncovered_area < min_uncovered_area:
min_uncovered_area = uncovered_area
Expand Down

0 comments on commit d40f4d1

Please sign in to comment.