13. Async/Await

Зачем асинхронность?
Заголовок раздела «Зачем асинхронность?»Представь сервер обрабатывает 1000 запросов. Каждый запрос делает запрос к БД (занимает 100ms).
- Синхронно: 1000 × 100ms = 100 секунд
- Асинхронно: пока ждём БД — обрабатываем другой запрос → ~0.1 секунды!
# Синхронный код — блокирует потокimport time
def fetch_user(id: int) -> dict: time.sleep(1) # симуляция запроса к БД return {"id": id, "name": f"User {id}"}
# Получить 5 пользователей — займёт 5 секунд!users = [fetch_user(i) for i in range(1, 6)]asyncio — основа асинхронности
Заголовок раздела «asyncio — основа асинхронности»import asyncio
# async def — корутина (coroutine)async def fetch_user(id: int) -> dict: await asyncio.sleep(1) # не блокирует поток! return {"id": id, "name": f"User {id}"}
# Запуск корутиныasync def main(): user = await fetch_user(1) print(user)
asyncio.run(main())
# Параллельный запуск — все 5 запросов одновременно!async def main(): tasks = [fetch_user(i) for i in range(1, 6)] users = await asyncio.gather(*tasks) print(users) # получим за ~1 секунду, а не 5!
asyncio.run(main())async/await синтаксис
Заголовок раздела «async/await синтаксис»import asyncioimport httpx
# Простая корутинаasync def say_hello(name: str, delay: float = 0) -> str: await asyncio.sleep(delay) return f"Привет, {name}!"
# Вызов корутиныasync def main(): # await ждёт завершения корутины result = await say_hello("Яша", delay=0.5) print(result)
# Параллельно с gatherasync def main(): # Запускаем всё одновременно! results = await asyncio.gather( say_hello("Яша", 1), say_hello("Анна", 0.5), say_hello("Борис", 0.3), ) # Вернёт через ~1 секунду (а не 1.8) for r in results: print(r)
# create_task — для более гибкого управленияasync def main(): task1 = asyncio.create_task(say_hello("Яша", 1)) task2 = asyncio.create_task(say_hello("Анна", 0.5))
# Можем делать что-то ещё пока задачи выполняются print("Задачи запущены!")
result1 = await task1 result2 = await task2 print(result1, result2)Реальные примеры: httpx
Заголовок раздела «Реальные примеры: httpx»import asyncioimport httpx
async def fetch_github_user(username: str) -> dict: async with httpx.AsyncClient() as client: response = await client.get( f"https://api.github.com/users/{username}" ) response.raise_for_status() return response.json()
async def fetch_multiple_users(usernames: list[str]) -> list[dict]: """Параллельно получить данные всех пользователей.""" async with httpx.AsyncClient() as client: tasks = [ client.get(f"https://api.github.com/users/{u}") for u in usernames ] responses = await asyncio.gather(*tasks) return [r.json() for r in responses]
async def main(): users = await fetch_multiple_users(["octocat", "torvalds", "gvanrossum"]) for user in users: print(f"{user['login']}: {user['public_repos']} repos")
asyncio.run(main())Asyncio паттерны
Заголовок раздела «Asyncio паттерны»import asynciofrom asyncio import Queue, Semaphore
# Ограничение параллельных запросов (rate limiting)async def fetch_with_limit(url: str, semaphore: Semaphore) -> str: async with semaphore: # не более N одновременных запросов async with httpx.AsyncClient() as client: response = await client.get(url) return response.text
async def fetch_all(urls: list[str], max_concurrent: int = 10): semaphore = Semaphore(max_concurrent) tasks = [fetch_with_limit(url, semaphore) for url in urls] return await asyncio.gather(*tasks)
# Producer-Consumer patternasync def producer(queue: Queue, items: list): for item in items: await queue.put(item) await asyncio.sleep(0) # дать возможность другим корутинам
async def consumer(queue: Queue, worker_id: int): while True: item = await queue.get() if item is None: # сигнал остановки break print(f"Worker {worker_id} обрабатывает: {item}") await asyncio.sleep(0.1) # симуляция обработки queue.task_done()
async def main(): queue = Queue() items = list(range(20))
workers = [asyncio.create_task(consumer(queue, i)) for i in range(3)] await producer(queue, items)
# Отправить сигнал остановки для каждого worker for _ in workers: await queue.put(None)
await asyncio.gather(*workers)
# Timeoutasync def fetch_with_timeout(url: str, timeout: float = 5.0): try: async with asyncio.timeout(timeout): # Python 3.11+ async with httpx.AsyncClient() as client: return await client.get(url) except asyncio.TimeoutError: print(f"Превышено время ожидания для {url}") return NoneAsync generators и context managers
Заголовок раздела «Async generators и context managers»from typing import AsyncGenerator, AsyncIterator
# Async generatorasync def read_large_file(path: str) -> AsyncGenerator[str, None]: with open(path, "r") as f: for line in f: await asyncio.sleep(0) # не блокируем yield line.strip()
async def process_file(): async for line in read_large_file("big_file.txt"): # обрабатываем строку pass
# Async context managerclass DatabaseConnection: async def __aenter__(self): self.conn = await connect_to_db() return self.conn
async def __aexit__(self, exc_type, exc_val, exc_tb): await self.conn.close()
async def main(): async with DatabaseConnection() as db: result = await db.query("SELECT * FROM users")Sync vs Async в FastAPI
Заголовок раздела «Sync vs Async в FastAPI»from fastapi import FastAPIimport asyncioimport httpx
app = FastAPI()
# Синхронный endpoint — FastAPI запускает в threadpool@app.get("/sync")def sync_endpoint(): import time time.sleep(1) # блокирует, но в отдельном потоке return {"message": "sync"}
# Асинхронный endpoint — не блокирует event loop@app.get("/async")async def async_endpoint(): await asyncio.sleep(1) # не блокирует! return {"message": "async"}
# Правило: если используешь await — пиши async def# Если не используешь await — можно оба варианта (sync лучше для CPU-bound)
# Параллельные запросы к внешним API@app.get("/dashboard/{user_id}")async def get_dashboard(user_id: int): async with httpx.AsyncClient() as client: # Параллельно запрашиваем разные сервисы profile, orders, notifications = await asyncio.gather( client.get(f"/api/profile/{user_id}"), client.get(f"/api/orders/{user_id}"), client.get(f"/api/notifications/{user_id}"), )
return { "profile": profile.json(), "orders": orders.json(), "notifications": notifications.json() }Задание
Заголовок раздела «Задание»# Задание 1: Параллельный парсер# Напиши async функцию fetch_all_prices(product_ids: list[int]) -> dict[int, float]# Которая одновременно запрашивает цены для всех товаров# Ограничение: не более 5 одновременных запросов
# Задание 2: Async retryasync def async_retry(coroutine_func, max_attempts: int = 3, delay: float = 1.0): """Повторяет async функцию при ошибке с задержкой.""" pass
# Задание 3: Rate limiterclass RateLimiter: """Ограничитель частоты запросов.""" def __init__(self, max_calls: int, period: float): pass
async def acquire(self) -> None: """Ждёт если превышен лимит.""" passВ следующем уроке — FastAPI: основы!