Предварительный анализ проекта Data Lake для соцсети показал наличие очень больших объемов сырых данных. Подобные объемы потребуются в продакшене. Однако, в процессе разработки озера данных и его отладки требуется использование сэмпла, который будет кратно меньше и достаточно показательным для всего набора данных.
In [1]:
# Вариант обхода ошибки о несуществующем пути к файлам
# Использовать в начале тетрадки или файла python до импорта PySpark
# импортируем модуль os, который даёт возможность работы с ОС
# указание os.environ[…] настраивает окружение
import os
# прописываем пути
os.environ['HADOOP_CONF_DIR'] = '/etc/hadoop/conf'
os.environ['YARN_CONF_DIR'] = '/etc/hadoop/conf'
os.environ['JAVA_HOME']='/usr'
os.environ['SPARK_HOME'] ='/usr/lib/spark'
os.environ['PYTHONPATH'] ='/usr/local/lib/python3.8'
# Из урока в качестве дополнения, если не работает
# Использовать в начале тетрадки или файла python до импорта PySpark
import findspark
findspark.init()
findspark.find()
Out[1]:
'/usr/lib/spark'
In [2]:
# Системная библиотека
# для обеспечения работы функции main()
import sys
# Библиотека PySpark
import pyspark
# Создание сессий PySpark
from pyspark.sql import SparkSession
# Конвертация типов в PySpark
from pyspark.sql.types import StructType, StructField, FloatType, StringType
# импортируем оконную функцию и модуль Spark Functions
from pyspark.sql.window import Window
# Функции работы с датафреймами Spark
import pyspark.sql.functions as F
# Функции для работы с датой
from datetime import datetime, timedelta
In [3]:
import pandas as pd
import math
In [4]:
spark = SparkSession \
.builder \
.master('local') \
.appName('realization_07_07_02_00') \
.getOrCreate()
spark
SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/usr/lib/spark/jars/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] 2025-06-12 19:20:48,580 WARN util.Utils: Your hostname, fhmm7d6esjai70o9e0fr resolves to a loopback address: 127.0.1.1; using 172.16.0.6 instead (on interface eth0) 2025-06-12 19:20:48,581 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind to another address Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). 2025-06-12 19:20:50,210 WARN util.Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
Out[4]:
SparkSession — in-memory
Таблица событий проекта, дополненная гео-данными¶
In [8]:
# Просмотр директории таблицы событий проекта, дополненная гео-данными
#! hdfs dfs -ls /user/master/data/geo/events/date=2022-01-01
In [9]:
# Таблица событий проекта, дополненная гео-данными
# + Извлечение сэмпла из датафрейма событий для облегчения расчетов проекта
# + Оптимизация после сэмплироваия (партицироваие и кэширование)
event_sample = spark \
.read.parquet('/user/master/data/geo/events') \
.sample(withReplacement=False, fraction=.0001, seed=42) \
.repartition('date').cache()
# Таблица целиком
event_sample.show(5)
2025-06-12 19:21:12,302 WARN datasources.SharedInMemoryCache: Evicting cached table partition metadata from memory due to size constraints (spark.sql.hive.filesourcePartitionFileCacheSize = 262144000 bytes). This may impact query planning performance. [Stage 2:====================================================>(1467 + 1) / 1468]
+--------------------+------------+-------------------+------------------+----------+ | event| event_type| lat| lon| date| +--------------------+------------+-------------------+------------------+----------+ |[,, 2022-03-28 18...|subscription|-37.768011260910846|145.42977177931309|2022-03-28| |[,, 2022-03-28 09...|subscription|-31.270437980985538|115.92680612778504|2022-03-28| |[,, 2022-03-28 00...| reaction|-22.788926871491185|150.80866551705867|2022-03-28| |[,, 2022-03-28 21...| reaction| -22.91876910985198|151.43083424826006|2022-03-28| |[,, 2022-03-28 14...| reaction| null| null|2022-03-28| +--------------------+------------+-------------------+------------------+----------+ only showing top 5 rows
In [10]:
# Удаление ранее записанного сэмпла в личной директории (закомментитть, если не нужно)
! hdfs dfs -rm -r /user/microsegment/events_sample
Deleted /user/microsegment/events_sample
In [11]:
# Запись оптимизированного сэмпла датафрейма событий в свою директорию
# для дальнейшего чтения из нее
event_sample.write.parquet('/user/microsegment/events_sample')
# Проверка записи сэмпла
event_sample = spark.read.parquet('/user/microsegment/events_sample')
event_sample.show(5)
+--------------------+----------+-------------------+------------------+----------+ | event|event_type| lat| lon| date| +--------------------+----------+-------------------+------------------+----------+ |[,, 2022-05-24 20...| reaction| -37.0661314704226|143.96756707671318|2022-05-24| |[,, 2022-05-24 09...| reaction| -16.50448267872195|146.29769353191838|2022-05-24| |[,, 2022-05-24 10...| reaction| -27.13975389330464| 153.3100548392131|2022-05-24| |[,, 2022-05-24 03...| reaction| -34.44767721112111|138.79707015486716|2022-05-24| |[,, 2022-05-24 14...| reaction|-31.642042165822073| 116.6721962048457|2022-05-24| +--------------------+----------+-------------------+------------------+----------+ only showing top 5 rows
In [13]:
# Таблица событий проекта, только с гео-данными
# + Извлечение сэмпла из датафрейма событий для облегчения расчетов проекта
# + Оптимизация после сэмплироваия (партицироваие и кэширование)
event_onlywithgeo_sample = spark \
.read.parquet('/user/master/data/geo/events') \
.where('lat IS NOT NULL').where('lon IS NOT NULL') \
.sample(withReplacement=False, fraction=.0001, seed=42) \
.repartition('date').cache()
# Таблица целиком
event_onlywithgeo_sample.show(5)
[Stage 18:===================================================>(1467 + 1) / 1468]
+--------------------+------------+-------------------+------------------+----------+ | event| event_type| lat| lon| date| +--------------------+------------+-------------------+------------------+----------+ |[,, 2022-03-28 13...|subscription|-37.069071494951544|144.05667233743122|2022-03-28| |[,, 2022-03-28 19...|subscription|-19.238485381561514|147.63630816797908|2022-03-28| |[,, 2022-03-28 14...|subscription|-26.945005121532045|153.31517744462877|2022-03-28| |[,, 2022-03-28 12...|subscription|-27.027993652122817|153.64889192912614|2022-03-28| |[,, 2022-03-28 10...|subscription| -38.10304646072987|145.29878402667848|2022-03-28| +--------------------+------------+-------------------+------------------+----------+ only showing top 5 rows
In [14]:
# Удаление ранее записанного сэмпла в личной директории (закомментитть, если не нужно)
! hdfs dfs -rm -r /user/microsegment/event_onlywithgeo_sample
Deleted /user/microsegment/event_onlywithgeo_sample
In [15]:
# Запись оптимизированного сэмпла датафрейма событий в свою директорию
# для дальнейшего чтения из нее
event_onlywithgeo_sample.write.parquet('/user/microsegment/event_onlywithgeo_sample')
# Проверка записи сэмпла
event_onlywithgeo_sample = spark.read.parquet('/user/microsegment/event_onlywithgeo_sample')
event_onlywithgeo_sample.show(5)
+--------------------+------------+-------------------+------------------+----------+ | event| event_type| lat| lon| date| +--------------------+------------+-------------------+------------------+----------+ |[,, 2022-03-07 14...|subscription| -33.58639245694319|151.22313935814825|2022-03-07| |[,, 2022-03-07 14...|subscription| -27.35511178355465| 152.5469988129458|2022-03-07| |[[110467], 116064...| message| -36.5739088344417| 144.2930383084192|2022-03-07| |[[17789], 186313,...| message|-27.769410372844728|154.28716274752682|2022-01-28| |[,,,, can you /jo...| message|-11.756350419171689|131.69608278290468|2022-01-28| +--------------------+------------+-------------------+------------------+----------+ only showing top 5 rows
In [16]:
# Оптимизация после сэмплироваия
# Количество ядер
#total_cpu = spark.sparkContext.defaultParallelism
# repartition - Изменеие количества партиций с перемешиванием даных (shuffle)
# coalesce - Аналогично repartition, но без shuffle
# cache - Кэширование данных
#event_geo_sample = event_geo_sample.coalesce(total_cpu * 4)#.cache()
#event_geo_sample = event_geo_sample.repartition('date').cache()
#event_geo_sample = event_geo_sample.cache()
Файл с гео-данными городов¶
ВАЖНО!!! В исходном файле требуется разграничитель целой и дробной части с запятой (,
) надо заменить на точку (.
).
In [17]:
# Файл с гео-данными городов
geo_csv = pd.read_csv('geo.csv', sep=';')
geo_csv
Out[17]:
id | city | lat | lng | |
---|---|---|---|---|
0 | 1 | Sydney | -33.8650 | 151.2094 |
1 | 2 | Melbourne | -37.8136 | 144.9631 |
2 | 3 | Brisbane | -27.4678 | 153.0281 |
3 | 4 | Perth | -31.9522 | 115.8589 |
4 | 5 | Adelaide | -34.9289 | 138.6011 |
5 | 6 | Gold Coast | -28.0167 | 153.4000 |
6 | 7 | Cranbourne | -38.0996 | 145.2834 |
7 | 8 | Canberra | -35.2931 | 149.1269 |
8 | 9 | Newcastle | -32.9167 | 151.7500 |
9 | 10 | Wollongong | -34.4331 | 150.8831 |
10 | 11 | Geelong | -38.1500 | 144.3500 |
11 | 12 | Hobart | -42.8806 | 147.3250 |
12 | 13 | Townsville | -19.2564 | 146.8183 |
13 | 14 | Ipswich | -27.6167 | 152.7667 |
14 | 15 | Cairns | -16.9303 | 145.7703 |
15 | 16 | Toowoomba | -27.5667 | 151.9500 |
16 | 17 | Darwin | -12.4381 | 130.8411 |
17 | 18 | Ballarat | -37.5500 | 143.8500 |
18 | 19 | Bendigo | -36.7500 | 144.2667 |
19 | 20 | Launceston | -41.4419 | 147.1450 |
20 | 21 | Mackay | -21.1411 | 149.1861 |
21 | 22 | Rockhampton | -23.3750 | 150.5117 |
22 | 23 | Maitland | -32.7167 | 151.5500 |
23 | 24 | Bunbury | -33.3333 | 115.6333 |
In [18]:
# Копирование файла c гео-информацией городов
! hdfs dfs -rm /user/microsegment/geo.csv
! hdfs dfs -put geo.csv /user/microsegment/geo.csv
! hdfs dfs -ls /user/microsegment/geo.csv
Deleted /user/microsegment/geo.csv -rw-r--r-- 1 microsegment microsegment 704 2025-06-12 21:17 /user/microsegment/geo.csv
In [ ]: