Source code for autobahn.xbr._schema

###############################################################################
#
# The MIT License (MIT)
#
# Copyright (c) Crossbar.io Technologies GmbH
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
###############################################################################
import json
import os
import io
import pprint
import hashlib
import textwrap
from pathlib import Path
from pprint import pformat
from typing import Union, Dict, List, Optional, IO, Any, Tuple
from collections.abc import Sequence

# FIXME
# https://github.com/google/yapf#example-as-a-module
from yapf.yapflib.yapf_api import FormatCode

import txaio
from autobahn.wamp.exception import InvalidPayload
from autobahn.util import hlval

from zlmdb.flatbuffers.reflection.Schema import Schema as _Schema
from zlmdb.flatbuffers.reflection.BaseType import BaseType as _BaseType
from zlmdb.flatbuffers.reflection.Field import Field


[docs]class FbsType(object): """ Flatbuffers type. See: https://github.com/google/flatbuffers/blob/11a19887053534c43f73e74786b46a615ecbf28e/reflection/reflection.fbs#L33 """ __slots__ = ('_repository', '_schema', '_basetype', '_element', '_index', '_objtype', '_elementtype') UType = _BaseType.UType # scalar types Bool = _BaseType.Bool Byte = _BaseType.Byte UByte = _BaseType.UByte Short = _BaseType.Short UShort = _BaseType.UShort Int = _BaseType.Int UInt = _BaseType.UInt Long = _BaseType.Long ULong = _BaseType.ULong Float = _BaseType.Float Double = _BaseType.Double String = _BaseType.String SCALAR_TYPES = [_BaseType.Bool, _BaseType.Byte, _BaseType.UByte, _BaseType.Short, _BaseType.UShort, _BaseType.Int, _BaseType.UInt, _BaseType.Long, _BaseType.ULong, _BaseType.Float, _BaseType.Double, _BaseType.String] # structured types Vector = _BaseType.Vector Obj = _BaseType.Obj Union = _BaseType.Union STRUCTURED_TYPES = [_BaseType.Vector, _BaseType.Obj, _BaseType.Union] FBS2PY = { _BaseType.UType: 'int', _BaseType.Bool: 'bool', _BaseType.Byte: 'bytes', _BaseType.UByte: 'int', _BaseType.Short: 'int', _BaseType.UShort: 'int', _BaseType.Int: 'int', _BaseType.UInt: 'int', _BaseType.Long: 'int', _BaseType.ULong: 'int', _BaseType.Float: 'float', _BaseType.Double: 'float', _BaseType.String: 'str', _BaseType.Vector: 'List', _BaseType.Obj: 'object', _BaseType.Union: 'Union', } FBS2PY_TYPE = { _BaseType.UType: int, _BaseType.Bool: bool, _BaseType.Byte: int, _BaseType.UByte: int, _BaseType.Short: int, _BaseType.UShort: int, _BaseType.Int: int, _BaseType.UInt: int, _BaseType.Long: int, _BaseType.ULong: int, _BaseType.Float: float, _BaseType.Double: float, _BaseType.String: str, _BaseType.Vector: list, _BaseType.Obj: dict, # _BaseType.Union: 'Union', } FBS2FLAGS = { _BaseType.Bool: 'BoolFlags', _BaseType.Byte: 'Int8Flags', _BaseType.UByte: 'Uint8Flags', _BaseType.Short: 'Int16Flags', _BaseType.UShort: 'Uint16Flags', _BaseType.Int: 'Int32Flags', _BaseType.UInt: 'Uint32Flags', _BaseType.Long: 'Int64Flags', _BaseType.ULong: 'Uint64Flags', _BaseType.Float: 'Float32Flags', _BaseType.Double: 'Float64Flags', } FBS2PREPEND = { _BaseType.Bool: 'PrependBoolSlot', _BaseType.Byte: 'PrependInt8Slot', _BaseType.UByte: 'PrependUint8Slot', _BaseType.Short: 'PrependInt16Slot', _BaseType.UShort: 'PrependUint16Slot', _BaseType.Int: 'PrependInt32Slot', _BaseType.UInt: 'PrependUint32Slot', _BaseType.Long: 'PrependInt64Slot', _BaseType.ULong: 'PrependUint64Slot', _BaseType.Float: 'PrependFloat32Slot', _BaseType.Double: 'PrependFloat64Slot', } FBS2STR = { _BaseType.UType: 'UType', _BaseType.Bool: 'Bool', _BaseType.Byte: 'Byte', _BaseType.UByte: 'UByte', _BaseType.Short: 'Short', _BaseType.UShort: 'UShort', _BaseType.Int: 'Int', _BaseType.UInt: 'UInt', _BaseType.Long: 'Long', _BaseType.ULong: 'ULong', _BaseType.Float: 'Float', _BaseType.Double: 'Double', _BaseType.String: 'String', _BaseType.Vector: 'Vector', _BaseType.Obj: 'Obj', _BaseType.Union: 'Union', } STR2FBS = { 'UType': _BaseType.UType, 'Bool': _BaseType.Bool, 'Byte': _BaseType.Byte, 'UByte': _BaseType.UByte, 'Short': _BaseType.Short, 'UShort': _BaseType.UShort, 'Int': _BaseType.Int, 'UInt': _BaseType.UInt, 'Long': _BaseType.Long, 'ULong': _BaseType.ULong, 'Float': _BaseType.Float, 'Double': _BaseType.Double, 'String': _BaseType.String, 'Vector': _BaseType.Vector, 'Obj': _BaseType.Obj, 'Union': _BaseType.Union, } def __init__(self, repository: 'FbsRepository', schema: 'FbsSchema', basetype: int, element: int, index: int, objtype: Optional[str] = None, elementtype: Optional[str] = None): self._repository = repository self._schema = schema self._basetype = basetype self._element = element self._elementtype = elementtype self._index = index self._objtype = objtype @property def repository(self) -> 'FbsRepository': return self._repository @property def schema(self) -> 'FbsSchema': return self._schema @property def basetype(self) -> int: """ Flatbuffers base type. :return: """ return self._basetype @property def element(self) -> int: """ Only if basetype == Vector :return: """ return self._element @property def index(self) -> int: """ If basetype == Object, index into "objects". If base_type == Union, UnionType, or integral derived from an enum, index into "enums". If base_type == Vector && element == Union or UnionType. :return: """ return self._index @property def elementtype(self) -> Optional[str]: """ If basetype == Vector, fully qualified element type name. :return: """ # lazy-resolve of element type index to element type name. this is important (!) # to decouple from loading order of type objects if self._basetype == FbsType.Vector and self._elementtype is None: if self._element == FbsType.Obj: self._elementtype = self._schema.objs_by_id[self._index].name # print('filled in missing elementtype "{}" for element type index {} in vector'.format(self._elementtype, self._index)) else: assert False, 'FIXME' return self._elementtype @property def objtype(self) -> Optional[str]: """ If basetype == Object, fully qualified object type name. :return: """ # lazy-resolve of object type index to object type name. this is important (!) # to decouple from loading order of type objects if self._basetype == FbsType.Obj and self._objtype is None: self._objtype = self._schema.objs_by_id[self._index].name # print('filled in missing objtype "{}" for object type index {} in object'.format(self._objtype, self._index)) return self._objtype
[docs] def map(self, language: str, attrs: Optional[Dict] = None, required: Optional[bool] = True, objtype_as_string: bool = False) -> str: """ :param language: :param attrs: :param required: :param objtype_as_string: :return: """ if language == 'python': _mapped_type = None if self.basetype == FbsType.Vector: # vectors of uint8 are mapped to byte strings if self.element == FbsType.UByte: if attrs and 'uuid' in attrs: _mapped_type = 'uuid.UUID' else: _mapped_type = 'bytes' # whereas all other vectors are mapped to list of the same element type else: if self.objtype: # FIXME _mapped_type = 'List[{}]'.format(self.objtype.split('.')[-1]) # _mapped_type = 'List[{}.{}]'.format(self._repository.render_to_basemodule, self.objtype) else: _mapped_type = 'List[{}]'.format(FbsType.FBS2PY[self.element]) elif self.basetype == FbsType.Obj: if self.objtype: # FIXME _mapped_type = self.objtype.split('.')[-1] # _mapped_type = '{}.{}'.format(self._repository.render_to_basemodule, self.objtype) else: _mapped_type = 'List[{}]'.format(FbsType.FBS2PY[self.element]) elif self.basetype in FbsType.SCALAR_TYPES + [FbsType.UType, FbsType.Union]: # FIXME: follow up processing of Unions (UType/Union) if self.basetype == FbsType.ULong and attrs and 'timestamp' in attrs: _mapped_type = 'np.datetime64' else: _mapped_type = FbsType.FBS2PY[self.basetype] else: raise NotImplementedError( 'FIXME: implement mapping of FlatBuffers type "{}" to Python in {}'.format(self.basetype, self.map)) if objtype_as_string and self.basetype == FbsType.Obj: # for object types, use 'TYPE' rather than TYPE so that the type reference # does not depend on type declaration order within a single file # https://peps.python.org/pep-0484/#forward-references if required: return "'{}'".format(_mapped_type) else: return "Optional['{}']".format(_mapped_type) else: if required: return '{}'.format(_mapped_type) else: return 'Optional[{}]'.format(_mapped_type) else: raise RuntimeError('cannot map FlatBuffers type to target language "{}" in {}'.format(language, self.map))
[docs] def __str__(self) -> str: return '\n{}\n'.format(pprint.pformat(self.marshal()))
[docs] def marshal(self) -> Dict[str, Any]: # important: use properties, not private object attribute access (!) obj = { 'basetype': self.FBS2STR.get(self.basetype, None), 'element': self.FBS2STR.get(self.element, None), 'index': self.index, 'objtype': self.objtype, } return obj
[docs]class FbsAttribute(object): def __init__(self): pass
[docs] def __str__(self): return ''.format()
[docs]class FbsField(object): __slots__ = ('_repository', '_schema', '_name', '_type', '_id', '_offset', '_default_int', '_default_real', '_deprecated', '_required', '_attrs', '_docs') def __init__(self, repository: 'FbsRepository', schema: 'FbsSchema', name: str, type: FbsType, id: int, offset: int, default_int: int, default_real: float, deprecated: bool, required: bool, attrs: Dict[str, FbsAttribute], docs: str): self._repository = repository self._schema = schema self._name = name self._type = type self._id = id self._offset = offset self._default_int = default_int self._default_real = default_real self._deprecated = deprecated self._required = required self._attrs = attrs self._docs = docs @property def repository(self) -> 'FbsRepository': return self._repository @property def schema(self) -> 'FbsSchema': return self._schema @property def name(self) -> str: return self._name @property def type(self) -> FbsType: return self._type @property def id(self) -> int: return self._id @property def offset(self) -> int: return self._offset @property def default_int(self) -> int: return self._default_int @property def default_real(self) -> float: return self._default_real @property def deprecated(self) -> bool: return self._deprecated @property def required(self) -> bool: return self._required @property def attrs(self) -> Dict[str, FbsAttribute]: return self._attrs @property def docs(self) -> str: return self._docs
[docs] def __str__(self) -> str: return '\n{}\n'.format(pprint.pformat(self.marshal()))
[docs] def marshal(self) -> Dict[str, Any]: obj = { 'name': self._name, 'type': self._type.marshal() if self._type else None, 'id': self._id, 'offset': self._offset, 'default_int': self._default_int, 'default_real': self._default_real, 'deprecated': self._deprecated, 'required': self._required, 'attrs': {}, 'docs': self._docs, } if self._attrs: for k, v in self._attrs.items(): obj['attrs'][k] = v return obj
[docs]def parse_attr(obj): attrs = {} for j in range(obj.AttributesLength()): fbs_attr = obj.Attributes(j) attr_key = fbs_attr.Key() if attr_key: attr_key = attr_key.decode('utf8') attr_value = fbs_attr.Value() if attr_value: attr_value = attr_value.decode('utf8') assert attr_key not in attrs attrs[attr_key] = attr_value return attrs
[docs]def parse_docs(obj): docs = [] for j in range(obj.DocumentationLength()): doc_line = obj.Documentation(j) if doc_line: doc_line = doc_line.decode('utf8').strip() docs.append(doc_line) # docs = '\n'.join(docs).strip() docs = ' '.join(docs).strip() return docs
[docs]def parse_fields(repository, schema, obj, objs_lst=None): # table Object { // Used for both tables and structs. # ... # fields:[Field] (required); // Sorted. # ... # } # https://github.com/google/flatbuffers/blob/11a19887053534c43f73e74786b46a615ecbf28e/reflection/reflection.fbs#L91 fields_by_name = {} # the type index of a field is stored in ``fbs_field.Id()``, whereas the index of the field # within the list of fields is different (!) because that list is alphabetically sorted (!). # thus, we need to fill this map to recover the type index ordered list of fields field_id_to_name = {} for j in range(obj.FieldsLength()): fbs_field: Field = obj.Fields(j) field_name = fbs_field.Name() if field_name: field_name = field_name.decode('utf8') field_id = int(fbs_field.Id()) # IMPORTANT: this is NOT true, since j is according to sort-by-name # assert field_id == j # instead, maintain this map to recover sort-by-position order later field_id_to_name[field_id] = field_name fbs_field_type = fbs_field.Type() # we use lazy-resolve for this property _objtype = None # # FIXME # _objtype = None # if fbs_field_type.Index() >= 0: # if len(objs_lst) > fbs_field_type.Index(): # _obj = objs_lst[fbs_field_type.Index()] # _objtype = _obj.name field_type = FbsType(repository=repository, schema=schema, basetype=fbs_field_type.BaseType(), element=fbs_field_type.Element(), index=fbs_field_type.Index(), objtype=_objtype) field = FbsField(repository=repository, schema=schema, name=field_name, type=field_type, id=field_id, offset=fbs_field.Offset(), default_int=fbs_field.DefaultInteger(), default_real=fbs_field.DefaultReal(), deprecated=fbs_field.Deprecated(), required=fbs_field.Required(), attrs=parse_attr(fbs_field), docs=parse_docs(fbs_field)) assert field_name not in fields_by_name, 'field "{}" with id "{}" already in fields {}'.format(field_name, field_id, sorted(fields_by_name.keys())) fields_by_name[field_name] = field # recover the type index ordered list of fields fields_by_id = [] for i in range(len(fields_by_name)): fields_by_id.append(fields_by_name[field_id_to_name[i]]) return fields_by_name, fields_by_id
[docs]def parse_calls(repository, schema, svc_obj, objs_lst=None): calls = {} calls_by_id = {} for j in range(svc_obj.CallsLength()): fbs_call = svc_obj.Calls(j) call_name = fbs_call.Name() if call_name: call_name = call_name.decode('utf8') # FIXME: schema reflection.RPCCall lacks "Id" (!) # call_id = int(fbs_call.Id()) call_id = j fbs_call_req = fbs_call.Request() call_req_name = fbs_call_req.Name() if call_req_name: call_req_name = call_req_name.decode('utf8') call_req_declaration_file = fbs_call_req.DeclarationFile() if call_req_declaration_file: call_req_declaration_file = call_req_declaration_file.decode('utf8') call_req_is_struct = fbs_call_req.IsStruct() call_req_min_align = fbs_call_req.Minalign() call_req_bytesize = fbs_call_req.Bytesize() call_req_docs = parse_docs(fbs_call_req) call_req_attrs = parse_attr(fbs_call_req) call_req_fields, call_fields_by_id = parse_fields(repository, schema, fbs_call_req, objs_lst=objs_lst) call_req = FbsObject(repository=repository, schema=schema, declaration_file=call_req_declaration_file, name=call_req_name, fields=call_req_fields, fields_by_id=call_fields_by_id, is_struct=call_req_is_struct, min_align=call_req_min_align, bytesize=call_req_bytesize, attrs=call_req_attrs, docs=call_req_docs) fbs_call_resp = fbs_call.Response() call_resp_name = fbs_call_resp.Name() if call_resp_name: call_resp_name = call_resp_name.decode('utf8') call_resp_declaration_file = fbs_call_resp.DeclarationFile() if call_resp_declaration_file: call_resp_declaration_file = call_resp_declaration_file.decode('utf8') call_resp_is_struct = fbs_call_resp.IsStruct() call_resp_min_align = fbs_call_resp.Minalign() call_resp_bytesize = fbs_call_resp.Bytesize() call_resp_docs = parse_docs(fbs_call_resp) call_resp_attrs = parse_attr(fbs_call_resp) call_resp_fields, call_resp_fields_by_id = parse_fields(repository, schema, fbs_call_resp, objs_lst=objs_lst) call_resp = FbsObject(repository=repository, schema=schema, declaration_file=call_resp_declaration_file, name=call_resp_name, fields=call_resp_fields, fields_by_id=call_resp_fields_by_id, is_struct=call_resp_is_struct, min_align=call_resp_min_align, bytesize=call_resp_bytesize, attrs=call_resp_attrs, docs=call_resp_docs) call_docs = parse_docs(fbs_call) call_attrs = parse_attr(fbs_call) call = FbsRPCCall(repository=repository, schema=schema, name=call_name, id=call_id, request=call_req, response=call_resp, docs=call_docs, attrs=call_attrs) assert call_name not in calls, 'call "{}" with id "{}" already in calls {}'.format(call_name, call_id, sorted(calls.keys())) calls[call_name] = call assert call_id not in calls_by_id, 'call "{}" with id " {}" already in calls {}'.format(call_name, call_id, sorted(calls.keys())) calls_by_id[call_id] = call_name res = [] for _, value in sorted(calls_by_id.items()): res.append(value) calls_by_id = res return calls, calls_by_id
[docs]class FbsObject(object): __slots__ = ('_repository', '_schema', '_declaration_file', '_name', '_fields', '_fields_by_id', '_is_struct', '_min_align', '_bytesize', '_attrs', '_docs', 'modulename', 'classname', 'module_relimport') def __init__(self, repository: 'FbsRepository', schema: 'FbsSchema', declaration_file: str, name: str, fields: Dict[str, FbsField], fields_by_id: List[FbsField], is_struct: bool, min_align: int, bytesize: int, attrs: Dict[str, FbsAttribute], docs: str): self._repository = repository self._schema = schema self._declaration_file = declaration_file self._name = name self._fields = fields self._fields_by_id = fields_by_id self._is_struct = is_struct self._min_align = min_align self._bytesize = bytesize self._attrs = attrs self._docs = docs
[docs] def map(self, language: str, required: Optional[bool] = True, objtype_as_string: bool = False) -> str: if language == 'python': klass = self._name.split('.')[-1] if objtype_as_string: # for object types, use 'TYPE' rather than TYPE so that the type reference # does not depend on type declaration order within a single file # https://peps.python.org/pep-0484/#forward-references if required: return "'{}'".format(klass) else: return "Optional['{}']".format(klass) else: if required: return '{}'.format(klass) else: return 'Optional[{}]'.format(klass) else: raise NotImplementedError()
[docs] def map_import(self, language: str) -> str: if language == 'python': base = self._name.split('.')[-2] klass = self._name.split('.')[-1] return 'from {} import {}'.format(base, klass) else: raise NotImplementedError()
@property def repository(self) -> 'FbsRepository': return self._repository @property def schema(self) -> 'FbsSchema': return self._schema @property def declaration_file(self) -> str: return self._declaration_file @property def name(self) -> str: return self._name @property def fields(self) -> Dict[str, FbsField]: return self._fields @property def fields_by_id(self) -> List[FbsField]: return self._fields_by_id @property def is_struct(self) -> bool: return self._is_struct @property def min_align(self) -> int: return self._min_align @property def bytesize(self) -> int: return self._bytesize @property def attrs(self) -> Dict[str, FbsAttribute]: return self._attrs @property def docs(self) -> str: return self._docs
[docs] def __str__(self) -> str: return '\n{}\n'.format(pprint.pformat(self.marshal()))
[docs] def marshal(self) -> Dict[str, Any]: obj = { 'name': self._name, 'declaration_file': self._declaration_file, 'fields': {}, 'is_struct': self._is_struct, 'min_align': self._min_align, 'bytesize': self._bytesize, 'attrs': {}, 'docs': self._docs, } if self._fields: for k, v in self._fields.items(): obj['fields'][k] = v.marshal() if v else None if self._attrs: for k, v in self._attrs.items(): obj['attrs'][k] = v return obj
[docs] @staticmethod def parse(repository, schema, fbs_obj, objs_lst=None): obj_name = fbs_obj.Name() if obj_name: obj_name = obj_name.decode('utf8') obj_declaration_file = fbs_obj.DeclarationFile() if obj_declaration_file: obj_declaration_file = obj_declaration_file.decode('utf8') obj_docs = parse_docs(fbs_obj) obj_attrs = parse_attr(fbs_obj) fields_by_name, fields_by_id = parse_fields(repository, schema, fbs_obj, objs_lst=objs_lst) # print('ok, parsed fields in object "{}": {}'.format(obj_name, fields_by_name)) obj = FbsObject(repository=repository, schema=schema, declaration_file=obj_declaration_file, name=obj_name, fields=fields_by_name, fields_by_id=fields_by_id, is_struct=fbs_obj.IsStruct(), min_align=fbs_obj.Minalign(), bytesize=fbs_obj.Bytesize(), attrs=obj_attrs, docs=obj_docs) return obj
[docs]class FbsRPCCall(object): def __init__(self, repository: 'FbsRepository', schema: 'FbsSchema', name: str, id: int, request: FbsObject, response: FbsObject, docs: str, attrs: Dict[str, FbsAttribute]): self._repository = repository self._schema = schema self._name = name self._id = id self._request = request self._response = response self._docs = docs self._attrs = attrs @property def repository(self): return self._repository @property def schema(self): return self._schema @property def name(self): return self._name @property def id(self): return self._id @property def request(self): return self._request @property def response(self): return self._response @property def docs(self): return self._docs @property def attrs(self): return self._attrs
[docs] def __str__(self): return '\n{}\n'.format(pprint.pformat(self.marshal()))
[docs] def marshal(self): obj = { 'name': self._name, 'request': self._request.marshal() if self._request else None, 'response': self._response.marshal() if self._response else None, 'attrs': {}, 'docs': self._docs, } if self._attrs: for k, v in self._attrs.items(): obj['attrs'][k] = v return obj
[docs]class FbsService(object): def __init__(self, repository: 'FbsRepository', schema: 'FbsSchema', declaration_file: str, name: str, calls: Dict[str, FbsRPCCall], calls_by_id: List[FbsRPCCall], attrs: Dict[str, FbsAttribute], docs: str): self._repository = repository self._schema = schema self._declaration_file = declaration_file self._name = name self._calls = calls self._calls_by_id = calls_by_id self._attrs = attrs self._docs = docs @property def repository(self): return self._repository @property def schema(self): return self._schema @property def declaration_file(self): return self._declaration_file @property def name(self): return self._name @property def calls(self): return self._calls @property def calls_by_id(self): return self._calls_by_id @property def attrs(self): return self._attrs @property def docs(self): return self._docs
[docs] def __str__(self): return '\n{}\n'.format(pprint.pformat(self.marshal()))
[docs] def marshal(self): obj = { 'name': self._name, 'declaration_file': self._declaration_file, 'calls': {}, 'attrs': {}, 'docs': self._docs, } if self._calls: for k, v in self._calls.items(): obj['calls'][k] = v.marshal() if self._attrs: for k, v in self._attrs.items(): obj['attrs'][k] = v return obj
[docs]class FbsEnumValue(object): def __init__(self, repository: 'FbsRepository', schema: 'FbsSchema', name: str, id: int, value, docs): """ :param repository: :param name: :param value: :param docs: """ self._repository = repository self._schema = schema self._name = name self._id = id self._value = value self._attrs = {} self._docs = docs @property def repository(self): return self._repository @property def schema(self): return self._schema @property def name(self): return self._name @property def id(self): return self._id @property def value(self): return self._value @property def attrs(self): return self._attrs @property def docs(self): return self._docs
[docs] def __str__(self): return '\n{}\n'.format(pprint.pformat(self.marshal()))
[docs] def marshal(self): obj = { 'id': self._id, 'name': self._name, 'attrs': self._attrs, 'docs': self._docs, 'value': self._value, } if self._attrs: for k, v in self._attrs.items(): obj['attrs'][k] = v return obj
[docs]class FbsEnum(object): """ FlatBuffers enum type. See https://github.com/google/flatbuffers/blob/11a19887053534c43f73e74786b46a615ecbf28e/reflection/reflection.fbs#L61 """ def __init__(self, repository: 'FbsRepository', schema: 'FbsSchema', declaration_file: str, name: str, id: int, values: Dict[str, FbsEnumValue], values_by_id: List[FbsEnumValue], is_union: bool, underlying_type: int, attrs: Dict[str, FbsAttribute], docs: str): self._repository = repository self._schema = schema self._declaration_file = declaration_file self._name = name self._id = id self._values = values self._values_by_id = values_by_id self._is_union = is_union # zlmdb.flatbuffers.reflection.Type.Type self._underlying_type = underlying_type self._attrs = attrs self._docs = docs @property def repository(self): return self._repository @property def schema(self): return self._schema @property def declaration_file(self): return self._declaration_file @property def name(self): return self._name @property def id(self): return self._id @property def values(self): return self._values @property def values_by_id(self): return self._values_by_id @property def is_union(self): return self._is_union @property def underlying_type(self): return self._underlying_type @property def attrs(self): return self._attrs @property def docs(self): return self._docs
[docs] def __str__(self): return '\n{}\n'.format(pprint.pformat(self.marshal()))
[docs] def marshal(self): obj = { 'name': self._name, 'id': self._id, 'values': {}, 'is_union': self._is_union, 'underlying_type': FbsType.FBS2STR.get(self._underlying_type, None), 'attrs': {}, 'docs': self._docs, } if self._values: for k, v in self._values.items(): obj['values'][k] = v.marshal() if self._attrs: for k, v in self._attrs.items(): obj['attrs'][k] = v return obj
[docs]class FbsSchema(object): """ """ def __init__(self, repository: 'FbsRepository', file_name: str, file_sha256: str, file_size: int, file_ident: str, file_ext: str, fbs_files: List[Dict[str, str]], root_table: FbsObject, root: _Schema, objs: Optional[Dict[str, FbsObject]] = None, objs_by_id: Optional[List[FbsObject]] = None, enums: Optional[Dict[str, FbsEnum]] = None, enums_by_id: Optional[List[FbsEnum]] = None, services: Optional[Dict[str, FbsService]] = None, services_by_id: Optional[List[FbsService]] = None): """ :param repository: :param file_name: :param file_sha256: :param file_size: :param file_ident: :param file_ext: :param fbs_files: :param root_table: :param root: :param objs: :param objs_by_id: :param enums: :param enums_by_id: :param services: :param services_by_id: """ self._repository = repository self._file_name = file_name self._file_sha256 = file_sha256 self._file_size = file_size self._file_ident = file_ident self._file_ext = file_ext self._fbs_files = fbs_files self._root_table = root_table self._root = root self._objs = objs self._objs_by_id = objs_by_id self._enums = enums self._enums_by_id = enums_by_id self._services = services self._services_by_id = services_by_id @property def repository(self): return self._repository @property def file_name(self): return self._file_name @property def file_sha256(self): return self._file_sha256 @property def file_size(self): return self._file_size @property def file_ident(self): return self._file_ident @property def file_ext(self): return self._file_ext @property def fbs_files(self): return self._fbs_files @property def root_table(self): return self._root_table @property def root(self): return self._root @property def objs(self): return self._objs @property def objs_by_id(self): return self._objs_by_id @property def enums(self): return self._enums @property def enums_by_id(self): return self._enums_by_id @property def services(self): return self._services @property def services_by_id(self): return self._services_by_id
[docs] def __str__(self): return '\n{}\n'.format(pprint.pformat(self.marshal(), width=255))
[docs] def marshal(self) -> Dict[str, object]: """ :return: """ obj = { 'schema': { 'ident': self._file_ident, 'ext': self._file_ext, 'name': os.path.basename(self._file_name) if self._file_name else None, 'files': self._fbs_files, 'sha256': self._file_sha256, 'size': self._file_size, 'objects': len(self._objs), 'enums': len(self._enums), 'services': len(self._services), }, 'root_table': self._root_table.marshal() if self._root_table else None, 'enums': {}, 'objects': {}, 'services': {}, } if self._enums: for k, v in self._enums.items(): obj['enums'][k] = v.marshal() if self._objs: for k, v in self._objs.items(): obj['objects'][k] = v.marshal() if self._services: for k, v in self._services.items(): obj['services'][k] = v.marshal() return obj
[docs] @staticmethod def load(repository: 'FbsRepository', sfile: Union[str, io.RawIOBase, IO[bytes]], filename: Optional[str] = None) -> 'FbsSchema': """ :param repository: :param sfile: :param filename: :return: """ data: bytes if type(sfile) == str and os.path.isfile(sfile): with open(sfile, 'rb') as fd: data = fd.read() else: data = sfile.read() m = hashlib.sha256() m.update(data) # print('loading schema file "{}" ({} bytes, SHA256 0x{})'.format(filename, len(data), m.hexdigest())) # get root object in Flatbuffers reflection schema # see: https://github.com/google/flatbuffers/blob/master/reflection/reflection.fbs root = _Schema.GetRootAsSchema(data, 0) file_ident = root.FileIdent() if file_ident is not None: file_ident = file_ident.decode('utf8') file_ext = root.FileExt() if file_ext is not None: file_ext = file_ext.decode('utf8') fbs_files = [] for i in range(root.FbsFilesLength()): # zlmdb.flatbuffers.reflection.SchemaFile.SchemaFile schema_file = root.FbsFiles(i) schema_file_filename = schema_file.Filename() if schema_file_filename: schema_file_filename = schema_file_filename.decode('utf8') schema_file_included_filenames = [] for j in range(schema_file.IncludedFilenamesLength()): included_filename = schema_file.IncludedFilenames(j) if included_filename: included_filename = included_filename.decode('utf8') schema_file_included_filenames.append(included_filename) fbs_files.append( { 'filename': schema_file_filename, 'included_filenames': schema_file_included_filenames, } ) root_table = root.RootTable() if root_table is not None: root_table = FbsObject.parse(repository, root_table) schema = FbsSchema(repository=repository, file_name=filename, file_size=len(data), file_sha256=m.hexdigest(), file_ident=file_ident, file_ext=file_ext, fbs_files=fbs_files, root_table=root_table, root=root) # enum types from the schema by name and by index enums = {} enums_by_id = [] for i in range(root.EnumsLength()): fbs_enum = root.Enums(i) enum_name = fbs_enum.Name() if enum_name: enum_name = enum_name.decode('utf8') enum_declaration_file = fbs_enum.DeclarationFile() if enum_declaration_file: enum_declaration_file = enum_declaration_file.decode('utf8') enum_underlying_type = fbs_enum.UnderlyingType() enum_values = {} enum_values_by_id = [] for j in range(fbs_enum.ValuesLength()): fbs_enum_value = fbs_enum.Values(j) enum_value_name = fbs_enum_value.Name() if enum_value_name: enum_value_name = enum_value_name.decode('utf8') enum_value_value = fbs_enum_value.Value() enum_value_docs = parse_docs(fbs_enum_value) enum_value = FbsEnumValue(repository=repository, schema=schema, name=enum_value_name, id=j, value=enum_value_value, docs=enum_value_docs) assert enum_value_name not in enum_values enum_values[enum_value_name] = enum_value enum_values_by_id.append(enum_value) enum = FbsEnum(repository=repository, schema=schema, declaration_file=enum_declaration_file, name=enum_name, id=i, values=enum_values, values_by_id=enum_values_by_id, is_union=fbs_enum.IsUnion(), underlying_type=enum_underlying_type, attrs=parse_attr(fbs_enum), docs=parse_docs(fbs_enum)) assert enum_name not in enums enums[enum_name] = enum enums_by_id.append(enum) schema._enums = enums schema._enums_by_id = enums_by_id # type objects (structs and tables) from the schema by name and by index objs = {} objs_by_id = [] for i in range(root.ObjectsLength()): fbs_obj = root.Objects(i) obj = FbsObject.parse(repository, schema, fbs_obj, objs_lst=objs_by_id) assert obj.name not in objs objs[obj.name] = obj objs_by_id.append(obj) # print('ok, processed schema object "{}"'.format(obj.name)) schema._objs = objs schema._objs_by_id = objs_by_id # service type objects (interfaces) from the schema by name and by index services = {} services_by_id = [] for i in range(root.ServicesLength()): svc_obj = root.Services(i) svc_name = svc_obj.Name() if svc_name: svc_name = svc_name.decode('utf8') svc_declaration_file = svc_obj.DeclarationFile() if svc_declaration_file: svc_declaration_file = svc_declaration_file.decode('utf8') docs = parse_docs(svc_obj) attrs = parse_attr(svc_obj) calls, calls_by_id = parse_calls(repository, schema, svc_obj, objs_lst=objs_by_id) service = FbsService(repository=repository, schema=schema, declaration_file=svc_declaration_file, name=svc_name, calls=calls, calls_by_id=calls_by_id, attrs=attrs, docs=docs) assert svc_name not in services services[svc_name] = service services_by_id.append(service) schema._services = services schema._services_by_id = services_by_id return schema
[docs]def validate_scalar(field, value: Optional[Any]): # print('validate scalar "{}" for type {} (attrs={})'.format(field.name, # FbsType.FBS2STR[field.type.basetype], # field.attrs)) if field.type.basetype in FbsType.FBS2PY_TYPE: expected_type = FbsType.FBS2PY_TYPE[field.type.basetype] if type(value) != expected_type: raise InvalidPayload('invalid type {} for value, expected {}'.format(type(value), expected_type)) else: assert False, 'FIXME'
[docs]class FbsRepository(object): """ crossbar.interfaces.IInventory - add: FbsRepository[] - load: FbsSchema[] https://github.com/google/flatbuffers/blob/master/reflection/reflection.fbs """ def __init__(self, basemodule: str): self.log = txaio.make_logger() self._basemodule = basemodule self._schemata: Dict[str, FbsSchema] = {} self._objs: Dict[str, FbsObject] = {} self._enums: Dict[str, FbsEnum] = {} self._services: Dict[str, FbsService] = {}
[docs] @staticmethod def from_archive(filename: str) -> 'FbsRepository': catalog = FbsRepository() return catalog
[docs] @staticmethod def from_address(address: str) -> 'FbsRepository': catalog = FbsRepository() return catalog
@property def basemodule(self) -> str: return self._basemodule @property def schemas(self) -> Dict[str, FbsSchema]: return self._schemata @property def objs(self) -> Dict[str, FbsObject]: return self._objs @property def enums(self) -> Dict[str, FbsEnum]: return self._enums @property def services(self) -> Dict[str, FbsService]: return self._services @property def total_count(self): return len(self._objs) + len(self._enums) + len(self._services)
[docs] def load(self, filename: str) -> Tuple[int, int]: """ Load and add all schemata from Flatbuffers binary schema files (`*.bfbs`) found in the given directory. Alternatively, a path to a single schema file can be provided. :param filename: Filesystem path of a directory or single file from which to load and add Flatbuffers schemata. """ file_dups = 0 load_from_filenames = [] if os.path.isdir(filename): for path in Path(filename).rglob('*.bfbs'): fn = os.path.join(filename, path.name) if fn not in self._schemata: load_from_filenames.append(fn) else: # print('duplicate schema file skipped ("{}" already loaded)'.format(fn)) file_dups += 1 elif os.path.isfile(filename): if filename not in self._schemata: load_from_filenames.append(filename) else: # print('duplicate schema file skipped ("{}" already loaded)'.format(filename)) file_dups += 1 elif ',' in filename: for filename_single in filename.split(','): filename_single = os.path.expanduser(filename_single) # filename_single = os.path.expandvars(filename_single) if os.path.isfile(filename_single): if filename_single not in self._schemata: load_from_filenames.append(filename_single) else: print('duplicate schema file skipped ("{}" already loaded)'.format(filename_single)) else: raise RuntimeError('"{}" in list is not a file'.format(filename_single)) else: raise RuntimeError('cannot open schema file or directory: "{}"'.format(filename)) enum_dups = 0 obj_dups = 0 svc_dups = 0 # iterate over all schema files found for fn in load_from_filenames: # load this schema file schema: FbsSchema = FbsSchema.load(self, fn) # add enum types to repository by name for enum in schema.enums.values(): if enum.name in self._enums: # print('skipping duplicate enum type for name "{}"'.format(enum.name)) enum_dups += 1 else: self._enums[enum.name] = enum # add object types for obj in schema.objs.values(): if obj.name in self._objs: # print('skipping duplicate object (table/struct) type for name "{}"'.format(obj.name)) obj_dups += 1 else: self._objs[obj.name] = obj # add service definitions ("APIs") for svc in schema.services.values(): if svc.name in self._services: # print('skipping duplicate service type for name "{}"'.format(svc.name)) svc_dups += 1 else: self._services[svc.name] = svc self._schemata[fn] = schema type_dups = enum_dups + obj_dups + svc_dups return file_dups, type_dups
[docs] def summary(self, keys=False): if keys: return { 'schemata': sorted(self._schemata.keys()), 'objs': sorted(self._objs.keys()), 'enums': sorted(self._enums.keys()), 'services': sorted(self._services.keys()), } else: return { 'schemata': len(self._schemata), 'objs': len(self._objs), 'enums': len(self._enums), 'services': len(self._services), }
[docs] def print_summary(self): # brown = (160, 110, 50) # brown = (133, 51, 51) brown = (51, 133, 255) # steel_blue = (70, 130, 180) orange = (255, 127, 36) # deep_pink = (255, 20, 147) # light_pink = (255, 102, 204) # pink = (204, 82, 163) pink = (127, 127, 127) for obj_key, obj in self.objs.items(): prefix_uri = obj.attrs.get('wampuri', self._basemodule) obj_name = obj_key.split('.')[-1] obj_color = 'blue' if obj.is_struct else brown obj_label = '{} {}'.format('Struct' if obj.is_struct else 'Table', obj_name) print('{}\n'.format(hlval(' {} {} {}'.format('====', obj_label, '=' * (118 - len(obj_label))), color=obj_color))) # print(' {} {} {}\n'.format(obj_kind, hlval(obj_name, color=obj_color), '=' * (120 - len(obj_name)))) if prefix_uri: print(' Type URI: {}.{}'.format(hlval(prefix_uri), hlval(obj_name))) else: print(' Type URI: {}'.format(hlval(obj_name))) print() print(textwrap.fill(obj.docs, width=100, initial_indent=' ', subsequent_indent=' ', expand_tabs=True, replace_whitespace=True, fix_sentence_endings=False, break_long_words=True, drop_whitespace=True, break_on_hyphens=True, tabsize=4)) print() for field in obj.fields_by_id: docs = textwrap.wrap(field.docs, width=70, initial_indent='', subsequent_indent='', expand_tabs=True, replace_whitespace=True, fix_sentence_endings=False, break_long_words=True, drop_whitespace=True, break_on_hyphens=True, tabsize=4) if field.type.basetype == FbsType.Obj: type_desc_str = field.type.objtype.split('.')[-1] if self.objs[field.type.objtype].is_struct: type_desc = hlval(type_desc_str, color='blue') else: type_desc = hlval(type_desc_str, color=brown) elif field.type.basetype == FbsType.Vector: type_desc_str = 'Vector[{}]'.format(FbsType.FBS2STR[field.type.element]) type_desc = hlval(type_desc_str, color='white') else: type_desc_str = FbsType.FBS2STR[field.type.basetype] type_desc = hlval(type_desc_str, color='white') if field.attrs: attrs_text_str = '(' + ', '.join(field.attrs.keys()) + ')' attrs_text = hlval(attrs_text_str, color=pink) type_text_str = ' '.join([type_desc_str, attrs_text_str]) type_text = ' '.join([type_desc, attrs_text]) else: type_text_str = type_desc_str type_text = type_desc # print('>>', len(type_text_str), len(type_text)) print(' {:<36} {} {}'.format(hlval(field.name), type_text + ' ' * (28 - len(type_text_str)), docs[0] if docs else '')) for line in docs[1:]: print(' ' * 57 + line) print() for svc_key, svc in self.services.items(): prefix_uri = svc.attrs.get('wampuri', self._basemodule) ifx_uuid = svc.attrs.get('uuid', None) ifc_name = svc_key.split('.')[-1] ifc_label = 'Interface {}'.format(ifc_name) print('{}\n'.format(hlval(' {} {} {}'.format('====', ifc_label, '=' * (118 - len(ifc_label))), color='yellow'))) print(' Interface UUID: {}'.format(hlval(ifx_uuid))) print(' Interface URIs: {}.({}|{})'.format(hlval(prefix_uri), hlval('procedure', color=orange), hlval('topic', color='green'))) print() print(textwrap.fill(svc.docs, width=100, initial_indent=' ', subsequent_indent=' ', expand_tabs=True, replace_whitespace=True, fix_sentence_endings=False, break_long_words=True, drop_whitespace=True, break_on_hyphens=True, tabsize=4)) for uri in svc.calls.keys(): print() ep: FbsRPCCall = svc.calls[uri] ep_type = ep.attrs['type'] ep_color = {'topic': 'green', 'procedure': orange}.get(ep_type, 'white') # uri_long = '{}.{}'.format(hlval(prefix_uri, color=(127, 127, 127)), # hlval(ep.attrs.get('wampuri', ep.name), color='white')) uri_short = '{}'.format(hlval(ep.attrs.get('wampuri', ep.name), color=(255, 255, 255))) print(' {} {} ({}) -> {}'.format(hlval(ep_type, color=ep_color), uri_short, hlval(ep.request.name.split('.')[-1], color='blue', bold=False), hlval(ep.response.name.split('.')[-1], color='blue', bold=False))) print() print(textwrap.fill(ep.docs, width=90, initial_indent=' ', subsequent_indent=' ', expand_tabs=True, replace_whitespace=True, fix_sentence_endings=False, break_long_words=True, drop_whitespace=True, break_on_hyphens=True, tabsize=4)) print()
[docs] def render(self, jinja2_env, output_dir, output_lang): """ :param jinja2_env: :param output_dir: :param output_lang: :return: """ # type categories in schemata in the repository # work = { 'obj': self.objs.values(), 'enum': self.enums.values(), 'service': self.services.values(), } # collect code sections by module # code_modules = {} test_code_modules = {} is_first_by_category_modules = {} for category, values in work.items(): # generate and collect code for all FlatBuffers items in the given category # and defined in schemata previously loaded int for item in values: assert isinstance(item, FbsObject) or isinstance(item, FbsEnum) or isinstance(item, FbsService), 'unexpected type {}'.format(type(item)) # metadata = item.marshal() # pprint(item.marshal()) metadata = item # com.example.device.HomeDeviceVendor => com.example.device modulename = '.'.join(metadata.name.split('.')[0:-1]) metadata.modulename = modulename # com.example.device.HomeDeviceVendor => HomeDeviceVendor metadata.classname = metadata.name.split('.')[-1].strip() # com.example.device => device metadata.module_relimport = modulename.split('.')[-1] is_first = modulename not in code_modules is_first_by_category = (modulename, category) not in is_first_by_category_modules if is_first_by_category: is_first_by_category_modules[(modulename, category)] = True # render template into python code section if output_lang == 'python': # render obj|enum|service.py.jinja2 template tmpl = jinja2_env.get_template('py-autobahn/{}.py.jinja2'.format(category)) code = tmpl.render(repo=self, metadata=metadata, FbsType=FbsType, render_imports=is_first, is_first_by_category=is_first_by_category, render_to_basemodule=self.basemodule) # FIXME # code = FormatCode(code)[0] # render test_obj|enum|service.py.jinja2 template test_tmpl = jinja2_env.get_template('py-autobahn/test_{}.py.jinja2'.format(category)) test_code = test_tmpl.render(repo=self, metadata=metadata, FbsType=FbsType, render_imports=is_first, is_first_by_category=is_first_by_category, render_to_basemodule=self.basemodule) elif output_lang == 'eip712': # render obj|enum|service-eip712.sol.jinja2 template tmpl = jinja2_env.get_template('so-eip712/{}-eip712.sol.jinja2'.format(category)) code = tmpl.render(repo=self, metadata=metadata, FbsType=FbsType, render_imports=is_first, is_first_by_category=is_first_by_category, render_to_basemodule=self.basemodule) # FIXME # code = FormatCode(code)[0] test_tmpl = None test_code = None elif output_lang == 'json': code = json.dumps(metadata.marshal(), separators=(', ', ': '), ensure_ascii=False, indent=4, sort_keys=True) test_code = None else: raise RuntimeError('invalid language "{}" for code generation'.format(output_lang)) # collect code sections per-module if modulename not in code_modules: code_modules[modulename] = [] test_code_modules[modulename] = [] code_modules[modulename].append(code) if test_code: test_code_modules[modulename].append(test_code) else: test_code_modules[modulename].append(None) # ['', 'com.example.bla.blub', 'com.example.doo'] namespaces = {} for code_file in code_modules.keys(): name_parts = code_file.split('.') for i in range(len(name_parts)): pn = name_parts[i] ns = '.'.join(name_parts[:i]) if ns not in namespaces: namespaces[ns] = [] if pn and pn not in namespaces[ns]: namespaces[ns].append(pn) print('Namespaces:\n{}\n'.format(pformat(namespaces))) # write out code modules # i = 0 initialized = set() for code_file, code_sections in code_modules.items(): code = '\n\n\n'.join(code_sections) if code_file: code_file_dir = [''] + code_file.split('.')[0:-1] else: code_file_dir = [''] # FIXME: cleanup this mess for i in range(len(code_file_dir)): d = os.path.join(output_dir, *(code_file_dir[:i + 1])) if not os.path.isdir(d): os.mkdir(d) if output_lang == 'python': fn = os.path.join(d, '__init__.py') _modulename = '.'.join(code_file_dir[:i + 1])[1:] _imports = namespaces[_modulename] tmpl = jinja2_env.get_template('py-autobahn/module.py.jinja2') init_code = tmpl.render(repo=self, modulename=_modulename, imports=_imports, render_to_basemodule=self.basemodule) data = init_code.encode('utf8') if not os.path.exists(fn): with open(fn, 'wb') as f: f.write(data) print('Ok, rendered "module.py.jinja2" in {} bytes to "{}"'.format(len(data), fn)) initialized.add(fn) else: with open(fn, 'ab') as f: f.write(data) if output_lang == 'python': if code_file: code_file_name = '{}.py'.format(code_file.split('.')[-1]) test_code_file_name = 'test_{}.py'.format(code_file.split('.')[-1]) else: code_file_name = '__init__.py' test_code_file_name = None elif output_lang == 'json': if code_file: code_file_name = '{}.json'.format(code_file.split('.')[-1]) else: code_file_name = 'init.json' test_code_file_name = None else: code_file_name = None test_code_file_name = None # write out code modules # if code_file_name: try: code = FormatCode(code)[0] except Exception as e: print('error during formatting code: {}'.format(e)) data = code.encode('utf8') fn = os.path.join(*(code_file_dir + [code_file_name])) fn = os.path.join(output_dir, fn) # FIXME # if fn not in initialized and os.path.exists(fn): # os.remove(fn) # with open(fn, 'wb') as fd: # fd.write('# Generated by Autobahn v{}\n'.format(__version__).encode('utf8')) # initialized.add(fn) with open(fn, 'ab') as fd: fd.write(data) print('Ok, written {} bytes to {}'.format(len(data), fn)) # write out unit test code modules # if test_code_file_name: test_code_sections = test_code_modules[code_file] test_code = '\n\n\n'.join(test_code_sections) try: test_code = FormatCode(test_code)[0] except Exception as e: print('error during formatting code: {}'.format(e)) data = test_code.encode('utf8') fn = os.path.join(*(code_file_dir + [test_code_file_name])) fn = os.path.join(output_dir, fn) if fn not in initialized and os.path.exists(fn): os.remove(fn) with open(fn, 'wb') as fd: fd.write('# Copyright (c) ...\n'.encode('utf8')) initialized.add(fn) with open(fn, 'ab') as fd: fd.write(data) print('Ok, written {} bytes to {}'.format(len(data), fn))
[docs] def validate_obj(self, validation_type: Optional[str], value: Optional[Any]): """ Validate value against the validation type given. If the application payload does not validate against the provided type, an :class:`autobahn.wamp.exception.InvalidPayload` is raised. :param validation_type: Flatbuffers type (fully qualified) against to validate application payload. :param value: Value to validate. :return: """ # print('validate_obj', validation_type, type(value)) if validation_type is None: # any value validates against the None validation type return if validation_type not in self.objs: raise RuntimeError('validation type "{}" not found in inventory'.format(self.objs)) # the Flatbuffers table type from the realm's type inventory against which we # will validate the WAMP args/kwargs application payload vt: FbsObject = self.objs[validation_type] if type(value) == dict: vt_kwargs = set(vt.fields.keys()) for k, v in value.items(): if k not in vt.fields: raise InvalidPayload('unexpected argument "{}" in value of validation type "{}"'.format(k, vt.name)) vt_kwargs.discard(k) field = vt.fields[k] # validate object-typed field, eg "uint160_t" if field.type.basetype == FbsType.Obj: self.validate_obj(field.type.objtype, v) elif field.type.basetype == FbsType.Union: pass print('FIXME-003-Union') elif field.type.basetype == FbsType.Vector: if isinstance(v, str) or isinstance(v, bytes): print('FIXME-003-1-Vector') elif isinstance(v, Sequence): for ve in v: self.validate_obj(field.type.elementtype, ve) else: raise InvalidPayload('invalid type {} for value (expected Vector/List/Tuple) ' 'of validation type "{}"'.format(type(v), vt.name)) else: validate_scalar(field, v) if vt.is_struct and vt_kwargs: raise InvalidPayload('missing argument(s) {} in validation type "{}"'.format(list(vt_kwargs), vt.name)) elif type(value) in [tuple, list]: # FIXME: KeyValues if not vt.is_struct: raise InvalidPayload('**: invalid type {} for (non-struct) validation type "{}"'.format(type(value), vt.name)) idx = 0 for field in vt.fields_by_id: # consume the next positional argument from input if idx >= len(value): raise InvalidPayload('missing argument "{}" in type "{}"'.format(field.name, vt.name)) v = value[idx] idx += 1 # validate object-typed field, eg "uint160_t" if field.type.basetype == FbsType.Obj: self.validate_obj(field.type.objtype, v) elif field.type.basetype == FbsType.Union: pass print('FIXME-005-Union') elif field.type.basetype == FbsType.Vector: if isinstance(v, str) or isinstance(v, bytes): print('FIXME-005-1-Vector') elif isinstance(v, Sequence): for ve in v: print(field.type.elementtype, ve) self.validate_obj(field.type.elementtype, ve) else: print('FIXME-005-3-Vector') else: validate_scalar(field, v) if len(value) > idx: raise InvalidPayload('unexpected argument(s) in validation type "{}"'.format(vt.name)) else: raise InvalidPayload('invalid type {} for value of validation type "{}"'.format(type(value), vt.name))
[docs] def validate(self, validation_type: str, args: List[Any], kwargs: Dict[str, Any]) -> Optional[FbsObject]: """ Validate the WAMP application payload provided in positional argument in ``args`` and in keyword-based arguments in ``kwargs`` against the FlatBuffers table type ``validation_type`` from this repository. If the application payload does not validate against the provided type, an :class:`autobahn.wamp.exception.InvalidPayload` is raised. :param validation_type: Flatbuffers type (fully qualified) against to validate application payload. :param args: The application payload WAMP positional arguments. :param kwargs: The application payload WAMP keyword-based arguments. :return: The validation type object from this repository (reference in ``validation_type``) which has been used for validation. """ # any value validates against the None validation type if validation_type is None: return None if validation_type not in self.objs: raise RuntimeError('validation type "{}" not found in inventory (among {} types)'.format(validation_type, len(self.objs))) # the Flatbuffers table type from the realm's type inventory against which we # will validate the WAMP args/kwargs application payload vt: FbsObject = self.objs[validation_type] # we use this to index and consume positional args from the input args_idx = 0 # we use this to track any kwargs not consumed while processing the validation type. # and names left in this set after processing the validation type in full is an error ("unexpected kwargs") kwargs_keys = set(kwargs.keys() if kwargs else []) # iterate over all fields of validation type in field index order (!) for field in vt.fields_by_id: # field is a WAMP positional argument, that is one that needs to map to the next arg from args if field.required or 'arg' in field.attrs or 'kwarg' not in field.attrs: # consume the next positional argument from input if args is None or args_idx >= len(args): raise InvalidPayload('missing positional argument "{}" in type "{}"'.format(field.name, vt.name)) value = args[args_idx] args_idx += 1 # validate object-typed field, eg "uint160_t" if field.type.basetype == FbsType.Obj: self.validate_obj(field.type.objtype, value) elif field.type.basetype == FbsType.Union: pass print('FIXME-003-Union') elif field.type.basetype == FbsType.Vector: if isinstance(value, str) or isinstance(value, bytes): print('FIXME-005-1-Vector') elif isinstance(value, Sequence): for ve in value: print(field.type.elementtype, ve) self.validate_obj(field.type.elementtype, ve) else: print('FIXME-005-3-Vector') else: validate_scalar(field, value) # field is a WAMP keyword argument, that is one that needs to map into kwargs elif 'kwarg' in field.attrs: if field.name in kwargs_keys: value = kwargs[field.name] # FIXME: validate value vs field type print('FIXME-003') kwargs_keys.discard(field.name) else: assert False, 'should not arrive here' if len(args) > args_idx: raise InvalidPayload('{} unexpected positional arguments in type "{}"'.format(len(args) - args_idx, vt.name)) if kwargs_keys: raise InvalidPayload('{} unexpected keyword arguments {} in type "{}"'.format(len(kwargs_keys), list(kwargs_keys), vt.name)) return vt