| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761
77717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762
27722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402240324042405240624072408240924102411241224132414241524162417241824192420242124222423242424252426242724282429243024312432243324342435243624372438243924402441244224432444244524462447244824492450245124522453245424552456245724582459246024612462246324642465246624672468246924702471247224732474247524762477247824792480248124822483248424852486248724882489249024912492249324942495249624972498249925002501250225032504250525062507250825092510251125122513251425152516251725182519252025212522252325242525252625272528252925302531253225332534253525362537253825392540254125422543254425452546254725482549255025512552255325542555255625572558255925602561256225632564256525662567256825692570257125722573257425752576257725782579258025812582258325842585258625872588258925902591259225932594259525962597259825992600260126022603260426052606260726082609261026112612261326142615261626172618261926202621262226232624262526262627262826292630263126322633263426352636263726382639264026412642264326442645264626472648264926502651265226532654265526562657265826592660266126622663266426652666266726682669267026712672267326742675267626772678267926802681268226832684268526862687268826892690269126922693269426952696269726982699270027012702270327042705270627072708270927102711271227132714271527162717271827192720272127222723272427252726272727282729273027312732273327342735273627372738273927402741274227432744274527462747274827492750275127522753275427552756275727582759276027612762276327642765276627672768276927702771277227732774277527762
77727782779278027812782278327842785278627872788278927902791279227932794279527962797279827992800280128022803280428052806280728082809281028112812281328142815281628172818281928202821282228232824282528262827282828292830283128322833283428352836283728382839284028412842284328442845284628472848284928502851285228532854285528562857285828592860286128622863286428652866286728682869287028712872287328742875287628772878287928802881288228832884288528862887288828892890289128922893289428952896289728982899290029012902290329042905290629072908290929102911291229132914291529162917291829192920292129222923292429252926292729282929293029312932293329342935293629372938293929402941294229432944294529462947294829492950295129522953295429552956295729582959296029612962296329642965296629672968296929702971297229732974297529762977297829792980298129822983298429852986298729882989299029912992299329942995299629972998299930003001300230033004300530063007300830093010301130123013301430153016301730183019302030213022302330243025302630273028302930303031303230333034303530363037303830393040304130423043304430453046304730483049305030513052305330543055305630573058305930603061306230633064306530663067306830693070307130723073307430753076307730783079308030813082308330843085308630873088308930903091309230933094309530963097309830993100310131023103310431053106310731083109311031113112311331143115311631173118311931203121312231233124312531263127312831293130313131323133313431353136313731383139314031413142314331443145314631473148314931503151315231533154315531563157315831593160316131623163316431653166316731683169317031713172317331743175317631773178317931803181318231833184318531863187318831893190319131923193319431953196319731983199320032013202320332043205320632073208320932103211321232133214321532163217321832193220322132223223322432253226322732283229323032313232323332343235323632373238323932403241324232433244324532463247324832493250325132523253325432553256325732583259326032613262326332643265326632673268326932703271327232733274327532763
27732783279328032813282328332843285328632873288328932903291329232933294329532963297329832993300330133023303330433053306330733083309331033113312331333143315331633173318331933203321332233233324332533263327332833293330333133323333333433353336333733383339334033413342334333443345334633473348334933503351335233533354335533563357335833593360336133623363336433653366336733683369337033713372337333743375337633773378337933803381338233833384338533863387338833893390339133923393339433953396339733983399340034013402340334043405340634073408340934103411341234133414341534163417341834193420342134223423342434253426342734283429343034313432343334343435343634373438343934403441344234433444344534463447344834493450345134523453345434553456345734583459346034613462346334643465346634673468346934703471347234733474347534763477347834793480348134823483348434853486348734883489349034913492349334943495349634973498349935003501350235033504350535063507350835093510351135123513351435153516351735183519352035213522352335243525352635273528352935303531353235333534353535363537353835393540354135423543354435453546354735483549355035513552355335543555355635573558355935603561356235633564356535663567356835693570357135723573357435753576357735783579358035813582358335843585358635873588358935903591359235933594359535963597359835993600360136023603360436053606360736083609361036113612361336143615361636173618361936203621362236233624362536263627362836293630363136323633363436353636363736383639364036413642364336443645364636473648364936503651365236533654365536563657365836593660366136623663366436653666366736683669367036713672367336743675367636773678367936803681368236833684368536863687368836893690369136923693369436953696369736983699370037013702370337043705370637073708370937103711371237133714371537163717371837193720372137223723372437253726372737283729373037313732373337343735373637373738373937403741374237433744374537463747374837493750375137523753375437553756375737583759376037613762376337643765376637673768376937703771377237733774377537763
777377837793780378137823783378437853786378737883789379037913792379337943795379637973798379938003801380238033804380538063807380838093810381138123813381438153816381738183819382038213822382338243825382638273828382938303831383238333834383538363837383838393840384138423843384438453846384738483849385038513852385338543855385638573858385938603861386238633864386538663867386838693870387138723873387438753876387738783879388038813882388338843885388638873888388938903891389238933894389538963897389838993900390139023903390439053906390739083909391039113912391339143915391639173918391939203921392239233924392539263927392839293930393139323933393439353936393739383939394039413942394339443945394639473948394939503951395239533954395539563957395839593960396139623963396439653966396739683969397039713972397339743975397639773978397939803981398239833984398539863987398839893990399139923993399439953996399739983999400040014002400340044005400640074008400940104011401240134014401540164017401840194020402140224023402440254026402740284029403040314032403340344035403640374038403940404041404240434044404540464047404840494050405140524053405440554056405740584059406040614062406340644065406640674068406940704071407240734074407540764077407840794080408140824083408440854086408740884089409040914092409340944095409640974098409941004101410241034104410541064107410841094110411141124113411441154116411741184119412041214122412341244125412641274128412941304131413241334134413541364137413841394140414141424143414441454146414741484149415041514152415341544155415641574158415941604161416241634164416541664167416841694170417141724173417441754176417741784179418041814182418341844185418641874188418941904191419241934194419541964197419841994200420142024203420442054206420742084209421042114212421342144215421642174218421942204221422242234224422542264227422842294230423142324233423442354236423742384239424042414242 |
- """Integration tests for 2FA and OIDC API endpoints.
- Tests the full request/response cycle for:
- - GET /api/v1/auth/2fa/status
- - POST /api/v1/auth/2fa/totp/setup
- - POST /api/v1/auth/2fa/totp/enable
- - POST /api/v1/auth/2fa/totp/disable
- - POST /api/v1/auth/2fa/email/enable
- - POST /api/v1/auth/2fa/email/enable/confirm
- - POST /api/v1/auth/2fa/email/disable
- - POST /api/v1/auth/2fa/verify (TOTP, email, backup paths)
- - DELETE /api/v1/auth/2fa/admin/{user_id}
- - GET /api/v1/auth/oidc/providers
- - POST /api/v1/auth/oidc/providers
- - PATCH /api/v1/auth/oidc/providers/{id}
- - DELETE /api/v1/auth/oidc/providers/{id}
- """
- from __future__ import annotations
- import secrets
- import time
- from datetime import datetime, timedelta, timezone
- from unittest.mock import patch
- import jwt as pyjwt
- import pyotp
- import pytest
- from httpx import AsyncClient
- from passlib.context import CryptContext
- from sqlalchemy.ext.asyncio import AsyncSession
- from backend.app.models.auth_ephemeral import AuthEphemeralToken
- from backend.app.models.user import User
# Hashing context used by the tests below to hash the OTP codes that are
# stored in the ``nonce`` column of injected ``email_otp_setup`` tokens.
# NOTE(review): presumably mirrors the scheme the backend verifies against — confirm.
_pwd_context = CryptContext(deprecated="auto", schemes=["pbkdf2_sha256"])
- # ---------------------------------------------------------------------------
- # Fixtures / helpers
- # ---------------------------------------------------------------------------
# Endpoint the helpers POST to with ``auth_enabled`` plus the admin credentials.
AUTH_SETUP_URL: str = "/api/v1/auth/setup"
# Endpoint the helpers POST username/password to in order to log in.
LOGIN_URL: str = "/api/v1/auth/login"
def _norm_pw(password: str) -> str:
    """Return *password* adjusted to meet the complexity requirements.

    (I4: SetupRequest now validates.)  The result always contains at least one
    uppercase character and at least one character outside ``A-Z``, ``a-z``
    and ``0-9``.  The function is idempotent, so tests may call it again on a
    raw password to recover the value that was actually registered.

    Args:
        password: Raw password chosen by the test; may be empty.

    Returns:
        The password with its first character upper-cased (or ``"A"`` appended
        when that does not produce an uppercase character) and ``"!"``
        appended when no special character is present.
    """
    import string

    if not any(ch.isupper() for ch in password):
        # Slicing instead of indexing keeps the empty string from raising.
        head = password[:1].upper()
        if head.isupper():
            password = head + password[1:]
        else:
            # First character is a digit/symbol (or missing): upper-casing it
            # cannot satisfy the rule, so add an uppercase letter explicitly.
            password = password + "A"

    alphanumerics = string.ascii_letters + string.digits
    if all(ch in alphanumerics for ch in password):
        password = password + "!"
    return password
async def _setup_and_login(client: AsyncClient, username: str, password: str) -> str:
    """Switch auth on with a new admin account, log in, and return the bearer token.

    The password is passed through ``_norm_pw`` before use, so the account is
    registered (and logged in) with the normalised value, not the raw one.
    The login response must be a 200; the setup response is not inspected.
    """
    normalized = _norm_pw(password)
    setup_payload = {
        "auth_enabled": True,
        "admin_username": username,
        "admin_password": normalized,
    }
    await client.post(AUTH_SETUP_URL, json=setup_payload)

    credentials = {"username": username, "password": normalized}
    login_resp = await client.post(LOGIN_URL, json=credentials)
    assert login_resp.status_code == 200
    body = login_resp.json()
    return body["access_token"]
async def _login_get_pre_auth_token(client: AsyncClient, username: str, password: str) -> str:
    """Log in as a 2FA-enabled user and return the ``pre_auth_token`` from the reply.

    The password is normalised with ``_norm_pw`` first.  The helper asserts
    that the login succeeds with 200, that the response flags
    ``requires_2fa`` and that a non-null pre-auth token is present.
    """
    credentials = {"username": username, "password": _norm_pw(password)}
    login_resp = await client.post(LOGIN_URL, json=credentials)
    assert login_resp.status_code == 200

    payload = login_resp.json()
    assert payload["requires_2fa"] is True, f"Expected requires_2fa=True, got {payload}"
    pre_auth = payload["pre_auth_token"]
    assert pre_auth is not None
    return pre_auth
def _auth_header(token: str) -> dict[str, str]:
    """Build the ``Authorization`` header mapping that carries *token* as a bearer credential."""
    bearer_value = f"Bearer {token}"
    return {"Authorization": bearer_value}
- # ===========================================================================
- # 2FA Status
- # ===========================================================================
class TestTwoFAStatus:
    """Tests for GET /api/v1/auth/2fa/status."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_status_requires_auth(self, async_client: AsyncClient):
        """A status request without credentials is answered with 401."""
        anonymous_resp = await async_client.get("/api/v1/auth/2fa/status")
        assert anonymous_resp.status_code == 401

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_status_default_disabled(self, async_client: AsyncClient):
        """A newly created account reports no 2FA method and zero backup codes."""
        bearer = await _setup_and_login(async_client, "statususer", "statuspass123")
        status_resp = await async_client.get(
            "/api/v1/auth/2fa/status",
            headers=_auth_header(bearer),
        )
        assert status_resp.status_code == 200

        body = status_resp.json()
        assert body["totp_enabled"] is False
        assert body["email_otp_enabled"] is False
        assert body["backup_codes_remaining"] == 0
- # ===========================================================================
- # TOTP Setup
- # ===========================================================================
class TestTOTPSetup:
    """Tests for POST /api/v1/auth/2fa/totp/setup."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_setup_requires_auth(self, async_client: AsyncClient):
        """Calling setup without a bearer token is rejected with 401."""
        anonymous_resp = await async_client.post("/api/v1/auth/2fa/totp/setup")
        assert anonymous_resp.status_code == 401

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_setup_returns_secret_and_qr(self, async_client: AsyncClient):
        """Setup answers with a non-empty secret, a QR code field and the issuer name."""
        bearer = await _setup_and_login(async_client, "totpsetup", "totpsetup123")
        setup_resp = await async_client.post(
            "/api/v1/auth/2fa/totp/setup",
            headers=_auth_header(bearer),
        )
        assert setup_resp.status_code == 200

        body = setup_resp.json()
        assert "secret" in body
        assert len(body["secret"]) > 0
        assert "qr_code_b64" in body
        assert body["issuer"] == "Bambuddy"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_setup_secret_is_valid_base32(self, async_client: AsyncClient):
        """The returned secret can be fed to pyotp and yields a six-character code."""
        bearer = await _setup_and_login(async_client, "totpbase32", "totpbase32pw")
        setup_resp = await async_client.post(
            "/api/v1/auth/2fa/totp/setup",
            headers=_auth_header(bearer),
        )
        assert setup_resp.status_code == 200

        secret = setup_resp.json()["secret"]
        # pyotp will raise on invalid base32
        current_code = pyotp.TOTP(secret).now()
        assert len(current_code) == 6
- # ===========================================================================
- # TOTP Enable
- # ===========================================================================
class TestTOTPEnable:
    """Tests for POST /api/v1/auth/2fa/totp/enable.

    Every preparatory request (setup, enable) is asserted to succeed so that a
    broken precondition fails at the step that broke instead of being
    mistaken for the behaviour under test.
    """

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_enable_without_setup_returns_400(self, async_client: AsyncClient):
        """Enabling TOTP before calling setup is rejected with 400."""
        token = await _setup_and_login(async_client, "nosetupenable", "nosetupenable1")
        response = await async_client.post(
            "/api/v1/auth/2fa/totp/enable",
            json={"code": "123456"},
            headers=_auth_header(token),
        )
        assert response.status_code == 400

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_enable_with_invalid_code_returns_400(self, async_client: AsyncClient):
        """After a successful setup, a wrong code is rejected with 400."""
        token = await _setup_and_login(async_client, "badcodeuser", "badcodeuser1")
        setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
        # Without this check a failed setup would produce the same 400 as the
        # "no setup" case and the test would pass for the wrong reason.
        assert setup_resp.status_code == 200
        response = await async_client.post(
            "/api/v1/auth/2fa/totp/enable",
            json={"code": "000000"},
            headers=_auth_header(token),
        )
        assert response.status_code == 400

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_enable_with_valid_code_returns_backup_codes(self, async_client: AsyncClient):
        """A valid code enables TOTP and returns ten backup codes of length eight."""
        token = await _setup_and_login(async_client, "enableok", "enableok123")
        setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
        assert setup_resp.status_code == 200
        secret = setup_resp.json()["secret"]
        valid_code = pyotp.TOTP(secret).now()
        response = await async_client.post(
            "/api/v1/auth/2fa/totp/enable",
            json={"code": valid_code},
            headers=_auth_header(token),
        )
        assert response.status_code == 200
        data = response.json()
        assert "backup_codes" in data
        assert len(data["backup_codes"]) == 10
        assert all(len(code) == 8 for code in data["backup_codes"])

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_status_reflects_enabled_totp(self, async_client: AsyncClient):
        """Once TOTP is enabled the status endpoint reports it with ten backup codes."""
        token = await _setup_and_login(async_client, "statustotp", "statustotp1")
        setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
        assert setup_resp.status_code == 200
        secret = setup_resp.json()["secret"]
        valid_code = pyotp.TOTP(secret).now()
        enable_resp = await async_client.post(
            "/api/v1/auth/2fa/totp/enable",
            json={"code": valid_code},
            headers=_auth_header(token),
        )
        assert enable_resp.status_code == 200
        status_resp = await async_client.get("/api/v1/auth/2fa/status", headers=_auth_header(token))
        assert status_resp.status_code == 200
        data = status_resp.json()
        assert data["totp_enabled"] is True
        assert data["backup_codes_remaining"] == 10
