| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761
77717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762
27722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402240324042405240624072408240924102411241224132414241524162417241824192420242124222423242424252426242724282429243024312432243324342435243624372438243924402441244224432444244524462447244824492450245124522453245424552456245724582459246024612462246324642465246624672468246924702471247224732474247524762477247824792480248124822483248424852486248724882489249024912492249324942495249624972498249925002501250225032504250525062507250825092510251125122513251425152516251725182519252025212522252325242525252625272528252925302531253225332534253525362537253825392540254125422543254425452546254725482549255025512552255325542555255625572558255925602561256225632564256525662567256825692570257125722573257425752576257725782579258025812582258325842585258625872588258925902591259225932594259525962597259825992600260126022603260426052606260726082609261026112612261326142615261626172618261926202621262226232624262526262627262826292630263126322633263426352636263726382639264026412642264326442645264626472648264926502651265226532654265526562657265826592660266126622663266426652666266726682669267026712672267326742675267626772678267926802681268226832684268526862687268826892690269126922693269426952696269726982699270027012702270327042705270627072708270927102711271227132714271527162717271827192720272127222723272427252726272727282729273027312732273327342735273627372738273927402741274227432744274527462747274827492750275127522753275427552756275727582759276027612762276327642765276627672768276927702771277227732774277527762
77727782779278027812782278327842785278627872788278927902791279227932794279527962797279827992800280128022803280428052806280728082809281028112812281328142815281628172818281928202821282228232824282528262827282828292830283128322833283428352836283728382839284028412842284328442845284628472848284928502851285228532854285528562857285828592860286128622863286428652866286728682869287028712872287328742875287628772878287928802881288228832884288528862887288828892890289128922893289428952896289728982899290029012902290329042905290629072908290929102911291229132914291529162917291829192920292129222923292429252926292729282929293029312932293329342935293629372938293929402941294229432944294529462947294829492950295129522953295429552956295729582959296029612962296329642965296629672968296929702971297229732974297529762977297829792980298129822983298429852986298729882989299029912992299329942995299629972998299930003001300230033004300530063007300830093010301130123013301430153016301730183019302030213022302330243025302630273028302930303031303230333034303530363037303830393040304130423043304430453046304730483049305030513052305330543055305630573058305930603061306230633064306530663067306830693070307130723073307430753076307730783079308030813082308330843085308630873088308930903091309230933094309530963097309830993100310131023103310431053106310731083109311031113112311331143115311631173118311931203121312231233124312531263127312831293130313131323133313431353136313731383139314031413142314331443145314631473148314931503151315231533154315531563157315831593160316131623163316431653166316731683169317031713172317331743175317631773178317931803181318231833184318531863187318831893190319131923193319431953196319731983199320032013202320332043205320632073208320932103211321232133214321532163217321832193220322132223223322432253226322732283229323032313232323332343235323632373238323932403241324232433244324532463247324832493250325132523253325432553256325732583259326032613262326332643265 |
- """Integration tests for 2FA and OIDC API endpoints.
- Tests the full request/response cycle for:
- - GET /api/v1/auth/2fa/status
- - POST /api/v1/auth/2fa/totp/setup
- - POST /api/v1/auth/2fa/totp/enable
- - POST /api/v1/auth/2fa/totp/disable
- - POST /api/v1/auth/2fa/email/enable
- - POST /api/v1/auth/2fa/email/enable/confirm
- - POST /api/v1/auth/2fa/email/disable
- - POST /api/v1/auth/2fa/verify (TOTP, email, backup paths)
- - DELETE /api/v1/auth/2fa/admin/{user_id}
- - GET /api/v1/auth/oidc/providers
- - POST /api/v1/auth/oidc/providers
- - PATCH /api/v1/auth/oidc/providers/{id}
- - DELETE /api/v1/auth/oidc/providers/{id}
- """
- from __future__ import annotations
- import secrets
- from datetime import datetime, timedelta, timezone
- import pyotp
- import pytest
- from httpx import AsyncClient
- from passlib.context import CryptContext
- from sqlalchemy.ext.asyncio import AsyncSession
- from backend.app.models.auth_ephemeral import AuthEphemeralToken
- from backend.app.models.user import User
# Hashing context used by the tests to hash one-time codes before storing
# them in AuthEphemeralToken.nonce.
# NOTE(review): presumably this must match the scheme the backend verifies
# against — confirm.
_pwd_context = CryptContext(schemes=["pbkdf2_sha256"], deprecated="auto")

# ---------------------------------------------------------------------------
# Fixtures / helpers
# ---------------------------------------------------------------------------

# Endpoint the helpers POST to in order to enable auth and create the admin user.
AUTH_SETUP_URL = "/api/v1/auth/setup"
# Endpoint the helpers POST username/password to; the tests read either an
# access_token or a requires_2fa/pre_auth_token pair from its response.
LOGIN_URL = "/api/v1/auth/login"
- def _norm_pw(password: str) -> str:
- """Ensure password meets complexity requirements (I4: SetupRequest now validates)."""
- if not any(c.isupper() for c in password):
- password = password[0].upper() + password[1:]
- if not any(c not in "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789" for c in password):
- password = password + "!"
- return password
async def _setup_and_login(client: AsyncClient, username: str, password: str) -> str:
    """Turn auth on with a new admin account, log in, and return the bearer token.

    The password is passed through ``_norm_pw`` first, so both the setup and
    the login request carry the normalised form.
    """
    normalised = _norm_pw(password)
    setup_payload = {
        "auth_enabled": True,
        "admin_username": username,
        "admin_password": normalised,
    }
    await client.post(AUTH_SETUP_URL, json=setup_payload)

    credentials = {"username": username, "password": normalised}
    login_resp = await client.post(LOGIN_URL, json=credentials)
    assert login_resp.status_code == 200
    body = login_resp.json()
    return body["access_token"]
async def _login_get_pre_auth_token(client: AsyncClient, username: str, password: str) -> str:
    """Log in as a user with 2FA enabled and return the ``pre_auth_token`` from the reply."""
    credentials = {"username": username, "password": _norm_pw(password)}
    login_resp = await client.post(LOGIN_URL, json=credentials)
    assert login_resp.status_code == 200
    payload = login_resp.json()
    assert payload["requires_2fa"] is True, f"Expected requires_2fa=True, got {payload}"
    pre_auth_token = payload["pre_auth_token"]
    assert pre_auth_token is not None
    return pre_auth_token
- def _auth_header(token: str) -> dict[str, str]:
- return {"Authorization": f"Bearer {token}"}
- # ===========================================================================
- # 2FA Status
- # ===========================================================================
class TestTwoFAStatus:
    """Covers GET /api/v1/auth/2fa/status."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_status_requires_auth(self, async_client: AsyncClient):
        """A request without credentials is expected to get 401."""
        resp = await async_client.get("/api/v1/auth/2fa/status")
        assert resp.status_code == 401

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_status_default_disabled(self, async_client: AsyncClient):
        """A newly created user reports no 2FA method and zero backup codes."""
        bearer = await _setup_and_login(async_client, "statususer", "statuspass123")
        headers = _auth_header(bearer)
        resp = await async_client.get("/api/v1/auth/2fa/status", headers=headers)
        assert resp.status_code == 200
        status = resp.json()
        assert status["totp_enabled"] is False
        assert status["email_otp_enabled"] is False
        assert status["backup_codes_remaining"] == 0
- # ===========================================================================
- # TOTP Setup
- # ===========================================================================
class TestTOTPSetup:
    """Covers POST /api/v1/auth/2fa/totp/setup."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_setup_requires_auth(self, async_client: AsyncClient):
        """Calling setup without credentials is expected to get 401."""
        resp = await async_client.post("/api/v1/auth/2fa/totp/setup")
        assert resp.status_code == 401

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_setup_returns_secret_and_qr(self, async_client: AsyncClient):
        """The setup reply carries a non-empty secret, a QR code field and the issuer."""
        bearer = await _setup_and_login(async_client, "totpsetup", "totpsetup123")
        headers = _auth_header(bearer)
        resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=headers)
        assert resp.status_code == 200
        body = resp.json()
        assert "secret" in body
        assert len(body["secret"]) > 0
        assert "qr_code_b64" in body
        assert body["issuer"] == "Bambuddy"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_setup_secret_is_valid_base32(self, async_client: AsyncClient):
        """The returned secret can be fed to pyotp to produce a 6-character code."""
        bearer = await _setup_and_login(async_client, "totpbase32", "totpbase32pw")
        headers = _auth_header(bearer)
        resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=headers)
        assert resp.status_code == 200
        secret = resp.json()["secret"]
        # pyotp will raise on invalid base32
        generator = pyotp.TOTP(secret)
        current_code = generator.now()
        assert len(current_code) == 6
- # ===========================================================================
- # TOTP Enable
- # ===========================================================================
class TestTOTPEnable:
    """Covers POST /api/v1/auth/2fa/totp/enable."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_enable_without_setup_returns_400(self, async_client: AsyncClient):
        """Enabling TOTP without a prior setup call is expected to get 400."""
        bearer = await _setup_and_login(async_client, "nosetupenable", "nosetupenable1")
        resp = await async_client.post(
            "/api/v1/auth/2fa/totp/enable",
            json={"code": "123456"},
            headers=_auth_header(bearer),
        )
        assert resp.status_code == 400

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_enable_with_invalid_code_returns_400(self, async_client: AsyncClient):
        """After setup, a fixed "000000" code is expected to be refused with 400."""
        bearer = await _setup_and_login(async_client, "badcodeuser", "badcodeuser1")
        headers = _auth_header(bearer)
        await async_client.post("/api/v1/auth/2fa/totp/setup", headers=headers)
        resp = await async_client.post(
            "/api/v1/auth/2fa/totp/enable",
            json={"code": "000000"},
            headers=headers,
        )
        assert resp.status_code == 400

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_enable_with_valid_code_returns_backup_codes(self, async_client: AsyncClient):
        """A valid code enables TOTP and returns ten 8-character backup codes."""
        bearer = await _setup_and_login(async_client, "enableok", "enableok123")
        headers = _auth_header(bearer)
        setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=headers)
        current_code = pyotp.TOTP(setup_resp.json()["secret"]).now()
        enable_resp = await async_client.post(
            "/api/v1/auth/2fa/totp/enable",
            json={"code": current_code},
            headers=headers,
        )
        assert enable_resp.status_code == 200
        body = enable_resp.json()
        assert "backup_codes" in body
        backup_codes = body["backup_codes"]
        assert len(backup_codes) == 10
        assert all(len(backup_code) == 8 for backup_code in backup_codes)

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_status_reflects_enabled_totp(self, async_client: AsyncClient):
        """Once TOTP is enabled, the status endpoint shows it with ten backup codes."""
        bearer = await _setup_and_login(async_client, "statustotp", "statustotp1")
        headers = _auth_header(bearer)
        setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=headers)
        current_code = pyotp.TOTP(setup_resp.json()["secret"]).now()
        await async_client.post(
            "/api/v1/auth/2fa/totp/enable",
            json={"code": current_code},
            headers=headers,
        )
        status_resp = await async_client.get("/api/v1/auth/2fa/status", headers=headers)
        status = status_resp.json()
        assert status["totp_enabled"] is True
        assert status["backup_codes_remaining"] == 10
- # ===========================================================================
- # TOTP Disable
- # ===========================================================================
class TestTOTPDisable:
    """Covers POST /api/v1/auth/2fa/totp/disable."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_disable_when_not_enabled_returns_400(self, async_client: AsyncClient):
        """Disabling TOTP for a user who never enabled it is expected to get 400."""
        bearer = await _setup_and_login(async_client, "disablenoenab", "disablenoenab1")
        resp = await async_client.post(
            "/api/v1/auth/2fa/totp/disable",
            json={"code": "123456"},
            headers=_auth_header(bearer),
        )
        assert resp.status_code == 400

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_disable_with_valid_code(self, async_client: AsyncClient):
        """Setup, enable, then disable with a valid code; status ends up disabled."""
        bearer = await _setup_and_login(async_client, "disableok", "disableok123")
        headers = _auth_header(bearer)
        setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=headers)
        secret = setup_resp.json()["secret"]
        enable_code = pyotp.TOTP(secret).now()
        await async_client.post(
            "/api/v1/auth/2fa/totp/enable",
            json={"code": enable_code},
            headers=headers,
        )

        # The disable request uses a code generated after enabling.
        fresh_code = pyotp.TOTP(secret).now()
        disable_resp = await async_client.post(
            "/api/v1/auth/2fa/totp/disable",
            json={"code": fresh_code},
            headers=headers,
        )
        assert disable_resp.status_code == 200
        message = disable_resp.json()["message"]
        assert "disabled" in message.lower()

        # The status endpoint should now report TOTP as off.
        status_resp = await async_client.get("/api/v1/auth/2fa/status", headers=headers)
        assert status_resp.json()["totp_enabled"] is False
