Microsegment.ru
  • Главная страница
  • О проекте
  • Портфолио
  • Блог

Построение хранилища и аналитики данных для финтех-стартапа

Данный проект связан с финтех-стартапом, который предлагает международные банковские услуги через приложение. Через приложение стартапа пользователи могут безопасно переводить деньги в разные страны. Компания ведёт учёт транзакционной активности клиентов внутри и между странами: разработан единый протокол передачи данных, который обеспечивает одинаковую структуру таблиц во всех странах. В стартапе реализованы три варианта размещения данных:

  • в S3;
  • в PostgreSQL;
  • через Spark Streaming в Kafka.

Команда аналитиков просит собрать данные по транзакционной активности пользователей и настроить обновление таблицы с курсом валют. Цель — понять, как выглядит динамика оборота всей компании и что приводит к его изменениям. Для достижения цели требуется выбрать, какую инфраструктуру использовать. В зависимости от выбора инфраструктуры требуется решить, как выполнить каждый шаг проекта.

Содержание:¶

  • Анализ проекта
    • Предварительный анализ данных проекта
      • Таблица public.transactions
      • Таблица public.сurrencies
    • Выбор архитектуры решения
      • Выбор основного варианта источника данных
      • Альтернативный вариант c S3
      • Альтернативный вариант c Kafka + Spark Streaming
      • Сравнение вариантов источников данных
      • Окончательный выбор архитектуры решения
      • Примерная архитектура пайплайна на основе PostgreSQL + Airflow
  • Реализация проекта
    • Настройка окружения
      1. Установка и запуск Docker-контейнера
      2. Настройка подключений в Airflow
      3. Создание таблиц в Vertica
      4. Настройка Metabase
      5. Запуск пайплайна
    • Файлы проекта
      • Структура файлов проекта
      • Файл project/docker-compose.yml
      • Файл project/requirements.txt
      • Файл project/src/sql/create_staging_tables.sql
      • Файл project/src/sql/create_dwh_tables.sql
      • Файл project/src/dags/1_data_import.py
      • Файл project/src/dags/2_datamart_update.py
      • Файл project/src/config/connections.py

Анализ проекта¶

Предварительный анализ данных проекта¶

Таблица public.transactions¶

Данные public.transactions содержат в себе информацию о движении денежных средств между клиентами в разных валютах.

Структура данных:

  • operation_id — id транзакции;
  • account_number_from — внутренний бухгалтерский номер счёта транзакции ОТ КОГО;
  • account_number_to — внутренний бухгалтерский номер счёта транзакции К КОМУ;
  • currency_code — трёхзначный код валюты страны, из которой идёт транзакция;
  • country — страна-источник транзакции;
  • status — статус проведения транзакции: queued («транзакция в очереди на обработку сервисом»), in_progress («транзакция в обработке»), blocked («транзакция заблокирована сервисом»), done («транзакция выполнена успешно»), chargeback («пользователь осуществил возврат по транзакции»).
  • transaction_type — тип транзакции во внутреннем учёте: authorisation («авторизационная транзакция, подтверждающая наличие счёта пользователя»), sbp_incoming («входящий перевод по системе быстрых платежей»), sbp_outgoing («исходящий перевод по системе быстрых платежей»), transfer_incoming («входящий перевод по счёту»), transfer_outgoing («исходящий перевод по счёту»), c2b_partner_incoming («перевод от юридического лица»), c2b_partner_outgoing («перевод юридическому лицу»).
  • amount — целочисленная сумма транзакции в минимальной единице валюты страны (копейка, цент, куруш);
  • transaction_dt — дата и время исполнения транзакции до миллисекунд.

Пример данных в таблице при их проверке селектом:

SELECT *
FROM public.transactions
LIMIT 10;

Рзультат:

