
Фоновые и отложенные операции есть у любого продукта: отправить письма, пересчитать отчёт, подтянуть курс валют, синхронизировать каталог, перестроить кэш, сгенерировать акт. Логичный путь — поставить отдельную очередь (Kafka, RabbitMQ, Redis‑очередь). Но за это платят сложностью, администрированием и счетами за инфраструктуру.
Во многих продуктах трафик на фоновые задачи умеренный и не оправдывает отдельный контур. Если у вас уже есть PostgreSQL, можно обойтись без лишних сервисов: хранить задачи в таблице, безопасно забирать их конкурентно и выполнять. Ключ к надёжности — «правильные» индексы, блокировки FOR UPDATE SKIP LOCKED, таймауты и аккуратные ретраи.
Итог для бизнеса:
Подходит, если:
Лучше взять специализированную очередь, если:
Опишем минимальный, но полноценный формат:
-- Схема очереди задач
CREATE TABLE IF NOT EXISTS task_queue (
id BIGSERIAL PRIMARY KEY,
task_type TEXT NOT NULL,
payload JSONB NOT NULL,
dedupe_key TEXT,
priority INT NOT NULL DEFAULT 0,
status TEXT NOT NULL DEFAULT 'pending'
CHECK (status IN ('pending','reserved','done','failed','dead')),
attempt INT NOT NULL DEFAULT 0,
max_attempts INT NOT NULL DEFAULT 10,
run_at TIMESTAMPTZ NOT NULL DEFAULT now(),
reserved_at TIMESTAMPTZ,
reserved_by TEXT,
timeout INTERVAL NOT NULL DEFAULT INTERVAL '30 seconds',
last_error TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
finished_at TIMESTAMPTZ
);
-- Активные (в очереди или в работе) задачи с одинаковым dedupe_key и типом не дублируем
CREATE UNIQUE INDEX IF NOT EXISTS uniq_active_dedupe
ON task_queue (task_type, dedupe_key)
WHERE dedupe_key IS NOT NULL AND status IN ('pending','reserved');
-- Быстрый выбор задач к исполнению (сортируем по приоритету и времени запуска)
CREATE INDEX IF NOT EXISTS idx_task_pending
ON task_queue (priority DESC, run_at ASC, id)
WHERE status = 'pending';
-- Технический триггер для updated_at
CREATE OR REPLACE FUNCTION set_updated_at()
RETURNS TRIGGER AS $$
BEGIN
NEW.updated_at := now();
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
DROP TRIGGER IF EXISTS trg_set_updated_at ON task_queue;
CREATE TRIGGER trg_set_updated_at
BEFORE UPDATE ON task_queue
FOR EACH ROW EXECUTE FUNCTION set_updated_at();
Почему так:
Нам нужно, чтобы несколько экземпляров приложения безопасно забирали разные задачи. Для этого:
run_at <= now() с сортировкой по приоритету и времени;FOR UPDATE SKIP LOCKED — конкуренты пропустят их;reserved и сохраняем «кто взял» и время.Плюсы: никакой глобальной блокировки, минимальные конфликты, хорошая масштабируемость на десятки параллельных исполнителей.
Гарантия «ровно один раз» в распределённых системах стоит очень дорого. Реалистично — «как минимум один раз». Значит:
run_at по формуле экспоненциальной паузы с небольшим случайным «дрожанием», чтобы исполнители не били в одну точку одновременно.Ниже четыре функции: постановка, выдача, завершение и реанимация «просроченных» задач.
-- Постановка задачи с возможной дедупликацией по (task_type, dedupe_key)
CREATE OR REPLACE FUNCTION enqueue_task(
p_task_type TEXT,
p_payload JSONB,
p_run_at TIMESTAMPTZ DEFAULT now(),
p_priority INT DEFAULT 0,
p_dedupe_key TEXT DEFAULT NULL,
p_max_attempts INT DEFAULT 10,
p_timeout INTERVAL DEFAULT INTERVAL '30 seconds'
) RETURNS task_queue AS $$
DECLARE
rec task_queue;
BEGIN
INSERT INTO task_queue(task_type, payload, run_at, priority, dedupe_key, max_attempts, timeout)
VALUES (p_task_type, p_payload, p_run_at, p_priority, p_dedupe_key, p_max_attempts, p_timeout)
ON CONFLICT (task_type, dedupe_key) WHERE p_dedupe_key IS NOT NULL AND status IN ('pending','reserved')
DO NOTHING
RETURNING * INTO rec;
IF rec.id IS NULL AND p_dedupe_key IS NOT NULL THEN
SELECT * INTO rec
FROM task_queue
WHERE task_type = p_task_type AND dedupe_key = p_dedupe_key AND status IN ('pending','reserved')
ORDER BY id DESC
LIMIT 1;
END IF;
IF rec.id IS NULL THEN
-- Не было конфликта, а RETURNING не сработал? Подстрахуемся выборкой по данным
SELECT * INTO rec
FROM task_queue
WHERE task_type = p_task_type AND payload = p_payload AND run_at = p_run_at
ORDER BY id DESC
LIMIT 1;
END IF;
RETURN rec;
END;
$$ LANGUAGE plpgsql;
-- Выдача и бронирование задач исполнителю
CREATE OR REPLACE FUNCTION fetch_and_reserve(
p_limit INT,
p_worker TEXT
) RETURNS SETOF task_queue AS $$
BEGIN
RETURN QUERY
WITH cte AS (
SELECT id
FROM task_queue
WHERE status = 'pending' AND run_at <= now()
ORDER BY priority DESC, run_at ASC, id ASC
FOR UPDATE SKIP LOCKED
LIMIT p_limit
)
UPDATE task_queue t
SET status = 'reserved', reserved_at = now(), reserved_by = p_worker
FROM cte
WHERE t.id = cte.id
RETURNING t.*;
END;
$$ LANGUAGE plpgsql;
-- Экспоненциальный бэкофф с «дрожанием» (0..1 сек)
CREATE OR REPLACE FUNCTION next_run_at(p_attempt INT)
RETURNS TIMESTAMPTZ AS $$
DECLARE
base INTERVAL := INTERVAL '5 seconds';
jitter_ms INT := (random() * 1000)::INT; -- 0..1000 мс
BEGIN
RETURN now() + base * (2 ^ GREATEST(p_attempt, 0)) + make_interval(secs => 0, millis => jitter_ms);
END;
$$ LANGUAGE plpgsql;
-- Завершение задачи: успех или ошибка с ретраем
CREATE OR REPLACE FUNCTION finish_task(
p_id BIGINT,
p_success BOOLEAN,
p_error TEXT DEFAULT NULL
) RETURNS VOID AS $$
BEGIN
IF p_success THEN
UPDATE task_queue
SET status='done', finished_at=now(), last_error=NULL
WHERE id=p_id AND status='reserved';
ELSE
UPDATE task_queue
SET attempt = attempt + 1,
last_error = p_error,
status = CASE WHEN attempt + 1 >= max_attempts THEN 'dead' ELSE 'pending' END,
run_at = CASE WHEN attempt + 1 >= max_attempts THEN run_at ELSE next_run_at(attempt + 1) END,
reserved_at = NULL,
reserved_by = NULL
WHERE id=p_id AND status='reserved';
END IF;
END;
$$ LANGUAGE plpgsql;
-- Реанимация «зависших»: просроченные резервы возвращаем или помечаем как dead
CREATE OR REPLACE FUNCTION requeue_timed_out(
p_batch INT DEFAULT 1000
) RETURNS INT AS $$
DECLARE
moved INT := 0;
BEGIN
-- В dead, если попыток больше нельзя
WITH cte AS (
SELECT id
FROM task_queue
WHERE status='reserved' AND reserved_at + timeout <= now() AND attempt + 1 >= max_attempts
LIMIT p_batch
)
UPDATE task_queue t
SET status='dead', last_error = COALESCE(last_error,'') || '\nTimeout exceeded', reserved_at=NULL, reserved_by=NULL
FROM cte
WHERE t.id = cte.id;
GET DIAGNOSTICS moved = ROW_COUNT;
-- Остальные вернём в очередь с бэкоффом
WITH cte2 AS (
SELECT id, attempt
FROM task_queue
WHERE status='reserved' AND reserved_at + timeout <= now() AND attempt + 1 < max_attempts
LIMIT GREATEST(p_batch - moved, 0)
)
UPDATE task_queue t
SET attempt = t.attempt + 1,
status = 'pending',
run_at = next_run_at(t.attempt + 1),
reserved_at = NULL,
reserved_by = NULL,
last_error = COALESCE(last_error,'') || '\nRequeued after timeout'
FROM cte2
WHERE t.id = cte2.id;
GET DIAGNOSTICS moved = moved + ROW_COUNT;
RETURN moved;
END;
$$ LANGUAGE plpgsql;
Ниже минимальный, но рабочий пример. Он:
package main
import (
context "context"
"encoding/json"
"fmt"
"log"
"math/rand"
"os"
"os/signal"
"sync"
"syscall"
"time"
"github.com/jackc/pgx/v5/pgxpool"
)
type Task struct {
ID int64 `json:"id"`
TaskType string `json:"task_type"`
Payload json.RawMessage `json:"payload"`
Priority int `json:"priority"`
Attempt int `json:"attempt"`
MaxAttempts int `json:"max_attempts"`
}
func main() {
dsn := os.Getenv("PG_DSN")
if dsn == "" {
dsn = "postgres://postgres:postgres@localhost:5432/postgres?sslmode=disable"
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
pool, err := pgxpool.New(ctx, dsn)
if err != nil { log.Fatalf("pg connect: %v", err) }
defer pool.Close()
workerName := fmt.Sprintf("worker-%d", time.Now().UnixNano())
workers := 8
batch := 32
rand.Seed(time.Now().UnixNano())
// Реаниматор таймаутов
go func() {
t := time.NewTicker(5 * time.Second)
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.C:
var moved int
if err := pool.QueryRow(ctx, "SELECT requeue_timed_out($1)", 1000).Scan(&moved); err != nil {
log.Printf("requeue error: %v", err)
} else if moved > 0 {
log.Printf("requeued %d tasks after timeout", moved)
}
}
}
}()
// Грейсфул-шатдаун
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
sem := make(chan struct{}, workers)
var wg sync.WaitGroup
loop := func() bool {
rows, err := pool.Query(ctx, "SELECT id, task_type, payload, priority, attempt, max_attempts FROM fetch_and_reserve($1,$2)", batch, workerName)
if err != nil {
log.Printf("fetch error: %v", err)
time.Sleep(500 * time.Millisecond)
return true
}
defer rows.Close()
count := 0
for rows.Next() {
var t Task
if err := rows.Scan(&t.ID, &t.TaskType, &t.Payload, &t.Priority, &t.Attempt, &t.MaxAttempts); err != nil {
log.Printf("scan error: %v", err)
continue
}
count++
sem <- struct{}{}
wg.Add(1)
go func(task Task) {
defer func() { <-sem; wg.Done() }()
ok, errMsg := handle(task)
if _, err := pool.Exec(ctx, "SELECT finish_task($1,$2,$3)", task.ID, ok, errMsg); err != nil {
log.Printf("finish error id=%d: %v", task.ID, err)
}
}(t)
}
if count == 0 {
time.Sleep(300 * time.Millisecond)
}
return true
}
go func() {
for loop() {
}
}()
<-sigCh
cancel()
wg.Wait()
log.Println("graceful stop")
}
func handle(t Task) (bool, *string) {
switch t.TaskType {
case "send_email":
var p struct{ To, Subject, Body string }
if err := json.Unmarshal(t.Payload, &p); err != nil {
errMsg := fmt.Sprintf("bad payload: %v", err)
return false, &errMsg
}
// Имитация внешнего вызова
time.Sleep(100*time.Millisecond + time.Duration(rand.Intn(200))*time.Millisecond)
log.Printf("email to=%s subject=%s", p.To, p.Subject)
return true, nil
case "generate_report":
var p struct{ ReportID int64 }
if err := json.Unmarshal(t.Payload, &p); err != nil {
errMsg := fmt.Sprintf("bad payload: %v", err)
return false, &errMsg
}
// Тяжелее: дольше считаем
time.Sleep(500*time.Millisecond + time.Duration(rand.Intn(500))*time.Millisecond)
log.Printf("report generated id=%d", p.ReportID)
return true, nil
default:
errMsg := fmt.Sprintf("unknown task_type=%s", t.TaskType)
return false, &errMsg
}
}
Как поставить задачу из приложения (SQL):
SELECT enqueue_task(
p_task_type => 'send_email',
p_payload => '{"To":"user@example.com","Subject":"Добро пожаловать","Body":"…"}',
p_run_at => now(),
p_priority => 10,
p_dedupe_key => 'welcome-user-42',
p_max_attempts => 8,
p_timeout => INTERVAL '45 seconds'
);
Метрики, которые действительно помогают:
Очистка и хранение истории:
created_at) — это ускорит очистку DROP PARTITION и сохранит планировщику нервы.Масштабирование:
SKIP LOCKED разрулит конкуренцию;timeout и requeue_timed_out в базе, чтобы поведение было единым и прозрачным.Итого: очередь на PostgreSQL с SKIP LOCKED даёт предсказуемое, дешёвое и прозрачное решение для подавляющего большинства фоновых задач в продукте. При правильных индексах, ретраях и мониторинге это крепкая основа, от которой всегда можно эволюционно перейти к отдельной очереди, когда бизнес доростёт до этого объёма.