В этом проекте требуется изменить процессы в пайплайне так, чтобы они соответствовали новым задачам бизнеса. Проект разделен на два этапа развития пайплайна — требуется постепенно дорабатывать его, получая новые вводные от заказчика. В итоге должен получится код, который включает в себя решения для каждой ситуации.
Этап 1
Разработанный ранее пайплайн, работает, и компания успешно им пользуются. Однако, во-первых, на данных из витрины mart.f_sales BI-аналитики построили графики total revenue (Revenue — это суммарный доход от продаж. Рассчитывается по формуле revenue = SUM(payment_amount)) для различных срезов. Во-вторых, система магазина продолжает развиваться: команда разработки добавила функционал отмены заказов и возврата средств (refunded). Значит, процессы в пайплайне нужно обновить.
Требуется адаптировать существующий пайплайн для текущей задачи:
- Учтесть в витрине 
mart.f_salesстатусыshippedиrefunded. Все данные в витрине следует считатьshipped. - Обновить пайплайн с учётом статусов и backward compatibility.
 
Этап 2
Компания решила лучше изучить поведение клиентов. Для этого она хочет исследовать возвращаемость клиентов, или customer retention (Customer retention (пер. «удержание клиента») — это критерий, который учитывает, как долго клиент пользуется услугами компании после первой успешной сделки. Чем лояльнее клиенты к компании, тем выше показатель Customer retention). Выяснилось, что на текущий момент отчёт по customer retention строится очень долго. Поэтому есть потребность вычислить нужные метрики в дополнительной витрине.
На основе пайплайна из сквозной задачи спринта требуется наполнить витрину mart.f_customer_retention данными по «возвращаемости клиентов» в разрезе недель.
Структура репозитория проекта¶
- Папка 
migrationsхранит файлы миграции с расширением.sqlи содержат SQL-скрипты обновления базы данных. - В папке 
srcхранятся все необходимые исходники. - Папка 
src/dagsсодержитDAG's Airflow: 
Файл mart.f_sales_add_column.sql ¶
Файл mart.f_sales_add_column.sql содержит скрипт добавления столбцов в таблицу mart.f_sales в рамках 1 этапа.
ALTER TABLE mart.f_sales ADD COLUMN 'status' DATA_TYPE VARCHAR(10) DEFAULT 'shipped';
Файл mart.f_customer_retention_create_table.sql ¶
Файл mart.f_customer_retention_create_table.sql содержит скрипт создания таблицы mart.f_customer_retention в рамках 2 этапа данного проекта.
CREATE TABLE mart.f_customer_retention (
    new_customers_count INT NOT NULL,
    returning_customers_count INT NOT NULL,
    refunded_customer_count INT NOT NULL,
    period_name VARCHAR(30),
    period_id INT NOT NULL,
    item_id INT NOT NULL,
    new_customers_revenue NUMERIC(11, 2), -- требуется указать параметры числа
    returning_customers_revenue NUMERIC(11, 2), -- требуется указать параметры числа
    customers_refunded INT, 
    CONSTRAINT fk_period_name FOREIGN KEY period_name REFERENCES mart.d_calendar (week_of_year_iso),
    CONSTRAINT fk_period_id FOREIGN KEY period_id REFERENCES mart.d_calendar (week_of_year),
    CONSTRAINT fk_item_id FOREIGN KEY item_id REFERENCES mart.d_item (id)
);
Файл mart.f_sales_new.sql ¶
Файл mart.f_sales_new.sql содержит скрипт обновления данных в таблице mart.f_sales в рамках 1 этапа данного проекта.
insert into mart.f_sales (date_id, 
                          item_id, 
                          customer_id, 
                          city_id, 
                          quantity, 
                          payment_amount
                          status)
select  dc.date_id, 
        uol.item_id, 
        uol.customer_id, 
        uol.city_id, 
        uol.quantity, 
        CASE
            WHEN uol.status = 'shipped' THEN uol.payment_amount = -uol.payment_amount
            WHEN uol.status IS NULL THEN uol.payment_amount = uol.payment_amount
            ELSE uol.payment_amount = uol.payment_amount
        END AS payment_amount
        uol.status
