Перейти к содержанию

Работа с WebSocket в Planiqum

Система WebSocket в Planiqum реализована для обеспечения реального времени уведомлений пользователей. Архитектура включает как серверную, так и клиентскую части.

Ограничения системы

Ограничение количества сообщений

Для предотвращения перегрузки системы и улучшения производительности установлены следующие ограничения:

  • Серверное ограничение: MAX_OFFLINE_MESSAGES = 100 - максимальное количество офлайн сообщений, отправляемых при подключении
  • Клиентское ограничение: максимум 100 уведомлений в localStorage с индикатором "100+" при превышении

Позиционирование уведомлений

Toast-уведомления отображаются в правом нижнем углу экрана для лучшего пользовательского опыта.

Архитектура системы

Backend компоненты

1. WebSocket Consumer (src/planiqum/core/consumers.py)

Основной обработчик WebSocket подключений. Реализует NotificationConsumer на базе Django Channels:

class NotificationConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        self.connection_id = str(uuid.uuid4())
        self.user = await self.get_user()

        # Проверка аутентификации
        if isinstance(self.user, AnonymousUser) or not getattr(self.user, 'is_authenticated', False):
            await self.close()
            return

        await self.accept()

        # Добавление в группу пользователя
        self.group_name = f"user_{self.user.id}"
        await self.channel_layer.group_add(self.group_name, self.channel_name)
        websocket_service.add_connection(self.connection_id, self.user, self)

        # Отправка информации о подключении
        await self.send(text_data=json.dumps({
            'type': 'connection_info',
            'data': {
                'connection_id': self.connection_id,
                'user_id': self.user.id,
                'username': self.user.username 
            }
        }))

        # Доставка офлайн сообщений (с ограничением)
        await self.deliver_offline_messages()

Метод доставки офлайн сообщений

async def deliver_offline_messages(self):
    # Ограничиваем количество офлайн сообщений для предотвращения перегрузки
    to_deliver = await database_sync_to_async(list)(
        WebSocketMessage.objects.filter(
            user_id=self.user.id, 
            delivered=False
        ).order_by('sent_at')[:MAX_OFFLINE_MESSAGES]
    )
    for msg in to_deliver:
        await self.send(text_data=json.dumps({
            'type': msg.message_type,
            'data': {**msg.data, 'id': msg.id, 'silent': True},
        }))
    if to_deliver:
        await database_sync_to_async(WebSocketMessage.objects.filter(
            id__in=[m.id for m in to_deliver]
        ).update)(delivered=True)

2. WebSocket Service (src/planiqum/core/services/websocket_service.py)

Сервис для управления активными подключениями:

class WebSocketService:
    def __init__(self):
        self.connections: Dict[str, Any] = {}
        self.sync_with_database()

    def add_connection(self, connection_id: str, user: Any, websocket):
        self.connections[connection_id] = {
            'user': user,
            'websocket': websocket,
            'tab_id': None,
        }
        # Создание записи в базе данных
        WebSocketConnection.objects.create(
            user=user,
            connection_id=connection_id,
            is_active=True
        )

    def send_to_user(self, user: Any, message_type: str, data: dict):
        user_connections = [
            conn_id for conn_id, conn_data in self.connections.items()
            if conn_data['user'].id == user.id
        ]

        for connection_id in user_connections:
            self.send_to_connection(connection_id, message_type, data)

3. Notification Service (src/planiqum/core/services/notification_service.py)

Сервис для отправки уведомлений:

class NotificationService:
    @staticmethod
    def send_notification_sync(user, title: str, message: str, level: str = 'info'):
        if level not in {'info', 'success', 'error'}:
            level = 'info'

        # Создание записи в базе данных
        record = WebSocketMessage.objects.create(
            user=user,
            message_type='notification',
            data={'title': title, 'message': message, 'level': level},
            delivered=False,
        )

        # Отправка через WebSocket канал
        channel_layer = get_channel_layer()
        group = f"user_{user.id}"
        payload = {
            'type': 'user.message',
            'message_type': 'notification',
            'data': {
                'id': record.id,
                'title': title,
                'message': message,
                'level': level,
                'silent': False,
            }
        }
        async_to_sync(channel_layer.group_send)(group, payload)