- # ===========================================================================
- # Email OTP Enable/Disable
- # ===========================================================================
class TestEmailOTP:
    """Tests for POST /api/v1/auth/2fa/email/enable, /enable/confirm and /disable."""

    @staticmethod
    async def _inject_setup_token(db_session: AsyncSession, username: str, code: str) -> str:
        """Insert an ``email_otp_setup`` token for *username* and return its value.

        The row is written directly to the database so the tests do not depend
        on SMTP. The hash of *code* is stored in ``nonce`` and the token is set
        to expire ten minutes from now.
        """
        setup_token = secrets.token_urlsafe(32)
        db_session.add(
            AuthEphemeralToken(
                token=setup_token,
                token_type="email_otp_setup",
                username=username,
                nonce=_pwd_context.hash(code),
                expires_at=datetime.now(timezone.utc) + timedelta(minutes=10),
            )
        )
        await db_session.commit()
        return setup_token

    @staticmethod
    async def _confirm(client: AsyncClient, bearer: str, setup_token: str, code: str):
        """POST the confirm step with *setup_token* and *code*; return the response."""
        return await client.post(
            "/api/v1/auth/2fa/email/enable/confirm",
            json={"setup_token": setup_token, "code": code},
            headers=_auth_header(bearer),
        )

    @staticmethod
    async def _email_otp_enabled(client: AsyncClient, bearer: str) -> bool:
        """Return the ``email_otp_enabled`` flag reported by the status endpoint."""
        status_resp = await client.get("/api/v1/auth/2fa/status", headers=_auth_header(bearer))
        assert status_resp.status_code == 200
        return status_resp.json()["email_otp_enabled"]

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_enable_email_otp_without_email_returns_400(self, async_client: AsyncClient):
        """Users without an email address cannot enable email OTP."""
        token = await _setup_and_login(async_client, "noemailuser", "noemailuser1")
        response = await async_client.post("/api/v1/auth/2fa/email/enable", headers=_auth_header(token))
        assert response.status_code == 400
        assert "email" in response.json()["detail"].lower()

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_confirm_enable_email_otp_happy_path(self, async_client: AsyncClient, db_session: AsyncSession):
        """Confirm step activates email OTP when setup_token + code are valid (C5)."""
        token = await _setup_and_login(async_client, "confirmenable", "confirmenable1")

        # Give user an email address directly (SMTP not available in tests)
        from sqlalchemy import select as sa_select

        result = await db_session.execute(sa_select(User).where(User.username == "confirmenable"))
        user = result.scalar_one()
        user.email = "confirmenable@example.com"
        await db_session.commit()

        # Inject a known setup token directly into the DB (bypasses SMTP)
        code = "123456"
        setup_token = await self._inject_setup_token(db_session, "confirmenable", code)

        resp = await self._confirm(async_client, token, setup_token, code)
        assert resp.status_code == 200
        assert await self._email_otp_enabled(async_client, token) is True

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_confirm_enable_email_otp_wrong_code(self, async_client: AsyncClient, db_session: AsyncSession):
        """Wrong code on confirm step returns 400 and does not enable email OTP."""
        token = await _setup_and_login(async_client, "confirmwrong", "confirmwrong1")
        # The stored hash is for "654321"; the request below submits "000000".
        setup_token = await self._inject_setup_token(db_session, "confirmwrong", "654321")

        resp = await self._confirm(async_client, token, setup_token, "000000")
        assert resp.status_code == 400
        # The rejected confirmation must leave email OTP switched off.
        assert await self._email_otp_enabled(async_client, token) is False

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_confirm_enable_email_otp_setup_token_is_single_use(
        self, async_client: AsyncClient, db_session: AsyncSession
    ):
        """Setup token is consumed on first use; replay returns 400."""
        token = await _setup_and_login(async_client, "confirmonce", "confirmonce1")
        code = "111111"
        setup_token = await self._inject_setup_token(db_session, "confirmonce", code)

        first = await self._confirm(async_client, token, setup_token, code)
        assert first.status_code == 200

        second = await self._confirm(async_client, token, setup_token, code)
        assert second.status_code == 400

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_disable_email_otp_requires_password(self, async_client: AsyncClient):
        """Disabling email OTP requires the account password (C6: re-auth)."""
        token = await _setup_and_login(async_client, "disemailotp", "disemailotp1")
        # Wrong password → 401
        response = await async_client.post(
            "/api/v1/auth/2fa/email/disable",
            json={"password": "wrongpassword"},
            headers=_auth_header(token),
        )
        assert response.status_code == 401

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_disable_email_otp_when_enabled(self, async_client: AsyncClient, db_session: AsyncSession):
        """Disabling email OTP when enabled turns it off and status reflects that."""
        token = await _setup_and_login(async_client, "disemailpw", "disemailpw1")

        # Enable email OTP via direct DB injection (no SMTP)
        code = "222222"
        setup_token = await self._inject_setup_token(db_session, "disemailpw", code)
        enable_resp = await self._confirm(async_client, token, setup_token, code)
        # Verify the precondition; otherwise the final "is False" check could
        # pass without email OTP ever having been enabled.
        assert enable_resp.status_code == 200
        assert await self._email_otp_enabled(async_client, token) is True

        # Now disable
        response = await async_client.post(
            "/api/v1/auth/2fa/email/disable",
            json={"password": _norm_pw("disemailpw1")},
            headers=_auth_header(token),
        )
        assert response.status_code == 200
        assert await self._email_otp_enabled(async_client, token) is False
- # ===========================================================================
- # 2FA Verify — TOTP path
- # ===========================================================================
class TestTwoFAVerifyTOTP:
    """Tests for POST /api/v1/auth/2fa/verify using the TOTP method."""

    @staticmethod
    async def _enable_totp(client: AsyncClient, token: str) -> str:
        """Run TOTP setup + enable for the token's user and return the secret.

        Both steps are asserted so that a broken precondition fails here with
        the response body, rather than later as an unrelated login/verify error.
        """
        headers = _auth_header(token)
        setup_resp = await client.post("/api/v1/auth/2fa/totp/setup", headers=headers)
        assert setup_resp.status_code == 200, setup_resp.text
        secret = setup_resp.json()["secret"]
        enable_resp = await client.post(
            "/api/v1/auth/2fa/totp/enable",
            json={"code": pyotp.TOTP(secret).now()},
            headers=headers,
        )
        assert enable_resp.status_code == 200, enable_resp.text
        return secret

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_verify_with_invalid_pre_auth_token(self, async_client: AsyncClient):
        """An unknown pre_auth_token is expected to be rejected with 401."""
        response = await async_client.post(
            "/api/v1/auth/2fa/verify",
            json={"pre_auth_token": "bogus", "method": "totp", "code": "123456"},
        )
        assert response.status_code == 401

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_verify_totp_issues_jwt(self, async_client: AsyncClient):
        """Full flow: setup → enable TOTP → login → pre_auth_token → verify → JWT."""
        token = await _setup_and_login(async_client, "verifytotpok", "verifytotpok1")
        secret = await self._enable_totp(async_client, token)

        # Login now returns requires_2fa=True + pre_auth_token
        pre_auth_token = await _login_get_pre_auth_token(async_client, "verifytotpok", "verifytotpok1")
        verify_resp = await async_client.post(
            "/api/v1/auth/2fa/verify",
            json={
                "pre_auth_token": pre_auth_token,
                "method": "totp",
                "code": pyotp.TOTP(secret).now(),
            },
        )
        assert verify_resp.status_code == 200
        data = verify_resp.json()
        assert "access_token" in data
        assert data["token_type"] == "bearer"
        assert data["user"]["username"] == "verifytotpok"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_verify_totp_invalid_code(self, async_client: AsyncClient):
        """A fixed "000000" code with a valid pre_auth_token is expected to get 401."""
        token = await _setup_and_login(async_client, "verifybadcode", "verifybadcode1")
        await self._enable_totp(async_client, token)

        pre_auth_token = await _login_get_pre_auth_token(async_client, "verifybadcode", "verifybadcode1")
        verify_resp = await async_client.post(
            "/api/v1/auth/2fa/verify",
            json={"pre_auth_token": pre_auth_token, "method": "totp", "code": "000000"},
        )
        assert verify_resp.status_code == 401

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_verify_invalid_method(self, async_client: AsyncClient):
        """An invalid 2FA method should return 422 even with a valid pre_auth_token."""
        token = await _setup_and_login(async_client, "invalidmethod", "invalidmethod1")
        await self._enable_totp(async_client, token)

        pre_auth_token = await _login_get_pre_auth_token(async_client, "invalidmethod", "invalidmethod1")
        response = await async_client.post(
            "/api/v1/auth/2fa/verify",
            json={"pre_auth_token": pre_auth_token, "method": "sms", "code": "123456"},
        )
        assert response.status_code == 422  # Pydantic Literal validation
- # ===========================================================================
- # 2FA Verify — Backup code path
- # ===========================================================================
class TestTwoFAVerifyBackup:
    """Tests for POST /api/v1/auth/2fa/verify using the backup method."""

    @staticmethod
    async def _enable_totp(client: AsyncClient, username: str, password: str) -> tuple[str, list[str]]:
        """Create/log in a user, enable TOTP and return (bearer_token, backup_codes)."""
        token = await _setup_and_login(client, username, password)
        setup_resp = await client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
        secret = setup_resp.json()["secret"]
        enable_resp = await client.post(
            "/api/v1/auth/2fa/totp/enable",
            json={"code": pyotp.TOTP(secret).now()},
            headers=_auth_header(token),
        )
        return token, enable_resp.json()["backup_codes"]

    @staticmethod
    async def _verify_backup(client: AsyncClient, username: str, password: str, backup_code: str):
        """Log in to obtain a fresh pre_auth_token, then submit *backup_code* to /2fa/verify."""
        pre_auth_token = await _login_get_pre_auth_token(client, username, password)
        return await client.post(
            "/api/v1/auth/2fa/verify",
            json={"pre_auth_token": pre_auth_token, "method": "backup", "code": backup_code},
        )

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_verify_with_backup_code(self, async_client: AsyncClient):
        """A freshly issued backup code completes the 2FA login and yields an access_token."""
        _token, backup_codes = await self._enable_totp(async_client, "backupcodeok", "backupcodeok1")

        verify_resp = await self._verify_backup(async_client, "backupcodeok", "backupcodeok1", backup_codes[0])

        assert verify_resp.status_code == 200
        assert "access_token" in verify_resp.json()

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_backup_code_is_single_use(self, async_client: AsyncClient):
        """The same backup code is accepted once and rejected with 401 afterwards."""
        _token, backup_codes = await self._enable_totp(async_client, "backupsingle", "backupsingle1")
        backup_code = backup_codes[0]

        # First use — should succeed
        first_resp = await self._verify_backup(async_client, "backupsingle", "backupsingle1", backup_code)
        assert first_resp.status_code == 200

        # Second use of the same code — must fail (new pre_auth_token, same backup code)
        second_resp = await self._verify_backup(async_client, "backupsingle", "backupsingle1", backup_code)
        assert second_resp.status_code == 401

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_backup_code_count_decrements(self, async_client: AsyncClient):
        """Consuming one backup code lowers backup_codes_remaining by exactly one."""
        token, backup_codes = await self._enable_totp(async_client, "backupcount", "backupcount1")

        verify_resp = await self._verify_backup(async_client, "backupcount", "backupcount1", backup_codes[0])
        # Without this check a failed verify would surface as a confusing count mismatch below.
        assert verify_resp.status_code == 200, verify_resp.text

        # Status is readable with the original full token (still valid)
        status_resp = await async_client.get("/api/v1/auth/2fa/status", headers=_auth_header(token))
        assert status_resp.status_code == 200, status_resp.text
        assert status_resp.json()["backup_codes_remaining"] == len(backup_codes) - 1
- # ===========================================================================
- # Rate Limiting
- # ===========================================================================
class TestRateLimiting:
    """Ensure 429 is returned after 5 failed 2FA attempts."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_rate_limit_lockout(self, async_client: AsyncClient):
        """After 5 failed TOTP attempts the 6th must return 429."""
        username, password = "ratelimituser", "ratelimituser1"
        bearer = await _setup_and_login(async_client, username, password)

        # Enrol the user in TOTP so that logins go through the 2FA gate.
        setup = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(bearer))
        enrol_code = pyotp.TOTP(setup.json()["secret"]).now()
        await async_client.post(
            "/api/v1/auth/2fa/totp/enable",
            json={"code": enrol_code},
            headers=_auth_header(bearer),
        )

        # Six identical wrong-code attempts, each via login → pre_auth_token → verify.
        # The first five are the failures; only the sixth response is inspected.
        last_status = None
        for _attempt in range(6):
            pat = await _login_get_pre_auth_token(async_client, username, password)
            attempt_resp = await async_client.post(
                "/api/v1/auth/2fa/verify",
                json={"pre_auth_token": pat, "method": "totp", "code": "000000"},
            )
            last_status = attempt_resp.status_code

        assert last_status == 429
- # ===========================================================================
- # Admin 2FA Disable
- # ===========================================================================
class TestAdminDisable2FA:
    """Tests for DELETE /api/v1/auth/2fa/admin/{user_id}."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_admin_disable_requires_admin(self, async_client: AsyncClient):
        """An admin may call the endpoint; an unknown user_id is answered with 200.

        The user created by ``_setup_and_login`` in a fresh setup is the admin, so
        this test only covers the admin-allowed path for a non-existent target id.
        """
        token = await _setup_and_login(async_client, "admincheck", "admincheck123")

        # httpx's .delete() does not accept a body, hence the generic .request().
        response = await async_client.request(
            "DELETE",
            "/api/v1/auth/2fa/admin/99999",
            json={"admin_password": _norm_pw("admincheck123")},
            headers=_auth_header(token),
        )

        # Admin users succeed regardless (returns 200 even if user doesn't exist)
        assert response.status_code == 200

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_admin_disable_clears_totp(self, async_client: AsyncClient):
        """After an admin disable, the user logs in without 2FA and status shows TOTP off."""
        token = await _setup_and_login(async_client, "admintotp", "admintotp123")
        setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
        secret = setup_resp.json()["secret"]
        valid_code = pyotp.TOTP(secret).now()
        await async_client.post(
            "/api/v1/auth/2fa/totp/enable",
            json={"code": valid_code},
            headers=_auth_header(token),
        )

        # Look up the user's id via /auth/me (works with the current token)
        me_resp = await async_client.get("/api/v1/auth/me", headers=_auth_header(token))
        user_id = me_resp.json()["id"]

        response = await async_client.request(
            "DELETE",
            f"/api/v1/auth/2fa/admin/{user_id}",
            json={"admin_password": _norm_pw("admintotp123")},
            headers=_auth_header(token),
        )
        assert response.status_code == 200

        # I2: admin_disable_2fa bumps password_changed_at, invalidating the old token.
        # Re-login to get a fresh token before checking status.
        new_login = await async_client.post(
            LOGIN_URL, json={"username": "admintotp", "password": _norm_pw("admintotp123")}
        )
        assert new_login.status_code == 200, f"re-login failed: {new_login.json()}"
        assert new_login.json().get("requires_2fa") is False, f"still requires 2FA: {new_login.json()}"
        new_token = new_login.json()["access_token"]
        assert new_token is not None, f"no access_token in: {new_login.json()}"

        # Status should now show TOTP disabled
        status_resp = await async_client.get("/api/v1/auth/2fa/status", headers=_auth_header(new_token))
        assert status_resp.status_code == 200, f"status check failed: {status_resp.json()}"
        assert status_resp.json()["totp_enabled"] is False
