Source code for autobahn.asyncio.test.test_aio_rawsocket

import pytest
import os

from unittest.mock import Mock, call

from autobahn.asyncio.rawsocket import PrefixProtocol, RawSocketClientProtocol, RawSocketServerProtocol, \
    WampRawSocketClientFactory, WampRawSocketServerFactory
from autobahn.asyncio.util import get_serializers
from autobahn.wamp import message
from autobahn.wamp.types import TransportDetails


[docs]@pytest.mark.skipif(not os.environ.get('USE_ASYNCIO', False), reason='test runs on asyncio only') def test_sers(event_loop): serializers = get_serializers() assert len(serializers) > 0 m = serializers[0]().serialize(message.Abort('close')) assert m
[docs]@pytest.mark.skipif(not os.environ.get('USE_ASYNCIO', False), reason='test runs on asyncio only') def test_prefix(event_loop): p = PrefixProtocol() transport = Mock() receiver = Mock() p.stringReceived = receiver p.connection_made(transport) small_msg = b'\x00\x00\x00\x04abcd' p.data_received(small_msg) receiver.assert_called_once_with(b'abcd') assert len(p._buffer) == 0 p.sendString(b'abcd') # print(transport.write.call_args_list) transport.write.assert_has_calls([call(b'\x00\x00\x00\x04'), call(b'abcd')]) transport.reset_mock() receiver.reset_mock() big_msg = b'\x00\x00\x00\x0C' + b'0123456789AB' p.data_received(big_msg[0:2]) assert not receiver.called p.data_received(big_msg[2:6]) assert not receiver.called p.data_received(big_msg[6:11]) assert not receiver.called p.data_received(big_msg[11:16]) receiver.assert_called_once_with(b'0123456789AB') transport.reset_mock() receiver.reset_mock() two_messages = b'\x00\x00\x00\x04' + b'abcd' + b'\x00\x00\x00\x05' + b'12345' + b'\x00' p.data_received(two_messages) receiver.assert_has_calls([call(b'abcd'), call(b'12345')]) assert p._buffer == b'\x00'
[docs]@pytest.mark.skipif(not os.environ.get('USE_ASYNCIO', False), reason='test runs on asyncio only') def test_is_closed(event_loop): class CP(RawSocketClientProtocol): @property def serializer_id(self): return 1 client = CP() on_hs = Mock() transport = Mock() receiver = Mock() client.stringReceived = receiver client._on_handshake_complete = on_hs assert client.is_closed.done() client.connection_made(transport) assert not client.is_closed.done() client.connection_lost(None) assert client.is_closed.done()
[docs]@pytest.mark.skipif(not os.environ.get('USE_ASYNCIO', False), reason='test runs on asyncio only') def test_raw_socket_server1(event_loop): server = RawSocketServerProtocol() ser = Mock(return_value=True) on_hs = Mock() transport = Mock() receiver = Mock() server.supports_serializer = ser server.stringReceived = receiver server._on_handshake_complete = on_hs server.stringReceived = receiver server.connection_made(transport) hs = b'\x7F\xF1\x00\x00' + b'\x00\x00\x00\x04abcd' server.data_received(hs) ser.assert_called_once_with(1) on_hs.assert_called_once_with() assert transport.write.called transport.write.assert_called_once_with(b'\x7F\xF1\x00\x00') assert not transport.close.called receiver.assert_called_once_with(b'abcd')
[docs]@pytest.mark.skipif(not os.environ.get('USE_ASYNCIO', False), reason='test runs on asyncio only') def test_raw_socket_server_errors(event_loop): server = RawSocketServerProtocol() ser = Mock(return_value=True) on_hs = Mock() transport = Mock() receiver = Mock() server.supports_serializer = ser server.stringReceived = receiver server._on_handshake_complete = on_hs server.stringReceived = receiver server.connection_made(transport) server.data_received(b'abcdef') transport.close.assert_called_once_with() server = RawSocketServerProtocol() ser = Mock(return_value=False) on_hs = Mock() transport = Mock(spec_set=('close', 'write', 'get_extra_info')) receiver = Mock() server.supports_serializer = ser server.stringReceived = receiver server._on_handshake_complete = on_hs server.stringReceived = receiver server.connection_made(transport) server.data_received(b'\x7F\xF1\x00\x00') transport.close.assert_called_once_with() transport.write.assert_called_once_with(b'\x7F\x10\x00\x00')
[docs]@pytest.mark.skipif(not os.environ.get('USE_ASYNCIO', False), reason='test runs on asyncio only') def test_raw_socket_client1(event_loop): class CP(RawSocketClientProtocol): @property def serializer_id(self): return 1 client = CP() on_hs = Mock() transport = Mock() receiver = Mock() client.stringReceived = receiver client._on_handshake_complete = on_hs client.connection_made(transport) client.data_received(b'\x7F\xF1\x00\x00' + b'\x00\x00\x00\x04abcd') on_hs.assert_called_once_with() assert transport.write.called transport.write.called_one_with(b'\x7F\xF1\x00\x00') assert not transport.close.called receiver.assert_called_once_with(b'abcd')
[docs]@pytest.mark.skipif(not os.environ.get('USE_ASYNCIO', False), reason='test runs on asyncio only') def test_raw_socket_client_error(event_loop): class CP(RawSocketClientProtocol): @property def serializer_id(self): return 1 client = CP() on_hs = Mock() transport = Mock(spec_set=('close', 'write', 'get_extra_info')) receiver = Mock() client.stringReceived = receiver client._on_handshake_complete = on_hs client.connection_made(transport) client.data_received(b'\x7F\xF1\x00\x01') transport.close.assert_called_once_with()
[docs]@pytest.mark.skipif(not os.environ.get('USE_ASYNCIO', False), reason='test runs on asyncio only') def test_wamp_server(event_loop): transport = Mock(spec_set=('abort', 'close', 'write', 'get_extra_info')) transport.write = Mock(side_effect=lambda m: messages.append(m)) server = Mock(spec=['onOpen', 'onMessage']) def fact_server(): return server messages = [] proto = WampRawSocketServerFactory(fact_server)() proto.connection_made(transport) assert proto.transport_details.is_server is True assert proto.transport_details.channel_framing == TransportDetails.CHANNEL_FRAMING_RAWSOCKET assert proto.factory._serializers s = proto.factory._serializers[1].RAWSOCKET_SERIALIZER_ID proto.data_received(bytes(bytearray([0x7F, 0xF0 | s, 0, 0]))) assert proto._serializer server.onOpen.assert_called_once_with(proto) proto.send(message.Abort('close')) for d in messages[1:]: proto.data_received(d) assert server.onMessage.called assert isinstance(server.onMessage.call_args[0][0], message.Abort)
[docs]@pytest.mark.skipif(not os.environ.get('USE_ASYNCIO', False), reason='test runs on asyncio only') def test_wamp_client(event_loop): transport = Mock(spec_set=('abort', 'close', 'write', 'get_extra_info')) transport.write = Mock(side_effect=lambda m: messages.append(m)) client = Mock(spec=['onOpen', 'onMessage']) def fact_client(): return client messages = [] proto = WampRawSocketClientFactory(fact_client)() proto.connection_made(transport) assert proto.transport_details.is_server is False assert proto.transport_details.channel_framing == TransportDetails.CHANNEL_FRAMING_RAWSOCKET assert proto._serializer s = proto._serializer.RAWSOCKET_SERIALIZER_ID proto.data_received(bytes(bytearray([0x7F, 0xF0 | s, 0, 0]))) client.onOpen.assert_called_once_with(proto) proto.send(message.Abort('close')) for d in messages[1:]: proto.data_received(d) assert client.onMessage.called assert isinstance(client.onMessage.call_args[0][0], message.Abort)