Временная синхронизация с Item/HorizonItem (ItemBridge)¶
⚠️ ВНИМАНИЕ: Весь функционал, описанный в этом разделе, является временным и будет удалён после полного перехода на Direct Pipeline и отказа от модели Item.
Обзор¶
Модуль ItemBridge обеспечивает двустороннюю синхронизацию между dim_*_direct таблицами и устаревшими таблицами:
- core_hierarchy_item (базовые поля элементов)
- core_hierarchy_item_parents (родительские связи)
- core_hierarchy_horizonitem (календарные поля)
Архитектура¶
Класс ItemBridge¶
Расположение: /src/planiqum/core/hierarchy/libs/direct/item_bridge.py: ItemBridge
Назначение: Координация синхронизации между dim_*_direct и Item-таблицами.
Celery-задача для асинхронной синхронизации¶
Расположение: /src/planiqum/core/hierarchy/tasks.py: sync_item_from_dim_direct_task
Параметры задачи:
- Очередь: low_priority — не блокирует критичные операции
- Приоритет: 1 (низкий, где 0 — высший)
- Retry: До 3 попыток с интервалом 60 секунд
- Binding: bind=True — доступ к self для retry
Fallback механизм:
Если Celery недоступен (например, в среде разработки или при ошибке), синхронизация автоматически выполняется синхронно с предупреждением в логе:
# В importer.py
try:
# Попытка асинхронного запуска через Celery
task = sync_item_from_dim_direct_task.apply_async(args=[level.id], priority=1)
logger.info(f"Синхронизация запланирована асинхронно (task_id={task.id})")
except (ImportError, Exception) as e:
# Fallback: синхронное выполнение
logger.warning(f"Не удалось запустить асинхронно. Выполняем синхронно.")
stats = sync_items_from_dim_direct(level)
Направления синхронизации¶
- dim_*_direct → Item (основное, реализовано)
- Автоматически запускается после импорта асинхронно через Celery
- Использует флаги
update_item,update_parents,update_horizon - Выполняется в отдельном процессе с низким приоритетом
-
При недоступности Celery — fallback на синхронное выполнение
-
Item → dim_*_direct (TODO, заглушка)
- Для обратной совместимости
- Пока не реализовано
Синхронизация dim_*_direct → Item¶
Главный метод¶
def sync_items_from_dim_direct(level: Level) -> Dict[str, int]:
"""
Синхронизирует изменения из dim_*_direct в Item, item_parents и HorizonItem.
⚠️ ВРЕМЕННЫЙ метод - будет удалён после отказа от Item.
"""
Процесс синхронизации¶
Синхронизация разделена на три независимых этапа:
def sync_items_from_dim_direct(self, level: Level) -> Dict[str, int]:
stats = {'items_created': 0, 'items_updated': 0,
'parents_updated': 0, 'horizons_updated': 0}
# 1. Синхронизация базовых полей
part_stats = self._sync_item_rows(level, table_name)
stats['items_created'] += part_stats['items_created']
stats['items_updated'] += part_stats['items_updated']
# 2. Синхронизация родительских связей
part_stats = self._sync_item_parents(level, table_name)
stats['parents_updated'] += part_stats['parents_updated']
# 3. Синхронизация календарных полей (только для is_calendar=True)
if level.is_calendar:
part_stats = self._sync_horizon_fields(level, table_name)
stats['horizons_updated'] += part_stats['horizons_updated']
return stats
1. Синхронизация core_hierarchy_item¶
Метод: _sync_item_rows(level: Level, table_name: str)
Что синхронизируется:
- id — совпадает с ID в dim_*_direct
- shortname — краткое имя
- description — описание
- is_active — активность
- level_id — связь с уровнем иерархии
Реализация:
def _sync_item_rows(self, level: Level, table_name: str) -> Dict[str, int]:
"""Массово синхронизирует базовые поля в core_hierarchy_item."""
item_table = Item._meta.db_table
upsert_sql = f"""
INSERT INTO {item_table} (id, shortname, description, is_active, is_new, level_id)
SELECT
dim.id,
dim.shortname,
dim.description,
dim.is_active,
FALSE as is_new, -- Явно сбрасываем флаг is_new
{level.id} as level_id
FROM {table_name} dim
WHERE dim.update_item = TRUE
ON CONFLICT (id) DO UPDATE
SET
shortname = EXCLUDED.shortname,
description = EXCLUDED.description,
is_active = EXCLUDED.is_active,
is_new = EXCLUDED.is_new,
level_id = EXCLUDED.level_id
"""
with connection.cursor() as cursor:
cursor.execute(upsert_sql)
affected = cursor.rowcount
# Сброс флага после синхронизации
execute_query(f"UPDATE {table_name} SET update_item = FALSE WHERE update_item = TRUE")
return {'items_created': affected, 'items_updated': 0}
Ключевые моменты:
- Массовая операция UPSERT (INSERT ... ON CONFLICT DO UPDATE)
- is_new принудительно устанавливается в FALSE
- После синхронизации флаг update_item сбрасывается
2. Синхронизация core_hierarchy_item_parents¶
Метод: _sync_item_parents(level: Level, table_name: str)
Что синхронизируется: - Связи элементов с их родителями - Добавление новых связей - Обновление существующих связей - Удаление устаревших связей
Алгоритм:
Для каждого родительского уровня (level.get_children()):
1. DELETE — удалить связи, которых нет в dim_*_direct
2. UPDATE — обновить существующие связи (если изменился родитель)
3. INSERT — добавить новые связи
Реализация:
def _sync_item_parents(self, level: Level, table_name: str) -> Dict[str, int]:
"""
Массово синхронизирует родительские связи в core_hierarchy_item_parents.
Обрабатывает уровень за уровнем (родительский):
1. DELETE — удаляем связи, которых больше нет
2. UPDATE — обновляем изменённые связи
3. INSERT — добавляем новые связи
"""
stats = {'parents_updated': 0}
# Получаем список прямых родительских уровней
parent_levels = list(level.get_children())
if not parent_levels:
return stats
parents_table = 'core_hierarchy_item_parents'
for parent_level in parent_levels:
parent_field = self.table_builder.get_field_name(parent_level)
# 1️⃣ DELETE: Удаляем связи, которых нет в dim_*_direct
delete_sql = f"""
DELETE FROM {parents_table}
WHERE from_item_id IN (
SELECT id FROM {table_name}
WHERE update_parents = TRUE AND {parent_field} IS NULL
)
AND to_item_id IN (
SELECT id FROM core_hierarchy_item WHERE level_id = {parent_level.id}
)
"""
with connection.cursor() as cursor:
cursor.execute(delete_sql)
deleted = cursor.rowcount
# 2️⃣ UPDATE: Обновляем существующие связи
update_sql = f"""
UPDATE {parents_table} p
SET to_item_id = dim.{parent_field}
FROM {table_name} dim
WHERE p.from_item_id = dim.id
AND dim.update_parents = TRUE
AND dim.{parent_field} IS NOT NULL
AND p.to_item_id IN (
SELECT id FROM core_hierarchy_item WHERE level_id = {parent_level.id}
)
AND p.to_item_id != dim.{parent_field}
"""
with connection.cursor() as cursor:
cursor.execute(update_sql)
updated = cursor.rowcount
# 3️⃣ INSERT: Добавляем новые связи
insert_sql = f"""
INSERT INTO {parents_table} (from_item_id, to_item_id)
SELECT dim.id, dim.{parent_field}
FROM {table_name} dim
WHERE dim.update_parents = TRUE
AND dim.{parent_field} IS NOT NULL
AND NOT EXISTS (
SELECT 1 FROM {parents_table} p
WHERE p.from_item_id = dim.id
AND p.to_item_id IN (
SELECT id FROM core_hierarchy_item WHERE level_id = {parent_level.id}
)
)
"""
with connection.cursor() as cursor:
cursor.execute(insert_sql)
inserted = cursor.rowcount
stats['parents_updated'] += deleted + updated + inserted
# Сброс флага после синхронизации
execute_query(f"UPDATE {table_name} SET update_parents = FALSE WHERE update_parents = TRUE")
return stats
Важные особенности: 1. Уровень за уровнем: Обрабатываем каждого родителя отдельно 2. Массовые операции: Никаких циклов на Python 3. DELETE вместо SET NULL: Связь полностью удаляется, если родитель стал NULL 4. Фильтрация по level_id: Гарантируем, что обновляем связи только для нужного родительского уровня
3. Синхронизация core_hierarchy_horizonitem¶
Метод: _sync_horizon_fields(level: Level, table_name: str)
Когда вызывается: Только для календарных уровней (level.is_calendar = True)
Что синхронизируется:
- num — порядковый номер периода
- start_date — дата начала
- end_date — дата окончания
Реализация:
def _sync_horizon_fields(self, level: Level, table_name: str) -> Dict[str, int]:
"""Массово обновляет календарные поля в core_hierarchy_horizonitem."""
stats = {'horizons_updated': 0}
if not level.is_calendar:
return stats
horizon_table = HorizonItem._meta.db_table
upsert_sql = f"""
INSERT INTO {horizon_table} (item_ptr_id, num, start_date, end_date, period)
SELECT
dim.id as item_ptr_id,
dim.num,
dim.start_date,
dim.end_date,
NULL as period
FROM {table_name} dim
WHERE dim.update_horizon = TRUE
ON CONFLICT (item_ptr_id) DO UPDATE
SET
num = EXCLUDED.num,
start_date = EXCLUDED.start_date,
end_date = EXCLUDED.end_date
"""
with connection.cursor() as cursor:
cursor.execute(upsert_sql)
affected_rows = cursor.rowcount
stats['horizons_updated'] = affected_rows
# Сброс флага после синхронизации
execute_query(f"UPDATE {table_name} SET update_horizon = FALSE WHERE update_horizon = TRUE")
return stats
Ключевые моменты:
- Работает только с календарными уровнями
- Поле period остаётся NULL (не используется в проекте)
- Поддерживает установку календарных полей в NULL
Примеры использования¶
Пример 1: Базовый импорт и синхронизация¶
from planiqum.core.hierarchy.libs.direct import HierarchyImporter
import pandas as pd
# Импорт данных
importer = HierarchyImporter()
brand_df = pd.DataFrame([
['brand_a', 'Brand A'],
['brand_b', 'Brand B'],
], columns=['shortname', 'description'])
importer.import_from_dataframe(level=brand_level, df=brand_df)
# Синхронизация произошла автоматически!
# Проверяем результат
from planiqum.core.hierarchy.models import Item
items = Item.objects.filter(level=brand_level)
print(f"Синхронизировано элементов: {items.count()}")
Пример 2: Импорт с родителями¶
# Сначала импортируем родителей
brand_df = pd.DataFrame([['brand_a', 'Brand A']], columns=['shortname', 'description'])
importer.import_from_dataframe(level=brand_level, df=brand_df)
# Затем импортируем элементы с родителями
product_df = pd.DataFrame([
['prod1', 'Product 1', 'brand_a'],
], columns=['shortname', 'description', 'brand'])
importer.import_from_dataframe(level=product_level, df=product_df)
# Проверяем связи
from planiqum.core.hierarchy.models import Item
prod = Item.objects.get(shortname='prod1')
print(f"Родители: {list(prod.parents.values_list('shortname', flat=True))}")
# Результат: ['brand_a']
Пример 3: Импорт календарных данных¶
week_df = pd.DataFrame([
['2024-W01', 'Week 1', '2024-01-01', '2024-01-07', 1],
['2024-W02', 'Week 2', '2024-01-08', '2024-01-14', 2],
], columns=['shortname', 'description', 'start_date', 'end_date', 'num'])
importer.import_from_dataframe(level=week_level, df=week_df)
# Проверяем календарные поля
from planiqum.core.horizons.models import HorizonItem
weeks = HorizonItem.objects.filter(level=week_level)
for week in weeks:
print(f"{week.shortname}: {week.start_date} - {week.end_date}, num={week.num}")
Пример 4: Изменение родителей¶
# Первый импорт
product_df = pd.DataFrame([
['prod1', 'Product 1', 'brand_a']
], columns=['shortname', 'description', 'brand'])
importer.import_from_dataframe(level=product_level, df=product_df)
# Изменяем родителя
product_df = pd.DataFrame([
['prod1', 'Product 1', 'brand_b'] # Новый родитель!
], columns=['shortname', 'description', 'brand'])
importer.import_from_dataframe(level=product_level, df=product_df)
# Связь в core_hierarchy_item_parents обновлена автоматически
Флаги синхронизации¶
update_item¶
Когда устанавливается:
- При любом UPSERT в dim_*_direct (всегда TRUE)
Что инициирует:
- Синхронизацию id, shortname, description, is_active, level_id в core_hierarchy_item
Сброс:
- После успешной синхронизации в _sync_item_rows
update_parents¶
Когда устанавливается:
- При изменении любого поля родителя (dim_brand, dim_category, и т.д.)
Что инициирует:
- Синхронизацию связей в core_hierarchy_item_parents
- Обработку DELETE/UPDATE/INSERT для родительских связей
Сброс:
- После успешной синхронизации в _sync_item_parents
update_horizon¶
Когда устанавливается:
- При изменении start_date, end_date или num
- Только для календарных уровней
Что инициирует:
- Синхронизацию календарных полей в core_hierarchy_horizonitem
Сброс:
- После успешной синхронизации в _sync_horizon_fields
Асинхронное выполнение через Celery¶
Автоматический запуск после импорта¶
После успешного импорта в dim_*_direct таблицу синхронизация запускается автоматически и асинхронно:
# В HierarchyImporter._import_from_temp_table
if added_count + updated_count > 0:
self._schedule_item_sync(level) # Асинхронный запуск
Логирование¶
При асинхронном запуске:
INFO: brand: Синхронизация Item запланирована асинхронно
(временный механизм, task_id=2cfc8742-db43-4258-a2eb-8100fcd11f4d)
INFO: [Celery] Начало синхронизации Item для уровня 'brand' (level_id=123)
INFO: [Celery] Синхронизация Item для 'brand' завершена успешно:
создано 5, обновлено 3, связей 10, календарных 0
INFO: Task hierarchy.sync_item_from_dim_direct[2cfc8742...] succeeded in 0.044s
При fallback на синхронное выполнение:
WARNING: brand: Не удалось запустить асинхронную синхронизацию Item
(причина: [Errno 111] Connection refused). Выполняем синхронно.
INFO: brand: Синхронизация с Item завершена (синхронно):
создано 5, обновлено 3, связей 10, календарных 0
Мониторинг Celery-задач¶
Проверить статус задачи можно через Celery:
from celery.result import AsyncResult
task_id = "2cfc8742-db43-4258-a2eb-8100fcd11f4d"
result = AsyncResult(task_id)
print(f"Статус: {result.status}") # SUCCESS, PENDING, FAILURE
print(f"Результат: {result.result}") # {'level_key': 'brand', 'stats': {...}}
Тестирование асинхронной синхронизации¶
В тестовом окружении Django настроен параметр CELERY_TASK_ALWAYS_EAGER = True, который заставляет Celery-задачи выполняться синхронно, но весь код задачи работает реально.
Это позволяет тестам проверять фактические изменения в БД:
def test_import_syncs_to_item_table(self, product, brand):
"""Тест проверяет что после импорта элементы доступны в Item."""
importer = HierarchyImporter()
# Импорт запускает Celery-задачу, но она выполняется синхронно в тесте
importer.import_from_dataframe(level=brand, df=brand_df)
# Проверяем реальные изменения в БД
items = Item.objects.filter(level=brand)
assert items.count() == 2 # Задача выполнилась, данные синхронизированы
Производительность¶
Массовые операции¶
Все операции синхронизации выполняются массово:
- Один UPSERT для всех элементов с update_item = TRUE
- Один DELETE, один UPDATE, один INSERT для каждого родительского уровня
- Нет циклов на уровне Python
Асинхронное выполнение¶
Синхронизация Item выполняется в отдельном процессе Celery с низким приоритетом:
- Не блокирует основной процесс импорта
- Не мешает критичной синхронизации dim_*_full таблиц
- Автоматические повторы при временных ошибках
Частичные индексы¶
Для оптимизации выборки элементов для синхронизации используются частичные индексы:
CREATE INDEX idx_dim_product_direct_update_item
ON dim_product_direct(update_item)
WHERE update_item = TRUE;
Транзакционность¶
Вся синхронизация выполняется в одной транзакции (@transaction.atomic).
Замечания по миграции¶
⚠️ Этот модуль является временным.
План отказа от ItemBridge:
1. Перенести все операции на прямую работу с dim_*_direct и dim_*_full
2. Удалить все вызовы sync_items_from_dim_direct
3. Удалить поля update_item, update_parents, update_horizon из dim_*_direct
4. Удалить класс ItemBridge и модуль item_bridge.py
5. Удалить модель Item и связанные таблицы
См. также¶
- Direct Pipeline: Руководство для разработчиков
- Архитектура dim_*_direct
- Импорт данных
- Синхронизация dim_*_full
Важно: описанные настройки и сценарии могут отличаться в вашей инсталляции Planiqum
За уточнениями и методологической поддержкой обращайтесь в компанию
ЮНИК СОФТ