- # ===========================================================================
- # OIDC Provider CRUD
- # ===========================================================================
class TestOIDCProviders:
    """Tests for OIDC provider management endpoints."""

    @staticmethod
    async def _create_provider(client: AsyncClient, token: str, payload: dict) -> dict:
        """POST *payload* to the provider collection, require 201, and return the JSON body.

        Asserting here keeps the follow-up checks from passing vacuously (or failing
        with an unrelated KeyError) when the creation itself was rejected.
        """
        resp = await client.post(
            "/api/v1/auth/oidc/providers",
            json=payload,
            headers=_auth_header(token),
        )
        assert resp.status_code == 201, resp.text
        return resp.json()

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_list_public_providers_empty(self, async_client: AsyncClient):
        """The public provider list is reachable without auth and returns a JSON list."""
        response = await async_client.get("/api/v1/auth/oidc/providers")
        assert response.status_code == 200
        assert isinstance(response.json(), list)

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_create_provider_requires_admin(self, async_client: AsyncClient):
        """The user from _setup_and_login can create a provider; the secret is not echoed back."""
        token = await _setup_and_login(async_client, "oidcadmincreate", "oidcadmincreate1")
        response = await async_client.post(
            "/api/v1/auth/oidc/providers",
            json={
                "name": "PocketID",
                "issuer_url": "https://auth.example.com",
                "client_id": "bambuddy",
                "client_secret": "supersecret",
                "scopes": "openid email profile",
                "is_enabled": True,
                "auto_create_users": False,
            },
            headers=_auth_header(token),
        )
        assert response.status_code == 201
        data = response.json()
        assert data["name"] == "PocketID"
        assert data["issuer_url"] == "https://auth.example.com"
        assert "client_secret" not in data  # Secret must not be returned

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_created_provider_appears_in_all_list(self, async_client: AsyncClient):
        """A created provider shows up in the authenticated /providers/all listing."""
        token = await _setup_and_login(async_client, "oidclistall", "oidclistall123")
        await self._create_provider(
            async_client,
            token,
            {
                "name": "TestProvider",
                "issuer_url": "https://test.example.com",
                "client_id": "testclient",
                "client_secret": "testsecret",
                "scopes": "openid",
                "is_enabled": True,
                "auto_create_users": False,
            },
        )
        response = await async_client.get("/api/v1/auth/oidc/providers/all", headers=_auth_header(token))
        assert response.status_code == 200
        names = [p["name"] for p in response.json()]
        assert "TestProvider" in names

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_disabled_provider_not_in_public_list(self, async_client: AsyncClient):
        """A provider created with is_enabled=False is hidden from the public listing."""
        token = await _setup_and_login(async_client, "oidcdisabled", "oidcdisabled1")
        # Creation must succeed, otherwise the "not in list" check below proves nothing.
        await self._create_provider(
            async_client,
            token,
            {
                "name": "DisabledProvider",
                "issuer_url": "https://disabled.example.com",
                "client_id": "dc",
                "client_secret": "ds",
                "scopes": "openid",
                "is_enabled": False,
                "auto_create_users": False,
            },
        )
        response = await async_client.get("/api/v1/auth/oidc/providers")
        assert response.status_code == 200
        names = [p["name"] for p in response.json()]
        assert "DisabledProvider" not in names

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_update_provider(self, async_client: AsyncClient):
        """PUT with a new name returns 200 and the updated name."""
        token = await _setup_and_login(async_client, "oidcupdate", "oidcupdate123")
        created = await self._create_provider(
            async_client,
            token,
            {
                "name": "OldName",
                "issuer_url": "https://update.example.com",
                "client_id": "uc",
                "client_secret": "us",
                "scopes": "openid",
                "is_enabled": True,
                "auto_create_users": False,
            },
        )
        provider_id = created["id"]
        put_resp = await async_client.put(
            f"/api/v1/auth/oidc/providers/{provider_id}",
            json={"name": "NewName"},
            headers=_auth_header(token),
        )
        assert put_resp.status_code == 200
        assert put_resp.json()["name"] == "NewName"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_delete_provider(self, async_client: AsyncClient):
        """DELETE returns 200 and the provider disappears from /providers/all."""
        token = await _setup_and_login(async_client, "oidcdelete", "oidcdelete123")
        created = await self._create_provider(
            async_client,
            token,
            {
                "name": "ToDelete",
                "issuer_url": "https://delete.example.com",
                "client_id": "dc",
                "client_secret": "ds",
                "scopes": "openid",
                "is_enabled": True,
                "auto_create_users": False,
            },
        )
        provider_id = created["id"]
        del_resp = await async_client.delete(
            f"/api/v1/auth/oidc/providers/{provider_id}",
            headers=_auth_header(token),
        )
        assert del_resp.status_code == 200

        # No longer in list
        all_resp = await async_client.get("/api/v1/auth/oidc/providers/all", headers=_auth_header(token))
        assert all_resp.status_code == 200
        ids = [p["id"] for p in all_resp.json()]
        assert provider_id not in ids

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_update_nonexistent_provider_returns_404(self, async_client: AsyncClient):
        """Updating an unknown provider id yields 404."""
        token = await _setup_and_login(async_client, "oidc404", "oidc404pass1")
        response = await async_client.put(
            "/api/v1/auth/oidc/providers/99999",
            json={"name": "ghost"},
            headers=_auth_header(token),
        )
        assert response.status_code == 404
- # ===========================================================================
- # Security: pre-auth token single-use
- # ===========================================================================
class TestPreAuthTokenSingleUse:
    """pre_auth_token must be consumed on successful 2FA and cannot be reused."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_pre_auth_token_is_single_use(self, async_client: AsyncClient):
        """A pre_auth_token that was successfully used cannot be reused."""
        username, password = "singleusepat", "singleusepat1"
        verify_url = "/api/v1/auth/2fa/verify"

        bearer = await _setup_and_login(async_client, username, password)
        setup = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(bearer))
        totp = pyotp.TOTP(setup.json()["secret"])
        enrol_code = totp.now()
        await async_client.post(
            "/api/v1/auth/2fa/totp/enable",
            json={"code": enrol_code},
            headers=_auth_header(bearer),
        )

        pat = await _login_get_pre_auth_token(async_client, username, password)

        # Initial submission with a valid code is accepted.
        accepted = await async_client.post(
            verify_url,
            json={"pre_auth_token": pat, "method": "totp", "code": totp.now()},
        )
        assert accepted.status_code == 200

        # Re-submitting the very same token is refused: success consumed it.
        rejected = await async_client.post(
            verify_url,
            json={"pre_auth_token": pat, "method": "totp", "code": totp.now()},
        )
        assert rejected.status_code == 401

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_pre_auth_token_survives_wrong_code(self, async_client: AsyncClient):
        """A wrong 2FA code must NOT burn the pre_auth_token (user can retry)."""
        username, password = "survivepatuser", "survivepatuser1"
        verify_url = "/api/v1/auth/2fa/verify"

        bearer = await _setup_and_login(async_client, username, password)
        setup = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(bearer))
        totp = pyotp.TOTP(setup.json()["secret"])
        enrol_code = totp.now()
        await async_client.post(
            "/api/v1/auth/2fa/totp/enable",
            json={"code": enrol_code},
            headers=_auth_header(bearer),
        )

        pat = await _login_get_pre_auth_token(async_client, username, password)

        # A bogus code is refused ...
        wrong = await async_client.post(
            verify_url,
            json={"pre_auth_token": pat, "method": "totp", "code": "000000"},
        )
        assert wrong.status_code == 401

        # ... yet the identical token still works once the right code is supplied.
        right = await async_client.post(
            verify_url,
            json={"pre_auth_token": pat, "method": "totp", "code": totp.now()},
        )
        assert right.status_code == 200
- # ===========================================================================
- # Security: cross-user token isolation
- # ===========================================================================
class TestCrossUserTokenIsolation:
    """A pre_auth_token issued for user A cannot authenticate as user B."""

    @staticmethod
    async def _enable_totp(client: AsyncClient, token: str) -> str:
        """Run TOTP setup + enable for the owner of *token* and return the TOTP secret."""
        setup_resp = await client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
        secret = setup_resp.json()["secret"]
        await client.post(
            "/api/v1/auth/2fa/totp/enable",
            json={"code": pyotp.TOTP(secret).now()},
            headers=_auth_header(token),
        )
        return secret

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_token_cannot_be_used_for_different_user(self, async_client: AsyncClient):
        """pre_auth_token is bound to the issuing user; using it to verify a different
        user's TOTP code must fail."""
        # User A comes from the standard setup helper.
        token_a = await _setup_and_login(async_client, "crossusera", "crossusera1")

        # User B is created through user management and logs in normally (no 2FA yet).
        create_b = await async_client.post(
            "/api/v1/users",
            json={"username": "crossuserb", "password": "Crossuserb1!"},
            headers=_auth_header(token_a),
        )
        assert create_b.status_code == 201, create_b.text
        login_b = await async_client.post(
            LOGIN_URL,
            json={"username": "crossuserb", "password": "Crossuserb1!"},
        )
        assert login_b.status_code == 200, login_b.text
        token_b = login_b.json()["access_token"]

        # Both users get their own, independent TOTP secret.
        secret_b = await self._enable_totp(async_client, token_b)
        secret_a = await self._enable_totp(async_client, token_a)

        # Get pre_auth_token for user A
        pre_auth_a = await _login_get_pre_auth_token(async_client, "crossusera", "crossusera1")

        # A's token combined with B's currently valid code must be rejected.
        resp = await async_client.post(
            "/api/v1/auth/2fa/verify",
            json={"pre_auth_token": pre_auth_a, "method": "totp", "code": pyotp.TOTP(secret_b).now()},
        )
        assert resp.status_code == 401

        # Control: the same token with A's own code succeeds, so the rejection above
        # was caused by the foreign code and not by a broken token.
        own = await async_client.post(
            "/api/v1/auth/2fa/verify",
            json={"pre_auth_token": pre_auth_a, "method": "totp", "code": pyotp.TOTP(secret_a).now()},
        )
        assert own.status_code == 200
- # ===========================================================================
- # Security: admin disable non-admin rejection
- # ===========================================================================
class TestAdminDisableNonAdminRejection:
    """Non-admin users must be rejected from the admin disable endpoint."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_non_admin_cannot_disable_2fa(self, async_client: AsyncClient):
        """A regular (non-admin) user must receive 403 from DELETE /auth/2fa/admin/{id}."""
        # Set up admin, then create a regular user
        admin_token = await _setup_and_login(async_client, "adminusr2fa", "adminusr2fa1")

        # Create a regular user via user management
        create_resp = await async_client.post(
            "/api/v1/users",
            json={"username": "regularusr2fa", "password": "Regularusr2fa1!"},
            headers=_auth_header(admin_token),
        )
        assert create_resp.status_code == 201

        # Login as regular user
        login_resp = await async_client.post(
            LOGIN_URL,
            json={"username": "regularusr2fa", "password": "Regularusr2fa1!"},
        )
        assert login_resp.status_code == 200, login_resp.text
        regular_token = login_resp.json()["access_token"]

        # Call the admin endpoint with the regular user's token. The request carries the
        # same admin_password body the admin tests send, so the only thing wrong with it
        # is the caller's role. httpx's .delete() takes no body, hence .request().
        resp = await async_client.request(
            "DELETE",
            f"/api/v1/auth/2fa/admin/{create_resp.json()['id']}",
            json={"admin_password": "Regularusr2fa1!"},
            headers=_auth_header(regular_token),
        )
        assert resp.status_code == 403
- # ===========================================================================
- # Regenerate backup codes
- # ===========================================================================
class TestRegenerateBackupCodes:
    """Tests for POST /api/v1/auth/2fa/totp/regenerate-backup-codes."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_regenerate_requires_totp_enabled(self, async_client: AsyncClient):
        """Without TOTP enabled, regeneration is answered with 400."""
        bearer = await _setup_and_login(async_client, "regennototp", "regennototp1")
        outcome = await async_client.post(
            "/api/v1/auth/2fa/totp/regenerate-backup-codes",
            json={"code": "123456"},
            headers=_auth_header(bearer),
        )
        assert outcome.status_code == 400

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_regenerate_invalidates_old_codes(self, async_client: AsyncClient):
        """After regenerating, old backup codes must no longer work."""
        username, password = "regeninval", "regeninval1"
        bearer = await _setup_and_login(async_client, username, password)
        setup = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(bearer))
        totp = pyotp.TOTP(setup.json()["secret"])

        enabled = await async_client.post(
            "/api/v1/auth/2fa/totp/enable",
            json={"code": totp.now()},
            headers=_auth_header(bearer),
        )
        stale_code = enabled.json()["backup_codes"][0]

        # Ask for a brand-new set of backup codes.
        regenerated = await async_client.post(
            "/api/v1/auth/2fa/totp/regenerate-backup-codes",
            json={"code": totp.now()},
            headers=_auth_header(bearer),
        )
        assert regenerated.status_code == 200
        fresh_codes = regenerated.json()["backup_codes"]
        assert len(fresh_codes) == 10
        assert stale_code not in fresh_codes

        # A code from the previous set is no longer honoured at login.
        pat = await _login_get_pre_auth_token(async_client, username, password)
        stale_attempt = await async_client.post(
            "/api/v1/auth/2fa/verify",
            json={"pre_auth_token": pat, "method": "backup", "code": stale_code},
        )
        assert stale_attempt.status_code == 401

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_regenerate_with_invalid_code_fails(self, async_client: AsyncClient):
        """A wrong TOTP code on regeneration is answered with 400."""
        bearer = await _setup_and_login(async_client, "regeninvcode", "regeninvcode1")
        setup = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(bearer))
        totp = pyotp.TOTP(setup.json()["secret"])
        await async_client.post(
            "/api/v1/auth/2fa/totp/enable",
            json={"code": totp.now()},
            headers=_auth_header(bearer),
        )

        outcome = await async_client.post(
            "/api/v1/auth/2fa/totp/regenerate-backup-codes",
            json={"code": "000000"},
            headers=_auth_header(bearer),
        )
        assert outcome.status_code == 400
- # ===========================================================================
- # Security: method field validation
- # ===========================================================================
class TestVerifyMethodValidation:
    """The method field must be one of totp/email/backup (Pydantic Literal)."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_invalid_method_rejected_by_schema(self, async_client: AsyncClient):
        """Pydantic should reject unknown method values with 422."""
        payload = {"pre_auth_token": "anytoken", "code": "123456", "method": "sms"}
        outcome = await async_client.post("/api/v1/auth/2fa/verify", json=payload)
        assert outcome.status_code == 422

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_oversized_pre_auth_token_rejected(self, async_client: AsyncClient):
        """pre_auth_token exceeding max_length=128 should be rejected with 422."""
        oversized_token = "x" * 200
        payload = {"pre_auth_token": oversized_token, "code": "123456", "method": "totp"}
        outcome = await async_client.post("/api/v1/auth/2fa/verify", json=payload)
        assert outcome.status_code == 422
- # ===========================================================================
- # Login response shape for 2FA users
- # ===========================================================================
class TestLoginResponseShape:
    """Login for a 2FA-enabled user must return requires_2fa+pre_auth_token
    and must NOT include access_token (which would bypass the 2FA gate)."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_login_2fa_user_omits_access_token(self, async_client: AsyncClient):
        """A user with TOTP enabled must not receive an access_token on /auth/login."""
        # Shared helper: create the user, then set up and enable TOTP.
        await _setup_totp_user(async_client, "loginshape", "loginshape1")

        # Derive the stored password with _norm_pw, as the other re-login tests do,
        # instead of hard-coding its normalised form.
        login_resp = await async_client.post(
            LOGIN_URL,
            json={"username": "loginshape", "password": _norm_pw("loginshape1")},
        )
        assert login_resp.status_code == 200
        data = login_resp.json()
        assert data.get("requires_2fa") is True
        assert data.get("pre_auth_token") is not None
        # access_token must NOT be present — it would bypass the 2FA gate
        assert data.get("access_token") is None
