# test_mfa_api.py (157 KB)
"""Integration tests for 2FA and OIDC API endpoints.

Tests the full request/response cycle for:
- GET /api/v1/auth/2fa/status
- POST /api/v1/auth/2fa/totp/setup
- POST /api/v1/auth/2fa/totp/enable
- POST /api/v1/auth/2fa/totp/disable
- POST /api/v1/auth/2fa/email/enable
- POST /api/v1/auth/2fa/email/enable/confirm
- POST /api/v1/auth/2fa/email/disable
- POST /api/v1/auth/2fa/verify (TOTP, email, backup paths)
- DELETE /api/v1/auth/2fa/admin/{user_id}
- GET /api/v1/auth/oidc/providers
- POST /api/v1/auth/oidc/providers
- PATCH /api/v1/auth/oidc/providers/{id}
- DELETE /api/v1/auth/oidc/providers/{id}
"""
  16. from __future__ import annotations
  17. import secrets
  18. import time
  19. from datetime import datetime, timedelta, timezone
  20. from unittest.mock import patch
  21. import jwt as pyjwt
  22. import pyotp
  23. import pytest
  24. from httpx import AsyncClient
  25. from passlib.context import CryptContext
  26. from sqlalchemy.ext.asyncio import AsyncSession
  27. from backend.app.models.auth_ephemeral import AuthEphemeralToken
  28. from backend.app.models.user import User
