test_ams_assignment_lock.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
"""Regression tests for ``_ams_assignment_locks`` (per-printer serialisation
of ``on_ams_change``'s spool-assignment block).

Background
==========

MQTT bursts can deliver two ``ams_data`` push frames for the same printer ~30
ms apart (observed in the wild: H2D + dual AMS at K-profile load + RFID-read
boundaries). Without serialisation, both ``on_ams_change`` callbacks read
"no assignment for ``(printer, ams, tray)``" in their respective sessions,
both call ``auto_assign_spool``, both ``INSERT``, and the second commit
violates ``spool_assignment_printer_id_ams_id_tray_id_key``:

    asyncpg.exceptions.UniqueViolationError: duplicate key value violates
    unique constraint "spool_assignment_printer_id_ams_id_tray_id_key"
    DETAIL: Key (printer_id, ams_id, tray_id)=(1, 0, 0) already exists.

SQLite's WAL serialises writes so the bug stayed latent there for ~7 weeks.
It surfaced when optional Postgres support landed and asyncpg started
allowing true concurrent transactions.

These tests assert the lock primitive's properties, not the full
``on_ams_change`` flow — wiring the whole callback through a real DB at unit
scope would dwarf the size of the fix and add no signal beyond what the
existing integration suite already covers.
"""
  22. from __future__ import annotations
  23. import asyncio
  24. import pytest
  25. from backend.app.main import _ams_assignment_locks, _get_ams_assignment_lock
  26. @pytest.fixture(autouse=True)
  27. def _isolate_locks_dict():
  28. """Each test gets a fresh module-level locks dict — otherwise prior
  29. tests' lazy-created locks leak across runs and a stale ``Lock`` object
  30. bound to an already-closed event loop trips uvloop's "Future attached to
  31. a different loop" assertion."""
  32. saved = dict(_ams_assignment_locks)
  33. _ams_assignment_locks.clear()
  34. try:
  35. yield
  36. finally:
  37. _ams_assignment_locks.clear()
  38. _ams_assignment_locks.update(saved)
  39. class TestLockKeySeparation:
  40. def test_same_printer_returns_same_lock(self):
  41. """Two callbacks for the same printer must contend on the same lock —
  42. otherwise serialisation buys us nothing."""
  43. a = _get_ams_assignment_lock(7)
  44. b = _get_ams_assignment_lock(7)
  45. assert a is b
  46. def test_different_printers_get_different_locks(self):
  47. """Per-printer scope: one printer's slow assignment must not block
  48. unrelated printers from processing their own AMS pushes."""
  49. a = _get_ams_assignment_lock(7)
  50. b = _get_ams_assignment_lock(8)
  51. assert a is not b
  52. class TestLockSerialisesConcurrentCallbacks:
  53. @pytest.mark.asyncio
  54. async def test_second_acquirer_waits_for_first(self):
  55. """The exact race the bug fix targets: two coroutines for the same
  56. printer must serialise inside the lock, so the second only enters
  57. the critical section after the first has committed."""
  58. printer_id = 42
  59. order: list[str] = []
  60. first_inside = asyncio.Event()
  61. first_release = asyncio.Event()
  62. async def first():
  63. async with _get_ams_assignment_lock(printer_id):
  64. order.append("first-enter")
  65. first_inside.set()
  66. # Hold the lock until the test allows release; this is what
  67. # gives the second coroutine a chance to queue up if the
  68. # primitive is doing its job.
  69. await first_release.wait()
  70. order.append("first-exit")
  71. async def second():
  72. await first_inside.wait() # ensure first holds the lock
  73. async with _get_ams_assignment_lock(printer_id):
  74. order.append("second-enter")
  75. task_a = asyncio.create_task(first())
  76. task_b = asyncio.create_task(second())
  77. await first_inside.wait()
  78. # Yield the loop a few times so `second()` has every opportunity to
  79. # mistakenly enter early; without the lock, "second-enter" would land
  80. # before "first-exit".
  81. for _ in range(5):
  82. await asyncio.sleep(0)
  83. assert order == ["first-enter"]
  84. first_release.set()
  85. await asyncio.gather(task_a, task_b)
  86. assert order == ["first-enter", "first-exit", "second-enter"]
  87. @pytest.mark.asyncio
  88. async def test_different_printers_run_in_parallel(self):
  89. """Cross-printer independence: two callbacks for distinct printers
  90. must NOT block each other, otherwise a single slow printer would
  91. stall every other printer's AMS handling."""
  92. order: list[str] = []
  93. printer_a_inside = asyncio.Event()
  94. printer_a_release = asyncio.Event()
  95. async def printer_a():
  96. async with _get_ams_assignment_lock(1):
  97. order.append("a-enter")
  98. printer_a_inside.set()
  99. await printer_a_release.wait()
  100. order.append("a-exit")
  101. async def printer_b():
  102. await printer_a_inside.wait()
  103. async with _get_ams_assignment_lock(2):
  104. order.append("b-enter-and-exit")
  105. task_a = asyncio.create_task(printer_a())
  106. task_b = asyncio.create_task(printer_b())
  107. # Wait for printer_a to be holding the lock, then yield for printer_b.
  108. await printer_a_inside.wait()
  109. for _ in range(5):
  110. await asyncio.sleep(0)
  111. # printer_b must have entered AND exited its own lock while
  112. # printer_a is still holding lock A. If the locks were a single
  113. # global mutex, "b-enter-and-exit" would not yet appear.
  114. assert "b-enter-and-exit" in order
  115. assert "a-exit" not in order
  116. printer_a_release.set()
  117. await asyncio.gather(task_a, task_b)