Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add a local search function #99

Merged
merged 17 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions qamomile/core/post_process/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from qamomile.core.converters.converter import QuantumConverter

__all__ = [
QuantumConverter
]
261 changes: 261 additions & 0 deletions qamomile/core/post_process/local_search.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,261 @@
"""
qamomile/core/post_process.py

This module provides functionality for Local Search Algorithm. Local Search Algorithm is one of the simplest
heuristic algorithms for the optimization problem. You can apply the local search to the following Ising Hamiltonian.

.. math::
\sum_{ij} J_{ij} z_i z_j + \sum_i h_i z_i, \\\\
\\text{s.t.}~z_i \in \{-1, 1\}

Key Features:

- Implementation of first- and best-improvement local search algorithms

- Construction of cost Hamiltonians for the Ising model

- Conversion of Ising interactions from dictionary format to matrix form

"""

from typing import Callable, Optional

import jijmodeling as jm
import numpy as np
from qamomile.core.bitssample import BitsSample, BitsSampleSet
from qamomile.core.converters.converter import QuantumConverter
from qamomile.core.ising_qubo import IsingModel


class IsingMatrix:
"""
A class to represent the Ising model in matrix form.

Attributes:
quad (np.ndarray): The interaction terms between spin variables in the Ising model.
linear (np.ndarray): The external magnetic fields or self-interaction applied to each spin in the Ising model

Methods:
to_ising_matrix: Converts an Ising model into its corresponding matrix form.
calc_E_diff: Calculates the change in energy if a specific bit is flipped.
"""

def __init__(self, quad: Optional[np.ndarray] = None, linear: Optional[np.ndarray] = None):
"""
Initializes an IsingMatrix instance.

Args:
quad (Optional[np.ndarray]): Quadratic term matrix representing the interaction term.
linear (Optional[np.ndarray]): Linear term vector representing the self-interaction term.
"""
if quad is None:
self.quad = np.array([])
else:
self.quad = quad

if linear is None:
self.linear = np.array([])
else:
self.linear = linear

def to_ising_matrix(self, ising: "IsingModel") -> "IsingMatrix":
"""
Converts an IsingModel into matrix form with quadratic and linear components.

Args:
ising (IsingModel): The Ising model to be converted.

Returns:
IsingMatrix: The converted Ising model in matrix form.
"""
size = ising.num_bits()

quad_matrix = np.zeros((size, size))
for (i, j), value in ising.quad.items():
quad_matrix[i, j] = value
quad_matrix[j, i] = value

linear_vector = np.zeros(size)
for i, value in ising.linear.items():
linear_vector[i] = value

self.quad = quad_matrix
self.linear = linear_vector
return self

def calc_E_diff(self, state: np.ndarray, l: int) -> float:
"""
Calculates the energy difference when flipping a bit in the Ising state.

Args:
state (np.ndarray): The current state of the Ising model.
l (int): Index of the bit to be flipped.

Returns:
float: The difference in energy due to flipping the specified bit.
"""
delta_E = -2 * state[l] * (self.quad[:, l] @ state + self.linear[l])
return delta_E


class LocalSearch:
"""
A class to perform local search algorithms on Ising models.

Attributes:
converter (QuantumConverter): The converter should be an implementation of the `QuantumConverter` abstract class, allowing the optimization problem
to be encoded as an Ising model.
ising (IsingModel): The encoded Ising model to be optimized.

Methods:
decode: Decodes the final solution from Ising spin state to jijmodeling SampleSet.
first_improvement: Performs first-improvement local search.
best_improvement: Performs best-improvement local search.
run: Runs the local search algorithm with a specified method.
"""

def __init__(self, converter: QuantumConverter):
"""
Initializes a LocalSearch instance with a quantum converter.

Args:
converter (QuantumConverter): A converter used to encode optimization problems to Ising form.
"""
self.converter = converter
self.ising = converter.ising_encode()

