users.py 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205
  1. from fastapi import APIRouter, Depends, HTTPException, status
  2. from sqlalchemy import select
  3. from sqlalchemy.ext.asyncio import AsyncSession
  4. from backend.app.core.auth import RequireAdmin, get_current_active_user, get_password_hash
  5. from backend.app.core.database import get_db
  6. from backend.app.models.user import User
  7. from backend.app.schemas.auth import UserCreate, UserResponse, UserUpdate
  8. router = APIRouter(prefix="/users", tags=["users"])
  9. @router.get("", response_model=list[UserResponse])
  10. @router.get("/", response_model=list[UserResponse])
  11. async def list_users(
  12. current_user: User = RequireAdmin(),
  13. db: AsyncSession = Depends(get_db),
  14. ):
  15. """List all users (admin only)."""
  16. result = await db.execute(select(User).order_by(User.created_at))
  17. users = result.scalars().all()
  18. return [
  19. UserResponse(
  20. id=user.id,
  21. username=user.username,
  22. role=user.role,
  23. is_active=user.is_active,
  24. created_at=user.created_at.isoformat(),
  25. )
  26. for user in users
  27. ]
  28. @router.post("", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
  29. @router.post("/", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
  30. async def create_user(
  31. user_data: UserCreate,
  32. current_user: User = RequireAdmin(),
  33. db: AsyncSession = Depends(get_db),
  34. ):
  35. """Create a new user (admin only)."""
  36. # Check if username already exists
  37. existing_user = await db.execute(select(User).where(User.username == user_data.username))
  38. if existing_user.scalar_one_or_none():
  39. raise HTTPException(
  40. status_code=status.HTTP_400_BAD_REQUEST,
  41. detail="Username already exists",
  42. )
  43. # Validate role
  44. if user_data.role not in ["admin", "user"]:
  45. raise HTTPException(
  46. status_code=status.HTTP_400_BAD_REQUEST,
  47. detail="Role must be 'admin' or 'user'",
  48. )
  49. new_user = User(
  50. username=user_data.username,
  51. password_hash=get_password_hash(user_data.password),
  52. role=user_data.role,
  53. is_active=True,
  54. )
  55. db.add(new_user)
  56. await db.commit()
  57. await db.refresh(new_user)
  58. return UserResponse(
  59. id=new_user.id,
  60. username=new_user.username,
  61. role=new_user.role,
  62. is_active=new_user.is_active,
  63. created_at=new_user.created_at.isoformat(),
  64. )
  65. @router.get("/{user_id}", response_model=UserResponse)
  66. async def get_user(
  67. user_id: int,
  68. current_user: User = RequireAdmin(),
  69. db: AsyncSession = Depends(get_db),
  70. ):
  71. """Get a user by ID (admin only)."""
  72. result = await db.execute(select(User).where(User.id == user_id))
  73. user = result.scalar_one_or_none()
  74. if not user:
  75. raise HTTPException(
  76. status_code=status.HTTP_404_NOT_FOUND,
  77. detail="User not found",
  78. )
  79. return UserResponse(
  80. id=user.id,
  81. username=user.username,
  82. role=user.role,
  83. is_active=user.is_active,
  84. created_at=user.created_at.isoformat(),
  85. )
  86. @router.patch("/{user_id}", response_model=UserResponse)
  87. async def update_user(
  88. user_id: int,
  89. user_data: UserUpdate,
  90. current_user: User = RequireAdmin(),
  91. db: AsyncSession = Depends(get_db),
  92. ):
  93. """Update a user (admin only)."""
  94. result = await db.execute(select(User).where(User.id == user_id))
  95. user = result.scalar_one_or_none()
  96. if not user:
  97. raise HTTPException(
  98. status_code=status.HTTP_404_NOT_FOUND,
  99. detail="User not found",
  100. )
  101. # Prevent deactivating the last admin
  102. if user_data.is_active is False and user.role == "admin":
  103. admin_count_result = await db.execute(select(User).where(User.role == "admin", User.is_active.is_(True)))
  104. admin_count = len(admin_count_result.scalars().all())
  105. if admin_count <= 1:
  106. raise HTTPException(
  107. status_code=status.HTTP_400_BAD_REQUEST,
  108. detail="Cannot deactivate the last admin user",
  109. )
  110. # Prevent changing role of last admin
  111. if user_data.role and user_data.role != "admin" and user.role == "admin":
  112. admin_count_result = await db.execute(select(User).where(User.role == "admin", User.is_active.is_(True)))
  113. admin_count = len(admin_count_result.scalars().all())
  114. if admin_count <= 1:
  115. raise HTTPException(
  116. status_code=status.HTTP_400_BAD_REQUEST,
  117. detail="Cannot change role of the last admin user",
  118. )
  119. if user_data.username is not None:
  120. # Check if new username already exists
  121. existing_user = await db.execute(select(User).where(User.username == user_data.username, User.id != user_id))
  122. if existing_user.scalar_one_or_none():
  123. raise HTTPException(
  124. status_code=status.HTTP_400_BAD_REQUEST,
  125. detail="Username already exists",
  126. )
  127. user.username = user_data.username
  128. if user_data.password is not None:
  129. user.password_hash = get_password_hash(user_data.password)
  130. if user_data.role is not None:
  131. if user_data.role not in ["admin", "user"]:
  132. raise HTTPException(
  133. status_code=status.HTTP_400_BAD_REQUEST,
  134. detail="Role must be 'admin' or 'user'",
  135. )
  136. user.role = user_data.role
  137. if user_data.is_active is not None:
  138. user.is_active = user_data.is_active
  139. await db.commit()
  140. await db.refresh(user)
  141. return UserResponse(
  142. id=user.id,
  143. username=user.username,
  144. role=user.role,
  145. is_active=user.is_active,
  146. created_at=user.created_at.isoformat(),
  147. )
  148. @router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
  149. async def delete_user(
  150. user_id: int,
  151. current_user: User = RequireAdmin(),
  152. db: AsyncSession = Depends(get_db),
  153. ):
  154. """Delete a user (admin only)."""
  155. result = await db.execute(select(User).where(User.id == user_id))
  156. user = result.scalar_one_or_none()
  157. if not user:
  158. raise HTTPException(
  159. status_code=status.HTTP_404_NOT_FOUND,
  160. detail="User not found",
  161. )
  162. # Prevent deleting the last admin
  163. if user.role == "admin":
  164. admin_count_result = await db.execute(select(User).where(User.role == "admin", User.id != user_id))
  165. admin_count = len(admin_count_result.scalars().all())
  166. if admin_count == 0:
  167. raise HTTPException(
  168. status_code=status.HTTP_400_BAD_REQUEST,
  169. detail="Cannot delete the last admin user",
  170. )
  171. # Prevent deleting yourself
  172. if user.id == current_user.id:
  173. raise HTTPException(
  174. status_code=status.HTTP_400_BAD_REQUEST,
  175. detail="Cannot delete your own account",
  176. )
  177. await db.delete(user)
  178. await db.commit()