operation_id account_number_from account_number_to currency_code country status transaction_type amount transaction_dt
7f9922e8-48b8-4097-9ff6-3eab131e80d9 914810 4454155 430 russia queued sbp_incoming 230900 2022-10-01 00:00:02.000
e8793d0b-a562-4bcf-8c04-155792f3ec93 914810 5371058 430 russia queued sbp_incoming 62000 2022-10-01 00:00:02.000
2e5f9d2e-2798-4527-a416-69b2f87c7289 914810 6633598 470 turkey queued sbp_incoming 30000 2022-10-01 00:00:04.000
b83232d1-b194-48e9-b361-ec7cc7326ce5 903810 1027227 430 russia queued c2b_partner_incoming 100000 2022-10-01 00:00:06.000
cb5ba138-55bc-415b-81e2-ee8c5b89f444 903810 3934590 470 turkey queued sbp_incoming 75000 2022-10-01 00:00:06.000
97a3ac98-1330-40fa-b074-c85fcaf3c201 914810 975167 460 england queued sbp_incoming 110000 2022-10-01 00:00:09.000
3a89a9d7-6b3a-4e38-b6ab-8eef7f120be3 914810 2516410 450 china queued authorization -5100 2022-10-01 00:00:10.000
f116380a-4d34-4f8a-af66-5da7512314c9 903810 8115531 430 russia queued sbp_incoming 20000 2022-10-01 00:00:11.000
e8793d0b-a562-4bcf-8c04-155792f3ec93 914810 5371058 430 russia in_progress sbp_incoming 62000 2022-10-01 00:00:14.000
7f9922e8-48b8-4097-9ff6-3eab131e80d9 914810 4454155 430 russia in_progress sbp_incoming 230900 2022-10-01 00:00:14.000

Статистика данных в таблице:

SELECT 'operation_id' AS поле,
       COUNT(operation_id) AS количество_значений,
       NULL AS максимум,
       NULL AS минимум,
       NULL AS среднее,
       NULL AS медиана,
       SUM(CASE WHEN account_number_from IS NULL THEN 1 ELSE 0 END) AS количество_пропусков
FROM public.transactions
UNION ALL
SELECT 'account_number_from' AS поле,
       COUNT(account_number_from) AS количество_значений,
       MAX(account_number_from) AS максимум,
       MIN(account_number_from) AS минимум,
       AVG(account_number_from) AS среднее,
       PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY account_number_from) AS медиана,
       SUM(CASE WHEN account_number_from IS NULL THEN 1 ELSE 0 END) AS количество_пропусков
FROM public.transactions
UNION ALL
SELECT 'ccount_number_to' AS поле,
       COUNT(account_number_to) AS количество_значений,
       MAX(account_number_to) AS максимум,
       MIN(account_number_to) AS минимум,
       AVG(account_number_to) AS среднее,
       PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY account_number_to) AS медиана,
       SUM(CASE WHEN account_number_to IS NULL THEN 1 ELSE 0 END) AS количество_пропусков
FROM public.transactions
UNION ALL
SELECT 'currency_code' AS поле,
       COUNT(currency_code) AS количество_значений,
       MAX(currency_code) AS максимум,
       MIN(currency_code) AS минимум,
       AVG(currency_code) AS среднее,
       PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY currency_code) AS медиана,
       SUM(CASE WHEN currency_code IS NULL THEN 1 ELSE 0 END) AS количество_пропусков
FROM public.transactions
UNION ALL
SELECT 'currency_code' AS поле,
       COUNT(currency_code) AS количество_значений,
       MAX(currency_code) AS максимум,
       MIN(currency_code) AS минимум,
       AVG(currency_code) AS среднее,
       PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY currency_code) AS медиана,
       SUM(CASE WHEN currency_code IS NULL THEN 1 ELSE 0 END) AS количество_пропусков
FROM public.transactions
UNION ALL
SELECT 'country' AS поле,
       COUNT(country) AS количество_значений,
       NULL AS максимум,
       NULL AS минимум,
       NULL AS среднее,
       NULL AS медиана,
       SUM(CASE WHEN country IS NULL THEN 1 ELSE 0 END) AS количество_пропусков
FROM public.transactions
UNION ALL
SELECT 'status' AS поле,
       COUNT(status) AS количество_значений,
       NULL AS максимум,
       NULL AS минимум,
       NULL AS среднее,
       NULL AS медиана,
       SUM(CASE WHEN status IS NULL THEN 1 ELSE 0 END) AS количество_пропусков
FROM public.transactions
UNION ALL
SELECT 'transaction_type' AS поле,
       COUNT(transaction_type) AS количество_значений,
       NULL AS максимум,
       NULL AS минимум,
       NULL AS среднее,
       NULL AS медиана,
       SUM(CASE WHEN transaction_type IS NULL THEN 1 ELSE 0 END) AS количество_пропусков
FROM public.transactions
UNION ALL
SELECT 'amount' AS поле,
       COUNT(amount) AS количество_значений,
       MAX(amount) AS максимум,
       MIN(amount) AS минимум,
       AVG(amount) AS среднее,
       PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY amount) AS медиана,
       SUM(CASE WHEN amount IS NULL THEN 1 ELSE 0 END) AS количество_пропусков