from staging.user_order_log uol
left join mart.d_calendar as dc on uol.date_time::Date = dc.date_actual
where uol.date_time::Date = '{{ds}}';
Файл mart.f_customer_retention_new.sql ¶
Файл mart.f_customer_retention_new.sql скрипт обновления данных в таблице mart.f_customer_retention в рамках 2 этапа данного проекта.
insert into mart.f_customer_retention ( new_customers_count,
                                        returning_customers_count,
                                        refunded_customer_count,
                                        period_name,
                                        period_id,
                                        item_id,
                                        new_customers_revenue,
                                        returning_customers_revenue,
                                        customers_refunded)
SELECT  COUNT(CASE 
                WHEN date_time = MIN(date_time) OVER (PARTITION BY oul.customer_id ORDER BY oul.date_time) THEN 1
                ELSE 0
        END) AS new_customers_count,
        COUNT(CASE 
                WHEN date_time > MIN(date_time) OVER (PARTITION BY oul.customer_id ORDER BY oul.date_time) THEN 1
                ELSE 0
        END) AS returning_customers_count,
        COUNT(CASE 
                WHEN uol.status = 'refunded' THEN 1
                ELSE 0
        END) AS refunded_customer_count,
        dc.week_of_year_iso AS period_name,
        dc.week_of_year AS period_id,
        uol.item_id,
        SUM(CASE 
                WHEN date_time = MIN(date_time) OVER (PARTITION BY oul.customer_id ORDER BY oul.date_time) THEN oul.payment_amount
                ELSE 0
        END) AS new_customers_revenue,
        SUM(CASE 
                WHEN date_time > MIN(date_time) OVER (PARTITION BY oul.customer_id ORDER BY oul.date_time) THEN oul.payment_amount
                ELSE 0
        END) AS returning_customers_revenue,
        COUNT(CASE 
                WHEN uol.status = 'refunded' THEN oul.payment_amount
                ELSE 0
        END) AS customers_refunded
