Блог инженера

History is written by its contributors

NSQ: идеальная очередь сообщений для real-time систем (когда Redis уже не справляется)

2025-04-21 время чтения 8 мин Distributed-Systems Message-Queues Nsq Ilya Brin

Привет, архитектор распределённых систем! 🚀

Замучился с настройкой RabbitMQ?
Устал от потери сообщений в Redis?
Нужно что-то, что просто работает на любом масштабе?

NSQ - очередью сообщений, которая используется в Discord, Bitly и других высоконагруженных системах.

На примерах разберёмся, когда NSQ не просто полезен, а абсолютно идеален для задачи.

1. Что такое NSQ и чем он отличается?

NSQ простыми словами

NSQ - это распределённая платформа обмена сообщениями в реальном времени, созданная для работы в масштабе, обрабатывающая миллиарды сообщений в день. Она не требует топологии, устраняет единые точки отказа и гарантирует доставку сообщений.

Основные компоненты:

  • nsqd - демон, который получает, ставит в очередь и доставляет сообщения
  • nsqadmin - веб-интерфейс для администрирования кластера в реальном времени
  • nsqlookupd - управляет информацией о топологии и обеспечивает service discovery

Где NSQ блистает

Другие очереди страдают от:

  • Сложной кластеризации и настройки
  • Единых точек отказа
  • Потери сообщений при сбоях
  • Сложного горизонтального масштабирования

NSQ превосходит в:

  • Кластеризации без настройки
  • Автоматическом failover и восстановлении
  • Гарантированной доставке сообщений
  • Горизонтальном масштабировании без координации

Ключевая мысль:

NSQ создали в Bitly для обработки их огромного трафика сокращения URL. Если он справляется с миллиардами кликов в день, он справится с твоей нагрузкой.

2. Идеальный кейс: real-time аналитика

🔥 Проблема

Представь, что ты строишь систему аналитики в реальном времени как Mixpanel или Amplitude. Тебе нужно:

  • Обрабатывать их в реальном времени
  • Собирать миллионы событий в секунду
  • Масштабироваться горизонтально по мере роста
  • Справляться с пиками трафика без потери данных
// Структура события для аналитики
type AnalyticsEvent struct {
    UserID     string                 `json:"user_id"`
    EventType  string                 `json:"event_type"`
    Properties map[string]any `json:"properties"`
    Timestamp  int64                  `json:"timestamp"`
    SessionID  string                 `json:"session_id"`
    DeviceID   string                 `json:"device_id"`
}

// Сервис сбора событий
type EventCollector struct {
    producer *nsq.Producer
    config   *nsq.Config
}

func NewEventCollector(nsqdAddr string) (*EventCollector, error) {
    config := nsq.NewConfig()
    producer, err := nsq.NewProducer(nsqdAddr, config)
    if err != nil {
        return nil, err
    }
    
    return &EventCollector{
        producer: producer,
        config:   config,
    }, nil
}

// Собираем и ставим события в очередь для обработки
func (ec *EventCollector) CollectEvent(event *AnalyticsEvent) error {
    // Добавляем серверный timestamp
    event.Timestamp = time.Now().UnixNano()
    
    // Сериализуем событие
    data, err := json.Marshal(event)
    if err != nil {
        return err
    }
    
    // Публикуем в NSQ топик
    return ec.producer.Publish("analytics_events", data)
}

🔥 Почему NSQ здесь идеален

1. Никаких потерь сообщений - События слишком ценны, чтобы их терять 2. Горизонтальное масштабирование - Добавляй больше nsqd инстансов по мере роста трафика 3. Множественные потребители - Разные сервисы могут обрабатывать одни и те же события 4. Автоматический failover - Если один nsqd умирает, другие берут на себя нагрузку 5. Простота эксплуатации - Минимальная настройка и администрирование

3. Воркеры обработки событий

роцессор событий в реальном времени

type EventProcessor struct {
    consumer     *nsq.Consumer
    clickhouse   *sql.DB
    redis        *redis.Client
    rateLimiter  *rate.Limiter
}

func NewEventProcessor(lookupAddr string) (*EventProcessor, error) {
    config := nsq.NewConfig()
    config.MaxInFlight = 1000 // Обрабатываем до 1000 сообщений одновременно
    
    consumer, err := nsq.NewConsumer("analytics_events", "processor", config)
    if err != nil {
        return nil, err
    }
    
    processor := &EventProcessor{
        consumer:    consumer,
        rateLimiter: rate.NewLimiter(rate.Limit(10000), 1000), // 10k событий/сек
    }
    
    // Устанавливаем обработчик сообщений
    consumer.AddHandler(processor)
    
    // Подключаемся к nsqlookupd для автоматического discovery
    err = consumer.ConnectToNSQLookupd(lookupAddr)
    if err != nil {
        return nil, err
    }
    
    return processor, nil
}

