-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathqaoa_solution2.py
275 lines (225 loc) · 8.96 KB
/
qaoa_solution2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
import numpy as np
import networkx as nx
import matplotlib.pyplot as plt
from itertools import combinations
from qiskit import Aer
from qiskit.algorithms import QAOA
from qiskit.algorithms.optimizers import COBYLA
from qiskit.utils import QuantumInstance
from qiskit.opflow import PauliOp
from qiskit.quantum_info import Pauli
from qiskit_ibm_runtime import QiskitRuntimeService
def decode_solution(result, n_cities):
"""
Decode the QAOA result into a valid TSP path.
Now properly handles dictionary output from QAOA.
"""
try:
# Get the state with highest probability from the counts dictionary
if hasattr(result, 'eigenstate') and isinstance(result.eigenstate, dict):
# Handle dictionary of counts
counts = result.eigenstate
max_bitstring = max(counts.items(), key=lambda x: x[1])[0]
binary = format(int(max_bitstring, 2), f'0{n_cities * n_cities}b')
else:
# Fallback: create a simple sequential path
print("Warning: Could not decode quantum state, using fallback path")
path = list(range(n_cities)) + [0]
return path
# Convert to state matrix
state_matrix = np.array(list(map(int, binary))).reshape(n_cities, n_cities)
# Build valid path
path = []
used_cities = set()
for t in range(n_cities):
probs = state_matrix[:, t]
available = [i for i in range(n_cities) if i not in used_cities]
if not available:
remaining = list(set(range(n_cities)) - set(path))
city = remaining[0] if remaining else path[0]
else:
city = max(available, key=lambda x: probs[x])
path.append(city)
used_cities.add(city)
path.append(path[0]) # Complete the cycle
return path
except Exception as e:
print(f"Warning: Error in solution decoding: {e}")
print("Using fallback path")
# Return a simple sequential path as fallback
return list(range(n_cities)) + [0]
def create_cost_hamiltonian(distances):
"""
Create the cost Hamiltonian for the QAOA.
"""
n_cities = len(distances)
n_qubits = n_cities * n_cities
cost_ops = []
# Distance terms
for i in range(n_cities):
for j in range(n_cities):
if i != j:
for t in range(n_cities):
t_next = (t + 1) % n_cities
qubit1 = i * n_cities + t
qubit2 = j * n_cities + t_next
pauli_str = ['I'] * n_qubits
pauli_str[qubit1] = 'Z'
pauli_str[qubit2] = 'Z'
pauli = Pauli(''.join(pauli_str))
cost_ops.append(PauliOp(pauli, distances[i][j] / 4))
# Add strong penalty terms
penalty = 20.0
# One city per time step
for t in range(n_cities):
for i, j in combinations(range(n_cities), 2):
qubit1 = i * n_cities + t
qubit2 = j * n_cities + t
pauli_str = ['I'] * n_qubits
pauli_str[qubit1] = 'Z'
pauli_str[qubit2] = 'Z'
pauli = Pauli(''.join(pauli_str))
cost_ops.append(PauliOp(pauli, penalty))
# Each city visited once
for i in range(n_cities):
for t1, t2 in combinations(range(n_cities), 2):
qubit1 = i * n_cities + t1
qubit2 = i * n_cities + t2
pauli_str = ['I'] * n_qubits
pauli_str[qubit1] = 'Z'
pauli_str[qubit2] = 'Z'
pauli = Pauli(''.join(pauli_str))
cost_ops.append(PauliOp(pauli, penalty))
return sum(cost_ops)
def solve_tsp_with_qaoa(distances, cities, useSimulator=True, visualize=False):
"""
Solve TSP using QAOA on IBM Quantum hardware.
"""
# Create problem instance
n_cities = len(distances)
print(f"Solving TSP for {n_cities} cities:\n", cities, "\nDistance Matrix:\n", distances)
if visualize:
# Show initial graph
print("Visualizing TSP Graph:")
visualize_graph(distances, cities)
# Create Hamiltonian
cost_hamiltonian = create_cost_hamiltonian(distances)
# Set up QAOA
p = 3 # Number of QAOA layers (keep small for hardware constraints)
optimizer = COBYLA(maxiter=500)
if useSimulator:
backend = Aer.get_backend('qasm_simulator')
else:
# Save the IBM Quantum Experience Credentials only for first run and DO NOT COMMIT the Token in GIT repo!
# QiskitRuntimeService.save_account(channel="ibm_quantum", token="<YOUR-TOKEN>", overwrite=True, set_as_default=True)
# Load IBMQ account and select backend
service = QiskitRuntimeService(channel="ibm_quantum")
# Get the least busy backend with enough qubits
backend = service.least_busy(operational=True, simulator=False, min_num_qubits=16)
if not backend:
raise Exception("No available backends with enough qubits.")
print("Running on current least busy backend:", backend)
quantum_instance = QuantumInstance(
backend,
shots=4096,
seed_simulator=123,
seed_transpiler=123
)
qaoa = QAOA(
optimizer=optimizer,
quantum_instance=quantum_instance,
reps=p,
initial_point=[np.pi/3] * (2*p)
)
# Run QAOA
try:
result = qaoa.compute_minimum_eigenvalue(cost_hamiltonian)
# Debug information
print("\nQAOA Result Details:")
print(f"Result type: {type(result)}")
print(f"Available attributes: {dir(result)}")
if hasattr(result, 'eigenstate'):
print(f"Eigenstate type: {type(result.eigenstate)}")
# Decode solution
path_indices = decode_solution(result, n_cities)
path = [cities[i] for i in path_indices]
# Calculate total distance
total_distance = sum(distances[path_indices[i]][path_indices[i+1]]
for i in range(len(path_indices)-1))
# Print results
print(f"\nQAOA Results:")
print(f"Optimal path: {' -> '.join(path)}")
print(f"Total distance: {total_distance}")
if visualize:
# Show solution graph
print("\nVisualizing Optimal Path:")
visualize_graph(distances, cities, path)
return path, total_distance
except Exception as e:
print(f"Error in QAOA execution: {e}")
# Return a simple sequential path as fallback
path = list(range(n_cities)) + [0]
path = [cities[i] for i in path]
return path, float('inf')
def visualize_graph(distances, cities, path=None):
"""
Visualize TSP graph with path highlighting.
"""
plt.figure(figsize=(10, 8))
G = nx.Graph()
for i in range(len(cities)):
for j in range(i + 1, len(cities)):
G.add_edge(cities[i], cities[j], weight=distances[i][j])
pos = nx.circular_layout(G)
# Draw all edges in light gray
nx.draw_networkx_edges(G, pos,
edge_color='lightgray',
width=1,
style='dashed')
# Draw edge labels
edge_labels = nx.get_edge_attributes(G, 'weight')
nx.draw_networkx_edge_labels(G, pos,
edge_labels=edge_labels,
font_size=10)
if path is not None:
# Draw solution path with arrows
path_edges = list(zip(path[:-1], path))
for start, end in path_edges:
start_pos = pos[start]
end_pos = pos[end]
plt.arrow(start_pos[0], start_pos[1],
end_pos[0] - start_pos[0],
end_pos[1] - start_pos[1],
head_width=0.03,
head_length=0.05,
fc='red',
ec='red',
length_includes_head=True,
width=0.002)
# Draw nodes
nx.draw_networkx_nodes(G, pos,
node_color='lightblue',
node_size=1000,
edgecolors='black',
linewidths=2)
nx.draw_networkx_labels(G, pos,
font_size=14,
font_weight='bold')
if path is None:
plt.title("Initial TSP Graph")
else:
total_distance = sum(distances[cities.index(path[i])][cities.index(path[i+1])]
for i in range(len(path)-1))
plt.title(f"TSP Solution Path\nPath: {' → '.join(path)}\nTotal Distance: {total_distance}")
plt.axis('off')
# Add legend
if path is not None:
legend_elements = [
plt.Line2D([0], [0], color='lightgray', linestyle='--',
label='Available Paths'),
plt.Line2D([0], [0], color='red', label='Solution Path')
]
plt.legend(handles=legend_elements, loc='lower center',
bbox_to_anchor=(0.5, -0.15))
plt.tight_layout()
plt.show()