Microsegment.ru
  • Главная страница
  • О проекте
  • Портфолио
  • Блог

Файл datalake_hdfs_etl_dag.py проекта Data Lake для соцсети

Файл datalake_hdfs_etl_dag.py содержит код DAG для оркестрации данных с помощью Apache Airflow в проекте Data Lake для соцсети. DAG содержит 3 таска, каждый из которых связан с автоматизацией доставки данных в витрину:

  • Таск users_cities — первая витрина, представляющая информацию о действиях в разрезе пользователей.
  • Таск events_cities — вторая витрина, представляющая пользователей в разрезе зон.
  • Таск recommended_friends — третья витрина для рекомендаций друзьям.

Файл datalake_hdfs_etl_dag.py


import airflow
from datetime import timedelta
from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
import os
from datetime import date, datetime

os.environ['HADOOP_CONF_DIR'] = '/etc/hadoop/conf'
os.environ['YARN_CONF_DIR'] = '/etc/hadoop/conf'
os.environ['JAVA_HOME'] = '/usr'
os.environ['SPARK_HOME'] ='/usr/lib/spark'
os.environ['PYTHONPATH'] ='/usr/local/lib/python3.8'

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2020, 1, 1)
}

dag_spark = DAG(
    dag_id = "datalake_hdfs_etl",
    default_args = default_args,
    schedule_interval = None,
)

users_cities = SparkSubmitOperator(
    task_id = 'users_cities',
    dag = dag_spark,
    application ='/lessons/scripts/users_cities.py' ,
    conn_id = 'yarn_spark',
    conf = {
        "spark.driver.maxResultSize": "20g"
    },
    executor_cores = 1,
    executor_memory = '4g'
)

events_cities = SparkSubmitOperator(
    task_id = 'events_cities',
    dag = dag_spark,
    application ='/lessons/scripts/events_cities.py' ,
    conn_id = 'yarn_spark',
    #conn_id = 'local',
    #application_args = [],
    conf = {
        "spark.driver.maxResultSize": "20g"
    },
    #executor_cores = 1,
    executor_memory = '4g'
)

recommended_friends = SparkSubmitOperator(
    task_id = 'recommended_friends',
    dag = dag_spark,
    application = '/lessons/scripts/recommended_friends.py' ,
    #conn_id = 'local',
    conn_id = 'yarn_spark',
    conf={
        "spark.driver.maxResultSize": "20g"
    },
    #Sexecutor_cores = 1,
    executor_memory = '4g'
)

[users_cities, events_cities, recommended_friends]

Политика конфиденциальности

Продолжая использовать данный сайт вы подтверждаете свое согласие с условиями его политики конфиденциальности. Подробнее…




Администрация и владельцы данного информационного ресурса не несут ответственности за возможные последствия, связанные с использованием информации, размещенной на нем.


Все права защищены. При копировании материалов сайта обязательно указывать ссылку на © Microsegment.ru (2020-2025)