Kravchenko

Web Lab

АудитБлогКонтакты

Kravchenko

Web Lab

Разрабатываем сайты и автоматизацию на современных фреймворках под ключ

Услуги
ЛендингМногостраничныйВизитка
E-commerceБронированиеПортфолио
Навигация
БлогКонтактыАудит
Обратная связь
+7 921 567-11-16
info@kravlab.ru
с 09:00 до 18:00

© 2026 Все права защищены

•

ИП Кравченко Никита Владимирович

•

ОГРНИП: 324784700339743

Политика конфиденциальности

Очередь задач на PostgreSQL с SKIP LOCKED: отложенные операции без лишних сервисов и затрат

Разработка и технологии1 марта 2026 г.
Как выполнить отложенные и фоновые операции без запуска отдельной очереди сообщений. Разбираем реализацию очереди на PostgreSQL с блокировками SKIP LOCKED: приоритеты, ретраи, таймауты, дедупликация, очистка и мониторинг. Это упрощает инфраструктуру, снижает расходы и остаётся надёжным для большинства бизнес‑кейсов.
Очередь задач на PostgreSQL с SKIP LOCKED: отложенные операции без лишних сервисов и затрат

Оглавление

  • Зачем делать очередь на PostgreSQL
  • Когда решение подходит, а когда уже нет
  • Схема таблицы: статусы, индексы, дедупликация
  • Выдача задач без гонок: SELECT FOR UPDATE SKIP LOCKED
  • Ретраи, таймауты и возврат в очередь
  • Пример готовых SQL‑функций
  • Пример исполнителя на Go: запуск, обработка, подтверждение
  • Эксплуатация: метрики, очистка, масштабирование
  • Риски, грабли и как их обойти
  • Чек‑лист для продакшена

Зачем делать очередь на PostgreSQL

Фоновые и отложенные операции есть у любого продукта: отправить письма, пересчитать отчёт, подтянуть курс валют, синхронизировать каталог, перестроить кэш, сгенерировать акт. Логичный путь — поставить отдельную очередь (Kafka, RabbitMQ, Redis‑очередь). Но за это платят сложностью, администрированием и счетами за инфраструктуру.

Во многих продуктах трафик на фоновые задачи умеренный и не оправдывает отдельный контур. Если у вас уже есть PostgreSQL, можно обойтись без лишних сервисов: хранить задачи в таблице, безопасно забирать их конкурентно и выполнять. Ключ к надёжности — «правильные» индексы, блокировки FOR UPDATE SKIP LOCKED, таймауты и аккуратные ретраи.

Итог для бизнеса:

  • меньше сервисов — ниже затраты и меньше инцидентов;
  • предсказуемость: транзакции БД обеспечивают атомарность постановки и выдачи задач;
  • простота: один бэкап для данных и задач, одна политика доступа, одна точка мониторинга.

Когда решение подходит, а когда уже нет

Подходит, если:

  • суммарно до десятков тысяч задач в минуту (в одном регионе), нет экстремально тяжёлых «шипов»;
  • важна простота и открытость (SQL видно и понятно, легко дебажить);
  • нужна строгая консистентность при постановке задач вместе с бизнес‑данными (в одной транзакции).

Лучше взять специализированную очередь, если:

  • сотни тысяч задач в секунду, распределённые регионы, порядок доставки на уровне партиций;
  • критичны почти нулевые задержки на миллисекундах и гарантия упорядочивания потоков;
  • требуется фан‑аут на десятки тысяч подписчиков событий.

Схема таблицы: статусы, индексы, дедупликация

Опишем минимальный, но полноценный формат:

  • статус: pending/reserved/done/failed/dead;
  • приоритет: чем выше, тем раньше берём задачу;
  • run_at: время, когда задачу можно исполнять (поддерживает отложенный старт и ретраи);
  • попытки, максимум попыток, таймаут удержания;
  • dedupe_key: чтобы не ставить дубликаты «активных» задач одного типа.
