-
Notifications
You must be signed in to change notification settings - Fork 27
/
Copy pathrun_ppo_user.py
109 lines (83 loc) · 2.8 KB
/
run_ppo_user.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import torch
from montecarlo.node import Node
from montecarlo.montecarlo import MonteCarlo
from lang import score_func, can_be_solution
from prompts import prompt, expansion_count, min_lines, check_func
from common import limit_depth, max_completion_depth
import ppo
from cmdline import args
# Number of search iterations that main() runs, taken from the parsed command line.
n_iter = args.n_iter
# Debug override kept for convenience: uncomment to ignore the command-line value.
# n_iter = 3
# Unique sentinel assigned to ``montecarlo.solution`` to mark a run as failed.
FAIL = object()
# Module-wide count of generate_complete() invocations; that function marks the
# run as FAIL once the count exceeds 100.
calls_to_generate = 0
class GenNode:
    """State stored on a search-tree node.

    Attributes:
        text: The text accumulated so far for this node.
        gens: The generation records that produced ``text``; ``reinforce``
            unpacks each one as a ``(query_tensors, response_tensors)`` pair.
    """

    def __init__(self, text, gens):
        # Both values are stored as given (no copy), so the caller's list is shared.
        self.text, self.gens = text, gens
def reinforce(gens, reward):
    """Apply one PPO trainer step per recorded generation, all with one reward.

    Args:
        gens: Iterable of ``(query_tensors, response_tensors)`` pairs.
        reward: Scalar reward; wrapped in a single-element list of tensors
            that is passed to every trainer step.

    Returns:
        None. With an empty ``gens`` no trainer step is performed.
    """
    reward_batch = [torch.tensor(reward)]
    for recorded in gens:
        queries, responses = recorded
        ppo.trainer_step(queries, responses, reward_batch)
def generate_complete(text, montecarlo, gens, current_completion_depth=1):
    """Extend ``text`` with model generations until the scorer gives a verdict.

    Each call asks ``ppo.generate`` for a continuation, records the generation
    in ``gens`` (mutated in place) and scores the result with ``score_func``:

    * ``None`` score: recurse on the extended text with the depth increased.
    * negative score: give up and return ``None``.
    * otherwise: wrap the text in a ``Node``, record it as the search solution
      if ``can_be_solution`` accepts it, and ask the user for feedback.

    User feedback (case-insensitive prefix of the typed answer):

    * ``n``: reinforce with -1.0, discard the node and retry from the text as
      it was before this generation, with a fresh ``gens`` list and depth.
    * ``y``: reinforce with +1.0 and keep the node.
    * ``f``: mark the run as ``FAIL`` and keep the node.
    * anything else: keep the node without reinforcement.

    Args:
        text: Text to extend.
        montecarlo: Search object whose ``solution`` attribute is updated.
        gens: List collecting the generation records for the returned node.
        current_completion_depth: Recursion depth of consecutive unscored
            completions; the call returns ``None`` once it reaches
            ``max_completion_depth``.

    Returns:
        The new ``Node``, or ``None`` when the depth limit is hit or the
        score is negative.
    """
    if current_completion_depth >= max_completion_depth:
        return None

    global calls_to_generate
    calls_to_generate += 1
    if calls_to_generate > 100:
        # Budget exceeded: flag the run as failed. Generation still proceeds,
        # so an accepted solution below can overwrite this flag.
        montecarlo.solution = FAIL

    prev = text
    text, gen = ppo.generate(text)
    gens.append(gen)

    score = score_func(text)
    if score is None:
        return generate_complete(
            text, montecarlo, gens, current_completion_depth + 1
        )
    if score < 0:
        return None

    node = Node(GenNode(text, gens))
    # Remember what was recorded before, so a user rejection can undo the
    # assignment below.
    previous_solution = getattr(montecarlo, "solution", None)
    if can_be_solution(text, min_lines, check_func):
        montecarlo.solution = node

    # NOTE(review): the original indentation was lost; this assumes the prompt
    # is shown for every non-negative score, not only for candidate solutions.
    inp = input("Reinforce it? [y/n/f/...]").lower()
    if inp.startswith("n"):
        reinforce(gens, -1.0)
        # Fix: a rejected node must not stay recorded as the solution,
        # otherwise the caller would later treat it as the chosen result.
        montecarlo.solution = previous_solution
        return generate_complete(prev, montecarlo, [])
    if inp.startswith("y"):
        reinforce(gens, +1.0)
    elif inp.startswith("f"):
        montecarlo.solution = FAIL
    return node
def child_finder(node, montecarlo):
    """Expand ``node`` with one generated child plus a lower-priority retry child.

    Does nothing when ``limit_depth`` reports that the node should not be
    expanded. When generation yields no child, the node's win value is
    updated with -1. Otherwise the generated child is attached with win and
    policy value 1, followed by a sibling that repeats the parent's text with
    empty generations and policy value 0.2.

    Args:
        node: Tree node to expand; its ``state`` carries ``text``.
        montecarlo: Search object forwarded to ``generate_complete``.
    """
    if limit_depth(node, lambda state: state.text):
        return

    generated = generate_complete(node.state.text, montecarlo, [])
    if generated is None:
        # Generation failed: penalise the parent and add no children.
        node.update_win_value(-1)
        return

    node.add_child(generated)
    generated.update_win_value(1)
    generated.update_policy_value(1)

    # Sibling restarting from the parent's text, so the search can try again here.
    sibling = Node(GenNode(node.state.text, []))
    node.add_child(sibling)
    sibling.update_policy_value(0.2)
def main_iter():
    """Run one Monte Carlo search from ``prompt``, reinforce the result, save.

    Builds a fresh ``MonteCarlo`` tree rooted at the prompt, runs
    ``expansion_count`` simulations with ``child_finder``, and then:

    * if a real solution was recorded, prints it and reinforces the
      generations of the solution node and each of its ancestors with 10.0;
    * otherwise prints ``FAILED RUN``.

    ``ppo.save()`` is called in both cases.
    """
    # Fix: give every iteration its own generation budget. Without this reset
    # the module-wide counter keeps growing, so once it passes the limit every
    # later iteration is flagged as failed on its first generation.
    global calls_to_generate
    calls_to_generate = 0

    montecarlo = MonteCarlo(Node(GenNode(prompt, [])))
    montecarlo.child_finder = child_finder
    montecarlo.simulate(expansion_count)

    solution = montecarlo.solution
    # FAIL is a sentinel object, so compare by identity rather than equality.
    if solution and solution is not FAIL:
        print("CHOSEN SOLUTION")
        print(solution.state.text)
        # Walk from the solution up to the root, rewarding each step's generations.
        node = solution
        while node:
            reinforce(node.state.gens, 10.0)
            node = node.parent
    else:
        print("FAILED RUN")
    ppo.save()
def main():
    """Run ``n_iter`` search iterations in sequence, announcing each index."""
    for iteration in range(n_iter):
        print("ITERATION", iteration)
        main_iter()
# Script entry point: start the iterations only when run directly, not on import.
if __name__ == "__main__":
    main()