Kravchenko

Web Lab

АудитБлогКонтакты

Kravchenko

Web Lab

Разрабатываем сайты и автоматизацию на современных фреймворках под ключ

Услуги
ЛендингМногостраничныйВизитка
E-commerceБронированиеПортфолио
Навигация
БлогКонтактыАудит
Обратная связь
+7 921 567-11-16
info@kravlab.ru
с 09:00 до 18:00

© 2026 Все права защищены

•

ИП Кравченко Никита Владимирович

•

ОГРНИП: 324784700339743

Политика конфиденциальности

Очередь задач на PostgreSQL с SKIP LOCKED: быстрый старт без отдельного брокера и меньше расходов

Разработка и технологии20 января 2026 г.
Нужны фоновые задачи, но не хочется тянуть отдельный брокер? PostgreSQL уже умеет быть надежной очередью. Разберём, как построить очередь с конкурентными воркерами на FOR UPDATE SKIP LOCKED, ретраями, дедупликацией и уведомлениями через LISTEN/NOTIFY — без дедлоков и с понятными границами масштабирования.
Очередь задач на PostgreSQL с SKIP LOCKED: быстрый старт без отдельного брокера и меньше расходов

  • Зачем и когда это оправдано
  • Схема таблицы и индексы
  • Выбор задач без конфликтов: FOR UPDATE SKIP LOCKED
  • Ретраи, паузы и предельные попытки
  • Дедупликация задач и уникальные ключи
  • Быстрые реакции на новые задачи: LISTEN/NOTIFY
  • Готовый пример на Go: воркер с батчами и graceful shutdown
  • Наблюдаемость и контроль: метрики и алерты
  • Масштабирование и пределы подхода
  • Типичные ошибки и как их избежать
  • Путь эволюции: от PostgreSQL к брокеру
  • Итоги

Зачем и когда это оправдано

Отправка писем, пересчёт агрегатов, вебхуки, генерация отчетов — для всего этого обычно тянут отдельный брокер (RabbitMQ, Kafka, SQS). Но во многих продуктах это усложняет инфраструктуру и увеличивает время вывода фич. Если у вас:

  • один основной Postgres;
  • десятки–сотни тысяч задач в сутки, а не миллионы в минуту;
  • критичны простота и скорость запуска;

то очередь на PostgreSQL — отличный компромисс. Плюсы для бизнеса:

  • меньше систем — меньше инцидентов и расходов на поддержку;
  • транзакционная запись задач: положили в ту же БД, что и бизнес‑данные, — не потеряли;
  • быстрый старт: один SQL‑миграционный файл и небольшой воркер.

Ниже — полноценная схема и рабочий код. Подход даёт «по крайней мере один раз» доставку (at-least-once). Поэтому обработчики должны быть идемпотентными: повторный запуск не должен ломать данные.

Схема таблицы и индексы

Создадим таблицу задач, индексы и удобные функции — вставка с дедупликацией и retry‑delay.

-- Схема очереди задач
create table if not exists job (
  id            bigserial primary key,
  queue         text        not null, -- имя очереди/темы
  priority      int         not null default 100, -- ниже число — выше приоритет
  status        text        not null default 'queued' check (status in ('queued','running','done','failed','cancelled')),
  run_at        timestamptz not null default now(), -- когда можно брать в работу
  attempts      int         not null default 0,     -- сколько раз пытались
  max_attempts  int         not null default 20,
  unique_key    text,                               -- для дедупликации (по желанию)
  last_error    text,
  payload       jsonb       not null,
  created_at    timestamptz not null default now(),
  updated_at    timestamptz not null default now()
);

-- Быстрый выбор очереди к обработке
create index if not exists idx_job_queued on job (queue, priority, run_at, id) where status = 'queued';

-- Дедупликация: активная задача с таким ключом может быть только одна
create unique index if not exists uniq_job_active_key on job (unique_key) where unique_key is not null and status in ('queued','running');

-- Триггер на updated_at
create or replace function set_updated_at() returns trigger as $$
begin
  new.updated_at = now();
  return new;
end; $$ language plpgsql;

drop trigger if exists trg_job_updated_at on job;
create trigger trg_job_updated_at before update on job
for each row execute function set_updated_at();

-- Экспоненциальная пауза между ретраями (не более часа)
create or replace function retry_delay(p_attempts int) returns interval as $$
declare
  -- с 1-й попытки: 10с, 2-я: 20с, 3-я: 40с ... до 1 часа
  secs int := least(3600, (power(2, greatest(p_attempts, 1))::int) * 10);
