Перейти к содержанию

Синхронизация dim_*_full таблиц

Обзор

Синхронизация dim_*_full таблиц обеспечивает построение полной иерархии элементов со всеми предками (прямыми и косвенными) на основе данных из dim_*_direct таблиц.

Статус: Полностью реализовано. Структура таблиц создаётся автоматически, синхронизация содержимого выполняется при импорте (если auto_sync=True) или может быть вызвана вручную через DimTableFull.sync_content().

Типы синхронизации

Синхронизация разделена на два независимых этапа:

1. Синхронизация атрибутов (быстрая)

Флаг: needs_attr_sync

Синхронизируемые поля: - shortname - название элемента - description - описание элемента - start_date, end_date, num - календарные поля (только для календарных уровней)

Особенности: - Выполняется простым UPDATE без JOIN'ов - Очень быстрая операция - Не требует пересчёта иерархических связей

2. Синхронизация иерархических связей (медленная)

Флаг: needs_rel_sync

Синхронизируемые поля: - dim_* колонки - ссылки на родительские элементы - Косвенные связи через LEFT JOIN к родительским dim_*_full таблицам

Особенности: - Требует построения сложных JOIN запросов - Медленная операция для глубоких иерархий - Триггерит каскадную синхронизацию дочерних элементов

Преимущества раздельной синхронизации

  1. Производительность: Изменение только description не требует пересчёта всей иерархии
  2. Гибкость: Можно синхронизировать атрибуты и связи независимо
  3. Масштабируемость: Для больших иерархий можно отложить синхронизацию связей

Структура dim_*_full таблиц

Назначение

Таблицы dim_{level_key}_full содержат: - Все поля из dim_{level_key}_direct - Колонки для всех предков (прямых и косвенных)

Пример структуры

Иерархия:

dpu
|-- delivery_address
|-- product
     |-- subcategory
          |-- category
     |-- brand

Таблица dim_category_full:

CREATE TABLE dim_category_full (
    id BIGINT PRIMARY KEY,
    shortname VARCHAR(256) NOT NULL,
    description VARCHAR(512),
    is_active BOOLEAN,
    is_new BOOLEAN,

    -- Прямые предки (из dim_category_direct)
    dim_subcategory BIGINT,

    -- Косвенные предки (построенные через JOIN)
    dim_product BIGINT,
    dim_delivery_address BIGINT,
    dim_dpu BIGINT
);

Класс DimTableFull

Расположение

/src/planiqum/core/hierarchy/libs/direct/dim_full.py: DimTableFull

Основные методы

create_table(level: Level)

Создаёт новую dim_*_full таблицу для указанного уровня.

Процесс: 1. Получение всех предков через MPTT (level.get_ancestors()) 2. Создание колонок для всех предков 3. Создание индексов для колонок предков

Пример:

from planiqum.core.hierarchy.libs.direct import DimTableFull

dim_full = DimTableFull()
table_name = dim_full.create_table(category_level)
# Результат: 'dim_category_full'

rebuild_table(level: Level)

Полностью перестраивает dim_*_full таблицу: 1. Удаляет существующую таблицу 2. Создаёт новую с актуальной структурой 3. Заполняет данными через JOIN с dim_*_direct и другими dim_*_full

Использование:

dim_full.rebuild_table(category_level)

Алгоритм синхронизации

Триггеры синхронизации

Синхронизация инициируется через два независимых флага в dim_*_direct таблицах:

needs_attr_sync

Когда устанавливается needs_attr_sync = TRUE: - При добавлении новых элементов (is_new = TRUE) - При изменении shortname или description - При изменении календарных полей (start_date, end_date, num)

Сбрасывается методом: _sync_attributes()

needs_rel_sync

Когда устанавливается needs_rel_sync = TRUE: - При добавлении новых элементов (is_new = TRUE) - При изменении родительских связей (dim_* колонок) - При изменении связей у родительских элементов (каскадно)

Сбрасывается методом: _sync_relations()

Процесс синхронизации

def sync_from_direct(level: Level):
    """
    Синхронизирует изменения из dim_*_direct в dim_*_full.

    Процесс:
    1. Получить элементы с needs_sync = TRUE
    2. Построить полную иерархию через JOIN
    3. UPSERT в dim_*_full
    4. Сбросить needs_sync = FALSE
    """

    # 1. Определяем таблицы
    direct_table = f"dim_{level.key}_direct"
    full_table = f"dim_{level.key}_full"

    # 2. Получаем список всех предков
    ancestors = level.get_ancestors(include_self=False)

    # 3. Строим запрос с JOIN
    query = build_full_hierarchy_query(
        level=level,
        direct_table=direct_table,
        ancestors=ancestors
    )

    # 4. UPSERT в dim_*_full
    upsert_to_full_table(full_table, query)

    # 5. Сбрасываем флаг
    execute_query(f"""
        UPDATE {direct_table} 
        SET needs_sync = FALSE 
        WHERE needs_sync = TRUE
    """)

