Source code for autobahn.wamp.cryptosign

###############################################################################
#
# The MIT License (MIT)
#
# Copyright (c) typedef int GmbH
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
###############################################################################

import binascii
import os
import struct
from binascii import a2b_hex, b2a_hex
from typing import Any, Callable, Dict, Optional, Union

import txaio

from autobahn import util
from autobahn.util import parse_keyfile
from autobahn.wamp.interfaces import ICryptosignKey, ISecurityModule
from autobahn.wamp.message import _URI_PAT_REALM_NAME_ETH
from autobahn.wamp.mnemonic import mnemonic_to_private_key
from autobahn.wamp.types import Challenge

__all__ = [
    "HAS_CRYPTOSIGN",
]

try:
    # try to import everything we need for WAMP-cryptosign
    from nacl import bindings, encoding, signing
    from nacl.signing import SignedMessage
except ImportError:
[docs] HAS_CRYPTOSIGN = False
else: HAS_CRYPTOSIGN = True __all__.append("CryptosignKey") def _unpack(keydata): """ Unpack a SSH agent key blob into parts. See: http://blog.oddbit.com/2011/05/08/converting-openssh-public-keys/ """ parts = [] while keydata: # read the length of the data dlen = struct.unpack(">I", keydata[:4])[0] # read in <length> bytes data, keydata = keydata[4 : dlen + 4], keydata[4 + dlen :] parts.append(data) return parts def _pack(keyparts): """ Pack parts into a SSH key blob. """ parts = [] for part in keyparts: parts.append(struct.pack(">I", len(part))) parts.append(part) return b"".join(parts) def _read_ssh_ed25519_pubkey(keydata): """ Parse an OpenSSH Ed25519 public key from a string into a raw public key. Example input: ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIJukDU5fqXv/yVhSirsDWsUFyOodZyCSLxyitPPzWJW9 oberstet@office-corei7 :param keydata: The OpenSSH Ed25519 public key data to parse. :type keydata: str :returns: pair of raw public key (32 bytes) and comment :rtype: tuple """ if type(keydata) != str: raise Exception("invalid type {} for keydata".format(type(keydata))) parts = keydata.strip().split() if len(parts) != 3: raise Exception("invalid SSH Ed25519 public key") algo, keydata, comment = parts if algo != "ssh-ed25519": raise Exception("not a Ed25519 SSH public key (but {})".format(algo)) blob = binascii.a2b_base64(keydata) try: key = _unpack(blob)[1] except Exception as e: raise Exception("could not parse key ({})".format(e)) if len(key) != 32: raise Exception( "invalid length {} for embedded raw key (must be 32 bytes)".format(len(key)) ) return key, comment class _SSHPacketReader: """ Read OpenSSH packet format which is used for key material. """ def __init__(self, packet): self._packet = packet self._idx = 0 self._len = len(packet) def get_remaining_payload(self): return self._packet[self._idx :] def get_bytes(self, size): if self._idx + size > self._len: raise Exception("incomplete packet") value = self._packet[self._idx : self._idx + size] self._idx += size return value def get_uint32(self): return struct.unpack(">I", self.get_bytes(4))[0] def get_string(self): return self.get_bytes(self.get_uint32()) def _makepad(size: int) -> bytes: assert 0 <= size < 255 return b"".join(x.to_bytes(1, byteorder="big") for x in range(1, size + 1)) def _read_ssh_ed25519_privkey(keydata): """ Parse an OpenSSH Ed25519 private key from a string into a raw private key. Example input: -----BEGIN OPENSSH PRIVATE KEY----- b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAAAMwAAAAtzc2gtZW QyNTUxOQAAACCbpA1OX6l7/8lYUoq7A1rFBcjqHWcgki8corTz81iVvQAAAKDWjZ0Y1o2d GAAAAAtzc2gtZWQyNTUxOQAAACCbpA1OX6l7/8lYUoq7A1rFBcjqHWcgki8corTz81iVvQ AAAEArodzIMjH9MOBz0X+HDvL06rEJOMYFhzGQ5zXPM7b7fZukDU5fqXv/yVhSirsDWsUF yOodZyCSLxyitPPzWJW9AAAAFm9iZXJzdGV0QG9mZmljZS1jb3JlaTcBAgMEBQYH -----END OPENSSH PRIVATE KEY----- :param keydata: The OpenSSH Ed25519 private key data to parse. :type keydata: str :returns: pair of raw private key (32 bytes) and comment :rtype: tuple """ # Some pointers: # https://github.com/ronf/asyncssh/blob/master/asyncssh/public_key.py # https://github.com/ronf/asyncssh/blob/master/asyncssh/ed25519.py # crypto_sign_ed25519_sk_to_seed # https://github.com/jedisct1/libsodium/blob/master/src/libsodium/crypto_sign/ed25519/sign_ed25519_api.c#L27 # https://tools.ietf.org/html/draft-bjh21-ssh-ed25519-02 # http://blog.oddbit.com/2011/05/08/converting-openssh-public-keys/ SSH_BEGIN = "-----BEGIN OPENSSH PRIVATE KEY-----" SSH_END = "-----END OPENSSH PRIVATE KEY-----" OPENSSH_KEY_V1 = b"openssh-key-v1\0" if not (keydata.startswith(SSH_BEGIN) and keydata.endswith(SSH_END)): raise Exception( "invalid OpenSSH private key (does not start/end with OPENSSH preamble)" ) ssh_end = keydata.find(SSH_END) keydata = keydata[len(SSH_BEGIN) : ssh_end] keydata = "".join(x.strip() for x in keydata.split()) blob = binascii.a2b_base64(keydata) blob = blob[len(OPENSSH_KEY_V1) :] packet = _SSHPacketReader(blob) cipher_name = packet.get_string() kdf = packet.get_string() packet.get_string() # kdf_data nkeys = packet.get_uint32() packet.get_string() # public_key key_data = packet.get_string() mac = packet.get_remaining_payload() block_size = 8 if cipher_name != b"none": raise Exception( "encrypted private keys not supported (please remove the passphrase from your private key or use SSH agent)" ) if kdf != b"none": raise Exception("passphrase encrypted private keys not supported") if nkeys != 1: raise Exception( "multiple private keys in a key file not supported (found {} keys)".format( nkeys ) ) if mac: raise Exception("invalid OpenSSH private key (found remaining payload for mac)") packet = _SSHPacketReader(key_data) packet.get_uint32() # check1 packet.get_uint32() # check2 alg = packet.get_string() if alg != b"ssh-ed25519": raise Exception( 'invalid key type: we only support Ed25519 (found "{}")'.format( alg.decode("ascii") ) ) vk = packet.get_string() sk = packet.get_string() if len(vk) != bindings.crypto_sign_PUBLICKEYBYTES: raise Exception("invalid public key length") if len(sk) != bindings.crypto_sign_SECRETKEYBYTES: raise Exception("invalid public key length") comment = packet.get_string() # comment pad = packet.get_remaining_payload() if len(pad) and (len(pad) >= block_size or pad != _makepad(len(pad))): raise Exception( "invalid OpenSSH private key (padlen={}, actual_pad={}, expected_pad={})".format( len(pad), pad, _makepad(len(pad)) ) ) # secret key (64 octets) = 32 octets seed || 32 octets secret key derived of seed seed = sk[: bindings.crypto_sign_SEEDBYTES] comment = comment.decode("ascii") return seed, comment def _read_signify_ed25519_signature(signature_file): """ Read a Ed25519 signature file created with OpenBSD signify. http://man.openbsd.org/OpenBSD-current/man1/signify.1 """ with open(signature_file) as f: # signature file format: 2nd line is base64 of 'Ed' || 8 random octets || 64 octets Ed25519 signature sig = binascii.a2b_base64(f.read().splitlines()[1])[10:] if len(sig) != 64: raise Exception( "bogus Ed25519 signature: raw signature length was {}, but expected 64".format( len(sig) ) ) return sig def _read_signify_ed25519_pubkey(pubkey_file): """ Read a public key from a Ed25519 key pair created with OpenBSD signify. http://man.openbsd.org/OpenBSD-current/man1/signify.1 """ with open(pubkey_file) as f: # signature file format: 2nd line is base64 of 'Ed' || 8 random octets || 32 octets Ed25519 public key pubkey = binascii.a2b_base64(f.read().splitlines()[1])[10:] if len(pubkey) != 32: raise Exception( "bogus Ed25519 public key: raw key length was {}, but expected 32".format( len(pubkey) ) ) return pubkey def _qrcode_from_signify_ed25519_pubkey(pubkey_file, mode="text"): """ Usage: 1. Get the OpenBSD 5.7 release public key from here http://cvsweb.openbsd.org/cgi-bin/cvsweb/src/etc/signify/Attic/openbsd-57-base.pub?rev=1.1 2. Generate QR Code and print to terminal print(cryptosign._qrcode_from_signify_ed25519_pubkey('openbsd-57-base.pub')) 3. Compare to (scroll down) QR code here https://www.openbsd.org/papers/bsdcan-signify.html """ assert mode in ["text", "svg"] import qrcode with open(pubkey_file) as f: pubkey = f.read().splitlines()[1] qr = qrcode.QRCode(box_size=3, error_correction=qrcode.ERROR_CORRECT_L) qr.add_data(pubkey) if mode == "text": import io with io.StringIO() as data_buffer: qr.print_ascii(out=data_buffer, invert=True) return data_buffer.getvalue() elif mode == "svg": import qrcode.image.svg image = qr.make_image(image_factory=qrcode.image.svg.SvgImage) return image.to_string() else: raise Exception("logic error") def _verify_signify_ed25519_signature(pubkey_file, signature_file, message): """ Verify a Ed25519 signature created with OpenBSD signify. This will raise a `nacl.exceptions.BadSignatureError` if the signature is bad and return silently when the signature is good. Usage: 1. Create a signature: signify-openbsd -S -s ~/.signify/crossbario-trustroot.sec -m .profile 2. Verify the signature from autobahn.wamp import cryptosign with open('.profile', 'rb') as f: message = f.read() cryptosign._verify_signify_ed25519_signature('.signify/crossbario-trustroot.pub', '.profile.sig', message) http://man.openbsd.org/OpenBSD-current/man1/signify.1 """ pubkey = _read_signify_ed25519_pubkey(pubkey_file) verify_key = signing.VerifyKey(pubkey) sig = _read_signify_ed25519_signature(signature_file) verify_key.verify(message, sig) # CryptosignKey from # - raw byte string or file with raw bytes # - SSH private key string or key file # - SSH agent proxy # # VerifyKey from # - raw byte string or file with raw bytes # - SSH public key string or key file if HAS_CRYPTOSIGN: def _format_challenge( challenge: Challenge, channel_id_raw: Optional[bytes], channel_id_type: Optional[str], ) -> bytes: """ Format the challenge based on provided parameters :param challenge: The WAMP-cryptosign challenge object for which a signature should be computed. :param channel_id_raw: The channel ID when channel_id_type is 'tls-unique'. :param channel_id_type: The type of the channel id, currently handles 'tls-unique' and ignores otherwise. """ if not isinstance(challenge, Challenge): raise Exception( "challenge must be instance of autobahn.wamp.types.Challenge, not {}".format( type(challenge) ) ) if "challenge" not in challenge.extra: raise Exception("missing challenge value in challenge.extra") # the challenge sent by the router (a 32 bytes random value) challenge_hex = challenge.extra["challenge"] if type(challenge_hex) != str: raise Exception( "invalid type {} for challenge (expected a hex string)".format( type(challenge_hex) ) ) if len(challenge_hex) != 64: raise Exception( "unexpected challenge (hex) length: was {}, but expected 64".format( len(challenge_hex) ) ) # the challenge for WAMP-cryptosign is a 32 bytes random value in Hex encoding (that is, a unicode string) challenge_raw = binascii.a2b_hex(challenge_hex) if channel_id_type == "tls-unique": assert len(channel_id_raw) == 32, ( "unexpected TLS transport channel ID length (was {}, but expected 32)".format( len(channel_id_raw) ) ) # with TLS channel binding of type "tls-unique", the message to be signed by the client actually # is the XOR of the challenge and the TLS channel ID data = util.xor(challenge_raw, channel_id_raw) elif channel_id_type is None: # when no channel binding was requested, the message to be signed by the client is the challenge only data = challenge_raw else: assert False, 'invalid channel_id_type "{}"'.format(channel_id_type) return data def _sign_challenge(data: bytes, signer_func: Callable) -> bytes: """ Sign the provided data using the provided signer. :param data: challenge to sign :param signer_func: The callable function to use for signing :returns: A Deferred/Future that resolves to the computed signature. :rtype: str """ # a raw byte string is signed, and the signature is also a raw byte string d1 = signer_func(data) # asyncio lacks callback chaining (and we cannot use co-routines, since we want # to support older Pythons), hence we need d2 d2 = txaio.create_future() def process(signature_raw): # convert the raw signature into a hex encode value (unicode string) signature_hex = binascii.b2a_hex(signature_raw).decode("ascii") # we return the concatenation of the signature and the message signed (96 bytes) data_hex = binascii.b2a_hex(data).decode("ascii") sig = signature_hex + data_hex txaio.resolve(d2, sig) txaio.add_callbacks(d1, process, None) return d2 class CryptosignKey(object): """ A cryptosign private key for signing, and hence usable for authentication or a public key usable for verification (but can't be used for signing). """ def __init__( self, key, can_sign: bool, security_module: Optional[ISecurityModule] = None, key_no: Optional[int] = None, comment: Optional[str] = None, ) -> None: if not ( isinstance(key, signing.VerifyKey) or isinstance(key, signing.SigningKey) ): raise Exception("invalid type {} for key".format(type(key))) assert (can_sign and isinstance(key, signing.SigningKey)) or ( not can_sign and isinstance(key, signing.VerifyKey) ) self._key = key self._can_sign = can_sign self._security_module = security_module self._key_no = key_no self._comment = comment @property def security_module(self) -> Optional["ISecurityModule"]: """ Implements :meth:`autobahn.wamp.interfaces.IKey.security_module`. """ return self._security_module @property def key_no(self) -> Optional[int]: """ Implements :meth:`autobahn.wamp.interfaces.IKey.key_no`. """ return self._key_no @property def comment(self) -> Optional[str]: """ Implements :meth:`autobahn.wamp.interfaces.IKey.comment`. """ return self._comment @property def key_type(self) -> str: """ Implements :meth:`autobahn.wamp.interfaces.IKey.key_type`. """ return "cryptosign" @property def can_sign(self) -> bool: """ Implements :meth:`autobahn.wamp.interfaces.IKey.can_sign`. """ return self._can_sign def sign(self, data: bytes) -> bytes: """ Implements :meth:`autobahn.wamp.interfaces.IKey.sign`. """ if not self._can_sign: raise Exception("a signing key required to sign") if type(data) != bytes: raise Exception("data to be signed must be binary") sig: SignedMessage = self._key.sign(data) # we only return the actual signature! if we return "sig", # it gets coerced into the concatenation of message + signature # not sure which order, but we don't want that. we only want # the signature return txaio.create_future_success(sig.signature) def sign_challenge( self, challenge: Challenge, channel_id: Optional[bytes] = None, channel_id_type: Optional[str] = None, ) -> bytes: """ Implements :meth:`autobahn.wamp.interfaces.ICryptosignKey.sign_challenge`. """ assert challenge.method in [ "cryptosign", "cryptosign-proxy", ], 'unexpected cryptosign challenge with method "{}"'.format( challenge.method ) data = _format_challenge(challenge, channel_id, channel_id_type) return _sign_challenge(data, self.sign) def public_key(self, binary: bool = False) -> Union[str, bytes]: """ Returns the public key part of a signing key or the (public) verification key. :returns: The public key in Hex encoding. :rtype: str or None """ if isinstance(self._key, signing.SigningKey): key = self._key.verify_key else: key = self._key if binary: return key.encode() else: return key.encode(encoder=encoding.HexEncoder).decode("ascii") @classmethod def from_pubkey( cls, pubkey: bytes, comment: Optional[str] = None ) -> "CryptosignKey": if not (comment is None or type(comment) == str): raise ValueError("invalid type {} for comment".format(type(comment))) if type(pubkey) != bytes: raise ValueError( "invalid key type {} (expected binary)".format(type(pubkey)) ) if len(pubkey) != 32: raise ValueError( "invalid key length {} (expected 32)".format(len(pubkey)) ) return cls(key=signing.VerifyKey(pubkey), can_sign=False, comment=comment) @classmethod def from_bytes( cls, key: bytes, comment: Optional[str] = None ) -> "CryptosignKey": if not (comment is None or type(comment) == str): raise ValueError("invalid type {} for comment".format(type(comment))) if type(key) != bytes: raise ValueError( "invalid key type {} (expected binary)".format(type(key)) ) if len(key) != 32: raise ValueError("invalid key length {} (expected 32)".format(len(key))) return cls(key=signing.SigningKey(key), can_sign=True, comment=comment) @classmethod def from_file( cls, filename: str, comment: Optional[str] = None ) -> "CryptosignKey": """ Load an Ed25519 (private) signing key (actually, the seed for the key) from a raw file of 32 bytes length. This can be any random byte sequence, such as generated from Python code like os.urandom(32) or from the shell dd if=/dev/urandom of=client02.key bs=1 count=32 :param filename: Filename of the key. :param comment: Comment for key (optional). """ if not (comment is None or type(comment) == str): raise Exception("invalid type {} for comment".format(type(comment))) if type(filename) != str: raise Exception("invalid type {} for filename".format(filename)) with open(filename, "rb") as f: key_data = f.read() return cls.from_bytes(key_data, comment=comment) @classmethod def from_ssh_file(cls, filename: str) -> "CryptosignKey": """ Load an Ed25519 key from a SSH key file. The key file can be a (private) signing key (from a SSH private key file) or a (public) verification key (from a SSH public key file). A private key file must be passphrase-less. """ with open(filename, "rb") as f: key_data = f.read().decode("utf-8").strip() return cls.from_ssh_bytes(key_data) @classmethod def from_ssh_bytes(cls, key_data: str) -> "CryptosignKey": """ Load an Ed25519 key from SSH key file. The key file can be a (private) signing key (from a SSH private key file) or a (public) verification key (from a SSH public key file). A private key file must be passphrase-less. """ SSH_BEGIN = "-----BEGIN OPENSSH PRIVATE KEY-----" if key_data.startswith(SSH_BEGIN): # OpenSSH private key key_data, comment = _read_ssh_ed25519_privkey(key_data) key = signing.SigningKey(key_data, encoder=encoding.RawEncoder) can_sign = True else: # OpenSSH public key key_data, comment = _read_ssh_ed25519_pubkey(key_data) key = signing.VerifyKey(key_data) can_sign = False return cls(key=key, can_sign=can_sign, comment=comment) @classmethod def from_seedphrase(cls, seedphrase: str, index: int = 0) -> "CryptosignKey": """ Create a private key from the given BIP-39 mnemonic seed phrase and index, which can be used to sign and create signatures. :param seedphrase: The BIP-39 seedphrase ("Mnemonic") from which to derive the account. :param index: The account index in account hierarchy defined by the seedphrase. :return: New instance of :class:`EthereumKey` """ # BIP44 path for WAMP # https://github.com/wamp-proto/wamp-proto/issues/401 # https://github.com/satoshilabs/slips/pull/1322 derivation_path = "m/44'/655'/0'/0/{}".format(index) key_raw = mnemonic_to_private_key(seedphrase, derivation_path) assert type(key_raw) == bytes assert len(key_raw) == 32 # create WAMP-Cryptosign key object from raw bytes key = cls.from_bytes(key_raw) return key @classmethod def from_keyfile(cls, keyfile: str) -> "CryptosignKey": """ Create a public or private key from reading the given public or private key file. Here is an example key file that includes an CryptosignKey private key ``private-key-ed25519``, which is loaded in this function, and other fields, which are ignored by this function: .. code-block:: This is a comment (all lines until the first empty line are comments indeed). creator: oberstet@intel-nuci7 created-at: 2022-07-05T12:29:48.832Z user-id: oberstet@intel-nuci7 public-key-ed25519: 7326d9dc0307681cc6940fde0e60eb31a6e4d642a81e55c434462ce31f95deed public-adr-eth: 0x10848feBdf7f200Ba989CDf7E3eEB3EC03ae7768 private-key-ed25519: f750f42b0430e28a2e272c3cedcae4dcc4a1cf33bc345c35099d3322626ab666 private-key-eth: 4d787714dcb0ae52e1c5d2144648c255d660b9a55eac9deeb80d9f506f501025 :param keyfile: Path (relative or absolute) to a public or private keys file. :return: New instance of :class:`CryptosignKey` """ if not os.path.exists(keyfile) or not os.path.isfile(keyfile): raise RuntimeError('keyfile "{}" is not a file'.format(keyfile)) # now load the private or public key file - this returns a dict which should # include (for a private key): # # private-key-ed25519: 20e8c05d0ede9506462bb049c4843032b18e8e75b314583d0c8d8a4942f9be40 # # or (for a public key only): # # public-key-ed25519: 7326d9dc0307681cc6940fde0e60eb31a6e4d642a81e55c434462ce31f95deed # data = parse_keyfile(keyfile) privkey_ed25519_hex = data.get("private-key-ed25519", None) if privkey_ed25519_hex is None: pubkey_ed25519_hex = data.get("public-key-ed25519", None) if pubkey_ed25519_hex is None: raise RuntimeError( 'neither "private-key-ed25519" nor "public-key-ed25519" found in keyfile {}'.format( keyfile ) ) else: return CryptosignKey.from_pubkey( binascii.a2b_hex(pubkey_ed25519_hex) ) else: return CryptosignKey.from_bytes(binascii.a2b_hex(privkey_ed25519_hex)) ICryptosignKey.register(CryptosignKey) class CryptosignAuthextra(object): """ WAMP-Cryptosign authextra object. """ __slots__ = [ "_pubkey", "_trustroot", "_challenge", "_channel_binding", "_channel_id", "_realm", "_chain_id", "_block_no", "_delegate", "_seeder", "_bandwidth", "_signature", ] def __init__( self, pubkey: Optional[bytes] = None, challenge: Optional[bytes] = None, channel_binding: Optional[str] = None, channel_id: Optional[bytes] = None, # domain address, certificates are verified against owner of the domain trustroot: Optional[bytes] = None, # FIXME: add delegate address # FIXME: add certificates # FIXME: remove reservation realm: Optional[bytes] = None, chain_id: Optional[int] = None, block_no: Optional[int] = None, delegate: Optional[bytes] = None, seeder: Optional[bytes] = None, bandwidth: Optional[int] = None, signature: Optional[bytes] = None, ): if pubkey: assert len(pubkey) == 32 if trustroot: assert len(trustroot) == 20 if challenge: assert len(challenge) == 32 if channel_binding: assert channel_binding in ["tls-unique"] if channel_id: assert len(channel_id) == 32 if realm: assert len(realm) == 20 if delegate: assert len(delegate) == 20 if seeder: assert len(seeder) == 20 if signature: assert len(signature) == 65 self._pubkey = pubkey self._trustroot = trustroot self._challenge = challenge self._channel_binding = channel_binding self._channel_id = channel_id self._realm = realm self._chain_id = chain_id self._block_no = block_no self._delegate = delegate self._seeder = seeder self._bandwidth = bandwidth self._signature = signature @property def pubkey(self) -> Optional[bytes]: return self._pubkey @pubkey.setter def pubkey(self, value: Optional[bytes]): assert value is None or len(value) == 20 self._pubkey = value @property def trustroot(self) -> Optional[bytes]: return self._trustroot @trustroot.setter def trustroot(self, value: Optional[bytes]): assert value is None or len(value) == 20 self._trustroot = value @property def challenge(self) -> Optional[bytes]: return self._challenge @challenge.setter def challenge(self, value: Optional[bytes]): assert value is None or len(value) == 32 self._challenge = value @property def channel_binding(self) -> Optional[str]: return self._channel_binding @channel_binding.setter def channel_binding(self, value: Optional[str]): assert value is None or value in ["tls-unique"] self._channel_binding = value @property def channel_id(self) -> Optional[bytes]: return self._channel_id @channel_id.setter def channel_id(self, value: Optional[bytes]): assert value is None or len(value) == 32 self._channel_id = value @property def realm(self) -> Optional[bytes]: return self._realm @realm.setter def realm(self, value: Optional[bytes]): assert value is None or len(value) == 20 self._realm = value @property def chain_id(self) -> Optional[int]: return self._chain_id @chain_id.setter def chain_id(self, value: Optional[int]): assert value is None or value > 0 self._chain_id = value @property def block_no(self) -> Optional[int]: return self._block_no @block_no.setter def block_no(self, value: Optional[int]): assert value is None or value > 0 self._block_no = value @property def delegate(self) -> Optional[bytes]: return self._delegate @delegate.setter def delegate(self, value: Optional[bytes]): assert value is None or len(value) == 20 self._delegate = value @property def seeder(self) -> Optional[bytes]: return self._seeder @seeder.setter def seeder(self, value: Optional[bytes]): assert value is None or len(value) == 20 self._seeder = value @property def bandwidth(self) -> Optional[int]: return self._bandwidth @bandwidth.setter def bandwidth(self, value: Optional[int]): assert value is None or value > 0 self._bandwidth = value @property def signature(self) -> Optional[bytes]: return self._signature @signature.setter def signature(self, value: Optional[bytes]): assert value is None or len(value) == 65 self._signature = value @staticmethod def parse(data: Dict[str, Any]) -> "CryptosignAuthextra": obj = CryptosignAuthextra() pubkey = data.get("pubkey", None) if pubkey is not None: if type(pubkey) != str: raise ValueError("invalid type {} for pubkey".format(type(pubkey))) if len(pubkey) != 32 * 2: raise ValueError("invalid length {} of pubkey".format(len(pubkey))) obj._pubkey = a2b_hex(pubkey) challenge = data.get("challenge", None) if challenge is not None: if type(challenge) != str: raise ValueError( "invalid type {} for challenge".format(type(challenge)) ) if len(challenge) != 32 * 2: raise ValueError( "invalid length {} of challenge".format(len(challenge)) ) obj._challenge = a2b_hex(challenge) channel_binding = data.get("channel_binding", None) if channel_binding is not None: if type(channel_binding) != str: raise ValueError( "invalid type {} for channel_binding".format( type(channel_binding) ) ) if channel_binding not in ["tls-unique"]: raise ValueError( 'invalid value "{}" for channel_binding'.format(channel_binding) ) obj._channel_binding = channel_binding channel_id = data.get("channel_id", None) if channel_id is not None: if type(channel_id) != str: raise ValueError( "invalid type {} for channel_id".format(type(channel_id)) ) if len(channel_id) != 32 * 2: raise ValueError( "invalid length {} of channel_id".format(len(channel_id)) ) obj._channel_id = a2b_hex(channel_id) trustroot = data.get("trustroot", None) if trustroot is not None: if type(trustroot) != str: raise ValueError( "invalid type {} for trustroot - expected a string".format( type(trustroot) ) ) if not _URI_PAT_REALM_NAME_ETH.match(trustroot): raise ValueError( 'invalid value "{}" for trustroot - expected an Ethereum address'.format( type(trustroot) ) ) obj._trustroot = a2b_hex(trustroot[2:]) reservation = data.get("reservation", None) if reservation is not None: if type(reservation) != dict: raise ValueError( "invalid type {} for reservation".format(type(reservation)) ) chain_id = reservation.get("chain_id", None) if chain_id is not None: if type(chain_id) != int: raise ValueError( "invalid type {} for reservation.chain_id - expected an integer".format( type(chain_id) ) ) obj._chain_id = chain_id block_no = reservation.get("block_no", None) if block_no is not None: if type(block_no) != int: raise ValueError( "invalid type {} for reservation.block_no - expected an integer".format( type(block_no) ) ) obj._block_no = block_no realm = reservation.get("realm", None) if realm is not None: if type(realm) != str: raise ValueError( "invalid type {} for reservation.realm - expected a string".format( type(realm) ) ) if not _URI_PAT_REALM_NAME_ETH.match(realm): raise ValueError( 'invalid value "{}" for reservation.realm - expected an Ethereum address'.format( type(realm) ) ) obj._realm = a2b_hex(realm[2:]) delegate = reservation.get("delegate", None) if delegate is not None: if type(delegate) != str: raise ValueError( "invalid type {} for reservation.delegate - expected a string".format( type(delegate) ) ) if not _URI_PAT_REALM_NAME_ETH.match(delegate): raise ValueError( 'invalid value "{}" for reservation.delegate - expected an Ethereum address'.format( type(delegate) ) ) obj._delegate = a2b_hex(delegate[2:]) seeder = reservation.get("seeder", None) if seeder is not None: if type(seeder) != str: raise ValueError( "invalid type {} for reservation.seeder - expected a string".format( type(seeder) ) ) if not _URI_PAT_REALM_NAME_ETH.match(seeder): raise ValueError( 'invalid value "{}" for reservation.seeder - expected an Ethereum address'.format( type(seeder) ) ) obj._seeder = a2b_hex(seeder[2:]) bandwidth = reservation.get("bandwidth", None) if bandwidth is not None: if type(bandwidth) != int: raise ValueError( "invalid type {} for reservation.bandwidth - expected an integer".format( type(bandwidth) ) ) obj._bandwidth = bandwidth signature = data.get("signature", None) if signature is not None: if type(signature) != str: raise ValueError( "invalid type {} for signature".format(type(signature)) ) if len(signature) != 65 * 2: raise ValueError( "invalid length {} of signature".format(len(signature)) ) obj._signature = a2b_hex(signature) return obj def marshal(self) -> Dict[str, Any]: res = {} # FIXME: marshal check-summed eth addresses if self._pubkey is not None: res["pubkey"] = b2a_hex(self._pubkey).decode() if self._challenge is not None: res["challenge"] = b2a_hex(self._challenge).decode() if self._channel_binding is not None: res["channel_binding"] = self._channel_binding if self._channel_id is not None: res["channel_id"] = b2a_hex(self._channel_id).decode() if self._trustroot is not None: res["trustroot"] = "0x" + b2a_hex(self._trustroot).decode() reservation = {} if self._chain_id is not None: reservation["chain_id"] = self._chain_id if self._block_no is not None: reservation["block_no"] = self._block_no if self._realm is not None: reservation["realm"] = "0x" + b2a_hex(self._realm).decode() if self._delegate is not None: reservation["delegate"] = "0x" + b2a_hex(self._delegate).decode() if self._seeder is not None: reservation["seeder"] = "0x" + b2a_hex(self._seeder).decode() if self._bandwidth is not None: reservation["bandwidth"] = self._bandwidth if reservation: res["reservation"] = reservation if self._signature is not None: res["signature"] = b2a_hex(self._signature).decode() return res __all__.extend( ["CryptosignAuthextra", "CryptosignKey", "format_challenge", "sign_challenge"] )