"""Tests for the PrintRun-based stats aggregation (#1378). Statistics and per-archive aggregates now come from PrintLogEntry rows rather than PrintArchive's runtime fields, so a reprint contributes new totals instead of overwriting the source archive's first-run data. """ from datetime import datetime, timezone import pytest from httpx import AsyncClient from backend.app.models.print_log import PrintLogEntry @pytest.mark.asyncio @pytest.mark.integration async def test_stats_count_reprints_independently( async_client: AsyncClient, archive_factory, printer_factory, db_session ): """A reprint adds to stats instead of overwriting the source archive.""" printer = await printer_factory() archive = await archive_factory( printer.id, status="completed", filament_used_grams=100.0, cost=2.5, print_time_seconds=3600, with_run=False, ) # First run — completed, 100g. db_session.add( PrintLogEntry( archive_id=archive.id, printer_id=archive.printer_id, status="completed", started_at=datetime(2026, 5, 1, 10, 0, tzinfo=timezone.utc), completed_at=datetime(2026, 5, 1, 11, 0, tzinfo=timezone.utc), duration_seconds=3600, filament_used_grams=100.0, cost=2.5, created_at=datetime(2026, 5, 1, 11, 0, tzinfo=timezone.utc), ) ) # Reprint — failed at 10g. db_session.add( PrintLogEntry( archive_id=archive.id, printer_id=archive.printer_id, status="failed", started_at=datetime(2026, 5, 5, 10, 0, tzinfo=timezone.utc), completed_at=datetime(2026, 5, 5, 10, 5, tzinfo=timezone.utc), duration_seconds=300, filament_used_grams=10.0, cost=0.25, failure_reason="Cancelled by user", created_at=datetime(2026, 5, 5, 10, 5, tzinfo=timezone.utc), ) ) await db_session.commit() response = await async_client.get("/api/v1/archives/stats") assert response.status_code == 200 body = response.json() # Both runs counted, not the single archive row. assert body["total_prints"] == 2 assert body["successful_prints"] == 1 assert body["failed_prints"] == 1 # 100g + 10g — NOT 10g (which is what archives.filament_used_grams alone # would give if the archive's runtime fields were the source of truth). assert body["total_filament_grams"] == pytest.approx(110.0) assert body["total_cost"] == pytest.approx(2.75) @pytest.mark.asyncio @pytest.mark.integration async def test_archive_list_includes_run_aggregates( async_client: AsyncClient, archive_factory, printer_factory, db_session ): """List response carries run_count, last_run_at, total_filament_actual_grams.""" printer = await printer_factory() archive = await archive_factory( printer.id, status="completed", filament_used_grams=100.0, with_run=False, ) db_session.add_all( [ PrintLogEntry( archive_id=archive.id, printer_id=archive.printer_id, status="completed", started_at=datetime(2026, 5, 1, 10, 0, tzinfo=timezone.utc), completed_at=datetime(2026, 5, 1, 11, 0, tzinfo=timezone.utc), filament_used_grams=100.0, created_at=datetime(2026, 5, 1, 11, 0, tzinfo=timezone.utc), ), PrintLogEntry( archive_id=archive.id, printer_id=archive.printer_id, status="failed", started_at=datetime(2026, 5, 10, 10, 0, tzinfo=timezone.utc), completed_at=datetime(2026, 5, 10, 10, 5, tzinfo=timezone.utc), filament_used_grams=10.0, created_at=datetime(2026, 5, 10, 10, 5, tzinfo=timezone.utc), ), ] ) await db_session.commit() response = await async_client.get("/api/v1/archives/") assert response.status_code == 200 rows = response.json() row = next(r for r in rows if r["id"] == archive.id) assert row["run_count"] == 2 assert row["successful_run_count"] == 1 assert row["failed_run_count"] == 1 assert row["total_filament_actual_grams"] == pytest.approx(110.0) assert row["last_run_at"] is not None # max(started_at) populated @pytest.mark.asyncio @pytest.mark.integration async def test_runs_endpoint_returns_runs_newest_first( async_client: AsyncClient, archive_factory, printer_factory, db_session ): """GET /archives/{id}/runs returns each PrintLogEntry for the archive.""" printer = await printer_factory() archive = await archive_factory( printer.id, status="completed", with_run=False, ) db_session.add_all( [ PrintLogEntry( archive_id=archive.id, printer_id=archive.printer_id, status="completed", started_at=datetime(2026, 4, 1, 10, 0, tzinfo=timezone.utc), completed_at=datetime(2026, 4, 1, 11, 0, tzinfo=timezone.utc), filament_used_grams=50.0, ), PrintLogEntry( archive_id=archive.id, printer_id=archive.printer_id, status="failed", started_at=datetime(2026, 5, 1, 10, 0, tzinfo=timezone.utc), completed_at=datetime(2026, 5, 1, 10, 5, tzinfo=timezone.utc), filament_used_grams=5.0, failure_reason="Cancelled by user", ), ] ) await db_session.commit() response = await async_client.get(f"/api/v1/archives/{archive.id}/runs") assert response.status_code == 200 body = response.json() assert body["total"] == 2 # Newest first assert body["items"][0]["status"] == "failed" assert body["items"][0]["failure_reason"] == "Cancelled by user" assert body["items"][1]["status"] == "completed" assert body["items"][1]["filament_used_grams"] == pytest.approx(50.0) @pytest.mark.asyncio @pytest.mark.integration async def test_purge_stats_also_deletes_linked_runs( async_client: AsyncClient, archive_factory, printer_factory, db_session ): """``DELETE /archives/{id}?purge_stats=true`` hard-deletes linked PrintLogEntry rows so their filament / cost / count contributions truly leave Quick Stats. Without this, ON DELETE SET NULL on the FK would orphan the runs and they'd keep showing up in the new aggregate-from-PrintLogEntry totals (#1378).""" from sqlalchemy import func, select printer = await printer_factory() keep = await archive_factory(printer.id, status="completed", filament_used_grams=50.0) purge = await archive_factory(printer.id, status="completed", filament_used_grams=100.0) # Extra runs on the archive about to be purged, to prove they all go. db_session.add_all( [ PrintLogEntry( archive_id=purge.id, printer_id=purge.printer_id, status="failed", filament_used_grams=10.0, ), PrintLogEntry( archive_id=purge.id, printer_id=purge.printer_id, status="completed", filament_used_grams=100.0, ), ] ) await db_session.commit() resp = await async_client.delete(f"/api/v1/archives/{purge.id}?purge_stats=true") assert resp.status_code == 200 assert resp.json()["purged_from_stats"] is True remaining = await db_session.execute( select(func.count(PrintLogEntry.id)).where(PrintLogEntry.archive_id == purge.id) ) assert remaining.scalar() == 0 # The OTHER archive's auto-synthesized run is still there. keep_remaining = await db_session.execute( select(func.count(PrintLogEntry.id)).where(PrintLogEntry.archive_id == keep.id) ) assert keep_remaining.scalar() == 1 @pytest.mark.asyncio @pytest.mark.integration async def test_soft_delete_keeps_runs_for_stats( async_client: AsyncClient, archive_factory, printer_factory, db_session ): """Default soft-delete (without ``purge_stats=true``) keeps the archive's PrintLogEntry rows so the #1343 stats-preservation contract still holds — the archive disappears from listings, but its filament / time / cost stay in Quick Stats.""" from sqlalchemy import func, select printer = await printer_factory() archive = await archive_factory(printer.id, status="completed", filament_used_grams=75.0) resp = await async_client.delete(f"/api/v1/archives/{archive.id}") assert resp.status_code == 200 assert resp.json()["purged_from_stats"] is False # The run row is still there for stats aggregation. runs = await db_session.execute(select(func.count(PrintLogEntry.id)).where(PrintLogEntry.archive_id == archive.id)) assert runs.scalar() == 1 stats = (await async_client.get("/api/v1/archives/stats")).json() assert stats["total_prints"] >= 1 assert stats["total_filament_grams"] >= 75.0 @pytest.mark.asyncio @pytest.mark.integration async def test_time_accuracy_excludes_multi_plate_plate_by_plate_outliers( async_client: AsyncClient, archive_factory, printer_factory, db_session ): """Per-run accuracy clamps to a plausible 50%-200% band so multi-plate archives printed plate-by-plate don't poison the printer-level average. Pre-#1593 the parser stored plate-1-only time in ``PrintArchive.print_time_seconds``, so a plate-by-plate run produced a near-100% ratio by accident. Post-#1593 the field is the sum across plates, so each plate-by-plate run produces estimate/actual = N×100% for an N-plate file. Without the band filter a single 3-plate file printed plate-by-plate would drag the printer's accuracy reading to ~300%, which is pure noise. The metric is designed for the single-plate-file case and should reflect real slicer drift there. """ printer = await printer_factory() # Archive 1: single-plate file. Estimate 3600s, actual 3700s # → ratio 97.3% (well within band). single = await archive_factory( printer.id, print_time_seconds=3600, with_run=False, ) db_session.add( PrintLogEntry( archive_id=single.id, printer_id=printer.id, status="completed", duration_seconds=3700, ) ) # Archive 2: multi-plate file (3 plates totaling 18000s). Two runs # printed plate-by-plate at ~6000s each — ratio 18000/6000 = 300%. # Both must be filtered out so the printer average stays at the # single-plate file's 97.3% reading. multi = await archive_factory( printer.id, print_time_seconds=18000, with_run=False, ) db_session.add( PrintLogEntry( archive_id=multi.id, printer_id=printer.id, status="completed", duration_seconds=6000, ) ) db_session.add( PrintLogEntry( archive_id=multi.id, printer_id=printer.id, status="completed", duration_seconds=6100, ) ) await db_session.commit() body = (await async_client.get("/api/v1/archives/stats")).json() assert body["average_time_accuracy"] == pytest.approx(97.3, abs=0.1) assert body["time_accuracy_by_printer"][str(printer.id)] == pytest.approx(97.3, abs=0.1) # --------------------------------------------------------------------------- # #1608: compute_time_accuracy suppresses the per-card badge for multi-run # archives where the whole-file estimate is incommensurable with the # latest-run actual. # --------------------------------------------------------------------------- class TestComputeTimeAccuracyMultiRun: """The card-level ``compute_time_accuracy`` runs against the archive's own ``started_at`` / ``completed_at`` (latest run only) and ``print_time_seconds`` (post-#1593 sum across plates). For multi-run archives those describe different scopes — a 3-plate file printed plate-by-plate over 3 runs produces estimate/actual = 300% → +200% badge, which is pure noise. The reporter (#1608, archive #65) verified the bug surfaces at +188% for a 3-plate file with 9 runs. The fix: when the archive has more than one logged run, suppress BOTH fields. The frontend then falls through to ``print_time_seconds`` for the time display (so the user sees the slicer's whole-file estimate instead of one run's wall-clock) and hides the badge. """ def _make_archive(self, *, print_time_seconds, started_at, completed_at, status="completed"): from types import SimpleNamespace return SimpleNamespace( print_time_seconds=print_time_seconds, started_at=started_at, completed_at=completed_at, status=status, ) def test_single_run_keeps_original_behaviour(self): """``run_count == 1`` is the case the badge was designed for — compute and return both actual + accuracy as before.""" from backend.app.api.routes.archives import compute_time_accuracy archive = self._make_archive( print_time_seconds=3600, started_at=datetime(2026, 5, 1, 10, 0, tzinfo=timezone.utc), completed_at=datetime(2026, 5, 1, 11, 0, tzinfo=timezone.utc), ) result = compute_time_accuracy(archive, run_aggregate={"run_count": 1}) assert result["actual_time_seconds"] == 3600 assert result["time_accuracy"] == 100.0 def test_no_run_aggregate_keeps_original_behaviour(self): """Endpoints that don't yet load run_aggregates (legacy callers, or contexts where the data isn't relevant) must keep the pre-fix per-archive computation — never silently drop the badge.""" from backend.app.api.routes.archives import compute_time_accuracy archive = self._make_archive( print_time_seconds=3600, started_at=datetime(2026, 5, 1, 10, 0, tzinfo=timezone.utc), completed_at=datetime(2026, 5, 1, 10, 50, tzinfo=timezone.utc), ) result = compute_time_accuracy(archive) # no run_aggregate assert result["actual_time_seconds"] == 3000 assert result["time_accuracy"] == 120.0 # 3600/3000 def test_multi_run_archive_suppresses_both_fields(self): """Reporter's case (archive #65, 3 plates, 9 logged runs): one-run actual (6364s) vs whole-file estimate (18354s) → +188% badge that means nothing. Multi-run must clear both fields so the card falls through to the estimate display with no badge.""" from backend.app.api.routes.archives import compute_time_accuracy archive = self._make_archive( print_time_seconds=18354, started_at=datetime(2026, 5, 1, 10, 0, tzinfo=timezone.utc), completed_at=datetime(2026, 5, 1, 11, 46, 4, tzinfo=timezone.utc), # ~6364s ) result = compute_time_accuracy(archive, run_aggregate={"run_count": 9}) assert result["actual_time_seconds"] is None assert result["time_accuracy"] is None def test_run_count_zero_keeps_original_behaviour(self): """A run_aggregate that exists but reports zero runs (edge case from the LEFT JOIN-style helper) must not trigger suppression — that's not the multi-run shape, it's the no-runs shape, and the per-archive timestamps are still meaningful (the archive was marked completed without a PrintLogEntry trail, e.g. legacy imports).""" from backend.app.api.routes.archives import compute_time_accuracy archive = self._make_archive( print_time_seconds=3600, started_at=datetime(2026, 5, 1, 10, 0, tzinfo=timezone.utc), completed_at=datetime(2026, 5, 1, 11, 0, tzinfo=timezone.utc), ) result = compute_time_accuracy(archive, run_aggregate={"run_count": 0}) assert result["actual_time_seconds"] == 3600 assert result["time_accuracy"] == 100.0 @pytest.mark.asyncio @pytest.mark.integration async def test_archive_list_suppresses_time_accuracy_for_multi_run_archives( async_client: AsyncClient, archive_factory, printer_factory, db_session ): """#1608 integration: the card response from the main list endpoint must report ``actual_time_seconds = null`` and ``time_accuracy = null`` for an archive with multiple logged runs, so the frontend renders the slicer estimate without the misleading +N% badge.""" printer = await printer_factory() # 3-plate file: estimate is the whole-file sum, latest run is one plate. archive = await archive_factory( printer.id, status="completed", print_time_seconds=18354, # all-plates estimate (post-#1593 parser) started_at=datetime(2026, 5, 1, 10, 0, tzinfo=timezone.utc), completed_at=datetime(2026, 5, 1, 11, 46, 4, tzinfo=timezone.utc), # ~6364s = one plate with_run=False, ) # Three runs each ~6364s — plate-by-plate. for day in (1, 2, 3): db_session.add( PrintLogEntry( archive_id=archive.id, printer_id=archive.printer_id, status="completed", started_at=datetime(2026, 5, day, 10, 0, tzinfo=timezone.utc), completed_at=datetime(2026, 5, day, 11, 46, 4, tzinfo=timezone.utc), duration_seconds=6364, ) ) await db_session.commit() response = await async_client.get("/api/v1/archives/") assert response.status_code == 200 row = next(r for r in response.json() if r["id"] == archive.id) # Frontend renders archive.actual_time_seconds || archive.print_time_seconds — # with actual cleared, it falls through to the estimate; with accuracy # cleared, no badge renders. assert row["actual_time_seconds"] is None, "multi-run actual is incommensurable with the estimate — must be null" assert row["time_accuracy"] is None, "no badge for multi-run archives — the scopes don't match" # The estimate itself is preserved so the card has something to display. assert row["print_time_seconds"] == 18354 assert row["run_count"] == 3 @pytest.mark.asyncio @pytest.mark.integration async def test_archive_list_keeps_time_accuracy_for_single_run_archives( async_client: AsyncClient, archive_factory, printer_factory, db_session ): """Sanity check for the #1608 fix: single-run archives (the case the badge was designed for) keep their original badge behaviour.""" printer = await printer_factory() archive = await archive_factory( printer.id, status="completed", print_time_seconds=3600, started_at=datetime(2026, 5, 1, 10, 0, tzinfo=timezone.utc), completed_at=datetime(2026, 5, 1, 11, 0, tzinfo=timezone.utc), with_run=False, ) db_session.add( PrintLogEntry( archive_id=archive.id, printer_id=archive.printer_id, status="completed", started_at=datetime(2026, 5, 1, 10, 0, tzinfo=timezone.utc), completed_at=datetime(2026, 5, 1, 11, 0, tzinfo=timezone.utc), duration_seconds=3600, ) ) await db_session.commit() row = next(r for r in (await async_client.get("/api/v1/archives/")).json() if r["id"] == archive.id) assert row["actual_time_seconds"] == 3600 assert row["time_accuracy"] == 100.0 assert row["run_count"] == 1