FROM public.transactions
UNION ALL
SELECT 'transaction_dt' AS поле,
       COUNT(transaction_dt) AS количество_значений,
       NULL AS максимум,
       NULL AS минимум,
       NULL AS среднее,
       NULL AS медиана,
       SUM(CASE WHEN transaction_dt IS NULL THEN 1 ELSE 0 END) AS количество_пропусков
FROM public.transactions
;

Рзультат:

поле количество_значений максимум минимум среднее медиана количество_пропусков
account_number_from 1135493 9999994 -1 2753337.329072922510 1143311.0 0
ccount_number_to 1135493 9999994 10 2971503.212936583493 1351209.0 0
currency_code 1135493 470 410 436.5410530932379152 430.0 0
currency_code 1135493 470 410 436.5410530932379152 430.0 0
amount 1135493 60000000 -20000000 583295.815772532283 100000.0 0
country 1135493 0
status 1135493 0
transaction_type 1135493 0
transaction_dt 1135493 0
operation_id 1135493 0

Таблица public.сurrencies ¶

Данные public.сurrencies — это справочник, который содержит в себе информацию об обновлениях курсов валют и взаимоотношениях валютных пар друг с другом.

Структура данных:

  • date_update — дата обновления курса валют;
  • currency_code — трёхзначный код валюты транзакции;
  • currency_code_with — отношение другой валюты к валюте трёхзначного кода;
  • currency_code_div — значение отношения единицы одной валюты к единице валюты транзакции.

Пример данных в таблице при их проверке селектом:

SELECT *
FROM public.currencies
LIMIT 10;

Рзультат:

date_update currency_code currency_code_with currency_with_div
2022-10-01 00:00:00.000 430 470 0.920
2022-10-01 00:00:00.000 430 460 0.950
2022-10-01 00:00:00.000 430 450 1.080
2022-10-01 00:00:00.000 430 410 1.020
2022-10-01 00:00:00.000 430 420 1.050
2022-10-01 00:00:00.000 430 440 1.040
2022-10-01 00:00:00.000 470 430 0.910
2022-10-01 00:00:00.000 470 460 0.960
2022-10-01 00:00:00.000 470 450 1.090
2022-10-01 00:00:00.000 470 410 0.900

Статистика данных в таблице:

SELECT 'date_update' AS поле,
       COUNT(date_update) AS количество_значений,
       NULL AS максимум,
       NULL AS минимум,
       NULL AS среднее,
       NULL AS медиана,
       SUM(CASE WHEN date_update IS NULL THEN 1 ELSE 0 END) AS количество_пропусков
FROM public.currencies
UNION ALL
SELECT 'currency_code' AS поле,
       COUNT(currency_code) AS количество_значений,
       MAX(currency_code) AS максимум,
       MIN(currency_code) AS минимум,
       AVG(currency_code) AS среднее,
       PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY currency_code) AS медиана,
       SUM(CASE WHEN currency_code IS NULL THEN 1 ELSE 0 END) AS количество_пропусков
FROM public.currencies
UNION ALL
SELECT 'currency_code_with' AS поле,
       COUNT(currency_code_with) AS количество_значений,
       MAX(currency_code_with) AS максимум,
       MIN(currency_code_with) AS минимум,
       AVG(currency_code_with) AS среднее,
       PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY currency_code_with) AS медиана,
       SUM(CASE WHEN currency_code_with IS NULL THEN 1 ELSE 0 END) AS количество_пропусков
FROM public.currencies
UNION ALL
SELECT 'currency_with_div' AS поле,
       COUNT(currency_with_div) AS количество_значений,
       MAX(currency_with_div) AS максимум,
       MIN(currency_with_div) AS минимум,
       AVG(currency_with_div) AS среднее,
       PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY currency_with_div) AS медиана,
       SUM(CASE WHEN currency_with_div IS NULL THEN 1 ELSE 0 END) AS количество_пропусков
FROM public.currencies;

Рзультат:

поле количество_значений максимум минимум среднее медиана количество_пропусков
date_update 1680 0
currency_code 1680 470 410 440.0000000000000000 440.0 0
currency_code_with 1680 470 410 440.0000000000000000 440.0 0
currency_with_div 1680 1.100 0.900 0.99923809523809523810 1.0 0

Выбор архитектуры решения¶

Выбор основного варианта источника данных¶

В дополнительной информации к проекту найдено следующее:

В компании запущена PostgreSQL для продакшена. Для построения инфраструктуры аналитики и поставки данных представлена отдельная, копия продовой БД. В таблицах `public.transaction` и `public.currencies` есть доступ на загрузку данных... 