4. Модели данных (src/planiqum/core/models.py)

WebSocketConnection - модель для хранения активных подключений:

class WebSocketConnection(models.Model):
    user = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE)
    connection_id = models.CharField(max_length=255, unique=True)
    created_at = models.DateTimeField(auto_now_add=True)
    is_active = models.BooleanField(default=True)

WebSocketMessage - модель для хранения сообщений:

class WebSocketMessage(models.Model):
    user = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE)
    connection = models.ForeignKey(WebSocketConnection, on_delete=models.CASCADE, null=True, blank=True)
    message_type = models.CharField(max_length=50)
    data = models.JSONField()
    sent_at = models.DateTimeField(auto_now_add=True)
    delivered = models.BooleanField(default=False)

Frontend компоненты

WebSocket Manager (src/planiqum/static/js/websocket.js)

JavaScript клиент для работы с WebSocket:

class WebSocketManager {
    constructor() {
        this.socket = null;
        this.connectionId = null;
        this.reconnectAttempts = 0;
        this.maxReconnectAttempts = 5;
        this.reconnectDelay = 1000;
        this.isConnected = false;
        this.messageHandlers = new Map();
        this.storageKey = 'websocket_connection';
        this.unreadStorageKey = 'websocket_unread_notifications';
        this.unread = [];
        this.MAX_NOTIFICATIONS = 100; // Максимальное количество уведомлений
        this.init();
    }

    connect() {
        const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
        const wsUrl = `${protocol}//${window.location.host}/ws/notifications/`;

        this.socket = new WebSocket(wsUrl);

        this.socket.onopen = () => {
            this.isConnected = true;
            this.reconnectAttempts = 0;
            // Отправка tab_id для ограничения одного подключения на вкладку
            try {
                this.socket.send(JSON.stringify({ type: 'tab', tab_id: this.getTabId() }));
            } catch {}
        };

        this.socket.onmessage = (event) => {
            try {
                const data = JSON.parse(event.data);
                this.handleMessage(data);
            } catch (e) {
                // Обработка ошибок
            }
        };
    }

    handleMessage(data) {
        const { type, data: messageData } = data;

        switch (type) {
            case 'connection_info':
                this.handleConnectionInfo(messageData);
                break;
            case 'notification':
                this.showNotification(messageData);
                break;
            case 'system_message':
                this.showSystemMessage(messageData);
                break;
            case 'pong':
                break;
            default:
                // Обработка неизвестных типов сообщений
        }
    }

    showNotification(data) {
        const { title, message, level = 'info', silent = false } = data;
        const id = (data && data.id) ? String(data.id) : Date.now().toString();

        // Дублирование по id
        if (this.unread.some(x => x.id === id)) {
            return;
        }

        this.addUnread({ id, title, message, level, ts: Date.now() });

        // Отправка ACK на сервер
        if (this.socket && this.socket.readyState === WebSocket.OPEN && data && data.id) {
            try { 
                this.socket.send(JSON.stringify({ type: 'ack', id: data.id })); 
            } catch {}
        }

        // Показ тоста для онлайн сообщений
        if (!silent && this.isConnected) {
            this.showToast(title, message, level, id);
        }
    }

    addUnread(item) {
        this.unread.unshift(item);

        // Ограничиваем количество уведомлений
        if (this.unread.length > this.MAX_NOTIFICATIONS) {
            this.unread = this.unread.slice(0, this.MAX_NOTIFICATIONS);
        }

        this.saveUnread();
        this.renderUnread();
        return item.id;
    }

