diff --git a/pyproject.toml b/pyproject.toml index 7ad26c9..d7da9f0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -19,8 +19,9 @@ classifiers = [ ] dependencies = [ "colorednoise", + "harmoni", "mne", - "harmoni" + "networkx" ] [project.urls] diff --git a/src/meegsim/coupling_graph.py b/src/meegsim/coupling_graph.py new file mode 100644 index 0000000..b8f1b02 --- /dev/null +++ b/src/meegsim/coupling_graph.py @@ -0,0 +1,77 @@ +import networkx as nx +import numpy as np + + +def generate_walkaround_paths(tree, start_node=None, random_state=None): + """ + Generate a list of walkaround paths in a tree starting from start_node. + + Walkaround paths are pairs of nodes where each pair represents an edge + in the tree, starting from the specified start_node. + + Parameters: + ---------- + tree : networkx.Graph + The tree in which to generate walkaround paths. + start_node : int + The node from which to start generating paths. + If start_node is None (default), the start node will be drawn randomly. + random_state : int or None, optional + Seed for the random number generator. If start_node is None (default), the + start node will be drawn randomly, and results will vary between function calls. + + Returns: + ------- + out : list of lists of int + A list of pairs of nodes representing walkaround paths. + """ + + if start_node is None: + # take random + rng = np.random.default_rng(random_state) + start_node = rng.choice(list(tree.nodes)) + + return list(nx.dfs_edges(tree, source=start_node)) + + +def connecting_paths(coupling_setup, random_state=None): + """ + Constructs a graph from the provided edge list and attributes, and identifies walkaround paths in tree topologies. + + Parameters + ---------- + coupling_setup : dict + with keys being edges (source, target) + with values being coupling parameters dict(method='ppc_von_mises', kappa=0.5, phase_lag=1) + random_state : int or None, optional + Seed for the random number generator. If start_node is None, the start node will be drawn + randomly, and results will vary between function calls. default = None. + + Returns + ------- + out : tuple (G, walkaround) + - G : networkx.Graph + The constructed graph with edges, weights, and capacities. + - walkaround : list of lists + A list of walkaround paths for each tree topology in the graph. Each walkaround path is a list of node pairs. + """ + + # Build graph + G = nx.Graph() + G.add_edges_from(coupling_setup) + + if not nx.is_forest(G): + raise ValueError("The graph contains cycles. Cycles are not supported.") + + # iterate over connected components + walkaround = [] + for component in nx.connected_components(G): + subgraph = G.subgraph(component) + + # build the path starting from random node + walkaround_paths = generate_walkaround_paths(subgraph, start_node=None, + random_state=random_state) + walkaround.append(walkaround_paths) + + return G, walkaround + diff --git a/tests/test_coupling_graph.py b/tests/test_coupling_graph.py new file mode 100644 index 0000000..aee2569 --- /dev/null +++ b/tests/test_coupling_graph.py @@ -0,0 +1,107 @@ +import numpy as np +import networkx as nx +import pytest + +from meegsim.coupling_graph import connecting_paths, generate_walkaround_paths + + +def test_generate_walkaround_paths_with_start_node(): + # Create a simple tree graph + tree = nx.Graph() + tree.add_edges_from([(0, 1), (0, 2), (1, 3), (1, 4)]) + + # Generate paths starting from node 0 + result = generate_walkaround_paths(tree, start_node=0) + expected = [(0, 1), (1, 3), (1, 4), (0, 2)] + assert result == expected, f"Failed with start_node: Expected {expected}, got {result}" + + +def test_generate_walkaround_paths_random_start_node(): + # Create a simple tree graph + tree = nx.Graph() + tree.add_edges_from([(0, 1), (0, 2), (1, 3), (1, 4)]) + + # Generate paths with a random start node using a fixed seed + result1 = generate_walkaround_paths(tree, random_state=42) + result2 = generate_walkaround_paths(tree, random_state=42) + assert result1 == result2, "Failed with random_state: Results should be identical" + + +def test_generate_walkaround_paths_single_node(): + # Test with a single-node tree + tree = nx.Graph() + tree.add_node(0) + + # Generate paths starting from node 0 + result = generate_walkaround_paths(tree, start_node=0) + expected = [] + assert result == expected, f"Failed on single-node tree: Expected {expected}, got {result}" + + +def test_connecting_paths_tree_topology(): + # Test with a simple tree topology + edgelist = [(0, 1), (1, 2), (1, 3)] + kappa_list = [0.1, 0.2, 0.3] + phase_lag_list = [0.5, 0.6, 0.7] + + coupling_setup = { + edge: { + 'method': 'ppc_von_mises', + 'kappa': kappa_list[i], + 'phase_lag': phase_lag_list[i] + } + for i, edge in enumerate(edgelist) + } + + G, walkaround = connecting_paths(coupling_setup, random_state=42) + + assert list(G.edges) == edgelist, "Graph edges do not match the edge list" + assert len(walkaround) == 1, "There should be one walkaround path for one tree topology" + + # Convert lists to sets of frozensets to account for order invariance + expected_walkaround = {(0, 1), (1, 2), (1, 3)} + result_walkaround = set(walkaround[0]) + + assert result_walkaround == expected_walkaround, ( + f"Expected walkaround paths {expected_walkaround}, got {result_walkaround}" + ) + + +def test_connecting_paths_with_cycle_topology(): + # Test with a graph containing a cycle + edgelist = [(0, 1), (1, 2), (2, 0)] + kappa_list = [0.1, 0.2, 0.3] + phase_lag_list = [0.5, 0.6, 0.7] + + coupling_setup = { + edge: { + 'method': 'ppc_von_mises', + 'kappa': kappa_list[i], + 'phase_lag': phase_lag_list[i] + } + for i, edge in enumerate(edgelist) + } + + with pytest.raises(ValueError, match="The graph contains cycles. Cycles are not supported."): + connecting_paths(coupling_setup) + + +def test_connecting_paths_random_state(): + # Test with random_state for reproducibility + edgelist = [(0, 1), (1, 2), (1, 3)] + kappa_list = [0.1, 0.2, 0.3] + phase_lag_list = [0.5, 0.6, 0.7] + + coupling_setup = { + edge: { + 'method': 'ppc_von_mises', + 'kappa': kappa_list[i], + 'phase_lag': phase_lag_list[i] + } + for i, edge in enumerate(edgelist) + } + + _, walkaround1 = connecting_paths(coupling_setup, random_state=42) + _, walkaround2 = connecting_paths(coupling_setup, random_state=42) + + assert walkaround1 == walkaround2, "Walkaround paths should be identical with the same random_state"