Перейти к содержанию

Direct Pipeline: Механизм удаления записей

Введение

Механизм удаления в Direct Pipeline обеспечивает безопасное каскадное удаление элементов иерархии из dim_*_direct и dim_*_full таблиц с автоматическим обнулением всех ссылок у потомков.

Ключевые особенности:

  • Флаговый подход — пометка записей через is_deleted = TRUE вместо передачи списков ID
  • Каскадное обнуление — автоматическое обнуление ссылок у всех потомков
  • Legacy навигация — использование обёрток get_parents_with_self() и get_all_parents_with_self()
  • SQL оптимизация — подзапросы вместо передачи списков ID для работы с миллионами записей
  • Независимая архитектура — отдельный класс DimRecordCleaner, не привязанный к ItemBridge
  • Поддержка dim_*_full — каскадное обнуление во всех дочерних уровнях

Архитектура

Компоненты

┌─────────────────────────────────────────────────────────────┐
│                      DimRecordCleaner                       │
│  Независимый класс для удаления помеченных записей          │
└─────────────────┬───────────────────────────────────────────┘
                  │
                  ├─► 1. nullify_references_in_children()
                  │    Обнуляет ссылки у дочерних уровней
                  │    в dim_*_direct
                  │
                  ├─► 2. delete_from_full_cascade()
                  │    Удаляет из dim_*_full + каскадное
                  │    обнуление у всех потомков
                  │
                  └─► 3. delete_from_direct()
                       Физически удаляет из dim_*_direct

Класс DimRecordCleaner

Файл: /src/planiqum/core/hierarchy/libs/direct/record_cleaner.py

Основной метод:

def delete_marked(self, level: Level) -> Dict[str, Any]:
    """
    Полное удаление помеченных записей (dim_*_direct + dim_*_full).

    Args:
        level: Уровень иерархии

    Returns:
        dict: Статистика удаления
            - children_nullified: обнулено ссылок у потомков в direct
            - affected_levels: список затронутых уровней
            - deleted_from_full: удалено записей из full
            - nullified_in_full: обнулено ссылок в full (по уровням)
            - deleted_from_direct: физически удалено из direct
    """

Этапы удаления

Этап 0: Пометка на удаление

Кто выполняет: Внешний код (не DimRecordCleaner)

Пример через ItemBridge:

from planiqum.core.hierarchy.libs.direct import ItemBridge

bridge = ItemBridge()
stats = bridge.mark_item_as_deleted(item)
# Устанавливает is_deleted = TRUE в dim_*_direct

Прямой SQL:

UPDATE dim_brand_direct 
SET is_deleted = TRUE 
WHERE id = 12345;

Важно: DimRecordCleaner работает только с уже помеченными записями!

Этап 1: Обнуление ссылок у дочерних уровней

Метод: nullify_references_in_children(level: Level)

Что делает: 1. Находит все дочерние уровни через legacy подход (см. раздел Legacy навигация) 2. Обнуляет ссылки на удаляемые элементы в dim_*_direct 3. Устанавливает флаг needs_rel_sync = TRUE для дальнейшей синхронизации

SQL запрос:

-- Для уровня product (дочернего по отношению к brand)
UPDATE dim_product_direct
SET dim_brand = NULL,
    needs_rel_sync = TRUE
WHERE dim_brand IN (
    SELECT id FROM dim_brand_direct
    WHERE is_deleted = TRUE
);

Пример:

# Структура: brand → product → dpu
# Удаляем brand_a

cleaner = DimRecordCleaner()
stats = cleaner.delete_marked(level=brand)

# Результат:
# - В dim_product_direct у prod1 обнулится dim_brand
# - У prod1 установится needs_rel_sync = TRUE

Этап 2: Удаление из dim_*_full + каскадное обнуление

Метод: delete_from_full_cascade(level: Level)

Что делает:

  1. Удаляет записи из dim_{level}_full:
DELETE FROM dim_brand 
WHERE dim_brand IN (
    SELECT id FROM dim_brand_direct 
    WHERE is_deleted = TRUE
);
  1. Находит все уровни-потомки через legacy подход (см. раздел Legacy навигация)

  2. Обнуляет ссылки в dim_*_full для каждого потомка:

-- Для уровня product (потомка brand)
UPDATE dim_product 
SET dim_brand = NULL
WHERE dim_brand IN (
    SELECT id FROM dim_brand_direct 
    WHERE is_deleted = TRUE
);

-- Для уровня dpu (потомка потомка brand через product)
UPDATE dim_dpu 
SET dim_brand = NULL
WHERE dim_brand IN (
    SELECT id FROM dim_brand_direct 
    WHERE is_deleted = TRUE
);

Пример:

# Структура: segment → brand → product → dpu
# Удаляем brand_a

