# DAG: airflow_tv_ls_recall
# Schedule: 00 07 * * * (daily at 07:00)
# NOTE(review): `datetime` appears unused in this file; kept in case another
# chunk of the module references it.
from datetime import datetime, timedelta
import pendulum
import pytz

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator

# Target timezone for the DAG's start_date; all times below are intended as
# China Standard Time.
tz = pytz.timezone('Asia/Shanghai')

# Defaults applied to every task in this DAG.
default_args = {
    'owner': 'LIUBING',
    # NOTE(review): pendulum.datetime() builds midnight UTC, and astimezone()
    # converts that same instant to Asia/Shanghai — i.e. 2024-04-23 08:00
    # local, not local midnight. Confirm this is the intended start_date;
    # pendulum.datetime(2024, 4, 23, tz='Asia/Shanghai') would give local
    # midnight instead.
    'start_date': pendulum.datetime(year=2024, month=4, day=23).astimezone(tz),
    'depends_on_past': False,
    'email': ['liubing03@tcl.com'],
    'email_on_failure': False,  # failure/retry emails deliberately disabled
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=10),
}

# Daily recall pipeline, triggered at 07:00 (cron '00 07 * * *').
# NOTE(review): the cron is evaluated in the scheduler's default timezone,
# not `tz` above — confirm the scheduler runs in Asia/Shanghai.
dag = DAG(
    dag_id='airflow_tv_ls_recall',
    description='',
    default_args=default_args,
    schedule_interval='00 07 * * *', 
)

_config = {
    'conf': {
        'spark.executor.extraJavaOptions': '-XX:+UseConcMarkSweepGC',
        "spark.dynamicAllocation.minExecutors":6,
        "spark.dynamicAllocation.maxExecutors":6,
        "spark.yarn.executor.memoryOverhead":2648,
        "spark.driver.maxResultSize": "12g",
        "spark.yarn.maxAppAttempts": 1,
        "spark.network.timeout": 1000000,
        "spark.executor.heartbeatInterval" : 1000000
    },
    'executor_cores': 4,
    'executor_memory': '8g',
    'name': '{{ task_instance.task_id }}',
    'driver_memory': '8g',
}


# ---------------------------------------------------------------------------
# Spark extract tasks: dump 180-day training data to local disk.
# NOTE(review): the 'traning' misspelling is kept on purpose — task_ids are
# persisted in the Airflow metadata DB, and renaming would orphan history.
# ---------------------------------------------------------------------------

_SCRIPTS_DIR = '/data/gangyanyuan/liubing/DeepMatch/scripts_test'


def _spark_extract(task_id, script_name):
    """Build a local-mode SparkSubmitOperator for one DeepMatch extract script."""
    return SparkSubmitOperator(
        task_id=task_id,
        dag=dag,
        application=_SCRIPTS_DIR + '/' + script_name,
        conn_id='spark_local',
        # NOTE(review): provide_context is not a SparkSubmitOperator argument;
        # verify the installed Airflow version tolerates it.
        provide_context=True,
        **_config,
    )


get_tv_sp_traning_data_to_local = _spark_extract(
    'get_tv_sp_traning_data_to_local',
    '1.get_train_data_180days_to_local.py',
)
get_tv_sp_traning_data_filter_to_local = _spark_extract(
    'get_tv_sp_traning_data_filter_to_local',
    '1_1.get_train_data_180days_filter_to_local.py',
)
get_tv_sp_traning_data_to_local_ls = _spark_extract(
    'get_tv_sp_traning_data_to_local_ls',
    '1.get_train_data_180days_ls_to_local.py',
)
get_tv_sp_traning_data_to_local_ls_filter = _spark_extract(
    'get_tv_sp_traning_data_to_local_ls_filter',
    '1_1.get_train_data_180days_ls_filter_to_local.py',
)

# Preprocessing steps executed with the `recsys2` conda environment's Python.
_RECSYS_PY = '/data/dev/miniconda/envs/recsys2/bin/python'


def _recsys_task(task_id, script_path):
    """Build a BashOperator running *script_path* under the recsys2 env."""
    return BashOperator(
        task_id=task_id,
        dag=dag,
        bash_command=_RECSYS_PY + ' ' + script_path,
    )


# Cold-start user handling (see 0.cold_start_user.py).
filter_aid = _recsys_task(
    'filter_aid',
    '/data/gangyanyuan/liubing/DeepMatch/scripts_test/0.cold_start_user.py',
)
# aid update step (see 0.aid_up.py).
aid_up = _recsys_task(
    'aid_up',
    '/data/gangyanyuan/liubing/DeepMatch/scripts_test/0.aid_up.py',
)
# Media-type refresh (see 1.get_media_type.py).
update_media_type = _recsys_task(
    'update_media_type',
    '/data/gangyanyuan/liubing/DeepMatch/scripts_test/1.get_media_type.py',
)

# SDM model steps, run under the `deepmatch` conda environment's Python.
_DEEPMATCH_PY = '/data/dev/miniconda/envs/deepmatch/bin/python'


def _sdm_task(task_id, script_path):
    """Build a BashOperator running *script_path* under the deepmatch env."""
    return BashOperator(
        task_id=task_id,
        dag=dag,
        bash_command=_DEEPMATCH_PY + ' ' + script_path,
    )


