Microsegment.ru
  • Главная страница
  • О проекте
  • Портфолио
  • Блог

Построение витрины для анализа оплат курьерам в существующем хранилище данных

Это проект построения витрины данных для расчета с курьерами. В витрине требуется рассчитать суммы оплаты каждому курьеру за предыдущий месяц. В рамках проекта требуется доработать существующее хранилище DWH и создать ETL процессы для перемещения и трансформации данных от источников к витрине.

Задачи проекта:

  1. Изучить источники данных, их внутреннюю модель хранения и технологии извлечения данных.
  2. Спроектировать многослойную модель DWH на основе требований к проекту.
  3. Построить проектную часть хранилища.
  4. Создать ETL для перемещения и трансформации данных.

Содержание¶

  1. Анализ данных проекта
    1. Получение данных по API
      1. GET /restaurants
      2. GET /couriers
      3. GET /deliveries
    2. Результаты анализа данных, полученных по API
  2. Проектирование витрины хранилища для анализа данных о расчетах с курьерами
    1. Слой STG
    2. Слой DDS
    3. Слой CDM
    4. DDL для реструктуризации хранилища
  3. Файлы проекта
    • ddl
      • api_ddl.sql
    • dags
      • lib
        • __init__.py
        • api_connect.py
        • dict_util.py
        • mongo_connect.py
        • pg_connect.py
      • stg
        • api_delivery
          • __init__.py
          • api_connect.py
          • api_couriers_loader.py
          • api_deliveries_loader.py
          • config_const.py
          • pg_connect.py
          • stg_api_deliveries_dag.py
        • __init__.py
        • stg_settings_repository.py
      • dds
        • dds_delivery
          • __init__.py
          • courier_loader.py
          • courier_repositories.py
          • delivery_loader.py
          • delivery_repositories.py
          • fct_deliveries_loader.py
        • dds_settings_repository.py
        • dds_snowflake_dag.py
        • timestamp_loader.py
      • cdm
        • cdm_courier_ledger
          • __init__.py
          • cdm_courier_ledger_report_dag.py
          • courier_ledger_report.py

Анализ данных проекта ¶

Получение данных по API ¶

GET /restaurants ¶

Метод возвращает данные о ресторанах:

  1. _id — ID задачи;
  2. name — название ресторана.

Вариант кода на bash:

curl --location --request GET 'https://d5d04q7d963eapoepsqr.apigw.yandexcloud.net/restaurants?sort_field={{ sort_field }}&sort_direction={{ sort_direction }}&limit={{ limit }}&offset={{ offset }}' \
--header 'X-Nickname: {{ mailforal }}' \
--header 'X-Cohort: {{ 32 }}' \
--header 'X-API-KEY: {{ 25c27781-8fde-4b30-a22e-524044a7580f }}'

GET /couriers ¶

Метод возвращает данные о курьерах:

  1. _id — ID курьера в БД;
  2. name — имя курьера.

Вариант кода на bash:

curl --location --request GET 'https://d5d04q7d963eapoepsqr.apigw.yandexcloud.net/couriers?sort_field={{ sort_field }}&sort_direction={{ sort_direction }}&limit={{ limit }}&offset={{ offset }}' \
--header 'X-Nickname: {{ your_nickname }}' \
--header 'X-Cohort: {{ your_cohort_number }}' \
--header 'X-API-KEY: {{ api_key }}'

GET /deliveries ¶

Метод возвращает данные о доставках:

  1. order_id — ID заказа;
  2. order_ts — дата и время создания заказа;
  3. delivery_id — ID доставки;
  4. courier_id — ID курьера;
  5. address — адрес доставки;
  6. delivery_ts — дата и время совершения доставки;
  7. rate — рейтинг доставки, который выставляет покупатель: целочисленное значение от 1 до 5;
  8. tip_sum — сумма чаевых, которые оставил покупатель курьеру (в руб.).
  9. sum — сумма заказа. Это поле не понадобится вам для выполнения проекта, поскольку это число вы и так получите из подсистемы заказов (MongoDB). Так бывает: не всегда нужно использовать всю доступную информацию.

Вариант кода на bash:

curl --location --request GET 'https://d5d04q7d963eapoepsqr.apigw.yandexcloud.net/deliveries?restaurant_id={{ restaurant_id }}&from={{ from }}&to={{ to }}&sort_field={{ sort_field }}&sort_direction={{ sort_direction }}&limit={{ limit }}&offset={{ limit }}' \
--header 'X-Nickname: {{ your_nickname }}' \
--header 'X-Cohort: {{ your_cohort_number }}' \
--header 'X-API-KEY: {{ api_key }}'

Результаты анализа данных, полученных по API ¶

По API с помощью Python получены данные о ресторанах, курьерах и транзакциях. Данные размещены в pandas-датафреймах get_restaurants, get_deliveries и get_couriers, которые соответствуют таблицам хранилища stg.get_restaurants, stg.get_deliveries и stg.get_couriers соответственно. Произведен анализа полученных данных:

  1. stg.get_restaurants (данные о ресторанах):
    1. Данные содержат на момент их анализа только 4 записи.
    2. Данные не содержат пропусков и дублей.
    3. Данные из этой таблицы не востребованы в витринах этого хранилища, т.к. они дублируют данные таблицы dds.dm_restaurants. Следовательно, данные из этой таблицы не требуется загружать в хранилище и создавать для них таблицу.
  2. stg.get_couriers (данные о курьерах):
    1. Даные содержат на момент их анализа только 50 записей.
    2. Данные не содержат пропусков, но содержат 6 не уникальных значений в поле name. Учитывая тот факт, что у каждой записи уникальное значение id, можно предположить, либо ошибку оператора, приведщую к наличию дублей записей, либо наличие полных тёсок в команде курьеров. Учитывая отсутствие опровержения наличия тёсок, при построении хранилища каждая запись будет считаться уникальной до получения информации о наличии в какой-либо записи дублей.
  3. stg.get_deliveries (данные о доставках за январь 2025 года):
    1. Даные содержат на момент их анализа только 50 записей.
    2. В данных нет пропусков, но есть 10 повторяющихся значений в поле courier_id (идентификатор курьера). Наличие повторяющихся значений в поле courier_id может свидетельствовать о том, что 10 курьеров доставили от 2 до 3 заказов.
    3. Данные о 28 курьерах, идентификаторы которых содержаться в поле courier_id этой таблицы, отсутствуют в поле _id таблицы stg.get_couriers. Риск отсутствия полных данных письменно принят заказчиком. Для обеспечения полноты и непротиворечивости данных в слое DDS хранилища требуется идентификаторы курьеров, данные о которых отсутствуют в таблице dds.dm_couriers, dcnfdbnm туда из stg.get_deliveries.

Проектирование витрины хранилища для анализа данных о расчетах с курьерами ¶

Слой STG ¶

Данные о курьерах и продажах, полученные по API и предназначенные для витрины анализа расчетов с курьерами, требуется разместить «как есть» в слое STG в таблицах stg.get_couriers и stg.get_deliveries. При этом, данные о ресторанах загружать не требуется, т.к. они не востребованы в проектируемой витрине и дублируют более детальные данные в таблице dds.dm_restaurants.

Слой DDS ¶

Данные из слоя STG, обогащенные суррогатными идентификаторами таблиц хранилища и более детальными данными о дате и времени продаж, требуется разместить в слое DDS по модели «снежинка». Числовые данные (rate_avg, courier_tips_sum) и внешние ключи (order_id, delivery_id и courier_id (уникальные идентификаторы в исходных системах)) для связки с таблицами измерений должны быть загружены в таблицу фактов dds.fct_product_deliveries. Также в эту таблицу требуется добавить суррогатный идентификатор таблицы id, а поля order_id, delivery_id сделать уникальными.

Остальные данные требуется разместить в таблицы измерений:

  1. id (суррогатный идентификатор таблицы), courier_id (уникальный идентификатор курьера в исходной системе) и courier_name (ФИО курьера) разместить в таблице dds.dm_couriers.
  2. Данные о дате и времени продажи, обогащенные разделением значения на год, месяц, дату, время и день, а также уникальным идентификатором разместить в таблице dds.dm_timestamps
  3. id (суррогатный идентификатор таблицы), delivery_id (уникальный идентификатор доставки в исходной системе) и timestamp_id (идентификатор записи времени в таблице dds.dm_timestamps) разместить в таблице dds.dm_deliveries.

Слой CDM ¶

Данные из слоя DDS, обогащенные расчетными данными требуется разместить в таблице cdm.dm_courier_ledger витрины в слое CDM. Состав витрины:

  1. id — (поле группировки) идентификатор записи. Данные из id таблицы dds.fct_product_deliveries.
  2. courier_id — (поле группировки) ID курьера, которому перечисляем. Данные из courier_id таблицы dds.dm_couriers.
  3. courier_name — (поле группировки) Ф. И. О. курьера. Данные из courier_name таблицы dds.dm_couriers.
  4. settlement_year — (поле группировки) год отчёта. Данные из year таблицы dds.dm_timestamps.
  5. settlement_month — (поле группировки) месяц отчёта, где 1 — январь и 12 — декабрь. Данные из month таблицы dds.dm_timestamps.
  6. orders_count — (расчетное поле) количество заказов за период (месяц). Количество записей в поле id таблицы dds.dm_orders, сгруппированных по годам, месяцам и идентификаторам курьеров.
  7. orders_total_sum — (расчетное поле) общая стоимость заказов. Сумма данных из поля total_sum таблицы dds.fct_product_sales, сгруппированных по годам, месяцам и идентификаторам курьеров.
  8. rate_avg — (расчетное поле) средний рейтинг курьера по оценкам пользователей. Среднее значение данных из поля rate таблицы dds.fct_product_deliveries, сгруппированных данных по годам, месяцам и идентификаторам курьеров.
  9. order_processing_fee — (расчетное поле с условием) сумма, удержанная компанией за обработку заказов, которая высчитывается как orders_total_sum * 0.25.
  10. courier_order_sum — (расчетное поле с условием) сумма, которую необходимо перечислить курьеру за доставленные им/ей заказы. За каждый доставленный заказ курьер должен получить некоторую сумму в зависимости от рейтинга (см. ниже).
  11. courier_tips_sum — (расчетное поле) сумма, которую пользователи оставили курьеру в качестве чаевых. Сумма данных из поля courier_tips_sum таблицы dds.fct_product_deliveries, сгруппированных по годам, месяцам и идентификаторам курьеров.
  12. courier_reward_sum — (расчетное поле с условием) сумма, которую необходимо перечислить курьеру. Вычисляется как courier_order_sum + courier_tips_sum * 0.95 (5% — комиссия за обработку платежа).

