"""Tests for Virtual Printer MQTT server.""" import ast import inspect from backend.app.services.virtual_printer.mqtt_server import SimpleMQTTServer class TestMQTTServerNoGlobalState: """Ensure MQTT server doesn't set global asyncio state.""" def test_no_global_exception_handler(self): """MQTT server must not call set_exception_handler(). set_exception_handler() is global to the event loop. When multiple VP instances run, each would overwrite the previous handler, causing lost error context and spurious 'Unhandled exception in client_connected_cb' messages. """ source = inspect.getsource(SimpleMQTTServer) tree = ast.parse(source) for node in ast.walk(tree): if isinstance(node, ast.Attribute) and node.attr == "set_exception_handler": raise AssertionError( "SimpleMQTTServer must not call set_exception_handler(). " "It overwrites the global asyncio exception handler, " "breaking multi-VP setups." )