# test_library_api.py (41 KB)
  1. """Integration tests for Library API endpoints."""
  2. import io
  3. import tempfile
  4. import zipfile
  5. from pathlib import Path
  6. import pytest
  7. from httpx import AsyncClient
  8. class TestLibraryFoldersAPI:
  9. """Integration tests for library folders endpoints."""
  10. @pytest.fixture
  11. async def folder_factory(self, db_session):
  12. """Factory to create test folders."""
  13. _counter = [0]
  14. async def _create_folder(**kwargs):
  15. from backend.app.models.library import LibraryFolder
  16. _counter[0] += 1
  17. counter = _counter[0]
  18. defaults = {
  19. "name": f"Test Folder {counter}",
  20. }
  21. defaults.update(kwargs)
  22. folder = LibraryFolder(**defaults)
  23. db_session.add(folder)
  24. await db_session.commit()
  25. await db_session.refresh(folder)
  26. return folder
  27. return _create_folder
  28. @pytest.mark.asyncio
  29. @pytest.mark.integration
  30. async def test_list_folders_empty(self, async_client: AsyncClient, db_session):
  31. """Verify empty folder list returns empty array."""
  32. response = await async_client.get("/api/v1/library/folders")
  33. assert response.status_code == 200
  34. assert response.json() == []
  35. @pytest.mark.asyncio
  36. @pytest.mark.integration
  37. async def test_create_folder(self, async_client: AsyncClient, db_session):
  38. """Verify folder can be created."""
  39. data = {"name": "New Folder"}
  40. response = await async_client.post("/api/v1/library/folders", json=data)
  41. assert response.status_code == 200
  42. result = response.json()
  43. assert result["name"] == "New Folder"
  44. assert result["id"] is not None
  45. @pytest.mark.asyncio
  46. @pytest.mark.integration
  47. async def test_create_nested_folder(self, async_client: AsyncClient, folder_factory, db_session):
  48. """Verify nested folder can be created."""
  49. parent = await folder_factory(name="Parent")
  50. data = {"name": "Child", "parent_id": parent.id}
  51. response = await async_client.post("/api/v1/library/folders", json=data)
  52. assert response.status_code == 200
  53. result = response.json()
  54. assert result["name"] == "Child"
  55. assert result["parent_id"] == parent.id
  56. @pytest.mark.asyncio
  57. @pytest.mark.integration
  58. async def test_get_folder(self, async_client: AsyncClient, folder_factory, db_session):
  59. """Verify single folder can be retrieved."""
  60. folder = await folder_factory(name="Test Folder")
  61. response = await async_client.get(f"/api/v1/library/folders/{folder.id}")
  62. assert response.status_code == 200
  63. result = response.json()
  64. assert result["id"] == folder.id
  65. assert result["name"] == "Test Folder"
  66. @pytest.mark.asyncio
  67. @pytest.mark.integration
  68. async def test_get_folder_not_found(self, async_client: AsyncClient, db_session):
  69. """Verify 404 for non-existent folder."""
  70. response = await async_client.get("/api/v1/library/folders/9999")
  71. assert response.status_code == 404
  72. @pytest.mark.asyncio
  73. @pytest.mark.integration
  74. async def test_update_folder(self, async_client: AsyncClient, folder_factory, db_session):
  75. """Verify folder can be updated."""
  76. folder = await folder_factory(name="Old Name")
  77. data = {"name": "New Name"}
  78. response = await async_client.put(f"/api/v1/library/folders/{folder.id}", json=data)
  79. assert response.status_code == 200
  80. result = response.json()
  81. assert result["name"] == "New Name"
  82. @pytest.mark.asyncio
  83. @pytest.mark.integration
  84. async def test_delete_folder(self, async_client: AsyncClient, folder_factory, db_session):
  85. """Verify folder can be deleted."""
  86. folder = await folder_factory()
  87. response = await async_client.delete(f"/api/v1/library/folders/{folder.id}")
  88. assert response.status_code == 200
  89. result = response.json()
  90. assert result.get("message") or result.get("success", True)
  91. class TestLibraryFilesAPI:
  92. """Integration tests for library files endpoints."""
  93. @pytest.fixture
  94. async def folder_factory(self, db_session):
  95. """Factory to create test folders."""
  96. _counter = [0]
  97. async def _create_folder(**kwargs):
  98. from backend.app.models.library import LibraryFolder
  99. _counter[0] += 1
  100. counter = _counter[0]
  101. defaults = {"name": f"Test Folder {counter}"}
  102. defaults.update(kwargs)
  103. folder = LibraryFolder(**defaults)
  104. db_session.add(folder)
  105. await db_session.commit()
  106. await db_session.refresh(folder)
  107. return folder
  108. return _create_folder
  109. @pytest.fixture
  110. async def file_factory(self, db_session):
  111. """Factory to create test files."""
  112. _counter = [0]
  113. async def _create_file(**kwargs):
  114. from backend.app.models.library import LibraryFile
  115. _counter[0] += 1
  116. counter = _counter[0]
  117. defaults = {
  118. "filename": f"test_file_{counter}.3mf",
  119. "file_path": f"/test/path/test_file_{counter}.3mf",
  120. "file_size": 1024,
  121. "file_type": "3mf",
  122. }
  123. defaults.update(kwargs)
  124. lib_file = LibraryFile(**defaults)
  125. db_session.add(lib_file)
  126. await db_session.commit()
  127. await db_session.refresh(lib_file)
  128. return lib_file
  129. return _create_file
  130. @pytest.mark.asyncio
  131. @pytest.mark.integration
  132. async def test_list_files_empty(self, async_client: AsyncClient, db_session):
  133. """Verify empty file list returns empty array."""
  134. response = await async_client.get("/api/v1/library/files")
  135. assert response.status_code == 200
  136. assert response.json() == []
  137. @pytest.mark.asyncio
  138. @pytest.mark.integration
  139. async def test_list_files_in_folder(self, async_client: AsyncClient, folder_factory, file_factory, db_session):
  140. """Verify files can be filtered by folder."""
  141. folder = await folder_factory()
  142. file1 = await file_factory(folder_id=folder.id)
  143. await file_factory() # File in root (no folder)
  144. response = await async_client.get(f"/api/v1/library/files?folder_id={folder.id}")
  145. assert response.status_code == 200
  146. result = response.json()
  147. assert len(result) == 1
  148. assert result[0]["id"] == file1.id
  149. @pytest.mark.asyncio
  150. @pytest.mark.integration
  151. async def test_get_file(self, async_client: AsyncClient, file_factory, db_session):
  152. """Verify single file can be retrieved."""
  153. lib_file = await file_factory(filename="test.3mf")
  154. response = await async_client.get(f"/api/v1/library/files/{lib_file.id}")
  155. assert response.status_code == 200
  156. result = response.json()
  157. assert result["id"] == lib_file.id
  158. assert result["filename"] == "test.3mf"
  159. @pytest.mark.asyncio
  160. @pytest.mark.integration
  161. async def test_get_file_not_found(self, async_client: AsyncClient, db_session):
  162. """Verify 404 for non-existent file."""
  163. response = await async_client.get("/api/v1/library/files/9999")
  164. assert response.status_code == 404
  165. @pytest.mark.asyncio
  166. @pytest.mark.integration
  167. async def test_delete_file(self, async_client: AsyncClient, file_factory, db_session):
  168. """Verify file can be deleted."""
  169. lib_file = await file_factory()
  170. response = await async_client.delete(f"/api/v1/library/files/{lib_file.id}")
  171. assert response.status_code == 200
  172. result = response.json()
  173. assert result.get("message") or result.get("success", True)
  174. @pytest.mark.asyncio
  175. @pytest.mark.integration
  176. async def test_rename_file(self, async_client: AsyncClient, file_factory, db_session):
  177. """Verify file can be renamed."""
  178. lib_file = await file_factory(filename="old_name.3mf")
  179. data = {"filename": "new_name.3mf"}
  180. response = await async_client.put(f"/api/v1/library/files/{lib_file.id}", json=data)
  181. assert response.status_code == 200
  182. result = response.json()
  183. assert result["filename"] == "new_name.3mf"
  184. @pytest.mark.asyncio
  185. @pytest.mark.integration
  186. async def test_rename_file_invalid_path_separator(self, async_client: AsyncClient, file_factory, db_session):
  187. """Verify file rename fails with path separators."""
  188. lib_file = await file_factory(filename="test.3mf")
  189. data = {"filename": "path/to/file.3mf"}
  190. response = await async_client.put(f"/api/v1/library/files/{lib_file.id}", json=data)
  191. assert response.status_code == 400
  192. assert "path separator" in response.json()["detail"].lower()
  193. @pytest.mark.asyncio
  194. @pytest.mark.integration
  195. async def test_rename_file_invalid_backslash(self, async_client: AsyncClient, file_factory, db_session):
  196. """Verify file rename fails with backslash."""
  197. lib_file = await file_factory(filename="test.3mf")
  198. data = {"filename": "path\\to\\file.3mf"}
  199. response = await async_client.put(f"/api/v1/library/files/{lib_file.id}", json=data)
  200. assert response.status_code == 400
  201. assert "path separator" in response.json()["detail"].lower()
  202. @pytest.mark.asyncio
  203. @pytest.mark.integration
  204. async def test_library_stats(self, async_client: AsyncClient, folder_factory, file_factory, db_session):
  205. """Verify library stats endpoint returns counts."""
  206. await folder_factory()
  207. await folder_factory()
  208. await file_factory()
  209. response = await async_client.get("/api/v1/library/stats")
  210. assert response.status_code == 200
  211. result = response.json()
  212. assert result["total_folders"] == 2
  213. assert result["total_files"] == 1
  214. @pytest.mark.asyncio
  215. @pytest.mark.integration
  216. async def test_file_list_includes_user_tracking_fields(self, async_client: AsyncClient, file_factory, db_session):
  217. """Verify file list response includes user tracking fields (Issue #206)."""
  218. lib_file = await file_factory(filename="test.3mf")
  219. response = await async_client.get("/api/v1/library/files?include_root=false")
  220. assert response.status_code == 200
  221. result = response.json()
  222. assert len(result) >= 1
  223. # Find our test file
  224. test_file = next((f for f in result if f["id"] == lib_file.id), None)
  225. assert test_file is not None
  226. # User tracking fields should be present (even if null)
  227. assert "created_by_id" in test_file
  228. assert "created_by_username" in test_file
  229. @pytest.mark.asyncio
  230. @pytest.mark.integration
  231. async def test_file_detail_includes_user_tracking_fields(self, async_client: AsyncClient, file_factory, db_session):
  232. """Verify file detail response includes user tracking fields (Issue #206)."""
  233. lib_file = await file_factory(filename="test_detail.3mf")
  234. response = await async_client.get(f"/api/v1/library/files/{lib_file.id}")
  235. assert response.status_code == 200
  236. result = response.json()
  237. # User tracking fields should be present (even if null)
  238. assert "created_by_id" in result
  239. assert "created_by_username" in result
  240. @pytest.mark.asyncio
  241. @pytest.mark.integration
  242. async def test_file_with_user_tracking(self, async_client: AsyncClient, db_session):
  243. """Verify file created with user shows username in response (Issue #206)."""
  244. from backend.app.models.library import LibraryFile
  245. from backend.app.models.user import User
  246. # Create a test user
  247. user = User(username="testuploader", password_hash="fakehash", role="user")
  248. db_session.add(user)
  249. await db_session.flush()
  250. # Create a file with created_by_id set
  251. lib_file = LibraryFile(
  252. filename="user_uploaded.3mf",
  253. file_path="/test/user_uploaded.3mf",
  254. file_size=2048,
  255. file_type="3mf",
  256. created_by_id=user.id,
  257. )
  258. db_session.add(lib_file)
  259. await db_session.commit()
  260. await db_session.refresh(lib_file)
  261. # Verify file detail shows username
  262. response = await async_client.get(f"/api/v1/library/files/{lib_file.id}")
  263. assert response.status_code == 200
  264. result = response.json()
  265. assert result["created_by_id"] == user.id
  266. assert result["created_by_username"] == "testuploader"
  267. # Verify file list also shows username
  268. response = await async_client.get("/api/v1/library/files?include_root=false")
  269. assert response.status_code == 200
  270. files = response.json()
  271. test_file = next((f for f in files if f["id"] == lib_file.id), None)
  272. assert test_file is not None
  273. assert test_file["created_by_id"] == user.id
  274. assert test_file["created_by_username"] == "testuploader"
  275. class TestLibraryAddToQueueAPI:
  276. """Integration tests for /api/v1/library/files/add-to-queue endpoint."""
  277. @pytest.fixture
  278. async def printer_factory(self, db_session):
  279. """Factory to create test printers."""
  280. _counter = [0]
  281. async def _create_printer(**kwargs):
  282. from backend.app.models.printer import Printer
  283. _counter[0] += 1
  284. counter = _counter[0]
  285. defaults = {
  286. "name": f"Test Printer {counter}",
  287. "ip_address": f"192.168.1.{100 + counter}",
  288. "serial_number": f"TESTSERIAL{counter:04d}",
  289. "access_code": "12345678",
  290. "model": "X1C",
  291. }
  292. defaults.update(kwargs)
  293. printer = Printer(**defaults)
  294. db_session.add(printer)
  295. await db_session.commit()
  296. await db_session.refresh(printer)
  297. return printer
  298. return _create_printer
  299. @pytest.fixture
  300. async def library_file_factory(self, db_session):
  301. """Factory to create test library files."""
  302. _counter = [0]
  303. async def _create_library_file(**kwargs):
  304. from backend.app.models.library import LibraryFile
  305. _counter[0] += 1
  306. counter = _counter[0]
  307. defaults = {
  308. "filename": f"test_file_{counter}.gcode.3mf",
  309. "file_path": f"/test/path/test_file_{counter}.gcode.3mf",
  310. "file_size": 1024,
  311. "file_type": "3mf",
  312. }
  313. defaults.update(kwargs)
  314. lib_file = LibraryFile(**defaults)
  315. db_session.add(lib_file)
  316. await db_session.commit()
  317. await db_session.refresh(lib_file)
  318. return lib_file
  319. return _create_library_file
  320. @pytest.mark.asyncio
  321. @pytest.mark.integration
  322. async def test_add_to_queue_file_not_found(self, async_client: AsyncClient, printer_factory, db_session):
  323. """Verify error for non-existent file."""
  324. await printer_factory()
  325. data = {"file_ids": [9999]}
  326. response = await async_client.post("/api/v1/library/files/add-to-queue", json=data)
  327. assert response.status_code == 200
  328. result = response.json()
  329. assert len(result["added"]) == 0
  330. assert len(result["errors"]) == 1
  331. assert result["errors"][0]["file_id"] == 9999
  332. @pytest.mark.asyncio
  333. @pytest.mark.integration
  334. async def test_add_non_sliced_file_to_queue_fails(
  335. self, async_client: AsyncClient, printer_factory, library_file_factory, db_session
  336. ):
  337. """Verify non-sliced file cannot be added to queue."""
  338. await printer_factory()
  339. lib_file = await library_file_factory(
  340. filename="model.stl",
  341. file_path="/test/path/model.stl",
  342. file_type="stl",
  343. )
  344. data = {"file_ids": [lib_file.id]}
  345. response = await async_client.post("/api/v1/library/files/add-to-queue", json=data)
  346. assert response.status_code == 200
  347. result = response.json()
  348. assert len(result["added"]) == 0
  349. assert len(result["errors"]) == 1
  350. assert "sliced" in result["errors"][0]["error"].lower()
  351. class TestLibraryZipExtractAPI:
  352. """Integration tests for ZIP extraction endpoint."""
  353. @pytest.mark.asyncio
  354. @pytest.mark.integration
  355. async def test_extract_zip_invalid_file_type(self, async_client: AsyncClient, db_session):
  356. """Verify non-ZIP files are rejected."""
  357. # Create a fake file that's not a ZIP
  358. files = {"file": ("test.txt", b"This is not a zip file", "text/plain")}
  359. response = await async_client.post("/api/v1/library/files/extract-zip", files=files)
  360. assert response.status_code == 400
  361. assert "ZIP" in response.json()["detail"]
  362. @pytest.mark.asyncio
  363. @pytest.mark.integration
  364. async def test_extract_zip_basic(self, async_client: AsyncClient, db_session):
  365. """Verify basic ZIP extraction works."""
  366. import io
  367. # Create a simple ZIP file in memory
  368. zip_buffer = io.BytesIO()
  369. with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zf:
  370. zf.writestr("test1.txt", "Content of file 1")
  371. zf.writestr("test2.txt", "Content of file 2")
  372. zip_buffer.seek(0)
  373. files = {"file": ("test.zip", zip_buffer.read(), "application/zip")}
  374. response = await async_client.post("/api/v1/library/files/extract-zip", files=files)
  375. assert response.status_code == 200
  376. result = response.json()
  377. assert result["extracted"] == 2
  378. assert len(result["files"]) == 2
  379. assert len(result["errors"]) == 0
  380. @pytest.mark.asyncio
  381. @pytest.mark.integration
  382. async def test_extract_zip_with_folders(self, async_client: AsyncClient, db_session):
  383. """Verify ZIP extraction preserves folder structure."""
  384. import io
  385. # Create a ZIP file with folder structure
  386. zip_buffer = io.BytesIO()
  387. with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zf:
  388. zf.writestr("folder1/file1.txt", "Content 1")
  389. zf.writestr("folder1/subfolder/file2.txt", "Content 2")
  390. zf.writestr("folder2/file3.txt", "Content 3")
  391. zip_buffer.seek(0)
  392. files = {"file": ("test.zip", zip_buffer.read(), "application/zip")}
  393. params = {"preserve_structure": "true"}
  394. response = await async_client.post("/api/v1/library/files/extract-zip", files=files, params=params)
  395. assert response.status_code == 200
  396. result = response.json()
  397. assert result["extracted"] == 3
  398. assert result["folders_created"] >= 3 # folder1, folder1/subfolder, folder2
  399. @pytest.mark.asyncio
  400. @pytest.mark.integration
  401. async def test_extract_zip_flat(self, async_client: AsyncClient, db_session):
  402. """Verify ZIP extraction can extract flat (no folders)."""
  403. import io
  404. # Create a ZIP file with folder structure
  405. zip_buffer = io.BytesIO()
  406. with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zf:
  407. zf.writestr("folder/file1.txt", "Content 1")
  408. zf.writestr("folder/file2.txt", "Content 2")
  409. zip_buffer.seek(0)
  410. files = {"file": ("test.zip", zip_buffer.read(), "application/zip")}
  411. params = {"preserve_structure": "false"}
  412. response = await async_client.post("/api/v1/library/files/extract-zip", files=files, params=params)
  413. assert response.status_code == 200
  414. result = response.json()
  415. assert result["extracted"] == 2
  416. assert result["folders_created"] == 0 # No folders created when flat
  417. @pytest.mark.asyncio
  418. @pytest.mark.integration
  419. async def test_extract_zip_skips_macos_files(self, async_client: AsyncClient, db_session):
  420. """Verify ZIP extraction skips __MACOSX and hidden files."""
  421. import io
  422. # Create a ZIP file with macOS junk files
  423. zip_buffer = io.BytesIO()
  424. with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zf:
  425. zf.writestr("real_file.txt", "Real content")
  426. zf.writestr("__MACOSX/._real_file.txt", "macOS metadata")
  427. zf.writestr(".hidden_file", "Hidden content")
  428. zip_buffer.seek(0)
  429. files = {"file": ("test.zip", zip_buffer.read(), "application/zip")}
  430. response = await async_client.post("/api/v1/library/files/extract-zip", files=files)
  431. assert response.status_code == 200
  432. result = response.json()
  433. assert result["extracted"] == 1 # Only real_file.txt
  434. assert result["files"][0]["filename"] == "real_file.txt"
  435. @pytest.mark.asyncio
  436. @pytest.mark.integration
  437. async def test_extract_zip_create_folder_from_zip(self, async_client: AsyncClient, db_session):
  438. """Verify ZIP extraction creates a folder from the ZIP filename."""
  439. import io
  440. # Create a ZIP file with some files
  441. zip_buffer = io.BytesIO()
  442. with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zf:
  443. zf.writestr("file1.txt", "Content 1")
  444. zf.writestr("file2.txt", "Content 2")
  445. zip_buffer.seek(0)
  446. files = {"file": ("MyProject.zip", zip_buffer.read(), "application/zip")}
  447. params = {"create_folder_from_zip": "true", "preserve_structure": "false"}
  448. response = await async_client.post("/api/v1/library/files/extract-zip", files=files, params=params)
  449. assert response.status_code == 200
  450. result = response.json()
  451. assert result["extracted"] == 2
  452. assert result["folders_created"] == 1 # MyProject folder created
  453. # Verify the files are in a folder
  454. assert result["files"][0]["folder_id"] is not None
  455. assert result["files"][1]["folder_id"] is not None
  456. # Both files should be in the same folder
  457. assert result["files"][0]["folder_id"] == result["files"][1]["folder_id"]
  458. # Verify the folder was created with the right name
  459. folder_response = await async_client.get(f"/api/v1/library/folders/{result['files'][0]['folder_id']}")
  460. assert folder_response.status_code == 200
  461. folder = folder_response.json()
  462. assert folder["name"] == "MyProject"
  463. class TestLibraryStlThumbnailAPI:
  464. """Integration tests for STL thumbnail generation endpoints."""
  465. @pytest.fixture
  466. async def file_factory(self, db_session):
  467. """Factory to create test files."""
  468. _counter = [0]
  469. async def _create_file(**kwargs):
  470. from backend.app.models.library import LibraryFile
  471. _counter[0] += 1
  472. counter = _counter[0]
  473. defaults = {
  474. "filename": f"test_model_{counter}.stl",
  475. "file_path": f"/test/path/test_model_{counter}.stl",
  476. "file_size": 1024,
  477. "file_type": "stl",
  478. }
  479. defaults.update(kwargs)
  480. lib_file = LibraryFile(**defaults)
  481. db_session.add(lib_file)
  482. await db_session.commit()
  483. await db_session.refresh(lib_file)
  484. return lib_file
  485. return _create_file
  486. @pytest.mark.asyncio
  487. @pytest.mark.integration
  488. async def test_batch_generate_thumbnails_empty(self, async_client: AsyncClient, db_session):
  489. """Verify batch thumbnail generation with no files."""
  490. data = {"all_missing": True}
  491. response = await async_client.post("/api/v1/library/generate-stl-thumbnails", json=data)
  492. assert response.status_code == 200
  493. result = response.json()
  494. assert result["processed"] == 0
  495. assert result["succeeded"] == 0
  496. assert result["failed"] == 0
  497. assert result["results"] == []
  498. @pytest.mark.asyncio
  499. @pytest.mark.integration
  500. async def test_batch_generate_thumbnails_no_criteria(self, async_client: AsyncClient, db_session):
  501. """Verify batch thumbnail generation with no criteria returns empty."""
  502. data = {}
  503. response = await async_client.post("/api/v1/library/generate-stl-thumbnails", json=data)
  504. assert response.status_code == 200
  505. result = response.json()
  506. assert result["processed"] == 0
  507. @pytest.mark.asyncio
  508. @pytest.mark.integration
  509. async def test_batch_generate_thumbnails_file_not_on_disk(
  510. self, async_client: AsyncClient, file_factory, db_session
  511. ):
  512. """Verify batch thumbnail generation handles missing files gracefully."""
  513. # Create a file in DB but not on disk
  514. stl_file = await file_factory(
  515. filename="missing.stl",
  516. file_path="/nonexistent/path/missing.stl",
  517. thumbnail_path=None,
  518. )
  519. data = {"file_ids": [stl_file.id]}
  520. response = await async_client.post("/api/v1/library/generate-stl-thumbnails", json=data)
  521. assert response.status_code == 200
  522. result = response.json()
  523. assert result["processed"] == 1
  524. assert result["succeeded"] == 0
  525. assert result["failed"] == 1
  526. assert result["results"][0]["success"] is False
  527. assert "not found" in result["results"][0]["error"].lower()
  528. @pytest.mark.asyncio
  529. @pytest.mark.integration
  530. async def test_batch_generate_thumbnails_with_real_stl(self, async_client: AsyncClient, db_session):
  531. """Verify batch thumbnail generation with a real STL file."""
  532. from backend.app.models.library import LibraryFile
  533. # Create a simple ASCII STL cube
  534. stl_content = """solid cube
  535. facet normal 0 0 -1
  536. outer loop
  537. vertex 0 0 0
  538. vertex 1 0 0
  539. vertex 1 1 0
  540. endloop
  541. endfacet
  542. facet normal 0 0 1
  543. outer loop
  544. vertex 0 0 1
  545. vertex 1 1 1
  546. vertex 1 0 1
  547. endloop
  548. endfacet
  549. endsolid cube"""
  550. with tempfile.NamedTemporaryFile(suffix=".stl", delete=False, mode="w") as f:
  551. f.write(stl_content)
  552. stl_path = f.name
  553. try:
  554. # Create file in DB pointing to real STL
  555. lib_file = LibraryFile(
  556. filename="test_cube.stl",
  557. file_path=stl_path,
  558. file_size=len(stl_content),
  559. file_type="stl",
  560. thumbnail_path=None,
  561. )
  562. db_session.add(lib_file)
  563. await db_session.commit()
  564. await db_session.refresh(lib_file)
  565. data = {"file_ids": [lib_file.id]}
  566. response = await async_client.post("/api/v1/library/generate-stl-thumbnails", json=data)
  567. assert response.status_code == 200
  568. result = response.json()
  569. assert result["processed"] == 1
  570. # Result depends on whether trimesh/matplotlib are installed
  571. # Either succeeds or fails gracefully
  572. assert result["succeeded"] + result["failed"] == 1
  573. finally:
  574. import os
  575. if os.path.exists(stl_path):
  576. os.unlink(stl_path)
  577. @pytest.mark.asyncio
  578. @pytest.mark.integration
  579. async def test_upload_file_with_stl_thumbnail_param(self, async_client: AsyncClient, db_session):
  580. """Verify file upload accepts generate_stl_thumbnails parameter."""
  581. # Create a simple STL file
  582. stl_content = b"solid test\nendsolid test"
  583. files = {"file": ("test.stl", stl_content, "application/octet-stream")}
  584. params = {"generate_stl_thumbnails": "false"}
  585. response = await async_client.post("/api/v1/library/files", files=files, params=params)
  586. assert response.status_code == 200
  587. result = response.json()
  588. assert result["filename"] == "test.stl"
  589. assert result["file_type"] == "stl"
  590. # No thumbnail should be generated when disabled
  591. assert result["thumbnail_path"] is None
  592. @pytest.mark.asyncio
  593. @pytest.mark.integration
  594. async def test_extract_zip_with_stl_thumbnail_param(self, async_client: AsyncClient, db_session):
  595. """Verify ZIP extraction accepts generate_stl_thumbnails parameter."""
  596. # Create a ZIP file containing an STL
  597. stl_content = b"solid test\nendsolid test"
  598. zip_buffer = io.BytesIO()
  599. with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zf:
  600. zf.writestr("model.stl", stl_content)
  601. zip_buffer.seek(0)
  602. files = {"file": ("test.zip", zip_buffer.read(), "application/zip")}
  603. params = {"generate_stl_thumbnails": "false"}
  604. response = await async_client.post("/api/v1/library/files/extract-zip", files=files, params=params)
  605. assert response.status_code == 200
  606. result = response.json()
  607. assert result["extracted"] == 1
  608. assert result["files"][0]["filename"] == "model.stl"
  609. @pytest.mark.asyncio
  610. @pytest.mark.integration
  611. async def test_batch_generate_thumbnails_by_folder(self, async_client: AsyncClient, file_factory, db_session):
  612. """Verify batch thumbnail generation can filter by folder."""
  613. from backend.app.models.library import LibraryFolder
  614. # Create a folder
  615. folder = LibraryFolder(name="STL Folder")
  616. db_session.add(folder)
  617. await db_session.commit()
  618. await db_session.refresh(folder)
  619. # Create STL file in folder (no thumbnail)
  620. stl_in_folder = await file_factory(
  621. filename="in_folder.stl",
  622. folder_id=folder.id,
  623. thumbnail_path=None,
  624. )
  625. # Create STL file at root (no thumbnail)
  626. _stl_at_root = await file_factory(
  627. filename="at_root.stl",
  628. folder_id=None,
  629. thumbnail_path=None,
  630. )
  631. # Request thumbnails only for files in folder
  632. data = {"folder_id": folder.id, "all_missing": True}
  633. response = await async_client.post("/api/v1/library/generate-stl-thumbnails", json=data)
  634. assert response.status_code == 200
  635. result = response.json()
  636. # Should only process the file in the folder
  637. assert result["processed"] == 1
  638. assert result["results"][0]["file_id"] == stl_in_folder.id
  639. @pytest.mark.asyncio
  640. @pytest.mark.integration
  641. async def test_batch_generate_thumbnails_all_missing(self, async_client: AsyncClient, file_factory, db_session):
  642. """Verify batch thumbnail generation finds all STL files missing thumbnails."""
  643. # Create files with and without thumbnails
  644. _stl_with_thumb = await file_factory(
  645. filename="with_thumb.stl",
  646. thumbnail_path="/some/path/thumb.png",
  647. )
  648. stl_without_thumb1 = await file_factory(
  649. filename="without_thumb1.stl",
  650. thumbnail_path=None,
  651. )
  652. stl_without_thumb2 = await file_factory(
  653. filename="without_thumb2.stl",
  654. thumbnail_path=None,
  655. )
  656. data = {"all_missing": True}
  657. response = await async_client.post("/api/v1/library/generate-stl-thumbnails", json=data)
  658. assert response.status_code == 200
  659. result = response.json()
  660. # Should only process files without thumbnails
  661. assert result["processed"] == 2
  662. file_ids = {r["file_id"] for r in result["results"]}
  663. assert stl_without_thumb1.id in file_ids
  664. assert stl_without_thumb2.id in file_ids
  665. class TestLibraryPathHelpers:
  666. """Tests for path handling utilities used for backup portability."""
  667. def test_to_relative_path_converts_absolute(self):
  668. """Verify absolute paths are converted to relative paths."""
  669. from backend.app.api.routes.library import to_relative_path
  670. from backend.app.core.config import settings
  671. base_dir = str(settings.base_dir)
  672. abs_path = f"{base_dir}/archive/library/files/test.3mf"
  673. rel_path = to_relative_path(abs_path)
  674. assert not rel_path.startswith("/")
  675. assert rel_path == "archive/library/files/test.3mf"
  676. def test_to_relative_path_handles_path_object(self):
  677. """Verify Path objects are handled correctly."""
  678. from pathlib import Path
  679. from backend.app.api.routes.library import to_relative_path
  680. from backend.app.core.config import settings
  681. abs_path = Path(settings.base_dir) / "archive" / "test.3mf"
  682. rel_path = to_relative_path(abs_path)
  683. assert not rel_path.startswith("/")
  684. assert rel_path == "archive/test.3mf"
  685. def test_to_relative_path_returns_empty_for_empty_input(self):
  686. """Verify empty input returns empty string."""
  687. from backend.app.api.routes.library import to_relative_path
  688. assert to_relative_path("") == ""
  689. assert to_relative_path(None) == ""
  690. def test_to_absolute_path_converts_relative(self):
  691. """Verify relative paths are converted to absolute paths."""
  692. from backend.app.api.routes.library import to_absolute_path
  693. from backend.app.core.config import settings
  694. rel_path = "archive/library/files/test.3mf"
  695. abs_path = to_absolute_path(rel_path)
  696. assert abs_path is not None
  697. assert abs_path.is_absolute()
  698. assert str(abs_path) == f"{settings.base_dir}/archive/library/files/test.3mf"
  699. def test_to_absolute_path_handles_already_absolute(self):
  700. """Verify already absolute paths are returned as-is (for backwards compatibility)."""
  701. from backend.app.api.routes.library import to_absolute_path
  702. abs_path_str = "/data/archive/test.3mf"
  703. result = to_absolute_path(abs_path_str)
  704. assert result is not None
  705. assert str(result) == abs_path_str
  706. def test_to_absolute_path_returns_none_for_empty(self):
  707. """Verify None/empty input returns None."""
  708. from backend.app.api.routes.library import to_absolute_path
  709. assert to_absolute_path(None) is None
  710. assert to_absolute_path("") is None
  711. class TestLibraryPermissions:
  712. """Tests for library permission enforcement."""
  713. @pytest.fixture
  714. async def auth_setup(self, db_session):
  715. """Set up auth with users of different permission levels."""
  716. from backend.app.core.auth import create_access_token, get_password_hash
  717. from backend.app.models.group import Group
  718. from backend.app.models.settings import Settings
  719. from backend.app.models.user import User
  720. # Enable auth
  721. settings = Settings(key="auth_enabled", value="true")
  722. db_session.add(settings)
  723. await db_session.commit()
  724. # Groups are auto-seeded during db init, but we need to commit them
  725. await db_session.commit()
  726. # Get groups
  727. from sqlalchemy import select
  728. admin_group = (await db_session.execute(select(Group).where(Group.name == "Administrators"))).scalar_one()
  729. operator_group = (await db_session.execute(select(Group).where(Group.name == "Operators"))).scalar_one()
  730. viewer_group = (await db_session.execute(select(Group).where(Group.name == "Viewers"))).scalar_one()
  731. password_hash = get_password_hash("password")
  732. # Create users
  733. admin_user = User(username="admin_lib", password_hash=password_hash, role="admin", is_active=True)
  734. admin_user.groups.append(admin_group)
  735. operator_user = User(username="operator_lib", password_hash=password_hash, is_active=True)
  736. operator_user.groups.append(operator_group)
  737. viewer_user = User(username="viewer_lib", password_hash=password_hash, is_active=True)
  738. viewer_user.groups.append(viewer_group)
  739. db_session.add_all([admin_user, operator_user, viewer_user])
  740. await db_session.commit()
  741. # Create tokens
  742. admin_token = create_access_token(data={"sub": admin_user.username})
  743. operator_token = create_access_token(data={"sub": operator_user.username})
  744. viewer_token = create_access_token(data={"sub": viewer_user.username})
  745. return {
  746. "admin_user": admin_user,
  747. "operator_user": operator_user,
  748. "viewer_user": viewer_user,
  749. "admin_token": admin_token,
  750. "operator_token": operator_token,
  751. "viewer_token": viewer_token,
  752. }
  753. @pytest.fixture
  754. async def test_file(self, db_session, auth_setup):
  755. """Create a test file owned by the operator user."""
  756. from backend.app.models.library import LibraryFile
  757. operator_user = auth_setup["operator_user"]
  758. lib_file = LibraryFile(
  759. filename="test.txt",
  760. file_path="data/archive/library/files/test.txt",
  761. file_type="txt",
  762. file_size=100,
  763. created_by_id=operator_user.id,
  764. )
  765. db_session.add(lib_file)
  766. await db_session.commit()
  767. await db_session.refresh(lib_file)
  768. return lib_file
  769. @pytest.mark.asyncio
  770. @pytest.mark.integration
  771. async def test_list_files_requires_library_read(self, async_client: AsyncClient, db_session, auth_setup):
  772. """Verify list_files requires library:read permission."""
  773. viewer_token = auth_setup["viewer_token"]
  774. # Viewers have library:read, should succeed
  775. response = await async_client.get("/api/v1/library/files", headers={"Authorization": f"Bearer {viewer_token}"})
  776. assert response.status_code == 200
  777. @pytest.mark.asyncio
  778. @pytest.mark.integration
  779. async def test_list_files_denied_without_permission(self, async_client: AsyncClient, db_session):
  780. """Verify list_files denied without auth when auth is enabled."""
  781. from backend.app.models.settings import Settings
  782. # Enable auth
  783. settings = Settings(key="auth_enabled", value="true")
  784. db_session.add(settings)
  785. await db_session.commit()
  786. # Request without token should fail
  787. response = await async_client.get("/api/v1/library/files")
  788. assert response.status_code == 401
  789. @pytest.mark.asyncio
  790. @pytest.mark.integration
  791. async def test_delete_file_own_by_owner(self, async_client: AsyncClient, db_session, auth_setup, test_file):
  792. """Verify operator can delete their own files."""
  793. from pathlib import Path
  794. # Create actual file on disk so delete doesn't fail
  795. from backend.app.core.config import settings as app_settings
  796. file_path = Path(app_settings.base_dir) / test_file.file_path
  797. file_path.parent.mkdir(parents=True, exist_ok=True)
  798. file_path.write_text("test content")
  799. operator_token = auth_setup["operator_token"]
  800. response = await async_client.delete(
  801. f"/api/v1/library/files/{test_file.id}", headers={"Authorization": f"Bearer {operator_token}"}
  802. )
  803. assert response.status_code == 200
  804. @pytest.mark.asyncio
  805. @pytest.mark.integration
  806. async def test_delete_file_own_denied_for_others_file(self, async_client: AsyncClient, db_session, auth_setup):
  807. """Verify operator cannot delete files owned by others."""
  808. # Create another operator user with a file
  809. from sqlalchemy import select
  810. from backend.app.core.auth import create_access_token
  811. from backend.app.models.group import Group
  812. from backend.app.models.library import LibraryFile
  813. from backend.app.models.user import User
  814. operator_group = (await db_session.execute(select(Group).where(Group.name == "Operators"))).scalar_one()
  815. from backend.app.core.auth import get_password_hash as get_pw_hash
  816. other_user = User(username="other_op", password_hash=get_pw_hash("password"), is_active=True)
  817. other_user.groups.append(operator_group)
  818. db_session.add(other_user)
  819. await db_session.commit()
  820. await db_session.refresh(other_user)
  821. # Create file owned by other user
  822. other_file = LibraryFile(
  823. filename="other.txt",
  824. file_path="data/archive/library/files/other.txt",
  825. file_type="txt",
  826. file_size=100,
  827. created_by_id=other_user.id,
  828. )
  829. db_session.add(other_file)
  830. await db_session.commit()
  831. await db_session.refresh(other_file)
  832. # Original operator should not be able to delete it
  833. operator_token = auth_setup["operator_token"]
  834. response = await async_client.delete(
  835. f"/api/v1/library/files/{other_file.id}", headers={"Authorization": f"Bearer {operator_token}"}
  836. )
  837. assert response.status_code == 403
  838. assert "your own files" in response.json()["detail"].lower()
  839. @pytest.mark.asyncio
  840. @pytest.mark.integration
  841. async def test_delete_file_admin_can_delete_any(self, async_client: AsyncClient, db_session, auth_setup):
  842. """Verify admin can delete any file."""
  843. from pathlib import Path
  844. from backend.app.core.config import settings as app_settings
  845. from backend.app.models.library import LibraryFile
  846. # Create file owned by operator
  847. operator_user = auth_setup["operator_user"]
  848. lib_file = LibraryFile(
  849. filename="admin_can_delete.txt",
  850. file_path="data/archive/library/files/admin_can_delete.txt",
  851. file_type="txt",
  852. file_size=100,
  853. created_by_id=operator_user.id,
  854. )
  855. db_session.add(lib_file)
  856. await db_session.commit()
  857. await db_session.refresh(lib_file)
  858. # Create actual file on disk
  859. file_path = Path(app_settings.base_dir) / lib_file.file_path
  860. file_path.parent.mkdir(parents=True, exist_ok=True)
  861. file_path.write_text("test content")
  862. # Admin should be able to delete it
  863. admin_token = auth_setup["admin_token"]
  864. response = await async_client.delete(
  865. f"/api/v1/library/files/{lib_file.id}", headers={"Authorization": f"Bearer {admin_token}"}
  866. )
  867. assert response.status_code == 200
  868. @pytest.mark.asyncio
  869. @pytest.mark.integration
  870. async def test_viewer_cannot_delete_files(self, async_client: AsyncClient, db_session, auth_setup, test_file):
  871. """Verify viewer cannot delete any files."""
  872. viewer_token = auth_setup["viewer_token"]
  873. response = await async_client.delete(
  874. f"/api/v1/library/files/{test_file.id}", headers={"Authorization": f"Bearer {viewer_token}"}
  875. )
  876. # Viewers don't have delete_own or delete_all permissions
  877. assert response.status_code == 403