test_security.py 116 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402240324042405240624072408240924102411241224132414241524162417241824192420242124222423242424252426242724282429243024312432243324342435243624372438243924402441244224432444244524462447244824492450245124522453245424552456245724582459246024612462246324642465246624672468246924702471247224732474247524762477247824792480248124822483248424852486248724882489249024912492249324942495249624972498249925002501250225032504250525062507250825092510251125122513251425152516251725182519252025212522252325242525252625272528252925302531253225332534253525362537253825392540254125422543254425452546254725482549255025512552255325542555255625572558255925602561256225632564256525662567256825692570257125722573257425752576257725782579258025812582258325842585258625872588258925902591259225932594259525962597259825992600260126022603260426052606260726082609261026112612261326142615261626172618261926202621262226232624262526262627262826292630263126322633263426352636263726382639264026412642264326442645264626472648264926502651265226532654265526562657265826592660266126622663266426652666266726682669267026712672267326742675267626772678267926802681268226832684268526862687268826892690269126922693269426952696269726982699270027012702270327042705270627072708270927102711271227132714271527162717271827192720272127222723272427252726272727282729273027312732273327342735273627372738273927402741274227432744274527462747274827492750275127522753275427552756275727582759276027612762276327642765276627672768276927702771277227732774277527762777277827792780278127822783278427852786278727882789279027912792279327942795279627972798279928002801280228032804280528062807280828092810281128122813281428152816281728182819282028212822282328242825282628272828282928302831283228332834283528362837
  1. """Security tests for the 8 coverage gaps identified in the maintainer review.
  2. Gap 1: encryption.py has zero tests
  3. Gap 2: JWT revocation (revoke_jti, is_jti_revoked, _is_token_fresh) untested
  4. Gap 3: OIDC exchange token replay untested
  5. Gap 4: OIDC email_verified claim handling untested
  6. Gap 5: Email OTP max-attempts invalidation untested
  7. Gap 6: OIDC callback error redirects (SSRF protection) undertested
  8. Gap 7: Login rate limiting untested
  9. Gap 8: challenge_id cookie binding untested
  10. """
  11. from __future__ import annotations
  12. import base64
  13. import secrets
  14. import time
  15. from datetime import datetime, timedelta, timezone
  16. from unittest.mock import AsyncMock, MagicMock, patch
  17. import jwt as pyjwt
  18. import pytest
  19. from cryptography.hazmat.primitives import serialization
  20. from cryptography.hazmat.primitives.asymmetric import rsa
  21. from httpx import AsyncClient
  22. from sqlalchemy.ext.asyncio import AsyncSession
  23. from backend.app.models.auth_ephemeral import AuthEphemeralToken
  24. from backend.app.models.user import User
  25. AUTH_SETUP_URL = "/api/v1/auth/setup"
  26. LOGIN_URL = "/api/v1/auth/login"
  27. LOGOUT_URL = "/api/v1/auth/logout"
  28. ME_URL = "/api/v1/auth/me"
  29. def _auth_header(token: str) -> dict[str, str]:
  30. return {"Authorization": f"Bearer {token}"}
  31. def _norm_pw(password: str) -> str:
  32. """Ensure password meets complexity requirements (I4: SetupRequest now validates)."""
  33. if not any(c.isupper() for c in password):
  34. password = password[0].upper() + password[1:]
  35. if not any(c not in "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789" for c in password):
  36. password = password + "!"
  37. return password
  38. async def _setup_and_login(client: AsyncClient, username: str, password: str) -> str:
  39. password = _norm_pw(password)
  40. await client.post(
  41. AUTH_SETUP_URL,
  42. json={"auth_enabled": True, "admin_username": username, "admin_password": password},
  43. )
  44. resp = await client.post(LOGIN_URL, json={"username": username, "password": password})
  45. assert resp.status_code == 200
  46. return resp.json()["access_token"]
  47. def _make_test_rsa_key():
  48. def _b64url(n: int, length: int) -> str:
  49. return base64.urlsafe_b64encode(n.to_bytes(length, "big")).rstrip(b"=").decode()
  50. private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
  51. private_pem = private_key.private_bytes(
  52. serialization.Encoding.PEM,
  53. serialization.PrivateFormat.TraditionalOpenSSL,
  54. serialization.NoEncryption(),
  55. )
  56. pub_numbers = private_key.public_key().public_numbers()
  57. jwks = {
  58. "keys": [
  59. {
  60. "kty": "RSA",
  61. "use": "sig",
  62. "alg": "RS256",
  63. "kid": "test-kid-1",
  64. "n": _b64url(pub_numbers.n, 256),
  65. "e": _b64url(pub_numbers.e, 3),
  66. }
  67. ]
  68. }
  69. return private_pem, jwks
  70. # ===========================================================================
  71. # Gap 1: encryption.py unit tests
  72. # ===========================================================================
  73. class TestEncryption:
  74. """encrypt/decrypt round-trips, plaintext passthrough, RuntimeError on missing key.
  75. The ``mfa_encryption_isolation`` autouse fixture (conftest.py) resets the
  76. ``encryption`` module's globals before/after each test and points
  77. ``DATA_DIR`` at a tmp path, so individual tests only need to set
  78. ``MFA_ENCRYPTION_KEY`` when they want a specific key in scope.
  79. """
  80. def test_encrypt_decrypt_roundtrip_with_key(self, monkeypatch):
  81. from cryptography.fernet import Fernet
  82. import backend.app.core.encryption as enc_mod
  83. test_key = Fernet.generate_key().decode()
  84. monkeypatch.setenv("MFA_ENCRYPTION_KEY", test_key)
  85. # Force re-initialisation now that the env var is set.
  86. enc_mod._fernet_instance = None
  87. ciphertext = enc_mod.mfa_encrypt("my-totp-secret")
  88. assert ciphertext.startswith("fernet:")
  89. assert enc_mod.mfa_decrypt(ciphertext) == "my-totp-secret"
  90. def test_plaintext_passthrough_without_key(self, monkeypatch):
  91. # Force the auto-bootstrap into the legacy "no key available" branch
  92. # by patching _load_or_generate_key directly. This is more robust than
  93. # chmod tricks (which root bypasses) when verifying the plaintext path.
  94. import backend.app.core.encryption as enc_mod
  95. monkeypatch.setattr(enc_mod, "_load_or_generate_key", lambda: (None, "none"))
  96. enc_mod._fernet_instance = None
  97. result = enc_mod.mfa_encrypt("plaintext-secret")
  98. assert result == "plaintext-secret"
  99. assert enc_mod.mfa_decrypt("plaintext-secret") == "plaintext-secret"
  100. def test_decrypt_raises_runtime_error_without_key_for_encrypted_value(self, monkeypatch):
  101. import backend.app.core.encryption as enc_mod
  102. monkeypatch.setattr(enc_mod, "_load_or_generate_key", lambda: (None, "none"))
  103. enc_mod._fernet_instance = None
  104. with pytest.raises(RuntimeError, match="MFA_ENCRYPTION_KEY must be set"):
  105. enc_mod.mfa_decrypt("fernet:gAAAAA-fake-ciphertext")
  106. # ------------------------------------------------------------------
  107. # Auto-bootstrap tests for _load_or_generate_key
  108. # ------------------------------------------------------------------
  109. def test_load_or_generate_key_uses_env_when_set(self, monkeypatch, tmp_path):
  110. """Valid env var → key_source == 'env', no file written."""
  111. from cryptography.fernet import Fernet
  112. import backend.app.core.encryption as enc_mod
  113. valid_key = Fernet.generate_key().decode()
  114. monkeypatch.setenv("MFA_ENCRYPTION_KEY", valid_key)
  115. monkeypatch.setenv("DATA_DIR", str(tmp_path))
  116. enc_mod._fernet_instance = None
  117. key, source = enc_mod._load_or_generate_key()
  118. assert key == valid_key
  119. assert source == "env"
  120. assert not (tmp_path / ".mfa_encryption_key").exists()
  121. def test_invalid_env_key_falls_through_to_file(self, monkeypatch, tmp_path, caplog):
  122. """Invalid env var → logger.error + file fallback (auto-generated)."""
  123. import logging
  124. import backend.app.core.encryption as enc_mod
  125. monkeypatch.setenv("MFA_ENCRYPTION_KEY", "not-a-valid-fernet-key")
  126. monkeypatch.setenv("DATA_DIR", str(tmp_path))
  127. enc_mod._fernet_instance = None
  128. with caplog.at_level(logging.ERROR, logger="backend.app.core.encryption"):
  129. key, source = enc_mod._load_or_generate_key()
  130. assert source == "generated"
  131. assert key is not None
  132. assert (tmp_path / ".mfa_encryption_key").exists()
  133. assert any("not a valid Fernet key" in rec.message for rec in caplog.records)
  134. def test_load_or_generate_key_reads_existing_file(self, monkeypatch, tmp_path):
  135. """File present in DATA_DIR + no env var → key_source == 'file'."""
  136. from cryptography.fernet import Fernet
  137. import backend.app.core.encryption as enc_mod
  138. existing_key = Fernet.generate_key().decode()
  139. key_file = tmp_path / ".mfa_encryption_key"
  140. key_file.write_text(existing_key)
  141. monkeypatch.setenv("DATA_DIR", str(tmp_path))
  142. enc_mod._fernet_instance = None
  143. key, source = enc_mod._load_or_generate_key()
  144. assert key == existing_key
  145. assert source == "file"
  146. def test_load_or_generate_key_creates_file_with_0600(self, monkeypatch, tmp_path):
  147. """Neither env nor file → new key generated, file mode is 0o600."""
  148. import backend.app.core.encryption as enc_mod
  149. monkeypatch.setenv("DATA_DIR", str(tmp_path))
  150. enc_mod._fernet_instance = None
  151. key, source = enc_mod._load_or_generate_key()
  152. assert source == "generated"
  153. assert enc_mod._validate_fernet_key(key)
  154. key_file = tmp_path / ".mfa_encryption_key"
  155. assert key_file.exists()
  156. # Mode bits LSB are 0o600 — owner read+write only.
  157. assert (key_file.stat().st_mode & 0o777) == 0o600
  158. def test_load_or_generate_key_returns_none_on_write_oserror(self, monkeypatch, tmp_path, caplog):
  159. """When DATA_DIR can't be written to (auto-generate path), return (None, 'none_write_failed').
  160. S1: write now uses os.open(O_EXCL|O_CREAT, 0o600) instead of write_text — patch
  161. os.write to simulate the OS-level failure. S8: source distinguishes write-failed
  162. from corrupted to drive accurate operator messaging.
  163. """
  164. import logging
  165. import os
  166. import backend.app.core.encryption as enc_mod
  167. monkeypatch.setenv("DATA_DIR", str(tmp_path))
  168. enc_mod._fernet_instance = None
  169. original_write = os.write
  170. def _raising_write(fd, data):
  171. # Best-effort: trigger OSError specifically for the key write.
  172. raise OSError("simulated read-only filesystem")
  173. monkeypatch.setattr(os, "write", _raising_write)
  174. with caplog.at_level(logging.ERROR, logger="backend.app.core.encryption"):
  175. key, source = enc_mod._load_or_generate_key()
  176. # Restore os.write so the rest of the test suite is unaffected.
  177. monkeypatch.setattr(os, "write", original_write)
  178. assert key is None
  179. assert source == "none_write_failed"
  180. assert any("Could not save MFA encryption key" in rec.message for rec in caplog.records)
  181. def test_load_or_generate_key_returns_none_on_read_oserror(self, monkeypatch, tmp_path, caplog):
  182. """B4: existing key file but read fails (e.g. permission denied) → (None, 'none_corrupted').
  183. Critical: must NOT regenerate a new key, which would destroy access to
  184. every row already encrypted under the existing key. S8: 'none_corrupted'
  185. marks the cause so operators see the right diagnostic.
  186. """
  187. import logging
  188. from pathlib import Path
  189. import backend.app.core.encryption as enc_mod
  190. # Pre-create a key file so we hit the existing-file branch.
  191. key_file = tmp_path / ".mfa_encryption_key"
  192. key_file.write_text("placeholder-content")
  193. original_size = key_file.stat().st_size
  194. monkeypatch.setenv("DATA_DIR", str(tmp_path))
  195. enc_mod._fernet_instance = None
  196. original_read_text = Path.read_text
  197. def _raising_read_text(self, *args, **kwargs):
  198. if self.name == ".mfa_encryption_key":
  199. raise OSError("simulated permission denied")
  200. return original_read_text(self, *args, **kwargs)
  201. monkeypatch.setattr(Path, "read_text", _raising_read_text)
  202. with caplog.at_level(logging.ERROR, logger="backend.app.core.encryption"):
  203. key, source = enc_mod._load_or_generate_key()
  204. assert key is None
  205. assert source == "none_corrupted"
  206. # Critical: file must not have been overwritten with a new key.
  207. assert key_file.exists()
  208. assert key_file.stat().st_size == original_size
  209. assert any("Failed to read existing MFA key file" in rec.message for rec in caplog.records)
  210. assert any("Refusing to regenerate" in rec.message for rec in caplog.records)
  211. def test_get_key_source_reflects_active_source(self, monkeypatch, tmp_path):
  212. """get_key_source() returns the source detected on the most recent _get_fernet() call."""
  213. from cryptography.fernet import Fernet
  214. import backend.app.core.encryption as enc_mod
  215. monkeypatch.setenv("MFA_ENCRYPTION_KEY", Fernet.generate_key().decode())
  216. monkeypatch.setenv("DATA_DIR", str(tmp_path))
  217. enc_mod._fernet_instance = None
  218. enc_mod._key_source = None
  219. # Trigger lazy initialisation
  220. enc_mod.mfa_encrypt("anything")
  221. assert enc_mod.get_key_source() == "env"
  222. def test_corrupted_key_file_returns_none_without_overwrite(self, monkeypatch, tmp_path, caplog):
  223. """A1: invalid key file content → (None, 'none_corrupted'), file not overwritten.
  224. S8: 'none_corrupted' (vs 'none_write_failed') so operators get the right
  225. diagnostic and don't see a misleading 'DATA_DIR not writable' warning.
  226. """
  227. import logging
  228. import backend.app.core.encryption as enc_mod
  229. key_file = tmp_path / ".mfa_encryption_key"
  230. key_file.write_text("invalid_content")
  231. original_mtime = key_file.stat().st_mtime
  232. monkeypatch.setenv("DATA_DIR", str(tmp_path))
  233. enc_mod._fernet_instance = None
  234. with caplog.at_level(logging.ERROR, logger="backend.app.core.encryption"):
  235. key, source = enc_mod._load_or_generate_key()
  236. assert key is None
  237. assert source == "none_corrupted"
  238. assert key_file.exists(), "file must not be deleted"
  239. assert key_file.stat().st_mtime == original_mtime, "file must not be overwritten"
  240. assert any("not a valid Fernet key" in rec.message for rec in caplog.records)
  241. assert any("Refusing to overwrite" in rec.message for rec in caplog.records)
  242. def test_auto_generate_fileexistserror_returns_none_corrupted(self, monkeypatch, tmp_path, caplog):
  243. """S1: O_EXCL race — file appears between exists() check and open() →
  244. return (None, 'none_corrupted') without overwriting."""
  245. import logging
  246. import os
  247. import backend.app.core.encryption as enc_mod
  248. monkeypatch.setenv("DATA_DIR", str(tmp_path))
  249. enc_mod._fernet_instance = None
  250. original_open = os.open
  251. def _excl_raise(path, flags, mode=0o777):
  252. if str(path).endswith(".mfa_encryption_key") and (flags & os.O_EXCL):
  253. raise FileExistsError(17, "File exists", str(path))
  254. return original_open(path, flags, mode)
  255. monkeypatch.setattr(os, "open", _excl_raise)
  256. with caplog.at_level(logging.ERROR, logger="backend.app.core.encryption"):
  257. key, source = enc_mod._load_or_generate_key()
  258. assert key is None
  259. assert source == "none_corrupted"
  260. assert any("Race detected" in rec.message for rec in caplog.records)
  261. # ===========================================================================
  262. # Gap 2: JWT revocation — revoke_jti, is_jti_revoked, _is_token_fresh, /me
  263. # ===========================================================================
  264. class TestJWTRevocation:
  265. """JWT revocation and token freshness checks."""
  266. @pytest.mark.asyncio
  267. @pytest.mark.integration
  268. async def test_revoke_jti_and_is_jti_revoked(self, async_client: AsyncClient, db_session: AsyncSession):
  269. """revoke_jti stores the JTI; is_jti_revoked returns True afterwards."""
  270. from backend.app.core.auth import is_jti_revoked, revoke_jti
  271. test_jti = secrets.token_urlsafe(16)
  272. expires = datetime.now(timezone.utc) + timedelta(hours=1)
  273. assert not await is_jti_revoked(test_jti)
  274. await revoke_jti(test_jti, expires, username="testuser")
  275. assert await is_jti_revoked(test_jti)
  276. @pytest.mark.asyncio
  277. @pytest.mark.integration
  278. async def test_revoke_jti_idempotent(self, async_client: AsyncClient):
  279. """Double-revocation of the same JTI should not raise."""
  280. from backend.app.core.auth import is_jti_revoked, revoke_jti
  281. jti = secrets.token_urlsafe(16)
  282. expires = datetime.now(timezone.utc) + timedelta(hours=1)
  283. await revoke_jti(jti, expires)
  284. await revoke_jti(jti, expires) # must not raise
  285. assert await is_jti_revoked(jti)
  286. def test_is_token_fresh_rejects_none_iat(self):
  287. """_is_token_fresh returns False when iat is None (I1 hard cutoff)."""
  288. from backend.app.core.auth import _is_token_fresh
  289. user = MagicMock()
  290. user.password_changed_at = None
  291. assert _is_token_fresh(None, user) is False
  292. def test_is_token_fresh_rejects_token_before_password_change(self):
  293. """_is_token_fresh returns False when iat predates password_changed_at."""
  294. from backend.app.core.auth import _is_token_fresh
  295. now = datetime.now(timezone.utc)
  296. user = MagicMock()
  297. user.password_changed_at = now
  298. old_iat = (now - timedelta(hours=1)).timestamp()
  299. assert _is_token_fresh(old_iat, user) is False
  300. def test_is_token_fresh_accepts_token_after_password_change(self):
  301. """_is_token_fresh returns True when iat is after password_changed_at."""
  302. from backend.app.core.auth import _is_token_fresh
  303. now = datetime.now(timezone.utc)
  304. user = MagicMock()
  305. user.password_changed_at = now - timedelta(hours=1)
  306. recent_iat = now.timestamp()
  307. assert _is_token_fresh(recent_iat, user) is True
  308. def test_is_token_fresh_returns_true_when_no_password_change(self):
  309. """_is_token_fresh returns True when password_changed_at is None (I2 migration not yet run)."""
  310. from backend.app.core.auth import _is_token_fresh
  311. user = MagicMock()
  312. user.password_changed_at = None
  313. assert _is_token_fresh(time.time(), user) is True
  314. @pytest.mark.asyncio
  315. @pytest.mark.integration
  316. async def test_me_endpoint_rejects_token_after_logout(self, async_client: AsyncClient):
  317. """After logout, the bearer token must be rejected by /me (B1 + revocation)."""
  318. token = await _setup_and_login(async_client, "sec_logout_me", "sec_logout_me1")
  319. # Token works before logout
  320. me_resp = await async_client.get(ME_URL, headers=_auth_header(token))
  321. assert me_resp.status_code == 200
  322. # Logout
  323. logout_resp = await async_client.post(LOGOUT_URL, headers=_auth_header(token))
  324. assert logout_resp.status_code == 200
  325. # Token must now be rejected
  326. me_after = await async_client.get(ME_URL, headers=_auth_header(token))
  327. assert me_after.status_code == 401
  328. # ===========================================================================
  329. # Gap 3: OIDC exchange token replay
  330. # ===========================================================================
  331. class TestOIDCExchangeReplay:
  332. """A single-use OIDC exchange token cannot be redeemed twice."""
  333. @pytest.mark.asyncio
  334. @pytest.mark.integration
  335. async def test_exchange_token_is_single_use(self, async_client: AsyncClient, db_session: AsyncSession):
  336. """The second call to /oidc/exchange with the same token returns 401."""
  337. exchange_token = secrets.token_urlsafe(32)
  338. db_session.add(
  339. AuthEphemeralToken(
  340. token=exchange_token,
  341. token_type="oidc_exchange",
  342. username="oidc_replay_user",
  343. expires_at=datetime.now(timezone.utc) + timedelta(minutes=5),
  344. )
  345. )
  346. await db_session.commit()
  347. # Seed the user so the exchange can resolve it
  348. from backend.app.core.auth import get_password_hash
  349. from backend.app.core.database import async_session, seed_default_groups
  350. async with async_session() as db:
  351. result = await db.execute(__import__("sqlalchemy").select(User).where(User.username == "oidc_replay_user"))
  352. if result.scalar_one_or_none() is None:
  353. db.add(
  354. User(
  355. username="oidc_replay_user",
  356. password_hash=get_password_hash("pw"),
  357. is_active=True,
  358. )
  359. )
  360. await db.commit()
  361. first = await async_client.post("/api/v1/auth/oidc/exchange", json={"oidc_token": exchange_token})
  362. assert first.status_code == 200
  363. second = await async_client.post("/api/v1/auth/oidc/exchange", json={"oidc_token": exchange_token})
  364. assert second.status_code == 401
  365. # ===========================================================================
  366. # Gap 4: OIDC email_verified claim handling
  367. # ===========================================================================
  368. class TestOIDCEmailVerified:
  369. """email_verified: False/absent must not link OIDC identity to an existing email."""
  370. @pytest.mark.asyncio
  371. @pytest.mark.integration
  372. async def test_unverified_email_does_not_link_to_existing_user(
  373. self, async_client: AsyncClient, db_session: AsyncSession
  374. ):
  375. """If email_verified is False, the OIDC callback must not auto-link by email."""
  376. private_pem, jwks_data = _make_test_rsa_key()
  377. issuer = "https://idp.evtest.example.com"
  378. client_id = "ev-client"
  379. nonce = secrets.token_urlsafe(16)
  380. now = int(time.time())
  381. id_token = pyjwt.encode(
  382. {
  383. "sub": "ev-sub-new",
  384. "iss": issuer,
  385. "aud": client_id,
  386. "nonce": nonce,
  387. "email": "existing@example.com",
  388. "email_verified": False, # <-- must be ignored
  389. "iat": now,
  390. "exp": now + 300,
  391. },
  392. private_pem,
  393. algorithm="RS256",
  394. headers={"kid": "test-kid-1"},
  395. )
  396. admin_token = await _setup_and_login(async_client, "ev_admin", "ev_admin1")
  397. # Create existing user with the same email (use strong password for validator)
  398. create_user_resp = await async_client.post(
  399. "/api/v1/users",
  400. json={"username": "existing_email_user", "password": "Str0ng!Pass", "email": "existing@example.com"},
  401. headers=_auth_header(admin_token),
  402. )
  403. assert create_user_resp.status_code in (200, 201), create_user_resp.json()
  404. # Create OIDC provider
  405. create_resp = await async_client.post(
  406. "/api/v1/auth/oidc/providers",
  407. json={
  408. "name": "EV-IdP",
  409. "issuer_url": issuer,
  410. "client_id": client_id,
  411. "client_secret": "secret",
  412. "scopes": "openid email",
  413. "is_enabled": True,
  414. "auto_create_users": True,
  415. },
  416. headers=_auth_header(admin_token),
  417. )
  418. assert create_resp.status_code == 201
  419. provider_id = create_resp.json()["id"]
  420. state = secrets.token_urlsafe(32)
  421. code_verifier = secrets.token_urlsafe(48)
  422. db_session.add(
  423. AuthEphemeralToken(
  424. token=state,
  425. token_type="oidc_state",
  426. provider_id=provider_id,
  427. nonce=nonce,
  428. code_verifier=code_verifier,
  429. expires_at=datetime.now(timezone.utc) + timedelta(minutes=5),
  430. )
  431. )
  432. await db_session.commit()
  433. discovery_doc = {
  434. "issuer": issuer,
  435. "authorization_endpoint": f"{issuer}/auth",
  436. "token_endpoint": f"{issuer}/token",
  437. "jwks_uri": f"{issuer}/.well-known/jwks.json",
  438. }
  439. class _MockResp:
  440. def __init__(self, data):
  441. self._data = data
  442. self.status_code = 200
  443. self.is_success = True
  444. self.text = str(data)
  445. def json(self):
  446. return self._data
  447. def raise_for_status(self):
  448. pass
  449. class _MockHttpxClientEV:
  450. def __init__(self, *args, **kwargs):
  451. pass
  452. async def __aenter__(self):
  453. return self
  454. async def __aexit__(self, *_):
  455. pass
  456. async def get(self, url, **kwargs):
  457. if "jwks" in url:
  458. return _MockResp(jwks_data)
  459. return _MockResp(discovery_doc)
  460. async def post(self, url, **kwargs):
  461. return _MockResp({"access_token": "mock", "token_type": "Bearer", "id_token": id_token})
  462. with patch("backend.app.api.routes.mfa.httpx.AsyncClient", _MockHttpxClientEV):
  463. await async_client.get(
  464. f"/api/v1/auth/oidc/callback?code=test-code&state={state}",
  465. follow_redirects=False,
  466. )
  467. # Callback must NOT link to the existing_email_user — a new user is created
  468. # instead (because the email claim was ignored due to email_verified=False).
  469. # Either a new user is provisioned (redirect with oidc_token) or the callback
  470. # fails. In either case, the existing user must not have an OIDC link.
  471. from sqlalchemy import select as sa_select
  472. from backend.app.models.oidc_provider import UserOIDCLink
  473. link_result = await db_session.execute(
  474. sa_select(UserOIDCLink)
  475. .join(User, UserOIDCLink.user_id == User.id)
  476. .where(User.email == "existing@example.com")
  477. )
  478. link = link_result.scalar_one_or_none()
  479. assert link is None, "Existing user must not be auto-linked when email_verified is False"
  480. # ===========================================================================
  481. # Gap 5: Email OTP max-attempts invalidation
  482. # ===========================================================================
  483. class TestEmailOTPMaxAttempts:
  484. """After MAX_ATTEMPTS wrong codes, the OTP is permanently invalidated."""
  485. @pytest.mark.asyncio
  486. @pytest.mark.integration
  487. async def test_email_otp_invalidated_after_max_attempts(self, async_client: AsyncClient, db_session: AsyncSession):
  488. from passlib.context import CryptContext
  489. from sqlalchemy import select as sa_select
  490. from backend.app.models.user_otp_code import UserOTPCode
  491. _pwd_ctx = CryptContext(schemes=["pbkdf2_sha256"], deprecated="auto")
  492. admin_token = await _setup_and_login(async_client, "otp_max_admin", "otp_max_admin1")
  493. # Enable email OTP for admin user
  494. result = await db_session.execute(sa_select(User).where(User.username == "otp_max_admin"))
  495. user = result.scalar_one()
  496. user.email = "otpmax@example.com"
  497. await db_session.commit()
  498. setup_code = "123456"
  499. from backend.app.models.auth_ephemeral import AuthEphemeralToken as AET
  500. setup_token = secrets.token_urlsafe(32)
  501. db_session.add(
  502. AET(
  503. token=setup_token,
  504. token_type="email_otp_setup",
  505. username="otp_max_admin",
  506. nonce=_pwd_ctx.hash(setup_code),
  507. expires_at=datetime.now(timezone.utc) + timedelta(minutes=10),
  508. )
  509. )
  510. await db_session.commit()
  511. await async_client.post(
  512. "/api/v1/auth/2fa/email/enable/confirm",
  513. json={"setup_token": setup_token, "code": setup_code},
  514. headers=_auth_header(admin_token),
  515. )
  516. # Login to get pre_auth_token
  517. login_resp = await async_client.post(
  518. LOGIN_URL, json={"username": "otp_max_admin", "password": "Otp_max_admin1"}
  519. )
  520. pre_auth_token = login_resp.json()["pre_auth_token"]
  521. # Insert an OTP record directly (bypassing SMTP)
  522. real_code = "654321"
  523. otp = UserOTPCode(
  524. user_id=user.id,
  525. code_hash=_pwd_ctx.hash(real_code),
  526. attempts=0,
  527. used=False,
  528. expires_at=datetime.now(timezone.utc) + timedelta(minutes=10),
  529. )
  530. db_session.add(otp)
  531. await db_session.commit()
  532. # Submit MAX_ATTEMPTS wrong codes
  533. from backend.app.api.routes.mfa import MAX_2FA_ATTEMPTS
  534. for _ in range(MAX_2FA_ATTEMPTS):
  535. r = await async_client.post(
  536. "/api/v1/auth/2fa/verify",
  537. json={"pre_auth_token": pre_auth_token, "code": "000000", "method": "email"},
  538. )
  539. # Each attempt must fail with 401
  540. assert r.status_code == 401
  541. # After max attempts, the correct code is also rejected (either OTP
  542. # invalidated → 401, or rate limit hit → 429). Either means locked out.
  543. final = await async_client.post(
  544. "/api/v1/auth/2fa/verify",
  545. json={"pre_auth_token": pre_auth_token, "code": real_code, "method": "email"},
  546. )
  547. assert final.status_code in (401, 429), f"Expected lockout, got {final.status_code}: {final.json()}"
  548. # ===========================================================================
  549. # Gap 6: OIDC callback SSRF protection — invalid authorization_endpoint scheme
  550. # ===========================================================================
  551. class TestOIDCSSRFProtection:
  552. """authorization_endpoint with non-http(s) scheme must be rejected."""
  553. @pytest.mark.asyncio
  554. @pytest.mark.integration
  555. async def test_invalid_authorization_endpoint_scheme_rejected(
  556. self, async_client: AsyncClient, db_session: AsyncSession
  557. ):
  558. issuer = "https://idp.ssrf.example.com"
  559. client_id = "ssrf-client"
  560. admin_token = await _setup_and_login(async_client, "ssrf_admin", "ssrf_admin1")
  561. create_resp = await async_client.post(
  562. "/api/v1/auth/oidc/providers",
  563. json={
  564. "name": "SSRF-IdP",
  565. "issuer_url": issuer,
  566. "client_id": client_id,
  567. "client_secret": "secret",
  568. "scopes": "openid",
  569. "is_enabled": True,
  570. "auto_create_users": False,
  571. },
  572. headers=_auth_header(admin_token),
  573. )
  574. assert create_resp.status_code == 201
  575. provider_id = create_resp.json()["id"]
  576. # Discovery doc returns a javascript: authorization_endpoint
  577. malicious_discovery = {
  578. "issuer": issuer,
  579. "authorization_endpoint": "javascript:alert(1)", # <-- malicious
  580. "token_endpoint": f"{issuer}/token",
  581. "jwks_uri": f"{issuer}/.well-known/jwks.json",
  582. }
  583. class _MockResp:
  584. def __init__(self, data):
  585. self._data = data
  586. self.status_code = 200
  587. self.is_success = True
  588. self.text = str(data)
  589. def json(self):
  590. return self._data
  591. def raise_for_status(self):
  592. pass
  593. class _MockHttpxClientSSRF:
  594. def __init__(self, *args, **kwargs):
  595. pass
  596. async def __aenter__(self):
  597. return self
  598. async def __aexit__(self, *_):
  599. pass
  600. async def get(self, url, **kwargs):
  601. return _MockResp(malicious_discovery)
  602. async def post(self, url, **kwargs):
  603. return _MockResp({})
  604. with patch("backend.app.api.routes.mfa.httpx.AsyncClient", _MockHttpxClientSSRF):
  605. # oidc_authorize uses a path parameter, not query param
  606. authorize_resp = await async_client.get(
  607. f"/api/v1/auth/oidc/authorize/{provider_id}",
  608. follow_redirects=False,
  609. )
  610. # Must be rejected with 502 — B2 guard rejects invalid authorization_endpoint scheme
  611. assert authorize_resp.status_code == 502, authorize_resp.json()
  612. detail = authorize_resp.json().get("detail", "").lower()
  613. assert "authorization_endpoint" in detail or "invalid" in detail
  614. # ===========================================================================
  615. # Gap 7: Login rate limiting
  616. # ===========================================================================
  617. class TestLoginRateLimiting:
  618. """10+ failed logins for the same username must return 429."""
  619. @pytest.mark.asyncio
  620. @pytest.mark.integration
  621. async def test_excessive_failed_logins_return_429(self, async_client: AsyncClient):
  622. from backend.app.api.routes.mfa import MAX_LOGIN_ATTEMPTS
  623. # Setup auth but do NOT log in
  624. await async_client.post(
  625. AUTH_SETUP_URL,
  626. json={"auth_enabled": True, "admin_username": "ratelimit_user", "admin_password": "Ratelimit_pw1"},
  627. )
  628. status_codes = []
  629. for _ in range(MAX_LOGIN_ATTEMPTS + 2):
  630. resp = await async_client.post(
  631. LOGIN_URL,
  632. json={"username": "ratelimit_user", "password": "wrong_password"},
  633. )
  634. status_codes.append(resp.status_code)
  635. # The last attempts must be 429 (Too Many Requests)
  636. assert status_codes[-1] == 429, f"Expected 429 after {MAX_LOGIN_ATTEMPTS} failures, got: {status_codes}"
  637. # ===========================================================================
  638. # Gap 8: challenge_id cookie binding
  639. # ===========================================================================
  640. class TestChallengeIdCookieBinding:
  641. """A pre-auth token stolen from session A cannot be used from session B."""
  642. @pytest.mark.asyncio
  643. @pytest.mark.integration
  644. async def test_pre_auth_token_rejected_without_matching_cookie(
  645. self, async_client: AsyncClient, db_session: AsyncSession
  646. ):
  647. import pyotp
  648. from passlib.context import CryptContext
  649. _pwd_ctx = CryptContext(schemes=["pbkdf2_sha256"], deprecated="auto")
  650. # Set up user with TOTP
  651. await _setup_and_login(async_client, "cookie_bind_user", "cookie_bind_pw1")
  652. secret = pyotp.random_base32()
  653. totp_obj = pyotp.TOTP(secret)
  654. from sqlalchemy import select as sa_select
  655. from backend.app.models.user_totp import UserTOTP
  656. result = await db_session.execute(sa_select(User).where(User.username == "cookie_bind_user"))
  657. user = result.scalar_one()
  658. db_session.add(UserTOTP(user_id=user.id, secret=secret, is_enabled=True))
  659. await db_session.commit()
  660. # Login from "session A" — gets a pre_auth_token and a 2fa_challenge cookie
  661. login_resp = await async_client.post(
  662. LOGIN_URL, json={"username": "cookie_bind_user", "password": "Cookie_bind_pw1"}
  663. )
  664. assert login_resp.status_code == 200
  665. assert login_resp.json()["requires_2fa"] is True
  666. pre_auth_token = login_resp.json()["pre_auth_token"]
  667. # The async_client jar now holds the 2fa_challenge cookie for session A
  668. # Simulate session B by creating a new client WITHOUT the cookie
  669. from httpx import ASGITransport, AsyncClient as FreshClient
  670. from backend.app.main import app
  671. async with FreshClient(transport=ASGITransport(app=app), base_url="http://test") as session_b:
  672. # Attempt to use session A's pre_auth_token from session B (no cookie)
  673. verify_resp = await session_b.post(
  674. "/api/v1/auth/2fa/verify",
  675. json={
  676. "pre_auth_token": pre_auth_token,
  677. "code": totp_obj.now(),
  678. "method": "totp",
  679. },
  680. )
  681. # Must be rejected — pre_auth_token is bound to session A's cookie
  682. assert verify_resp.status_code == 401, (
  683. f"Expected 401 for token replay from cookieless session, got {verify_resp.status_code}: "
  684. f"{verify_resp.json()}"
  685. )
  686. # ===========================================================================
  687. # C2: Security-header middleware
  688. # ===========================================================================
  689. class TestSecurityHeaders:
  690. """Every HTTP response must include standard security headers (C2)."""
  691. @pytest.mark.asyncio
  692. @pytest.mark.integration
  693. async def test_security_headers_present(self, async_client: AsyncClient):
  694. """GET /api/v1/auth/me (unauthenticated → 401) still carries security headers."""
  695. resp = await async_client.get(ME_URL)
  696. assert resp.status_code == 401 # sanity — no auth token
  697. assert resp.headers.get("x-content-type-options") == "nosniff"
  698. assert resp.headers.get("x-frame-options") == "SAMEORIGIN"
  699. assert resp.headers.get("referrer-policy") == "strict-origin-when-cross-origin"
  700. csp = resp.headers.get("content-security-policy", "")
  701. assert "default-src 'self'" in csp
  702. assert "script-src 'self'" in csp
  703. assert "frame-ancestors 'none'" in csp
  704. assert "object-src 'none'" in csp
  705. @pytest.mark.asyncio
  706. @pytest.mark.integration
  707. async def test_hsts_absent_for_http(self, async_client: AsyncClient):
  708. """HSTS must NOT be set over plain HTTP (test transport uses http)."""
  709. resp = await async_client.get(ME_URL)
  710. assert "strict-transport-security" not in resp.headers
  711. # ===========================================================================
  712. # I3: Rate-limit bucket interaction — IP spray vs. username spray
  713. # ===========================================================================
  714. class TestRateLimitBuckets:
  715. """IP-spray and username-spray must each trip the correct independent bucket."""
  716. @pytest.mark.asyncio
  717. @pytest.mark.integration
  718. async def test_ip_spray_trips_ip_bucket(self, async_client: AsyncClient):
  719. """20 failed logins from one IP across 20 different usernames trips the IP bucket.
  720. Each per-username bucket only has 1 failure (well below MAX_LOGIN_ATTEMPTS=10),
  721. so the username bucket is never the reason for the 429.
  722. """
  723. from unittest.mock import patch as _patch
  724. unique_ip = "10.99.1.1"
  725. # Ensure auth is enabled
  726. await async_client.post(
  727. AUTH_SETUP_URL,
  728. json={"auth_enabled": True, "admin_username": "spray_ip_admin", "admin_password": "SprayIp_admin1"},
  729. )
  730. status_codes: list[int] = []
  731. with _patch("backend.app.api.routes.auth._get_client_ip", return_value=unique_ip):
  732. for i in range(22):
  733. resp = await async_client.post(
  734. LOGIN_URL,
  735. json={"username": f"spray_ip_victim_{i}", "password": "wrong"},
  736. )
  737. status_codes.append(resp.status_code)
  738. # The first 20 attempts fail with 401; the 21st+ must be 429 (IP bucket full)
  739. assert status_codes[-1] == 429, f"Expected 429 after 20 IP-spray failures, got: {status_codes}"
  740. # No single username saw more than one attempt → username buckets not tripped
  741. non_429 = [c for c in status_codes[:-2] if c == 429]
  742. assert not non_429, f"Username bucket triggered early: {status_codes}"
  743. @pytest.mark.asyncio
  744. @pytest.mark.integration
  745. async def test_username_spray_trips_username_bucket(self, async_client: AsyncClient):
  746. """One username targeted from 10+ different IPs trips the username bucket.
  747. Each per-IP bucket only sees 1 failure, so no IP bucket is tripped.
  748. The username bucket (max 10) is what fires the 429.
  749. """
  750. from unittest.mock import patch as _patch
  751. from backend.app.api.routes.mfa import MAX_LOGIN_ATTEMPTS
  752. # Ensure auth is enabled
  753. await async_client.post(
  754. AUTH_SETUP_URL,
  755. json={
  756. "auth_enabled": True,
  757. "admin_username": "spray_uname_admin",
  758. "admin_password": "SprayUname_admin1",
  759. },
  760. )
  761. target_username = "spray_uname_victim"
  762. status_codes: list[int] = []
  763. for i in range(MAX_LOGIN_ATTEMPTS + 2):
  764. rotating_ip = f"10.99.2.{i + 1}"
  765. with _patch("backend.app.api.routes.auth._get_client_ip", return_value=rotating_ip):
  766. resp = await async_client.post(
  767. LOGIN_URL,
  768. json={"username": target_username, "password": "wrong"},
  769. )
  770. status_codes.append(resp.status_code)
  771. # After MAX_LOGIN_ATTEMPTS failures for same username the bucket fires
  772. assert status_codes[-1] == 429, (
  773. f"Expected 429 after {MAX_LOGIN_ATTEMPTS} username-spray failures, got: {status_codes}"
  774. )
  775. # ============================================================================
  776. # TestEncryptLegacyMigration
  777. # ============================================================================
  778. class TestEncryptLegacyMigration:
  779. """Re-encryption migration of legacy plaintext OIDC + TOTP rows.
  780. The migration runs against its own ``async_session`` factory (not the
  781. ``db_session`` fixture) so each test patches the module-level factory to
  782. point at the test-engine before invoking the helper. ``db_session`` is
  783. used to seed and to verify state via the same engine.
  784. """
  785. @staticmethod
  786. def _patch_module_session(monkeypatch, db_session):
  787. """Bind ``database.async_session`` to the test engine for one test."""
  788. from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
  789. from backend.app.core import database as db_mod
  790. test_factory = async_sessionmaker(db_session.bind, class_=AsyncSession, expire_on_commit=False)
  791. monkeypatch.setattr(db_mod, "async_session", test_factory)
  792. @staticmethod
  793. def _set_active_key(monkeypatch):
  794. """Configure a valid Fernet key for the migration to use."""
  795. from cryptography.fernet import Fernet
  796. import backend.app.core.encryption as enc_mod
  797. monkeypatch.setenv("MFA_ENCRYPTION_KEY", Fernet.generate_key().decode())
  798. enc_mod._fernet_instance = None
  799. @pytest.mark.asyncio
  800. @pytest.mark.integration
  801. async def test_migration_encrypts_plaintext_oidc_secret(self, db_session, monkeypatch):
  802. from sqlalchemy import select
  803. from backend.app.core.database import _migrate_encrypt_legacy_secrets
  804. from backend.app.models.oidc_provider import OIDCProvider
  805. self._patch_module_session(monkeypatch, db_session)
  806. self._set_active_key(monkeypatch)
  807. provider = OIDCProvider(
  808. name="LegacyProv",
  809. issuer_url="https://legacy.example.com",
  810. client_id="cid",
  811. _client_secret_enc="legacy-plaintext",
  812. scopes="openid email profile",
  813. is_enabled=True,
  814. )
  815. db_session.add(provider)
  816. await db_session.commit()
  817. await _migrate_encrypt_legacy_secrets()
  818. # Re-fetch on a fresh row state
  819. await db_session.refresh(provider)
  820. assert provider._client_secret_enc.startswith("fernet:")
  821. # Decrypted value matches the original plaintext
  822. assert provider.client_secret == "legacy-plaintext"
  823. # Sanity: a SELECT also sees the encrypted value
  824. result = await db_session.execute(select(OIDCProvider).where(OIDCProvider.id == provider.id))
  825. fetched = result.scalar_one()
  826. assert fetched._client_secret_enc.startswith("fernet:")
  827. @pytest.mark.asyncio
  828. @pytest.mark.integration
  829. async def test_migration_skips_already_encrypted_rows(self, db_session, monkeypatch):
  830. from backend.app.core.database import _migrate_encrypt_legacy_secrets
  831. from backend.app.models.oidc_provider import OIDCProvider
  832. self._patch_module_session(monkeypatch, db_session)
  833. self._set_active_key(monkeypatch)
  834. # Use the property setter so the value is encrypted up front.
  835. provider = OIDCProvider(
  836. name="EncProv",
  837. issuer_url="https://enc.example.com",
  838. client_id="cid",
  839. client_secret="already-encrypted",
  840. scopes="openid email profile",
  841. is_enabled=True,
  842. )
  843. db_session.add(provider)
  844. await db_session.commit()
  845. original_enc = provider._client_secret_enc
  846. await _migrate_encrypt_legacy_secrets()
  847. await _migrate_encrypt_legacy_secrets() # idempotent
  848. await db_session.refresh(provider)
  849. # Value unchanged across two migration runs (still the same ciphertext).
  850. assert provider._client_secret_enc == original_enc
  851. @pytest.mark.asyncio
  852. @pytest.mark.integration
  853. async def test_migration_no_op_when_key_unset(self, db_session, monkeypatch):
  854. import backend.app.core.encryption as enc_mod
  855. from backend.app.core.database import _migrate_encrypt_legacy_secrets
  856. from backend.app.models.oidc_provider import OIDCProvider
  857. self._patch_module_session(monkeypatch, db_session)
  858. # Force "no key" branch
  859. monkeypatch.setattr(enc_mod, "_load_or_generate_key", lambda: (None, "none"))
  860. enc_mod._fernet_instance = None
  861. provider = OIDCProvider(
  862. name="NoKeyProv",
  863. issuer_url="https://nokey.example.com",
  864. client_id="cid",
  865. _client_secret_enc="still-plaintext",
  866. scopes="openid email profile",
  867. is_enabled=True,
  868. )
  869. db_session.add(provider)
  870. await db_session.commit()
  871. await _migrate_encrypt_legacy_secrets()
  872. await db_session.refresh(provider)
  873. # Migration should have early-returned; plaintext untouched.
  874. assert provider._client_secret_enc == "still-plaintext"
  875. @pytest.mark.asyncio
  876. @pytest.mark.integration
  877. async def test_migration_handles_mixed_state(self, db_session, monkeypatch):
  878. from backend.app.core.database import _migrate_encrypt_legacy_secrets
  879. from backend.app.models.oidc_provider import OIDCProvider
  880. self._patch_module_session(monkeypatch, db_session)
  881. self._set_active_key(monkeypatch)
  882. legacy = OIDCProvider(
  883. name="LegacyMix",
  884. issuer_url="https://l.example.com",
  885. client_id="c1",
  886. _client_secret_enc="plain-mix",
  887. scopes="openid email profile",
  888. )
  889. encrypted = OIDCProvider(
  890. name="EncMix",
  891. issuer_url="https://e.example.com",
  892. client_id="c2",
  893. client_secret="encrypted-mix", # uses setter
  894. scopes="openid email profile",
  895. )
  896. db_session.add_all([legacy, encrypted])
  897. await db_session.commit()
  898. original_encrypted = encrypted._client_secret_enc
  899. await _migrate_encrypt_legacy_secrets()
  900. await db_session.refresh(legacy)
  901. await db_session.refresh(encrypted)
  902. assert legacy._client_secret_enc.startswith("fernet:")
  903. assert legacy.client_secret == "plain-mix"
  904. assert encrypted._client_secret_enc == original_encrypted
  905. @pytest.mark.asyncio
  906. @pytest.mark.integration
  907. async def test_migration_encrypts_plaintext_totp_secret(self, db_session, monkeypatch):
  908. from backend.app.core.database import _migrate_encrypt_legacy_secrets
  909. from backend.app.models.user import User
  910. from backend.app.models.user_totp import UserTOTP
  911. self._patch_module_session(monkeypatch, db_session)
  912. self._set_active_key(monkeypatch)
  913. user = User(username="totpuser1219", email="t@example.com", password_hash="x")
  914. db_session.add(user)
  915. await db_session.flush()
  916. totp = UserTOTP(user_id=user.id, _secret_enc="JBSWY3DPEHPK3PXP", is_enabled=True)
  917. db_session.add(totp)
  918. await db_session.commit()
  919. await _migrate_encrypt_legacy_secrets()
  920. await db_session.refresh(totp)
  921. assert totp._secret_enc.startswith("fernet:")
  922. assert totp.secret == "JBSWY3DPEHPK3PXP"
  923. @pytest.mark.asyncio
  924. @pytest.mark.integration
  925. async def test_migration_logs_count_of_rows_re_encrypted(self, db_session, monkeypatch, caplog):
  926. import logging
  927. from backend.app.core.database import _migrate_encrypt_legacy_secrets
  928. from backend.app.models.oidc_provider import OIDCProvider
  929. from backend.app.models.user import User
  930. from backend.app.models.user_totp import UserTOTP
  931. self._patch_module_session(monkeypatch, db_session)
  932. self._set_active_key(monkeypatch)
  933. provider = OIDCProvider(
  934. name="LegacyLog",
  935. issuer_url="https://log.example.com",
  936. client_id="c",
  937. _client_secret_enc="p",
  938. scopes="openid email profile",
  939. )
  940. user = User(username="logger1219", email="l@example.com", password_hash="x")
  941. db_session.add_all([provider, user])
  942. await db_session.flush()
  943. totp = UserTOTP(user_id=user.id, _secret_enc="JBSWY3DPEHPK3PXP", is_enabled=True)
  944. db_session.add(totp)
  945. await db_session.commit()
  946. with caplog.at_level(logging.INFO, logger="backend.app.core.database"):
  947. await _migrate_encrypt_legacy_secrets()
  948. # The migration logs once with both counts.
  949. assert any(
  950. "Re-encrypted legacy plaintext secrets" in rec.message
  951. and "1 OIDC client_secret(s)" in rec.message
  952. and "1 TOTP secret(s)" in rec.message
  953. for rec in caplog.records
  954. )
  955. @pytest.mark.asyncio
  956. @pytest.mark.integration
  957. async def test_migration_continues_on_row_error(self, db_session, monkeypatch, caplog):
  958. """B2: per-row commit semantics — when one row fails to re-encrypt,
  959. OTHER successfully-encrypted rows must remain committed and the
  960. failure surfaces via get_migration_error_count.
  961. Replaces the previous "rollback all" behaviour: a single poison row
  962. used to block every successful re-encryption on every startup forever.
  963. """
  964. import logging
  965. import backend.app.core.encryption as enc_mod # noqa: F401
  966. from backend.app.core.database import (
  967. _migrate_encrypt_legacy_secrets,
  968. get_migration_error_count,
  969. )
  970. from backend.app.models.oidc_provider import OIDCProvider
  971. self._patch_module_session(monkeypatch, db_session)
  972. self._set_active_key(monkeypatch)
  973. good = OIDCProvider(
  974. name="GoodRow",
  975. issuer_url="https://good.example.com",
  976. client_id="c1",
  977. _client_secret_enc="plaintext-good",
  978. scopes="openid email profile",
  979. )
  980. bad = OIDCProvider(
  981. name="BadRow",
  982. issuer_url="https://bad.example.com",
  983. client_id="c2",
  984. _client_secret_enc="plaintext-bad",
  985. scopes="openid email profile",
  986. )
  987. db_session.add_all([good, bad])
  988. await db_session.commit()
  989. original_bad = bad._client_secret_enc
  990. # Force the setter on the SECOND row to raise — patch at the model's
  991. # import location so the property setter picks up the patched function.
  992. import backend.app.models.oidc_provider as oidc_mod
  993. real_encrypt = oidc_mod.mfa_encrypt
  994. call_count = [0]
  995. def _sometimes_raise(value):
  996. call_count[0] += 1
  997. if call_count[0] == 2:
  998. raise RuntimeError("simulated encrypt failure")
  999. return real_encrypt(value)
  1000. monkeypatch.setattr(oidc_mod, "mfa_encrypt", _sometimes_raise)
  1001. with caplog.at_level(logging.ERROR, logger="backend.app.core.database"):
  1002. await _migrate_encrypt_legacy_secrets()
  1003. # B2: per-row commit — good IS encrypted, bad is unchanged.
  1004. await db_session.refresh(good)
  1005. await db_session.refresh(bad)
  1006. assert good._client_secret_enc.startswith("fernet:"), (
  1007. "good row must be successfully re-encrypted (per-row commit)"
  1008. )
  1009. assert bad._client_secret_enc == original_bad, "bad row must remain unchanged (savepoint-style isolation)"
  1010. assert get_migration_error_count() == 1, "the skipped row must be exposed via get_migration_error_count"
  1011. assert any("skipping" in rec.message.lower() for rec in caplog.records)
  1012. @pytest.mark.asyncio
  1013. @pytest.mark.integration
  1014. async def test_migration_logs_no_op_when_all_encrypted(self, db_session, monkeypatch, caplog):
  1015. """A2: when all rows are already encrypted, migration logs a debug no-op."""
  1016. import logging
  1017. from backend.app.core.database import _migrate_encrypt_legacy_secrets
  1018. from backend.app.models.oidc_provider import OIDCProvider
  1019. self._patch_module_session(monkeypatch, db_session)
  1020. self._set_active_key(monkeypatch)
  1021. provider = OIDCProvider(
  1022. name="AlreadyEnc",
  1023. issuer_url="https://ae.example.com",
  1024. client_id="cae",
  1025. client_secret="already-encrypted",
  1026. scopes="openid email profile",
  1027. )
  1028. db_session.add(provider)
  1029. await db_session.commit()
  1030. with caplog.at_level(logging.DEBUG, logger="backend.app.core.database"):
  1031. await _migrate_encrypt_legacy_secrets()
  1032. assert any("no rows needed re-encryption" in rec.message for rec in caplog.records)
  1033. @pytest.mark.asyncio
  1034. @pytest.mark.integration
  1035. async def test_init_db_propagates_unexpected_migration_error(self, monkeypatch, tmp_path):
  1036. """B3: an unexpected error from _migrate_encrypt_legacy_secrets must
  1037. surface (re-raise) instead of being silently swallowed.
  1038. Pins the contract introduced for B3: a startup-fatal error like a
  1039. session-creation failure must fail the lifespan / CLI / restore
  1040. handler explicitly, never run the app with half-migrated rows.
  1041. Implementation note: we patch _migrate_encrypt_legacy_secrets itself
  1042. rather than poking the inner read phase, because that is the contract
  1043. boundary the rest of the codebase relies on (init_db -> migration).
  1044. """
  1045. from sqlalchemy import event
  1046. from sqlalchemy.ext.asyncio import create_async_engine
  1047. import backend.app.core.database as db_mod
  1048. from backend.app.core.config import settings
  1049. # init_db() uses the module-level `engine`, which was bound at import
  1050. # time to settings.database_url — that resolves to the real shared
  1051. # bambuddy.db at the project root (or, when DATABASE_URL is set, the
  1052. # configured Postgres). The autouse DATA_DIR fixture runs too late to
  1053. # influence either. Letting this test write to that real DB makes it
  1054. # (a) non-hermetic and (b) flake under `-n 30` with "database is
  1055. # locked" when two workers race on the file. Substitute an isolated
  1056. # per-test SQLite engine — and override settings.database_url for
  1057. # this test so the is_sqlite() / is_postgres() dialect guards inside
  1058. # run_migrations pick the SQLite path against this engine.
  1059. test_db_url = f"sqlite+aiosqlite:///{tmp_path / 'init_db_test.db'}"
  1060. test_engine = create_async_engine(test_db_url, echo=False)
  1061. event.listen(test_engine.sync_engine, "connect", db_mod._set_sqlite_pragmas)
  1062. monkeypatch.setattr(db_mod, "engine", test_engine)
  1063. monkeypatch.setattr(settings, "database_url", test_db_url)
  1064. async def boom():
  1065. raise RuntimeError("simulated startup-fatal failure")
  1066. # Stub out the rest of init_db so we exercise only the migration step.
  1067. # init_db opens the engine.begin() block, runs metadata.create_all,
  1068. # run_migrations, then awaits _migrate_encrypt_legacy_secrets — the
  1069. # only call we want to fail.
  1070. monkeypatch.setattr(db_mod, "_migrate_encrypt_legacy_secrets", boom)
  1071. monkeypatch.setattr(db_mod, "seed_notification_templates", lambda: _noop_async())
  1072. monkeypatch.setattr(db_mod, "seed_default_groups", lambda: _noop_async())
  1073. monkeypatch.setattr(db_mod, "seed_spool_catalog", lambda: _noop_async())
  1074. monkeypatch.setattr(db_mod, "seed_color_catalog", lambda: _noop_async())
  1075. try:
  1076. with pytest.raises(RuntimeError, match="simulated startup-fatal failure"):
  1077. await db_mod.init_db()
  1078. finally:
  1079. await test_engine.dispose()
  1080. async def _noop_async():
  1081. """Helper for tests that need to stub out `seed_*` async coroutines."""
  1082. return None
  1083. # ============================================================================
  1084. # TestEncryptionStatusEndpoint
  1085. # ============================================================================
  1086. class TestEncryptionStatusEndpoint:
  1087. """GET /api/v1/auth/encryption-status: key source, counts, decryption_broken."""
  1088. STATUS_URL = "/api/v1/auth/encryption-status"
  1089. async def _create_admin_and_login(self, async_client: AsyncClient) -> str:
  1090. """Bootstrap auth + return a Bearer token for an admin."""
  1091. await async_client.post(
  1092. "/api/v1/auth/setup",
  1093. json={
  1094. "auth_enabled": True,
  1095. "admin_username": "admin1219",
  1096. "admin_password": "Admin1219!Pass",
  1097. },
  1098. )
  1099. login = await async_client.post(
  1100. "/api/v1/auth/login",
  1101. json={"username": "admin1219", "password": "Admin1219!Pass"},
  1102. )
  1103. assert login.status_code == 200, login.text
  1104. return login.json()["access_token"]
  1105. @pytest.mark.asyncio
  1106. @pytest.mark.integration
  1107. async def test_status_reports_env_source(self, async_client, monkeypatch):
  1108. from cryptography.fernet import Fernet
  1109. import backend.app.core.encryption as enc_mod
  1110. token = await self._create_admin_and_login(async_client)
  1111. monkeypatch.setenv("MFA_ENCRYPTION_KEY", Fernet.generate_key().decode())
  1112. enc_mod._fernet_instance = None
  1113. enc_mod._key_source = None
  1114. resp = await async_client.get(self.STATUS_URL, headers={"Authorization": f"Bearer {token}"})
  1115. assert resp.status_code == 200
  1116. data = resp.json()
  1117. assert data["key_configured"] is True
  1118. assert data["key_source"] == "env"
  1119. assert data["decryption_broken"] is False
  1120. @pytest.mark.asyncio
  1121. @pytest.mark.integration
  1122. async def test_status_reports_file_source(self, async_client, monkeypatch, tmp_path):
  1123. from cryptography.fernet import Fernet
  1124. import backend.app.core.encryption as enc_mod
  1125. token = await self._create_admin_and_login(async_client)
  1126. # Pre-place a valid key file in DATA_DIR.
  1127. key_file = tmp_path / ".mfa_encryption_key"
  1128. key_file.write_text(Fernet.generate_key().decode())
  1129. monkeypatch.setenv("DATA_DIR", str(tmp_path))
  1130. monkeypatch.delenv("MFA_ENCRYPTION_KEY", raising=False)
  1131. enc_mod._fernet_instance = None
  1132. enc_mod._key_source = None
  1133. resp = await async_client.get(self.STATUS_URL, headers={"Authorization": f"Bearer {token}"})
  1134. assert resp.status_code == 200
  1135. data = resp.json()
  1136. assert data["key_source"] == "file"
  1137. @pytest.mark.asyncio
  1138. @pytest.mark.integration
  1139. async def test_status_reports_generated_source(self, async_client, monkeypatch, tmp_path):
  1140. import backend.app.core.encryption as enc_mod
  1141. token = await self._create_admin_and_login(async_client)
  1142. monkeypatch.setenv("DATA_DIR", str(tmp_path))
  1143. monkeypatch.delenv("MFA_ENCRYPTION_KEY", raising=False)
  1144. enc_mod._fernet_instance = None
  1145. enc_mod._key_source = None
  1146. resp = await async_client.get(self.STATUS_URL, headers={"Authorization": f"Bearer {token}"})
  1147. assert resp.status_code == 200
  1148. data = resp.json()
  1149. assert data["key_source"] == "generated"
  1150. assert (tmp_path / ".mfa_encryption_key").exists()
  1151. @pytest.mark.asyncio
  1152. @pytest.mark.integration
  1153. async def test_status_reports_none_source(self, async_client, monkeypatch):
  1154. import backend.app.core.encryption as enc_mod
  1155. token = await self._create_admin_and_login(async_client)
  1156. monkeypatch.setattr(enc_mod, "_load_or_generate_key", lambda: (None, "none"))
  1157. enc_mod._fernet_instance = None
  1158. enc_mod._key_source = None
  1159. resp = await async_client.get(self.STATUS_URL, headers={"Authorization": f"Bearer {token}"})
  1160. assert resp.status_code == 200
  1161. data = resp.json()
  1162. assert data["key_configured"] is False
  1163. assert data["key_source"] == "none"
  1164. @pytest.mark.asyncio
  1165. @pytest.mark.integration
  1166. async def test_status_counts_legacy_rows(self, async_client, db_session, monkeypatch):
  1167. from backend.app.models.oidc_provider import OIDCProvider
  1168. token = await self._create_admin_and_login(async_client)
  1169. provider = OIDCProvider(
  1170. name="LegacyStatus",
  1171. issuer_url="https://ls.example.com",
  1172. client_id="c",
  1173. _client_secret_enc="plaintext-no-prefix",
  1174. scopes="openid email profile",
  1175. )
  1176. db_session.add(provider)
  1177. await db_session.commit()
  1178. resp = await async_client.get(self.STATUS_URL, headers={"Authorization": f"Bearer {token}"})
  1179. assert resp.status_code == 200
  1180. data = resp.json()
  1181. assert data["legacy_plaintext_rows"]["oidc_providers"] >= 1
  1182. @pytest.mark.asyncio
  1183. @pytest.mark.integration
  1184. async def test_status_counts_encrypted_rows(self, async_client, db_session, monkeypatch):
  1185. from cryptography.fernet import Fernet
  1186. import backend.app.core.encryption as enc_mod
  1187. from backend.app.models.oidc_provider import OIDCProvider
  1188. token = await self._create_admin_and_login(async_client)
  1189. monkeypatch.setenv("MFA_ENCRYPTION_KEY", Fernet.generate_key().decode())
  1190. enc_mod._fernet_instance = None
  1191. enc_mod._key_source = None
  1192. provider = OIDCProvider(
  1193. name="EncStatus",
  1194. issuer_url="https://es.example.com",
  1195. client_id="c",
  1196. client_secret="real-secret", # via setter → encrypted
  1197. scopes="openid email profile",
  1198. )
  1199. db_session.add(provider)
  1200. await db_session.commit()
  1201. resp = await async_client.get(self.STATUS_URL, headers={"Authorization": f"Bearer {token}"})
  1202. assert resp.status_code == 200
  1203. data = resp.json()
  1204. assert data["encrypted_rows"]["oidc_providers"] >= 1
  1205. @pytest.mark.asyncio
  1206. @pytest.mark.integration
  1207. async def test_status_warns_on_encrypted_rows_without_key(self, async_client, db_session, monkeypatch):
  1208. """Gap 2: encrypted rows present but no key loadable → decryption_broken=true."""
  1209. import backend.app.core.encryption as enc_mod
  1210. from backend.app.models.oidc_provider import OIDCProvider
  1211. token = await self._create_admin_and_login(async_client)
  1212. # Insert a row whose value is already prefixed (simulates a previously-encrypted row).
  1213. provider = OIDCProvider(
  1214. name="BrokenEnc",
  1215. issuer_url="https://be.example.com",
  1216. client_id="c",
  1217. _client_secret_enc="fernet:gAAAAA-fake-but-prefixed",
  1218. scopes="openid email profile",
  1219. )
  1220. db_session.add(provider)
  1221. await db_session.commit()
  1222. # Now disable key loading so decryption is impossible.
  1223. monkeypatch.setattr(enc_mod, "_load_or_generate_key", lambda: (None, "none"))
  1224. enc_mod._fernet_instance = None
  1225. enc_mod._key_source = None
  1226. resp = await async_client.get(self.STATUS_URL, headers={"Authorization": f"Bearer {token}"})
  1227. assert resp.status_code == 200
  1228. data = resp.json()
  1229. assert data["key_configured"] is False
  1230. assert data["encrypted_rows"]["oidc_providers"] >= 1
  1231. assert data["decryption_broken"] is True
  1232. @pytest.mark.asyncio
  1233. @pytest.mark.integration
  1234. async def test_status_requires_settings_read_permission(self, async_client, db_session):
  1235. """Non-admin without settings:read permission gets 403."""
  1236. from backend.app.models.user import User
  1237. await self._create_admin_and_login(async_client)
  1238. # Create a low-privilege user (no group → no permissions in default seed).
  1239. from backend.app.core.auth import get_password_hash
  1240. viewer = User(
  1241. username="viewer1219",
  1242. email="viewer1219@example.com",
  1243. password_hash=get_password_hash("Viewer1219!Pass"),
  1244. role="user",
  1245. is_active=True,
  1246. )
  1247. db_session.add(viewer)
  1248. await db_session.commit()
  1249. login = await async_client.post(
  1250. "/api/v1/auth/login",
  1251. json={"username": "viewer1219", "password": "Viewer1219!Pass"},
  1252. )
  1253. assert login.status_code == 200, login.text
  1254. token = login.json().get("access_token")
  1255. assert token is not None, f"Expected access_token in login response, got: {login.json()}"
  1256. resp = await async_client.get(self.STATUS_URL, headers={"Authorization": f"Bearer {token}"})
  1257. assert resp.status_code == 403
  1258. @pytest.mark.asyncio
  1259. @pytest.mark.integration
  1260. async def test_status_returns_503_on_db_error(self, async_client, monkeypatch):
  1261. """A8: a DB failure during the request must NOT leak internal detail
  1262. and must NOT silently succeed.
  1263. Post-GHSA-6mf4-q26m-47pv: the auth middleware's ``is_auth_enabled``
  1264. probe runs its own DB query before the route is dispatched. Patching
  1265. ``AsyncSession.execute`` to raise now trips the middleware first and
  1266. the request fails closed with 503 — a stronger guarantee than the
  1267. previous route-level 500, because under the old fail-open the
  1268. middleware would have proceeded to dispatch the route unauthenticated.
  1269. Either status would be acceptable; the assertion here pins the
  1270. defense-in-depth posture (request denied, no leak).
  1271. """
  1272. from unittest.mock import AsyncMock
  1273. from sqlalchemy.exc import SQLAlchemyError
  1274. token = await self._create_admin_and_login(async_client)
  1275. async def _raise(*args, **kwargs):
  1276. raise SQLAlchemyError("simulated DB failure")
  1277. monkeypatch.setattr("sqlalchemy.ext.asyncio.AsyncSession.execute", AsyncMock(side_effect=_raise))
  1278. resp = await async_client.get(self.STATUS_URL, headers={"Authorization": f"Bearer {token}"})
  1279. assert resp.status_code in (500, 503)
  1280. # Either layer's error message must not leak the SQLAlchemy details.
  1281. body = resp.json().get("detail", "").lower()
  1282. assert "simulated" not in body
  1283. assert "sqlalchemy" not in body
  1284. @pytest.mark.asyncio
  1285. @pytest.mark.integration
  1286. async def test_status_returns_403_for_viewer_in_viewers_group(self, async_client, db_session):
  1287. """S2: a user in the Viewers group (has SETTINGS_READ but NOT SETTINGS_UPDATE)
  1288. must get 403 — encryption-status is admin/operator only.
  1289. """
  1290. from sqlalchemy import insert, select
  1291. from backend.app.core.auth import get_password_hash
  1292. from backend.app.models.group import Group, user_groups
  1293. from backend.app.models.user import User
  1294. # Bootstrap auth (creates default groups via setup endpoint).
  1295. await self._create_admin_and_login(async_client)
  1296. # Create a user explicitly in the Viewers group — it has SETTINGS_READ
  1297. # but not SETTINGS_UPDATE, which is the discriminator for S2.
  1298. viewer = User(
  1299. username="viewer_s2",
  1300. email="viewer_s2@example.com",
  1301. password_hash=get_password_hash("ViewerS2!Pass1"),
  1302. role="user",
  1303. is_active=True,
  1304. )
  1305. db_session.add(viewer)
  1306. await db_session.flush()
  1307. viewers_group = (await db_session.execute(select(Group).where(Group.name == "Viewers"))).scalar_one_or_none()
  1308. assert viewers_group is not None, "Viewers group must be seeded by setup"
  1309. # Insert the association row directly to avoid touching the lazy
  1310. # `viewer.groups` relationship (which would trigger an implicit
  1311. # IO inside an active async transaction and fail with MissingGreenlet).
  1312. await db_session.execute(insert(user_groups).values(user_id=viewer.id, group_id=viewers_group.id))
  1313. await db_session.commit()
  1314. login = await async_client.post(
  1315. "/api/v1/auth/login",
  1316. json={"username": "viewer_s2", "password": "ViewerS2!Pass1"},
  1317. )
  1318. assert login.status_code == 200, login.text
  1319. token = login.json()["access_token"]
  1320. resp = await async_client.get(self.STATUS_URL, headers={"Authorization": f"Bearer {token}"})
  1321. assert resp.status_code == 403, "S2: Viewers (SETTINGS_READ only) must NOT be able to read encryption-status"
  1322. @pytest.mark.asyncio
  1323. @pytest.mark.integration
  1324. async def test_status_decryption_broken_when_wrong_key_active(self, async_client, db_session, monkeypatch):
  1325. """B4: key is configured but cannot decrypt existing rows → decryption_broken=True.
  1326. This is the "wrong key" state that the legacy computed_field check
  1327. missed — operator pasted a different valid Fernet key (rotation,
  1328. cross-deployment restore, env override). Status used to show GREEN
  1329. while every encrypted row was unrecoverable.
  1330. """
  1331. from cryptography.fernet import Fernet
  1332. import backend.app.core.encryption as enc_mod
  1333. from backend.app.models.oidc_provider import OIDCProvider
  1334. token = await self._create_admin_and_login(async_client)
  1335. # Insert a row whose value is fernet-prefixed but encrypted under a
  1336. # DIFFERENT key (the prefix matches, but decrypt will throw).
  1337. provider = OIDCProvider(
  1338. name="WrongKeyEnc",
  1339. issuer_url="https://wk.example.com",
  1340. client_id="c",
  1341. _client_secret_enc=("fernet:" + Fernet(Fernet.generate_key()).encrypt(b"original").decode()),
  1342. scopes="openid email profile",
  1343. )
  1344. db_session.add(provider)
  1345. await db_session.commit()
  1346. # Now activate a DIFFERENT key — sample-decrypt must fail.
  1347. monkeypatch.setenv("MFA_ENCRYPTION_KEY", Fernet.generate_key().decode())
  1348. enc_mod._fernet_instance = None
  1349. enc_mod._key_source = None
  1350. resp = await async_client.get(self.STATUS_URL, headers={"Authorization": f"Bearer {token}"})
  1351. assert resp.status_code == 200, resp.text
  1352. data = resp.json()
  1353. assert data["key_configured"] is True, "different key is still 'configured'"
  1354. assert data["encrypted_rows"]["oidc_providers"] >= 1
  1355. assert data["decryption_broken"] is True, "B4: sample-decrypt must detect wrong-key state"
  1356. @pytest.mark.asyncio
  1357. @pytest.mark.integration
  1358. async def test_status_decryption_broken_with_only_totp_rows(self, async_client, db_session, monkeypatch):
  1359. """B4: the sample-decrypt fallback to UserTOTP fires when there are no
  1360. encrypted OIDC rows but TOTP rows exist. The OIDC-only test above
  1361. proves the primary path; this pins the second branch in the same
  1362. try-block so a future refactor of the row-source switch can't silently
  1363. regress wrong-key detection for TOTP-only deployments.
  1364. """
  1365. from cryptography.fernet import Fernet
  1366. from sqlalchemy import select
  1367. import backend.app.core.encryption as enc_mod
  1368. from backend.app.models.user import User
  1369. from backend.app.models.user_totp import UserTOTP
  1370. token = await self._create_admin_and_login(async_client)
  1371. # Look up the admin user created by login so we can attach a TOTP row.
  1372. admin_row = await db_session.execute(select(User).where(User.username == "admin1219"))
  1373. admin = admin_row.scalar_one()
  1374. # Seed a UserTOTP row encrypted under key A. No OIDC rows exist, so
  1375. # the endpoint's first branch (oidc_providers > 0) misses and the
  1376. # sample falls through to UserTOTP.
  1377. key_a_ciphertext = Fernet(Fernet.generate_key()).encrypt(b"original-totp-secret").decode()
  1378. db_session.add(UserTOTP(user_id=admin.id, _secret_enc=f"fernet:{key_a_ciphertext}", is_enabled=True))
  1379. await db_session.commit()
  1380. # Activate a DIFFERENT key — the TOTP-fallback sample-decrypt must fail.
  1381. monkeypatch.setenv("MFA_ENCRYPTION_KEY", Fernet.generate_key().decode())
  1382. enc_mod._fernet_instance = None
  1383. enc_mod._key_source = None
  1384. resp = await async_client.get(self.STATUS_URL, headers={"Authorization": f"Bearer {token}"})
  1385. assert resp.status_code == 200, resp.text
  1386. data = resp.json()
  1387. assert data["key_configured"] is True
  1388. assert data["encrypted_rows"]["oidc_providers"] == 0, "test premise: no OIDC rows so TOTP branch fires"
  1389. assert data["encrypted_rows"]["user_totp"] >= 1
  1390. assert data["decryption_broken"] is True, "B4: TOTP-fallback sample-decrypt must detect wrong-key state"
  1391. @pytest.mark.asyncio
  1392. @pytest.mark.integration
  1393. async def test_status_surfaces_real_migration_error_count(self, async_client, db_session, monkeypatch, caplog):
  1394. """B2: a real migration with a poison row produces an error_count that
  1395. flows through to the endpoint's `migration_error_count` field.
  1396. Replaces an earlier tautology that patched the module-level counter
  1397. directly. The chained version verifies the full path: poison row →
  1398. per-row migration skip → ``get_migration_error_count()`` →
  1399. ``GET /encryption-status``.
  1400. """
  1401. import logging
  1402. from backend.app.core.database import _migrate_encrypt_legacy_secrets, get_migration_error_count
  1403. from backend.app.models.oidc_provider import OIDCProvider
  1404. token = await self._create_admin_and_login(async_client)
  1405. # Bind the migration's session factory to the test engine and activate a key.
  1406. from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
  1407. from backend.app.core import database as db_mod
  1408. test_factory = async_sessionmaker(db_session.bind, class_=AsyncSession, expire_on_commit=False)
  1409. monkeypatch.setattr(db_mod, "async_session", test_factory)
  1410. from cryptography.fernet import Fernet
  1411. import backend.app.core.encryption as enc_mod
  1412. monkeypatch.setenv("MFA_ENCRYPTION_KEY", Fernet.generate_key().decode())
  1413. enc_mod._fernet_instance = None
  1414. # Two legacy plaintext rows; force the SECOND row's encrypt call to raise.
  1415. db_session.add_all(
  1416. [
  1417. OIDCProvider(
  1418. name="GoodRow",
  1419. issuer_url="https://good.example.com",
  1420. client_id="c1",
  1421. _client_secret_enc="plaintext-good",
  1422. scopes="openid email profile",
  1423. ),
  1424. OIDCProvider(
  1425. name="BadRow",
  1426. issuer_url="https://bad.example.com",
  1427. client_id="c2",
  1428. _client_secret_enc="plaintext-bad",
  1429. scopes="openid email profile",
  1430. ),
  1431. ]
  1432. )
  1433. await db_session.commit()
  1434. import backend.app.models.oidc_provider as oidc_mod
  1435. real_encrypt = oidc_mod.mfa_encrypt
  1436. call_count = [0]
  1437. def _sometimes_raise(value):
  1438. call_count[0] += 1
  1439. if call_count[0] == 2:
  1440. raise RuntimeError("simulated encrypt failure")
  1441. return real_encrypt(value)
  1442. monkeypatch.setattr(oidc_mod, "mfa_encrypt", _sometimes_raise)
  1443. with caplog.at_level(logging.ERROR, logger="backend.app.core.database"):
  1444. await _migrate_encrypt_legacy_secrets()
  1445. # Sanity: the migration's own counter saw the failure.
  1446. assert get_migration_error_count() == 1
  1447. # The endpoint must surface the same number — full path pinned, not just the getter.
  1448. resp = await async_client.get(self.STATUS_URL, headers={"Authorization": f"Bearer {token}"})
  1449. assert resp.status_code == 200, resp.text
  1450. data = resp.json()
  1451. assert data["migration_error_count"] == 1, (
  1452. "endpoint must report the actual migration outcome, not just read a stub global"
  1453. )
  1454. # ============================================================================
  1455. # TestEncryptionRoundtrip (E2E)
  1456. # ============================================================================
  1457. class TestEncryptionRoundtrip:
  1458. """End-to-end: writes via the property setter store ciphertext at the column
  1459. level; reads via the property getter return the original plaintext."""
  1460. @pytest.mark.asyncio
  1461. @pytest.mark.integration
  1462. async def test_oidc_provider_secret_encrypted_at_rest_e2e(self, db_session, monkeypatch):
  1463. from cryptography.fernet import Fernet
  1464. from sqlalchemy import select
  1465. import backend.app.core.encryption as enc_mod
  1466. from backend.app.models.oidc_provider import OIDCProvider
  1467. monkeypatch.setenv("MFA_ENCRYPTION_KEY", Fernet.generate_key().decode())
  1468. enc_mod._fernet_instance = None
  1469. provider = OIDCProvider(
  1470. name="E2E_OIDC",
  1471. issuer_url="https://e2e.example.com",
  1472. client_id="cid",
  1473. client_secret="my-real-client-secret", # via setter → encrypted
  1474. scopes="openid email profile",
  1475. is_enabled=True,
  1476. )
  1477. db_session.add(provider)
  1478. await db_session.commit()
  1479. # Raw column read: must be ciphertext, not the plaintext.
  1480. result = await db_session.execute(select(OIDCProvider).where(OIDCProvider.id == provider.id))
  1481. fetched = result.scalar_one()
  1482. assert fetched._client_secret_enc.startswith("fernet:")
  1483. assert fetched._client_secret_enc != "my-real-client-secret"
  1484. # Property read: returns original plaintext.
  1485. assert fetched.client_secret == "my-real-client-secret"
  1486. @pytest.mark.asyncio
  1487. @pytest.mark.integration
  1488. async def test_totp_secret_encrypted_at_rest_e2e(self, db_session, monkeypatch):
  1489. from cryptography.fernet import Fernet
  1490. from sqlalchemy import select
  1491. import backend.app.core.encryption as enc_mod
  1492. from backend.app.models.user import User
  1493. from backend.app.models.user_totp import UserTOTP
  1494. monkeypatch.setenv("MFA_ENCRYPTION_KEY", Fernet.generate_key().decode())
  1495. enc_mod._fernet_instance = None
  1496. user = User(username="e2etotp1219", email="e@example.com", password_hash="x")
  1497. db_session.add(user)
  1498. await db_session.flush()
  1499. totp = UserTOTP(user_id=user.id, secret="JBSWY3DPEHPK3PXP", is_enabled=True)
  1500. db_session.add(totp)
  1501. await db_session.commit()
  1502. result = await db_session.execute(select(UserTOTP).where(UserTOTP.user_id == user.id))
  1503. fetched = result.scalar_one()
  1504. assert fetched._secret_enc.startswith("fernet:")
  1505. assert fetched._secret_enc != "JBSWY3DPEHPK3PXP"
  1506. assert fetched.secret == "JBSWY3DPEHPK3PXP"
  1507. # ============================================================================
  1508. # TestBackupKeyFiles
  1509. # Verifies that .mfa_encryption_key is included in backup ZIPs (so backups
  1510. # are self-contained) and restored with chmod 0600 — and that path-traversal
  1511. # payloads in a malicious ZIP are rejected.
  1512. # ============================================================================
  1513. class TestBackupKeyFiles:
  1514. @pytest.mark.asyncio
  1515. @pytest.mark.integration
  1516. async def test_backup_includes_mfa_encryption_key_when_present(self, async_client, monkeypatch, tmp_path):
  1517. import zipfile
  1518. from backend.app.api.routes.settings import create_backup_zip
  1519. from backend.app.core.config import settings as app_settings
  1520. monkeypatch.setenv("DATA_DIR", str(tmp_path))
  1521. # Ensure `app_settings.base_dir` follows DATA_DIR for this test by
  1522. # patching the module attribute (config caches it at import time).
  1523. monkeypatch.setattr(app_settings, "base_dir", tmp_path)
  1524. key_path = tmp_path / ".mfa_encryption_key"
  1525. key_path.write_text("test-key-content")
  1526. zip_path, _filename = await create_backup_zip(output_path=tmp_path)
  1527. try:
  1528. with zipfile.ZipFile(zip_path) as zf:
  1529. names = zf.namelist()
  1530. assert ".mfa_encryption_key" in names
  1531. assert zf.read(".mfa_encryption_key").decode() == "test-key-content"
  1532. finally:
  1533. zip_path.unlink(missing_ok=True)
  1534. @pytest.mark.asyncio
  1535. @pytest.mark.integration
  1536. async def test_backup_skips_mfa_encryption_key_when_absent(self, async_client, monkeypatch, tmp_path):
  1537. import zipfile
  1538. from backend.app.api.routes.settings import create_backup_zip
  1539. from backend.app.core.config import settings as app_settings
  1540. monkeypatch.setenv("DATA_DIR", str(tmp_path))
  1541. monkeypatch.setattr(app_settings, "base_dir", tmp_path)
  1542. # No .mfa_encryption_key written — must not crash.
  1543. zip_path, _filename = await create_backup_zip(output_path=tmp_path)
  1544. try:
  1545. with zipfile.ZipFile(zip_path) as zf:
  1546. names = zf.namelist()
  1547. assert ".mfa_encryption_key" not in names
  1548. finally:
  1549. zip_path.unlink(missing_ok=True)
  1550. @pytest.mark.asyncio
  1551. @pytest.mark.integration
  1552. async def test_restore_writes_key_files_with_chmod_0600(self, async_client, monkeypatch, tmp_path):
  1553. """T1: restore endpoint writes key file with mode 0o600.
  1554. Bypasses the SQLite-copy step via patches so execution reaches the
  1555. key-write code unconditionally — the previous version used a stub
  1556. ``b"SQLite format 3"`` which made ``sqlite3.backup()`` fail and the
  1557. key-write code never ran.
  1558. """
  1559. import io
  1560. import zipfile
  1561. from unittest.mock import AsyncMock, patch
  1562. from backend.app.core.config import settings as app_settings
  1563. monkeypatch.setenv("DATA_DIR", str(tmp_path))
  1564. monkeypatch.setattr(app_settings, "base_dir", tmp_path)
  1565. # Build a minimal ZIP with a stub DB and the key file.
  1566. buf = io.BytesIO()
  1567. with zipfile.ZipFile(buf, "w") as zf:
  1568. zf.writestr("bambuddy.db", b"SQLite format 3")
  1569. zf.writestr(".mfa_encryption_key", "test-restored-key")
  1570. buf.seek(0)
  1571. with (
  1572. patch("backend.app.core.db_dialect.is_sqlite", return_value=False),
  1573. patch(
  1574. "backend.app.api.routes.settings._import_sqlite_to_postgres",
  1575. new_callable=AsyncMock,
  1576. ),
  1577. patch("backend.app.core.database.close_all_connections", new_callable=AsyncMock),
  1578. patch("backend.app.core.database.reinitialize_database", new_callable=AsyncMock),
  1579. patch("backend.app.core.database.init_db", new_callable=AsyncMock),
  1580. ):
  1581. resp = await async_client.post(
  1582. "/api/v1/settings/restore",
  1583. files={"file": ("backup.zip", buf, "application/zip")},
  1584. )
  1585. assert resp.status_code == 200
  1586. restored_key = tmp_path / ".mfa_encryption_key"
  1587. assert restored_key.exists()
  1588. assert restored_key.read_text() == "test-restored-key"
  1589. assert (restored_key.stat().st_mode & 0o777) == 0o600
  1590. @pytest.mark.asyncio
  1591. @pytest.mark.integration
  1592. async def test_restore_handles_missing_key_files(self, async_client, monkeypatch, tmp_path):
  1593. """T2: ZIP without key file → restore succeeds, no key written to DATA_DIR."""
  1594. import io
  1595. import zipfile
  1596. from unittest.mock import AsyncMock, patch
  1597. from backend.app.core.config import settings as app_settings
  1598. monkeypatch.setenv("DATA_DIR", str(tmp_path))
  1599. monkeypatch.setattr(app_settings, "base_dir", tmp_path)
  1600. buf = io.BytesIO()
  1601. with zipfile.ZipFile(buf, "w") as zf:
  1602. zf.writestr("bambuddy.db", b"SQLite format 3")
  1603. # Intentionally no .mfa_encryption_key entry.
  1604. buf.seek(0)
  1605. with (
  1606. patch("backend.app.core.db_dialect.is_sqlite", return_value=False),
  1607. patch(
  1608. "backend.app.api.routes.settings._import_sqlite_to_postgres",
  1609. new_callable=AsyncMock,
  1610. ),
  1611. patch("backend.app.core.database.close_all_connections", new_callable=AsyncMock),
  1612. patch("backend.app.core.database.reinitialize_database", new_callable=AsyncMock),
  1613. patch("backend.app.core.database.init_db", new_callable=AsyncMock),
  1614. ):
  1615. resp = await async_client.post(
  1616. "/api/v1/settings/restore",
  1617. files={"file": ("backup.zip", buf, "application/zip")},
  1618. )
  1619. assert resp.status_code == 200
  1620. assert not (tmp_path / ".mfa_encryption_key").exists()
  1621. @pytest.mark.asyncio
  1622. @pytest.mark.integration
  1623. async def test_restore_aborts_db_swap_when_key_write_fails(self, async_client, monkeypatch, tmp_path):
  1624. """B1: when MFA key write fails, restore must abort BEFORE the database
  1625. swap so the live DB is not left with rows encrypted under a key that
  1626. no longer exists on disk."""
  1627. import io
  1628. import os
  1629. import zipfile
  1630. from unittest.mock import AsyncMock, patch
  1631. from backend.app.core.config import settings as app_settings
  1632. monkeypatch.setenv("DATA_DIR", str(tmp_path))
  1633. monkeypatch.setattr(app_settings, "base_dir", tmp_path)
  1634. # Build ZIP with a key file that we will fail to write to DATA_DIR.
  1635. buf = io.BytesIO()
  1636. with zipfile.ZipFile(buf, "w") as zf:
  1637. zf.writestr("bambuddy.db", b"SQLite format 3 backup data")
  1638. zf.writestr(".mfa_encryption_key", "backup-key-content")
  1639. buf.seek(0)
  1640. # Track whether the database swap functions were called.
  1641. # If B1 is correct, key-write failure aborts BEFORE these run.
  1642. import_pg_mock = AsyncMock()
  1643. reinit_mock = AsyncMock()
  1644. init_mock = AsyncMock()
  1645. original_open = os.open
  1646. def _key_write_fails(path, flags, mode=0o777, **kwargs):
  1647. # `shutil.rmtree` calls os.open(... dir_fd=...) during temp-dir
  1648. # cleanup — accept and forward any extra kwargs so the mock
  1649. # doesn't break the cleanup path.
  1650. if str(path).endswith(".mfa_encryption_key.restore-tmp"):
  1651. raise OSError(28, "No space left on device", str(path))
  1652. return original_open(path, flags, mode, **kwargs)
  1653. with (
  1654. patch("backend.app.core.db_dialect.is_sqlite", return_value=False),
  1655. patch(
  1656. "backend.app.api.routes.settings._import_sqlite_to_postgres",
  1657. import_pg_mock,
  1658. ),
  1659. patch("backend.app.core.database.close_all_connections", new_callable=AsyncMock),
  1660. patch("backend.app.core.database.reinitialize_database", reinit_mock),
  1661. patch("backend.app.core.database.init_db", init_mock),
  1662. ):
  1663. monkeypatch.setattr(os, "open", _key_write_fails)
  1664. resp = await async_client.post(
  1665. "/api/v1/settings/restore",
  1666. files={"file": ("backup.zip", buf, "application/zip")},
  1667. )
  1668. assert resp.status_code == 500
  1669. assert "Database is unchanged" in resp.json().get("detail", "")
  1670. # Database swap functions must NOT have been called — the abort
  1671. # happens before that step.
  1672. import_pg_mock.assert_not_awaited()
  1673. reinit_mock.assert_not_awaited()
  1674. init_mock.assert_not_awaited()
  1675. # No partial key file should be left behind.
  1676. assert not (tmp_path / ".mfa_encryption_key").exists()
  1677. @pytest.mark.asyncio
  1678. @pytest.mark.integration
  1679. async def test_restore_resets_encryption_singleton_after_key_replace(self, async_client, monkeypatch, tmp_path):
  1680. """B1: after a successful key replace, the encryption singleton must be
  1681. cleared so init_db's re-encryption migration picks up the restored key
  1682. instead of the cached Fernet from the previous key.
  1683. """
  1684. import io
  1685. import zipfile
  1686. from unittest.mock import AsyncMock, patch
  1687. from cryptography.fernet import Fernet
  1688. import backend.app.core.encryption as enc_mod
  1689. from backend.app.core.config import settings as app_settings
  1690. monkeypatch.setenv("DATA_DIR", str(tmp_path))
  1691. monkeypatch.setattr(app_settings, "base_dir", tmp_path)
  1692. # Pre-warm the singleton with an "old" key so we can detect the reset.
  1693. old_key = Fernet.generate_key().decode()
  1694. monkeypatch.setenv("MFA_ENCRYPTION_KEY", old_key)
  1695. enc_mod._fernet_instance = None
  1696. enc_mod._key_source = None
  1697. # Trigger lazy load → singleton holds the old Fernet.
  1698. assert enc_mod.is_encryption_active() is True
  1699. assert enc_mod._fernet_instance is not None
  1700. old_fernet_obj = enc_mod._fernet_instance
  1701. # Build ZIP that delivers a DIFFERENT key file.
  1702. new_key = Fernet.generate_key().decode()
  1703. assert new_key != old_key
  1704. buf = io.BytesIO()
  1705. with zipfile.ZipFile(buf, "w") as zf:
  1706. zf.writestr("bambuddy.db", b"SQLite format 3 backup data")
  1707. zf.writestr(".mfa_encryption_key", new_key)
  1708. buf.seek(0)
  1709. with (
  1710. patch("backend.app.core.db_dialect.is_sqlite", return_value=False),
  1711. patch(
  1712. "backend.app.api.routes.settings._import_sqlite_to_postgres",
  1713. new_callable=AsyncMock,
  1714. ),
  1715. patch("backend.app.core.database.close_all_connections", new_callable=AsyncMock),
  1716. patch("backend.app.core.database.reinitialize_database", new_callable=AsyncMock),
  1717. patch("backend.app.core.database.init_db", new_callable=AsyncMock),
  1718. ):
  1719. resp = await async_client.post(
  1720. "/api/v1/settings/restore",
  1721. files={"file": ("backup.zip", buf, "application/zip")},
  1722. )
  1723. assert resp.status_code == 200, resp.text
  1724. # The singleton must have been invalidated. The exact post-state depends
  1725. # on whether init_db (mocked) re-loaded the singleton, but the cached
  1726. # _fernet_instance reference from before the restore must not be the
  1727. # active one any more.
  1728. assert enc_mod._fernet_instance is None or enc_mod._fernet_instance is not old_fernet_obj, (
  1729. "B1: encryption singleton must be reset after key replace so init_db's migration picks up the restored key"
  1730. )
  1731. # The key file must be on disk with the new content.
  1732. restored = (tmp_path / ".mfa_encryption_key").read_text()
  1733. assert restored == new_key
  1734. @pytest.mark.asyncio
  1735. @pytest.mark.integration
  1736. async def test_restore_rejects_path_traversal_in_zip(self, async_client, monkeypatch, tmp_path):
  1737. """A4: ZIP with path-traversal entry → HTTP 400, no file written outside temp dir."""
  1738. import io
  1739. import zipfile
  1740. from backend.app.core.config import settings as app_settings
  1741. monkeypatch.setenv("DATA_DIR", str(tmp_path))
  1742. monkeypatch.setattr(app_settings, "base_dir", tmp_path)
  1743. # Build ZIP with a relative path-traversal entry.
  1744. buf = io.BytesIO()
  1745. with zipfile.ZipFile(buf, "w") as zf:
  1746. zf.writestr("../etc/passwd", "root:x:0:0")
  1747. zf.writestr("bambuddy.db", b"SQLite format 3")
  1748. buf.seek(0)
  1749. resp = await async_client.post(
  1750. "/api/v1/settings/restore",
  1751. files={"file": ("backup.zip", buf, "application/zip")},
  1752. )
  1753. assert resp.status_code == 400
  1754. assert "unsafe path" in resp.json().get("detail", "").lower()
  1755. @pytest.mark.asyncio
  1756. @pytest.mark.integration
  1757. async def test_restore_rejects_prefix_collision_zipslip(self, async_client, monkeypatch, tmp_path):
  1758. """T1: ZIP entry with prefix-collision path must be rejected.
  1759. A startswith() check would accept '/tmp/abc_evil/file' when the
  1760. extraction root was '/tmp/abc' — is_relative_to correctly rejects it.
  1761. The restore handler creates a tempfile.TemporaryDirectory inside the
  1762. system temp dir; we craft an entry that resolves to a sibling path
  1763. whose name starts with the temp dir's basename.
  1764. """
  1765. import io
  1766. import zipfile
  1767. from backend.app.core.config import settings as app_settings
  1768. monkeypatch.setenv("DATA_DIR", str(tmp_path))
  1769. monkeypatch.setattr(app_settings, "base_dir", tmp_path)
  1770. # Use a path with traversal — the resolved path will share the parent
  1771. # temp directory's basename as a prefix but NOT be inside the
  1772. # extraction root. We don't know the random extraction-root name at
  1773. # ZIP-build time, so we pick a literal "../poc-evil-prefix-collision/"
  1774. # which traverses up one level from the extraction root and lands in
  1775. # a sibling directory. is_relative_to() must reject this; a naive
  1776. # startswith() against the parent's parent would accept it.
  1777. evil_name = "../escaped-prefix-collision/poc.txt"
  1778. buf = io.BytesIO()
  1779. with zipfile.ZipFile(buf, "w") as zf:
  1780. zf.writestr(evil_name, "pwned")
  1781. zf.writestr("bambuddy.db", b"SQLite format 3\x00")
  1782. buf.seek(0)
  1783. resp = await async_client.post(
  1784. "/api/v1/settings/restore",
  1785. files={"file": ("backup.zip", buf, "application/zip")},
  1786. )
  1787. assert resp.status_code == 400
  1788. assert "unsafe path" in resp.json().get("detail", "").lower()
  1789. @pytest.mark.asyncio
  1790. @pytest.mark.integration
  1791. async def test_restore_rejects_absolute_path_in_zip(self, async_client, monkeypatch, tmp_path):
  1792. """B1: ZIP with an absolute path entry must be rejected by is_relative_to check."""
  1793. import io
  1794. import zipfile
  1795. from backend.app.core.config import settings as app_settings
  1796. monkeypatch.setenv("DATA_DIR", str(tmp_path))
  1797. monkeypatch.setattr(app_settings, "base_dir", tmp_path)
  1798. buf = io.BytesIO()
  1799. with zipfile.ZipFile(buf, "w") as zf:
  1800. # Absolute path in the archive — extracts outside temp_path on
  1801. # systems where (temp_path / "/etc/passwd") resolves to /etc/passwd.
  1802. zf.writestr("/etc/passwd", "root:x:0:0")
  1803. zf.writestr("bambuddy.db", b"SQLite format 3")
  1804. buf.seek(0)
  1805. resp = await async_client.post(
  1806. "/api/v1/settings/restore",
  1807. files={"file": ("backup.zip", buf, "application/zip")},
  1808. )
  1809. assert resp.status_code == 400
  1810. assert "unsafe path" in resp.json().get("detail", "").lower()
  1811. @pytest.mark.asyncio
  1812. @pytest.mark.integration
  1813. async def test_backup_fails_when_key_file_unreadable(self, async_client, monkeypatch, tmp_path):
  1814. """A5: OSError while copying key file propagates out of create_backup_zip."""
  1815. import shutil
  1816. from backend.app.api.routes.settings import create_backup_zip
  1817. from backend.app.core.config import settings as app_settings
  1818. monkeypatch.setenv("DATA_DIR", str(tmp_path))
  1819. monkeypatch.setattr(app_settings, "base_dir", tmp_path)
  1820. (tmp_path / ".mfa_encryption_key").write_text("key")
  1821. original_copy2 = shutil.copy2
  1822. def _raise_on_key(src, dst):
  1823. if ".mfa_encryption_key" in str(src):
  1824. raise OSError("simulated unreadable key file")
  1825. return original_copy2(src, dst)
  1826. monkeypatch.setattr(shutil, "copy2", _raise_on_key)
  1827. import pytest as _pytest
  1828. with _pytest.raises(OSError, match="simulated unreadable"):
  1829. await create_backup_zip(output_path=tmp_path)
  1830. @pytest.mark.asyncio
  1831. @pytest.mark.integration
  1832. async def test_backup_restore_roundtrip_preserves_encrypted_oidc_secret(
  1833. self, async_client, db_session, monkeypatch, tmp_path
  1834. ):
  1835. """T3: encrypt → backup → simulate key loss → restore → decrypt.
  1836. Verifies the user-facing promise that local backup ZIPs are
  1837. self-contained: an OIDC client_secret encrypted under one key still
  1838. decrypts after restore even when the running install no longer has
  1839. the key on disk or in the env. Exercises the B1 key-first restore
  1840. path and the B4 sample-decrypt status check together.
  1841. """
  1842. import zipfile
  1843. from pathlib import Path
  1844. from unittest.mock import AsyncMock, patch
  1845. from cryptography.fernet import Fernet
  1846. from sqlalchemy import select
  1847. import backend.app.core.encryption as enc_mod
  1848. from backend.app.api.routes.settings import create_backup_zip
  1849. from backend.app.core.config import settings as app_settings
  1850. from backend.app.models.oidc_provider import OIDCProvider
  1851. # 1. Pin a key, encrypt an OIDC secret via the property setter.
  1852. key = Fernet.generate_key().decode()
  1853. monkeypatch.setenv("MFA_ENCRYPTION_KEY", key)
  1854. monkeypatch.setenv("DATA_DIR", str(tmp_path))
  1855. monkeypatch.setattr(app_settings, "base_dir", tmp_path)
  1856. # Persist the key file too, so create_backup_zip picks it up.
  1857. (tmp_path / ".mfa_encryption_key").write_text(key)
  1858. enc_mod._fernet_instance = None
  1859. enc_mod._key_source = None
  1860. provider = OIDCProvider(
  1861. name="RoundtripProv",
  1862. issuer_url="https://rt.example.com",
  1863. client_id="cid",
  1864. client_secret="my-original-secret", # via setter -> encrypted
  1865. scopes="openid email profile",
  1866. is_enabled=True,
  1867. )
  1868. db_session.add(provider)
  1869. await db_session.commit()
  1870. original_id = provider.id
  1871. assert provider._client_secret_enc.startswith("fernet:")
  1872. # 2. Create a backup ZIP (must include .mfa_encryption_key).
  1873. zip_path, _ = await create_backup_zip(output_path=tmp_path)
  1874. try:
  1875. with zipfile.ZipFile(zip_path) as zf:
  1876. names = zf.namelist()
  1877. assert ".mfa_encryption_key" in names, "T3: backup ZIP must include the key file"
  1878. # 3. Simulate key loss: delete the key file from DATA_DIR, drop
  1879. # the env var, reset the cached fernet singleton.
  1880. (tmp_path / ".mfa_encryption_key").unlink()
  1881. monkeypatch.delenv("MFA_ENCRYPTION_KEY", raising=False)
  1882. enc_mod._fernet_instance = None
  1883. enc_mod._key_source = None
  1884. # 4. Restore the ZIP via the endpoint. Mock out the DB-swap
  1885. # (we keep the live in-memory test DB) and init_db side effects
  1886. # so this test focuses on the key-restore path.
  1887. with (
  1888. patch("backend.app.core.db_dialect.is_sqlite", return_value=False),
  1889. patch(
  1890. "backend.app.api.routes.settings._import_sqlite_to_postgres",
  1891. new_callable=AsyncMock,
  1892. ),
  1893. patch("backend.app.core.database.close_all_connections", new_callable=AsyncMock),
  1894. patch("backend.app.core.database.reinitialize_database", new_callable=AsyncMock),
  1895. patch("backend.app.core.database.init_db", new_callable=AsyncMock),
  1896. open(zip_path, "rb") as f,
  1897. ):
  1898. resp = await async_client.post(
  1899. "/api/v1/settings/restore",
  1900. files={"file": ("backup.zip", f, "application/zip")},
  1901. )
  1902. assert resp.status_code == 200, resp.text
  1903. # 5. Reset the singleton again (B1 already does this in production,
  1904. # but here init_db is mocked so we explicitly invalidate).
  1905. enc_mod._fernet_instance = None
  1906. enc_mod._key_source = None
  1907. # 6. The key file must be back on disk with restrictive permissions.
  1908. restored = Path(tmp_path) / ".mfa_encryption_key"
  1909. assert restored.exists(), "T3: key file must be restored to DATA_DIR"
  1910. assert (restored.stat().st_mode & 0o777) == 0o600
  1911. # 7. Decryption works again — the property getter must return the
  1912. # original plaintext, proving the restored key matches the
  1913. # cipher in the (still in-memory) DB row.
  1914. result = await db_session.execute(select(OIDCProvider).where(OIDCProvider.id == original_id))
  1915. restored_provider = result.scalar_one()
  1916. assert restored_provider.client_secret == "my-original-secret"
  1917. finally:
  1918. zip_path.unlink(missing_ok=True)
  1919. # ============================================================================
  1920. # TestTOTPDecryptionBroken (C9)
  1921. # Verifies the decryption-broken state (encrypted TOTP row + no key) for each
  1922. # TOTP endpoint. Behaviour differs between recovery-aware and non-recovery
  1923. # endpoints:
  1924. # - setup_totp / enable_totp / verify_2fa: HTTP 500 (no backup-code path).
  1925. # - disable_totp / regenerate_backup_codes: fall through to the backup-code
  1926. # branch — HTTP 200 with a valid backup code, HTTP 400 without.
  1927. # ============================================================================
  1928. class TestTOTPDecryptionBroken:
  1929. """C9: RuntimeError from mfa_decrypt — 500 for non-recovery endpoints,
  1930. backup-code fall-through for disable_totp / regenerate_backup_codes."""
  1931. async def _setup_admin_and_totp_user(self, async_client, db_session):
  1932. """Create admin (enables auth), log in as admin, add TOTP record with fernet secret."""
  1933. from backend.app.models.user_totp import UserTOTP
  1934. admin_username = f"admin_c9_{secrets.token_hex(4)}"
  1935. setup = await async_client.post(
  1936. "/api/v1/auth/setup",
  1937. json={
  1938. "auth_enabled": True,
  1939. "admin_username": admin_username,
  1940. "admin_password": "Admin_C9_Pass1!",
  1941. },
  1942. )
  1943. assert setup.status_code in (200, 201), setup.text
  1944. login = await async_client.post(
  1945. "/api/v1/auth/login",
  1946. json={"username": admin_username, "password": "Admin_C9_Pass1!"},
  1947. )
  1948. assert login.status_code == 200, login.text
  1949. token = login.json()["access_token"]
  1950. # Get the admin user_id from the /me endpoint
  1951. me = await async_client.get("/api/v1/auth/me", headers={"Authorization": f"Bearer {token}"})
  1952. assert me.status_code == 200
  1953. user_id = me.json()["id"]
  1954. # Insert a TOTP row with a fernet-prefixed secret directly (no key needed for insert).
  1955. totp = UserTOTP(
  1956. user_id=user_id,
  1957. _secret_enc="fernet:gAAAAA-not-really-encrypted",
  1958. is_enabled=True,
  1959. )
  1960. db_session.add(totp)
  1961. await db_session.commit()
  1962. return token, admin_username, user_id
  1963. @pytest.mark.asyncio
  1964. @pytest.mark.integration
  1965. async def test_enable_totp_returns_500_when_decryption_broken(self, async_client, db_session, monkeypatch):
  1966. """C9: enable endpoint → 500 when TOTP secret is encrypted but key unavailable."""
  1967. import backend.app.core.encryption as enc_mod
  1968. token, _, _ = await self._setup_admin_and_totp_user(async_client, db_session)
  1969. monkeypatch.setattr(enc_mod, "_load_or_generate_key", lambda: (None, "none"))
  1970. enc_mod._fernet_instance = None
  1971. # enable_totp requires setup-but-not-yet-enabled state; force is_enabled=False
  1972. from sqlalchemy import select as _select
  1973. from backend.app.models.user_totp import UserTOTP
  1974. result = await db_session.execute(_select(UserTOTP))
  1975. for t in result.scalars().all():
  1976. t.is_enabled = False
  1977. await db_session.commit()
  1978. resp = await async_client.post(
  1979. "/api/v1/auth/2fa/totp/enable",
  1980. json={"code": "123456"},
  1981. headers={"Authorization": f"Bearer {token}"},
  1982. )
  1983. assert resp.status_code == 500
  1984. assert "unavailable" in resp.json().get("detail", "").lower()
  1985. @pytest.mark.asyncio
  1986. @pytest.mark.integration
  1987. async def test_disable_totp_returns_400_when_decryption_broken_and_no_backup_codes(
  1988. self, async_client, db_session, monkeypatch
  1989. ):
  1990. """B2a + S3: disable falls through to backup-code branch when TOTP secret
  1991. cannot be decrypted; with no backup codes seeded, the request is
  1992. rejected as an invalid code (400), not a server error.
  1993. S3: AND the failed-attempt counter must NOT be incremented — the
  1994. cause was a server-side key loss, not a user mistake.
  1995. """
  1996. from sqlalchemy import select as _select
  1997. import backend.app.core.encryption as enc_mod
  1998. from backend.app.models.auth_ephemeral import AuthRateLimitEvent
  1999. token, admin_username, _ = await self._setup_admin_and_totp_user(async_client, db_session)
  2000. monkeypatch.setattr(enc_mod, "_load_or_generate_key", lambda: (None, "none"))
  2001. enc_mod._fernet_instance = None
  2002. resp = await async_client.post(
  2003. "/api/v1/auth/2fa/totp/disable",
  2004. json={"code": "123456"},
  2005. headers={"Authorization": f"Bearer {token}"},
  2006. )
  2007. assert resp.status_code == 400
  2008. assert "invalid" in resp.json().get("detail", "").lower()
  2009. # S3: no fail-counter debit on server-side key loss.
  2010. events = (
  2011. (
  2012. await db_session.execute(
  2013. _select(AuthRateLimitEvent).where(AuthRateLimitEvent.username == admin_username.lower())
  2014. )
  2015. )
  2016. .scalars()
  2017. .all()
  2018. )
  2019. assert len(events) == 0, "S3: must not debit fail-counter on key-loss"
  2020. @pytest.mark.asyncio
  2021. @pytest.mark.integration
  2022. async def test_regenerate_backup_codes_returns_400_when_decryption_broken_and_no_backup_codes(
  2023. self, async_client, db_session, monkeypatch
  2024. ):
  2025. """B2b + S3: regenerate-backup-codes falls through to backup-code branch when
  2026. TOTP secret cannot be decrypted; with no backup codes seeded, the
  2027. request is rejected as an invalid code (400) AND the fail-counter
  2028. is NOT incremented (S3: server-side cause, not user mistake).
  2029. """
  2030. from sqlalchemy import select as _select
  2031. import backend.app.core.encryption as enc_mod
  2032. from backend.app.models.auth_ephemeral import AuthRateLimitEvent
  2033. token, admin_username, _ = await self._setup_admin_and_totp_user(async_client, db_session)
  2034. monkeypatch.setattr(enc_mod, "_load_or_generate_key", lambda: (None, "none"))
  2035. enc_mod._fernet_instance = None
  2036. resp = await async_client.post(
  2037. "/api/v1/auth/2fa/totp/regenerate-backup-codes",
  2038. json={"code": "123456"},
  2039. headers={"Authorization": f"Bearer {token}"},
  2040. )
  2041. assert resp.status_code == 400
  2042. assert "invalid" in resp.json().get("detail", "").lower()
  2043. events = (
  2044. (
  2045. await db_session.execute(
  2046. _select(AuthRateLimitEvent).where(AuthRateLimitEvent.username == admin_username.lower())
  2047. )
  2048. )
  2049. .scalars()
  2050. .all()
  2051. )
  2052. assert len(events) == 0, "S3: must not debit fail-counter on key-loss"
  2053. @pytest.mark.asyncio
  2054. @pytest.mark.integration
  2055. async def test_disable_totp_succeeds_via_backup_code_when_decryption_broken(
  2056. self, async_client, db_session, monkeypatch
  2057. ):
  2058. """B2a: a valid backup code disables TOTP even when the secret cannot
  2059. be decrypted — recovery path for users who lost the encryption key."""
  2060. from sqlalchemy import select as _select
  2061. import backend.app.core.encryption as enc_mod
  2062. from backend.app.api.routes.mfa import _generate_backup_codes
  2063. from backend.app.models.user_totp import UserTOTP
  2064. token, _, user_id = await self._setup_admin_and_totp_user(async_client, db_session)
  2065. # Seed a real backup-code hash on the existing TOTP row.
  2066. plain_codes, hashed_codes = _generate_backup_codes()
  2067. result = await db_session.execute(_select(UserTOTP).where(UserTOTP.user_id == user_id))
  2068. totp = result.scalar_one()
  2069. totp.backup_code_hashes = hashed_codes
  2070. await db_session.commit()
  2071. monkeypatch.setattr(enc_mod, "_load_or_generate_key", lambda: (None, "none"))
  2072. enc_mod._fernet_instance = None
  2073. resp = await async_client.post(
  2074. "/api/v1/auth/2fa/totp/disable",
  2075. json={"code": plain_codes[0]},
  2076. headers={"Authorization": f"Bearer {token}"},
  2077. )
  2078. assert resp.status_code == 200, resp.text
  2079. # The TOTP row must have been deleted.
  2080. result_after = await db_session.execute(_select(UserTOTP).where(UserTOTP.user_id == user_id))
  2081. assert result_after.scalar_one_or_none() is None
  2082. @pytest.mark.asyncio
  2083. @pytest.mark.integration
  2084. async def test_regenerate_backup_codes_succeeds_via_backup_code_when_decryption_broken(
  2085. self, async_client, db_session, monkeypatch
  2086. ):
  2087. """B2b: a valid backup code rotates the codes even when the secret
  2088. cannot be decrypted — recovery path mirrors disable_totp."""
  2089. from sqlalchemy import select as _select
  2090. import backend.app.core.encryption as enc_mod
  2091. from backend.app.api.routes.mfa import _generate_backup_codes
  2092. from backend.app.models.user_totp import UserTOTP
  2093. token, _, user_id = await self._setup_admin_and_totp_user(async_client, db_session)
  2094. plain_codes, hashed_codes = _generate_backup_codes()
  2095. result = await db_session.execute(_select(UserTOTP).where(UserTOTP.user_id == user_id))
  2096. totp = result.scalar_one()
  2097. totp.backup_code_hashes = hashed_codes
  2098. await db_session.commit()
  2099. monkeypatch.setattr(enc_mod, "_load_or_generate_key", lambda: (None, "none"))
  2100. enc_mod._fernet_instance = None
  2101. resp = await async_client.post(
  2102. "/api/v1/auth/2fa/totp/regenerate-backup-codes",
  2103. json={"code": plain_codes[0]},
  2104. headers={"Authorization": f"Bearer {token}"},
  2105. )
  2106. assert resp.status_code == 200, resp.text
  2107. body = resp.json()
  2108. assert "backup_codes" in body
  2109. assert len(body["backup_codes"]) == 10
  2110. @pytest.mark.asyncio
  2111. @pytest.mark.integration
  2112. async def test_disable_totp_wrong_code_with_seeded_hashes_returns_400_and_debits_counter(
  2113. self, async_client, db_session, monkeypatch
  2114. ):
  2115. """T2: with backup_code_hashes seeded AND a working encryption key,
  2116. a wrong code is rejected (400) AND the fail-counter IS incremented.
  2117. This pins the behaviour that a future refactor swallowing
  2118. compare_digest mismatches would still let the existing 'no codes
  2119. configured' tests pass — only this assertion exercises the actual
  2120. pwd_context.verify mismatch path.
  2121. """
  2122. from cryptography.fernet import Fernet
  2123. from sqlalchemy import select as _select
  2124. import backend.app.core.encryption as enc_mod
  2125. from backend.app.api.routes.mfa import _generate_backup_codes
  2126. from backend.app.models.auth_ephemeral import AuthRateLimitEvent
  2127. from backend.app.models.user_totp import UserTOTP
  2128. # Active key — secret can be decrypted, this is NOT key-loss.
  2129. monkeypatch.setenv("MFA_ENCRYPTION_KEY", Fernet.generate_key().decode())
  2130. enc_mod._fernet_instance = None
  2131. token, admin_username, user_id = await self._setup_admin_and_totp_user(async_client, db_session)
  2132. # Replace stub fernet:-prefixed value with a real encrypted secret so
  2133. # disable_totp's TOTP-decrypt path doesn't throw, AND seed real hashes.
  2134. result = await db_session.execute(_select(UserTOTP).where(UserTOTP.user_id == user_id))
  2135. totp = result.scalar_one()
  2136. totp.secret = "JBSWY3DPEHPK3PXP" # via setter -> mfa_encrypt
  2137. plain_codes, hashed_codes = _generate_backup_codes()
  2138. totp.backup_code_hashes = hashed_codes
  2139. await db_session.commit()
  2140. # Submit a code that matches NEITHER the TOTP nor any backup-code hash.
  2141. resp = await async_client.post(
  2142. "/api/v1/auth/2fa/totp/disable",
  2143. json={"code": "WRONGCD1"}, # wrong but well-formed
  2144. headers={"Authorization": f"Bearer {token}"},
  2145. )
  2146. assert resp.status_code == 400
  2147. assert "invalid" in resp.json().get("detail", "").lower()
  2148. # T2 + S3: with key intact, the fail-counter MUST increment for a
  2149. # real wrong-code attempt (this is the user-error path, not key-loss).
  2150. events = (
  2151. (
  2152. await db_session.execute(
  2153. _select(AuthRateLimitEvent).where(AuthRateLimitEvent.username == admin_username.lower())
  2154. )
  2155. )
  2156. .scalars()
  2157. .all()
  2158. )
  2159. assert len(events) >= 1, "T2: with key intact, wrong code must debit the fail-counter"
  2160. @pytest.mark.asyncio
  2161. @pytest.mark.integration
  2162. async def test_regenerate_backup_codes_wrong_code_with_seeded_hashes_returns_400_and_debits_counter(
  2163. self, async_client, db_session, monkeypatch
  2164. ):
  2165. """T2: same as the disable_totp variant for /regenerate-backup-codes."""
  2166. from cryptography.fernet import Fernet
  2167. from sqlalchemy import select as _select
  2168. import backend.app.core.encryption as enc_mod
  2169. from backend.app.api.routes.mfa import _generate_backup_codes
  2170. from backend.app.models.auth_ephemeral import AuthRateLimitEvent
  2171. from backend.app.models.user_totp import UserTOTP
  2172. monkeypatch.setenv("MFA_ENCRYPTION_KEY", Fernet.generate_key().decode())
  2173. enc_mod._fernet_instance = None
  2174. token, admin_username, user_id = await self._setup_admin_and_totp_user(async_client, db_session)
  2175. result = await db_session.execute(_select(UserTOTP).where(UserTOTP.user_id == user_id))
  2176. totp = result.scalar_one()
  2177. totp.secret = "JBSWY3DPEHPK3PXP"
  2178. plain_codes, hashed_codes = _generate_backup_codes()
  2179. totp.backup_code_hashes = hashed_codes
  2180. await db_session.commit()
  2181. resp = await async_client.post(
  2182. "/api/v1/auth/2fa/totp/regenerate-backup-codes",
  2183. json={"code": "WRONGCD2"},
  2184. headers={"Authorization": f"Bearer {token}"},
  2185. )
  2186. assert resp.status_code == 400
  2187. assert "invalid" in resp.json().get("detail", "").lower()
  2188. events = (
  2189. (
  2190. await db_session.execute(
  2191. _select(AuthRateLimitEvent).where(AuthRateLimitEvent.username == admin_username.lower())
  2192. )
  2193. )
  2194. .scalars()
  2195. .all()
  2196. )
  2197. assert len(events) >= 1, "T2: with key intact, wrong code must debit the fail-counter"
  2198. @pytest.mark.asyncio
  2199. @pytest.mark.integration
  2200. async def test_disable_totp_wrong_code_with_seeded_hashes_at_keyloss_no_counter_debit(
  2201. self, async_client, db_session, monkeypatch
  2202. ):
  2203. """T2 + S3 cross-check: with hashes seeded but encryption key gone,
  2204. a wrong code returns 400 BUT the fail-counter MUST NOT increment.
  2205. This is the dual of the test above — same wrong-code 400 outcome,
  2206. but the counter debit is gated on the cause of failure (server-side
  2207. key loss must NOT penalise the user).
  2208. """
  2209. from sqlalchemy import select as _select
  2210. import backend.app.core.encryption as enc_mod
  2211. from backend.app.api.routes.mfa import _generate_backup_codes
  2212. from backend.app.models.auth_ephemeral import AuthRateLimitEvent
  2213. from backend.app.models.user_totp import UserTOTP
  2214. token, admin_username, user_id = await self._setup_admin_and_totp_user(async_client, db_session)
  2215. # Seed real hashes on the existing TOTP row.
  2216. result = await db_session.execute(_select(UserTOTP).where(UserTOTP.user_id == user_id))
  2217. totp = result.scalar_one()
  2218. plain_codes, hashed_codes = _generate_backup_codes()
  2219. totp.backup_code_hashes = hashed_codes
  2220. await db_session.commit()
  2221. # Now simulate key loss.
  2222. monkeypatch.setattr(enc_mod, "_load_or_generate_key", lambda: (None, "none"))
  2223. enc_mod._fernet_instance = None
  2224. resp = await async_client.post(
  2225. "/api/v1/auth/2fa/totp/disable",
  2226. json={"code": "WRONGCD3"},
  2227. headers={"Authorization": f"Bearer {token}"},
  2228. )
  2229. assert resp.status_code == 400
  2230. # S3: counter MUST be unchanged — this is a server-side problem.
  2231. events = (
  2232. (
  2233. await db_session.execute(
  2234. _select(AuthRateLimitEvent).where(AuthRateLimitEvent.username == admin_username.lower())
  2235. )
  2236. )
  2237. .scalars()
  2238. .all()
  2239. )
  2240. assert len(events) == 0, "S3: must not debit fail-counter when cause is server-side key-loss"
  2241. @pytest.mark.asyncio
  2242. @pytest.mark.integration
  2243. async def test_setup_totp_returns_500_when_decryption_broken(self, async_client, db_session, monkeypatch):
  2244. """B3: setup endpoint → 500 when an active TOTP secret can't be decrypted.
  2245. Replacing an active authenticator requires verifying the current TOTP
  2246. code; with no recovery (backup-code) path on this endpoint, the only
  2247. safe outcome is a 500 surface to the operator.
  2248. """
  2249. import backend.app.core.encryption as enc_mod
  2250. token, _, _ = await self._setup_admin_and_totp_user(async_client, db_session)
  2251. monkeypatch.setattr(enc_mod, "_load_or_generate_key", lambda: (None, "none"))
  2252. enc_mod._fernet_instance = None
  2253. resp = await async_client.post(
  2254. "/api/v1/auth/2fa/totp/setup",
  2255. json={"code": "123456"},
  2256. headers={"Authorization": f"Bearer {token}"},
  2257. )
  2258. assert resp.status_code == 500
  2259. assert "unavailable" in resp.json().get("detail", "").lower()
  2260. @pytest.mark.asyncio
  2261. @pytest.mark.integration
  2262. async def test_verify_2fa_returns_500_when_decryption_broken(self, async_client, db_session, monkeypatch):
  2263. """C9: verify endpoint (TOTP method) → 500 when TOTP secret unreadable."""
  2264. from datetime import datetime, timedelta, timezone
  2265. import backend.app.core.encryption as enc_mod
  2266. from backend.app.models.auth_ephemeral import AuthEphemeralToken
  2267. token, admin_username, user_id = await self._setup_admin_and_totp_user(async_client, db_session)
  2268. monkeypatch.setattr(enc_mod, "_load_or_generate_key", lambda: (None, "none"))
  2269. enc_mod._fernet_instance = None
  2270. # Create a pre_auth token to simulate the post-login 2FA challenge step.
  2271. raw_token = secrets.token_urlsafe(32)
  2272. ephemeral = AuthEphemeralToken(
  2273. token=raw_token,
  2274. token_type="pre_auth",
  2275. username=admin_username,
  2276. expires_at=datetime.now(timezone.utc) + timedelta(minutes=5),
  2277. )
  2278. db_session.add(ephemeral)
  2279. await db_session.commit()
  2280. resp = await async_client.post(
  2281. "/api/v1/auth/2fa/verify",
  2282. json={"pre_auth_token": raw_token, "method": "totp", "code": "123456"},
  2283. )
  2284. assert resp.status_code == 500
  2285. assert "unavailable" in resp.json().get("detail", "").lower()