run_sdm = _sdm_task(
    'run_sdm',
    '/data/gangyanyuan/liubing/DeepMatch/scripts_test/2.run_sdm.py',
)
run_sdm_01 = _sdm_task(
    'run_sdm_01',
    '/data/gangyanyuan/liubing/DeepMatch/scripts_test/2.run_sdm_01.py',
)
run_sdm_38 = _sdm_task(
    'run_sdm_38',
    '/data/gangyanyuan/liubing/DeepMatch/scripts_test/2.run_sdm_38.py',
)
# sim_paid = BashOperator(
#     task_id='sim_paid',
#     dag=dag,
#     bash_command= '/data/dev/miniconda/envs/deepmatch/bin/python /data/gangyanyuan/liubing/DeepMatch/scripts_test/3.paid_user_sims.py'  
# )
# merge_recall = BashOperator(
#     task_id='merge_recall',
#     dag=dag,
#     bash_command= '/data/dev/miniconda/envs/deepmatch/bin/python /data/gangyanyuan/liubing/DeepMatch/scripts_test/4.merge_recall.py'
# )
# Merge all recall sources into the final candidate set (deepmatch env).
merge_recall_1 = BashOperator(
    task_id='merge_recall_1',
    bash_command='/data/dev/miniconda/envs/deepmatch/bin/python /data/gangyanyuan/liubing/DeepMatch/scripts_test/4.merge_recall.py',
    dag=dag,
)

# Publish the merged recall results to Hive via a local-mode Spark job.
upload_recall_to_hive = SparkSubmitOperator(
    task_id='upload_recall_to_hive',
    application='/data/gangyanyuan/liubing/DeepMatch/scripts_test/5.recall_to_hive.py',
    conn_id='spark_local',
    dag=dag,
    **_config,
)

# #------------MV 0,1,2----------------

# update_media_mv = BashOperator(
#     task_id='update_media_mv',
#     dag=dag,
#     bash_command= '/data/dev/miniconda/envs/recsys2/bin/python /data/gangyanyuan/liubing/DeepMatch/scripts_test/1.get_media_type.py'    
# )
# get_mv_training_data_to_local = SparkSubmitOperator(
#     task_id='get_mv_training_data_to_local',
#     dag=dag,
#     **_config,
#     application= '/data/gangyanyuan/liubing/DeepMatch/scripts_movie/1.get_training_data_to_local.py',
#     conn_id = "spark_local",
#     provide_context=True,
# )
# get_mv_training_data_to_local_ls = SparkSubmitOperator(
#     task_id='get_mv_training_data_to_local_ls',
#     dag=dag,
#     **_config,
#     application= '/data/gangyanyuan/liubing/DeepMatch/scripts_movie/1.get_training_data_ls_to_local.py',
#     conn_id = "spark_local",
#     provide_context=True,
# )

# run_sdm_mv = BashOperator(
#     task_id='run_sdm_mv',
#     dag=dag,
#     bash_command= '/data/dev/miniconda/envs/deepmatch/bin/python /data/gangyanyuan/liubing/DeepMatch/scripts_movie/2.run_sdm.py'
# )
# # sim_paid_mv = BashOperator(
# #     task_id='sim_paid_mv',
# #     dag=dag,
# #     bash_command= '/data/dev/miniconda/envs/deepmatch/bin/python /data/gangyanyuan/liubing/DeepMatch/scripts_movie/3.paid_user_sims.py'  
# # )
# merge_recall_mv = BashOperator(
#     task_id='merge_recall_mv',
#     dag=dag,
#     bash_command= '/data/dev/miniconda/envs/deepmatch/bin/python /data/gangyanyuan/liubing/DeepMatch/scripts_movie/4.merge_recall.py'
# )
# upload_recall_to_hive_mv = SparkSubmitOperator(
#     task_id='upload_recall_to_hive_mv',
#     dag=dag,
#     **_config,
#     application= '/data/gangyanyuan/liubing/DeepMatch/scripts_movie/5.recall_to_hive.py',
#     conn_id = "spark_cluster",
# )
# # = SparkSubmitOperator(
# #     task_id='upload_filter_recall_to_hive_mv',
# #     dag=dag,
# #     **_config,
# #     application= '/data/gangyanyuan/liubing/DeepMatch/scripts_movie/6.filter_recall_to_hive.py',
# #     conn_id = "spark_cluster",
# # )

# Pipeline wiring:
#   media-type refresh -> aid update -> cold-start filter
#   -> both 180-day extracts in parallel -> filtered extracts
#   -> SDM runs -> merge -> publish to Hive
update_media_type >> aid_up >> filter_aid
filter_aid >> [get_tv_sp_traning_data_to_local, get_tv_sp_traning_data_to_local_ls]
[get_tv_sp_traning_data_to_local, get_tv_sp_traning_data_to_local_ls] >> get_tv_sp_traning_data_filter_to_local
get_tv_sp_traning_data_filter_to_local >> get_tv_sp_traning_data_to_local_ls_filter
get_tv_sp_traning_data_to_local_ls_filter >> run_sdm >> run_sdm_01 >> run_sdm_38
run_sdm_38 >> merge_recall_1 >> upload_recall_to_hive