slice_dispatch.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
"""In-memory background dispatcher for slice jobs.

Mirrors the shape of `background_dispatch.py` (the print-upload dispatcher)
but tailored for slicing: jobs are independent (no printer-busy gating),
short-lived (typically 5-60s), and the result is a `LibraryFile` or
`PrintArchive` row rather than a printer-side dispatch.

The frontend kicks off a slice via `POST /library/files/{id}/slice` or
`POST /archives/{id}/slice`, gets back `{job_id, status_url}`, then polls
`GET /slice-jobs/{id}` until status is `completed` or `failed`.
"""
  10. from __future__ import annotations
  11. import asyncio
  12. import logging
  13. from collections.abc import Awaitable, Callable
  14. from dataclasses import dataclass, field
  15. from datetime import datetime, timezone
  16. from typing import Any, Literal
# Module logger; used below to record slice jobs that fail unexpectedly.
logger = logging.getLogger(__name__)

# Lifecycle states of a slice job. "completed" and "failed" are the terminal
# states: they are what the polling client waits for and the only states the
# retention sweep will prune.
SliceJobStatus = Literal["pending", "running", "completed", "failed"]
  19. @dataclass(slots=True)
  20. class SliceJob:
  21. id: int
  22. kind: Literal["library_file", "archive"]
  23. source_id: int
  24. source_name: str
  25. status: SliceJobStatus = "pending"
  26. created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
  27. started_at: datetime | None = None
  28. completed_at: datetime | None = None
  29. # On success: the body returned to the caller — usually a SliceResponse
  30. # or SliceArchiveResponse dict.
  31. result: dict[str, Any] | None = None
  32. # On failure: HTTP status + error message.
  33. error_status: int | None = None
  34. error_detail: str | None = None
# Retention: keep finished jobs around for 30 minutes so the polling client
# always sees a terminal state on its next tick. After that they are pruned by
# the next sweep, which runs when a new job is enqueued (lookups do not sweep).
_RETENTION_SECONDS = 30 * 60
  39. class SliceDispatchService:
  40. def __init__(self) -> None:
  41. self._jobs: dict[int, SliceJob] = {}
  42. self._next_id: int = 1
  43. self._lock = asyncio.Lock()
  44. self._tasks: dict[int, asyncio.Task] = {}
  45. async def enqueue(
  46. self,
  47. *,
  48. kind: Literal["library_file", "archive"],
  49. source_id: int,
  50. source_name: str,
  51. run: Callable[[], Awaitable[dict[str, Any]]],
  52. ) -> SliceJob:
  53. """Register a new slice job and start it on the event loop.
  54. ``run`` is an async callable that performs the actual slice + save
  55. and returns the response body the caller will receive once status
  56. flips to ``completed``.
  57. """
  58. async with self._lock:
  59. job = SliceJob(
  60. id=self._next_id,
  61. kind=kind,
  62. source_id=source_id,
  63. source_name=source_name,
  64. )
  65. self._next_id += 1
  66. self._jobs[job.id] = job
  67. self._sweep_locked()
  68. task = asyncio.create_task(self._run_job(job, run), name=f"slice-job-{job.id}")
  69. self._tasks[job.id] = task
  70. return job
  71. async def _run_job(
  72. self,
  73. job: SliceJob,
  74. run: Callable[[], Awaitable[dict[str, Any]]],
  75. ) -> None:
  76. job.started_at = datetime.now(timezone.utc)
  77. job.status = "running"
  78. try:
  79. result = await run()
  80. job.result = result
  81. job.status = "completed"
  82. except _SliceJobError as exc:
  83. # Caller-controlled HTTP error — propagate status + detail.
  84. job.status = "failed"
  85. job.error_status = exc.status_code
  86. job.error_detail = exc.detail
  87. except Exception as exc:
  88. logger.exception("Slice job %s failed unexpectedly", job.id)
  89. job.status = "failed"
  90. job.error_status = 500
  91. job.error_detail = f"Unexpected error: {exc}"
  92. finally:
  93. job.completed_at = datetime.now(timezone.utc)
  94. self._tasks.pop(job.id, None)
  95. def get(self, job_id: int) -> SliceJob | None:
  96. return self._jobs.get(job_id)
  97. def _sweep_locked(self) -> None:
  98. """Drop finished jobs older than the retention window. Caller holds
  99. the lock."""
  100. now = datetime.now(timezone.utc)
  101. stale_ids = [
  102. jid
  103. for jid, job in self._jobs.items()
  104. if job.status in ("completed", "failed")
  105. and job.completed_at is not None
  106. and (now - job.completed_at).total_seconds() > _RETENTION_SECONDS
  107. ]
  108. for jid in stale_ids:
  109. self._jobs.pop(jid, None)
  110. class _SliceJobError(Exception):
  111. """Raised inside a slice job's `run` callable to surface a specific
  112. HTTP status + detail. The dispatcher catches these and stores them on
  113. the job. Callers convert ``HTTPException`` to this on the boundary.
  114. """
  115. def __init__(self, status_code: int, detail: str) -> None:
  116. super().__init__(detail)
  117. self.status_code = status_code
  118. self.detail = detail
  119. def http_exception_to_job_error(exc) -> _SliceJobError:
  120. """Convert a starlette ``HTTPException`` into the dispatcher's error
  121. type. Handles the common case where slice helpers raise FastAPI's
  122. ``HTTPException`` for validation / sidecar failures.
  123. """
  124. return _SliceJobError(exc.status_code, str(exc.detail))
# Module-level singleton, started/stopped by main.py's lifespan.
# NOTE(review): SliceDispatchService defines no start/stop methods in this
# module -- confirm what main.py's lifespan actually does with this object.
slice_dispatch = SliceDispatchService()