Перейти к содержанию

Direct Pipeline: Руководство для разработчиков

Обзор

Direct Pipeline — это новая архитектура работы с иерархическими данными в Planiqum, которая заменяет старый механизм на основе модели Item и таблиц core_hierarchy_item*. В этой статье описаны все изменения по сравнению со старым подходом, процесс миграции и практическое использование.

Основные преимущества:

  • Упрощённая структура данных — элементы хранятся с прямыми родителями, без необходимости вычислять всех предков
  • Высокая производительность — все операции выполняются массово через SQL без итераций
  • Автоматическая синхронизация — изменения в dim_*_direct автоматически отражаются в dim_* таблицах
  • Поддержка календарных уровней — встроенная поддержка num, start_date, end_date для HorizonItem
  • Масштабируемость — подходит для работы с миллионами элементов

Архитектурная схема:

┌─────────────────────┐
│   Источник данных   │  (CSV, XLSX, DataFrame, API)
└──────────┬──────────┘
           │
           ▼
┌─────────────────────┐
│  HierarchyImporter  │  Импорт → временная таблица
└──────────┬──────────┘
           │
           ▼
┌─────────────────────┐
│  dim_*_direct       │  Прямые связи (UPSERT)
│  - shortname        │
│  - description      │
│  - dim_brand        │  ← ID прямого родителя
│  - dim_category     │  ← ID прямого родителя
│  - needs_attr_sync  │  ← флаг синхронизации
│  - needs_rel_sync   │  ← флаг синхронизации
└──────────┬──────────┘
           │
           ├─────────────────────┐
           │                     │
           ▼                     ▼
    ┌─────────────┐      ┌──────────────────────┐
    │  dim_*      │      │  Django Signals      │
    │  (JOIN)     │      │  ┌────────────────┐  │
    │  - Все      │      │  │  ItemBridge     │  │  (временный)
    │    предки   │      │  │  - Item        │  │
    │  - Full     │      │  │  - parents     │  │
    │    hierarchy│      │  │  - HorizonItem │  │
    └─────────────┘      │  └────────────────┘  │
                         └──────────┬───────────┘
                                    │
                                    ▼
                            ┌───────────────────┐
                            │ core_hierarchy_*  │
                            │ (старые таблицы)  │
                            └───────────────────┘

Содержание


Быстрый старт

Создание таблиц

from planiqum.core.hierarchy.models import Level
from planiqum.core.hierarchy.libs.direct import DimDirectTableBuilder

# Получаем уровень
level = Level.objects.get(key='product')

# Создаём таблицу dim_product_direct
builder = DimDirectTableBuilder()
table_name = builder.create_table(level)
# → 'dim_product_direct'

См. тесты: /src/planiqum/core/hierarchy/tests/direct/test_table_builder.py: TestDimDirectTableCreation

Импорт данных

import pandas as pd
from planiqum.core.hierarchy.libs.direct import HierarchyImporter

# Подготавливаем данные
df = pd.DataFrame([
    ['prod1', 'Product 1', 'brand_a', 'cat_x'],
    ['prod2', 'Product 2', 'brand_b', 'cat_y'],
], columns=['shortname', 'description', 'brand', 'category'])

# Импортируем
importer = HierarchyImporter(auto_sync=True, sync_item_bridge=True)
importer.import_from_dataframe(level=level, df=df)

# ✅ Данные автоматически:
# 1. Загружены в dim_product_direct
# 2. Синхронизированы в dim_product (full)
# 3. Созданы записи в Item и Item.parents (через ItemBridge)

См. тесты: /src/planiqum/core/hierarchy/tests/direct/test_importer.py: TestHierarchyImporterBasic

Импорт из файлов

from planiqum.core.hierarchy.libs.direct import HierarchyImporter

importer = HierarchyImporter(auto_sync=True)

# Импорт из CSV
importer.import_from_csv(level=level, filepath='products.csv')

# Импорт из XLSX
importer.import_from_xlsx(level=level, filepath='products.xlsx')

См. тесты: /src/planiqum/core/hierarchy/tests/direct/test_importer_files.py: TestImportFromCSV

Синхронизация календарных данных

# Для календарного уровня
week_level = Level.objects.get(key='week')

week_df = pd.DataFrame([
    ['2024-W01', 'Week 1', '2024-01-01', '2024-01-07', 1],
], columns=['shortname', 'description', 'start_date', 'end_date', 'num'])

importer = HierarchyImporter(auto_sync=True, sync_item_bridge=True)
importer.import_from_dataframe(level=week_level, df=week_df)

# ✅ Календарные поля автоматически синхронизированы с HorizonItem

Синхронизация структуры при изменении иерархии

from planiqum.core.hierarchy.models import Level

# Изменяем структуру иерархии
brand = Level.objects.get(key='brand')
product = Level.objects.get(key='product')

# Добавляем родителя для product
brand.child = product
brand.save()  # Автоматически синхронизируется структура dim_*_direct и dim_*

# Удаляем родителя
brand.child = None
brand.save()  # Автоматически удаляются колонки из dim_*_direct и dim_*

См. тесты: /src/planiqum/core/hierarchy/tests/direct/test_level_signals.py: TestLevelPostSaveSignal

Удаление записей

from planiqum.core.hierarchy.libs.direct import DimRecordCleaner
from planiqum.core.libs.db import execute_query

# Пометить на удаление
execute_query("UPDATE dim_product_direct SET is_deleted = TRUE WHERE shortname = 'obsolete'")

# Удалить помеченные записи (каскадно)
cleaner = DimRecordCleaner()
stats = cleaner.delete_marked(level=product_level)
# → Автоматически удаляет из Item, Item.parents, dim_*_full

См. тесты: /src/planiqum/core/hierarchy/tests/direct/test_cascade_deletion.py: TestCascadeDeletionBasic


dim-direct и dim-full таблицы

dim_*_direct — основной источник данных

Таблицы dim_{level_key}_direct являются основным источником данных об иерархии в новом подходе.

Что хранится в dim-direct: - Атрибуты элементов: shortname, description - Календарные поля (для календарных уровней): start_date, end_date, num - Прямые родительские связи: колонки dim_{parent_key} для каждого прямого родителя - Метаданные синхронизации: флаги needs_attr_sync, needs_rel_sync, is_deleted

Пример структуры для уровня product с родителями brand и category:

CREATE TABLE dim_product_direct (
    id BIGINT PRIMARY KEY,
    shortname VARCHAR(256) NOT NULL UNIQUE,
    description VARCHAR(512),
    is_active BOOLEAN DEFAULT TRUE,
    is_new BOOLEAN DEFAULT TRUE,
    needs_attr_sync BOOLEAN DEFAULT FALSE,
    needs_rel_sync BOOLEAN DEFAULT FALSE,
    is_deleted BOOLEAN DEFAULT FALSE,
    dim_brand BIGINT,        -- Ссылка на родителя brand
    dim_category BIGINT       -- Ссылка на родителя category
);

Ключевые особенности: - ✅ Хранит только прямых родителей (не все предки) - ✅ Является единственным источником правды для данных иерархии - ✅ Все операции импорта и изменения данных происходят напрямую в dim-direct - ✅ Автоматически синхронизируется с dim-full при изменениях

dim_* (dim-full) — подготовленный источник для отчётов

Таблицы dim_{level_key} (без суффикса _full, но концептуально называются "dim-full") содержат денормализованные данные со всеми предками (прямыми и непрямыми).

Отличия от dim-direct:

Аспект dim_*_direct dim_* (dim-full)
Родители Только прямые (dim_brand, dim_category) Все предки (dim_brand, dim_segment, dim_category, dim_subcategory, ...)
Назначение Хранение и изменение данных Быстрый доступ для отчётов и фильтров
Обновление Напрямую через импорт Автоматически из dim-direct через JOIN'ы
Структура Минимальная (только прямые связи) Полная (все предки для быстрого доступа)

Пример структуры для уровня product:

