Синхронизация dim_*_full таблиц¶
Обзор¶
Синхронизация dim_*_full таблиц обеспечивает построение полной иерархии элементов со всеми предками (прямыми и косвенными) на основе данных из dim_*_direct таблиц.
✅ Статус: Полностью реализовано. Структура таблиц создаётся автоматически, синхронизация содержимого выполняется при импорте (если auto_sync=True) или может быть вызвана вручную через DimTableFull.sync_content().
Типы синхронизации¶
Синхронизация разделена на два независимых этапа:
1. Синхронизация атрибутов (быстрая)¶
Флаг: needs_attr_sync
Синхронизируемые поля:
- shortname - название элемента
- description - описание элемента
- start_date, end_date, num - календарные поля (только для календарных уровней)
Особенности: - Выполняется простым UPDATE без JOIN'ов - Очень быстрая операция - Не требует пересчёта иерархических связей
2. Синхронизация иерархических связей (медленная)¶
Флаг: needs_rel_sync
Синхронизируемые поля:
- dim_* колонки - ссылки на родительские элементы
- Косвенные связи через LEFT JOIN к родительским dim_*_full таблицам
Особенности: - Требует построения сложных JOIN запросов - Медленная операция для глубоких иерархий - Триггерит каскадную синхронизацию дочерних элементов
Преимущества раздельной синхронизации¶
- Производительность: Изменение только description не требует пересчёта всей иерархии
- Гибкость: Можно синхронизировать атрибуты и связи независимо
- Масштабируемость: Для больших иерархий можно отложить синхронизацию связей
Структура dim_*_full таблиц¶
Назначение¶
Таблицы dim_{level_key}_full содержат:
- Все поля из dim_{level_key}_direct
- Колонки для всех предков (прямых и косвенных)
Пример структуры¶
Иерархия:
dpu
|-- delivery_address
|-- product
|-- subcategory
|-- category
|-- brand
Таблица dim_category_full:
CREATE TABLE dim_category_full (
id BIGINT PRIMARY KEY,
shortname VARCHAR(256) NOT NULL,
description VARCHAR(512),
is_active BOOLEAN,
is_new BOOLEAN,
-- Прямые предки (из dim_category_direct)
dim_subcategory BIGINT,
-- Косвенные предки (построенные через JOIN)
dim_product BIGINT,
dim_delivery_address BIGINT,
dim_dpu BIGINT
);
Класс DimTableFull¶
Расположение¶
/src/planiqum/core/hierarchy/libs/direct/dim_full.py: DimTableFull
Основные методы¶
create_table(level: Level)¶
Создаёт новую dim_*_full таблицу для указанного уровня.
Процесс:
1. Получение всех предков через MPTT (level.get_ancestors())
2. Создание колонок для всех предков
3. Создание индексов для колонок предков
Пример:
from planiqum.core.hierarchy.libs.direct import DimTableFull
dim_full = DimTableFull()
table_name = dim_full.create_table(category_level)
# Результат: 'dim_category_full'
rebuild_table(level: Level)¶
Полностью перестраивает dim_*_full таблицу:
1. Удаляет существующую таблицу
2. Создаёт новую с актуальной структурой
3. Заполняет данными через JOIN с dim_*_direct и другими dim_*_full
Использование:
dim_full.rebuild_table(category_level)
Алгоритм синхронизации¶
Триггеры синхронизации¶
Синхронизация инициируется через два независимых флага в dim_*_direct таблицах:
needs_attr_sync¶
Когда устанавливается needs_attr_sync = TRUE:
- При добавлении новых элементов (is_new = TRUE)
- При изменении shortname или description
- При изменении календарных полей (start_date, end_date, num)
Сбрасывается методом: _sync_attributes()
needs_rel_sync¶
Когда устанавливается needs_rel_sync = TRUE:
- При добавлении новых элементов (is_new = TRUE)
- При изменении родительских связей (dim_* колонок)
- При изменении связей у родительских элементов (каскадно)
Сбрасывается методом: _sync_relations()
Процесс синхронизации¶
def sync_from_direct(level: Level):
"""
Синхронизирует изменения из dim_*_direct в dim_*_full.
Процесс:
1. Получить элементы с needs_sync = TRUE
2. Построить полную иерархию через JOIN
3. UPSERT в dim_*_full
4. Сбросить needs_sync = FALSE
"""
# 1. Определяем таблицы
direct_table = f"dim_{level.key}_direct"
full_table = f"dim_{level.key}_full"
# 2. Получаем список всех предков
ancestors = level.get_ancestors(include_self=False)
# 3. Строим запрос с JOIN
query = build_full_hierarchy_query(
level=level,
direct_table=direct_table,
ancestors=ancestors
)
# 4. UPSERT в dim_*_full
upsert_to_full_table(full_table, query)
# 5. Сбрасываем флаг
execute_query(f"""
UPDATE {direct_table}
SET needs_sync = FALSE
WHERE needs_sync = TRUE
""")
Стратегия JOIN для построения полной иерархии¶
Иерархические JOIN¶
Для построения полной иерархии используются последовательные LEFT JOIN:
-- Пример для category (иерархия: category → subcategory → product → dpu)
SELECT
cat.id,
cat.shortname,
cat.description,
cat.is_active,
cat.is_new,
-- Прямой предок
cat.dim_subcategory,
-- Косвенные предки (через JOIN)
subcat.dim_product,
prod.dim_dpu
FROM dim_category_direct cat
LEFT JOIN dim_subcategory_full subcat ON cat.dim_subcategory = subcat.id
LEFT JOIN dim_product_full prod ON subcat.dim_product = prod.id
WHERE cat.needs_sync = TRUE
Последовательность JOIN¶
Порядок JOIN важен и определяется иерархией:
- Прямой родитель: Берём из
dim_*_direct(например,cat.dim_subcategory) - Первый косвенный предок: JOIN с
dim_subcategory_full→ получаемdim_product - Второй косвенный предок: JOIN с
dim_product_full→ получаемdim_dpu - И так далее вверх по иерархии
UPSERT в dim_*_full¶
INSERT INTO dim_category_full (
id, shortname, description, is_active, is_new,
dim_subcategory, dim_product, dim_dpu
)
SELECT
cat.id,
cat.shortname,
cat.description,
cat.is_active,
cat.is_new,
cat.dim_subcategory,
subcat.dim_product,
prod.dim_dpu
FROM dim_category_direct cat
LEFT JOIN dim_subcategory_full subcat ON cat.dim_subcategory = subcat.id
LEFT JOIN dim_product_full prod ON subcat.dim_product = prod.id
WHERE cat.needs_sync = TRUE
ON CONFLICT (id) DO UPDATE SET
shortname = EXCLUDED.shortname,
description = EXCLUDED.description,
is_active = EXCLUDED.is_active,
is_new = EXCLUDED.is_new,
dim_subcategory = EXCLUDED.dim_subcategory,
dim_product = EXCLUDED.dim_product,
dim_dpu = EXCLUDED.dim_dpu
Каскадная синхронизация¶
Проблема¶
Изменение элемента в dim_product_direct должно:
1. Обновить dim_product_full
2. Инициировать обновление всех дочерних элементов в dim_subcategory_full и dim_category_full
Решение¶
При синхронизации элемента устанавливаем needs_sync = TRUE для всех зависимых элементов:
-- После обновления dim_product_direct:
-- Помечаем все элементы subcategory, которые ссылаются на обновлённый product
UPDATE dim_subcategory_direct
SET needs_sync = TRUE
WHERE dim_product IN (
SELECT id FROM dim_product_direct WHERE needs_sync = TRUE
)
Порядок синхронизации¶
Синхронизация должна выполняться от корня к листьям:
dim_dpu_fulldim_product_fulldim_subcategory_fulldim_category_full
Производительность¶
Массовые операции¶
Все операции выполняются через массовые UPSERT:
- Один запрос для всех элементов с needs_sync = TRUE
- Никаких циклов на Python
Частичные индексы¶
Для оптимизации выборки элементов используются отдельные частичные индексы для каждого флага:
-- Индекс для синхронизации атрибутов
CREATE INDEX idx_dim_category_direct_needs_attr_sync
ON dim_category_direct(needs_attr_sync)
WHERE needs_attr_sync = TRUE;
-- Индекс для синхронизации связей
CREATE INDEX idx_dim_category_direct_needs_rel_sync
ON dim_category_direct(needs_rel_sync)
WHERE needs_rel_sync = TRUE;
Транзакционность¶
Вся синхронизация выполняется в одной транзакции.
Текущий статус реализации¶
✅ Реализовано¶
- Создание структуры
dim_*_fullтаблиц - Метод
rebuild_table()для полной перестройки - Установка флага
needs_syncпри импорте - Переопределение
Level.sync()иLevelManager.sync()для использования Direct Pipeline - Переопределение
dimension_table_name()для использованияDimTableFull.get_table_name() - Автоматическая синхронизация через
sync_dim_direct_from_item()сauto_sync=True
🚧 В разработке¶
- Каскадная синхронизация зависимых уровней
- Метод
sync_from_direct()для инкрементальной синхронизации
📋 Планируется¶
- Оптимизация для больших объёмов данных
- Параллельная синхронизация независимых веток
- Мониторинг и логирование синхронизации
Примеры использования¶
Пример 1: Полная перестройка¶
from planiqum.core.hierarchy.libs.direct import DimTableFull
dim_full = DimTableFull()
# Полная перестройка таблицы
dim_full.rebuild_table(category_level)
Пример 2: Проверка элементов требующих синхронизации¶
from planiqum.core.libs.db import select_to_df
# Получаем элементы с needs_sync = TRUE
df = select_to_df("""
SELECT id, shortname, needs_sync
FROM dim_category_direct
WHERE needs_sync = TRUE
""")
print(f"Элементов требующих синхронизации: {len(df)}")
Пример 3: Ручная синхронизация¶
from planiqum.core.libs.db import execute_query
# Вручную запускаем синхронизацию (когда будет реализован метод)
# from planiqum.core.hierarchy.libs.direct import sync_dim_full_from_direct
# sync_dim_full_from_direct(category_level)
# Пока можно использовать rebuild:
dim_full.rebuild_table(category_level)
Интеграция с импортом¶
После импорта данных через HierarchyImporter:
from planiqum.core.hierarchy.libs.direct import HierarchyImporter
import pandas as pd
importer = HierarchyImporter()
df = pd.DataFrame([...])
# Импорт устанавливает needs_sync = TRUE
importer.import_from_dataframe(level=category_level, df=df)
# TODO: Автоматическая синхронизация (пока в разработке)
# Синхронизация dim_*_full должна запускаться автоматически
# Пока используем ручную перестройку:
from planiqum.core.hierarchy.libs.direct import DimTableFull
dim_full = DimTableFull()
dim_full.rebuild_table(category_level)
Планы развития¶
Инкрементальная синхронизация¶
Вместо полной перестройки — синхронизация только изменённых элементов:
def sync_dim_full_from_direct(level: Level):
"""Синхронизирует только элементы с needs_sync = TRUE."""
# Реализация в разработке
Параллелизация¶
Синхронизация независимых веток иерархии параллельно:
from concurrent.futures import ThreadPoolExecutor
def sync_all_levels():
"""Синхронизирует все уровни параллельно."""
with ThreadPoolExecutor() as executor:
# Синхронизация независимых веток
futures = [
executor.submit(sync_dim_full_from_direct, brand_level),
executor.submit(sync_dim_full_from_direct, delivery_address_level),
]
Оптимизация JOIN¶
Для глубоких иерархий (>5 уровней) — использование CTE:
WITH RECURSIVE hierarchy AS (
SELECT id, shortname, dim_parent
FROM dim_level_direct
WHERE needs_sync = TRUE
UNION ALL
SELECT h.id, h.shortname, p.dim_parent
FROM hierarchy h
JOIN dim_parent_full p ON h.dim_parent = p.id
)
SELECT * FROM hierarchy
Интеграция с методами синхронизации¶
Переопределение Level.sync() и LevelManager.sync()¶
Методы Level.sync() и LevelManager.sync() были переопределены для использования Direct Pipeline вместо старого механизма. Это обеспечивает обратную совместимость — все существующие вызовы автоматически используют новый механизм.
Реализация Level.sync():
- Синхронизирует структуру dim_*_direct через DimDirectTableBuilder.sync_structure()
- Синхронизирует структуру dim_*_full через DimTableFull.sync_structure()
- Синхронизирует контент через sync_dim_direct_from_item() с auto_sync=True
- Поддерживает все параметры (ids, with_children, drop, sync_structure, sync_content)
- Рекурсивно синхронизирует дочерние уровни
Реализация LevelManager.sync():
- Перестраивает MPTT дерево
- Удаляет устаревшие dim-таблицы (учитывает dim_*_direct и dim_* таблицы)
- Синхронизирует каждый уровень через Level.sync()
Пример использования:
from planiqum.core.hierarchy.models import Level
# Синхронизация одного уровня (автоматически использует Direct Pipeline)
product_level.sync()
# Синхронизация всех уровней (автоматически использует Direct Pipeline)
Level.objects.sync()
Преимущества: - ✅ Все существующие вызовы автоматически используют Direct Pipeline - ✅ Не требуется обновление кода в местах использования - ✅ Полная обратная совместимость
Метод dimension_table_name()¶
Метод Level.dimension_table_name() был переопределён для использования DimTableFull.get_table_name(level).
Важно: Метод возвращает имя dim-full таблицы (dim_{key} без суффикса _full), а не dim-direct таблицы (dim_{key}_direct). Это важно для обратной совместимости.
Пример:
level = Level.objects.get(key='product')
table_name = level.dimension_table_name() # Возвращает 'dim_product' (dim-full таблица)
См. также¶
- Direct Pipeline: Руководство для разработчиков
- Архитектура dim_*_direct
- Импорт данных
- Синхронизация Item (временная)
- Синхронизация иерархий (sync)
Важно: описанные настройки и сценарии могут отличаться в вашей инсталляции Planiqum
За уточнениями и методологической поддержкой обращайтесь в компанию
ЮНИК СОФТ