Перейти к содержимому

19. SQLAlchemy

Иллюстрация к уроку

SQLAlchemy — самый мощный Python ORM. Используется с FastAPI, Flask и другими фреймворками.

Окно терминала
pip install sqlalchemy asyncpg psycopg2-binary aiosqlite alembic
database.py
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase
# SQLite (для разработки)
DATABASE_URL = "sqlite+aiosqlite:///./app.db"
# PostgreSQL (продакшн)
# DATABASE_URL = "postgresql+asyncpg://user:password@localhost/dbname"
engine = create_async_engine(
DATABASE_URL,
echo=True, # логировать SQL запросы
pool_size=10, # размер пула соединений (только для PostgreSQL)
max_overflow=20
)
AsyncSessionLocal = async_sessionmaker(
engine,
expire_on_commit=False # важно для async!
)
class Base(DeclarativeBase):
pass
models.py
from datetime import datetime
from sqlalchemy import String, Integer, Text, Boolean, ForeignKey, Table, Column
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.sql import func
from database import Base
# Таблица для Many-to-Many
post_tags = Table(
"post_tags",
Base.metadata,
Column("post_id", Integer, ForeignKey("posts.id"), primary_key=True),
Column("tag_id", Integer, ForeignKey("tags.id"), primary_key=True),
)
class User(Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(100))
email: Mapped[str] = mapped_column(String(200), unique=True, index=True)
hashed_password: Mapped[str] = mapped_column(String(200))
role: Mapped[str] = mapped_column(String(20), default="user")
is_active: Mapped[bool] = mapped_column(Boolean, default=True)
created_at: Mapped[datetime] = mapped_column(server_default=func.now())
# Relationship — для удобного доступа
posts: Mapped[list["Post"]] = relationship(back_populates="author")
class Category(Base):
__tablename__ = "categories"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(100), unique=True)
slug: Mapped[str] = mapped_column(String(100), unique=True)
posts: Mapped[list["Post"]] = relationship(back_populates="category")
class Post(Base):
__tablename__ = "posts"
id: Mapped[int] = mapped_column(primary_key=True)
title: Mapped[str] = mapped_column(String(200))
body: Mapped[str] = mapped_column(Text)
status: Mapped[str] = mapped_column(String(20), default="draft")
views: Mapped[int] = mapped_column(Integer, default=0)
author_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
category_id: Mapped[int | None] = mapped_column(ForeignKey("categories.id"), nullable=True)
created_at: Mapped[datetime] = mapped_column(server_default=func.now())
updated_at: Mapped[datetime] = mapped_column(onupdate=func.now(), nullable=True)
# Relationships
author: Mapped["User"] = relationship(back_populates="posts")
category: Mapped["Category | None"] = relationship(back_populates="posts")
tags: Mapped[list["Tag"]] = relationship(secondary=post_tags, back_populates="posts")
class Tag(Base):
__tablename__ = "tags"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(50), unique=True)
posts: Mapped[list["Post"]] = relationship(secondary=post_tags, back_populates="tags")
from sqlalchemy import select, update, delete, func
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload, joinedload
# CREATE
async def create_post(db: AsyncSession, title: str, body: str, author_id: int) -> Post:
post = Post(title=title, body=body, author_id=author_id)
db.add(post)
await db.flush() # получить id без коммита
await db.refresh(post) # обновить из БД
return post
# READ — один объект
async def get_post(db: AsyncSession, post_id: int) -> Post | None:
return await db.get(Post, post_id)
# READ — список с eager loading
async def get_posts(
db: AsyncSession,
status: str | None = None,
page: int = 1,
size: int = 10
) -> list[Post]:
query = (
select(Post)
.options(
joinedload(Post.author), # JOIN для ForeignKey
joinedload(Post.category),
selectinload(Post.tags) # отдельный запрос для M2M
)
.order_by(Post.created_at.desc())
.offset((page - 1) * size)
.limit(size)
)
if status:
query = query.where(Post.status == status)
result = await db.execute(query)
return result.scalars().all()
# UPDATE
async def update_post(db: AsyncSession, post_id: int, **kwargs) -> Post | None:
post = await db.get(Post, post_id)
if not post:
return None
for key, value in kwargs.items():
setattr(post, key, value)
await db.flush()
await db.refresh(post)
return post
# Bulk UPDATE
async def publish_all_drafts(db: AsyncSession, author_id: int) -> int:
result = await db.execute(
update(Post)
.where(Post.author_id == author_id, Post.status == "draft")
.values(status="published")
)
return result.rowcount
# DELETE
async def delete_post(db: AsyncSession, post_id: int) -> bool:
post = await db.get(Post, post_id)
if not post:
return False
await db.delete(post)
return True
from sqlalchemy import select, func, desc, and_, or_
# Подсчёт постов по статусам
async def get_stats(db: AsyncSession) -> dict:
result = await db.execute(
select(Post.status, func.count(Post.id).label("count"))
.group_by(Post.status)
)
return {row.status: row.count for row in result}
# Топ авторов
async def get_top_authors(db: AsyncSession, limit: int = 5):
result = await db.execute(
select(
User.name,
func.count(Post.id).label("posts_count"),
func.sum(Post.views).label("total_views")
)
.join(Post, Post.author_id == User.id)
.where(Post.status == "published")
.group_by(User.id, User.name)
.order_by(desc("posts_count"))
.limit(limit)
)
return result.all()
# Полнотекстовый поиск
async def search_posts(db: AsyncSession, query: str) -> list[Post]:
result = await db.execute(
select(Post)
.where(
or_(
Post.title.ilike(f"%{query}%"),
Post.body.ilike(f"%{query}%")
)
)
.where(Post.status == "published")
.order_by(Post.created_at.desc())
)
return result.scalars().all()
Окно терминала
# Установка и инициализация
pip install alembic
alembic init migrations
# Создать миграцию (автоопределение изменений)
alembic revision --autogenerate -m "add users table"
# Применить
alembic upgrade head
# Откат
alembic downgrade -1
# История
alembic history
# migrations/env.py — настройка
from database import Base
from models import User, Post, Tag # импортируй все модели!
target_metadata = Base.metadata
from abc import ABC, abstractmethod
class PostRepository:
def __init__(self, db: AsyncSession):
self.db = db
async def get_by_id(self, id: int) -> Post | None:
return await self.db.get(Post, id)
async def get_by_slug(self, slug: str) -> Post | None:
result = await self.db.execute(select(Post).where(Post.slug == slug))
return result.scalar()
async def get_published(self, page: int = 1, size: int = 10) -> tuple[list[Post], int]:
base_query = select(Post).where(Post.status == "published")
# Подсчёт
total = (await self.db.execute(
select(func.count()).select_from(base_query.subquery())
)).scalar()
# Данные
posts = (await self.db.execute(
base_query.order_by(Post.created_at.desc())
.offset((page - 1) * size).limit(size)
.options(joinedload(Post.author), selectinload(Post.tags))
)).scalars().all()
return posts, total
async def save(self, post: Post) -> Post:
self.db.add(post)
await self.db.flush()
await self.db.refresh(post)
return post
async def delete(self, post: Post) -> None:
await self.db.delete(post)
# В FastAPI:
async def get_post_repo(db: AsyncSession = Depends(get_db)) -> PostRepository:
return PostRepository(db)
@router.get("/{post_id}")
async def get_post(post_id: int, repo: PostRepository = Depends(get_post_repo)):
post = await repo.get_by_id(post_id)
if not post:
raise HTTPException(404, "Post not found")
return post
# Реализуй систему комментариев с вложенностью (threaded comments):
#
# Модель Comment:
# - id, post_id, author_id, body, parent_id (nullable, self-referential)
# - created_at, is_deleted (soft delete)
#
# CommentRepository с методами:
# - get_thread(post_id) -> list[Comment] — дерево комментариев
# - add_reply(parent_id, author_id, body) -> Comment
# - soft_delete(comment_id, user_id) -> bool
# - get_count(post_id) -> int
#
# Рекурсивный запрос (Common Table Expression) для всего дерева:
# Используй sqlalchemy.orm.relationship с lazy="subquery"
# или реализуй через рекурсивный CTE

В следующем уроке — интеграция с JS фронтендом!