-- Схема очереди задач
CREATE TABLE IF NOT EXISTS task_queue (
  id           BIGSERIAL PRIMARY KEY,
  task_type    TEXT        NOT NULL,
  payload      JSONB       NOT NULL,
  dedupe_key   TEXT,
  priority     INT         NOT NULL DEFAULT 0,
  status       TEXT        NOT NULL DEFAULT 'pending'
               CHECK (status IN ('pending','reserved','done','failed','dead')),
  attempt      INT         NOT NULL DEFAULT 0,
  max_attempts INT         NOT NULL DEFAULT 10,
  run_at       TIMESTAMPTZ NOT NULL DEFAULT now(),
  reserved_at  TIMESTAMPTZ,
  reserved_by  TEXT,
  timeout      INTERVAL    NOT NULL DEFAULT INTERVAL '30 seconds',
  last_error   TEXT,
  created_at   TIMESTAMPTZ NOT NULL DEFAULT now(),
  updated_at   TIMESTAMPTZ NOT NULL DEFAULT now(),
  finished_at  TIMESTAMPTZ
);

-- Активные (в очереди или в работе) задачи с одинаковым dedupe_key и типом не дублируем
CREATE UNIQUE INDEX IF NOT EXISTS uniq_active_dedupe
  ON task_queue (task_type, dedupe_key)
  WHERE dedupe_key IS NOT NULL AND status IN ('pending','reserved');

-- Быстрый выбор задач к исполнению (сортируем по приоритету и времени запуска)
CREATE INDEX IF NOT EXISTS idx_task_pending
  ON task_queue (priority DESC, run_at ASC, id)
  WHERE status = 'pending';

-- Технический триггер для updated_at
CREATE OR REPLACE FUNCTION set_updated_at()
RETURNS TRIGGER AS $$
BEGIN
  NEW.updated_at := now();
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

DROP TRIGGER IF EXISTS trg_set_updated_at ON task_queue;
CREATE TRIGGER trg_set_updated_at
BEFORE UPDATE ON task_queue
FOR EACH ROW EXECUTE FUNCTION set_updated_at();

Почему так:

  • частичный уникальный индекс по dedupe_key спасает от дублей «активных» задач;
  • частичный индекс по pending экономит место и ускоряет выборку исполнителей;
  • отдельные поля таймаута и попыток позволяют строить предсказуемые ретраи.

Выдача задач без гонок: SELECT FOR UPDATE SKIP LOCKED

Нам нужно, чтобы несколько экземпляров приложения безопасно забирали разные задачи. Для этого:

  • выбираем pending‑задачи с run_at <= now() с сортировкой по приоритету и времени;
  • блокируем выбранные строки FOR UPDATE SKIP LOCKED — конкуренты пропустят их;
  • помечаем задачи как reserved и сохраняем «кто взял» и время.

Плюсы: никакой глобальной блокировки, минимальные конфликты, хорошая масштабируемость на десятки параллельных исполнителей.

Ретраи, таймауты и возврат в очередь

Гарантия «ровно один раз» в распределённых системах стоит очень дорого. Реалистично — «как минимум один раз». Значит:

  • исполнитель должен быть идемпотентен (безопасно повторять задачу);
  • если задача «зависла» у умершего процесса, по таймауту её надо вернуть в очередь;
  • при ошибке увеличиваем счётчик попыток и переносим run_at по формуле экспоненциальной паузы с небольшим случайным «дрожанием», чтобы исполнители не били в одну точку одновременно.

Пример готовых SQL‑функций

Ниже четыре функции: постановка, выдача, завершение и реанимация «просроченных» задач.

-- Постановка задачи с возможной дедупликацией по (task_type, dedupe_key)
CREATE OR REPLACE FUNCTION enqueue_task(
  p_task_type    TEXT,
  p_payload      JSONB,
  p_run_at       TIMESTAMPTZ DEFAULT now(),
  p_priority     INT         DEFAULT 0,
  p_dedupe_key   TEXT        DEFAULT NULL,
  p_max_attempts INT         DEFAULT 10,
  p_timeout      INTERVAL    DEFAULT INTERVAL '30 seconds'
) RETURNS task_queue AS $$
DECLARE
  rec task_queue;
