
N+1 — это когда вы сначала делаете один запрос за списком объектов, а потом для каждого объекта ещё один (или несколько). Например, список статей и для каждой — автор и теги. В итоге вместо 1–2 запросов получаете десятки или сотни.
Симптомы:
# python -m pip install sqlalchemy
from contextlib import contextmanager
from sqlalchemy import create_engine, Column, Integer, String, ForeignKey
from sqlalchemy.orm import declarative_base, relationship, Session, joinedload
from sqlalchemy import event
Base = declarative_base()
class Author(Base):
__tablename__ = "author"
id = Column(Integer, primary_key=True)
name = Column(String, nullable=False)
class Article(Base):
__tablename__ = "article"
id = Column(Integer, primary_key=True)
title = Column(String, nullable=False)
author_id = Column(Integer, ForeignKey("author.id"), nullable=False)
author = relationship("Author", backref="articles")
engine = create_engine("sqlite:///:memory:", echo=False, future=True)
Base.metadata.create_all(engine)
# Наполним данными
with Session(engine) as s:
authors = [Author(name=f"Author {i}") for i in range(10)]
s.add_all(authors)
s.flush()
arts = [Article(title=f"Post {i}", author_id=authors[i % 10].id) for i in range(100)]
s.add_all(arts)
s.commit()
@contextmanager
def count_sql(engine):
counter = {"n": 0}
def before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
counter["n"] += 1
h = event.listen(engine, "before_cursor_execute", before_cursor_execute)
try:
yield counter
finally:
event.remove(engine, "before_cursor_execute", before_cursor_execute)
# N+1
with Session(engine) as s, count_sql(engine) as c:
articles = s.query(Article).all() # 1 запрос
authors = [a.author.name for a in articles] # +100 запросов
print("N+1 queries:", c["n"]) # ~101
# Оптимизировано: joinedload
from sqlalchemy.orm import selectinload
with Session(engine) as s, count_sql(engine) as c:
articles = s.query(Article).options(joinedload(Article.author)).all() # 1-2 запроса
authors = [a.author.name for a in articles]
print("Optimized queries:", c["n"]) # ~1
Представим простые модели.
# models.py
from django.db import models
class Author(models.Model):
name = models.CharField(max_length=200)
class Tag(models.Model):
name = models.CharField(max_length=50, unique=True)
class Article(models.Model):
title = models.CharField(max_length=200)
author = models.ForeignKey(Author, on_delete=models.CASCADE, related_name="articles")
tags = models.ManyToManyField(Tag, related_name="articles")
published_at = models.DateTimeField()
# views.py (антипаттерн)
from .models import Article
def list_titles_and_meta():
result = []
for a in Article.objects.order_by('-published_at')[:100]:
result.append({
"id": a.id,
"title": a.title,
"author": a.author.name, # +1 на каждый объект
"tags": [t.name for t in a.tags.all()], # ещё +1 на каждый объект
})
return result
Получаем 1 запрос за список + до 200 дополнительных.
# views.py (оптимально)
from django.db.models import Prefetch
from .models import Article, Tag
def list_titles_and_meta_fast():
qs = (
Article.objects
.select_related('author')
.prefetch_related(
Prefetch('tags', queryset=Tag.objects.only('id', 'name').order_by('name'))
)
.only('id', 'title', 'author__name', 'published_at')
.order_by('-published_at')
)
result = []
for a in qs[:100]:
result.append({
"id": a.id,
"title": a.title,
"author": a.author.name,
"tags": [t.name for t in a.tags.all()],
})
return result
Замечания:
Пример:
# Предположим модели Article/Author/Tag с отношениями
from sqlalchemy.orm import Session, joinedload, selectinload
# Список статей с автором и тегами, без N+1
with Session(engine) as s:
articles = (
s.query(Article)
.options(
joinedload(Article.author),
selectinload(Article.tags)
)
.order_by(Article.id.desc())
.limit(100)
.all()
)
data = [{
"id": a.id,
"title": a.title,
"author": a.author.name,
"tags": [t.name for t in a.tags]
} for a in articles]
Совет: если результат «раздувается» из‑за JOIN, переключайтесь на selectinload.
Пример Django:
# увеличить счётчик просмотров у множества статей
from django.db.models import F
from .models import Article
Article.objects.filter(id__in=[1, 2, 3]).update(views_count=F('views_count') + 1)
Пример SQLAlchemy:
from sqlalchemy.orm import Session
from sqlalchemy import update
with Session(engine) as s:
stmt = update(Article).where(Article.id.in_([1, 2, 3])).values(views_count=Article.views_count + 1)
s.execute(stmt)
s.commit()
Кэш не лечит плохие запросы, но закрепляет результат оптимизации:
# python -m pip install redis
# simple_cache.py
import json
import time
import redis
r = redis.Redis(host="localhost", port=6379, db=0)
class Cache:
def __init__(self, client: redis.Redis):
self.r = client
def get_json(self, key: str):
val = self.r.get(key)
return json.loads(val) if val else None
def set_json(self, key: str, value, ttl: int):
self.r.setex(key, ttl, json.dumps(value, ensure_ascii=False))
def with_lock(self, lock_key: str, timeout: int = 5):
return self.r.lock(lock_key, timeout=timeout)
cache = Cache(r)
# cache_article.py
from django.db.models.signals import post_save, post_delete, m2m_changed
from django.dispatch import receiver
from .models import Article
from .simple_cache import cache
ARTICLE_TTL = 300 # 5 минут
def article_cache_key(article_id: int) -> str:
return f"article:{article_id}:v1"
def get_article_cached(article_id: int):
key = article_cache_key(article_id)
data = cache.get_json(key)
if data is not None:
return data
lock_key = f"lock:{key}"
with cache.with_lock(lock_key, timeout=5):
# двойная проверка (другой поток мог уже положить)
data = cache.get_json(key)
if data is not None:
return data
# грузим из БД одной пачкой, без N+1
from django.db.models import Prefetch
from .models import Tag
a = (Article.objects
.select_related('author')
.prefetch_related(Prefetch('tags', queryset=Tag.objects.only('id', 'name')))
.only('id', 'title', 'author__name', 'published_at')
.get(id=article_id))
data = {
"id": a.id,
"title": a.title,
"author": a.author.name,
"published_at": a.published_at.isoformat(),
"tags": [t.name for t in a.tags.all()],
}
cache.set_json(key, data, ARTICLE_TTL)
return data
# Инвалидация при изменениях
@receiver(post_save, sender=Article)
@receiver(post_delete, sender=Article)
def invalidate_article_cache(sender, instance, **kwargs):
cache.r.delete(article_cache_key(instance.id))
# Если меняются теги статьи — тоже инвалидируем
@receiver(m2m_changed, sender=Article.tags.through)
def invalidate_article_tags_cache(sender, instance, action, **kwargs):
if action in {"post_add", "post_remove", "post_clear"}:
cache.r.delete(article_cache_key(instance.id))
# list_cache.py
from .simple_cache import cache
from .models import Article
LIST_TTL = 120
LIST_VERSION_KEY = "articles:list:v"
def _get_list_version() -> int:
v = cache.r.get(LIST_VERSION_KEY)
if v is None:
cache.r.set(LIST_VERSION_KEY, 1)
return 1
return int(v)
def _bump_list_version():
cache.r.incr(LIST_VERSION_KEY)
def list_cache_key(suffix: str) -> str:
return f"articles:list:{_get_list_version()}:{suffix}"
def get_latest_articles_cached(limit: int = 100):
key = list_cache_key(f"latest:{limit}")
data = cache.get_json(key)
if data is not None:
return data
# защищаемся от набега
with cache.with_lock(f"lock:{key}", timeout=5):
data = cache.get_json(key)
if data is not None:
return data
qs = (Article.objects
.select_related('author')
.only('id', 'title', 'author__name', 'published_at')
.order_by('-published_at')[:limit])
data = [{
"id": a.id,
"title": a.title,
"author": a.author.name,
"published_at": a.published_at.isoformat(),
} for a in qs]
cache.set_json(key, data, LIST_TTL)
return data
# вызывать при изменении каталога (сигналы post_save/post_delete Article или по расписанию)
def on_articles_changed():
_bump_list_version()
Почему версионный ключ лучше, чем удалить «все по маске»:
# tests/test_queries.py
from django.test import TestCase
from django.db import connection
from app.models import Article
from app.views import list_titles_and_meta_fast
class QueryTests(TestCase):
fixtures = ["demo.json"] # Предположим, у вас есть тестовые данные
def test_list_uses_few_queries(self):
with self.assertNumQueries(3):
data = list_titles_and_meta_fast()
self.assertGreater(len(data), 0)
В примере выше показан count_sql. Оборачивайте критичные запросы в тестах и фиксируйте верхнюю границу.
Даже неделя работы команды часто окупается за 1–2 месяца счетов за облако.