CREATE TABLE dim_product (
    id BIGINT PRIMARY KEY,
    shortname VARCHAR(256) NOT NULL,
    description VARCHAR(512),
    dim_product BIGINT,        -- Основное поле (UNIQUE)
    dim_brand BIGINT,          -- Прямой предок
    dim_segment BIGINT,        -- Непрямой предок (через brand)
    dim_category BIGINT,       -- Прямой предок
    dim_subcategory BIGINT    -- Непрямой предок (через category)
);

Важно: В дальнейшем в документации и коде эти таблицы будут называться просто "dim-таблицы" (без уточнения _direct или _full), так как: - dim_*_direct — это рабочие таблицы для хранения данных - dim_* — это подготовленные таблицы для отчётов

Преимущества разделения: 1. Производительность: Изменение данных в dim-direct не требует пересчёта всех предков 2. Гибкость: Можно обновлять только атрибуты без пересчёта иерархии 3. Масштабируемость: Для больших иерархий синхронизация связей может быть отложена

Импорт данных

Импорт происходит напрямую в dim-direct с последующей автоматической синхронизацией в dim-full:

from planiqum.core.hierarchy.libs.direct import HierarchyImporter
import pandas as pd

importer = HierarchyImporter(auto_sync=True)  # Автоматическая синхронизация в dim-full

df = pd.DataFrame([
    ['prod1', 'Product 1', 'brand_a', 'cat_x'],
], columns=['shortname', 'description', 'brand', 'category'])

# Импорт в dim_product_direct → автоматическая синхронизация в dim_product
importer.import_from_dataframe(level=product_level, df=df)

Процесс: 1. Данные импортируются в dim_product_direct 2. Устанавливаются флаги needs_attr_sync и needs_rel_sync 3. Автоматически запускается синхронизация в dim_product (если auto_sync=True)

См. тесты: /src/planiqum/core/hierarchy/tests/direct/test_importer.py: TestHierarchyImporterBasic


Журналы импорта

Изменение подхода к хранению журналов

Старый подход: - Журналы импорта хранились в файлах на диске - Нормализация данных выполнялась в pandas (Python) - Сложно было осуществлять поиск по истории импортов

Новый подход: - Журналы импорта хранятся в партиционированных таблицах в отдельной схеме import_logs - Нормализация данных перенесена в SQL (выполняется в PostgreSQL) - Журналы остаются в базе данных для сквозного поиска

Структура журналов импорта

Схема: import_logs (отдельная схема для аудита)

Основная таблица: import_logs.hierarchy_import_{level_id} (партиционированная)

Партиции: import_logs.hierarchy_import_{level_id}_{session_id} (по сессиям импорта)

Что хранится в журнале: - Все колонки из normalized_table (shortname, description, календарные поля) - Родительские колонки (dim_{parent}_raw, dim_{parent}) - Технические колонки (row_number, session_id) - Флаги статуса (error, warning, is_new, needs_attr_sync, needs_rel_sync)

Пример структуры:

CREATE TABLE import_logs.hierarchy_import_123 (
    row_number INTEGER,
    session_id UUID,
    id BIGINT,
    shortname VARCHAR(256),
    description VARCHAR(512),
    dim_brand_raw VARCHAR(256),  -- Исходное значение из файла
    dim_brand BIGINT,            -- Нормализованное значение (ID)
    error BOOLEAN,
    warning BOOLEAN,
    is_new BOOLEAN,
    needs_attr_sync BOOLEAN,
    needs_rel_sync BOOLEAN
) PARTITION BY LIST (session_id);

Преимущества нового подхода

  1. Сквозной поиск: SQL-запросы к партиционированной таблице позволяют искать по всей истории импортов
  2. Производительность: Нормализация в SQL быстрее, чем в pandas
  3. Аудит: Все изменения сохраняются в базе данных для последующего анализа
  4. Масштабируемость: Партиционирование позволяет эффективно работать с большими объёмами данных

Важные замечания

⚠️ Схема import_logs не должна попадать в дамп базы данных — это схема для аудита, которая может быть очень большой.

Планы развития

