###############################################################################
#
# The MIT License (MIT)
#
# Copyright (c) typedef int GmbH
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
###############################################################################
import copy
import traceback
from autobahn.util import hlval
from autobahn.wamp.exception import ProtocolError, SerializationError, TransportLost
from autobahn.wamp.interfaces import ISession, ITransport
from autobahn.wamp.types import TransportDetails
from autobahn.websocket import protocol
from autobahn.websocket.types import (
ConnectionDeny,
ConnectionRequest,
ConnectionResponse,
)
# Public API of this module: the names exported by a star-import.
__all__ = (
"WampWebSocketClientFactory",
"WampWebSocketClientProtocol",
"WampWebSocketServerFactory",
"WampWebSocketServerProtocol",
)
class WampWebSocketProtocol:
    """
    Base class for WAMP-over-WebSocket transport mixins.

    Forwards the WebSocket channel callbacks (``onOpen``, ``onClose``,
    ``onMessage``) to a WAMP session and offers the transport operations
    ``send``, ``isOpen``, ``close`` and ``abort`` on top of the channel.

    This class does not define ``self.log``, ``self.factory``,
    ``self._serializer``, ``self.sendMessage``, ``self.sendClose`` or
    ``self._fail_connection``; they must be supplied by subclasses or by the
    class this mixin is combined with.
    """

    # WAMP session attached to this transport: created in onOpen() and reset
    # to None in onClose(). isOpen() is defined in terms of this attribute.
    _session: ISession | None = None

    # Value returned by the ``transport_details`` property; never assigned
    # within this class.
    _transport_details: TransportDetails | None = None

    def _bailout(self, code: int, reason: str | None = None):
        """
        Log the failure and fail the underlying WebSocket connection.

        :param code: WebSocket close status code handed to ``_fail_connection``.
        :param reason: Optional close reason handed to ``_fail_connection``.
        """
        self.log.debug(
            'Failing WAMP-over-WebSocket transport: code={code}, reason="{reason}"',
            code=code,
            reason=reason,
        )
        self._fail_connection(code, reason)

    def onOpen(self):
        """
        Callback from :func:`autobahn.websocket.interfaces.IWebSocketChannel.onOpen`

        Creates a WAMP session via ``self.factory._factory()``, attaches this
        transport to it and calls the session's ``onOpen``. Any exception is
        logged and the connection is failed with an "internal error" code.
        """
        # WebSocket connection established. Now let the user WAMP session factory
        # create a new WAMP session and fire off session open callback.
        try:
            self._session = self.factory._factory()
            self._session._transport = self
            self._session.onOpen(self)
        except Exception as e:
            self.log.critical("{tb}", tb=traceback.format_exc())
            reason = f"WAMP Internal Error ({e})"
            self._bailout(
                protocol.WebSocketProtocol.CLOSE_STATUS_CODE_INTERNAL_ERROR,
                reason=reason,
            )

    def onClose(self, wasClean: bool, code: int, reason: str | None):
        """
        Callback from :func:`autobahn.websocket.interfaces.IWebSocketChannel.onClose`

        Remembers the close reason in ``self._onclose_reason`` and, if a
        session is attached, calls its ``onClose`` and detaches it. Exceptions
        raised by the session callback are logged and not propagated.
        """
        self._onclose_reason = reason

        # WAMP session might never have been established in the first place .. guard this!
        if self._session is not None:
            # WebSocket connection lost - fire off the WAMP
            # session close callback
            # noinspection PyBroadException
            try:
                self.log.debug(
                    'WAMP-over-WebSocket transport lost: wasClean={wasClean}, code={code}, reason="{reason}"',
                    wasClean=wasClean,
                    code=code,
                    reason=reason,
                )
                self._session.onClose(wasClean)
            except Exception:
                self.log.critical("{tb}", tb=traceback.format_exc())
            # detach the session: from here on isOpen() reports False
            self._session = None

    def onMessage(self, payload: bytes, isBinary: bool):
        """
        Callback from :func:`autobahn.websocket.interfaces.IWebSocketChannel.onMessage`

        Unserializes the payload with ``self._serializer`` and passes every
        resulting message to the session's ``onMessage``. A ``ProtocolError``
        fails the connection with a "protocol error" close code, any other
        exception with an "internal error" close code.
        """
        try:
            for msg in self._serializer.unserialize(payload, isBinary):
                self.log.trace(
                    "\n{action1}{session}, {authid}{action2}\n {message}\n{action3}",
                    action1=hlval("WAMP-Receive(", color="green", bold=True),
                    authid=(
                        hlval(self._session._authid, color="green", bold=False)
                        if self._session._authid
                        else "-"
                    ),
                    session=(
                        hlval(self._session._session_id, color="green", bold=False)
                        if self._session._session_id
                        else "-"
                    ),
                    action2=hlval(") <<", color="green", bold=True),
                    action3=hlval("<<", color="green", bold=True),
                    message=msg,
                )
                self._session.onMessage(msg)
        except ProtocolError as e:
            self.log.critical("{tb}", tb=traceback.format_exc())
            reason = f"WAMP Protocol Error ({e})"
            self._bailout(
                protocol.WebSocketProtocol.CLOSE_STATUS_CODE_PROTOCOL_ERROR,
                reason=reason,
            )
        except Exception as e:
            self.log.critical("{tb}", tb=traceback.format_exc())
            reason = f"WAMP Internal Error ({e})"
            self._bailout(
                protocol.WebSocketProtocol.CLOSE_STATUS_CODE_INTERNAL_ERROR,
                reason=reason,
            )

    def send(self, msg):
        """
        Implements :func:`autobahn.wamp.interfaces.ITransport.send`

        :param msg: The WAMP message to serialize and send.
        :raises TransportLost: If the transport is not open.
        :raises SerializationError: If tracing or serializing the message
            raises any exception.
        """
        if not self.isOpen():
            raise TransportLost()

        try:
            self.log.trace(
                "\n{action1}{session}, {authid}{action2}\n {message}\n{action3}",
                action1=hlval("WAMP-Transmit(", color="red", bold=True),
                authid=(
                    hlval(self._session._authid, color="red", bold=False)
                    if self._session._authid
                    else "-"
                ),
                session=(
                    hlval(self._session._session_id, color="red", bold=False)
                    if self._session._session_id
                    else "-"
                ),
                action2=hlval(") >>", color="red", bold=True),
                action3=hlval(">>", color="red", bold=True),
                message=msg,
            )
            payload, isBinary = self._serializer.serialize(msg)
        except Exception as e:
            # Hand the error to the logger as a keyword argument, like every
            # other log call in this class, instead of baking it into the
            # template: braces in the error text must not end up being
            # treated as format fields.
            self.log.error("WAMP message serialization error: {error}", error=e)
            # all exceptions raised from above should be serialization errors ..
            raise SerializationError(
                f"WAMP message serialization error: {e}"
            ) from e
        else:
            self.sendMessage(payload, isBinary)

    def isOpen(self):
        """
        Implements :func:`autobahn.wamp.interfaces.ITransport.isOpen`

        :returns: ``True`` while a WAMP session is attached to this transport.
        """
        return self._session is not None

    @property
    def transport_details(self) -> TransportDetails | None:
        """
        Implements :func:`autobahn.wamp.interfaces.ITransport.transport_details`
        """
        return self._transport_details

    def close(self):
        """
        Implements :func:`autobahn.wamp.interfaces.ITransport.close`

        :raises TransportLost: If the transport is not open.
        """
        if not self.isOpen():
            raise TransportLost()
        self.sendClose(protocol.WebSocketProtocol.CLOSE_STATUS_CODE_NORMAL)

    def abort(self):
        """
        Implements :func:`autobahn.wamp.interfaces.ITransport.abort`

        :raises TransportLost: If the transport is not open.
        """
        if not self.isOpen():
            raise TransportLost()
        self._bailout(protocol.WebSocketProtocol.CLOSE_STATUS_CODE_GOING_AWAY)