BEGIN
  INSERT INTO task_queue(task_type, payload, run_at, priority, dedupe_key, max_attempts, timeout)
  VALUES (p_task_type, p_payload, p_run_at, p_priority, p_dedupe_key, p_max_attempts, p_timeout)
  ON CONFLICT (task_type, dedupe_key) WHERE p_dedupe_key IS NOT NULL AND status IN ('pending','reserved')
  DO NOTHING
  RETURNING * INTO rec;

  IF rec.id IS NULL AND p_dedupe_key IS NOT NULL THEN
    SELECT * INTO rec
    FROM task_queue
    WHERE task_type = p_task_type AND dedupe_key = p_dedupe_key AND status IN ('pending','reserved')
    ORDER BY id DESC
    LIMIT 1;
  END IF;

  IF rec.id IS NULL THEN
    -- Не было конфликта, а RETURNING не сработал? Подстрахуемся выборкой по данным
    SELECT * INTO rec
    FROM task_queue
    WHERE task_type = p_task_type AND payload = p_payload AND run_at = p_run_at
    ORDER BY id DESC
    LIMIT 1;
  END IF;

  RETURN rec;
END;
$$ LANGUAGE plpgsql;

-- Выдача и бронирование задач исполнителю
CREATE OR REPLACE FUNCTION fetch_and_reserve(
  p_limit  INT,
  p_worker TEXT
) RETURNS SETOF task_queue AS $$
BEGIN
  RETURN QUERY
  WITH cte AS (
    SELECT id
    FROM task_queue
    WHERE status = 'pending' AND run_at <= now()
    ORDER BY priority DESC, run_at ASC, id ASC
    FOR UPDATE SKIP LOCKED
    LIMIT p_limit
  )
  UPDATE task_queue t
  SET status = 'reserved', reserved_at = now(), reserved_by = p_worker
  FROM cte
  WHERE t.id = cte.id
  RETURNING t.*;
END;
$$ LANGUAGE plpgsql;

-- Экспоненциальный бэкофф с «дрожанием» (0..1 сек)
CREATE OR REPLACE FUNCTION next_run_at(p_attempt INT)
RETURNS TIMESTAMPTZ AS $$
DECLARE
  base INTERVAL := INTERVAL '5 seconds';
  jitter_ms INT := (random() * 1000)::INT; -- 0..1000 мс
BEGIN
  RETURN now() + base * (2 ^ GREATEST(p_attempt, 0)) + make_interval(secs => 0, millis => jitter_ms);
END;
$$ LANGUAGE plpgsql;

-- Завершение задачи: успех или ошибка с ретраем
CREATE OR REPLACE FUNCTION finish_task(
  p_id      BIGINT,
  p_success BOOLEAN,
  p_error   TEXT DEFAULT NULL
) RETURNS VOID AS $$
BEGIN
  IF p_success THEN
    UPDATE task_queue
    SET status='done', finished_at=now(), last_error=NULL
    WHERE id=p_id AND status='reserved';
  ELSE
    UPDATE task_queue
    SET attempt = attempt + 1,
        last_error = p_error,
        status = CASE WHEN attempt + 1 >= max_attempts THEN 'dead' ELSE 'pending' END,
        run_at = CASE WHEN attempt + 1 >= max_attempts THEN run_at ELSE next_run_at(attempt + 1) END,
        reserved_at = NULL,
        reserved_by = NULL
    WHERE id=p_id AND status='reserved';
  END IF;
END;
$$ LANGUAGE plpgsql;

-- Реанимация «зависших»: просроченные резервы возвращаем или помечаем как dead
CREATE OR REPLACE FUNCTION requeue_timed_out(
  p_batch INT DEFAULT 1000
) RETURNS INT AS $$
DECLARE
  moved INT := 0;
BEGIN
  -- В dead, если попыток больше нельзя
  WITH cte AS (
    SELECT id
    FROM task_queue
    WHERE status='reserved' AND reserved_at + timeout <= now() AND attempt + 1 >= max_attempts
    LIMIT p_batch
  )
  UPDATE task_queue t
  SET status='dead', last_error = COALESCE(last_error,'') || '\nTimeout exceeded', reserved_at=NULL, reserved_by=NULL
  FROM cte
  WHERE t.id = cte.id;

  GET DIAGNOSTICS moved = ROW_COUNT;

  -- Остальные вернём в очередь с бэкоффом
  WITH cte2 AS (
    SELECT id, attempt
    FROM task_queue
    WHERE status='reserved' AND reserved_at + timeout <= now() AND attempt + 1 < max_attempts
    LIMIT GREATEST(p_batch - moved, 0)
  )
  UPDATE task_queue t
  SET attempt = t.attempt + 1,
      status = 'pending',
      run_at = next_run_at(t.attempt + 1),
      reserved_at = NULL,
      reserved_by = NULL,
      last_error = COALESCE(last_error,'') || '\nRequeued after timeout'
  FROM cte2
  WHERE t.id = cte2.id;

  GET DIAGNOSTICS moved = moved + ROW_COUNT;
  RETURN moved;