Стратегия JOIN для построения полной иерархии

Иерархические JOIN

Для построения полной иерархии используются последовательные LEFT JOIN:

-- Пример для category (иерархия: category → subcategory → product → dpu)

SELECT 
    cat.id,
    cat.shortname,
    cat.description,
    cat.is_active,
    cat.is_new,

    -- Прямой предок
    cat.dim_subcategory,

    -- Косвенные предки (через JOIN)
    subcat.dim_product,
    prod.dim_dpu

FROM dim_category_direct cat
LEFT JOIN dim_subcategory_full subcat ON cat.dim_subcategory = subcat.id
LEFT JOIN dim_product_full prod ON subcat.dim_product = prod.id

WHERE cat.needs_sync = TRUE

Последовательность JOIN

Порядок JOIN важен и определяется иерархией:

  1. Прямой родитель: Берём из dim_*_direct (например, cat.dim_subcategory)
  2. Первый косвенный предок: JOIN с dim_subcategory_full → получаем dim_product
  3. Второй косвенный предок: JOIN с dim_product_full → получаем dim_dpu
  4. И так далее вверх по иерархии

UPSERT в dim_*_full

INSERT INTO dim_category_full (
    id, shortname, description, is_active, is_new,
    dim_subcategory, dim_product, dim_dpu
)
SELECT 
    cat.id,
    cat.shortname,
    cat.description,
    cat.is_active,
    cat.is_new,
    cat.dim_subcategory,
    subcat.dim_product,
    prod.dim_dpu
FROM dim_category_direct cat
LEFT JOIN dim_subcategory_full subcat ON cat.dim_subcategory = subcat.id
LEFT JOIN dim_product_full prod ON subcat.dim_product = prod.id
WHERE cat.needs_sync = TRUE

ON CONFLICT (id) DO UPDATE SET
    shortname = EXCLUDED.shortname,
    description = EXCLUDED.description,
    is_active = EXCLUDED.is_active,
    is_new = EXCLUDED.is_new,
    dim_subcategory = EXCLUDED.dim_subcategory,
    dim_product = EXCLUDED.dim_product,
    dim_dpu = EXCLUDED.dim_dpu

Каскадная синхронизация

Проблема

Изменение элемента в dim_product_direct должно: 1. Обновить dim_product_full 2. Инициировать обновление всех дочерних элементов в dim_subcategory_full и dim_category_full

Решение

При синхронизации элемента устанавливаем needs_sync = TRUE для всех зависимых элементов:

-- После обновления dim_product_direct:
-- Помечаем все элементы subcategory, которые ссылаются на обновлённый product

UPDATE dim_subcategory_direct
SET needs_sync = TRUE
WHERE dim_product IN (
    SELECT id FROM dim_product_direct WHERE needs_sync = TRUE
)

Порядок синхронизации

Синхронизация должна выполняться от корня к листьям:

  1. dim_dpu_full
  2. dim_product_full
  3. dim_subcategory_full
  4. dim_category_full

Производительность

Массовые операции

Все операции выполняются через массовые UPSERT: - Один запрос для всех элементов с needs_sync = TRUE - Никаких циклов на Python

Частичные индексы

Для оптимизации выборки элементов используются отдельные частичные индексы для каждого флага:

-- Индекс для синхронизации атрибутов
CREATE INDEX idx_dim_category_direct_needs_attr_sync 
ON dim_category_direct(needs_attr_sync) 
WHERE needs_attr_sync = TRUE;

-- Индекс для синхронизации связей
CREATE INDEX idx_dim_category_direct_needs_rel_sync 
ON dim_category_direct(needs_rel_sync) 
WHERE needs_rel_sync = TRUE;

Транзакционность

Вся синхронизация выполняется в одной транзакции.

Текущий статус реализации

✅ Реализовано

  • Создание структуры dim_*_full таблиц
  • Метод rebuild_table() для полной перестройки
  • Установка флага needs_sync при импорте
  • Переопределение Level.sync() и LevelManager.sync() для использования Direct Pipeline
  • Переопределение dimension_table_name() для использования DimTableFull.get_table_name()
  • Автоматическая синхронизация через sync_dim_direct_from_item() с auto_sync=True

🚧 В разработке

  • Каскадная синхронизация зависимых уровней
  • Метод sync_from_direct() для инкрементальной синхронизации

📋 Планируется

  • Оптимизация для больших объёмов данных
  • Параллельная синхронизация независимых веток
  • Мониторинг и логирование синхронизации

Примеры использования

Пример 1: Полная перестройка

