-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathproducer.py
176 lines (161 loc) · 6.26 KB
/
producer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
from typing import List
import logging
import boto3
import json
import argparse
import time
from os import environ, chdir
from .cmd import cmd
from .runner import get_functions
def send_message(args: argparse.Namespace) -> str:
    """Enqueue one benchmark job and return its message id.

    Args:
        args: Namespace carrying ``send_message`` (the JSON-serialisable job
            payload) and ``local`` (truthy to use the file-backed queue
            instead of SQS).

    Returns:
        In local mode, the queue length after the append, as a string.
        Otherwise the ``MessageId`` returned by SQS.

    Raises:
        KeyError: in SQS mode, when ``SQS_QUEUE_NAME`` is not set in the
            environment.
    """
    msg = json.dumps(args.send_message)
    logging.info(msg)
    if args.local:
        # The local queue is a JSON list stored in ``queue.json`` in the
        # current working directory; a missing or empty file is an empty queue.
        try:
            with open('queue.json', 'r') as f:
                content = f.read()
        except FileNotFoundError:
            content = ''
        queue = json.loads(content) if content != '' else []
        # Each entry is the already-serialised payload (a JSON string inside
        # the JSON list), exactly as it would be sent as an SQS message body.
        queue.append(msg)
        # Rewrite the whole file so no stale bytes can survive after the list.
        with open('queue.json', 'w') as f:
            f.write(json.dumps(queue))
        message_id = str(len(queue))
    else:
        sqs = boto3.client('sqs', region_name='us-east-1')
        response = sqs.get_queue_url(QueueName=environ['SQS_QUEUE_NAME'])
        queue_url = response['QueueUrl']
        response = sqs.send_message(
            QueueUrl=queue_url,
            DelaySeconds=10,
            MessageBody=msg
        )
        message_id = response['MessageId']
    logging.info(message_id)
    return message_id
def full_benchmark_abdk_math_64x64(args: argparse.Namespace) -> List[str]:
    """Enqueue one job per (tool, test, mutant) for the abdk-math-64x64 project.

    Test discovery and mutant lookup run from inside ``projects/<project>``;
    the working directory is restored afterwards, even if something fails.

    Args:
        args: Namespace whose ``local`` attribute selects the local file
            queue instead of SQS.

    Returns:
        The message ids of all jobs created, in creation order.
    """
    # Epoch seconds shared by every job of this run. time.strftime("%s") is a
    # platform-specific directive, so the portable equivalent is used instead.
    version = str(int(time.time()))
    # tools = ['halmos', 'foundry', 'echidna', 'medusa']
    tools = ['halmos', 'foundry']
    projects = ['abdk-math-64x64']
    # Tests that are excluded from the benchmark.
    filter_functions = {
        'test_abs_multiplicativeness',
        'test_exp_inverse',
        'test_exp_negative_exponent',
        'test_exp_2_inverse',
        'test_exp_2_negative_exponent',
        'test_inv_double_inverse',
        'test_inv_division_noncommutativity',
        'test_inv_multiplication',
        'test_inv_identity',
        'test_mul_associative',
        'test_mul_distributive',
        'test_mul_values',
        'test_sqrt_inverse_mul',
        'test_sqrt_inverse_pow',
        'test_sqrt_distributive',
    }
    postprocess = 'git checkout . && cd lib/abdk-libraries-solidity && git checkout . && cd ../../'
    ans = []
    for project in projects:
        chdir('projects/{}'.format(project))
        try:
            functions = [f for f in get_functions() if f not in filter_functions]
            all_mutants = get_all_mutants()
            # The mutants of a test do not depend on the tool, so resolve them
            # once per test rather than once per (tool, test) pair.
            mutants_by_test = {
                test: get_mutants_by_test(all_mutants, test)
                for test in functions
            }
            for tool in tools:
                for test in functions:
                    for mutant in mutants_by_test[test]:
                        preprocess = 'forge clean && git apply {}.patch --allow-empty && cd lib/abdk-libraries-solidity && git apply ../../mutants/{}.patch && git diff && cd ../../'.format(
                            tool, mutant)
                        msg = {
                            "tool": tool,
                            "project": project,
                            'preprocess': preprocess,
                            'postprocess': postprocess,
                            'extra_args': '',
                            'contract': '',
                            "test": test,
                            "timeout": 3600,
                            "mutant": mutant,
                            "prefix": "{}-".format(version)
                        }
                        print(args)
                        ns = argparse.Namespace()
                        ns.send_message = msg
                        ns.local = args.local
                        message_id = send_message(ns)
                        ans.append(message_id)
        finally:
            # Always step back out of projects/<project>.
            chdir('../..')
    print('{} benchmark jobs created'.format(len(ans)))
    return ans
