-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathpyspark_add_noise.py
executable file
·118 lines (86 loc) · 2.8 KB
/
pyspark_add_noise.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
#!/usr/bin/env pyspark
# Add_Noise Operator - Implemented in PySpark
# Guy Rapaport and Yasmin Bokobza ({guyrap,yasminbo}@post.bgu.ac.il)
# Massive Data Mining Course, Autumn 2015
# Department of Information Systems Engineering
# Ben-Gurion University of the Negev
# To experiment with this implementation interactively, use IPython as the PySpark driver:
# $ export PYSPARK_DRIVER_PYTHON=ipython
#################################
# Spark Submit boilerplate code #
#################################
import pyspark
from pyspark import SparkContext, SparkConf
# Spark application settings for this script.
appName = "test"
master = "local"
# Build the configuration step by step (each setter returns the same conf).
conf = SparkConf()
conf = conf.setAppName(appName)
conf = conf.setMaster(master)
# Module-level context used by the examples and the CLI driver below.
sc = SparkContext(conf=conf)
######################
# Add_Noise Operator #
######################
from random import random
def add_noise(self, p, m, seed=None):
    """Return a new RDD in which elements are randomly perturbed.

    Each element ``x`` is independently replaced, with probability ``p``,
    by ``x + d``, where ``|d|`` is drawn uniformly from ``[0, m)`` and its
    sign is + or - with equal probability (so ``d`` lies in ``(-m, m)``).
    With probability ``1 - p`` the element is returned unchanged.

    Meant to be attached to ``pyspark.rdd.RDD`` so it can be called as
    ``rdd.add_noise(p, m)``; ``self`` is the RDD and is not modified.

    :param p: probability that an element is perturbed (values >= 1 perturb
        every element, values <= 0 perturb none).
    :param m: maximum noise magnitude (exclusive); expected to be >= 0.
    :param seed: optional integer seed. ``None`` (default) draws from the
        module-level ``random()`` generator. An integer gives every
        partition its own generator seeded with ``seed ^ partition_index``,
        so results are reproducible for the same data and partitioning.
    :return: a new RDD of the (possibly) perturbed elements.
    """
    def perturb(x, rand):
        # First draw: perturb or not; then sign; then magnitude.
        if rand() < p:
            sign = 1 if rand() < 0.5 else -1
            x += rand() * m * sign
        return x

    if seed is None:
        return self.map(lambda x: perturb(x, random))

    from random import Random

    def perturb_partition(index, iterator):
        # A distinct, reproducible stream per partition; a single pickled
        # generator would make every partition repeat the same noise.
        rand = Random(seed ^ index).random
        return (perturb(x, rand) for x in iterator)

    return self.mapPartitionsWithIndex(perturb_partition)
# Register add_noise as a method on every RDD, enabling rdd.add_noise(p, m).
pyspark.rdd.RDD.add_noise = add_noise
############
# Examples #
############
def examples(p=0.3, m=5):
    """Print a few sample lists next to their noisy versions.

    Applies ``add_noise(p, m)`` to an empty list, a 5-element list, a
    6-element list and a list of ``N`` random integers in ``[MIN, MAX]``.
    For each case it prints the label, the original list and the noisy
    result, followed by a blank line. Uses the module-level SparkContext
    ``sc``.

    :param p: probability that each element is perturbed (default 0.3).
    :param m: maximum magnitude of the added noise (default 5).
    """
    from random import randint
    MIN = 1
    MAX = 100
    N = 20
    # (label, data) pairs; the list order is the print order, so the
    # labels are defined in exactly one place.
    cases = [
        ("empty list", []),
        ("odd list", [1, 2, 3, 4, 5]),
        ("even list", [1, 2, 3, 4, 5, 6]),
        # range() rather than xrange() so this also runs on Python 3.
        ("big list", [randint(MIN, MAX) for _ in range(N)]),
    ]
    # Run all Spark jobs first, then print (same ordering as before).
    results = [
        (label, original, sc.parallelize(original).add_noise(p, m).collect())
        for label, original in cases
    ]
    for label, original, noisy in results:
        # Single-argument print(...) behaves the same on Python 2 and 3.
        print(label)
        print(original)
        print(noisy)
        print("")
################################################
# Try reading input file according to CLI args #
################################################
import sys
import os
from itertools import izip
if __name__ == "__main__":
    # Usage: pyspark_add_noise.py [<input-file> <p> <m>]
    # With no arguments, run the built-in demo instead of reading a file.
    if len(sys.argv) == 1:
        examples()
    elif len(sys.argv) < 4:
        # A missing argument used to be reported as a file-read error.
        sys.stderr.write("Usage: %s <input-file> <p> <m>\n" % sys.argv[0])
        sys.exit(1)
    else:
        filename = sys.argv[1]
        try:
            p = float(sys.argv[2])
            m = float(sys.argv[3])
        except ValueError:
            sys.stderr.write("Error: p and m must be numbers! Exiting...\n")
            sys.exit(1)
        try:
            # One number per line; skip blank lines instead of failing on float("").
            rdd = sc.textFile(filename, use_unicode=True)
            rdd = rdd.filter(lambda line: line.strip()).map(float).cache()
            noisy_rdd = rdd.add_noise(p, m)
            # Both collects keep element order, so each value is paired with
            # its own (possibly) perturbed counterpart.
            pairs = list(zip(rdd.collect(), noisy_rdd.collect()))
        except Exception as err:
            # Spark evaluates lazily, so file/parse problems surface here at
            # collect(). Unlike a bare except, this lets SystemExit and
            # KeyboardInterrupt through.
            sys.stderr.write("Error reading input file %r: %s\nExiting...\n" % (filename, err))
            sys.exit(1)
        print("Added noise:")
        for a, b in pairs:
            tag = "=" if a == b else "!"
            print("%s %s ==> %s" % (tag, a, b))