- # ===========================================================================
- # TOTP replay protection
- # ===========================================================================
async def _setup_totp_user(client: AsyncClient, username: str, password: str) -> tuple[str, str]:
    """Create user, set up and enable TOTP; return (bearer_token, totp_secret)."""
    bearer = await _setup_and_login(client, username, password)

    # Step 1: request a TOTP secret for the logged-in user.
    setup = await client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(bearer))
    totp_secret = setup.json()["secret"]

    # Step 2: confirm enrolment with the code that secret currently produces.
    current_code = pyotp.TOTP(totp_secret).now()
    await client.post(
        "/api/v1/auth/2fa/totp/enable",
        json={"code": current_code},
        headers=_auth_header(bearer),
    )
    return bearer, totp_secret
class TestTOTPReplay:
    """The same TOTP code must not be accepted twice within one 30-second window."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_totp_replay_rejected_on_verify(self, async_client: AsyncClient):
        """Replaying the same code on /2fa/verify must return 400."""
        username, password = "replayverify", "replayverify1"
        verify_url = "/api/v1/auth/2fa/verify"
        _bearer, totp_secret = await _setup_totp_user(async_client, username, password)
        one_code = pyotp.TOTP(totp_secret).now()

        first_pat = await _login_get_pre_auth_token(async_client, username, password)
        initial = await async_client.post(
            verify_url,
            json={"pre_auth_token": first_pat, "method": "totp", "code": one_code},
        )
        assert initial.status_code == 200

        # The first pre_auth_token was consumed, so log in again for a new one
        # and present the already-used code.
        second_pat = await _login_get_pre_auth_token(async_client, username, password)
        replayed = await async_client.post(
            verify_url,
            json={"pre_auth_token": second_pat, "method": "totp", "code": one_code},
        )
        assert replayed.status_code == 400

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_totp_replay_rejected_on_disable(self, async_client: AsyncClient):
        """A code already used in verify_2fa must be rejected on /2fa/totp/disable."""
        username, password = "replaydisable", "replaydisable1"
        _bearer, totp_secret = await _setup_totp_user(async_client, username, password)
        one_code = pyotp.TOTP(totp_secret).now()

        # Spend the code on a regular 2FA login first (sets last_totp_counter in DB).
        pat = await _login_get_pre_auth_token(async_client, username, password)
        login_2fa = await async_client.post(
            "/api/v1/auth/2fa/verify",
            json={"pre_auth_token": pat, "method": "totp", "code": one_code},
        )
        assert login_2fa.status_code == 200
        session_token = login_2fa.json()["access_token"]

        # Presenting that same code to disable TOTP must be refused (same 30-second window).
        replayed = await async_client.post(
            "/api/v1/auth/2fa/totp/disable",
            json={"code": one_code},
            headers=_auth_header(session_token),
        )
        assert replayed.status_code == 400
- # ===========================================================================
- # Rate limiting on disable_totp and regenerate_backup_codes (I10)
- # ===========================================================================
class TestRateLimitingDisableRegenerate:
    """disable_totp and regenerate_backup_codes must enforce rate limiting (I10)."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_disable_totp_rate_limited_after_failures(self, async_client: AsyncClient):
        """Repeated wrong codes on /2fa/totp/disable trigger 429."""
        bearer, _unused_secret = await _setup_totp_user(async_client, "rldisable", "rldisable1")
        endpoint = "/api/v1/auth/2fa/totp/disable"

        # Five wrong codes are sent first; the sixth response is the one under test.
        final_status = None
        for _attempt in range(6):
            attempt = await async_client.post(
                endpoint,
                json={"code": "000000"},
                headers=_auth_header(bearer),
            )
            final_status = attempt.status_code

        assert final_status == 429

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_regenerate_backup_codes_rate_limited_after_failures(self, async_client: AsyncClient):
        """Repeated wrong codes on /2fa/totp/regenerate-backup-codes trigger 429."""
        bearer, _unused_secret = await _setup_totp_user(async_client, "rlregen", "rlregen1")
        endpoint = "/api/v1/auth/2fa/totp/regenerate-backup-codes"

        # Five wrong codes are sent first; the sixth response is the one under test.
        final_status = None
        for _attempt in range(6):
            attempt = await async_client.post(
                endpoint,
                json={"code": "000000"},
                headers=_auth_header(bearer),
            )
            final_status = attempt.status_code

        assert final_status == 429
- # ===========================================================================
- # Email OTP send → verify end-to-end (coverage gap C3)
- # ===========================================================================
class TestEmailOTPSendVerify:
    """Full email OTP login: send code → verify code → JWT."""

    @staticmethod
    async def _enable_email_otp(
        client: AsyncClient,
        db_session: AsyncSession,
        username: str,
        password: str,
        setup_code: str,
    ) -> None:
        """Create the user, give it ``<username>@example.com`` and enable email OTP.

        Enabling is done by inserting an ``email_otp_setup`` token (with a hash of
        *setup_code* as nonce) directly into the DB and then calling the
        enable/confirm endpoint with that token and code.
        """
        from sqlalchemy import select as sa_select

        token = await _setup_and_login(client, username, password)

        # Give the user an email address
        result = await db_session.execute(sa_select(User).where(User.username == username))
        user = result.scalar_one()
        user.email = f"{username}@example.com"
        await db_session.commit()

        # Enable email OTP via DB injection
        setup_token = secrets.token_urlsafe(32)
        db_session.add(
            AuthEphemeralToken(
                token=setup_token,
                token_type="email_otp_setup",
                username=username,
                nonce=_pwd_context.hash(setup_code),
                expires_at=datetime.now(timezone.utc) + timedelta(minutes=10),
            )
        )
        await db_session.commit()
        await client.post(
            "/api/v1/auth/2fa/email/enable/confirm",
            json={"setup_token": setup_token, "code": setup_code},
            headers=_auth_header(token),
        )

    @staticmethod
    async def _send_login_code(client: AsyncClient, pre_auth_token: str) -> tuple[str, str]:
        """Call /2fa/email/send with SMTP patched out; return (fresh_pre_auth_token, otp).

        ``send_email`` is replaced by a callback that extracts the six-digit code
        from the plain-text body. Both the HTTP status and the capture are asserted
        here so callers never index a missing code.
        """
        import re
        from unittest.mock import AsyncMock, MagicMock, patch

        captured: dict[str, str] = {}

        def _capture_email(smtp_settings, to_email, subject, body_text, body_html):
            m = re.search(r"login code is: (\d{6})", body_text)
            if m:
                captured["otp"] = m.group(1)

        with (
            patch("backend.app.api.routes.mfa.get_smtp_settings", new=AsyncMock(return_value=MagicMock())),
            patch("backend.app.api.routes.mfa.send_email", side_effect=_capture_email),
        ):
            send_resp = await client.post(
                "/api/v1/auth/2fa/email/send",
                json={"pre_auth_token": pre_auth_token},
            )

        assert send_resp.status_code == 200, send_resp.text
        assert "otp" in captured, "send_email was not called or code not found in body"
        return send_resp.json()["pre_auth_token"], captured["otp"]

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_email_otp_send_and_verify(self, async_client: AsyncClient, db_session: AsyncSession):
        """login → POST /2fa/email/send (patched SMTP) → POST /2fa/verify → JWT."""
        await self._enable_email_otp(async_client, db_session, "emailsendok", "emailsendok1", "444444")

        # Login now requires 2FA — get pre_auth_token (cookie set automatically)
        pre_auth_token = await _login_get_pre_auth_token(async_client, "emailsendok", "emailsendok1")
        fresh_token, otp = await self._send_login_code(async_client, pre_auth_token)

        # Verify with the captured OTP code — cookie still in the async_client jar
        verify_resp = await async_client.post(
            "/api/v1/auth/2fa/verify",
            json={"pre_auth_token": fresh_token, "method": "email", "code": otp},
        )
        assert verify_resp.status_code == 200
        data = verify_resp.json()
        assert "access_token" in data
        assert data["user"]["username"] == "emailsendok"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_email_otp_wrong_code_rejected(self, async_client: AsyncClient, db_session: AsyncSession):
        """A wrong email OTP code must return 401 without burning the pre_auth_token."""
        await self._enable_email_otp(async_client, db_session, "emailwrongcode", "emailwrongcode1", "555555")

        pre_auth_token = await _login_get_pre_auth_token(async_client, "emailwrongcode", "emailwrongcode1")
        fresh_token, otp = await self._send_login_code(async_client, pre_auth_token)

        # Wrong code → 401
        bad = await async_client.post(
            "/api/v1/auth/2fa/verify",
            json={"pre_auth_token": fresh_token, "method": "email", "code": "000000"},
        )
        assert bad.status_code == 401

        # Correct code still works (token not burned by wrong attempt)
        good = await async_client.post(
            "/api/v1/auth/2fa/verify",
            json={"pre_auth_token": fresh_token, "method": "email", "code": otp},
        )
        assert good.status_code == 200
- # ===========================================================================
- # OIDC end-to-end (coverage gap C4)
- # ===========================================================================
def _make_test_rsa_key():
    """Create a disposable 2048-bit RSA key for tests.

    Returns a ``(private_pem, jwks)`` tuple: the unencrypted private key as
    TraditionalOpenSSL PEM bytes, and a JWK set dict holding the matching public
    key under kid ``test-kid-1``.
    """
    import base64

    from cryptography.hazmat.primitives import serialization
    from cryptography.hazmat.primitives.asymmetric import rsa

    def _encode_uint(value: int, width: int) -> str:
        # Big-endian bytes of fixed width, base64url-encoded with the padding removed.
        raw = value.to_bytes(width, "big")
        return base64.urlsafe_b64encode(raw).rstrip(b"=").decode()

    key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
    pem_bytes = key.private_bytes(
        serialization.Encoding.PEM,
        serialization.PrivateFormat.TraditionalOpenSSL,
        serialization.NoEncryption(),
    )

    public = key.public_key().public_numbers()
    public_jwk = {
        "kty": "RSA",
        "use": "sig",
        "alg": "RS256",
        "kid": "test-kid-1",
        "n": _encode_uint(public.n, 256),
        "e": _encode_uint(public.e, 3),
    }
    return pem_bytes, {"keys": [public_jwk]}
def _make_oidc_mock_http_client(discovery_doc, jwks_data, token_response):
    """Build a stand-in class for ``httpx.AsyncClient`` serving canned IdP responses.

    Args:
        discovery_doc: JSON body returned for any GET whose URL does not contain "jwks".
        jwks_data: JSON body returned for any GET whose URL contains "jwks".
        token_response: JSON body returned for every POST.

    Returns:
        A class usable as a drop-in replacement when patching ``httpx.AsyncClient``;
        it accepts any constructor arguments and works as an async context manager.
    """

    class _MockResp:
        """Minimal successful HTTP response wrapping a JSON-able payload."""

        def __init__(self, data):
            self._data = data
            self.status_code = 200
            self.is_success = True
            self.text = str(data)

        def json(self):
            return self._data

        def raise_for_status(self):
            # Always a 200 response, so there is never anything to raise.
            pass

    class _MockHttpxClient:
        """Async-context-manager client that never touches the network."""

        def __init__(self, *args, **kwargs):
            pass

        async def __aenter__(self):
            return self

        async def __aexit__(self, *args):
            pass

        async def get(self, url, **kwargs):
            return _MockResp(jwks_data if "jwks" in url else discovery_doc)

        async def post(self, url, **kwargs):
            return _MockResp(token_response)

    return _MockHttpxClient