Вы можете запустить поставку данных в режиме реального времени в два топика Kafka. Инициируйте запуск топиков через curl-вызов... Данные будут поступать в виде JSON-объектов в топик, который вы указали... К вам будут поступать сообщения двух типов:
- транзакции: `object_type = TRANSACTION`;
- курсы валют: `object_type = CURRENCY`.

Если вы ошиблись или вам надо завести отправку сообщений заново. Удалите старые настройки... и добавьте новые настройки.

В результате анализа дополнительной информации по проекту принято решение об использовании PostgreSQL в качестве источника данных и Airflow для оркестрации ETL. Причины:

  1. Максимальная простота реализации (Ключевой фактор):
    • Данные уже находятся в полностью структурированном виде в таблицах public.transactions и public.currencies. Не нужно парсить CSV-файлы из S3 или сложные JSON из Kafka.
    • Инкрементальная выгрузка упрощается до предела. Вместо сложной логики работы с файлами в S3 вы можете использовать простой SQL-запрос с фильтром по дате (WHERE transaction_dt::date = ‘{{ ds }}’). Это надежнее, чем механизм отслеживания файлов.
    • Для работы требуется только один инструмент оркестрации — Airflow. Не нужно изучать и настраивать Spark для работы с Kafka, что значительно сокращает объем работы и потенциальные точки отказа.
  2. Прямая и надежная интеграция:
    • Airflow имеет встроенный, отлично протестированный PostgreSQLHook и PostgreSQLOperator. Интеграция надежна и хорошо документирована.
    • Интеграция Airflow с Vertica также не представляет проблемы через стандартный коннектор или VerticaHook.
    • Вы избегаете самого сложного и рискованного шага, упомянутого в уточнении: загрузки данных в Vertica через Spark.
  3. Идеальное соответствие задаче:
    • Вы работаете с историческими данными, которые уже сохранены в реляционной модели. PostgreSQL идеально подходит для пакетной (batch) выгрузки таких данных.
    • Вам не нужна сложность потоковой обработки Kafka для этой задачи.
  4. Производительность и безопасность:
    • Для выгрузки данных за конкретный день можно создать эффективные индексы в PostgreSQL (например, на transaction_dt), что сделает выгрузку очень быстрой.
    • Подключение защищено SSL-сертификатом, что соответствует требованиям к обработке финансовых данных.

Альтернативный вариант c S3¶

  • Статус: Запасной вариант.
  • Почему: Работа с готовой реляционной БД через SQL проще и надежнее, чем работа с сырыми файлами в S3. Необходимость скачивать и парсить CSV-файлы добавляет лишние шаги и потенциальные ошибки (например, проблемы с кодировкой, разделителями).

Альтернативный вариант c Kafka + Spark Streaming¶

  • Статус: Наименее предпочтительный и избыточно сложный вариант.
  • Почему:
    • Данные не потоковые. Использование Kafka для имитации потока исторических данных — это создание искусственной сложности.
    • Высокий порог входа. Требует глубоких знаний Spark Structured Streaming для чтения из Kafka, трансформации JSON и загрузки в Vertica.
    • Подтверждение рисков: Прямое указание в уточнении, что этот путь «более сложный и рискованный», является решающим аргументом против него. Даже руководители проекта не рекомендуют его.

Сравнение вариантов источников данных¶

Критерий PostgreSQL + Airflow S3 + Airflow Kafka + Spark Streaming
Простота реализации ✅ Высочайшая ✅ Средняя ❌ Очень низкая
Надежность пайплайна ✅ Высочайшая ✅ Средняя ❌ Низкая (много компонентов)
Соответствие формату данных ✅ Идеально (SQL) ⚠️ (CSV-файлы) ⚠️ (JSON-поток)
Инкрементальная загрузка ✅ Очень простая (WHERE по дате) ✅ Средняя (отслеживание файлов) ❌ Сложная (водяные знаки в Spark)
Потребность в новых инструментах ✅ Нет (только Airflow) ✅ Нет (только Airflow) ❌ Да (Spark Streaming)
Риски ✅ Минимальные ✅ Умеренные ❌ Высокие
Рекомендация ✅ ЛУЧШИЙ ВЫБОР ⚠️ ЗАПАСНОЙ ВАРИАНТ ❌ НЕ РЕКОМЕНДУЕТСЯ

Окончательный выбор архитектуры решения¶

Учитывая дополнительную информацию о том, что данные уже аккуратно размещены в структурированных таблицах PostgreSQL с готовым доступом, выбор очевиден.

