1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
from datetime import datetime, timedelta
import pendulum
import pytz
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
# Timezone the DAG start date is converted into.
tz = pytz.timezone("Asia/Shanghai")

# Dictionary handed to the DAG as `default_args`.
default_args = dict(
    owner="LIUBING",
    # NOTE(review): pendulum.datetime() is called without a tz and only then
    # converted with astimezone(); presumably this is 2024-04-23 00:00 UTC
    # expressed as 08:00 Asia/Shanghai rather than local midnight -- confirm
    # that this is the intended start.
    start_date=pendulum.datetime(year=2024, month=4, day=23).astimezone(tz),
    depends_on_past=False,
    email=["liubing03@tcl.com"],
    # Both e-mail notification flags are switched off.
    email_on_failure=False,
    email_on_retry=False,
    # A single retry, with a ten minute retry delay.
    retries=1,
    retry_delay=timedelta(minutes=10),
)
# DAG object that the tasks in this file attach to through `dag=dag`.
dag = DAG(
    dag_id="airflow_tv_ls_recall",
    default_args=default_args,
    # Cron expression: minute 00 of hour 07, every day.
    schedule_interval="00 07 * * *",
    description="",
)
# Keyword arguments shared by the SparkSubmitOperator tasks in this file;
# each of them unpacks this mapping with `**_config`.
_config = dict(
    # Spark properties forwarded through the operator's `conf` argument.
    conf={
        "spark.executor.extraJavaOptions": "-XX:+UseConcMarkSweepGC",
        # Minimum and maximum executor counts are both set to 6.
        "spark.dynamicAllocation.minExecutors": 6,
        "spark.dynamicAllocation.maxExecutors": 6,
        "spark.yarn.executor.memoryOverhead": 2648,
        "spark.driver.maxResultSize": "12g",
        "spark.yarn.maxAppAttempts": 1,
        # NOTE(review): both values are given without a unit suffix; check
        # which default unit Spark applies to each of them.
        "spark.network.timeout": 1000000,
        "spark.executor.heartbeatInterval": 1000000,
    },
    executor_cores=4,
    executor_memory="8g",
    # Jinja expression, so the Spark application name follows the task id.
    name="{{ task_instance.task_id }}",
    driver_memory="8g",
)
# Spark task: submits 1.get_train_data_180days_to_local.py through the
# "spark_local" connection with the shared settings from _config.
# `provide_context` is deliberately not passed: SparkSubmitOperator does not
# define that parameter, so it would only reach BaseOperator as an invalid
# argument (a deprecation warning on Airflow 1.10, an error on Airflow 2).
get_tv_sp_traning_data_to_local = SparkSubmitOperator(
    task_id='get_tv_sp_traning_data_to_local',
    dag=dag,
    application='/data/gangyanyuan/liubing/DeepMatch/scripts_test/1.get_train_data_180days_to_local.py',
    conn_id='spark_local',
    **_config,
)
# Spark task: submits 1_1.get_train_data_180days_filter_to_local.py through
# the "spark_local" connection with the shared settings from _config.
# `provide_context` is deliberately not passed: SparkSubmitOperator does not
# define that parameter, so it would only reach BaseOperator as an invalid
# argument (a deprecation warning on Airflow 1.10, an error on Airflow 2).
get_tv_sp_traning_data_filter_to_local = SparkSubmitOperator(
    task_id='get_tv_sp_traning_data_filter_to_local',
    dag=dag,
    application='/data/gangyanyuan/liubing/DeepMatch/scripts_test/1_1.get_train_data_180days_filter_to_local.py',
    conn_id='spark_local',
    **_config,
)
# Spark task: submits 1.get_train_data_180days_ls_to_local.py through the
# "spark_local" connection with the shared settings from _config.
# `provide_context` is deliberately not passed: SparkSubmitOperator does not
# define that parameter, so it would only reach BaseOperator as an invalid
# argument (a deprecation warning on Airflow 1.10, an error on Airflow 2).
get_tv_sp_traning_data_to_local_ls = SparkSubmitOperator(
    task_id='get_tv_sp_traning_data_to_local_ls',
    dag=dag,
    application='/data/gangyanyuan/liubing/DeepMatch/scripts_test/1.get_train_data_180days_ls_to_local.py',
    conn_id='spark_local',
    **_config,
)
# Spark task: submits 1_1.get_train_data_180days_ls_filter_to_local.py through
# the "spark_local" connection with the shared settings from _config.
# `provide_context` is deliberately not passed: SparkSubmitOperator does not
# define that parameter, so it would only reach BaseOperator as an invalid
# argument (a deprecation warning on Airflow 1.10, an error on Airflow 2).
get_tv_sp_traning_data_to_local_ls_filter = SparkSubmitOperator(
    task_id='get_tv_sp_traning_data_to_local_ls_filter',
    dag=dag,
    application='/data/gangyanyuan/liubing/DeepMatch/scripts_test/1_1.get_train_data_180days_ls_filter_to_local.py',
    conn_id='spark_local',
    **_config,
)
# Shell task: runs 0.cold_start_user.py with the recsys2 conda interpreter.
filter_aid = BashOperator(
    bash_command=(
        '/data/dev/miniconda/envs/recsys2/bin/python '
        '/data/gangyanyuan/liubing/DeepMatch/scripts_test/0.cold_start_user.py'
    ),
    task_id='filter_aid',
    dag=dag,
)
# Shell task: runs 0.aid_up.py with the recsys2 conda interpreter.
aid_up = BashOperator(
    bash_command=(
        '/data/dev/miniconda/envs/recsys2/bin/python '
        '/data/gangyanyuan/liubing/DeepMatch/scripts_test/0.aid_up.py'
    ),
    task_id='aid_up',
    dag=dag,
)
# Shell task: runs 1.get_media_type.py with the recsys2 conda interpreter.
# It is the first task in the dependency chain at the bottom of the file.
update_media_type = BashOperator(
    bash_command=(
        '/data/dev/miniconda/envs/recsys2/bin/python '
        '/data/gangyanyuan/liubing/DeepMatch/scripts_test/1.get_media_type.py'
    ),
    task_id='update_media_type',
    dag=dag,
)
# Shell task: runs 2.run_sdm.py with the deepmatch conda interpreter.
run_sdm = BashOperator(
    bash_command=(
        '/data/dev/miniconda/envs/deepmatch/bin/python '
        '/data/gangyanyuan/liubing/DeepMatch/scripts_test/2.run_sdm.py'
    ),
    task_id='run_sdm',
    dag=dag,
)
# Shell task: runs 2.run_sdm_01.py with the deepmatch conda interpreter.
run_sdm_01 = BashOperator(
    bash_command=(
        '/data/dev/miniconda/envs/deepmatch/bin/python '
        '/data/gangyanyuan/liubing/DeepMatch/scripts_test/2.run_sdm_01.py'
    ),
    task_id='run_sdm_01',
    dag=dag,
)
# Shell task: runs 2.run_sdm_38.py with the deepmatch conda interpreter.
run_sdm_38 = BashOperator(
    bash_command=(
        '/data/dev/miniconda/envs/deepmatch/bin/python '
        '/data/gangyanyuan/liubing/DeepMatch/scripts_test/2.run_sdm_38.py'
    ),
    task_id='run_sdm_38',
    dag=dag,
)
# sim_paid = BashOperator(
# task_id='sim_paid',
# dag=dag,
# bash_command= '/data/dev/miniconda/envs/deepmatch/bin/python /data/gangyanyuan/liubing/DeepMatch/scripts_test/3.paid_user_sims.py'
# )
# merge_recall = BashOperator(
# task_id='merge_recall',
# dag=dag,
# bash_command= '/data/dev/miniconda/envs/deepmatch/bin/python /data/gangyanyuan/liubing/DeepMatch/scripts_test/4.merge_recall.py'
# )
# Shell task: runs 4.merge_recall.py with the deepmatch conda interpreter.
merge_recall_1 = BashOperator(
    bash_command=(
        '/data/dev/miniconda/envs/deepmatch/bin/python '
        '/data/gangyanyuan/liubing/DeepMatch/scripts_test/4.merge_recall.py'
    ),
    task_id='merge_recall_1',
    dag=dag,
)
# Spark task: submits 5.recall_to_hive.py through the "spark_local"
# connection with the shared settings from _config. It is the last task in
# the dependency chain at the bottom of the file.
upload_recall_to_hive = SparkSubmitOperator(
    task_id='upload_recall_to_hive',
    dag=dag,
    application='/data/gangyanyuan/liubing/DeepMatch/scripts_test/5.recall_to_hive.py',
    conn_id='spark_local',
    **_config,
)
# #------------MV 0,1,2----------------
# update_media_mv = BashOperator(
# task_id='update_media_mv',
# dag=dag,
# bash_command= '/data/dev/miniconda/envs/recsys2/bin/python /data/gangyanyuan/liubing/DeepMatch/scripts_test/1.get_media_type.py'
# )
# get_mv_training_data_to_local = SparkSubmitOperator(
# task_id='get_mv_training_data_to_local',
# dag=dag,
# **_config,
# application= '/data/gangyanyuan/liubing/DeepMatch/scripts_movie/1.get_training_data_to_local.py',
# conn_id = "spark_local",
# provide_context=True,
# )
# get_mv_training_data_to_local_ls = SparkSubmitOperator(
# task_id='get_mv_training_data_to_local_ls',
# dag=dag,
# **_config,
# application= '/data/gangyanyuan/liubing/DeepMatch/scripts_movie/1.get_training_data_ls_to_local.py',
# conn_id = "spark_local",
# provide_context=True,
# )
# run_sdm_mv = BashOperator(
# task_id='run_sdm_mv',
# dag=dag,
# bash_command= '/data/dev/miniconda/envs/deepmatch/bin/python /data/gangyanyuan/liubing/DeepMatch/scripts_movie/2.run_sdm.py'
# )
# # sim_paid_mv = BashOperator(
# # task_id='sim_paid_mv',
# # dag=dag,
# # bash_command= '/data/dev/miniconda/envs/deepmatch/bin/python /data/gangyanyuan/liubing/DeepMatch/scripts_movie/3.paid_user_sims.py'
# # )
# merge_recall_mv = BashOperator(
# task_id='merge_recall_mv',
# dag=dag,
# bash_command= '/data/dev/miniconda/envs/deepmatch/bin/python /data/gangyanyuan/liubing/DeepMatch/scripts_movie/4.merge_recall.py'
# )
# upload_recall_to_hive_mv = SparkSubmitOperator(
# task_id='upload_recall_to_hive_mv',
# dag=dag,
# **_config,
# application= '/data/gangyanyuan/liubing/DeepMatch/scripts_movie/5.recall_to_hive.py',
# conn_id = "spark_cluster",
# )
# # upload_filter_recall_to_hive_mv = SparkSubmitOperator(
# # task_id='upload_filter_recall_to_hive_mv',
# # dag=dag,
# # **_config,
# # application= '/data/gangyanyuan/liubing/DeepMatch/scripts_movie/6.filter_recall_to_hive.py',
# # conn_id = "spark_cluster",
# # )
# Task ordering, declared as one chain with a single fan-out: after filter_aid
# the two "*_to_local" extraction tasks have no dependency on each other, and
# both are upstream of get_tv_sp_traning_data_filter_to_local. Everything
# after that is strictly sequential.
(
    update_media_type
    >> aid_up
    >> filter_aid
    >> [get_tv_sp_traning_data_to_local, get_tv_sp_traning_data_to_local_ls]
    >> get_tv_sp_traning_data_filter_to_local
    >> get_tv_sp_traning_data_to_local_ls_filter
    >> run_sdm
    >> run_sdm_01
    >> run_sdm_38
    >> merge_recall_1
    >> upload_recall_to_hive
)
|