Работа с базой данных ¶
В Planiqum для работы с базой данных используется библиотека planiqum.core.libs.db, которая предоставляет удобные функции для выполнения SQL-запросов с автоматическим логированием и управлением транзакциями.
Основные функции ¶
execute_query ¶
Файл: src/planiqum/core/libs/db.py
Назначение: Основная функция для выполнения SQL-запросов с детальным логированием и автоматическим управлением транзакциями.
Сигнатура:
def execute_query(query, to_dict: bool = True, log_context: dict = None) -> list:
Параметры:
- query - SQL-запрос (строка или объект PyPika Query)
- to_dict - если True, возвращает список словарей; если False, возвращает список списков
- log_context - дополнительный контекст для логирования (опционально)
Возвращает: Список результатов запроса
Особенности работы ¶
1. Автоматическое управление транзакциями:
- Если функция вызывается вне транзакции (
@transaction.atomic), работает как обычно - Если функция вызывается внутри транзакции, автоматически создает savepoint для безопасного отката при ошибках
2. Детальное логирование:
- INFO - начало выполнения запроса с уникальным
query_id, длиной запроса, типом и preview - INFO - успешное завершение с временем выполнения и количеством строк
- ERROR - ошибка выполнения с типом ошибки и временем выполнения
- DEBUG - создание/использование курсоров, операции с savepoint, конвертация данных
- DEBUG - план выполнения запроса (EXPLAIN) при включенной настройке
EXPLAIN_QUERIES
3. Трассировка запросов:
- Каждый запрос получает уникальный
query_idдля полной трассировки - Все логи содержат префикс
execute_query: query_id: {id}для удобного поиска - Время конвертации данных измеряется и логируется отдельно
- Полная трассировка жизненного цикла запроса от начала до завершения
4. Усечение длинных запросов:
- Запросы длиннее
MAX_DEBUG_QUERY_LENGTH(по умолчанию 5000 символов) усекаются во всех логах - Текст запроса во всех логах (INFO, DEBUG, ERROR) ограничен этой настройкой
- Очистка запроса от лишних символов для читаемого логирования
5. Конвертация типов данных:
datetimeиdate→ строки в форматеYYYY-MM-DDDecimal→floatfloatс бесконечными значениями →None
Примеры использования ¶
Простой SELECT запрос:
from planiqum.core.libs.db import execute_query
from pypika import PostgreSQLQuery as Query, functions as fn
# Запрос с PyPika
query = Query.select('id', 'name').from_('users').where('active = true')
results = execute_query(query)
# Результат: [{'id': 1, 'name': 'John'}, {'id': 2, 'name': 'Jane'}]
Строковый запрос:
# Прямой SQL
results = execute_query("SELECT COUNT(*) as total FROM users")
# Результат: [{'total': 150}]
Возврат списков вместо словарей:
results = execute_query(query, to_dict=False)
# Результат: [[1, 'John'], [2, 'Jane']]
Работа в транзакции:
from django.db import transaction
with transaction.atomic():
# Успешный запрос
result1 = execute_query("SELECT NOW() as time1")
try:
# Запрос с ошибкой - откатится только к savepoint
execute_query("SELECT invalid_column FROM non_existent_table")
except Exception:
# Ошибка обработана, основная транзакция не сломалась
pass
# Этот запрос выполнится успешно
result3 = execute_query("SELECT NOW() as time3")
Логирование ¶
Примеры логов:
Начало выполнения:
INFO: Query execution started - hash: a1b2c3d4, length: 45, truncated: False, in_transaction: True, text: SELECT NOW() "current_time"
Успешное завершение:
INFO: Query executed successfully - hash: a1b2c3d4, time: 0.001s, rows: 1
Ошибка выполнения:
ERROR: Query execution failed - hash: a1b2c3d4, error: ProgrammingError: relation "non_existent_table" does not exist, time: 0.001s, in_transaction: True
Поиск в GrayLog:
- По хэшу: hash:a1b2c3d4
- По типу операции: Query execution started
- По статусу транзакции: in_transaction: True
execute_sql ¶
Назначение: Выполнение SQL-запросов без возврата результатов (INSERT, UPDATE, DELETE, DDL).
⚠️ Устаревшая функция: Для новых запросов рекомендуется использовать
execute_query(), которая обеспечивает детальное логирование и автоматическое управление транзакциями.execute_sql()используется в основном для DDL операций и совместимости с legacy кодом.
Сигнатура:
def execute_sql(sql, params=None, cursor=None):
Параметры:
- sql - SQL-запрос (строка)
- params - параметры запроса для безопасных запросов (опционально)
- cursor - курсор базы данных (опционально)
Примеры использования:
from planiqum.core.libs.db import execute_sql
# Создание таблицы
execute_sql("""
CREATE TABLE test_table (
id SERIAL PRIMARY KEY,
name VARCHAR(255)
)
""")
# Вставка данных с параметрами
execute_sql("INSERT INTO test_table (name) VALUES (%s)", ['test'])
# Обновление данных
execute_sql("UPDATE test_table SET name = 'updated' WHERE id = 1")
select_to_df ¶
Назначение: Выполнение SELECT-запроса с возвратом результата в виде pandas DataFrame.
Сигнатура:
def select_to_df(sql):
Параметры:
- sql - SQL-запрос (строка или объект PyPika Query)
Возвращает: pandas DataFrame
Примеры использования:
from planiqum.core.libs.db import select_to_df
# Получение данных в DataFrame
df = select_to_df("SELECT id, name, created_at FROM users WHERE active = true")
# Работа с DataFrame
print(df.head())
print(df.describe())
Настройки ¶
MAX_DEBUG_QUERY_LENGTH ¶
Файл: src/planiqum/core/settings.py
Назначение: Максимальная длина SQL-запроса для включения в debug-логи.
По умолчанию: 5000 символов
Настройка через переменную окружения:
export MAX_DEBUG_QUERY_LENGTH=8000
Использование:
- Запросы длиннее этого значения усекаются во всех логах (INFO, DEBUG, ERROR)
- Текст запроса во всех логах ограничен этой настройкой
- Уникальный query_id позволяет найти все логи конкретного запроса
EXPLAIN_QUERIES ¶
Файл: src/planiqum/core/settings.py
Назначение: Включение вывода плана выполнения SQL-запросов для отладки производительности.
По умолчанию: False (отключен)
Настройка через переменную окружения:
export EXPLAIN_QUERIES=true
Использование:
- При True выполняется EXPLAIN перед каждым запросом
- Результат плана выполнения выводится в debug лог
- Помогает анализировать производительность и использование индексов
- Не влияет на основную работу приложения
Пример лога с EXPLAIN:
execute_query: query_id: abc12345 EXPLAIN результат:
('Seq Scan on users', 'cost=0.00..18334.00 rows=1000000 width=36')
('Filter: (age > 25)', 'cost=0.00..18334.00 rows=333333 width=36')
Мониторинг и отладка ¶
Структура логов execute_query ¶
Все логи execute_query имеют единообразный формат с префиксом execute_query: query_id: {id}:
Основные логи (INFO/ERROR):
execute_query: query_id: abc12345 START
query_length: 1061
query_type: SELECT
query: SELECT * FROM users WHERE active = true
execute_query: query_id: abc12345 SUCCESS
query_length: 1061
query_type: SELECT
execution_time: 0.123s
rows_returned: 42
query: SELECT * FROM users WHERE active = true
execute_query: query_id: abc12345 ERROR
query_length: 1061
query_type: SELECT
execution_time: 0.123s
error_type: Exception
error_message: Table 'users' doesn't exist
query: SELECT * FROM users WHERE active = true
Примечание: Поле
queryво всех логах ограничено настройкойMAX_DEBUG_QUERY_LENGTH(по умолчанию 5000 символов). Длинные запросы усекаются с добавлением "...".
Детальные логи (DEBUG):
execute_query: query_id: abc12345 создан курсор
execute_query: query_id: abc12345 created_savepoint sp_001
execute_query: query_id: abc12345 committed_savepoint sp_001
execute_query: query_id: abc12345 Результаты запроса конвертированы, на конвертацию затрачено: 0.001s
EXPLAIN логи (при включенной настройке):
execute_query: query_id: abc12345 EXPLAIN результат:
('Seq Scan on users', 'cost=0.00..18334.00 rows=1000000 width=36')
('Filter: (active = true)', 'cost=0.00..18334.00 rows=500000 width=36')
Фильтрация в GrayLog ¶
Все логи конкретного запроса:
message:"query_id: abc12345"
По типу операции:
# Старты запросов
message:"START" AND message:"query_id:"
# Успехи запросов
message:"SUCCESS" AND message:"query_id:"
# Ошибки запросов
message:"ERROR" AND message:"query_id:"
# EXPLAIN результаты
message:"EXPLAIN результат:" AND message:"query_id:"
# Операции с курсорами
message:"курсор" AND message:"query_id:"
# Операции с savepoint
message:"savepoint" AND message:"query_id:"
По производительности:
# Медленные запросы (>1 секунды)
message:"execution_time:" AND message:{"$gt": "1.000s"} AND message:"query_id:"
# Большие запросы (>5000 символов)
message:"query_length:" AND message:{"$gt": 5000} AND message:"query_id:"
# Медленная конвертация (>0.1 секунды)
message:"конвертированы" AND message:"затрачено" AND message:{"$gt": "0.100s"}
Полная трассировка запроса ¶
Пример полного жизненного цикла запроса:
execute_query: query_id: abc12345 START- начало выполненияexecute_query: query_id: abc12345 создан курсор- создание нового курсораexecute_query: query_id: abc12345 EXPLAIN результат:- план выполнения (если включен)execute_query: query_id: abc12345 created_savepoint sp_001- создание savepoint в транзакцииexecute_query: query_id: abc12345 committed_savepoint sp_001- коммит savepointexecute_query: query_id: abc12345 Результаты запроса конвертированы, на конвертацию затрачено: 0.001s- конвертация данныхexecute_query: query_id: abc12345 SUCCESS- успешное завершение
Рекомендации по использованию ¶
Выбор функции ¶
Используйте execute_query для:
- SELECT-запросов с возвратом данных
- Запросов, требующих детального логирования
- Работы в транзакциях с автоматическим управлением ошибками
Используйте execute_sql для:
- DDL-операций (CREATE, ALTER, DROP) в legacy коде
- Совместимости с существующим кодом
- Операций, требующих внешний курсор
⚠️ Рекомендация: Для новых запросов предпочитайте
execute_query()из-за лучшего логирования и управления транзакциями.
Используйте select_to_df для:
- Аналитических запросов
- Работы с данными в pandas
- Экспорта данных
Работа с транзакциями ¶
Автоматическое управление:
# execute_query автоматически создает savepoint в транзакции
with transaction.atomic():
result = execute_query("SELECT * FROM users")
# При ошибке откатится только к savepoint, не сломав транзакцию
Ручное управление:
# Для execute_sql нужно управлять транзакциями вручную
with transaction.atomic():
execute_sql("INSERT INTO users (name) VALUES ('John')")
execute_sql("UPDATE users SET active = true WHERE name = 'John')")
# Рекомендуется использовать execute_query для новых запросов
with transaction.atomic():
execute_query("INSERT INTO users (name) VALUES ('John')")
execute_query("UPDATE users SET active = true WHERE name = 'John')")
Логирование и отладка ¶
Поиск в GrayLog:
- По хэшу запроса: hash:a1b2c3d4
- По типу операции: Query execution started или Query execution failed
- По времени выполнения: time: 0.001s
- По статусу транзакции: in_transaction: True/False
Мониторинг производительности: - Время выполнения запросов логируется автоматически - Количество возвращаемых строк для анализа объема данных - Статус транзакции для понимания контекста выполнения
Временные таблицы ¶
Для работы с временными таблицами в Planiqum используется библиотека planiqum.core.libs.temp_table, которая предоставляет автоматическое управление жизненным циклом таблиц через сборщик мусора Python.
Основные возможности ¶
- Автоматическая очистка - таблицы удаляются при уничтожении объекта Python через
weakref.finalize - Контекстный менеджер - гарантированная очистка при выходе из блока
with - Межпроцессная видимость - таблицы создаются в БД и доступны из разных процессов
- Настраиваемая схема - возможность выделить отдельную схему для временных таблиц
- UNLOGGED таблицы - поддержка UNLOGGED для производительности
- Интеграция с PyPika - наследует все возможности PyPika Table для построения запросов
TempTable.create() ¶
Файл: src/planiqum/core/libs/temp_table.py
Назначение: Создаёт временную таблицу в БД с автоматическим управлением жизненным циклом.
Сигнатура:
@classmethod
def create(cls, query: Union[str, QueryBuilder],
name: Optional[str] = None,
schema: Optional[str] = None,
unlogged: Optional[bool] = None,
auto_cleanup: Optional[bool] = None,
create_indexes: Optional[list] = None) -> 'TempTable'
Параметры:
- query - SQL-запрос или объект PyPika Query для создания таблицы (CREATE TABLE AS ...)
- name - имя таблицы (по умолчанию: None - генерируется автоматически в формате tmp_{uuid})
- schema - схема таблицы (по умолчанию: None - используется TEMP_TABLES_SCHEMA из settings, по умолчанию public)
- unlogged - использовать UNLOGGED таблицу (по умолчанию: None - используется TEMP_TABLES_UNLOGGED из settings, по умолчанию True)
- auto_cleanup - автоматически удалять при уничтожении объекта (по умолчанию: None - используется TEMP_TABLES_AUTO_CLEANUP из settings, по умолчанию True)
- create_indexes - список полей для создания индексов (по умолчанию: None - индексы не создаются). Может быть списком строк или кортежей (field, unique)
Возвращает: Объект TempTable (наследует PyPika Table)
Примеры использования ¶
1. Простое создание с автоматической очисткой:
from planiqum.core.libs.temp_table import TempTable
from planiqum.core.libs.db import execute_query
# Создаём временную таблицу
temp = TempTable.create("SELECT * FROM source_table WHERE active = true")
# Используем в запросах (TempTable наследует PyPika Table)
from pypika import PostgreSQLQuery as Query
query = Query.from_(temp).select('*')
result = execute_query(query)
# Таблица автоматически удалится при выходе temp из области видимости
# или при явном вызове сборщика мусора
2. Контекстный менеджер (гарантированная очистка):
from planiqum.core.libs.temp_table import temp_table
# Использование через функцию-хелпер
with temp_table("SELECT * FROM big_table WHERE status = 'active'") as t:
count = t.count()
result = execute_query(f"SELECT * FROM {t.full_name} LIMIT 100")
# Таблица удалится автоматически при выходе из блока
3. Создание в отдельной схеме:
temp = TempTable.create(
"SELECT * FROM source",
schema="temp_tables" # Создастся в схеме temp_tables
)
4. С индексами:
temp = TempTable.create(
"SELECT id, name, value FROM source",
create_indexes=[
('id', True), # Уникальный индекс на id
'name' # Обычный индекс на name
]
)
5. Явное управление жизненным циклом:
temp = TempTable.create("SELECT * FROM source")
try:
# Работа с таблицей
process_data(temp.full_name)
finally:
# Явное удаление (отключает автоочистку)
temp.drop()
6. Сохранение таблицы (отмена автоудаления):
temp = TempTable.create("SELECT * FROM calculation_results")
# Решили сохранить результат
temp.persist('saved_results', new_schema='public')
# Теперь таблица переименована и не будет удалена автоматически
Методы TempTable ¶
drop() - Явно удаляет таблицу из БД и отключает автоочистку
keep() - Отменяет автоматическое удаление, таблица останется в БД
persist(new_name, new_schema=None) - Переименовывает таблицу и отменяет автоудаление
count() - Возвращает количество строк в таблице
analyze() - Выполняет ANALYZE для таблицы
full_name - Свойство, возвращающее полное имя таблицы (схема.таблица)
Настройки ¶
Библиотека использует следующие настройки Django (в project/settings.py):
# Схема для временных таблиц (None = использовать public)
TEMP_TABLES_SCHEMA = env("TEMP_TABLES_SCHEMA", default=None)
# Префикс имён временных таблиц
TEMP_TABLES_PREFIX = env("TEMP_TABLES_PREFIX", default='tmp_')
# Использовать UNLOGGED таблицы (быстрее, но не логируются)
TEMP_TABLES_UNLOGGED = env("TEMP_TABLES_UNLOGGED", default=True)
# Автоматическая очистка через сборщик мусора
TEMP_TABLES_AUTO_CLEANUP = env("TEMP_TABLES_AUTO_CLEANUP", default=True)
Значения по умолчанию ¶
В settings.py настройки заданы со следующими значениями по умолчанию:
TEMP_TABLES_SCHEMA=None→ таблицы создаются в схемеpublicTEMP_TABLES_PREFIX='tmp_'→ префикс имён таблицTEMP_TABLES_UNLOGGED=True→ создаются UNLOGGED таблицы (быстрее, но не логируются в WAL)TEMP_TABLES_AUTO_CLEANUP=True→ автоматическая очистка через сборщик мусора включена
Пример результирующих значений при создании таблицы:
# При использовании настроек по умолчанию из settings.py
temp = TempTable.create("SELECT * FROM source")
# Будет создана таблица:
# - Имя: tmp_{8_символов_uuid} (например: tmp_a1b2c3d4)
# - Схема: public
# - Тип: UNLOGGED TABLE
# - Автоочистка: включена (удалится при GC)
Особенности работы ¶
1. Автоматическая очистка:
- При создании таблицы с auto_cleanup=True регистрируется финализатор через weakref.finalize
- При уничтожении объекта Python вызывается _cleanup_table(), который удаляет таблицу из БД
- Работает даже если объект не был явно удалён (del temp)
2. UNLOGGED таблицы: - По умолчанию создаются UNLOGGED таблицы для производительности - Не логируются в WAL, быстрее создаются и записываются - Автоматически удаляются при перезапуске PostgreSQL
3. Схемы:
- Можно создать отдельную схему для временных таблиц
- При отсутствии прав на создание схемы библиотека автоматически использует public
- Если права отсутствуют, логируется подробное сообщение для администратора
4. Имена таблиц:
- По умолчанию генерируются уникальные имена с префиксом и UUID
- Можно указать своё имя через параметр name
- Формат: {schema}.{prefix}{uuid} или {prefix}{uuid} если схема не указана
Обработка ошибок ¶
Отсутствие прав на создание схемы:
Если у пользователя БД нет прав на создание схемы, библиотека:
1. Логирует подробное сообщение с контекстом и инструкциями для администратора
2. Автоматически использует схему public если включён fallback_to_public
3. Возвращает ошибку только если raise_on_error=True
Пример сообщения об ошибке:
ADMIN_ACTION_REQUIRED: Нет прав на создание схемы temp_tables
Контекст: Создание временной таблицы для промежуточных вычислений
Пользователь БД: planiqum_user
Требуемые права: CREATEDB (для создания схемы) или CREATE (в существующей схеме)
SQL fix: ALTER USER planiqum_user CREATEDB;
Очистка забытых таблиц ¶
Для очистки временных таблиц, которые остались в БД (например, после аварийного завершения процесса), используется функция cleanup_temp_tables():
from planiqum.core.libs.temp_table import cleanup_temp_tables
# Режим проверки (dry run)
tables = cleanup_temp_tables(max_age_hours=24, schema='temp_tables', dry_run=True)
print(f"Найдено таблиц для удаления: {len(tables)}")
# Реальное удаление
dropped = cleanup_temp_tables(max_age_hours=24, schema='temp_tables', dry_run=False)
print(f"Удалено таблиц: {len(dropped)}")
Интеграция с PyPika ¶
TempTable наследует класс Table из PyPika, поэтому можно использовать все возможности PyPika для построения запросов:
from pypika import PostgreSQLQuery as Query, functions as fn
temp = TempTable.create("SELECT id, value FROM source")
# Использование в запросах PyPika
query = (
Query.from_(temp)
.select(fn.Sum(temp.value).as_('total'))
.groupby(temp.id)
)
result = execute_query(query)
Отличия от старого механизма ¶
Библиотека полностью независима от старого механизма временных таблиц:
- Не использует @pq_script декоратор
- Не использует UniqName singleton
- Генерирует уникальные имена через UUID
- Использует weakref.finalize вместо __del__
- Поддерживает отдельную схему для временных таблиц
Тестирование ¶
Для тестирования функций работы с базой данных используются тесты в src/planiqum/core/tests/libs/db/:
test_execute_query.py- тесты дляexecute_querytest_get_indexes_query.py- тесты для функций работы с индексамиtest_temp_table.py- тесты для временных таблиц
Пример теста:
import pytest
from planiqum.core.libs.db import execute_query
@pytest.mark.django_db
def test_execute_query_basic():
result = execute_query("SELECT NOW() as current_time")
assert len(result) == 1
assert 'current_time' in result[0]
Управление структурой таблиц ¶
Для декларативного управления структурой таблиц (создание, обновление, синхронизация) используется класс Table из модуля planiqum.core.libs.table.
Основные возможности: - Загрузка структуры существующей таблицы из БД - Создание новой структуры таблицы программно - Создание таблицы в базе данных по описанию - Обновление структуры существующей таблицы - Автоматическая синхронизация структуры
Пример использования:
from planiqum.core.libs.table import Table, ColumnDefinition, IndexDefinition
# Описываем структуру
table = Table.new("users", columns=[
ColumnDefinition("id", "SERIAL", nullable=False),
ColumnDefinition("name", "VARCHAR(255)", nullable=False),
], indexes=[
IndexDefinition("pk_users", ["id"], primary=True),
])
# Создаем или синхронизируем таблицу
table.create_or_update()
Подробная документация: Управление структурой таблиц
Связанные темы ¶
- Управление структурой таблиц - класс Table для создания и синхронизации таблиц
- Синхронизация параметров - использование
execute_sqlв синхронизации - Вычисляемые меры - работа с VIEW и материализованными представлениями
- Импорт данных параметров - массовые операции с данными
Важно: описанные настройки и сценарии могут отличаться в вашей инсталляции Planiqum
За уточнениями и методологической поддержкой обращайтесь в компанию
ЮНИК СОФТ