Что пока не реализовано, но должно быть реализовано в ближайшем будущем:

  1. Просмотр журналов из админки:
  2. По ссылке открывается view, который забирает журнал импорта из таблицы
  3. Возвращает соответствующий CSV для скачивания

  4. Очистка истории:

  5. Удаление старых партиций из базы данных вместо удаления файлов
  6. Команда для управления жизненным циклом журналов

  7. Перенос в "холодное хранилище" на S3:

  8. Устаревшие партиции не удаляются из базы, а переносятся в файлы
  9. Файлы хранятся в S3 для долгосрочного архивирования
  10. Позволяет освободить место в базе данных, сохранив историю

Детали синхронизации dim-direct и dim-full

Создание и синхронизация структуры таблиц

Создание таблицы dim-direct

Таблица dim_*_direct создаётся автоматически при каждом импорте, если её ещё нет:

from planiqum.core.hierarchy.libs.direct import DimDirectTableBuilder

builder = DimDirectTableBuilder()
builder.create_table(level)  # Создаёт таблицу если не существует

Что происходит: - Создаются базовые колонки (id, shortname, description, флаги) - Добавляются колонки для всех прямых родителей (dim_{parent_key}) - Создаются индексы для оптимизации запросов - Для календарных уровней добавляются поля start_date, end_date, num

Синхронизация структуры dim-direct

При изменении структуры иерархии (добавление/удаление родителей) структура таблицы автоматически обновляется:

# При изменении level.child автоматически вызывается:
builder.sync_structure(level)  # Добавляет/удаляет колонки для родителей

Что делает sync_structure(): - Добавляет колонки для новых родителей - Удаляет колонки для удалённых родителей - Обновляет индексы

См. тесты: /src/planiqum/core/hierarchy/tests/direct/test_table_builder.py: TestDimDirectTableCreation

Синхронизация структуры dim-full

Структура dim_* таблицы синхронизируется отдельно и содержит колонки для всех предков (прямых и непрямых):

from planiqum.core.hierarchy.libs.direct import DimTableFull

dim_table_full = DimTableFull()
dim_table_full.sync_structure(level)  # Создаёт/обновляет структуру dim_* таблицы

Что делает sync_structure(): - Создаёт таблицу если не существует - Добавляет колонки для всех предков (через level.get_descendants()) - Удаляет колонки для удалённых предков - Создаёт индексы для всех колонок предков

См. тесты: /src/planiqum/core/hierarchy/tests/direct/test_dim_table_full_sync.py

Пометки на синхронизацию

В dim-direct таблицах используются два независимых флага для управления синхронизацией:

needs_attr_sync — синхронизация атрибутов

Когда устанавливается: - При добавлении новых элементов (is_new = TRUE) - При изменении shortname или description - При изменении календарных полей (start_date, end_date, num)

Что синхронизируется: - shortname, description - Календарные поля (для календарных уровней)

Особенности: - ⚡ Быстрая операция — простой UPDATE без JOIN'ов - Не требует пересчёта иерархических связей - Можно выполнять независимо от синхронизации связей

needs_rel_sync — синхронизация связей

Когда устанавливается: - При добавлении новых элементов (is_new = TRUE) - При изменении родительских связей (dim_* колонок) - При изменении связей у родительских элементов (каскадно)

Что синхронизируется: - Все колонки dim_* (иерархические связи) - Построение полной иерархии через LEFT JOIN к родительским dim_* таблицам

Особенности: - 🐢 Медленная операция — требует построения сложных JOIN запросов - Триггерит каскадную синхронизацию дочерних элементов - Для глубоких иерархий может занять значительное время

Пометки на удаление

Для удаления записей используется флаг is_deleted:

Процесс удаления:

  1. Пометка: Устанавливается is_deleted = TRUE в dim_*_direct
  2. Каскадное обнуление: Обнуляются ссылки на удаляемый элемент у всех потомков
  3. Удаление из dim-full: Удаляются все записи, содержащие удаляемый элемент
  4. Физическое удаление: Запись удаляется из dim_*_direct

Пример:

from planiqum.core.hierarchy.libs.direct import DimRecordCleaner

# Пометить на удаление
execute_query("UPDATE dim_product_direct SET is_deleted = TRUE WHERE shortname = 'obsolete'")