- # ===========================================================================
- # TOTP Disable
- # ===========================================================================
class TestTOTPDisable:
    """Tests for POST /api/v1/auth/2fa/totp/disable."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_disable_when_not_enabled_returns_400(self, async_client: AsyncClient):
        """Disabling TOTP on an account that never enabled it yields 400."""
        bearer = await _setup_and_login(async_client, "disablenoenab", "disablenoenab1")
        headers = _auth_header(bearer)
        disable_resp = await async_client.post(
            "/api/v1/auth/2fa/totp/disable",
            json={"code": "123456"},
            headers=headers,
        )
        assert disable_resp.status_code == 400

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_disable_with_valid_code(self, async_client: AsyncClient):
        """Setup, enable, then disable with a current code; status ends up disabled."""
        bearer = await _setup_and_login(async_client, "disableok", "disableok123")

        setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(bearer))
        secret = setup_resp.json()["secret"]
        enable_payload = {"code": pyotp.TOTP(secret).now()}
        await async_client.post(
            "/api/v1/auth/2fa/totp/enable",
            json=enable_payload,
            headers=_auth_header(bearer),
        )

        # Disable with a freshly computed code for the same secret
        disable_payload = {"code": pyotp.TOTP(secret).now()}
        disable_resp = await async_client.post(
            "/api/v1/auth/2fa/totp/disable",
            json=disable_payload,
            headers=_auth_header(bearer),
        )
        assert disable_resp.status_code == 200
        message = disable_resp.json()["message"]
        assert "disabled" in message.lower()

        # Status should now show disabled
        status_resp = await async_client.get("/api/v1/auth/2fa/status", headers=_auth_header(bearer))
        assert status_resp.json()["totp_enabled"] is False
- # ===========================================================================
- # Email OTP Enable/Disable
- # ===========================================================================
class TestEmailOTP:
    """Tests for POST /api/v1/auth/2fa/email/enable, /enable/confirm and /disable."""

    @staticmethod
    async def _inject_setup_token(db_session: AsyncSession, username: str, code: str) -> str:
        """Insert an ``email_otp_setup`` token for *username* and return its value.

        Writing the row directly bypasses the e-mail delivery step (SMTP is
        not available in tests).  The row stores the hash of *code* in
        ``nonce`` and expires ten minutes from now.
        """
        setup_token = secrets.token_urlsafe(32)
        db_session.add(
            AuthEphemeralToken(
                token=setup_token,
                token_type="email_otp_setup",
                username=username,
                nonce=_pwd_context.hash(code),
                expires_at=datetime.now(timezone.utc) + timedelta(minutes=10),
            )
        )
        await db_session.commit()
        return setup_token

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_enable_email_otp_without_email_returns_400(self, async_client: AsyncClient):
        """Users without an email address cannot enable email OTP."""
        token = await _setup_and_login(async_client, "noemailuser", "noemailuser1")
        response = await async_client.post("/api/v1/auth/2fa/email/enable", headers=_auth_header(token))
        assert response.status_code == 400
        assert "email" in response.json()["detail"].lower()

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_confirm_enable_email_otp_happy_path(self, async_client: AsyncClient, db_session: AsyncSession):
        """Confirm step activates email OTP when setup_token + code are valid (C5)."""
        token = await _setup_and_login(async_client, "confirmenable", "confirmenable1")
        # Give user an email address directly (SMTP not available in tests)
        from sqlalchemy import select as sa_select

        result = await db_session.execute(sa_select(User).where(User.username == "confirmenable"))
        user = result.scalar_one()
        user.email = "confirmenable@example.com"
        await db_session.commit()

        # Inject a known setup token directly into the DB (bypasses SMTP)
        code = "123456"
        setup_token = await self._inject_setup_token(db_session, "confirmenable", code)

        resp = await async_client.post(
            "/api/v1/auth/2fa/email/enable/confirm",
            json={"setup_token": setup_token, "code": code},
            headers=_auth_header(token),
        )
        assert resp.status_code == 200
        status_resp = await async_client.get("/api/v1/auth/2fa/status", headers=_auth_header(token))
        assert status_resp.json()["email_otp_enabled"] is True

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_confirm_enable_email_otp_wrong_code(self, async_client: AsyncClient, db_session: AsyncSession):
        """Wrong code on confirm step returns 400 and does not enable email OTP."""
        token = await _setup_and_login(async_client, "confirmwrong", "confirmwrong1")
        setup_token = await self._inject_setup_token(db_session, "confirmwrong", "654321")

        resp = await async_client.post(
            "/api/v1/auth/2fa/email/enable/confirm",
            json={"setup_token": setup_token, "code": "000000"},
            headers=_auth_header(token),
        )
        assert resp.status_code == 400
        # Verify the second half of the contract: the method stays disabled.
        status_resp = await async_client.get("/api/v1/auth/2fa/status", headers=_auth_header(token))
        assert status_resp.json()["email_otp_enabled"] is False

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_confirm_enable_email_otp_setup_token_is_single_use(
        self, async_client: AsyncClient, db_session: AsyncSession
    ):
        """Setup token is consumed on first use; replay returns 400."""
        token = await _setup_and_login(async_client, "confirmonce", "confirmonce1")
        code = "111111"
        setup_token = await self._inject_setup_token(db_session, "confirmonce", code)

        first = await async_client.post(
            "/api/v1/auth/2fa/email/enable/confirm",
            json={"setup_token": setup_token, "code": code},
            headers=_auth_header(token),
        )
        assert first.status_code == 200
        second = await async_client.post(
            "/api/v1/auth/2fa/email/enable/confirm",
            json={"setup_token": setup_token, "code": code},
            headers=_auth_header(token),
        )
        assert second.status_code == 400

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_disable_email_otp_requires_password(self, async_client: AsyncClient):
        """Disabling email OTP requires the account password (C6: re-auth)."""
        token = await _setup_and_login(async_client, "disemailotp", "disemailotp1")
        # Wrong password → 401
        response = await async_client.post(
            "/api/v1/auth/2fa/email/disable",
            json={"password": "wrongpassword"},
            headers=_auth_header(token),
        )
        assert response.status_code == 401

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_disable_email_otp_when_enabled(self, async_client: AsyncClient, db_session: AsyncSession):
        """Disabling email OTP when enabled turns it off and status reflects that."""
        token = await _setup_and_login(async_client, "disemailpw", "disemailpw1")
        # Enable email OTP via direct DB injection (no SMTP)
        code = "222222"
        setup_token = await self._inject_setup_token(db_session, "disemailpw", code)
        enable_resp = await async_client.post(
            "/api/v1/auth/2fa/email/enable/confirm",
            json={"setup_token": setup_token, "code": code},
            headers=_auth_header(token),
        )
        # Guard the precondition: without it the test could pass even though
        # email OTP was never switched on.
        assert enable_resp.status_code == 200

        # Now disable
        response = await async_client.post(
            "/api/v1/auth/2fa/email/disable",
            json={"password": _norm_pw("disemailpw1")},
            headers=_auth_header(token),
        )
        assert response.status_code == 200
        status_resp = await async_client.get("/api/v1/auth/2fa/status", headers=_auth_header(token))
        assert status_resp.json()["email_otp_enabled"] is False
- # ===========================================================================
- # 2FA Verify — TOTP path
- # ===========================================================================
- class TestTwoFAVerifyTOTP:
- """Tests for POST /api/v1/auth/2fa/verify using the TOTP method."""
@pytest.mark.asyncio
@pytest.mark.integration
async def test_verify_with_invalid_pre_auth_token(self, async_client: AsyncClient):
    """A verify request carrying an unknown pre-auth token is rejected with 401."""
    bogus_payload = {"pre_auth_token": "bogus", "method": "totp", "code": "123456"}
    verify_resp = await async_client.post("/api/v1/auth/2fa/verify", json=bogus_payload)
    assert verify_resp.status_code == 401
@pytest.mark.asyncio
@pytest.mark.integration
async def test_verify_totp_issues_jwt(self, async_client: AsyncClient):
    """Full flow: setup → enable TOTP → login → pre_auth_token → verify → JWT."""
    bearer = await _setup_and_login(async_client, "verifytotpok", "verifytotpok1")
    headers = _auth_header(bearer)

    setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=headers)
    secret = setup_resp.json()["secret"]
    enable_payload = {"code": pyotp.TOTP(secret).now()}
    await async_client.post(
        "/api/v1/auth/2fa/totp/enable",
        json=enable_payload,
        headers=headers,
    )

    # Login now returns requires_2fa=True + pre_auth_token
    pre_auth_token = await _login_get_pre_auth_token(async_client, "verifytotpok", "verifytotpok1")
    verify_payload = {
        "pre_auth_token": pre_auth_token,
        "method": "totp",
        "code": pyotp.TOTP(secret).now(),
    }
    verify_resp = await async_client.post("/api/v1/auth/2fa/verify", json=verify_payload)
    assert verify_resp.status_code == 200

    body = verify_resp.json()
    assert "access_token" in body
    assert body["token_type"] == "bearer"
    assert body["user"]["username"] == "verifytotpok"
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_verify_totp_invalid_code(self, async_client: AsyncClient):
- token = await _setup_and_login(async_client, "verifybadcode", "verifybadcode1")
- setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
- secret = setup_resp.json()["secret"]
- valid_code = pyotp.TOTP(secret).now()
- await async_client.post(
- "/api/v1/auth/2fa/totp/enable",
- json={"code": valid_code},
- headers=_auth_header(token),
- )
- pre_auth_token = await _login_get_pre_auth_token(async_client, "verifybadcode", "verifybadcode1")
- verify_resp = await async_client.post(
- "/api/v1/auth/2fa/verify",
- json={"pre_auth_token": pre_auth_token, "method": "totp", "code": "000000"},
- )
- assert verify_resp.status_code == 401
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_verify_invalid_method(self, async_client: AsyncClient):
- """An invalid 2FA method should return 400 even with a valid pre_auth_token."""
- token = await _setup_and_login(async_client, "invalidmethod", "invalidmethod1")
- setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
- secret = setup_resp.json()["secret"]
- valid_code = pyotp.TOTP(secret).now()
- await async_client.post(
- "/api/v1/auth/2fa/totp/enable",
- json={"code": valid_code},
- headers=_auth_header(token),
- )
- pre_auth_token = await _login_get_pre_auth_token(async_client, "invalidmethod", "invalidmethod1")
- response = await async_client.post(
- "/api/v1/auth/2fa/verify",
- json={"pre_auth_token": pre_auth_token, "method": "sms", "code": "123456"},
- )
- assert response.status_code == 422 # Pydantic Literal validation
- # ===========================================================================
- # 2FA Verify — Backup code path
- # ===========================================================================
class TestTwoFAVerifyBackup:
    """Tests for POST /api/v1/auth/2fa/verify using the backup method."""

    @staticmethod
    async def _enable_totp_with_backup_codes(
        client: AsyncClient, username: str, password: str
    ) -> tuple[str, list[str]]:
        """Create the user, enable TOTP and return (bearer_token, backup_codes).

        The backup codes are read from the ``backup_codes`` field of the
        /2fa/totp/enable response.
        """
        token = await _setup_and_login(client, username, password)
        headers = _auth_header(token)
        setup_resp = await client.post("/api/v1/auth/2fa/totp/setup", headers=headers)
        secret = setup_resp.json()["secret"]
        enable_resp = await client.post(
            "/api/v1/auth/2fa/totp/enable",
            json={"code": pyotp.TOTP(secret).now()},
            headers=headers,
        )
        return token, enable_resp.json()["backup_codes"]

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_verify_with_backup_code(self, async_client: AsyncClient):
        """A freshly issued backup code completes the 2FA login and yields a JWT."""
        _token, backup_codes = await self._enable_totp_with_backup_codes(
            async_client, "backupcodeok", "backupcodeok1"
        )
        pre_auth_token = await _login_get_pre_auth_token(async_client, "backupcodeok", "backupcodeok1")
        verify_resp = await async_client.post(
            "/api/v1/auth/2fa/verify",
            json={"pre_auth_token": pre_auth_token, "method": "backup", "code": backup_codes[0]},
        )
        assert verify_resp.status_code == 200
        assert "access_token" in verify_resp.json()

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_backup_code_is_single_use(self, async_client: AsyncClient):
        """The same backup code is accepted once and rejected on the second login."""
        _token, backup_codes = await self._enable_totp_with_backup_codes(
            async_client, "backupsingle", "backupsingle1"
        )
        backup_code = backup_codes[0]
        # First use — should succeed
        pre_auth_token = await _login_get_pre_auth_token(async_client, "backupsingle", "backupsingle1")
        first_resp = await async_client.post(
            "/api/v1/auth/2fa/verify",
            json={"pre_auth_token": pre_auth_token, "method": "backup", "code": backup_code},
        )
        assert first_resp.status_code == 200
        # Second use of the same code — must fail (need new pre_auth_token + same backup code)
        pre_auth_token2 = await _login_get_pre_auth_token(async_client, "backupsingle", "backupsingle1")
        second_resp = await async_client.post(
            "/api/v1/auth/2fa/verify",
            json={"pre_auth_token": pre_auth_token2, "method": "backup", "code": backup_code},
        )
        assert second_resp.status_code == 401

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_backup_code_count_decrements(self, async_client: AsyncClient):
        """Using one backup code lowers ``backup_codes_remaining`` by exactly one."""
        token, backup_codes = await self._enable_totp_with_backup_codes(
            async_client, "backupcount", "backupcount1"
        )
        pre_auth_token = await _login_get_pre_auth_token(async_client, "backupcount", "backupcount1")
        verify_resp = await async_client.post(
            "/api/v1/auth/2fa/verify",
            json={"pre_auth_token": pre_auth_token, "method": "backup", "code": backup_codes[0]},
        )
        # Fail here, with a clear message, if the code was never consumed.
        assert verify_resp.status_code == 200, verify_resp.text
        # Status is readable with the original full token (still valid)
        status_resp = await async_client.get("/api/v1/auth/2fa/status", headers=_auth_header(token))
        assert status_resp.json()["backup_codes_remaining"] == len(backup_codes) - 1
- # ===========================================================================
- # Rate Limiting
- # ===========================================================================
class TestRateLimiting:
    """Ensure 429 is returned after 5 failed 2FA attempts."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_rate_limit_lockout(self, async_client: AsyncClient):
        """After 5 failed TOTP attempts the 6th must return 429."""
        await _setup_totp_user(async_client, "ratelimituser", "ratelimituser1")

        async def _attempt_wrong_code():
            # Each attempt goes through the full login → pre_auth_token → verify flow.
            pre_auth_token = await _login_get_pre_auth_token(async_client, "ratelimituser", "ratelimituser1")
            return await async_client.post(
                "/api/v1/auth/2fa/verify",
                json={"pre_auth_token": pre_auth_token, "method": "totp", "code": "000000"},
            )

        # 5 failed attempts; their individual responses are not inspected
        for _ in range(5):
            await _attempt_wrong_code()
        # 6th attempt should hit the rate limit
        response = await _attempt_wrong_code()
        assert response.status_code == 429
- # ===========================================================================
- # Admin 2FA Disable
- # ===========================================================================
class TestAdminDisable2FA:
    """Tests for DELETE /api/v1/auth/2fa/admin/{user_id}."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_admin_disable_requires_admin(self, async_client: AsyncClient):
        """An admin call for a non-existent user id still returns 200.

        The only user in a fresh setup is the admin, so this test exercises the
        admin-allowed path only; it does not itself prove that non-admins are
        rejected.
        """
        token = await _setup_and_login(async_client, "admincheck", "admincheck123")
        response = await async_client.request(
            "DELETE",
            "/api/v1/auth/2fa/admin/99999",
            json={"admin_password": _norm_pw("admincheck123")},
            headers=_auth_header(token),
        )
        # Admin users succeed regardless (returns 200 even if user doesn't exist)
        assert response.status_code == 200

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_admin_disable_clears_totp(self, async_client: AsyncClient):
        """Admin disable turns TOTP off: re-login needs no 2FA and status shows it disabled."""
        token, _secret = await _setup_totp_user(async_client, "admintotp", "admintotp123")
        # Look up the user's id via /auth/me (works with the token)
        me_resp = await async_client.get("/api/v1/auth/me", headers=_auth_header(token))
        user_id = me_resp.json()["id"]
        response = await async_client.request(
            "DELETE",
            f"/api/v1/auth/2fa/admin/{user_id}",
            json={"admin_password": _norm_pw("admintotp123")},
            headers=_auth_header(token),
        )
        assert response.status_code == 200
        # I2: admin_disable_2fa bumps password_changed_at, invalidating the old token.
        # Re-login to get a fresh token before checking status.
        new_login = await async_client.post(
            LOGIN_URL, json={"username": "admintotp", "password": _norm_pw("admintotp123")}
        )
        assert new_login.status_code == 200, f"re-login failed: {new_login.json()}"
        assert new_login.json().get("requires_2fa") is False, f"still requires 2FA: {new_login.json()}"
        new_token = new_login.json()["access_token"]
        assert new_token is not None, f"no access_token in: {new_login.json()}"
        # Status should now show TOTP disabled
        status_resp = await async_client.get("/api/v1/auth/2fa/status", headers=_auth_header(new_token))
        assert status_resp.status_code == 200, f"status check failed: {status_resp.json()}"
        assert status_resp.json()["totp_enabled"] is False
- # ===========================================================================
- # OIDC Provider CRUD
- # ===========================================================================
class TestOIDCProviders:
    """Tests for OIDC provider management endpoints."""

    @staticmethod
    def _provider_payload(
        name: str,
        issuer_url: str,
        client_id: str,
        client_secret: str,
        *,
        scopes: str = "openid",
        is_enabled: bool = True,
    ) -> dict:
        """Build the JSON body for POST /api/v1/auth/oidc/providers.

        ``auto_create_users`` is always False in these tests.
        """
        return {
            "name": name,
            "issuer_url": issuer_url,
            "client_id": client_id,
            "client_secret": client_secret,
            "scopes": scopes,
            "is_enabled": is_enabled,
            "auto_create_users": False,
        }

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_list_public_providers_empty(self, async_client: AsyncClient):
        """The public provider list is reachable without auth and returns a JSON list."""
        response = await async_client.get("/api/v1/auth/oidc/providers")
        assert response.status_code == 200
        assert isinstance(response.json(), list)

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_create_provider_requires_admin(self, async_client: AsyncClient):
        """An authenticated setup user can create a provider; the secret is not echoed back."""
        token = await _setup_and_login(async_client, "oidcadmincreate", "oidcadmincreate1")
        response = await async_client.post(
            "/api/v1/auth/oidc/providers",
            json=self._provider_payload(
                "PocketID",
                "https://auth.example.com",
                "bambuddy",
                "supersecret",
                scopes="openid email profile",
            ),
            headers=_auth_header(token),
        )
        assert response.status_code == 201
        data = response.json()
        assert data["name"] == "PocketID"
        assert data["issuer_url"] == "https://auth.example.com"
        assert "client_secret" not in data  # Secret must not be returned

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_created_provider_appears_in_all_list(self, async_client: AsyncClient):
        """A newly created provider is listed by the authenticated /providers/all endpoint."""
        token = await _setup_and_login(async_client, "oidclistall", "oidclistall123")
        create_resp = await async_client.post(
            "/api/v1/auth/oidc/providers",
            json=self._provider_payload("TestProvider", "https://test.example.com", "testclient", "testsecret"),
            headers=_auth_header(token),
        )
        assert create_resp.status_code == 201, create_resp.text
        response = await async_client.get("/api/v1/auth/oidc/providers/all", headers=_auth_header(token))
        assert response.status_code == 200
        names = [p["name"] for p in response.json()]
        assert "TestProvider" in names

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_disabled_provider_not_in_public_list(self, async_client: AsyncClient):
        """A provider created with is_enabled=False is hidden from the public list."""
        token = await _setup_and_login(async_client, "oidcdisabled", "oidcdisabled1")
        create_resp = await async_client.post(
            "/api/v1/auth/oidc/providers",
            json=self._provider_payload(
                "DisabledProvider", "https://disabled.example.com", "dc", "ds", is_enabled=False
            ),
            headers=_auth_header(token),
        )
        # Without this check the test would pass vacuously if creation failed.
        assert create_resp.status_code == 201, create_resp.text
        response = await async_client.get("/api/v1/auth/oidc/providers")
        names = [p["name"] for p in response.json()]
        assert "DisabledProvider" not in names

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_update_provider(self, async_client: AsyncClient):
        """PUT with a partial body renames an existing provider."""
        token = await _setup_and_login(async_client, "oidcupdate", "oidcupdate123")
        create_resp = await async_client.post(
            "/api/v1/auth/oidc/providers",
            json=self._provider_payload("OldName", "https://update.example.com", "uc", "us"),
            headers=_auth_header(token),
        )
        assert create_resp.status_code == 201, create_resp.text
        provider_id = create_resp.json()["id"]
        put_resp = await async_client.put(
            f"/api/v1/auth/oidc/providers/{provider_id}",
            json={"name": "NewName"},
            headers=_auth_header(token),
        )
        assert put_resp.status_code == 200
        assert put_resp.json()["name"] == "NewName"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_delete_provider(self, async_client: AsyncClient):
        """DELETE removes the provider from the /providers/all listing."""
        token = await _setup_and_login(async_client, "oidcdelete", "oidcdelete123")
        create_resp = await async_client.post(
            "/api/v1/auth/oidc/providers",
            json=self._provider_payload("ToDelete", "https://delete.example.com", "dc", "ds"),
            headers=_auth_header(token),
        )
        assert create_resp.status_code == 201, create_resp.text
        provider_id = create_resp.json()["id"]
        del_resp = await async_client.delete(
            f"/api/v1/auth/oidc/providers/{provider_id}",
            headers=_auth_header(token),
        )
        assert del_resp.status_code == 200
        # No longer in list
        all_resp = await async_client.get("/api/v1/auth/oidc/providers/all", headers=_auth_header(token))
        ids = [p["id"] for p in all_resp.json()]
        assert provider_id not in ids

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_update_nonexistent_provider_returns_404(self, async_client: AsyncClient):
        """Updating an unknown provider id yields 404."""
        token = await _setup_and_login(async_client, "oidc404", "oidc404pass1")
        response = await async_client.put(
            "/api/v1/auth/oidc/providers/99999",
            json={"name": "ghost"},
            headers=_auth_header(token),
        )
        assert response.status_code == 404
- # ===========================================================================
- # Security: pre-auth token single-use
- # ===========================================================================
class TestPreAuthTokenSingleUse:
    """pre_auth_token must be consumed on successful 2FA and cannot be reused."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_pre_auth_token_is_single_use(self, async_client: AsyncClient):
        """A pre_auth_token that was successfully used cannot be reused."""
        _token, secret = await _setup_totp_user(async_client, "singleusepat", "singleusepat1")
        pre_auth_token = await _login_get_pre_auth_token(async_client, "singleusepat", "singleusepat1")
        # First use — succeeds
        first = await async_client.post(
            "/api/v1/auth/2fa/verify",
            json={"pre_auth_token": pre_auth_token, "method": "totp", "code": pyotp.TOTP(secret).now()},
        )
        assert first.status_code == 200
        # Second use of the same token — must fail (token already consumed on success)
        second = await async_client.post(
            "/api/v1/auth/2fa/verify",
            json={"pre_auth_token": pre_auth_token, "method": "totp", "code": pyotp.TOTP(secret).now()},
        )
        assert second.status_code == 401

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_pre_auth_token_survives_wrong_code(self, async_client: AsyncClient):
        """A wrong 2FA code must NOT burn the pre_auth_token (user can retry)."""
        _token, secret = await _setup_totp_user(async_client, "survivepatuser", "survivepatuser1")
        pre_auth_token = await _login_get_pre_auth_token(async_client, "survivepatuser", "survivepatuser1")
        # Wrong code — should fail but not burn the token
        bad = await async_client.post(
            "/api/v1/auth/2fa/verify",
            json={"pre_auth_token": pre_auth_token, "method": "totp", "code": "000000"},
        )
        assert bad.status_code == 401
        # Same token, correct code — should succeed (token still valid)
        good = await async_client.post(
            "/api/v1/auth/2fa/verify",
            json={"pre_auth_token": pre_auth_token, "method": "totp", "code": pyotp.TOTP(secret).now()},
        )
        assert good.status_code == 200
