"""Unit tests for Virtual Printer services.
Tests the virtual printer manager, FTP server, and SSDP server components.
"""
import asyncio
import json
import zipfile
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
def _write_3mf_with_filaments(file_path: Path, filaments: list[dict], plate_index: int = 1) -> None:
    """Build a minimal 3MF zip with `Metadata/slice_info.config` carrying the
    given per-slot filament entries.

    Used by the #1188 VP queue-mode tests below.

    Args:
        file_path: Destination of the zip archive (created or overwritten).
        filaments: One dict per filament slot; each needs the string keys
            `id`, `type`, `color` and `used_g`.
        plate_index: Plate number written into the plate metadata and used
            for the name of the placeholder gcode member.
    """
    # One <filament/> element per slot, in the order given by the caller.
    filament_xml = "".join(
        f'<filament id="{slot["id"]}" type="{slot["type"]}" '
        f'color="{slot["color"]}" used_g="{slot["used_g"]}" />'
        for slot in filaments
    )
    config = (
        '<?xml version="1.0" encoding="UTF-8"?>'
        "<config>"
        f'<plate><metadata key="index" value="{plate_index}" />'
        f"{filament_xml}"
        "</plate>"
        "</config>"
    )
    with zipfile.ZipFile(file_path, "w") as zf:
        zf.writestr("Metadata/slice_info.config", config)
        # Plate gcode is referenced for plate-id detection in the VP path —
        # presence is enough; contents don't matter.
        zf.writestr(f"Metadata/plate_{plate_index}.gcode", "; gcode\n")
