Microsegment.ru
  • Главная страница
  • О проекте
  • Портфолио
  • Блог

Сэмплирование данных проекта Data Lake для соцсети

Предварительный анализ проекта Data Lake для соцсети показал наличие очень больших объемов сырых данных. Подобные объемы потребуются в продакшене. Однако, в процессе разработки озера данных и его отладки требуется использование сэмпла, который будет кратно меньше и достаточно показательным для всего набора данных.

In [1]:
# Вариант обхода ошибки о несуществующем пути к файлам 
# Использовать в начале тетрадки или файла python до импорта PySpark

# импортируем модуль os, который даёт возможность работы с ОС
# указание os.environ[…] настраивает окружение
import os 

# прописываем пути
os.environ['HADOOP_CONF_DIR'] = '/etc/hadoop/conf'
os.environ['YARN_CONF_DIR'] = '/etc/hadoop/conf'
os.environ['JAVA_HOME']='/usr'
os.environ['SPARK_HOME'] ='/usr/lib/spark'
os.environ['PYTHONPATH'] ='/usr/local/lib/python3.8'

# Из урока в качестве дополнения, если не работает
# Использовать в начале тетрадки или файла python до импорта PySpark
import findspark
findspark.init()
findspark.find()
Out[1]:
'/usr/lib/spark'
In [2]:
# Системная библиотека 
# для обеспечения работы функции main()
import sys

# Библиотека PySpark
import pyspark
# Создание сессий PySpark
from pyspark.sql import SparkSession
# Конвертация типов в PySpark
from pyspark.sql.types import StructType, StructField, FloatType, StringType

# импортируем оконную функцию и модуль Spark Functions
from pyspark.sql.window import Window 
# Функции работы с датафреймами Spark
import pyspark.sql.functions as F

# Функции для работы с датой
from datetime import datetime, timedelta
In [3]:
import pandas as pd
import math
In [4]:
spark = SparkSession \
    .builder \
    .master('local') \
    .appName('realization_07_07_02_00') \
    .getOrCreate()

spark
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/spark/jars/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
2025-06-12 19:20:48,580 WARN util.Utils: Your hostname, fhmm7d6esjai70o9e0fr resolves to a loopback address: 127.0.1.1; using 172.16.0.6 instead (on interface eth0)
2025-06-12 19:20:48,581 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2025-06-12 19:20:50,210 WARN util.Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
Out[4]:

SparkSession — in-memory

SparkContext

Spark UI

Version
v3.0.2
Master
local
AppName
realization_07_07_02_00

Таблица событий проекта, дополненная гео-данными¶

In [8]:
# Просмотр директории таблицы событий проекта, дополненная гео-данными
#! hdfs dfs -ls /user/master/data/geo/events/date=2022-01-01
In [9]:
# Таблица событий проекта, дополненная гео-данными
# + Извлечение сэмпла из датафрейма событий для облегчения расчетов проекта
# + Оптимизация после сэмплироваия (партицироваие и кэширование)
event_sample = spark \
    .read.parquet('/user/master/data/geo/events') \
    .sample(withReplacement=False, fraction=.0001, seed=42) \
    .repartition('date').cache()