- # ===========================================================================
- # Security: cross-user token isolation
- # ===========================================================================
class TestCrossUserTokenIsolation:
    """A pre_auth_token issued for user A cannot authenticate as user B."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_token_cannot_be_used_for_different_user(self, async_client: AsyncClient):
        """User A's pre_auth_token combined with a code that is not A's must fail.

        NOTE(review): only one user is created here, and the rejected code is the
        fixed value "000000" rather than a second user's TOTP code, so this does
        not yet exercise a genuine cross-user attempt. TODO: add a second
        TOTP-enabled user and submit that user's current code with A's token.
        """
        # Set up user A with TOTP enabled
        await _setup_totp_user(async_client, "crossusera", "crossusera1")
        # Get pre_auth_token for user A
        pre_auth_a = await _login_get_pre_auth_token(async_client, "crossusera", "crossusera1")
        # Use user A's token but supply a clearly invalid code — must fail
        resp = await async_client.post(
            "/api/v1/auth/2fa/verify",
            json={"pre_auth_token": pre_auth_a, "method": "totp", "code": "000000"},
        )
        assert resp.status_code == 401
- # ===========================================================================
- # Security: admin disable non-admin rejection
- # ===========================================================================
class TestAdminDisableNonAdminRejection:
    """Non-admin users must be rejected from the admin disable endpoint."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_non_admin_cannot_disable_2fa(self, async_client: AsyncClient):
        """A regular (non-admin) user must receive 403 from DELETE /auth/2fa/admin/{id}."""
        # The setup user acts as admin and creates the regular account.
        admin_token = await _setup_and_login(async_client, "adminusr2fa", "adminusr2fa1")
        credentials = {"username": "regularusr2fa", "password": "Regularusr2fa1!"}
        created = await async_client.post(
            "/api/v1/users",
            json=credentials,
            headers=_auth_header(admin_token),
        )
        assert created.status_code == 201

        # Authenticate as the freshly created regular user.
        login = await async_client.post(LOGIN_URL, json=credentials)
        regular_token = login.json()["access_token"]

        # The admin-only endpoint must refuse the regular user's token.
        target_id = created.json()["id"]
        rejected = await async_client.delete(
            f"/api/v1/auth/2fa/admin/{target_id}",
            headers=_auth_header(regular_token),
        )
        assert rejected.status_code == 403
- # ===========================================================================
- # Regenerate backup codes
- # ===========================================================================
class TestRegenerateBackupCodes:
    """Tests for POST /api/v1/auth/2fa/totp/regenerate-backup-codes."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_regenerate_requires_totp_enabled(self, async_client: AsyncClient):
        """Regeneration is refused with 400 when the user never enabled TOTP."""
        token = await _setup_and_login(async_client, "regennototp", "regennototp1")
        resp = await async_client.post(
            "/api/v1/auth/2fa/totp/regenerate-backup-codes",
            json={"code": "123456"},
            headers=_auth_header(token),
        )
        assert resp.status_code == 400

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_regenerate_invalidates_old_codes(self, async_client: AsyncClient):
        """After regenerating, old backup codes must no longer work."""
        token = await _setup_and_login(async_client, "regeninval", "regeninval1")
        headers = _auth_header(token)
        setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=headers)
        secret = setup_resp.json()["secret"]
        # Enable inline (not via _setup_totp_user) because the enable response
        # carries the original backup codes this test needs.
        enable_resp = await async_client.post(
            "/api/v1/auth/2fa/totp/enable",
            json={"code": pyotp.TOTP(secret).now()},
            headers=headers,
        )
        old_backup = enable_resp.json()["backup_codes"][0]
        # Regenerate backup codes
        regen_resp = await async_client.post(
            "/api/v1/auth/2fa/totp/regenerate-backup-codes",
            json={"code": pyotp.TOTP(secret).now()},
            headers=headers,
        )
        assert regen_resp.status_code == 200
        new_codes = regen_resp.json()["backup_codes"]
        assert len(new_codes) == 10
        assert old_backup not in new_codes
        # Old backup code must now fail
        pre_auth_token = await _login_get_pre_auth_token(async_client, "regeninval", "regeninval1")
        fail_resp = await async_client.post(
            "/api/v1/auth/2fa/verify",
            json={"pre_auth_token": pre_auth_token, "method": "backup", "code": old_backup},
        )
        assert fail_resp.status_code == 401

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_regenerate_with_invalid_code_fails(self, async_client: AsyncClient):
        """A wrong TOTP code on regenerate is rejected with 400."""
        token, _secret = await _setup_totp_user(async_client, "regeninvcode", "regeninvcode1")
        resp = await async_client.post(
            "/api/v1/auth/2fa/totp/regenerate-backup-codes",
            json={"code": "000000"},
            headers=_auth_header(token),
        )
        assert resp.status_code == 400
- # ===========================================================================
- # Security: method field validation
- # ===========================================================================
class TestVerifyMethodValidation:
    """The method field must be one of totp/email/backup (Pydantic Literal)."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_invalid_method_rejected_by_schema(self, async_client: AsyncClient):
        """Pydantic should reject unknown method values with 422."""
        payload = {"pre_auth_token": "anytoken", "code": "123456", "method": "sms"}
        rejected = await async_client.post("/api/v1/auth/2fa/verify", json=payload)
        assert rejected.status_code == 422

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_oversized_pre_auth_token_rejected(self, async_client: AsyncClient):
        """pre_auth_token exceeding max_length=128 should be rejected with 422."""
        # 200 characters, comfortably over the limit named in the docstring.
        oversized = "x" * 200
        payload = {"pre_auth_token": oversized, "code": "123456", "method": "totp"}
        rejected = await async_client.post("/api/v1/auth/2fa/verify", json=payload)
        assert rejected.status_code == 422
- # ===========================================================================
- # Login response shape for 2FA users
- # ===========================================================================
class TestLoginResponseShape:
    """Login for a 2FA-enabled user must return requires_2fa+pre_auth_token
    and must NOT include access_token (which would bypass the 2FA gate)."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_login_2fa_user_omits_access_token(self, async_client: AsyncClient):
        """A user with TOTP enabled must not receive an access_token on /auth/login."""
        await _setup_totp_user(async_client, "loginshape", "loginshape1")
        # Derive the stored password with _norm_pw, as the other re-login in this
        # file does, instead of hard-coding its normalized form.
        login_resp = await async_client.post(
            LOGIN_URL, json={"username": "loginshape", "password": _norm_pw("loginshape1")}
        )
        assert login_resp.status_code == 200
        data = login_resp.json()
        assert data.get("requires_2fa") is True
        assert data.get("pre_auth_token") is not None
        # access_token must NOT be present — it would bypass the 2FA gate
        assert "access_token" not in data or data["access_token"] is None
- # ===========================================================================
- # TOTP replay protection
- # ===========================================================================
async def _setup_totp_user(client: AsyncClient, username: str, password: str) -> tuple[str, str]:
    """Create user, set up and enable TOTP; return (bearer_token, totp_secret)."""
    bearer = await _setup_and_login(client, username, password)

    # Ask the server for a TOTP secret for this account.
    setup_response = await client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(bearer))
    totp_secret = setup_response.json()["secret"]

    # Confirm enrolment with the code currently valid for that secret.
    current_code = pyotp.TOTP(totp_secret).now()
    await client.post(
        "/api/v1/auth/2fa/totp/enable",
        json={"code": current_code},
        headers=_auth_header(bearer),
    )
    return bearer, totp_secret
class TestTOTPReplay:
    """The same TOTP code must not be accepted twice within one 30-second window."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_totp_replay_rejected_on_verify(self, async_client: AsyncClient):
        """Replaying the same code on /2fa/verify must return 400."""
        _bearer, totp_secret = await _setup_totp_user(async_client, "replayverify", "replayverify1")
        reused_code = pyotp.TOTP(totp_secret).now()

        first_pre_auth = await _login_get_pre_auth_token(async_client, "replayverify", "replayverify1")
        accepted = await async_client.post(
            "/api/v1/auth/2fa/verify",
            json={"pre_auth_token": first_pre_auth, "method": "totp", "code": reused_code},
        )
        assert accepted.status_code == 200

        # The first pre_auth_token was consumed, so log in again for a fresh one.
        second_pre_auth = await _login_get_pre_auth_token(async_client, "replayverify", "replayverify1")
        replayed = await async_client.post(
            "/api/v1/auth/2fa/verify",
            json={"pre_auth_token": second_pre_auth, "method": "totp", "code": reused_code},
        )
        assert replayed.status_code == 400

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_totp_replay_rejected_on_disable(self, async_client: AsyncClient):
        """A code already used in verify_2fa must be rejected on /2fa/totp/disable."""
        _bearer, totp_secret = await _setup_totp_user(async_client, "replaydisable", "replaydisable1")
        reused_code = pyotp.TOTP(totp_secret).now()

        # Spend the code on verify_2fa first — this sets last_totp_counter in DB.
        pre_auth = await _login_get_pre_auth_token(async_client, "replaydisable", "replaydisable1")
        verified = await async_client.post(
            "/api/v1/auth/2fa/verify",
            json={"pre_auth_token": pre_auth, "method": "totp", "code": reused_code},
        )
        assert verified.status_code == 200
        session_token = verified.json()["access_token"]

        # Same code, same 30-second window, now on disable — must be rejected.
        replayed = await async_client.post(
            "/api/v1/auth/2fa/totp/disable",
            json={"code": reused_code},
            headers=_auth_header(session_token),
        )
        assert replayed.status_code == 400
- # ===========================================================================
- # Rate limiting on disable_totp and regenerate_backup_codes (I10)
- # ===========================================================================
class TestRateLimitingDisableRegenerate:
    """disable_totp and regenerate_backup_codes must enforce rate limiting (I10)."""

    @staticmethod
    async def _post_wrong_code_six_times(client: AsyncClient, token: str, url: str):
        """Send five wrong-code requests to ``url`` and return the response to a sixth."""
        wrong_code = {"code": "000000"}
        headers = _auth_header(token)
        for _attempt in range(5):
            await client.post(url, json=wrong_code, headers=headers)
        return await client.post(url, json=wrong_code, headers=headers)

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_disable_totp_rate_limited_after_failures(self, async_client: AsyncClient):
        """Repeated wrong codes on /2fa/totp/disable trigger 429."""
        bearer, _secret = await _setup_totp_user(async_client, "rldisable", "rldisable1")
        sixth = await self._post_wrong_code_six_times(async_client, bearer, "/api/v1/auth/2fa/totp/disable")
        assert sixth.status_code == 429

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_regenerate_backup_codes_rate_limited_after_failures(self, async_client: AsyncClient):
        """Repeated wrong codes on /2fa/totp/regenerate-backup-codes trigger 429."""
        bearer, _secret = await _setup_totp_user(async_client, "rlregen", "rlregen1")
        sixth = await self._post_wrong_code_six_times(
            async_client, bearer, "/api/v1/auth/2fa/totp/regenerate-backup-codes"
        )
        assert sixth.status_code == 429
- # ===========================================================================
- # Email OTP send → verify end-to-end (coverage gap C3)
- # ===========================================================================
- class TestEmailOTPSendVerify:
- """Full email OTP login: send code → verify code → JWT."""
@pytest.mark.asyncio
@pytest.mark.integration
async def test_email_otp_send_and_verify(self, async_client: AsyncClient, db_session: AsyncSession):
    """login → POST /2fa/email/send (patched SMTP) → POST /2fa/verify → JWT.

    Email OTP is enabled without real mail by inserting an ``email_otp_setup``
    token directly into the database and confirming it. The login code is then
    captured from the text body handed to the patched ``send_email``.
    """
    import re
    from unittest.mock import AsyncMock, MagicMock, patch
    from sqlalchemy import select as sa_select
    token = await _setup_and_login(async_client, "emailsendok", "emailsendok1")
    # Give the user an email address
    result = await db_session.execute(sa_select(User).where(User.username == "emailsendok"))
    user = result.scalar_one()
    user.email = "emailsendok@example.com"
    await db_session.commit()
    # Enable email OTP via DB injection
    setup_code = "444444"
    setup_token = secrets.token_urlsafe(32)
    db_session.add(
        AuthEphemeralToken(
            token=setup_token,
            token_type="email_otp_setup",
            username="emailsendok",
            # The hash of the setup code is stored in the nonce column.
            nonce=_pwd_context.hash(setup_code),
            expires_at=datetime.now(timezone.utc) + timedelta(minutes=10),
        )
    )
    await db_session.commit()
    # Confirm with the plain code; the response is not checked here.
    await async_client.post(
        "/api/v1/auth/2fa/email/enable/confirm",
        json={"setup_token": setup_token, "code": setup_code},
        headers=_auth_header(token),
    )
    # Login now requires 2FA — get pre_auth_token (cookie set automatically)
    pre_auth_token = await _login_get_pre_auth_token(async_client, "emailsendok", "emailsendok1")
    # Mock SMTP and capture the sent OTP code
    captured: dict[str, str] = {}
    smtp_settings_mock = MagicMock()
    def _capture_email(smtp_settings, to_email, subject, body_text, body_html):
        # Pull the 6-digit code out of the plain-text body; the other arguments are ignored.
        m = re.search(r"login code is: (\d{6})", body_text)
        if m:
            captured["otp"] = m.group(1)
    # Both names are patched inside the mfa routes module for the duration of the send call.
    with (
        patch("backend.app.api.routes.mfa.get_smtp_settings", new=AsyncMock(return_value=smtp_settings_mock)),
        patch("backend.app.api.routes.mfa.send_email", side_effect=_capture_email),
    ):
        send_resp = await async_client.post(
            "/api/v1/auth/2fa/email/send",
            json={"pre_auth_token": pre_auth_token},
        )
    assert send_resp.status_code == 200, send_resp.text
    # The send response carries its own pre_auth_token, which is the one used for verify below.
    fresh_token = send_resp.json()["pre_auth_token"]
    assert "otp" in captured, "send_email was not called or code not found in body"
    # Verify with the captured OTP code — cookie still in the async_client jar
    verify_resp = await async_client.post(
        "/api/v1/auth/2fa/verify",
        json={"pre_auth_token": fresh_token, "method": "email", "code": captured["otp"]},
    )
    assert verify_resp.status_code == 200
    data = verify_resp.json()
    assert "access_token" in data
    assert data["user"]["username"] == "emailsendok"
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_email_otp_wrong_code_rejected(self, async_client: AsyncClient, db_session: AsyncSession):
- """A wrong email OTP code must return 401 without burning the pre_auth_token."""
- from unittest.mock import AsyncMock, MagicMock, patch
- from sqlalchemy import select as sa_select
- token = await _setup_and_login(async_client, "emailwrongcode", "emailwrongcode1")
- result = await db_session.execute(sa_select(User).where(User.username == "emailwrongcode"))
- user = result.scalar_one()
- user.email = "emailwrongcode@example.com"
- await db_session.commit()
- setup_code = "555555"
- setup_token = secrets.token_urlsafe(32)
- db_session.add(
- AuthEphemeralToken(
- token=setup_token,
- token_type="email_otp_setup",
- username="emailwrongcode",
- nonce=_pwd_context.hash(setup_code),
- expires_at=datetime.now(timezone.utc) + timedelta(minutes=10),
- )
- )
- await db_session.commit()
- await async_client.post(
- "/api/v1/auth/2fa/email/enable/confirm",
- json={"setup_token": setup_token, "code": setup_code},
- headers=_auth_header(token),
- )
- pre_auth_token = await _login_get_pre_auth_token(async_client, "emailwrongcode", "emailwrongcode1")
- captured: dict[str, str] = {}
- smtp_mock = MagicMock()
- def _capture(smtp_settings, to_email, subject, body_text, body_html):
- import re
- m = re.search(r"login code is: (\d{6})", body_text)
- if m:
- captured["otp"] = m.group(1)
- with (
- patch("backend.app.api.routes.mfa.get_smtp_settings", new=AsyncMock(return_value=smtp_mock)),
- patch("backend.app.api.routes.mfa.send_email", side_effect=_capture),
- ):
- send_resp = await async_client.post(
- "/api/v1/auth/2fa/email/send",
- json={"pre_auth_token": pre_auth_token},
- )
- assert send_resp.status_code == 200
- fresh_token = send_resp.json()["pre_auth_token"]
- # Wrong code → 401
- bad = await async_client.post(
- "/api/v1/auth/2fa/verify",
- json={"pre_auth_token": fresh_token, "method": "email", "code": "000000"},
- )
- assert bad.status_code == 401
- # Correct code still works (token not burned by wrong attempt)
- good = await async_client.post(
- "/api/v1/auth/2fa/verify",
- json={"pre_auth_token": fresh_token, "method": "email", "code": captured["otp"]},
- )
- assert good.status_code == 200
- # ===========================================================================
- # OIDC end-to-end (coverage gap C4)
- # ===========================================================================
def _make_test_rsa_key():
    """Build a disposable 2048-bit RSA key and the JWK set that advertises it.

    Returns:
        ``(private_pem, jwks)``: the unencrypted PEM-encoded private key and
        a ``{"keys": [...]}`` dict holding the public key under kid
        ``test-kid-1``.
    """
    import base64

    from cryptography.hazmat.primitives import serialization
    from cryptography.hazmat.primitives.asymmetric import rsa

    def _encode_uint(value: int, width: int) -> str:
        # Fixed-width big-endian integer, base64url-encoded without padding.
        raw = value.to_bytes(width, "big")
        return base64.urlsafe_b64encode(raw).rstrip(b"=").decode()

    key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
    pem = key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=serialization.NoEncryption(),
    )

    public = key.public_key().public_numbers()
    jwk = {
        "kty": "RSA",
        "use": "sig",
        "alg": "RS256",
        "kid": "test-kid-1",
        "n": _encode_uint(public.n, 256),  # 2048-bit modulus → 256 bytes
        "e": _encode_uint(public.e, 3),  # 65537 fits in 3 bytes
    }
    return pem, {"keys": [jwk]}
class TestOIDCEndToEnd:
    """Full OIDC auth-code flow: state → callback (mocked IdP) → exchange → JWT.

    The identity provider is simulated by patching ``httpx.AsyncClient`` in
    the route module with a stub that serves a discovery document, a JWK set
    and a token response containing an ID token signed with a throwaway key.
    """

    @staticmethod
    def _sign_id_token(
        private_pem,
        *,
        issuer: str,
        client_id: str,
        nonce: str,
        subject: str,
        email: str,
    ) -> str:
        """Return an RS256 ID token (kid ``test-kid-1``) valid for 300 seconds."""
        import time

        import jwt as pyjwt

        now = int(time.time())
        return pyjwt.encode(
            {
                "sub": subject,
                "iss": issuer,
                "aud": client_id,
                "nonce": nonce,
                "email": email,
                "email_verified": True,
                "iat": now,
                "exp": now + 300,
            },
            private_pem,
            algorithm="RS256",
            headers={"kid": "test-kid-1"},
        )

    @staticmethod
    async def _create_provider(
        async_client: AsyncClient,
        admin_token: str,
        *,
        name: str,
        issuer: str,
        client_id: str,
        client_secret: str,
        scopes: str,
    ):
        """Create an enabled, auto-creating OIDC provider and return its id."""
        resp = await async_client.post(
            "/api/v1/auth/oidc/providers",
            json={
                "name": name,
                "issuer_url": issuer,
                "client_id": client_id,
                "client_secret": client_secret,
                "scopes": scopes,
                "is_enabled": True,
                "auto_create_users": True,
            },
            headers=_auth_header(admin_token),
        )
        # Surface a failed creation here instead of as a KeyError on "id".
        assert resp.status_code == 201, resp.text
        return resp.json()["id"]

    @staticmethod
    async def _store_oidc_state(db_session: AsyncSession, provider_id, nonce: str) -> str:
        """Simulate the authorize step by inserting an ``oidc_state`` token.

        Returns:
            The random state value that the callback must be called with.
        """
        state = secrets.token_urlsafe(32)
        db_session.add(
            AuthEphemeralToken(
                token=state,
                token_type="oidc_state",
                provider_id=provider_id,
                nonce=nonce,
                code_verifier=secrets.token_urlsafe(48),
                expires_at=datetime.now(timezone.utc) + timedelta(minutes=5),
            )
        )
        await db_session.commit()
        return state

    @staticmethod
    def _mock_httpx_client(issuer: str, jwks_data: dict, id_token: str, access_token: str = "mock-access"):
        """Return a stand-in class for ``httpx.AsyncClient`` that plays the IdP.

        GET requests whose URL contains ``jwks`` receive ``jwks_data``; every
        other GET receives the discovery document. Every POST receives the
        token response carrying ``id_token``.
        """
        discovery_doc = {
            "issuer": issuer,
            "authorization_endpoint": f"{issuer}/auth",
            "token_endpoint": f"{issuer}/token",
            "jwks_uri": f"{issuer}/.well-known/jwks.json",
        }
        token_response = {
            "access_token": access_token,
            "token_type": "Bearer",
            "id_token": id_token,
        }

        class _MockResp:
            def __init__(self, data):
                self._data = data
                self.status_code = 200
                self.is_success = True
                self.text = str(data)

            def json(self):
                return self._data

            def raise_for_status(self):
                pass

        class _MockHttpxClient:
            def __init__(self, *args, **kwargs):
                pass

            async def __aenter__(self):
                return self

            async def __aexit__(self, *args):
                pass

            async def get(self, url, **kwargs):
                return _MockResp(jwks_data if "jwks" in url else discovery_doc)

            async def post(self, url, **kwargs):
                return _MockResp(token_response)

        return _MockHttpxClient

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_oidc_callback_creates_user_and_issues_jwt(self, async_client: AsyncClient, db_session: AsyncSession):
        """callback validates the mocked ID token, creates a user, and redirects
        with an oidc_exchange token; exchanging that token returns a full JWT."""
        from unittest.mock import patch

        private_pem, jwks_data = _make_test_rsa_key()
        issuer = "https://idp.test.example.com"
        client_id = "oidc-test-client"
        nonce = secrets.token_urlsafe(16)
        id_token = self._sign_id_token(
            private_pem,
            issuer=issuer,
            client_id=client_id,
            nonce=nonce,
            subject="oidc-sub-e2e",
            email="oidce2e@example.com",
        )

        # Create OIDC provider
        admin_token = await _setup_and_login(async_client, "oidce2eadm", "oidce2eadm1")
        provider_id = await self._create_provider(
            async_client,
            admin_token,
            name="E2E-IdP",
            issuer=issuer,
            client_id=client_id,
            client_secret="test-secret",
            scopes="openid email profile",
        )

        # Simulate the authorize step: insert an oidc_state token directly
        state = await self._store_oidc_state(db_session, provider_id, nonce)

        # Mock httpx calls made inside oidc_callback
        mock_client_cls = self._mock_httpx_client(issuer, jwks_data, id_token)
        with patch("backend.app.api.routes.mfa.httpx.AsyncClient", mock_client_cls):
            callback_resp = await async_client.get(
                f"/api/v1/auth/oidc/callback?code=test-auth-code&state={state}",
                follow_redirects=False,
            )
        assert callback_resp.status_code == 302, callback_resp.text
        location = callback_resp.headers.get("location", "")
        assert "oidc_token=" in location, f"Expected oidc_token in redirect, got: {location}"

        # Extract and exchange the oidc_exchange token
        oidc_exchange_token = location.split("oidc_token=")[1].split("&")[0]
        exchange_resp = await async_client.post(
            "/api/v1/auth/oidc/exchange",
            json={"oidc_token": oidc_exchange_token},
        )
        assert exchange_resp.status_code == 200, exchange_resp.text
        data = exchange_resp.json()
        assert "access_token" in data
        assert data["user"]["username"] is not None

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_oidc_callback_invalid_state_redirects_error(self, async_client: AsyncClient):
        """An unknown state token must redirect to /?oidc_error=invalid_state."""
        resp = await async_client.get(
            "/api/v1/auth/oidc/callback?code=x&state=totally-bogus-state",
            follow_redirects=False,
        )
        assert resp.status_code == 302
        assert "invalid_state" in resp.headers.get("location", "")

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_oidc_state_is_single_use(self, async_client: AsyncClient, db_session: AsyncSession):
        """Replaying the same state token must fail on the second callback."""
        from unittest.mock import patch

        private_pem, jwks_data = _make_test_rsa_key()
        issuer = "https://idp2.test.example.com"
        client_id = "oidc-client-2"
        nonce = secrets.token_urlsafe(16)
        id_token = self._sign_id_token(
            private_pem,
            issuer=issuer,
            client_id=client_id,
            nonce=nonce,
            subject="sub-single-use",
            email="su@example.com",
        )

        admin_token = await _setup_and_login(async_client, "oidcsuadm", "oidcsuadm1")
        provider_id = await self._create_provider(
            async_client,
            admin_token,
            name="SU-IdP",
            issuer=issuer,
            client_id=client_id,
            client_secret="s",
            scopes="openid",
        )
        state = await self._store_oidc_state(db_session, provider_id, nonce)

        mock_client_cls = self._mock_httpx_client(issuer, jwks_data, id_token, access_token="a")
        with patch("backend.app.api.routes.mfa.httpx.AsyncClient", mock_client_cls):
            first = await async_client.get(
                f"/api/v1/auth/oidc/callback?code=c&state={state}",
                follow_redirects=False,
            )
            assert first.status_code == 302
            assert "oidc_token=" in first.headers.get("location", "")

            # Replay: second callback with the same state must fail
            second = await async_client.get(
                f"/api/v1/auth/oidc/callback?code=c&state={state}",
                follow_redirects=False,
            )
            assert second.status_code == 302
            assert "invalid_state" in second.headers.get("location", "")
- # ===========================================================================
- # H-2: Wrong code must NOT consume the email OTP setup token (peek-then-consume)
- # ===========================================================================
class TestEmailOTPSetupTokenPreservedOnWrongCode:
    """H-2 regression: a failed confirmation must leave the setup token usable."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_wrong_code_does_not_consume_setup_token(self, async_client: AsyncClient, db_session: AsyncSession):
        """A wrong code yields 400; retrying the same setup token with the right code yields 200."""
        token = await _setup_and_login(async_client, "h2retryuser", "h2retrypass1")

        # Inject a setup token whose nonce column holds the hash of the expected code.
        code = "999999"
        hashed_code = _pwd_context.hash(code)
        setup_token = secrets.token_urlsafe(32)
        pending_setup = AuthEphemeralToken(
            token=setup_token,
            token_type="email_otp_setup",
            username="h2retryuser",
            nonce=hashed_code,
            expires_at=datetime.now(timezone.utc) + timedelta(minutes=10),
        )
        db_session.add(pending_setup)
        await db_session.commit()

        async def _confirm(submitted_code: str):
            # Both attempts hit the same endpoint with the same setup token.
            return await async_client.post(
                "/api/v1/auth/2fa/email/enable/confirm",
                json={"setup_token": setup_token, "code": submitted_code},
                headers=_auth_header(token),
            )

        # First attempt: wrong code → 400
        rejected = await _confirm("000000")
        assert rejected.status_code == 400

        # Second attempt: correct code → must succeed (token was NOT consumed)
        accepted = await _confirm(code)
        assert accepted.status_code == 200
- # ===========================================================================
- # M-2: New OIDC provider must default to auto_link_existing_accounts=False
- # ===========================================================================
class TestOIDCProviderAutoLinkDefault:
    """M-2 regression: ``auto_link_existing_accounts`` defaults to False."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_new_provider_auto_link_defaults_to_false(self, async_client: AsyncClient):
        """Creating a provider without the flag returns it as ``False``."""
        admin_token = await _setup_and_login(async_client, "m2autolinkadmin", "m2autolinkadmin1")

        # auto_link_existing_accounts intentionally omitted
        provider_payload = {
            "name": "AutoLinkTest",
            "issuer_url": "https://autolink.example.com",
            "client_id": "alc",
            "client_secret": "als",
            "scopes": "openid",
            "is_enabled": True,
            "auto_create_users": False,
        }
        created = await async_client.post(
            "/api/v1/auth/oidc/providers",
            json=provider_payload,
            headers=_auth_header(admin_token),
        )

        assert created.status_code == 201
        assert created.json()["auto_link_existing_accounts"] is False
- # ===========================================================================
- # L-5: 2FA verify code format validation
- # ===========================================================================
class TestTwoFAVerifyCodeFormat:
    """L-5: ``TwoFAVerifyRequest.code`` accepts only 6–8 alphanumeric characters."""

    # Endpoint shared by every case in this class.
    _ENDPOINT = "/api/v1/auth/2fa/verify"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_code_too_long_rejected(self, async_client: AsyncClient):
        """A 9-character code is over the maximum and must yield 422."""
        payload = {"pre_auth_token": "anytoken", "code": "1" * 9, "method": "totp"}
        response = await async_client.post(self._ENDPOINT, json=payload)
        assert response.status_code == 422

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_code_non_alphanumeric_rejected(self, async_client: AsyncClient):
        """A code containing a hyphen must yield 422."""
        payload = {"pre_auth_token": "anytoken", "code": "12-456", "method": "totp"}
        response = await async_client.post(self._ENDPOINT, json=payload)
        assert response.status_code == 422

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_code_too_short_rejected(self, async_client: AsyncClient):
        """A 5-character code is under the minimum and must yield 422."""
        payload = {"pre_auth_token": "anytoken", "code": "12345", "method": "totp"}
        response = await async_client.post(self._ENDPOINT, json=payload)
        assert response.status_code == 422

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_code_exactly_6_passes_schema(self, async_client: AsyncClient):
        """A 6-character code clears validation; any non-422 status is accepted."""
        payload = {"pre_auth_token": "x" * 32, "code": "123456", "method": "totp"}
        response = await async_client.post(self._ENDPOINT, json=payload)
        assert response.status_code != 422

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_code_exactly_8_passes_schema(self, async_client: AsyncClient):
        """An 8-character alphanumeric backup code clears validation."""
        payload = {"pre_auth_token": "x" * 32, "code": "ABCD1234", "method": "backup"}
        response = await async_client.post(self._ENDPOINT, json=payload)
        assert response.status_code != 422
- # ===========================================================================
- # M-NEW-1: verify_slicer_download_token must NOT consume token on wrong resource
- # ===========================================================================
class TestSlicerTokenResourceBinding:
    """M-NEW-1: a slicer download token is bound to one resource and is single-use."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_wrong_resource_does_not_consume_token(self, async_client: AsyncClient, db_session: AsyncSession):
        """Checking an ``archive:5`` token against archive 6 fails without consuming it."""
        from datetime import datetime, timedelta, timezone

        from backend.app.core.auth import verify_slicer_download_token
        from backend.app.models.auth_ephemeral import AuthEphemeralToken

        created_at = datetime.now(timezone.utc)
        slicer_token = secrets.token_urlsafe(24)
        record = AuthEphemeralToken(
            token=slicer_token,
            token_type="slicer_download",
            nonce="archive:5",  # resource binding, "<kind>:<id>"
            expires_at=created_at + timedelta(minutes=5),
        )
        db_session.add(record)
        await db_session.commit()

        # Wrong resource → must return False and NOT consume the token
        mismatch = await verify_slicer_download_token(slicer_token, "archive", 6)
        assert mismatch is False

        # Correct resource → must return True (token survived the wrong-resource check)
        match = await verify_slicer_download_token(slicer_token, "archive", 5)
        assert match is True

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_correct_resource_consumes_token(self, async_client: AsyncClient, db_session: AsyncSession):
        """A second check against the correct resource must return False."""
        from datetime import datetime, timedelta, timezone

        from backend.app.core.auth import verify_slicer_download_token
        from backend.app.models.auth_ephemeral import AuthEphemeralToken

        created_at = datetime.now(timezone.utc)
        slicer_token = secrets.token_urlsafe(24)
        record = AuthEphemeralToken(
            token=slicer_token,
            token_type="slicer_download",
            nonce="library:99",
            expires_at=created_at + timedelta(minutes=5),
        )
        db_session.add(record)
        await db_session.commit()

        initial = await verify_slicer_download_token(slicer_token, "library", 99)
        assert initial is True

        replay = await verify_slicer_download_token(slicer_token, "library", 99)
        assert replay is False
- # ===========================================================================
- # M-NEW-3 / L-NEW-1: Schema length validation for change-password & forgot-password
- # ===========================================================================
class TestSchemaLengthValidationR2:
    """Review round 2: length limits on change-password and forgot-password inputs."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_change_password_current_too_long_rejected(self, async_client: AsyncClient):
        """A 257-character ``current_password`` must yield 422 (prevents pbkdf2 DoS)."""
        oversized_password = "x" * 257
        response = await async_client.post(
            "/api/v1/users/me/change-password",
            json={"current_password": oversized_password, "new_password": "ValidPass1!"},
        )
        assert response.status_code == 422

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_forgot_password_email_too_long_rejected(self, async_client: AsyncClient):
        """A 255-character email (243 + len("@example.com")) must yield 422."""
        oversized_email = "a" * 243 + "@example.com"
        response = await async_client.post(
            "/api/v1/auth/forgot-password",
            json={"email": oversized_email},
        )
        assert response.status_code == 422

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_forgot_password_email_at_limit_passes_schema(self, async_client: AsyncClient):
        """A short, ordinary email clears validation; any non-422 status is accepted."""
        response = await async_client.post(
            "/api/v1/auth/forgot-password",
            json={"email": "user@example.com"},
        )
        assert response.status_code != 422