class TestVirtualPrinterInstance:
    """Tests for VirtualPrinterInstance class."""

    @pytest.fixture
    def instance(self, tmp_path):
        """Create a VirtualPrinterInstance with test defaults."""
        from backend.app.services.virtual_printer.manager import VirtualPrinterInstance

        return VirtualPrinterInstance(
            vp_id=1,
            name="TestPrinter",
            mode="immediate",
            model="C11",
            access_code="12345678",
            serial_suffix="391800001",
            base_dir=tmp_path,
        )

    # ========================================================================
    # Shared helpers
    # ========================================================================
    @staticmethod
    def _make_session_factory(added_items=None):
        """Return a mock session factory yielding a mock DB session.

        Calling the returned factory gives an async context manager whose
        ``__aenter__`` resolves to an ``AsyncMock`` session. When
        ``added_items`` is a list, the session's synchronous ``add`` appends
        every object passed to it to that list and ``commit`` is an
        ``AsyncMock``; otherwise the session is left as a plain ``AsyncMock``.
        """
        mock_db = AsyncMock()
        if added_items is not None:
            mock_db.add = MagicMock(side_effect=added_items.append)
            mock_db.commit = AsyncMock()
        session_ctx = AsyncMock()
        session_ctx.__aenter__ = AsyncMock(return_value=mock_db)
        session_ctx.__aexit__ = AsyncMock(return_value=False)
        return MagicMock(return_value=session_ctx)

    @staticmethod
    def _archive_patches(print_name, setting_value=None):
        """Return ``(settings_patch, archive_patch)`` patchers (not started).

        ``settings_patch`` replaces ``get_setting`` with an ``AsyncMock``
        returning ``setting_value``. ``archive_patch`` replaces
        ``ArchiveService.archive_print`` with an ``AsyncMock`` returning a
        mock archive whose ``id`` is 1 and ``print_name`` is ``print_name``.
        Entering ``archive_patch`` with ``as`` yields that ``AsyncMock``.
        """
        mock_archive = MagicMock()
        mock_archive.id = 1
        mock_archive.print_name = print_name
        settings_patch = patch(
            "backend.app.api.routes.settings.get_setting",
            new_callable=AsyncMock,
            return_value=setting_value,
        )
        archive_patch = patch(
            "backend.app.services.archive.ArchiveService.archive_print",
            new_callable=AsyncMock,
            return_value=mock_archive,
        )
        return settings_patch, archive_patch

    # ========================================================================
    # Tests for instance properties
    # ========================================================================
    def test_instance_stores_parameters(self, instance):
        """Verify constructor stores parameters correctly."""
        assert instance.id == 1
        assert instance.name == "TestPrinter"
        assert instance.mode == "immediate"
        assert instance.model == "C11"
        assert instance.access_code == "12345678"
        assert instance.serial_suffix == "391800001"

    def test_instance_serial_property(self, instance):
        """Verify serial is generated from model prefix + suffix."""
        # C11 = P1P, prefix = 01S00A
        assert instance.serial == "01S00A391800001"

    def test_instance_serial_x1c(self, tmp_path):
        """Verify X1C serial uses correct prefix."""
        from backend.app.services.virtual_printer.manager import VirtualPrinterInstance

        inst = VirtualPrinterInstance(
            vp_id=2,
            name="X1C",
            mode="immediate",
            model="BL-P001",
            access_code="12345678",
            serial_suffix="391800002",
            base_dir=tmp_path,
        )
        assert inst.serial == "00M00A391800002"

    def test_instance_is_proxy_false(self, instance):
        """Verify is_proxy is False for non-proxy mode."""
        assert instance.is_proxy is False

    def test_instance_is_proxy_true(self, tmp_path):
        """Verify is_proxy is True for proxy mode."""
        from backend.app.services.virtual_printer.manager import VirtualPrinterInstance

        inst = VirtualPrinterInstance(
            vp_id=3,
            name="Proxy",
            mode="proxy",
            model="C11",
            access_code="",
            serial_suffix="391800003",
            target_printer_ip="192.168.1.100",
            base_dir=tmp_path,
        )
        assert inst.is_proxy is True

    def test_instance_is_running_with_active_tasks(self, instance):
        """Verify is_running is True when tasks are active."""
        mock_task = MagicMock()
        mock_task.done.return_value = False
        instance._tasks = [mock_task]
        assert instance.is_running is True

    def test_instance_is_running_with_no_tasks(self, instance):
        """Verify is_running is False when no tasks."""
        assert instance.is_running is False

    def test_instance_creates_directories(self, instance, tmp_path):
        """Verify instance creates upload and cert directories."""
        assert (tmp_path / "uploads" / "1").exists()
        assert (tmp_path / "uploads" / "1" / "cache").exists()
        assert (tmp_path / "certs" / "1").exists()

    # ========================================================================
    # Tests for status
    # ========================================================================
    def test_get_status_returns_correct_format(self, instance):
        """Verify get_status returns expected fields."""
        instance._pending_files = {"file1.3mf": Path("/tmp/file1.3mf")}  # nosec B108
        mock_task = MagicMock(done=MagicMock(return_value=False))
        instance._tasks = [mock_task]
        status = instance.get_status()
        assert status["running"] is True
        assert status["pending_files"] == 1

    def test_get_status_not_running(self, instance):
        """Verify get_status when no tasks."""
        status = instance.get_status()
        assert status["running"] is False
        assert status["pending_files"] == 0

    # ========================================================================
    # Tests for file handling
    # ========================================================================
    @pytest.mark.asyncio
    async def test_on_file_received_adds_to_pending(self, instance):
        """Verify received file is added to pending list in review mode."""
        instance.mode = "review"
        file_path = Path("/tmp/test.3mf")  # nosec B108
        with patch.object(instance, "_queue_file", new_callable=AsyncMock) as mock_queue:
            await instance.on_file_received(file_path, "192.168.1.100")
        assert "test.3mf" in instance._pending_files
        mock_queue.assert_called_once()

    @pytest.mark.asyncio
    async def test_on_file_received_archives_immediately(self, instance):
        """Verify file is archived in immediate mode."""
        file_path = Path("/tmp/test.3mf")  # nosec B108
        with patch.object(instance, "_archive_file", new_callable=AsyncMock) as mock_archive:
            await instance.on_file_received(file_path, "192.168.1.100")
        mock_archive.assert_called_once_with(file_path, "192.168.1.100")

    @pytest.mark.asyncio
    async def test_archive_file_skips_non_3mf(self, instance):
        """Verify non-3MF files are skipped and cleaned up."""
        instance._session_factory = MagicMock()
        instance._pending_files["verify_job"] = Path("/tmp/verify_job")  # nosec B108
        with patch("pathlib.Path.unlink"):
            await instance._archive_file(Path("/tmp/verify_job"), "192.168.1.100")  # nosec B108
        assert "verify_job" not in instance._pending_files

    # ========================================================================
    # Tests for auto_dispatch
    # ========================================================================
    def test_auto_dispatch_defaults_to_true(self, tmp_path):
        """Verify auto_dispatch defaults to True when not specified."""
        from backend.app.services.virtual_printer.manager import VirtualPrinterInstance

        inst = VirtualPrinterInstance(
            vp_id=10,
            name="DefaultDispatch",
            mode="print_queue",
            model="C11",
            access_code="12345678",
            serial_suffix="391800010",
            base_dir=tmp_path,
        )
        assert inst.auto_dispatch is True

    @pytest.mark.asyncio
    async def test_add_to_print_queue_with_auto_dispatch_on(self, tmp_path):
        """Verify queue items have manual_start=False when auto_dispatch=True."""
        from backend.app.services.virtual_printer.manager import VirtualPrinterInstance

        added_items = []
        inst = VirtualPrinterInstance(
            vp_id=11,
            name="AutoDispatchOn",
            mode="print_queue",
            model="C11",
            access_code="12345678",
            serial_suffix="391800011",
            auto_dispatch=True,
            base_dir=tmp_path,
            session_factory=self._make_session_factory(added_items),
        )
        # Create a temp 3mf file
        file_path = tmp_path / "test.3mf"
        file_path.write_bytes(b"fake3mf")

        settings_patch, archive_patch = self._archive_patches("test")
        with settings_patch, archive_patch:
            await inst._add_to_print_queue(file_path, "192.168.1.100")

        assert len(added_items) == 1
        queue_item = added_items[0]
        assert queue_item.manual_start is False

    @pytest.mark.asyncio
    async def test_add_to_print_queue_with_auto_dispatch_off(self, tmp_path):
        """Verify queue items have manual_start=True when auto_dispatch=False."""
        from backend.app.services.virtual_printer.manager import VirtualPrinterInstance

        added_items = []
        inst = VirtualPrinterInstance(
            vp_id=12,
            name="AutoDispatchOff",
            mode="print_queue",
            model="C11",
            access_code="12345678",
            serial_suffix="391800012",
            auto_dispatch=False,
            base_dir=tmp_path,
            session_factory=self._make_session_factory(added_items),
        )
        # Create a temp 3mf file
        file_path = tmp_path / "test.3mf"
        file_path.write_bytes(b"fake3mf")

        settings_patch, archive_patch = self._archive_patches("test")
        with settings_patch, archive_patch:
            await inst._add_to_print_queue(file_path, "192.168.1.100")

        assert len(added_items) == 1
        queue_item = added_items[0]
        assert queue_item.manual_start is True

    @pytest.mark.asyncio
    async def test_add_to_print_queue_populates_required_filament_types(self, tmp_path):
        """#1188: VP queue-mode used to create PrintQueueItems with no
        filament fields, so the scheduler fell through to model-only matching
        and dispatched onto whatever printer was free regardless of loaded
        colour. ``required_filament_types`` is populated unconditionally
        (cheap, helps the scheduler validate type even without
        ``force_color_match``) — pin that contract here."""
        from backend.app.services.virtual_printer.manager import VirtualPrinterInstance

        added_items = []
        inst = VirtualPrinterInstance(
            vp_id=21,
            name="Reqs",
            mode="print_queue",
            model="C12",
            access_code="12345678",
            serial_suffix="391800021",
            auto_dispatch=True,
            queue_force_color_match=False,  # off → only required_filament_types
            base_dir=tmp_path,
            session_factory=self._make_session_factory(added_items),
        )
        file_path = tmp_path / "multi.3mf"
        _write_3mf_with_filaments(
            file_path,
            [
                {"id": "1", "type": "PLA", "color": "#FFFFFF", "used_g": "12.3"},
                {"id": "2", "type": "PETG", "color": "#000000", "used_g": "4.5"},
                # used_g=0 → not actually consumed by this plate, must be ignored
                {"id": "3", "type": "ABS", "color": "#FF0000", "used_g": "0"},
            ],
            plate_index=1,
        )

        settings_patch, archive_patch = self._archive_patches("multi")
        with settings_patch, archive_patch:
            await inst._add_to_print_queue(file_path, "192.168.1.100")

        assert len(added_items) == 1
        queue_item = added_items[0]
        # Type-only fallback always populated. Sorted, deduped, no zero-use ABS.
        assert queue_item.required_filament_types is not None
        assert json.loads(queue_item.required_filament_types) == ["PETG", "PLA"]
        # Setting off → no force_color_match overrides leaked.
        assert queue_item.filament_overrides is None

    @pytest.mark.asyncio
    async def test_add_to_print_queue_force_color_match_writes_overrides(self, tmp_path):
        """#1188 core fix: when the per-VP ``queue_force_color_match`` toggle
        is on, every consumed slot lands as a ``filament_overrides`` entry
        with ``force_color_match: true``. This is the field the scheduler
        keys on (``print_scheduler.py:512``) — without it, slot-by-slot
        type+color matching never runs."""
        from backend.app.services.virtual_printer.manager import VirtualPrinterInstance

        added_items = []
        inst = VirtualPrinterInstance(
            vp_id=22,
            name="ForceColor",
            mode="print_queue",
            model="C12",
            access_code="12345678",
            serial_suffix="391800022",
            auto_dispatch=True,
            queue_force_color_match=True,  # on
            base_dir=tmp_path,
            session_factory=self._make_session_factory(added_items),
        )
        file_path = tmp_path / "forced.3mf"
        _write_3mf_with_filaments(
            file_path,
            [
                {"id": "1", "type": "PLA", "color": "#FFFFFF", "used_g": "10.0"},
                {"id": "2", "type": "PLA", "color": "#FF00FF", "used_g": "5.0"},
            ],
            plate_index=1,
        )

        settings_patch, archive_patch = self._archive_patches("forced")
        with settings_patch, archive_patch:
            await inst._add_to_print_queue(file_path, "192.168.1.100")

        assert len(added_items) == 1
        queue_item = added_items[0]
        assert queue_item.filament_overrides is not None
        overrides = json.loads(queue_item.filament_overrides)
        assert overrides == [
            {"slot_id": 1, "type": "PLA", "color": "#FFFFFF", "force_color_match": True},
            {"slot_id": 2, "type": "PLA", "color": "#FF00FF", "force_color_match": True},
        ]
        # required_filament_types still populated alongside overrides.
        assert json.loads(queue_item.required_filament_types) == ["PLA"]

    @pytest.mark.asyncio
    async def test_add_to_print_queue_force_color_match_skips_when_3mf_unparseable(self, tmp_path):
        """A malformed or fake-bytes 3MF must not crash the upload path —
        we just write the queue item with no filament fields and let the
        scheduler fall back to model-only matching (the pre-#1188 default).
        Regression guard for the existing fake-bytes happy-path tests."""
        from backend.app.services.virtual_printer.manager import VirtualPrinterInstance

        added_items = []
        inst = VirtualPrinterInstance(
            vp_id=23,
            name="Unparseable",
            mode="print_queue",
            model="C12",
            access_code="12345678",
            serial_suffix="391800023",
            auto_dispatch=True,
            queue_force_color_match=True,
            base_dir=tmp_path,
            session_factory=self._make_session_factory(added_items),
        )
        file_path = tmp_path / "bad.3mf"
        file_path.write_bytes(b"not a real 3mf zip")

        settings_patch, archive_patch = self._archive_patches("bad")
        with settings_patch, archive_patch:
            await inst._add_to_print_queue(file_path, "192.168.1.100")

        assert len(added_items) == 1
        queue_item = added_items[0]
        # No filament data extractable → both fields stay None (graceful
        # fallback to model-only scheduling).
        assert queue_item.required_filament_types is None
        assert queue_item.filament_overrides is None

    # ========================================================================
    # Tests for archive_name_source setting (#1152)
    # ========================================================================
    @pytest.mark.asyncio
    @pytest.mark.parametrize(
        ("setting_value", "expected_prefer_filename"),
        [
            ("filename", True),
            ("metadata", False),
            (None, False),  # Default when setting unset
            ("", False),  # Defensive: empty string is not "filename"
        ],
    )
    async def test_archive_file_passes_prefer_filename_per_setting(
        self, tmp_path, setting_value, expected_prefer_filename
    ):
        """_archive_file reads `virtual_printer_archive_name_source` and forwards
        prefer_filename_for_name=True only when it equals 'filename' (#1152)."""
        from backend.app.services.virtual_printer.manager import VirtualPrinterInstance

        inst = VirtualPrinterInstance(
            vp_id=20,
            name="NameSource",
            mode="immediate",
            model="C11",
            access_code="12345678",
            serial_suffix="391800020",
            base_dir=tmp_path,
            session_factory=self._make_session_factory(),
        )
        file_path = tmp_path / "user-renamed-job.3mf"
        file_path.write_bytes(b"fake3mf")

        settings_patch, archive_patch = self._archive_patches(
            "user-renamed-job", setting_value=setting_value
        )
        with settings_patch, archive_patch as archive_print_mock:
            await inst._archive_file(file_path, "192.168.1.100")

        assert archive_print_mock.await_count == 1
        kwargs = archive_print_mock.await_args.kwargs
        assert kwargs.get("prefer_filename_for_name") is expected_prefer_filename
