From 172981e9c620d6e8167dedbca3dbcca59fa815a5 Mon Sep 17 00:00:00 2001 From: Stuart Longland Date: Sat, 10 Jun 2023 12:09:34 +1000 Subject: [PATCH 1/4] broker: Add function for parsing IP address / port --- amqtt/broker.py | 45 +++++++++++++++++++++++++++++++++++++++++++- tests/test_broker.py | 18 ++++++++++++++++++ 2 files changed, 62 insertions(+), 1 deletion(-) diff --git a/amqtt/broker.py b/amqtt/broker.py index a4f8ef1a..1a1196c5 100644 --- a/amqtt/broker.py +++ b/amqtt/broker.py @@ -1,7 +1,8 @@ # Copyright (c) 2015 Nicolas JOUANIN # # See the file license.txt for copying permission. -from typing import Optional +from typing import Optional, Tuple +import ipaddress import logging import ssl import websockets @@ -44,6 +45,48 @@ EVENT_BROKER_MESSAGE_RECEIVED = "broker_message_received" +def split_bindaddr_port(port_str: str, default_port: int) -> Tuple[Optional[str], int]: + """ + Split an address:port pair into separate IP address and port, with IPv6 + special-case handling. + """ + # Address can be specified using one of the following methods: + # 1883 - Port number only (listen all interfaces) + # :1883 - Port number only (listen all interfaces) + # 0.0.0.0:1883 - IPv4 address + # [::]:1883 - IPv6 address + # empty string - all interfaces default port + + def _parse_port(port_str: str) -> int: + if port_str.startswith(":"): + port_str = port_str[1:] + + if not port_str: + return default_port + + return int(port_str) + + if port_str.startswith("["): # IPv6 literal + try: + addr_end = port_str.index("]") + except ValueError: + raise ValueError("Expecting '[' to be followed by ']'") + + return (port_str[0 : addr_end + 1], _parse_port(port_str[addr_end + 1 :])) + elif ":" in port_str: + # Address : port + address, port_str = port_str.rsplit(":", 1) + return (address or None, _parse_port(port_str)) + else: + # Address or port + try: + # Port number? + return (None, _parse_port(port_str)) + except ValueError: + # Address, default port + return (port_str, default_port) + + class Action(Enum): subscribe = "subscribe" publish = "publish" diff --git a/tests/test_broker.py b/tests/test_broker.py index f9d06d34..97d26014 100644 --- a/tests/test_broker.py +++ b/tests/test_broker.py @@ -21,6 +21,7 @@ EVENT_BROKER_CLIENT_SUBSCRIBED, EVENT_BROKER_CLIENT_UNSUBSCRIBED, EVENT_BROKER_MESSAGE_RECEIVED, + split_bindaddr_port, ) from amqtt.client import MQTTClient, ConnectException from amqtt.mqtt import ( @@ -53,6 +54,23 @@ async def async_magic(): MagicMock.__await__ = lambda x: async_magic().__await__() +@pytest.mark.parametrize( + "input_str, output_addr, output_port", + [ + ("1234", None, 1234), + (":1234", None, 1234), + ("0.0.0.0:1234", "0.0.0.0", 1234), + ("[::]:1234", "[::]", 1234), + ("0.0.0.0", "0.0.0.0", 5678), + ("[::]", "[::]", 5678), + ("localhost", "localhost", 5678), + ("localhost:1234", "localhost", 1234), + ], +) +def test_split_bindaddr_port(input_str, output_addr, output_port): + assert split_bindaddr_port(input_str, 5678) == (output_addr, output_port) + + @pytest.mark.asyncio async def test_start_stop(broker, mock_plugin_manager): mock_plugin_manager.assert_has_calls( From 22132bc01a1462c5a8159a47712a7887d2989907 Mon Sep 17 00:00:00 2001 From: Stuart Longland Date: Sat, 10 Jun 2023 12:16:02 +1000 Subject: [PATCH 2/4] broker: Use earlier defined method for splitting IP/port --- amqtt/broker.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/amqtt/broker.py b/amqtt/broker.py index 1a1196c5..d2e8f270 100644 --- a/amqtt/broker.py +++ b/amqtt/broker.py @@ -34,6 +34,9 @@ "auth": {"allow-anonymous": True, "password-file": None}, } +# Default port numbers +DEFAULT_PORTS = {"tcp": 1883, "ws": 8883} + EVENT_BROKER_PRE_START = "broker_pre_start" EVENT_BROKER_POST_START = "broker_post_start" EVENT_BROKER_PRE_SHUTDOWN = "broker_pre_shutdown" @@ -97,7 +100,6 @@ class BrokerException(Exception): class RetainedApplicationMessage: - __slots__ = ("source_session", "topic", "data", "qos") def __init__(self, source_session, topic, data, qos=None): @@ -337,10 +339,10 @@ async def start(self) -> None: % (listener["certfile"], listener["keyfile"], fnfe) ) - address, s_port = listener["bind"].split(":") - port = 0 try: - port = int(s_port) + address, port = split_bindaddr_port( + listener["bind"], DEFAULT_PORTS[listener["type"]] + ) except ValueError: raise BrokerException( "Invalid port value in bind value: %s" % listener["bind"] @@ -979,7 +981,7 @@ async def _run_broadcast(self, running_tasks: deque): continue subscriptions = self._subscriptions[k_filter] - for (target_session, qos) in subscriptions: + for target_session, qos in subscriptions: qos = broadcast.get("qos", qos) # Retain all messages which cannot be broadcasted From 65cb7cfde651ceeb2607027d6168b8693209f55c Mon Sep 17 00:00:00 2001 From: Stuart Longland Date: Sat, 10 Jun 2023 11:03:42 +1000 Subject: [PATCH 3/4] mqtt.protocol.handler: Use `exc_info` to emit exception not `%s` The former emits the stack trace, which greatly helps in identifying _WHERE_ an exception is raised, not just WHAT the exception is. --- amqtt/mqtt/protocol/handler.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/amqtt/mqtt/protocol/handler.py b/amqtt/mqtt/protocol/handler.py index bb6d4133..eb9875c1 100644 --- a/amqtt/mqtt/protocol/handler.py +++ b/amqtt/mqtt/protocol/handler.py @@ -530,8 +530,8 @@ async def _send_packet(self, packet): await self.handle_connection_closed() except asyncio.CancelledError: raise - except Exception as e: - self.logger.warning("Unhandled exception: %s" % e) + except: + self.logger.warning("Unhandled exception", exc_info=True) raise async def mqtt_deliver_next_message(self): From 830250350b9cb6be19cc7cf515b6f08776ad46f8 Mon Sep 17 00:00:00 2001 From: Stuart Longland Date: Sun, 5 Nov 2023 09:36:37 +1000 Subject: [PATCH 4/4] amqtt.broker: Remove unused `ipaddress` module --- amqtt/broker.py | 1 - 1 file changed, 1 deletion(-) diff --git a/amqtt/broker.py b/amqtt/broker.py index d2e8f270..94d54720 100644 --- a/amqtt/broker.py +++ b/amqtt/broker.py @@ -2,7 +2,6 @@ # # See the file license.txt for copying permission. from typing import Optional, Tuple -import ipaddress import logging import ssl import websockets