FROM staging.user_order_log uol
LEFT JOIN mart.d_calendar as dc on uol.date_time::Date = dc.date_actual
WHERE uol.date_time::Date = '{{ds}}'
GROUP BY dc.week_of_year_iso, dc.week_of_year, uol.item_id;
Файл mart.d_city.sql ¶
Файл mart.d_city.sql содержит исходный скрипт обновления данных в таблице mart.d_city.
INSERT INTO mart.d_city (city_id, city_name)
SELECT city_id, city_name FROM staging.user_order_log
WHERE city_id NOT IN (SELECT city_id FROM mart.d_city)
GROUP BY city_id, city_name;
Файл mart.d_customer.sql ¶
Файл mart.d_customer.sql содержит исходный скрипт обновления данных в таблице mart.d_customer.
INSERT INTO mart.d_customer (customer_id, first_name, last_name, city_id)
SELECT customer_id, first_name, last_name, MAX(city_id) FROM staging.user_order_log
WHERE customer_id NOT IN (SELECT customer_id FROM mart.d_customer)
GROUP BY customer_id, first_name, last_name
Файл mart.d_item.sql ¶
Файл mart.d_item.sql содержит исходный скрипт обновления данных в таблице mart.d_item.
INSERT INTO mart.d_item (item_id, item_name)
SELECT item_id, item_name FROM staging.user_order_log
WHERE item_id NOT IN (SELECT item_id FROM mart.d_item)
GROUP BY item_id, item_name;
Файл sales_mart_upgrade.py ¶
Файл sales_mart_upgrade.py содержит скрипт DAG пайплайна изменения структуры таблиц хранилища данных.
import time
import requests
import json
import pandas as pd
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.hooks.http_hook import HttpHook
# Библиотека для использования переменных Variable, заданных в Airflow
from airflow.models import Variable
# Параметры подключения к серверу
http_conn_id = HttpHook.get_connection('http_conn_id')
api_key = '5f55e6c0-e9e5-4a9c-b313-63c01fc31460'
base_url = http_conn_id.host
POSTGRES_CONN_ID = 'postgresql_de'
# Глобальные переменые заглавными буквами
# Испльзование переменных Variable, заданных в Airflow
NICKNAME = Variable.get('nickname')
COHORT = Variable.get('cohort')
headers = {
    'X-Nickname': NICKNAME,
    'X-Cohort': COHORT,
    'X-Project': 'True',
    'X-API-KEY': api_key,
    'Content-Type': 'application/x-www-form-urlencoded'
}
# DAG
args = {
    "owner": "student",
    'email': ['student@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 3 # Больше циклов в случае не срабатывания подключения
}
business_dt = '{{ ds }}'
with DAG(
        'sales_mart_upgrade',
        default_args=args,
        description='Provide default dag for sprint3',
        catchup=True,
        start_date=datetime.today() - timedelta(days=7),
        end_date=datetime.today() - timedelta(days=1),
) as dag:
    # Добавление столбца в таблицу mart.f_sales
    f_sales_add_column = PostgresOperator(
        task_id='f_sales_add_column',
        postgres_conn_id=POSTGRES_CONN_ID,
        sql="/migrations/mart.f_sales_add_column.sql"
    )
    # Создание таблицы mart.f_customer_retention
    f_customer_retention_create_table = PostgresOperator(
        task_id='f_customer_retention_create_table',
        postgres_conn_id=POSTGRES_CONN_ID,
        sql="/migrations/mart.f_customer_retention_create_table.sql"
    )
    (
            f_sales_add_column
    )
Файл sales_mart_new.py ¶
Файл sales_mart_new.py содержит скрипт DAG пайплайна обновления данных в хранилище.
import time
import requests
import json
import pandas as pd
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.hooks.http_hook import HttpHook
# Библиотека для использования переменных Variable, заданных в Airflow
from airflow.models import Variable
# Параметры подключения к серверу
http_conn_id = HttpHook.get_connection('http_conn_id')
api_key = '5f55e6c0-e9e5-4a9c-b313-63c01fc31460' #http_conn_id.extra_dejson.get('api_key')
base_url = http_conn_id.host
POSTGRES_CONN_ID = 'postgresql_de'
# Глобальные переменые заглавными буквами
# Испльзование переменных Variable, заданных в Airflow
NICKNAME = Variable.get('nickname')
COHORT = Variable.get('cohort')
headers = {
    'X-Nickname': NICKNAME,
    'X-Cohort': COHORT,
    'X-Project': 'True',
    'X-API-KEY': api_key,
    'Content-Type': 'application/x-www-form-urlencoded'
}
# Инициализация формирование данных (отчета) по API
def generate_report(ti):
    # Здесь можно применить библиотеку logging (https://www.astronomer.io/guides/logging/)
    # с логами типа info, warning, error
    print('Making request generate_report')
    response = requests.post(f'{base_url}/generate_report', headers=headers)
    response.raise_for_status()
    task_id = json.loads(response.content)['task_id']
    ti.xcom_push(key='task_id', value=task_id)
    print(f'Response is {response.content}')
# Получение данных (отчета) по API
def get_report(ti):
    print('Making request get_report')
    task_id = ti.xcom_pull(key='task_id')
    report_id = None
    for i in range(20):
        response = requests.get(f'{base_url}/get_report?task_id={task_id}', headers=headers)
        response.raise_for_status()
        print(f'Response is {response.content}')
        status = json.loads(response.content)['status']
        if status == 'SUCCESS':
            report_id = json.loads(response.content)['data']['report_id']
            break
        else:
            time.sleep(120)
    if not report_id:
        raise TimeoutError('Истек срок ожидания ответа для переменной "report_id"')
    ti.xcom_push(key='report_id', value=report_id)
    print(f'Report_id={report_id}')
# Получение данных (отчета) из API за те даты, 
# которые не вошли в основной отчет
def get_increment(date, ti):
    print('Making request get_increment')
    report_id = ti.xcom_pull(key='report_id')
    response = requests.get(
        f'{base_url}/get_increment?report_id={report_id}&date={str(date)}T00:00:00',
        headers=headers)
    response.raise_for_status()
    print(f'Response is {response.content}')
    increment_id = json.loads(response.content)['data']['increment_id']
    if not increment_id:
        raise ValueError(f'Increment is empty. Most probably due to error in API call.')
    ti.xcom_push(key='increment_id', value=increment_id)
    print(f'increment_id={increment_id}')
# Обновление данных в слое Staging
def upload_data_to_staging(filename, date, pg_table, pg_schema, ti):
    increment_id = ti.xcom_pull(key='increment_id')
    s3_filename = f'https://storage.yandexcloud.net/s3-sprint3/cohort_{COHORT}/{NICKNAME}/project/{increment_id}/{filename}'
    print(s3_filename)
    local_filename = date.replace('-', '') + '_' + filename
    print(local_filename)
    response = requests.get(s3_filename)
    response.raise_for_status()
    open(f"{local_filename}", "wb").write(response.content)
    print(response.content)
    df = pd.read_csv(local_filename, index_col=0)
    #df=df.drop('id', axis=1) # Использование index_col=0 в read_csv вместо удаления столбца
    df=df.drop_duplicates(subset=['uniq_id'])
    if 'status' not in df.columns:
        df['status'] = 'shipped'
    postgres_hook = PostgresHook(POSTGRES_CONN_ID)
    engine = postgres_hook.get_sqlalchemy_engine()
    row_count = df.to_sql(pg_table, engine, schema=pg_schema, if_exists='append', index=False)
    print(f'{row_count} rows was inserted')
# DAG
args = {
    "owner": "student",
    'email': ['student@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 3 # Больше циклов в случае не срабатывания подключения
}
business_dt = '{{ ds }}'
with DAG(
        'sales_mart_new',
        default_args=args,
        description='Provide default dag for sprint3',
        catchup=True,
        start_date=datetime.today() - timedelta(days=7),
        end_date=datetime.today() - timedelta(days=1),
) as dag:
    # Инициализация формирование данных (отчета) по API
    generate_report = PythonOperator(
        task_id='generate_report',
        python_callable=generate_report)
    # Получение данных (отчета) по API
    get_report = PythonOperator(
        task_id='get_report',
        python_callable=get_report)
    # Получение данных (отчета) из API за те даты, 
    # которые не вошли в основной отчет
    get_increment = PythonOperator(
        task_id='get_increment',
        python_callable=get_increment,
        op_kwargs={'date': business_dt})
    # Обновление данных в таблице staging.user_order_log
    upload_user_order_inc = PythonOperator(
        task_id='upload_user_order_inc',
        python_callable=upload_data_to_staging,
        op_kwargs={'date': business_dt,
                   'filename': 'user_order_log_inc.csv',
                   'pg_table': 'user_order_log',
                   'pg_schema': 'staging'})
    # Обновление данных в таблицах уровня витрин 'mart'
    for i in ['d_item', 'd_customer', 'd_city']:
        dementions_tasks.append = PostgresOperator(
            task_id = 'update_' + i,
            postgres_conn_id = POSTGRES_CONN_ID,
            sql =f'/migrations/mart.{i}.sql')
    # Обновление данных в таблице 'mart.f_sales'
    # (в том числе 'status' и зависимого от него 'payment_amount')
    # Не предуспотрена обратная совместимость, 
    # если в таблице 'staging.user_order_log' нет поля 'status'
    update_f_sales = PostgresOperator(
        task_id='update_f_sales',
        postgres_conn_id=POSTGRES_CONN_ID,
        sql="/migrations/mart.f_sales_new.sql",
        parameters={"date": {business_dt}}
    )
    (
            generate_report
            >> get_report
            >> get_increment
            >> upload_user_order_inc
            >> [dementions_tasks[0], dementions_tasks[1], dementions_tasks[2]]
            >> update_f_sales
    )
