test_security.py 114 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185218621872188218921902191219221932194219521962197219821992200220122022203220422052206220722082209221022112212221322142215221622172218221922202221222222232224222522262227222822292230223122322233223422352236223722382239224022412242224322442245224622472248224922502251225222532254225522562257225822592260226122622263226422652266226722682269227022712272227322742275227622772278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277727782779278027812782278327842785278627872788278927902791279227932794279527962797279827992800
  1. """Security tests for the 8 coverage gaps identified in the maintainer review.
  2. Gap 1: encryption.py has zero tests
  3. Gap 2: JWT revocation (revoke_jti, is_jti_revoked, _is_token_fresh) untested
  4. Gap 3: OIDC exchange token replay untested
  5. Gap 4: OIDC email_verified claim handling untested
  6. Gap 5: Email OTP max-attempts invalidation untested
  7. Gap 6: OIDC callback error redirects (SSRF protection) undertested
  8. Gap 7: Login rate limiting untested
  9. Gap 8: challenge_id cookie binding untested
  10. """
  11. from __future__ import annotations
  12. import base64
  13. import secrets
  14. import time
  15. from datetime import datetime, timedelta, timezone
  16. from unittest.mock import AsyncMock, MagicMock, patch
  17. import jwt as pyjwt
  18. import pytest
  19. from cryptography.hazmat.primitives import serialization
  20. from cryptography.hazmat.primitives.asymmetric import rsa
  21. from httpx import AsyncClient
  22. from sqlalchemy.ext.asyncio import AsyncSession
  23. from backend.app.models.auth_ephemeral import AuthEphemeralToken
  24. from backend.app.models.user import User
  25. AUTH_SETUP_URL = "/api/v1/auth/setup"
  26. LOGIN_URL = "/api/v1/auth/login"
  27. LOGOUT_URL = "/api/v1/auth/logout"
  28. ME_URL = "/api/v1/auth/me"
  29. def _auth_header(token: str) -> dict[str, str]:
  30. return {"Authorization": f"Bearer {token}"}
  31. def _norm_pw(password: str) -> str:
  32. """Ensure password meets complexity requirements (I4: SetupRequest now validates)."""
  33. if not any(c.isupper() for c in password):
  34. password = password[0].upper() + password[1:]
  35. if not any(c not in "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789" for c in password):
  36. password = password + "!"
  37. return password
  38. async def _setup_and_login(client: AsyncClient, username: str, password: str) -> str:
  39. password = _norm_pw(password)
  40. await client.post(
  41. AUTH_SETUP_URL,
  42. json={"auth_enabled": True, "admin_username": username, "admin_password": password},
  43. )
  44. resp = await client.post(LOGIN_URL, json={"username": username, "password": password})
  45. assert resp.status_code == 200
  46. return resp.json()["access_token"]
  47. def _make_test_rsa_key():
  48. def _b64url(n: int, length: int) -> str:
  49. return base64.urlsafe_b64encode(n.to_bytes(length, "big")).rstrip(b"=").decode()
  50. private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
  51. private_pem = private_key.private_bytes(
  52. serialization.Encoding.PEM,
  53. serialization.PrivateFormat.TraditionalOpenSSL,
  54. serialization.NoEncryption(),
  55. )
  56. pub_numbers = private_key.public_key().public_numbers()
  57. jwks = {
  58. "keys": [
  59. {
  60. "kty": "RSA",
  61. "use": "sig",
  62. "alg": "RS256",
  63. "kid": "test-kid-1",
  64. "n": _b64url(pub_numbers.n, 256),
  65. "e": _b64url(pub_numbers.e, 3),
  66. }
  67. ]
  68. }
  69. return private_pem, jwks
  70. # ===========================================================================
  71. # Gap 1: encryption.py unit tests
  72. # ===========================================================================
  73. class TestEncryption:
  74. """encrypt/decrypt round-trips, plaintext passthrough, RuntimeError on missing key.
  75. The ``mfa_encryption_isolation`` autouse fixture (conftest.py) resets the
  76. ``encryption`` module's globals before/after each test and points
  77. ``DATA_DIR`` at a tmp path, so individual tests only need to set
  78. ``MFA_ENCRYPTION_KEY`` when they want a specific key in scope.
  79. """
  80. def test_encrypt_decrypt_roundtrip_with_key(self, monkeypatch):
  81. from cryptography.fernet import Fernet
  82. import backend.app.core.encryption as enc_mod
  83. test_key = Fernet.generate_key().decode()
  84. monkeypatch.setenv("MFA_ENCRYPTION_KEY", test_key)
  85. # Force re-initialisation now that the env var is set.
  86. enc_mod._fernet_instance = None
  87. ciphertext = enc_mod.mfa_encrypt("my-totp-secret")
  88. assert ciphertext.startswith("fernet:")
  89. assert enc_mod.mfa_decrypt(ciphertext) == "my-totp-secret"
  90. def test_plaintext_passthrough_without_key(self, monkeypatch):
  91. # Force the auto-bootstrap into the legacy "no key available" branch
  92. # by patching _load_or_generate_key directly. This is more robust than
  93. # chmod tricks (which root bypasses) when verifying the plaintext path.
  94. import backend.app.core.encryption as enc_mod
  95. monkeypatch.setattr(enc_mod, "_load_or_generate_key", lambda: (None, "none"))
  96. enc_mod._fernet_instance = None
  97. result = enc_mod.mfa_encrypt("plaintext-secret")
  98. assert result == "plaintext-secret"
  99. assert enc_mod.mfa_decrypt("plaintext-secret") == "plaintext-secret"
  100. def test_decrypt_raises_runtime_error_without_key_for_encrypted_value(self, monkeypatch):
  101. import backend.app.core.encryption as enc_mod
  102. monkeypatch.setattr(enc_mod, "_load_or_generate_key", lambda: (None, "none"))
  103. enc_mod._fernet_instance = None
  104. with pytest.raises(RuntimeError, match="MFA_ENCRYPTION_KEY must be set"):
  105. enc_mod.mfa_decrypt("fernet:gAAAAA-fake-ciphertext")
  106. # ------------------------------------------------------------------
  107. # Auto-bootstrap tests for _load_or_generate_key
  108. # ------------------------------------------------------------------
  109. def test_load_or_generate_key_uses_env_when_set(self, monkeypatch, tmp_path):
  110. """Valid env var → key_source == 'env', no file written."""
  111. from cryptography.fernet import Fernet
  112. import backend.app.core.encryption as enc_mod
  113. valid_key = Fernet.generate_key().decode()
  114. monkeypatch.setenv("MFA_ENCRYPTION_KEY", valid_key)
  115. monkeypatch.setenv("DATA_DIR", str(tmp_path))
  116. enc_mod._fernet_instance = None
  117. key, source = enc_mod._load_or_generate_key()
  118. assert key == valid_key
  119. assert source == "env"
  120. assert not (tmp_path / ".mfa_encryption_key").exists()
  121. def test_invalid_env_key_falls_through_to_file(self, monkeypatch, tmp_path, caplog):
  122. """Invalid env var → logger.error + file fallback (auto-generated)."""
  123. import logging
  124. import backend.app.core.encryption as enc_mod
  125. monkeypatch.setenv("MFA_ENCRYPTION_KEY", "not-a-valid-fernet-key")
  126. monkeypatch.setenv("DATA_DIR", str(tmp_path))
  127. enc_mod._fernet_instance = None
  128. with caplog.at_level(logging.ERROR, logger="backend.app.core.encryption"):
  129. key, source = enc_mod._load_or_generate_key()
  130. assert source == "generated"
  131. assert key is not None
  132. assert (tmp_path / ".mfa_encryption_key").exists()
  133. assert any("not a valid Fernet key" in rec.message for rec in caplog.records)
  134. def test_load_or_generate_key_reads_existing_file(self, monkeypatch, tmp_path):
  135. """File present in DATA_DIR + no env var → key_source == 'file'."""
  136. from cryptography.fernet import Fernet
  137. import backend.app.core.encryption as enc_mod
  138. existing_key = Fernet.generate_key().decode()
  139. key_file = tmp_path / ".mfa_encryption_key"
  140. key_file.write_text(existing_key)
  141. monkeypatch.setenv("DATA_DIR", str(tmp_path))
  142. enc_mod._fernet_instance = None
  143. key, source = enc_mod._load_or_generate_key()
  144. assert key == existing_key
  145. assert source == "file"
  146. def test_load_or_generate_key_creates_file_with_0600(self, monkeypatch, tmp_path):
  147. """Neither env nor file → new key generated, file mode is 0o600."""
  148. import backend.app.core.encryption as enc_mod
  149. monkeypatch.setenv("DATA_DIR", str(tmp_path))
  150. enc_mod._fernet_instance = None
  151. key, source = enc_mod._load_or_generate_key()
  152. assert source == "generated"
  153. assert enc_mod._validate_fernet_key(key)
  154. key_file = tmp_path / ".mfa_encryption_key"
  155. assert key_file.exists()
  156. # Mode bits LSB are 0o600 — owner read+write only.
  157. assert (key_file.stat().st_mode & 0o777) == 0o600
  158. def test_load_or_generate_key_returns_none_on_write_oserror(self, monkeypatch, tmp_path, caplog):
  159. """When DATA_DIR can't be written to (auto-generate path), return (None, 'none_write_failed').
  160. S1: write now uses os.open(O_EXCL|O_CREAT, 0o600) instead of write_text — patch
  161. os.write to simulate the OS-level failure. S8: source distinguishes write-failed
  162. from corrupted to drive accurate operator messaging.
  163. """
  164. import logging
  165. import os
  166. import backend.app.core.encryption as enc_mod
  167. monkeypatch.setenv("DATA_DIR", str(tmp_path))
  168. enc_mod._fernet_instance = None
  169. original_write = os.write
  170. def _raising_write(fd, data):
  171. # Best-effort: trigger OSError specifically for the key write.
  172. raise OSError("simulated read-only filesystem")
  173. monkeypatch.setattr(os, "write", _raising_write)
  174. with caplog.at_level(logging.ERROR, logger="backend.app.core.encryption"):
  175. key, source = enc_mod._load_or_generate_key()
  176. # Restore os.write so the rest of the test suite is unaffected.
  177. monkeypatch.setattr(os, "write", original_write)
  178. assert key is None
  179. assert source == "none_write_failed"
  180. assert any("Could not save MFA encryption key" in rec.message for rec in caplog.records)
  181. def test_load_or_generate_key_returns_none_on_read_oserror(self, monkeypatch, tmp_path, caplog):
  182. """B4: existing key file but read fails (e.g. permission denied) → (None, 'none_corrupted').
  183. Critical: must NOT regenerate a new key, which would destroy access to
  184. every row already encrypted under the existing key. S8: 'none_corrupted'
  185. marks the cause so operators see the right diagnostic.
  186. """
  187. import logging
  188. from pathlib import Path
  189. import backend.app.core.encryption as enc_mod
  190. # Pre-create a key file so we hit the existing-file branch.
  191. key_file = tmp_path / ".mfa_encryption_key"
  192. key_file.write_text("placeholder-content")
  193. original_size = key_file.stat().st_size
  194. monkeypatch.setenv("DATA_DIR", str(tmp_path))
  195. enc_mod._fernet_instance = None
  196. original_read_text = Path.read_text
  197. def _raising_read_text(self, *args, **kwargs):
  198. if self.name == ".mfa_encryption_key":
  199. raise OSError("simulated permission denied")
  200. return original_read_text(self, *args, **kwargs)
  201. monkeypatch.setattr(Path, "read_text", _raising_read_text)
  202. with caplog.at_level(logging.ERROR, logger="backend.app.core.encryption"):
  203. key, source = enc_mod._load_or_generate_key()
  204. assert key is None
  205. assert source == "none_corrupted"
  206. # Critical: file must not have been overwritten with a new key.
  207. assert key_file.exists()
  208. assert key_file.stat().st_size == original_size
  209. assert any("Failed to read existing MFA key file" in rec.message for rec in caplog.records)
  210. assert any("Refusing to regenerate" in rec.message for rec in caplog.records)
  211. def test_get_key_source_reflects_active_source(self, monkeypatch, tmp_path):
  212. """get_key_source() returns the source detected on the most recent _get_fernet() call."""
  213. from cryptography.fernet import Fernet
  214. import backend.app.core.encryption as enc_mod
  215. monkeypatch.setenv("MFA_ENCRYPTION_KEY", Fernet.generate_key().decode())
  216. monkeypatch.setenv("DATA_DIR", str(tmp_path))
  217. enc_mod._fernet_instance = None
  218. enc_mod._key_source = None
  219. # Trigger lazy initialisation
  220. enc_mod.mfa_encrypt("anything")
  221. assert enc_mod.get_key_source() == "env"
  222. def test_corrupted_key_file_returns_none_without_overwrite(self, monkeypatch, tmp_path, caplog):
  223. """A1: invalid key file content → (None, 'none_corrupted'), file not overwritten.
  224. S8: 'none_corrupted' (vs 'none_write_failed') so operators get the right
  225. diagnostic and don't see a misleading 'DATA_DIR not writable' warning.
  226. """
  227. import logging
  228. import backend.app.core.encryption as enc_mod
  229. key_file = tmp_path / ".mfa_encryption_key"
  230. key_file.write_text("invalid_content")
  231. original_mtime = key_file.stat().st_mtime
  232. monkeypatch.setenv("DATA_DIR", str(tmp_path))
  233. enc_mod._fernet_instance = None
  234. with caplog.at_level(logging.ERROR, logger="backend.app.core.encryption"):
  235. key, source = enc_mod._load_or_generate_key()
  236. assert key is None
  237. assert source == "none_corrupted"
  238. assert key_file.exists(), "file must not be deleted"
  239. assert key_file.stat().st_mtime == original_mtime, "file must not be overwritten"
  240. assert any("not a valid Fernet key" in rec.message for rec in caplog.records)
  241. assert any("Refusing to overwrite" in rec.message for rec in caplog.records)
  242. def test_auto_generate_fileexistserror_returns_none_corrupted(self, monkeypatch, tmp_path, caplog):
  243. """S1: O_EXCL race — file appears between exists() check and open() →
  244. return (None, 'none_corrupted') without overwriting."""
  245. import logging
  246. import os
  247. import backend.app.core.encryption as enc_mod
  248. monkeypatch.setenv("DATA_DIR", str(tmp_path))
  249. enc_mod._fernet_instance = None
  250. original_open = os.open
  251. def _excl_raise(path, flags, mode=0o777):
  252. if str(path).endswith(".mfa_encryption_key") and (flags & os.O_EXCL):
  253. raise FileExistsError(17, "File exists", str(path))
  254. return original_open(path, flags, mode)
  255. monkeypatch.setattr(os, "open", _excl_raise)
  256. with caplog.at_level(logging.ERROR, logger="backend.app.core.encryption"):
  257. key, source = enc_mod._load_or_generate_key()
  258. assert key is None
  259. assert source == "none_corrupted"
  260. assert any("Race detected" in rec.message for rec in caplog.records)
  261. # ===========================================================================
  262. # Gap 2: JWT revocation — revoke_jti, is_jti_revoked, _is_token_fresh, /me
  263. # ===========================================================================
  264. class TestJWTRevocation:
  265. """JWT revocation and token freshness checks."""
  266. @pytest.mark.asyncio
  267. @pytest.mark.integration
  268. async def test_revoke_jti_and_is_jti_revoked(self, async_client: AsyncClient, db_session: AsyncSession):
  269. """revoke_jti stores the JTI; is_jti_revoked returns True afterwards."""
  270. from backend.app.core.auth import is_jti_revoked, revoke_jti
  271. test_jti = secrets.token_urlsafe(16)
  272. expires = datetime.now(timezone.utc) + timedelta(hours=1)
  273. assert not await is_jti_revoked(test_jti)
  274. await revoke_jti(test_jti, expires, username="testuser")
  275. assert await is_jti_revoked(test_jti)
  276. @pytest.mark.asyncio
  277. @pytest.mark.integration
  278. async def test_revoke_jti_idempotent(self, async_client: AsyncClient):
  279. """Double-revocation of the same JTI should not raise."""
  280. from backend.app.core.auth import is_jti_revoked, revoke_jti
  281. jti = secrets.token_urlsafe(16)
  282. expires = datetime.now(timezone.utc) + timedelta(hours=1)
  283. await revoke_jti(jti, expires)
  284. await revoke_jti(jti, expires) # must not raise
  285. assert await is_jti_revoked(jti)
  286. def test_is_token_fresh_rejects_none_iat(self):
  287. """_is_token_fresh returns False when iat is None (I1 hard cutoff)."""
  288. from backend.app.core.auth import _is_token_fresh
  289. user = MagicMock()
  290. user.password_changed_at = None
  291. assert _is_token_fresh(None, user) is False
  292. def test_is_token_fresh_rejects_token_before_password_change(self):
  293. """_is_token_fresh returns False when iat predates password_changed_at."""
  294. from backend.app.core.auth import _is_token_fresh
  295. now = datetime.now(timezone.utc)
  296. user = MagicMock()
  297. user.password_changed_at = now
  298. old_iat = (now - timedelta(hours=1)).timestamp()
  299. assert _is_token_fresh(old_iat, user) is False
  300. def test_is_token_fresh_accepts_token_after_password_change(self):
  301. """_is_token_fresh returns True when iat is after password_changed_at."""
  302. from backend.app.core.auth import _is_token_fresh
  303. now = datetime.now(timezone.utc)
  304. user = MagicMock()
  305. user.password_changed_at = now - timedelta(hours=1)
  306. recent_iat = now.timestamp()
  307. assert _is_token_fresh(recent_iat, user) is True
  308. def test_is_token_fresh_returns_true_when_no_password_change(self):
  309. """_is_token_fresh returns True when password_changed_at is None (I2 migration not yet run)."""
  310. from backend.app.core.auth import _is_token_fresh
  311. user = MagicMock()
  312. user.password_changed_at = None
  313. assert _is_token_fresh(time.time(), user) is True
  314. @pytest.mark.asyncio
  315. @pytest.mark.integration
  316. async def test_me_endpoint_rejects_token_after_logout(self, async_client: AsyncClient):
  317. """After logout, the bearer token must be rejected by /me (B1 + revocation)."""
  318. token = await _setup_and_login(async_client, "sec_logout_me", "sec_logout_me1")
  319. # Token works before logout
  320. me_resp = await async_client.get(ME_URL, headers=_auth_header(token))
  321. assert me_resp.status_code == 200
  322. # Logout
  323. logout_resp = await async_client.post(LOGOUT_URL, headers=_auth_header(token))
  324. assert logout_resp.status_code == 200
  325. # Token must now be rejected
  326. me_after = await async_client.get(ME_URL, headers=_auth_header(token))
  327. assert me_after.status_code == 401
  328. # ===========================================================================
  329. # Gap 3: OIDC exchange token replay
  330. # ===========================================================================
  331. class TestOIDCExchangeReplay:
  332. """A single-use OIDC exchange token cannot be redeemed twice."""
  333. @pytest.mark.asyncio
  334. @pytest.mark.integration
  335. async def test_exchange_token_is_single_use(self, async_client: AsyncClient, db_session: AsyncSession):
  336. """The second call to /oidc/exchange with the same token returns 401."""
  337. exchange_token = secrets.token_urlsafe(32)
  338. db_session.add(
  339. AuthEphemeralToken(
  340. token=exchange_token,
  341. token_type="oidc_exchange",
  342. username="oidc_replay_user",
  343. expires_at=datetime.now(timezone.utc) + timedelta(minutes=5),
  344. )
  345. )
  346. await db_session.commit()
  347. # Seed the user so the exchange can resolve it
  348. from backend.app.core.auth import get_password_hash
  349. from backend.app.core.database import async_session, seed_default_groups
  350. async with async_session() as db:
  351. result = await db.execute(__import__("sqlalchemy").select(User).where(User.username == "oidc_replay_user"))
  352. if result.scalar_one_or_none() is None:
  353. db.add(
  354. User(
  355. username="oidc_replay_user",
  356. password_hash=get_password_hash("pw"),
  357. is_active=True,
  358. )
  359. )
  360. await db.commit()
  361. first = await async_client.post("/api/v1/auth/oidc/exchange", json={"oidc_token": exchange_token})
  362. assert first.status_code == 200
  363. second = await async_client.post("/api/v1/auth/oidc/exchange", json={"oidc_token": exchange_token})
  364. assert second.status_code == 401
  365. # ===========================================================================
  366. # Gap 4: OIDC email_verified claim handling
  367. # ===========================================================================
  368. class TestOIDCEmailVerified:
  369. """email_verified: False/absent must not link OIDC identity to an existing email."""
  370. @pytest.mark.asyncio
  371. @pytest.mark.integration
  372. async def test_unverified_email_does_not_link_to_existing_user(
  373. self, async_client: AsyncClient, db_session: AsyncSession
  374. ):
  375. """If email_verified is False, the OIDC callback must not auto-link by email."""
  376. private_pem, jwks_data = _make_test_rsa_key()
  377. issuer = "https://idp.evtest.example.com"
  378. client_id = "ev-client"
  379. nonce = secrets.token_urlsafe(16)
  380. now = int(time.time())
  381. id_token = pyjwt.encode(
  382. {
  383. "sub": "ev-sub-new",
  384. "iss": issuer,
  385. "aud": client_id,
  386. "nonce": nonce,
  387. "email": "existing@example.com",
  388. "email_verified": False, # <-- must be ignored
  389. "iat": now,
  390. "exp": now + 300,
  391. },
  392. private_pem,
  393. algorithm="RS256",
  394. headers={"kid": "test-kid-1"},
  395. )
  396. admin_token = await _setup_and_login(async_client, "ev_admin", "ev_admin1")
  397. # Create existing user with the same email (use strong password for validator)
  398. create_user_resp = await async_client.post(
  399. "/api/v1/users",
  400. json={"username": "existing_email_user", "password": "Str0ng!Pass", "email": "existing@example.com"},
  401. headers=_auth_header(admin_token),
  402. )
  403. assert create_user_resp.status_code in (200, 201), create_user_resp.json()
  404. # Create OIDC provider
  405. create_resp = await async_client.post(
  406. "/api/v1/auth/oidc/providers",
  407. json={
  408. "name": "EV-IdP",
  409. "issuer_url": issuer,
  410. "client_id": client_id,
  411. "client_secret": "secret",
  412. "scopes": "openid email",
  413. "is_enabled": True,
  414. "auto_create_users": True,
  415. },
  416. headers=_auth_header(admin_token),
  417. )
  418. assert create_resp.status_code == 201
  419. provider_id = create_resp.json()["id"]
  420. state = secrets.token_urlsafe(32)
  421. code_verifier = secrets.token_urlsafe(48)
  422. db_session.add(
  423. AuthEphemeralToken(
  424. token=state,
  425. token_type="oidc_state",
  426. provider_id=provider_id,
  427. nonce=nonce,
  428. code_verifier=code_verifier,
  429. expires_at=datetime.now(timezone.utc) + timedelta(minutes=5),
  430. )
  431. )
  432. await db_session.commit()
  433. discovery_doc = {
  434. "issuer": issuer,
  435. "authorization_endpoint": f"{issuer}/auth",
  436. "token_endpoint": f"{issuer}/token",
  437. "jwks_uri": f"{issuer}/.well-known/jwks.json",
  438. }
  439. class _MockResp:
  440. def __init__(self, data):
  441. self._data = data
  442. self.status_code = 200
  443. self.is_success = True
  444. self.text = str(data)
  445. def json(self):
  446. return self._data
  447. def raise_for_status(self):
  448. pass
  449. class _MockHttpxClientEV:
  450. def __init__(self, *args, **kwargs):
  451. pass
  452. async def __aenter__(self):
  453. return self
  454. async def __aexit__(self, *_):
  455. pass
  456. async def get(self, url, **kwargs):
  457. if "jwks" in url:
  458. return _MockResp(jwks_data)
  459. return _MockResp(discovery_doc)
  460. async def post(self, url, **kwargs):
  461. return _MockResp({"access_token": "mock", "token_type": "Bearer", "id_token": id_token})
  462. with patch("backend.app.api.routes.mfa.httpx.AsyncClient", _MockHttpxClientEV):
  463. await async_client.get(
  464. f"/api/v1/auth/oidc/callback?code=test-code&state={state}",
  465. follow_redirects=False,
  466. )
  467. # Callback must NOT link to the existing_email_user — a new user is created
  468. # instead (because the email claim was ignored due to email_verified=False).
  469. # Either a new user is provisioned (redirect with oidc_token) or the callback
  470. # fails. In either case, the existing user must not have an OIDC link.
  471. from sqlalchemy import select as sa_select
  472. from backend.app.models.oidc_provider import UserOIDCLink
  473. link_result = await db_session.execute(
  474. sa_select(UserOIDCLink)
  475. .join(User, UserOIDCLink.user_id == User.id)
  476. .where(User.email == "existing@example.com")
  477. )
  478. link = link_result.scalar_one_or_none()
  479. assert link is None, "Existing user must not be auto-linked when email_verified is False"
  480. # ===========================================================================
  481. # Gap 5: Email OTP max-attempts invalidation
  482. # ===========================================================================
  483. class TestEmailOTPMaxAttempts:
  484. """After MAX_ATTEMPTS wrong codes, the OTP is permanently invalidated."""
  485. @pytest.mark.asyncio
  486. @pytest.mark.integration
  487. async def test_email_otp_invalidated_after_max_attempts(self, async_client: AsyncClient, db_session: AsyncSession):
  488. from passlib.context import CryptContext
  489. from sqlalchemy import select as sa_select
  490. from backend.app.models.user_otp_code import UserOTPCode
  491. _pwd_ctx = CryptContext(schemes=["pbkdf2_sha256"], deprecated="auto")
  492. admin_token = await _setup_and_login(async_client, "otp_max_admin", "otp_max_admin1")
  493. # Enable email OTP for admin user
  494. result = await db_session.execute(sa_select(User).where(User.username == "otp_max_admin"))
  495. user = result.scalar_one()
  496. user.email = "otpmax@example.com"
  497. await db_session.commit()
  498. setup_code = "123456"
  499. from backend.app.models.auth_ephemeral import AuthEphemeralToken as AET
  500. setup_token = secrets.token_urlsafe(32)
  501. db_session.add(
  502. AET(
  503. token=setup_token,
  504. token_type="email_otp_setup",
  505. username="otp_max_admin",
  506. nonce=_pwd_ctx.hash(setup_code),
  507. expires_at=datetime.now(timezone.utc) + timedelta(minutes=10),
  508. )
  509. )
  510. await db_session.commit()
  511. await async_client.post(
  512. "/api/v1/auth/2fa/email/enable/confirm",
  513. json={"setup_token": setup_token, "code": setup_code},
  514. headers=_auth_header(admin_token),
  515. )
  516. # Login to get pre_auth_token
  517. login_resp = await async_client.post(
  518. LOGIN_URL, json={"username": "otp_max_admin", "password": "Otp_max_admin1"}
  519. )
  520. pre_auth_token = login_resp.json()["pre_auth_token"]
  521. # Insert an OTP record directly (bypassing SMTP)
  522. real_code = "654321"
  523. otp = UserOTPCode(
  524. user_id=user.id,
  525. code_hash=_pwd_ctx.hash(real_code),
  526. attempts=0,
  527. used=False,
  528. expires_at=datetime.now(timezone.utc) + timedelta(minutes=10),
  529. )
  530. db_session.add(otp)
  531. await db_session.commit()
  532. # Submit MAX_ATTEMPTS wrong codes
  533. from backend.app.api.routes.mfa import MAX_2FA_ATTEMPTS
  534. for _ in range(MAX_2FA_ATTEMPTS):
  535. r = await async_client.post(
  536. "/api/v1/auth/2fa/verify",
  537. json={"pre_auth_token": pre_auth_token, "code": "000000", "method": "email"},
  538. )
  539. # Each attempt must fail with 401
  540. assert r.status_code == 401
  541. # After max attempts, the correct code is also rejected (either OTP
  542. # invalidated → 401, or rate limit hit → 429). Either means locked out.
  543. final = await async_client.post(
  544. "/api/v1/auth/2fa/verify",
  545. json={"pre_auth_token": pre_auth_token, "code": real_code, "method": "email"},
  546. )
  547. assert final.status_code in (401, 429), f"Expected lockout, got {final.status_code}: {final.json()}"
  548. # ===========================================================================
  549. # Gap 6: OIDC callback SSRF protection — invalid authorization_endpoint scheme
  550. # ===========================================================================
  551. class TestOIDCSSRFProtection:
  552. """authorization_endpoint with non-http(s) scheme must be rejected."""
  553. @pytest.mark.asyncio
  554. @pytest.mark.integration
  555. async def test_invalid_authorization_endpoint_scheme_rejected(
  556. self, async_client: AsyncClient, db_session: AsyncSession
  557. ):
  558. issuer = "https://idp.ssrf.example.com"
  559. client_id = "ssrf-client"
  560. admin_token = await _setup_and_login(async_client, "ssrf_admin", "ssrf_admin1")
  561. create_resp = await async_client.post(
  562. "/api/v1/auth/oidc/providers",
  563. json={
  564. "name": "SSRF-IdP",
  565. "issuer_url": issuer,
  566. "client_id": client_id,
  567. "client_secret": "secret",
  568. "scopes": "openid",
  569. "is_enabled": True,
  570. "auto_create_users": False,
  571. },
  572. headers=_auth_header(admin_token),
  573. )
  574. assert create_resp.status_code == 201
  575. provider_id = create_resp.json()["id"]
  576. # Discovery doc returns a javascript: authorization_endpoint
  577. malicious_discovery = {
  578. "issuer": issuer,
  579. "authorization_endpoint": "javascript:alert(1)", # <-- malicious
  580. "token_endpoint": f"{issuer}/token",
  581. "jwks_uri": f"{issuer}/.well-known/jwks.json",
  582. }
  583. class _MockResp:
  584. def __init__(self, data):
  585. self._data = data
  586. self.status_code = 200
  587. self.is_success = True
  588. self.text = str(data)
  589. def json(self):
  590. return self._data
  591. def raise_for_status(self):
  592. pass
  593. class _MockHttpxClientSSRF:
  594. def __init__(self, *args, **kwargs):
  595. pass
  596. async def __aenter__(self):
  597. return self
  598. async def __aexit__(self, *_):
  599. pass
  600. async def get(self, url, **kwargs):
  601. return _MockResp(malicious_discovery)
  602. async def post(self, url, **kwargs):
  603. return _MockResp({})
  604. with patch("backend.app.api.routes.mfa.httpx.AsyncClient", _MockHttpxClientSSRF):
  605. # oidc_authorize uses a path parameter, not query param
  606. authorize_resp = await async_client.get(
  607. f"/api/v1/auth/oidc/authorize/{provider_id}",
  608. follow_redirects=False,
  609. )
  610. # Must be rejected with 502 — B2 guard rejects invalid authorization_endpoint scheme
  611. assert authorize_resp.status_code == 502, authorize_resp.json()
  612. detail = authorize_resp.json().get("detail", "").lower()
  613. assert "authorization_endpoint" in detail or "invalid" in detail
  614. # ===========================================================================
  615. # Gap 7: Login rate limiting
  616. # ===========================================================================
  617. class TestLoginRateLimiting:
  618. """10+ failed logins for the same username must return 429."""
  619. @pytest.mark.asyncio
  620. @pytest.mark.integration
  621. async def test_excessive_failed_logins_return_429(self, async_client: AsyncClient):
  622. from backend.app.api.routes.mfa import MAX_LOGIN_ATTEMPTS
  623. # Setup auth but do NOT log in
  624. await async_client.post(
  625. AUTH_SETUP_URL,
  626. json={"auth_enabled": True, "admin_username": "ratelimit_user", "admin_password": "Ratelimit_pw1"},
  627. )
  628. status_codes = []
  629. for _ in range(MAX_LOGIN_ATTEMPTS + 2):
  630. resp = await async_client.post(
  631. LOGIN_URL,
  632. json={"username": "ratelimit_user", "password": "wrong_password"},
  633. )
  634. status_codes.append(resp.status_code)
  635. # The last attempts must be 429 (Too Many Requests)
  636. assert status_codes[-1] == 429, f"Expected 429 after {MAX_LOGIN_ATTEMPTS} failures, got: {status_codes}"
  637. # ===========================================================================
  638. # Gap 8: challenge_id cookie binding
  639. # ===========================================================================
  640. class TestChallengeIdCookieBinding:
  641. """A pre-auth token stolen from session A cannot be used from session B."""
  642. @pytest.mark.asyncio
  643. @pytest.mark.integration
  644. async def test_pre_auth_token_rejected_without_matching_cookie(
  645. self, async_client: AsyncClient, db_session: AsyncSession
  646. ):
  647. import pyotp
  648. from passlib.context import CryptContext
  649. _pwd_ctx = CryptContext(schemes=["pbkdf2_sha256"], deprecated="auto")
  650. # Set up user with TOTP
  651. await _setup_and_login(async_client, "cookie_bind_user", "cookie_bind_pw1")
  652. secret = pyotp.random_base32()
  653. totp_obj = pyotp.TOTP(secret)
  654. from sqlalchemy import select as sa_select
  655. from backend.app.models.user_totp import UserTOTP
  656. result = await db_session.execute(sa_select(User).where(User.username == "cookie_bind_user"))
  657. user = result.scalar_one()
  658. db_session.add(UserTOTP(user_id=user.id, secret=secret, is_enabled=True))
  659. await db_session.commit()
  660. # Login from "session A" — gets a pre_auth_token and a 2fa_challenge cookie
  661. login_resp = await async_client.post(
  662. LOGIN_URL, json={"username": "cookie_bind_user", "password": "Cookie_bind_pw1"}
  663. )
  664. assert login_resp.status_code == 200
  665. assert login_resp.json()["requires_2fa"] is True
  666. pre_auth_token = login_resp.json()["pre_auth_token"]
  667. # The async_client jar now holds the 2fa_challenge cookie for session A
  668. # Simulate session B by creating a new client WITHOUT the cookie
  669. from httpx import ASGITransport, AsyncClient as FreshClient
  670. from backend.app.main import app
  671. async with FreshClient(transport=ASGITransport(app=app), base_url="http://test") as session_b:
  672. # Attempt to use session A's pre_auth_token from session B (no cookie)
  673. verify_resp = await session_b.post(
  674. "/api/v1/auth/2fa/verify",
  675. json={
  676. "pre_auth_token": pre_auth_token,
  677. "code": totp_obj.now(),
  678. "method": "totp",
  679. },
  680. )
  681. # Must be rejected — pre_auth_token is bound to session A's cookie
  682. assert verify_resp.status_code == 401, (
  683. f"Expected 401 for token replay from cookieless session, got {verify_resp.status_code}: "
  684. f"{verify_resp.json()}"
  685. )
  686. # ===========================================================================
  687. # C2: Security-header middleware
  688. # ===========================================================================
  689. class TestSecurityHeaders:
  690. """Every HTTP response must include standard security headers (C2)."""
  691. @pytest.mark.asyncio
  692. @pytest.mark.integration
  693. async def test_security_headers_present(self, async_client: AsyncClient):
  694. """GET /api/v1/auth/me (unauthenticated → 401) still carries security headers."""
  695. resp = await async_client.get(ME_URL)
  696. assert resp.status_code == 401 # sanity — no auth token
  697. assert resp.headers.get("x-content-type-options") == "nosniff"
  698. assert resp.headers.get("x-frame-options") == "SAMEORIGIN"
  699. assert resp.headers.get("referrer-policy") == "strict-origin-when-cross-origin"
  700. csp = resp.headers.get("content-security-policy", "")
  701. assert "default-src 'self'" in csp
  702. assert "script-src 'self'" in csp
  703. assert "frame-ancestors 'none'" in csp
  704. assert "object-src 'none'" in csp
  705. @pytest.mark.asyncio
  706. @pytest.mark.integration
  707. async def test_hsts_absent_for_http(self, async_client: AsyncClient):
  708. """HSTS must NOT be set over plain HTTP (test transport uses http)."""
  709. resp = await async_client.get(ME_URL)
  710. assert "strict-transport-security" not in resp.headers
  711. # ===========================================================================
  712. # I3: Rate-limit bucket interaction — IP spray vs. username spray
  713. # ===========================================================================
  714. class TestRateLimitBuckets:
  715. """IP-spray and username-spray must each trip the correct independent bucket."""
  716. @pytest.mark.asyncio
  717. @pytest.mark.integration
  718. async def test_ip_spray_trips_ip_bucket(self, async_client: AsyncClient):
  719. """20 failed logins from one IP across 20 different usernames trips the IP bucket.
  720. Each per-username bucket only has 1 failure (well below MAX_LOGIN_ATTEMPTS=10),
  721. so the username bucket is never the reason for the 429.
  722. """
  723. from unittest.mock import patch as _patch
  724. unique_ip = "10.99.1.1"
  725. # Ensure auth is enabled
  726. await async_client.post(
  727. AUTH_SETUP_URL,
  728. json={"auth_enabled": True, "admin_username": "spray_ip_admin", "admin_password": "SprayIp_admin1"},
  729. )
  730. status_codes: list[int] = []
  731. with _patch("backend.app.api.routes.auth._get_client_ip", return_value=unique_ip):
  732. for i in range(22):
  733. resp = await async_client.post(
  734. LOGIN_URL,
  735. json={"username": f"spray_ip_victim_{i}", "password": "wrong"},
  736. )
  737. status_codes.append(resp.status_code)
  738. # The first 20 attempts fail with 401; the 21st+ must be 429 (IP bucket full)
  739. assert status_codes[-1] == 429, f"Expected 429 after 20 IP-spray failures, got: {status_codes}"
  740. # No single username saw more than one attempt → username buckets not tripped
  741. non_429 = [c for c in status_codes[:-2] if c == 429]
  742. assert not non_429, f"Username bucket triggered early: {status_codes}"
  743. @pytest.mark.asyncio
  744. @pytest.mark.integration
  745. async def test_username_spray_trips_username_bucket(self, async_client: AsyncClient):
  746. """One username targeted from 10+ different IPs trips the username bucket.
  747. Each per-IP bucket only sees 1 failure, so no IP bucket is tripped.
  748. The username bucket (max 10) is what fires the 429.
  749. """
  750. from unittest.mock import patch as _patch
  751. from backend.app.api.routes.mfa import MAX_LOGIN_ATTEMPTS
  752. # Ensure auth is enabled
  753. await async_client.post(
  754. AUTH_SETUP_URL,
  755. json={
  756. "auth_enabled": True,
  757. "admin_username": "spray_uname_admin",
  758. "admin_password": "SprayUname_admin1",
  759. },
  760. )
  761. target_username = "spray_uname_victim"
  762. status_codes: list[int] = []
  763. for i in range(MAX_LOGIN_ATTEMPTS + 2):
  764. rotating_ip = f"10.99.2.{i + 1}"
  765. with _patch("backend.app.api.routes.auth._get_client_ip", return_value=rotating_ip):
  766. resp = await async_client.post(
  767. LOGIN_URL,
  768. json={"username": target_username, "password": "wrong"},
  769. )
  770. status_codes.append(resp.status_code)
  771. # After MAX_LOGIN_ATTEMPTS failures for same username the bucket fires
  772. assert status_codes[-1] == 429, (
  773. f"Expected 429 after {MAX_LOGIN_ATTEMPTS} username-spray failures, got: {status_codes}"
  774. )
  775. # ============================================================================
  776. # TestEncryptLegacyMigration
  777. # ============================================================================
  778. class TestEncryptLegacyMigration:
  779. """Re-encryption migration of legacy plaintext OIDC + TOTP rows.
  780. The migration runs against its own ``async_session`` factory (not the
  781. ``db_session`` fixture) so each test patches the module-level factory to
  782. point at the test-engine before invoking the helper. ``db_session`` is
  783. used to seed and to verify state via the same engine.
  784. """
  785. @staticmethod
  786. def _patch_module_session(monkeypatch, db_session):
  787. """Bind ``database.async_session`` to the test engine for one test."""
  788. from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
  789. from backend.app.core import database as db_mod
  790. test_factory = async_sessionmaker(db_session.bind, class_=AsyncSession, expire_on_commit=False)
  791. monkeypatch.setattr(db_mod, "async_session", test_factory)
  792. @staticmethod
  793. def _set_active_key(monkeypatch):
  794. """Configure a valid Fernet key for the migration to use."""
  795. from cryptography.fernet import Fernet
  796. import backend.app.core.encryption as enc_mod
  797. monkeypatch.setenv("MFA_ENCRYPTION_KEY", Fernet.generate_key().decode())
  798. enc_mod._fernet_instance = None
  799. @pytest.mark.asyncio
  800. @pytest.mark.integration
  801. async def test_migration_encrypts_plaintext_oidc_secret(self, db_session, monkeypatch):
  802. from sqlalchemy import select
  803. from backend.app.core.database import _migrate_encrypt_legacy_secrets
  804. from backend.app.models.oidc_provider import OIDCProvider
  805. self._patch_module_session(monkeypatch, db_session)
  806. self._set_active_key(monkeypatch)
  807. provider = OIDCProvider(
  808. name="LegacyProv",
  809. issuer_url="https://legacy.example.com",
  810. client_id="cid",
  811. _client_secret_enc="legacy-plaintext",
  812. scopes="openid email profile",
  813. is_enabled=True,
  814. )
  815. db_session.add(provider)
  816. await db_session.commit()
  817. await _migrate_encrypt_legacy_secrets()
  818. # Re-fetch on a fresh row state
  819. await db_session.refresh(provider)
  820. assert provider._client_secret_enc.startswith("fernet:")
  821. # Decrypted value matches the original plaintext
  822. assert provider.client_secret == "legacy-plaintext"
  823. # Sanity: a SELECT also sees the encrypted value
  824. result = await db_session.execute(select(OIDCProvider).where(OIDCProvider.id == provider.id))
  825. fetched = result.scalar_one()
  826. assert fetched._client_secret_enc.startswith("fernet:")
  827. @pytest.mark.asyncio
  828. @pytest.mark.integration
  829. async def test_migration_skips_already_encrypted_rows(self, db_session, monkeypatch):
  830. from backend.app.core.database import _migrate_encrypt_legacy_secrets
  831. from backend.app.models.oidc_provider import OIDCProvider
  832. self._patch_module_session(monkeypatch, db_session)
  833. self._set_active_key(monkeypatch)
  834. # Use the property setter so the value is encrypted up front.
  835. provider = OIDCProvider(
  836. name="EncProv",
  837. issuer_url="https://enc.example.com",
  838. client_id="cid",
  839. client_secret="already-encrypted",
  840. scopes="openid email profile",
  841. is_enabled=True,
  842. )
  843. db_session.add(provider)
  844. await db_session.commit()
  845. original_enc = provider._client_secret_enc
  846. await _migrate_encrypt_legacy_secrets()
  847. await _migrate_encrypt_legacy_secrets() # idempotent
  848. await db_session.refresh(provider)
  849. # Value unchanged across two migration runs (still the same ciphertext).
  850. assert provider._client_secret_enc == original_enc
  851. @pytest.mark.asyncio
  852. @pytest.mark.integration
  853. async def test_migration_no_op_when_key_unset(self, db_session, monkeypatch):
  854. import backend.app.core.encryption as enc_mod
  855. from backend.app.core.database import _migrate_encrypt_legacy_secrets
  856. from backend.app.models.oidc_provider import OIDCProvider
  857. self._patch_module_session(monkeypatch, db_session)
  858. # Force "no key" branch
  859. monkeypatch.setattr(enc_mod, "_load_or_generate_key", lambda: (None, "none"))
  860. enc_mod._fernet_instance = None
  861. provider = OIDCProvider(
  862. name="NoKeyProv",
  863. issuer_url="https://nokey.example.com",
  864. client_id="cid",
  865. _client_secret_enc="still-plaintext",
  866. scopes="openid email profile",
  867. is_enabled=True,
  868. )
  869. db_session.add(provider)
  870. await db_session.commit()
  871. await _migrate_encrypt_legacy_secrets()
  872. await db_session.refresh(provider)
  873. # Migration should have early-returned; plaintext untouched.
  874. assert provider._client_secret_enc == "still-plaintext"
  875. @pytest.mark.asyncio
  876. @pytest.mark.integration
  877. async def test_migration_handles_mixed_state(self, db_session, monkeypatch):
  878. from backend.app.core.database import _migrate_encrypt_legacy_secrets
  879. from backend.app.models.oidc_provider import OIDCProvider
  880. self._patch_module_session(monkeypatch, db_session)
  881. self._set_active_key(monkeypatch)
  882. legacy = OIDCProvider(
  883. name="LegacyMix",
  884. issuer_url="https://l.example.com",
  885. client_id="c1",
  886. _client_secret_enc="plain-mix",
  887. scopes="openid email profile",
  888. )
  889. encrypted = OIDCProvider(
  890. name="EncMix",
  891. issuer_url="https://e.example.com",
  892. client_id="c2",
  893. client_secret="encrypted-mix", # uses setter
  894. scopes="openid email profile",
  895. )
  896. db_session.add_all([legacy, encrypted])
  897. await db_session.commit()
  898. original_encrypted = encrypted._client_secret_enc
  899. await _migrate_encrypt_legacy_secrets()
  900. await db_session.refresh(legacy)
  901. await db_session.refresh(encrypted)
  902. assert legacy._client_secret_enc.startswith("fernet:")
  903. assert legacy.client_secret == "plain-mix"
  904. assert encrypted._client_secret_enc == original_encrypted
  905. @pytest.mark.asyncio
  906. @pytest.mark.integration
  907. async def test_migration_encrypts_plaintext_totp_secret(self, db_session, monkeypatch):
  908. from backend.app.core.database import _migrate_encrypt_legacy_secrets
  909. from backend.app.models.user import User
  910. from backend.app.models.user_totp import UserTOTP
  911. self._patch_module_session(monkeypatch, db_session)
  912. self._set_active_key(monkeypatch)
  913. user = User(username="totpuser1219", email="t@example.com", password_hash="x")
  914. db_session.add(user)
  915. await db_session.flush()
  916. totp = UserTOTP(user_id=user.id, _secret_enc="JBSWY3DPEHPK3PXP", is_enabled=True)
  917. db_session.add(totp)
  918. await db_session.commit()
  919. await _migrate_encrypt_legacy_secrets()
  920. await db_session.refresh(totp)
  921. assert totp._secret_enc.startswith("fernet:")
  922. assert totp.secret == "JBSWY3DPEHPK3PXP"
  923. @pytest.mark.asyncio
  924. @pytest.mark.integration
  925. async def test_migration_logs_count_of_rows_re_encrypted(self, db_session, monkeypatch, caplog):
  926. import logging
  927. from backend.app.core.database import _migrate_encrypt_legacy_secrets
  928. from backend.app.models.oidc_provider import OIDCProvider
  929. from backend.app.models.user import User
  930. from backend.app.models.user_totp import UserTOTP
  931. self._patch_module_session(monkeypatch, db_session)
  932. self._set_active_key(monkeypatch)
  933. provider = OIDCProvider(
  934. name="LegacyLog",
  935. issuer_url="https://log.example.com",
  936. client_id="c",
  937. _client_secret_enc="p",
  938. scopes="openid email profile",
  939. )
  940. user = User(username="logger1219", email="l@example.com", password_hash="x")
  941. db_session.add_all([provider, user])
  942. await db_session.flush()
  943. totp = UserTOTP(user_id=user.id, _secret_enc="JBSWY3DPEHPK3PXP", is_enabled=True)
  944. db_session.add(totp)
  945. await db_session.commit()
  946. with caplog.at_level(logging.INFO, logger="backend.app.core.database"):
  947. await _migrate_encrypt_legacy_secrets()
  948. # The migration logs once with both counts.
  949. assert any(
  950. "Re-encrypted legacy plaintext secrets" in rec.message
  951. and "1 OIDC client_secret(s)" in rec.message
  952. and "1 TOTP secret(s)" in rec.message
  953. for rec in caplog.records
  954. )
  955. @pytest.mark.asyncio
  956. @pytest.mark.integration
  957. async def test_migration_continues_on_row_error(self, db_session, monkeypatch, caplog):
  958. """B2: per-row commit semantics — when one row fails to re-encrypt,
  959. OTHER successfully-encrypted rows must remain committed and the
  960. failure surfaces via get_migration_error_count.
  961. Replaces the previous "rollback all" behaviour: a single poison row
  962. used to block every successful re-encryption on every startup forever.
  963. """
  964. import logging
  965. import backend.app.core.encryption as enc_mod # noqa: F401
  966. from backend.app.core.database import (
  967. _migrate_encrypt_legacy_secrets,
  968. get_migration_error_count,
  969. )
  970. from backend.app.models.oidc_provider import OIDCProvider
  971. self._patch_module_session(monkeypatch, db_session)
  972. self._set_active_key(monkeypatch)
  973. good = OIDCProvider(
  974. name="GoodRow",
  975. issuer_url="https://good.example.com",
  976. client_id="c1",
  977. _client_secret_enc="plaintext-good",
  978. scopes="openid email profile",
  979. )
  980. bad = OIDCProvider(
  981. name="BadRow",
  982. issuer_url="https://bad.example.com",
  983. client_id="c2",
  984. _client_secret_enc="plaintext-bad",
  985. scopes="openid email profile",
  986. )
  987. db_session.add_all([good, bad])
  988. await db_session.commit()
  989. original_bad = bad._client_secret_enc
  990. # Force the setter on the SECOND row to raise — patch at the model's
  991. # import location so the property setter picks up the patched function.
  992. import backend.app.models.oidc_provider as oidc_mod
  993. real_encrypt = oidc_mod.mfa_encrypt
  994. call_count = [0]
  995. def _sometimes_raise(value):
  996. call_count[0] += 1
  997. if call_count[0] == 2:
  998. raise RuntimeError("simulated encrypt failure")
  999. return real_encrypt(value)
  1000. monkeypatch.setattr(oidc_mod, "mfa_encrypt", _sometimes_raise)
  1001. with caplog.at_level(logging.ERROR, logger="backend.app.core.database"):
  1002. await _migrate_encrypt_legacy_secrets()
  1003. # B2: per-row commit — good IS encrypted, bad is unchanged.
  1004. await db_session.refresh(good)
  1005. await db_session.refresh(bad)
  1006. assert good._client_secret_enc.startswith("fernet:"), (
  1007. "good row must be successfully re-encrypted (per-row commit)"
  1008. )
  1009. assert bad._client_secret_enc == original_bad, "bad row must remain unchanged (savepoint-style isolation)"
  1010. assert get_migration_error_count() == 1, "the skipped row must be exposed via get_migration_error_count"
  1011. assert any("skipping" in rec.message.lower() for rec in caplog.records)
  1012. @pytest.mark.asyncio
  1013. @pytest.mark.integration
  1014. async def test_migration_logs_no_op_when_all_encrypted(self, db_session, monkeypatch, caplog):
  1015. """A2: when all rows are already encrypted, migration logs a debug no-op."""
  1016. import logging
  1017. from backend.app.core.database import _migrate_encrypt_legacy_secrets
  1018. from backend.app.models.oidc_provider import OIDCProvider
  1019. self._patch_module_session(monkeypatch, db_session)
  1020. self._set_active_key(monkeypatch)
  1021. provider = OIDCProvider(
  1022. name="AlreadyEnc",
  1023. issuer_url="https://ae.example.com",
  1024. client_id="cae",
  1025. client_secret="already-encrypted",
  1026. scopes="openid email profile",
  1027. )
  1028. db_session.add(provider)
  1029. await db_session.commit()
  1030. with caplog.at_level(logging.DEBUG, logger="backend.app.core.database"):
  1031. await _migrate_encrypt_legacy_secrets()
  1032. assert any("no rows needed re-encryption" in rec.message for rec in caplog.records)
  1033. @pytest.mark.asyncio
  1034. @pytest.mark.integration
  1035. async def test_init_db_propagates_unexpected_migration_error(self, monkeypatch):
  1036. """B3: an unexpected error from _migrate_encrypt_legacy_secrets must
  1037. surface (re-raise) instead of being silently swallowed.
  1038. Pins the contract introduced for B3: a startup-fatal error like a
  1039. session-creation failure must fail the lifespan / CLI / restore
  1040. handler explicitly, never run the app with half-migrated rows.
  1041. Implementation note: we patch _migrate_encrypt_legacy_secrets itself
  1042. rather than poking the inner read phase, because that is the contract
  1043. boundary the rest of the codebase relies on (init_db -> migration).
  1044. """
  1045. import backend.app.core.database as db_mod
  1046. async def boom():
  1047. raise RuntimeError("simulated startup-fatal failure")
  1048. # Stub out the rest of init_db so we exercise only the migration step.
  1049. # init_db opens the engine.begin() block, runs metadata.create_all,
  1050. # run_migrations, then awaits _migrate_encrypt_legacy_secrets — the
  1051. # only call we want to fail.
  1052. monkeypatch.setattr(db_mod, "_migrate_encrypt_legacy_secrets", boom)
  1053. monkeypatch.setattr(db_mod, "seed_notification_templates", lambda: _noop_async())
  1054. monkeypatch.setattr(db_mod, "seed_default_groups", lambda: _noop_async())
  1055. monkeypatch.setattr(db_mod, "seed_spool_catalog", lambda: _noop_async())
  1056. monkeypatch.setattr(db_mod, "seed_color_catalog", lambda: _noop_async())
  1057. with pytest.raises(RuntimeError, match="simulated startup-fatal failure"):
  1058. await db_mod.init_db()
  1059. async def _noop_async():
  1060. """Helper for tests that need to stub out `seed_*` async coroutines."""
  1061. return None
  1062. # ============================================================================
  1063. # TestEncryptionStatusEndpoint
  1064. # ============================================================================
  1065. class TestEncryptionStatusEndpoint:
  1066. """GET /api/v1/auth/encryption-status: key source, counts, decryption_broken."""
  1067. STATUS_URL = "/api/v1/auth/encryption-status"
  1068. async def _create_admin_and_login(self, async_client: AsyncClient) -> str:
  1069. """Bootstrap auth + return a Bearer token for an admin."""
  1070. await async_client.post(
  1071. "/api/v1/auth/setup",
  1072. json={
  1073. "auth_enabled": True,
  1074. "admin_username": "admin1219",
  1075. "admin_password": "Admin1219!Pass",
  1076. },
  1077. )
  1078. login = await async_client.post(
  1079. "/api/v1/auth/login",
  1080. json={"username": "admin1219", "password": "Admin1219!Pass"},
  1081. )
  1082. assert login.status_code == 200, login.text
  1083. return login.json()["access_token"]
  1084. @pytest.mark.asyncio
  1085. @pytest.mark.integration
  1086. async def test_status_reports_env_source(self, async_client, monkeypatch):
  1087. from cryptography.fernet import Fernet
  1088. import backend.app.core.encryption as enc_mod
  1089. token = await self._create_admin_and_login(async_client)
  1090. monkeypatch.setenv("MFA_ENCRYPTION_KEY", Fernet.generate_key().decode())
  1091. enc_mod._fernet_instance = None
  1092. enc_mod._key_source = None
  1093. resp = await async_client.get(self.STATUS_URL, headers={"Authorization": f"Bearer {token}"})
  1094. assert resp.status_code == 200
  1095. data = resp.json()
  1096. assert data["key_configured"] is True
  1097. assert data["key_source"] == "env"
  1098. assert data["decryption_broken"] is False
  1099. @pytest.mark.asyncio
  1100. @pytest.mark.integration
  1101. async def test_status_reports_file_source(self, async_client, monkeypatch, tmp_path):
  1102. from cryptography.fernet import Fernet
  1103. import backend.app.core.encryption as enc_mod
  1104. token = await self._create_admin_and_login(async_client)
  1105. # Pre-place a valid key file in DATA_DIR.
  1106. key_file = tmp_path / ".mfa_encryption_key"
  1107. key_file.write_text(Fernet.generate_key().decode())
  1108. monkeypatch.setenv("DATA_DIR", str(tmp_path))
  1109. monkeypatch.delenv("MFA_ENCRYPTION_KEY", raising=False)
  1110. enc_mod._fernet_instance = None
  1111. enc_mod._key_source = None
  1112. resp = await async_client.get(self.STATUS_URL, headers={"Authorization": f"Bearer {token}"})
  1113. assert resp.status_code == 200
  1114. data = resp.json()
  1115. assert data["key_source"] == "file"
  1116. @pytest.mark.asyncio
  1117. @pytest.mark.integration
  1118. async def test_status_reports_generated_source(self, async_client, monkeypatch, tmp_path):
  1119. import backend.app.core.encryption as enc_mod
  1120. token = await self._create_admin_and_login(async_client)
  1121. monkeypatch.setenv("DATA_DIR", str(tmp_path))
  1122. monkeypatch.delenv("MFA_ENCRYPTION_KEY", raising=False)
  1123. enc_mod._fernet_instance = None
  1124. enc_mod._key_source = None
  1125. resp = await async_client.get(self.STATUS_URL, headers={"Authorization": f"Bearer {token}"})
  1126. assert resp.status_code == 200
  1127. data = resp.json()
  1128. assert data["key_source"] == "generated"
  1129. assert (tmp_path / ".mfa_encryption_key").exists()
  1130. @pytest.mark.asyncio
  1131. @pytest.mark.integration
  1132. async def test_status_reports_none_source(self, async_client, monkeypatch):
  1133. import backend.app.core.encryption as enc_mod
  1134. token = await self._create_admin_and_login(async_client)
  1135. monkeypatch.setattr(enc_mod, "_load_or_generate_key", lambda: (None, "none"))
  1136. enc_mod._fernet_instance = None
  1137. enc_mod._key_source = None
  1138. resp = await async_client.get(self.STATUS_URL, headers={"Authorization": f"Bearer {token}"})
  1139. assert resp.status_code == 200
  1140. data = resp.json()
  1141. assert data["key_configured"] is False
  1142. assert data["key_source"] == "none"
  1143. @pytest.mark.asyncio
  1144. @pytest.mark.integration
  1145. async def test_status_counts_legacy_rows(self, async_client, db_session, monkeypatch):
  1146. from backend.app.models.oidc_provider import OIDCProvider
  1147. token = await self._create_admin_and_login(async_client)
  1148. provider = OIDCProvider(
  1149. name="LegacyStatus",
  1150. issuer_url="https://ls.example.com",
  1151. client_id="c",
  1152. _client_secret_enc="plaintext-no-prefix",
  1153. scopes="openid email profile",
  1154. )
  1155. db_session.add(provider)
  1156. await db_session.commit()
  1157. resp = await async_client.get(self.STATUS_URL, headers={"Authorization": f"Bearer {token}"})
  1158. assert resp.status_code == 200
  1159. data = resp.json()
  1160. assert data["legacy_plaintext_rows"]["oidc_providers"] >= 1
  1161. @pytest.mark.asyncio
  1162. @pytest.mark.integration
  1163. async def test_status_counts_encrypted_rows(self, async_client, db_session, monkeypatch):
  1164. from cryptography.fernet import Fernet
  1165. import backend.app.core.encryption as enc_mod
  1166. from backend.app.models.oidc_provider import OIDCProvider
  1167. token = await self._create_admin_and_login(async_client)
  1168. monkeypatch.setenv("MFA_ENCRYPTION_KEY", Fernet.generate_key().decode())
  1169. enc_mod._fernet_instance = None
  1170. enc_mod._key_source = None
  1171. provider = OIDCProvider(
  1172. name="EncStatus",
  1173. issuer_url="https://es.example.com",
  1174. client_id="c",
  1175. client_secret="real-secret", # via setter → encrypted
  1176. scopes="openid email profile",
  1177. )
  1178. db_session.add(provider)
  1179. await db_session.commit()
  1180. resp = await async_client.get(self.STATUS_URL, headers={"Authorization": f"Bearer {token}"})
  1181. assert resp.status_code == 200
  1182. data = resp.json()
  1183. assert data["encrypted_rows"]["oidc_providers"] >= 1
  1184. @pytest.mark.asyncio
  1185. @pytest.mark.integration
  1186. async def test_status_warns_on_encrypted_rows_without_key(self, async_client, db_session, monkeypatch):
  1187. """Gap 2: encrypted rows present but no key loadable → decryption_broken=true."""
  1188. import backend.app.core.encryption as enc_mod
  1189. from backend.app.models.oidc_provider import OIDCProvider
  1190. token = await self._create_admin_and_login(async_client)
  1191. # Insert a row whose value is already prefixed (simulates a previously-encrypted row).
  1192. provider = OIDCProvider(
  1193. name="BrokenEnc",
  1194. issuer_url="https://be.example.com",
  1195. client_id="c",
  1196. _client_secret_enc="fernet:gAAAAA-fake-but-prefixed",
  1197. scopes="openid email profile",
  1198. )
  1199. db_session.add(provider)
  1200. await db_session.commit()
  1201. # Now disable key loading so decryption is impossible.
  1202. monkeypatch.setattr(enc_mod, "_load_or_generate_key", lambda: (None, "none"))
  1203. enc_mod._fernet_instance = None
  1204. enc_mod._key_source = None
  1205. resp = await async_client.get(self.STATUS_URL, headers={"Authorization": f"Bearer {token}"})
  1206. assert resp.status_code == 200
  1207. data = resp.json()
  1208. assert data["key_configured"] is False
  1209. assert data["encrypted_rows"]["oidc_providers"] >= 1
  1210. assert data["decryption_broken"] is True
  1211. @pytest.mark.asyncio
  1212. @pytest.mark.integration
  1213. async def test_status_requires_settings_read_permission(self, async_client, db_session):
  1214. """Non-admin without settings:read permission gets 403."""
  1215. from backend.app.models.user import User
  1216. await self._create_admin_and_login(async_client)
  1217. # Create a low-privilege user (no group → no permissions in default seed).
  1218. from backend.app.core.auth import get_password_hash
  1219. viewer = User(
  1220. username="viewer1219",
  1221. email="viewer1219@example.com",
  1222. password_hash=get_password_hash("Viewer1219!Pass"),
  1223. role="user",
  1224. is_active=True,
  1225. )
  1226. db_session.add(viewer)
  1227. await db_session.commit()
  1228. login = await async_client.post(
  1229. "/api/v1/auth/login",
  1230. json={"username": "viewer1219", "password": "Viewer1219!Pass"},
  1231. )
  1232. assert login.status_code == 200, login.text
  1233. token = login.json().get("access_token")
  1234. assert token is not None, f"Expected access_token in login response, got: {login.json()}"
  1235. resp = await async_client.get(self.STATUS_URL, headers={"Authorization": f"Bearer {token}"})
  1236. assert resp.status_code == 403
  1237. @pytest.mark.asyncio
  1238. @pytest.mark.integration
  1239. async def test_status_returns_500_on_db_error(self, async_client, monkeypatch):
  1240. """A8: SQLAlchemyError during count queries → 500 with static message."""
  1241. from unittest.mock import AsyncMock
  1242. from sqlalchemy.exc import SQLAlchemyError
  1243. token = await self._create_admin_and_login(async_client)
  1244. async def _raise(*args, **kwargs):
  1245. raise SQLAlchemyError("simulated DB failure")
  1246. monkeypatch.setattr("sqlalchemy.ext.asyncio.AsyncSession.execute", AsyncMock(side_effect=_raise))
  1247. resp = await async_client.get(self.STATUS_URL, headers={"Authorization": f"Bearer {token}"})
  1248. assert resp.status_code == 500
  1249. assert "encryption status" in resp.json().get("detail", "").lower()
  1250. @pytest.mark.asyncio
  1251. @pytest.mark.integration
  1252. async def test_status_returns_403_for_viewer_in_viewers_group(self, async_client, db_session):
  1253. """S2: a user in the Viewers group (has SETTINGS_READ but NOT SETTINGS_UPDATE)
  1254. must get 403 — encryption-status is admin/operator only.
  1255. """
  1256. from sqlalchemy import insert, select
  1257. from backend.app.core.auth import get_password_hash
  1258. from backend.app.models.group import Group, user_groups
  1259. from backend.app.models.user import User
  1260. # Bootstrap auth (creates default groups via setup endpoint).
  1261. await self._create_admin_and_login(async_client)
  1262. # Create a user explicitly in the Viewers group — it has SETTINGS_READ
  1263. # but not SETTINGS_UPDATE, which is the discriminator for S2.
  1264. viewer = User(
  1265. username="viewer_s2",
  1266. email="viewer_s2@example.com",
  1267. password_hash=get_password_hash("ViewerS2!Pass1"),
  1268. role="user",
  1269. is_active=True,
  1270. )
  1271. db_session.add(viewer)
  1272. await db_session.flush()
  1273. viewers_group = (await db_session.execute(select(Group).where(Group.name == "Viewers"))).scalar_one_or_none()
  1274. assert viewers_group is not None, "Viewers group must be seeded by setup"
  1275. # Insert the association row directly to avoid touching the lazy
  1276. # `viewer.groups` relationship (which would trigger an implicit
  1277. # IO inside an active async transaction and fail with MissingGreenlet).
  1278. await db_session.execute(insert(user_groups).values(user_id=viewer.id, group_id=viewers_group.id))
  1279. await db_session.commit()
  1280. login = await async_client.post(
  1281. "/api/v1/auth/login",
  1282. json={"username": "viewer_s2", "password": "ViewerS2!Pass1"},
  1283. )
  1284. assert login.status_code == 200, login.text
  1285. token = login.json()["access_token"]
  1286. resp = await async_client.get(self.STATUS_URL, headers={"Authorization": f"Bearer {token}"})
  1287. assert resp.status_code == 403, "S2: Viewers (SETTINGS_READ only) must NOT be able to read encryption-status"
  1288. @pytest.mark.asyncio
  1289. @pytest.mark.integration
  1290. async def test_status_decryption_broken_when_wrong_key_active(self, async_client, db_session, monkeypatch):
  1291. """B4: key is configured but cannot decrypt existing rows → decryption_broken=True.
  1292. This is the "wrong key" state that the legacy computed_field check
  1293. missed — operator pasted a different valid Fernet key (rotation,
  1294. cross-deployment restore, env override). Status used to show GREEN
  1295. while every encrypted row was unrecoverable.
  1296. """
  1297. from cryptography.fernet import Fernet
  1298. import backend.app.core.encryption as enc_mod
  1299. from backend.app.models.oidc_provider import OIDCProvider
  1300. token = await self._create_admin_and_login(async_client)
  1301. # Insert a row whose value is fernet-prefixed but encrypted under a
  1302. # DIFFERENT key (the prefix matches, but decrypt will throw).
  1303. provider = OIDCProvider(
  1304. name="WrongKeyEnc",
  1305. issuer_url="https://wk.example.com",
  1306. client_id="c",
  1307. _client_secret_enc=("fernet:" + Fernet(Fernet.generate_key()).encrypt(b"original").decode()),
  1308. scopes="openid email profile",
  1309. )
  1310. db_session.add(provider)
  1311. await db_session.commit()
  1312. # Now activate a DIFFERENT key — sample-decrypt must fail.
  1313. monkeypatch.setenv("MFA_ENCRYPTION_KEY", Fernet.generate_key().decode())
  1314. enc_mod._fernet_instance = None
  1315. enc_mod._key_source = None
  1316. resp = await async_client.get(self.STATUS_URL, headers={"Authorization": f"Bearer {token}"})
  1317. assert resp.status_code == 200, resp.text
  1318. data = resp.json()
  1319. assert data["key_configured"] is True, "different key is still 'configured'"
  1320. assert data["encrypted_rows"]["oidc_providers"] >= 1
  1321. assert data["decryption_broken"] is True, "B4: sample-decrypt must detect wrong-key state"
  1322. @pytest.mark.asyncio
  1323. @pytest.mark.integration
  1324. async def test_status_decryption_broken_with_only_totp_rows(self, async_client, db_session, monkeypatch):
  1325. """B4: the sample-decrypt fallback to UserTOTP fires when there are no
  1326. encrypted OIDC rows but TOTP rows exist. The OIDC-only test above
  1327. proves the primary path; this pins the second branch in the same
  1328. try-block so a future refactor of the row-source switch can't silently
  1329. regress wrong-key detection for TOTP-only deployments.
  1330. """
  1331. from cryptography.fernet import Fernet
  1332. from sqlalchemy import select
  1333. import backend.app.core.encryption as enc_mod
  1334. from backend.app.models.user import User
  1335. from backend.app.models.user_totp import UserTOTP
  1336. token = await self._create_admin_and_login(async_client)
  1337. # Look up the admin user created by login so we can attach a TOTP row.
  1338. admin_row = await db_session.execute(select(User).where(User.username == "admin1219"))
  1339. admin = admin_row.scalar_one()
  1340. # Seed a UserTOTP row encrypted under key A. No OIDC rows exist, so
  1341. # the endpoint's first branch (oidc_providers > 0) misses and the
  1342. # sample falls through to UserTOTP.
  1343. key_a_ciphertext = Fernet(Fernet.generate_key()).encrypt(b"original-totp-secret").decode()
  1344. db_session.add(UserTOTP(user_id=admin.id, _secret_enc=f"fernet:{key_a_ciphertext}", is_enabled=True))
  1345. await db_session.commit()
  1346. # Activate a DIFFERENT key — the TOTP-fallback sample-decrypt must fail.
  1347. monkeypatch.setenv("MFA_ENCRYPTION_KEY", Fernet.generate_key().decode())
  1348. enc_mod._fernet_instance = None
  1349. enc_mod._key_source = None
  1350. resp = await async_client.get(self.STATUS_URL, headers={"Authorization": f"Bearer {token}"})
  1351. assert resp.status_code == 200, resp.text
  1352. data = resp.json()
  1353. assert data["key_configured"] is True
  1354. assert data["encrypted_rows"]["oidc_providers"] == 0, "test premise: no OIDC rows so TOTP branch fires"
  1355. assert data["encrypted_rows"]["user_totp"] >= 1
  1356. assert data["decryption_broken"] is True, "B4: TOTP-fallback sample-decrypt must detect wrong-key state"
  1357. @pytest.mark.asyncio
  1358. @pytest.mark.integration
  1359. async def test_status_surfaces_real_migration_error_count(self, async_client, db_session, monkeypatch, caplog):
  1360. """B2: a real migration with a poison row produces an error_count that
  1361. flows through to the endpoint's `migration_error_count` field.
  1362. Replaces an earlier tautology that patched the module-level counter
  1363. directly. The chained version verifies the full path: poison row →
  1364. per-row migration skip → ``get_migration_error_count()`` →
  1365. ``GET /encryption-status``.
  1366. """
  1367. import logging
  1368. from backend.app.core.database import _migrate_encrypt_legacy_secrets, get_migration_error_count
  1369. from backend.app.models.oidc_provider import OIDCProvider
  1370. token = await self._create_admin_and_login(async_client)
  1371. # Bind the migration's session factory to the test engine and activate a key.
  1372. from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
  1373. from backend.app.core import database as db_mod
  1374. test_factory = async_sessionmaker(db_session.bind, class_=AsyncSession, expire_on_commit=False)
  1375. monkeypatch.setattr(db_mod, "async_session", test_factory)
  1376. from cryptography.fernet import Fernet
  1377. import backend.app.core.encryption as enc_mod
  1378. monkeypatch.setenv("MFA_ENCRYPTION_KEY", Fernet.generate_key().decode())
  1379. enc_mod._fernet_instance = None
  1380. # Two legacy plaintext rows; force the SECOND row's encrypt call to raise.
  1381. db_session.add_all(
  1382. [
  1383. OIDCProvider(
  1384. name="GoodRow",
  1385. issuer_url="https://good.example.com",
  1386. client_id="c1",
  1387. _client_secret_enc="plaintext-good",
  1388. scopes="openid email profile",
  1389. ),
  1390. OIDCProvider(
  1391. name="BadRow",
  1392. issuer_url="https://bad.example.com",
  1393. client_id="c2",
  1394. _client_secret_enc="plaintext-bad",
  1395. scopes="openid email profile",
  1396. ),
  1397. ]
  1398. )
  1399. await db_session.commit()
  1400. import backend.app.models.oidc_provider as oidc_mod
  1401. real_encrypt = oidc_mod.mfa_encrypt
  1402. call_count = [0]
  1403. def _sometimes_raise(value):
  1404. call_count[0] += 1
  1405. if call_count[0] == 2:
  1406. raise RuntimeError("simulated encrypt failure")
  1407. return real_encrypt(value)
  1408. monkeypatch.setattr(oidc_mod, "mfa_encrypt", _sometimes_raise)
  1409. with caplog.at_level(logging.ERROR, logger="backend.app.core.database"):
  1410. await _migrate_encrypt_legacy_secrets()
  1411. # Sanity: the migration's own counter saw the failure.
  1412. assert get_migration_error_count() == 1
  1413. # The endpoint must surface the same number — full path pinned, not just the getter.
  1414. resp = await async_client.get(self.STATUS_URL, headers={"Authorization": f"Bearer {token}"})
  1415. assert resp.status_code == 200, resp.text
  1416. data = resp.json()
  1417. assert data["migration_error_count"] == 1, (
  1418. "endpoint must report the actual migration outcome, not just read a stub global"
  1419. )
  1420. # ============================================================================
  1421. # TestEncryptionRoundtrip (E2E)
  1422. # ============================================================================
  1423. class TestEncryptionRoundtrip:
  1424. """End-to-end: writes via the property setter store ciphertext at the column
  1425. level; reads via the property getter return the original plaintext."""
  1426. @pytest.mark.asyncio
  1427. @pytest.mark.integration
  1428. async def test_oidc_provider_secret_encrypted_at_rest_e2e(self, db_session, monkeypatch):
  1429. from cryptography.fernet import Fernet
  1430. from sqlalchemy import select
  1431. import backend.app.core.encryption as enc_mod
  1432. from backend.app.models.oidc_provider import OIDCProvider
  1433. monkeypatch.setenv("MFA_ENCRYPTION_KEY", Fernet.generate_key().decode())
  1434. enc_mod._fernet_instance = None
  1435. provider = OIDCProvider(
  1436. name="E2E_OIDC",
  1437. issuer_url="https://e2e.example.com",
  1438. client_id="cid",
  1439. client_secret="my-real-client-secret", # via setter → encrypted
  1440. scopes="openid email profile",
  1441. is_enabled=True,
  1442. )
  1443. db_session.add(provider)
  1444. await db_session.commit()
  1445. # Raw column read: must be ciphertext, not the plaintext.
  1446. result = await db_session.execute(select(OIDCProvider).where(OIDCProvider.id == provider.id))
  1447. fetched = result.scalar_one()
  1448. assert fetched._client_secret_enc.startswith("fernet:")
  1449. assert fetched._client_secret_enc != "my-real-client-secret"
  1450. # Property read: returns original plaintext.
  1451. assert fetched.client_secret == "my-real-client-secret"
  1452. @pytest.mark.asyncio
  1453. @pytest.mark.integration
  1454. async def test_totp_secret_encrypted_at_rest_e2e(self, db_session, monkeypatch):
  1455. from cryptography.fernet import Fernet
  1456. from sqlalchemy import select
  1457. import backend.app.core.encryption as enc_mod
  1458. from backend.app.models.user import User
  1459. from backend.app.models.user_totp import UserTOTP
  1460. monkeypatch.setenv("MFA_ENCRYPTION_KEY", Fernet.generate_key().decode())
  1461. enc_mod._fernet_instance = None
  1462. user = User(username="e2etotp1219", email="e@example.com", password_hash="x")
  1463. db_session.add(user)
  1464. await db_session.flush()
  1465. totp = UserTOTP(user_id=user.id, secret="JBSWY3DPEHPK3PXP", is_enabled=True)
  1466. db_session.add(totp)
  1467. await db_session.commit()
  1468. result = await db_session.execute(select(UserTOTP).where(UserTOTP.user_id == user.id))
  1469. fetched = result.scalar_one()
  1470. assert fetched._secret_enc.startswith("fernet:")
  1471. assert fetched._secret_enc != "JBSWY3DPEHPK3PXP"
  1472. assert fetched.secret == "JBSWY3DPEHPK3PXP"
  1473. # ============================================================================
  1474. # TestBackupKeyFiles
  1475. # Verifies that .mfa_encryption_key is included in backup ZIPs (so backups
  1476. # are self-contained) and restored with chmod 0600 — and that path-traversal
  1477. # payloads in a malicious ZIP are rejected.
  1478. # ============================================================================
  1479. class TestBackupKeyFiles:
  1480. @pytest.mark.asyncio
  1481. @pytest.mark.integration
  1482. async def test_backup_includes_mfa_encryption_key_when_present(self, async_client, monkeypatch, tmp_path):
  1483. import zipfile
  1484. from backend.app.api.routes.settings import create_backup_zip
  1485. from backend.app.core.config import settings as app_settings
  1486. monkeypatch.setenv("DATA_DIR", str(tmp_path))
  1487. # Ensure `app_settings.base_dir` follows DATA_DIR for this test by
  1488. # patching the module attribute (config caches it at import time).
  1489. monkeypatch.setattr(app_settings, "base_dir", tmp_path)
  1490. key_path = tmp_path / ".mfa_encryption_key"
  1491. key_path.write_text("test-key-content")
  1492. zip_path, _filename = await create_backup_zip(output_path=tmp_path)
  1493. try:
  1494. with zipfile.ZipFile(zip_path) as zf:
  1495. names = zf.namelist()
  1496. assert ".mfa_encryption_key" in names
  1497. assert zf.read(".mfa_encryption_key").decode() == "test-key-content"
  1498. finally:
  1499. zip_path.unlink(missing_ok=True)
  1500. @pytest.mark.asyncio
  1501. @pytest.mark.integration
  1502. async def test_backup_skips_mfa_encryption_key_when_absent(self, async_client, monkeypatch, tmp_path):
  1503. import zipfile
  1504. from backend.app.api.routes.settings import create_backup_zip
  1505. from backend.app.core.config import settings as app_settings
  1506. monkeypatch.setenv("DATA_DIR", str(tmp_path))
  1507. monkeypatch.setattr(app_settings, "base_dir", tmp_path)
  1508. # No .mfa_encryption_key written — must not crash.
  1509. zip_path, _filename = await create_backup_zip(output_path=tmp_path)
  1510. try:
  1511. with zipfile.ZipFile(zip_path) as zf:
  1512. names = zf.namelist()
  1513. assert ".mfa_encryption_key" not in names
  1514. finally:
  1515. zip_path.unlink(missing_ok=True)
  1516. @pytest.mark.asyncio
  1517. @pytest.mark.integration
  1518. async def test_restore_writes_key_files_with_chmod_0600(self, async_client, monkeypatch, tmp_path):
  1519. """T1: restore endpoint writes key file with mode 0o600.
  1520. Bypasses the SQLite-copy step via patches so execution reaches the
  1521. key-write code unconditionally — the previous version used a stub
  1522. ``b"SQLite format 3"`` which made ``sqlite3.backup()`` fail and the
  1523. key-write code never ran.
  1524. """
  1525. import io
  1526. import zipfile
  1527. from unittest.mock import AsyncMock, patch
  1528. from backend.app.core.config import settings as app_settings
  1529. monkeypatch.setenv("DATA_DIR", str(tmp_path))
  1530. monkeypatch.setattr(app_settings, "base_dir", tmp_path)
  1531. # Build a minimal ZIP with a stub DB and the key file.
  1532. buf = io.BytesIO()
  1533. with zipfile.ZipFile(buf, "w") as zf:
  1534. zf.writestr("bambuddy.db", b"SQLite format 3")
  1535. zf.writestr(".mfa_encryption_key", "test-restored-key")
  1536. buf.seek(0)
  1537. with (
  1538. patch("backend.app.core.db_dialect.is_sqlite", return_value=False),
  1539. patch(
  1540. "backend.app.api.routes.settings._import_sqlite_to_postgres",
  1541. new_callable=AsyncMock,
  1542. ),
  1543. patch("backend.app.core.database.close_all_connections", new_callable=AsyncMock),
  1544. patch("backend.app.core.database.reinitialize_database", new_callable=AsyncMock),
  1545. patch("backend.app.core.database.init_db", new_callable=AsyncMock),
  1546. ):
  1547. resp = await async_client.post(
  1548. "/api/v1/settings/restore",
  1549. files={"file": ("backup.zip", buf, "application/zip")},
  1550. )
  1551. assert resp.status_code == 200
  1552. restored_key = tmp_path / ".mfa_encryption_key"
  1553. assert restored_key.exists()
  1554. assert restored_key.read_text() == "test-restored-key"
  1555. assert (restored_key.stat().st_mode & 0o777) == 0o600
  1556. @pytest.mark.asyncio
  1557. @pytest.mark.integration
  1558. async def test_restore_handles_missing_key_files(self, async_client, monkeypatch, tmp_path):
  1559. """T2: ZIP without key file → restore succeeds, no key written to DATA_DIR."""
  1560. import io
  1561. import zipfile
  1562. from unittest.mock import AsyncMock, patch
  1563. from backend.app.core.config import settings as app_settings
  1564. monkeypatch.setenv("DATA_DIR", str(tmp_path))
  1565. monkeypatch.setattr(app_settings, "base_dir", tmp_path)
  1566. buf = io.BytesIO()
  1567. with zipfile.ZipFile(buf, "w") as zf:
  1568. zf.writestr("bambuddy.db", b"SQLite format 3")
  1569. # Intentionally no .mfa_encryption_key entry.
  1570. buf.seek(0)
  1571. with (
  1572. patch("backend.app.core.db_dialect.is_sqlite", return_value=False),
  1573. patch(
  1574. "backend.app.api.routes.settings._import_sqlite_to_postgres",
  1575. new_callable=AsyncMock,
  1576. ),
  1577. patch("backend.app.core.database.close_all_connections", new_callable=AsyncMock),
  1578. patch("backend.app.core.database.reinitialize_database", new_callable=AsyncMock),
  1579. patch("backend.app.core.database.init_db", new_callable=AsyncMock),
  1580. ):
  1581. resp = await async_client.post(
  1582. "/api/v1/settings/restore",
  1583. files={"file": ("backup.zip", buf, "application/zip")},
  1584. )
  1585. assert resp.status_code == 200
  1586. assert not (tmp_path / ".mfa_encryption_key").exists()
  1587. @pytest.mark.asyncio
  1588. @pytest.mark.integration
  1589. async def test_restore_aborts_db_swap_when_key_write_fails(self, async_client, monkeypatch, tmp_path):
  1590. """B1: when MFA key write fails, restore must abort BEFORE the database
  1591. swap so the live DB is not left with rows encrypted under a key that
  1592. no longer exists on disk."""
  1593. import io
  1594. import os
  1595. import zipfile
  1596. from unittest.mock import AsyncMock, patch
  1597. from backend.app.core.config import settings as app_settings
  1598. monkeypatch.setenv("DATA_DIR", str(tmp_path))
  1599. monkeypatch.setattr(app_settings, "base_dir", tmp_path)
  1600. # Build ZIP with a key file that we will fail to write to DATA_DIR.
  1601. buf = io.BytesIO()
  1602. with zipfile.ZipFile(buf, "w") as zf:
  1603. zf.writestr("bambuddy.db", b"SQLite format 3 backup data")
  1604. zf.writestr(".mfa_encryption_key", "backup-key-content")
  1605. buf.seek(0)
  1606. # Track whether the database swap functions were called.
  1607. # If B1 is correct, key-write failure aborts BEFORE these run.
  1608. import_pg_mock = AsyncMock()
  1609. reinit_mock = AsyncMock()
  1610. init_mock = AsyncMock()
  1611. original_open = os.open
  1612. def _key_write_fails(path, flags, mode=0o777, **kwargs):
  1613. # `shutil.rmtree` calls os.open(... dir_fd=...) during temp-dir
  1614. # cleanup — accept and forward any extra kwargs so the mock
  1615. # doesn't break the cleanup path.
  1616. if str(path).endswith(".mfa_encryption_key.restore-tmp"):
  1617. raise OSError(28, "No space left on device", str(path))
  1618. return original_open(path, flags, mode, **kwargs)
  1619. with (
  1620. patch("backend.app.core.db_dialect.is_sqlite", return_value=False),
  1621. patch(
  1622. "backend.app.api.routes.settings._import_sqlite_to_postgres",
  1623. import_pg_mock,
  1624. ),
  1625. patch("backend.app.core.database.close_all_connections", new_callable=AsyncMock),
  1626. patch("backend.app.core.database.reinitialize_database", reinit_mock),
  1627. patch("backend.app.core.database.init_db", init_mock),
  1628. ):
  1629. monkeypatch.setattr(os, "open", _key_write_fails)
  1630. resp = await async_client.post(
  1631. "/api/v1/settings/restore",
  1632. files={"file": ("backup.zip", buf, "application/zip")},
  1633. )
  1634. assert resp.status_code == 500
  1635. assert "Database is unchanged" in resp.json().get("detail", "")
  1636. # Database swap functions must NOT have been called — the abort
  1637. # happens before that step.
  1638. import_pg_mock.assert_not_awaited()
  1639. reinit_mock.assert_not_awaited()
  1640. init_mock.assert_not_awaited()
  1641. # No partial key file should be left behind.
  1642. assert not (tmp_path / ".mfa_encryption_key").exists()
  1643. @pytest.mark.asyncio
  1644. @pytest.mark.integration
  1645. async def test_restore_resets_encryption_singleton_after_key_replace(self, async_client, monkeypatch, tmp_path):
  1646. """B1: after a successful key replace, the encryption singleton must be
  1647. cleared so init_db's re-encryption migration picks up the restored key
  1648. instead of the cached Fernet from the previous key.
  1649. """
  1650. import io
  1651. import zipfile
  1652. from unittest.mock import AsyncMock, patch
  1653. from cryptography.fernet import Fernet
  1654. import backend.app.core.encryption as enc_mod
  1655. from backend.app.core.config import settings as app_settings
  1656. monkeypatch.setenv("DATA_DIR", str(tmp_path))
  1657. monkeypatch.setattr(app_settings, "base_dir", tmp_path)
  1658. # Pre-warm the singleton with an "old" key so we can detect the reset.
  1659. old_key = Fernet.generate_key().decode()
  1660. monkeypatch.setenv("MFA_ENCRYPTION_KEY", old_key)
  1661. enc_mod._fernet_instance = None
  1662. enc_mod._key_source = None
  1663. # Trigger lazy load → singleton holds the old Fernet.
  1664. assert enc_mod.is_encryption_active() is True
  1665. assert enc_mod._fernet_instance is not None
  1666. old_fernet_obj = enc_mod._fernet_instance
  1667. # Build ZIP that delivers a DIFFERENT key file.
  1668. new_key = Fernet.generate_key().decode()
  1669. assert new_key != old_key
  1670. buf = io.BytesIO()
  1671. with zipfile.ZipFile(buf, "w") as zf:
  1672. zf.writestr("bambuddy.db", b"SQLite format 3 backup data")
  1673. zf.writestr(".mfa_encryption_key", new_key)
  1674. buf.seek(0)
  1675. with (
  1676. patch("backend.app.core.db_dialect.is_sqlite", return_value=False),
  1677. patch(
  1678. "backend.app.api.routes.settings._import_sqlite_to_postgres",
  1679. new_callable=AsyncMock,
  1680. ),
  1681. patch("backend.app.core.database.close_all_connections", new_callable=AsyncMock),
  1682. patch("backend.app.core.database.reinitialize_database", new_callable=AsyncMock),
  1683. patch("backend.app.core.database.init_db", new_callable=AsyncMock),
  1684. ):
  1685. resp = await async_client.post(
  1686. "/api/v1/settings/restore",
  1687. files={"file": ("backup.zip", buf, "application/zip")},
  1688. )
  1689. assert resp.status_code == 200, resp.text
  1690. # The singleton must have been invalidated. The exact post-state depends
  1691. # on whether init_db (mocked) re-loaded the singleton, but the cached
  1692. # _fernet_instance reference from before the restore must not be the
  1693. # active one any more.
  1694. assert enc_mod._fernet_instance is None or enc_mod._fernet_instance is not old_fernet_obj, (
  1695. "B1: encryption singleton must be reset after key replace so init_db's migration picks up the restored key"
  1696. )
  1697. # The key file must be on disk with the new content.
  1698. restored = (tmp_path / ".mfa_encryption_key").read_text()
  1699. assert restored == new_key
  1700. @pytest.mark.asyncio
  1701. @pytest.mark.integration
  1702. async def test_restore_rejects_path_traversal_in_zip(self, async_client, monkeypatch, tmp_path):
  1703. """A4: ZIP with path-traversal entry → HTTP 400, no file written outside temp dir."""
  1704. import io
  1705. import zipfile
  1706. from backend.app.core.config import settings as app_settings
  1707. monkeypatch.setenv("DATA_DIR", str(tmp_path))
  1708. monkeypatch.setattr(app_settings, "base_dir", tmp_path)
  1709. # Build ZIP with a relative path-traversal entry.
  1710. buf = io.BytesIO()
  1711. with zipfile.ZipFile(buf, "w") as zf:
  1712. zf.writestr("../etc/passwd", "root:x:0:0")
  1713. zf.writestr("bambuddy.db", b"SQLite format 3")
  1714. buf.seek(0)
  1715. resp = await async_client.post(
  1716. "/api/v1/settings/restore",
  1717. files={"file": ("backup.zip", buf, "application/zip")},
  1718. )
  1719. assert resp.status_code == 400
  1720. assert "unsafe path" in resp.json().get("detail", "").lower()
  1721. @pytest.mark.asyncio
  1722. @pytest.mark.integration
  1723. async def test_restore_rejects_prefix_collision_zipslip(self, async_client, monkeypatch, tmp_path):
  1724. """T1: ZIP entry with prefix-collision path must be rejected.
  1725. A startswith() check would accept '/tmp/abc_evil/file' when the
  1726. extraction root was '/tmp/abc' — is_relative_to correctly rejects it.
  1727. The restore handler creates a tempfile.TemporaryDirectory inside the
  1728. system temp dir; we craft an entry that resolves to a sibling path
  1729. whose name starts with the temp dir's basename.
  1730. """
  1731. import io
  1732. import zipfile
  1733. from backend.app.core.config import settings as app_settings
  1734. monkeypatch.setenv("DATA_DIR", str(tmp_path))
  1735. monkeypatch.setattr(app_settings, "base_dir", tmp_path)
  1736. # Use a path with traversal — the resolved path will share the parent
  1737. # temp directory's basename as a prefix but NOT be inside the
  1738. # extraction root. We don't know the random extraction-root name at
  1739. # ZIP-build time, so we pick a literal "../poc-evil-prefix-collision/"
  1740. # which traverses up one level from the extraction root and lands in
  1741. # a sibling directory. is_relative_to() must reject this; a naive
  1742. # startswith() against the parent's parent would accept it.
  1743. evil_name = "../escaped-prefix-collision/poc.txt"
  1744. buf = io.BytesIO()
  1745. with zipfile.ZipFile(buf, "w") as zf:
  1746. zf.writestr(evil_name, "pwned")
  1747. zf.writestr("bambuddy.db", b"SQLite format 3\x00")
  1748. buf.seek(0)
  1749. resp = await async_client.post(
  1750. "/api/v1/settings/restore",
  1751. files={"file": ("backup.zip", buf, "application/zip")},
  1752. )
  1753. assert resp.status_code == 400
  1754. assert "unsafe path" in resp.json().get("detail", "").lower()
  1755. @pytest.mark.asyncio
  1756. @pytest.mark.integration
  1757. async def test_restore_rejects_absolute_path_in_zip(self, async_client, monkeypatch, tmp_path):
  1758. """B1: ZIP with an absolute path entry must be rejected by is_relative_to check."""
  1759. import io
  1760. import zipfile
  1761. from backend.app.core.config import settings as app_settings
  1762. monkeypatch.setenv("DATA_DIR", str(tmp_path))
  1763. monkeypatch.setattr(app_settings, "base_dir", tmp_path)
  1764. buf = io.BytesIO()
  1765. with zipfile.ZipFile(buf, "w") as zf:
  1766. # Absolute path in the archive — extracts outside temp_path on
  1767. # systems where (temp_path / "/etc/passwd") resolves to /etc/passwd.
  1768. zf.writestr("/etc/passwd", "root:x:0:0")
  1769. zf.writestr("bambuddy.db", b"SQLite format 3")
  1770. buf.seek(0)
  1771. resp = await async_client.post(
  1772. "/api/v1/settings/restore",
  1773. files={"file": ("backup.zip", buf, "application/zip")},
  1774. )
  1775. assert resp.status_code == 400
  1776. assert "unsafe path" in resp.json().get("detail", "").lower()
  1777. @pytest.mark.asyncio
  1778. @pytest.mark.integration
  1779. async def test_backup_fails_when_key_file_unreadable(self, async_client, monkeypatch, tmp_path):
  1780. """A5: OSError while copying key file propagates out of create_backup_zip."""
  1781. import shutil
  1782. from backend.app.api.routes.settings import create_backup_zip
  1783. from backend.app.core.config import settings as app_settings
  1784. monkeypatch.setenv("DATA_DIR", str(tmp_path))
  1785. monkeypatch.setattr(app_settings, "base_dir", tmp_path)
  1786. (tmp_path / ".mfa_encryption_key").write_text("key")
  1787. original_copy2 = shutil.copy2
  1788. def _raise_on_key(src, dst):
  1789. if ".mfa_encryption_key" in str(src):
  1790. raise OSError("simulated unreadable key file")
  1791. return original_copy2(src, dst)
  1792. monkeypatch.setattr(shutil, "copy2", _raise_on_key)
  1793. import pytest as _pytest
  1794. with _pytest.raises(OSError, match="simulated unreadable"):
  1795. await create_backup_zip(output_path=tmp_path)
  1796. @pytest.mark.asyncio
  1797. @pytest.mark.integration
  1798. async def test_backup_restore_roundtrip_preserves_encrypted_oidc_secret(
  1799. self, async_client, db_session, monkeypatch, tmp_path
  1800. ):
  1801. """T3: encrypt → backup → simulate key loss → restore → decrypt.
  1802. Verifies the user-facing promise that local backup ZIPs are
  1803. self-contained: an OIDC client_secret encrypted under one key still
  1804. decrypts after restore even when the running install no longer has
  1805. the key on disk or in the env. Exercises the B1 key-first restore
  1806. path and the B4 sample-decrypt status check together.
  1807. """
  1808. import zipfile
  1809. from pathlib import Path
  1810. from unittest.mock import AsyncMock, patch
  1811. from cryptography.fernet import Fernet
  1812. from sqlalchemy import select
  1813. import backend.app.core.encryption as enc_mod
  1814. from backend.app.api.routes.settings import create_backup_zip
  1815. from backend.app.core.config import settings as app_settings
  1816. from backend.app.models.oidc_provider import OIDCProvider
  1817. # 1. Pin a key, encrypt an OIDC secret via the property setter.
  1818. key = Fernet.generate_key().decode()
  1819. monkeypatch.setenv("MFA_ENCRYPTION_KEY", key)
  1820. monkeypatch.setenv("DATA_DIR", str(tmp_path))
  1821. monkeypatch.setattr(app_settings, "base_dir", tmp_path)
  1822. # Persist the key file too, so create_backup_zip picks it up.
  1823. (tmp_path / ".mfa_encryption_key").write_text(key)
  1824. enc_mod._fernet_instance = None
  1825. enc_mod._key_source = None
  1826. provider = OIDCProvider(
  1827. name="RoundtripProv",
  1828. issuer_url="https://rt.example.com",
  1829. client_id="cid",
  1830. client_secret="my-original-secret", # via setter -> encrypted
  1831. scopes="openid email profile",
  1832. is_enabled=True,
  1833. )
  1834. db_session.add(provider)
  1835. await db_session.commit()
  1836. original_id = provider.id
  1837. assert provider._client_secret_enc.startswith("fernet:")
  1838. # 2. Create a backup ZIP (must include .mfa_encryption_key).
  1839. zip_path, _ = await create_backup_zip(output_path=tmp_path)
  1840. try:
  1841. with zipfile.ZipFile(zip_path) as zf:
  1842. names = zf.namelist()
  1843. assert ".mfa_encryption_key" in names, "T3: backup ZIP must include the key file"
  1844. # 3. Simulate key loss: delete the key file from DATA_DIR, drop
  1845. # the env var, reset the cached fernet singleton.
  1846. (tmp_path / ".mfa_encryption_key").unlink()
  1847. monkeypatch.delenv("MFA_ENCRYPTION_KEY", raising=False)
  1848. enc_mod._fernet_instance = None
  1849. enc_mod._key_source = None
  1850. # 4. Restore the ZIP via the endpoint. Mock out the DB-swap
  1851. # (we keep the live in-memory test DB) and init_db side effects
  1852. # so this test focuses on the key-restore path.
  1853. with (
  1854. patch("backend.app.core.db_dialect.is_sqlite", return_value=False),
  1855. patch(
  1856. "backend.app.api.routes.settings._import_sqlite_to_postgres",
  1857. new_callable=AsyncMock,
  1858. ),
  1859. patch("backend.app.core.database.close_all_connections", new_callable=AsyncMock),
  1860. patch("backend.app.core.database.reinitialize_database", new_callable=AsyncMock),
  1861. patch("backend.app.core.database.init_db", new_callable=AsyncMock),
  1862. open(zip_path, "rb") as f,
  1863. ):
  1864. resp = await async_client.post(
  1865. "/api/v1/settings/restore",
  1866. files={"file": ("backup.zip", f, "application/zip")},
  1867. )
  1868. assert resp.status_code == 200, resp.text
  1869. # 5. Reset the singleton again (B1 already does this in production,
  1870. # but here init_db is mocked so we explicitly invalidate).
  1871. enc_mod._fernet_instance = None
  1872. enc_mod._key_source = None
  1873. # 6. The key file must be back on disk with restrictive permissions.
  1874. restored = Path(tmp_path) / ".mfa_encryption_key"
  1875. assert restored.exists(), "T3: key file must be restored to DATA_DIR"
  1876. assert (restored.stat().st_mode & 0o777) == 0o600
  1877. # 7. Decryption works again — the property getter must return the
  1878. # original plaintext, proving the restored key matches the
  1879. # cipher in the (still in-memory) DB row.
  1880. result = await db_session.execute(select(OIDCProvider).where(OIDCProvider.id == original_id))
  1881. restored_provider = result.scalar_one()
  1882. assert restored_provider.client_secret == "my-original-secret"
  1883. finally:
  1884. zip_path.unlink(missing_ok=True)
  1885. # ============================================================================
  1886. # TestTOTPDecryptionBroken (C9)
  1887. # Verifies the decryption-broken state (encrypted TOTP row + no key) for each
  1888. # TOTP endpoint. Behaviour differs between recovery-aware and non-recovery
  1889. # endpoints:
  1890. # - setup_totp / enable_totp / verify_2fa: HTTP 500 (no backup-code path).
  1891. # - disable_totp / regenerate_backup_codes: fall through to the backup-code
  1892. # branch — HTTP 200 with a valid backup code, HTTP 400 without.
  1893. # ============================================================================
  1894. class TestTOTPDecryptionBroken:
  1895. """C9: RuntimeError from mfa_decrypt — 500 for non-recovery endpoints,
  1896. backup-code fall-through for disable_totp / regenerate_backup_codes."""
  1897. async def _setup_admin_and_totp_user(self, async_client, db_session):
  1898. """Create admin (enables auth), log in as admin, add TOTP record with fernet secret."""
  1899. from backend.app.models.user_totp import UserTOTP
  1900. admin_username = f"admin_c9_{secrets.token_hex(4)}"
  1901. setup = await async_client.post(
  1902. "/api/v1/auth/setup",
  1903. json={
  1904. "auth_enabled": True,
  1905. "admin_username": admin_username,
  1906. "admin_password": "Admin_C9_Pass1!",
  1907. },
  1908. )
  1909. assert setup.status_code in (200, 201), setup.text
  1910. login = await async_client.post(
  1911. "/api/v1/auth/login",
  1912. json={"username": admin_username, "password": "Admin_C9_Pass1!"},
  1913. )
  1914. assert login.status_code == 200, login.text
  1915. token = login.json()["access_token"]
  1916. # Get the admin user_id from the /me endpoint
  1917. me = await async_client.get("/api/v1/auth/me", headers={"Authorization": f"Bearer {token}"})
  1918. assert me.status_code == 200
  1919. user_id = me.json()["id"]
  1920. # Insert a TOTP row with a fernet-prefixed secret directly (no key needed for insert).
  1921. totp = UserTOTP(
  1922. user_id=user_id,
  1923. _secret_enc="fernet:gAAAAA-not-really-encrypted",
  1924. is_enabled=True,
  1925. )
  1926. db_session.add(totp)
  1927. await db_session.commit()
  1928. return token, admin_username, user_id
  1929. @pytest.mark.asyncio
  1930. @pytest.mark.integration
  1931. async def test_enable_totp_returns_500_when_decryption_broken(self, async_client, db_session, monkeypatch):
  1932. """C9: enable endpoint → 500 when TOTP secret is encrypted but key unavailable."""
  1933. import backend.app.core.encryption as enc_mod
  1934. token, _, _ = await self._setup_admin_and_totp_user(async_client, db_session)
  1935. monkeypatch.setattr(enc_mod, "_load_or_generate_key", lambda: (None, "none"))
  1936. enc_mod._fernet_instance = None
  1937. # enable_totp requires setup-but-not-yet-enabled state; force is_enabled=False
  1938. from sqlalchemy import select as _select
  1939. from backend.app.models.user_totp import UserTOTP
  1940. result = await db_session.execute(_select(UserTOTP))
  1941. for t in result.scalars().all():
  1942. t.is_enabled = False
  1943. await db_session.commit()
  1944. resp = await async_client.post(
  1945. "/api/v1/auth/2fa/totp/enable",
  1946. json={"code": "123456"},
  1947. headers={"Authorization": f"Bearer {token}"},
  1948. )
  1949. assert resp.status_code == 500
  1950. assert "unavailable" in resp.json().get("detail", "").lower()
  1951. @pytest.mark.asyncio
  1952. @pytest.mark.integration
  1953. async def test_disable_totp_returns_400_when_decryption_broken_and_no_backup_codes(
  1954. self, async_client, db_session, monkeypatch
  1955. ):
  1956. """B2a + S3: disable falls through to backup-code branch when TOTP secret
  1957. cannot be decrypted; with no backup codes seeded, the request is
  1958. rejected as an invalid code (400), not a server error.
  1959. S3: AND the failed-attempt counter must NOT be incremented — the
  1960. cause was a server-side key loss, not a user mistake.
  1961. """
  1962. from sqlalchemy import select as _select
  1963. import backend.app.core.encryption as enc_mod
  1964. from backend.app.models.auth_ephemeral import AuthRateLimitEvent
  1965. token, admin_username, _ = await self._setup_admin_and_totp_user(async_client, db_session)
  1966. monkeypatch.setattr(enc_mod, "_load_or_generate_key", lambda: (None, "none"))
  1967. enc_mod._fernet_instance = None
  1968. resp = await async_client.post(
  1969. "/api/v1/auth/2fa/totp/disable",
  1970. json={"code": "123456"},
  1971. headers={"Authorization": f"Bearer {token}"},
  1972. )
  1973. assert resp.status_code == 400
  1974. assert "invalid" in resp.json().get("detail", "").lower()
  1975. # S3: no fail-counter debit on server-side key loss.
  1976. events = (
  1977. (
  1978. await db_session.execute(
  1979. _select(AuthRateLimitEvent).where(AuthRateLimitEvent.username == admin_username.lower())
  1980. )
  1981. )
  1982. .scalars()
  1983. .all()
  1984. )
  1985. assert len(events) == 0, "S3: must not debit fail-counter on key-loss"
  1986. @pytest.mark.asyncio
  1987. @pytest.mark.integration
  1988. async def test_regenerate_backup_codes_returns_400_when_decryption_broken_and_no_backup_codes(
  1989. self, async_client, db_session, monkeypatch
  1990. ):
  1991. """B2b + S3: regenerate-backup-codes falls through to backup-code branch when
  1992. TOTP secret cannot be decrypted; with no backup codes seeded, the
  1993. request is rejected as an invalid code (400) AND the fail-counter
  1994. is NOT incremented (S3: server-side cause, not user mistake).
  1995. """
  1996. from sqlalchemy import select as _select
  1997. import backend.app.core.encryption as enc_mod
  1998. from backend.app.models.auth_ephemeral import AuthRateLimitEvent
  1999. token, admin_username, _ = await self._setup_admin_and_totp_user(async_client, db_session)
  2000. monkeypatch.setattr(enc_mod, "_load_or_generate_key", lambda: (None, "none"))
  2001. enc_mod._fernet_instance = None
  2002. resp = await async_client.post(
  2003. "/api/v1/auth/2fa/totp/regenerate-backup-codes",
  2004. json={"code": "123456"},
  2005. headers={"Authorization": f"Bearer {token}"},
  2006. )
  2007. assert resp.status_code == 400
  2008. assert "invalid" in resp.json().get("detail", "").lower()
  2009. events = (
  2010. (
  2011. await db_session.execute(
  2012. _select(AuthRateLimitEvent).where(AuthRateLimitEvent.username == admin_username.lower())
  2013. )
  2014. )
  2015. .scalars()
  2016. .all()
  2017. )
  2018. assert len(events) == 0, "S3: must not debit fail-counter on key-loss"
  2019. @pytest.mark.asyncio
  2020. @pytest.mark.integration
  2021. async def test_disable_totp_succeeds_via_backup_code_when_decryption_broken(
  2022. self, async_client, db_session, monkeypatch
  2023. ):
  2024. """B2a: a valid backup code disables TOTP even when the secret cannot
  2025. be decrypted — recovery path for users who lost the encryption key."""
  2026. from sqlalchemy import select as _select
  2027. import backend.app.core.encryption as enc_mod
  2028. from backend.app.api.routes.mfa import _generate_backup_codes
  2029. from backend.app.models.user_totp import UserTOTP
  2030. token, _, user_id = await self._setup_admin_and_totp_user(async_client, db_session)
  2031. # Seed a real backup-code hash on the existing TOTP row.
  2032. plain_codes, hashed_codes = _generate_backup_codes()
  2033. result = await db_session.execute(_select(UserTOTP).where(UserTOTP.user_id == user_id))
  2034. totp = result.scalar_one()
  2035. totp.backup_code_hashes = hashed_codes
  2036. await db_session.commit()
  2037. monkeypatch.setattr(enc_mod, "_load_or_generate_key", lambda: (None, "none"))
  2038. enc_mod._fernet_instance = None
  2039. resp = await async_client.post(
  2040. "/api/v1/auth/2fa/totp/disable",
  2041. json={"code": plain_codes[0]},
  2042. headers={"Authorization": f"Bearer {token}"},
  2043. )
  2044. assert resp.status_code == 200, resp.text
  2045. # The TOTP row must have been deleted.
  2046. result_after = await db_session.execute(_select(UserTOTP).where(UserTOTP.user_id == user_id))
  2047. assert result_after.scalar_one_or_none() is None
  2048. @pytest.mark.asyncio
  2049. @pytest.mark.integration
  2050. async def test_regenerate_backup_codes_succeeds_via_backup_code_when_decryption_broken(
  2051. self, async_client, db_session, monkeypatch
  2052. ):
  2053. """B2b: a valid backup code rotates the codes even when the secret
  2054. cannot be decrypted — recovery path mirrors disable_totp."""
  2055. from sqlalchemy import select as _select
  2056. import backend.app.core.encryption as enc_mod
  2057. from backend.app.api.routes.mfa import _generate_backup_codes
  2058. from backend.app.models.user_totp import UserTOTP
  2059. token, _, user_id = await self._setup_admin_and_totp_user(async_client, db_session)
  2060. plain_codes, hashed_codes = _generate_backup_codes()
  2061. result = await db_session.execute(_select(UserTOTP).where(UserTOTP.user_id == user_id))
  2062. totp = result.scalar_one()
  2063. totp.backup_code_hashes = hashed_codes
  2064. await db_session.commit()
  2065. monkeypatch.setattr(enc_mod, "_load_or_generate_key", lambda: (None, "none"))
  2066. enc_mod._fernet_instance = None
  2067. resp = await async_client.post(
  2068. "/api/v1/auth/2fa/totp/regenerate-backup-codes",
  2069. json={"code": plain_codes[0]},
  2070. headers={"Authorization": f"Bearer {token}"},
  2071. )
  2072. assert resp.status_code == 200, resp.text
  2073. body = resp.json()
  2074. assert "backup_codes" in body
  2075. assert len(body["backup_codes"]) == 10
  2076. @pytest.mark.asyncio
  2077. @pytest.mark.integration
  2078. async def test_disable_totp_wrong_code_with_seeded_hashes_returns_400_and_debits_counter(
  2079. self, async_client, db_session, monkeypatch
  2080. ):
  2081. """T2: with backup_code_hashes seeded AND a working encryption key,
  2082. a wrong code is rejected (400) AND the fail-counter IS incremented.
  2083. This pins the behaviour that a future refactor swallowing
  2084. compare_digest mismatches would still let the existing 'no codes
  2085. configured' tests pass — only this assertion exercises the actual
  2086. pwd_context.verify mismatch path.
  2087. """
  2088. from cryptography.fernet import Fernet
  2089. from sqlalchemy import select as _select
  2090. import backend.app.core.encryption as enc_mod
  2091. from backend.app.api.routes.mfa import _generate_backup_codes
  2092. from backend.app.models.auth_ephemeral import AuthRateLimitEvent
  2093. from backend.app.models.user_totp import UserTOTP
  2094. # Active key — secret can be decrypted, this is NOT key-loss.
  2095. monkeypatch.setenv("MFA_ENCRYPTION_KEY", Fernet.generate_key().decode())
  2096. enc_mod._fernet_instance = None
  2097. token, admin_username, user_id = await self._setup_admin_and_totp_user(async_client, db_session)
  2098. # Replace stub fernet:-prefixed value with a real encrypted secret so
  2099. # disable_totp's TOTP-decrypt path doesn't throw, AND seed real hashes.
  2100. result = await db_session.execute(_select(UserTOTP).where(UserTOTP.user_id == user_id))
  2101. totp = result.scalar_one()
  2102. totp.secret = "JBSWY3DPEHPK3PXP" # via setter -> mfa_encrypt
  2103. plain_codes, hashed_codes = _generate_backup_codes()
  2104. totp.backup_code_hashes = hashed_codes
  2105. await db_session.commit()
  2106. # Submit a code that matches NEITHER the TOTP nor any backup-code hash.
  2107. resp = await async_client.post(
  2108. "/api/v1/auth/2fa/totp/disable",
  2109. json={"code": "WRONGCD1"}, # wrong but well-formed
  2110. headers={"Authorization": f"Bearer {token}"},
  2111. )
  2112. assert resp.status_code == 400
  2113. assert "invalid" in resp.json().get("detail", "").lower()
  2114. # T2 + S3: with key intact, the fail-counter MUST increment for a
  2115. # real wrong-code attempt (this is the user-error path, not key-loss).
  2116. events = (
  2117. (
  2118. await db_session.execute(
  2119. _select(AuthRateLimitEvent).where(AuthRateLimitEvent.username == admin_username.lower())
  2120. )
  2121. )
  2122. .scalars()
  2123. .all()
  2124. )
  2125. assert len(events) >= 1, "T2: with key intact, wrong code must debit the fail-counter"
  2126. @pytest.mark.asyncio
  2127. @pytest.mark.integration
  2128. async def test_regenerate_backup_codes_wrong_code_with_seeded_hashes_returns_400_and_debits_counter(
  2129. self, async_client, db_session, monkeypatch
  2130. ):
  2131. """T2: same as the disable_totp variant for /regenerate-backup-codes."""
  2132. from cryptography.fernet import Fernet
  2133. from sqlalchemy import select as _select
  2134. import backend.app.core.encryption as enc_mod
  2135. from backend.app.api.routes.mfa import _generate_backup_codes
  2136. from backend.app.models.auth_ephemeral import AuthRateLimitEvent
  2137. from backend.app.models.user_totp import UserTOTP
  2138. monkeypatch.setenv("MFA_ENCRYPTION_KEY", Fernet.generate_key().decode())
  2139. enc_mod._fernet_instance = None
  2140. token, admin_username, user_id = await self._setup_admin_and_totp_user(async_client, db_session)
  2141. result = await db_session.execute(_select(UserTOTP).where(UserTOTP.user_id == user_id))
  2142. totp = result.scalar_one()
  2143. totp.secret = "JBSWY3DPEHPK3PXP"
  2144. plain_codes, hashed_codes = _generate_backup_codes()
  2145. totp.backup_code_hashes = hashed_codes
  2146. await db_session.commit()
  2147. resp = await async_client.post(
  2148. "/api/v1/auth/2fa/totp/regenerate-backup-codes",
  2149. json={"code": "WRONGCD2"},
  2150. headers={"Authorization": f"Bearer {token}"},
  2151. )
  2152. assert resp.status_code == 400
  2153. assert "invalid" in resp.json().get("detail", "").lower()
  2154. events = (
  2155. (
  2156. await db_session.execute(
  2157. _select(AuthRateLimitEvent).where(AuthRateLimitEvent.username == admin_username.lower())
  2158. )
  2159. )
  2160. .scalars()
  2161. .all()
  2162. )
  2163. assert len(events) >= 1, "T2: with key intact, wrong code must debit the fail-counter"
  2164. @pytest.mark.asyncio
  2165. @pytest.mark.integration
  2166. async def test_disable_totp_wrong_code_with_seeded_hashes_at_keyloss_no_counter_debit(
  2167. self, async_client, db_session, monkeypatch
  2168. ):
  2169. """T2 + S3 cross-check: with hashes seeded but encryption key gone,
  2170. a wrong code returns 400 BUT the fail-counter MUST NOT increment.
  2171. This is the dual of the test above — same wrong-code 400 outcome,
  2172. but the counter debit is gated on the cause of failure (server-side
  2173. key loss must NOT penalise the user).
  2174. """
  2175. from sqlalchemy import select as _select
  2176. import backend.app.core.encryption as enc_mod
  2177. from backend.app.api.routes.mfa import _generate_backup_codes
  2178. from backend.app.models.auth_ephemeral import AuthRateLimitEvent
  2179. from backend.app.models.user_totp import UserTOTP
  2180. token, admin_username, user_id = await self._setup_admin_and_totp_user(async_client, db_session)
  2181. # Seed real hashes on the existing TOTP row.
  2182. result = await db_session.execute(_select(UserTOTP).where(UserTOTP.user_id == user_id))
  2183. totp = result.scalar_one()
  2184. plain_codes, hashed_codes = _generate_backup_codes()
  2185. totp.backup_code_hashes = hashed_codes
  2186. await db_session.commit()
  2187. # Now simulate key loss.
  2188. monkeypatch.setattr(enc_mod, "_load_or_generate_key", lambda: (None, "none"))
  2189. enc_mod._fernet_instance = None
  2190. resp = await async_client.post(
  2191. "/api/v1/auth/2fa/totp/disable",
  2192. json={"code": "WRONGCD3"},
  2193. headers={"Authorization": f"Bearer {token}"},
  2194. )
  2195. assert resp.status_code == 400
  2196. # S3: counter MUST be unchanged — this is a server-side problem.
  2197. events = (
  2198. (
  2199. await db_session.execute(
  2200. _select(AuthRateLimitEvent).where(AuthRateLimitEvent.username == admin_username.lower())
  2201. )
  2202. )
  2203. .scalars()
  2204. .all()
  2205. )
  2206. assert len(events) == 0, "S3: must not debit fail-counter when cause is server-side key-loss"
  2207. @pytest.mark.asyncio
  2208. @pytest.mark.integration
  2209. async def test_setup_totp_returns_500_when_decryption_broken(self, async_client, db_session, monkeypatch):
  2210. """B3: setup endpoint → 500 when an active TOTP secret can't be decrypted.
  2211. Replacing an active authenticator requires verifying the current TOTP
  2212. code; with no recovery (backup-code) path on this endpoint, the only
  2213. safe outcome is a 500 surface to the operator.
  2214. """
  2215. import backend.app.core.encryption as enc_mod
  2216. token, _, _ = await self._setup_admin_and_totp_user(async_client, db_session)
  2217. monkeypatch.setattr(enc_mod, "_load_or_generate_key", lambda: (None, "none"))
  2218. enc_mod._fernet_instance = None
  2219. resp = await async_client.post(
  2220. "/api/v1/auth/2fa/totp/setup",
  2221. json={"code": "123456"},
  2222. headers={"Authorization": f"Bearer {token}"},
  2223. )
  2224. assert resp.status_code == 500
  2225. assert "unavailable" in resp.json().get("detail", "").lower()
  2226. @pytest.mark.asyncio
  2227. @pytest.mark.integration
  2228. async def test_verify_2fa_returns_500_when_decryption_broken(self, async_client, db_session, monkeypatch):
  2229. """C9: verify endpoint (TOTP method) → 500 when TOTP secret unreadable."""
  2230. from datetime import datetime, timedelta, timezone
  2231. import backend.app.core.encryption as enc_mod
  2232. from backend.app.models.auth_ephemeral import AuthEphemeralToken
  2233. token, admin_username, user_id = await self._setup_admin_and_totp_user(async_client, db_session)
  2234. monkeypatch.setattr(enc_mod, "_load_or_generate_key", lambda: (None, "none"))
  2235. enc_mod._fernet_instance = None
  2236. # Create a pre_auth token to simulate the post-login 2FA challenge step.
  2237. raw_token = secrets.token_urlsafe(32)
  2238. ephemeral = AuthEphemeralToken(
  2239. token=raw_token,
  2240. token_type="pre_auth",
  2241. username=admin_username,
  2242. expires_at=datetime.now(timezone.utc) + timedelta(minutes=5),
  2243. )
  2244. db_session.add(ephemeral)
  2245. await db_session.commit()
  2246. resp = await async_client.post(
  2247. "/api/v1/auth/2fa/verify",
  2248. json={"pre_auth_token": raw_token, "method": "totp", "code": "123456"},
  2249. )
  2250. assert resp.status_code == 500
  2251. assert "unavailable" in resp.json().get("detail", "").lower()