test_security.py 115 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762277227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433243424352436243724382439244024412442244324442445244624472448244924502451245224532454245524562457245824592460246124622463246424652466246724682469247024712472247324742475247624772478247924802481248224832484248524862487248824892490249124922493249424952496249724982499250025012502250325042505250625072508250925102511251225132514251525162517251825192520252125222523252425252526252725282529253025312532253325342535253625372538253925402541254225432544254525462547254825492550255125522553255425552556255725582559256025612562256325642565256625672568256925702571257225732574257525762577257825792580258125822583258425852586258725882589259025912592259325942595259625972598259926002601260226032604260526062607260826092610261126122613261426152616261726182619262026212622262326242625262626272628262926302631263226332634263526362637263826392640264126422643264426452646264726482649265026512652265326542655265626572658265926602661266226632664266526662667266826692670267126722673267426752676267726782679268026812682268326842685268626872688268926902691269226932694269526962697269826992700270127022703270427052706270727082709271027112712271327142715271627172718271927202721272227232724272527262727272827292730273127322733273427352736273727382739274027412742274327442745274627472748274927502751275227532754275527562757275827592760276127622763276427652766276727682769277027712772277327742775277627772778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823
  1. """Security tests for the 8 coverage gaps identified in the maintainer review.
  2. Gap 1: encryption.py has zero tests
  3. Gap 2: JWT revocation (revoke_jti, is_jti_revoked, _is_token_fresh) untested
  4. Gap 3: OIDC exchange token replay untested
  5. Gap 4: OIDC email_verified claim handling untested
  6. Gap 5: Email OTP max-attempts invalidation untested
  7. Gap 6: OIDC callback error redirects (SSRF protection) undertested
  8. Gap 7: Login rate limiting untested
  9. Gap 8: challenge_id cookie binding untested
  10. """
  11. from __future__ import annotations
  12. import base64
  13. import secrets
  14. import time
  15. from datetime import datetime, timedelta, timezone
  16. from unittest.mock import AsyncMock, MagicMock, patch
  17. import jwt as pyjwt
  18. import pytest
  19. from cryptography.hazmat.primitives import serialization
  20. from cryptography.hazmat.primitives.asymmetric import rsa
  21. from httpx import AsyncClient
  22. from sqlalchemy.ext.asyncio import AsyncSession
  23. from backend.app.models.auth_ephemeral import AuthEphemeralToken
  24. from backend.app.models.user import User
  25. AUTH_SETUP_URL = "/api/v1/auth/setup"
  26. LOGIN_URL = "/api/v1/auth/login"
  27. LOGOUT_URL = "/api/v1/auth/logout"
  28. ME_URL = "/api/v1/auth/me"
  29. def _auth_header(token: str) -> dict[str, str]:
  30. return {"Authorization": f"Bearer {token}"}
  31. def _norm_pw(password: str) -> str:
  32. """Ensure password meets complexity requirements (I4: SetupRequest now validates)."""
  33. if not any(c.isupper() for c in password):
  34. password = password[0].upper() + password[1:]
  35. if not any(c not in "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789" for c in password):
  36. password = password + "!"
  37. return password
  38. async def _setup_and_login(client: AsyncClient, username: str, password: str) -> str:
  39. password = _norm_pw(password)
  40. await client.post(
  41. AUTH_SETUP_URL,
  42. json={"auth_enabled": True, "admin_username": username, "admin_password": password},
  43. )
  44. resp = await client.post(LOGIN_URL, json={"username": username, "password": password})
  45. assert resp.status_code == 200
  46. return resp.json()["access_token"]
  47. def _make_test_rsa_key():
  48. def _b64url(n: int, length: int) -> str:
  49. return base64.urlsafe_b64encode(n.to_bytes(length, "big")).rstrip(b"=").decode()
  50. private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
  51. private_pem = private_key.private_bytes(
  52. serialization.Encoding.PEM,
  53. serialization.PrivateFormat.TraditionalOpenSSL,
  54. serialization.NoEncryption(),
  55. )
  56. pub_numbers = private_key.public_key().public_numbers()
  57. jwks = {
  58. "keys": [
  59. {
  60. "kty": "RSA",
  61. "use": "sig",
  62. "alg": "RS256",
  63. "kid": "test-kid-1",
  64. "n": _b64url(pub_numbers.n, 256),
  65. "e": _b64url(pub_numbers.e, 3),
  66. }
  67. ]
  68. }
  69. return private_pem, jwks
  70. # ===========================================================================
  71. # Gap 1: encryption.py unit tests
  72. # ===========================================================================
  73. class TestEncryption:
  74. """encrypt/decrypt round-trips, plaintext passthrough, RuntimeError on missing key.
  75. The ``mfa_encryption_isolation`` autouse fixture (conftest.py) resets the
  76. ``encryption`` module's globals before/after each test and points
  77. ``DATA_DIR`` at a tmp path, so individual tests only need to set
  78. ``MFA_ENCRYPTION_KEY`` when they want a specific key in scope.
  79. """
  80. def test_encrypt_decrypt_roundtrip_with_key(self, monkeypatch):
  81. from cryptography.fernet import Fernet
  82. import backend.app.core.encryption as enc_mod
  83. test_key = Fernet.generate_key().decode()
  84. monkeypatch.setenv("MFA_ENCRYPTION_KEY", test_key)
  85. # Force re-initialisation now that the env var is set.
  86. enc_mod._fernet_instance = None
  87. ciphertext = enc_mod.mfa_encrypt("my-totp-secret")
  88. assert ciphertext.startswith("fernet:")
  89. assert enc_mod.mfa_decrypt(ciphertext) == "my-totp-secret"
  90. def test_plaintext_passthrough_without_key(self, monkeypatch):
  91. # Force the auto-bootstrap into the legacy "no key available" branch
  92. # by patching _load_or_generate_key directly. This is more robust than
  93. # chmod tricks (which root bypasses) when verifying the plaintext path.
  94. import backend.app.core.encryption as enc_mod
  95. monkeypatch.setattr(enc_mod, "_load_or_generate_key", lambda: (None, "none"))
  96. enc_mod._fernet_instance = None
  97. result = enc_mod.mfa_encrypt("plaintext-secret")
  98. assert result == "plaintext-secret"
  99. assert enc_mod.mfa_decrypt("plaintext-secret") == "plaintext-secret"
  100. def test_decrypt_raises_runtime_error_without_key_for_encrypted_value(self, monkeypatch):
  101. import backend.app.core.encryption as enc_mod
  102. monkeypatch.setattr(enc_mod, "_load_or_generate_key", lambda: (None, "none"))
  103. enc_mod._fernet_instance = None
  104. with pytest.raises(RuntimeError, match="MFA_ENCRYPTION_KEY must be set"):
  105. enc_mod.mfa_decrypt("fernet:gAAAAA-fake-ciphertext")
  106. # ------------------------------------------------------------------
  107. # Auto-bootstrap tests for _load_or_generate_key
  108. # ------------------------------------------------------------------
  109. def test_load_or_generate_key_uses_env_when_set(self, monkeypatch, tmp_path):
  110. """Valid env var → key_source == 'env', no file written."""
  111. from cryptography.fernet import Fernet
  112. import backend.app.core.encryption as enc_mod
  113. valid_key = Fernet.generate_key().decode()
  114. monkeypatch.setenv("MFA_ENCRYPTION_KEY", valid_key)
  115. monkeypatch.setenv("DATA_DIR", str(tmp_path))
  116. enc_mod._fernet_instance = None
  117. key, source = enc_mod._load_or_generate_key()
  118. assert key == valid_key
  119. assert source == "env"
  120. assert not (tmp_path / ".mfa_encryption_key").exists()
  121. def test_invalid_env_key_falls_through_to_file(self, monkeypatch, tmp_path, caplog):
  122. """Invalid env var → logger.error + file fallback (auto-generated)."""
  123. import logging
  124. import backend.app.core.encryption as enc_mod
  125. monkeypatch.setenv("MFA_ENCRYPTION_KEY", "not-a-valid-fernet-key")
  126. monkeypatch.setenv("DATA_DIR", str(tmp_path))
  127. enc_mod._fernet_instance = None
  128. with caplog.at_level(logging.ERROR, logger="backend.app.core.encryption"):
  129. key, source = enc_mod._load_or_generate_key()
  130. assert source == "generated"
  131. assert key is not None
  132. assert (tmp_path / ".mfa_encryption_key").exists()
  133. assert any("not a valid Fernet key" in rec.message for rec in caplog.records)
  134. def test_load_or_generate_key_reads_existing_file(self, monkeypatch, tmp_path):
  135. """File present in DATA_DIR + no env var → key_source == 'file'."""
  136. from cryptography.fernet import Fernet
  137. import backend.app.core.encryption as enc_mod
  138. existing_key = Fernet.generate_key().decode()
  139. key_file = tmp_path / ".mfa_encryption_key"
  140. key_file.write_text(existing_key)
  141. monkeypatch.setenv("DATA_DIR", str(tmp_path))
  142. enc_mod._fernet_instance = None
  143. key, source = enc_mod._load_or_generate_key()
  144. assert key == existing_key
  145. assert source == "file"
  146. def test_load_or_generate_key_creates_file_with_0600(self, monkeypatch, tmp_path):
  147. """Neither env nor file → new key generated, file mode is 0o600."""
  148. import backend.app.core.encryption as enc_mod
  149. monkeypatch.setenv("DATA_DIR", str(tmp_path))
  150. enc_mod._fernet_instance = None
  151. key, source = enc_mod._load_or_generate_key()
  152. assert source == "generated"
  153. assert enc_mod._validate_fernet_key(key)
  154. key_file = tmp_path / ".mfa_encryption_key"
  155. assert key_file.exists()
  156. # Mode bits LSB are 0o600 — owner read+write only.
  157. assert (key_file.stat().st_mode & 0o777) == 0o600
  158. def test_load_or_generate_key_returns_none_on_write_oserror(self, monkeypatch, tmp_path, caplog):
  159. """When DATA_DIR can't be written to (auto-generate path), return (None, 'none_write_failed').
  160. S1: write now uses os.open(O_EXCL|O_CREAT, 0o600) instead of write_text — patch
  161. os.write to simulate the OS-level failure. S8: source distinguishes write-failed
  162. from corrupted to drive accurate operator messaging.
  163. """
  164. import logging
  165. import os
  166. import backend.app.core.encryption as enc_mod
  167. monkeypatch.setenv("DATA_DIR", str(tmp_path))
  168. enc_mod._fernet_instance = None
  169. original_write = os.write
  170. def _raising_write(fd, data):
  171. # Best-effort: trigger OSError specifically for the key write.
  172. raise OSError("simulated read-only filesystem")
  173. monkeypatch.setattr(os, "write", _raising_write)
  174. with caplog.at_level(logging.ERROR, logger="backend.app.core.encryption"):
  175. key, source = enc_mod._load_or_generate_key()
  176. # Restore os.write so the rest of the test suite is unaffected.
  177. monkeypatch.setattr(os, "write", original_write)
  178. assert key is None
  179. assert source == "none_write_failed"
  180. assert any("Could not save MFA encryption key" in rec.message for rec in caplog.records)
  181. def test_load_or_generate_key_returns_none_on_read_oserror(self, monkeypatch, tmp_path, caplog):
  182. """B4: existing key file but read fails (e.g. permission denied) → (None, 'none_corrupted').
  183. Critical: must NOT regenerate a new key, which would destroy access to
  184. every row already encrypted under the existing key. S8: 'none_corrupted'
  185. marks the cause so operators see the right diagnostic.
  186. """
  187. import logging
  188. from pathlib import Path
  189. import backend.app.core.encryption as enc_mod
  190. # Pre-create a key file so we hit the existing-file branch.
  191. key_file = tmp_path / ".mfa_encryption_key"
  192. key_file.write_text("placeholder-content")
  193. original_size = key_file.stat().st_size
  194. monkeypatch.setenv("DATA_DIR", str(tmp_path))
  195. enc_mod._fernet_instance = None
  196. original_read_text = Path.read_text
  197. def _raising_read_text(self, *args, **kwargs):
  198. if self.name == ".mfa_encryption_key":
  199. raise OSError("simulated permission denied")
  200. return original_read_text(self, *args, **kwargs)
  201. monkeypatch.setattr(Path, "read_text", _raising_read_text)
  202. with caplog.at_level(logging.ERROR, logger="backend.app.core.encryption"):
  203. key, source = enc_mod._load_or_generate_key()
  204. assert key is None
  205. assert source == "none_corrupted"
  206. # Critical: file must not have been overwritten with a new key.
  207. assert key_file.exists()
  208. assert key_file.stat().st_size == original_size
  209. assert any("Failed to read existing MFA key file" in rec.message for rec in caplog.records)
  210. assert any("Refusing to regenerate" in rec.message for rec in caplog.records)
  211. def test_get_key_source_reflects_active_source(self, monkeypatch, tmp_path):
  212. """get_key_source() returns the source detected on the most recent _get_fernet() call."""
  213. from cryptography.fernet import Fernet
  214. import backend.app.core.encryption as enc_mod
  215. monkeypatch.setenv("MFA_ENCRYPTION_KEY", Fernet.generate_key().decode())
  216. monkeypatch.setenv("DATA_DIR", str(tmp_path))
  217. enc_mod._fernet_instance = None
  218. enc_mod._key_source = None
  219. # Trigger lazy initialisation
  220. enc_mod.mfa_encrypt("anything")
  221. assert enc_mod.get_key_source() == "env"
  222. def test_corrupted_key_file_returns_none_without_overwrite(self, monkeypatch, tmp_path, caplog):
  223. """A1: invalid key file content → (None, 'none_corrupted'), file not overwritten.
  224. S8: 'none_corrupted' (vs 'none_write_failed') so operators get the right
  225. diagnostic and don't see a misleading 'DATA_DIR not writable' warning.
  226. """
  227. import logging
  228. import backend.app.core.encryption as enc_mod
  229. key_file = tmp_path / ".mfa_encryption_key"
  230. key_file.write_text("invalid_content")
  231. original_mtime = key_file.stat().st_mtime
  232. monkeypatch.setenv("DATA_DIR", str(tmp_path))
  233. enc_mod._fernet_instance = None
  234. with caplog.at_level(logging.ERROR, logger="backend.app.core.encryption"):
  235. key, source = enc_mod._load_or_generate_key()
  236. assert key is None
  237. assert source == "none_corrupted"
  238. assert key_file.exists(), "file must not be deleted"
  239. assert key_file.stat().st_mtime == original_mtime, "file must not be overwritten"
  240. assert any("not a valid Fernet key" in rec.message for rec in caplog.records)
  241. assert any("Refusing to overwrite" in rec.message for rec in caplog.records)
  242. def test_auto_generate_fileexistserror_returns_none_corrupted(self, monkeypatch, tmp_path, caplog):
  243. """S1: O_EXCL race — file appears between exists() check and open() →
  244. return (None, 'none_corrupted') without overwriting."""
  245. import logging
  246. import os
  247. import backend.app.core.encryption as enc_mod
  248. monkeypatch.setenv("DATA_DIR", str(tmp_path))
  249. enc_mod._fernet_instance = None
  250. original_open = os.open
  251. def _excl_raise(path, flags, mode=0o777):
  252. if str(path).endswith(".mfa_encryption_key") and (flags & os.O_EXCL):
  253. raise FileExistsError(17, "File exists", str(path))
  254. return original_open(path, flags, mode)
  255. monkeypatch.setattr(os, "open", _excl_raise)
  256. with caplog.at_level(logging.ERROR, logger="backend.app.core.encryption"):
  257. key, source = enc_mod._load_or_generate_key()
  258. assert key is None
  259. assert source == "none_corrupted"
  260. assert any("Race detected" in rec.message for rec in caplog.records)
  261. # ===========================================================================
  262. # Gap 2: JWT revocation — revoke_jti, is_jti_revoked, _is_token_fresh, /me
  263. # ===========================================================================
  264. class TestJWTRevocation:
  265. """JWT revocation and token freshness checks."""
  266. @pytest.mark.asyncio
  267. @pytest.mark.integration
  268. async def test_revoke_jti_and_is_jti_revoked(self, async_client: AsyncClient, db_session: AsyncSession):
  269. """revoke_jti stores the JTI; is_jti_revoked returns True afterwards."""
  270. from backend.app.core.auth import is_jti_revoked, revoke_jti
  271. test_jti = secrets.token_urlsafe(16)
  272. expires = datetime.now(timezone.utc) + timedelta(hours=1)
  273. assert not await is_jti_revoked(test_jti)
  274. await revoke_jti(test_jti, expires, username="testuser")
  275. assert await is_jti_revoked(test_jti)
  276. @pytest.mark.asyncio
  277. @pytest.mark.integration
  278. async def test_revoke_jti_idempotent(self, async_client: AsyncClient):
  279. """Double-revocation of the same JTI should not raise."""
  280. from backend.app.core.auth import is_jti_revoked, revoke_jti
  281. jti = secrets.token_urlsafe(16)
  282. expires = datetime.now(timezone.utc) + timedelta(hours=1)
  283. await revoke_jti(jti, expires)
  284. await revoke_jti(jti, expires) # must not raise
  285. assert await is_jti_revoked(jti)
  286. def test_is_token_fresh_rejects_none_iat(self):
  287. """_is_token_fresh returns False when iat is None (I1 hard cutoff)."""
  288. from backend.app.core.auth import _is_token_fresh
  289. user = MagicMock()
  290. user.password_changed_at = None
  291. assert _is_token_fresh(None, user) is False
  292. def test_is_token_fresh_rejects_token_before_password_change(self):
  293. """_is_token_fresh returns False when iat predates password_changed_at."""
  294. from backend.app.core.auth import _is_token_fresh
  295. now = datetime.now(timezone.utc)
  296. user = MagicMock()
  297. user.password_changed_at = now
  298. old_iat = (now - timedelta(hours=1)).timestamp()
  299. assert _is_token_fresh(old_iat, user) is False
  300. def test_is_token_fresh_accepts_token_after_password_change(self):
  301. """_is_token_fresh returns True when iat is after password_changed_at."""
  302. from backend.app.core.auth import _is_token_fresh
  303. now = datetime.now(timezone.utc)
  304. user = MagicMock()
  305. user.password_changed_at = now - timedelta(hours=1)
  306. recent_iat = now.timestamp()
  307. assert _is_token_fresh(recent_iat, user) is True
  308. def test_is_token_fresh_returns_true_when_no_password_change(self):
  309. """_is_token_fresh returns True when password_changed_at is None (I2 migration not yet run)."""
  310. from backend.app.core.auth import _is_token_fresh
  311. user = MagicMock()
  312. user.password_changed_at = None
  313. assert _is_token_fresh(time.time(), user) is True
  314. @pytest.mark.asyncio
  315. @pytest.mark.integration
  316. async def test_me_endpoint_rejects_token_after_logout(self, async_client: AsyncClient):
  317. """After logout, the bearer token must be rejected by /me (B1 + revocation)."""
  318. token = await _setup_and_login(async_client, "sec_logout_me", "sec_logout_me1")
  319. # Token works before logout
  320. me_resp = await async_client.get(ME_URL, headers=_auth_header(token))
  321. assert me_resp.status_code == 200
  322. # Logout
  323. logout_resp = await async_client.post(LOGOUT_URL, headers=_auth_header(token))
  324. assert logout_resp.status_code == 200
  325. # Token must now be rejected
  326. me_after = await async_client.get(ME_URL, headers=_auth_header(token))
  327. assert me_after.status_code == 401
  328. # ===========================================================================
  329. # Gap 3: OIDC exchange token replay
  330. # ===========================================================================
  331. class TestOIDCExchangeReplay:
  332. """A single-use OIDC exchange token cannot be redeemed twice."""
  333. @pytest.mark.asyncio
  334. @pytest.mark.integration
  335. async def test_exchange_token_is_single_use(self, async_client: AsyncClient, db_session: AsyncSession):
  336. """The second call to /oidc/exchange with the same token returns 401."""
  337. exchange_token = secrets.token_urlsafe(32)
  338. db_session.add(
  339. AuthEphemeralToken(
  340. token=exchange_token,
  341. token_type="oidc_exchange",
  342. username="oidc_replay_user",
  343. expires_at=datetime.now(timezone.utc) + timedelta(minutes=5),
  344. )
  345. )
  346. await db_session.commit()
  347. # Seed the user so the exchange can resolve it
  348. from backend.app.core.auth import get_password_hash
  349. from backend.app.core.database import async_session, seed_default_groups
  350. async with async_session() as db:
  351. result = await db.execute(__import__("sqlalchemy").select(User).where(User.username == "oidc_replay_user"))
  352. if result.scalar_one_or_none() is None:
  353. db.add(
  354. User(
  355. username="oidc_replay_user",
  356. password_hash=get_password_hash("pw"),
  357. is_active=True,
  358. )
  359. )
  360. await db.commit()
  361. first = await async_client.post("/api/v1/auth/oidc/exchange", json={"oidc_token": exchange_token})
  362. assert first.status_code == 200
  363. second = await async_client.post("/api/v1/auth/oidc/exchange", json={"oidc_token": exchange_token})
  364. assert second.status_code == 401
  365. # ===========================================================================
  366. # Gap 4: OIDC email_verified claim handling
  367. # ===========================================================================
  368. class TestOIDCEmailVerified:
  369. """email_verified: False/absent must not link OIDC identity to an existing email."""
  370. @pytest.mark.asyncio
  371. @pytest.mark.integration
  372. async def test_unverified_email_does_not_link_to_existing_user(
  373. self, async_client: AsyncClient, db_session: AsyncSession
  374. ):
  375. """If email_verified is False, the OIDC callback must not auto-link by email."""
  376. private_pem, jwks_data = _make_test_rsa_key()
  377. issuer = "https://idp.evtest.example.com"
  378. client_id = "ev-client"
  379. nonce = secrets.token_urlsafe(16)
  380. now = int(time.time())
  381. id_token = pyjwt.encode(
  382. {
  383. "sub": "ev-sub-new",
  384. "iss": issuer,
  385. "aud": client_id,
  386. "nonce": nonce,
  387. "email": "existing@example.com",
  388. "email_verified": False, # <-- must be ignored
  389. "iat": now,
  390. "exp": now + 300,
  391. },
  392. private_pem,
  393. algorithm="RS256",
  394. headers={"kid": "test-kid-1"},
  395. )
  396. admin_token = await _setup_and_login(async_client, "ev_admin", "ev_admin1")
  397. # Create existing user with the same email (use strong password for validator)
  398. create_user_resp = await async_client.post(
  399. "/api/v1/users",
  400. json={"username": "existing_email_user", "password": "Str0ng!Pass", "email": "existing@example.com"},
  401. headers=_auth_header(admin_token),
  402. )
  403. assert create_user_resp.status_code in (200, 201), create_user_resp.json()
  404. # Create OIDC provider
  405. create_resp = await async_client.post(
  406. "/api/v1/auth/oidc/providers",
  407. json={
  408. "name": "EV-IdP",
  409. "issuer_url": issuer,
  410. "client_id": client_id,
  411. "client_secret": "secret",
  412. "scopes": "openid email",
  413. "is_enabled": True,
  414. "auto_create_users": True,
  415. },
  416. headers=_auth_header(admin_token),
  417. )
  418. assert create_resp.status_code == 201
  419. provider_id = create_resp.json()["id"]
  420. state = secrets.token_urlsafe(32)
  421. code_verifier = secrets.token_urlsafe(48)
  422. db_session.add(
  423. AuthEphemeralToken(
  424. token=state,
  425. token_type="oidc_state",
  426. provider_id=provider_id,
  427. nonce=nonce,
  428. code_verifier=code_verifier,
  429. expires_at=datetime.now(timezone.utc) + timedelta(minutes=5),
  430. )
  431. )
  432. await db_session.commit()
  433. discovery_doc = {
  434. "issuer": issuer,
  435. "authorization_endpoint": f"{issuer}/auth",
  436. "token_endpoint": f"{issuer}/token",
  437. "jwks_uri": f"{issuer}/.well-known/jwks.json",
  438. }
  439. class _MockResp:
  440. def __init__(self, data):
  441. self._data = data
  442. self.status_code = 200
  443. self.is_success = True
  444. self.text = str(data)
  445. def json(self):
  446. return self._data
  447. def raise_for_status(self):
  448. pass
  449. class _MockHttpxClientEV:
  450. def __init__(self, *args, **kwargs):
  451. pass
  452. async def __aenter__(self):
  453. return self
  454. async def __aexit__(self, *_):
  455. pass
  456. async def get(self, url, **kwargs):
  457. if "jwks" in url:
  458. return _MockResp(jwks_data)
  459. return _MockResp(discovery_doc)
  460. async def post(self, url, **kwargs):
  461. return _MockResp({"access_token": "mock", "token_type": "Bearer", "id_token": id_token})
  462. with patch("backend.app.api.routes.mfa.httpx.AsyncClient", _MockHttpxClientEV):
  463. await async_client.get(
  464. f"/api/v1/auth/oidc/callback?code=test-code&state={state}",
  465. follow_redirects=False,
  466. )
  467. # Callback must NOT link to the existing_email_user — a new user is created
  468. # instead (because the email claim was ignored due to email_verified=False).
  469. # Either a new user is provisioned (redirect with oidc_token) or the callback
  470. # fails. In either case, the existing user must not have an OIDC link.
  471. from sqlalchemy import select as sa_select
  472. from backend.app.models.oidc_provider import UserOIDCLink
  473. link_result = await db_session.execute(
  474. sa_select(UserOIDCLink)
  475. .join(User, UserOIDCLink.user_id == User.id)
  476. .where(User.email == "existing@example.com")
  477. )
  478. link = link_result.scalar_one_or_none()
  479. assert link is None, "Existing user must not be auto-linked when email_verified is False"
  480. # ===========================================================================
  481. # Gap 5: Email OTP max-attempts invalidation
  482. # ===========================================================================
  483. class TestEmailOTPMaxAttempts:
  484. """After MAX_ATTEMPTS wrong codes, the OTP is permanently invalidated."""
  485. @pytest.mark.asyncio
  486. @pytest.mark.integration
  487. async def test_email_otp_invalidated_after_max_attempts(self, async_client: AsyncClient, db_session: AsyncSession):
  488. from passlib.context import CryptContext
  489. from sqlalchemy import select as sa_select
  490. from backend.app.models.user_otp_code import UserOTPCode
  491. _pwd_ctx = CryptContext(schemes=["pbkdf2_sha256"], deprecated="auto")
  492. admin_token = await _setup_and_login(async_client, "otp_max_admin", "otp_max_admin1")
  493. # Enable email OTP for admin user
  494. result = await db_session.execute(sa_select(User).where(User.username == "otp_max_admin"))
  495. user = result.scalar_one()
  496. user.email = "otpmax@example.com"
  497. await db_session.commit()
  498. setup_code = "123456"
  499. from backend.app.models.auth_ephemeral import AuthEphemeralToken as AET
  500. setup_token = secrets.token_urlsafe(32)
  501. db_session.add(
  502. AET(
  503. token=setup_token,
  504. token_type="email_otp_setup",
  505. username="otp_max_admin",
  506. nonce=_pwd_ctx.hash(setup_code),
  507. expires_at=datetime.now(timezone.utc) + timedelta(minutes=10),
  508. )
  509. )
  510. await db_session.commit()
  511. await async_client.post(
  512. "/api/v1/auth/2fa/email/enable/confirm",
  513. json={"setup_token": setup_token, "code": setup_code},
  514. headers=_auth_header(admin_token),
  515. )
  516. # Login to get pre_auth_token
  517. login_resp = await async_client.post(
  518. LOGIN_URL, json={"username": "otp_max_admin", "password": "Otp_max_admin1"}
  519. )
  520. pre_auth_token = login_resp.json()["pre_auth_token"]
  521. # Insert an OTP record directly (bypassing SMTP)
  522. real_code = "654321"
  523. otp = UserOTPCode(
  524. user_id=user.id,
  525. code_hash=_pwd_ctx.hash(real_code),
  526. attempts=0,
  527. used=False,
  528. expires_at=datetime.now(timezone.utc) + timedelta(minutes=10),
  529. )
  530. db_session.add(otp)
  531. await db_session.commit()
  532. # Submit MAX_ATTEMPTS wrong codes
  533. from backend.app.api.routes.mfa import MAX_2FA_ATTEMPTS
  534. for _ in range(MAX_2FA_ATTEMPTS):
  535. r = await async_client.post(
  536. "/api/v1/auth/2fa/verify",
  537. json={"pre_auth_token": pre_auth_token, "code": "000000", "method": "email"},
  538. )
  539. # Each attempt must fail with 401
  540. assert r.status_code == 401
  541. # After max attempts, the correct code is also rejected (either OTP
  542. # invalidated → 401, or rate limit hit → 429). Either means locked out.
  543. final = await async_client.post(
  544. "/api/v1/auth/2fa/verify",
  545. json={"pre_auth_token": pre_auth_token, "code": real_code, "method": "email"},
  546. )
  547. assert final.status_code in (401, 429), f"Expected lockout, got {final.status_code}: {final.json()}"
  548. # ===========================================================================
  549. # Gap 6: OIDC callback SSRF protection — invalid authorization_endpoint scheme
  550. # ===========================================================================
  551. class TestOIDCSSRFProtection:
  552. """authorization_endpoint with non-http(s) scheme must be rejected."""
  553. @pytest.mark.asyncio
  554. @pytest.mark.integration
  555. async def test_invalid_authorization_endpoint_scheme_rejected(
  556. self, async_client: AsyncClient, db_session: AsyncSession
  557. ):
  558. issuer = "https://idp.ssrf.example.com"
  559. client_id = "ssrf-client"
  560. admin_token = await _setup_and_login(async_client, "ssrf_admin", "ssrf_admin1")
  561. create_resp = await async_client.post(
  562. "/api/v1/auth/oidc/providers",
  563. json={
  564. "name": "SSRF-IdP",
  565. "issuer_url": issuer,
  566. "client_id": client_id,
  567. "client_secret": "secret",
  568. "scopes": "openid",
  569. "is_enabled": True,
  570. "auto_create_users": False,
  571. },
  572. headers=_auth_header(admin_token),
  573. )
  574. assert create_resp.status_code == 201
  575. provider_id = create_resp.json()["id"]
  576. # Discovery doc returns a javascript: authorization_endpoint
  577. malicious_discovery = {
  578. "issuer": issuer,
  579. "authorization_endpoint": "javascript:alert(1)", # <-- malicious
  580. "token_endpoint": f"{issuer}/token",
  581. "jwks_uri": f"{issuer}/.well-known/jwks.json",
  582. }
  583. class _MockResp:
  584. def __init__(self, data):
  585. self._data = data
  586. self.status_code = 200
  587. self.is_success = True
  588. self.text = str(data)
  589. def json(self):
  590. return self._data
  591. def raise_for_status(self):
  592. pass
  593. class _MockHttpxClientSSRF:
  594. def __init__(self, *args, **kwargs):
  595. pass
  596. async def __aenter__(self):
  597. return self
  598. async def __aexit__(self, *_):
  599. pass
  600. async def get(self, url, **kwargs):
  601. return _MockResp(malicious_discovery)
  602. async def post(self, url, **kwargs):
  603. return _MockResp({})
  604. with patch("backend.app.api.routes.mfa.httpx.AsyncClient", _MockHttpxClientSSRF):
  605. # oidc_authorize uses a path parameter, not query param
  606. authorize_resp = await async_client.get(
  607. f"/api/v1/auth/oidc/authorize/{provider_id}",
  608. follow_redirects=False,
  609. )
  610. # Must be rejected with 502 — B2 guard rejects invalid authorization_endpoint scheme
  611. assert authorize_resp.status_code == 502, authorize_resp.json()
  612. detail = authorize_resp.json().get("detail", "").lower()
  613. assert "authorization_endpoint" in detail or "invalid" in detail
  614. # ===========================================================================
  615. # Gap 7: Login rate limiting
  616. # ===========================================================================
  617. class TestLoginRateLimiting:
  618. """10+ failed logins for the same username must return 429."""
  619. @pytest.mark.asyncio
  620. @pytest.mark.integration
  621. async def test_excessive_failed_logins_return_429(self, async_client: AsyncClient):
  622. from backend.app.api.routes.mfa import MAX_LOGIN_ATTEMPTS
  623. # Setup auth but do NOT log in
  624. await async_client.post(
  625. AUTH_SETUP_URL,
  626. json={"auth_enabled": True, "admin_username": "ratelimit_user", "admin_password": "Ratelimit_pw1"},
  627. )
  628. status_codes = []
  629. for _ in range(MAX_LOGIN_ATTEMPTS + 2):
  630. resp = await async_client.post(
  631. LOGIN_URL,
  632. json={"username": "ratelimit_user", "password": "wrong_password"},
  633. )
  634. status_codes.append(resp.status_code)
  635. # The last attempts must be 429 (Too Many Requests)
  636. assert status_codes[-1] == 429, f"Expected 429 after {MAX_LOGIN_ATTEMPTS} failures, got: {status_codes}"
  637. # ===========================================================================
  638. # Gap 8: challenge_id cookie binding
  639. # ===========================================================================
  640. class TestChallengeIdCookieBinding:
  641. """A pre-auth token stolen from session A cannot be used from session B."""
  642. @pytest.mark.asyncio
  643. @pytest.mark.integration
  644. async def test_pre_auth_token_rejected_without_matching_cookie(
  645. self, async_client: AsyncClient, db_session: AsyncSession
  646. ):
  647. import pyotp
  648. from passlib.context import CryptContext
  649. _pwd_ctx = CryptContext(schemes=["pbkdf2_sha256"], deprecated="auto")
  650. # Set up user with TOTP
  651. await _setup_and_login(async_client, "cookie_bind_user", "cookie_bind_pw1")
  652. secret = pyotp.random_base32()
  653. totp_obj = pyotp.TOTP(secret)
  654. from sqlalchemy import select as sa_select
  655. from backend.app.models.user_totp import UserTOTP
  656. result = await db_session.execute(sa_select(User).where(User.username == "cookie_bind_user"))
  657. user = result.scalar_one()
  658. db_session.add(UserTOTP(user_id=user.id, secret=secret, is_enabled=True))
  659. await db_session.commit()
  660. # Login from "session A" — gets a pre_auth_token and a 2fa_challenge cookie
  661. login_resp = await async_client.post(
  662. LOGIN_URL, json={"username": "cookie_bind_user", "password": "Cookie_bind_pw1"}
  663. )
  664. assert login_resp.status_code == 200
  665. assert login_resp.json()["requires_2fa"] is True
  666. pre_auth_token = login_resp.json()["pre_auth_token"]
  667. # The async_client jar now holds the 2fa_challenge cookie for session A
  668. # Simulate session B by creating a new client WITHOUT the cookie
  669. from httpx import ASGITransport, AsyncClient as FreshClient
  670. from backend.app.main import app
  671. async with FreshClient(transport=ASGITransport(app=app), base_url="http://test") as session_b:
  672. # Attempt to use session A's pre_auth_token from session B (no cookie)
  673. verify_resp = await session_b.post(
  674. "/api/v1/auth/2fa/verify",
  675. json={
  676. "pre_auth_token": pre_auth_token,
  677. "code": totp_obj.now(),
  678. "method": "totp",
  679. },
  680. )
  681. # Must be rejected — pre_auth_token is bound to session A's cookie
  682. assert verify_resp.status_code == 401, (
  683. f"Expected 401 for token replay from cookieless session, got {verify_resp.status_code}: "
  684. f"{verify_resp.json()}"
  685. )
  686. # ===========================================================================
  687. # C2: Security-header middleware
  688. # ===========================================================================
  689. class TestSecurityHeaders:
  690. """Every HTTP response must include standard security headers (C2)."""
  691. @pytest.mark.asyncio
  692. @pytest.mark.integration
  693. async def test_security_headers_present(self, async_client: AsyncClient):
  694. """GET /api/v1/auth/me (unauthenticated → 401) still carries security headers."""
  695. resp = await async_client.get(ME_URL)
  696. assert resp.status_code == 401 # sanity — no auth token
  697. assert resp.headers.get("x-content-type-options") == "nosniff"
  698. assert resp.headers.get("x-frame-options") == "SAMEORIGIN"
  699. assert resp.headers.get("referrer-policy") == "strict-origin-when-cross-origin"
  700. csp = resp.headers.get("content-security-policy", "")
  701. assert "default-src 'self'" in csp
  702. assert "script-src 'self'" in csp
  703. assert "frame-ancestors 'none'" in csp
  704. assert "object-src 'none'" in csp
  705. @pytest.mark.asyncio
  706. @pytest.mark.integration
  707. async def test_hsts_absent_for_http(self, async_client: AsyncClient):
  708. """HSTS must NOT be set over plain HTTP (test transport uses http)."""
  709. resp = await async_client.get(ME_URL)
  710. assert "strict-transport-security" not in resp.headers
  711. # ===========================================================================
  712. # I3: Rate-limit bucket interaction — IP spray vs. username spray
  713. # ===========================================================================
  714. class TestRateLimitBuckets:
  715. """IP-spray and username-spray must each trip the correct independent bucket."""
  716. @pytest.mark.asyncio
  717. @pytest.mark.integration
  718. async def test_ip_spray_trips_ip_bucket(self, async_client: AsyncClient):
  719. """20 failed logins from one IP across 20 different usernames trips the IP bucket.
  720. Each per-username bucket only has 1 failure (well below MAX_LOGIN_ATTEMPTS=10),
  721. so the username bucket is never the reason for the 429.
  722. """
  723. from unittest.mock import patch as _patch
  724. unique_ip = "10.99.1.1"
  725. # Ensure auth is enabled
  726. await async_client.post(
  727. AUTH_SETUP_URL,
  728. json={"auth_enabled": True, "admin_username": "spray_ip_admin", "admin_password": "SprayIp_admin1"},
  729. )
  730. status_codes: list[int] = []
  731. with _patch("backend.app.api.routes.auth._get_client_ip", return_value=unique_ip):
  732. for i in range(22):
  733. resp = await async_client.post(
  734. LOGIN_URL,
  735. json={"username": f"spray_ip_victim_{i}", "password": "wrong"},
  736. )
  737. status_codes.append(resp.status_code)
  738. # The first 20 attempts fail with 401; the 21st+ must be 429 (IP bucket full)
  739. assert status_codes[-1] == 429, f"Expected 429 after 20 IP-spray failures, got: {status_codes}"
  740. # No single username saw more than one attempt → username buckets not tripped
  741. non_429 = [c for c in status_codes[:-2] if c == 429]
  742. assert not non_429, f"Username bucket triggered early: {status_codes}"
  743. @pytest.mark.asyncio
  744. @pytest.mark.integration
  745. async def test_username_spray_trips_username_bucket(self, async_client: AsyncClient):
  746. """One username targeted from 10+ different IPs trips the username bucket.
  747. Each per-IP bucket only sees 1 failure, so no IP bucket is tripped.
  748. The username bucket (max 10) is what fires the 429.
  749. """
  750. from unittest.mock import patch as _patch
  751. from backend.app.api.routes.mfa import MAX_LOGIN_ATTEMPTS
  752. # Ensure auth is enabled
  753. await async_client.post(
  754. AUTH_SETUP_URL,
  755. json={
  756. "auth_enabled": True,
  757. "admin_username": "spray_uname_admin",
  758. "admin_password": "SprayUname_admin1",
  759. },
  760. )
  761. target_username = "spray_uname_victim"
  762. status_codes: list[int] = []
  763. for i in range(MAX_LOGIN_ATTEMPTS + 2):
  764. rotating_ip = f"10.99.2.{i + 1}"
  765. with _patch("backend.app.api.routes.auth._get_client_ip", return_value=rotating_ip):
  766. resp = await async_client.post(
  767. LOGIN_URL,
  768. json={"username": target_username, "password": "wrong"},
  769. )
  770. status_codes.append(resp.status_code)
  771. # After MAX_LOGIN_ATTEMPTS failures for same username the bucket fires
  772. assert status_codes[-1] == 429, (
  773. f"Expected 429 after {MAX_LOGIN_ATTEMPTS} username-spray failures, got: {status_codes}"
  774. )
  775. # ============================================================================
  776. # TestEncryptLegacyMigration
  777. # ============================================================================
  778. class TestEncryptLegacyMigration:
  779. """Re-encryption migration of legacy plaintext OIDC + TOTP rows.
  780. The migration runs against its own ``async_session`` factory (not the
  781. ``db_session`` fixture) so each test patches the module-level factory to
  782. point at the test-engine before invoking the helper. ``db_session`` is
  783. used to seed and to verify state via the same engine.
  784. """
  785. @staticmethod
  786. def _patch_module_session(monkeypatch, db_session):
  787. """Bind ``database.async_session`` to the test engine for one test."""
  788. from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
  789. from backend.app.core import database as db_mod
  790. test_factory = async_sessionmaker(db_session.bind, class_=AsyncSession, expire_on_commit=False)
  791. monkeypatch.setattr(db_mod, "async_session", test_factory)
  792. @staticmethod
  793. def _set_active_key(monkeypatch):
  794. """Configure a valid Fernet key for the migration to use."""
  795. from cryptography.fernet import Fernet
  796. import backend.app.core.encryption as enc_mod
  797. monkeypatch.setenv("MFA_ENCRYPTION_KEY", Fernet.generate_key().decode())
  798. enc_mod._fernet_instance = None
  799. @pytest.mark.asyncio
  800. @pytest.mark.integration
  801. async def test_migration_encrypts_plaintext_oidc_secret(self, db_session, monkeypatch):
  802. from sqlalchemy import select
  803. from backend.app.core.database import _migrate_encrypt_legacy_secrets
  804. from backend.app.models.oidc_provider import OIDCProvider
  805. self._patch_module_session(monkeypatch, db_session)
  806. self._set_active_key(monkeypatch)
  807. provider = OIDCProvider(
  808. name="LegacyProv",
  809. issuer_url="https://legacy.example.com",
  810. client_id="cid",
  811. _client_secret_enc="legacy-plaintext",
  812. scopes="openid email profile",
  813. is_enabled=True,
  814. )
  815. db_session.add(provider)
  816. await db_session.commit()
  817. await _migrate_encrypt_legacy_secrets()
  818. # Re-fetch on a fresh row state
  819. await db_session.refresh(provider)
  820. assert provider._client_secret_enc.startswith("fernet:")
  821. # Decrypted value matches the original plaintext
  822. assert provider.client_secret == "legacy-plaintext"
  823. # Sanity: a SELECT also sees the encrypted value
  824. result = await db_session.execute(select(OIDCProvider).where(OIDCProvider.id == provider.id))
  825. fetched = result.scalar_one()
  826. assert fetched._client_secret_enc.startswith("fernet:")
  827. @pytest.mark.asyncio
  828. @pytest.mark.integration
  829. async def test_migration_skips_already_encrypted_rows(self, db_session, monkeypatch):
  830. from backend.app.core.database import _migrate_encrypt_legacy_secrets
  831. from backend.app.models.oidc_provider import OIDCProvider
  832. self._patch_module_session(monkeypatch, db_session)
  833. self._set_active_key(monkeypatch)
  834. # Use the property setter so the value is encrypted up front.
  835. provider = OIDCProvider(
  836. name="EncProv",
  837. issuer_url="https://enc.example.com",
  838. client_id="cid",
  839. client_secret="already-encrypted",
  840. scopes="openid email profile",
  841. is_enabled=True,
  842. )
  843. db_session.add(provider)
  844. await db_session.commit()
  845. original_enc = provider._client_secret_enc
  846. await _migrate_encrypt_legacy_secrets()
  847. await _migrate_encrypt_legacy_secrets() # idempotent
  848. await db_session.refresh(provider)
  849. # Value unchanged across two migration runs (still the same ciphertext).
  850. assert provider._client_secret_enc == original_enc
  851. @pytest.mark.asyncio
  852. @pytest.mark.integration
  853. async def test_migration_no_op_when_key_unset(self, db_session, monkeypatch):
  854. import backend.app.core.encryption as enc_mod
  855. from backend.app.core.database import _migrate_encrypt_legacy_secrets
  856. from backend.app.models.oidc_provider import OIDCProvider
  857. self._patch_module_session(monkeypatch, db_session)
  858. # Force "no key" branch
  859. monkeypatch.setattr(enc_mod, "_load_or_generate_key", lambda: (None, "none"))
  860. enc_mod._fernet_instance = None
  861. provider = OIDCProvider(
  862. name="NoKeyProv",
  863. issuer_url="https://nokey.example.com",
  864. client_id="cid",
  865. _client_secret_enc="still-plaintext",
  866. scopes="openid email profile",
  867. is_enabled=True,
  868. )
  869. db_session.add(provider)
  870. await db_session.commit()
  871. await _migrate_encrypt_legacy_secrets()
  872. await db_session.refresh(provider)
  873. # Migration should have early-returned; plaintext untouched.
  874. assert provider._client_secret_enc == "still-plaintext"
  875. @pytest.mark.asyncio
  876. @pytest.mark.integration
  877. async def test_migration_handles_mixed_state(self, db_session, monkeypatch):
  878. from backend.app.core.database import _migrate_encrypt_legacy_secrets
  879. from backend.app.models.oidc_provider import OIDCProvider
  880. self._patch_module_session(monkeypatch, db_session)
  881. self._set_active_key(monkeypatch)
  882. legacy = OIDCProvider(
  883. name="LegacyMix",
  884. issuer_url="https://l.example.com",
  885. client_id="c1",
  886. _client_secret_enc="plain-mix",
  887. scopes="openid email profile",
  888. )
  889. encrypted = OIDCProvider(
  890. name="EncMix",
  891. issuer_url="https://e.example.com",
  892. client_id="c2",
  893. client_secret="encrypted-mix", # uses setter
  894. scopes="openid email profile",
  895. )
  896. db_session.add_all([legacy, encrypted])
  897. await db_session.commit()
  898. original_encrypted = encrypted._client_secret_enc
  899. await _migrate_encrypt_legacy_secrets()
  900. await db_session.refresh(legacy)
  901. await db_session.refresh(encrypted)
  902. assert legacy._client_secret_enc.startswith("fernet:")
  903. assert legacy.client_secret == "plain-mix"
  904. assert encrypted._client_secret_enc == original_encrypted
  905. @pytest.mark.asyncio
  906. @pytest.mark.integration
  907. async def test_migration_encrypts_plaintext_totp_secret(self, db_session, monkeypatch):
  908. from backend.app.core.database import _migrate_encrypt_legacy_secrets
  909. from backend.app.models.user import User
  910. from backend.app.models.user_totp import UserTOTP
  911. self._patch_module_session(monkeypatch, db_session)
  912. self._set_active_key(monkeypatch)
  913. user = User(username="totpuser1219", email="t@example.com", password_hash="x")
  914. db_session.add(user)
  915. await db_session.flush()
  916. totp = UserTOTP(user_id=user.id, _secret_enc="JBSWY3DPEHPK3PXP", is_enabled=True)
  917. db_session.add(totp)
  918. await db_session.commit()
  919. await _migrate_encrypt_legacy_secrets()
  920. await db_session.refresh(totp)
  921. assert totp._secret_enc.startswith("fernet:")
  922. assert totp.secret == "JBSWY3DPEHPK3PXP"
  923. @pytest.mark.asyncio
  924. @pytest.mark.integration
  925. async def test_migration_logs_count_of_rows_re_encrypted(self, db_session, monkeypatch, caplog):
  926. import logging
  927. from backend.app.core.database import _migrate_encrypt_legacy_secrets
  928. from backend.app.models.oidc_provider import OIDCProvider
  929. from backend.app.models.user import User
  930. from backend.app.models.user_totp import UserTOTP
  931. self._patch_module_session(monkeypatch, db_session)
  932. self._set_active_key(monkeypatch)
  933. provider = OIDCProvider(
  934. name="LegacyLog",
  935. issuer_url="https://log.example.com",
  936. client_id="c",
  937. _client_secret_enc="p",
  938. scopes="openid email profile",
  939. )
  940. user = User(username="logger1219", email="l@example.com", password_hash="x")
  941. db_session.add_all([provider, user])
  942. await db_session.flush()
  943. totp = UserTOTP(user_id=user.id, _secret_enc="JBSWY3DPEHPK3PXP", is_enabled=True)
  944. db_session.add(totp)
  945. await db_session.commit()
  946. with caplog.at_level(logging.INFO, logger="backend.app.core.database"):
  947. await _migrate_encrypt_legacy_secrets()
  948. # The migration logs once with both counts.
  949. assert any(
  950. "Re-encrypted legacy plaintext secrets" in rec.message
  951. and "1 OIDC client_secret(s)" in rec.message
  952. and "1 TOTP secret(s)" in rec.message
  953. for rec in caplog.records
  954. )
  955. @pytest.mark.asyncio
  956. @pytest.mark.integration
  957. async def test_migration_continues_on_row_error(self, db_session, monkeypatch, caplog):
  958. """B2: per-row commit semantics — when one row fails to re-encrypt,
  959. OTHER successfully-encrypted rows must remain committed and the
  960. failure surfaces via get_migration_error_count.
  961. Replaces the previous "rollback all" behaviour: a single poison row
  962. used to block every successful re-encryption on every startup forever.
  963. """
  964. import logging
  965. import backend.app.core.encryption as enc_mod # noqa: F401
  966. from backend.app.core.database import (
  967. _migrate_encrypt_legacy_secrets,
  968. get_migration_error_count,
  969. )
  970. from backend.app.models.oidc_provider import OIDCProvider
  971. self._patch_module_session(monkeypatch, db_session)
  972. self._set_active_key(monkeypatch)
  973. good = OIDCProvider(
  974. name="GoodRow",
  975. issuer_url="https://good.example.com",
  976. client_id="c1",
  977. _client_secret_enc="plaintext-good",
  978. scopes="openid email profile",
  979. )
  980. bad = OIDCProvider(
  981. name="BadRow",
  982. issuer_url="https://bad.example.com",
  983. client_id="c2",
  984. _client_secret_enc="plaintext-bad",
  985. scopes="openid email profile",
  986. )
  987. db_session.add_all([good, bad])
  988. await db_session.commit()
  989. original_bad = bad._client_secret_enc
  990. # Force the setter on the SECOND row to raise — patch at the model's
  991. # import location so the property setter picks up the patched function.
  992. import backend.app.models.oidc_provider as oidc_mod
  993. real_encrypt = oidc_mod.mfa_encrypt
  994. call_count = [0]
  995. def _sometimes_raise(value):
  996. call_count[0] += 1
  997. if call_count[0] == 2:
  998. raise RuntimeError("simulated encrypt failure")
  999. return real_encrypt(value)
  1000. monkeypatch.setattr(oidc_mod, "mfa_encrypt", _sometimes_raise)
  1001. with caplog.at_level(logging.ERROR, logger="backend.app.core.database"):
  1002. await _migrate_encrypt_legacy_secrets()
  1003. # B2: per-row commit — good IS encrypted, bad is unchanged.
  1004. await db_session.refresh(good)
  1005. await db_session.refresh(bad)
  1006. assert good._client_secret_enc.startswith("fernet:"), (
  1007. "good row must be successfully re-encrypted (per-row commit)"
  1008. )
  1009. assert bad._client_secret_enc == original_bad, "bad row must remain unchanged (savepoint-style isolation)"
  1010. assert get_migration_error_count() == 1, "the skipped row must be exposed via get_migration_error_count"
  1011. assert any("skipping" in rec.message.lower() for rec in caplog.records)
  1012. @pytest.mark.asyncio
  1013. @pytest.mark.integration
  1014. async def test_migration_logs_no_op_when_all_encrypted(self, db_session, monkeypatch, caplog):
  1015. """A2: when all rows are already encrypted, migration logs a debug no-op."""
  1016. import logging
  1017. from backend.app.core.database import _migrate_encrypt_legacy_secrets
  1018. from backend.app.models.oidc_provider import OIDCProvider
  1019. self._patch_module_session(monkeypatch, db_session)
  1020. self._set_active_key(monkeypatch)
  1021. provider = OIDCProvider(
  1022. name="AlreadyEnc",
  1023. issuer_url="https://ae.example.com",
  1024. client_id="cae",
  1025. client_secret="already-encrypted",
  1026. scopes="openid email profile",
  1027. )
  1028. db_session.add(provider)
  1029. await db_session.commit()
  1030. with caplog.at_level(logging.DEBUG, logger="backend.app.core.database"):
  1031. await _migrate_encrypt_legacy_secrets()
  1032. assert any("no rows needed re-encryption" in rec.message for rec in caplog.records)
  1033. @pytest.mark.asyncio
  1034. @pytest.mark.integration
  1035. async def test_init_db_propagates_unexpected_migration_error(self, monkeypatch, tmp_path):
  1036. """B3: an unexpected error from _migrate_encrypt_legacy_secrets must
  1037. surface (re-raise) instead of being silently swallowed.
  1038. Pins the contract introduced for B3: a startup-fatal error like a
  1039. session-creation failure must fail the lifespan / CLI / restore
  1040. handler explicitly, never run the app with half-migrated rows.
  1041. Implementation note: we patch _migrate_encrypt_legacy_secrets itself
  1042. rather than poking the inner read phase, because that is the contract
  1043. boundary the rest of the codebase relies on (init_db -> migration).
  1044. """
  1045. from sqlalchemy import event
  1046. from sqlalchemy.ext.asyncio import create_async_engine
  1047. import backend.app.core.database as db_mod
  1048. from backend.app.core.config import settings
  1049. # init_db() uses the module-level `engine`, which was bound at import
  1050. # time to settings.database_url — that resolves to the real shared
  1051. # bambuddy.db at the project root (or, when DATABASE_URL is set, the
  1052. # configured Postgres). The autouse DATA_DIR fixture runs too late to
  1053. # influence either. Letting this test write to that real DB makes it
  1054. # (a) non-hermetic and (b) flake under `-n 30` with "database is
  1055. # locked" when two workers race on the file. Substitute an isolated
  1056. # per-test SQLite engine — and override settings.database_url for
  1057. # this test so the is_sqlite() / is_postgres() dialect guards inside
  1058. # run_migrations pick the SQLite path against this engine.
  1059. test_db_url = f"sqlite+aiosqlite:///{tmp_path / 'init_db_test.db'}"
  1060. test_engine = create_async_engine(test_db_url, echo=False)
  1061. event.listen(test_engine.sync_engine, "connect", db_mod._set_sqlite_pragmas)
  1062. monkeypatch.setattr(db_mod, "engine", test_engine)
  1063. monkeypatch.setattr(settings, "database_url", test_db_url)
  1064. async def boom():
  1065. raise RuntimeError("simulated startup-fatal failure")
  1066. # Stub out the rest of init_db so we exercise only the migration step.
  1067. # init_db opens the engine.begin() block, runs metadata.create_all,
  1068. # run_migrations, then awaits _migrate_encrypt_legacy_secrets — the
  1069. # only call we want to fail.
  1070. monkeypatch.setattr(db_mod, "_migrate_encrypt_legacy_secrets", boom)
  1071. monkeypatch.setattr(db_mod, "seed_notification_templates", lambda: _noop_async())
  1072. monkeypatch.setattr(db_mod, "seed_default_groups", lambda: _noop_async())
  1073. monkeypatch.setattr(db_mod, "seed_spool_catalog", lambda: _noop_async())
  1074. monkeypatch.setattr(db_mod, "seed_color_catalog", lambda: _noop_async())
  1075. try:
  1076. with pytest.raises(RuntimeError, match="simulated startup-fatal failure"):
  1077. await db_mod.init_db()
  1078. finally:
  1079. await test_engine.dispose()
  1080. async def _noop_async():
  1081. """Helper for tests that need to stub out `seed_*` async coroutines."""
  1082. return None
  1083. # ============================================================================
  1084. # TestEncryptionStatusEndpoint
  1085. # ============================================================================
  1086. class TestEncryptionStatusEndpoint:
  1087. """GET /api/v1/auth/encryption-status: key source, counts, decryption_broken."""
  1088. STATUS_URL = "/api/v1/auth/encryption-status"
  1089. async def _create_admin_and_login(self, async_client: AsyncClient) -> str:
  1090. """Bootstrap auth + return a Bearer token for an admin."""
  1091. await async_client.post(
  1092. "/api/v1/auth/setup",
  1093. json={
  1094. "auth_enabled": True,
  1095. "admin_username": "admin1219",
  1096. "admin_password": "Admin1219!Pass",
  1097. },
  1098. )
  1099. login = await async_client.post(
  1100. "/api/v1/auth/login",
  1101. json={"username": "admin1219", "password": "Admin1219!Pass"},
  1102. )
  1103. assert login.status_code == 200, login.text
  1104. return login.json()["access_token"]
  1105. @pytest.mark.asyncio
  1106. @pytest.mark.integration
  1107. async def test_status_reports_env_source(self, async_client, monkeypatch):
  1108. from cryptography.fernet import Fernet
  1109. import backend.app.core.encryption as enc_mod
  1110. token = await self._create_admin_and_login(async_client)
  1111. monkeypatch.setenv("MFA_ENCRYPTION_KEY", Fernet.generate_key().decode())
  1112. enc_mod._fernet_instance = None
  1113. enc_mod._key_source = None
  1114. resp = await async_client.get(self.STATUS_URL, headers={"Authorization": f"Bearer {token}"})
  1115. assert resp.status_code == 200
  1116. data = resp.json()
  1117. assert data["key_configured"] is True
  1118. assert data["key_source"] == "env"
  1119. assert data["decryption_broken"] is False
  1120. @pytest.mark.asyncio
  1121. @pytest.mark.integration
  1122. async def test_status_reports_file_source(self, async_client, monkeypatch, tmp_path):
  1123. from cryptography.fernet import Fernet
  1124. import backend.app.core.encryption as enc_mod
  1125. token = await self._create_admin_and_login(async_client)
  1126. # Pre-place a valid key file in DATA_DIR.
  1127. key_file = tmp_path / ".mfa_encryption_key"
  1128. key_file.write_text(Fernet.generate_key().decode())
  1129. monkeypatch.setenv("DATA_DIR", str(tmp_path))
  1130. monkeypatch.delenv("MFA_ENCRYPTION_KEY", raising=False)
  1131. enc_mod._fernet_instance = None
  1132. enc_mod._key_source = None
  1133. resp = await async_client.get(self.STATUS_URL, headers={"Authorization": f"Bearer {token}"})
  1134. assert resp.status_code == 200
  1135. data = resp.json()
  1136. assert data["key_source"] == "file"
  1137. @pytest.mark.asyncio
  1138. @pytest.mark.integration
  1139. async def test_status_reports_generated_source(self, async_client, monkeypatch, tmp_path):
  1140. import backend.app.core.encryption as enc_mod
  1141. token = await self._create_admin_and_login(async_client)
  1142. monkeypatch.setenv("DATA_DIR", str(tmp_path))
  1143. monkeypatch.delenv("MFA_ENCRYPTION_KEY", raising=False)
  1144. enc_mod._fernet_instance = None
  1145. enc_mod._key_source = None
  1146. resp = await async_client.get(self.STATUS_URL, headers={"Authorization": f"Bearer {token}"})
  1147. assert resp.status_code == 200
  1148. data = resp.json()
  1149. assert data["key_source"] == "generated"
  1150. assert (tmp_path / ".mfa_encryption_key").exists()
  1151. @pytest.mark.asyncio
  1152. @pytest.mark.integration
  1153. async def test_status_reports_none_source(self, async_client, monkeypatch):
  1154. import backend.app.core.encryption as enc_mod
  1155. token = await self._create_admin_and_login(async_client)
  1156. monkeypatch.setattr(enc_mod, "_load_or_generate_key", lambda: (None, "none"))
  1157. enc_mod._fernet_instance = None
  1158. enc_mod._key_source = None
  1159. resp = await async_client.get(self.STATUS_URL, headers={"Authorization": f"Bearer {token}"})
  1160. assert resp.status_code == 200
  1161. data = resp.json()
  1162. assert data["key_configured"] is False
  1163. assert data["key_source"] == "none"
  1164. @pytest.mark.asyncio
  1165. @pytest.mark.integration
  1166. async def test_status_counts_legacy_rows(self, async_client, db_session, monkeypatch):
  1167. from backend.app.models.oidc_provider import OIDCProvider
  1168. token = await self._create_admin_and_login(async_client)
  1169. provider = OIDCProvider(
  1170. name="LegacyStatus",
  1171. issuer_url="https://ls.example.com",
  1172. client_id="c",
  1173. _client_secret_enc="plaintext-no-prefix",
  1174. scopes="openid email profile",
  1175. )
  1176. db_session.add(provider)
  1177. await db_session.commit()
  1178. resp = await async_client.get(self.STATUS_URL, headers={"Authorization": f"Bearer {token}"})
  1179. assert resp.status_code == 200
  1180. data = resp.json()
  1181. assert data["legacy_plaintext_rows"]["oidc_providers"] >= 1
  1182. @pytest.mark.asyncio
  1183. @pytest.mark.integration
  1184. async def test_status_counts_encrypted_rows(self, async_client, db_session, monkeypatch):
  1185. from cryptography.fernet import Fernet
  1186. import backend.app.core.encryption as enc_mod
  1187. from backend.app.models.oidc_provider import OIDCProvider
  1188. token = await self._create_admin_and_login(async_client)
  1189. monkeypatch.setenv("MFA_ENCRYPTION_KEY", Fernet.generate_key().decode())
  1190. enc_mod._fernet_instance = None
  1191. enc_mod._key_source = None
  1192. provider = OIDCProvider(
  1193. name="EncStatus",
  1194. issuer_url="https://es.example.com",
  1195. client_id="c",
  1196. client_secret="real-secret", # via setter → encrypted
  1197. scopes="openid email profile",
  1198. )
  1199. db_session.add(provider)
  1200. await db_session.commit()
  1201. resp = await async_client.get(self.STATUS_URL, headers={"Authorization": f"Bearer {token}"})
  1202. assert resp.status_code == 200
  1203. data = resp.json()
  1204. assert data["encrypted_rows"]["oidc_providers"] >= 1
  1205. @pytest.mark.asyncio
  1206. @pytest.mark.integration
  1207. async def test_status_warns_on_encrypted_rows_without_key(self, async_client, db_session, monkeypatch):
  1208. """Gap 2: encrypted rows present but no key loadable → decryption_broken=true."""
  1209. import backend.app.core.encryption as enc_mod
  1210. from backend.app.models.oidc_provider import OIDCProvider
  1211. token = await self._create_admin_and_login(async_client)
  1212. # Insert a row whose value is already prefixed (simulates a previously-encrypted row).
  1213. provider = OIDCProvider(
  1214. name="BrokenEnc",
  1215. issuer_url="https://be.example.com",
  1216. client_id="c",
  1217. _client_secret_enc="fernet:gAAAAA-fake-but-prefixed",
  1218. scopes="openid email profile",
  1219. )
  1220. db_session.add(provider)
  1221. await db_session.commit()
  1222. # Now disable key loading so decryption is impossible.
  1223. monkeypatch.setattr(enc_mod, "_load_or_generate_key", lambda: (None, "none"))
  1224. enc_mod._fernet_instance = None
  1225. enc_mod._key_source = None
  1226. resp = await async_client.get(self.STATUS_URL, headers={"Authorization": f"Bearer {token}"})
  1227. assert resp.status_code == 200
  1228. data = resp.json()
  1229. assert data["key_configured"] is False
  1230. assert data["encrypted_rows"]["oidc_providers"] >= 1
  1231. assert data["decryption_broken"] is True
  1232. @pytest.mark.asyncio
  1233. @pytest.mark.integration
  1234. async def test_status_requires_settings_read_permission(self, async_client, db_session):
  1235. """Non-admin without settings:read permission gets 403."""
  1236. from backend.app.models.user import User
  1237. await self._create_admin_and_login(async_client)
  1238. # Create a low-privilege user (no group → no permissions in default seed).
  1239. from backend.app.core.auth import get_password_hash
  1240. viewer = User(
  1241. username="viewer1219",
  1242. email="viewer1219@example.com",
  1243. password_hash=get_password_hash("Viewer1219!Pass"),
  1244. role="user",
  1245. is_active=True,
  1246. )
  1247. db_session.add(viewer)
  1248. await db_session.commit()
  1249. login = await async_client.post(
  1250. "/api/v1/auth/login",
  1251. json={"username": "viewer1219", "password": "Viewer1219!Pass"},
  1252. )
  1253. assert login.status_code == 200, login.text
  1254. token = login.json().get("access_token")
  1255. assert token is not None, f"Expected access_token in login response, got: {login.json()}"
  1256. resp = await async_client.get(self.STATUS_URL, headers={"Authorization": f"Bearer {token}"})
  1257. assert resp.status_code == 403
  1258. @pytest.mark.asyncio
  1259. @pytest.mark.integration
  1260. async def test_status_returns_500_on_db_error(self, async_client, monkeypatch):
  1261. """A8: SQLAlchemyError during count queries → 500 with static message."""
  1262. from unittest.mock import AsyncMock
  1263. from sqlalchemy.exc import SQLAlchemyError
  1264. token = await self._create_admin_and_login(async_client)
  1265. async def _raise(*args, **kwargs):
  1266. raise SQLAlchemyError("simulated DB failure")
  1267. monkeypatch.setattr("sqlalchemy.ext.asyncio.AsyncSession.execute", AsyncMock(side_effect=_raise))
  1268. resp = await async_client.get(self.STATUS_URL, headers={"Authorization": f"Bearer {token}"})
  1269. assert resp.status_code == 500
  1270. assert "encryption status" in resp.json().get("detail", "").lower()
  1271. @pytest.mark.asyncio
  1272. @pytest.mark.integration
  1273. async def test_status_returns_403_for_viewer_in_viewers_group(self, async_client, db_session):
  1274. """S2: a user in the Viewers group (has SETTINGS_READ but NOT SETTINGS_UPDATE)
  1275. must get 403 — encryption-status is admin/operator only.
  1276. """
  1277. from sqlalchemy import insert, select
  1278. from backend.app.core.auth import get_password_hash
  1279. from backend.app.models.group import Group, user_groups
  1280. from backend.app.models.user import User
  1281. # Bootstrap auth (creates default groups via setup endpoint).
  1282. await self._create_admin_and_login(async_client)
  1283. # Create a user explicitly in the Viewers group — it has SETTINGS_READ
  1284. # but not SETTINGS_UPDATE, which is the discriminator for S2.
  1285. viewer = User(
  1286. username="viewer_s2",
  1287. email="viewer_s2@example.com",
  1288. password_hash=get_password_hash("ViewerS2!Pass1"),
  1289. role="user",
  1290. is_active=True,
  1291. )
  1292. db_session.add(viewer)
  1293. await db_session.flush()
  1294. viewers_group = (await db_session.execute(select(Group).where(Group.name == "Viewers"))).scalar_one_or_none()
  1295. assert viewers_group is not None, "Viewers group must be seeded by setup"
  1296. # Insert the association row directly to avoid touching the lazy
  1297. # `viewer.groups` relationship (which would trigger an implicit
  1298. # IO inside an active async transaction and fail with MissingGreenlet).
  1299. await db_session.execute(insert(user_groups).values(user_id=viewer.id, group_id=viewers_group.id))
  1300. await db_session.commit()
  1301. login = await async_client.post(
  1302. "/api/v1/auth/login",
  1303. json={"username": "viewer_s2", "password": "ViewerS2!Pass1"},
  1304. )
  1305. assert login.status_code == 200, login.text
  1306. token = login.json()["access_token"]
  1307. resp = await async_client.get(self.STATUS_URL, headers={"Authorization": f"Bearer {token}"})
  1308. assert resp.status_code == 403, "S2: Viewers (SETTINGS_READ only) must NOT be able to read encryption-status"
  1309. @pytest.mark.asyncio
  1310. @pytest.mark.integration
  1311. async def test_status_decryption_broken_when_wrong_key_active(self, async_client, db_session, monkeypatch):
  1312. """B4: key is configured but cannot decrypt existing rows → decryption_broken=True.
  1313. This is the "wrong key" state that the legacy computed_field check
  1314. missed — operator pasted a different valid Fernet key (rotation,
  1315. cross-deployment restore, env override). Status used to show GREEN
  1316. while every encrypted row was unrecoverable.
  1317. """
  1318. from cryptography.fernet import Fernet
  1319. import backend.app.core.encryption as enc_mod
  1320. from backend.app.models.oidc_provider import OIDCProvider
  1321. token = await self._create_admin_and_login(async_client)
  1322. # Insert a row whose value is fernet-prefixed but encrypted under a
  1323. # DIFFERENT key (the prefix matches, but decrypt will throw).
  1324. provider = OIDCProvider(
  1325. name="WrongKeyEnc",
  1326. issuer_url="https://wk.example.com",
  1327. client_id="c",
  1328. _client_secret_enc=("fernet:" + Fernet(Fernet.generate_key()).encrypt(b"original").decode()),
  1329. scopes="openid email profile",
  1330. )
  1331. db_session.add(provider)
  1332. await db_session.commit()
  1333. # Now activate a DIFFERENT key — sample-decrypt must fail.
  1334. monkeypatch.setenv("MFA_ENCRYPTION_KEY", Fernet.generate_key().decode())
  1335. enc_mod._fernet_instance = None
  1336. enc_mod._key_source = None
  1337. resp = await async_client.get(self.STATUS_URL, headers={"Authorization": f"Bearer {token}"})
  1338. assert resp.status_code == 200, resp.text
  1339. data = resp.json()
  1340. assert data["key_configured"] is True, "different key is still 'configured'"
  1341. assert data["encrypted_rows"]["oidc_providers"] >= 1
  1342. assert data["decryption_broken"] is True, "B4: sample-decrypt must detect wrong-key state"
  1343. @pytest.mark.asyncio
  1344. @pytest.mark.integration
  1345. async def test_status_decryption_broken_with_only_totp_rows(self, async_client, db_session, monkeypatch):
  1346. """B4: the sample-decrypt fallback to UserTOTP fires when there are no
  1347. encrypted OIDC rows but TOTP rows exist. The OIDC-only test above
  1348. proves the primary path; this pins the second branch in the same
  1349. try-block so a future refactor of the row-source switch can't silently
  1350. regress wrong-key detection for TOTP-only deployments.
  1351. """
  1352. from cryptography.fernet import Fernet
  1353. from sqlalchemy import select
  1354. import backend.app.core.encryption as enc_mod
  1355. from backend.app.models.user import User
  1356. from backend.app.models.user_totp import UserTOTP
  1357. token = await self._create_admin_and_login(async_client)
  1358. # Look up the admin user created by login so we can attach a TOTP row.
  1359. admin_row = await db_session.execute(select(User).where(User.username == "admin1219"))
  1360. admin = admin_row.scalar_one()
  1361. # Seed a UserTOTP row encrypted under key A. No OIDC rows exist, so
  1362. # the endpoint's first branch (oidc_providers > 0) misses and the
  1363. # sample falls through to UserTOTP.
  1364. key_a_ciphertext = Fernet(Fernet.generate_key()).encrypt(b"original-totp-secret").decode()
  1365. db_session.add(UserTOTP(user_id=admin.id, _secret_enc=f"fernet:{key_a_ciphertext}", is_enabled=True))
  1366. await db_session.commit()
  1367. # Activate a DIFFERENT key — the TOTP-fallback sample-decrypt must fail.
  1368. monkeypatch.setenv("MFA_ENCRYPTION_KEY", Fernet.generate_key().decode())
  1369. enc_mod._fernet_instance = None
  1370. enc_mod._key_source = None
  1371. resp = await async_client.get(self.STATUS_URL, headers={"Authorization": f"Bearer {token}"})
  1372. assert resp.status_code == 200, resp.text
  1373. data = resp.json()
  1374. assert data["key_configured"] is True
  1375. assert data["encrypted_rows"]["oidc_providers"] == 0, "test premise: no OIDC rows so TOTP branch fires"
  1376. assert data["encrypted_rows"]["user_totp"] >= 1
  1377. assert data["decryption_broken"] is True, "B4: TOTP-fallback sample-decrypt must detect wrong-key state"
  1378. @pytest.mark.asyncio
  1379. @pytest.mark.integration
  1380. async def test_status_surfaces_real_migration_error_count(self, async_client, db_session, monkeypatch, caplog):
  1381. """B2: a real migration with a poison row produces an error_count that
  1382. flows through to the endpoint's `migration_error_count` field.
  1383. Replaces an earlier tautology that patched the module-level counter
  1384. directly. The chained version verifies the full path: poison row →
  1385. per-row migration skip → ``get_migration_error_count()`` →
  1386. ``GET /encryption-status``.
  1387. """
  1388. import logging
  1389. from backend.app.core.database import _migrate_encrypt_legacy_secrets, get_migration_error_count
  1390. from backend.app.models.oidc_provider import OIDCProvider
  1391. token = await self._create_admin_and_login(async_client)
  1392. # Bind the migration's session factory to the test engine and activate a key.
  1393. from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
  1394. from backend.app.core import database as db_mod
  1395. test_factory = async_sessionmaker(db_session.bind, class_=AsyncSession, expire_on_commit=False)
  1396. monkeypatch.setattr(db_mod, "async_session", test_factory)
  1397. from cryptography.fernet import Fernet
  1398. import backend.app.core.encryption as enc_mod
  1399. monkeypatch.setenv("MFA_ENCRYPTION_KEY", Fernet.generate_key().decode())
  1400. enc_mod._fernet_instance = None
  1401. # Two legacy plaintext rows; force the SECOND row's encrypt call to raise.
  1402. db_session.add_all(
  1403. [
  1404. OIDCProvider(
  1405. name="GoodRow",
  1406. issuer_url="https://good.example.com",
  1407. client_id="c1",
  1408. _client_secret_enc="plaintext-good",
  1409. scopes="openid email profile",
  1410. ),
  1411. OIDCProvider(
  1412. name="BadRow",
  1413. issuer_url="https://bad.example.com",
  1414. client_id="c2",
  1415. _client_secret_enc="plaintext-bad",
  1416. scopes="openid email profile",
  1417. ),
  1418. ]
  1419. )
  1420. await db_session.commit()
  1421. import backend.app.models.oidc_provider as oidc_mod
  1422. real_encrypt = oidc_mod.mfa_encrypt
  1423. call_count = [0]
  1424. def _sometimes_raise(value):
  1425. call_count[0] += 1
  1426. if call_count[0] == 2:
  1427. raise RuntimeError("simulated encrypt failure")
  1428. return real_encrypt(value)
  1429. monkeypatch.setattr(oidc_mod, "mfa_encrypt", _sometimes_raise)
  1430. with caplog.at_level(logging.ERROR, logger="backend.app.core.database"):
  1431. await _migrate_encrypt_legacy_secrets()
  1432. # Sanity: the migration's own counter saw the failure.
  1433. assert get_migration_error_count() == 1
  1434. # The endpoint must surface the same number — full path pinned, not just the getter.
  1435. resp = await async_client.get(self.STATUS_URL, headers={"Authorization": f"Bearer {token}"})
  1436. assert resp.status_code == 200, resp.text
  1437. data = resp.json()
  1438. assert data["migration_error_count"] == 1, (
  1439. "endpoint must report the actual migration outcome, not just read a stub global"
  1440. )
  1441. # ============================================================================
  1442. # TestEncryptionRoundtrip (E2E)
  1443. # ============================================================================
  1444. class TestEncryptionRoundtrip:
  1445. """End-to-end: writes via the property setter store ciphertext at the column
  1446. level; reads via the property getter return the original plaintext."""
  1447. @pytest.mark.asyncio
  1448. @pytest.mark.integration
  1449. async def test_oidc_provider_secret_encrypted_at_rest_e2e(self, db_session, monkeypatch):
  1450. from cryptography.fernet import Fernet
  1451. from sqlalchemy import select
  1452. import backend.app.core.encryption as enc_mod
  1453. from backend.app.models.oidc_provider import OIDCProvider
  1454. monkeypatch.setenv("MFA_ENCRYPTION_KEY", Fernet.generate_key().decode())
  1455. enc_mod._fernet_instance = None
  1456. provider = OIDCProvider(
  1457. name="E2E_OIDC",
  1458. issuer_url="https://e2e.example.com",
  1459. client_id="cid",
  1460. client_secret="my-real-client-secret", # via setter → encrypted
  1461. scopes="openid email profile",
  1462. is_enabled=True,
  1463. )
  1464. db_session.add(provider)
  1465. await db_session.commit()
  1466. # Raw column read: must be ciphertext, not the plaintext.
  1467. result = await db_session.execute(select(OIDCProvider).where(OIDCProvider.id == provider.id))
  1468. fetched = result.scalar_one()
  1469. assert fetched._client_secret_enc.startswith("fernet:")
  1470. assert fetched._client_secret_enc != "my-real-client-secret"
  1471. # Property read: returns original plaintext.
  1472. assert fetched.client_secret == "my-real-client-secret"
  1473. @pytest.mark.asyncio
  1474. @pytest.mark.integration
  1475. async def test_totp_secret_encrypted_at_rest_e2e(self, db_session, monkeypatch):
  1476. from cryptography.fernet import Fernet
  1477. from sqlalchemy import select
  1478. import backend.app.core.encryption as enc_mod
  1479. from backend.app.models.user import User
  1480. from backend.app.models.user_totp import UserTOTP
  1481. monkeypatch.setenv("MFA_ENCRYPTION_KEY", Fernet.generate_key().decode())
  1482. enc_mod._fernet_instance = None
  1483. user = User(username="e2etotp1219", email="e@example.com", password_hash="x")
  1484. db_session.add(user)
  1485. await db_session.flush()
  1486. totp = UserTOTP(user_id=user.id, secret="JBSWY3DPEHPK3PXP", is_enabled=True)
  1487. db_session.add(totp)
  1488. await db_session.commit()
  1489. result = await db_session.execute(select(UserTOTP).where(UserTOTP.user_id == user.id))
  1490. fetched = result.scalar_one()
  1491. assert fetched._secret_enc.startswith("fernet:")
  1492. assert fetched._secret_enc != "JBSWY3DPEHPK3PXP"
  1493. assert fetched.secret == "JBSWY3DPEHPK3PXP"
  1494. # ============================================================================
  1495. # TestBackupKeyFiles
  1496. # Verifies that .mfa_encryption_key is included in backup ZIPs (so backups
  1497. # are self-contained) and restored with chmod 0600 — and that path-traversal
  1498. # payloads in a malicious ZIP are rejected.
  1499. # ============================================================================
  1500. class TestBackupKeyFiles:
  1501. @pytest.mark.asyncio
  1502. @pytest.mark.integration
  1503. async def test_backup_includes_mfa_encryption_key_when_present(self, async_client, monkeypatch, tmp_path):
  1504. import zipfile
  1505. from backend.app.api.routes.settings import create_backup_zip
  1506. from backend.app.core.config import settings as app_settings
  1507. monkeypatch.setenv("DATA_DIR", str(tmp_path))
  1508. # Ensure `app_settings.base_dir` follows DATA_DIR for this test by
  1509. # patching the module attribute (config caches it at import time).
  1510. monkeypatch.setattr(app_settings, "base_dir", tmp_path)
  1511. key_path = tmp_path / ".mfa_encryption_key"
  1512. key_path.write_text("test-key-content")
  1513. zip_path, _filename = await create_backup_zip(output_path=tmp_path)
  1514. try:
  1515. with zipfile.ZipFile(zip_path) as zf:
  1516. names = zf.namelist()
  1517. assert ".mfa_encryption_key" in names
  1518. assert zf.read(".mfa_encryption_key").decode() == "test-key-content"
  1519. finally:
  1520. zip_path.unlink(missing_ok=True)
  1521. @pytest.mark.asyncio
  1522. @pytest.mark.integration
  1523. async def test_backup_skips_mfa_encryption_key_when_absent(self, async_client, monkeypatch, tmp_path):
  1524. import zipfile
  1525. from backend.app.api.routes.settings import create_backup_zip
  1526. from backend.app.core.config import settings as app_settings
  1527. monkeypatch.setenv("DATA_DIR", str(tmp_path))
  1528. monkeypatch.setattr(app_settings, "base_dir", tmp_path)
  1529. # No .mfa_encryption_key written — must not crash.
  1530. zip_path, _filename = await create_backup_zip(output_path=tmp_path)
  1531. try:
  1532. with zipfile.ZipFile(zip_path) as zf:
  1533. names = zf.namelist()
  1534. assert ".mfa_encryption_key" not in names
  1535. finally:
  1536. zip_path.unlink(missing_ok=True)
  1537. @pytest.mark.asyncio
  1538. @pytest.mark.integration
  1539. async def test_restore_writes_key_files_with_chmod_0600(self, async_client, monkeypatch, tmp_path):
  1540. """T1: restore endpoint writes key file with mode 0o600.
  1541. Bypasses the SQLite-copy step via patches so execution reaches the
  1542. key-write code unconditionally — the previous version used a stub
  1543. ``b"SQLite format 3"`` which made ``sqlite3.backup()`` fail and the
  1544. key-write code never ran.
  1545. """
  1546. import io
  1547. import zipfile
  1548. from unittest.mock import AsyncMock, patch
  1549. from backend.app.core.config import settings as app_settings
  1550. monkeypatch.setenv("DATA_DIR", str(tmp_path))
  1551. monkeypatch.setattr(app_settings, "base_dir", tmp_path)
  1552. # Build a minimal ZIP with a stub DB and the key file.
  1553. buf = io.BytesIO()
  1554. with zipfile.ZipFile(buf, "w") as zf:
  1555. zf.writestr("bambuddy.db", b"SQLite format 3")
  1556. zf.writestr(".mfa_encryption_key", "test-restored-key")
  1557. buf.seek(0)
  1558. with (
  1559. patch("backend.app.core.db_dialect.is_sqlite", return_value=False),
  1560. patch(
  1561. "backend.app.api.routes.settings._import_sqlite_to_postgres",
  1562. new_callable=AsyncMock,
  1563. ),
  1564. patch("backend.app.core.database.close_all_connections", new_callable=AsyncMock),
  1565. patch("backend.app.core.database.reinitialize_database", new_callable=AsyncMock),
  1566. patch("backend.app.core.database.init_db", new_callable=AsyncMock),
  1567. ):
  1568. resp = await async_client.post(
  1569. "/api/v1/settings/restore",
  1570. files={"file": ("backup.zip", buf, "application/zip")},
  1571. )
  1572. assert resp.status_code == 200
  1573. restored_key = tmp_path / ".mfa_encryption_key"
  1574. assert restored_key.exists()
  1575. assert restored_key.read_text() == "test-restored-key"
  1576. assert (restored_key.stat().st_mode & 0o777) == 0o600
  1577. @pytest.mark.asyncio
  1578. @pytest.mark.integration
  1579. async def test_restore_handles_missing_key_files(self, async_client, monkeypatch, tmp_path):
  1580. """T2: ZIP without key file → restore succeeds, no key written to DATA_DIR."""
  1581. import io
  1582. import zipfile
  1583. from unittest.mock import AsyncMock, patch
  1584. from backend.app.core.config import settings as app_settings
  1585. monkeypatch.setenv("DATA_DIR", str(tmp_path))
  1586. monkeypatch.setattr(app_settings, "base_dir", tmp_path)
  1587. buf = io.BytesIO()
  1588. with zipfile.ZipFile(buf, "w") as zf:
  1589. zf.writestr("bambuddy.db", b"SQLite format 3")
  1590. # Intentionally no .mfa_encryption_key entry.
  1591. buf.seek(0)
  1592. with (
  1593. patch("backend.app.core.db_dialect.is_sqlite", return_value=False),
  1594. patch(
  1595. "backend.app.api.routes.settings._import_sqlite_to_postgres",
  1596. new_callable=AsyncMock,
  1597. ),
  1598. patch("backend.app.core.database.close_all_connections", new_callable=AsyncMock),
  1599. patch("backend.app.core.database.reinitialize_database", new_callable=AsyncMock),
  1600. patch("backend.app.core.database.init_db", new_callable=AsyncMock),
  1601. ):
  1602. resp = await async_client.post(
  1603. "/api/v1/settings/restore",
  1604. files={"file": ("backup.zip", buf, "application/zip")},
  1605. )
  1606. assert resp.status_code == 200
  1607. assert not (tmp_path / ".mfa_encryption_key").exists()
  1608. @pytest.mark.asyncio
  1609. @pytest.mark.integration
  1610. async def test_restore_aborts_db_swap_when_key_write_fails(self, async_client, monkeypatch, tmp_path):
  1611. """B1: when MFA key write fails, restore must abort BEFORE the database
  1612. swap so the live DB is not left with rows encrypted under a key that
  1613. no longer exists on disk."""
  1614. import io
  1615. import os
  1616. import zipfile
  1617. from unittest.mock import AsyncMock, patch
  1618. from backend.app.core.config import settings as app_settings
  1619. monkeypatch.setenv("DATA_DIR", str(tmp_path))
  1620. monkeypatch.setattr(app_settings, "base_dir", tmp_path)
  1621. # Build ZIP with a key file that we will fail to write to DATA_DIR.
  1622. buf = io.BytesIO()
  1623. with zipfile.ZipFile(buf, "w") as zf:
  1624. zf.writestr("bambuddy.db", b"SQLite format 3 backup data")
  1625. zf.writestr(".mfa_encryption_key", "backup-key-content")
  1626. buf.seek(0)
  1627. # Track whether the database swap functions were called.
  1628. # If B1 is correct, key-write failure aborts BEFORE these run.
  1629. import_pg_mock = AsyncMock()
  1630. reinit_mock = AsyncMock()
  1631. init_mock = AsyncMock()
  1632. original_open = os.open
  1633. def _key_write_fails(path, flags, mode=0o777, **kwargs):
  1634. # `shutil.rmtree` calls os.open(... dir_fd=...) during temp-dir
  1635. # cleanup — accept and forward any extra kwargs so the mock
  1636. # doesn't break the cleanup path.
  1637. if str(path).endswith(".mfa_encryption_key.restore-tmp"):
  1638. raise OSError(28, "No space left on device", str(path))
  1639. return original_open(path, flags, mode, **kwargs)
  1640. with (
  1641. patch("backend.app.core.db_dialect.is_sqlite", return_value=False),
  1642. patch(
  1643. "backend.app.api.routes.settings._import_sqlite_to_postgres",
  1644. import_pg_mock,
  1645. ),
  1646. patch("backend.app.core.database.close_all_connections", new_callable=AsyncMock),
  1647. patch("backend.app.core.database.reinitialize_database", reinit_mock),
  1648. patch("backend.app.core.database.init_db", init_mock),
  1649. ):
  1650. monkeypatch.setattr(os, "open", _key_write_fails)
  1651. resp = await async_client.post(
  1652. "/api/v1/settings/restore",
  1653. files={"file": ("backup.zip", buf, "application/zip")},
  1654. )
  1655. assert resp.status_code == 500
  1656. assert "Database is unchanged" in resp.json().get("detail", "")
  1657. # Database swap functions must NOT have been called — the abort
  1658. # happens before that step.
  1659. import_pg_mock.assert_not_awaited()
  1660. reinit_mock.assert_not_awaited()
  1661. init_mock.assert_not_awaited()
  1662. # No partial key file should be left behind.
  1663. assert not (tmp_path / ".mfa_encryption_key").exists()
  1664. @pytest.mark.asyncio
  1665. @pytest.mark.integration
  1666. async def test_restore_resets_encryption_singleton_after_key_replace(self, async_client, monkeypatch, tmp_path):
  1667. """B1: after a successful key replace, the encryption singleton must be
  1668. cleared so init_db's re-encryption migration picks up the restored key
  1669. instead of the cached Fernet from the previous key.
  1670. """
  1671. import io
  1672. import zipfile
  1673. from unittest.mock import AsyncMock, patch
  1674. from cryptography.fernet import Fernet
  1675. import backend.app.core.encryption as enc_mod
  1676. from backend.app.core.config import settings as app_settings
  1677. monkeypatch.setenv("DATA_DIR", str(tmp_path))
  1678. monkeypatch.setattr(app_settings, "base_dir", tmp_path)
  1679. # Pre-warm the singleton with an "old" key so we can detect the reset.
  1680. old_key = Fernet.generate_key().decode()
  1681. monkeypatch.setenv("MFA_ENCRYPTION_KEY", old_key)
  1682. enc_mod._fernet_instance = None
  1683. enc_mod._key_source = None
  1684. # Trigger lazy load → singleton holds the old Fernet.
  1685. assert enc_mod.is_encryption_active() is True
  1686. assert enc_mod._fernet_instance is not None
  1687. old_fernet_obj = enc_mod._fernet_instance
  1688. # Build ZIP that delivers a DIFFERENT key file.
  1689. new_key = Fernet.generate_key().decode()
  1690. assert new_key != old_key
  1691. buf = io.BytesIO()
  1692. with zipfile.ZipFile(buf, "w") as zf:
  1693. zf.writestr("bambuddy.db", b"SQLite format 3 backup data")
  1694. zf.writestr(".mfa_encryption_key", new_key)
  1695. buf.seek(0)
  1696. with (
  1697. patch("backend.app.core.db_dialect.is_sqlite", return_value=False),
  1698. patch(
  1699. "backend.app.api.routes.settings._import_sqlite_to_postgres",
  1700. new_callable=AsyncMock,
  1701. ),
  1702. patch("backend.app.core.database.close_all_connections", new_callable=AsyncMock),
  1703. patch("backend.app.core.database.reinitialize_database", new_callable=AsyncMock),
  1704. patch("backend.app.core.database.init_db", new_callable=AsyncMock),
  1705. ):
  1706. resp = await async_client.post(
  1707. "/api/v1/settings/restore",
  1708. files={"file": ("backup.zip", buf, "application/zip")},
  1709. )
  1710. assert resp.status_code == 200, resp.text
  1711. # The singleton must have been invalidated. The exact post-state depends
  1712. # on whether init_db (mocked) re-loaded the singleton, but the cached
  1713. # _fernet_instance reference from before the restore must not be the
  1714. # active one any more.
  1715. assert enc_mod._fernet_instance is None or enc_mod._fernet_instance is not old_fernet_obj, (
  1716. "B1: encryption singleton must be reset after key replace so init_db's migration picks up the restored key"
  1717. )
  1718. # The key file must be on disk with the new content.
  1719. restored = (tmp_path / ".mfa_encryption_key").read_text()
  1720. assert restored == new_key
  1721. @pytest.mark.asyncio
  1722. @pytest.mark.integration
  1723. async def test_restore_rejects_path_traversal_in_zip(self, async_client, monkeypatch, tmp_path):
  1724. """A4: ZIP with path-traversal entry → HTTP 400, no file written outside temp dir."""
  1725. import io
  1726. import zipfile
  1727. from backend.app.core.config import settings as app_settings
  1728. monkeypatch.setenv("DATA_DIR", str(tmp_path))
  1729. monkeypatch.setattr(app_settings, "base_dir", tmp_path)
  1730. # Build ZIP with a relative path-traversal entry.
  1731. buf = io.BytesIO()
  1732. with zipfile.ZipFile(buf, "w") as zf:
  1733. zf.writestr("../etc/passwd", "root:x:0:0")
  1734. zf.writestr("bambuddy.db", b"SQLite format 3")
  1735. buf.seek(0)
  1736. resp = await async_client.post(
  1737. "/api/v1/settings/restore",
  1738. files={"file": ("backup.zip", buf, "application/zip")},
  1739. )
  1740. assert resp.status_code == 400
  1741. assert "unsafe path" in resp.json().get("detail", "").lower()
  1742. @pytest.mark.asyncio
  1743. @pytest.mark.integration
  1744. async def test_restore_rejects_prefix_collision_zipslip(self, async_client, monkeypatch, tmp_path):
  1745. """T1: ZIP entry with prefix-collision path must be rejected.
  1746. A startswith() check would accept '/tmp/abc_evil/file' when the
  1747. extraction root was '/tmp/abc' — is_relative_to correctly rejects it.
  1748. The restore handler creates a tempfile.TemporaryDirectory inside the
  1749. system temp dir; we craft an entry that resolves to a sibling path
  1750. whose name starts with the temp dir's basename.
  1751. """
  1752. import io
  1753. import zipfile
  1754. from backend.app.core.config import settings as app_settings
  1755. monkeypatch.setenv("DATA_DIR", str(tmp_path))
  1756. monkeypatch.setattr(app_settings, "base_dir", tmp_path)
  1757. # Use a path with traversal — the resolved path will share the parent
  1758. # temp directory's basename as a prefix but NOT be inside the
  1759. # extraction root. We don't know the random extraction-root name at
  1760. # ZIP-build time, so we pick a literal "../poc-evil-prefix-collision/"
  1761. # which traverses up one level from the extraction root and lands in
  1762. # a sibling directory. is_relative_to() must reject this; a naive
  1763. # startswith() against the parent's parent would accept it.
  1764. evil_name = "../escaped-prefix-collision/poc.txt"
  1765. buf = io.BytesIO()
  1766. with zipfile.ZipFile(buf, "w") as zf:
  1767. zf.writestr(evil_name, "pwned")
  1768. zf.writestr("bambuddy.db", b"SQLite format 3\x00")
  1769. buf.seek(0)
  1770. resp = await async_client.post(
  1771. "/api/v1/settings/restore",
  1772. files={"file": ("backup.zip", buf, "application/zip")},
  1773. )
  1774. assert resp.status_code == 400
  1775. assert "unsafe path" in resp.json().get("detail", "").lower()
  1776. @pytest.mark.asyncio
  1777. @pytest.mark.integration
  1778. async def test_restore_rejects_absolute_path_in_zip(self, async_client, monkeypatch, tmp_path):
  1779. """B1: ZIP with an absolute path entry must be rejected by is_relative_to check."""
  1780. import io
  1781. import zipfile
  1782. from backend.app.core.config import settings as app_settings
  1783. monkeypatch.setenv("DATA_DIR", str(tmp_path))
  1784. monkeypatch.setattr(app_settings, "base_dir", tmp_path)
  1785. buf = io.BytesIO()
  1786. with zipfile.ZipFile(buf, "w") as zf:
  1787. # Absolute path in the archive — extracts outside temp_path on
  1788. # systems where (temp_path / "/etc/passwd") resolves to /etc/passwd.
  1789. zf.writestr("/etc/passwd", "root:x:0:0")
  1790. zf.writestr("bambuddy.db", b"SQLite format 3")
  1791. buf.seek(0)
  1792. resp = await async_client.post(
  1793. "/api/v1/settings/restore",
  1794. files={"file": ("backup.zip", buf, "application/zip")},
  1795. )
  1796. assert resp.status_code == 400
  1797. assert "unsafe path" in resp.json().get("detail", "").lower()
  1798. @pytest.mark.asyncio
  1799. @pytest.mark.integration
  1800. async def test_backup_fails_when_key_file_unreadable(self, async_client, monkeypatch, tmp_path):
  1801. """A5: OSError while copying key file propagates out of create_backup_zip."""
  1802. import shutil
  1803. from backend.app.api.routes.settings import create_backup_zip
  1804. from backend.app.core.config import settings as app_settings
  1805. monkeypatch.setenv("DATA_DIR", str(tmp_path))
  1806. monkeypatch.setattr(app_settings, "base_dir", tmp_path)
  1807. (tmp_path / ".mfa_encryption_key").write_text("key")
  1808. original_copy2 = shutil.copy2
  1809. def _raise_on_key(src, dst):
  1810. if ".mfa_encryption_key" in str(src):
  1811. raise OSError("simulated unreadable key file")
  1812. return original_copy2(src, dst)
  1813. monkeypatch.setattr(shutil, "copy2", _raise_on_key)
  1814. import pytest as _pytest
  1815. with _pytest.raises(OSError, match="simulated unreadable"):
  1816. await create_backup_zip(output_path=tmp_path)
  1817. @pytest.mark.asyncio
  1818. @pytest.mark.integration
  1819. async def test_backup_restore_roundtrip_preserves_encrypted_oidc_secret(
  1820. self, async_client, db_session, monkeypatch, tmp_path
  1821. ):
  1822. """T3: encrypt → backup → simulate key loss → restore → decrypt.
  1823. Verifies the user-facing promise that local backup ZIPs are
  1824. self-contained: an OIDC client_secret encrypted under one key still
  1825. decrypts after restore even when the running install no longer has
  1826. the key on disk or in the env. Exercises the B1 key-first restore
  1827. path and the B4 sample-decrypt status check together.
  1828. """
  1829. import zipfile
  1830. from pathlib import Path
  1831. from unittest.mock import AsyncMock, patch
  1832. from cryptography.fernet import Fernet
  1833. from sqlalchemy import select
  1834. import backend.app.core.encryption as enc_mod
  1835. from backend.app.api.routes.settings import create_backup_zip
  1836. from backend.app.core.config import settings as app_settings
  1837. from backend.app.models.oidc_provider import OIDCProvider
  1838. # 1. Pin a key, encrypt an OIDC secret via the property setter.
  1839. key = Fernet.generate_key().decode()
  1840. monkeypatch.setenv("MFA_ENCRYPTION_KEY", key)
  1841. monkeypatch.setenv("DATA_DIR", str(tmp_path))
  1842. monkeypatch.setattr(app_settings, "base_dir", tmp_path)
  1843. # Persist the key file too, so create_backup_zip picks it up.
  1844. (tmp_path / ".mfa_encryption_key").write_text(key)
  1845. enc_mod._fernet_instance = None
  1846. enc_mod._key_source = None
  1847. provider = OIDCProvider(
  1848. name="RoundtripProv",
  1849. issuer_url="https://rt.example.com",
  1850. client_id="cid",
  1851. client_secret="my-original-secret", # via setter -> encrypted
  1852. scopes="openid email profile",
  1853. is_enabled=True,
  1854. )
  1855. db_session.add(provider)
  1856. await db_session.commit()
  1857. original_id = provider.id
  1858. assert provider._client_secret_enc.startswith("fernet:")
  1859. # 2. Create a backup ZIP (must include .mfa_encryption_key).
  1860. zip_path, _ = await create_backup_zip(output_path=tmp_path)
  1861. try:
  1862. with zipfile.ZipFile(zip_path) as zf:
  1863. names = zf.namelist()
  1864. assert ".mfa_encryption_key" in names, "T3: backup ZIP must include the key file"
  1865. # 3. Simulate key loss: delete the key file from DATA_DIR, drop
  1866. # the env var, reset the cached fernet singleton.
  1867. (tmp_path / ".mfa_encryption_key").unlink()
  1868. monkeypatch.delenv("MFA_ENCRYPTION_KEY", raising=False)
  1869. enc_mod._fernet_instance = None
  1870. enc_mod._key_source = None
  1871. # 4. Restore the ZIP via the endpoint. Mock out the DB-swap
  1872. # (we keep the live in-memory test DB) and init_db side effects
  1873. # so this test focuses on the key-restore path.
  1874. with (
  1875. patch("backend.app.core.db_dialect.is_sqlite", return_value=False),
  1876. patch(
  1877. "backend.app.api.routes.settings._import_sqlite_to_postgres",
  1878. new_callable=AsyncMock,
  1879. ),
  1880. patch("backend.app.core.database.close_all_connections", new_callable=AsyncMock),
  1881. patch("backend.app.core.database.reinitialize_database", new_callable=AsyncMock),
  1882. patch("backend.app.core.database.init_db", new_callable=AsyncMock),
  1883. open(zip_path, "rb") as f,
  1884. ):
  1885. resp = await async_client.post(
  1886. "/api/v1/settings/restore",
  1887. files={"file": ("backup.zip", f, "application/zip")},
  1888. )
  1889. assert resp.status_code == 200, resp.text
  1890. # 5. Reset the singleton again (B1 already does this in production,
  1891. # but here init_db is mocked so we explicitly invalidate).
  1892. enc_mod._fernet_instance = None
  1893. enc_mod._key_source = None
  1894. # 6. The key file must be back on disk with restrictive permissions.
  1895. restored = Path(tmp_path) / ".mfa_encryption_key"
  1896. assert restored.exists(), "T3: key file must be restored to DATA_DIR"
  1897. assert (restored.stat().st_mode & 0o777) == 0o600
  1898. # 7. Decryption works again — the property getter must return the
  1899. # original plaintext, proving the restored key matches the
  1900. # cipher in the (still in-memory) DB row.
  1901. result = await db_session.execute(select(OIDCProvider).where(OIDCProvider.id == original_id))
  1902. restored_provider = result.scalar_one()
  1903. assert restored_provider.client_secret == "my-original-secret"
  1904. finally:
  1905. zip_path.unlink(missing_ok=True)
  1906. # ============================================================================
  1907. # TestTOTPDecryptionBroken (C9)
  1908. # Verifies the decryption-broken state (encrypted TOTP row + no key) for each
  1909. # TOTP endpoint. Behaviour differs between recovery-aware and non-recovery
  1910. # endpoints:
  1911. # - setup_totp / enable_totp / verify_2fa: HTTP 500 (no backup-code path).
  1912. # - disable_totp / regenerate_backup_codes: fall through to the backup-code
  1913. # branch — HTTP 200 with a valid backup code, HTTP 400 without.
  1914. # ============================================================================
  1915. class TestTOTPDecryptionBroken:
  1916. """C9: RuntimeError from mfa_decrypt — 500 for non-recovery endpoints,
  1917. backup-code fall-through for disable_totp / regenerate_backup_codes."""
  1918. async def _setup_admin_and_totp_user(self, async_client, db_session):
  1919. """Create admin (enables auth), log in as admin, add TOTP record with fernet secret."""
  1920. from backend.app.models.user_totp import UserTOTP
  1921. admin_username = f"admin_c9_{secrets.token_hex(4)}"
  1922. setup = await async_client.post(
  1923. "/api/v1/auth/setup",
  1924. json={
  1925. "auth_enabled": True,
  1926. "admin_username": admin_username,
  1927. "admin_password": "Admin_C9_Pass1!",
  1928. },
  1929. )
  1930. assert setup.status_code in (200, 201), setup.text
  1931. login = await async_client.post(
  1932. "/api/v1/auth/login",
  1933. json={"username": admin_username, "password": "Admin_C9_Pass1!"},
  1934. )
  1935. assert login.status_code == 200, login.text
  1936. token = login.json()["access_token"]
  1937. # Get the admin user_id from the /me endpoint
  1938. me = await async_client.get("/api/v1/auth/me", headers={"Authorization": f"Bearer {token}"})
  1939. assert me.status_code == 200
  1940. user_id = me.json()["id"]
  1941. # Insert a TOTP row with a fernet-prefixed secret directly (no key needed for insert).
  1942. totp = UserTOTP(
  1943. user_id=user_id,
  1944. _secret_enc="fernet:gAAAAA-not-really-encrypted",
  1945. is_enabled=True,
  1946. )
  1947. db_session.add(totp)
  1948. await db_session.commit()
  1949. return token, admin_username, user_id
  1950. @pytest.mark.asyncio
  1951. @pytest.mark.integration
  1952. async def test_enable_totp_returns_500_when_decryption_broken(self, async_client, db_session, monkeypatch):
  1953. """C9: enable endpoint → 500 when TOTP secret is encrypted but key unavailable."""
  1954. import backend.app.core.encryption as enc_mod
  1955. token, _, _ = await self._setup_admin_and_totp_user(async_client, db_session)
  1956. monkeypatch.setattr(enc_mod, "_load_or_generate_key", lambda: (None, "none"))
  1957. enc_mod._fernet_instance = None
  1958. # enable_totp requires setup-but-not-yet-enabled state; force is_enabled=False
  1959. from sqlalchemy import select as _select
  1960. from backend.app.models.user_totp import UserTOTP
  1961. result = await db_session.execute(_select(UserTOTP))
  1962. for t in result.scalars().all():
  1963. t.is_enabled = False
  1964. await db_session.commit()
  1965. resp = await async_client.post(
  1966. "/api/v1/auth/2fa/totp/enable",
  1967. json={"code": "123456"},
  1968. headers={"Authorization": f"Bearer {token}"},
  1969. )
  1970. assert resp.status_code == 500
  1971. assert "unavailable" in resp.json().get("detail", "").lower()
  1972. @pytest.mark.asyncio
  1973. @pytest.mark.integration
  1974. async def test_disable_totp_returns_400_when_decryption_broken_and_no_backup_codes(
  1975. self, async_client, db_session, monkeypatch
  1976. ):
  1977. """B2a + S3: disable falls through to backup-code branch when TOTP secret
  1978. cannot be decrypted; with no backup codes seeded, the request is
  1979. rejected as an invalid code (400), not a server error.
  1980. S3: AND the failed-attempt counter must NOT be incremented — the
  1981. cause was a server-side key loss, not a user mistake.
  1982. """
  1983. from sqlalchemy import select as _select
  1984. import backend.app.core.encryption as enc_mod
  1985. from backend.app.models.auth_ephemeral import AuthRateLimitEvent
  1986. token, admin_username, _ = await self._setup_admin_and_totp_user(async_client, db_session)
  1987. monkeypatch.setattr(enc_mod, "_load_or_generate_key", lambda: (None, "none"))
  1988. enc_mod._fernet_instance = None
  1989. resp = await async_client.post(
  1990. "/api/v1/auth/2fa/totp/disable",
  1991. json={"code": "123456"},
  1992. headers={"Authorization": f"Bearer {token}"},
  1993. )
  1994. assert resp.status_code == 400
  1995. assert "invalid" in resp.json().get("detail", "").lower()
  1996. # S3: no fail-counter debit on server-side key loss.
  1997. events = (
  1998. (
  1999. await db_session.execute(
  2000. _select(AuthRateLimitEvent).where(AuthRateLimitEvent.username == admin_username.lower())
  2001. )
  2002. )
  2003. .scalars()
  2004. .all()
  2005. )
  2006. assert len(events) == 0, "S3: must not debit fail-counter on key-loss"
  2007. @pytest.mark.asyncio
  2008. @pytest.mark.integration
  2009. async def test_regenerate_backup_codes_returns_400_when_decryption_broken_and_no_backup_codes(
  2010. self, async_client, db_session, monkeypatch
  2011. ):
  2012. """B2b + S3: regenerate-backup-codes falls through to backup-code branch when
  2013. TOTP secret cannot be decrypted; with no backup codes seeded, the
  2014. request is rejected as an invalid code (400) AND the fail-counter
  2015. is NOT incremented (S3: server-side cause, not user mistake).
  2016. """
  2017. from sqlalchemy import select as _select
  2018. import backend.app.core.encryption as enc_mod
  2019. from backend.app.models.auth_ephemeral import AuthRateLimitEvent
  2020. token, admin_username, _ = await self._setup_admin_and_totp_user(async_client, db_session)
  2021. monkeypatch.setattr(enc_mod, "_load_or_generate_key", lambda: (None, "none"))
  2022. enc_mod._fernet_instance = None
  2023. resp = await async_client.post(
  2024. "/api/v1/auth/2fa/totp/regenerate-backup-codes",
  2025. json={"code": "123456"},
  2026. headers={"Authorization": f"Bearer {token}"},
  2027. )
  2028. assert resp.status_code == 400
  2029. assert "invalid" in resp.json().get("detail", "").lower()
  2030. events = (
  2031. (
  2032. await db_session.execute(
  2033. _select(AuthRateLimitEvent).where(AuthRateLimitEvent.username == admin_username.lower())
  2034. )
  2035. )
  2036. .scalars()
  2037. .all()
  2038. )
  2039. assert len(events) == 0, "S3: must not debit fail-counter on key-loss"
  2040. @pytest.mark.asyncio
  2041. @pytest.mark.integration
  2042. async def test_disable_totp_succeeds_via_backup_code_when_decryption_broken(
  2043. self, async_client, db_session, monkeypatch
  2044. ):
  2045. """B2a: a valid backup code disables TOTP even when the secret cannot
  2046. be decrypted — recovery path for users who lost the encryption key."""
  2047. from sqlalchemy import select as _select
  2048. import backend.app.core.encryption as enc_mod
  2049. from backend.app.api.routes.mfa import _generate_backup_codes
  2050. from backend.app.models.user_totp import UserTOTP
  2051. token, _, user_id = await self._setup_admin_and_totp_user(async_client, db_session)
  2052. # Seed a real backup-code hash on the existing TOTP row.
  2053. plain_codes, hashed_codes = _generate_backup_codes()
  2054. result = await db_session.execute(_select(UserTOTP).where(UserTOTP.user_id == user_id))
  2055. totp = result.scalar_one()
  2056. totp.backup_code_hashes = hashed_codes
  2057. await db_session.commit()
  2058. monkeypatch.setattr(enc_mod, "_load_or_generate_key", lambda: (None, "none"))
  2059. enc_mod._fernet_instance = None
  2060. resp = await async_client.post(
  2061. "/api/v1/auth/2fa/totp/disable",
  2062. json={"code": plain_codes[0]},
  2063. headers={"Authorization": f"Bearer {token}"},
  2064. )
  2065. assert resp.status_code == 200, resp.text
  2066. # The TOTP row must have been deleted.
  2067. result_after = await db_session.execute(_select(UserTOTP).where(UserTOTP.user_id == user_id))
  2068. assert result_after.scalar_one_or_none() is None
  2069. @pytest.mark.asyncio
  2070. @pytest.mark.integration
  2071. async def test_regenerate_backup_codes_succeeds_via_backup_code_when_decryption_broken(
  2072. self, async_client, db_session, monkeypatch
  2073. ):
  2074. """B2b: a valid backup code rotates the codes even when the secret
  2075. cannot be decrypted — recovery path mirrors disable_totp."""
  2076. from sqlalchemy import select as _select
  2077. import backend.app.core.encryption as enc_mod
  2078. from backend.app.api.routes.mfa import _generate_backup_codes
  2079. from backend.app.models.user_totp import UserTOTP
  2080. token, _, user_id = await self._setup_admin_and_totp_user(async_client, db_session)
  2081. plain_codes, hashed_codes = _generate_backup_codes()
  2082. result = await db_session.execute(_select(UserTOTP).where(UserTOTP.user_id == user_id))
  2083. totp = result.scalar_one()
  2084. totp.backup_code_hashes = hashed_codes
  2085. await db_session.commit()
  2086. monkeypatch.setattr(enc_mod, "_load_or_generate_key", lambda: (None, "none"))
  2087. enc_mod._fernet_instance = None
  2088. resp = await async_client.post(
  2089. "/api/v1/auth/2fa/totp/regenerate-backup-codes",
  2090. json={"code": plain_codes[0]},
  2091. headers={"Authorization": f"Bearer {token}"},
  2092. )
  2093. assert resp.status_code == 200, resp.text
  2094. body = resp.json()
  2095. assert "backup_codes" in body
  2096. assert len(body["backup_codes"]) == 10
  2097. @pytest.mark.asyncio
  2098. @pytest.mark.integration
  2099. async def test_disable_totp_wrong_code_with_seeded_hashes_returns_400_and_debits_counter(
  2100. self, async_client, db_session, monkeypatch
  2101. ):
  2102. """T2: with backup_code_hashes seeded AND a working encryption key,
  2103. a wrong code is rejected (400) AND the fail-counter IS incremented.
  2104. This pins the behaviour that a future refactor swallowing
  2105. compare_digest mismatches would still let the existing 'no codes
  2106. configured' tests pass — only this assertion exercises the actual
  2107. pwd_context.verify mismatch path.
  2108. """
  2109. from cryptography.fernet import Fernet
  2110. from sqlalchemy import select as _select
  2111. import backend.app.core.encryption as enc_mod
  2112. from backend.app.api.routes.mfa import _generate_backup_codes
  2113. from backend.app.models.auth_ephemeral import AuthRateLimitEvent
  2114. from backend.app.models.user_totp import UserTOTP
  2115. # Active key — secret can be decrypted, this is NOT key-loss.
  2116. monkeypatch.setenv("MFA_ENCRYPTION_KEY", Fernet.generate_key().decode())
  2117. enc_mod._fernet_instance = None
  2118. token, admin_username, user_id = await self._setup_admin_and_totp_user(async_client, db_session)
  2119. # Replace stub fernet:-prefixed value with a real encrypted secret so
  2120. # disable_totp's TOTP-decrypt path doesn't throw, AND seed real hashes.
  2121. result = await db_session.execute(_select(UserTOTP).where(UserTOTP.user_id == user_id))
  2122. totp = result.scalar_one()
  2123. totp.secret = "JBSWY3DPEHPK3PXP" # via setter -> mfa_encrypt
  2124. plain_codes, hashed_codes = _generate_backup_codes()
  2125. totp.backup_code_hashes = hashed_codes
  2126. await db_session.commit()
  2127. # Submit a code that matches NEITHER the TOTP nor any backup-code hash.
  2128. resp = await async_client.post(
  2129. "/api/v1/auth/2fa/totp/disable",
  2130. json={"code": "WRONGCD1"}, # wrong but well-formed
  2131. headers={"Authorization": f"Bearer {token}"},
  2132. )
  2133. assert resp.status_code == 400
  2134. assert "invalid" in resp.json().get("detail", "").lower()
  2135. # T2 + S3: with key intact, the fail-counter MUST increment for a
  2136. # real wrong-code attempt (this is the user-error path, not key-loss).
  2137. events = (
  2138. (
  2139. await db_session.execute(
  2140. _select(AuthRateLimitEvent).where(AuthRateLimitEvent.username == admin_username.lower())
  2141. )
  2142. )
  2143. .scalars()
  2144. .all()
  2145. )
  2146. assert len(events) >= 1, "T2: with key intact, wrong code must debit the fail-counter"
  2147. @pytest.mark.asyncio
  2148. @pytest.mark.integration
  2149. async def test_regenerate_backup_codes_wrong_code_with_seeded_hashes_returns_400_and_debits_counter(
  2150. self, async_client, db_session, monkeypatch
  2151. ):
  2152. """T2: same as the disable_totp variant for /regenerate-backup-codes."""
  2153. from cryptography.fernet import Fernet
  2154. from sqlalchemy import select as _select
  2155. import backend.app.core.encryption as enc_mod
  2156. from backend.app.api.routes.mfa import _generate_backup_codes
  2157. from backend.app.models.auth_ephemeral import AuthRateLimitEvent
  2158. from backend.app.models.user_totp import UserTOTP
  2159. monkeypatch.setenv("MFA_ENCRYPTION_KEY", Fernet.generate_key().decode())
  2160. enc_mod._fernet_instance = None
  2161. token, admin_username, user_id = await self._setup_admin_and_totp_user(async_client, db_session)
  2162. result = await db_session.execute(_select(UserTOTP).where(UserTOTP.user_id == user_id))
  2163. totp = result.scalar_one()
  2164. totp.secret = "JBSWY3DPEHPK3PXP"
  2165. plain_codes, hashed_codes = _generate_backup_codes()
  2166. totp.backup_code_hashes = hashed_codes
  2167. await db_session.commit()
  2168. resp = await async_client.post(
  2169. "/api/v1/auth/2fa/totp/regenerate-backup-codes",
  2170. json={"code": "WRONGCD2"},
  2171. headers={"Authorization": f"Bearer {token}"},
  2172. )
  2173. assert resp.status_code == 400
  2174. assert "invalid" in resp.json().get("detail", "").lower()
  2175. events = (
  2176. (
  2177. await db_session.execute(
  2178. _select(AuthRateLimitEvent).where(AuthRateLimitEvent.username == admin_username.lower())
  2179. )
  2180. )
  2181. .scalars()
  2182. .all()
  2183. )
  2184. assert len(events) >= 1, "T2: with key intact, wrong code must debit the fail-counter"
  2185. @pytest.mark.asyncio
  2186. @pytest.mark.integration
  2187. async def test_disable_totp_wrong_code_with_seeded_hashes_at_keyloss_no_counter_debit(
  2188. self, async_client, db_session, monkeypatch
  2189. ):
  2190. """T2 + S3 cross-check: with hashes seeded but encryption key gone,
  2191. a wrong code returns 400 BUT the fail-counter MUST NOT increment.
  2192. This is the dual of the test above — same wrong-code 400 outcome,
  2193. but the counter debit is gated on the cause of failure (server-side
  2194. key loss must NOT penalise the user).
  2195. """
  2196. from sqlalchemy import select as _select
  2197. import backend.app.core.encryption as enc_mod
  2198. from backend.app.api.routes.mfa import _generate_backup_codes
  2199. from backend.app.models.auth_ephemeral import AuthRateLimitEvent
  2200. from backend.app.models.user_totp import UserTOTP
  2201. token, admin_username, user_id = await self._setup_admin_and_totp_user(async_client, db_session)
  2202. # Seed real hashes on the existing TOTP row.
  2203. result = await db_session.execute(_select(UserTOTP).where(UserTOTP.user_id == user_id))
  2204. totp = result.scalar_one()
  2205. plain_codes, hashed_codes = _generate_backup_codes()
  2206. totp.backup_code_hashes = hashed_codes
  2207. await db_session.commit()
  2208. # Now simulate key loss.
  2209. monkeypatch.setattr(enc_mod, "_load_or_generate_key", lambda: (None, "none"))
  2210. enc_mod._fernet_instance = None
  2211. resp = await async_client.post(
  2212. "/api/v1/auth/2fa/totp/disable",
  2213. json={"code": "WRONGCD3"},
  2214. headers={"Authorization": f"Bearer {token}"},
  2215. )
  2216. assert resp.status_code == 400
  2217. # S3: counter MUST be unchanged — this is a server-side problem.
  2218. events = (
  2219. (
  2220. await db_session.execute(
  2221. _select(AuthRateLimitEvent).where(AuthRateLimitEvent.username == admin_username.lower())
  2222. )
  2223. )
  2224. .scalars()
  2225. .all()
  2226. )
  2227. assert len(events) == 0, "S3: must not debit fail-counter when cause is server-side key-loss"
  2228. @pytest.mark.asyncio
  2229. @pytest.mark.integration
  2230. async def test_setup_totp_returns_500_when_decryption_broken(self, async_client, db_session, monkeypatch):
  2231. """B3: setup endpoint → 500 when an active TOTP secret can't be decrypted.
  2232. Replacing an active authenticator requires verifying the current TOTP
  2233. code; with no recovery (backup-code) path on this endpoint, the only
  2234. safe outcome is a 500 surface to the operator.
  2235. """
  2236. import backend.app.core.encryption as enc_mod
  2237. token, _, _ = await self._setup_admin_and_totp_user(async_client, db_session)
  2238. monkeypatch.setattr(enc_mod, "_load_or_generate_key", lambda: (None, "none"))
  2239. enc_mod._fernet_instance = None
  2240. resp = await async_client.post(
  2241. "/api/v1/auth/2fa/totp/setup",
  2242. json={"code": "123456"},
  2243. headers={"Authorization": f"Bearer {token}"},
  2244. )
  2245. assert resp.status_code == 500
  2246. assert "unavailable" in resp.json().get("detail", "").lower()
  2247. @pytest.mark.asyncio
  2248. @pytest.mark.integration
  2249. async def test_verify_2fa_returns_500_when_decryption_broken(self, async_client, db_session, monkeypatch):
  2250. """C9: verify endpoint (TOTP method) → 500 when TOTP secret unreadable."""
  2251. from datetime import datetime, timedelta, timezone
  2252. import backend.app.core.encryption as enc_mod
  2253. from backend.app.models.auth_ephemeral import AuthEphemeralToken
  2254. token, admin_username, user_id = await self._setup_admin_and_totp_user(async_client, db_session)
  2255. monkeypatch.setattr(enc_mod, "_load_or_generate_key", lambda: (None, "none"))
  2256. enc_mod._fernet_instance = None
  2257. # Create a pre_auth token to simulate the post-login 2FA challenge step.
  2258. raw_token = secrets.token_urlsafe(32)
  2259. ephemeral = AuthEphemeralToken(
  2260. token=raw_token,
  2261. token_type="pre_auth",
  2262. username=admin_username,
  2263. expires_at=datetime.now(timezone.utc) + timedelta(minutes=5),
  2264. )
  2265. db_session.add(ephemeral)
  2266. await db_session.commit()
  2267. resp = await async_client.post(
  2268. "/api/v1/auth/2fa/verify",
  2269. json={"pre_auth_token": raw_token, "method": "totp", "code": "123456"},
  2270. )
  2271. assert resp.status_code == 500
  2272. assert "unavailable" in resp.json().get("detail", "").lower()