Перейти к содержанию

Архитектура Direct Pipeline

Обзор

Direct Pipeline представляет собой новую архитектуру для работы с иерархическими данными через специализированные dim-direct и dim-full таблицы.

Структура dim_*_direct таблиц

Назначение

Таблицы dim_{level_key}_direct хранят элементы иерархии с прямыми родительскими связями и метаданными для синхронизации.

Базовые поля

Все dim_*_direct таблицы содержат следующие обязательные поля:

Поле Тип Описание
id BIGINT PRIMARY KEY Уникальный идентификатор (использует последовательность core_hierarchy_item_id_seq)
shortname VARCHAR(256) NOT NULL Краткое имя элемента (уникальное)
description VARCHAR(512) Описание элемента
is_active BOOLEAN DEFAULT TRUE Активность элемента
is_new BOOLEAN DEFAULT TRUE Флаг новых элементов
is_deleted BOOLEAN DEFAULT FALSE Флаг удалённых элементов (для каскадного удаления)
needs_attr_sync BOOLEAN DEFAULT FALSE Требуется синхронизация атрибутов в dim_*_full
needs_rel_sync BOOLEAN DEFAULT FALSE Требуется синхронизация связей в dim_*_full

Флаги синхронизации (⚠️ временные)

Эти поля используются для временной синхронизации с Item и будут удалены после полного перехода на Direct Pipeline:

Поле Тип Назначение
update_item BOOLEAN DEFAULT FALSE Требуется синхронизация базовых полей в core_hierarchy_item
update_parents BOOLEAN DEFAULT FALSE Требуется синхронизация родительских связей в core_hierarchy_item_parents

Календарные поля (для is_calendar=True)

Для календарных уровней (level.is_calendar = True) добавляются дополнительные поля:

Поле Тип Описание
start_date DATE Дата начала периода
end_date DATE Дата окончания периода
num INTEGER Порядковый номер периода
update_horizon BOOLEAN DEFAULT FALSE ⚠️ Временный: требуется синхронизация с core_hierarchy_horizonitem

Поля родителей

Для каждого прямого родительского уровня создаётся отдельное поле:

dim_{parent_level_key} BIGINT

Пример: Если у уровня product есть родители brand и category, то в таблице dim_product_direct будут поля: - dim_brand BIGINT - dim_category BIGINT

Индексы

Частичные индексы для флагов синхронизации

Для оптимизации запросов синхронизации создаются частичные индексы:

-- Индексы для синхронизации dim_*_full
CREATE INDEX idx_dim_{level_key}_direct_needs_attr_sync 
ON dim_{level_key}_direct(needs_attr_sync) 
WHERE needs_attr_sync = TRUE;

CREATE INDEX idx_dim_{level_key}_direct_needs_rel_sync 
ON dim_{level_key}_direct(needs_rel_sync) 
WHERE needs_rel_sync = TRUE;

CREATE INDEX idx_dim_{level_key}_direct_is_deleted 
ON dim_{level_key}_direct(is_deleted) 
WHERE is_deleted = TRUE;

-- Временные индексы для синхронизации с Item (будут удалены после отказа от Item)
CREATE INDEX idx_dim_{level_key}_direct_update_item 
ON dim_{level_key}_direct(update_item) 
WHERE update_item = TRUE;

CREATE INDEX idx_dim_{level_key}_direct_update_parents 
ON dim_{level_key}_direct(update_parents) 
WHERE update_parents = TRUE;

-- Для календарных уровней
CREATE INDEX idx_dim_{level_key}_direct_update_horizon 
ON dim_{level_key}_direct(update_horizon) 
WHERE update_horizon = TRUE;

Индексы для полей родителей

Для каждого поля родителя создаётся индекс:

CREATE INDEX idx_dim_{level_key}_direct_{parent_key}
ON dim_{level_key}_direct(dim_{parent_key});

UNIQUE constraint

На поле shortname устанавливается уникальное ограничение:

CREATE UNIQUE INDEX unique_dim_{level_key}_direct_shortname
ON dim_{level_key}_direct(shortname);

Класс DimDirectTableBuilder

Расположение

/src/planiqum/core/hierarchy/libs/direct/table_builder.py: DimDirectTableBuilder

Основные методы

