Перейти к содержанию

Работа с базой данных

В Planiqum для работы с базой данных используется библиотека planiqum.core.libs.db, которая предоставляет удобные функции для выполнения SQL-запросов с автоматическим логированием и управлением транзакциями.

Основные функции

execute_query

Файл: src/planiqum/core/libs/db.py

Назначение: Основная функция для выполнения SQL-запросов с детальным логированием и автоматическим управлением транзакциями.

Сигнатура:

def execute_query(query, to_dict: bool = True, log_context: dict = None) -> list:

Параметры: - query - SQL-запрос (строка или объект PyPika Query) - to_dict - если True, возвращает список словарей; если False, возвращает список списков - log_context - дополнительный контекст для логирования (опционально)

Возвращает: Список результатов запроса

Особенности работы

1. Автоматическое управление транзакциями:

  • Если функция вызывается вне транзакции (@transaction.atomic), работает как обычно
  • Если функция вызывается внутри транзакции, автоматически создает savepoint для безопасного отката при ошибках

2. Детальное логирование:

  • INFO - начало выполнения запроса с уникальным query_id, длиной запроса, типом и preview
  • INFO - успешное завершение с временем выполнения и количеством строк
  • ERROR - ошибка выполнения с типом ошибки и временем выполнения
  • DEBUG - создание/использование курсоров, операции с savepoint, конвертация данных
  • DEBUG - план выполнения запроса (EXPLAIN) при включенной настройке EXPLAIN_QUERIES

3. Трассировка запросов:

  • Каждый запрос получает уникальный query_id для полной трассировки
  • Все логи содержат префикс execute_query: query_id: {id} для удобного поиска
  • Время конвертации данных измеряется и логируется отдельно
  • Полная трассировка жизненного цикла запроса от начала до завершения

4. Усечение длинных запросов:

  • Запросы длиннее MAX_DEBUG_QUERY_LENGTH (по умолчанию 5000 символов) усекаются во всех логах
  • Текст запроса во всех логах (INFO, DEBUG, ERROR) ограничен этой настройкой
  • Очистка запроса от лишних символов для читаемого логирования

5. Конвертация типов данных:

  • datetime и date → строки в формате YYYY-MM-DD
  • Decimalfloat
  • float с бесконечными значениями → None

Примеры использования

Простой SELECT запрос:

from planiqum.core.libs.db import execute_query
from pypika import PostgreSQLQuery as Query, functions as fn

# Запрос с PyPika
query = Query.select('id', 'name').from_('users').where('active = true')
results = execute_query(query)

# Результат: [{'id': 1, 'name': 'John'}, {'id': 2, 'name': 'Jane'}]

Строковый запрос:

# Прямой SQL
results = execute_query("SELECT COUNT(*) as total FROM users")
# Результат: [{'total': 150}]

Возврат списков вместо словарей:

results = execute_query(query, to_dict=False)
# Результат: [[1, 'John'], [2, 'Jane']]

Работа в транзакции:

from django.db import transaction

with transaction.atomic():
    # Успешный запрос
    result1 = execute_query("SELECT NOW() as time1")

    try:
        # Запрос с ошибкой - откатится только к savepoint
        execute_query("SELECT invalid_column FROM non_existent_table")
    except Exception:
        # Ошибка обработана, основная транзакция не сломалась
        pass

    # Этот запрос выполнится успешно
    result3 = execute_query("SELECT NOW() as time3")

Логирование

Примеры логов:

Начало выполнения:

INFO: Query execution started - hash: a1b2c3d4, length: 45, truncated: False, in_transaction: True, text: SELECT NOW() "current_time"

Успешное завершение:

INFO: Query executed successfully - hash: a1b2c3d4, time: 0.001s, rows: 1

Ошибка выполнения:

ERROR: Query execution failed - hash: a1b2c3d4, error: ProgrammingError: relation "non_existent_table" does not exist, time: 0.001s, in_transaction: True

Поиск в GrayLog: - По хэшу: hash:a1b2c3d4 - По типу операции: Query execution started - По статусу транзакции: in_transaction: True

execute_sql

Назначение: Выполнение SQL-запросов без возврата результатов (INSERT, UPDATE, DELETE, DDL).

⚠️ Устаревшая функция: Для новых запросов рекомендуется использовать execute_query(), которая обеспечивает детальное логирование и автоматическое управление транзакциями. execute_sql() используется в основном для DDL операций и совместимости с legacy кодом.

Сигнатура:

def execute_sql(sql, params=None, cursor=None):