cleaner = DimRecordCleaner()
stats = cleaner.delete_marked(level=brand)

# Результат:
# 1. Удалено из dim_brand (где dim_brand = brand_a.id)
# 2. Обнулено в dim_product (где dim_brand = brand_a.id)
# 3. Обнулено в dim_dpu (где dim_brand = brand_a.id)
# 4. segment НЕ затронут (brand не является его родителем)

Этап 3: Физическое удаление из dim_*_direct

Метод: delete_from_direct(level: Level)

Что делает: Физически удаляет помеченные записи

SQL запрос:

DELETE FROM dim_brand_direct 
WHERE is_deleted = TRUE;

Важно: Выполняется после обнуления всех ссылок!


Legacy навигация по иерархии

Проблема

В проекте используется legacy логика навигации по иерархии, которая противоположна стандартной MPTT семантике:

  • level.child (FK) — ссылка на ребёнка (не родителя!)
  • MPTT интерпретирует child как родителя из-за parent_attr="child"
  • MPTT методы возвращают противоположное тому, что ожидается

Подробнее: Legacy логика vs MPTT

Поиск дочерних уровней (прямые дети)

Задача: Найти все уровни, которые имеют level как родителя (прямого).

❌ Неправильно:

# НЕ работает: M2M parents не синхронизируется с FK child
child_levels = Level.objects.filter(parents=level)

# НЕ работает: related_name "mptt_parents" — обратная связь
child_levels = level.mptt_parents.all()

✅ Правильно (legacy подход):

# Перебираем все уровни и проверяем через get_parents_with_self()
from planiqum.core.hierarchy.models import Level as LevelModel

all_levels = LevelModel.objects.filter(type=level.type)
child_levels = []
for potential_child in all_levels:
    parents = potential_child.get_parents_with_self()
    if level in parents and potential_child.id != level.id:
        child_levels.append(potential_child)

Пример:

# Структура: brand → product → dpu
brand = Level.objects.get(key='brand')

# Поиск детей brand:
# Проверяем каждый уровень: есть ли brand в его родителях?
# - product.get_parents_with_self() = [product, brand, category]
#   ✅ brand в списке → product - дочерний
# - dpu.get_parents_with_self() = [dpu, product]
#   ❌ brand НЕ в списке → dpu - не прямой дочерний
# Результат: [product]

Поиск всех потомков (прямые и непрямые)

Задача: Найти все уровни, которые имеют level как предка.

❌ Неправильно:

# Вернёт ПРЕДКОВ level, а не ПОТОМКОВ!
descendants = level.get_all_parents_with_self()

✅ Правильно (legacy подход):

# Перебираем все уровни и проверяем через get_all_parents_with_self()
from planiqum.core.hierarchy.models import Level as LevelModel

all_levels = LevelModel.objects.filter(type=level.type)
all_descendant_levels = []
for potential_descendant in all_levels:
    if potential_descendant.id == level.id:
        continue
    # Проверяем, есть ли level в предках potential_descendant
    ancestors = potential_descendant.get_all_parents_with_self()
    if level in ancestors:
        all_descendant_levels.append(potential_descendant)

Пример:

# Структура: segment → brand → product → dpu
brand = Level.objects.get(key='brand')

# Поиск потомков brand:
# - product.get_all_parents_with_self() = [product, brand, segment]
#   ✅ brand в списке → product - потомок
# - dpu.get_all_parents_with_self() = [dpu, product, brand, segment]
#   ✅ brand в списке → dpu - потомок
# - segment.get_all_parents_with_self() = [segment]
#   ❌ brand НЕ в списке → segment - не потомок
# Результат: [product, dpu]

Почему нельзя использовать прямые ссылки?

  1. M2M parents не синхронизируется с FK child автоматически
  2. MPTT методы инвертированыget_descendants() вернёт предков, а не потомков
  3. related_name от FK child (mptt_parents) — обратная связь, работает наоборот

Правило: Всегда используйте перебор + проверку через legacy методы.


SQL оптимизация

Проблема с передачей списков ID

❌ Не работает с миллионами записей:

# Собираем ID помеченных элементов
deleted_ids = list(
    select_to_df("SELECT id FROM dim_brand_direct WHERE is_deleted = TRUE")['id']
)
# deleted_ids = [1, 2, 3, ..., 1000000]

# Передаём в SQL (OUT OF MEMORY!)
cursor.execute("""
    DELETE FROM dim_product_direct
    WHERE dim_brand = ANY(%s)
""", [deleted_ids])

Проблемы: - Огромный список ID загружается в память Python - Преобразование в SQL массив занимает время - PostgreSQL получает огромный массив параметров

Решение: SQL подзапросы

✅ Работает с любым объёмом данных:

# НЕ загружаем ID в Python, работаем напрямую в SQL
cursor.execute("""
    UPDATE dim_product_direct
    SET dim_brand = NULL, needs_rel_sync = TRUE
    WHERE dim_brand IN (
        SELECT id FROM dim_brand_direct
        WHERE is_deleted = TRUE
    )
""")

Преимущества: - Данные остаются в PostgreSQL - Не загружаются в память Python - Оптимизатор PostgreSQL может выбрать лучший план выполнения - Работает с миллионами записей

Примеры подзапросов в DimRecordCleaner

Обнуление в dim_*_direct:

UPDATE dim_product_direct
SET dim_brand = NULL,
    needs_rel_sync = TRUE
WHERE dim_brand IN (
    SELECT id FROM dim_brand_direct
    WHERE is_deleted = TRUE
);

Обнуление в dim_*_full:

UPDATE dim_product
SET dim_brand = NULL
WHERE dim_brand IN (
    SELECT id FROM dim_brand_direct
    WHERE is_deleted = TRUE
);

Удаление из dim_*_full:

DELETE FROM dim_brand
WHERE dim_brand IN (
    SELECT id FROM dim_brand_direct
    WHERE is_deleted = TRUE
);

Физическое удаление из dim_*_direct:

DELETE FROM dim_brand_direct
WHERE is_deleted = TRUE;

Использование

Прямое использование DimRecordCleaner

from planiqum.core.hierarchy.libs.direct import DimRecordCleaner
from planiqum.core.hierarchy.models import Level

# 1. Получаем уровень
brand = Level.objects.get(key='brand')

# 2. Помечаем записи на удаление (любым способом)
from planiqum.core.libs.db import execute_query
execute_query("""
    UPDATE dim_brand_direct 
    SET is_deleted = TRUE 
    WHERE shortname = 'brand_to_delete'
""")

# 3. Запускаем удаление
cleaner = DimRecordCleaner()
stats = cleaner.delete_marked(level=brand)

# 4. Проверяем статистику
print(f"Обнулено ссылок у потомков в direct: {stats['children_nullified']}")
print(f"Удалено из full: {stats['deleted_from_full']}")
print(f"Обнулено в full (по уровням): {stats['nullified_in_full']}")
print(f"Удалено из direct: {stats['deleted_from_direct']}")
print(f"Затронутые уровни: {stats['affected_levels']}")

Использование через ItemBridge

Для совместимости со старым механизмом Item:

from planiqum.core.hierarchy.libs.direct import ItemBridge
from planiqum.core.hierarchy.models import Item

# 1. Получаем Item
item = Item.objects.get(level__key='brand', shortname='brand_to_delete')

# 2. Полный цикл: пометка + удаление
bridge = ItemBridge()
stats = bridge.delete_item_cascade(item)

# ItemBridge:
# 1. Помечает элемент через mark_item_as_deleted()
# 2. Вызывает DimRecordCleaner.delete_marked()
# 3. Возвращает объединённую статистику

Пакетное удаление

from planiqum.core.hierarchy.libs.direct import DimRecordCleaner
from planiqum.core.libs.db import execute_query

# 1. Помечаем несколько записей сразу
execute_query("""
    UPDATE dim_brand_direct 
    SET is_deleted = TRUE 
    WHERE shortname IN ('brand1', 'brand2', 'brand3')
""")

# 2. Удаляем все помеченные одним вызовом
cleaner = DimRecordCleaner()
stats = cleaner.delete_marked(level=brand)

Примеры сценариев

Пример 1: Удаление элемента с прямыми потомками

Структура:

brand → product

Удаляем: brand_a

Что произойдёт:

  1. Пометка: is_deleted = TRUE для brand_a в dim_brand_direct
  2. Обнуление в direct: У всех product с dim_brand = brand_a.id обнулится dim_brand
  3. Удаление из full: Удалится запись из dim_brand (где dim_brand = brand_a.id)
  4. Обнуление в full: У всех записей в dim_product обнулится dim_brand
  5. Физическое удаление: brand_a удалится из dim_brand_direct

Код:

# Структура: brand → product
brand = Level.objects.get(key='brand')

# Помечаем brand_a на удаление
execute_query("""
    UPDATE dim_brand_direct 
    SET is_deleted = TRUE 
    WHERE shortname = 'brand_a'
""")

# Удаляем
cleaner = DimRecordCleaner()
stats = cleaner.delete_marked(level=brand)

# Результат:
# stats['children_nullified'] = 2  (если было 2 product с brand_a)
# stats['deleted_from_full'] = 1   (запись brand_a из dim_brand)
# stats['nullified_in_full'] = {'product': 2}  (2 записи в dim_product)
# stats['deleted_from_direct'] = 1  (brand_a из dim_brand_direct)

Пример 2: Удаление в многоуровневой иерархии

Структура:

segment → brand → product → dpu