END;
$$ LANGUAGE plpgsql;

Пример исполнителя на Go: запуск, обработка, подтверждение

Ниже минимальный, но рабочий пример. Он:

  • забирает пачку задач и исполняет их параллельно с ограничением по числу воркеров;
  • периодически «реанимирует» просроченные задачи;
  • умеет завершать задачу успешно или с ошибкой (триггеря ретраи SQL‑логикой выше).
package main

import (
	context "context"
	"encoding/json"
	"fmt"
	"log"
	"math/rand"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"

	"github.com/jackc/pgx/v5/pgxpool"
)

type Task struct {
	ID         int64           `json:"id"`
	TaskType   string          `json:"task_type"`
	Payload    json.RawMessage `json:"payload"`
	Priority   int             `json:"priority"`
	Attempt    int             `json:"attempt"`
	MaxAttempts int            `json:"max_attempts"`
}

func main() {
	dsn := os.Getenv("PG_DSN")
	if dsn == "" {
		dsn = "postgres://postgres:postgres@localhost:5432/postgres?sslmode=disable"
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	pool, err := pgxpool.New(ctx, dsn)
	if err != nil { log.Fatalf("pg connect: %v", err) }
	defer pool.Close()

	workerName := fmt.Sprintf("worker-%d", time.Now().UnixNano())
	workers := 8
	batch := 32
	rand.Seed(time.Now().UnixNano())

	// Реаниматор таймаутов
	go func() {
		t := time.NewTicker(5 * time.Second)
		defer t.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-t.C:
				var moved int
				if err := pool.QueryRow(ctx, "SELECT requeue_timed_out($1)", 1000).Scan(&moved); err != nil {
					log.Printf("requeue error: %v", err)
				} else if moved > 0 {
					log.Printf("requeued %d tasks after timeout", moved)
				}
			}
		}
	}()

	// Грейсфул-шатдаун
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)

	sem := make(chan struct{}, workers)
	var wg sync.WaitGroup

	loop := func() bool {
		rows, err := pool.Query(ctx, "SELECT id, task_type, payload, priority, attempt, max_attempts FROM fetch_and_reserve($1,$2)", batch, workerName)
		if err != nil {
			log.Printf("fetch error: %v", err)
			time.Sleep(500 * time.Millisecond)
			return true
		}
		defer rows.Close()

		count := 0
		for rows.Next() {
			var t Task
			if err := rows.Scan(&t.ID, &t.TaskType, &t.Payload, &t.Priority, &t.Attempt, &t.MaxAttempts); err != nil {
				log.Printf("scan error: %v", err)
				continue
			}
			count++
			sem <- struct{}{}
			wg.Add(1)
			go func(task Task) {
				defer func() { <-sem; wg.Done() }()
				ok, errMsg := handle(task)
				if _, err := pool.Exec(ctx, "SELECT finish_task($1,$2,$3)", task.ID, ok, errMsg); err != nil {
					log.Printf("finish error id=%d: %v", task.ID, err)
				}
			}(t)
		}

		if count == 0 {
			time.Sleep(300 * time.Millisecond)
		}
		return true
	}

	go func() {
		for loop() {
		}
	}()

	<-sigCh
	cancel()
	wg.Wait()
	log.Println("graceful stop")
}