- # ===========================================================================
- # L-NEW-2: TOTPSetupRequest.code max_length
- # ===========================================================================
class TestTOTPSetupCodeMaxLength:
    """L-NEW-2: ``TOTPSetupRequest.code`` has an upper length bound."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_setup_code_too_long_rejected(self, async_client: AsyncClient):
        """With TOTP already enabled, a 9-character setup code must yield 422."""
        import pyotp as _pyotp

        token = await _setup_and_login(async_client, "totp_setup_maxlen", "totp_setup_maxlen1")
        headers = _auth_header(token)

        # Enable TOTP so the setup-code guard path is active
        initial_setup = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=headers)
        secret = initial_setup.json()["secret"]
        current_code = _pyotp.TOTP(secret).now()
        await async_client.post(
            "/api/v1/auth/2fa/totp/enable",
            json={"code": current_code},
            headers=headers,
        )

        oversized = await async_client.post(
            "/api/v1/auth/2fa/totp/setup",
            json={"code": "1" * 9},
            headers=headers,
        )
        assert oversized.status_code == 422
- # ===========================================================================
- # L-NEW-3: EmailOTPEnableConfirmRequest.code must be exactly 6 digits
- # ===========================================================================
class TestEmailOTPConfirmCodeFormat:
    """L-NEW-3: ``EmailOTPEnableConfirmRequest.code`` must be exactly six digits."""

    # Endpoint shared by every case in this class.
    _ENDPOINT = "/api/v1/auth/2fa/email/enable/confirm"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_non_digit_code_rejected(self, async_client: AsyncClient):
        """A six-letter code must yield 422."""
        token = await _setup_and_login(async_client, "emailotpfmt", "emailotpfmt1")
        payload = {"setup_token": "x" * 32, "code": "ABCDEF"}
        response = await async_client.post(self._ENDPOINT, json=payload, headers=_auth_header(token))
        assert response.status_code == 422

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_seven_digit_code_rejected(self, async_client: AsyncClient):
        """A seven-digit code must yield 422 (min_length=max_length=6)."""
        token = await _setup_and_login(async_client, "emailotplen7", "emailotplen7x")
        payload = {"setup_token": "x" * 32, "code": "1234567"}
        response = await async_client.post(self._ENDPOINT, json=payload, headers=_auth_header(token))
        assert response.status_code == 422

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_valid_six_digit_code_passes_schema(self, async_client: AsyncClient):
        """A six-digit code clears validation; a non-422 error for the bogus token is fine."""
        token = await _setup_and_login(async_client, "emailotpfmt6", "emailotpfmt6x")
        payload = {"setup_token": "x" * 32, "code": "123456"}
        response = await async_client.post(self._ENDPOINT, json=payload, headers=_auth_header(token))
        assert response.status_code != 422
- # ===========================================================================
- # L-NEW-4: OIDCProviderCreate field max_length constraints
- # ===========================================================================
class TestOIDCProviderFieldLengths:
    """L-NEW-4: ``OIDCProviderCreate`` rejects fields longer than their max_length."""

    @staticmethod
    def _provider_payload(**overrides) -> dict:
        """Return an otherwise valid provider body with ``overrides`` applied."""
        body = {
            "name": "ValidName",
            "issuer_url": "https://test.example.com",
            "client_id": "cid",
            "client_secret": "csec",
            "scopes": "openid",
        }
        body.update(overrides)
        return body

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_name_too_long_rejected(self, async_client: AsyncClient):
        """A 101-character provider name must yield 422."""
        token = await _setup_and_login(async_client, "oidcfldadmin", "oidcfldadmin1")
        response = await async_client.post(
            "/api/v1/auth/oidc/providers",
            json=self._provider_payload(name="n" * 101),
            headers=_auth_header(token),
        )
        assert response.status_code == 422

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_client_secret_too_long_rejected(self, async_client: AsyncClient):
        """A 513-character client secret must yield 422."""
        token = await _setup_and_login(async_client, "oidcseclen", "oidcseclen123")
        response = await async_client.post(
            "/api/v1/auth/oidc/providers",
            json=self._provider_payload(client_secret="s" * 513),
            headers=_auth_header(token),
        )
        assert response.status_code == 422
- # ---------------------------------------------------------------------------
- # M-NEW-4 / M-NEW-5 / L-NEW-5: UserCreate & UserUpdate field length limits
- # ---------------------------------------------------------------------------
class TestUserCreateUpdateFieldLengths:
    """UserCreate and UserUpdate must enforce max_length on username, password, email."""

    @pytest.fixture
    async def admin_token(self, async_client: AsyncClient) -> str:
        """Set up auth with a fresh admin account and return its access token."""
        return await _setup_and_login(async_client, "ucfldadmin", "ucfldadmin1!")

    @staticmethod
    async def _create_user(async_client: AsyncClient, admin_token: str, username: str):
        """Create a regular user with a valid password and return its id."""
        create_resp = await async_client.post(
            "/api/v1/users/",
            json={"username": username, "password": "ValidPass1!", "role": "user"},
            headers=_auth_header(admin_token),
        )
        assert create_resp.status_code == 201
        return create_resp.json()["id"]

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_create_username_too_long_rejected(self, async_client: AsyncClient, admin_token: str):
        """Creating a user with a 151-character username must yield 422."""
        resp = await async_client.post(
            "/api/v1/users/",
            json={
                "username": "u" * 151,
                "password": "ValidPass1!",
                "role": "user",
            },
            headers=_auth_header(admin_token),
        )
        assert resp.status_code == 422

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_create_password_too_long_rejected(self, async_client: AsyncClient, admin_token: str):
        """Creating a user with a 257-character password must yield 422."""
        resp = await async_client.post(
            "/api/v1/users/",
            json={
                "username": "newuserX",
                "password": "A1!" + "x" * 254,  # 3 + 254 = 257 chars
                "role": "user",
            },
            headers=_auth_header(admin_token),
        )
        assert resp.status_code == 422

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_create_email_too_long_rejected(self, async_client: AsyncClient, admin_token: str):
        """Near-boundary sanity check for a long but under-limit email.

        Despite the test name, the address is 252 characters, which is below
        the 254-character limit, so either 201 or 422 is accepted. The strict
        over-limit case is ``test_create_email_exceeds_limit_rejected``.
        """
        resp = await async_client.post(
            "/api/v1/users/",
            json={
                "username": "newuserY",
                "password": "ValidPass1!",
                "email": "a" * 246 + "@x.com",  # 246 + 6 = 252 chars, under the 254 limit
                "role": "user",
            },
            headers=_auth_header(admin_token),
        )
        # For reference: 248 'a' + '@x.com' = 254 chars (exactly at the limit)
        # and 249 'a' + '@x.com' = 255 chars (over it).
        # NOTE(review): 422 is presumably tolerated because the email validator
        # may reject the address on other grounds — confirm against the schema.
        assert resp.status_code in (201, 422)  # boundary sanity check

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_create_email_exceeds_limit_rejected(self, async_client: AsyncClient, admin_token: str):
        """Creating a user with a 255-character email must yield 422."""
        resp = await async_client.post(
            "/api/v1/users/",
            json={
                "username": "newuserZ",
                "password": "ValidPass1!",
                "email": "a" * 249 + "@x.com",  # 255 chars — exceeds RFC 5321 max of 254
                "role": "user",
            },
            headers=_auth_header(admin_token),
        )
        assert resp.status_code == 422

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_update_username_too_long_rejected(self, async_client: AsyncClient, admin_token: str):
        """Renaming an existing user to a 151-character username must yield 422."""
        user_id = await self._create_user(async_client, admin_token, "updusr1")
        resp = await async_client.patch(
            f"/api/v1/users/{user_id}",
            json={"username": "u" * 151},
            headers=_auth_header(admin_token),
        )
        assert resp.status_code == 422

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_update_password_too_long_rejected(self, async_client: AsyncClient, admin_token: str):
        """Setting a 257-character password on an existing user must yield 422."""
        user_id = await self._create_user(async_client, admin_token, "updusr2")
        resp = await async_client.patch(
            f"/api/v1/users/{user_id}",
            json={"password": "A1!" + "x" * 254},  # 257 chars
            headers=_auth_header(admin_token),
        )
        assert resp.status_code == 422

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_update_email_too_long_rejected(self, async_client: AsyncClient, admin_token: str):
        """Setting a 255-character email on an existing user must yield 422."""
        user_id = await self._create_user(async_client, admin_token, "updusr3")
        resp = await async_client.patch(
            f"/api/v1/users/{user_id}",
            json={"email": "a" * 249 + "@x.com"},  # 255 chars
            headers=_auth_header(admin_token),
        )
        assert resp.status_code == 422
- # ---------------------------------------------------------------------------
- # L-NEW-6: per-IP rate limiting on /forgot-password
- # ---------------------------------------------------------------------------
- _SMTP_DATA_FOR_IPLIMIT = {
- "smtp_host": "smtp.test.com",
- "smtp_port": 587,
- "smtp_username": "test@test.com",
- "smtp_password": "testpass",
- "smtp_security": "starttls",
- "smtp_auth_enabled": True,
- "smtp_from_email": "noreply@test.com",
- }
class TestForgotPasswordPerIpRateLimit:
    """L-NEW-6: POST /forgot-password is capped per client IP.

    Eleven requests are sent from the test client, each with a different
    email address so that the per-email bucket is never the one that trips.
    Requests 1–10 must not be answered with 429; request 11 must be.
    """

    @pytest.fixture
    async def advanced_auth_token(self, async_client: AsyncClient) -> str:
        """Set up auth, configure SMTP, enable advanced auth; return the admin token."""
        admin_token = await _setup_and_login(async_client, "iprladmin", "iprladmin1!")
        admin_headers = _auth_header(admin_token)
        await async_client.post("/api/v1/auth/smtp", headers=admin_headers, json=_SMTP_DATA_FOR_IPLIMIT)
        await async_client.post("/api/v1/auth/advanced-auth/enable", headers=admin_headers)
        return admin_token

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_per_ip_limit_triggers_429(self, async_client: AsyncClient, advanced_auth_token: str):
        """The 11th forgot-password request from one IP is rejected with 429."""
        # Unique addresses keep the per-email bucket (limit=3) from being
        # exhausted, so only the IP bucket can produce a 429.
        responses = [
            (
                await async_client.post(
                    "/api/v1/auth/forgot-password",
                    json={"email": f"unique{i}@example.com"},
                )
            ).status_code
            for i in range(11)
        ]

        # First 10 must not be rate-limited by the IP bucket
        assert 429 not in responses[:10], f"Unexpected 429 before limit reached: {responses}"

        # The 11th must be rate-limited
        assert responses[10] == 429, f"Expected 429 on 11th request, got {responses[10]}"
- # ---------------------------------------------------------------------------
- # M-NEW-6: OIDC auto-link must be rejected if target user already has an
- # OIDC link to a different provider
- # ---------------------------------------------------------------------------
class TestOIDCAutoLinkExistingLinkRejection:
    """OIDC callback must reject auto-linking when the email-matched user
    already has an OIDC link to a different provider (M-NEW-6)."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_auto_link_rejected_when_user_already_linked(
        self, async_client: AsyncClient, db_session: AsyncSession
    ):
        """Auto-link via email-match is rejected when the target user is
        already linked to another OIDC provider.

        The test seeds a user that is linked to Provider B, then drives the
        callback for Provider A (``auto_link_existing_accounts=True``) with
        mocked HTTP and a patched ``jwt.decode`` whose claims carry the same
        e-mail address. The callback must redirect with ``no_linked_account``
        and no link row for Provider A may exist afterwards.
        """
        from unittest.mock import AsyncMock, MagicMock, patch

        from sqlalchemy import select as sa_select

        from backend.app.core.auth import get_password_hash
        from backend.app.models.oidc_provider import OIDCProvider, UserOIDCLink
        from backend.app.models.user import User

        # ── 1. Target user with a known email ────────────────────────────
        target = User(
            username="oidcALTarget",
            email="alinktest@example.com",
            auth_source="oidc",
            password_hash=get_password_hash(secrets.token_urlsafe(16)),
            role="user",
            is_active=True,
        )
        db_session.add(target)
        await db_session.flush()

        # ── 2. Provider B — legitimate, already linked to target ──────────
        prov_b = OIDCProvider(
            name="ProvB_m6test",
            issuer_url="https://providerb-m6.example.com",
            client_id="client_b",
            _client_secret_enc="secret_b",
            scopes="openid email profile",
            is_enabled=True,
            auto_link_existing_accounts=False,
            auto_create_users=False,
        )
        db_session.add(prov_b)
        await db_session.flush()
        db_session.add(
            UserOIDCLink(
                user_id=target.id,
                provider_id=prov_b.id,
                provider_user_id="legitimate_sub",
                provider_email="alinktest@example.com",
            )
        )

        # ── 3. Provider A — attacker-controlled, auto_link=True ───────────
        prov_a = OIDCProvider(
            name="ProvA_m6test",
            issuer_url="https://providera-m6.example.com",
            client_id="client_a",
            _client_secret_enc="secret_a",
            scopes="openid email profile",
            is_enabled=True,
            auto_link_existing_accounts=True,
            auto_create_users=False,
        )
        db_session.add(prov_a)
        await db_session.flush()

        # ── 4. OIDC state for Provider A ──────────────────────────────────
        state = secrets.token_urlsafe(32)
        nonce = secrets.token_urlsafe(32)
        code_verifier = secrets.token_urlsafe(48)
        db_session.add(
            AuthEphemeralToken(
                token=state,
                token_type="oidc_state",
                provider_id=prov_a.id,
                nonce=nonce,
                code_verifier=code_verifier,
                expires_at=datetime.now(timezone.utc) + timedelta(minutes=10),
            )
        )
        await db_session.commit()

        # ── 5. Mock HTTP + JWT so the callback can reach the auto-link check ─
        fake_discovery = {
            "issuer": "https://providera-m6.example.com",
            "token_endpoint": "https://providera-m6.example.com/token",
            "jwks_uri": "https://providera-m6.example.com/jwks",
        }
        fake_token = {"access_token": "acc_tok", "id_token": "fake.id.token"}
        # Claims returned by the patched jwt.decode: same e-mail as the target
        # user, but a subject that has no link yet.
        fake_claims = {
            "sub": "attacker_sub_unique",
            "email": "alinktest@example.com",
            "email_verified": True,
            "nonce": nonce,
            "iss": "https://providera-m6.example.com",
            "aud": "client_a",
            "exp": 9_999_999_999,
        }

        disc_resp = AsyncMock()
        disc_resp.raise_for_status = MagicMock()
        disc_resp.json = MagicMock(return_value=fake_discovery)

        token_resp = AsyncMock()
        token_resp.ok = True
        token_resp.json = MagicMock(return_value=fake_token)

        jwks_resp = AsyncMock()
        jwks_resp.raise_for_status = MagicMock()
        jwks_resp.json = MagicMock(return_value={})

        mock_http = AsyncMock()
        # First GET yields the discovery document, second GET the JWKS reply.
        mock_http.get = AsyncMock(side_effect=[disc_resp, jwks_resp])
        mock_http.post = AsyncMock(return_value=token_resp)

        mock_signing_key = MagicMock()
        mock_signing_key.key = "fake_key"

        with (
            patch("backend.app.api.routes.mfa.httpx.AsyncClient") as mock_httpx_cls,
            patch("backend.app.api.routes.mfa.jwt.decode", return_value=fake_claims),
            patch("backend.app.api.routes.mfa.PyJWKClient") as mock_jwks_cls,
        ):
            mock_httpx_cls.return_value.__aenter__ = AsyncMock(return_value=mock_http)
            mock_httpx_cls.return_value.__aexit__ = AsyncMock(return_value=False)
            mock_jwks_cls.return_value.get_signing_key_from_jwt.return_value = mock_signing_key
            resp = await async_client.get(
                f"/api/v1/auth/oidc/callback?code=fake_code&state={state}",
                follow_redirects=False,
            )

        # M-NEW-6: must redirect with no_linked_account — NOT create a second link
        assert resp.status_code == 302
        location = resp.headers.get("location", "")
        assert "no_linked_account" in location, f"Expected no_linked_account in redirect, got: {location}"

        # Verify no second OIDC link was created for Provider A
        async with db_session as s:
            links_result = await s.execute(
                sa_select(UserOIDCLink).where(
                    UserOIDCLink.user_id == target.id,
                    UserOIDCLink.provider_id == prov_a.id,
                )
            )
            assert links_result.scalar_one_or_none() is None, "No link to Provider A must exist"
- # ===========================================================================
- # Test Gap 1: OIDC state token is single-use — replay must be rejected
- # ===========================================================================
class TestOIDCStateReplay:
    """An OIDC state token is single-use: the first callback consumes it and
    any later callback with the same state must redirect to
    ``?oidc_error=invalid_state``."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_state_replay_rejected(self, async_client: AsyncClient, db_session: AsyncSession):
        """Replaying a consumed OIDC state token must return invalid_state."""
        from backend.app.models.oidc_provider import OIDCProvider

        # Minimal provider row; no real IdP is reachable behind this issuer.
        idp = OIDCProvider(
            name="StateReplayIdP",
            issuer_url="https://statereplay-idp.example.com",
            client_id="client_replay",
            _client_secret_enc="secret_replay",
            scopes="openid",
            is_enabled=True,
            auto_link_existing_accounts=False,
            auto_create_users=False,
        )
        db_session.add(idp)
        await db_session.flush()

        # Seed one still-valid state token bound to that provider.
        state_value = secrets.token_urlsafe(32)
        state_row = AuthEphemeralToken(
            token=state_value,
            token_type="oidc_state",
            provider_id=idp.id,
            nonce=secrets.token_urlsafe(32),
            code_verifier=secrets.token_urlsafe(48),
            expires_at=datetime.now(timezone.utc) + timedelta(minutes=10),
        )
        db_session.add(state_row)
        await db_session.commit()

        callback_url = f"/api/v1/auth/oidc/callback?code=any_code&state={state_value}"

        # First attempt: may fail for any reason (there is no real IdP), but it
        # must not be reported as an invalid state.
        first_try = await async_client.get(callback_url, follow_redirects=False)
        assert first_try.status_code == 302
        assert "invalid_state" not in first_try.headers.get("location", ""), (
            f"First call should NOT get invalid_state: {first_try.headers.get('location')}"
        )

        # Second attempt with the identical state: must be rejected as replay.
        replay = await async_client.get(callback_url, follow_redirects=False)
        assert replay.status_code == 302
        assert "invalid_state" in replay.headers.get("location", ""), (
            f"Replayed state must redirect to invalid_state, got: {replay.headers.get('location')}"
        )
- # ===========================================================================
- # Test Gap 2: OIDC iss claim mismatch must redirect to token_validation_failed
- # ===========================================================================
class TestOIDCIssMismatch:
    """An ID token whose ``iss`` differs from the discovery issuer is rejected."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_iss_mismatch_redirects_token_validation_failed(
        self, async_client: AsyncClient, db_session: AsyncSession
    ):
        """Callback redirects with ``token_validation_failed`` on iss mismatch.

        The provider and the mocked discovery document use the correct issuer,
        while the signed ID token carries a different ``iss`` claim.
        """
        import time
        from unittest.mock import patch

        import jwt as pyjwt

        signing_pem, jwks_payload = _make_test_rsa_key()
        correct_issuer = "https://correct-iss.example.com"
        wrong_issuer = "https://wrong-iss.example.com"
        client_id = "iss-mismatch-client"
        nonce = secrets.token_urlsafe(16)
        issued_at = int(time.time())

        # Token is validly signed but names the WRONG issuer.
        jwt_claims = {
            "sub": "sub-iss-test",
            "iss": wrong_issuer,
            "aud": client_id,
            "nonce": nonce,
            "email": "iss@example.com",
            "email_verified": True,
            "iat": issued_at,
            "exp": issued_at + 300,
        }
        id_token = pyjwt.encode(
            jwt_claims,
            signing_pem,
            algorithm="RS256",
            headers={"kid": "test-kid-1"},
        )

        # Register the provider through the admin API.
        admin_bearer = await _setup_and_login(async_client, "issadmin1", "issadmin1!")
        provider_payload = {
            "name": "IssTest-IdP",
            "issuer_url": correct_issuer,
            "client_id": client_id,
            "client_secret": "s",
            "scopes": "openid",
            "is_enabled": True,
            "auto_create_users": True,
        }
        created = await async_client.post(
            "/api/v1/auth/oidc/providers",
            json=provider_payload,
            headers=_auth_header(admin_bearer),
        )
        assert created.status_code in (200, 201), created.text
        provider_id = created.json()["id"]

        # Seed the state token the callback will look up.
        state = secrets.token_urlsafe(32)
        state_row = AuthEphemeralToken(
            token=state,
            token_type="oidc_state",
            provider_id=provider_id,
            nonce=nonce,
            code_verifier=secrets.token_urlsafe(48),
            expires_at=datetime.now(timezone.utc) + timedelta(minutes=5),
        )
        db_session.add(state_row)
        await db_session.commit()

        # Discovery returns the CORRECT issuer; JWT carries the WRONG one.
        discovery_doc = {
            "issuer": correct_issuer,
            "token_endpoint": f"{correct_issuer}/token",
            "jwks_uri": f"{correct_issuer}/.well-known/jwks.json",
        }
        token_response = {"access_token": "a", "id_token": id_token}

        class _FakeResponse:
            """Canned successful HTTP response wrapping a JSON payload."""

            def __init__(self, payload):
                self._payload = payload
                self.status_code = 200
                self.is_success = True
                self.text = ""

            def json(self):
                return self._payload

            def raise_for_status(self):
                pass

        class _FakeAsyncClient:
            """Drop-in for ``httpx.AsyncClient`` that never touches the network."""

            def __init__(self, *a, **kw):
                pass

            async def __aenter__(self):
                return self

            async def __aexit__(self, *a):
                pass

            async def get(self, url, **kw):
                # URLs containing "jwks" get the key set, anything else discovery.
                if "jwks" in url:
                    return _FakeResponse(jwks_payload)
                return _FakeResponse(discovery_doc)

            async def post(self, url, **kw):
                return _FakeResponse(token_response)

        with patch("backend.app.api.routes.mfa.httpx.AsyncClient", _FakeAsyncClient):
            resp = await async_client.get(
                f"/api/v1/auth/oidc/callback?code=c&state={state}",
                follow_redirects=False,
            )

        assert resp.status_code == 302
        location = resp.headers.get("location", "")
        assert "token_validation_failed" in location, f"Expected token_validation_failed, got: {location}"
