Microsegment.ru
  • Главная страница
  • О проекте
  • Портфолио
  • Блог

ETL и автоматизация подготовки данных

В этом проекте требуется изменить процессы в пайплайне так, чтобы они соответствовали новым задачам бизнеса. Проект разделен на два этапа развития пайплайна — требуется постепенно дорабатывать его, получая новые вводные от заказчика. В итоге должен получится код, который включает в себя решения для каждой ситуации.

Этап 1

Разработанный ранее пайплайн, работает, и компания успешно им пользуются. Однако, во-первых, на данных из витрины mart.f_sales BI-аналитики построили графики total revenue (Revenue — это суммарный доход от продаж. Рассчитывается по формуле revenue = SUM(payment_amount)) для различных срезов. Во-вторых, система магазина продолжает развиваться: команда разработки добавила функционал отмены заказов и возврата средств (refunded). Значит, процессы в пайплайне нужно обновить.

Требуется адаптировать существующий пайплайн для текущей задачи:

  1. Учтесть в витрине mart.f_sales статусы shipped и refunded. Все данные в витрине следует считать shipped.
  2. Обновить пайплайн с учётом статусов и backward compatibility.

Этап 2

Компания решила лучше изучить поведение клиентов. Для этого она хочет исследовать возвращаемость клиентов, или customer retention (Customer retention (пер. «удержание клиента») — это критерий, который учитывает, как долго клиент пользуется услугами компании после первой успешной сделки. Чем лояльнее клиенты к компании, тем выше показатель Customer retention). Выяснилось, что на текущий момент отчёт по customer retention строится очень долго. Поэтому есть потребность вычислить нужные метрики в дополнительной витрине.

На основе пайплайна из сквозной задачи спринта требуется наполнить витрину mart.f_customer_retention данными по «возвращаемости клиентов» в разрезе недель.

Структура репозитория проекта¶

  1. Папка migrations хранит файлы миграции с расширением .sql и содержат SQL-скрипты обновления базы данных.
    • mart.f_sales_add_column.sql
    • mart.f_customer_retention_create_table.sql
    • mart.f_sales_new.sql
    • mart.f_customer_retention_new.sql
    • mart.d_city.sql
    • mart.d_customer.sql
    • mart.d_item.sql
  2. В папке src хранятся все необходимые исходники.
  3. Папка src/dags содержит DAG's Airflow:
    • sales_mart_upgrade.py
    • sales_mart_new.py

Файл mart.f_sales_add_column.sql ¶

Файл mart.f_sales_add_column.sql содержит скрипт добавления столбцов в таблицу mart.f_sales в рамках 1 этапа.

ALTER TABLE mart.f_sales ADD COLUMN 'status' DATA_TYPE VARCHAR(10) DEFAULT 'shipped';

Файл mart.f_customer_retention_create_table.sql ¶

Файл mart.f_customer_retention_create_table.sql содержит скрипт создания таблицы mart.f_customer_retention в рамках 2 этапа данного проекта.

CREATE TABLE mart.f_customer_retention (
    new_customers_count INT NOT NULL,
    returning_customers_count INT NOT NULL,
    refunded_customer_count INT NOT NULL,
    period_name VARCHAR(30),
    period_id INT NOT NULL,
    item_id INT NOT NULL,
    new_customers_revenue NUMERIC(11, 2), -- требуется указать параметры числа
    returning_customers_revenue NUMERIC(11, 2), -- требуется указать параметры числа
    customers_refunded INT, 

    CONSTRAINT fk_period_name FOREIGN KEY period_name REFERENCES mart.d_calendar (week_of_year_iso),
    CONSTRAINT fk_period_id FOREIGN KEY period_id REFERENCES mart.d_calendar (week_of_year),
    CONSTRAINT fk_item_id FOREIGN KEY item_id REFERENCES mart.d_item (id)
);

Файл mart.f_sales_new.sql ¶

Файл mart.f_sales_new.sql содержит скрипт обновления данных в таблице mart.f_sales в рамках 1 этапа данного проекта.

insert into mart.f_sales (date_id, 
                          item_id, 
                          customer_id, 
                          city_id, 
                          quantity, 
                          payment_amount
                          status)
