Skip to content

Commit

Permalink
Merge pull request #67 from KTH/feature/parallel_CROW
Browse files Browse the repository at this point in the history
Feature/parallel crow
  • Loading branch information
Jacarte authored Sep 9, 2020
2 parents 7cdf4c0 + b70084f commit d50e7a1
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 39 deletions.
60 changes: 44 additions & 16 deletions crow/crow/crow.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import re
import uuid
from socket_server import listen
import numpy as np


levelPool = None
Expand Down Expand Up @@ -141,38 +142,64 @@ def processBitcode(self, bc, outResult, program_name, redisports, OUT_FOLDER, on
futures = []
variants = []

for subset in getIteratorByName("keysSubset")(merging):
showGenerationProgress = config["DEFAULT"].getboolean("show-generation-progress")

temptativeNumber = np.prod([len(v) + 1 for v in merging.values()])

LOGGER.info(program_name,f"Temptative number of variants {temptativeNumber} (plus original). Expected ratio {len(redisports)} of programs in each iteration.")

if showGenerationProgress:
LOGGER.disable()
printProgressBar(generationcount, temptativeNumber,suffix=f' {generationcount}/{temptativeNumber}')

for subset in getIteratorByName("keysSubset")(merging):

job = generationPool.submit(
self.generateVariant, [subset], program_name, merging, redisports[generationcount
%len(redisports)], bc, OUT_FOLDER, onlybc, meta, outResult)
%len(redisports)], bc, OUT_FOLDER, onlybc, meta, outResult, generationcount, temptativeNumber)
# job.result()

futures.append(job)
generationcount += 1

if generationcount % len(redisports) == 0:
## WAIT for it

generationStartTime = time.time_ns()
LOGGER.info(program_name,f"Executing parallel generation job...")
done, fail = wait(futures, return_when=ALL_COMPLETED)
generationEndTime = time.time_ns() - generationStartTime

futures = []

LOGGER.info(program_name,f"Disposing job...{len(done)} {len(fail)}")

for f in done:
variants += f.result()



if showGenerationProgress:
speed = len(redisports)/generationEndTime
eta = temptativeNumber/speed/1e9

printProgressBar(len(variants), temptativeNumber,suffix=f' {generationcount}/{temptativeNumber} eta:{eta}s')



LOGGER.info(program_name,f"Executing final parallel generation job...")
done, fail = wait(futures, return_when=ALL_COMPLETED)
futures = []
LOGGER.info(program_name,f"Disposing job...{len(done)} {len(fail)}")
generationcount += len(done) + len(fail)

for f in done:
variants += f.result()
# Save metadata


if showGenerationProgress:
printProgressBar(len(variants), temptativeNumber,suffix=f' {generationcount}/{temptativeNumber} ')
LOGGER.enable()

