Требуется создать аналитическое хранилище для соцсети Architech Social. Хранилище требуется в рамках проекта развертывания этой новой соцсети на англоязычном рынке в Австралии. Данные в хранилище должны ответить на несколько вопросов бизнеса.
Развертывание соцсети является софт лончем (пробным запуском) для тестирования возможностей сети и отработки фич. Ожидается прирост количества данных. Из-за этого для хранения данных выбрана аналитическая база данных Vertica вместо прежних Postgres и MongoDB. Vertica надежная, относительно недорогая и легко масштабируется.
Вопросы бизнеса:
- Пользователи какого возраста занимают топ-5 позиций по количеству сообщений в самых старых группах? Австралийцы хотят пропушить аудиторию играми. Надо понять, на какой возраст ориентировать геймификацию.
- Какова конверсия пользователей, вступивших в группы в активных пользователей? То есть, требуется найти долю пользователей, которые пишут в группах, относительно пользователей, которые просто вступили в группу.
Описание даных¶
Данные проекта можно разделить на базовые для создания аналитического хранилища и основные:
- Базовые данные:
users.csv
— данные о пользователях;groups.csv
— информация о группах;dialogs.csv
— информация о сообщениях.
- Основные данные:
group_log.csv
— лог групп.
Базовые даные для создания хранилища¶
Файл users.csv
:
id
— уникальный идентификатор пользователя;chat_name
— имя полшьзователя/название чата;registration_dt
— дата регистрации пользователя;country
— страна пользователя;age
— возраст.
Файл groups.csv
:
id
— идентификатор группы пользователей;admin_id
— идентификатор администратора;group_name
— название группы;registration_dt
— дата регистрации группы;is_private
— является ли группа приватной.
Файл dialogs.csv
:
message_id
— это идентификатор сообщения;message_ts
— дата и время сообщения;message_from
— идентификатор отправителя сообщения;message_to
— идентификатор получателя;message
— текст сообщения;message_group
— группа сообщения.
Основные даные проекта¶
Файл group_log.csv
:
group_id
— уникальный идентификатор группы;user_id
— уникальный идентификатор пользователя;user_id_from
— поле для отметки о том, что пользователь не сам вступил в группу, а его добавил другой участник. Если пользователя пригласил в группу кто-то другой, поле будет непустым;event
— действие, которое совершено пользователемuser_id
. Возможны следующие варианты:create
— пользователь создал группу;add
— пользователь user_id вступил сам или был добавлен в группу;leave
— пользователь user_id покинул группу;
datetime
— время совершения event.
1. Анализ даных ¶
Анализ даных осуществлен с помощью Pandas и других библиотек Python.
# Подготовка тетради
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.decorators import dag
from airflow.hooks.base import BaseHook
from airflow.operators.dummy_operator import DummyOperator
import pendulum
import os # Работа с файловой системой
import boto3 # Работа с файловой системой S3
from io import StringIO # Разбор строк
import vertica_python # Работа с Vertica
import pandas as pd
import numpy as np
# Параметры доступа к хранилищу S3
YANDEX_CLOUD_S3_SPRINT6 = BaseHook.get_connection('YANDEX_CLOUD_S3_SPRINT6')
# Передача параметров доступа к S3 из BaseHook
AWS_ACCESS_KEY_ID = YANDEX_CLOUD_S3_SPRINT6.login
AWS_SECRET_ACCESS_KEY = YANDEX_CLOUD_S3_SPRINT6.password
AWS_SERVICE_NAME = 's3'
AWS_ENDPOINT_URL = 'https://storage.yandexcloud.net'
BUCKET = 'sprint6'
# Параметры доступа к Vertica
YANDEX_CLOUD_VERTICA_SPRINT6 = BaseHook.get_connection('YANDEX_CLOUD_VERTICA_SPRINT6')
# Передача параметров доступа к Vertica из BaseHook
VERTICA_HOST = YANDEX_CLOUD_VERTICA_SPRINT6.host
VERTICA_PORT = YANDEX_CLOUD_VERTICA_SPRINT6.port
VERTICA_USER = YANDEX_CLOUD_VERTICA_SPRINT6.login
VERTICA_PASSWORD = YANDEX_CLOUD_VERTICA_SPRINT6.password
# Чтение файла CSV из S3 по чанкам
# (биение файла на чанки может упростить чтение/запись очень больших файлов)
def reading_csv_from_s3(file_name, chunksize):
# Доступ к хранилищу S3
session = boto3.session.Session()
s3_client = session.client(
service_name=AWS_SERVICE_NAME,
endpoint_url=AWS_ENDPOINT_URL,
aws_access_key_id=AWS_ACCESS_KEY_ID,
aws_secret_access_key=AWS_SECRET_ACCESS_KEY
)
# Чтение файла CSV из S3 по чанкам
key = file_name + '.csv'
obj = s3_client.get_object(Bucket=BUCKET, Key=key)
body = obj['Body'].read().decode('utf-8')
data = StringIO(body)
chunks = pd.read_csv(data, chunksize=chunksize)
return chunks
# Название файлов CSV в S3 и одноименных таблиц в Vertica
file_names = ['users', 'groups', 'dialogs', 'group_log']
# Размер чанок файла CSV
chunksize = 100000
# Подготовка данных для анализа
for file_name in file_names:
chunks = reading_csv_from_s3(file_name, chunksize)
# Объединение чанок в единые датафреймы
df = pd.DataFrame()
for i in chunks:
df = pd.concat([df, i], ignore_index=True)
# Вывод информации о данных
print()
print('-'*36, '\n')
print(f'\033[1mФайл {file_name}.csv\033[0m')
print('\nМетаинформация:\n')
display(i.info())
print('\nСтатистика числовых даных:\n')
display(i.describe())
print('\nСтатистика не числовых данных:\n')
numeric_columns = df.select_dtypes(include=np.number).columns
display(df.drop(columns=numeric_columns).describe())
print('\nКоличество уникальных значений и пропусков в данных:\n')
display(pd.DataFrame({
#'Column': df.columns,
'Total count': df.count(),
'Unique count': df.nunique(),
'Null count': df.isnull().sum()
}))
print('\nПример данных:\n')
display(i.head())
------------------------------------
Файл users.csv
Метаинформация:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 70000 entries, 100000 to 169999
Data columns (total 5 columns):
# Column Non-Null Count Dtype
--- ------ -------------- -----
0 id 70000 non-null int64
1 chat_name 70000 non-null object
2 registration_dt 69939 non-null object
3 country 70000 non-null object
4 age 70000 non-null int64
dtypes: int64(2), object(3)
memory usage: 2.7+ MB
None
Статистика числовых даных:
id | age | |
---|---|---|
count | 70000.000000 | 70000.000000 |
mean | 134999.500000 | 39.004529 |
std | 20207.403759 | 16.405125 |
min | 100000.000000 | 18.000000 |
25% | 117499.750000 | 22.000000 |
50% | 134999.500000 | 37.000000 |
75% | 152499.250000 | 56.000000 |
max | 169999.000000 | 999.000000 |
Статистика не числовых данных:
chat_name | registration_dt | country | |
---|---|---|---|
count | 170000 | 169850 | 170000 |
unique | 80 | 155845 | 100 |
top | AlexLuter King | 2020-11-08 10:43:27 | Algeria |
freq | 2220 | 5 | 1798 |
Количество уникальных значений и пропусков в данных:
Total count | Unique count | Null count | |
---|---|---|---|
id | 170000 | 170000 | 0 |
chat_name | 170000 | 80 | 0 |
registration_dt | 169850 | 155845 | 150 |
country | 170000 | 100 | 0 |
age | 170000 | 48 | 0 |
Пример данных:
id | chat_name | registration_dt | country | age | |
---|---|---|---|---|---|
100000 | 100000 | MaksimBissmark | 2021-03-15 09:33:24 | Egypt | 35 |
100001 | 100001 | ValeryaTolmachev | 2021-03-12 10:55:48 | China | 63 |
100002 | 100002 | MaksimKovega | 2021-01-14 01:39:11 | Cyprus | 20 |
100003 | 100003 | PavelBissmark | 2021-01-17 11:17:16 | Honduras | 60 |
100004 | 100004 | MaksimMendoza | 2021-02-11 15:43:27 | Azerbaijan | 39 |
------------------------------------
Файл groups.csv
Метаинформация:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 250 entries, 0 to 249
Data columns (total 5 columns):
# Column Non-Null Count Dtype
--- ------ -------------- -----
0 id 250 non-null int64
1 admin_id 250 non-null int64
2 group_name 250 non-null object
3 registration_dt 250 non-null object
4 is_private 250 non-null int64
dtypes: int64(3), object(2)
memory usage: 9.9+ KB
None
Статистика числовых даных:
id | admin_id | is_private | |
---|---|---|---|
count | 250.000000 | 250.000000 | 250.000000 |
mean | 124.500000 | 82093.208000 | 0.512000 |
std | 72.312977 | 48441.445203 | 0.500859 |
min | 0.000000 | 57.000000 | 0.000000 |
25% | 62.250000 | 41478.250000 | 0.000000 |
50% | 124.500000 | 79313.000000 | 1.000000 |
75% | 186.750000 | 123780.000000 | 1.000000 |
max | 249.000000 | 169669.000000 | 1.000000 |
Статистика не числовых данных:
group_name | registration_dt | |
---|---|---|
count | 250 | 250 |
unique | 250 | 250 |
top | самолеты 90-ые как | 2021-02-10 05:31:20.990338 |
freq | 1 | 1 |
Количество уникальных значений и пропусков в данных:
Total count | Unique count | Null count | |
---|---|---|---|
id | 250 | 250 | 0 |
admin_id | 250 | 250 | 0 |
group_name | 250 | 250 | 0 |
registration_dt | 250 | 250 | 0 |
is_private | 250 | 2 | 0 |
Пример данных:
id | admin_id | group_name | registration_dt | is_private | |
---|---|---|---|---|---|
0 | 0 | 18583 | самолеты 90-ые как | 2021-02-10 05:31:20.990338 | 0 |
1 | 1 | 24532 | ох из двора хах | 2021-01-24 15:37:26.301895 | 1 |
2 | 2 | 89388 | собачники новая эра подписка 500р | 2021-01-01 12:18:49.020925 | 0 |
3 | 3 | 3044 | рожденные быть молодыми я и ты подписка 1000р | 2021-01-30 07:21:48.450242 | 1 |
4 | 4 | 51788 | девушки перезагрузка и все | 2021-05-19 05:59:33.732218 | 0 |
------------------------------------
Файл dialogs.csv
Метаинформация:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 37747 entries, 1000000 to 1037746
Data columns (total 6 columns):
# Column Non-Null Count Dtype
--- ------ -------------- -----
0 message_id 37747 non-null int64
1 message_ts 37747 non-null object
2 message_from 37747 non-null int64
3 message_to 37747 non-null int64
4 message 37745 non-null object
5 message_group 14276 non-null float64
dtypes: float64(1), int64(3), object(2)
memory usage: 1.7+ MB
None
Статистика числовых даных:
message_id | message_from | message_to | message_group | |
---|---|---|---|---|
count | 3.774700e+04 | 37747.000000 | 37747.000000 | 14276.000000 |
mean | 1.019437e+06 | 82155.855936 | 109642.716401 | 132.226044 |
std | 1.090378e+04 | 47407.019847 | 48370.414357 | 72.437737 |
min | 1.000555e+06 | 19.000000 | 22.000000 | 0.000000 |
25% | 1.009992e+06 | 40989.000000 | 71126.000000 | 70.000000 |
50% | 1.019436e+06 | 82662.000000 | 138148.000000 | 134.000000 |
75% | 1.028878e+06 | 123330.000000 | 149488.000000 | 199.000000 |
max | 1.038323e+06 | 164068.000000 | 149488.000000 | 249.000000 |
Статистика не числовых данных:
message_ts | message | |
---|---|---|
count | 1037747 | 1037658 |
unique | 1031300 | 863445 |
top | 2020-12-21 14:03:12.000000000 | thanks |
freq | 77 | 6252 |
Количество уникальных значений и пропусков в данных:
Total count | Unique count | Null count | |
---|---|---|---|
message_id | 1037747 | 1037747 | 0 |
message_ts | 1037747 | 1031300 | 0 |
message_from | 1037747 | 163954 | 0 |
message_to | 1037747 | 149450 | 0 |
message | 1037658 | 863445 | 89 |
message_group | 392027 | 250 | 645720 |
Пример данных:
message_id | message_ts | message_from | message_to | message | message_group | |
---|---|---|---|---|---|---|
1000000 | 1000555 | 2011-05-09 18:07:00.000000000 | 154167 | 149488 | in 11.04 | NaN |
1000001 | 1000556 | 2020-12-27 06:01:16.426412143 | 12029 | 140453 | theres a weather-indicator applet in the repo… | NaN |
1000002 | 1000557 | 2021-04-15 09:32:44.421327559 | 115810 | 149488 | whats the name of a good os x dock clone for l… | NaN |
1000003 | 1000558 | 2020-12-01 02:01:03.838251259 | 15958 | 105430 | superkaramba | 32.0 |
1000004 | 1000559 | 2021-04-14 05:11:28.909718273 | 115810 | 14569 | thanks | 229.0 |
------------------------------------
Файл group_log.csv
Метаинформация:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 89733 entries, 800000 to 889732
Data columns (total 5 columns):
# Column Non-Null Count Dtype
--- ------ -------------- -----
0 group_id 89733 non-null int64
1 user_id 89733 non-null int64
2 user_id_from 15874 non-null float64
3 event 89733 non-null object
4 datetime 89733 non-null object
dtypes: float64(1), int64(2), object(2)
memory usage: 3.4+ MB
None
Статистика числовых даных:
group_id | user_id | user_id_from | |
---|---|---|---|
count | 89733.000000 | 89733.000000 | 15874.000000 |
mean | 131.619315 | 94535.531856 | 95475.870543 |
std | 72.217684 | 50553.165679 | 49574.214557 |
min | 0.000000 | 5.000000 | 8.000000 |
25% | 70.000000 | 48781.000000 | 53067.500000 |
50% | 133.000000 | 101651.000000 | 101775.500000 |
75% | 199.000000 | 149488.000000 | 149488.000000 |
max | 249.000000 | 169994.000000 | 164080.000000 |
Статистика не числовых данных:
event | datetime | |
---|---|---|
count | 889733 | 889733 |
unique | 3 | 869392 |
top | add | 2020-12-16 07:18:12 |
freq | 800748 | 4 |
Количество уникальных значений и пропусков в данных:
Total count | Unique count | Null count | |
---|---|---|---|
group_id | 889733 | 250 | 0 |
user_id | 889733 | 162750 | 0 |
user_id_from | 155755 | 63057 | 733978 |
event | 889733 | 3 | 0 |
datetime | 889733 | 869392 | 0 |
Пример данных:
group_id | user_id | user_id_from | event | datetime | |
---|---|---|---|---|---|
800000 | 158 | 104597 | NaN | add | 2021-04-11 04:51:52 |
800001 | 115 | 79071 | NaN | add | 2021-04-11 04:52:16 |
800002 | 203 | 149488 | NaN | add | 2021-04-11 04:52:17 |
800003 | 135 | 149488 | NaN | add | 2021-04-11 04:53:16 |
800004 | 46 | 149488 | NaN | add | 2021-04-11 04:53:27 |
2. Выводы из анализа данных ¶
Анализ данных выявил следующее:
- В проекте участвуют 4 файла формата
CSV
, полученные из хранилища S3 сервиса Yandex Cloud. - Проанализируемые датафреймы содержат числовые, текстовые и временные данные.
- Все датафреймы, кроме
groups
содержат пропуски в данных. - В датафреймах присутствуют поля с идентификаторами, являющимися первичными и внешними ключами в виде целочисленных положительных значений. Однако, часть из этих признаков имеют тип чисел с плавающей точкой. Например, признак
user_id_from
датафреймаgroup_log
. - Во всех проанализируемых датафреймах количество уникальных значений в признаках с идентификаторами, явлющимися первичными ключами, равно количеству объектов датафпейма.
Выводы:
- Признаки с идентификаторами требуется привести к целочисленному типу данных.
- Так как процесс преобразования данных в целочисленный тип в Pandas не предполагает наличие пропусков, требуется сначала привести все пропуски к отрицательному значению, а после преобразования типа данных заменить отрицательные значения на пропуски. Для данной операции подходят отрицательные значения, так как операция затронет только признаки с идентифиаторами, являющимися положительными числами.
3. Создание слоя STAGING в DWH на базе Vertica ¶
Создание слоя STAGING в DWH на базе Vertica осуществлено с помощью Data Defenition Language (DDL) в DBeaver.
3.A. Создание таблиц слоя STAGING ¶
Код SQL:
-- users - Пользователи
ALTER TABLE STV2025011410__STAGING.groups DROP CONSTRAINT IF EXISTS STV2025011410__STAGING_groups_admin_id_fk;
ALTER TABLE STV2025011410__STAGING.dialogs DROP CONSTRAINT IF EXISTS STV2025011410__STAGING_dialogs_message_from_fk;
ALTER TABLE STV2025011410__STAGING.dialogs DROP CONSTRAINT IF EXISTS STV2025011410__STAGING_dialogs_message_to_fk;
ALTER TABLE STV2025011410__STAGING.dialogs DROP CONSTRAINT IF EXISTS STV2025011410__STAGING_dialogs_message_id_pk;
ALTER TABLE STV2025011410__STAGING.dialogs DROP CONSTRAINT IF EXISTS STV2025011410__STAGING_dialogs_message_id_unique;
ALTER TABLE STV2025011410__STAGING.users DROP CONSTRAINT IF EXISTS STV2025011410__STAGING_users_id_pk;
ALTER TABLE STV2025011410__STAGING.users DROP CONSTRAINT IF EXISTS STV2025011410__STAGING_users_id_unique;
ALTER TABLE STV2025011410__STAGING.users DROP CONSTRAINT IF EXISTS STV2025011410__STAGING_users_age_unique;
DROP TABLE IF EXISTS STV2025011410__STAGING.users;
CREATE TABLE IF NOT EXISTS STV2025011410__STAGING.users(
id INT NOT NULL, -- уникальный идентификатор пользователя.
chat_name VARCHAR(200), -- имя полшьзователя/название чата.
registration_dt DATETIME, -- дата регистрации пользователя.
country VARCHAR(200), -- страна пользователя.
age INT, -- возраст.
CONSTRAINT STV2025011410__STAGING_users_id_pk PRIMARY KEY (id),
CONSTRAINT STV2025011410__STAGING_users_id_unique UNIQUE (id),
CONSTRAINT STV2025011410__STAGING_users_age_unique CHECK ((age > 0) AND (age < 1000))
)
ORDER BY id
SEGMENTED BY HASH(id) ALL NODES;
-- groups - Группы пользователей
ALTER TABLE STV2025011410__STAGING.groups DROP CONSTRAINT IF EXISTS STV2025011410__STAGING_groups_id_pk;
ALTER TABLE STV2025011410__STAGING.groups DROP CONSTRAINT IF EXISTS STV2025011410__STAGING_groups_id_unique;
ALTER TABLE STV2025011410__STAGING.dialogs DROP CONSTRAINT IF EXISTS STV2025011410__STAGING_dialogs_message_group_fk;
DROP TABLE IF EXISTS STV2025011410__STAGING.groups;
CREATE TABLE IF NOT EXISTS STV2025011410__STAGING.groups(
id INT NOT NULL, -- идентификатор группы пользователей.
admin_id INT NOT NULL, -- идентификатор администратора.
group_name VARCHAR(100) NOT NULL, -- название группы.
registration_dt DATETIME NOT NULL, -- дата регистрации группы.
is_private INT NOT NULL, -- является ли группа приватной.
CONSTRAINT STV2025011410__STAGING_groups_id_pk PRIMARY KEY (id),
CONSTRAINT STV2025011410__STAGING_groups_admin_id_fk FOREIGN KEY (admin_id) REFERENCES STV2025011410__STAGING.users (id),
CONSTRAINT STV2025011410__STAGING_groups_id_unique UNIQUE (id)
)
ORDER BY id, admin_id
SEGMENTED BY HASH(id) ALL NODES
PARTITION BY registration_dt::DATE
GROUP BY calendar_hierarchy_day(registration_dt::DATE, 3, 2);
--ALTER TABLE STV2025011410__STAGING.groups ADD CONSTRAINT STV2025011410__STAGING_groups_admin_id_fk FOREIGN KEY (admin_id) REFERENCES STV2025011410__STAGING.users (id);
--ALTER TABLE STV2025011410__STAGING.groups ADD CONSTRAINT STV2025011410__STAGING_groups_id_unique UNIQUE (id);
-- dialogs - Диалоги между пользователями
DROP TABLE IF EXISTS STV2025011410__STAGING.dialogs;
CREATE TABLE IF NOT EXISTS STV2025011410__STAGING.dialogs(
message_id INT NOT NULL, -- это идентификатор сообщения.
message_ts DATETIME, -- дата и время сообщения.
message_from INT, -- идентификатор отправителя сообщения.
message_to INT, -- идентификатор получателя.
message VARCHAR(1000), -- текст сообщения.
message_group INT, -- группа сообщения.
CONSTRAINT STV2025011410__STAGING_dialogs_message_id_pk PRIMARY KEY (message_id),
CONSTRAINT STV2025011410__STAGING_dialogs_message_from_fk FOREIGN KEY (message_from) REFERENCES STV2025011410__STAGING.users (id),
CONSTRAINT STV2025011410__STAGING_dialogs_message_to_fk FOREIGN KEY (message_to) REFERENCES STV2025011410__STAGING.users (id),
CONSTRAINT STV2025011410__STAGING_dialogs_message_group_fk FOREIGN KEY (message_group) REFERENCES STV2025011410__STAGING.groups (id),
CONSTRAINT STV2025011410__STAGING_dialogs_message_id_unique UNIQUE (message_id)
)
ORDER BY message_id
SEGMENTED BY HASH(message_id) ALL NODES
PARTITION BY message_ts::DATE
GROUP BY calendar_hierarchy_day(message_ts::DATE, 3, 2);
--ALTER TABLE STV2025011410__STAGING.dialogs ADD CONSTRAINT STV2025011410__STAGING_dialogs_message_from_fk FOREIGN KEY (message_from) REFERENCES STV2025011410__STAGING.users (id);
--ALTER TABLE STV2025011410__STAGING.dialogs ADD CONSTRAINT STV2025011410__STAGING_dialogs_message_to_fk FOREIGN KEY (message_to) REFERENCES STV2025011410__STAGING.users (id);
--ALTER TABLE STV2025011410__STAGING.dialogs ADD CONSTRAINT STV2025011410__STAGING_dialogs_message_group_fk FOREIGN KEY (message_group) REFERENCES STV2025011410__STAGING.groups (id);
-- group_log - Логи групп
DROP TABLE IF EXISTS STV2025011410__STAGING.group_log;
CREATE TABLE IF NOT EXISTS STV2025011410__STAGING.group_log
(
group_id INT, -- уникальный идентификатор группы
user_id INT, -- уникальный идентификатор пользователя
user_id_from INT, -- поле для отметки о том, что пользователь не сам вступил в группу
event VARCHAR(20), -- действие, которое совершено пользователем user_id (create, add, leave)
datetime DATETIME -- время совершения event
)
ORDER BY group_id, user_id
SEGMENTED BY HASH(user_id) ALL NODES
PARTITION BY datetime::DATE
GROUP BY calendar_hierarchy_day(datetime::DATE, 3, 2);
3.B. Проверка DDL слоя STAGING ¶
Код SQL:
SELECT table_schema, table_name, ordinal_position as col_no, column_name, data_type, is_nullable
FROM v_catalog.columns
WHERE table_schema = 'STV2025011410__STAGING'
ORDER BY table_schema, table_name, col_no;
Результат выполения кода SQL:
table_schema | table_name | col_no | column_name | data_type | is_nullable |
---|---|---|---|---|---|
STV2025011410__STAGING | dialogs | 1 | message_id | int | false |
STV2025011410__STAGING | dialogs | 2 | message_ts | timestamp | true |
STV2025011410__STAGING | dialogs | 3 | message_from | int | true |
STV2025011410__STAGING | dialogs | 4 | message_to | int | true |
STV2025011410__STAGING | dialogs | 5 | message | varchar(1000) | true |
STV2025011410__STAGING | dialogs | 6 | message_group | int | true |
STV2025011410__STAGING | group_log | 1 | group_id | int | true |
STV2025011410__STAGING | group_log | 2 | user_id | int | true |
STV2025011410__STAGING | group_log | 3 | user_id_from | int | true |
STV2025011410__STAGING | group_log | 4 | event | varchar(20) | true |
STV2025011410__STAGING | group_log | 5 | datetime | timestamp | true |
STV2025011410__STAGING | groups | 1 | id | int | false |
STV2025011410__STAGING | groups | 2 | admin_id | int | false |
STV2025011410__STAGING | groups | 3 | group_name | varchar(100) | false |
STV2025011410__STAGING | groups | 4 | registration_dt | timestamp | false |
STV2025011410__STAGING | groups | 5 | is_private | int | false |
STV2025011410__STAGING | users | 1 | id | int | false |
STV2025011410__STAGING | users | 2 | chat_name | varchar(200) | true |
STV2025011410__STAGING | users | 3 | registration_dt | timestamp | true |
STV2025011410__STAGING | users | 4 | country | varchar(200) | true |
STV2025011410__STAGING | users | 5 | age | int | true |
Визуализация схемы слоя STAGING:
4. Пернос данных из хранилища S3 в STAGING слой с помошью DAG в Airflow ¶
4.A. Загрузка библиотек Python ¶
# Загрузка библиотек Python
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.decorators import dag
from airflow.hooks.base import BaseHook
from airflow.operators.dummy_operator import DummyOperator
import pendulum
import os # Работа с файловой системой
import boto3 # Работа с файловой системой S3
from io import StringIO # Разбор строк
import vertica_python # Работа с Vertica
import pandas as pd
import numpy as np
4.B. Создание глобальных переменных ¶
# Глобальные переменные с параметрами доступа
# к хранилищу S3 ии БД Vertica, хранящиеся в Airflow
# Параметры доступа к хранилищу S3
YANDEX_CLOUD_S3_SPRINT6 = BaseHook.get_connection('YANDEX_CLOUD_S3_SPRINT6')
# Передача параметров доступа к S3 из BaseHook
AWS_ACCESS_KEY_ID = YANDEX_CLOUD_S3_SPRINT6.login
AWS_SECRET_ACCESS_KEY = YANDEX_CLOUD_S3_SPRINT6.password
AWS_SERVICE_NAME = 's3'
AWS_ENDPOINT_URL = 'https://storage.yandexcloud.net'
BUCKET = 'sprint6'
# Параметры доступа к Vertica
YANDEX_CLOUD_VERTICA_SPRINT6 = BaseHook.get_connection('YANDEX_CLOUD_VERTICA_SPRINT6')
# Передача параметров доступа к Vertica из BaseHook
VERTICA_HOST = YANDEX_CLOUD_VERTICA_SPRINT6.host
VERTICA_PORT = YANDEX_CLOUD_VERTICA_SPRINT6.port
VERTICA_USER = YANDEX_CLOUD_VERTICA_SPRINT6.login
VERTICA_PASSWORD = YANDEX_CLOUD_VERTICA_SPRINT6.password
4.C. Основные функции для DAG ¶
# Чтение файла CSV из S3 по чанкам
# (биение файла на чанки может упростить чтение/запись очень больших файлов)
def reading_csv_from_s3(file_name, chunksize):
# Доступ к хранилищу S3
session = boto3.session.Session()
s3_client = session.client(
service_name=AWS_SERVICE_NAME,
endpoint_url=AWS_ENDPOINT_URL,
aws_access_key_id=AWS_ACCESS_KEY_ID,
aws_secret_access_key=AWS_SECRET_ACCESS_KEY
)
# Чтение файла CSV из S3 по чанкам
key = file_name + '.csv'
obj = s3_client.get_object(Bucket=BUCKET, Key=key)
body = obj['Body'].read().decode('utf-8')
data = StringIO(body)
chunks = pd.read_csv(data, chunksize=chunksize)
return chunks
# Выполнение SQL кода в Vertica
def sql_code_execution_in_vertica(sql_code):
# Достсуп к Vertica
with vertica_python.connect(
host=VERTICA_HOST,
port=VERTICA_PORT,
user=VERTICA_USER,
password=VERTICA_PASSWORD,
tlsmode='disable' # Рекомендация из ошибки "HINT: Set connection option 'tlsmode' to 'disable' to explicitly create a non-TLS connection."
) as connection:
with connection.cursor() as cursor:
# Выполнение кода SQL в Vertica
cursor.execute(sql_code)
# Запись чанок файла CSV в Vertica
def chunk_entry_in_vertica(chunk, file_name, primary_key):
# Достсуп к Vertica
with vertica_python.connect(
host=VERTICA_HOST,
port=VERTICA_PORT,
user=VERTICA_USER,
password=VERTICA_PASSWORD,
tlsmode='disable' # Рекомендация из ошибки "HINT: Set connection option 'tlsmode' to 'disable' to explicitly create a non-TLS connection."
) as connection:
# Проверка записываемых данных на уникальность
# (такое кривое решение из-за того, что ошибка
# при использовании IGNORE DUPLICATE ROWS в запросе COPY)
if file_name == 'group_log':
print(f'(i) {file_name}.count() =', chunk.count())
else:
print(f'(i) {file_name}[{primary_key}].count() =', chunk[primary_key].count())
chunksize = 100000
query = f'SELECT * FROM STV2025011410__STAGING.{file_name};'
df_chunks = pd.read_sql(query, connection, chunksize=chunksize)
for df in df_chunks:
if file_name == 'group_log':
chunk = chunk[~chunk.isin(df)]
print(f'(i) Not duplicates... {file_name}.count() =', chunk.count())
else:
chunk = chunk[~chunk[primary_key].isin(df[primary_key])]
print(f'(i) Not duplicates... {file_name}[{primary_key}].count() =', chunk[primary_key].count())
print('(i) chunk:', chunk.head())
with connection.cursor() as cursor:
# Форматирование данных
data = '\n'.join('|'.join(str(x) for x in row) for row in chunk.values)
# Создание буфера памяти
f_stringio = StringIO(data)
# Запись данных в БД
cursor.copy(f"COPY STV2025011410__STAGING.{file_name} FROM STDIN DELIMITER '|';", f_stringio)
# Проверка записи данных в БД
cursor.execute(f'SELECT * FROM STV2025011410__STAGING.{file_name};')
print(f'(i) SELECT * FROM STV2025011410__STAGING.{file_name}; =', cursor.fetchone())
# Преобразование столбца float к int
def positive_float_to_int(df, column_name):
df[column_name] = df[column_name].replace([np.inf, -np.inf, np.nan], -1)
df[column_name] = df[column_name].astype('Int64')
df.loc[df[column_name] < 0, column_name] = pd.NA
return df
# Чтение файла CSV из S3 по чанкам и запись данных из них в Vertica
# (биение файла на чанки может упростить чтение/запись очень больших файлов)
def csv_entry_from_s3_to_vertica(file_name, primary_key, chunksize):
# Чтение файла CSV из S3 по чанкам
chunks = reading_csv_from_s3(file_name, chunksize)
# Запись данных из чанок файла CSV в Vertica
for chunk in chunks:
# Преобразование столбца 'message_group' в чанках файла 'dialogs.csv' из float к int
if file_name == 'dialogs': chunk = positive_float_to_int(chunk, 'message_group')
# Преобразование столбца 'message_group' в чанках файла 'dialogs.csv' из float к int
if file_name == 'group_log': chunk = positive_float_to_int(chunk, 'user_id_from')
# Запись чанок файла CSV в Vertica
chunk_entry_in_vertica(chunk, file_name, primary_key)
# Ограничение на количество обрабатываемых чанок
# для ускорения получения результата по работе DAGa
#break
4.D. DAG для загрузки данных проекта в STAGING слой ¶
# Аргументы дага
default_args = {
#'owner': 'airflow',
'schedule_interval': '* */6 * * *',
'start_date': pendulum.parse('2010-07-13'),
'catchup': False,
'tags': ['s3', 'vertica', 'dwh', 'staging', 'csv', 'users', 'groups', 'dialogs', 'group_log'],
'is_paused_upon_creation': False
}
# DAG для загрузки всех данных проекта в STAGING слой
@dag(dag_id='dag_csv_entry_from_s3_to_vertica', default_args=default_args)
def dag_csv_entry_from_s3_to_vertica():
# Название файлов CSV в S3 и одноименных таблиц в Vertica
file_names = ['users', 'groups', 'dialogs', 'group_log']
# Первичные ключи
primary_keys = {'users':'id', 'groups':'id', 'dialogs':'message_id', 'group_log':'datetime'}
# Размер чанок файла CSV
chunksize = 100000
# Пересоздание всего Stagging слоя в Vertica (dll_task)
'''
start = PythonOperator(
task_id='start',
python_callable=sql_code_execution_in_vertica,
op_kwargs={'file_name': dll_code},
)
'''
# Формальные этапы старта и завершения выполнения дага
start = DummyOperator(task_id='start')
end = DummyOperator(task_id='end')
# Чтение файла CSV из S3 по чанкам и запись из них данных в Vertica
# (биение файла на чанки может упростить чтение/запись очень больших файлов)
entry_tasks = [
PythonOperator(
task_id=f'load_{key}',
python_callable=csv_entry_from_s3_to_vertica,
op_kwargs={'file_name': key, 'primary_key': primary_keys[key], 'chunksize': chunksize},
) for key in file_names
]
start >> entry_tasks >> end
dag_instances = dag_csv_entry_from_s3_to_vertica()
4.E. Проверка данных в STAGING слое ¶
Код SQL:
SELECT *
FROM STV2025011410__STAGING.users
LIMIT 5;
Результат выполения кода SQL:
id | chat_name | registration_dt | country | age |
---|---|---|---|---|
0 | MarinaBender | 2020-10-16 19:18:46.000 | Aruba | 56 |
5 | PavelMadona | 2021-03-25 17:20:43.000 | Cuba | 42 |
18 | PavelMendoza | 2020-11-04 22:45:33.000 | Gibraltar | 33 |
19 | AntonyLuter King | 2020-12-08 06:45:35.000 | Gabon | 19 |
24 | ValeryaVanGog | 2021-05-01 13:37:00.000 | Azerbaijan | 58 |
Код SQL:
SELECT *
FROM STV2025011410__STAGING.groups
LIMIT 5;
Результат выполения кода SQL:
id | admin_id | group_name | registration_dt | is_private |
---|---|---|---|---|
1 | 24532 | ох из двора хах | 2021-01-24 15:37:26.301 | 1 |
2 | 89388 | собачники новая эра подписка 500р | 2021-01-01 12:18:49.020 | 0 |
3 | 3044 | рожденные быть молодыми я и ты подписка 1000р | 2021-01-30 07:21:48.450 | 1 |
7 | 119586 | python новая эпоха а ты? | 2021-04-14 08:27:53.000 | 1 |
16 | 113874 | растеньеводство новая эра наше все | 2021-04-23 00:41:13.543 | 1 |
Код SQL:
SELECT *
FROM STV2025011410__STAGING.dialogs
LIMIT 5;
Результат выполения кода SQL:
id | admin_id | group_name | registration_dt | is_private |
---|---|---|---|---|
1 | 24532 | ох из двора хах | 2021-01-24 15:37:26.301 | 1 |
2 | 89388 | собачники новая эра подписка 500р | 2021-01-01 12:18:49.020 | 0 |
3 | 3044 | рожденные быть молодыми я и ты подписка 1000р | 2021-01-30 07:21:48.450 | 1 |
7 | 119586 | python новая эпоха а ты? | 2021-04-14 08:27:53.000 | 1 |
16 | 113874 | растеньеводство новая эра наше все | 2021-04-23 00:41:13.543 | 1 |
Код SQL:
SELECT *
FROM STV2025011410__STAGING.group_log
LIMIT 5;
Результат выполения кода SQL:
group_id | user_id | user_id_from | event | datetime |
---|---|---|---|---|
1 | 149488 | 28696 | add | 2012-11-05 03:51:02.000 |
1 | 149488 | 56666 | add | 2012-02-05 04:33:53.000 |
2 | 92124 | 149488 | add | 2012-05-22 23:25:14.000 |
2 | 95600 | 149488 | add | 2012-06-01 17:11:05.000 |
6 | 149488 | 149488 | add | 2012-03-12 19:22:50.000 |
5. Создание слоя Detail Data Store (DDS) по методу Data Value ¶
5.A. Создание таблиц хабов ¶
Код SQL:
-- DDL хаба h_users
drop table if exists STV2025011410__DWH.h_users;
create table STV2025011410__DWH.h_users
(
hk_user_id bigint primary key,
user_id int,
registration_dt datetime,
load_dt datetime,
load_src varchar(20)
)
order by load_dt
SEGMENTED BY hk_user_id all nodes
PARTITION BY load_dt::date
GROUP BY calendar_hierarchy_day(load_dt::date, 3, 2);
;
-- DDL хаба h_groups
DROP TABLE IF EXISTS STV2025011410__DWH.h_groups
CREATE TABLE IF NOT EXISTS STV2025011410__DWH.h_groups
(
hk_group_id BIGINT PRIMARY KEY,
group_id INT,
registration_dt DATETIME,
load_dt DATETIME,
load_src VARCHAR(20)
)
ORDER BY load_dt
SEGMENTED BY hk_group_id ALL NODES
PARTITION BY load_dt::DATE
GROUP BY calendar_hierarchy_day(load_dt::DATE, 3, 2)
;
-- DDL хаба h_dialogs
DROP TABLE IF EXISTS STV2025011410__DWH.h_dialogs
CREATE TABLE IF NOT EXISTS STV2025011410__DWH.h_dialogs
(
hk_message_id BIGINT PRIMARY KEY,
message_id INT,
message_ts DATETIME,
load_dt DATETIME,
load_src VARCHAR(20)
)
ORDER BY load_dt
SEGMENTED BY hk_message_id ALL NODES
PARTITION BY load_dt::DATE
GROUP BY calendar_hierarchy_day(load_dt::DATE, 3, 2)
;
5.B. Создание таблиц связей ¶
Код SQL:
-- DDL таблицы связи l_user_message
drop table if exists STV2025011410__DWH.l_user_message;
create table IF NOT EXISTS STV2025011410__DWH.l_user_message
(
hk_l_user_message bigint primary key,
hk_user_id bigint not null CONSTRAINT fk_l_user_message_user REFERENCES STV2025011410__DWH.h_users (hk_user_id),
hk_message_id bigint not null CONSTRAINT fk_l_user_message_message REFERENCES STV2025011410__DWH.h_dialogs (hk_message_id),
load_dt datetime,
load_src varchar(20)
)
order by load_dt
SEGMENTED BY hk_user_id all nodes
PARTITION BY load_dt::date
GROUP BY calendar_hierarchy_day(load_dt::date, 3, 2);
-- DDL таблицы связи l_admins
DROP TABLE IF EXISTS STV2025011410__DWH.l_admins;
CREATE TABLE IF NOT EXISTS STV2025011410__DWH.l_admins
(
hk_l_admin_id BIGINT PRIMARY KEY,
hk_user_id BIGINT NOT NULL CONSTRAINT fk_l_admins_user REFERENCES STV2025011410__DWH.h_users (hk_user_id),
hk_group_id BIGINT NOT NULL CONSTRAINT fk_l_admins_group REFERENCES STV2025011410__DWH.h_groups (hk_group_id),
load_dt DATETIME,
load_src VARCHAR(20)
)
ORDER BY load_dt
SEGMENTED BY hk_l_admin_id ALL NODES
PARTITION BY load_dt::DATE
GROUP BY calendar_hierarchy_day(load_dt::DATE, 3, 2);
-- DDL таблицы связи l_groups_dialogs
DROP TABLE IF EXISTS STV2025011410__DWH.l_groups_dialogs;
CREATE TABLE IF NOT EXISTS STV2025011410__DWH.l_groups_dialogs
(
hk_l_groups_dialogs BIGINT PRIMARY KEY,
hk_message_id BIGINT NOT NULL CONSTRAINT fk_l_groups_dialogs_message REFERENCES STV2025011410__DWH.h_dialogs (hk_message_id),
hk_group_id BIGINT NOT NULL CONSTRAINT fk_l_groups_dialogs_group REFERENCES STV2025011410__DWH.h_groups (hk_group_id),
load_dt DATETIME,
load_src VARCHAR(20)
)
ORDER BY load_dt
SEGMENTED BY hk_l_groups_dialogs ALL NODES
PARTITION BY load_dt::DATE
GROUP BY calendar_hierarchy_day(load_dt::DATE, 3, 2);
-- DDL таблицы связи l_user_group_activity
DROP TABLE If EXISTS STV2025011410__DWH.l_user_group_activity;
CREATE TABLE IF NOT EXISTS STV2025011410__DWH.l_user_group_activity
(
hk_l_user_group_activity BIGINT PRIMARY KEY,
hk_user_id BIGINT CONSTRAINT l_user_group_activity_user REFERENCES STV2025011410__DWH.h_users(hk_user_id),
hk_group_id BIGINT CONSTRAINT l_user_group_activity_group REFERENCES STV2025011410__DWH.h_groups(hk_group_id),
load_dt DATETIME,
load_src VARCHAR(20)
)
ORDER BY hk_l_user_group_activity
SEGMENTED BY HASH(hk_l_user_group_activity) ALL NODES
PARTITION BY load_dt::DATE
GROUP BY calendar_hierarchy_day(load_dt::DATE, 3, 2);
5.C. Создание таблиц сателлитов ¶
Код SQL:
-- Создаие таблицы сателлита s_admins
drop table if exists STV2025011410__DWH.s_admins;
create table STV2025011410__DWH.s_admins
(
hk_admin_id bigint not null CONSTRAINT fk_s_admins_l_admins REFERENCES STV2025011410__DWH.l_admins (hk_l_admin_id),
is_admin boolean,
admin_from datetime,
load_dt datetime,
load_src varchar(20)
)
order by load_dt
SEGMENTED BY hk_admin_id all nodes
PARTITION BY load_dt::date
GROUP BY calendar_hierarchy_day(load_dt::date, 3, 2);
-- Создаие таблицы сателлита s_group_name
DROP TABLE IF EXISTS STV2025011410__DWH.s_group_name;
CREATE TABLE IF NOT EXISTS STV2025011410__DWH.s_group_name
(
-- В названии внешнего ключа название текущей таблицы
-- и таблицы, и которой танется идентификатор
hk_group_id BIGINT NOT NULL CONSTRAINT fk_s_group_name_h_groups REFERENCES STV2025011410__DWH.h_groups(hk_group_id),
group_name VARCHAR(100),
load_dt DATETIME,
load_src VARCHAR(20)
)
ORDER BY load_dt
SEGMENTED BY hk_group_id ALL NODES
PARTITION BY load_dt::DATE
GROUP BY calendar_hierarchy_day(load_dt::DATE, 3, 2);
-- Создаие таблицы сателлита s_group_private_status
DROP TABLE IF EXISTS STV2025011410__DWH.s_group_private_status;
CREATE TABLE IF NOT EXISTS STV2025011410__DWH.s_group_private_status
(
hk_group_id BIGINT NOT NULL CONSTRAINT fk_s_group_private_status_h_groups REFERENCES STV2025011410__DWH.h_groups(hk_group_id),
is_private INT,
load_dt DATETIME,
load_src VARCHAR(20)
)
ORDER BY load_dt
SEGMENTED BY hk_group_id ALL NODES
PARTITION BY load_dt::DATE
GROUP BY calendar_hierarchy_day(load_dt::DATE, 3, 2);
-- Создаие таблицы сателлита s_dialog_info
DROP TABLE IF EXISTS STV2025011410__DWH.s_dialog_info;
CREATE TABLE IF NOT EXISTS STV2025011410__DWH.s_dialog_info
(
hk_message_id BIGINT NOT NULL CONSTRAINT fk_s_dialog_info_h_dialogs REFERENCES STV2025011410__DWH.h_dialogs(hk_message_id),
message VARCHAR(1000),
message_from INT,
message_to INT,
load_dt DATETIME,
load_src VARCHAR(20)
)
ORDER BY load_dt
SEGMENTED BY hk_message_id ALL NODES
PARTITION BY load_dt::DATE
GROUP BY calendar_hierarchy_day(load_dt::DATE, 3, 2);
-- Создаие таблицы сателлита s_user_chatinfo
DROP TABLE IF EXISTS STV2025011410__DWH.s_user_chatinfo;
CREATE TABLE IF NOT EXISTS STV2025011410__DWH.s_user_chatinfo
(
hk_user_id BIGINT NOT NULL CONSTRAINT fk_s_user_chatinfo_h_users REFERENCES STV2025011410__DWH.h_users(hk_user_id),
chat_name VARCHAR(200),
load_dt DATETIME,
load_src VARCHAR(20)
)
ORDER BY load_dt
SEGMENTED BY hk_user_id ALL NODES
PARTITION BY load_dt::DATE
GROUP BY calendar_hierarchy_day(load_dt::DATE, 3, 2);
-- Создаие таблицы сателлита s_user_socdem
DROP TABLE IF EXISTS STV2025011410__DWH.s_user_socdem;
CREATE TABLE IF NOT EXISTS STV2025011410__DWH.s_user_socdem
(
hk_user_id BIGINT NOT NULL CONSTRAINT fk_s_user_socdem_h_users REFERENCES STV2025011410__DWH.h_users(hk_user_id),
--chat_name VARCHAR(200), -- СПОРНО, Т.К, ЕСТЬ В s_user_chatinfo
country VARCHAR(200),
age INT,
load_dt DATETIME,
load_src VARCHAR(20)
)
ORDER BY load_dt
SEGMENTED BY hk_user_id ALL NODES
PARTITION BY load_dt::DATE
GROUP BY calendar_hierarchy_day(load_dt::DATE, 3, 2);
-- Создаие таблицы сателлита s_auth_history
DROP TABLE IF EXISTS STV2025011410__DWH.s_auth_history;
CREATE TABLE IF NOT EXISTS STV2025011410__DWH.s_auth_history
(
hk_l_user_group_activity BIGINT NOT NULL CONSTRAINT fk_s_auth_history_hk_l_user_group_activity REFERENCES STV2025011410__DWH.l_user_group_activity(hk_l_user_group_activity),
user_id_from INT,
event VARCHAR(20),
event_dt DATETIME,
load_dt DATETIME,
load_src VARCHAR(20)
)
ORDER BY hk_l_user_group_activity
SEGMENTED BY HASH(hk_l_user_group_activity) ALL NODES
PARTITION BY load_dt::DATE
GROUP BY calendar_hierarchy_day(load_dt::DATE, 3, 2);
5.D. Проверка DDL слоя DDS ¶
Код SQL:
select table_schema, table_name, ordinal_position as col_no, column_name, data_type,
is_nullable
from v_catalog.columns
where table_schema = 'STV2025011410__DWH'
order by table_schema, table_name, col_no;
Результат выполения кода SQL:
table_schema | table_name | col_no | column_name | data_type | is_nullable |
---|---|---|---|---|---|
STV2025011410__DWH | h_dialogs | 1 | hk_message_id | int | false |
STV2025011410__DWH | h_dialogs | 2 | message_id | int | true |
STV2025011410__DWH | h_dialogs | 3 | message_ts | timestamp | true |
STV2025011410__DWH | h_dialogs | 4 | load_dt | timestamp | true |
STV2025011410__DWH | h_dialogs | 5 | load_src | varchar(20) | true |
STV2025011410__DWH | h_groups | 1 | hk_group_id | int | false |
STV2025011410__DWH | h_groups | 2 | group_id | int | true |
STV2025011410__DWH | h_groups | 3 | registration_dt | timestamp | true |
STV2025011410__DWH | h_groups | 4 | load_dt | timestamp | true |
STV2025011410__DWH | h_groups | 5 | load_src | varchar(20) | true |
STV2025011410__DWH | h_users | 1 | hk_user_id | int | false |
STV2025011410__DWH | h_users | 2 | user_id | int | true |
STV2025011410__DWH | h_users | 3 | registration_dt | timestamp | true |
STV2025011410__DWH | h_users | 4 | load_dt | timestamp | true |
STV2025011410__DWH | h_users | 5 | load_src | varchar(20) | true |
STV2025011410__DWH | l_admins | 1 | hk_l_admin_id | int | false |
STV2025011410__DWH | l_admins | 2 | hk_user_id | int | false |
STV2025011410__DWH | l_admins | 3 | hk_group_id | int | false |
STV2025011410__DWH | l_admins | 4 | load_dt | timestamp | true |
STV2025011410__DWH | l_admins | 5 | load_src | varchar(20) | true |
STV2025011410__DWH | l_groups_dialogs | 1 | hk_l_groups_dialogs | int | false |
STV2025011410__DWH | l_groups_dialogs | 2 | hk_message_id | int | false |
STV2025011410__DWH | l_groups_dialogs | 3 | hk_group_id | int | false |
STV2025011410__DWH | l_groups_dialogs | 4 | load_dt | timestamp | true |
STV2025011410__DWH | l_groups_dialogs | 5 | load_src | varchar(20) | true |
STV2025011410__DWH | l_user_group_activity | 1 | hk_l_user_group_activity | int | false |
STV2025011410__DWH | l_user_group_activity | 2 | hk_user_id | int | true |
STV2025011410__DWH | l_user_group_activity | 3 | hk_group_id | int | true |
STV2025011410__DWH | l_user_group_activity | 4 | load_dt | timestamp | true |
STV2025011410__DWH | l_user_group_activity | 5 | load_src | varchar(20) | true |
STV2025011410__DWH | l_user_message | 1 | hk_l_user_message | int | false |
STV2025011410__DWH | l_user_message | 2 | hk_user_id | int | false |
STV2025011410__DWH | l_user_message | 3 | hk_message_id | int | false |
STV2025011410__DWH | l_user_message | 4 | load_dt | timestamp | true |
STV2025011410__DWH | l_user_message | 5 | load_src | varchar(20) | true |
STV2025011410__DWH | s_admins | 1 | hk_admin_id | int | false |
STV2025011410__DWH | s_admins | 2 | is_admin | boolean | true |
STV2025011410__DWH | s_admins | 3 | admin_from | timestamp | true |
STV2025011410__DWH | s_admins | 4 | load_dt | timestamp | true |
STV2025011410__DWH | s_admins | 5 | load_src | varchar(20) | true |
STV2025011410__DWH | s_auth_history | 1 | hk_l_user_group_activity | int | false |
STV2025011410__DWH | s_auth_history | 2 | user_id_from | int | true |
STV2025011410__DWH | s_auth_history | 3 | event | varchar(20) | true |
STV2025011410__DWH | s_auth_history | 4 | event_dt | timestamp | true |
STV2025011410__DWH | s_auth_history | 5 | load_dt | timestamp | true |
STV2025011410__DWH | s_auth_history | 6 | load_src | varchar(20) | true |
STV2025011410__DWH | s_dialog_info | 1 | hk_message_id | int | false |
STV2025011410__DWH | s_dialog_info | 2 | message | varchar(1000) | true |
STV2025011410__DWH | s_dialog_info | 3 | message_from | int | true |
STV2025011410__DWH | s_dialog_info | 4 | message_to | int | true |
STV2025011410__DWH | s_dialog_info | 5 | load_dt | timestamp | true |
STV2025011410__DWH | s_dialog_info | 6 | load_src | varchar(20) | true |
STV2025011410__DWH | s_group_name | 1 | hk_group_id | int | false |
STV2025011410__DWH | s_group_name | 2 | group_name | varchar(100) | true |
STV2025011410__DWH | s_group_name | 3 | load_dt | timestamp | true |
STV2025011410__DWH | s_group_name | 4 | load_src | varchar(20) | true |
STV2025011410__DWH | s_group_private_status | 1 | hk_group_id | int | false |
STV2025011410__DWH | s_group_private_status | 2 | is_private | int | true |
STV2025011410__DWH | s_group_private_status | 3 | load_dt | timestamp | true |
STV2025011410__DWH | s_group_private_status | 4 | load_src | varchar(20) | true |
STV2025011410__DWH | s_user_chatinfo | 1 | hk_user_id | int | false |
STV2025011410__DWH | s_user_chatinfo | 2 | chat_name | varchar(200) | true |
STV2025011410__DWH | s_user_chatinfo | 3 | load_dt | timestamp | true |
STV2025011410__DWH | s_user_chatinfo | 4 | load_src | varchar(20) | true |
STV2025011410__DWH | s_user_socdem | 1 | hk_user_id | int | false |
STV2025011410__DWH | s_user_socdem | 2 | country | varchar(200) | true |
STV2025011410__DWH | s_user_socdem | 3 | age | int | true |
STV2025011410__DWH | s_user_socdem | 4 | load_dt | timestamp | true |
STV2025011410__DWH | s_user_socdem | 5 | load_src | varchar(20) | true |
Визуализация схемы слоя DDS:
6. Заполнение слоя DDS ¶
6.A. Заполнение таблиц хабов ¶
Код SQL:
-- Заполнение хаба h_users
INSERT INTO STV2025011410__DWH.h_users(hk_user_id, user_id,registration_dt,load_dt,load_src)
select
hash(id) as hk_user_id,
id as user_id,
registration_dt,
now() as load_dt,
's3' as load_src
from STV2025011410__STAGING.users
where hash(id) not in (select hk_user_id from STV2025011410__DWH.h_users);
-- Заполнение хаба h_groups
INSERT INTO STV2025011410__DWH.h_groups(hk_group_id, group_id, registration_dt, load_dt, load_src)
SELECT
HASH(id) AS hk_group_id,
id AS group_id,
registration_dt,
NOW() AS load_dt,
's3' AS load_src
FROM STV2025011410__STAGING.groups
WHERE HASH(id) NOT IN (SELECT hk_group_id FROM STV2025011410__DWH.h_groups);
-- Заполнение хаба h_dialogs
INSERT INTO STV2025011410__DWH.h_dialogs(hk_message_id, message_id, message_ts, load_dt, load_src)
SELECT
HASH(message_id) AS hk_message_id,
message_id,
message_ts,
NOW() AS load_dt,
's3' AS load_src
FROM STV2025011410__STAGING.dialogs
WHERE HASH(message_id) NOT IN (SELECT hk_message_id FROM STV2025011410__DWH.h_dialogs)
6.B. Заполнение таблиц связей ¶
Код SQL:
-- Заполнение таблицы связи l_admins
INSERT INTO STV2025011410__DWH.l_admins(hk_l_admin_id, hk_group_id,hk_user_id,load_dt,load_src)
select
hash(hg.hk_group_id,hu.hk_user_id),
hg.hk_group_id,
hu.hk_user_id,
now() as load_dt,
's3' as load_src
from STV2025011410__STAGING.groups as g
left join STV2025011410__DWH.h_users as hu on g.admin_id = hu.user_id
left join STV2025011410__DWH.h_groups as hg on g.id = hg.group_id
where hash(hg.hk_group_id,hu.hk_user_id) not in (select hk_l_admin_id from STV2025011410__DWH.l_admins);
-- Заполнение таблицы связи l_user_message
INSERT INTO STV2025011410__DWH.l_user_message(hk_l_user_message, hk_user_id, hk_message_id, load_dt, load_src)
SELECT
HASH(hu.hk_user_id, hd.hk_message_id),
hu.hk_user_id,
hd.hk_message_id,
NOW() AS load_dt,
's3' AS load_src
FROM STV2025011410__STAGING.dialogs AS d
INNER JOIN STV2025011410__DWH.h_users AS hu ON hu.user_id = d.message_from
INNER JOIN STV2025011410__DWH.h_dialogs AS hd ON hd.message_id = d.message_id
WHERE HASH(hu.hk_user_id, hd.hk_message_id) NOT IN (SELECT hk_l_user_message FROM STV2025011410__DWH.l_user_message);
-- Заполнение таблицы связи l_groups_dialogs
INSERT INTO STV2025011410__DWH.l_groups_dialogs(hk_l_groups_dialogs, hk_message_id, hk_group_id, load_dt, load_src)
SELECT
HASH(hd.hk_message_id, hg.hk_group_id),
hd.hk_message_id,
hg.hk_group_id,
NOW() AS load_dt,
's3' AS load_src
FROM STV2025011410__STAGING.dialogs AS d
LEFT JOIN STV2025011410__DWH.h_groups AS hg ON hg.group_id = d.message_group
LEFT JOIN STV2025011410__DWH.h_dialogs AS hd ON hd.message_id = d.message_id
WHERE HASH(hd.hk_message_id, hg.hk_group_id) NOT IN (SELECT hk_l_groups_dialogs FROM STV2025011410__DWH.l_groups_dialogs);
-- Заполнение таблицы связи l_user_group_activity
INSERT INTO STV2025011410__DWH.l_user_group_activity(hk_l_user_group_activity, hk_user_id, hk_group_id, load_dt, load_src)
SELECT
HASH(hk_user_id, hk_group_id),
hu.hk_user_id,
hg.hk_group_id,
NOW(),
's3'
FROM STV2025011410__STAGING.group_log AS gl
LEFT JOIN STV2025011410__DWH.h_users AS hu ON hu.user_id = gl.user_id
LEFT JOIN STV2025011410__DWH.h_groups AS hg ON hg.group_id = gl.group_id
WHERE HASH(hk_user_id, hk_group_id) NOT IN (SELECT hk_l_user_group_activity FROM STV2025011410__DWH.l_user_group_activity);
6.C. Заполнение таблиц сателлитов ¶
Код SQL:
-- Заполение таблицы сателлита s_admins
INSERT INTO STV2025011410__DWH.s_admins(hk_admin_id, is_admin,admin_from,load_dt,load_src)
select
la.hk_l_admin_id,
True as is_admin,
hg.registration_dt,
now() as load_dt,
's3' as load_src
from STV2025011410__DWH.l_admins as la
left join STV2025011410__DWH.h_groups as hg on la.hk_group_id = hg.hk_group_id;
-- Заполение таблицы сателлита s_group_name
INSERT INTO STV2025011410__DWH.s_group_name(hk_group_id, group_name, load_dt, load_src)
SELECT
hg.hk_group_id,
g.group_name,
NOW() AS load_dt,
's3' AS load_src
FROM STV2025011410__DWH.h_groups AS hg
LEFT JOIN STV2025011410__STAGING.groups AS g ON g.id = hg.group_id;
-- Заполение таблицы сателлита s_group_private_status
INSERT INTO STV2025011410__DWH.s_group_private_status(hk_group_id, is_private, load_dt, load_src)
SELECT
hg.hk_group_id,
g.is_private,
NOW() AS load_dt,
's3' AS load_src
FROM STV2025011410__DWH.h_groups AS hg
LEFT JOIN STV2025011410__STAGING.groups AS g ON g.id = hg.group_id;
-- Заполение таблицы сателлита s_dialog_info
INSERT INTO STV2025011410__DWH.s_dialog_info(hk_message_id, message, message_from, message_to, load_dt, load_src)
SELECT
hd.hk_message_id,
d.message,
d.message_from,
d.message_to,
NOW() AS load_dt,
's3' AS load_src
FROM STV2025011410__DWH.h_dialogs AS hd
LEFT JOIN STV2025011410__STAGING.dialogs AS d ON d.message_id = hd.message_id;
-- Заполение таблицы сателлита s_user_chatinfo
INSERT INTO STV2025011410__DWH.s_user_chatinfo(hk_user_id, chat_name, load_dt, load_src)
SELECT
hu.hk_user_id,
u.chat_name,
NOW() AS load_dt,
's3' AS load_src
FROM STV2025011410__DWH.h_users AS hu
LEFT JOIN STV2025011410__STAGING.users AS u ON u.id = hu.user_id;
-- Заполение таблицы сателлита s_user_socdem
INSERT INTO STV2025011410__DWH.s_user_socdem(hk_user_id, /*chat_name,*/ country, age, load_dt, load_src)
SELECT
hu.hk_user_id,
--u.chat_name,
u.country,
u.age,
NOW() AS load_dt,
's3' AS load_src
FROM STV2025011410__DWH.h_users AS hu
LEFT JOIN STV2025011410__STAGING.users AS u ON u.id = hu.user_id;
-- Заполение таблицы сателлита s_auth_history
INSERT INTO STV2025011410__DWH.s_auth_history(hk_l_user_group_activity, user_id_from, event, event_dt, load_dt, load_src)
SELECT
luga.hk_l_user_group_activity,
gl.user_id_from,
gl.event,
gl.datetime,
NOW(),
's3'
FROM STV2025011410__STAGING.group_log AS gl
LEFT JOIN STV2025011410__DWH.h_users AS hu ON hu.user_id = gl.user_id
LEFT JOIN STV2025011410__DWH.h_groups AS hg ON hg.group_id = gl.group_id
INNER JOIN STV2025011410__DWH.l_user_group_activity AS luga ON luga.hk_user_id = hu.hk_user_id AND luga.hk_group_id = hg.hk_group_id
WHERE luga.hk_l_user_group_activity NOT IN (SELECT hk_l_user_group_activity FROM STV2025011410__DWH.s_auth_history);
Проверка данных в слое DDS ¶
Код SQL:
SELECT *
FROM STV2025011410__DWH.h_users
LIMIT 5;
Результат выполения кода SQL:
hk_user_id | user_id | registration_dt | load_dt | load_src |
---|---|---|---|---|
5090262501023620704 | 169996 | 2021-04-16 03:43:27.000 | 2025-03-26 01:14:10.757 | s3 |
8426887808008717333 | 169990 | 2020-12-01 21:18:48.000 | 2025-03-26 01:14:10.757 | s3 |
7582338616463080818 | 169985 | 2021-01-13 04:39:55.000 | 2025-03-26 01:14:10.757 | s3 |
548606052795187288 | 169982 | 2020-12-27 13:48:07.000 | 2025-03-26 01:14:10.757 | s3 |
8160700415674202812 | 169980 | 2021-01-18 12:30:43.000 | 2025-03-26 01:14:10.757 | s3 |
Код SQL:
SELECT *
FROM STV2025011410__DWH.h_groups
LIMIT 5;
Результат выполения кода SQL:
hk_group_id | group_id | registration_dt | load_dt | load_src |
---|---|---|---|---|
3208782734603338158 | 249 | 2020-12-11 05:35:01.961 | 2025-03-26 01:14:13.656 | s3 |
7328516176364199366 | 248 | 2020-11-27 00:34:30.000 | 2025-03-26 01:14:13.656 | s3 |
5780468151549057781 | 246 | 2020-12-17 15:59:47.452 | 2025-03-26 01:14:13.656 | s3 |
4901660003169244829 | 245 | 2021-03-30 19:06:46.207 | 2025-03-26 01:14:13.656 | s3 |
3703453609225743640 | 242 | 2020-12-22 04:08:04.000 | 2025-03-26 01:14:13.656 | s3 |
Код SQL:
SELECT *
FROM STV2025011410__DWH.h_dialogs
LIMIT 5;
Результат выполения кода SQL:
hk_message_id | message_id | message_ts | load_dt | load_src |
---|---|---|---|---|
3509868979080375254 | 1038323 | 2021-01-04 02:11:31.813 | 2025-03-26 01:14:17.078 | s3 |
5004330126816153162 | 1038312 | 2020-10-24 05:52:50.778 | 2025-03-26 01:14:17.078 | s3 |
4748946703388263323 | 1038310 | 2021-03-06 19:10:21.750 | 2025-03-26 01:14:17.078 | s3 |
6621931238336390056 | 1038292 | 2021-03-31 07:02:58.830 | 2025-03-26 01:14:17.078 | s3 |
2407409173699346270 | 1038286 | 2021-03-09 02:29:59.614 | 2025-03-26 01:14:17.078 | s3 |
Код SQL:
SELECT *
FROM STV2025011410__DWH.l_admins
LIMIT 5;
Результат выполения кода SQL:
hk_l_admin_id | hk_user_id | hk_group_id | load_dt | load_src |
---|---|---|---|---|
4148344871575995559 | 2502865495682437569 | 1380103837717358387 | 2025-03-26 01:14:21.164 | s3 |
6367577610627851380 | 406792314074259902 | 4464667052905927099 | 2025-03-26 01:14:21.164 | s3 |
8959490627278738087 | 1827000023434574926 | 7530954060943244831 | 2025-03-26 01:14:21.164 | s3 |
6426254267726245857 | 2045646812255233220 | 2901842674831561473 | 2025-03-26 01:14:21.164 | s3 |
8395533430965614465 | 1118872629339659320 | 3402237656568344992 | 2025-03-26 01:14:21.164 | s3 |
Код SQL:
SELECT *
FROM STV2025011410__DWH.l_groups_dialogs
LIMIT 5;
Результат выполения кода SQL:
hk_l_groups_dialogs | hk_message_id | hk_group_id | load_dt | load_src |
---|---|---|---|---|
8255136928186246851 | 7852972727762845946 | 3208782734603338158 | 2025-03-26 01:14:30.936 | s3 |
8987854242823741045 | 4611592216058703679 | 5258085330056593723 | 2025-03-26 01:14:30.936 | s3 |
2029858875455902633 | 101911687825457854 | 3110550038591389753 | 2025-03-26 01:14:30.936 | s3 |
4790767456366251185 | 4902241916160387121 | 7045669418628455506 | 2025-03-26 01:14:30.936 | s3 |
997421288145162111 | 6366966119638053341 | 2901842674831561473 | 2025-03-26 01:14:30.936 | s3 |
Код SQL:
SELECT *
FROM STV2025011410__DWH.l_user_group_activity
LIMIT 5;
Результат выполения кода SQL:
hk_l_user_group_activity | hk_user_id | hk_group_id | load_dt | load_src |
---|---|---|---|---|
36830563171292 | 8281023022560148656 | 5892974635897879450 | 2025-03-26 01:14:34.990 | s3 |
36830563171292 | 8281023022560148656 | 5892974635897879450 | 2025-03-26 01:14:34.990 | s3 |
36830563171292 | 8281023022560148656 | 5892974635897879450 | 2025-03-26 01:14:34.990 | s3 |
36830563171292 | 8281023022560148656 | 5892974635897879450 | 2025-03-26 01:14:34.990 | s3 |
36830563171292 | 8281023022560148656 | 5892974635897879450 | 2025-03-26 01:14:34.990 | s3 |
Код SQL:
SELECT *
FROM STV2025011410__DWH.l_user_message
LIMIT 5;
Результат выполения кода SQL:
hk_l_user_message | hk_user_id | hk_message_id | load_dt | load_src |
---|---|---|---|---|
4704836362409932042 | 5783548743464686114 | 653633402730918039 | 2025-03-26 01:14:25.622 | s3 |
6349615621426833896 | 1618211815126016456 | 1642339183997777066 | 2025-03-26 01:14:25.622 | s3 |
147642826123896115 | 1618211815126016456 | 8763389781706656901 | 2025-03-26 01:14:25.622 | s3 |
8363686437348463415 | 9046158759674539325 | 615823479937507293 | 2025-03-26 01:14:25.622 | s3 |
2993891982568800435 | 2801478500947260785 | 4923181647567152670 | 2025-03-26 01:14:25.622 | s3 |
Код SQL:
SELECT *
FROM STV2025011410__DWH.s_admins
LIMIT 5;
Результат выполения кода SQL:
hk_admin_id | is_admin | admin_from | load_dt | load_src |
---|---|---|---|---|
2070217548665816202 | true | 2020-11-20 20:42:26.014 | 2025-03-26 01:14:37.884 | s3 |
1711396262922159440 | true | 2020-12-18 08:53:40.000 | 2025-03-26 01:14:37.884 | s3 |
465920506241895674 | true | 2021-01-04 01:17:20.314 | 2025-03-26 01:14:37.884 | s3 |
1953905815028695711 | true | 2021-02-23 03:06:25.000 | 2025-03-26 01:14:37.884 | s3 |
3566833314758030695 | true | 2020-12-30 05:47:45.543 | 2025-03-26 01:14:37.884 | s3 |
Код SQL:
SELECT *
FROM STV2025011410__DWH.s_auth_history
LIMIT 5;
Результат выполения кода SQL:
hk_l_user_group_activity | user_id_from | event | event_dt | load_dt | load_src |
---|---|---|---|---|---|
88704952065157 | 79289 | add | 2021-05-21 19:17:01.000 | 2025-03-26 01:15:00.194 | s3 |
88704952065157 | 79289 | add | 2021-05-21 19:17:01.000 | 2025-03-26 01:15:00.194 | s3 |
88704952065157 | 79289 | add | 2021-05-21 19:17:01.000 | 2025-03-26 01:15:00.194 | s3 |
88704952065157 | 79289 | add | 2021-05-21 19:17:01.000 | 2025-03-26 01:15:00.194 | s3 |
88704952065157 | 79289 | add | 2021-05-21 19:17:01.000 | 2025-03-26 01:15:00.194 | s3 |
Код SQL:
SELECT *
FROM STV2025011410__DWH.s_dialog_info
LIMIT 5;
Результат выполения кода SQL:
hk_message_id | message | message_from | message_to | load_dt | load_src |
---|---|---|---|---|---|
2002946837039996813 | /usr/src/linux-source-2.6.32/drivers/video/console/bitblit.c ‘explains’ the function of the loadable kernel module »bitblit« as ‘BitBlitting Operation’. What is a bitblitting operation? | 66972 | 149488 | 2025-03-26 01:14:49.276 | s3 |
4015688680493785127 | the acer has an nvidia geforce 8600m | 54918 | 91264 | 2025-03-26 01:14:49.276 | s3 |
7278382332072098298 | 🙂 | 130716 | 49214 | 2025-03-26 01:14:49.276 | s3 |
916594369681802991 | firestarter has a nice wizard that does this | 100666 | 134692 | 2025-03-26 01:14:49.276 | s3 |
2366622350197444865 | try this and/or #xubuntu channels 🙂 | 32064 | 22365 | 2025-03-26 01:14:49.276 | s3 |
Код SQL:
SELECT *
FROM STV2025011410__DWH.s_group_name
LIMIT 5;
Результат выполения кода SQL:
hk_group_id | group_name | load_dt | load_src |
---|---|---|---|
5783548743464686114 | ох из двора хах | 2025-03-26 01:14:41.713 | s3 |
1618211815126016456 | собачники новая эра подписка 500р | 2025-03-26 01:14:41.713 | s3 |
3883506187811235133 | рожденные быть молодыми я и ты подписка 1000р | 2025-03-26 01:14:41.713 | s3 |
3110550038591389753 | python новая эпоха а ты? | 2025-03-26 01:14:41.713 | s3 |
9046158759674539325 | любители 80-ые подписка 1000р | 2025-03-26 01:14:41.713 | s3 |
Код SQL:
SELECT *
FROM STV2025011410__DWH.s_group_private_status
LIMIT 5;
Результат выполения кода SQL:
hk_group_id | is_private | load_dt | load_src |
---|---|---|---|
1350854927184241431 | 0 | 2025-03-26 01:14:45.842 | s3 |
8299427032879314813 | 0 | 2025-03-26 01:14:45.842 | s3 |
7157463389961429886 | 1 | 2025-03-26 01:14:45.842 | s3 |
1990582535851245805 | 0 | 2025-03-26 01:14:45.842 | s3 |
1015508935961200350 | 0 | 2025-03-26 01:14:45.842 | s3 |
Код SQL:
SELECT *
FROM STV2025011410__DWH.s_user_chatinfo
LIMIT 5;
Результат выполения кода SQL:
hk_user_id | chat_name | load_dt | load_src |
---|---|---|---|
5090262501023620704 | ValeryaTolmachev | 2025-03-26 01:14:53.075 | s3 |
8426887808008717333 | AndreyMendoza | 2025-03-26 01:14:53.075 | s3 |
7582338616463080818 | MaksimMadona | 2025-03-26 01:14:53.075 | s3 |
548606052795187288 | MarinaBissmark | 2025-03-26 01:14:53.075 | s3 |
8160700415674202812 | AntonyVanGog | 2025-03-26 01:14:53.075 | s3 |
Код SQL:
SELECT *
FROM STV2025011410__DWH.s_user_socdem
LIMIT 5;
Результат выполения кода SQL:
hk_user_id | country | age | load_dt | load_src |
---|---|---|---|---|
2300646145511893240 | Guyana | 33 | 2025-03-26 01:14:56.168 | s3 |
7778678683918014494 | Equatorial Guinea | 18 | 2025-03-26 01:14:56.168 | s3 |
4534469875556475545 | Colombia | 59 | 2025-03-26 01:14:56.168 | s3 |
5791143016230547252 | Costa Rica | 47 | 2025-03-26 01:14:56.168 | s3 |
8757047559942488888 | Guinea | 48 | 2025-03-26 01:14:56.168 | s3 |
7. Ответ бизнесу ¶
7.A. Пользователи какого возраста занимают топ-5 позиций по количеству сообщений в самых старых группах? ¶
Код SQL:
-- Ответ бизнесу на вопрос:
-- Пользователи какого возраста занимают топ-5 позиций по количеству сообщений в самых старых группах?
select age,
count(1)
from STV2025011410__DWH.s_user_socdem
WHERE hk_user_id IN (select hk_user_id
from STV2025011410__DWH.l_user_message
where hk_message_id in (select hk_message_id
from STV2025011410__DWH.l_groups_dialogs
where hk_group_id in (select hk_group_id
from STV2025011410__DWH.h_groups
order by registration_dt limit 10)))
GROUP BY age
ORDER BY count(1) DESC
LIMIT 5;
Результат выполения кода SQL:
age | count |
---|---|
22 | 808 |
20 | 795 |
19 | 789 |
21 | 778 |
18 | 776 |
7.B. Какова конверсия пользователей, вступивших в группы, в активных пользователей? ¶
Код SQL:
-- Ответ бизнесу на вопрос:
-- Какова конверсия пользователей, вступивших в группы в активных пользователей?
-- пользователи, вступившие
-- в 10 самых ранних созданных групп
WITH user_group_log AS (
SELECT hk_group_id,
hk_user_id
FROM STV2025011410__DWH.l_user_group_activity
WHERE hk_group_id IN ( SELECT hk_group_id
FROM STV2025011410__DWH.h_groups
ORDER BY registration_dt
LIMIT 10)
AND hk_l_user_group_activity IN ( SELECT hk_l_user_group_activity
FROM STV2025011410__DWH.s_auth_history
WHERE event = 'add')
GROUP BY hk_group_id, hk_user_id
),
-- пользователи, которые активны в группах
user_group_messages as (
SELECT igd.hk_group_id,
lum.hk_user_id
FROM STV2025011410__DWH.l_groups_dialogs igd
LEFT JOIN STV2025011410__DWH.h_dialogs hd ON hd.hk_message_id = igd.hk_message_id
LEFT JOIN STV2025011410__DWH.l_user_message lum ON lum.hk_message_id = igd.hk_message_id
GROUP BY hk_group_id, lum.hk_user_id
)
SELECT -- хэш идентификатора группы
ugl.hk_group_id,
-- количество пользователей, вступивших в группу
COUNT(DISTINCT ugl.hk_user_id) AS cnt_added_users,
-- количество участников группы, которые написали хотя бы одно сообщение
COUNT(DISTINCT ugm.hk_user_id) AS cnt_users_in_group_with_messages,
-- конверсию в первое сообщение из вступления в группу
COUNT(DISTINCT ugm.hk_user_id) / COUNT(DISTINCT ugl.hk_user_id) AS group_conversion
FROM user_group_log AS ugl
LEFT JOIN user_group_messages AS ugm ON ugm.hk_group_id = ugl.hk_group_id AND ugm.hk_user_id = ugl.hk_user_id
GROUP BY ugl.hk_group_id
ORDER BY COUNT(DISTINCT ugm.hk_user_id) / COUNT(DISTINCT ugl.hk_user_id) DESC
;
Результат выполения кода SQL:
hk_group_id | cnt_added_users | cnt_users_in_group_with_messages | group_conversion |
---|---|---|---|
9183043445192227260 | 709 | 464 | 0.654442877291960508 |
7174329635764732197 | 932 | 603 | 0.646995708154506438 |
4350425024258480878 | 845 | 539 | 0.637869822485207101 |
2461736748292367987 | 686 | 437 | 0.637026239067055394 |
206904954090724337 | 866 | 551 | 0.636258660508083141 |
3214410852649090659 | 755 | 480 | 0.635761589403973510 |
7757992142189260835 | 378 | 238 | 0.629629629629629630 |
5568963519328366880 | 796 | 495 | 0.621859296482412060 |
7279971728630971062 | 288 | 179 | 0.621527777777777778 |
6014017525933240454 | 619 | 382 | 0.617124394184168013 |
Выводы из ответов бизнесу ¶
Вопрос 1. Пользователи какого возраста занимают топ-5 позиций по количеству сообщений в самых старых группах?
Вывод. В самых старых группах топ-5 позиций по количеству сообщений занимают пользователи в возрасте 18-22 лет. Количество сообщений в них колеблется от 776 до 808.
Вопрос 2. Какова конверсия пользователей, вступивших в группы, в активных пользователей?
Вывод. Максимальная конверсия пользователей, вступивших в группы, в активных пользователей составляет 65.44%. Этот максимум достигнут в группе с HASH-идентификатором 9183043445192227260
. В нее вступило 709 пользователей, из которых 464 стали активными. Замыкает топ-10 группа с HASH-идентификатором 6014017525933240454
и конверсией 61.71%.