
fix(inventory): serialise spool auto-assign per printer to fix Postgres race

  Bambu MQTT can deliver two ams_data push frames for the same printer
  ~30 ms apart (observed on H2D + dual AMS at K-profile-load / RFID-read
  boundaries). Each frame triggers on_ams_change in main.py, whose
  auto-assign block reads (printer_id, ams_id, tray_id), decides "no
  existing assignment", and INSERTs via auto_assign_spool. The two
  callbacks raced in separate sessions, both decided to insert, and the
  second commit failed with:

      asyncpg.exceptions.UniqueViolationError: duplicate key value
      violates unique constraint
      "spool_assignment_printer_id_ams_id_tray_id_key"
      DETAIL:  Key (printer_id, ams_id, tray_id)=(1, 0, 0) already exists.

  SQLite's WAL serial-write semantics had been silently swallowing the
  race for ~7 weeks since the spool-assignment feature shipped (latent in
  ec82092b "Sync", 2026-02-12). When optional Postgres support landed in
  610431d6 (2026-04-03) and asyncpg started allowing true concurrent
  transactions, it surfaced. Net impact: log noise + one assignment cycle
  skipped, retried on the next on_ams_change.
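
  The check-then-insert race described above can be reproduced without a
  database. The sketch below is illustrative only: FakeAssignments,
  DuplicateKeyError and unlocked_callback are hypothetical names, the set
  stands in for the unique constraint, and asyncio.sleep(0) stands in for
  the SELECT and INSERT round trips that let the two callbacks interleave.

```python
import asyncio


class DuplicateKeyError(Exception):
    """Hypothetical stand-in for the database's unique-constraint error."""


class FakeAssignments:
    """Hypothetical in-memory table keyed by (printer_id, ams_id, tray_id)."""

    def __init__(self) -> None:
        self.rows: set[tuple[int, int, int]] = set()

    async def exists(self, key: tuple[int, int, int]) -> bool:
        await asyncio.sleep(0)  # simulated SELECT: yields to the event loop
        return key in self.rows

    async def insert(self, key: tuple[int, int, int]) -> None:
        await asyncio.sleep(0)  # simulated INSERT + commit: yields again
        if key in self.rows:
            raise DuplicateKeyError(f"duplicate key {key}")
        self.rows.add(key)


async def unlocked_callback(table: FakeAssignments, key: tuple[int, int, int]) -> str:
    # Check-then-insert with no serialisation between the two steps.
    if await table.exists(key):
        return "skipped"
    await table.insert(key)
    return "inserted"


async def run_race() -> list:
    table = FakeAssignments()
    key = (1, 0, 0)
    # Two callbacks for the same tray, started back to back.
    return await asyncio.gather(
        unlocked_callback(table, key),
        unlocked_callback(table, key),
        return_exceptions=True,
    )


results = asyncio.run(run_race())
print(results)
```

  Both callbacks see "no row" before either has inserted, so one insert
  succeeds and the other raises the stand-in duplicate-key error.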

  Adds a per-printer asyncio.Lock (_ams_assignment_locks keyed by
  printer_id) wrapping the auto-assign critical section. By the time the
  second callback's session runs the SELECT, the first's commit is
  visible and the early-return "existing assignment" branch fires instead
  of a duplicate INSERT.
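
  A minimal sketch of why the lock closes the window, assuming the same
  kind of in-memory stand-in for the table. get_lock and locked_callback
  are hypothetical names that only mirror the shape of the helper in this
  commit; the real critical section runs against a database session.

```python
import asyncio

# Hypothetical mirror of the per-printer lock registry.
_locks: dict[int, asyncio.Lock] = {}


def get_lock(printer_id: int) -> asyncio.Lock:
    """Return the lock for this printer, creating it on first use."""
    lock = _locks.get(printer_id)
    if lock is None:
        lock = _locks[printer_id] = asyncio.Lock()
    return lock


async def locked_callback(rows: set, key: tuple[int, int, int]) -> str:
    printer_id = key[0]
    async with get_lock(printer_id):
        await asyncio.sleep(0)  # simulated SELECT
        if key in rows:
            return "skipped"  # the "existing assignment" early return
        await asyncio.sleep(0)  # simulated INSERT + commit
        if key in rows:
            raise RuntimeError("duplicate key")  # not reached while the lock is held
        rows.add(key)
        return "inserted"


async def main() -> list:
    rows: set = set()
    key = (1, 0, 0)
    return await asyncio.gather(
        locked_callback(rows, key),
        locked_callback(rows, key),
    )


outcomes = asyncio.run(main())
print(outcomes)  # ['inserted', 'skipped']
```

  The second callback cannot start its SELECT until the first has left the
  lock, so it observes the inserted row and takes the early return.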

  The Spoolman sync block further down in on_ams_change intentionally
  stays OUTSIDE the lock — it's network-bound and idempotent, so
  serialising it would block subsequent AMS callbacks for the duration of
  a remote roundtrip. Per-printer scope keeps unrelated printers fully
  parallel. The auto-unlink block above isn't wrapped because its
  DELETE/UPDATE operations don't have the same constraint surface.
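
  The scoping choice can be illustrated with a small sketch, assuming one
  lock for one printer and sleeps in place of real work. The callback
  function and the stats counters are hypothetical; the point is only that
  the simulated DB section never overlaps while the simulated network
  section, left outside the lock, does.

```python
import asyncio


async def callback(lock: asyncio.Lock, stats: dict) -> None:
    # DB-mutating section: serialised by the lock.
    async with lock:
        stats["in_db"] += 1
        stats["max_db"] = max(stats["max_db"], stats["in_db"])
        await asyncio.sleep(0.01)  # simulated SELECT/INSERT/commit
        stats["in_db"] -= 1

    # Network-bound section: deliberately outside the lock.
    stats["in_net"] += 1
    stats["max_net"] = max(stats["max_net"], stats["in_net"])
    await asyncio.sleep(0.05)  # simulated remote round trip
    stats["in_net"] -= 1


async def main() -> dict:
    lock = asyncio.Lock()
    stats = {"in_db": 0, "max_db": 0, "in_net": 0, "max_net": 0}
    # Three callbacks for the same printer arriving in a burst.
    await asyncio.gather(*(callback(lock, stats) for _ in range(3)))
    return stats


stats = asyncio.run(main())
print(stats["max_db"], stats["max_net"])
```

  Had the network section been moved inside the lock, each callback in the
  burst would wait for the previous callback's full round trip before
  starting its own DB work.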

  4 new regression tests in test_ams_assignment_lock.py: same-printer-
  same-lock identity, different-printers-different-lock isolation, second
  acquirer waits for first (proves serialisation), and different printers
  run truly in parallel under a held lock (proves per-printer scope). An
  autouse fixture resets the module-level dict between tests so
  cross-test loop affinity bugs can't surface.
maziggy 1 month ago
parent
commit
352e619ad7
3 changed files with 184 additions and 2 deletions
  1. 0 0
      CHANGELOG.md
  2. 37 2
      backend/app/main.py
  3. 147 0
      backend/tests/unit/test_ams_assignment_lock.py

File diff suppressed because it is too large
+ 0 - 0
CHANGELOG.md


+ 37 - 2
backend/app/main.py

@@ -313,6 +313,35 @@ _user_stopped_printers: set[int] = set()
 # {(printer_id, filename): created_by_id}
 _expected_print_creators: dict[tuple[int, str], int] = {}
 
+# Per-printer lock that serialises the auto-assign block of on_ams_change
+# (the SELECT-then-INSERT of new assignments) when MQTT bursts deliver multiple
+# AMS updates for the same printer in quick succession (~30 ms apart, observed
+# in the wild on H2D + dual AMS).
+#
+# Without this serialisation, two concurrent on_ams_change callbacks each read
+# "no assignment for (printer, ams, tray)", each call auto_assign_spool, and
+# the second commit hits
+#   IntegrityError: duplicate key value violates unique constraint
+#                   "spool_assignment_printer_id_ams_id_tray_id_key"
+# SQLite's WAL serial-write semantics had been silently swallowing the race
+# until optional Postgres support landed (asyncpg allows true concurrent
+# transactions and surfaces the constraint violation).
+#
+# Scope is intentionally narrow: only the auto-assign block is inside the
+# lock. The auto-unlink block above it and the Spoolman sync block further
+# down stay concurrent; the latter is network-bound and idempotent.
+_ams_assignment_locks: dict[int, asyncio.Lock] = {}
+
+
+def _get_ams_assignment_lock(printer_id: int) -> asyncio.Lock:
+    """Return the per-printer assignment lock, creating it on first use."""
+    lock = _ams_assignment_locks.get(printer_id)
+    if lock is None:
+        lock = asyncio.Lock()
+        _ams_assignment_locks[printer_id] = lock
+    return lock
+
+
 # TTL for expected-print entries: evict registrations older than this to prevent
 # unbounded growth when a print is registered but never starts (e.g. printer
 # disconnect, app restart, print started from the printer panel).
@@ -887,9 +916,15 @@ async def on_ams_change(printer_id: int, ams_data: list):
     except Exception as e:
         logger.warning("Spool assignment cleanup failed: %s", e, exc_info=True)
 
-    # Auto-manage inventory spools from AMS tray data (skip if Spoolman manages AMS)
+    # Auto-manage inventory spools from AMS tray data (skip if Spoolman manages AMS).
+    # Serialised per-printer via _ams_assignment_locks: MQTT bursts can deliver
+    # two AMS pushes ~30 ms apart, and without the lock both callbacks read
+    # "no existing assignment" for the same (printer, ams, tray) and race to
+    # INSERT, hitting the spool_assignment_printer_id_ams_id_tray_id_key
+    # unique constraint on Postgres. SQLite's WAL serialises writes so the
+    # bug stayed latent there. See _ams_assignment_locks comment for details.
     try:
-        async with async_session() as db:
+        async with _get_ams_assignment_lock(printer_id), async_session() as db:
             from backend.app.api.routes.settings import get_setting
             from backend.app.models.spool_assignment import SpoolAssignment as SA
             from backend.app.services.spool_tag_matcher import (

+ 147 - 0
backend/tests/unit/test_ams_assignment_lock.py

@@ -0,0 +1,147 @@
+"""Regression tests for ``_ams_assignment_locks`` (per-printer serialisation
+of ``on_ams_change``'s spool-assignment block).
+
+Background
+==========
+
+MQTT bursts can deliver two ``ams_data`` push frames for the same printer ~30
+ms apart (observed in the wild: H2D + dual AMS at K-profile load + RFID-read
+boundaries). Without serialisation, both ``on_ams_change`` callbacks read
+"no assignment for ``(printer, ams, tray)``" in their respective sessions,
+both call ``auto_assign_spool``, both ``INSERT``, and the second commit
+violates ``spool_assignment_printer_id_ams_id_tray_id_key``:
+
+    asyncpg.exceptions.UniqueViolationError: duplicate key value violates
+    unique constraint "spool_assignment_printer_id_ams_id_tray_id_key"
+    DETAIL:  Key (printer_id, ams_id, tray_id)=(1, 0, 0) already exists.
+
+SQLite's WAL serialises writes so the bug stayed latent there for ~7 weeks.
+It surfaced when optional Postgres support landed and asyncpg started
+allowing true concurrent transactions.
+
+These tests assert the lock primitive's properties, not the full
+``on_ams_change`` flow — wiring the whole callback through a real DB at unit
+scope would dwarf the size of the fix and add no signal beyond what the
+existing integration suite already covers.
+"""
+
+from __future__ import annotations
+
+import asyncio
+
+import pytest
+
+from backend.app.main import _ams_assignment_locks, _get_ams_assignment_lock
+
+
+@pytest.fixture(autouse=True)
+def _isolate_locks_dict():
+    """Each test gets a fresh module-level locks dict — otherwise prior
+    tests' lazy-created locks leak across runs and a stale ``Lock`` object
+    bound to an already-closed event loop trips uvloop's "Future attached to
+    a different loop" assertion."""
+    saved = dict(_ams_assignment_locks)
+    _ams_assignment_locks.clear()
+    try:
+        yield
+    finally:
+        _ams_assignment_locks.clear()
+        _ams_assignment_locks.update(saved)
+
+
+class TestLockKeySeparation:
+    def test_same_printer_returns_same_lock(self):
+        """Two callbacks for the same printer must contend on the same lock —
+        otherwise serialisation buys us nothing."""
+        a = _get_ams_assignment_lock(7)
+        b = _get_ams_assignment_lock(7)
+        assert a is b
+
+    def test_different_printers_get_different_locks(self):
+        """Per-printer scope: one printer's slow assignment must not block
+        unrelated printers from processing their own AMS pushes."""
+        a = _get_ams_assignment_lock(7)
+        b = _get_ams_assignment_lock(8)
+        assert a is not b
+
+
+class TestLockSerialisesConcurrentCallbacks:
+    @pytest.mark.asyncio
+    async def test_second_acquirer_waits_for_first(self):
+        """The exact race the bug fix targets: two coroutines for the same
+        printer must serialise inside the lock, so the second only enters
+        the critical section after the first has committed."""
+        printer_id = 42
+        order: list[str] = []
+        first_inside = asyncio.Event()
+        first_release = asyncio.Event()
+
+        async def first():
+            async with _get_ams_assignment_lock(printer_id):
+                order.append("first-enter")
+                first_inside.set()
+                # Hold the lock until the test allows release; this is what
+                # gives the second coroutine a chance to queue up if the
+                # primitive is doing its job.
+                await first_release.wait()
+                order.append("first-exit")
+
+        async def second():
+            await first_inside.wait()  # ensure first holds the lock
+            async with _get_ams_assignment_lock(printer_id):
+                order.append("second-enter")
+
+        task_a = asyncio.create_task(first())
+        task_b = asyncio.create_task(second())
+
+        await first_inside.wait()
+        # Yield the loop a few times so `second()` has every opportunity to
+        # mistakenly enter early; without the lock, "second-enter" would land
+        # before "first-exit".
+        for _ in range(5):
+            await asyncio.sleep(0)
+
+        assert order == ["first-enter"]
+
+        first_release.set()
+        await asyncio.gather(task_a, task_b)
+
+        assert order == ["first-enter", "first-exit", "second-enter"]
+
+    @pytest.mark.asyncio
+    async def test_different_printers_run_in_parallel(self):
+        """Cross-printer independence: two callbacks for distinct printers
+        must NOT block each other, otherwise a single slow printer would
+        stall every other printer's AMS handling."""
+        order: list[str] = []
+        printer_a_inside = asyncio.Event()
+        printer_a_release = asyncio.Event()
+
+        async def printer_a():
+            async with _get_ams_assignment_lock(1):
+                order.append("a-enter")
+                printer_a_inside.set()
+                await printer_a_release.wait()
+                order.append("a-exit")
+
+        async def printer_b():
+            await printer_a_inside.wait()
+            async with _get_ams_assignment_lock(2):
+                order.append("b-enter-and-exit")
+
+        task_a = asyncio.create_task(printer_a())
+        task_b = asyncio.create_task(printer_b())
+
+        # Wait for printer_a to be holding the lock, then yield for printer_b.
+        await printer_a_inside.wait()
+        for _ in range(5):
+            await asyncio.sleep(0)
+
+        # printer_b must have entered AND exited its own lock while
+        # printer_a is still holding lock A. If the locks were a single
+        # global mutex, "b-enter-and-exit" would not yet appear.
+        assert "b-enter-and-exit" in order
+        assert "a-exit" not in order
+
+        printer_a_release.set()
+        await asyncio.gather(task_a, task_b)
