Перейти к содержимому

13. Async/Await

Иллюстрация к уроку

Представь сервер обрабатывает 1000 запросов. Каждый запрос делает запрос к БД (занимает 100ms).

  • Синхронно: 1000 × 100ms = 100 секунд
  • Асинхронно: пока ждём БД — обрабатываем другой запрос → ~0.1 секунды!
# Синхронный код — блокирует поток
import time
def fetch_user(id: int) -> dict:
time.sleep(1) # симуляция запроса к БД
return {"id": id, "name": f"User {id}"}
# Получить 5 пользователей — займёт 5 секунд!
users = [fetch_user(i) for i in range(1, 6)]
import asyncio
# async def — корутина (coroutine)
async def fetch_user(id: int) -> dict:
await asyncio.sleep(1) # не блокирует поток!
return {"id": id, "name": f"User {id}"}
# Запуск корутины
async def main():
user = await fetch_user(1)
print(user)
asyncio.run(main())
# Параллельный запуск — все 5 запросов одновременно!
async def main():
tasks = [fetch_user(i) for i in range(1, 6)]
users = await asyncio.gather(*tasks)
print(users) # получим за ~1 секунду, а не 5!
asyncio.run(main())
import asyncio
import httpx
# Простая корутина
async def say_hello(name: str, delay: float = 0) -> str:
await asyncio.sleep(delay)
return f"Привет, {name}!"
# Вызов корутины
async def main():
# await ждёт завершения корутины
result = await say_hello("Яша", delay=0.5)
print(result)
# Параллельно с gather
async def main():
# Запускаем всё одновременно!
results = await asyncio.gather(
say_hello("Яша", 1),
say_hello("Анна", 0.5),
say_hello("Борис", 0.3),
)
# Вернёт через ~1 секунду (а не 1.8)
for r in results:
print(r)
# create_task — для более гибкого управления
async def main():
task1 = asyncio.create_task(say_hello("Яша", 1))
task2 = asyncio.create_task(say_hello("Анна", 0.5))
# Можем делать что-то ещё пока задачи выполняются
print("Задачи запущены!")
result1 = await task1
result2 = await task2
print(result1, result2)
import asyncio
import httpx
async def fetch_github_user(username: str) -> dict:
async with httpx.AsyncClient() as client:
response = await client.get(
f"https://api.github.com/users/{username}"
)
response.raise_for_status()
return response.json()
async def fetch_multiple_users(usernames: list[str]) -> list[dict]:
"""Параллельно получить данные всех пользователей."""
async with httpx.AsyncClient() as client:
tasks = [
client.get(f"https://api.github.com/users/{u}")
for u in usernames
]
responses = await asyncio.gather(*tasks)
return [r.json() for r in responses]
async def main():
users = await fetch_multiple_users(["octocat", "torvalds", "gvanrossum"])
for user in users:
print(f"{user['login']}: {user['public_repos']} repos")
asyncio.run(main())
import asyncio
from asyncio import Queue, Semaphore
# Ограничение параллельных запросов (rate limiting)
async def fetch_with_limit(url: str, semaphore: Semaphore) -> str:
async with semaphore: # не более N одновременных запросов
async with httpx.AsyncClient() as client:
response = await client.get(url)
return response.text
async def fetch_all(urls: list[str], max_concurrent: int = 10):
semaphore = Semaphore(max_concurrent)
tasks = [fetch_with_limit(url, semaphore) for url in urls]
return await asyncio.gather(*tasks)
# Producer-Consumer pattern
async def producer(queue: Queue, items: list):
for item in items:
await queue.put(item)
await asyncio.sleep(0) # дать возможность другим корутинам
async def consumer(queue: Queue, worker_id: int):
while True:
item = await queue.get()
if item is None: # сигнал остановки
break
print(f"Worker {worker_id} обрабатывает: {item}")
await asyncio.sleep(0.1) # симуляция обработки
queue.task_done()
async def main():
queue = Queue()
items = list(range(20))
workers = [asyncio.create_task(consumer(queue, i)) for i in range(3)]
await producer(queue, items)
# Отправить сигнал остановки для каждого worker
for _ in workers:
await queue.put(None)
await asyncio.gather(*workers)
# Timeout
async def fetch_with_timeout(url: str, timeout: float = 5.0):
try:
async with asyncio.timeout(timeout): # Python 3.11+
async with httpx.AsyncClient() as client:
return await client.get(url)
except asyncio.TimeoutError:
print(f"Превышено время ожидания для {url}")
return None
from typing import AsyncGenerator, AsyncIterator
# Async generator
async def read_large_file(path: str) -> AsyncGenerator[str, None]:
with open(path, "r") as f:
for line in f:
await asyncio.sleep(0) # не блокируем
yield line.strip()
async def process_file():
async for line in read_large_file("big_file.txt"):
# обрабатываем строку
pass
# Async context manager
class DatabaseConnection:
async def __aenter__(self):
self.conn = await connect_to_db()
return self.conn
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.conn.close()
async def main():
async with DatabaseConnection() as db:
result = await db.query("SELECT * FROM users")
from fastapi import FastAPI
import asyncio
import httpx
app = FastAPI()
# Синхронный endpoint — FastAPI запускает в threadpool
@app.get("/sync")
def sync_endpoint():
import time
time.sleep(1) # блокирует, но в отдельном потоке
return {"message": "sync"}
# Асинхронный endpoint — не блокирует event loop
@app.get("/async")
async def async_endpoint():
await asyncio.sleep(1) # не блокирует!
return {"message": "async"}
# Правило: если используешь await — пиши async def
# Если не используешь await — можно оба варианта (sync лучше для CPU-bound)
# Параллельные запросы к внешним API
@app.get("/dashboard/{user_id}")
async def get_dashboard(user_id: int):
async with httpx.AsyncClient() as client:
# Параллельно запрашиваем разные сервисы
profile, orders, notifications = await asyncio.gather(
client.get(f"/api/profile/{user_id}"),
client.get(f"/api/orders/{user_id}"),
client.get(f"/api/notifications/{user_id}"),
)
return {
"profile": profile.json(),
"orders": orders.json(),
"notifications": notifications.json()
}
# Задание 1: Параллельный парсер
# Напиши async функцию fetch_all_prices(product_ids: list[int]) -> dict[int, float]
# Которая одновременно запрашивает цены для всех товаров
# Ограничение: не более 5 одновременных запросов
# Задание 2: Async retry
async def async_retry(coroutine_func, max_attempts: int = 3, delay: float = 1.0):
"""Повторяет async функцию при ошибке с задержкой."""
pass
# Задание 3: Rate limiter
class RateLimiter:
"""Ограничитель частоты запросов."""
def __init__(self, max_calls: int, period: float):
pass
async def acquire(self) -> None:
"""Ждёт если превышен лимит."""
pass

В следующем уроке — FastAPI: основы!