Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added edmonds_blossom_algorithm.py. For maximum matching in the graph. #12043 #12056

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
267 changes: 267 additions & 0 deletions graphs/edmonds_blossom_algorithm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,267 @@
from collections import deque


class BlossomAuxData:
"""Class to hold auxiliary data during the blossom algorithm's execution."""

def __init__(
self,
queue: deque,
parent: list[int],
base: list[int],
in_blossom: list[bool],
match: list[int],
in_queue: list[bool],
) -> None:
"""
Initializes the BlossomAuxData instance.

Args:
queue: A deque for BFS processing.
parent: List of parent vertices in the augmenting path.
base: List of base vertices for each vertex.
in_blossom: Boolean list indicating if a vertex is in a blossom.
match: List of matched vertices.
in_queue: Boolean list indicating if a vertex is in the queue.
"""
self.queue = queue
self.parent = parent
self.base = base
self.in_blossom = in_blossom
self.match = match
self.in_queue = in_queue


class BlossomData:
"""Class to encapsulate data related to a blossom in the graph."""

def __init__(
self,
aux_data: BlossomAuxData,
vertex_u: int,
vertex_v: int,
lowest_common_ancestor: int,
) -> None:
"""
Initializes the BlossomData instance.

Args:
aux_data: The auxiliary data related to the blossom.
vertex_u: One vertex in the blossom.
vertex_v: The other vertex in the blossom.
lowest_common_ancestor: The lowest common ancestor of vertex_u and vertex_v.
"""
self.aux_data = aux_data
self.vertex_u = vertex_u
self.vertex_v = vertex_v
self.lowest_common_ancestor = lowest_common_ancestor


class EdmondsBlossomAlgorithm:
UNMATCHED = -1 # Constant to represent unmatched vertices

@staticmethod
def maximum_matching(edges: list[list[int]], vertex_count: int) -> list[list[int]]:
"""
Finds the maximum matching in a graph using the Edmonds Blossom Algorithm.

Args:
edges: A list of edges represented as pairs of vertices.
vertex_count: The total number of vertices in the graph.

Returns:
A list of matched pairs in the form of a list of lists.
"""
# Create an adjacency list for the graph
graph: list[list[int]] = [[] for _ in range(vertex_count)]

# Populate the graph with the edges
for edge in edges:
u, v = edge
graph[u].append(v)
graph[v].append(u)

# All vertices are initially unmatched
match: list[int] = [EdmondsBlossomAlgorithm.UNMATCHED] * vertex_count
parent: list[int] = [EdmondsBlossomAlgorithm.UNMATCHED] * vertex_count
# Each vertex is its own base initially
base: list[int] = list(range(vertex_count))
in_blossom: list[bool] = [False] * vertex_count
# Tracks vertices in the BFS queue
in_queue: list[bool] = [False] * vertex_count

# Main logic for finding maximum matching
for u in range(vertex_count):
# Only consider unmatched vertices
if match[u] == EdmondsBlossomAlgorithm.UNMATCHED:
# BFS initialization
parent = [EdmondsBlossomAlgorithm.UNMATCHED] * vertex_count
base = list(range(vertex_count))
in_blossom = [False] * vertex_count
in_queue = [False] * vertex_count

queue = deque([u]) # Start BFS from the unmatched vertex
in_queue[u] = True

augmenting_path_found = False

# BFS to find augmenting paths
while queue and not augmenting_path_found:
current = queue.popleft() # Get the current vertex
for y in graph[current]: # Explore adjacent vertices
# Skip if we're looking at the current match
if match[current] == y:
continue

if base[current] == base[y]: # Avoid self-loops
continue

if parent[y] == EdmondsBlossomAlgorithm.UNMATCHED:
# Case 1: y is unmatched;
# we've found an augmenting path
if match[y] == EdmondsBlossomAlgorithm.UNMATCHED:
parent[y] = current # Update the parent
augmenting_path_found = True
# Augment along this path
EdmondsBlossomAlgorithm.update_matching(
match, parent, y
)
break

# Case 2: y is matched;
# add y's match to the queue
z = match[y]
parent[y] = current
parent[z] = y
if not in_queue[z]: # If z is not already in the queue
queue.append(z)
in_queue[z] = True
else:
# Case 3: Both current and y have a parent;
# check for a cycle/blossom
base_u = EdmondsBlossomAlgorithm.find_base(
base, parent, current, y
)
if base_u != EdmondsBlossomAlgorithm.UNMATCHED:
EdmondsBlossomAlgorithm.contract_blossom(
BlossomData(
BlossomAuxData(
queue,
parent,
base,
in_blossom,
match,
in_queue,
),
current,
y,
base_u,
)
)

# Create result list of matched pairs
matching_result: list[list[int]] = []
for v in range(vertex_count):
if (
match[v] != EdmondsBlossomAlgorithm.UNMATCHED and v < match[v]
): # Ensure pairs are unique
matching_result.append([v, match[v]])

return matching_result

@staticmethod
def update_matching(
match: list[int], parent: list[int], matched_vertex: int
) -> None:
"""
Updates the matching based on the augmenting path found.

Args:
match: The current match list.
parent: The parent list from BFS traversal.
matched_vertex: The vertex where the augmenting path ends.
"""
while matched_vertex != EdmondsBlossomAlgorithm.UNMATCHED:
v = parent[matched_vertex] # Get the parent vertex
next_match = match[v] # Store the next match
match[v] = matched_vertex # Update match for v
match[matched_vertex] = v # Update match for matched_vertex
matched_vertex = next_match # Move to the next vertex

