16. FastAPI: аутентификация

JWT аутентификация
Заголовок раздела «JWT аутентификация»JSON Web Tokens (JWT) — стандарт аутентификации для REST API.
pip install fastapi uvicorn python-jose[cryptography] passlib[bcrypt] python-multipartСхема работы
Заголовок раздела «Схема работы»1. POST /auth/login {username, password} ↓2. Сервер проверяет пароль ↓3. Создаёт JWT токен (access + refresh) ↓4. Клиент сохраняет токен ↓5. Каждый запрос: Authorization: Bearer <token> ↓6. Сервер проверяет токен → возвращает данныеauth/security.py
Заголовок раздела «auth/security.py»from datetime import datetime, timedelta, timezonefrom jose import jwt, JWTErrorfrom passlib.context import CryptContext
SECRET_KEY = "your-secret-key-keep-it-secret"ALGORITHM = "HS256"ACCESS_TOKEN_EXPIRE_MINUTES = 30REFRESH_TOKEN_EXPIRE_DAYS = 7
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def hash_password(password: str) -> str: return pwd_context.hash(password)
def verify_password(plain: str, hashed: str) -> bool: return pwd_context.verify(plain, hashed)
def create_access_token(data: dict) -> str: expire = datetime.now(timezone.utc) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) return jwt.encode({**data, "exp": expire}, SECRET_KEY, algorithm=ALGORITHM)
def create_refresh_token(data: dict) -> str: expire = datetime.now(timezone.utc) + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS) return jwt.encode({**data, "exp": expire, "type": "refresh"}, SECRET_KEY, algorithm=ALGORITHM)
def decode_token(token: str) -> dict: try: return jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) except JWTError: raise ValueError("Неверный или просроченный токен")auth/dependencies.py
Заголовок раздела «auth/dependencies.py»from fastapi import Depends, HTTPException, statusfrom fastapi.security import OAuth2PasswordBearerfrom auth.security import decode_tokenfrom database import get_db
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")
async def get_current_user( token: str = Depends(oauth2_scheme), db = Depends(get_db)): credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Не авторизован", headers={"WWW-Authenticate": "Bearer"} ) try: payload = decode_token(token) user_id: int = payload.get("sub") if user_id is None: raise credentials_exception except ValueError: raise credentials_exception
user = await db.get(User, int(user_id)) if user is None: raise credentials_exception return user
async def get_admin_user(current_user = Depends(get_current_user)): if current_user.role != "admin": raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Требуются права администратора" ) return current_userrouters/auth.py
Заголовок раздела «routers/auth.py»from fastapi import APIRouter, Depends, HTTPException, statusfrom fastapi.security import OAuth2PasswordRequestFormfrom pydantic import BaseModel, EmailStrfrom sqlalchemy.ext.asyncio import AsyncSessionfrom sqlalchemy import selectfrom database import get_dbfrom models import Userfrom auth.security import hash_password, verify_password, create_access_token, create_refresh_token
router = APIRouter(prefix="/auth", tags=["auth"])
class UserRegister(BaseModel): name: str email: EmailStr password: str
class Token(BaseModel): access_token: str refresh_token: str token_type: str = "bearer"
@router.post("/register", status_code=status.HTTP_201_CREATED)async def register(data: UserRegister, db: AsyncSession = Depends(get_db)): # Проверяем уникальность email existing = await db.execute(select(User).where(User.email == data.email)) if existing.scalar(): raise HTTPException(status_code=400, detail="Email уже используется")
user = User( name=data.name, email=data.email, hashed_password=hash_password(data.password) ) db.add(user) await db.flush() return {"message": "Регистрация успешна", "id": user.id}
@router.post("/login", response_model=Token)async def login( form: OAuth2PasswordRequestForm = Depends(), db: AsyncSession = Depends(get_db)): # form.username, form.password result = await db.execute(select(User).where(User.email == form.username)) user = result.scalar()
if not user or not verify_password(form.password, user.hashed_password): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Неверный email или пароль" )
token_data = {"sub": str(user.id), "role": user.role} return Token( access_token=create_access_token(token_data), refresh_token=create_refresh_token(token_data) )
@router.post("/refresh", response_model=Token)async def refresh(refresh_token: str, db: AsyncSession = Depends(get_db)): from auth.security import decode_token try: payload = decode_token(refresh_token) if payload.get("type") != "refresh": raise ValueError("Неверный тип токена") except ValueError: raise HTTPException(status_code=401, detail="Неверный refresh token")
token_data = {"sub": payload["sub"], "role": payload["role"]} return Token( access_token=create_access_token(token_data), refresh_token=create_refresh_token(token_data) )Защищённые маршруты
Заголовок раздела «Защищённые маршруты»from fastapi import APIRouter, Dependsfrom auth.dependencies import get_current_user, get_admin_user
router = APIRouter()
@router.get("/me")async def get_profile(current_user = Depends(get_current_user)): return { "id": current_user.id, "name": current_user.name, "email": current_user.email, "role": current_user.role }
@router.get("/admin/users")async def get_all_users(admin = Depends(get_admin_user), db = Depends(get_db)): # Только для администраторов result = await db.execute(select(User)) return result.scalars().all()
# Опциональная аутентификацияasync def get_optional_user(token: str | None = Depends(OAuth2PasswordBearer(tokenUrl="/auth/login", auto_error=False))): if token is None: return None try: return await get_current_user(token) except HTTPException: return None
@router.get("/posts")async def get_posts(current_user = Depends(get_optional_user)): if current_user: # показываем личные посты return {"user": current_user.name, "posts": ["personal", "public"]} return {"posts": ["public"]}API Key аутентификация
Заголовок раздела «API Key аутентификация»from fastapi import Securityfrom fastapi.security import APIKeyHeader, APIKeyQuery
api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)api_key_query = APIKeyQuery(name="api_key", auto_error=False)
VALID_API_KEYS = {"key1": "service1", "key2": "service2"}
async def get_api_key( header_key: str | None = Security(api_key_header), query_key: str | None = Security(api_key_query),): key = header_key or query_key if key not in VALID_API_KEYS: raise HTTPException(status_code=403, detail="Неверный API ключ") return key
@app.get("/api/data")async def get_data(api_key: str = Depends(get_api_key)): return {"service": VALID_API_KEYS[api_key], "data": [...]}Задание
Заголовок раздела «Задание»# Реализуй полную систему аутентификации:## 1. Роли пользователей: guest, user, editor, admin# 2. Эндпоинты:# POST /auth/register# POST /auth/login# POST /auth/logout (blacklist токена)# GET /auth/me# PUT /auth/me (обновить профиль)# PUT /auth/change-password# POST /auth/forgot-password (отправить письмо)# POST /auth/reset-password## 3. Middleware для логирования всех запросов с userId# 4. Rate limiting: не более 5 попыток входа с одного IP за 15 минутВ следующем уроке — Django: основы!