| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402240324042405240624072408240924102411241224132414241524162417241824192420242124222423242424252426242724282429243024312432243324342435243624372438243924402441244224432444244524462447244824492450245124522453245424552456245724582459246024612462246324642465246624672468246924702471247224732474247524762477247824792480248124822483248424852486248724882489249024912492249324942495249624972498249925002501250225032504250525062507250825092510251125122513251425152516251725182519252025212522252325242525252625272528252925302531253225332534253525362537253825392540254125422543254425452546254725482549255025512552255325542555255625572558255925602561256225632564256525662567256825692570257125722573257425752576257725782579258025812582258325842585258625872588258925902591259225932594259525962597259825992600260126022603260426052606260726082609261026112612261326142615261626172618261926202621262226232624262526262627262826292630263126322633263426352636263726382639264026412642264326442645264626472648264926502651265226532654265526562657265826592660266126622663266426652666266726682669267026712672267326742675267626772678267926802681268226832684268526862687268826892690269126922693269426952696269726982699270027012702270327042705270627072708270927102711271227132714271527162717271827192720272127222723272427252726272727282729273027312732273327342735273627372738273927402741274227432744274527462747274827492750275127522753275427552756275727582759276027612762276327642765276627672768276927702771277227732774277527762777277827792780278127822783278427852786278727882789279027912792279327942795279627972798279928002801280228032804280528062807280828092810281128122813281428152816281728182819282028212822282328242825282628272828282928302831283228332834283528362837283828392840284128422843284428452846284728482849285028512852285328542855285628572858285928602861286228632864286528662867286828692870287128722873287428752876287728782879288028812882288328842885288628872888288928902891289228932894289528962897289828992900290129022903290429052906290729082909291029112912291329142915291629172918291929202921292229232924292529262927292829292930293129322933293429352936293729382939294029412942294329442945294629472948294929502951295229532954295529562957295829592960296129622963296429652966296729682969297029712972297329742975297629772978297929802981298229832984298529862987298829892990299129922993299429952996299729982999300030013002300330043005300630073008300930103011301230133014301530163017301830193020302130223023302430253026302730283029303030313032303330343035303630373038303930403041304230433044304530463047304830493050305130523053305430553056305730583059306030613062306330643065306630673068306930703071307230733074307530763077307830793080308130823083308430853086308730883089309030913092309330943095309630973098309931003101310231033104310531063107310831093110311131123113311431153116311731183119312031213122312331243125312631273128312931303131313231333134313531363137313831393140314131423143314431453146314731483149315031513152315331543155315631573158315931603161316231633164316531663167316831693170317131723173317431753176317731783179318031813182318331843185318631873188318931903191319231933194319531963197319831993200320132023203320432053206320732083209321032113212321332143215321632173218321932203221322232233224322532263227322832293230323132323233323432353236323732383239324032413242324332443245324632473248324932503251325232533254325532563257325832593260326132623263326432653266326732683269327032713272327332743275327632773278327932803281328232833284328532863287328832893290329132923293329432953296329732983299330033013302330333043305330633073308330933103311331233133314331533163317331833193320332133223323332433253326332733283329333033313332333333343335333633373338333933403341334233433344334533463347334833493350335133523353335433553356335733583359336033613362336333643365336633673368336933703371337233733374337533763377337833793380338133823383338433853386338733883389339033913392339333943395339633973398339934003401340234033404340534063407340834093410341134123413341434153416341734183419342034213422342334243425342634273428342934303431343234333434343534363437343834393440344134423443344434453446344734483449345034513452345334543455345634573458345934603461346234633464346534663467346834693470347134723473347434753476347734783479348034813482348334843485348634873488348934903491349234933494349534963497349834993500350135023503350435053506350735083509351035113512351335143515351635173518351935203521352235233524352535263527352835293530353135323533353435353536353735383539354035413542354335443545354635473548354935503551355235533554355535563557355835593560356135623563356435653566356735683569357035713572357335743575357635773578357935803581358235833584358535863587358835893590359135923593359435953596359735983599360036013602360336043605360636073608360936103611361236133614361536163617361836193620362136223623362436253626362736283629363036313632363336343635363636373638363936403641364236433644364536463647364836493650365136523653365436553656365736583659366036613662366336643665366636673668366936703671367236733674367536763677367836793680368136823683368436853686368736883689369036913692369336943695369636973698369937003701370237033704370537063707370837093710371137123713371437153716371737183719372037213722372337243725372637273728372937303731373237333734373537363737373837393740374137423743374437453746374737483749375037513752375337543755375637573758375937603761376237633764376537663767376837693770377137723773377437753776377737783779378037813782378337843785378637873788378937903791379237933794379537963797379837993800380138023803380438053806380738083809381038113812381338143815381638173818381938203821382238233824382538263827382838293830383138323833383438353836383738383839384038413842384338443845384638473848384938503851385238533854385538563857385838593860386138623863386438653866386738683869387038713872387338743875387638773878387938803881388238833884388538863887388838893890389138923893389438953896389738983899390039013902390339043905390639073908390939103911391239133914391539163917391839193920392139223923392439253926392739283929393039313932393339343935393639373938393939403941394239433944394539463947394839493950395139523953395439553956395739583959396039613962396339643965396639673968396939703971397239733974397539763977397839793980398139823983398439853986398739883989399039913992399339943995399639973998399940004001400240034004400540064007400840094010401140124013401440154016401740184019402040214022402340244025402640274028402940304031403240334034403540364037403840394040404140424043404440454046404740484049405040514052405340544055405640574058405940604061406240634064406540664067406840694070407140724073407440754076407740784079408040814082408340844085408640874088408940904091409240934094409540964097409840994100410141024103410441054106410741084109411041114112411341144115411641174118411941204121412241234124412541264127412841294130413141324133413441354136413741384139414041414142414341444145414641474148414941504151415241534154415541564157415841594160416141624163416441654166416741684169417041714172417341744175417641774178417941804181418241834184418541864187418841894190419141924193419441954196419741984199420042014202420342044205420642074208420942104211421242134214421542164217421842194220422142224223422442254226422742284229423042314232423342344235423642374238423942404241424242434244424542464247424842494250425142524253425442554256425742584259426042614262426342644265426642674268426942704271427242734274427542764277427842794280428142824283428442854286428742884289429042914292429342944295429642974298429943004301430243034304430543064307430843094310431143124313431443154316431743184319432043214322432343244325432643274328432943304331433243334334433543364337433843394340434143424343434443454346434743484349435043514352435343544355435643574358435943604361436243634364436543664367436843694370437143724373437443754376437743784379438043814382438343844385438643874388438943904391439243934394439543964397439843994400440144024403440444054406440744084409441044114412441344144415441644174418441944204421442244234424442544264427442844294430443144324433443444354436443744384439444044414442444344444445444644474448444944504451445244534454445544564457445844594460446144624463446444654466446744684469447044714472447344744475447644774478447944804481448244834484448544864487448844894490449144924493449444954496449744984499450045014502450345044505450645074508450945104511451245134514451545164517451845194520452145224523452445254526452745284529453045314532453345344535453645374538453945404541454245434544454545464547454845494550455145524553455445554556455745584559456045614562456345644565456645674568456945704571457245734574457545764577457845794580458145824583458445854586458745884589459045914592459345944595459645974598459946004601460246034604460546064607460846094610461146124613461446154616461746184619462046214622462346244625462646274628462946304631463246334634463546364637463846394640464146424643464446454646464746484649465046514652465346544655465646574658465946604661466246634664466546664667466846694670467146724673467446754676467746784679468046814682468346844685468646874688468946904691469246934694469546964697469846994700470147024703470447054706470747084709471047114712471347144715471647174718471947204721472247234724472547264727472847294730473147324733473447354736473747384739474047414742474347444745474647474748474947504751475247534754475547564757475847594760476147624763476447654766476747684769477047714772477347744775477647774778477947804781478247834784478547864787478847894790479147924793479447954796479747984799480048014802480348044805480648074808480948104811481248134814481548164817481848194820482148224823482448254826482748284829483048314832483348344835483648374838483948404841484248434844484548464847484848494850485148524853485448554856485748584859486048614862486348644865486648674868486948704871487248734874487548764877487848794880488148824883488448854886488748884889489048914892489348944895489648974898489949004901490249034904490549064907490849094910491149124913491449154916491749184919492049214922492349244925492649274928492949304931493249334934493549364937493849394940494149424943494449454946494749484949495049514952495349544955495649574958495949604961496249634964496549664967496849694970497149724973497449754976497749784979498049814982498349844985498649874988498949904991499249934994499549964997499849995000500150025003500450055006500750085009501050115012501350145015501650175018501950205021502250235024502550265027502850295030503150325033503450355036503750385039504050415042504350445045504650475048504950505051505250535054505550565057505850595060506150625063506450655066506750685069507050715072507350745075507650775078507950805081508250835084508550865087508850895090509150925093509450955096509750985099510051015102510351045105 |
- """Integration tests for 2FA and OIDC API endpoints.
- Tests the full request/response cycle for:
- - GET /api/v1/auth/2fa/status
- - POST /api/v1/auth/2fa/totp/setup
- - POST /api/v1/auth/2fa/totp/enable
- - POST /api/v1/auth/2fa/totp/disable
- - POST /api/v1/auth/2fa/email/enable
- - POST /api/v1/auth/2fa/email/disable
- - POST /api/v1/auth/2fa/verify (TOTP, email, backup paths)
- - DELETE /api/v1/auth/2fa/admin/{user_id}
- - GET /api/v1/auth/oidc/providers
- - POST /api/v1/auth/oidc/providers
- - PATCH /api/v1/auth/oidc/providers/{id}
- - DELETE /api/v1/auth/oidc/providers/{id}
- """
- from __future__ import annotations
- import secrets
- import time
- from datetime import datetime, timedelta, timezone
- from unittest.mock import patch
- import jwt as pyjwt
- import pyotp
- import pytest
- from httpx import AsyncClient
- from passlib.context import CryptContext
- from sqlalchemy.ext.asyncio import AsyncSession
- from backend.app.models.auth_ephemeral import AuthEphemeralToken
- from backend.app.models.user import User
- _pwd_context = CryptContext(schemes=["pbkdf2_sha256"], deprecated="auto")
- # ---------------------------------------------------------------------------
- # Fixtures / helpers
- # ---------------------------------------------------------------------------
- AUTH_SETUP_URL = "/api/v1/auth/setup"
- LOGIN_URL = "/api/v1/auth/login"
- def _norm_pw(password: str) -> str:
- """Ensure password meets complexity requirements (I4: SetupRequest now validates)."""
- if not any(c.isupper() for c in password):
- password = password[0].upper() + password[1:]
- if not any(c not in "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789" for c in password):
- password = password + "!"
- return password
- async def _setup_and_login(client: AsyncClient, username: str, password: str) -> str:
- """Enable auth, create an admin user, login, and return the bearer token."""
- password = _norm_pw(password)
- await client.post(
- AUTH_SETUP_URL,
- json={
- "auth_enabled": True,
- "admin_username": username,
- "admin_password": password,
- },
- )
- resp = await client.post(LOGIN_URL, json={"username": username, "password": password})
- assert resp.status_code == 200
- return resp.json()["access_token"]
- async def _login_get_pre_auth_token(client: AsyncClient, username: str, password: str) -> str:
- """Login a user who has 2FA enabled; return the pre_auth_token from the response."""
- password = _norm_pw(password)
- resp = await client.post(LOGIN_URL, json={"username": username, "password": password})
- assert resp.status_code == 200
- data = resp.json()
- assert data["requires_2fa"] is True, f"Expected requires_2fa=True, got {data}"
- assert data["pre_auth_token"] is not None
- return data["pre_auth_token"]
- def _auth_header(token: str) -> dict[str, str]:
- return {"Authorization": f"Bearer {token}"}
- # ===========================================================================
- # 2FA Status
- # ===========================================================================
- class TestTwoFAStatus:
- """Tests for GET /api/v1/auth/2fa/status."""
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_status_requires_auth(self, async_client: AsyncClient):
- response = await async_client.get("/api/v1/auth/2fa/status")
- assert response.status_code == 401
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_status_default_disabled(self, async_client: AsyncClient):
- token = await _setup_and_login(async_client, "statususer", "statuspass123")
- response = await async_client.get("/api/v1/auth/2fa/status", headers=_auth_header(token))
- assert response.status_code == 200
- data = response.json()
- assert data["totp_enabled"] is False
- assert data["email_otp_enabled"] is False
- assert data["backup_codes_remaining"] == 0
- # ===========================================================================
- # TOTP Setup
- # ===========================================================================
- class TestTOTPSetup:
- """Tests for POST /api/v1/auth/2fa/totp/setup."""
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_setup_requires_auth(self, async_client: AsyncClient):
- response = await async_client.post("/api/v1/auth/2fa/totp/setup")
- assert response.status_code == 401
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_setup_returns_secret_and_qr(self, async_client: AsyncClient):
- token = await _setup_and_login(async_client, "totpsetup", "totpsetup123")
- response = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
- assert response.status_code == 200
- data = response.json()
- assert "secret" in data
- assert len(data["secret"]) > 0
- assert "qr_code_b64" in data
- assert data["issuer"] == "Bambuddy"
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_setup_secret_is_valid_base32(self, async_client: AsyncClient):
- token = await _setup_and_login(async_client, "totpbase32", "totpbase32pw")
- response = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
- assert response.status_code == 200
- secret = response.json()["secret"]
- # pyotp will raise on invalid base32
- totp = pyotp.TOTP(secret)
- assert len(totp.now()) == 6
- # ===========================================================================
- # TOTP Enable
- # ===========================================================================
- class TestTOTPEnable:
- """Tests for POST /api/v1/auth/2fa/totp/enable."""
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_enable_without_setup_returns_400(self, async_client: AsyncClient):
- token = await _setup_and_login(async_client, "nosetupenable", "nosetupenable1")
- response = await async_client.post(
- "/api/v1/auth/2fa/totp/enable",
- json={"code": "123456"},
- headers=_auth_header(token),
- )
- assert response.status_code == 400
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_enable_with_invalid_code_returns_400(self, async_client: AsyncClient):
- token = await _setup_and_login(async_client, "badcodeuser", "badcodeuser1")
- await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
- response = await async_client.post(
- "/api/v1/auth/2fa/totp/enable",
- json={"code": "000000"},
- headers=_auth_header(token),
- )
- assert response.status_code == 400
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_enable_with_valid_code_returns_backup_codes(self, async_client: AsyncClient):
- token = await _setup_and_login(async_client, "enableok", "enableok123")
- setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
- secret = setup_resp.json()["secret"]
- valid_code = pyotp.TOTP(secret).now()
- response = await async_client.post(
- "/api/v1/auth/2fa/totp/enable",
- json={"code": valid_code},
- headers=_auth_header(token),
- )
- assert response.status_code == 200
- data = response.json()
- assert "backup_codes" in data
- assert len(data["backup_codes"]) == 10
- for code in data["backup_codes"]:
- assert len(code) == 8
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_status_reflects_enabled_totp(self, async_client: AsyncClient):
- token = await _setup_and_login(async_client, "statustotp", "statustotp1")
- setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
- secret = setup_resp.json()["secret"]
- valid_code = pyotp.TOTP(secret).now()
- await async_client.post(
- "/api/v1/auth/2fa/totp/enable",
- json={"code": valid_code},
- headers=_auth_header(token),
- )
- status_resp = await async_client.get("/api/v1/auth/2fa/status", headers=_auth_header(token))
- data = status_resp.json()
- assert data["totp_enabled"] is True
- assert data["backup_codes_remaining"] == 10
- # ===========================================================================
- # TOTP Disable
- # ===========================================================================
- class TestTOTPDisable:
- """Tests for POST /api/v1/auth/2fa/totp/disable."""
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_disable_when_not_enabled_returns_400(self, async_client: AsyncClient):
- token = await _setup_and_login(async_client, "disablenoenab", "disablenoenab1")
- response = await async_client.post(
- "/api/v1/auth/2fa/totp/disable",
- json={"code": "123456"},
- headers=_auth_header(token),
- )
- assert response.status_code == 400
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_disable_with_valid_code(self, async_client: AsyncClient):
- token = await _setup_and_login(async_client, "disableok", "disableok123")
- setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
- secret = setup_resp.json()["secret"]
- valid_code = pyotp.TOTP(secret).now()
- await async_client.post(
- "/api/v1/auth/2fa/totp/enable",
- json={"code": valid_code},
- headers=_auth_header(token),
- )
- # Disable with a fresh valid code
- disable_code = pyotp.TOTP(secret).now()
- response = await async_client.post(
- "/api/v1/auth/2fa/totp/disable",
- json={"code": disable_code},
- headers=_auth_header(token),
- )
- assert response.status_code == 200
- assert "disabled" in response.json()["message"].lower()
- # Status should now show disabled
- status_resp = await async_client.get("/api/v1/auth/2fa/status", headers=_auth_header(token))
- assert status_resp.json()["totp_enabled"] is False
- # ===========================================================================
- # Email OTP Enable/Disable
- # ===========================================================================
- class TestEmailOTP:
- """Tests for POST /api/v1/auth/2fa/email/enable, /enable/confirm and /disable."""
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_enable_email_otp_without_email_returns_400(self, async_client: AsyncClient):
- """Users without an email address cannot enable email OTP."""
- token = await _setup_and_login(async_client, "noemailuser", "noemailuser1")
- response = await async_client.post("/api/v1/auth/2fa/email/enable", headers=_auth_header(token))
- assert response.status_code == 400
- assert "email" in response.json()["detail"].lower()
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_confirm_enable_email_otp_happy_path(self, async_client: AsyncClient, db_session: AsyncSession):
- """Confirm step activates email OTP when setup_token + code are valid (C5)."""
- token = await _setup_and_login(async_client, "confirmenable", "confirmenable1")
- # Give user an email address directly (SMTP not available in tests)
- from sqlalchemy import select as sa_select
- result = await db_session.execute(sa_select(User).where(User.username == "confirmenable"))
- user = result.scalar_one()
- user.email = "confirmenable@example.com"
- await db_session.commit()
- # Inject a known setup token directly into the DB (bypasses SMTP)
- code = "123456"
- code_hash = _pwd_context.hash(code)
- setup_token = secrets.token_urlsafe(32)
- db_session.add(
- AuthEphemeralToken(
- token=setup_token,
- token_type="email_otp_setup",
- username="confirmenable",
- nonce=code_hash,
- expires_at=datetime.now(timezone.utc) + timedelta(minutes=10),
- )
- )
- await db_session.commit()
- resp = await async_client.post(
- "/api/v1/auth/2fa/email/enable/confirm",
- json={"setup_token": setup_token, "code": code},
- headers=_auth_header(token),
- )
- assert resp.status_code == 200
- status_resp = await async_client.get("/api/v1/auth/2fa/status", headers=_auth_header(token))
- assert status_resp.json()["email_otp_enabled"] is True
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_confirm_enable_email_otp_wrong_code(self, async_client: AsyncClient, db_session: AsyncSession):
- """Wrong code on confirm step returns 400 and does not enable email OTP."""
- token = await _setup_and_login(async_client, "confirmwrong", "confirmwrong1")
- code_hash = _pwd_context.hash("654321")
- setup_token = secrets.token_urlsafe(32)
- db_session.add(
- AuthEphemeralToken(
- token=setup_token,
- token_type="email_otp_setup",
- username="confirmwrong",
- nonce=code_hash,
- expires_at=datetime.now(timezone.utc) + timedelta(minutes=10),
- )
- )
- await db_session.commit()
- resp = await async_client.post(
- "/api/v1/auth/2fa/email/enable/confirm",
- json={"setup_token": setup_token, "code": "000000"},
- headers=_auth_header(token),
- )
- assert resp.status_code == 400
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_confirm_enable_email_otp_setup_token_is_single_use(
- self, async_client: AsyncClient, db_session: AsyncSession
- ):
- """Setup token is consumed on first use; replay returns 400."""
- token = await _setup_and_login(async_client, "confirmonce", "confirmonce1")
- code = "111111"
- code_hash = _pwd_context.hash(code)
- setup_token = secrets.token_urlsafe(32)
- db_session.add(
- AuthEphemeralToken(
- token=setup_token,
- token_type="email_otp_setup",
- username="confirmonce",
- nonce=code_hash,
- expires_at=datetime.now(timezone.utc) + timedelta(minutes=10),
- )
- )
- await db_session.commit()
- first = await async_client.post(
- "/api/v1/auth/2fa/email/enable/confirm",
- json={"setup_token": setup_token, "code": code},
- headers=_auth_header(token),
- )
- assert first.status_code == 200
- second = await async_client.post(
- "/api/v1/auth/2fa/email/enable/confirm",
- json={"setup_token": setup_token, "code": code},
- headers=_auth_header(token),
- )
- assert second.status_code == 400
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_disable_email_otp_requires_password(self, async_client: AsyncClient):
- """Disabling email OTP requires the account password (C6: re-auth)."""
- token = await _setup_and_login(async_client, "disemailotp", "disemailotp1")
- # Wrong password → 401
- response = await async_client.post(
- "/api/v1/auth/2fa/email/disable",
- json={"password": "wrongpassword"},
- headers=_auth_header(token),
- )
- assert response.status_code == 401
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_disable_email_otp_when_enabled(self, async_client: AsyncClient, db_session: AsyncSession):
- """Disabling email OTP when enabled turns it off and status reflects that."""
- token = await _setup_and_login(async_client, "disemailpw", "disemailpw1")
- # Enable email OTP via direct DB injection (no SMTP)
- code = "222222"
- setup_token = secrets.token_urlsafe(32)
- db_session.add(
- AuthEphemeralToken(
- token=setup_token,
- token_type="email_otp_setup",
- username="disemailpw",
- nonce=_pwd_context.hash(code),
- expires_at=datetime.now(timezone.utc) + timedelta(minutes=10),
- )
- )
- await db_session.commit()
- await async_client.post(
- "/api/v1/auth/2fa/email/enable/confirm",
- json={"setup_token": setup_token, "code": code},
- headers=_auth_header(token),
- )
- # Now disable
- response = await async_client.post(
- "/api/v1/auth/2fa/email/disable",
- json={"password": _norm_pw("disemailpw1")},
- headers=_auth_header(token),
- )
- assert response.status_code == 200
- status_resp = await async_client.get("/api/v1/auth/2fa/status", headers=_auth_header(token))
- assert status_resp.json()["email_otp_enabled"] is False
- # ===========================================================================
- # 2FA Verify — TOTP path
- # ===========================================================================
- class TestTwoFAVerifyTOTP:
- """Tests for POST /api/v1/auth/2fa/verify using the TOTP method."""
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_verify_with_invalid_pre_auth_token(self, async_client: AsyncClient):
- response = await async_client.post(
- "/api/v1/auth/2fa/verify",
- json={"pre_auth_token": "bogus", "method": "totp", "code": "123456"},
- )
- assert response.status_code == 401
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_verify_totp_issues_jwt(self, async_client: AsyncClient):
- """Full flow: setup → enable TOTP → login → pre_auth_token → verify → JWT."""
- token = await _setup_and_login(async_client, "verifytotpok", "verifytotpok1")
- setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
- secret = setup_resp.json()["secret"]
- valid_code = pyotp.TOTP(secret).now()
- await async_client.post(
- "/api/v1/auth/2fa/totp/enable",
- json={"code": valid_code},
- headers=_auth_header(token),
- )
- # Login now returns requires_2fa=True + pre_auth_token
- pre_auth_token = await _login_get_pre_auth_token(async_client, "verifytotpok", "verifytotpok1")
- verify_resp = await async_client.post(
- "/api/v1/auth/2fa/verify",
- json={
- "pre_auth_token": pre_auth_token,
- "method": "totp",
- "code": pyotp.TOTP(secret).now(),
- },
- )
- assert verify_resp.status_code == 200
- data = verify_resp.json()
- assert "access_token" in data
- assert data["token_type"] == "bearer"
- assert data["user"]["username"] == "verifytotpok"
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_verify_totp_invalid_code(self, async_client: AsyncClient):
- token = await _setup_and_login(async_client, "verifybadcode", "verifybadcode1")
- setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
- secret = setup_resp.json()["secret"]
- valid_code = pyotp.TOTP(secret).now()
- await async_client.post(
- "/api/v1/auth/2fa/totp/enable",
- json={"code": valid_code},
- headers=_auth_header(token),
- )
- pre_auth_token = await _login_get_pre_auth_token(async_client, "verifybadcode", "verifybadcode1")
- verify_resp = await async_client.post(
- "/api/v1/auth/2fa/verify",
- json={"pre_auth_token": pre_auth_token, "method": "totp", "code": "000000"},
- )
- assert verify_resp.status_code == 401
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_verify_invalid_method(self, async_client: AsyncClient):
- """An invalid 2FA method should return 400 even with a valid pre_auth_token."""
- token = await _setup_and_login(async_client, "invalidmethod", "invalidmethod1")
- setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
- secret = setup_resp.json()["secret"]
- valid_code = pyotp.TOTP(secret).now()
- await async_client.post(
- "/api/v1/auth/2fa/totp/enable",
- json={"code": valid_code},
- headers=_auth_header(token),
- )
- pre_auth_token = await _login_get_pre_auth_token(async_client, "invalidmethod", "invalidmethod1")
- response = await async_client.post(
- "/api/v1/auth/2fa/verify",
- json={"pre_auth_token": pre_auth_token, "method": "sms", "code": "123456"},
- )
- assert response.status_code == 422 # Pydantic Literal validation
- # ===========================================================================
- # 2FA Verify — Backup code path
- # ===========================================================================
- class TestTwoFAVerifyBackup:
- """Tests for POST /api/v1/auth/2fa/verify using the backup method."""
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_verify_with_backup_code(self, async_client: AsyncClient):
- token = await _setup_and_login(async_client, "backupcodeok", "backupcodeok1")
- setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
- secret = setup_resp.json()["secret"]
- valid_code = pyotp.TOTP(secret).now()
- enable_resp = await async_client.post(
- "/api/v1/auth/2fa/totp/enable",
- json={"code": valid_code},
- headers=_auth_header(token),
- )
- backup_code = enable_resp.json()["backup_codes"][0]
- pre_auth_token = await _login_get_pre_auth_token(async_client, "backupcodeok", "backupcodeok1")
- verify_resp = await async_client.post(
- "/api/v1/auth/2fa/verify",
- json={"pre_auth_token": pre_auth_token, "method": "backup", "code": backup_code},
- )
- assert verify_resp.status_code == 200
- assert "access_token" in verify_resp.json()
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_backup_code_is_single_use(self, async_client: AsyncClient):
- token = await _setup_and_login(async_client, "backupsingle", "backupsingle1")
- setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
- secret = setup_resp.json()["secret"]
- valid_code = pyotp.TOTP(secret).now()
- enable_resp = await async_client.post(
- "/api/v1/auth/2fa/totp/enable",
- json={"code": valid_code},
- headers=_auth_header(token),
- )
- backup_code = enable_resp.json()["backup_codes"][0]
- # First use — should succeed
- pre_auth_token = await _login_get_pre_auth_token(async_client, "backupsingle", "backupsingle1")
- first_resp = await async_client.post(
- "/api/v1/auth/2fa/verify",
- json={"pre_auth_token": pre_auth_token, "method": "backup", "code": backup_code},
- )
- assert first_resp.status_code == 200
- # Second use of the same code — must fail (need new pre_auth_token + same backup code)
- pre_auth_token2 = await _login_get_pre_auth_token(async_client, "backupsingle", "backupsingle1")
- second_resp = await async_client.post(
- "/api/v1/auth/2fa/verify",
- json={"pre_auth_token": pre_auth_token2, "method": "backup", "code": backup_code},
- )
- assert second_resp.status_code == 401
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_backup_code_count_decrements(self, async_client: AsyncClient):
- token = await _setup_and_login(async_client, "backupcount", "backupcount1")
- setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
- secret = setup_resp.json()["secret"]
- valid_code = pyotp.TOTP(secret).now()
- enable_resp = await async_client.post(
- "/api/v1/auth/2fa/totp/enable",
- json={"code": valid_code},
- headers=_auth_header(token),
- )
- backup_code = enable_resp.json()["backup_codes"][0]
- pre_auth_token = await _login_get_pre_auth_token(async_client, "backupcount", "backupcount1")
- await async_client.post(
- "/api/v1/auth/2fa/verify",
- json={"pre_auth_token": pre_auth_token, "method": "backup", "code": backup_code},
- )
- # Status is readable with the original full token (still valid)
- status_resp = await async_client.get("/api/v1/auth/2fa/status", headers=_auth_header(token))
- assert status_resp.json()["backup_codes_remaining"] == 9
- # ===========================================================================
- # Rate Limiting
- # ===========================================================================
- class TestRateLimiting:
- """Ensure 429 is returned after 5 failed 2FA attempts."""
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_rate_limit_lockout(self, async_client: AsyncClient):
- """After 5 failed TOTP attempts the 6th must return 429."""
- token = await _setup_and_login(async_client, "ratelimituser", "ratelimituser1")
- setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
- secret = setup_resp.json()["secret"]
- valid_code = pyotp.TOTP(secret).now()
- await async_client.post(
- "/api/v1/auth/2fa/totp/enable",
- json={"code": valid_code},
- headers=_auth_header(token),
- )
- # 5 failed attempts via the login → pre_auth_token → verify flow
- for _ in range(5):
- pre_auth_token = await _login_get_pre_auth_token(async_client, "ratelimituser", "ratelimituser1")
- await async_client.post(
- "/api/v1/auth/2fa/verify",
- json={"pre_auth_token": pre_auth_token, "method": "totp", "code": "000000"},
- )
- # 6th attempt should hit the rate limit
- pre_auth_token = await _login_get_pre_auth_token(async_client, "ratelimituser", "ratelimituser1")
- response = await async_client.post(
- "/api/v1/auth/2fa/verify",
- json={"pre_auth_token": pre_auth_token, "method": "totp", "code": "000000"},
- )
- assert response.status_code == 429
- # ===========================================================================
- # Admin 2FA Disable
- # ===========================================================================
- class TestAdminDisable2FA:
- """Tests for DELETE /api/v1/auth/2fa/admin/{user_id}."""
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_admin_disable_requires_admin(self, async_client: AsyncClient):
- """Only admins can use the admin disable endpoint."""
- # The only user in a fresh setup IS admin, so just check the 404 path
- token = await _setup_and_login(async_client, "admincheck", "admincheck123")
- # Try to disable for a non-existent user_id — should get 200 (no-op) or 404
- response = await async_client.request(
- "DELETE",
- "/api/v1/auth/2fa/admin/99999",
- json={"admin_password": _norm_pw("admincheck123")},
- headers=_auth_header(token),
- )
- # Admin users succeed regardless (returns 200 even if user doesn't exist)
- assert response.status_code == 200
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_admin_disable_clears_totp(self, async_client: AsyncClient):
- from sqlalchemy import select
- from backend.app.models.user import User
- token = await _setup_and_login(async_client, "admintotp", "admintotp123")
- setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
- secret = setup_resp.json()["secret"]
- valid_code = pyotp.TOTP(secret).now()
- await async_client.post(
- "/api/v1/auth/2fa/totp/enable",
- json={"code": valid_code},
- headers=_auth_header(token),
- )
- # Find the user's id by querying status (which works with the token)
- me_resp = await async_client.get("/api/v1/auth/me", headers=_auth_header(token))
- user_id = me_resp.json()["id"]
- response = await async_client.request(
- "DELETE",
- f"/api/v1/auth/2fa/admin/{user_id}",
- json={"admin_password": _norm_pw("admintotp123")},
- headers=_auth_header(token),
- )
- assert response.status_code == 200
- # I2: admin_disable_2fa bumps password_changed_at, invalidating the old token.
- # Re-login to get a fresh token before checking status.
- new_login = await async_client.post(
- LOGIN_URL, json={"username": "admintotp", "password": _norm_pw("admintotp123")}
- )
- assert new_login.status_code == 200, f"re-login failed: {new_login.json()}"
- assert new_login.json().get("requires_2fa") is False, f"still requires 2FA: {new_login.json()}"
- new_token = new_login.json()["access_token"]
- assert new_token is not None, f"no access_token in: {new_login.json()}"
- # Status should now show TOTP disabled
- status_resp = await async_client.get("/api/v1/auth/2fa/status", headers=_auth_header(new_token))
- assert status_resp.status_code == 200, f"status check failed: {status_resp.json()}"
- assert status_resp.json()["totp_enabled"] is False
- # ===========================================================================
- # OIDC Provider CRUD
- # ===========================================================================
- class TestOIDCProviders:
- """Tests for OIDC provider management endpoints."""
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_list_public_providers_empty(self, async_client: AsyncClient):
- response = await async_client.get("/api/v1/auth/oidc/providers")
- assert response.status_code == 200
- assert isinstance(response.json(), list)
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_create_provider_requires_admin(self, async_client: AsyncClient):
- token = await _setup_and_login(async_client, "oidcadmincreate", "oidcadmincreate1")
- response = await async_client.post(
- "/api/v1/auth/oidc/providers",
- json={
- "name": "PocketID",
- "issuer_url": "https://auth.example.com",
- "client_id": "bambuddy",
- "client_secret": "supersecret",
- "scopes": "openid email profile",
- "is_enabled": True,
- "auto_create_users": False,
- },
- headers=_auth_header(token),
- )
- assert response.status_code == 201
- data = response.json()
- assert data["name"] == "PocketID"
- assert data["issuer_url"] == "https://auth.example.com"
- assert "client_secret" not in data # Secret must not be returned
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_created_provider_appears_in_all_list(self, async_client: AsyncClient):
- token = await _setup_and_login(async_client, "oidclistall", "oidclistall123")
- await async_client.post(
- "/api/v1/auth/oidc/providers",
- json={
- "name": "TestProvider",
- "issuer_url": "https://test.example.com",
- "client_id": "testclient",
- "client_secret": "testsecret",
- "scopes": "openid",
- "is_enabled": True,
- "auto_create_users": False,
- },
- headers=_auth_header(token),
- )
- response = await async_client.get("/api/v1/auth/oidc/providers/all", headers=_auth_header(token))
- assert response.status_code == 200
- names = [p["name"] for p in response.json()]
- assert "TestProvider" in names
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_disabled_provider_not_in_public_list(self, async_client: AsyncClient):
- token = await _setup_and_login(async_client, "oidcdisabled", "oidcdisabled1")
- await async_client.post(
- "/api/v1/auth/oidc/providers",
- json={
- "name": "DisabledProvider",
- "issuer_url": "https://disabled.example.com",
- "client_id": "dc",
- "client_secret": "ds",
- "scopes": "openid",
- "is_enabled": False,
- "auto_create_users": False,
- },
- headers=_auth_header(token),
- )
- response = await async_client.get("/api/v1/auth/oidc/providers")
- names = [p["name"] for p in response.json()]
- assert "DisabledProvider" not in names
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_update_provider(self, async_client: AsyncClient):
- token = await _setup_and_login(async_client, "oidcupdate", "oidcupdate123")
- create_resp = await async_client.post(
- "/api/v1/auth/oidc/providers",
- json={
- "name": "OldName",
- "issuer_url": "https://update.example.com",
- "client_id": "uc",
- "client_secret": "us",
- "scopes": "openid",
- "is_enabled": True,
- "auto_create_users": False,
- },
- headers=_auth_header(token),
- )
- provider_id = create_resp.json()["id"]
- put_resp = await async_client.put(
- f"/api/v1/auth/oidc/providers/{provider_id}",
- json={"name": "NewName"},
- headers=_auth_header(token),
- )
- assert put_resp.status_code == 200
- assert put_resp.json()["name"] == "NewName"
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_delete_provider(self, async_client: AsyncClient):
- token = await _setup_and_login(async_client, "oidcdelete", "oidcdelete123")
- create_resp = await async_client.post(
- "/api/v1/auth/oidc/providers",
- json={
- "name": "ToDelete",
- "issuer_url": "https://delete.example.com",
- "client_id": "dc",
- "client_secret": "ds",
- "scopes": "openid",
- "is_enabled": True,
- "auto_create_users": False,
- },
- headers=_auth_header(token),
- )
- provider_id = create_resp.json()["id"]
- del_resp = await async_client.delete(
- f"/api/v1/auth/oidc/providers/{provider_id}",
- headers=_auth_header(token),
- )
- assert del_resp.status_code == 200
- # No longer in list
- all_resp = await async_client.get("/api/v1/auth/oidc/providers/all", headers=_auth_header(token))
- ids = [p["id"] for p in all_resp.json()]
- assert provider_id not in ids
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_update_nonexistent_provider_returns_404(self, async_client: AsyncClient):
- token = await _setup_and_login(async_client, "oidc404", "oidc404pass1")
- response = await async_client.put(
- "/api/v1/auth/oidc/providers/99999",
- json={"name": "ghost"},
- headers=_auth_header(token),
- )
- assert response.status_code == 404
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_create_provider_with_default_group_id(self, async_client: AsyncClient, db_session: AsyncSession):
- """Creating a provider with a valid default_group_id stores and returns the value."""
- from sqlalchemy import select
- from backend.app.models.group import Group
- token = await _setup_and_login(async_client, "oidcdg_create", "OidcDgCreate1!")
- grp_result = await db_session.execute(select(Group).where(Group.name == "Operators"))
- operators = grp_result.scalar_one()
- resp = await async_client.post(
- "/api/v1/auth/oidc/providers",
- json={
- "name": "DgCreateProvider",
- "issuer_url": "https://dgcreate.example.com",
- "client_id": "dgcreate-client",
- "client_secret": "secret",
- "scopes": "openid",
- "is_enabled": True,
- "auto_create_users": False,
- "default_group_id": operators.id,
- },
- headers=_auth_header(token),
- )
- assert resp.status_code == 201, resp.text
- assert resp.json()["default_group_id"] == operators.id
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_create_provider_invalid_default_group_id_returns_422(self, async_client: AsyncClient):
- """A default_group_id referencing a non-existent group returns 422."""
- token = await _setup_and_login(async_client, "oidcdg_bad", "OidcDgBad1!")
- resp = await async_client.post(
- "/api/v1/auth/oidc/providers",
- json={
- "name": "DgBadProvider",
- "issuer_url": "https://dgbad.example.com",
- "client_id": "dgbad-client",
- "client_secret": "secret",
- "scopes": "openid",
- "is_enabled": True,
- "auto_create_users": False,
- "default_group_id": 999999,
- },
- headers=_auth_header(token),
- )
- assert resp.status_code == 422
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_create_provider_omit_default_group_id_stores_null(self, async_client: AsyncClient):
- """Omitting default_group_id results in null in the response."""
- token = await _setup_and_login(async_client, "oidcdg_null", "OidcDgNull1!")
- resp = await async_client.post(
- "/api/v1/auth/oidc/providers",
- json={
- "name": "DgNullProvider",
- "issuer_url": "https://dgnull.example.com",
- "client_id": "dgnull-client",
- "client_secret": "secret",
- "scopes": "openid",
- "is_enabled": True,
- "auto_create_users": False,
- },
- headers=_auth_header(token),
- )
- assert resp.status_code == 201, resp.text
- assert resp.json()["default_group_id"] is None
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_update_provider_default_group_id(self, async_client: AsyncClient, db_session: AsyncSession):
- """Updating default_group_id via PUT stores the new value."""
- from sqlalchemy import select
- from backend.app.models.group import Group
- token = await _setup_and_login(async_client, "oidcdg_update", "OidcDgUpdate1!")
- create_resp = await async_client.post(
- "/api/v1/auth/oidc/providers",
- json={
- "name": "DgUpdateProvider",
- "issuer_url": "https://dgupdate.example.com",
- "client_id": "dgupdate-client",
- "client_secret": "secret",
- "scopes": "openid",
- "is_enabled": True,
- "auto_create_users": False,
- },
- headers=_auth_header(token),
- )
- provider_id = create_resp.json()["id"]
- assert create_resp.json()["default_group_id"] is None
- grp_result = await db_session.execute(select(Group).where(Group.name == "Operators"))
- operators = grp_result.scalar_one()
- put_resp = await async_client.put(
- f"/api/v1/auth/oidc/providers/{provider_id}",
- json={"default_group_id": operators.id},
- headers=_auth_header(token),
- )
- assert put_resp.status_code == 200, put_resp.text
- assert put_resp.json()["default_group_id"] == operators.id
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_default_group_id_in_public_and_admin_list(self, async_client: AsyncClient, db_session: AsyncSession):
- """default_group_id appears in both the public and admin list responses."""
- from sqlalchemy import select
- from backend.app.models.group import Group
- token = await _setup_and_login(async_client, "oidcdg_list", "OidcDgList1!")
- grp_result = await db_session.execute(select(Group).where(Group.name == "Operators"))
- operators = grp_result.scalar_one()
- create_resp = await async_client.post(
- "/api/v1/auth/oidc/providers",
- json={
- "name": "DgListProvider",
- "issuer_url": "https://dglist.example.com",
- "client_id": "dglist-client",
- "client_secret": "secret",
- "scopes": "openid",
- "is_enabled": True,
- "auto_create_users": False,
- "default_group_id": operators.id,
- },
- headers=_auth_header(token),
- )
- provider_id = create_resp.json()["id"]
- all_resp = await async_client.get("/api/v1/auth/oidc/providers/all", headers=_auth_header(token))
- match = next((p for p in all_resp.json() if p["id"] == provider_id), None)
- assert match is not None
- assert match["default_group_id"] == operators.id
- pub_resp = await async_client.get("/api/v1/auth/oidc/providers")
- pub_match = next((p for p in pub_resp.json() if p["id"] == provider_id), None)
- assert pub_match is not None
- assert pub_match["default_group_id"] == operators.id
- # ===========================================================================
- # Security: pre-auth token single-use
- # ===========================================================================
- class TestPreAuthTokenSingleUse:
- """pre_auth_token must be consumed on successful 2FA and cannot be reused."""
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_pre_auth_token_is_single_use(self, async_client: AsyncClient):
- """A pre_auth_token that was successfully used cannot be reused."""
- token = await _setup_and_login(async_client, "singleusepat", "singleusepat1")
- setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
- secret = setup_resp.json()["secret"]
- valid_code = pyotp.TOTP(secret).now()
- await async_client.post(
- "/api/v1/auth/2fa/totp/enable",
- json={"code": valid_code},
- headers=_auth_header(token),
- )
- pre_auth_token = await _login_get_pre_auth_token(async_client, "singleusepat", "singleusepat1")
- # First use — succeeds
- first = await async_client.post(
- "/api/v1/auth/2fa/verify",
- json={"pre_auth_token": pre_auth_token, "method": "totp", "code": pyotp.TOTP(secret).now()},
- )
- assert first.status_code == 200
- # Second use of the same token — must fail (token already consumed on success)
- second = await async_client.post(
- "/api/v1/auth/2fa/verify",
- json={"pre_auth_token": pre_auth_token, "method": "totp", "code": pyotp.TOTP(secret).now()},
- )
- assert second.status_code == 401
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_pre_auth_token_survives_wrong_code(self, async_client: AsyncClient):
- """A wrong 2FA code must NOT burn the pre_auth_token (user can retry)."""
- token = await _setup_and_login(async_client, "survivepatuser", "survivepatuser1")
- setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
- secret = setup_resp.json()["secret"]
- valid_code = pyotp.TOTP(secret).now()
- await async_client.post(
- "/api/v1/auth/2fa/totp/enable",
- json={"code": valid_code},
- headers=_auth_header(token),
- )
- pre_auth_token = await _login_get_pre_auth_token(async_client, "survivepatuser", "survivepatuser1")
- # Wrong code — should fail but not burn the token
- bad = await async_client.post(
- "/api/v1/auth/2fa/verify",
- json={"pre_auth_token": pre_auth_token, "method": "totp", "code": "000000"},
- )
- assert bad.status_code == 401
- # Same token, correct code — should succeed (token still valid)
- good = await async_client.post(
- "/api/v1/auth/2fa/verify",
- json={"pre_auth_token": pre_auth_token, "method": "totp", "code": pyotp.TOTP(secret).now()},
- )
- assert good.status_code == 200
- # ===========================================================================
- # Security: cross-user token isolation
- # ===========================================================================
- class TestCrossUserTokenIsolation:
- """A pre_auth_token issued for user A cannot authenticate as user B."""
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_token_cannot_be_used_for_different_user(self, async_client: AsyncClient):
- """pre_auth_token is bound to the issuing user; using it to verify a different
- user's TOTP code must fail."""
- # Set up two users with TOTP
- token_a = await _setup_and_login(async_client, "crossusera", "crossusera1")
- setup_a = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token_a))
- secret_a = setup_a.json()["secret"]
- await async_client.post(
- "/api/v1/auth/2fa/totp/enable",
- json={"code": pyotp.TOTP(secret_a).now()},
- headers=_auth_header(token_a),
- )
- # Get pre_auth_token for user A
- pre_auth_a = await _login_get_pre_auth_token(async_client, "crossusera", "crossusera1")
- # Try to use user A's token but supply a clearly invalid code — must fail
- resp = await async_client.post(
- "/api/v1/auth/2fa/verify",
- json={"pre_auth_token": pre_auth_a, "method": "totp", "code": "000000"},
- )
- assert resp.status_code == 401
- # ===========================================================================
- # Security: admin disable non-admin rejection
- # ===========================================================================
- class TestAdminDisableNonAdminRejection:
- """Non-admin users must be rejected from the admin disable endpoint."""
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_non_admin_cannot_disable_2fa(self, async_client: AsyncClient):
- """A regular (non-admin) user must receive 403 from DELETE /auth/2fa/admin/{id}."""
- # Set up admin, then create a regular user
- admin_token = await _setup_and_login(async_client, "adminusr2fa", "adminusr2fa1")
- # Create a regular user via user management
- create_resp = await async_client.post(
- "/api/v1/users",
- json={"username": "regularusr2fa", "password": "Regularusr2fa1!"},
- headers=_auth_header(admin_token),
- )
- assert create_resp.status_code == 201
- # Login as regular user
- login_resp = await async_client.post(
- LOGIN_URL,
- json={"username": "regularusr2fa", "password": "Regularusr2fa1!"},
- )
- regular_token = login_resp.json()["access_token"]
- # Try to call admin endpoint with the regular user's token
- resp = await async_client.delete(
- f"/api/v1/auth/2fa/admin/{create_resp.json()['id']}",
- headers=_auth_header(regular_token),
- )
- assert resp.status_code == 403
- # ===========================================================================
- # Regenerate backup codes
- # ===========================================================================
- class TestRegenerateBackupCodes:
- """Tests for POST /api/v1/auth/2fa/totp/regenerate-backup-codes."""
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_regenerate_requires_totp_enabled(self, async_client: AsyncClient):
- token = await _setup_and_login(async_client, "regennototp", "regennototp1")
- resp = await async_client.post(
- "/api/v1/auth/2fa/totp/regenerate-backup-codes",
- json={"code": "123456"},
- headers=_auth_header(token),
- )
- assert resp.status_code == 400
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_regenerate_invalidates_old_codes(self, async_client: AsyncClient):
- """After regenerating, old backup codes must no longer work."""
- token = await _setup_and_login(async_client, "regeninval", "regeninval1")
- setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
- secret = setup_resp.json()["secret"]
- enable_resp = await async_client.post(
- "/api/v1/auth/2fa/totp/enable",
- json={"code": pyotp.TOTP(secret).now()},
- headers=_auth_header(token),
- )
- old_backup = enable_resp.json()["backup_codes"][0]
- # Regenerate backup codes
- regen_resp = await async_client.post(
- "/api/v1/auth/2fa/totp/regenerate-backup-codes",
- json={"code": pyotp.TOTP(secret).now()},
- headers=_auth_header(token),
- )
- assert regen_resp.status_code == 200
- new_codes = regen_resp.json()["backup_codes"]
- assert len(new_codes) == 10
- assert old_backup not in new_codes
- # Old backup code must now fail
- pre_auth_token = await _login_get_pre_auth_token(async_client, "regeninval", "regeninval1")
- fail_resp = await async_client.post(
- "/api/v1/auth/2fa/verify",
- json={"pre_auth_token": pre_auth_token, "method": "backup", "code": old_backup},
- )
- assert fail_resp.status_code == 401
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_regenerate_with_invalid_code_fails(self, async_client: AsyncClient):
- token = await _setup_and_login(async_client, "regeninvcode", "regeninvcode1")
- setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
- secret = setup_resp.json()["secret"]
- await async_client.post(
- "/api/v1/auth/2fa/totp/enable",
- json={"code": pyotp.TOTP(secret).now()},
- headers=_auth_header(token),
- )
- resp = await async_client.post(
- "/api/v1/auth/2fa/totp/regenerate-backup-codes",
- json={"code": "000000"},
- headers=_auth_header(token),
- )
- assert resp.status_code == 400
- # ===========================================================================
- # Security: method field validation
- # ===========================================================================
- class TestVerifyMethodValidation:
- """The method field must be one of totp/email/backup (Pydantic Literal)."""
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_invalid_method_rejected_by_schema(self, async_client: AsyncClient):
- """Pydantic should reject unknown method values with 422."""
- resp = await async_client.post(
- "/api/v1/auth/2fa/verify",
- json={"pre_auth_token": "anytoken", "code": "123456", "method": "sms"},
- )
- assert resp.status_code == 422
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_oversized_pre_auth_token_rejected(self, async_client: AsyncClient):
- """pre_auth_token exceeding max_length=128 should be rejected with 422."""
- resp = await async_client.post(
- "/api/v1/auth/2fa/verify",
- json={"pre_auth_token": "x" * 200, "code": "123456", "method": "totp"},
- )
- assert resp.status_code == 422
- # ===========================================================================
- # Login response shape for 2FA users
- # ===========================================================================
- class TestLoginResponseShape:
- """Login for a 2FA-enabled user must return requires_2fa+pre_auth_token
- and must NOT include access_token (which would bypass the 2FA gate)."""
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_login_2fa_user_omits_access_token(self, async_client: AsyncClient):
- """A user with TOTP enabled must not receive an access_token on /auth/login."""
- token = await _setup_and_login(async_client, "loginshape", "loginshape1")
- setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
- secret = setup_resp.json()["secret"]
- await async_client.post(
- "/api/v1/auth/2fa/totp/enable",
- json={"code": pyotp.TOTP(secret).now()},
- headers=_auth_header(token),
- )
- login_resp = await async_client.post(LOGIN_URL, json={"username": "loginshape", "password": "Loginshape1!"})
- assert login_resp.status_code == 200
- data = login_resp.json()
- assert data.get("requires_2fa") is True
- assert data.get("pre_auth_token") is not None
- # access_token must NOT be present — it would bypass the 2FA gate
- assert "access_token" not in data or data["access_token"] is None
- # ===========================================================================
- # TOTP replay protection
- # ===========================================================================
- async def _setup_totp_user(client: AsyncClient, username: str, password: str) -> tuple[str, str]:
- """Create user, set up and enable TOTP; return (bearer_token, totp_secret)."""
- token = await _setup_and_login(client, username, password)
- setup_resp = await client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
- secret = setup_resp.json()["secret"]
- await client.post(
- "/api/v1/auth/2fa/totp/enable",
- json={"code": pyotp.TOTP(secret).now()},
- headers=_auth_header(token),
- )
- return token, secret
- class TestTOTPReplay:
- """The same TOTP code must not be accepted twice within one 30-second window."""
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_totp_replay_rejected_on_verify(self, async_client: AsyncClient):
- """Replaying the same code on /2fa/verify must return 400."""
- _token, secret = await _setup_totp_user(async_client, "replayverify", "replayverify1")
- code = pyotp.TOTP(secret).now()
- pre_auth = await _login_get_pre_auth_token(async_client, "replayverify", "replayverify1")
- first = await async_client.post(
- "/api/v1/auth/2fa/verify",
- json={"pre_auth_token": pre_auth, "method": "totp", "code": code},
- )
- assert first.status_code == 200
- # Second login to get a fresh pre_auth_token (first was consumed)
- pre_auth2 = await _login_get_pre_auth_token(async_client, "replayverify", "replayverify1")
- second = await async_client.post(
- "/api/v1/auth/2fa/verify",
- json={"pre_auth_token": pre_auth2, "method": "totp", "code": code},
- )
- assert second.status_code == 400
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_totp_replay_rejected_on_disable(self, async_client: AsyncClient):
- """A code already used in verify_2fa must be rejected on /2fa/totp/disable."""
- _setup_token, secret = await _setup_totp_user(async_client, "replaydisable", "replaydisable1")
- code = pyotp.TOTP(secret).now()
- # Use the code in verify_2fa — this sets last_totp_counter in DB
- pre_auth = await _login_get_pre_auth_token(async_client, "replaydisable", "replaydisable1")
- verify_resp = await async_client.post(
- "/api/v1/auth/2fa/verify",
- json={"pre_auth_token": pre_auth, "method": "totp", "code": code},
- )
- assert verify_resp.status_code == 200
- authed_token = verify_resp.json()["access_token"]
- # Replay the same code on disable — must be rejected (same 30-second window)
- disable_resp = await async_client.post(
- "/api/v1/auth/2fa/totp/disable",
- json={"code": code},
- headers=_auth_header(authed_token),
- )
- assert disable_resp.status_code == 400
- # ===========================================================================
- # Rate limiting on disable_totp and regenerate_backup_codes (I10)
- # ===========================================================================
- class TestRateLimitingDisableRegenerate:
- """disable_totp and regenerate_backup_codes must enforce rate limiting (I10)."""
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_disable_totp_rate_limited_after_failures(self, async_client: AsyncClient):
- """Repeated wrong codes on /2fa/totp/disable trigger 429."""
- token, _secret = await _setup_totp_user(async_client, "rldisable", "rldisable1")
- for _ in range(5):
- await async_client.post(
- "/api/v1/auth/2fa/totp/disable",
- json={"code": "000000"},
- headers=_auth_header(token),
- )
- resp = await async_client.post(
- "/api/v1/auth/2fa/totp/disable",
- json={"code": "000000"},
- headers=_auth_header(token),
- )
- assert resp.status_code == 429
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_regenerate_backup_codes_rate_limited_after_failures(self, async_client: AsyncClient):
- """Repeated wrong codes on /2fa/totp/regenerate-backup-codes trigger 429."""
- token, _secret = await _setup_totp_user(async_client, "rlregen", "rlregen1")
- for _ in range(5):
- await async_client.post(
- "/api/v1/auth/2fa/totp/regenerate-backup-codes",
- json={"code": "000000"},
- headers=_auth_header(token),
- )
- resp = await async_client.post(
- "/api/v1/auth/2fa/totp/regenerate-backup-codes",
- json={"code": "000000"},
- headers=_auth_header(token),
- )
- assert resp.status_code == 429
- # ===========================================================================
- # Email OTP send → verify end-to-end (coverage gap C3)
- # ===========================================================================
- class TestEmailOTPSendVerify:
- """Full email OTP login: send code → verify code → JWT."""
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_email_otp_send_and_verify(self, async_client: AsyncClient, db_session: AsyncSession):
- """login → POST /2fa/email/send (patched SMTP) → POST /2fa/verify → JWT."""
- import re
- from unittest.mock import AsyncMock, MagicMock, patch
- from sqlalchemy import select as sa_select
- token = await _setup_and_login(async_client, "emailsendok", "emailsendok1")
- # Give the user an email address
- result = await db_session.execute(sa_select(User).where(User.username == "emailsendok"))
- user = result.scalar_one()
- user.email = "emailsendok@example.com"
- await db_session.commit()
- # Enable email OTP via DB injection
- setup_code = "444444"
- setup_token = secrets.token_urlsafe(32)
- db_session.add(
- AuthEphemeralToken(
- token=setup_token,
- token_type="email_otp_setup",
- username="emailsendok",
- nonce=_pwd_context.hash(setup_code),
- expires_at=datetime.now(timezone.utc) + timedelta(minutes=10),
- )
- )
- await db_session.commit()
- await async_client.post(
- "/api/v1/auth/2fa/email/enable/confirm",
- json={"setup_token": setup_token, "code": setup_code},
- headers=_auth_header(token),
- )
- # Login now requires 2FA — get pre_auth_token (cookie set automatically)
- pre_auth_token = await _login_get_pre_auth_token(async_client, "emailsendok", "emailsendok1")
- # Mock SMTP and capture the sent OTP code
- captured: dict[str, str] = {}
- smtp_settings_mock = MagicMock()
- def _capture_email(smtp_settings, to_email, subject, body_text, body_html):
- m = re.search(r"login code is: (\d{6})", body_text)
- if m:
- captured["otp"] = m.group(1)
- with (
- patch("backend.app.api.routes.mfa.get_smtp_settings", new=AsyncMock(return_value=smtp_settings_mock)),
- patch("backend.app.api.routes.mfa.send_email", side_effect=_capture_email),
- ):
- send_resp = await async_client.post(
- "/api/v1/auth/2fa/email/send",
- json={"pre_auth_token": pre_auth_token},
- )
- assert send_resp.status_code == 200, send_resp.text
- fresh_token = send_resp.json()["pre_auth_token"]
- assert "otp" in captured, "send_email was not called or code not found in body"
- # Verify with the captured OTP code — cookie still in the async_client jar
- verify_resp = await async_client.post(
- "/api/v1/auth/2fa/verify",
- json={"pre_auth_token": fresh_token, "method": "email", "code": captured["otp"]},
- )
- assert verify_resp.status_code == 200
- data = verify_resp.json()
- assert "access_token" in data
- assert data["user"]["username"] == "emailsendok"
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_email_otp_wrong_code_rejected(self, async_client: AsyncClient, db_session: AsyncSession):
- """A wrong email OTP code must return 401 without burning the pre_auth_token."""
- from unittest.mock import AsyncMock, MagicMock, patch
- from sqlalchemy import select as sa_select
- token = await _setup_and_login(async_client, "emailwrongcode", "emailwrongcode1")
- result = await db_session.execute(sa_select(User).where(User.username == "emailwrongcode"))
- user = result.scalar_one()
- user.email = "emailwrongcode@example.com"
- await db_session.commit()
- setup_code = "555555"
- setup_token = secrets.token_urlsafe(32)
- db_session.add(
- AuthEphemeralToken(
- token=setup_token,
- token_type="email_otp_setup",
- username="emailwrongcode",
- nonce=_pwd_context.hash(setup_code),
- expires_at=datetime.now(timezone.utc) + timedelta(minutes=10),
- )
- )
- await db_session.commit()
- await async_client.post(
- "/api/v1/auth/2fa/email/enable/confirm",
- json={"setup_token": setup_token, "code": setup_code},
- headers=_auth_header(token),
- )
- pre_auth_token = await _login_get_pre_auth_token(async_client, "emailwrongcode", "emailwrongcode1")
- captured: dict[str, str] = {}
- smtp_mock = MagicMock()
- def _capture(smtp_settings, to_email, subject, body_text, body_html):
- import re
- m = re.search(r"login code is: (\d{6})", body_text)
- if m:
- captured["otp"] = m.group(1)
- with (
- patch("backend.app.api.routes.mfa.get_smtp_settings", new=AsyncMock(return_value=smtp_mock)),
- patch("backend.app.api.routes.mfa.send_email", side_effect=_capture),
- ):
- send_resp = await async_client.post(
- "/api/v1/auth/2fa/email/send",
- json={"pre_auth_token": pre_auth_token},
- )
- assert send_resp.status_code == 200
- fresh_token = send_resp.json()["pre_auth_token"]
- # Wrong code → 401
- bad = await async_client.post(
- "/api/v1/auth/2fa/verify",
- json={"pre_auth_token": fresh_token, "method": "email", "code": "000000"},
- )
- assert bad.status_code == 401
- # Correct code still works (token not burned by wrong attempt)
- good = await async_client.post(
- "/api/v1/auth/2fa/verify",
- json={"pre_auth_token": fresh_token, "method": "email", "code": captured["otp"]},
- )
- assert good.status_code == 200
- # ===========================================================================
- # OIDC end-to-end (coverage gap C4)
- # ===========================================================================
- def _make_test_rsa_key():
- """Generate a throwaway RSA key pair and a matching JWK set for tests."""
- import base64
- from cryptography.hazmat.primitives import serialization
- from cryptography.hazmat.primitives.asymmetric import rsa
- private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
- private_pem = private_key.private_bytes(
- serialization.Encoding.PEM,
- serialization.PrivateFormat.TraditionalOpenSSL,
- serialization.NoEncryption(),
- )
- pub_numbers = private_key.public_key().public_numbers()
- def _b64url(n: int, length: int) -> str:
- return base64.urlsafe_b64encode(n.to_bytes(length, "big")).rstrip(b"=").decode()
- jwks = {
- "keys": [
- {
- "kty": "RSA",
- "use": "sig",
- "alg": "RS256",
- "kid": "test-kid-1",
- "n": _b64url(pub_numbers.n, 256),
- "e": _b64url(pub_numbers.e, 3),
- }
- ]
- }
- return private_pem, jwks
- class TestOIDCEndToEnd:
- """Full OIDC auth-code flow: state → callback (mocked IdP) → exchange → JWT."""
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_oidc_callback_creates_user_and_issues_jwt(self, async_client: AsyncClient, db_session: AsyncSession):
- """callback validates the mocked ID token, creates a user, and redirects
- with an oidc_exchange token; exchanging that token returns a full JWT."""
- import time
- from unittest.mock import patch
- import jwt as pyjwt
- private_pem, jwks_data = _make_test_rsa_key()
- issuer = "https://idp.test.example.com"
- client_id = "oidc-test-client"
- nonce = secrets.token_urlsafe(16)
- now = int(time.time())
- id_token = pyjwt.encode(
- {
- "sub": "oidc-sub-e2e",
- "iss": issuer,
- "aud": client_id,
- "nonce": nonce,
- "email": "oidce2e@example.com",
- "email_verified": True,
- "iat": now,
- "exp": now + 300,
- },
- private_pem,
- algorithm="RS256",
- headers={"kid": "test-kid-1"},
- )
- # Create OIDC provider
- admin_token = await _setup_and_login(async_client, "oidce2eadm", "oidce2eadm1")
- create_resp = await async_client.post(
- "/api/v1/auth/oidc/providers",
- json={
- "name": "E2E-IdP",
- "issuer_url": issuer,
- "client_id": client_id,
- "client_secret": "test-secret",
- "scopes": "openid email profile",
- "is_enabled": True,
- "auto_create_users": True,
- },
- headers=_auth_header(admin_token),
- )
- assert create_resp.status_code == 201
- provider_id = create_resp.json()["id"]
- # Simulate the authorize step: insert an oidc_state token directly
- state = secrets.token_urlsafe(32)
- code_verifier = secrets.token_urlsafe(48)
- db_session.add(
- AuthEphemeralToken(
- token=state,
- token_type="oidc_state",
- provider_id=provider_id,
- nonce=nonce,
- code_verifier=code_verifier,
- expires_at=datetime.now(timezone.utc) + timedelta(minutes=5),
- )
- )
- await db_session.commit()
- # Mock httpx calls made inside oidc_callback
- discovery_doc = {
- "issuer": issuer,
- "authorization_endpoint": f"{issuer}/auth",
- "token_endpoint": f"{issuer}/token",
- "jwks_uri": f"{issuer}/.well-known/jwks.json",
- }
- token_response = {
- "access_token": "mock-access",
- "token_type": "Bearer",
- "id_token": id_token,
- }
- class _MockResp:
- def __init__(self, data):
- self._data = data
- self.status_code = 200
- self.is_success = True
- self.text = str(data)
- def json(self):
- return self._data
- def raise_for_status(self):
- pass
- class _MockHttpxClient:
- def __init__(self, *args, **kwargs):
- pass
- async def __aenter__(self):
- return self
- async def __aexit__(self, *args):
- pass
- async def get(self, url, **kwargs):
- if "jwks" in url:
- return _MockResp(jwks_data)
- return _MockResp(discovery_doc)
- async def post(self, url, **kwargs):
- return _MockResp(token_response)
- with patch("backend.app.api.routes.mfa.httpx.AsyncClient", _MockHttpxClient):
- callback_resp = await async_client.get(
- f"/api/v1/auth/oidc/callback?code=test-auth-code&state={state}",
- follow_redirects=False,
- )
- assert callback_resp.status_code == 302, callback_resp.text
- location = callback_resp.headers.get("location", "")
- assert "oidc_token=" in location, f"Expected oidc_token in redirect, got: {location}"
- # Extract and exchange the oidc_exchange token
- oidc_exchange_token = location.split("oidc_token=")[1].split("&")[0]
- exchange_resp = await async_client.post(
- "/api/v1/auth/oidc/exchange",
- json={"oidc_token": oidc_exchange_token},
- )
- assert exchange_resp.status_code == 200
- data = exchange_resp.json()
- assert "access_token" in data
- assert data["user"]["username"] is not None
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_oidc_callback_invalid_state_redirects_error(self, async_client: AsyncClient):
- """An unknown state token must redirect to /?oidc_error=invalid_state."""
- resp = await async_client.get(
- "/api/v1/auth/oidc/callback?code=x&state=totally-bogus-state",
- follow_redirects=False,
- )
- assert resp.status_code == 302
- assert "invalid_state" in resp.headers.get("location", "")
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_oidc_state_is_single_use(self, async_client: AsyncClient, db_session: AsyncSession):
- """Replaying the same state token must fail on the second callback."""
- import time
- from unittest.mock import patch
- import jwt as pyjwt
- private_pem, jwks_data = _make_test_rsa_key()
- issuer = "https://idp2.test.example.com"
- client_id = "oidc-client-2"
- nonce = secrets.token_urlsafe(16)
- now = int(time.time())
- id_token = pyjwt.encode(
- {
- "sub": "sub-single-use",
- "iss": issuer,
- "aud": client_id,
- "nonce": nonce,
- "email": "su@example.com",
- "email_verified": True,
- "iat": now,
- "exp": now + 300,
- },
- private_pem,
- algorithm="RS256",
- headers={"kid": "test-kid-1"},
- )
- admin_token = await _setup_and_login(async_client, "oidcsuadm", "oidcsuadm1")
- cr = await async_client.post(
- "/api/v1/auth/oidc/providers",
- json={
- "name": "SU-IdP",
- "issuer_url": issuer,
- "client_id": client_id,
- "client_secret": "s",
- "scopes": "openid",
- "is_enabled": True,
- "auto_create_users": True,
- },
- headers=_auth_header(admin_token),
- )
- provider_id = cr.json()["id"]
- state = secrets.token_urlsafe(32)
- db_session.add(
- AuthEphemeralToken(
- token=state,
- token_type="oidc_state",
- provider_id=provider_id,
- nonce=nonce,
- code_verifier=secrets.token_urlsafe(48),
- expires_at=datetime.now(timezone.utc) + timedelta(minutes=5),
- )
- )
- await db_session.commit()
- discovery_doc = {
- "issuer": issuer,
- "authorization_endpoint": f"{issuer}/auth",
- "token_endpoint": f"{issuer}/token",
- "jwks_uri": f"{issuer}/.well-known/jwks.json",
- }
- token_response = {"access_token": "a", "token_type": "Bearer", "id_token": id_token}
- class _MockResp:
- def __init__(self, data):
- self._data = data
- self.status_code = 200
- self.is_success = True
- self.text = str(data)
- def json(self):
- return self._data
- def raise_for_status(self):
- pass
- class _MockHttpxClient:
- def __init__(self, *a, **kw):
- pass
- async def __aenter__(self):
- return self
- async def __aexit__(self, *a):
- pass
- async def get(self, url, **kw):
- return _MockResp(jwks_data if "jwks" in url else discovery_doc)
- async def post(self, url, **kw):
- return _MockResp(token_response)
- with patch("backend.app.api.routes.mfa.httpx.AsyncClient", _MockHttpxClient):
- first = await async_client.get(
- f"/api/v1/auth/oidc/callback?code=c&state={state}",
- follow_redirects=False,
- )
- assert first.status_code == 302
- assert "oidc_token=" in first.headers.get("location", "")
- # Replay: second callback with the same state must fail
- second = await async_client.get(
- f"/api/v1/auth/oidc/callback?code=c&state={state}",
- follow_redirects=False,
- )
- assert second.status_code == 302
- assert "invalid_state" in second.headers.get("location", "")
- # ===========================================================================
- # H-2: Wrong code must NOT consume the email OTP setup token (peek-then-consume)
- # ===========================================================================
- class TestEmailOTPSetupTokenPreservedOnWrongCode:
- """After H-2 fix: a wrong code leaves the setup token intact so the user can retry."""
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_wrong_code_does_not_consume_setup_token(self, async_client: AsyncClient, db_session: AsyncSession):
- """Wrong code returns 400 but the setup token survives; correct code then works."""
- token = await _setup_and_login(async_client, "h2retryuser", "h2retrypass1")
- code = "999999"
- code_hash = _pwd_context.hash(code)
- setup_token = secrets.token_urlsafe(32)
- db_session.add(
- AuthEphemeralToken(
- token=setup_token,
- token_type="email_otp_setup",
- username="h2retryuser",
- nonce=code_hash,
- expires_at=datetime.now(timezone.utc) + timedelta(minutes=10),
- )
- )
- await db_session.commit()
- # First attempt: wrong code → 400
- wrong = await async_client.post(
- "/api/v1/auth/2fa/email/enable/confirm",
- json={"setup_token": setup_token, "code": "000000"},
- headers=_auth_header(token),
- )
- assert wrong.status_code == 400
- # Second attempt: correct code → must succeed (token was NOT consumed)
- correct = await async_client.post(
- "/api/v1/auth/2fa/email/enable/confirm",
- json={"setup_token": setup_token, "code": code},
- headers=_auth_header(token),
- )
- assert correct.status_code == 200
- # ===========================================================================
- # M-2: New OIDC provider must default to auto_link_existing_accounts=False
- # ===========================================================================
- class TestOIDCProviderAutoLinkDefault:
- """auto_link_existing_accounts must default to False (M-2 fix)."""
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_new_provider_auto_link_defaults_to_false(self, async_client: AsyncClient):
- token = await _setup_and_login(async_client, "m2autolinkadmin", "m2autolinkadmin1")
- resp = await async_client.post(
- "/api/v1/auth/oidc/providers",
- json={
- "name": "AutoLinkTest",
- "issuer_url": "https://autolink.example.com",
- "client_id": "alc",
- "client_secret": "als",
- "scopes": "openid",
- "is_enabled": True,
- "auto_create_users": False,
- # auto_link_existing_accounts intentionally omitted
- },
- headers=_auth_header(token),
- )
- assert resp.status_code == 201
- assert resp.json()["auto_link_existing_accounts"] is False
- # ===========================================================================
- # L-5: 2FA verify code format validation
- # ===========================================================================
- class TestTwoFAVerifyCodeFormat:
- """TwoFAVerifyRequest.code must be 6–8 alphanumeric characters (L-5)."""
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_code_too_long_rejected(self, async_client: AsyncClient):
- """code > 8 characters must be rejected with 422."""
- resp = await async_client.post(
- "/api/v1/auth/2fa/verify",
- json={"pre_auth_token": "anytoken", "code": "1" * 9, "method": "totp"},
- )
- assert resp.status_code == 422
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_code_non_alphanumeric_rejected(self, async_client: AsyncClient):
- """code containing non-alphanumeric chars must be rejected with 422."""
- resp = await async_client.post(
- "/api/v1/auth/2fa/verify",
- json={"pre_auth_token": "anytoken", "code": "12-456", "method": "totp"},
- )
- assert resp.status_code == 422
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_code_too_short_rejected(self, async_client: AsyncClient):
- """code < 6 characters must be rejected with 422."""
- resp = await async_client.post(
- "/api/v1/auth/2fa/verify",
- json={"pre_auth_token": "anytoken", "code": "12345", "method": "totp"},
- )
- assert resp.status_code == 422
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_code_exactly_6_passes_schema(self, async_client: AsyncClient):
- """6-character alphanumeric code passes schema (may fail 2FA logic with 400)."""
- resp = await async_client.post(
- "/api/v1/auth/2fa/verify",
- json={"pre_auth_token": "x" * 32, "code": "123456", "method": "totp"},
- )
- assert resp.status_code != 422
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_code_exactly_8_passes_schema(self, async_client: AsyncClient):
- """8-character alphanumeric backup code passes schema."""
- resp = await async_client.post(
- "/api/v1/auth/2fa/verify",
- json={"pre_auth_token": "x" * 32, "code": "ABCD1234", "method": "backup"},
- )
- assert resp.status_code != 422
- # ===========================================================================
- # M-NEW-1: verify_slicer_download_token must NOT consume token on wrong resource
- # ===========================================================================
- class TestSlicerTokenResourceBinding:
- """Token for resource A must survive a wrong-resource check and still work for A."""
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_wrong_resource_does_not_consume_token(self, async_client: AsyncClient, db_session: AsyncSession):
- """A slicer token bound to archive:5 must NOT be consumed when checked against archive:6."""
- from datetime import datetime, timedelta, timezone
- from backend.app.core.auth import verify_slicer_download_token
- from backend.app.models.auth_ephemeral import AuthEphemeralToken
- now = datetime.now(timezone.utc)
- token_val = secrets.token_urlsafe(24)
- db_session.add(
- AuthEphemeralToken(
- token=token_val,
- token_type="slicer_download",
- nonce="archive:5",
- expires_at=now + timedelta(minutes=5),
- )
- )
- await db_session.commit()
- # Wrong resource → must return False and NOT consume the token
- wrong = await verify_slicer_download_token(token_val, "archive", 6)
- assert wrong is False
- # Correct resource → must return True (token survived the wrong-resource check)
- correct = await verify_slicer_download_token(token_val, "archive", 5)
- assert correct is True
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_correct_resource_consumes_token(self, async_client: AsyncClient, db_session: AsyncSession):
- """A slicer token is single-use: second correct-resource check must return False."""
- from datetime import datetime, timedelta, timezone
- from backend.app.core.auth import verify_slicer_download_token
- from backend.app.models.auth_ephemeral import AuthEphemeralToken
- now = datetime.now(timezone.utc)
- token_val = secrets.token_urlsafe(24)
- db_session.add(
- AuthEphemeralToken(
- token=token_val,
- token_type="slicer_download",
- nonce="library:99",
- expires_at=now + timedelta(minutes=5),
- )
- )
- await db_session.commit()
- first = await verify_slicer_download_token(token_val, "library", 99)
- assert first is True
- second = await verify_slicer_download_token(token_val, "library", 99)
- assert second is False
- # ===========================================================================
- # M-NEW-3 / L-NEW-1: Schema length validation for change-password & forgot-password
- # ===========================================================================
- class TestSchemaLengthValidationR2:
- """Input length limits added in review round 2."""
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_change_password_current_too_long_rejected(self, async_client: AsyncClient):
- """current_password > 256 chars must be rejected with 422 (prevents pbkdf2 DoS)."""
- resp = await async_client.post(
- "/api/v1/users/me/change-password",
- json={"current_password": "x" * 257, "new_password": "ValidPass1!"},
- )
- assert resp.status_code == 422
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_forgot_password_email_too_long_rejected(self, async_client: AsyncClient):
- """email > 254 chars must be rejected with 422."""
- resp = await async_client.post(
- "/api/v1/auth/forgot-password",
- json={"email": "a" * 243 + "@example.com"},
- )
- assert resp.status_code == 422
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_forgot_password_email_at_limit_passes_schema(self, async_client: AsyncClient):
- """Short email passes schema (may return 400/200 from business logic)."""
- resp = await async_client.post(
- "/api/v1/auth/forgot-password",
- json={"email": "user@example.com"},
- )
- assert resp.status_code != 422
- # ===========================================================================
- # L-NEW-2: TOTPSetupRequest.code max_length
- # ===========================================================================
- class TestTOTPSetupCodeMaxLength:
- """TOTPSetupRequest.code must be bounded (L-NEW-2)."""
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_setup_code_too_long_rejected(self, async_client: AsyncClient):
- """code > 8 chars must be rejected with 422."""
- import pyotp as _pyotp
- token = await _setup_and_login(async_client, "totp_setup_maxlen", "totp_setup_maxlen1")
- # Enable TOTP so the setup-code guard path is active
- setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
- secret = setup_resp.json()["secret"]
- await async_client.post(
- "/api/v1/auth/2fa/totp/enable",
- json={"code": _pyotp.TOTP(secret).now()},
- headers=_auth_header(token),
- )
- resp = await async_client.post(
- "/api/v1/auth/2fa/totp/setup",
- json={"code": "1" * 9},
- headers=_auth_header(token),
- )
- assert resp.status_code == 422
- # ===========================================================================
- # L-NEW-3: EmailOTPEnableConfirmRequest.code must be exactly 6 digits
- # ===========================================================================
- class TestEmailOTPConfirmCodeFormat:
- """EmailOTPEnableConfirmRequest.code must be 6 digits (L-NEW-3)."""
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_non_digit_code_rejected(self, async_client: AsyncClient):
- """Alpha characters in the email OTP confirm code must be rejected with 422."""
- token = await _setup_and_login(async_client, "emailotpfmt", "emailotpfmt1")
- resp = await async_client.post(
- "/api/v1/auth/2fa/email/enable/confirm",
- json={"setup_token": "x" * 32, "code": "ABCDEF"},
- headers=_auth_header(token),
- )
- assert resp.status_code == 422
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_seven_digit_code_rejected(self, async_client: AsyncClient):
- """7-digit code must be rejected with 422 (min_length=max_length=6)."""
- token = await _setup_and_login(async_client, "emailotplen7", "emailotplen7x")
- resp = await async_client.post(
- "/api/v1/auth/2fa/email/enable/confirm",
- json={"setup_token": "x" * 32, "code": "1234567"},
- headers=_auth_header(token),
- )
- assert resp.status_code == 422
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_valid_six_digit_code_passes_schema(self, async_client: AsyncClient):
- """6-digit numeric code passes schema (may return 400 on bad token — that's fine)."""
- token = await _setup_and_login(async_client, "emailotpfmt6", "emailotpfmt6x")
- resp = await async_client.post(
- "/api/v1/auth/2fa/email/enable/confirm",
- json={"setup_token": "x" * 32, "code": "123456"},
- headers=_auth_header(token),
- )
- assert resp.status_code != 422
- # ===========================================================================
- # L-NEW-4: OIDCProviderCreate field max_length constraints
- # ===========================================================================
- class TestOIDCProviderFieldLengths:
- """OIDCProviderCreate fields must reject inputs exceeding max_length (L-NEW-4)."""
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_name_too_long_rejected(self, async_client: AsyncClient):
- token = await _setup_and_login(async_client, "oidcfldadmin", "oidcfldadmin1")
- resp = await async_client.post(
- "/api/v1/auth/oidc/providers",
- json={
- "name": "n" * 101,
- "issuer_url": "https://test.example.com",
- "client_id": "cid",
- "client_secret": "csec",
- "scopes": "openid",
- },
- headers=_auth_header(token),
- )
- assert resp.status_code == 422
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_client_secret_too_long_rejected(self, async_client: AsyncClient):
- token = await _setup_and_login(async_client, "oidcseclen", "oidcseclen123")
- resp = await async_client.post(
- "/api/v1/auth/oidc/providers",
- json={
- "name": "ValidName",
- "issuer_url": "https://test.example.com",
- "client_id": "cid",
- "client_secret": "s" * 513,
- "scopes": "openid",
- },
- headers=_auth_header(token),
- )
- assert resp.status_code == 422
- # ---------------------------------------------------------------------------
- # M-NEW-4 / M-NEW-5 / L-NEW-5: UserCreate & UserUpdate field length limits
- # ---------------------------------------------------------------------------
- class TestUserCreateUpdateFieldLengths:
- """UserCreate and UserUpdate must enforce max_length on username, password, email."""
- @pytest.fixture
- async def admin_token(self, async_client: AsyncClient) -> str:
- return await _setup_and_login(async_client, "ucfldadmin", "ucfldadmin1!")
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_create_username_too_long_rejected(self, async_client: AsyncClient, admin_token: str):
- resp = await async_client.post(
- "/api/v1/users/",
- json={
- "username": "u" * 151,
- "password": "ValidPass1!",
- "role": "user",
- },
- headers=_auth_header(admin_token),
- )
- assert resp.status_code == 422
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_create_password_too_long_rejected(self, async_client: AsyncClient, admin_token: str):
- resp = await async_client.post(
- "/api/v1/users/",
- json={
- "username": "newuserX",
- "password": "A1!" + "x" * 254,
- "role": "user",
- },
- headers=_auth_header(admin_token),
- )
- assert resp.status_code == 422
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_create_email_too_long_rejected(self, async_client: AsyncClient, admin_token: str):
- resp = await async_client.post(
- "/api/v1/users/",
- json={
- "username": "newuserY",
- "password": "ValidPass1!",
- "email": "a" * 246 + "@x.com", # total 253 chars -> fine; 248+@x.com=255 -> too long
- "role": "user",
- },
- headers=_auth_header(admin_token),
- )
- # 248 'a' + '@x.com' (6) = 254 chars — just at limit, should pass
- # Use 249 + '@x.com' = 255 chars to trigger the 422
- assert resp.status_code in (201, 422) # boundary sanity check
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_create_email_exceeds_limit_rejected(self, async_client: AsyncClient, admin_token: str):
- resp = await async_client.post(
- "/api/v1/users/",
- json={
- "username": "newuserZ",
- "password": "ValidPass1!",
- "email": "a" * 249 + "@x.com", # 255 chars — exceeds RFC 5321 max of 254
- "role": "user",
- },
- headers=_auth_header(admin_token),
- )
- assert resp.status_code == 422
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_update_username_too_long_rejected(self, async_client: AsyncClient, admin_token: str):
- # Create a user first
- create_resp = await async_client.post(
- "/api/v1/users/",
- json={"username": "updusr1", "password": "ValidPass1!", "role": "user"},
- headers=_auth_header(admin_token),
- )
- assert create_resp.status_code == 201
- user_id = create_resp.json()["id"]
- resp = await async_client.patch(
- f"/api/v1/users/{user_id}",
- json={"username": "u" * 151},
- headers=_auth_header(admin_token),
- )
- assert resp.status_code == 422
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_update_password_too_long_rejected(self, async_client: AsyncClient, admin_token: str):
- create_resp = await async_client.post(
- "/api/v1/users/",
- json={"username": "updusr2", "password": "ValidPass1!", "role": "user"},
- headers=_auth_header(admin_token),
- )
- assert create_resp.status_code == 201
- user_id = create_resp.json()["id"]
- resp = await async_client.patch(
- f"/api/v1/users/{user_id}",
- json={"password": "A1!" + "x" * 254},
- headers=_auth_header(admin_token),
- )
- assert resp.status_code == 422
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_update_email_too_long_rejected(self, async_client: AsyncClient, admin_token: str):
- create_resp = await async_client.post(
- "/api/v1/users/",
- json={"username": "updusr3", "password": "ValidPass1!", "role": "user"},
- headers=_auth_header(admin_token),
- )
- assert create_resp.status_code == 201
- user_id = create_resp.json()["id"]
- resp = await async_client.patch(
- f"/api/v1/users/{user_id}",
- json={"email": "a" * 249 + "@x.com"}, # 255 chars
- headers=_auth_header(admin_token),
- )
- assert resp.status_code == 422
- # ---------------------------------------------------------------------------
- # L-NEW-6: per-IP rate limiting on /forgot-password
- # ---------------------------------------------------------------------------
- _SMTP_DATA_FOR_IPLIMIT = {
- "smtp_host": "smtp.test.com",
- "smtp_port": 587,
- "smtp_username": "test@test.com",
- "smtp_password": "testpass",
- "smtp_security": "starttls",
- "smtp_auth_enabled": True,
- "smtp_from_email": "noreply@test.com",
- }
- class TestForgotPasswordPerIpRateLimit:
- """POST /forgot-password must enforce a per-IP cap (L-NEW-6).
- The test sends 11 requests from the simulated test-client IP using 11
- different email addresses (so the per-email bucket is never exhausted).
- The 11th request must be rejected with 429.
- """
- @pytest.fixture
- async def advanced_auth_token(self, async_client: AsyncClient) -> str:
- """Set up auth, SMTP, and enable advanced auth; return admin token."""
- token = await _setup_and_login(async_client, "iprladmin", "iprladmin1!")
- headers = _auth_header(token)
- await async_client.post("/api/v1/auth/smtp", headers=headers, json=_SMTP_DATA_FOR_IPLIMIT)
- await async_client.post("/api/v1/auth/advanced-auth/enable", headers=headers)
- return token
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_per_ip_limit_triggers_429(self, async_client: AsyncClient, advanced_auth_token: str):
- # Send 11 requests from the same test-client IP using unique email
- # addresses so the per-email bucket (limit=3) is never exhausted.
- responses = []
- for i in range(11):
- resp = await async_client.post(
- "/api/v1/auth/forgot-password",
- json={"email": f"unique{i}@example.com"},
- )
- responses.append(resp.status_code)
- # First 10 must not be rate-limited by the IP bucket
- for code in responses[:10]:
- assert code != 429, f"Unexpected 429 before limit reached: {responses}"
- # The 11th must be rate-limited
- assert responses[10] == 429, f"Expected 429 on 11th request, got {responses[10]}"
- # ---------------------------------------------------------------------------
- # M-NEW-6: OIDC auto-link must be rejected if target user already has an
- # OIDC link to a different provider
- # ---------------------------------------------------------------------------
- class TestOIDCAutoLinkExistingLinkRejection:
- """OIDC callback must reject auto-linking when the email-matched user
- already has an OIDC link to a different provider (M-NEW-6)."""
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_auto_link_rejected_when_user_already_linked(
- self, async_client: AsyncClient, db_session: AsyncSession
- ):
- """Auto-link via email-match is rejected when the target user is
- already linked to another OIDC provider."""
- import base64
- import hashlib
- from unittest.mock import AsyncMock, MagicMock, patch
- from backend.app.core.auth import get_password_hash
- from backend.app.models.oidc_provider import OIDCProvider, UserOIDCLink
- from backend.app.models.user import User
- # ── 1. Target user with a known email ────────────────────────────
- target = User(
- username="oidcALTarget",
- email="alinktest@example.com",
- auth_source="oidc",
- password_hash=get_password_hash(secrets.token_urlsafe(16)),
- role="user",
- is_active=True,
- )
- db_session.add(target)
- await db_session.flush()
- # ── 2. Provider B — legitimate, already linked to target ──────────
- prov_b = OIDCProvider(
- name="ProvB_m6test",
- issuer_url="https://providerb-m6.example.com",
- client_id="client_b",
- _client_secret_enc="secret_b",
- scopes="openid email profile",
- is_enabled=True,
- auto_link_existing_accounts=False,
- auto_create_users=False,
- )
- db_session.add(prov_b)
- await db_session.flush()
- db_session.add(
- UserOIDCLink(
- user_id=target.id,
- provider_id=prov_b.id,
- provider_user_id="legitimate_sub",
- provider_email="alinktest@example.com",
- )
- )
- # ── 3. Provider A — attacker-controlled, auto_link=True ───────────
- prov_a = OIDCProvider(
- name="ProvA_m6test",
- issuer_url="https://providera-m6.example.com",
- client_id="client_a",
- _client_secret_enc="secret_a",
- scopes="openid email profile",
- is_enabled=True,
- auto_link_existing_accounts=True,
- auto_create_users=False,
- )
- db_session.add(prov_a)
- await db_session.flush()
- # ── 4. OIDC state for Provider A ──────────────────────────────────
- state = secrets.token_urlsafe(32)
- nonce = secrets.token_urlsafe(32)
- code_verifier = secrets.token_urlsafe(48)
- db_session.add(
- AuthEphemeralToken(
- token=state,
- token_type="oidc_state",
- provider_id=prov_a.id,
- nonce=nonce,
- code_verifier=code_verifier,
- expires_at=datetime.now(timezone.utc) + timedelta(minutes=10),
- )
- )
- await db_session.commit()
- # ── 5. Mock HTTP + JWT so the callback can reach the auto-link check ─
- fake_discovery = {
- "issuer": "https://providera-m6.example.com",
- "token_endpoint": "https://providera-m6.example.com/token",
- "jwks_uri": "https://providera-m6.example.com/jwks",
- }
- fake_token = {"access_token": "acc_tok", "id_token": "fake.id.token"}
- fake_claims = {
- "sub": "attacker_sub_unique",
- "email": "alinktest@example.com",
- "email_verified": True,
- "nonce": nonce,
- "iss": "https://providera-m6.example.com",
- "aud": "client_a",
- "exp": 9_999_999_999,
- }
- disc_resp = AsyncMock()
- disc_resp.raise_for_status = MagicMock()
- disc_resp.json = MagicMock(return_value=fake_discovery)
- token_resp = AsyncMock()
- token_resp.ok = True
- token_resp.json = MagicMock(return_value=fake_token)
- jwks_resp = AsyncMock()
- jwks_resp.raise_for_status = MagicMock()
- jwks_resp.json = MagicMock(return_value={})
- mock_http = AsyncMock()
- mock_http.get = AsyncMock(side_effect=[disc_resp, jwks_resp])
- mock_http.post = AsyncMock(return_value=token_resp)
- mock_signing_key = MagicMock()
- mock_signing_key.key = "fake_key"
- with (
- patch("backend.app.api.routes.mfa.httpx.AsyncClient") as mock_httpx_cls,
- patch("backend.app.api.routes.mfa.jwt.decode", return_value=fake_claims),
- patch("backend.app.api.routes.mfa.PyJWKClient") as mock_jwks_cls,
- ):
- mock_httpx_cls.return_value.__aenter__ = AsyncMock(return_value=mock_http)
- mock_httpx_cls.return_value.__aexit__ = AsyncMock(return_value=False)
- mock_jwks_cls.return_value.get_signing_key_from_jwt.return_value = mock_signing_key
- resp = await async_client.get(
- f"/api/v1/auth/oidc/callback?code=fake_code&state={state}",
- follow_redirects=False,
- )
- # M-NEW-6: must redirect with no_linked_account — NOT create a second link
- assert resp.status_code == 302
- location = resp.headers.get("location", "")
- assert "no_linked_account" in location, f"Expected no_linked_account in redirect, got: {location}"
- # Verify no second OIDC link was created for Provider A
- from sqlalchemy import select as sa_select
- from backend.app.models.oidc_provider import UserOIDCLink as _UOL
- async with db_session as s:
- links_result = await s.execute(
- sa_select(_UOL).where(_UOL.user_id == target.id, _UOL.provider_id == prov_a.id)
- )
- assert links_result.scalar_one_or_none() is None, "No link to Provider A must exist"
- # ===========================================================================
- # Test Gap 1: OIDC state token is single-use — replay must be rejected
- # ===========================================================================
- class TestOIDCStateReplay:
- """OIDC state token must be consumed on first use; a second callback with
- the same state must redirect to ``?oidc_error=invalid_state``."""
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_state_replay_rejected(self, async_client: AsyncClient, db_session: AsyncSession):
- """Replaying a consumed OIDC state token must return invalid_state."""
- from backend.app.models.oidc_provider import OIDCProvider
- # ── 1. Seed a minimal provider ────────────────────────────────────
- provider = OIDCProvider(
- name="StateReplayIdP",
- issuer_url="https://statereplay-idp.example.com",
- client_id="client_replay",
- _client_secret_enc="secret_replay",
- scopes="openid",
- is_enabled=True,
- auto_link_existing_accounts=False,
- auto_create_users=False,
- )
- db_session.add(provider)
- await db_session.flush()
- # ── 2. Seed an OIDC state token ───────────────────────────────────
- state = secrets.token_urlsafe(32)
- db_session.add(
- AuthEphemeralToken(
- token=state,
- token_type="oidc_state",
- provider_id=provider.id,
- nonce=secrets.token_urlsafe(32),
- code_verifier=secrets.token_urlsafe(48),
- expires_at=datetime.now(timezone.utc) + timedelta(minutes=10),
- )
- )
- await db_session.commit()
- # ── 3. First callback — discovery will fail (no real IdP), but the
- # state token is atomically consumed (DELETE…RETURNING + commit)
- # before the HTTP call is attempted.
- first = await async_client.get(
- f"/api/v1/auth/oidc/callback?code=any_code&state={state}",
- follow_redirects=False,
- )
- assert first.status_code == 302
- # The first call may fail for any reason except invalid_state
- assert "invalid_state" not in first.headers.get("location", ""), (
- f"First call should NOT get invalid_state: {first.headers.get('location')}"
- )
- # ── 4. Second callback with the same state → must be invalid_state ─
- second = await async_client.get(
- f"/api/v1/auth/oidc/callback?code=any_code&state={state}",
- follow_redirects=False,
- )
- assert second.status_code == 302
- assert "invalid_state" in second.headers.get("location", ""), (
- f"Replayed state must redirect to invalid_state, got: {second.headers.get('location')}"
- )
- # ===========================================================================
- # Test Gap 2: OIDC iss claim mismatch must redirect to token_validation_failed
- # ===========================================================================
- class TestOIDCIssMismatch:
- """JWT whose iss claim does not match the discovery issuer must be rejected."""
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_iss_mismatch_redirects_token_validation_failed(
- self, async_client: AsyncClient, db_session: AsyncSession
- ):
- import time
- from unittest.mock import patch
- import jwt as pyjwt
- private_pem, jwks_data = _make_test_rsa_key()
- correct_issuer = "https://correct-iss.example.com"
- wrong_issuer = "https://wrong-iss.example.com"
- client_id = "iss-mismatch-client"
- nonce = secrets.token_urlsafe(16)
- now = int(time.time())
- # Sign the token with the WRONG issuer (iss != discovery_issuer)
- id_token = pyjwt.encode(
- {
- "sub": "sub-iss-test",
- "iss": wrong_issuer,
- "aud": client_id,
- "nonce": nonce,
- "email": "iss@example.com",
- "email_verified": True,
- "iat": now,
- "exp": now + 300,
- },
- private_pem,
- algorithm="RS256",
- headers={"kid": "test-kid-1"},
- )
- admin_token = await _setup_and_login(async_client, "issadmin1", "issadmin1!")
- cr = await async_client.post(
- "/api/v1/auth/oidc/providers",
- json={
- "name": "IssTest-IdP",
- "issuer_url": correct_issuer,
- "client_id": client_id,
- "client_secret": "s",
- "scopes": "openid",
- "is_enabled": True,
- "auto_create_users": True,
- },
- headers=_auth_header(admin_token),
- )
- assert cr.status_code in (200, 201), cr.text
- provider_id = cr.json()["id"]
- state = secrets.token_urlsafe(32)
- db_session.add(
- AuthEphemeralToken(
- token=state,
- token_type="oidc_state",
- provider_id=provider_id,
- nonce=nonce,
- code_verifier=secrets.token_urlsafe(48),
- expires_at=datetime.now(timezone.utc) + timedelta(minutes=5),
- )
- )
- await db_session.commit()
- # Discovery returns the CORRECT issuer; JWT carries the WRONG one.
- discovery_doc = {
- "issuer": correct_issuer,
- "token_endpoint": f"{correct_issuer}/token",
- "jwks_uri": f"{correct_issuer}/.well-known/jwks.json",
- }
- token_response = {"access_token": "a", "id_token": id_token}
- class _MockResp:
- def __init__(self, data):
- self._data = data
- self.status_code = 200
- self.is_success = True
- self.text = ""
- def json(self):
- return self._data
- def raise_for_status(self):
- pass
- class _MockHttpxClient:
- def __init__(self, *a, **kw):
- pass
- async def __aenter__(self):
- return self
- async def __aexit__(self, *a):
- pass
- async def get(self, url, **kw):
- return _MockResp(jwks_data if "jwks" in url else discovery_doc)
- async def post(self, url, **kw):
- return _MockResp(token_response)
- with patch("backend.app.api.routes.mfa.httpx.AsyncClient", _MockHttpxClient):
- resp = await async_client.get(
- f"/api/v1/auth/oidc/callback?code=c&state={state}",
- follow_redirects=False,
- )
- assert resp.status_code == 302
- location = resp.headers.get("location", "")
- assert "token_validation_failed" in location, f"Expected token_validation_failed, got: {location}"
- # ===========================================================================
- # Test Gap 3: /forgot-password/confirm token is single-use
- # ===========================================================================
- class TestForgotPasswordTokenSingleUse:
- """POST /forgot-password/confirm must reject a token after its first use."""
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_token_reuse_rejected(self, async_client: AsyncClient, db_session: AsyncSession):
- from backend.app.core.auth import get_password_hash
- from backend.app.models.user import User as _User
- user = _User(
- username="fpcuser1",
- email="fpc@example.com",
- password_hash=get_password_hash("OldPass1!"),
- role="user",
- is_active=True,
- )
- db_session.add(user)
- await db_session.flush()
- reset_token = secrets.token_urlsafe(32)
- db_session.add(
- AuthEphemeralToken(
- token=reset_token,
- token_type="password_reset",
- username="fpcuser1",
- expires_at=datetime.now(timezone.utc) + timedelta(hours=1),
- )
- )
- await db_session.commit()
- # First use → success
- resp1 = await async_client.post(
- "/api/v1/auth/forgot-password/confirm",
- json={"token": reset_token, "new_password": "NewPass1!"},
- )
- assert resp1.status_code == 200, resp1.text
- # Second use → token already consumed, must fail
- resp2 = await async_client.post(
- "/api/v1/auth/forgot-password/confirm",
- json={"token": reset_token, "new_password": "AnotherNew1!"},
- )
- assert resp2.status_code == 400
- # ===========================================================================
- # C1 regression: setup_totp must reject a replayed TOTP code
- # ===========================================================================
- class TestSetupTOTPReplayRejected:
- """setup_totp must reject a TOTP code that was already accepted in its window."""
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_replayed_setup_code_rejected(self, async_client: AsyncClient, db_session: AsyncSession):
- from sqlalchemy import select as sa_select
- from backend.app.models.user_totp import UserTOTP
- token = await _setup_and_login(async_client, "setupreplay1", "setupreplay1!")
- # Step 1: Initial TOTP setup (no active TOTP yet → no code required)
- setup_resp = await async_client.post(
- "/api/v1/auth/2fa/totp/setup",
- headers=_auth_header(token),
- )
- assert setup_resp.status_code == 200
- secret = setup_resp.json()["secret"]
- # Step 2: Enable TOTP with a valid code
- totp_obj = pyotp.TOTP(secret)
- enable_resp = await async_client.post(
- "/api/v1/auth/2fa/totp/enable",
- json={"code": totp_obj.now()},
- headers=_auth_header(token),
- )
- assert enable_resp.status_code == 200 # TOTP is now active (is_enabled=True)
- # Step 3: Determine current valid code and its counter
- me_resp = await async_client.get("/api/v1/auth/me", headers=_auth_header(token))
- user_id = me_resp.json()["id"]
- totp_result = await db_session.execute(sa_select(UserTOTP).where(UserTOTP.user_id == user_id))
- totp_record = totp_result.scalar_one()
- secret_now = totp_record.secret # decrypted via property
- totp_now = pyotp.TOTP(secret_now)
- valid_code = totp_now.now()
- accepted_counter = totp_now.timecode(datetime.now(timezone.utc))
- # Step 4: Pre-set last_totp_counter so this code looks already used
- totp_record.last_totp_counter = accepted_counter
- await db_session.commit()
- # Step 5: Attempt setup_totp with the "already used" code → must be rejected
- replay_resp = await async_client.post(
- "/api/v1/auth/2fa/totp/setup",
- json={"code": valid_code},
- headers=_auth_header(token),
- )
- assert replay_resp.status_code == 400
- assert "already used" in replay_resp.json()["detail"]
- # ===========================================================================
- # Nit8: OIDC aud mismatch and nonce mismatch tests
- # ===========================================================================
- class TestOIDCAudAndNonceMismatch:
- """Nit8: aud != client_id and nonce != stored value must each fail the callback."""
- def _make_oidc_provider_setup(self):
- """Return a helper for building OIDC test fixtures inline."""
- private_pem, jwks_data = _make_test_rsa_key()
- return private_pem, jwks_data
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_aud_mismatch_redirects_token_validation_failed(
- self, async_client: AsyncClient, db_session: AsyncSession
- ):
- """ID token with aud != client_id must be rejected (PyJWT InvalidAudienceError)."""
- import time
- from unittest.mock import patch
- import jwt as pyjwt
- private_pem, jwks_data = _make_test_rsa_key()
- issuer = "https://aud-mismatch.example.com"
- client_id = "aud-test-client"
- wrong_aud = "some-other-client"
- nonce = secrets.token_urlsafe(16)
- now = int(time.time())
- id_token = pyjwt.encode(
- {
- "sub": "sub-aud-test",
- "iss": issuer,
- "aud": wrong_aud, # <-- wrong audience
- "nonce": nonce,
- "email": "aud@example.com",
- "email_verified": True,
- "iat": now,
- "exp": now + 300,
- },
- private_pem,
- algorithm="RS256",
- headers={"kid": "test-kid-1"},
- )
- admin_token = await _setup_and_login(async_client, "audmismatch_admin", "AudMismatch_admin1")
- cr = await async_client.post(
- "/api/v1/auth/oidc/providers",
- json={
- "name": "AudMismatch-IdP",
- "issuer_url": issuer,
- "client_id": client_id,
- "client_secret": "s",
- "scopes": "openid",
- "is_enabled": True,
- "auto_create_users": True,
- },
- headers=_auth_header(admin_token),
- )
- assert cr.status_code in (200, 201), cr.text
- provider_id = cr.json()["id"]
- state = secrets.token_urlsafe(32)
- db_session.add(
- AuthEphemeralToken(
- token=state,
- token_type="oidc_state",
- provider_id=provider_id,
- nonce=nonce,
- code_verifier=secrets.token_urlsafe(48),
- expires_at=datetime.now(timezone.utc) + timedelta(minutes=5),
- )
- )
- await db_session.commit()
- discovery_doc = {
- "issuer": issuer,
- "token_endpoint": f"{issuer}/token",
- "jwks_uri": f"{issuer}/.well-known/jwks.json",
- }
- class _MockResp:
- def __init__(self, data):
- self._data = data
- self.status_code = 200
- self.is_success = True
- self.text = ""
- def json(self):
- return self._data
- def raise_for_status(self):
- pass
- class _MockHttpxClient:
- def __init__(self, *a, **kw):
- pass
- async def __aenter__(self):
- return self
- async def __aexit__(self, *a):
- pass
- async def get(self, url, **kw):
- return _MockResp(jwks_data if "jwks" in url else discovery_doc)
- async def post(self, url, **kw):
- return _MockResp({"access_token": "a", "id_token": id_token})
- with patch("backend.app.api.routes.mfa.httpx.AsyncClient", _MockHttpxClient):
- resp = await async_client.get(
- f"/api/v1/auth/oidc/callback?code=c&state={state}",
- follow_redirects=False,
- )
- assert resp.status_code == 302
- location = resp.headers.get("location", "")
- assert "token_validation_failed" in location, (
- f"Expected token_validation_failed redirect for aud mismatch, got: {location}"
- )
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_nonce_mismatch_redirects_token_validation_failed(
- self, async_client: AsyncClient, db_session: AsyncSession
- ):
- """ID token with nonce != stored state nonce must be rejected."""
- import time
- from unittest.mock import patch
- import jwt as pyjwt
- private_pem, jwks_data = _make_test_rsa_key()
- issuer = "https://nonce-mismatch.example.com"
- client_id = "nonce-test-client"
- stored_nonce = secrets.token_urlsafe(16)
- wrong_nonce = secrets.token_urlsafe(16) # different from stored_nonce
- now = int(time.time())
- id_token = pyjwt.encode(
- {
- "sub": "sub-nonce-test",
- "iss": issuer,
- "aud": client_id,
- "nonce": wrong_nonce, # <-- does not match stored_nonce
- "email": "nonce@example.com",
- "email_verified": True,
- "iat": now,
- "exp": now + 300,
- },
- private_pem,
- algorithm="RS256",
- headers={"kid": "test-kid-1"},
- )
- admin_token = await _setup_and_login(async_client, "noncemismatch_admin", "NonceMismatch_admin1")
- cr = await async_client.post(
- "/api/v1/auth/oidc/providers",
- json={
- "name": "NonceMismatch-IdP",
- "issuer_url": issuer,
- "client_id": client_id,
- "client_secret": "s",
- "scopes": "openid",
- "is_enabled": True,
- "auto_create_users": True,
- },
- headers=_auth_header(admin_token),
- )
- assert cr.status_code in (200, 201), cr.text
- provider_id = cr.json()["id"]
- state = secrets.token_urlsafe(32)
- db_session.add(
- AuthEphemeralToken(
- token=state,
- token_type="oidc_state",
- provider_id=provider_id,
- nonce=stored_nonce, # state has correct nonce; JWT carries wrong_nonce
- code_verifier=secrets.token_urlsafe(48),
- expires_at=datetime.now(timezone.utc) + timedelta(minutes=5),
- )
- )
- await db_session.commit()
- discovery_doc = {
- "issuer": issuer,
- "token_endpoint": f"{issuer}/token",
- "jwks_uri": f"{issuer}/.well-known/jwks.json",
- }
- class _MockResp:
- def __init__(self, data):
- self._data = data
- self.status_code = 200
- self.is_success = True
- self.text = ""
- def json(self):
- return self._data
- def raise_for_status(self):
- pass
- class _MockHttpxClient:
- def __init__(self, *a, **kw):
- pass
- async def __aenter__(self):
- return self
- async def __aexit__(self, *a):
- pass
- async def get(self, url, **kw):
- return _MockResp(jwks_data if "jwks" in url else discovery_doc)
- async def post(self, url, **kw):
- return _MockResp({"access_token": "a", "id_token": id_token})
- with patch("backend.app.api.routes.mfa.httpx.AsyncClient", _MockHttpxClient):
- resp = await async_client.get(
- f"/api/v1/auth/oidc/callback?code=c&state={state}",
- follow_redirects=False,
- )
- assert resp.status_code == 302
- location = resp.headers.get("location", "")
- # The callback redirects to ?oidc_error=nonce_mismatch when nonces differ.
- assert "nonce_mismatch" in location, f"Expected nonce_mismatch redirect for nonce mismatch, got: {location}"
- # ===========================================================================
- # Expired OIDC token rejection — state and exchange tokens
- # ===========================================================================
- class TestOIDCExpiredTokenRejection:
- """Expired OIDC state and exchange tokens must be rejected atomically.
- The DELETE … WHERE expires_at > now must ensure that an already-expired
- token is never consumed (committed) before the expiry is checked, so the
- token row stays in the DB and is not silently discarded.
- """
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_expired_state_token_rejected_as_invalid_state(
- self, async_client: AsyncClient, db_session: AsyncSession
- ):
- """An expired OIDC state token must redirect to invalid_state without
- being consumed — it must still exist in the DB after the rejected call."""
- from backend.app.models.oidc_provider import OIDCProvider
- provider = OIDCProvider(
- name="ExpiredStateIdP",
- issuer_url="https://expired-state.example.com",
- client_id="client_expired_state",
- _client_secret_enc="secret_exp_state",
- scopes="openid",
- is_enabled=True,
- auto_link_existing_accounts=False,
- auto_create_users=False,
- )
- db_session.add(provider)
- await db_session.flush()
- state = secrets.token_urlsafe(32)
- db_session.add(
- AuthEphemeralToken(
- token=state,
- token_type="oidc_state",
- provider_id=provider.id,
- nonce=secrets.token_urlsafe(16),
- code_verifier=secrets.token_urlsafe(48),
- # already expired
- expires_at=datetime.now(timezone.utc) - timedelta(minutes=5),
- )
- )
- await db_session.commit()
- resp = await async_client.get(
- f"/api/v1/auth/oidc/callback?code=any_code&state={state}",
- follow_redirects=False,
- )
- assert resp.status_code == 302
- location = resp.headers.get("location", "")
- assert "invalid_state" in location, f"Expected invalid_state redirect for expired state, got: {location}"
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_expired_exchange_token_rejected(self, async_client: AsyncClient, db_session: AsyncSession):
- """An expired OIDC exchange token must return 401 without being consumed."""
- from sqlalchemy import select as sa_select
- expired_token = secrets.token_urlsafe(32)
- db_session.add(
- AuthEphemeralToken(
- token=expired_token,
- token_type="oidc_exchange",
- username="some_user",
- # already expired
- expires_at=datetime.now(timezone.utc) - timedelta(minutes=5),
- )
- )
- await db_session.commit()
- resp = await async_client.post(
- "/api/v1/auth/oidc/exchange",
- json={"oidc_token": expired_token},
- )
- assert resp.status_code == 401
- assert "expired" in resp.json().get("detail", "").lower() or "invalid" in resp.json().get("detail", "").lower()
- # Token must NOT have been consumed — it should still be in the DB
- # (the atomic DELETE WHERE expires_at > now left it untouched)
- result = await db_session.execute(
- sa_select(AuthEphemeralToken).where(AuthEphemeralToken.token == expired_token)
- )
- remaining = result.scalar_one_or_none()
- assert remaining is not None, "Expired exchange token must not be consumed by a rejected request"
- # ===========================================================================
- # Trailing slash in issuer_url — discovery URL must not contain double slash
- # ===========================================================================
- class TestOIDCIssuerUrlTrailingSlash:
- """Providers like Authentik use issuer URLs with a trailing slash.
- BamBuddy must strip the slash before appending /.well-known/openid-configuration
- to avoid a double-slash that results in a 404.
- """
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_trailing_slash_issuer_url_fetches_correct_discovery_url(self, async_client: AsyncClient):
- from unittest.mock import AsyncMock, MagicMock, patch
- issuer_with_slash = "https://authentik.example.com/application/o/bambuddy/"
- admin_token = await _setup_and_login(async_client, "oidcslashadm", "oidcslashadm1")
- create_resp = await async_client.post(
- "/api/v1/auth/oidc/providers",
- json={
- "name": "Authentik-Slash",
- "issuer_url": issuer_with_slash,
- "client_id": "bambuddy",
- "client_secret": "secret",
- "scopes": "openid email profile",
- "is_enabled": True,
- "auto_create_users": False,
- },
- headers=_auth_header(admin_token),
- )
- assert create_resp.status_code == 201
- provider_id = create_resp.json()["id"]
- fake_discovery = {
- "issuer": issuer_with_slash,
- "authorization_endpoint": "https://authentik.example.com/application/o/bambuddy/authorize",
- }
- disc_resp = AsyncMock()
- disc_resp.raise_for_status = MagicMock()
- disc_resp.json = MagicMock(return_value=fake_discovery)
- mock_http = AsyncMock()
- mock_http.get = AsyncMock(return_value=disc_resp)
- with patch("backend.app.api.routes.mfa.httpx.AsyncClient") as mock_cls:
- mock_cls.return_value.__aenter__ = AsyncMock(return_value=mock_http)
- mock_cls.return_value.__aexit__ = AsyncMock(return_value=False)
- resp = await async_client.get(f"/api/v1/auth/oidc/authorize/{provider_id}")
- assert resp.status_code == 200
- called_url = mock_http.get.call_args_list[0][0][0]
- assert "//" not in called_url.replace("https://", ""), (
- f"Discovery URL must not contain double slash: {called_url}"
- )
- assert called_url.endswith("/.well-known/openid-configuration"), (
- f"Expected discovery URL to end with /.well-known/openid-configuration, got: {called_url}"
- )
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_iss_claim_trailing_slash_accepted(self, async_client: AsyncClient, db_session: AsyncSession):
- """Provider configured without trailing slash, Authentik JWT iss has trailing slash.
- Both sides must be normalised before comparison so the login succeeds.
- """
- import time
- from unittest.mock import patch
- import jwt as pyjwt
- private_pem, jwks_data = _make_test_rsa_key()
- issuer_no_slash = "https://authentik.example.com/application/o/bambuddy"
- issuer_with_slash = issuer_no_slash + "/"
- client_id = "bambuddy-client"
- nonce = secrets.token_urlsafe(16)
- now = int(time.time())
- id_token = pyjwt.encode(
- {
- "sub": "authentik-sub-123",
- "iss": issuer_with_slash,
- "aud": client_id,
- "nonce": nonce,
- "email": "authentik-user@example.com",
- "email_verified": True,
- "iat": now,
- "exp": now + 300,
- },
- private_pem,
- algorithm="RS256",
- headers={"kid": "test-kid-1"},
- )
- admin_token = await _setup_and_login(async_client, "authentikadm", "authentikadm1")
- create_resp = await async_client.post(
- "/api/v1/auth/oidc/providers",
- json={
- "name": "Authentik-ISS",
- "issuer_url": issuer_no_slash,
- "client_id": client_id,
- "client_secret": "secret",
- "scopes": "openid email profile",
- "is_enabled": True,
- "auto_create_users": True,
- },
- headers=_auth_header(admin_token),
- )
- assert create_resp.status_code == 201
- provider_id = create_resp.json()["id"]
- state = secrets.token_urlsafe(32)
- code_verifier = secrets.token_urlsafe(48)
- db_session.add(
- AuthEphemeralToken(
- token=state,
- token_type="oidc_state",
- provider_id=provider_id,
- nonce=nonce,
- code_verifier=code_verifier,
- expires_at=datetime.now(timezone.utc) + timedelta(minutes=5),
- )
- )
- await db_session.commit()
- discovery_doc = {
- "issuer": issuer_with_slash,
- "authorization_endpoint": f"{issuer_no_slash}/authorize",
- "token_endpoint": f"{issuer_no_slash}/token",
- "jwks_uri": f"{issuer_no_slash}/.well-known/jwks.json",
- }
- token_response = {"access_token": "mock", "token_type": "Bearer", "id_token": id_token}
- class _MockResp:
- def __init__(self, data):
- self._data = data
- self.is_success = True
- self.status_code = 200
- self.text = str(data)
- def json(self):
- return self._data
- def raise_for_status(self):
- pass
- class _MockHttpxClient:
- def __init__(self, *a, **kw):
- pass
- async def __aenter__(self):
- return self
- async def __aexit__(self, *a):
- pass
- async def get(self, url, **kw):
- return _MockResp(jwks_data if "jwks" in url else discovery_doc)
- async def post(self, url, **kw):
- return _MockResp(token_response)
- with patch("backend.app.api.routes.mfa.httpx.AsyncClient", _MockHttpxClient):
- resp = await async_client.get(
- f"/api/v1/auth/oidc/callback?code=auth-code&state={state}",
- follow_redirects=False,
- )
- location = resp.headers.get("location", "")
- assert resp.status_code == 302, f"Expected redirect, got {resp.status_code}"
- assert "token_validation_failed" not in location, (
- "Trailing slash mismatch in iss claim must not cause token_validation_failed"
- )
- assert "oidc_token=" in location, f"Expected oidc_token in redirect, got: {location}"
- class TestOIDCCallbackCodeLength:
- """OIDC callback code/state query params must accept up to 2048 characters (OAuth spec)."""
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_code_512_chars_accepted(self, async_client: AsyncClient):
- """A 512-character code (old limit) must not be rejected with 422."""
- code = "a" * 512
- resp = await async_client.get(
- f"/api/v1/auth/oidc/callback?code={code}&state=bogus-state",
- follow_redirects=False,
- )
- assert resp.status_code != 422, "512-char code must not be rejected by Pydantic"
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_code_2048_chars_accepted(self, async_client: AsyncClient):
- """A 2048-character code must not be rejected with 422."""
- code = "a" * 2048
- resp = await async_client.get(
- f"/api/v1/auth/oidc/callback?code={code}&state=bogus-state",
- follow_redirects=False,
- )
- assert resp.status_code != 422, "2048-char code must not be rejected by Pydantic"
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_code_2049_chars_rejected(self, async_client: AsyncClient):
- """A 2049-character code must be rejected with 422."""
- code = "a" * 2049
- resp = await async_client.get(
- f"/api/v1/auth/oidc/callback?code={code}&state=bogus-state",
- follow_redirects=False,
- )
- assert resp.status_code == 422, "2049-char code must be rejected by Pydantic"
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_state_512_chars_accepted(self, async_client: AsyncClient):
- """A 512-character state (old limit) must not be rejected with 422."""
- state = "a" * 512
- resp = await async_client.get(
- f"/api/v1/auth/oidc/callback?code=bogus-code&state={state}",
- follow_redirects=False,
- )
- assert resp.status_code != 422, "512-char state must not be rejected by Pydantic"
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_state_2048_chars_accepted(self, async_client: AsyncClient):
- """A 2048-character state must not be rejected with 422."""
- state = "a" * 2048
- resp = await async_client.get(
- f"/api/v1/auth/oidc/callback?code=bogus-code&state={state}",
- follow_redirects=False,
- )
- assert resp.status_code != 422, "2048-char state must not be rejected by Pydantic"
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_state_2049_chars_rejected(self, async_client: AsyncClient):
- """A 2049-character state must be rejected with 422."""
- state = "a" * 2049
- resp = await async_client.get(
- f"/api/v1/auth/oidc/callback?code=bogus-code&state={state}",
- follow_redirects=False,
- )
- assert resp.status_code == 422, "2049-char state must be rejected by Pydantic"
- # ---------------------------------------------------------------------------
- # Helpers shared by TestOIDCEmailClaimResolution
- # ---------------------------------------------------------------------------
- async def _run_oidc_callback(
- async_client: AsyncClient,
- db_session: AsyncSession,
- *,
- provider_id: int,
- claims: dict,
- private_pem: bytes,
- jwks_data: dict,
- issuer: str,
- client_id: str,
- ) -> str:
- """Run a full OIDC callback flow and return the redirect location."""
- nonce = secrets.token_urlsafe(16)
- now = int(time.time())
- token_claims = {
- "sub": claims.get("sub", f"sub-{secrets.token_hex(8)}"),
- "iss": issuer,
- "aud": client_id,
- "nonce": nonce,
- "iat": now,
- "exp": now + 300,
- **{k: v for k, v in claims.items() if k not in ("sub",)},
- }
- id_token = pyjwt.encode(token_claims, private_pem, algorithm="RS256", headers={"kid": "test-kid-1"})
- state = secrets.token_urlsafe(32)
- code_verifier = secrets.token_urlsafe(48)
- db_session.add(
- AuthEphemeralToken(
- token=state,
- token_type="oidc_state",
- provider_id=provider_id,
- nonce=nonce,
- code_verifier=code_verifier,
- expires_at=datetime.now(timezone.utc) + timedelta(minutes=5),
- )
- )
- await db_session.commit()
- discovery_doc = {
- "issuer": issuer,
- "authorization_endpoint": f"{issuer}/auth",
- "token_endpoint": f"{issuer}/token",
- "jwks_uri": f"{issuer}/.well-known/jwks.json",
- }
- token_response = {"access_token": "mock-access", "token_type": "Bearer", "id_token": id_token}
- class _R:
- def __init__(self, data):
- self._data = data
- self.status_code = 200
- self.is_success = True
- self.text = str(data)
- def json(self):
- return self._data
- def raise_for_status(self):
- pass
- class _C:
- def __init__(self, *a, **kw):
- pass
- async def __aenter__(self):
- return self
- async def __aexit__(self, *a):
- pass
- async def get(self, url, **kw):
- return _R(jwks_data if "jwks" in url else discovery_doc)
- async def post(self, url, **kw):
- return _R(token_response)
- with patch("backend.app.api.routes.mfa.httpx.AsyncClient", _C):
- resp = await async_client.get(
- f"/api/v1/auth/oidc/callback?code=test-code&state={state}",
- follow_redirects=False,
- )
- return resp.headers.get("location", "")
- class TestOIDCEmailClaimResolution:
- """Three-case email resolution logic: Fall A / Fall B / Fall C."""
- # ── shared helpers ────────────────────────────────────────────────────────
- async def _create_provider(
- self,
- async_client: AsyncClient,
- admin_token: str,
- issuer: str,
- client_id: str,
- *,
- email_claim: str = "email",
- require_email_verified: bool = True,
- auto_link_existing_accounts: bool = False,
- suffix: str = "",
- ) -> int:
- resp = await async_client.post(
- "/api/v1/auth/oidc/providers",
- json={
- "name": f"TestIdP-{suffix or secrets.token_hex(4)}",
- "issuer_url": issuer,
- "client_id": client_id,
- "client_secret": "sec",
- "scopes": "openid email profile",
- "is_enabled": True,
- "auto_create_users": True,
- "auto_link_existing_accounts": auto_link_existing_accounts,
- "email_claim": email_claim,
- "require_email_verified": require_email_verified,
- },
- headers={"Authorization": f"Bearer {admin_token}"},
- )
- assert resp.status_code == 201, resp.text
- return resp.json()["id"]
- async def _get_oidc_link(self, db_session: AsyncSession, provider_id: int, sub: str):
- from sqlalchemy import select
- from backend.app.models.oidc_provider import UserOIDCLink
- result = await db_session.execute(
- select(UserOIDCLink)
- .where(UserOIDCLink.provider_id == provider_id)
- .where(UserOIDCLink.provider_user_id == sub)
- )
- return result.scalar_one_or_none()
- # ── Parametrized matrix: Fall A / Fall B / Fall C ─────────────────────────
- @pytest.mark.asyncio
- @pytest.mark.integration
- @pytest.mark.parametrize(
- "email_claim,require_ev,claims,expected",
- [
- # Fall A: standard claim + require_ev=True (default)
- ("email", True, {"email": "fa@example.com", "email_verified": True}, "fa@example.com"),
- ("email", True, {"email": "fa@example.com", "email_verified": False}, None),
- ("email", True, {"email": "fa@example.com"}, None), # Azure Entra with default config
- # Fall A + SEC-2: malformed email claim rejected even when email_verified=True
- ("email", True, {"email": "notanemail", "email_verified": True}, None),
- # Fall B: standard claim + require_ev=False (Azure Entra permissive)
- ("email", False, {"email": "fb@example.com", "email_verified": True}, "fb@example.com"),
- ("email", False, {"email": "fb@example.com", "email_verified": False}, None),
- ("email", False, {"email": "azure@company.com"}, "azure@company.com"), # ev absent → kept
- # Fall B + SEC-2: malformed email claim rejected in permissive mode (ev absent)
- ("email", False, {"email": "user@nodot"}, None),
- # Fall B + SEC-2: shape check fires before email_verified=False drop
- ("email", False, {"email": "notanemail", "email_verified": False}, None),
- # Fall C: custom claim (preferred_username) — no email_verified check
- ("preferred_username", True, {"preferred_username": "User@Company.COM"}, "user@company.com"),
- ("preferred_username", True, {"preferred_username": " User@EXAMPLE.COM "}, "user@example.com"),
- ("preferred_username", True, {"preferred_username": "justausername"}, None),
- ("preferred_username", True, {"preferred_username": "@"}, None), # SEC-2: "@" only
- ("preferred_username", True, {"preferred_username": "@domain.com"}, None), # SEC-2: empty local
- ("preferred_username", True, {"preferred_username": "user@"}, None), # SEC-2: empty domain
- ("preferred_username", True, {"preferred_username": "user@nodot"}, None), # SEC-2: no dot in domain
- ("preferred_username", True, {}, None), # claim absent
- # Fall C: email_verified=False present alongside custom claim — must NOT suppress the email
- ("preferred_username", True, {"preferred_username": "user@co.com", "email_verified": False}, "user@co.com"),
- ],
- ids=[
- "fall-a-ev-true",
- "fall-a-ev-false",
- "fall-a-ev-absent",
- "fall-a-malformed-email",
- "fall-b-ev-true",
- "fall-b-ev-false",
- "fall-b-ev-absent",
- "fall-b-malformed-email",
- "fall-b-malformed-email-ev-false",
- "fall-c-valid-upn",
- "fall-c-lowercase-strip",
- "fall-c-no-at",
- "fall-c-at-only",
- "fall-c-empty-local",
- "fall-c-empty-domain",
- "fall-c-no-dot-in-domain",
- "fall-c-claim-absent",
- "fall-c-ev-false-ignored",
- ],
- )
- async def test_email_resolution_matrix(
- self,
- async_client: AsyncClient,
- db_session: AsyncSession,
- email_claim: str,
- require_ev: bool,
- claims: dict,
- expected: str | None,
- ):
- """C4: Verify link exists AND check provider_email — avoids false-passing on callback failure."""
- issuer = "https://matrix.test"
- client_id = "matrix-client"
- admin_token = await _setup_and_login(async_client, "matrix_adm", "Matrix123!")
- private_pem, jwks_data = _make_test_rsa_key()
- provider_id = await self._create_provider(
- async_client,
- admin_token,
- issuer,
- client_id,
- email_claim=email_claim,
- require_email_verified=require_ev,
- suffix="matrix",
- )
- sub = f"sub-matrix-{secrets.token_hex(6)}"
- await _run_oidc_callback(
- async_client,
- db_session,
- provider_id=provider_id,
- claims={"sub": sub, **claims},
- private_pem=private_pem,
- jwks_data=jwks_data,
- issuer=issuer,
- client_id=client_id,
- )
- db_session.expire_all()
- link = await self._get_oidc_link(db_session, provider_id, sub)
- assert link is not None, "UserOIDCLink must be created even when email is dropped"
- assert link.provider_email == expected
- # ── Security: auto_link guards (CREATE endpoint) ──────────────────────────
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_auto_link_blocked_with_require_ev_false(self, async_client: AsyncClient):
- """SEC-1: auto_link + require_email_verified=False must be rejected at schema level (422)."""
- admin_token = await _setup_and_login(async_client, "sec1_adm", "Sec1Adm123!")
- resp = await async_client.post(
- "/api/v1/auth/oidc/providers",
- json={
- "name": "SEC1-Test",
- "issuer_url": "https://sec1.test",
- "client_id": "sec1-client",
- "client_secret": "sec",
- "scopes": "openid email profile",
- "auto_link_existing_accounts": True,
- "require_email_verified": False,
- },
- headers={"Authorization": f"Bearer {admin_token}"},
- )
- assert resp.status_code == 422
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_auto_link_allowed_with_custom_claim_create(self, async_client: AsyncClient):
- """Fall C: auto_link + email_claim!='email' must be accepted on CREATE (201).
- Custom claims (e.g. Azure preferred_username/upn) never perform an email_verified
- check, so auto_link is safe regardless of require_email_verified.
- """
- admin_token = await _setup_and_login(async_client, "sec6c_adm", "Sec6CAdm123!")
- resp = await async_client.post(
- "/api/v1/auth/oidc/providers",
- json={
- "name": "SEC6-Create-Test",
- "issuer_url": "https://sec6c.test",
- "client_id": "sec6c-client",
- "client_secret": "sec",
- "scopes": "openid email profile",
- "auto_link_existing_accounts": True,
- "email_claim": "upn",
- },
- headers={"Authorization": f"Bearer {admin_token}"},
- )
- assert resp.status_code == 201
- assert resp.json()["auto_link_existing_accounts"] is True
- assert resp.json()["email_claim"] == "upn"
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_auto_link_allowed_with_custom_claim_update(self, async_client: AsyncClient):
- """Fall C: auto_link=True + email_claim='upn' in same UPDATE request → 200.
- Custom claims never perform an email_verified check, so auto_link is safe.
- """
- admin_token = await _setup_and_login(async_client, "sec6u_adm", "Sec6UAdm123!")
- create_resp = await async_client.post(
- "/api/v1/auth/oidc/providers",
- json={
- "name": "SEC6-Update-Test",
- "issuer_url": "https://sec6u.test",
- "client_id": "sec6u-client",
- "client_secret": "sec",
- "scopes": "openid email profile",
- },
- headers={"Authorization": f"Bearer {admin_token}"},
- )
- assert create_resp.status_code == 201
- provider_id = create_resp.json()["id"]
- resp = await async_client.put(
- f"/api/v1/auth/oidc/providers/{provider_id}",
- json={"auto_link_existing_accounts": True, "email_claim": "upn"},
- headers={"Authorization": f"Bearer {admin_token}"},
- )
- assert resp.status_code == 200
- assert resp.json()["auto_link_existing_accounts"] is True
- assert resp.json()["email_claim"] == "upn"
- # ── Combined-State-Guard (partial updates across two requests) ─────────────
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_partial_update_guard_require_ev(self, async_client: AsyncClient):
- """SEC-1 Combined-State-Guard: require_ev=False then auto_link=True → 422 (T1 require_ev path)."""
- admin_token = await _setup_and_login(async_client, "pg_rev_adm", "PgRev123!")
- create_resp = await async_client.post(
- "/api/v1/auth/oidc/providers",
- json={
- "name": "PG-RequireEV-Test",
- "issuer_url": "https://pg-rev.test",
- "client_id": "pg-rev-client",
- "client_secret": "sec",
- "scopes": "openid email profile",
- },
- headers={"Authorization": f"Bearer {admin_token}"},
- )
- assert create_resp.status_code == 201
- provider_id = create_resp.json()["id"]
- upd1 = await async_client.put(
- f"/api/v1/auth/oidc/providers/{provider_id}",
- json={"require_email_verified": False},
- headers={"Authorization": f"Bearer {admin_token}"},
- )
- assert upd1.status_code == 200
- upd2 = await async_client.put(
- f"/api/v1/auth/oidc/providers/{provider_id}",
- json={"auto_link_existing_accounts": True},
- headers={"Authorization": f"Bearer {admin_token}"},
- )
- assert upd2.status_code == 422
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_partial_update_custom_claim_then_auto_link_allowed(self, async_client: AsyncClient):
- """Fall C: email_claim='upn' first, then auto_link=True → both 200 (custom claim is safe)."""
- admin_token = await _setup_and_login(async_client, "pg_ec_adm", "PgEc123!")
- create_resp = await async_client.post(
- "/api/v1/auth/oidc/providers",
- json={
- "name": "PG-EmailClaim-Test",
- "issuer_url": "https://pg-ec.test",
- "client_id": "pg-ec-client",
- "client_secret": "sec",
- "scopes": "openid email profile",
- },
- headers={"Authorization": f"Bearer {admin_token}"},
- )
- assert create_resp.status_code == 201
- provider_id = create_resp.json()["id"]
- upd1 = await async_client.put(
- f"/api/v1/auth/oidc/providers/{provider_id}",
- json={"email_claim": "upn"},
- headers={"Authorization": f"Bearer {admin_token}"},
- )
- assert upd1.status_code == 200
- upd2 = await async_client.put(
- f"/api/v1/auth/oidc/providers/{provider_id}",
- json={"auto_link_existing_accounts": True},
- headers={"Authorization": f"Bearer {admin_token}"},
- )
- assert upd2.status_code == 200
- assert upd2.json()["auto_link_existing_accounts"] is True
- assert upd2.json()["email_claim"] == "upn"
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_partial_update_auto_link_then_custom_claim_allowed(self, async_client: AsyncClient):
- """Fall C: auto_link=True first (email_claim='email', safe), then email_claim='upn' → both 200."""
- admin_token = await _setup_and_login(async_client, "pg_al_ec_adm", "PgAlEc123!")
- create_resp = await async_client.post(
- "/api/v1/auth/oidc/providers",
- json={
- "name": "PG-AutoLink-Claim-Test",
- "issuer_url": "https://pg-al-ec.test",
- "client_id": "pg-al-ec-client",
- "client_secret": "sec",
- "scopes": "openid email profile",
- },
- headers={"Authorization": f"Bearer {admin_token}"},
- )
- assert create_resp.status_code == 201
- provider_id = create_resp.json()["id"]
- upd1 = await async_client.put(
- f"/api/v1/auth/oidc/providers/{provider_id}",
- json={"auto_link_existing_accounts": True},
- headers={"Authorization": f"Bearer {admin_token}"},
- )
- assert upd1.status_code == 200
- upd2 = await async_client.put(
- f"/api/v1/auth/oidc/providers/{provider_id}",
- json={"email_claim": "preferred_username"},
- headers={"Authorization": f"Bearer {admin_token}"},
- )
- assert upd2.status_code == 200
- assert upd2.json()["auto_link_existing_accounts"] is True
- assert upd2.json()["email_claim"] == "preferred_username"
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_partial_update_guard_inverse_order(self, async_client: AsyncClient):
- """T2: auto_link=True first (valid), then require_ev=False → Combined-State-Guard fires (422)."""
- admin_token = await _setup_and_login(async_client, "pg_inv_adm", "PgInv123!")
- create_resp = await async_client.post(
- "/api/v1/auth/oidc/providers",
- json={
- "name": "PG-Inverse-Test",
- "issuer_url": "https://pg-inv.test",
- "client_id": "pg-inv-client",
- "client_secret": "sec",
- "scopes": "openid email profile",
- },
- headers={"Authorization": f"Bearer {admin_token}"},
- )
- assert create_resp.status_code == 201
- provider_id = create_resp.json()["id"]
- # auto_link=True is safe when require_ev=True (default)
- upd1 = await async_client.put(
- f"/api/v1/auth/oidc/providers/{provider_id}",
- json={"auto_link_existing_accounts": True},
- headers={"Authorization": f"Bearer {admin_token}"},
- )
- assert upd1.status_code == 200
- # Disabling require_ev with auto_link already on → unsafe combined state
- upd2 = await async_client.put(
- f"/api/v1/auth/oidc/providers/{provider_id}",
- json={"require_email_verified": False},
- headers={"Authorization": f"Bearer {admin_token}"},
- )
- assert upd2.status_code == 422
- # ── Low5: Response fields verified ───────────────────────────────────────
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_create_response_includes_new_fields(self, async_client: AsyncClient):
- """Low5: OIDCProviderResponse must include email_claim and require_email_verified."""
- admin_token = await _setup_and_login(async_client, "resp_adm", "Resp123!")
- resp = await async_client.post(
- "/api/v1/auth/oidc/providers",
- json={
- "name": "ResponseFields-Test",
- "issuer_url": "https://resp.test",
- "client_id": "resp-client",
- "client_secret": "sec",
- "scopes": "openid email profile",
- "email_claim": "preferred_username",
- "require_email_verified": False,
- },
- headers={"Authorization": f"Bearer {admin_token}"},
- )
- assert resp.status_code == 201
- data = resp.json()
- assert data["email_claim"] == "preferred_username"
- assert data["require_email_verified"] is False
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_update_response_reflects_new_fields(self, async_client: AsyncClient):
- """Low5: PUT response must reflect updated email_claim and require_email_verified."""
- admin_token = await _setup_and_login(async_client, "upd_resp_adm", "UpdResp123!")
- create_resp = await async_client.post(
- "/api/v1/auth/oidc/providers",
- json={
- "name": "UpdateResponse-Test",
- "issuer_url": "https://upd-resp.test",
- "client_id": "upd-resp-client",
- "client_secret": "sec",
- "scopes": "openid email profile",
- },
- headers={"Authorization": f"Bearer {admin_token}"},
- )
- assert create_resp.status_code == 201
- provider_id = create_resp.json()["id"]
- upd = await async_client.put(
- f"/api/v1/auth/oidc/providers/{provider_id}",
- json={"email_claim": "upn", "require_email_verified": True},
- headers={"Authorization": f"Bearer {admin_token}"},
- )
- assert upd.status_code == 200
- data = upd.json()
- assert data["email_claim"] == "upn"
- assert data["require_email_verified"] is True
- # ===========================================================================
- # TestOIDCEmailClaimValidation — T2: email_claim field validator coverage
- # ===========================================================================
- class TestOIDCEmailClaimValidation:
- """Schema-level validation for the email_claim field."""
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_invalid_claim_name_dot_rejected(self, async_client: AsyncClient):
- """email_claim with a dot (log-injection risk) must be rejected."""
- admin_token = await _setup_and_login(async_client, "ecv_adm1", "Ecv123!")
- resp = await async_client.post(
- "/api/v1/auth/oidc/providers",
- json={
- "name": "ECVTest1",
- "issuer_url": "https://ecv1.test",
- "client_id": "ecv1",
- "client_secret": "sec",
- "scopes": "openid email",
- "email_claim": "email.address",
- },
- headers={"Authorization": f"Bearer {admin_token}"},
- )
- assert resp.status_code == 422
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_invalid_claim_name_starts_with_digit_rejected(self, async_client: AsyncClient):
- admin_token = await _setup_and_login(async_client, "ecv_adm2", "Ecv123!")
- resp = await async_client.post(
- "/api/v1/auth/oidc/providers",
- json={
- "name": "ECVTest2",
- "issuer_url": "https://ecv2.test",
- "client_id": "ecv2",
- "client_secret": "sec",
- "scopes": "openid email",
- "email_claim": "1invalid",
- },
- headers={"Authorization": f"Bearer {admin_token}"},
- )
- assert resp.status_code == 422
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_invalid_claim_name_newline_rejected(self, async_client: AsyncClient):
- """T2 regex-bug guard: re.fullmatch must reject trailing newline."""
- admin_token = await _setup_and_login(async_client, "ecv_adm3", "Ecv123!")
- resp = await async_client.post(
- "/api/v1/auth/oidc/providers",
- json={
- "name": "ECVTest3",
- "issuer_url": "https://ecv3.test",
- "client_id": "ecv3",
- "client_secret": "sec",
- "scopes": "openid email",
- "email_claim": "email\n",
- },
- headers={"Authorization": f"Bearer {admin_token}"},
- )
- assert resp.status_code == 422
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_claim_name_65_chars_rejected(self, async_client: AsyncClient):
- """email_claim longer than 64 characters must be rejected."""
- admin_token = await _setup_and_login(async_client, "ecv_adm4", "Ecv123!")
- resp = await async_client.post(
- "/api/v1/auth/oidc/providers",
- json={
- "name": "ECVTest4",
- "issuer_url": "https://ecv4.test",
- "client_id": "ecv4",
- "client_secret": "sec",
- "scopes": "openid email",
- "email_claim": "a" * 65,
- },
- headers={"Authorization": f"Bearer {admin_token}"},
- )
- assert resp.status_code == 422
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_valid_claim_name_accepted(self, async_client: AsyncClient):
- """Valid claim names like preferred_username and upn must be accepted."""
- admin_token = await _setup_and_login(async_client, "ecv_adm5", "Ecv123!")
- for claim in ("preferred_username", "upn", "email", "emailAddress"):
- resp = await async_client.post(
- "/api/v1/auth/oidc/providers",
- json={
- "name": f"ECVTest-{claim[:8]}",
- "issuer_url": f"https://ecv-{claim[:8]}.test",
- "client_id": f"ecv-{claim[:8]}",
- "client_secret": "sec",
- "scopes": "openid email",
- "email_claim": claim,
- },
- headers={"Authorization": f"Bearer {admin_token}"},
- )
- assert resp.status_code == 201, f"claim {claim!r} was rejected: {resp.text}"
- # ===========================================================================
- # TestOIDCEmailResolutionExtra — T1 / T3 / T4 additional coverage
- # ===========================================================================
- async def _create_provider_via_api(
- async_client: AsyncClient,
- admin_token: str,
- issuer: str,
- client_id: str,
- *,
- email_claim: str = "email",
- require_email_verified: bool = True,
- suffix: str = "",
- ) -> int:
- resp = await async_client.post(
- "/api/v1/auth/oidc/providers",
- json={
- "name": f"TestIdP-extra-{suffix or secrets.token_hex(4)}",
- "issuer_url": issuer,
- "client_id": client_id,
- "client_secret": "sec",
- "scopes": "openid email profile",
- "is_enabled": True,
- "auto_create_users": True,
- "email_claim": email_claim,
- "require_email_verified": require_email_verified,
- },
- headers={"Authorization": f"Bearer {admin_token}"},
- )
- assert resp.status_code == 201, resp.text
- return resp.json()["id"]
- class TestOIDCEmailResolutionExtra:
- """T1: isinstance guard, T3: SEC-3 normalisation for Fall A/B, T4: inverse Combined-State-Guard."""
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_non_string_claim_value_drops_email(
- self,
- async_client: AsyncClient,
- db_session: AsyncSession,
- ):
- """T1: A non-string email_claim value (list) must be silently dropped — no crash."""
- private_pem, jwks_data = _make_test_rsa_key()
- issuer = "https://nonstring-test.example"
- client_id = "nonstring-client"
- admin_token = await _setup_and_login(async_client, "nonstr_adm", "Nonstr123!")
- provider_id = await _create_provider_via_api(
- async_client,
- admin_token,
- issuer,
- client_id,
- email_claim="preferred_username",
- require_email_verified=False,
- suffix="nonstr",
- )
- from sqlalchemy import select
- from backend.app.models.oidc_provider import UserOIDCLink
- # IdP sends preferred_username as a list (non-string) — must not crash
- location = await _run_oidc_callback(
- async_client,
- db_session,
- provider_id=provider_id,
- claims={"sub": "nonstr-sub-1", "preferred_username": ["user@example.com"]},
- private_pem=private_pem,
- jwks_data=jwks_data,
- issuer=issuer,
- client_id=client_id,
- )
- assert "internal_error" not in location, f"Unexpected error redirect: {location}"
- link_result = await db_session.execute(
- select(UserOIDCLink)
- .where(UserOIDCLink.provider_id == provider_id)
- .where(UserOIDCLink.provider_user_id == "nonstr-sub-1")
- )
- link = link_result.scalar_one_or_none()
- assert link is not None
- assert link.provider_email is None # list value dropped
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_fall_a_sec3_normalisation(
- self,
- async_client: AsyncClient,
- db_session: AsyncSession,
- ):
- """T3: Fall A — uppercase + whitespace in email claim must be normalised to lowercase."""
- private_pem, jwks_data = _make_test_rsa_key()
- issuer = "https://sec3a-test.example"
- client_id = "sec3a-client"
- admin_token = await _setup_and_login(async_client, "sec3a_adm", "Sec3a123!")
- provider_id = await _create_provider_via_api(
- async_client,
- admin_token,
- issuer,
- client_id,
- email_claim="email",
- require_email_verified=True,
- suffix="sec3a",
- )
- from sqlalchemy import select
- from backend.app.models.oidc_provider import UserOIDCLink
- location = await _run_oidc_callback(
- async_client,
- db_session,
- provider_id=provider_id,
- claims={"sub": "sec3a-sub-1", "email": " USER@EXAMPLE.COM ", "email_verified": True},
- private_pem=private_pem,
- jwks_data=jwks_data,
- issuer=issuer,
- client_id=client_id,
- )
- assert "internal_error" not in location
- link_result = await db_session.execute(
- select(UserOIDCLink)
- .where(UserOIDCLink.provider_id == provider_id)
- .where(UserOIDCLink.provider_user_id == "sec3a-sub-1")
- )
- link = link_result.scalar_one_or_none()
- assert link is not None
- assert link.provider_email == "user@example.com"
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_fall_b_sec3_normalisation(
- self,
- async_client: AsyncClient,
- db_session: AsyncSession,
- ):
- """T3: Fall B — uppercase + whitespace in email claim must be normalised to lowercase."""
- private_pem, jwks_data = _make_test_rsa_key()
- issuer = "https://sec3b-test.example"
- client_id = "sec3b-client"
- admin_token = await _setup_and_login(async_client, "sec3b_adm", "Sec3b123!")
- provider_id = await _create_provider_via_api(
- async_client,
- admin_token,
- issuer,
- client_id,
- email_claim="email",
- require_email_verified=False,
- suffix="sec3b",
- )
- from sqlalchemy import select
- from backend.app.models.oidc_provider import UserOIDCLink
- location = await _run_oidc_callback(
- async_client,
- db_session,
- provider_id=provider_id,
- claims={"sub": "sec3b-sub-1", "email": " USER@EXAMPLE.COM "},
- private_pem=private_pem,
- jwks_data=jwks_data,
- issuer=issuer,
- client_id=client_id,
- )
- assert "internal_error" not in location
- link_result = await db_session.execute(
- select(UserOIDCLink)
- .where(UserOIDCLink.provider_id == provider_id)
- .where(UserOIDCLink.provider_user_id == "sec3b-sub-1")
- )
- link = link_result.scalar_one_or_none()
- assert link is not None
- assert link.provider_email == "user@example.com"
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_combined_state_guard_email_claim_inverse_order(self, async_client: AsyncClient):
- """Fall C: auto_link=True first, then switch email_claim to custom → both 200 (now allowed).
- Custom claims never perform an email_verified check, so switching to a custom claim
- while auto_link is on transitions from Fall A to Fall C — both are safe.
- """
- admin_token = await _setup_and_login(async_client, "inv_ec_adm", "InvEc123!")
- create_resp = await async_client.post(
- "/api/v1/auth/oidc/providers",
- json={
- "name": "InvEcTest",
- "issuer_url": "https://inv-ec.test",
- "client_id": "inv-ec",
- "client_secret": "sec",
- "scopes": "openid email",
- },
- headers={"Authorization": f"Bearer {admin_token}"},
- )
- assert create_resp.status_code == 201
- provider_id = create_resp.json()["id"]
- # First: enable auto_link (Fall A — email_claim='email', require_ev=True)
- upd1 = await async_client.put(
- f"/api/v1/auth/oidc/providers/{provider_id}",
- json={"auto_link_existing_accounts": True},
- headers={"Authorization": f"Bearer {admin_token}"},
- )
- assert upd1.status_code == 200
- # Second: switch to custom claim → Fall C, still safe
- upd2 = await async_client.put(
- f"/api/v1/auth/oidc/providers/{provider_id}",
- json={"email_claim": "preferred_username"},
- headers={"Authorization": f"Bearer {admin_token}"},
- )
- assert upd2.status_code == 200
- assert upd2.json()["auto_link_existing_accounts"] is True
- assert upd2.json()["email_claim"] == "preferred_username"
- # ===========================================================================
- # Issue #1569: standard 'email' claim fallback for User.email during
- # auto-provisioning when email_claim is a non-email identity claim.
- # ===========================================================================
- class TestOIDCStandardEmailFallback:
- """Issue #1569: when email_claim is set to a non-email identity claim
- (e.g. preferred_username on Authentik) and the IdP token also carries a
- standard 'email' claim, auto-created users get User.email from the
- standard claim — without affecting the auto-link gate."""
- async def _get_user_and_link(self, db_session: AsyncSession, provider_id: int, sub: str):
- from sqlalchemy import select
- from backend.app.models.oidc_provider import UserOIDCLink
- from backend.app.models.user import User as UserModel
- link_result = await db_session.execute(
- select(UserOIDCLink)
- .where(UserOIDCLink.provider_id == provider_id)
- .where(UserOIDCLink.provider_user_id == sub)
- )
- link = link_result.scalar_one_or_none()
- if link is None:
- return None, None
- user_result = await db_session.execute(select(UserModel).where(UserModel.id == link.user_id))
- return user_result.scalar_one_or_none(), link
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_email_claim_preferred_username_falls_back_to_standard_email(
- self,
- async_client: AsyncClient,
- db_session: AsyncSession,
- ):
- """email_claim=preferred_username + both claims → username from preferred_username,
- email populated from the standard 'email' claim."""
- private_pem, jwks_data = _make_test_rsa_key()
- issuer = "https://issue1569-both.example"
- client_id = "issue1569-both-client"
- admin_token = await _setup_and_login(async_client, "i1569b_adm", "I1569b123!")
- provider_id = await _create_provider_via_api(
- async_client,
- admin_token,
- issuer,
- client_id,
- email_claim="preferred_username",
- require_email_verified=False,
- suffix="i1569both",
- )
- sub = f"sub-i1569-both-{secrets.token_hex(6)}"
- await _run_oidc_callback(
- async_client,
- db_session,
- provider_id=provider_id,
- claims={
- "sub": sub,
- "preferred_username": "jdoe",
- "email": "jdoe@example.com",
- "email_verified": True,
- },
- private_pem=private_pem,
- jwks_data=jwks_data,
- issuer=issuer,
- client_id=client_id,
- )
- db_session.expire_all()
- user, link = await self._get_user_and_link(db_session, provider_id, sub)
- assert user is not None and link is not None
- assert user.username == "jdoe"
- assert user.email == "jdoe@example.com"
- assert link.provider_email == "jdoe@example.com"
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_email_claim_preferred_username_no_standard_email_keeps_email_none(
- self,
- async_client: AsyncClient,
- db_session: AsyncSession,
- ):
- """email_claim=preferred_username + no standard 'email' claim → behaviour unchanged:
- username derived from preferred_username, User.email stays None."""
- private_pem, jwks_data = _make_test_rsa_key()
- issuer = "https://issue1569-noemail.example"
- client_id = "issue1569-noemail-client"
- admin_token = await _setup_and_login(async_client, "i1569n_adm", "I1569n123!")
- provider_id = await _create_provider_via_api(
- async_client,
- admin_token,
- issuer,
- client_id,
- email_claim="preferred_username",
- require_email_verified=False,
- suffix="i1569none",
- )
- sub = f"sub-i1569-none-{secrets.token_hex(6)}"
- await _run_oidc_callback(
- async_client,
- db_session,
- provider_id=provider_id,
- claims={"sub": sub, "preferred_username": "jdoe2"},
- private_pem=private_pem,
- jwks_data=jwks_data,
- issuer=issuer,
- client_id=client_id,
- )
- db_session.expire_all()
- user, link = await self._get_user_and_link(db_session, provider_id, sub)
- assert user is not None and link is not None
- assert user.username == "jdoe2"
- assert user.email is None
- assert link.provider_email is None
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_fallback_respects_email_verified_false(
- self,
- async_client: AsyncClient,
- db_session: AsyncSession,
- ):
- """email_claim=preferred_username + standard 'email' claim with email_verified=False
- → fallback drops the email (User.email None) — matches Fall B semantics."""
- private_pem, jwks_data = _make_test_rsa_key()
- issuer = "https://issue1569-evfalse.example"
- client_id = "issue1569-evfalse-client"
- admin_token = await _setup_and_login(async_client, "i1569f_adm", "I1569f123!")
- provider_id = await _create_provider_via_api(
- async_client,
- admin_token,
- issuer,
- client_id,
- email_claim="preferred_username",
- require_email_verified=False,
- suffix="i1569evfalse",
- )
- sub = f"sub-i1569-evfalse-{secrets.token_hex(6)}"
- await _run_oidc_callback(
- async_client,
- db_session,
- provider_id=provider_id,
- claims={
- "sub": sub,
- "preferred_username": "jdoe3",
- "email": "jdoe3@example.com",
- "email_verified": False,
- },
- private_pem=private_pem,
- jwks_data=jwks_data,
- issuer=issuer,
- client_id=client_id,
- )
- db_session.expire_all()
- user, link = await self._get_user_and_link(db_session, provider_id, sub)
- assert user is not None and link is not None
- assert user.username == "jdoe3"
- assert user.email is None
- assert link.provider_email is None
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_fallback_skipped_when_email_claim_is_email(
- self,
- async_client: AsyncClient,
- db_session: AsyncSession,
- ):
- """email_claim='email' (default) — fallback path must NOT fire. Primary
- resolver result is authoritative; this guards against the fallback
- accidentally widening Fall-A/B semantics."""
- private_pem, jwks_data = _make_test_rsa_key()
- issuer = "https://issue1569-default.example"
- client_id = "issue1569-default-client"
- admin_token = await _setup_and_login(async_client, "i1569d_adm", "I1569d123!")
- provider_id = await _create_provider_via_api(
- async_client,
- admin_token,
- issuer,
- client_id,
- email_claim="email",
- require_email_verified=True,
- suffix="i1569default",
- )
- sub = f"sub-i1569-default-{secrets.token_hex(6)}"
- # email_verified absent → Fall A drops it. Fallback must NOT
- # re-instate the email since email_claim='email' is already the
- # primary path.
- await _run_oidc_callback(
- async_client,
- db_session,
- provider_id=provider_id,
- claims={"sub": sub, "email": "jdoe4@example.com"},
- private_pem=private_pem,
- jwks_data=jwks_data,
- issuer=issuer,
- client_id=client_id,
- )
- db_session.expire_all()
- user, link = await self._get_user_and_link(db_session, provider_id, sub)
- assert user is not None and link is not None
- assert user.email is None
- assert link.provider_email is None
- # ===========================================================================
- # E2E: Fall C (custom email claim) auto-link actually links existing user
- # ===========================================================================
- class TestOIDCFallCAutoLinkE2E:
- """OIDC callback with email_claim='preferred_username' (Fall C / Azure Entra ID)
- must auto-link an existing local user when auto_link_existing_accounts=True.
- This test exercises _resolve_provider_email Fall C and the auto-link path in
- oidc_callback — a regression in either would silently drop the link without
- being caught by the configuration-layer tests.
- """
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_fall_c_auto_link_links_existing_user_via_callback(
- self, async_client: AsyncClient, db_session: AsyncSession
- ):
- from unittest.mock import AsyncMock, MagicMock, patch
- from sqlalchemy import select as sa_select
- from backend.app.core.auth import get_password_hash
- from backend.app.models.oidc_provider import OIDCProvider, UserOIDCLink
- issuer = "https://entra.fallc.example.com"
- nonce = secrets.token_urlsafe(32)
- code_verifier = secrets.token_urlsafe(48)
- # ── 1. Local user that should be linked ──────────────────────────────
- alice = User(
- username="fallc_alice",
- email="alice.fallc@example.com",
- password_hash=get_password_hash(secrets.token_urlsafe(16)),
- role="user",
- is_active=True,
- )
- db_session.add(alice)
- await db_session.flush()
- # ── 2. Provider: Fall C config (preferred_username, no email_verified) ─
- provider = OIDCProvider(
- name="AzureEntraFallC",
- issuer_url=issuer,
- client_id="azure-client",
- _client_secret_enc="azure-secret",
- scopes="openid profile",
- is_enabled=True,
- auto_link_existing_accounts=True,
- auto_create_users=False,
- email_claim="preferred_username",
- require_email_verified=False,
- )
- db_session.add(provider)
- await db_session.flush()
- # ── 3. OIDC state token ───────────────────────────────────────────────
- state = secrets.token_urlsafe(32)
- db_session.add(
- AuthEphemeralToken(
- token=state,
- token_type="oidc_state",
- provider_id=provider.id,
- nonce=nonce,
- code_verifier=code_verifier,
- expires_at=datetime.now(timezone.utc) + timedelta(minutes=10),
- )
- )
- await db_session.commit()
- # ── 4. Mock HTTP + JWT ────────────────────────────────────────────────
- fake_discovery = {
- "issuer": issuer,
- "token_endpoint": f"{issuer}/token",
- "jwks_uri": f"{issuer}/.well-known/jwks.json",
- }
- fake_token = {"access_token": "acc_tok", "id_token": "fake.id.token"}
- # Fall C: preferred_username carries the email; no email_verified key at all
- fake_claims = {
- "sub": "azure-sub-alice",
- "preferred_username": "alice.fallc@example.com",
- "iss": issuer,
- "aud": "azure-client",
- "nonce": nonce,
- "exp": 9_999_999_999,
- }
- disc_resp = AsyncMock()
- disc_resp.raise_for_status = MagicMock()
- disc_resp.json = MagicMock(return_value=fake_discovery)
- token_resp = AsyncMock()
- token_resp.json = MagicMock(return_value=fake_token)
- jwks_resp = AsyncMock()
- jwks_resp.raise_for_status = MagicMock()
- jwks_resp.json = MagicMock(return_value={})
- mock_http = AsyncMock()
- mock_http.get = AsyncMock(side_effect=[disc_resp, jwks_resp])
- mock_http.post = AsyncMock(return_value=token_resp)
- mock_signing_key = MagicMock()
- mock_signing_key.key = "fake_key"
- with (
- patch("backend.app.api.routes.mfa.httpx.AsyncClient") as mock_httpx_cls,
- patch("backend.app.api.routes.mfa.jwt.decode", return_value=fake_claims),
- patch("backend.app.api.routes.mfa.PyJWKClient") as mock_jwks_cls,
- ):
- mock_httpx_cls.return_value.__aenter__ = AsyncMock(return_value=mock_http)
- mock_httpx_cls.return_value.__aexit__ = AsyncMock(return_value=False)
- mock_jwks_cls.return_value.get_signing_key_from_jwt.return_value = mock_signing_key
- callback_resp = await async_client.get(
- f"/api/v1/auth/oidc/callback?code=fake_code&state={state}",
- follow_redirects=False,
- )
- assert callback_resp.status_code == 302, callback_resp.text
- location = callback_resp.headers.get("location", "")
- assert "oidc_token=" in location, f"Expected oidc_token in redirect, got: {location}"
- # ── 5. Exchange token → full JWT ──────────────────────────────────────
- oidc_exchange_token = location.split("oidc_token=")[1].split("&")[0].split("#")[-1]
- exchange_resp = await async_client.post(
- "/api/v1/auth/oidc/exchange",
- json={"oidc_token": oidc_exchange_token},
- )
- assert exchange_resp.status_code == 200
- assert exchange_resp.json()["user"]["username"] == "fallc_alice"
- # ── 6. Verify UserOIDCLink was created in DB ──────────────────────────
- async with db_session as s:
- result = await s.execute(
- sa_select(UserOIDCLink).where(
- UserOIDCLink.user_id == alice.id,
- UserOIDCLink.provider_id == provider.id,
- )
- )
- link = result.scalar_one_or_none()
- assert link is not None, "UserOIDCLink must have been created by auto-link"
- assert link.provider_user_id == "azure-sub-alice"
- class TestOIDCAutoCreateUsername:
- """Username derivation priority for auto-created OIDC users (#1173).
- Priority order: email local-part > preferred_username > name > provider_sub.
- Covers: plain claim, spaces-sanitized, name fallback, sub fallback,
- non-string isinstance guard, sanitizes-to-empty fallback, collision counter.
- """
- # ── shared helpers ───────────────────────────────────────────────────────
- @staticmethod
- async def _create_provider(async_client: AsyncClient, admin_token: str, issuer: str, client_id: str) -> int:
- resp = await async_client.post(
- "/api/v1/auth/oidc/providers",
- json={
- "name": f"AutoUser-{secrets.token_hex(4)}",
- "issuer_url": issuer,
- "client_id": client_id,
- "client_secret": "secret",
- "scopes": "openid profile",
- "is_enabled": True,
- "auto_create_users": True,
- "email_claim": "email",
- "require_email_verified": True,
- },
- headers={"Authorization": f"Bearer {admin_token}"},
- )
- assert resp.status_code == 201, resp.text
- return resp.json()["id"]
- @staticmethod
- async def _exchange_username(async_client: AsyncClient, location: str) -> str:
- assert "oidc_token=" in location, f"No oidc_token in redirect: {location}"
- token = location.split("oidc_token=")[1].split("&")[0].split("#")[-1]
- resp = await async_client.post("/api/v1/auth/oidc/exchange", json={"oidc_token": token})
- assert resp.status_code == 200, resp.text
- return resp.json()["user"]["username"]
- # ── tests ────────────────────────────────────────────────────────────────
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_preferred_username_used_when_no_email(self, async_client: AsyncClient, db_session: AsyncSession):
- """preferred_username='johndoe' → username 'johndoe' (no email claim present)."""
- private_pem, jwks_data = _make_test_rsa_key()
- issuer = "https://au-pref.example"
- client_id = "au-pref-client"
- admin_token = await _setup_and_login(async_client, "au_pref_adm", "AuPrefAdm1!")
- provider_id = await self._create_provider(async_client, admin_token, issuer, client_id)
- location = await _run_oidc_callback(
- async_client,
- db_session,
- provider_id=provider_id,
- claims={"sub": "pref-sub-1", "preferred_username": "johndoe"},
- private_pem=private_pem,
- jwks_data=jwks_data,
- issuer=issuer,
- client_id=client_id,
- )
- username = await self._exchange_username(async_client, location)
- assert username == "johndoe"
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_preferred_username_spaces_sanitized(self, async_client: AsyncClient, db_session: AsyncSession):
- """preferred_username='John Doe' → sanitized to 'JohnDoe'."""
- private_pem, jwks_data = _make_test_rsa_key()
- issuer = "https://au-spaces.example"
- client_id = "au-spaces-client"
- admin_token = await _setup_and_login(async_client, "au_spaces_adm", "AuSpacesAdm1!")
- provider_id = await self._create_provider(async_client, admin_token, issuer, client_id)
- location = await _run_oidc_callback(
- async_client,
- db_session,
- provider_id=provider_id,
- claims={"sub": "spaces-sub-1", "preferred_username": "John Doe"},
- private_pem=private_pem,
- jwks_data=jwks_data,
- issuer=issuer,
- client_id=client_id,
- )
- username = await self._exchange_username(async_client, location)
- assert username == "JohnDoe"
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_name_claim_used_when_no_preferred_username(
- self, async_client: AsyncClient, db_session: AsyncSession
- ):
- """name='Jane Smith', no preferred_username → username 'JaneSmith'."""
- private_pem, jwks_data = _make_test_rsa_key()
- issuer = "https://au-name.example"
- client_id = "au-name-client"
- admin_token = await _setup_and_login(async_client, "au_name_adm", "AuNameAdm1!")
- provider_id = await self._create_provider(async_client, admin_token, issuer, client_id)
- location = await _run_oidc_callback(
- async_client,
- db_session,
- provider_id=provider_id,
- claims={"sub": "name-sub-1", "name": "Jane Smith"},
- private_pem=private_pem,
- jwks_data=jwks_data,
- issuer=issuer,
- client_id=client_id,
- )
- username = await self._exchange_username(async_client, location)
- assert username == "JaneSmith"
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_provider_sub_fallback_when_no_claims(self, async_client: AsyncClient, db_session: AsyncSession):
- """No preferred_username, no name, no email → username derived from provider_sub."""
- private_pem, jwks_data = _make_test_rsa_key()
- issuer = "https://au-sub.example"
- client_id = "au-sub-client"
- admin_token = await _setup_and_login(async_client, "au_sub_adm", "AuSubAdm1!")
- provider_id = await self._create_provider(async_client, admin_token, issuer, client_id)
- location = await _run_oidc_callback(
- async_client,
- db_session,
- provider_id=provider_id,
- claims={"sub": "abc123xyz"},
- private_pem=private_pem,
- jwks_data=jwks_data,
- issuer=issuer,
- client_id=client_id,
- )
- username = await self._exchange_username(async_client, location)
- assert username == "abc123xyz"
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_non_string_preferred_username_falls_through_to_name(
- self, async_client: AsyncClient, db_session: AsyncSession
- ):
- """preferred_username is a list (non-string) → isinstance guard skips it, uses name."""
- private_pem, jwks_data = _make_test_rsa_key()
- issuer = "https://au-nonstr.example"
- client_id = "au-nonstr-client"
- admin_token = await _setup_and_login(async_client, "au_nonstr_adm", "AuNonstrAdm1!")
- provider_id = await self._create_provider(async_client, admin_token, issuer, client_id)
- location = await _run_oidc_callback(
- async_client,
- db_session,
- provider_id=provider_id,
- claims={"sub": "nonstr-sub-2", "preferred_username": ["listval"], "name": "BobJones"},
- private_pem=private_pem,
- jwks_data=jwks_data,
- issuer=issuer,
- client_id=client_id,
- )
- username = await self._exchange_username(async_client, location)
- assert username == "BobJones"
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_preferred_username_sanitizes_to_empty_falls_through_to_name(
- self, async_client: AsyncClient, db_session: AsyncSession
- ):
- """preferred_username='!!!' sanitizes to '' → falls through to name claim."""
- private_pem, jwks_data = _make_test_rsa_key()
- issuer = "https://au-empty.example"
- client_id = "au-empty-client"
- admin_token = await _setup_and_login(async_client, "au_empty_adm", "AuEmptyAdm1!")
- provider_id = await self._create_provider(async_client, admin_token, issuer, client_id)
- location = await _run_oidc_callback(
- async_client,
- db_session,
- provider_id=provider_id,
- claims={"sub": "empty-sub-1", "preferred_username": "!!!", "name": "bob"},
- private_pem=private_pem,
- jwks_data=jwks_data,
- issuer=issuer,
- client_id=client_id,
- )
- username = await self._exchange_username(async_client, location)
- assert username == "bob"
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_username_collision_appends_counter(self, async_client: AsyncClient, db_session: AsyncSession):
- """When preferred_username 'collider' is already taken, counter suffix is appended."""
- from backend.app.core.auth import get_password_hash
- # Pre-create a user occupying the candidate username
- existing = User(
- username="collider",
- email="collider@example.com",
- password_hash=get_password_hash("irrelevant"),
- role="user",
- is_active=True,
- )
- db_session.add(existing)
- await db_session.commit()
- private_pem, jwks_data = _make_test_rsa_key()
- issuer = "https://au-collision.example"
- client_id = "au-collision-client"
- admin_token = await _setup_and_login(async_client, "au_col_adm", "AuColAdm1!")
- provider_id = await self._create_provider(async_client, admin_token, issuer, client_id)
- location = await _run_oidc_callback(
- async_client,
- db_session,
- provider_id=provider_id,
- claims={"sub": "col-sub-1", "preferred_username": "collider"},
- private_pem=private_pem,
- jwks_data=jwks_data,
- issuer=issuer,
- client_id=client_id,
- )
- username = await self._exchange_username(async_client, location)
- assert username == "collider1"
- # ===========================================================================
- # OIDC auto-create: configurable default group (#1173 Thread 2)
- # ===========================================================================
- class TestOIDCAutoCreateDefaultGroup:
- """Auto-created OIDC users receive the provider's configured default group.
- Resolution order:
- 1. provider.default_group_id (configured)
- 2. "Viewers" system group (fallback when default_group_id is None)
- 3. no group (last resort when both are unavailable)
- All tests are DB-agnostic: they verify group membership via the OIDC
- exchange response, which includes the user's group list.
- """
- @staticmethod
- async def _create_provider(
- async_client: AsyncClient,
- admin_token: str,
- *,
- issuer: str,
- client_id: str,
- default_group_id: int | None = None,
- ) -> int:
- payload: dict = {
- "name": f"DgAutoProvider-{secrets.token_hex(4)}",
- "issuer_url": issuer,
- "client_id": client_id,
- "client_secret": "secret",
- "scopes": "openid profile",
- "is_enabled": True,
- "auto_create_users": True,
- "email_claim": "email",
- "require_email_verified": True,
- }
- if default_group_id is not None:
- payload["default_group_id"] = default_group_id
- resp = await async_client.post(
- "/api/v1/auth/oidc/providers",
- json=payload,
- headers={"Authorization": f"Bearer {admin_token}"},
- )
- assert resp.status_code == 201, resp.text
- return resp.json()["id"]
- @staticmethod
- async def _run_autocreate_and_get_groups(
- async_client: AsyncClient,
- db_session: AsyncSession,
- *,
- provider_id: int,
- sub: str,
- issuer: str,
- client_id: str,
- private_pem: bytes,
- jwks_data: dict,
- ) -> list[str]:
- """Complete OIDC callback + exchange and return the new user's group names."""
- location = await _run_oidc_callback(
- async_client,
- db_session,
- provider_id=provider_id,
- claims={"sub": sub},
- private_pem=private_pem,
- jwks_data=jwks_data,
- issuer=issuer,
- client_id=client_id,
- )
- assert "oidc_token=" in location, f"No oidc_token in redirect: {location}"
- token = location.split("oidc_token=")[1].split("&")[0].split("#")[-1]
- resp = await async_client.post("/api/v1/auth/oidc/exchange", json={"oidc_token": token})
- assert resp.status_code == 200, resp.text
- return [g["name"] for g in resp.json()["user"]["groups"]]
- # ── tests ────────────────────────────────────────────────────────────────
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_configured_group_assigned_to_auto_created_user(
- self, async_client: AsyncClient, db_session: AsyncSession
- ):
- """Auto-created user is placed in the provider's configured default_group_id."""
- from sqlalchemy import select
- from backend.app.models.group import Group
- private_pem, jwks_data = _make_test_rsa_key()
- issuer = "https://dg-configured.example"
- client_id = "dg-configured-client"
- admin_token = await _setup_and_login(async_client, "dg_cfg_adm", "DgCfgAdm1!")
- grp_result = await db_session.execute(select(Group).where(Group.name == "Operators"))
- operators = grp_result.scalar_one()
- provider_id = await self._create_provider(
- async_client,
- admin_token,
- issuer=issuer,
- client_id=client_id,
- default_group_id=operators.id,
- )
- group_names = await self._run_autocreate_and_get_groups(
- async_client,
- db_session,
- provider_id=provider_id,
- sub="dg-cfg-sub-1",
- issuer=issuer,
- client_id=client_id,
- private_pem=private_pem,
- jwks_data=jwks_data,
- )
- assert "Operators" in group_names, f"Expected Operators, got {group_names}"
- assert "Viewers" not in group_names
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_null_default_group_id_falls_back_to_viewers(
- self, async_client: AsyncClient, db_session: AsyncSession
- ):
- """When default_group_id is None, auto-created user falls back to Viewers."""
- private_pem, jwks_data = _make_test_rsa_key()
- issuer = "https://dg-null.example"
- client_id = "dg-null-client"
- admin_token = await _setup_and_login(async_client, "dg_null_adm", "DgNullAdm1!")
- provider_id = await self._create_provider(
- async_client,
- admin_token,
- issuer=issuer,
- client_id=client_id,
- )
- group_names = await self._run_autocreate_and_get_groups(
- async_client,
- db_session,
- provider_id=provider_id,
- sub="dg-null-sub-1",
- issuer=issuer,
- client_id=client_id,
- private_pem=private_pem,
- jwks_data=jwks_data,
- )
- assert "Viewers" in group_names, f"Expected Viewers, got {group_names}"
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_dangling_default_group_id_falls_back_to_viewers(
- self, async_client: AsyncClient, db_session: AsyncSession
- ):
- """When configured group is deleted, auto-created user falls back to Viewers.
- SQLite does not enforce FK ON DELETE SET NULL (no PRAGMA foreign_keys=ON),
- so provider.default_group_id may point to a deleted group. The runtime
- resolution chain must handle this and fall back to Viewers.
- """
- from sqlalchemy import delete as sa_delete, select
- from backend.app.models.group import Group
- private_pem, jwks_data = _make_test_rsa_key()
- issuer = "https://dg-dangling.example"
- client_id = "dg-dangling-client"
- admin_token = await _setup_and_login(async_client, "dg_dangle_adm", "DgDangleAdm1!")
- # Create a temporary group and use it as default_group_id
- temp_group = Group(name="TempGroup-DgDangle", permissions=[])
- db_session.add(temp_group)
- await db_session.commit()
- await db_session.refresh(temp_group)
- temp_group_id = temp_group.id
- provider_id = await self._create_provider(
- async_client,
- admin_token,
- issuer=issuer,
- client_id=client_id,
- default_group_id=temp_group_id,
- )
- # Delete the group — simulates dangling FK (especially on SQLite)
- await db_session.execute(sa_delete(Group).where(Group.id == temp_group_id))
- await db_session.commit()
- group_names = await self._run_autocreate_and_get_groups(
- async_client,
- db_session,
- provider_id=provider_id,
- sub="dg-dangle-sub-1",
- issuer=issuer,
- client_id=client_id,
- private_pem=private_pem,
- jwks_data=jwks_data,
- )
- assert "Viewers" in group_names, f"Expected Viewers fallback, got {group_names}"
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_administrators_group_can_be_set_as_default(
- self, async_client: AsyncClient, db_session: AsyncSession
- ):
- """Operators can configure Administrators as the default group (e.g. single-tenant IdP)."""
- from sqlalchemy import select
- from backend.app.models.group import Group
- private_pem, jwks_data = _make_test_rsa_key()
- issuer = "https://dg-admin.example"
- client_id = "dg-admin-client"
- admin_token = await _setup_and_login(async_client, "dg_admgrp_adm", "DgAdmgrpAdm1!")
- grp_result = await db_session.execute(select(Group).where(Group.name == "Administrators"))
- administrators = grp_result.scalar_one()
- provider_id = await self._create_provider(
- async_client,
- admin_token,
- issuer=issuer,
- client_id=client_id,
- default_group_id=administrators.id,
- )
- group_names = await self._run_autocreate_and_get_groups(
- async_client,
- db_session,
- provider_id=provider_id,
- sub="dg-admin-sub-1",
- issuer=issuer,
- client_id=client_id,
- private_pem=private_pem,
- jwks_data=jwks_data,
- )
- assert "Administrators" in group_names, f"Expected Administrators, got {group_names}"
- class TestListOidcLinksDefensiveProviderNull:
- """list_oidc_links must not crash if a link's provider is orphaned on SQLite.
- PR for #1285 added a defensive null-check at mfa.py:1871 so the endpoint
- returns ``provider_name="<deleted>"`` for orphan links instead of raising
- AttributeError when accessing ``link.provider.name``. This scenario is
- only reachable on SQLite (PRAGMA foreign_keys=OFF) when an OIDCProvider
- row is removed without the ORM cascade running.
- """
- @pytest.mark.asyncio
- @pytest.mark.integration
- async def test_list_oidc_links_returns_deleted_marker_for_orphan_provider(
- self, async_client: AsyncClient, db_session: AsyncSession
- ):
- from sqlalchemy import select
- from backend.app.models.oidc_provider import UserOIDCLink
- admin_token = await _setup_and_login(async_client, "linkorphan_adm", "LinkOrphanAdm1!")
- admin_row = await db_session.execute(select(User).where(User.username == "linkorphan_adm"))
- admin_user = admin_row.scalar_one()
- # Orphan link: provider_id=99999 deliberately points at no row.
- db_session.add(
- UserOIDCLink(
- user_id=admin_user.id,
- provider_id=99999,
- provider_user_id="orphan-sub",
- provider_email="orphan@example.com",
- )
- )
- await db_session.commit()
- resp = await async_client.get(
- "/api/v1/auth/oidc/links",
- headers=_auth_header(admin_token),
- )
- assert resp.status_code == 200, resp.text
- links = resp.json()
- assert len(links) == 1
- # The fix: orphan provider_id no longer crashes — returns "<deleted>" instead.
- assert links[0]["provider_name"] == "<deleted>", (
- f"Expected '<deleted>' fallback for orphan provider, got {links[0]['provider_name']!r}"
- )
- assert links[0]["provider_id"] == 99999
|