Данный проект связан с финтех-стартапом, который предлагает международные банковские услуги через приложение. Через приложение стартапа пользователи могут безопасно переводить деньги в разные страны. Компания ведёт учёт транзакционной активности клиентов внутри и между странами: разработан единый протокол передачи данных, который обеспечивает одинаковую структуру таблиц во всех странах. В стартапе реализованы три варианта размещения данных:
- в S3;
- в PostgreSQL;
- через Spark Streaming в Kafka.
Команда аналитиков просит собрать данные по транзакционной активности пользователей и настроить обновление таблицы с курсом валют. Цель — понять, как выглядит динамика оборота всей компании и что приводит к его изменениям. Для достижения цели требуется выбрать, какую инфраструктуру использовать. В зависимости от выбора инфраструктуры требуется решить, как выполнить каждый шаг проекта.
Анализ проекта¶
Предварительный анализ данных проекта¶
Таблица public.transactions¶
Данные public.transactions содержат в себе информацию о движении денежных средств между клиентами в разных валютах.
Структура данных:
operation_id— id транзакции;account_number_from— внутренний бухгалтерский номер счёта транзакции ОТ КОГО;account_number_to— внутренний бухгалтерский номер счёта транзакции К КОМУ;currency_code— трёхзначный код валюты страны, из которой идёт транзакция;country— страна-источник транзакции;status— статус проведения транзакции: queued («транзакция в очереди на обработку сервисом»), in_progress («транзакция в обработке»), blocked («транзакция заблокирована сервисом»), done («транзакция выполнена успешно»), chargeback («пользователь осуществил возврат по транзакции»).transaction_type— тип транзакции во внутреннем учёте:authorisation(«авторизационная транзакция, подтверждающая наличие счёта пользователя»),sbp_incoming(«входящий перевод по системе быстрых платежей»),sbp_outgoing(«исходящий перевод по системе быстрых платежей»),transfer_incoming(«входящий перевод по счёту»),transfer_outgoing(«исходящий перевод по счёту»),c2b_partner_incoming(«перевод от юридического лица»),c2b_partner_outgoing(«перевод юридическому лицу»).amount— целочисленная сумма транзакции в минимальной единице валюты страны (копейка, цент, куруш);transaction_dt— дата и время исполнения транзакции до миллисекунд.
Пример данных в таблице при их проверке селектом:
SELECT *
FROM public.transactions
LIMIT 10;
Рзультат:
| operation_id | account_number_from | account_number_to | currency_code | country | status | transaction_type | amount | transaction_dt |
|---|---|---|---|---|---|---|---|---|
| 7f9922e8-48b8-4097-9ff6-3eab131e80d9 | 914810 | 4454155 | 430 | russia | queued | sbp_incoming | 230900 | 2022-10-01 00:00:02.000 |
| e8793d0b-a562-4bcf-8c04-155792f3ec93 | 914810 | 5371058 | 430 | russia | queued | sbp_incoming | 62000 | 2022-10-01 00:00:02.000 |
| 2e5f9d2e-2798-4527-a416-69b2f87c7289 | 914810 | 6633598 | 470 | turkey | queued | sbp_incoming | 30000 | 2022-10-01 00:00:04.000 |
| b83232d1-b194-48e9-b361-ec7cc7326ce5 | 903810 | 1027227 | 430 | russia | queued | c2b_partner_incoming | 100000 | 2022-10-01 00:00:06.000 |
| cb5ba138-55bc-415b-81e2-ee8c5b89f444 | 903810 | 3934590 | 470 | turkey | queued | sbp_incoming | 75000 | 2022-10-01 00:00:06.000 |
| 97a3ac98-1330-40fa-b074-c85fcaf3c201 | 914810 | 975167 | 460 | england | queued | sbp_incoming | 110000 | 2022-10-01 00:00:09.000 |
| 3a89a9d7-6b3a-4e38-b6ab-8eef7f120be3 | 914810 | 2516410 | 450 | china | queued | authorization | -5100 | 2022-10-01 00:00:10.000 |
| f116380a-4d34-4f8a-af66-5da7512314c9 | 903810 | 8115531 | 430 | russia | queued | sbp_incoming | 20000 | 2022-10-01 00:00:11.000 |
| e8793d0b-a562-4bcf-8c04-155792f3ec93 | 914810 | 5371058 | 430 | russia | in_progress | sbp_incoming | 62000 | 2022-10-01 00:00:14.000 |
| 7f9922e8-48b8-4097-9ff6-3eab131e80d9 | 914810 | 4454155 | 430 | russia | in_progress | sbp_incoming | 230900 | 2022-10-01 00:00:14.000 |
Статистика данных в таблице:
SELECT 'operation_id' AS поле,
COUNT(operation_id) AS количество_значений,
NULL AS максимум,
NULL AS минимум,
NULL AS среднее,
NULL AS медиана,
SUM(CASE WHEN account_number_from IS NULL THEN 1 ELSE 0 END) AS количество_пропусков
FROM public.transactions
UNION ALL
SELECT 'account_number_from' AS поле,
COUNT(account_number_from) AS количество_значений,
MAX(account_number_from) AS максимум,
MIN(account_number_from) AS минимум,
AVG(account_number_from) AS среднее,
PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY account_number_from) AS медиана,
SUM(CASE WHEN account_number_from IS NULL THEN 1 ELSE 0 END) AS количество_пропусков
FROM public.transactions
UNION ALL
SELECT 'ccount_number_to' AS поле,
COUNT(account_number_to) AS количество_значений,
MAX(account_number_to) AS максимум,
MIN(account_number_to) AS минимум,
AVG(account_number_to) AS среднее,
PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY account_number_to) AS медиана,
SUM(CASE WHEN account_number_to IS NULL THEN 1 ELSE 0 END) AS количество_пропусков
FROM public.transactions
UNION ALL
SELECT 'currency_code' AS поле,
COUNT(currency_code) AS количество_значений,
MAX(currency_code) AS максимум,
MIN(currency_code) AS минимум,
AVG(currency_code) AS среднее,
PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY currency_code) AS медиана,
SUM(CASE WHEN currency_code IS NULL THEN 1 ELSE 0 END) AS количество_пропусков
FROM public.transactions
UNION ALL
SELECT 'currency_code' AS поле,
COUNT(currency_code) AS количество_значений,
MAX(currency_code) AS максимум,
MIN(currency_code) AS минимум,
AVG(currency_code) AS среднее,
PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY currency_code) AS медиана,
SUM(CASE WHEN currency_code IS NULL THEN 1 ELSE 0 END) AS количество_пропусков
FROM public.transactions
UNION ALL
SELECT 'country' AS поле,
COUNT(country) AS количество_значений,
NULL AS максимум,
NULL AS минимум,
NULL AS среднее,
NULL AS медиана,
SUM(CASE WHEN country IS NULL THEN 1 ELSE 0 END) AS количество_пропусков
FROM public.transactions
UNION ALL
SELECT 'status' AS поле,
COUNT(status) AS количество_значений,
NULL AS максимум,
NULL AS минимум,
NULL AS среднее,
NULL AS медиана,
SUM(CASE WHEN status IS NULL THEN 1 ELSE 0 END) AS количество_пропусков
FROM public.transactions
UNION ALL
SELECT 'transaction_type' AS поле,
COUNT(transaction_type) AS количество_значений,
NULL AS максимум,
NULL AS минимум,
NULL AS среднее,
NULL AS медиана,
SUM(CASE WHEN transaction_type IS NULL THEN 1 ELSE 0 END) AS количество_пропусков
FROM public.transactions
UNION ALL
SELECT 'amount' AS поле,
COUNT(amount) AS количество_значений,
MAX(amount) AS максимум,
MIN(amount) AS минимум,
AVG(amount) AS среднее,
PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY amount) AS медиана,
SUM(CASE WHEN amount IS NULL THEN 1 ELSE 0 END) AS количество_пропусков
FROM public.transactions
UNION ALL
SELECT 'transaction_dt' AS поле,
COUNT(transaction_dt) AS количество_значений,
NULL AS максимум,
NULL AS минимум,
NULL AS среднее,
NULL AS медиана,
SUM(CASE WHEN transaction_dt IS NULL THEN 1 ELSE 0 END) AS количество_пропусков
FROM public.transactions
;
Рзультат:
| поле | количество_значений | максимум | минимум | среднее | медиана | количество_пропусков |
|---|---|---|---|---|---|---|
| account_number_from | 1135493 | 9999994 | -1 | 2753337.329072922510 | 1143311.0 | 0 |
| ccount_number_to | 1135493 | 9999994 | 10 | 2971503.212936583493 | 1351209.0 | 0 |
| currency_code | 1135493 | 470 | 410 | 436.5410530932379152 | 430.0 | 0 |
| currency_code | 1135493 | 470 | 410 | 436.5410530932379152 | 430.0 | 0 |
| amount | 1135493 | 60000000 | -20000000 | 583295.815772532283 | 100000.0 | 0 |
| country | 1135493 | 0 | ||||
| status | 1135493 | 0 | ||||
| transaction_type | 1135493 | 0 | ||||
| transaction_dt | 1135493 | 0 | ||||
| operation_id | 1135493 | 0 |
Таблица public.сurrencies ¶
Данные public.сurrencies — это справочник, который содержит в себе информацию об обновлениях курсов валют и взаимоотношениях валютных пар друг с другом.
Структура данных:
date_update— дата обновления курса валют;currency_code— трёхзначный код валюты транзакции;currency_code_with— отношение другой валюты к валюте трёхзначного кода;currency_code_div— значение отношения единицы одной валюты к единице валюты транзакции.
Пример данных в таблице при их проверке селектом:
SELECT *
FROM public.currencies
LIMIT 10;
Рзультат:
| date_update | currency_code | currency_code_with | currency_with_div |
|---|---|---|---|
| 2022-10-01 00:00:00.000 | 430 | 470 | 0.920 |
| 2022-10-01 00:00:00.000 | 430 | 460 | 0.950 |
| 2022-10-01 00:00:00.000 | 430 | 450 | 1.080 |
| 2022-10-01 00:00:00.000 | 430 | 410 | 1.020 |
| 2022-10-01 00:00:00.000 | 430 | 420 | 1.050 |
| 2022-10-01 00:00:00.000 | 430 | 440 | 1.040 |
| 2022-10-01 00:00:00.000 | 470 | 430 | 0.910 |
| 2022-10-01 00:00:00.000 | 470 | 460 | 0.960 |
| 2022-10-01 00:00:00.000 | 470 | 450 | 1.090 |
| 2022-10-01 00:00:00.000 | 470 | 410 | 0.900 |
Статистика данных в таблице:
SELECT 'date_update' AS поле,
COUNT(date_update) AS количество_значений,
NULL AS максимум,
NULL AS минимум,
NULL AS среднее,
NULL AS медиана,
SUM(CASE WHEN date_update IS NULL THEN 1 ELSE 0 END) AS количество_пропусков
FROM public.currencies
UNION ALL
SELECT 'currency_code' AS поле,
COUNT(currency_code) AS количество_значений,
MAX(currency_code) AS максимум,
MIN(currency_code) AS минимум,
AVG(currency_code) AS среднее,
PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY currency_code) AS медиана,
SUM(CASE WHEN currency_code IS NULL THEN 1 ELSE 0 END) AS количество_пропусков
FROM public.currencies
UNION ALL
SELECT 'currency_code_with' AS поле,
COUNT(currency_code_with) AS количество_значений,
MAX(currency_code_with) AS максимум,
MIN(currency_code_with) AS минимум,
AVG(currency_code_with) AS среднее,
PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY currency_code_with) AS медиана,
SUM(CASE WHEN currency_code_with IS NULL THEN 1 ELSE 0 END) AS количество_пропусков
FROM public.currencies
UNION ALL
SELECT 'currency_with_div' AS поле,
COUNT(currency_with_div) AS количество_значений,
MAX(currency_with_div) AS максимум,
MIN(currency_with_div) AS минимум,
AVG(currency_with_div) AS среднее,
PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY currency_with_div) AS медиана,
SUM(CASE WHEN currency_with_div IS NULL THEN 1 ELSE 0 END) AS количество_пропусков
FROM public.currencies;
Рзультат:
| поле | количество_значений | максимум | минимум | среднее | медиана | количество_пропусков |
|---|---|---|---|---|---|---|
| date_update | 1680 | 0 | ||||
| currency_code | 1680 | 470 | 410 | 440.0000000000000000 | 440.0 | 0 |
| currency_code_with | 1680 | 470 | 410 | 440.0000000000000000 | 440.0 | 0 |
| currency_with_div | 1680 | 1.100 | 0.900 | 0.99923809523809523810 | 1.0 | 0 |
Выбор архитектуры решения¶
Выбор основного варианта источника данных¶
В дополнительной информации к проекту найдено следующее:
В компании запущена PostgreSQL для продакшена. Для построения инфраструктуры аналитики и поставки данных представлена отдельная, копия продовой БД. В таблицах `public.transaction` и `public.currencies` есть доступ на загрузку данных...
Вы можете запустить поставку данных в режиме реального времени в два топика Kafka. Инициируйте запуск топиков через curl-вызов... Данные будут поступать в виде JSON-объектов в топик, который вы указали... К вам будут поступать сообщения двух типов:
- транзакции: `object_type = TRANSACTION`;
- курсы валют: `object_type = CURRENCY`.
Если вы ошиблись или вам надо завести отправку сообщений заново. Удалите старые настройки... и добавьте новые настройки.
В результате анализа дополнительной информации по проекту принято решение об использовании PostgreSQL в качестве источника данных и Airflow для оркестрации ETL. Причины:
- Максимальная простота реализации (Ключевой фактор):
- Данные уже находятся в полностью структурированном виде в таблицах public.transactions и public.currencies. Не нужно парсить CSV-файлы из S3 или сложные JSON из Kafka.
- Инкрементальная выгрузка упрощается до предела. Вместо сложной логики работы с файлами в S3 вы можете использовать простой SQL-запрос с фильтром по дате (WHERE transaction_dt::date = ‘{{ ds }}’). Это надежнее, чем механизм отслеживания файлов.
- Для работы требуется только один инструмент оркестрации — Airflow. Не нужно изучать и настраивать Spark для работы с Kafka, что значительно сокращает объем работы и потенциальные точки отказа.
- Прямая и надежная интеграция:
- Airflow имеет встроенный, отлично протестированный PostgreSQLHook и PostgreSQLOperator. Интеграция надежна и хорошо документирована.
- Интеграция Airflow с Vertica также не представляет проблемы через стандартный коннектор или VerticaHook.
- Вы избегаете самого сложного и рискованного шага, упомянутого в уточнении: загрузки данных в Vertica через Spark.
- Идеальное соответствие задаче:
- Вы работаете с историческими данными, которые уже сохранены в реляционной модели. PostgreSQL идеально подходит для пакетной (batch) выгрузки таких данных.
- Вам не нужна сложность потоковой обработки Kafka для этой задачи.
- Производительность и безопасность:
- Для выгрузки данных за конкретный день можно создать эффективные индексы в PostgreSQL (например, на transaction_dt), что сделает выгрузку очень быстрой.
- Подключение защищено SSL-сертификатом, что соответствует требованиям к обработке финансовых данных.
Альтернативный вариант c S3¶
- Статус: Запасной вариант.
- Почему: Работа с готовой реляционной БД через SQL проще и надежнее, чем работа с сырыми файлами в S3. Необходимость скачивать и парсить CSV-файлы добавляет лишние шаги и потенциальные ошибки (например, проблемы с кодировкой, разделителями).
Альтернативный вариант c Kafka + Spark Streaming¶
- Статус: Наименее предпочтительный и избыточно сложный вариант.
- Почему:
- Данные не потоковые. Использование Kafka для имитации потока исторических данных — это создание искусственной сложности.
- Высокий порог входа. Требует глубоких знаний Spark Structured Streaming для чтения из Kafka, трансформации JSON и загрузки в Vertica.
- Подтверждение рисков: Прямое указание в уточнении, что этот путь «более сложный и рискованный», является решающим аргументом против него. Даже руководители проекта не рекомендуют его.
Сравнение вариантов источников данных¶
| Критерий | PostgreSQL + Airflow | S3 + Airflow | Kafka + Spark Streaming |
|---|---|---|---|
| Простота реализации | ✅ Высочайшая | ✅ Средняя | ❌ Очень низкая |
| Надежность пайплайна | ✅ Высочайшая | ✅ Средняя | ❌ Низкая (много компонентов) |
| Соответствие формату данных | ✅ Идеально (SQL) | ⚠️ (CSV-файлы) | ⚠️ (JSON-поток) |
| Инкрементальная загрузка | ✅ Очень простая (WHERE по дате) | ✅ Средняя (отслеживание файлов) | ❌ Сложная (водяные знаки в Spark) |
| Потребность в новых инструментах | ✅ Нет (только Airflow) | ✅ Нет (только Airflow) | ❌ Да (Spark Streaming) |
| Риски | ✅ Минимальные | ✅ Умеренные | ❌ Высокие |
| Рекомендация | ✅ ЛУЧШИЙ ВЫБОР | ⚠️ ЗАПАСНОЙ ВАРИАНТ | ❌ НЕ РЕКОМЕНДУЕТСЯ |
Окончательный выбор архитектуры решения¶
Учитывая дополнительную информацию о том, что данные уже аккуратно размещены в структурированных таблицах PostgreSQL с готовым доступом, выбор очевиден.
Реализация с использованием PostgreSQL и Airflow является безусловно самой простой, самой надежной и самой быстрой для реализации. Она позволяет сосредоточиться на основной логике ETL и построении витрины, а не на решении инфраструктурных проблем, связанных с обработкой файлов или потоков.
Решено полностью отказаться от варианта с Kafka как от избыточного и рискованного. Вариант с S3 оставлен как запасной, если вдруг возникнут неожиданные проблемы с доступом к PostgreSQL. Но основным и единственным решением является реализация на PostgreSQL + Airflow:
- Источник данных: PostgreSQL в Yandex Cloud
- Хранилище данных: Vertica
- Оркестрация: Apache Airflow
- BI-визуализация: Metabase
Примерная архитектура пайплайна на основе PostgreSQL + Airflow¶
- DAG в Airflow запускается ежедневно для обработки данных за предыдущий день
({{ yesterday_ds }}). - Задача 1 (
PostgreSQLOperator): Выполняет запрос на выгрузку данных о транзакциях за вчерашний день из public.transactions и сохраняет результат во временный файл или напрямую в память. - Задача 2 (
PostgreSQLOperator): Аналогично выгружает актуальные курсы валют из public.currencies (или выгружает сразу все, так как справочник небольшой). - Задача 3 (
PythonOperator/VerticaHook): Загружает выгруженные данные в соответствующие таблицы в слое*__STAGINGв Vertica. - Задача 4 (
VerticaOperator): Выполняет SQL-скрипт для инкрементального обновления витриныglobal_metricsв схеме*__DWH, джойня данные изstaging-таблиц.
Реализация проекта¶
Настройка окружения¶
1. Установка и запуск Docker-контейнера¶
docker run -d -p 8998:8998 -p 8280:8280 -p 15432:5432 \
-v $(pwd)/src/dags:/usr/local/airflow/dags \
-v $(pwd)/certs:/usr/local/airflow/certs \
--name=de-final-prj-local \
cr.yandex/crp1r8pht0n0gl25aug1/de-final-prj:latest
2. Настройка подключений в Airflow¶
- Откройте Airflow UI: http://localhost:8280/airflow/
- Перейдите в Admin -> Connections
- Добавьте два подключения: · postgres_cloud: Подключение к PostgreSQL в Yandex Cloud · vertica_conn: Подключение к Vertica
3. Создание таблиц в Vertica¶
Выполните SQL-скрипты из папки src/sql/ для создания таблиц в STAGING и DWH слоях.
4. Настройка Metabase¶
- Откройте Metabase: http://localhost:8998/
- Зарегистрируйте новую учетную запись
- Добавьте подключение к Vertica с параметрами из src/config/connections.py
- Создайте дашборд на основе витрины global_metrics
5. Запуск пайплайна¶
- В Airflow UI найдите DAG import_postgres_to_vertica
- Запустите его вручную или дождитесь автоматического запуска по расписанию
- После завершения запустите DAG update_global_metrics
Файлы проекта¶
Структура файлов проекта¶
project/
├── src/
│ ├── dags/ # DAG для Airflow
│ │ ├── 1_data_import.py
│ │ └── 2_datamart_update.py
│ ├── sql/ # SQL-скрипты
│ │ ├── create_staging_tables.sql
│ │ └── create_dwh_tables.sql
│ └── config/
│ └── connections.py # Настройки подключений
├── certs/ # SSL-сертификаты
│ └── CA.pem
├── docker-compose.yml # Конфигурация Docker
├── requirements.txt # Зависимости Python
└── README.md # Документация
Файл project/docker-compose.yml¶
yml
version: '3'
services:
airflow:
image: cr.yandex/crp1r8pht0n0gl25aug1/de-pg-cr-af:latest # из 5 спринта курса DE
container_name: de-final-airflow
ports:
- "8280:8280" # Airflow UI
- "8998:8998" # Metabase
- "15432:5432" # PostgreSQL
volumes:
- ./src/dags:/usr/local/airflow/dags # Монтирование папки с DAG
- ./certs:/usr/local/airflow/certs # Монтирование папки с сертификатами
environment:
- AIRFLOW__CORE__EXECUTOR=LocalExecutor
- AIRFLOW__CORE__LOAD_EXAMPLES=false
networks:
- de-network
networks:
de-network:
driver: bridge
# Инструкция по запуску:
# 1. Создайте папку certs и поместите туда сертификат CA.pem
# 2. Запустите контейнер: docker-compose up -d
# 3. Откройте Airflow UI: http://localhost:8280/airflow/
# 4. Добавьте подключения через UI Admin -> Connections
Файл project/requirements.txt¶
apache-airflow==2.5.1
apache-airflow-providers-postgres==5.0.0
apache-airflow-providers-vertica==3.1.0
pandas==1.5.2
vertica-python==1.1.1
psycopg2-binary==2.9.5
Файл project/src/config/connections.py¶
"""
Конфигурационный файл с настройками подключений к базам данных.
Настройки должны быть также добавлены в Airflow через UI:
Admin -> Connections -> Add new connection
Для PostgreSQL в Yandex Cloud требуется SSL-сертификат.
Скачайте сертификат по ссылке:
https://storage.yandexcloud.net/cloud-serts/CA.pem
"""
# Настройки подключения к PostgreSQL в Yandex Cloud
POSTGRES_CONNECTION = {
'conn_id': 'postgres_cloud',
'conn_type': 'postgres',
'host': 'rc1b-w5d285tmxa8jimyn.mdb.yandexcloud.net',
'port': 6432,
'login': 'student',
'password': 'de_student_112022',
'database': 'db1',
'extra': {
'sslmode': 'verify-full',
'sslrootcert': '/usr/local/airflow/certs/CA.pem' # Путь к сертификату в контейнере
}
}
# Настройки подключения к Vertica
VERTICA_CONNECTION = {
'conn_id': 'vertica_conn',
'conn_type': 'vertica',
'host': 'vertica_host', # Замените на хост вашего кластера Vertica
'port': 5433,
'login': 'dbadmin', # Замените на вашего пользователя Vertica
'password': 'password', # Замените на ваш пароль
'database': 'VMart' # Замените на имя вашей БД
}
# Настройки для подключения Metabase к Vertica
METABASE_CONNECTION = {
'host': 'vertica_host', # Замените на хост вашего кластера Vertica
'port': 5433,
'database': 'VMart', # Замените на имя вашей БД
'username': 'dbadmin', # Замените на вашего пользователя Vertica
'password': 'password', # Замените на ваш пароль
'schema': 'student__DWH' # Замените на вашу схему
}
Файл project/src/sql/create_staging_tables.sql¶
-- Создание таблицы transactions в STAGING-слое
-- Замените 'student' на ваш email от Практикума
CREATE TABLE IF NOT EXISTS student__STAGING.transactions (
operation_id VARCHAR(255),
account_number_from INTEGER,
account_number_to INTEGER,
currency_code INTEGER,
country VARCHAR(100),
status VARCHAR(50),
transaction_type VARCHAR(50),
amount INTEGER,
transaction_dt TIMESTAMP
)
ORDER BY transaction_dt, operation_id
SEGMENTED BY HASH(transaction_dt, operation_id) ALL NODES;
-- Создание таблицы currencies в STAGING-слое
CREATE TABLE IF NOT EXISTS student__STAGING.currencies (
date_update TIMESTAMP,
currency_code INTEGER,
currency_code_with INTEGER,
currency_with_div NUMERIC(10, 4)
)
ORDER BY date_update, currency_code
SEGMENTED BY HASH(date_update, currency_code) ALL NODES;
-- Создание проекций для оптимизации запросов
CREATE PROJECTION IF NOT EXISTS student__STAGING.transactions_proj (
operation_id,
account_number_from,
account_number_to,
currency_code,
country,
status,
transaction_type,
amount,
transaction_dt
) AS
SELECT
operation_id,
account_number_from,
account_number_to,
currency_code,
country,
status,
transaction_type,
amount,
transaction_dt
FROM student__STAGING.transactions
ORDER BY transaction_dt, currency_code
SEGMENTED BY HASH(transaction_dt, operation_id) ALL NODES;
CREATE PROJECTION IF NOT EXISTS student__STAGING.currencies_proj (
date_update,
currency_code,
currency_code_with,
currency_with_div
) AS
SELECT
date_update,
currency_code,
currency_code_with,
currency_with_div
FROM student__STAGING.currencies
ORDER BY date_update, currency_code
SEGMENTED BY HASH(date_update, currency_code) ALL NODES;
Файл project/src/sql/create_dwh_tables.sql¶
-- Создание витрины global_metrics в DWH-слое
-- Замените 'student' на ваш email от Практикума
CREATE TABLE IF NOT EXISTS student__DWH.global_metrics (
date_update DATE,
currency_from INTEGER,
amount_total NUMERIC(18, 2),
cnt_transactions INTEGER,
avg_transactions_per_account NUMERIC(10, 2),
cnt_accounts_make_transactions INTEGER
)
ORDER BY date_update, currency_from
SEGMENTED BY HASH(date_update, currency_from) ALL NODES;
-- Создание проекции для оптимизации запросов к витрине
CREATE PROJECTION IF NOT EXISTS student__DWH.global_metrics_proj (
date_update,
currency_from,
amount_total,
cnt_transactions,
avg_transactions_per_account,
cnt_accounts_make_transactions
) AS
SELECT
date_update,
currency_from,
amount_total,
cnt_transactions,
avg_transactions_per_account,
cnt_accounts_make_transactions
FROM student__DWH.global_metrics
ORDER BY date_update, currency_from
SEGMENTED BY HASH(date_update, currency_from) ALL NODES;
Файл project/src/dags/1_data_import.py¶
"""
DAG для ежедневной инкрементальной выгрузки данных из PostgreSQL в Vertica.
Использует макрос {{ ds }} для фильтрации данных по дате выполнения.
Требует предварительной настройки подключений в Airflow:
1. postgres_cloud - подключение к PostgreSQL в Yandex Cloud
2. vertica_conn - подключение к Vertica
"""
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.vertica.hooks.vertica import VerticaHook
from datetime import datetime, timedelta
import logging
default_args = {
'owner': 'student',
'depends_on_past': False,
'start_date': datetime(2022, 10, 1),
'retries': 2,
'retry_delay': timedelta(minutes=5),
}
def import_transactions_data(**kwargs):
"""Импорт данных о транзакциях за указанную дату"""
execution_date = kwargs['ds']
logging.info(f"Импорт данных за {execution_date}")
# Подключение к PostgreSQL
pg_hook = PostgresHook(postgres_conn_id='postgres_cloud')
conn_pg = pg_hook.get_conn()
# Подключение к Vertica
vertica_hook = VerticaHook(vertica_conn_id='vertica_conn')
conn_vert = vertica_hook.get_conn()
cursor_vert = conn_vert.cursor()
try:
# Выгрузка данных из PostgreSQL
query = f"""
SELECT operation_id, account_number_from, account_number_to,
currency_code, country, status, transaction_type,
amount, transaction_dt
FROM public.transactions
WHERE DATE(transaction_dt) = '{execution_date}'
"""
df = pg_hook.get_pandas_df(query)
# Вставка данных в Vertica
if not df.empty:
# Подготовка данных для вставки
values = [tuple(x) for x in df.to_numpy()]
insert_query = """
INSERT INTO student__STAGING.transactions
(operation_id, account_number_from, account_number_to,
currency_code, country, status, transaction_type,
amount, transaction_dt)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
cursor_vert.executemany(insert_query, values)
conn_vert.commit()
logging.info(f"Импортировано {len(df)} записей транзакций")
else:
logging.info("Нет данных для импорта")
except Exception as e:
logging.error(f"Ошибка при импорте данных: {str(e)}")
conn_vert.rollback()
raise e
finally:
cursor_vert.close()
conn_pg.close()
conn_vert.close()
def import_currencies_data(**kwargs):
"""Импорт данных о курсах валют за указанную дату"""
execution_date = kwargs['ds']
logging.info(f"Импорт курсов валют за {execution_date}")
# Подключение к PostgreSQL
pg_hook = PostgresHook(postgres_conn_id='postgres_cloud')
conn_pg = pg_hook.get_conn()
# Подключение к Vertica
vertica_hook = VerticaHook(vertica_conn_id='vertica_conn')
conn_vert = vertica_hook.get_conn()
cursor_vert = conn_vert.cursor()
try:
# Выгрузка данных из PostgreSQL
query = f"""
SELECT date_update, currency_code, currency_code_with, currency_with_div
FROM public.currencies
WHERE DATE(date_update) = '{execution_date}'
"""
df = pg_hook.get_pandas_df(query)
# Вставка данных в Vertica
if not df.empty:
# Подготовка данных для вставки
values = [tuple(x) for x in df.to_numpy()]
insert_query = """
INSERT INTO student__STAGING.currencies
(date_update, currency_code, currency_code_with, currency_with_div)
VALUES (%s, %s, %s, %s)
"""
cursor_vert.executemany(insert_query, values)
conn_vert.commit()
logging.info(f"Импортировано {len(df)} записей курсов валют")
else:
logging.info("Нет данных о курсах для импорта")
except Exception as e:
logging.error(f"Ошибка при импорте курсов валют: {str(e)}")
conn_vert.rollback()
raise e
finally:
cursor_vert.close()
conn_pg.close()
conn_vert.close()
with DAG(
'import_postgres_to_vertica',
default_args=default_args,
description='Ежедневная выгрузка данных из PostgreSQL в Vertica',
schedule_interval='0 6 * * *', # Ежедневно в 06:00
catchup=True,
max_active_runs=1,
) as dag:
import_transactions = PythonOperator(
task_id='import_transactions_data',
python_callable=import_transactions_data,
provide_context=True,
)
import_currencies = PythonOperator(
task_id='import_currencies_data',
python_callable=import_currencies_data,
provide_context=True,
)
import_transactions >> import_currencies
Файл project/src/dags/2_datamart_update.py¶
"""
DAG для ежедневного обновления витрины данных в Vertica.
Включает очистку от тестовых аккаунтов и расчет агрегатов.
Требует предварительной настройки подключения к Vertica в Airflow.
"""
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.vertica.hooks.vertica import VerticaHook
from datetime import datetime, timedelta
import logging
default_args = {
'owner': 'student',
'depends_on_past': False,
'start_date': datetime(2022, 10, 2),
'retries': 2,
'retry_delay': timedelta(minutes=5),
}
def update_global_metrics(**kwargs):
"""Обновление витрины global_metrics данными за предыдущий день"""
execution_date = kwargs['ds']
prev_date = (datetime.strptime(execution_date, '%Y-%m-%d') - timedelta(days=1)).strftime('%Y-%m-%d')
logging.info(f"Обновление витрины за {prev_date}")
# Подключение к Vertica
vertica_hook = VerticaHook(vertica_conn_id='vertica_conn')
conn_vert = vertica_hook.get_conn()
cursor_vert = conn_vert.cursor()
try:
# SQL-запрос для обновления витрины
update_query = f"""
INSERT INTO student__DWH.global_metrics
SELECT
DATE(t.transaction_dt) as date_update,
t.currency_code as currency_from,
SUM(t.amount * c.currency_with_div) as amount_total,
COUNT(t.operation_id) as cnt_transactions,
COUNT(t.operation_id) / COUNT(DISTINCT t.account_number_from) as avg_transactions_per_account,
COUNT(DISTINCT t.account_number_from) as cnt_accounts_make_transactions
FROM student__STAGING.transactions t
JOIN student__STAGING.currencies c ON
t.currency_code = c.currency_code AND
DATE(t.transaction_dt) = DATE(c.date_update) AND
c.currency_code_with = 840 -- USD код
WHERE
DATE(t.transaction_dt) = '{prev_date}' AND
t.status = 'done' AND
t.account_number_from >= 0 -- Исключаем тестовые аккаунты
GROUP BY
DATE(t.transaction_dt),
t.currency_code
"""
cursor_vert.execute(update_query)
conn_vert.commit()
logging.info("Витрина global_metrics успешно обновлена")
except Exception as e:
logging.error(f"Ошибка при обновлении витрины: {str(e)}")
conn_vert.rollback()
raise e
finally:
cursor_vert.close()
conn_vert.close()
with DAG(
'update_global_metrics',
default_args=default_args,
description='Ежедневное обновление витрины global_metrics',
schedule_interval='0 7 * * *', # Ежедневно в 07:00 (после загрузки данных)
catchup=True,
max_active_runs=1,
) as dag:
update_metrics = PythonOperator(
task_id='update_global_metrics',
python_callable=update_global_metrics,
provide_context=True,
)