# Таблица целиком
event_sample.show(5)
2025-06-12 19:21:12,302 WARN datasources.SharedInMemoryCache: Evicting cached table partition metadata from memory due to size constraints (spark.sql.hive.filesourcePartitionFileCacheSize = 262144000 bytes). This may impact query planning performance.
[Stage 2:====================================================>(1467 + 1) / 1468]
+--------------------+------------+-------------------+------------------+----------+
|               event|  event_type|                lat|               lon|      date|
+--------------------+------------+-------------------+------------------+----------+
|[,, 2022-03-28 18...|subscription|-37.768011260910846|145.42977177931309|2022-03-28|
|[,, 2022-03-28 09...|subscription|-31.270437980985538|115.92680612778504|2022-03-28|
|[,, 2022-03-28 00...|    reaction|-22.788926871491185|150.80866551705867|2022-03-28|
|[,, 2022-03-28 21...|    reaction| -22.91876910985198|151.43083424826006|2022-03-28|
|[,, 2022-03-28 14...|    reaction|               null|              null|2022-03-28|
+--------------------+------------+-------------------+------------------+----------+
only showing top 5 rows

                                                                                
In [10]:
# Удаление ранее записанного сэмпла в личной директории (закомментитть, если не нужно)
! hdfs dfs -rm -r /user/microsegment/events_sample
Deleted /user/microsegment/events_sample
In [11]:
# Запись оптимизированного сэмпла датафрейма событий в свою директорию
# для дальнейшего чтения из нее
event_sample.write.parquet('/user/microsegment/events_sample')
# Проверка записи сэмпла
event_sample = spark.read.parquet('/user/microsegment/events_sample')
event_sample.show(5)
                                                                                
+--------------------+----------+-------------------+------------------+----------+
|               event|event_type|                lat|               lon|      date|
+--------------------+----------+-------------------+------------------+----------+
|[,, 2022-05-24 20...|  reaction|  -37.0661314704226|143.96756707671318|2022-05-24|
|[,, 2022-05-24 09...|  reaction| -16.50448267872195|146.29769353191838|2022-05-24|
|[,, 2022-05-24 10...|  reaction| -27.13975389330464| 153.3100548392131|2022-05-24|
|[,, 2022-05-24 03...|  reaction| -34.44767721112111|138.79707015486716|2022-05-24|
|[,, 2022-05-24 14...|  reaction|-31.642042165822073| 116.6721962048457|2022-05-24|
+--------------------+----------+-------------------+------------------+----------+
only showing top 5 rows

In [13]:
# Таблица событий проекта, только с гео-данными
# + Извлечение сэмпла из датафрейма событий для облегчения расчетов проекта
# + Оптимизация после сэмплироваия (партицироваие и кэширование)
event_onlywithgeo_sample = spark \
    .read.parquet('/user/master/data/geo/events') \
    .where('lat IS NOT NULL').where('lon IS NOT NULL') \
    .sample(withReplacement=False, fraction=.0001, seed=42) \
    .repartition('date').cache()
# Таблица целиком
event_onlywithgeo_sample.show(5)
[Stage 18:===================================================>(1467 + 1) / 1468]
+--------------------+------------+-------------------+------------------+----------+
|               event|  event_type|                lat|               lon|      date|
+--------------------+------------+-------------------+------------------+----------+
|[,, 2022-03-28 13...|subscription|-37.069071494951544|144.05667233743122|2022-03-28|
|[,, 2022-03-28 19...|subscription|-19.238485381561514|147.63630816797908|2022-03-28|
|[,, 2022-03-28 14...|subscription|-26.945005121532045|153.31517744462877|2022-03-28|
|[,, 2022-03-28 12...|subscription|-27.027993652122817|153.64889192912614|2022-03-28|
|[,, 2022-03-28 10...|subscription| -38.10304646072987|145.29878402667848|2022-03-28|
+--------------------+------------+-------------------+------------------+----------+
only showing top 5 rows

                                                                                
In [14]:
# Удаление ранее записанного сэмпла в личной директории (закомментитть, если не нужно)
! hdfs dfs -rm -r /user/microsegment/event_onlywithgeo_sample
Deleted /user/microsegment/event_onlywithgeo_sample
In [15]:
# Запись оптимизированного сэмпла датафрейма событий в свою директорию
# для дальнейшего чтения из нее
event_onlywithgeo_sample.write.parquet('/user/microsegment/event_onlywithgeo_sample')
# Проверка записи сэмпла
event_onlywithgeo_sample = spark.read.parquet('/user/microsegment/event_onlywithgeo_sample')
event_onlywithgeo_sample.show(5)
                                                                                
+--------------------+------------+-------------------+------------------+----------+
|               event|  event_type|                lat|               lon|      date|
+--------------------+------------+-------------------+------------------+----------+
|[,, 2022-03-07 14...|subscription| -33.58639245694319|151.22313935814825|2022-03-07|
|[,, 2022-03-07 14...|subscription| -27.35511178355465| 152.5469988129458|2022-03-07|
|[[110467], 116064...|     message|  -36.5739088344417| 144.2930383084192|2022-03-07|
|[[17789], 186313,...|     message|-27.769410372844728|154.28716274752682|2022-01-28|
|[,,,, can you /jo...|     message|-11.756350419171689|131.69608278290468|2022-01-28|
+--------------------+------------+-------------------+------------------+----------+
only showing top 5 rows

In [16]:
# Оптимизация после сэмплироваия
# Количество ядер
#total_cpu = spark.sparkContext.defaultParallelism
# repartition - Изменеие количества партиций с перемешиванием даных (shuffle)
# coalesce - Аналогично repartition, но без shuffle
# cache - Кэширование данных
#event_geo_sample = event_geo_sample.coalesce(total_cpu * 4)#.cache()
#event_geo_sample = event_geo_sample.repartition('date').cache()
#event_geo_sample = event_geo_sample.cache()

Файл с гео-данными городов¶

ВАЖНО!!! В исходном файле требуется разграничитель целой и дробной части с запятой (,) надо заменить на точку (.).

In [17]:
# Файл с гео-данными городов
geo_csv = pd.read_csv('geo.csv', sep=';')
geo_csv
Out[17]:
id city lat lng
0 1 Sydney -33.8650 151.2094
1 2 Melbourne -37.8136 144.9631
2 3 Brisbane -27.4678 153.0281
3 4 Perth -31.9522 115.8589
4 5 Adelaide -34.9289 138.6011
5 6 Gold Coast -28.0167 153.4000
6 7 Cranbourne -38.0996 145.2834
7 8 Canberra -35.2931 149.1269
8 9 Newcastle -32.9167 151.7500
9 10 Wollongong -34.4331 150.8831
10 11 Geelong -38.1500 144.3500
11 12 Hobart -42.8806 147.3250
12 13 Townsville -19.2564 146.8183
13 14 Ipswich -27.6167 152.7667
14 15 Cairns -16.9303 145.7703
15 16 Toowoomba -27.5667 151.9500
16 17 Darwin -12.4381 130.8411
17 18 Ballarat -37.5500 143.8500
18 19 Bendigo -36.7500 144.2667
19 20 Launceston -41.4419 147.1450
20 21 Mackay -21.1411 149.1861
21 22 Rockhampton -23.3750 150.5117
22 23 Maitland -32.7167 151.5500
23 24 Bunbury -33.3333 115.6333
In [18]:
# Копирование файла c гео-информацией городов
! hdfs dfs -rm /user/microsegment/geo.csv
! hdfs dfs -put geo.csv /user/microsegment/geo.csv
! hdfs dfs -ls /user/microsegment/geo.csv
Deleted /user/microsegment/geo.csv
-rw-r--r--   1 microsegment microsegment        704 2025-06-12 21:17 /user/microsegment/geo.csv
In [ ]:
 

Политика конфиденциальности

Продолжая использовать данный сайт вы подтверждаете свое согласие с условиями его политики конфиденциальности. Подробнее…




Администрация и владельцы данного информационного ресурса не несут ответственности за возможные последствия, связанные с использованием информации, размещенной на нем.


Все права защищены. При копировании материалов сайта обязательно указывать ссылку на © Microsegment.ru (2020-2025)