Архитектура Direct Pipeline¶
Обзор¶
Direct Pipeline представляет собой новую архитектуру для работы с иерархическими данными через специализированные dim-direct и dim-full таблицы.
Структура dim_*_direct таблиц¶
Назначение¶
Таблицы dim_{level_key}_direct хранят элементы иерархии с прямыми родительскими связями и метаданными для синхронизации.
Базовые поля¶
Все dim_*_direct таблицы содержат следующие обязательные поля:
| Поле | Тип | Описание |
|---|---|---|
id |
BIGINT PRIMARY KEY | Уникальный идентификатор (использует последовательность core_hierarchy_item_id_seq) |
shortname |
VARCHAR(256) NOT NULL | Краткое имя элемента (уникальное) |
description |
VARCHAR(512) | Описание элемента |
is_active |
BOOLEAN DEFAULT TRUE | Активность элемента |
is_new |
BOOLEAN DEFAULT TRUE | Флаг новых элементов |
is_deleted |
BOOLEAN DEFAULT FALSE | Флаг удалённых элементов (для каскадного удаления) |
needs_attr_sync |
BOOLEAN DEFAULT FALSE | Требуется синхронизация атрибутов в dim_*_full |
needs_rel_sync |
BOOLEAN DEFAULT FALSE | Требуется синхронизация связей в dim_*_full |
Флаги синхронизации (⚠️ временные)¶
Эти поля используются для временной синхронизации с Item и будут удалены после полного перехода на Direct Pipeline:
| Поле | Тип | Назначение |
|---|---|---|
update_item |
BOOLEAN DEFAULT FALSE | Требуется синхронизация базовых полей в core_hierarchy_item |
update_parents |
BOOLEAN DEFAULT FALSE | Требуется синхронизация родительских связей в core_hierarchy_item_parents |
Календарные поля (для is_calendar=True)¶
Для календарных уровней (level.is_calendar = True) добавляются дополнительные поля:
| Поле | Тип | Описание |
|---|---|---|
start_date |
DATE | Дата начала периода |
end_date |
DATE | Дата окончания периода |
num |
INTEGER | Порядковый номер периода |
update_horizon |
BOOLEAN DEFAULT FALSE | ⚠️ Временный: требуется синхронизация с core_hierarchy_horizonitem |
Поля родителей¶
Для каждого прямого родительского уровня создаётся отдельное поле:
dim_{parent_level_key} BIGINT
Пример: Если у уровня product есть родители brand и category, то в таблице dim_product_direct будут поля:
- dim_brand BIGINT
- dim_category BIGINT
Индексы¶
Частичные индексы для флагов синхронизации¶
Для оптимизации запросов синхронизации создаются частичные индексы:
-- Индексы для синхронизации dim_*_full
CREATE INDEX idx_dim_{level_key}_direct_needs_attr_sync
ON dim_{level_key}_direct(needs_attr_sync)
WHERE needs_attr_sync = TRUE;
CREATE INDEX idx_dim_{level_key}_direct_needs_rel_sync
ON dim_{level_key}_direct(needs_rel_sync)
WHERE needs_rel_sync = TRUE;
CREATE INDEX idx_dim_{level_key}_direct_is_deleted
ON dim_{level_key}_direct(is_deleted)
WHERE is_deleted = TRUE;
-- Временные индексы для синхронизации с Item (будут удалены после отказа от Item)
CREATE INDEX idx_dim_{level_key}_direct_update_item
ON dim_{level_key}_direct(update_item)
WHERE update_item = TRUE;
CREATE INDEX idx_dim_{level_key}_direct_update_parents
ON dim_{level_key}_direct(update_parents)
WHERE update_parents = TRUE;
-- Для календарных уровней
CREATE INDEX idx_dim_{level_key}_direct_update_horizon
ON dim_{level_key}_direct(update_horizon)
WHERE update_horizon = TRUE;
Индексы для полей родителей¶
Для каждого поля родителя создаётся индекс:
CREATE INDEX idx_dim_{level_key}_direct_{parent_key}
ON dim_{level_key}_direct(dim_{parent_key});
UNIQUE constraint¶
На поле shortname устанавливается уникальное ограничение:
CREATE UNIQUE INDEX unique_dim_{level_key}_direct_shortname
ON dim_{level_key}_direct(shortname);
Класс DimDirectTableBuilder¶
Расположение¶
/src/planiqum/core/hierarchy/libs/direct/table_builder.py: DimDirectTableBuilder
Основные методы¶
create_table(level: Level) -> str¶
Создаёт новую dim_*_direct таблицу для указанного уровня.
Процесс:
1. Проверка существования таблицы
2. Определение списка родительских уровней через level.get_children()
3. Формирование SQL для создания таблицы со всеми полями
4. Создание индексов (частичных для флагов, обычных для родителей)
5. Установка DEFAULT значения для id (использует последовательность Item)
Пример:
from planiqum.core.hierarchy.libs.direct import DimDirectTableBuilder
builder = DimDirectTableBuilder()
table_name = builder.create_table(product_level)
# Результат: 'dim_product_direct'
sync_structure(level: Level) -> dict¶
Синхронизирует структуру существующей таблицы с текущей структурой уровня.
Что делает: - Добавляет колонки для новых родителей - Удаляет колонки для удалённых родителей - Добавляет индексы для новых колонок
Возвращает:
{
'added': ['dim_category', 'dim_subcategory'], # Добавленные колонки
'removed': ['dim_brand'] # Удалённые колонки
}
get_table_name(level: Level) -> str¶
Возвращает имя таблицы для уровня.
table_name = builder.get_table_name(product_level)
# Результат: 'dim_product_direct'
get_field_name(parent_level: Level) -> str¶
Возвращает имя поля для родительского уровня.
field_name = builder.get_field_name(brand_level)
# Результат: 'dim_brand'
Управление последовательностями ID¶
Общая последовательность¶
⚠️ Временное решение: Все dim_*_direct таблицы используют одну и ту же последовательность core_hierarchy_item_id_seq, что и модель Item.
Причина: Обеспечение совпадения ID между dim_*_direct таблицами и Item таблицей в период миграции.
Реализация:
# В DimDirectTableBuilder.create_table()
item_sequence = Item._meta.get_field('id').db_default.name
# Результат: 'core_hierarchy_item_id_seq'
# Создание поля id
f"id BIGINT PRIMARY KEY DEFAULT nextval('{item_sequence}'::regclass)"
Получение последовательности¶
def _get_item_sequence_name(self) -> str:
"""Получает имя последовательности для ID из модели Item."""
id_field = Item._meta.get_field('id')
if hasattr(id_field, 'db_default'):
sequence_name = id_field.db_default.name
if sequence_name:
return sequence_name
raise ValueError("Не удалось получить имя последовательности из Item.id")
Синхронизация последовательности¶
При вызове sync_structure проверяется и при необходимости обновляется DEFAULT значение для колонки id:
def _ensure_item_sequence_default(self, table_name: str):
"""Устанавливает DEFAULT для id используя последовательность Item."""
item_sequence = self._get_item_sequence_name()
alter_sql = f"""
ALTER TABLE {table_name}
ALTER COLUMN id SET DEFAULT nextval('{item_sequence}'::regclass)
"""
execute_query(alter_sql)
Пример создания таблицы¶
Для простого уровня¶
from planiqum.core.hierarchy.models import Level
from planiqum.core.hierarchy.libs.direct import DimDirectTableBuilder
# Получаем уровень
product = Level.objects.get(key='product')
# Создаём таблицу
builder = DimDirectTableBuilder()
table_name = builder.create_table(product)
# Проверяем структуру
columns = builder.get_table_columns(product)
# ['id', 'shortname', 'description', 'is_active', 'is_new',
# 'needs_sync', 'update_item', 'update_parents', 'dim_brand', 'dim_category']
Для календарного уровня¶
week = Level.objects.get(key='week')
table_name = builder.create_table(week)
columns = builder.get_table_columns(week)
# ['id', 'shortname', 'description', 'is_active', 'is_new',
# 'needs_sync', 'update_item', 'update_parents',
# 'start_date', 'end_date', 'num', 'update_horizon']
Замечания по миграции¶
-
Временные поля: Поля
update_item,update_parents,update_horizonбудут удалены после полного отказа от моделиItem -
Общая последовательность: Использование одной последовательности для всех таблиц — временное решение
-
Обратная совместимость: Структура поддерживает работу как с новым (dim), так и со старым (Item) механизмом
См. также¶
- Direct Pipeline: Руководство для разработчиков
- Импорт данных
- Синхронизация Item (временная)
- Синхронизация dim_*_full
Важно: описанные настройки и сценарии могут отличаться в вашей инсталляции Planiqum
За уточнениями и методологической поддержкой обращайтесь в компанию
ЮНИК СОФТ