slice_dispatch.py 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. """In-memory background dispatcher for slice jobs.
  2. Mirrors the shape of `background_dispatch.py` (the print-upload dispatcher)
  3. but tailored for slicing: jobs are independent (no printer-busy gating),
  4. short-lived (typically 5-60s), and the result is a `LibraryFile` or
  5. `PrintArchive` row rather than a printer-side dispatch.
  6. The frontend kicks off a slice via `POST /library/files/{id}/slice` or
  7. `POST /archives/{id}/slice`, gets back `{job_id, status_url}`, then polls
  8. `GET /slice-jobs/{id}` until status is `completed` or `failed`.
  9. """
  10. from __future__ import annotations
  11. import asyncio
  12. import logging
  13. from collections.abc import Awaitable, Callable
  14. from dataclasses import dataclass, field
  15. from datetime import datetime, timezone
  16. from typing import Any, Literal
  17. logger = logging.getLogger(__name__)
  18. SliceJobStatus = Literal["pending", "running", "completed", "failed"]
  19. @dataclass(slots=True)
  20. class SliceJob:
  21. id: int
  22. kind: Literal["library_file", "archive"]
  23. source_id: int
  24. source_name: str
  25. status: SliceJobStatus = "pending"
  26. created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
  27. started_at: datetime | None = None
  28. completed_at: datetime | None = None
  29. # On success: the body returned to the caller — usually a SliceResponse
  30. # or SliceArchiveResponse dict.
  31. result: dict[str, Any] | None = None
  32. # On failure: HTTP status + error message.
  33. error_status: int | None = None
  34. error_detail: str | None = None
  35. # Live progress fed by the sidecar's --pipe channel while the slicer
  36. # is running. Populated by a polling task spawned alongside the
  37. # blocking POST /slice request; None when the sidecar doesn't
  38. # support progress (older sidecars, no request_id, etc.). Surfaced
  39. # in the SliceJobState response so the persistent toast can render
  40. # "Generating G-code (75%)" instead of just elapsed time.
  41. progress: dict[str, Any] | None = None
  42. # Retention: keep finished jobs around for 30 minutes so the polling client
  43. # always sees a terminal state on its next tick. After that, the next access
  44. # sweep prunes them.
  45. _RETENTION_SECONDS = 30 * 60
  46. class SliceDispatchService:
  47. def __init__(self) -> None:
  48. self._jobs: dict[int, SliceJob] = {}
  49. self._next_id: int = 1
  50. self._lock = asyncio.Lock()
  51. self._tasks: dict[int, asyncio.Task] = {}
  52. async def enqueue(
  53. self,
  54. *,
  55. kind: Literal["library_file", "archive"],
  56. source_id: int,
  57. source_name: str,
  58. run: Callable[[int], Awaitable[dict[str, Any]]],
  59. ) -> SliceJob:
  60. """Register a new slice job and start it on the event loop.
  61. ``run`` is an async callable that takes the freshly-created
  62. ``job_id`` (so it can wire up live-progress reporting via
  63. :meth:`set_progress`) and returns the response body the caller
  64. will receive once status flips to ``completed``.
  65. """
  66. async with self._lock:
  67. job = SliceJob(
  68. id=self._next_id,
  69. kind=kind,
  70. source_id=source_id,
  71. source_name=source_name,
  72. )
  73. self._next_id += 1
  74. self._jobs[job.id] = job
  75. self._sweep_locked()
  76. task = asyncio.create_task(self._run_job(job, run), name=f"slice-job-{job.id}")
  77. self._tasks[job.id] = task
  78. return job
  79. async def _run_job(
  80. self,
  81. job: SliceJob,
  82. run: Callable[[int], Awaitable[dict[str, Any]]],
  83. ) -> None:
  84. job.started_at = datetime.now(timezone.utc)
  85. job.status = "running"
  86. try:
  87. result = await run(job.id)
  88. job.result = result
  89. job.status = "completed"
  90. except _SliceJobError as exc:
  91. # Caller-controlled HTTP error — propagate status + detail.
  92. job.status = "failed"
  93. job.error_status = exc.status_code
  94. job.error_detail = exc.detail
  95. except Exception as exc:
  96. logger.exception("Slice job %s failed unexpectedly", job.id)
  97. job.status = "failed"
  98. job.error_status = 500
  99. job.error_detail = f"Unexpected error: {exc}"
  100. finally:
  101. job.completed_at = datetime.now(timezone.utc)
  102. self._tasks.pop(job.id, None)
  103. def get(self, job_id: int) -> SliceJob | None:
  104. return self._jobs.get(job_id)
  105. def set_progress(self, job_id: int, progress: dict[str, Any] | None) -> None:
  106. """Update the live-progress snapshot for a running job.
  107. Called by the slice route's progress poller every ~1s while the
  108. sidecar slice request is in flight. Silently ignores unknown ids
  109. (the job may have just finished and been retention-swept) so a
  110. late poll doesn't crash the polling task.
  111. """
  112. job = self._jobs.get(job_id)
  113. if job is not None:
  114. job.progress = progress
  115. def _sweep_locked(self) -> None:
  116. """Drop finished jobs older than the retention window. Caller holds
  117. the lock."""
  118. now = datetime.now(timezone.utc)
  119. stale_ids = [
  120. jid
  121. for jid, job in self._jobs.items()
  122. if job.status in ("completed", "failed")
  123. and job.completed_at is not None
  124. and (now - job.completed_at).total_seconds() > _RETENTION_SECONDS
  125. ]
  126. for jid in stale_ids:
  127. self._jobs.pop(jid, None)
  128. class _SliceJobError(Exception):
  129. """Raised inside a slice job's `run` callable to surface a specific
  130. HTTP status + detail. The dispatcher catches these and stores them on
  131. the job. Callers convert ``HTTPException`` to this on the boundary.
  132. """
  133. def __init__(self, status_code: int, detail: str) -> None:
  134. super().__init__(detail)
  135. self.status_code = status_code
  136. self.detail = detail
  137. def http_exception_to_job_error(exc) -> _SliceJobError:
  138. """Convert a starlette ``HTTPException`` into the dispatcher's error
  139. type. Handles the common case where slice helpers raise FastAPI's
  140. ``HTTPException`` for validation / sidecar failures.
  141. """
  142. return _SliceJobError(exc.status_code, str(exc.detail))
  143. # Module-level singleton, started/stopped by main.py's lifespan.
  144. slice_dispatch = SliceDispatchService()