Source code for cryptnoxpro.lib.cryptos.coins.base

# flake8: noqa
# -*- coding: utf-8 -*-
"""
Base classes and shared logic for UTXO-based coin implementations.
"""
from .. import segwit_addr
from ..blocks import mk_merkle_proof
# from ..explorers import blockchain
# from ..electrumx_client.rpc import ElectrumXClient
from ..keystore import *
from ..specials import *
from ..transaction import *
from ..wallet import *


[docs] class BaseCoin(object): """ Base implementation of crypto coin class All child coins must follow same pattern. """ coin_symbol = None display_name = None enabled = True segwit_supported = None magicbyte = None script_magicbyte = None segwit_hrp = None # explorer = blockchain # client = ElectrumXClient client_kwargs = { 'server_file': 'bitcoin.json', 'servers': (), 'host': None, 'port': 50001, 'timeout': 15, 'max_servers': 5, 'loop': None } is_testnet = False address_prefixes = () testnet_overrides = {} hashcode = SIGHASH_ALL secondary_hashcode = None hd_path = 0 wif_prefix = 0x80 wif_script_types = { 'p2pkh': 0, 'p2wpkh': 1, 'p2wpkh-p2sh': 2, 'p2sh': 5, 'p2wsh': 6, 'p2wsh-p2sh': 7 } xprv_headers = { 'p2pkh': 0x0488ade4, 'p2wpkh-p2sh': 0x049d7878, 'p2wsh-p2sh': 0x295b005, 'p2wpkh': 0x4b2430c, 'p2wsh': 0x2aa7a99 } xpub_headers = { 'p2pkh': 0x0488b21e, 'p2wpkh-p2sh': 0x049d7cb2, 'p2wsh-p2sh': 0x295b43f, 'p2wpkh': 0x4b24746, 'p2wsh': 0x2aa7ed3 } electrum_xprv_headers = xprv_headers electrum_xpub_headers = xpub_headers
[docs] def __init__(self, testnet=False, **kwargs): if testnet: self.is_testnet = True for k, v in self.testnet_overrides.items(): setattr(self, k, v) # override default attributes from kwargs for key, value in kwargs.items(): setattr(self, key, value) if not self.enabled: if self.is_testnet: raise NotImplementedError( "Due to explorer limitations, testnet support for this coin has not been implemented yet!") else: raise NotImplementedError("Support for this coin has not been implemented yet!") self.address_prefixes = magicbyte_to_prefix(magicbyte=self.magicbyte) if self.script_magicbyte: self.script_prefixes = magicbyte_to_prefix(magicbyte=self.script_magicbyte) else: self.script_prefixes = () self.secondary_hashcode = self.secondary_hashcode or self.hashcode self._rpc_client = None
@property def rpc_client(self): """ Connect to remove server """ if not self._rpc_client: self._rpc_client = self.client(**self.client_kwargs) return self._rpc_client
[docs] def unspent(self, *addrs): """ Get unspent transactions for addresses """ return self.rpc_client.unspent(*addrs)
[docs] def history(self, *addrs, **kwargs): """ Get transaction history for addresses """ return self.explorer.history(*addrs, coin_symbol=self.coin_symbol)
[docs] def fetchtx(self, tx): """ Fetch a tx from the blockchain """ return self.explorer.fetchtx(tx, coin_symbol=self.coin_symbol)
[docs] def txinputs(self, tx): """ Fetch inputs of a transaction on the blockchain """ return self.explorer.txinputs(tx, coin_symbol=self.coin_symbol)
[docs] def pushtx(self, tx): """ Push/ Broadcast a transaction to the blockchain """ return self.explorer.push_tx(tx, coin_symbol=self.coin_symbol)
[docs] def privtopub(self, privkey): """ Get public key from private key """ return privtopub(privkey)
[docs] def pubtoaddr(self, pubkey): """ Get address from a pubic key """ return pubtoaddr(pubkey, magicbyte=self.magicbyte)
[docs] def privtoaddr(self, privkey): """ Get address from a private key """ return privtoaddr(privkey, magicbyte=self.magicbyte)
[docs] def electrum_address(self, masterkey, n, for_change=0): """ For old electrum seeds """ pubkey = electrum_pubkey(masterkey, n, for_change=for_change) return self.pubtoaddr(pubkey)
[docs] def is_address(self, addr): """ Check if addr is a valid address for this chain """ all_prefixes = ''.join(list(self.address_prefixes) + list(self.script_prefixes)) return any(str(i) == addr[0] for i in all_prefixes)
[docs] def is_p2sh(self, addr): """ Check if addr is a a pay to script address """ return not any(str(i) == addr[0] for i in self.address_prefixes)
[docs] def output_script_to_address(self, script): """ Convert an output script to an address """ return output_script_to_address(script, self.magicbyte)
[docs] def scripttoaddr(self, script): """ Convert an input public key hash to an address """ if re.match('^[0-9a-fA-F]*$', script): script = binascii.unhexlify(script) if script[:3] == b'\x76\xa9\x14' and script[-2:] == b'\x88\xac' and len(script) == 25: return bin_to_b58check(script[3:-2], self.magicbyte) # pubkey hash addresses else: # BIP0016 scripthash addresses return bin_to_b58check(script[2:-1], self.script_magicbyte)
[docs] def p2sh_scriptaddr(self, script): """ Convert an output p2sh script to an address """ if re.match('^[0-9a-fA-F]*$', script): script = binascii.unhexlify(script) return hex_to_b58check(hash160(script), self.script_magicbyte)
[docs] def addrtoscript(self, addr): """ Convert an output address to a script """ if self.segwit_hrp: witver, witprog = segwit_addr.decode(self.segwit_hrp, addr) if witprog is not None: return mk_p2w_scripthash_script(witver, witprog) if self.is_p2sh(addr): return mk_scripthash_script(addr) else: return mk_pubkey_script(addr)
[docs] def pubtop2w(self, pub): """ Convert a public key to a pay to witness public key hash address (P2WPKH, required for segwit) """ if not self.segwit_supported: raise Exception("Segwit not supported for this coin") compressed_pub = compress(pub) return self.scripttoaddr(mk_p2wpkh_script(compressed_pub))
[docs] def privtop2w(self, priv): """ Convert a private key to a pay to witness public key hash address """ return self.pubtop2w(privtopub(priv))
[docs] def hash_to_segwit_addr(self, hash): """ Convert a hash to the new segwit address format outlined in BIP-0173 """ return segwit_addr.encode(self.segwit_hrp, 0, hash)
[docs] def privtosegwit(self, privkey): """ Convert a private key to the new segwit address format outlined in BIP01743 """ return self.pubtosegwit(self.privtopub(privkey))
[docs] def pubtosegwit(self, pubkey): """ Convert a public key to the new segwit address format outlined in BIP01743 """ return self.hash_to_segwit_addr(pubkey_to_hash(pubkey))
[docs] def script_to_p2wsh(self, script): """ Convert a script to the new segwit address format outlined in BIP01743 """ return self.hash_to_segwit_addr(sha256(safe_from_hex(script)))
[docs] def mk_multsig_address(self, *args): """ :param args: List of public keys to used to create multisig and M, the number of signatures required to spend :return: multisig script """ script = mk_multisig_script(*args) address = self.p2sh_scriptaddr(script) return script, address
[docs] def is_segwit(self, priv, addr): """ Check if addr was generated from priv using segwit script """ if not self.segwit_supported: return False if self.segwit_hrp and addr.startswith(self.segwit_hrp): return True segwit_addr = self.privtop2w(priv) return segwit_addr == addr
[docs] def sign(self, txobj, i, priv): """ Sign a transaction input with index using a private key """ i = int(i) if not isinstance(txobj, dict): txobj = deserialize(txobj) if len(priv) <= 33: priv = safe_hexlify(priv) pub = self.privtopub(priv) if txobj['ins'][i].get('segwit', False) or txobj['ins'][i].get('new_segwit', False): if not self.segwit_supported: raise Exception("Segregated witness is not supported for %s" % self.display_name) pub = compress(pub) script = mk_p2wpkh_scriptcode(pub) signing_tx = signature_form(txobj, i, script, self.hashcode) sig = ecdsa_tx_sign(signing_tx, priv, self.secondary_hashcode) if txobj['ins'][i].get('new_segwit', False): txobj["ins"][i]["script"] = '' else: txobj["ins"][i]["script"] = mk_p2wpkh_redeemscript(pub) txobj["witness"].append({"number": 2, "scriptCode": serialize_script([sig, pub])}) else: address = self.pubtoaddr(pub) script = mk_pubkey_script(address) signing_tx = signature_form(txobj, i, script, self.hashcode) sig = ecdsa_tx_sign(signing_tx, priv, self.hashcode) txobj["ins"][i]["script"] = serialize_script([sig, pub]) if "witness" in txobj.keys(): txobj["witness"].append({"number": 0, "scriptCode": ''}) return txobj
[docs] def signall(self, txobj, priv): """ Sign all inputs to a transaction using a private key """ if not isinstance(txobj, dict): txobj = deserialize(txobj) if isinstance(priv, dict): for e, i in enumerate(txobj["ins"]): k = priv["%s:%d" % (i["outpoint"]["hash"], i["outpoint"]["index"])] txobj = self.sign(txobj, e, k) else: for i in range(len(txobj["ins"])): txobj = self.sign(txobj, i, priv) return serialize(txobj)
[docs] def multisign(self, tx, i, script, pk): return multisign(tx, i, script, pk, self.hashcode)
[docs] def mktx(self, *args): """[in0, in1...],[out0, out1...] or in0, in1 ... out0 out1 ... Make an unsigned transaction from inputs and outputs. Change is not automatically included so any difference in value between inputs and outputs will be given as a miner's fee (transactions with too high fees will normally be blocked by the explorers) For Bitcoin Cash and other hard forks using SIGHASH_FORKID, ins must be a list of dicts with each containing the outpoint and value of the input. Inputs originally received with segwit must be a dict in the format: {'outpoint': "txhash:index", value:0, "segwit": True} For other transactions, inputs can be dicts containing only outpoints or strings in the outpoint format. Outpoint format: txhash:index """ ins, outs = [], [] for arg in args: if isinstance(arg, list): for a in arg: (ins if is_inp(a) else outs).append(a) else: (ins if is_inp(arg) else outs).append(arg) txobj = {"locktime": 0, "version": 1, "ins": [], "outs": []} if any(isinstance(i, dict) and (i.get("segwit", False) or i.get("new_segwit", False)) for i in ins): if not self.segwit_supported: raise Exception("Segregated witness is not allowed for %s" % self.display_name) txobj.update({"marker": 0, "flag": 1, "witness": []}) for i in ins: input = {'script': "", "sequence": 4294967295} if isinstance(i, dict) and "output" in i: input["outpoint"] = {"hash": i["output"][:64], "index": int(i["output"][65:])} input['amount'] = i.get("value", 0) if i.get("segwit", False): input["segwit"] = True elif i.get("new_segwit", False): input["new_segwit"] = True else: input["outpoint"] = {"hash": i[:64], "index": int(i[65:])} input['amount'] = 0 txobj["ins"].append(input) for o in outs: if isinstance(o, string_or_bytes_types): addr = o[:o.find(':')] val = int(o[o.find(':') + 1:]) o = {} if re.match('^[0-9a-fA-F]*$', addr): o["script"] = addr else: o["address"] = addr o["value"] = val outobj = {} if "address" in o: outobj["script"] = self.addrtoscript(o["address"]) elif "script" in o: outobj["script"] = o["script"] else: raise Exception("Could not find 'address' or 'script' in output.") outobj["value"] = o["value"] txobj["outs"].append(outobj) return txobj
[docs] def mksend(self, *args, segwit=False): """[in0, in1...],[out0, out1...] or in0, in1 ... out0 out1 ... Make an unsigned transaction from inputs, outputs change address and fee. A change output will be added with change sent to the change address. For Bitcoin Cash and other hard forks using SIGHASH_FORKID and segwit, ins must be a list of dicts with each containing the outpoint and value of the input. For other transactions, inputs can be dicts containing only outpoints or strings in the outpoint format. Outpoint format: txhash:index """ argz, change, fee = args[:-2], args[-2], int(args[-1]) ins, outs = [], [] for arg in argz: if isinstance(arg, list): for a in arg: (ins if is_inp(a) else outs).append(a) else: (ins if is_inp(arg) else outs).append(arg) if segwit: for i in ins: i['segwit'] = True isum = sum([i["value"] for i in ins]) osum, outputs2 = 0, [] for o in outs: if isinstance(o, string_types): o2 = { "address": o[:o.find(':')], "value": int(o[o.find(':') + 1:]) } else: o2 = o outputs2.append(o2) osum += o2["value"] if isum < osum + fee: raise Exception("Not enough money") elif isum > osum + fee + 5430: outputs2 += [{"address": change, "value": isum - osum - fee}] return self.mktx(ins, outputs2)
[docs] def preparesignedtx(self, privkey, to, value, fee=10000, change_addr=None, segwit=False, addr=None): """ Prepare a tx with a specific amount from address belonging to private key to another address, returning change to the from address or change address, if set. Requires private key, target address, value and optionally the change address and fee segwit paramater specifies that the inputs belong to a segwit address addr, if provided, will explicity set the from address, overriding the auto-detection of the address from the private key.It will also be used, along with the privkey, to automatically detect a segwit transaction for coins which support segwit, overriding the segwit kw """ return self.preparesignedmultitx(privkey, to + ":" + str(value), fee, change_addr=change_addr, segwit=segwit, addr=addr)
[docs] def send(self, privkey, to, value, fee=10000, change_addr=None, segwit=False, addr=None): """ Send a specific amount from address belonging to private key to another address, returning change to the from address or change address, if set. Requires private key, target address, value and optionally the change address and fee segwit paramater specifies that the inputs belong to a segwit address addr, if provided, will explicity set the from address, overriding the auto-detection of the address from the private key.It will also be used, along with the privkey, to automatically detect a segwit transaction for coins which support segwit, overriding the segwit kw """ return self.sendmultitx(privkey, to + ":" + str(value), fee, change_addr=change_addr, segwit=segwit, addr=addr)
[docs] def preparesignedmultitx(self, privkey, *args, change_addr=None, segwit=False, addr=None): """ Prepare transaction with multiple outputs, with change sent back to from addrss Requires private key, address:value pairs and optionally the change address and fee segwit paramater specifies that the inputs belong to a segwit address addr, if provided, will explicity set the from address, overriding the auto-detection of the address from the private key.It will also be used, along with the privkey to automatically detect a segwit transaction for coins which support segwit, overriding the segwit kw """ if addr: frm = addr if self.segwit_supported: segwit = self.is_segwit(privkey, addr) elif segwit: frm = self.privtop2w(privkey) else: frm = self.privtoaddr(privkey) tx = self.preparemultitx(frm, *args, change_addr=change_addr, segwit=segwit) tx2 = self.signall(tx, privkey) return tx2
[docs] def sendmultitx(self, privkey, *args, change_addr=None, segwit=False, addr=None): """ Send transaction with multiple outputs, with change sent back to from addrss Requires private key, address:value pairs and optionally the change address and fee segwit paramater specifies that the inputs belong to a segwit address addr, if provided, will explicity set the from address, overriding the auto-detection of the address from the private key.It will also be used, along with the privkey to automatically detect a segwit transaction for coins which support segwit, overriding the segwit kw """ tx = self.preparesignedmultitx(privkey, *args, change_addr=change_addr, segwit=segwit, addr=addr) return self.pushtx(tx)
[docs] def preparetx(self, frm, to, value, fee, change_addr=None, segwit=False): """ Prepare a transaction using from and to addresses, value and a fee, with change sent back to from address """ tovalues = to + ":" + str(value) return self.preparemultitx(frm, tovalues, fee, change_addr=change_addr, segwit=segwit)
[docs] def preparemultitx(self, frm, *args, change_addr=None, segwit=False): """ Prepare transaction with multiple outputs, with change sent to from address Requires from address, to_address:value pairs and fees """ tv, fee = args[:-1], int(args[-1]) outs = [] outvalue = 0 for a in tv: outs.append(a) outvalue += int(a.split(":")[1]) u = self.unspent(frm) u2 = select(u, int(outvalue) + int(fee)) change_addr = change_addr or frm argz = u2 + outs + [change_addr, fee] return self.mksend(*argz, segwit=segwit)
[docs] def block_height(self, txhash): return self.explorer.block_height(txhash, coin_symbol=self.coin_symbol)
[docs] def current_block_height(self): return self.explorer.current_block_height(coin_symbol=self.coin_symbol)
[docs] def block_info(self, height): return self.explorer.block_info(height, coin_symbol=self.coin_symbol)
[docs] def inspect(self, tx): if not isinstance(tx, dict): tx = deserialize(tx) isum = 0 ins = {} for _in in tx['ins']: h = _in['outpoint']['hash'] i = _in['outpoint']['index'] prevout = deserialize(self.fetchtx(h))['outs'][i] isum += prevout['value'] a = self.scripttoaddr(prevout['script']) ins[a] = ins.get(a, 0) + prevout['value'] outs = [] osum = 0 for _out in tx['outs']: outs.append({'address': self.scripttoaddr(_out['script']), 'value': _out['value']}) osum += _out['value'] return { 'fee': isum - osum, 'outs': outs, 'ins': ins }
[docs] def merkle_prove(self, txhash): """ Prove that information an explorer returns about a transaction in the blockchain is valid. Only run on a tx with at least 1 confirmation. """ blocknum = self.block_height(txhash) blockinfo = self.block_info(blocknum) hashes = blockinfo.pop('tx_hashes') try: i = hashes.index(txhash) except ValueError: raise Exception("Merkle proof failed because transaction %s is not part of the main chain" % txhash) return mk_merkle_proof(blockinfo, hashes, i)
[docs] def encode_privkey(self, privkey, formt, script_type="p2pkh"): return encode_privkey(privkey, formt=formt, vbyte=self.wif_prefix + self.wif_script_types[script_type])
[docs] def wallet(self, seed, passphrase=None, **kwargs): if not bip39_is_checksum_valid(seed) == (True, True): raise Exception("BIP39 Checksum failed. This is not a valid BIP39 seed") ks = standard_from_bip39_seed(seed, passphrase, coin=self) return HDWallet(ks, **kwargs)
[docs] def watch_wallet(self, xpub, **kwargs): ks = from_xpub(xpub, self, 'p2pkh') return HDWallet(ks, **kwargs)
[docs] def p2wpkh_p2sh_wallet(self, seed, passphrase=None, **kwargs): if not self.segwit_supported: raise Exception("P2WPKH-P2SH segwit not enabled for this coin") if not bip39_is_checksum_valid(seed) == (True, True): raise Exception("BIP39 Checksum failed. This is not a valid BIP39 seed") ks = p2wpkh_p2sh_from_bip39_seed(seed, passphrase, coin=self) return HDWallet(ks, **kwargs)
[docs] def watch_p2wpkh_p2sh_wallet(self, xpub, **kwargs): ks = from_xpub(xpub, self, 'p2wpkh-p2sh') return HDWallet(ks, **kwargs)
[docs] def p2wpkh_wallet(self, seed, passphrase=None, **kwargs): if not bip39_is_checksum_valid(seed) == (True, True): raise Exception("BIP39 Checksum failed. This is not a valid BIP39 seed") ks = p2wpkh_from_bip39_seed(seed, passphrase, coin=self) return HDWallet(ks, **kwargs)
[docs] def watch_p2wpkh_wallet(self, xpub, **kwargs): ks = from_xpub(xpub, self, 'p2wpkh') return HDWallet(ks, **kwargs)
[docs] def electrum_wallet(self, seed, passphrase=None, **kwargs): ks = from_electrum_seed(seed, passphrase, False, coin=self) return HDWallet(ks, **kwargs)
[docs] def watch_electrum_wallet(self, xpub, **kwargs): ks = from_xpub(xpub, self, 'p2pkh', electrum=True) return HDWallet(ks, **kwargs)
[docs] def watch_electrum_p2wpkh_wallet(self, xpub, **kwargs): ks = from_xpub(xpub, self, 'p2wpkh', electrum=True) return HDWallet(ks, **kwargs)