Система уведомлений: как синхронизировать миллионы устройств без боли
Привет, архитектор! 📱
Тебе нужно построить систему уведомлений, которая работает как у больших парней? Чтобы сообщения приходили мгновенно на все устройства пользователя, а сервер не падал от нагрузки?
Разберём, как спроектировать notification service, который выдержит миллионы пользователей и синхронизирует состояние между их устройствами.
1. Что такое система уведомлений и зачем она нужна?
Реальные задачи
- Маркетинговые push - акции, напоминания, персональные предложения
- Мгновенные сообщения - новые чаты, лайки, комментарии
- Системные уведомления - обновления статуса, ошибки, предупреждения
- Синхронизация состояния - прочитанные сообщения, онлайн-статус
Ключевые требования
✅ Синхронизация - состояние должно быть одинаковым на всех устройствах
✅ Низкая задержка - уведомления должны приходить за миллисекунды
✅ Масштабируемость - от 1000 до 100 миллионов пользователей
✅ Высокая доступность - 99.9% uptime, даже если один дата-центр упал
Проблема:
Как доставить уведомление пользователю, который может быть онлайн с телефона, планшета и компьютера одновременно?
2. Архитектура высокого уровня
🔥 Компоненты системы
┌─────────────┐ ┌──────────────┐ ┌──────────────┐
│ Clients │>>>>>| Gateway │>>>>>│ Notification │
│ (iOS/Web/ │ │ (WebSocket │ │ Service │
│ Android) │ │ /gRPC) │ │ │
└─────────────┘ └──────────────┘ └──────────────┘
⌄ ⌄
⌄ ⌄
⌄ ⌄
┌──────────────┐ ┌──────────────┐
│ Session │ │ Message │
│ Manager │ │ Queue │
│ | │ (Redis/ │
└──────────────┘ │ RabbitMQ) │
└──────────────┘
🔥 Основные сервисы
4. Message Queue - надёжная доставка сообщений 1. Gateway Service - точка входа для всех клиентов 3. Session Manager - отслеживание активных соединений 2. Notification Service - бизнес-логика уведомлений
3. Gateway Service: точка входа
Задачи Gateway
- Аутентификация и авторизация
- Heartbeat и reconnection logic
- Load balancing между инстансами
- Управление WebSocket соединениями
type Gateway struct {
sessions *SessionManager
hub *ConnectionHub
auth AuthService
msgQueue MessageQueue
}
type Connection struct {
UserID string
DeviceID string
Conn *websocket.Conn
Send chan []byte
LastSeen time.Time
}
func (g *Gateway) HandleWebSocket(w http.ResponseWriter, r *http.Request) {
conn, err := websocket.Upgrade(w, r, nil)
if err != nil {
return
}
userID := g.auth.GetUserID(r)
deviceID := r.Header.Get("Device-ID")
client := &Connection{
UserID: userID,
DeviceID: deviceID,
Conn: conn,
Send: make(chan []byte, 256),
LastSeen: time.Now(),
}
g.hub.Register(client)
go g.writePump(client)
go g.readPump(client)
}
Connection Hub - управление соединениями
type ConnectionHub struct {
clients map[string]map[string]*Connection // userID -> deviceID -> connection
register chan *Connection
unregister chan *Connection
broadcast chan *Message
mu sync.RWMutex
}
func (h *ConnectionHub) Run() {
for {
select {
case client := <-h.register:
h.mu.Lock()
if h.clients[client.UserID] == nil {
h.clients[client.UserID] = make(map[string]*Connection)
}
h.clients[client.UserID][client.DeviceID] = client
h.mu.Unlock()
case client := <-h.unregister:
h.mu.Lock()
if devices := h.clients[client.UserID]; devices != nil {
delete(devices, client.DeviceID)
if len(devices) == 0 {
delete(h.clients, client.UserID)
}
}
h.mu.Unlock()
case message := <-h.broadcast:
h.sendToUser(message.UserID, message.Data)
}
}
}
4. Notification Service: бизнес-логика
Типы уведомлений
type NotificationType string
const (
TypeMessage NotificationType = "message"
TypeLike NotificationType = "like"
TypeComment NotificationType = "comment"
TypeSystemAlert NotificationType = "system_alert"
)
type Notification struct {
ID string `json:"id"`
UserID string `json:"user_id"`
Type NotificationType `json:"type"`
Title string `json:"title"`
Body string `json:"body"`
Data map[string]any `json:"data"`
CreatedAt time.Time `json:"created_at"`
ReadAt *time.Time `json:"read_at,omitempty"`
}
🔹 Сервис обработки уведомлений:
type NotificationService struct {
db Database
push PushService
queue MessageQueue
sessions SessionManager
}
func (ns *NotificationService) SendNotification(notification *Notification) error {
// 1. Сохраняем в базу для истории
if err := ns.db.SaveNotification(notification); err != nil {
return err
}
// 2. Проверяем, онлайн ли пользователь
if ns.sessions.IsUserOnline(notification.UserID) {
// Отправляем через WebSocket
return ns.sendRealtime(notification)
}
// 3. Если оффлайн - отправляем push
return ns.push.SendPush(notification)
}
func (ns *NotificationService) sendRealtime(notification *Notification) error {
message := &Message{
Type: "notification",
UserID: notification.UserID,
Data: notification,
}
return ns.queue.Publish("realtime_notifications", message)
}
5. Синхронизация состояния между устройствами
Проблема
Пользователь прочитал сообщение на телефоне - нужно пометить как прочитанное на всех устройствах.
type SyncService struct {
redis *redis.Client
sessions SessionManager
queue MessageQueue
}
type SyncEvent struct {
UserID string `json:"user_id"`
DeviceID string `json:"device_id"`
Type string `json:"type"` // "read", "typing", "online"
Data any `json:"data"`
Timestamp time.Time `json:"timestamp"`
}
func (ss *SyncService) MarkAsRead(userID, messageID, deviceID string) error {
// 1. Обновляем в базе
if err := ss.updateReadStatus(userID, messageID); err != nil {
return err
}
// 2. Синхронизируем с другими устройствами
syncEvent := &SyncEvent{
UserID: userID,
DeviceID: deviceID,
Type: "read",
Data: map[string]string{"message_id": messageID},
Timestamp: time.Now(),
}
return ss.broadcastToOtherDevices(syncEvent)
}
func (ss *SyncService) broadcastToOtherDevices(event *SyncEvent) error {
devices := ss.sessions.GetUserDevices(event.UserID)
for _, device := range devices {
if device.DeviceID != event.DeviceID {
ss.queue.Publish("sync_events", &Message{
UserID: event.UserID,
Data: event,
Target: device.DeviceID,
})
}
}
return nil
}
6. Message Queue: надёжная доставка
Используем Redis Streams для гарантированной доставки
type RedisQueue struct {
client *redis.Client
}
func (rq *RedisQueue) Publish(stream string, message *Message) error {
data := map[string]interface{}{
"user_id": message.UserID,
"type": message.Type,
"data": message.Data,
}
return rq.client.XAdd(context.Background(), &redis.XAddArgs{
Stream: stream,
Values: data,
}).Err()
}
func (rq *RedisQueue) Subscribe(stream, consumerGroup string, handler func(*Message)) {
for {
msgs, err := rq.client.XReadGroup(context.Background(), &redis.XReadGroupArgs{
Group: consumerGroup,
Consumer: "worker-" + uuid.New().String(),
Streams: []string{stream, ">"},
Count: 10,
Block: time.Second,
}).Result()
if err != nil {
continue
}
for _, msg := range msgs {
for _, message := range msg.Messages {
handler(parseMessage(message))
rq.client.XAck(context.Background(), stream, consumerGroup, message.ID)
}
}
}
}
7. Push-уведомления для оффлайн пользователей
Интеграция с FCM (Firebase) и APNs
type PushService struct {
fcm *messaging.Client
apns *apns2.Client
db Database
}
func (ps *PushService) SendPush(notification *Notification) error {
devices, err := ps.db.GetUserDevices(notification.UserID)
if err != nil {
return err
}
for _, device := range devices {
switch device.Platform {
case "android":
ps.sendFCM(device.Token, notification)
case "ios":
ps.sendAPNs(device.Token, notification)
}
}
return nil
}
func (ps *PushService) sendFCM(token string, notification *Notification) error {
message := &messaging.Message{
Token: token,
Notification: &messaging.Notification{
Title: notification.Title,
Body: notification.Body,
},
Data: map[string]string{
"notification_id": notification.ID,
"type": string(notification.Type),
},
}
_, err := ps.fcm.Send(context.Background(), message)
return err
}
8. Масштабирование и производительность
Горизонтальное масштабирование
1. Шардинг по пользователям:
func (s *ShardManager) GetShardForUser(userID string) string {
hash := fnv.New32a()
hash.Write([]byte(userID))
shardID := hash.Sum32() % uint32(s.shardCount)
return fmt.Sprintf("shard-%d", shardID)
}
2. Load Balancer с применением sticky sessions (nginx):
upstream notification_gateway {
ip_hash; # Sticky sessions по IP
server gateway1:8080;
server gateway2:8080;
server gateway3:8080;
}
3. Redis Cluster для сессий:
type SessionManager struct {
cluster *redis.ClusterClient
}
func (sm *SessionManager) StoreSession(userID, deviceID string, conn *Connection) {
key := fmt.Sprintf("session:%s:%s", userID, deviceID)
data := map[string]interface{}{
"server_id": sm.serverID,
"connected_at": time.Now().Unix(),
}
sm.cluster.HMSet(context.Background(), key, data)
sm.cluster.Expire(context.Background(), key, 30*time.Minute)
}
9. Мониторинг и метрики
Ключевые метрики
type Metrics struct {
ActiveConnections prometheus.Gauge
NotificationsSent prometheus.Counter
DeliveryLatency prometheus.Histogram
FailedDeliveries prometheus.Counter
}
func (m *Metrics) RecordNotificationSent(userID string, latency time.Duration) {
m.NotificationsSent.Inc()
m.DeliveryLatency.Observe(latency.Seconds())
}
Health checks
func (g *Gateway) HealthCheck(w http.ResponseWriter, r *http.Request) {
status := map[string]string{
"status": "healthy",
"connections": fmt.Sprintf("%d", g.hub.GetConnectionCount()),
"uptime": time.Since(g.startTime).String(),
}
json.NewEncoder(w).Encode(status)
}
Вывод: система уведомлений - это не просто WebSocket
Построить надёжную систему уведомлений - это про:
✅ Надёжность - гарантированная доставка через очереди сообщений
✅ Архитектуру - правильное разделение ответственности между сервисами
✅ Синхронизацию - состояние должно быть консистентным на всех устройствах
✅ Масштабируемость - горизонтальное масштабирование и шардинг
Главное правило:
Система уведомлений - это не фича, а критически важная инфраструктура. Планируй масштабирование с первого дня.
P.S. Какие проблемы вам приходилось решать при построении real-time систем? Поделитесь опытом в комментариях! 🚀
// Дополнительные ресурсы:
// - WebSocket RFC: https://tools.ietf.org/html/rfc6455
// - Redis Streams: https://redis.io/topics/streams-intro
// - System Design Interview: https://github.com/donnemartin/system-design-primer
// - Firebase Cloud Messaging: https://firebase.google.com/docs/cloud-messaging
// - Apple Push Notification Service: https://developer.apple.com/documentation/usernotifications/setting_up_a_remote_notification_server