```python
"""In-memory background dispatcher for slice jobs.

Mirrors the shape of ``background_dispatch.py`` (the print-upload dispatcher)
but tailored for slicing: jobs are independent (no printer-busy gating),
short-lived (typically 5-60s), and the result is a ``LibraryFile`` or
``PrintArchive`` row rather than a printer-side dispatch.

The frontend kicks off a slice via ``POST /library/files/{id}/slice`` or
``POST /archives/{id}/slice``, gets back ``{job_id, status_url}``, then polls
``GET /slice-jobs/{id}`` until the status is ``completed`` or ``failed``.
"""

from __future__ import annotations

import asyncio
import logging
from collections.abc import Awaitable, Callable
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Literal

logger = logging.getLogger(__name__)

SliceJobStatus = Literal["pending", "running", "completed", "failed"]
```
```python
@dataclass(slots=True)
class SliceJob:
    id: int
    kind: Literal["library_file", "archive"]
    source_id: int
    source_name: str
    status: SliceJobStatus = "pending"
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    started_at: datetime | None = None
    completed_at: datetime | None = None
    # On success: the body returned to the caller, usually a SliceResponse
    # or SliceArchiveResponse dict.
    result: dict[str, Any] | None = None
    # On failure: HTTP status + error message.
    error_status: int | None = None
    error_detail: str | None = None


# Retention: keep finished jobs around for 30 minutes so the polling client
# always sees a terminal state on its next tick. After that, the sweep that
# runs on the next ``enqueue`` call prunes them.
_RETENTION_SECONDS = 30 * 60
```
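The retention rule is a strict comparison against 1,800 seconds measured from `completed_at`, and it only applies to jobs in a terminal state. The sketch below restates the sweep predicate as a free function so the boundary can be checked in isolation; `FinishedJob` and `stale_ids` are hypothetical stand-ins, not part of the module.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timedelta, timezone

RETENTION_SECONDS = 30 * 60  # mirrors _RETENTION_SECONDS (1800 s)


@dataclass
class FinishedJob:
    """Hypothetical stand-in for SliceJob with only the fields the sweep reads."""

    id: int
    status: str
    completed_at: datetime | None


def stale_ids(jobs: list[FinishedJob], now: datetime) -> list[int]:
    """Same predicate as the sweep: terminal, and finished over 30 min ago."""
    return [
        j.id
        for j in jobs
        if j.status in ("completed", "failed")
        and j.completed_at is not None
        and (now - j.completed_at).total_seconds() > RETENTION_SECONDS
    ]


now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
jobs = [
    FinishedJob(1, "completed", now - timedelta(minutes=31)),  # past the window
    FinishedJob(2, "failed", now - timedelta(minutes=30)),  # exactly 1800 s: kept (strict >)
    FinishedJob(3, "running", None),  # never swept while running
]
print(stale_ids(jobs, now))  # [1]
```

Eligibility is not the same as removal: a job that finished at 12:00 becomes stale after 12:30, but it stays in memory until the next `enqueue` triggers a sweep.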
```python
class SliceDispatchService:
    def __init__(self) -> None:
        self._jobs: dict[int, SliceJob] = {}
        self._next_id: int = 1
        self._lock = asyncio.Lock()
        # Hold a reference to each running task (the event loop keeps only
        # weak references); each entry is removed when its job finishes.
        self._tasks: dict[int, asyncio.Task[None]] = {}

    async def enqueue(
        self,
        *,
        kind: Literal["library_file", "archive"],
        source_id: int,
        source_name: str,
        run: Callable[[], Awaitable[dict[str, Any]]],
    ) -> SliceJob:
        """Register a new slice job and start it on the event loop.

        ``run`` is an async callable that performs the actual slice + save
        and returns the response body the caller will receive once the
        status flips to ``completed``.
        """
        async with self._lock:
            job = SliceJob(
                id=self._next_id,
                kind=kind,
                source_id=source_id,
                source_name=source_name,
            )
            self._next_id += 1
            self._jobs[job.id] = job
            self._sweep_locked()
        task = asyncio.create_task(self._run_job(job, run), name=f"slice-job-{job.id}")
        self._tasks[job.id] = task
        return job

    async def _run_job(
        self,
        job: SliceJob,
        run: Callable[[], Awaitable[dict[str, Any]]],
    ) -> None:
        job.started_at = datetime.now(timezone.utc)
        job.status = "running"
        try:
            result = await run()
            job.result = result
            job.status = "completed"
        except _SliceJobError as exc:
            # Caller-controlled HTTP error: propagate status + detail.
            job.status = "failed"
            job.error_status = exc.status_code
            job.error_detail = exc.detail
        except Exception as exc:
            logger.exception("Slice job %s failed unexpectedly", job.id)
            job.status = "failed"
            job.error_status = 500
            job.error_detail = f"Unexpected error: {exc}"
        finally:
            job.completed_at = datetime.now(timezone.utc)
            self._tasks.pop(job.id, None)

    def get(self, job_id: int) -> SliceJob | None:
        """Return the job, or ``None`` if the ID is unknown or already swept."""
        return self._jobs.get(job_id)

    def _sweep_locked(self) -> None:
        """Drop finished jobs older than the retention window.

        The caller must hold ``self._lock``.
        """
        now = datetime.now(timezone.utc)
        stale_ids = [
            jid
            for jid, job in self._jobs.items()
            if job.status in ("completed", "failed")
            and job.completed_at is not None
            and (now - job.completed_at).total_seconds() > _RETENTION_SECONDS
        ]
        for jid in stale_ids:
            self._jobs.pop(jid, None)
```
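Because `enqueue` calls `asyncio.create_task` and returns without awaiting anything afterwards, the job it hands back is still `pending`; `running` only appears once the event loop gives the task its first step. The sketch below reproduces that lifecycle with a hypothetical, stripped-down `MiniDispatcher` (no lock, no retention sweep, no `_SliceJobError` branch) so it runs without importing the real module.

```python
import asyncio
from collections.abc import Awaitable, Callable
from typing import Any

Run = Callable[[], Awaitable[dict[str, Any]]]


class MiniDispatcher:
    """Hypothetical, condensed mirror of SliceDispatchService (no lock, no sweep)."""

    def __init__(self) -> None:
        self.jobs: dict[int, dict[str, Any]] = {}
        # Strong references: the event loop keeps only weak refs to tasks.
        self.tasks: dict[int, asyncio.Task[None]] = {}
        self.next_id = 1

    def enqueue(self, run: Run) -> int:
        job_id = self.next_id
        self.next_id += 1
        self.jobs[job_id] = {"status": "pending"}
        # create_task only schedules the coroutine; it first runs when we yield.
        self.tasks[job_id] = asyncio.create_task(self._run(job_id, run))
        return job_id

    async def _run(self, job_id: int, run: Run) -> None:
        job = self.jobs[job_id]
        job["status"] = "running"
        try:
            job["result"] = await run()
            job["status"] = "completed"
        except Exception as exc:
            job["status"] = "failed"
            job["error_detail"] = f"Unexpected error: {exc}"
        finally:
            self.tasks.pop(job_id, None)


async def main() -> list[str]:
    d = MiniDispatcher()

    async def fake_slice() -> dict[str, Any]:
        await asyncio.sleep(0.01)  # stands in for the real slice + save
        return {"file_id": 42}

    job_id = d.enqueue(fake_slice)
    seen = [d.jobs[job_id]["status"]]  # task created but not started yet
    await asyncio.sleep(0)  # yield once so the task takes its first step
    seen.append(d.jobs[job_id]["status"])
    while d.jobs[job_id]["status"] not in ("completed", "failed"):
        await asyncio.sleep(0.005)  # what the polling client does over HTTP
    seen.append(d.jobs[job_id]["status"])
    return seen


print(asyncio.run(main()))  # ['pending', 'running', 'completed']
```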
```python
class _SliceJobError(Exception):
    """Raised inside a slice job's ``run`` callable to surface a specific
    HTTP status + detail.

    The dispatcher catches these and stores them on the job. Callers convert
    ``HTTPException`` to this at the boundary with
    ``http_exception_to_job_error``.
    """

    def __init__(self, status_code: int, detail: str) -> None:
        super().__init__(detail)
        self.status_code = status_code
        self.detail = detail


def http_exception_to_job_error(exc: Any) -> _SliceJobError:
    """Convert a Starlette ``HTTPException`` into the dispatcher's error type.

    Handles the common case where slice helpers raise FastAPI's
    ``HTTPException`` for validation / sidecar failures.
    """
    return _SliceJobError(exc.status_code, str(exc.detail))


# Module-level singleton, started/stopped by main.py's lifespan.
slice_dispatch = SliceDispatchService()
```
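The conversion matters because `_run_job` only preserves the status code for `_SliceJobError`: an `HTTPException` raised inside `run` would fall through to the generic `except Exception` branch and be recorded as a 500 with an "Unexpected error" detail. The sketch below shows one way a `run` callable might do the conversion at its boundary. `FakeHTTPException` is a hypothetical stand-in exposing the same `status_code` and `detail` attributes as Starlette's exception, `SliceJobError` and `to_job_error` mirror the module's private helpers, and `load_source` and its 404 message are invented for illustration.

```python
from __future__ import annotations

import asyncio
from collections.abc import Awaitable, Callable
from typing import Any


class FakeHTTPException(Exception):
    """Hypothetical stand-in for Starlette's HTTPException (status_code, detail)."""

    def __init__(self, status_code: int, detail: Any = None) -> None:
        super().__init__(detail)
        self.status_code = status_code
        self.detail = detail


class SliceJobError(Exception):
    """Mirror of _SliceJobError."""

    def __init__(self, status_code: int, detail: str) -> None:
        super().__init__(detail)
        self.status_code = status_code
        self.detail = detail


def to_job_error(exc: Any) -> SliceJobError:
    """Mirror of http_exception_to_job_error."""
    return SliceJobError(exc.status_code, str(exc.detail))


async def load_source(source_id: int) -> dict[str, Any]:
    # Hypothetical helper that validates input the way a route helper might.
    if source_id <= 0:
        raise FakeHTTPException(404, "Library file not found")
    return {"source_id": source_id, "sliced": True}


def make_run(source_id: int) -> Callable[[], Awaitable[dict[str, Any]]]:
    """Build the zero-argument ``run`` callable, converting at the boundary."""

    async def run() -> dict[str, Any]:
        try:
            return await load_source(source_id)
        except FakeHTTPException as exc:
            raise to_job_error(exc) from exc

    return run


async def outcome(source_id: int) -> tuple[int | None, str]:
    """Return (error_status, detail) the way the dispatcher would record them."""
    try:
        body = await make_run(source_id)()
    except SliceJobError as err:
        return err.status_code, err.detail
    return None, str(body)


print(asyncio.run(outcome(0)))  # (404, 'Library file not found')
```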