from planiqum.core.hierarchy.libs.direct import DimTableFull

dim_full = DimTableFull()

# Полная перестройка таблицы
dim_full.rebuild_table(category_level)

Пример 2: Проверка элементов требующих синхронизации

from planiqum.core.libs.db import select_to_df

# Получаем элементы с needs_sync = TRUE
df = select_to_df("""
    SELECT id, shortname, needs_sync
    FROM dim_category_direct
    WHERE needs_sync = TRUE
""")

print(f"Элементов требующих синхронизации: {len(df)}")

Пример 3: Ручная синхронизация

from planiqum.core.libs.db import execute_query

# Вручную запускаем синхронизацию (когда будет реализован метод)
# from planiqum.core.hierarchy.libs.direct import sync_dim_full_from_direct
# sync_dim_full_from_direct(category_level)

# Пока можно использовать rebuild:
dim_full.rebuild_table(category_level)

Интеграция с импортом

После импорта данных через HierarchyImporter:

from planiqum.core.hierarchy.libs.direct import HierarchyImporter
import pandas as pd

importer = HierarchyImporter()
df = pd.DataFrame([...])

# Импорт устанавливает needs_sync = TRUE
importer.import_from_dataframe(level=category_level, df=df)

# TODO: Автоматическая синхронизация (пока в разработке)
# Синхронизация dim_*_full должна запускаться автоматически

# Пока используем ручную перестройку:
from planiqum.core.hierarchy.libs.direct import DimTableFull
dim_full = DimTableFull()
dim_full.rebuild_table(category_level)

Планы развития

Инкрементальная синхронизация

Вместо полной перестройки — синхронизация только изменённых элементов:

def sync_dim_full_from_direct(level: Level):
    """Синхронизирует только элементы с needs_sync = TRUE."""
    # Реализация в разработке

Параллелизация

Синхронизация независимых веток иерархии параллельно:

from concurrent.futures import ThreadPoolExecutor

def sync_all_levels():
    """Синхронизирует все уровни параллельно."""
    with ThreadPoolExecutor() as executor:
        # Синхронизация независимых веток
        futures = [
            executor.submit(sync_dim_full_from_direct, brand_level),
            executor.submit(sync_dim_full_from_direct, delivery_address_level),
        ]

Оптимизация JOIN

Для глубоких иерархий (>5 уровней) — использование CTE:

WITH RECURSIVE hierarchy AS (
    SELECT id, shortname, dim_parent
    FROM dim_level_direct
    WHERE needs_sync = TRUE

    UNION ALL

    SELECT h.id, h.shortname, p.dim_parent
    FROM hierarchy h
    JOIN dim_parent_full p ON h.dim_parent = p.id
)
SELECT * FROM hierarchy

Интеграция с методами синхронизации

Переопределение Level.sync() и LevelManager.sync()

Методы Level.sync() и LevelManager.sync() были переопределены для использования Direct Pipeline вместо старого механизма. Это обеспечивает обратную совместимость — все существующие вызовы автоматически используют новый механизм.

Реализация Level.sync(): - Синхронизирует структуру dim_*_direct через DimDirectTableBuilder.sync_structure() - Синхронизирует структуру dim_*_full через DimTableFull.sync_structure() - Синхронизирует контент через sync_dim_direct_from_item() с auto_sync=True - Поддерживает все параметры (ids, with_children, drop, sync_structure, sync_content) - Рекурсивно синхронизирует дочерние уровни

Реализация LevelManager.sync(): - Перестраивает MPTT дерево - Удаляет устаревшие dim-таблицы (учитывает dim_*_direct и dim_* таблицы) - Синхронизирует каждый уровень через Level.sync()

Пример использования:

from planiqum.core.hierarchy.models import Level

# Синхронизация одного уровня (автоматически использует Direct Pipeline)
product_level.sync()

# Синхронизация всех уровней (автоматически использует Direct Pipeline)
Level.objects.sync()

Преимущества: - ✅ Все существующие вызовы автоматически используют Direct Pipeline - ✅ Не требуется обновление кода в местах использования - ✅ Полная обратная совместимость

Метод dimension_table_name()

Метод Level.dimension_table_name() был переопределён для использования DimTableFull.get_table_name(level).

Важно: Метод возвращает имя dim-full таблицы (dim_{key} без суффикса _full), а не dim-direct таблицы (dim_{key}_direct). Это важно для обратной совместимости.

Пример:

level = Level.objects.get(key='product')
table_name = level.dimension_table_name()  # Возвращает 'dim_product' (dim-full таблица)

См. также


Важно: описанные настройки и сценарии могут отличаться в вашей инсталляции Planiqum
За уточнениями и методологической поддержкой обращайтесь в компанию ЮНИК СОФТ