Это проект построения витрины данных для расчета с курьерами. В витрине требуется рассчитать суммы оплаты каждому курьеру за предыдущий месяц. В рамках проекта требуется доработать существующее хранилище DWH и создать ETL процессы для перемещения и трансформации данных от источников к витрине.
Задачи проекта:
- Изучить источники данных, их внутреннюю модель хранения и технологии извлечения данных.
- Спроектировать многослойную модель DWH на основе требований к проекту.
- Построить проектную часть хранилища.
- Создать ETL для перемещения и трансформации данных.
Анализ данных проекта ¶
Получение данных по API ¶
GET /restaurants ¶
Метод возвращает данные о ресторанах:
_id
— ID задачи;name
— название ресторана.
Вариант кода на bash:
curl --location --request GET 'https://d5d04q7d963eapoepsqr.apigw.yandexcloud.net/restaurants?sort_field={{ sort_field }}&sort_direction={{ sort_direction }}&limit={{ limit }}&offset={{ offset }}' \
--header 'X-Nickname: {{ mailforal }}' \
--header 'X-Cohort: {{ 32 }}' \
--header 'X-API-KEY: {{ 25c27781-8fde-4b30-a22e-524044a7580f }}'
GET /couriers ¶
Метод возвращает данные о курьерах:
_id
— ID курьера в БД;name
— имя курьера.
Вариант кода на bash:
curl --location --request GET 'https://d5d04q7d963eapoepsqr.apigw.yandexcloud.net/couriers?sort_field={{ sort_field }}&sort_direction={{ sort_direction }}&limit={{ limit }}&offset={{ offset }}' \
--header 'X-Nickname: {{ your_nickname }}' \
--header 'X-Cohort: {{ your_cohort_number }}' \
--header 'X-API-KEY: {{ api_key }}'
GET /deliveries ¶
Метод возвращает данные о доставках:
order_id
— ID заказа;order_ts
— дата и время создания заказа;delivery_id
— ID доставки;courier_id
— ID курьера;address
— адрес доставки;delivery_ts
— дата и время совершения доставки;rate
— рейтинг доставки, который выставляет покупатель: целочисленное значение от 1 до 5;tip_sum
— сумма чаевых, которые оставил покупатель курьеру (в руб.).sum
— сумма заказа. Это поле не понадобится вам для выполнения проекта, поскольку это число вы и так получите из подсистемы заказов (MongoDB). Так бывает: не всегда нужно использовать всю доступную информацию.
Вариант кода на bash:
curl --location --request GET 'https://d5d04q7d963eapoepsqr.apigw.yandexcloud.net/deliveries?restaurant_id={{ restaurant_id }}&from={{ from }}&to={{ to }}&sort_field={{ sort_field }}&sort_direction={{ sort_direction }}&limit={{ limit }}&offset={{ limit }}' \
--header 'X-Nickname: {{ your_nickname }}' \
--header 'X-Cohort: {{ your_cohort_number }}' \
--header 'X-API-KEY: {{ api_key }}'
Результаты анализа данных, полученных по API ¶
По API с помощью Python получены данные о ресторанах, курьерах и транзакциях. Данные размещены в pandas-датафреймах get_restaurants
, get_deliveries
и get_couriers
, которые соответствуют таблицам хранилища stg.get_restaurants
, stg.get_deliveries
и stg.get_couriers
соответственно. Произведен анализа полученных данных:
stg.get_restaurants
(данные о ресторанах):- Данные содержат на момент их анализа только 4 записи.
- Данные не содержат пропусков и дублей.
- Данные из этой таблицы не востребованы в витринах этого хранилища, т.к. они дублируют данные таблицы
dds.dm_restaurants
. Следовательно, данные из этой таблицы не требуется загружать в хранилище и создавать для них таблицу.
stg.get_couriers
(данные о курьерах):- Даные содержат на момент их анализа только 50 записей.
- Данные не содержат пропусков, но содержат 6 не уникальных значений в поле
name
. Учитывая тот факт, что у каждой записи уникальное значениеid
, можно предположить, либо ошибку оператора, приведщую к наличию дублей записей, либо наличие полных тёсок в команде курьеров. Учитывая отсутствие опровержения наличия тёсок, при построении хранилища каждая запись будет считаться уникальной до получения информации о наличии в какой-либо записи дублей.
stg.get_deliveries
(данные о доставках за январь 2025 года):- Даные содержат на момент их анализа только 50 записей.
- В данных нет пропусков, но есть 10 повторяющихся значений в поле
courier_id
(идентификатор курьера). Наличие повторяющихся значений в полеcourier_id
может свидетельствовать о том, что 10 курьеров доставили от 2 до 3 заказов. - Данные о 28 курьерах, идентификаторы которых содержаться в поле
courier_id
этой таблицы, отсутствуют в поле_id
таблицыstg.get_couriers
. Риск отсутствия полных данных письменно принят заказчиком. Для обеспечения полноты и непротиворечивости данных в слоеDDS
хранилища требуется идентификаторы курьеров, данные о которых отсутствуют в таблицеdds.dm_couriers
, dcnfdbnm туда изstg.get_deliveries
.
Проектирование витрины хранилища для анализа данных о расчетах с курьерами ¶
Слой STG
¶
Данные о курьерах и продажах, полученные по API и предназначенные для витрины анализа расчетов с курьерами, требуется разместить «как есть» в слое STG
в таблицах stg.get_couriers
и stg.get_deliveries
. При этом, данные о ресторанах загружать не требуется, т.к. они не востребованы в проектируемой витрине и дублируют более детальные данные в таблице dds.dm_restaurants
.
Слой DDS
¶
Данные из слоя STG
, обогащенные суррогатными идентификаторами таблиц хранилища и более детальными данными о дате и времени продаж, требуется разместить в слое DDS
по модели «снежинка». Числовые данные (rate_avg
, courier_tips_sum
) и внешние ключи (order_id
, delivery_id
и courier_id
(уникальные идентификаторы в исходных системах)) для связки с таблицами измерений должны быть загружены в таблицу фактов dds.fct_product_deliveries
. Также в эту таблицу требуется добавить суррогатный идентификатор таблицы id
, а поля order_id
, delivery_id
сделать уникальными.
Остальные данные требуется разместить в таблицы измерений:
id
(суррогатный идентификатор таблицы),courier_id
(уникальный идентификатор курьера в исходной системе) иcourier_name
(ФИО курьера) разместить в таблицеdds.dm_couriers
.- Данные о дате и времени продажи, обогащенные разделением значения на год, месяц, дату, время и день, а также уникальным идентификатором разместить в таблице
dds.dm_timestamps
id
(суррогатный идентификатор таблицы),delivery_id
(уникальный идентификатор доставки в исходной системе) иtimestamp_id
(идентификатор записи времени в таблицеdds.dm_timestamps
) разместить в таблицеdds.dm_deliveries
.
Слой CDM
¶
Данные из слоя DDS
, обогащенные расчетными данными требуется разместить в таблице cdm.dm_courier_ledger
витрины в слое CDM
. Состав витрины:
id
— (поле группировки) идентификатор записи. Данные изid
таблицыdds.fct_product_deliveries
.courier_id
— (поле группировки) ID курьера, которому перечисляем. Данные изcourier_id
таблицыdds.dm_couriers
.courier_name
— (поле группировки) Ф. И. О. курьера. Данные изcourier_name
таблицыdds.dm_couriers
.settlement_year
— (поле группировки) год отчёта. Данные изyear
таблицыdds.dm_timestamps
.settlement_month
— (поле группировки) месяц отчёта, где 1 — январь и 12 — декабрь. Данные изmonth
таблицыdds.dm_timestamps
.orders_count
— (расчетное поле) количество заказов за период (месяц). Количество записей в полеid
таблицыdds.dm_orders
, сгруппированных по годам, месяцам и идентификаторам курьеров.orders_total_sum
— (расчетное поле) общая стоимость заказов. Сумма данных из поляtotal_sum
таблицыdds.fct_product_sales
, сгруппированных по годам, месяцам и идентификаторам курьеров.rate_avg
— (расчетное поле) средний рейтинг курьера по оценкам пользователей. Среднее значение данных из поляrate
таблицыdds.fct_product_deliveries
, сгруппированных данных по годам, месяцам и идентификаторам курьеров.order_processing_fee
— (расчетное поле с условием) сумма, удержанная компанией за обработку заказов, которая высчитывается какorders_total_sum * 0.25
.courier_order_sum
— (расчетное поле с условием) сумма, которую необходимо перечислить курьеру за доставленные им/ей заказы. За каждый доставленный заказ курьер должен получить некоторую сумму в зависимости от рейтинга (см. ниже).courier_tips_sum
— (расчетное поле) сумма, которую пользователи оставили курьеру в качестве чаевых. Сумма данных из поляcourier_tips_sum
таблицыdds.fct_product_deliveries
, сгруппированных по годам, месяцам и идентификаторам курьеров.courier_reward_sum
— (расчетное поле с условием) сумма, которую необходимо перечислить курьеру. Вычисляется какcourier_order_sum + courier_tips_sum * 0.95
(5% — комиссия за обработку платежа).
Правила расчёта процента выплаты курьеру в зависимости от рейтинга, где r
— это средний рейтинг курьера в расчётном месяце:
r < 4
— 5% от заказа, но не менее 100 р.;4 <= r < 4.5
— 7% от заказа, но не менее 150 р.;4.5 <= r < 4.9
— 8% от заказа, но не менее 175 р.;4.9 <= r
— 10% от заказа, но не менее 200 р.
DDL для реструктуризации хранилища ¶
DDL скрипты для реструктуризации хранилища реализованы в файле ddl/api_ddl.sql
в дирректории файлов этого проекта.
Файлы проекта ¶
— ddl
¶
— api_ddl.sql
¶
--BEGIN;
-- DDL STG
-- Очистка слоя STG
ALTER TABLE stg.get_couriers DROP CONSTRAINT IF EXISTS get_couriers_id_pk;
DROP TABLE IF EXISTS stg.get_couriers;
ALTER TABLE stg.get_deliveries DROP CONSTRAINT IF EXISTS stg_get_deliveries_order_id_pk;
ALTER TABLE stg.get_deliveries DROP CONSTRAINT IF EXISTS stg_get_deliveries_rate_check;
DROP TABLE IF EXISTS stg.get_deliveries;
-- Таблица сырых данных о ресторанах stg.get_restaurants в слое STG
-- В рамках данного проекта таблица не требуется,
-- т.к. данные из нее не используются в витринах
/*
ALTER TABLE stg.get_restaurants DROP CONSTRAINT IF EXISTS stg_get_restaurants_id_pk;
DROP TABLE IF EXISTS stg.get_restaurants;
CREATE TABLE IF NOT EXISTS stg.get_restaurants(
_id VARCHAR NOT NULL,
name VARCHAR NOT NULL,
CONSTRAINT stg_get_restaurants_id_pk PRIMARY KEY (_id)
);
*/
-- Таблица сырых данных о курьерах stg.get_couriers в слое STG
CREATE TABLE IF NOT EXISTS stg.get_couriers(
id VARCHAR NOT NULL,
name VARCHAR NOT NULL,
CONSTRAINT get_couriers_id_pk PRIMARY KEY (id)
);
-- Таблица сырых данных о доставках stg.get_deliveries в слое STG
CREATE TABLE IF NOT EXISTS stg.get_deliveries(
order_id VARCHAR NOT NULL, -- ID заказа;
order_ts TIMESTAMP NOT NULL, -- дата и время создания заказа;
delivery_id VARCHAR NOT NULL, -- ID доставки;
courier_id VARCHAR NOT NULL, -- ID курьера;
address VARCHAR NOT NULL, -- адрес доставки;
delivery_ts TIMESTAMP NOT NULL, -- дата и время совершения доставки;
rate INT NOT NULL, -- рейтинг доставки, который выставляет покупатель: целочисленное значение от 1 до 5;
tip_sum NUMERIC(19, 5) NOT NULL, -- сумма чаевых, которые оставил покупатель курьеру (в руб.).
sum NUMERIC(19, 5) NOT NULL, -- сумма заказа. Это поле не понадобится вам для выполнения проекта, поскольку это число вы и так получите из подсистемы заказов (MongoDB). Так бывает: не всегда нужно использовать всю доступную информацию.
CONSTRAINT stg_get_deliveries_order_id_pk PRIMARY KEY (order_id),
CONSTRAINT stg_get_deliveries_rate_check CHECK ((rate >= 1) AND (rate <= 5))
);
-- DDL DDS
-- Очистка слоя DDS
ALTER TABLE dds.fct_product_deliveries DROP CONSTRAINT IF EXISTS dds_fct_product_deliveries_id_pk;
ALTER TABLE dds.fct_product_deliveries DROP CONSTRAINT IF EXISTS dds_fct_product_deliveries_order_id_fk;
ALTER TABLE dds.fct_product_deliveries DROP CONSTRAINT IF EXISTS dds_fct_product_delivery_id_id_fk;
ALTER TABLE dds.fct_product_deliveries DROP CONSTRAINT IF EXISTS dds_fct_product_courier_id_id_fk;
ALTER TABLE dds.fct_product_deliveries DROP CONSTRAINT IF EXISTS dds_fct_product_deliveries_order_id_and_delivery_id_unique;
DROP TABLE IF EXISTS dds.fct_product_deliveries;
ALTER TABLE dds.dm_couriers DROP CONSTRAINT IF EXISTS dds_dm_couriers_id_pk;
ALTER TABLE dds.dm_couriers DROP CONSTRAINT IF EXISTS dds_dm_couriers_courier_id_unique;
DROP TABLE IF EXISTS dds.dm_couriers
ALTER TABLE dds.dm_deliveries DROP CONSTRAINT IF EXISTS dds_dm_deliveries_delivery_id_unique;
ALTER TABLE dds.dm_deliveries DROP CONSTRAINT IF EXISTS dds_dm_deliveries_id_pk;
ALTER TABLE dds.dm_deliveries DROP CONSTRAINT IF EXISTS dds_dm_deliveries_timestamp_id_fk;
DROP TABLE IF EXISTS dds.dm_deliveries;
-- Таблица измерений с данными о курьрерах stg.get_couriers в слое DDS
CREATE TABLE IF NOT EXISTS dds.dm_couriers(
id SERIAL NOT NULL,
courier_id VARCHAR NOT NULL,
courier_name VARCHAR NOT NULL,
CONSTRAINT dds_dm_couriers_id_pk PRIMARY KEY (id),
CONSTRAINT dds_dm_couriers_courier_id_unique UNIQUE (courier_id)
);
-- Таблица измерений с данными о доставке dds.dm_deliveries в слое DDS
CREATE TABLE IF NOT EXISTS dds.dm_deliveries(
id SERIAL NOT NULL,
delivery_id VARCHAR NOT NULL,
timestamp_id INT NOT NULL,
CONSTRAINT dds_dm_deliveries_delivery_id_unique UNIQUE (delivery_id),
CONSTRAINT dds_dm_deliveries_id_pk PRIMARY KEY (id),
CONSTRAINT dds_dm_deliveries_timestamp_id_fk FOREIGN KEY (timestamp_id) REFERENCES dds.dm_timestamps(id)
);
-- Таблица фактов с данными о доставках dds.fct_product_deliveries в слое DDS
CREATE TABLE IF NOT EXISTS dds.fct_product_deliveries(
id SERIAL NOT NULL,
order_id INT NOT NULL,
delivery_id INT NOT NULL,
courier_id INT NOT NULL,
rate NUMERIC(5, 2) NOT NULL,
tips_sum NUMERIC(19, 5) NOT NULL,
CONSTRAINT dds_fct_product_deliveries_id_pk PRIMARY KEY (id),
CONSTRAINT dds_fct_product_deliveries_order_id_fk FOREIGN KEY (order_id) REFERENCES dds.dm_orders(id),
CONSTRAINT dds_fct_product_delivery_id_id_fk FOREIGN KEY (delivery_id) REFERENCES dds.dm_deliveries(id),
CONSTRAINT dds_fct_product_courier_id_id_fk FOREIGN KEY (courier_id) REFERENCES dds.dm_couriers(id),
CONSTRAINT dds_fct_product_deliveries_order_id_and_delivery_id_unique UNIQUE (order_id, delivery_id)
);
-- DDL CDM
-- Очистка слоя CDM
ALTER TABLE cdm.dm_courier_ledger DROP CONSTRAINT IF EXISTS cdm_dm_courier_ledger_id_pk;
ALTER TABLE cdm.dm_courier_ledger DROP CONSTRAINT IF EXISTS cdm_dm_courier_ledger_settlement_year_check;
ALTER TABLE cdm.dm_courier_ledger DROP CONSTRAINT IF EXISTS cdm_dm_courier_ledger_settlement_month_check;
ALTER TABLE cdm.dm_courier_ledger DROP CONSTRAINT IF EXISTS cdm_dm_courier_ledger_order_processing_fee_check;
ALTER TABLE cdm.dm_courier_ledger DROP CONSTRAINT IF EXISTS cdm_dm_courier_ledger_courier_order_sum_check;
ALTER TABLE cdm.dm_courier_ledger DROP CONSTRAINT IF EXISTS cdm_dm_courier_ledger_courier_reward_sum_check;
DROP TABLE IF EXISTS cdm.dm_courier_ledger;
-- Таблица витрины оплаты кульерам dm_courier_ledger в слое CDM
CREATE TABLE IF NOT EXISTS cdm.dm_courier_ledger(
id INT NOT NULL, -- идентификатор записи.
courier_id VARCHAR(100) NOT NULL, -- ID курьера, которому перечисляем.
courier_name VARCHAR(100), -- Ф. И. О. курьера.
settlement_year INT NOT NULL, -- год отчёта.
settlement_month SMALLINT NOT NULL, -- месяц отчёта, где 1 — январь и 12 — декабрь.
orders_count INT, -- количество заказов за период (месяц).
orders_total_sum NUMERIC(19, 5), -- общая стоимость заказов.
rate_avg NUMERIC(5, 2), -- средний рейтинг курьера по оценкам пользователей.
order_processing_fee NUMERIC(19, 5), -- сумма, удержанная компанией за обработку заказов, которая высчитывается как orders_total_sum * 0.25.
courier_order_sum NUMERIC(19, 5), -- сумма, которую необходимо перечислить курьеру за доставленные им/ей заказы. За каждый доставленный заказ курьер должен получить некоторую сумму в зависимости от рейтинга (см. ниже).
courier_tips_sum NUMERIC(19, 5), -- сумма, которую пользователи оставили курьеру в качестве чаевых.
courier_reward_sum NUMERIC(19, 5), -- сумма, которую необходимо перечислить курьеру. Вычисляется как courier_order_sum + courier_tips_sum * 0.95 (5% — комиссия за обработку платежа).
CONSTRAINT cdm_dm_courier_ledger_id_pk PRIMARY KEY (id),
CONSTRAINT cdm_dm_courier_ledger_settlement_year_check CHECK ((settlement_year >= 2022) AND (settlement_year < 2500)),
CONSTRAINT cdm_dm_courier_ledger_settlement_month_check CHECK ((settlement_month >= 1) AND (settlement_month <= 12)),
CONSTRAINT cdm_dm_courier_ledger_order_processing_fee_check CHECK (order_processing_fee = orders_total_sum * .25),
CONSTRAINT cdm_dm_courier_ledger_courier_order_sum_check CHECK (((courier_order_sum >= orders_total_sum * .05) AND (courier_order_sum >= 100)) AND
((courier_order_sum <= orders_total_sum * .1) OR (courier_order_sum <= 200))),
CONSTRAINT cdm_dm_courier_ledger_courier_reward_sum_check CHECK (courier_reward_sum = courier_order_sum + courier_tips_sum * .95)
);
-- Функция расчета полей таблицы dds.fct_product_deliveries
CREATE OR REPLACE FUNCTION cdm_dm_courier_ledger_function()
RETURNS TRIGGER AS $$
BEGIN
-- order_processing_fee — сумма, удержанная компанией за обработку заказов,
-- которая высчитывается как orders_total_sum * 0.25.
NEW.order_processing_fee = NEW.orders_total_sum * 0.25;
-- courier_order_sum — сумма, которую необходимо перечислить курьеру
-- за доставленные им/ей заказы. За каждый доставленный заказ
-- курьер должен получить некоторую сумму в зависимости от рейтинга (rate_avg).
-- Правила расчёта процента выплаты курьеру в зависимости от рейтинга, где r — это средний рейтинг курьера в расчётном месяце:
-- r < 4 — 5% от заказа, но не менее 100 р.;
-- 4 <= r < 4.5 — 7% от заказа, но не менее 150 р.;
-- 4.5 <= r < 4.9 — 8% от заказа, но не менее 175 р.;
-- 4.9 <= r — 10% от заказа, но не менее 200 р.
IF (NEW.rate_avg < 4) THEN
NEW.courier_order_sum = NEW.orders_total_sum * 0.05;
IF (NEW.courier_order_sum < 100) THEN
NEW.courier_order_sum = 100;
END IF;
ELSIF (NEW.rate_avg >= 4 AND NEW.rate_avg < 4.5) THEN
NEW.courier_order_sum = NEW.orders_total_sum * 0.07;
IF (NEW.courier_order_sum < 150) THEN
NEW.courier_order_sum = 150;
END IF;
ELSIF (NEW.rate_avg >= 4.5 AND NEW.rate_avg < 4.9) THEN
NEW.courier_order_sum = NEW.orders_total_sum * 0.08;
IF (NEW.courier_order_sum < 175) THEN
NEW.courier_order_sum = 175;
END IF;
ELSE
NEW.courier_order_sum = NEW.orders_total_sum * 0.1;
IF (NEW.courier_order_sum < 200) THEN
NEW.courier_order_sum = 200;
END IF;
END IF;
-- courier_reward_sum — сумма, которую необходимо перечислить курьеру.
-- Вычисляется как courier_order_sum + courier_tips_sum * 0.95 (5% — комиссия за обработку платежа).
NEW.courier_reward_sum = NEW.courier_order_sum + NEW.courier_tips_sum * 0.95;
RETURN NEW;
END;
$$ LANGUAGE PLPGSQL;
-- Удаление старого триггера таблицы dds.fct_product_deliveries, если он существует
DROP TRIGGER IF EXISTS cdm_dm_courier_ledger_trigger ON cdm.dm_courier_ledger;
-- Создание нового триггера таблицы dds.fct_product_deliveries
CREATE TRIGGER cdm_dm_courier_ledger_trigger
BEFORE INSERT OR UPDATE ON cdm.dm_courier_ledger
FOR EACH ROW EXECUTE FUNCTION cdm_dm_courier_ledger_function();
--COMMIT;
— dags
¶
— lib
¶
— __init__.py
¶
import os
import sys
from .mongo_connect import MongoConnect # noqa
from .pg_connect import ConnectionBuilder # noqa
from .pg_connect import PgConnect # noqa
from .api_connect import APIConnect # noqa
from .api_connect import JsonToDF # noqa
from test import Test
sys.path.append(os.path.dirname(os.path.realpath(__file__)))
— api_connect.py
¶
from urllib.parse import quote_plus as quote
import pandas as pd
import requests
class APIConnect:
def __init__(self,
entry_point: str, # переопределить в подклассе
sort_field: str,
sort_direction: str,
limit: str,
offset: str,
restaurant_id: str,
from_: str,
to: str) -> None:
self.entry_point = entry_point
self.sort_field = sort_field
self.sort_direction = sort_direction
self.limit = limit
self.offset = offset
self.restaurant_id = restaurant_id
self.from_ = from_
self.to = to
def adding_headers(self) -> None:
# Получение данных JSON с сервера через API
self.headers = {
"X-API-KEY": "25c27781-8fde-4b30-a22e-524044a7580f", # ключ API
"X-Nickname": "mailforal", # авторизационные данные microsegment
"X-Cohort": "32", # авторизационные данные
# sort_field - определяет поле, к которому будет применяться сортировка, переданная в параметре sort_direction
# Yеобязательный параметр. Возможные значения: id, name...
"sort_field": self.sort_field,
# sort_direction - определяет порядок сортировки для поля, переданного в sort_field
# asc — сортировка по возрастанию,
# desc — сортировка по убыванию.
"sort_direction": self.sort_direction,
# limit - определяет максимальное количество записей, которые будут возвращены в ответе.
# Необязательный параметр. Значение по умолчанию: 50. Возможные значения:
# целое число в интервале от 0 до 50 включительно.
"limit": self.limit,
# offset - определяет количество возвращаемых элементов результирующей выборки, когда формируется ответ.
# Необязательный параметр. Значение по умолчанию: 0.
"offset": self.offset
}
# restaurant_id - ID ресторана.
# Если значение не указано, то метод вернёт данные по всем доступным в БД ресторанам.
if self.restaurant_id != '': self.headers['from'] = self.restaurant_id
# from - параметр фильтрации.
# В выборку попадают заказы с датой доставки, которая больше или равна значению from.
# Дата должна быть в формате '%Y-%m-%d %H:%M:%S’, например, 2022-01-01 00:00:00.
if self.from_ != '': self.headers['from'] = self.from_
# to — параметр фильтрации.
# В выборку попадают заказы с датой доставки меньше значения to.
if self.to != '': self.headers['to'] = self.to
def from_api_to_json(self) -> str:
self.adding_headers()
# Получение данных JSON с сервера через API
return requests.get(
"https://d5d04q7d963eapoepsqr.apigw.yandexcloud.net/" + self.entry_point, # точка входа
headers = self.headers
).json()
class JsonToDF:
def __init__(self, json_str: str):
self.json_str = json_str
def json_to_df(self) -> pd.DataFrame:
return pd.json_normalize(self.json_str)
— dict_util.py
¶
import json
from datetime import datetime
from typing import Any, Dict
from bson.objectid import ObjectId
def json2str(obj: Any) -> str:
return json.dumps(to_dict(obj), sort_keys=True, ensure_ascii=False)
def str2json(str: str) -> Dict:
return json.loads(str)
def to_dict(obj, classkey=None):
if isinstance(obj, datetime):
return obj.strftime("%Y-%m-%d %H:%M:%S")
elif isinstance(obj, ObjectId):
return str(obj)
if isinstance(obj, dict):
data = {}
for (k, v) in obj.items():
data[k] = to_dict(v, classkey)
return data
elif hasattr(obj, "_ast"):
return to_dict(obj._ast())
elif hasattr(obj, "__iter__") and not isinstance(obj, str):
return [to_dict(v, classkey) for v in obj]
elif hasattr(obj, "__dict__"):
data = dict([(key, to_dict(value, classkey))
for key, value in obj.__dict__.items()
if not callable(value) and not key.startswith('_')])
if classkey is not None and hasattr(obj, "__class__"):
data[classkey] = obj.__class__.__name__
return data
else:
return obj
— mongo_connect.py
¶
from urllib.parse import quote_plus as quote
from pymongo.mongo_client import MongoClient
class MongoConnect:
def __init__(self,
cert_path: str,
user: str,
pw: str,
host: str,
rs: str,
auth_db: str,
main_db: str
) -> None:
self.user = user
self.pw = pw
self.host = host
self.replica_set = rs
self.auth_db = auth_db
self.main_db = main_db
self.cert_path = cert_path
def url(self) -> str:
return 'mongodb://{user}:{pw}@{hosts}/?replicaSet={rs}&authSource={auth_src}'.format(
user=quote(self.user),
pw=quote(self.pw),
hosts=self.host,
rs=self.replica_set,
auth_src=self.auth_db)
def client(self):
return MongoClient(self.url(), tlsCAFile=self.cert_path)[self.main_db]
— pg_connect.py
¶
from contextlib import contextmanager
from typing import Generator
import psycopg
from airflow.hooks.base import BaseHook
class PgConnect:
def __init__(self, host: str, port: str, db_name: str, user: str, pw: str, sslmode: str = "require") -> None:
self.host = host
self.port = int(port)
self.db_name = db_name
self.user = user
self.pw = pw
self.sslmode = sslmode
def url(self) -> str:
return """
host={host}
port={port}
dbname={db_name}
user={user}
password={pw}
target_session_attrs=read-write
sslmode={sslmode}
""".format(
host=self.host,
port=self.port,
db_name=self.db_name,
user=self.user,
pw=self.pw,
sslmode=self.sslmode)
def client(self):
return psycopg.connect(self.url())
@contextmanager
def connection(self) -> Generator[psycopg.Connection, None, None]:
conn = psycopg.connect(self.url())
try:
yield conn
conn.commit()
except Exception as e:
conn.rollback()
raise e
finally:
conn.close()
class ConnectionBuilder:
@staticmethod
def pg_conn(conn_id: str) -> PgConnect:
conn = BaseHook.get_connection(conn_id)
sslmode = "require"
if "sslmode" in conn.extra_dejson:
sslmode = conn.extra_dejson["sslmode"]
pg = PgConnect(str(conn.host),
str(conn.port),
str(conn.schema),
str(conn.login),
str(conn.password),
sslmode)
return pg
— stg
¶
— api_delivery
¶
—- __init__.py
¶
import os
import sys
from .courier_repositories import (CourierJSONObj, CourierRawRepository,
CourierDDSObj, CourierDDSRepository) # noqa
from .delivery_repositories import (DeliveryJSONObj, DeliveryRawRepository,
DeliveryDDSObj, DeliveryDDSRepository) # noqa
from .courier_loader import CourierLoader # noqa
from .delivery_loader import DeliveryLoader # noqa
sys.path.append(os.path.dirname(os.path.realpath(__file__)))
—- api_connect.py
¶
from urllib.parse import quote_plus as quote
import pandas as pd
import requests
class APIConnect:
def __init__(self,
entry_point: str, # переопределить в подклассе
sort_field: str,
sort_direction: str,
limit: str,
offset: str,
restaurant_id: str,
from_: str,
to: str) -> None:
self.entry_point = entry_point
self.sort_field = sort_field
self.sort_direction = sort_direction
self.limit = limit
self.offset = offset
self.restaurant_id = restaurant_id
self.from_ = from_
self.to = to
def adding_headers(self) -> None:
# Получение данных JSON с сервера через API
self.headers = {
"X-API-KEY": "25c27781-8fde-4b30-a22e-524044a7580f", # ключ API
"X-Nickname": "mailforal", # авторизационные данные microsegment
"X-Cohort": "32", # авторизационные данные
# sort_field - определяет поле, к которому будет применяться сортировка, переданная в параметре sort_direction
# Yеобязательный параметр. Возможные значения: id, name...
"sort_field": self.sort_field,
# sort_direction - определяет порядок сортировки для поля, переданного в sort_field
# asc — сортировка по возрастанию,
# desc — сортировка по убыванию.
"sort_direction": self.sort_direction,
# limit - определяет максимальное количество записей, которые будут возвращены в ответе.
# Необязательный параметр. Значение по умолчанию: 50. Возможные значения:
# целое число в интервале от 0 до 50 включительно.
"limit": self.limit,
# offset - определяет количество возвращаемых элементов результирующей выборки, когда формируется ответ.
# Необязательный параметр. Значение по умолчанию: 0.
"offset": self.offset
}
# restaurant_id - ID ресторана.
# Если значение не указано, то метод вернёт данные по всем доступным в БД ресторанам.
if self.restaurant_id != '': self.headers['from'] = self.restaurant_id
# from - параметр фильтрации.
# В выборку попадают заказы с датой доставки, которая больше или равна значению from.
# Дата должна быть в формате '%Y-%m-%d %H:%M:%S’, например, 2022-01-01 00:00:00.
if self.from_ != '': self.headers['from'] = self.from_
# to — параметр фильтрации.
# В выборку попадают заказы с датой доставки меньше значения to.
if self.to != '': self.headers['to'] = self.to
def from_api_to_json(self) -> str:
self.adding_headers()
# Получение данных JSON с сервера через API
return requests.get(
"https://d5d04q7d963eapoepsqr.apigw.yandexcloud.net/" + self.entry_point, # точка входа
headers = self.headers
).json()
class JsonToDF:
def __init__(self, json_str: str):
self.json_str = json_str
def json_to_df(self) -> pd.DataFrame:
return pd.json_normalize(self.json_str)
—- api_couriers_loader.py
¶
from typing import List
from psycopg.rows import class_row
from pydantic import BaseModel # Упрощение проверки и анализа данных, загружаемых из БД и API
# Самоделки
from lib import PgConnect, ConnectionBuilder # Получение данных из PostgreSQL
from lib import APIConnect, JsonToDF # Получение данных по API в формате JSON
class CourierObj(BaseModel):
id: int
name: str
class FetchCouriersFromAPI:
def list_couriers(self):
return APIConnect('couriers',
'name', 'asc',
'0', '0', '',
'', '').from_api_to_json()
class CourierDestRepository:
def __init__(self, pg: PgConnect) -> None:
self._db = pg
def insert_couriers(self, couriers: List[CourierObj]) -> None:
with self._db.client() as conn:
with conn.cursor() as cur:
for courier in couriers:
print('courier =', courier)
cur.execute(
"""
INSERT INTO stg.get_couriers(id, name)
VALUES (%(id)s, %(name)s)
ON CONFLICT (id) DO NOTHING;
""",
{
"id": courier['_id'],
"name": courier['name']
},
)
conn.commit()
class CourierLoader:
WF_KEY = "api_couriers_to_stg_workflow"
def __init__(self, pg_dest: PgConnect) -> None:
self.stg = CourierDestRepository(pg_dest)
def load_couriers(self):
load_queue = FetchCouriersFromAPI().list_couriers()
self.stg.insert_couriers(load_queue)
—- api_deliveries_loader.py
¶
from typing import List
from psycopg.rows import class_row
from pydantic import BaseModel # Упрощение проверки и анализа данных, загружаемых из БД и API
from datetime import datetime, timedelta
# Самоделки
from lib import PgConnect, ConnectionBuilder # Получение данных из PostgreSQL
from lib import APIConnect, JsonToDF # Получение данных по API в формате JSON
class DeliveryObj(BaseModel):
order_id: str
order_ts: str
delivery_id: str
courier_id: str
address: str
delivery_ts: str
rate: str
tip_sum: float
sum: float
class FetchDeliveriesFromAPI:
def list_deliveries(self):
return APIConnect('deliveries',
'order_id', 'asc',
'0', '0', '',
(datetime.now() - timedelta(days=7)).strftime('%Y-%m-%d %H:%M:%S'),
(datetime.now()).strftime('%Y-%m-%d %H:%M:%S')).from_api_to_json()
class DeliveryDestRepository:
def __init__(self, pg: PgConnect) -> None:
self._db = pg
def insert_deliveries(self, deliveries: List[DeliveryObj]) -> None:
with self._db.client() as conn:
with conn.cursor() as cur:
for delivery in deliveries:
print('delivery =', delivery)
cur.execute(
"""
INSERT INTO stg.get_deliveries(order_id,
order_ts,
delivery_id,
courier_id,
address,
delivery_ts,
rate,
tip_sum,
sum)
VALUES (%(order_id)s,
%(order_ts)s,
%(delivery_id)s,
%(courier_id)s,
%(address)s,
%(delivery_ts)s,
%(rate)s,
%(tip_sum)s,
%(sum)s)
ON CONFLICT (order_id) DO NOTHING;
""",
{
"order_id": delivery['order_id'],
"order_ts": delivery['order_ts'],
"delivery_id": delivery['delivery_id'],
"courier_id": delivery['courier_id'],
"address": delivery['address'],
"delivery_ts": delivery['delivery_ts'],
"rate": delivery['rate'],
"tip_sum": delivery['tip_sum'],
"sum": delivery['sum']
},
)
conn.commit()
class DeliveryLoader:
WF_KEY = "api_deliveries_to_stg_workflow"
def __init__(self, pg_dest: PgConnect) -> None:
self.stg = DeliveryDestRepository(pg_dest)
def load_deliveries(self):
load_queue = FetchDeliveriesFromAPI().list_deliveries()
self.stg.insert_deliveries(load_queue)
—- config_const.py
¶
class ConfigConst:
PG_WAREHOUSE_CONNECTION = "PG_WAREHOUSE_CONNECTION"
PG_ORIGIN_BONUS_SYSTEM_CONNECTION = "PG_ORIGIN_BONUS_SYSTEM_CONNECTION"
MONGO_DB_CERTIFICATE_PATH = 'MONGO_DB_CERTIFICATE_PATH'
MONGO_DB_USER = 'MONGO_DB_USER'
MONGO_DB_PASSWORD = 'MONGO_DB_PASSWORD'
MONGO_DB_REPLICA_SET = 'MONGO_DB_REPLICA_SET'
MONGO_DB_DATABASE_NAME = 'MONGO_DB_DATABASE_NAME'
MONGO_DB_HOST = 'MONGO_DB_HOST'
PG_ORIGIN_DATABASE_NAME = "PG_ORIGIN_DATABASE_NAME"
PG_ORIGIN_HOST = "PG_ORIGIN_HOST"
PG_ORIGIN_PASSWORD = "PG_ORIGIN_PASSWORD"
PG_ORIGIN_PORT = "PG_ORIGIN_PORT"
PG_ORIGIN_USER = "PG_ORIGIN_USER"
—- pg_connect.py
¶
from contextlib import contextmanager
from typing import Generator
import psycopg
from airflow.hooks.base import BaseHook
class PgConnect:
def __init__(self, host: str, port: str, db_name: str, user: str, pw: str, sslmode: str = "require") -> None:
self.host = host
self.port = int(port)
self.db_name = db_name
self.user = user
self.pw = pw
self.sslmode = sslmode
def url(self) -> str:
return """
host={host}
port={port}
dbname={db_name}
user={user}
password={pw}
target_session_attrs=read-write
sslmode={sslmode}
""".format(
host=self.host,
port=self.port,
db_name=self.db_name,
user=self.user,
pw=self.pw,
sslmode=self.sslmode)
def client(self):
return psycopg.connect(self.url())
@contextmanager
def connection(self) -> Generator[psycopg.Connection, None, None]:
conn = psycopg.connect(self.url())
try:
yield conn
conn.commit()
except Exception as e:
conn.rollback()
raise e
finally:
conn.close()
class ConnectionBuilder:
@staticmethod
def pg_conn(conn_id: str) -> PgConnect:
conn = BaseHook.get_connection(conn_id)
sslmode = "require"
if "sslmode" in conn.extra_dejson:
sslmode = conn.extra_dejson["sslmode"]
pg = PgConnect(str(conn.host),
str(conn.port),
str(conn.schema),
str(conn.login),
str(conn.password),
sslmode)
return pg
—- stg_api_deliveries_dag.py
¶
import logging
import pendulum
from airflow.decorators import dag, task
# Самоделки
from config_const import ConfigConst
from lib import ConnectionBuilder # Получение данных из PostgreSQL
from stg.api_delivery import CourierLoader
from stg.api_delivery import DeliveryLoader
log = logging.getLogger(__name__)
@dag(
schedule_interval='0/15 * * * *',
start_date=pendulum.datetime(2022, 5, 5, tz="UTC"),
catchup=False,
tags=['sprint5', 'stg', 'api', 'couriers', 'delivery'],
is_paused_upon_creation=False
)
def stg_api_delivery_dag():
dwh_pg_connect = ConnectionBuilder.pg_conn(ConfigConst.PG_WAREHOUSE_CONNECTION)
@task(task_id="couriers_load")
def load_couriers():
courier_loader = CourierLoader(dwh_pg_connect)
courier_loader.load_couriers()
@task(task_id="deliveries_load")
def load_deliveries():
delivery_loader = DeliveryLoader(dwh_pg_connect)
delivery_loader.load_deliveries()
couriers = load_couriers()
deliveries = load_deliveries()
couriers # type: ignore
deliveries # type: ignore
stg_delivery_dag = stg_api_delivery_dag()
— __init__.py
¶
from .stg_settings_repository import EtlSetting, StgEtlSettingsRepository
— stg_settings_repository.py
¶
from typing import Dict, Optional
from psycopg import Connection
from psycopg.rows import class_row
from pydantic import BaseModel
class EtlSetting(BaseModel):
id: int
workflow_key: str
workflow_settings: Dict
class StgEtlSettingsRepository:
def get_setting(self, conn: Connection, etl_key: str) -> Optional[EtlSetting]:
with conn.cursor(row_factory=class_row(EtlSetting)) as cur:
cur.execute(
"""
SELECT
id,
workflow_key,
workflow_settings
FROM stg.srv_wf_settings
WHERE workflow_key = %(etl_key)s;
""",
{"etl_key": etl_key},
)
obj = cur.fetchone()
return obj
def save_setting(self, conn: Connection, workflow_key: str, workflow_settings: str) -> None:
with conn.cursor() as cur:
cur.execute(
"""
INSERT INTO stg.srv_wf_settings(workflow_key, workflow_settings)
VALUES (%(etl_key)s, %(etl_setting)s)
ON CONFLICT (workflow_key) DO UPDATE
SET workflow_settings = EXCLUDED.workflow_settings;
""",
{
"etl_key": workflow_key,
"etl_setting": workflow_settings
},
)
— dds
¶
—- dds_delivery
¶
—- __init__.py
¶
import os
import sys
from .courier_loader import CourierLoader # noqa
from .delivery_loader import DeliveryLoader # noqa
from .fct_deliveries_loader import FctDeliveryLoader # noqa
sys.path.append(os.path.dirname(os.path.realpath(__file__)))
—- courier_loader.py
¶
import json
from datetime import datetime
from lib import PgConnect
from dds.dds_settings_repository import DdsEtlSettingsRepository, EtlSetting
from dds.dds_delivery.courier_repositories import (CourierDDSObj, CourierDDSRepository, CourierJSONObj,
CourierRawRepository)
# Класс таска "dm_couriers_load" дага "sprint5_case_dds_snowflake"
class CourierLoader:
WF_KEY = "couriers_raw_to_dds_workflow"
LAST_LOADED_ID_KEY = "last_loaded_id"
def __init__(self, pg: PgConnect, settings_repository: DdsEtlSettingsRepository) -> None:
self.dwh = pg
self.raw = CourierRawRepository()
self.dds_couriers = CourierDDSRepository()
self.settings_repository = settings_repository
# Метод преобразования данных таблицы `get_couriers` слоя `STG`
# в сласс данных таблицы `dm_couriers` слоя `DDS` через JSON
#def parse_data(self, courier_raw: CourierJSONObj, restaurant_id: int, timestamp_id: int, user_id: int) -> CourierDDSObj:
def parse_data(self, courier_raw: CourierJSONObj) -> CourierDDSObj:
print('(i) File "/lessons/dags/dds/dds_delivery/courier_loader.py", line 32, courier_raw =', courier_raw)
t = CourierDDSObj(id=0,
courier_id=courier_raw.id,
courier_name=courier_raw.name
)
return t
# Метод чтения всех необходимых данных из всех необходимых таблиц
# и запись их в требуемые таблицы, связанные с таблицей `dm_couriers` слоя `DDS`
# в рамках таска "dm_couriers_load" дага "sprint5_case_dds_snowflake"
def load_data(self):
with self.dwh.connection() as conn:
wf_setting = self.settings_repository.get_setting(conn, self.WF_KEY)
if not wf_setting:
wf_setting = EtlSetting(id=0, workflow_key=self.WF_KEY, workflow_settings={self.LAST_LOADED_ID_KEY: '-1'})
last_loaded_id = wf_setting.workflow_settings[self.LAST_LOADED_ID_KEY]
print('(i) File "/lessons/dags/dds/dds_delivery/courier_loader.py", line 52, last_loaded_id =', last_loaded_id)
load_queue = self.raw.load_data(conn, last_loaded_id)
load_queue.sort(key=lambda x: x.id)
for courier_raw in load_queue:
courier_to_load = self.parse_data(courier_raw)
self.dds_couriers.insert_data(conn, courier_to_load)
print('(i) File "/lessons/dags/dds/dds_delivery/courier_loader.py", line 81, courier_raw =', courier_raw)
wf_setting.workflow_settings[self.LAST_LOADED_ID_KEY] = courier_raw.id
self.settings_repository.save_setting(conn, wf_setting)
—- courier_repositories.py
¶
from typing import List, Optional
from psycopg import Connection
from psycopg.rows import class_row
from pydantic import BaseModel
# Класс данных таблицы `get_couriers` в слое `STG`
class CourierJSONObj(BaseModel):
id: str
name: str
# Класс взаимодействия с данными таблицы `get_couriers` в слое `STG`
class CourierRawRepository:
# Метод чтения данных из таблицы `get_couriers` в слое `STG`
def load_data(self, conn: Connection, last_loaded_record_id: int) -> List[CourierJSONObj]:
print('(i) File "/lessons/dags/dds/dds_delivery/courier_repositories.py", line 20, last_loaded_record_id =', last_loaded_record_id)
with conn.cursor(row_factory=class_row(CourierJSONObj)) as cur:
cur.execute(
"""
SELECT
id,
name
FROM stg.get_couriers
WHERE id > %(last_loaded_record_id)s
ORDER BY id ASC;
""",
{"last_loaded_record_id": last_loaded_record_id}
)
objs = cur.fetchall()
objs.sort(key=lambda x: x.id)
print('(i) File "/lessons/dags/dds/dds_delivery/courier_repositories.py", line 35, objs =', objs)
return objs
# Класс данных таблицы `dm_couriers` в слое `DDS`
class CourierDDSObj(BaseModel):
id: int
courier_id: str
courier_name: str
# Класс взаимодействия с данными таблицы `dm_couriers` в слое `DDS`
class CourierDDSRepository:
# Метод вставки данных в таблицу `dm_couriers` в слое `DDS`
def insert_data(self, conn: Connection, obj: CourierJSONObj) -> None:
print('(i) File "/lessons/dags/dds/dds_delivery/courier_repositories.py", line 52, obj =', obj)
with conn.cursor() as cur:
cur.execute(
"""
INSERT INTO dds.dm_couriers(courier_id, courier_name)
VALUES (%(courier_id)s, %(courier_name)s)
ON CONFLICT (courier_id) DO UPDATE
SET
courier_id = EXCLUDED.courier_id,
courier_name = EXCLUDED.courier_name
;
""",
{
"courier_id": obj.courier_id,
"courier_name": obj.courier_name
},
)
# Метод чтения данных из таблицы `dm_couriers` в слое `DDS`
def get_data(self, conn: Connection, courier_id: str) -> Optional[CourierDDSObj]:
with conn.cursor(row_factory=class_row(CourierDDSObj)) as cur:
cur.execute(
"""
SELECT
id,
courier_id,
courier_name
FROM dds.dm_couriers
WHERE courier_id = %(courier_id)s;
""",
{"courier_id": courier_id},
)
obj = cur.fetchone()
return obj
—- delivery_loader.py
¶
import json
from datetime import datetime
from lib import PgConnect
from dds.dds_settings_repository import DdsEtlSettingsRepository, EtlSetting
from dds.timestamp_loader import TimestampDdsRepository
from dds.dds_delivery.delivery_repositories import (DeliveryJSONObj, DeliveryRawRepository,
DeliveryDDSObj, DeliveryDDSRepository)
# Класс таска "dm_delivery_load" дага "sprint5_case_dds_snowflake"
class DeliveryLoader:
WF_KEY = "deliviries_raw_to_dds_workflow"
LAST_LOADED_ID_KEY = "last_loaded_id"
def __init__(self, pg: PgConnect, settings_repository: DdsEtlSettingsRepository) -> None:
self.dwh = pg
self.raw = DeliveryRawRepository()
self.dds_timestamps = TimestampDdsRepository()
self.dds_deliveries = DeliveryDDSRepository()
self.settings_repository = settings_repository
print('(i) dags\dds\delivery_loader.py, row 20, DeliveryLoader.self =', self)
# Метод преобразования данных таблицы `get_deliveries` слоя `STG`
# в сласс данных таблицы `dm_deliveries` слоя `DDS` через JSON
def parse_data(self, delivery_raw: DeliveryJSONObj, timestamp_id: int) -> DeliveryDDSObj:
print('(i) dags\dds\delivery_loader.py, row 35, delivery_raw =', delivery_raw)
t = DeliveryDDSObj(id=0,
delivery_id=delivery_raw.delivery_id,
timestamp_id=timestamp_id
)
print('(i) dags\dds\delivery_loader.py, row 38, t (DeliveryDDSObj) =', t)
return t
# Метод чтения всех необходимых данных из всех необходимых таблиц
# и запись их в требуемые таблицы, связанные с таблицей `dm_deliveries` слоя `DDS`
# в рамках таска "dm_delivery_load" дага "sprint5_case_dds_snowflake"
def load_data(self):
print('(i) dags\dds\delivery_loader.py, row 48, DeliveryLoader.load_data(self) =', self)
with self.dwh.connection() as conn:
print('(i) dags\dds\delivery_loader.py, row 51, conn =', conn)
wf_setting = self.settings_repository.get_setting(conn, self.WF_KEY)
if not wf_setting:
wf_setting = EtlSetting(id=0, workflow_key=self.WF_KEY, workflow_settings={self.LAST_LOADED_ID_KEY: '-1'})
print('(i) dags\dds\delivery_loader.py, row 53, wf_setting =', wf_setting)
last_loaded_id = wf_setting.workflow_settings[self.LAST_LOADED_ID_KEY]
print('(i) dags\dds\delivery_loader.py, row 58, last_loaded_id =', last_loaded_id)
load_queue = self.raw.load_data(conn, last_loaded_id)
load_queue.sort(key=lambda x: x.order_id)
for delivery_raw in load_queue:
print('(i) dags\dds\delivery_loader.py, row 63, delivery_raw =', delivery_raw)
print('(i) dags\dds\delivery_loader.py, row 70, delivery_raw.order_ts =', delivery_raw.order_ts)
dt = delivery_raw.order_ts.strftime("%Y-%m-%d %H:%M:%S")
print('(i) dags\dds\delivery_loader.py, row 70, dt =', dt)
timestamp = self.dds_timestamps.get_timestamp(conn, dt)
if not timestamp:
continue
print('(i) dags\dds\delivery_loader.py, row 69, timestamp =', timestamp)
delivery_to_load = self.parse_data(delivery_raw, timestamp.id)
print('(i) dags\dds\delivery_loader.py, row 84, delivery_to_load =', delivery_to_load)
self.dds_deliveries.insert_data(conn, delivery_to_load)
wf_setting.workflow_settings[self.LAST_LOADED_ID_KEY] = delivery_raw.order_id
self.settings_repository.save_setting(conn, wf_setting)
—- delivery_repositories.py
¶
from typing import List, Optional
from datetime import datetime
from psycopg import Connection
from psycopg.rows import class_row
from pydantic import BaseModel
# Класс данных таблицы `get_deliveries` в слое `STG`
class DeliveryJSONObj(BaseModel):
order_id: str
order_ts: datetime
delivery_id: str
courier_id: str
address: str
delivery_ts: datetime
rate: int
tip_sum: float
sum: float
# Класс взаимодействия с данными таблицы `get_deliveries` в слое `STG`
class DeliveryRawRepository:
# Метод чтения данных из таблицы `get_deliveries` в слое `STG`
def load_data(self, conn: Connection, last_loaded_record_id: str) -> List[DeliveryJSONObj]:
print('(i) dags\dds\delivery_repositories.py, row 26, last_loaded_record_id =', last_loaded_record_id)
print('(i) dags\dds\delivery_repositories.py, row 26, conn =', conn)
with conn.cursor(row_factory=class_row(DeliveryJSONObj)) as cur:
cur.execute(
"""
SELECT
order_id,
order_ts,
delivery_id,
courier_id,
address,
delivery_ts,
rate,
tip_sum,
sum
FROM stg.get_deliveries
WHERE order_id > %(last_loaded_record_id)s
ORDER BY order_id ASC;
""",
{"last_loaded_record_id": last_loaded_record_id},
)
objs = cur.fetchall()
objs.sort(key=lambda x: x.order_id)
print('(i) dags\dds\delivery_repositories.py, row 49, objs =', objs)
return objs
# Класс данных таблицы `dm_deliveries` в слое `DDS`
class DeliveryDDSObj(BaseModel):
id: int
delivery_id: str
timestamp_id: int
# Класс взаимодействия с данными таблицы `dm_deliveries` в слое `DDS`
class DeliveryDDSRepository:
# Метод вставки данных в таблицу `dm_deliveries` в слое `DDS`
def insert_data(self, conn: Connection, obj: DeliveryDDSObj) -> None:
with conn.cursor() as cur:
cur.execute(
"""
INSERT INTO dds.dm_deliveries(delivery_id, timestamp_id)
VALUES (%(delivery_id)s, %(timestamp_id)s)
ON CONFLICT (delivery_id) DO UPDATE
SET
delivery_id = EXCLUDED.delivery_id,
timestamp_id = EXCLUDED.timestamp_id
;
""",
{
"delivery_id": obj.delivery_id,
"timestamp_id": obj.timestamp_id,
},
)
# Метод чтения данных из таблицы `dm_deliveries` в слое `DDS`
def get_data(self, conn: Connection, delivery_id: str) -> Optional[DeliveryDDSObj]:
with conn.cursor(row_factory=class_row(DeliveryDDSObj)) as cur:
cur.execute(
"""
SELECT
id,
delivery_id,
timestamp_id
FROM dds.dm_deliveries
WHERE delivery_id = %(delivery_id)s;
""",
{"delivery_id": delivery_id},
)
obj = cur.fetchone()
return obj
def list_data(self, conn: Connection) -> List[DeliveryDDSObj]:
with conn.cursor(row_factory=class_row(DeliveryDDSObj)) as cur:
cur.execute(
"""
SELECT id, delivery_id, timestamp_id
FROM dds.dm_deliveries;
"""
)
obj = cur.fetchall()
return obj
—- fct_deliveries_loader.py
¶
import json
import logging
from datetime import datetime
from typing import Dict, List, Tuple
from lib import PgConnect
from psycopg import Connection
from pydantic import BaseModel
from dds.dds_settings_repository import DdsEtlSettingsRepository, EtlSetting
from dds.order_repositories import (OrderDdsObj, OrderDdsRepository)
from dds.dds_delivery.courier_repositories import (CourierDDSObj,
CourierDDSRepository,
CourierJSONObj,
CourierRawRepository)
from dds.dds_delivery.delivery_repositories import (DeliveryDDSObj,
DeliveryDDSRepository,
DeliveryJSONObj,
DeliveryRawRepository)
log = logging.getLogger(__name__)
# Класс данных таблицы `fct_product_deliveries` в слое `DDS`
class FctDeliveryDDSObj(BaseModel):
id: int
order_id: int
delivery_id: int
courier_id: int
rate: float
tips_sum: float
# Класс взаимодействия с данными таблицы фактов `fct_product_deliveries` в слое `DDS`
class FctDeliveryDDSRepository:
# Метод вставки данных в таблицу фактов `fct_product_deliveries` в слое `DDS`
def insert_data(self, conn: Connection, fact: List[FctDeliveryDDSObj]) -> None:
print('(i) dags\dds\dds_delivery\fct_deliveries_loader.py, row 64, facts =', fact)
with conn.cursor() as cur:
print('(i) dags\dds\dds_delivery\fct_deliveries_loader.py, row 66, fact =', fact)
cur.execute(
"""
INSERT INTO dds.fct_product_deliveries(
order_id,
delivery_id,
courier_id,
rate,
tips_sum
)
VALUES (
%(order_id)s,
%(delivery_id)s,
%(courier_id)s,
%(rate)s,
%(tips_sum)s
)
ON CONFLICT (order_id, delivery_id) DO UPDATE
SET
courier_id = EXCLUDED.courier_id,
rate = EXCLUDED.rate,
tips_sum = EXCLUDED.tips_sum
;
""",
{
"order_id": fact.order_id,
"delivery_id": fact.delivery_id,
"courier_id": fact.courier_id,
"rate": fact.rate,
"tips_sum": fact.tips_sum
},
)
class FctDeliveryLoader:
WF_KEY = "fact_product_deliveries_raw_to_dds_workflow"
LAST_LOADED_ID_KEY = "last_loaded_order_id"
_LOG_THRESHOLD = 100
def __init__(self, pg: PgConnect, settings_repository: DdsEtlSettingsRepository) -> None:
self.dwh = pg
self.raw_deliveries = DeliveryRawRepository()
self.dds_orders = OrderDdsRepository()
self.dds_deliveries = DeliveryDDSRepository()
self.dds_couriers = CourierDDSRepository()
self.dds_facts = FctDeliveryDDSRepository()
self.settings_repository = settings_repository
# Метод преобразования данных таблицы `get_deliveries` слоя `STG`
# в сласс данных таблицы `fct_product_deliveries` слоя `DDS` через JSON
# с использованием идентификаторов таблиц `dm_orders`,
# `dm_deliveries`, `dm_couriers` слоя `DDS`
def parse_data(self,
order: OrderDdsObj,
delivery: DeliveryDDSObj,
courier: CourierDDSObj,
delivery_raw: FctDeliveryDDSObj
) -> Tuple[bool, List[FctDeliveryDDSObj]]:
print('(i) dags\dds\dds_delivery\fct_deliveries_loader.py, row 123, order =', order)
print('(i) dags\dds\dds_delivery\fct_deliveries_loader.py, row 123, delivery =', delivery)
print('(i) dags\dds\dds_delivery\fct_deliveries_loader.py, row 123, courier =', courier)
print('(i) dags\dds\dds_delivery\fct_deliveries_loader.py, row 123, delivery_raw =', delivery_raw)
if (delivery_raw.order_id != order.order_key
or delivery_raw.delivery_id != delivery.delivery_id
or delivery_raw.courier_id != courier.courier_id):
return (False, [])
print(f'(i) dags\dds\dds_delivery\fct_deliveries_loader.py, row 140, order.id = {order.id}, delivery.id = {delivery.id}, courier.id = {courier.id}, delivery_raw.rate = {delivery_raw.rate}, delivery_raw.tip_sum = {delivery_raw.tip_sum}')
t = FctDeliveryDDSObj(id=0,
order_id=order.id,
delivery_id=delivery.id,
courier_id=courier.id,
rate=delivery_raw.rate * 1.,
tips_sum=delivery_raw.tip_sum
)
return (True, t)
def load_data(self):
with self.dwh.connection() as conn:
# Поиск последней записи этой таблицы в репозитории
wf_setting = self.settings_repository.get_setting(conn, self.WF_KEY)
if not wf_setting:
wf_setting = EtlSetting(id=0, workflow_key=self.WF_KEY, workflow_settings={self.LAST_LOADED_ID_KEY: '-1'})
# Идентификатор последней записи этой таблицы в репозитории
last_loaded_id = wf_setting.workflow_settings[self.LAST_LOADED_ID_KEY]
log.info(f"Starting load from: {last_loaded_id}")
# Получение данных числовых данных из таблицы `get_deliveries` слоя `STG`.
# Сортировка и поиск по идентификатору доставки `order_id`
load_queue = self.raw_deliveries.load_data(conn, last_loaded_id)
load_queue.sort(key=lambda x: x.order_id)
log.info(f"Found {len(load_queue)} events to load.")
proc_cnt = 0
for delivery_raw in load_queue:
# Получение данных о продаже из таблицы `dm_orders`
order = self.dds_orders.get_order(conn, delivery_raw.order_id)
if not order:
log.info(f"Not found order {delivery_raw.order_id}. Finishing.")
continue
# Получение данных о доставке из таблицы `dm_deliveries`
delivery = self.dds_deliveries.get_data(conn, delivery_raw.delivery_id)
if not delivery:
log.info(f"Not found order {delivery_raw.delivery_id}. Finishing.")
continue
# Получение данных о курьере из таблицы `dm_couriers`
courier = self.dds_couriers.get_data(conn, delivery_raw.courier_id)
if not courier:
log.info(f"Not found order {delivery_raw.courier_id}. Finishing.")
continue
(success, facts_to_load) = self.parse_data(order, delivery, courier, delivery_raw)
if not success:
log.info(f"Could not parse object for order {order.id}, delivery {delivery.id}, courier {courier.id}. Finishing.")
break
print('(i) dags\dds\dds_delivery\fct_deliveries_loader.py, row 202, facts_to_load =', facts_to_load)
self.dds_facts.insert_data(conn, facts_to_load)
wf_setting.workflow_settings[self.LAST_LOADED_ID_KEY] = delivery_raw.order_id
self.settings_repository.save_setting(conn, wf_setting)
proc_cnt += 1
if proc_cnt % self._LOG_THRESHOLD == 0:
log.info(f"Processing events {proc_cnt} out of {len(load_queue)}.")
log.info(f"Processed {proc_cnt} events out of {len(load_queue)}.")
— dds_settings_repository.py
¶
from typing import Dict, Optional
from lib.dict_util import json2str
from psycopg import Connection
from psycopg.rows import class_row
from pydantic import BaseModel
class EtlSetting(BaseModel):
id: int
workflow_key: str
workflow_settings: Dict
class DdsEtlSettingsRepository:
def get_setting(self, conn: Connection, etl_key: str) -> Optional[EtlSetting]:
with conn.cursor(row_factory=class_row(EtlSetting)) as cur:
cur.execute(
"""
SELECT
id,
workflow_key,
workflow_settings
FROM dds.srv_wf_settings
WHERE workflow_key = %(etl_key)s;
""",
{"etl_key": etl_key},
)
obj = cur.fetchone()
return obj
def save_setting(self, conn: Connection, sett: EtlSetting) -> None:
with conn.cursor() as cur:
cur.execute(
"""
INSERT INTO dds.srv_wf_settings(workflow_key, workflow_settings)
VALUES (%(wf_key)s, %(wf_setting)s)
ON CONFLICT (workflow_key) DO UPDATE
SET workflow_settings = EXCLUDED.workflow_settings;
""",
{
"wf_key": sett.workflow_key,
"wf_setting": json2str(sett.workflow_settings)
},
)
— dds_snowflake_dag.py
¶
import logging
import pendulum
from airflow import DAG
from airflow.decorators import task
from config_const import ConfigConst
from lib import ConnectionBuilder
from dds.dds_settings_repository import DdsEtlSettingsRepository
from dds.fct_products_loader import FctProductsLoader
from dds.order_loader import OrderLoader
from dds.products_loader import ProductLoader
from dds.restaurant_loader import RestaurantLoader
from dds.schema_ddl import SchemaDdl
from dds.timestamp_loader import TimestampLoader
from dds.user_loader import UserLoader
#----- Проект ------
from dds_delivery import CourierLoader
from dds_delivery import DeliveryLoader
from dds_delivery import FctDeliveryLoader
#-------------------
log = logging.getLogger(__name__)
with DAG(
dag_id='sprint5_case_dds_snowflake',
schedule_interval='* 0/15 * * *',
start_date=pendulum.datetime(2022, 5, 5, tz="UTC"),
catchup=False,
tags=['sprint5', 'raw', 'dds'],
is_paused_upon_creation=False
) as dag:
dwh_pg_connect = ConnectionBuilder.pg_conn(ConfigConst.PG_WAREHOUSE_CONNECTION)
settings_repository = DdsEtlSettingsRepository()
@task(task_id="schema_init")
def schema_init(ds=None, **kwargs):
rest_loader = SchemaDdl(dwh_pg_connect)
rest_loader.init_schema()
@task(task_id="dm_restaurants_load")
def load_dm_restaurants(ds=None, **kwargs):
rest_loader = RestaurantLoader(dwh_pg_connect, settings_repository)
rest_loader.load_restaurants()
@task(task_id="dm_products_load")
def load_dm_products(ds=None, **kwargs):
prod_loader = ProductLoader(dwh_pg_connect, settings_repository)
prod_loader.load_products()
@task(task_id="dm_timestamps_load")
def load_dm_timestamps(ds=None, **kwargs):
ts_loader = TimestampLoader(dwh_pg_connect, settings_repository)
ts_loader.load_timestamps()
@task(task_id="dm_users_load")
def load_dm_users(ds=None, **kwargs):
user_loader = UserLoader(dwh_pg_connect, settings_repository)
user_loader.load_users()
@task(task_id="dm_orders_load")
def load_dm_orders(ds=None, **kwargs):
order_loader = OrderLoader(dwh_pg_connect, settings_repository)
order_loader.load_orders()
@task(task_id="fct_order_products_load")
def load_fct_order_products(ds=None, **kwargs):
fct_loader = FctProductsLoader(dwh_pg_connect, settings_repository)
print('(0) fct_loader =', fct_loader)
fct_loader.load_product_facts()
#----- Проект ------
@task(task_id="dm_couriers_load")
def load_dm_couriers(ds=None, **kwargs):
data_loader = CourierLoader(dwh_pg_connect, settings_repository)
data_loader.load_data()
@task(task_id="dm_delivery_load")
def load_dm_deliveries(ds=None, **kwargs):
data_loader = DeliveryLoader(dwh_pg_connect, settings_repository)
print('(i) dags\dds_snowflake_dag.py, row 81, data_loader (task_id="dm_delivery_load") =', data_loader)
data_loader.load_data()
@task(task_id="fct_delivery_products_load")
def load_fct_delivery_products(ds=None, **kwargs):
data_loader = FctDeliveryLoader(dwh_pg_connect, settings_repository)
data_loader.load_data()
#-------------------
init_schema = schema_init()
dm_restaurants = load_dm_restaurants()
dm_products = load_dm_products()
dm_timestamps = load_dm_timestamps()
dm_users = load_dm_users()
dm_orders = load_dm_orders()
fct_order_products = load_fct_order_products()
#----- Проект ------
dm_couriers = load_dm_couriers()
dm_deliveries = load_dm_deliveries()
fct_delivery_products = load_fct_delivery_products()
#-------------------
init_schema >> dm_restaurants # type: ignore
init_schema >> dm_timestamps # type: ignore
init_schema >> dm_users # type: ignore
init_schema >> dm_products # type: ignore
init_schema >> dm_orders # type: ignore
dm_restaurants >> dm_products # type: ignore
dm_restaurants >> dm_orders # type: ignore
dm_timestamps >> dm_orders # type: ignore
dm_users >> dm_orders # type: ignore
dm_products >> fct_order_products # type: ignore
dm_orders >> fct_order_products # type: ignore
#----- Проект ------
dm_orders >> dm_deliveries # type: ignore
dm_timestamps >> dm_deliveries # type: ignore
dm_deliveries >> fct_delivery_products # type: ignore
dm_couriers >> fct_delivery_products # type: ignore
#-------------------
— timestamp_loader.py
¶
import json
from datetime import date, datetime, time
from typing import Optional
from lib import PgConnect
from psycopg import Connection
from psycopg.rows import class_row
from pydantic import BaseModel
from dds.dds_settings_repository import DdsEtlSettingsRepository, EtlSetting
from dds.order_repositories import OrderJsonObj, OrderRawRepository
class TimestampDdsObj(BaseModel):
id: int
ts: datetime
year: int
month: int
day: int
time: time
date: date
class TimestampDdsRepository:
def insert_dds_timestamp(self, conn: Connection, timestamp: TimestampDdsObj) -> None:
with conn.cursor() as cur:
cur.execute(
"""
INSERT INTO dds.dm_timestamps(ts, year, month, day, time, date)
VALUES (%(ts)s, %(year)s, %(month)s, %(day)s, %(time)s, %(date)s)
ON CONFLICT (ts) DO NOTHING;
""",
{
"ts": timestamp.ts,
"year": timestamp.year,
"month": timestamp.month,
"day": timestamp.day,
"time": timestamp.time,
"date": timestamp.date
},
)
def get_timestamp(self, conn: Connection, dt: datetime) -> Optional[TimestampDdsObj]:
with conn.cursor(row_factory=class_row(TimestampDdsObj)) as cur:
cur.execute(
"""
SELECT id, ts, year, month, day, time, date
FROM dds.dm_timestamps
WHERE ts = %(dt)s;
""",
{"dt": dt},
)
obj = cur.fetchone()
print('(i) dags\dds\timestamp_loader.py, row 53, obj =', obj)
return obj
class TimestampLoader:
WF_KEY = "timestamp_raw_to_dds_workflow"
LAST_LOADED_ID_KEY = "last_loaded_order_id"
def __init__(self, pg: PgConnect, settings_repository: DdsEtlSettingsRepository) -> None:
self.dwh = pg
self.raw_orders = OrderRawRepository()
self.dds = TimestampDdsRepository()
self.settings_repository = settings_repository
def parse_order_ts(self, order_raw: OrderJsonObj) -> TimestampDdsObj:
order_json = json.loads(order_raw.object_value)
dt = datetime.strptime(order_json['date'], "%Y-%m-%d %H:%M:%S")
t = TimestampDdsObj(id=0,
ts=dt,
year=dt.year,
month=dt.month,
day=dt.day,
time=dt.time(),
date=dt.date()
)
return t
def load_timestamps(self):
with self.dwh.connection() as conn:
wf_setting = self.settings_repository.get_setting(conn, self.WF_KEY)
if not wf_setting:
wf_setting = EtlSetting(id=0, workflow_key=self.WF_KEY, workflow_settings={self.LAST_LOADED_ID_KEY: -1})
last_loaded_id = wf_setting.workflow_settings[self.LAST_LOADED_ID_KEY]
load_queue = self.raw_orders.load_raw_orders(conn, last_loaded_id)
for order in load_queue:
ts_to_load = self.parse_order_ts(order)
self.dds.insert_dds_timestamp(conn, ts_to_load)
wf_setting.workflow_settings[self.LAST_LOADED_ID_KEY] = order.id
self.settings_repository.save_setting(conn, wf_setting)
— cdm
¶
— cdm_courier_ledger
¶
—- __init__.py
¶
import os
import sys
sys.path.append(os.path.dirname(os.path.realpath(__file__)))
—- cdm_courier_ledger_report_dag.py
¶
import pendulum
from airflow.decorators import dag, task
from config_const import ConfigConst
from lib import ConnectionBuilder
from cdm.cdm_courier_ledger.courier_ledger_report import (CourierLedgerRepository, CourierLedgerReportLoader)
@dag(
dag_id='cdm_courier_ledger_report_dag',
schedule_interval='0/30 * * * *',
start_date=pendulum.datetime(2022, 5, 5, tz="UTC"),
catchup=False,
tags=['sprint5', 'cdm', 'courier_ledger'],
is_paused_upon_creation=False
)
def cdm_courier_ledger_report():
dwh_pg_connect = ConnectionBuilder.pg_conn(ConfigConst.PG_WAREHOUSE_CONNECTION)
@task(task_id="cdm_courier_ledger_report_load")
def load_cdm_courier_ledger_report():
rest_loader = CourierLedgerReportLoader(dwh_pg_connect)
rest_loader.insert_data()
report = load_cdm_courier_ledger_report()
report # type: ignore
cdm_courier_ledger_dag = cdm_courier_ledger_report() # noqa
—- courier_ledger_report.py
¶
from lib import PgConnect
class CourierLedgerRepository:
def __init__(self, pg: PgConnect) -> None:
self._db = pg
def insert_data(self) -> None:
with self._db.client() as conn:
with conn.cursor() as cur:
execute = """
INSERT INTO cdm.dm_courier_ledger(
id,
courier_id,
courier_name,
settlement_year,
settlement_month,
orders_count,
orders_total_sum,
rate_avg,
courier_tips_sum
)
SELECT fpd.id,
fpd.courier_id,
dmc.courier_name,
dmt.year AS settlement_year,
dmt.month AS settlement_month,
COUNT(fpd.order_id) AS orders_count,
SUM(fps.total_sum) AS orders_total_sum,
AVG(fpd.rate) AS rate_avg,
SUM(fpd.tips_sum) AS courier_tips_sum
FROM dds.fct_product_deliveries fpd
LEFT JOIN dds.dm_orders dmo ON dmo.id = fpd.order_id
LEFT JOIN dds.dm_couriers dmc ON dmc.id = fpd.courier_id
LEFT JOIN dds.dm_timestamps dmt ON dmt.id = dmo.timestamp_id
LEFT JOIN dds.fct_product_sales fps ON fps.order_id = fpd.order_id
GROUP BY fpd.id, fpd.courier_id, dmc.courier_name, dmt.year, dmt.month
ON CONFLICT (id) DO UPDATE
SET
courier_id = EXCLUDED.courier_id,
courier_name = EXCLUDED.courier_name,
settlement_year = EXCLUDED.settlement_year,
settlement_month = EXCLUDED.settlement_month,
orders_count = EXCLUDED.orders_count,
orders_total_sum = EXCLUDED.orders_total_sum,
rate_avg = EXCLUDED.rate_avg,
courier_tips_sum = EXCLUDED.courier_tips_sum;
"""
cur.execute(execute)
conn.commit()
class CourierLedgerReportLoader:
def __init__(self, pg: PgConnect) -> None:
self.repository = CourierLedgerRepository(pg)
def insert_data(self):
self.repository.insert_data()