Перейти к содержанию

Временная синхронизация с Item/HorizonItem (ItemBridge)

⚠️ ВНИМАНИЕ: Весь функционал, описанный в этом разделе, является временным и будет удалён после полного перехода на Direct Pipeline и отказа от модели Item.

Обзор

Модуль ItemBridge обеспечивает двустороннюю синхронизацию между dim_*_direct таблицами и устаревшими таблицами: - core_hierarchy_item (базовые поля элементов) - core_hierarchy_item_parents (родительские связи) - core_hierarchy_horizonitem (календарные поля)

Архитектура

Класс ItemBridge

Расположение: /src/planiqum/core/hierarchy/libs/direct/item_bridge.py: ItemBridge

Назначение: Координация синхронизации между dim_*_direct и Item-таблицами.

Celery-задача для асинхронной синхронизации

Расположение: /src/planiqum/core/hierarchy/tasks.py: sync_item_from_dim_direct_task

Параметры задачи: - Очередь: low_priority — не блокирует критичные операции - Приоритет: 1 (низкий, где 0 — высший) - Retry: До 3 попыток с интервалом 60 секунд - Binding: bind=True — доступ к self для retry

Fallback механизм:

Если Celery недоступен (например, в среде разработки или при ошибке), синхронизация автоматически выполняется синхронно с предупреждением в логе:

# В importer.py
try:
    # Попытка асинхронного запуска через Celery
    task = sync_item_from_dim_direct_task.apply_async(args=[level.id], priority=1)
    logger.info(f"Синхронизация запланирована асинхронно (task_id={task.id})")
except (ImportError, Exception) as e:
    # Fallback: синхронное выполнение
    logger.warning(f"Не удалось запустить асинхронно. Выполняем синхронно.")
    stats = sync_items_from_dim_direct(level)

Направления синхронизации

  1. dim_*_direct → Item (основное, реализовано)
  2. Автоматически запускается после импорта асинхронно через Celery
  3. Использует флаги update_item, update_parents, update_horizon
  4. Выполняется в отдельном процессе с низким приоритетом
  5. При недоступности Celery — fallback на синхронное выполнение

  6. Item → dim_*_direct (TODO, заглушка)

  7. Для обратной совместимости
  8. Пока не реализовано

Синхронизация dim_*_direct → Item

Главный метод

def sync_items_from_dim_direct(level: Level) -> Dict[str, int]:
    """
    Синхронизирует изменения из dim_*_direct в Item, item_parents и HorizonItem.

    ⚠️ ВРЕМЕННЫЙ метод - будет удалён после отказа от Item.
    """

Процесс синхронизации

Синхронизация разделена на три независимых этапа:

def sync_items_from_dim_direct(self, level: Level) -> Dict[str, int]:
    stats = {'items_created': 0, 'items_updated': 0, 
             'parents_updated': 0, 'horizons_updated': 0}

    # 1. Синхронизация базовых полей
    part_stats = self._sync_item_rows(level, table_name)
    stats['items_created'] += part_stats['items_created']
    stats['items_updated'] += part_stats['items_updated']

    # 2. Синхронизация родительских связей
    part_stats = self._sync_item_parents(level, table_name)
    stats['parents_updated'] += part_stats['parents_updated']

    # 3. Синхронизация календарных полей (только для is_calendar=True)
    if level.is_calendar:
        part_stats = self._sync_horizon_fields(level, table_name)
        stats['horizons_updated'] += part_stats['horizons_updated']

    return stats

1. Синхронизация core_hierarchy_item

Метод: _sync_item_rows(level: Level, table_name: str)

Что синхронизируется: - id — совпадает с ID в dim_*_direct - shortname — краткое имя - description — описание - is_active — активность - level_id — связь с уровнем иерархии

Реализация:

def _sync_item_rows(self, level: Level, table_name: str) -> Dict[str, int]:
    """Массово синхронизирует базовые поля в core_hierarchy_item."""

    item_table = Item._meta.db_table
    upsert_sql = f"""
        INSERT INTO {item_table} (id, shortname, description, is_active, is_new, level_id)
        SELECT 
            dim.id,
            dim.shortname,
            dim.description,
            dim.is_active,
            FALSE as is_new,  -- Явно сбрасываем флаг is_new
            {level.id} as level_id
        FROM {table_name} dim
        WHERE dim.update_item = TRUE

        ON CONFLICT (id) DO UPDATE
        SET 
            shortname = EXCLUDED.shortname,
            description = EXCLUDED.description,
            is_active = EXCLUDED.is_active,
            is_new = EXCLUDED.is_new,
            level_id = EXCLUDED.level_id
    """

    with connection.cursor() as cursor:
        cursor.execute(upsert_sql)
        affected = cursor.rowcount

    # Сброс флага после синхронизации
    execute_query(f"UPDATE {table_name} SET update_item = FALSE WHERE update_item = TRUE")

    return {'items_created': affected, 'items_updated': 0}

