# Source code for autobahn.xbr._secmod

###############################################################################
#
# The MIT License (MIT)
#
# Copyright (c) Crossbar.io Technologies GmbH
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
###############################################################################
import binascii
import os
import configparser
from collections.abc import MutableMapping
from typing import Optional, Union, Dict, Any, List, Iterator
from threading import Lock

import txaio
import nacl

from eth_account.account import Account
from eth_account.signers.local import LocalAccount

from py_eth_sig_utils.eip712 import encode_typed_data
from py_eth_sig_utils.utils import ecsign, ecrecover_to_pub, checksum_encode, sha3
from py_eth_sig_utils.signing import v_r_s_to_signature, signature_to_v_r_s

from autobahn.wamp.interfaces import ISecurityModule, IEthereumKey
from autobahn.xbr._mnemonic import mnemonic_to_private_key
from autobahn.util import parse_keyfile
from autobahn.wamp.cryptosign import CryptosignKey

# Public names exported by this module (both classes are defined below).
__all__ = ('EthereumKey', 'SecurityModuleMemory', )


class EthereumKey(object):
    """
    Base class to implement :class:`autobahn.wamp.interfaces.IEthereumKey`.

    An instance is either backed by a ``LocalAccount`` (``can_sign=True``), in which
    case it can create signatures, or it only stores an address (``can_sign=False``)
    and can be used to verify signatures.
    """

    def __init__(self, key_or_address: Union[LocalAccount, str, bytes], can_sign: bool,
                 security_module: Optional[ISecurityModule] = None,
                 key_no: Optional[int] = None) -> None:
        """

        :param key_or_address: A ``LocalAccount`` when ``can_sign`` is set, otherwise
            the address as a string or as bytes.
        :param can_sign: Whether this key holds private key material.
        :param security_module: Optional security module this key is stored in.
        :param key_no: Optional number of this key within ``security_module``.
        """
        if can_sign:
            # https://eth-account.readthedocs.io/en/latest/eth_account.html#eth_account.account.Account
            assert isinstance(key_or_address, LocalAccount)
            self._key = key_or_address
            self._address = key_or_address.address
        else:
            assert isinstance(key_or_address, (str, bytes))
            self._key = None
            self._address = key_or_address
        self._can_sign = can_sign
        self._security_module = security_module
        self._key_no = key_no

    @property
    def security_module(self) -> Optional['ISecurityModule']:
        """
        Implements :meth:`autobahn.wamp.interfaces.IKey.security_module`.
        """
        return self._security_module

    @property
    def key_no(self) -> Optional[int]:
        """
        Implements :meth:`autobahn.wamp.interfaces.IKey.key_no`.
        """
        return self._key_no

    @property
    def key_type(self) -> str:
        """
        Implements :meth:`autobahn.wamp.interfaces.IKey.key_type`.
        """
        return 'ethereum'

    def public_key(self, binary: bool = False) -> Union[str, bytes]:
        """
        Implements :meth:`autobahn.wamp.interfaces.IKey.public_key`.

        :raises NotImplementedError: Always (not implemented yet).
        """
        raise NotImplementedError()

    @property
    def can_sign(self) -> bool:
        """
        Implements :meth:`autobahn.wamp.interfaces.IKey.can_sign`.
        """
        return self._can_sign

    def address(self, binary: bool = False) -> Union[str, bytes]:
        """
        Implements :meth:`autobahn.wamp.interfaces.IEthereumKey.address`.

        :param binary: If set, return the address as bytes, otherwise return the
            address string.
        :return: The address of this key.
        """
        adr = self._address

        # the key was created from a raw 20 octets address: there is no "0x" prefix
        # to strip and nothing to hex-decode
        if isinstance(adr, bytes) and len(adr) == 20:
            return adr if binary else checksum_encode(adr)

        if binary:
            # strip the leading "0x" and decode the remaining hex digits
            return binascii.a2b_hex(adr[2:])
        return adr

    def sign(self, data: bytes) -> bytes:
        """
        Implements :meth:`autobahn.wamp.interfaces.IKey.sign`.

        :raises NotImplementedError: Always (not implemented yet).
        """
        # FIXME: implement signing of raw data
        raise NotImplementedError()

    def recover(self, data: bytes, signature: bytes) -> bytes:
        """
        Implements :meth:`autobahn.wamp.interfaces.IKey.recover`.

        :raises NotImplementedError: Always (not implemented yet).
        """
        # FIXME: implement signing address recovery from signature of raw data
        raise NotImplementedError()

    def sign_typed_data(self, data: Dict[str, Any], binary=True) -> bytes:
        """
        Implements :meth:`autobahn.wamp.interfaces.IEthereumKey.sign_typed_data`.

        :param data: The typed data dict to sign.
        :param binary: If set, the signature is returned as bytes, otherwise as
            a hex encoded string.
        :return: The outcome is handed back via ``txaio.create_future_success``
            (the signature) or ``txaio.create_future_error`` (any exception raised
            while encoding or signing).
        """
        if self._security_module:
            assert self._security_module.is_open and not self._security_module.is_locked, 'security module must be open and unlocked'

        try:
            if self._key is None:
                # address-only keys carry no private key material: fail with a
                # clear message instead of an attribute error on None
                raise RuntimeError('cannot sign: key holds no private key (can_sign is False)')

            # encode typed data dict and return message hash
            msg_hash = encode_typed_data(data)

            # ECDSA signatures in Ethereum consist of three parameters: v, r and s.
            # The signature is always 65-bytes in length.
            #     r = first 32 bytes of signature
            #     s = second 32 bytes of signature
            #     v = final 1 byte of signature
            signature_vrs = ecsign(msg_hash, self._key.key)

            # concatenate signature components into byte string
            signature = v_r_s_to_signature(*signature_vrs)
        except Exception as e:
            return txaio.create_future_error(e)

        if binary:
            return txaio.create_future_success(signature)
        return txaio.create_future_success(binascii.b2a_hex(signature).decode())

    def verify_typed_data(self, data: Dict[str, Any], signature: bytes) -> bool:
        """
        Implements :meth:`autobahn.wamp.interfaces.IEthereumKey.verify_typed_data`.

        :param data: The typed data dict that was signed.
        :param signature: The signature to verify.
        :return: The outcome is handed back via ``txaio.create_future_success``
            (``True`` if the address recovered from the signature is the address of
            this key) or ``txaio.create_future_error`` (any exception raised while
            recovering the signer).
        """
        if self._security_module:
            assert self._security_module.is_open and not self._security_module.is_locked, 'security module must be open and unlocked'

        try:
            msg_hash = encode_typed_data(data)
            signature_vrs = signature_to_v_r_s(signature)
            public_key = ecrecover_to_pub(msg_hash, *signature_vrs)

            # the signer address is taken from the last 20 octets of the hashed public key
            signer_adr = sha3(public_key)[-20:]

            own_adr = self._address
            if isinstance(own_adr, bytes) and len(own_adr) == 20:
                # raw address: compare octets directly
                is_signer = signer_adr == own_adr
            else:
                # hex address: compare ignoring (checksum) letter case, so an address
                # given in lower case still matches
                is_signer = checksum_encode(signer_adr).lower() == own_adr.lower()
        except Exception as e:
            return txaio.create_future_error(e)

        return txaio.create_future_success(is_signer)

    @classmethod
    def from_address(cls, address: Union[str, bytes]) -> 'EthereumKey':
        """
        Create a public key from an address, which can be used to verify signatures.

        :param address: The Ethereum address (20 octets).
        :return: New instance of :class:`EthereumKey`
        """
        return cls(key_or_address=address, can_sign=False)

    @classmethod
    def from_bytes(cls, key: bytes) -> 'EthereumKey':
        """
        Create a private key from seed bytes, which can be used to sign and create signatures.

        :param key: The Ethereum private key seed (32 octets).
        :return: New instance of :class:`EthereumKey`
        :raises ValueError: If ``key`` is not of type bytes or not 32 octets long.
        """
        if not isinstance(key, bytes):
            raise ValueError("invalid seed type {} (expected binary)".format(type(key)))
        if len(key) != 32:
            raise ValueError("invalid seed length {} (expected 32)".format(len(key)))

        account: LocalAccount = Account.from_key(key)
        return cls(key_or_address=account, can_sign=True)

    @classmethod
    def from_seedphrase(cls, seedphrase: str, index: int = 0) -> 'EthereumKey':
        """
        Create a private key from the given BIP-39 mnemonic seed phrase and index,
        which can be used to sign and create signatures.

        :param seedphrase: The BIP-39 seedphrase ("Mnemonic") from which to derive the account.
        :param index: The account index in account hierarchy defined by the seedphrase.
        :return: New instance of :class:`EthereumKey`
        """
        # Base HD Path:  m/44'/60'/0'/0/{account_index}
        derivation_path = "m/44'/60'/0'/0/{}".format(index)

        key = mnemonic_to_private_key(seedphrase, str_derivation_path=derivation_path)
        assert isinstance(key, bytes)
        assert len(key) == 32

        account: LocalAccount = Account.from_key(key)
        return cls(key_or_address=account, can_sign=True)

    @classmethod
    def from_keyfile(cls, keyfile: str) -> 'EthereumKey':
        """
        Create a public or private key from reading the given public or private key file.

        Here is an example key file that includes an Ethereum private key ``private-key-eth``, which
        is loaded in this function, and other fields, which are ignored by this function:

        .. code-block::

            This is a comment (all lines until the first empty line are comments indeed).

            creator: oberstet@intel-nuci7
            created-at: 2022-07-05T12:29:48.832Z
            user-id: oberstet@intel-nuci7
            public-key-ed25519: 7326d9dc0307681cc6940fde0e60eb31a6e4d642a81e55c434462ce31f95deed
            public-adr-eth: 0x10848feBdf7f200Ba989CDf7E3eEB3EC03ae7768
            private-key-ed25519: f750f42b0430e28a2e272c3cedcae4dcc4a1cf33bc345c35099d3322626ab666
            private-key-eth: 4d787714dcb0ae52e1c5d2144648c255d660b9a55eac9deeb80d9f506f501025

        :param keyfile: Path (relative or absolute) to a public or private keys file.
        :return: New instance of :class:`EthereumKey`
        :raises RuntimeError: If ``keyfile`` is not a file, or contains neither
            ``private-key-eth`` nor ``public-adr-eth``.
        """
        # os.path.isfile() is already False for paths that do not exist
        if not os.path.isfile(keyfile):
            raise RuntimeError('keyfile "{}" is not a file'.format(keyfile))

        # now load the private or public key file - this returns a dict which should
        # include (for a private key):
        #
        #   private-key-eth: 6b08b6e186bd2a3b9b2f36e6ece3f8031fe788ab3dc4a1cfd3a489ea387c496b
        #
        # or (for a public key only):
        #
        #   public-adr-eth: 0x10848feBdf7f200Ba989CDf7E3eEB3EC03ae7768
        #
        data = parse_keyfile(keyfile)

        # a private key takes precedence over a public address
        privkey_eth_hex = data.get('private-key-eth', None)
        if privkey_eth_hex is not None:
            return cls.from_bytes(binascii.a2b_hex(privkey_eth_hex))

        pub_adr_eth = data.get('public-adr-eth', None)
        if pub_adr_eth is None:
            raise RuntimeError('neither "private-key-eth" nor "public-adr-eth" found in keyfile {}'.format(keyfile))
        return cls.from_address(pub_adr_eth)