begin
  return make_interval(secs => secs);
end; $$ language plpgsql immutable;

-- Удобная функция постановки задачи с опциональной дедупликацией и уведомлением воркеров
create or replace function enqueue_job(
  p_queue text,
  p_payload jsonb,
  p_priority int default 100,
  p_run_at timestamptz default now(),
  p_unique_key text default null,
  p_max_attempts int default 20
) returns bigint as $$
declare
  v_id bigint;
begin
  if p_unique_key is not null then
    insert into job(queue, payload, priority, run_at, unique_key, max_attempts)
    values (p_queue, p_payload, p_priority, p_run_at, p_unique_key, p_max_attempts)
    on conflict (unique_key) do update
      set payload = excluded.payload,
          priority = excluded.priority,
          run_at = excluded.run_at,
          max_attempts = excluded.max_attempts
    returning id into v_id;
  else
    insert into job(queue, payload, priority, run_at, max_attempts)
    values (p_queue, p_payload, p_priority, p_run_at, p_max_attempts)
    returning id into v_id;
  end if;

  perform pg_notify('queue_' || p_queue, v_id::text);
  return v_id;
end; $$ language plpgsql;

Почему так:

  • partial‑индекс по статусу=queued делает выбор быстрым и не мешает завершённым задачам;
  • unique_key предотвращает дубли (например, «отправить отчёт за 2026‑01 пользователю X»);
  • run_at позволяет откладывать задачи и делать backoff по расписанию.

Выбор задач без конфликтов: FOR UPDATE SKIP LOCKED

Чтобы десятки воркеров не схватили одну и ту же задачу, используем одновременное блокирующее чтение и пропуск уже занятых записей:

-- Забрать пачку задач в работу атомарно
with cte as (
  select id
  from job
  where status = 'queued' and queue = $1 and run_at <= now()
  order by priority asc, run_at asc, id asc
  for update skip locked
  limit $2
)
update job j
set status = 'running', attempts = attempts + 1, updated_at = now()
from cte
where j.id = cte.id
returning j.id, j.payload, j.attempts, j.max_attempts;
  • FOR UPDATE делает запись «занятой» текущей транзакцией;
  • SKIP LOCKED заставляет другие воркеры пропускать занятые строки, а не ждать — никаких дедлоков и очередей на локи;
  • UPDATE в одном выражении — атомарно: взяли и отметили как running.

Ретраи, паузы и предельные попытки

Если обработчик вернул ошибку, возвращаем задачу в очередь с паузой. Когда попыток стало слишком много — ставим статус failed.

-- Успешно
update job set status = 'done' where id = $1;

-- Ошибка: отложить или пометить как failed
update job
set status = case when attempts >= max_attempts then 'failed' else 'queued' end,
    run_at = case when attempts >= max_attempts then run_at else now() + retry_delay(attempts) end,
    last_error = $2
where id = $1;

Важно: attempts мы увеличили при взятии в работу. Значит, retry_delay(attempts) уже учитывает текущую попытку.

Дедупликация задач и уникальные ключи

Частый кейс: одна и та же логическая задача может быть создана много раз (например, система дергает пересчёт сегмента). Чтобы не перемалывать одинаковое, используйте unique_key. Функция enqueue_job делает UPSERT — либо создаёт новую запись, либо обновляет существующую queued/running. Это особенно полезно для «последнего состояния» (например, «пересчитать до версии N»).

Быстрые реакции на новые задачи: LISTEN/NOTIFY

Постоянный поллинг раз в 100–500 мс — нормальная стратегия, но можно снизить лишние запросы. После вставки делаем NOTIFY, а воркеры делают LISTEN и просыпаются мгновенно.

Канал формируем как queue_<имя_очереди>, чтобы у каждого типа задач был свой сигнал.

Готовый пример на Go: воркер с батчами и graceful shutdown

Ниже — минимальный, но рабочий пример. Он:

  • подключается к Postgres (pgx) и слушает уведомления по каналу очереди;
  • каждые N миллисекунд проверяет очередь и берёт пачку задач без конфликта;
  • обрабатывает задания последовательно (для простоты) и корректно завершает работу по сигналу ОС;
  • показывает, как помечать успех/ошибку с retry.

Соберите переменную окружения DATABASE_URL (например, postgres://user

@localhost
/app?sslmode=disable).

