Работа с базой данных ¶
В Planiqum для работы с базой данных используется библиотека planiqum.core.libs.db, которая предоставляет удобные функции для выполнения SQL-запросов с автоматическим логированием и управлением транзакциями.
Основные функции ¶
execute_query ¶
Файл: src/planiqum/core/libs/db.py
Назначение: Основная функция для выполнения SQL-запросов с детальным логированием и автоматическим управлением транзакциями.
Сигнатура:
def execute_query(query, to_dict: bool = True, log_context: dict = None) -> list:
Параметры:
- query - SQL-запрос (строка или объект PyPika Query)
- to_dict - если True, возвращает список словарей; если False, возвращает список списков
- log_context - дополнительный контекст для логирования (опционально)
Возвращает: Список результатов запроса
Особенности работы ¶
1. Автоматическое управление транзакциями:
- Если функция вызывается вне транзакции (
@transaction.atomic), работает как обычно - Если функция вызывается внутри транзакции, автоматически создает savepoint для безопасного отката при ошибках
2. Детальное логирование:
- INFO - начало выполнения запроса с уникальным
query_id, длиной запроса, типом и preview - INFO - успешное завершение с временем выполнения и количеством строк
- ERROR - ошибка выполнения с типом ошибки и временем выполнения
- DEBUG - создание/использование курсоров, операции с savepoint, конвертация данных
- DEBUG - план выполнения запроса (EXPLAIN) при включенной настройке
EXPLAIN_QUERIES
3. Трассировка запросов:
- Каждый запрос получает уникальный
query_idдля полной трассировки - Все логи содержат префикс
execute_query: query_id: {id}для удобного поиска - Время конвертации данных измеряется и логируется отдельно
- Полная трассировка жизненного цикла запроса от начала до завершения
4. Усечение длинных запросов:
- Запросы длиннее
MAX_DEBUG_QUERY_LENGTH(по умолчанию 5000 символов) усекаются во всех логах - Текст запроса во всех логах (INFO, DEBUG, ERROR) ограничен этой настройкой
- Очистка запроса от лишних символов для читаемого логирования
5. Конвертация типов данных:
datetimeиdate→ строки в форматеYYYY-MM-DDDecimal→floatfloatс бесконечными значениями →None
Примеры использования ¶
Простой SELECT запрос:
from planiqum.core.libs.db import execute_query
from pypika import PostgreSQLQuery as Query, functions as fn
# Запрос с PyPika
query = Query.select('id', 'name').from_('users').where('active = true')
results = execute_query(query)
# Результат: [{'id': 1, 'name': 'John'}, {'id': 2, 'name': 'Jane'}]
Строковый запрос:
# Прямой SQL
results = execute_query("SELECT COUNT(*) as total FROM users")
# Результат: [{'total': 150}]
Возврат списков вместо словарей:
results = execute_query(query, to_dict=False)
# Результат: [[1, 'John'], [2, 'Jane']]
Работа в транзакции:
from django.db import transaction
with transaction.atomic():
# Успешный запрос
result1 = execute_query("SELECT NOW() as time1")
try:
# Запрос с ошибкой - откатится только к savepoint
execute_query("SELECT invalid_column FROM non_existent_table")
except Exception:
# Ошибка обработана, основная транзакция не сломалась
pass
# Этот запрос выполнится успешно
result3 = execute_query("SELECT NOW() as time3")
Логирование ¶
Примеры логов:
Начало выполнения:
INFO: Query execution started - hash: a1b2c3d4, length: 45, truncated: False, in_transaction: True, text: SELECT NOW() "current_time"
Успешное завершение:
INFO: Query executed successfully - hash: a1b2c3d4, time: 0.001s, rows: 1
Ошибка выполнения:
ERROR: Query execution failed - hash: a1b2c3d4, error: ProgrammingError: relation "non_existent_table" does not exist, time: 0.001s, in_transaction: True
Поиск в GrayLog:
- По хэшу: hash:a1b2c3d4
- По типу операции: Query execution started
- По статусу транзакции: in_transaction: True
execute_sql ¶
Назначение: Выполнение SQL-запросов без возврата результатов (INSERT, UPDATE, DELETE, DDL).
⚠️ Устаревшая функция: Для новых запросов рекомендуется использовать
execute_query(), которая обеспечивает детальное логирование и автоматическое управление транзакциями.execute_sql()используется в основном для DDL операций и совместимости с legacy кодом.
Сигнатура:
def execute_sql(sql, params=None, cursor=None):
Параметры:
- sql - SQL-запрос (строка)
- params - параметры запроса для безопасных запросов (опционально)
- cursor - курсор базы данных (опционально)
Примеры использования:
from planiqum.core.libs.db import execute_sql
# Создание таблицы
execute_sql("""
CREATE TABLE test_table (
id SERIAL PRIMARY KEY,
name VARCHAR(255)
)
""")
# Вставка данных с параметрами
execute_sql("INSERT INTO test_table (name) VALUES (%s)", ['test'])
# Обновление данных
execute_sql("UPDATE test_table SET name = 'updated' WHERE id = 1")
select_to_df ¶
Назначение: Выполнение SELECT-запроса с возвратом результата в виде pandas DataFrame.
Сигнатура:
def select_to_df(sql):
Параметры:
- sql - SQL-запрос (строка или объект PyPika Query)
Возвращает: pandas DataFrame
Примеры использования:
from planiqum.core.libs.db import select_to_df
# Получение данных в DataFrame
df = select_to_df("SELECT id, name, created_at FROM users WHERE active = true")
# Работа с DataFrame
print(df.head())
print(df.describe())
Настройки ¶
MAX_DEBUG_QUERY_LENGTH ¶
Файл: src/planiqum/core/settings.py
Назначение: Максимальная длина SQL-запроса для включения в debug-логи.
По умолчанию: 5000 символов
Настройка через переменную окружения:
export MAX_DEBUG_QUERY_LENGTH=8000
Использование:
- Запросы длиннее этого значения усекаются во всех логах (INFO, DEBUG, ERROR)
- Текст запроса во всех логах ограничен этой настройкой
- Уникальный query_id позволяет найти все логи конкретного запроса
EXPLAIN_QUERIES ¶
Файл: src/planiqum/core/settings.py
Назначение: Включение вывода плана выполнения SQL-запросов для отладки производительности.
По умолчанию: False (отключен)
Настройка через переменную окружения:
export EXPLAIN_QUERIES=true
Использование:
- При True выполняется EXPLAIN перед каждым запросом
- Результат плана выполнения выводится в debug лог
- Помогает анализировать производительность и использование индексов
- Не влияет на основную работу приложения
Пример лога с EXPLAIN:
execute_query: query_id: abc12345 EXPLAIN результат:
('Seq Scan on users', 'cost=0.00..18334.00 rows=1000000 width=36')
('Filter: (age > 25)', 'cost=0.00..18334.00 rows=333333 width=36')
Мониторинг и отладка ¶
Структура логов execute_query ¶
Все логи execute_query имеют единообразный формат с префиксом execute_query: query_id: {id}:
Основные логи (INFO/ERROR):
execute_query: query_id: abc12345 START
query_length: 1061
query_type: SELECT
query: SELECT * FROM users WHERE active = true
execute_query: query_id: abc12345 SUCCESS
query_length: 1061
query_type: SELECT
execution_time: 0.123s
rows_returned: 42
query: SELECT * FROM users WHERE active = true
execute_query: query_id: abc12345 ERROR
query_length: 1061
query_type: SELECT
execution_time: 0.123s
error_type: Exception
error_message: Table 'users' doesn't exist
query: SELECT * FROM users WHERE active = true
Примечание: Поле
queryво всех логах ограничено настройкойMAX_DEBUG_QUERY_LENGTH(по умолчанию 5000 символов). Длинные запросы усекаются с добавлением "...".
Детальные логи (DEBUG):
execute_query: query_id: abc12345 создан курсор
execute_query: query_id: abc12345 created_savepoint sp_001
execute_query: query_id: abc12345 committed_savepoint sp_001
execute_query: query_id: abc12345 Результаты запроса конвертированы, на конвертацию затрачено: 0.001s
EXPLAIN логи (при включенной настройке):
execute_query: query_id: abc12345 EXPLAIN результат:
('Seq Scan on users', 'cost=0.00..18334.00 rows=1000000 width=36')
('Filter: (active = true)', 'cost=0.00..18334.00 rows=500000 width=36')
Фильтрация в GrayLog ¶
Все логи конкретного запроса:
message:"query_id: abc12345"
По типу операции:
# Старты запросов
message:"START" AND message:"query_id:"
# Успехи запросов
message:"SUCCESS" AND message:"query_id:"
# Ошибки запросов
message:"ERROR" AND message:"query_id:"
# EXPLAIN результаты
message:"EXPLAIN результат:" AND message:"query_id:"
# Операции с курсорами
message:"курсор" AND message:"query_id:"
# Операции с savepoint
message:"savepoint" AND message:"query_id:"
По производительности:
# Медленные запросы (>1 секунды)
message:"execution_time:" AND message:{"$gt": "1.000s"} AND message:"query_id:"
# Большие запросы (>5000 символов)
message:"query_length:" AND message:{"$gt": 5000} AND message:"query_id:"
# Медленная конвертация (>0.1 секунды)
message:"конвертированы" AND message:"затрачено" AND message:{"$gt": "0.100s"}
Полная трассировка запроса ¶
Пример полного жизненного цикла запроса:
execute_query: query_id: abc12345 START- начало выполненияexecute_query: query_id: abc12345 создан курсор- создание нового курсораexecute_query: query_id: abc12345 EXPLAIN результат:- план выполнения (если включен)execute_query: query_id: abc12345 created_savepoint sp_001- создание savepoint в транзакцииexecute_query: query_id: abc12345 committed_savepoint sp_001- коммит savepointexecute_query: query_id: abc12345 Результаты запроса конвертированы, на конвертацию затрачено: 0.001s- конвертация данныхexecute_query: query_id: abc12345 SUCCESS- успешное завершение
Рекомендации по использованию ¶
Выбор функции ¶
Используйте execute_query для:
- SELECT-запросов с возвратом данных
- Запросов, требующих детального логирования
- Работы в транзакциях с автоматическим управлением ошибками
Используйте execute_sql для:
- DDL-операций (CREATE, ALTER, DROP) в legacy коде
- Совместимости с существующим кодом
- Операций, требующих внешний курсор
⚠️ Рекомендация: Для новых запросов предпочитайте
execute_query()из-за лучшего логирования и управления транзакциями.
Используйте select_to_df для:
- Аналитических запросов
- Работы с данными в pandas
- Экспорта данных
Работа с транзакциями ¶
Автоматическое управление:
# execute_query автоматически создает savepoint в транзакции
with transaction.atomic():
result = execute_query("SELECT * FROM users")
# При ошибке откатится только к savepoint, не сломав транзакцию
Ручное управление:
# Для execute_sql нужно управлять транзакциями вручную
with transaction.atomic():
execute_sql("INSERT INTO users (name) VALUES ('John')")
execute_sql("UPDATE users SET active = true WHERE name = 'John')")
# Рекомендуется использовать execute_query для новых запросов
with transaction.atomic():
execute_query("INSERT INTO users (name) VALUES ('John')")
execute_query("UPDATE users SET active = true WHERE name = 'John')")
Логирование и отладка ¶
Поиск в GrayLog:
- По хэшу запроса: hash:a1b2c3d4
- По типу операции: Query execution started или Query execution failed
- По времени выполнения: time: 0.001s
- По статусу транзакции: in_transaction: True/False
Мониторинг производительности: - Время выполнения запросов логируется автоматически - Количество возвращаемых строк для анализа объема данных - Статус транзакции для понимания контекста выполнения
Тестирование ¶
Для тестирования функций работы с базой данных используются тесты в src/planiqum/core/tests/libs/db/:
test_execute_query.py- тесты дляexecute_querytest_get_indexes_query.py- тесты для функций работы с индексами
Пример теста:
import pytest
from planiqum.core.libs.db import execute_query
@pytest.mark.django_db
def test_execute_query_basic():
result = execute_query("SELECT NOW() as current_time")
assert len(result) == 1
assert 'current_time' in result[0]
Связанные темы ¶
- Синхронизация параметров - использование
execute_sqlв синхронизации - Вычисляемые меры - работа с VIEW и материализованными представлениями
- Импорт данных параметров - массовые операции с данными