Правила расчёта процента выплаты курьеру в зависимости от рейтинга, где r — это средний рейтинг курьера в расчётном месяце:

  1. r < 4 — 5% от заказа, но не менее 100 р.;
  2. 4 <= r < 4.5 — 7% от заказа, но не менее 150 р.;
  3. 4.5 <= r < 4.9 — 8% от заказа, но не менее 175 р.;
  4. 4.9 <= r — 10% от заказа, но не менее 200 р.

DDL для реструктуризации хранилища ¶

DDL скрипты для реструктуризации хранилища реализованы в файле ddl/api_ddl.sql в дирректории файлов этого проекта.

Файлы проекта ¶

— ddl ¶

— api_ddl.sql ¶

--BEGIN;



-- DDL STG



-- Очистка слоя STG
ALTER TABLE stg.get_couriers DROP CONSTRAINT IF EXISTS get_couriers_id_pk;
DROP TABLE IF EXISTS stg.get_couriers;

ALTER TABLE stg.get_deliveries DROP CONSTRAINT IF EXISTS stg_get_deliveries_order_id_pk;
ALTER TABLE stg.get_deliveries DROP CONSTRAINT IF EXISTS stg_get_deliveries_rate_check;
DROP TABLE IF EXISTS stg.get_deliveries;



-- Таблица сырых данных о ресторанах stg.get_restaurants в слое STG
-- В рамках данного проекта таблица не требуется, 
-- т.к. данные из нее не используются в витринах
/*
ALTER TABLE stg.get_restaurants DROP CONSTRAINT IF EXISTS stg_get_restaurants_id_pk;

DROP TABLE IF EXISTS stg.get_restaurants;
CREATE TABLE IF NOT EXISTS stg.get_restaurants(
    _id VARCHAR NOT NULL, 
    name VARCHAR NOT NULL,
    
    CONSTRAINT stg_get_restaurants_id_pk PRIMARY KEY (_id)
);
*/

-- Таблица сырых данных о курьерах stg.get_couriers в слое STG
CREATE TABLE IF NOT EXISTS stg.get_couriers(
    id VARCHAR NOT NULL, 
    name VARCHAR NOT NULL,
    
    CONSTRAINT get_couriers_id_pk PRIMARY KEY (id)
);

-- Таблица сырых данных о доставках stg.get_deliveries в слое STG
CREATE TABLE IF NOT EXISTS stg.get_deliveries(
    order_id VARCHAR NOT NULL, -- ID заказа;
    order_ts TIMESTAMP NOT NULL, -- дата и время создания заказа;
    delivery_id VARCHAR NOT NULL, -- ID доставки;
    courier_id VARCHAR NOT NULL, -- ID курьера;
    address VARCHAR NOT NULL, -- адрес доставки;
    delivery_ts TIMESTAMP NOT NULL, -- дата и время совершения доставки;
    rate INT NOT NULL, -- рейтинг доставки, который выставляет покупатель: целочисленное значение от 1 до 5;
    tip_sum NUMERIC(19, 5) NOT NULL, -- сумма чаевых, которые оставил покупатель курьеру (в руб.).
    sum NUMERIC(19, 5) NOT NULL, -- сумма заказа. Это поле не понадобится вам для выполнения проекта, поскольку это число вы и так получите из подсистемы заказов (MongoDB). Так бывает: не всегда нужно использовать всю доступную информацию.
    
    CONSTRAINT stg_get_deliveries_order_id_pk PRIMARY KEY (order_id),
    CONSTRAINT stg_get_deliveries_rate_check CHECK ((rate >= 1) AND (rate <= 5))
);



-- DDL DDS



-- Очистка слоя DDS
ALTER TABLE dds.fct_product_deliveries DROP CONSTRAINT IF EXISTS dds_fct_product_deliveries_id_pk;
ALTER TABLE dds.fct_product_deliveries DROP CONSTRAINT IF EXISTS dds_fct_product_deliveries_order_id_fk;
ALTER TABLE dds.fct_product_deliveries DROP CONSTRAINT IF EXISTS dds_fct_product_delivery_id_id_fk;
ALTER TABLE dds.fct_product_deliveries DROP CONSTRAINT IF EXISTS dds_fct_product_courier_id_id_fk;
ALTER TABLE dds.fct_product_deliveries DROP CONSTRAINT IF EXISTS dds_fct_product_deliveries_order_id_and_delivery_id_unique;
DROP TABLE IF EXISTS dds.fct_product_deliveries;

ALTER TABLE dds.dm_couriers DROP CONSTRAINT IF EXISTS dds_dm_couriers_id_pk;
ALTER TABLE dds.dm_couriers DROP CONSTRAINT IF EXISTS dds_dm_couriers_courier_id_unique;
DROP TABLE IF EXISTS dds.dm_couriers

ALTER TABLE dds.dm_deliveries DROP CONSTRAINT IF EXISTS dds_dm_deliveries_delivery_id_unique;
ALTER TABLE dds.dm_deliveries DROP CONSTRAINT IF EXISTS dds_dm_deliveries_id_pk;
ALTER TABLE dds.dm_deliveries DROP CONSTRAINT IF EXISTS dds_dm_deliveries_timestamp_id_fk;
DROP TABLE IF EXISTS dds.dm_deliveries;



-- Таблица измерений с данными о курьрерах stg.get_couriers в слое DDS
CREATE TABLE IF NOT EXISTS dds.dm_couriers(
    id SERIAL NOT NULL,
    courier_id VARCHAR NOT NULL, 
    courier_name VARCHAR NOT NULL,
    
    CONSTRAINT dds_dm_couriers_id_pk PRIMARY KEY (id),
    CONSTRAINT dds_dm_couriers_courier_id_unique UNIQUE (courier_id)
);

-- Таблица измерений с данными о доставке dds.dm_deliveries в слое DDS
CREATE TABLE IF NOT EXISTS dds.dm_deliveries(
    id SERIAL NOT NULL,
    delivery_id VARCHAR NOT NULL, 
    timestamp_id INT NOT NULL,
    
    CONSTRAINT dds_dm_deliveries_delivery_id_unique UNIQUE (delivery_id),
    CONSTRAINT dds_dm_deliveries_id_pk PRIMARY KEY (id),
    CONSTRAINT dds_dm_deliveries_timestamp_id_fk FOREIGN KEY (timestamp_id) REFERENCES dds.dm_timestamps(id)
);

-- Таблица фактов с данными о доставках dds.fct_product_deliveries в слое DDS
CREATE TABLE IF NOT EXISTS dds.fct_product_deliveries(
    id SERIAL NOT NULL,
    order_id INT NOT NULL,
    delivery_id INT NOT NULL,
    courier_id INT NOT NULL,
    rate NUMERIC(5, 2) NOT NULL,
    tips_sum NUMERIC(19, 5) NOT NULL,
    
    CONSTRAINT dds_fct_product_deliveries_id_pk PRIMARY KEY (id),
    CONSTRAINT dds_fct_product_deliveries_order_id_fk FOREIGN KEY (order_id) REFERENCES dds.dm_orders(id),
    CONSTRAINT dds_fct_product_delivery_id_id_fk FOREIGN KEY (delivery_id) REFERENCES dds.dm_deliveries(id),
    CONSTRAINT dds_fct_product_courier_id_id_fk FOREIGN KEY (courier_id) REFERENCES dds.dm_couriers(id),
    CONSTRAINT dds_fct_product_deliveries_order_id_and_delivery_id_unique UNIQUE (order_id, delivery_id)
);



-- DDL CDM



-- Очистка слоя CDM
ALTER TABLE cdm.dm_courier_ledger DROP CONSTRAINT IF EXISTS cdm_dm_courier_ledger_id_pk;
ALTER TABLE cdm.dm_courier_ledger DROP CONSTRAINT IF EXISTS cdm_dm_courier_ledger_settlement_year_check;
ALTER TABLE cdm.dm_courier_ledger DROP CONSTRAINT IF EXISTS cdm_dm_courier_ledger_settlement_month_check;
ALTER TABLE cdm.dm_courier_ledger DROP CONSTRAINT IF EXISTS cdm_dm_courier_ledger_order_processing_fee_check;
ALTER TABLE cdm.dm_courier_ledger DROP CONSTRAINT IF EXISTS cdm_dm_courier_ledger_courier_order_sum_check;
ALTER TABLE cdm.dm_courier_ledger DROP CONSTRAINT IF EXISTS cdm_dm_courier_ledger_courier_reward_sum_check;
DROP TABLE IF EXISTS cdm.dm_courier_ledger;