def decode(self, result) -> jm.experimental.SampleSet:
"""
Decodes the result obtained from a local search into a jijmodeling SampleSet.

Args:
result (np.ndarray): The final state of the Ising model after local search.

Returns:
jm.experimental.SampleSet: The decoded results.
"""
sample = BitsSample(1, np.where(result == -1, 1, 0).tolist())
sample_set = BitsSampleSet(bitarrays=[sample])
decoded_sampleset = self.converter.decode_bits_to_sampleset(sample_set)

return decoded_sampleset

def first_improvement(
self, ising_matrix: IsingMatrix, current_state: np.ndarray, N: int
) -> np.ndarray:
"""
Performs first-improvement local search on the Ising model.

The first-improvement local search method iteratively examines each bit in the current state
of the Ising model to determine if flipping that bit will reduce the system's energy.
If a bit flip results in an energy decrease, the flip is accepted immediately, and the algorithm moves to the next bit.
This process continues until all bits have been evaluated once.

Args:
ising_matrix (IsingMatrix): The Ising matrix representation of the problem.
current_state (np.ndarray): The current state of the Ising model.
N (int): Number of bits in the state.

Returns:
np.ndarray: The updated state after attempting to find an improving move.
"""
for i in range(N):
delta_E_i = ising_matrix.calc_E_diff(current_state, i)
if delta_E_i < 0:
current_state[i] = -current_state[i]

return current_state

def best_improvement(
self, ising_matrix: IsingMatrix, current_state: np.ndarray, N: int
) -> np.ndarray:
"""
Performs best-improvement local search on the Ising model.

The best-improvement local search method examines all possible bit flips in the current state of
the Ising model to determine which single bit flip would result in the greatest decrease in energy.
It then flips the bit that leads to the most significant energy reduction, provided there is at least one such improvement.
If no flip results in a reduction in energy, the state remains unchanged.

Args:
ising_matrix (IsingMatrix): The Ising matrix representation of the problem.
current_state (np.ndarray): The current state of the Ising model.
N (int): Number of bits in the state.

Returns:
np.ndarray: The updated state after attempting to find the best improving move.
"""
delta_E = np.array(
[ising_matrix.calc_E_diff(current_state, i) for i in range(N)]
)
best_index = np.argmin(delta_E)
best_delta_E = delta_E[best_index]

if best_delta_E < 0:
current_state[best_index] = -current_state[best_index]

return current_state

def run(
self,
initial_state: np.ndarray,
max_iter: int = -1,
local_search_method: str = "best_improvement",
) -> jm.experimental.SampleSet:
"""
Runs the local search algorithm until convergence or until a maximum number of iterations.

Args:
initial_state (np.ndarray): The initial state to start the local search from.
max_iter (int): Maximum number of iterations to run the local search. Defaults to -1 (no limit).
local_search_method (str): Method of local search ("best_improvement" or "first_improvement").
Defaults to "best_improvement".

Returns:
jm.experimental.SampleSet: The decoded solution after the local search.
"""
method_map = {
"best_improvement": self.best_improvement,
"first_improvement": self.first_improvement,
}
if local_search_method not in method_map:
raise ValueError(
f"Invalid local_search_method: {local_search_method}. Choose from {list(method_map.keys())}."
)

method = method_map[local_search_method]
result = self._run_local_search(method, initial_state, max_iter)
decoded_sampleset = self.decode(result)
return decoded_sampleset

def _run_local_search(
self, method: Callable, initial_state: np.ndarray, max_iter: int
) -> np.ndarray:
"""
Internal method to perform the local search on the Ising model.

Args:
method (Callable): The local search method (either best or first improvement).
initial_state (np.ndarray): The initial state to start the search from.
max_iter (int): The maximum number of iterations for the local search.

Returns:
np.ndarray: The final state after convergence or reaching the iteration limit.
"""
current_state = initial_state.copy()
ising_matrix = IsingMatrix()
ising_matrix.to_ising_matrix(self.ising)
N = len(current_state)
counter = 0

while max_iter == -1 or counter < max_iter:
previous_state = current_state.copy()
current_state = method(ising_matrix, current_state, N)

if np.array_equal(previous_state, current_state):
break

counter += 1