Реализация с использованием PostgreSQL и Airflow является безусловно самой простой, самой надежной и самой быстрой для реализации. Она позволяет сосредоточиться на основной логике ETL и построении витрины, а не на решении инфраструктурных проблем, связанных с обработкой файлов или потоков.

Решено полностью отказаться от варианта с Kafka как от избыточного и рискованного. Вариант с S3 оставлен как запасной, если вдруг возникнут неожиданные проблемы с доступом к PostgreSQL. Но основным и единственным решением является реализация на PostgreSQL + Airflow:

  1. Источник данных: PostgreSQL в Yandex Cloud
  2. Хранилище данных: Vertica
  3. Оркестрация: Apache Airflow
  4. BI-визуализация: Metabase

Примерная архитектура пайплайна на основе PostgreSQL + Airflow¶

  1. DAG в Airflow запускается ежедневно для обработки данных за предыдущий день ({{ yesterday_ds }}).
  2. Задача 1 (PostgreSQLOperator): Выполняет запрос на выгрузку данных о транзакциях за вчерашний день из public.transactions и сохраняет результат во временный файл или напрямую в память.
  3. Задача 2 (PostgreSQLOperator): Аналогично выгружает актуальные курсы валют из public.currencies (или выгружает сразу все, так как справочник небольшой).
  4. Задача 3 (PythonOperator/VerticaHook): Загружает выгруженные данные в соответствующие таблицы в слое *__STAGING в Vertica.
  5. Задача 4 (VerticaOperator): Выполняет SQL-скрипт для инкрементального обновления витрины global_metrics в схеме *__DWH, джойня данные из staging-таблиц.

Реализация проекта¶

Настройка окружения¶

1. Установка и запуск Docker-контейнера¶

docker run -d -p 8998:8998 -p 8280:8280 -p 15432:5432 \
  -v $(pwd)/src/dags:/usr/local/airflow/dags \
  -v $(pwd)/certs:/usr/local/airflow/certs \
  --name=de-final-prj-local \
  cr.yandex/crp1r8pht0n0gl25aug1/de-final-prj:latest

2. Настройка подключений в Airflow¶

  1. Откройте Airflow UI: http://localhost:8280/airflow/
  2. Перейдите в Admin -> Connections
  3. Добавьте два подключения: · postgres_cloud: Подключение к PostgreSQL в Yandex Cloud · vertica_conn: Подключение к Vertica

3. Создание таблиц в Vertica¶

Выполните SQL-скрипты из папки src/sql/ для создания таблиц в STAGING и DWH слоях.

4. Настройка Metabase¶

  1. Откройте Metabase: http://localhost:8998/
  2. Зарегистрируйте новую учетную запись
  3. Добавьте подключение к Vertica с параметрами из src/config/connections.py
  4. Создайте дашборд на основе витрины global_metrics

5. Запуск пайплайна¶

  1. В Airflow UI найдите DAG import_postgres_to_vertica
  2. Запустите его вручную или дождитесь автоматического запуска по расписанию
  3. После завершения запустите DAG update_global_metrics

Файлы проекта¶

Структура файлов проекта¶

project/
├── src/
│   ├── dags/                    # DAG для Airflow
│   │   ├── 1_data_import.py
│   │   └── 2_datamart_update.py
│   ├── sql/                     # SQL-скрипты
│   │   ├── create_staging_tables.sql
│   │   └── create_dwh_tables.sql
│   └── config/
│       └── connections.py       # Настройки подключений
├── certs/                       # SSL-сертификаты
│   └── CA.pem
├── docker-compose.yml           # Конфигурация Docker
├── requirements.txt             # Зависимости Python
└── README.md                    # Документация

Файл project/docker-compose.yml¶

yml
version: '3'

services:
  airflow:
    image: cr.yandex/crp1r8pht0n0gl25aug1/de-pg-cr-af:latest # из 5 спринта курса DE
    container_name: de-final-airflow
    ports:
      - "8280:8280"  # Airflow UI
      - "8998:8998"  # Metabase
      - "15432:5432" # PostgreSQL
    volumes:
      - ./src/dags:/usr/local/airflow/dags  # Монтирование папки с DAG
      - ./certs:/usr/local/airflow/certs     # Монтирование папки с сертификатами
    environment:
      - AIRFLOW__CORE__EXECUTOR=LocalExecutor
      - AIRFLOW__CORE__LOAD_EXAMPLES=false
    networks:
      - de-network

networks:
  de-network:
    driver: bridge