LOGGER.info(program_name, f"Saving metadata...")
variantsFile = open(f"{OUT_FOLDER}/{program_name}.variants.json", 'w')
Expand All @@ -189,7 +216,7 @@ def processBitcode(self, bc, outResult, program_name, redisports, OUT_FOLDER, on

return dict(programs=meta, count=len(meta.keys()))

def generateVariant(self,job,program_name, merging, port,bc, OUT_FOLDER, onlybc, meta, outResult):
def generateVariant(self,job,program_name, merging, port,bc, OUT_FOLDER, onlybc, meta, outResult, current, total):

if len(job) == 0 :
LOGGER.info(program_name,f"Empty job...")
Expand All @@ -198,7 +225,7 @@ def generateVariant(self,job,program_name, merging, port,bc, OUT_FOLDER, onlybc,
variants = []
LOGGER.info(program_name,f"Generating {len(job)} variants...")
r = redis.Redis(host="localhost", port=port)
for j in job:
for jindex,j in enumerate(job):
LOGGER.info(program_name, f"Cleaning previous cache for variant generation...{port}")
try:
LOGGER.success(program_name, f"Flushing redis DB...")
Expand Down Expand Up @@ -335,7 +362,7 @@ def processLevel(self, levels, program_name, port, bc, OUT_FOLDER, onlybc, meta,
raise e

waitFor = 5
LOGGER.warning(program_name, f"Sleeping for {waitFor} seconds waiting for freeing ports...")
LOGGER.warning(program_name, f"Sleeping for {waitFor} seconds waiting for free ports...")
time.sleep(waitFor)
while not q.empty():
i = q.get()
Expand Down Expand Up @@ -418,21 +445,26 @@ def removeDuplicate(program_name, folder, filt="*.wasm", remove=False):

def process(f, OUT_FOLDER, onlybc, program_name, redisports, isBc=False):

LEVELS = 20 # upper bound level count
LEVELS = len(config["DEFAULT"]["order"].split(","))
exploration_workers = config["DEFAULT"].getint("workers")
exploration_timeout = config["DEFAULT"].getint("exploration-timeout")
total_timeout = config["DEFAULT"].getint("timeout")

print(LEVELS, exploration_workers, exploration_timeout, total_timeout)
if total_timeout > 0 : # There is a timeout
# Validate the composition of times

if exploration_workers > LEVELS:
LOGGER.warning(program_name, f"The number of generation workers is the maximum number of levels {LEVELS}. You set the maximum to {exploration_workers}, skipped.")
exploration_workers = LEVELS
if 1.0*exploration_timeout * LEVELS / exploration_workers >= total_timeout:
if 1.0*exploration_timeout * LEVELS / exploration_workers >= total_timeout or exploration_timeout <= -1:
if total_timeout > 0:
LOGGER.error(program_name, f"The total timeout set {total_timeout}s must be larger than the whole (even parallel) generation stage, which is {1.0*exploration_timeout * LEVELS / exploration_workers}s, according to the number of threads and exploration timeout ")
exit(1)
else:
if exploration_timeout > -1:
LOGGER.info(program_name, f"The total exploration stage would take {1.0*exploration_timeout * LEVELS / exploration_workers}s, according to the number of threads and exploration timeout ")
else:
LOGGER.warning(program_name, f"Sit and wait, there is no timeout")



global launch

Expand Down Expand Up @@ -470,8 +502,6 @@ def launch(file, result):

timeout = config["DEFAULT"].getint("timeout")

print("Timeout ... %s s" % timeout)

if total_timeout > 0:
th.join(timeout=timeout)
else:
Expand Down Expand Up @@ -517,7 +547,6 @@ def main(f, redisports):

result = dict(namespace=program_name, programs=[])
attach = []
print(os.listdir(f))
for final in ["%s/%s" % (f, i) for i in os.listdir(f)]:

program_name, OUT_FOLDER, onlybc = getFileMeta(final)
Expand All @@ -539,7 +568,6 @@ def main(f, redisports):
if not os.path.exists(OUT_FOLDER):
os.mkdir(OUT_FOLDER)

print(json.dumps(result, indent=4))


if __name__ == "__main__":
Expand Down
10 changes: 5 additions & 5 deletions crow/crow/iterators.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ def keysSubset(S):
if len(S) == 0:
return {}

for k in S.keys():
for v in S[k]:
d = dict(zip(S.keys(), [None]*len(S.keys())))
d[k] = v
yield d
#for k in S.keys():
# for v in S[k]:
# d = dict(zip(S.keys(), [None]*len(S.keys())))
# d[k] = v
# yield d

#print("Combinations")

Expand Down
24 changes: 19 additions & 5 deletions crow/crow/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,14 @@ def __init__(self, debugToFile=False):
self.debugToFile = debugToFile
self.indent = 0

self.disabled = False

def disable(self):
self.disabled = True

def enable(self):
self.disabled = False

def enter(self):
self.indent += 1

Expand Down Expand Up @@ -58,31 +66,37 @@ def debug(self, file, message, std=None):
if std:
f.write(std.__str__())
else:
print("%s%s%s%s" % (self.getIndent(), bcolors.UNDERLINE, message, bcolors.ENDC))
if not self.disabled:
print("%s%s%s%s" % (self.getIndent(), bcolors.UNDERLINE, message, bcolors.ENDC))
sys.stdout.flush()

def error(self,file, message):
f = open(getlogfilename(file), 'a+')
f.write(message.__str__() + "\n")
print("%s%s%s%s" % (self.getIndent(), bcolors.FAIL, message, bcolors.ENDC))
if not self.disabled:
print("%s%s%s%s" % (self.getIndent(), bcolors.FAIL, message, bcolors.ENDC))
sys.stdout.flush()

def warning(self,file, message):
f = open(getlogfilename(file), 'a+')
f.write(message.__str__() + "\n")
print("%s%s%s%s" % (self.getIndent(), bcolors.WARNING, message, bcolors.ENDC))

if not self.disabled:
print("%s%s%s%s" % (self.getIndent(), bcolors.WARNING, message, bcolors.ENDC))
sys.stdout.flush()

def info(self,file, message):
f = open(getlogfilename(file), 'a+')
f.write(message.__str__() + "\n")
print("%s%s%s%s" % (self.getIndent(), bcolors.OKBLUE, message, bcolors.ENDC))
if not self.disabled:
print("%s%s%s%s" % (self.getIndent(), bcolors.OKBLUE, message, bcolors.ENDC))
sys.stdout.flush()

def success(self,file, message):
f = open(getlogfilename(file), 'a+')
f.write(message.__str__() + "\n")
print("%s%s%s%s" % (self.getIndent(), bcolors.OKGREEN, message, bcolors.ENDC))
if not self.disabled:
print("%s%s%s%s" % (self.getIndent(), bcolors.OKGREEN, message, bcolors.ENDC))
sys.stdout.flush()


Expand Down
19 changes: 10 additions & 9 deletions crow/crow/settings/config.ini
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[DEFAULT]
slumpspath = /slumps
slumpspath = /Users/javierca/Documents/Develop/slumps
debugfile = ${slumpspath}/crow/slumps.debug.txt
outfolder = /slumps/crow/out
outfolder = /Users/javierca/Documents/Develop/slumps/crow_out
print-sha = True
prune-equal = False
exit-on-find = False
Expand All @@ -11,12 +11,13 @@ candidates-threshold = 1
thread-pool-size = 1
fail-silently = True
upper-bound = 8000
timeout = 3600
exploration-timeout = 2000
timeout = 0
exploration-timeout = 10
generate-bc-only = False
workers = 8
workers = 4
include = -I/inputs
order = 1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19
order = 14,15
show-generation-progress = True

[clang]
path = ${DEFAULT:slumpspath}/souper/third_party/llvm-Release-install/bin/clang
Expand Down Expand Up @@ -49,9 +50,9 @@ path = ${DEFAULT:slumpspath}/souper/build
souper = ${path}/souper
check = ${path}/souper-check
solver = -z3-path=${slumpspath}/souper/third_party/z3/build/z3
passname = libsouperPass.so
passname = libsouperPass.dylib
souper-debug-level = 1
workers = 30
workers = 4
socket_port = 100
souper-common = --solver-timeout=0
souper-level-1 = --souper-infer-inst
Expand All @@ -75,5 +76,5 @@ souper-level-18 = --souper-synthesis-const-with-cegis --souper-synthesis-ignore-
souper-level-19 = --souper-infer-inst=false --souper-max-lhs-cands=50 --souper-enumerative-synthesis-ignore-cost --souper-enumerative-synthesis-max-instructions=50
load-opt = -load ${DEFAULT:slumpspath}/souper/build/${passName} -souper
list-candidates = ${load-opt} ${souper-common} --souper-valid-count --souper-debug-level=${souper-debug-level} %s %s
super-opt-pass = ${load-opt} --solver-timeout=1 %s --souper-debug-level=${souper-debug-level} %s
super-opt-pass = ${load-opt} --souper-no-infer %s --souper-debug-level=${souper-debug-level} %s

7 changes: 4 additions & 3 deletions crow/crow/socket_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@ def listen(port, q, program):

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind((HOST, PORT))
print(f"Listening...{port}")

LOGGER.info(program,f"Listening...{port}")

s.listen()
conn, addr = s.accept()
with conn:
print('Connected by', addr)
LOGGER.success(program,f'Connected by {addr}')
while True:
data = conn.recv(1024<<6)
data = data.replace('\\'.encode(), '\\\\'.encode()).replace('\n'.encode(), '\\n'.encode())
Expand All @@ -36,7 +37,7 @@ def listen(port, q, program):
q.put([k, v])
LOGGER.success(program,f"Populating results...{len(result.keys())} keys")
s = np.prod([len(t) for t in result.values()])
LOGGER.success(program,f"Populating results...{s} tentative replacements")
LOGGER.success(program,f"Populating results...{s} temptative replacements")
except Exception as e:
print(st)
if not data:
Expand Down
2 changes: 1 addition & 1 deletion crow/crow/stages.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ def __call__(self, args=[], std=None): # f -> inputs

def processInner(self, std, err):
# return the std output optimized LLVM IR
print(std, err)
#print(std, err)
return std


Expand Down

0 comments on commit d50e7a1

Please sign in to comment.