-- Таблица витрины оплаты кульерам dm_courier_ledger в слое CDM
CREATE TABLE IF NOT EXISTS cdm.dm_courier_ledger(
    id INT NOT NULL, -- идентификатор записи.
    courier_id VARCHAR(100) NOT NULL, -- ID курьера, которому перечисляем.
    courier_name VARCHAR(100), -- Ф. И. О. курьера.
    settlement_year INT NOT NULL, -- год отчёта.
    settlement_month SMALLINT NOT NULL, -- месяц отчёта, где 1 — январь и 12 — декабрь.
    orders_count INT, -- количество заказов за период (месяц).
    orders_total_sum NUMERIC(19, 5), -- общая стоимость заказов.
    rate_avg NUMERIC(5, 2), -- средний рейтинг курьера по оценкам пользователей.
    order_processing_fee NUMERIC(19, 5), -- сумма, удержанная компанией за обработку заказов, которая высчитывается как orders_total_sum * 0.25.
    courier_order_sum NUMERIC(19, 5), -- сумма, которую необходимо перечислить курьеру за доставленные им/ей заказы. За каждый доставленный заказ курьер должен получить некоторую сумму в зависимости от рейтинга (см. ниже).
    courier_tips_sum NUMERIC(19, 5), -- сумма, которую пользователи оставили курьеру в качестве чаевых.
    courier_reward_sum NUMERIC(19, 5), -- сумма, которую необходимо перечислить курьеру. Вычисляется как courier_order_sum + courier_tips_sum * 0.95 (5% — комиссия за обработку платежа).
    
    CONSTRAINT cdm_dm_courier_ledger_id_pk PRIMARY KEY (id),
    
    CONSTRAINT cdm_dm_courier_ledger_settlement_year_check CHECK ((settlement_year >= 2022) AND (settlement_year < 2500)),
    CONSTRAINT cdm_dm_courier_ledger_settlement_month_check CHECK ((settlement_month >= 1) AND (settlement_month <= 12)),
    CONSTRAINT cdm_dm_courier_ledger_order_processing_fee_check CHECK (order_processing_fee = orders_total_sum * .25),
    CONSTRAINT cdm_dm_courier_ledger_courier_order_sum_check CHECK (((courier_order_sum >= orders_total_sum * .05) AND (courier_order_sum >= 100)) AND 
    																   ((courier_order_sum <= orders_total_sum * .1) OR (courier_order_sum <= 200))),
    CONSTRAINT cdm_dm_courier_ledger_courier_reward_sum_check CHECK (courier_reward_sum = courier_order_sum + courier_tips_sum * .95)
);



-- Функция расчета полей таблицы dds.fct_product_deliveries
CREATE OR REPLACE FUNCTION cdm_dm_courier_ledger_function()
RETURNS TRIGGER AS $$
BEGIN
    -- order_processing_fee — сумма, удержанная компанией за обработку заказов,
    -- которая высчитывается как orders_total_sum * 0.25.
    NEW.order_processing_fee = NEW.orders_total_sum * 0.25;

    -- courier_order_sum — сумма, которую необходимо перечислить курьеру
    -- за доставленные им/ей заказы. За каждый доставленный заказ
    -- курьер должен получить некоторую сумму в зависимости от рейтинга (rate_avg).
    -- Правила расчёта процента выплаты курьеру в зависимости от рейтинга, где r — это средний рейтинг курьера в расчётном месяце:
    -- r < 4 — 5% от заказа, но не менее 100 р.;
    -- 4 <= r < 4.5 — 7% от заказа, но не менее 150 р.;
    -- 4.5 <= r < 4.9 — 8% от заказа, но не менее 175 р.;
    -- 4.9 <= r — 10% от заказа, но не менее 200 р.
    IF (NEW.rate_avg < 4) THEN
        NEW.courier_order_sum = NEW.orders_total_sum * 0.05;
        IF (NEW.courier_order_sum < 100) THEN
            NEW.courier_order_sum = 100;
        END IF;
    ELSIF (NEW.rate_avg >= 4 AND NEW.rate_avg < 4.5) THEN
        NEW.courier_order_sum = NEW.orders_total_sum * 0.07;
        IF (NEW.courier_order_sum < 150) THEN
            NEW.courier_order_sum = 150;
        END IF;
    ELSIF (NEW.rate_avg >= 4.5 AND NEW.rate_avg < 4.9) THEN
        NEW.courier_order_sum = NEW.orders_total_sum * 0.08;
        IF (NEW.courier_order_sum < 175) THEN
            NEW.courier_order_sum = 175;
        END IF;
    ELSE
        NEW.courier_order_sum = NEW.orders_total_sum * 0.1;
        IF (NEW.courier_order_sum < 200) THEN
            NEW.courier_order_sum = 200;
        END IF;
    END IF;

    -- courier_reward_sum — сумма, которую необходимо перечислить курьеру.
    -- Вычисляется как courier_order_sum + courier_tips_sum * 0.95 (5% — комиссия за обработку платежа).
    NEW.courier_reward_sum = NEW.courier_order_sum + NEW.courier_tips_sum * 0.95;

    RETURN NEW;
END;
$$ LANGUAGE PLPGSQL;



-- Удаление старого триггера таблицы dds.fct_product_deliveries, если он существует
DROP TRIGGER IF EXISTS cdm_dm_courier_ledger_trigger ON cdm.dm_courier_ledger;



-- Создание нового триггера таблицы dds.fct_product_deliveries
CREATE TRIGGER cdm_dm_courier_ledger_trigger
BEFORE INSERT OR UPDATE ON cdm.dm_courier_ledger
FOR EACH ROW EXECUTE FUNCTION cdm_dm_courier_ledger_function();



--COMMIT;

— dags ¶

— lib ¶

— __init__.py ¶

import os
import sys

from .mongo_connect import MongoConnect  # noqa
from .pg_connect import ConnectionBuilder  # noqa
from .pg_connect import PgConnect  # noqa
from .api_connect import APIConnect  # noqa
from .api_connect import JsonToDF  # noqa
from test import Test

sys.path.append(os.path.dirname(os.path.realpath(__file__)))

— api_connect.py ¶

from urllib.parse import quote_plus as quote
import pandas as pd
import requests


class APIConnect:
    def __init__(self,
                 entry_point: str, # переопределить в подклассе
                 sort_field: str,
                 sort_direction: str,
                 limit: str,
                 offset: str,
                 restaurant_id: str,
                 from_: str,
                 to: str) -> None:

        self.entry_point = entry_point
        self.sort_field = sort_field
        self.sort_direction = sort_direction
        self.limit = limit
        self.offset = offset
        self.restaurant_id = restaurant_id
        self.from_ = from_
        self.to = to

    def adding_headers(self) -> None:
        # Получение данных JSON с сервера через API
        self.headers = {
            "X-API-KEY": "25c27781-8fde-4b30-a22e-524044a7580f", # ключ API
            "X-Nickname": "mailforal", # авторизационные данные microsegment
            "X-Cohort": "32", # авторизационные данные

            # sort_field - определяет поле, к которому будет применяться сортировка, переданная в параметре sort_direction
            # Yеобязательный параметр. Возможные значения: id, name...
            "sort_field": self.sort_field,

            # sort_direction - определяет порядок сортировки для поля, переданного в sort_field
            # asc — сортировка по возрастанию,
            # desc — сортировка по убыванию.
            "sort_direction": self.sort_direction,

            # limit - определяет максимальное количество записей, которые будут возвращены в ответе.
            # Необязательный параметр. Значение по умолчанию: 50. Возможные значения: 
            # целое число в интервале от 0 до 50 включительно.
            "limit": self.limit,

            # offset - определяет количество возвращаемых элементов результирующей выборки, когда формируется ответ.
            # Необязательный параметр. Значение по умолчанию: 0.
            "offset": self.offset
        }

        # restaurant_id - ID ресторана. 
        # Если значение не указано, то метод вернёт данные по всем доступным в БД ресторанам.
        if self.restaurant_id != '': self.headers['from'] = self.restaurant_id

        # from - параметр фильтрации. 
        # В выборку попадают заказы с датой доставки, которая больше или равна значению from. 
        # Дата должна быть в формате '%Y-%m-%d %H:%M:%S’, например, 2022-01-01 00:00:00.
        if self.from_ != '': self.headers['from'] = self.from_

        # to — параметр фильтрации. 
        # В выборку попадают заказы с датой доставки меньше значения to.
        if self.to != '': self.headers['to'] = self.to

    def from_api_to_json(self) -> str:
        self.adding_headers()

        # Получение данных JSON с сервера через API
        return requests.get(
            "https://d5d04q7d963eapoepsqr.apigw.yandexcloud.net/" + self.entry_point, # точка входа
            headers = self.headers
        ).json()


class JsonToDF:
    def __init__(self, json_str: str):
        self.json_str = json_str
    def json_to_df(self) -> pd.DataFrame:
        return pd.json_normalize(self.json_str)

— dict_util.py ¶

import json
from datetime import datetime
from typing import Any, Dict

from bson.objectid import ObjectId


def json2str(obj: Any) -> str:
    return json.dumps(to_dict(obj), sort_keys=True, ensure_ascii=False)


def str2json(str: str) -> Dict:
    return json.loads(str)


def to_dict(obj, classkey=None):
    if isinstance(obj, datetime):
        return obj.strftime("%Y-%m-%d %H:%M:%S")
    elif isinstance(obj, ObjectId):
        return str(obj)
    if isinstance(obj, dict):
        data = {}
        for (k, v) in obj.items():
            data[k] = to_dict(v, classkey)
        return data
    elif hasattr(obj, "_ast"):
        return to_dict(obj._ast())
    elif hasattr(obj, "__iter__") and not isinstance(obj, str):
        return [to_dict(v, classkey) for v in obj]
    elif hasattr(obj, "__dict__"):
        data = dict([(key, to_dict(value, classkey))
                     for key, value in obj.__dict__.items()
                     if not callable(value) and not key.startswith('_')])
        if classkey is not None and hasattr(obj, "__class__"):
            data[classkey] = obj.__class__.__name__
        return data
    else:
        return obj

— mongo_connect.py ¶

from urllib.parse import quote_plus as quote

from pymongo.mongo_client import MongoClient


class MongoConnect:
    def __init__(self,
                 cert_path: str,
                 user: str,
                 pw: str,
                 host: str,
                 rs: str,
                 auth_db: str,
                 main_db: str
                 ) -> None:

        self.user = user
        self.pw = pw
        self.host = host
        self.replica_set = rs
        self.auth_db = auth_db
        self.main_db = main_db
        self.cert_path = cert_path

    def url(self) -> str:
        return 'mongodb://{user}:{pw}@{hosts}/?replicaSet={rs}&authSource={auth_src}'.format(
            user=quote(self.user),
            pw=quote(self.pw),
            hosts=self.host,
            rs=self.replica_set,
            auth_src=self.auth_db)

    def client(self):
        return MongoClient(self.url(), tlsCAFile=self.cert_path)[self.main_db]

