Source code for autobahn.wamp.websocket

###############################################################################
#
# The MIT License (MIT)
#
# Copyright (c) typedef int GmbH
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
###############################################################################

import copy
import traceback

from autobahn.util import hlval
from autobahn.wamp.exception import ProtocolError, SerializationError, TransportLost
from autobahn.wamp.interfaces import ISession, ITransport
from autobahn.wamp.types import TransportDetails
from autobahn.websocket import protocol
from autobahn.websocket.types import (
    ConnectionDeny,
    ConnectionRequest,
    ConnectionResponse,
)

# Names exported by ``from autobahn.wamp.websocket import *``: the concrete
# client/server protocol and factory mixins. The shared base classes
# (WampWebSocketProtocol, WampWebSocketFactory) and the
# parseSubprotocolIdentifier helper are not listed here.
__all__ = (
    "WampWebSocketClientFactory",
    "WampWebSocketClientProtocol",
    "WampWebSocketServerFactory",
    "WampWebSocketServerProtocol",
)


class WampWebSocketProtocol:
    """
    Base class for WAMP-over-WebSocket transport mixins.

    The class is meant to be mixed into a WebSocket protocol class. It relies
    on the following attributes/methods being provided by the class it is
    combined with (none of them are defined here): ``self.log``,
    ``self.factory``, ``self._serializer``, ``self._fail_connection()``,
    ``self.sendMessage()`` and ``self.sendClose()``.
    """

    _session: ISession | None = None  # default; self._session is set in onOpen
    _transport_details: TransportDetails | None = None

    def _bailout(self, code: int, reason: str | None = None):
        """
        Log the failure at debug level and fail the underlying connection.

        :param code: WebSocket close status code handed to ``_fail_connection``.
        :param reason: Optional human readable close reason.
        """
        self.log.debug(
            'Failing WAMP-over-WebSocket transport: code={code}, reason="{reason}"',
            code=code,
            reason=reason,
        )
        self._fail_connection(code, reason)

    def onOpen(self):
        """
        Callback from :func:`autobahn.websocket.interfaces.IWebSocketChannel.onOpen`
        """
        # WebSocket connection established. Now let the user WAMP session factory
        # create a new WAMP session and fire off session open callback.
        try:
            self._session = self.factory._factory()
            self._session._transport = self

            self._session.onOpen(self)
        except Exception as e:
            # anything going wrong while creating/opening the session is logged
            # with its traceback and fails the connection as an internal error
            self.log.critical("{tb}", tb=traceback.format_exc())
            reason = f"WAMP Internal Error ({e})"
            self._bailout(
                protocol.WebSocketProtocol.CLOSE_STATUS_CODE_INTERNAL_ERROR,
                reason=reason,
            )

    def onClose(self, wasClean: bool, code: int, reason: str | None):
        """
        Callback from :func:`autobahn.websocket.interfaces.IWebSocketChannel.onClose`
        """
        # remember the close reason regardless of whether a session exists
        self._onclose_reason = reason

        # WAMP session might never have been established in the first place .. guard this!
        if self._session is not None:
            # WebSocket connection lost - fire off the WAMP
            # session close callback
            # noinspection PyBroadException
            try:
                self.log.debug(
                    'WAMP-over-WebSocket transport lost: wasClean={wasClean}, code={code}, reason="{reason}"',
                    wasClean=wasClean,
                    code=code,
                    reason=reason,
                )
                self._session.onClose(wasClean)
            except Exception:
                # errors raised by the session's close handler are logged, not propagated
                self.log.critical("{tb}", tb=traceback.format_exc())
            # from here on isOpen() reports False
            self._session = None

    def onMessage(self, payload: bytes, isBinary: bool):
        """
        Callback from :func:`autobahn.websocket.interfaces.IWebSocketChannel.onMessage`

        Every message obtained from unserializing ``payload`` is traced and
        forwarded to the session. A ``ProtocolError`` fails the connection with
        the "protocol error" close code, any other exception with the
        "internal error" close code.
        """
        try:
            for msg in self._serializer.unserialize(payload, isBinary):
                self.log.trace(
                    "\n{action1}{session}, {authid}{action2}\n  {message}\n{action3}",
                    action1=hlval("WAMP-Receive(", color="green", bold=True),
                    authid=(
                        hlval(self._session._authid, color="green", bold=False)
                        if self._session._authid
                        else "-"
                    ),
                    session=(
                        hlval(self._session._session_id, color="green", bold=False)
                        if self._session._session_id
                        else "-"
                    ),
                    action2=hlval(") <<", color="green", bold=True),
                    action3=hlval("<<", color="green", bold=True),
                    message=msg,
                )
                self._session.onMessage(msg)

        except ProtocolError as e:
            self.log.critical("{tb}", tb=traceback.format_exc())
            reason = f"WAMP Protocol Error ({e})"
            self._bailout(
                protocol.WebSocketProtocol.CLOSE_STATUS_CODE_PROTOCOL_ERROR,
                reason=reason,
            )

        except Exception as e:
            self.log.critical("{tb}", tb=traceback.format_exc())
            reason = f"WAMP Internal Error ({e})"
            self._bailout(
                protocol.WebSocketProtocol.CLOSE_STATUS_CODE_INTERNAL_ERROR,
                reason=reason,
            )

    def send(self, msg):
        """
        Implements :func:`autobahn.wamp.interfaces.ITransport.send`

        :param msg: The WAMP message to serialize and send.
        :raises TransportLost: If the transport is not open.
        :raises SerializationError: If tracing or serializing the message
            fails; the original exception is attached as ``__cause__``.
        """
        if self.isOpen():
            try:
                self.log.trace(
                    "\n{action1}{session}, {authid}{action2}\n  {message}\n{action3}",
                    action1=hlval("WAMP-Transmit(", color="red", bold=True),
                    authid=(
                        hlval(self._session._authid, color="red", bold=False)
                        if self._session._authid
                        else "-"
                    ),
                    session=(
                        hlval(self._session._session_id, color="red", bold=False)
                        if self._session._session_id
                        else "-"
                    ),
                    action2=hlval(") >>", color="red", bold=True),
                    action3=hlval(">>", color="red", bold=True),
                    message=msg,
                )
                payload, isBinary = self._serializer.serialize(msg)
            except Exception as e:
                # Pass the error as a keyword argument (like every other log
                # call in this class) instead of pre-rendering it into the
                # format string: braces in the exception text would otherwise
                # be interpreted as format placeholders by the logger.
                self.log.error("WAMP message serialization error: {error}", error=e)
                # all exceptions raised from above should be serialization errors ..
                raise SerializationError(
                    f"WAMP message serialization error: {e}"
                ) from e
            else:
                self.sendMessage(payload, isBinary)
        else:
            raise TransportLost()

    def isOpen(self):
        """
        Implements :func:`autobahn.wamp.interfaces.ITransport.isOpen`

        The transport counts as open while a session object is attached.
        """
        return self._session is not None

    @property
    def transport_details(self) -> TransportDetails | None:
        """
        Implements :func:`autobahn.wamp.interfaces.ITransport.transport_details`
        """
        return self._transport_details

    def close(self):
        """
        Implements :func:`autobahn.wamp.interfaces.ITransport.close`

        :raises TransportLost: If the transport is not open.
        """
        if self.isOpen():
            self.sendClose(protocol.WebSocketProtocol.CLOSE_STATUS_CODE_NORMAL)
        else:
            raise TransportLost()

    def abort(self):
        """
        Implements :func:`autobahn.wamp.interfaces.ITransport.abort`

        :raises TransportLost: If the transport is not open.
        """
        if self.isOpen():
            self._bailout(protocol.WebSocketProtocol.CLOSE_STATUS_CODE_GOING_AWAY)
        else:
            raise TransportLost()


