Skip to content

Commit

Permalink
finalized docstring
Browse files Browse the repository at this point in the history
  • Loading branch information
fverdoja committed May 3, 2024
1 parent 34a2ebd commit 4b86c86
Show file tree
Hide file tree
Showing 11 changed files with 338 additions and 93 deletions.
102 changes: 91 additions & 11 deletions bff/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,20 @@ def plot_dir(
dpi: int = 300,
cmap: str = "inferno",
) -> None:
"""Plots the specied directional probabilities extracted from a dynamic
map.
Args:
occupancy (OccupancyMap): The occupancy map to overlay on top of the
dynamics.
dynamics (np.ndarray): An array containing the 8-directional dynamics.
dir (Direction): The direction for which the dynamics should be
plotted.
dpi (int, optional): The dots-per-inch (pixel per inch) for the created
plot. Default is 300.
cmap (str, optional): The colormap used to map normalized data values
to RGB colors. Default is "inferno".
"""
binary_map = occupancy.binary_map
plt.figure(dpi=dpi)
plt.title(f"Direction: {dir.name}")
Expand Down Expand Up @@ -58,6 +72,23 @@ def plot_quivers(
normalize: bool = True,
dpi: int = 300,
) -> None:
"""Plots the eight-direction people dynamics as quivers/arrow plots over
the occupancy map.
Args:
occupancy (np.ndarray): The occupancy grid as a 2D numpy array.
dynamics (np.ndarray): An array containing the 8-directional dynamics.
scale (int, optional): Scaling factor used for reducing arrow density.
Default is 1.
window_size (int, optional): The size of the window to be plotted. If
provided, a center must also be provided.
center (RowColumnPair, optional): The center of the window to be
plotted. If provided, a window size must also be provided.
normalize (bool, optional): If True, the arrow scales are normalized.
Default is True.
dpi (int, optional): The dots-per-inch (pixel per inch) for the plot.
Default is 300.
"""
sz_occ = occupancy.shape
sz_dyn = dynamics.shape
assert sz_occ[0] // scale == sz_dyn[0] and sz_occ[1] // scale == sz_dyn[1]
Expand Down Expand Up @@ -113,22 +144,12 @@ def plot_quivers(


def scale_quivers(d: np.ndarray) -> np.ndarray:
"""Scales the quivers by normalizing the arrow lengths."""
max = np.amax(d, axis=1)
ret = np.expand_dims(np.where(max != 0, max, 1), axis=1)
return d / ret


def random_input(
size: int = 32,
p_occupied: float = 0.5,
device: torch.device = torch.device("cpu"),
) -> torch.Tensor:
w = Window(size)
a = torch.rand((1, 1, size, size), device=device) < p_occupied
a[0, 0, w.center[0], w.center[1]] = 0 # ensure center is empty
return a.type(torch.float)


def estimate_dynamics(
net: PeopleFlow,
occupancy: Union[OccupancyMap, np.ndarray],
Expand All @@ -138,6 +159,32 @@ def estimate_dynamics(
batch_size: int = 4,
device: Optional[torch.device] = None,
) -> np.ndarray:
"""Estimates the dynamics of people flow in a given occupancy.
The function applies the network on windows of the provided occupancy to
estimate the people flow dynamics in the region. It handles both
OccupancyMap objects and numpy arrays representing occupancy. By default,
if a scale factor greater than 1 is provided and the occupancy is an
OccupancyMap, it will rescale the occupancy binary map accordingly before
applying the network.
Args:
net (PeopleFlow): PeopleFlow network used for estimating the dynamics.
occupancy (Union[OccupancyMap, np.ndarray]): Occupancy space to
estimate the dynamics upon.
scale (int, optional): Scale down factor for the occupancy map. This
parameter is ignored when occupancy is a numpy array. Default is 1.
net_scale (int, optional): Scale factor for the network window size.
Default is 1.
batch_size (int, optional): Number of patch samples to take from each
batch. Default is 4.
device (torch.device, optional): The device to move the network model
and data to for computation. If not given, automatically set to GPU if
available, else CPU.
Returns:
np.ndarray: An numpy array containing the estimated dynamics of people
flow.
"""
window = Window(net.window_size * net_scale)
if not device:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
Expand Down Expand Up @@ -346,24 +393,45 @@ def load(self, path: str) -> None:


class Window:
"""A class representing a window on a 2-dimensional grid."""

def __init__(self, size: int) -> None:
"""Init `Window` class
Args:
size (int): The size of the square window, represented as the
length of one side.
"""
self.size = size

@property
def half_size(self) -> int:
"""Returns the integer division of the window size by 2"""
return self.size // 2

@property
def center(self) -> RowColumnPair:
"""Returns the coordinate of the window center"""
return (self.half_size, self.half_size)

@property
def pad_amount(self) -> Sequence[int]:
"""Returns the number of rows and columns to pad around the window"""
return (self.half_size, self.half_size + self.size % 2 - 1)

def corners(
self, center: RowColumnPair, bounds: Optional[Sequence[int]] = None
) -> Sequence[int]:
"""Returns the corners of a window centered on the center.
Args:
center (RowColumnPair): The center coordinates of the window.
bounds (Sequence[int], optional): If given, the provided
corners will not exceed these bounds. Bounds should be given as
(min_row, max_row, min_col, max_col). Defaults to None.
Returns:
Sequence[int]: The corner coordinates as (left, top, right, bottom)
"""
left, top, right, bottom = (
center[1] - self.half_size, # left
center[0] - self.half_size, # top
Expand All @@ -382,6 +450,18 @@ def corners(
def indeces(
self, center: RowColumnPair, bounds: Optional[Sequence[int]] = None
) -> Set[RowColumnPair]:
"""Generates the indices encompassed by the window centered in center.
Args:
center (RowColumnPair): The center coordinates of the window.
bounds (Sequence[int], optional): If given, the provided indeces
will not exceed these bounds. Bounds should be given as
(min_row, max_row, min_col, max_col). Defaults to None.
Returns:
Set[RowColumnPair]: A set of row and column pairs representing each
cell position within the window.
"""
left, top, right, bottom = self.corners(center, bounds)
indeces = {
(row, col)
Expand Down
76 changes: 43 additions & 33 deletions mod/curation.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,15 @@ def mm2m(val: float) -> float:


class DataCurator:
"""
Stump of class handing data curation. In current version handles only
subsampling.
"""
"""Stump of class handing data curation. In current version handles only
subsampling."""

# TODO Update description and add logging
def __init__(self, params):
"""
Constructor for data curator class.
:param params: Parameters guiding the curation (see json schema
@config/curator_shema.json)
:type params: dict
def __init__(self, params: dict):
"""Init `DataCurator` class.
Args:
params (dict): Parameters guiding the curation (see json schema
@config/curator_shema.json)
"""
# load params
self.params = params
Expand All @@ -39,16 +36,12 @@ def __init__(self, params):
logger.info("Parameters for data curation {}".format(self.params))

def curate(self):
"""
Function running main curation loop
"""
"""Function running main curation loop"""
if self.params["chunk_size"] != 0:
self.__chunk_processing()

def __chunk_processing(self):
"""
Function for curating large csv files in chunks.
"""
"""Function for curating large csv files in chunks."""
header = True
logger.info(
"Processing in chunks of size {}".format(self.params["chunk_size"])
Expand Down Expand Up @@ -87,41 +80,50 @@ def __chunk_processing(self):

header = False

def __subsample(self, chunk):
"""
Function for subsampling data. Depending on the flag different
def __subsample(self, chunk: pd.DataFrame) -> pd.DataFrame:
"""Function for subsampling data. Depending on the flag different
subsampling policies can be used. Currently only available one is to
keep very n-th line.
:param chunk: Data to be subsample :type chunk: dataframe :return:
Subsampled data :rtype: dataframe
Args:
chunk (pd.Dataframe): Data to be subsampled
Returns:
pd.Dataframe: Subsampled data
"""
result = []
if self.params["subsample"]["method"] == "line_keep":
result = chunk.iloc[:: self.params["subsample"]["parameter"], :]
logger.info(
"Was {} rows, remained {} rows".format(
len(chunk.index), len(result.index)
len(chunk.index), len(result.index) # type: ignore
)
)
return result

def __drop_columns(self, chunk):
"""
Function dropping redundant columns from the original data
def __drop_columns(self, chunk: pd.DataFrame) -> pd.DataFrame:
"""Function dropping redundant columns from the original data
Args:
chunk (pd.Dataframe): Data to be edited
:param chunk: Data to be edited
:type chunk: dataframe
:return: edited chunk
:rtype: dataframe
Returns:
pd.Dataframe: Edited data
"""
for c in self.params["drop_columns"]:
chunk = chunk.drop(c, axis=1)
logger.info("Dropping column: {}".format(c))
return chunk

def __process_column(self, chunk):
# TODO Description and logging
def __process_column(self, chunk: pd.DataFrame) -> pd.DataFrame:
"""Function processing columns in the original data
Args:
chunk (pd.Dataframe): Data to be edited
Returns:
pd.Dataframe: Edited data
"""
for function_name, column_name in zip(
self.params["process_columns"]["method"],
self.params["process_columns"]["columns"],
Expand All @@ -133,7 +135,15 @@ def __process_column(self, chunk):
chunk[column_name] = chunk[column_name].apply(mm2m)
return chunk

def __replace_header(self, chunk):
def __replace_header(self, chunk: pd.DataFrame) -> pd.DataFrame:
"""Function replacing the header row from the original data
Args:
chunk (pd.Dataframe): Data to be edited
Returns:
pd.Dataframe: Edited data
"""
for old_name, new_name in zip(
self.params["replace_header"]["input"],
self.params["replace_header"]["replacement"],
Expand Down
47 changes: 47 additions & 0 deletions mod/grid.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,19 @@


class Grid(BaseModel):
"""Represents a 2D Map of Dynamics as a grid. Each cell of the grid is an
instance of models.Cell or its subclasses.
Attributes:
resolution (PositiveFloat): The size of the sides of the grid's
squares.
origin (XYCoords): The coordinate reference for the origin point.
model (type[Cell]): The type of cell used to fill the grid.
cells (dict[RCCoords, Cell]): A mapping of grid coordinates to cell
instances.
total_count (int): The total number of data items added to grid.
"""

resolution: PositiveFloat
origin: XYCoords
model: type[Cell] = Cell
Expand All @@ -18,6 +31,12 @@ class Grid(BaseModel):

@property
def dimensions(self) -> RCCoords:
"""Calculate the extent of the grid in rows and columns.
Returns:
RCCoords: The maximum row and column values currently within the
grid.
"""
max_r = 0
max_c = 0
for c in self.cells:
Expand All @@ -28,6 +47,13 @@ def dimensions(self) -> RCCoords:
return RCCoords(max_r, max_c)

def add_data(self, data: pd.DataFrame) -> None:
"""Add positional data to grid points by calculating which cell in the
grid each dataset entry belongs to.
Args:
data (pd.DataFrame): Data with 'x' and 'y' positional entries
"""

data_with_row_col = data.assign(
row=((data.y - self.origin.y) // self.resolution).astype(int),
col=((data.x - self.origin.x) // self.resolution).astype(int),
Expand All @@ -48,13 +74,24 @@ def add_data(self, data: pd.DataFrame) -> None:
self.total_count += len(cell_data.index)

def update_model(self) -> None:
"""Update all Cell models within the grid. Used after all data has been
added to the grid to finalize model parameters."""

for cell in self.cells.values():
cell.update_model(self.total_count)


def assign_prior_to_grid(
grid: Grid, prior: list[Probability], alpha: float
) -> None:
"""Assigns a single prior probability value to all cells in a grid.
Args:
grid (Grid): The grid to which the prior will be assigned.
prior (list[Probability]): The prior probabilities to be assigned to
each cell.
alpha (float): Concentration hyperparameter for the Dirichlet prior.
"""
for cell in grid.cells.values():
assert isinstance(cell, BayesianDiscreteDirectional)
cell.update_prior(prior, alpha)
Expand All @@ -66,6 +103,16 @@ def assign_cell_priors_to_grid(
alpha: float,
add_missing_cells: bool = False,
) -> None:
"""Assigns individual cell priors to each cell in the grid.
Args:
grid (Grid): The grid to which the prior will be assigned.
priors (dict[RCCoords, list[Probability]]): Mapping of cell coordinates
to their corresponding prior probabilities.
alpha (float): Concentration hyperparameter for the Dirichlet prior.
add_missing_cells (bool, optional): Set to True to add cells that are
missing in grid but present in priors. Defaults to False.
"""
for cell_id in priors:
cell = grid.cells.get(cell_id)
if cell:
Expand Down
Loading

0 comments on commit 4b86c86

Please sign in to comment.