Параметры: - sql - SQL-запрос (строка) - params - параметры запроса для безопасных запросов (опционально) - cursor - курсор базы данных (опционально)

Примеры использования:

from planiqum.core.libs.db import execute_sql

# Создание таблицы
execute_sql("""
    CREATE TABLE test_table (
        id SERIAL PRIMARY KEY,
        name VARCHAR(255)
    )
""")

# Вставка данных с параметрами
execute_sql("INSERT INTO test_table (name) VALUES (%s)", ['test'])

# Обновление данных
execute_sql("UPDATE test_table SET name = 'updated' WHERE id = 1")

select_to_df

Назначение: Выполнение SELECT-запроса с возвратом результата в виде pandas DataFrame.

Сигнатура:

def select_to_df(sql):

Параметры: - sql - SQL-запрос (строка или объект PyPika Query)

Возвращает: pandas DataFrame

Примеры использования:

from planiqum.core.libs.db import select_to_df

# Получение данных в DataFrame
df = select_to_df("SELECT id, name, created_at FROM users WHERE active = true")

# Работа с DataFrame
print(df.head())
print(df.describe())

Настройки

MAX_DEBUG_QUERY_LENGTH

Файл: src/planiqum/core/settings.py

Назначение: Максимальная длина SQL-запроса для включения в debug-логи.

По умолчанию: 5000 символов

Настройка через переменную окружения:

export MAX_DEBUG_QUERY_LENGTH=8000

Использование: - Запросы длиннее этого значения усекаются во всех логах (INFO, DEBUG, ERROR) - Текст запроса во всех логах ограничен этой настройкой - Уникальный query_id позволяет найти все логи конкретного запроса

EXPLAIN_QUERIES

Файл: src/planiqum/core/settings.py

Назначение: Включение вывода плана выполнения SQL-запросов для отладки производительности.

По умолчанию: False (отключен)

Настройка через переменную окружения:

export EXPLAIN_QUERIES=true

Использование: - При True выполняется EXPLAIN перед каждым запросом - Результат плана выполнения выводится в debug лог - Помогает анализировать производительность и использование индексов - Не влияет на основную работу приложения

Пример лога с EXPLAIN:

execute_query: query_id: abc12345 EXPLAIN результат:
  ('Seq Scan on users', 'cost=0.00..18334.00 rows=1000000 width=36')
  ('Filter: (age > 25)', 'cost=0.00..18334.00 rows=333333 width=36')

Мониторинг и отладка

Структура логов execute_query

Все логи execute_query имеют единообразный формат с префиксом execute_query: query_id: {id}:

Основные логи (INFO/ERROR):

execute_query: query_id: abc12345 START
  query_length: 1061
  query_type: SELECT
  query: SELECT * FROM users WHERE active = true

execute_query: query_id: abc12345 SUCCESS
  query_length: 1061
  query_type: SELECT
  execution_time: 0.123s
  rows_returned: 42
  query: SELECT * FROM users WHERE active = true

execute_query: query_id: abc12345 ERROR
  query_length: 1061
  query_type: SELECT
  execution_time: 0.123s
  error_type: Exception
  error_message: Table 'users' doesn't exist
  query: SELECT * FROM users WHERE active = true

Примечание: Поле query во всех логах ограничено настройкой MAX_DEBUG_QUERY_LENGTH (по умолчанию 5000 символов). Длинные запросы усекаются с добавлением "...".

Детальные логи (DEBUG):

execute_query: query_id: abc12345 создан курсор
execute_query: query_id: abc12345 created_savepoint sp_001
execute_query: query_id: abc12345 committed_savepoint sp_001
execute_query: query_id: abc12345 Результаты запроса конвертированы, на конвертацию затрачено: 0.001s

EXPLAIN логи (при включенной настройке):

execute_query: query_id: abc12345 EXPLAIN результат:
  ('Seq Scan on users', 'cost=0.00..18334.00 rows=1000000 width=36')
  ('Filter: (active = true)', 'cost=0.00..18334.00 rows=500000 width=36')

Фильтрация в GrayLog

Все логи конкретного запроса:

message:"query_id: abc12345"

По типу операции:

# Старты запросов
message:"START" AND message:"query_id:"

# Успехи запросов  
message:"SUCCESS" AND message:"query_id:"

# Ошибки запросов
message:"ERROR" AND message:"query_id:"

# EXPLAIN результаты
message:"EXPLAIN результат:" AND message:"query_id:"

# Операции с курсорами
message:"курсор" AND message:"query_id:"

# Операции с savepoint
message:"savepoint" AND message:"query_id:"

По производительности:

# Медленные запросы (>1 секунды)
message:"execution_time:" AND message:{"$gt": "1.000s"} AND message:"query_id:"

# Большие запросы (>5000 символов)
message:"query_length:" AND message:{"$gt": 5000} AND message:"query_id:"

# Медленная конвертация (>0.1 секунды)
message:"конвертированы" AND message:"затрачено" AND message:{"$gt": "0.100s"}

Полная трассировка запроса

Пример полного жизненного цикла запроса:

  1. execute_query: query_id: abc12345 START - начало выполнения
  2. execute_query: query_id: abc12345 создан курсор - создание нового курсора
  3. execute_query: query_id: abc12345 EXPLAIN результат: - план выполнения (если включен)
  4. execute_query: query_id: abc12345 created_savepoint sp_001 - создание savepoint в транзакции
  5. execute_query: query_id: abc12345 committed_savepoint sp_001 - коммит savepoint
  6. execute_query: query_id: abc12345 Результаты запроса конвертированы, на конвертацию затрачено: 0.001s - конвертация данных
  7. execute_query: query_id: abc12345 SUCCESS - успешное завершение

Рекомендации по использованию

Выбор функции

Используйте execute_query для: - SELECT-запросов с возвратом данных - Запросов, требующих детального логирования - Работы в транзакциях с автоматическим управлением ошибками

Используйте execute_sql для: - DDL-операций (CREATE, ALTER, DROP) в legacy коде - Совместимости с существующим кодом - Операций, требующих внешний курсор

⚠️ Рекомендация: Для новых запросов предпочитайте execute_query() из-за лучшего логирования и управления транзакциями.

Используйте select_to_df для: - Аналитических запросов - Работы с данными в pandas - Экспорта данных

Работа с транзакциями

Автоматическое управление:

# execute_query автоматически создает savepoint в транзакции
with transaction.atomic():
    result = execute_query("SELECT * FROM users")
    # При ошибке откатится только к savepoint, не сломав транзакцию

Ручное управление:

# Для execute_sql нужно управлять транзакциями вручную
with transaction.atomic():
    execute_sql("INSERT INTO users (name) VALUES ('John')")
    execute_sql("UPDATE users SET active = true WHERE name = 'John')")

# Рекомендуется использовать execute_query для новых запросов
with transaction.atomic():
    execute_query("INSERT INTO users (name) VALUES ('John')")
    execute_query("UPDATE users SET active = true WHERE name = 'John')")

Логирование и отладка

Поиск в GrayLog: - По хэшу запроса: hash:a1b2c3d4 - По типу операции: Query execution started или Query execution failed - По времени выполнения: time: 0.001s - По статусу транзакции: in_transaction: True/False

Мониторинг производительности: - Время выполнения запросов логируется автоматически - Количество возвращаемых строк для анализа объема данных - Статус транзакции для понимания контекста выполнения

Временные таблицы

Для работы с временными таблицами в Planiqum используется библиотека planiqum.core.libs.temp_table, которая предоставляет автоматическое управление жизненным циклом таблиц через сборщик мусора Python.

Основные возможности

  • Автоматическая очистка - таблицы удаляются при уничтожении объекта Python через weakref.finalize
  • Контекстный менеджер - гарантированная очистка при выходе из блока with
  • Межпроцессная видимость - таблицы создаются в БД и доступны из разных процессов
  • Настраиваемая схема - возможность выделить отдельную схему для временных таблиц
  • UNLOGGED таблицы - поддержка UNLOGGED для производительности
  • Интеграция с PyPika - наследует все возможности PyPika Table для построения запросов

TempTable.create()

Файл: src/planiqum/core/libs/temp_table.py

Назначение: Создаёт временную таблицу в БД с автоматическим управлением жизненным циклом.

Сигнатура:

@classmethod
def create(cls, query: Union[str, QueryBuilder],
           name: Optional[str] = None,
           schema: Optional[str] = None,
           unlogged: Optional[bool] = None,
           auto_cleanup: Optional[bool] = None,
           create_indexes: Optional[list] = None) -> 'TempTable'

