Перейти к содержанию

Работа с базой данных

В Planiqum для работы с базой данных используется библиотека planiqum.core.libs.db, которая предоставляет удобные функции для выполнения SQL-запросов с автоматическим логированием и управлением транзакциями.

Основные функции

execute_query

Файл: src/planiqum/core/libs/db.py

Назначение: Основная функция для выполнения SQL-запросов с детальным логированием и автоматическим управлением транзакциями.

Сигнатура:

def execute_query(query, to_dict: bool = True, log_context: dict = None) -> list:

Параметры: - query - SQL-запрос (строка или объект PyPika Query) - to_dict - если True, возвращает список словарей; если False, возвращает список списков - log_context - дополнительный контекст для логирования (опционально)

Возвращает: Список результатов запроса

Особенности работы

1. Автоматическое управление транзакциями:

  • Если функция вызывается вне транзакции (@transaction.atomic), работает как обычно
  • Если функция вызывается внутри транзакции, автоматически создает savepoint для безопасного отката при ошибках

2. Детальное логирование:

  • INFO - начало выполнения запроса с уникальным query_id, длиной запроса, типом и preview
  • INFO - успешное завершение с временем выполнения и количеством строк
  • ERROR - ошибка выполнения с типом ошибки и временем выполнения
  • DEBUG - создание/использование курсоров, операции с savepoint, конвертация данных
  • DEBUG - план выполнения запроса (EXPLAIN) при включенной настройке EXPLAIN_QUERIES

3. Трассировка запросов:

  • Каждый запрос получает уникальный query_id для полной трассировки
  • Все логи содержат префикс execute_query: query_id: {id} для удобного поиска
  • Время конвертации данных измеряется и логируется отдельно
  • Полная трассировка жизненного цикла запроса от начала до завершения

4. Усечение длинных запросов:

  • Запросы длиннее MAX_DEBUG_QUERY_LENGTH (по умолчанию 5000 символов) усекаются во всех логах
  • Текст запроса во всех логах (INFO, DEBUG, ERROR) ограничен этой настройкой
  • Очистка запроса от лишних символов для читаемого логирования

5. Конвертация типов данных:

  • datetime и date → строки в формате YYYY-MM-DD
  • Decimalfloat
  • float с бесконечными значениями → None

Примеры использования

Простой SELECT запрос:

from planiqum.core.libs.db import execute_query
from pypika import PostgreSQLQuery as Query, functions as fn

# Запрос с PyPika
query = Query.select('id', 'name').from_('users').where('active = true')
results = execute_query(query)

# Результат: [{'id': 1, 'name': 'John'}, {'id': 2, 'name': 'Jane'}]

Строковый запрос:

# Прямой SQL
results = execute_query("SELECT COUNT(*) as total FROM users")
# Результат: [{'total': 150}]

Возврат списков вместо словарей:

results = execute_query(query, to_dict=False)
# Результат: [[1, 'John'], [2, 'Jane']]

Работа в транзакции:

from django.db import transaction

with transaction.atomic():
    # Успешный запрос
    result1 = execute_query("SELECT NOW() as time1")

    try:
        # Запрос с ошибкой - откатится только к savepoint
        execute_query("SELECT invalid_column FROM non_existent_table")
    except Exception:
        # Ошибка обработана, основная транзакция не сломалась
        pass

    # Этот запрос выполнится успешно
    result3 = execute_query("SELECT NOW() as time3")

Логирование

Примеры логов:

Начало выполнения:

INFO: Query execution started - hash: a1b2c3d4, length: 45, truncated: False, in_transaction: True, text: SELECT NOW() "current_time"

Успешное завершение:

INFO: Query executed successfully - hash: a1b2c3d4, time: 0.001s, rows: 1

Ошибка выполнения:

ERROR: Query execution failed - hash: a1b2c3d4, error: ProgrammingError: relation "non_existent_table" does not exist, time: 0.001s, in_transaction: True

Поиск в GrayLog: - По хэшу: hash:a1b2c3d4 - По типу операции: Query execution started - По статусу транзакции: in_transaction: True

execute_sql

Назначение: Выполнение SQL-запросов без возврата результатов (INSERT, UPDATE, DELETE, DDL).