# Declare WampWebSocketProtocol (which has no base classes of its own) as an
# ITransport implementation by registration rather than inheritance.
# NOTE(review): presumably ITransport is an abc-based interface and this is
# abc-style virtual-subclass registration -- confirm in autobahn.wamp.interfaces.
ITransport.register(WampWebSocketProtocol)


def parseSubprotocolIdentifier(subprotocol: str) -> tuple[int | None, str | None]:
    try:
        s = subprotocol.split(".")
        if s[0] != "wamp":
            raise Exception(
                f'WAMP WebSocket subprotocol identifier must start with "wamp", not "{s[0]}"'
            )
        version = int(s[1])
        serializer_id = ".".join(s[2:])
        return version, serializer_id
    except:
        return None, None


class WampWebSocketServerProtocol(WampWebSocketProtocol):
    """
    Mixin for WAMP-over-WebSocket server transports.
    """

    # True: deny handshakes that offer no supported WAMP subprotocol.
    # False: accept them anyway and fall back to the "json" serializer.
    STRICT_PROTOCOL_NEGOTIATION = True

    def onConnect(
        self, request: ConnectionRequest
    ) -> tuple[str | None, dict[str, str]]:
        """
        Callback from :func:`autobahn.websocket.interfaces.IWebSocketChannel.onConnect`

        Picks the first subprotocol offered by the client that is WAMP
        version 2 with a serializer known to the factory.

        :param request: The incoming WebSocket connection request.
        :returns: ``(subprotocol, headers)`` with the selected subprotocol, or
            ``(None, headers)`` when falling back to JSON in non-strict mode.
        :raises ConnectionDeny: In strict mode, when no offered subprotocol
            is supported.
        """
        headers = {}

        for offered in request.protocols:
            wamp_version, ser_id = parseSubprotocolIdentifier(offered)
            if wamp_version != 2 or ser_id not in self.factory._serializers:
                continue
            # copy over serializer from factory, so that we keep per-session
            # serializer stats
            self._serializer = copy.copy(self.factory._serializers[ser_id])
            return offered, headers

        if not self.STRICT_PROTOCOL_NEGOTIATION:
            # assume wamp.2.json (but do not announce/select it)
            self._serializer = copy.copy(self.factory._serializers["json"])
            return None, headers

        raise ConnectionDeny(
            ConnectionDeny.BAD_REQUEST,
            "This server only speaks WebSocket subprotocols {}".format(
                ", ".join(self.factory.protocols)
            ),
        )