@staticmethod
def find_base(
base: list[int], parent: list[int], vertex_u: int, vertex_v: int
) -> int:
"""
Finds the base of the blossom.

Args:
base: The base array for each vertex.
parent: The parent array from BFS.
vertex_u: One endpoint of the blossom.
vertex_v: The other endpoint of the blossom.

Returns:
The lowest common ancestor of vertex_u and vertex_v in the blossom.
"""
visited: list[bool] = [False] * len(base)

# Mark ancestors of vertex_u
current_vertex_u = vertex_u
while True:
current_vertex_u = base[current_vertex_u]
# Mark this base as visited
visited[current_vertex_u] = True
if parent[current_vertex_u] == EdmondsBlossomAlgorithm.UNMATCHED:
break
current_vertex_u = parent[current_vertex_u]

# Find the common ancestor of vertex_v
current_vertex_v = vertex_v
while True:
current_vertex_v = base[current_vertex_v]
# Check if we've already visited this base
if visited[current_vertex_v]:
return current_vertex_v
current_vertex_v = parent[current_vertex_v]

@staticmethod
def contract_blossom(blossom_data: BlossomData) -> None:
"""
Contracts a blossom found during the matching process.

Args:
blossom_data: The data related to the blossom to be contracted.
"""
# Mark vertices in the blossom
for x in range(
blossom_data.vertex_u,
blossom_data.aux_data.base[blossom_data.vertex_u]
!= blossom_data.lowest_common_ancestor,
):
base_x = blossom_data.aux_data.base[x]
match_base_x = blossom_data.aux_data.base[blossom_data.aux_data.match[x]]
# Mark the base as in a blossom
blossom_data.aux_data.in_blossom[base_x] = True
blossom_data.aux_data.in_blossom[match_base_x] = True

for x in range(
blossom_data.vertex_v,
blossom_data.aux_data.base[blossom_data.vertex_v]
!= blossom_data.lowest_common_ancestor,
):
base_x = blossom_data.aux_data.base[x]
match_base_x = blossom_data.aux_data.base[blossom_data.aux_data.match[x]]
# Mark the base as in a blossom
blossom_data.aux_data.in_blossom[base_x] = True
blossom_data.aux_data.in_blossom[match_base_x] = True

# Update the base for all marked vertices
for i in range(len(blossom_data.aux_data.base)):
if blossom_data.aux_data.in_blossom[blossom_data.aux_data.base[i]]:
# Contract to the lowest common ancestor
blossom_data.aux_data.base[i] = blossom_data.lowest_common_ancestor
if not blossom_data.aux_data.in_queue[i]:
# Add to queue if not already present
blossom_data.aux_data.queue.append(i)
blossom_data.aux_data.in_queue[i] = True
72 changes: 72 additions & 0 deletions graphs/tests/test_edmonds_blossom_algorithm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import unittest

from graphs.edmonds_blossom_algorithm import EdmondsBlossomAlgorithm


class EdmondsBlossomAlgorithmTest(unittest.TestCase):
def convert_matching_to_array(self, matching):
"""Helper method to convert a
list of matching pairs into a sorted 2D array.
"""
# Convert the list of pairs into a list of lists
result = [list(pair) for pair in matching]

# Sort each individual pair for consistency
for pair in result:
pair.sort()

# Sort the array of pairs to ensure consistent order
result.sort(key=lambda x: x[0])
return result

def test_case_1(self):
"""Test Case 1: A triangle graph where vertices 0, 1, and 2 form a cycle."""
edges = [[0, 1], [1, 2], [2, 0]]
matching = EdmondsBlossomAlgorithm.maximum_matching(edges, 3)

expected = [[0, 1]]
assert expected == self.convert_matching_to_array(matching)

def test_case_2(self):
"""Test Case 2: A disconnected graph with two components."""
edges = [[0, 1], [1, 2], [3, 4]]
matching = EdmondsBlossomAlgorithm.maximum_matching(edges, 5)

expected = [[0, 1], [3, 4]]
assert expected == self.convert_matching_to_array(matching)

def test_case_3(self):
"""Test Case 3: A cycle graph with an additional edge outside the cycle."""
edges = [[0, 1], [1, 2], [2, 3], [3, 0], [4, 5]]
matching = EdmondsBlossomAlgorithm.maximum_matching(edges, 6)

expected = [[0, 1], [2, 3], [4, 5]]
assert expected == self.convert_matching_to_array(matching)

def test_case_no_matching(self):
"""Test Case 4: A graph with no edges."""
edges = [] # No edges
matching = EdmondsBlossomAlgorithm.maximum_matching(edges, 3)

expected = []
assert expected == self.convert_matching_to_array(matching)

def test_case_large_graph(self):
"""Test Case 5: A complex graph with multiple cycles and extra edges."""
edges = [[0, 1], [1, 2], [2, 3], [3, 4], [4, 5], [5, 0], [1, 4], [2, 5]]
matching = EdmondsBlossomAlgorithm.maximum_matching(edges, 6)

# Check if the size of the matching is correct (i.e., 3 pairs)
assert len(matching) == 3

# Check that the result contains valid pairs (any order is fine)
possible_matching_1 = [[0, 1], [2, 5], [3, 4]]
possible_matching_2 = [[0, 1], [2, 3], [4, 5]]
result = self.convert_matching_to_array(matching)

# Assert that the result is one of the valid maximum matchings
assert result in (possible_matching_1, possible_matching_2)


if __name__ == "__main__":
unittest.main()