class TestOIDCEndToEnd:
    """Full OIDC auth-code flow: state → callback (mocked IdP) → exchange → JWT."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_oidc_callback_creates_user_and_issues_jwt(self, async_client: AsyncClient, db_session: AsyncSession):
        """callback validates the mocked ID token, creates a user, and redirects
        with an oidc_exchange token; exchanging that token returns a full JWT."""
        import time
        from unittest.mock import patch

        import jwt as pyjwt

        private_pem, jwks_data = _make_test_rsa_key()
        issuer = "https://idp.test.example.com"
        client_id = "oidc-test-client"
        nonce = secrets.token_urlsafe(16)
        now = int(time.time())
        # Sign an ID token with the throwaway key; the kid matches the JWKS entry.
        id_token = pyjwt.encode(
            {
                "sub": "oidc-sub-e2e",
                "iss": issuer,
                "aud": client_id,
                "nonce": nonce,
                "email": "oidce2e@example.com",
                "email_verified": True,
                "iat": now,
                "exp": now + 300,
            },
            private_pem,
            algorithm="RS256",
            headers={"kid": "test-kid-1"},
        )

        # Create OIDC provider
        admin_token = await _setup_and_login(async_client, "oidce2eadm", "oidce2eadm1")
        create_resp = await async_client.post(
            "/api/v1/auth/oidc/providers",
            json={
                "name": "E2E-IdP",
                "issuer_url": issuer,
                "client_id": client_id,
                "client_secret": "test-secret",
                "scopes": "openid email profile",
                "is_enabled": True,
                "auto_create_users": True,
            },
            headers=_auth_header(admin_token),
        )
        assert create_resp.status_code == 201
        provider_id = create_resp.json()["id"]

        # Simulate the authorize step: insert an oidc_state token directly
        state = secrets.token_urlsafe(32)
        code_verifier = secrets.token_urlsafe(48)
        db_session.add(
            AuthEphemeralToken(
                token=state,
                token_type="oidc_state",
                provider_id=provider_id,
                nonce=nonce,
                code_verifier=code_verifier,
                expires_at=datetime.now(timezone.utc) + timedelta(minutes=5),
            )
        )
        await db_session.commit()

        # Mock httpx calls made inside oidc_callback
        discovery_doc = {
            "issuer": issuer,
            "authorization_endpoint": f"{issuer}/auth",
            "token_endpoint": f"{issuer}/token",
            "jwks_uri": f"{issuer}/.well-known/jwks.json",
        }
        token_response = {
            "access_token": "mock-access",
            "token_type": "Bearer",
            "id_token": id_token,
        }
        mock_client_cls = _make_oidc_mock_http_client(discovery_doc, jwks_data, token_response)

        with patch("backend.app.api.routes.mfa.httpx.AsyncClient", mock_client_cls):
            callback_resp = await async_client.get(
                f"/api/v1/auth/oidc/callback?code=test-auth-code&state={state}",
                follow_redirects=False,
            )
        assert callback_resp.status_code == 302, callback_resp.text
        location = callback_resp.headers.get("location", "")
        assert "oidc_token=" in location, f"Expected oidc_token in redirect, got: {location}"

        # Extract and exchange the oidc_exchange token
        oidc_exchange_token = location.split("oidc_token=")[1].split("&")[0]
        exchange_resp = await async_client.post(
            "/api/v1/auth/oidc/exchange",
            json={"oidc_token": oidc_exchange_token},
        )
        assert exchange_resp.status_code == 200
        data = exchange_resp.json()
        assert "access_token" in data
        assert data["user"]["username"] is not None

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_oidc_callback_invalid_state_redirects_error(self, async_client: AsyncClient):
        """An unknown state token must redirect to /?oidc_error=invalid_state."""
        resp = await async_client.get(
            "/api/v1/auth/oidc/callback?code=x&state=totally-bogus-state",
            follow_redirects=False,
        )
        assert resp.status_code == 302
        assert "invalid_state" in resp.headers.get("location", "")

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_oidc_state_is_single_use(self, async_client: AsyncClient, db_session: AsyncSession):
        """Replaying the same state token must fail on the second callback."""
        import time
        from unittest.mock import patch

        import jwt as pyjwt

        private_pem, jwks_data = _make_test_rsa_key()
        issuer = "https://idp2.test.example.com"
        client_id = "oidc-client-2"
        nonce = secrets.token_urlsafe(16)
        now = int(time.time())
        id_token = pyjwt.encode(
            {
                "sub": "sub-single-use",
                "iss": issuer,
                "aud": client_id,
                "nonce": nonce,
                "email": "su@example.com",
                "email_verified": True,
                "iat": now,
                "exp": now + 300,
            },
            private_pem,
            algorithm="RS256",
            headers={"kid": "test-kid-1"},
        )

        admin_token = await _setup_and_login(async_client, "oidcsuadm", "oidcsuadm1")
        cr = await async_client.post(
            "/api/v1/auth/oidc/providers",
            json={
                "name": "SU-IdP",
                "issuer_url": issuer,
                "client_id": client_id,
                "client_secret": "s",
                "scopes": "openid",
                "is_enabled": True,
                "auto_create_users": True,
            },
            headers=_auth_header(admin_token),
        )
        # Fail here with the response body rather than with a KeyError on "id" below.
        assert cr.status_code == 201, cr.text
        provider_id = cr.json()["id"]

        state = secrets.token_urlsafe(32)
        db_session.add(
            AuthEphemeralToken(
                token=state,
                token_type="oidc_state",
                provider_id=provider_id,
                nonce=nonce,
                code_verifier=secrets.token_urlsafe(48),
                expires_at=datetime.now(timezone.utc) + timedelta(minutes=5),
            )
        )
        await db_session.commit()

        discovery_doc = {
            "issuer": issuer,
            "authorization_endpoint": f"{issuer}/auth",
            "token_endpoint": f"{issuer}/token",
            "jwks_uri": f"{issuer}/.well-known/jwks.json",
        }
        token_response = {"access_token": "a", "token_type": "Bearer", "id_token": id_token}
        mock_client_cls = _make_oidc_mock_http_client(discovery_doc, jwks_data, token_response)

        with patch("backend.app.api.routes.mfa.httpx.AsyncClient", mock_client_cls):
            first = await async_client.get(
                f"/api/v1/auth/oidc/callback?code=c&state={state}",
                follow_redirects=False,
            )
            assert first.status_code == 302
            assert "oidc_token=" in first.headers.get("location", "")

            # Replay: second callback with the same state must fail
            second = await async_client.get(
                f"/api/v1/auth/oidc/callback?code=c&state={state}",
                follow_redirects=False,
            )
            assert second.status_code == 302
            assert "invalid_state" in second.headers.get("location", "")
- # ===========================================================================
- # H-2: Wrong code must NOT consume the email OTP setup token (peek-then-consume)
- # ===========================================================================
class TestEmailOTPSetupTokenPreservedOnWrongCode:
    """H-2 regression: a failed confirm attempt must leave the email-OTP setup token usable."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_wrong_code_does_not_consume_setup_token(self, async_client: AsyncClient, db_session: AsyncSession):
        """A wrong code yields 400; the same setup token then succeeds with the right code."""
        confirm_url = "/api/v1/auth/2fa/email/enable/confirm"
        valid_code = "999999"

        jwt_token = await _setup_and_login(async_client, "h2retryuser", "h2retrypass1")

        # Seed a pending setup token directly; the hash of the expected code
        # is stored in the ``nonce`` column.
        hashed_code = _pwd_context.hash(valid_code)
        pending_token = secrets.token_urlsafe(32)
        expiry = datetime.now(timezone.utc) + timedelta(minutes=10)
        seeded = AuthEphemeralToken(
            token=pending_token,
            token_type="email_otp_setup",
            username="h2retryuser",
            nonce=hashed_code,
            expires_at=expiry,
        )
        db_session.add(seeded)
        await db_session.commit()

        # Attempt 1 — deliberately wrong code: expect a 400.
        rejected = await async_client.post(
            confirm_url,
            json={"setup_token": pending_token, "code": "000000"},
            headers=_auth_header(jwt_token),
        )
        assert rejected.status_code == 400

        # Attempt 2 — right code with the very same token: expect success,
        # which only works if attempt 1 did not consume the token.
        accepted = await async_client.post(
            confirm_url,
            json={"setup_token": pending_token, "code": valid_code},
            headers=_auth_header(jwt_token),
        )
        assert accepted.status_code == 200
- # ===========================================================================
- # M-2: New OIDC provider must default to auto_link_existing_accounts=False
- # ===========================================================================
class TestOIDCProviderAutoLinkDefault:
    """M-2 regression: a provider created without the flag gets auto_link_existing_accounts=False."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_new_provider_auto_link_defaults_to_false(self, async_client: AsyncClient):
        """Create a provider without ``auto_link_existing_accounts`` and check the echoed default."""
        admin_jwt = await _setup_and_login(async_client, "m2autolinkadmin", "m2autolinkadmin1")

        # The payload deliberately leaves out auto_link_existing_accounts so
        # that the server-side default is what comes back.
        provider_payload = {
            "name": "AutoLinkTest",
            "issuer_url": "https://autolink.example.com",
            "client_id": "alc",
            "client_secret": "als",
            "scopes": "openid",
            "is_enabled": True,
            "auto_create_users": False,
        }
        created = await async_client.post(
            "/api/v1/auth/oidc/providers",
            json=provider_payload,
            headers=_auth_header(admin_jwt),
        )

        assert created.status_code == 201
        body = created.json()
        assert body["auto_link_existing_accounts"] is False
- # ===========================================================================
- # L-5: 2FA verify code format validation
- # ===========================================================================
class TestTwoFAVerifyCodeFormat:
    """L-5: the ``code`` field of the 2FA verify request accepts 6–8 alphanumeric characters only."""

    # Endpoint shared by every case below.
    _VERIFY_URL = "/api/v1/auth/2fa/verify"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_code_too_long_rejected(self, async_client: AsyncClient):
        """A 9-character code is one over the maximum and must yield 422."""
        payload = {"pre_auth_token": "anytoken", "code": "1" * 9, "method": "totp"}
        outcome = await async_client.post(self._VERIFY_URL, json=payload)
        assert outcome.status_code == 422

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_code_non_alphanumeric_rejected(self, async_client: AsyncClient):
        """A code containing a dash must yield 422."""
        payload = {"pre_auth_token": "anytoken", "code": "12-456", "method": "totp"}
        outcome = await async_client.post(self._VERIFY_URL, json=payload)
        assert outcome.status_code == 422

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_code_too_short_rejected(self, async_client: AsyncClient):
        """A 5-character code is one under the minimum and must yield 422."""
        payload = {"pre_auth_token": "anytoken", "code": "12345", "method": "totp"}
        outcome = await async_client.post(self._VERIFY_URL, json=payload)
        assert outcome.status_code == 422

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_code_exactly_6_passes_schema(self, async_client: AsyncClient):
        """A 6-character code clears schema validation; any non-422 status is accepted."""
        payload = {"pre_auth_token": "x" * 32, "code": "123456", "method": "totp"}
        outcome = await async_client.post(self._VERIFY_URL, json=payload)
        assert outcome.status_code != 422

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_code_exactly_8_passes_schema(self, async_client: AsyncClient):
        """An 8-character alphanumeric backup code clears schema validation."""
        payload = {"pre_auth_token": "x" * 32, "code": "ABCD1234", "method": "backup"}
        outcome = await async_client.post(self._VERIFY_URL, json=payload)
        assert outcome.status_code != 422
- # ===========================================================================
- # M-NEW-1: verify_slicer_download_token must NOT consume token on wrong resource
- # ===========================================================================
class TestSlicerTokenResourceBinding:
    """M-NEW-1: a slicer download token is bound to one resource and is single-use for it."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_wrong_resource_does_not_consume_token(self, async_client: AsyncClient, db_session: AsyncSession):
        """Checking an archive:5 token against archive:6 fails without burning the token."""
        from datetime import datetime, timedelta, timezone

        from backend.app.core.auth import verify_slicer_download_token
        from backend.app.models.auth_ephemeral import AuthEphemeralToken

        issued_at = datetime.now(timezone.utc)
        download_token = secrets.token_urlsafe(24)
        # The bound resource is stored in ``nonce`` as "<type>:<id>".
        row = AuthEphemeralToken(
            token=download_token,
            token_type="slicer_download",
            nonce="archive:5",
            expires_at=issued_at + timedelta(minutes=5),
        )
        db_session.add(row)
        await db_session.commit()

        # Mismatched resource id: verification must report False ...
        mismatch_result = await verify_slicer_download_token(download_token, "archive", 6)
        assert mismatch_result is False

        # ... and the token must still be valid for the resource it was issued for.
        match_result = await verify_slicer_download_token(download_token, "archive", 5)
        assert match_result is True

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_correct_resource_consumes_token(self, async_client: AsyncClient, db_session: AsyncSession):
        """A successful verification uses the token up; repeating it returns False."""
        from datetime import datetime, timedelta, timezone

        from backend.app.core.auth import verify_slicer_download_token
        from backend.app.models.auth_ephemeral import AuthEphemeralToken

        issued_at = datetime.now(timezone.utc)
        download_token = secrets.token_urlsafe(24)
        row = AuthEphemeralToken(
            token=download_token,
            token_type="slicer_download",
            nonce="library:99",
            expires_at=issued_at + timedelta(minutes=5),
        )
        db_session.add(row)
        await db_session.commit()

        initial_check = await verify_slicer_download_token(download_token, "library", 99)
        assert initial_check is True

        repeat_check = await verify_slicer_download_token(download_token, "library", 99)
        assert repeat_check is False
- # ===========================================================================
- # M-NEW-3 / L-NEW-1: Schema length validation for change-password & forgot-password
- # ===========================================================================
class TestSchemaLengthValidationR2:
    """Review round 2: length caps on the change-password and forgot-password request bodies."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_change_password_current_too_long_rejected(self, async_client: AsyncClient):
        """A 257-character ``current_password`` (limit: 256) must yield 422."""
        oversized_body = {"current_password": "x" * 257, "new_password": "ValidPass1!"}
        outcome = await async_client.post("/api/v1/users/me/change-password", json=oversized_body)
        assert outcome.status_code == 422

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_forgot_password_email_too_long_rejected(self, async_client: AsyncClient):
        """A 255-character email (limit: 254) must yield 422."""
        # 243 'a' characters + the 12 characters of "@example.com" = 255.
        oversized_email = "a" * 243 + "@example.com"
        outcome = await async_client.post("/api/v1/auth/forgot-password", json={"email": oversized_email})
        assert outcome.status_code == 422

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_forgot_password_email_at_limit_passes_schema(self, async_client: AsyncClient):
        """A short, well-formed email must not be rejected by schema validation (any non-422 is fine)."""
        outcome = await async_client.post("/api/v1/auth/forgot-password", json={"email": "user@example.com"})
        assert outcome.status_code != 422
- # ===========================================================================
- # L-NEW-2: TOTPSetupRequest.code max_length
- # ===========================================================================
class TestTOTPSetupCodeMaxLength:
    """L-NEW-2: the ``code`` field of the TOTP setup request has an upper length bound."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_setup_code_too_long_rejected(self, async_client: AsyncClient):
        """With TOTP already enabled, a 9-character setup code must yield 422."""
        import pyotp as _pyotp

        setup_url = "/api/v1/auth/2fa/totp/setup"
        jwt_token = await _setup_and_login(async_client, "totp_setup_maxlen", "totp_setup_maxlen1")

        # First turn TOTP on for this user: fetch a secret, then confirm it
        # with a currently valid code.
        initial_setup = await async_client.post(setup_url, headers=_auth_header(jwt_token))
        totp_secret = initial_setup.json()["secret"]
        current_code = _pyotp.TOTP(totp_secret).now()
        await async_client.post(
            "/api/v1/auth/2fa/totp/enable",
            json={"code": current_code},
            headers=_auth_header(jwt_token),
        )

        # Now call setup again with an over-long code.
        oversized = await async_client.post(
            setup_url,
            json={"code": "1" * 9},
            headers=_auth_header(jwt_token),
        )
        assert oversized.status_code == 422
- # ===========================================================================
- # L-NEW-3: EmailOTPEnableConfirmRequest.code must be exactly 6 digits
- # ===========================================================================
class TestEmailOTPConfirmCodeFormat:
    """L-NEW-3: the email-OTP enable/confirm ``code`` must be exactly six digits."""

    _CONFIRM_URL = "/api/v1/auth/2fa/email/enable/confirm"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_non_digit_code_rejected(self, async_client: AsyncClient):
        """A six-letter code (no digits) must yield 422."""
        jwt_token = await _setup_and_login(async_client, "emailotpfmt", "emailotpfmt1")
        body = {"setup_token": "x" * 32, "code": "ABCDEF"}
        outcome = await async_client.post(self._CONFIRM_URL, json=body, headers=_auth_header(jwt_token))
        assert outcome.status_code == 422

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_seven_digit_code_rejected(self, async_client: AsyncClient):
        """A seven-digit code is one character too long and must yield 422."""
        jwt_token = await _setup_and_login(async_client, "emailotplen7", "emailotplen7x")
        body = {"setup_token": "x" * 32, "code": "1234567"}
        outcome = await async_client.post(self._CONFIRM_URL, json=body, headers=_auth_header(jwt_token))
        assert outcome.status_code == 422

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_valid_six_digit_code_passes_schema(self, async_client: AsyncClient):
        """A six-digit code clears schema validation; the bogus setup token may still fail later."""
        jwt_token = await _setup_and_login(async_client, "emailotpfmt6", "emailotpfmt6x")
        body = {"setup_token": "x" * 32, "code": "123456"}
        outcome = await async_client.post(self._CONFIRM_URL, json=body, headers=_auth_header(jwt_token))
        assert outcome.status_code != 422