# Register WampWebSocketProtocol with ITransport without inheriting from it.
# NOTE(review): presumably ITransport is an abc.ABC, which would make this a
# virtual-subclass registration -- confirm in autobahn.wamp.interfaces.
ITransport.register(WampWebSocketProtocol)
def parseSubprotocolIdentifier(subprotocol: str) -> tuple[int | None, str | None]:
try:
s = subprotocol.split(".")
if s[0] != "wamp":
raise Exception(
f'WAMP WebSocket subprotocol identifier must start with "wamp", not "{s[0]}"'
)
version = int(s[1])
serializer_id = ".".join(s[2:])
return version, serializer_id
except:
return None, None
class WampWebSocketServerProtocol(WampWebSocketProtocol):
    """
    Mixin for WAMP-over-WebSocket server transports.
    """

    # When True, onConnect() denies clients that offer no supported WAMP
    # subprotocol; when False, such clients are accepted and the "json"
    # serializer is used without selecting a subprotocol.
    STRICT_PROTOCOL_NEGOTIATION = True

    def onConnect(
        self, request: ConnectionRequest
    ) -> tuple[str | None, dict[str, str]]:
        """
        Callback from :func:`autobahn.websocket.interfaces.IWebSocketChannel.onConnect`

        Selects the first subprotocol offered by the client that is WAMP
        version 2 with a serializer known to the factory, and stores a copy of
        that serializer in ``self._serializer``.

        :param request: The connection request; its ``protocols`` are scanned
            in order.
        :returns: ``(subprotocol, headers)`` with the selected subprotocol, or
            ``(None, headers)`` when nothing matched and negotiation is not
            strict. ``headers`` is always an empty dict.
        :raises ConnectionDeny: If nothing matched and
            ``STRICT_PROTOCOL_NEGOTIATION`` is true.
        """
        headers = {}

        for subprotocol in request.protocols:
            version, serializer_id = parseSubprotocolIdentifier(subprotocol)
            if version == 2 and serializer_id in self.factory._serializers:
                # copy over serializer from factory, so that we keep per-session serializer stats
                self._serializer = copy.copy(self.factory._serializers[serializer_id])
                return subprotocol, headers

        if self.STRICT_PROTOCOL_NEGOTIATION:
            supported = ", ".join(self.factory.protocols)
            raise ConnectionDeny(
                ConnectionDeny.BAD_REQUEST,
                "This server only speaks WebSocket subprotocols {}".format(supported),
            )

        # assume wamp.2.json (but do not announce/select it)
        self._serializer = copy.copy(self.factory._serializers["json"])
        return None, headers