class TestVirtualPrinterManager:
    """Tests for VirtualPrinterManager orchestrator."""

    @pytest.fixture
    def manager(self):
        """Create a VirtualPrinterManager instance."""
        from backend.app.services.virtual_printer.manager import VirtualPrinterManager

        return VirtualPrinterManager()

    @staticmethod
    def _make_instance(tmp_path, **overrides):
        """Build a VirtualPrinterInstance rooted at ``tmp_path``.

        Defaults describe VP id 1 ("TestVP", immediate mode, model C11);
        any keyword in ``overrides`` replaces or extends those constructor
        arguments.
        """
        from backend.app.services.virtual_printer.manager import VirtualPrinterInstance

        params = {
            "vp_id": 1,
            "name": "TestVP",
            "mode": "immediate",
            "model": "C11",
            "access_code": "12345678",
            "serial_suffix": "391800001",
            "base_dir": tmp_path,
        }
        params.update(overrides)
        return VirtualPrinterInstance(**params)

    def test_manager_starts_empty(self, manager):
        """Verify manager starts with no instances."""
        assert len(manager._instances) == 0
        assert manager.is_enabled is False

    def test_manager_get_status_empty(self, manager):
        """Verify get_status returns disabled state when no instances."""
        status = manager.get_status()
        assert status["enabled"] is False
        assert status["running"] is False
        assert status["mode"] == "immediate"

    def test_manager_is_enabled_with_instance(self, manager, tmp_path):
        """Verify is_enabled is True when instances exist."""
        manager._instances[1] = self._make_instance(tmp_path, name="Test")
        assert manager.is_enabled is True

    @pytest.mark.asyncio
    async def test_manager_remove_instance_server(self, manager, tmp_path):
        """Verify remove_instance stops and removes a server-mode instance."""
        inst = self._make_instance(tmp_path, name="Test")
        inst.stop_server = AsyncMock()
        manager._instances[1] = inst
        await manager.remove_instance(1)
        assert 1 not in manager._instances
        inst.stop_server.assert_called_once()

    @pytest.mark.asyncio
    async def test_manager_remove_instance_proxy(self, manager, tmp_path):
        """Verify remove_instance stops proxy-mode instance."""
        inst = self._make_instance(
            tmp_path,
            vp_id=2,
            name="Proxy",
            mode="proxy",
            access_code="",
            serial_suffix="391800002",
            target_printer_ip="192.168.1.100",
        )
        inst.stop_proxy = AsyncMock()
        manager._instances[2] = inst
        await manager.remove_instance(2)
        assert 2 not in manager._instances
        inst.stop_proxy.assert_called_once()

    def test_manager_get_status_with_instance(self, manager, tmp_path):
        """Verify legacy get_status returns first instance data."""
        inst = self._make_instance(tmp_path, name="Bambuddy")
        mock_task = MagicMock(done=MagicMock(return_value=False))
        inst._tasks = [mock_task]
        inst._pending_files = {"file1.3mf": Path("/tmp/file1.3mf")}  # nosec B108
        manager._instances[1] = inst
        status = manager.get_status()
        assert status["enabled"] is True
        assert status["running"] is True
        assert status["mode"] == "immediate"
        assert status["name"] == "Bambuddy"
        assert status["serial"] == "01S00A391800001"
        assert status["model"] == "C11"
        assert status["model_name"] == "P1P"
        assert status["pending_files"] == 1

    def test_manager_get_all_status(self, manager, tmp_path):
        """Verify get_all_status returns status for all instances."""
        for i in range(1, 3):
            manager._instances[i] = self._make_instance(
                tmp_path, vp_id=i, name=f"VP{i}", serial_suffix=f"39180000{i}"
            )
        statuses = manager.get_all_status()
        assert len(statuses) == 2
        assert statuses[0]["name"] == "VP1"
        assert statuses[1]["name"] == "VP2"

    @pytest.mark.asyncio
    async def test_manager_stop_all(self, manager, tmp_path):
        """Verify stop_all removes all instances."""
        for i in range(1, 3):
            inst = self._make_instance(
                tmp_path, vp_id=i, name=f"VP{i}", serial_suffix=f"39180000{i}"
            )
            inst.stop_server = AsyncMock()
            manager._instances[i] = inst
        await manager.stop_all()
        assert len(manager._instances) == 0

    # ========================================================================
    # Tests for sync_from_db config change detection
    # ========================================================================
    def _make_db_vp(self, **overrides):
        """Create a mock VirtualPrinter DB object."""
        defaults = {
            "id": 1,
            "name": "TestVP",
            "enabled": True,
            "mode": "immediate",
            "model": "C11",
            "access_code": "12345678",
            "serial_suffix": "391800001",
            "bind_ip": "",
            "remote_interface_ip": "",
            "target_printer_id": None,
            "auto_dispatch": True,
            "tailscale_disabled": True,  # Opt-in default (#1070 UX fix)
            "position": 0,
        }
        defaults.update(overrides)
        vp = MagicMock()
        for k, v in defaults.items():
            setattr(vp, k, v)
        return vp

    def _setup_sync_mocks(self, manager, enabled_vps, tmp_path):
        """Wire up session_factory mock for sync_from_db."""
        mock_result = MagicMock()
        mock_result.scalars.return_value.all.return_value = enabled_vps
        mock_db = AsyncMock()
        mock_db.execute = AsyncMock(return_value=mock_result)
        mock_db.__aenter__ = AsyncMock(return_value=mock_db)
        mock_db.__aexit__ = AsyncMock(return_value=False)
        manager._session_factory = MagicMock(return_value=mock_db)
        manager._base_dir = tmp_path

    async def _assert_restart_on_sync(self, manager, tmp_path, *, db_overrides, instance_overrides=None):
        """Run sync_from_db against a changed DB row and expect a restart.

        Registers a running instance (defaults plus ``instance_overrides``)
        as VP 1, makes the DB report the default row plus ``db_overrides``,
        then asserts ``remove_instance(1)`` was called exactly once.
        """
        # Build the real instance before the class is patched below.
        inst = self._make_instance(tmp_path, **(instance_overrides or {}))
        inst.stop_server = AsyncMock()
        manager._instances[1] = inst

        db_vp = self._make_db_vp(**db_overrides)
        self._setup_sync_mocks(manager, [db_vp], tmp_path)

        with (
            patch.object(manager, "remove_instance", new_callable=AsyncMock) as mock_remove,
            # Patch VirtualPrinterInstance to prevent actual start
            patch("backend.app.services.virtual_printer.manager.VirtualPrinterInstance") as MockInst,
        ):
            mock_new = MagicMock()
            mock_new.start_server = AsyncMock()
            MockInst.return_value = mock_new
            await manager.sync_from_db()
        mock_remove.assert_called_once_with(1)

    @pytest.mark.asyncio
    async def test_sync_from_db_restarts_on_mode_change(self, manager, tmp_path):
        """Verify sync_from_db restarts VP when mode changes."""
        # DB says mode changed to "archive"
        await self._assert_restart_on_sync(manager, tmp_path, db_overrides={"mode": "archive"})

    @pytest.mark.asyncio
    async def test_sync_from_db_restarts_on_access_code_change(self, manager, tmp_path):
        """Verify sync_from_db restarts VP when access_code changes."""
        await self._assert_restart_on_sync(
            manager, tmp_path, db_overrides={"access_code": "newcode99"}
        )

    @pytest.mark.asyncio
    async def test_sync_from_db_skips_unchanged_instance(self, manager, tmp_path):
        """Verify sync_from_db does NOT restart when config is identical."""
        manager._instances[1] = self._make_instance(tmp_path)
        # DB matches running config exactly
        db_vp = self._make_db_vp()
        self._setup_sync_mocks(manager, [db_vp], tmp_path)
        with patch.object(manager, "remove_instance", new_callable=AsyncMock) as mock_remove:
            await manager.sync_from_db()
        mock_remove.assert_not_called()

    @pytest.mark.asyncio
    async def test_sync_from_db_restarts_on_bind_ip_change(self, manager, tmp_path):
        """Verify sync_from_db restarts VP when bind_ip changes."""
        await self._assert_restart_on_sync(
            manager,
            tmp_path,
            instance_overrides={"bind_ip": "192.168.1.10"},
            db_overrides={"bind_ip": "192.168.1.20"},
        )

    @pytest.mark.asyncio
    async def test_sync_from_db_restarts_on_model_change(self, manager, tmp_path):
        """Verify sync_from_db restarts VP when model changes."""
        await self._assert_restart_on_sync(manager, tmp_path, db_overrides={"model": "C12"})

    @pytest.mark.asyncio
    async def test_sync_from_db_restarts_on_tailscale_disabled_change(self, manager, tmp_path):
        """VP restarts when tailscale_disabled flips from False to True."""
        await self._assert_restart_on_sync(
            manager,
            tmp_path,
            instance_overrides={"tailscale_disabled": False},
            db_overrides={"tailscale_disabled": True},
        )
