-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathparallel4.py
68 lines (58 loc) · 1.95 KB
/
parallel4.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# modified version of parallel.py to avoid reallocation of lists
import datetime, multiprocessing, os, sys
def scan_in_csv(r):
    """Split every line of ``r`` on commas.

    Args:
        r: Any iterable of text lines (the script passes ``sys.stdin``).

    Returns:
        ``(rows, None)`` on success, where ``rows`` is a list with one list of
        cell strings per input line, or ``(None, exc)`` if reading or
        splitting raised ``exc``.
    """
    try:
        # Drop the line terminator first; otherwise it stays glued to the
        # last cell of every row (e.g. '2\n') and leaks into error messages.
        rows = [line.rstrip('\r\n').split(',') for line in r]
    except Exception as e:
        # Errors are handed back to the caller rather than raised; that is
        # the contract the caller checks with ``err is not None``.
        return None, e
    return rows, None
def validate_rows(allrows, col_size, start, end):
    """Check the rows at indices ``start`` (inclusive) to ``end`` (exclusive).

    A row whose cell count differs from ``col_size`` is reported and skipped;
    otherwise every cell that ``int()`` rejects is reported with its
    ``(column, row)`` position. All reporting goes to stdout and nothing is
    returned.
    """
    # Debug aid: lets the reader compare against the id printed by the caller.
    print('id of forked rows', id(allrows))

    width_msg = "Row {} has {} cells, but expected {}\n"
    for row_id in range(start, end):
        cells = allrows[row_id]
        width = len(cells)
        if width == col_size:
            for col_id, value in enumerate(cells):
                try:
                    int(value)
                except ValueError as exc:
                    print("Err at ({}, {}): {}".format(col_id, row_id, exc))
        else:
            # Wrong width: report once and do not inspect the cells.
            print(width_msg.format(row_id, width, col_size))
def validate_parallel(rows, col_size, core_count):
    """Validate ``rows`` using up to ``core_count`` worker processes.

    The row indices are split into contiguous ``(start, end)`` ranges, one
    ``multiprocessing.Process`` running ``validate_rows`` is started per
    range, and the call returns after every worker has been joined.

    Args:
        rows: Sequence of rows; the whole sequence is passed to each worker
            together with the index range that worker should check.
        col_size: Expected number of cells in each row.
        core_count: Desired number of workers; values below 1 are treated
            as 1.
    """
    total = len(rows)
    workers = max(1, int(core_count))
    # Integer ceiling division, clamped to at least 1. A chunk size of 0
    # (fewer rows than cores) would leave `end` unchanged in the loop below
    # and spin forever; rounding up also keeps the number of chunks at or
    # below `workers` instead of adding an extra remainder chunk.
    chunk_size = max(1, -(-total // workers))

    # Partition by index only, walking backwards from the end, so no
    # per-chunk sub-lists are allocated.
    chunks = []  # pairs of (start, end)
    end = total
    while end > 0:
        start = max(end - chunk_size, 0)
        chunks.append((start, end))
        end = start

    procs = []
    for start, end in chunks:
        # NOTE(review): whether `rows` is copied depends on the start method.
        # Per the multiprocessing docs, "fork" children start as a copy of
        # the parent process, while "spawn"/"forkserver" pickle the Process
        # arguments and send them to the child -- confirm which one is in
        # effect on the target platform.
        p = multiprocessing.Process(
            target=validate_rows, args=(rows, col_size, start, end)
        )
        p.start()
        procs.append(p)
    for p in procs:
        p.join()
def timeit(f):
    """Call ``f()`` once and return how long it took.

    The duration is the difference between two ``datetime.datetime.now()``
    readings, returned as a ``datetime.timedelta``. The return value of
    ``f`` is discarded.
    """
    clock = datetime.datetime.now
    began = clock()
    f()
    finished = clock()
    return finished - began
if __name__ == '__main__':
    # Script entry point: read comma-separated rows from stdin, validate
    # them in parallel and report the elapsed time.
    _rows, err = scan_in_csv(sys.stdin)
    if err is not None:
        print(err)
        sys.exit(-1)
    if not _rows:
        print('No rows in file')
        sys.exit(-1)

    core_count = multiprocessing.cpu_count()
    # The first row defines the width every other row is checked against.
    expected_cells = len(_rows[0])

    print("Beginning validation...")
    print('id of original rows', id(_rows))
    elapsed = timeit(
        lambda: validate_parallel(_rows, expected_cells, core_count)
    )
    summary = "Validated {} rows of {} cells in {}"
    print(summary.format(len(_rows), expected_cells, elapsed))