package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"
)

type Job struct {
	ID          int64
	Payload     map[string]any
	Attempts    int32
	MaxAttempts int32
}

func main() {
	dsn := os.Getenv("DATABASE_URL")
	if dsn == "" {
		log.Fatal("DATABASE_URL is not set")
	}

	queue := getenv("QUEUE", "email")
	batchSize := getenvInt("BATCH_SIZE", 10)
	pollEvery := getenvDuration("POLL_MS", 200*time.Millisecond)

	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	pool, err := pgxpool.New(ctx, dsn)
	if err != nil {
		log.Fatalf("pgxpool: %v", err)
	}
	defer pool.Close()

	// Отдельное соединение для LISTEN
	listener, err := pgx.Connect(ctx, dsn)
	if err != nil {
		log.Fatalf("listener connect: %v", err)
	}
	defer listener.Close(ctx)

	channel := "queue_" + queue
	if _, err := listener.Exec(ctx, "listen "+pgx.Identifier{channel}.Sanitize()); err != nil {
		log.Fatalf("listen: %v", err)
	}
	log.Printf("listening on channel=%s queue=%s", channel, queue)

	for {
		if ctx.Err() != nil {
			break
		}

		jobs, err := fetchAndLockJobs(ctx, pool, queue, batchSize)
		if err != nil {
			log.Printf("fetch: %v", err)
			// Небольшая пауза, чтобы не крутить цикл при ошибке подключения
			time.Sleep(500 * time.Millisecond)
			continue
		}

		if len(jobs) > 0 {
			for _, j := range jobs {
				if err := handleJob(ctx, pool, j); err != nil {
					log.Printf("job %d error: %v", j.ID, err)
				}
			}
			continue
		}

		// Если задач нет — ждём уведомление или таймаут поллинга
		waitCtx, cancel := context.WithTimeout(ctx, pollEvery)
		_, err = listener.WaitForNotification(waitCtx)
		cancel()
		// По таймауту просто продолжим цикл (поллинг), по ошибке контекста — тоже
	}

	log.Println("graceful shutdown")
}

func fetchAndLockJobs(ctx context.Context, pool *pgxpool.Pool, queue string, limit int) ([]Job, error) {
	const q = `
with cte as (
  select id
  from job
  where status = 'queued' and queue = $1 and run_at <= now()
  order by priority asc, run_at asc, id asc
  for update skip locked
  limit $2
)
update job j
set status = 'running', attempts = attempts + 1, updated_at = now()
from cte
where j.id = cte.id
returning j.id, j.payload, j.attempts, j.max_attempts;
`
	rows, err := pool.Query(ctx, q, queue, limit)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	jobs := make([]Job, 0, limit)
	for rows.Next() {
		var (
			id int64
			payloadBytes []byte
			attempts int32
			maxAttempts int32
		)
		if err := rows.Scan(&id, &payloadBytes, &attempts, &maxAttempts); err != nil {
			return nil, err
		}
		var payload map[string]any
		if err := json.Unmarshal(payloadBytes, &payload); err != nil {
			return nil, err
		}
		jobs = append(jobs, Job{ID: id, Payload: payload, Attempts: attempts, MaxAttempts: maxAttempts})
	}
	return jobs, rows.Err()
}

func handleJob(ctx context.Context, pool *pgxpool.Pool, j Job) error {
	// Пример обработчика: имитируем работу 100–300 мс
	dur := 100*time.Millisecond + time.Duration(j.ID%200)*time.Millisecond
	time.Sleep(dur)

	// Можно управлять ошибками через payload, например {"force_error": true}
	if b, ok := j.Payload["force_error"].(bool); ok && b {
		return failJob(ctx, pool, j.ID, errors.New("forced error"))
	}

	return completeJob(ctx, pool, j.ID)
}

func completeJob(ctx context.Context, pool *pgxpool.Pool, id int64) error {
	cmd, err := pool.Exec(ctx, "update job set status='done' where id=$1", id)
	if err != nil {
		return err
	}
	if cmd.RowsAffected() == 0 {
		return fmt.Errorf("job %d not found", id)
	}
	return nil
}

func failJob(ctx context.Context, pool *pgxpool.Pool, id int64, cause error) error {
	msg := cause.Error()
	const q = `
update job
set status = case when attempts >= max_attempts then 'failed' else 'queued' end,
    run_at = case when attempts >= max_attempts then run_at else now() + retry_delay(attempts) end,
    last_error = $2,
    updated_at = now()
where id = $1;
`
	_, err := pool.Exec(ctx, q, id, msg)
	return err
}