    renderUnread() {
        const badge = document.getElementById('ws-bell-badge');
        if (badge) {
            if (this.unread.length > 0) {
                badge.style.display = 'inline-block';
                // Отображаем "100+" если количество превышает лимит
                if (this.unread.length >= this.MAX_NOTIFICATIONS) {
                    badge.textContent = `${this.MAX_NOTIFICATIONS}+`;
                    badge.style.fontSize = '9px';
                    badge.style.padding = '0 3px';
                } else {
                    badge.textContent = String(this.unread.length);
                    badge.style.fontSize = '10px';
                    badge.style.padding = '0 4px';
                }
            } else {
                badge.style.display = 'none';
            }
        }
    }
}

Маршрутизация

WebSocket маршруты определены в src/planiqum/core/routing.py:

from django.urls import re_path
from . import consumers

websocket_urlpatterns = [
    re_path(r'^ws/notifications/$', consumers.NotificationConsumer.as_asgi()),
]

И подключены в project/asgi.py:

from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack
from planiqum.core.routing import websocket_urlpatterns

application = ProtocolTypeRouter({
    "http": get_asgi_application(),
    "websocket": AuthMiddlewareStack(
        URLRouter(websocket_urlpatterns)
    ),
})

Типы сообщений

Поддерживаемые типы сообщений от клиента:

  1. ping - проверка соединения
  2. tab - установка tab_id для ограничения подключений
  3. ack - подтверждение получения сообщения
  4. read_all - отметка всех сообщений как прочитанных

Поддерживаемые типы сообщений от сервера:

  1. connection_info - информация о подключении
  2. notification - уведомления
  3. system_message - системные сообщения
  4. pong - ответ на ping

Примеры использования

Отправка уведомления пользователю

from planiqum.core.services.notification_service import NotificationService
from django.contrib.auth import get_user_model

User = get_user_model()
user = User.objects.get(username='admin')

# Отправка уведомления
NotificationService.send_notification_sync(
    user=user,
    title='Заголовок уведомления',
    message='Текст уведомления',
    level='info'  # 'info', 'success', 'error'
)

Проверка активных подключений

from planiqum.core.models import WebSocketConnection
from django.contrib.auth import get_user_model

User = get_user_model()
user = User.objects.get(username='admin')

# Получение количества активных подключений
connections = WebSocketConnection.objects.filter(user=user)
print(f'Активных подключений: {connections.count()}')

# Детальная информация о подключениях
for conn in connections:
    print(f'ID: {conn.connection_id}, создано: {conn.created_at}')

Отправка произвольного сообщения

from planiqum.core.models import WebSocketMessage
from django.contrib.auth import get_user_model

User = get_user_model()
user = User.objects.get(username='admin')

# Создание сообщения
message = WebSocketMessage.objects.create(
    user=user,
    message_type='custom_message',
    data={'key': 'value', 'message': 'Произвольное сообщение'}
)

Особенности реализации

Ограничение подключений по вкладкам

Система автоматически закрывает дублирующие подключения с одинаковым tab_id, обеспечивая только одно подключение на вкладку браузера.

Офлайн доставка

Сообщения сохраняются в базе данных и доставляются при следующем подключении пользователя, если он был офлайн.

Автоматическое переподключение

Клиент автоматически пытается переподключиться при разрыве соединения с экспоненциальной задержкой.

Подтверждение доставки

Клиент отправляет ACK сообщения для подтверждения получения уведомлений, что позволяет серверу отмечать сообщения как доставленные.

Совместимость с браузерами

Система оптимизирована для работы в различных браузерах:

  • Chrome/Edge: полная поддержка всех функций
  • Yandex Browser: исправлены проблемы с z-index и backdrop-filter
  • Firefox: полная поддержка
  • Safari: полная поддержка

Исправления для Yandex Browser

  • Увеличен z-index для элементов уведомлений (1050-1051)
  • Добавлены CSS fallbacks для backdrop-filter
  • Улучшена инициализация Bootstrap dropdown

Интеграция с интерфейсом

WebSocket автоматически подключается для аутентифицированных пользователей через включение скрипта в src/planiqum/core/templates/base.html:

{% if user.is_authenticated %}
  <script>
    window.userId = {{ user.id }};
  </script>
  <script src="{% static 'js/websocket.js' %}"></script>
{% endif %}