⚠️ Устаревшая функция: Для новых запросов рекомендуется использовать execute_query(), которая обеспечивает детальное логирование и автоматическое управление транзакциями. execute_sql() используется в основном для DDL операций и совместимости с legacy кодом.

Сигнатура:

def execute_sql(sql, params=None, cursor=None):

Параметры: - sql - SQL-запрос (строка) - params - параметры запроса для безопасных запросов (опционально) - cursor - курсор базы данных (опционально)

Примеры использования:

from planiqum.core.libs.db import execute_sql

# Создание таблицы
execute_sql("""
    CREATE TABLE test_table (
        id SERIAL PRIMARY KEY,
        name VARCHAR(255)
    )
""")

# Вставка данных с параметрами
execute_sql("INSERT INTO test_table (name) VALUES (%s)", ['test'])

# Обновление данных
execute_sql("UPDATE test_table SET name = 'updated' WHERE id = 1")

select_to_df

Назначение: Выполнение SELECT-запроса с возвратом результата в виде pandas DataFrame.

Сигнатура:

def select_to_df(sql):

Параметры: - sql - SQL-запрос (строка или объект PyPika Query)

Возвращает: pandas DataFrame

Примеры использования:

from planiqum.core.libs.db import select_to_df

# Получение данных в DataFrame
df = select_to_df("SELECT id, name, created_at FROM users WHERE active = true")

# Работа с DataFrame
print(df.head())
print(df.describe())

Настройки

MAX_DEBUG_QUERY_LENGTH

Файл: src/planiqum/core/settings.py

Назначение: Максимальная длина SQL-запроса для включения в debug-логи.

По умолчанию: 5000 символов

Настройка через переменную окружения:

export MAX_DEBUG_QUERY_LENGTH=8000

Использование: - Запросы длиннее этого значения усекаются во всех логах (INFO, DEBUG, ERROR) - Текст запроса во всех логах ограничен этой настройкой - Уникальный query_id позволяет найти все логи конкретного запроса

EXPLAIN_QUERIES

Файл: src/planiqum/core/settings.py

Назначение: Включение вывода плана выполнения SQL-запросов для отладки производительности.

По умолчанию: False (отключен)

Настройка через переменную окружения:

export EXPLAIN_QUERIES=true

Использование: - При True выполняется EXPLAIN перед каждым запросом - Результат плана выполнения выводится в debug лог - Помогает анализировать производительность и использование индексов - Не влияет на основную работу приложения

Пример лога с EXPLAIN:

execute_query: query_id: abc12345 EXPLAIN результат:
  ('Seq Scan on users', 'cost=0.00..18334.00 rows=1000000 width=36')
  ('Filter: (age > 25)', 'cost=0.00..18334.00 rows=333333 width=36')

Мониторинг и отладка

Структура логов execute_query

Все логи execute_query имеют единообразный формат с префиксом execute_query: query_id: {id}:

Основные логи (INFO/ERROR):

execute_query: query_id: abc12345 START
  query_length: 1061
  query_type: SELECT
  query: SELECT * FROM users WHERE active = true

execute_query: query_id: abc12345 SUCCESS
  query_length: 1061
  query_type: SELECT
  execution_time: 0.123s
  rows_returned: 42
  query: SELECT * FROM users WHERE active = true

execute_query: query_id: abc12345 ERROR
  query_length: 1061
  query_type: SELECT
  execution_time: 0.123s
  error_type: Exception
  error_message: Table 'users' doesn't exist
  query: SELECT * FROM users WHERE active = true

Примечание: Поле query во всех логах ограничено настройкой MAX_DEBUG_QUERY_LENGTH (по умолчанию 5000 символов). Длинные запросы усекаются с добавлением "...".

Детальные логи (DEBUG):

execute_query: query_id: abc12345 создан курсор
execute_query: query_id: abc12345 created_savepoint sp_001
execute_query: query_id: abc12345 committed_savepoint sp_001
execute_query: query_id: abc12345 Результаты запроса конвертированы, на конвертацию затрачено: 0.001s

EXPLAIN логи (при включенной настройке):