class WampWebSocketClientProtocol(WampWebSocketProtocol):
    """
    Mixin for WAMP-over-WebSocket client transports.
    """

    # True: fail the handshake if the server selected none of our subprotocols.
    # False: carry on and assume the "json" serializer.
    STRICT_PROTOCOL_NEGOTIATION = True

    def onConnect(self, response: ConnectionResponse):
        """
        Callback from :func:`autobahn.websocket.interfaces.IWebSocketChannel.onConnect`

        Chooses the serializer matching the subprotocol selected by the server.

        :param response: The WebSocket connection response from the server.
        :raises Exception: In strict mode, when the server's subprotocol is
            not one of those the factory requested.
        """
        if response.protocol in self.factory.protocols:
            # only the serializer part of the identifier is needed here
            _, serializer_id = parseSubprotocolIdentifier(response.protocol)
        elif not self.STRICT_PROTOCOL_NEGOTIATION:
            # assume wamp.2.json
            serializer_id = "json"
        else:
            raise Exception(
                "The server does not speak any of the WebSocket subprotocols {} we requested.".format(
                    ", ".join(self.factory.protocols)
                )
            )

        # copy over serializer from factory, so that we keep per-session
        # serializer stats
        self._serializer = copy.copy(self.factory._serializers[serializer_id])
class WampWebSocketFactory:
    """
    Base class for WAMP-over-WebSocket transport factory mixins.
    """

    def __init__(self, factory, serializers=None):
        """
        :param factory: A callable that produces instances that implement
            :class:`autobahn.wamp.interfaces.ITransportHandler`. A non-callable
            value is wrapped so that the very same object is returned on
            every call.
        :type factory: callable

        :param serializers: A list of WAMP serializers to use (or None for
            default serializers). Serializers must implement
            :class:`autobahn.wamp.interfaces.ISerializer`.
        :type serializers: list
        """
        self._factory = factory if callable(factory) else (lambda: factory)

        if serializers is None:
            # No explicit choice: collect every serializer that can be
            # imported, batched variant first, in the order CBOR, MsgPack,
            # UBJSON, JSON.
            serializers = []

            # try CBOR WAMP serializer
            try:
                from autobahn.wamp.serializer import CBORSerializer

                serializers.append(CBORSerializer(batched=True))
                serializers.append(CBORSerializer())
            except ImportError:
                pass

            # try MsgPack WAMP serializer
            try:
                from autobahn.wamp.serializer import MsgPackSerializer

                serializers.append(MsgPackSerializer(batched=True))
                serializers.append(MsgPackSerializer())
            except ImportError:
                pass

            # try UBJSON WAMP serializer
            try:
                from autobahn.wamp.serializer import UBJSONSerializer

                serializers.append(UBJSONSerializer(batched=True))
                serializers.append(UBJSONSerializer())
            except ImportError:
                pass

            # try JSON WAMP serializer
            try:
                from autobahn.wamp.serializer import JsonSerializer

                serializers.append(JsonSerializer(batched=True))
                serializers.append(JsonSerializer())
            except ImportError:
                pass

            if not serializers:
                raise Exception("Could not import any WAMP serializer")

        # serializer lookup by ID, as used by the protocol classes on connect
        self._serializers = {ser.SERIALIZER_ID: ser for ser in serializers}

        # WebSocket subprotocol identifiers, in serializer order
        self._protocols = [f"wamp.2.{ser.SERIALIZER_ID}" for ser in serializers]
class WampWebSocketServerFactory(WampWebSocketFactory):
    """
    Mixin for WAMP-over-WebSocket server transport factories.

    Defines no members of its own in this module; construction and serializer
    setup are inherited from :class:`WampWebSocketFactory`.
    """
class WampWebSocketClientFactory(WampWebSocketFactory):
    """
    Mixin for WAMP-over-WebSocket client transport factories.

    Defines no members of its own in this module; construction and serializer
    setup are inherited from :class:`WampWebSocketFactory`.
    """