"""Integration tests for Projects API endpoints."""

import pytest
from httpx import AsyncClient


  4. class TestProjectsAPI:
  5. """Integration tests for /api/v1/projects endpoints."""
  6. @pytest.fixture
  7. async def project_factory(self, db_session):
  8. """Factory to create test projects."""
  9. _counter = [0]
  10. async def _create_project(**kwargs):
  11. from backend.app.models.project import Project
  12. _counter[0] += 1
  13. counter = _counter[0]
  14. defaults = {
  15. "name": f"Test Project {counter}",
  16. "description": "Test project description",
  17. "color": "#FF0000",
  18. }
  19. defaults.update(kwargs)
  20. project = Project(**defaults)
  21. db_session.add(project)
  22. await db_session.commit()
  23. await db_session.refresh(project)
  24. return project
  25. return _create_project
  26. @pytest.mark.asyncio
  27. @pytest.mark.integration
  28. async def test_list_projects_empty(self, async_client: AsyncClient):
  29. """Verify empty list when no projects exist."""
  30. response = await async_client.get("/api/v1/projects/")
  31. assert response.status_code == 200
  32. assert isinstance(response.json(), list)
  33. @pytest.mark.asyncio
  34. @pytest.mark.integration
  35. async def test_list_projects_with_data(self, async_client: AsyncClient, project_factory, db_session):
  36. """Verify list returns existing projects."""
  37. await project_factory(name="My Project")
  38. response = await async_client.get("/api/v1/projects/")
  39. assert response.status_code == 200
  40. data = response.json()
  41. assert any(p["name"] == "My Project" for p in data)
  42. @pytest.mark.asyncio
  43. @pytest.mark.integration
  44. async def test_create_project(self, async_client: AsyncClient):
  45. """Verify project can be created."""
  46. data = {
  47. "name": "New Project",
  48. "description": "A new project",
  49. "color": "#00FF00",
  50. }
  51. response = await async_client.post("/api/v1/projects/", json=data)
  52. assert response.status_code == 200
  53. result = response.json()
  54. assert result["name"] == "New Project"
  55. assert result["color"] == "#00FF00"
  56. @pytest.mark.asyncio
  57. @pytest.mark.integration
  58. async def test_get_project(self, async_client: AsyncClient, project_factory, db_session):
  59. """Verify single project can be retrieved."""
  60. project = await project_factory(name="Get Test Project")
  61. response = await async_client.get(f"/api/v1/projects/{project.id}")
  62. assert response.status_code == 200
  63. assert response.json()["name"] == "Get Test Project"
  64. @pytest.mark.asyncio
  65. @pytest.mark.integration
  66. async def test_get_project_not_found(self, async_client: AsyncClient):
  67. """Verify 404 for non-existent project."""
  68. response = await async_client.get("/api/v1/projects/9999")
  69. assert response.status_code == 404
  70. @pytest.mark.asyncio
  71. @pytest.mark.integration
  72. async def test_update_project(self, async_client: AsyncClient, project_factory, db_session):
  73. """Verify project can be updated."""
  74. project = await project_factory(name="Original")
  75. response = await async_client.patch(
  76. f"/api/v1/projects/{project.id}", json={"name": "Updated", "description": "Updated description"}
  77. )
  78. assert response.status_code == 200
  79. result = response.json()
  80. assert result["name"] == "Updated"
  81. assert result["description"] == "Updated description"
  82. @pytest.mark.asyncio
  83. @pytest.mark.integration
  84. async def test_delete_project(self, async_client: AsyncClient, project_factory, db_session):
  85. """Verify project can be deleted."""
  86. project = await project_factory()
  87. response = await async_client.delete(f"/api/v1/projects/{project.id}")
  88. assert response.status_code == 200
  89. data = response.json()
  90. assert data["message"] == "Project deleted"
  91. @pytest.mark.asyncio
  92. @pytest.mark.integration
  93. async def test_delete_project_not_found(self, async_client: AsyncClient):
  94. """Verify 404 for deleting non-existent project."""
  95. response = await async_client.delete("/api/v1/projects/9999")
  96. assert response.status_code == 404
  97. class TestProjectUrlAndCoverImage:
  98. """Tests for #1155 — url field + cover image upload/get/delete."""
  99. @pytest.fixture
  100. async def project_factory(self, db_session):
  101. async def _create(**kwargs):
  102. from backend.app.models.project import Project
  103. defaults = {"name": "URL/Cover Project", "color": "#00ff00"}
  104. defaults.update(kwargs)
  105. project = Project(**defaults)
  106. db_session.add(project)
  107. await db_session.commit()
  108. await db_session.refresh(project)
  109. return project
  110. return _create
  111. @pytest.mark.asyncio
  112. @pytest.mark.integration
  113. async def test_create_project_accepts_https_url(self, async_client: AsyncClient):
  114. response = await async_client.post(
  115. "/api/v1/projects/",
  116. json={"name": "With URL", "url": "https://makerworld.com/models/12345"},
  117. )
  118. assert response.status_code == 200
  119. body = response.json()
  120. assert body["url"] == "https://makerworld.com/models/12345"
  121. @pytest.mark.asyncio
  122. @pytest.mark.integration
  123. async def test_create_project_rejects_javascript_url(self, async_client: AsyncClient):
  124. # `<a href>` rendering would execute javascript: URLs — schema must reject.
  125. response = await async_client.post(
  126. "/api/v1/projects/",
  127. json={"name": "Hostile", "url": "javascript:alert(1)"},
  128. )
  129. assert response.status_code == 422
  130. @pytest.mark.asyncio
  131. @pytest.mark.integration
  132. async def test_create_project_rejects_data_url(self, async_client: AsyncClient):
  133. response = await async_client.post(
  134. "/api/v1/projects/",
  135. json={"name": "Hostile", "url": "data:text/html,<script>alert(1)</script>"},
  136. )
  137. assert response.status_code == 422
  138. @pytest.mark.asyncio
  139. @pytest.mark.integration
  140. async def test_patch_project_clears_url_when_explicitly_null(self, async_client: AsyncClient, project_factory):
  141. project = await project_factory(url="https://example.com")
  142. response = await async_client.patch(f"/api/v1/projects/{project.id}", json={"url": None})
  143. assert response.status_code == 200
  144. assert response.json()["url"] is None
  145. @pytest.mark.asyncio
  146. @pytest.mark.integration
  147. async def test_upload_cover_image_then_serve_then_delete(self, async_client: AsyncClient, project_factory):
  148. project = await project_factory()
  149. # 1x1 PNG (smallest valid PNG bytes)
  150. png_bytes = bytes.fromhex(
  151. "89504e470d0a1a0a0000000d49484452000000010000000108060000001f15c4"
  152. "890000000d49444154789c63f80f00000100010000000000000049454e44ae42"
  153. "6082"
  154. )
  155. upload = await async_client.post(
  156. f"/api/v1/projects/{project.id}/cover-image",
  157. files={"file": ("cover.png", png_bytes, "image/png")},
  158. )
  159. assert upload.status_code == 200, upload.text
  160. body = upload.json()
  161. assert body["status"] == "success"
  162. assert body["filename"].endswith(".png")
  163. cover_filename = body["filename"]
  164. # GET should serve the bytes back
  165. served = await async_client.get(f"/api/v1/projects/{project.id}/cover-image")
  166. assert served.status_code == 200
  167. assert served.headers["content-type"] == "image/png"
  168. assert served.content == png_bytes
  169. # Project response should reflect the cover_image_filename field
  170. view = await async_client.get(f"/api/v1/projects/{project.id}")
  171. assert view.json()["cover_image_filename"] == cover_filename
  172. # DELETE should clear the field
  173. deleted = await async_client.delete(f"/api/v1/projects/{project.id}/cover-image")
  174. assert deleted.status_code == 200
  175. view2 = await async_client.get(f"/api/v1/projects/{project.id}")
  176. assert view2.json()["cover_image_filename"] is None
  177. # And subsequent GET should 404
  178. served2 = await async_client.get(f"/api/v1/projects/{project.id}/cover-image")
  179. assert served2.status_code == 404
  180. @pytest.mark.asyncio
  181. @pytest.mark.integration
  182. async def test_upload_cover_image_rejects_non_image(self, async_client: AsyncClient, project_factory):
  183. project = await project_factory()
  184. response = await async_client.post(
  185. f"/api/v1/projects/{project.id}/cover-image",
  186. files={"file": ("evil.exe", b"MZ\x00\x00", "application/octet-stream")},
  187. )
  188. assert response.status_code == 400
  189. @pytest.mark.integration
  190. def test_cover_image_get_uses_stream_token_gate(self):
  191. """Regression guard: GET /projects/{id}/cover-image MUST be gated by
  192. ``RequireCameraStreamTokenIfAuthEnabled`` (accepts ``?token=…`` query
  193. string) rather than by the bearer-token gate, because browsers can't
  194. attach an ``Authorization`` header to ``<img src>`` requests. Swapping
  195. back to the bearer gate would silently 401 every cover image when auth
  196. is enabled."""
  197. from fastapi.routing import APIRoute
  198. from backend.app.api.routes.projects import router
  199. # Find the GET cover-image route. The router exposes path/methods/
  200. # dependencies via APIRoute objects.
  201. cover_get = None
  202. for route in router.routes:
  203. if isinstance(route, APIRoute) and route.path.endswith("/cover-image") and "GET" in route.methods:
  204. cover_get = route
  205. break
  206. assert cover_get is not None, "GET cover-image route missing"
  207. # The route's dependant tree includes a Depends(require_camera_stream_token_if_auth_enabled())
  208. # — its `call` is the inner check function returned by that factory.
  209. # Walk the dependant tree and assert one of the dependencies came from
  210. # the stream-token factory, NOT from require_permission_if_auth_enabled.
  211. from backend.app.core.auth import (
  212. require_camera_stream_token_if_auth_enabled,
  213. )
  214. # The factory returns a fresh closure each call; the most reliable
  215. # signature is the qualified name of the function in the closure chain.
  216. expected_qualname = require_camera_stream_token_if_auth_enabled().__qualname__
  217. gate_qualnames = [dep.call.__qualname__ for dep in cover_get.dependant.dependencies if dep.call]
  218. assert expected_qualname in gate_qualnames, (
  219. f"GET cover-image route is not gated by RequireCameraStreamTokenIfAuthEnabled. Found: {gate_qualnames}"
  220. )
  221. class TestProjectPartsTracking:
  222. """Tests for project parts tracking feature."""
  223. @pytest.fixture
  224. async def project_factory(self, db_session):
  225. """Factory to create test projects."""
  226. async def _create_project(**kwargs):
  227. from backend.app.models.project import Project
  228. defaults = {
  229. "name": "Parts Test Project",
  230. "description": "Test project",
  231. "color": "#FF0000",
  232. }
  233. defaults.update(kwargs)
  234. project = Project(**defaults)
  235. db_session.add(project)
  236. await db_session.commit()
  237. await db_session.refresh(project)
  238. return project
  239. return _create_project
  240. @pytest.fixture
  241. async def archive_factory(self, db_session):
  242. """Factory to create a test archive plus a matching PrintLogEntry.
  243. Project stats aggregate from ``print_log_entries`` (#1593), so a
  244. test that only writes archives wouldn't exercise the production
  245. path — production always writes one log entry per run. The
  246. factory mirrors that: every archive whose status is anything other
  247. than ``"archived"`` (file shelved without printing) gets a log
  248. entry whose status matches the archive.
  249. """
  250. async def _create_archive(**kwargs):
  251. from backend.app.models.archive import PrintArchive
  252. from backend.app.models.print_log import PrintLogEntry
  253. defaults = {
  254. "filename": "test.3mf",
  255. "file_path": "test/test.3mf",
  256. "file_size": 1000,
  257. "print_name": "Test Print",
  258. "status": "completed",
  259. "quantity": 1,
  260. }
  261. defaults.update(kwargs)
  262. archive = PrintArchive(**defaults)
  263. db_session.add(archive)
  264. await db_session.commit()
  265. await db_session.refresh(archive)
  266. if archive.status != "archived":
  267. db_session.add(
  268. PrintLogEntry(
  269. archive_id=archive.id,
  270. print_name=archive.print_name,
  271. status=archive.status,
  272. )
  273. )
  274. await db_session.commit()
  275. return archive
  276. return _create_archive
  277. @pytest.mark.asyncio
  278. @pytest.mark.integration
  279. async def test_create_project_with_target_parts_count(self, async_client: AsyncClient):
  280. """Verify project can be created with target_parts_count."""
  281. data = {
  282. "name": "Parts Project",
  283. "target_count": 10, # 10 plates
  284. "target_parts_count": 50, # 50 parts total
  285. }
  286. response = await async_client.post("/api/v1/projects/", json=data)
  287. assert response.status_code == 200
  288. result = response.json()
  289. assert result["target_count"] == 10
  290. assert result["target_parts_count"] == 50
  291. @pytest.mark.asyncio
  292. @pytest.mark.integration
  293. async def test_update_project_target_parts_count(self, async_client: AsyncClient, project_factory, db_session):
  294. """Verify target_parts_count can be updated."""
  295. project = await project_factory()
  296. response = await async_client.patch(
  297. f"/api/v1/projects/{project.id}",
  298. json={"target_parts_count": 100},
  299. )
  300. assert response.status_code == 200
  301. assert response.json()["target_parts_count"] == 100
  302. @pytest.mark.asyncio
  303. @pytest.mark.integration
  304. async def test_project_parts_progress_calculation(
  305. self, async_client: AsyncClient, project_factory, archive_factory, db_session
  306. ):
  307. """Verify parts progress is calculated from archive quantities."""
  308. # Create project with target of 20 parts
  309. project = await project_factory(target_parts_count=20)
  310. # Create archives with different quantities
  311. await archive_factory(project_id=project.id, quantity=3, status="completed") # 3 parts
  312. await archive_factory(project_id=project.id, quantity=5, status="completed") # 5 parts
  313. await archive_factory(project_id=project.id, quantity=2, status="completed") # 2 parts
  314. # Total: 10 parts completed out of 20 = 50%
  315. response = await async_client.get(f"/api/v1/projects/{project.id}")
  316. assert response.status_code == 200
  317. data = response.json()
  318. # Check stats
  319. assert data["stats"]["completed_prints"] == 10 # Sum of quantities
  320. assert data["stats"]["parts_progress_percent"] == 50.0 # 10/20 = 50%
  321. assert data["stats"]["remaining_parts"] == 10 # 20 - 10 = 10
  322. @pytest.mark.asyncio
  323. @pytest.mark.integration
  324. async def test_project_list_shows_parts_count(
  325. self, async_client: AsyncClient, project_factory, archive_factory, db_session
  326. ):
  327. """Verify project list returns correct completed_count (parts sum)."""
  328. project = await project_factory(name="List Parts Project", target_parts_count=100)
  329. # Create archives with quantities
  330. await archive_factory(project_id=project.id, quantity=4, status="completed")
  331. await archive_factory(project_id=project.id, quantity=6, status="completed")
  332. # Total: 10 parts, 2 plates
  333. response = await async_client.get("/api/v1/projects/")
  334. assert response.status_code == 200
  335. data = response.json()
  336. # Find our project
  337. our_project = next((p for p in data if p["name"] == "List Parts Project"), None)
  338. assert our_project is not None
  339. assert our_project["archive_count"] == 2 # 2 plates
  340. assert our_project["completed_count"] == 10 # 10 parts (sum of quantities)
  341. assert our_project["target_parts_count"] == 100
  342. @pytest.mark.asyncio
  343. @pytest.mark.integration
  344. async def test_plates_vs_parts_progress(
  345. self, async_client: AsyncClient, project_factory, archive_factory, db_session
  346. ):
  347. """Verify plates and parts progress are calculated separately."""
  348. # Project needs 5 plates producing 25 parts total (5 parts per plate)
  349. project = await project_factory(target_count=5, target_parts_count=25)
  350. # Complete 2 plates, each with 5 parts
  351. await archive_factory(project_id=project.id, quantity=5, status="completed")
  352. await archive_factory(project_id=project.id, quantity=5, status="completed")
  353. # Plates: 2/5 = 40%, Parts: 10/25 = 40%
  354. response = await async_client.get(f"/api/v1/projects/{project.id}")
  355. assert response.status_code == 200
  356. data = response.json()
  357. assert data["stats"]["total_archives"] == 2 # 2 plates
  358. assert data["stats"]["completed_prints"] == 10 # 10 parts
  359. assert data["stats"]["progress_percent"] == 40.0 # plates: 2/5
  360. assert data["stats"]["parts_progress_percent"] == 40.0 # parts: 10/25
  361. class TestProjectArchivedStatusNotCounted:
  362. """Tests for bug #630: archived files added to a project should not count as printed."""
  363. @pytest.fixture
  364. async def project_factory(self, db_session):
  365. """Factory to create test projects."""
  366. async def _create_project(**kwargs):
  367. from backend.app.models.project import Project
  368. defaults = {
  369. "name": "Archived Status Test",
  370. "description": "Test project",
  371. "color": "#FF0000",
  372. }
  373. defaults.update(kwargs)
  374. project = Project(**defaults)
  375. db_session.add(project)
  376. await db_session.commit()
  377. await db_session.refresh(project)
  378. return project
  379. return _create_project
  380. @pytest.fixture
  381. async def archive_factory(self, db_session):
  382. """Factory to create a test archive plus a matching PrintLogEntry —
  383. see TestProjectPartsTracking.archive_factory for rationale (#1593)."""
  384. async def _create_archive(**kwargs):
  385. from backend.app.models.archive import PrintArchive
  386. from backend.app.models.print_log import PrintLogEntry
  387. defaults = {
  388. "filename": "test.3mf",
  389. "file_path": "test/test.3mf",
  390. "file_size": 1000,
  391. "print_name": "Test Print",
  392. "status": "completed",
  393. "quantity": 1,
  394. }
  395. defaults.update(kwargs)
  396. archive = PrintArchive(**defaults)
  397. db_session.add(archive)
  398. await db_session.commit()
  399. await db_session.refresh(archive)
  400. if archive.status != "archived":
  401. db_session.add(
  402. PrintLogEntry(
  403. archive_id=archive.id,
  404. print_name=archive.print_name,
  405. status=archive.status,
  406. )
  407. )
  408. await db_session.commit()
  409. return archive
  410. return _create_archive
  411. @pytest.mark.asyncio
  412. @pytest.mark.integration
  413. async def test_archived_files_not_counted_as_completed(
  414. self, async_client: AsyncClient, project_factory, archive_factory, db_session
  415. ):
  416. """Archived files added to a project should not count in completed_prints stats."""
  417. project = await project_factory(target_parts_count=20)
  418. # 2 actually printed (completed), 3 just archived (not printed yet)
  419. await archive_factory(project_id=project.id, quantity=2, status="completed")
  420. await archive_factory(project_id=project.id, quantity=3, status="archived")
  421. await archive_factory(project_id=project.id, quantity=5, status="archived")
  422. response = await async_client.get(f"/api/v1/projects/{project.id}")
  423. assert response.status_code == 200
  424. data = response.json()
  425. # Only the completed archive should count
  426. assert data["stats"]["completed_prints"] == 2
  427. assert data["stats"]["parts_progress_percent"] == 10.0 # 2/20 = 10%
  428. assert data["stats"]["remaining_parts"] == 18
  429. @pytest.mark.asyncio
  430. @pytest.mark.integration
  431. async def test_archived_files_not_counted_in_project_list(
  432. self, async_client: AsyncClient, project_factory, archive_factory, db_session
  433. ):
  434. """Project list endpoint should not count archived files as completed."""
  435. project = await project_factory(name="List Archived Test", target_parts_count=50)
  436. await archive_factory(project_id=project.id, quantity=4, status="completed")
  437. await archive_factory(project_id=project.id, quantity=6, status="archived")
  438. response = await async_client.get("/api/v1/projects/")
  439. assert response.status_code == 200
  440. data = response.json()
  441. our_project = next((p for p in data if p["name"] == "List Archived Test"), None)
  442. assert our_project is not None
  443. assert our_project["completed_count"] == 4 # Only completed, not archived
  444. # Post-#1593: archive_count is "print runs", not "files attached". An
  445. # ``archived``-status file (shelved without printing) has no
  446. # PrintLogEntry and doesn't count — only the actual printed run does.
  447. assert our_project["archive_count"] == 1
  448. @pytest.mark.asyncio
  449. @pytest.mark.integration
  450. async def test_only_completed_status_counts(
  451. self, async_client: AsyncClient, project_factory, archive_factory, db_session
  452. ):
  453. """Only 'completed' status should count in stats, not archived/failed/etc."""
  454. project = await project_factory(target_parts_count=100)
  455. await archive_factory(project_id=project.id, quantity=10, status="completed")
  456. await archive_factory(project_id=project.id, quantity=5, status="archived")
  457. await archive_factory(project_id=project.id, quantity=3, status="failed")
  458. await archive_factory(project_id=project.id, quantity=2, status="aborted")
  459. response = await async_client.get(f"/api/v1/projects/{project.id}")
  460. assert response.status_code == 200
  461. data = response.json()
  462. assert data["stats"]["completed_prints"] == 10 # Only "completed"
  463. assert data["stats"]["failed_prints"] == 2 # failed + aborted (count of runs)
  464. # Post-#1593: total_archives counts runs from print_log_entries, not
  465. # files. The ``archived`` row is a shelved file with no run, so it
  466. # contributes 0; the other three (completed, failed, aborted) each
  467. # produced a run.
  468. assert data["stats"]["total_archives"] == 3
  469. # total_items sums quantity per run: 10 (completed) + 3 (failed) + 2 (aborted) = 15
  470. assert data["stats"]["total_items"] == 15
  471. class TestProjectStatsPerRun:
  472. """Project stats aggregate per-run from ``print_log_entries`` so
  473. reprints and multi-plate prints count every run (#1593). Pre-fix the
  474. stats counted ``print_archives`` (one row per file), so 3 reprints of
  475. one file showed as 1 job with plate-1-only filament/time/cost.
  476. """
  477. @pytest.fixture
  478. async def project_factory(self, db_session):
  479. async def _create_project(**kwargs):
  480. from backend.app.models.project import Project
  481. defaults = {"name": "Per-Run Stats Project", "color": "#FF0000"}
  482. defaults.update(kwargs)
  483. project = Project(**defaults)
  484. db_session.add(project)
  485. await db_session.commit()
  486. await db_session.refresh(project)
  487. return project
  488. return _create_project
  489. @pytest.fixture
  490. async def archive_with_runs(self, db_session):
  491. """Build a single archive + N PrintLogEntry rows.
  492. Models the reporter's case: one source file (archive) is reprinted
  493. N times, each run with its own duration / filament / cost.
  494. """
  495. async def _create(*, project_id: int, runs: list[dict], archive_status: str = "completed", quantity: int = 1):
  496. from backend.app.models.archive import PrintArchive
  497. from backend.app.models.print_log import PrintLogEntry
  498. archive = PrintArchive(
  499. filename="reprinted.3mf",
  500. file_path="test/reprinted.3mf",
  501. file_size=1000,
  502. print_name="Reprinted Print",
  503. status=archive_status,
  504. quantity=quantity,
  505. project_id=project_id,
  506. )
  507. db_session.add(archive)
  508. await db_session.commit()
  509. await db_session.refresh(archive)
  510. for run in runs:
  511. db_session.add(
  512. PrintLogEntry(
  513. archive_id=archive.id,
  514. print_name=archive.print_name,
  515. status=run.get("status", "completed"),
  516. duration_seconds=run.get("duration_seconds"),
  517. filament_used_grams=run.get("filament_used_grams"),
  518. cost=run.get("cost"),
  519. energy_kwh=run.get("energy_kwh"),
  520. energy_cost=run.get("energy_cost"),
  521. )
  522. )
  523. await db_session.commit()
  524. return archive
  525. return _create
  526. @pytest.mark.asyncio
  527. @pytest.mark.integration
  528. async def test_three_reprints_count_as_three_jobs_with_summed_totals(
  529. self, async_client: AsyncClient, project_factory, archive_with_runs
  530. ):
  531. """Reporter's case: 3 runs of one multi-plate file should report
  532. 3 jobs and summed time / filament / cost — pre-fix it reported 1
  533. job with plate-1-only totals."""
  534. project = await project_factory()
  535. await archive_with_runs(
  536. project_id=project.id,
  537. runs=[
  538. {"duration_seconds": 7140, "filament_used_grams": 19.2, "cost": 0.40},
  539. {"duration_seconds": 6000, "filament_used_grams": 20.0, "cost": 0.40},
  540. {"duration_seconds": 6300, "filament_used_grams": 18.8, "cost": 0.40},
  541. ],
  542. )
  543. response = await async_client.get(f"/api/v1/projects/{project.id}")
  544. assert response.status_code == 200
  545. stats = response.json()["stats"]
  546. assert stats["total_archives"] == 3, "3 runs must show as 3 jobs"
  547. assert stats["completed_prints"] == 3, "Each run with quantity=1 contributes 1 part"
  548. assert stats["total_filament_grams"] == round(19.2 + 20.0 + 18.8, 2)
  549. assert stats["total_print_time_hours"] == round((7140 + 6000 + 6300) / 3600, 2)
  550. # Cost rounds at 2 decimals — 3 * 0.40 = 1.20
  551. assert stats["estimated_cost"] == 1.20
  552. @pytest.mark.asyncio
  553. @pytest.mark.integration
  554. async def test_orphan_log_entries_do_not_bleed_into_projects(
  555. self, async_client: AsyncClient, project_factory, db_session
  556. ):
  557. """Log rows whose ``archive_id`` is NULL (archive deleted via
  558. ON DELETE SET NULL) must not leak into any project — the inner
  559. join filters them out by construction."""
  560. from backend.app.models.print_log import PrintLogEntry
  561. project = await project_factory()
  562. # Orphan log entries — no archive_id.
  563. for _ in range(5):
  564. db_session.add(
  565. PrintLogEntry(
  566. archive_id=None,
  567. print_name="Orphan Run",
  568. status="completed",
  569. duration_seconds=3600,
  570. filament_used_grams=20.0,
  571. cost=0.5,
  572. )
  573. )
  574. await db_session.commit()
  575. response = await async_client.get(f"/api/v1/projects/{project.id}")
  576. assert response.status_code == 200
  577. stats = response.json()["stats"]
  578. # None of the orphan rows are attributable to this project.
  579. assert stats["total_archives"] == 0
  580. assert stats["completed_prints"] == 0
  581. assert stats["total_filament_grams"] == 0
  582. assert stats["total_print_time_hours"] == 0
  583. assert stats["estimated_cost"] == 0
  @pytest.mark.asyncio
  @pytest.mark.integration
  async def test_mixed_run_outcomes_split_completed_and_failed(
      self, async_client: AsyncClient, project_factory, archive_with_runs
  ):
      """A multi-run archive with mixed outcomes splits cleanly between
      ``completed_prints`` (per-quantity) and ``failed_prints`` (per-run).

      One archive with ``quantity=2`` is seeded with two completed runs, one
      failed run and one aborted run; the project's ``stats`` payload is then
      checked for run count, part count, failure count and filament total.
      """
      project = await project_factory()
      await archive_with_runs(
          project_id=project.id,
          quantity=2,
          runs=[
              {"status": "completed", "filament_used_grams": 30.0},
              {"status": "completed", "filament_used_grams": 30.0},
              {"status": "failed", "filament_used_grams": 5.0},
              {"status": "aborted", "filament_used_grams": 2.0},
          ],
      )

      response = await async_client.get(f"/api/v1/projects/{project.id}")
      # Surface the HTTP failure itself instead of a KeyError on "stats".
      assert response.status_code == 200, response.text
      stats = response.json()["stats"]

      assert stats["total_archives"] == 4
      # 2 completed runs × quantity=2 each = 4 parts
      assert stats["completed_prints"] == 4
      # 2 failure runs (failed + aborted) count as 2, not 2*quantity
      assert stats["failed_prints"] == 2
      # All 4 runs contribute filament: 30 + 30 + 5 + 2 = 67
      assert stats["total_filament_grams"] == 67.0
  @pytest.mark.asyncio
  @pytest.mark.integration
  async def test_quick_stats_in_list_view_agree_with_per_project_stats(
      self, async_client: AsyncClient, project_factory, archive_with_runs
  ):
      """The /projects list view's quick stats must agree with
      /projects/{id}'s detailed stats.

      Seeds one archive (``quantity=1``) with two completed runs and one
      failed run, checks the absolute quick-stat values in the list view,
      then fetches the detail view and checks the two payloads line up.
      """
      project = await project_factory(name="Quick-Stats Alignment")
      await archive_with_runs(
          project_id=project.id,
          quantity=1,
          runs=[
              {"status": "completed"},
              {"status": "completed"},
              {"status": "failed"},
          ],
      )

      list_resp = await async_client.get("/api/v1/projects/")
      assert list_resp.status_code == 200, list_resp.text
      # A default of None turns "project missing" into a readable assertion
      # failure rather than a bare StopIteration.
      ours = next(
          (p for p in list_resp.json() if p["name"] == "Quick-Stats Alignment"),
          None,
      )
      assert ours is not None, "Seeded project is missing from the /projects list"

      assert ours["archive_count"] == 3
      assert ours["completed_count"] == 2
      assert ours["failed_count"] == 1

      # Cross-check against the detail endpoint, which is what the test name
      # promises. quantity=1 keeps the per-part completed_prints figure equal
      # to the number of completed runs.
      detail_resp = await async_client.get(f"/api/v1/projects/{project.id}")
      assert detail_resp.status_code == 200, detail_resp.text
      stats = detail_resp.json()["stats"]
      assert ours["archive_count"] == stats["total_archives"]
      assert ours["completed_count"] == stats["completed_prints"]
      assert ours["failed_count"] == stats["failed_prints"]
  class TestProjectArchivesAPI:
      """Tests for project-archive relationships."""

      @pytest.fixture
      async def project_factory(self, db_session):
          """Return an async factory that persists a ``Project`` row.

          Keyword arguments passed to the factory override the default
          ``name``, ``description`` and ``color`` values. The project is
          committed and refreshed before it is returned.
          """

          async def _create_project(**kwargs):
              from backend.app.models.project import Project

              defaults = {
                  "name": "Archive Test Project",
                  "description": "Test project",
                  "color": "#0000FF",
              }
              defaults.update(kwargs)
              project = Project(**defaults)
              db_session.add(project)
              await db_session.commit()
              await db_session.refresh(project)
              return project

          return _create_project

      @pytest.mark.asyncio
      @pytest.mark.integration
      async def test_get_project_with_archives(self, async_client: AsyncClient, project_factory, db_session):
          """Verify project can be retrieved with archive count."""
          project = await project_factory()

          response = await async_client.get(f"/api/v1/projects/{project.id}")
          assert response.status_code == 200

          data = response.json()
          assert "name" in data
          # A freshly created project owns no archives, so the count the
          # docstring refers to must be present and zero.
          assert data["stats"]["total_archives"] == 0

      @pytest.mark.asyncio
      @pytest.mark.integration
      async def test_list_archives_in_project_returns_archives_with_creator(
          self, async_client: AsyncClient, project_factory, db_session
      ):
          """``GET /projects/{id}/archives`` must eagerly load both the project AND
          the creator User. Without selectinload(created_by) the response
          converter triggers a lazy attribute load on a closed async session
          and the request 500s with MissingGreenlet — exactly what was reported
          the moment a user with auth enabled (so archives carry created_by_id)
          opened a project view.
          """
          from backend.app.models.archive import PrintArchive
          from backend.app.models.user import User

          # Seed: a user (the eventual creator) and a project owning two archives,
          # one with created_by_id set, one without.
          creator = User(
              username="archive-creator",
              password_hash="x",
              role="user",
              is_active=True,
          )
          db_session.add(creator)
          await db_session.commit()
          await db_session.refresh(creator)

          project = await project_factory(name="Project Archives Smoke")
          attributed = PrintArchive(
              filename="attributed.3mf",
              file_path="x/attributed.3mf",
              file_size=2048,
              print_name="Attributed Print",
              status="completed",
              quantity=1,
              project_id=project.id,
              created_by_id=creator.id,
          )
          anonymous = PrintArchive(
              filename="anon.3mf",
              file_path="x/anon.3mf",
              file_size=2048,
              print_name="Anonymous Print",
              status="completed",
              quantity=1,
              project_id=project.id,
              created_by_id=None,
          )
          db_session.add_all([attributed, anonymous])
          await db_session.commit()

          response = await async_client.get(f"/api/v1/projects/{project.id}/archives?limit=100&offset=0")
          assert response.status_code == 200, f"Expected 200, got {response.status_code} body={response.text}"

          rows = response.json()
          assert len(rows) == 2

          # Both archive shapes serialise — the attributed one surfaces the
          # creator username (proving the eager-load worked) and the anonymous
          # one stays None without exploding.
          by_filename = {r["filename"]: r for r in rows}
          assert by_filename["attributed.3mf"]["created_by_username"] == "archive-creator"
          assert by_filename["attributed.3mf"]["created_by_id"] == creator.id
          assert by_filename["anon.3mf"]["created_by_username"] is None
          assert by_filename["anon.3mf"]["created_by_id"] is None
  723. class TestProjectExportImport:
  724. """Tests for project export/import functionality."""
  @pytest.fixture
  async def project_factory(self, db_session):
      """Return an async factory that persists uniquely named projects.

      Each call bumps a per-fixture counter that is embedded in the default
      name; keyword arguments override any of the default fields. The new
      ``Project`` is committed and refreshed before being handed back.
      """
      counter = 0

      async def _create_project(**kwargs):
          nonlocal counter
          from backend.app.models.project import Project

          counter += 1
          fields = {
              "name": f"Export Test Project {counter}",
              "description": "Test project for export",
              "color": "#00FF00",
              **kwargs,
          }
          created = Project(**fields)
          db_session.add(created)
          await db_session.commit()
          await db_session.refresh(created)
          return created

      return _create_project
  @pytest.fixture
  async def bom_item_factory(self, db_session):
      """Return an async factory that persists ``ProjectBOMItem`` rows.

      The factory takes the owning ``project_id`` plus optional keyword
      overrides for the default item fields, then commits and refreshes the
      row before returning it.
      """

      async def _create_bom_item(project_id: int, **kwargs):
          from backend.app.models.project_bom import ProjectBOMItem

          fields = {
              "project_id": project_id,
              "name": "Test Part",
              "quantity_needed": 1,
              "quantity_acquired": 0,
              "sort_order": 0,
              **kwargs,
          }
          bom_item = ProjectBOMItem(**fields)
          db_session.add(bom_item)
          await db_session.commit()
          await db_session.refresh(bom_item)
          return bom_item

      return _create_bom_item
  @pytest.mark.asyncio
  @pytest.mark.integration
  async def test_export_project(self, async_client: AsyncClient, project_factory, bom_item_factory, db_session):
      """Exporting a project returns its fields and BOM items.

      Covers the JSON export (``?format=json``) field by field, then checks
      that the default export is served as ``application/zip``.
      """
      project_fields = {
          "name": "Export Me",
          "description": "A test project",
          "target_count": 10,
          "target_parts_count": 50,
          "budget": 100.0,
      }
      project = await project_factory(**project_fields)

      # Two BOM items that must both show up in the export.
      await bom_item_factory(project.id, name="M3x8 Screws", quantity_needed=20, unit_price=0.10)
      await bom_item_factory(project.id, name="Heat Inserts", quantity_needed=10, unit_price=0.25)

      # JSON format export
      json_response = await async_client.get(f"/api/v1/projects/{project.id}/export?format=json")
      assert json_response.status_code == 200

      exported = json_response.json()
      assert {key: exported[key] for key in project_fields} == project_fields

      bom_items = exported["bom_items"]
      assert len(bom_items) == 2
      exported_names = {entry["name"] for entry in bom_items}
      assert "M3x8 Screws" in exported_names
      assert "Heat Inserts" in exported_names

      # ZIP format export (default)
      zip_response = await async_client.get(f"/api/v1/projects/{project.id}/export")
      assert zip_response.status_code == 200
      assert zip_response.headers["content-type"] == "application/zip"
  @pytest.mark.asyncio
  @pytest.mark.integration
  async def test_import_project(self, async_client: AsyncClient):
      """Importing a JSON payload creates the project and its BOM items.

      The response must echo the scalar project fields, carry a positive
      ``id``, and report one BOM item in ``stats``.
      """
      bom_entry = {
          "name": "PTFE Tubes",
          "quantity_needed": 4,
          "quantity_acquired": 0,
          "unit_price": 2.50,
          "sourcing_url": "https://example.com",
          "stl_filename": None,
          "remarks": "Need 4mm ID",
      }
      payload = {
          "name": "Imported Project",
          "description": "Imported from JSON",
          "color": "#FF00FF",
          "target_count": 5,
          "target_parts_count": 25,
          "budget": 50.0,
          "bom_items": [bom_entry],
      }

      response = await async_client.post("/api/v1/projects/import", json=payload)
      assert response.status_code == 200

      imported = response.json()
      echoed = ("name", "description", "target_count", "target_parts_count", "budget")
      assert {key: imported[key] for key in echoed} == {key: payload[key] for key in echoed}
      assert imported["id"] > 0  # Has a valid ID
      # BOM stats should show 1 item imported
      assert imported["stats"]["bom_total_items"] == 1
  @pytest.mark.asyncio
  @pytest.mark.integration
  async def test_export_project_with_linked_folder(self, async_client: AsyncClient, project_factory, db_session):
      """A library folder linked to the project appears in the JSON export."""
      from backend.app.models.library import LibraryFolder

      project = await project_factory(name="Project With Folder")

      # Link exactly one folder to the project.
      db_session.add(LibraryFolder(name="Project Files", project_id=project.id))
      await db_session.commit()

      response = await async_client.get(f"/api/v1/projects/{project.id}/export?format=json")
      assert response.status_code == 200

      exported = response.json()
      assert exported["name"] == "Project With Folder"
      # Exactly one linked folder, carrying the seeded name.
      assert [entry["name"] for entry in exported["linked_folders"]] == ["Project Files"]
  @pytest.mark.asyncio
  @pytest.mark.integration
  async def test_import_project_with_linked_folder(self, async_client: AsyncClient):
      """Import succeeds when the payload carries ``linked_folders`` entries."""
      folder_names = ("STL Files", "Documentation")
      payload = {
          "name": "Imported With Folders",
          "linked_folders": [{"name": folder_name} for folder_name in folder_names],
      }

      # The import endpoint must accept the linked_folders key.
      response = await async_client.post("/api/v1/projects/import", json=payload)
      assert response.status_code == 200

      imported = response.json()
      assert imported["name"] == "Imported With Folders"
      assert imported["id"] > 0
  @pytest.mark.asyncio
  @pytest.mark.integration
  async def test_import_project_from_json_file(self, async_client: AsyncClient):
      """A project can be imported by uploading a ``.json`` file."""
      import io
      import json

      manifest = {
          "name": "File Uploaded Project",
          "description": "Imported from JSON file",
          "color": "#123456",
      }

      # Wrap the serialised manifest in an in-memory upload.
      upload = io.BytesIO(json.dumps(manifest).encode())
      response = await async_client.post(
          "/api/v1/projects/import/file",
          files={"file": ("project.json", upload, "application/json")},
      )
      assert response.status_code == 200

      imported = response.json()
      assert imported["name"] == "File Uploaded Project"
      assert imported["description"] == "Imported from JSON file"
  @pytest.mark.asyncio
  @pytest.mark.integration
  async def test_import_project_from_zip_file(self, async_client: AsyncClient):
      """A project can be imported from a ZIP holding ``project.json`` and files."""
      import io
      import json
      import zipfile

      manifest = json.dumps(
          {
              "name": "ZIP Imported Project",
              "description": "Imported from ZIP",
              "linked_folders": [{"name": "TestFolder", "files": [{"filename": "test.txt"}]}],
          }
      )

      # Assemble the archive in memory: manifest plus one folder file.
      zip_buffer = io.BytesIO()
      with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as archive:
          archive.writestr("project.json", manifest)
          archive.writestr("files/TestFolder/test.txt", "Hello World")
      zip_buffer.seek(0)

      response = await async_client.post(
          "/api/v1/projects/import/file",
          files={"file": ("project.zip", zip_buffer, "application/zip")},
      )
      assert response.status_code == 200

      imported = response.json()
      assert imported["name"] == "ZIP Imported Project"
      assert imported["description"] == "Imported from ZIP"
  @pytest.mark.asyncio
  @pytest.mark.integration
  async def test_export_zip_contains_files(self, async_client: AsyncClient, project_factory, db_session):
      """Verify ZIP export contains actual files from linked folders.

      A real file is written under the library directory and registered as a
      ``LibraryFile`` in a folder linked to the project. The default (ZIP)
      export must then contain ``project.json`` and the file with its
      original content. The on-disk fixture is removed in a ``finally`` block
      so a failing assertion does not leave files behind.
      """
      import contextlib
      import io
      import json
      import zipfile

      from backend.app.api.routes.library import get_library_dir
      from backend.app.models.library import LibraryFile, LibraryFolder

      file_text = "Export test content"
      project = await project_factory(name="Project With Files")

      # Create a linked folder with is_external fields
      folder = LibraryFolder(
          name="TestExportFolder",
          project_id=project.id,
          is_external=False,
          external_readonly=False,
          external_show_hidden=False,
      )
      db_session.add(folder)
      await db_session.flush()

      # Create a test file on disk
      library_dir = get_library_dir()
      folder_path = library_dir / "TestExportFolder"
      test_file_path = folder_path / "test_export.txt"
      folder_path.mkdir(parents=True, exist_ok=True)
      try:
          test_file_path.write_text(file_text)

          # Create library file record; size is derived from the content so
          # the two cannot drift apart.
          lib_file = LibraryFile(
              folder_id=folder.id,
              filename="test_export.txt",
              file_path="TestExportFolder/test_export.txt",
              file_type="other",
              file_size=len(file_text.encode()),
              is_external=False,
          )
          db_session.add(lib_file)
          await db_session.commit()

          # Export as ZIP
          response = await async_client.get(f"/api/v1/projects/{project.id}/export")
          assert response.status_code == 200
          assert response.headers["content-type"] == "application/zip"

          # Verify ZIP contents
          zip_buffer = io.BytesIO(response.content)
          with zipfile.ZipFile(zip_buffer, "r") as zf:
              members = zf.namelist()
              assert "project.json" in members
              assert "files/TestExportFolder/test_export.txt" in members

              # Verify file content
              file_content = zf.read("files/TestExportFolder/test_export.txt").decode()
              assert file_content == file_text

              # Verify project.json
              project_data = json.loads(zf.read("project.json"))
              assert project_data["name"] == "Project With Files"
      finally:
          # Cleanup always runs, and must never mask the real test outcome.
          test_file_path.unlink(missing_ok=True)
          with contextlib.suppress(OSError):
              folder_path.rmdir()
  @pytest.mark.asyncio
  @pytest.mark.integration
  async def test_import_invalid_file_type(self, async_client: AsyncClient):
      """Uploading a file that is neither ``.zip`` nor ``.json`` yields HTTP 400."""
      import io

      upload = ("project.txt", io.BytesIO(b"invalid"), "text/plain")
      response = await async_client.post("/api/v1/projects/import/file", files={"file": upload})

      assert response.status_code == 400
      detail = response.json()["detail"]
      assert "must be .zip or .json" in detail
  @pytest.mark.asyncio
  @pytest.mark.integration
  async def test_import_zip_missing_project_json(self, async_client: AsyncClient):
      """A ZIP upload without a ``project.json`` member yields HTTP 400."""
      import io
      import zipfile

      # Archive with a single unrelated member and no manifest.
      zip_buffer = io.BytesIO()
      with zipfile.ZipFile(zip_buffer, "w") as archive:
          archive.writestr("other.txt", "no project.json here")
      zip_buffer.seek(0)

      upload = ("project.zip", zip_buffer, "application/zip")
      response = await async_client.post("/api/v1/projects/import/file", files={"file": upload})

      assert response.status_code == 400
      detail = response.json()["detail"]
      assert "project.json" in detail
  @pytest.mark.asyncio
  @pytest.mark.integration
  async def test_import_invalid_json(self, async_client: AsyncClient):
      """A ``.json`` upload whose body is not parseable JSON yields HTTP 400."""
      import io

      upload = ("project.json", io.BytesIO(b"not valid json"), "application/json")
      response = await async_client.post("/api/v1/projects/import/file", files={"file": upload})

      assert response.status_code == 400
      detail = response.json()["detail"]
      assert "Invalid JSON" in detail
  @pytest.mark.asyncio
  @pytest.mark.integration
  async def test_import_rejects_absolute_path_in_folder_name(self, async_client: AsyncClient, tmp_path):
      """An absolute path used as ``linked_folders[*].name`` must be rejected.

      Mirrors the shape described in the upstream advisory: the folder name
      is an absolute path, relying on ``Path("/lib") / "/anywhere"``
      collapsing to ``Path("/anywhere")`` so that the following file write
      lands outside the library directory. The import must answer 400 and
      nothing may be created at the targeted location.
      """
      import io
      import json
      import zipfile

      target_outside = tmp_path / "outside" / "owned"

      manifest = json.dumps(
          {
              "name": "innocent",
              "linked_folders": [{"name": str(target_outside)}],
          }
      )
      payload = b"import os; os.system('echo pwned > /tmp/owned')\n"

      # ZIP whose folder name points outside library_dir entirely.
      zip_buffer = io.BytesIO()
      with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as archive:
          archive.writestr("project.json", manifest)
          archive.writestr(f"files/{target_outside}/evil.pth", payload)
      zip_buffer.seek(0)

      upload = ("evil.zip", zip_buffer, "application/zip")
      response = await async_client.post("/api/v1/projects/import/file", files={"file": upload})

      assert response.status_code == 400, response.text
      assert not target_outside.exists(), "Attacker payload landed outside library_dir"
  @pytest.mark.asyncio
  @pytest.mark.integration
  async def test_import_rejects_dotdot_in_folder_name(self, async_client: AsyncClient):
      """A folder name containing ``..`` segments must be rejected with 400."""
      import io
      import json
      import zipfile

      manifest = json.dumps(
          {
              "name": "innocent",
              "linked_folders": [{"name": "../../../etc"}],
          }
      )

      # Both the manifest and the member path carry the traversal.
      zip_buffer = io.BytesIO()
      with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as archive:
          archive.writestr("project.json", manifest)
          archive.writestr("files/../../../etc/x.txt", b"x")
      zip_buffer.seek(0)

      upload = ("evil.zip", zip_buffer, "application/zip")
      response = await async_client.post("/api/v1/projects/import/file", files={"file": upload})

      assert response.status_code == 400, response.text
  @pytest.mark.asyncio
  @pytest.mark.integration
  async def test_import_rejects_dotdot_in_relative_path(self, async_client: AsyncClient):
      """``..`` segments in a member's per-entry path must be rejected.

      This is "Vector B" of the advisory: the folder name in the manifest is
      harmless, but the file path stored inside the ZIP tries to climb out
      of it.
      """
      import io
      import json
      import zipfile

      manifest = json.dumps(
          {
              "name": "innocent",
              "linked_folders": [{"name": "ok"}],
          }
      )

      zip_buffer = io.BytesIO()
      with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as archive:
          archive.writestr("project.json", manifest)
          # Benign folder name, but the member path escapes via ``..``.
          archive.writestr("files/ok/../../../etc/x.txt", b"x")
      zip_buffer.seek(0)

      upload = ("evil.zip", zip_buffer, "application/zip")
      response = await async_client.post("/api/v1/projects/import/file", files={"file": upload})

      assert response.status_code == 400, response.text
  @pytest.mark.asyncio
  @pytest.mark.integration
  async def test_import_legit_nested_zip_still_works(self, async_client: AsyncClient):
      """A well-formed ZIP with a nested path inside its folder still imports.

      Guards against the path-traversal checks being over-strict: sub-
      directories below a linked folder are legitimate and must yield 200.
      """
      import io
      import json
      import zipfile

      manifest = json.dumps(
          {
              "name": "nested-ok",
              "linked_folders": [{"name": "OkFolder"}],
          }
      )

      zip_buffer = io.BytesIO()
      with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as archive:
          archive.writestr("project.json", manifest)
          archive.writestr("files/OkFolder/sub/dir/inside.txt", b"hello")
      zip_buffer.seek(0)

      upload = ("nested.zip", zip_buffer, "application/zip")
      response = await async_client.post("/api/v1/projects/import/file", files={"file": upload})

      assert response.status_code == 200, response.text
      data = response.json()
      assert data["name"] == "nested-ok"