- # ===========================================================================
- # L-NEW-4: OIDCProviderCreate field max_length constraints
- # ===========================================================================
class TestOIDCProviderFieldLengths:
    """L-NEW-4: provider-creation fields longer than their max_length must be rejected."""

    _PROVIDERS_URL = "/api/v1/auth/oidc/providers"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_name_too_long_rejected(self, async_client: AsyncClient):
        """A 101-character provider name must yield 422."""
        admin_jwt = await _setup_and_login(async_client, "oidcfldadmin", "oidcfldadmin1")
        provider_payload = {
            "name": "n" * 101,
            "issuer_url": "https://test.example.com",
            "client_id": "cid",
            "client_secret": "csec",
            "scopes": "openid",
        }
        outcome = await async_client.post(
            self._PROVIDERS_URL,
            json=provider_payload,
            headers=_auth_header(admin_jwt),
        )
        assert outcome.status_code == 422

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_client_secret_too_long_rejected(self, async_client: AsyncClient):
        """A 513-character client secret must yield 422."""
        admin_jwt = await _setup_and_login(async_client, "oidcseclen", "oidcseclen123")
        provider_payload = {
            "name": "ValidName",
            "issuer_url": "https://test.example.com",
            "client_id": "cid",
            "client_secret": "s" * 513,
            "scopes": "openid",
        }
        outcome = await async_client.post(
            self._PROVIDERS_URL,
            json=provider_payload,
            headers=_auth_header(admin_jwt),
        )
        assert outcome.status_code == 422
- # ---------------------------------------------------------------------------
- # M-NEW-4 / M-NEW-5 / L-NEW-5: UserCreate & UserUpdate field length limits
- # ---------------------------------------------------------------------------
class TestUserCreateUpdateFieldLengths:
    """UserCreate and UserUpdate must enforce max_length on username, password, email."""

    @pytest.fixture
    async def admin_token(self, async_client: AsyncClient) -> str:
        """Set up and log in an admin account; return its access token."""
        return await _setup_and_login(async_client, "ucfldadmin", "ucfldadmin1!")

    async def _create_user(self, async_client: AsyncClient, admin_token: str, username: str) -> int:
        """Create a plain user with a valid password and return its id.

        Asserts the 201 so that a failing precondition is reported as such
        rather than as a confusing failure of the update under test.
        """
        create_resp = await async_client.post(
            "/api/v1/users/",
            json={"username": username, "password": "ValidPass1!", "role": "user"},
            headers=_auth_header(admin_token),
        )
        assert create_resp.status_code == 201
        return create_resp.json()["id"]

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_create_username_too_long_rejected(self, async_client: AsyncClient, admin_token: str):
        """A 151-character username on create must yield 422."""
        resp = await async_client.post(
            "/api/v1/users/",
            json={
                "username": "u" * 151,
                "password": "ValidPass1!",
                "role": "user",
            },
            headers=_auth_header(admin_token),
        )
        assert resp.status_code == 422

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_create_password_too_long_rejected(self, async_client: AsyncClient, admin_token: str):
        """A 257-character password (3 + 254) on create must yield 422."""
        resp = await async_client.post(
            "/api/v1/users/",
            json={
                "username": "newuserX",
                "password": "A1!" + "x" * 254,
                "role": "user",
            },
            headers=_auth_header(admin_token),
        )
        assert resp.status_code == 422

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_create_email_too_long_rejected(self, async_client: AsyncClient, admin_token: str):
        """Boundary sanity check with an email of exactly 254 characters.

        The strictly-over-limit case (255 characters → 422) is asserted by
        ``test_create_email_exceeds_limit_rejected``; here either creation
        (201) or a validation rejection (422) is accepted.
        """
        resp = await async_client.post(
            "/api/v1/users/",
            json={
                "username": "newuserY",
                "password": "ValidPass1!",
                # 248 'a' + '@x.com' (6 chars) = 254 chars — exactly at the limit.
                "email": "a" * 248 + "@x.com",
                "role": "user",
            },
            headers=_auth_header(admin_token),
        )
        # Only the two expected outcomes are tolerated; anything else
        # (e.g. a server error on a boundary-length value) fails the test.
        assert resp.status_code in (201, 422)  # boundary sanity check

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_create_email_exceeds_limit_rejected(self, async_client: AsyncClient, admin_token: str):
        """A 255-character email on create must yield 422."""
        resp = await async_client.post(
            "/api/v1/users/",
            json={
                "username": "newuserZ",
                "password": "ValidPass1!",
                "email": "a" * 249 + "@x.com",  # 255 chars — exceeds RFC 5321 max of 254
                "role": "user",
            },
            headers=_auth_header(admin_token),
        )
        assert resp.status_code == 422

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_update_username_too_long_rejected(self, async_client: AsyncClient, admin_token: str):
        """Patching an existing user with a 151-character username must yield 422."""
        user_id = await self._create_user(async_client, admin_token, "updusr1")
        resp = await async_client.patch(
            f"/api/v1/users/{user_id}",
            json={"username": "u" * 151},
            headers=_auth_header(admin_token),
        )
        assert resp.status_code == 422

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_update_password_too_long_rejected(self, async_client: AsyncClient, admin_token: str):
        """Patching an existing user with a 257-character password must yield 422."""
        user_id = await self._create_user(async_client, admin_token, "updusr2")
        resp = await async_client.patch(
            f"/api/v1/users/{user_id}",
            json={"password": "A1!" + "x" * 254},
            headers=_auth_header(admin_token),
        )
        assert resp.status_code == 422

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_update_email_too_long_rejected(self, async_client: AsyncClient, admin_token: str):
        """Patching an existing user with a 255-character email must yield 422."""
        user_id = await self._create_user(async_client, admin_token, "updusr3")
        resp = await async_client.patch(
            f"/api/v1/users/{user_id}",
            json={"email": "a" * 249 + "@x.com"},  # 255 chars
            headers=_auth_header(admin_token),
        )
        assert resp.status_code == 422
- # ---------------------------------------------------------------------------
- # L-NEW-6: per-IP rate limiting on /forgot-password
- # ---------------------------------------------------------------------------
- _SMTP_DATA_FOR_IPLIMIT = {
- "smtp_host": "smtp.test.com",
- "smtp_port": 587,
- "smtp_username": "test@test.com",
- "smtp_password": "testpass",
- "smtp_security": "starttls",
- "smtp_auth_enabled": True,
- "smtp_from_email": "noreply@test.com",
- }
class TestForgotPasswordPerIpRateLimit:
    """L-NEW-6: POST /forgot-password is capped per client IP.

    Eleven requests are issued through the same test client, each with a
    distinct email address so that only the per-IP bucket can fill up.
    Requests 1–10 must not be answered with 429; request 11 must be.
    """

    @pytest.fixture
    async def advanced_auth_token(self, async_client: AsyncClient) -> str:
        """Create an admin, store SMTP settings, switch on advanced auth; return the admin token."""
        admin_jwt = await _setup_and_login(async_client, "iprladmin", "iprladmin1!")
        auth_headers = _auth_header(admin_jwt)
        await async_client.post("/api/v1/auth/smtp", headers=auth_headers, json=_SMTP_DATA_FOR_IPLIMIT)
        await async_client.post("/api/v1/auth/advanced-auth/enable", headers=auth_headers)
        return admin_jwt

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_per_ip_limit_triggers_429(self, async_client: AsyncClient, advanced_auth_token: str):
        """The eleventh forgot-password request from one client must be rejected with 429."""
        # A fresh address per request keeps the per-email bucket (limit=3)
        # out of the picture.
        statuses = [
            (
                await async_client.post(
                    "/api/v1/auth/forgot-password",
                    json={"email": f"unique{attempt}@example.com"},
                )
            ).status_code
            for attempt in range(11)
        ]

        *within_limit, over_limit = statuses

        # Requests 1–10: the IP bucket must not have tripped yet.
        for status in within_limit:
            assert status != 429, f"Unexpected 429 before limit reached: {statuses}"

        # Request 11: the IP bucket is exhausted.
        assert over_limit == 429, f"Expected 429 on 11th request, got {over_limit}"
- # ---------------------------------------------------------------------------
- # M-NEW-6: OIDC auto-link must be rejected if target user already has an
- # OIDC link to a different provider
- # ---------------------------------------------------------------------------
class TestOIDCAutoLinkExistingLinkRejection:
    """OIDC callback must reject auto-linking when the email-matched user
    already has an OIDC link to a different provider (M-NEW-6)."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_auto_link_rejected_when_user_already_linked(
        self, async_client: AsyncClient, db_session: AsyncSession
    ):
        """Auto-link via email-match is rejected when the target user is
        already linked to another OIDC provider.

        Setup: one user linked to provider B, plus provider A with
        ``auto_link_existing_accounts=True``. A callback for provider A whose
        (mocked) ID-token claims carry the same email must redirect with
        ``no_linked_account`` and must not create a link row for provider A.
        """
        from unittest.mock import AsyncMock, MagicMock, patch

        from sqlalchemy import select as sa_select

        from backend.app.core.auth import get_password_hash
        from backend.app.models.oidc_provider import OIDCProvider, UserOIDCLink
        from backend.app.models.user import User

        # ── 1. Target user with a known email ────────────────────────────
        target = User(
            username="oidcALTarget",
            email="alinktest@example.com",
            auth_source="oidc",
            password_hash=get_password_hash(secrets.token_urlsafe(16)),
            role="user",
            is_active=True,
        )
        db_session.add(target)
        await db_session.flush()

        # ── 2. Provider B — legitimate, already linked to target ──────────
        prov_b = OIDCProvider(
            name="ProvB_m6test",
            issuer_url="https://providerb-m6.example.com",
            client_id="client_b",
            _client_secret_enc="secret_b",
            scopes="openid email profile",
            is_enabled=True,
            auto_link_existing_accounts=False,
            auto_create_users=False,
        )
        db_session.add(prov_b)
        await db_session.flush()
        db_session.add(
            UserOIDCLink(
                user_id=target.id,
                provider_id=prov_b.id,
                provider_user_id="legitimate_sub",
                provider_email="alinktest@example.com",
            )
        )

        # ── 3. Provider A — attacker-controlled, auto_link=True ───────────
        prov_a = OIDCProvider(
            name="ProvA_m6test",
            issuer_url="https://providera-m6.example.com",
            client_id="client_a",
            _client_secret_enc="secret_a",
            scopes="openid email profile",
            is_enabled=True,
            auto_link_existing_accounts=True,
            auto_create_users=False,
        )
        db_session.add(prov_a)
        await db_session.flush()

        # Capture primary keys as plain values while the instances are still
        # loaded: reading ``target.id`` after commit() would need a reload if
        # the session expires objects on commit.
        target_id = target.id
        prov_a_id = prov_a.id

        # ── 4. OIDC state for Provider A ──────────────────────────────────
        state = secrets.token_urlsafe(32)
        nonce = secrets.token_urlsafe(32)
        code_verifier = secrets.token_urlsafe(48)
        db_session.add(
            AuthEphemeralToken(
                token=state,
                token_type="oidc_state",
                provider_id=prov_a_id,
                nonce=nonce,
                code_verifier=code_verifier,
                expires_at=datetime.now(timezone.utc) + timedelta(minutes=10),
            )
        )
        await db_session.commit()

        # ── 5. Mock HTTP + JWT so the callback can reach the auto-link check ─
        fake_discovery = {
            "issuer": "https://providera-m6.example.com",
            "token_endpoint": "https://providera-m6.example.com/token",
            "jwks_uri": "https://providera-m6.example.com/jwks",
        }
        fake_token = {"access_token": "acc_tok", "id_token": "fake.id.token"}
        fake_claims = {
            "sub": "attacker_sub_unique",
            "email": "alinktest@example.com",
            "email_verified": True,
            "nonce": nonce,
            "iss": "https://providera-m6.example.com",
            "aud": "client_a",
            "exp": 9_999_999_999,
        }

        disc_resp = AsyncMock()
        disc_resp.raise_for_status = MagicMock()
        disc_resp.json = MagicMock(return_value=fake_discovery)

        token_resp = AsyncMock()
        token_resp.ok = True
        # httpx responses expose ``is_success`` / ``status_code`` rather than
        # ``ok``; set them explicitly instead of relying on auto-created
        # (truthy) mock attributes.
        token_resp.is_success = True
        token_resp.status_code = 200
        token_resp.json = MagicMock(return_value=fake_token)

        jwks_resp = AsyncMock()
        jwks_resp.raise_for_status = MagicMock()
        jwks_resp.json = MagicMock(return_value={})

        mock_http = AsyncMock()
        # GETs are answered in order: discovery document first, then JWKS.
        mock_http.get = AsyncMock(side_effect=[disc_resp, jwks_resp])
        mock_http.post = AsyncMock(return_value=token_resp)

        mock_signing_key = MagicMock()
        mock_signing_key.key = "fake_key"

        with (
            patch("backend.app.api.routes.mfa.httpx.AsyncClient") as mock_httpx_cls,
            patch("backend.app.api.routes.mfa.jwt.decode", return_value=fake_claims),
            patch("backend.app.api.routes.mfa.PyJWKClient") as mock_jwks_cls,
        ):
            mock_httpx_cls.return_value.__aenter__ = AsyncMock(return_value=mock_http)
            mock_httpx_cls.return_value.__aexit__ = AsyncMock(return_value=False)
            mock_jwks_cls.return_value.get_signing_key_from_jwt.return_value = mock_signing_key
            resp = await async_client.get(
                f"/api/v1/auth/oidc/callback?code=fake_code&state={state}",
                follow_redirects=False,
            )

        # M-NEW-6: must redirect with no_linked_account — NOT create a second link
        assert resp.status_code == 302
        location = resp.headers.get("location", "")
        assert "no_linked_account" in location, f"Expected no_linked_account in redirect, got: {location}"

        # Verify no second OIDC link was created for Provider A
        async with db_session as s:
            links_result = await s.execute(
                sa_select(UserOIDCLink).where(
                    UserOIDCLink.user_id == target_id,
                    UserOIDCLink.provider_id == prov_a_id,
                )
            )
            assert links_result.scalar_one_or_none() is None, "No link to Provider A must exist"
- # ===========================================================================
- # Test Gap 1: OIDC state token is single-use — replay must be rejected
- # ===========================================================================
class TestOIDCStateReplay:
    """An OIDC state token works once: reusing it on a second callback must
    redirect to ``?oidc_error=invalid_state``."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_state_replay_rejected(self, async_client: AsyncClient, db_session: AsyncSession):
        """The first callback may fail for other reasons; the replay must fail with invalid_state."""
        from backend.app.models.oidc_provider import OIDCProvider

        # ── 1. Minimal enabled provider, inserted straight into the DB ────
        idp = OIDCProvider(
            name="StateReplayIdP",
            issuer_url="https://statereplay-idp.example.com",
            client_id="client_replay",
            _client_secret_enc="secret_replay",
            scopes="openid",
            is_enabled=True,
            auto_link_existing_accounts=False,
            auto_create_users=False,
        )
        db_session.add(idp)
        await db_session.flush()

        # ── 2. Pending state token pointing at that provider ──────────────
        state = secrets.token_urlsafe(32)
        pending_state = AuthEphemeralToken(
            token=state,
            token_type="oidc_state",
            provider_id=idp.id,
            nonce=secrets.token_urlsafe(32),
            code_verifier=secrets.token_urlsafe(48),
            expires_at=datetime.now(timezone.utc) + timedelta(minutes=10),
        )
        db_session.add(pending_state)
        await db_session.commit()

        callback_url = f"/api/v1/auth/oidc/callback?code=any_code&state={state}"

        # ── 3. First use. Nothing is mocked here, so the flow is not expected
        #       to complete; the only requirement is that the failure is
        #       something other than invalid_state, i.e. the state itself
        #       was recognised.
        first = await async_client.get(callback_url, follow_redirects=False)
        assert first.status_code == 302
        first_location = first.headers.get("location", "")
        assert "invalid_state" not in first_location, (
            f"First call should NOT get invalid_state: {first.headers.get('location')}"
        )

        # ── 4. Replay with the identical state → invalid_state ────────────
        second = await async_client.get(callback_url, follow_redirects=False)
        assert second.status_code == 302
        second_location = second.headers.get("location", "")
        assert "invalid_state" in second_location, (
            f"Replayed state must redirect to invalid_state, got: {second.headers.get('location')}"
        )
