forked from TimeEscaper/distributed_eigenspaces
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdistributed.py
executable file
·185 lines (150 loc) · 6.41 KB
/
distributed.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
#!/usr/bin/env python
import pika
import json
import argparse
import time
import numpy as np
from load_data import load_CIFAR_10_data
from scipy.linalg import eigh as largest_eigh
class Node:
    """Base class for a PCA worker connected to a RabbitMQ broker.

    Opens a blocking connection to the broker and declares the two queues used
    by this script: ``master`` (slave -> master results) and ``slaves``
    (master -> slave batch requests).
    """

    def __init__(self, broker_host, username="distrib", password="test"):
        """
        params: broker_host: host name or IP address of the message broker
                username: broker user name (defaults to the previously hard-coded value)
                password: broker password (defaults to the previously hard-coded value)
        """
        # TODO: Implement proper credentials management (e.g. env vars / config file)
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(
                host=broker_host,
                credentials=pika.PlainCredentials(username, password),
            )
        )
        self.channel = self.connection.channel()
        self.channel.queue_declare(queue='master')
        self.channel.queue_declare(queue='slaves')

    def top_k_eigenvectors(self, matrix, k):
        """Return the eigenvectors belonging to the ``k`` largest eigenvalues.

        params: matrix: symmetric square matrix of shape (N, N)
                k: number of leading eigenvectors to extract, 1 <= k <= N
        return: array of shape (N, k); columns are ordered by ascending
                eigenvalue, as returned by scipy's ``eigh``
        raises: ValueError if ``k`` is outside [1, N]
        """
        n = matrix.shape[0]
        if not 1 <= k <= n:
            raise ValueError("k must be in [1, {}], got {}".format(n, k))
        # ``subset_by_index`` replaces the ``eigvals`` keyword, which SciPy
        # deprecated in 1.5 and has since removed.
        _, vectors = largest_eigh(matrix, subset_by_index=[n - k, n - 1])
        return vectors
class SlaveNode(Node):
    """Worker that computes the local top-k eigenspace of one data batch per request.

    Consumes JSON requests ``{"batch": [start, stop], "rank": k}`` from the
    ``slaves`` queue and replies on the ``master`` queue with
    ``{"batch": [start, stop], "eigenspace": <nested list of shape (d, k)>}``.
    """

    def __init__(self, broker_host, data):
        """
        params: broker_host: host name or IP address of the message broker
                data: full dataset, one sample per row; requests index rows of it
        """
        super().__init__(broker_host)
        print("Slave Start listening")
        self.data = data
        # Hold at most one unacknowledged request at a time so that work is
        # spread over all running slaves (RabbitMQ "fair dispatch").
        self.channel.basic_qos(prefetch_count=1)
        self.channel.basic_consume(queue='slaves', on_message_callback=self.callback_)

    def start(self):
        """Block and process batch requests from the ``slaves`` queue."""
        self.channel.start_consuming()

    def callback_(self, channel, method, properties, body):
        """Handle one request: compute the batch eigenspace and reply to the master."""
        request = json.loads(body)
        print("Slave: Received, batchid: " + str(request["batch"]))
        start, stop = request["batch"][0], request["batch"][1]
        sigma_hat = self.compute_sigma_hat_(self.data[start:stop])
        eigenspace = self.top_k_eigenvectors(sigma_hat, request["rank"])
        response = {
            "batch": request["batch"],
            "eigenspace": eigenspace.tolist(),
        }
        self.send_to_master_(json.dumps(response))
        # Ack only after the reply has been published, so an unfinished
        # request stays in the queue if this slave dies mid-computation.
        channel.basic_ack(delivery_tag=method.delivery_tag)

    def send_to_master_(self, message):
        """Publish a serialized response on the ``master`` queue."""
        print("Sending to Master")
        self.channel.basic_publish(exchange='', routing_key='master', body=message)

    def compute_sigma_hat_(self, x):
        """Compute sigma hat, the sample second-moment matrix of one batch.

        sigma_hat = 1/n * sum_i x_i x_i^T = (1/n) * X^T X

        params: x: batch of the data, shape (n, d), one sample per row
        return: sigma hat as a float64 array of shape (d, d)
        raises: ValueError if the batch has no rows
        """
        # Convert first: an integer dot product (e.g. uint8 pixels) would overflow.
        x = np.asarray(x, dtype=np.float64)
        n = x.shape[0]
        if n == 0:
            raise ValueError("cannot compute sigma hat of an empty batch")
        return np.dot(x.T, x) / n

    def compute_eigenspace_(self, batch, rank):
        """Return the ``rank`` leading eigenvectors of a batch's sigma hat.

        Uses the mean of per-sample outer products x x^T (not the inner
        product ``vec @ vec.T`` yields for 1-D vectors) and a symmetric solver,
        so the result is real and truly spans the leading eigenvalues.

        params: batch: non-empty sequence of equal-length sample vectors
                rank: number of leading eigenvectors to return
        return: array of shape (d, rank), columns in ascending eigenvalue order
        """
        samples = np.asarray(batch, dtype=np.float64)
        return self.top_k_eigenvectors(self.compute_sigma_hat_(samples), rank)