# Инструкция по запуску:
# 1. Создайте папку certs и поместите туда сертификат CA.pem
# 2. Запустите контейнер: docker-compose up -d
# 3. Откройте Airflow UI: http://localhost:8280/airflow/
# 4. Добавьте подключения через UI Admin -> Connections

Файл project/requirements.txt¶

apache-airflow==2.5.1
apache-airflow-providers-postgres==5.0.0
apache-airflow-providers-vertica==3.1.0
pandas==1.5.2
vertica-python==1.1.1
psycopg2-binary==2.9.5

Файл project/src/config/connections.py¶

"""
Конфигурационный файл с настройками подключений к базам данных.
Настройки должны быть также добавлены в Airflow через UI:
Admin -> Connections -> Add new connection

Для PostgreSQL в Yandex Cloud требуется SSL-сертификат.
Скачайте сертификат по ссылке:
https://storage.yandexcloud.net/cloud-serts/CA.pem
"""

# Настройки подключения к PostgreSQL в Yandex Cloud
POSTGRES_CONNECTION = {
    'conn_id': 'postgres_cloud',
    'conn_type': 'postgres',
    'host': 'rc1b-w5d285tmxa8jimyn.mdb.yandexcloud.net',
    'port': 6432,
    'login': 'student',
    'password': 'de_student_112022',
    'database': 'db1',
    'extra': {
        'sslmode': 'verify-full',
        'sslrootcert': '/usr/local/airflow/certs/CA.pem'  # Путь к сертификату в контейнере
    }
}

# Настройки подключения к Vertica
VERTICA_CONNECTION = {
    'conn_id': 'vertica_conn',
    'conn_type': 'vertica',
    'host': 'vertica_host',  # Замените на хост вашего кластера Vertica
    'port': 5433,
    'login': 'dbadmin',      # Замените на вашего пользователя Vertica
    'password': 'password',  # Замените на ваш пароль
    'database': 'VMart'      # Замените на имя вашей БД
}

# Настройки для подключения Metabase к Vertica
METABASE_CONNECTION = {
    'host': 'vertica_host',      # Замените на хост вашего кластера Vertica
    'port': 5433,
    'database': 'VMart',         # Замените на имя вашей БД
    'username': 'dbadmin',       # Замените на вашего пользователя Vertica
    'password': 'password',      # Замените на ваш пароль
    'schema': 'student__DWH'     # Замените на вашу схему
}

Файл project/src/sql/create_staging_tables.sql¶

-- Создание таблицы transactions в STAGING-слое
-- Замените 'student' на ваш email от Практикума
CREATE TABLE IF NOT EXISTS student__STAGING.transactions (
    operation_id VARCHAR(255),
    account_number_from INTEGER,
    account_number_to INTEGER,
    currency_code INTEGER,
    country VARCHAR(100),
    status VARCHAR(50),
    transaction_type VARCHAR(50),
    amount INTEGER,
    transaction_dt TIMESTAMP
)
ORDER BY transaction_dt, operation_id
SEGMENTED BY HASH(transaction_dt, operation_id) ALL NODES;

-- Создание таблицы currencies в STAGING-слое
CREATE TABLE IF NOT EXISTS student__STAGING.currencies (
    date_update TIMESTAMP,
    currency_code INTEGER,
    currency_code_with INTEGER,
    currency_with_div NUMERIC(10, 4)
)
ORDER BY date_update, currency_code
SEGMENTED BY HASH(date_update, currency_code) ALL NODES;

-- Создание проекций для оптимизации запросов
CREATE PROJECTION IF NOT EXISTS student__STAGING.transactions_proj (
    operation_id,
    account_number_from,
    account_number_to,
    currency_code,
    country,
    status,
    transaction_type,
    amount,
    transaction_dt
) AS
SELECT 
    operation_id,
    account_number_from,
    account_number_to,
    currency_code,
    country,
    status,
    transaction_type,
    amount,
    transaction_dt
FROM student__STAGING.transactions
ORDER BY transaction_dt, currency_code
SEGMENTED BY HASH(transaction_dt, operation_id) ALL NODES;

CREATE PROJECTION IF NOT EXISTS student__STAGING.currencies_proj (
    date_update,
    currency_code,
    currency_code_with,
    currency_with_div
) AS
SELECT 
    date_update,
    currency_code,
    currency_code_with,
    currency_with_div
FROM student__STAGING.currencies
ORDER BY date_update, currency_code
SEGMENTED BY HASH(date_update, currency_code) ALL NODES;

Файл project/src/sql/create_dwh_tables.sql¶

