Skip to content

Commit

Permalink
Coverage
Browse files Browse the repository at this point in the history
  • Loading branch information
xeroc committed Aug 20, 2018
1 parent 95627b1 commit 246339c
Show file tree
Hide file tree
Showing 8 changed files with 207 additions and 40 deletions.
2 changes: 1 addition & 1 deletion graphenebase/aes.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
try:
try: # pragma: no cover
from Cryptodome.Cipher import AES
from Cryptodome import Random
except ImportError: # pragma: no cover
Expand Down
35 changes: 26 additions & 9 deletions graphenebase/base58.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
from binascii import hexlify, unhexlify
import hashlib
import sys
# import sys
import string
import logging

from binascii import hexlify, unhexlify
from .utils import _bytes

log = logging.getLogger(__name__)

""" Default Prefix """
Expand All @@ -23,20 +26,22 @@ class Base58(object):
"""Base58 base class
This class serves as an abstraction layer to deal with base58 encoded
strings and their corresponding hex and binary representation throughout the
library.
strings and their corresponding hex and binary representation throughout
the library.
:param data: Data to initialize object, e.g. pubkey data, address data, ...
:type data: hex, wif, bip38 encrypted wif, base58 string
:param str prefix: Prefix to use for Address/PubKey strings (defaults to ``GPH``)
:param str prefix: Prefix to use for Address/PubKey strings (defaults to
``GPH``)
:return: Base58 object initialized with ``data``
:rtype: Base58
:raises ValueError: if data cannot be decoded
* ``bytes(Base58)``: Returns the raw data
* ``str(Base58)``: Returns the readable ``Base58CheckEncoded`` data.
* ``repr(Base58)``: Gives the hex representation of the data.
* ``format(Base58,_format)`` Formats the instance according to ``_format``:
* ``format(Base58,_format)`` Formats the instance according to
``_format``:
* ``"wif"``: prefixed with ``0x00``. Yields a valid wif key
* ``"bts"``: prefixed with ``BTS``
* etc.
Expand All @@ -50,7 +55,10 @@ def __init__(self, data, prefix=PREFIX):
self._hex = data
elif data[0] == "5" or data[0] == "6":
self._hex = base58CheckDecode(data)
elif data[0] == "K" or data[0] == "L":
elif data[0] == "K" or data[0] == "L": # pragma: no cover
raise NotImplementedError(
"Private Keys starting with L or K are not supported!"
)
self._hex = base58CheckDecode(data)[:-2]
elif data[:len(self._prefix)] == self._prefix:
self._hex = gphBase58CheckDecode(data[len(self._prefix):])
Expand All @@ -74,7 +82,10 @@ def __format__(self, _format):
elif _format.upper() in known_prefixes:
return _format.upper() + str(self)
else:
log.warn("Format %s unkown. You've been warned!\n" % _format)
log.warning(
"Format {} unkown. You've been warned!\n".format(
_format
))
return _format.upper() + str(self)

def __repr__(self):
Expand Down Expand Up @@ -108,10 +119,13 @@ def __bytes__(self):


def base58decode(base58_str):
"""
if sys.version > '3':
base58_text = bytes(base58_str, "ascii")
else:
else: # pragma: no cover
base58_text = base58_str.encode("ascii")
"""
base58_text = _bytes(base58_str)
n = 0
leading_zeroes_count = 0
for b in base58_text:
Expand All @@ -129,10 +143,13 @@ def base58decode(base58_str):


def base58encode(hexstring):
"""
if sys.version > '3':
byteseq = bytes(unhexlify(bytes(hexstring, 'ascii')))
else:
byteseq = bytearray(unhexlify(hexstring.decode("ascii")))
"""
byteseq = unhexlify(_bytes(hexstring))
n = 0
leading_zeroes_count = 0
for c in byteseq:
Expand Down
22 changes: 15 additions & 7 deletions graphenebase/bip38.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import sys
# import sys
import logging
import hashlib
from binascii import hexlify, unhexlify
from .account import PrivateKey
from .base58 import Base58, base58decode
from .utils import _bytes

log = logging.getLogger(__name__)

try:
Expand All @@ -15,11 +17,11 @@
raise ImportError("Missing dependency: pyCryptodome")

SCRYPT_MODULE = None
if not SCRYPT_MODULE:
if not SCRYPT_MODULE: # pragma: no branch
try:
import scrypt
SCRYPT_MODULE = "scrypt"
except ImportError: # pragma: no cover
except ImportError:
try:
import pylibscrypt as scrypt
SCRYPT_MODULE = "pylibscrypt"
Expand Down Expand Up @@ -58,14 +60,17 @@ def encrypt(privkey, passphrase):

privkeyhex = repr(privkey) # hex
addr = format(privkey.bitcoin.address, "BTC")
"""
if sys.version > '3':
a = bytes(addr, 'ascii')
else:
a = bytes(addr).encode('ascii')
"""
a = _bytes(addr)
salt = hashlib.sha256(hashlib.sha256(a).digest()).digest()[0:4]
if SCRYPT_MODULE == "scrypt":
if SCRYPT_MODULE == "scrypt": # pragma: no branch
key = scrypt.hash(passphrase, salt, 16384, 8, 8)
elif SCRYPT_MODULE == "pylibscrypt":
elif SCRYPT_MODULE == "pylibscrypt": # pragma: no branch
key = scrypt.scrypt(bytes(passphrase, "utf-8"), salt, 16384, 8, 8)
else: # pragma: no cover
raise ValueError("No scrypt module loaded")
Expand Down Expand Up @@ -101,9 +106,9 @@ def decrypt(encrypted_privkey, passphrase):
assert flagbyte == b'\xc0', "Flagbyte has to be 0xc0"
salt = d[0:4]
d = d[4:-4]
if SCRYPT_MODULE == "scrypt":
if SCRYPT_MODULE == "scrypt": # pragma: no branch
key = scrypt.hash(passphrase, salt, 16384, 8, 8)
elif SCRYPT_MODULE == "pylibscrypt":
elif SCRYPT_MODULE == "pylibscrypt": # pragma: no branch
key = scrypt.scrypt(bytes(passphrase, "utf-8"), salt, 16384, 8, 8)
else:
raise ValueError("No scrypt module loaded")
Expand All @@ -121,10 +126,13 @@ def decrypt(encrypted_privkey, passphrase):
""" Verify Salt """
privkey = PrivateKey(format(wif, "wif"))
addr = format(privkey.bitcoin.address, "BTC")
"""
if sys.version > '3':
a = bytes(addr, 'ascii')
else:
a = bytes(addr).encode('ascii')
"""
a = _bytes(addr)
saltverify = hashlib.sha256(hashlib.sha256(a).digest()).digest()[0:4]
if saltverify != salt: # pragma: no cover
raise SaltException(
Expand Down
41 changes: 26 additions & 15 deletions graphenebase/ecdsa.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
from __future__ import absolute_import

import sys
# import sys
import time
import ecdsa
import hashlib
from binascii import hexlify
import struct
import logging

from binascii import hexlify

from .account import PrivateKey, PublicKey
from .utils import _bytes
log = logging.getLogger(__name__)

SECP256K1_MODULE = None
SECP256K1_AVAILABLE = False
CRYPTOGRAPHY_AVAILABLE = False
GMPY2_MODULE = False
if not SECP256K1_MODULE:
if not SECP256K1_MODULE: # pragma: no branch
try:
import secp256k1
SECP256K1_MODULE = "secp256k1"
Expand All @@ -27,7 +30,7 @@
except ImportError:
SECP256K1_MODULE = "ecdsa"

try:
try: # pragma: no branch
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import ec
Expand Down Expand Up @@ -61,10 +64,13 @@ def compressedPubkey(pk):
x = p.x()
y = p.y()
x_str = ecdsa.util.number_to_string(x, order)
"""
if sys.version > '3':
return bytes(chr(2 + (y & 1)), 'ascii') + x_str
else:
return bytes(chr(2 + (y & 1))).encode("ascii") + x_str
"""
return _bytes(chr(2 + (y & 1))) + x_str


def recover_public_key(digest, signature, i, message=None):
Expand Down Expand Up @@ -100,9 +106,9 @@ def recover_public_key(digest, signature, i, message=None):
return public_key
else:
# Not strictly necessary, but let's verify the message for paranoia's sake.
if not ecdsa.VerifyingKey.from_public_point(Q, curve=ecdsa.SECP256k1).verify_digest(signature, digest, sigdecode=ecdsa.util.sigdecode_string):
if not ecdsa.VerifyingKey.from_public_point(Q, curve=ecdsa.SECP256k1).verify_digest(signature, digest, sigdecode=ecdsa.util.sigdecode_string): # pragma: no cover
return None # pragma: no cover
return ecdsa.VerifyingKey.from_public_point(Q, curve=ecdsa.SECP256k1)
return ecdsa.VerifyingKey.from_public_point(Q, curve=ecdsa.SECP256k1) # pragma: no cover


def recoverPubkeyParameter(message, digest, signature, pubkey):
Expand All @@ -127,14 +133,13 @@ def recoverPubkeyParameter(message, digest, signature, pubkey):
p = recover_public_key(digest, signature, i)
p_comp = hexlify(compressedPubkey(p))
p_string = hexlify(p.to_string())
if isinstance(pubkey, PublicKey):
pubkey_string = bytes(repr(pubkey), 'latin')
else:
if isinstance(pubkey, PublicKey): # pragma: no cover
pubkey_string = bytes(repr(pubkey), 'ascii')
else: # pragma: no cover
pubkey_string = hexlify(pubkey.to_string())
if (p_string == pubkey_string or
p_comp == pubkey_string):
p_comp == pubkey_string): # pragma: no cover
return i
return None


def sign_message(message, wif, hashfn=hashlib.sha256):
Expand All @@ -148,10 +153,13 @@ def sign_message(message, wif, hashfn=hashlib.sha256):

digest = hashfn(message).digest()
priv_key = PrivateKey(wif)
"""
if sys.version > '3':
p = bytes(priv_key)
else:
p = bytes(priv_key.__bytes__())
"""
p = bytes(priv_key)

if SECP256K1_MODULE == "secp256k1":
ndata = secp256k1.ffi.new("const int *ndata")
Expand Down Expand Up @@ -206,12 +214,12 @@ def sign_message(message, wif, hashfn=hashlib.sha256):
i += 4 # compressed
i += 27 # compact
break
else:
else: # pragma: no branch
cnt = 0
sk = ecdsa.SigningKey.from_string(p, curve=ecdsa.SECP256k1)
while 1:
cnt += 1
if not cnt % 20:
if not cnt % 20: # pragma: no branch
log.info("Still searching for a canonical signature. Tried %d times already!" % cnt)

# Deterministic k
Expand Down Expand Up @@ -262,7 +270,7 @@ def sign_message(message, wif, hashfn=hashlib.sha256):
def verify_message(message, signature, hashfn=hashlib.sha256):
if not isinstance(message, bytes):
message = bytes(message, "utf-8")
if not isinstance(signature, bytes):
if not isinstance(signature, bytes): # pragma: no cover
signature = bytes(signature, "utf-8")
if not isinstance(message, bytes):
raise AssertionError()
Expand Down Expand Up @@ -292,7 +300,7 @@ def verify_message(message, signature, hashfn=hashlib.sha256):
sigder = encode_dss_signature(r, s)
p.verify(sigder, message, ec.ECDSA(hashes.SHA256()))
phex = compressedPubkey(p)
else:
else: # pragma: no branch
p = recover_public_key(digest, sig, recoverParameter)
# Will throw an exception of not valid
p.verify_digest(
Expand All @@ -308,10 +316,13 @@ def verify_message(message, signature, hashfn=hashlib.sha256):
def pointToPubkey(x, y, order=None):
order = order or ecdsa.SECP256k1.order
x_str = ecdsa.util.number_to_string(x, order)
"""
if sys.version > '3':
return bytes(chr(2 + (y & 1)), 'ascii') + x_str
else:
return bytes(chr(2 + (y & 1))).encode("ascii") + x_str
"""
return _bytes(chr(2 + (y & 1))) + x_str # pragma: no cover


def tweakaddPubkey(pk, digest256, SECP256K1_MODULE=SECP256K1_MODULE):
Expand Down
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ exclude_lines =
if 0:
if __name__ == .__main__.:
if sys.version > '3':
SECP256K1_MODULE

[flake8]
ignore = E501,F401
Expand Down
53 changes: 45 additions & 8 deletions tests/test_account.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import ecdsa
import unittest
import hashlib
from graphenebase.base58 import Base58
from graphenebase.account import BrainKey, Address, PublicKey, PrivateKey, PasswordKey, GrapheneAddress, BitcoinAddress

Expand Down Expand Up @@ -242,14 +244,15 @@ def test_btcprivkeystr(self):
])

def test_gphprivkeystr(self):
self.assertEqual([str(Address.from_pubkey(PrivateKey("5HvVz6XMx84aC5KaaBbwYrRLvWE46cH6zVnv4827SBPLorg76oq").pubkey)),
str(Address.from_pubkey(PrivateKey("5Jete5oFNjjk3aUMkKuxgAXsp7ZyhgJbYNiNjHLvq5xzXkiqw7R").pubkey)),
str(Address.from_pubkey(PrivateKey("5KDT58ksNsVKjYShG4Ls5ZtredybSxzmKec8juj7CojZj6LPRF7").pubkey, compressed=False, prefix="BTS")),
], [
'GPHBXqRucGm7nRkk6jm7BNspTJTWRtNcx7k5',
'GPH5tTDDR6M3mkcyVv16edsw8dGUyNQZrvKU',
'BTS4XPkBqYw882fH5aR5S8mMKXCaZ1yVA76f'
])
self.assertEqual([
str(Address.from_pubkey(PrivateKey("5HvVz6XMx84aC5KaaBbwYrRLvWE46cH6zVnv4827SBPLorg76oq").pubkey)),
str(Address.from_pubkey(PrivateKey("5Jete5oFNjjk3aUMkKuxgAXsp7ZyhgJbYNiNjHLvq5xzXkiqw7R").pubkey)),
str(Address.from_pubkey(PrivateKey("5KDT58ksNsVKjYShG4Ls5ZtredybSxzmKec8juj7CojZj6LPRF7").pubkey, compressed=False, prefix="BTS")),
], [
'GPHBXqRucGm7nRkk6jm7BNspTJTWRtNcx7k5',
'GPH5tTDDR6M3mkcyVv16edsw8dGUyNQZrvKU',
'BTS4XPkBqYw882fH5aR5S8mMKXCaZ1yVA76f'
])

def test_password_suggest(self):
self.assertEqual(
Expand All @@ -262,6 +265,26 @@ def test_child(self):
self.assertIsInstance(p2, PrivateKey)
self.assertEqual(str(p2), "5JQ6AQmjpbEZjJBLnoa3BaWa9y3LDTUBeSDwEGQD2UjYkb1gY2x")

def test_child_pub(self):
p = PrivateKey("5JWcdkhL3w4RkVPcZMdJsjos22yB5cSkPExerktvKnRNZR5gx1S")
pub = p.pubkey
point = pub.point()
self.assertEqual([
ecdsa.util.number_to_string(point.x(), ecdsa.SECP256k1.order),
ecdsa.util.number_to_string(point.y(), ecdsa.SECP256k1.order)
], [
b"\x90d5\xf6\xf9\xceo=NL\xf8\xd3\xd0\xdd\xce \x9a\x83'w8\xff\xdc~\xaec\x08\xf4\xed)c\xdf",
b'\r\xa8tl\xf1:a\x89\xa2\x81\x96\\X\x0fBA]\x86\xe9l#*\x89%\xea\x152T\xbb\x87\x9f`'
])
self.assertEqual(
repr(pub.child(b"foobar")),
"022a42ae1e9af8f84544c9a1970308c31f864dfb4f5998c45ef76e11d307cc77d5"
)
self.assertEqual(
repr(pub.add(hashlib.sha256(b"Foobar").digest())),
"0354a2c06c398990c52933df0c93b9904a4b23b0e2d524a3b0075c72adaa4e459e"
)

def test_BrainKey_sequence(self):
b = BrainKey("COLORER BICORN KASBEKE FAERIE LOCHIA GOMUTI SOVKHOZ Y GERMAL AUNTIE PERFUMY TIME FEATURE GANGAN CELEMIN MATZO")
self.assertEqual(str(next(b).get_private_key()), "5Hsbn6kXio4bb7eW5bX7kTp2sdkmbzP8kGWoau46Cf7en7T1RRE")
Expand All @@ -275,3 +298,17 @@ def test_new_BrainKey(self):
w = BrainKey().get_private_key()
self.assertIsInstance(w, PrivateKey)
self.assertEqual(str(w)[0], "5") # is a wif key that starts with 5

def test_derive_private_key(self):
p = PrivateKey("5JWcdkhL3w4RkVPcZMdJsjos22yB5cSkPExerktvKnRNZR5gx1S")
p2 = p.derive_private_key(10)
self.assertEqual(
repr(p2),
"2dc7cb99933132e25b37710f9ea806228b04a583da11a137ef97fd42c0007390"
)

def test_init_wrong_format(self):
with self.assertRaises(NotImplementedError):
PrivateKey("KJWcdkhL3w4RkVPcZMdJsjos22yB5cSkPExerktvKnRNZR5gx1S")
with self.assertRaises(NotImplementedError):
PrivateKey("LJWcdkhL3w4RkVPcZMdJsjos22yB5cSkPExerktvKnRNZR5gx1S")
Loading

0 comments on commit 246339c

Please sign in to comment.