Ключевые моменты: - Массовая операция UPSERT (INSERT ... ON CONFLICT DO UPDATE) - is_new принудительно устанавливается в FALSE - После синхронизации флаг update_item сбрасывается

2. Синхронизация core_hierarchy_item_parents

Метод: _sync_item_parents(level: Level, table_name: str)

Что синхронизируется: - Связи элементов с их родителями - Добавление новых связей - Обновление существующих связей - Удаление устаревших связей

Алгоритм:

Для каждого родительского уровня (level.get_children()):
  1. DELETE — удалить связи, которых нет в dim_*_direct
  2. UPDATE — обновить существующие связи (если изменился родитель)
  3. INSERT — добавить новые связи

Реализация:

def _sync_item_parents(self, level: Level, table_name: str) -> Dict[str, int]:
    """
    Массово синхронизирует родительские связи в core_hierarchy_item_parents.

    Обрабатывает уровень за уровнем (родительский):
    1. DELETE — удаляем связи, которых больше нет
    2. UPDATE — обновляем изменённые связи
    3. INSERT — добавляем новые связи
    """
    stats = {'parents_updated': 0}

    # Получаем список прямых родительских уровней
    parent_levels = list(level.get_children())

    if not parent_levels:
        return stats

    parents_table = 'core_hierarchy_item_parents'

    for parent_level in parent_levels:
        parent_field = self.table_builder.get_field_name(parent_level)

        # 1️⃣ DELETE: Удаляем связи, которых нет в dim_*_direct
        delete_sql = f"""
            DELETE FROM {parents_table}
            WHERE from_item_id IN (
                SELECT id FROM {table_name} 
                WHERE update_parents = TRUE AND {parent_field} IS NULL
            )
            AND to_item_id IN (
                SELECT id FROM core_hierarchy_item WHERE level_id = {parent_level.id}
            )
        """
        with connection.cursor() as cursor:
            cursor.execute(delete_sql)
            deleted = cursor.rowcount

        # 2️⃣ UPDATE: Обновляем существующие связи
        update_sql = f"""
            UPDATE {parents_table} p
            SET to_item_id = dim.{parent_field}
            FROM {table_name} dim
            WHERE p.from_item_id = dim.id
              AND dim.update_parents = TRUE
              AND dim.{parent_field} IS NOT NULL
              AND p.to_item_id IN (
                  SELECT id FROM core_hierarchy_item WHERE level_id = {parent_level.id}
              )
              AND p.to_item_id != dim.{parent_field}
        """
        with connection.cursor() as cursor:
            cursor.execute(update_sql)
            updated = cursor.rowcount

        # 3️⃣ INSERT: Добавляем новые связи
        insert_sql = f"""
            INSERT INTO {parents_table} (from_item_id, to_item_id)
            SELECT dim.id, dim.{parent_field}
            FROM {table_name} dim
            WHERE dim.update_parents = TRUE
              AND dim.{parent_field} IS NOT NULL
              AND NOT EXISTS (
                  SELECT 1 FROM {parents_table} p
                  WHERE p.from_item_id = dim.id
                    AND p.to_item_id IN (
                        SELECT id FROM core_hierarchy_item WHERE level_id = {parent_level.id}
                    )
              )
        """
        with connection.cursor() as cursor:
            cursor.execute(insert_sql)
            inserted = cursor.rowcount

        stats['parents_updated'] += deleted + updated + inserted

    # Сброс флага после синхронизации
    execute_query(f"UPDATE {table_name} SET update_parents = FALSE WHERE update_parents = TRUE")

    return stats

Важные особенности: 1. Уровень за уровнем: Обрабатываем каждого родителя отдельно 2. Массовые операции: Никаких циклов на Python 3. DELETE вместо SET NULL: Связь полностью удаляется, если родитель стал NULL 4. Фильтрация по level_id: Гарантируем, что обновляем связи только для нужного родительского уровня

3. Синхронизация core_hierarchy_horizonitem

