-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathparallel.py
57 lines (49 loc) · 1.66 KB
/
parallel.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# This is my attempt at a parallel Python implementation of CSV validation using
# shared memory and threads.
import datetime
import multiprocessing
import multiprocessing.dummy
import sys
import time
def scan_in_csv(r):
    """Split each line of a text stream into comma-separated cells.

    Args:
        r: An iterable of text lines, e.g. ``sys.stdin`` or an open file.

    Returns:
        A ``(rows, err)`` pair. On success ``rows`` is a list holding one
        list of cell strings per input line and ``err`` is ``None``. If
        reading or splitting raises, ``rows`` is ``None`` and ``err`` is the
        exception; errors are returned rather than raised.

    Note:
        This is a plain ``str.split(',')``; quoted fields that contain
        commas are not supported.
    """
    try:
        # Strip the line terminator first so it does not end up glued to
        # the last cell of every row (e.g. "3\n" or "3\r\n").
        return [line.rstrip('\r\n').split(',') for line in r], None
    except Exception as e:  # deliberate boundary: report instead of raising
        return None, e
def validate_rows(rows, col_size, start=0, step=1):
    """Print every malformed row and non-integer cell found in *rows*.

    A row whose length differs from *col_size* is reported once and its
    cells are skipped. Every cell of a correctly sized row must parse with
    ``int()``; each one that raises ``ValueError`` is reported.

    Args:
        rows: Sequence of rows, each a sequence of cell strings.
        col_size: Expected number of cells per row.
        start: Row number, in the full input, of ``rows[0]``.
        step: Distance, in full-input rows, between consecutive entries of
            *rows*. Together with *start* this lets a caller that passes a
            strided slice ``all_rows[start::step]`` get full-input row
            numbers in the report. The defaults reproduce plain indexing.

    Returns:
        None. Findings are printed to stdout.
    """
    # Debug aid: compare with the caller's id(rows) to see whether this
    # worker received the same list object or a copy.
    print('id of forked rows', id(rows))
    for index, row in enumerate(rows):
        row_id = start + index * step
        if len(row) != col_size:
            # print() already appends a newline; no trailing "\n" here.
            msg = "Row {} has {} cells, but expected {}"
            print(msg.format(row_id, len(row), col_size))
            continue
        for col_id, cell in enumerate(row):
            try:
                int(cell)
            except ValueError as e:
                print("Err at ({}, {}): {}".format(col_id, row_id, e))


def validate_parallel(rows, col_size, core_count):
    """Validate *rows* with ``validate_rows`` across *core_count* workers.

    Worker ``i`` receives every ``core_count``-th row starting at row ``i``
    (``rows[i::core_count]``). It is also given that offset and stride, so
    its report uses the row numbers of the full input.

    ``multiprocessing.dummy.Pool`` is a pool of threads in this process, so
    rows are shared rather than pickled. On a standard (GIL) CPython build
    the pure-Python checks do not actually run in parallel.

    Args:
        rows: Sequence of rows to validate.
        col_size: Expected number of cells per row.
        core_count: Number of worker threads; must be at least 1.
    """
    pool = multiprocessing.dummy.Pool(core_count)
    try:
        # Slicing builds a new list per worker (the row objects inside are
        # shared), so the ids printed by the workers differ from id(rows).
        jobs = [(rows[i::core_count], col_size, i, core_count)
                for i in range(core_count)]
        pool.starmap(validate_rows, jobs)
    finally:
        # Shut the pool down even if a worker raised.
        pool.close()
        pool.join()
def timeit(f):
    """Call ``f()`` once and return how long it took.

    Measures with ``time.perf_counter()``, a monotonic high-resolution
    clock. ``datetime.now()`` reads local wall-clock time, which can jump
    (clock adjustments, DST) and corrupt the measurement.

    Args:
        f: Zero-argument callable to time. Its return value is discarded.

    Returns:
        datetime.timedelta: The elapsed time of the call.
    """
    start = time.perf_counter()
    f()
    return datetime.timedelta(seconds=time.perf_counter() - start)
if __name__ == '__main__':
    rows, err = scan_in_csv(sys.stdin)
    if err is not None:
        # Diagnostics go to stderr. Exit with 1: statuses outside 0-127 such
        # as -1 are not portable (POSIX shells report -1 as 255).
        print(err, file=sys.stderr)
        sys.exit(1)
    if not rows:
        print('No rows in file', file=sys.stderr)
        sys.exit(1)
    core_count = multiprocessing.cpu_count()
    # The first row's width is the expected width for every row.
    col_size = len(rows[0])
    print("Beginning validation...")
    # Compare with the 'id of forked rows' lines printed by the workers.
    print('id of original rows', id(rows))
    elapsed = timeit(lambda: validate_parallel(rows, col_size, core_count))
    print("Validated {} rows of {} cells in {}".format(
        len(rows),
        col_size,
        elapsed,
    ))