-- Создание витрины global_metrics в DWH-слое
-- Замените 'student' на ваш email от Практикума
CREATE TABLE IF NOT EXISTS student__DWH.global_metrics (
    date_update DATE,
    currency_from INTEGER,
    amount_total NUMERIC(18, 2),
    cnt_transactions INTEGER,
    avg_transactions_per_account NUMERIC(10, 2),
    cnt_accounts_make_transactions INTEGER
)
ORDER BY date_update, currency_from
SEGMENTED BY HASH(date_update, currency_from) ALL NODES;

-- Создание проекции для оптимизации запросов к витрине
CREATE PROJECTION IF NOT EXISTS student__DWH.global_metrics_proj (
    date_update,
    currency_from,
    amount_total,
    cnt_transactions,
    avg_transactions_per_account,
    cnt_accounts_make_transactions
) AS
SELECT 
    date_update,
    currency_from,
    amount_total,
    cnt_transactions,
    avg_transactions_per_account,
    cnt_accounts_make_transactions
FROM student__DWH.global_metrics
ORDER BY date_update, currency_from
SEGMENTED BY HASH(date_update, currency_from) ALL NODES;

Файл project/src/dags/1_data_import.py¶

"""
DAG для ежедневной инкрементальной выгрузки данных из PostgreSQL в Vertica.
Использует макрос {{ ds }} для фильтрации данных по дате выполнения.
Требует предварительной настройки подключений в Airflow:
1. postgres_cloud - подключение к PostgreSQL в Yandex Cloud
2. vertica_conn - подключение к Vertica
"""

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.vertica.hooks.vertica import VerticaHook
from datetime import datetime, timedelta
import logging