Метод: _sync_horizon_fields(level: Level, table_name: str)

Когда вызывается: Только для календарных уровней (level.is_calendar = True)

Что синхронизируется: - num — порядковый номер периода - start_date — дата начала - end_date — дата окончания

Реализация:

def _sync_horizon_fields(self, level: Level, table_name: str) -> Dict[str, int]:
    """Массово обновляет календарные поля в core_hierarchy_horizonitem."""

    stats = {'horizons_updated': 0}

    if not level.is_calendar:
        return stats

    horizon_table = HorizonItem._meta.db_table
    upsert_sql = f"""
        INSERT INTO {horizon_table} (item_ptr_id, num, start_date, end_date, period)
        SELECT 
            dim.id as item_ptr_id, 
            dim.num, 
            dim.start_date, 
            dim.end_date, 
            NULL as period
        FROM {table_name} dim
        WHERE dim.update_horizon = TRUE

        ON CONFLICT (item_ptr_id) DO UPDATE
        SET 
            num = EXCLUDED.num, 
            start_date = EXCLUDED.start_date, 
            end_date = EXCLUDED.end_date
    """

    with connection.cursor() as cursor:
        cursor.execute(upsert_sql)
        affected_rows = cursor.rowcount

    stats['horizons_updated'] = affected_rows

    # Сброс флага после синхронизации
    execute_query(f"UPDATE {table_name} SET update_horizon = FALSE WHERE update_horizon = TRUE")

    return stats

Ключевые моменты: - Работает только с календарными уровнями - Поле period остаётся NULL (не используется в проекте) - Поддерживает установку календарных полей в NULL

Примеры использования

Пример 1: Базовый импорт и синхронизация

from planiqum.core.hierarchy.libs.direct import HierarchyImporter
import pandas as pd

# Импорт данных
importer = HierarchyImporter()
brand_df = pd.DataFrame([
    ['brand_a', 'Brand A'],
    ['brand_b', 'Brand B'],
], columns=['shortname', 'description'])

importer.import_from_dataframe(level=brand_level, df=brand_df)

# Синхронизация произошла автоматически!
# Проверяем результат
from planiqum.core.hierarchy.models import Item
items = Item.objects.filter(level=brand_level)
print(f"Синхронизировано элементов: {items.count()}")

Пример 2: Импорт с родителями

# Сначала импортируем родителей
brand_df = pd.DataFrame([['brand_a', 'Brand A']], columns=['shortname', 'description'])
importer.import_from_dataframe(level=brand_level, df=brand_df)

# Затем импортируем элементы с родителями
product_df = pd.DataFrame([
    ['prod1', 'Product 1', 'brand_a'],
], columns=['shortname', 'description', 'brand'])
importer.import_from_dataframe(level=product_level, df=product_df)

# Проверяем связи
from planiqum.core.hierarchy.models import Item
prod = Item.objects.get(shortname='prod1')
print(f"Родители: {list(prod.parents.values_list('shortname', flat=True))}")
# Результат: ['brand_a']

Пример 3: Импорт календарных данных

week_df = pd.DataFrame([
    ['2024-W01', 'Week 1', '2024-01-01', '2024-01-07', 1],
    ['2024-W02', 'Week 2', '2024-01-08', '2024-01-14', 2],
], columns=['shortname', 'description', 'start_date', 'end_date', 'num'])

importer.import_from_dataframe(level=week_level, df=week_df)

# Проверяем календарные поля
from planiqum.core.horizons.models import HorizonItem
weeks = HorizonItem.objects.filter(level=week_level)
for week in weeks:
    print(f"{week.shortname}: {week.start_date} - {week.end_date}, num={week.num}")

Пример 4: Изменение родителей

# Первый импорт
product_df = pd.DataFrame([
    ['prod1', 'Product 1', 'brand_a']
], columns=['shortname', 'description', 'brand'])
importer.import_from_dataframe(level=product_level, df=product_df)

# Изменяем родителя
product_df = pd.DataFrame([
    ['prod1', 'Product 1', 'brand_b']  # Новый родитель!
], columns=['shortname', 'description', 'brand'])
importer.import_from_dataframe(level=product_level, df=product_df)

# Связь в core_hierarchy_item_parents обновлена автоматически

Флаги синхронизации

update_item

Когда устанавливается: - При любом UPSERT в dim_*_direct (всегда TRUE)

Что инициирует: - Синхронизацию id, shortname, description, is_active, level_id в core_hierarchy_item

