- etlclasses.py - датаклассы необходимые для работы ETL модуля
- etldecorator.py - декораторы coroutine, backoff и функция some_sleep
- etlsettings.py - загрузка и валидация настроек с помощью pydantic
- etlpostgres.py - работа с базой postgress
- etlredis.py - работа с Redis
- esindex.py - индекс для elastic
- etlelastic.py - работа с elastic
- etlproducer.py - Выборка изменишихся фильмов из postgress и загрузка id в Redis
- etlconsumer.py - Загрузка данных в elastic
- stop.py - остановка процессов etlproducer и etlconsumer
Ограничение пачки данных задоется через конфиг (ETL_SIZE_LIMIT).
-
Первый скрипт (etproducer.py) выбирает последовательно из всех таблиц новые или изменившиеся записи, на основании этих записей формируем ID изменишихся фильмов.
-
Отдельная выбираем изменишиеся фильмы (основная таблица с фильмами).
-
Если изменения в таблице участников фильмов, то выбираем все связанные фильмы.
-
Если изменения в таблице жанров, выбираем все связанные фильмы.
-
Если изменения в таблице типыфильмов, выбираем все связанные фильмы. В результате работы первого этапа, в Redis обновляется список фильмов которые надо обновить (или загрузить с нуля) в elastic.
-
Далее (etlconsumer.py) из базы postgres, выбираем все изменишиеся фильмы с теми данными которые нам необходимы для загрузки в ES, потом загружаем их пачкой в elastic.
Принцип хранения состояний в Redis (через конфиг вводим префикс для всех ключей REDIS_PREFIX, и добавляем его ко всем ключам). В Redis по каждой из таблиц, записываем время последней выбранной записи. Таблица ожидающих фильмов ораганизована как очередь в Redis.