def full_benchmark_dai_certora(args: argparse.Namespace) -> List[str]:
    """Enqueue the dai-certora jobs for every supported tool.

    halmos gets one job per loop bound; every other tool gets a single job.

    Args:
        args: Namespace whose ``local`` attribute selects the local file
            queue instead of SQS.

    Returns:
        The message ids of all jobs created, in creation order.
    """
    # Epoch seconds shared by every job of this run. time.strftime("%s") is a
    # platform-specific directive, so the portable equivalent is used instead.
    version = str(int(time.time()))
    ans = []
    tools = ['halmos', 'foundry', 'echidna', 'medusa']
    loops = ['2', '3', '4']
    for tool in tools:
        # Only halmos is swept over all loop bounds; the other tools are
        # queued once, using the first bound.
        tool_loops = loops if tool == 'halmos' else loops[:1]
        for loop in tool_loops:
            extra_args = tool_cmd(tool, loop)
            msg = {
                "tool": tool,
                "project": 'dai-certora',
                'preprocess': '',
                'postprocess': '',
                'contract': '',
                'extra_args': extra_args,
                "test": 'check_minivat_n_full_symbolic',
                "timeout": 3600,
                "mutant": '',
                "prefix": "{}-".format(version)
            }
            print(args)
            ns = argparse.Namespace()
            ns.send_message = msg
            ns.local = args.local
            message_id = send_message(ns)
            ans.append(message_id)
    print('{} benchmark jobs created'.format(len(ans)))
    return ans
def full_benchmark(args: argparse.Namespace) -> List[str]:
    """Enqueue the single foundry job for the MiniVat counterexample test.

    Args:
        args: Namespace whose ``local`` attribute selects the local file
            queue instead of SQS.

    Returns:
        A one-element list holding the id of the created message.
    """
    # Epoch seconds used as the job prefix. time.strftime("%s") is a
    # platform-specific directive, so the portable equivalent is used instead.
    version = str(int(time.time()))
    ans = []
    tool = 'foundry'
    extra_args = "forge test --match-test test_minivat_counterexample --match-contract MiniVatTest --fuzz-runs 1000000000"
    msg = {
        "tool": tool,
        "project": 'dai-certora',
        'preprocess': '',
        'postprocess': '',
        'contract': 'MiniVatTest',
        'extra_args': extra_args,
        "test": 'test_minivat_counterexample',
        "timeout": 3600,
        "mutant": '',
        "prefix": "{}-".format(version)
    }
    print(args)
    ns = argparse.Namespace()
    ns.send_message = msg
    ns.local = args.local
    message_id = send_message(ns)
    ans.append(message_id)
    print('{} benchmark jobs created'.format(len(ans)))
    return ans
def get_all_mutants() -> List[str]:
    """List the mutant base names found under ``mutants`` in the current directory.

    Each line printed by ``find mutants`` is passed through sed, which strips
    the leading ``mutants/`` and everything from the last ``-`` onwards. Lines
    that do not match that shape pass through unchanged, and a base name
    appears once per matching file.

    Returns:
        The non-empty output lines, in the order the command printed them.
    """
    # Raw string: the sed expression is full of backslashes that are not valid
    # Python escapes and would trigger a SyntaxWarning in a plain literal.
    _, stdout, _ = cmd(
        r"find mutants | sed 's/mutants\/\(.*\)\-.*/\1/'")
    # Drop blank lines (e.g. from a trailing newline) so that no empty mutant
    # name reaches the callers.
    return [name for name in stdout.split('\n') if name]
def tool_cmd(tool: str, loop: str) -> str:
    """Return the command line used to run ``tool``.

    Args:
        tool: One of ``halmos``, ``foundry``, ``medusa`` or ``echidna``.
        loop: Loop bound, inserted into the halmos command only.

    Raises:
        Exception: if ``tool`` is not one of the known tools.
    """
    command_templates = (
        ('halmos', 'halmos --function check_minivat_n_full_symbolic --contract HalmosTester -vvv --solver-parallel --solver-timeout-assertion 0 --loop {}'),
        ('foundry', 'forge test --match-contract FoundryTester'),
        ('medusa', 'medusa fuzz --no-color'),
        ('echidna', 'echidna . --contract CryticTester --config config.yaml'),
    )
    for known_tool, template in command_templates:
        if tool == known_tool:
            # Only the halmos template has a placeholder; format() returns the
            # other templates unchanged.
            return template.format(loop)
    raise Exception("Unknown tool {}".format(tool))
def get_mutants_by_test(all_mutants: List[str], test: str) -> List[str]:
    """Collect the mutant patch names that apply to ``test``.

    A mutant base name applies when it occurs as a substring of the test
    name. For each such base name, the files under ``mutants`` whose path
    contains it as a whole word are listed, with the leading ``mutants/`` and
    trailing ``.patch`` stripped.

    Args:
        all_mutants: Mutant base names; duplicates and empty entries are
            tolerated.
        test: Name of the test function.

    Returns:
        The distinct, non-empty patch names, sorted so the job order is
        reproducible.
    """
    mutants_by_test = set()
    # Deduplicate first: every distinct base name costs one shell pipeline.
    for mutant in set(all_mutants):
        # An empty name is a substring of every test and would make the grep
        # below match every file, so it is skipped explicitly.
        if not mutant or mutant not in test:
            continue
        # Raw string keeps the grep/sed backslashes literal without relying on
        # invalid Python escape sequences.
        _, stdout, _ = cmd(
            r"find mutants | grep '\b{}\b' | sed 's/mutants\/\(.*\)\.patch/\1/'".format(mutant))
        mutants_by_test.update(line for line in stdout.split('\n') if line)
    return sorted(mutants_by_test)