Параметры: - query - SQL-запрос или объект PyPika Query для создания таблицы (CREATE TABLE AS ...) - name - имя таблицы (по умолчанию: None - генерируется автоматически в формате tmp_{uuid}) - schema - схема таблицы (по умолчанию: None - используется TEMP_TABLES_SCHEMA из settings, по умолчанию public) - unlogged - использовать UNLOGGED таблицу (по умолчанию: None - используется TEMP_TABLES_UNLOGGED из settings, по умолчанию True) - auto_cleanup - автоматически удалять при уничтожении объекта (по умолчанию: None - используется TEMP_TABLES_AUTO_CLEANUP из settings, по умолчанию True) - create_indexes - список полей для создания индексов (по умолчанию: None - индексы не создаются). Может быть списком строк или кортежей (field, unique)

Возвращает: Объект TempTable (наследует PyPika Table)

Примеры использования

1. Простое создание с автоматической очисткой:

from planiqum.core.libs.temp_table import TempTable
from planiqum.core.libs.db import execute_query

# Создаём временную таблицу
temp = TempTable.create("SELECT * FROM source_table WHERE active = true")

# Используем в запросах (TempTable наследует PyPika Table)
from pypika import PostgreSQLQuery as Query
query = Query.from_(temp).select('*')
result = execute_query(query)

# Таблица автоматически удалится при выходе temp из области видимости
# или при явном вызове сборщика мусора

2. Контекстный менеджер (гарантированная очистка):

from planiqum.core.libs.temp_table import temp_table

# Использование через функцию-хелпер
with temp_table("SELECT * FROM big_table WHERE status = 'active'") as t:
    count = t.count()
    result = execute_query(f"SELECT * FROM {t.full_name} LIMIT 100")
    # Таблица удалится автоматически при выходе из блока

3. Создание в отдельной схеме:

temp = TempTable.create(
    "SELECT * FROM source",
    schema="temp_tables"  # Создастся в схеме temp_tables
)

4. С индексами:

temp = TempTable.create(
    "SELECT id, name, value FROM source",
    create_indexes=[
        ('id', True),      # Уникальный индекс на id
        'name'             # Обычный индекс на name
    ]
)

5. Явное управление жизненным циклом:

temp = TempTable.create("SELECT * FROM source")

try:
    # Работа с таблицей
    process_data(temp.full_name)
finally:
    # Явное удаление (отключает автоочистку)
    temp.drop()

6. Сохранение таблицы (отмена автоудаления):

temp = TempTable.create("SELECT * FROM calculation_results")

# Решили сохранить результат
temp.persist('saved_results', new_schema='public')
# Теперь таблица переименована и не будет удалена автоматически

Методы TempTable

drop() - Явно удаляет таблицу из БД и отключает автоочистку

keep() - Отменяет автоматическое удаление, таблица останется в БД

persist(new_name, new_schema=None) - Переименовывает таблицу и отменяет автоудаление

count() - Возвращает количество строк в таблице

analyze() - Выполняет ANALYZE для таблицы

full_name - Свойство, возвращающее полное имя таблицы (схема.таблица)

Настройки

Библиотека использует следующие настройки Django (в project/settings.py):

# Схема для временных таблиц (None = использовать public)
TEMP_TABLES_SCHEMA = env("TEMP_TABLES_SCHEMA", default=None)

# Префикс имён временных таблиц
TEMP_TABLES_PREFIX = env("TEMP_TABLES_PREFIX", default='tmp_')

# Использовать UNLOGGED таблицы (быстрее, но не логируются)
TEMP_TABLES_UNLOGGED = env("TEMP_TABLES_UNLOGGED", default=True)

# Автоматическая очистка через сборщик мусора
TEMP_TABLES_AUTO_CLEANUP = env("TEMP_TABLES_AUTO_CLEANUP", default=True)

Значения по умолчанию

В settings.py настройки заданы со следующими значениями по умолчанию:

  • TEMP_TABLES_SCHEMA = None → таблицы создаются в схеме public
  • TEMP_TABLES_PREFIX = 'tmp_' → префикс имён таблиц
  • TEMP_TABLES_UNLOGGED = True → создаются UNLOGGED таблицы (быстрее, но не логируются в WAL)
  • TEMP_TABLES_AUTO_CLEANUP = True → автоматическая очистка через сборщик мусора включена

Пример результирующих значений при создании таблицы:

# При использовании настроек по умолчанию из settings.py
temp = TempTable.create("SELECT * FROM source")
# Будет создана таблица:
# - Имя: tmp_{8_символов_uuid} (например: tmp_a1b2c3d4)
# - Схема: public
# - Тип: UNLOGGED TABLE
# - Автоочистка: включена (удалится при GC)

Особенности работы

