"""Unit tests for Virtual Printer services. Tests the virtual printer manager, FTP server, and SSDP server components. """ import asyncio import json import zipfile from pathlib import Path from unittest.mock import AsyncMock, MagicMock, patch import pytest def _write_3mf_with_filaments(file_path: Path, filaments: list[dict], plate_index: int = 1) -> None: """Build a minimal 3MF zip with `Metadata/slice_info.config` carrying the given per-slot filament entries. Each `filaments` dict needs `id`, `type`, `color`, `used_g`. Used by the #1188 VP queue-mode tests below.""" filament_xml = "".join( f'' for f in filaments ) config = ( '' "" f'' f"{filament_xml}" "" "" ) with zipfile.ZipFile(file_path, "w") as zf: zf.writestr("Metadata/slice_info.config", config) # Plate gcode is referenced for plate-id detection in the VP path — # presence is enough; contents don't matter. zf.writestr(f"Metadata/plate_{plate_index}.gcode", "; gcode\n") class TestVirtualPrinterInstance: """Tests for VirtualPrinterInstance class.""" @pytest.fixture def instance(self, tmp_path): """Create a VirtualPrinterInstance with test defaults.""" from backend.app.services.virtual_printer.manager import VirtualPrinterInstance return VirtualPrinterInstance( vp_id=1, name="TestPrinter", mode="immediate", model="C11", access_code="12345678", serial_suffix="391800001", base_dir=tmp_path, ) # ======================================================================== # Tests for instance properties # ======================================================================== def test_instance_stores_parameters(self, instance): """Verify constructor stores parameters correctly.""" assert instance.id == 1 assert instance.name == "TestPrinter" assert instance.mode == "immediate" assert instance.model == "C11" assert instance.access_code == "12345678" assert instance.serial_suffix == "391800001" def test_instance_serial_property(self, instance): """Verify serial is generated from model prefix + suffix.""" # C11 = P1P, prefix = 01S00A assert instance.serial == "01S00A391800001" def test_instance_serial_x1c(self, tmp_path): """Verify X1C serial uses correct prefix.""" from backend.app.services.virtual_printer.manager import VirtualPrinterInstance inst = VirtualPrinterInstance( vp_id=2, name="X1C", mode="immediate", model="BL-P001", access_code="12345678", serial_suffix="391800002", base_dir=tmp_path, ) assert inst.serial == "00M00A391800002" def test_instance_is_proxy_false(self, instance): """Verify is_proxy is False for non-proxy mode.""" assert instance.is_proxy is False def test_instance_is_proxy_true(self, tmp_path): """Verify is_proxy is True for proxy mode.""" from backend.app.services.virtual_printer.manager import VirtualPrinterInstance inst = VirtualPrinterInstance( vp_id=3, name="Proxy", mode="proxy", model="C11", access_code="", serial_suffix="391800003", target_printer_ip="192.168.1.100", base_dir=tmp_path, ) assert inst.is_proxy is True def test_instance_is_running_with_active_tasks(self, instance): """Verify is_running is True when tasks are active.""" mock_task = MagicMock() mock_task.done.return_value = False instance._tasks = [mock_task] assert instance.is_running is True def test_instance_is_running_with_no_tasks(self, instance): """Verify is_running is False when no tasks.""" assert instance.is_running is False def test_instance_creates_directories(self, instance, tmp_path): """Verify instance creates upload and cert directories.""" assert (tmp_path / "uploads" / "1").exists() assert (tmp_path / "uploads" / "1" / "cache").exists() assert (tmp_path / "certs" / "1").exists() # ======================================================================== # Tests for status # ======================================================================== def test_get_status_returns_correct_format(self, instance): """Verify get_status returns expected fields.""" instance._pending_files = {"file1.3mf": Path("/tmp/file1.3mf")} # nosec B108 mock_task = MagicMock(done=MagicMock(return_value=False)) instance._tasks = [mock_task] status = instance.get_status() assert status["running"] is True assert status["pending_files"] == 1 def test_get_status_not_running(self, instance): """Verify get_status when no tasks.""" status = instance.get_status() assert status["running"] is False assert status["pending_files"] == 0 # ======================================================================== # Tests for file handling # ======================================================================== @pytest.mark.asyncio async def test_on_file_received_adds_to_pending(self, instance): """Verify received file is added to pending list in review mode.""" instance.mode = "review" file_path = Path("/tmp/test.3mf") # nosec B108 with patch.object(instance, "_queue_file", new_callable=AsyncMock) as mock_queue: await instance.on_file_received(file_path, "192.168.1.100") assert "test.3mf" in instance._pending_files mock_queue.assert_called_once() @pytest.mark.asyncio async def test_on_file_received_archives_immediately(self, instance): """Verify file is archived in immediate mode.""" file_path = Path("/tmp/test.3mf") # nosec B108 with patch.object(instance, "_archive_file", new_callable=AsyncMock) as mock_archive: await instance.on_file_received(file_path, "192.168.1.100") mock_archive.assert_called_once_with(file_path, "192.168.1.100") @pytest.mark.asyncio async def test_archive_file_skips_non_3mf(self, instance): """Verify non-3MF files are skipped and cleaned up.""" instance._session_factory = MagicMock() instance._pending_files["verify_job"] = Path("/tmp/verify_job") # nosec B108 with patch("pathlib.Path.unlink"): await instance._archive_file(Path("/tmp/verify_job"), "192.168.1.100") # nosec B108 assert "verify_job" not in instance._pending_files # ======================================================================== # Tests for auto_dispatch # ======================================================================== def test_auto_dispatch_defaults_to_true(self, tmp_path): """Verify auto_dispatch defaults to True when not specified.""" from backend.app.services.virtual_printer.manager import VirtualPrinterInstance inst = VirtualPrinterInstance( vp_id=10, name="DefaultDispatch", mode="print_queue", model="C11", access_code="12345678", serial_suffix="391800010", base_dir=tmp_path, ) assert inst.auto_dispatch is True @pytest.mark.asyncio async def test_add_to_print_queue_with_auto_dispatch_on(self, tmp_path): """Verify queue items have manual_start=False when auto_dispatch=True.""" from backend.app.services.virtual_printer.manager import VirtualPrinterInstance mock_db = AsyncMock() added_items = [] def capture_add(item): added_items.append(item) mock_db.add = MagicMock(side_effect=capture_add) mock_db.commit = AsyncMock() mock_session_factory = MagicMock() mock_session_ctx = AsyncMock() mock_session_ctx.__aenter__ = AsyncMock(return_value=mock_db) mock_session_ctx.__aexit__ = AsyncMock(return_value=False) mock_session_factory.return_value = mock_session_ctx inst = VirtualPrinterInstance( vp_id=11, name="AutoDispatchOn", mode="print_queue", model="C11", access_code="12345678", serial_suffix="391800011", auto_dispatch=True, base_dir=tmp_path, session_factory=mock_session_factory, ) # Create a temp 3mf file file_path = tmp_path / "test.3mf" file_path.write_bytes(b"fake3mf") mock_archive = MagicMock() mock_archive.id = 1 mock_archive.print_name = "test" with ( patch( "backend.app.api.routes.settings.get_setting", new_callable=AsyncMock, return_value=None, ), patch( "backend.app.services.archive.ArchiveService.archive_print", new_callable=AsyncMock, return_value=mock_archive, ), ): await inst._add_to_print_queue(file_path, "192.168.1.100") assert len(added_items) == 1 queue_item = added_items[0] assert queue_item.manual_start is False @pytest.mark.asyncio async def test_add_to_print_queue_with_auto_dispatch_off(self, tmp_path): """Verify queue items have manual_start=True when auto_dispatch=False.""" from backend.app.services.virtual_printer.manager import VirtualPrinterInstance mock_db = AsyncMock() added_items = [] def capture_add(item): added_items.append(item) mock_db.add = MagicMock(side_effect=capture_add) mock_db.commit = AsyncMock() mock_session_factory = MagicMock() mock_session_ctx = AsyncMock() mock_session_ctx.__aenter__ = AsyncMock(return_value=mock_db) mock_session_ctx.__aexit__ = AsyncMock(return_value=False) mock_session_factory.return_value = mock_session_ctx inst = VirtualPrinterInstance( vp_id=12, name="AutoDispatchOff", mode="print_queue", model="C11", access_code="12345678", serial_suffix="391800012", auto_dispatch=False, base_dir=tmp_path, session_factory=mock_session_factory, ) # Create a temp 3mf file file_path = tmp_path / "test.3mf" file_path.write_bytes(b"fake3mf") mock_archive = MagicMock() mock_archive.id = 1 mock_archive.print_name = "test" with ( patch( "backend.app.api.routes.settings.get_setting", new_callable=AsyncMock, return_value=None, ), patch( "backend.app.services.archive.ArchiveService.archive_print", new_callable=AsyncMock, return_value=mock_archive, ), ): await inst._add_to_print_queue(file_path, "192.168.1.100") assert len(added_items) == 1 queue_item = added_items[0] assert queue_item.manual_start is True @pytest.mark.asyncio async def test_add_to_print_queue_populates_required_filament_types(self, tmp_path): """#1188: VP queue-mode used to create PrintQueueItems with no filament fields, so the scheduler fell through to model-only matching and dispatched onto whatever printer was free regardless of loaded colour. ``required_filament_types`` is populated unconditionally (cheap, helps the scheduler validate type even without ``force_color_match``) — pin that contract here.""" from backend.app.services.virtual_printer.manager import VirtualPrinterInstance added_items = [] mock_db = AsyncMock() mock_db.add = MagicMock(side_effect=added_items.append) mock_db.commit = AsyncMock() mock_session_factory = MagicMock() mock_session_ctx = AsyncMock() mock_session_ctx.__aenter__ = AsyncMock(return_value=mock_db) mock_session_ctx.__aexit__ = AsyncMock(return_value=False) mock_session_factory.return_value = mock_session_ctx inst = VirtualPrinterInstance( vp_id=21, name="Reqs", mode="print_queue", model="C12", access_code="12345678", serial_suffix="391800021", auto_dispatch=True, queue_force_color_match=False, # off → only required_filament_types base_dir=tmp_path, session_factory=mock_session_factory, ) file_path = tmp_path / "multi.3mf" _write_3mf_with_filaments( file_path, [ {"id": "1", "type": "PLA", "color": "#FFFFFF", "used_g": "12.3"}, {"id": "2", "type": "PETG", "color": "#000000", "used_g": "4.5"}, # used_g=0 → not actually consumed by this plate, must be ignored {"id": "3", "type": "ABS", "color": "#FF0000", "used_g": "0"}, ], plate_index=1, ) mock_archive = MagicMock() mock_archive.id = 1 mock_archive.print_name = "multi" with ( patch( "backend.app.api.routes.settings.get_setting", new_callable=AsyncMock, return_value=None, ), patch( "backend.app.services.archive.ArchiveService.archive_print", new_callable=AsyncMock, return_value=mock_archive, ), ): await inst._add_to_print_queue(file_path, "192.168.1.100") assert len(added_items) == 1 queue_item = added_items[0] # Type-only fallback always populated. Sorted, deduped, no zero-use ABS. assert queue_item.required_filament_types is not None assert json.loads(queue_item.required_filament_types) == ["PETG", "PLA"] # Setting off → no force_color_match overrides leaked. assert queue_item.filament_overrides is None @pytest.mark.asyncio async def test_add_to_print_queue_force_color_match_writes_overrides(self, tmp_path): """#1188 core fix: when the per-VP ``queue_force_color_match`` toggle is on, every consumed slot lands as a ``filament_overrides`` entry with ``force_color_match: true``. This is the field the scheduler keys on (``print_scheduler.py:512``) — without it, slot-by-slot type+color matching never runs.""" from backend.app.services.virtual_printer.manager import VirtualPrinterInstance added_items = [] mock_db = AsyncMock() mock_db.add = MagicMock(side_effect=added_items.append) mock_db.commit = AsyncMock() mock_session_factory = MagicMock() mock_session_ctx = AsyncMock() mock_session_ctx.__aenter__ = AsyncMock(return_value=mock_db) mock_session_ctx.__aexit__ = AsyncMock(return_value=False) mock_session_factory.return_value = mock_session_ctx inst = VirtualPrinterInstance( vp_id=22, name="ForceColor", mode="print_queue", model="C12", access_code="12345678", serial_suffix="391800022", auto_dispatch=True, queue_force_color_match=True, # on base_dir=tmp_path, session_factory=mock_session_factory, ) file_path = tmp_path / "forced.3mf" _write_3mf_with_filaments( file_path, [ {"id": "1", "type": "PLA", "color": "#FFFFFF", "used_g": "10.0"}, {"id": "2", "type": "PLA", "color": "#FF00FF", "used_g": "5.0"}, ], plate_index=1, ) mock_archive = MagicMock() mock_archive.id = 1 mock_archive.print_name = "forced" with ( patch( "backend.app.api.routes.settings.get_setting", new_callable=AsyncMock, return_value=None, ), patch( "backend.app.services.archive.ArchiveService.archive_print", new_callable=AsyncMock, return_value=mock_archive, ), ): await inst._add_to_print_queue(file_path, "192.168.1.100") assert len(added_items) == 1 queue_item = added_items[0] assert queue_item.filament_overrides is not None overrides = json.loads(queue_item.filament_overrides) assert overrides == [ {"slot_id": 1, "type": "PLA", "color": "#FFFFFF", "force_color_match": True}, {"slot_id": 2, "type": "PLA", "color": "#FF00FF", "force_color_match": True}, ] # required_filament_types still populated alongside overrides. assert json.loads(queue_item.required_filament_types) == ["PLA"] @pytest.mark.asyncio async def test_add_to_print_queue_force_color_match_skips_when_3mf_unparseable(self, tmp_path): """A malformed or fake-bytes 3MF must not crash the upload path — we just write the queue item with no filament fields and let the scheduler fall back to model-only matching (the pre-#1188 default). Regression guard for the existing fake-bytes happy-path tests.""" from backend.app.services.virtual_printer.manager import VirtualPrinterInstance added_items = [] mock_db = AsyncMock() mock_db.add = MagicMock(side_effect=added_items.append) mock_db.commit = AsyncMock() mock_session_factory = MagicMock() mock_session_ctx = AsyncMock() mock_session_ctx.__aenter__ = AsyncMock(return_value=mock_db) mock_session_ctx.__aexit__ = AsyncMock(return_value=False) mock_session_factory.return_value = mock_session_ctx inst = VirtualPrinterInstance( vp_id=23, name="Unparseable", mode="print_queue", model="C12", access_code="12345678", serial_suffix="391800023", auto_dispatch=True, queue_force_color_match=True, base_dir=tmp_path, session_factory=mock_session_factory, ) file_path = tmp_path / "bad.3mf" file_path.write_bytes(b"not a real 3mf zip") mock_archive = MagicMock() mock_archive.id = 1 mock_archive.print_name = "bad" with ( patch( "backend.app.api.routes.settings.get_setting", new_callable=AsyncMock, return_value=None, ), patch( "backend.app.services.archive.ArchiveService.archive_print", new_callable=AsyncMock, return_value=mock_archive, ), ): await inst._add_to_print_queue(file_path, "192.168.1.100") assert len(added_items) == 1 queue_item = added_items[0] # No filament data extractable → both fields stay None (graceful # fallback to model-only scheduling). assert queue_item.required_filament_types is None assert queue_item.filament_overrides is None # ======================================================================== # Tests for archive_name_source setting (#1152) # ======================================================================== @pytest.mark.asyncio @pytest.mark.parametrize( ("setting_value", "expected_prefer_filename"), [ ("filename", True), ("metadata", False), (None, False), # Default when setting unset ("", False), # Defensive: empty string is not "filename" ], ) async def test_archive_file_passes_prefer_filename_per_setting( self, tmp_path, setting_value, expected_prefer_filename ): """_archive_file reads `virtual_printer_archive_name_source` and forwards prefer_filename_for_name=True only when it equals 'filename' (#1152).""" from backend.app.services.virtual_printer.manager import VirtualPrinterInstance mock_db = AsyncMock() mock_session_factory = MagicMock() mock_session_ctx = AsyncMock() mock_session_ctx.__aenter__ = AsyncMock(return_value=mock_db) mock_session_ctx.__aexit__ = AsyncMock(return_value=False) mock_session_factory.return_value = mock_session_ctx inst = VirtualPrinterInstance( vp_id=20, name="NameSource", mode="immediate", model="C11", access_code="12345678", serial_suffix="391800020", base_dir=tmp_path, session_factory=mock_session_factory, ) file_path = tmp_path / "user-renamed-job.3mf" file_path.write_bytes(b"fake3mf") mock_archive = MagicMock() mock_archive.id = 1 mock_archive.print_name = "user-renamed-job" archive_print_mock = AsyncMock(return_value=mock_archive) with ( patch( "backend.app.api.routes.settings.get_setting", new_callable=AsyncMock, return_value=setting_value, ), patch( "backend.app.services.archive.ArchiveService.archive_print", archive_print_mock, ), ): await inst._archive_file(file_path, "192.168.1.100") assert archive_print_mock.await_count == 1 kwargs = archive_print_mock.await_args.kwargs assert kwargs.get("prefer_filename_for_name") is expected_prefer_filename class TestVirtualPrinterManager: """Tests for VirtualPrinterManager orchestrator.""" @pytest.fixture def manager(self): """Create a VirtualPrinterManager instance.""" from backend.app.services.virtual_printer.manager import VirtualPrinterManager return VirtualPrinterManager() def test_manager_starts_empty(self, manager): """Verify manager starts with no instances.""" assert len(manager._instances) == 0 assert manager.is_enabled is False def test_manager_get_status_empty(self, manager): """Verify get_status returns disabled state when no instances.""" status = manager.get_status() assert status["enabled"] is False assert status["running"] is False assert status["mode"] == "immediate" def test_manager_is_enabled_with_instance(self, manager, tmp_path): """Verify is_enabled is True when instances exist.""" from backend.app.services.virtual_printer.manager import VirtualPrinterInstance inst = VirtualPrinterInstance( vp_id=1, name="Test", mode="immediate", model="C11", access_code="12345678", serial_suffix="391800001", base_dir=tmp_path, ) manager._instances[1] = inst assert manager.is_enabled is True @pytest.mark.asyncio async def test_manager_remove_instance_server(self, manager, tmp_path): """Verify remove_instance stops and removes a server-mode instance.""" from backend.app.services.virtual_printer.manager import VirtualPrinterInstance inst = VirtualPrinterInstance( vp_id=1, name="Test", mode="immediate", model="C11", access_code="12345678", serial_suffix="391800001", base_dir=tmp_path, ) inst.stop_server = AsyncMock() manager._instances[1] = inst await manager.remove_instance(1) assert 1 not in manager._instances inst.stop_server.assert_called_once() @pytest.mark.asyncio async def test_manager_remove_instance_proxy(self, manager, tmp_path): """Verify remove_instance stops proxy-mode instance.""" from backend.app.services.virtual_printer.manager import VirtualPrinterInstance inst = VirtualPrinterInstance( vp_id=2, name="Proxy", mode="proxy", model="C11", access_code="", serial_suffix="391800002", target_printer_ip="192.168.1.100", base_dir=tmp_path, ) inst.stop_proxy = AsyncMock() manager._instances[2] = inst await manager.remove_instance(2) assert 2 not in manager._instances inst.stop_proxy.assert_called_once() def test_manager_get_status_with_instance(self, manager, tmp_path): """Verify legacy get_status returns first instance data.""" from backend.app.services.virtual_printer.manager import VirtualPrinterInstance inst = VirtualPrinterInstance( vp_id=1, name="Bambuddy", mode="immediate", model="C11", access_code="12345678", serial_suffix="391800001", base_dir=tmp_path, ) mock_task = MagicMock(done=MagicMock(return_value=False)) inst._tasks = [mock_task] inst._pending_files = {"file1.3mf": Path("/tmp/file1.3mf")} # nosec B108 manager._instances[1] = inst status = manager.get_status() assert status["enabled"] is True assert status["running"] is True assert status["mode"] == "immediate" assert status["name"] == "Bambuddy" assert status["serial"] == "01S00A391800001" assert status["model"] == "C11" assert status["model_name"] == "P1P" assert status["pending_files"] == 1 def test_manager_get_all_status(self, manager, tmp_path): """Verify get_all_status returns status for all instances.""" from backend.app.services.virtual_printer.manager import VirtualPrinterInstance for i in range(1, 3): inst = VirtualPrinterInstance( vp_id=i, name=f"VP{i}", mode="immediate", model="C11", access_code="12345678", serial_suffix=f"39180000{i}", base_dir=tmp_path, ) manager._instances[i] = inst statuses = manager.get_all_status() assert len(statuses) == 2 assert statuses[0]["name"] == "VP1" assert statuses[1]["name"] == "VP2" @pytest.mark.asyncio async def test_manager_stop_all(self, manager, tmp_path): """Verify stop_all removes all instances.""" from backend.app.services.virtual_printer.manager import VirtualPrinterInstance for i in range(1, 3): inst = VirtualPrinterInstance( vp_id=i, name=f"VP{i}", mode="immediate", model="C11", access_code="12345678", serial_suffix=f"39180000{i}", base_dir=tmp_path, ) inst.stop_server = AsyncMock() manager._instances[i] = inst await manager.stop_all() assert len(manager._instances) == 0 # ======================================================================== # Tests for sync_from_db config change detection # ======================================================================== def _make_db_vp(self, **overrides): """Create a mock VirtualPrinter DB object.""" defaults = { "id": 1, "name": "TestVP", "enabled": True, "mode": "immediate", "model": "C11", "access_code": "12345678", "serial_suffix": "391800001", "bind_ip": "", "remote_interface_ip": "", "target_printer_id": None, "auto_dispatch": True, "tailscale_disabled": True, # Opt-in default (#1070 UX fix) "position": 0, } defaults.update(overrides) vp = MagicMock() for k, v in defaults.items(): setattr(vp, k, v) return vp def _setup_sync_mocks(self, manager, enabled_vps, tmp_path): """Wire up session_factory mock for sync_from_db.""" mock_result = MagicMock() mock_result.scalars.return_value.all.return_value = enabled_vps mock_db = AsyncMock() mock_db.execute = AsyncMock(return_value=mock_result) mock_db.__aenter__ = AsyncMock(return_value=mock_db) mock_db.__aexit__ = AsyncMock(return_value=False) manager._session_factory = MagicMock(return_value=mock_db) manager._base_dir = tmp_path @pytest.mark.asyncio async def test_sync_from_db_restarts_on_mode_change(self, manager, tmp_path): """Verify sync_from_db restarts VP when mode changes.""" from backend.app.services.virtual_printer.manager import VirtualPrinterInstance inst = VirtualPrinterInstance( vp_id=1, name="TestVP", mode="immediate", model="C11", access_code="12345678", serial_suffix="391800001", base_dir=tmp_path, ) inst.stop_server = AsyncMock() manager._instances[1] = inst # DB says mode changed to "archive" db_vp = self._make_db_vp(mode="archive") self._setup_sync_mocks(manager, [db_vp], tmp_path) with patch.object(manager, "remove_instance", new_callable=AsyncMock) as mock_remove: # Patch VirtualPrinterInstance to prevent actual start with patch("backend.app.services.virtual_printer.manager.VirtualPrinterInstance") as MockInst: mock_new = MagicMock() mock_new.start_server = AsyncMock() MockInst.return_value = mock_new await manager.sync_from_db() mock_remove.assert_called_once_with(1) @pytest.mark.asyncio async def test_sync_from_db_restarts_on_access_code_change(self, manager, tmp_path): """Verify sync_from_db restarts VP when access_code changes.""" from backend.app.services.virtual_printer.manager import VirtualPrinterInstance inst = VirtualPrinterInstance( vp_id=1, name="TestVP", mode="immediate", model="C11", access_code="12345678", serial_suffix="391800001", base_dir=tmp_path, ) inst.stop_server = AsyncMock() manager._instances[1] = inst db_vp = self._make_db_vp(access_code="newcode99") self._setup_sync_mocks(manager, [db_vp], tmp_path) with patch.object(manager, "remove_instance", new_callable=AsyncMock) as mock_remove: with patch("backend.app.services.virtual_printer.manager.VirtualPrinterInstance") as MockInst: mock_new = MagicMock() mock_new.start_server = AsyncMock() MockInst.return_value = mock_new await manager.sync_from_db() mock_remove.assert_called_once_with(1) @pytest.mark.asyncio async def test_sync_from_db_skips_unchanged_instance(self, manager, tmp_path): """Verify sync_from_db does NOT restart when config is identical.""" from backend.app.services.virtual_printer.manager import VirtualPrinterInstance inst = VirtualPrinterInstance( vp_id=1, name="TestVP", mode="immediate", model="C11", access_code="12345678", serial_suffix="391800001", base_dir=tmp_path, ) manager._instances[1] = inst # DB matches running config exactly db_vp = self._make_db_vp() self._setup_sync_mocks(manager, [db_vp], tmp_path) with patch.object(manager, "remove_instance", new_callable=AsyncMock) as mock_remove: await manager.sync_from_db() mock_remove.assert_not_called() @pytest.mark.asyncio async def test_sync_from_db_restarts_on_bind_ip_change(self, manager, tmp_path): """Verify sync_from_db restarts VP when bind_ip changes.""" from backend.app.services.virtual_printer.manager import VirtualPrinterInstance inst = VirtualPrinterInstance( vp_id=1, name="TestVP", mode="immediate", model="C11", access_code="12345678", serial_suffix="391800001", bind_ip="192.168.1.10", base_dir=tmp_path, ) inst.stop_server = AsyncMock() manager._instances[1] = inst db_vp = self._make_db_vp(bind_ip="192.168.1.20") self._setup_sync_mocks(manager, [db_vp], tmp_path) with patch.object(manager, "remove_instance", new_callable=AsyncMock) as mock_remove: with patch("backend.app.services.virtual_printer.manager.VirtualPrinterInstance") as MockInst: mock_new = MagicMock() mock_new.start_server = AsyncMock() MockInst.return_value = mock_new await manager.sync_from_db() mock_remove.assert_called_once_with(1) @pytest.mark.asyncio async def test_sync_from_db_restarts_on_model_change(self, manager, tmp_path): """Verify sync_from_db restarts VP when model changes.""" from backend.app.services.virtual_printer.manager import VirtualPrinterInstance inst = VirtualPrinterInstance( vp_id=1, name="TestVP", mode="immediate", model="C11", access_code="12345678", serial_suffix="391800001", base_dir=tmp_path, ) inst.stop_server = AsyncMock() manager._instances[1] = inst db_vp = self._make_db_vp(model="C12") self._setup_sync_mocks(manager, [db_vp], tmp_path) with patch.object(manager, "remove_instance", new_callable=AsyncMock) as mock_remove: with patch("backend.app.services.virtual_printer.manager.VirtualPrinterInstance") as MockInst: mock_new = MagicMock() mock_new.start_server = AsyncMock() MockInst.return_value = mock_new await manager.sync_from_db() mock_remove.assert_called_once_with(1) @pytest.mark.asyncio async def test_sync_from_db_does_not_restart_on_tailscale_toggle(self, manager, tmp_path): """Flipping tailscale_disabled is purely informational — must NOT trigger a restart. Cert provisioning was removed; the toggle only governs whether the VP card surfaces the host's Tailscale IP/FQDN to the user. No service needs to reload, so changing it through sync_from_db should leave any running instance untouched. """ from backend.app.services.virtual_printer.manager import VirtualPrinterInstance inst = VirtualPrinterInstance( vp_id=1, name="TestVP", mode="immediate", model="C11", access_code="12345678", serial_suffix="391800001", tailscale_disabled=False, base_dir=tmp_path, ) inst.stop_server = AsyncMock() manager._instances[1] = inst db_vp = self._make_db_vp(tailscale_disabled=True) self._setup_sync_mocks(manager, [db_vp], tmp_path) with patch.object(manager, "remove_instance", new_callable=AsyncMock) as mock_remove: await manager.sync_from_db() mock_remove.assert_not_called() class TestFTPSession: """Tests for FTP session handling.""" @pytest.fixture def mock_reader(self): """Create a mock StreamReader.""" reader = AsyncMock() return reader @pytest.fixture def mock_writer(self): """Create a mock StreamWriter.""" writer = MagicMock() writer.get_extra_info = MagicMock(return_value=("192.168.1.100", 12345)) writer.write = MagicMock() writer.drain = AsyncMock() writer.close = MagicMock() writer.wait_closed = AsyncMock() writer.is_closing = MagicMock(return_value=False) return writer @pytest.fixture def ssl_context(self): """Create a mock SSL context.""" return MagicMock() @pytest.fixture def session(self, mock_reader, mock_writer, ssl_context, tmp_path): """Create an FTPSession instance.""" from backend.app.services.virtual_printer.ftp_server import FTPSession return FTPSession( reader=mock_reader, writer=mock_writer, upload_dir=tmp_path, access_code="12345678", ssl_context=ssl_context, on_file_received=None, ) # ======================================================================== # Tests for authentication # ======================================================================== @pytest.mark.asyncio async def test_user_command_accepts_bblp(self, session): """Verify USER command accepts bblp user.""" await session.cmd_USER("bblp") assert session.username == "bblp" @pytest.mark.asyncio async def test_pass_command_authenticates(self, session): """Verify PASS command authenticates with correct code.""" session.username = "bblp" await session.cmd_PASS("12345678") assert session.authenticated is True @pytest.mark.asyncio async def test_pass_command_rejects_wrong_code(self, session): """Verify PASS command rejects wrong access code.""" session.username = "bblp" await session.cmd_PASS("wrongcode") assert session.authenticated is False # ======================================================================== # Tests for FTP commands # ======================================================================== @pytest.mark.asyncio async def test_syst_command(self, session): """Verify SYST returns UNIX type.""" await session.cmd_SYST("") session.writer.write.assert_called() call_args = session.writer.write.call_args[0][0].decode() assert "215" in call_args assert "UNIX" in call_args @pytest.mark.asyncio async def test_pwd_command_requires_auth(self, session): """Verify PWD requires authentication.""" session.authenticated = False await session.cmd_PWD("") call_args = session.writer.write.call_args[0][0].decode() assert "530" in call_args @pytest.mark.asyncio async def test_pwd_command_when_authenticated(self, session): """Verify PWD returns root directory when authenticated.""" session.authenticated = True await session.cmd_PWD("") call_args = session.writer.write.call_args[0][0].decode() assert "257" in call_args @pytest.mark.asyncio async def test_type_command_sets_binary(self, session): """Verify TYPE I sets binary mode.""" session.authenticated = True await session.cmd_TYPE("I") assert session.transfer_type == "I" @pytest.mark.asyncio async def test_pbsz_command(self, session): """Verify PBSZ returns success.""" await session.cmd_PBSZ("0") call_args = session.writer.write.call_args[0][0].decode() assert "200" in call_args @pytest.mark.asyncio async def test_prot_command_accepts_p(self, session): """Verify PROT P is accepted.""" await session.cmd_PROT("P") call_args = session.writer.write.call_args[0][0].decode() assert "200" in call_args @pytest.mark.asyncio async def test_quit_command(self, session): """Verify QUIT sends goodbye and raises CancelledError.""" with pytest.raises(asyncio.CancelledError): await session.cmd_QUIT("") class TestSSDPServer: """Tests for Virtual Printer SSDP server.""" @pytest.fixture def ssdp_server(self): """Create a VirtualPrinterSSDPServer instance.""" from backend.app.services.virtual_printer.ssdp_server import VirtualPrinterSSDPServer return VirtualPrinterSSDPServer( serial="TEST123", name="TestPrinter", model="BL-P001", ) # ======================================================================== # Tests for SSDP response # ======================================================================== def test_build_notify_message(self, ssdp_server): """Verify NOTIFY packet contains required headers.""" # Set a known IP for testing ssdp_server._local_ip = "192.168.1.100" message = ssdp_server._build_notify_message() assert b"NOTIFY" in message assert b"DevName.bambu.com: TestPrinter" in message assert b"USN: TEST123" in message def test_build_response_message(self, ssdp_server): """Verify response packet contains required headers.""" # Set a known IP for testing ssdp_server._local_ip = "192.168.1.100" message = ssdp_server._build_response_message() assert b"HTTP/1.1 200 OK" in message assert b"DevName.bambu.com: TestPrinter" in message assert b"USN: TEST123" in message def test_ssdp_server_uses_correct_model(self, ssdp_server): """Verify SSDP server uses the provided model.""" ssdp_server._local_ip = "192.168.1.100" message = ssdp_server._build_notify_message() assert b"DevModel.bambu.com: BL-P001" in message # ======================================================================== # Tests for advertise_ip parameter # ======================================================================== def test_advertise_ip_sets_local_ip(self): """Verify advertise_ip overrides auto-detection.""" from backend.app.services.virtual_printer.ssdp_server import VirtualPrinterSSDPServer server = VirtualPrinterSSDPServer( serial="TEST123", name="TestPrinter", model="BL-P001", advertise_ip="10.0.0.50", ) assert server._local_ip == "10.0.0.50" def test_advertise_ip_empty_string_uses_auto_detect(self): """Verify empty advertise_ip falls back to auto-detection.""" from backend.app.services.virtual_printer.ssdp_server import VirtualPrinterSSDPServer server = VirtualPrinterSSDPServer( serial="TEST123", name="TestPrinter", model="BL-P001", advertise_ip="", ) assert server._local_ip is None def test_advertise_ip_in_notify_message(self): """Verify NOTIFY message uses the advertise_ip.""" from backend.app.services.virtual_printer.ssdp_server import VirtualPrinterSSDPServer server = VirtualPrinterSSDPServer( serial="TEST123", name="TestPrinter", model="BL-P001", advertise_ip="10.0.0.50", ) message = server._build_notify_message() assert b"Location: 10.0.0.50" in message def test_advertise_ip_in_response_message(self): """Verify M-SEARCH response uses the advertise_ip.""" from backend.app.services.virtual_printer.ssdp_server import VirtualPrinterSSDPServer server = VirtualPrinterSSDPServer( serial="TEST123", name="TestPrinter", model="BL-P001", advertise_ip="10.0.0.50", ) message = server._build_response_message() assert b"Location: 10.0.0.50" in message def test_default_no_advertise_ip(self): """Verify default constructor has None local_ip (auto-detect).""" from backend.app.services.virtual_printer.ssdp_server import VirtualPrinterSSDPServer server = VirtualPrinterSSDPServer() assert server._local_ip is None class TestCertificateService: """Tests for TLS certificate generation.""" @pytest.fixture def cert_service(self, tmp_path): """Create a CertificateService instance.""" from backend.app.services.virtual_printer.certificate import CertificateService return CertificateService(cert_dir=tmp_path, serial="TEST123") def test_generate_certificates(self, cert_service, tmp_path): """Verify certificates are generated correctly.""" cert_path, key_path = cert_service.generate_certificates() assert cert_path.exists() assert key_path.exists() # Verify certificate content cert_content = cert_path.read_text() assert "BEGIN CERTIFICATE" in cert_content key_content = key_path.read_text() assert "BEGIN" in key_content and "KEY" in key_content def test_certificates_reused_if_exist(self, cert_service): """Verify existing certificates are reused.""" # First generation cert_path1, key_path1 = cert_service.generate_certificates() mtime1 = cert_path1.stat().st_mtime # Second call should reuse (via ensure_certificates) cert_path2, key_path2 = cert_service.ensure_certificates() mtime2 = cert_path2.stat().st_mtime assert mtime1 == mtime2 # File wasn't regenerated def test_delete_certificates(self, cert_service): """Verify certificates can be deleted.""" cert_service.generate_certificates() assert cert_service.cert_path.exists() assert cert_service.key_path.exists() cert_service.delete_certificates() assert not cert_service.cert_path.exists() assert not cert_service.key_path.exists() def test_ensure_creates_if_not_exist(self, cert_service): """Verify ensure_certificates generates if not existing.""" assert not cert_service.cert_path.exists() cert_path, key_path = cert_service.ensure_certificates() assert cert_path.exists() assert key_path.exists() class TestBindServer: """Tests for BindServer (port 3002 bind/detect protocol).""" @pytest.fixture def bind_server(self): """Create a BindServer instance.""" from backend.app.services.virtual_printer.bind_server import BindServer return BindServer( serial="09400A391800001", model="O1D", name="Bambuddy", ) def test_build_frame(self, bind_server): """Verify frame building produces correct format.""" payload = {"login": {"command": "detect"}} frame = bind_server._build_frame(payload) # Header: 0xA5A5 assert frame[:2] == b"\xa5\xa5" # Trailer: 0xA7A7 assert frame[-2:] == b"\xa7\xa7" # Length field is total message size (LE uint16) import struct total_len = struct.unpack_from(" bytes: """Build a minimal MQTT PUBLISH packet.""" # PUBLISH fixed header: type 3, no flags topic_bytes = topic.encode("utf-8") # Variable header: topic length (2 bytes) + topic var_header = len(topic_bytes).to_bytes(2, "big") + topic_bytes body = var_header + payload # Encode remaining length remaining = len(body) header = bytearray([0x30]) # PUBLISH, QoS 0 while True: encoded_byte = remaining % 128 remaining //= 128 if remaining > 0: encoded_byte |= 0x80 header.append(encoded_byte) if remaining == 0: break return bytes(header) + body @staticmethod def _build_mqtt_pingreq() -> bytes: """Build an MQTT PINGREQ packet (2 bytes, no payload).""" return b"\xc0\x00" def test_rewrite_ip_in_publish(self): """IP string in PUBLISH payload is rewritten.""" from backend.app.services.virtual_printer.tcp_proxy import TLSProxy payload = b'{"rtsp_url":"rtsps://192.168.1.100:322/live"}' packet = self._build_mqtt_publish("device/status", payload) result, buf = TLSProxy._rewrite_mqtt_ip(packet, b"192.168.1.100", b"10.0.0.1", bytearray()) assert b"10.0.0.1" in result assert b"192.168.1.100" not in result def test_no_rewrite_when_ip_absent(self): """Packets without the target IP are passed through unchanged.""" from backend.app.services.virtual_printer.tcp_proxy import TLSProxy payload = b'{"status":"idle"}' packet = self._build_mqtt_publish("device/status", payload) result, buf = TLSProxy._rewrite_mqtt_ip(packet, b"192.168.1.100", b"10.0.0.1", bytearray()) assert result == packet def test_non_publish_packets_unchanged(self): """Non-PUBLISH packets (e.g. PINGREQ) are never rewritten.""" from backend.app.services.virtual_printer.tcp_proxy import TLSProxy pingreq = self._build_mqtt_pingreq() result, buf = TLSProxy._rewrite_mqtt_ip(pingreq, b"192.168.1.100", b"10.0.0.1", bytearray()) assert result == pingreq def test_rewrite_preserves_packet_framing(self): """Rewritten packet has valid MQTT remaining length.""" from backend.app.services.virtual_printer.tcp_proxy import TLSProxy # Use IPs of different lengths to test length re-encoding old_ip = b"192.168.255.133" # 15 bytes new_ip = b"10.0.0.1" # 8 bytes payload = b'{"ip":"192.168.255.133"}' packet = self._build_mqtt_publish("device/status", payload) result, buf = TLSProxy._rewrite_mqtt_ip(packet, old_ip, new_ip, bytearray()) # Parse the result to verify framing assert result[0] == 0x30 # PUBLISH header byte # Decode remaining length pos = 1 remaining = 0 multiplier = 1 while True: b = result[pos] pos += 1 remaining += (b & 0x7F) * multiplier multiplier *= 128 if (b & 0x80) == 0: break # Remaining length should match actual data assert pos + remaining == len(result) assert new_ip in result def test_incomplete_packet_buffered(self): """Incomplete packet at end of chunk is buffered for next call.""" from backend.app.services.virtual_printer.tcp_proxy import TLSProxy payload = b'{"ip":"192.168.1.100"}' packet = self._build_mqtt_publish("device/status", payload) # Split packet in the middle half = len(packet) // 2 chunk1 = packet[:half] chunk2 = packet[half:] result1, buf = TLSProxy._rewrite_mqtt_ip(chunk1, b"192.168.1.100", b"10.0.0.1", bytearray()) # First chunk should be buffered (incomplete packet) assert len(buf) > 0 result2, buf = TLSProxy._rewrite_mqtt_ip(chunk2, b"192.168.1.100", b"10.0.0.1", buf) # Second chunk completes the packet, IP should be rewritten combined = result1 + result2 assert b"10.0.0.1" in combined assert b"192.168.1.100" not in combined def test_multiple_packets_in_one_chunk(self): """Multiple MQTT packets in a single chunk are all processed.""" from backend.app.services.virtual_printer.tcp_proxy import TLSProxy payload1 = b'{"ip":"192.168.1.100"}' payload2 = b'{"other":"data"}' packet1 = self._build_mqtt_publish("topic1", payload1) packet2 = self._build_mqtt_publish("topic2", payload2) combined = packet1 + packet2 result, buf = TLSProxy._rewrite_mqtt_ip(combined, b"192.168.1.100", b"10.0.0.1", bytearray()) assert b"10.0.0.1" in result assert b"192.168.1.100" not in result # Second packet should still be present assert b"other" in result def test_extra_replacements(self): """Extra replacement pairs (e.g. integer IP) are also applied.""" from backend.app.services.virtual_printer.tcp_proxy import TLSProxy payload = b'{"net":{"info":[{"ip":2248124608}]}}' packet = self._build_mqtt_publish("device/status", payload) result, buf = TLSProxy._rewrite_mqtt_ip( packet, b"NOMATCH", b"NOREPLACE", bytearray(), extra_replacements=[(b"2248124608", b"285190336")], ) assert b"285190336" in result assert b"2248124608" not in result class TestIpToLeIntBytes: """Tests for TLSProxy._ip_to_le_int_bytes() integer IP conversion.""" def test_converts_ip_to_le_int(self): from backend.app.services.virtual_printer.tcp_proxy import TLSProxy assert TLSProxy._ip_to_le_int_bytes("192.168.255.133") == b"2248124608" assert TLSProxy._ip_to_le_int_bytes("192.168.255.16") == b"285190336" assert TLSProxy._ip_to_le_int_bytes("10.0.0.1") == b"16777226" def test_roundtrip(self): """Verify the integer converts back to the correct IP.""" import struct from backend.app.services.virtual_printer.tcp_proxy import TLSProxy for ip in ["192.168.1.1", "10.0.0.1", "172.16.0.100", "192.168.255.133"]: le_int = int(TLSProxy._ip_to_le_int_bytes(ip)) parts = ip.split(".") expected = struct.unpack("