# Register EthereumKey with the IEthereumKey interface.
# NOTE(review): presumably an abc "virtual subclass" registration - confirm in autobahn.wamp.interfaces.
IEthereumKey.register(EthereumKey)
class SecurityModuleMemory(MutableMapping):
    """
    A transient, memory-based implementation of :class:`ISecurityModule`.

    Keys are held in a plain dict mapping key number to key object, counters in a
    dict mapping counter number to its current value. A new instance starts out
    closed and locked.
    """

    def __init__(self, keys: Optional[List[Union[CryptosignKey, EthereumKey]]] = None):
        """

        :param keys: Optional initial keys; they are numbered in list order, starting at 0.
        """
        self._mutex = Lock()
        self._is_open = False
        self._is_locked = True
        self._keys: Dict[int, Union[CryptosignKey, EthereumKey]] = dict(enumerate(keys)) if keys else {}
        self._counters: Dict[int, int] = {}

    def __len__(self) -> int:
        """
        Implements :meth:`ISecurityModule.__len__`
        """
        assert self._is_open, 'security module not open'
        return len(self._keys)

    def __contains__(self, key_no: int) -> bool:
        """
        Check whether a key with number ``key_no`` is stored in this module.
        """
        assert self._is_open, 'security module not open'
        return key_no in self._keys

    def __iter__(self) -> Iterator[int]:
        """
        Implements :meth:`ISecurityModule.__iter__`
        """
        assert self._is_open, 'security module not open'
        yield from self._keys

    def __getitem__(self, key_no: int) -> Union[CryptosignKey, EthereumKey]:
        """
        Implements :meth:`ISecurityModule.__getitem__`

        :raises IndexError: If there is no key with number ``key_no``.
        """
        assert self._is_open, 'security module not open'
        try:
            return self._keys[key_no]
        except KeyError:
            # this class has always reported unknown key numbers as IndexError
            raise IndexError('key_no {} not found'.format(key_no))

    def __setitem__(self, key_no: int, key: Union[CryptosignKey, EthereumKey]) -> None:
        """
        Store ``key`` under number ``key_no``, replacing any key already stored there.
        """
        assert self._is_open, 'security module not open'
        assert key_no >= 0
        # FIXME: decide what should happen when key_no is already taken (for now,
        # the existing key is silently replaced)
        self._keys[key_no] = key

    def __delitem__(self, key_no: int) -> None:
        """
        Remove the key stored under number ``key_no``.

        :raises IndexError: If there is no key with number ``key_no``.
        """
        assert self._is_open, 'security module not open'
        try:
            del self._keys[key_no]
        except KeyError:
            raise IndexError('key_no {} not found'.format(key_no))

    def open(self):
        """
        Implements :meth:`ISecurityModule.open`
        """
        assert not self._is_open, 'security module already open'
        self._is_open = True
        return txaio.create_future_success(None)

    def close(self):
        """
        Implements :meth:`ISecurityModule.close`
        """
        assert self._is_open, 'security module not open'
        self._is_open = False
        # closing always leaves the module locked
        self._is_locked = True
        return txaio.create_future_success(None)

    @property
    def is_open(self) -> bool:
        """
        Implements :meth:`ISecurityModule.is_open`
        """
        return self._is_open

    @property
    def can_lock(self) -> bool:
        """
        Implements :meth:`ISecurityModule.can_lock`
        """
        return True

    @property
    def is_locked(self) -> bool:
        """
        Implements :meth:`ISecurityModule.is_locked`
        """
        return self._is_locked

    def lock(self):
        """
        Implements :meth:`ISecurityModule.lock`
        """
        assert self._is_open, 'security module not open'
        assert not self._is_locked
        self._is_locked = True
        return txaio.create_future_success(None)

    def unlock(self):
        """
        Implements :meth:`ISecurityModule.unlock`
        """
        assert self._is_open, 'security module not open'
        assert self._is_locked
        self._is_locked = False
        return txaio.create_future_success(None)

    def create_key(self, key_type: str) -> int:
        """
        Create a new random key and store it in this module.

        :param key_type: Either ``"cryptosign"`` or ``"ethereum"``.
        :return: The number of the new key, handed back via ``txaio.create_future_success``.
        :raises ValueError: If ``key_type`` is neither of the supported types.
        """
        assert self._is_open, 'security module not open'

        # allocate one past the highest number in use: the number of stored keys is
        # not a free slot once keys have been deleted or stored at arbitrary numbers
        key_no = max(self._keys, default=-1) + 1

        if key_type == 'cryptosign':
            key = CryptosignKey(key=nacl.signing.SigningKey(os.urandom(32)),
                                can_sign=True,
                                security_module=self,
                                key_no=key_no)
        elif key_type == 'ethereum':
            key = EthereumKey(key_or_address=Account.from_key(os.urandom(32)),
                              can_sign=True,
                              security_module=self,
                              key_no=key_no)
        else:
            raise ValueError('invalid key_type "{}"'.format(key_type))

        self._keys[key_no] = key
        return txaio.create_future_success(key_no)

    def delete_key(self, key_no: int):
        """
        Delete the key stored under number ``key_no``, if any.

        :return: ``key_no`` if a key was deleted, otherwise ``None`` - handed back
            via ``txaio.create_future_success``.
        """
        assert self._is_open, 'security module not open'
        if key_no in self._keys:
            del self._keys[key_no]
            return txaio.create_future_success(key_no)
        return txaio.create_future_success(None)

    def get_random(self, octets: int) -> bytes:
        """
        Implements :meth:`ISecurityModule.get_random`
        """
        assert self._is_open, 'security module not open'
        data = os.urandom(octets)
        return txaio.create_future_success(data)

    def get_counter(self, counter_no: int) -> int:
        """
        Implements :meth:`ISecurityModule.get_counter`

        A counter that was never incremented reads as 0.
        """
        assert self._is_open, 'security module not open'
        # "with" releases the mutex even if the lookup raises
        with self._mutex:
            res = self._counters.get(counter_no, 0)
        return txaio.create_future_success(res)

    def increment_counter(self, counter_no: int) -> int:
        """
        Implements :meth:`ISecurityModule.increment_counter`

        Counters start at 0, so the first increment yields 1.
        """
        assert self._is_open, 'security module not open'
        # "with" releases the mutex even if the update raises
        with self._mutex:
            res = self._counters.get(counter_no, 0) + 1
            self._counters[counter_no] = res
        return txaio.create_future_success(res)

    @staticmethod
    def _keys_from_keyfile(keyfile: str) -> List[Union[EthereumKey, CryptosignKey]]:
        """
        Load the Ethereum and the Cryptosign private key (in this order) from a key file.

        :param keyfile: Path to an existing private keys file.
        :return: List with one :class:`EthereumKey` followed by one :class:`CryptosignKey`.
        :raises RuntimeError: If either private key is missing in the key file.
        """
        # load the private key file - this returns a dict which should include:
        # private-key-eth: 6b08b6e186bd2a3b9b2f36e6ece3f8031fe788ab3dc4a1cfd3a489ea387c496b
        # private-key-ed25519: 20e8c05d0ede9506462bb049c4843032b18e8e75b314583d0c8d8a4942f9be40
        data = parse_keyfile(keyfile)

        # first, the Ethereum key
        privkey_eth_hex = data.get('private-key-eth', None)
        if privkey_eth_hex is None:
            raise RuntimeError('"private-key-eth" not found in keyfile {}'.format(keyfile))

        # second, the Cryptosign key
        privkey_ed25519_hex = data.get('private-key-ed25519', None)
        if privkey_ed25519_hex is None:
            raise RuntimeError('"private-key-ed25519" not found in keyfile {}'.format(keyfile))

        return [
            EthereumKey.from_bytes(binascii.a2b_hex(privkey_eth_hex)),
            CryptosignKey.from_bytes(binascii.a2b_hex(privkey_ed25519_hex)),
        ]

    @classmethod
    def from_seedphrase(cls, seedphrase: str, num_eth_keys: int = 1, num_cs_keys: int = 1) -> 'SecurityModuleMemory':
        """
        Create a new memory-backed security module with

        1. ``num_eth_keys`` keys of type :class:`EthereumKey`, followed by
        2. ``num_cs_keys`` keys of type :class:`CryptosignKey`

        computed from a (common) BIP44 seedphrase.

        :param seedphrase: BIP44 seedphrase to use.
        :param num_eth_keys: Number of Ethereum keys to derive.
        :param num_cs_keys: Number of Cryptosign keys to derive.
        :return: New memory-backed security module instance.
        """
        keys: List[Union[EthereumKey, CryptosignKey]] = []

        # first, add num_eth_keys EthereumKey(s), numbering starting at 0
        keys.extend(EthereumKey.from_seedphrase(seedphrase, i) for i in range(num_eth_keys))

        # second, add num_cs_keys CryptosignKey(s), numbering starting at num_eth_keys (!)
        keys.extend(CryptosignKey.from_seedphrase(seedphrase, i + num_eth_keys) for i in range(num_cs_keys))

        # initialize security module from collected keys
        return cls(keys=keys)

    @classmethod
    def from_config(cls, config: str, profile: str = 'default') -> 'SecurityModuleMemory':
        """
        Create a new memory-backed security module with keys referred from a profile in
        the given configuration file.

        The option ``privkey`` of the profile names the private keys file, relative to
        the directory of the configuration file.

        :param config: Path (relative or absolute) to an INI configuration file.
        :param profile: Name of the profile within the given INI configuration file.
        :return: New memory-backed security module instance.
        :raises RuntimeError: If the profile or its ``privkey`` option is missing, the
            private keys file is not a file, or a private key is missing in that file.
        """
        cfg = configparser.ConfigParser()
        cfg.read(config)

        if not cfg.has_section(profile):
            raise RuntimeError('profile "{}" not found in configuration file "{}"'.format(profile, config))

        if not cfg.has_option(profile, 'privkey'):
            raise RuntimeError('missing option "privkey" in profile "{}" of configuration file "{}"'.format(profile, config))

        privkey = os.path.join(os.path.dirname(config), cfg.get(profile, 'privkey'))
        # os.path.isfile() is already False for paths that do not exist
        if not os.path.isfile(privkey):
            raise RuntimeError('privkey "{}" is not a file in profile "{}" of configuration file "{}"'.format(privkey, profile, config))

        # initialize security module from the keys found in the private keys file
        return cls(keys=cls._keys_from_keyfile(privkey))

    @classmethod
    def from_keyfile(cls, keyfile: str) -> 'SecurityModuleMemory':
        """
        Create a new memory-backed security module with the Ethereum and the Cryptosign
        private key read from the given private keys file.

        :param keyfile: Path (relative or absolute) to a private keys file.
        :return: New memory-backed security module instance.
        :raises RuntimeError: If ``keyfile`` is not a file, or a private key is
            missing in that file.
        """
        # os.path.isfile() is already False for paths that do not exist
        if not os.path.isfile(keyfile):
            raise RuntimeError('keyfile "{}" is not a file'.format(keyfile))

        # initialize security module from the keys found in the private keys file
        return cls(keys=cls._keys_from_keyfile(keyfile))
# Register SecurityModuleMemory with the ISecurityModule interface.
# NOTE(review): presumably an abc "virtual subclass" registration - confirm in autobahn.wamp.interfaces.
ISecurityModule.register(SecurityModuleMemory)