return current_state

110 changes: 110 additions & 0 deletions tests/core/post_process/test_local_search.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import pytest
import jijmodeling as jm
import jijmodeling_transpiler.core as jmt
from qamomile.core.post_process.local_search import IsingMatrix, LocalSearch
from qamomile.core.converters.qaoa import QAOAConverter
import numpy as np


@pytest.fixture
def qaoa_converter() -> QAOAConverter:
x = jm.BinaryVar("x", shape=(3,))
problem = jm.Problem("qubo")
problem += (
8 * x[0] * x[1] + 8 * x[1] * x[2] - 12 * x[0] - 8 * x[1] - 4 * x[2] + 8
) # in order to generate IsingModel({(0, 1): 2.0 ,(1, 2): 2.0}, {0: 4.0}, 0)
compiled_instance = jmt.compile_model(problem, {})
qaoa_converter = QAOAConverter(compiled_instance)

return qaoa_converter


@pytest.fixture
def ising_matrix():
return IsingMatrix()


@pytest.fixture
def local_search_instance(qaoa_converter):
return LocalSearch(qaoa_converter)


def test_to_ising_matrix(ising_matrix, qaoa_converter):
qaoa_encoded = qaoa_converter.ising_encode()
ising_matrix.to_ising_matrix(qaoa_encoded)
size = qaoa_encoded.num_bits()

assert isinstance(ising_matrix.quad, np.ndarray)
assert isinstance(ising_matrix.linear, np.ndarray)
assert ising_matrix.quad.shape == (size, size)
assert ising_matrix.linear.shape == (size,)


def test_calc_E_diff(ising_matrix, qaoa_converter):
ising_ex = ising_ex = qaoa_converter.ising_encode()
ising_matrix.to_ising_matrix(ising_ex)
state = np.array([1, 1, -1])
delta_E = ising_matrix.calc_E_diff(state, 0)

assert delta_E == -12.0


def test_run(local_search_instance, qaoa_converter):
size = qaoa_converter.ising_encode().num_bits()
state = np.random.choice([-1, 1], size=size)

invalid_method = "non_existent_method"
with pytest.raises(ValueError):
local_search_instance.run(state, local_search_method=invalid_method)


def test_run_local_search(local_search_instance, qaoa_converter):
local_search_instance.ising = qaoa_converter.ising_encode()
state = np.array([1, -1, -1])
new_state = local_search_instance._run_local_search(
method=local_search_instance.best_improvement, initial_state=state, max_iter=-1
)

assert np.array_equal(new_state, np.array([-1, 1, -1]))

new_state_2 = local_search_instance._run_local_search(
method=local_search_instance.best_improvement, initial_state=state, max_iter=1
)

assert np.array_equal(new_state_2, np.array([-1, -1, -1]))


def test_best_improvement(local_search_instance, ising_matrix, qaoa_converter):
ising_ex = qaoa_converter.ising_encode()
ising_matrix.to_ising_matrix(ising_ex)
size = 3
state = np.array([1, 1, -1])

new_state = local_search_instance.best_improvement(ising_matrix, state, size)

assert np.array_equal(new_state, np.array([-1, 1, -1]))
assert ising_ex.calc_energy(new_state) == -8


def test_first_improvement(local_search_instance, ising_matrix, qaoa_converter):
ising_ex = qaoa_converter.ising_encode()
ising_matrix.to_ising_matrix(ising_ex)
size = 3
state = np.array([1, -1, -1])

new_state = local_search_instance.first_improvement(ising_matrix, state, size)

assert np.array_equal(new_state, np.array([-1, 1, -1]))
assert ising_ex.calc_energy(new_state) == -8


def test_decode(local_search_instance):
state = np.array([-1, 1, -1])
result = local_search_instance.decode(state)
state_dict = result.data[0].var_values["x"].values
expected_state_dict = {(2,): 1.0, (0,): 1.0}
obj = result.data[0].eval.objective

assert isinstance(result, jm.experimental.SampleSet)
assert state_dict == expected_state_dict
assert obj == -8
Loading