-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathledger.py
139 lines (117 loc) · 5.65 KB
/
ledger.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# Don't trust me with cryptography.
"""
Implementation of https://gist.github.com/phyro/935badc682057f418842c72961cf096c
"""
import hashlib
from secp import PrivateKey, PublicKey
import b_dhke
class Ledger:
    """In-memory ledger that issues promises and re-splits them on request.

    One private key is derived per power-of-two amount from a master secret.
    A promise for an amount is produced with the key registered for that
    amount, and a proof is checked against the same key. Secrets of redeemed
    proofs are remembered in ``used_proofs`` so they cannot be spent twice.

    See the module docstring for the protocol write-up this follows.
    """

    # Keys exist for the amounts 2**0 .. 2**(MAX_ORDER - 1).
    MAX_ORDER = 20

    def __init__(self, secret_key):
        """Create a ledger whose per-amount keys derive from ``secret_key``."""
        self.master_key = secret_key
        self.used_proofs = set()  # no promise proofs have been used
        self.keys = self._derive_keys(self.master_key)

    @staticmethod
    def _derive_keys(master_key, max_order=MAX_ORDER):
        """Deterministically derive one private key per amount 2**i.

        Args:
            master_key: Secret the keys are derived from; it is stringified.
            max_order: Number of power-of-two amounts to derive keys for.

        Returns:
            Dict mapping ``2**i`` to a ``PrivateKey`` for ``i`` in
            ``range(max_order)``, in increasing order of amount.
        """
        # NOTE(review): the key material is the first 32 ASCII characters of
        # the *hex* digest (128 bits of digest), not the raw 32-byte digest.
        # Kept as is, because changing it would change every derived key.
        return {
            2**i: PrivateKey(
                hashlib.sha256((str(master_key) + str(i)).encode("utf-8"))
                .hexdigest()
                .encode("utf-8")[:32],
                raw=True,
            )
            for i in range(max_order)
        }

    def _generate_promises(self, amounts, B_s):
        """Generate one promise per ``(amount, B')`` pair.

        Args:
            amounts: Amounts to issue promises for.
            B_s: Hex-encoded public keys, paired positionally with
                ``amounts`` (extra items on either side are ignored by
                ``zip``).

        Returns:
            List of promise dicts as returned by ``_generate_promise``.
        """
        return [
            self._generate_promise(amount, PublicKey(bytes.fromhex(B_), raw=True))
            for (amount, B_) in zip(amounts, B_s)
        ]

    def _generate_promise(self, amount, B_):
        """Generate a promise for ``amount`` and return ``{"amount", "C'"}``.

        Args:
            amount: Amount to issue; must be a key of ``self.keys``.
            B_: Public key object handed to ``b_dhke.step2_alice``.

        Returns:
            Dict with the amount and the hex serialization of the value
            returned by ``b_dhke.step2_alice``.

        Raises:
            KeyError: If there is no key for ``amount``.
        """
        secret_key = self.keys[amount]  # Get the correct key
        C_ = b_dhke.step2_alice(B_, secret_key)
        return {"amount": amount, "C'": C_.serialize().hex()}

    def _verify_proof(self, proof):
        """Verify that the proof of promise was issued by this ledger.

        Args:
            proof: Dict with ``"amount"``, hex-encoded ``"C"`` and
                ``"secret_msg"``.

        Returns:
            Whatever ``b_dhke.verify`` returns for the proof.

        Raises:
            Exception: If the proof's secret message was already spent.
            KeyError: If there is no key for the proof's amount.
        """
        if proof["secret_msg"] in self.used_proofs:
            raise Exception("Already spent. Secret msg:{}".format(proof["secret_msg"]))
        secret_key = self.keys[proof["amount"]]  # Get the correct key to check against
        C = PublicKey(bytes.fromhex(proof["C"]), raw=True)
        return b_dhke.verify(secret_key, C, proof["secret_msg"])

    def _verify_outputs(self, total, amount, output_data):
        """Verify the requested outputs match the expected split.

        The expected list is the power-of-two split of ``total - amount``
        followed by the power-of-two split of ``amount``; order matters.

        Raises:
            Exception: If either part is not a valid amount. This includes
                ``total == amount``, where the first part is 0.
        """
        fst_amt, snd_amt = total - amount, amount  # we have two amounts to split to
        expected = self._get_output_split(fst_amt) + self._get_output_split(snd_amt)
        given = [o["amount"] for o in output_data]
        return given == expected

    def _verify_no_duplicates(self, proofs, output_data):
        """Return True iff all secret messages and all ``B'`` values are unique."""
        secret_msgs = [p["secret_msg"] for p in proofs]
        if len(secret_msgs) != len(set(secret_msgs)):
            return False
        # TODO: think whether this can be an inflation vector somehow
        B_xs = [od["B'"] for od in output_data]
        return len(B_xs) == len(set(B_xs))

    def _verify_split_amount(self, amount):
        """Validate a split amount with the same rules as any other amount.

        Raises:
            Exception: ``"Invalid split amount: ..."`` if ``amount`` is not
                accepted by ``_verify_amount``.
        """
        try:
            self._verify_amount(amount)
        except Exception as err:
            # Re-raise with a message that names the split amount.
            raise Exception("Invalid split amount: " + str(amount)) from err

    @staticmethod
    def _verify_amount(amount):
        """Check that ``amount`` is an integer with ``0 < amount < 2**32``.

        Returns:
            ``amount`` unchanged, so the call can be used inside expressions.

        Raises:
            Exception: If ``amount`` is not an ``int`` or is out of range.
        """
        valid = isinstance(amount, int) and 0 < amount < 2**32
        if not valid:
            raise Exception("Invalid amount: " + str(amount))
        return amount

    def _verify_equation_balanced(self, proofs, outs):
        """Verify that Σoutputs - Σinputs = 0.

        Raises:
            Exception: If any individual amount is invalid.
            AssertionError: If the sums differ.
        """
        sum_inputs = sum(self._verify_amount(p["amount"]) for p in proofs)
        sum_outputs = sum(self._verify_amount(p["amount"]) for p in outs)
        # Raised explicitly rather than via an `assert` statement so the check
        # still runs when Python is started with -O.
        if sum_outputs != sum_inputs:
            raise AssertionError(
                "Unbalanced split: inputs={} outputs={}".format(sum_inputs, sum_outputs)
            )

    def _get_output_split(self, amount):
        """Return the powers of two that sum to ``amount``, in ascending order.

        For example 13 gives ``[1, 4, 8]``.

        Raises:
            Exception: If ``amount`` is not accepted by ``_verify_amount``.
        """
        self._verify_amount(amount)
        return [1 << pos for pos in range(amount.bit_length()) if (amount >> pos) & 1]

    # Public methods

    def get_pubkeys(self):
        """Return ``{amount: hex-serialized public key}`` for every amount."""
        return {amt: key.pubkey.serialize().hex() for amt, key in self.keys.items()}

    def mint(self, B_):
        """Mint a promise for 64 coins for ``B_``."""
        # NOTE: This could be implemented that a mint requires a rare pow
        return self._generate_promise(64, B_)

    def split(self, proofs, amount, output_data):
        """Consume proofs and issue new promises split into two groups.

        The first group sums to ``total - amount`` and the second to
        ``amount``, where ``total`` is the sum of the proof amounts.

        Args:
            proofs: Proof dicts to spend (see ``_verify_proof``).
            amount: Amount of the second group.
            output_data: Dicts with ``"amount"`` and hex ``"B'"``, listed in
                the order required by ``_verify_outputs``.

        Returns:
            ``(promises_first, promises_second)`` on success, or ``False`` if
            any proof fails verification.

        Raises:
            Exception: On an invalid split amount, an already-spent proof,
                duplicates, ``amount > total`` or an unexpected output list.
                NOTE(review): ``amount == total`` is also rejected, because
                the first group would have amount 0.
            AssertionError: If the issued promises do not balance the inputs.
        """
        self._verify_split_amount(amount)
        # Verify proofs are valid. A list (not a generator) is used on purpose
        # so that every proof is checked and a spent one always raises.
        if not all([self._verify_proof(p) for p in proofs]):
            return False

        total = sum(p["amount"] for p in proofs)

        if not self._verify_no_duplicates(proofs, output_data):
            raise Exception("Duplicate proofs or promises.")
        if amount > total:
            raise Exception("Split amount is higher than the total sum")
        if not self._verify_outputs(total, amount, output_data):
            raise Exception("Split of promises is not as expected.")

        # Prepare the new promises first; nothing is spent yet.
        outs_fst = self._get_output_split(total - amount)
        outs_snd = self._get_output_split(amount)
        B_fst = [od["B'"] for od in output_data[:len(outs_fst)]]
        B_snd = [od["B'"] for od in output_data[len(outs_fst):]]
        prom_fst = self._generate_promises(outs_fst, B_fst)
        prom_snd = self._generate_promises(outs_snd, B_snd)
        self._verify_equation_balanced(proofs, prom_fst + prom_snd)

        # Mark proofs as used only after every step above succeeded, so a
        # failure while issuing promises cannot burn the inputs.
        self.used_proofs |= {p["secret_msg"] for p in proofs}
        return prom_fst, prom_snd