# DAG: airflow_mv_ls_recall
# schedule: 40 08 * * *

from datetime import datetime, timedelta
import pendulum
import pytz

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator

# Timezone used to express the DAG start date in Asia/Shanghai.
tz = pytz.timezone("Asia/Shanghai")

# Arguments handed to the DAG as `default_args`.
# NOTE(review): start_date is built first and only then converted to `tz`,
# so its wall-clock time in Asia/Shanghai may not be midnight — confirm this
# is the intended start instant.
default_args = dict(
    owner="LIUBING",
    start_date=pendulum.datetime(year=2024, month=6, day=25).astimezone(tz),
    depends_on_past=False,
    email=["liubing03@tcl.com"],
    # Both e-mail notifications are switched off.
    email_on_failure=False,
    email_on_retry=False,
    # One retry, ten minutes after the failed attempt.
    retries=1,
    retry_delay=timedelta(minutes=10),
)

# The DAG object every task below is attached to.
# The cron expression "40 08 * * *" fires daily at minute 40 of hour 08.
# NOTE(review): `catchup` is not set here, so the installation default
# applies — confirm that back-filling from start_date is acceptable.
dag = DAG(
    dag_id="airflow_mv_ls_recall",
    schedule_interval="40 08 * * *",
    default_args=default_args,
    description="",
)

# Keyword arguments shared by every SparkSubmitOperator in this file; they are
# unpacked into each operator with `**_config`.
_config = dict(
    # Raw Spark properties.
    conf={
        "spark.executor.extraJavaOptions": "-XX:+UseConcMarkSweepGC",
        # min == max, so the executor count is pinned at 6.
        "spark.dynamicAllocation.minExecutors": 6,
        "spark.dynamicAllocation.maxExecutors": 6,
        "spark.yarn.executor.memoryOverhead": 2648,
        "spark.driver.maxResultSize": "12g",
        "spark.yarn.maxAppAttempts": 1,
        # NOTE(review): both values below are unit-less; check which unit
        # Spark assumes for each key before changing them.
        "spark.network.timeout": 1000000,
        "spark.executor.heartbeatInterval": 1000000,
    },
    executor_cores=4,
    executor_memory="8g",
    # Template string; expected to be rendered by Airflow at run time.
    name="{{ task_instance.task_id }}",
    driver_memory="8g",
)



# ---------- MV 0, 1, 2 ----------
# Runs scripts_movie/0.aid_up.py with the interpreter of the "deepmatch"
# conda environment.
aid_up = BashOperator(
    dag=dag,
    task_id="aid_up",
    bash_command=(
        "/data/dev/miniconda/envs/deepmatch/bin/python "
        "/data/gangyanyuan/liubing/DeepMatch/scripts_movie/0.aid_up.py"
    ),
)

# Runs scripts_movie/0.cold_start_user.py with the interpreter of the
# "deepmatch" conda environment.
filter_aid_cold = BashOperator(
    dag=dag,
    task_id="filter_aid_cold",
    bash_command=(
        "/data/dev/miniconda/envs/deepmatch/bin/python "
        "/data/gangyanyuan/liubing/DeepMatch/scripts_movie/0.cold_start_user.py"
    ),
)

# Runs scripts_movie/1.get_media_type.py; unlike the other Bash tasks this one
# uses the interpreter of the "recsys2" conda environment.
# The command string keeps its trailing space exactly as written.
update_media_mv = BashOperator(
    dag=dag,
    task_id="update_media_mv",
    bash_command=(
        "/data/dev/miniconda/envs/recsys2/bin/python "
        "/data/gangyanyuan/liubing/DeepMatch/scripts_movie/1.get_media_type.py "
    ),
)

# Spark job: submits scripts_movie/1.get_training_data_to_local.py through the
# "spark_local" connection, using the shared settings in `_config`.
# `provide_context` is deliberately not passed: it is not an argument of
# SparkSubmitOperator or BaseOperator, so Airflow only reports it as an
# invalid argument (a warning on 1.10, an error by default on 2.x).
get_mv_training_data_to_local = SparkSubmitOperator(
    task_id='get_mv_training_data_to_local',
    dag=dag,
    **_config,
    application='/data/gangyanyuan/liubing/DeepMatch/scripts_movie/1.get_training_data_to_local.py',
    conn_id="spark_local",
)
# Spark job: submits scripts_movie/1.get_training_data_ls_to_local.py through
# the "spark_local" connection, using the shared settings in `_config`.
# `provide_context` is deliberately not passed: it is not an argument of
# SparkSubmitOperator or BaseOperator, so Airflow only reports it as an
# invalid argument (a warning on 1.10, an error by default on 2.x).
get_mv_training_data_to_local_ls = SparkSubmitOperator(
    task_id='get_mv_training_data_to_local_ls',
    dag=dag,
    **_config,
    application='/data/gangyanyuan/liubing/DeepMatch/scripts_movie/1.get_training_data_ls_to_local.py',
    conn_id="spark_local",
)

# Runs scripts_movie/2.run_sdm.py with the interpreter of the "deepmatch"
# conda environment.
run_sdm_mv = BashOperator(
    dag=dag,
    task_id="run_sdm_mv",
    bash_command=(
        "/data/dev/miniconda/envs/deepmatch/bin/python "
        "/data/gangyanyuan/liubing/DeepMatch/scripts_movie/2.run_sdm.py"
    ),
)

# Runs scripts_movie/2.run_sdm_0178.py with the interpreter of the "deepmatch"
# conda environment. The command string keeps its trailing space exactly as
# written.
run_sdm_mv_0178 = BashOperator(
    dag=dag,
    task_id="run_sdm_mv_0178",
    bash_command=(
        "/data/dev/miniconda/envs/deepmatch/bin/python "
        "/data/gangyanyuan/liubing/DeepMatch/scripts_movie/2.run_sdm_0178.py "
    ),
)
# Runs scripts_movie/4.merge_recall_1.py with the interpreter of the
# "deepmatch" conda environment.
merge_recall_mv = BashOperator(
    dag=dag,
    task_id="merge_recall_mv",
    bash_command=(
        "/data/dev/miniconda/envs/deepmatch/bin/python "
        "/data/gangyanyuan/liubing/DeepMatch/scripts_movie/4.merge_recall_1.py"
    ),
)
# Spark job: submits scripts_movie/5.recall_to_hive.py through the
# "spark_cluster" connection (the other Spark tasks in this file use
# "spark_local"), with the shared settings in `_config`.
upload_recall_to_hive_mv = SparkSubmitOperator(
    dag=dag,
    task_id="upload_recall_to_hive_mv",
    conn_id="spark_cluster",
    application="/data/gangyanyuan/liubing/DeepMatch/scripts_movie/5.recall_to_hive.py",
    **_config,
)
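# Disabled task `upload_filter_recall_to_hive_mv`, kept for reference only.
# upload_filter_recall_to_hive_mv = SparkSubmitOperator(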
#     task_id='upload_filter_recall_to_hive_mv',
#     dag=dag,
#     **_config,
#     application= '/data/gangyanyuan/liubing/DeepMatch/scripts_movie/6.filter_recall_to_hive.py',
#     conn_id = "spark_cluster",
# )
# Task graph.
# Main path: update_media_mv fans out to the two training-data Spark jobs,
# which both feed run_sdm_mv; the rest of the path is strictly sequential.
(
    update_media_mv
    >> [get_mv_training_data_to_local, get_mv_training_data_to_local_ls]
    >> run_sdm_mv
    >> run_sdm_mv_0178
    >> merge_recall_mv
    >> upload_recall_to_hive_mv
)
# Side branch: aid_up and filter_aid_cold also start after update_media_mv and
# must both finish before merge_recall_mv.
(
    update_media_mv
    >> [aid_up, filter_aid_cold]
    >> merge_recall_mv
)