Skip to content

Commit

Permalink
ENH: support the traversal of custom coupling graphs (#12)
Browse files Browse the repository at this point in the history
---------

Co-authored-by: Nikolai Kapralov <[email protected]>
  • Loading branch information
astudenova and ctrltz authored Sep 17, 2024
1 parent 0ef5ea0 commit 7837b06
Show file tree
Hide file tree
Showing 3 changed files with 186 additions and 1 deletion.
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ classifiers = [
]
dependencies = [
"colorednoise",
"harmoni",
"mne",
"harmoni"
"networkx"
]

[project.urls]
Expand Down
77 changes: 77 additions & 0 deletions src/meegsim/coupling_graph.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import networkx as nx
import numpy as np


def generate_walkaround_paths(tree, start_node=None, random_state=None):
"""
Generate a list of walkaround paths in a tree starting from start_node.
Walkaround paths are pairs of nodes where each pair represents an edge
in the tree, starting from the specified start_node.
Parameters:
----------
tree : networkx.Graph
The tree in which to generate walkaround paths.
start_node : int
The node from which to start generating paths.
If start_node is None (default), the start node will be drawn randomly.
random_state : int or None, optional
Seed for the random number generator. If start_node is None (default), the
start node will be drawn randomly, and results will vary between function calls.
Returns:
-------
out : list of lists of int
A list of pairs of nodes representing walkaround paths.
"""

if start_node is None:
# take random
rng = np.random.default_rng(random_state)
start_node = rng.choice(list(tree.nodes))

return list(nx.dfs_edges(tree, source=start_node))


def connecting_paths(coupling_setup, random_state=None):
"""
Constructs a graph from the provided edge list and attributes, and identifies walkaround paths in tree topologies.
Parameters
----------
coupling_setup : dict
with keys being edges (source, target)
with values being coupling parameters dict(method='ppc_von_mises', kappa=0.5, phase_lag=1)
random_state : int or None, optional
Seed for the random number generator. If start_node is None, the start node will be drawn
randomly, and results will vary between function calls. default = None.
Returns
-------
out : tuple (G, walkaround)
- G : networkx.Graph
The constructed graph with edges, weights, and capacities.
- walkaround : list of lists
A list of walkaround paths for each tree topology in the graph. Each walkaround path is a list of node pairs.
"""

# Build graph
G = nx.Graph()
G.add_edges_from(coupling_setup)

if not nx.is_forest(G):
raise ValueError("The graph contains cycles. Cycles are not supported.")

# iterate over connected components
walkaround = []
for component in nx.connected_components(G):
subgraph = G.subgraph(component)

# build the path starting from random node
walkaround_paths = generate_walkaround_paths(subgraph, start_node=None,
random_state=random_state)
walkaround.append(walkaround_paths)

return G, walkaround

107 changes: 107 additions & 0 deletions tests/test_coupling_graph.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
import numpy as np
import networkx as nx
import pytest

from meegsim.coupling_graph import connecting_paths, generate_walkaround_paths


def test_generate_walkaround_paths_with_start_node():
# Create a simple tree graph
tree = nx.Graph()
tree.add_edges_from([(0, 1), (0, 2), (1, 3), (1, 4)])

# Generate paths starting from node 0
result = generate_walkaround_paths(tree, start_node=0)
expected = [(0, 1), (1, 3), (1, 4), (0, 2)]
assert result == expected, f"Failed with start_node: Expected {expected}, got {result}"


def test_generate_walkaround_paths_random_start_node():
# Create a simple tree graph
tree = nx.Graph()
tree.add_edges_from([(0, 1), (0, 2), (1, 3), (1, 4)])

# Generate paths with a random start node using a fixed seed
result1 = generate_walkaround_paths(tree, random_state=42)
result2 = generate_walkaround_paths(tree, random_state=42)
assert result1 == result2, "Failed with random_state: Results should be identical"


def test_generate_walkaround_paths_single_node():
# Test with a single-node tree
tree = nx.Graph()
tree.add_node(0)

# Generate paths starting from node 0
result = generate_walkaround_paths(tree, start_node=0)
expected = []
assert result == expected, f"Failed on single-node tree: Expected {expected}, got {result}"


def test_connecting_paths_tree_topology():
# Test with a simple tree topology
edgelist = [(0, 1), (1, 2), (1, 3)]
kappa_list = [0.1, 0.2, 0.3]
phase_lag_list = [0.5, 0.6, 0.7]

coupling_setup = {
edge: {
'method': 'ppc_von_mises',
'kappa': kappa_list[i],
'phase_lag': phase_lag_list[i]
}
for i, edge in enumerate(edgelist)
}

G, walkaround = connecting_paths(coupling_setup, random_state=42)

assert list(G.edges) == edgelist, "Graph edges do not match the edge list"
assert len(walkaround) == 1, "There should be one walkaround path for one tree topology"

# Convert lists to sets of frozensets to account for order invariance
expected_walkaround = {(0, 1), (1, 2), (1, 3)}
result_walkaround = set(walkaround[0])

assert result_walkaround == expected_walkaround, (
f"Expected walkaround paths {expected_walkaround}, got {result_walkaround}"
)


def test_connecting_paths_with_cycle_topology():
# Test with a graph containing a cycle
edgelist = [(0, 1), (1, 2), (2, 0)]
kappa_list = [0.1, 0.2, 0.3]
phase_lag_list = [0.5, 0.6, 0.7]

coupling_setup = {
edge: {
'method': 'ppc_von_mises',
'kappa': kappa_list[i],
'phase_lag': phase_lag_list[i]
}
for i, edge in enumerate(edgelist)
}

with pytest.raises(ValueError, match="The graph contains cycles. Cycles are not supported."):
connecting_paths(coupling_setup)


def test_connecting_paths_random_state():
# Test with random_state for reproducibility
edgelist = [(0, 1), (1, 2), (1, 3)]
kappa_list = [0.1, 0.2, 0.3]
phase_lag_list = [0.5, 0.6, 0.7]

coupling_setup = {
edge: {
'method': 'ppc_von_mises',
'kappa': kappa_list[i],
'phase_lag': phase_lag_list[i]
}
for i, edge in enumerate(edgelist)
}

_, walkaround1 = connecting_paths(coupling_setup, random_state=42)
_, walkaround2 = connecting_paths(coupling_setup, random_state=42)

assert walkaround1 == walkaround2, "Walkaround paths should be identical with the same random_state"

0 comments on commit 7837b06

Please sign in to comment.