# Удалить помеченные записи
cleaner = DimRecordCleaner()
stats = cleaner.delete_marked(level=product_level)

Запуск механизмов синхронизации и удаления

Важно: Сначала устанавливаются пометки, затем запускается механизм синхронизации/удаления.

Порядок операций:

  1. Импорт/изменение данных → устанавливаются флаги (needs_attr_sync, needs_rel_sync, is_deleted)
  2. Запуск синхронизации → обработка записей с установленными флагами
  3. Сброс флагов → после успешной синхронизации флаги сбрасываются

Автоматическая синхронизация:

При импорте через HierarchyImporter с параметром auto_sync=True синхронизация запускается автоматически:

importer = HierarchyImporter(auto_sync=True)  # Автоматическая синхронизация
importer.import_from_dataframe(level=level, df=df)
# → Автоматически синхронизирует dim_*_full после импорта

Ручная синхронизация:

from planiqum.core.hierarchy.libs.direct import DimTableFull

dim_table_full = DimTableFull()

# Синхронизация только атрибутов
dim_table_full._sync_attributes(level)

# Синхронизация только связей
dim_table_full._sync_relations(level)

# Полная синхронизация (атрибуты + связи)
dim_table_full.sync_content(level)

См. тесты: /src/planiqum/core/hierarchy/tests/direct/test_dim_table_full_sync.py


Старый подход и обратная совместимость

Будущий отказ от Item

⚠️ В будущем нужно полностью отказаться от Item и Item.parents.

Целевая архитектура: - Вся информация должна храниться в dim_*_direct - Синхронизироваться в dim_* для использования в отчётах (полный набор классификаторов) - Модель Item и таблицы core_hierarchy_item* будут удалены

Двусторонняя синхронизация (переходный период)

На переходный период реализована двусторонняя синхронизация между dim_*_direct и Item/Item.parents:

Синхронизация dim-direct → Item

При импорте данных в dim-direct:

Реализовано: При импорте через HierarchyImporter с параметром sync_item_bridge=True автоматически создаются записи в Item и Item.parents:

importer = HierarchyImporter(sync_item_bridge=True)
importer.import_from_dataframe(level=level, df=df)
# → Автоматически создаёт Item и Item.parents

Что синхронизируется: - Базовые поля Item (shortname, description, is_active) - Родительские связи (core_hierarchy_item_parents) - Календарные поля (HorizonItem для календарных уровней)

См. тесты: /src/planiqum/core/hierarchy/tests/direct/test_item_bridge.py: TestItemBridgeBasicSync

Синхронизация при удалении из dim-direct

Реализовано: При удалении записей из dim_*_direct (через DimRecordCleaner) автоматически удаляются записи из Item и Item.parents:

# Удаление через DimRecordCleaner
cleaner = DimRecordCleaner()
cleaner.delete_marked(level=level)
# → Автоматически удаляет из Item и Item.parents

Процесс: 1. Пометка is_deleted = TRUE в dim_*_direct 2. Каскадное удаление через DimRecordCleaner 3. Автоматическое удаление из Item и Item.parents (через сигнал item_pre_delete_direct)

Синхронизация Item → dim-direct

При добавлении, изменении, удалении записей в Item и Item.parents:

Реализовано: Происходит синхронизация с dim_*_direct и dim_* через Django сигналы:

1. Создание/изменение Item: - Сигнал item_post_save_direct срабатывает при создании или изменении Item - Синхронизирует атрибуты и связи в dim_*_direct - Автоматически синхронизирует dim_* (если auto_sync=True)

См. тесты: /src/planiqum/core/hierarchy/tests/direct/test_item_signals.py: TestItemPostSaveSignal

2. Изменение Item.parents: - Сигнал item_parents_changed_direct срабатывает при изменении Item.parents - Обрабатывает действия: post_add, post_remove, post_clear - ✅ Реализовано: Синхронизация происходит при всех действиях: - post_add — добавление родителей - post_remove — удаление родителей - post_clear — очистка всех родителей