- # ===========================================================================
- # Test Gap 3: /forgot-password/confirm token is single-use
- # ===========================================================================
class TestForgotPasswordTokenSingleUse:
    """A password-reset token accepted by POST /forgot-password/confirm must
    not be accepted a second time."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_token_reuse_rejected(self, async_client: AsyncClient, db_session: AsyncSession):
        """First confirm with the token returns 200; the repeat returns 400."""
        from backend.app.core.auth import get_password_hash
        from backend.app.models.user import User as _User

        confirm_url = "/api/v1/auth/forgot-password/confirm"

        # Seed the account whose password is going to be reset.
        account = _User(
            username="fpcuser1",
            email="fpc@example.com",
            password_hash=get_password_hash("OldPass1!"),
            role="user",
            is_active=True,
        )
        db_session.add(account)
        await db_session.flush()

        # Seed a reset token for that username, valid for one hour.
        reset_token = secrets.token_urlsafe(32)
        token_row = AuthEphemeralToken(
            token=reset_token,
            token_type="password_reset",
            username="fpcuser1",
            expires_at=datetime.now(timezone.utc) + timedelta(hours=1),
        )
        db_session.add(token_row)
        await db_session.commit()

        # First use → success
        first_attempt = await async_client.post(
            confirm_url,
            json={"token": reset_token, "new_password": "NewPass1!"},
        )
        assert first_attempt.status_code == 200, first_attempt.text

        # Second use → token already consumed, must fail
        second_attempt = await async_client.post(
            confirm_url,
            json={"token": reset_token, "new_password": "AnotherNew1!"},
        )
        assert second_attempt.status_code == 400
- # ===========================================================================
- # C1 regression: setup_totp must reject a replayed TOTP code
- # ===========================================================================
class TestSetupTOTPReplayRejected:
    """setup_totp must reject a TOTP code that was already accepted in its window."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_replayed_setup_code_rejected(self, async_client: AsyncClient, db_session: AsyncSession):
        """A code whose counter equals ``last_totp_counter`` is refused with 400.

        The test enables TOTP, marks the current time step as already used by
        writing it to ``last_totp_counter``, and then calls the setup endpoint
        with the code for that same time step.
        """
        from sqlalchemy import select as sa_select

        from backend.app.models.user_totp import UserTOTP

        token = await _setup_and_login(async_client, "setupreplay1", "setupreplay1!")

        # Step 1: Initial TOTP setup (no active TOTP yet → no code required)
        setup_resp = await async_client.post(
            "/api/v1/auth/2fa/totp/setup",
            headers=_auth_header(token),
        )
        assert setup_resp.status_code == 200
        secret = setup_resp.json()["secret"]

        # Step 2: Enable TOTP with a valid code
        totp_obj = pyotp.TOTP(secret)
        enable_resp = await async_client.post(
            "/api/v1/auth/2fa/totp/enable",
            json={"code": totp_obj.now()},
            headers=_auth_header(token),
        )
        assert enable_resp.status_code == 200  # TOTP is now active (is_enabled=True)

        # Step 3: Determine current valid code and its counter
        me_resp = await async_client.get("/api/v1/auth/me", headers=_auth_header(token))
        user_id = me_resp.json()["id"]
        totp_result = await db_session.execute(sa_select(UserTOTP).where(UserTOTP.user_id == user_id))
        totp_record = totp_result.scalar_one()
        secret_now = totp_record.secret  # decrypted via property
        totp_now = pyotp.TOTP(secret_now)
        # Derive the code and its counter from ONE timestamp. Reading the clock
        # twice (now() and then timecode(datetime.now())) could straddle a time
        # step boundary and yield a code/counter pair that do not belong together.
        reference_time = datetime.now(timezone.utc)
        valid_code = totp_now.at(reference_time)
        accepted_counter = totp_now.timecode(reference_time)

        # Step 4: Pre-set last_totp_counter so this code looks already used
        totp_record.last_totp_counter = accepted_counter
        await db_session.commit()

        # Step 5: Attempt setup_totp with the "already used" code → must be rejected
        replay_resp = await async_client.post(
            "/api/v1/auth/2fa/totp/setup",
            json={"code": valid_code},
            headers=_auth_header(token),
        )
        assert replay_resp.status_code == 400
        assert "already used" in replay_resp.json()["detail"]
- # ===========================================================================
- # Nit8: OIDC aud mismatch and nonce mismatch tests
- # ===========================================================================
class TestOIDCAudAndNonceMismatch:
    """Nit8: aud != client_id and nonce != stored value must each fail the callback."""

    def _make_oidc_provider_setup(self):
        """Return the ``(private_pem, jwks_data)`` pair from ``_make_test_rsa_key``."""
        private_pem, jwks_data = _make_test_rsa_key()
        return private_pem, jwks_data

    @staticmethod
    def _sign_id_token(private_pem, claims: dict) -> str:
        """Sign ``claims`` as an RS256 ID token with key id ``test-kid-1``."""
        import jwt as pyjwt

        return pyjwt.encode(
            claims,
            private_pem,
            algorithm="RS256",
            headers={"kid": "test-kid-1"},
        )

    @staticmethod
    async def _run_mocked_callback(
        async_client: AsyncClient,
        db_session: AsyncSession,
        *,
        admin_username: str,
        admin_password: str,
        provider_name: str,
        issuer: str,
        client_id: str,
        stored_nonce: str,
        id_token: str,
        jwks_data: dict,
    ):
        """Create a provider, seed a state token and run the mocked callback.

        Args:
            async_client: HTTP client bound to the application under test.
            db_session: Session used to insert the ``oidc_state`` token.
            admin_username: Username passed to ``_setup_and_login``.
            admin_password: Password passed to ``_setup_and_login``.
            provider_name: ``name`` of the provider created via the API.
            issuer: Issuer URL for the provider and the mocked discovery doc.
            client_id: Client id registered for the provider.
            stored_nonce: Nonce persisted alongside the state token.
            id_token: ID token returned by the mocked token endpoint.
            jwks_data: Payload served for any GET whose URL contains ``jwks``.

        Returns:
            The callback response (redirects are not followed).
        """
        from unittest.mock import patch

        admin_token = await _setup_and_login(async_client, admin_username, admin_password)
        cr = await async_client.post(
            "/api/v1/auth/oidc/providers",
            json={
                "name": provider_name,
                "issuer_url": issuer,
                "client_id": client_id,
                "client_secret": "s",
                "scopes": "openid",
                "is_enabled": True,
                "auto_create_users": True,
            },
            headers=_auth_header(admin_token),
        )
        assert cr.status_code in (200, 201), cr.text
        provider_id = cr.json()["id"]

        state = secrets.token_urlsafe(32)
        db_session.add(
            AuthEphemeralToken(
                token=state,
                token_type="oidc_state",
                provider_id=provider_id,
                nonce=stored_nonce,
                code_verifier=secrets.token_urlsafe(48),
                expires_at=datetime.now(timezone.utc) + timedelta(minutes=5),
            )
        )
        await db_session.commit()

        discovery_doc = {
            "issuer": issuer,
            "token_endpoint": f"{issuer}/token",
            "jwks_uri": f"{issuer}/.well-known/jwks.json",
        }

        class _MockResp:
            """Canned successful HTTP response wrapping a JSON payload."""

            def __init__(self, data):
                self._data = data
                self.status_code = 200
                self.is_success = True
                self.text = ""

            def json(self):
                return self._data

            def raise_for_status(self):
                pass

        class _MockHttpxClient:
            """Stand-in for ``httpx.AsyncClient`` that serves canned payloads."""

            def __init__(self, *a, **kw):
                pass

            async def __aenter__(self):
                return self

            async def __aexit__(self, *a):
                pass

            async def get(self, url, **kw):
                return _MockResp(jwks_data if "jwks" in url else discovery_doc)

            async def post(self, url, **kw):
                return _MockResp({"access_token": "a", "id_token": id_token})

        with patch("backend.app.api.routes.mfa.httpx.AsyncClient", _MockHttpxClient):
            return await async_client.get(
                f"/api/v1/auth/oidc/callback?code=c&state={state}",
                follow_redirects=False,
            )

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_aud_mismatch_redirects_token_validation_failed(
        self, async_client: AsyncClient, db_session: AsyncSession
    ):
        """ID token with aud != client_id must be rejected (PyJWT InvalidAudienceError)."""
        import time

        private_pem, jwks_data = self._make_oidc_provider_setup()
        issuer = "https://aud-mismatch.example.com"
        client_id = "aud-test-client"
        wrong_aud = "some-other-client"
        nonce = secrets.token_urlsafe(16)
        now = int(time.time())

        id_token = self._sign_id_token(
            private_pem,
            {
                "sub": "sub-aud-test",
                "iss": issuer,
                "aud": wrong_aud,  # <-- wrong audience
                "nonce": nonce,
                "email": "aud@example.com",
                "email_verified": True,
                "iat": now,
                "exp": now + 300,
            },
        )

        resp = await self._run_mocked_callback(
            async_client,
            db_session,
            admin_username="audmismatch_admin",
            admin_password="AudMismatch_admin1",
            provider_name="AudMismatch-IdP",
            issuer=issuer,
            client_id=client_id,
            stored_nonce=nonce,
            id_token=id_token,
            jwks_data=jwks_data,
        )

        assert resp.status_code == 302
        location = resp.headers.get("location", "")
        assert "token_validation_failed" in location, (
            f"Expected token_validation_failed redirect for aud mismatch, got: {location}"
        )

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_nonce_mismatch_redirects_token_validation_failed(
        self, async_client: AsyncClient, db_session: AsyncSession
    ):
        """ID token with nonce != stored state nonce must be rejected."""
        import time

        private_pem, jwks_data = self._make_oidc_provider_setup()
        issuer = "https://nonce-mismatch.example.com"
        client_id = "nonce-test-client"
        stored_nonce = secrets.token_urlsafe(16)
        wrong_nonce = secrets.token_urlsafe(16)  # different from stored_nonce
        now = int(time.time())

        id_token = self._sign_id_token(
            private_pem,
            {
                "sub": "sub-nonce-test",
                "iss": issuer,
                "aud": client_id,
                "nonce": wrong_nonce,  # <-- does not match stored_nonce
                "email": "nonce@example.com",
                "email_verified": True,
                "iat": now,
                "exp": now + 300,
            },
        )

        # The state row keeps stored_nonce; the JWT carries wrong_nonce.
        resp = await self._run_mocked_callback(
            async_client,
            db_session,
            admin_username="noncemismatch_admin",
            admin_password="NonceMismatch_admin1",
            provider_name="NonceMismatch-IdP",
            issuer=issuer,
            client_id=client_id,
            stored_nonce=stored_nonce,
            id_token=id_token,
            jwks_data=jwks_data,
        )

        assert resp.status_code == 302
        location = resp.headers.get("location", "")
        # The callback redirects to ?oidc_error=nonce_mismatch when nonces differ.
        assert "nonce_mismatch" in location, f"Expected nonce_mismatch redirect for nonce mismatch, got: {location}"
- # ===========================================================================
- # Expired OIDC token rejection — state and exchange tokens
- # ===========================================================================
class TestOIDCExpiredTokenRejection:
    """Expired OIDC state and exchange tokens must be rejected atomically.

    The DELETE … WHERE expires_at > now must ensure that an already-expired
    token is never consumed (committed) before the expiry is checked, so the
    token row stays in the DB and is not silently discarded.
    """

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_expired_state_token_rejected_as_invalid_state(
        self, async_client: AsyncClient, db_session: AsyncSession
    ):
        """An expired OIDC state token must redirect to invalid_state without
        being consumed — it must still exist in the DB after the rejected call."""
        from sqlalchemy import select as sa_select

        from backend.app.models.oidc_provider import OIDCProvider

        provider = OIDCProvider(
            name="ExpiredStateIdP",
            issuer_url="https://expired-state.example.com",
            client_id="client_expired_state",
            _client_secret_enc="secret_exp_state",
            scopes="openid",
            is_enabled=True,
            auto_link_existing_accounts=False,
            auto_create_users=False,
        )
        db_session.add(provider)
        await db_session.flush()

        state = secrets.token_urlsafe(32)
        db_session.add(
            AuthEphemeralToken(
                token=state,
                token_type="oidc_state",
                provider_id=provider.id,
                nonce=secrets.token_urlsafe(16),
                code_verifier=secrets.token_urlsafe(48),
                # already expired
                expires_at=datetime.now(timezone.utc) - timedelta(minutes=5),
            )
        )
        await db_session.commit()

        resp = await async_client.get(
            f"/api/v1/auth/oidc/callback?code=any_code&state={state}",
            follow_redirects=False,
        )
        assert resp.status_code == 302
        location = resp.headers.get("location", "")
        assert "invalid_state" in location, f"Expected invalid_state redirect for expired state, got: {location}"

        # Check the "not consumed" half of the contract as well, mirroring the
        # exchange-token test: the expired row must still be present.
        result = await db_session.execute(sa_select(AuthEphemeralToken).where(AuthEphemeralToken.token == state))
        remaining = result.scalar_one_or_none()
        assert remaining is not None, "Expired state token must not be consumed by a rejected request"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_expired_exchange_token_rejected(self, async_client: AsyncClient, db_session: AsyncSession):
        """An expired OIDC exchange token must return 401 without being consumed."""
        from sqlalchemy import select as sa_select

        expired_token = secrets.token_urlsafe(32)
        db_session.add(
            AuthEphemeralToken(
                token=expired_token,
                token_type="oidc_exchange",
                username="some_user",
                # already expired
                expires_at=datetime.now(timezone.utc) - timedelta(minutes=5),
            )
        )
        await db_session.commit()

        resp = await async_client.post(
            "/api/v1/auth/oidc/exchange",
            json={"oidc_token": expired_token},
        )
        assert resp.status_code == 401
        # Parse the body once; the error text may say either "expired" or "invalid".
        detail = resp.json().get("detail", "").lower()
        assert "expired" in detail or "invalid" in detail

        # Token must NOT have been consumed — it should still be in the DB
        # (the atomic DELETE WHERE expires_at > now left it untouched)
        result = await db_session.execute(
            sa_select(AuthEphemeralToken).where(AuthEphemeralToken.token == expired_token)
        )
        remaining = result.scalar_one_or_none()
        assert remaining is not None, "Expired exchange token must not be consumed by a rejected request"
- # ===========================================================================
- # Trailing slash in issuer_url — discovery URL must not contain double slash
- # ===========================================================================
class TestOIDCIssuerUrlTrailingSlash:
    """Issuer URLs may end in a slash (Authentik does this).

    BamBuddy must strip the slash before appending
    /.well-known/openid-configuration, otherwise the double slash in the
    discovery URL results in a 404.
    """

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_trailing_slash_issuer_url_fetches_correct_discovery_url(self, async_client: AsyncClient):
        """The discovery URL requested by /authorize has no double slash."""
        from unittest.mock import AsyncMock, MagicMock, patch

        slashed_issuer = "https://authentik.example.com/application/o/bambuddy/"

        # Register a provider whose issuer URL ends with "/".
        bearer = await _setup_and_login(async_client, "oidcslashadm", "oidcslashadm1")
        provider_payload = {
            "name": "Authentik-Slash",
            "issuer_url": slashed_issuer,
            "client_id": "bambuddy",
            "client_secret": "secret",
            "scopes": "openid email profile",
            "is_enabled": True,
            "auto_create_users": False,
        }
        created = await async_client.post(
            "/api/v1/auth/oidc/providers",
            json=provider_payload,
            headers=_auth_header(bearer),
        )
        assert created.status_code == 201
        idp_id = created.json()["id"]

        # Every outbound GET is answered with this discovery document.
        discovery_reply = AsyncMock()
        discovery_reply.raise_for_status = MagicMock()
        discovery_reply.json = MagicMock(
            return_value={
                "issuer": slashed_issuer,
                "authorization_endpoint": "https://authentik.example.com/application/o/bambuddy/authorize",
            }
        )
        http_stub = AsyncMock()
        http_stub.get = AsyncMock(return_value=discovery_reply)

        with patch("backend.app.api.routes.mfa.httpx.AsyncClient") as client_factory:
            client_factory.return_value.__aenter__ = AsyncMock(return_value=http_stub)
            client_factory.return_value.__aexit__ = AsyncMock(return_value=False)
            resp = await async_client.get(f"/api/v1/auth/oidc/authorize/{idp_id}")

        assert resp.status_code == 200
        # First positional argument of the first GET is the discovery URL.
        requested_url = http_stub.get.call_args_list[0][0][0]
        assert "//" not in requested_url.replace("https://", ""), (
            f"Discovery URL must not contain double slash: {requested_url}"
        )
        assert requested_url.endswith("/.well-known/openid-configuration"), (
            f"Expected discovery URL to end with /.well-known/openid-configuration, got: {requested_url}"
        )

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_iss_claim_trailing_slash_accepted(self, async_client: AsyncClient, db_session: AsyncSession):
        """Provider configured without trailing slash, Authentik JWT iss has trailing slash.

        Both sides must be normalised before comparison so the login succeeds.
        """
        import time
        from unittest.mock import patch

        import jwt as pyjwt

        signing_pem, jwks_payload = _make_test_rsa_key()
        bare_issuer = "https://authentik.example.com/application/o/bambuddy"
        slashed_issuer = bare_issuer + "/"
        client_id = "bambuddy-client"
        nonce = secrets.token_urlsafe(16)
        issued_at = int(time.time())

        # The token's iss carries the trailing slash.
        jwt_claims = {
            "sub": "authentik-sub-123",
            "iss": slashed_issuer,
            "aud": client_id,
            "nonce": nonce,
            "email": "authentik-user@example.com",
            "email_verified": True,
            "iat": issued_at,
            "exp": issued_at + 300,
        }
        id_token = pyjwt.encode(
            jwt_claims,
            signing_pem,
            algorithm="RS256",
            headers={"kid": "test-kid-1"},
        )

        # The provider is registered WITHOUT the trailing slash.
        bearer = await _setup_and_login(async_client, "authentikadm", "authentikadm1")
        provider_payload = {
            "name": "Authentik-ISS",
            "issuer_url": bare_issuer,
            "client_id": client_id,
            "client_secret": "secret",
            "scopes": "openid email profile",
            "is_enabled": True,
            "auto_create_users": True,
        }
        created = await async_client.post(
            "/api/v1/auth/oidc/providers",
            json=provider_payload,
            headers=_auth_header(bearer),
        )
        assert created.status_code == 201
        idp_id = created.json()["id"]

        state = secrets.token_urlsafe(32)
        code_verifier = secrets.token_urlsafe(48)
        state_row = AuthEphemeralToken(
            token=state,
            token_type="oidc_state",
            provider_id=idp_id,
            nonce=nonce,
            code_verifier=code_verifier,
            expires_at=datetime.now(timezone.utc) + timedelta(minutes=5),
        )
        db_session.add(state_row)
        await db_session.commit()

        discovery_doc = {
            "issuer": slashed_issuer,
            "authorization_endpoint": f"{bare_issuer}/authorize",
            "token_endpoint": f"{bare_issuer}/token",
            "jwks_uri": f"{bare_issuer}/.well-known/jwks.json",
        }
        token_response = {"access_token": "mock", "token_type": "Bearer", "id_token": id_token}

        class _FakeResponse:
            """Canned successful HTTP response wrapping a JSON payload."""

            def __init__(self, payload):
                self._payload = payload
                self.is_success = True
                self.status_code = 200
                self.text = str(payload)

            def json(self):
                return self._payload

            def raise_for_status(self):
                pass

        class _FakeAsyncClient:
            """Drop-in for ``httpx.AsyncClient`` that never touches the network."""

            def __init__(self, *a, **kw):
                pass

            async def __aenter__(self):
                return self

            async def __aexit__(self, *a):
                pass

            async def get(self, url, **kw):
                if "jwks" in url:
                    return _FakeResponse(jwks_payload)
                return _FakeResponse(discovery_doc)

            async def post(self, url, **kw):
                return _FakeResponse(token_response)

        with patch("backend.app.api.routes.mfa.httpx.AsyncClient", _FakeAsyncClient):
            resp = await async_client.get(
                f"/api/v1/auth/oidc/callback?code=auth-code&state={state}",
                follow_redirects=False,
            )

        location = resp.headers.get("location", "")
        assert resp.status_code == 302, f"Expected redirect, got {resp.status_code}"
        assert "token_validation_failed" not in location, (
            "Trailing slash mismatch in iss claim must not cause token_validation_failed"
        )
        assert "oidc_token=" in location, f"Expected oidc_token in redirect, got: {location}"