select  dc.date_id, 
        uol.item_id, 
        uol.customer_id, 
        uol.city_id, 
        uol.quantity, 
        CASE
            WHEN uol.status = 'shipped' THEN uol.payment_amount = -uol.payment_amount
            WHEN uol.status IS NULL THEN uol.payment_amount = uol.payment_amount
            ELSE uol.payment_amount = uol.payment_amount
        END AS payment_amount
        uol.status
from staging.user_order_log uol
left join mart.d_calendar as dc on uol.date_time::Date = dc.date_actual
where uol.date_time::Date = '{{ds}}';

Файл mart.f_customer_retention_new.sql ¶

Файл mart.f_customer_retention_new.sql скрипт обновления данных в таблице mart.f_customer_retention в рамках 2 этапа данного проекта.

insert into mart.f_customer_retention ( new_customers_count,
                                        returning_customers_count,
                                        refunded_customer_count,
                                        period_name,
                                        period_id,
                                        item_id,
                                        new_customers_revenue,
                                        returning_customers_revenue,
                                        customers_refunded)
SELECT  COUNT(CASE 
                WHEN date_time = MIN(date_time) OVER (PARTITION BY oul.customer_id ORDER BY oul.date_time) THEN 1
                ELSE 0
        END) AS new_customers_count,
        COUNT(CASE 
                WHEN date_time > MIN(date_time) OVER (PARTITION BY oul.customer_id ORDER BY oul.date_time) THEN 1
                ELSE 0
        END) AS returning_customers_count,
        COUNT(CASE 
                WHEN uol.status = 'refunded' THEN 1
                ELSE 0
        END) AS refunded_customer_count,
        dc.week_of_year_iso AS period_name,
        dc.week_of_year AS period_id,
        uol.item_id,
        SUM(CASE 
                WHEN date_time = MIN(date_time) OVER (PARTITION BY oul.customer_id ORDER BY oul.date_time) THEN oul.payment_amount
                ELSE 0
        END) AS new_customers_revenue,
        SUM(CASE 
                WHEN date_time > MIN(date_time) OVER (PARTITION BY oul.customer_id ORDER BY oul.date_time) THEN oul.payment_amount
                ELSE 0
        END) AS returning_customers_revenue,
        COUNT(CASE 
                WHEN uol.status = 'refunded' THEN oul.payment_amount
                ELSE 0
        END) AS customers_refunded
FROM staging.user_order_log uol
LEFT JOIN mart.d_calendar as dc on uol.date_time::Date = dc.date_actual
WHERE uol.date_time::Date = '{{ds}}'
GROUP BY dc.week_of_year_iso, dc.week_of_year, uol.item_id;

Файл mart.d_city.sql ¶

Файл mart.d_city.sql содержит исходный скрипт обновления данных в таблице mart.d_city.

INSERT INTO mart.d_city (city_id, city_name)
SELECT city_id, city_name FROM staging.user_order_log
WHERE city_id NOT IN (SELECT city_id FROM mart.d_city)
GROUP BY city_id, city_name;

Файл mart.d_customer.sql ¶

Файл mart.d_customer.sql содержит исходный скрипт обновления данных в таблице mart.d_customer.

INSERT INTO mart.d_customer (customer_id, first_name, last_name, city_id)
SELECT customer_id, first_name, last_name, MAX(city_id) FROM staging.user_order_log
WHERE customer_id NOT IN (SELECT customer_id FROM mart.d_customer)
GROUP BY customer_id, first_name, last_name

Файл mart.d_item.sql ¶

Файл mart.d_item.sql содержит исходный скрипт обновления данных в таблице mart.d_item.

INSERT INTO mart.d_item (item_id, item_name)
SELECT item_id, item_name FROM staging.user_order_log
WHERE item_id NOT IN (SELECT item_id FROM mart.d_item)
GROUP BY item_id, item_name;

Файл sales_mart_upgrade.py ¶

Файл sales_mart_upgrade.py содержит скрипт DAG пайплайна изменения структуры таблиц хранилища данных.

import time
import requests
import json
import pandas as pd

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.hooks.http_hook import HttpHook
# Библиотека для использования переменных Variable, заданных в Airflow
from airflow.models import Variable