// HandleMessage обрабатывает каждое аналитическое событие
func (ep *EventProcessor) HandleMessage(message *nsq.Message) error {
    // Rate limiting для предотвращения перегрузки downstream систем
    if !ep.rateLimiter.Allow() {
        message.Requeue(time.Second)
        return nil
    }
    
    var event AnalyticsEvent
    if err := json.Unmarshal(message.Body, &event); err != nil {
        // Невалидный JSON - завершаем сообщение, чтобы избежать бесконечного requeue
        return nil
    }
    
    // Обрабатываем событие параллельно
    go func() {
        ep.processEvent(&event)
        ep.updateRealTimeMetrics(&event)
        ep.triggerAlerts(&event)
    }()
    
    return nil
}

func (ep *EventProcessor) processEvent(event *AnalyticsEvent) {
    // Сохраняем в ClickHouse для аналитики
    query := `
        INSERT INTO events (user_id, event_type, properties, timestamp, session_id, device_id)
        VALUES (?, ?, ?, ?, ?, ?)
    `
    
    propertiesJSON, _ := json.Marshal(event.Properties)
    ep.clickhouse.Exec(query,
        event.UserID,
        event.EventType,
        string(propertiesJSON),
        event.Timestamp,
        event.SessionID,
        event.DeviceID,
    )
}

func (ep *EventProcessor) updateRealTimeMetrics(event *AnalyticsEvent) {
    // Обновляем счётчики в реальном времени в Redis
    pipe := ep.redis.Pipeline()
    
    // Увеличиваем глобальные счётчики
    pipe.Incr(fmt.Sprintf("events:total:%s", time.Now().Format("2006-01-02-15")))
    pipe.Incr(fmt.Sprintf("events:type:%s:%s", event.EventType, time.Now().Format("2006-01-02-15")))
    
    // Обновляем активность пользователя
    pipe.Set(fmt.Sprintf("user:last_seen:%s", event.UserID), time.Now().Unix(), time.Hour*24)
    
    pipe.Exec()
}

4. Продвинутый кейс: многоэтапный pipeline обработки

Сложная обработка событий с несколькими этапами

// Многоэтапный pipeline: Сырые События → Обогащённые События → Агрегированные Метрики
type PipelineManager struct {
    enricher   *EventEnricher
    aggregator *EventAggregator
    alerter    *AlertManager
}

// Этап 1: Обогащение событий
type EventEnricher struct {
    consumer *nsq.Consumer
    producer *nsq.Producer
    userDB   *sql.DB
}

func (ee *EventEnricher) HandleMessage(message *nsq.Message) error {
    var event AnalyticsEvent
    json.Unmarshal(message.Body, &event)
    
    // Обогащаем данными пользователя
    enrichedEvent := ee.enrichWithUserData(&event)
    
    // Обогащаем гео-данными
    enrichedEvent = ee.enrichWithGeoData(enrichedEvent)
    
    // Передаём на следующий этап
    data, _ := json.Marshal(enrichedEvent)
    return ee.producer.Publish("enriched_events", data)
}

// Этап 2: Агрегация в реальном времени
type EventAggregator struct {
    consumer *nsq.Consumer
    producer *nsq.Producer
    windows  map[string]*TimeWindow
    mutex    sync.RWMutex
}

type TimeWindow struct {
    StartTime time.Time
    Events    []AnalyticsEvent
    Metrics   map[string]int64
}

func (ea *EventAggregator) HandleMessage(message *nsq.Message) error {
    var event AnalyticsEvent
    json.Unmarshal(message.Body, &event)
    
    // Добавляем во временное окно (1-минутные окна)
    windowKey := fmt.Sprintf("%s_%d", 
        event.EventType, 
        time.Unix(0, event.Timestamp).Truncate(time.Minute).Unix())
    
    ea.mutex.Lock()
    if ea.windows[windowKey] == nil {
        ea.windows[windowKey] = &TimeWindow{
            StartTime: time.Unix(0, event.Timestamp).Truncate(time.Minute),
            Events:    make([]AnalyticsEvent, 0),
            Metrics:   make(map[string]int64),
        }
    }
    
    window := ea.windows[windowKey]
    window.Events = append(window.Events, event)
    window.Metrics["count"]++
    ea.mutex.Unlock()
    
    // Проверяем, завершено ли окно (через 1 минуту + буфер)
    if time.Since(window.StartTime) > 70*time.Second {
        ea.flushWindow(windowKey, window)
    }
    
    return nil
}