class TestOIDCCallbackCodeLength:
    """OIDC callback code/state query params must accept up to 2048 characters (OAuth spec)."""

    @staticmethod
    async def _callback_status(async_client: AsyncClient, *, code: str, state: str) -> int:
        """GET the callback with the given query values; return the status code."""
        response = await async_client.get(
            f"/api/v1/auth/oidc/callback?code={code}&state={state}",
            follow_redirects=False,
        )
        return response.status_code

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_code_512_chars_accepted(self, async_client: AsyncClient):
        """A 512-character code (old limit) must not be rejected with 422."""
        status = await self._callback_status(async_client, code="a" * 512, state="bogus-state")
        assert status != 422, "512-char code must not be rejected by Pydantic"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_code_2048_chars_accepted(self, async_client: AsyncClient):
        """A 2048-character code must not be rejected with 422."""
        status = await self._callback_status(async_client, code="a" * 2048, state="bogus-state")
        assert status != 422, "2048-char code must not be rejected by Pydantic"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_code_2049_chars_rejected(self, async_client: AsyncClient):
        """A 2049-character code must be rejected with 422."""
        status = await self._callback_status(async_client, code="a" * 2049, state="bogus-state")
        assert status == 422, "2049-char code must be rejected by Pydantic"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_state_512_chars_accepted(self, async_client: AsyncClient):
        """A 512-character state (old limit) must not be rejected with 422."""
        status = await self._callback_status(async_client, code="bogus-code", state="a" * 512)
        assert status != 422, "512-char state must not be rejected by Pydantic"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_state_2048_chars_accepted(self, async_client: AsyncClient):
        """A 2048-character state must not be rejected with 422."""
        status = await self._callback_status(async_client, code="bogus-code", state="a" * 2048)
        assert status != 422, "2048-char state must not be rejected by Pydantic"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_state_2049_chars_rejected(self, async_client: AsyncClient):
        """A 2049-character state must be rejected with 422."""
        status = await self._callback_status(async_client, code="bogus-code", state="a" * 2049)
        assert status == 422, "2049-char state must be rejected by Pydantic"
- # ---------------------------------------------------------------------------
- # Helpers shared by TestOIDCEmailClaimResolution
- # ---------------------------------------------------------------------------
async def _run_oidc_callback(
    async_client: AsyncClient,
    db_session: AsyncSession,
    *,
    provider_id: int,
    claims: dict,
    private_pem: bytes,
    jwks_data: dict,
    issuer: str,
    client_id: str,
) -> str:
    """Drive one mocked OIDC callback and return its redirect ``location``.

    Args:
        async_client: HTTP client bound to the application under test.
        db_session: Session used to insert the ``oidc_state`` token.
        provider_id: Provider the state token is bound to.
        claims: Extra ID-token claims; every key except ``sub`` is merged over
            the generated defaults, and ``sub`` (if given) is used as subject.
        private_pem: Key used to sign the ID token (RS256).
        jwks_data: Payload served for any GET whose URL contains ``jwks``.
        issuer: Value for ``iss`` and base of the mocked discovery endpoints.
        client_id: Value for the ``aud`` claim.

    Returns:
        The ``location`` response header, or ``""`` when it is absent.
    """
    nonce = secrets.token_urlsafe(16)
    issued_at = int(time.time())

    # Generated defaults first; caller-supplied claims (minus "sub") go on top.
    payload = {
        "sub": claims.get("sub", f"sub-{secrets.token_hex(8)}"),
        "iss": issuer,
        "aud": client_id,
        "nonce": nonce,
        "iat": issued_at,
        "exp": issued_at + 300,
    }
    payload.update({key: value for key, value in claims.items() if key not in ("sub",)})
    id_token = pyjwt.encode(payload, private_pem, algorithm="RS256", headers={"kid": "test-kid-1"})

    # Persist the state row the callback is going to look up.
    state = secrets.token_urlsafe(32)
    code_verifier = secrets.token_urlsafe(48)
    state_row = AuthEphemeralToken(
        token=state,
        token_type="oidc_state",
        provider_id=provider_id,
        nonce=nonce,
        code_verifier=code_verifier,
        expires_at=datetime.now(timezone.utc) + timedelta(minutes=5),
    )
    db_session.add(state_row)
    await db_session.commit()

    discovery_doc = {
        "issuer": issuer,
        "authorization_endpoint": f"{issuer}/auth",
        "token_endpoint": f"{issuer}/token",
        "jwks_uri": f"{issuer}/.well-known/jwks.json",
    }
    token_response = {"access_token": "mock-access", "token_type": "Bearer", "id_token": id_token}

    class _FakeResponse:
        """Canned successful HTTP response wrapping a JSON payload."""

        def __init__(self, data):
            self._data = data
            self.status_code = 200
            self.is_success = True
            self.text = str(data)

        def json(self):
            return self._data

        def raise_for_status(self):
            pass

    class _FakeAsyncClient:
        """Stand-in for ``httpx.AsyncClient`` that serves canned payloads."""

        def __init__(self, *a, **kw):
            pass

        async def __aenter__(self):
            return self

        async def __aexit__(self, *a):
            pass

        async def get(self, url, **kw):
            if "jwks" in url:
                return _FakeResponse(jwks_data)
            return _FakeResponse(discovery_doc)

        async def post(self, url, **kw):
            return _FakeResponse(token_response)

    with patch("backend.app.api.routes.mfa.httpx.AsyncClient", _FakeAsyncClient):
        callback_resp = await async_client.get(
            f"/api/v1/auth/oidc/callback?code=test-code&state={state}",
            follow_redirects=False,
        )
    return callback_resp.headers.get("location", "")
class TestOIDCEmailClaimResolution:
    """Email-claim resolution in the OIDC callback plus provider config guards.

    The parametrized matrix keeps the case names used by its test ids:
    Fall A is the standard ``email`` claim with ``require_email_verified=True``,
    Fall B is the ``email`` claim with ``require_email_verified=False`` and
    Fall C is a custom claim such as ``preferred_username``.
    """

    # ── shared helpers ────────────────────────────────────────────────────────

    @staticmethod
    def _bearer(admin_token: str) -> dict:
        """Build the Authorization header sent with every admin request."""
        return {"Authorization": f"Bearer {admin_token}"}

    @staticmethod
    def _provider_payload(name: str, issuer_url: str, client_id: str, **extra) -> dict:
        """Return the fields every provider request shares, then the extras.

        Extras are appended after the common fields in the order given.
        """
        return {
            "name": name,
            "issuer_url": issuer_url,
            "client_id": client_id,
            "client_secret": "sec",
            "scopes": "openid email profile",
            **extra,
        }

    async def _post_provider(self, async_client: AsyncClient, admin_token: str, payload: dict):
        """POST ``payload`` to the provider collection and return the raw response."""
        return await async_client.post(
            "/api/v1/auth/oidc/providers",
            json=payload,
            headers=self._bearer(admin_token),
        )

    async def _put_provider(
        self,
        async_client: AsyncClient,
        admin_token: str,
        provider_id: int,
        changes: dict,
    ):
        """PUT a partial update to one provider and return the raw response."""
        return await async_client.put(
            f"/api/v1/auth/oidc/providers/{provider_id}",
            json=changes,
            headers=self._bearer(admin_token),
        )

    async def _create_plain_provider(
        self,
        async_client: AsyncClient,
        admin_token: str,
        name: str,
        issuer_url: str,
        client_id: str,
    ) -> int:
        """Create a provider with only the common fields; return its id."""
        created = await self._post_provider(
            async_client,
            admin_token,
            self._provider_payload(name, issuer_url, client_id),
        )
        assert created.status_code == 201
        return created.json()["id"]

    async def _create_provider(
        self,
        async_client: AsyncClient,
        admin_token: str,
        issuer: str,
        client_id: str,
        *,
        email_claim: str = "email",
        require_email_verified: bool = True,
        auto_link_existing_accounts: bool = False,
        suffix: str = "",
    ) -> int:
        """Create an enabled, user-auto-creating provider and return its id.

        The provider name is ``TestIdP-<suffix>``; a random hex tag is used
        when ``suffix`` is empty.
        """
        payload = self._provider_payload(
            f"TestIdP-{suffix or secrets.token_hex(4)}",
            issuer,
            client_id,
            is_enabled=True,
            auto_create_users=True,
            auto_link_existing_accounts=auto_link_existing_accounts,
            email_claim=email_claim,
            require_email_verified=require_email_verified,
        )
        created = await self._post_provider(async_client, admin_token, payload)
        assert created.status_code == 201, created.text
        return created.json()["id"]

    async def _get_oidc_link(self, db_session: AsyncSession, provider_id: int, sub: str):
        """Return the UserOIDCLink for (provider_id, sub), or None if absent."""
        from sqlalchemy import select

        from backend.app.models.oidc_provider import UserOIDCLink

        query = select(UserOIDCLink).where(
            UserOIDCLink.provider_id == provider_id,
            UserOIDCLink.provider_user_id == sub,
        )
        rows = await db_session.execute(query)
        return rows.scalar_one_or_none()

    # ── Parametrized matrix: Fall A / Fall B / Fall C ─────────────────────────

    @pytest.mark.asyncio
    @pytest.mark.integration
    @pytest.mark.parametrize(
        "email_claim,require_ev,claims,expected",
        [
            # Fall A: standard claim + require_ev=True (default)
            pytest.param(
                "email",
                True,
                {"email": "fa@example.com", "email_verified": True},
                "fa@example.com",
                id="fall-a-ev-true",
            ),
            pytest.param(
                "email",
                True,
                {"email": "fa@example.com", "email_verified": False},
                None,
                id="fall-a-ev-false",
            ),
            # Azure Entra with default config sends no email_verified at all
            pytest.param("email", True, {"email": "fa@example.com"}, None, id="fall-a-ev-absent"),
            # Fall A + SEC-2: malformed email claim rejected even when email_verified=True
            pytest.param(
                "email",
                True,
                {"email": "notanemail", "email_verified": True},
                None,
                id="fall-a-malformed-email",
            ),
            # Fall B: standard claim + require_ev=False (Azure Entra permissive)
            pytest.param(
                "email",
                False,
                {"email": "fb@example.com", "email_verified": True},
                "fb@example.com",
                id="fall-b-ev-true",
            ),
            pytest.param(
                "email",
                False,
                {"email": "fb@example.com", "email_verified": False},
                None,
                id="fall-b-ev-false",
            ),
            # email_verified absent → email is kept
            pytest.param(
                "email",
                False,
                {"email": "azure@company.com"},
                "azure@company.com",
                id="fall-b-ev-absent",
            ),
            # Fall B + SEC-2: malformed email claim rejected in permissive mode (ev absent)
            pytest.param("email", False, {"email": "user@nodot"}, None, id="fall-b-malformed-email"),
            # Fall B + SEC-2: shape check fires before email_verified=False drop
            pytest.param(
                "email",
                False,
                {"email": "notanemail", "email_verified": False},
                None,
                id="fall-b-malformed-email-ev-false",
            ),
            # Fall C: custom claim (preferred_username) — no email_verified check
            pytest.param(
                "preferred_username",
                True,
                {"preferred_username": "User@Company.COM"},
                "user@company.com",
                id="fall-c-valid-upn",
            ),
            pytest.param(
                "preferred_username",
                True,
                {"preferred_username": " User@EXAMPLE.COM "},
                "user@example.com",
                id="fall-c-lowercase-strip",
            ),
            pytest.param(
                "preferred_username",
                True,
                {"preferred_username": "justausername"},
                None,
                id="fall-c-no-at",
            ),
            # SEC-2: "@" only
            pytest.param("preferred_username", True, {"preferred_username": "@"}, None, id="fall-c-at-only"),
            # SEC-2: empty local part
            pytest.param(
                "preferred_username",
                True,
                {"preferred_username": "@domain.com"},
                None,
                id="fall-c-empty-local",
            ),
            # SEC-2: empty domain
            pytest.param(
                "preferred_username",
                True,
                {"preferred_username": "user@"},
                None,
                id="fall-c-empty-domain",
            ),
            # SEC-2: no dot in domain
            pytest.param(
                "preferred_username",
                True,
                {"preferred_username": "user@nodot"},
                None,
                id="fall-c-no-dot-in-domain",
            ),
            # claim absent
            pytest.param("preferred_username", True, {}, None, id="fall-c-claim-absent"),
            # Fall C: email_verified=False present alongside custom claim — must NOT suppress the email
            pytest.param(
                "preferred_username",
                True,
                {"preferred_username": "user@co.com", "email_verified": False},
                "user@co.com",
                id="fall-c-ev-false-ignored",
            ),
        ],
    )
    async def test_email_resolution_matrix(
        self,
        async_client: AsyncClient,
        db_session: AsyncSession,
        email_claim: str,
        require_ev: bool,
        claims: dict,
        expected: str | None,
    ):
        """C4: the link must exist AND carry the expected provider_email.

        Requiring the link first keeps a failed callback from passing the
        ``expected is None`` cases by accident.
        """
        issuer, client_id = "https://matrix.test", "matrix-client"
        admin_token = await _setup_and_login(async_client, "matrix_adm", "Matrix123!")
        private_pem, jwks_data = _make_test_rsa_key()
        provider_id = await self._create_provider(
            async_client,
            admin_token,
            issuer,
            client_id,
            email_claim=email_claim,
            require_email_verified=require_ev,
            suffix="matrix",
        )

        sub = f"sub-matrix-{secrets.token_hex(6)}"
        token_claims = {"sub": sub, **claims}
        await _run_oidc_callback(
            async_client,
            db_session,
            provider_id=provider_id,
            claims=token_claims,
            private_pem=private_pem,
            jwks_data=jwks_data,
            issuer=issuer,
            client_id=client_id,
        )

        # Drop cached ORM state so the lookup reflects what the callback wrote.
        db_session.expire_all()
        link = await self._get_oidc_link(db_session, provider_id, sub)
        assert link is not None, "UserOIDCLink must be created even when email is dropped"
        assert link.provider_email == expected

    # ── Security: auto_link guards (CREATE endpoint) ──────────────────────────

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_auto_link_blocked_with_require_ev_false(self, async_client: AsyncClient):
        """SEC-1: creating with auto_link=True and require_email_verified=False → 422."""
        admin_token = await _setup_and_login(async_client, "sec1_adm", "Sec1Adm123!")
        payload = self._provider_payload(
            "SEC1-Test",
            "https://sec1.test",
            "sec1-client",
            auto_link_existing_accounts=True,
            require_email_verified=False,
        )
        resp = await self._post_provider(async_client, admin_token, payload)
        assert resp.status_code == 422

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_auto_link_allowed_with_custom_claim_create(self, async_client: AsyncClient):
        """Fall C: auto_link with a custom email_claim is accepted on CREATE (201).

        Rationale recorded with the original test: custom claims (e.g. Azure
        preferred_username/upn) never go through an email_verified check, so
        auto_link is considered safe regardless of require_email_verified.
        """
        admin_token = await _setup_and_login(async_client, "sec6c_adm", "Sec6CAdm123!")
        payload = self._provider_payload(
            "SEC6-Create-Test",
            "https://sec6c.test",
            "sec6c-client",
            auto_link_existing_accounts=True,
            email_claim="upn",
        )
        resp = await self._post_provider(async_client, admin_token, payload)
        assert resp.status_code == 201
        body = resp.json()
        assert body["auto_link_existing_accounts"] is True
        assert body["email_claim"] == "upn"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_auto_link_allowed_with_custom_claim_update(self, async_client: AsyncClient):
        """Fall C: auto_link=True and email_claim='upn' in one UPDATE request → 200."""
        admin_token = await _setup_and_login(async_client, "sec6u_adm", "Sec6UAdm123!")
        provider_id = await self._create_plain_provider(
            async_client,
            admin_token,
            "SEC6-Update-Test",
            "https://sec6u.test",
            "sec6u-client",
        )
        resp = await self._put_provider(
            async_client,
            admin_token,
            provider_id,
            {"auto_link_existing_accounts": True, "email_claim": "upn"},
        )
        assert resp.status_code == 200
        body = resp.json()
        assert body["auto_link_existing_accounts"] is True
        assert body["email_claim"] == "upn"

    # ── Combined-State-Guard (partial updates across two requests) ─────────────

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_partial_update_guard_require_ev(self, async_client: AsyncClient):
        """SEC-1 Combined-State-Guard: require_ev=False, then auto_link=True → 422."""
        admin_token = await _setup_and_login(async_client, "pg_rev_adm", "PgRev123!")
        provider_id = await self._create_plain_provider(
            async_client,
            admin_token,
            "PG-RequireEV-Test",
            "https://pg-rev.test",
            "pg-rev-client",
        )
        first = await self._put_provider(
            async_client, admin_token, provider_id, {"require_email_verified": False}
        )
        assert first.status_code == 200
        second = await self._put_provider(
            async_client, admin_token, provider_id, {"auto_link_existing_accounts": True}
        )
        assert second.status_code == 422

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_partial_update_custom_claim_then_auto_link_allowed(self, async_client: AsyncClient):
        """Fall C: email_claim='upn' first, then auto_link=True → both updates return 200."""
        admin_token = await _setup_and_login(async_client, "pg_ec_adm", "PgEc123!")
        provider_id = await self._create_plain_provider(
            async_client,
            admin_token,
            "PG-EmailClaim-Test",
            "https://pg-ec.test",
            "pg-ec-client",
        )
        first = await self._put_provider(async_client, admin_token, provider_id, {"email_claim": "upn"})
        assert first.status_code == 200
        second = await self._put_provider(
            async_client, admin_token, provider_id, {"auto_link_existing_accounts": True}
        )
        assert second.status_code == 200
        body = second.json()
        assert body["auto_link_existing_accounts"] is True
        assert body["email_claim"] == "upn"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_partial_update_auto_link_then_custom_claim_allowed(self, async_client: AsyncClient):
        """Fall C: auto_link=True first (email_claim still 'email'), then a custom claim → both 200."""
        admin_token = await _setup_and_login(async_client, "pg_al_ec_adm", "PgAlEc123!")
        provider_id = await self._create_plain_provider(
            async_client,
            admin_token,
            "PG-AutoLink-Claim-Test",
            "https://pg-al-ec.test",
            "pg-al-ec-client",
        )
        first = await self._put_provider(
            async_client, admin_token, provider_id, {"auto_link_existing_accounts": True}
        )
        assert first.status_code == 200
        second = await self._put_provider(
            async_client, admin_token, provider_id, {"email_claim": "preferred_username"}
        )
        assert second.status_code == 200
        body = second.json()
        assert body["auto_link_existing_accounts"] is True
        assert body["email_claim"] == "preferred_username"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_partial_update_guard_inverse_order(self, async_client: AsyncClient):
        """T2: auto_link=True first (valid), then require_ev=False → guard fires (422)."""
        admin_token = await _setup_and_login(async_client, "pg_inv_adm", "PgInv123!")
        provider_id = await self._create_plain_provider(
            async_client,
            admin_token,
            "PG-Inverse-Test",
            "https://pg-inv.test",
            "pg-inv-client",
        )
        # auto_link=True is accepted while require_ev is still at its default.
        first = await self._put_provider(
            async_client, admin_token, provider_id, {"auto_link_existing_accounts": True}
        )
        assert first.status_code == 200
        # Turning require_ev off with auto_link already on is the unsafe combined state.
        second = await self._put_provider(
            async_client, admin_token, provider_id, {"require_email_verified": False}
        )
        assert second.status_code == 422

    # ── Low5: Response fields verified ───────────────────────────────────────

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_create_response_includes_new_fields(self, async_client: AsyncClient):
        """Low5: the CREATE response echoes email_claim and require_email_verified."""
        admin_token = await _setup_and_login(async_client, "resp_adm", "Resp123!")
        payload = self._provider_payload(
            "ResponseFields-Test",
            "https://resp.test",
            "resp-client",
            email_claim="preferred_username",
            require_email_verified=False,
        )
        resp = await self._post_provider(async_client, admin_token, payload)
        assert resp.status_code == 201
        data = resp.json()
        assert data["email_claim"] == "preferred_username"
        assert data["require_email_verified"] is False

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_update_response_reflects_new_fields(self, async_client: AsyncClient):
        """Low5: the PUT response reflects the updated email_claim and require_email_verified."""
        admin_token = await _setup_and_login(async_client, "upd_resp_adm", "UpdResp123!")
        provider_id = await self._create_plain_provider(
            async_client,
            admin_token,
            "UpdateResponse-Test",
            "https://upd-resp.test",
            "upd-resp-client",
        )
        upd = await self._put_provider(
            async_client,
            admin_token,
            provider_id,
            {"email_claim": "upn", "require_email_verified": True},
        )
        assert upd.status_code == 200
        data = upd.json()
        assert data["email_claim"] == "upn"
        assert data["require_email_verified"] is True