— pg_connect.py ¶

from contextlib import contextmanager
from typing import Generator

import psycopg
from airflow.hooks.base import BaseHook


class PgConnect:
    def __init__(self, host: str, port: str, db_name: str, user: str, pw: str, sslmode: str = "require") -> None:
        self.host = host
        self.port = int(port)
        self.db_name = db_name
        self.user = user
        self.pw = pw
        self.sslmode = sslmode

    def url(self) -> str:
        return """
            host={host}
            port={port}
            dbname={db_name}
            user={user}
            password={pw}
            target_session_attrs=read-write
            sslmode={sslmode}
        """.format(
            host=self.host,
            port=self.port,
            db_name=self.db_name,
            user=self.user,
            pw=self.pw,
            sslmode=self.sslmode)

    def client(self):
        return psycopg.connect(self.url())

    @contextmanager
    def connection(self) -> Generator[psycopg.Connection, None, None]:
        conn = psycopg.connect(self.url())
        try:
            yield conn
            conn.commit()
        except Exception as e:
            conn.rollback()
            raise e
        finally:
            conn.close()


class ConnectionBuilder:

    @staticmethod
    def pg_conn(conn_id: str) -> PgConnect:
        conn = BaseHook.get_connection(conn_id)

        sslmode = "require"
        if "sslmode" in conn.extra_dejson:
            sslmode = conn.extra_dejson["sslmode"]

        pg = PgConnect(str(conn.host),
                       str(conn.port),
                       str(conn.schema),
                       str(conn.login),
                       str(conn.password),
                       sslmode)

        return pg

— stg ¶

— api_delivery ¶

—- __init__.py ¶

import os
import sys

from .courier_repositories import (CourierJSONObj, CourierRawRepository, 
                                    CourierDDSObj, CourierDDSRepository)  # noqa
from .delivery_repositories import (DeliveryJSONObj, DeliveryRawRepository, 
                                    DeliveryDDSObj, DeliveryDDSRepository)  # noqa
from .courier_loader import CourierLoader  # noqa
from .delivery_loader import DeliveryLoader  # noqa

sys.path.append(os.path.dirname(os.path.realpath(__file__)))

—- api_connect.py ¶

from urllib.parse import quote_plus as quote
import pandas as pd
import requests


class APIConnect:
    def __init__(self,
                 entry_point: str, # переопределить в подклассе
                 sort_field: str,
                 sort_direction: str,
                 limit: str,
                 offset: str,
                 restaurant_id: str,
                 from_: str,
                 to: str) -> None:

        self.entry_point = entry_point
        self.sort_field = sort_field
        self.sort_direction = sort_direction
        self.limit = limit
        self.offset = offset
        self.restaurant_id = restaurant_id
        self.from_ = from_
        self.to = to

    def adding_headers(self) -> None:
        # Получение данных JSON с сервера через API
        self.headers = {
            "X-API-KEY": "25c27781-8fde-4b30-a22e-524044a7580f", # ключ API
            "X-Nickname": "mailforal", # авторизационные данные microsegment
            "X-Cohort": "32", # авторизационные данные

            # sort_field - определяет поле, к которому будет применяться сортировка, переданная в параметре sort_direction
            # Yеобязательный параметр. Возможные значения: id, name...
            "sort_field": self.sort_field,

            # sort_direction - определяет порядок сортировки для поля, переданного в sort_field
            # asc — сортировка по возрастанию,
            # desc — сортировка по убыванию.
            "sort_direction": self.sort_direction,

            # limit - определяет максимальное количество записей, которые будут возвращены в ответе.
            # Необязательный параметр. Значение по умолчанию: 50. Возможные значения: 
            # целое число в интервале от 0 до 50 включительно.
            "limit": self.limit,

            # offset - определяет количество возвращаемых элементов результирующей выборки, когда формируется ответ.
            # Необязательный параметр. Значение по умолчанию: 0.
            "offset": self.offset
        }

        # restaurant_id - ID ресторана. 
        # Если значение не указано, то метод вернёт данные по всем доступным в БД ресторанам.
        if self.restaurant_id != '': self.headers['from'] = self.restaurant_id

        # from - параметр фильтрации. 
        # В выборку попадают заказы с датой доставки, которая больше или равна значению from. 
        # Дата должна быть в формате '%Y-%m-%d %H:%M:%S’, например, 2022-01-01 00:00:00.
        if self.from_ != '': self.headers['from'] = self.from_

        # to — параметр фильтрации. 
        # В выборку попадают заказы с датой доставки меньше значения to.
        if self.to != '': self.headers['to'] = self.to

    def from_api_to_json(self) -> str:
        self.adding_headers()

        # Получение данных JSON с сервера через API
        return requests.get(
            "https://d5d04q7d963eapoepsqr.apigw.yandexcloud.net/" + self.entry_point, # точка входа
            headers = self.headers
        ).json()


class JsonToDF:
    def __init__(self, json_str: str):
        self.json_str = json_str
    def json_to_df(self) -> pd.DataFrame:
        return pd.json_normalize(self.json_str)

—- api_couriers_loader.py ¶

from typing import List
from psycopg.rows import class_row
from pydantic import BaseModel # Упрощение проверки и анализа данных, загружаемых из БД и API

# Самоделки
from lib import PgConnect, ConnectionBuilder # Получение данных из PostgreSQL
from lib import APIConnect, JsonToDF # Получение данных по API в формате JSON


class CourierObj(BaseModel):
    id: int
    name: str


class FetchCouriersFromAPI:
    def list_couriers(self):
        return APIConnect('couriers', 
                          'name', 'asc', 
                          '0', '0', '',
                          '', '').from_api_to_json()


class CourierDestRepository:
    def __init__(self, pg: PgConnect) -> None:
        self._db = pg

    def insert_couriers(self, couriers: List[CourierObj]) -> None:
        with self._db.client() as conn:
            with conn.cursor() as cur:
                for courier in couriers:
                    print('courier =', courier)
                    cur.execute(
                        """
                            INSERT INTO stg.get_couriers(id, name)
                            VALUES (%(id)s, %(name)s)
                            ON CONFLICT (id) DO NOTHING;
                        """,
                        {
                            "id": courier['_id'],
                            "name": courier['name']
                        },
                    )
                conn.commit()


class CourierLoader:
    WF_KEY = "api_couriers_to_stg_workflow"

    def __init__(self, pg_dest: PgConnect) -> None:
        self.stg = CourierDestRepository(pg_dest)

    def load_couriers(self):
        load_queue = FetchCouriersFromAPI().list_couriers()
        self.stg.insert_couriers(load_queue)

—- api_deliveries_loader.py ¶

from typing import List
from psycopg.rows import class_row
from pydantic import BaseModel # Упрощение проверки и анализа данных, загружаемых из БД и API
from datetime import datetime, timedelta 

# Самоделки
from lib import PgConnect, ConnectionBuilder # Получение данных из PostgreSQL
from lib import APIConnect, JsonToDF # Получение данных по API в формате JSON


class DeliveryObj(BaseModel):
    order_id: str
    order_ts: str
    delivery_id: str
    courier_id: str
    address: str
    delivery_ts: str
    rate: str
    tip_sum: float
    sum: float


class FetchDeliveriesFromAPI:
    def list_deliveries(self):
        return APIConnect('deliveries', 
                          'order_id', 'asc', 
                          '0', '0', '', 
                          (datetime.now() - timedelta(days=7)).strftime('%Y-%m-%d %H:%M:%S'), 
                          (datetime.now()).strftime('%Y-%m-%d %H:%M:%S')).from_api_to_json()


class DeliveryDestRepository:
    def __init__(self, pg: PgConnect) -> None:
        self._db = pg

    def insert_deliveries(self, deliveries: List[DeliveryObj]) -> None:
        with self._db.client() as conn:
            with conn.cursor() as cur:
                for delivery in deliveries:
                    print('delivery =', delivery)
                    cur.execute(
                        """
                            INSERT INTO stg.get_deliveries(order_id, 
                                                           order_ts, 
                                                           delivery_id, 
                                                           courier_id, 
                                                           address, 
                                                           delivery_ts, 
                                                           rate, 
                                                           tip_sum, 
                                                           sum)
                            VALUES (%(order_id)s, 
                                    %(order_ts)s, 
                                    %(delivery_id)s, 
                                    %(courier_id)s, 
                                    %(address)s, 
                                    %(delivery_ts)s, 
                                    %(rate)s, 
                                    %(tip_sum)s, 
                                    %(sum)s)
                            ON CONFLICT (order_id) DO NOTHING;
                        """,
                        {
                            "order_id": delivery['order_id'],
                            "order_ts": delivery['order_ts'],
                            "delivery_id": delivery['delivery_id'],
                            "courier_id": delivery['courier_id'],
                            "address": delivery['address'],
                            "delivery_ts": delivery['delivery_ts'],
                            "rate": delivery['rate'],
                            "tip_sum": delivery['tip_sum'],
                            "sum": delivery['sum']
                        },
                    )
                conn.commit()


class DeliveryLoader:
    WF_KEY = "api_deliveries_to_stg_workflow"

    def __init__(self, pg_dest: PgConnect) -> None:
        self.stg = DeliveryDestRepository(pg_dest)

    def load_deliveries(self):
        load_queue = FetchDeliveriesFromAPI().list_deliveries()
        self.stg.insert_deliveries(load_queue)

—- config_const.py ¶