1. Автоматическая очистка: - При создании таблицы с auto_cleanup=True регистрируется финализатор через weakref.finalize - При уничтожении объекта Python вызывается _cleanup_table(), который удаляет таблицу из БД - Работает даже если объект не был явно удалён (del temp)

2. UNLOGGED таблицы: - По умолчанию создаются UNLOGGED таблицы для производительности - Не логируются в WAL, быстрее создаются и записываются - Автоматически удаляются при перезапуске PostgreSQL

3. Схемы: - Можно создать отдельную схему для временных таблиц - При отсутствии прав на создание схемы библиотека автоматически использует public - Если права отсутствуют, логируется подробное сообщение для администратора

4. Имена таблиц: - По умолчанию генерируются уникальные имена с префиксом и UUID - Можно указать своё имя через параметр name - Формат: {schema}.{prefix}{uuid} или {prefix}{uuid} если схема не указана

Обработка ошибок

Отсутствие прав на создание схемы: Если у пользователя БД нет прав на создание схемы, библиотека: 1. Логирует подробное сообщение с контекстом и инструкциями для администратора 2. Автоматически использует схему public если включён fallback_to_public 3. Возвращает ошибку только если raise_on_error=True

Пример сообщения об ошибке:

ADMIN_ACTION_REQUIRED: Нет прав на создание схемы temp_tables
Контекст: Создание временной таблицы для промежуточных вычислений
Пользователь БД: planiqum_user
Требуемые права: CREATEDB (для создания схемы) или CREATE (в существующей схеме)
SQL fix: ALTER USER planiqum_user CREATEDB;

Очистка забытых таблиц

Для очистки временных таблиц, которые остались в БД (например, после аварийного завершения процесса), используется функция cleanup_temp_tables():

from planiqum.core.libs.temp_table import cleanup_temp_tables

# Режим проверки (dry run)
tables = cleanup_temp_tables(max_age_hours=24, schema='temp_tables', dry_run=True)
print(f"Найдено таблиц для удаления: {len(tables)}")

# Реальное удаление
dropped = cleanup_temp_tables(max_age_hours=24, schema='temp_tables', dry_run=False)
print(f"Удалено таблиц: {len(dropped)}")

Интеграция с PyPika

TempTable наследует класс Table из PyPika, поэтому можно использовать все возможности PyPika для построения запросов:

from pypika import PostgreSQLQuery as Query, functions as fn

temp = TempTable.create("SELECT id, value FROM source")

# Использование в запросах PyPika
query = (
    Query.from_(temp)
    .select(fn.Sum(temp.value).as_('total'))
    .groupby(temp.id)
)
result = execute_query(query)

Отличия от старого механизма

Библиотека полностью независима от старого механизма временных таблиц: - Не использует @pq_script декоратор - Не использует UniqName singleton - Генерирует уникальные имена через UUID - Использует weakref.finalize вместо __del__ - Поддерживает отдельную схему для временных таблиц

Тестирование

Для тестирования функций работы с базой данных используются тесты в src/planiqum/core/tests/libs/db/:

  • test_execute_query.py - тесты для execute_query
  • test_get_indexes_query.py - тесты для функций работы с индексами
  • test_temp_table.py - тесты для временных таблиц

Пример теста:

import pytest
from planiqum.core.libs.db import execute_query

@pytest.mark.django_db
def test_execute_query_basic():
    result = execute_query("SELECT NOW() as current_time")
    assert len(result) == 1
    assert 'current_time' in result[0]

Управление структурой таблиц

Для декларативного управления структурой таблиц (создание, обновление, синхронизация) используется класс Table из модуля planiqum.core.libs.table.

Основные возможности: - Загрузка структуры существующей таблицы из БД - Создание новой структуры таблицы программно - Создание таблицы в базе данных по описанию - Обновление структуры существующей таблицы - Автоматическая синхронизация структуры

Пример использования:

from planiqum.core.libs.table import Table, ColumnDefinition, IndexDefinition

# Описываем структуру
table = Table.new("users", columns=[
    ColumnDefinition("id", "SERIAL", nullable=False),
    ColumnDefinition("name", "VARCHAR(255)", nullable=False),
], indexes=[
    IndexDefinition("pk_users", ["id"], primary=True),
])

# Создаем или синхронизируем таблицу
table.create_or_update()

Подробная документация: Управление структурой таблиц

Связанные темы


Важно: описанные настройки и сценарии могут отличаться в вашей инсталляции Planiqum
За уточнениями и методологической поддержкой обращайтесь в компанию ЮНИК СОФТ