class MasterNode(Node):
    """Coordinator of the distributed eigenspace estimate.

    Splits the data rows into ``batches_number`` contiguous batches, sends them
    to the slaves (at most ``max_in_flight`` outstanding at a time) and, once
    every batch has reported its (d, k) eigenspace V_i, computes
    ``sigma_tilde = 1/m * sum_i V_i V_i^T`` over the m batches. The result is
    stored in ``self.sigma_tilde`` and consuming stops, so ``start`` returns.
    """

    def __init__(self, broker_host, rank, batches_number, data, max_in_flight=5):
        """
        params: broker_host: host name or IP address of the message broker
                rank: approximation rank k requested from every slave
                batches_number: number of row batches, 1 <= batches_number <= len(data)
                data: full dataset, one sample per row
                max_in_flight: maximum number of requests outstanding at once
        raises: ValueError on an invalid ``batches_number`` or ``max_in_flight``
        """
        # Validate before connecting so a bad configuration fails fast.
        if not 1 <= batches_number <= data.shape[0]:
            raise ValueError(
                "batches_number must be in [1, {}], got {}".format(data.shape[0], batches_number))
        if max_in_flight < 1:
            raise ValueError("max_in_flight must be >= 1, got {}".format(max_in_flight))
        super().__init__(broker_host)
        self.rank = rank
        self.batches_number = batches_number
        self.data = data
        self.max_in_flight = max_in_flight
        self.batches_in_process = set()  # (start, stop) batches without a result yet
        self.batches = list()  # batches not yet sent to any slave
        self.computed_eigens = list()  # eigenspaces received so far
        self.current_batch = 0  # NOTE(review): not read anywhere in this file
        self.sigma_tilde = None  # set once every batch has reported
        print("Master Start listening")
        self.start_time = time.time()
        self.channel.basic_consume(queue='master', on_message_callback=self.callback_)

    def start(self):
        """Split the dataset, dispatch the first batches and consume results.

        Returns after the final result has been processed by ``callback_``.
        """
        print("Splitting dataset...")
        self.split_batches_()
        print("Sending to slaves...")
        # Never pop more batches than exist; callback_ tops the pipeline up.
        for _ in range(min(self.max_in_flight, len(self.batches))):
            self.send_next_batch_()
        print("Start waiting for messages")
        self.channel.start_consuming()

    def split_batches_(self):
        """Fill ``batches``/``batches_in_process`` with contiguous row ranges.

        The ranges cover every row of ``data`` (sizes differ by at most one),
        so no trailing remainder is dropped when the row count is not a
        multiple of ``batches_number``.
        """
        rows = self.data.shape[0]
        bounds = [i * rows // self.batches_number for i in range(self.batches_number + 1)]
        for start, stop in zip(bounds, bounds[1:]):
            batch = (start, stop)
            self.batches.append(batch)
            self.batches_in_process.add(batch)

    def send_next_batch_(self):
        """Pop the next unsent batch, publish it to the slaves and return it."""
        request = dict()
        request["batch"] = self.batches.pop()
        request["rank"] = self.rank
        self.send_to_slaves_(json.dumps(request))
        return request["batch"]

    def callback_(self, channel, method, properties, body):
        """Record one slave result, then send more work or finish the estimate."""
        response = json.loads(body)
        batch = (response["batch"][0], response["batch"][1])
        print("Master: Received, batch: " + str(batch))
        if batch not in self.batches_in_process:
            # A redelivered request can produce a second result for the same
            # batch; counting it twice would skew sigma_tilde.
            print("Master: Ignoring duplicate result for batch: " + str(batch))
            channel.basic_ack(delivery_tag=method.delivery_tag)
            return
        self.batches_in_process.remove(batch)
        self.computed_eigens.append(np.array(response["eigenspace"]))
        if self.batches_in_process:
            # Results are still outstanding; only dispatch if work is left.
            if self.batches:
                sent = self.send_next_batch_()
                print("Sended batch: " + str(sent))
            channel.basic_ack(delivery_tag=method.delivery_tag)
            return
        self.sigma_tilde = sum(eigen @ eigen.T for eigen in self.computed_eigens) / self.batches_number
        print("Computed! Time (seconds): " + str(time.time() - self.start_time))
        channel.basic_ack(delivery_tag=method.delivery_tag)
        # All batches are done: leave start_consuming() so start() returns.
        channel.stop_consuming()

    def send_to_slaves_(self, message):
        """Publish a serialized batch request on the ``slaves`` queue."""
        print("Sending to slaves: ")
        self.channel.basic_publish(exchange='', routing_key='slaves', body=message)
def run_master(broker, rank, batches_number, data):
    """Build a MasterNode for ``broker`` and run its dispatch/consume loop."""
    MasterNode(broker, rank, batches_number, data).start()
def run_slave(broker, data):
    """Build a SlaveNode for ``broker`` and run its consume loop."""
    SlaveNode(broker, data).start()
def main():
    """Command-line entry point: load CIFAR-10 and run this process as a node.

    All arguments are validated before the dataset is loaded, so a bad
    command line fails immediately instead of after the (slow) data load.

    raises: RuntimeError if the broker is missing, the mode is missing or
            unknown, or master mode is requested without --rank/--batches
    """
    parser = argparse.ArgumentParser(description="Multinode PCA")
    parser.add_argument("--mode", help="Mode to run script - slave or master")
    parser.add_argument("--broker", help="Message broker IP address")
    parser.add_argument("--rank", type=int, help="Approximation rank (only for master node)")
    parser.add_argument("--batches", type=int, help="Total batches number")
    parser.add_argument("--data", default="cifar-10-batches-py", help="Path to dataset")
    args = parser.parse_args()
    if args.broker is None:
        raise RuntimeError("Broker not specified")
    if args.mode not in ("slave", "master"):
        raise RuntimeError("Mode not specified or specified wrong")
    if args.mode == "master" and (args.rank is None or args.batches is None):
        raise RuntimeError("Master mode requires --rank and --batches")
    data, _filenames, _labels = load_CIFAR_10_data(args.data)
    # Average over axis 3 (assumed to be the RGB channel axis) to get
    # single-channel images.
    data = data.mean(axis=3)
    # Flatten the last two (32x32 image) axes into one vector in R^1024.
    data = data.reshape(data.shape[:-2] + (-1,))
    if args.mode == "slave":
        run_slave(args.broker, data)
    else:
        run_master(args.broker, args.rank, args.batches, data)
if __name__ == "__main__":
    # Start a node only when executed as a script, not when imported.
    main()