class ConfigConst:
    PG_WAREHOUSE_CONNECTION = "PG_WAREHOUSE_CONNECTION"
    PG_ORIGIN_BONUS_SYSTEM_CONNECTION = "PG_ORIGIN_BONUS_SYSTEM_CONNECTION"

    MONGO_DB_CERTIFICATE_PATH = 'MONGO_DB_CERTIFICATE_PATH'
    MONGO_DB_USER = 'MONGO_DB_USER'
    MONGO_DB_PASSWORD = 'MONGO_DB_PASSWORD'
    MONGO_DB_REPLICA_SET = 'MONGO_DB_REPLICA_SET'
    MONGO_DB_DATABASE_NAME = 'MONGO_DB_DATABASE_NAME'
    MONGO_DB_HOST = 'MONGO_DB_HOST'

    PG_ORIGIN_DATABASE_NAME = "PG_ORIGIN_DATABASE_NAME"
    PG_ORIGIN_HOST = "PG_ORIGIN_HOST"
    PG_ORIGIN_PASSWORD = "PG_ORIGIN_PASSWORD"
    PG_ORIGIN_PORT = "PG_ORIGIN_PORT"
    PG_ORIGIN_USER = "PG_ORIGIN_USER"

—- pg_connect.py ¶

from contextlib import contextmanager
from typing import Generator

import psycopg
from airflow.hooks.base import BaseHook


class PgConnect:
    def __init__(self, host: str, port: str, db_name: str, user: str, pw: str, sslmode: str = "require") -> None:
        self.host = host
        self.port = int(port)
        self.db_name = db_name
        self.user = user
        self.pw = pw
        self.sslmode = sslmode

    def url(self) -> str:
        return """
            host={host}
            port={port}
            dbname={db_name}
            user={user}
            password={pw}
            target_session_attrs=read-write
            sslmode={sslmode}
        """.format(
            host=self.host,
            port=self.port,
            db_name=self.db_name,
            user=self.user,
            pw=self.pw,
            sslmode=self.sslmode)

    def client(self):
        return psycopg.connect(self.url())

    @contextmanager
    def connection(self) -> Generator[psycopg.Connection, None, None]:
        conn = psycopg.connect(self.url())
        try:
            yield conn
            conn.commit()
        except Exception as e:
            conn.rollback()
            raise e
        finally:
            conn.close()


class ConnectionBuilder:

    @staticmethod
    def pg_conn(conn_id: str) -> PgConnect:
        conn = BaseHook.get_connection(conn_id)

        sslmode = "require"
        if "sslmode" in conn.extra_dejson:
            sslmode = conn.extra_dejson["sslmode"]

        pg = PgConnect(str(conn.host),
                       str(conn.port),
                       str(conn.schema),
                       str(conn.login),
                       str(conn.password),
                       sslmode)

        return pg

—- stg_api_deliveries_dag.py ¶

import logging

import pendulum
from airflow.decorators import dag, task

# Самоделки
from config_const import ConfigConst
from lib import ConnectionBuilder # Получение данных из PostgreSQL
from stg.api_delivery import CourierLoader
from stg.api_delivery import DeliveryLoader

log = logging.getLogger(__name__)


@dag(
    schedule_interval='0/15 * * * *',
    start_date=pendulum.datetime(2022, 5, 5, tz="UTC"),
    catchup=False,
    tags=['sprint5', 'stg', 'api', 'couriers', 'delivery'],
    is_paused_upon_creation=False
)
def stg_api_delivery_dag():

    dwh_pg_connect = ConnectionBuilder.pg_conn(ConfigConst.PG_WAREHOUSE_CONNECTION)

    @task(task_id="couriers_load")
    def load_couriers():
        courier_loader = CourierLoader(dwh_pg_connect)
        courier_loader.load_couriers()

    @task(task_id="deliveries_load")
    def load_deliveries():
        delivery_loader = DeliveryLoader(dwh_pg_connect)
        delivery_loader.load_deliveries()

    couriers = load_couriers()
    deliveries = load_deliveries()

    couriers  # type: ignore
    deliveries  # type: ignore


stg_delivery_dag = stg_api_delivery_dag()

— __init__.py ¶

from .stg_settings_repository import EtlSetting, StgEtlSettingsRepository

— stg_settings_repository.py ¶

from typing import Dict, Optional

from psycopg import Connection
from psycopg.rows import class_row
from pydantic import BaseModel


class EtlSetting(BaseModel):
    id: int
    workflow_key: str
    workflow_settings: Dict


class StgEtlSettingsRepository:
    def get_setting(self, conn: Connection, etl_key: str) -> Optional[EtlSetting]:
        with conn.cursor(row_factory=class_row(EtlSetting)) as cur:
            cur.execute(
                """
                    SELECT
                        id,
                        workflow_key,
                        workflow_settings
                    FROM stg.srv_wf_settings
                    WHERE workflow_key = %(etl_key)s;
                """,
                {"etl_key": etl_key},
            )
            obj = cur.fetchone()

        return obj

    def save_setting(self, conn: Connection, workflow_key: str, workflow_settings: str) -> None:
        with conn.cursor() as cur:
            cur.execute(
                """
                    INSERT INTO stg.srv_wf_settings(workflow_key, workflow_settings)
                    VALUES (%(etl_key)s, %(etl_setting)s)
                    ON CONFLICT (workflow_key) DO UPDATE
                    SET workflow_settings = EXCLUDED.workflow_settings;
                """,
                {
                    "etl_key": workflow_key,
                    "etl_setting": workflow_settings
                },
            )

— dds ¶

—- dds_delivery ¶

—- __init__.py ¶

import os
import sys

from .courier_loader import CourierLoader # noqa
from .delivery_loader import DeliveryLoader  # noqa
from .fct_deliveries_loader import FctDeliveryLoader  # noqa

sys.path.append(os.path.dirname(os.path.realpath(__file__)))

—- courier_loader.py ¶

import json
from datetime import datetime

from lib import PgConnect

from dds.dds_settings_repository import DdsEtlSettingsRepository, EtlSetting
from dds.dds_delivery.courier_repositories import (CourierDDSObj, CourierDDSRepository, CourierJSONObj,
                                CourierRawRepository)


# Класс таска "dm_couriers_load" дага "sprint5_case_dds_snowflake"
class CourierLoader:
    WF_KEY = "couriers_raw_to_dds_workflow"
    LAST_LOADED_ID_KEY = "last_loaded_id"

    def __init__(self, pg: PgConnect, settings_repository: DdsEtlSettingsRepository) -> None:
        self.dwh = pg
        self.raw = CourierRawRepository()
        self.dds_couriers = CourierDDSRepository()
        self.settings_repository = settings_repository

    # Метод преобразования данных таблицы `get_couriers` слоя `STG` 
    # в сласс данных таблицы `dm_couriers` слоя `DDS` через JSON
    #def parse_data(self, courier_raw: CourierJSONObj, restaurant_id: int, timestamp_id: int, user_id: int) -> CourierDDSObj:
    def parse_data(self, courier_raw: CourierJSONObj) -> CourierDDSObj:
        print('(i) File "/lessons/dags/dds/dds_delivery/courier_loader.py", line 32, courier_raw =', courier_raw)

        t = CourierDDSObj(id=0,
                        courier_id=courier_raw.id,
                        courier_name=courier_raw.name
                        )

        return t

    # Метод чтения всех необходимых данных из всех необходимых таблиц 
    # и запись их в требуемые таблицы, связанные с таблицей `dm_couriers` слоя `DDS`
    # в рамках таска "dm_couriers_load" дага "sprint5_case_dds_snowflake"
    def load_data(self):
        with self.dwh.connection() as conn:
            wf_setting = self.settings_repository.get_setting(conn, self.WF_KEY)
            if not wf_setting:
                wf_setting = EtlSetting(id=0, workflow_key=self.WF_KEY, workflow_settings={self.LAST_LOADED_ID_KEY: '-1'})

            last_loaded_id = wf_setting.workflow_settings[self.LAST_LOADED_ID_KEY]

            print('(i) File "/lessons/dags/dds/dds_delivery/courier_loader.py", line 52, last_loaded_id =', last_loaded_id)
            load_queue = self.raw.load_data(conn, last_loaded_id)
            load_queue.sort(key=lambda x: x.id)
            for courier_raw in load_queue:
                courier_to_load = self.parse_data(courier_raw)
                self.dds_couriers.insert_data(conn, courier_to_load)

                print('(i) File "/lessons/dags/dds/dds_delivery/courier_loader.py", line 81, courier_raw =', courier_raw)
                wf_setting.workflow_settings[self.LAST_LOADED_ID_KEY] = courier_raw.id
                self.settings_repository.save_setting(conn, wf_setting)

—- courier_repositories.py ¶

from typing import List, Optional

from psycopg import Connection
from psycopg.rows import class_row
from pydantic import BaseModel


# Класс данных таблицы `get_couriers` в слое `STG`
class CourierJSONObj(BaseModel):
    id: str
    name: str


# Класс взаимодействия с данными таблицы `get_couriers` в слое `STG`
class CourierRawRepository:

    # Метод чтения данных из таблицы `get_couriers` в слое `STG`
    def load_data(self, conn: Connection, last_loaded_record_id: int) -> List[CourierJSONObj]:
        print('(i) File "/lessons/dags/dds/dds_delivery/courier_repositories.py", line 20, last_loaded_record_id =', last_loaded_record_id)
        with conn.cursor(row_factory=class_row(CourierJSONObj)) as cur:
            cur.execute(
                """
                    SELECT
                        id,
                        name
                    FROM stg.get_couriers
                    WHERE id > %(last_loaded_record_id)s
                    ORDER BY id ASC;
                """,
                {"last_loaded_record_id": last_loaded_record_id}
            )
            objs = cur.fetchall()
        objs.sort(key=lambda x: x.id)
        print('(i) File "/lessons/dags/dds/dds_delivery/courier_repositories.py", line 35, objs =', objs)
        return objs


# Класс данных таблицы `dm_couriers` в слое `DDS`
class CourierDDSObj(BaseModel):
    id: int
    courier_id: str
    courier_name: str


# Класс взаимодействия с данными таблицы `dm_couriers` в слое `DDS`
class CourierDDSRepository:

    # Метод вставки данных в таблицу `dm_couriers` в слое `DDS`
    def insert_data(self, conn: Connection, obj: CourierJSONObj) -> None:
        print('(i) File "/lessons/dags/dds/dds_delivery/courier_repositories.py", line 52, obj =', obj)
        with conn.cursor() as cur:
            cur.execute(
                """
                    INSERT INTO dds.dm_couriers(courier_id, courier_name)
                    VALUES (%(courier_id)s, %(courier_name)s)
                    ON CONFLICT (courier_id) DO UPDATE
                    SET
                        courier_id = EXCLUDED.courier_id,
                        courier_name = EXCLUDED.courier_name
                    ;
                """,
                {
                    "courier_id": obj.courier_id,
                    "courier_name": obj.courier_name
                },
            )

    # Метод чтения данных из таблицы `dm_couriers` в слое `DDS`
    def get_data(self, conn: Connection, courier_id: str) -> Optional[CourierDDSObj]:
        with conn.cursor(row_factory=class_row(CourierDDSObj)) as cur:
            cur.execute(
                """
                    SELECT
                        id,
                        courier_id,
                        courier_name
                    FROM dds.dm_couriers
                    WHERE courier_id = %(courier_id)s;
                """,
                {"courier_id": courier_id},
            )
            obj = cur.fetchone()
        return obj

—- delivery_loader.py ¶

import json
from datetime import datetime

from lib import PgConnect

from dds.dds_settings_repository import DdsEtlSettingsRepository, EtlSetting
from dds.timestamp_loader import TimestampDdsRepository
from dds.dds_delivery.delivery_repositories import (DeliveryJSONObj, DeliveryRawRepository, 
                                                    DeliveryDDSObj, DeliveryDDSRepository)


# Класс таска "dm_delivery_load" дага "sprint5_case_dds_snowflake"
class DeliveryLoader:
    WF_KEY = "deliviries_raw_to_dds_workflow"
    LAST_LOADED_ID_KEY = "last_loaded_id"

    def __init__(self, pg: PgConnect, settings_repository: DdsEtlSettingsRepository) -> None:
        self.dwh = pg
        self.raw = DeliveryRawRepository()
        self.dds_timestamps = TimestampDdsRepository()
        self.dds_deliveries = DeliveryDDSRepository()
        self.settings_repository = settings_repository

        print('(i) dags\dds\delivery_loader.py, row 20, DeliveryLoader.self =', self)

    # Метод преобразования данных таблицы `get_deliveries` слоя `STG` 
    # в сласс данных таблицы `dm_deliveries` слоя `DDS` через JSON
    def parse_data(self, delivery_raw: DeliveryJSONObj, timestamp_id: int) -> DeliveryDDSObj:
        print('(i) dags\dds\delivery_loader.py, row 35, delivery_raw =', delivery_raw)

        t = DeliveryDDSObj(id=0,
                        delivery_id=delivery_raw.delivery_id,
                        timestamp_id=timestamp_id
                        )
        print('(i) dags\dds\delivery_loader.py, row 38, t (DeliveryDDSObj) =', t)

        return t

    # Метод чтения всех необходимых данных из всех необходимых таблиц 
    # и запись их в требуемые таблицы, связанные с таблицей `dm_deliveries` слоя `DDS`
    # в рамках таска "dm_delivery_load" дага "sprint5_case_dds_snowflake"
    def load_data(self):
        print('(i) dags\dds\delivery_loader.py, row 48, DeliveryLoader.load_data(self) =', self)
        with self.dwh.connection() as conn:
            print('(i) dags\dds\delivery_loader.py, row 51, conn =', conn)
            wf_setting = self.settings_repository.get_setting(conn, self.WF_KEY)
            if not wf_setting:
                wf_setting = EtlSetting(id=0, workflow_key=self.WF_KEY, workflow_settings={self.LAST_LOADED_ID_KEY: '-1'})
            print('(i) dags\dds\delivery_loader.py, row 53, wf_setting =', wf_setting)

            last_loaded_id = wf_setting.workflow_settings[self.LAST_LOADED_ID_KEY]
            print('(i) dags\dds\delivery_loader.py, row 58, last_loaded_id =', last_loaded_id)

            load_queue = self.raw.load_data(conn, last_loaded_id)
            load_queue.sort(key=lambda x: x.order_id)
            for delivery_raw in load_queue:
                print('(i) dags\dds\delivery_loader.py, row 63, delivery_raw =', delivery_raw)

                print('(i) dags\dds\delivery_loader.py, row 70, delivery_raw.order_ts =', delivery_raw.order_ts)
                dt = delivery_raw.order_ts.strftime("%Y-%m-%d %H:%M:%S")
                print('(i) dags\dds\delivery_loader.py, row 70, dt =', dt)
                timestamp = self.dds_timestamps.get_timestamp(conn, dt)
                if not timestamp:
                    continue
                print('(i) dags\dds\delivery_loader.py, row 69, timestamp =', timestamp)

                delivery_to_load = self.parse_data(delivery_raw, timestamp.id)
                print('(i) dags\dds\delivery_loader.py, row 84, delivery_to_load =', delivery_to_load)
                self.dds_deliveries.insert_data(conn, delivery_to_load)

                wf_setting.workflow_settings[self.LAST_LOADED_ID_KEY] = delivery_raw.order_id
                self.settings_repository.save_setting(conn, wf_setting)

—- delivery_repositories.py ¶

from typing import List, Optional
from datetime import datetime

from psycopg import Connection
from psycopg.rows import class_row
from pydantic import BaseModel


# Класс данных таблицы `get_deliveries` в слое `STG`
class DeliveryJSONObj(BaseModel):
    order_id: str
    order_ts: datetime
    delivery_id: str
    courier_id: str
    address: str
    delivery_ts: datetime
    rate: int
    tip_sum: float
    sum: float


# Класс взаимодействия с данными таблицы `get_deliveries` в слое `STG`
class DeliveryRawRepository:

    # Метод чтения данных из таблицы `get_deliveries` в слое `STG`
    def load_data(self, conn: Connection, last_loaded_record_id: str) -> List[DeliveryJSONObj]:
        print('(i) dags\dds\delivery_repositories.py, row 26, last_loaded_record_id =', last_loaded_record_id)
        print('(i) dags\dds\delivery_repositories.py, row 26, conn =', conn)
        with conn.cursor(row_factory=class_row(DeliveryJSONObj)) as cur:
            cur.execute(
                """
                    SELECT
                        order_id, 
                        order_ts, 
                        delivery_id, 
                        courier_id, 
                        address, 
                        delivery_ts, 
                        rate, 
                        tip_sum, 
                        sum
                    FROM stg.get_deliveries
                    WHERE order_id > %(last_loaded_record_id)s
                    ORDER BY order_id ASC;
                """,
                {"last_loaded_record_id": last_loaded_record_id},
            )
            objs = cur.fetchall()
        objs.sort(key=lambda x: x.order_id)
        print('(i) dags\dds\delivery_repositories.py, row 49, objs =', objs)
        return objs


# Класс данных таблицы `dm_deliveries` в слое `DDS`
class DeliveryDDSObj(BaseModel):
    id: int
    delivery_id: str
    timestamp_id: int


# Класс взаимодействия с данными таблицы `dm_deliveries` в слое `DDS`
class DeliveryDDSRepository:

    # Метод вставки данных в таблицу `dm_deliveries` в слое `DDS`
    def insert_data(self, conn: Connection, obj: DeliveryDDSObj) -> None:
        with conn.cursor() as cur:
            cur.execute(
                """
                    INSERT INTO dds.dm_deliveries(delivery_id, timestamp_id)
                    VALUES (%(delivery_id)s, %(timestamp_id)s)
                    ON CONFLICT (delivery_id) DO UPDATE
                    SET
                        delivery_id = EXCLUDED.delivery_id,
                        timestamp_id = EXCLUDED.timestamp_id
                    ;
                """,
                {
                    "delivery_id": obj.delivery_id,
                    "timestamp_id": obj.timestamp_id,
                },
            )

    # Метод чтения данных из таблицы `dm_deliveries` в слое `DDS`
    def get_data(self, conn: Connection, delivery_id: str) -> Optional[DeliveryDDSObj]:
        with conn.cursor(row_factory=class_row(DeliveryDDSObj)) as cur:
            cur.execute(
                """
                    SELECT
                        id,
                        delivery_id,
                        timestamp_id
                    FROM dds.dm_deliveries
                    WHERE delivery_id = %(delivery_id)s;
                """,
                {"delivery_id": delivery_id},
            )
            obj = cur.fetchone()
        return obj

    def list_data(self, conn: Connection) -> List[DeliveryDDSObj]:
        with conn.cursor(row_factory=class_row(DeliveryDDSObj)) as cur:
            cur.execute(
                """
                    SELECT id, delivery_id, timestamp_id
                    FROM dds.dm_deliveries;
                """
            )
            obj = cur.fetchall()
        return obj

—- fct_deliveries_loader.py ¶

import json
import logging
from datetime import datetime
from typing import Dict, List, Tuple

from lib import PgConnect
from psycopg import Connection
from pydantic import BaseModel

from dds.dds_settings_repository import DdsEtlSettingsRepository, EtlSetting
from dds.order_repositories import (OrderDdsObj, OrderDdsRepository)

from dds.dds_delivery.courier_repositories import (CourierDDSObj, 
                                                    CourierDDSRepository, 
                                                    CourierJSONObj, 
                                                    CourierRawRepository)
from dds.dds_delivery.delivery_repositories import (DeliveryDDSObj, 
                                                    DeliveryDDSRepository, 
                                                    DeliveryJSONObj, 
                                                    DeliveryRawRepository)

log = logging.getLogger(__name__)


# Класс данных таблицы `fct_product_deliveries` в слое `DDS`
class FctDeliveryDDSObj(BaseModel):
    id: int
    order_id: int
    delivery_id: int
    courier_id: int
    rate: float
    tips_sum: float