- # ===========================================================================
- # Test Gap 2: OIDC iss claim mismatch must redirect to token_validation_failed
- # ===========================================================================
class TestOIDCIssMismatch:
    """An ID token whose ``iss`` differs from the discovery issuer is rejected."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_iss_mismatch_redirects_token_validation_failed(
        self, async_client: AsyncClient, db_session: AsyncSession
    ):
        """The callback must redirect with ``token_validation_failed`` on an iss mismatch."""
        import time
        from unittest.mock import patch

        import jwt as pyjwt

        signing_key_pem, jwks_payload = _make_test_rsa_key()
        expected_issuer = "https://correct-iss.example.com"
        forged_issuer = "https://wrong-iss.example.com"
        client_id = "iss-mismatch-client"
        nonce = secrets.token_urlsafe(16)
        issued_at = int(time.time())

        # Everything in the token is valid except the issuer claim.
        claims = {
            "sub": "sub-iss-test",
            "iss": forged_issuer,
            "aud": client_id,
            "nonce": nonce,
            "email": "iss@example.com",
            "email_verified": True,
            "iat": issued_at,
            "exp": issued_at + 300,
        }
        id_token = pyjwt.encode(
            claims,
            signing_key_pem,
            algorithm="RS256",
            headers={"kid": "test-kid-1"},
        )

        # Register the provider through the admin API, with the CORRECT issuer.
        admin_token = await _setup_and_login(async_client, "issadmin1", "issadmin1!")
        provider_payload = {
            "name": "IssTest-IdP",
            "issuer_url": expected_issuer,
            "client_id": client_id,
            "client_secret": "s",
            "scopes": "openid",
            "is_enabled": True,
            "auto_create_users": True,
        }
        created = await async_client.post(
            "/api/v1/auth/oidc/providers",
            json=provider_payload,
            headers=_auth_header(admin_token),
        )
        assert created.status_code in (200, 201), created.text
        provider_id = created.json()["id"]

        # Seed the state row the callback will look up.
        state = secrets.token_urlsafe(32)
        state_row = AuthEphemeralToken(
            token=state,
            token_type="oidc_state",
            provider_id=provider_id,
            nonce=nonce,
            code_verifier=secrets.token_urlsafe(48),
            expires_at=datetime.now(timezone.utc) + timedelta(minutes=5),
        )
        db_session.add(state_row)
        await db_session.commit()

        # The discovery document advertises the correct issuer, so the only
        # inconsistency the callback can see is the token's own iss claim.
        discovery_doc = {
            "issuer": expected_issuer,
            "token_endpoint": f"{expected_issuer}/token",
            "jwks_uri": f"{expected_issuer}/.well-known/jwks.json",
        }
        token_response = {"access_token": "a", "id_token": id_token}

        class _FakeResponse:
            """Stand-in for an httpx response carrying a fixed JSON payload."""

            def __init__(self, payload):
                self._payload = payload
                self.status_code = 200
                self.is_success = True
                self.text = ""

            def json(self):
                return self._payload

            def raise_for_status(self):
                return None

        class _FakeAsyncClient:
            """Stand-in for ``httpx.AsyncClient`` that never touches the network."""

            def __init__(self, *args, **kwargs):
                pass

            async def __aenter__(self):
                return self

            async def __aexit__(self, *exc_info):
                return None

            async def get(self, url, **kwargs):
                # JWKS URL → key set; any other GET → discovery document.
                if "jwks" in url:
                    return _FakeResponse(jwks_payload)
                return _FakeResponse(discovery_doc)

            async def post(self, url, **kwargs):
                return _FakeResponse(token_response)

        callback_url = f"/api/v1/auth/oidc/callback?code=c&state={state}"
        with patch("backend.app.api.routes.mfa.httpx.AsyncClient", _FakeAsyncClient):
            resp = await async_client.get(callback_url, follow_redirects=False)

        assert resp.status_code == 302
        location = resp.headers.get("location", "")
        assert "token_validation_failed" in location, f"Expected token_validation_failed, got: {location}"
- # ===========================================================================
- # Test Gap 3: /forgot-password/confirm token is single-use
- # ===========================================================================
class TestForgotPasswordTokenSingleUse:
    """A password-reset token works once on ``/forgot-password/confirm`` and never again."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_token_reuse_rejected(self, async_client: AsyncClient, db_session: AsyncSession):
        """First confirm succeeds (200); a second confirm with the same token gets 400."""
        from backend.app.core.auth import get_password_hash
        from backend.app.models.user import User as _User

        confirm_url = "/api/v1/auth/forgot-password/confirm"

        # Active user whose password will be reset.
        account = _User(
            username="fpcuser1",
            email="fpc@example.com",
            password_hash=get_password_hash("OldPass1!"),
            role="user",
            is_active=True,
        )
        db_session.add(account)
        await db_session.flush()

        # Reset token for that user, valid for one hour.
        reset_token = secrets.token_urlsafe(32)
        token_row = AuthEphemeralToken(
            token=reset_token,
            token_type="password_reset",
            username="fpcuser1",
            expires_at=datetime.now(timezone.utc) + timedelta(hours=1),
        )
        db_session.add(token_row)
        await db_session.commit()

        # First use is accepted.
        first_attempt = await async_client.post(
            confirm_url,
            json={"token": reset_token, "new_password": "NewPass1!"},
        )
        assert first_attempt.status_code == 200, first_attempt.text

        # Re-using the same token must be refused.
        second_attempt = await async_client.post(
            confirm_url,
            json={"token": reset_token, "new_password": "AnotherNew1!"},
        )
        assert second_attempt.status_code == 400
- # ===========================================================================
- # C1 regression: setup_totp must reject a replayed TOTP code
- # ===========================================================================
class TestSetupTOTPReplayRejected:
    """setup_totp must reject a TOTP code that was already accepted in its window."""

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_replayed_setup_code_rejected(self, async_client: AsyncClient, db_session: AsyncSession):
        """Re-running TOTP setup with a code whose counter is already recorded returns 400.

        The test enables TOTP, marks the current time-step as "already used" by
        writing it to ``last_totp_counter``, then submits the code for that
        same time-step to the setup endpoint and expects a rejection whose
        detail mentions "already used".
        """
        from sqlalchemy import select as sa_select

        from backend.app.models.user_totp import UserTOTP

        token = await _setup_and_login(async_client, "setupreplay1", "setupreplay1!")

        # Step 1: Initial TOTP setup (no active TOTP yet → no code required)
        setup_resp = await async_client.post(
            "/api/v1/auth/2fa/totp/setup",
            headers=_auth_header(token),
        )
        assert setup_resp.status_code == 200
        secret = setup_resp.json()["secret"]

        # Step 2: Enable TOTP with a valid code
        totp_obj = pyotp.TOTP(secret)
        enable_resp = await async_client.post(
            "/api/v1/auth/2fa/totp/enable",
            json={"code": totp_obj.now()},
            headers=_auth_header(token),
        )
        assert enable_resp.status_code == 200  # TOTP is now active (is_enabled=True)

        # Step 3: Load the stored TOTP record for the logged-in user
        me_resp = await async_client.get("/api/v1/auth/me", headers=_auth_header(token))
        assert me_resp.status_code == 200, me_resp.text
        user_id = me_resp.json()["id"]
        totp_result = await db_session.execute(sa_select(UserTOTP).where(UserTOTP.user_id == user_id))
        totp_record = totp_result.scalar_one()
        secret_now = totp_record.secret  # decrypted via property
        totp_now = pyotp.TOTP(secret_now)

        # Derive the code and its counter from ONE instant. Calling now() and
        # datetime.now() separately could straddle a time-step boundary, leaving
        # the stored counter out of sync with the submitted code.
        instant = datetime.now(timezone.utc)
        valid_code = totp_now.at(instant)
        accepted_counter = totp_now.timecode(instant)

        # Step 4: Pre-set last_totp_counter so this code looks already used
        totp_record.last_totp_counter = accepted_counter
        await db_session.commit()

        # Step 5: Attempt setup_totp with the "already used" code → must be rejected
        replay_resp = await async_client.post(
            "/api/v1/auth/2fa/totp/setup",
            json={"code": valid_code},
            headers=_auth_header(token),
        )
        assert replay_resp.status_code == 400
        assert "already used" in replay_resp.json()["detail"]
- # ===========================================================================
- # Nit8: OIDC aud mismatch and nonce mismatch tests
- # ===========================================================================
class TestOIDCAudAndNonceMismatch:
    """Nit8: aud != client_id and nonce != stored value must each fail the callback."""

    def _make_oidc_provider_setup(self):
        """Return the ``(private_pem, jwks_data)`` pair from ``_make_test_rsa_key()``."""
        return _make_test_rsa_key()

    async def _callback_location(
        self,
        async_client: AsyncClient,
        db_session: AsyncSession,
        *,
        issuer,
        client_id,
        provider_name,
        admin_username,
        admin_password,
        subject,
        email,
        token_aud,
        token_nonce,
        stored_nonce,
    ):
        """Drive one OIDC callback against a mocked IdP and return the redirect location.

        Steps, in order: sign an ID token with the given claims, create the
        provider through the admin API, seed an ``oidc_state`` row holding
        ``stored_nonce``, then call the callback endpoint with
        ``httpx.AsyncClient`` patched so discovery, JWKS and token exchange are
        all served from memory.

        Args:
            async_client: HTTP client fixture bound to the application.
            db_session: Database session fixture used to seed the state row.
            issuer: Issuer URL used for the provider, discovery doc and ``iss`` claim.
            client_id: Client ID the provider is registered with.
            provider_name: Display name of the provider to create.
            admin_username: Username passed to ``_setup_and_login``.
            admin_password: Password passed to ``_setup_and_login``.
            subject: ``sub`` claim of the ID token.
            email: ``email`` claim of the ID token.
            token_aud: ``aud`` claim placed in the ID token.
            token_nonce: ``nonce`` claim placed in the ID token.
            stored_nonce: Nonce stored alongside the state token.

        Returns:
            The ``location`` header of the callback response ("" if absent).
            The response status is asserted to be 302 before returning.
        """
        import time
        from unittest.mock import patch

        import jwt as pyjwt

        private_pem, jwks_data = self._make_oidc_provider_setup()
        now = int(time.time())
        id_token = pyjwt.encode(
            {
                "sub": subject,
                "iss": issuer,
                "aud": token_aud,
                "nonce": token_nonce,
                "email": email,
                "email_verified": True,
                "iat": now,
                "exp": now + 300,
            },
            private_pem,
            algorithm="RS256",
            headers={"kid": "test-kid-1"},
        )

        admin_token = await _setup_and_login(async_client, admin_username, admin_password)
        cr = await async_client.post(
            "/api/v1/auth/oidc/providers",
            json={
                "name": provider_name,
                "issuer_url": issuer,
                "client_id": client_id,
                "client_secret": "s",
                "scopes": "openid",
                "is_enabled": True,
                "auto_create_users": True,
            },
            headers=_auth_header(admin_token),
        )
        assert cr.status_code in (200, 201), cr.text
        provider_id = cr.json()["id"]

        state = secrets.token_urlsafe(32)
        db_session.add(
            AuthEphemeralToken(
                token=state,
                token_type="oidc_state",
                provider_id=provider_id,
                nonce=stored_nonce,
                code_verifier=secrets.token_urlsafe(48),
                expires_at=datetime.now(timezone.utc) + timedelta(minutes=5),
            )
        )
        await db_session.commit()

        discovery_doc = {
            "issuer": issuer,
            "token_endpoint": f"{issuer}/token",
            "jwks_uri": f"{issuer}/.well-known/jwks.json",
        }

        class _MockResp:
            """Minimal successful response wrapping a fixed JSON payload."""

            def __init__(self, data):
                self._data = data
                self.status_code = 200
                self.is_success = True
                self.text = ""

            def json(self):
                return self._data

            def raise_for_status(self):
                pass

        class _MockHttpxClient:
            """In-memory replacement for ``httpx.AsyncClient``."""

            def __init__(self, *a, **kw):
                pass

            async def __aenter__(self):
                return self

            async def __aexit__(self, *a):
                pass

            async def get(self, url, **kw):
                # JWKS URL → key set; any other GET → discovery document.
                return _MockResp(jwks_data if "jwks" in url else discovery_doc)

            async def post(self, url, **kw):
                return _MockResp({"access_token": "a", "id_token": id_token})

        with patch("backend.app.api.routes.mfa.httpx.AsyncClient", _MockHttpxClient):
            resp = await async_client.get(
                f"/api/v1/auth/oidc/callback?code=c&state={state}",
                follow_redirects=False,
            )
        assert resp.status_code == 302
        return resp.headers.get("location", "")

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_aud_mismatch_redirects_token_validation_failed(
        self, async_client: AsyncClient, db_session: AsyncSession
    ):
        """ID token with aud != client_id must be rejected (PyJWT InvalidAudienceError)."""
        nonce = secrets.token_urlsafe(16)
        location = await self._callback_location(
            async_client,
            db_session,
            issuer="https://aud-mismatch.example.com",
            client_id="aud-test-client",
            provider_name="AudMismatch-IdP",
            admin_username="audmismatch_admin",
            admin_password="AudMismatch_admin1",
            subject="sub-aud-test",
            email="aud@example.com",
            token_aud="some-other-client",  # <-- wrong audience
            token_nonce=nonce,
            stored_nonce=nonce,
        )
        assert "token_validation_failed" in location, (
            f"Expected token_validation_failed redirect for aud mismatch, got: {location}"
        )

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_nonce_mismatch_redirects_token_validation_failed(
        self, async_client: AsyncClient, db_session: AsyncSession
    ):
        """ID token with nonce != stored state nonce must be rejected.

        Despite the test's name, the error code asserted here is
        ``nonce_mismatch``, not ``token_validation_failed``.
        """
        client_id = "nonce-test-client"
        stored_nonce = secrets.token_urlsafe(16)
        wrong_nonce = secrets.token_urlsafe(16)  # different from stored_nonce
        location = await self._callback_location(
            async_client,
            db_session,
            issuer="https://nonce-mismatch.example.com",
            client_id=client_id,
            provider_name="NonceMismatch-IdP",
            admin_username="noncemismatch_admin",
            admin_password="NonceMismatch_admin1",
            subject="sub-nonce-test",
            email="nonce@example.com",
            token_aud=client_id,
            token_nonce=wrong_nonce,  # <-- does not match stored_nonce
            stored_nonce=stored_nonce,  # state has correct nonce; JWT carries wrong_nonce
        )
        # The callback redirects to ?oidc_error=nonce_mismatch when nonces differ.
        assert "nonce_mismatch" in location, f"Expected nonce_mismatch redirect for nonce mismatch, got: {location}"