func (ea *EventAggregator) flushWindow(key string, window *TimeWindow) {
    // Вычисляем агрегированные метрики
    aggregatedMetrics := map[string]interface{}{
        "window_start": window.StartTime.Unix(),
        "event_count":  len(window.Events),
        "unique_users": ea.countUniqueUsers(window.Events),
        "top_properties": ea.getTopProperties(window.Events),
    }
    
    // Публикуем агрегированные данные
    data, _ := json.Marshal(aggregatedMetrics)
    ea.producer.Publish("aggregated_metrics", data)
    
    // Очищаем
    ea.mutex.Lock()
    delete(ea.windows, key)
    ea.mutex.Unlock()
}

5. Настройка NSQ кластера для высокой доступности

Production-ready NSQ кластер

# docker-compose.yml для NSQ кластера
services:
  nsqlookupd1:
    image: nsqio/nsq
    command: /nsqlookupd
    ports:
      - "4160:4160"
      - "4161:4161"
    
  nsqlookupd2:
    image: nsqio/nsq
    command: /nsqlookupd
    ports:
      - "4162:4160"
      - "4163:4161"
    
  nsqd1:
    image: nsqio/nsq
    command: /nsqd --lookupd-tcp-address=nsqlookupd1:4160 --lookupd-tcp-address=nsqlookupd2:4160
    ports:
      - "4150:4150"
      - "4151:4151"
    volumes:
      - ./nsqd1:/data
    
  nsqd2:
    image: nsqio/nsq
    command: /nsqd --lookupd-tcp-address=nsqlookupd1:4160 --lookupd-tcp-address=nsqlookupd2:4160
    ports:
      - "4152:4150"
      - "4153:4151"
    volumes:
      - ./nsqd2:/data
    
  nsqadmin:
    image: nsqio/nsq
    command: /nsqadmin --lookupd-http-address=nsqlookupd1:4161 --lookupd-http-address=nsqlookupd2:4161
    ports:
      - "4171:4171"

Управление соединениями с failover

type NSQManager struct {
    producers map[string]*nsq.Producer
    consumers map[string]*nsq.Consumer
    lookupds  []string
    mutex     sync.RWMutex
}

func NewNSQManager(lookupds []string) *NSQManager {
    return &NSQManager{
        producers: make(map[string]*nsq.Producer),
        consumers: make(map[string]*nsq.Consumer),
        lookupds:  lookupds,
    }
}

func (nm *NSQManager) GetProducer(nsqdAddr string) (*nsq.Producer, error) {
    nm.mutex.RLock()
    if producer, exists := nm.producers[nsqdAddr]; exists {
        nm.mutex.RUnlock()
        return producer, nil
    }
    nm.mutex.RUnlock()
    
    nm.mutex.Lock()
    defer nm.mutex.Unlock()
    
    // Double-check pattern
    if producer, exists := nm.producers[nsqdAddr]; exists {
        return producer, nil
    }
    
    config := nsq.NewConfig()
    config.HeartbeatInterval = 30 * time.Second
    config.MaxRequeueDelay = 15 * time.Minute
    
    producer, err := nsq.NewProducer(nsqdAddr, config)
    if err != nil {
        return nil, err
    }
    
    nm.producers[nsqdAddr] = producer
    return producer, nil
}

// Публикация с автоматическим failover
func (nm *NSQManager) PublishWithFailover(topic string, data []byte) error {
    // Пробуем каждый nsqd инстанс, пока один не сработает
    for _, nsqdAddr := range nm.getNSQDAddresses() {
        producer, err := nm.GetProducer(nsqdAddr)
        if err != nil {
            continue
        }
        
        err = producer.Publish(topic, data)
        if err == nil {
            return nil
        }
        
        // Удаляем неработающий producer
        nm.mutex.Lock()
        delete(nm.producers, nsqdAddr)
        nm.mutex.Unlock()
    }
    
    return fmt.Errorf("все nsqd инстансы недоступны")
}

6. Мониторинг и наблюдаемость

Сбор метрик NSQ

type NSQMetrics struct {
    MessagesSent     prometheus.Counter
    MessagesReceived prometheus.Counter
    ProcessingTime   prometheus.Histogram
    QueueDepth       prometheus.Gauge
    ErrorRate        prometheus.Counter
}

func (nm *NSQMetrics) RecordMessageSent(topic string) {
    nm.MessagesSent.WithLabelValues(topic).Inc()
}

func (nm *NSQMetrics) RecordProcessingTime(topic string, duration time.Duration) {
    nm.ProcessingTime.WithLabelValues(topic).Observe(duration.Seconds())
}

// Сборщик статистики NSQ
type StatsCollector struct {
    nsqadminURL string
    metrics     *NSQMetrics
}

