mrSVM.py
#!/usr/bin/python
# coding:utf8
'''
Created on 2017-04-07
Updated on 2017-06-20
MapReduce version of Pegasos SVM
Using mrjob to automate job flow
Author: Peter/ApacheCN-xy/片刻
GitHub: https://github.com/apachecn/AiLearning
'''
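# Usage sketch (illustrative, not taken from this repo): with the 'json_value'
# input protocol below, each input line is parsed as one JSON value, so the job
# can be kicked off locally from a seed file of records such as ["x", <index>]
# and ["t", 1], one JSON list per line. The file name `kickStart.txt` and its
# exact contents are assumptions for illustration only:
#
#   python mrSVM.py --iterations=2 --batchsize=100 < kickStart.txt > out.txt
#
# The same job can be sent to a cluster with a different mrjob runner,
# e.g. `-r hadoop`.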
import pickle

from mrjob.job import MRJob
from numpy import mat, shape, zeros, random


class MRsvm(MRJob):
    DEFAULT_INPUT_PROTOCOL = 'json_value'
    def __init__(self, *args, **kwargs):
        super(MRsvm, self).__init__(*args, **kwargs)
        # The full data set is loaded on every node; only point indices are passed between steps.
        self.data = pickle.load(open('/opt/git/MachineLearnidata/15.BigData_MapReduce/svmDat27', 'rb'))
        self.w = 0                           # weight vector, initialized on the first iteration
        self.eta = 0.69                      # learning rate (only overwritten for debugging)
        self.dataList = []                   # indices of the data points assigned to this node
        self.k = self.options.batchsize      # k: mini-batch size
        self.numMappers = 1                  # number of mappers
        self.t = 1                           # iteration number
    def configure_options(self):
        super(MRsvm, self).configure_options()
        self.add_passthrough_option(
            '--iterations', dest='iterations', default=2, type='int',
            help='T: number of iterations to run')
        self.add_passthrough_option(
            '--batchsize', dest='batchsize', default=100, type='int',
            help='k: number of data points in a batch')
    def map(self, mapperId, inVals):
        # Input: (nodeId, ['w', w-vector]) or (nodeId, ['x', int]) or (nodeId, ['t', int]).
        if False:
            yield  # never reached; only makes map() a generator, as mrjob expects
        if inVals[0] == 'w':                 # accumulate the w vector
            self.w = inVals[1]
        elif inVals[0] == 'x':               # accumulate data-point indices for this batch
            self.dataList.append(inVals[1])
        elif inVals[0] == 't':               # iteration number
            self.t = inVals[1]
        else:
            self.eta = inVals                # for debugging only; eta is not used in map
    def map_fin(self):
        labels = self.data[:, -1]
        X = self.data[:, :-1]                # split the data back into X and labels
        if self.w == 0:
            self.w = [0.001] * shape(X)[1]   # initialize w on the first iteration
        for index in self.dataList:
            p = mat(self.w) * X[index, :].T  # p = w * dataSet[index].T
            if labels[index] * p < 1.0:      # hinge-loss violation: emit this index
                yield (1, ['u', index])      # key everything to 1 so that all records
        yield (1, ['w', self.w])             # end up in the same reducer
        yield (1, ['t', self.t])
    def reduce(self, _, packedVals):
        for valArr in packedVals:            # unpack the values from the input stream
            if valArr[0] == 'u':             # indices of hinge-loss violators
                self.dataList.append(valArr[1])
            elif valArr[0] == 'w':           # current weight vector
                self.w = valArr[1]
            elif valArr[0] == 't':           # iteration number
                self.t = valArr[1]
        labels = self.data[:, -1]
        X = self.data[:, 0:-1]
        wMat = mat(self.w)
        wDelta = mat(zeros(len(self.w)))
        for index in self.dataList:
            wDelta += float(labels[index]) * X[index, :]    # wDelta += label * dataSet[index]
        eta = 1.0 / (2.0 * self.t)                          # new step size eta
        # Pegasos update: w = (1.0 - 1/t) * w + (eta/k) * wDelta
        wMat = (1.0 - 1.0 / self.t) * wMat + (eta / self.k) * wDelta
        for mapperNum in range(1, self.numMappers + 1):
            yield (mapperNum, ['w', wMat.tolist()[0]])      # emit the updated w to every mapper
            if self.t < self.options.iterations:
                yield (mapperNum, ['t', self.t + 1])        # increment t for the next iteration
            for j in range(self.k // self.numMappers):      # emit random point indices for the mappers
                yield (mapperNum, ['x', random.randint(shape(self.data)[0])])
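    # For reference, the mini-batch Pegasos update implemented above is
    #     w_{t+1} = (1 - 1/t) * w_t + (eta_t / k) * sum over {i in batch : y_i * (w_t . x_i) < 1} of y_i * x_i
    # where the violating indices come from map_fin. This implementation takes
    # eta_t = 1 / (2 * t), which corresponds to fixing the Pegasos regularization
    # parameter lambda at 2 (since eta_t = 1 / (lambda * t)).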
    def steps(self):
        # Run the same map -> map_final -> reduce pass once per iteration.
        return ([self.mr(mapper=self.map, reducer=self.reduce,
                         mapper_final=self.map_fin)] * self.options.iterations)

if __name__ == '__main__':
    MRsvm.run()