test_archives_api.py 53 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301
  1. """Integration tests for Archives API endpoints.
  2. Tests the full request/response cycle for /api/v1/archives/ endpoints.
  3. """
  4. from pathlib import Path
  5. import pytest
  6. from httpx import AsyncClient
  7. class TestArchivesAPI:
  8. """Integration tests for /api/v1/archives/ endpoints."""
  9. # ========================================================================
  10. # List endpoints
  11. # ========================================================================
  12. @pytest.mark.asyncio
  13. @pytest.mark.integration
  14. async def test_list_archives_empty(self, async_client: AsyncClient):
  15. """Verify empty list is returned when no archives exist."""
  16. response = await async_client.get("/api/v1/archives/")
  17. assert response.status_code == 200
  18. data = response.json()
  19. assert isinstance(data, list)
  20. assert len(data) == 0
  21. @pytest.mark.asyncio
  22. @pytest.mark.integration
  23. async def test_list_archives_with_data(
  24. self, async_client: AsyncClient, archive_factory, printer_factory, db_session
  25. ):
  26. """Verify list returns existing archives."""
  27. printer = await printer_factory()
  28. await archive_factory(printer.id, print_name="Test Archive")
  29. response = await async_client.get("/api/v1/archives/")
  30. assert response.status_code == 200
  31. data = response.json()
  32. assert isinstance(data, list)
  33. assert len(data) >= 1
  34. assert any(a["print_name"] == "Test Archive" for a in data)
  35. @pytest.mark.asyncio
  36. @pytest.mark.integration
  37. async def test_list_archives_pagination(
  38. self, async_client: AsyncClient, archive_factory, printer_factory, db_session
  39. ):
  40. """Verify pagination works correctly."""
  41. printer = await printer_factory()
  42. # Create 5 archives
  43. for i in range(5):
  44. await archive_factory(printer.id, print_name=f"Archive {i}")
  45. # Get first page with limit 2
  46. response = await async_client.get("/api/v1/archives/?limit=2&offset=0")
  47. assert response.status_code == 200
  48. data = response.json()
  49. assert isinstance(data, list)
  50. assert len(data) == 2
  51. @pytest.mark.asyncio
  52. @pytest.mark.integration
  53. async def test_list_archives_filter_by_printer(
  54. self, async_client: AsyncClient, archive_factory, printer_factory, db_session
  55. ):
  56. """Verify filtering by printer_id works."""
  57. printer1 = await printer_factory(name="Printer 1", serial_number="00M09A000000001")
  58. printer2 = await printer_factory(name="Printer 2", serial_number="00M09A000000002")
  59. await archive_factory(printer1.id, print_name="Printer 1 Archive")
  60. await archive_factory(printer2.id, print_name="Printer 2 Archive")
  61. response = await async_client.get(f"/api/v1/archives/?printer_id={printer1.id}")
  62. assert response.status_code == 200
  63. data = response.json()
  64. assert all(a["printer_id"] == printer1.id for a in data)
  65. # ========================================================================
  66. # Get single endpoint
  67. # ========================================================================
  68. @pytest.mark.asyncio
  69. @pytest.mark.integration
  70. async def test_get_archive(self, async_client: AsyncClient, archive_factory, printer_factory, db_session):
  71. """Verify single archive can be retrieved."""
  72. printer = await printer_factory()
  73. archive = await archive_factory(printer.id, print_name="Get Test Archive")
  74. response = await async_client.get(f"/api/v1/archives/{archive.id}")
  75. assert response.status_code == 200
  76. result = response.json()
  77. assert result["id"] == archive.id
  78. assert result["print_name"] == "Get Test Archive"
  79. @pytest.mark.asyncio
  80. @pytest.mark.integration
  81. async def test_get_archive_not_found(self, async_client: AsyncClient):
  82. """Verify 404 for non-existent archive."""
  83. response = await async_client.get("/api/v1/archives/9999")
  84. assert response.status_code == 404
  85. # ========================================================================
  86. # Update endpoints
  87. # ========================================================================
  88. @pytest.mark.asyncio
  89. @pytest.mark.integration
  90. async def test_update_archive_name(self, async_client: AsyncClient, archive_factory, printer_factory, db_session):
  91. """Verify archive name can be updated."""
  92. printer = await printer_factory()
  93. archive = await archive_factory(printer.id, print_name="Original Name")
  94. response = await async_client.patch(f"/api/v1/archives/{archive.id}", json={"print_name": "Updated Name"})
  95. assert response.status_code == 200
  96. assert response.json()["print_name"] == "Updated Name"
  97. @pytest.mark.asyncio
  98. @pytest.mark.integration
  99. async def test_update_archive_notes(self, async_client: AsyncClient, archive_factory, printer_factory, db_session):
  100. """Verify archive notes can be updated."""
  101. printer = await printer_factory()
  102. archive = await archive_factory(printer.id)
  103. response = await async_client.patch(f"/api/v1/archives/{archive.id}", json={"notes": "Great print!"})
  104. assert response.status_code == 200
  105. assert response.json()["notes"] == "Great print!"
  106. @pytest.mark.asyncio
  107. @pytest.mark.integration
  108. async def test_update_archive_favorite(
  109. self, async_client: AsyncClient, archive_factory, printer_factory, db_session
  110. ):
  111. """Verify archive favorite status can be updated."""
  112. printer = await printer_factory()
  113. archive = await archive_factory(printer.id)
  114. response = await async_client.patch(f"/api/v1/archives/{archive.id}", json={"is_favorite": True})
  115. assert response.status_code == 200
  116. assert response.json()["is_favorite"] is True
  117. @pytest.mark.asyncio
  118. @pytest.mark.integration
  119. async def test_update_archive_external_url(
  120. self, async_client: AsyncClient, archive_factory, printer_factory, db_session
  121. ):
  122. """Verify archive external_url can be updated."""
  123. printer = await printer_factory()
  124. archive = await archive_factory(printer.id)
  125. response = await async_client.patch(
  126. f"/api/v1/archives/{archive.id}", json={"external_url": "https://printables.com/model/12345"}
  127. )
  128. assert response.status_code == 200
  129. assert response.json()["external_url"] == "https://printables.com/model/12345"
  130. # Verify it can be cleared
  131. response = await async_client.patch(f"/api/v1/archives/{archive.id}", json={"external_url": None})
  132. assert response.status_code == 200
  133. assert response.json()["external_url"] is None
  134. @pytest.mark.asyncio
  135. @pytest.mark.integration
  136. async def test_update_archive_failure_reason_mirrors_to_print_log_entry(
  137. self, async_client: AsyncClient, archive_factory, printer_factory, db_session
  138. ):
  139. """#1444: PATCH /archives/{id} with failure_reason must mirror to the
  140. latest PrintLogEntry so the Stats page Failure Analysis widget
  141. (which reads PrintLogEntry.failure_reason) reflects the user's
  142. reclassification instead of showing "Unknown" forever.
  143. """
  144. from sqlalchemy import select
  145. from backend.app.models.print_log import PrintLogEntry
  146. printer = await printer_factory()
  147. # archive_factory auto-creates a matching PrintLogEntry (failure_reason
  148. # carried from the archive, which is NULL here — same shape as the bug
  149. # repro: print completed → log entry written with NULL → user goes to
  150. # classify the failure afterwards).
  151. archive = await archive_factory(printer.id, print_name="Failed Print", status="failed", run_status="failed")
  152. response = await async_client.patch(
  153. f"/api/v1/archives/{archive.id}",
  154. json={"failure_reason": "Adhesion failure"},
  155. )
  156. assert response.status_code == 200
  157. result = await db_session.execute(select(PrintLogEntry).where(PrintLogEntry.archive_id == archive.id))
  158. mirrored = result.scalar_one()
  159. assert mirrored.failure_reason == "Adhesion failure"
  160. @pytest.mark.asyncio
  161. @pytest.mark.integration
  162. async def test_update_archive_status_mirrors_to_print_log_entry(
  163. self, async_client: AsyncClient, archive_factory, printer_factory, db_session
  164. ):
  165. """#1444: PATCH /archives/{id} with status must mirror to the latest
  166. PrintLogEntry so stats that filter on PrintLogEntry.status see the
  167. user's reclassification.
  168. """
  169. from sqlalchemy import select
  170. from backend.app.models.print_log import PrintLogEntry
  171. printer = await printer_factory()
  172. archive = await archive_factory(printer.id, run_status="completed")
  173. response = await async_client.patch(
  174. f"/api/v1/archives/{archive.id}",
  175. json={"status": "failed"},
  176. )
  177. assert response.status_code == 200
  178. result = await db_session.execute(select(PrintLogEntry).where(PrintLogEntry.archive_id == archive.id))
  179. mirrored = result.scalar_one()
  180. assert mirrored.status == "failed"
  181. @pytest.mark.asyncio
  182. @pytest.mark.integration
  183. async def test_update_archive_failure_reason_only_touches_latest_entry(
  184. self, async_client: AsyncClient, archive_factory, printer_factory, db_session
  185. ):
  186. """#1444: For an archive with multiple runs (reprints), only the
  187. latest PrintLogEntry should receive the reclassification. Earlier
  188. runs were classified at their own time and must not be retroactively
  189. overwritten.
  190. """
  191. from backend.app.models.print_log import PrintLogEntry
  192. printer = await printer_factory()
  193. # First run — created by the factory's auto-run with its own reason.
  194. archive = await archive_factory(printer.id, status="failed", run_status="failed")
  195. from sqlalchemy import select
  196. first_run = (
  197. await db_session.execute(select(PrintLogEntry).where(PrintLogEntry.archive_id == archive.id))
  198. ).scalar_one()
  199. first_run.failure_reason = "Filament tangle"
  200. await db_session.commit()
  201. # Second run — the reprint that just finished with NULL classification.
  202. latest_run = PrintLogEntry(archive_id=archive.id, status="failed", failure_reason=None)
  203. db_session.add(latest_run)
  204. await db_session.commit()
  205. response = await async_client.patch(
  206. f"/api/v1/archives/{archive.id}",
  207. json={"failure_reason": "Adhesion failure"},
  208. )
  209. assert response.status_code == 200
  210. await db_session.refresh(first_run)
  211. await db_session.refresh(latest_run)
  212. assert first_run.failure_reason == "Filament tangle"
  213. assert latest_run.failure_reason == "Adhesion failure"
  214. # ========================================================================
  215. # Delete endpoints
  216. # ========================================================================
  217. @pytest.mark.asyncio
  218. @pytest.mark.integration
  219. async def test_delete_archive(self, async_client: AsyncClient, archive_factory, printer_factory, db_session):
  220. """Verify archive can be deleted."""
  221. printer = await printer_factory()
  222. archive = await archive_factory(printer.id)
  223. archive_id = archive.id
  224. response = await async_client.delete(f"/api/v1/archives/{archive_id}")
  225. assert response.status_code == 200
  226. # Verify deleted
  227. response = await async_client.get(f"/api/v1/archives/{archive_id}")
  228. assert response.status_code == 404
  229. @pytest.mark.asyncio
  230. @pytest.mark.integration
  231. async def test_delete_nonexistent_archive(self, async_client: AsyncClient):
  232. """Verify deleting non-existent archive returns 404."""
  233. response = await async_client.delete("/api/v1/archives/9999")
  234. assert response.status_code == 404
  235. @pytest.mark.asyncio
  236. @pytest.mark.integration
  237. async def test_soft_delete_preserves_stats_contribution(
  238. self, async_client: AsyncClient, archive_factory, printer_factory, db_session
  239. ):
  240. """#1343: deleting an archive without ``purge_stats`` keeps its
  241. contribution in Quick Stats. The row vanishes from listings but the
  242. filament / time / cost totals stay intact.
  243. """
  244. printer = await printer_factory()
  245. await archive_factory(
  246. printer.id,
  247. status="completed",
  248. print_time_seconds=3600,
  249. filament_used_grams=50.0,
  250. cost=1.50,
  251. )
  252. archive_to_delete = await archive_factory(
  253. printer.id,
  254. status="completed",
  255. print_time_seconds=7200,
  256. filament_used_grams=100.0,
  257. cost=3.00,
  258. )
  259. # Pre-delete: stats include both archives.
  260. pre = (await async_client.get("/api/v1/archives/stats")).json()
  261. assert pre["total_prints"] == 2
  262. assert pre["total_filament_grams"] == 150.0
  263. assert pre["total_cost"] == 4.50
  264. # Soft delete (default — no purge_stats param).
  265. resp = await async_client.delete(f"/api/v1/archives/{archive_to_delete.id}")
  266. assert resp.status_code == 200
  267. body = resp.json()
  268. assert body["purged_from_stats"] is False
  269. # Listing hides the deleted archive…
  270. listing = (await async_client.get("/api/v1/archives/")).json()
  271. assert all(a["id"] != archive_to_delete.id for a in listing)
  272. # …but stats still reflect both prints (the whole point of #1343).
  273. post = (await async_client.get("/api/v1/archives/stats")).json()
  274. assert post["total_prints"] == 2
  275. assert post["total_filament_grams"] == 150.0
  276. assert post["total_cost"] == 4.50
  277. @pytest.mark.asyncio
  278. @pytest.mark.integration
  279. async def test_soft_delete_clears_thumbnail_path_on_linked_log_entries(
  280. self, async_client: AsyncClient, archive_factory, printer_factory, db_session
  281. ):
  282. """#1348 follow-up: soft-deleting an archive removes its files from disk;
  283. the cached thumbnail_path on linked PrintLogEntry rows must be NULLed
  284. in the same transaction so the print-log view doesn't 404-storm on the
  285. now-deleted thumbnail file."""
  286. from sqlalchemy import select
  287. from backend.app.models.print_log import PrintLogEntry
  288. printer = await printer_factory()
  289. archive = await archive_factory(
  290. printer.id,
  291. status="completed",
  292. thumbnail_path="archives/test/test_print/thumbnail.png",
  293. )
  294. # The factory's auto-PrintLogEntry doesn't copy thumbnail_path; set it
  295. # manually to mirror what the production write_log_entry path stores.
  296. run_query = await db_session.execute(select(PrintLogEntry).where(PrintLogEntry.archive_id == archive.id))
  297. run = run_query.scalar_one()
  298. run.thumbnail_path = "archives/test/test_print/thumbnail.png"
  299. await db_session.commit()
  300. assert run.thumbnail_path is not None
  301. resp = await async_client.delete(f"/api/v1/archives/{archive.id}")
  302. assert resp.status_code == 200
  303. assert resp.json()["purged_from_stats"] is False
  304. await db_session.refresh(run)
  305. assert run.thumbnail_path is None, "soft-delete must NULL thumbnail_path on linked log entry"
  306. # The log entry itself survives the soft delete (its filament/cost
  307. # contribution still needs to flow into stats per #1343).
  308. assert run.id is not None
  309. assert run.archive_id == archive.id
  310. @pytest.mark.asyncio
  311. @pytest.mark.integration
  312. async def test_hard_delete_clears_thumbnail_path_before_fk_cascade(
  313. self, async_client: AsyncClient, archive_factory, printer_factory, db_session
  314. ):
  315. """#1348 follow-up: the auto-purge sweeper (and any caller of
  316. ArchiveService.delete_archive) hard-deletes the archive row but leaves
  317. PrintLogEntry rows alive via ON DELETE SET NULL. The eager
  318. thumbnail_path clear must run inside delete_archive so even orphaned
  319. log entries don't surface stale paths."""
  320. from sqlalchemy import select
  321. from backend.app.models.print_log import PrintLogEntry
  322. from backend.app.services.archive import ArchiveService
  323. printer = await printer_factory()
  324. archive = await archive_factory(
  325. printer.id,
  326. status="completed",
  327. thumbnail_path="archives/test/test_print/thumbnail.png",
  328. )
  329. run_query = await db_session.execute(select(PrintLogEntry).where(PrintLogEntry.archive_id == archive.id))
  330. run = run_query.scalar_one()
  331. run.thumbnail_path = "archives/test/test_print/thumbnail.png"
  332. await db_session.commit()
  333. run_id = run.id
  334. service = ArchiveService(db_session)
  335. assert await service.delete_archive(archive.id) is True
  336. # Log entry survives the hard-delete (the FK is ON DELETE SET NULL
  337. # in production; SQLite test config doesn't enable foreign_keys=ON
  338. # by default so archive_id may still be set, but the row itself
  339. # remains for audit). The thumbnail_path was cleared eagerly by
  340. # _null_print_log_thumbnail_paths before db.delete(archive).
  341. refetch = await db_session.execute(select(PrintLogEntry).where(PrintLogEntry.id == run_id))
  342. survivor = refetch.scalar_one()
  343. assert survivor.thumbnail_path is None, (
  344. "delete_archive must NULL thumbnail_path before removing the archive row"
  345. )
  346. @pytest.mark.asyncio
  347. @pytest.mark.integration
  348. async def test_print_log_thumbnail_route_lazy_nulls_missing_file(
  349. self, async_client: AsyncClient, archive_factory, printer_factory, db_session
  350. ):
  351. """#1348 follow-up: GET /print-log/{id}/thumbnail self-heals when the
  352. thumbnail_path on a log entry points at a missing file (failed print
  353. whose thumbnail was never written, or a stale path that escaped the
  354. delete-time cleanup)."""
  355. from sqlalchemy import select
  356. from backend.app.models.print_log import PrintLogEntry
  357. printer = await printer_factory()
  358. archive = await archive_factory(printer.id, status="failed")
  359. run_query = await db_session.execute(select(PrintLogEntry).where(PrintLogEntry.archive_id == archive.id))
  360. run = run_query.scalar_one()
  361. # Path points at a file that never existed (failed-print case where
  362. # archive.thumbnail_path was set but the extractor never produced one).
  363. run.thumbnail_path = "archives/missing/never_written/thumbnail.png"
  364. await db_session.commit()
  365. # Auth is disabled in the integration test config, so the stream-token
  366. # guard is bypassed — the route runs the lazy-NULL branch directly.
  367. resp = await async_client.get(f"/api/v1/print-log/{run.id}/thumbnail")
  368. assert resp.status_code == 404
  369. await db_session.refresh(run)
  370. assert run.thumbnail_path is None, "missing file must self-heal to NULL"
  371. @pytest.mark.asyncio
  372. @pytest.mark.integration
  373. async def test_purge_stats_drops_archive_from_quick_stats(
  374. self, async_client: AsyncClient, archive_factory, printer_factory, db_session
  375. ):
  376. """#1343: deleting with ``?purge_stats=true`` hard-deletes the row,
  377. dropping its contribution from Quick Stats (the original behaviour,
  378. now opt-in)."""
  379. printer = await printer_factory()
  380. keep = await archive_factory(printer.id, status="completed", filament_used_grams=50.0)
  381. purge = await archive_factory(printer.id, status="completed", filament_used_grams=100.0)
  382. resp = await async_client.delete(f"/api/v1/archives/{purge.id}?purge_stats=true")
  383. assert resp.status_code == 200
  384. assert resp.json()["purged_from_stats"] is True
  385. stats = (await async_client.get("/api/v1/archives/stats")).json()
  386. assert stats["total_prints"] == 1
  387. assert stats["total_filament_grams"] == 50.0
  388. # The kept archive is still listed.
  389. listing = (await async_client.get("/api/v1/archives/")).json()
  390. assert [a["id"] for a in listing] == [keep.id]
  391. @pytest.mark.asyncio
  392. @pytest.mark.integration
  393. async def test_soft_deleted_archive_404_on_detail(
  394. self, async_client: AsyncClient, archive_factory, printer_factory, db_session
  395. ):
  396. """A soft-deleted archive must 404 on GET — a stale bookmark or
  397. direct URL should not expose a row the user has already removed."""
  398. printer = await printer_factory()
  399. archive = await archive_factory(printer.id)
  400. await async_client.delete(f"/api/v1/archives/{archive.id}")
  401. resp = await async_client.get(f"/api/v1/archives/{archive.id}")
  402. assert resp.status_code == 404
  403. @pytest.mark.asyncio
  404. @pytest.mark.integration
  405. async def test_soft_deleted_archive_hidden_from_search(
  406. self, async_client: AsyncClient, archive_factory, printer_factory, db_session
  407. ):
  408. """Search must skip soft-deleted archives. Uses the LIKE fallback by
  409. querying a single-character pattern that the SQLite FTS5 rejects, so
  410. the test covers the fallback path that the production FTS path also
  411. respects."""
  412. printer = await printer_factory()
  413. archive = await archive_factory(printer.id, print_name="UniqueSoftDeleteCandidate")
  414. await async_client.delete(f"/api/v1/archives/{archive.id}")
  415. resp = await async_client.get("/api/v1/archives/search?q=UniqueSoftDeleteCandidate")
  416. assert resp.status_code == 200
  417. assert resp.json() == []
  418. # ========================================================================
  419. # Statistics endpoints
  420. # ========================================================================
  421. @pytest.mark.asyncio
  422. @pytest.mark.integration
  423. async def test_get_archive_stats(self, async_client: AsyncClient, archive_factory, printer_factory, db_session):
  424. """Verify archive statistics can be retrieved."""
  425. printer = await printer_factory()
  426. await archive_factory(
  427. printer.id,
  428. status="completed",
  429. print_time_seconds=3600,
  430. filament_used_grams=50.0,
  431. )
  432. await archive_factory(
  433. printer.id,
  434. status="completed",
  435. print_time_seconds=7200,
  436. filament_used_grams=100.0,
  437. )
  438. response = await async_client.get("/api/v1/archives/stats")
  439. assert response.status_code == 200
  440. result = response.json()
  441. # Check for actual stats fields
  442. assert "total_prints" in result
  443. assert "successful_prints" in result
  444. class TestArchivesSlimAPI:
  445. """Integration tests for /api/v1/archives/slim endpoint."""
  446. @pytest.mark.asyncio
  447. @pytest.mark.integration
  448. async def test_slim_empty(self, async_client: AsyncClient):
  449. """Verify empty list when no archives exist."""
  450. response = await async_client.get("/api/v1/archives/slim")
  451. assert response.status_code == 200
  452. assert response.json() == []
  453. @pytest.mark.asyncio
  454. @pytest.mark.integration
  455. async def test_slim_returns_only_expected_fields(
  456. self, async_client: AsyncClient, archive_factory, printer_factory, db_session
  457. ):
  458. """Verify response contains only slim fields, not full archive data."""
  459. printer = await printer_factory()
  460. await archive_factory(
  461. printer.id,
  462. print_name="Slim Test",
  463. status="completed",
  464. filament_type="PLA",
  465. filament_color="#FF0000",
  466. filament_used_grams=50.0,
  467. print_time_seconds=3600,
  468. cost=1.50,
  469. quantity=2,
  470. )
  471. response = await async_client.get("/api/v1/archives/slim")
  472. assert response.status_code == 200
  473. data = response.json()
  474. assert len(data) == 1
  475. item = data[0]
  476. # Expected fields present
  477. assert item["printer_id"] == printer.id
  478. assert item["print_name"] == "Slim Test"
  479. assert item["status"] == "completed"
  480. assert item["filament_type"] == "PLA"
  481. assert item["filament_color"] == "#FF0000"
  482. assert item["filament_used_grams"] == 50.0
  483. assert item["print_time_seconds"] == 3600
  484. assert item["cost"] == 1.50
  485. # quantity is per-event semantics now (each PrintLogEntry = one run);
  486. # the archive's quantity field is no longer surfaced through this
  487. # endpoint after the #1390 per-event migration.
  488. assert item["quantity"] == 1
  489. assert "created_at" in item
  490. # Full archive fields must NOT be present
  491. assert "id" not in item
  492. assert "filename" not in item
  493. assert "file_path" not in item
  494. assert "file_size" not in item
  495. assert "extra_data" not in item
  496. assert "notes" not in item
  497. assert "tags" not in item
  498. assert "photos" not in item
  499. assert "thumbnail_path" not in item
  500. assert "content_hash" not in item
  501. assert "duplicates" not in item
  502. assert "duplicate_count" not in item
  503. @pytest.mark.asyncio
  504. @pytest.mark.integration
  505. async def test_slim_computes_actual_time(
  506. self, async_client: AsyncClient, archive_factory, printer_factory, db_session
  507. ):
  508. """Verify actual_time_seconds is computed from started_at/completed_at."""
  509. from datetime import datetime, timezone
  510. printer = await printer_factory()
  511. started = datetime(2024, 1, 1, 10, 0, 0, tzinfo=timezone.utc)
  512. completed = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc) # 2 hours = 7200s
  513. await archive_factory(
  514. printer.id,
  515. status="completed",
  516. started_at=started,
  517. completed_at=completed,
  518. )
  519. response = await async_client.get("/api/v1/archives/slim")
  520. assert response.status_code == 200
  521. item = response.json()[0]
  522. assert item["actual_time_seconds"] == 7200
  523. @pytest.mark.asyncio
  524. @pytest.mark.integration
  525. async def test_slim_actual_time_for_failed_includes_elapsed(
  526. self, async_client: AsyncClient, archive_factory, printer_factory, db_session
  527. ):
  528. """Failed prints report measured elapsed time so Printer Stats By Time
  529. matches Quick Stats Print Time (#1390). Previously this returned null
  530. and the frontend fell back to the slicer estimate, double-counting the
  531. unfinished portion of the print."""
  532. from datetime import datetime, timezone
  533. printer = await printer_factory()
  534. await archive_factory(
  535. printer.id,
  536. status="failed",
  537. started_at=datetime(2024, 1, 1, 10, 0, 0, tzinfo=timezone.utc),
  538. completed_at=datetime(2024, 1, 1, 11, 0, 0, tzinfo=timezone.utc),
  539. )
  540. response = await async_client.get("/api/v1/archives/slim")
  541. assert response.status_code == 200
  542. item = response.json()[0]
  543. assert item["actual_time_seconds"] == 3600
  544. @pytest.mark.asyncio
  545. @pytest.mark.integration
  546. async def test_slim_date_filtering(self, async_client: AsyncClient, archive_factory, printer_factory, db_session):
  547. """Verify date_from and date_to filters work."""
  548. from datetime import datetime, timezone
  549. printer = await printer_factory()
  550. await archive_factory(
  551. printer.id,
  552. print_name="Old Print",
  553. created_at=datetime(2024, 1, 1, tzinfo=timezone.utc),
  554. )
  555. await archive_factory(
  556. printer.id,
  557. print_name="New Print",
  558. created_at=datetime(2024, 6, 15, tzinfo=timezone.utc),
  559. )
  560. # Filter to only June 2024
  561. response = await async_client.get("/api/v1/archives/slim?date_from=2024-06-01&date_to=2024-06-30")
  562. assert response.status_code == 200
  563. data = response.json()
  564. assert len(data) == 1
  565. assert data[0]["print_name"] == "New Print"
  566. @pytest.mark.asyncio
  567. @pytest.mark.integration
  568. async def test_slim_pagination(self, async_client: AsyncClient, archive_factory, printer_factory, db_session):
  569. """Verify limit and offset work."""
  570. printer = await printer_factory()
  571. for i in range(5):
  572. await archive_factory(printer.id, print_name=f"Print {i}")
  573. response = await async_client.get("/api/v1/archives/slim?limit=2&offset=0")
  574. assert response.status_code == 200
  575. assert len(response.json()) == 2
  576. @pytest.mark.asyncio
  577. @pytest.mark.integration
  578. async def test_slim_counts_reprints_as_separate_rows(
  579. self, async_client: AsyncClient, archive_factory, printer_factory, db_session
  580. ):
  581. """Reprints add events even though the archive row is overwritten (#1390).
  582. Before the per-event migration, /archives/slim returned one row per
  583. archive — so an archive that had been reprinted three times appeared
  584. once and undercounted Filament Used / Cost / Time. The endpoint must
  585. now return one row per logged event.
  586. """
  587. from backend.app.models.print_log import PrintLogEntry
  588. printer = await printer_factory()
  589. archive = await archive_factory(
  590. printer.id,
  591. print_name="Reprinted Model",
  592. filament_used_grams=50.0,
  593. cost=1.50,
  594. )
  595. # archive_factory synthesizes one event; add two more to simulate
  596. # the same archive being reprinted twice more.
  597. for _ in range(2):
  598. db_session.add(
  599. PrintLogEntry(
  600. archive_id=archive.id,
  601. printer_id=archive.printer_id,
  602. status="completed",
  603. filament_type=archive.filament_type,
  604. filament_used_grams=archive.filament_used_grams,
  605. cost=archive.cost,
  606. print_name=archive.print_name,
  607. )
  608. )
  609. await db_session.commit()
  610. response = await async_client.get("/api/v1/archives/slim")
  611. assert response.status_code == 200
  612. data = response.json()
  613. assert len(data) == 3, "Each reprint must contribute one row"
  614. total_filament = sum(item["filament_used_grams"] or 0 for item in data)
  615. assert total_filament == 150.0, "Sum across events must reflect all three runs"
  616. @pytest.mark.asyncio
  617. @pytest.mark.integration
  618. async def test_slim_includes_orphan_events(self, async_client: AsyncClient, printer_factory, db_session):
  619. """Events whose archive was hard-deleted still appear (#1390).
  620. After ON DELETE SET NULL the event row survives with archive_id=NULL.
  621. The slim endpoint must keep counting it so Quick Stats and the
  622. archive-iterating widgets stay aligned.
  623. """
  624. from backend.app.models.print_log import PrintLogEntry
  625. printer = await printer_factory()
  626. db_session.add(
  627. PrintLogEntry(
  628. archive_id=None,
  629. printer_id=printer.id,
  630. status="completed",
  631. filament_type="PETG",
  632. filament_used_grams=25.0,
  633. cost=0.75,
  634. print_name="Orphaned Print",
  635. )
  636. )
  637. await db_session.commit()
  638. response = await async_client.get("/api/v1/archives/slim")
  639. assert response.status_code == 200
  640. data = response.json()
  641. assert len(data) == 1
  642. assert data[0]["print_name"] == "Orphaned Print"
  643. assert data[0]["filament_used_grams"] == 25.0
  644. # print_time_seconds (sliced estimate) comes from the archive table,
  645. # which orphans no longer have — must surface as null gracefully.
  646. assert data[0]["print_time_seconds"] is None
  647. class TestFailureAnalysisAPI:
  648. """Per-event failure analysis (#1390)."""
  649. @pytest.mark.asyncio
  650. @pytest.mark.integration
  651. async def test_failure_analysis_counts_reprints_and_orphans(
  652. self, async_client: AsyncClient, archive_factory, printer_factory, db_session
  653. ):
  654. """Failure analysis aggregates per event, not per archive.
  655. Verifies the dual fix for #1390: a reprint that adds a second failed
  656. event must count twice, and an orphan failed event (archive deleted)
  657. must still appear in the totals.
  658. """
  659. from backend.app.models.print_log import PrintLogEntry
  660. printer = await printer_factory()
  661. archive = await archive_factory(
  662. printer.id,
  663. print_name="Failing Model",
  664. status="failed",
  665. failure_reason="filament_runout",
  666. )
  667. # Add a second failed event for the same archive (a reprint that also
  668. # failed) and one orphan failed event (archive was deleted).
  669. db_session.add(
  670. PrintLogEntry(
  671. archive_id=archive.id,
  672. printer_id=printer.id,
  673. status="failed",
  674. failure_reason="filament_runout",
  675. filament_type=archive.filament_type,
  676. print_name=archive.print_name,
  677. )
  678. )
  679. db_session.add(
  680. PrintLogEntry(
  681. archive_id=None,
  682. printer_id=printer.id,
  683. status="failed",
  684. failure_reason="bed_adhesion",
  685. filament_type="PETG",
  686. print_name="Orphaned Failed Print",
  687. )
  688. )
  689. await db_session.commit()
  690. response = await async_client.get("/api/v1/archives/analysis/failures")
  691. assert response.status_code == 200
  692. result = response.json()
  693. assert result["total_prints"] == 3
  694. assert result["failed_prints"] == 3
  695. assert result["failures_by_reason"]["filament_runout"] == 2
  696. assert result["failures_by_reason"]["bed_adhesion"] == 1
  697. class TestArchiveDataIntegrity:
  698. """Tests for archive data integrity."""
  699. @pytest.mark.asyncio
  700. @pytest.mark.integration
  701. async def test_archive_linked_to_printer(
  702. self, async_client: AsyncClient, archive_factory, printer_factory, db_session
  703. ):
  704. """Verify archive is properly linked to printer."""
  705. printer = await printer_factory(name="My Printer")
  706. archive = await archive_factory(printer.id)
  707. response = await async_client.get(f"/api/v1/archives/{archive.id}")
  708. assert response.status_code == 200
  709. result = response.json()
  710. assert result["printer_id"] == printer.id
  711. @pytest.mark.asyncio
  712. @pytest.mark.integration
  713. async def test_archive_stores_print_data(
  714. self, async_client: AsyncClient, archive_factory, printer_factory, db_session
  715. ):
  716. """Verify archive stores all print data correctly."""
  717. printer = await printer_factory()
  718. archive = await archive_factory(
  719. printer.id,
  720. print_name="Test Print",
  721. filename="test.3mf",
  722. status="completed",
  723. filament_type="PLA",
  724. filament_used_grams=75.5,
  725. print_time_seconds=5400,
  726. )
  727. response = await async_client.get(f"/api/v1/archives/{archive.id}")
  728. assert response.status_code == 200
  729. result = response.json()
  730. assert result["print_name"] == "Test Print"
  731. assert result["filename"] == "test.3mf"
  732. assert result["status"] == "completed"
  733. assert result["filament_type"] == "PLA"
  734. assert result["filament_used_grams"] == 75.5
  735. assert result["print_time_seconds"] == 5400
  736. @pytest.mark.asyncio
  737. @pytest.mark.integration
  738. async def test_archive_update_persists(
  739. self, async_client: AsyncClient, archive_factory, printer_factory, db_session
  740. ):
  741. """CRITICAL: Verify archive updates persist."""
  742. printer = await printer_factory()
  743. archive = await archive_factory(printer.id, notes="Original notes")
  744. # Update
  745. await async_client.patch(f"/api/v1/archives/{archive.id}", json={"notes": "Updated notes", "is_favorite": True})
  746. # Verify persistence
  747. response = await async_client.get(f"/api/v1/archives/{archive.id}")
  748. result = response.json()
  749. assert result["notes"] == "Updated notes"
  750. assert result["is_favorite"] is True
  751. class TestArchiveF3DEndpoints:
  752. """Tests for F3D (Fusion 360 design file) attachment endpoints."""
  753. @pytest.mark.asyncio
  754. @pytest.mark.integration
  755. async def test_archive_response_includes_f3d_path(
  756. self, async_client: AsyncClient, archive_factory, printer_factory, db_session
  757. ):
  758. """Verify f3d_path is included in archive response."""
  759. printer = await printer_factory()
  760. archive = await archive_factory(printer.id, f3d_path="archives/test/design.f3d")
  761. response = await async_client.get(f"/api/v1/archives/{archive.id}")
  762. assert response.status_code == 200
  763. result = response.json()
  764. assert "f3d_path" in result
  765. assert result["f3d_path"] == "archives/test/design.f3d"
  766. @pytest.mark.asyncio
  767. @pytest.mark.integration
  768. async def test_archive_response_f3d_path_null_when_not_set(
  769. self, async_client: AsyncClient, archive_factory, printer_factory, db_session
  770. ):
  771. """Verify f3d_path is null when no F3D file attached."""
  772. printer = await printer_factory()
  773. archive = await archive_factory(printer.id)
  774. response = await async_client.get(f"/api/v1/archives/{archive.id}")
  775. assert response.status_code == 200
  776. result = response.json()
  777. assert "f3d_path" in result
  778. assert result["f3d_path"] is None
  779. @pytest.mark.asyncio
  780. @pytest.mark.integration
  781. async def test_upload_f3d_to_nonexistent_archive(self, async_client: AsyncClient):
  782. """Verify 404 when uploading F3D to non-existent archive."""
  783. # Create a minimal file-like upload
  784. files = {"file": ("design.f3d", b"fake f3d content", "application/octet-stream")}
  785. response = await async_client.post("/api/v1/archives/9999/f3d", files=files)
  786. assert response.status_code == 404
  787. @pytest.mark.asyncio
  788. @pytest.mark.integration
  789. async def test_download_f3d_not_found_when_no_file(
  790. self, async_client: AsyncClient, archive_factory, printer_factory, db_session
  791. ):
  792. """Verify 404 when downloading F3D from archive without F3D file."""
  793. printer = await printer_factory()
  794. archive = await archive_factory(printer.id)
  795. response = await async_client.get(f"/api/v1/archives/{archive.id}/f3d")
  796. assert response.status_code == 404
  797. @pytest.mark.asyncio
  798. @pytest.mark.integration
  799. async def test_download_f3d_nonexistent_archive(self, async_client: AsyncClient):
  800. """Verify 404 when downloading F3D from non-existent archive."""
  801. response = await async_client.get("/api/v1/archives/9999/f3d")
  802. assert response.status_code == 404
  803. @pytest.mark.asyncio
  804. @pytest.mark.integration
  805. async def test_delete_f3d_nonexistent_archive(self, async_client: AsyncClient):
  806. """Verify 404 when deleting F3D from non-existent archive."""
  807. response = await async_client.delete("/api/v1/archives/9999/f3d")
  808. assert response.status_code == 404
  809. @pytest.mark.asyncio
  810. @pytest.mark.integration
  811. async def test_delete_f3d_when_no_file(
  812. self, async_client: AsyncClient, archive_factory, printer_factory, db_session
  813. ):
  814. """Verify 404 when deleting F3D from archive without F3D file."""
  815. printer = await printer_factory()
  816. archive = await archive_factory(printer.id)
  817. response = await async_client.delete(f"/api/v1/archives/{archive.id}/f3d")
  818. assert response.status_code == 404
  819. @pytest.mark.asyncio
  820. @pytest.mark.integration
  821. async def test_list_archives_includes_f3d_path(
  822. self, async_client: AsyncClient, archive_factory, printer_factory, db_session
  823. ):
  824. """Verify f3d_path is included in archive list responses."""
  825. printer = await printer_factory()
  826. await archive_factory(printer.id, print_name="With F3D", f3d_path="archives/test/design.f3d")
  827. await archive_factory(printer.id, print_name="Without F3D")
  828. response = await async_client.get("/api/v1/archives/")
  829. assert response.status_code == 200
  830. data = response.json()
  831. assert len(data) >= 2
  832. with_f3d = next((a for a in data if a["print_name"] == "With F3D"), None)
  833. without_f3d = next((a for a in data if a["print_name"] == "Without F3D"), None)
  834. assert with_f3d is not None
  835. assert with_f3d["f3d_path"] == "archives/test/design.f3d"
  836. assert without_f3d is not None
  837. assert without_f3d["f3d_path"] is None
  838. # ========================================================================
  839. # Multi-Plate 3MF endpoints (Issue #93)
  840. # ========================================================================
  841. @pytest.mark.asyncio
  842. @pytest.mark.integration
  843. async def test_get_archive_plates_not_found(self, async_client: AsyncClient):
  844. """Verify 404 when fetching plates for non-existent archive."""
  845. response = await async_client.get("/api/v1/archives/999999/plates")
  846. assert response.status_code == 404
  847. @pytest.mark.asyncio
  848. @pytest.mark.integration
  849. async def test_get_plate_thumbnail_not_found(self, async_client: AsyncClient):
  850. """Verify 404 when fetching plate thumbnail for non-existent archive."""
  851. response = await async_client.get("/api/v1/archives/999999/plate-thumbnail/1")
  852. assert response.status_code == 404
  853. @pytest.mark.asyncio
  854. @pytest.mark.integration
  855. async def test_filament_requirements_not_found(self, async_client: AsyncClient):
  856. """Verify filament-requirements returns 404 for non-existent archive."""
  857. response = await async_client.get("/api/v1/archives/999999/filament-requirements")
  858. assert response.status_code == 404
  859. @pytest.mark.asyncio
  860. @pytest.mark.integration
  861. async def test_filament_requirements_with_plate_id_not_found(self, async_client: AsyncClient):
  862. """Verify filament-requirements with plate_id returns 404 for non-existent archive."""
  863. response = await async_client.get("/api/v1/archives/999999/filament-requirements?plate_id=1")
  864. assert response.status_code == 404
  865. # ========================================================================
  866. # Tag Management endpoints (Issue #183)
  867. # ========================================================================
  868. @pytest.mark.asyncio
  869. @pytest.mark.integration
  870. async def test_get_tags_empty(self, async_client: AsyncClient):
  871. """Verify empty list when no tags exist."""
  872. response = await async_client.get("/api/v1/archives/tags")
  873. assert response.status_code == 200
  874. data = response.json()
  875. assert isinstance(data, list)
  876. assert len(data) == 0
  877. @pytest.mark.asyncio
  878. @pytest.mark.integration
  879. async def test_get_tags_with_data(self, async_client: AsyncClient, archive_factory, printer_factory, db_session):
  880. """Verify tags are returned with counts."""
  881. printer = await printer_factory()
  882. await archive_factory(printer.id, print_name="Archive 1", tags="functional, test")
  883. await archive_factory(printer.id, print_name="Archive 2", tags="functional, calibration")
  884. await archive_factory(printer.id, print_name="Archive 3", tags="test")
  885. response = await async_client.get("/api/v1/archives/tags")
  886. assert response.status_code == 200
  887. data = response.json()
  888. assert isinstance(data, list)
  889. # Convert to dict for easier lookup
  890. tags_dict = {t["name"]: t["count"] for t in data}
  891. assert tags_dict.get("functional") == 2
  892. assert tags_dict.get("test") == 2
  893. assert tags_dict.get("calibration") == 1
  894. @pytest.mark.asyncio
  895. @pytest.mark.integration
  896. async def test_get_tags_sorted_by_count(
  897. self, async_client: AsyncClient, archive_factory, printer_factory, db_session
  898. ):
  899. """Verify tags are sorted by count descending, then by name."""
  900. printer = await printer_factory()
  901. await archive_factory(printer.id, tags="alpha")
  902. await archive_factory(printer.id, tags="beta, alpha")
  903. await archive_factory(printer.id, tags="gamma, beta, alpha")
  904. response = await async_client.get("/api/v1/archives/tags")
  905. assert response.status_code == 200
  906. data = response.json()
  907. # alpha=3, beta=2, gamma=1
  908. assert data[0]["name"] == "alpha"
  909. assert data[0]["count"] == 3
  910. assert data[1]["name"] == "beta"
  911. assert data[1]["count"] == 2
  912. assert data[2]["name"] == "gamma"
  913. assert data[2]["count"] == 1
  914. @pytest.mark.asyncio
  915. @pytest.mark.integration
  916. async def test_rename_tag(self, async_client: AsyncClient, archive_factory, printer_factory, db_session):
  917. """Verify renaming a tag updates all archives."""
  918. printer = await printer_factory()
  919. a1 = await archive_factory(printer.id, print_name="Archive 1", tags="old-tag, other")
  920. a2 = await archive_factory(printer.id, print_name="Archive 2", tags="old-tag")
  921. await archive_factory(printer.id, print_name="Archive 3", tags="different")
  922. response = await async_client.put("/api/v1/archives/tags/old-tag", json={"new_name": "new-tag"})
  923. assert response.status_code == 200
  924. data = response.json()
  925. assert data["affected"] == 2
  926. # Verify the archives were updated
  927. response = await async_client.get(f"/api/v1/archives/{a1.id}")
  928. assert "new-tag" in response.json()["tags"]
  929. assert "old-tag" not in response.json()["tags"]
  930. response = await async_client.get(f"/api/v1/archives/{a2.id}")
  931. assert response.json()["tags"] == "new-tag"
  932. @pytest.mark.asyncio
  933. @pytest.mark.integration
  934. async def test_rename_tag_no_change(self, async_client: AsyncClient):
  935. """Verify renaming to same name returns 0 affected."""
  936. response = await async_client.put("/api/v1/archives/tags/some-tag", json={"new_name": "some-tag"})
  937. assert response.status_code == 200
  938. assert response.json()["affected"] == 0
  939. @pytest.mark.asyncio
  940. @pytest.mark.integration
  941. async def test_rename_tag_empty_name_error(self, async_client: AsyncClient):
  942. """Verify renaming to empty name returns error."""
  943. response = await async_client.put("/api/v1/archives/tags/some-tag", json={"new_name": ""})
  944. assert response.status_code == 400
  945. @pytest.mark.asyncio
  946. @pytest.mark.integration
  947. async def test_delete_tag(self, async_client: AsyncClient, archive_factory, printer_factory, db_session):
  948. """Verify deleting a tag removes it from all archives."""
  949. printer = await printer_factory()
  950. a1 = await archive_factory(printer.id, print_name="Archive 1", tags="delete-me, keep")
  951. a2 = await archive_factory(printer.id, print_name="Archive 2", tags="delete-me")
  952. await archive_factory(printer.id, print_name="Archive 3", tags="different")
  953. response = await async_client.delete("/api/v1/archives/tags/delete-me")
  954. assert response.status_code == 200
  955. data = response.json()
  956. assert data["affected"] == 2
  957. # Verify the archives were updated
  958. response = await async_client.get(f"/api/v1/archives/{a1.id}")
  959. assert response.json()["tags"] == "keep"
  960. response = await async_client.get(f"/api/v1/archives/{a2.id}")
  961. # Should be None or empty when last tag is removed
  962. assert response.json()["tags"] is None or response.json()["tags"] == ""
  963. @pytest.mark.asyncio
  964. @pytest.mark.integration
  965. async def test_delete_tag_not_found(self, async_client: AsyncClient):
  966. """Verify deleting non-existent tag returns 0 affected."""
  967. response = await async_client.delete("/api/v1/archives/tags/nonexistent-tag")
  968. assert response.status_code == 200
  969. assert response.json()["affected"] == 0
  970. class TestUploadSourceThreeMF:
  971. """Regression for #1531: source-3MF upload on fallback archives."""
  972. @staticmethod
  973. def _minimal_3mf_bytes() -> bytes:
  974. """Smallest valid .3mf — the upload path enforces a zip header check."""
  975. import io
  976. import zipfile
  977. buf = io.BytesIO()
  978. with zipfile.ZipFile(buf, "w") as zf:
  979. zf.writestr("[Content_Types].xml", "<types/>")
  980. return buf.getvalue()
  981. @pytest.mark.asyncio
  982. @pytest.mark.integration
  983. async def test_fallback_archive_source_upload_lands_under_base_dir(
  984. self, async_client: AsyncClient, archive_factory, printer_factory, monkeypatch, tmp_path
  985. ):
  986. """Fallback archive (file_path='') must accept a source upload and store it inside base_dir.
  987. Pre-fix, ``Path(base_dir) / ''`` collapsed to ``base_dir`` and the
  988. ``.parent`` walked out of the data volume, sending the file to
  989. ``/app/source/...`` and crashing on ``relative_to``.
  990. """
  991. from backend.app.core.config import settings as app_settings
  992. monkeypatch.setattr(app_settings, "base_dir", tmp_path)
  993. printer = await printer_factory()
  994. archive = await archive_factory(
  995. printer.id,
  996. print_name="Cloud Print",
  997. file_path="", # fallback archive — no source 3MF was archived
  998. filename="Cloud Print.3mf",
  999. )
  1000. files = {"file": ("cloud_print.3mf", self._minimal_3mf_bytes(), "application/octet-stream")}
  1001. response = await async_client.post(f"/api/v1/archives/{archive.id}/source", files=files)
  1002. assert response.status_code == 200, response.text
  1003. payload = response.json()
  1004. rel = payload["source_3mf_path"]
  1005. # Stored as a relative path inside base_dir.
  1006. assert not rel.startswith("/"), f"source_3mf_path should be relative, got {rel!r}"
  1007. # File physically landed under base_dir (NOT escaped to /app/source/).
  1008. assert (tmp_path / rel).is_file()
  1009. # Deterministic fallback location keyed off archive id.
  1010. assert rel == f"archive/no_source/{archive.id}/cloud_print.3mf"
  1011. @pytest.mark.asyncio
  1012. @pytest.mark.integration
  1013. async def test_normal_archive_source_upload_unchanged(
  1014. self, async_client: AsyncClient, archive_factory, printer_factory, monkeypatch, tmp_path
  1015. ):
  1016. """Normal archive (file_path set) still nests the source under <archive>/source/."""
  1017. from backend.app.core.config import settings as app_settings
  1018. monkeypatch.setattr(app_settings, "base_dir", tmp_path)
  1019. printer = await printer_factory()
  1020. # archive_factory's default file_path is "archives/test/test_print.gcode.3mf".
  1021. archive = await archive_factory(printer.id, print_name="Real Print")
  1022. files = {"file": ("real_print.3mf", self._minimal_3mf_bytes(), "application/octet-stream")}
  1023. response = await async_client.post(f"/api/v1/archives/{archive.id}/source", files=files)
  1024. assert response.status_code == 200, response.text
  1025. rel = response.json()["source_3mf_path"]
  1026. assert rel == "archives/test/source/real_print.3mf"
  1027. assert (tmp_path / rel).is_file()
  1028. @pytest.mark.asyncio
  1029. @pytest.mark.integration
  1030. async def test_symlinked_data_dir_upload_succeeds(
  1031. self, async_client: AsyncClient, archive_factory, printer_factory, monkeypatch, tmp_path
  1032. ):
  1033. """Regression: DATA_DIR that's a symlink to the real storage must not break the upload.
  1034. Common on TrueNAS / Synology / QNAP storage pools, and any
  1035. ``-v /symlinked/host/path:/app/data`` mount. The helper resolves
  1036. only for the containment check and returns literal paths so the
  1037. caller's ``relative_to(settings.base_dir)`` doesn't trip over a
  1038. canonical-vs-symlink mismatch.
  1039. """
  1040. from backend.app.core.config import settings as app_settings
  1041. real_dir = tmp_path / "real_storage"
  1042. real_dir.mkdir()
  1043. symlink_dir = tmp_path / "data_via_symlink"
  1044. symlink_dir.symlink_to(real_dir)
  1045. monkeypatch.setattr(app_settings, "base_dir", symlink_dir)
  1046. printer = await printer_factory()
  1047. archive = await archive_factory(
  1048. printer.id,
  1049. print_name="Symlinked Print",
  1050. file_path="archives/X1C/print.gcode.3mf",
  1051. filename="print.gcode.3mf",
  1052. )
  1053. files = {"file": ("print.3mf", self._minimal_3mf_bytes(), "application/octet-stream")}
  1054. response = await async_client.post(f"/api/v1/archives/{archive.id}/source", files=files)
  1055. assert response.status_code == 200, response.text
  1056. rel = response.json()["source_3mf_path"]
  1057. assert rel == "archives/X1C/source/print.3mf"
  1058. # Reachable via both the symlink and the canonical path.
  1059. assert (symlink_dir / rel).is_file()
  1060. assert (real_dir / rel).is_file()
  1061. @pytest.mark.asyncio
  1062. @pytest.mark.integration
  1063. async def test_absolute_file_path_rejected_with_clear_500(
  1064. self, async_client: AsyncClient, archive_factory, printer_factory, monkeypatch, tmp_path
  1065. ):
  1066. """A row whose file_path is absolute (corrupted by old import / manual edit)
  1067. must fail with the explicit "outside the data directory" message, not silently
  1068. write outside base_dir."""
  1069. from backend.app.core.config import settings as app_settings
  1070. monkeypatch.setattr(app_settings, "base_dir", tmp_path)
  1071. printer = await printer_factory()
  1072. archive = await archive_factory(
  1073. printer.id,
  1074. print_name="Corrupt Path",
  1075. file_path="/tmp/totally_outside.gcode.3mf",
  1076. filename="totally_outside.gcode.3mf",
  1077. )
  1078. files = {"file": ("totally_outside.3mf", self._minimal_3mf_bytes(), "application/octet-stream")}
  1079. response = await async_client.post(f"/api/v1/archives/{archive.id}/source", files=files)
  1080. assert response.status_code == 500
  1081. assert "outside the data directory" in response.json()["detail"]
  1082. # Did not write anything under the bogus /tmp/source/ either.
  1083. assert not (Path("/tmp") / "source").exists() or not (Path("/tmp") / "source" / "totally_outside.3mf").exists()