# Класс взаимодействия с данными таблицы фактов `fct_product_deliveries` в слое `DDS`
class FctDeliveryDDSRepository:

    # Метод вставки данных в таблицу фактов `fct_product_deliveries` в слое `DDS`
    def insert_data(self, conn: Connection, fact: List[FctDeliveryDDSObj]) -> None:
        print('(i) dags\dds\dds_delivery\fct_deliveries_loader.py, row 64, facts =', fact)
        with conn.cursor() as cur:
            print('(i) dags\dds\dds_delivery\fct_deliveries_loader.py, row 66, fact =', fact)
            cur.execute(
                """
                    INSERT INTO dds.fct_product_deliveries(
                        order_id,
                        delivery_id,
                        courier_id,
                        rate,
                        tips_sum
                    )
                    VALUES (
                        %(order_id)s,
                        %(delivery_id)s,
                        %(courier_id)s,
                        %(rate)s,
                        %(tips_sum)s
                    )
                    ON CONFLICT (order_id, delivery_id) DO UPDATE
                    SET
                        courier_id = EXCLUDED.courier_id,
                        rate = EXCLUDED.rate,
                        tips_sum = EXCLUDED.tips_sum
                    ;
                """,
                {
                    "order_id": fact.order_id,
                    "delivery_id": fact.delivery_id,
                    "courier_id": fact.courier_id,
                    "rate": fact.rate,
                    "tips_sum": fact.tips_sum
                },
            )


class FctDeliveryLoader:
    WF_KEY = "fact_product_deliveries_raw_to_dds_workflow"
    LAST_LOADED_ID_KEY = "last_loaded_order_id"

    _LOG_THRESHOLD = 100

    def __init__(self, pg: PgConnect, settings_repository: DdsEtlSettingsRepository) -> None:
        self.dwh = pg
        self.raw_deliveries = DeliveryRawRepository()
        self.dds_orders = OrderDdsRepository()
        self.dds_deliveries = DeliveryDDSRepository()
        self.dds_couriers = CourierDDSRepository()
        self.dds_facts = FctDeliveryDDSRepository()
        self.settings_repository = settings_repository

    # Метод преобразования данных таблицы `get_deliveries` слоя `STG` 
    # в сласс данных таблицы `fct_product_deliveries` слоя `DDS` через JSON
    # с использованием идентификаторов таблиц `dm_orders`, 
    # `dm_deliveries`, `dm_couriers` слоя `DDS`
    def parse_data(self,
                 order: OrderDdsObj,
                 delivery: DeliveryDDSObj,
                 courier: CourierDDSObj,
                 delivery_raw: FctDeliveryDDSObj
                 ) -> Tuple[bool, List[FctDeliveryDDSObj]]:
        print('(i) dags\dds\dds_delivery\fct_deliveries_loader.py, row 123, order =', order)
        print('(i) dags\dds\dds_delivery\fct_deliveries_loader.py, row 123, delivery =', delivery)
        print('(i) dags\dds\dds_delivery\fct_deliveries_loader.py, row 123, courier =', courier)
        print('(i) dags\dds\dds_delivery\fct_deliveries_loader.py, row 123, delivery_raw =', delivery_raw)
        if (delivery_raw.order_id != order.order_key 
        or delivery_raw.delivery_id != delivery.delivery_id 
        or delivery_raw.courier_id != courier.courier_id):
            return (False, [])

        print(f'(i) dags\dds\dds_delivery\fct_deliveries_loader.py, row 140, order.id = {order.id}, delivery.id = {delivery.id}, courier.id = {courier.id}, delivery_raw.rate = {delivery_raw.rate}, delivery_raw.tip_sum = {delivery_raw.tip_sum}')
        t = FctDeliveryDDSObj(id=0,
                            order_id=order.id,
                            delivery_id=delivery.id,
                            courier_id=courier.id,
                            rate=delivery_raw.rate * 1.,
                            tips_sum=delivery_raw.tip_sum
                            )

        return (True, t)

    def load_data(self):
        with self.dwh.connection() as conn:
            # Поиск последней записи этой таблицы в репозитории
            wf_setting = self.settings_repository.get_setting(conn, self.WF_KEY)
            if not wf_setting:
                wf_setting = EtlSetting(id=0, workflow_key=self.WF_KEY, workflow_settings={self.LAST_LOADED_ID_KEY: '-1'})

            # Идентификатор последней записи этой таблицы в репозитории
            last_loaded_id = wf_setting.workflow_settings[self.LAST_LOADED_ID_KEY]
            log.info(f"Starting load from: {last_loaded_id}")

            # Получение данных числовых данных из таблицы `get_deliveries` слоя `STG`.
            # Сортировка и поиск по идентификатору доставки `order_id`
            load_queue = self.raw_deliveries.load_data(conn, last_loaded_id)
            load_queue.sort(key=lambda x: x.order_id)
            log.info(f"Found {len(load_queue)} events to load.")

            proc_cnt = 0
            for delivery_raw in load_queue:

                # Получение данных о продаже из таблицы `dm_orders`
                order = self.dds_orders.get_order(conn, delivery_raw.order_id)
                if not order:
                    log.info(f"Not found order {delivery_raw.order_id}. Finishing.")
                    continue

                # Получение данных о доставке из таблицы `dm_deliveries`
                delivery = self.dds_deliveries.get_data(conn, delivery_raw.delivery_id)
                if not delivery:
                    log.info(f"Not found order {delivery_raw.delivery_id}. Finishing.")
                    continue

                # Получение данных о курьере из таблицы `dm_couriers`
                courier = self.dds_couriers.get_data(conn, delivery_raw.courier_id)
                if not courier:
                    log.info(f"Not found order {delivery_raw.courier_id}. Finishing.")
                    continue

                (success, facts_to_load) = self.parse_data(order, delivery, courier, delivery_raw)
                if not success:
                    log.info(f"Could not parse object for order {order.id},  delivery {delivery.id},  courier {courier.id}. Finishing.")
                    break

                print('(i) dags\dds\dds_delivery\fct_deliveries_loader.py, row 202, facts_to_load =', facts_to_load)
                self.dds_facts.insert_data(conn, facts_to_load)

                wf_setting.workflow_settings[self.LAST_LOADED_ID_KEY] = delivery_raw.order_id
                self.settings_repository.save_setting(conn, wf_setting)

                proc_cnt += 1
                if proc_cnt % self._LOG_THRESHOLD == 0:
                    log.info(f"Processing events {proc_cnt} out of {len(load_queue)}.")

            log.info(f"Processed {proc_cnt} events out of {len(load_queue)}.")

— dds_settings_repository.py ¶

from typing import Dict, Optional

from lib.dict_util import json2str
from psycopg import Connection
from psycopg.rows import class_row
from pydantic import BaseModel


class EtlSetting(BaseModel):
    id: int
    workflow_key: str
    workflow_settings: Dict


class DdsEtlSettingsRepository:
    def get_setting(self, conn: Connection, etl_key: str) -> Optional[EtlSetting]:
        with conn.cursor(row_factory=class_row(EtlSetting)) as cur:
            cur.execute(
                """
                    SELECT
                        id,
                        workflow_key,
                        workflow_settings
                    FROM dds.srv_wf_settings
                    WHERE workflow_key = %(etl_key)s;
                """,
                {"etl_key": etl_key},
            )
            obj = cur.fetchone()

        return obj

    def save_setting(self, conn: Connection, sett: EtlSetting) -> None:
        with conn.cursor() as cur:
            cur.execute(
                """
                    INSERT INTO dds.srv_wf_settings(workflow_key, workflow_settings)
                    VALUES (%(wf_key)s, %(wf_setting)s)
                    ON CONFLICT (workflow_key) DO UPDATE
                    SET workflow_settings = EXCLUDED.workflow_settings;
                """,
                {
                    "wf_key": sett.workflow_key,
                    "wf_setting": json2str(sett.workflow_settings)
                },
            )

— dds_snowflake_dag.py ¶

import logging

import pendulum
from airflow import DAG
from airflow.decorators import task
from config_const import ConfigConst
from lib import ConnectionBuilder

from dds.dds_settings_repository import DdsEtlSettingsRepository
from dds.fct_products_loader import FctProductsLoader
from dds.order_loader import OrderLoader
from dds.products_loader import ProductLoader
from dds.restaurant_loader import RestaurantLoader
from dds.schema_ddl import SchemaDdl
from dds.timestamp_loader import TimestampLoader
from dds.user_loader import UserLoader
#----- Проект ------
from dds_delivery import CourierLoader
from dds_delivery import DeliveryLoader
from dds_delivery import FctDeliveryLoader
#-------------------

log = logging.getLogger(__name__)

