test_mfa_api.py 124 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185218621872188218921902191219221932194219521962197219821992200220122022203220422052206220722082209221022112212221322142215221622172218221922202221222222232224222522262227222822292230223122322233223422352236223722382239224022412242224322442245224622472248224922502251225222532254225522562257225822592260226122622263226422652266226722682269227022712272227322742275227622772278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277727782779278027812782278327842785278627872788278927902791279227932794279527962797279827992800280128022803280428052806280728082809281028112812281328142815281628172818281928202821282228232824282528262827282828292830283128322833283428352836283728382839284028412842284328442845284628472848284928502851285228532854285528562857285828592860286128622863286428652866286728682869287028712872287328742875287628772878287928802881288228832884288528862887288828892890289128922893289428952896289728982899290029012902290329042905290629072908290929102911291229132914291529162917291829192920292129222923292429252926292729282929293029312932293329342935293629372938293929402941294229432944294529462947294829492950295129522953295429552956295729582959296029612962296329642965296629672968296929702971297229732974297529762977297829792980298129822983298429852986298729882989299029912992299329942995299629972998299930003001300230033004300530063007300830093010301130123013301430153016301730183019302030213022302330243025302630273028302930303031303230333034303530363037303830393040304130423043304430453046304730483049305030513052305330543055305630573058305930603061306230633064306530663067306830693070307130723073307430753076307730783079308030813082308330843085308630873088308930903091309230933094309530963097309830993100310131023103310431053106310731083109311031113112311331143115311631173118311931203121312231233124312531263127312831293130313131323133313431353136313731383139314031413142314331443145314631473148314931503151315231533154315531563157315831593160316131623163316431653166316731683169317031713172317331743175317631773178317931803181318231833184318531863187318831893190319131923193319431953196319731983199
  1. """Integration tests for 2FA and OIDC API endpoints.
  2. Tests the full request/response cycle for:
  3. - GET /api/v1/auth/2fa/status
  4. - POST /api/v1/auth/2fa/totp/setup
  5. - POST /api/v1/auth/2fa/totp/enable
  6. - POST /api/v1/auth/2fa/totp/disable
  7. - POST /api/v1/auth/2fa/email/enable
  8. - POST /api/v1/auth/2fa/email/disable
  9. - POST /api/v1/auth/2fa/verify (TOTP, email, backup paths)
  10. - DELETE /api/v1/auth/2fa/admin/{user_id}
  11. - GET /api/v1/auth/oidc/providers
  12. - POST /api/v1/auth/oidc/providers
  13. - PATCH /api/v1/auth/oidc/providers/{id}
  14. - DELETE /api/v1/auth/oidc/providers/{id}
  15. """
  16. from __future__ import annotations
  17. import secrets
  18. from datetime import datetime, timedelta, timezone
  19. import pyotp
  20. import pytest
  21. from httpx import AsyncClient
  22. from passlib.context import CryptContext
  23. from sqlalchemy.ext.asyncio import AsyncSession
  24. from backend.app.models.auth_ephemeral import AuthEphemeralToken
  25. from backend.app.models.user import User
  26. _pwd_context = CryptContext(schemes=["pbkdf2_sha256"], deprecated="auto")
  27. # ---------------------------------------------------------------------------
  28. # Fixtures / helpers
  29. # ---------------------------------------------------------------------------
  30. AUTH_SETUP_URL = "/api/v1/auth/setup"
  31. LOGIN_URL = "/api/v1/auth/login"
  32. def _norm_pw(password: str) -> str:
  33. """Ensure password meets complexity requirements (I4: SetupRequest now validates)."""
  34. if not any(c.isupper() for c in password):
  35. password = password[0].upper() + password[1:]
  36. if not any(c not in "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789" for c in password):
  37. password = password + "!"
  38. return password
  39. async def _setup_and_login(client: AsyncClient, username: str, password: str) -> str:
  40. """Enable auth, create an admin user, login, and return the bearer token."""
  41. password = _norm_pw(password)
  42. await client.post(
  43. AUTH_SETUP_URL,
  44. json={
  45. "auth_enabled": True,
  46. "admin_username": username,
  47. "admin_password": password,
  48. },
  49. )
  50. resp = await client.post(LOGIN_URL, json={"username": username, "password": password})
  51. assert resp.status_code == 200
  52. return resp.json()["access_token"]
  53. async def _login_get_pre_auth_token(client: AsyncClient, username: str, password: str) -> str:
  54. """Login a user who has 2FA enabled; return the pre_auth_token from the response."""
  55. password = _norm_pw(password)
  56. resp = await client.post(LOGIN_URL, json={"username": username, "password": password})
  57. assert resp.status_code == 200
  58. data = resp.json()
  59. assert data["requires_2fa"] is True, f"Expected requires_2fa=True, got {data}"
  60. assert data["pre_auth_token"] is not None
  61. return data["pre_auth_token"]
  62. def _auth_header(token: str) -> dict[str, str]:
  63. return {"Authorization": f"Bearer {token}"}
  64. # ===========================================================================
  65. # 2FA Status
  66. # ===========================================================================
  67. class TestTwoFAStatus:
  68. """Tests for GET /api/v1/auth/2fa/status."""
  69. @pytest.mark.asyncio
  70. @pytest.mark.integration
  71. async def test_status_requires_auth(self, async_client: AsyncClient):
  72. response = await async_client.get("/api/v1/auth/2fa/status")
  73. assert response.status_code == 401
  74. @pytest.mark.asyncio
  75. @pytest.mark.integration
  76. async def test_status_default_disabled(self, async_client: AsyncClient):
  77. token = await _setup_and_login(async_client, "statususer", "statuspass123")
  78. response = await async_client.get("/api/v1/auth/2fa/status", headers=_auth_header(token))
  79. assert response.status_code == 200
  80. data = response.json()
  81. assert data["totp_enabled"] is False
  82. assert data["email_otp_enabled"] is False
  83. assert data["backup_codes_remaining"] == 0
  84. # ===========================================================================
  85. # TOTP Setup
  86. # ===========================================================================
  87. class TestTOTPSetup:
  88. """Tests for POST /api/v1/auth/2fa/totp/setup."""
  89. @pytest.mark.asyncio
  90. @pytest.mark.integration
  91. async def test_setup_requires_auth(self, async_client: AsyncClient):
  92. response = await async_client.post("/api/v1/auth/2fa/totp/setup")
  93. assert response.status_code == 401
  94. @pytest.mark.asyncio
  95. @pytest.mark.integration
  96. async def test_setup_returns_secret_and_qr(self, async_client: AsyncClient):
  97. token = await _setup_and_login(async_client, "totpsetup", "totpsetup123")
  98. response = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
  99. assert response.status_code == 200
  100. data = response.json()
  101. assert "secret" in data
  102. assert len(data["secret"]) > 0
  103. assert "qr_code_b64" in data
  104. assert data["issuer"] == "Bambuddy"
  105. @pytest.mark.asyncio
  106. @pytest.mark.integration
  107. async def test_setup_secret_is_valid_base32(self, async_client: AsyncClient):
  108. token = await _setup_and_login(async_client, "totpbase32", "totpbase32pw")
  109. response = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
  110. assert response.status_code == 200
  111. secret = response.json()["secret"]
  112. # pyotp will raise on invalid base32
  113. totp = pyotp.TOTP(secret)
  114. assert len(totp.now()) == 6
  115. # ===========================================================================
  116. # TOTP Enable
  117. # ===========================================================================
  118. class TestTOTPEnable:
  119. """Tests for POST /api/v1/auth/2fa/totp/enable."""
  120. @pytest.mark.asyncio
  121. @pytest.mark.integration
  122. async def test_enable_without_setup_returns_400(self, async_client: AsyncClient):
  123. token = await _setup_and_login(async_client, "nosetupenable", "nosetupenable1")
  124. response = await async_client.post(
  125. "/api/v1/auth/2fa/totp/enable",
  126. json={"code": "123456"},
  127. headers=_auth_header(token),
  128. )
  129. assert response.status_code == 400
  130. @pytest.mark.asyncio
  131. @pytest.mark.integration
  132. async def test_enable_with_invalid_code_returns_400(self, async_client: AsyncClient):
  133. token = await _setup_and_login(async_client, "badcodeuser", "badcodeuser1")
  134. await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
  135. response = await async_client.post(
  136. "/api/v1/auth/2fa/totp/enable",
  137. json={"code": "000000"},
  138. headers=_auth_header(token),
  139. )
  140. assert response.status_code == 400
  141. @pytest.mark.asyncio
  142. @pytest.mark.integration
  143. async def test_enable_with_valid_code_returns_backup_codes(self, async_client: AsyncClient):
  144. token = await _setup_and_login(async_client, "enableok", "enableok123")
  145. setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
  146. secret = setup_resp.json()["secret"]
  147. valid_code = pyotp.TOTP(secret).now()
  148. response = await async_client.post(
  149. "/api/v1/auth/2fa/totp/enable",
  150. json={"code": valid_code},
  151. headers=_auth_header(token),
  152. )
  153. assert response.status_code == 200
  154. data = response.json()
  155. assert "backup_codes" in data
  156. assert len(data["backup_codes"]) == 10
  157. for code in data["backup_codes"]:
  158. assert len(code) == 8
  159. @pytest.mark.asyncio
  160. @pytest.mark.integration
  161. async def test_status_reflects_enabled_totp(self, async_client: AsyncClient):
  162. token = await _setup_and_login(async_client, "statustotp", "statustotp1")
  163. setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
  164. secret = setup_resp.json()["secret"]
  165. valid_code = pyotp.TOTP(secret).now()
  166. await async_client.post(
  167. "/api/v1/auth/2fa/totp/enable",
  168. json={"code": valid_code},
  169. headers=_auth_header(token),
  170. )
  171. status_resp = await async_client.get("/api/v1/auth/2fa/status", headers=_auth_header(token))
  172. data = status_resp.json()
  173. assert data["totp_enabled"] is True
  174. assert data["backup_codes_remaining"] == 10
  175. # ===========================================================================
  176. # TOTP Disable
  177. # ===========================================================================
  178. class TestTOTPDisable:
  179. """Tests for POST /api/v1/auth/2fa/totp/disable."""
  180. @pytest.mark.asyncio
  181. @pytest.mark.integration
  182. async def test_disable_when_not_enabled_returns_400(self, async_client: AsyncClient):
  183. token = await _setup_and_login(async_client, "disablenoenab", "disablenoenab1")
  184. response = await async_client.post(
  185. "/api/v1/auth/2fa/totp/disable",
  186. json={"code": "123456"},
  187. headers=_auth_header(token),
  188. )
  189. assert response.status_code == 400
  190. @pytest.mark.asyncio
  191. @pytest.mark.integration
  192. async def test_disable_with_valid_code(self, async_client: AsyncClient):
  193. token = await _setup_and_login(async_client, "disableok", "disableok123")
  194. setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
  195. secret = setup_resp.json()["secret"]
  196. valid_code = pyotp.TOTP(secret).now()
  197. await async_client.post(
  198. "/api/v1/auth/2fa/totp/enable",
  199. json={"code": valid_code},
  200. headers=_auth_header(token),
  201. )
  202. # Disable with a fresh valid code
  203. disable_code = pyotp.TOTP(secret).now()
  204. response = await async_client.post(
  205. "/api/v1/auth/2fa/totp/disable",
  206. json={"code": disable_code},
  207. headers=_auth_header(token),
  208. )
  209. assert response.status_code == 200
  210. assert "disabled" in response.json()["message"].lower()
  211. # Status should now show disabled
  212. status_resp = await async_client.get("/api/v1/auth/2fa/status", headers=_auth_header(token))
  213. assert status_resp.json()["totp_enabled"] is False
  214. # ===========================================================================
  215. # Email OTP Enable/Disable
  216. # ===========================================================================
  217. class TestEmailOTP:
  218. """Tests for POST /api/v1/auth/2fa/email/enable, /enable/confirm and /disable."""
  219. @pytest.mark.asyncio
  220. @pytest.mark.integration
  221. async def test_enable_email_otp_without_email_returns_400(self, async_client: AsyncClient):
  222. """Users without an email address cannot enable email OTP."""
  223. token = await _setup_and_login(async_client, "noemailuser", "noemailuser1")
  224. response = await async_client.post("/api/v1/auth/2fa/email/enable", headers=_auth_header(token))
  225. assert response.status_code == 400
  226. assert "email" in response.json()["detail"].lower()
  227. @pytest.mark.asyncio
  228. @pytest.mark.integration
  229. async def test_confirm_enable_email_otp_happy_path(self, async_client: AsyncClient, db_session: AsyncSession):
  230. """Confirm step activates email OTP when setup_token + code are valid (C5)."""
  231. token = await _setup_and_login(async_client, "confirmenable", "confirmenable1")
  232. # Give user an email address directly (SMTP not available in tests)
  233. from sqlalchemy import select as sa_select
  234. result = await db_session.execute(sa_select(User).where(User.username == "confirmenable"))
  235. user = result.scalar_one()
  236. user.email = "confirmenable@example.com"
  237. await db_session.commit()
  238. # Inject a known setup token directly into the DB (bypasses SMTP)
  239. code = "123456"
  240. code_hash = _pwd_context.hash(code)
  241. setup_token = secrets.token_urlsafe(32)
  242. db_session.add(
  243. AuthEphemeralToken(
  244. token=setup_token,
  245. token_type="email_otp_setup",
  246. username="confirmenable",
  247. nonce=code_hash,
  248. expires_at=datetime.now(timezone.utc) + timedelta(minutes=10),
  249. )
  250. )
  251. await db_session.commit()
  252. resp = await async_client.post(
  253. "/api/v1/auth/2fa/email/enable/confirm",
  254. json={"setup_token": setup_token, "code": code},
  255. headers=_auth_header(token),
  256. )
  257. assert resp.status_code == 200
  258. status_resp = await async_client.get("/api/v1/auth/2fa/status", headers=_auth_header(token))
  259. assert status_resp.json()["email_otp_enabled"] is True
  260. @pytest.mark.asyncio
  261. @pytest.mark.integration
  262. async def test_confirm_enable_email_otp_wrong_code(self, async_client: AsyncClient, db_session: AsyncSession):
  263. """Wrong code on confirm step returns 400 and does not enable email OTP."""
  264. token = await _setup_and_login(async_client, "confirmwrong", "confirmwrong1")
  265. code_hash = _pwd_context.hash("654321")
  266. setup_token = secrets.token_urlsafe(32)
  267. db_session.add(
  268. AuthEphemeralToken(
  269. token=setup_token,
  270. token_type="email_otp_setup",
  271. username="confirmwrong",
  272. nonce=code_hash,
  273. expires_at=datetime.now(timezone.utc) + timedelta(minutes=10),
  274. )
  275. )
  276. await db_session.commit()
  277. resp = await async_client.post(
  278. "/api/v1/auth/2fa/email/enable/confirm",
  279. json={"setup_token": setup_token, "code": "000000"},
  280. headers=_auth_header(token),
  281. )
  282. assert resp.status_code == 400
  283. @pytest.mark.asyncio
  284. @pytest.mark.integration
  285. async def test_confirm_enable_email_otp_setup_token_is_single_use(
  286. self, async_client: AsyncClient, db_session: AsyncSession
  287. ):
  288. """Setup token is consumed on first use; replay returns 400."""
  289. token = await _setup_and_login(async_client, "confirmonce", "confirmonce1")
  290. code = "111111"
  291. code_hash = _pwd_context.hash(code)
  292. setup_token = secrets.token_urlsafe(32)
  293. db_session.add(
  294. AuthEphemeralToken(
  295. token=setup_token,
  296. token_type="email_otp_setup",
  297. username="confirmonce",
  298. nonce=code_hash,
  299. expires_at=datetime.now(timezone.utc) + timedelta(minutes=10),
  300. )
  301. )
  302. await db_session.commit()
  303. first = await async_client.post(
  304. "/api/v1/auth/2fa/email/enable/confirm",
  305. json={"setup_token": setup_token, "code": code},
  306. headers=_auth_header(token),
  307. )
  308. assert first.status_code == 200
  309. second = await async_client.post(
  310. "/api/v1/auth/2fa/email/enable/confirm",
  311. json={"setup_token": setup_token, "code": code},
  312. headers=_auth_header(token),
  313. )
  314. assert second.status_code == 400
  315. @pytest.mark.asyncio
  316. @pytest.mark.integration
  317. async def test_disable_email_otp_requires_password(self, async_client: AsyncClient):
  318. """Disabling email OTP requires the account password (C6: re-auth)."""
  319. token = await _setup_and_login(async_client, "disemailotp", "disemailotp1")
  320. # Wrong password → 401
  321. response = await async_client.post(
  322. "/api/v1/auth/2fa/email/disable",
  323. json={"password": "wrongpassword"},
  324. headers=_auth_header(token),
  325. )
  326. assert response.status_code == 401
  327. @pytest.mark.asyncio
  328. @pytest.mark.integration
  329. async def test_disable_email_otp_when_enabled(self, async_client: AsyncClient, db_session: AsyncSession):
  330. """Disabling email OTP when enabled turns it off and status reflects that."""
  331. token = await _setup_and_login(async_client, "disemailpw", "disemailpw1")
  332. # Enable email OTP via direct DB injection (no SMTP)
  333. code = "222222"
  334. setup_token = secrets.token_urlsafe(32)
  335. db_session.add(
  336. AuthEphemeralToken(
  337. token=setup_token,
  338. token_type="email_otp_setup",
  339. username="disemailpw",
  340. nonce=_pwd_context.hash(code),
  341. expires_at=datetime.now(timezone.utc) + timedelta(minutes=10),
  342. )
  343. )
  344. await db_session.commit()
  345. await async_client.post(
  346. "/api/v1/auth/2fa/email/enable/confirm",
  347. json={"setup_token": setup_token, "code": code},
  348. headers=_auth_header(token),
  349. )
  350. # Now disable
  351. response = await async_client.post(
  352. "/api/v1/auth/2fa/email/disable",
  353. json={"password": _norm_pw("disemailpw1")},
  354. headers=_auth_header(token),
  355. )
  356. assert response.status_code == 200
  357. status_resp = await async_client.get("/api/v1/auth/2fa/status", headers=_auth_header(token))
  358. assert status_resp.json()["email_otp_enabled"] is False
  359. # ===========================================================================
  360. # 2FA Verify — TOTP path
  361. # ===========================================================================
  362. class TestTwoFAVerifyTOTP:
  363. """Tests for POST /api/v1/auth/2fa/verify using the TOTP method."""
  364. @pytest.mark.asyncio
  365. @pytest.mark.integration
  366. async def test_verify_with_invalid_pre_auth_token(self, async_client: AsyncClient):
  367. response = await async_client.post(
  368. "/api/v1/auth/2fa/verify",
  369. json={"pre_auth_token": "bogus", "method": "totp", "code": "123456"},
  370. )
  371. assert response.status_code == 401
  372. @pytest.mark.asyncio
  373. @pytest.mark.integration
  374. async def test_verify_totp_issues_jwt(self, async_client: AsyncClient):
  375. """Full flow: setup → enable TOTP → login → pre_auth_token → verify → JWT."""
  376. token = await _setup_and_login(async_client, "verifytotpok", "verifytotpok1")
  377. setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
  378. secret = setup_resp.json()["secret"]
  379. valid_code = pyotp.TOTP(secret).now()
  380. await async_client.post(
  381. "/api/v1/auth/2fa/totp/enable",
  382. json={"code": valid_code},
  383. headers=_auth_header(token),
  384. )
  385. # Login now returns requires_2fa=True + pre_auth_token
  386. pre_auth_token = await _login_get_pre_auth_token(async_client, "verifytotpok", "verifytotpok1")
  387. verify_resp = await async_client.post(
  388. "/api/v1/auth/2fa/verify",
  389. json={
  390. "pre_auth_token": pre_auth_token,
  391. "method": "totp",
  392. "code": pyotp.TOTP(secret).now(),
  393. },
  394. )
  395. assert verify_resp.status_code == 200
  396. data = verify_resp.json()
  397. assert "access_token" in data
  398. assert data["token_type"] == "bearer"
  399. assert data["user"]["username"] == "verifytotpok"
  400. @pytest.mark.asyncio
  401. @pytest.mark.integration
  402. async def test_verify_totp_invalid_code(self, async_client: AsyncClient):
  403. token = await _setup_and_login(async_client, "verifybadcode", "verifybadcode1")
  404. setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
  405. secret = setup_resp.json()["secret"]
  406. valid_code = pyotp.TOTP(secret).now()
  407. await async_client.post(
  408. "/api/v1/auth/2fa/totp/enable",
  409. json={"code": valid_code},
  410. headers=_auth_header(token),
  411. )
  412. pre_auth_token = await _login_get_pre_auth_token(async_client, "verifybadcode", "verifybadcode1")
  413. verify_resp = await async_client.post(
  414. "/api/v1/auth/2fa/verify",
  415. json={"pre_auth_token": pre_auth_token, "method": "totp", "code": "000000"},
  416. )
  417. assert verify_resp.status_code == 401
  418. @pytest.mark.asyncio
  419. @pytest.mark.integration
  420. async def test_verify_invalid_method(self, async_client: AsyncClient):
  421. """An invalid 2FA method should return 400 even with a valid pre_auth_token."""
  422. token = await _setup_and_login(async_client, "invalidmethod", "invalidmethod1")
  423. setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
  424. secret = setup_resp.json()["secret"]
  425. valid_code = pyotp.TOTP(secret).now()
  426. await async_client.post(
  427. "/api/v1/auth/2fa/totp/enable",
  428. json={"code": valid_code},
  429. headers=_auth_header(token),
  430. )
  431. pre_auth_token = await _login_get_pre_auth_token(async_client, "invalidmethod", "invalidmethod1")
  432. response = await async_client.post(
  433. "/api/v1/auth/2fa/verify",
  434. json={"pre_auth_token": pre_auth_token, "method": "sms", "code": "123456"},
  435. )
  436. assert response.status_code == 422 # Pydantic Literal validation
  437. # ===========================================================================
  438. # 2FA Verify — Backup code path
  439. # ===========================================================================
  440. class TestTwoFAVerifyBackup:
  441. """Tests for POST /api/v1/auth/2fa/verify using the backup method."""
  442. @pytest.mark.asyncio
  443. @pytest.mark.integration
  444. async def test_verify_with_backup_code(self, async_client: AsyncClient):
  445. token = await _setup_and_login(async_client, "backupcodeok", "backupcodeok1")
  446. setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
  447. secret = setup_resp.json()["secret"]
  448. valid_code = pyotp.TOTP(secret).now()
  449. enable_resp = await async_client.post(
  450. "/api/v1/auth/2fa/totp/enable",
  451. json={"code": valid_code},
  452. headers=_auth_header(token),
  453. )
  454. backup_code = enable_resp.json()["backup_codes"][0]
  455. pre_auth_token = await _login_get_pre_auth_token(async_client, "backupcodeok", "backupcodeok1")
  456. verify_resp = await async_client.post(
  457. "/api/v1/auth/2fa/verify",
  458. json={"pre_auth_token": pre_auth_token, "method": "backup", "code": backup_code},
  459. )
  460. assert verify_resp.status_code == 200
  461. assert "access_token" in verify_resp.json()
  462. @pytest.mark.asyncio
  463. @pytest.mark.integration
  464. async def test_backup_code_is_single_use(self, async_client: AsyncClient):
  465. token = await _setup_and_login(async_client, "backupsingle", "backupsingle1")
  466. setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
  467. secret = setup_resp.json()["secret"]
  468. valid_code = pyotp.TOTP(secret).now()
  469. enable_resp = await async_client.post(
  470. "/api/v1/auth/2fa/totp/enable",
  471. json={"code": valid_code},
  472. headers=_auth_header(token),
  473. )
  474. backup_code = enable_resp.json()["backup_codes"][0]
  475. # First use — should succeed
  476. pre_auth_token = await _login_get_pre_auth_token(async_client, "backupsingle", "backupsingle1")
  477. first_resp = await async_client.post(
  478. "/api/v1/auth/2fa/verify",
  479. json={"pre_auth_token": pre_auth_token, "method": "backup", "code": backup_code},
  480. )
  481. assert first_resp.status_code == 200
  482. # Second use of the same code — must fail (need new pre_auth_token + same backup code)
  483. pre_auth_token2 = await _login_get_pre_auth_token(async_client, "backupsingle", "backupsingle1")
  484. second_resp = await async_client.post(
  485. "/api/v1/auth/2fa/verify",
  486. json={"pre_auth_token": pre_auth_token2, "method": "backup", "code": backup_code},
  487. )
  488. assert second_resp.status_code == 401
  489. @pytest.mark.asyncio
  490. @pytest.mark.integration
  491. async def test_backup_code_count_decrements(self, async_client: AsyncClient):
  492. token = await _setup_and_login(async_client, "backupcount", "backupcount1")
  493. setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
  494. secret = setup_resp.json()["secret"]
  495. valid_code = pyotp.TOTP(secret).now()
  496. enable_resp = await async_client.post(
  497. "/api/v1/auth/2fa/totp/enable",
  498. json={"code": valid_code},
  499. headers=_auth_header(token),
  500. )
  501. backup_code = enable_resp.json()["backup_codes"][0]
  502. pre_auth_token = await _login_get_pre_auth_token(async_client, "backupcount", "backupcount1")
  503. await async_client.post(
  504. "/api/v1/auth/2fa/verify",
  505. json={"pre_auth_token": pre_auth_token, "method": "backup", "code": backup_code},
  506. )
  507. # Status is readable with the original full token (still valid)
  508. status_resp = await async_client.get("/api/v1/auth/2fa/status", headers=_auth_header(token))
  509. assert status_resp.json()["backup_codes_remaining"] == 9
  510. # ===========================================================================
  511. # Rate Limiting
  512. # ===========================================================================
  513. class TestRateLimiting:
  514. """Ensure 429 is returned after 5 failed 2FA attempts."""
  515. @pytest.mark.asyncio
  516. @pytest.mark.integration
  517. async def test_rate_limit_lockout(self, async_client: AsyncClient):
  518. """After 5 failed TOTP attempts the 6th must return 429."""
  519. token = await _setup_and_login(async_client, "ratelimituser", "ratelimituser1")
  520. setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
  521. secret = setup_resp.json()["secret"]
  522. valid_code = pyotp.TOTP(secret).now()
  523. await async_client.post(
  524. "/api/v1/auth/2fa/totp/enable",
  525. json={"code": valid_code},
  526. headers=_auth_header(token),
  527. )
  528. # 5 failed attempts via the login → pre_auth_token → verify flow
  529. for _ in range(5):
  530. pre_auth_token = await _login_get_pre_auth_token(async_client, "ratelimituser", "ratelimituser1")
  531. await async_client.post(
  532. "/api/v1/auth/2fa/verify",
  533. json={"pre_auth_token": pre_auth_token, "method": "totp", "code": "000000"},
  534. )
  535. # 6th attempt should hit the rate limit
  536. pre_auth_token = await _login_get_pre_auth_token(async_client, "ratelimituser", "ratelimituser1")
  537. response = await async_client.post(
  538. "/api/v1/auth/2fa/verify",
  539. json={"pre_auth_token": pre_auth_token, "method": "totp", "code": "000000"},
  540. )
  541. assert response.status_code == 429
  542. # ===========================================================================
  543. # Admin 2FA Disable
  544. # ===========================================================================
  545. class TestAdminDisable2FA:
  546. """Tests for DELETE /api/v1/auth/2fa/admin/{user_id}."""
  547. @pytest.mark.asyncio
  548. @pytest.mark.integration
  549. async def test_admin_disable_requires_admin(self, async_client: AsyncClient):
  550. """Only admins can use the admin disable endpoint."""
  551. # The only user in a fresh setup IS admin, so just check the 404 path
  552. token = await _setup_and_login(async_client, "admincheck", "admincheck123")
  553. # Try to disable for a non-existent user_id — should get 200 (no-op) or 404
  554. response = await async_client.request(
  555. "DELETE",
  556. "/api/v1/auth/2fa/admin/99999",
  557. json={"admin_password": _norm_pw("admincheck123")},
  558. headers=_auth_header(token),
  559. )
  560. # Admin users succeed regardless (returns 200 even if user doesn't exist)
  561. assert response.status_code == 200
  562. @pytest.mark.asyncio
  563. @pytest.mark.integration
  564. async def test_admin_disable_clears_totp(self, async_client: AsyncClient):
  565. from sqlalchemy import select
  566. from backend.app.models.user import User
  567. token = await _setup_and_login(async_client, "admintotp", "admintotp123")
  568. setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
  569. secret = setup_resp.json()["secret"]
  570. valid_code = pyotp.TOTP(secret).now()
  571. await async_client.post(
  572. "/api/v1/auth/2fa/totp/enable",
  573. json={"code": valid_code},
  574. headers=_auth_header(token),
  575. )
  576. # Find the user's id by querying status (which works with the token)
  577. me_resp = await async_client.get("/api/v1/auth/me", headers=_auth_header(token))
  578. user_id = me_resp.json()["id"]
  579. response = await async_client.request(
  580. "DELETE",
  581. f"/api/v1/auth/2fa/admin/{user_id}",
  582. json={"admin_password": _norm_pw("admintotp123")},
  583. headers=_auth_header(token),
  584. )
  585. assert response.status_code == 200
  586. # I2: admin_disable_2fa bumps password_changed_at, invalidating the old token.
  587. # Re-login to get a fresh token before checking status.
  588. new_login = await async_client.post(
  589. LOGIN_URL, json={"username": "admintotp", "password": _norm_pw("admintotp123")}
  590. )
  591. assert new_login.status_code == 200, f"re-login failed: {new_login.json()}"
  592. assert new_login.json().get("requires_2fa") is False, f"still requires 2FA: {new_login.json()}"
  593. new_token = new_login.json()["access_token"]
  594. assert new_token is not None, f"no access_token in: {new_login.json()}"
  595. # Status should now show TOTP disabled
  596. status_resp = await async_client.get("/api/v1/auth/2fa/status", headers=_auth_header(new_token))
  597. assert status_resp.status_code == 200, f"status check failed: {status_resp.json()}"
  598. assert status_resp.json()["totp_enabled"] is False
  599. # ===========================================================================
  600. # OIDC Provider CRUD
  601. # ===========================================================================
  602. class TestOIDCProviders:
  603. """Tests for OIDC provider management endpoints."""
  604. @pytest.mark.asyncio
  605. @pytest.mark.integration
  606. async def test_list_public_providers_empty(self, async_client: AsyncClient):
  607. response = await async_client.get("/api/v1/auth/oidc/providers")
  608. assert response.status_code == 200
  609. assert isinstance(response.json(), list)
  610. @pytest.mark.asyncio
  611. @pytest.mark.integration
  612. async def test_create_provider_requires_admin(self, async_client: AsyncClient):
  613. token = await _setup_and_login(async_client, "oidcadmincreate", "oidcadmincreate1")
  614. response = await async_client.post(
  615. "/api/v1/auth/oidc/providers",
  616. json={
  617. "name": "PocketID",
  618. "issuer_url": "https://auth.example.com",
  619. "client_id": "bambuddy",
  620. "client_secret": "supersecret",
  621. "scopes": "openid email profile",
  622. "is_enabled": True,
  623. "auto_create_users": False,
  624. },
  625. headers=_auth_header(token),
  626. )
  627. assert response.status_code == 201
  628. data = response.json()
  629. assert data["name"] == "PocketID"
  630. assert data["issuer_url"] == "https://auth.example.com"
  631. assert "client_secret" not in data # Secret must not be returned
  632. @pytest.mark.asyncio
  633. @pytest.mark.integration
  634. async def test_created_provider_appears_in_all_list(self, async_client: AsyncClient):
  635. token = await _setup_and_login(async_client, "oidclistall", "oidclistall123")
  636. await async_client.post(
  637. "/api/v1/auth/oidc/providers",
  638. json={
  639. "name": "TestProvider",
  640. "issuer_url": "https://test.example.com",
  641. "client_id": "testclient",
  642. "client_secret": "testsecret",
  643. "scopes": "openid",
  644. "is_enabled": True,
  645. "auto_create_users": False,
  646. },
  647. headers=_auth_header(token),
  648. )
  649. response = await async_client.get("/api/v1/auth/oidc/providers/all", headers=_auth_header(token))
  650. assert response.status_code == 200
  651. names = [p["name"] for p in response.json()]
  652. assert "TestProvider" in names
  653. @pytest.mark.asyncio
  654. @pytest.mark.integration
  655. async def test_disabled_provider_not_in_public_list(self, async_client: AsyncClient):
  656. token = await _setup_and_login(async_client, "oidcdisabled", "oidcdisabled1")
  657. await async_client.post(
  658. "/api/v1/auth/oidc/providers",
  659. json={
  660. "name": "DisabledProvider",
  661. "issuer_url": "https://disabled.example.com",
  662. "client_id": "dc",
  663. "client_secret": "ds",
  664. "scopes": "openid",
  665. "is_enabled": False,
  666. "auto_create_users": False,
  667. },
  668. headers=_auth_header(token),
  669. )
  670. response = await async_client.get("/api/v1/auth/oidc/providers")
  671. names = [p["name"] for p in response.json()]
  672. assert "DisabledProvider" not in names
  673. @pytest.mark.asyncio
  674. @pytest.mark.integration
  675. async def test_update_provider(self, async_client: AsyncClient):
  676. token = await _setup_and_login(async_client, "oidcupdate", "oidcupdate123")
  677. create_resp = await async_client.post(
  678. "/api/v1/auth/oidc/providers",
  679. json={
  680. "name": "OldName",
  681. "issuer_url": "https://update.example.com",
  682. "client_id": "uc",
  683. "client_secret": "us",
  684. "scopes": "openid",
  685. "is_enabled": True,
  686. "auto_create_users": False,
  687. },
  688. headers=_auth_header(token),
  689. )
  690. provider_id = create_resp.json()["id"]
  691. put_resp = await async_client.put(
  692. f"/api/v1/auth/oidc/providers/{provider_id}",
  693. json={"name": "NewName"},
  694. headers=_auth_header(token),
  695. )
  696. assert put_resp.status_code == 200
  697. assert put_resp.json()["name"] == "NewName"
  698. @pytest.mark.asyncio
  699. @pytest.mark.integration
  700. async def test_delete_provider(self, async_client: AsyncClient):
  701. token = await _setup_and_login(async_client, "oidcdelete", "oidcdelete123")
  702. create_resp = await async_client.post(
  703. "/api/v1/auth/oidc/providers",
  704. json={
  705. "name": "ToDelete",
  706. "issuer_url": "https://delete.example.com",
  707. "client_id": "dc",
  708. "client_secret": "ds",
  709. "scopes": "openid",
  710. "is_enabled": True,
  711. "auto_create_users": False,
  712. },
  713. headers=_auth_header(token),
  714. )
  715. provider_id = create_resp.json()["id"]
  716. del_resp = await async_client.delete(
  717. f"/api/v1/auth/oidc/providers/{provider_id}",
  718. headers=_auth_header(token),
  719. )
  720. assert del_resp.status_code == 200
  721. # No longer in list
  722. all_resp = await async_client.get("/api/v1/auth/oidc/providers/all", headers=_auth_header(token))
  723. ids = [p["id"] for p in all_resp.json()]
  724. assert provider_id not in ids
  725. @pytest.mark.asyncio
  726. @pytest.mark.integration
  727. async def test_update_nonexistent_provider_returns_404(self, async_client: AsyncClient):
  728. token = await _setup_and_login(async_client, "oidc404", "oidc404pass1")
  729. response = await async_client.put(
  730. "/api/v1/auth/oidc/providers/99999",
  731. json={"name": "ghost"},
  732. headers=_auth_header(token),
  733. )
  734. assert response.status_code == 404
  735. # ===========================================================================
  736. # Security: pre-auth token single-use
  737. # ===========================================================================
  738. class TestPreAuthTokenSingleUse:
  739. """pre_auth_token must be consumed on successful 2FA and cannot be reused."""
  740. @pytest.mark.asyncio
  741. @pytest.mark.integration
  742. async def test_pre_auth_token_is_single_use(self, async_client: AsyncClient):
  743. """A pre_auth_token that was successfully used cannot be reused."""
  744. token = await _setup_and_login(async_client, "singleusepat", "singleusepat1")
  745. setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
  746. secret = setup_resp.json()["secret"]
  747. valid_code = pyotp.TOTP(secret).now()
  748. await async_client.post(
  749. "/api/v1/auth/2fa/totp/enable",
  750. json={"code": valid_code},
  751. headers=_auth_header(token),
  752. )
  753. pre_auth_token = await _login_get_pre_auth_token(async_client, "singleusepat", "singleusepat1")
  754. # First use — succeeds
  755. first = await async_client.post(
  756. "/api/v1/auth/2fa/verify",
  757. json={"pre_auth_token": pre_auth_token, "method": "totp", "code": pyotp.TOTP(secret).now()},
  758. )
  759. assert first.status_code == 200
  760. # Second use of the same token — must fail (token already consumed on success)
  761. second = await async_client.post(
  762. "/api/v1/auth/2fa/verify",
  763. json={"pre_auth_token": pre_auth_token, "method": "totp", "code": pyotp.TOTP(secret).now()},
  764. )
  765. assert second.status_code == 401
  766. @pytest.mark.asyncio
  767. @pytest.mark.integration
  768. async def test_pre_auth_token_survives_wrong_code(self, async_client: AsyncClient):
  769. """A wrong 2FA code must NOT burn the pre_auth_token (user can retry)."""
  770. token = await _setup_and_login(async_client, "survivepatuser", "survivepatuser1")
  771. setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
  772. secret = setup_resp.json()["secret"]
  773. valid_code = pyotp.TOTP(secret).now()
  774. await async_client.post(
  775. "/api/v1/auth/2fa/totp/enable",
  776. json={"code": valid_code},
  777. headers=_auth_header(token),
  778. )
  779. pre_auth_token = await _login_get_pre_auth_token(async_client, "survivepatuser", "survivepatuser1")
  780. # Wrong code — should fail but not burn the token
  781. bad = await async_client.post(
  782. "/api/v1/auth/2fa/verify",
  783. json={"pre_auth_token": pre_auth_token, "method": "totp", "code": "000000"},
  784. )
  785. assert bad.status_code == 401
  786. # Same token, correct code — should succeed (token still valid)
  787. good = await async_client.post(
  788. "/api/v1/auth/2fa/verify",
  789. json={"pre_auth_token": pre_auth_token, "method": "totp", "code": pyotp.TOTP(secret).now()},
  790. )
  791. assert good.status_code == 200
  792. # ===========================================================================
  793. # Security: cross-user token isolation
  794. # ===========================================================================
  795. class TestCrossUserTokenIsolation:
  796. """A pre_auth_token issued for user A cannot authenticate as user B."""
  797. @pytest.mark.asyncio
  798. @pytest.mark.integration
  799. async def test_token_cannot_be_used_for_different_user(self, async_client: AsyncClient):
  800. """pre_auth_token is bound to the issuing user; using it to verify a different
  801. user's TOTP code must fail."""
  802. # Set up two users with TOTP
  803. token_a = await _setup_and_login(async_client, "crossusera", "crossusera1")
  804. setup_a = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token_a))
  805. secret_a = setup_a.json()["secret"]
  806. await async_client.post(
  807. "/api/v1/auth/2fa/totp/enable",
  808. json={"code": pyotp.TOTP(secret_a).now()},
  809. headers=_auth_header(token_a),
  810. )
  811. # Get pre_auth_token for user A
  812. pre_auth_a = await _login_get_pre_auth_token(async_client, "crossusera", "crossusera1")
  813. # Try to use user A's token but supply a clearly invalid code — must fail
  814. resp = await async_client.post(
  815. "/api/v1/auth/2fa/verify",
  816. json={"pre_auth_token": pre_auth_a, "method": "totp", "code": "000000"},
  817. )
  818. assert resp.status_code == 401
  819. # ===========================================================================
  820. # Security: admin disable non-admin rejection
  821. # ===========================================================================
  822. class TestAdminDisableNonAdminRejection:
  823. """Non-admin users must be rejected from the admin disable endpoint."""
  824. @pytest.mark.asyncio
  825. @pytest.mark.integration
  826. async def test_non_admin_cannot_disable_2fa(self, async_client: AsyncClient):
  827. """A regular (non-admin) user must receive 403 from DELETE /auth/2fa/admin/{id}."""
  828. # Set up admin, then create a regular user
  829. admin_token = await _setup_and_login(async_client, "adminusr2fa", "adminusr2fa1")
  830. # Create a regular user via user management
  831. create_resp = await async_client.post(
  832. "/api/v1/users",
  833. json={"username": "regularusr2fa", "password": "Regularusr2fa1!"},
  834. headers=_auth_header(admin_token),
  835. )
  836. assert create_resp.status_code == 201
  837. # Login as regular user
  838. login_resp = await async_client.post(
  839. LOGIN_URL,
  840. json={"username": "regularusr2fa", "password": "Regularusr2fa1!"},
  841. )
  842. regular_token = login_resp.json()["access_token"]
  843. # Try to call admin endpoint with the regular user's token
  844. resp = await async_client.delete(
  845. f"/api/v1/auth/2fa/admin/{create_resp.json()['id']}",
  846. headers=_auth_header(regular_token),
  847. )
  848. assert resp.status_code == 403
  849. # ===========================================================================
  850. # Regenerate backup codes
  851. # ===========================================================================
  852. class TestRegenerateBackupCodes:
  853. """Tests for POST /api/v1/auth/2fa/totp/regenerate-backup-codes."""
  854. @pytest.mark.asyncio
  855. @pytest.mark.integration
  856. async def test_regenerate_requires_totp_enabled(self, async_client: AsyncClient):
  857. token = await _setup_and_login(async_client, "regennototp", "regennototp1")
  858. resp = await async_client.post(
  859. "/api/v1/auth/2fa/totp/regenerate-backup-codes",
  860. json={"code": "123456"},
  861. headers=_auth_header(token),
  862. )
  863. assert resp.status_code == 400
  864. @pytest.mark.asyncio
  865. @pytest.mark.integration
  866. async def test_regenerate_invalidates_old_codes(self, async_client: AsyncClient):
  867. """After regenerating, old backup codes must no longer work."""
  868. token = await _setup_and_login(async_client, "regeninval", "regeninval1")
  869. setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
  870. secret = setup_resp.json()["secret"]
  871. enable_resp = await async_client.post(
  872. "/api/v1/auth/2fa/totp/enable",
  873. json={"code": pyotp.TOTP(secret).now()},
  874. headers=_auth_header(token),
  875. )
  876. old_backup = enable_resp.json()["backup_codes"][0]
  877. # Regenerate backup codes
  878. regen_resp = await async_client.post(
  879. "/api/v1/auth/2fa/totp/regenerate-backup-codes",
  880. json={"code": pyotp.TOTP(secret).now()},
  881. headers=_auth_header(token),
  882. )
  883. assert regen_resp.status_code == 200
  884. new_codes = regen_resp.json()["backup_codes"]
  885. assert len(new_codes) == 10
  886. assert old_backup not in new_codes
  887. # Old backup code must now fail
  888. pre_auth_token = await _login_get_pre_auth_token(async_client, "regeninval", "regeninval1")
  889. fail_resp = await async_client.post(
  890. "/api/v1/auth/2fa/verify",
  891. json={"pre_auth_token": pre_auth_token, "method": "backup", "code": old_backup},
  892. )
  893. assert fail_resp.status_code == 401
  894. @pytest.mark.asyncio
  895. @pytest.mark.integration
  896. async def test_regenerate_with_invalid_code_fails(self, async_client: AsyncClient):
  897. token = await _setup_and_login(async_client, "regeninvcode", "regeninvcode1")
  898. setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
  899. secret = setup_resp.json()["secret"]
  900. await async_client.post(
  901. "/api/v1/auth/2fa/totp/enable",
  902. json={"code": pyotp.TOTP(secret).now()},
  903. headers=_auth_header(token),
  904. )
  905. resp = await async_client.post(
  906. "/api/v1/auth/2fa/totp/regenerate-backup-codes",
  907. json={"code": "000000"},
  908. headers=_auth_header(token),
  909. )
  910. assert resp.status_code == 400
  911. # ===========================================================================
  912. # Security: method field validation
  913. # ===========================================================================
  914. class TestVerifyMethodValidation:
  915. """The method field must be one of totp/email/backup (Pydantic Literal)."""
  916. @pytest.mark.asyncio
  917. @pytest.mark.integration
  918. async def test_invalid_method_rejected_by_schema(self, async_client: AsyncClient):
  919. """Pydantic should reject unknown method values with 422."""
  920. resp = await async_client.post(
  921. "/api/v1/auth/2fa/verify",
  922. json={"pre_auth_token": "anytoken", "code": "123456", "method": "sms"},
  923. )
  924. assert resp.status_code == 422
  925. @pytest.mark.asyncio
  926. @pytest.mark.integration
  927. async def test_oversized_pre_auth_token_rejected(self, async_client: AsyncClient):
  928. """pre_auth_token exceeding max_length=128 should be rejected with 422."""
  929. resp = await async_client.post(
  930. "/api/v1/auth/2fa/verify",
  931. json={"pre_auth_token": "x" * 200, "code": "123456", "method": "totp"},
  932. )
  933. assert resp.status_code == 422
  934. # ===========================================================================
  935. # Login response shape for 2FA users
  936. # ===========================================================================
  937. class TestLoginResponseShape:
  938. """Login for a 2FA-enabled user must return requires_2fa+pre_auth_token
  939. and must NOT include access_token (which would bypass the 2FA gate)."""
  940. @pytest.mark.asyncio
  941. @pytest.mark.integration
  942. async def test_login_2fa_user_omits_access_token(self, async_client: AsyncClient):
  943. """A user with TOTP enabled must not receive an access_token on /auth/login."""
  944. token = await _setup_and_login(async_client, "loginshape", "loginshape1")
  945. setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
  946. secret = setup_resp.json()["secret"]
  947. await async_client.post(
  948. "/api/v1/auth/2fa/totp/enable",
  949. json={"code": pyotp.TOTP(secret).now()},
  950. headers=_auth_header(token),
  951. )
  952. login_resp = await async_client.post(LOGIN_URL, json={"username": "loginshape", "password": "Loginshape1!"})
  953. assert login_resp.status_code == 200
  954. data = login_resp.json()
  955. assert data.get("requires_2fa") is True
  956. assert data.get("pre_auth_token") is not None
  957. # access_token must NOT be present — it would bypass the 2FA gate
  958. assert "access_token" not in data or data["access_token"] is None
  959. # ===========================================================================
  960. # TOTP replay protection
  961. # ===========================================================================
  962. async def _setup_totp_user(client: AsyncClient, username: str, password: str) -> tuple[str, str]:
  963. """Create user, set up and enable TOTP; return (bearer_token, totp_secret)."""
  964. token = await _setup_and_login(client, username, password)
  965. setup_resp = await client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
  966. secret = setup_resp.json()["secret"]
  967. await client.post(
  968. "/api/v1/auth/2fa/totp/enable",
  969. json={"code": pyotp.TOTP(secret).now()},
  970. headers=_auth_header(token),
  971. )
  972. return token, secret
  973. class TestTOTPReplay:
  974. """The same TOTP code must not be accepted twice within one 30-second window."""
  975. @pytest.mark.asyncio
  976. @pytest.mark.integration
  977. async def test_totp_replay_rejected_on_verify(self, async_client: AsyncClient):
  978. """Replaying the same code on /2fa/verify must return 400."""
  979. _token, secret = await _setup_totp_user(async_client, "replayverify", "replayverify1")
  980. code = pyotp.TOTP(secret).now()
  981. pre_auth = await _login_get_pre_auth_token(async_client, "replayverify", "replayverify1")
  982. first = await async_client.post(
  983. "/api/v1/auth/2fa/verify",
  984. json={"pre_auth_token": pre_auth, "method": "totp", "code": code},
  985. )
  986. assert first.status_code == 200
  987. # Second login to get a fresh pre_auth_token (first was consumed)
  988. pre_auth2 = await _login_get_pre_auth_token(async_client, "replayverify", "replayverify1")
  989. second = await async_client.post(
  990. "/api/v1/auth/2fa/verify",
  991. json={"pre_auth_token": pre_auth2, "method": "totp", "code": code},
  992. )
  993. assert second.status_code == 400
  994. @pytest.mark.asyncio
  995. @pytest.mark.integration
  996. async def test_totp_replay_rejected_on_disable(self, async_client: AsyncClient):
  997. """A code already used in verify_2fa must be rejected on /2fa/totp/disable."""
  998. _setup_token, secret = await _setup_totp_user(async_client, "replaydisable", "replaydisable1")
  999. code = pyotp.TOTP(secret).now()
  1000. # Use the code in verify_2fa — this sets last_totp_counter in DB
  1001. pre_auth = await _login_get_pre_auth_token(async_client, "replaydisable", "replaydisable1")
  1002. verify_resp = await async_client.post(
  1003. "/api/v1/auth/2fa/verify",
  1004. json={"pre_auth_token": pre_auth, "method": "totp", "code": code},
  1005. )
  1006. assert verify_resp.status_code == 200
  1007. authed_token = verify_resp.json()["access_token"]
  1008. # Replay the same code on disable — must be rejected (same 30-second window)
  1009. disable_resp = await async_client.post(
  1010. "/api/v1/auth/2fa/totp/disable",
  1011. json={"code": code},
  1012. headers=_auth_header(authed_token),
  1013. )
  1014. assert disable_resp.status_code == 400
  1015. # ===========================================================================
  1016. # Rate limiting on disable_totp and regenerate_backup_codes (I10)
  1017. # ===========================================================================
  1018. class TestRateLimitingDisableRegenerate:
  1019. """disable_totp and regenerate_backup_codes must enforce rate limiting (I10)."""
  1020. @pytest.mark.asyncio
  1021. @pytest.mark.integration
  1022. async def test_disable_totp_rate_limited_after_failures(self, async_client: AsyncClient):
  1023. """Repeated wrong codes on /2fa/totp/disable trigger 429."""
  1024. token, _secret = await _setup_totp_user(async_client, "rldisable", "rldisable1")
  1025. for _ in range(5):
  1026. await async_client.post(
  1027. "/api/v1/auth/2fa/totp/disable",
  1028. json={"code": "000000"},
  1029. headers=_auth_header(token),
  1030. )
  1031. resp = await async_client.post(
  1032. "/api/v1/auth/2fa/totp/disable",
  1033. json={"code": "000000"},
  1034. headers=_auth_header(token),
  1035. )
  1036. assert resp.status_code == 429
  1037. @pytest.mark.asyncio
  1038. @pytest.mark.integration
  1039. async def test_regenerate_backup_codes_rate_limited_after_failures(self, async_client: AsyncClient):
  1040. """Repeated wrong codes on /2fa/totp/regenerate-backup-codes trigger 429."""
  1041. token, _secret = await _setup_totp_user(async_client, "rlregen", "rlregen1")
  1042. for _ in range(5):
  1043. await async_client.post(
  1044. "/api/v1/auth/2fa/totp/regenerate-backup-codes",
  1045. json={"code": "000000"},
  1046. headers=_auth_header(token),
  1047. )
  1048. resp = await async_client.post(
  1049. "/api/v1/auth/2fa/totp/regenerate-backup-codes",
  1050. json={"code": "000000"},
  1051. headers=_auth_header(token),
  1052. )
  1053. assert resp.status_code == 429
  1054. # ===========================================================================
  1055. # Email OTP send → verify end-to-end (coverage gap C3)
  1056. # ===========================================================================
  1057. class TestEmailOTPSendVerify:
  1058. """Full email OTP login: send code → verify code → JWT."""
  1059. @pytest.mark.asyncio
  1060. @pytest.mark.integration
  1061. async def test_email_otp_send_and_verify(self, async_client: AsyncClient, db_session: AsyncSession):
  1062. """login → POST /2fa/email/send (patched SMTP) → POST /2fa/verify → JWT."""
  1063. import re
  1064. from unittest.mock import AsyncMock, MagicMock, patch
  1065. from sqlalchemy import select as sa_select
  1066. token = await _setup_and_login(async_client, "emailsendok", "emailsendok1")
  1067. # Give the user an email address
  1068. result = await db_session.execute(sa_select(User).where(User.username == "emailsendok"))
  1069. user = result.scalar_one()
  1070. user.email = "emailsendok@example.com"
  1071. await db_session.commit()
  1072. # Enable email OTP via DB injection
  1073. setup_code = "444444"
  1074. setup_token = secrets.token_urlsafe(32)
  1075. db_session.add(
  1076. AuthEphemeralToken(
  1077. token=setup_token,
  1078. token_type="email_otp_setup",
  1079. username="emailsendok",
  1080. nonce=_pwd_context.hash(setup_code),
  1081. expires_at=datetime.now(timezone.utc) + timedelta(minutes=10),
  1082. )
  1083. )
  1084. await db_session.commit()
  1085. await async_client.post(
  1086. "/api/v1/auth/2fa/email/enable/confirm",
  1087. json={"setup_token": setup_token, "code": setup_code},
  1088. headers=_auth_header(token),
  1089. )
  1090. # Login now requires 2FA — get pre_auth_token (cookie set automatically)
  1091. pre_auth_token = await _login_get_pre_auth_token(async_client, "emailsendok", "emailsendok1")
  1092. # Mock SMTP and capture the sent OTP code
  1093. captured: dict[str, str] = {}
  1094. smtp_settings_mock = MagicMock()
  1095. def _capture_email(smtp_settings, to_email, subject, body_text, body_html):
  1096. m = re.search(r"login code is: (\d{6})", body_text)
  1097. if m:
  1098. captured["otp"] = m.group(1)
  1099. with (
  1100. patch("backend.app.api.routes.mfa.get_smtp_settings", new=AsyncMock(return_value=smtp_settings_mock)),
  1101. patch("backend.app.api.routes.mfa.send_email", side_effect=_capture_email),
  1102. ):
  1103. send_resp = await async_client.post(
  1104. "/api/v1/auth/2fa/email/send",
  1105. json={"pre_auth_token": pre_auth_token},
  1106. )
  1107. assert send_resp.status_code == 200, send_resp.text
  1108. fresh_token = send_resp.json()["pre_auth_token"]
  1109. assert "otp" in captured, "send_email was not called or code not found in body"
  1110. # Verify with the captured OTP code — cookie still in the async_client jar
  1111. verify_resp = await async_client.post(
  1112. "/api/v1/auth/2fa/verify",
  1113. json={"pre_auth_token": fresh_token, "method": "email", "code": captured["otp"]},
  1114. )
  1115. assert verify_resp.status_code == 200
  1116. data = verify_resp.json()
  1117. assert "access_token" in data
  1118. assert data["user"]["username"] == "emailsendok"
  1119. @pytest.mark.asyncio
  1120. @pytest.mark.integration
  1121. async def test_email_otp_wrong_code_rejected(self, async_client: AsyncClient, db_session: AsyncSession):
  1122. """A wrong email OTP code must return 401 without burning the pre_auth_token."""
  1123. from unittest.mock import AsyncMock, MagicMock, patch
  1124. from sqlalchemy import select as sa_select
  1125. token = await _setup_and_login(async_client, "emailwrongcode", "emailwrongcode1")
  1126. result = await db_session.execute(sa_select(User).where(User.username == "emailwrongcode"))
  1127. user = result.scalar_one()
  1128. user.email = "emailwrongcode@example.com"
  1129. await db_session.commit()
  1130. setup_code = "555555"
  1131. setup_token = secrets.token_urlsafe(32)
  1132. db_session.add(
  1133. AuthEphemeralToken(
  1134. token=setup_token,
  1135. token_type="email_otp_setup",
  1136. username="emailwrongcode",
  1137. nonce=_pwd_context.hash(setup_code),
  1138. expires_at=datetime.now(timezone.utc) + timedelta(minutes=10),
  1139. )
  1140. )
  1141. await db_session.commit()
  1142. await async_client.post(
  1143. "/api/v1/auth/2fa/email/enable/confirm",
  1144. json={"setup_token": setup_token, "code": setup_code},
  1145. headers=_auth_header(token),
  1146. )
  1147. pre_auth_token = await _login_get_pre_auth_token(async_client, "emailwrongcode", "emailwrongcode1")
  1148. captured: dict[str, str] = {}
  1149. smtp_mock = MagicMock()
  1150. def _capture(smtp_settings, to_email, subject, body_text, body_html):
  1151. import re
  1152. m = re.search(r"login code is: (\d{6})", body_text)
  1153. if m:
  1154. captured["otp"] = m.group(1)
  1155. with (
  1156. patch("backend.app.api.routes.mfa.get_smtp_settings", new=AsyncMock(return_value=smtp_mock)),
  1157. patch("backend.app.api.routes.mfa.send_email", side_effect=_capture),
  1158. ):
  1159. send_resp = await async_client.post(
  1160. "/api/v1/auth/2fa/email/send",
  1161. json={"pre_auth_token": pre_auth_token},
  1162. )
  1163. assert send_resp.status_code == 200
  1164. fresh_token = send_resp.json()["pre_auth_token"]
  1165. # Wrong code → 401
  1166. bad = await async_client.post(
  1167. "/api/v1/auth/2fa/verify",
  1168. json={"pre_auth_token": fresh_token, "method": "email", "code": "000000"},
  1169. )
  1170. assert bad.status_code == 401
  1171. # Correct code still works (token not burned by wrong attempt)
  1172. good = await async_client.post(
  1173. "/api/v1/auth/2fa/verify",
  1174. json={"pre_auth_token": fresh_token, "method": "email", "code": captured["otp"]},
  1175. )
  1176. assert good.status_code == 200
  1177. # ===========================================================================
  1178. # OIDC end-to-end (coverage gap C4)
  1179. # ===========================================================================
  1180. def _make_test_rsa_key():
  1181. """Generate a throwaway RSA key pair and a matching JWK set for tests."""
  1182. import base64
  1183. from cryptography.hazmat.primitives import serialization
  1184. from cryptography.hazmat.primitives.asymmetric import rsa
  1185. private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
  1186. private_pem = private_key.private_bytes(
  1187. serialization.Encoding.PEM,
  1188. serialization.PrivateFormat.TraditionalOpenSSL,
  1189. serialization.NoEncryption(),
  1190. )
  1191. pub_numbers = private_key.public_key().public_numbers()
  1192. def _b64url(n: int, length: int) -> str:
  1193. return base64.urlsafe_b64encode(n.to_bytes(length, "big")).rstrip(b"=").decode()
  1194. jwks = {
  1195. "keys": [
  1196. {
  1197. "kty": "RSA",
  1198. "use": "sig",
  1199. "alg": "RS256",
  1200. "kid": "test-kid-1",
  1201. "n": _b64url(pub_numbers.n, 256),
  1202. "e": _b64url(pub_numbers.e, 3),
  1203. }
  1204. ]
  1205. }
  1206. return private_pem, jwks
  1207. class TestOIDCEndToEnd:
  1208. """Full OIDC auth-code flow: state → callback (mocked IdP) → exchange → JWT."""
  1209. @pytest.mark.asyncio
  1210. @pytest.mark.integration
  1211. async def test_oidc_callback_creates_user_and_issues_jwt(self, async_client: AsyncClient, db_session: AsyncSession):
  1212. """callback validates the mocked ID token, creates a user, and redirects
  1213. with an oidc_exchange token; exchanging that token returns a full JWT."""
  1214. import time
  1215. from unittest.mock import patch
  1216. import jwt as pyjwt
  1217. private_pem, jwks_data = _make_test_rsa_key()
  1218. issuer = "https://idp.test.example.com"
  1219. client_id = "oidc-test-client"
  1220. nonce = secrets.token_urlsafe(16)
  1221. now = int(time.time())
  1222. id_token = pyjwt.encode(
  1223. {
  1224. "sub": "oidc-sub-e2e",
  1225. "iss": issuer,
  1226. "aud": client_id,
  1227. "nonce": nonce,
  1228. "email": "oidce2e@example.com",
  1229. "email_verified": True,
  1230. "iat": now,
  1231. "exp": now + 300,
  1232. },
  1233. private_pem,
  1234. algorithm="RS256",
  1235. headers={"kid": "test-kid-1"},
  1236. )
  1237. # Create OIDC provider
  1238. admin_token = await _setup_and_login(async_client, "oidce2eadm", "oidce2eadm1")
  1239. create_resp = await async_client.post(
  1240. "/api/v1/auth/oidc/providers",
  1241. json={
  1242. "name": "E2E-IdP",
  1243. "issuer_url": issuer,
  1244. "client_id": client_id,
  1245. "client_secret": "test-secret",
  1246. "scopes": "openid email profile",
  1247. "is_enabled": True,
  1248. "auto_create_users": True,
  1249. },
  1250. headers=_auth_header(admin_token),
  1251. )
  1252. assert create_resp.status_code == 201
  1253. provider_id = create_resp.json()["id"]
  1254. # Simulate the authorize step: insert an oidc_state token directly
  1255. state = secrets.token_urlsafe(32)
  1256. code_verifier = secrets.token_urlsafe(48)
  1257. db_session.add(
  1258. AuthEphemeralToken(
  1259. token=state,
  1260. token_type="oidc_state",
  1261. provider_id=provider_id,
  1262. nonce=nonce,
  1263. code_verifier=code_verifier,
  1264. expires_at=datetime.now(timezone.utc) + timedelta(minutes=5),
  1265. )
  1266. )
  1267. await db_session.commit()
  1268. # Mock httpx calls made inside oidc_callback
  1269. discovery_doc = {
  1270. "issuer": issuer,
  1271. "authorization_endpoint": f"{issuer}/auth",
  1272. "token_endpoint": f"{issuer}/token",
  1273. "jwks_uri": f"{issuer}/.well-known/jwks.json",
  1274. }
  1275. token_response = {
  1276. "access_token": "mock-access",
  1277. "token_type": "Bearer",
  1278. "id_token": id_token,
  1279. }
  1280. class _MockResp:
  1281. def __init__(self, data):
  1282. self._data = data
  1283. self.status_code = 200
  1284. self.is_success = True
  1285. self.text = str(data)
  1286. def json(self):
  1287. return self._data
  1288. def raise_for_status(self):
  1289. pass
  1290. class _MockHttpxClient:
  1291. def __init__(self, *args, **kwargs):
  1292. pass
  1293. async def __aenter__(self):
  1294. return self
  1295. async def __aexit__(self, *args):
  1296. pass
  1297. async def get(self, url, **kwargs):
  1298. if "jwks" in url:
  1299. return _MockResp(jwks_data)
  1300. return _MockResp(discovery_doc)
  1301. async def post(self, url, **kwargs):
  1302. return _MockResp(token_response)
  1303. with patch("backend.app.api.routes.mfa.httpx.AsyncClient", _MockHttpxClient):
  1304. callback_resp = await async_client.get(
  1305. f"/api/v1/auth/oidc/callback?code=test-auth-code&state={state}",
  1306. follow_redirects=False,
  1307. )
  1308. assert callback_resp.status_code == 302, callback_resp.text
  1309. location = callback_resp.headers.get("location", "")
  1310. assert "oidc_token=" in location, f"Expected oidc_token in redirect, got: {location}"
  1311. # Extract and exchange the oidc_exchange token
  1312. oidc_exchange_token = location.split("oidc_token=")[1].split("&")[0]
  1313. exchange_resp = await async_client.post(
  1314. "/api/v1/auth/oidc/exchange",
  1315. json={"oidc_token": oidc_exchange_token},
  1316. )
  1317. assert exchange_resp.status_code == 200
  1318. data = exchange_resp.json()
  1319. assert "access_token" in data
  1320. assert data["user"]["username"] is not None
  1321. @pytest.mark.asyncio
  1322. @pytest.mark.integration
  1323. async def test_oidc_callback_invalid_state_redirects_error(self, async_client: AsyncClient):
  1324. """An unknown state token must redirect to /?oidc_error=invalid_state."""
  1325. resp = await async_client.get(
  1326. "/api/v1/auth/oidc/callback?code=x&state=totally-bogus-state",
  1327. follow_redirects=False,
  1328. )
  1329. assert resp.status_code == 302
  1330. assert "invalid_state" in resp.headers.get("location", "")
  1331. @pytest.mark.asyncio
  1332. @pytest.mark.integration
  1333. async def test_oidc_state_is_single_use(self, async_client: AsyncClient, db_session: AsyncSession):
  1334. """Replaying the same state token must fail on the second callback."""
  1335. import time
  1336. from unittest.mock import patch
  1337. import jwt as pyjwt
  1338. private_pem, jwks_data = _make_test_rsa_key()
  1339. issuer = "https://idp2.test.example.com"
  1340. client_id = "oidc-client-2"
  1341. nonce = secrets.token_urlsafe(16)
  1342. now = int(time.time())
  1343. id_token = pyjwt.encode(
  1344. {
  1345. "sub": "sub-single-use",
  1346. "iss": issuer,
  1347. "aud": client_id,
  1348. "nonce": nonce,
  1349. "email": "su@example.com",
  1350. "email_verified": True,
  1351. "iat": now,
  1352. "exp": now + 300,
  1353. },
  1354. private_pem,
  1355. algorithm="RS256",
  1356. headers={"kid": "test-kid-1"},
  1357. )
  1358. admin_token = await _setup_and_login(async_client, "oidcsuadm", "oidcsuadm1")
  1359. cr = await async_client.post(
  1360. "/api/v1/auth/oidc/providers",
  1361. json={
  1362. "name": "SU-IdP",
  1363. "issuer_url": issuer,
  1364. "client_id": client_id,
  1365. "client_secret": "s",
  1366. "scopes": "openid",
  1367. "is_enabled": True,
  1368. "auto_create_users": True,
  1369. },
  1370. headers=_auth_header(admin_token),
  1371. )
  1372. provider_id = cr.json()["id"]
  1373. state = secrets.token_urlsafe(32)
  1374. db_session.add(
  1375. AuthEphemeralToken(
  1376. token=state,
  1377. token_type="oidc_state",
  1378. provider_id=provider_id,
  1379. nonce=nonce,
  1380. code_verifier=secrets.token_urlsafe(48),
  1381. expires_at=datetime.now(timezone.utc) + timedelta(minutes=5),
  1382. )
  1383. )
  1384. await db_session.commit()
  1385. discovery_doc = {
  1386. "issuer": issuer,
  1387. "authorization_endpoint": f"{issuer}/auth",
  1388. "token_endpoint": f"{issuer}/token",
  1389. "jwks_uri": f"{issuer}/.well-known/jwks.json",
  1390. }
  1391. token_response = {"access_token": "a", "token_type": "Bearer", "id_token": id_token}
  1392. class _MockResp:
  1393. def __init__(self, data):
  1394. self._data = data
  1395. self.status_code = 200
  1396. self.is_success = True
  1397. self.text = str(data)
  1398. def json(self):
  1399. return self._data
  1400. def raise_for_status(self):
  1401. pass
  1402. class _MockHttpxClient:
  1403. def __init__(self, *a, **kw):
  1404. pass
  1405. async def __aenter__(self):
  1406. return self
  1407. async def __aexit__(self, *a):
  1408. pass
  1409. async def get(self, url, **kw):
  1410. return _MockResp(jwks_data if "jwks" in url else discovery_doc)
  1411. async def post(self, url, **kw):
  1412. return _MockResp(token_response)
  1413. with patch("backend.app.api.routes.mfa.httpx.AsyncClient", _MockHttpxClient):
  1414. first = await async_client.get(
  1415. f"/api/v1/auth/oidc/callback?code=c&state={state}",
  1416. follow_redirects=False,
  1417. )
  1418. assert first.status_code == 302
  1419. assert "oidc_token=" in first.headers.get("location", "")
  1420. # Replay: second callback with the same state must fail
  1421. second = await async_client.get(
  1422. f"/api/v1/auth/oidc/callback?code=c&state={state}",
  1423. follow_redirects=False,
  1424. )
  1425. assert second.status_code == 302
  1426. assert "invalid_state" in second.headers.get("location", "")
  1427. # ===========================================================================
  1428. # H-2: Wrong code must NOT consume the email OTP setup token (peek-then-consume)
  1429. # ===========================================================================
  1430. class TestEmailOTPSetupTokenPreservedOnWrongCode:
  1431. """After H-2 fix: a wrong code leaves the setup token intact so the user can retry."""
  1432. @pytest.mark.asyncio
  1433. @pytest.mark.integration
  1434. async def test_wrong_code_does_not_consume_setup_token(self, async_client: AsyncClient, db_session: AsyncSession):
  1435. """Wrong code returns 400 but the setup token survives; correct code then works."""
  1436. token = await _setup_and_login(async_client, "h2retryuser", "h2retrypass1")
  1437. code = "999999"
  1438. code_hash = _pwd_context.hash(code)
  1439. setup_token = secrets.token_urlsafe(32)
  1440. db_session.add(
  1441. AuthEphemeralToken(
  1442. token=setup_token,
  1443. token_type="email_otp_setup",
  1444. username="h2retryuser",
  1445. nonce=code_hash,
  1446. expires_at=datetime.now(timezone.utc) + timedelta(minutes=10),
  1447. )
  1448. )
  1449. await db_session.commit()
  1450. # First attempt: wrong code → 400
  1451. wrong = await async_client.post(
  1452. "/api/v1/auth/2fa/email/enable/confirm",
  1453. json={"setup_token": setup_token, "code": "000000"},
  1454. headers=_auth_header(token),
  1455. )
  1456. assert wrong.status_code == 400
  1457. # Second attempt: correct code → must succeed (token was NOT consumed)
  1458. correct = await async_client.post(
  1459. "/api/v1/auth/2fa/email/enable/confirm",
  1460. json={"setup_token": setup_token, "code": code},
  1461. headers=_auth_header(token),
  1462. )
  1463. assert correct.status_code == 200
  1464. # ===========================================================================
  1465. # M-2: New OIDC provider must default to auto_link_existing_accounts=False
  1466. # ===========================================================================
  1467. class TestOIDCProviderAutoLinkDefault:
  1468. """auto_link_existing_accounts must default to False (M-2 fix)."""
  1469. @pytest.mark.asyncio
  1470. @pytest.mark.integration
  1471. async def test_new_provider_auto_link_defaults_to_false(self, async_client: AsyncClient):
  1472. token = await _setup_and_login(async_client, "m2autolinkadmin", "m2autolinkadmin1")
  1473. resp = await async_client.post(
  1474. "/api/v1/auth/oidc/providers",
  1475. json={
  1476. "name": "AutoLinkTest",
  1477. "issuer_url": "https://autolink.example.com",
  1478. "client_id": "alc",
  1479. "client_secret": "als",
  1480. "scopes": "openid",
  1481. "is_enabled": True,
  1482. "auto_create_users": False,
  1483. # auto_link_existing_accounts intentionally omitted
  1484. },
  1485. headers=_auth_header(token),
  1486. )
  1487. assert resp.status_code == 201
  1488. assert resp.json()["auto_link_existing_accounts"] is False
  1489. # ===========================================================================
  1490. # L-5: 2FA verify code format validation
  1491. # ===========================================================================
  1492. class TestTwoFAVerifyCodeFormat:
  1493. """TwoFAVerifyRequest.code must be 6–8 alphanumeric characters (L-5)."""
  1494. @pytest.mark.asyncio
  1495. @pytest.mark.integration
  1496. async def test_code_too_long_rejected(self, async_client: AsyncClient):
  1497. """code > 8 characters must be rejected with 422."""
  1498. resp = await async_client.post(
  1499. "/api/v1/auth/2fa/verify",
  1500. json={"pre_auth_token": "anytoken", "code": "1" * 9, "method": "totp"},
  1501. )
  1502. assert resp.status_code == 422
  1503. @pytest.mark.asyncio
  1504. @pytest.mark.integration
  1505. async def test_code_non_alphanumeric_rejected(self, async_client: AsyncClient):
  1506. """code containing non-alphanumeric chars must be rejected with 422."""
  1507. resp = await async_client.post(
  1508. "/api/v1/auth/2fa/verify",
  1509. json={"pre_auth_token": "anytoken", "code": "12-456", "method": "totp"},
  1510. )
  1511. assert resp.status_code == 422
  1512. @pytest.mark.asyncio
  1513. @pytest.mark.integration
  1514. async def test_code_too_short_rejected(self, async_client: AsyncClient):
  1515. """code < 6 characters must be rejected with 422."""
  1516. resp = await async_client.post(
  1517. "/api/v1/auth/2fa/verify",
  1518. json={"pre_auth_token": "anytoken", "code": "12345", "method": "totp"},
  1519. )
  1520. assert resp.status_code == 422
  1521. @pytest.mark.asyncio
  1522. @pytest.mark.integration
  1523. async def test_code_exactly_6_passes_schema(self, async_client: AsyncClient):
  1524. """6-character alphanumeric code passes schema (may fail 2FA logic with 400)."""
  1525. resp = await async_client.post(
  1526. "/api/v1/auth/2fa/verify",
  1527. json={"pre_auth_token": "x" * 32, "code": "123456", "method": "totp"},
  1528. )
  1529. assert resp.status_code != 422
  1530. @pytest.mark.asyncio
  1531. @pytest.mark.integration
  1532. async def test_code_exactly_8_passes_schema(self, async_client: AsyncClient):
  1533. """8-character alphanumeric backup code passes schema."""
  1534. resp = await async_client.post(
  1535. "/api/v1/auth/2fa/verify",
  1536. json={"pre_auth_token": "x" * 32, "code": "ABCD1234", "method": "backup"},
  1537. )
  1538. assert resp.status_code != 422
  1539. # ===========================================================================
  1540. # M-NEW-1: verify_slicer_download_token must NOT consume token on wrong resource
  1541. # ===========================================================================
  1542. class TestSlicerTokenResourceBinding:
  1543. """Token for resource A must survive a wrong-resource check and still work for A."""
  1544. @pytest.mark.asyncio
  1545. @pytest.mark.integration
  1546. async def test_wrong_resource_does_not_consume_token(self, async_client: AsyncClient, db_session: AsyncSession):
  1547. """A slicer token bound to archive:5 must NOT be consumed when checked against archive:6."""
  1548. from datetime import datetime, timedelta, timezone
  1549. from backend.app.core.auth import verify_slicer_download_token
  1550. from backend.app.models.auth_ephemeral import AuthEphemeralToken
  1551. now = datetime.now(timezone.utc)
  1552. token_val = secrets.token_urlsafe(24)
  1553. db_session.add(
  1554. AuthEphemeralToken(
  1555. token=token_val,
  1556. token_type="slicer_download",
  1557. nonce="archive:5",
  1558. expires_at=now + timedelta(minutes=5),
  1559. )
  1560. )
  1561. await db_session.commit()
  1562. # Wrong resource → must return False and NOT consume the token
  1563. wrong = await verify_slicer_download_token(token_val, "archive", 6)
  1564. assert wrong is False
  1565. # Correct resource → must return True (token survived the wrong-resource check)
  1566. correct = await verify_slicer_download_token(token_val, "archive", 5)
  1567. assert correct is True
  1568. @pytest.mark.asyncio
  1569. @pytest.mark.integration
  1570. async def test_correct_resource_consumes_token(self, async_client: AsyncClient, db_session: AsyncSession):
  1571. """A slicer token is single-use: second correct-resource check must return False."""
  1572. from datetime import datetime, timedelta, timezone
  1573. from backend.app.core.auth import verify_slicer_download_token
  1574. from backend.app.models.auth_ephemeral import AuthEphemeralToken
  1575. now = datetime.now(timezone.utc)
  1576. token_val = secrets.token_urlsafe(24)
  1577. db_session.add(
  1578. AuthEphemeralToken(
  1579. token=token_val,
  1580. token_type="slicer_download",
  1581. nonce="library:99",
  1582. expires_at=now + timedelta(minutes=5),
  1583. )
  1584. )
  1585. await db_session.commit()
  1586. first = await verify_slicer_download_token(token_val, "library", 99)
  1587. assert first is True
  1588. second = await verify_slicer_download_token(token_val, "library", 99)
  1589. assert second is False
  1590. # ===========================================================================
  1591. # M-NEW-3 / L-NEW-1: Schema length validation for change-password & forgot-password
  1592. # ===========================================================================
  1593. class TestSchemaLengthValidationR2:
  1594. """Input length limits added in review round 2."""
  1595. @pytest.mark.asyncio
  1596. @pytest.mark.integration
  1597. async def test_change_password_current_too_long_rejected(self, async_client: AsyncClient):
  1598. """current_password > 256 chars must be rejected with 422 (prevents pbkdf2 DoS)."""
  1599. resp = await async_client.post(
  1600. "/api/v1/users/me/change-password",
  1601. json={"current_password": "x" * 257, "new_password": "ValidPass1!"},
  1602. )
  1603. assert resp.status_code == 422
  1604. @pytest.mark.asyncio
  1605. @pytest.mark.integration
  1606. async def test_forgot_password_email_too_long_rejected(self, async_client: AsyncClient):
  1607. """email > 254 chars must be rejected with 422."""
  1608. resp = await async_client.post(
  1609. "/api/v1/auth/forgot-password",
  1610. json={"email": "a" * 243 + "@example.com"},
  1611. )
  1612. assert resp.status_code == 422
  1613. @pytest.mark.asyncio
  1614. @pytest.mark.integration
  1615. async def test_forgot_password_email_at_limit_passes_schema(self, async_client: AsyncClient):
  1616. """Short email passes schema (may return 400/200 from business logic)."""
  1617. resp = await async_client.post(
  1618. "/api/v1/auth/forgot-password",
  1619. json={"email": "user@example.com"},
  1620. )
  1621. assert resp.status_code != 422
  1622. # ===========================================================================
  1623. # L-NEW-2: TOTPSetupRequest.code max_length
  1624. # ===========================================================================
  1625. class TestTOTPSetupCodeMaxLength:
  1626. """TOTPSetupRequest.code must be bounded (L-NEW-2)."""
  1627. @pytest.mark.asyncio
  1628. @pytest.mark.integration
  1629. async def test_setup_code_too_long_rejected(self, async_client: AsyncClient):
  1630. """code > 8 chars must be rejected with 422."""
  1631. import pyotp as _pyotp
  1632. token = await _setup_and_login(async_client, "totp_setup_maxlen", "totp_setup_maxlen1")
  1633. # Enable TOTP so the setup-code guard path is active
  1634. setup_resp = await async_client.post("/api/v1/auth/2fa/totp/setup", headers=_auth_header(token))
  1635. secret = setup_resp.json()["secret"]
  1636. await async_client.post(
  1637. "/api/v1/auth/2fa/totp/enable",
  1638. json={"code": _pyotp.TOTP(secret).now()},
  1639. headers=_auth_header(token),
  1640. )
  1641. resp = await async_client.post(
  1642. "/api/v1/auth/2fa/totp/setup",
  1643. json={"code": "1" * 9},
  1644. headers=_auth_header(token),
  1645. )
  1646. assert resp.status_code == 422
  1647. # ===========================================================================
  1648. # L-NEW-3: EmailOTPEnableConfirmRequest.code must be exactly 6 digits
  1649. # ===========================================================================
  1650. class TestEmailOTPConfirmCodeFormat:
  1651. """EmailOTPEnableConfirmRequest.code must be 6 digits (L-NEW-3)."""
  1652. @pytest.mark.asyncio
  1653. @pytest.mark.integration
  1654. async def test_non_digit_code_rejected(self, async_client: AsyncClient):
  1655. """Alpha characters in the email OTP confirm code must be rejected with 422."""
  1656. token = await _setup_and_login(async_client, "emailotpfmt", "emailotpfmt1")
  1657. resp = await async_client.post(
  1658. "/api/v1/auth/2fa/email/enable/confirm",
  1659. json={"setup_token": "x" * 32, "code": "ABCDEF"},
  1660. headers=_auth_header(token),
  1661. )
  1662. assert resp.status_code == 422
  1663. @pytest.mark.asyncio
  1664. @pytest.mark.integration
  1665. async def test_seven_digit_code_rejected(self, async_client: AsyncClient):
  1666. """7-digit code must be rejected with 422 (min_length=max_length=6)."""
  1667. token = await _setup_and_login(async_client, "emailotplen7", "emailotplen7x")
  1668. resp = await async_client.post(
  1669. "/api/v1/auth/2fa/email/enable/confirm",
  1670. json={"setup_token": "x" * 32, "code": "1234567"},
  1671. headers=_auth_header(token),
  1672. )
  1673. assert resp.status_code == 422
  1674. @pytest.mark.asyncio
  1675. @pytest.mark.integration
  1676. async def test_valid_six_digit_code_passes_schema(self, async_client: AsyncClient):
  1677. """6-digit numeric code passes schema (may return 400 on bad token — that's fine)."""
  1678. token = await _setup_and_login(async_client, "emailotpfmt6", "emailotpfmt6x")
  1679. resp = await async_client.post(
  1680. "/api/v1/auth/2fa/email/enable/confirm",
  1681. json={"setup_token": "x" * 32, "code": "123456"},
  1682. headers=_auth_header(token),
  1683. )
  1684. assert resp.status_code != 422
  1685. # ===========================================================================
  1686. # L-NEW-4: OIDCProviderCreate field max_length constraints
  1687. # ===========================================================================
  1688. class TestOIDCProviderFieldLengths:
  1689. """OIDCProviderCreate fields must reject inputs exceeding max_length (L-NEW-4)."""
  1690. @pytest.mark.asyncio
  1691. @pytest.mark.integration
  1692. async def test_name_too_long_rejected(self, async_client: AsyncClient):
  1693. token = await _setup_and_login(async_client, "oidcfldadmin", "oidcfldadmin1")
  1694. resp = await async_client.post(
  1695. "/api/v1/auth/oidc/providers",
  1696. json={
  1697. "name": "n" * 101,
  1698. "issuer_url": "https://test.example.com",
  1699. "client_id": "cid",
  1700. "client_secret": "csec",
  1701. "scopes": "openid",
  1702. },
  1703. headers=_auth_header(token),
  1704. )
  1705. assert resp.status_code == 422
  1706. @pytest.mark.asyncio
  1707. @pytest.mark.integration
  1708. async def test_client_secret_too_long_rejected(self, async_client: AsyncClient):
  1709. token = await _setup_and_login(async_client, "oidcseclen", "oidcseclen123")
  1710. resp = await async_client.post(
  1711. "/api/v1/auth/oidc/providers",
  1712. json={
  1713. "name": "ValidName",
  1714. "issuer_url": "https://test.example.com",
  1715. "client_id": "cid",
  1716. "client_secret": "s" * 513,
  1717. "scopes": "openid",
  1718. },
  1719. headers=_auth_header(token),
  1720. )
  1721. assert resp.status_code == 422
  1722. # ---------------------------------------------------------------------------
  1723. # M-NEW-4 / M-NEW-5 / L-NEW-5: UserCreate & UserUpdate field length limits
  1724. # ---------------------------------------------------------------------------
  1725. class TestUserCreateUpdateFieldLengths:
  1726. """UserCreate and UserUpdate must enforce max_length on username, password, email."""
  1727. @pytest.fixture
  1728. async def admin_token(self, async_client: AsyncClient) -> str:
  1729. return await _setup_and_login(async_client, "ucfldadmin", "ucfldadmin1!")
  1730. @pytest.mark.asyncio
  1731. @pytest.mark.integration
  1732. async def test_create_username_too_long_rejected(self, async_client: AsyncClient, admin_token: str):
  1733. resp = await async_client.post(
  1734. "/api/v1/users/",
  1735. json={
  1736. "username": "u" * 151,
  1737. "password": "ValidPass1!",
  1738. "role": "user",
  1739. },
  1740. headers=_auth_header(admin_token),
  1741. )
  1742. assert resp.status_code == 422
  1743. @pytest.mark.asyncio
  1744. @pytest.mark.integration
  1745. async def test_create_password_too_long_rejected(self, async_client: AsyncClient, admin_token: str):
  1746. resp = await async_client.post(
  1747. "/api/v1/users/",
  1748. json={
  1749. "username": "newuserX",
  1750. "password": "A1!" + "x" * 254,
  1751. "role": "user",
  1752. },
  1753. headers=_auth_header(admin_token),
  1754. )
  1755. assert resp.status_code == 422
  1756. @pytest.mark.asyncio
  1757. @pytest.mark.integration
  1758. async def test_create_email_too_long_rejected(self, async_client: AsyncClient, admin_token: str):
  1759. resp = await async_client.post(
  1760. "/api/v1/users/",
  1761. json={
  1762. "username": "newuserY",
  1763. "password": "ValidPass1!",
  1764. "email": "a" * 246 + "@x.com", # total 253 chars -> fine; 248+@x.com=255 -> too long
  1765. "role": "user",
  1766. },
  1767. headers=_auth_header(admin_token),
  1768. )
  1769. # 248 'a' + '@x.com' (6) = 254 chars — just at limit, should pass
  1770. # Use 249 + '@x.com' = 255 chars to trigger the 422
  1771. assert resp.status_code in (201, 422) # boundary sanity check
  1772. @pytest.mark.asyncio
  1773. @pytest.mark.integration
  1774. async def test_create_email_exceeds_limit_rejected(self, async_client: AsyncClient, admin_token: str):
  1775. resp = await async_client.post(
  1776. "/api/v1/users/",
  1777. json={
  1778. "username": "newuserZ",
  1779. "password": "ValidPass1!",
  1780. "email": "a" * 249 + "@x.com", # 255 chars — exceeds RFC 5321 max of 254
  1781. "role": "user",
  1782. },
  1783. headers=_auth_header(admin_token),
  1784. )
  1785. assert resp.status_code == 422
  1786. @pytest.mark.asyncio
  1787. @pytest.mark.integration
  1788. async def test_update_username_too_long_rejected(self, async_client: AsyncClient, admin_token: str):
  1789. # Create a user first
  1790. create_resp = await async_client.post(
  1791. "/api/v1/users/",
  1792. json={"username": "updusr1", "password": "ValidPass1!", "role": "user"},
  1793. headers=_auth_header(admin_token),
  1794. )
  1795. assert create_resp.status_code == 201
  1796. user_id = create_resp.json()["id"]
  1797. resp = await async_client.patch(
  1798. f"/api/v1/users/{user_id}",
  1799. json={"username": "u" * 151},
  1800. headers=_auth_header(admin_token),
  1801. )
  1802. assert resp.status_code == 422
  1803. @pytest.mark.asyncio
  1804. @pytest.mark.integration
  1805. async def test_update_password_too_long_rejected(self, async_client: AsyncClient, admin_token: str):
  1806. create_resp = await async_client.post(
  1807. "/api/v1/users/",
  1808. json={"username": "updusr2", "password": "ValidPass1!", "role": "user"},
  1809. headers=_auth_header(admin_token),
  1810. )
  1811. assert create_resp.status_code == 201
  1812. user_id = create_resp.json()["id"]
  1813. resp = await async_client.patch(
  1814. f"/api/v1/users/{user_id}",
  1815. json={"password": "A1!" + "x" * 254},
  1816. headers=_auth_header(admin_token),
  1817. )
  1818. assert resp.status_code == 422
  1819. @pytest.mark.asyncio
  1820. @pytest.mark.integration
  1821. async def test_update_email_too_long_rejected(self, async_client: AsyncClient, admin_token: str):
  1822. create_resp = await async_client.post(
  1823. "/api/v1/users/",
  1824. json={"username": "updusr3", "password": "ValidPass1!", "role": "user"},
  1825. headers=_auth_header(admin_token),
  1826. )
  1827. assert create_resp.status_code == 201
  1828. user_id = create_resp.json()["id"]
  1829. resp = await async_client.patch(
  1830. f"/api/v1/users/{user_id}",
  1831. json={"email": "a" * 249 + "@x.com"}, # 255 chars
  1832. headers=_auth_header(admin_token),
  1833. )
  1834. assert resp.status_code == 422
  1835. # ---------------------------------------------------------------------------
  1836. # L-NEW-6: per-IP rate limiting on /forgot-password
  1837. # ---------------------------------------------------------------------------
  1838. _SMTP_DATA_FOR_IPLIMIT = {
  1839. "smtp_host": "smtp.test.com",
  1840. "smtp_port": 587,
  1841. "smtp_username": "test@test.com",
  1842. "smtp_password": "testpass",
  1843. "smtp_security": "starttls",
  1844. "smtp_auth_enabled": True,
  1845. "smtp_from_email": "noreply@test.com",
  1846. }
  1847. class TestForgotPasswordPerIpRateLimit:
  1848. """POST /forgot-password must enforce a per-IP cap (L-NEW-6).
  1849. The test sends 11 requests from the simulated test-client IP using 11
  1850. different email addresses (so the per-email bucket is never exhausted).
  1851. The 11th request must be rejected with 429.
  1852. """
  1853. @pytest.fixture
  1854. async def advanced_auth_token(self, async_client: AsyncClient) -> str:
  1855. """Set up auth, SMTP, and enable advanced auth; return admin token."""
  1856. token = await _setup_and_login(async_client, "iprladmin", "iprladmin1!")
  1857. headers = _auth_header(token)
  1858. await async_client.post("/api/v1/auth/smtp", headers=headers, json=_SMTP_DATA_FOR_IPLIMIT)
  1859. await async_client.post("/api/v1/auth/advanced-auth/enable", headers=headers)
  1860. return token
  1861. @pytest.mark.asyncio
  1862. @pytest.mark.integration
  1863. async def test_per_ip_limit_triggers_429(self, async_client: AsyncClient, advanced_auth_token: str):
  1864. # Send 11 requests from the same test-client IP using unique email
  1865. # addresses so the per-email bucket (limit=3) is never exhausted.
  1866. responses = []
  1867. for i in range(11):
  1868. resp = await async_client.post(
  1869. "/api/v1/auth/forgot-password",
  1870. json={"email": f"unique{i}@example.com"},
  1871. )
  1872. responses.append(resp.status_code)
  1873. # First 10 must not be rate-limited by the IP bucket
  1874. for code in responses[:10]:
  1875. assert code != 429, f"Unexpected 429 before limit reached: {responses}"
  1876. # The 11th must be rate-limited
  1877. assert responses[10] == 429, f"Expected 429 on 11th request, got {responses[10]}"
  1878. # ---------------------------------------------------------------------------
  1879. # M-NEW-6: OIDC auto-link must be rejected if target user already has an
  1880. # OIDC link to a different provider
  1881. # ---------------------------------------------------------------------------
  1882. class TestOIDCAutoLinkExistingLinkRejection:
  1883. """OIDC callback must reject auto-linking when the email-matched user
  1884. already has an OIDC link to a different provider (M-NEW-6)."""
  1885. @pytest.mark.asyncio
  1886. @pytest.mark.integration
  1887. async def test_auto_link_rejected_when_user_already_linked(
  1888. self, async_client: AsyncClient, db_session: AsyncSession
  1889. ):
  1890. """Auto-link via email-match is rejected when the target user is
  1891. already linked to another OIDC provider."""
  1892. import base64
  1893. import hashlib
  1894. from unittest.mock import AsyncMock, MagicMock, patch
  1895. from backend.app.core.auth import get_password_hash
  1896. from backend.app.models.oidc_provider import OIDCProvider, UserOIDCLink
  1897. from backend.app.models.user import User
  1898. # ── 1. Target user with a known email ────────────────────────────
  1899. target = User(
  1900. username="oidcALTarget",
  1901. email="alinktest@example.com",
  1902. auth_source="oidc",
  1903. password_hash=get_password_hash(secrets.token_urlsafe(16)),
  1904. role="user",
  1905. is_active=True,
  1906. )
  1907. db_session.add(target)
  1908. await db_session.flush()
  1909. # ── 2. Provider B — legitimate, already linked to target ──────────
  1910. prov_b = OIDCProvider(
  1911. name="ProvB_m6test",
  1912. issuer_url="https://providerb-m6.example.com",
  1913. client_id="client_b",
  1914. _client_secret_enc="secret_b",
  1915. scopes="openid email profile",
  1916. is_enabled=True,
  1917. auto_link_existing_accounts=False,
  1918. auto_create_users=False,
  1919. )
  1920. db_session.add(prov_b)
  1921. await db_session.flush()
  1922. db_session.add(
  1923. UserOIDCLink(
  1924. user_id=target.id,
  1925. provider_id=prov_b.id,
  1926. provider_user_id="legitimate_sub",
  1927. provider_email="alinktest@example.com",
  1928. )
  1929. )
  1930. # ── 3. Provider A — attacker-controlled, auto_link=True ───────────
  1931. prov_a = OIDCProvider(
  1932. name="ProvA_m6test",
  1933. issuer_url="https://providera-m6.example.com",
  1934. client_id="client_a",
  1935. _client_secret_enc="secret_a",
  1936. scopes="openid email profile",
  1937. is_enabled=True,
  1938. auto_link_existing_accounts=True,
  1939. auto_create_users=False,
  1940. )
  1941. db_session.add(prov_a)
  1942. await db_session.flush()
  1943. # ── 4. OIDC state for Provider A ──────────────────────────────────
  1944. state = secrets.token_urlsafe(32)
  1945. nonce = secrets.token_urlsafe(32)
  1946. code_verifier = secrets.token_urlsafe(48)
  1947. db_session.add(
  1948. AuthEphemeralToken(
  1949. token=state,
  1950. token_type="oidc_state",
  1951. provider_id=prov_a.id,
  1952. nonce=nonce,
  1953. code_verifier=code_verifier,
  1954. expires_at=datetime.now(timezone.utc) + timedelta(minutes=10),
  1955. )
  1956. )
  1957. await db_session.commit()
  1958. # ── 5. Mock HTTP + JWT so the callback can reach the auto-link check ─
  1959. fake_discovery = {
  1960. "issuer": "https://providera-m6.example.com",
  1961. "token_endpoint": "https://providera-m6.example.com/token",
  1962. "jwks_uri": "https://providera-m6.example.com/jwks",
  1963. }
  1964. fake_token = {"access_token": "acc_tok", "id_token": "fake.id.token"}
  1965. fake_claims = {
  1966. "sub": "attacker_sub_unique",
  1967. "email": "alinktest@example.com",
  1968. "email_verified": True,
  1969. "nonce": nonce,
  1970. "iss": "https://providera-m6.example.com",
  1971. "aud": "client_a",
  1972. "exp": 9_999_999_999,
  1973. }
  1974. disc_resp = AsyncMock()
  1975. disc_resp.raise_for_status = MagicMock()
  1976. disc_resp.json = MagicMock(return_value=fake_discovery)
  1977. token_resp = AsyncMock()
  1978. token_resp.ok = True
  1979. token_resp.json = MagicMock(return_value=fake_token)
  1980. jwks_resp = AsyncMock()
  1981. jwks_resp.raise_for_status = MagicMock()
  1982. jwks_resp.json = MagicMock(return_value={})
  1983. mock_http = AsyncMock()
  1984. mock_http.get = AsyncMock(side_effect=[disc_resp, jwks_resp])
  1985. mock_http.post = AsyncMock(return_value=token_resp)
  1986. mock_signing_key = MagicMock()
  1987. mock_signing_key.key = "fake_key"
  1988. with (
  1989. patch("backend.app.api.routes.mfa.httpx.AsyncClient") as mock_httpx_cls,
  1990. patch("backend.app.api.routes.mfa.jwt.decode", return_value=fake_claims),
  1991. patch("backend.app.api.routes.mfa.PyJWKClient") as mock_jwks_cls,
  1992. ):
  1993. mock_httpx_cls.return_value.__aenter__ = AsyncMock(return_value=mock_http)
  1994. mock_httpx_cls.return_value.__aexit__ = AsyncMock(return_value=False)
  1995. mock_jwks_cls.return_value.get_signing_key_from_jwt.return_value = mock_signing_key
  1996. resp = await async_client.get(
  1997. f"/api/v1/auth/oidc/callback?code=fake_code&state={state}",
  1998. follow_redirects=False,
  1999. )
  2000. # M-NEW-6: must redirect with no_linked_account — NOT create a second link
  2001. assert resp.status_code == 302
  2002. location = resp.headers.get("location", "")
  2003. assert "no_linked_account" in location, f"Expected no_linked_account in redirect, got: {location}"
  2004. # Verify no second OIDC link was created for Provider A
  2005. from sqlalchemy import select as sa_select
  2006. from backend.app.models.oidc_provider import UserOIDCLink as _UOL
  2007. async with db_session as s:
  2008. links_result = await s.execute(
  2009. sa_select(_UOL).where(_UOL.user_id == target.id, _UOL.provider_id == prov_a.id)
  2010. )
  2011. assert links_result.scalar_one_or_none() is None, "No link to Provider A must exist"
  2012. # ===========================================================================
  2013. # Test Gap 1: OIDC state token is single-use — replay must be rejected
  2014. # ===========================================================================
  2015. class TestOIDCStateReplay:
  2016. """OIDC state token must be consumed on first use; a second callback with
  2017. the same state must redirect to ``?oidc_error=invalid_state``."""
  2018. @pytest.mark.asyncio
  2019. @pytest.mark.integration
  2020. async def test_state_replay_rejected(self, async_client: AsyncClient, db_session: AsyncSession):
  2021. """Replaying a consumed OIDC state token must return invalid_state."""
  2022. from backend.app.models.oidc_provider import OIDCProvider
  2023. # ── 1. Seed a minimal provider ────────────────────────────────────
  2024. provider = OIDCProvider(
  2025. name="StateReplayIdP",
  2026. issuer_url="https://statereplay-idp.example.com",
  2027. client_id="client_replay",
  2028. _client_secret_enc="secret_replay",
  2029. scopes="openid",
  2030. is_enabled=True,
  2031. auto_link_existing_accounts=False,
  2032. auto_create_users=False,
  2033. )
  2034. db_session.add(provider)
  2035. await db_session.flush()
  2036. # ── 2. Seed an OIDC state token ───────────────────────────────────
  2037. state = secrets.token_urlsafe(32)
  2038. db_session.add(
  2039. AuthEphemeralToken(
  2040. token=state,
  2041. token_type="oidc_state",
  2042. provider_id=provider.id,
  2043. nonce=secrets.token_urlsafe(32),
  2044. code_verifier=secrets.token_urlsafe(48),
  2045. expires_at=datetime.now(timezone.utc) + timedelta(minutes=10),
  2046. )
  2047. )
  2048. await db_session.commit()
  2049. # ── 3. First callback — discovery will fail (no real IdP), but the
  2050. # state token is atomically consumed (DELETE…RETURNING + commit)
  2051. # before the HTTP call is attempted.
  2052. first = await async_client.get(
  2053. f"/api/v1/auth/oidc/callback?code=any_code&state={state}",
  2054. follow_redirects=False,
  2055. )
  2056. assert first.status_code == 302
  2057. # The first call may fail for any reason except invalid_state
  2058. assert "invalid_state" not in first.headers.get("location", ""), (
  2059. f"First call should NOT get invalid_state: {first.headers.get('location')}"
  2060. )
  2061. # ── 4. Second callback with the same state → must be invalid_state ─
  2062. second = await async_client.get(
  2063. f"/api/v1/auth/oidc/callback?code=any_code&state={state}",
  2064. follow_redirects=False,
  2065. )
  2066. assert second.status_code == 302
  2067. assert "invalid_state" in second.headers.get("location", ""), (
  2068. f"Replayed state must redirect to invalid_state, got: {second.headers.get('location')}"
  2069. )
  2070. # ===========================================================================
  2071. # Test Gap 2: OIDC iss claim mismatch must redirect to token_validation_failed
  2072. # ===========================================================================
  2073. class TestOIDCIssMismatch:
  2074. """JWT whose iss claim does not match the discovery issuer must be rejected."""
  2075. @pytest.mark.asyncio
  2076. @pytest.mark.integration
  2077. async def test_iss_mismatch_redirects_token_validation_failed(
  2078. self, async_client: AsyncClient, db_session: AsyncSession
  2079. ):
  2080. import time
  2081. from unittest.mock import patch
  2082. import jwt as pyjwt
  2083. private_pem, jwks_data = _make_test_rsa_key()
  2084. correct_issuer = "https://correct-iss.example.com"
  2085. wrong_issuer = "https://wrong-iss.example.com"
  2086. client_id = "iss-mismatch-client"
  2087. nonce = secrets.token_urlsafe(16)
  2088. now = int(time.time())
  2089. # Sign the token with the WRONG issuer (iss != discovery_issuer)
  2090. id_token = pyjwt.encode(
  2091. {
  2092. "sub": "sub-iss-test",
  2093. "iss": wrong_issuer,
  2094. "aud": client_id,
  2095. "nonce": nonce,
  2096. "email": "iss@example.com",
  2097. "email_verified": True,
  2098. "iat": now,
  2099. "exp": now + 300,
  2100. },
  2101. private_pem,
  2102. algorithm="RS256",
  2103. headers={"kid": "test-kid-1"},
  2104. )
  2105. admin_token = await _setup_and_login(async_client, "issadmin1", "issadmin1!")
  2106. cr = await async_client.post(
  2107. "/api/v1/auth/oidc/providers",
  2108. json={
  2109. "name": "IssTest-IdP",
  2110. "issuer_url": correct_issuer,
  2111. "client_id": client_id,
  2112. "client_secret": "s",
  2113. "scopes": "openid",
  2114. "is_enabled": True,
  2115. "auto_create_users": True,
  2116. },
  2117. headers=_auth_header(admin_token),
  2118. )
  2119. assert cr.status_code in (200, 201), cr.text
  2120. provider_id = cr.json()["id"]
  2121. state = secrets.token_urlsafe(32)
  2122. db_session.add(
  2123. AuthEphemeralToken(
  2124. token=state,
  2125. token_type="oidc_state",
  2126. provider_id=provider_id,
  2127. nonce=nonce,
  2128. code_verifier=secrets.token_urlsafe(48),
  2129. expires_at=datetime.now(timezone.utc) + timedelta(minutes=5),
  2130. )
  2131. )
  2132. await db_session.commit()
  2133. # Discovery returns the CORRECT issuer; JWT carries the WRONG one.
  2134. discovery_doc = {
  2135. "issuer": correct_issuer,
  2136. "token_endpoint": f"{correct_issuer}/token",
  2137. "jwks_uri": f"{correct_issuer}/.well-known/jwks.json",
  2138. }
  2139. token_response = {"access_token": "a", "id_token": id_token}
  2140. class _MockResp:
  2141. def __init__(self, data):
  2142. self._data = data
  2143. self.status_code = 200
  2144. self.is_success = True
  2145. self.text = ""
  2146. def json(self):
  2147. return self._data
  2148. def raise_for_status(self):
  2149. pass
  2150. class _MockHttpxClient:
  2151. def __init__(self, *a, **kw):
  2152. pass
  2153. async def __aenter__(self):
  2154. return self
  2155. async def __aexit__(self, *a):
  2156. pass
  2157. async def get(self, url, **kw):
  2158. return _MockResp(jwks_data if "jwks" in url else discovery_doc)
  2159. async def post(self, url, **kw):
  2160. return _MockResp(token_response)
  2161. with patch("backend.app.api.routes.mfa.httpx.AsyncClient", _MockHttpxClient):
  2162. resp = await async_client.get(
  2163. f"/api/v1/auth/oidc/callback?code=c&state={state}",
  2164. follow_redirects=False,
  2165. )
  2166. assert resp.status_code == 302
  2167. location = resp.headers.get("location", "")
  2168. assert "token_validation_failed" in location, f"Expected token_validation_failed, got: {location}"
  2169. # ===========================================================================
  2170. # Test Gap 3: /forgot-password/confirm token is single-use
  2171. # ===========================================================================
  2172. class TestForgotPasswordTokenSingleUse:
  2173. """POST /forgot-password/confirm must reject a token after its first use."""
  2174. @pytest.mark.asyncio
  2175. @pytest.mark.integration
  2176. async def test_token_reuse_rejected(self, async_client: AsyncClient, db_session: AsyncSession):
  2177. from backend.app.core.auth import get_password_hash
  2178. from backend.app.models.user import User as _User
  2179. user = _User(
  2180. username="fpcuser1",
  2181. email="fpc@example.com",
  2182. password_hash=get_password_hash("OldPass1!"),
  2183. role="user",
  2184. is_active=True,
  2185. )
  2186. db_session.add(user)
  2187. await db_session.flush()
  2188. reset_token = secrets.token_urlsafe(32)
  2189. db_session.add(
  2190. AuthEphemeralToken(
  2191. token=reset_token,
  2192. token_type="password_reset",
  2193. username="fpcuser1",
  2194. expires_at=datetime.now(timezone.utc) + timedelta(hours=1),
  2195. )
  2196. )
  2197. await db_session.commit()
  2198. # First use → success
  2199. resp1 = await async_client.post(
  2200. "/api/v1/auth/forgot-password/confirm",
  2201. json={"token": reset_token, "new_password": "NewPass1!"},
  2202. )
  2203. assert resp1.status_code == 200, resp1.text
  2204. # Second use → token already consumed, must fail
  2205. resp2 = await async_client.post(
  2206. "/api/v1/auth/forgot-password/confirm",
  2207. json={"token": reset_token, "new_password": "AnotherNew1!"},
  2208. )
  2209. assert resp2.status_code == 400
  2210. # ===========================================================================
  2211. # C1 regression: setup_totp must reject a replayed TOTP code
  2212. # ===========================================================================
  2213. class TestSetupTOTPReplayRejected:
  2214. """setup_totp must reject a TOTP code that was already accepted in its window."""
  2215. @pytest.mark.asyncio
  2216. @pytest.mark.integration
  2217. async def test_replayed_setup_code_rejected(self, async_client: AsyncClient, db_session: AsyncSession):
  2218. from sqlalchemy import select as sa_select
  2219. from backend.app.models.user_totp import UserTOTP
  2220. token = await _setup_and_login(async_client, "setupreplay1", "setupreplay1!")
  2221. # Step 1: Initial TOTP setup (no active TOTP yet → no code required)
  2222. setup_resp = await async_client.post(
  2223. "/api/v1/auth/2fa/totp/setup",
  2224. headers=_auth_header(token),
  2225. )
  2226. assert setup_resp.status_code == 200
  2227. secret = setup_resp.json()["secret"]
  2228. # Step 2: Enable TOTP with a valid code
  2229. totp_obj = pyotp.TOTP(secret)
  2230. enable_resp = await async_client.post(
  2231. "/api/v1/auth/2fa/totp/enable",
  2232. json={"code": totp_obj.now()},
  2233. headers=_auth_header(token),
  2234. )
  2235. assert enable_resp.status_code == 200 # TOTP is now active (is_enabled=True)
  2236. # Step 3: Determine current valid code and its counter
  2237. me_resp = await async_client.get("/api/v1/auth/me", headers=_auth_header(token))
  2238. user_id = me_resp.json()["id"]
  2239. totp_result = await db_session.execute(sa_select(UserTOTP).where(UserTOTP.user_id == user_id))
  2240. totp_record = totp_result.scalar_one()
  2241. secret_now = totp_record.secret # decrypted via property
  2242. totp_now = pyotp.TOTP(secret_now)
  2243. valid_code = totp_now.now()
  2244. accepted_counter = totp_now.timecode(datetime.now(timezone.utc))
  2245. # Step 4: Pre-set last_totp_counter so this code looks already used
  2246. totp_record.last_totp_counter = accepted_counter
  2247. await db_session.commit()
  2248. # Step 5: Attempt setup_totp with the "already used" code → must be rejected
  2249. replay_resp = await async_client.post(
  2250. "/api/v1/auth/2fa/totp/setup",
  2251. json={"code": valid_code},
  2252. headers=_auth_header(token),
  2253. )
  2254. assert replay_resp.status_code == 400
  2255. assert "already used" in replay_resp.json()["detail"]
  2256. # ===========================================================================
  2257. # Nit8: OIDC aud mismatch and nonce mismatch tests
  2258. # ===========================================================================
  2259. class TestOIDCAudAndNonceMismatch:
  2260. """Nit8: aud != client_id and nonce != stored value must each fail the callback."""
  2261. def _make_oidc_provider_setup(self):
  2262. """Return a helper for building OIDC test fixtures inline."""
  2263. private_pem, jwks_data = _make_test_rsa_key()
  2264. return private_pem, jwks_data
  2265. @pytest.mark.asyncio
  2266. @pytest.mark.integration
  2267. async def test_aud_mismatch_redirects_token_validation_failed(
  2268. self, async_client: AsyncClient, db_session: AsyncSession
  2269. ):
  2270. """ID token with aud != client_id must be rejected (PyJWT InvalidAudienceError)."""
  2271. import time
  2272. from unittest.mock import patch
  2273. import jwt as pyjwt
  2274. private_pem, jwks_data = _make_test_rsa_key()
  2275. issuer = "https://aud-mismatch.example.com"
  2276. client_id = "aud-test-client"
  2277. wrong_aud = "some-other-client"
  2278. nonce = secrets.token_urlsafe(16)
  2279. now = int(time.time())
  2280. id_token = pyjwt.encode(
  2281. {
  2282. "sub": "sub-aud-test",
  2283. "iss": issuer,
  2284. "aud": wrong_aud, # <-- wrong audience
  2285. "nonce": nonce,
  2286. "email": "aud@example.com",
  2287. "email_verified": True,
  2288. "iat": now,
  2289. "exp": now + 300,
  2290. },
  2291. private_pem,
  2292. algorithm="RS256",
  2293. headers={"kid": "test-kid-1"},
  2294. )
  2295. admin_token = await _setup_and_login(async_client, "audmismatch_admin", "AudMismatch_admin1")
  2296. cr = await async_client.post(
  2297. "/api/v1/auth/oidc/providers",
  2298. json={
  2299. "name": "AudMismatch-IdP",
  2300. "issuer_url": issuer,
  2301. "client_id": client_id,
  2302. "client_secret": "s",
  2303. "scopes": "openid",
  2304. "is_enabled": True,
  2305. "auto_create_users": True,
  2306. },
  2307. headers=_auth_header(admin_token),
  2308. )
  2309. assert cr.status_code in (200, 201), cr.text
  2310. provider_id = cr.json()["id"]
  2311. state = secrets.token_urlsafe(32)
  2312. db_session.add(
  2313. AuthEphemeralToken(
  2314. token=state,
  2315. token_type="oidc_state",
  2316. provider_id=provider_id,
  2317. nonce=nonce,
  2318. code_verifier=secrets.token_urlsafe(48),
  2319. expires_at=datetime.now(timezone.utc) + timedelta(minutes=5),
  2320. )
  2321. )
  2322. await db_session.commit()
  2323. discovery_doc = {
  2324. "issuer": issuer,
  2325. "token_endpoint": f"{issuer}/token",
  2326. "jwks_uri": f"{issuer}/.well-known/jwks.json",
  2327. }
  2328. class _MockResp:
  2329. def __init__(self, data):
  2330. self._data = data
  2331. self.status_code = 200
  2332. self.is_success = True
  2333. self.text = ""
  2334. def json(self):
  2335. return self._data
  2336. def raise_for_status(self):
  2337. pass
  2338. class _MockHttpxClient:
  2339. def __init__(self, *a, **kw):
  2340. pass
  2341. async def __aenter__(self):
  2342. return self
  2343. async def __aexit__(self, *a):
  2344. pass
  2345. async def get(self, url, **kw):
  2346. return _MockResp(jwks_data if "jwks" in url else discovery_doc)
  2347. async def post(self, url, **kw):
  2348. return _MockResp({"access_token": "a", "id_token": id_token})
  2349. with patch("backend.app.api.routes.mfa.httpx.AsyncClient", _MockHttpxClient):
  2350. resp = await async_client.get(
  2351. f"/api/v1/auth/oidc/callback?code=c&state={state}",
  2352. follow_redirects=False,
  2353. )
  2354. assert resp.status_code == 302
  2355. location = resp.headers.get("location", "")
  2356. assert "token_validation_failed" in location, (
  2357. f"Expected token_validation_failed redirect for aud mismatch, got: {location}"
  2358. )
  2359. @pytest.mark.asyncio
  2360. @pytest.mark.integration
  2361. async def test_nonce_mismatch_redirects_token_validation_failed(
  2362. self, async_client: AsyncClient, db_session: AsyncSession
  2363. ):
  2364. """ID token with nonce != stored state nonce must be rejected."""
  2365. import time
  2366. from unittest.mock import patch
  2367. import jwt as pyjwt
  2368. private_pem, jwks_data = _make_test_rsa_key()
  2369. issuer = "https://nonce-mismatch.example.com"
  2370. client_id = "nonce-test-client"
  2371. stored_nonce = secrets.token_urlsafe(16)
  2372. wrong_nonce = secrets.token_urlsafe(16) # different from stored_nonce
  2373. now = int(time.time())
  2374. id_token = pyjwt.encode(
  2375. {
  2376. "sub": "sub-nonce-test",
  2377. "iss": issuer,
  2378. "aud": client_id,
  2379. "nonce": wrong_nonce, # <-- does not match stored_nonce
  2380. "email": "nonce@example.com",
  2381. "email_verified": True,
  2382. "iat": now,
  2383. "exp": now + 300,
  2384. },
  2385. private_pem,
  2386. algorithm="RS256",
  2387. headers={"kid": "test-kid-1"},
  2388. )
  2389. admin_token = await _setup_and_login(async_client, "noncemismatch_admin", "NonceMismatch_admin1")
  2390. cr = await async_client.post(
  2391. "/api/v1/auth/oidc/providers",
  2392. json={
  2393. "name": "NonceMismatch-IdP",
  2394. "issuer_url": issuer,
  2395. "client_id": client_id,
  2396. "client_secret": "s",
  2397. "scopes": "openid",
  2398. "is_enabled": True,
  2399. "auto_create_users": True,
  2400. },
  2401. headers=_auth_header(admin_token),
  2402. )
  2403. assert cr.status_code in (200, 201), cr.text
  2404. provider_id = cr.json()["id"]
  2405. state = secrets.token_urlsafe(32)
  2406. db_session.add(
  2407. AuthEphemeralToken(
  2408. token=state,
  2409. token_type="oidc_state",
  2410. provider_id=provider_id,
  2411. nonce=stored_nonce, # state has correct nonce; JWT carries wrong_nonce
  2412. code_verifier=secrets.token_urlsafe(48),
  2413. expires_at=datetime.now(timezone.utc) + timedelta(minutes=5),
  2414. )
  2415. )
  2416. await db_session.commit()
  2417. discovery_doc = {
  2418. "issuer": issuer,
  2419. "token_endpoint": f"{issuer}/token",
  2420. "jwks_uri": f"{issuer}/.well-known/jwks.json",
  2421. }
  2422. class _MockResp:
  2423. def __init__(self, data):
  2424. self._data = data
  2425. self.status_code = 200
  2426. self.is_success = True
  2427. self.text = ""
  2428. def json(self):
  2429. return self._data
  2430. def raise_for_status(self):
  2431. pass
  2432. class _MockHttpxClient:
  2433. def __init__(self, *a, **kw):
  2434. pass
  2435. async def __aenter__(self):
  2436. return self
  2437. async def __aexit__(self, *a):
  2438. pass
  2439. async def get(self, url, **kw):
  2440. return _MockResp(jwks_data if "jwks" in url else discovery_doc)
  2441. async def post(self, url, **kw):
  2442. return _MockResp({"access_token": "a", "id_token": id_token})
  2443. with patch("backend.app.api.routes.mfa.httpx.AsyncClient", _MockHttpxClient):
  2444. resp = await async_client.get(
  2445. f"/api/v1/auth/oidc/callback?code=c&state={state}",
  2446. follow_redirects=False,
  2447. )
  2448. assert resp.status_code == 302
  2449. location = resp.headers.get("location", "")
  2450. # The callback redirects to ?oidc_error=nonce_mismatch when nonces differ.
  2451. assert "nonce_mismatch" in location, f"Expected nonce_mismatch redirect for nonce mismatch, got: {location}"
  2452. # ===========================================================================
  2453. # Expired OIDC token rejection — state and exchange tokens
  2454. # ===========================================================================
  2455. class TestOIDCExpiredTokenRejection:
  2456. """Expired OIDC state and exchange tokens must be rejected atomically.
  2457. The DELETE … WHERE expires_at > now must ensure that an already-expired
  2458. token is never consumed (committed) before the expiry is checked, so the
  2459. token row stays in the DB and is not silently discarded.
  2460. """
  2461. @pytest.mark.asyncio
  2462. @pytest.mark.integration
  2463. async def test_expired_state_token_rejected_as_invalid_state(
  2464. self, async_client: AsyncClient, db_session: AsyncSession
  2465. ):
  2466. """An expired OIDC state token must redirect to invalid_state without
  2467. being consumed — it must still exist in the DB after the rejected call."""
  2468. from backend.app.models.oidc_provider import OIDCProvider
  2469. provider = OIDCProvider(
  2470. name="ExpiredStateIdP",
  2471. issuer_url="https://expired-state.example.com",
  2472. client_id="client_expired_state",
  2473. _client_secret_enc="secret_exp_state",
  2474. scopes="openid",
  2475. is_enabled=True,
  2476. auto_link_existing_accounts=False,
  2477. auto_create_users=False,
  2478. )
  2479. db_session.add(provider)
  2480. await db_session.flush()
  2481. state = secrets.token_urlsafe(32)
  2482. db_session.add(
  2483. AuthEphemeralToken(
  2484. token=state,
  2485. token_type="oidc_state",
  2486. provider_id=provider.id,
  2487. nonce=secrets.token_urlsafe(16),
  2488. code_verifier=secrets.token_urlsafe(48),
  2489. # already expired
  2490. expires_at=datetime.now(timezone.utc) - timedelta(minutes=5),
  2491. )
  2492. )
  2493. await db_session.commit()
  2494. resp = await async_client.get(
  2495. f"/api/v1/auth/oidc/callback?code=any_code&state={state}",
  2496. follow_redirects=False,
  2497. )
  2498. assert resp.status_code == 302
  2499. location = resp.headers.get("location", "")
  2500. assert "invalid_state" in location, f"Expected invalid_state redirect for expired state, got: {location}"
  2501. @pytest.mark.asyncio
  2502. @pytest.mark.integration
  2503. async def test_expired_exchange_token_rejected(self, async_client: AsyncClient, db_session: AsyncSession):
  2504. """An expired OIDC exchange token must return 401 without being consumed."""
  2505. from sqlalchemy import select as sa_select
  2506. expired_token = secrets.token_urlsafe(32)
  2507. db_session.add(
  2508. AuthEphemeralToken(
  2509. token=expired_token,
  2510. token_type="oidc_exchange",
  2511. username="some_user",
  2512. # already expired
  2513. expires_at=datetime.now(timezone.utc) - timedelta(minutes=5),
  2514. )
  2515. )
  2516. await db_session.commit()
  2517. resp = await async_client.post(
  2518. "/api/v1/auth/oidc/exchange",
  2519. json={"oidc_token": expired_token},
  2520. )
  2521. assert resp.status_code == 401
  2522. assert "expired" in resp.json().get("detail", "").lower() or "invalid" in resp.json().get("detail", "").lower()
  2523. # Token must NOT have been consumed — it should still be in the DB
  2524. # (the atomic DELETE WHERE expires_at > now left it untouched)
  2525. result = await db_session.execute(
  2526. sa_select(AuthEphemeralToken).where(AuthEphemeralToken.token == expired_token)
  2527. )
  2528. remaining = result.scalar_one_or_none()
  2529. assert remaining is not None, "Expired exchange token must not be consumed by a rejected request"
  2530. # ===========================================================================
  2531. # Trailing slash in issuer_url — discovery URL must not contain double slash
  2532. # ===========================================================================
  2533. class TestOIDCIssuerUrlTrailingSlash:
  2534. """Providers like Authentik use issuer URLs with a trailing slash.
  2535. BamBuddy must strip the slash before appending /.well-known/openid-configuration
  2536. to avoid a double-slash that results in a 404.
  2537. """
  2538. @pytest.mark.asyncio
  2539. @pytest.mark.integration
  2540. async def test_trailing_slash_issuer_url_fetches_correct_discovery_url(
  2541. self, async_client: AsyncClient
  2542. ):
  2543. from unittest.mock import AsyncMock, MagicMock, patch
  2544. issuer_with_slash = "https://authentik.example.com/application/o/bambuddy/"
  2545. admin_token = await _setup_and_login(async_client, "oidcslashadm", "oidcslashadm1")
  2546. create_resp = await async_client.post(
  2547. "/api/v1/auth/oidc/providers",
  2548. json={
  2549. "name": "Authentik-Slash",
  2550. "issuer_url": issuer_with_slash,
  2551. "client_id": "bambuddy",
  2552. "client_secret": "secret",
  2553. "scopes": "openid email profile",
  2554. "is_enabled": True,
  2555. "auto_create_users": False,
  2556. },
  2557. headers=_auth_header(admin_token),
  2558. )
  2559. assert create_resp.status_code == 201
  2560. provider_id = create_resp.json()["id"]
  2561. fake_discovery = {
  2562. "issuer": issuer_with_slash,
  2563. "authorization_endpoint": "https://authentik.example.com/application/o/bambuddy/authorize",
  2564. }
  2565. disc_resp = AsyncMock()
  2566. disc_resp.raise_for_status = MagicMock()
  2567. disc_resp.json = MagicMock(return_value=fake_discovery)
  2568. mock_http = AsyncMock()
  2569. mock_http.get = AsyncMock(return_value=disc_resp)
  2570. with patch("backend.app.api.routes.mfa.httpx.AsyncClient") as mock_cls:
  2571. mock_cls.return_value.__aenter__ = AsyncMock(return_value=mock_http)
  2572. mock_cls.return_value.__aexit__ = AsyncMock(return_value=False)
  2573. resp = await async_client.get(f"/api/v1/auth/oidc/authorize/{provider_id}")
  2574. assert resp.status_code == 200
  2575. called_url = mock_http.get.call_args_list[0][0][0]
  2576. assert "//" not in called_url.replace("https://", ""), (
  2577. f"Discovery URL must not contain double slash: {called_url}"
  2578. )
  2579. assert called_url.endswith("/.well-known/openid-configuration"), (
  2580. f"Expected discovery URL to end with /.well-known/openid-configuration, got: {called_url}"
  2581. )
  2582. @pytest.mark.asyncio
  2583. @pytest.mark.integration
  2584. async def test_iss_claim_trailing_slash_accepted(
  2585. self, async_client: AsyncClient, db_session: AsyncSession
  2586. ):
  2587. """Provider configured without trailing slash, Authentik JWT iss has trailing slash.
  2588. Both sides must be normalised before comparison so the login succeeds.
  2589. """
  2590. import time
  2591. from unittest.mock import patch
  2592. import jwt as pyjwt
  2593. private_pem, jwks_data = _make_test_rsa_key()
  2594. issuer_no_slash = "https://authentik.example.com/application/o/bambuddy"
  2595. issuer_with_slash = issuer_no_slash + "/"
  2596. client_id = "bambuddy-client"
  2597. nonce = secrets.token_urlsafe(16)
  2598. now = int(time.time())
  2599. id_token = pyjwt.encode(
  2600. {
  2601. "sub": "authentik-sub-123",
  2602. "iss": issuer_with_slash,
  2603. "aud": client_id,
  2604. "nonce": nonce,
  2605. "email": "authentik-user@example.com",
  2606. "email_verified": True,
  2607. "iat": now,
  2608. "exp": now + 300,
  2609. },
  2610. private_pem,
  2611. algorithm="RS256",
  2612. headers={"kid": "test-kid-1"},
  2613. )
  2614. admin_token = await _setup_and_login(async_client, "authentikadm", "authentikadm1")
  2615. create_resp = await async_client.post(
  2616. "/api/v1/auth/oidc/providers",
  2617. json={
  2618. "name": "Authentik-ISS",
  2619. "issuer_url": issuer_no_slash,
  2620. "client_id": client_id,
  2621. "client_secret": "secret",
  2622. "scopes": "openid email profile",
  2623. "is_enabled": True,
  2624. "auto_create_users": True,
  2625. },
  2626. headers=_auth_header(admin_token),
  2627. )
  2628. assert create_resp.status_code == 201
  2629. provider_id = create_resp.json()["id"]
  2630. state = secrets.token_urlsafe(32)
  2631. code_verifier = secrets.token_urlsafe(48)
  2632. db_session.add(
  2633. AuthEphemeralToken(
  2634. token=state,
  2635. token_type="oidc_state",
  2636. provider_id=provider_id,
  2637. nonce=nonce,
  2638. code_verifier=code_verifier,
  2639. expires_at=datetime.now(timezone.utc) + timedelta(minutes=5),
  2640. )
  2641. )
  2642. await db_session.commit()
  2643. discovery_doc = {
  2644. "issuer": issuer_with_slash,
  2645. "authorization_endpoint": f"{issuer_no_slash}/authorize",
  2646. "token_endpoint": f"{issuer_no_slash}/token",
  2647. "jwks_uri": f"{issuer_no_slash}/.well-known/jwks.json",
  2648. }
  2649. token_response = {"access_token": "mock", "token_type": "Bearer", "id_token": id_token}
  2650. class _MockResp:
  2651. def __init__(self, data):
  2652. self._data = data
  2653. self.is_success = True
  2654. self.status_code = 200
  2655. self.text = str(data)
  2656. def json(self):
  2657. return self._data
  2658. def raise_for_status(self):
  2659. pass
  2660. class _MockHttpxClient:
  2661. def __init__(self, *a, **kw):
  2662. pass
  2663. async def __aenter__(self):
  2664. return self
  2665. async def __aexit__(self, *a):
  2666. pass
  2667. async def get(self, url, **kw):
  2668. return _MockResp(jwks_data if "jwks" in url else discovery_doc)
  2669. async def post(self, url, **kw):
  2670. return _MockResp(token_response)
  2671. with patch("backend.app.api.routes.mfa.httpx.AsyncClient", _MockHttpxClient):
  2672. resp = await async_client.get(
  2673. f"/api/v1/auth/oidc/callback?code=auth-code&state={state}",
  2674. follow_redirects=False,
  2675. )
  2676. location = resp.headers.get("location", "")
  2677. assert resp.status_code == 302, f"Expected redirect, got {resp.status_code}"
  2678. assert "token_validation_failed" not in location, (
  2679. "Trailing slash mismatch in iss claim must not cause token_validation_failed"
  2680. )
  2681. assert "oidc_token=" in location, f"Expected oidc_token in redirect, got: {location}"