- # ===========================================================================
- # TestOIDCEmailClaimValidation — T2: email_claim field validator coverage
- # ===========================================================================
class TestOIDCEmailClaimValidation:
    """Accept/reject behaviour of the provider CREATE endpoint for ``email_claim``."""

    @staticmethod
    async def _post_with_claim(
        async_client: AsyncClient,
        admin_token: str,
        name: str,
        issuer_url: str,
        client_id: str,
        email_claim,
    ):
        """POST a minimal provider carrying ``email_claim``; return the raw response."""
        return await async_client.post(
            "/api/v1/auth/oidc/providers",
            json={
                "name": name,
                "issuer_url": issuer_url,
                "client_id": client_id,
                "client_secret": "sec",
                "scopes": "openid email",
                "email_claim": email_claim,
            },
            headers={"Authorization": f"Bearer {admin_token}"},
        )

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_invalid_claim_name_dot_rejected(self, async_client: AsyncClient):
        """An email_claim containing a dot (log-injection risk) is rejected with 422."""
        admin_token = await _setup_and_login(async_client, "ecv_adm1", "Ecv123!")
        resp = await self._post_with_claim(
            async_client, admin_token, "ECVTest1", "https://ecv1.test", "ecv1", "email.address"
        )
        assert resp.status_code == 422

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_invalid_claim_name_starts_with_digit_rejected(self, async_client: AsyncClient):
        """An email_claim that starts with a digit is rejected with 422."""
        admin_token = await _setup_and_login(async_client, "ecv_adm2", "Ecv123!")
        resp = await self._post_with_claim(
            async_client, admin_token, "ECVTest2", "https://ecv2.test", "ecv2", "1invalid"
        )
        assert resp.status_code == 422

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_invalid_claim_name_newline_rejected(self, async_client: AsyncClient):
        """T2 regex-bug guard: a trailing newline in email_claim is rejected with 422."""
        admin_token = await _setup_and_login(async_client, "ecv_adm3", "Ecv123!")
        resp = await self._post_with_claim(
            async_client, admin_token, "ECVTest3", "https://ecv3.test", "ecv3", "email\n"
        )
        assert resp.status_code == 422

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_claim_name_65_chars_rejected(self, async_client: AsyncClient):
        """An email_claim of 65 characters (over the 64 limit) is rejected with 422."""
        admin_token = await _setup_and_login(async_client, "ecv_adm4", "Ecv123!")
        resp = await self._post_with_claim(
            async_client, admin_token, "ECVTest4", "https://ecv4.test", "ecv4", "a" * 65
        )
        assert resp.status_code == 422

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_valid_claim_name_accepted(self, async_client: AsyncClient):
        """Well-formed claim names such as preferred_username and upn are accepted (201)."""
        admin_token = await _setup_and_login(async_client, "ecv_adm5", "Ecv123!")
        for claim in ("preferred_username", "upn", "email", "emailAddress"):
            # The first eight characters keep name, issuer and client id distinct per claim.
            tag = claim[:8]
            resp = await self._post_with_claim(
                async_client,
                admin_token,
                f"ECVTest-{tag}",
                f"https://ecv-{tag}.test",
                f"ecv-{tag}",
                claim,
            )
            assert resp.status_code == 201, f"claim {claim!r} was rejected: {resp.text}"
- # ===========================================================================
- # TestOIDCEmailResolutionExtra — T1 / T3 / T4 additional coverage
- # ===========================================================================
async def _create_provider_via_api(
    async_client: AsyncClient,
    admin_token: str,
    issuer: str,
    client_id: str,
    *,
    email_claim: str = "email",
    require_email_verified: bool = True,
    suffix: str = "",
) -> int:
    """Create an enabled, user-auto-creating OIDC provider via the API.

    The provider is named ``TestIdP-extra-<suffix>``; a random hex tag is
    used when ``suffix`` is empty. Asserts a 201 response and returns the
    new provider's id.
    """
    name_tag = suffix or secrets.token_hex(4)
    payload = {
        "name": f"TestIdP-extra-{name_tag}",
        "issuer_url": issuer,
        "client_id": client_id,
        "client_secret": "sec",
        "scopes": "openid email profile",
        "is_enabled": True,
        "auto_create_users": True,
        "email_claim": email_claim,
        "require_email_verified": require_email_verified,
    }
    auth_header = {"Authorization": f"Bearer {admin_token}"}
    created = await async_client.post(
        "/api/v1/auth/oidc/providers",
        json=payload,
        headers=auth_header,
    )
    assert created.status_code == 201, created.text
    return created.json()["id"]
class TestOIDCEmailResolutionExtra:
    """Extra callback and config coverage.

    T1: a non-string claim value is dropped without crashing.
    T3: SEC-3 normalisation (strip + lowercase) for Fall A and Fall B.
    T4: enabling auto_link and then switching to a custom email_claim.
    """

    @staticmethod
    async def _load_link(db_session: AsyncSession, provider_id: int, sub: str):
        """Return the UserOIDCLink for (provider_id, sub), or None if absent."""
        from sqlalchemy import select

        from backend.app.models.oidc_provider import UserOIDCLink

        stmt = (
            select(UserOIDCLink)
            .where(UserOIDCLink.provider_id == provider_id)
            .where(UserOIDCLink.provider_user_id == sub)
        )
        found = await db_session.execute(stmt)
        return found.scalar_one_or_none()

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_non_string_claim_value_drops_email(
        self,
        async_client: AsyncClient,
        db_session: AsyncSession,
    ):
        """T1: a list-valued email claim is dropped; the callback must not error out."""
        private_pem, jwks_data = _make_test_rsa_key()
        issuer, client_id = "https://nonstring-test.example", "nonstring-client"
        admin_token = await _setup_and_login(async_client, "nonstr_adm", "Nonstr123!")
        provider_id = await _create_provider_via_api(
            async_client,
            admin_token,
            issuer,
            client_id,
            email_claim="preferred_username",
            require_email_verified=False,
            suffix="nonstr",
        )

        # The IdP sends preferred_username as a list instead of a string.
        odd_claims = {"sub": "nonstr-sub-1", "preferred_username": ["user@example.com"]}
        location = await _run_oidc_callback(
            async_client,
            db_session,
            provider_id=provider_id,
            claims=odd_claims,
            private_pem=private_pem,
            jwks_data=jwks_data,
            issuer=issuer,
            client_id=client_id,
        )
        assert "internal_error" not in location, f"Unexpected error redirect: {location}"

        link = await self._load_link(db_session, provider_id, "nonstr-sub-1")
        assert link is not None
        assert link.provider_email is None  # list value dropped

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_fall_a_sec3_normalisation(
        self,
        async_client: AsyncClient,
        db_session: AsyncSession,
    ):
        """T3, Fall A: padded, upper-case email is stored stripped and lower-cased."""
        private_pem, jwks_data = _make_test_rsa_key()
        issuer, client_id = "https://sec3a-test.example", "sec3a-client"
        admin_token = await _setup_and_login(async_client, "sec3a_adm", "Sec3a123!")
        provider_id = await _create_provider_via_api(
            async_client,
            admin_token,
            issuer,
            client_id,
            email_claim="email",
            require_email_verified=True,
            suffix="sec3a",
        )

        raw_claims = {"sub": "sec3a-sub-1", "email": " USER@EXAMPLE.COM ", "email_verified": True}
        location = await _run_oidc_callback(
            async_client,
            db_session,
            provider_id=provider_id,
            claims=raw_claims,
            private_pem=private_pem,
            jwks_data=jwks_data,
            issuer=issuer,
            client_id=client_id,
        )
        assert "internal_error" not in location

        link = await self._load_link(db_session, provider_id, "sec3a-sub-1")
        assert link is not None
        assert link.provider_email == "user@example.com"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_fall_b_sec3_normalisation(
        self,
        async_client: AsyncClient,
        db_session: AsyncSession,
    ):
        """T3, Fall B: padded, upper-case email is stored stripped and lower-cased."""
        private_pem, jwks_data = _make_test_rsa_key()
        issuer, client_id = "https://sec3b-test.example", "sec3b-client"
        admin_token = await _setup_and_login(async_client, "sec3b_adm", "Sec3b123!")
        provider_id = await _create_provider_via_api(
            async_client,
            admin_token,
            issuer,
            client_id,
            email_claim="email",
            require_email_verified=False,
            suffix="sec3b",
        )

        raw_claims = {"sub": "sec3b-sub-1", "email": " USER@EXAMPLE.COM "}
        location = await _run_oidc_callback(
            async_client,
            db_session,
            provider_id=provider_id,
            claims=raw_claims,
            private_pem=private_pem,
            jwks_data=jwks_data,
            issuer=issuer,
            client_id=client_id,
        )
        assert "internal_error" not in location

        link = await self._load_link(db_session, provider_id, "sec3b-sub-1")
        assert link is not None
        assert link.provider_email == "user@example.com"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_combined_state_guard_email_claim_inverse_order(self, async_client: AsyncClient):
        """Fall C: auto_link=True first, then a custom email_claim → both updates return 200.

        Rationale recorded with the original test: custom claims never go
        through an email_verified check, so moving from Fall A to Fall C
        while auto_link is on is treated as safe.
        """
        admin_token = await _setup_and_login(async_client, "inv_ec_adm", "InvEc123!")
        auth_header = {"Authorization": f"Bearer {admin_token}"}
        create_resp = await async_client.post(
            "/api/v1/auth/oidc/providers",
            json={
                "name": "InvEcTest",
                "issuer_url": "https://inv-ec.test",
                "client_id": "inv-ec",
                "client_secret": "sec",
                "scopes": "openid email",
            },
            headers=auth_header,
        )
        assert create_resp.status_code == 201
        provider_url = f"/api/v1/auth/oidc/providers/{create_resp.json()['id']}"

        # First: enable auto_link (Fall A — email_claim='email', require_ev=True)
        enable_link = await async_client.put(
            provider_url,
            json={"auto_link_existing_accounts": True},
            headers=auth_header,
        )
        assert enable_link.status_code == 200

        # Second: switch to a custom claim → Fall C, still accepted
        switch_claim = await async_client.put(
            provider_url,
            json={"email_claim": "preferred_username"},
            headers=auth_header,
        )
        assert switch_claim.status_code == 200
        body = switch_claim.json()
        assert body["auto_link_existing_accounts"] is True
        assert body["email_claim"] == "preferred_username"
- # ===========================================================================
- # E2E: Fall C (custom email claim) auto-link actually links existing user
- # ===========================================================================
class TestOIDCFallCAutoLinkE2E:
    """OIDC callback with email_claim='preferred_username' (Fall C / Azure Entra ID)
    must auto-link an existing local user when auto_link_existing_accounts=True.

    This test exercises _resolve_provider_email Fall C and the auto-link path in
    oidc_callback — a regression in either would silently drop the link without
    being caught by the configuration-layer tests.
    """

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_fall_c_auto_link_links_existing_user_via_callback(
        self, async_client: AsyncClient, db_session: AsyncSession
    ):
        """Run callback → token exchange → DB check for a pre-existing local user.

        HTTP calls and JWT decoding are patched, so the claims seen by the
        callback are exactly ``fake_claims`` below.
        """
        from unittest.mock import AsyncMock, MagicMock, patch

        from sqlalchemy import select as sa_select

        from backend.app.core.auth import get_password_hash
        from backend.app.models.oidc_provider import OIDCProvider, UserOIDCLink

        issuer = "https://entra.fallc.example.com"
        nonce = secrets.token_urlsafe(32)
        code_verifier = secrets.token_urlsafe(48)

        # ── 1. Local user that should be linked ──────────────────────────────
        alice = User(
            username="fallc_alice",
            email="alice.fallc@example.com",
            password_hash=get_password_hash(secrets.token_urlsafe(16)),
            role="user",
            is_active=True,
        )
        db_session.add(alice)
        await db_session.flush()
        # Capture the primary key while it is loaded: if the session expires
        # attributes on commit, reading alice.id later would need an implicit
        # refresh, which an AsyncSession cannot perform on attribute access.
        alice_id = alice.id

        # ── 2. Provider: Fall C config (preferred_username, no email_verified) ─
        provider = OIDCProvider(
            name="AzureEntraFallC",
            issuer_url=issuer,
            client_id="azure-client",
            _client_secret_enc="azure-secret",
            scopes="openid profile",
            is_enabled=True,
            auto_link_existing_accounts=True,
            auto_create_users=False,
            email_claim="preferred_username",
            require_email_verified=False,
        )
        db_session.add(provider)
        await db_session.flush()
        provider_pk = provider.id  # same reason as alice_id above

        # ── 3. OIDC state token ───────────────────────────────────────────────
        state = secrets.token_urlsafe(32)
        db_session.add(
            AuthEphemeralToken(
                token=state,
                token_type="oidc_state",
                provider_id=provider_pk,
                nonce=nonce,
                code_verifier=code_verifier,
                expires_at=datetime.now(timezone.utc) + timedelta(minutes=10),
            )
        )
        await db_session.commit()

        # ── 4. Mock HTTP + JWT ────────────────────────────────────────────────
        fake_discovery = {
            "issuer": issuer,
            "token_endpoint": f"{issuer}/token",
            "jwks_uri": f"{issuer}/.well-known/jwks.json",
        }
        fake_token = {"access_token": "acc_tok", "id_token": "fake.id.token"}
        # Fall C: preferred_username carries the email; no email_verified key at all
        fake_claims = {
            "sub": "azure-sub-alice",
            "preferred_username": "alice.fallc@example.com",
            "iss": issuer,
            "aud": "azure-client",
            "nonce": nonce,
            "exp": 9_999_999_999,
        }

        disc_resp = AsyncMock()
        disc_resp.raise_for_status = MagicMock()
        disc_resp.json = MagicMock(return_value=fake_discovery)

        token_resp = AsyncMock()
        token_resp.json = MagicMock(return_value=fake_token)

        jwks_resp = AsyncMock()
        jwks_resp.raise_for_status = MagicMock()
        jwks_resp.json = MagicMock(return_value={})

        mock_http = AsyncMock()
        # GET calls are answered in order: discovery document first, then JWKS.
        mock_http.get = AsyncMock(side_effect=[disc_resp, jwks_resp])
        mock_http.post = AsyncMock(return_value=token_resp)

        mock_signing_key = MagicMock()
        mock_signing_key.key = "fake_key"

        with (
            patch("backend.app.api.routes.mfa.httpx.AsyncClient") as mock_httpx_cls,
            patch("backend.app.api.routes.mfa.jwt.decode", return_value=fake_claims),
            patch("backend.app.api.routes.mfa.PyJWKClient") as mock_jwks_cls,
        ):
            mock_httpx_cls.return_value.__aenter__ = AsyncMock(return_value=mock_http)
            mock_httpx_cls.return_value.__aexit__ = AsyncMock(return_value=False)
            mock_jwks_cls.return_value.get_signing_key_from_jwt.return_value = mock_signing_key

            callback_resp = await async_client.get(
                f"/api/v1/auth/oidc/callback?code=fake_code&state={state}",
                follow_redirects=False,
            )

        assert callback_resp.status_code == 302, callback_resp.text
        location = callback_resp.headers.get("location", "")
        assert "oidc_token=" in location, f"Expected oidc_token in redirect, got: {location}"

        # ── 5. Exchange token → full JWT ──────────────────────────────────────
        # The token value runs from "oidc_token=" up to the next "&" or "#",
        # whether the parameter sits in the query string or in the fragment.
        token_tail = location.split("oidc_token=", 1)[1]
        oidc_exchange_token = token_tail.split("&", 1)[0].split("#", 1)[0]
        exchange_resp = await async_client.post(
            "/api/v1/auth/oidc/exchange",
            json={"oidc_token": oidc_exchange_token},
        )
        assert exchange_resp.status_code == 200
        assert exchange_resp.json()["user"]["username"] == "fallc_alice"

        # ── 6. Verify UserOIDCLink was created in DB ──────────────────────────
        async with db_session as s:
            result = await s.execute(
                sa_select(UserOIDCLink).where(
                    UserOIDCLink.user_id == alice_id,
                    UserOIDCLink.provider_id == provider_pk,
                )
            )
            link = result.scalar_one_or_none()

        assert link is not None, "UserOIDCLink must have been created by auto-link"
        assert link.provider_user_id == "azure-sub-alice"
|