class TestFTPSession:
    """Tests for FTP session handling."""

    @pytest.fixture
    def mock_reader(self):
        """Create a mock StreamReader."""
        return AsyncMock()

    @pytest.fixture
    def mock_writer(self):
        """Create a mock StreamWriter."""
        stream = MagicMock()
        # Synchronous members of the writer.
        stream.get_extra_info = MagicMock(return_value=("192.168.1.100", 12345))
        stream.write = MagicMock()
        stream.close = MagicMock()
        stream.is_closing = MagicMock(return_value=False)
        # Awaitable members of the writer.
        stream.drain = AsyncMock()
        stream.wait_closed = AsyncMock()
        return stream

    @pytest.fixture
    def ssl_context(self):
        """Create a mock SSL context."""
        return MagicMock()

    @pytest.fixture
    def session(self, mock_reader, mock_writer, ssl_context, tmp_path):
        """Create an FTPSession instance."""
        from backend.app.services.virtual_printer.ftp_server import FTPSession

        session_kwargs = {
            "reader": mock_reader,
            "writer": mock_writer,
            "upload_dir": tmp_path,
            "access_code": "12345678",
            "ssl_context": ssl_context,
            "on_file_received": None,
        }
        return FTPSession(**session_kwargs)

    @staticmethod
    def _last_reply(session):
        """Decode the bytes passed to the most recent ``writer.write`` call."""
        return session.writer.write.call_args[0][0].decode()

    # ========================================================================
    # Tests for authentication
    # ========================================================================
    @pytest.mark.asyncio
    async def test_user_command_accepts_bblp(self, session):
        """Verify USER command accepts bblp user."""
        await session.cmd_USER("bblp")
        assert session.username == "bblp"

    @pytest.mark.asyncio
    async def test_pass_command_authenticates(self, session):
        """Verify PASS command authenticates with correct code."""
        session.username = "bblp"
        await session.cmd_PASS("12345678")
        assert session.authenticated is True

    @pytest.mark.asyncio
    async def test_pass_command_rejects_wrong_code(self, session):
        """Verify PASS command rejects wrong access code."""
        session.username = "bblp"
        await session.cmd_PASS("wrongcode")
        assert session.authenticated is False

    # ========================================================================
    # Tests for FTP commands
    # ========================================================================
    @pytest.mark.asyncio
    async def test_syst_command(self, session):
        """Verify SYST returns UNIX type."""
        await session.cmd_SYST("")
        session.writer.write.assert_called()
        reply = self._last_reply(session)
        assert "215" in reply
        assert "UNIX" in reply

    @pytest.mark.asyncio
    async def test_pwd_command_requires_auth(self, session):
        """Verify PWD requires authentication."""
        session.authenticated = False
        await session.cmd_PWD("")
        assert "530" in self._last_reply(session)

    @pytest.mark.asyncio
    async def test_pwd_command_when_authenticated(self, session):
        """Verify PWD returns root directory when authenticated."""
        session.authenticated = True
        await session.cmd_PWD("")
        assert "257" in self._last_reply(session)

    @pytest.mark.asyncio
    async def test_type_command_sets_binary(self, session):
        """Verify TYPE I sets binary mode."""
        session.authenticated = True
        await session.cmd_TYPE("I")
        assert session.transfer_type == "I"

    @pytest.mark.asyncio
    async def test_pbsz_command(self, session):
        """Verify PBSZ returns success."""
        await session.cmd_PBSZ("0")
        assert "200" in self._last_reply(session)

    @pytest.mark.asyncio
    async def test_prot_command_accepts_p(self, session):
        """Verify PROT P is accepted."""
        await session.cmd_PROT("P")
        assert "200" in self._last_reply(session)

    @pytest.mark.asyncio
    async def test_quit_command(self, session):
        """Verify QUIT sends goodbye and raises CancelledError."""
        with pytest.raises(asyncio.CancelledError):
            await session.cmd_QUIT("")