3. Удаление Item: - Сигнал item_pre_delete_direct срабатывает перед удалением Item - Выполняет каскадное удаление через ItemBridge.delete_item_cascade() - ✅ Реализовано: Удаляет из dim_*_direct и dim_* - ✅ Реализовано: Удаляет из Item и Item.parents (через каскадное удаление Django)

Синхронизация при изменении HorizonItem

⚠️ Частично реализовано:

HorizonItem наследуется от Item, поэтому изменения через Item.save() обрабатываются сигналом item_post_save_direct. Однако, если изменяются только поля HorizonItem (start_date, end_date, num) без явного вызова Item.save(), синхронизация может не сработать.

Текущая реализация: - ✅ При создании/изменении Item через item_post_save_direct синхронизируются календарные поля из HorizonItem - ⚠️ Требует проверки: Прямое изменение HorizonItem без вызова Item.save() может не триггерить синхронизацию

Рекомендация: При изменении календарных полей всегда вызывать item.save() или использовать sync_dim_direct_from_item() напрямую.

Пример:

horizon_item = HorizonItem.objects.get(id=123)
horizon_item.start_date = date(2024, 1, 1)
horizon_item.end_date = date(2024, 1, 7)
horizon_item.num = 1
horizon_item.save()  # Вызов Item.save() триггерит синхронизацию

Миграция на новый подход

⚠️ Важно: Перед использованием нового подхода

Перед использованием нового подхода (Direct Pipeline) необходимо осуществить миграцию: все данные из Item и Item.parents должны быть перенесены в dim-таблицы.

Команда миграции

Для миграции используется management команда:

python manage.py migrate_to_direct_pipeline

Параметры команды:

  • --type=<type_key> — мигрировать только уровни указанного типа (например, --type=products)
  • --level=<level_key> — мигрировать только указанный уровень (например, --level=product)
  • --dry-run — показать что будет сделано без выполнения

Примеры использования:

# Миграция всех уровней
python manage.py migrate_to_direct_pipeline

# Миграция конкретного типа
python manage.py migrate_to_direct_pipeline --type=products

# Миграция конкретного уровня
python manage.py migrate_to_direct_pipeline --level=product

# Dry-run (показать что будет сделано)
python manage.py migrate_to_direct_pipeline --dry-run

Что делает команда

1. Синхронизация структуры: - Создаёт/обновляет структуру dim_*_direct таблиц (колонки, индексы) - Создаёт/обновляет структуру dim_* таблиц (колонки, индексы)

2. Миграция контента: - Синхронизирует данные из Item в dim_*_direct - Автоматически синхронизирует dim_* (если auto_sync=True)

3. Правильный порядок: - Уровни обрабатываются в правильном порядке (от родителей к детям) - Обеспечивается корректность связей между уровнями

⚠️ Важные замечания

1. Потенциально тяжёлая операция: - При большом количестве данных миграция может занять значительное время - Рекомендуется запускать в фоне или через Celery - Можно мигрировать по частям (по типам или уровням)

2. Подконтрольное выполнение: - Используйте --dry-run для предварительной оценки - Мигрируйте по частям для контроля процесса - Проверяйте результаты после каждого этапа

3. Идемпотентность: - Команда безопасна для повторного запуска - Использует ON CONFLICT для обновления существующих записей - Проверяет существование колонок перед добавлением

4. Проверка данных: - После миграции нужно сверить данные: - Количество записей в Item и dim_*_direct должно совпадать - Связи Item.parents должны соответствовать колонкам dim_*_direct

Процесс миграции

Рекомендуемый порядок действий:

  1. Подготовка:

    # Проверка что будет сделано
    python manage.py migrate_to_direct_pipeline --dry-run
    

  2. Миграция по частям:

    # Сначала мигрируем один тип
    python manage.py migrate_to_direct_pipeline --type=products
    
    # Проверяем результаты
    # Затем мигрируем следующий тип
    

  3. Проверка:

    # Сравнение количества записей
    from planiqum.core.hierarchy.models import Item, Level
    from planiqum.core.libs.db import select_to_df
    
    level = Level.objects.get(key='product')
    item_count = Item.objects.filter(level=level).count()
    dim_count = select_to_df("SELECT COUNT(*) as cnt FROM dim_product_direct").iloc[0]['cnt']
    
    assert item_count == dim_count, f"Несоответствие: Item={item_count}, dim-direct={dim_count}"
    

  4. Полная миграция:

    # После проверки всех типов
    python manage.py migrate_to_direct_pipeline
    