class WampWebSocketClientProtocol(WampWebSocketProtocol):
    """
    Mixin for WAMP-over-WebSocket client transports.
    """

    # When True, onConnect() raises if the server selected a subprotocol that
    # is not among the factory's protocols; when False, the "json" serializer
    # is used in that case.
    STRICT_PROTOCOL_NEGOTIATION = True

    def onConnect(self, response: ConnectionResponse):
        """
        Callback from :func:`autobahn.websocket.interfaces.IWebSocketChannel.onConnect`

        Determines the serializer from the subprotocol selected by the server
        and stores a copy of it in ``self._serializer``.

        :param response: The connection response; its ``protocol`` is the
            subprotocol to check against ``self.factory.protocols``.
        :raises Exception: If the selected subprotocol is not among the
            factory's protocols and ``STRICT_PROTOCOL_NEGOTIATION`` is true.
        """
        if response.protocol in self.factory.protocols:
            # only the serializer ID is needed; the parsed version is ignored
            _, serializer_id = parseSubprotocolIdentifier(response.protocol)
        elif self.STRICT_PROTOCOL_NEGOTIATION:
            raise Exception(
                "The server does not speak any of the WebSocket subprotocols {} we requested.".format(
                    ", ".join(self.factory.protocols)
                )
            )
        else:
            # assume wamp.2.json
            serializer_id = "json"

        # copy over serializer from factory, so that we keep per-session serializer stats
        self._serializer = copy.copy(self.factory._serializers[serializer_id])
class WampWebSocketFactory:
    """
    Base class for WAMP-over-WebSocket transport factory mixins.
    """

    def __init__(self, factory, serializers=None):
        """
        Set up the session factory, the serializer table and the list of
        WebSocket subprotocol identifiers derived from the serializers.

        :param factory: A callable that produces instances that implement
            :class:`autobahn.wamp.interfaces.ITransportHandler`. A non-callable
            value is wrapped so that every call returns that same object.
        :type factory: callable
        :param serializers: A list of WAMP serializers to use (or None for default
            serializers). Serializers must implement
            :class:`autobahn.wamp.interfaces.ISerializer`.
        :type serializers: list
        :raises Exception: If ``serializers`` is None and none of the default
            serializers could be imported.
        """
        self._factory = factory if callable(factory) else (lambda: factory)

        if serializers is None:
            # No explicit choice: collect whichever serializers can be
            # imported, in the order CBOR, MsgPack, UBJSON, JSON, each in a
            # batched and a plain variant.
            available = []

            # try CBOR WAMP serializer
            try:
                from autobahn.wamp.serializer import CBORSerializer

                available.append(CBORSerializer(batched=True))
                available.append(CBORSerializer())
            except ImportError:
                pass

            # try MsgPack WAMP serializer
            try:
                from autobahn.wamp.serializer import MsgPackSerializer

                available.append(MsgPackSerializer(batched=True))
                available.append(MsgPackSerializer())
            except ImportError:
                pass

            # try UBJSON WAMP serializer
            try:
                from autobahn.wamp.serializer import UBJSONSerializer

                available.append(UBJSONSerializer(batched=True))
                available.append(UBJSONSerializer())
            except ImportError:
                pass

            # try JSON WAMP serializer
            try:
                from autobahn.wamp.serializer import JsonSerializer

                available.append(JsonSerializer(batched=True))
                available.append(JsonSerializer())
            except ImportError:
                pass

            if not available:
                raise Exception("Could not import any WAMP serializer")

            serializers = available

        # serializer ID -> serializer instance
        self._serializers = {ser.SERIALIZER_ID: ser for ser in serializers}

        # one "wamp.2.<serializer ID>" identifier per serializer, in input order
        self._protocols = [f"wamp.2.{ser.SERIALIZER_ID}" for ser in serializers]
class WampWebSocketServerFactory(WampWebSocketFactory):
    """
    Mixin for WAMP-over-WebSocket server transport factories.

    Adds nothing of its own; all behavior is inherited from
    :class:`WampWebSocketFactory`.
    """
class WampWebSocketClientFactory(WampWebSocketFactory):
    """
    Mixin for WAMP-over-WebSocket client transport factories.

    Adds nothing of its own; all behavior is inherited from
    :class:`WampWebSocketFactory`.
    """