| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147 |
- """T-Gap 3: Concurrency test for POST /slot-assignments upsert+cleanup race."""
- import asyncio
- from unittest.mock import AsyncMock, MagicMock, patch
- import pytest
- from httpx import AsyncClient
- from sqlalchemy import select
# Spool payload that the mocked client hands back from ``get_spool``.
# Key order is kept stable so any serialized form stays unchanged.
SAMPLE_SPOOL = {
    "id": 10,
    "filament": {
        "id": 1,
        "name": "PLA Basic",
        "material": "PLA",
        "color_hex": "FF0000",
        "weight": 1000,
        "vendor": {"id": 1, "name": "Test Brand"},
    },
    "remaining_weight": 800.0,
    "used_weight": 200.0,
    # Fields that are present in the payload but unset (all None).
    **dict.fromkeys(("location", "comment", "first_used", "last_used")),
    "registered": "2024-01-01T00:00:00+00:00",
    "archived": False,
    "price": None,
    "extra": {},
}
@pytest.fixture
async def slot_settings(db_session):
    """Insert the ``spoolman_enabled`` and ``spoolman_url`` settings rows.

    Both rows are added to ``db_session`` and committed; the fixture itself
    yields no value (tests request it only for its side effect).
    """
    from backend.app.models.settings import Settings

    seed_rows = (
        ("spoolman_enabled", "true"),
        ("spoolman_url", "http://localhost:7912"),
    )
    for setting_key, setting_value in seed_rows:
        db_session.add(Settings(key=setting_key, value=setting_value))
    await db_session.commit()
@pytest.fixture
async def test_printer(db_session):
    """Persist a ``Printer`` row and return the refreshed instance.

    The row is committed and then refreshed through ``db_session`` before
    being handed to the test.
    """
    from backend.app.models.printer import Printer

    printer_fields = {
        "name": "Concurrency Test Printer",
        "serial_number": "CONCTEST001",
        "ip_address": "192.168.1.99",
        "access_code": "12345678",
    }
    record = Printer(**printer_fields)

    db_session.add(record)
    await db_session.commit()
    await db_session.refresh(record)
    return record
@pytest.fixture
def mock_client():
    """Yield a stubbed Spoolman client while ``_get_client`` is patched.

    For the duration of the test,
    ``backend.app.api.routes.spoolman_inventory._get_client`` is replaced by
    an ``AsyncMock`` that resolves to the stub yielded here.
    """
    stub = MagicMock()
    stub.base_url = "http://localhost:7912"

    # Awaitable methods on the stub and the value each one resolves to.
    async_results = {
        "health_check": True,
        "get_spool": SAMPLE_SPOOL,
        # #1457: assign route enumerates spools to clear stale fallback-tag links.
        "get_spools": [],
        "merge_spool_extra": {"id": 0, "extra": {}},
    }
    for method_name, resolved in async_results.items():
        setattr(stub, method_name, AsyncMock(return_value=resolved))

    client_factory = AsyncMock(return_value=stub)
    with patch("backend.app.api.routes.spoolman_inventory._get_client", client_factory):
        yield stub
class TestSlotAssignmentConcurrency:
    """Tests for POST /api/v1/spoolman/inventory/slot-assignments on one slot."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_concurrent_assign_same_slot_idempotent(
        self, async_client: AsyncClient, slot_settings, test_printer, mock_client, db_session
    ):
        """Three simultaneous assigns to one slot must leave exactly one row."""
        from backend.app.models.spoolman_slot_assignment import SpoolmanSlotAssignment

        endpoint = "/api/v1/spoolman/inventory/slot-assignments"
        body = {
            "spoolman_spool_id": 10,
            "printer_id": test_printer.id,
            "ams_id": 0,
            "tray_id": 0,
        }

        # Fire three identical requests and let gather drive them together.
        in_flight = [async_client.post(endpoint, json=body) for _ in range(3)]
        replies = await asyncio.gather(*in_flight)
        assert [reply.status_code for reply in replies] == [200, 200, 200]

        # The (printer, ams, tray) slot must map to a single row holding spool 10.
        same_slot = (
            SpoolmanSlotAssignment.printer_id == test_printer.id,
            SpoolmanSlotAssignment.ams_id == 0,
            SpoolmanSlotAssignment.tray_id == 0,
        )
        stmt = select(SpoolmanSlotAssignment).where(*same_slot)
        stored = (await db_session.execute(stmt)).scalars().all()
        assert [row.spoolman_spool_id for row in stored] == [10]

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_reassign_slot_updates_spool_id(
        self, async_client: AsyncClient, slot_settings, test_printer, mock_client, db_session
    ):
        """Assigning a second spool to an occupied slot rewrites the same row."""
        from backend.app.models.spoolman_slot_assignment import SpoolmanSlotAssignment

        endpoint = "/api/v1/spoolman/inventory/slot-assignments"
        slot = {"printer_id": test_printer.id, "ams_id": 1, "tray_id": 2}

        async def assign(spool_id):
            # Slot keys first, spool id last: same JSON body layout for both calls.
            return await async_client.post(endpoint, json=dict(slot, spoolman_spool_id=spool_id))

        first = await assign(10)
        assert first.status_code == 200

        # The mocked lookup must now report the new spool before re-assigning.
        mock_client.get_spool.return_value = dict(SAMPLE_SPOOL, id=20)
        second = await assign(20)
        assert second.status_code == 200

        # Still a single row for the slot, now pointing at spool 20.
        same_slot = (
            SpoolmanSlotAssignment.printer_id == test_printer.id,
            SpoolmanSlotAssignment.ams_id == 1,
            SpoolmanSlotAssignment.tray_id == 2,
        )
        stmt = select(SpoolmanSlotAssignment).where(*same_slot)
        stored = (await db_session.execute(stmt)).scalars().all()
        assert [row.spoolman_spool_id for row in stored] == [20]
|