###############################################################################
#
# The MIT License (MIT)
#
# Copyright (c) Crossbar.io Technologies GmbH
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
###############################################################################
import os.path
import pprint
from binascii import a2b_hex
from typing import Dict, Any, Optional, List
import web3
import cbor2
from py_eth_sig_utils.eip712 import encode_typed_data
from autobahn.wamp.message import _URI_PAT_REALM_NAME_ETH
from autobahn.xbr._secmod import EthereumKey
from ._eip712_base import sign, recover, is_chain_id, is_address, is_block_number, is_signature, is_eth_privkey
from ._eip712_certificate import EIP712Certificate
[docs]def create_eip712_authority_certificate(chainId: int,
verifyingContract: bytes,
validFrom: int,
issuer: bytes,
subject: bytes,
realm: bytes,
capabilities: int,
meta: str) -> dict:
"""
Authority certificate: long-lived, on-chain L2.
:param chainId:
:param verifyingContract:
:param validFrom:
:param issuer:
:param subject:
:param realm:
:param capabilities:
:param meta:
:return:
"""
assert is_chain_id(chainId)
assert is_address(verifyingContract)
assert is_block_number(validFrom)
assert is_address(issuer)
assert is_address(subject)
assert is_address(realm)
assert type(capabilities) == int and 0 <= capabilities <= 2 ** 53
assert meta is None or type(meta) == str
data = {
'types': {
'EIP712Domain': [
{
'name': 'name',
'type': 'string'
},
{
'name': 'version',
'type': 'string'
},
],
'EIP712AuthorityCertificate': [
{
'name': 'chainId',
'type': 'uint256'
},
{
'name': 'verifyingContract',
'type': 'address'
},
{
'name': 'validFrom',
'type': 'uint256'
},
{
'name': 'issuer',
'type': 'address'
},
{
'name': 'subject',
'type': 'address'
},
{
'name': 'realm',
'type': 'address'
},
{
'name': 'capabilities',
'type': 'uint64'
},
{
'name': 'meta',
'type': 'string'
}
]
},
'primaryType': 'EIP712AuthorityCertificate',
'domain': {
'name': 'WMP',
'version': '1',
},
'message': {
'chainId': chainId,
'verifyingContract': verifyingContract,
'validFrom': validFrom,
'issuer': issuer,
'subject': subject,
'realm': realm,
'capabilities': capabilities,
'meta': meta or '',
}
}
return data
[docs]def sign_eip712_authority_certificate(eth_privkey: bytes,
chainId: int,
verifyingContract: bytes,
validFrom: int,
issuer: bytes,
subject: bytes,
realm: bytes,
capabilities: int,
meta: str) -> bytes:
"""
Sign the given data using a EIP712 based signature with the provided private key.
:param eth_privkey:
:param chainId:
:param verifyingContract:
:param validFrom:
:param issuer:
:param subject:
:param realm:
:param capabilities:
:param meta:
:return:
"""
assert is_eth_privkey(eth_privkey)
data = create_eip712_authority_certificate(chainId, verifyingContract, validFrom, issuer,
subject, realm, capabilities, meta)
return sign(eth_privkey, data)
[docs]def recover_eip712_authority_certificate(chainId: int,
verifyingContract: bytes,
validFrom: int,
issuer: bytes,
subject: bytes,
realm: bytes,
capabilities: int,
meta: str,
signature: bytes) -> bytes:
"""
Recover the signer address the given EIP712 signature was signed with.
:param chainId:
:param verifyingContract:
:param validFrom:
:param issuer:
:param subject:
:param realm:
:param capabilities:
:param meta:
:param signature:
:return: The (computed) signer address the signature was signed with.
"""
assert is_signature(signature)
data = create_eip712_authority_certificate(chainId, verifyingContract, validFrom, issuer,
subject, realm, capabilities, meta)
return recover(data, signature)
[docs]class EIP712AuthorityCertificate(EIP712Certificate):
CAPABILITY_ROOT_CA = 1
CAPABILITY_INTERMEDIATE_CA = 2
CAPABILITY_PUBLIC_RELAY = 4
CAPABILITY_PRIVATE_RELAY = 8
CAPABILITY_PROVIDER = 16
CAPABILITY_CONSUMER = 32
__slots__ = (
# EIP712 attributes
'chainId',
'verifyingContract',
'validFrom',
'issuer',
'subject',
'realm',
'capabilities',
'meta',
# additional attributes
'signatures',
'hash',
)
def __init__(self, chainId: int, verifyingContract: bytes, validFrom: int, issuer: bytes, subject: bytes,
realm: bytes, capabilities: int, meta: str,
signatures: Optional[List[bytes]] = None):
super().__init__(chainId, verifyingContract, validFrom)
self.issuer = issuer
self.subject = subject
self.realm = realm
self.capabilities = capabilities
self.meta = meta
self.signatures = signatures
eip712 = create_eip712_authority_certificate(chainId,
verifyingContract,
validFrom,
issuer,
subject,
realm,
capabilities,
meta)
self.hash = encode_typed_data(eip712)
[docs] def __eq__(self, other: Any) -> bool:
if not isinstance(other, self.__class__):
return False
if not EIP712AuthorityCertificate.__eq__(self, other):
return False
if other.chainId != self.chainId:
return False
if other.verifyingContract != self.verifyingContract:
return False
if other.validFrom != self.validFrom:
return False
if other.issuer != self.issuer:
return False
if other.subject != self.subject:
return False
if other.realm != self.realm:
return False
if other.capabilities != self.capabilities:
return False
if other.meta != self.meta:
return False
if other.signatures != self.signatures:
return False
if other.hash != self.hash:
return False
return True
[docs] def __ne__(self, other: Any) -> bool:
return not self.__eq__(other)
[docs] def __str__(self) -> str:
return pprint.pformat(self.marshal())
[docs] def sign(self, key: EthereumKey, binary: bool = False) -> bytes:
eip712 = create_eip712_authority_certificate(self.chainId,
self.verifyingContract,
self.validFrom,
self.issuer,
self.subject,
self.realm,
self.capabilities,
self.meta)
return key.sign_typed_data(eip712, binary=binary)
[docs] def recover(self, signature: bytes) -> bytes:
return recover_eip712_authority_certificate(self.chainId,
self.verifyingContract,
self.validFrom,
self.issuer,
self.subject,
self.realm,
self.capabilities,
self.meta,
signature)
[docs] def marshal(self, binary: bool = False) -> Dict[str, Any]:
obj = create_eip712_authority_certificate(chainId=self.chainId,
verifyingContract=self.verifyingContract,
validFrom=self.validFrom,
issuer=self.issuer,
subject=self.subject,
realm=self.realm,
capabilities=self.capabilities,
meta=self.meta)
if not binary:
obj['message']['verifyingContract'] = web3.Web3.toChecksumAddress(obj['message']['verifyingContract']) if obj['message']['verifyingContract'] else None
obj['message']['issuer'] = web3.Web3.toChecksumAddress(obj['message']['issuer']) if obj['message']['issuer'] else None
obj['message']['subject'] = web3.Web3.toChecksumAddress(obj['message']['subject']) if obj['message']['subject'] else None
obj['message']['realm'] = web3.Web3.toChecksumAddress(obj['message']['realm']) if obj['message']['realm'] else None
return obj
[docs] @staticmethod
def parse(obj, binary: bool = False) -> 'EIP712AuthorityCertificate':
if type(obj) != dict:
raise ValueError('invalid type {} for object in EIP712AuthorityCertificate.parse'.format(type(obj)))
primaryType = obj.get('primaryType', None)
if primaryType != 'EIP712AuthorityCertificate':
raise ValueError('invalid primaryType "{}" - expected "EIP712AuthorityCertificate"'.format(primaryType))
# FIXME: check EIP712 types, domain
data = obj.get('message', None)
if type(data) != dict:
raise ValueError('invalid type {} for EIP712AuthorityCertificate'.format(type(data)))
for k in data:
if k not in ['type', 'chainId', 'verifyingContract', 'validFrom', 'issuer', 'subject',
'realm', 'capabilities', 'meta']:
raise ValueError('invalid attribute "{}" in EIP712AuthorityCertificate'.format(k))
_type = data.get('type', None)
if _type and _type != 'EIP712AuthorityCertificate':
raise ValueError('unexpected type "{}" in EIP712AuthorityCertificate'.format(_type))
chainId = data.get('chainId', None)
if chainId is None:
raise ValueError('missing chainId in EIP712AuthorityCertificate')
if type(chainId) != int:
raise ValueError('invalid type {} for chainId in EIP712AuthorityCertificate'.format(type(chainId)))
verifyingContract = data.get('verifyingContract', None)
if verifyingContract is None:
raise ValueError('missing verifyingContract in EIP712AuthorityCertificate')
if binary:
if type(verifyingContract) != bytes:
raise ValueError(
'invalid type {} for verifyingContract in EIP712AuthorityCertificate'.format(type(verifyingContract)))
if len(verifyingContract) != 20:
raise ValueError('invalid value length {} of verifyingContract'.format(len(verifyingContract)))
else:
if type(verifyingContract) != str:
raise ValueError(
'invalid type {} for verifyingContract in EIP712AuthorityCertificate'.format(type(verifyingContract)))
if not _URI_PAT_REALM_NAME_ETH.match(verifyingContract):
raise ValueError(
'invalid value "{}" for verifyingContract in EIP712AuthorityCertificate'.format(verifyingContract))
verifyingContract = a2b_hex(verifyingContract[2:])
validFrom = data.get('validFrom', None)
if validFrom is None:
raise ValueError('missing validFrom in EIP712AuthorityCertificate')
if type(validFrom) != int:
raise ValueError('invalid type {} for validFrom in EIP712AuthorityCertificate'.format(type(validFrom)))
issuer = data.get('issuer', None)
if issuer is None:
raise ValueError('missing issuer in EIP712AuthorityCertificate')
if binary:
if type(issuer) != bytes:
raise ValueError(
'invalid type {} for issuer in EIP712AuthorityCertificate'.format(type(issuer)))
if len(issuer) != 20:
raise ValueError('invalid value length {} of issuer'.format(len(issuer)))
else:
if type(issuer) != str:
raise ValueError('invalid type {} for issuer in EIP712AuthorityCertificate'.format(type(issuer)))
if not _URI_PAT_REALM_NAME_ETH.match(issuer):
raise ValueError('invalid value "{}" for issuer in EIP712AuthorityCertificate'.format(issuer))
issuer = a2b_hex(issuer[2:])
subject = data.get('subject', None)
if subject is None:
raise ValueError('missing subject in EIP712AuthorityCertificate')
if binary:
if type(subject) != bytes:
raise ValueError(
'invalid type {} for subject in EIP712AuthorityCertificate'.format(type(subject)))
if len(subject) != 20:
raise ValueError('invalid value length {} of verifyingContract'.format(len(subject)))
else:
if type(subject) != str:
raise ValueError('invalid type {} for subject in EIP712AuthorityCertificate'.format(type(subject)))
if not _URI_PAT_REALM_NAME_ETH.match(subject):
raise ValueError('invalid value "{}" for subject in EIP712AuthorityCertificate'.format(subject))
subject = a2b_hex(subject[2:])
realm = data.get('realm', None)
if realm is None:
raise ValueError('missing realm in EIP712AuthorityCertificate')
if binary:
if type(realm) != bytes:
raise ValueError(
'invalid type {} for realm in EIP712AuthorityCertificate'.format(type(realm)))
if len(realm) != 20:
raise ValueError('invalid value length {} of realm'.format(len(realm)))
else:
if type(realm) != str:
raise ValueError('invalid type {} for realm in EIP712AuthorityCertificate'.format(type(realm)))
if not _URI_PAT_REALM_NAME_ETH.match(realm):
raise ValueError('invalid value "{}" for realm in EIP712AuthorityCertificate'.format(realm))
realm = a2b_hex(realm[2:])
capabilities = data.get('capabilities', None)
if capabilities is None:
raise ValueError('missing capabilities in EIP712AuthorityCertificate')
if type(capabilities) != int:
raise ValueError('invalid type {} for capabilities in EIP712AuthorityCertificate'.format(type(capabilities)))
meta = data.get('meta', None)
if meta is None:
raise ValueError('missing meta in EIP712AuthorityCertificate')
if type(meta) != str:
raise ValueError('invalid type {} for meta in EIP712AuthorityCertificate'.format(type(meta)))
obj = EIP712AuthorityCertificate(chainId=chainId, verifyingContract=verifyingContract, validFrom=validFrom,
issuer=issuer, subject=subject, realm=realm, capabilities=capabilities, meta=meta)
return obj
[docs] def save(self, filename: str) -> int:
"""
Save certificate to file. File format (serialized as CBOR):
[cert_hash: bytes, cert_eip712: Dict[str, Any], cert_signatures: List[bytes]]
:param filename:
:return:
"""
cert_obj = [self.hash, self.marshal(binary=True), self.signatures or []]
with open(filename, 'wb') as f:
data = cbor2.dumps(cert_obj)
f.write(data)
return len(data)
[docs] @staticmethod
def load(filename) -> 'EIP712AuthorityCertificate':
"""
Load certificate from file.
:param filename:
:return:
"""
if not os.path.isfile(filename):
raise RuntimeError('cannot create EIP712AuthorityCertificate from filename "{}": not a file'.format(filename))
with open(filename, 'rb') as f:
cert_hash, cert_eip712, cert_signatures = cbor2.loads(f.read())
cert = EIP712AuthorityCertificate.parse(cert_eip712, binary=True)
assert cert_hash == cert.hash
cert.signatures = cert_signatures
return cert