# Параметры подключения к серверу
http_conn_id = HttpHook.get_connection('http_conn_id')
api_key = '5f55e6c0-e9e5-4a9c-b313-63c01fc31460'
base_url = http_conn_id.host

POSTGRES_CONN_ID = 'postgresql_de'

# Глобальные переменые заглавными буквами
# Испльзование переменных Variable, заданных в Airflow
NICKNAME = Variable.get('nickname')
COHORT = Variable.get('cohort')

headers = {
    'X-Nickname': NICKNAME,
    'X-Cohort': COHORT,
    'X-Project': 'True',
    'X-API-KEY': api_key,
    'Content-Type': 'application/x-www-form-urlencoded'
}


# DAG
args = {
    "owner": "student",
    'email': ['student@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 3 # Больше циклов в случае не срабатывания подключения
}

business_dt = '{{ ds }}'

with DAG(
        'sales_mart_upgrade',
        default_args=args,
        description='Provide default dag for sprint3',
        catchup=True,
        start_date=datetime.today() - timedelta(days=7),
        end_date=datetime.today() - timedelta(days=1),
) as dag:
    # Добавление столбца в таблицу mart.f_sales
    f_sales_add_column = PostgresOperator(
        task_id='f_sales_add_column',
        postgres_conn_id=POSTGRES_CONN_ID,
        sql="/migrations/mart.f_sales_add_column.sql"
    )

    # Создание таблицы mart.f_customer_retention
    f_customer_retention_create_table = PostgresOperator(
        task_id='f_customer_retention_create_table',
        postgres_conn_id=POSTGRES_CONN_ID,
        sql="/migrations/mart.f_customer_retention_create_table.sql"
    )

    (
            f_sales_add_column
    )

Файл sales_mart_new.py ¶

Файл sales_mart_new.py содержит скрипт DAG пайплайна обновления данных в хранилище.

import time
import requests
import json
import pandas as pd

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.hooks.http_hook import HttpHook
# Библиотека для использования переменных Variable, заданных в Airflow
from airflow.models import Variable


# Параметры подключения к серверу
http_conn_id = HttpHook.get_connection('http_conn_id')
api_key = '5f55e6c0-e9e5-4a9c-b313-63c01fc31460' #http_conn_id.extra_dejson.get('api_key')
base_url = http_conn_id.host

POSTGRES_CONN_ID = 'postgresql_de'

# Глобальные переменые заглавными буквами
# Испльзование переменных Variable, заданных в Airflow
NICKNAME = Variable.get('nickname')
COHORT = Variable.get('cohort')

headers = {
    'X-Nickname': NICKNAME,
    'X-Cohort': COHORT,
    'X-Project': 'True',
    'X-API-KEY': api_key,
    'Content-Type': 'application/x-www-form-urlencoded'
}


# Инициализация формирование данных (отчета) по API
def generate_report(ti):
    # Здесь можно применить библиотеку logging (https://www.astronomer.io/guides/logging/)
    # с логами типа info, warning, error
    print('Making request generate_report')

    response = requests.post(f'{base_url}/generate_report', headers=headers)
    response.raise_for_status()
    task_id = json.loads(response.content)['task_id']
    ti.xcom_push(key='task_id', value=task_id)
    print(f'Response is {response.content}')


# Получение данных (отчета) по API
def get_report(ti):
    print('Making request get_report')
    task_id = ti.xcom_pull(key='task_id')

    report_id = None

    for i in range(20):
        response = requests.get(f'{base_url}/get_report?task_id={task_id}', headers=headers)
        response.raise_for_status()
        print(f'Response is {response.content}')
        status = json.loads(response.content)['status']
        if status == 'SUCCESS':
            report_id = json.loads(response.content)['data']['report_id']
            break
        else:
            time.sleep(120)

    if not report_id:
        raise TimeoutError('Истек срок ожидания ответа для переменной "report_id"')

    ti.xcom_push(key='report_id', value=report_id)
    print(f'Report_id={report_id}')


# Получение данных (отчета) из API за те даты, 
# которые не вошли в основной отчет
def get_increment(date, ti):
    print('Making request get_increment')
    report_id = ti.xcom_pull(key='report_id')
    response = requests.get(
        f'{base_url}/get_increment?report_id={report_id}&date={str(date)}T00:00:00',
        headers=headers)
    response.raise_for_status()
    print(f'Response is {response.content}')

    increment_id = json.loads(response.content)['data']['increment_id']
    if not increment_id:
        raise ValueError(f'Increment is empty. Most probably due to error in API call.')

    ti.xcom_push(key='increment_id', value=increment_id)
    print(f'increment_id={increment_id}')


# Обновление данных в слое Staging
def upload_data_to_staging(filename, date, pg_table, pg_schema, ti):
    increment_id = ti.xcom_pull(key='increment_id')
    s3_filename = f'https://storage.yandexcloud.net/s3-sprint3/cohort_{COHORT}/{NICKNAME}/project/{increment_id}/{filename}'
    print(s3_filename)
    local_filename = date.replace('-', '') + '_' + filename
    print(local_filename)
    response = requests.get(s3_filename)
    response.raise_for_status()
    open(f"{local_filename}", "wb").write(response.content)
    print(response.content)

    df = pd.read_csv(local_filename, index_col=0)
    #df=df.drop('id', axis=1) # Использование index_col=0 в read_csv вместо удаления столбца
    df=df.drop_duplicates(subset=['uniq_id'])

    if 'status' not in df.columns:
        df['status'] = 'shipped'

    postgres_hook = PostgresHook(POSTGRES_CONN_ID)
    engine = postgres_hook.get_sqlalchemy_engine()
    row_count = df.to_sql(pg_table, engine, schema=pg_schema, if_exists='append', index=False)
    print(f'{row_count} rows was inserted')


# DAG
args = {
    "owner": "student",
    'email': ['student@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 3 # Больше циклов в случае не срабатывания подключения
}

business_dt = '{{ ds }}'

with DAG(
        'sales_mart_new',
        default_args=args,
        description='Provide default dag for sprint3',
        catchup=True,
        start_date=datetime.today() - timedelta(days=7),
        end_date=datetime.today() - timedelta(days=1),
) as dag:
    # Инициализация формирование данных (отчета) по API
    generate_report = PythonOperator(
        task_id='generate_report',
        python_callable=generate_report)

    # Получение данных (отчета) по API
    get_report = PythonOperator(
        task_id='get_report',
        python_callable=get_report)

    # Получение данных (отчета) из API за те даты, 
    # которые не вошли в основной отчет
    get_increment = PythonOperator(
        task_id='get_increment',
        python_callable=get_increment,
        op_kwargs={'date': business_dt})

    # Обновление данных в таблице staging.user_order_log
    upload_user_order_inc = PythonOperator(
        task_id='upload_user_order_inc',
        python_callable=upload_data_to_staging,
        op_kwargs={'date': business_dt,
                   'filename': 'user_order_log_inc.csv',
                   'pg_table': 'user_order_log',
                   'pg_schema': 'staging'})

    # Обновление данных в таблицах уровня витрин 'mart'
    for i in ['d_item', 'd_customer', 'd_city']:
        dementions_tasks.append = PostgresOperator(
            task_id = 'update_' + i,
            postgres_conn_id = POSTGRES_CONN_ID,
            sql =f'/migrations/mart.{i}.sql')

    # Обновление данных в таблице 'mart.f_sales'
    # (в том числе 'status' и зависимого от него 'payment_amount')
    # Не предуспотрена обратная совместимость, 
    # если в таблице 'staging.user_order_log' нет поля 'status'
    update_f_sales = PostgresOperator(
        task_id='update_f_sales',
        postgres_conn_id=POSTGRES_CONN_ID,
        sql="/migrations/mart.f_sales_new.sql",
        parameters={"date": {business_dt}}
    )

    (
            generate_report
            >> get_report
            >> get_increment
            >> upload_user_order_inc
            >> [dementions_tasks[0], dementions_tasks[1], dementions_tasks[2]]
            >> update_f_sales
    )

Политика конфиденциальности

Продолжая использовать данный сайт вы подтверждаете свое согласие с условиями его политики конфиденциальности. Подробнее…




Администрация и владельцы данного информационного ресурса не несут ответственности за возможные последствия, связанные с использованием информации, размещенной на нем.


Все права защищены. При копировании материалов сайта обязательно указывать ссылку на © Microsegment.ru (2020-2025)