- # ===========================================================================
- # Expired OIDC token rejection — state and exchange tokens
- # ===========================================================================
class TestOIDCExpiredTokenRejection:
    """Expired OIDC state and exchange tokens must be rejected atomically.

    The DELETE … WHERE expires_at > now must ensure that an already-expired
    token is never consumed (committed) before the expiry is checked, so the
    token row stays in the DB and is not silently discarded.
    """

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_expired_state_token_rejected_as_invalid_state(
        self, async_client: AsyncClient, db_session: AsyncSession
    ):
        """An expired OIDC state token must redirect to invalid_state without
        being consumed — it must still exist in the DB after the rejected call."""
        from sqlalchemy import select as sa_select

        from backend.app.models.oidc_provider import OIDCProvider

        provider = OIDCProvider(
            name="ExpiredStateIdP",
            issuer_url="https://expired-state.example.com",
            client_id="client_expired_state",
            _client_secret_enc="secret_exp_state",
            scopes="openid",
            is_enabled=True,
            auto_link_existing_accounts=False,
            auto_create_users=False,
        )
        db_session.add(provider)
        # Flush so provider.id can be referenced by the state row below.
        await db_session.flush()

        state = secrets.token_urlsafe(32)
        db_session.add(
            AuthEphemeralToken(
                token=state,
                token_type="oidc_state",
                provider_id=provider.id,
                nonce=secrets.token_urlsafe(16),
                code_verifier=secrets.token_urlsafe(48),
                # already expired
                expires_at=datetime.now(timezone.utc) - timedelta(minutes=5),
            )
        )
        await db_session.commit()

        resp = await async_client.get(
            f"/api/v1/auth/oidc/callback?code=any_code&state={state}",
            follow_redirects=False,
        )
        assert resp.status_code == 302
        location = resp.headers.get("location", "")
        assert "invalid_state" in location, f"Expected invalid_state redirect for expired state, got: {location}"

        # Token must NOT have been consumed — the docstring's "still exists in
        # the DB" contract is verified here the same way the exchange-token
        # test below verifies it.
        result = await db_session.execute(
            sa_select(AuthEphemeralToken).where(AuthEphemeralToken.token == state)
        )
        remaining = result.scalar_one_or_none()
        assert remaining is not None, "Expired state token must not be consumed by a rejected request"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_expired_exchange_token_rejected(self, async_client: AsyncClient, db_session: AsyncSession):
        """An expired OIDC exchange token must return 401 without being consumed."""
        from sqlalchemy import select as sa_select

        expired_token = secrets.token_urlsafe(32)
        db_session.add(
            AuthEphemeralToken(
                token=expired_token,
                token_type="oidc_exchange",
                username="some_user",
                # already expired
                expires_at=datetime.now(timezone.utc) - timedelta(minutes=5),
            )
        )
        await db_session.commit()

        resp = await async_client.post(
            "/api/v1/auth/oidc/exchange",
            json={"oidc_token": expired_token},
        )
        assert resp.status_code == 401
        # Parse the body once; either wording is accepted for the error detail.
        detail = resp.json().get("detail", "").lower()
        assert "expired" in detail or "invalid" in detail

        # Token must NOT have been consumed — it should still be in the DB
        # (the atomic DELETE WHERE expires_at > now left it untouched)
        result = await db_session.execute(
            sa_select(AuthEphemeralToken).where(AuthEphemeralToken.token == expired_token)
        )
        remaining = result.scalar_one_or_none()
        assert remaining is not None, "Expired exchange token must not be consumed by a rejected request"
- # ===========================================================================
- # Trailing slash in issuer_url — discovery URL must not contain double slash
- # ===========================================================================
class TestOIDCIssuerUrlTrailingSlash:
    """Issuer URLs with a trailing slash (as used by Authentik) must work.

    BamBuddy must strip the slash before appending
    ``/.well-known/openid-configuration``; otherwise the discovery URL would
    contain a double slash and result in a 404.
    """

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_trailing_slash_issuer_url_fetches_correct_discovery_url(self, async_client: AsyncClient):
        """The authorize endpoint requests a discovery URL free of double slashes."""
        from unittest.mock import AsyncMock, MagicMock, patch

        slashed_issuer = "https://authentik.example.com/application/o/bambuddy/"

        admin_token = await _setup_and_login(async_client, "oidcslashadm", "oidcslashadm1")
        provider_payload = {
            "name": "Authentik-Slash",
            "issuer_url": slashed_issuer,
            "client_id": "bambuddy",
            "client_secret": "secret",
            "scopes": "openid email profile",
            "is_enabled": True,
            "auto_create_users": False,
        }
        created = await async_client.post(
            "/api/v1/auth/oidc/providers",
            json=provider_payload,
            headers=_auth_header(admin_token),
        )
        assert created.status_code == 201
        provider_id = created.json()["id"]

        # Canned discovery response; json()/raise_for_status() are synchronous
        # on the response object, hence MagicMock rather than AsyncMock.
        discovery_response = AsyncMock()
        discovery_response.raise_for_status = MagicMock()
        discovery_response.json = MagicMock(
            return_value={
                "issuer": slashed_issuer,
                "authorization_endpoint": "https://authentik.example.com/application/o/bambuddy/authorize",
            }
        )
        http_stub = AsyncMock()
        http_stub.get = AsyncMock(return_value=discovery_response)

        with patch("backend.app.api.routes.mfa.httpx.AsyncClient") as client_cls:
            context_manager = client_cls.return_value
            context_manager.__aenter__ = AsyncMock(return_value=http_stub)
            context_manager.__aexit__ = AsyncMock(return_value=False)
            resp = await async_client.get(f"/api/v1/auth/oidc/authorize/{provider_id}")

        assert resp.status_code == 200

        # First positional argument of the first GET is the discovery URL.
        first_get = http_stub.get.call_args_list[0]
        called_url = first_get.args[0]
        url_without_scheme = called_url.replace("https://", "")
        assert "//" not in url_without_scheme, (
            f"Discovery URL must not contain double slash: {called_url}"
        )
        assert called_url.endswith("/.well-known/openid-configuration"), (
            f"Expected discovery URL to end with /.well-known/openid-configuration, got: {called_url}"
        )

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_iss_claim_trailing_slash_accepted(self, async_client: AsyncClient, db_session: AsyncSession):
        """Provider stored without trailing slash, JWT ``iss`` carries one — login succeeds.

        Both sides must be normalised before comparison, so the callback is
        expected to redirect with an ``oidc_token`` rather than
        ``token_validation_failed``.
        """
        import time
        from unittest.mock import patch

        import jwt as pyjwt

        signing_key_pem, jwks_payload = _make_test_rsa_key()
        bare_issuer = "https://authentik.example.com/application/o/bambuddy"
        slashed_issuer = bare_issuer + "/"
        client_id = "bambuddy-client"
        nonce = secrets.token_urlsafe(16)
        issued_at = int(time.time())

        # The token's iss claim has the trailing slash.
        claims = {
            "sub": "authentik-sub-123",
            "iss": slashed_issuer,
            "aud": client_id,
            "nonce": nonce,
            "email": "authentik-user@example.com",
            "email_verified": True,
            "iat": issued_at,
            "exp": issued_at + 300,
        }
        id_token = pyjwt.encode(
            claims,
            signing_key_pem,
            algorithm="RS256",
            headers={"kid": "test-kid-1"},
        )

        # The provider is registered WITHOUT the trailing slash.
        admin_token = await _setup_and_login(async_client, "authentikadm", "authentikadm1")
        provider_payload = {
            "name": "Authentik-ISS",
            "issuer_url": bare_issuer,
            "client_id": client_id,
            "client_secret": "secret",
            "scopes": "openid email profile",
            "is_enabled": True,
            "auto_create_users": True,
        }
        created = await async_client.post(
            "/api/v1/auth/oidc/providers",
            json=provider_payload,
            headers=_auth_header(admin_token),
        )
        assert created.status_code == 201
        provider_id = created.json()["id"]

        state = secrets.token_urlsafe(32)
        code_verifier = secrets.token_urlsafe(48)
        state_row = AuthEphemeralToken(
            token=state,
            token_type="oidc_state",
            provider_id=provider_id,
            nonce=nonce,
            code_verifier=code_verifier,
            expires_at=datetime.now(timezone.utc) + timedelta(minutes=5),
        )
        db_session.add(state_row)
        await db_session.commit()

        discovery_doc = {
            "issuer": slashed_issuer,
            "authorization_endpoint": f"{bare_issuer}/authorize",
            "token_endpoint": f"{bare_issuer}/token",
            "jwks_uri": f"{bare_issuer}/.well-known/jwks.json",
        }
        token_response = {"access_token": "mock", "token_type": "Bearer", "id_token": id_token}

        class _FakeResponse:
            """Stand-in for an httpx response carrying a fixed JSON payload."""

            def __init__(self, payload):
                self._payload = payload
                self.is_success = True
                self.status_code = 200
                self.text = str(payload)

            def json(self):
                return self._payload

            def raise_for_status(self):
                return None

        class _FakeAsyncClient:
            """Stand-in for ``httpx.AsyncClient`` that never touches the network."""

            def __init__(self, *args, **kwargs):
                pass

            async def __aenter__(self):
                return self

            async def __aexit__(self, *exc_info):
                return None

            async def get(self, url, **kwargs):
                # JWKS URL → key set; any other GET → discovery document.
                if "jwks" in url:
                    return _FakeResponse(jwks_payload)
                return _FakeResponse(discovery_doc)

            async def post(self, url, **kwargs):
                return _FakeResponse(token_response)

        callback_url = f"/api/v1/auth/oidc/callback?code=auth-code&state={state}"
        with patch("backend.app.api.routes.mfa.httpx.AsyncClient", _FakeAsyncClient):
            resp = await async_client.get(callback_url, follow_redirects=False)

        location = resp.headers.get("location", "")
        assert resp.status_code == 302, f"Expected redirect, got {resp.status_code}"
        assert "token_validation_failed" not in location, (
            "Trailing slash mismatch in iss claim must not cause token_validation_failed"
        )
        assert "oidc_token=" in location, f"Expected oidc_token in redirect, got: {location}"
class TestOIDCCallbackCodeLength:
    """Length limits on the OIDC callback's ``code`` and ``state`` query params.

    Values of up to 2048 characters must pass request validation (OAuth spec);
    2049 characters must be rejected with 422.
    """

    @staticmethod
    async def _callback_status(async_client, *, code, state):
        """Call the OIDC callback with the given params and return the HTTP status."""
        url = f"/api/v1/auth/oidc/callback?code={code}&state={state}"
        response = await async_client.get(url, follow_redirects=False)
        return response.status_code

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_code_512_chars_accepted(self, async_client: AsyncClient):
        """A 512-character code (old limit) must not be rejected with 422."""
        status = await self._callback_status(async_client, code="a" * 512, state="bogus-state")
        assert status != 422, "512-char code must not be rejected by Pydantic"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_code_2048_chars_accepted(self, async_client: AsyncClient):
        """A 2048-character code must not be rejected with 422."""
        status = await self._callback_status(async_client, code="a" * 2048, state="bogus-state")
        assert status != 422, "2048-char code must not be rejected by Pydantic"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_code_2049_chars_rejected(self, async_client: AsyncClient):
        """A 2049-character code must be rejected with 422."""
        status = await self._callback_status(async_client, code="a" * 2049, state="bogus-state")
        assert status == 422, "2049-char code must be rejected by Pydantic"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_state_512_chars_accepted(self, async_client: AsyncClient):
        """A 512-character state (old limit) must not be rejected with 422."""
        status = await self._callback_status(async_client, code="bogus-code", state="a" * 512)
        assert status != 422, "512-char state must not be rejected by Pydantic"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_state_2048_chars_accepted(self, async_client: AsyncClient):
        """A 2048-character state must not be rejected with 422."""
        status = await self._callback_status(async_client, code="bogus-code", state="a" * 2048)
        assert status != 422, "2048-char state must not be rejected by Pydantic"

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_state_2049_chars_rejected(self, async_client: AsyncClient):
        """A 2049-character state must be rejected with 422."""
        status = await self._callback_status(async_client, code="bogus-code", state="a" * 2049)
        assert status == 422, "2049-char state must be rejected by Pydantic"
|