Удаляем: brand_a

Что произойдёт:

  1. Обнуление в direct:
  2. У product с dim_brand = brand_a.id обнулится dim_brand

  3. Удаление/обнуление в full:

  4. Удалится запись из dim_brand (где dim_brand = brand_a.id)
  5. Обнулится dim_brand в dim_product
  6. Обнулится dim_brand в dim_dpu (каскадно!)
  7. segment НЕ затронут (brand не его родитель)

Код:

# Структура: segment → brand → product → dpu
brand = Level.objects.get(key='brand')

execute_query("""
    UPDATE dim_brand_direct 
    SET is_deleted = TRUE 
    WHERE shortname = 'brand_a'
""")

cleaner = DimRecordCleaner()
stats = cleaner.delete_marked(level=brand)

# Результат:
# stats['children_nullified'] = 1  (product)
# stats['deleted_from_full'] = 1   (brand_a)
# stats['nullified_in_full'] = {'product': 1, 'dpu': 1}
# stats['affected_levels'] = ['product']  (только прямой потомок)

Пример 3: Множественное удаление

Структура:

brand → product
category → product

Удаляем: несколько brand сразу

Код:

brand = Level.objects.get(key='brand')

# Помечаем несколько brand на удаление
execute_query("""
    UPDATE dim_brand_direct 
    SET is_deleted = TRUE 
    WHERE shortname IN ('brand_a', 'brand_b', 'brand_c')
""")

# Удаляем все помеченные одним вызовом
cleaner = DimRecordCleaner()
stats = cleaner.delete_marked(level=brand)

# DimRecordCleaner обработает ВСЕ помеченные brand за один проход:
# - Обнулит dim_brand у всех связанных product
# - Удалит все помеченные brand из dim_brand
# - Обнулит dim_brand в dim_product для всех затронутых записей
# - Физически удалит все помеченные brand из dim_brand_direct

Тесты

Файл: /src/planiqum/core/hierarchy/tests/direct/test_cascade_deletion.py

Тестовые сценарии

1. Базовые тесты (TestCascadeDeletionBasic): - test_mark_item_as_deleted_sets_flag — пометка элемента на удаление - test_delete_with_direct_children — удаление с прямыми потомками

2. Сложная иерархия (TestCascadeDeletionComplexHierarchy): - test_delete_in_nested_hierarchy — многоуровневая иерархия - test_delete_with_multiple_children_levels — множественные дочерние уровни

3. Прямое использование DimRecordCleaner (TestDimRecordCleanerDirect): - test_delete_marked_simple — простое удаление - test_delete_with_cascade_nullify — каскадное обнуление - test_delete_with_full_sync — удаление с синхронизацией dim_*_full

Запуск тестов

# Все тесты удаления
pytest src/planiqum/core/hierarchy/tests/direct/test_cascade_deletion.py -v

# Конкретный тест
pytest src/planiqum/core/hierarchy/tests/direct/test_cascade_deletion.py::TestDimRecordCleanerDirect::test_delete_marked_simple -v

# Все тесты direct pipeline
pytest src/planiqum/core/hierarchy/tests/direct/ -v

Производительность

Бенчмарки

Удаление 1 млн. записей:

Метод Время Память
Списки ID ❌ OUT OF MEMORY -
SQL подзапросы ✅ ~30 сек Минимальная

Детали: - Пометка 1M записей: ~5 сек - Обнуление ссылок: ~10 сек - Удаление из full: ~5 сек - Физическое удаление: ~10 сек

Рекомендации

Для массового удаления: 1. Помечайте записи батчами по 10K-100K 2. Запускайте delete_marked() после каждого батча 3. Используйте асинхронные задачи Celery для больших объёмов

Для единичного удаления: - Используйте ItemBridge.delete_item_cascade() для совместимости - Или прямо DimRecordCleaner.delete_marked() для производительности


Интеграция с сигналами (TODO)

Планируется:

from django.db.models.signals import pre_delete
from planiqum.core.hierarchy.models import Item
from planiqum.core.hierarchy.libs.direct import ItemBridge

@receiver(pre_delete, sender=Item)
def delete_item_from_dim_direct(sender, instance, **kwargs):
    """Автоматическое удаление из dim_*_direct при удалении Item."""
    bridge = ItemBridge()
    bridge.delete_item_cascade(instance)

См. также


Реализация: /src/planiqum/core/hierarchy/libs/direct/record_cleaner.py

Тесты: /src/planiqum/core/hierarchy/tests/direct/test_cascade_deletion.py

Дата создания: 2025-11-20

Статус: ✅ Реализовано и протестировано


Важно: описанные настройки и сценарии могут отличаться в вашей инсталляции Planiqum
За уточнениями и методологической поддержкой обращайтесь в компанию ЮНИК СОФТ