Direct Pipeline: Механизм удаления записей¶
Введение ¶
Механизм удаления в Direct Pipeline обеспечивает безопасное каскадное удаление элементов иерархии из dim_*_direct и dim_*_full таблиц с автоматическим обнулением всех ссылок у потомков.
Ключевые особенности:¶
- ✅ Флаговый подход — пометка записей через
is_deleted = TRUEвместо передачи списков ID - ✅ Каскадное обнуление — автоматическое обнуление ссылок у всех потомков
- ✅ Legacy навигация — использование обёрток
get_parents_with_self()иget_all_parents_with_self() - ✅ SQL оптимизация — подзапросы вместо передачи списков ID для работы с миллионами записей
- ✅ Независимая архитектура — отдельный класс
DimRecordCleaner, не привязанный к ItemBridge - ✅ Поддержка dim_*_full — каскадное обнуление во всех дочерних уровнях
Архитектура ¶
Компоненты¶
┌─────────────────────────────────────────────────────────────┐
│ DimRecordCleaner │
│ Независимый класс для удаления помеченных записей │
└─────────────────┬───────────────────────────────────────────┘
│
├─► 1. nullify_references_in_children()
│ Обнуляет ссылки у дочерних уровней
│ в dim_*_direct
│
├─► 2. delete_from_full_cascade()
│ Удаляет из dim_*_full + каскадное
│ обнуление у всех потомков
│
└─► 3. delete_from_direct()
Физически удаляет из dim_*_direct
Класс DimRecordCleaner¶
Файл: /src/planiqum/core/hierarchy/libs/direct/record_cleaner.py
Основной метод:
def delete_marked(self, level: Level) -> Dict[str, Any]:
"""
Полное удаление помеченных записей (dim_*_direct + dim_*_full).
Args:
level: Уровень иерархии
Returns:
dict: Статистика удаления
- children_nullified: обнулено ссылок у потомков в direct
- affected_levels: список затронутых уровней
- deleted_from_full: удалено записей из full
- nullified_in_full: обнулено ссылок в full (по уровням)
- deleted_from_direct: физически удалено из direct
"""
Этапы удаления ¶
Этап 0: Пометка на удаление¶
Кто выполняет: Внешний код (не DimRecordCleaner)
Пример через ItemBridge:
from planiqum.core.hierarchy.libs.direct import ItemBridge
bridge = ItemBridge()
stats = bridge.mark_item_as_deleted(item)
# Устанавливает is_deleted = TRUE в dim_*_direct
Прямой SQL:
UPDATE dim_brand_direct
SET is_deleted = TRUE
WHERE id = 12345;
Важно: DimRecordCleaner работает только с уже помеченными записями!
Этап 1: Обнуление ссылок у дочерних уровней¶
Метод: nullify_references_in_children(level: Level)
Что делает:
1. Находит все дочерние уровни через legacy подход (см. раздел Legacy навигация)
2. Обнуляет ссылки на удаляемые элементы в dim_*_direct
3. Устанавливает флаг needs_rel_sync = TRUE для дальнейшей синхронизации
SQL запрос:
-- Для уровня product (дочернего по отношению к brand)
UPDATE dim_product_direct
SET dim_brand = NULL,
needs_rel_sync = TRUE
WHERE dim_brand IN (
SELECT id FROM dim_brand_direct
WHERE is_deleted = TRUE
);
Пример:
# Структура: brand → product → dpu
# Удаляем brand_a
cleaner = DimRecordCleaner()
stats = cleaner.delete_marked(level=brand)
# Результат:
# - В dim_product_direct у prod1 обнулится dim_brand
# - У prod1 установится needs_rel_sync = TRUE
Этап 2: Удаление из dim_*_full + каскадное обнуление¶
Метод: delete_from_full_cascade(level: Level)
Что делает:
- Удаляет записи из
dim_{level}_full:
DELETE FROM dim_brand
WHERE dim_brand IN (
SELECT id FROM dim_brand_direct
WHERE is_deleted = TRUE
);
-
Находит все уровни-потомки через legacy подход (см. раздел Legacy навигация)
-
Обнуляет ссылки в dim_*_full для каждого потомка:
-- Для уровня product (потомка brand)
UPDATE dim_product
SET dim_brand = NULL
WHERE dim_brand IN (
SELECT id FROM dim_brand_direct
WHERE is_deleted = TRUE
);
-- Для уровня dpu (потомка потомка brand через product)
UPDATE dim_dpu
SET dim_brand = NULL
WHERE dim_brand IN (
SELECT id FROM dim_brand_direct
WHERE is_deleted = TRUE
);
Пример:
# Структура: segment → brand → product → dpu
# Удаляем brand_a
cleaner = DimRecordCleaner()
stats = cleaner.delete_marked(level=brand)
# Результат:
# 1. Удалено из dim_brand (где dim_brand = brand_a.id)
# 2. Обнулено в dim_product (где dim_brand = brand_a.id)
# 3. Обнулено в dim_dpu (где dim_brand = brand_a.id)
# 4. segment НЕ затронут (brand не является его родителем)
Этап 3: Физическое удаление из dim_*_direct¶
Метод: delete_from_direct(level: Level)
Что делает: Физически удаляет помеченные записи
SQL запрос:
DELETE FROM dim_brand_direct
WHERE is_deleted = TRUE;
Важно: Выполняется после обнуления всех ссылок!
Legacy навигация по иерархии ¶
Проблема¶
В проекте используется legacy логика навигации по иерархии, которая противоположна стандартной MPTT семантике:
level.child(FK) — ссылка на ребёнка (не родителя!)- MPTT интерпретирует
childкак родителя из-заparent_attr="child" - MPTT методы возвращают противоположное тому, что ожидается
Подробнее: Legacy логика vs MPTT
Поиск дочерних уровней (прямые дети)¶
Задача: Найти все уровни, которые имеют level как родителя (прямого).
❌ Неправильно:
# НЕ работает: M2M parents не синхронизируется с FK child
child_levels = Level.objects.filter(parents=level)
# НЕ работает: related_name "mptt_parents" — обратная связь
child_levels = level.mptt_parents.all()
✅ Правильно (legacy подход):
# Перебираем все уровни и проверяем через get_parents_with_self()
from planiqum.core.hierarchy.models import Level as LevelModel
all_levels = LevelModel.objects.filter(type=level.type)
child_levels = []
for potential_child in all_levels:
parents = potential_child.get_parents_with_self()
if level in parents and potential_child.id != level.id:
child_levels.append(potential_child)
Пример:
# Структура: brand → product → dpu
brand = Level.objects.get(key='brand')
# Поиск детей brand:
# Проверяем каждый уровень: есть ли brand в его родителях?
# - product.get_parents_with_self() = [product, brand, category]
# ✅ brand в списке → product - дочерний
# - dpu.get_parents_with_self() = [dpu, product]
# ❌ brand НЕ в списке → dpu - не прямой дочерний
# Результат: [product]
Поиск всех потомков (прямые и непрямые)¶
Задача: Найти все уровни, которые имеют level как предка.
❌ Неправильно:
# Вернёт ПРЕДКОВ level, а не ПОТОМКОВ!
descendants = level.get_all_parents_with_self()
✅ Правильно (legacy подход):
# Перебираем все уровни и проверяем через get_all_parents_with_self()
from planiqum.core.hierarchy.models import Level as LevelModel
all_levels = LevelModel.objects.filter(type=level.type)
all_descendant_levels = []
for potential_descendant in all_levels:
if potential_descendant.id == level.id:
continue
# Проверяем, есть ли level в предках potential_descendant
ancestors = potential_descendant.get_all_parents_with_self()
if level in ancestors:
all_descendant_levels.append(potential_descendant)
Пример:
# Структура: segment → brand → product → dpu
brand = Level.objects.get(key='brand')
# Поиск потомков brand:
# - product.get_all_parents_with_self() = [product, brand, segment]
# ✅ brand в списке → product - потомок
# - dpu.get_all_parents_with_self() = [dpu, product, brand, segment]
# ✅ brand в списке → dpu - потомок
# - segment.get_all_parents_with_self() = [segment]
# ❌ brand НЕ в списке → segment - не потомок
# Результат: [product, dpu]
Почему нельзя использовать прямые ссылки?¶
- M2M
parentsне синхронизируется с FKchildавтоматически - MPTT методы инвертированы —
get_descendants()вернёт предков, а не потомков related_nameот FKchild(mptt_parents) — обратная связь, работает наоборот
Правило: Всегда используйте перебор + проверку через legacy методы.
SQL оптимизация ¶
Проблема с передачей списков ID¶
❌ Не работает с миллионами записей:
# Собираем ID помеченных элементов
deleted_ids = list(
select_to_df("SELECT id FROM dim_brand_direct WHERE is_deleted = TRUE")['id']
)
# deleted_ids = [1, 2, 3, ..., 1000000]
# Передаём в SQL (OUT OF MEMORY!)
cursor.execute("""
DELETE FROM dim_product_direct
WHERE dim_brand = ANY(%s)
""", [deleted_ids])
Проблемы: - Огромный список ID загружается в память Python - Преобразование в SQL массив занимает время - PostgreSQL получает огромный массив параметров
Решение: SQL подзапросы¶
✅ Работает с любым объёмом данных:
# НЕ загружаем ID в Python, работаем напрямую в SQL
cursor.execute("""
UPDATE dim_product_direct
SET dim_brand = NULL, needs_rel_sync = TRUE
WHERE dim_brand IN (
SELECT id FROM dim_brand_direct
WHERE is_deleted = TRUE
)
""")
Преимущества: - Данные остаются в PostgreSQL - Не загружаются в память Python - Оптимизатор PostgreSQL может выбрать лучший план выполнения - Работает с миллионами записей
Примеры подзапросов в DimRecordCleaner¶
Обнуление в dim_*_direct:
UPDATE dim_product_direct
SET dim_brand = NULL,
needs_rel_sync = TRUE
WHERE dim_brand IN (
SELECT id FROM dim_brand_direct
WHERE is_deleted = TRUE
);
Обнуление в dim_*_full:
UPDATE dim_product
SET dim_brand = NULL
WHERE dim_brand IN (
SELECT id FROM dim_brand_direct
WHERE is_deleted = TRUE
);
Удаление из dim_*_full:
DELETE FROM dim_brand
WHERE dim_brand IN (
SELECT id FROM dim_brand_direct
WHERE is_deleted = TRUE
);
Физическое удаление из dim_*_direct:
DELETE FROM dim_brand_direct
WHERE is_deleted = TRUE;
Использование ¶
Прямое использование DimRecordCleaner¶
from planiqum.core.hierarchy.libs.direct import DimRecordCleaner
from planiqum.core.hierarchy.models import Level
# 1. Получаем уровень
brand = Level.objects.get(key='brand')
# 2. Помечаем записи на удаление (любым способом)
from planiqum.core.libs.db import execute_query
execute_query("""
UPDATE dim_brand_direct
SET is_deleted = TRUE
WHERE shortname = 'brand_to_delete'
""")
# 3. Запускаем удаление
cleaner = DimRecordCleaner()
stats = cleaner.delete_marked(level=brand)
# 4. Проверяем статистику
print(f"Обнулено ссылок у потомков в direct: {stats['children_nullified']}")
print(f"Удалено из full: {stats['deleted_from_full']}")
print(f"Обнулено в full (по уровням): {stats['nullified_in_full']}")
print(f"Удалено из direct: {stats['deleted_from_direct']}")
print(f"Затронутые уровни: {stats['affected_levels']}")
Использование через ItemBridge¶
Для совместимости со старым механизмом Item:
from planiqum.core.hierarchy.libs.direct import ItemBridge
from planiqum.core.hierarchy.models import Item
# 1. Получаем Item
item = Item.objects.get(level__key='brand', shortname='brand_to_delete')
# 2. Полный цикл: пометка + удаление
bridge = ItemBridge()
stats = bridge.delete_item_cascade(item)
# ItemBridge:
# 1. Помечает элемент через mark_item_as_deleted()
# 2. Вызывает DimRecordCleaner.delete_marked()
# 3. Возвращает объединённую статистику
Пакетное удаление¶
from planiqum.core.hierarchy.libs.direct import DimRecordCleaner
from planiqum.core.libs.db import execute_query
# 1. Помечаем несколько записей сразу
execute_query("""
UPDATE dim_brand_direct
SET is_deleted = TRUE
WHERE shortname IN ('brand1', 'brand2', 'brand3')
""")
# 2. Удаляем все помеченные одним вызовом
cleaner = DimRecordCleaner()
stats = cleaner.delete_marked(level=brand)
Примеры сценариев ¶
Пример 1: Удаление элемента с прямыми потомками¶
Структура:
brand → product
Удаляем: brand_a
Что произойдёт:
- Пометка:
is_deleted = TRUEдля brand_a вdim_brand_direct - Обнуление в direct: У всех product с
dim_brand = brand_a.idобнулитсяdim_brand - Удаление из full: Удалится запись из
dim_brand(гдеdim_brand = brand_a.id) - Обнуление в full: У всех записей в
dim_productобнулитсяdim_brand - Физическое удаление: brand_a удалится из
dim_brand_direct
Код:
# Структура: brand → product
brand = Level.objects.get(key='brand')
# Помечаем brand_a на удаление
execute_query("""
UPDATE dim_brand_direct
SET is_deleted = TRUE
WHERE shortname = 'brand_a'
""")
# Удаляем
cleaner = DimRecordCleaner()
stats = cleaner.delete_marked(level=brand)
# Результат:
# stats['children_nullified'] = 2 (если было 2 product с brand_a)
# stats['deleted_from_full'] = 1 (запись brand_a из dim_brand)
# stats['nullified_in_full'] = {'product': 2} (2 записи в dim_product)
# stats['deleted_from_direct'] = 1 (brand_a из dim_brand_direct)
Пример 2: Удаление в многоуровневой иерархии¶
Структура:
segment → brand → product → dpu
Удаляем: brand_a
Что произойдёт:
- Обнуление в direct:
-
У product с
dim_brand = brand_a.idобнулитсяdim_brand -
Удаление/обнуление в full:
- Удалится запись из
dim_brand(гдеdim_brand = brand_a.id) - Обнулится
dim_brandвdim_product - Обнулится
dim_brandвdim_dpu(каскадно!) - segment НЕ затронут (brand не его родитель)
Код:
# Структура: segment → brand → product → dpu
brand = Level.objects.get(key='brand')
execute_query("""
UPDATE dim_brand_direct
SET is_deleted = TRUE
WHERE shortname = 'brand_a'
""")
cleaner = DimRecordCleaner()
stats = cleaner.delete_marked(level=brand)
# Результат:
# stats['children_nullified'] = 1 (product)
# stats['deleted_from_full'] = 1 (brand_a)
# stats['nullified_in_full'] = {'product': 1, 'dpu': 1}
# stats['affected_levels'] = ['product'] (только прямой потомок)
Пример 3: Множественное удаление¶
Структура:
brand → product
category → product
Удаляем: несколько brand сразу
Код:
brand = Level.objects.get(key='brand')
# Помечаем несколько brand на удаление
execute_query("""
UPDATE dim_brand_direct
SET is_deleted = TRUE
WHERE shortname IN ('brand_a', 'brand_b', 'brand_c')
""")
# Удаляем все помеченные одним вызовом
cleaner = DimRecordCleaner()
stats = cleaner.delete_marked(level=brand)
# DimRecordCleaner обработает ВСЕ помеченные brand за один проход:
# - Обнулит dim_brand у всех связанных product
# - Удалит все помеченные brand из dim_brand
# - Обнулит dim_brand в dim_product для всех затронутых записей
# - Физически удалит все помеченные brand из dim_brand_direct
Тесты ¶
Файл: /src/planiqum/core/hierarchy/tests/direct/test_cascade_deletion.py
Тестовые сценарии¶
1. Базовые тесты (TestCascadeDeletionBasic):
- test_mark_item_as_deleted_sets_flag — пометка элемента на удаление
- test_delete_with_direct_children — удаление с прямыми потомками
2. Сложная иерархия (TestCascadeDeletionComplexHierarchy):
- test_delete_in_nested_hierarchy — многоуровневая иерархия
- test_delete_with_multiple_children_levels — множественные дочерние уровни
3. Прямое использование DimRecordCleaner (TestDimRecordCleanerDirect):
- test_delete_marked_simple — простое удаление
- test_delete_with_cascade_nullify — каскадное обнуление
- test_delete_with_full_sync — удаление с синхронизацией dim_*_full
Запуск тестов¶
# Все тесты удаления
pytest src/planiqum/core/hierarchy/tests/direct/test_cascade_deletion.py -v
# Конкретный тест
pytest src/planiqum/core/hierarchy/tests/direct/test_cascade_deletion.py::TestDimRecordCleanerDirect::test_delete_marked_simple -v
# Все тесты direct pipeline
pytest src/planiqum/core/hierarchy/tests/direct/ -v
Производительность ¶
Бенчмарки¶
Удаление 1 млн. записей:
| Метод | Время | Память |
|---|---|---|
| Списки ID | ❌ OUT OF MEMORY | - |
| SQL подзапросы | ✅ ~30 сек | Минимальная |
Детали: - Пометка 1M записей: ~5 сек - Обнуление ссылок: ~10 сек - Удаление из full: ~5 сек - Физическое удаление: ~10 сек
Рекомендации¶
Для массового удаления:
1. Помечайте записи батчами по 10K-100K
2. Запускайте delete_marked() после каждого батча
3. Используйте асинхронные задачи Celery для больших объёмов
Для единичного удаления:
- Используйте ItemBridge.delete_item_cascade() для совместимости
- Или прямо DimRecordCleaner.delete_marked() для производительности
Интеграция с сигналами (TODO) ¶
Планируется:
from django.db.models.signals import pre_delete
from planiqum.core.hierarchy.models import Item
from planiqum.core.hierarchy.libs.direct import ItemBridge
@receiver(pre_delete, sender=Item)
def delete_item_from_dim_direct(sender, instance, **kwargs):
"""Автоматическое удаление из dim_*_direct при удалении Item."""
bridge = ItemBridge()
bridge.delete_item_cascade(instance)
См. также ¶
- Direct Pipeline: Архитектура
- Direct Pipeline: Импорт
- Direct Pipeline: Синхронизация
- Legacy логика vs MPTT — подробно о навигации по иерархии
- ItemBridge — временный мост между Item и dim_*_direct
Реализация: /src/planiqum/core/hierarchy/libs/direct/record_cleaner.py
Тесты: /src/planiqum/core/hierarchy/tests/direct/test_cascade_deletion.py
Дата создания: 2025-11-20
Статус: ✅ Реализовано и протестировано
Важно: описанные настройки и сценарии могут отличаться в вашей инсталляции Planiqum
За уточнениями и методологической поддержкой обращайтесь в компанию
ЮНИК СОФТ