test_mfa_api.py 190 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762277227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433243424352436243724382439244024412442244324442445244624472448244924502451245224532454245524562457245824592460246124622463246424652466246724682469247024712472247324742475247624772478247924802481248224832484248524862487248824892490249124922493249424952496249724982499250025012502250325042505250625072508250925102511251225132514251525162517251825192520252125222523252425252526252725282529253025312532253325342535253625372538253925402541254225432544254525462547254825492550255125522553255425552556255725582559256025612562256325642565256625672568256925702571257225732574257525762577257825792580258125822583258425852586258725882589259025912592259325942595259625972598259926002601260226032604260526062607260826092610261126122613261426152616261726182619262026212622262326242625262626272628262926302631263226332634263526362637263826392640264126422643264426452646264726482649265026512652265326542655265626572658265926602661266226632664266526662667266826692670267126722673267426752676267726782679268026812682268326842685268626872688268926902691269226932694269526962697269826992700270127022703270427052706270727082709271027112712271327142715271627172718271927202721272227232724272527262727272827292730273127322733273427352736273727382739274027412742274327442745274627472748274927502751275227532754275527562757275827592760276127622763276427652766276727682769277027712772277327742775277627772778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327732783279328032813282328332843285328632873288328932903291329232933294329532963297329832993300330133023303330433053306330733083309331033113312331333143315331633173318331933203321332233233324332533263327332833293330333133323333333433353336333733383339334033413342334333443345334633473348334933503351335233533354335533563357335833593360336133623363336433653366336733683369337033713372337333743375337633773378337933803381338233833384338533863387338833893390339133923393339433953396339733983399340034013402340334043405340634073408340934103411341234133414341534163417341834193420342134223423342434253426342734283429343034313432343334343435343634373438343934403441344234433444344534463447344834493450345134523453345434553456345734583459346034613462346334643465346634673468346934703471347234733474347534763477347834793480348134823483348434853486348734883489349034913492349334943495349634973498349935003501350235033504350535063507350835093510351135123513351435153516351735183519352035213522352335243525352635273528352935303531353235333534353535363537353835393540354135423543354435453546354735483549355035513552355335543555355635573558355935603561356235633564356535663567356835693570357135723573357435753576357735783579358035813582358335843585358635873588358935903591359235933594359535963597359835993600360136023603360436053606360736083609361036113612361336143615361636173618361936203621362236233624362536263627362836293630363136323633363436353636363736383639364036413642364336443645364636473648364936503651365236533654365536563657365836593660366136623663366436653666366736683669367036713672367336743675367636773678367936803681368236833684368536863687368836893690369136923693369436953696369736983699370037013702370337043705370637073708370937103711371237133714371537163717371837193720372137223723372437253726372737283729373037313732373337343735373637373738373937403741374237433744374537463747374837493750375137523753375437553756375737583759376037613762376337643765376637673768376937703771377237733774377537763777377837793780378137823783378437853786378737883789379037913792379337943795379637973798379938003801380238033804380538063807380838093810381138123813381438153816381738183819382038213822382338243825382638273828382938303831383238333834383538363837383838393840384138423843384438453846384738483849385038513852385338543855385638573858385938603861386238633864386538663867386838693870387138723873387438753876387738783879388038813882388338843885388638873888388938903891389238933894389538963897389838993900390139023903390439053906390739083909391039113912391339143915391639173918391939203921392239233924392539263927392839293930393139323933393439353936393739383939394039413942394339443945394639473948394939503951395239533954395539563957395839593960396139623963396439653966396739683969397039713972397339743975397639773978397939803981398239833984398539863987398839893990399139923993399439953996399739983999400040014002400340044005400640074008400940104011401240134014401540164017401840194020402140224023402440254026402740284029403040314032403340344035403640374038403940404041404240434044404540464047404840494050405140524053405440554056405740584059406040614062406340644065406640674068406940704071407240734074407540764077407840794080408140824083408440854086408740884089409040914092409340944095409640974098409941004101410241034104410541064107410841094110411141124113411441154116411741184119412041214122412341244125412641274128412941304131413241334134413541364137413841394140414141424143414441454146414741484149415041514152415341544155415641574158415941604161416241634164416541664167416841694170417141724173417441754176417741784179418041814182418341844185418641874188418941904191419241934194419541964197419841994200420142024203420442054206420742084209421042114212421342144215421642174218421942204221422242234224422542264227422842294230423142324233423442354236423742384239424042414242424342444245424642474248424942504251425242534254425542564257425842594260426142624263426442654266426742684269427042714272427342744275427642774278427942804281428242834284428542864287428842894290429142924293429442954296429742984299430043014302430343044305430643074308430943104311431243134314431543164317431843194320432143224323432443254326432743284329433043314332433343344335433643374338433943404341434243434344434543464347434843494350435143524353435443554356435743584359436043614362436343644365436643674368436943704371437243734374437543764377437843794380438143824383438443854386438743884389439043914392439343944395439643974398439944004401440244034404440544064407440844094410441144124413441444154416441744184419442044214422442344244425442644274428442944304431443244334434443544364437443844394440444144424443444444454446444744484449445044514452445344544455445644574458445944604461446244634464446544664467446844694470447144724473447444754476447744784479448044814482448344844485448644874488448944904491449244934494449544964497449844994500450145024503450445054506450745084509451045114512451345144515451645174518451945204521452245234524452545264527452845294530453145324533453445354536453745384539454045414542454345444545454645474548454945504551455245534554455545564557455845594560456145624563456445654566456745684569457045714572457345744575457645774578457945804581458245834584458545864587458845894590459145924593459445954596459745984599460046014602460346044605460646074608460946104611461246134614461546164617461846194620462146224623462446254626462746284629463046314632463346344635463646374638463946404641464246434644464546464647464846494650465146524653465446554656465746584659466046614662466346644665466646674668466946704671467246734674467546764677467846794680468146824683468446854686468746884689469046914692469346944695469646974698469947004701470247034704470547064707470847094710471147124713471447154716471747184719472047214722472347244725472647274728472947304731473247334734473547364737473847394740474147424743474447454746474747484749475047514752475347544755475647574758475947604761476247634764476547664767476847694770477147724773477447754776477747784779478047814782478347844785478647874788478947904791479247934794479547964797479847994800480148024803480448054806480748084809481048114812481348144815481648174818481948204821482248234824482548264827482848294830483148324833483448354836483748384839484048414842484348444845
  1. """Integration tests for 2FA and OIDC API endpoints.
  2. Tests the full request/response cycle for:
  3. - GET /api/v1/auth/2fa/status
  4. - POST /api/v1/auth/2fa/totp/setup
  5. - POST /api/v1/auth/2fa/totp/enable
  6. - POST /api/v1/auth/2fa/totp/disable
  7. - POST /api/v1/auth/2fa/email/enable
  8. - POST /api/v1/auth/2fa/email/disable
  9. - POST /api/v1/auth/2fa/verify (TOTP, email, backup paths)
  10. - DELETE /api/v1/auth/2fa/admin/{user_id}
  11. - GET /api/v1/auth/oidc/providers
  12. - POST /api/v1/auth/oidc/providers
  13. - PATCH /api/v1/auth/oidc/providers/{id}
  14. - DELETE /api/v1/auth/oidc/providers/{id}
  15. """
  16. from __future__ import annotations
  17. import secrets
  18. import time
  19. from datetime import datetime, timedelta, timezone
  20. from unittest.mock import patch
  21. import jwt as pyjwt
  22. import pyotp
  23. import pytest
  24. from httpx import AsyncClient
  25. from passlib.context import CryptContext
  26. from sqlalchemy.ext.asyncio import AsyncSession
  27. from backend.app.models.auth_ephemeral import AuthEphemeralToken
  28. from backend.app.models.user import User
  29. _pwd_context = CryptContext(schemes=["pbkdf2_sha256"], deprecated="auto")
  30. # ---------------------------------------------------------------------------
  31. # Fixtures / helpers
  32. # ---------------------------------------------------------------------------
  33. AUTH_SETUP_URL = "/api/v1/auth/setup"
  34. LOGIN_URL = "/api/v1/auth/login"
  35. def _norm_pw(password: str) -> str:
  36. """Ensure password meets complexity requirements (I4: SetupRequest now validates)."""
  37. if not any(c.isupper() for c in password):
  38. password = password[0].upper() + password[1:]
  39. if not any(c not in "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789" for c in password):
  40. password = password + "!"
  41. return password
  42. async def _setup_and_login(client: AsyncClient, username: str, password: str) -> str:
  43. """Enable auth, create an admin user, login, and return the bearer token."""
  44. password = _norm_pw(password)
  45. await client.post(
  46. AUTH_SETUP_URL,
  47. json={
  48. "auth_enabled": True,
  49. "admin_username": username,
  50. "admin_password": password,
  51. },
  52. )
  53. resp = await client.post(LOGIN_URL, json={"username": username, "password": password})
  54. assert resp.status_code == 200
  55. return resp.json()["access_token"]
  56. async def _login_get_pre_auth_token(client: AsyncClient, username: str, password: str) -> str:
  57. """Login a user who has 2FA enabled; return the pre_auth_token from the response."""
  58. password = _norm_pw(password)
  59. resp = await client.post(LOGIN_URL, json={"username": username, "password": password})
  60. assert resp.status_code == 200
  61. data = resp.json()
  62. assert data["requires_2fa"] is True, f"Expected requires_2fa=True, got {data}"
  63. assert data["pre_auth_token"] is not None
  64. return data["pre_auth_token"]
  65. def _auth_header(token: str) -> dict[str, str]:
  66. return {"Authorization": f"Bearer {token}"}
  67. # ===========================================================================
  68. # 2FA Status
  69. # ===========================================================================
  70. class TestTwoFAStatus:
  71. """Tests for GET /api/v1/auth/2fa/status."""
  72. @pytest.mark.asyncio
  73. @pytest.mark.integration
  74. async def test_status_requires_auth(self, async_client: AsyncClient):
  75. response = await async_client.get("/api/v1/auth/2fa/status")
  76. assert response.status_code == 401
  77. @pytest.mark.asyncio
  78. @pytest.mark.integration
  79. async def test_status_default_disabled(self, async_client: AsyncClient):
  80. token = await _setup_and_login(async_client, "statususer", "statuspass123")
  81. response = await async_client.get("/api/v1/auth/2fa/status", headers=_auth_header(token))
  82. assert response.status_code == 200
  83. data = response.json()
  84. assert data["totp_enabled"] is False
  85. assert data["email_otp_enabled"] is False
  86. assert data["backup_codes_remaining"] == 0
  87. # ===========================================================================
  88. # TOTP Setup
  89. # ===========================================================================
  90. class TestTOTPSetup:
  91. """Tests for POST /api/v1/auth/2fa/totp/setup."""
  92. @pytest.mark.asyncio
  93. @pytest.mark.integration
  94. async def test_setup_requires_auth(self, async_client: AsyncClient):
  95. response = await async_client.post("/api/v1/auth/2fa/totp/setup")
  96. assert response.status_code == 401
  97. @pytest.mark.asyncio
  98. @pytest.mark.integration
  99. async def test_setup_returns_secret_and_qr(self, async_client: AsyncClient):
  100. token = await _setup_and_login(async_client, "totpsetup", "totpsetup123")
  101. response = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
  102. assert response.status_code == 200
  103. data = response.json()
  104. assert "secret" in data
  105. assert len(data["secret"]) > 0
  106. assert "qr_code_b64" in data
  107. assert data["issuer"] == "Bambuddy"
  108. @pytest.mark.asyncio
  109. @pytest.mark.integration
  110. async def test_setup_secret_is_valid_base32(self, async_client: AsyncClient):
  111. token = await _setup_and_login(async_client, "totpbase32", "totpbase32pw")
  112. response = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
  113. assert response.status_code == 200
  114. secret = response.json()["secret"]
  115. # pyotp will raise on invalid base32
  116. totp = pyotp.TOTP(secret)
  117. assert len(totp.now()) == 6
  118. # ===========================================================================
  119. # TOTP Enable
  120. # ===========================================================================
  121. class TestTOTPEnable:
  122. """Tests for POST /api/v1/auth/2fa/totp/enable."""
  123. @pytest.mark.asyncio
  124. @pytest.mark.integration
  125. async def test_enable_without_setup_returns_400(self, async_client: AsyncClient):
  126. token = await _setup_and_login(async_client, "nosetupenable", "nosetupenable1")
  127. response = await async_client.post(
  128. "/api/v1/auth/2fa/totp/enable",
  129. json={"code": "123456"},
  130. headers=_auth_header(token),
  131. )
  132. assert response.status_code == 400
  133. @pytest.mark.asyncio
  134. @pytest.mark.integration
  135. async def test_enable_with_invalid_code_returns_400(self, async_client: AsyncClient):
  136. token = await _setup_and_login(async_client, "badcodeuser", "badcodeuser1")
  137. await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
  138. response = await async_client.post(
  139. "/api/v1/auth/2fa/totp/enable",
  140. json={"code": "000000"},
  141. headers=_auth_header(token),
  142. )
  143. assert response.status_code == 400
  144. @pytest.mark.asyncio
  145. @pytest.mark.integration
  146. async def test_enable_with_valid_code_returns_backup_codes(self, async_client: AsyncClient):
  147. token = await _setup_and_login(async_client, "enableok", "enableok123")
  148. setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
  149. secret = setup_resp.json()["secret"]
  150. valid_code = pyotp.TOTP(secret).now()
  151. response = await async_client.post(
  152. "/api/v1/auth/2fa/totp/enable",
  153. json={"code": valid_code},
  154. headers=_auth_header(token),
  155. )
  156. assert response.status_code == 200
  157. data = response.json()
  158. assert "backup_codes" in data
  159. assert len(data["backup_codes"]) == 10
  160. for code in data["backup_codes"]:
  161. assert len(code) == 8
  162. @pytest.mark.asyncio
  163. @pytest.mark.integration
  164. async def test_status_reflects_enabled_totp(self, async_client: AsyncClient):
  165. token = await _setup_and_login(async_client, "statustotp", "statustotp1")
  166. setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
  167. secret = setup_resp.json()["secret"]
  168. valid_code = pyotp.TOTP(secret).now()
  169. await async_client.post(
  170. "/api/v1/auth/2fa/totp/enable",
  171. json={"code": valid_code},
  172. headers=_auth_header(token),
  173. )
  174. status_resp = await async_client.get("/api/v1/auth/2fa/status", headers=_auth_header(token))
  175. data = status_resp.json()
  176. assert data["totp_enabled"] is True
  177. assert data["backup_codes_remaining"] == 10
  178. # ===========================================================================
  179. # TOTP Disable
  180. # ===========================================================================
  181. class TestTOTPDisable:
  182. """Tests for POST /api/v1/auth/2fa/totp/disable."""
  183. @pytest.mark.asyncio
  184. @pytest.mark.integration
  185. async def test_disable_when_not_enabled_returns_400(self, async_client: AsyncClient):
  186. token = await _setup_and_login(async_client, "disablenoenab", "disablenoenab1")
  187. response = await async_client.post(
  188. "/api/v1/auth/2fa/totp/disable",
  189. json={"code": "123456"},
  190. headers=_auth_header(token),
  191. )
  192. assert response.status_code == 400
  193. @pytest.mark.asyncio
  194. @pytest.mark.integration
  195. async def test_disable_with_valid_code(self, async_client: AsyncClient):
  196. token = await _setup_and_login(async_client, "disableok", "disableok123")
  197. setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
  198. secret = setup_resp.json()["secret"]
  199. valid_code = pyotp.TOTP(secret).now()
  200. await async_client.post(
  201. "/api/v1/auth/2fa/totp/enable",
  202. json={"code": valid_code},
  203. headers=_auth_header(token),
  204. )
  205. # Disable with a fresh valid code
  206. disable_code = pyotp.TOTP(secret).now()
  207. response = await async_client.post(
  208. "/api/v1/auth/2fa/totp/disable",
  209. json={"code": disable_code},
  210. headers=_auth_header(token),
  211. )
  212. assert response.status_code == 200
  213. assert "disabled" in response.json()["message"].lower()
  214. # Status should now show disabled
  215. status_resp = await async_client.get("/api/v1/auth/2fa/status", headers=_auth_header(token))
  216. assert status_resp.json()["totp_enabled"] is False
  217. # ===========================================================================
  218. # Email OTP Enable/Disable
  219. # ===========================================================================
  220. class TestEmailOTP:
  221. """Tests for POST /api/v1/auth/2fa/email/enable, /enable/confirm and /disable."""
  222. @pytest.mark.asyncio
  223. @pytest.mark.integration
  224. async def test_enable_email_otp_without_email_returns_400(self, async_client: AsyncClient):
  225. """Users without an email address cannot enable email OTP."""
  226. token = await _setup_and_login(async_client, "noemailuser", "noemailuser1")
  227. response = await async_client.post("/api/v1/auth/2fa/email/enable", headers=_auth_header(token))
  228. assert response.status_code == 400
  229. assert "email" in response.json()["detail"].lower()
  230. @pytest.mark.asyncio
  231. @pytest.mark.integration
  232. async def test_confirm_enable_email_otp_happy_path(self, async_client: AsyncClient, db_session: AsyncSession):
  233. """Confirm step activates email OTP when setup_token + code are valid (C5)."""
  234. token = await _setup_and_login(async_client, "confirmenable", "confirmenable1")
  235. # Give user an email address directly (SMTP not available in tests)
  236. from sqlalchemy import select as sa_select
  237. result = await db_session.execute(sa_select(User).where(User.username == "confirmenable"))
  238. user = result.scalar_one()
  239. user.email = "confirmenable@example.com"
  240. await db_session.commit()
  241. # Inject a known setup token directly into the DB (bypasses SMTP)
  242. code = "123456"
  243. code_hash = _pwd_context.hash(code)
  244. setup_token = secrets.token_urlsafe(32)
  245. db_session.add(
  246. AuthEphemeralToken(
  247. token=setup_token,
  248. token_type="email_otp_setup",
  249. username="confirmenable",
  250. nonce=code_hash,
  251. expires_at=datetime.now(timezone.utc) + timedelta(minutes=10),
  252. )
  253. )
  254. await db_session.commit()
  255. resp = await async_client.post(
  256. "/api/v1/auth/2fa/email/enable/confirm",
  257. json={"setup_token": setup_token, "code": code},
  258. headers=_auth_header(token),
  259. )
  260. assert resp.status_code == 200
  261. status_resp = await async_client.get("/api/v1/auth/2fa/status", headers=_auth_header(token))
  262. assert status_resp.json()["email_otp_enabled"] is True
  263. @pytest.mark.asyncio
  264. @pytest.mark.integration
  265. async def test_confirm_enable_email_otp_wrong_code(self, async_client: AsyncClient, db_session: AsyncSession):
  266. """Wrong code on confirm step returns 400 and does not enable email OTP."""
  267. token = await _setup_and_login(async_client, "confirmwrong", "confirmwrong1")
  268. code_hash = _pwd_context.hash("654321")
  269. setup_token = secrets.token_urlsafe(32)
  270. db_session.add(
  271. AuthEphemeralToken(
  272. token=setup_token,
  273. token_type="email_otp_setup",
  274. username="confirmwrong",
  275. nonce=code_hash,
  276. expires_at=datetime.now(timezone.utc) + timedelta(minutes=10),
  277. )
  278. )
  279. await db_session.commit()
  280. resp = await async_client.post(
  281. "/api/v1/auth/2fa/email/enable/confirm",
  282. json={"setup_token": setup_token, "code": "000000"},
  283. headers=_auth_header(token),
  284. )
  285. assert resp.status_code == 400
  286. @pytest.mark.asyncio
  287. @pytest.mark.integration
  288. async def test_confirm_enable_email_otp_setup_token_is_single_use(
  289. self, async_client: AsyncClient, db_session: AsyncSession
  290. ):
  291. """Setup token is consumed on first use; replay returns 400."""
  292. token = await _setup_and_login(async_client, "confirmonce", "confirmonce1")
  293. code = "111111"
  294. code_hash = _pwd_context.hash(code)
  295. setup_token = secrets.token_urlsafe(32)
  296. db_session.add(
  297. AuthEphemeralToken(
  298. token=setup_token,
  299. token_type="email_otp_setup",
  300. username="confirmonce",
  301. nonce=code_hash,
  302. expires_at=datetime.now(timezone.utc) + timedelta(minutes=10),
  303. )
  304. )
  305. await db_session.commit()
  306. first = await async_client.post(
  307. "/api/v1/auth/2fa/email/enable/confirm",
  308. json={"setup_token": setup_token, "code": code},
  309. headers=_auth_header(token),
  310. )
  311. assert first.status_code == 200
  312. second = await async_client.post(
  313. "/api/v1/auth/2fa/email/enable/confirm",
  314. json={"setup_token": setup_token, "code": code},
  315. headers=_auth_header(token),
  316. )
  317. assert second.status_code == 400
  318. @pytest.mark.asyncio
  319. @pytest.mark.integration
  320. async def test_disable_email_otp_requires_password(self, async_client: AsyncClient):
  321. """Disabling email OTP requires the account password (C6: re-auth)."""
  322. token = await _setup_and_login(async_client, "disemailotp", "disemailotp1")
  323. # Wrong password → 401
  324. response = await async_client.post(
  325. "/api/v1/auth/2fa/email/disable",
  326. json={"password": "wrongpassword"},
  327. headers=_auth_header(token),
  328. )
  329. assert response.status_code == 401
  330. @pytest.mark.asyncio
  331. @pytest.mark.integration
  332. async def test_disable_email_otp_when_enabled(self, async_client: AsyncClient, db_session: AsyncSession):
  333. """Disabling email OTP when enabled turns it off and status reflects that."""
  334. token = await _setup_and_login(async_client, "disemailpw", "disemailpw1")
  335. # Enable email OTP via direct DB injection (no SMTP)
  336. code = "222222"
  337. setup_token = secrets.token_urlsafe(32)
  338. db_session.add(
  339. AuthEphemeralToken(
  340. token=setup_token,
  341. token_type="email_otp_setup",
  342. username="disemailpw",
  343. nonce=_pwd_context.hash(code),
  344. expires_at=datetime.now(timezone.utc) + timedelta(minutes=10),
  345. )
  346. )
  347. await db_session.commit()
  348. await async_client.post(
  349. "/api/v1/auth/2fa/email/enable/confirm",
  350. json={"setup_token": setup_token, "code": code},
  351. headers=_auth_header(token),
  352. )
  353. # Now disable
  354. response = await async_client.post(
  355. "/api/v1/auth/2fa/email/disable",
  356. json={"password": _norm_pw("disemailpw1")},
  357. headers=_auth_header(token),
  358. )
  359. assert response.status_code == 200
  360. status_resp = await async_client.get("/api/v1/auth/2fa/status", headers=_auth_header(token))
  361. assert status_resp.json()["email_otp_enabled"] is False
  362. # ===========================================================================
  363. # 2FA Verify — TOTP path
  364. # ===========================================================================
  365. class TestTwoFAVerifyTOTP:
  366. """Tests for POST /api/v1/auth/2fa/verify using the TOTP method."""
  367. @pytest.mark.asyncio
  368. @pytest.mark.integration
  369. async def test_verify_with_invalid_pre_auth_token(self, async_client: AsyncClient):
  370. response = await async_client.post(
  371. "/api/v1/auth/2fa/verify",
  372. json={"pre_auth_token": "bogus", "method": "totp", "code": "123456"},
  373. )
  374. assert response.status_code == 401
  375. @pytest.mark.asyncio
  376. @pytest.mark.integration
  377. async def test_verify_totp_issues_jwt(self, async_client: AsyncClient):
  378. """Full flow: setup → enable TOTP → login → pre_auth_token → verify → JWT."""
  379. token = await _setup_and_login(async_client, "verifytotpok", "verifytotpok1")
  380. setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
  381. secret = setup_resp.json()["secret"]
  382. valid_code = pyotp.TOTP(secret).now()
  383. await async_client.post(
  384. "/api/v1/auth/2fa/totp/enable",
  385. json={"code": valid_code},
  386. headers=_auth_header(token),
  387. )
  388. # Login now returns requires_2fa=True + pre_auth_token
  389. pre_auth_token = await _login_get_pre_auth_token(async_client, "verifytotpok", "verifytotpok1")
  390. verify_resp = await async_client.post(
  391. "/api/v1/auth/2fa/verify",
  392. json={
  393. "pre_auth_token": pre_auth_token,
  394. "method": "totp",
  395. "code": pyotp.TOTP(secret).now(),
  396. },
  397. )
  398. assert verify_resp.status_code == 200
  399. data = verify_resp.json()
  400. assert "access_token" in data
  401. assert data["token_type"] == "bearer"
  402. assert data["user"]["username"] == "verifytotpok"
  403. @pytest.mark.asyncio
  404. @pytest.mark.integration
  405. async def test_verify_totp_invalid_code(self, async_client: AsyncClient):
  406. token = await _setup_and_login(async_client, "verifybadcode", "verifybadcode1")
  407. setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
  408. secret = setup_resp.json()["secret"]
  409. valid_code = pyotp.TOTP(secret).now()
  410. await async_client.post(
  411. "/api/v1/auth/2fa/totp/enable",
  412. json={"code": valid_code},
  413. headers=_auth_header(token),
  414. )
  415. pre_auth_token = await _login_get_pre_auth_token(async_client, "verifybadcode", "verifybadcode1")
  416. verify_resp = await async_client.post(
  417. "/api/v1/auth/2fa/verify",
  418. json={"pre_auth_token": pre_auth_token, "method": "totp", "code": "000000"},
  419. )
  420. assert verify_resp.status_code == 401
  421. @pytest.mark.asyncio
  422. @pytest.mark.integration
  423. async def test_verify_invalid_method(self, async_client: AsyncClient):
  424. """An invalid 2FA method should return 400 even with a valid pre_auth_token."""
  425. token = await _setup_and_login(async_client, "invalidmethod", "invalidmethod1")
  426. setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
  427. secret = setup_resp.json()["secret"]
  428. valid_code = pyotp.TOTP(secret).now()
  429. await async_client.post(
  430. "/api/v1/auth/2fa/totp/enable",
  431. json={"code": valid_code},
  432. headers=_auth_header(token),
  433. )
  434. pre_auth_token = await _login_get_pre_auth_token(async_client, "invalidmethod", "invalidmethod1")
  435. response = await async_client.post(
  436. "/api/v1/auth/2fa/verify",
  437. json={"pre_auth_token": pre_auth_token, "method": "sms", "code": "123456"},
  438. )
  439. assert response.status_code == 422 # Pydantic Literal validation
  440. # ===========================================================================
  441. # 2FA Verify — Backup code path
  442. # ===========================================================================
  443. class TestTwoFAVerifyBackup:
  444. """Tests for POST /api/v1/auth/2fa/verify using the backup method."""
  445. @pytest.mark.asyncio
  446. @pytest.mark.integration
  447. async def test_verify_with_backup_code(self, async_client: AsyncClient):
  448. token = await _setup_and_login(async_client, "backupcodeok", "backupcodeok1")
  449. setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
  450. secret = setup_resp.json()["secret"]
  451. valid_code = pyotp.TOTP(secret).now()
  452. enable_resp = await async_client.post(
  453. "/api/v1/auth/2fa/totp/enable",
  454. json={"code": valid_code},
  455. headers=_auth_header(token),
  456. )
  457. backup_code = enable_resp.json()["backup_codes"][0]
  458. pre_auth_token = await _login_get_pre_auth_token(async_client, "backupcodeok", "backupcodeok1")
  459. verify_resp = await async_client.post(
  460. "/api/v1/auth/2fa/verify",
  461. json={"pre_auth_token": pre_auth_token, "method": "backup", "code": backup_code},
  462. )
  463. assert verify_resp.status_code == 200
  464. assert "access_token" in verify_resp.json()
  465. @pytest.mark.asyncio
  466. @pytest.mark.integration
  467. async def test_backup_code_is_single_use(self, async_client: AsyncClient):
  468. token = await _setup_and_login(async_client, "backupsingle", "backupsingle1")
  469. setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
  470. secret = setup_resp.json()["secret"]
  471. valid_code = pyotp.TOTP(secret).now()
  472. enable_resp = await async_client.post(
  473. "/api/v1/auth/2fa/totp/enable",
  474. json={"code": valid_code},
  475. headers=_auth_header(token),
  476. )
  477. backup_code = enable_resp.json()["backup_codes"][0]
  478. # First use — should succeed
  479. pre_auth_token = await _login_get_pre_auth_token(async_client, "backupsingle", "backupsingle1")
  480. first_resp = await async_client.post(
  481. "/api/v1/auth/2fa/verify",
  482. json={"pre_auth_token": pre_auth_token, "method": "backup", "code": backup_code},
  483. )
  484. assert first_resp.status_code == 200
  485. # Second use of the same code — must fail (need new pre_auth_token + same backup code)
  486. pre_auth_token2 = await _login_get_pre_auth_token(async_client, "backupsingle", "backupsingle1")
  487. second_resp = await async_client.post(
  488. "/api/v1/auth/2fa/verify",
  489. json={"pre_auth_token": pre_auth_token2, "method": "backup", "code": backup_code},
  490. )
  491. assert second_resp.status_code == 401
  492. @pytest.mark.asyncio
  493. @pytest.mark.integration
  494. async def test_backup_code_count_decrements(self, async_client: AsyncClient):
  495. token = await _setup_and_login(async_client, "backupcount", "backupcount1")
  496. setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
  497. secret = setup_resp.json()["secret"]
  498. valid_code = pyotp.TOTP(secret).now()
  499. enable_resp = await async_client.post(
  500. "/api/v1/auth/2fa/totp/enable",
  501. json={"code": valid_code},
  502. headers=_auth_header(token),
  503. )
  504. backup_code = enable_resp.json()["backup_codes"][0]
  505. pre_auth_token = await _login_get_pre_auth_token(async_client, "backupcount", "backupcount1")
  506. await async_client.post(
  507. "/api/v1/auth/2fa/verify",
  508. json={"pre_auth_token": pre_auth_token, "method": "backup", "code": backup_code},
  509. )
  510. # Status is readable with the original full token (still valid)
  511. status_resp = await async_client.get("/api/v1/auth/2fa/status", headers=_auth_header(token))
  512. assert status_resp.json()["backup_codes_remaining"] == 9
  513. # ===========================================================================
  514. # Rate Limiting
  515. # ===========================================================================
  516. class TestRateLimiting:
  517. """Ensure 429 is returned after 5 failed 2FA attempts."""
  518. @pytest.mark.asyncio
  519. @pytest.mark.integration
  520. async def test_rate_limit_lockout(self, async_client: AsyncClient):
  521. """After 5 failed TOTP attempts the 6th must return 429."""
  522. token = await _setup_and_login(async_client, "ratelimituser", "ratelimituser1")
  523. setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
  524. secret = setup_resp.json()["secret"]
  525. valid_code = pyotp.TOTP(secret).now()
  526. await async_client.post(
  527. "/api/v1/auth/2fa/totp/enable",
  528. json={"code": valid_code},
  529. headers=_auth_header(token),
  530. )
  531. # 5 failed attempts via the login → pre_auth_token → verify flow
  532. for _ in range(5):
  533. pre_auth_token = await _login_get_pre_auth_token(async_client, "ratelimituser", "ratelimituser1")
  534. await async_client.post(
  535. "/api/v1/auth/2fa/verify",
  536. json={"pre_auth_token": pre_auth_token, "method": "totp", "code": "000000"},
  537. )
  538. # 6th attempt should hit the rate limit
  539. pre_auth_token = await _login_get_pre_auth_token(async_client, "ratelimituser", "ratelimituser1")
  540. response = await async_client.post(
  541. "/api/v1/auth/2fa/verify",
  542. json={"pre_auth_token": pre_auth_token, "method": "totp", "code": "000000"},
  543. )
  544. assert response.status_code == 429
  545. # ===========================================================================
  546. # Admin 2FA Disable
  547. # ===========================================================================
  548. class TestAdminDisable2FA:
  549. """Tests for DELETE /api/v1/auth/2fa/admin/{user_id}."""
  550. @pytest.mark.asyncio
  551. @pytest.mark.integration
  552. async def test_admin_disable_requires_admin(self, async_client: AsyncClient):
  553. """Only admins can use the admin disable endpoint."""
  554. # The only user in a fresh setup IS admin, so just check the 404 path
  555. token = await _setup_and_login(async_client, "admincheck", "admincheck123")
  556. # Try to disable for a non-existent user_id — should get 200 (no-op) or 404
  557. response = await async_client.request(
  558. "DELETE",
  559. "/api/v1/auth/2fa/admin/99999",
  560. json={"admin_password": _norm_pw("admincheck123")},
  561. headers=_auth_header(token),
  562. )
  563. # Admin users succeed regardless (returns 200 even if user doesn't exist)
  564. assert response.status_code == 200
  565. @pytest.mark.asyncio
  566. @pytest.mark.integration
  567. async def test_admin_disable_clears_totp(self, async_client: AsyncClient):
  568. from sqlalchemy import select
  569. from backend.app.models.user import User
  570. token = await _setup_and_login(async_client, "admintotp", "admintotp123")
  571. setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
  572. secret = setup_resp.json()["secret"]
  573. valid_code = pyotp.TOTP(secret).now()
  574. await async_client.post(
  575. "/api/v1/auth/2fa/totp/enable",
  576. json={"code": valid_code},
  577. headers=_auth_header(token),
  578. )
  579. # Find the user's id by querying status (which works with the token)
  580. me_resp = await async_client.get("/api/v1/auth/me", headers=_auth_header(token))
  581. user_id = me_resp.json()["id"]
  582. response = await async_client.request(
  583. "DELETE",
  584. f"/api/v1/auth/2fa/admin/{user_id}",
  585. json={"admin_password": _norm_pw("admintotp123")},
  586. headers=_auth_header(token),
  587. )
  588. assert response.status_code == 200
  589. # I2: admin_disable_2fa bumps password_changed_at, invalidating the old token.
  590. # Re-login to get a fresh token before checking status.
  591. new_login = await async_client.post(
  592. LOGIN_URL, json={"username": "admintotp", "password": _norm_pw("admintotp123")}
  593. )
  594. assert new_login.status_code == 200, f"re-login failed: {new_login.json()}"
  595. assert new_login.json().get("requires_2fa") is False, f"still requires 2FA: {new_login.json()}"
  596. new_token = new_login.json()["access_token"]
  597. assert new_token is not None, f"no access_token in: {new_login.json()}"
  598. # Status should now show TOTP disabled
  599. status_resp = await async_client.get("/api/v1/auth/2fa/status", headers=_auth_header(new_token))
  600. assert status_resp.status_code == 200, f"status check failed: {status_resp.json()}"
  601. assert status_resp.json()["totp_enabled"] is False
  602. # ===========================================================================
  603. # OIDC Provider CRUD
  604. # ===========================================================================
  605. class TestOIDCProviders:
  606. """Tests for OIDC provider management endpoints."""
  607. @pytest.mark.asyncio
  608. @pytest.mark.integration
  609. async def test_list_public_providers_empty(self, async_client: AsyncClient):
  610. response = await async_client.get("/api/v1/auth/oidc/providers")
  611. assert response.status_code == 200
  612. assert isinstance(response.json(), list)
  613. @pytest.mark.asyncio
  614. @pytest.mark.integration
  615. async def test_create_provider_requires_admin(self, async_client: AsyncClient):
  616. token = await _setup_and_login(async_client, "oidcadmincreate", "oidcadmincreate1")
  617. response = await async_client.post(
  618. "/api/v1/auth/oidc/providers",
  619. json={
  620. "name": "PocketID",
  621. "issuer_url": "https://auth.example.com",
  622. "client_id": "bambuddy",
  623. "client_secret": "supersecret",
  624. "scopes": "openid email profile",
  625. "is_enabled": True,
  626. "auto_create_users": False,
  627. },
  628. headers=_auth_header(token),
  629. )
  630. assert response.status_code == 201
  631. data = response.json()
  632. assert data["name"] == "PocketID"
  633. assert data["issuer_url"] == "https://auth.example.com"
  634. assert "client_secret" not in data # Secret must not be returned
  635. @pytest.mark.asyncio
  636. @pytest.mark.integration
  637. async def test_created_provider_appears_in_all_list(self, async_client: AsyncClient):
  638. token = await _setup_and_login(async_client, "oidclistall", "oidclistall123")
  639. await async_client.post(
  640. "/api/v1/auth/oidc/providers",
  641. json={
  642. "name": "TestProvider",
  643. "issuer_url": "https://test.example.com",
  644. "client_id": "testclient",
  645. "client_secret": "testsecret",
  646. "scopes": "openid",
  647. "is_enabled": True,
  648. "auto_create_users": False,
  649. },
  650. headers=_auth_header(token),
  651. )
  652. response = await async_client.get("/api/v1/auth/oidc/providers/all", headers=_auth_header(token))
  653. assert response.status_code == 200
  654. names = [p["name"] for p in response.json()]
  655. assert "TestProvider" in names
  656. @pytest.mark.asyncio
  657. @pytest.mark.integration
  658. async def test_disabled_provider_not_in_public_list(self, async_client: AsyncClient):
  659. token = await _setup_and_login(async_client, "oidcdisabled", "oidcdisabled1")
  660. await async_client.post(
  661. "/api/v1/auth/oidc/providers",
  662. json={
  663. "name": "DisabledProvider",
  664. "issuer_url": "https://disabled.example.com",
  665. "client_id": "dc",
  666. "client_secret": "ds",
  667. "scopes": "openid",
  668. "is_enabled": False,
  669. "auto_create_users": False,
  670. },
  671. headers=_auth_header(token),
  672. )
  673. response = await async_client.get("/api/v1/auth/oidc/providers")
  674. names = [p["name"] for p in response.json()]
  675. assert "DisabledProvider" not in names
  676. @pytest.mark.asyncio
  677. @pytest.mark.integration
  678. async def test_update_provider(self, async_client: AsyncClient):
  679. token = await _setup_and_login(async_client, "oidcupdate", "oidcupdate123")
  680. create_resp = await async_client.post(
  681. "/api/v1/auth/oidc/providers",
  682. json={
  683. "name": "OldName",
  684. "issuer_url": "https://update.example.com",
  685. "client_id": "uc",
  686. "client_secret": "us",
  687. "scopes": "openid",
  688. "is_enabled": True,
  689. "auto_create_users": False,
  690. },
  691. headers=_auth_header(token),
  692. )
  693. provider_id = create_resp.json()["id"]
  694. put_resp = await async_client.put(
  695. f"/api/v1/auth/oidc/providers/{provider_id}",
  696. json={"name": "NewName"},
  697. headers=_auth_header(token),
  698. )
  699. assert put_resp.status_code == 200
  700. assert put_resp.json()["name"] == "NewName"
  701. @pytest.mark.asyncio
  702. @pytest.mark.integration
  703. async def test_delete_provider(self, async_client: AsyncClient):
  704. token = await _setup_and_login(async_client, "oidcdelete", "oidcdelete123")
  705. create_resp = await async_client.post(
  706. "/api/v1/auth/oidc/providers",
  707. json={
  708. "name": "ToDelete",
  709. "issuer_url": "https://delete.example.com",
  710. "client_id": "dc",
  711. "client_secret": "ds",
  712. "scopes": "openid",
  713. "is_enabled": True,
  714. "auto_create_users": False,
  715. },
  716. headers=_auth_header(token),
  717. )
  718. provider_id = create_resp.json()["id"]
  719. del_resp = await async_client.delete(
  720. f"/api/v1/auth/oidc/providers/{provider_id}",
  721. headers=_auth_header(token),
  722. )
  723. assert del_resp.status_code == 200
  724. # No longer in list
  725. all_resp = await async_client.get("/api/v1/auth/oidc/providers/all", headers=_auth_header(token))
  726. ids = [p["id"] for p in all_resp.json()]
  727. assert provider_id not in ids
  728. @pytest.mark.asyncio
  729. @pytest.mark.integration
  730. async def test_update_nonexistent_provider_returns_404(self, async_client: AsyncClient):
  731. token = await _setup_and_login(async_client, "oidc404", "oidc404pass1")
  732. response = await async_client.put(
  733. "/api/v1/auth/oidc/providers/99999",
  734. json={"name": "ghost"},
  735. headers=_auth_header(token),
  736. )
  737. assert response.status_code == 404
  738. @pytest.mark.asyncio
  739. @pytest.mark.integration
  740. async def test_create_provider_with_default_group_id(self, async_client: AsyncClient, db_session: AsyncSession):
  741. """Creating a provider with a valid default_group_id stores and returns the value."""
  742. from sqlalchemy import select
  743. from backend.app.models.group import Group
  744. token = await _setup_and_login(async_client, "oidcdg_create", "OidcDgCreate1!")
  745. grp_result = await db_session.execute(select(Group).where(Group.name == "Operators"))
  746. operators = grp_result.scalar_one()
  747. resp = await async_client.post(
  748. "/api/v1/auth/oidc/providers",
  749. json={
  750. "name": "DgCreateProvider",
  751. "issuer_url": "https://dgcreate.example.com",
  752. "client_id": "dgcreate-client",
  753. "client_secret": "secret",
  754. "scopes": "openid",
  755. "is_enabled": True,
  756. "auto_create_users": False,
  757. "default_group_id": operators.id,
  758. },
  759. headers=_auth_header(token),
  760. )
  761. assert resp.status_code == 201, resp.text
  762. assert resp.json()["default_group_id"] == operators.id
  763. @pytest.mark.asyncio
  764. @pytest.mark.integration
  765. async def test_create_provider_invalid_default_group_id_returns_422(self, async_client: AsyncClient):
  766. """A default_group_id referencing a non-existent group returns 422."""
  767. token = await _setup_and_login(async_client, "oidcdg_bad", "OidcDgBad1!")
  768. resp = await async_client.post(
  769. "/api/v1/auth/oidc/providers",
  770. json={
  771. "name": "DgBadProvider",
  772. "issuer_url": "https://dgbad.example.com",
  773. "client_id": "dgbad-client",
  774. "client_secret": "secret",
  775. "scopes": "openid",
  776. "is_enabled": True,
  777. "auto_create_users": False,
  778. "default_group_id": 999999,
  779. },
  780. headers=_auth_header(token),
  781. )
  782. assert resp.status_code == 422
  783. @pytest.mark.asyncio
  784. @pytest.mark.integration
  785. async def test_create_provider_omit_default_group_id_stores_null(self, async_client: AsyncClient):
  786. """Omitting default_group_id results in null in the response."""
  787. token = await _setup_and_login(async_client, "oidcdg_null", "OidcDgNull1!")
  788. resp = await async_client.post(
  789. "/api/v1/auth/oidc/providers",
  790. json={
  791. "name": "DgNullProvider",
  792. "issuer_url": "https://dgnull.example.com",
  793. "client_id": "dgnull-client",
  794. "client_secret": "secret",
  795. "scopes": "openid",
  796. "is_enabled": True,
  797. "auto_create_users": False,
  798. },
  799. headers=_auth_header(token),
  800. )
  801. assert resp.status_code == 201, resp.text
  802. assert resp.json()["default_group_id"] is None
  803. @pytest.mark.asyncio
  804. @pytest.mark.integration
  805. async def test_update_provider_default_group_id(self, async_client: AsyncClient, db_session: AsyncSession):
  806. """Updating default_group_id via PUT stores the new value."""
  807. from sqlalchemy import select
  808. from backend.app.models.group import Group
  809. token = await _setup_and_login(async_client, "oidcdg_update", "OidcDgUpdate1!")
  810. create_resp = await async_client.post(
  811. "/api/v1/auth/oidc/providers",
  812. json={
  813. "name": "DgUpdateProvider",
  814. "issuer_url": "https://dgupdate.example.com",
  815. "client_id": "dgupdate-client",
  816. "client_secret": "secret",
  817. "scopes": "openid",
  818. "is_enabled": True,
  819. "auto_create_users": False,
  820. },
  821. headers=_auth_header(token),
  822. )
  823. provider_id = create_resp.json()["id"]
  824. assert create_resp.json()["default_group_id"] is None
  825. grp_result = await db_session.execute(select(Group).where(Group.name == "Operators"))
  826. operators = grp_result.scalar_one()
  827. put_resp = await async_client.put(
  828. f"/api/v1/auth/oidc/providers/{provider_id}",
  829. json={"default_group_id": operators.id},
  830. headers=_auth_header(token),
  831. )
  832. assert put_resp.status_code == 200, put_resp.text
  833. assert put_resp.json()["default_group_id"] == operators.id
  834. @pytest.mark.asyncio
  835. @pytest.mark.integration
  836. async def test_default_group_id_in_public_and_admin_list(self, async_client: AsyncClient, db_session: AsyncSession):
  837. """default_group_id appears in both the public and admin list responses."""
  838. from sqlalchemy import select
  839. from backend.app.models.group import Group
  840. token = await _setup_and_login(async_client, "oidcdg_list", "OidcDgList1!")
  841. grp_result = await db_session.execute(select(Group).where(Group.name == "Operators"))
  842. operators = grp_result.scalar_one()
  843. create_resp = await async_client.post(
  844. "/api/v1/auth/oidc/providers",
  845. json={
  846. "name": "DgListProvider",
  847. "issuer_url": "https://dglist.example.com",
  848. "client_id": "dglist-client",
  849. "client_secret": "secret",
  850. "scopes": "openid",
  851. "is_enabled": True,
  852. "auto_create_users": False,
  853. "default_group_id": operators.id,
  854. },
  855. headers=_auth_header(token),
  856. )
  857. provider_id = create_resp.json()["id"]
  858. all_resp = await async_client.get("/api/v1/auth/oidc/providers/all", headers=_auth_header(token))
  859. match = next((p for p in all_resp.json() if p["id"] == provider_id), None)
  860. assert match is not None
  861. assert match["default_group_id"] == operators.id
  862. pub_resp = await async_client.get("/api/v1/auth/oidc/providers")
  863. pub_match = next((p for p in pub_resp.json() if p["id"] == provider_id), None)
  864. assert pub_match is not None
  865. assert pub_match["default_group_id"] == operators.id
  866. # ===========================================================================
  867. # Security: pre-auth token single-use
  868. # ===========================================================================
  869. class TestPreAuthTokenSingleUse:
  870. """pre_auth_token must be consumed on successful 2FA and cannot be reused."""
  871. @pytest.mark.asyncio
  872. @pytest.mark.integration
  873. async def test_pre_auth_token_is_single_use(self, async_client: AsyncClient):
  874. """A pre_auth_token that was successfully used cannot be reused."""
  875. token = await _setup_and_login(async_client, "singleusepat", "singleusepat1")
  876. setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
  877. secret = setup_resp.json()["secret"]
  878. valid_code = pyotp.TOTP(secret).now()
  879. await async_client.post(
  880. "/api/v1/auth/2fa/totp/enable",
  881. json={"code": valid_code},
  882. headers=_auth_header(token),
  883. )
  884. pre_auth_token = await _login_get_pre_auth_token(async_client, "singleusepat", "singleusepat1")
  885. # First use — succeeds
  886. first = await async_client.post(
  887. "/api/v1/auth/2fa/verify",
  888. json={"pre_auth_token": pre_auth_token, "method": "totp", "code": pyotp.TOTP(secret).now()},
  889. )
  890. assert first.status_code == 200
  891. # Second use of the same token — must fail (token already consumed on success)
  892. second = await async_client.post(
  893. "/api/v1/auth/2fa/verify",
  894. json={"pre_auth_token": pre_auth_token, "method": "totp", "code": pyotp.TOTP(secret).now()},
  895. )
  896. assert second.status_code == 401
  897. @pytest.mark.asyncio
  898. @pytest.mark.integration
  899. async def test_pre_auth_token_survives_wrong_code(self, async_client: AsyncClient):
  900. """A wrong 2FA code must NOT burn the pre_auth_token (user can retry)."""
  901. token = await _setup_and_login(async_client, "survivepatuser", "survivepatuser1")
  902. setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
  903. secret = setup_resp.json()["secret"]
  904. valid_code = pyotp.TOTP(secret).now()
  905. await async_client.post(
  906. "/api/v1/auth/2fa/totp/enable",
  907. json={"code": valid_code},
  908. headers=_auth_header(token),
  909. )
  910. pre_auth_token = await _login_get_pre_auth_token(async_client, "survivepatuser", "survivepatuser1")
  911. # Wrong code — should fail but not burn the token
  912. bad = await async_client.post(
  913. "/api/v1/auth/2fa/verify",
  914. json={"pre_auth_token": pre_auth_token, "method": "totp", "code": "000000"},
  915. )
  916. assert bad.status_code == 401
  917. # Same token, correct code — should succeed (token still valid)
  918. good = await async_client.post(
  919. "/api/v1/auth/2fa/verify",
  920. json={"pre_auth_token": pre_auth_token, "method": "totp", "code": pyotp.TOTP(secret).now()},
  921. )
  922. assert good.status_code == 200
  923. # ===========================================================================
  924. # Security: cross-user token isolation
  925. # ===========================================================================
  926. class TestCrossUserTokenIsolation:
  927. """A pre_auth_token issued for user A cannot authenticate as user B."""
  928. @pytest.mark.asyncio
  929. @pytest.mark.integration
  930. async def test_token_cannot_be_used_for_different_user(self, async_client: AsyncClient):
  931. """pre_auth_token is bound to the issuing user; using it to verify a different
  932. user's TOTP code must fail."""
  933. # Set up two users with TOTP
  934. token_a = await _setup_and_login(async_client, "crossusera", "crossusera1")
  935. setup_a = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token_a))
  936. secret_a = setup_a.json()["secret"]
  937. await async_client.post(
  938. "/api/v1/auth/2fa/totp/enable",
  939. json={"code": pyotp.TOTP(secret_a).now()},
  940. headers=_auth_header(token_a),
  941. )
  942. # Get pre_auth_token for user A
  943. pre_auth_a = await _login_get_pre_auth_token(async_client, "crossusera", "crossusera1")
  944. # Try to use user A's token but supply a clearly invalid code — must fail
  945. resp = await async_client.post(
  946. "/api/v1/auth/2fa/verify",
  947. json={"pre_auth_token": pre_auth_a, "method": "totp", "code": "000000"},
  948. )
  949. assert resp.status_code == 401
  950. # ===========================================================================
  951. # Security: admin disable non-admin rejection
  952. # ===========================================================================
  953. class TestAdminDisableNonAdminRejection:
  954. """Non-admin users must be rejected from the admin disable endpoint."""
  955. @pytest.mark.asyncio
  956. @pytest.mark.integration
  957. async def test_non_admin_cannot_disable_2fa(self, async_client: AsyncClient):
  958. """A regular (non-admin) user must receive 403 from DELETE /auth/2fa/admin/{id}."""
  959. # Set up admin, then create a regular user
  960. admin_token = await _setup_and_login(async_client, "adminusr2fa", "adminusr2fa1")
  961. # Create a regular user via user management
  962. create_resp = await async_client.post(
  963. "/api/v1/users",
  964. json={"username": "regularusr2fa", "password": "Regularusr2fa1!"},
  965. headers=_auth_header(admin_token),
  966. )
  967. assert create_resp.status_code == 201
  968. # Login as regular user
  969. login_resp = await async_client.post(
  970. LOGIN_URL,
  971. json={"username": "regularusr2fa", "password": "Regularusr2fa1!"},
  972. )
  973. regular_token = login_resp.json()["access_token"]
  974. # Try to call admin endpoint with the regular user's token
  975. resp = await async_client.delete(
  976. f"/api/v1/auth/2fa/admin/{create_resp.json()['id']}",
  977. headers=_auth_header(regular_token),
  978. )
  979. assert resp.status_code == 403
  980. # ===========================================================================
  981. # Regenerate backup codes
  982. # ===========================================================================
  983. class TestRegenerateBackupCodes:
  984. """Tests for POST /api/v1/auth/2fa/totp/regenerate-backup-codes."""
  985. @pytest.mark.asyncio
  986. @pytest.mark.integration
  987. async def test_regenerate_requires_totp_enabled(self, async_client: AsyncClient):
  988. token = await _setup_and_login(async_client, "regennototp", "regennototp1")
  989. resp = await async_client.post(
  990. "/api/v1/auth/2fa/totp/regenerate-backup-codes",
  991. json={"code": "123456"},
  992. headers=_auth_header(token),
  993. )
  994. assert resp.status_code == 400
  995. @pytest.mark.asyncio
  996. @pytest.mark.integration
  997. async def test_regenerate_invalidates_old_codes(self, async_client: AsyncClient):
  998. """After regenerating, old backup codes must no longer work."""
  999. token = await _setup_and_login(async_client, "regeninval", "regeninval1")
  1000. setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
  1001. secret = setup_resp.json()["secret"]
  1002. enable_resp = await async_client.post(
  1003. "/api/v1/auth/2fa/totp/enable",
  1004. json={"code": pyotp.TOTP(secret).now()},
  1005. headers=_auth_header(token),
  1006. )
  1007. old_backup = enable_resp.json()["backup_codes"][0]
  1008. # Regenerate backup codes
  1009. regen_resp = await async_client.post(
  1010. "/api/v1/auth/2fa/totp/regenerate-backup-codes",
  1011. json={"code": pyotp.TOTP(secret).now()},
  1012. headers=_auth_header(token),
  1013. )
  1014. assert regen_resp.status_code == 200
  1015. new_codes = regen_resp.json()["backup_codes"]
  1016. assert len(new_codes) == 10
  1017. assert old_backup not in new_codes
  1018. # Old backup code must now fail
  1019. pre_auth_token = await _login_get_pre_auth_token(async_client, "regeninval", "regeninval1")
  1020. fail_resp = await async_client.post(
  1021. "/api/v1/auth/2fa/verify",
  1022. json={"pre_auth_token": pre_auth_token, "method": "backup", "code": old_backup},
  1023. )
  1024. assert fail_resp.status_code == 401
  1025. @pytest.mark.asyncio
  1026. @pytest.mark.integration
  1027. async def test_regenerate_with_invalid_code_fails(self, async_client: AsyncClient):
  1028. token = await _setup_and_login(async_client, "regeninvcode", "regeninvcode1")
  1029. setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
  1030. secret = setup_resp.json()["secret"]
  1031. await async_client.post(
  1032. "/api/v1/auth/2fa/totp/enable",
  1033. json={"code": pyotp.TOTP(secret).now()},
  1034. headers=_auth_header(token),
  1035. )
  1036. resp = await async_client.post(
  1037. "/api/v1/auth/2fa/totp/regenerate-backup-codes",
  1038. json={"code": "000000"},
  1039. headers=_auth_header(token),
  1040. )
  1041. assert resp.status_code == 400
  1042. # ===========================================================================
  1043. # Security: method field validation
  1044. # ===========================================================================
  1045. class TestVerifyMethodValidation:
  1046. """The method field must be one of totp/email/backup (Pydantic Literal)."""
  1047. @pytest.mark.asyncio
  1048. @pytest.mark.integration
  1049. async def test_invalid_method_rejected_by_schema(self, async_client: AsyncClient):
  1050. """Pydantic should reject unknown method values with 422."""
  1051. resp = await async_client.post(
  1052. "/api/v1/auth/2fa/verify",
  1053. json={"pre_auth_token": "anytoken", "code": "123456", "method": "sms"},
  1054. )
  1055. assert resp.status_code == 422
  1056. @pytest.mark.asyncio
  1057. @pytest.mark.integration
  1058. async def test_oversized_pre_auth_token_rejected(self, async_client: AsyncClient):
  1059. """pre_auth_token exceeding max_length=128 should be rejected with 422."""
  1060. resp = await async_client.post(
  1061. "/api/v1/auth/2fa/verify",
  1062. json={"pre_auth_token": "x" * 200, "code": "123456", "method": "totp"},
  1063. )
  1064. assert resp.status_code == 422
  1065. # ===========================================================================
  1066. # Login response shape for 2FA users
  1067. # ===========================================================================
  1068. class TestLoginResponseShape:
  1069. """Login for a 2FA-enabled user must return requires_2fa+pre_auth_token
  1070. and must NOT include access_token (which would bypass the 2FA gate)."""
  1071. @pytest.mark.asyncio
  1072. @pytest.mark.integration
  1073. async def test_login_2fa_user_omits_access_token(self, async_client: AsyncClient):
  1074. """A user with TOTP enabled must not receive an access_token on /auth/login."""
  1075. token = await _setup_and_login(async_client, "loginshape", "loginshape1")
  1076. setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
  1077. secret = setup_resp.json()["secret"]
  1078. await async_client.post(
  1079. "/api/v1/auth/2fa/totp/enable",
  1080. json={"code": pyotp.TOTP(secret).now()},
  1081. headers=_auth_header(token),
  1082. )
  1083. login_resp = await async_client.post(LOGIN_URL, json={"username": "loginshape", "password": "Loginshape1!"})
  1084. assert login_resp.status_code == 200
  1085. data = login_resp.json()
  1086. assert data.get("requires_2fa") is True
  1087. assert data.get("pre_auth_token") is not None
  1088. # access_token must NOT be present — it would bypass the 2FA gate
  1089. assert "access_token" not in data or data["access_token"] is None
  1090. # ===========================================================================
  1091. # TOTP replay protection
  1092. # ===========================================================================
  1093. async def _setup_totp_user(client: AsyncClient, username: str, password: str) -> tuple[str, str]:
  1094. """Create user, set up and enable TOTP; return (bearer_token, totp_secret)."""
  1095. token = await _setup_and_login(client, username, password)
  1096. setup_resp = await client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
  1097. secret = setup_resp.json()["secret"]
  1098. await client.post(
  1099. "/api/v1/auth/2fa/totp/enable",
  1100. json={"code": pyotp.TOTP(secret).now()},
  1101. headers=_auth_header(token),
  1102. )
  1103. return token, secret
  1104. class TestTOTPReplay:
  1105. """The same TOTP code must not be accepted twice within one 30-second window."""
  1106. @pytest.mark.asyncio
  1107. @pytest.mark.integration
  1108. async def test_totp_replay_rejected_on_verify(self, async_client: AsyncClient):
  1109. """Replaying the same code on /2fa/verify must return 400."""
  1110. _token, secret = await _setup_totp_user(async_client, "replayverify", "replayverify1")
  1111. code = pyotp.TOTP(secret).now()
  1112. pre_auth = await _login_get_pre_auth_token(async_client, "replayverify", "replayverify1")
  1113. first = await async_client.post(
  1114. "/api/v1/auth/2fa/verify",
  1115. json={"pre_auth_token": pre_auth, "method": "totp", "code": code},
  1116. )
  1117. assert first.status_code == 200
  1118. # Second login to get a fresh pre_auth_token (first was consumed)
  1119. pre_auth2 = await _login_get_pre_auth_token(async_client, "replayverify", "replayverify1")
  1120. second = await async_client.post(
  1121. "/api/v1/auth/2fa/verify",
  1122. json={"pre_auth_token": pre_auth2, "method": "totp", "code": code},
  1123. )
  1124. assert second.status_code == 400
  1125. @pytest.mark.asyncio
  1126. @pytest.mark.integration
  1127. async def test_totp_replay_rejected_on_disable(self, async_client: AsyncClient):
  1128. """A code already used in verify_2fa must be rejected on /2fa/totp/disable."""
  1129. _setup_token, secret = await _setup_totp_user(async_client, "replaydisable", "replaydisable1")
  1130. code = pyotp.TOTP(secret).now()
  1131. # Use the code in verify_2fa — this sets last_totp_counter in DB
  1132. pre_auth = await _login_get_pre_auth_token(async_client, "replaydisable", "replaydisable1")
  1133. verify_resp = await async_client.post(
  1134. "/api/v1/auth/2fa/verify",
  1135. json={"pre_auth_token": pre_auth, "method": "totp", "code": code},
  1136. )
  1137. assert verify_resp.status_code == 200
  1138. authed_token = verify_resp.json()["access_token"]
  1139. # Replay the same code on disable — must be rejected (same 30-second window)
  1140. disable_resp = await async_client.post(
  1141. "/api/v1/auth/2fa/totp/disable",
  1142. json={"code": code},
  1143. headers=_auth_header(authed_token),
  1144. )
  1145. assert disable_resp.status_code == 400
  1146. # ===========================================================================
  1147. # Rate limiting on disable_totp and regenerate_backup_codes (I10)
  1148. # ===========================================================================
  1149. class TestRateLimitingDisableRegenerate:
  1150. """disable_totp and regenerate_backup_codes must enforce rate limiting (I10)."""
  1151. @pytest.mark.asyncio
  1152. @pytest.mark.integration
  1153. async def test_disable_totp_rate_limited_after_failures(self, async_client: AsyncClient):
  1154. """Repeated wrong codes on /2fa/totp/disable trigger 429."""
  1155. token, _secret = await _setup_totp_user(async_client, "rldisable", "rldisable1")
  1156. for _ in range(5):
  1157. await async_client.post(
  1158. "/api/v1/auth/2fa/totp/disable",
  1159. json={"code": "000000"},
  1160. headers=_auth_header(token),
  1161. )
  1162. resp = await async_client.post(
  1163. "/api/v1/auth/2fa/totp/disable",
  1164. json={"code": "000000"},
  1165. headers=_auth_header(token),
  1166. )
  1167. assert resp.status_code == 429
  1168. @pytest.mark.asyncio
  1169. @pytest.mark.integration
  1170. async def test_regenerate_backup_codes_rate_limited_after_failures(self, async_client: AsyncClient):
  1171. """Repeated wrong codes on /2fa/totp/regenerate-backup-codes trigger 429."""
  1172. token, _secret = await _setup_totp_user(async_client, "rlregen", "rlregen1")
  1173. for _ in range(5):
  1174. await async_client.post(
  1175. "/api/v1/auth/2fa/totp/regenerate-backup-codes",
  1176. json={"code": "000000"},
  1177. headers=_auth_header(token),
  1178. )
  1179. resp = await async_client.post(
  1180. "/api/v1/auth/2fa/totp/regenerate-backup-codes",
  1181. json={"code": "000000"},
  1182. headers=_auth_header(token),
  1183. )
  1184. assert resp.status_code == 429
  1185. # ===========================================================================
  1186. # Email OTP send → verify end-to-end (coverage gap C3)
  1187. # ===========================================================================
  1188. class TestEmailOTPSendVerify:
  1189. """Full email OTP login: send code → verify code → JWT."""
  1190. @pytest.mark.asyncio
  1191. @pytest.mark.integration
  1192. async def test_email_otp_send_and_verify(self, async_client: AsyncClient, db_session: AsyncSession):
  1193. """login → POST /2fa/email/send (patched SMTP) → POST /2fa/verify → JWT."""
  1194. import re
  1195. from unittest.mock import AsyncMock, MagicMock, patch
  1196. from sqlalchemy import select as sa_select
  1197. token = await _setup_and_login(async_client, "emailsendok", "emailsendok1")
  1198. # Give the user an email address
  1199. result = await db_session.execute(sa_select(User).where(User.username == "emailsendok"))
  1200. user = result.scalar_one()
  1201. user.email = "emailsendok@example.com"
  1202. await db_session.commit()
  1203. # Enable email OTP via DB injection
  1204. setup_code = "444444"
  1205. setup_token = secrets.token_urlsafe(32)
  1206. db_session.add(
  1207. AuthEphemeralToken(
  1208. token=setup_token,
  1209. token_type="email_otp_setup",
  1210. username="emailsendok",
  1211. nonce=_pwd_context.hash(setup_code),
  1212. expires_at=datetime.now(timezone.utc) + timedelta(minutes=10),
  1213. )
  1214. )
  1215. await db_session.commit()
  1216. await async_client.post(
  1217. "/api/v1/auth/2fa/email/enable/confirm",
  1218. json={"setup_token": setup_token, "code": setup_code},
  1219. headers=_auth_header(token),
  1220. )
  1221. # Login now requires 2FA — get pre_auth_token (cookie set automatically)
  1222. pre_auth_token = await _login_get_pre_auth_token(async_client, "emailsendok", "emailsendok1")
  1223. # Mock SMTP and capture the sent OTP code
  1224. captured: dict[str, str] = {}
  1225. smtp_settings_mock = MagicMock()
  1226. def _capture_email(smtp_settings, to_email, subject, body_text, body_html):
  1227. m = re.search(r"login code is: (\d{6})", body_text)
  1228. if m:
  1229. captured["otp"] = m.group(1)
  1230. with (
  1231. patch("backend.app.api.routes.mfa.get_smtp_settings", new=AsyncMock(return_value=smtp_settings_mock)),
  1232. patch("backend.app.api.routes.mfa.send_email", side_effect=_capture_email),
  1233. ):
  1234. send_resp = await async_client.post(
  1235. "/api/v1/auth/2fa/email/send",
  1236. json={"pre_auth_token": pre_auth_token},
  1237. )
  1238. assert send_resp.status_code == 200, send_resp.text
  1239. fresh_token = send_resp.json()["pre_auth_token"]
  1240. assert "otp" in captured, "send_email was not called or code not found in body"
  1241. # Verify with the captured OTP code — cookie still in the async_client jar
  1242. verify_resp = await async_client.post(
  1243. "/api/v1/auth/2fa/verify",
  1244. json={"pre_auth_token": fresh_token, "method": "email", "code": captured["otp"]},
  1245. )
  1246. assert verify_resp.status_code == 200
  1247. data = verify_resp.json()
  1248. assert "access_token" in data
  1249. assert data["user"]["username"] == "emailsendok"
  1250. @pytest.mark.asyncio
  1251. @pytest.mark.integration
  1252. async def test_email_otp_wrong_code_rejected(self, async_client: AsyncClient, db_session: AsyncSession):
  1253. """A wrong email OTP code must return 401 without burning the pre_auth_token."""
  1254. from unittest.mock import AsyncMock, MagicMock, patch
  1255. from sqlalchemy import select as sa_select
  1256. token = await _setup_and_login(async_client, "emailwrongcode", "emailwrongcode1")
  1257. result = await db_session.execute(sa_select(User).where(User.username == "emailwrongcode"))
  1258. user = result.scalar_one()
  1259. user.email = "emailwrongcode@example.com"
  1260. await db_session.commit()
  1261. setup_code = "555555"
  1262. setup_token = secrets.token_urlsafe(32)
  1263. db_session.add(
  1264. AuthEphemeralToken(
  1265. token=setup_token,
  1266. token_type="email_otp_setup",
  1267. username="emailwrongcode",
  1268. nonce=_pwd_context.hash(setup_code),
  1269. expires_at=datetime.now(timezone.utc) + timedelta(minutes=10),
  1270. )
  1271. )
  1272. await db_session.commit()
  1273. await async_client.post(
  1274. "/api/v1/auth/2fa/email/enable/confirm",
  1275. json={"setup_token": setup_token, "code": setup_code},
  1276. headers=_auth_header(token),
  1277. )
  1278. pre_auth_token = await _login_get_pre_auth_token(async_client, "emailwrongcode", "emailwrongcode1")
  1279. captured: dict[str, str] = {}
  1280. smtp_mock = MagicMock()
  1281. def _capture(smtp_settings, to_email, subject, body_text, body_html):
  1282. import re
  1283. m = re.search(r"login code is: (\d{6})", body_text)
  1284. if m:
  1285. captured["otp"] = m.group(1)
  1286. with (
  1287. patch("backend.app.api.routes.mfa.get_smtp_settings", new=AsyncMock(return_value=smtp_mock)),
  1288. patch("backend.app.api.routes.mfa.send_email", side_effect=_capture),
  1289. ):
  1290. send_resp = await async_client.post(
  1291. "/api/v1/auth/2fa/email/send",
  1292. json={"pre_auth_token": pre_auth_token},
  1293. )
  1294. assert send_resp.status_code == 200
  1295. fresh_token = send_resp.json()["pre_auth_token"]
  1296. # Wrong code → 401
  1297. bad = await async_client.post(
  1298. "/api/v1/auth/2fa/verify",
  1299. json={"pre_auth_token": fresh_token, "method": "email", "code": "000000"},
  1300. )
  1301. assert bad.status_code == 401
  1302. # Correct code still works (token not burned by wrong attempt)
  1303. good = await async_client.post(
  1304. "/api/v1/auth/2fa/verify",
  1305. json={"pre_auth_token": fresh_token, "method": "email", "code": captured["otp"]},
  1306. )
  1307. assert good.status_code == 200
  1308. # ===========================================================================
  1309. # OIDC end-to-end (coverage gap C4)
  1310. # ===========================================================================
  1311. def _make_test_rsa_key():
  1312. """Generate a throwaway RSA key pair and a matching JWK set for tests."""
  1313. import base64
  1314. from cryptography.hazmat.primitives import serialization
  1315. from cryptography.hazmat.primitives.asymmetric import rsa
  1316. private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
  1317. private_pem = private_key.private_bytes(
  1318. serialization.Encoding.PEM,
  1319. serialization.PrivateFormat.TraditionalOpenSSL,
  1320. serialization.NoEncryption(),
  1321. )
  1322. pub_numbers = private_key.public_key().public_numbers()
  1323. def _b64url(n: int, length: int) -> str:
  1324. return base64.urlsafe_b64encode(n.to_bytes(length, "big")).rstrip(b"=").decode()
  1325. jwks = {
  1326. "keys": [
  1327. {
  1328. "kty": "RSA",
  1329. "use": "sig",
  1330. "alg": "RS256",
  1331. "kid": "test-kid-1",
  1332. "n": _b64url(pub_numbers.n, 256),
  1333. "e": _b64url(pub_numbers.e, 3),
  1334. }
  1335. ]
  1336. }
  1337. return private_pem, jwks
  1338. class TestOIDCEndToEnd:
  1339. """Full OIDC auth-code flow: state → callback (mocked IdP) → exchange → JWT."""
  1340. @pytest.mark.asyncio
  1341. @pytest.mark.integration
  1342. async def test_oidc_callback_creates_user_and_issues_jwt(self, async_client: AsyncClient, db_session: AsyncSession):
  1343. """callback validates the mocked ID token, creates a user, and redirects
  1344. with an oidc_exchange token; exchanging that token returns a full JWT."""
  1345. import time
  1346. from unittest.mock import patch
  1347. import jwt as pyjwt
  1348. private_pem, jwks_data = _make_test_rsa_key()
  1349. issuer = "https://idp.test.example.com"
  1350. client_id = "oidc-test-client"
  1351. nonce = secrets.token_urlsafe(16)
  1352. now = int(time.time())
  1353. id_token = pyjwt.encode(
  1354. {
  1355. "sub": "oidc-sub-e2e",
  1356. "iss": issuer,
  1357. "aud": client_id,
  1358. "nonce": nonce,
  1359. "email": "oidce2e@example.com",
  1360. "email_verified": True,
  1361. "iat": now,
  1362. "exp": now + 300,
  1363. },
  1364. private_pem,
  1365. algorithm="RS256",
  1366. headers={"kid": "test-kid-1"},
  1367. )
  1368. # Create OIDC provider
  1369. admin_token = await _setup_and_login(async_client, "oidce2eadm", "oidce2eadm1")
  1370. create_resp = await async_client.post(
  1371. "/api/v1/auth/oidc/providers",
  1372. json={
  1373. "name": "E2E-IdP",
  1374. "issuer_url": issuer,
  1375. "client_id": client_id,
  1376. "client_secret": "test-secret",
  1377. "scopes": "openid email profile",
  1378. "is_enabled": True,
  1379. "auto_create_users": True,
  1380. },
  1381. headers=_auth_header(admin_token),
  1382. )
  1383. assert create_resp.status_code == 201
  1384. provider_id = create_resp.json()["id"]
  1385. # Simulate the authorize step: insert an oidc_state token directly
  1386. state = secrets.token_urlsafe(32)
  1387. code_verifier = secrets.token_urlsafe(48)
  1388. db_session.add(
  1389. AuthEphemeralToken(
  1390. token=state,
  1391. token_type="oidc_state",
  1392. provider_id=provider_id,
  1393. nonce=nonce,
  1394. code_verifier=code_verifier,
  1395. expires_at=datetime.now(timezone.utc) + timedelta(minutes=5),
  1396. )
  1397. )
  1398. await db_session.commit()
  1399. # Mock httpx calls made inside oidc_callback
  1400. discovery_doc = {
  1401. "issuer": issuer,
  1402. "authorization_endpoint": f"{issuer}/auth",
  1403. "token_endpoint": f"{issuer}/token",
  1404. "jwks_uri": f"{issuer}/.well-known/jwks.json",
  1405. }
  1406. token_response = {
  1407. "access_token": "mock-access",
  1408. "token_type": "Bearer",
  1409. "id_token": id_token,
  1410. }
  1411. class _MockResp:
  1412. def __init__(self, data):
  1413. self._data = data
  1414. self.status_code = 200
  1415. self.is_success = True
  1416. self.text = str(data)
  1417. def json(self):
  1418. return self._data
  1419. def raise_for_status(self):
  1420. pass
  1421. class _MockHttpxClient:
  1422. def __init__(self, *args, **kwargs):
  1423. pass
  1424. async def __aenter__(self):
  1425. return self
  1426. async def __aexit__(self, *args):
  1427. pass
  1428. async def get(self, url, **kwargs):
  1429. if "jwks" in url:
  1430. return _MockResp(jwks_data)
  1431. return _MockResp(discovery_doc)
  1432. async def post(self, url, **kwargs):
  1433. return _MockResp(token_response)
  1434. with patch("backend.app.api.routes.mfa.httpx.AsyncClient", _MockHttpxClient):
  1435. callback_resp = await async_client.get(
  1436. f"/api/v1/auth/oidc/callback?code=test-auth-code&state={state}",
  1437. follow_redirects=False,
  1438. )
  1439. assert callback_resp.status_code == 302, callback_resp.text
  1440. location = callback_resp.headers.get("location", "")
  1441. assert "oidc_token=" in location, f"Expected oidc_token in redirect, got: {location}"
  1442. # Extract and exchange the oidc_exchange token
  1443. oidc_exchange_token = location.split("oidc_token=")[1].split("&")[0]
  1444. exchange_resp = await async_client.post(
  1445. "/api/v1/auth/oidc/exchange",
  1446. json={"oidc_token": oidc_exchange_token},
  1447. )
  1448. assert exchange_resp.status_code == 200
  1449. data = exchange_resp.json()
  1450. assert "access_token" in data
  1451. assert data["user"]["username"] is not None
  1452. @pytest.mark.asyncio
  1453. @pytest.mark.integration
  1454. async def test_oidc_callback_invalid_state_redirects_error(self, async_client: AsyncClient):
  1455. """An unknown state token must redirect to /?oidc_error=invalid_state."""
  1456. resp = await async_client.get(
  1457. "/api/v1/auth/oidc/callback?code=x&state=totally-bogus-state",
  1458. follow_redirects=False,
  1459. )
  1460. assert resp.status_code == 302
  1461. assert "invalid_state" in resp.headers.get("location", "")
  1462. @pytest.mark.asyncio
  1463. @pytest.mark.integration
  1464. async def test_oidc_state_is_single_use(self, async_client: AsyncClient, db_session: AsyncSession):
  1465. """Replaying the same state token must fail on the second callback."""
  1466. import time
  1467. from unittest.mock import patch
  1468. import jwt as pyjwt
  1469. private_pem, jwks_data = _make_test_rsa_key()
  1470. issuer = "https://idp2.test.example.com"
  1471. client_id = "oidc-client-2"
  1472. nonce = secrets.token_urlsafe(16)
  1473. now = int(time.time())
  1474. id_token = pyjwt.encode(
  1475. {
  1476. "sub": "sub-single-use",
  1477. "iss": issuer,
  1478. "aud": client_id,
  1479. "nonce": nonce,
  1480. "email": "su@example.com",
  1481. "email_verified": True,
  1482. "iat": now,
  1483. "exp": now + 300,
  1484. },
  1485. private_pem,
  1486. algorithm="RS256",
  1487. headers={"kid": "test-kid-1"},
  1488. )
  1489. admin_token = await _setup_and_login(async_client, "oidcsuadm", "oidcsuadm1")
  1490. cr = await async_client.post(
  1491. "/api/v1/auth/oidc/providers",
  1492. json={
  1493. "name": "SU-IdP",
  1494. "issuer_url": issuer,
  1495. "client_id": client_id,
  1496. "client_secret": "s",
  1497. "scopes": "openid",
  1498. "is_enabled": True,
  1499. "auto_create_users": True,
  1500. },
  1501. headers=_auth_header(admin_token),
  1502. )
  1503. provider_id = cr.json()["id"]
  1504. state = secrets.token_urlsafe(32)
  1505. db_session.add(
  1506. AuthEphemeralToken(
  1507. token=state,
  1508. token_type="oidc_state",
  1509. provider_id=provider_id,
  1510. nonce=nonce,
  1511. code_verifier=secrets.token_urlsafe(48),
  1512. expires_at=datetime.now(timezone.utc) + timedelta(minutes=5),
  1513. )
  1514. )
  1515. await db_session.commit()
  1516. discovery_doc = {
  1517. "issuer": issuer,
  1518. "authorization_endpoint": f"{issuer}/auth",
  1519. "token_endpoint": f"{issuer}/token",
  1520. "jwks_uri": f"{issuer}/.well-known/jwks.json",
  1521. }
  1522. token_response = {"access_token": "a", "token_type": "Bearer", "id_token": id_token}
  1523. class _MockResp:
  1524. def __init__(self, data):
  1525. self._data = data
  1526. self.status_code = 200
  1527. self.is_success = True
  1528. self.text = str(data)
  1529. def json(self):
  1530. return self._data
  1531. def raise_for_status(self):
  1532. pass
  1533. class _MockHttpxClient:
  1534. def __init__(self, *a, **kw):
  1535. pass
  1536. async def __aenter__(self):
  1537. return self
  1538. async def __aexit__(self, *a):
  1539. pass
  1540. async def get(self, url, **kw):
  1541. return _MockResp(jwks_data if "jwks" in url else discovery_doc)
  1542. async def post(self, url, **kw):
  1543. return _MockResp(token_response)
  1544. with patch("backend.app.api.routes.mfa.httpx.AsyncClient", _MockHttpxClient):
  1545. first = await async_client.get(
  1546. f"/api/v1/auth/oidc/callback?code=c&state={state}",
  1547. follow_redirects=False,
  1548. )
  1549. assert first.status_code == 302
  1550. assert "oidc_token=" in first.headers.get("location", "")
  1551. # Replay: second callback with the same state must fail
  1552. second = await async_client.get(
  1553. f"/api/v1/auth/oidc/callback?code=c&state={state}",
  1554. follow_redirects=False,
  1555. )
  1556. assert second.status_code == 302
  1557. assert "invalid_state" in second.headers.get("location", "")
  1558. # ===========================================================================
  1559. # H-2: Wrong code must NOT consume the email OTP setup token (peek-then-consume)
  1560. # ===========================================================================
  1561. class TestEmailOTPSetupTokenPreservedOnWrongCode:
  1562. """After H-2 fix: a wrong code leaves the setup token intact so the user can retry."""
  1563. @pytest.mark.asyncio
  1564. @pytest.mark.integration
  1565. async def test_wrong_code_does_not_consume_setup_token(self, async_client: AsyncClient, db_session: AsyncSession):
  1566. """Wrong code returns 400 but the setup token survives; correct code then works."""
  1567. token = await _setup_and_login(async_client, "h2retryuser", "h2retrypass1")
  1568. code = "999999"
  1569. code_hash = _pwd_context.hash(code)
  1570. setup_token = secrets.token_urlsafe(32)
  1571. db_session.add(
  1572. AuthEphemeralToken(
  1573. token=setup_token,
  1574. token_type="email_otp_setup",
  1575. username="h2retryuser",
  1576. nonce=code_hash,
  1577. expires_at=datetime.now(timezone.utc) + timedelta(minutes=10),
  1578. )
  1579. )
  1580. await db_session.commit()
  1581. # First attempt: wrong code → 400
  1582. wrong = await async_client.post(
  1583. "/api/v1/auth/2fa/email/enable/confirm",
  1584. json={"setup_token": setup_token, "code": "000000"},
  1585. headers=_auth_header(token),
  1586. )
  1587. assert wrong.status_code == 400
  1588. # Second attempt: correct code → must succeed (token was NOT consumed)
  1589. correct = await async_client.post(
  1590. "/api/v1/auth/2fa/email/enable/confirm",
  1591. json={"setup_token": setup_token, "code": code},
  1592. headers=_auth_header(token),
  1593. )
  1594. assert correct.status_code == 200
  1595. # ===========================================================================
  1596. # M-2: New OIDC provider must default to auto_link_existing_accounts=False
  1597. # ===========================================================================
  1598. class TestOIDCProviderAutoLinkDefault:
  1599. """auto_link_existing_accounts must default to False (M-2 fix)."""
  1600. @pytest.mark.asyncio
  1601. @pytest.mark.integration
  1602. async def test_new_provider_auto_link_defaults_to_false(self, async_client: AsyncClient):
  1603. token = await _setup_and_login(async_client, "m2autolinkadmin", "m2autolinkadmin1")
  1604. resp = await async_client.post(
  1605. "/api/v1/auth/oidc/providers",
  1606. json={
  1607. "name": "AutoLinkTest",
  1608. "issuer_url": "https://autolink.example.com",
  1609. "client_id": "alc",
  1610. "client_secret": "als",
  1611. "scopes": "openid",
  1612. "is_enabled": True,
  1613. "auto_create_users": False,
  1614. # auto_link_existing_accounts intentionally omitted
  1615. },
  1616. headers=_auth_header(token),
  1617. )
  1618. assert resp.status_code == 201
  1619. assert resp.json()["auto_link_existing_accounts"] is False
  1620. # ===========================================================================
  1621. # L-5: 2FA verify code format validation
  1622. # ===========================================================================
  1623. class TestTwoFAVerifyCodeFormat:
  1624. """TwoFAVerifyRequest.code must be 6–8 alphanumeric characters (L-5)."""
  1625. @pytest.mark.asyncio
  1626. @pytest.mark.integration
  1627. async def test_code_too_long_rejected(self, async_client: AsyncClient):
  1628. """code > 8 characters must be rejected with 422."""
  1629. resp = await async_client.post(
  1630. "/api/v1/auth/2fa/verify",
  1631. json={"pre_auth_token": "anytoken", "code": "1" * 9, "method": "totp"},
  1632. )
  1633. assert resp.status_code == 422
  1634. @pytest.mark.asyncio
  1635. @pytest.mark.integration
  1636. async def test_code_non_alphanumeric_rejected(self, async_client: AsyncClient):
  1637. """code containing non-alphanumeric chars must be rejected with 422."""
  1638. resp = await async_client.post(
  1639. "/api/v1/auth/2fa/verify",
  1640. json={"pre_auth_token": "anytoken", "code": "12-456", "method": "totp"},
  1641. )
  1642. assert resp.status_code == 422
  1643. @pytest.mark.asyncio
  1644. @pytest.mark.integration
  1645. async def test_code_too_short_rejected(self, async_client: AsyncClient):
  1646. """code < 6 characters must be rejected with 422."""
  1647. resp = await async_client.post(
  1648. "/api/v1/auth/2fa/verify",
  1649. json={"pre_auth_token": "anytoken", "code": "12345", "method": "totp"},
  1650. )
  1651. assert resp.status_code == 422
  1652. @pytest.mark.asyncio
  1653. @pytest.mark.integration
  1654. async def test_code_exactly_6_passes_schema(self, async_client: AsyncClient):
  1655. """6-character alphanumeric code passes schema (may fail 2FA logic with 400)."""
  1656. resp = await async_client.post(
  1657. "/api/v1/auth/2fa/verify",
  1658. json={"pre_auth_token": "x" * 32, "code": "123456", "method": "totp"},
  1659. )
  1660. assert resp.status_code != 422
  1661. @pytest.mark.asyncio
  1662. @pytest.mark.integration
  1663. async def test_code_exactly_8_passes_schema(self, async_client: AsyncClient):
  1664. """8-character alphanumeric backup code passes schema."""
  1665. resp = await async_client.post(
  1666. "/api/v1/auth/2fa/verify",
  1667. json={"pre_auth_token": "x" * 32, "code": "ABCD1234", "method": "backup"},
  1668. )
  1669. assert resp.status_code != 422
  1670. # ===========================================================================
  1671. # M-NEW-1: verify_slicer_download_token must NOT consume token on wrong resource
  1672. # ===========================================================================
  1673. class TestSlicerTokenResourceBinding:
  1674. """Token for resource A must survive a wrong-resource check and still work for A."""
  1675. @pytest.mark.asyncio
  1676. @pytest.mark.integration
  1677. async def test_wrong_resource_does_not_consume_token(self, async_client: AsyncClient, db_session: AsyncSession):
  1678. """A slicer token bound to archive:5 must NOT be consumed when checked against archive:6."""
  1679. from datetime import datetime, timedelta, timezone
  1680. from backend.app.core.auth import verify_slicer_download_token
  1681. from backend.app.models.auth_ephemeral import AuthEphemeralToken
  1682. now = datetime.now(timezone.utc)
  1683. token_val = secrets.token_urlsafe(24)
  1684. db_session.add(
  1685. AuthEphemeralToken(
  1686. token=token_val,
  1687. token_type="slicer_download",
  1688. nonce="archive:5",
  1689. expires_at=now + timedelta(minutes=5),
  1690. )
  1691. )
  1692. await db_session.commit()
  1693. # Wrong resource → must return False and NOT consume the token
  1694. wrong = await verify_slicer_download_token(token_val, "archive", 6)
  1695. assert wrong is False
  1696. # Correct resource → must return True (token survived the wrong-resource check)
  1697. correct = await verify_slicer_download_token(token_val, "archive", 5)
  1698. assert correct is True
  1699. @pytest.mark.asyncio
  1700. @pytest.mark.integration
  1701. async def test_correct_resource_consumes_token(self, async_client: AsyncClient, db_session: AsyncSession):
  1702. """A slicer token is single-use: second correct-resource check must return False."""
  1703. from datetime import datetime, timedelta, timezone
  1704. from backend.app.core.auth import verify_slicer_download_token
  1705. from backend.app.models.auth_ephemeral import AuthEphemeralToken
  1706. now = datetime.now(timezone.utc)
  1707. token_val = secrets.token_urlsafe(24)
  1708. db_session.add(
  1709. AuthEphemeralToken(
  1710. token=token_val,
  1711. token_type="slicer_download",
  1712. nonce="library:99",
  1713. expires_at=now + timedelta(minutes=5),
  1714. )
  1715. )
  1716. await db_session.commit()
  1717. first = await verify_slicer_download_token(token_val, "library", 99)
  1718. assert first is True
  1719. second = await verify_slicer_download_token(token_val, "library", 99)
  1720. assert second is False
  1721. # ===========================================================================
  1722. # M-NEW-3 / L-NEW-1: Schema length validation for change-password & forgot-password
  1723. # ===========================================================================
  1724. class TestSchemaLengthValidationR2:
  1725. """Input length limits added in review round 2."""
  1726. @pytest.mark.asyncio
  1727. @pytest.mark.integration
  1728. async def test_change_password_current_too_long_rejected(self, async_client: AsyncClient):
  1729. """current_password > 256 chars must be rejected with 422 (prevents pbkdf2 DoS)."""
  1730. resp = await async_client.post(
  1731. "/api/v1/users/me/change-password",
  1732. json={"current_password": "x" * 257, "new_password": "ValidPass1!"},
  1733. )
  1734. assert resp.status_code == 422
  1735. @pytest.mark.asyncio
  1736. @pytest.mark.integration
  1737. async def test_forgot_password_email_too_long_rejected(self, async_client: AsyncClient):
  1738. """email > 254 chars must be rejected with 422."""
  1739. resp = await async_client.post(
  1740. "/api/v1/auth/forgot-password",
  1741. json={"email": "a" * 243 + "@example.com"},
  1742. )
  1743. assert resp.status_code == 422
  1744. @pytest.mark.asyncio
  1745. @pytest.mark.integration
  1746. async def test_forgot_password_email_at_limit_passes_schema(self, async_client: AsyncClient):
  1747. """Short email passes schema (may return 400/200 from business logic)."""
  1748. resp = await async_client.post(
  1749. "/api/v1/auth/forgot-password",
  1750. json={"email": "user@example.com"},
  1751. )
  1752. assert resp.status_code != 422
  1753. # ===========================================================================
  1754. # L-NEW-2: TOTPSetupRequest.code max_length
  1755. # ===========================================================================
  1756. class TestTOTPSetupCodeMaxLength:
  1757. """TOTPSetupRequest.code must be bounded (L-NEW-2)."""
  1758. @pytest.mark.asyncio
  1759. @pytest.mark.integration
  1760. async def test_setup_code_too_long_rejected(self, async_client: AsyncClient):
  1761. """code > 8 chars must be rejected with 422."""
  1762. import pyotp as _pyotp
  1763. token = await _setup_and_login(async_client, "totp_setup_maxlen", "totp_setup_maxlen1")
  1764. # Enable TOTP so the setup-code guard path is active
  1765. setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
  1766. secret = setup_resp.json()["secret"]
  1767. await async_client.post(
  1768. "/api/v1/auth/2fa/totp/enable",
  1769. json={"code": _pyotp.TOTP(secret).now()},
  1770. headers=_auth_header(token),
  1771. )
  1772. resp = await async_client.post(
  1773. "/api/v1/auth/2fa/totp/setup",
  1774. json={"code": "1" * 9},
  1775. headers=_auth_header(token),
  1776. )
  1777. assert resp.status_code == 422
  1778. # ===========================================================================
  1779. # L-NEW-3: EmailOTPEnableConfirmRequest.code must be exactly 6 digits
  1780. # ===========================================================================
  1781. class TestEmailOTPConfirmCodeFormat:
  1782. """EmailOTPEnableConfirmRequest.code must be 6 digits (L-NEW-3)."""
  1783. @pytest.mark.asyncio
  1784. @pytest.mark.integration
  1785. async def test_non_digit_code_rejected(self, async_client: AsyncClient):
  1786. """Alpha characters in the email OTP confirm code must be rejected with 422."""
  1787. token = await _setup_and_login(async_client, "emailotpfmt", "emailotpfmt1")
  1788. resp = await async_client.post(
  1789. "/api/v1/auth/2fa/email/enable/confirm",
  1790. json={"setup_token": "x" * 32, "code": "ABCDEF"},
  1791. headers=_auth_header(token),
  1792. )
  1793. assert resp.status_code == 422
  1794. @pytest.mark.asyncio
  1795. @pytest.mark.integration
  1796. async def test_seven_digit_code_rejected(self, async_client: AsyncClient):
  1797. """7-digit code must be rejected with 422 (min_length=max_length=6)."""
  1798. token = await _setup_and_login(async_client, "emailotplen7", "emailotplen7x")
  1799. resp = await async_client.post(
  1800. "/api/v1/auth/2fa/email/enable/confirm",
  1801. json={"setup_token": "x" * 32, "code": "1234567"},
  1802. headers=_auth_header(token),
  1803. )
  1804. assert resp.status_code == 422
  1805. @pytest.mark.asyncio
  1806. @pytest.mark.integration
  1807. async def test_valid_six_digit_code_passes_schema(self, async_client: AsyncClient):
  1808. """6-digit numeric code passes schema (may return 400 on bad token — that's fine)."""
  1809. token = await _setup_and_login(async_client, "emailotpfmt6", "emailotpfmt6x")
  1810. resp = await async_client.post(
  1811. "/api/v1/auth/2fa/email/enable/confirm",
  1812. json={"setup_token": "x" * 32, "code": "123456"},
  1813. headers=_auth_header(token),
  1814. )
  1815. assert resp.status_code != 422
  1816. # ===========================================================================
  1817. # L-NEW-4: OIDCProviderCreate field max_length constraints
  1818. # ===========================================================================
  1819. class TestOIDCProviderFieldLengths:
  1820. """OIDCProviderCreate fields must reject inputs exceeding max_length (L-NEW-4)."""
  1821. @pytest.mark.asyncio
  1822. @pytest.mark.integration
  1823. async def test_name_too_long_rejected(self, async_client: AsyncClient):
  1824. token = await _setup_and_login(async_client, "oidcfldadmin", "oidcfldadmin1")
  1825. resp = await async_client.post(
  1826. "/api/v1/auth/oidc/providers",
  1827. json={
  1828. "name": "n" * 101,
  1829. "issuer_url": "https://test.example.com",
  1830. "client_id": "cid",
  1831. "client_secret": "csec",
  1832. "scopes": "openid",
  1833. },
  1834. headers=_auth_header(token),
  1835. )
  1836. assert resp.status_code == 422
  1837. @pytest.mark.asyncio
  1838. @pytest.mark.integration
  1839. async def test_client_secret_too_long_rejected(self, async_client: AsyncClient):
  1840. token = await _setup_and_login(async_client, "oidcseclen", "oidcseclen123")
  1841. resp = await async_client.post(
  1842. "/api/v1/auth/oidc/providers",
  1843. json={
  1844. "name": "ValidName",
  1845. "issuer_url": "https://test.example.com",
  1846. "client_id": "cid",
  1847. "client_secret": "s" * 513,
  1848. "scopes": "openid",
  1849. },
  1850. headers=_auth_header(token),
  1851. )
  1852. assert resp.status_code == 422
  1853. # ---------------------------------------------------------------------------
  1854. # M-NEW-4 / M-NEW-5 / L-NEW-5: UserCreate & UserUpdate field length limits
  1855. # ---------------------------------------------------------------------------
  1856. class TestUserCreateUpdateFieldLengths:
  1857. """UserCreate and UserUpdate must enforce max_length on username, password, email."""
  1858. @pytest.fixture
  1859. async def admin_token(self, async_client: AsyncClient) -> str:
  1860. return await _setup_and_login(async_client, "ucfldadmin", "ucfldadmin1!")
  1861. @pytest.mark.asyncio
  1862. @pytest.mark.integration
  1863. async def test_create_username_too_long_rejected(self, async_client: AsyncClient, admin_token: str):
  1864. resp = await async_client.post(
  1865. "/api/v1/users/",
  1866. json={
  1867. "username": "u" * 151,
  1868. "password": "ValidPass1!",
  1869. "role": "user",
  1870. },
  1871. headers=_auth_header(admin_token),
  1872. )
  1873. assert resp.status_code == 422
  1874. @pytest.mark.asyncio
  1875. @pytest.mark.integration
  1876. async def test_create_password_too_long_rejected(self, async_client: AsyncClient, admin_token: str):
  1877. resp = await async_client.post(
  1878. "/api/v1/users/",
  1879. json={
  1880. "username": "newuserX",
  1881. "password": "A1!" + "x" * 254,
  1882. "role": "user",
  1883. },
  1884. headers=_auth_header(admin_token),
  1885. )
  1886. assert resp.status_code == 422
  1887. @pytest.mark.asyncio
  1888. @pytest.mark.integration
  1889. async def test_create_email_too_long_rejected(self, async_client: AsyncClient, admin_token: str):
  1890. resp = await async_client.post(
  1891. "/api/v1/users/",
  1892. json={
  1893. "username": "newuserY",
  1894. "password": "ValidPass1!",
  1895. "email": "a" * 246 + "@x.com", # total 253 chars -> fine; 248+@x.com=255 -> too long
  1896. "role": "user",
  1897. },
  1898. headers=_auth_header(admin_token),
  1899. )
  1900. # 248 'a' + '@x.com' (6) = 254 chars — just at limit, should pass
  1901. # Use 249 + '@x.com' = 255 chars to trigger the 422
  1902. assert resp.status_code in (201, 422) # boundary sanity check
  1903. @pytest.mark.asyncio
  1904. @pytest.mark.integration
  1905. async def test_create_email_exceeds_limit_rejected(self, async_client: AsyncClient, admin_token: str):
  1906. resp = await async_client.post(
  1907. "/api/v1/users/",
  1908. json={
  1909. "username": "newuserZ",
  1910. "password": "ValidPass1!",
  1911. "email": "a" * 249 + "@x.com", # 255 chars — exceeds RFC 5321 max of 254
  1912. "role": "user",
  1913. },
  1914. headers=_auth_header(admin_token),
  1915. )
  1916. assert resp.status_code == 422
  1917. @pytest.mark.asyncio
  1918. @pytest.mark.integration
  1919. async def test_update_username_too_long_rejected(self, async_client: AsyncClient, admin_token: str):
  1920. # Create a user first
  1921. create_resp = await async_client.post(
  1922. "/api/v1/users/",
  1923. json={"username": "updusr1", "password": "ValidPass1!", "role": "user"},
  1924. headers=_auth_header(admin_token),
  1925. )
  1926. assert create_resp.status_code == 201
  1927. user_id = create_resp.json()["id"]
  1928. resp = await async_client.patch(
  1929. f"/api/v1/users/{user_id}",
  1930. json={"username": "u" * 151},
  1931. headers=_auth_header(admin_token),
  1932. )
  1933. assert resp.status_code == 422
  1934. @pytest.mark.asyncio
  1935. @pytest.mark.integration
  1936. async def test_update_password_too_long_rejected(self, async_client: AsyncClient, admin_token: str):
  1937. create_resp = await async_client.post(
  1938. "/api/v1/users/",
  1939. json={"username": "updusr2", "password": "ValidPass1!", "role": "user"},
  1940. headers=_auth_header(admin_token),
  1941. )
  1942. assert create_resp.status_code == 201
  1943. user_id = create_resp.json()["id"]
  1944. resp = await async_client.patch(
  1945. f"/api/v1/users/{user_id}",
  1946. json={"password": "A1!" + "x" * 254},
  1947. headers=_auth_header(admin_token),
  1948. )
  1949. assert resp.status_code == 422
  1950. @pytest.mark.asyncio
  1951. @pytest.mark.integration
  1952. async def test_update_email_too_long_rejected(self, async_client: AsyncClient, admin_token: str):
  1953. create_resp = await async_client.post(
  1954. "/api/v1/users/",
  1955. json={"username": "updusr3", "password": "ValidPass1!", "role": "user"},
  1956. headers=_auth_header(admin_token),
  1957. )
  1958. assert create_resp.status_code == 201
  1959. user_id = create_resp.json()["id"]
  1960. resp = await async_client.patch(
  1961. f"/api/v1/users/{user_id}",
  1962. json={"email": "a" * 249 + "@x.com"}, # 255 chars
  1963. headers=_auth_header(admin_token),
  1964. )
  1965. assert resp.status_code == 422
  1966. # ---------------------------------------------------------------------------
  1967. # L-NEW-6: per-IP rate limiting on /forgot-password
  1968. # ---------------------------------------------------------------------------
  1969. _SMTP_DATA_FOR_IPLIMIT = {
  1970. "smtp_host": "smtp.test.com",
  1971. "smtp_port": 587,
  1972. "smtp_username": "test@test.com",
  1973. "smtp_password": "testpass",
  1974. "smtp_security": "starttls",
  1975. "smtp_auth_enabled": True,
  1976. "smtp_from_email": "noreply@test.com",
  1977. }
  1978. class TestForgotPasswordPerIpRateLimit:
  1979. """POST /forgot-password must enforce a per-IP cap (L-NEW-6).
  1980. The test sends 11 requests from the simulated test-client IP using 11
  1981. different email addresses (so the per-email bucket is never exhausted).
  1982. The 11th request must be rejected with 429.
  1983. """
  1984. @pytest.fixture
  1985. async def advanced_auth_token(self, async_client: AsyncClient) -> str:
  1986. """Set up auth, SMTP, and enable advanced auth; return admin token."""
  1987. token = await _setup_and_login(async_client, "iprladmin", "iprladmin1!")
  1988. headers = _auth_header(token)
  1989. await async_client.post("/api/v1/auth/smtp", headers=headers, json=_SMTP_DATA_FOR_IPLIMIT)
  1990. await async_client.post("/api/v1/auth/advanced-auth/enable", headers=headers)
  1991. return token
  1992. @pytest.mark.asyncio
  1993. @pytest.mark.integration
  1994. async def test_per_ip_limit_triggers_429(self, async_client: AsyncClient, advanced_auth_token: str):
  1995. # Send 11 requests from the same test-client IP using unique email
  1996. # addresses so the per-email bucket (limit=3) is never exhausted.
  1997. responses = []
  1998. for i in range(11):
  1999. resp = await async_client.post(
  2000. "/api/v1/auth/forgot-password",
  2001. json={"email": f"unique{i}@example.com"},
  2002. )
  2003. responses.append(resp.status_code)
  2004. # First 10 must not be rate-limited by the IP bucket
  2005. for code in responses[:10]:
  2006. assert code != 429, f"Unexpected 429 before limit reached: {responses}"
  2007. # The 11th must be rate-limited
  2008. assert responses[10] == 429, f"Expected 429 on 11th request, got {responses[10]}"
  2009. # ---------------------------------------------------------------------------
  2010. # M-NEW-6: OIDC auto-link must be rejected if target user already has an
  2011. # OIDC link to a different provider
  2012. # ---------------------------------------------------------------------------
  2013. class TestOIDCAutoLinkExistingLinkRejection:
  2014. """OIDC callback must reject auto-linking when the email-matched user
  2015. already has an OIDC link to a different provider (M-NEW-6)."""
  2016. @pytest.mark.asyncio
  2017. @pytest.mark.integration
  2018. async def test_auto_link_rejected_when_user_already_linked(
  2019. self, async_client: AsyncClient, db_session: AsyncSession
  2020. ):
  2021. """Auto-link via email-match is rejected when the target user is
  2022. already linked to another OIDC provider."""
  2023. import base64
  2024. import hashlib
  2025. from unittest.mock import AsyncMock, MagicMock, patch
  2026. from backend.app.core.auth import get_password_hash
  2027. from backend.app.models.oidc_provider import OIDCProvider, UserOIDCLink
  2028. from backend.app.models.user import User
  2029. # ── 1. Target user with a known email ────────────────────────────
  2030. target = User(
  2031. username="oidcALTarget",
  2032. email="alinktest@example.com",
  2033. auth_source="oidc",
  2034. password_hash=get_password_hash(secrets.token_urlsafe(16)),
  2035. role="user",
  2036. is_active=True,
  2037. )
  2038. db_session.add(target)
  2039. await db_session.flush()
  2040. # ── 2. Provider B — legitimate, already linked to target ──────────
  2041. prov_b = OIDCProvider(
  2042. name="ProvB_m6test",
  2043. issuer_url="https://providerb-m6.example.com",
  2044. client_id="client_b",
  2045. _client_secret_enc="secret_b",
  2046. scopes="openid email profile",
  2047. is_enabled=True,
  2048. auto_link_existing_accounts=False,
  2049. auto_create_users=False,
  2050. )
  2051. db_session.add(prov_b)
  2052. await db_session.flush()
  2053. db_session.add(
  2054. UserOIDCLink(
  2055. user_id=target.id,
  2056. provider_id=prov_b.id,
  2057. provider_user_id="legitimate_sub",
  2058. provider_email="alinktest@example.com",
  2059. )
  2060. )
  2061. # ── 3. Provider A — attacker-controlled, auto_link=True ───────────
  2062. prov_a = OIDCProvider(
  2063. name="ProvA_m6test",
  2064. issuer_url="https://providera-m6.example.com",
  2065. client_id="client_a",
  2066. _client_secret_enc="secret_a",
  2067. scopes="openid email profile",
  2068. is_enabled=True,
  2069. auto_link_existing_accounts=True,
  2070. auto_create_users=False,
  2071. )
  2072. db_session.add(prov_a)
  2073. await db_session.flush()
  2074. # ── 4. OIDC state for Provider A ──────────────────────────────────
  2075. state = secrets.token_urlsafe(32)
  2076. nonce = secrets.token_urlsafe(32)
  2077. code_verifier = secrets.token_urlsafe(48)
  2078. db_session.add(
  2079. AuthEphemeralToken(
  2080. token=state,
  2081. token_type="oidc_state",
  2082. provider_id=prov_a.id,
  2083. nonce=nonce,
  2084. code_verifier=code_verifier,
  2085. expires_at=datetime.now(timezone.utc) + timedelta(minutes=10),
  2086. )
  2087. )
  2088. await db_session.commit()
  2089. # ── 5. Mock HTTP + JWT so the callback can reach the auto-link check ─
  2090. fake_discovery = {
  2091. "issuer": "https://providera-m6.example.com",
  2092. "token_endpoint": "https://providera-m6.example.com/token",
  2093. "jwks_uri": "https://providera-m6.example.com/jwks",
  2094. }
  2095. fake_token = {"access_token": "acc_tok", "id_token": "fake.id.token"}
  2096. fake_claims = {
  2097. "sub": "attacker_sub_unique",
  2098. "email": "alinktest@example.com",
  2099. "email_verified": True,
  2100. "nonce": nonce,
  2101. "iss": "https://providera-m6.example.com",
  2102. "aud": "client_a",
  2103. "exp": 9_999_999_999,
  2104. }
  2105. disc_resp = AsyncMock()
  2106. disc_resp.raise_for_status = MagicMock()
  2107. disc_resp.json = MagicMock(return_value=fake_discovery)
  2108. token_resp = AsyncMock()
  2109. token_resp.ok = True
  2110. token_resp.json = MagicMock(return_value=fake_token)
  2111. jwks_resp = AsyncMock()
  2112. jwks_resp.raise_for_status = MagicMock()
  2113. jwks_resp.json = MagicMock(return_value={})
  2114. mock_http = AsyncMock()
  2115. mock_http.get = AsyncMock(side_effect=[disc_resp, jwks_resp])
  2116. mock_http.post = AsyncMock(return_value=token_resp)
  2117. mock_signing_key = MagicMock()
  2118. mock_signing_key.key = "fake_key"
  2119. with (
  2120. patch("backend.app.api.routes.mfa.httpx.AsyncClient") as mock_httpx_cls,
  2121. patch("backend.app.api.routes.mfa.jwt.decode", return_value=fake_claims),
  2122. patch("backend.app.api.routes.mfa.PyJWKClient") as mock_jwks_cls,
  2123. ):
  2124. mock_httpx_cls.return_value.__aenter__ = AsyncMock(return_value=mock_http)
  2125. mock_httpx_cls.return_value.__aexit__ = AsyncMock(return_value=False)
  2126. mock_jwks_cls.return_value.get_signing_key_from_jwt.return_value = mock_signing_key
  2127. resp = await async_client.get(
  2128. f"/api/v1/auth/oidc/callback?code=fake_code&state={state}",
  2129. follow_redirects=False,
  2130. )
  2131. # M-NEW-6: must redirect with no_linked_account — NOT create a second link
  2132. assert resp.status_code == 302
  2133. location = resp.headers.get("location", "")
  2134. assert "no_linked_account" in location, f"Expected no_linked_account in redirect, got: {location}"
  2135. # Verify no second OIDC link was created for Provider A
  2136. from sqlalchemy import select as sa_select
  2137. from backend.app.models.oidc_provider import UserOIDCLink as _UOL
  2138. async with db_session as s:
  2139. links_result = await s.execute(
  2140. sa_select(_UOL).where(_UOL.user_id == target.id, _UOL.provider_id == prov_a.id)
  2141. )
  2142. assert links_result.scalar_one_or_none() is None, "No link to Provider A must exist"
  2143. # ===========================================================================
  2144. # Test Gap 1: OIDC state token is single-use — replay must be rejected
  2145. # ===========================================================================
  2146. class TestOIDCStateReplay:
  2147. """OIDC state token must be consumed on first use; a second callback with
  2148. the same state must redirect to ``?oidc_error=invalid_state``."""
  2149. @pytest.mark.asyncio
  2150. @pytest.mark.integration
  2151. async def test_state_replay_rejected(self, async_client: AsyncClient, db_session: AsyncSession):
  2152. """Replaying a consumed OIDC state token must return invalid_state."""
  2153. from backend.app.models.oidc_provider import OIDCProvider
  2154. # ── 1. Seed a minimal provider ────────────────────────────────────
  2155. provider = OIDCProvider(
  2156. name="StateReplayIdP",
  2157. issuer_url="https://statereplay-idp.example.com",
  2158. client_id="client_replay",
  2159. _client_secret_enc="secret_replay",
  2160. scopes="openid",
  2161. is_enabled=True,
  2162. auto_link_existing_accounts=False,
  2163. auto_create_users=False,
  2164. )
  2165. db_session.add(provider)
  2166. await db_session.flush()
  2167. # ── 2. Seed an OIDC state token ───────────────────────────────────
  2168. state = secrets.token_urlsafe(32)
  2169. db_session.add(
  2170. AuthEphemeralToken(
  2171. token=state,
  2172. token_type="oidc_state",
  2173. provider_id=provider.id,
  2174. nonce=secrets.token_urlsafe(32),
  2175. code_verifier=secrets.token_urlsafe(48),
  2176. expires_at=datetime.now(timezone.utc) + timedelta(minutes=10),
  2177. )
  2178. )
  2179. await db_session.commit()
  2180. # ── 3. First callback — discovery will fail (no real IdP), but the
  2181. # state token is atomically consumed (DELETE…RETURNING + commit)
  2182. # before the HTTP call is attempted.
  2183. first = await async_client.get(
  2184. f"/api/v1/auth/oidc/callback?code=any_code&state={state}",
  2185. follow_redirects=False,
  2186. )
  2187. assert first.status_code == 302
  2188. # The first call may fail for any reason except invalid_state
  2189. assert "invalid_state" not in first.headers.get("location", ""), (
  2190. f"First call should NOT get invalid_state: {first.headers.get('location')}"
  2191. )
  2192. # ── 4. Second callback with the same state → must be invalid_state ─
  2193. second = await async_client.get(
  2194. f"/api/v1/auth/oidc/callback?code=any_code&state={state}",
  2195. follow_redirects=False,
  2196. )
  2197. assert second.status_code == 302
  2198. assert "invalid_state" in second.headers.get("location", ""), (
  2199. f"Replayed state must redirect to invalid_state, got: {second.headers.get('location')}"
  2200. )
  2201. # ===========================================================================
  2202. # Test Gap 2: OIDC iss claim mismatch must redirect to token_validation_failed
  2203. # ===========================================================================
  2204. class TestOIDCIssMismatch:
  2205. """JWT whose iss claim does not match the discovery issuer must be rejected."""
  2206. @pytest.mark.asyncio
  2207. @pytest.mark.integration
  2208. async def test_iss_mismatch_redirects_token_validation_failed(
  2209. self, async_client: AsyncClient, db_session: AsyncSession
  2210. ):
  2211. import time
  2212. from unittest.mock import patch
  2213. import jwt as pyjwt
  2214. private_pem, jwks_data = _make_test_rsa_key()
  2215. correct_issuer = "https://correct-iss.example.com"
  2216. wrong_issuer = "https://wrong-iss.example.com"
  2217. client_id = "iss-mismatch-client"
  2218. nonce = secrets.token_urlsafe(16)
  2219. now = int(time.time())
  2220. # Sign the token with the WRONG issuer (iss != discovery_issuer)
  2221. id_token = pyjwt.encode(
  2222. {
  2223. "sub": "sub-iss-test",
  2224. "iss": wrong_issuer,
  2225. "aud": client_id,
  2226. "nonce": nonce,
  2227. "email": "iss@example.com",
  2228. "email_verified": True,
  2229. "iat": now,
  2230. "exp": now + 300,
  2231. },
  2232. private_pem,
  2233. algorithm="RS256",
  2234. headers={"kid": "test-kid-1"},
  2235. )
  2236. admin_token = await _setup_and_login(async_client, "issadmin1", "issadmin1!")
  2237. cr = await async_client.post(
  2238. "/api/v1/auth/oidc/providers",
  2239. json={
  2240. "name": "IssTest-IdP",
  2241. "issuer_url": correct_issuer,
  2242. "client_id": client_id,
  2243. "client_secret": "s",
  2244. "scopes": "openid",
  2245. "is_enabled": True,
  2246. "auto_create_users": True,
  2247. },
  2248. headers=_auth_header(admin_token),
  2249. )
  2250. assert cr.status_code in (200, 201), cr.text
  2251. provider_id = cr.json()["id"]
  2252. state = secrets.token_urlsafe(32)
  2253. db_session.add(
  2254. AuthEphemeralToken(
  2255. token=state,
  2256. token_type="oidc_state",
  2257. provider_id=provider_id,
  2258. nonce=nonce,
  2259. code_verifier=secrets.token_urlsafe(48),
  2260. expires_at=datetime.now(timezone.utc) + timedelta(minutes=5),
  2261. )
  2262. )
  2263. await db_session.commit()
  2264. # Discovery returns the CORRECT issuer; JWT carries the WRONG one.
  2265. discovery_doc = {
  2266. "issuer": correct_issuer,
  2267. "token_endpoint": f"{correct_issuer}/token",
  2268. "jwks_uri": f"{correct_issuer}/.well-known/jwks.json",
  2269. }
  2270. token_response = {"access_token": "a", "id_token": id_token}
  2271. class _MockResp:
  2272. def __init__(self, data):
  2273. self._data = data
  2274. self.status_code = 200
  2275. self.is_success = True
  2276. self.text = ""
  2277. def json(self):
  2278. return self._data
  2279. def raise_for_status(self):
  2280. pass
  2281. class _MockHttpxClient:
  2282. def __init__(self, *a, **kw):
  2283. pass
  2284. async def __aenter__(self):
  2285. return self
  2286. async def __aexit__(self, *a):
  2287. pass
  2288. async def get(self, url, **kw):
  2289. return _MockResp(jwks_data if "jwks" in url else discovery_doc)
  2290. async def post(self, url, **kw):
  2291. return _MockResp(token_response)
  2292. with patch("backend.app.api.routes.mfa.httpx.AsyncClient", _MockHttpxClient):
  2293. resp = await async_client.get(
  2294. f"/api/v1/auth/oidc/callback?code=c&state={state}",
  2295. follow_redirects=False,
  2296. )
  2297. assert resp.status_code == 302
  2298. location = resp.headers.get("location", "")
  2299. assert "token_validation_failed" in location, f"Expected token_validation_failed, got: {location}"
  2300. # ===========================================================================
  2301. # Test Gap 3: /forgot-password/confirm token is single-use
  2302. # ===========================================================================
  2303. class TestForgotPasswordTokenSingleUse:
  2304. """POST /forgot-password/confirm must reject a token after its first use."""
  2305. @pytest.mark.asyncio
  2306. @pytest.mark.integration
  2307. async def test_token_reuse_rejected(self, async_client: AsyncClient, db_session: AsyncSession):
  2308. from backend.app.core.auth import get_password_hash
  2309. from backend.app.models.user import User as _User
  2310. user = _User(
  2311. username="fpcuser1",
  2312. email="fpc@example.com",
  2313. password_hash=get_password_hash("OldPass1!"),
  2314. role="user",
  2315. is_active=True,
  2316. )
  2317. db_session.add(user)
  2318. await db_session.flush()
  2319. reset_token = secrets.token_urlsafe(32)
  2320. db_session.add(
  2321. AuthEphemeralToken(
  2322. token=reset_token,
  2323. token_type="password_reset",
  2324. username="fpcuser1",
  2325. expires_at=datetime.now(timezone.utc) + timedelta(hours=1),
  2326. )
  2327. )
  2328. await db_session.commit()
  2329. # First use → success
  2330. resp1 = await async_client.post(
  2331. "/api/v1/auth/forgot-password/confirm",
  2332. json={"token": reset_token, "new_password": "NewPass1!"},
  2333. )
  2334. assert resp1.status_code == 200, resp1.text
  2335. # Second use → token already consumed, must fail
  2336. resp2 = await async_client.post(
  2337. "/api/v1/auth/forgot-password/confirm",
  2338. json={"token": reset_token, "new_password": "AnotherNew1!"},
  2339. )
  2340. assert resp2.status_code == 400
  2341. # ===========================================================================
  2342. # C1 regression: setup_totp must reject a replayed TOTP code
  2343. # ===========================================================================
  2344. class TestSetupTOTPReplayRejected:
  2345. """setup_totp must reject a TOTP code that was already accepted in its window."""
  2346. @pytest.mark.asyncio
  2347. @pytest.mark.integration
  2348. async def test_replayed_setup_code_rejected(self, async_client: AsyncClient, db_session: AsyncSession):
  2349. from sqlalchemy import select as sa_select
  2350. from backend.app.models.user_totp import UserTOTP
  2351. token = await _setup_and_login(async_client, "setupreplay1", "setupreplay1!")
  2352. # Step 1: Initial TOTP setup (no active TOTP yet → no code required)
  2353. setup_resp = await async_client.post(
  2354. "/api/v1/auth/2fa/totp/setup",
  2355. headers=_auth_header(token),
  2356. )
  2357. assert setup_resp.status_code == 200
  2358. secret = setup_resp.json()["secret"]
  2359. # Step 2: Enable TOTP with a valid code
  2360. totp_obj = pyotp.TOTP(secret)
  2361. enable_resp = await async_client.post(
  2362. "/api/v1/auth/2fa/totp/enable",
  2363. json={"code": totp_obj.now()},
  2364. headers=_auth_header(token),
  2365. )
  2366. assert enable_resp.status_code == 200 # TOTP is now active (is_enabled=True)
  2367. # Step 3: Determine current valid code and its counter
  2368. me_resp = await async_client.get("/api/v1/auth/me", headers=_auth_header(token))
  2369. user_id = me_resp.json()["id"]
  2370. totp_result = await db_session.execute(sa_select(UserTOTP).where(UserTOTP.user_id == user_id))
  2371. totp_record = totp_result.scalar_one()
  2372. secret_now = totp_record.secret # decrypted via property
  2373. totp_now = pyotp.TOTP(secret_now)
  2374. valid_code = totp_now.now()
  2375. accepted_counter = totp_now.timecode(datetime.now(timezone.utc))
  2376. # Step 4: Pre-set last_totp_counter so this code looks already used
  2377. totp_record.last_totp_counter = accepted_counter
  2378. await db_session.commit()
  2379. # Step 5: Attempt setup_totp with the "already used" code → must be rejected
  2380. replay_resp = await async_client.post(
  2381. "/api/v1/auth/2fa/totp/setup",
  2382. json={"code": valid_code},
  2383. headers=_auth_header(token),
  2384. )
  2385. assert replay_resp.status_code == 400
  2386. assert "already used" in replay_resp.json()["detail"]
  2387. # ===========================================================================
  2388. # Nit8: OIDC aud mismatch and nonce mismatch tests
  2389. # ===========================================================================
  2390. class TestOIDCAudAndNonceMismatch:
  2391. """Nit8: aud != client_id and nonce != stored value must each fail the callback."""
  2392. def _make_oidc_provider_setup(self):
  2393. """Return a helper for building OIDC test fixtures inline."""
  2394. private_pem, jwks_data = _make_test_rsa_key()
  2395. return private_pem, jwks_data
  2396. @pytest.mark.asyncio
  2397. @pytest.mark.integration
  2398. async def test_aud_mismatch_redirects_token_validation_failed(
  2399. self, async_client: AsyncClient, db_session: AsyncSession
  2400. ):
  2401. """ID token with aud != client_id must be rejected (PyJWT InvalidAudienceError)."""
  2402. import time
  2403. from unittest.mock import patch
  2404. import jwt as pyjwt
  2405. private_pem, jwks_data = _make_test_rsa_key()
  2406. issuer = "https://aud-mismatch.example.com"
  2407. client_id = "aud-test-client"
  2408. wrong_aud = "some-other-client"
  2409. nonce = secrets.token_urlsafe(16)
  2410. now = int(time.time())
  2411. id_token = pyjwt.encode(
  2412. {
  2413. "sub": "sub-aud-test",
  2414. "iss": issuer,
  2415. "aud": wrong_aud, # <-- wrong audience
  2416. "nonce": nonce,
  2417. "email": "aud@example.com",
  2418. "email_verified": True,
  2419. "iat": now,
  2420. "exp": now + 300,
  2421. },
  2422. private_pem,
  2423. algorithm="RS256",
  2424. headers={"kid": "test-kid-1"},
  2425. )
  2426. admin_token = await _setup_and_login(async_client, "audmismatch_admin", "AudMismatch_admin1")
  2427. cr = await async_client.post(
  2428. "/api/v1/auth/oidc/providers",
  2429. json={
  2430. "name": "AudMismatch-IdP",
  2431. "issuer_url": issuer,
  2432. "client_id": client_id,
  2433. "client_secret": "s",
  2434. "scopes": "openid",
  2435. "is_enabled": True,
  2436. "auto_create_users": True,
  2437. },
  2438. headers=_auth_header(admin_token),
  2439. )
  2440. assert cr.status_code in (200, 201), cr.text
  2441. provider_id = cr.json()["id"]
  2442. state = secrets.token_urlsafe(32)
  2443. db_session.add(
  2444. AuthEphemeralToken(
  2445. token=state,
  2446. token_type="oidc_state",
  2447. provider_id=provider_id,
  2448. nonce=nonce,
  2449. code_verifier=secrets.token_urlsafe(48),
  2450. expires_at=datetime.now(timezone.utc) + timedelta(minutes=5),
  2451. )
  2452. )
  2453. await db_session.commit()
  2454. discovery_doc = {
  2455. "issuer": issuer,
  2456. "token_endpoint": f"{issuer}/token",
  2457. "jwks_uri": f"{issuer}/.well-known/jwks.json",
  2458. }
  2459. class _MockResp:
  2460. def __init__(self, data):
  2461. self._data = data
  2462. self.status_code = 200
  2463. self.is_success = True
  2464. self.text = ""
  2465. def json(self):
  2466. return self._data
  2467. def raise_for_status(self):
  2468. pass
  2469. class _MockHttpxClient:
  2470. def __init__(self, *a, **kw):
  2471. pass
  2472. async def __aenter__(self):
  2473. return self
  2474. async def __aexit__(self, *a):
  2475. pass
  2476. async def get(self, url, **kw):
  2477. return _MockResp(jwks_data if "jwks" in url else discovery_doc)
  2478. async def post(self, url, **kw):
  2479. return _MockResp({"access_token": "a", "id_token": id_token})
  2480. with patch("backend.app.api.routes.mfa.httpx.AsyncClient", _MockHttpxClient):
  2481. resp = await async_client.get(
  2482. f"/api/v1/auth/oidc/callback?code=c&state={state}",
  2483. follow_redirects=False,
  2484. )
  2485. assert resp.status_code == 302
  2486. location = resp.headers.get("location", "")
  2487. assert "token_validation_failed" in location, (
  2488. f"Expected token_validation_failed redirect for aud mismatch, got: {location}"
  2489. )
  2490. @pytest.mark.asyncio
  2491. @pytest.mark.integration
  2492. async def test_nonce_mismatch_redirects_token_validation_failed(
  2493. self, async_client: AsyncClient, db_session: AsyncSession
  2494. ):
  2495. """ID token with nonce != stored state nonce must be rejected."""
  2496. import time
  2497. from unittest.mock import patch
  2498. import jwt as pyjwt
  2499. private_pem, jwks_data = _make_test_rsa_key()
  2500. issuer = "https://nonce-mismatch.example.com"
  2501. client_id = "nonce-test-client"
  2502. stored_nonce = secrets.token_urlsafe(16)
  2503. wrong_nonce = secrets.token_urlsafe(16) # different from stored_nonce
  2504. now = int(time.time())
  2505. id_token = pyjwt.encode(
  2506. {
  2507. "sub": "sub-nonce-test",
  2508. "iss": issuer,
  2509. "aud": client_id,
  2510. "nonce": wrong_nonce, # <-- does not match stored_nonce
  2511. "email": "nonce@example.com",
  2512. "email_verified": True,
  2513. "iat": now,
  2514. "exp": now + 300,
  2515. },
  2516. private_pem,
  2517. algorithm="RS256",
  2518. headers={"kid": "test-kid-1"},
  2519. )
  2520. admin_token = await _setup_and_login(async_client, "noncemismatch_admin", "NonceMismatch_admin1")
  2521. cr = await async_client.post(
  2522. "/api/v1/auth/oidc/providers",
  2523. json={
  2524. "name": "NonceMismatch-IdP",
  2525. "issuer_url": issuer,
  2526. "client_id": client_id,
  2527. "client_secret": "s",
  2528. "scopes": "openid",
  2529. "is_enabled": True,
  2530. "auto_create_users": True,
  2531. },
  2532. headers=_auth_header(admin_token),
  2533. )
  2534. assert cr.status_code in (200, 201), cr.text
  2535. provider_id = cr.json()["id"]
  2536. state = secrets.token_urlsafe(32)
  2537. db_session.add(
  2538. AuthEphemeralToken(
  2539. token=state,
  2540. token_type="oidc_state",
  2541. provider_id=provider_id,
  2542. nonce=stored_nonce, # state has correct nonce; JWT carries wrong_nonce
  2543. code_verifier=secrets.token_urlsafe(48),
  2544. expires_at=datetime.now(timezone.utc) + timedelta(minutes=5),
  2545. )
  2546. )
  2547. await db_session.commit()
  2548. discovery_doc = {
  2549. "issuer": issuer,
  2550. "token_endpoint": f"{issuer}/token",
  2551. "jwks_uri": f"{issuer}/.well-known/jwks.json",
  2552. }
  2553. class _MockResp:
  2554. def __init__(self, data):
  2555. self._data = data
  2556. self.status_code = 200
  2557. self.is_success = True
  2558. self.text = ""
  2559. def json(self):
  2560. return self._data
  2561. def raise_for_status(self):
  2562. pass
  2563. class _MockHttpxClient:
  2564. def __init__(self, *a, **kw):
  2565. pass
  2566. async def __aenter__(self):
  2567. return self
  2568. async def __aexit__(self, *a):
  2569. pass
  2570. async def get(self, url, **kw):
  2571. return _MockResp(jwks_data if "jwks" in url else discovery_doc)
  2572. async def post(self, url, **kw):
  2573. return _MockResp({"access_token": "a", "id_token": id_token})
  2574. with patch("backend.app.api.routes.mfa.httpx.AsyncClient", _MockHttpxClient):
  2575. resp = await async_client.get(
  2576. f"/api/v1/auth/oidc/callback?code=c&state={state}",
  2577. follow_redirects=False,
  2578. )
  2579. assert resp.status_code == 302
  2580. location = resp.headers.get("location", "")
  2581. # The callback redirects to ?oidc_error=nonce_mismatch when nonces differ.
  2582. assert "nonce_mismatch" in location, f"Expected nonce_mismatch redirect for nonce mismatch, got: {location}"
  2583. # ===========================================================================
  2584. # Expired OIDC token rejection — state and exchange tokens
  2585. # ===========================================================================
  2586. class TestOIDCExpiredTokenRejection:
  2587. """Expired OIDC state and exchange tokens must be rejected atomically.
  2588. The DELETE … WHERE expires_at > now must ensure that an already-expired
  2589. token is never consumed (committed) before the expiry is checked, so the
  2590. token row stays in the DB and is not silently discarded.
  2591. """
  2592. @pytest.mark.asyncio
  2593. @pytest.mark.integration
  2594. async def test_expired_state_token_rejected_as_invalid_state(
  2595. self, async_client: AsyncClient, db_session: AsyncSession
  2596. ):
  2597. """An expired OIDC state token must redirect to invalid_state without
  2598. being consumed — it must still exist in the DB after the rejected call."""
  2599. from backend.app.models.oidc_provider import OIDCProvider
  2600. provider = OIDCProvider(
  2601. name="ExpiredStateIdP",
  2602. issuer_url="https://expired-state.example.com",
  2603. client_id="client_expired_state",
  2604. _client_secret_enc="secret_exp_state",
  2605. scopes="openid",
  2606. is_enabled=True,
  2607. auto_link_existing_accounts=False,
  2608. auto_create_users=False,
  2609. )
  2610. db_session.add(provider)
  2611. await db_session.flush()
  2612. state = secrets.token_urlsafe(32)
  2613. db_session.add(
  2614. AuthEphemeralToken(
  2615. token=state,
  2616. token_type="oidc_state",
  2617. provider_id=provider.id,
  2618. nonce=secrets.token_urlsafe(16),
  2619. code_verifier=secrets.token_urlsafe(48),
  2620. # already expired
  2621. expires_at=datetime.now(timezone.utc) - timedelta(minutes=5),
  2622. )
  2623. )
  2624. await db_session.commit()
  2625. resp = await async_client.get(
  2626. f"/api/v1/auth/oidc/callback?code=any_code&state={state}",
  2627. follow_redirects=False,
  2628. )
  2629. assert resp.status_code == 302
  2630. location = resp.headers.get("location", "")
  2631. assert "invalid_state" in location, f"Expected invalid_state redirect for expired state, got: {location}"
  2632. @pytest.mark.asyncio
  2633. @pytest.mark.integration
  2634. async def test_expired_exchange_token_rejected(self, async_client: AsyncClient, db_session: AsyncSession):
  2635. """An expired OIDC exchange token must return 401 without being consumed."""
  2636. from sqlalchemy import select as sa_select
  2637. expired_token = secrets.token_urlsafe(32)
  2638. db_session.add(
  2639. AuthEphemeralToken(
  2640. token=expired_token,
  2641. token_type="oidc_exchange",
  2642. username="some_user",
  2643. # already expired
  2644. expires_at=datetime.now(timezone.utc) - timedelta(minutes=5),
  2645. )
  2646. )
  2647. await db_session.commit()
  2648. resp = await async_client.post(
  2649. "/api/v1/auth/oidc/exchange",
  2650. json={"oidc_token": expired_token},
  2651. )
  2652. assert resp.status_code == 401
  2653. assert "expired" in resp.json().get("detail", "").lower() or "invalid" in resp.json().get("detail", "").lower()
  2654. # Token must NOT have been consumed — it should still be in the DB
  2655. # (the atomic DELETE WHERE expires_at > now left it untouched)
  2656. result = await db_session.execute(
  2657. sa_select(AuthEphemeralToken).where(AuthEphemeralToken.token == expired_token)
  2658. )
  2659. remaining = result.scalar_one_or_none()
  2660. assert remaining is not None, "Expired exchange token must not be consumed by a rejected request"
  2661. # ===========================================================================
  2662. # Trailing slash in issuer_url — discovery URL must not contain double slash
  2663. # ===========================================================================
  2664. class TestOIDCIssuerUrlTrailingSlash:
  2665. """Providers like Authentik use issuer URLs with a trailing slash.
  2666. BamBuddy must strip the slash before appending /.well-known/openid-configuration
  2667. to avoid a double-slash that results in a 404.
  2668. """
  2669. @pytest.mark.asyncio
  2670. @pytest.mark.integration
  2671. async def test_trailing_slash_issuer_url_fetches_correct_discovery_url(self, async_client: AsyncClient):
  2672. from unittest.mock import AsyncMock, MagicMock, patch
  2673. issuer_with_slash = "https://authentik.example.com/application/o/bambuddy/"
  2674. admin_token = await _setup_and_login(async_client, "oidcslashadm", "oidcslashadm1")
  2675. create_resp = await async_client.post(
  2676. "/api/v1/auth/oidc/providers",
  2677. json={
  2678. "name": "Authentik-Slash",
  2679. "issuer_url": issuer_with_slash,
  2680. "client_id": "bambuddy",
  2681. "client_secret": "secret",
  2682. "scopes": "openid email profile",
  2683. "is_enabled": True,
  2684. "auto_create_users": False,
  2685. },
  2686. headers=_auth_header(admin_token),
  2687. )
  2688. assert create_resp.status_code == 201
  2689. provider_id = create_resp.json()["id"]
  2690. fake_discovery = {
  2691. "issuer": issuer_with_slash,
  2692. "authorization_endpoint": "https://authentik.example.com/application/o/bambuddy/authorize",
  2693. }
  2694. disc_resp = AsyncMock()
  2695. disc_resp.raise_for_status = MagicMock()
  2696. disc_resp.json = MagicMock(return_value=fake_discovery)
  2697. mock_http = AsyncMock()
  2698. mock_http.get = AsyncMock(return_value=disc_resp)
  2699. with patch("backend.app.api.routes.mfa.httpx.AsyncClient") as mock_cls:
  2700. mock_cls.return_value.__aenter__ = AsyncMock(return_value=mock_http)
  2701. mock_cls.return_value.__aexit__ = AsyncMock(return_value=False)
  2702. resp = await async_client.get(f"/api/v1/auth/oidc/authorize/{provider_id}")
  2703. assert resp.status_code == 200
  2704. called_url = mock_http.get.call_args_list[0][0][0]
  2705. assert "//" not in called_url.replace("https://", ""), (
  2706. f"Discovery URL must not contain double slash: {called_url}"
  2707. )
  2708. assert called_url.endswith("/.well-known/openid-configuration"), (
  2709. f"Expected discovery URL to end with /.well-known/openid-configuration, got: {called_url}"
  2710. )
  2711. @pytest.mark.asyncio
  2712. @pytest.mark.integration
  2713. async def test_iss_claim_trailing_slash_accepted(self, async_client: AsyncClient, db_session: AsyncSession):
  2714. """Provider configured without trailing slash, Authentik JWT iss has trailing slash.
  2715. Both sides must be normalised before comparison so the login succeeds.
  2716. """
  2717. import time
  2718. from unittest.mock import patch
  2719. import jwt as pyjwt
  2720. private_pem, jwks_data = _make_test_rsa_key()
  2721. issuer_no_slash = "https://authentik.example.com/application/o/bambuddy"
  2722. issuer_with_slash = issuer_no_slash + "/"
  2723. client_id = "bambuddy-client"
  2724. nonce = secrets.token_urlsafe(16)
  2725. now = int(time.time())
  2726. id_token = pyjwt.encode(
  2727. {
  2728. "sub": "authentik-sub-123",
  2729. "iss": issuer_with_slash,
  2730. "aud": client_id,
  2731. "nonce": nonce,
  2732. "email": "authentik-user@example.com",
  2733. "email_verified": True,
  2734. "iat": now,
  2735. "exp": now + 300,
  2736. },
  2737. private_pem,
  2738. algorithm="RS256",
  2739. headers={"kid": "test-kid-1"},
  2740. )
  2741. admin_token = await _setup_and_login(async_client, "authentikadm", "authentikadm1")
  2742. create_resp = await async_client.post(
  2743. "/api/v1/auth/oidc/providers",
  2744. json={
  2745. "name": "Authentik-ISS",
  2746. "issuer_url": issuer_no_slash,
  2747. "client_id": client_id,
  2748. "client_secret": "secret",
  2749. "scopes": "openid email profile",
  2750. "is_enabled": True,
  2751. "auto_create_users": True,
  2752. },
  2753. headers=_auth_header(admin_token),
  2754. )
  2755. assert create_resp.status_code == 201
  2756. provider_id = create_resp.json()["id"]
  2757. state = secrets.token_urlsafe(32)
  2758. code_verifier = secrets.token_urlsafe(48)
  2759. db_session.add(
  2760. AuthEphemeralToken(
  2761. token=state,
  2762. token_type="oidc_state",
  2763. provider_id=provider_id,
  2764. nonce=nonce,
  2765. code_verifier=code_verifier,
  2766. expires_at=datetime.now(timezone.utc) + timedelta(minutes=5),
  2767. )
  2768. )
  2769. await db_session.commit()
  2770. discovery_doc = {
  2771. "issuer": issuer_with_slash,
  2772. "authorization_endpoint": f"{issuer_no_slash}/authorize",
  2773. "token_endpoint": f"{issuer_no_slash}/token",
  2774. "jwks_uri": f"{issuer_no_slash}/.well-known/jwks.json",
  2775. }
  2776. token_response = {"access_token": "mock", "token_type": "Bearer", "id_token": id_token}
  2777. class _MockResp:
  2778. def __init__(self, data):
  2779. self._data = data
  2780. self.is_success = True
  2781. self.status_code = 200
  2782. self.text = str(data)
  2783. def json(self):
  2784. return self._data
  2785. def raise_for_status(self):
  2786. pass
  2787. class _MockHttpxClient:
  2788. def __init__(self, *a, **kw):
  2789. pass
  2790. async def __aenter__(self):
  2791. return self
  2792. async def __aexit__(self, *a):
  2793. pass
  2794. async def get(self, url, **kw):
  2795. return _MockResp(jwks_data if "jwks" in url else discovery_doc)
  2796. async def post(self, url, **kw):
  2797. return _MockResp(token_response)
  2798. with patch("backend.app.api.routes.mfa.httpx.AsyncClient", _MockHttpxClient):
  2799. resp = await async_client.get(
  2800. f"/api/v1/auth/oidc/callback?code=auth-code&state={state}",
  2801. follow_redirects=False,
  2802. )
  2803. location = resp.headers.get("location", "")
  2804. assert resp.status_code == 302, f"Expected redirect, got {resp.status_code}"
  2805. assert "token_validation_failed" not in location, (
  2806. "Trailing slash mismatch in iss claim must not cause token_validation_failed"
  2807. )
  2808. assert "oidc_token=" in location, f"Expected oidc_token in redirect, got: {location}"
  2809. class TestOIDCCallbackCodeLength:
  2810. """OIDC callback code/state query params must accept up to 2048 characters (OAuth spec)."""
  2811. @pytest.mark.asyncio
  2812. @pytest.mark.integration
  2813. async def test_code_512_chars_accepted(self, async_client: AsyncClient):
  2814. """A 512-character code (old limit) must not be rejected with 422."""
  2815. code = "a" * 512
  2816. resp = await async_client.get(
  2817. f"/api/v1/auth/oidc/callback?code={code}&state=bogus-state",
  2818. follow_redirects=False,
  2819. )
  2820. assert resp.status_code != 422, "512-char code must not be rejected by Pydantic"
  2821. @pytest.mark.asyncio
  2822. @pytest.mark.integration
  2823. async def test_code_2048_chars_accepted(self, async_client: AsyncClient):
  2824. """A 2048-character code must not be rejected with 422."""
  2825. code = "a" * 2048
  2826. resp = await async_client.get(
  2827. f"/api/v1/auth/oidc/callback?code={code}&state=bogus-state",
  2828. follow_redirects=False,
  2829. )
  2830. assert resp.status_code != 422, "2048-char code must not be rejected by Pydantic"
  2831. @pytest.mark.asyncio
  2832. @pytest.mark.integration
  2833. async def test_code_2049_chars_rejected(self, async_client: AsyncClient):
  2834. """A 2049-character code must be rejected with 422."""
  2835. code = "a" * 2049
  2836. resp = await async_client.get(
  2837. f"/api/v1/auth/oidc/callback?code={code}&state=bogus-state",
  2838. follow_redirects=False,
  2839. )
  2840. assert resp.status_code == 422, "2049-char code must be rejected by Pydantic"
  2841. @pytest.mark.asyncio
  2842. @pytest.mark.integration
  2843. async def test_state_512_chars_accepted(self, async_client: AsyncClient):
  2844. """A 512-character state (old limit) must not be rejected with 422."""
  2845. state = "a" * 512
  2846. resp = await async_client.get(
  2847. f"/api/v1/auth/oidc/callback?code=bogus-code&state={state}",
  2848. follow_redirects=False,
  2849. )
  2850. assert resp.status_code != 422, "512-char state must not be rejected by Pydantic"
  2851. @pytest.mark.asyncio
  2852. @pytest.mark.integration
  2853. async def test_state_2048_chars_accepted(self, async_client: AsyncClient):
  2854. """A 2048-character state must not be rejected with 422."""
  2855. state = "a" * 2048
  2856. resp = await async_client.get(
  2857. f"/api/v1/auth/oidc/callback?code=bogus-code&state={state}",
  2858. follow_redirects=False,
  2859. )
  2860. assert resp.status_code != 422, "2048-char state must not be rejected by Pydantic"
  2861. @pytest.mark.asyncio
  2862. @pytest.mark.integration
  2863. async def test_state_2049_chars_rejected(self, async_client: AsyncClient):
  2864. """A 2049-character state must be rejected with 422."""
  2865. state = "a" * 2049
  2866. resp = await async_client.get(
  2867. f"/api/v1/auth/oidc/callback?code=bogus-code&state={state}",
  2868. follow_redirects=False,
  2869. )
  2870. assert resp.status_code == 422, "2049-char state must be rejected by Pydantic"
  2871. # ---------------------------------------------------------------------------
  2872. # Helpers shared by TestOIDCEmailClaimResolution
  2873. # ---------------------------------------------------------------------------
  2874. async def _run_oidc_callback(
  2875. async_client: AsyncClient,
  2876. db_session: AsyncSession,
  2877. *,
  2878. provider_id: int,
  2879. claims: dict,
  2880. private_pem: bytes,
  2881. jwks_data: dict,
  2882. issuer: str,
  2883. client_id: str,
  2884. ) -> str:
  2885. """Run a full OIDC callback flow and return the redirect location."""
  2886. nonce = secrets.token_urlsafe(16)
  2887. now = int(time.time())
  2888. token_claims = {
  2889. "sub": claims.get("sub", f"sub-{secrets.token_hex(8)}"),
  2890. "iss": issuer,
  2891. "aud": client_id,
  2892. "nonce": nonce,
  2893. "iat": now,
  2894. "exp": now + 300,
  2895. **{k: v for k, v in claims.items() if k not in ("sub",)},
  2896. }
  2897. id_token = pyjwt.encode(token_claims, private_pem, algorithm="RS256", headers={"kid": "test-kid-1"})
  2898. state = secrets.token_urlsafe(32)
  2899. code_verifier = secrets.token_urlsafe(48)
  2900. db_session.add(
  2901. AuthEphemeralToken(
  2902. token=state,
  2903. token_type="oidc_state",
  2904. provider_id=provider_id,
  2905. nonce=nonce,
  2906. code_verifier=code_verifier,
  2907. expires_at=datetime.now(timezone.utc) + timedelta(minutes=5),
  2908. )
  2909. )
  2910. await db_session.commit()
  2911. discovery_doc = {
  2912. "issuer": issuer,
  2913. "authorization_endpoint": f"{issuer}/auth",
  2914. "token_endpoint": f"{issuer}/token",
  2915. "jwks_uri": f"{issuer}/.well-known/jwks.json",
  2916. }
  2917. token_response = {"access_token": "mock-access", "token_type": "Bearer", "id_token": id_token}
  2918. class _R:
  2919. def __init__(self, data):
  2920. self._data = data
  2921. self.status_code = 200
  2922. self.is_success = True
  2923. self.text = str(data)
  2924. def json(self):
  2925. return self._data
  2926. def raise_for_status(self):
  2927. pass
  2928. class _C:
  2929. def __init__(self, *a, **kw):
  2930. pass
  2931. async def __aenter__(self):
  2932. return self
  2933. async def __aexit__(self, *a):
  2934. pass
  2935. async def get(self, url, **kw):
  2936. return _R(jwks_data if "jwks" in url else discovery_doc)
  2937. async def post(self, url, **kw):
  2938. return _R(token_response)
  2939. with patch("backend.app.api.routes.mfa.httpx.AsyncClient", _C):
  2940. resp = await async_client.get(
  2941. f"/api/v1/auth/oidc/callback?code=test-code&state={state}",
  2942. follow_redirects=False,
  2943. )
  2944. return resp.headers.get("location", "")
  2945. class TestOIDCEmailClaimResolution:
  2946. """Three-case email resolution logic: Fall A / Fall B / Fall C."""
  2947. # ── shared helpers ────────────────────────────────────────────────────────
  2948. async def _create_provider(
  2949. self,
  2950. async_client: AsyncClient,
  2951. admin_token: str,
  2952. issuer: str,
  2953. client_id: str,
  2954. *,
  2955. email_claim: str = "email",
  2956. require_email_verified: bool = True,
  2957. auto_link_existing_accounts: bool = False,
  2958. suffix: str = "",
  2959. ) -> int:
  2960. resp = await async_client.post(
  2961. "/api/v1/auth/oidc/providers",
  2962. json={
  2963. "name": f"TestIdP-{suffix or secrets.token_hex(4)}",
  2964. "issuer_url": issuer,
  2965. "client_id": client_id,
  2966. "client_secret": "sec",
  2967. "scopes": "openid email profile",
  2968. "is_enabled": True,
  2969. "auto_create_users": True,
  2970. "auto_link_existing_accounts": auto_link_existing_accounts,
  2971. "email_claim": email_claim,
  2972. "require_email_verified": require_email_verified,
  2973. },
  2974. headers={"Authorization": f"Bearer {admin_token}"},
  2975. )
  2976. assert resp.status_code == 201, resp.text
  2977. return resp.json()["id"]
  2978. async def _get_oidc_link(self, db_session: AsyncSession, provider_id: int, sub: str):
  2979. from sqlalchemy import select
  2980. from backend.app.models.oidc_provider import UserOIDCLink
  2981. result = await db_session.execute(
  2982. select(UserOIDCLink)
  2983. .where(UserOIDCLink.provider_id == provider_id)
  2984. .where(UserOIDCLink.provider_user_id == sub)
  2985. )
  2986. return result.scalar_one_or_none()
  2987. # ── Parametrized matrix: Fall A / Fall B / Fall C ─────────────────────────
  2988. @pytest.mark.asyncio
  2989. @pytest.mark.integration
  2990. @pytest.mark.parametrize(
  2991. "email_claim,require_ev,claims,expected",
  2992. [
  2993. # Fall A: standard claim + require_ev=True (default)
  2994. ("email", True, {"email": "fa@example.com", "email_verified": True}, "fa@example.com"),
  2995. ("email", True, {"email": "fa@example.com", "email_verified": False}, None),
  2996. ("email", True, {"email": "fa@example.com"}, None), # Azure Entra with default config
  2997. # Fall A + SEC-2: malformed email claim rejected even when email_verified=True
  2998. ("email", True, {"email": "notanemail", "email_verified": True}, None),
  2999. # Fall B: standard claim + require_ev=False (Azure Entra permissive)
  3000. ("email", False, {"email": "fb@example.com", "email_verified": True}, "fb@example.com"),
  3001. ("email", False, {"email": "fb@example.com", "email_verified": False}, None),
  3002. ("email", False, {"email": "azure@company.com"}, "azure@company.com"), # ev absent → kept
  3003. # Fall B + SEC-2: malformed email claim rejected in permissive mode (ev absent)
  3004. ("email", False, {"email": "user@nodot"}, None),
  3005. # Fall B + SEC-2: shape check fires before email_verified=False drop
  3006. ("email", False, {"email": "notanemail", "email_verified": False}, None),
  3007. # Fall C: custom claim (preferred_username) — no email_verified check
  3008. ("preferred_username", True, {"preferred_username": "User@Company.COM"}, "user@company.com"),
  3009. ("preferred_username", True, {"preferred_username": " User@EXAMPLE.COM "}, "user@example.com"),
  3010. ("preferred_username", True, {"preferred_username": "justausername"}, None),
  3011. ("preferred_username", True, {"preferred_username": "@"}, None), # SEC-2: "@" only
  3012. ("preferred_username", True, {"preferred_username": "@domain.com"}, None), # SEC-2: empty local
  3013. ("preferred_username", True, {"preferred_username": "user@"}, None), # SEC-2: empty domain
  3014. ("preferred_username", True, {"preferred_username": "user@nodot"}, None), # SEC-2: no dot in domain
  3015. ("preferred_username", True, {}, None), # claim absent
  3016. # Fall C: email_verified=False present alongside custom claim — must NOT suppress the email
  3017. ("preferred_username", True, {"preferred_username": "user@co.com", "email_verified": False}, "user@co.com"),
  3018. ],
  3019. ids=[
  3020. "fall-a-ev-true",
  3021. "fall-a-ev-false",
  3022. "fall-a-ev-absent",
  3023. "fall-a-malformed-email",
  3024. "fall-b-ev-true",
  3025. "fall-b-ev-false",
  3026. "fall-b-ev-absent",
  3027. "fall-b-malformed-email",
  3028. "fall-b-malformed-email-ev-false",
  3029. "fall-c-valid-upn",
  3030. "fall-c-lowercase-strip",
  3031. "fall-c-no-at",
  3032. "fall-c-at-only",
  3033. "fall-c-empty-local",
  3034. "fall-c-empty-domain",
  3035. "fall-c-no-dot-in-domain",
  3036. "fall-c-claim-absent",
  3037. "fall-c-ev-false-ignored",
  3038. ],
  3039. )
  3040. async def test_email_resolution_matrix(
  3041. self,
  3042. async_client: AsyncClient,
  3043. db_session: AsyncSession,
  3044. email_claim: str,
  3045. require_ev: bool,
  3046. claims: dict,
  3047. expected: str | None,
  3048. ):
  3049. """C4: Verify link exists AND check provider_email — avoids false-passing on callback failure."""
  3050. issuer = "https://matrix.test"
  3051. client_id = "matrix-client"
  3052. admin_token = await _setup_and_login(async_client, "matrix_adm", "Matrix123!")
  3053. private_pem, jwks_data = _make_test_rsa_key()
  3054. provider_id = await self._create_provider(
  3055. async_client,
  3056. admin_token,
  3057. issuer,
  3058. client_id,
  3059. email_claim=email_claim,
  3060. require_email_verified=require_ev,
  3061. suffix="matrix",
  3062. )
  3063. sub = f"sub-matrix-{secrets.token_hex(6)}"
  3064. await _run_oidc_callback(
  3065. async_client,
  3066. db_session,
  3067. provider_id=provider_id,
  3068. claims={"sub": sub, **claims},
  3069. private_pem=private_pem,
  3070. jwks_data=jwks_data,
  3071. issuer=issuer,
  3072. client_id=client_id,
  3073. )
  3074. db_session.expire_all()
  3075. link = await self._get_oidc_link(db_session, provider_id, sub)
  3076. assert link is not None, "UserOIDCLink must be created even when email is dropped"
  3077. assert link.provider_email == expected
  3078. # ── Security: auto_link guards (CREATE endpoint) ──────────────────────────
  3079. @pytest.mark.asyncio
  3080. @pytest.mark.integration
  3081. async def test_auto_link_blocked_with_require_ev_false(self, async_client: AsyncClient):
  3082. """SEC-1: auto_link + require_email_verified=False must be rejected at schema level (422)."""
  3083. admin_token = await _setup_and_login(async_client, "sec1_adm", "Sec1Adm123!")
  3084. resp = await async_client.post(
  3085. "/api/v1/auth/oidc/providers",
  3086. json={
  3087. "name": "SEC1-Test",
  3088. "issuer_url": "https://sec1.test",
  3089. "client_id": "sec1-client",
  3090. "client_secret": "sec",
  3091. "scopes": "openid email profile",
  3092. "auto_link_existing_accounts": True,
  3093. "require_email_verified": False,
  3094. },
  3095. headers={"Authorization": f"Bearer {admin_token}"},
  3096. )
  3097. assert resp.status_code == 422
  3098. @pytest.mark.asyncio
  3099. @pytest.mark.integration
  3100. async def test_auto_link_allowed_with_custom_claim_create(self, async_client: AsyncClient):
  3101. """Fall C: auto_link + email_claim!='email' must be accepted on CREATE (201).
  3102. Custom claims (e.g. Azure preferred_username/upn) never perform an email_verified
  3103. check, so auto_link is safe regardless of require_email_verified.
  3104. """
  3105. admin_token = await _setup_and_login(async_client, "sec6c_adm", "Sec6CAdm123!")
  3106. resp = await async_client.post(
  3107. "/api/v1/auth/oidc/providers",
  3108. json={
  3109. "name": "SEC6-Create-Test",
  3110. "issuer_url": "https://sec6c.test",
  3111. "client_id": "sec6c-client",
  3112. "client_secret": "sec",
  3113. "scopes": "openid email profile",
  3114. "auto_link_existing_accounts": True,
  3115. "email_claim": "upn",
  3116. },
  3117. headers={"Authorization": f"Bearer {admin_token}"},
  3118. )
  3119. assert resp.status_code == 201
  3120. assert resp.json()["auto_link_existing_accounts"] is True
  3121. assert resp.json()["email_claim"] == "upn"
  3122. @pytest.mark.asyncio
  3123. @pytest.mark.integration
  3124. async def test_auto_link_allowed_with_custom_claim_update(self, async_client: AsyncClient):
  3125. """Fall C: auto_link=True + email_claim='upn' in same UPDATE request → 200.
  3126. Custom claims never perform an email_verified check, so auto_link is safe.
  3127. """
  3128. admin_token = await _setup_and_login(async_client, "sec6u_adm", "Sec6UAdm123!")
  3129. create_resp = await async_client.post(
  3130. "/api/v1/auth/oidc/providers",
  3131. json={
  3132. "name": "SEC6-Update-Test",
  3133. "issuer_url": "https://sec6u.test",
  3134. "client_id": "sec6u-client",
  3135. "client_secret": "sec",
  3136. "scopes": "openid email profile",
  3137. },
  3138. headers={"Authorization": f"Bearer {admin_token}"},
  3139. )
  3140. assert create_resp.status_code == 201
  3141. provider_id = create_resp.json()["id"]
  3142. resp = await async_client.put(
  3143. f"/api/v1/auth/oidc/providers/{provider_id}",
  3144. json={"auto_link_existing_accounts": True, "email_claim": "upn"},
  3145. headers={"Authorization": f"Bearer {admin_token}"},
  3146. )
  3147. assert resp.status_code == 200
  3148. assert resp.json()["auto_link_existing_accounts"] is True
  3149. assert resp.json()["email_claim"] == "upn"
  3150. # ── Combined-State-Guard (partial updates across two requests) ─────────────
  3151. @pytest.mark.asyncio
  3152. @pytest.mark.integration
  3153. async def test_partial_update_guard_require_ev(self, async_client: AsyncClient):
  3154. """SEC-1 Combined-State-Guard: require_ev=False then auto_link=True → 422 (T1 require_ev path)."""
  3155. admin_token = await _setup_and_login(async_client, "pg_rev_adm", "PgRev123!")
  3156. create_resp = await async_client.post(
  3157. "/api/v1/auth/oidc/providers",
  3158. json={
  3159. "name": "PG-RequireEV-Test",
  3160. "issuer_url": "https://pg-rev.test",
  3161. "client_id": "pg-rev-client",
  3162. "client_secret": "sec",
  3163. "scopes": "openid email profile",
  3164. },
  3165. headers={"Authorization": f"Bearer {admin_token}"},
  3166. )
  3167. assert create_resp.status_code == 201
  3168. provider_id = create_resp.json()["id"]
  3169. upd1 = await async_client.put(
  3170. f"/api/v1/auth/oidc/providers/{provider_id}",
  3171. json={"require_email_verified": False},
  3172. headers={"Authorization": f"Bearer {admin_token}"},
  3173. )
  3174. assert upd1.status_code == 200
  3175. upd2 = await async_client.put(
  3176. f"/api/v1/auth/oidc/providers/{provider_id}",
  3177. json={"auto_link_existing_accounts": True},
  3178. headers={"Authorization": f"Bearer {admin_token}"},
  3179. )
  3180. assert upd2.status_code == 422
  3181. @pytest.mark.asyncio
  3182. @pytest.mark.integration
  3183. async def test_partial_update_custom_claim_then_auto_link_allowed(self, async_client: AsyncClient):
  3184. """Fall C: email_claim='upn' first, then auto_link=True → both 200 (custom claim is safe)."""
  3185. admin_token = await _setup_and_login(async_client, "pg_ec_adm", "PgEc123!")
  3186. create_resp = await async_client.post(
  3187. "/api/v1/auth/oidc/providers",
  3188. json={
  3189. "name": "PG-EmailClaim-Test",
  3190. "issuer_url": "https://pg-ec.test",
  3191. "client_id": "pg-ec-client",
  3192. "client_secret": "sec",
  3193. "scopes": "openid email profile",
  3194. },
  3195. headers={"Authorization": f"Bearer {admin_token}"},
  3196. )
  3197. assert create_resp.status_code == 201
  3198. provider_id = create_resp.json()["id"]
  3199. upd1 = await async_client.put(
  3200. f"/api/v1/auth/oidc/providers/{provider_id}",
  3201. json={"email_claim": "upn"},
  3202. headers={"Authorization": f"Bearer {admin_token}"},
  3203. )
  3204. assert upd1.status_code == 200
  3205. upd2 = await async_client.put(
  3206. f"/api/v1/auth/oidc/providers/{provider_id}",
  3207. json={"auto_link_existing_accounts": True},
  3208. headers={"Authorization": f"Bearer {admin_token}"},
  3209. )
  3210. assert upd2.status_code == 200
  3211. assert upd2.json()["auto_link_existing_accounts"] is True
  3212. assert upd2.json()["email_claim"] == "upn"
  3213. @pytest.mark.asyncio
  3214. @pytest.mark.integration
  3215. async def test_partial_update_auto_link_then_custom_claim_allowed(self, async_client: AsyncClient):
  3216. """Fall C: auto_link=True first (email_claim='email', safe), then email_claim='upn' → both 200."""
  3217. admin_token = await _setup_and_login(async_client, "pg_al_ec_adm", "PgAlEc123!")
  3218. create_resp = await async_client.post(
  3219. "/api/v1/auth/oidc/providers",
  3220. json={
  3221. "name": "PG-AutoLink-Claim-Test",
  3222. "issuer_url": "https://pg-al-ec.test",
  3223. "client_id": "pg-al-ec-client",
  3224. "client_secret": "sec",
  3225. "scopes": "openid email profile",
  3226. },
  3227. headers={"Authorization": f"Bearer {admin_token}"},
  3228. )
  3229. assert create_resp.status_code == 201
  3230. provider_id = create_resp.json()["id"]
  3231. upd1 = await async_client.put(
  3232. f"/api/v1/auth/oidc/providers/{provider_id}",
  3233. json={"auto_link_existing_accounts": True},
  3234. headers={"Authorization": f"Bearer {admin_token}"},
  3235. )
  3236. assert upd1.status_code == 200
  3237. upd2 = await async_client.put(
  3238. f"/api/v1/auth/oidc/providers/{provider_id}",
  3239. json={"email_claim": "preferred_username"},
  3240. headers={"Authorization": f"Bearer {admin_token}"},
  3241. )
  3242. assert upd2.status_code == 200
  3243. assert upd2.json()["auto_link_existing_accounts"] is True
  3244. assert upd2.json()["email_claim"] == "preferred_username"
  3245. @pytest.mark.asyncio
  3246. @pytest.mark.integration
  3247. async def test_partial_update_guard_inverse_order(self, async_client: AsyncClient):
  3248. """T2: auto_link=True first (valid), then require_ev=False → Combined-State-Guard fires (422)."""
  3249. admin_token = await _setup_and_login(async_client, "pg_inv_adm", "PgInv123!")
  3250. create_resp = await async_client.post(
  3251. "/api/v1/auth/oidc/providers",
  3252. json={
  3253. "name": "PG-Inverse-Test",
  3254. "issuer_url": "https://pg-inv.test",
  3255. "client_id": "pg-inv-client",
  3256. "client_secret": "sec",
  3257. "scopes": "openid email profile",
  3258. },
  3259. headers={"Authorization": f"Bearer {admin_token}"},
  3260. )
  3261. assert create_resp.status_code == 201
  3262. provider_id = create_resp.json()["id"]
  3263. # auto_link=True is safe when require_ev=True (default)
  3264. upd1 = await async_client.put(
  3265. f"/api/v1/auth/oidc/providers/{provider_id}",
  3266. json={"auto_link_existing_accounts": True},
  3267. headers={"Authorization": f"Bearer {admin_token}"},
  3268. )
  3269. assert upd1.status_code == 200
  3270. # Disabling require_ev with auto_link already on → unsafe combined state
  3271. upd2 = await async_client.put(
  3272. f"/api/v1/auth/oidc/providers/{provider_id}",
  3273. json={"require_email_verified": False},
  3274. headers={"Authorization": f"Bearer {admin_token}"},
  3275. )
  3276. assert upd2.status_code == 422
  3277. # ── Low5: Response fields verified ───────────────────────────────────────
  3278. @pytest.mark.asyncio
  3279. @pytest.mark.integration
  3280. async def test_create_response_includes_new_fields(self, async_client: AsyncClient):
  3281. """Low5: OIDCProviderResponse must include email_claim and require_email_verified."""
  3282. admin_token = await _setup_and_login(async_client, "resp_adm", "Resp123!")
  3283. resp = await async_client.post(
  3284. "/api/v1/auth/oidc/providers",
  3285. json={
  3286. "name": "ResponseFields-Test",
  3287. "issuer_url": "https://resp.test",
  3288. "client_id": "resp-client",
  3289. "client_secret": "sec",
  3290. "scopes": "openid email profile",
  3291. "email_claim": "preferred_username",
  3292. "require_email_verified": False,
  3293. },
  3294. headers={"Authorization": f"Bearer {admin_token}"},
  3295. )
  3296. assert resp.status_code == 201
  3297. data = resp.json()
  3298. assert data["email_claim"] == "preferred_username"
  3299. assert data["require_email_verified"] is False
  3300. @pytest.mark.asyncio
  3301. @pytest.mark.integration
  3302. async def test_update_response_reflects_new_fields(self, async_client: AsyncClient):
  3303. """Low5: PUT response must reflect updated email_claim and require_email_verified."""
  3304. admin_token = await _setup_and_login(async_client, "upd_resp_adm", "UpdResp123!")
  3305. create_resp = await async_client.post(
  3306. "/api/v1/auth/oidc/providers",
  3307. json={
  3308. "name": "UpdateResponse-Test",
  3309. "issuer_url": "https://upd-resp.test",
  3310. "client_id": "upd-resp-client",
  3311. "client_secret": "sec",
  3312. "scopes": "openid email profile",
  3313. },
  3314. headers={"Authorization": f"Bearer {admin_token}"},
  3315. )
  3316. assert create_resp.status_code == 201
  3317. provider_id = create_resp.json()["id"]
  3318. upd = await async_client.put(
  3319. f"/api/v1/auth/oidc/providers/{provider_id}",
  3320. json={"email_claim": "upn", "require_email_verified": True},
  3321. headers={"Authorization": f"Bearer {admin_token}"},
  3322. )
  3323. assert upd.status_code == 200
  3324. data = upd.json()
  3325. assert data["email_claim"] == "upn"
  3326. assert data["require_email_verified"] is True
  3327. # ===========================================================================
  3328. # TestOIDCEmailClaimValidation — T2: email_claim field validator coverage
  3329. # ===========================================================================
  3330. class TestOIDCEmailClaimValidation:
  3331. """Schema-level validation for the email_claim field."""
  3332. @pytest.mark.asyncio
  3333. @pytest.mark.integration
  3334. async def test_invalid_claim_name_dot_rejected(self, async_client: AsyncClient):
  3335. """email_claim with a dot (log-injection risk) must be rejected."""
  3336. admin_token = await _setup_and_login(async_client, "ecv_adm1", "Ecv123!")
  3337. resp = await async_client.post(
  3338. "/api/v1/auth/oidc/providers",
  3339. json={
  3340. "name": "ECVTest1",
  3341. "issuer_url": "https://ecv1.test",
  3342. "client_id": "ecv1",
  3343. "client_secret": "sec",
  3344. "scopes": "openid email",
  3345. "email_claim": "email.address",
  3346. },
  3347. headers={"Authorization": f"Bearer {admin_token}"},
  3348. )
  3349. assert resp.status_code == 422
  3350. @pytest.mark.asyncio
  3351. @pytest.mark.integration
  3352. async def test_invalid_claim_name_starts_with_digit_rejected(self, async_client: AsyncClient):
  3353. admin_token = await _setup_and_login(async_client, "ecv_adm2", "Ecv123!")
  3354. resp = await async_client.post(
  3355. "/api/v1/auth/oidc/providers",
  3356. json={
  3357. "name": "ECVTest2",
  3358. "issuer_url": "https://ecv2.test",
  3359. "client_id": "ecv2",
  3360. "client_secret": "sec",
  3361. "scopes": "openid email",
  3362. "email_claim": "1invalid",
  3363. },
  3364. headers={"Authorization": f"Bearer {admin_token}"},
  3365. )
  3366. assert resp.status_code == 422
  3367. @pytest.mark.asyncio
  3368. @pytest.mark.integration
  3369. async def test_invalid_claim_name_newline_rejected(self, async_client: AsyncClient):
  3370. """T2 regex-bug guard: re.fullmatch must reject trailing newline."""
  3371. admin_token = await _setup_and_login(async_client, "ecv_adm3", "Ecv123!")
  3372. resp = await async_client.post(
  3373. "/api/v1/auth/oidc/providers",
  3374. json={
  3375. "name": "ECVTest3",
  3376. "issuer_url": "https://ecv3.test",
  3377. "client_id": "ecv3",
  3378. "client_secret": "sec",
  3379. "scopes": "openid email",
  3380. "email_claim": "email\n",
  3381. },
  3382. headers={"Authorization": f"Bearer {admin_token}"},
  3383. )
  3384. assert resp.status_code == 422
  3385. @pytest.mark.asyncio
  3386. @pytest.mark.integration
  3387. async def test_claim_name_65_chars_rejected(self, async_client: AsyncClient):
  3388. """email_claim longer than 64 characters must be rejected."""
  3389. admin_token = await _setup_and_login(async_client, "ecv_adm4", "Ecv123!")
  3390. resp = await async_client.post(
  3391. "/api/v1/auth/oidc/providers",
  3392. json={
  3393. "name": "ECVTest4",
  3394. "issuer_url": "https://ecv4.test",
  3395. "client_id": "ecv4",
  3396. "client_secret": "sec",
  3397. "scopes": "openid email",
  3398. "email_claim": "a" * 65,
  3399. },
  3400. headers={"Authorization": f"Bearer {admin_token}"},
  3401. )
  3402. assert resp.status_code == 422
  3403. @pytest.mark.asyncio
  3404. @pytest.mark.integration
  3405. async def test_valid_claim_name_accepted(self, async_client: AsyncClient):
  3406. """Valid claim names like preferred_username and upn must be accepted."""
  3407. admin_token = await _setup_and_login(async_client, "ecv_adm5", "Ecv123!")
  3408. for claim in ("preferred_username", "upn", "email", "emailAddress"):
  3409. resp = await async_client.post(
  3410. "/api/v1/auth/oidc/providers",
  3411. json={
  3412. "name": f"ECVTest-{claim[:8]}",
  3413. "issuer_url": f"https://ecv-{claim[:8]}.test",
  3414. "client_id": f"ecv-{claim[:8]}",
  3415. "client_secret": "sec",
  3416. "scopes": "openid email",
  3417. "email_claim": claim,
  3418. },
  3419. headers={"Authorization": f"Bearer {admin_token}"},
  3420. )
  3421. assert resp.status_code == 201, f"claim {claim!r} was rejected: {resp.text}"
  3422. # ===========================================================================
  3423. # TestOIDCEmailResolutionExtra — T1 / T3 / T4 additional coverage
  3424. # ===========================================================================
  3425. async def _create_provider_via_api(
  3426. async_client: AsyncClient,
  3427. admin_token: str,
  3428. issuer: str,
  3429. client_id: str,
  3430. *,
  3431. email_claim: str = "email",
  3432. require_email_verified: bool = True,
  3433. suffix: str = "",
  3434. ) -> int:
  3435. resp = await async_client.post(
  3436. "/api/v1/auth/oidc/providers",
  3437. json={
  3438. "name": f"TestIdP-extra-{suffix or secrets.token_hex(4)}",
  3439. "issuer_url": issuer,
  3440. "client_id": client_id,
  3441. "client_secret": "sec",
  3442. "scopes": "openid email profile",
  3443. "is_enabled": True,
  3444. "auto_create_users": True,
  3445. "email_claim": email_claim,
  3446. "require_email_verified": require_email_verified,
  3447. },
  3448. headers={"Authorization": f"Bearer {admin_token}"},
  3449. )
  3450. assert resp.status_code == 201, resp.text
  3451. return resp.json()["id"]
  3452. class TestOIDCEmailResolutionExtra:
  3453. """T1: isinstance guard, T3: SEC-3 normalisation for Fall A/B, T4: inverse Combined-State-Guard."""
  3454. @pytest.mark.asyncio
  3455. @pytest.mark.integration
  3456. async def test_non_string_claim_value_drops_email(
  3457. self,
  3458. async_client: AsyncClient,
  3459. db_session: AsyncSession,
  3460. ):
  3461. """T1: A non-string email_claim value (list) must be silently dropped — no crash."""
  3462. private_pem, jwks_data = _make_test_rsa_key()
  3463. issuer = "https://nonstring-test.example"
  3464. client_id = "nonstring-client"
  3465. admin_token = await _setup_and_login(async_client, "nonstr_adm", "Nonstr123!")
  3466. provider_id = await _create_provider_via_api(
  3467. async_client,
  3468. admin_token,
  3469. issuer,
  3470. client_id,
  3471. email_claim="preferred_username",
  3472. require_email_verified=False,
  3473. suffix="nonstr",
  3474. )
  3475. from sqlalchemy import select
  3476. from backend.app.models.oidc_provider import UserOIDCLink
  3477. # IdP sends preferred_username as a list (non-string) — must not crash
  3478. location = await _run_oidc_callback(
  3479. async_client,
  3480. db_session,
  3481. provider_id=provider_id,
  3482. claims={"sub": "nonstr-sub-1", "preferred_username": ["user@example.com"]},
  3483. private_pem=private_pem,
  3484. jwks_data=jwks_data,
  3485. issuer=issuer,
  3486. client_id=client_id,
  3487. )
  3488. assert "internal_error" not in location, f"Unexpected error redirect: {location}"
  3489. link_result = await db_session.execute(
  3490. select(UserOIDCLink)
  3491. .where(UserOIDCLink.provider_id == provider_id)
  3492. .where(UserOIDCLink.provider_user_id == "nonstr-sub-1")
  3493. )
  3494. link = link_result.scalar_one_or_none()
  3495. assert link is not None
  3496. assert link.provider_email is None # list value dropped
  3497. @pytest.mark.asyncio
  3498. @pytest.mark.integration
  3499. async def test_fall_a_sec3_normalisation(
  3500. self,
  3501. async_client: AsyncClient,
  3502. db_session: AsyncSession,
  3503. ):
  3504. """T3: Fall A — uppercase + whitespace in email claim must be normalised to lowercase."""
  3505. private_pem, jwks_data = _make_test_rsa_key()
  3506. issuer = "https://sec3a-test.example"
  3507. client_id = "sec3a-client"
  3508. admin_token = await _setup_and_login(async_client, "sec3a_adm", "Sec3a123!")
  3509. provider_id = await _create_provider_via_api(
  3510. async_client,
  3511. admin_token,
  3512. issuer,
  3513. client_id,
  3514. email_claim="email",
  3515. require_email_verified=True,
  3516. suffix="sec3a",
  3517. )
  3518. from sqlalchemy import select
  3519. from backend.app.models.oidc_provider import UserOIDCLink
  3520. location = await _run_oidc_callback(
  3521. async_client,
  3522. db_session,
  3523. provider_id=provider_id,
  3524. claims={"sub": "sec3a-sub-1", "email": " USER@EXAMPLE.COM ", "email_verified": True},
  3525. private_pem=private_pem,
  3526. jwks_data=jwks_data,
  3527. issuer=issuer,
  3528. client_id=client_id,
  3529. )
  3530. assert "internal_error" not in location
  3531. link_result = await db_session.execute(
  3532. select(UserOIDCLink)
  3533. .where(UserOIDCLink.provider_id == provider_id)
  3534. .where(UserOIDCLink.provider_user_id == "sec3a-sub-1")
  3535. )
  3536. link = link_result.scalar_one_or_none()
  3537. assert link is not None
  3538. assert link.provider_email == "user@example.com"
  3539. @pytest.mark.asyncio
  3540. @pytest.mark.integration
  3541. async def test_fall_b_sec3_normalisation(
  3542. self,
  3543. async_client: AsyncClient,
  3544. db_session: AsyncSession,
  3545. ):
  3546. """T3: Fall B — uppercase + whitespace in email claim must be normalised to lowercase."""
  3547. private_pem, jwks_data = _make_test_rsa_key()
  3548. issuer = "https://sec3b-test.example"
  3549. client_id = "sec3b-client"
  3550. admin_token = await _setup_and_login(async_client, "sec3b_adm", "Sec3b123!")
  3551. provider_id = await _create_provider_via_api(
  3552. async_client,
  3553. admin_token,
  3554. issuer,
  3555. client_id,
  3556. email_claim="email",
  3557. require_email_verified=False,
  3558. suffix="sec3b",
  3559. )
  3560. from sqlalchemy import select
  3561. from backend.app.models.oidc_provider import UserOIDCLink
  3562. location = await _run_oidc_callback(
  3563. async_client,
  3564. db_session,
  3565. provider_id=provider_id,
  3566. claims={"sub": "sec3b-sub-1", "email": " USER@EXAMPLE.COM "},
  3567. private_pem=private_pem,
  3568. jwks_data=jwks_data,
  3569. issuer=issuer,
  3570. client_id=client_id,
  3571. )
  3572. assert "internal_error" not in location
  3573. link_result = await db_session.execute(
  3574. select(UserOIDCLink)
  3575. .where(UserOIDCLink.provider_id == provider_id)
  3576. .where(UserOIDCLink.provider_user_id == "sec3b-sub-1")
  3577. )
  3578. link = link_result.scalar_one_or_none()
  3579. assert link is not None
  3580. assert link.provider_email == "user@example.com"
  3581. @pytest.mark.asyncio
  3582. @pytest.mark.integration
  3583. async def test_combined_state_guard_email_claim_inverse_order(self, async_client: AsyncClient):
  3584. """Fall C: auto_link=True first, then switch email_claim to custom → both 200 (now allowed).
  3585. Custom claims never perform an email_verified check, so switching to a custom claim
  3586. while auto_link is on transitions from Fall A to Fall C — both are safe.
  3587. """
  3588. admin_token = await _setup_and_login(async_client, "inv_ec_adm", "InvEc123!")
  3589. create_resp = await async_client.post(
  3590. "/api/v1/auth/oidc/providers",
  3591. json={
  3592. "name": "InvEcTest",
  3593. "issuer_url": "https://inv-ec.test",
  3594. "client_id": "inv-ec",
  3595. "client_secret": "sec",
  3596. "scopes": "openid email",
  3597. },
  3598. headers={"Authorization": f"Bearer {admin_token}"},
  3599. )
  3600. assert create_resp.status_code == 201
  3601. provider_id = create_resp.json()["id"]
  3602. # First: enable auto_link (Fall A — email_claim='email', require_ev=True)
  3603. upd1 = await async_client.put(
  3604. f"/api/v1/auth/oidc/providers/{provider_id}",
  3605. json={"auto_link_existing_accounts": True},
  3606. headers={"Authorization": f"Bearer {admin_token}"},
  3607. )
  3608. assert upd1.status_code == 200
  3609. # Second: switch to custom claim → Fall C, still safe
  3610. upd2 = await async_client.put(
  3611. f"/api/v1/auth/oidc/providers/{provider_id}",
  3612. json={"email_claim": "preferred_username"},
  3613. headers={"Authorization": f"Bearer {admin_token}"},
  3614. )
  3615. assert upd2.status_code == 200
  3616. assert upd2.json()["auto_link_existing_accounts"] is True
  3617. assert upd2.json()["email_claim"] == "preferred_username"
  3618. # ===========================================================================
  3619. # E2E: Fall C (custom email claim) auto-link actually links existing user
  3620. # ===========================================================================
  3621. class TestOIDCFallCAutoLinkE2E:
  3622. """OIDC callback with email_claim='preferred_username' (Fall C / Azure Entra ID)
  3623. must auto-link an existing local user when auto_link_existing_accounts=True.
  3624. This test exercises _resolve_provider_email Fall C and the auto-link path in
  3625. oidc_callback — a regression in either would silently drop the link without
  3626. being caught by the configuration-layer tests.
  3627. """
  3628. @pytest.mark.asyncio
  3629. @pytest.mark.integration
  3630. async def test_fall_c_auto_link_links_existing_user_via_callback(
  3631. self, async_client: AsyncClient, db_session: AsyncSession
  3632. ):
  3633. from unittest.mock import AsyncMock, MagicMock, patch
  3634. from sqlalchemy import select as sa_select
  3635. from backend.app.core.auth import get_password_hash
  3636. from backend.app.models.oidc_provider import OIDCProvider, UserOIDCLink
  3637. issuer = "https://entra.fallc.example.com"
  3638. nonce = secrets.token_urlsafe(32)
  3639. code_verifier = secrets.token_urlsafe(48)
  3640. # ── 1. Local user that should be linked ──────────────────────────────
  3641. alice = User(
  3642. username="fallc_alice",
  3643. email="alice.fallc@example.com",
  3644. password_hash=get_password_hash(secrets.token_urlsafe(16)),
  3645. role="user",
  3646. is_active=True,
  3647. )
  3648. db_session.add(alice)
  3649. await db_session.flush()
  3650. # ── 2. Provider: Fall C config (preferred_username, no email_verified) ─
  3651. provider = OIDCProvider(
  3652. name="AzureEntraFallC",
  3653. issuer_url=issuer,
  3654. client_id="azure-client",
  3655. _client_secret_enc="azure-secret",
  3656. scopes="openid profile",
  3657. is_enabled=True,
  3658. auto_link_existing_accounts=True,
  3659. auto_create_users=False,
  3660. email_claim="preferred_username",
  3661. require_email_verified=False,
  3662. )
  3663. db_session.add(provider)
  3664. await db_session.flush()
  3665. # ── 3. OIDC state token ───────────────────────────────────────────────
  3666. state = secrets.token_urlsafe(32)
  3667. db_session.add(
  3668. AuthEphemeralToken(
  3669. token=state,
  3670. token_type="oidc_state",
  3671. provider_id=provider.id,
  3672. nonce=nonce,
  3673. code_verifier=code_verifier,
  3674. expires_at=datetime.now(timezone.utc) + timedelta(minutes=10),
  3675. )
  3676. )
  3677. await db_session.commit()
  3678. # ── 4. Mock HTTP + JWT ────────────────────────────────────────────────
  3679. fake_discovery = {
  3680. "issuer": issuer,
  3681. "token_endpoint": f"{issuer}/token",
  3682. "jwks_uri": f"{issuer}/.well-known/jwks.json",
  3683. }
  3684. fake_token = {"access_token": "acc_tok", "id_token": "fake.id.token"}
  3685. # Fall C: preferred_username carries the email; no email_verified key at all
  3686. fake_claims = {
  3687. "sub": "azure-sub-alice",
  3688. "preferred_username": "alice.fallc@example.com",
  3689. "iss": issuer,
  3690. "aud": "azure-client",
  3691. "nonce": nonce,
  3692. "exp": 9_999_999_999,
  3693. }
  3694. disc_resp = AsyncMock()
  3695. disc_resp.raise_for_status = MagicMock()
  3696. disc_resp.json = MagicMock(return_value=fake_discovery)
  3697. token_resp = AsyncMock()
  3698. token_resp.json = MagicMock(return_value=fake_token)
  3699. jwks_resp = AsyncMock()
  3700. jwks_resp.raise_for_status = MagicMock()
  3701. jwks_resp.json = MagicMock(return_value={})
  3702. mock_http = AsyncMock()
  3703. mock_http.get = AsyncMock(side_effect=[disc_resp, jwks_resp])
  3704. mock_http.post = AsyncMock(return_value=token_resp)
  3705. mock_signing_key = MagicMock()
  3706. mock_signing_key.key = "fake_key"
  3707. with (
  3708. patch("backend.app.api.routes.mfa.httpx.AsyncClient") as mock_httpx_cls,
  3709. patch("backend.app.api.routes.mfa.jwt.decode", return_value=fake_claims),
  3710. patch("backend.app.api.routes.mfa.PyJWKClient") as mock_jwks_cls,
  3711. ):
  3712. mock_httpx_cls.return_value.__aenter__ = AsyncMock(return_value=mock_http)
  3713. mock_httpx_cls.return_value.__aexit__ = AsyncMock(return_value=False)
  3714. mock_jwks_cls.return_value.get_signing_key_from_jwt.return_value = mock_signing_key
  3715. callback_resp = await async_client.get(
  3716. f"/api/v1/auth/oidc/callback?code=fake_code&state={state}",
  3717. follow_redirects=False,
  3718. )
  3719. assert callback_resp.status_code == 302, callback_resp.text
  3720. location = callback_resp.headers.get("location", "")
  3721. assert "oidc_token=" in location, f"Expected oidc_token in redirect, got: {location}"
  3722. # ── 5. Exchange token → full JWT ──────────────────────────────────────
  3723. oidc_exchange_token = location.split("oidc_token=")[1].split("&")[0].split("#")[-1]
  3724. exchange_resp = await async_client.post(
  3725. "/api/v1/auth/oidc/exchange",
  3726. json={"oidc_token": oidc_exchange_token},
  3727. )
  3728. assert exchange_resp.status_code == 200
  3729. assert exchange_resp.json()["user"]["username"] == "fallc_alice"
  3730. # ── 6. Verify UserOIDCLink was created in DB ──────────────────────────
  3731. async with db_session as s:
  3732. result = await s.execute(
  3733. sa_select(UserOIDCLink).where(
  3734. UserOIDCLink.user_id == alice.id,
  3735. UserOIDCLink.provider_id == provider.id,
  3736. )
  3737. )
  3738. link = result.scalar_one_or_none()
  3739. assert link is not None, "UserOIDCLink must have been created by auto-link"
  3740. assert link.provider_user_id == "azure-sub-alice"
  3741. class TestOIDCAutoCreateUsername:
  3742. """Username derivation priority for auto-created OIDC users (#1173).
  3743. Priority order: email local-part > preferred_username > name > provider_sub.
  3744. Covers: plain claim, spaces-sanitized, name fallback, sub fallback,
  3745. non-string isinstance guard, sanitizes-to-empty fallback, collision counter.
  3746. """
  3747. # ── shared helpers ───────────────────────────────────────────────────────
  3748. @staticmethod
  3749. async def _create_provider(async_client: AsyncClient, admin_token: str, issuer: str, client_id: str) -> int:
  3750. resp = await async_client.post(
  3751. "/api/v1/auth/oidc/providers",
  3752. json={
  3753. "name": f"AutoUser-{secrets.token_hex(4)}",
  3754. "issuer_url": issuer,
  3755. "client_id": client_id,
  3756. "client_secret": "secret",
  3757. "scopes": "openid profile",
  3758. "is_enabled": True,
  3759. "auto_create_users": True,
  3760. "email_claim": "email",
  3761. "require_email_verified": True,
  3762. },
  3763. headers={"Authorization": f"Bearer {admin_token}"},
  3764. )
  3765. assert resp.status_code == 201, resp.text
  3766. return resp.json()["id"]
  3767. @staticmethod
  3768. async def _exchange_username(async_client: AsyncClient, location: str) -> str:
  3769. assert "oidc_token=" in location, f"No oidc_token in redirect: {location}"
  3770. token = location.split("oidc_token=")[1].split("&")[0].split("#")[-1]
  3771. resp = await async_client.post("/api/v1/auth/oidc/exchange", json={"oidc_token": token})
  3772. assert resp.status_code == 200, resp.text
  3773. return resp.json()["user"]["username"]
  3774. # ── tests ────────────────────────────────────────────────────────────────
  3775. @pytest.mark.asyncio
  3776. @pytest.mark.integration
  3777. async def test_preferred_username_used_when_no_email(self, async_client: AsyncClient, db_session: AsyncSession):
  3778. """preferred_username='johndoe' → username 'johndoe' (no email claim present)."""
  3779. private_pem, jwks_data = _make_test_rsa_key()
  3780. issuer = "https://au-pref.example"
  3781. client_id = "au-pref-client"
  3782. admin_token = await _setup_and_login(async_client, "au_pref_adm", "AuPrefAdm1!")
  3783. provider_id = await self._create_provider(async_client, admin_token, issuer, client_id)
  3784. location = await _run_oidc_callback(
  3785. async_client,
  3786. db_session,
  3787. provider_id=provider_id,
  3788. claims={"sub": "pref-sub-1", "preferred_username": "johndoe"},
  3789. private_pem=private_pem,
  3790. jwks_data=jwks_data,
  3791. issuer=issuer,
  3792. client_id=client_id,
  3793. )
  3794. username = await self._exchange_username(async_client, location)
  3795. assert username == "johndoe"
  3796. @pytest.mark.asyncio
  3797. @pytest.mark.integration
  3798. async def test_preferred_username_spaces_sanitized(self, async_client: AsyncClient, db_session: AsyncSession):
  3799. """preferred_username='John Doe' → sanitized to 'JohnDoe'."""
  3800. private_pem, jwks_data = _make_test_rsa_key()
  3801. issuer = "https://au-spaces.example"
  3802. client_id = "au-spaces-client"
  3803. admin_token = await _setup_and_login(async_client, "au_spaces_adm", "AuSpacesAdm1!")
  3804. provider_id = await self._create_provider(async_client, admin_token, issuer, client_id)
  3805. location = await _run_oidc_callback(
  3806. async_client,
  3807. db_session,
  3808. provider_id=provider_id,
  3809. claims={"sub": "spaces-sub-1", "preferred_username": "John Doe"},
  3810. private_pem=private_pem,
  3811. jwks_data=jwks_data,
  3812. issuer=issuer,
  3813. client_id=client_id,
  3814. )
  3815. username = await self._exchange_username(async_client, location)
  3816. assert username == "JohnDoe"
  3817. @pytest.mark.asyncio
  3818. @pytest.mark.integration
  3819. async def test_name_claim_used_when_no_preferred_username(
  3820. self, async_client: AsyncClient, db_session: AsyncSession
  3821. ):
  3822. """name='Jane Smith', no preferred_username → username 'JaneSmith'."""
  3823. private_pem, jwks_data = _make_test_rsa_key()
  3824. issuer = "https://au-name.example"
  3825. client_id = "au-name-client"
  3826. admin_token = await _setup_and_login(async_client, "au_name_adm", "AuNameAdm1!")
  3827. provider_id = await self._create_provider(async_client, admin_token, issuer, client_id)
  3828. location = await _run_oidc_callback(
  3829. async_client,
  3830. db_session,
  3831. provider_id=provider_id,
  3832. claims={"sub": "name-sub-1", "name": "Jane Smith"},
  3833. private_pem=private_pem,
  3834. jwks_data=jwks_data,
  3835. issuer=issuer,
  3836. client_id=client_id,
  3837. )
  3838. username = await self._exchange_username(async_client, location)
  3839. assert username == "JaneSmith"
  3840. @pytest.mark.asyncio
  3841. @pytest.mark.integration
  3842. async def test_provider_sub_fallback_when_no_claims(self, async_client: AsyncClient, db_session: AsyncSession):
  3843. """No preferred_username, no name, no email → username derived from provider_sub."""
  3844. private_pem, jwks_data = _make_test_rsa_key()
  3845. issuer = "https://au-sub.example"
  3846. client_id = "au-sub-client"
  3847. admin_token = await _setup_and_login(async_client, "au_sub_adm", "AuSubAdm1!")
  3848. provider_id = await self._create_provider(async_client, admin_token, issuer, client_id)
  3849. location = await _run_oidc_callback(
  3850. async_client,
  3851. db_session,
  3852. provider_id=provider_id,
  3853. claims={"sub": "abc123xyz"},
  3854. private_pem=private_pem,
  3855. jwks_data=jwks_data,
  3856. issuer=issuer,
  3857. client_id=client_id,
  3858. )
  3859. username = await self._exchange_username(async_client, location)
  3860. assert username == "abc123xyz"
  3861. @pytest.mark.asyncio
  3862. @pytest.mark.integration
  3863. async def test_non_string_preferred_username_falls_through_to_name(
  3864. self, async_client: AsyncClient, db_session: AsyncSession
  3865. ):
  3866. """preferred_username is a list (non-string) → isinstance guard skips it, uses name."""
  3867. private_pem, jwks_data = _make_test_rsa_key()
  3868. issuer = "https://au-nonstr.example"
  3869. client_id = "au-nonstr-client"
  3870. admin_token = await _setup_and_login(async_client, "au_nonstr_adm", "AuNonstrAdm1!")
  3871. provider_id = await self._create_provider(async_client, admin_token, issuer, client_id)
  3872. location = await _run_oidc_callback(
  3873. async_client,
  3874. db_session,
  3875. provider_id=provider_id,
  3876. claims={"sub": "nonstr-sub-2", "preferred_username": ["listval"], "name": "BobJones"},
  3877. private_pem=private_pem,
  3878. jwks_data=jwks_data,
  3879. issuer=issuer,
  3880. client_id=client_id,
  3881. )
  3882. username = await self._exchange_username(async_client, location)
  3883. assert username == "BobJones"
  3884. @pytest.mark.asyncio
  3885. @pytest.mark.integration
  3886. async def test_preferred_username_sanitizes_to_empty_falls_through_to_name(
  3887. self, async_client: AsyncClient, db_session: AsyncSession
  3888. ):
  3889. """preferred_username='!!!' sanitizes to '' → falls through to name claim."""
  3890. private_pem, jwks_data = _make_test_rsa_key()
  3891. issuer = "https://au-empty.example"
  3892. client_id = "au-empty-client"
  3893. admin_token = await _setup_and_login(async_client, "au_empty_adm", "AuEmptyAdm1!")
  3894. provider_id = await self._create_provider(async_client, admin_token, issuer, client_id)
  3895. location = await _run_oidc_callback(
  3896. async_client,
  3897. db_session,
  3898. provider_id=provider_id,
  3899. claims={"sub": "empty-sub-1", "preferred_username": "!!!", "name": "bob"},
  3900. private_pem=private_pem,
  3901. jwks_data=jwks_data,
  3902. issuer=issuer,
  3903. client_id=client_id,
  3904. )
  3905. username = await self._exchange_username(async_client, location)
  3906. assert username == "bob"
  3907. @pytest.mark.asyncio
  3908. @pytest.mark.integration
  3909. async def test_username_collision_appends_counter(self, async_client: AsyncClient, db_session: AsyncSession):
  3910. """When preferred_username 'collider' is already taken, counter suffix is appended."""
  3911. from backend.app.core.auth import get_password_hash
  3912. # Pre-create a user occupying the candidate username
  3913. existing = User(
  3914. username="collider",
  3915. email="collider@example.com",
  3916. password_hash=get_password_hash("irrelevant"),
  3917. role="user",
  3918. is_active=True,
  3919. )
  3920. db_session.add(existing)
  3921. await db_session.commit()
  3922. private_pem, jwks_data = _make_test_rsa_key()
  3923. issuer = "https://au-collision.example"
  3924. client_id = "au-collision-client"
  3925. admin_token = await _setup_and_login(async_client, "au_col_adm", "AuColAdm1!")
  3926. provider_id = await self._create_provider(async_client, admin_token, issuer, client_id)
  3927. location = await _run_oidc_callback(
  3928. async_client,
  3929. db_session,
  3930. provider_id=provider_id,
  3931. claims={"sub": "col-sub-1", "preferred_username": "collider"},
  3932. private_pem=private_pem,
  3933. jwks_data=jwks_data,
  3934. issuer=issuer,
  3935. client_id=client_id,
  3936. )
  3937. username = await self._exchange_username(async_client, location)
  3938. assert username == "collider1"
  3939. # ===========================================================================
  3940. # OIDC auto-create: configurable default group (#1173 Thread 2)
  3941. # ===========================================================================
  3942. class TestOIDCAutoCreateDefaultGroup:
  3943. """Auto-created OIDC users receive the provider's configured default group.
  3944. Resolution order:
  3945. 1. provider.default_group_id (configured)
  3946. 2. "Viewers" system group (fallback when default_group_id is None)
  3947. 3. no group (last resort when both are unavailable)
  3948. All tests are DB-agnostic: they verify group membership via the OIDC
  3949. exchange response, which includes the user's group list.
  3950. """
  3951. @staticmethod
  3952. async def _create_provider(
  3953. async_client: AsyncClient,
  3954. admin_token: str,
  3955. *,
  3956. issuer: str,
  3957. client_id: str,
  3958. default_group_id: int | None = None,
  3959. ) -> int:
  3960. payload: dict = {
  3961. "name": f"DgAutoProvider-{secrets.token_hex(4)}",
  3962. "issuer_url": issuer,
  3963. "client_id": client_id,
  3964. "client_secret": "secret",
  3965. "scopes": "openid profile",
  3966. "is_enabled": True,
  3967. "auto_create_users": True,
  3968. "email_claim": "email",
  3969. "require_email_verified": True,
  3970. }
  3971. if default_group_id is not None:
  3972. payload["default_group_id"] = default_group_id
  3973. resp = await async_client.post(
  3974. "/api/v1/auth/oidc/providers",
  3975. json=payload,
  3976. headers={"Authorization": f"Bearer {admin_token}"},
  3977. )
  3978. assert resp.status_code == 201, resp.text
  3979. return resp.json()["id"]
  3980. @staticmethod
  3981. async def _run_autocreate_and_get_groups(
  3982. async_client: AsyncClient,
  3983. db_session: AsyncSession,
  3984. *,
  3985. provider_id: int,
  3986. sub: str,
  3987. issuer: str,
  3988. client_id: str,
  3989. private_pem: bytes,
  3990. jwks_data: dict,
  3991. ) -> list[str]:
  3992. """Complete OIDC callback + exchange and return the new user's group names."""
  3993. location = await _run_oidc_callback(
  3994. async_client,
  3995. db_session,
  3996. provider_id=provider_id,
  3997. claims={"sub": sub},
  3998. private_pem=private_pem,
  3999. jwks_data=jwks_data,
  4000. issuer=issuer,
  4001. client_id=client_id,
  4002. )
  4003. assert "oidc_token=" in location, f"No oidc_token in redirect: {location}"
  4004. token = location.split("oidc_token=")[1].split("&")[0].split("#")[-1]
  4005. resp = await async_client.post("/api/v1/auth/oidc/exchange", json={"oidc_token": token})
  4006. assert resp.status_code == 200, resp.text
  4007. return [g["name"] for g in resp.json()["user"]["groups"]]
  4008. # ── tests ────────────────────────────────────────────────────────────────
  4009. @pytest.mark.asyncio
  4010. @pytest.mark.integration
  4011. async def test_configured_group_assigned_to_auto_created_user(
  4012. self, async_client: AsyncClient, db_session: AsyncSession
  4013. ):
  4014. """Auto-created user is placed in the provider's configured default_group_id."""
  4015. from sqlalchemy import select
  4016. from backend.app.models.group import Group
  4017. private_pem, jwks_data = _make_test_rsa_key()
  4018. issuer = "https://dg-configured.example"
  4019. client_id = "dg-configured-client"
  4020. admin_token = await _setup_and_login(async_client, "dg_cfg_adm", "DgCfgAdm1!")
  4021. grp_result = await db_session.execute(select(Group).where(Group.name == "Operators"))
  4022. operators = grp_result.scalar_one()
  4023. provider_id = await self._create_provider(
  4024. async_client,
  4025. admin_token,
  4026. issuer=issuer,
  4027. client_id=client_id,
  4028. default_group_id=operators.id,
  4029. )
  4030. group_names = await self._run_autocreate_and_get_groups(
  4031. async_client,
  4032. db_session,
  4033. provider_id=provider_id,
  4034. sub="dg-cfg-sub-1",
  4035. issuer=issuer,
  4036. client_id=client_id,
  4037. private_pem=private_pem,
  4038. jwks_data=jwks_data,
  4039. )
  4040. assert "Operators" in group_names, f"Expected Operators, got {group_names}"
  4041. assert "Viewers" not in group_names
  4042. @pytest.mark.asyncio
  4043. @pytest.mark.integration
  4044. async def test_null_default_group_id_falls_back_to_viewers(
  4045. self, async_client: AsyncClient, db_session: AsyncSession
  4046. ):
  4047. """When default_group_id is None, auto-created user falls back to Viewers."""
  4048. private_pem, jwks_data = _make_test_rsa_key()
  4049. issuer = "https://dg-null.example"
  4050. client_id = "dg-null-client"
  4051. admin_token = await _setup_and_login(async_client, "dg_null_adm", "DgNullAdm1!")
  4052. provider_id = await self._create_provider(
  4053. async_client,
  4054. admin_token,
  4055. issuer=issuer,
  4056. client_id=client_id,
  4057. )
  4058. group_names = await self._run_autocreate_and_get_groups(
  4059. async_client,
  4060. db_session,
  4061. provider_id=provider_id,
  4062. sub="dg-null-sub-1",
  4063. issuer=issuer,
  4064. client_id=client_id,
  4065. private_pem=private_pem,
  4066. jwks_data=jwks_data,
  4067. )
  4068. assert "Viewers" in group_names, f"Expected Viewers, got {group_names}"
  4069. @pytest.mark.asyncio
  4070. @pytest.mark.integration
  4071. async def test_dangling_default_group_id_falls_back_to_viewers(
  4072. self, async_client: AsyncClient, db_session: AsyncSession
  4073. ):
  4074. """When configured group is deleted, auto-created user falls back to Viewers.
  4075. SQLite does not enforce FK ON DELETE SET NULL (no PRAGMA foreign_keys=ON),
  4076. so provider.default_group_id may point to a deleted group. The runtime
  4077. resolution chain must handle this and fall back to Viewers.
  4078. """
  4079. from sqlalchemy import delete as sa_delete, select
  4080. from backend.app.models.group import Group
  4081. private_pem, jwks_data = _make_test_rsa_key()
  4082. issuer = "https://dg-dangling.example"
  4083. client_id = "dg-dangling-client"
  4084. admin_token = await _setup_and_login(async_client, "dg_dangle_adm", "DgDangleAdm1!")
  4085. # Create a temporary group and use it as default_group_id
  4086. temp_group = Group(name="TempGroup-DgDangle", permissions=[])
  4087. db_session.add(temp_group)
  4088. await db_session.commit()
  4089. await db_session.refresh(temp_group)
  4090. temp_group_id = temp_group.id
  4091. provider_id = await self._create_provider(
  4092. async_client,
  4093. admin_token,
  4094. issuer=issuer,
  4095. client_id=client_id,
  4096. default_group_id=temp_group_id,
  4097. )
  4098. # Delete the group — simulates dangling FK (especially on SQLite)
  4099. await db_session.execute(sa_delete(Group).where(Group.id == temp_group_id))
  4100. await db_session.commit()
  4101. group_names = await self._run_autocreate_and_get_groups(
  4102. async_client,
  4103. db_session,
  4104. provider_id=provider_id,
  4105. sub="dg-dangle-sub-1",
  4106. issuer=issuer,
  4107. client_id=client_id,
  4108. private_pem=private_pem,
  4109. jwks_data=jwks_data,
  4110. )
  4111. assert "Viewers" in group_names, f"Expected Viewers fallback, got {group_names}"
  4112. @pytest.mark.asyncio
  4113. @pytest.mark.integration
  4114. async def test_administrators_group_can_be_set_as_default(
  4115. self, async_client: AsyncClient, db_session: AsyncSession
  4116. ):
  4117. """Operators can configure Administrators as the default group (e.g. single-tenant IdP)."""
  4118. from sqlalchemy import select
  4119. from backend.app.models.group import Group
  4120. private_pem, jwks_data = _make_test_rsa_key()
  4121. issuer = "https://dg-admin.example"
  4122. client_id = "dg-admin-client"
  4123. admin_token = await _setup_and_login(async_client, "dg_admgrp_adm", "DgAdmgrpAdm1!")
  4124. grp_result = await db_session.execute(select(Group).where(Group.name == "Administrators"))
  4125. administrators = grp_result.scalar_one()
  4126. provider_id = await self._create_provider(
  4127. async_client,
  4128. admin_token,
  4129. issuer=issuer,
  4130. client_id=client_id,
  4131. default_group_id=administrators.id,
  4132. )
  4133. group_names = await self._run_autocreate_and_get_groups(
  4134. async_client,
  4135. db_session,
  4136. provider_id=provider_id,
  4137. sub="dg-admin-sub-1",
  4138. issuer=issuer,
  4139. client_id=client_id,
  4140. private_pem=private_pem,
  4141. jwks_data=jwks_data,
  4142. )
  4143. assert "Administrators" in group_names, f"Expected Administrators, got {group_names}"