
Реалтайм — это не про «модно», а про деньги и опыт пользователя:
Но лишние соединения стоят денег. Цель — выбрать протокол, который даёт нужную свежесть данных по минимальной цене и без каскада проблем в безопасности и масштабировании.
Вывод: начинайте с SSE, если вам не нужна двусторонняя связь, и переходите на WebSocket только когда это экономически оправдано.
Ниже — минимальный сервер, который:
# Node.js >= 18
mkdir realtime-demo && cd realtime-demo
npm init -y
npm i express ws jsonwebtoken redis cors uuid
// server.js
import express from 'express';
import cors from 'cors';
import jwt from 'jsonwebtoken';
import { WebSocketServer } from 'ws';
import { createClient } from 'redis';
import { randomUUID } from 'crypto';
const JWT_SECRET = 'dev-secret'; // Для продакшна — хранить в менеджере секретов и ротировать
const PORT = process.env.PORT || 8080;
const REDIS_URL = process.env.REDIS_URL || 'redis://127.0.0.1:6379';
// Redis pub/sub
const redisSub = createClient({ url: REDIS_URL });
const redisPub = createClient({ url: REDIS_URL });
await redisSub.connect();
await redisPub.connect();
// Кольцевой буфер событий по каналу
const RING_SIZE = 1000;
const rings = new Map(); // channel -> { seq, items: Map, order: Array }
function ensureRing(channel) {
if (!rings.has(channel)) {
rings.set(channel, { seq: 0, items: new Map(), order: [] });
}
return rings.get(channel);
}
function pushEvent(channel, payload) {
const ring = ensureRing(channel);
const id = ++ring.seq;
const event = { id, ts: Date.now(), payload };
ring.items.set(id, event);
ring.order.push(id);
if (ring.order.length > RING_SIZE) {
const oldest = ring.order.shift();
ring.items.delete(oldest);
}
return event;
}
function getEventsSince(channel, lastId) {
const ring = ensureRing(channel);
const from = Number(lastId || 0);
const res = [];
for (const id of ring.order) {
if (id > from) res.push(ring.items.get(id));
}
return res;
}
// Память активных подписок
const sseClients = new Map(); // channel -> Set(res)
const wsClients = new Map(); // channel -> Set(ws)
function addSseClient(channel, res) {
if (!sseClients.has(channel)) sseClients.set(channel, new Set());
sseClients.get(channel).add(res);
}
function removeSseClient(channel, res) {
const set = sseClients.get(channel);
if (set) { set.delete(res); if (set.size === 0) sseClients.delete(channel); }
}
function addWsClient(channel, sock) {
if (!wsClients.has(channel)) wsClients.set(channel, new Set());
wsClients.get(channel).add(sock);
}
function removeWsClient(channel, sock) {
const set = wsClients.get(channel);
if (set) { set.delete(sock); if (set.size === 0) wsClients.delete(channel); }
}
// Простой лимит соединений на IP
const connCount = new Map(); // ip -> count
const MAX_CONN_PER_IP = 50;
function incConn(ip) {
const n = (connCount.get(ip) || 0) + 1; connCount.set(ip, n); return n;
}
function decConn(ip) {
const n = (connCount.get(ip) || 1) - 1; if (n <= 0) connCount.delete(ip); else connCount.set(ip, n);
}
// JWT проверка и извлечение пользователя
function verifyToken(token) {
try { return jwt.verify(token, JWT_SECRET); } catch { return null; }
}
const app = express();
app.use(cors({ origin: [/^https?:\/\/localhost(:\d+)?$/], credentials: true }));
app.use(express.json());
// Подписка SSE: /sse?channel=tenant:1:orders
app.get('/sse', (req, res) => {
const ip = req.ip || req.socket.remoteAddress;
if (incConn(ip) > MAX_CONN_PER_IP) { decConn(ip); return res.status(429).end(); }
const token = (req.headers['authorization'] || '').replace('Bearer ', '');
const claims = verifyToken(token);
const channel = String(req.query.channel || '');
if (!claims || !channel || !claims.channels?.includes(channel)) {
decConn(ip); return res.status(403).json({ error: 'forbidden' });
}
res.writeHead(200, {
'Content-Type': 'text/event-stream; charset=utf-8',
'Cache-Control': 'no-cache, no-transform',
'Connection': 'keep-alive',
'X-Accel-Buffering': 'no'
});
res.write(': heartbeat\n\n'); // Чтобы прокси не закрывал
addSseClient(channel, res);
// Догоним пропуски, если был Last-Event-ID
const lastId = req.headers['last-event-id'];
if (lastId) {
const missed = getEventsSince(channel, lastId);
for (const ev of missed) {
res.write(`id: ${ev.id}\n`);
res.write(`event: message\n`);
res.write(`data: ${JSON.stringify(ev.payload)}\n\n`);
}
}
req.on('close', () => {
removeSseClient(channel, res);
decConn(ip);
});
});
// Публикация (для тестов и внутренних сервисов)
app.post('/publish', async (req, res) => {
const token = (req.headers['authorization'] || '').replace('Bearer ', '');
const claims = verifyToken(token);
if (!claims || claims.role !== 'publisher') return res.status(403).json({ error: 'forbidden' });
const { channel, data } = req.body || {};
if (!channel) return res.status(400).json({ error: 'channel required' });
const event = pushEvent(channel, { id: randomUUID(), data });
await redisPub.publish(channel, JSON.stringify(event));
res.json({ ok: true, id: event.id });
});
const server = app.listen(PORT, () => {
console.log('HTTP listening on', PORT);
});
// WebSocket сервер на том же порту
const wss = new WebSocketServer({ noServer: true });
// Heartbeat
function heartbeat() { this.isAlive = true; }
setInterval(() => {
wss.clients.forEach((ws) => {
if (!ws.isAlive) return ws.terminate();
ws.isAlive = false; ws.ping();
});
}, 30000);
// Апгрейд HTTP->WS с проверкой токена и прав
server.on('upgrade', (req, socket, head) => {
if (!req.url.startsWith('/ws')) { socket.destroy(); return; }
const url = new URL(req.url, `http://${req.headers.host}`);
const channel = url.searchParams.get('channel') || '';
const token = url.searchParams.get('token') || '';
const ip = req.socket.remoteAddress;
if (incConn(ip) > MAX_CONN_PER_IP) { decConn(ip); socket.destroy(); return; }
const claims = verifyToken(token);
if (!claims || !channel || !claims.channels?.includes(channel)) { decConn(ip); socket.destroy(); return; }
// Проверка Origin (если есть)
const origin = req.headers['origin'] || '';
if (origin && !/^https?:\/\/localhost(:\d+)?$/.test(origin)) { decConn(ip); socket.destroy(); return; }
wss.handleUpgrade(req, socket, head, (ws) => {
ws.isAlive = true; ws.on('pong', heartbeat);
ws.channel = channel; ws.ip = ip;
addWsClient(channel, ws);
// Догоним пропуски по last_id (опционально)
const lastId = url.searchParams.get('last_id');
if (lastId) {
const missed = getEventsSince(channel, lastId);
for (const ev of missed) {
const msg = JSON.stringify({ id: ev.id, data: ev.payload });
if (ws.readyState === ws.OPEN) ws.send(msg);
}
}
ws.on('message', (raw) => {
// Здесь можно принимать команды клиента; обязательно валидируйте
if (Buffer.byteLength(raw) > 64 * 1024) return ws.close(1009, 'message too big');
// Ничего не делаем в демо
});
ws.on('close', () => { removeWsClient(channel, ws); decConn(ip); });
ws.on('error', () => { /* логируйте по желанию */ });
});
});
// Подписка воркера на все каналы начинается по требованию: подписываемся динамически
const activeRedisSubs = new Set();
async function ensureRedisSub(channel) {
if (activeRedisSubs.has(channel)) return;
activeRedisSubs.add(channel);
await redisSub.subscribe(channel, (message) => {
const ev = JSON.parse(message);
// SSE фан-аут
const sseSet = sseClients.get(channel);
if (sseSet) {
for (const res of sseSet) {
res.write(`id: ${ev.id}\n`);
res.write(`event: message\n`);
res.write(`data: ${JSON.stringify(ev.payload)}\n\n`);
}
}
// WS фан-аут с защитой от переполнения
const wsSet = wsClients.get(channel);
if (wsSet) {
for (const ws of wsSet) {
if (ws.readyState !== ws.OPEN) continue;
if (ws.bufferedAmount > 512 * 1024) { // 512 КБ в очереди — отключаем
ws.close(1013, 'backpressure');
continue;
}
ws.send(JSON.stringify({ id: ev.id, data: ev.payload }));
}
}
});
}
// Перехват публикуемых событий локальным процессом тоже кладёт в буфер (на случай, если Redis недоступен)
app.post('/ensure-channel', (req, res) => {
const { channel } = req.body || {}; if (!channel) return res.status(400).end();
ensureRedisSub(channel).then(() => res.json({ ok: true }));
});
console.log('Realtime server ready');
# 1) Запустите Redis локально или в Docker
# docker run -p 6379:6379 redis:7
# 2) Запуск сервера
node server.js
# 3) Сгенерируйте тестовый токен издателя и подписчика в Node REPL:
# node -e "console.log(require('jsonwebtoken').sign({ role: 'publisher', channels: ['tenant:1:orders'] }, 'dev-secret', { expiresIn: '10m' }))"
# node -e "console.log(require('jsonwebtoken').sign({ sub: 'u1', channels: ['tenant:1:orders'] }, 'dev-secret', { expiresIn: '10m' }))"
# 4) Подключитесь к SSE (например, через curl):
# Замените <TOKEN_SUB> на токен подписчика
curl -H "Authorization: Bearer <TOKEN_SUB>" "http://localhost:8080/sse?channel=tenant:1:orders"
# 5) Отправьте событие
# Замените <TOKEN_PUB> на токен издателя
curl -X POST -H "Authorization: Bearer <TOKEN_PUB>" -H 'Content-Type: application/json' \
-d '{"channel":"tenant:1:orders","data":{"order_id":123,"status":"shipped"}}' \
http://localhost:8080/publish
# 6) Подключитесь по WebSocket (в браузерной консоли):
# const ws = new WebSocket('ws://localhost:8080/ws?channel=tenant:1:orders&token=TOKEN');
# ws.onmessage = (e) => console.log('ev', e.data)
# /etc/nginx/conf.d/realtime.conf
upstream realtime_backend {
server 127.0.0.1:8080;
keepalive 64;
}
server {
listen 80;
server_name localhost;
# SSE и Long Polling
location /sse {
proxy_http_version 1.1;
proxy_set_header Connection '';
proxy_set_header Host $host;
proxy_read_timeout 1h; # не обрывать долгий стрим
proxy_send_timeout 1h;
chunked_transfer_encoding on;
proxy_pass http://realtime_backend;
}
# WebSocket
location /ws {
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_set_header Host $host;
proxy_read_timeout 1h;
proxy_send_timeout 1h;
proxy_pass http://realtime_backend;
}
# Публикация/прочие API
location / {
proxy_http_version 1.1;
proxy_set_header Host $host;
proxy_pass http://realtime_backend;
}
}
Советы:
Пример SLO: 99.5% событий должны доходить до клиента за ≤1 секунду, а не более 0.5% соединений обрываться внезапно в час.
Реалтайм — это про правильный выбор простых решений. В большинстве бизнес‑кейсов достаточно SSE с хорошей авторизацией и буфером для догонки. Когда нужна двусторонняя связь и минимальные задержки — берите WebSocket, но заранее продумайте балансировку, heartbeat и защиту от перегрузок. Пример выше даёт рабочую основу: добавляйте доменные события, метрики и тесты — и у вас будет быстрый, безопасный и экономичный канал обновлений, который реально влияет на выручку и опыт пользователей.