diff --git a/graphenebase/aes.py b/graphenebase/aes.py index cf02f7c3..7a7357e4 100644 --- a/graphenebase/aes.py +++ b/graphenebase/aes.py @@ -1,4 +1,4 @@ -try: +try: # pragma: no cover from Cryptodome.Cipher import AES from Cryptodome import Random except ImportError: # pragma: no cover diff --git a/graphenebase/base58.py b/graphenebase/base58.py index efd3137d..2681deaa 100644 --- a/graphenebase/base58.py +++ b/graphenebase/base58.py @@ -1,8 +1,11 @@ -from binascii import hexlify, unhexlify import hashlib -import sys +# import sys import string import logging + +from binascii import hexlify, unhexlify +from .utils import _bytes + log = logging.getLogger(__name__) """ Default Prefix """ @@ -23,12 +26,13 @@ class Base58(object): """Base58 base class This class serves as an abstraction layer to deal with base58 encoded - strings and their corresponding hex and binary representation throughout the - library. + strings and their corresponding hex and binary representation throughout + the library. :param data: Data to initialize object, e.g. pubkey data, address data, ... :type data: hex, wif, bip38 encrypted wif, base58 string - :param str prefix: Prefix to use for Address/PubKey strings (defaults to ``GPH``) + :param str prefix: Prefix to use for Address/PubKey strings (defaults to + ``GPH``) :return: Base58 object initialized with ``data`` :rtype: Base58 :raises ValueError: if data cannot be decoded @@ -36,7 +40,8 @@ class Base58(object): * ``bytes(Base58)``: Returns the raw data * ``str(Base58)``: Returns the readable ``Base58CheckEncoded`` data. * ``repr(Base58)``: Gives the hex representation of the data. - * ``format(Base58,_format)`` Formats the instance according to ``_format``: + * ``format(Base58,_format)`` Formats the instance according to + ``_format``: * ``"wif"``: prefixed with ``0x00``. Yields a valid wif key * ``"bts"``: prefixed with ``BTS`` * etc. @@ -50,7 +55,10 @@ def __init__(self, data, prefix=PREFIX): self._hex = data elif data[0] == "5" or data[0] == "6": self._hex = base58CheckDecode(data) - elif data[0] == "K" or data[0] == "L": + elif data[0] == "K" or data[0] == "L": # pragma: no cover + raise NotImplementedError( + "Private Keys starting with L or K are not supported!" + ) self._hex = base58CheckDecode(data)[:-2] elif data[:len(self._prefix)] == self._prefix: self._hex = gphBase58CheckDecode(data[len(self._prefix):]) @@ -74,7 +82,10 @@ def __format__(self, _format): elif _format.upper() in known_prefixes: return _format.upper() + str(self) else: - log.warn("Format %s unkown. You've been warned!\n" % _format) + log.warning( + "Format {} unkown. You've been warned!\n".format( + _format + )) return _format.upper() + str(self) def __repr__(self): @@ -108,10 +119,13 @@ def __bytes__(self): def base58decode(base58_str): + """ if sys.version > '3': base58_text = bytes(base58_str, "ascii") - else: + else: # pragma: no cover base58_text = base58_str.encode("ascii") + """ + base58_text = _bytes(base58_str) n = 0 leading_zeroes_count = 0 for b in base58_text: @@ -129,10 +143,13 @@ def base58decode(base58_str): def base58encode(hexstring): + """ if sys.version > '3': byteseq = bytes(unhexlify(bytes(hexstring, 'ascii'))) else: byteseq = bytearray(unhexlify(hexstring.decode("ascii"))) + """ + byteseq = unhexlify(_bytes(hexstring)) n = 0 leading_zeroes_count = 0 for c in byteseq: diff --git a/graphenebase/bip38.py b/graphenebase/bip38.py index 07ff31da..5453277a 100644 --- a/graphenebase/bip38.py +++ b/graphenebase/bip38.py @@ -1,9 +1,11 @@ -import sys +# import sys import logging import hashlib from binascii import hexlify, unhexlify from .account import PrivateKey from .base58 import Base58, base58decode +from .utils import _bytes + log = logging.getLogger(__name__) try: @@ -15,11 +17,11 @@ raise ImportError("Missing dependency: pyCryptodome") SCRYPT_MODULE = None -if not SCRYPT_MODULE: +if not SCRYPT_MODULE: # pragma: no branch try: import scrypt SCRYPT_MODULE = "scrypt" - except ImportError: # pragma: no cover + except ImportError: try: import pylibscrypt as scrypt SCRYPT_MODULE = "pylibscrypt" @@ -58,14 +60,17 @@ def encrypt(privkey, passphrase): privkeyhex = repr(privkey) # hex addr = format(privkey.bitcoin.address, "BTC") + """ if sys.version > '3': a = bytes(addr, 'ascii') else: a = bytes(addr).encode('ascii') + """ + a = _bytes(addr) salt = hashlib.sha256(hashlib.sha256(a).digest()).digest()[0:4] - if SCRYPT_MODULE == "scrypt": + if SCRYPT_MODULE == "scrypt": # pragma: no branch key = scrypt.hash(passphrase, salt, 16384, 8, 8) - elif SCRYPT_MODULE == "pylibscrypt": + elif SCRYPT_MODULE == "pylibscrypt": # pragma: no branch key = scrypt.scrypt(bytes(passphrase, "utf-8"), salt, 16384, 8, 8) else: # pragma: no cover raise ValueError("No scrypt module loaded") @@ -101,9 +106,9 @@ def decrypt(encrypted_privkey, passphrase): assert flagbyte == b'\xc0', "Flagbyte has to be 0xc0" salt = d[0:4] d = d[4:-4] - if SCRYPT_MODULE == "scrypt": + if SCRYPT_MODULE == "scrypt": # pragma: no branch key = scrypt.hash(passphrase, salt, 16384, 8, 8) - elif SCRYPT_MODULE == "pylibscrypt": + elif SCRYPT_MODULE == "pylibscrypt": # pragma: no branch key = scrypt.scrypt(bytes(passphrase, "utf-8"), salt, 16384, 8, 8) else: raise ValueError("No scrypt module loaded") @@ -121,10 +126,13 @@ def decrypt(encrypted_privkey, passphrase): """ Verify Salt """ privkey = PrivateKey(format(wif, "wif")) addr = format(privkey.bitcoin.address, "BTC") + """ if sys.version > '3': a = bytes(addr, 'ascii') else: a = bytes(addr).encode('ascii') + """ + a = _bytes(addr) saltverify = hashlib.sha256(hashlib.sha256(a).digest()).digest()[0:4] if saltverify != salt: # pragma: no cover raise SaltException( diff --git a/graphenebase/ecdsa.py b/graphenebase/ecdsa.py index e281054a..dc8ebc25 100644 --- a/graphenebase/ecdsa.py +++ b/graphenebase/ecdsa.py @@ -1,20 +1,23 @@ from __future__ import absolute_import -import sys +# import sys import time import ecdsa import hashlib -from binascii import hexlify import struct import logging + +from binascii import hexlify + from .account import PrivateKey, PublicKey +from .utils import _bytes log = logging.getLogger(__name__) SECP256K1_MODULE = None SECP256K1_AVAILABLE = False CRYPTOGRAPHY_AVAILABLE = False GMPY2_MODULE = False -if not SECP256K1_MODULE: +if not SECP256K1_MODULE: # pragma: no branch try: import secp256k1 SECP256K1_MODULE = "secp256k1" @@ -27,7 +30,7 @@ except ImportError: SECP256K1_MODULE = "ecdsa" - try: + try: # pragma: no branch from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.asymmetric import ec @@ -61,10 +64,13 @@ def compressedPubkey(pk): x = p.x() y = p.y() x_str = ecdsa.util.number_to_string(x, order) + """ if sys.version > '3': return bytes(chr(2 + (y & 1)), 'ascii') + x_str else: return bytes(chr(2 + (y & 1))).encode("ascii") + x_str + """ + return _bytes(chr(2 + (y & 1))) + x_str def recover_public_key(digest, signature, i, message=None): @@ -100,9 +106,9 @@ def recover_public_key(digest, signature, i, message=None): return public_key else: # Not strictly necessary, but let's verify the message for paranoia's sake. - if not ecdsa.VerifyingKey.from_public_point(Q, curve=ecdsa.SECP256k1).verify_digest(signature, digest, sigdecode=ecdsa.util.sigdecode_string): + if not ecdsa.VerifyingKey.from_public_point(Q, curve=ecdsa.SECP256k1).verify_digest(signature, digest, sigdecode=ecdsa.util.sigdecode_string): # pragma: no cover return None # pragma: no cover - return ecdsa.VerifyingKey.from_public_point(Q, curve=ecdsa.SECP256k1) + return ecdsa.VerifyingKey.from_public_point(Q, curve=ecdsa.SECP256k1) # pragma: no cover def recoverPubkeyParameter(message, digest, signature, pubkey): @@ -127,14 +133,13 @@ def recoverPubkeyParameter(message, digest, signature, pubkey): p = recover_public_key(digest, signature, i) p_comp = hexlify(compressedPubkey(p)) p_string = hexlify(p.to_string()) - if isinstance(pubkey, PublicKey): - pubkey_string = bytes(repr(pubkey), 'latin') - else: + if isinstance(pubkey, PublicKey): # pragma: no cover + pubkey_string = bytes(repr(pubkey), 'ascii') + else: # pragma: no cover pubkey_string = hexlify(pubkey.to_string()) if (p_string == pubkey_string or - p_comp == pubkey_string): + p_comp == pubkey_string): # pragma: no cover return i - return None def sign_message(message, wif, hashfn=hashlib.sha256): @@ -148,10 +153,13 @@ def sign_message(message, wif, hashfn=hashlib.sha256): digest = hashfn(message).digest() priv_key = PrivateKey(wif) + """ if sys.version > '3': p = bytes(priv_key) else: p = bytes(priv_key.__bytes__()) + """ + p = bytes(priv_key) if SECP256K1_MODULE == "secp256k1": ndata = secp256k1.ffi.new("const int *ndata") @@ -206,12 +214,12 @@ def sign_message(message, wif, hashfn=hashlib.sha256): i += 4 # compressed i += 27 # compact break - else: + else: # pragma: no branch cnt = 0 sk = ecdsa.SigningKey.from_string(p, curve=ecdsa.SECP256k1) while 1: cnt += 1 - if not cnt % 20: + if not cnt % 20: # pragma: no branch log.info("Still searching for a canonical signature. Tried %d times already!" % cnt) # Deterministic k @@ -262,7 +270,7 @@ def sign_message(message, wif, hashfn=hashlib.sha256): def verify_message(message, signature, hashfn=hashlib.sha256): if not isinstance(message, bytes): message = bytes(message, "utf-8") - if not isinstance(signature, bytes): + if not isinstance(signature, bytes): # pragma: no cover signature = bytes(signature, "utf-8") if not isinstance(message, bytes): raise AssertionError() @@ -292,7 +300,7 @@ def verify_message(message, signature, hashfn=hashlib.sha256): sigder = encode_dss_signature(r, s) p.verify(sigder, message, ec.ECDSA(hashes.SHA256())) phex = compressedPubkey(p) - else: + else: # pragma: no branch p = recover_public_key(digest, sig, recoverParameter) # Will throw an exception of not valid p.verify_digest( @@ -308,10 +316,13 @@ def verify_message(message, signature, hashfn=hashlib.sha256): def pointToPubkey(x, y, order=None): order = order or ecdsa.SECP256k1.order x_str = ecdsa.util.number_to_string(x, order) + """ if sys.version > '3': return bytes(chr(2 + (y & 1)), 'ascii') + x_str else: return bytes(chr(2 + (y & 1))).encode("ascii") + x_str + """ + return _bytes(chr(2 + (y & 1))) + x_str # pragma: no cover def tweakaddPubkey(pk, digest256, SECP256K1_MODULE=SECP256K1_MODULE): diff --git a/setup.cfg b/setup.cfg index b78d7ed4..f1315bf5 100644 --- a/setup.cfg +++ b/setup.cfg @@ -32,6 +32,7 @@ exclude_lines = if 0: if __name__ == .__main__.: if sys.version > '3': + SECP256K1_MODULE [flake8] ignore = E501,F401 diff --git a/tests/test_account.py b/tests/test_account.py index 6d95c4f5..dfaedc3d 100644 --- a/tests/test_account.py +++ b/tests/test_account.py @@ -1,4 +1,6 @@ +import ecdsa import unittest +import hashlib from graphenebase.base58 import Base58 from graphenebase.account import BrainKey, Address, PublicKey, PrivateKey, PasswordKey, GrapheneAddress, BitcoinAddress @@ -242,14 +244,15 @@ def test_btcprivkeystr(self): ]) def test_gphprivkeystr(self): - self.assertEqual([str(Address.from_pubkey(PrivateKey("5HvVz6XMx84aC5KaaBbwYrRLvWE46cH6zVnv4827SBPLorg76oq").pubkey)), - str(Address.from_pubkey(PrivateKey("5Jete5oFNjjk3aUMkKuxgAXsp7ZyhgJbYNiNjHLvq5xzXkiqw7R").pubkey)), - str(Address.from_pubkey(PrivateKey("5KDT58ksNsVKjYShG4Ls5ZtredybSxzmKec8juj7CojZj6LPRF7").pubkey, compressed=False, prefix="BTS")), - ], [ - 'GPHBXqRucGm7nRkk6jm7BNspTJTWRtNcx7k5', - 'GPH5tTDDR6M3mkcyVv16edsw8dGUyNQZrvKU', - 'BTS4XPkBqYw882fH5aR5S8mMKXCaZ1yVA76f' - ]) + self.assertEqual([ + str(Address.from_pubkey(PrivateKey("5HvVz6XMx84aC5KaaBbwYrRLvWE46cH6zVnv4827SBPLorg76oq").pubkey)), + str(Address.from_pubkey(PrivateKey("5Jete5oFNjjk3aUMkKuxgAXsp7ZyhgJbYNiNjHLvq5xzXkiqw7R").pubkey)), + str(Address.from_pubkey(PrivateKey("5KDT58ksNsVKjYShG4Ls5ZtredybSxzmKec8juj7CojZj6LPRF7").pubkey, compressed=False, prefix="BTS")), + ], [ + 'GPHBXqRucGm7nRkk6jm7BNspTJTWRtNcx7k5', + 'GPH5tTDDR6M3mkcyVv16edsw8dGUyNQZrvKU', + 'BTS4XPkBqYw882fH5aR5S8mMKXCaZ1yVA76f' + ]) def test_password_suggest(self): self.assertEqual( @@ -262,6 +265,26 @@ def test_child(self): self.assertIsInstance(p2, PrivateKey) self.assertEqual(str(p2), "5JQ6AQmjpbEZjJBLnoa3BaWa9y3LDTUBeSDwEGQD2UjYkb1gY2x") + def test_child_pub(self): + p = PrivateKey("5JWcdkhL3w4RkVPcZMdJsjos22yB5cSkPExerktvKnRNZR5gx1S") + pub = p.pubkey + point = pub.point() + self.assertEqual([ + ecdsa.util.number_to_string(point.x(), ecdsa.SECP256k1.order), + ecdsa.util.number_to_string(point.y(), ecdsa.SECP256k1.order) + ], [ + b"\x90d5\xf6\xf9\xceo=NL\xf8\xd3\xd0\xdd\xce \x9a\x83'w8\xff\xdc~\xaec\x08\xf4\xed)c\xdf", + b'\r\xa8tl\xf1:a\x89\xa2\x81\x96\\X\x0fBA]\x86\xe9l#*\x89%\xea\x152T\xbb\x87\x9f`' + ]) + self.assertEqual( + repr(pub.child(b"foobar")), + "022a42ae1e9af8f84544c9a1970308c31f864dfb4f5998c45ef76e11d307cc77d5" + ) + self.assertEqual( + repr(pub.add(hashlib.sha256(b"Foobar").digest())), + "0354a2c06c398990c52933df0c93b9904a4b23b0e2d524a3b0075c72adaa4e459e" + ) + def test_BrainKey_sequence(self): b = BrainKey("COLORER BICORN KASBEKE FAERIE LOCHIA GOMUTI SOVKHOZ Y GERMAL AUNTIE PERFUMY TIME FEATURE GANGAN CELEMIN MATZO") self.assertEqual(str(next(b).get_private_key()), "5Hsbn6kXio4bb7eW5bX7kTp2sdkmbzP8kGWoau46Cf7en7T1RRE") @@ -275,3 +298,17 @@ def test_new_BrainKey(self): w = BrainKey().get_private_key() self.assertIsInstance(w, PrivateKey) self.assertEqual(str(w)[0], "5") # is a wif key that starts with 5 + + def test_derive_private_key(self): + p = PrivateKey("5JWcdkhL3w4RkVPcZMdJsjos22yB5cSkPExerktvKnRNZR5gx1S") + p2 = p.derive_private_key(10) + self.assertEqual( + repr(p2), + "2dc7cb99933132e25b37710f9ea806228b04a583da11a137ef97fd42c0007390" + ) + + def test_init_wrong_format(self): + with self.assertRaises(NotImplementedError): + PrivateKey("KJWcdkhL3w4RkVPcZMdJsjos22yB5cSkPExerktvKnRNZR5gx1S") + with self.assertRaises(NotImplementedError): + PrivateKey("LJWcdkhL3w4RkVPcZMdJsjos22yB5cSkPExerktvKnRNZR5gx1S") diff --git a/tests/test_api.py b/tests/test_api.py new file mode 100644 index 00000000..1294b1e5 --- /dev/null +++ b/tests/test_api.py @@ -0,0 +1,72 @@ +import unittest +from grapheneapi.api import Api, Websocket, Http +from grapheneapi import exceptions + +urls = [ + "wss://node.bitshares.eu", + "https://eu.nodes.bitshares.ws" +] + + +class Testcases(unittest.TestCase): + + def test_apiInit(self): + api = Api(urls) + self.assertEqual(len(api._url_counter), 2) + api_single = Api(urls[0]) + self.assertEqual(len(api_single._url_counter), 1) + + def test_chain_params(self): + api = Api(urls) + p = api.chain_params + self.assertEqual(p, api.get_network()) + self.assertEqual(p.get("chain_id"), "4018d7844c78f6a6c41c6a552b898022310fc5dec06da467ee7905a8dad512c8") + + def test_websocket_connection(self): + api = Api(urls[0]) + self.assertIsInstance( + api.connection, + Websocket + ) + api = Api(urls[1]) + self.assertIsInstance( + api.connection, + Http + ) + with self.assertRaises(ValueError): + Api("hssp://error.example.com", num_retries=1) + + def test_call(self): + api = Api(urls[0]) + api.get_objects(["2.8.0"], api="database") + + def test_cconnection_exceptions(self): + with self.assertRaises(exceptions.NumRetriesReached): + api = Api("http://error.example.com", num_retries=0) + api.get_objects(["2.8.0"]) + with self.assertRaises(exceptions.NumRetriesReached): + api = Api("http://error.example.com", num_retries=1) + api.get_objects(["2.8.0"]) + + def test_raise_rpc_error_wss(self): + api = Api(urls[0], num_retries=-1) + with self.assertRaises(exceptions.RPCError): + api.get_SOMETHING(["2.8.0"]) + api.connection.disconnect() + api.connection.disconnect() + + def test_raise_rpc_error_https(self): + api = Api(urls[1], num_retries=-1) + with self.assertRaises(exceptions.RPCError): + api.get_SOMETHING(["2.8.0"]) + + def test_login(self): + Api(urls[0], "User", "password", num_retries=-1) + Api(urls[0], None, None, num_retries=-1) + + def test_rollover(self): + api = Api([ + "http://error.example.com", + "wss://node.bitshares.eu" + ]) + api.get_objects(["2.8.0"]) diff --git a/tests/test_base58.py b/tests/test_base58.py index c9a9666d..185b2991 100644 --- a/tests/test_base58.py +++ b/tests/test_base58.py @@ -119,6 +119,27 @@ def test_Base58(self): "5Jete5oFNjjk3aUMkKuxgAXsp7ZyhgJbYNiNjHLvq5xzXkiqw7R", "5KDT58ksNsVKjYShG4Ls5ZtredybSxzmKec8juj7CojZj6LPRF7"]) + def test_leading_zeros(self): + # Leading zeros become ones + for i in range(1, 10): + self.assertEqual( + base58encode('00' * i), + '1' * (i + 1) + ) + + # Leading zeros become ones + self.assertEqual([ + base58decode(base58encode('00')), + base58decode(base58encode('0000')), + base58decode(base58encode('000000')), + base58decode(base58encode('00000000')), + ], [ + '000000', + '00000000', + '0000000000', + '000000000000' + ]) + if __name__ == '__main__': unittest.main()