# Hashing context the tests use to hash one-time codes before storing them in
# AuthEphemeralToken.nonce (presumably the same scheme the backend verifies
# against -- TODO confirm).
_pwd_context = CryptContext(schemes=["pbkdf2_sha256"], deprecated="auto")
# ---------------------------------------------------------------------------
# Fixtures / helpers
# ---------------------------------------------------------------------------
# Endpoint that _setup_and_login posts to in order to enable auth and create the admin user.
AUTH_SETUP_URL = "/api/v1/auth/setup"
# Endpoint used for username/password login.
LOGIN_URL = "/api/v1/auth/login"
  35. def _norm_pw(password: str) -> str:
  36. """Ensure password meets complexity requirements (I4: SetupRequest now validates)."""
  37. if not any(c.isupper() for c in password):
  38. password = password[0].upper() + password[1:]
  39. if not any(c not in "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789" for c in password):
  40. password = password + "!"
  41. return password
  42. async def _setup_and_login(client: AsyncClient, username: str, password: str) -> str:
  43. """Enable auth, create an admin user, login, and return the bearer token."""
  44. password = _norm_pw(password)
  45. await client.post(
  46. AUTH_SETUP_URL,
  47. json={
  48. "auth_enabled": True,
  49. "admin_username": username,
  50. "admin_password": password,
  51. },
  52. )
  53. resp = await client.post(LOGIN_URL, json={"username": username, "password": password})
  54. assert resp.status_code == 200
  55. return resp.json()["access_token"]
  56. async def _login_get_pre_auth_token(client: AsyncClient, username: str, password: str) -> str:
  57. """Login a user who has 2FA enabled; return the pre_auth_token from the response."""
  58. password = _norm_pw(password)
  59. resp = await client.post(LOGIN_URL, json={"username": username, "password": password})
  60. assert resp.status_code == 200
  61. data = resp.json()
  62. assert data["requires_2fa"] is True, f"Expected requires_2fa=True, got {data}"
  63. assert data["pre_auth_token"] is not None
  64. return data["pre_auth_token"]
  65. def _auth_header(token: str) -> dict[str, str]:
  66. return {"Authorization": f"Bearer {token}"}
  67. # ===========================================================================
  68. # 2FA Status
  69. # ===========================================================================
  70. class TestTwoFAStatus:
  71. """Tests for GET /api/v1/auth/2fa/status."""
  72. @pytest.mark.asyncio
  73. @pytest.mark.integration
  74. async def test_status_requires_auth(self, async_client: AsyncClient):
  75. response = await async_client.get("/api/v1/auth/2fa/status")
  76. assert response.status_code == 401
  77. @pytest.mark.asyncio
  78. @pytest.mark.integration
  79. async def test_status_default_disabled(self, async_client: AsyncClient):
  80. token = await _setup_and_login(async_client, "statususer", "statuspass123")
  81. response = await async_client.get("/api/v1/auth/2fa/status", headers=_auth_header(token))
  82. assert response.status_code == 200
  83. data = response.json()
  84. assert data["totp_enabled"] is False
  85. assert data["email_otp_enabled"] is False
  86. assert data["backup_codes_remaining"] == 0
  87. # ===========================================================================
  88. # TOTP Setup
  89. # ===========================================================================
  90. class TestTOTPSetup:
  91. """Tests for POST /api/v1/auth/2fa/totp/setup."""
  92. @pytest.mark.asyncio
  93. @pytest.mark.integration
  94. async def test_setup_requires_auth(self, async_client: AsyncClient):
  95. response = await async_client.post("/api/v1/auth/2fa/totp/setup")
  96. assert response.status_code == 401
  97. @pytest.mark.asyncio
  98. @pytest.mark.integration
  99. async def test_setup_returns_secret_and_qr(self, async_client: AsyncClient):
  100. token = await _setup_and_login(async_client, "totpsetup", "totpsetup123")
  101. response = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
  102. assert response.status_code == 200
  103. data = response.json()
  104. assert "secret" in data
  105. assert len(data["secret"]) > 0
  106. assert "qr_code_b64" in data
  107. assert data["issuer"] == "Bambuddy"
  108. @pytest.mark.asyncio
  109. @pytest.mark.integration
  110. async def test_setup_secret_is_valid_base32(self, async_client: AsyncClient):
  111. token = await _setup_and_login(async_client, "totpbase32", "totpbase32pw")
  112. response = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
  113. assert response.status_code == 200
  114. secret = response.json()["secret"]
  115. # pyotp will raise on invalid base32
  116. totp = pyotp.TOTP(secret)
  117. assert len(totp.now()) == 6
  118. # ===========================================================================
  119. # TOTP Enable
  120. # ===========================================================================
  121. class TestTOTPEnable:
  122. """Tests for POST /api/v1/auth/2fa/totp/enable."""
  123. @pytest.mark.asyncio
  124. @pytest.mark.integration
  125. async def test_enable_without_setup_returns_400(self, async_client: AsyncClient):
  126. token = await _setup_and_login(async_client, "nosetupenable", "nosetupenable1")
  127. response = await async_client.post(
  128. "/api/v1/auth/2fa/totp/enable",
  129. json={"code": "123456"},
  130. headers=_auth_header(token),
  131. )
  132. assert response.status_code == 400
  133. @pytest.mark.asyncio
  134. @pytest.mark.integration
  135. async def test_enable_with_invalid_code_returns_400(self, async_client: AsyncClient):
  136. token = await _setup_and_login(async_client, "badcodeuser", "badcodeuser1")
  137. await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
  138. response = await async_client.post(
  139. "/api/v1/auth/2fa/totp/enable",
  140. json={"code": "000000"},
  141. headers=_auth_header(token),
  142. )
  143. assert response.status_code == 400
  144. @pytest.mark.asyncio
  145. @pytest.mark.integration
  146. async def test_enable_with_valid_code_returns_backup_codes(self, async_client: AsyncClient):
  147. token = await _setup_and_login(async_client, "enableok", "enableok123")
  148. setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
  149. secret = setup_resp.json()["secret"]
  150. valid_code = pyotp.TOTP(secret).now()
  151. response = await async_client.post(
  152. "/api/v1/auth/2fa/totp/enable",
  153. json={"code": valid_code},
  154. headers=_auth_header(token),
  155. )
  156. assert response.status_code == 200
  157. data = response.json()
  158. assert "backup_codes" in data
  159. assert len(data["backup_codes"]) == 10
  160. for code in data["backup_codes"]:
  161. assert len(code) == 8
  162. @pytest.mark.asyncio
  163. @pytest.mark.integration
  164. async def test_status_reflects_enabled_totp(self, async_client: AsyncClient):
  165. token = await _setup_and_login(async_client, "statustotp", "statustotp1")
  166. setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
  167. secret = setup_resp.json()["secret"]
  168. valid_code = pyotp.TOTP(secret).now()
  169. await async_client.post(
  170. "/api/v1/auth/2fa/totp/enable",
  171. json={"code": valid_code},
  172. headers=_auth_header(token),
  173. )
  174. status_resp = await async_client.get("/api/v1/auth/2fa/status", headers=_auth_header(token))
  175. data = status_resp.json()
  176. assert data["totp_enabled"] is True
  177. assert data["backup_codes_remaining"] == 10
  178. # ===========================================================================
  179. # TOTP Disable
  180. # ===========================================================================
  181. class TestTOTPDisable:
  182. """Tests for POST /api/v1/auth/2fa/totp/disable."""
  183. @pytest.mark.asyncio
  184. @pytest.mark.integration
  185. async def test_disable_when_not_enabled_returns_400(self, async_client: AsyncClient):
  186. token = await _setup_and_login(async_client, "disablenoenab", "disablenoenab1")
  187. response = await async_client.post(
  188. "/api/v1/auth/2fa/totp/disable",
  189. json={"code": "123456"},
  190. headers=_auth_header(token),
  191. )
  192. assert response.status_code == 400
  193. @pytest.mark.asyncio
  194. @pytest.mark.integration
  195. async def test_disable_with_valid_code(self, async_client: AsyncClient):
  196. token = await _setup_and_login(async_client, "disableok", "disableok123")
  197. setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
  198. secret = setup_resp.json()["secret"]
  199. valid_code = pyotp.TOTP(secret).now()
  200. await async_client.post(
  201. "/api/v1/auth/2fa/totp/enable",
  202. json={"code": valid_code},
  203. headers=_auth_header(token),
  204. )
  205. # Disable with a fresh valid code
  206. disable_code = pyotp.TOTP(secret).now()
  207. response = await async_client.post(
  208. "/api/v1/auth/2fa/totp/disable",
  209. json={"code": disable_code},
  210. headers=_auth_header(token),
  211. )
  212. assert response.status_code == 200
  213. assert "disabled" in response.json()["message"].lower()
  214. # Status should now show disabled
  215. status_resp = await async_client.get("/api/v1/auth/2fa/status", headers=_auth_header(token))
  216. assert status_resp.json()["totp_enabled"] is False
  217. # ===========================================================================
  218. # Email OTP Enable/Disable
  219. # ===========================================================================
  220. class TestEmailOTP:
  221. """Tests for POST /api/v1/auth/2fa/email/enable, /enable/confirm and /disable."""
  222. @pytest.mark.asyncio
  223. @pytest.mark.integration
  224. async def test_enable_email_otp_without_email_returns_400(self, async_client: AsyncClient):
  225. """Users without an email address cannot enable email OTP."""
  226. token = await _setup_and_login(async_client, "noemailuser", "noemailuser1")
  227. response = await async_client.post("/api/v1/auth/2fa/email/enable", headers=_auth_header(token))
  228. assert response.status_code == 400
  229. assert "email" in response.json()["detail"].lower()
  230. @pytest.mark.asyncio
  231. @pytest.mark.integration
  232. async def test_confirm_enable_email_otp_happy_path(self, async_client: AsyncClient, db_session: AsyncSession):
  233. """Confirm step activates email OTP when setup_token + code are valid (C5)."""
  234. token = await _setup_and_login(async_client, "confirmenable", "confirmenable1")
  235. # Give user an email address directly (SMTP not available in tests)
  236. from sqlalchemy import select as sa_select
  237. result = await db_session.execute(sa_select(User).where(User.username == "confirmenable"))
  238. user = result.scalar_one()
  239. user.email = "confirmenable@example.com"
  240. await db_session.commit()
  241. # Inject a known setup token directly into the DB (bypasses SMTP)
  242. code = "123456"
  243. code_hash = _pwd_context.hash(code)
  244. setup_token = secrets.token_urlsafe(32)
  245. db_session.add(
  246. AuthEphemeralToken(
  247. token=setup_token,
  248. token_type="email_otp_setup",
  249. username="confirmenable",
  250. nonce=code_hash,
  251. expires_at=datetime.now(timezone.utc) + timedelta(minutes=10),
  252. )
  253. )
  254. await db_session.commit()
  255. resp = await async_client.post(
  256. "/api/v1/auth/2fa/email/enable/confirm",
  257. json={"setup_token": setup_token, "code": code},
  258. headers=_auth_header(token),
  259. )
  260. assert resp.status_code == 200
  261. status_resp = await async_client.get("/api/v1/auth/2fa/status", headers=_auth_header(token))
  262. assert status_resp.json()["email_otp_enabled"] is True
  263. @pytest.mark.asyncio
  264. @pytest.mark.integration
  265. async def test_confirm_enable_email_otp_wrong_code(self, async_client: AsyncClient, db_session: AsyncSession):
  266. """Wrong code on confirm step returns 400 and does not enable email OTP."""
  267. token = await _setup_and_login(async_client, "confirmwrong", "confirmwrong1")
  268. code_hash = _pwd_context.hash("654321")
  269. setup_token = secrets.token_urlsafe(32)
  270. db_session.add(
  271. AuthEphemeralToken(
  272. token=setup_token,
  273. token_type="email_otp_setup",
  274. username="confirmwrong",
  275. nonce=code_hash,
  276. expires_at=datetime.now(timezone.utc) + timedelta(minutes=10),
  277. )
  278. )
  279. await db_session.commit()
  280. resp = await async_client.post(
  281. "/api/v1/auth/2fa/email/enable/confirm",
  282. json={"setup_token": setup_token, "code": "000000"},
  283. headers=_auth_header(token),
  284. )
  285. assert resp.status_code == 400
  286. @pytest.mark.asyncio
  287. @pytest.mark.integration
  288. async def test_confirm_enable_email_otp_setup_token_is_single_use(
  289. self, async_client: AsyncClient, db_session: AsyncSession
  290. ):
  291. """Setup token is consumed on first use; replay returns 400."""
  292. token = await _setup_and_login(async_client, "confirmonce", "confirmonce1")
  293. code = "111111"
  294. code_hash = _pwd_context.hash(code)
  295. setup_token = secrets.token_urlsafe(32)
  296. db_session.add(
  297. AuthEphemeralToken(
  298. token=setup_token,
  299. token_type="email_otp_setup",
  300. username="confirmonce",
  301. nonce=code_hash,
  302. expires_at=datetime.now(timezone.utc) + timedelta(minutes=10),
  303. )
  304. )
  305. await db_session.commit()
  306. first = await async_client.post(
  307. "/api/v1/auth/2fa/email/enable/confirm",
  308. json={"setup_token": setup_token, "code": code},
  309. headers=_auth_header(token),
  310. )
  311. assert first.status_code == 200
  312. second = await async_client.post(
  313. "/api/v1/auth/2fa/email/enable/confirm",
  314. json={"setup_token": setup_token, "code": code},
  315. headers=_auth_header(token),
  316. )
  317. assert second.status_code == 400
  318. @pytest.mark.asyncio
  319. @pytest.mark.integration
  320. async def test_disable_email_otp_requires_password(self, async_client: AsyncClient):
  321. """Disabling email OTP requires the account password (C6: re-auth)."""
  322. token = await _setup_and_login(async_client, "disemailotp", "disemailotp1")
  323. # Wrong password → 401
  324. response = await async_client.post(
  325. "/api/v1/auth/2fa/email/disable",
  326. json={"password": "wrongpassword"},
  327. headers=_auth_header(token),
  328. )
  329. assert response.status_code == 401
  330. @pytest.mark.asyncio
  331. @pytest.mark.integration
  332. async def test_disable_email_otp_when_enabled(self, async_client: AsyncClient, db_session: AsyncSession):
  333. """Disabling email OTP when enabled turns it off and status reflects that."""
  334. token = await _setup_and_login(async_client, "disemailpw", "disemailpw1")
  335. # Enable email OTP via direct DB injection (no SMTP)
  336. code = "222222"
  337. setup_token = secrets.token_urlsafe(32)
  338. db_session.add(
  339. AuthEphemeralToken(
  340. token=setup_token,
  341. token_type="email_otp_setup",
  342. username="disemailpw",
  343. nonce=_pwd_context.hash(code),
  344. expires_at=datetime.now(timezone.utc) + timedelta(minutes=10),
  345. )
  346. )
  347. await db_session.commit()
  348. await async_client.post(
  349. "/api/v1/auth/2fa/email/enable/confirm",
  350. json={"setup_token": setup_token, "code": code},
  351. headers=_auth_header(token),
  352. )
  353. # Now disable
  354. response = await async_client.post(
  355. "/api/v1/auth/2fa/email/disable",
  356. json={"password": _norm_pw("disemailpw1")},
  357. headers=_auth_header(token),
  358. )
  359. assert response.status_code == 200
  360. status_resp = await async_client.get("/api/v1/auth/2fa/status", headers=_auth_header(token))
  361. assert status_resp.json()["email_otp_enabled"] is False
  362. # ===========================================================================
  363. # 2FA Verify — TOTP path
  364. # ===========================================================================
  365. class TestTwoFAVerifyTOTP:
  366. """Tests for POST /api/v1/auth/2fa/verify using the TOTP method."""
  367. @pytest.mark.asyncio
  368. @pytest.mark.integration
  369. async def test_verify_with_invalid_pre_auth_token(self, async_client: AsyncClient):
  370. response = await async_client.post(
  371. "/api/v1/auth/2fa/verify",
  372. json={"pre_auth_token": "bogus", "method": "totp", "code": "123456"},
  373. )
  374. assert response.status_code == 401
  375. @pytest.mark.asyncio
  376. @pytest.mark.integration
  377. async def test_verify_totp_issues_jwt(self, async_client: AsyncClient):
  378. """Full flow: setup → enable TOTP → login → pre_auth_token → verify → JWT."""
  379. token = await _setup_and_login(async_client, "verifytotpok", "verifytotpok1")
  380. setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
  381. secret = setup_resp.json()["secret"]
  382. valid_code = pyotp.TOTP(secret).now()
  383. await async_client.post(
  384. "/api/v1/auth/2fa/totp/enable",
  385. json={"code": valid_code},
  386. headers=_auth_header(token),
  387. )
  388. # Login now returns requires_2fa=True + pre_auth_token
  389. pre_auth_token = await _login_get_pre_auth_token(async_client, "verifytotpok", "verifytotpok1")
  390. verify_resp = await async_client.post(
  391. "/api/v1/auth/2fa/verify",
  392. json={
  393. "pre_auth_token": pre_auth_token,
  394. "method": "totp",
  395. "code": pyotp.TOTP(secret).now(),
  396. },
  397. )
  398. assert verify_resp.status_code == 200
  399. data = verify_resp.json()
  400. assert "access_token" in data
  401. assert data["token_type"] == "bearer"
  402. assert data["user"]["username"] == "verifytotpok"
  403. @pytest.mark.asyncio
  404. @pytest.mark.integration
  405. async def test_verify_totp_invalid_code(self, async_client: AsyncClient):
  406. token = await _setup_and_login(async_client, "verifybadcode", "verifybadcode1")
  407. setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
  408. secret = setup_resp.json()["secret"]
  409. valid_code = pyotp.TOTP(secret).now()
  410. await async_client.post(
  411. "/api/v1/auth/2fa/totp/enable",
  412. json={"code": valid_code},
  413. headers=_auth_header(token),
  414. )
  415. pre_auth_token = await _login_get_pre_auth_token(async_client, "verifybadcode", "verifybadcode1")
  416. verify_resp = await async_client.post(
  417. "/api/v1/auth/2fa/verify",
  418. json={"pre_auth_token": pre_auth_token, "method": "totp", "code": "000000"},
  419. )
  420. assert verify_resp.status_code == 401
  421. @pytest.mark.asyncio
  422. @pytest.mark.integration
  423. async def test_verify_invalid_method(self, async_client: AsyncClient):
  424. """An invalid 2FA method should return 400 even with a valid pre_auth_token."""
  425. token = await _setup_and_login(async_client, "invalidmethod", "invalidmethod1")
  426. setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
  427. secret = setup_resp.json()["secret"]
  428. valid_code = pyotp.TOTP(secret).now()
  429. await async_client.post(
  430. "/api/v1/auth/2fa/totp/enable",
  431. json={"code": valid_code},
  432. headers=_auth_header(token),
  433. )
  434. pre_auth_token = await _login_get_pre_auth_token(async_client, "invalidmethod", "invalidmethod1")
  435. response = await async_client.post(
  436. "/api/v1/auth/2fa/verify",
  437. json={"pre_auth_token": pre_auth_token, "method": "sms", "code": "123456"},
  438. )
  439. assert response.status_code == 422 # Pydantic Literal validation
  440. # ===========================================================================
  441. # 2FA Verify — Backup code path
  442. # ===========================================================================
  443. class TestTwoFAVerifyBackup:
  444. """Tests for POST /api/v1/auth/2fa/verify using the backup method."""
  445. @pytest.mark.asyncio
  446. @pytest.mark.integration
  447. async def test_verify_with_backup_code(self, async_client: AsyncClient):
  448. token = await _setup_and_login(async_client, "backupcodeok", "backupcodeok1")
  449. setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
  450. secret = setup_resp.json()["secret"]
  451. valid_code = pyotp.TOTP(secret).now()
  452. enable_resp = await async_client.post(
  453. "/api/v1/auth/2fa/totp/enable",
  454. json={"code": valid_code},
  455. headers=_auth_header(token),
  456. )
  457. backup_code = enable_resp.json()["backup_codes"][0]
  458. pre_auth_token = await _login_get_pre_auth_token(async_client, "backupcodeok", "backupcodeok1")
  459. verify_resp = await async_client.post(
  460. "/api/v1/auth/2fa/verify",
  461. json={"pre_auth_token": pre_auth_token, "method": "backup", "code": backup_code},
  462. )
  463. assert verify_resp.status_code == 200
  464. assert "access_token" in verify_resp.json()
  465. @pytest.mark.asyncio
  466. @pytest.mark.integration
  467. async def test_backup_code_is_single_use(self, async_client: AsyncClient):
  468. token = await _setup_and_login(async_client, "backupsingle", "backupsingle1")
  469. setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
  470. secret = setup_resp.json()["secret"]
  471. valid_code = pyotp.TOTP(secret).now()
  472. enable_resp = await async_client.post(
  473. "/api/v1/auth/2fa/totp/enable",
  474. json={"code": valid_code},
  475. headers=_auth_header(token),
  476. )
  477. backup_code = enable_resp.json()["backup_codes"][0]
  478. # First use — should succeed
  479. pre_auth_token = await _login_get_pre_auth_token(async_client, "backupsingle", "backupsingle1")
  480. first_resp = await async_client.post(
  481. "/api/v1/auth/2fa/verify",
  482. json={"pre_auth_token": pre_auth_token, "method": "backup", "code": backup_code},
  483. )
  484. assert first_resp.status_code == 200
  485. # Second use of the same code — must fail (need new pre_auth_token + same backup code)
  486. pre_auth_token2 = await _login_get_pre_auth_token(async_client, "backupsingle", "backupsingle1")
  487. second_resp = await async_client.post(
  488. "/api/v1/auth/2fa/verify",
  489. json={"pre_auth_token": pre_auth_token2, "method": "backup", "code": backup_code},
  490. )
  491. assert second_resp.status_code == 401
  492. @pytest.mark.asyncio
  493. @pytest.mark.integration
  494. async def test_backup_code_count_decrements(self, async_client: AsyncClient):
  495. token = await _setup_and_login(async_client, "backupcount", "backupcount1")
  496. setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
  497. secret = setup_resp.json()["secret"]
  498. valid_code = pyotp.TOTP(secret).now()
  499. enable_resp = await async_client.post(
  500. "/api/v1/auth/2fa/totp/enable",
  501. json={"code": valid_code},
  502. headers=_auth_header(token),
  503. )
  504. backup_code = enable_resp.json()["backup_codes"][0]
  505. pre_auth_token = await _login_get_pre_auth_token(async_client, "backupcount", "backupcount1")
  506. await async_client.post(
  507. "/api/v1/auth/2fa/verify",
  508. json={"pre_auth_token": pre_auth_token, "method": "backup", "code": backup_code},
  509. )
  510. # Status is readable with the original full token (still valid)
  511. status_resp = await async_client.get("/api/v1/auth/2fa/status", headers=_auth_header(token))
  512. assert status_resp.json()["backup_codes_remaining"] == 9
  513. # ===========================================================================
  514. # Rate Limiting
  515. # ===========================================================================
  516. class TestRateLimiting:
  517. """Ensure 429 is returned after 5 failed 2FA attempts."""
  518. @pytest.mark.asyncio
  519. @pytest.mark.integration
  520. async def test_rate_limit_lockout(self, async_client: AsyncClient):
  521. """After 5 failed TOTP attempts the 6th must return 429."""
  522. token = await _setup_and_login(async_client, "ratelimituser", "ratelimituser1")
  523. setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
  524. secret = setup_resp.json()["secret"]
  525. valid_code = pyotp.TOTP(secret).now()
  526. await async_client.post(
  527. "/api/v1/auth/2fa/totp/enable",
  528. json={"code": valid_code},
  529. headers=_auth_header(token),
  530. )
  531. # 5 failed attempts via the login → pre_auth_token → verify flow
  532. for _ in range(5):
  533. pre_auth_token = await _login_get_pre_auth_token(async_client, "ratelimituser", "ratelimituser1")
  534. await async_client.post(
  535. "/api/v1/auth/2fa/verify",
  536. json={"pre_auth_token": pre_auth_token, "method": "totp", "code": "000000"},
  537. )
  538. # 6th attempt should hit the rate limit
  539. pre_auth_token = await _login_get_pre_auth_token(async_client, "ratelimituser", "ratelimituser1")
  540. response = await async_client.post(
  541. "/api/v1/auth/2fa/verify",
  542. json={"pre_auth_token": pre_auth_token, "method": "totp", "code": "000000"},
  543. )
  544. assert response.status_code == 429
  545. # ===========================================================================
  546. # Admin 2FA Disable
  547. # ===========================================================================
  548. class TestAdminDisable2FA:
  549. """Tests for DELETE /api/v1/auth/2fa/admin/{user_id}."""
  550. @pytest.mark.asyncio
  551. @pytest.mark.integration
  552. async def test_admin_disable_requires_admin(self, async_client: AsyncClient):
  553. """Only admins can use the admin disable endpoint."""
  554. # The only user in a fresh setup IS admin, so just check the 404 path
  555. token = await _setup_and_login(async_client, "admincheck", "admincheck123")
  556. # Try to disable for a non-existent user_id — should get 200 (no-op) or 404
  557. response = await async_client.request(
  558. "DELETE",
  559. "/api/v1/auth/2fa/admin/99999",
  560. json={"admin_password": _norm_pw("admincheck123")},
  561. headers=_auth_header(token),
  562. )
  563. # Admin users succeed regardless (returns 200 even if user doesn't exist)
  564. assert response.status_code == 200
  565. @pytest.mark.asyncio
  566. @pytest.mark.integration
  567. async def test_admin_disable_clears_totp(self, async_client: AsyncClient):
  568. from sqlalchemy import select
  569. from backend.app.models.user import User
  570. token = await _setup_and_login(async_client, "admintotp", "admintotp123")
  571. setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
  572. secret = setup_resp.json()["secret"]
  573. valid_code = pyotp.TOTP(secret).now()
  574. await async_client.post(
  575. "/api/v1/auth/2fa/totp/enable",
  576. json={"code": valid_code},
  577. headers=_auth_header(token),
  578. )
  579. # Find the user's id by querying status (which works with the token)
  580. me_resp = await async_client.get("/api/v1/auth/me", headers=_auth_header(token))
  581. user_id = me_resp.json()["id"]
  582. response = await async_client.request(
  583. "DELETE",
  584. f"/api/v1/auth/2fa/admin/{user_id}",
  585. json={"admin_password": _norm_pw("admintotp123")},
  586. headers=_auth_header(token),
  587. )
  588. assert response.status_code == 200
  589. # I2: admin_disable_2fa bumps password_changed_at, invalidating the old token.
  590. # Re-login to get a fresh token before checking status.
  591. new_login = await async_client.post(
  592. LOGIN_URL, json={"username": "admintotp", "password": _norm_pw("admintotp123")}
  593. )
  594. assert new_login.status_code == 200, f"re-login failed: {new_login.json()}"
  595. assert new_login.json().get("requires_2fa") is False, f"still requires 2FA: {new_login.json()}"
  596. new_token = new_login.json()["access_token"]
  597. assert new_token is not None, f"no access_token in: {new_login.json()}"
  598. # Status should now show TOTP disabled
  599. status_resp = await async_client.get("/api/v1/auth/2fa/status", headers=_auth_header(new_token))
  600. assert status_resp.status_code == 200, f"status check failed: {status_resp.json()}"
  601. assert status_resp.json()["totp_enabled"] is False
  602. # ===========================================================================
  603. # OIDC Provider CRUD
  604. # ===========================================================================
  605. class TestOIDCProviders:
  606. """Tests for OIDC provider management endpoints."""
  607. @pytest.mark.asyncio
  608. @pytest.mark.integration
  609. async def test_list_public_providers_empty(self, async_client: AsyncClient):
  610. response = await async_client.get("/api/v1/auth/oidc/providers")
  611. assert response.status_code == 200
  612. assert isinstance(response.json(), list)
  613. @pytest.mark.asyncio
  614. @pytest.mark.integration
  615. async def test_create_provider_requires_admin(self, async_client: AsyncClient):
  616. token = await _setup_and_login(async_client, "oidcadmincreate", "oidcadmincreate1")
  617. response = await async_client.post(
  618. "/api/v1/auth/oidc/providers",
  619. json={
  620. "name": "PocketID",
  621. "issuer_url": "https://auth.example.com",
  622. "client_id": "bambuddy",
  623. "client_secret": "supersecret",
  624. "scopes": "openid email profile",
  625. "is_enabled": True,
  626. "auto_create_users": False,
  627. },
  628. headers=_auth_header(token),
  629. )
  630. assert response.status_code == 201
  631. data = response.json()
  632. assert data["name"] == "PocketID"
  633. assert data["issuer_url"] == "https://auth.example.com"
  634. assert "client_secret" not in data # Secret must not be returned
  635. @pytest.mark.asyncio
  636. @pytest.mark.integration
  637. async def test_created_provider_appears_in_all_list(self, async_client: AsyncClient):
  638. token = await _setup_and_login(async_client, "oidclistall", "oidclistall123")
  639. await async_client.post(
  640. "/api/v1/auth/oidc/providers",
  641. json={
  642. "name": "TestProvider",
  643. "issuer_url": "https://test.example.com",
  644. "client_id": "testclient",
  645. "client_secret": "testsecret",
  646. "scopes": "openid",
  647. "is_enabled": True,
  648. "auto_create_users": False,
  649. },
  650. headers=_auth_header(token),
  651. )
  652. response = await async_client.get("/api/v1/auth/oidc/providers/all", headers=_auth_header(token))
  653. assert response.status_code == 200
  654. names = [p["name"] for p in response.json()]
  655. assert "TestProvider" in names
  656. @pytest.mark.asyncio
  657. @pytest.mark.integration
  658. async def test_disabled_provider_not_in_public_list(self, async_client: AsyncClient):
  659. token = await _setup_and_login(async_client, "oidcdisabled", "oidcdisabled1")
  660. await async_client.post(
  661. "/api/v1/auth/oidc/providers",
  662. json={
  663. "name": "DisabledProvider",
  664. "issuer_url": "https://disabled.example.com",
  665. "client_id": "dc",
  666. "client_secret": "ds",
  667. "scopes": "openid",
  668. "is_enabled": False,
  669. "auto_create_users": False,
  670. },
  671. headers=_auth_header(token),
  672. )
  673. response = await async_client.get("/api/v1/auth/oidc/providers")
  674. names = [p["name"] for p in response.json()]
  675. assert "DisabledProvider" not in names
  676. @pytest.mark.asyncio
  677. @pytest.mark.integration
  678. async def test_update_provider(self, async_client: AsyncClient):
  679. token = await _setup_and_login(async_client, "oidcupdate", "oidcupdate123")
  680. create_resp = await async_client.post(
  681. "/api/v1/auth/oidc/providers",
  682. json={
  683. "name": "OldName",
  684. "issuer_url": "https://update.example.com",
  685. "client_id": "uc",
  686. "client_secret": "us",
  687. "scopes": "openid",
  688. "is_enabled": True,
  689. "auto_create_users": False,
  690. },
  691. headers=_auth_header(token),
  692. )
  693. provider_id = create_resp.json()["id"]
  694. put_resp = await async_client.put(
  695. f"/api/v1/auth/oidc/providers/{provider_id}",
  696. json={"name": "NewName"},
  697. headers=_auth_header(token),
  698. )
  699. assert put_resp.status_code == 200
  700. assert put_resp.json()["name"] == "NewName"
  701. @pytest.mark.asyncio
  702. @pytest.mark.integration
  703. async def test_delete_provider(self, async_client: AsyncClient):
  704. token = await _setup_and_login(async_client, "oidcdelete", "oidcdelete123")
  705. create_resp = await async_client.post(
  706. "/api/v1/auth/oidc/providers",
  707. json={
  708. "name": "ToDelete",
  709. "issuer_url": "https://delete.example.com",
  710. "client_id": "dc",
  711. "client_secret": "ds",
  712. "scopes": "openid",
  713. "is_enabled": True,
  714. "auto_create_users": False,
  715. },
  716. headers=_auth_header(token),
  717. )
  718. provider_id = create_resp.json()["id"]
  719. del_resp = await async_client.delete(
  720. f"/api/v1/auth/oidc/providers/{provider_id}",
  721. headers=_auth_header(token),
  722. )
  723. assert del_resp.status_code == 200
  724. # No longer in list
  725. all_resp = await async_client.get("/api/v1/auth/oidc/providers/all", headers=_auth_header(token))
  726. ids = [p["id"] for p in all_resp.json()]
  727. assert provider_id not in ids
  728. @pytest.mark.asyncio
  729. @pytest.mark.integration
  730. async def test_update_nonexistent_provider_returns_404(self, async_client: AsyncClient):
  731. token = await _setup_and_login(async_client, "oidc404", "oidc404pass1")
  732. response = await async_client.put(
  733. "/api/v1/auth/oidc/providers/99999",
  734. json={"name": "ghost"},
  735. headers=_auth_header(token),
  736. )
  737. assert response.status_code == 404
  738. # ===========================================================================
  739. # Security: pre-auth token single-use
  740. # ===========================================================================
  741. class TestPreAuthTokenSingleUse:
  742. """pre_auth_token must be consumed on successful 2FA and cannot be reused."""
  743. @pytest.mark.asyncio
  744. @pytest.mark.integration
  745. async def test_pre_auth_token_is_single_use(self, async_client: AsyncClient):
  746. """A pre_auth_token that was successfully used cannot be reused."""
  747. token = await _setup_and_login(async_client, "singleusepat", "singleusepat1")
  748. setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
  749. secret = setup_resp.json()["secret"]
  750. valid_code = pyotp.TOTP(secret).now()
  751. await async_client.post(
  752. "/api/v1/auth/2fa/totp/enable",
  753. json={"code": valid_code},
  754. headers=_auth_header(token),
  755. )
  756. pre_auth_token = await _login_get_pre_auth_token(async_client, "singleusepat", "singleusepat1")
  757. # First use — succeeds
  758. first = await async_client.post(
  759. "/api/v1/auth/2fa/verify",
  760. json={"pre_auth_token": pre_auth_token, "method": "totp", "code": pyotp.TOTP(secret).now()},
  761. )
  762. assert first.status_code == 200
  763. # Second use of the same token — must fail (token already consumed on success)
  764. second = await async_client.post(
  765. "/api/v1/auth/2fa/verify",
  766. json={"pre_auth_token": pre_auth_token, "method": "totp", "code": pyotp.TOTP(secret).now()},
  767. )
  768. assert second.status_code == 401
  769. @pytest.mark.asyncio
  770. @pytest.mark.integration
  771. async def test_pre_auth_token_survives_wrong_code(self, async_client: AsyncClient):
  772. """A wrong 2FA code must NOT burn the pre_auth_token (user can retry)."""
  773. token = await _setup_and_login(async_client, "survivepatuser", "survivepatuser1")
  774. setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
  775. secret = setup_resp.json()["secret"]
  776. valid_code = pyotp.TOTP(secret).now()
  777. await async_client.post(
  778. "/api/v1/auth/2fa/totp/enable",
  779. json={"code": valid_code},
  780. headers=_auth_header(token),
  781. )
  782. pre_auth_token = await _login_get_pre_auth_token(async_client, "survivepatuser", "survivepatuser1")
  783. # Wrong code — should fail but not burn the token
  784. bad = await async_client.post(
  785. "/api/v1/auth/2fa/verify",
  786. json={"pre_auth_token": pre_auth_token, "method": "totp", "code": "000000"},
  787. )
  788. assert bad.status_code == 401
  789. # Same token, correct code — should succeed (token still valid)
  790. good = await async_client.post(
  791. "/api/v1/auth/2fa/verify",
  792. json={"pre_auth_token": pre_auth_token, "method": "totp", "code": pyotp.TOTP(secret).now()},
  793. )
  794. assert good.status_code == 200
  795. # ===========================================================================
  796. # Security: cross-user token isolation
  797. # ===========================================================================
  798. class TestCrossUserTokenIsolation:
  799. """A pre_auth_token issued for user A cannot authenticate as user B."""
  800. @pytest.mark.asyncio
  801. @pytest.mark.integration
  802. async def test_token_cannot_be_used_for_different_user(self, async_client: AsyncClient):
  803. """pre_auth_token is bound to the issuing user; using it to verify a different
  804. user's TOTP code must fail."""
  805. # Set up two users with TOTP
  806. token_a = await _setup_and_login(async_client, "crossusera", "crossusera1")
  807. setup_a = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token_a))
  808. secret_a = setup_a.json()["secret"]
  809. await async_client.post(
  810. "/api/v1/auth/2fa/totp/enable",
  811. json={"code": pyotp.TOTP(secret_a).now()},
  812. headers=_auth_header(token_a),
  813. )
  814. # Get pre_auth_token for user A
  815. pre_auth_a = await _login_get_pre_auth_token(async_client, "crossusera", "crossusera1")
  816. # Try to use user A's token but supply a clearly invalid code — must fail
  817. resp = await async_client.post(
  818. "/api/v1/auth/2fa/verify",
  819. json={"pre_auth_token": pre_auth_a, "method": "totp", "code": "000000"},
  820. )
  821. assert resp.status_code == 401
  822. # ===========================================================================
  823. # Security: admin disable non-admin rejection
  824. # ===========================================================================
  825. class TestAdminDisableNonAdminRejection:
  826. """Non-admin users must be rejected from the admin disable endpoint."""
  827. @pytest.mark.asyncio
  828. @pytest.mark.integration
  829. async def test_non_admin_cannot_disable_2fa(self, async_client: AsyncClient):
  830. """A regular (non-admin) user must receive 403 from DELETE /auth/2fa/admin/{id}."""
  831. # Set up admin, then create a regular user
  832. admin_token = await _setup_and_login(async_client, "adminusr2fa", "adminusr2fa1")
  833. # Create a regular user via user management
  834. create_resp = await async_client.post(
  835. "/api/v1/users",
  836. json={"username": "regularusr2fa", "password": "Regularusr2fa1!"},
  837. headers=_auth_header(admin_token),
  838. )
  839. assert create_resp.status_code == 201
  840. # Login as regular user
  841. login_resp = await async_client.post(
  842. LOGIN_URL,
  843. json={"username": "regularusr2fa", "password": "Regularusr2fa1!"},
  844. )
  845. regular_token = login_resp.json()["access_token"]
  846. # Try to call admin endpoint with the regular user's token
  847. resp = await async_client.delete(
  848. f"/api/v1/auth/2fa/admin/{create_resp.json()['id']}",
  849. headers=_auth_header(regular_token),
  850. )
  851. assert resp.status_code == 403
  852. # ===========================================================================
  853. # Regenerate backup codes
  854. # ===========================================================================
  855. class TestRegenerateBackupCodes:
  856. """Tests for POST /api/v1/auth/2fa/totp/regenerate-backup-codes."""
  857. @pytest.mark.asyncio
  858. @pytest.mark.integration
  859. async def test_regenerate_requires_totp_enabled(self, async_client: AsyncClient):
  860. token = await _setup_and_login(async_client, "regennototp", "regennototp1")
  861. resp = await async_client.post(
  862. "/api/v1/auth/2fa/totp/regenerate-backup-codes",
  863. json={"code": "123456"},
  864. headers=_auth_header(token),
  865. )
  866. assert resp.status_code == 400
  867. @pytest.mark.asyncio
  868. @pytest.mark.integration
  869. async def test_regenerate_invalidates_old_codes(self, async_client: AsyncClient):
  870. """After regenerating, old backup codes must no longer work."""
  871. token = await _setup_and_login(async_client, "regeninval", "regeninval1")
  872. setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
  873. secret = setup_resp.json()["secret"]
  874. enable_resp = await async_client.post(
  875. "/api/v1/auth/2fa/totp/enable",
  876. json={"code": pyotp.TOTP(secret).now()},
  877. headers=_auth_header(token),
  878. )
  879. old_backup = enable_resp.json()["backup_codes"][0]
  880. # Regenerate backup codes
  881. regen_resp = await async_client.post(
  882. "/api/v1/auth/2fa/totp/regenerate-backup-codes",
  883. json={"code": pyotp.TOTP(secret).now()},
  884. headers=_auth_header(token),
  885. )
  886. assert regen_resp.status_code == 200
  887. new_codes = regen_resp.json()["backup_codes"]
  888. assert len(new_codes) == 10
  889. assert old_backup not in new_codes
  890. # Old backup code must now fail
  891. pre_auth_token = await _login_get_pre_auth_token(async_client, "regeninval", "regeninval1")
  892. fail_resp = await async_client.post(
  893. "/api/v1/auth/2fa/verify",
  894. json={"pre_auth_token": pre_auth_token, "method": "backup", "code": old_backup},
  895. )
  896. assert fail_resp.status_code == 401
  897. @pytest.mark.asyncio
  898. @pytest.mark.integration
  899. async def test_regenerate_with_invalid_code_fails(self, async_client: AsyncClient):
  900. token = await _setup_and_login(async_client, "regeninvcode", "regeninvcode1")
  901. setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
  902. secret = setup_resp.json()["secret"]
  903. await async_client.post(
  904. "/api/v1/auth/2fa/totp/enable",
  905. json={"code": pyotp.TOTP(secret).now()},
  906. headers=_auth_header(token),
  907. )
  908. resp = await async_client.post(
  909. "/api/v1/auth/2fa/totp/regenerate-backup-codes",
  910. json={"code": "000000"},
  911. headers=_auth_header(token),
  912. )
  913. assert resp.status_code == 400
  914. # ===========================================================================
  915. # Security: method field validation
  916. # ===========================================================================
  917. class TestVerifyMethodValidation:
  918. """The method field must be one of totp/email/backup (Pydantic Literal)."""
  919. @pytest.mark.asyncio
  920. @pytest.mark.integration
  921. async def test_invalid_method_rejected_by_schema(self, async_client: AsyncClient):
  922. """Pydantic should reject unknown method values with 422."""
  923. resp = await async_client.post(
  924. "/api/v1/auth/2fa/verify",
  925. json={"pre_auth_token": "anytoken", "code": "123456", "method": "sms"},
  926. )
  927. assert resp.status_code == 422
  928. @pytest.mark.asyncio
  929. @pytest.mark.integration
  930. async def test_oversized_pre_auth_token_rejected(self, async_client: AsyncClient):
  931. """pre_auth_token exceeding max_length=128 should be rejected with 422."""
  932. resp = await async_client.post(
  933. "/api/v1/auth/2fa/verify",
  934. json={"pre_auth_token": "x" * 200, "code": "123456", "method": "totp"},
  935. )
  936. assert resp.status_code == 422
  937. # ===========================================================================
  938. # Login response shape for 2FA users
  939. # ===========================================================================
  940. class TestLoginResponseShape:
  941. """Login for a 2FA-enabled user must return requires_2fa+pre_auth_token
  942. and must NOT include access_token (which would bypass the 2FA gate)."""
  943. @pytest.mark.asyncio
  944. @pytest.mark.integration
  945. async def test_login_2fa_user_omits_access_token(self, async_client: AsyncClient):
  946. """A user with TOTP enabled must not receive an access_token on /auth/login."""
  947. token = await _setup_and_login(async_client, "loginshape", "loginshape1")
  948. setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
  949. secret = setup_resp.json()["secret"]
  950. await async_client.post(
  951. "/api/v1/auth/2fa/totp/enable",
  952. json={"code": pyotp.TOTP(secret).now()},
  953. headers=_auth_header(token),
  954. )
  955. login_resp = await async_client.post(LOGIN_URL, json={"username": "loginshape", "password": "Loginshape1!"})
  956. assert login_resp.status_code == 200
  957. data = login_resp.json()
  958. assert data.get("requires_2fa") is True
  959. assert data.get("pre_auth_token") is not None
  960. # access_token must NOT be present — it would bypass the 2FA gate
  961. assert "access_token" not in data or data["access_token"] is None
  962. # ===========================================================================
  963. # TOTP replay protection
  964. # ===========================================================================
  965. async def _setup_totp_user(client: AsyncClient, username: str, password: str) -> tuple[str, str]:
  966. """Create user, set up and enable TOTP; return (bearer_token, totp_secret)."""
  967. token = await _setup_and_login(client, username, password)
  968. setup_resp = await client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
  969. secret = setup_resp.json()["secret"]
  970. await client.post(
  971. "/api/v1/auth/2fa/totp/enable",
  972. json={"code": pyotp.TOTP(secret).now()},
  973. headers=_auth_header(token),
  974. )
  975. return token, secret
  976. class TestTOTPReplay:
  977. """The same TOTP code must not be accepted twice within one 30-second window."""
  978. @pytest.mark.asyncio
  979. @pytest.mark.integration
  980. async def test_totp_replay_rejected_on_verify(self, async_client: AsyncClient):
  981. """Replaying the same code on /2fa/verify must return 400."""
  982. _token, secret = await _setup_totp_user(async_client, "replayverify", "replayverify1")
  983. code = pyotp.TOTP(secret).now()
  984. pre_auth = await _login_get_pre_auth_token(async_client, "replayverify", "replayverify1")
  985. first = await async_client.post(
  986. "/api/v1/auth/2fa/verify",
  987. json={"pre_auth_token": pre_auth, "method": "totp", "code": code},
  988. )
  989. assert first.status_code == 200
  990. # Second login to get a fresh pre_auth_token (first was consumed)
  991. pre_auth2 = await _login_get_pre_auth_token(async_client, "replayverify", "replayverify1")
  992. second = await async_client.post(
  993. "/api/v1/auth/2fa/verify",
  994. json={"pre_auth_token": pre_auth2, "method": "totp", "code": code},
  995. )
  996. assert second.status_code == 400
  997. @pytest.mark.asyncio
  998. @pytest.mark.integration
  999. async def test_totp_replay_rejected_on_disable(self, async_client: AsyncClient):
  1000. """A code already used in verify_2fa must be rejected on /2fa/totp/disable."""
  1001. _setup_token, secret = await _setup_totp_user(async_client, "replaydisable", "replaydisable1")
  1002. code = pyotp.TOTP(secret).now()
  1003. # Use the code in verify_2fa — this sets last_totp_counter in DB
  1004. pre_auth = await _login_get_pre_auth_token(async_client, "replaydisable", "replaydisable1")
  1005. verify_resp = await async_client.post(
  1006. "/api/v1/auth/2fa/verify",
  1007. json={"pre_auth_token": pre_auth, "method": "totp", "code": code},
  1008. )
  1009. assert verify_resp.status_code == 200
  1010. authed_token = verify_resp.json()["access_token"]
  1011. # Replay the same code on disable — must be rejected (same 30-second window)
  1012. disable_resp = await async_client.post(
  1013. "/api/v1/auth/2fa/totp/disable",
  1014. json={"code": code},
  1015. headers=_auth_header(authed_token),
  1016. )
  1017. assert disable_resp.status_code == 400
  1018. # ===========================================================================
  1019. # Rate limiting on disable_totp and regenerate_backup_codes (I10)
  1020. # ===========================================================================
  1021. class TestRateLimitingDisableRegenerate:
  1022. """disable_totp and regenerate_backup_codes must enforce rate limiting (I10)."""
  1023. @pytest.mark.asyncio
  1024. @pytest.mark.integration
  1025. async def test_disable_totp_rate_limited_after_failures(self, async_client: AsyncClient):
  1026. """Repeated wrong codes on /2fa/totp/disable trigger 429."""
  1027. token, _secret = await _setup_totp_user(async_client, "rldisable", "rldisable1")
  1028. for _ in range(5):
  1029. await async_client.post(
  1030. "/api/v1/auth/2fa/totp/disable",
  1031. json={"code": "000000"},
  1032. headers=_auth_header(token),
  1033. )
  1034. resp = await async_client.post(
  1035. "/api/v1/auth/2fa/totp/disable",
  1036. json={"code": "000000"},
  1037. headers=_auth_header(token),
  1038. )
  1039. assert resp.status_code == 429
  1040. @pytest.mark.asyncio
  1041. @pytest.mark.integration
  1042. async def test_regenerate_backup_codes_rate_limited_after_failures(self, async_client: AsyncClient):
  1043. """Repeated wrong codes on /2fa/totp/regenerate-backup-codes trigger 429."""
  1044. token, _secret = await _setup_totp_user(async_client, "rlregen", "rlregen1")
  1045. for _ in range(5):
  1046. await async_client.post(
  1047. "/api/v1/auth/2fa/totp/regenerate-backup-codes",
  1048. json={"code": "000000"},
  1049. headers=_auth_header(token),
  1050. )
  1051. resp = await async_client.post(
  1052. "/api/v1/auth/2fa/totp/regenerate-backup-codes",
  1053. json={"code": "000000"},
  1054. headers=_auth_header(token),
  1055. )
  1056. assert resp.status_code == 429
  1057. # ===========================================================================
  1058. # Email OTP send → verify end-to-end (coverage gap C3)
  1059. # ===========================================================================
  1060. class TestEmailOTPSendVerify:
  1061. """Full email OTP login: send code → verify code → JWT."""
  1062. @pytest.mark.asyncio
  1063. @pytest.mark.integration
  1064. async def test_email_otp_send_and_verify(self, async_client: AsyncClient, db_session: AsyncSession):
  1065. """login → POST /2fa/email/send (patched SMTP) → POST /2fa/verify → JWT."""
  1066. import re
  1067. from unittest.mock import AsyncMock, MagicMock, patch
  1068. from sqlalchemy import select as sa_select
  1069. token = await _setup_and_login(async_client, "emailsendok", "emailsendok1")
  1070. # Give the user an email address
  1071. result = await db_session.execute(sa_select(User).where(User.username == "emailsendok"))
  1072. user = result.scalar_one()
  1073. user.email = "emailsendok@example.com"
  1074. await db_session.commit()
  1075. # Enable email OTP via DB injection
  1076. setup_code = "444444"
  1077. setup_token = secrets.token_urlsafe(32)
  1078. db_session.add(
  1079. AuthEphemeralToken(
  1080. token=setup_token,
  1081. token_type="email_otp_setup",
  1082. username="emailsendok",
  1083. nonce=_pwd_context.hash(setup_code),
  1084. expires_at=datetime.now(timezone.utc) + timedelta(minutes=10),
  1085. )
  1086. )
  1087. await db_session.commit()
  1088. await async_client.post(
  1089. "/api/v1/auth/2fa/email/enable/confirm",
  1090. json={"setup_token": setup_token, "code": setup_code},
  1091. headers=_auth_header(token),
  1092. )
  1093. # Login now requires 2FA — get pre_auth_token (cookie set automatically)
  1094. pre_auth_token = await _login_get_pre_auth_token(async_client, "emailsendok", "emailsendok1")
  1095. # Mock SMTP and capture the sent OTP code
  1096. captured: dict[str, str] = {}
  1097. smtp_settings_mock = MagicMock()
  1098. def _capture_email(smtp_settings, to_email, subject, body_text, body_html):
  1099. m = re.search(r"login code is: (\d{6})", body_text)
  1100. if m:
  1101. captured["otp"] = m.group(1)
  1102. with (
  1103. patch("backend.app.api.routes.mfa.get_smtp_settings", new=AsyncMock(return_value=smtp_settings_mock)),
  1104. patch("backend.app.api.routes.mfa.send_email", side_effect=_capture_email),
  1105. ):
  1106. send_resp = await async_client.post(
  1107. "/api/v1/auth/2fa/email/send",
  1108. json={"pre_auth_token": pre_auth_token},
  1109. )
  1110. assert send_resp.status_code == 200, send_resp.text
  1111. fresh_token = send_resp.json()["pre_auth_token"]
  1112. assert "otp" in captured, "send_email was not called or code not found in body"
  1113. # Verify with the captured OTP code — cookie still in the async_client jar
  1114. verify_resp = await async_client.post(
  1115. "/api/v1/auth/2fa/verify",
  1116. json={"pre_auth_token": fresh_token, "method": "email", "code": captured["otp"]},
  1117. )
  1118. assert verify_resp.status_code == 200
  1119. data = verify_resp.json()
  1120. assert "access_token" in data
  1121. assert data["user"]["username"] == "emailsendok"
  1122. @pytest.mark.asyncio
  1123. @pytest.mark.integration
  1124. async def test_email_otp_wrong_code_rejected(self, async_client: AsyncClient, db_session: AsyncSession):
  1125. """A wrong email OTP code must return 401 without burning the pre_auth_token."""
  1126. from unittest.mock import AsyncMock, MagicMock, patch
  1127. from sqlalchemy import select as sa_select
  1128. token = await _setup_and_login(async_client, "emailwrongcode", "emailwrongcode1")
  1129. result = await db_session.execute(sa_select(User).where(User.username == "emailwrongcode"))
  1130. user = result.scalar_one()
  1131. user.email = "emailwrongcode@example.com"
  1132. await db_session.commit()
  1133. setup_code = "555555"
  1134. setup_token = secrets.token_urlsafe(32)
  1135. db_session.add(
  1136. AuthEphemeralToken(
  1137. token=setup_token,
  1138. token_type="email_otp_setup",
  1139. username="emailwrongcode",
  1140. nonce=_pwd_context.hash(setup_code),
  1141. expires_at=datetime.now(timezone.utc) + timedelta(minutes=10),
  1142. )
  1143. )
  1144. await db_session.commit()
  1145. await async_client.post(
  1146. "/api/v1/auth/2fa/email/enable/confirm",
  1147. json={"setup_token": setup_token, "code": setup_code},
  1148. headers=_auth_header(token),
  1149. )
  1150. pre_auth_token = await _login_get_pre_auth_token(async_client, "emailwrongcode", "emailwrongcode1")
  1151. captured: dict[str, str] = {}
  1152. smtp_mock = MagicMock()
  1153. def _capture(smtp_settings, to_email, subject, body_text, body_html):
  1154. import re
  1155. m = re.search(r"login code is: (\d{6})", body_text)
  1156. if m:
  1157. captured["otp"] = m.group(1)
  1158. with (
  1159. patch("backend.app.api.routes.mfa.get_smtp_settings", new=AsyncMock(return_value=smtp_mock)),
  1160. patch("backend.app.api.routes.mfa.send_email", side_effect=_capture),
  1161. ):
  1162. send_resp = await async_client.post(
  1163. "/api/v1/auth/2fa/email/send",
  1164. json={"pre_auth_token": pre_auth_token},
  1165. )
  1166. assert send_resp.status_code == 200
  1167. fresh_token = send_resp.json()["pre_auth_token"]
  1168. # Wrong code → 401
  1169. bad = await async_client.post(
  1170. "/api/v1/auth/2fa/verify",
  1171. json={"pre_auth_token": fresh_token, "method": "email", "code": "000000"},
  1172. )
  1173. assert bad.status_code == 401
  1174. # Correct code still works (token not burned by wrong attempt)
  1175. good = await async_client.post(
  1176. "/api/v1/auth/2fa/verify",
  1177. json={"pre_auth_token": fresh_token, "method": "email", "code": captured["otp"]},
  1178. )
  1179. assert good.status_code == 200
  1180. # ===========================================================================
  1181. # OIDC end-to-end (coverage gap C4)
  1182. # ===========================================================================
  1183. def _make_test_rsa_key():
  1184. """Generate a throwaway RSA key pair and a matching JWK set for tests."""
  1185. import base64
  1186. from cryptography.hazmat.primitives import serialization
  1187. from cryptography.hazmat.primitives.asymmetric import rsa
  1188. private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
  1189. private_pem = private_key.private_bytes(
  1190. serialization.Encoding.PEM,
  1191. serialization.PrivateFormat.TraditionalOpenSSL,
  1192. serialization.NoEncryption(),
  1193. )
  1194. pub_numbers = private_key.public_key().public_numbers()
  1195. def _b64url(n: int, length: int) -> str:
  1196. return base64.urlsafe_b64encode(n.to_bytes(length, "big")).rstrip(b"=").decode()
  1197. jwks = {
  1198. "keys": [
  1199. {
  1200. "kty": "RSA",
  1201. "use": "sig",
  1202. "alg": "RS256",
  1203. "kid": "test-kid-1",
  1204. "n": _b64url(pub_numbers.n, 256),
  1205. "e": _b64url(pub_numbers.e, 3),
  1206. }
  1207. ]
  1208. }
  1209. return private_pem, jwks
  1210. class TestOIDCEndToEnd:
  1211. """Full OIDC auth-code flow: state → callback (mocked IdP) → exchange → JWT."""
  1212. @pytest.mark.asyncio
  1213. @pytest.mark.integration
  1214. async def test_oidc_callback_creates_user_and_issues_jwt(self, async_client: AsyncClient, db_session: AsyncSession):
  1215. """callback validates the mocked ID token, creates a user, and redirects
  1216. with an oidc_exchange token; exchanging that token returns a full JWT."""
  1217. import time
  1218. from unittest.mock import patch
  1219. import jwt as pyjwt
  1220. private_pem, jwks_data = _make_test_rsa_key()
  1221. issuer = "https://idp.test.example.com"
  1222. client_id = "oidc-test-client"
  1223. nonce = secrets.token_urlsafe(16)
  1224. now = int(time.time())
  1225. id_token = pyjwt.encode(
  1226. {
  1227. "sub": "oidc-sub-e2e",
  1228. "iss": issuer,
  1229. "aud": client_id,
  1230. "nonce": nonce,
  1231. "email": "oidce2e@example.com",
  1232. "email_verified": True,
  1233. "iat": now,
  1234. "exp": now + 300,
  1235. },
  1236. private_pem,
  1237. algorithm="RS256",
  1238. headers={"kid": "test-kid-1"},
  1239. )
  1240. # Create OIDC provider
  1241. admin_token = await _setup_and_login(async_client, "oidce2eadm", "oidce2eadm1")
  1242. create_resp = await async_client.post(
  1243. "/api/v1/auth/oidc/providers",
  1244. json={
  1245. "name": "E2E-IdP",
  1246. "issuer_url": issuer,
  1247. "client_id": client_id,
  1248. "client_secret": "test-secret",
  1249. "scopes": "openid email profile",
  1250. "is_enabled": True,
  1251. "auto_create_users": True,
  1252. },
  1253. headers=_auth_header(admin_token),
  1254. )
  1255. assert create_resp.status_code == 201
  1256. provider_id = create_resp.json()["id"]
  1257. # Simulate the authorize step: insert an oidc_state token directly
  1258. state = secrets.token_urlsafe(32)
  1259. code_verifier = secrets.token_urlsafe(48)
  1260. db_session.add(
  1261. AuthEphemeralToken(
  1262. token=state,
  1263. token_type="oidc_state",
  1264. provider_id=provider_id,
  1265. nonce=nonce,
  1266. code_verifier=code_verifier,
  1267. expires_at=datetime.now(timezone.utc) + timedelta(minutes=5),
  1268. )
  1269. )
  1270. await db_session.commit()
  1271. # Mock httpx calls made inside oidc_callback
  1272. discovery_doc = {
  1273. "issuer": issuer,
  1274. "authorization_endpoint": f"{issuer}/auth",
  1275. "token_endpoint": f"{issuer}/token",
  1276. "jwks_uri": f"{issuer}/.well-known/jwks.json",
  1277. }
  1278. token_response = {
  1279. "access_token": "mock-access",
  1280. "token_type": "Bearer",
  1281. "id_token": id_token,
  1282. }
  1283. class _MockResp:
  1284. def __init__(self, data):
  1285. self._data = data
  1286. self.status_code = 200
  1287. self.is_success = True
  1288. self.text = str(data)
  1289. def json(self):
  1290. return self._data
  1291. def raise_for_status(self):
  1292. pass
  1293. class _MockHttpxClient:
  1294. def __init__(self, *args, **kwargs):
  1295. pass
  1296. async def __aenter__(self):
  1297. return self
  1298. async def __aexit__(self, *args):
  1299. pass
  1300. async def get(self, url, **kwargs):
  1301. if "jwks" in url:
  1302. return _MockResp(jwks_data)
  1303. return _MockResp(discovery_doc)
  1304. async def post(self, url, **kwargs):
  1305. return _MockResp(token_response)
  1306. with patch("backend.app.api.routes.mfa.httpx.AsyncClient", _MockHttpxClient):
  1307. callback_resp = await async_client.get(
  1308. f"/api/v1/auth/oidc/callback?code=test-auth-code&state={state}",
  1309. follow_redirects=False,
  1310. )
  1311. assert callback_resp.status_code == 302, callback_resp.text
  1312. location = callback_resp.headers.get("location", "")
  1313. assert "oidc_token=" in location, f"Expected oidc_token in redirect, got: {location}"
  1314. # Extract and exchange the oidc_exchange token
  1315. oidc_exchange_token = location.split("oidc_token=")[1].split("&")[0]
  1316. exchange_resp = await async_client.post(
  1317. "/api/v1/auth/oidc/exchange",
  1318. json={"oidc_token": oidc_exchange_token},
  1319. )
  1320. assert exchange_resp.status_code == 200
  1321. data = exchange_resp.json()
  1322. assert "access_token" in data
  1323. assert data["user"]["username"] is not None
  1324. @pytest.mark.asyncio
  1325. @pytest.mark.integration
  1326. async def test_oidc_callback_invalid_state_redirects_error(self, async_client: AsyncClient):
  1327. """An unknown state token must redirect to /?oidc_error=invalid_state."""
  1328. resp = await async_client.get(
  1329. "/api/v1/auth/oidc/callback?code=x&state=totally-bogus-state",
  1330. follow_redirects=False,
  1331. )
  1332. assert resp.status_code == 302
  1333. assert "invalid_state" in resp.headers.get("location", "")
  1334. @pytest.mark.asyncio
  1335. @pytest.mark.integration
  1336. async def test_oidc_state_is_single_use(self, async_client: AsyncClient, db_session: AsyncSession):
  1337. """Replaying the same state token must fail on the second callback."""
  1338. import time
  1339. from unittest.mock import patch
  1340. import jwt as pyjwt
  1341. private_pem, jwks_data = _make_test_rsa_key()
  1342. issuer = "https://idp2.test.example.com"
  1343. client_id = "oidc-client-2"
  1344. nonce = secrets.token_urlsafe(16)
  1345. now = int(time.time())
  1346. id_token = pyjwt.encode(
  1347. {
  1348. "sub": "sub-single-use",
  1349. "iss": issuer,
  1350. "aud": client_id,
  1351. "nonce": nonce,
  1352. "email": "su@example.com",
  1353. "email_verified": True,
  1354. "iat": now,
  1355. "exp": now + 300,
  1356. },
  1357. private_pem,
  1358. algorithm="RS256",
  1359. headers={"kid": "test-kid-1"},
  1360. )
  1361. admin_token = await _setup_and_login(async_client, "oidcsuadm", "oidcsuadm1")
  1362. cr = await async_client.post(
  1363. "/api/v1/auth/oidc/providers",
  1364. json={
  1365. "name": "SU-IdP",
  1366. "issuer_url": issuer,
  1367. "client_id": client_id,
  1368. "client_secret": "s",
  1369. "scopes": "openid",
  1370. "is_enabled": True,
  1371. "auto_create_users": True,
  1372. },
  1373. headers=_auth_header(admin_token),
  1374. )
  1375. provider_id = cr.json()["id"]
  1376. state = secrets.token_urlsafe(32)
  1377. db_session.add(
  1378. AuthEphemeralToken(
  1379. token=state,
  1380. token_type="oidc_state",
  1381. provider_id=provider_id,
  1382. nonce=nonce,
  1383. code_verifier=secrets.token_urlsafe(48),
  1384. expires_at=datetime.now(timezone.utc) + timedelta(minutes=5),
  1385. )
  1386. )
  1387. await db_session.commit()
  1388. discovery_doc = {
  1389. "issuer": issuer,
  1390. "authorization_endpoint": f"{issuer}/auth",
  1391. "token_endpoint": f"{issuer}/token",
  1392. "jwks_uri": f"{issuer}/.well-known/jwks.json",
  1393. }
  1394. token_response = {"access_token": "a", "token_type": "Bearer", "id_token": id_token}
  1395. class _MockResp:
  1396. def __init__(self, data):
  1397. self._data = data
  1398. self.status_code = 200
  1399. self.is_success = True
  1400. self.text = str(data)
  1401. def json(self):
  1402. return self._data
  1403. def raise_for_status(self):
  1404. pass
  1405. class _MockHttpxClient:
  1406. def __init__(self, *a, **kw):
  1407. pass
  1408. async def __aenter__(self):
  1409. return self
  1410. async def __aexit__(self, *a):
  1411. pass
  1412. async def get(self, url, **kw):
  1413. return _MockResp(jwks_data if "jwks" in url else discovery_doc)
  1414. async def post(self, url, **kw):
  1415. return _MockResp(token_response)
  1416. with patch("backend.app.api.routes.mfa.httpx.AsyncClient", _MockHttpxClient):
  1417. first = await async_client.get(
  1418. f"/api/v1/auth/oidc/callback?code=c&state={state}",
  1419. follow_redirects=False,
  1420. )
  1421. assert first.status_code == 302
  1422. assert "oidc_token=" in first.headers.get("location", "")
  1423. # Replay: second callback with the same state must fail
  1424. second = await async_client.get(
  1425. f"/api/v1/auth/oidc/callback?code=c&state={state}",
  1426. follow_redirects=False,
  1427. )
  1428. assert second.status_code == 302
  1429. assert "invalid_state" in second.headers.get("location", "")
  1430. # ===========================================================================
  1431. # H-2: Wrong code must NOT consume the email OTP setup token (peek-then-consume)
  1432. # ===========================================================================
  1433. class TestEmailOTPSetupTokenPreservedOnWrongCode:
  1434. """After H-2 fix: a wrong code leaves the setup token intact so the user can retry."""
  1435. @pytest.mark.asyncio
  1436. @pytest.mark.integration
  1437. async def test_wrong_code_does_not_consume_setup_token(self, async_client: AsyncClient, db_session: AsyncSession):
  1438. """Wrong code returns 400 but the setup token survives; correct code then works."""
  1439. token = await _setup_and_login(async_client, "h2retryuser", "h2retrypass1")
  1440. code = "999999"
  1441. code_hash = _pwd_context.hash(code)
  1442. setup_token = secrets.token_urlsafe(32)
  1443. db_session.add(
  1444. AuthEphemeralToken(
  1445. token=setup_token,
  1446. token_type="email_otp_setup",
  1447. username="h2retryuser",
  1448. nonce=code_hash,
  1449. expires_at=datetime.now(timezone.utc) + timedelta(minutes=10),
  1450. )
  1451. )
  1452. await db_session.commit()
  1453. # First attempt: wrong code → 400
  1454. wrong = await async_client.post(
  1455. "/api/v1/auth/2fa/email/enable/confirm",
  1456. json={"setup_token": setup_token, "code": "000000"},
  1457. headers=_auth_header(token),
  1458. )
  1459. assert wrong.status_code == 400
  1460. # Second attempt: correct code → must succeed (token was NOT consumed)
  1461. correct = await async_client.post(
  1462. "/api/v1/auth/2fa/email/enable/confirm",
  1463. json={"setup_token": setup_token, "code": code},
  1464. headers=_auth_header(token),
  1465. )
  1466. assert correct.status_code == 200
  1467. # ===========================================================================
  1468. # M-2: New OIDC provider must default to auto_link_existing_accounts=False
  1469. # ===========================================================================
  1470. class TestOIDCProviderAutoLinkDefault:
  1471. """auto_link_existing_accounts must default to False (M-2 fix)."""
  1472. @pytest.mark.asyncio
  1473. @pytest.mark.integration
  1474. async def test_new_provider_auto_link_defaults_to_false(self, async_client: AsyncClient):
  1475. token = await _setup_and_login(async_client, "m2autolinkadmin", "m2autolinkadmin1")
  1476. resp = await async_client.post(
  1477. "/api/v1/auth/oidc/providers",
  1478. json={
  1479. "name": "AutoLinkTest",
  1480. "issuer_url": "https://autolink.example.com",
  1481. "client_id": "alc",
  1482. "client_secret": "als",
  1483. "scopes": "openid",
  1484. "is_enabled": True,
  1485. "auto_create_users": False,
  1486. # auto_link_existing_accounts intentionally omitted
  1487. },
  1488. headers=_auth_header(token),
  1489. )
  1490. assert resp.status_code == 201
  1491. assert resp.json()["auto_link_existing_accounts"] is False
  1492. # ===========================================================================
  1493. # L-5: 2FA verify code format validation
  1494. # ===========================================================================
  1495. class TestTwoFAVerifyCodeFormat:
  1496. """TwoFAVerifyRequest.code must be 6–8 alphanumeric characters (L-5)."""
  1497. @pytest.mark.asyncio
  1498. @pytest.mark.integration
  1499. async def test_code_too_long_rejected(self, async_client: AsyncClient):
  1500. """code > 8 characters must be rejected with 422."""
  1501. resp = await async_client.post(
  1502. "/api/v1/auth/2fa/verify",
  1503. json={"pre_auth_token": "anytoken", "code": "1" * 9, "method": "totp"},
  1504. )
  1505. assert resp.status_code == 422
  1506. @pytest.mark.asyncio
  1507. @pytest.mark.integration
  1508. async def test_code_non_alphanumeric_rejected(self, async_client: AsyncClient):
  1509. """code containing non-alphanumeric chars must be rejected with 422."""
  1510. resp = await async_client.post(
  1511. "/api/v1/auth/2fa/verify",
  1512. json={"pre_auth_token": "anytoken", "code": "12-456", "method": "totp"},
  1513. )
  1514. assert resp.status_code == 422
  1515. @pytest.mark.asyncio
  1516. @pytest.mark.integration
  1517. async def test_code_too_short_rejected(self, async_client: AsyncClient):
  1518. """code < 6 characters must be rejected with 422."""
  1519. resp = await async_client.post(
  1520. "/api/v1/auth/2fa/verify",
  1521. json={"pre_auth_token": "anytoken", "code": "12345", "method": "totp"},
  1522. )
  1523. assert resp.status_code == 422
  1524. @pytest.mark.asyncio
  1525. @pytest.mark.integration
  1526. async def test_code_exactly_6_passes_schema(self, async_client: AsyncClient):
  1527. """6-character alphanumeric code passes schema (may fail 2FA logic with 400)."""
  1528. resp = await async_client.post(
  1529. "/api/v1/auth/2fa/verify",
  1530. json={"pre_auth_token": "x" * 32, "code": "123456", "method": "totp"},
  1531. )
  1532. assert resp.status_code != 422
  1533. @pytest.mark.asyncio
  1534. @pytest.mark.integration
  1535. async def test_code_exactly_8_passes_schema(self, async_client: AsyncClient):
  1536. """8-character alphanumeric backup code passes schema."""
  1537. resp = await async_client.post(
  1538. "/api/v1/auth/2fa/verify",
  1539. json={"pre_auth_token": "x" * 32, "code": "ABCD1234", "method": "backup"},
  1540. )
  1541. assert resp.status_code != 422
  1542. # ===========================================================================
  1543. # M-NEW-1: verify_slicer_download_token must NOT consume token on wrong resource
  1544. # ===========================================================================
  1545. class TestSlicerTokenResourceBinding:
  1546. """Token for resource A must survive a wrong-resource check and still work for A."""
  1547. @pytest.mark.asyncio
  1548. @pytest.mark.integration
  1549. async def test_wrong_resource_does_not_consume_token(self, async_client: AsyncClient, db_session: AsyncSession):
  1550. """A slicer token bound to archive:5 must NOT be consumed when checked against archive:6."""
  1551. from datetime import datetime, timedelta, timezone
  1552. from backend.app.core.auth import verify_slicer_download_token
  1553. from backend.app.models.auth_ephemeral import AuthEphemeralToken
  1554. now = datetime.now(timezone.utc)
  1555. token_val = secrets.token_urlsafe(24)
  1556. db_session.add(
  1557. AuthEphemeralToken(
  1558. token=token_val,
  1559. token_type="slicer_download",
  1560. nonce="archive:5",
  1561. expires_at=now + timedelta(minutes=5),
  1562. )
  1563. )
  1564. await db_session.commit()
  1565. # Wrong resource → must return False and NOT consume the token
  1566. wrong = await verify_slicer_download_token(token_val, "archive", 6)
  1567. assert wrong is False
  1568. # Correct resource → must return True (token survived the wrong-resource check)
  1569. correct = await verify_slicer_download_token(token_val, "archive", 5)
  1570. assert correct is True
  1571. @pytest.mark.asyncio
  1572. @pytest.mark.integration
  1573. async def test_correct_resource_consumes_token(self, async_client: AsyncClient, db_session: AsyncSession):
  1574. """A slicer token is single-use: second correct-resource check must return False."""
  1575. from datetime import datetime, timedelta, timezone
  1576. from backend.app.core.auth import verify_slicer_download_token
  1577. from backend.app.models.auth_ephemeral import AuthEphemeralToken
  1578. now = datetime.now(timezone.utc)
  1579. token_val = secrets.token_urlsafe(24)
  1580. db_session.add(
  1581. AuthEphemeralToken(
  1582. token=token_val,
  1583. token_type="slicer_download",
  1584. nonce="library:99",
  1585. expires_at=now + timedelta(minutes=5),
  1586. )
  1587. )
  1588. await db_session.commit()
  1589. first = await verify_slicer_download_token(token_val, "library", 99)
  1590. assert first is True
  1591. second = await verify_slicer_download_token(token_val, "library", 99)
  1592. assert second is False
  1593. # ===========================================================================
  1594. # M-NEW-3 / L-NEW-1: Schema length validation for change-password & forgot-password
  1595. # ===========================================================================
  1596. class TestSchemaLengthValidationR2:
  1597. """Input length limits added in review round 2."""
  1598. @pytest.mark.asyncio
  1599. @pytest.mark.integration
  1600. async def test_change_password_current_too_long_rejected(self, async_client: AsyncClient):
  1601. """current_password > 256 chars must be rejected with 422 (prevents pbkdf2 DoS)."""
  1602. resp = await async_client.post(
  1603. "/api/v1/users/me/change-password",
  1604. json={"current_password": "x" * 257, "new_password": "ValidPass1!"},
  1605. )
  1606. assert resp.status_code == 422
  1607. @pytest.mark.asyncio
  1608. @pytest.mark.integration
  1609. async def test_forgot_password_email_too_long_rejected(self, async_client: AsyncClient):
  1610. """email > 254 chars must be rejected with 422."""
  1611. resp = await async_client.post(
  1612. "/api/v1/auth/forgot-password",
  1613. json={"email": "a" * 243 + "@example.com"},
  1614. )
  1615. assert resp.status_code == 422
  1616. @pytest.mark.asyncio
  1617. @pytest.mark.integration
  1618. async def test_forgot_password_email_at_limit_passes_schema(self, async_client: AsyncClient):
  1619. """Short email passes schema (may return 400/200 from business logic)."""
  1620. resp = await async_client.post(
  1621. "/api/v1/auth/forgot-password",
  1622. json={"email": "user@example.com"},
  1623. )
  1624. assert resp.status_code != 422
  1625. # ===========================================================================
  1626. # L-NEW-2: TOTPSetupRequest.code max_length
  1627. # ===========================================================================
  1628. class TestTOTPSetupCodeMaxLength:
  1629. """TOTPSetupRequest.code must be bounded (L-NEW-2)."""
  1630. @pytest.mark.asyncio
  1631. @pytest.mark.integration
  1632. async def test_setup_code_too_long_rejected(self, async_client: AsyncClient):
  1633. """code > 8 chars must be rejected with 422."""
  1634. import pyotp as _pyotp
  1635. token = await _setup_and_login(async_client, "totp_setup_maxlen", "totp_setup_maxlen1")
  1636. # Enable TOTP so the setup-code guard path is active
  1637. setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
  1638. secret = setup_resp.json()["secret"]
  1639. await async_client.post(
  1640. "/api/v1/auth/2fa/totp/enable",
  1641. json={"code": _pyotp.TOTP(secret).now()},
  1642. headers=_auth_header(token),
  1643. )
  1644. resp = await async_client.post(
  1645. "/api/v1/auth/2fa/totp/setup",
  1646. json={"code": "1" * 9},
  1647. headers=_auth_header(token),
  1648. )
  1649. assert resp.status_code == 422
  1650. # ===========================================================================
  1651. # L-NEW-3: EmailOTPEnableConfirmRequest.code must be exactly 6 digits
  1652. # ===========================================================================
  1653. class TestEmailOTPConfirmCodeFormat:
  1654. """EmailOTPEnableConfirmRequest.code must be 6 digits (L-NEW-3)."""
  1655. @pytest.mark.asyncio
  1656. @pytest.mark.integration
  1657. async def test_non_digit_code_rejected(self, async_client: AsyncClient):
  1658. """Alpha characters in the email OTP confirm code must be rejected with 422."""
  1659. token = await _setup_and_login(async_client, "emailotpfmt", "emailotpfmt1")
  1660. resp = await async_client.post(
  1661. "/api/v1/auth/2fa/email/enable/confirm",
  1662. json={"setup_token": "x" * 32, "code": "ABCDEF"},
  1663. headers=_auth_header(token),
  1664. )
  1665. assert resp.status_code == 422
  1666. @pytest.mark.asyncio
  1667. @pytest.mark.integration
  1668. async def test_seven_digit_code_rejected(self, async_client: AsyncClient):
  1669. """7-digit code must be rejected with 422 (min_length=max_length=6)."""
  1670. token = await _setup_and_login(async_client, "emailotplen7", "emailotplen7x")
  1671. resp = await async_client.post(
  1672. "/api/v1/auth/2fa/email/enable/confirm",
  1673. json={"setup_token": "x" * 32, "code": "1234567"},
  1674. headers=_auth_header(token),
  1675. )
  1676. assert resp.status_code == 422
  1677. @pytest.mark.asyncio
  1678. @pytest.mark.integration
  1679. async def test_valid_six_digit_code_passes_schema(self, async_client: AsyncClient):
  1680. """6-digit numeric code passes schema (may return 400 on bad token — that's fine)."""
  1681. token = await _setup_and_login(async_client, "emailotpfmt6", "emailotpfmt6x")
  1682. resp = await async_client.post(
  1683. "/api/v1/auth/2fa/email/enable/confirm",
  1684. json={"setup_token": "x" * 32, "code": "123456"},
  1685. headers=_auth_header(token),
  1686. )
  1687. assert resp.status_code != 422
  1688. # ===========================================================================
  1689. # L-NEW-4: OIDCProviderCreate field max_length constraints
  1690. # ===========================================================================
  1691. class TestOIDCProviderFieldLengths:
  1692. """OIDCProviderCreate fields must reject inputs exceeding max_length (L-NEW-4)."""
  1693. @pytest.mark.asyncio
  1694. @pytest.mark.integration
  1695. async def test_name_too_long_rejected(self, async_client: AsyncClient):
  1696. token = await _setup_and_login(async_client, "oidcfldadmin", "oidcfldadmin1")
  1697. resp = await async_client.post(
  1698. "/api/v1/auth/oidc/providers",
  1699. json={
  1700. "name": "n" * 101,
  1701. "issuer_url": "https://test.example.com",
  1702. "client_id": "cid",
  1703. "client_secret": "csec",
  1704. "scopes": "openid",
  1705. },
  1706. headers=_auth_header(token),
  1707. )
  1708. assert resp.status_code == 422
  1709. @pytest.mark.asyncio
  1710. @pytest.mark.integration
  1711. async def test_client_secret_too_long_rejected(self, async_client: AsyncClient):
  1712. token = await _setup_and_login(async_client, "oidcseclen", "oidcseclen123")
  1713. resp = await async_client.post(
  1714. "/api/v1/auth/oidc/providers",
  1715. json={
  1716. "name": "ValidName",
  1717. "issuer_url": "https://test.example.com",
  1718. "client_id": "cid",
  1719. "client_secret": "s" * 513,
  1720. "scopes": "openid",
  1721. },
  1722. headers=_auth_header(token),
  1723. )
  1724. assert resp.status_code == 422
  1725. # ---------------------------------------------------------------------------
  1726. # M-NEW-4 / M-NEW-5 / L-NEW-5: UserCreate & UserUpdate field length limits
  1727. # ---------------------------------------------------------------------------
  1728. class TestUserCreateUpdateFieldLengths:
  1729. """UserCreate and UserUpdate must enforce max_length on username, password, email."""
  1730. @pytest.fixture
  1731. async def admin_token(self, async_client: AsyncClient) -> str:
  1732. return await _setup_and_login(async_client, "ucfldadmin", "ucfldadmin1!")
  1733. @pytest.mark.asyncio
  1734. @pytest.mark.integration
  1735. async def test_create_username_too_long_rejected(self, async_client: AsyncClient, admin_token: str):
  1736. resp = await async_client.post(
  1737. "/api/v1/users/",
  1738. json={
  1739. "username": "u" * 151,
  1740. "password": "ValidPass1!",
  1741. "role": "user",
  1742. },
  1743. headers=_auth_header(admin_token),
  1744. )
  1745. assert resp.status_code == 422
  1746. @pytest.mark.asyncio
  1747. @pytest.mark.integration
  1748. async def test_create_password_too_long_rejected(self, async_client: AsyncClient, admin_token: str):
  1749. resp = await async_client.post(
  1750. "/api/v1/users/",
  1751. json={
  1752. "username": "newuserX",
  1753. "password": "A1!" + "x" * 254,
  1754. "role": "user",
  1755. },
  1756. headers=_auth_header(admin_token),
  1757. )
  1758. assert resp.status_code == 422
  1759. @pytest.mark.asyncio
  1760. @pytest.mark.integration
  1761. async def test_create_email_too_long_rejected(self, async_client: AsyncClient, admin_token: str):
  1762. resp = await async_client.post(
  1763. "/api/v1/users/",
  1764. json={
  1765. "username": "newuserY",
  1766. "password": "ValidPass1!",
  1767. "email": "a" * 246 + "@x.com", # total 253 chars -> fine; 248+@x.com=255 -> too long
  1768. "role": "user",
  1769. },
  1770. headers=_auth_header(admin_token),
  1771. )
  1772. # 248 'a' + '@x.com' (6) = 254 chars — just at limit, should pass
  1773. # Use 249 + '@x.com' = 255 chars to trigger the 422
  1774. assert resp.status_code in (201, 422) # boundary sanity check
  1775. @pytest.mark.asyncio
  1776. @pytest.mark.integration
  1777. async def test_create_email_exceeds_limit_rejected(self, async_client: AsyncClient, admin_token: str):
  1778. resp = await async_client.post(
  1779. "/api/v1/users/",
  1780. json={
  1781. "username": "newuserZ",
  1782. "password": "ValidPass1!",
  1783. "email": "a" * 249 + "@x.com", # 255 chars — exceeds RFC 5321 max of 254
  1784. "role": "user",
  1785. },
  1786. headers=_auth_header(admin_token),
  1787. )
  1788. assert resp.status_code == 422
  1789. @pytest.mark.asyncio
  1790. @pytest.mark.integration
  1791. async def test_update_username_too_long_rejected(self, async_client: AsyncClient, admin_token: str):
  1792. # Create a user first
  1793. create_resp = await async_client.post(
  1794. "/api/v1/users/",
  1795. json={"username": "updusr1", "password": "ValidPass1!", "role": "user"},
  1796. headers=_auth_header(admin_token),
  1797. )
  1798. assert create_resp.status_code == 201
  1799. user_id = create_resp.json()["id"]
  1800. resp = await async_client.patch(
  1801. f"/api/v1/users/{user_id}",
  1802. json={"username": "u" * 151},
  1803. headers=_auth_header(admin_token),
  1804. )
  1805. assert resp.status_code == 422
  1806. @pytest.mark.asyncio
  1807. @pytest.mark.integration
  1808. async def test_update_password_too_long_rejected(self, async_client: AsyncClient, admin_token: str):
  1809. create_resp = await async_client.post(
  1810. "/api/v1/users/",
  1811. json={"username": "updusr2", "password": "ValidPass1!", "role": "user"},
  1812. headers=_auth_header(admin_token),
  1813. )
  1814. assert create_resp.status_code == 201
  1815. user_id = create_resp.json()["id"]
  1816. resp = await async_client.patch(
  1817. f"/api/v1/users/{user_id}",
  1818. json={"password": "A1!" + "x" * 254},
  1819. headers=_auth_header(admin_token),
  1820. )
  1821. assert resp.status_code == 422
  1822. @pytest.mark.asyncio
  1823. @pytest.mark.integration
  1824. async def test_update_email_too_long_rejected(self, async_client: AsyncClient, admin_token: str):
  1825. create_resp = await async_client.post(
  1826. "/api/v1/users/",
  1827. json={"username": "updusr3", "password": "ValidPass1!", "role": "user"},
  1828. headers=_auth_header(admin_token),
  1829. )
  1830. assert create_resp.status_code == 201
  1831. user_id = create_resp.json()["id"]
  1832. resp = await async_client.patch(
  1833. f"/api/v1/users/{user_id}",
  1834. json={"email": "a" * 249 + "@x.com"}, # 255 chars
  1835. headers=_auth_header(admin_token),
  1836. )
  1837. assert resp.status_code == 422
  1838. # ---------------------------------------------------------------------------
  1839. # L-NEW-6: per-IP rate limiting on /forgot-password
  1840. # ---------------------------------------------------------------------------
  1841. _SMTP_DATA_FOR_IPLIMIT = {
  1842. "smtp_host": "smtp.test.com",
  1843. "smtp_port": 587,
  1844. "smtp_username": "test@test.com",
  1845. "smtp_password": "testpass",
  1846. "smtp_security": "starttls",
  1847. "smtp_auth_enabled": True,
  1848. "smtp_from_email": "noreply@test.com",
  1849. }
  1850. class TestForgotPasswordPerIpRateLimit:
  1851. """POST /forgot-password must enforce a per-IP cap (L-NEW-6).
  1852. The test sends 11 requests from the simulated test-client IP using 11
  1853. different email addresses (so the per-email bucket is never exhausted).
  1854. The 11th request must be rejected with 429.
  1855. """
  1856. @pytest.fixture
  1857. async def advanced_auth_token(self, async_client: AsyncClient) -> str:
  1858. """Set up auth, SMTP, and enable advanced auth; return admin token."""
  1859. token = await _setup_and_login(async_client, "iprladmin", "iprladmin1!")
  1860. headers = _auth_header(token)
  1861. await async_client.post("/api/v1/auth/smtp", headers=headers, json=_SMTP_DATA_FOR_IPLIMIT)
  1862. await async_client.post("/api/v1/auth/advanced-auth/enable", headers=headers)
  1863. return token
  1864. @pytest.mark.asyncio
  1865. @pytest.mark.integration
  1866. async def test_per_ip_limit_triggers_429(self, async_client: AsyncClient, advanced_auth_token: str):
  1867. # Send 11 requests from the same test-client IP using unique email
  1868. # addresses so the per-email bucket (limit=3) is never exhausted.
  1869. responses = []
  1870. for i in range(11):
  1871. resp = await async_client.post(
  1872. "/api/v1/auth/forgot-password",
  1873. json={"email": f"unique{i}@example.com"},
  1874. )
  1875. responses.append(resp.status_code)
  1876. # First 10 must not be rate-limited by the IP bucket
  1877. for code in responses[:10]:
  1878. assert code != 429, f"Unexpected 429 before limit reached: {responses}"
  1879. # The 11th must be rate-limited
  1880. assert responses[10] == 429, f"Expected 429 on 11th request, got {responses[10]}"
  1881. # ---------------------------------------------------------------------------
  1882. # M-NEW-6: OIDC auto-link must be rejected if target user already has an
  1883. # OIDC link to a different provider
  1884. # ---------------------------------------------------------------------------
  1885. class TestOIDCAutoLinkExistingLinkRejection:
  1886. """OIDC callback must reject auto-linking when the email-matched user
  1887. already has an OIDC link to a different provider (M-NEW-6)."""
  1888. @pytest.mark.asyncio
  1889. @pytest.mark.integration
  1890. async def test_auto_link_rejected_when_user_already_linked(
  1891. self, async_client: AsyncClient, db_session: AsyncSession
  1892. ):
  1893. """Auto-link via email-match is rejected when the target user is
  1894. already linked to another OIDC provider."""
  1895. import base64
  1896. import hashlib
  1897. from unittest.mock import AsyncMock, MagicMock, patch
  1898. from backend.app.core.auth import get_password_hash
  1899. from backend.app.models.oidc_provider import OIDCProvider, UserOIDCLink
  1900. from backend.app.models.user import User
  1901. # ── 1. Target user with a known email ────────────────────────────
  1902. target = User(
  1903. username="oidcALTarget",
  1904. email="alinktest@example.com",
  1905. auth_source="oidc",
  1906. password_hash=get_password_hash(secrets.token_urlsafe(16)),
  1907. role="user",
  1908. is_active=True,
  1909. )
  1910. db_session.add(target)
  1911. await db_session.flush()
  1912. # ── 2. Provider B — legitimate, already linked to target ──────────
  1913. prov_b = OIDCProvider(
  1914. name="ProvB_m6test",
  1915. issuer_url="https://providerb-m6.example.com",
  1916. client_id="client_b",
  1917. _client_secret_enc="secret_b",
  1918. scopes="openid email profile",
  1919. is_enabled=True,
  1920. auto_link_existing_accounts=False,
  1921. auto_create_users=False,
  1922. )
  1923. db_session.add(prov_b)
  1924. await db_session.flush()
  1925. db_session.add(
  1926. UserOIDCLink(
  1927. user_id=target.id,
  1928. provider_id=prov_b.id,
  1929. provider_user_id="legitimate_sub",
  1930. provider_email="alinktest@example.com",
  1931. )
  1932. )
  1933. # ── 3. Provider A — attacker-controlled, auto_link=True ───────────
  1934. prov_a = OIDCProvider(
  1935. name="ProvA_m6test",
  1936. issuer_url="https://providera-m6.example.com",
  1937. client_id="client_a",
  1938. _client_secret_enc="secret_a",
  1939. scopes="openid email profile",
  1940. is_enabled=True,
  1941. auto_link_existing_accounts=True,
  1942. auto_create_users=False,
  1943. )
  1944. db_session.add(prov_a)
  1945. await db_session.flush()
  1946. # ── 4. OIDC state for Provider A ──────────────────────────────────
  1947. state = secrets.token_urlsafe(32)
  1948. nonce = secrets.token_urlsafe(32)
  1949. code_verifier = secrets.token_urlsafe(48)
  1950. db_session.add(
  1951. AuthEphemeralToken(
  1952. token=state,
  1953. token_type="oidc_state",
  1954. provider_id=prov_a.id,
  1955. nonce=nonce,
  1956. code_verifier=code_verifier,
  1957. expires_at=datetime.now(timezone.utc) + timedelta(minutes=10),
  1958. )
  1959. )
  1960. await db_session.commit()
  1961. # ── 5. Mock HTTP + JWT so the callback can reach the auto-link check ─
  1962. fake_discovery = {
  1963. "issuer": "https://providera-m6.example.com",
  1964. "token_endpoint": "https://providera-m6.example.com/token",
  1965. "jwks_uri": "https://providera-m6.example.com/jwks",
  1966. }
  1967. fake_token = {"access_token": "acc_tok", "id_token": "fake.id.token"}
  1968. fake_claims = {
  1969. "sub": "attacker_sub_unique",
  1970. "email": "alinktest@example.com",
  1971. "email_verified": True,
  1972. "nonce": nonce,
  1973. "iss": "https://providera-m6.example.com",
  1974. "aud": "client_a",
  1975. "exp": 9_999_999_999,
  1976. }
  1977. disc_resp = AsyncMock()
  1978. disc_resp.raise_for_status = MagicMock()
  1979. disc_resp.json = MagicMock(return_value=fake_discovery)
  1980. token_resp = AsyncMock()
  1981. token_resp.ok = True
  1982. token_resp.json = MagicMock(return_value=fake_token)
  1983. jwks_resp = AsyncMock()
  1984. jwks_resp.raise_for_status = MagicMock()
  1985. jwks_resp.json = MagicMock(return_value={})
  1986. mock_http = AsyncMock()
  1987. mock_http.get = AsyncMock(side_effect=[disc_resp, jwks_resp])
  1988. mock_http.post = AsyncMock(return_value=token_resp)
  1989. mock_signing_key = MagicMock()
  1990. mock_signing_key.key = "fake_key"
  1991. with (
  1992. patch("backend.app.api.routes.mfa.httpx.AsyncClient") as mock_httpx_cls,
  1993. patch("backend.app.api.routes.mfa.jwt.decode", return_value=fake_claims),
  1994. patch("backend.app.api.routes.mfa.PyJWKClient") as mock_jwks_cls,
  1995. ):
  1996. mock_httpx_cls.return_value.__aenter__ = AsyncMock(return_value=mock_http)
  1997. mock_httpx_cls.return_value.__aexit__ = AsyncMock(return_value=False)
  1998. mock_jwks_cls.return_value.get_signing_key_from_jwt.return_value = mock_signing_key
  1999. resp = await async_client.get(
  2000. f"/api/v1/auth/oidc/callback?code=fake_code&state={state}",
  2001. follow_redirects=False,
  2002. )
  2003. # M-NEW-6: must redirect with no_linked_account — NOT create a second link
  2004. assert resp.status_code == 302
  2005. location = resp.headers.get("location", "")
  2006. assert "no_linked_account" in location, f"Expected no_linked_account in redirect, got: {location}"
  2007. # Verify no second OIDC link was created for Provider A
  2008. from sqlalchemy import select as sa_select
  2009. from backend.app.models.oidc_provider import UserOIDCLink as _UOL
  2010. async with db_session as s:
  2011. links_result = await s.execute(
  2012. sa_select(_UOL).where(_UOL.user_id == target.id, _UOL.provider_id == prov_a.id)
  2013. )
  2014. assert links_result.scalar_one_or_none() is None, "No link to Provider A must exist"
  2015. # ===========================================================================
  2016. # Test Gap 1: OIDC state token is single-use — replay must be rejected
  2017. # ===========================================================================
  2018. class TestOIDCStateReplay:
  2019. """OIDC state token must be consumed on first use; a second callback with
  2020. the same state must redirect to ``?oidc_error=invalid_state``."""
  2021. @pytest.mark.asyncio
  2022. @pytest.mark.integration
  2023. async def test_state_replay_rejected(self, async_client: AsyncClient, db_session: AsyncSession):
  2024. """Replaying a consumed OIDC state token must return invalid_state."""
  2025. from backend.app.models.oidc_provider import OIDCProvider
  2026. # ── 1. Seed a minimal provider ────────────────────────────────────
  2027. provider = OIDCProvider(
  2028. name="StateReplayIdP",
  2029. issuer_url="https://statereplay-idp.example.com",
  2030. client_id="client_replay",
  2031. _client_secret_enc="secret_replay",
  2032. scopes="openid",
  2033. is_enabled=True,
  2034. auto_link_existing_accounts=False,
  2035. auto_create_users=False,
  2036. )
  2037. db_session.add(provider)
  2038. await db_session.flush()
  2039. # ── 2. Seed an OIDC state token ───────────────────────────────────
  2040. state = secrets.token_urlsafe(32)
  2041. db_session.add(
  2042. AuthEphemeralToken(
  2043. token=state,
  2044. token_type="oidc_state",
  2045. provider_id=provider.id,
  2046. nonce=secrets.token_urlsafe(32),
  2047. code_verifier=secrets.token_urlsafe(48),
  2048. expires_at=datetime.now(timezone.utc) + timedelta(minutes=10),
  2049. )
  2050. )
  2051. await db_session.commit()
  2052. # ── 3. First callback — discovery will fail (no real IdP), but the
  2053. # state token is atomically consumed (DELETE…RETURNING + commit)
  2054. # before the HTTP call is attempted.
  2055. first = await async_client.get(
  2056. f"/api/v1/auth/oidc/callback?code=any_code&state={state}",
  2057. follow_redirects=False,
  2058. )
  2059. assert first.status_code == 302
  2060. # The first call may fail for any reason except invalid_state
  2061. assert "invalid_state" not in first.headers.get("location", ""), (
  2062. f"First call should NOT get invalid_state: {first.headers.get('location')}"
  2063. )
  2064. # ── 4. Second callback with the same state → must be invalid_state ─
  2065. second = await async_client.get(
  2066. f"/api/v1/auth/oidc/callback?code=any_code&state={state}",
  2067. follow_redirects=False,
  2068. )
  2069. assert second.status_code == 302
  2070. assert "invalid_state" in second.headers.get("location", ""), (
  2071. f"Replayed state must redirect to invalid_state, got: {second.headers.get('location')}"
  2072. )
  2073. # ===========================================================================
  2074. # Test Gap 2: OIDC iss claim mismatch must redirect to token_validation_failed
  2075. # ===========================================================================
  2076. class TestOIDCIssMismatch:
  2077. """JWT whose iss claim does not match the discovery issuer must be rejected."""
  2078. @pytest.mark.asyncio
  2079. @pytest.mark.integration
  2080. async def test_iss_mismatch_redirects_token_validation_failed(
  2081. self, async_client: AsyncClient, db_session: AsyncSession
  2082. ):
  2083. import time
  2084. from unittest.mock import patch
  2085. import jwt as pyjwt
  2086. private_pem, jwks_data = _make_test_rsa_key()
  2087. correct_issuer = "https://correct-iss.example.com"
  2088. wrong_issuer = "https://wrong-iss.example.com"
  2089. client_id = "iss-mismatch-client"
  2090. nonce = secrets.token_urlsafe(16)
  2091. now = int(time.time())
  2092. # Sign the token with the WRONG issuer (iss != discovery_issuer)
  2093. id_token = pyjwt.encode(
  2094. {
  2095. "sub": "sub-iss-test",
  2096. "iss": wrong_issuer,
  2097. "aud": client_id,
  2098. "nonce": nonce,
  2099. "email": "iss@example.com",
  2100. "email_verified": True,
  2101. "iat": now,
  2102. "exp": now + 300,
  2103. },
  2104. private_pem,
  2105. algorithm="RS256",
  2106. headers={"kid": "test-kid-1"},
  2107. )
  2108. admin_token = await _setup_and_login(async_client, "issadmin1", "issadmin1!")
  2109. cr = await async_client.post(
  2110. "/api/v1/auth/oidc/providers",
  2111. json={
  2112. "name": "IssTest-IdP",
  2113. "issuer_url": correct_issuer,
  2114. "client_id": client_id,
  2115. "client_secret": "s",
  2116. "scopes": "openid",
  2117. "is_enabled": True,
  2118. "auto_create_users": True,
  2119. },
  2120. headers=_auth_header(admin_token),
  2121. )
  2122. assert cr.status_code in (200, 201), cr.text
  2123. provider_id = cr.json()["id"]
  2124. state = secrets.token_urlsafe(32)
  2125. db_session.add(
  2126. AuthEphemeralToken(
  2127. token=state,
  2128. token_type="oidc_state",
  2129. provider_id=provider_id,
  2130. nonce=nonce,
  2131. code_verifier=secrets.token_urlsafe(48),
  2132. expires_at=datetime.now(timezone.utc) + timedelta(minutes=5),
  2133. )
  2134. )
  2135. await db_session.commit()
  2136. # Discovery returns the CORRECT issuer; JWT carries the WRONG one.
  2137. discovery_doc = {
  2138. "issuer": correct_issuer,
  2139. "token_endpoint": f"{correct_issuer}/token",
  2140. "jwks_uri": f"{correct_issuer}/.well-known/jwks.json",
  2141. }
  2142. token_response = {"access_token": "a", "id_token": id_token}
  2143. class _MockResp:
  2144. def __init__(self, data):
  2145. self._data = data
  2146. self.status_code = 200
  2147. self.is_success = True
  2148. self.text = ""
  2149. def json(self):
  2150. return self._data
  2151. def raise_for_status(self):
  2152. pass
  2153. class _MockHttpxClient:
  2154. def __init__(self, *a, **kw):
  2155. pass
  2156. async def __aenter__(self):
  2157. return self
  2158. async def __aexit__(self, *a):
  2159. pass
  2160. async def get(self, url, **kw):
  2161. return _MockResp(jwks_data if "jwks" in url else discovery_doc)
  2162. async def post(self, url, **kw):
  2163. return _MockResp(token_response)
  2164. with patch("backend.app.api.routes.mfa.httpx.AsyncClient", _MockHttpxClient):
  2165. resp = await async_client.get(
  2166. f"/api/v1/auth/oidc/callback?code=c&state={state}",
  2167. follow_redirects=False,
  2168. )
  2169. assert resp.status_code == 302
  2170. location = resp.headers.get("location", "")
  2171. assert "token_validation_failed" in location, f"Expected token_validation_failed, got: {location}"
  2172. # ===========================================================================
  2173. # Test Gap 3: /forgot-password/confirm token is single-use
  2174. # ===========================================================================
  2175. class TestForgotPasswordTokenSingleUse:
  2176. """POST /forgot-password/confirm must reject a token after its first use."""
  2177. @pytest.mark.asyncio
  2178. @pytest.mark.integration
  2179. async def test_token_reuse_rejected(self, async_client: AsyncClient, db_session: AsyncSession):
  2180. from backend.app.core.auth import get_password_hash
  2181. from backend.app.models.user import User as _User
  2182. user = _User(
  2183. username="fpcuser1",
  2184. email="fpc@example.com",
  2185. password_hash=get_password_hash("OldPass1!"),
  2186. role="user",
  2187. is_active=True,
  2188. )
  2189. db_session.add(user)
  2190. await db_session.flush()
  2191. reset_token = secrets.token_urlsafe(32)
  2192. db_session.add(
  2193. AuthEphemeralToken(
  2194. token=reset_token,
  2195. token_type="password_reset",
  2196. username="fpcuser1",
  2197. expires_at=datetime.now(timezone.utc) + timedelta(hours=1),
  2198. )
  2199. )
  2200. await db_session.commit()
  2201. # First use → success
  2202. resp1 = await async_client.post(
  2203. "/api/v1/auth/forgot-password/confirm",
  2204. json={"token": reset_token, "new_password": "NewPass1!"},
  2205. )
  2206. assert resp1.status_code == 200, resp1.text
  2207. # Second use → token already consumed, must fail
  2208. resp2 = await async_client.post(
  2209. "/api/v1/auth/forgot-password/confirm",
  2210. json={"token": reset_token, "new_password": "AnotherNew1!"},
  2211. )
  2212. assert resp2.status_code == 400
  2213. # ===========================================================================
  2214. # C1 regression: setup_totp must reject a replayed TOTP code
  2215. # ===========================================================================
  2216. class TestSetupTOTPReplayRejected:
  2217. """setup_totp must reject a TOTP code that was already accepted in its window."""
  2218. @pytest.mark.asyncio
  2219. @pytest.mark.integration
  2220. async def test_replayed_setup_code_rejected(self, async_client: AsyncClient, db_session: AsyncSession):
  2221. from sqlalchemy import select as sa_select
  2222. from backend.app.models.user_totp import UserTOTP
  2223. token = await _setup_and_login(async_client, "setupreplay1", "setupreplay1!")
  2224. # Step 1: Initial TOTP setup (no active TOTP yet → no code required)
  2225. setup_resp = await async_client.post(
  2226. "/api/v1/auth/2fa/totp/setup",
  2227. headers=_auth_header(token),
  2228. )
  2229. assert setup_resp.status_code == 200
  2230. secret = setup_resp.json()["secret"]
  2231. # Step 2: Enable TOTP with a valid code
  2232. totp_obj = pyotp.TOTP(secret)
  2233. enable_resp = await async_client.post(
  2234. "/api/v1/auth/2fa/totp/enable",
  2235. json={"code": totp_obj.now()},
  2236. headers=_auth_header(token),
  2237. )
  2238. assert enable_resp.status_code == 200 # TOTP is now active (is_enabled=True)
  2239. # Step 3: Determine current valid code and its counter
  2240. me_resp = await async_client.get("/api/v1/auth/me", headers=_auth_header(token))
  2241. user_id = me_resp.json()["id"]
  2242. totp_result = await db_session.execute(sa_select(UserTOTP).where(UserTOTP.user_id == user_id))
  2243. totp_record = totp_result.scalar_one()
  2244. secret_now = totp_record.secret # decrypted via property
  2245. totp_now = pyotp.TOTP(secret_now)
  2246. valid_code = totp_now.now()
  2247. accepted_counter = totp_now.timecode(datetime.now(timezone.utc))
  2248. # Step 4: Pre-set last_totp_counter so this code looks already used
  2249. totp_record.last_totp_counter = accepted_counter
  2250. await db_session.commit()
  2251. # Step 5: Attempt setup_totp with the "already used" code → must be rejected
  2252. replay_resp = await async_client.post(
  2253. "/api/v1/auth/2fa/totp/setup",
  2254. json={"code": valid_code},
  2255. headers=_auth_header(token),
  2256. )
  2257. assert replay_resp.status_code == 400
  2258. assert "already used" in replay_resp.json()["detail"]
  2259. # ===========================================================================
  2260. # Nit8: OIDC aud mismatch and nonce mismatch tests
  2261. # ===========================================================================
  2262. class TestOIDCAudAndNonceMismatch:
  2263. """Nit8: aud != client_id and nonce != stored value must each fail the callback."""
  2264. def _make_oidc_provider_setup(self):
  2265. """Return a helper for building OIDC test fixtures inline."""
  2266. private_pem, jwks_data = _make_test_rsa_key()
  2267. return private_pem, jwks_data
  2268. @pytest.mark.asyncio
  2269. @pytest.mark.integration
  2270. async def test_aud_mismatch_redirects_token_validation_failed(
  2271. self, async_client: AsyncClient, db_session: AsyncSession
  2272. ):
  2273. """ID token with aud != client_id must be rejected (PyJWT InvalidAudienceError)."""
  2274. import time
  2275. from unittest.mock import patch
  2276. import jwt as pyjwt
  2277. private_pem, jwks_data = _make_test_rsa_key()
  2278. issuer = "https://aud-mismatch.example.com"
  2279. client_id = "aud-test-client"
  2280. wrong_aud = "some-other-client"
  2281. nonce = secrets.token_urlsafe(16)
  2282. now = int(time.time())
  2283. id_token = pyjwt.encode(
  2284. {
  2285. "sub": "sub-aud-test",
  2286. "iss": issuer,
  2287. "aud": wrong_aud, # <-- wrong audience
  2288. "nonce": nonce,
  2289. "email": "aud@example.com",
  2290. "email_verified": True,
  2291. "iat": now,
  2292. "exp": now + 300,
  2293. },
  2294. private_pem,
  2295. algorithm="RS256",
  2296. headers={"kid": "test-kid-1"},
  2297. )
  2298. admin_token = await _setup_and_login(async_client, "audmismatch_admin", "AudMismatch_admin1")
  2299. cr = await async_client.post(
  2300. "/api/v1/auth/oidc/providers",
  2301. json={
  2302. "name": "AudMismatch-IdP",
  2303. "issuer_url": issuer,
  2304. "client_id": client_id,
  2305. "client_secret": "s",
  2306. "scopes": "openid",
  2307. "is_enabled": True,
  2308. "auto_create_users": True,
  2309. },
  2310. headers=_auth_header(admin_token),
  2311. )
  2312. assert cr.status_code in (200, 201), cr.text
  2313. provider_id = cr.json()["id"]
  2314. state = secrets.token_urlsafe(32)
  2315. db_session.add(
  2316. AuthEphemeralToken(
  2317. token=state,
  2318. token_type="oidc_state",
  2319. provider_id=provider_id,
  2320. nonce=nonce,
  2321. code_verifier=secrets.token_urlsafe(48),
  2322. expires_at=datetime.now(timezone.utc) + timedelta(minutes=5),
  2323. )
  2324. )
  2325. await db_session.commit()
  2326. discovery_doc = {
  2327. "issuer": issuer,
  2328. "token_endpoint": f"{issuer}/token",
  2329. "jwks_uri": f"{issuer}/.well-known/jwks.json",
  2330. }
  2331. class _MockResp:
  2332. def __init__(self, data):
  2333. self._data = data
  2334. self.status_code = 200
  2335. self.is_success = True
  2336. self.text = ""
  2337. def json(self):
  2338. return self._data
  2339. def raise_for_status(self):
  2340. pass
  2341. class _MockHttpxClient:
  2342. def __init__(self, *a, **kw):
  2343. pass
  2344. async def __aenter__(self):
  2345. return self
  2346. async def __aexit__(self, *a):
  2347. pass
  2348. async def get(self, url, **kw):
  2349. return _MockResp(jwks_data if "jwks" in url else discovery_doc)
  2350. async def post(self, url, **kw):
  2351. return _MockResp({"access_token": "a", "id_token": id_token})
  2352. with patch("backend.app.api.routes.mfa.httpx.AsyncClient", _MockHttpxClient):
  2353. resp = await async_client.get(
  2354. f"/api/v1/auth/oidc/callback?code=c&state={state}",
  2355. follow_redirects=False,
  2356. )
  2357. assert resp.status_code == 302
  2358. location = resp.headers.get("location", "")
  2359. assert "token_validation_failed" in location, (
  2360. f"Expected token_validation_failed redirect for aud mismatch, got: {location}"
  2361. )
  2362. @pytest.mark.asyncio
  2363. @pytest.mark.integration
  2364. async def test_nonce_mismatch_redirects_token_validation_failed(
  2365. self, async_client: AsyncClient, db_session: AsyncSession
  2366. ):
  2367. """ID token with nonce != stored state nonce must be rejected."""
  2368. import time
  2369. from unittest.mock import patch
  2370. import jwt as pyjwt
  2371. private_pem, jwks_data = _make_test_rsa_key()
  2372. issuer = "https://nonce-mismatch.example.com"
  2373. client_id = "nonce-test-client"
  2374. stored_nonce = secrets.token_urlsafe(16)
  2375. wrong_nonce = secrets.token_urlsafe(16) # different from stored_nonce
  2376. now = int(time.time())
  2377. id_token = pyjwt.encode(
  2378. {
  2379. "sub": "sub-nonce-test",
  2380. "iss": issuer,
  2381. "aud": client_id,
  2382. "nonce": wrong_nonce, # <-- does not match stored_nonce
  2383. "email": "nonce@example.com",
  2384. "email_verified": True,
  2385. "iat": now,
  2386. "exp": now + 300,
  2387. },
  2388. private_pem,
  2389. algorithm="RS256",
  2390. headers={"kid": "test-kid-1"},
  2391. )
  2392. admin_token = await _setup_and_login(async_client, "noncemismatch_admin", "NonceMismatch_admin1")
  2393. cr = await async_client.post(
  2394. "/api/v1/auth/oidc/providers",
  2395. json={
  2396. "name": "NonceMismatch-IdP",
  2397. "issuer_url": issuer,
  2398. "client_id": client_id,
  2399. "client_secret": "s",
  2400. "scopes": "openid",
  2401. "is_enabled": True,
  2402. "auto_create_users": True,
  2403. },
  2404. headers=_auth_header(admin_token),
  2405. )
  2406. assert cr.status_code in (200, 201), cr.text
  2407. provider_id = cr.json()["id"]
  2408. state = secrets.token_urlsafe(32)
  2409. db_session.add(
  2410. AuthEphemeralToken(
  2411. token=state,
  2412. token_type="oidc_state",
  2413. provider_id=provider_id,
  2414. nonce=stored_nonce, # state has correct nonce; JWT carries wrong_nonce
  2415. code_verifier=secrets.token_urlsafe(48),
  2416. expires_at=datetime.now(timezone.utc) + timedelta(minutes=5),
  2417. )
  2418. )
  2419. await db_session.commit()
  2420. discovery_doc = {
  2421. "issuer": issuer,
  2422. "token_endpoint": f"{issuer}/token",
  2423. "jwks_uri": f"{issuer}/.well-known/jwks.json",
  2424. }
  2425. class _MockResp:
  2426. def __init__(self, data):
  2427. self._data = data
  2428. self.status_code = 200
  2429. self.is_success = True
  2430. self.text = ""
  2431. def json(self):
  2432. return self._data
  2433. def raise_for_status(self):
  2434. pass
  2435. class _MockHttpxClient:
  2436. def __init__(self, *a, **kw):
  2437. pass
  2438. async def __aenter__(self):
  2439. return self
  2440. async def __aexit__(self, *a):
  2441. pass
  2442. async def get(self, url, **kw):
  2443. return _MockResp(jwks_data if "jwks" in url else discovery_doc)
  2444. async def post(self, url, **kw):
  2445. return _MockResp({"access_token": "a", "id_token": id_token})
  2446. with patch("backend.app.api.routes.mfa.httpx.AsyncClient", _MockHttpxClient):
  2447. resp = await async_client.get(
  2448. f"/api/v1/auth/oidc/callback?code=c&state={state}",
  2449. follow_redirects=False,
  2450. )
  2451. assert resp.status_code == 302
  2452. location = resp.headers.get("location", "")
  2453. # The callback redirects to ?oidc_error=nonce_mismatch when nonces differ.
  2454. assert "nonce_mismatch" in location, f"Expected nonce_mismatch redirect for nonce mismatch, got: {location}"
  2455. # ===========================================================================
  2456. # Expired OIDC token rejection — state and exchange tokens
  2457. # ===========================================================================
  2458. class TestOIDCExpiredTokenRejection:
  2459. """Expired OIDC state and exchange tokens must be rejected atomically.
  2460. The DELETE … WHERE expires_at > now must ensure that an already-expired
  2461. token is never consumed (committed) before the expiry is checked, so the
  2462. token row stays in the DB and is not silently discarded.
  2463. """
  2464. @pytest.mark.asyncio
  2465. @pytest.mark.integration
  2466. async def test_expired_state_token_rejected_as_invalid_state(
  2467. self, async_client: AsyncClient, db_session: AsyncSession
  2468. ):
  2469. """An expired OIDC state token must redirect to invalid_state without
  2470. being consumed — it must still exist in the DB after the rejected call."""
  2471. from backend.app.models.oidc_provider import OIDCProvider
  2472. provider = OIDCProvider(
  2473. name="ExpiredStateIdP",
  2474. issuer_url="https://expired-state.example.com",
  2475. client_id="client_expired_state",
  2476. _client_secret_enc="secret_exp_state",
  2477. scopes="openid",
  2478. is_enabled=True,
  2479. auto_link_existing_accounts=False,
  2480. auto_create_users=False,
  2481. )
  2482. db_session.add(provider)
  2483. await db_session.flush()
  2484. state = secrets.token_urlsafe(32)
  2485. db_session.add(
  2486. AuthEphemeralToken(
  2487. token=state,
  2488. token_type="oidc_state",
  2489. provider_id=provider.id,
  2490. nonce=secrets.token_urlsafe(16),
  2491. code_verifier=secrets.token_urlsafe(48),
  2492. # already expired
  2493. expires_at=datetime.now(timezone.utc) - timedelta(minutes=5),
  2494. )
  2495. )
  2496. await db_session.commit()
  2497. resp = await async_client.get(
  2498. f"/api/v1/auth/oidc/callback?code=any_code&state={state}",
  2499. follow_redirects=False,
  2500. )
  2501. assert resp.status_code == 302
  2502. location = resp.headers.get("location", "")
  2503. assert "invalid_state" in location, f"Expected invalid_state redirect for expired state, got: {location}"
  2504. @pytest.mark.asyncio
  2505. @pytest.mark.integration
  2506. async def test_expired_exchange_token_rejected(self, async_client: AsyncClient, db_session: AsyncSession):
  2507. """An expired OIDC exchange token must return 401 without being consumed."""
  2508. from sqlalchemy import select as sa_select
  2509. expired_token = secrets.token_urlsafe(32)
  2510. db_session.add(
  2511. AuthEphemeralToken(
  2512. token=expired_token,
  2513. token_type="oidc_exchange",
  2514. username="some_user",
  2515. # already expired
  2516. expires_at=datetime.now(timezone.utc) - timedelta(minutes=5),
  2517. )
  2518. )
  2519. await db_session.commit()
  2520. resp = await async_client.post(
  2521. "/api/v1/auth/oidc/exchange",
  2522. json={"oidc_token": expired_token},
  2523. )
  2524. assert resp.status_code == 401
  2525. assert "expired" in resp.json().get("detail", "").lower() or "invalid" in resp.json().get("detail", "").lower()
  2526. # Token must NOT have been consumed — it should still be in the DB
  2527. # (the atomic DELETE WHERE expires_at > now left it untouched)
  2528. result = await db_session.execute(
  2529. sa_select(AuthEphemeralToken).where(AuthEphemeralToken.token == expired_token)
  2530. )
  2531. remaining = result.scalar_one_or_none()
  2532. assert remaining is not None, "Expired exchange token must not be consumed by a rejected request"
  2533. # ===========================================================================
  2534. # Trailing slash in issuer_url — discovery URL must not contain double slash
  2535. # ===========================================================================
  2536. class TestOIDCIssuerUrlTrailingSlash:
  2537. """Providers like Authentik use issuer URLs with a trailing slash.
  2538. BamBuddy must strip the slash before appending /.well-known/openid-configuration
  2539. to avoid a double-slash that results in a 404.
  2540. """
  2541. @pytest.mark.asyncio
  2542. @pytest.mark.integration
  2543. async def test_trailing_slash_issuer_url_fetches_correct_discovery_url(self, async_client: AsyncClient):
  2544. from unittest.mock import AsyncMock, MagicMock, patch
  2545. issuer_with_slash = "https://authentik.example.com/application/o/bambuddy/"
  2546. admin_token = await _setup_and_login(async_client, "oidcslashadm", "oidcslashadm1")
  2547. create_resp = await async_client.post(
  2548. "/api/v1/auth/oidc/providers",
  2549. json={
  2550. "name": "Authentik-Slash",
  2551. "issuer_url": issuer_with_slash,
  2552. "client_id": "bambuddy",
  2553. "client_secret": "secret",
  2554. "scopes": "openid email profile",
  2555. "is_enabled": True,
  2556. "auto_create_users": False,
  2557. },
  2558. headers=_auth_header(admin_token),
  2559. )
  2560. assert create_resp.status_code == 201
  2561. provider_id = create_resp.json()["id"]
  2562. fake_discovery = {
  2563. "issuer": issuer_with_slash,
  2564. "authorization_endpoint": "https://authentik.example.com/application/o/bambuddy/authorize",
  2565. }
  2566. disc_resp = AsyncMock()
  2567. disc_resp.raise_for_status = MagicMock()
  2568. disc_resp.json = MagicMock(return_value=fake_discovery)
  2569. mock_http = AsyncMock()
  2570. mock_http.get = AsyncMock(return_value=disc_resp)
  2571. with patch("backend.app.api.routes.mfa.httpx.AsyncClient") as mock_cls:
  2572. mock_cls.return_value.__aenter__ = AsyncMock(return_value=mock_http)
  2573. mock_cls.return_value.__aexit__ = AsyncMock(return_value=False)
  2574. resp = await async_client.get(f"/api/v1/auth/oidc/authorize/{provider_id}")
  2575. assert resp.status_code == 200
  2576. called_url = mock_http.get.call_args_list[0][0][0]
  2577. assert "//" not in called_url.replace("https://", ""), (
  2578. f"Discovery URL must not contain double slash: {called_url}"
  2579. )
  2580. assert called_url.endswith("/.well-known/openid-configuration"), (
  2581. f"Expected discovery URL to end with /.well-known/openid-configuration, got: {called_url}"
  2582. )
  2583. @pytest.mark.asyncio
  2584. @pytest.mark.integration
  2585. async def test_iss_claim_trailing_slash_accepted(self, async_client: AsyncClient, db_session: AsyncSession):
  2586. """Provider configured without trailing slash, Authentik JWT iss has trailing slash.
  2587. Both sides must be normalised before comparison so the login succeeds.
  2588. """
  2589. import time
  2590. from unittest.mock import patch
  2591. import jwt as pyjwt
  2592. private_pem, jwks_data = _make_test_rsa_key()
  2593. issuer_no_slash = "https://authentik.example.com/application/o/bambuddy"
  2594. issuer_with_slash = issuer_no_slash + "/"
  2595. client_id = "bambuddy-client"
  2596. nonce = secrets.token_urlsafe(16)
  2597. now = int(time.time())
  2598. id_token = pyjwt.encode(
  2599. {
  2600. "sub": "authentik-sub-123",
  2601. "iss": issuer_with_slash,
  2602. "aud": client_id,
  2603. "nonce": nonce,
  2604. "email": "authentik-user@example.com",
  2605. "email_verified": True,
  2606. "iat": now,
  2607. "exp": now + 300,
  2608. },
  2609. private_pem,
  2610. algorithm="RS256",
  2611. headers={"kid": "test-kid-1"},
  2612. )
  2613. admin_token = await _setup_and_login(async_client, "authentikadm", "authentikadm1")
  2614. create_resp = await async_client.post(
  2615. "/api/v1/auth/oidc/providers",
  2616. json={
  2617. "name": "Authentik-ISS",
  2618. "issuer_url": issuer_no_slash,
  2619. "client_id": client_id,
  2620. "client_secret": "secret",
  2621. "scopes": "openid email profile",
  2622. "is_enabled": True,
  2623. "auto_create_users": True,
  2624. },
  2625. headers=_auth_header(admin_token),
  2626. )
  2627. assert create_resp.status_code == 201
  2628. provider_id = create_resp.json()["id"]
  2629. state = secrets.token_urlsafe(32)
  2630. code_verifier = secrets.token_urlsafe(48)
  2631. db_session.add(
  2632. AuthEphemeralToken(
  2633. token=state,
  2634. token_type="oidc_state",
  2635. provider_id=provider_id,
  2636. nonce=nonce,
  2637. code_verifier=code_verifier,
  2638. expires_at=datetime.now(timezone.utc) + timedelta(minutes=5),
  2639. )
  2640. )
  2641. await db_session.commit()
  2642. discovery_doc = {
  2643. "issuer": issuer_with_slash,
  2644. "authorization_endpoint": f"{issuer_no_slash}/authorize",
  2645. "token_endpoint": f"{issuer_no_slash}/token",
  2646. "jwks_uri": f"{issuer_no_slash}/.well-known/jwks.json",
  2647. }
  2648. token_response = {"access_token": "mock", "token_type": "Bearer", "id_token": id_token}
  2649. class _MockResp:
  2650. def __init__(self, data):
  2651. self._data = data
  2652. self.is_success = True
  2653. self.status_code = 200
  2654. self.text = str(data)
  2655. def json(self):
  2656. return self._data
  2657. def raise_for_status(self):
  2658. pass
  2659. class _MockHttpxClient:
  2660. def __init__(self, *a, **kw):
  2661. pass
  2662. async def __aenter__(self):
  2663. return self
  2664. async def __aexit__(self, *a):
  2665. pass
  2666. async def get(self, url, **kw):
  2667. return _MockResp(jwks_data if "jwks" in url else discovery_doc)
  2668. async def post(self, url, **kw):
  2669. return _MockResp(token_response)
  2670. with patch("backend.app.api.routes.mfa.httpx.AsyncClient", _MockHttpxClient):
  2671. resp = await async_client.get(
  2672. f"/api/v1/auth/oidc/callback?code=auth-code&state={state}",
  2673. follow_redirects=False,
  2674. )
  2675. location = resp.headers.get("location", "")
  2676. assert resp.status_code == 302, f"Expected redirect, got {resp.status_code}"
  2677. assert "token_validation_failed" not in location, (
  2678. "Trailing slash mismatch in iss claim must not cause token_validation_failed"
  2679. )
  2680. assert "oidc_token=" in location, f"Expected oidc_token in redirect, got: {location}"
  2681. class TestOIDCCallbackCodeLength:
  2682. """OIDC callback code/state query params must accept up to 2048 characters (OAuth spec)."""
  2683. @pytest.mark.asyncio
  2684. @pytest.mark.integration
  2685. async def test_code_512_chars_accepted(self, async_client: AsyncClient):
  2686. """A 512-character code (old limit) must not be rejected with 422."""
  2687. code = "a" * 512
  2688. resp = await async_client.get(
  2689. f"/api/v1/auth/oidc/callback?code={code}&state=bogus-state",
  2690. follow_redirects=False,
  2691. )
  2692. assert resp.status_code != 422, "512-char code must not be rejected by Pydantic"
  2693. @pytest.mark.asyncio
  2694. @pytest.mark.integration
  2695. async def test_code_2048_chars_accepted(self, async_client: AsyncClient):
  2696. """A 2048-character code must not be rejected with 422."""
  2697. code = "a" * 2048
  2698. resp = await async_client.get(
  2699. f"/api/v1/auth/oidc/callback?code={code}&state=bogus-state",
  2700. follow_redirects=False,
  2701. )
  2702. assert resp.status_code != 422, "2048-char code must not be rejected by Pydantic"
  2703. @pytest.mark.asyncio
  2704. @pytest.mark.integration
  2705. async def test_code_2049_chars_rejected(self, async_client: AsyncClient):
  2706. """A 2049-character code must be rejected with 422."""
  2707. code = "a" * 2049
  2708. resp = await async_client.get(
  2709. f"/api/v1/auth/oidc/callback?code={code}&state=bogus-state",
  2710. follow_redirects=False,
  2711. )
  2712. assert resp.status_code == 422, "2049-char code must be rejected by Pydantic"
  2713. @pytest.mark.asyncio
  2714. @pytest.mark.integration
  2715. async def test_state_512_chars_accepted(self, async_client: AsyncClient):
  2716. """A 512-character state (old limit) must not be rejected with 422."""
  2717. state = "a" * 512
  2718. resp = await async_client.get(
  2719. f"/api/v1/auth/oidc/callback?code=bogus-code&state={state}",
  2720. follow_redirects=False,
  2721. )
  2722. assert resp.status_code != 422, "512-char state must not be rejected by Pydantic"
  2723. @pytest.mark.asyncio
  2724. @pytest.mark.integration
  2725. async def test_state_2048_chars_accepted(self, async_client: AsyncClient):
  2726. """A 2048-character state must not be rejected with 422."""
  2727. state = "a" * 2048
  2728. resp = await async_client.get(
  2729. f"/api/v1/auth/oidc/callback?code=bogus-code&state={state}",
  2730. follow_redirects=False,
  2731. )
  2732. assert resp.status_code != 422, "2048-char state must not be rejected by Pydantic"
  2733. @pytest.mark.asyncio
  2734. @pytest.mark.integration
  2735. async def test_state_2049_chars_rejected(self, async_client: AsyncClient):
  2736. """A 2049-character state must be rejected with 422."""
  2737. state = "a" * 2049
  2738. resp = await async_client.get(
  2739. f"/api/v1/auth/oidc/callback?code=bogus-code&state={state}",
  2740. follow_redirects=False,
  2741. )
  2742. assert resp.status_code == 422, "2049-char state must be rejected by Pydantic"
  2743. # ---------------------------------------------------------------------------
  2744. # Helpers shared by TestOIDCEmailClaimResolution
  2745. # ---------------------------------------------------------------------------
  2746. async def _run_oidc_callback(
  2747. async_client: AsyncClient,
  2748. db_session: AsyncSession,
  2749. *,
  2750. provider_id: int,
  2751. claims: dict,
  2752. private_pem: bytes,
  2753. jwks_data: dict,
  2754. issuer: str,
  2755. client_id: str,
  2756. ) -> str:
  2757. """Run a full OIDC callback flow and return the redirect location."""
  2758. nonce = secrets.token_urlsafe(16)
  2759. now = int(time.time())
  2760. token_claims = {
  2761. "sub": claims.get("sub", f"sub-{secrets.token_hex(8)}"),
  2762. "iss": issuer,
  2763. "aud": client_id,
  2764. "nonce": nonce,
  2765. "iat": now,
  2766. "exp": now + 300,
  2767. **{k: v for k, v in claims.items() if k not in ("sub",)},
  2768. }
  2769. id_token = pyjwt.encode(token_claims, private_pem, algorithm="RS256", headers={"kid": "test-kid-1"})
  2770. state = secrets.token_urlsafe(32)
  2771. code_verifier = secrets.token_urlsafe(48)
  2772. db_session.add(
  2773. AuthEphemeralToken(
  2774. token=state,
  2775. token_type="oidc_state",
  2776. provider_id=provider_id,
  2777. nonce=nonce,
  2778. code_verifier=code_verifier,
  2779. expires_at=datetime.now(timezone.utc) + timedelta(minutes=5),
  2780. )
  2781. )
  2782. await db_session.commit()
  2783. discovery_doc = {
  2784. "issuer": issuer,
  2785. "authorization_endpoint": f"{issuer}/auth",
  2786. "token_endpoint": f"{issuer}/token",
  2787. "jwks_uri": f"{issuer}/.well-known/jwks.json",
  2788. }
  2789. token_response = {"access_token": "mock-access", "token_type": "Bearer", "id_token": id_token}
  2790. class _R:
  2791. def __init__(self, data):
  2792. self._data = data
  2793. self.status_code = 200
  2794. self.is_success = True
  2795. self.text = str(data)
  2796. def json(self):
  2797. return self._data
  2798. def raise_for_status(self):
  2799. pass
  2800. class _C:
  2801. def __init__(self, *a, **kw):
  2802. pass
  2803. async def __aenter__(self):
  2804. return self
  2805. async def __aexit__(self, *a):
  2806. pass
  2807. async def get(self, url, **kw):
  2808. return _R(jwks_data if "jwks" in url else discovery_doc)
  2809. async def post(self, url, **kw):
  2810. return _R(token_response)
  2811. with patch("backend.app.api.routes.mfa.httpx.AsyncClient", _C):
  2812. resp = await async_client.get(
  2813. f"/api/v1/auth/oidc/callback?code=test-code&state={state}",
  2814. follow_redirects=False,
  2815. )
  2816. return resp.headers.get("location", "")
  2817. class TestOIDCEmailClaimResolution:
  2818. """Three-case email resolution logic: Fall A / Fall B / Fall C."""
  2819. # ── shared helpers ────────────────────────────────────────────────────────
  2820. async def _create_provider(
  2821. self,
  2822. async_client: AsyncClient,
  2823. admin_token: str,
  2824. issuer: str,
  2825. client_id: str,
  2826. *,
  2827. email_claim: str = "email",
  2828. require_email_verified: bool = True,
  2829. auto_link_existing_accounts: bool = False,
  2830. suffix: str = "",
  2831. ) -> int:
  2832. resp = await async_client.post(
  2833. "/api/v1/auth/oidc/providers",
  2834. json={
  2835. "name": f"TestIdP-{suffix or secrets.token_hex(4)}",
  2836. "issuer_url": issuer,
  2837. "client_id": client_id,
  2838. "client_secret": "sec",
  2839. "scopes": "openid email profile",
  2840. "is_enabled": True,
  2841. "auto_create_users": True,
  2842. "auto_link_existing_accounts": auto_link_existing_accounts,
  2843. "email_claim": email_claim,
  2844. "require_email_verified": require_email_verified,
  2845. },
  2846. headers={"Authorization": f"Bearer {admin_token}"},
  2847. )
  2848. assert resp.status_code == 201, resp.text
  2849. return resp.json()["id"]
  2850. async def _get_oidc_link(self, db_session: AsyncSession, provider_id: int, sub: str):
  2851. from sqlalchemy import select
  2852. from backend.app.models.oidc_provider import UserOIDCLink
  2853. result = await db_session.execute(
  2854. select(UserOIDCLink)
  2855. .where(UserOIDCLink.provider_id == provider_id)
  2856. .where(UserOIDCLink.provider_user_id == sub)
  2857. )
  2858. return result.scalar_one_or_none()
  2859. # ── Parametrized matrix: Fall A / Fall B / Fall C ─────────────────────────
  2860. @pytest.mark.asyncio
  2861. @pytest.mark.integration
  2862. @pytest.mark.parametrize(
  2863. "email_claim,require_ev,claims,expected",
  2864. [
  2865. # Fall A: standard claim + require_ev=True (default)
  2866. ("email", True, {"email": "fa@example.com", "email_verified": True}, "fa@example.com"),
  2867. ("email", True, {"email": "fa@example.com", "email_verified": False}, None),
  2868. ("email", True, {"email": "fa@example.com"}, None), # Azure Entra with default config
  2869. # Fall B: standard claim + require_ev=False (Azure Entra permissive)
  2870. ("email", False, {"email": "fb@example.com", "email_verified": True}, "fb@example.com"),
  2871. ("email", False, {"email": "fb@example.com", "email_verified": False}, None),
  2872. ("email", False, {"email": "azure@company.com"}, "azure@company.com"), # ev absent → kept
  2873. # Fall C: custom claim (preferred_username) — no email_verified check
  2874. ("preferred_username", True, {"preferred_username": "User@Company.COM"}, "user@company.com"),
  2875. ("preferred_username", True, {"preferred_username": " User@EXAMPLE.COM "}, "user@example.com"),
  2876. ("preferred_username", True, {"preferred_username": "justausername"}, None),
  2877. ("preferred_username", True, {"preferred_username": "@"}, None), # SEC-2: "@" only
  2878. ("preferred_username", True, {"preferred_username": "@domain.com"}, None), # SEC-2: empty local
  2879. ("preferred_username", True, {"preferred_username": "user@"}, None), # SEC-2: empty domain
  2880. ("preferred_username", True, {"preferred_username": "user@nodot"}, None), # SEC-2: no dot in domain
  2881. ("preferred_username", True, {}, None), # claim absent
  2882. # Fall C: email_verified=False present alongside custom claim — must NOT suppress the email
  2883. ("preferred_username", True, {"preferred_username": "user@co.com", "email_verified": False}, "user@co.com"),
  2884. ],
  2885. ids=[
  2886. "fall-a-ev-true",
  2887. "fall-a-ev-false",
  2888. "fall-a-ev-absent",
  2889. "fall-b-ev-true",
  2890. "fall-b-ev-false",
  2891. "fall-b-ev-absent",
  2892. "fall-c-valid-upn",
  2893. "fall-c-lowercase-strip",
  2894. "fall-c-no-at",
  2895. "fall-c-at-only",
  2896. "fall-c-empty-local",
  2897. "fall-c-empty-domain",
  2898. "fall-c-no-dot-in-domain",
  2899. "fall-c-claim-absent",
  2900. "fall-c-ev-false-ignored",
  2901. ],
  2902. )
  2903. async def test_email_resolution_matrix(
  2904. self,
  2905. async_client: AsyncClient,
  2906. db_session: AsyncSession,
  2907. email_claim: str,
  2908. require_ev: bool,
  2909. claims: dict,
  2910. expected: str | None,
  2911. ):
  2912. """C4: Verify link exists AND check provider_email — avoids false-passing on callback failure."""
  2913. issuer = "https://matrix.test"
  2914. client_id = "matrix-client"
  2915. admin_token = await _setup_and_login(async_client, "matrix_adm", "Matrix123!")
  2916. private_pem, jwks_data = _make_test_rsa_key()
  2917. provider_id = await self._create_provider(
  2918. async_client,
  2919. admin_token,
  2920. issuer,
  2921. client_id,
  2922. email_claim=email_claim,
  2923. require_email_verified=require_ev,
  2924. suffix="matrix",
  2925. )
  2926. sub = f"sub-matrix-{secrets.token_hex(6)}"
  2927. await _run_oidc_callback(
  2928. async_client,
  2929. db_session,
  2930. provider_id=provider_id,
  2931. claims={"sub": sub, **claims},
  2932. private_pem=private_pem,
  2933. jwks_data=jwks_data,
  2934. issuer=issuer,
  2935. client_id=client_id,
  2936. )
  2937. db_session.expire_all()
  2938. link = await self._get_oidc_link(db_session, provider_id, sub)
  2939. assert link is not None, "UserOIDCLink must be created even when email is dropped"
  2940. assert link.provider_email == expected
  2941. # ── Security: auto_link guards (CREATE endpoint) ──────────────────────────
  2942. @pytest.mark.asyncio
  2943. @pytest.mark.integration
  2944. async def test_auto_link_blocked_with_require_ev_false(self, async_client: AsyncClient):
  2945. """SEC-1: auto_link + require_email_verified=False must be rejected at schema level (422)."""
  2946. admin_token = await _setup_and_login(async_client, "sec1_adm", "Sec1Adm123!")
  2947. resp = await async_client.post(
  2948. "/api/v1/auth/oidc/providers",
  2949. json={
  2950. "name": "SEC1-Test",
  2951. "issuer_url": "https://sec1.test",
  2952. "client_id": "sec1-client",
  2953. "client_secret": "sec",
  2954. "scopes": "openid email profile",
  2955. "auto_link_existing_accounts": True,
  2956. "require_email_verified": False,
  2957. },
  2958. headers={"Authorization": f"Bearer {admin_token}"},
  2959. )
  2960. assert resp.status_code == 422
  2961. @pytest.mark.asyncio
  2962. @pytest.mark.integration
  2963. async def test_auto_link_blocked_with_custom_claim_create(self, async_client: AsyncClient):
  2964. """SEC-6: auto_link + email_claim!='email' must be rejected at schema level on CREATE (422)."""
  2965. admin_token = await _setup_and_login(async_client, "sec6c_adm", "Sec6CAdm123!")
  2966. resp = await async_client.post(
  2967. "/api/v1/auth/oidc/providers",
  2968. json={
  2969. "name": "SEC6-Create-Test",
  2970. "issuer_url": "https://sec6c.test",
  2971. "client_id": "sec6c-client",
  2972. "client_secret": "sec",
  2973. "scopes": "openid email profile",
  2974. "auto_link_existing_accounts": True,
  2975. "email_claim": "upn",
  2976. },
  2977. headers={"Authorization": f"Bearer {admin_token}"},
  2978. )
  2979. assert resp.status_code == 422
  2980. @pytest.mark.asyncio
  2981. @pytest.mark.integration
  2982. async def test_auto_link_blocked_with_custom_claim_update(self, async_client: AsyncClient):
  2983. """SEC-6: auto_link=True + email_claim='upn' in same UPDATE request → 422 (T4)."""
  2984. admin_token = await _setup_and_login(async_client, "sec6u_adm", "Sec6UAdm123!")
  2985. create_resp = await async_client.post(
  2986. "/api/v1/auth/oidc/providers",
  2987. json={
  2988. "name": "SEC6-Update-Test",
  2989. "issuer_url": "https://sec6u.test",
  2990. "client_id": "sec6u-client",
  2991. "client_secret": "sec",
  2992. "scopes": "openid email profile",
  2993. },
  2994. headers={"Authorization": f"Bearer {admin_token}"},
  2995. )
  2996. assert create_resp.status_code == 201
  2997. provider_id = create_resp.json()["id"]
  2998. resp = await async_client.put(
  2999. f"/api/v1/auth/oidc/providers/{provider_id}",
  3000. json={"auto_link_existing_accounts": True, "email_claim": "upn"},
  3001. headers={"Authorization": f"Bearer {admin_token}"},
  3002. )
  3003. assert resp.status_code == 422
  3004. # ── Combined-State-Guard (partial updates across two requests) ─────────────
  3005. @pytest.mark.asyncio
  3006. @pytest.mark.integration
  3007. async def test_partial_update_guard_require_ev(self, async_client: AsyncClient):
  3008. """SEC-1 Combined-State-Guard: require_ev=False then auto_link=True → 422 (T1 require_ev path)."""
  3009. admin_token = await _setup_and_login(async_client, "pg_rev_adm", "PgRev123!")
  3010. create_resp = await async_client.post(
  3011. "/api/v1/auth/oidc/providers",
  3012. json={
  3013. "name": "PG-RequireEV-Test",
  3014. "issuer_url": "https://pg-rev.test",
  3015. "client_id": "pg-rev-client",
  3016. "client_secret": "sec",
  3017. "scopes": "openid email profile",
  3018. },
  3019. headers={"Authorization": f"Bearer {admin_token}"},
  3020. )
  3021. assert create_resp.status_code == 201
  3022. provider_id = create_resp.json()["id"]
  3023. upd1 = await async_client.put(
  3024. f"/api/v1/auth/oidc/providers/{provider_id}",
  3025. json={"require_email_verified": False},
  3026. headers={"Authorization": f"Bearer {admin_token}"},
  3027. )
  3028. assert upd1.status_code == 200
  3029. upd2 = await async_client.put(
  3030. f"/api/v1/auth/oidc/providers/{provider_id}",
  3031. json={"auto_link_existing_accounts": True},
  3032. headers={"Authorization": f"Bearer {admin_token}"},
  3033. )
  3034. assert upd2.status_code == 422
  3035. @pytest.mark.asyncio
  3036. @pytest.mark.integration
  3037. async def test_partial_update_guard_email_claim(self, async_client: AsyncClient):
  3038. """SEC-6 Combined-State-Guard: email_claim='upn' then auto_link=True → 422 (T1 email_claim path)."""
  3039. admin_token = await _setup_and_login(async_client, "pg_ec_adm", "PgEc123!")
  3040. create_resp = await async_client.post(
  3041. "/api/v1/auth/oidc/providers",
  3042. json={
  3043. "name": "PG-EmailClaim-Test",
  3044. "issuer_url": "https://pg-ec.test",
  3045. "client_id": "pg-ec-client",
  3046. "client_secret": "sec",
  3047. "scopes": "openid email profile",
  3048. },
  3049. headers={"Authorization": f"Bearer {admin_token}"},
  3050. )
  3051. assert create_resp.status_code == 201
  3052. provider_id = create_resp.json()["id"]
  3053. upd1 = await async_client.put(
  3054. f"/api/v1/auth/oidc/providers/{provider_id}",
  3055. json={"email_claim": "upn"},
  3056. headers={"Authorization": f"Bearer {admin_token}"},
  3057. )
  3058. assert upd1.status_code == 200
  3059. upd2 = await async_client.put(
  3060. f"/api/v1/auth/oidc/providers/{provider_id}",
  3061. json={"auto_link_existing_accounts": True},
  3062. headers={"Authorization": f"Bearer {admin_token}"},
  3063. )
  3064. assert upd2.status_code == 422
  3065. @pytest.mark.asyncio
  3066. @pytest.mark.integration
  3067. async def test_partial_update_guard_inverse_order(self, async_client: AsyncClient):
  3068. """T2: auto_link=True first (valid), then require_ev=False → Combined-State-Guard fires (422)."""
  3069. admin_token = await _setup_and_login(async_client, "pg_inv_adm", "PgInv123!")
  3070. create_resp = await async_client.post(
  3071. "/api/v1/auth/oidc/providers",
  3072. json={
  3073. "name": "PG-Inverse-Test",
  3074. "issuer_url": "https://pg-inv.test",
  3075. "client_id": "pg-inv-client",
  3076. "client_secret": "sec",
  3077. "scopes": "openid email profile",
  3078. },
  3079. headers={"Authorization": f"Bearer {admin_token}"},
  3080. )
  3081. assert create_resp.status_code == 201
  3082. provider_id = create_resp.json()["id"]
  3083. # auto_link=True is safe when require_ev=True (default)
  3084. upd1 = await async_client.put(
  3085. f"/api/v1/auth/oidc/providers/{provider_id}",
  3086. json={"auto_link_existing_accounts": True},
  3087. headers={"Authorization": f"Bearer {admin_token}"},
  3088. )
  3089. assert upd1.status_code == 200
  3090. # Disabling require_ev with auto_link already on → unsafe combined state
  3091. upd2 = await async_client.put(
  3092. f"/api/v1/auth/oidc/providers/{provider_id}",
  3093. json={"require_email_verified": False},
  3094. headers={"Authorization": f"Bearer {admin_token}"},
  3095. )
  3096. assert upd2.status_code == 422
  3097. # ── Low5: Response fields verified ───────────────────────────────────────
  3098. @pytest.mark.asyncio
  3099. @pytest.mark.integration
  3100. async def test_create_response_includes_new_fields(self, async_client: AsyncClient):
  3101. """Low5: OIDCProviderResponse must include email_claim and require_email_verified."""
  3102. admin_token = await _setup_and_login(async_client, "resp_adm", "Resp123!")
  3103. resp = await async_client.post(
  3104. "/api/v1/auth/oidc/providers",
  3105. json={
  3106. "name": "ResponseFields-Test",
  3107. "issuer_url": "https://resp.test",
  3108. "client_id": "resp-client",
  3109. "client_secret": "sec",
  3110. "scopes": "openid email profile",
  3111. "email_claim": "preferred_username",
  3112. "require_email_verified": False,
  3113. },
  3114. headers={"Authorization": f"Bearer {admin_token}"},
  3115. )
  3116. assert resp.status_code == 201
  3117. data = resp.json()
  3118. assert data["email_claim"] == "preferred_username"
  3119. assert data["require_email_verified"] is False
  3120. @pytest.mark.asyncio
  3121. @pytest.mark.integration
  3122. async def test_update_response_reflects_new_fields(self, async_client: AsyncClient):
  3123. """Low5: PUT response must reflect updated email_claim and require_email_verified."""
  3124. admin_token = await _setup_and_login(async_client, "upd_resp_adm", "UpdResp123!")
  3125. create_resp = await async_client.post(
  3126. "/api/v1/auth/oidc/providers",
  3127. json={
  3128. "name": "UpdateResponse-Test",
  3129. "issuer_url": "https://upd-resp.test",
  3130. "client_id": "upd-resp-client",
  3131. "client_secret": "sec",
  3132. "scopes": "openid email profile",
  3133. },
  3134. headers={"Authorization": f"Bearer {admin_token}"},
  3135. )
  3136. assert create_resp.status_code == 201
  3137. provider_id = create_resp.json()["id"]
  3138. upd = await async_client.put(
  3139. f"/api/v1/auth/oidc/providers/{provider_id}",
  3140. json={"email_claim": "upn", "require_email_verified": True},
  3141. headers={"Authorization": f"Bearer {admin_token}"},
  3142. )
  3143. assert upd.status_code == 200
  3144. data = upd.json()
  3145. assert data["email_claim"] == "upn"
  3146. assert data["require_email_verified"] is True
  3147. # ===========================================================================
  3148. # TestOIDCEmailClaimValidation — T2: email_claim field validator coverage
  3149. # ===========================================================================
  3150. class TestOIDCEmailClaimValidation:
  3151. """Schema-level validation for the email_claim field."""
  3152. @pytest.mark.asyncio
  3153. @pytest.mark.integration
  3154. async def test_invalid_claim_name_dot_rejected(self, async_client: AsyncClient):
  3155. """email_claim with a dot (log-injection risk) must be rejected."""
  3156. admin_token = await _setup_and_login(async_client, "ecv_adm1", "Ecv123!")
  3157. resp = await async_client.post(
  3158. "/api/v1/auth/oidc/providers",
  3159. json={
  3160. "name": "ECVTest1",
  3161. "issuer_url": "https://ecv1.test",
  3162. "client_id": "ecv1",
  3163. "client_secret": "sec",
  3164. "scopes": "openid email",
  3165. "email_claim": "email.address",
  3166. },
  3167. headers={"Authorization": f"Bearer {admin_token}"},
  3168. )
  3169. assert resp.status_code == 422
  3170. @pytest.mark.asyncio
  3171. @pytest.mark.integration
  3172. async def test_invalid_claim_name_starts_with_digit_rejected(self, async_client: AsyncClient):
  3173. admin_token = await _setup_and_login(async_client, "ecv_adm2", "Ecv123!")
  3174. resp = await async_client.post(
  3175. "/api/v1/auth/oidc/providers",
  3176. json={
  3177. "name": "ECVTest2",
  3178. "issuer_url": "https://ecv2.test",
  3179. "client_id": "ecv2",
  3180. "client_secret": "sec",
  3181. "scopes": "openid email",
  3182. "email_claim": "1invalid",
  3183. },
  3184. headers={"Authorization": f"Bearer {admin_token}"},
  3185. )
  3186. assert resp.status_code == 422
  3187. @pytest.mark.asyncio
  3188. @pytest.mark.integration
  3189. async def test_invalid_claim_name_newline_rejected(self, async_client: AsyncClient):
  3190. """T2 regex-bug guard: re.fullmatch must reject trailing newline."""
  3191. admin_token = await _setup_and_login(async_client, "ecv_adm3", "Ecv123!")
  3192. resp = await async_client.post(
  3193. "/api/v1/auth/oidc/providers",
  3194. json={
  3195. "name": "ECVTest3",
  3196. "issuer_url": "https://ecv3.test",
  3197. "client_id": "ecv3",
  3198. "client_secret": "sec",
  3199. "scopes": "openid email",
  3200. "email_claim": "email\n",
  3201. },
  3202. headers={"Authorization": f"Bearer {admin_token}"},
  3203. )
  3204. assert resp.status_code == 422
  3205. @pytest.mark.asyncio
  3206. @pytest.mark.integration
  3207. async def test_claim_name_65_chars_rejected(self, async_client: AsyncClient):
  3208. """email_claim longer than 64 characters must be rejected."""
  3209. admin_token = await _setup_and_login(async_client, "ecv_adm4", "Ecv123!")
  3210. resp = await async_client.post(
  3211. "/api/v1/auth/oidc/providers",
  3212. json={
  3213. "name": "ECVTest4",
  3214. "issuer_url": "https://ecv4.test",
  3215. "client_id": "ecv4",
  3216. "client_secret": "sec",
  3217. "scopes": "openid email",
  3218. "email_claim": "a" * 65,
  3219. },
  3220. headers={"Authorization": f"Bearer {admin_token}"},
  3221. )
  3222. assert resp.status_code == 422
  3223. @pytest.mark.asyncio
  3224. @pytest.mark.integration
  3225. async def test_valid_claim_name_accepted(self, async_client: AsyncClient):
  3226. """Valid claim names like preferred_username and upn must be accepted."""
  3227. admin_token = await _setup_and_login(async_client, "ecv_adm5", "Ecv123!")
  3228. for claim in ("preferred_username", "upn", "email", "emailAddress"):
  3229. resp = await async_client.post(
  3230. "/api/v1/auth/oidc/providers",
  3231. json={
  3232. "name": f"ECVTest-{claim[:8]}",
  3233. "issuer_url": f"https://ecv-{claim[:8]}.test",
  3234. "client_id": f"ecv-{claim[:8]}",
  3235. "client_secret": "sec",
  3236. "scopes": "openid email",
  3237. "email_claim": claim,
  3238. },
  3239. headers={"Authorization": f"Bearer {admin_token}"},
  3240. )
  3241. assert resp.status_code == 201, f"claim {claim!r} was rejected: {resp.text}"
  3242. # ===========================================================================
  3243. # TestOIDCEmailResolutionExtra — T1 / T3 / T4 additional coverage
  3244. # ===========================================================================
  3245. async def _create_provider_via_api(
  3246. async_client: AsyncClient,
  3247. admin_token: str,
  3248. issuer: str,
  3249. client_id: str,
  3250. *,
  3251. email_claim: str = "email",
  3252. require_email_verified: bool = True,
  3253. suffix: str = "",
  3254. ) -> int:
  3255. resp = await async_client.post(
  3256. "/api/v1/auth/oidc/providers",
  3257. json={
  3258. "name": f"TestIdP-extra-{suffix or secrets.token_hex(4)}",
  3259. "issuer_url": issuer,
  3260. "client_id": client_id,
  3261. "client_secret": "sec",
  3262. "scopes": "openid email profile",
  3263. "is_enabled": True,
  3264. "auto_create_users": True,
  3265. "email_claim": email_claim,
  3266. "require_email_verified": require_email_verified,
  3267. },
  3268. headers={"Authorization": f"Bearer {admin_token}"},
  3269. )
  3270. assert resp.status_code == 201, resp.text
  3271. return resp.json()["id"]
  3272. class TestOIDCEmailResolutionExtra:
  3273. """T1: isinstance guard, T3: SEC-3 normalisation for Fall A/B, T4: inverse Combined-State-Guard."""
  3274. @pytest.mark.asyncio
  3275. @pytest.mark.integration
  3276. async def test_non_string_claim_value_drops_email(
  3277. self,
  3278. async_client: AsyncClient,
  3279. db_session: AsyncSession,
  3280. ):
  3281. """T1: A non-string email_claim value (list) must be silently dropped — no crash."""
  3282. private_pem, jwks_data = _make_test_rsa_key()
  3283. issuer = "https://nonstring-test.example"
  3284. client_id = "nonstring-client"
  3285. admin_token = await _setup_and_login(async_client, "nonstr_adm", "Nonstr123!")
  3286. provider_id = await _create_provider_via_api(
  3287. async_client,
  3288. admin_token,
  3289. issuer,
  3290. client_id,
  3291. email_claim="preferred_username",
  3292. require_email_verified=False,
  3293. suffix="nonstr",
  3294. )
  3295. from sqlalchemy import select
  3296. from backend.app.models.oidc_provider import UserOIDCLink
  3297. # IdP sends preferred_username as a list (non-string) — must not crash
  3298. location = await _run_oidc_callback(
  3299. async_client,
  3300. db_session,
  3301. provider_id=provider_id,
  3302. claims={"sub": "nonstr-sub-1", "preferred_username": ["user@example.com"]},
  3303. private_pem=private_pem,
  3304. jwks_data=jwks_data,
  3305. issuer=issuer,
  3306. client_id=client_id,
  3307. )
  3308. assert "internal_error" not in location, f"Unexpected error redirect: {location}"
  3309. link_result = await db_session.execute(
  3310. select(UserOIDCLink)
  3311. .where(UserOIDCLink.provider_id == provider_id)
  3312. .where(UserOIDCLink.provider_user_id == "nonstr-sub-1")
  3313. )
  3314. link = link_result.scalar_one_or_none()
  3315. assert link is not None
  3316. assert link.provider_email is None # list value dropped
  3317. @pytest.mark.asyncio
  3318. @pytest.mark.integration
  3319. async def test_fall_a_sec3_normalisation(
  3320. self,
  3321. async_client: AsyncClient,
  3322. db_session: AsyncSession,
  3323. ):
  3324. """T3: Fall A — uppercase + whitespace in email claim must be normalised to lowercase."""
  3325. private_pem, jwks_data = _make_test_rsa_key()
  3326. issuer = "https://sec3a-test.example"
  3327. client_id = "sec3a-client"
  3328. admin_token = await _setup_and_login(async_client, "sec3a_adm", "Sec3a123!")
  3329. provider_id = await _create_provider_via_api(
  3330. async_client,
  3331. admin_token,
  3332. issuer,
  3333. client_id,
  3334. email_claim="email",
  3335. require_email_verified=True,
  3336. suffix="sec3a",
  3337. )
  3338. from sqlalchemy import select
  3339. from backend.app.models.oidc_provider import UserOIDCLink
  3340. location = await _run_oidc_callback(
  3341. async_client,
  3342. db_session,
  3343. provider_id=provider_id,
  3344. claims={"sub": "sec3a-sub-1", "email": " USER@EXAMPLE.COM ", "email_verified": True},
  3345. private_pem=private_pem,
  3346. jwks_data=jwks_data,
  3347. issuer=issuer,
  3348. client_id=client_id,
  3349. )
  3350. assert "internal_error" not in location
  3351. link_result = await db_session.execute(
  3352. select(UserOIDCLink)
  3353. .where(UserOIDCLink.provider_id == provider_id)
  3354. .where(UserOIDCLink.provider_user_id == "sec3a-sub-1")
  3355. )
  3356. link = link_result.scalar_one_or_none()
  3357. assert link is not None
  3358. assert link.provider_email == "user@example.com"
  3359. @pytest.mark.asyncio
  3360. @pytest.mark.integration
  3361. async def test_fall_b_sec3_normalisation(
  3362. self,
  3363. async_client: AsyncClient,
  3364. db_session: AsyncSession,
  3365. ):
  3366. """T3: Fall B — uppercase + whitespace in email claim must be normalised to lowercase."""
  3367. private_pem, jwks_data = _make_test_rsa_key()
  3368. issuer = "https://sec3b-test.example"
  3369. client_id = "sec3b-client"
  3370. admin_token = await _setup_and_login(async_client, "sec3b_adm", "Sec3b123!")
  3371. provider_id = await _create_provider_via_api(
  3372. async_client,
  3373. admin_token,
  3374. issuer,
  3375. client_id,
  3376. email_claim="email",
  3377. require_email_verified=False,
  3378. suffix="sec3b",
  3379. )
  3380. from sqlalchemy import select
  3381. from backend.app.models.oidc_provider import UserOIDCLink
  3382. location = await _run_oidc_callback(
  3383. async_client,
  3384. db_session,
  3385. provider_id=provider_id,
  3386. claims={"sub": "sec3b-sub-1", "email": " USER@EXAMPLE.COM "},
  3387. private_pem=private_pem,
  3388. jwks_data=jwks_data,
  3389. issuer=issuer,
  3390. client_id=client_id,
  3391. )
  3392. assert "internal_error" not in location
  3393. link_result = await db_session.execute(
  3394. select(UserOIDCLink)
  3395. .where(UserOIDCLink.provider_id == provider_id)
  3396. .where(UserOIDCLink.provider_user_id == "sec3b-sub-1")
  3397. )
  3398. link = link_result.scalar_one_or_none()
  3399. assert link is not None
  3400. assert link.provider_email == "user@example.com"
  3401. @pytest.mark.asyncio
  3402. @pytest.mark.integration
  3403. async def test_combined_state_guard_email_claim_inverse_order(self, async_client: AsyncClient):
  3404. """T4: Combined-State-Guard — set auto_link=True first, then switch email_claim to custom."""
  3405. admin_token = await _setup_and_login(async_client, "inv_ec_adm", "InvEc123!")
  3406. create_resp = await async_client.post(
  3407. "/api/v1/auth/oidc/providers",
  3408. json={
  3409. "name": "InvEcTest",
  3410. "issuer_url": "https://inv-ec.test",
  3411. "client_id": "inv-ec",
  3412. "client_secret": "sec",
  3413. "scopes": "openid email",
  3414. },
  3415. headers={"Authorization": f"Bearer {admin_token}"},
  3416. )
  3417. assert create_resp.status_code == 201
  3418. provider_id = create_resp.json()["id"]
  3419. # First: enable auto_link (safe — email_claim="email", require_ev=True)
  3420. upd1 = await async_client.put(
  3421. f"/api/v1/auth/oidc/providers/{provider_id}",
  3422. json={"auto_link_existing_accounts": True},
  3423. headers={"Authorization": f"Bearer {admin_token}"},
  3424. )
  3425. assert upd1.status_code == 200
  3426. # Second: switch email_claim to custom while auto_link is on → unsafe combined state
  3427. upd2 = await async_client.put(
  3428. f"/api/v1/auth/oidc/providers/{provider_id}",
  3429. json={"email_claim": "preferred_username"},
  3430. headers={"Authorization": f"Bearer {admin_token}"},
  3431. )
  3432. assert upd2.status_code == 422