См. тесты: /src/planiqum/core/hierarchy/tests/direct/test_migrate_command.py: TestMigrateToDirectPipeline


Переопределение методов синхронизации

Для обеспечения обратной совместимости и полного перехода на Direct Pipeline были переопределены ключевые методы:

Level.sync() и LevelManager.sync()

Переопределены для использования Direct Pipeline вместо старого механизма.

Что изменилось: - Level.sync() теперь использует DimDirectTableBuilder.sync_structure() и DimTableFull.sync_structure() - LevelManager.sync() использует Direct Pipeline для синхронизации всех уровней - Все существующие вызовы level.sync() и Level.objects.sync() автоматически используют новый механизм

Преимущества: - ✅ Полная обратная совместимость — не требуется обновление кода - ✅ Все синхронизации идут через Direct Pipeline - ✅ Единая точка входа для всех операций синхронизации

Пример использования (без изменений в коде):

from planiqum.core.hierarchy.models import Level

# Синхронизация одного уровня (автоматически использует Direct Pipeline)
product_level.sync()

# Синхронизация всех уровней (автоматически использует Direct Pipeline)
Level.objects.sync()

dimension_table_name()

Переопределён для использования DimTableFull.get_table_name(level).

Важно: Метод возвращает имя dim-full таблицы (dim_{key} без суффикса _full), а не dim-direct таблицы (dim_{key}_direct). Это важно для обратной совместимости — старый код ожидает имя dim-full таблицы.

Пример:

level = Level.objects.get(key='product')
table_name = level.dimension_table_name()  # Возвращает 'dim_product' (dim-full таблица)

Часто задаваемые вопросы

В чём разница между dim-direct и dim-full?

  • dim-direct (dim_*_direct) — хранит только прямых родителей, используется для хранения и изменения данных
  • dim-full (dim_*) — хранит всех предков, используется для быстрого доступа в отчётах

Нужно ли вызывать синхронизацию вручную?

При импорте через HierarchyImporter с auto_sync=True синхронизация происходит автоматически. В остальных случаях можно вызывать DimTableFull.sync_content() вручную.

Что происходит при изменении структуры иерархии?

При изменении Level.child автоматически: 1. Синхронизируется структура dim_*_direct (добавляются/удаляются колонки) 2. Синхронизируется структура dim_* (добавляются/удаляются колонки для всех предков) 3. Обновляются данные в затронутых таблицах

Можно ли использовать старый подход параллельно?

Да, на переходный период оба подхода работают параллельно благодаря двусторонней синхронизации. Однако рекомендуется как можно скорее перейти на новый подход.

Когда будет удалён ItemBridge?

ItemBridge будет удалён после полного отказа от модели Item. Это произойдёт после миграции всех данных и обновления всего кода, использующего Item.


См. также

Тесты

Все тесты для Direct Pipeline находятся в директории /src/planiqum/core/hierarchy/tests/direct/:

  • test_importer.py — тесты импорта данных через HierarchyImporter
  • test_importer_files.py — тесты импорта из CSV и XLSX файлов
  • test_table_builder.py — тесты создания и синхронизации структуры dim-direct таблиц
  • test_dim_table_full_sync.py — тесты синхронизации dim-full таблиц
  • test_item_bridge.py — тесты синхронизации с Item и Item.parents
  • test_item_signals.py — тесты сигналов для Item
  • test_level_signals.py — тесты сигналов для Level (изменение структуры иерархии)
  • test_cascade_deletion.py — тесты каскадного удаления записей
  • test_migrate_command.py — тесты команды миграции
  • test_import_session.py — тесты журналов импорта
  • test_log_partitioning.py — тесты партиционирования журналов импорта

Важно: описанные настройки и сценарии могут отличаться в вашей инсталляции Planiqum
За уточнениями и методологической поддержкой обращайтесь в компанию ЮНИК СОФТ