with DAG(
    dag_id='sprint5_case_dds_snowflake',
    schedule_interval='* 0/15 * * *',
    start_date=pendulum.datetime(2022, 5, 5, tz="UTC"),
    catchup=False,
    tags=['sprint5', 'raw', 'dds'],
    is_paused_upon_creation=False
) as dag:
    dwh_pg_connect = ConnectionBuilder.pg_conn(ConfigConst.PG_WAREHOUSE_CONNECTION)

    settings_repository = DdsEtlSettingsRepository()

    @task(task_id="schema_init")
    def schema_init(ds=None, **kwargs):
        rest_loader = SchemaDdl(dwh_pg_connect)
        rest_loader.init_schema()

    @task(task_id="dm_restaurants_load")
    def load_dm_restaurants(ds=None, **kwargs):
        rest_loader = RestaurantLoader(dwh_pg_connect, settings_repository)
        rest_loader.load_restaurants()

    @task(task_id="dm_products_load")
    def load_dm_products(ds=None, **kwargs):
        prod_loader = ProductLoader(dwh_pg_connect, settings_repository)
        prod_loader.load_products()

    @task(task_id="dm_timestamps_load")
    def load_dm_timestamps(ds=None, **kwargs):
        ts_loader = TimestampLoader(dwh_pg_connect, settings_repository)
        ts_loader.load_timestamps()

    @task(task_id="dm_users_load")
    def load_dm_users(ds=None, **kwargs):
        user_loader = UserLoader(dwh_pg_connect, settings_repository)
        user_loader.load_users()

    @task(task_id="dm_orders_load")
    def load_dm_orders(ds=None, **kwargs):
        order_loader = OrderLoader(dwh_pg_connect, settings_repository)
        order_loader.load_orders()

    @task(task_id="fct_order_products_load")
    def load_fct_order_products(ds=None, **kwargs):
        fct_loader = FctProductsLoader(dwh_pg_connect, settings_repository)
        print('(0) fct_loader =', fct_loader)
        fct_loader.load_product_facts()

    #----- Проект ------
    @task(task_id="dm_couriers_load")
    def load_dm_couriers(ds=None, **kwargs):
        data_loader = CourierLoader(dwh_pg_connect, settings_repository)
        data_loader.load_data()

    @task(task_id="dm_delivery_load")
    def load_dm_deliveries(ds=None, **kwargs):
        data_loader = DeliveryLoader(dwh_pg_connect, settings_repository)
        print('(i) dags\dds_snowflake_dag.py, row 81, data_loader (task_id="dm_delivery_load") =', data_loader)
        data_loader.load_data()

    @task(task_id="fct_delivery_products_load")
    def load_fct_delivery_products(ds=None, **kwargs):
        data_loader = FctDeliveryLoader(dwh_pg_connect, settings_repository)
        data_loader.load_data()
    #-------------------

    init_schema = schema_init()
    dm_restaurants = load_dm_restaurants()
    dm_products = load_dm_products()
    dm_timestamps = load_dm_timestamps()
    dm_users = load_dm_users()
    dm_orders = load_dm_orders()
    fct_order_products = load_fct_order_products()
    #----- Проект ------
    dm_couriers = load_dm_couriers()
    dm_deliveries = load_dm_deliveries()
    fct_delivery_products = load_fct_delivery_products()
    #-------------------

    init_schema >> dm_restaurants  # type: ignore
    init_schema >> dm_timestamps  # type: ignore
    init_schema >> dm_users  # type: ignore
    init_schema >> dm_products  # type: ignore
    init_schema >> dm_orders  # type: ignore

    dm_restaurants >> dm_products  # type: ignore
    dm_restaurants >> dm_orders  # type: ignore
    dm_timestamps >> dm_orders  # type: ignore
    dm_users >> dm_orders  # type: ignore
    dm_products >> fct_order_products  # type: ignore
    dm_orders >> fct_order_products  # type: ignore
    #----- Проект ------
    dm_orders >> dm_deliveries  # type: ignore
    dm_timestamps >> dm_deliveries  # type: ignore
    dm_deliveries >> fct_delivery_products  # type: ignore
    dm_couriers >> fct_delivery_products  # type: ignore
    #-------------------

— timestamp_loader.py ¶

import json
from datetime import date, datetime, time
from typing import Optional

from lib import PgConnect
from psycopg import Connection
from psycopg.rows import class_row
from pydantic import BaseModel

from dds.dds_settings_repository import DdsEtlSettingsRepository, EtlSetting
from dds.order_repositories import OrderJsonObj, OrderRawRepository


class TimestampDdsObj(BaseModel):
    id: int
    ts: datetime
    year: int
    month: int
    day: int
    time: time
    date: date


class TimestampDdsRepository:
    def insert_dds_timestamp(self, conn: Connection, timestamp: TimestampDdsObj) -> None:
        with conn.cursor() as cur:
            cur.execute(
                """
                    INSERT INTO dds.dm_timestamps(ts, year, month, day, time, date)
                    VALUES (%(ts)s, %(year)s, %(month)s, %(day)s, %(time)s, %(date)s)
                    ON CONFLICT (ts) DO NOTHING;
                """,
                {
                    "ts": timestamp.ts,
                    "year": timestamp.year,
                    "month": timestamp.month,
                    "day": timestamp.day,
                    "time": timestamp.time,
                    "date": timestamp.date
                },
            )

    def get_timestamp(self, conn: Connection, dt: datetime) -> Optional[TimestampDdsObj]:
        with conn.cursor(row_factory=class_row(TimestampDdsObj)) as cur:
            cur.execute(
                """
                    SELECT id, ts, year, month, day, time, date
                    FROM dds.dm_timestamps
                    WHERE ts = %(dt)s;
                """,
                {"dt": dt},
            )
            obj = cur.fetchone()
            print('(i) dags\dds\timestamp_loader.py, row 53, obj =', obj)
        return obj


class TimestampLoader:
    WF_KEY = "timestamp_raw_to_dds_workflow"
    LAST_LOADED_ID_KEY = "last_loaded_order_id"

    def __init__(self, pg: PgConnect, settings_repository: DdsEtlSettingsRepository) -> None:
        self.dwh = pg
        self.raw_orders = OrderRawRepository()
        self.dds = TimestampDdsRepository()
        self.settings_repository = settings_repository

    def parse_order_ts(self, order_raw: OrderJsonObj) -> TimestampDdsObj:
        order_json = json.loads(order_raw.object_value)
        dt = datetime.strptime(order_json['date'], "%Y-%m-%d %H:%M:%S")
        t = TimestampDdsObj(id=0,
                            ts=dt,
                            year=dt.year,
                            month=dt.month,
                            day=dt.day,
                            time=dt.time(),
                            date=dt.date()
                            )

        return t

    def load_timestamps(self):
        with self.dwh.connection() as conn:
            wf_setting = self.settings_repository.get_setting(conn, self.WF_KEY)
            if not wf_setting:
                wf_setting = EtlSetting(id=0, workflow_key=self.WF_KEY, workflow_settings={self.LAST_LOADED_ID_KEY: -1})

            last_loaded_id = wf_setting.workflow_settings[self.LAST_LOADED_ID_KEY]

            load_queue = self.raw_orders.load_raw_orders(conn, last_loaded_id)
            for order in load_queue:

                ts_to_load = self.parse_order_ts(order)
                self.dds.insert_dds_timestamp(conn, ts_to_load)

                wf_setting.workflow_settings[self.LAST_LOADED_ID_KEY] = order.id
                self.settings_repository.save_setting(conn, wf_setting)

— cdm ¶

— cdm_courier_ledger ¶

—- __init__.py ¶

import os
import sys

sys.path.append(os.path.dirname(os.path.realpath(__file__)))

—- cdm_courier_ledger_report_dag.py ¶

import pendulum
from airflow.decorators import dag, task
from config_const import ConfigConst
from lib import ConnectionBuilder

from cdm.cdm_courier_ledger.courier_ledger_report import (CourierLedgerRepository, CourierLedgerReportLoader)


@dag(
    dag_id='cdm_courier_ledger_report_dag',
    schedule_interval='0/30 * * * *',
    start_date=pendulum.datetime(2022, 5, 5, tz="UTC"),
    catchup=False,
    tags=['sprint5', 'cdm', 'courier_ledger'],
    is_paused_upon_creation=False
)
def cdm_courier_ledger_report():
    dwh_pg_connect = ConnectionBuilder.pg_conn(ConfigConst.PG_WAREHOUSE_CONNECTION)

    @task(task_id="cdm_courier_ledger_report_load")
    def load_cdm_courier_ledger_report():
        rest_loader = CourierLedgerReportLoader(dwh_pg_connect)
        rest_loader.insert_data()

    report = load_cdm_courier_ledger_report()

    report  # type: ignore


cdm_courier_ledger_dag = cdm_courier_ledger_report()  # noqa

—- courier_ledger_report.py ¶

from lib import PgConnect


class CourierLedgerRepository:
    def __init__(self, pg: PgConnect) -> None:
        self._db = pg

    def insert_data(self) -> None:
        with self._db.client() as conn:
            with conn.cursor() as cur:
                execute = """
INSERT INTO cdm.dm_courier_ledger(
    id,
    courier_id,
    courier_name,
    settlement_year,
    settlement_month,
    orders_count,
    orders_total_sum,
    rate_avg,
    courier_tips_sum
)
SELECT 	fpd.id, 
        fpd.courier_id, 
        dmc.courier_name, 
        dmt.year AS settlement_year, 
        dmt.month AS settlement_month, 
        COUNT(fpd.order_id) AS orders_count, 
        SUM(fps.total_sum) AS orders_total_sum, 
        AVG(fpd.rate) AS rate_avg,
        SUM(fpd.tips_sum) AS courier_tips_sum
    FROM dds.fct_product_deliveries fpd
    LEFT JOIN dds.dm_orders dmo ON dmo.id = fpd.order_id
    LEFT JOIN dds.dm_couriers dmc ON dmc.id = fpd.courier_id
    LEFT JOIN dds.dm_timestamps dmt ON dmt.id = dmo.timestamp_id
    LEFT JOIN dds.fct_product_sales fps ON fps.order_id = fpd.order_id
    GROUP BY fpd.id, fpd.courier_id, dmc.courier_name, dmt.year, dmt.month
ON CONFLICT (id) DO UPDATE
SET
    courier_id = EXCLUDED.courier_id,
    courier_name = EXCLUDED.courier_name,
    settlement_year = EXCLUDED.settlement_year,
    settlement_month = EXCLUDED.settlement_month,
    orders_count = EXCLUDED.orders_count,
    orders_total_sum = EXCLUDED.orders_total_sum,
    rate_avg = EXCLUDED.rate_avg,
    courier_tips_sum = EXCLUDED.courier_tips_sum;
                    """
                cur.execute(execute)
                conn.commit()


class CourierLedgerReportLoader:

    def __init__(self, pg: PgConnect) -> None:
        self.repository = CourierLedgerRepository(pg)

    def insert_data(self):
        self.repository.insert_data()

Политика конфиденциальности

Продолжая использовать данный сайт вы подтверждаете свое согласие с условиями его политики конфиденциальности. Подробнее…




Администрация и владельцы данного информационного ресурса не несут ответственности за возможные последствия, связанные с использованием информации, размещенной на нем.


Все права защищены. При копировании материалов сайта обязательно указывать ссылку на © Microsegment.ru (2020-2025)