# test_sjf_scheduling.py
"""Integration tests for Shortest Job First (SJF) queue scheduling."""
  2. import pytest
  3. from sqlalchemy import select
  4. from backend.app.models.print_queue import PrintQueueItem
  5. from backend.app.models.settings import Settings
  6. class TestSJFScheduling:
  7. """Tests for shortest-job-first queue ordering and starvation guard."""
  @pytest.fixture
  async def printer_factory(self, db_session):
      """Provide an async callable that persists a test printer.

      Each call builds a ``Printer`` whose name, IP address and serial number
      carry a per-fixture sequence number, lets keyword arguments override any
      of the generated fields, then adds, commits and refreshes the row on
      ``db_session`` before returning it.
      """
      created = 0

      async def _create_printer(**kwargs):
          nonlocal created
          # The model is imported at call time, inside the factory.
          from backend.app.models.printer import Printer

          created += 1
          fields = {
              "name": f"Test Printer {created}",
              "ip_address": f"192.168.1.{100 + created}",
              "serial_number": f"TESTSERIAL{created:04d}",
              "access_code": "12345678",
              "model": "X1C",
              **kwargs,  # caller-supplied values win over the defaults
          }
          printer = Printer(**fields)
          db_session.add(printer)
          await db_session.commit()
          await db_session.refresh(printer)
          return printer

      return _create_printer
  @pytest.fixture
  async def archive_factory(self, db_session):
      """Provide an async callable that persists a test print archive.

      Each call builds a ``PrintArchive`` whose filename, print name, file path
      and content hash carry a per-fixture sequence number, lets keyword
      arguments override any generated field, then adds, commits and refreshes
      the row on ``db_session`` before returning it.
      """
      created = 0

      async def _create_archive(**kwargs):
          nonlocal created
          # The model is imported at call time, inside the factory.
          from backend.app.models.archive import PrintArchive

          created += 1
          fields = {
              "filename": f"test_print_{created}.3mf",
              "print_name": f"Test Print {created}",
              "file_path": f"/tmp/test_print_{created}.3mf",
              "file_size": 1024,
              "content_hash": f"testhash{created:08d}",
              "status": "completed",
              **kwargs,  # caller-supplied values win over the defaults
          }
          archive = PrintArchive(**fields)
          db_session.add(archive)
          await db_session.commit()
          await db_session.refresh(archive)
          return archive

      return _create_archive
  @pytest.fixture
  async def queue_item_factory(self, db_session, printer_factory, archive_factory):
      """Provide an async callable that persists a ``PrintQueueItem``.

      When the caller omits ``printer_id`` or ``archive_id`` a fresh printer or
      archive is created through the sibling factories. ``status`` defaults to
      ``"pending"`` and ``position`` to ``0``; every other column (for example
      ``print_time_seconds``) is set only when passed as a keyword argument.
      """

      async def _create_queue_item(**kwargs):
          # Create supporting rows only when needed: printer first, then archive.
          if "printer_id" not in kwargs:
              kwargs["printer_id"] = (await printer_factory()).id
          if "archive_id" not in kwargs:
              kwargs["archive_id"] = (await archive_factory()).id

          columns = {"status": "pending", "position": 0, **kwargs}
          item = PrintQueueItem(**columns)
          db_session.add(item)
          await db_session.commit()
          await db_session.refresh(item)
          return item

      return _create_queue_item
  @pytest.mark.asyncio
  @pytest.mark.integration
  async def test_queue_item_has_print_time_seconds(self, queue_item_factory):
      """A queue item created with print_time_seconds reports that value back."""
      one_hour = 3600
      queued = await queue_item_factory(print_time_seconds=one_hour, position=1)
      assert queued.print_time_seconds == one_hour
  @pytest.mark.asyncio
  @pytest.mark.integration
  async def test_queue_item_has_been_jumped(self, queue_item_factory):
      """been_jumped is False when omitted and True when passed explicitly."""
      untouched = await queue_item_factory(position=1)
      assert untouched.been_jumped is False

      flagged = await queue_item_factory(been_jumped=True, position=2)
      assert flagged.been_jumped is True
  @pytest.mark.asyncio
  @pytest.mark.integration
  async def test_sjf_ordering_shorter_jobs_first(self, db_session, queue_item_factory, printer_factory):
      """The SJF ordering returns pending items by ascending print_time_seconds."""
      printer = await printer_factory()

      # Queued in FIFO order (positions 1..3): 8 hours, 1 hour, 20 minutes.
      durations = {"long": 28800, "medium": 3600, "short": 1200}
      job_ids = {}
      for position, (label, seconds) in enumerate(durations.items(), start=1):
          job = await queue_item_factory(
              printer_id=printer.id,
              position=position,
              print_time_seconds=seconds,
          )
          job_ids[label] = job.id

      # SJF ordering: been_jumped DESC, print_time_seconds ASC NULLS LAST, position
      stmt = (
          select(PrintQueueItem)
          .where(PrintQueueItem.status == "pending", PrintQueueItem.printer_id == printer.id)
          .order_by(
              PrintQueueItem.been_jumped.desc(),
              PrintQueueItem.print_time_seconds.asc().nullslast(),
              PrintQueueItem.position,
          )
      )
      ordered = list((await db_session.execute(stmt)).scalars().all())

      assert len(ordered) == 3
      # Shortest first: 20 min, then 1 hour, then 8 hours.
      assert [row.id for row in ordered] == [job_ids["short"], job_ids["medium"], job_ids["long"]]
  @pytest.mark.asyncio
  @pytest.mark.integration
  async def test_sjf_null_print_time_goes_last(self, db_session, queue_item_factory, printer_factory):
      """Items with no print_time_seconds sort after items with a known duration.

      The unknown-duration item is queued at position 1, so position alone
      would put it first; the SJF ordering must place it behind the timed job.
      """
      printer = await printer_factory()
      no_time = await queue_item_factory(printer_id=printer.id, position=1, print_time_seconds=None)
      short_job = await queue_item_factory(printer_id=printer.id, position=2, print_time_seconds=600)

      # SJF ordering: been_jumped DESC, print_time_seconds ASC NULLS LAST, position
      stmt = (
          select(PrintQueueItem)
          .where(PrintQueueItem.status == "pending", PrintQueueItem.printer_id == printer.id)
          .order_by(
              PrintQueueItem.been_jumped.desc(),
              PrintQueueItem.print_time_seconds.asc().nullslast(),
              PrintQueueItem.position,
          )
      )
      items = list((await db_session.execute(stmt)).scalars().all())

      # Compare the whole id sequence so a missing or extra row fails the
      # assertion instead of raising IndexError or passing unnoticed.
      assert [item.id for item in items] == [
          short_job.id,  # Known duration first
          no_time.id,  # Unknown duration last
      ]
  @pytest.mark.asyncio
  @pytest.mark.integration
  async def test_starvation_guard_jumped_items_first(self, db_session, queue_item_factory, printer_factory):
      """Items flagged been_jumped sort ahead of non-jumped items, even shorter ones."""
      printer = await printer_factory()

      # Long job that was jumped (should go first now)
      jumped_long = await queue_item_factory(
          printer_id=printer.id, position=1, print_time_seconds=28800, been_jumped=True
      )
      # Short job (would normally go first, but jumped_long has priority)
      short_job = await queue_item_factory(printer_id=printer.id, position=2, print_time_seconds=600)

      # SJF ordering: been_jumped DESC, print_time_seconds ASC NULLS LAST, position
      stmt = (
          select(PrintQueueItem)
          .where(PrintQueueItem.status == "pending", PrintQueueItem.printer_id == printer.id)
          .order_by(
              PrintQueueItem.been_jumped.desc(),
              PrintQueueItem.print_time_seconds.asc().nullslast(),
              PrintQueueItem.position,
          )
      )
      items = list((await db_session.execute(stmt)).scalars().all())

      # Compare the whole id sequence so a missing or extra row fails the
      # assertion instead of raising IndexError or passing unnoticed.
      assert [item.id for item in items] == [
          jumped_long.id,  # Jumped item gets priority
          short_job.id,
      ]
  @pytest.mark.asyncio
  @pytest.mark.integration
  async def test_fifo_ordering_ignores_print_time(self, db_session, queue_item_factory, printer_factory):
      """Ordering by position alone returns items in queue order, whatever their duration."""
      printer = await printer_factory()
      long_first = await queue_item_factory(printer_id=printer.id, position=1, print_time_seconds=28800)
      short_second = await queue_item_factory(printer_id=printer.id, position=2, print_time_seconds=600)

      # Default FIFO query (no SJF): position is the only sort key.
      stmt = (
          select(PrintQueueItem)
          .where(PrintQueueItem.status == "pending", PrintQueueItem.printer_id == printer.id)
          .order_by(PrintQueueItem.position)
      )
      items = list((await db_session.execute(stmt)).scalars().all())

      # Compare the whole id sequence so a missing or extra row fails the
      # assertion instead of raising IndexError or passing unnoticed.
      assert [item.id for item in items] == [
          long_first.id,  # Position 1 first (FIFO)
          short_second.id,  # Position 2 second
      ]
  @pytest.mark.asyncio
  @pytest.mark.integration
  async def test_sjf_position_as_tiebreaker(self, db_session, queue_item_factory, printer_factory):
      """With equal print times, the SJF ordering falls back to queue position."""
      printer = await printer_factory()
      first = await queue_item_factory(printer_id=printer.id, position=1, print_time_seconds=3600)
      second = await queue_item_factory(printer_id=printer.id, position=2, print_time_seconds=3600)

      # SJF ordering: been_jumped DESC, print_time_seconds ASC NULLS LAST, position
      stmt = (
          select(PrintQueueItem)
          .where(PrintQueueItem.status == "pending", PrintQueueItem.printer_id == printer.id)
          .order_by(
              PrintQueueItem.been_jumped.desc(),
              PrintQueueItem.print_time_seconds.asc().nullslast(),
              PrintQueueItem.position,
          )
      )
      items = list((await db_session.execute(stmt)).scalars().all())

      # Compare the whole id sequence so a missing or extra row fails the
      # assertion instead of raising IndexError or passing unnoticed.
      assert [item.id for item in items] == [
          first.id,  # Same duration, lower position wins
          second.id,
      ]
  @pytest.mark.asyncio
  @pytest.mark.integration
  async def test_starvation_flag_set_on_jumped_items(self, db_session, queue_item_factory, printer_factory):
      """The jump-marking rule flags a longer, earlier-queued job that a shorter job overtakes.

      The rule is reproduced inline to simulate what the scheduler does when
      SJF picks the short job; the scheduler itself is not invoked here.
      """
      printer = await printer_factory()

      # Simulate: long job at position 1, short job at position 2
      long_job = await queue_item_factory(printer_id=printer.id, position=1, print_time_seconds=28800)
      short_job = await queue_item_factory(printer_id=printer.id, position=2, print_time_seconds=1200)

      winning_item = short_job  # SJF would pick this

      # Mark items that were jumped (lower position, longer or unknown duration)
      for other in (long_job, short_job):
          was_overtaken = (
              other.id != winning_item.id
              and other.status == "pending"
              and other.printer_id == winning_item.printer_id
              and not other.been_jumped
              and other.position < winning_item.position
              and (other.print_time_seconds is None or other.print_time_seconds > winning_item.print_time_seconds)
          )
          if was_overtaken:
              other.been_jumped = True

      await db_session.commit()
      # Reload both rows so each assertion reads the committed value rather
      # than whatever state the in-memory objects happen to hold.
      await db_session.refresh(long_job)
      await db_session.refresh(short_job)

      assert long_job.been_jumped is True
      assert short_job.been_jumped is False
  @pytest.mark.asyncio
  @pytest.mark.integration
  async def test_starvation_guard_prevents_double_jump(self, db_session, queue_item_factory, printer_factory):
      """An already-jumped long job still sorts ahead of a later, much shorter job.

      This checks the SJF sort order only: because been_jumped is the leading
      sort key, the shorter job cannot overtake the flagged job a second time.
      """
      printer = await printer_factory()

      # Long job already jumped once
      long_job = await queue_item_factory(
          printer_id=printer.id, position=1, print_time_seconds=28800, been_jumped=True
      )
      # Even shorter job arrives
      tiny_job = await queue_item_factory(printer_id=printer.id, position=3, print_time_seconds=300)

      # SJF ordering: been_jumped DESC, print_time_seconds ASC NULLS LAST, position
      stmt = (
          select(PrintQueueItem)
          .where(PrintQueueItem.status == "pending", PrintQueueItem.printer_id == printer.id)
          .order_by(
              PrintQueueItem.been_jumped.desc(),
              PrintQueueItem.print_time_seconds.asc().nullslast(),
              PrintQueueItem.position,
          )
      )
      items = list((await db_session.execute(stmt)).scalars().all())

      # Compare the whole id sequence so a missing or extra row fails the
      # assertion instead of raising IndexError or passing unnoticed.
      assert [item.id for item in items] == [
          long_job.id,  # Already jumped, so it goes first (starvation protection)
          tiny_job.id,
      ]
  @pytest.mark.asyncio
  @pytest.mark.integration
  async def test_queue_shortest_first_setting(self, db_session):
      """A queue_shortest_first Settings row round-trips through the database."""
      setting_key = "queue_shortest_first"
      db_session.add(Settings(key=setting_key, value="true"))
      await db_session.commit()

      lookup = select(Settings).where(Settings.key == setting_key)
      stored = (await db_session.execute(lookup)).scalar_one_or_none()

      assert stored is not None
      assert stored.value == "true"