test_phantom_print_hardening.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348
  1. """Tests for phantom print investigation hardening (#374).
  2. Tests the tightened archive matching (no ilike) and the
  3. multiple-printing-items warning logic.
  4. These are pure unit tests that test the changed logic directly,
  5. NOT by calling the full on_print_start/on_print_complete callbacks
  6. (which spawn background tasks and require heavy mocking).
  7. """
  8. import logging
  9. import pytest
  10. from sqlalchemy import or_, select
  11. from sqlalchemy.sql import ClauseElement
  12. from backend.app.models.archive import PrintArchive
  13. class TestArchiveMatchQueryShape:
  14. """Tests that the archive duplicate lookup query uses exact match, not ilike (#374).
  15. The old query used `ilike('%{name}%')` which caused "Clip" to match
  16. "Cable Clip", "Clip Stand", etc. The new query uses exact print_name
  17. match OR exact filename variants (.3mf, .gcode.3mf).
  18. """
  19. def _build_archive_query(self, check_name: str, printer_id: int = 1) -> ClauseElement:
  20. """Build the exact query used in on_print_start for archive dedup."""
  21. return (
  22. select(PrintArchive)
  23. .where(PrintArchive.printer_id == printer_id)
  24. .where(PrintArchive.status == "printing")
  25. .where(
  26. or_(
  27. PrintArchive.print_name == check_name,
  28. PrintArchive.filename.in_(
  29. [
  30. f"{check_name}.3mf",
  31. f"{check_name}.gcode.3mf",
  32. ]
  33. ),
  34. )
  35. )
  36. .order_by(PrintArchive.created_at.desc())
  37. .limit(1)
  38. )
  39. def test_query_does_not_contain_ilike(self):
  40. """Verify the compiled query does NOT use LIKE/ILIKE."""
  41. query = self._build_archive_query("Clip")
  42. query_str = str(query.compile(compile_kwargs={"literal_binds": True}))
  43. assert "LIKE" not in query_str.upper(), f"Query should not use LIKE: {query_str}"
  44. def test_query_uses_exact_equality(self):
  45. """Verify the query uses = for print_name comparison."""
  46. query = self._build_archive_query("Benchy")
  47. query_str = str(query.compile(compile_kwargs={"literal_binds": True}))
  48. assert "print_name = " in query_str or "print_name ='" in query_str or "print_name =" in query_str
  49. def test_query_uses_in_for_filename_variants(self):
  50. """Verify the query uses IN for filename matching with .3mf variants."""
  51. query = self._build_archive_query("MyPrint")
  52. query_str = str(query.compile(compile_kwargs={"literal_binds": True}))
  53. assert "IN" in query_str.upper()
  54. assert "MyPrint.3mf" in query_str
  55. assert "MyPrint.gcode.3mf" in query_str
  56. def test_partial_name_not_in_query(self):
  57. """Verify 'Clip' does not produce a wildcard pattern."""
  58. query = self._build_archive_query("Clip")
  59. query_str = str(query.compile(compile_kwargs={"literal_binds": True}))
  60. # Should NOT contain %Clip% wildcard
  61. assert "%Clip%" not in query_str
  62. def test_check_name_derivation_from_subtask(self):
  63. """Verify check_name is derived correctly from subtask_name."""
  64. # Simulates: check_name = subtask_name or filename.split("/")[-1].replace(...)
  65. subtask_name = "Cable Clip"
  66. filename = "/sdcard/Cable Clip.gcode"
  67. check_name = subtask_name or filename.split("/")[-1].replace(".gcode", "").replace(".3mf", "")
  68. assert check_name == "Cable Clip"
  69. query = self._build_archive_query(check_name)
  70. query_str = str(query.compile(compile_kwargs={"literal_binds": True}))
  71. # Exact match should contain the full name, not a partial
  72. assert "Cable Clip" in query_str
  73. assert "%Cable Clip%" not in query_str
  74. def test_check_name_derivation_from_filename(self):
  75. """Verify check_name strips extensions correctly from filename."""
  76. subtask_name = None
  77. filename = "/sdcard/MyPrint.gcode"
  78. check_name = subtask_name or filename.split("/")[-1].replace(".gcode", "").replace(".3mf", "")
  79. assert check_name == "MyPrint"
  80. class TestMultiplePrintingQueueItemsWarning:
  81. """Tests for the multiple-printing-items warning logic (#374).
  82. The code in on_print_complete now detects when multiple queue items
  83. are in 'printing' status for the same printer, which signals a bug.
  84. """
  85. def test_single_item_returns_item_no_warning(self, caplog):
  86. """Verify single item is returned without warning."""
  87. from unittest.mock import MagicMock
  88. items = [MagicMock(id=1, archive_id=10, library_file_id=None)]
  89. # Simulate the exact code from on_print_complete
  90. with caplog.at_level(logging.WARNING, logger="backend.app.main"):
  91. logger = logging.getLogger("backend.app.main")
  92. printer_id = 1
  93. printing_items = list(items)
  94. if len(printing_items) > 1:
  95. logger.warning(
  96. "BUG: Multiple queue items in 'printing' status for printer %s: %s",
  97. printer_id,
  98. [(i.id, i.archive_id, i.library_file_id) for i in printing_items],
  99. )
  100. queue_item = printing_items[0] if printing_items else None
  101. assert queue_item is not None
  102. assert queue_item.id == 1
  103. bug_warnings = [r for r in caplog.records if "BUG: Multiple queue items" in r.message]
  104. assert len(bug_warnings) == 0
  105. def test_multiple_items_warns_and_returns_first(self, caplog):
  106. """Verify warning is logged and first item is returned when multiple exist."""
  107. from unittest.mock import MagicMock
  108. items = [
  109. MagicMock(id=1, archive_id=10, library_file_id=None),
  110. MagicMock(id=2, archive_id=20, library_file_id=None),
  111. ]
  112. with caplog.at_level(logging.WARNING, logger="backend.app.main"):
  113. logger = logging.getLogger("backend.app.main")
  114. printer_id = 1
  115. printing_items = list(items)
  116. if len(printing_items) > 1:
  117. logger.warning(
  118. "BUG: Multiple queue items in 'printing' status for printer %s: %s",
  119. printer_id,
  120. [(i.id, i.archive_id, i.library_file_id) for i in printing_items],
  121. )
  122. queue_item = printing_items[0] if printing_items else None
  123. assert queue_item is not None
  124. assert queue_item.id == 1 # First item is used
  125. bug_warnings = [r for r in caplog.records if "BUG: Multiple queue items" in r.message]
  126. assert len(bug_warnings) == 1
  127. assert "printer 1" in bug_warnings[0].message
  128. # Warning should include item details
  129. assert "10" in bug_warnings[0].message # archive_id of item 1
  130. assert "20" in bug_warnings[0].message # archive_id of item 2
  131. def test_empty_list_returns_none_no_warning(self, caplog):
  132. """Verify None is returned and no warning when no items exist."""
  133. with caplog.at_level(logging.WARNING, logger="backend.app.main"):
  134. logger = logging.getLogger("backend.app.main")
  135. printer_id = 1
  136. printing_items = []
  137. if len(printing_items) > 1:
  138. logger.warning(
  139. "BUG: Multiple queue items in 'printing' status for printer %s: %s",
  140. printer_id,
  141. [(i.id, i.archive_id, i.library_file_id) for i in printing_items],
  142. )
  143. queue_item = printing_items[0] if printing_items else None
  144. assert queue_item is None
  145. bug_warnings = [r for r in caplog.records if "BUG: Multiple queue items" in r.message]
  146. assert len(bug_warnings) == 0
  147. def test_three_items_warns_with_all_details(self, caplog):
  148. """Verify warning includes all item details when three items found."""
  149. from unittest.mock import MagicMock
  150. items = [
  151. MagicMock(id=1, archive_id=10, library_file_id=None),
  152. MagicMock(id=2, archive_id=None, library_file_id=5),
  153. MagicMock(id=3, archive_id=30, library_file_id=None),
  154. ]
  155. with caplog.at_level(logging.WARNING, logger="backend.app.main"):
  156. logger = logging.getLogger("backend.app.main")
  157. printer_id = 7
  158. printing_items = list(items)
  159. if len(printing_items) > 1:
  160. logger.warning(
  161. "BUG: Multiple queue items in 'printing' status for printer %s: %s",
  162. printer_id,
  163. [(i.id, i.archive_id, i.library_file_id) for i in printing_items],
  164. )
  165. queue_item = printing_items[0] if printing_items else None
  166. assert queue_item.id == 1
  167. bug_warnings = [r for r in caplog.records if "BUG: Multiple queue items" in r.message]
  168. assert len(bug_warnings) == 1
  169. assert "printer 7" in bug_warnings[0].message
  170. class TestBusyPrinterSeedingFromPrintingItems:
  171. """Regression for the duplicate-dispatch bug observed with quantity>1 batches.
  172. The old scheduler seeded ``busy_printers`` with an empty set and relied on
  173. ``_is_printer_idle()`` to gate dispatch. On H2D / P1 series the MQTT state
  174. lags several seconds behind the print command, so the next ``check_queue``
  175. tick saw IDLE and dispatched a second queue item onto the same printer —
  176. both items ended up in 'printing' status. The fix seeds ``busy_printers``
  177. up-front with every printer that already has an item in 'printing' status.
  178. """
  179. @pytest.mark.asyncio
  180. async def test_seed_query_returns_printers_with_printing_items(self):
  181. """The seeding query must return every printer_id that has a 'printing' item."""
  182. from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
  183. import backend.app.models # noqa: F401
  184. from backend.app.core.database import Base
  185. from backend.app.models.print_queue import PrintQueueItem
  186. engine = create_async_engine("sqlite+aiosqlite:///:memory:", echo=False)
  187. async with engine.begin() as conn:
  188. await conn.run_sync(Base.metadata.create_all)
  189. session_maker = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
  190. async with session_maker() as db:
  191. db.add_all(
  192. [
  193. PrintQueueItem(printer_id=1, status="printing", position=1, archive_id=10),
  194. PrintQueueItem(printer_id=1, status="pending", position=2, archive_id=10),
  195. PrintQueueItem(printer_id=2, status="printing", position=1, archive_id=11),
  196. PrintQueueItem(printer_id=3, status="pending", position=1, archive_id=12),
  197. PrintQueueItem(printer_id=None, status="pending", position=1, archive_id=13),
  198. ]
  199. )
  200. await db.commit()
  201. result = await db.execute(
  202. select(PrintQueueItem.printer_id)
  203. .where(PrintQueueItem.status == "printing")
  204. .where(PrintQueueItem.printer_id.is_not(None))
  205. )
  206. busy_printers = {pid for (pid,) in result.all() if pid is not None}
  207. assert busy_printers == {1, 2}
  208. await engine.dispose()
  209. @pytest.mark.asyncio
  210. async def test_seed_query_empty_when_no_printing_items(self):
  211. """With only pending items, no printer is considered busy by the query."""
  212. from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
  213. import backend.app.models # noqa: F401
  214. from backend.app.core.database import Base
  215. from backend.app.models.print_queue import PrintQueueItem
  216. engine = create_async_engine("sqlite+aiosqlite:///:memory:", echo=False)
  217. async with engine.begin() as conn:
  218. await conn.run_sync(Base.metadata.create_all)
  219. session_maker = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
  220. async with session_maker() as db:
  221. db.add_all(
  222. [
  223. PrintQueueItem(printer_id=1, status="pending", position=1, archive_id=10),
  224. PrintQueueItem(printer_id=2, status="completed", position=1, archive_id=11),
  225. PrintQueueItem(printer_id=3, status="failed", position=1, archive_id=12),
  226. PrintQueueItem(printer_id=4, status="cancelled", position=1, archive_id=13),
  227. ]
  228. )
  229. await db.commit()
  230. result = await db.execute(
  231. select(PrintQueueItem.printer_id)
  232. .where(PrintQueueItem.status == "printing")
  233. .where(PrintQueueItem.printer_id.is_not(None))
  234. )
  235. busy_printers = {pid for (pid,) in result.all() if pid is not None}
  236. assert busy_printers == set()
  237. await engine.dispose()
  238. @pytest.mark.asyncio
  239. async def test_check_queue_skips_printer_with_existing_printing_item(self, caplog):
  240. """Simulate the exact observed bug: a pending item targets a printer that already
  241. has another queue item in 'printing' status. The scheduler must NOT dispatch the
  242. pending item even if the live MQTT state reports IDLE.
  243. """
  244. from unittest.mock import AsyncMock, patch
  245. from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
  246. import backend.app.models # noqa: F401
  247. from backend.app.core.database import Base
  248. from backend.app.models.print_queue import PrintQueueItem
  249. from backend.app.services.print_scheduler import PrintScheduler
  250. engine = create_async_engine("sqlite+aiosqlite:///:memory:", echo=False)
  251. async with engine.begin() as conn:
  252. await conn.run_sync(Base.metadata.create_all)
  253. session_maker = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
  254. async with session_maker() as db:
  255. db.add_all(
  256. [
  257. PrintQueueItem(printer_id=1, status="printing", position=1, archive_id=84),
  258. PrintQueueItem(printer_id=1, status="pending", position=2, archive_id=84),
  259. ]
  260. )
  261. await db.commit()
  262. scheduler = PrintScheduler()
  263. start_print_mock = AsyncMock()
  264. with (
  265. patch("backend.app.services.print_scheduler.async_session", session_maker),
  266. patch.object(scheduler, "_get_bool_setting", AsyncMock(return_value=False)),
  267. patch.object(scheduler, "_is_printer_idle", return_value=True),
  268. patch.object(scheduler, "_check_auto_drying", AsyncMock()),
  269. patch.object(scheduler, "_start_print", start_print_mock),
  270. patch("backend.app.services.print_scheduler.printer_manager") as mock_pm,
  271. ):
  272. mock_pm.is_connected.return_value = True
  273. await scheduler.check_queue()
  274. start_print_mock.assert_not_called()
  275. async with session_maker() as db:
  276. rows = (await db.execute(select(PrintQueueItem).order_by(PrintQueueItem.position))).scalars().all()
  277. statuses = [r.status for r in rows]
  278. assert statuses == ["printing", "pending"]
  279. await engine.dispose()