func getenv(key, def string) string {
	v := os.Getenv(key)
	if v == "" { return def }
	return v
}

func getenvInt(key string, def int) int {
	v := os.Getenv(key)
	if v == "" { return def }
	var x int
	_, err := fmt.Sscanf(v, "%d", &x)
	if err != nil { return def }
	return x
}

func getenvDuration(key string, def time.Duration) time.Duration {
	v := os.Getenv(key)
	if v == "" { return def }
	d, err := time.ParseDuration(v+"ms")
	if err != nil { return def }
	return d
}

Замечания к коду:

  • listener.WaitForNotification ждёт сигнал по каналу очереди, но при его отсутствии через pollEvery мы всё равно проверяем БД;
  • обработка задач последовательная для краткости — для большей пропускной способности распараллельте внутри цикла;
  • для нескольких очередей запустите несколько экземпляров воркера с разными QUEUE.

Наблюдаемость и контроль: метрики и алерты

Что стоит мерить сразу:

  • глубина очереди по queue и статусу (queued/running/failed);
  • время ожидания в очереди: now() - created_at для queued;
  • успешные/ошибочные обработки и распределение attempts;
  • длительность задачи (от running до done);
  • доля задач, упавших в failed.

Простые алерты:

  • queued > N в течение M минут;
  • p95 «время ожидания» > SLA;
  • всплеск failed/attempts — вероятно, внешний сервис недоступен.

Масштабирование и пределы подхода

  • На одной таблице и индексе такой подход спокойно тянет десятки тысяч задач в минуту на среднем инстансе Postgres при грамотных индексах и коротких транзакциях.
  • Горизонтально масштабируется на уровне воркеров: добавьте процессы — и пропускная растёт.
  • Горячие ключи: если все задачи одной очереди с приоритетом 0 — следите, чтобы SELECT не перебирал много строк. Часто помогает сортировка по (priority, run_at, id) и ограничение batchSize.
  • Хранение истории: done/failed засоряют таблицу. Регулярно чистите старые записи (например, хранить 7–30 дней) или архивируйте в отдельную таблицу.

Типичные ошибки и как их избежать

  • Длинные транзакции в воркере: держат блокировки дольше, увеличивают конкуренцию. Делайте работу с внешними сервисами ВНЕ транзакции, фиксируйте изменение статуса как можно раньше/позже по необходимости.
  • Отсутствие partial‑индекса по queued: выбор становится медленным при росте таблицы.
  • Слишком агрессивный поллинг: 10 воркеров × 10 rps × пустая очередь = лишняя нагрузка. LISTEN/NOTIFY помогает снизить шум.
  • Неидемпотентные обработчики: при at‑least‑once вы получите дубликаты выполнения. Сохраняйте «ключ повторов» на уровне домена (например, idempotency_key) или проверяйте, применялись ли изменения.
  • Бесконечные ретраи: всегда ограничивайте max_attempts, иначе одна «битая» задача будет грузить систему бесконечно.

Путь эволюции: от PostgreSQL к брокеру

Когда стоит задуматься о миграции на специализированный брокер:

  • сотни тысяч задач в минуту, нужен горизонтальный масштаб без упора в один узел БД;
  • необходимые семантики, которых нет «из коробки» (например, широковещательная доставка каждому подписчику по событию);
  • требования по задержке < 10 мс и почти мгновенная масштабируемость консьюмеров;
  • отчётность в разрезе потоков/партиций, сложные топологии маршрутизации.

Мигрировать проще, если с первого дня держать обработчики идемпотентными и описать контракт задачи (тип, схема payload). Тогда перенос «транспортного слоя» (Postgres → брокер) будет наименее болезненный.

Итоги

Очередь на PostgreSQL с FOR UPDATE SKIP LOCKED — практичный способ быстро запустить фоновые задачи без новой инфраструктуры. Вы получаете транзакционную постановку, конкурентных воркеров без дедлоков, ретраи с экспоненциальной паузой, дедупликацию и мгновенные уведомления через LISTEN/NOTIFY. Это закрывает 80% повседневных сценариев и снижает расходы. При росте нагрузки и требований вы сможете эволюционировать к брокеру — без переписывания всей бизнес‑логики.


PostgreSQLочереди задачSKIP LOCKED