
Проблема пиков знакома всем: утром клиенты заходят одновременно, партнёр прислал шквал вебхуков, маркетинг выкатил акцию. Без управления потоком:
Решение — обратное давление (backpressure) и лимитирование скорости. Мы явно говорим системе и клиентам: «столько‑то запросов в секунду, остальные в ожидание или на потом». Это дисциплинирует поток, позволяет прогнозировать время и держать SLA без вынужденного горизонтального раздувания.
Ключевая бизнес‑выгода: предсказуемость. Даже в пик система остаётся управляемой и деградирует аккуратно (429/503/очередь), а не рушится каскадом.
Лучше комбинировать: грубые лимиты на периметре, умные — в бизнес‑логике.
Ограничение на IP/ключ с возможностью «всплеска»:
map $http_x_api_key $rate_key {
default $binary_remote_addr; # если нет ключа — по IP
"" $binary_remote_addr;
~.+ $http_x_api_key; # если есть ключ — по нему
}
limit_req_zone $rate_key zone=api_per_client:20m rate=20r/s; # 20 запросов/сек на клиента
limit_conn_zone $rate_key zone=api_conns:10m; # одновременные соединения
server {
listen 443 ssl;
location /api/ {
limit_conn api_conns 10; # не более 10 одновременных подключений
limit_req zone=api_per_client burst=40 nodelay; # до 40 в всплеске, без задержки
proxy_set_header X-Request-Id $request_id;
proxy_pass http://backend;
}
}
В Kubernetes Ingress есть аннотации для лимитов. Для сложных схем (по тарифам) лучше используйте Redis‑скрипт ниже.
Серверный лимит по организации/тарифу/операции. Скрипт хранит текущее число токенов и время последнего пополнения.
-- file: token_bucket.lua
-- Хеш-ключ: rate:{client}:{scope}
-- Поля: tokens (число), ts (мс)
-- ARGV:
-- 1: capacity (максимум токенов)
-- 2: refill_per_sec (токенов в сек)
-- 3: now_ms (текущее время, мс)
-- 4: cost (стоимость операции в токенах)
local key = KEYS[1]
local capacity = tonumber(ARGV[1])
local refill = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local cost = tonumber(ARGV[4]) or 1
local data = redis.call('HMGET', key, 'tokens', 'ts')
local tokens = tonumber(data[1])
local ts = tonumber(data[2])
if tokens == nil then
tokens = capacity
ts = now
end
local delta = math.max(0, now - ts) / 1000.0
local filled = math.min(capacity, tokens + delta * refill)
local allowed = 0
if filled >= cost then
tokens = filled - cost
allowed = 1
else
tokens = filled
end
redis.call('HMSET', key, 'tokens', tokens, 'ts', now)
-- TTL в 2 периода пополнения, чтобы не держать мусорные ключи
local ttl = math.ceil((capacity / refill) * 2)
redis.call('EXPIRE', key, ttl)
return { allowed, tokens }
Пример использования в Node.js (ioredis):
// npm i ioredis express
const fs = require('fs');
const Redis = require('ioredis');
const express = require('express');
const redis = new Redis(process.env.REDIS_URL || 'redis://127.0.0.1:6379');
const script = fs.readFileSync('./token_bucket.lua', 'utf8');
let sha;
async function ensureScript() {
sha = await redis.script('load', script);
}
function limiter({ capacity, refillPerSec, keyFn, cost = 1 }) {
return async (req, res, next) => {
try {
const key = `rate:${keyFn(req)}`;
const now = Date.now();
const result = await redis.evalsha(
sha,
1,
key,
capacity,
refillPerSec,
now,
cost
);
const allowed = result[0] === 1;
const tokens = Number(result[1]);
res.setHeader('X-Rate-Tokens-Left', tokens.toString());
if (!allowed) {
res.setHeader('Retry-After', '1'); // можно вычислять точнее
return res.status(429).json({ error: 'rate_limited' });
}
next();
} catch (e) {
next(e);
}
};
}
(async () => {
await ensureScript();
const app = express();
app.use(
'/api/payments',
limiter({
capacity: 50, // до 50 запросов в всплеске
refillPerSec: 25, // 25 операций в секунду стабильно
keyFn: (req) => req.header('X-Api-Key') || req.ip,
cost: 1,
})
);
app.get('/api/payments', (req, res) => {
res.json({ ok: true });
});
app.listen(8080, () => console.log('Listening on 8080'));
})();
# pip install pika
import pika, time
params = pika.URLParameters('amqp://guest:guest@localhost:5672/%2F')
conn = pika.BlockingConnection(params)
ch = conn.channel()
# Очередь с DLX на случай переполнения/ошибок
ch.exchange_declare(exchange='dead', exchange_type='fanout', durable=True)
ch.queue_declare(queue='tasks', durable=True, arguments={
'x-dead-letter-exchange': 'dead',
'x-max-length': 100000
})
ch.basic_qos(prefetch_count=20) # не более 20 сообщений «в полёте» на потребителя
def handle(ch, method, props, body):
try:
# Обработка
time.sleep(0.05) # имитация работы
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception:
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
ch.basic_consume('tasks', handle)
print('Consuming...')
ch.start_consuming()
Масштабируем потребителей в разумных пределах, следим за временем ожидания сообщения в очереди (age) — это прямой индикатор, что не успеваем.
Используем возможность приостанавливать чтение, если обработка не поспевает, и возобновлять, когда «догнали».
// Gradle: implementation 'org.apache.kafka:kafka-clients:3.7.0'
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.*;
public class ThrottledConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "payments-workers");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "200");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("payments"));
long PAUSE_LAG = 50_000; // пауза, если лаг больше 50k
long RESUME_LAG = 10_000; // возобновить, когда лаг упадёт ниже 10k
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
for (ConsumerRecord<String, String> rec : records) {
// обработка
}
consumer.commitAsync();
Set<TopicPartition> parts = consumer.assignment();
if (!parts.isEmpty()) {
Map<TopicPartition, Long> end = consumer.endOffsets(parts);
long totalLag = 0;
for (TopicPartition tp : parts) {
long pos = consumer.position(tp);
long e = end.get(tp);
totalLag += Math.max(0, e - pos);
}
if (totalLag > PAUSE_LAG) {
consumer.pause(parts);
} else if (totalLag < RESUME_LAG) {
consumer.resume(parts);
}
}
}
}
}
Дополнительно задайте верхние лимиты продьюсеру (batch.size, linger.ms) и потребителю (max.poll.interval.ms) и следите за «старейшим» возрастом сообщения.
Когда лимит исчерпан, корректный ответ — 429 Too Many Requests, с подсказкой, когда пробовать снова.
// Отвечаем 429 и подсказываем клиенту, когда повторить
res.setHeader('Retry-After', '2'); // секунды или HTTP‑дату
res.status(429).json({ error: 'rate_limited', retry_after: 2 });
Рекомендуйте клиентам экспоненциальный бэкофф с «дрожью» (jitter), чтобы не ударяли синхронно:
function backoffWithJitter(attempt, baseMs = 200, capMs = 10_000) {
const exp = Math.min(capMs, baseMs * Math.pow(2, attempt));
const jitter = Math.random() * exp * 0.2; // +/-20%
return Math.floor(exp * 0.9 + jitter);
}
Если партнёр ограничивает вас (присылает 429/503), уважайте его лимиты, иначе получите бан и срыв интеграции.
Некоторые запросы (отчёты, сложные джоины, экспорт) опасны параллелизмом. Простой способ — семафор в коде. Пример на Go:
package main
import (
"context"
"database/sql"
"fmt"
"log"
"time"
_ "github.com/lib/pq"
)
// канал как семафор: не более N параллельных тяжёлых операций
var heavySem = make(chan struct{}, 4)
func withHeavyLimit(ctx context.Context, fn func(ctx context.Context) error) error {
select {
case heavySem <- struct{}{}:
defer func() { <-heavySem }()
return fn(ctx)
case <-ctx.Done():
return ctx.Err()
}
}
func generateReport(ctx context.Context, db *sql.DB, userID int64) error {
q := `SELECT generate_series(1, 5)` // вместо тяжёлого запроса
rows, err := db.QueryContext(ctx, q)
if err != nil { return err }
defer rows.Close()
for rows.Next() {}
return rows.Err()
}
func main() {
db, err := sql.Open("postgres", "postgres://user:pass@localhost:5432/app?sslmode=disable")
if err != nil { log.Fatal(err) }
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
err = withHeavyLimit(ctx, func(ctx context.Context) error {
return generateReport(ctx, db, 42)
})
if err != nil {
fmt.Println("failed:", err)
} else {
fmt.Println("ok")
}
}
Этот приём защищает базу от «пылесоса», когда десятки тяжёлых запросов запускаются параллельно. Дополнительно можно отдавать пользователю «ваш отчёт готовится» и присылать ссылку на скачивание позже.
Измерьте устойчивую пропускную способность сервиса без деградации (например, 200 rps при p95<200 мс и CPU<70%).
Установите базовый лимит на 60–80% этой скорости (запас на всплески и джиттер). Burst — 2–3× от лимита, но не больше, чем система переварит за 1–2 секунды.
Отдельно посчитайте «дорогие» операции: например, отчёт = 50 обычных запросов. Задайте для них собственные лимиты и семафоры.
Пилотное включение:
Метрики:
Алерты:
Дашборды: тепловая карта по клиентам/тарифам (кто «ест» лимит), коридоры SLA.
Обратное давление и лимиты скорости — не «затычка», а фундамент стабильности. Пара конфигов на периметре, один Lua‑скрипт в Redis, правильные настройки потребителей очереди, аккуратные ответы 429 и простой семафор на тяжёлые операции — и вы превращаете хаос пиков в управляемый поток. В выигрыше все: пользователи получают предсказуемый сервис, команда — спокойные ночи, бизнес — экономию на инфраструктуре и SLA без сюрпризов.