execute_query: query_id: abc12345 EXPLAIN результат:
  ('Seq Scan on users', 'cost=0.00..18334.00 rows=1000000 width=36')
  ('Filter: (active = true)', 'cost=0.00..18334.00 rows=500000 width=36')

Фильтрация в GrayLog

Все логи конкретного запроса:

message:"query_id: abc12345"

По типу операции:

# Старты запросов
message:"START" AND message:"query_id:"

# Успехи запросов  
message:"SUCCESS" AND message:"query_id:"

# Ошибки запросов
message:"ERROR" AND message:"query_id:"

# EXPLAIN результаты
message:"EXPLAIN результат:" AND message:"query_id:"

# Операции с курсорами
message:"курсор" AND message:"query_id:"

# Операции с savepoint
message:"savepoint" AND message:"query_id:"

По производительности:

# Медленные запросы (>1 секунды)
message:"execution_time:" AND message:{"$gt": "1.000s"} AND message:"query_id:"

# Большие запросы (>5000 символов)
message:"query_length:" AND message:{"$gt": 5000} AND message:"query_id:"

# Медленная конвертация (>0.1 секунды)
message:"конвертированы" AND message:"затрачено" AND message:{"$gt": "0.100s"}

Полная трассировка запроса

Пример полного жизненного цикла запроса:

  1. execute_query: query_id: abc12345 START - начало выполнения
  2. execute_query: query_id: abc12345 создан курсор - создание нового курсора
  3. execute_query: query_id: abc12345 EXPLAIN результат: - план выполнения (если включен)
  4. execute_query: query_id: abc12345 created_savepoint sp_001 - создание savepoint в транзакции
  5. execute_query: query_id: abc12345 committed_savepoint sp_001 - коммит savepoint
  6. execute_query: query_id: abc12345 Результаты запроса конвертированы, на конвертацию затрачено: 0.001s - конвертация данных
  7. execute_query: query_id: abc12345 SUCCESS - успешное завершение

Рекомендации по использованию

Выбор функции

Используйте execute_query для: - SELECT-запросов с возвратом данных - Запросов, требующих детального логирования - Работы в транзакциях с автоматическим управлением ошибками

Используйте execute_sql для: - DDL-операций (CREATE, ALTER, DROP) в legacy коде - Совместимости с существующим кодом - Операций, требующих внешний курсор

⚠️ Рекомендация: Для новых запросов предпочитайте execute_query() из-за лучшего логирования и управления транзакциями.

Используйте select_to_df для: - Аналитических запросов - Работы с данными в pandas - Экспорта данных

Работа с транзакциями

Автоматическое управление:

# execute_query автоматически создает savepoint в транзакции
with transaction.atomic():
    result = execute_query("SELECT * FROM users")
    # При ошибке откатится только к savepoint, не сломав транзакцию

Ручное управление:

# Для execute_sql нужно управлять транзакциями вручную
with transaction.atomic():
    execute_sql("INSERT INTO users (name) VALUES ('John')")
    execute_sql("UPDATE users SET active = true WHERE name = 'John')")

# Рекомендуется использовать execute_query для новых запросов
with transaction.atomic():
    execute_query("INSERT INTO users (name) VALUES ('John')")
    execute_query("UPDATE users SET active = true WHERE name = 'John')")

Логирование и отладка

Поиск в GrayLog: - По хэшу запроса: hash:a1b2c3d4 - По типу операции: Query execution started или Query execution failed - По времени выполнения: time: 0.001s - По статусу транзакции: in_transaction: True/False

Мониторинг производительности: - Время выполнения запросов логируется автоматически - Количество возвращаемых строк для анализа объема данных - Статус транзакции для понимания контекста выполнения

Тестирование

Для тестирования функций работы с базой данных используются тесты в src/planiqum/core/tests/libs/db/:

  • test_execute_query.py - тесты для execute_query
  • test_get_indexes_query.py - тесты для функций работы с индексами

Пример теста:

import pytest
from planiqum.core.libs.db import execute_query

@pytest.mark.django_db
def test_execute_query_basic():
    result = execute_query("SELECT NOW() as current_time")
    assert len(result) == 1
    assert 'current_time' in result[0]

Связанные темы