Перейти к содержимому

16. FastAPI: аутентификация

Иллюстрация к уроку

JSON Web Tokens (JWT) — стандарт аутентификации для REST API.

Окно терминала
pip install fastapi uvicorn python-jose[cryptography] passlib[bcrypt] python-multipart
1. POST /auth/login {username, password}
2. Сервер проверяет пароль
3. Создаёт JWT токен (access + refresh)
4. Клиент сохраняет токен
5. Каждый запрос: Authorization: Bearer <token>
6. Сервер проверяет токен → возвращает данные
from datetime import datetime, timedelta, timezone
from jose import jwt, JWTError
from passlib.context import CryptContext
SECRET_KEY = "your-secret-key-keep-it-secret"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
REFRESH_TOKEN_EXPIRE_DAYS = 7
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def hash_password(password: str) -> str:
return pwd_context.hash(password)
def verify_password(plain: str, hashed: str) -> bool:
return pwd_context.verify(plain, hashed)
def create_access_token(data: dict) -> str:
expire = datetime.now(timezone.utc) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
return jwt.encode({**data, "exp": expire}, SECRET_KEY, algorithm=ALGORITHM)
def create_refresh_token(data: dict) -> str:
expire = datetime.now(timezone.utc) + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
return jwt.encode({**data, "exp": expire, "type": "refresh"}, SECRET_KEY, algorithm=ALGORITHM)
def decode_token(token: str) -> dict:
try:
return jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
except JWTError:
raise ValueError("Неверный или просроченный токен")
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from auth.security import decode_token
from database import get_db
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")
async def get_current_user(
token: str = Depends(oauth2_scheme),
db = Depends(get_db)
):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Не авторизован",
headers={"WWW-Authenticate": "Bearer"}
)
try:
payload = decode_token(token)
user_id: int = payload.get("sub")
if user_id is None:
raise credentials_exception
except ValueError:
raise credentials_exception
user = await db.get(User, int(user_id))
if user is None:
raise credentials_exception
return user
async def get_admin_user(current_user = Depends(get_current_user)):
if current_user.role != "admin":
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Требуются права администратора"
)
return current_user
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from pydantic import BaseModel, EmailStr
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from database import get_db
from models import User
from auth.security import hash_password, verify_password, create_access_token, create_refresh_token
router = APIRouter(prefix="/auth", tags=["auth"])
class UserRegister(BaseModel):
name: str
email: EmailStr
password: str
class Token(BaseModel):
access_token: str
refresh_token: str
token_type: str = "bearer"
@router.post("/register", status_code=status.HTTP_201_CREATED)
async def register(data: UserRegister, db: AsyncSession = Depends(get_db)):
# Проверяем уникальность email
existing = await db.execute(select(User).where(User.email == data.email))
if existing.scalar():
raise HTTPException(status_code=400, detail="Email уже используется")
user = User(
name=data.name,
email=data.email,
hashed_password=hash_password(data.password)
)
db.add(user)
await db.flush()
return {"message": "Регистрация успешна", "id": user.id}
@router.post("/login", response_model=Token)
async def login(
form: OAuth2PasswordRequestForm = Depends(),
db: AsyncSession = Depends(get_db)
):
# form.username, form.password
result = await db.execute(select(User).where(User.email == form.username))
user = result.scalar()
if not user or not verify_password(form.password, user.hashed_password):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Неверный email или пароль"
)
token_data = {"sub": str(user.id), "role": user.role}
return Token(
access_token=create_access_token(token_data),
refresh_token=create_refresh_token(token_data)
)
@router.post("/refresh", response_model=Token)
async def refresh(refresh_token: str, db: AsyncSession = Depends(get_db)):
from auth.security import decode_token
try:
payload = decode_token(refresh_token)
if payload.get("type") != "refresh":
raise ValueError("Неверный тип токена")
except ValueError:
raise HTTPException(status_code=401, detail="Неверный refresh token")
token_data = {"sub": payload["sub"], "role": payload["role"]}
return Token(
access_token=create_access_token(token_data),
refresh_token=create_refresh_token(token_data)
)
from fastapi import APIRouter, Depends
from auth.dependencies import get_current_user, get_admin_user
router = APIRouter()
@router.get("/me")
async def get_profile(current_user = Depends(get_current_user)):
return {
"id": current_user.id,
"name": current_user.name,
"email": current_user.email,
"role": current_user.role
}
@router.get("/admin/users")
async def get_all_users(admin = Depends(get_admin_user), db = Depends(get_db)):
# Только для администраторов
result = await db.execute(select(User))
return result.scalars().all()
# Опциональная аутентификация
async def get_optional_user(token: str | None = Depends(OAuth2PasswordBearer(tokenUrl="/auth/login", auto_error=False))):
if token is None:
return None
try:
return await get_current_user(token)
except HTTPException:
return None
@router.get("/posts")
async def get_posts(current_user = Depends(get_optional_user)):
if current_user:
# показываем личные посты
return {"user": current_user.name, "posts": ["personal", "public"]}
return {"posts": ["public"]}
from fastapi import Security
from fastapi.security import APIKeyHeader, APIKeyQuery
api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)
api_key_query = APIKeyQuery(name="api_key", auto_error=False)
VALID_API_KEYS = {"key1": "service1", "key2": "service2"}
async def get_api_key(
header_key: str | None = Security(api_key_header),
query_key: str | None = Security(api_key_query),
):
key = header_key or query_key
if key not in VALID_API_KEYS:
raise HTTPException(status_code=403, detail="Неверный API ключ")
return key
@app.get("/api/data")
async def get_data(api_key: str = Depends(get_api_key)):
return {"service": VALID_API_KEYS[api_key], "data": [...]}
# Реализуй полную систему аутентификации:
#
# 1. Роли пользователей: guest, user, editor, admin
# 2. Эндпоинты:
# POST /auth/register
# POST /auth/login
# POST /auth/logout (blacklist токена)
# GET /auth/me
# PUT /auth/me (обновить профиль)
# PUT /auth/change-password
# POST /auth/forgot-password (отправить письмо)
# POST /auth/reset-password
#
# 3. Middleware для логирования всех запросов с userId
# 4. Rate limiting: не более 5 попыток входа с одного IP за 15 минут

В следующем уроке — Django: основы!