Сброс: - После успешной синхронизации в _sync_item_rows

update_parents

Когда устанавливается: - При изменении любого поля родителя (dim_brand, dim_category, и т.д.)

Что инициирует: - Синхронизацию связей в core_hierarchy_item_parents - Обработку DELETE/UPDATE/INSERT для родительских связей

Сброс: - После успешной синхронизации в _sync_item_parents

update_horizon

Когда устанавливается: - При изменении start_date, end_date или num - Только для календарных уровней

Что инициирует: - Синхронизацию календарных полей в core_hierarchy_horizonitem

Сброс: - После успешной синхронизации в _sync_horizon_fields

Асинхронное выполнение через Celery

Автоматический запуск после импорта

После успешного импорта в dim_*_direct таблицу синхронизация запускается автоматически и асинхронно:

# В HierarchyImporter._import_from_temp_table
if added_count + updated_count > 0:
    self._schedule_item_sync(level)  # Асинхронный запуск

Логирование

При асинхронном запуске:

INFO: brand: Синхронизация Item запланирована асинхронно 
      (временный механизм, task_id=2cfc8742-db43-4258-a2eb-8100fcd11f4d)
INFO: [Celery] Начало синхронизации Item для уровня 'brand' (level_id=123)
INFO: [Celery] Синхронизация Item для 'brand' завершена успешно: 
      создано 5, обновлено 3, связей 10, календарных 0
INFO: Task hierarchy.sync_item_from_dim_direct[2cfc8742...] succeeded in 0.044s

При fallback на синхронное выполнение:

WARNING: brand: Не удалось запустить асинхронную синхронизацию Item 
         (причина: [Errno 111] Connection refused). Выполняем синхронно.
INFO: brand: Синхронизация с Item завершена (синхронно): 
      создано 5, обновлено 3, связей 10, календарных 0

Мониторинг Celery-задач

Проверить статус задачи можно через Celery:

from celery.result import AsyncResult

task_id = "2cfc8742-db43-4258-a2eb-8100fcd11f4d"
result = AsyncResult(task_id)

print(f"Статус: {result.status}")  # SUCCESS, PENDING, FAILURE
print(f"Результат: {result.result}")  # {'level_key': 'brand', 'stats': {...}}

Тестирование асинхронной синхронизации

В тестовом окружении Django настроен параметр CELERY_TASK_ALWAYS_EAGER = True, который заставляет Celery-задачи выполняться синхронно, но весь код задачи работает реально.

Это позволяет тестам проверять фактические изменения в БД:

def test_import_syncs_to_item_table(self, product, brand):
    """Тест проверяет что после импорта элементы доступны в Item."""
    importer = HierarchyImporter()

    # Импорт запускает Celery-задачу, но она выполняется синхронно в тесте
    importer.import_from_dataframe(level=brand, df=brand_df)

    # Проверяем реальные изменения в БД
    items = Item.objects.filter(level=brand)
    assert items.count() == 2  # Задача выполнилась, данные синхронизированы

Производительность

Массовые операции

Все операции синхронизации выполняются массово: - Один UPSERT для всех элементов с update_item = TRUE - Один DELETE, один UPDATE, один INSERT для каждого родительского уровня - Нет циклов на уровне Python

Асинхронное выполнение

Синхронизация Item выполняется в отдельном процессе Celery с низким приоритетом: - Не блокирует основной процесс импорта - Не мешает критичной синхронизации dim_*_full таблиц - Автоматические повторы при временных ошибках

Частичные индексы

Для оптимизации выборки элементов для синхронизации используются частичные индексы:

CREATE INDEX idx_dim_product_direct_update_item 
ON dim_product_direct(update_item) 
WHERE update_item = TRUE;

Транзакционность

Вся синхронизация выполняется в одной транзакции (@transaction.atomic).

Замечания по миграции

⚠️ Этот модуль является временным.

План отказа от ItemBridge: 1. Перенести все операции на прямую работу с dim_*_direct и dim_*_full 2. Удалить все вызовы sync_items_from_dim_direct 3. Удалить поля update_item, update_parents, update_horizon из dim_*_direct 4. Удалить класс ItemBridge и модуль item_bridge.py 5. Удалить модель Item и связанные таблицы

См. также


Важно: описанные настройки и сценарии могут отличаться в вашей инсталляции Planiqum
За уточнениями и методологической поддержкой обращайтесь в компанию ЮНИК СОФТ