class TestSSDPServer:
"""Tests for Virtual Printer SSDP server."""
@pytest.fixture
def ssdp_server(self):
    """Provide a VirtualPrinterSSDPServer built with fixed test identity values."""
    from backend.app.services.virtual_printer.ssdp_server import VirtualPrinterSSDPServer

    identity = {"serial": "TEST123", "name": "TestPrinter", "model": "BL-P001"}
    return VirtualPrinterSSDPServer(**identity)
# ========================================================================
# Tests for SSDP response
# ========================================================================
def test_build_notify_message(self, ssdp_server):
"""Verify NOTIFY packet contains required headers."""
# Set a known IP for testing
ssdp_server._local_ip = "192.168.1.100"
message = ssdp_server._build_notify_message()
assert b"NOTIFY" in message
assert b"DevName.bambu.com: TestPrinter" in message
assert b"USN: TEST123" in message
def test_build_response_message(self, ssdp_server):
"""Verify response packet contains required headers."""
# Set a known IP for testing
ssdp_server._local_ip = "192.168.1.100"
message = ssdp_server._build_response_message()
assert b"HTTP/1.1 200 OK" in message
assert b"DevName.bambu.com: TestPrinter" in message
assert b"USN: TEST123" in message
def test_ssdp_server_uses_correct_model(self, ssdp_server):
"""Verify SSDP server uses the provided model."""
ssdp_server._local_ip = "192.168.1.100"
message = ssdp_server._build_notify_message()
assert b"DevModel.bambu.com: BL-P001" in message
# ========================================================================
# Tests for advertise_ip parameter
# ========================================================================
def test_advertise_ip_sets_local_ip(self):
"""Verify advertise_ip overrides auto-detection."""
from backend.app.services.virtual_printer.ssdp_server import VirtualPrinterSSDPServer
server = VirtualPrinterSSDPServer(
serial="TEST123",
name="TestPrinter",
model="BL-P001",
advertise_ip="10.0.0.50",
)
assert server._local_ip == "10.0.0.50"
def test_advertise_ip_empty_string_uses_auto_detect(self):
"""Verify empty advertise_ip falls back to auto-detection."""
from backend.app.services.virtual_printer.ssdp_server import VirtualPrinterSSDPServer
server = VirtualPrinterSSDPServer(
serial="TEST123",
name="TestPrinter",
model="BL-P001",
advertise_ip="",
)
assert server._local_ip is None
def test_advertise_ip_in_notify_message(self):
"""Verify NOTIFY message uses the advertise_ip."""
from backend.app.services.virtual_printer.ssdp_server import VirtualPrinterSSDPServer
server = VirtualPrinterSSDPServer(
serial="TEST123",
name="TestPrinter",
model="BL-P001",
advertise_ip="10.0.0.50",
)
message = server._build_notify_message()
assert b"Location: 10.0.0.50" in message
def test_advertise_ip_in_response_message(self):
"""Verify M-SEARCH response uses the advertise_ip."""
from backend.app.services.virtual_printer.ssdp_server import VirtualPrinterSSDPServer
server = VirtualPrinterSSDPServer(
serial="TEST123",
name="TestPrinter",
model="BL-P001",
advertise_ip="10.0.0.50",
)
message = server._build_response_message()
assert b"Location: 10.0.0.50" in message
def test_default_no_advertise_ip(self):
"""Verify default constructor has None local_ip (auto-detect)."""
from backend.app.services.virtual_printer.ssdp_server import VirtualPrinterSSDPServer
server = VirtualPrinterSSDPServer()
assert server._local_ip is None
class TestCertificateService:
    """Tests for TLS certificate generation.

    Every test receives a CertificateService rooted in pytest's per-test
    ``tmp_path``, so certificate files never leak between tests.
    """

    @pytest.fixture
    def cert_service(self, tmp_path):
        """Create a CertificateService instance that writes into ``tmp_path``."""
        from backend.app.services.virtual_printer.certificate import CertificateService

        return CertificateService(cert_dir=tmp_path, serial="TEST123")

    def test_generate_certificates(self, cert_service, tmp_path):
        """Verify certificates are generated correctly."""
        cert_path, key_path = cert_service.generate_certificates()
        assert cert_path.exists()
        assert key_path.exists()

        # Verify certificate content
        cert_content = cert_path.read_text()
        assert "BEGIN CERTIFICATE" in cert_content

        # Checked separately so a failure names the missing marker.
        key_content = key_path.read_text()
        assert "BEGIN" in key_content
        assert "KEY" in key_content

    def test_certificates_reused_if_exist(self, cert_service):
        """Verify existing certificates are reused."""
        # First generation
        cert_path1, key_path1 = cert_service.generate_certificates()
        cert_before = cert_path1.read_bytes()
        key_before = key_path1.read_bytes()
        mtime1 = cert_path1.stat().st_mtime_ns

        # Second call should reuse (via ensure_certificates)
        cert_path2, key_path2 = cert_service.ensure_certificates()
        mtime2 = cert_path2.stat().st_mtime_ns

        assert mtime1 == mtime2  # File wasn't regenerated
        # Timestamps alone can miss a rewrite that lands in the same clock
        # tick on filesystems with coarse mtime resolution, so also require
        # that both files are byte-for-byte unchanged.
        assert cert_path1.read_bytes() == cert_before
        assert key_path1.read_bytes() == key_before
        assert key_path2.exists()

    def test_delete_certificates(self, cert_service):
        """Verify certificates can be deleted."""
        cert_service.generate_certificates()
        assert cert_service.cert_path.exists()
        assert cert_service.key_path.exists()

        cert_service.delete_certificates()
        assert not cert_service.cert_path.exists()
        assert not cert_service.key_path.exists()

    def test_ensure_creates_if_not_exist(self, cert_service):
        """Verify ensure_certificates generates if not existing."""
        # Precondition: nothing has been generated in the fresh tmp dir yet.
        assert not cert_service.cert_path.exists()

        cert_path, key_path = cert_service.ensure_certificates()
        assert cert_path.exists()
        assert key_path.exists()