func handle(t Task) (bool, *string) {
	switch t.TaskType {
	case "send_email":
		var p struct{ To, Subject, Body string }
		if err := json.Unmarshal(t.Payload, &p); err != nil {
			errMsg := fmt.Sprintf("bad payload: %v", err)
			return false, &errMsg
		}
		// Имитация внешнего вызова
		time.Sleep(100*time.Millisecond + time.Duration(rand.Intn(200))*time.Millisecond)
		log.Printf("email to=%s subject=%s", p.To, p.Subject)
		return true, nil
	case "generate_report":
		var p struct{ ReportID int64 }
		if err := json.Unmarshal(t.Payload, &p); err != nil {
			errMsg := fmt.Sprintf("bad payload: %v", err)
			return false, &errMsg
		}
		// Тяжелее: дольше считаем
		time.Sleep(500*time.Millisecond + time.Duration(rand.Intn(500))*time.Millisecond)
		log.Printf("report generated id=%d", p.ReportID)
		return true, nil
	default:
		errMsg := fmt.Sprintf("unknown task_type=%s", t.TaskType)
		return false, &errMsg
	}
}

Как поставить задачу из приложения (SQL):

SELECT enqueue_task(
  p_task_type    => 'send_email',
  p_payload      => '{"To":"user@example.com","Subject":"Добро пожаловать","Body":"…"}',
  p_run_at       => now(),
  p_priority     => 10,
  p_dedupe_key   => 'welcome-user-42',
  p_max_attempts => 8,
  p_timeout      => INTERVAL '45 seconds'
);

Эксплуатация: метрики, очистка, масштабирование

Метрики, которые действительно помогают:

  • глубина очереди: количество pending по типам задач;
  • задержка выполнения: p50/p95 (now() − run_at) для pending;
  • успехи/ошибки и распределение попыток;
  • число «мертвых» задач (dead) и причина;
  • доля таймаутов (сколько requeue_timed_out за период).

Очистка и хранение истории:

  • не копите завершённые задачи бесконечно. Дешёвый вариант: раз в день переносить done/failed/dead старше X дней в архивную таблицу или выгружать в объектное хранилище;
  • для очень больших объёмов используйте секционирование по дате создания (created_at) — это ускорит очистку DROP PARTITION и сохранит планировщику нервы.

Масштабирование:

  • добавляйте инстансы исполнителей линейно — SKIP LOCKED разрулит конкуренцию;
  • держите размер пачки и число воркеров разумными: слишком большие пачки увеличивают «невидимый хвост» задержек;
  • если очередь систематически не успевает (высокая задержка), поднимайте воркеров и/или приоритеты для критичных типов задач.

Риски, грабли и как их обойти

  • «Ровно один раз» не бывает — делайте обработчики идемпотентными (повтор безопасен).
  • Не вставляйте тяжёлые вложенные SELECT’ы в обработчики: они конкурируют с бизнес‑транзакциями. Лучше готовьте данные заранее или кэшируйте.
  • Следите за автovacuum: бурный поток вставок/апдейтов в очереди может требовать более агрессивных настроек для конкретной таблицы (порогов и scale_factor).
  • Не злоупотребляйте TTL на стороне приложения — используйте timeout и requeue_timed_out в базе, чтобы поведение было единым и прозрачным.
  • Дедупликация — палка о двух концах. Убедитесь, что логика dedupe_key адекватна: не «заедает» важные задачи и не мешает новой версии задачи вытеснить старую (если нужно — делайте явное обновление существующей записи по ключу).

Чек‑лист для продакшена

  • Таблица, индексы, функции — задеплоены миграцией.
  • Мониторинг: глубина очереди, задержка, проценты ошибок, dead‑задачи.
  • Ретенции: политика очистки done/failed/dead, секционирование при нужных объёмах.
  • Перезапуски: исполнители обрабатывают SIGTERM, завершают начатое и отдают незавершённое по таймауту обратно в очередь.
  • Тест нагрузкой: убедитесь, что при пиковых вставках и обработке база держит плановые SLO.
  • План «вырасти из решения»: если объём выйдет за рамки, заранее определите, как будете выносить самые шумные типы задач в специализированную очередь — обычно это точечно и не больно.

Итого: очередь на PostgreSQL с SKIP LOCKED даёт предсказуемое, дешёвое и прозрачное решение для подавляющего большинства фоновых задач в продукте. При правильных индексах, ретраях и мониторинге это крепкая основа, от которой всегда можно эволюционно перейти к отдельной очереди, когда бизнес доростёт до этого объёма.


PostgreSQLSKIP LOCKEDочередь задач