В этом проекте требуется изменить процессы в пайплайне так, чтобы они соответствовали новым задачам бизнеса. Проект разделен на два этапа развития пайплайна — требуется постепенно дорабатывать его, получая новые вводные от заказчика. В итоге должен получится код, который включает в себя решения для каждой ситуации.
Этап 1
Разработанный ранее пайплайн, работает, и компания успешно им пользуются. Однако, во-первых, на данных из витрины mart.f_sales
BI-аналитики построили графики total revenue
(Revenue — это суммарный доход от продаж. Рассчитывается по формуле revenue = SUM(payment_amount)
) для различных срезов. Во-вторых, система магазина продолжает развиваться: команда разработки добавила функционал отмены заказов и возврата средств (refunded
). Значит, процессы в пайплайне нужно обновить.
Требуется адаптировать существующий пайплайн для текущей задачи:
- Учтесть в витрине
mart.f_sales
статусыshipped
иrefunded
. Все данные в витрине следует считатьshipped
. - Обновить пайплайн с учётом статусов и backward compatibility.
Этап 2
Компания решила лучше изучить поведение клиентов. Для этого она хочет исследовать возвращаемость клиентов, или customer retention (Customer retention (пер. «удержание клиента») — это критерий, который учитывает, как долго клиент пользуется услугами компании после первой успешной сделки. Чем лояльнее клиенты к компании, тем выше показатель Customer retention). Выяснилось, что на текущий момент отчёт по customer retention
строится очень долго. Поэтому есть потребность вычислить нужные метрики в дополнительной витрине.
На основе пайплайна из сквозной задачи спринта требуется наполнить витрину mart.f_customer_retention
данными по «возвращаемости клиентов» в разрезе недель.
Структура репозитория проекта¶
- Папка
migrations
хранит файлы миграции с расширением.sql
и содержат SQL-скрипты обновления базы данных. - В папке
src
хранятся все необходимые исходники. - Папка
src/dags
содержитDAG's Airflow
:
Файл mart.f_sales_add_column.sql
¶
Файл mart.f_sales_add_column.sql
содержит скрипт добавления столбцов в таблицу mart.f_sales
в рамках 1 этапа.
ALTER TABLE mart.f_sales ADD COLUMN 'status' DATA_TYPE VARCHAR(10) DEFAULT 'shipped';
Файл mart.f_customer_retention_create_table.sql
¶
Файл mart.f_customer_retention_create_table.sql
содержит скрипт создания таблицы mart.f_customer_retention
в рамках 2 этапа данного проекта.
CREATE TABLE mart.f_customer_retention (
new_customers_count INT NOT NULL,
returning_customers_count INT NOT NULL,
refunded_customer_count INT NOT NULL,
period_name VARCHAR(30),
period_id INT NOT NULL,
item_id INT NOT NULL,
new_customers_revenue NUMERIC(11, 2), -- требуется указать параметры числа
returning_customers_revenue NUMERIC(11, 2), -- требуется указать параметры числа
customers_refunded INT,
CONSTRAINT fk_period_name FOREIGN KEY period_name REFERENCES mart.d_calendar (week_of_year_iso),
CONSTRAINT fk_period_id FOREIGN KEY period_id REFERENCES mart.d_calendar (week_of_year),
CONSTRAINT fk_item_id FOREIGN KEY item_id REFERENCES mart.d_item (id)
);
Файл mart.f_sales_new.sql
¶
Файл mart.f_sales_new.sql
содержит скрипт обновления данных в таблице mart.f_sales
в рамках 1 этапа данного проекта.
insert into mart.f_sales (date_id,
item_id,
customer_id,
city_id,
quantity,
payment_amount
status)
select dc.date_id,
uol.item_id,
uol.customer_id,
uol.city_id,
uol.quantity,
CASE
WHEN uol.status = 'shipped' THEN uol.payment_amount = -uol.payment_amount
WHEN uol.status IS NULL THEN uol.payment_amount = uol.payment_amount
ELSE uol.payment_amount = uol.payment_amount
END AS payment_amount
uol.status
from staging.user_order_log uol
left join mart.d_calendar as dc on uol.date_time::Date = dc.date_actual
where uol.date_time::Date = '{{ds}}';
Файл mart.f_customer_retention_new.sql
¶
Файл mart.f_customer_retention_new.sql
скрипт обновления данных в таблице mart.f_customer_retention
в рамках 2 этапа данного проекта.
insert into mart.f_customer_retention ( new_customers_count,
returning_customers_count,
refunded_customer_count,
period_name,
period_id,
item_id,
new_customers_revenue,
returning_customers_revenue,
customers_refunded)
SELECT COUNT(CASE
WHEN date_time = MIN(date_time) OVER (PARTITION BY oul.customer_id ORDER BY oul.date_time) THEN 1
ELSE 0
END) AS new_customers_count,
COUNT(CASE
WHEN date_time > MIN(date_time) OVER (PARTITION BY oul.customer_id ORDER BY oul.date_time) THEN 1
ELSE 0
END) AS returning_customers_count,
COUNT(CASE
WHEN uol.status = 'refunded' THEN 1
ELSE 0
END) AS refunded_customer_count,
dc.week_of_year_iso AS period_name,
dc.week_of_year AS period_id,
uol.item_id,
SUM(CASE
WHEN date_time = MIN(date_time) OVER (PARTITION BY oul.customer_id ORDER BY oul.date_time) THEN oul.payment_amount
ELSE 0
END) AS new_customers_revenue,
SUM(CASE
WHEN date_time > MIN(date_time) OVER (PARTITION BY oul.customer_id ORDER BY oul.date_time) THEN oul.payment_amount
ELSE 0
END) AS returning_customers_revenue,
COUNT(CASE
WHEN uol.status = 'refunded' THEN oul.payment_amount
ELSE 0
END) AS customers_refunded
FROM staging.user_order_log uol
LEFT JOIN mart.d_calendar as dc on uol.date_time::Date = dc.date_actual
WHERE uol.date_time::Date = '{{ds}}'
GROUP BY dc.week_of_year_iso, dc.week_of_year, uol.item_id;
Файл mart.d_city.sql
¶
Файл mart.d_city.sql
содержит исходный скрипт обновления данных в таблице mart.d_city
.
INSERT INTO mart.d_city (city_id, city_name)
SELECT city_id, city_name FROM staging.user_order_log
WHERE city_id NOT IN (SELECT city_id FROM mart.d_city)
GROUP BY city_id, city_name;
Файл mart.d_customer.sql
¶
Файл mart.d_customer.sql
содержит исходный скрипт обновления данных в таблице mart.d_customer
.
INSERT INTO mart.d_customer (customer_id, first_name, last_name, city_id)
SELECT customer_id, first_name, last_name, MAX(city_id) FROM staging.user_order_log
WHERE customer_id NOT IN (SELECT customer_id FROM mart.d_customer)
GROUP BY customer_id, first_name, last_name
Файл mart.d_item.sql
¶
Файл mart.d_item.sql
содержит исходный скрипт обновления данных в таблице mart.d_item
.
INSERT INTO mart.d_item (item_id, item_name)
SELECT item_id, item_name FROM staging.user_order_log
WHERE item_id NOT IN (SELECT item_id FROM mart.d_item)
GROUP BY item_id, item_name;
Файл sales_mart_upgrade.py
¶
Файл sales_mart_upgrade.py
содержит скрипт DAG пайплайна изменения структуры таблиц хранилища данных.
import time
import requests
import json
import pandas as pd
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.hooks.http_hook import HttpHook
# Библиотека для использования переменных Variable, заданных в Airflow
from airflow.models import Variable
# Параметры подключения к серверу
http_conn_id = HttpHook.get_connection('http_conn_id')
api_key = '5f55e6c0-e9e5-4a9c-b313-63c01fc31460'
base_url = http_conn_id.host
POSTGRES_CONN_ID = 'postgresql_de'
# Глобальные переменые заглавными буквами
# Испльзование переменных Variable, заданных в Airflow
NICKNAME = Variable.get('nickname')
COHORT = Variable.get('cohort')
headers = {
'X-Nickname': NICKNAME,
'X-Cohort': COHORT,
'X-Project': 'True',
'X-API-KEY': api_key,
'Content-Type': 'application/x-www-form-urlencoded'
}
# DAG
args = {
"owner": "student",
'email': ['student@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 3 # Больше циклов в случае не срабатывания подключения
}
business_dt = '{{ ds }}'
with DAG(
'sales_mart_upgrade',
default_args=args,
description='Provide default dag for sprint3',
catchup=True,
start_date=datetime.today() - timedelta(days=7),
end_date=datetime.today() - timedelta(days=1),
) as dag:
# Добавление столбца в таблицу mart.f_sales
f_sales_add_column = PostgresOperator(
task_id='f_sales_add_column',
postgres_conn_id=POSTGRES_CONN_ID,
sql="/migrations/mart.f_sales_add_column.sql"
)
# Создание таблицы mart.f_customer_retention
f_customer_retention_create_table = PostgresOperator(
task_id='f_customer_retention_create_table',
postgres_conn_id=POSTGRES_CONN_ID,
sql="/migrations/mart.f_customer_retention_create_table.sql"
)
(
f_sales_add_column
)
Файл sales_mart_new.py
¶
Файл sales_mart_new.py
содержит скрипт DAG пайплайна обновления данных в хранилище.
import time
import requests
import json
import pandas as pd
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.hooks.http_hook import HttpHook
# Библиотека для использования переменных Variable, заданных в Airflow
from airflow.models import Variable
# Параметры подключения к серверу
http_conn_id = HttpHook.get_connection('http_conn_id')
api_key = '5f55e6c0-e9e5-4a9c-b313-63c01fc31460' #http_conn_id.extra_dejson.get('api_key')
base_url = http_conn_id.host
POSTGRES_CONN_ID = 'postgresql_de'
# Глобальные переменые заглавными буквами
# Испльзование переменных Variable, заданных в Airflow
NICKNAME = Variable.get('nickname')
COHORT = Variable.get('cohort')
headers = {
'X-Nickname': NICKNAME,
'X-Cohort': COHORT,
'X-Project': 'True',
'X-API-KEY': api_key,
'Content-Type': 'application/x-www-form-urlencoded'
}
# Инициализация формирование данных (отчета) по API
def generate_report(ti):
# Здесь можно применить библиотеку logging (https://www.astronomer.io/guides/logging/)
# с логами типа info, warning, error
print('Making request generate_report')
response = requests.post(f'{base_url}/generate_report', headers=headers)
response.raise_for_status()
task_id = json.loads(response.content)['task_id']
ti.xcom_push(key='task_id', value=task_id)
print(f'Response is {response.content}')
# Получение данных (отчета) по API
def get_report(ti):
print('Making request get_report')
task_id = ti.xcom_pull(key='task_id')
report_id = None
for i in range(20):
response = requests.get(f'{base_url}/get_report?task_id={task_id}', headers=headers)
response.raise_for_status()
print(f'Response is {response.content}')
status = json.loads(response.content)['status']
if status == 'SUCCESS':
report_id = json.loads(response.content)['data']['report_id']
break
else:
time.sleep(120)
if not report_id:
raise TimeoutError('Истек срок ожидания ответа для переменной "report_id"')
ti.xcom_push(key='report_id', value=report_id)
print(f'Report_id={report_id}')
# Получение данных (отчета) из API за те даты,
# которые не вошли в основной отчет
def get_increment(date, ti):
print('Making request get_increment')
report_id = ti.xcom_pull(key='report_id')
response = requests.get(
f'{base_url}/get_increment?report_id={report_id}&date={str(date)}T00:00:00',
headers=headers)
response.raise_for_status()
print(f'Response is {response.content}')
increment_id = json.loads(response.content)['data']['increment_id']
if not increment_id:
raise ValueError(f'Increment is empty. Most probably due to error in API call.')
ti.xcom_push(key='increment_id', value=increment_id)
print(f'increment_id={increment_id}')
# Обновление данных в слое Staging
def upload_data_to_staging(filename, date, pg_table, pg_schema, ti):
increment_id = ti.xcom_pull(key='increment_id')
s3_filename = f'https://storage.yandexcloud.net/s3-sprint3/cohort_{COHORT}/{NICKNAME}/project/{increment_id}/{filename}'
print(s3_filename)
local_filename = date.replace('-', '') + '_' + filename
print(local_filename)
response = requests.get(s3_filename)
response.raise_for_status()
open(f"{local_filename}", "wb").write(response.content)
print(response.content)
df = pd.read_csv(local_filename, index_col=0)
#df=df.drop('id', axis=1) # Использование index_col=0 в read_csv вместо удаления столбца
df=df.drop_duplicates(subset=['uniq_id'])
if 'status' not in df.columns:
df['status'] = 'shipped'
postgres_hook = PostgresHook(POSTGRES_CONN_ID)
engine = postgres_hook.get_sqlalchemy_engine()
row_count = df.to_sql(pg_table, engine, schema=pg_schema, if_exists='append', index=False)
print(f'{row_count} rows was inserted')
# DAG
args = {
"owner": "student",
'email': ['student@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 3 # Больше циклов в случае не срабатывания подключения
}
business_dt = '{{ ds }}'
with DAG(
'sales_mart_new',
default_args=args,
description='Provide default dag for sprint3',
catchup=True,
start_date=datetime.today() - timedelta(days=7),
end_date=datetime.today() - timedelta(days=1),
) as dag:
# Инициализация формирование данных (отчета) по API
generate_report = PythonOperator(
task_id='generate_report',
python_callable=generate_report)
# Получение данных (отчета) по API
get_report = PythonOperator(
task_id='get_report',
python_callable=get_report)
# Получение данных (отчета) из API за те даты,
# которые не вошли в основной отчет
get_increment = PythonOperator(
task_id='get_increment',
python_callable=get_increment,
op_kwargs={'date': business_dt})
# Обновление данных в таблице staging.user_order_log
upload_user_order_inc = PythonOperator(
task_id='upload_user_order_inc',
python_callable=upload_data_to_staging,
op_kwargs={'date': business_dt,
'filename': 'user_order_log_inc.csv',
'pg_table': 'user_order_log',
'pg_schema': 'staging'})
# Обновление данных в таблицах уровня витрин 'mart'
for i in ['d_item', 'd_customer', 'd_city']:
dementions_tasks.append = PostgresOperator(
task_id = 'update_' + i,
postgres_conn_id = POSTGRES_CONN_ID,
sql =f'/migrations/mart.{i}.sql')
# Обновление данных в таблице 'mart.f_sales'
# (в том числе 'status' и зависимого от него 'payment_amount')
# Не предуспотрена обратная совместимость,
# если в таблице 'staging.user_order_log' нет поля 'status'
update_f_sales = PostgresOperator(
task_id='update_f_sales',
postgres_conn_id=POSTGRES_CONN_ID,
sql="/migrations/mart.f_sales_new.sql",
parameters={"date": {business_dt}}
)
(
generate_report
>> get_report
>> get_increment
>> upload_user_order_inc
>> [dementions_tasks[0], dementions_tasks[1], dementions_tasks[2]]
>> update_f_sales
)