default_args = {
    'owner': 'student',
    'depends_on_past': False,
    'start_date': datetime(2022, 10, 1),
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

def import_transactions_data(**kwargs):
    """Импорт данных о транзакциях за указанную дату"""
    execution_date = kwargs['ds']
    logging.info(f"Импорт данных за {execution_date}")

    # Подключение к PostgreSQL
    pg_hook = PostgresHook(postgres_conn_id='postgres_cloud')
    conn_pg = pg_hook.get_conn()

    # Подключение к Vertica
    vertica_hook = VerticaHook(vertica_conn_id='vertica_conn')
    conn_vert = vertica_hook.get_conn()
    cursor_vert = conn_vert.cursor()

    try:
        # Выгрузка данных из PostgreSQL
        query = f"""
        SELECT operation_id, account_number_from, account_number_to, 
               currency_code, country, status, transaction_type, 
               amount, transaction_dt
        FROM public.transactions
        WHERE DATE(transaction_dt) = '{execution_date}'
        """
        df = pg_hook.get_pandas_df(query)

        # Вставка данных в Vertica
        if not df.empty:
            # Подготовка данных для вставки
            values = [tuple(x) for x in df.to_numpy()]
            insert_query = """
            INSERT INTO student__STAGING.transactions 
            (operation_id, account_number_from, account_number_to, 
             currency_code, country, status, transaction_type, 
             amount, transaction_dt)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
            """

            cursor_vert.executemany(insert_query, values)
            conn_vert.commit()
            logging.info(f"Импортировано {len(df)} записей транзакций")
        else:
            logging.info("Нет данных для импорта")

    except Exception as e:
        logging.error(f"Ошибка при импорте данных: {str(e)}")
        conn_vert.rollback()
        raise e
    finally:
        cursor_vert.close()
        conn_pg.close()
        conn_vert.close()

def import_currencies_data(**kwargs):
    """Импорт данных о курсах валют за указанную дату"""
    execution_date = kwargs['ds']
    logging.info(f"Импорт курсов валют за {execution_date}")

    # Подключение к PostgreSQL
    pg_hook = PostgresHook(postgres_conn_id='postgres_cloud')
    conn_pg = pg_hook.get_conn()

    # Подключение к Vertica
    vertica_hook = VerticaHook(vertica_conn_id='vertica_conn')
    conn_vert = vertica_hook.get_conn()
    cursor_vert = conn_vert.cursor()

    try:
        # Выгрузка данных из PostgreSQL
        query = f"""
        SELECT date_update, currency_code, currency_code_with, currency_with_div
        FROM public.currencies
        WHERE DATE(date_update) = '{execution_date}'
        """
        df = pg_hook.get_pandas_df(query)

        # Вставка данных в Vertica
        if not df.empty:
            # Подготовка данных для вставки
            values = [tuple(x) for x in df.to_numpy()]
            insert_query = """
            INSERT INTO student__STAGING.currencies 
            (date_update, currency_code, currency_code_with, currency_with_div)
            VALUES (%s, %s, %s, %s)
            """

            cursor_vert.executemany(insert_query, values)
            conn_vert.commit()
            logging.info(f"Импортировано {len(df)} записей курсов валют")
        else:
            logging.info("Нет данных о курсах для импорта")

    except Exception as e:
        logging.error(f"Ошибка при импорте курсов валют: {str(e)}")
        conn_vert.rollback()
        raise e
    finally:
        cursor_vert.close()
        conn_pg.close()
        conn_vert.close()

with DAG(
    'import_postgres_to_vertica',
    default_args=default_args,
    description='Ежедневная выгрузка данных из PostgreSQL в Vertica',
    schedule_interval='0 6 * * *',  # Ежедневно в 06:00
    catchup=True,
    max_active_runs=1,
) as dag:

    import_transactions = PythonOperator(
        task_id='import_transactions_data',
        python_callable=import_transactions_data,
        provide_context=True,
    )

    import_currencies = PythonOperator(
        task_id='import_currencies_data',
        python_callable=import_currencies_data,
        provide_context=True,
    )

    import_transactions >> import_currencies

Файл project/src/dags/2_datamart_update.py¶

"""
DAG для ежедневного обновления витрины данных в Vertica.
Включает очистку от тестовых аккаунтов и расчет агрегатов.
Требует предварительной настройки подключения к Vertica в Airflow.
"""

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.vertica.hooks.vertica import VerticaHook
from datetime import datetime, timedelta
import logging

default_args = {
    'owner': 'student',
    'depends_on_past': False,
    'start_date': datetime(2022, 10, 2),
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

def update_global_metrics(**kwargs):
    """Обновление витрины global_metrics данными за предыдущий день"""
    execution_date = kwargs['ds']
    prev_date = (datetime.strptime(execution_date, '%Y-%m-%d') - timedelta(days=1)).strftime('%Y-%m-%d')
    logging.info(f"Обновление витрины за {prev_date}")

    # Подключение к Vertica
    vertica_hook = VerticaHook(vertica_conn_id='vertica_conn')
    conn_vert = vertica_hook.get_conn()
    cursor_vert = conn_vert.cursor()

    try:
        # SQL-запрос для обновления витрины
        update_query = f"""
        INSERT INTO student__DWH.global_metrics
        SELECT 
            DATE(t.transaction_dt) as date_update,
            t.currency_code as currency_from,
            SUM(t.amount * c.currency_with_div) as amount_total,
            COUNT(t.operation_id) as cnt_transactions,
            COUNT(t.operation_id) / COUNT(DISTINCT t.account_number_from) as avg_transactions_per_account,
            COUNT(DISTINCT t.account_number_from) as cnt_accounts_make_transactions
        FROM student__STAGING.transactions t
        JOIN student__STAGING.currencies c ON 
            t.currency_code = c.currency_code AND 
            DATE(t.transaction_dt) = DATE(c.date_update) AND
            c.currency_code_with = 840  -- USD код
        WHERE 
            DATE(t.transaction_dt) = '{prev_date}' AND
            t.status = 'done' AND
            t.account_number_from >= 0  -- Исключаем тестовые аккаунты
        GROUP BY 
            DATE(t.transaction_dt),
            t.currency_code
        """

        cursor_vert.execute(update_query)
        conn_vert.commit()
        logging.info("Витрина global_metrics успешно обновлена")

    except Exception as e:
        logging.error(f"Ошибка при обновлении витрины: {str(e)}")
        conn_vert.rollback()
        raise e
    finally:
        cursor_vert.close()
        conn_vert.close()

with DAG(
    'update_global_metrics',
    default_args=default_args,
    description='Ежедневное обновление витрины global_metrics',
    schedule_interval='0 7 * * *',  # Ежедневно в 07:00 (после загрузки данных)
    catchup=True,
    max_active_runs=1,
) as dag:

    update_metrics = PythonOperator(
        task_id='update_global_metrics',
        python_callable=update_global_metrics,
        provide_context=True,
    )
In [ ]:
 

Политика конфиденциальности

Продолжая использовать данный сайт вы подтверждаете свое согласие с условиями его политики конфиденциальности. Подробнее…




Администрация и владельцы данного информационного ресурса не несут ответственности за возможные последствия, связанные с использованием информации, размещенной на нем.


Все права защищены. При копировании материалов сайта обязательно указывать ссылку на © Microsegment.ru (2020-2025)