create_table(level: Level) -> str

Создаёт новую dim_*_direct таблицу для указанного уровня.

Процесс: 1. Проверка существования таблицы 2. Определение списка родительских уровней через level.get_children() 3. Формирование SQL для создания таблицы со всеми полями 4. Создание индексов (частичных для флагов, обычных для родителей) 5. Установка DEFAULT значения для id (использует последовательность Item)

Пример:

from planiqum.core.hierarchy.libs.direct import DimDirectTableBuilder

builder = DimDirectTableBuilder()
table_name = builder.create_table(product_level)
# Результат: 'dim_product_direct'

sync_structure(level: Level) -> dict

Синхронизирует структуру существующей таблицы с текущей структурой уровня.

Что делает: - Добавляет колонки для новых родителей - Удаляет колонки для удалённых родителей - Добавляет индексы для новых колонок

Возвращает:

{
    'added': ['dim_category', 'dim_subcategory'],  # Добавленные колонки
    'removed': ['dim_brand']  # Удалённые колонки
}

get_table_name(level: Level) -> str

Возвращает имя таблицы для уровня.

table_name = builder.get_table_name(product_level)
# Результат: 'dim_product_direct'

get_field_name(parent_level: Level) -> str

Возвращает имя поля для родительского уровня.

field_name = builder.get_field_name(brand_level)
# Результат: 'dim_brand'

Управление последовательностями ID

Общая последовательность

⚠️ Временное решение: Все dim_*_direct таблицы используют одну и ту же последовательность core_hierarchy_item_id_seq, что и модель Item.

Причина: Обеспечение совпадения ID между dim_*_direct таблицами и Item таблицей в период миграции.

Реализация:

# В DimDirectTableBuilder.create_table()
item_sequence = Item._meta.get_field('id').db_default.name
# Результат: 'core_hierarchy_item_id_seq'

# Создание поля id
f"id BIGINT PRIMARY KEY DEFAULT nextval('{item_sequence}'::regclass)"

Получение последовательности

def _get_item_sequence_name(self) -> str:
    """Получает имя последовательности для ID из модели Item."""
    id_field = Item._meta.get_field('id')
    if hasattr(id_field, 'db_default'):
        sequence_name = id_field.db_default.name
        if sequence_name:
            return sequence_name
    raise ValueError("Не удалось получить имя последовательности из Item.id")

Синхронизация последовательности

При вызове sync_structure проверяется и при необходимости обновляется DEFAULT значение для колонки id:

def _ensure_item_sequence_default(self, table_name: str):
    """Устанавливает DEFAULT для id используя последовательность Item."""
    item_sequence = self._get_item_sequence_name()
    alter_sql = f"""
        ALTER TABLE {table_name} 
        ALTER COLUMN id SET DEFAULT nextval('{item_sequence}'::regclass)
    """
    execute_query(alter_sql)

Пример создания таблицы

Для простого уровня

from planiqum.core.hierarchy.models import Level
from planiqum.core.hierarchy.libs.direct import DimDirectTableBuilder

# Получаем уровень
product = Level.objects.get(key='product')

# Создаём таблицу
builder = DimDirectTableBuilder()
table_name = builder.create_table(product)

# Проверяем структуру
columns = builder.get_table_columns(product)
# ['id', 'shortname', 'description', 'is_active', 'is_new', 
#  'needs_sync', 'update_item', 'update_parents', 'dim_brand', 'dim_category']

Для календарного уровня

week = Level.objects.get(key='week')
table_name = builder.create_table(week)

columns = builder.get_table_columns(week)
# ['id', 'shortname', 'description', 'is_active', 'is_new', 
#  'needs_sync', 'update_item', 'update_parents',
#  'start_date', 'end_date', 'num', 'update_horizon']

Замечания по миграции

  1. Временные поля: Поля update_item, update_parents, update_horizon будут удалены после полного отказа от модели Item

  2. Общая последовательность: Использование одной последовательности для всех таблиц — временное решение

  3. Обратная совместимость: Структура поддерживает работу как с новым (dim), так и со старым (Item) механизмом

См. также


Важно: описанные настройки и сценарии могут отличаться в вашей инсталляции Planiqum
За уточнениями и методологической поддержкой обращайтесь в компанию ЮНИК СОФТ