auth.py 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. import hashlib
  2. import secrets
  3. from datetime import datetime
  4. from typing import Optional
  5. from fastapi import Header, HTTPException, Depends
  6. from sqlalchemy.ext.asyncio import AsyncSession
  7. from sqlalchemy import select
  8. from backend.app.core.database import get_db
  9. from backend.app.models.api_key import APIKey
  10. def generate_api_key() -> tuple[str, str, str]:
  11. """Generate a new API key.
  12. Returns:
  13. Tuple of (full_key, key_hash, key_prefix)
  14. """
  15. # Generate a random 32-byte key and encode as hex (64 chars)
  16. full_key = f"bb_{secrets.token_hex(32)}"
  17. key_hash = hashlib.sha256(full_key.encode()).hexdigest()
  18. key_prefix = full_key[:11] # "bb_" + first 8 chars of token
  19. return full_key, key_hash, key_prefix
  20. def hash_api_key(key: str) -> str:
  21. """Hash an API key for comparison."""
  22. return hashlib.sha256(key.encode()).hexdigest()
  23. async def get_api_key(
  24. x_api_key: str = Header(..., alias="X-API-Key"),
  25. db: AsyncSession = Depends(get_db),
  26. ) -> APIKey:
  27. """Verify API key and return the key record.
  28. Raises HTTPException if key is invalid, disabled, or expired.
  29. """
  30. key_hash = hash_api_key(x_api_key)
  31. result = await db.execute(
  32. select(APIKey).where(APIKey.key_hash == key_hash)
  33. )
  34. api_key = result.scalar_one_or_none()
  35. if not api_key:
  36. raise HTTPException(status_code=401, detail="Invalid API key")
  37. if not api_key.enabled:
  38. raise HTTPException(status_code=403, detail="API key is disabled")
  39. if api_key.expires_at and api_key.expires_at < datetime.utcnow():
  40. raise HTTPException(status_code=403, detail="API key has expired")
  41. # Update last_used timestamp
  42. api_key.last_used = datetime.utcnow()
  43. return api_key
  44. async def get_optional_api_key(
  45. x_api_key: Optional[str] = Header(None, alias="X-API-Key"),
  46. db: AsyncSession = Depends(get_db),
  47. ) -> Optional[APIKey]:
  48. """Get API key if provided, return None otherwise."""
  49. if not x_api_key:
  50. return None
  51. try:
  52. return await get_api_key(x_api_key, db)
  53. except HTTPException:
  54. return None
  55. def check_permission(api_key: APIKey, permission: str) -> None:
  56. """Check if API key has a specific permission.
  57. Args:
  58. api_key: The API key record
  59. permission: One of 'queue', 'control_printer', 'read_status'
  60. Raises HTTPException if permission is denied.
  61. """
  62. permission_map = {
  63. 'queue': api_key.can_queue,
  64. 'control_printer': api_key.can_control_printer,
  65. 'read_status': api_key.can_read_status,
  66. }
  67. if permission not in permission_map:
  68. raise HTTPException(status_code=500, detail=f"Unknown permission: {permission}")
  69. if not permission_map[permission]:
  70. raise HTTPException(
  71. status_code=403,
  72. detail=f"API key does not have '{permission}' permission"
  73. )
  74. def check_printer_access(api_key: APIKey, printer_id: int) -> None:
  75. """Check if API key has access to a specific printer.
  76. Args:
  77. api_key: The API key record
  78. printer_id: The printer ID to check
  79. Raises HTTPException if access is denied.
  80. """
  81. if api_key.printer_ids is not None and printer_id not in api_key.printer_ids:
  82. raise HTTPException(
  83. status_code=403,
  84. detail=f"API key does not have access to printer {printer_id}"
  85. )