class TestBindServer:
"""Tests for BindServer (port 3002 bind/detect protocol)."""
@pytest.fixture
def bind_server(self):
    """Provide a BindServer built with a fixed serial, model and name."""
    from backend.app.services.virtual_printer.bind_server import BindServer

    settings = {
        "serial": "09400A391800001",
        "model": "O1D",
        "name": "Bambuddy",
    }
    return BindServer(**settings)
def test_build_frame(self, bind_server):
"""Verify frame building produces correct format."""
payload = {"login": {"command": "detect"}}
frame = bind_server._build_frame(payload)
# Header: 0xA5A5
assert frame[:2] == b"\xa5\xa5"
# Trailer: 0xA7A7
assert frame[-2:] == b"\xa7\xa7"
# Length field is total message size (LE uint16)
import struct
total_len = struct.unpack_from(" bytes:
"""Build a minimal MQTT PUBLISH packet."""
# PUBLISH fixed header: type 3, no flags
topic_bytes = topic.encode("utf-8")
# Variable header: topic length (2 bytes) + topic
var_header = len(topic_bytes).to_bytes(2, "big") + topic_bytes
body = var_header + payload
# Encode remaining length
remaining = len(body)
header = bytearray([0x30]) # PUBLISH, QoS 0
while True:
encoded_byte = remaining % 128
remaining //= 128
if remaining > 0:
encoded_byte |= 0x80
header.append(encoded_byte)
if remaining == 0:
break
return bytes(header) + body
@staticmethod
def _build_mqtt_pingreq() -> bytes:
"""Build an MQTT PINGREQ packet (2 bytes, no payload)."""
return b"\xc0\x00"
def test_rewrite_ip_in_publish(self):
    """An IP string inside a PUBLISH payload is replaced by the new IP."""
    from backend.app.services.virtual_printer.tcp_proxy import TLSProxy

    old_ip, new_ip = b"192.168.1.100", b"10.0.0.1"
    packet = self._build_mqtt_publish(
        "device/status",
        b'{"rtsp_url":"rtsps://192.168.1.100:322/live"}',
    )

    rewritten, _pending = TLSProxy._rewrite_mqtt_ip(packet, old_ip, new_ip, bytearray())

    assert new_ip in rewritten
    assert old_ip not in rewritten
def test_no_rewrite_when_ip_absent(self):
    """A PUBLISH that does not contain the target IP comes back unchanged."""
    from backend.app.services.virtual_printer.tcp_proxy import TLSProxy

    packet = self._build_mqtt_publish("device/status", b'{"status":"idle"}')

    output, _pending = TLSProxy._rewrite_mqtt_ip(
        packet,
        b"192.168.1.100",
        b"10.0.0.1",
        bytearray(),
    )

    assert output == packet
def test_non_publish_packets_unchanged(self):
    """A non-PUBLISH packet (here PINGREQ) comes back unchanged."""
    from backend.app.services.virtual_printer.tcp_proxy import TLSProxy

    ping_packet = self._build_mqtt_pingreq()

    output, _pending = TLSProxy._rewrite_mqtt_ip(
        ping_packet,
        b"192.168.1.100",
        b"10.0.0.1",
        bytearray(),
    )

    assert output == ping_packet
def test_rewrite_preserves_packet_framing(self):
    """After a rewrite, the MQTT remaining-length field matches the data."""
    from backend.app.services.virtual_printer.tcp_proxy import TLSProxy

    # The two IPs differ in length (15 vs 8 bytes), so the packet shrinks
    # and the remaining-length field has to be re-encoded.
    old_ip = b"192.168.255.133"
    new_ip = b"10.0.0.1"
    packet = self._build_mqtt_publish("device/status", b'{"ip":"192.168.255.133"}')

    rewritten, _pending = TLSProxy._rewrite_mqtt_ip(packet, old_ip, new_ip, bytearray())

    # First byte is still the PUBLISH header byte.
    assert rewritten[0] == 0x30

    # Decode the variable-length "remaining length": 7 value bits per byte,
    # least-significant group first, top bit set while more bytes follow.
    cursor = 1
    body_len = 0
    shift = 0
    while True:
        length_byte = rewritten[cursor]
        cursor += 1
        body_len |= (length_byte & 0x7F) << shift
        shift += 7
        if not length_byte & 0x80:
            break

    # Header bytes consumed plus the declared body length must cover the
    # whole packet exactly.
    assert cursor + body_len == len(rewritten)
    assert new_ip in rewritten
def test_incomplete_packet_buffered(self):
    """A packet split across two chunks is buffered, then rewritten."""
    from backend.app.services.virtual_printer.tcp_proxy import TLSProxy

    old_ip, new_ip = b"192.168.1.100", b"10.0.0.1"
    packet = self._build_mqtt_publish("device/status", b'{"ip":"192.168.1.100"}')

    # Cut the packet in the middle so neither chunk is a complete packet.
    midpoint = len(packet) // 2
    head, tail = packet[:midpoint], packet[midpoint:]

    first_out, pending = TLSProxy._rewrite_mqtt_ip(head, old_ip, new_ip, bytearray())
    # The partial packet must be held back in the returned buffer.
    assert len(pending) > 0

    # Feeding the rest together with the carried-over buffer completes it.
    second_out, pending = TLSProxy._rewrite_mqtt_ip(tail, old_ip, new_ip, pending)

    stitched = first_out + second_out
    assert new_ip in stitched
    assert old_ip not in stitched
def test_multiple_packets_in_one_chunk(self):
    """Two PUBLISH packets delivered in one chunk are both handled."""
    from backend.app.services.virtual_printer.tcp_proxy import TLSProxy

    old_ip, new_ip = b"192.168.1.100", b"10.0.0.1"
    with_ip = self._build_mqtt_publish("topic1", b'{"ip":"192.168.1.100"}')
    without_ip = self._build_mqtt_publish("topic2", b'{"other":"data"}')

    output, _pending = TLSProxy._rewrite_mqtt_ip(with_ip + without_ip, old_ip, new_ip, bytearray())

    assert new_ip in output
    assert old_ip not in output
    # The packet that needed no rewrite must still come through.
    assert b"other" in output
def test_extra_replacements(self):
    """Pairs passed via extra_replacements (here an integer IP) are applied."""
    from backend.app.services.virtual_printer.tcp_proxy import TLSProxy

    old_int_ip, new_int_ip = b"2248124608", b"285190336"
    packet = self._build_mqtt_publish(
        "device/status",
        b'{"net":{"info":[{"ip":2248124608}]}}',
    )

    # The primary old/new pair cannot match, so only the extra pair can
    # account for the change.
    output, _pending = TLSProxy._rewrite_mqtt_ip(
        packet,
        b"NOMATCH",
        b"NOREPLACE",
        bytearray(),
        extra_replacements=[(old_int_ip, new_int_ip)],
    )

    assert new_int_ip in output
    assert old_int_ip not in output
class TestIpToLeIntBytes:
"""Tests for TLSProxy._ip_to_le_int_bytes() integer IP conversion."""
def test_converts_ip_to_le_int(self):
    """Dotted-quad IPs convert to the expected decimal byte strings.

    Each expected value is the address's four octets read as a
    little-endian integer, rendered as ASCII digits.
    """
    from backend.app.services.virtual_printer.tcp_proxy import TLSProxy

    expected_by_ip = {
        "192.168.255.133": b"2248124608",
        "192.168.255.16": b"285190336",
        "10.0.0.1": b"16777226",
    }
    for ip, expected in expected_by_ip.items():
        assert TLSProxy._ip_to_le_int_bytes(ip) == expected
def test_roundtrip(self):
"""Verify the integer converts back to the correct IP."""
import struct
from backend.app.services.virtual_printer.tcp_proxy import TLSProxy
for ip in ["192.168.1.1", "10.0.0.1", "172.16.0.100", "192.168.255.133"]:
le_int = int(TLSProxy._ip_to_le_int_bytes(ip))
parts = ip.split(".")
expected = struct.unpack("