| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157 |
"""Reset-usage endpoint regressions (#1390 follow-up).

The per-spool and bulk reset endpoints zero `weight_used` without touching
`weight_locked`. They exist because PATCH /spools/{id} auto-locks the spool
when weight_used is set explicitly, and that's wrong for the "clean-slate
my Total Consumed stat" workflow — the user wants the spool to keep
receiving AMS auto-sync updates from the next print onward.
"""
- import pytest
- from httpx import AsyncClient
- from sqlalchemy import select
- from sqlalchemy.ext.asyncio import AsyncSession
- from backend.app.models.spool import Spool
@pytest.fixture
async def spool_factory(db_session: AsyncSession):
    """Provide an async factory that persists ``Spool`` rows for a test.

    The returned coroutine function accepts keyword overrides, layers them
    over a baseline set of column values, adds the resulting ``Spool`` to
    ``db_session``, commits, refreshes it, and returns the instance.
    """
    baseline = {
        "material": "PLA",
        "subtype": "Basic",
        "brand": "Bambu",
        "color_name": "Red",
        "rgba": "FF0000FF",
        "label_weight": 1000,
        "weight_used": 0,
        "weight_locked": False,
    }

    async def _create(**kwargs):
        # Caller-supplied values take precedence over the baseline; a new
        # dict is built per call so the baseline itself is never mutated.
        record = Spool(**{**baseline, **kwargs})
        db_session.add(record)
        await db_session.commit()
        await db_session.refresh(record)
        return record

    return _create
class TestResetSpoolUsage:
    """Integration tests for the per-spool ``reset-usage`` endpoint."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_reset_zeroes_weight_used(self, async_client: AsyncClient, spool_factory, db_session):
        """A successful reset reports and persists ``weight_used == 0``."""
        used_spool = await spool_factory(weight_used=234.5)

        url = f"/api/v1/inventory/spools/{used_spool.id}/reset-usage"
        result = await async_client.post(url)

        assert result.status_code == 200
        assert result.json()["weight_used"] == 0

        # Reload the row so the assertion reads the stored value.
        await db_session.refresh(used_spool)
        assert used_spool.weight_used == 0

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_reset_does_not_lock_spool(self, async_client: AsyncClient, spool_factory, db_session):
        """Resetting an unlocked spool must keep it unlocked.

        PATCH /spools/{id} auto-locks when weight_used is set explicitly.
        The dedicated reset endpoint must not do that: the user wants usage
        tracked afresh from zero, not frozen at zero.
        """
        open_spool = await spool_factory(weight_used=100.0, weight_locked=False)

        url = f"/api/v1/inventory/spools/{open_spool.id}/reset-usage"
        result = await async_client.post(url)

        assert result.status_code == 200

        await db_session.refresh(open_spool)
        assert open_spool.weight_used == 0
        assert open_spool.weight_locked is False, "Reset must not auto-lock the spool"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_reset_preserves_existing_lock(self, async_client: AsyncClient, spool_factory, db_session):
        """A spool that was already locked stays locked after the reset."""
        pinned_spool = await spool_factory(weight_used=500.0, weight_locked=True)

        url = f"/api/v1/inventory/spools/{pinned_spool.id}/reset-usage"
        result = await async_client.post(url)

        assert result.status_code == 200

        await db_session.refresh(pinned_spool)
        assert pinned_spool.weight_used == 0
        assert pinned_spool.weight_locked is True, "Pre-existing lock must be preserved"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_reset_404_for_missing_spool(self, async_client: AsyncClient):
        """Resetting a spool id that was never created returns 404."""
        url = "/api/v1/inventory/spools/99999/reset-usage"
        result = await async_client.post(url)

        assert result.status_code == 404
class TestBulkResetSpoolUsage:
    """Integration tests for the bulk ``reset-usage-bulk`` endpoint."""

    # Path shared by every request in this class; the leading underscore
    # keeps pytest from treating it as something to collect.
    _ENDPOINT = "/api/v1/inventory/spools/reset-usage-bulk"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_bulk_reset_zeroes_only_listed_spools(self, async_client: AsyncClient, spool_factory, db_session):
        """Spools named in the request are zeroed; any other spool keeps its usage."""
        first = await spool_factory(weight_used=100.0)
        second = await spool_factory(weight_used=200.0)
        bystander = await spool_factory(weight_used=300.0)

        payload = {"spool_ids": [first.id, second.id]}
        result = await async_client.post(self._ENDPOINT, json=payload)

        assert result.status_code == 200
        assert result.json() == {"reset": 2}

        # The endpoint commits via its own session, so drop everything this
        # session has cached; the query below then reads post-reset values.
        db_session.expire_all()
        rows = await db_session.execute(select(Spool))
        by_id = {row.id: row for row in rows.scalars().all()}

        assert by_id[first.id].weight_used == 0
        assert by_id[second.id].weight_used == 0
        assert by_id[bystander.id].weight_used == 300.0, "Spool not in request must keep its usage"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_bulk_reset_rejects_empty_list(self, async_client: AsyncClient):
        """An empty id list is refused, guarding against accidental wildcard wipes."""
        payload = {"spool_ids": []}
        result = await async_client.post(self._ENDPOINT, json=payload)

        assert result.status_code == 400

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_bulk_reset_rejects_missing_field(self, async_client: AsyncClient):
        """A body without the ``spool_ids`` field is refused."""
        payload = {}
        result = await async_client.post(self._ENDPOINT, json=payload)

        assert result.status_code == 400

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_bulk_reset_does_not_lock_spools(self, async_client: AsyncClient, spool_factory, db_session):
        """Bulk reset zeroes usage while leaving each target's lock flag as it was."""
        open_spool = await spool_factory(weight_used=100.0, weight_locked=False)
        pinned_spool = await spool_factory(weight_used=200.0, weight_locked=True)

        payload = {"spool_ids": [open_spool.id, pinned_spool.id]}
        result = await async_client.post(self._ENDPOINT, json=payload)

        assert result.status_code == 200

        await db_session.refresh(open_spool)
        await db_session.refresh(pinned_spool)

        # Checked one condition at a time, in the original order, so a
        # failure points at the exact field that regressed.
        assert open_spool.weight_used == 0
        assert open_spool.weight_locked is False
        assert pinned_spool.weight_used == 0
        assert pinned_spool.weight_locked is True
|