func (sc *StatsCollector) CollectStats() {
    resp, err := http.Get(sc.nsqadminURL + "/api/stats")
    if err != nil {
        return
    }
    defer resp.Body.Close()
    
    var stats NSQStats
    json.NewDecoder(resp.Body).Decode(&stats)
    
    for _, producer := range stats.Producers {
        for _, topic := range producer.Topics {
            sc.metrics.QueueDepth.WithLabelValues(topic.Name).Set(float64(topic.Depth))
        }
    }
}

7. Когда NSQ идеален vs когда стоит избегать

NSQ идеален для

Очередей задач - Обработка фоновых задач
Event sourcing - Надёжная потоковая передача событий
Агрегации логов - Сбор логов из множественных источников
Real-time аналитики - Высокая пропускная способность, гарантированная доставка
Коммуникации микросервисов - Масштабируемый обмен сообщениями

🔹 Избегай NSQ когда:

Малый масштаб - Redis может быть проще
Сложная маршрутизация - Используй RabbitMQ для продвинутой маршрутизации
Транзакционные сообщения - Используй очереди в базе данных
Упорядочивание сообщений - NSQ не гарантирует порядок

NSQ vs Альтернативы

ФункцияNSQRedisRabbitMQKafka
Горизонтальное масштабированиеОграниченное
Упорядочивание сообщений
Гарантированная доставка
Сложность эксплуатацииНизкаяНизкаяВысокаяВысокая
Сложность настройкиНизкаяНизкаяВысокаяВысокая

8. Production советы и лучшие практики

Конфигурация для высокой пропускной способности

func OptimizedNSQConfig() *nsq.Config {
    config := nsq.NewConfig()
    
    // Настройка производительности
    config.MaxInFlight = 2500        // Больше параллелизма
    config.HeartbeatInterval = 30 * time.Second
    config.MaxRequeueDelay = 15 * time.Minute
    config.DefaultRequeueDelay = 90 * time.Second
    
    // Надёжность
    config.MaxAttempts = 5
    config.LookupdPollInterval = 15 * time.Second
    
    return config
}

// Graceful shutdown
func (ep *EventProcessor) Shutdown() {
    ep.consumer.Stop()
    <-ep.consumer.StopChan
}

Обработка ошибок и повторы

func (ep *EventProcessor) HandleMessage(message *nsq.Message) error {
    var event AnalyticsEvent
    if err := json.Unmarshal(message.Body, &event); err != nil {
        // Невалидный JSON - не ставим в requeue
        log.Printf("Невалидный JSON: %v", err)
        return nil
    }
    
    // Обрабатываем с повторами
    err := ep.processWithRetry(&event, 3)
    if err != nil {
        // После максимального количества повторов отправляем в dead letter queue
        ep.sendToDeadLetter(message.Body, err)
        return nil
    }
    
    return nil
}

func (ep *EventProcessor) processWithRetry(event *AnalyticsEvent, maxRetries int) error {
    for i := 0; i < maxRetries; i++ {
        err := ep.processEvent(event)
        if err == nil {
            return nil
        }
        
        // Экспоненциальная задержка
        time.Sleep(time.Duration(math.Pow(2, float64(i))) * time.Second)
    }
    
    return fmt.Errorf("не удалось обработать после %d попыток", maxRetries)
}

Вывод: NSQ - это швейцарский нож очередей сообщений

NSQ блистает, когда тебе нужно:

Простота - Кластеризация без настройки и автоматическое обнаружение
Надёжность - Гарантированная доставка без сложности
Масштабируемость - Добавление узлов без координации
Производительность - Обработка миллионов сообщений в секунду

Главное правило:

Если тебе нужна очередь сообщений, которая просто работает в масштабе без операционных накладных расходов, выбирай NSQ.

Идеален для:

  • Пайплайнов real-time аналитики
  • Event-driven микросервисов
  • Систем агрегации логов
  • Обработки фоновых задач
  • Любого высокопроизводительного обмена сообщениями

P.S. Какие задачи обмена сообщениями вам помог решить NSQ? Поделитесь опытом в комментариях! 🚀

// Дополнительные ресурсы:
// - Документация NSQ: https://nsq.io/
// - NSQ в Bitly: https://word.bitly.com/post/33232969144/nsq
// - Go NSQ Client: https://github.com/nsqio/go-nsq
// - Real-time аналитика с NSQ: https://blog.gopheracademy.com/advent-2015/real-time-analytics-nsq/
// - Мониторинг NSQ с помощью Prometheus: https://medium.com/@craigp/nsq-monitoring-with-prometheus-5f4f3f4e6f3e
comments powered by Disqus