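# Airflow DAG: daily movie (MV) long/short-term recall pipeline.
# Stages training data with Spark, trains SDM recall models,
# merges the recall lists, and uploads the result to Hive.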
from datetime import timedelta
import pendulum
import pytz
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
tz = pytz.timezone('Asia/Shanghai')
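# Default task arguments shared by every operator in this DAG. start_date is
# midnight UTC on 2024-06-25, shifted to Asia/Shanghai (08:00 local).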
default_args = {
    'owner': 'LIUBING',
    'start_date': pendulum.datetime(year=2024, month=6, day=25).astimezone(tz),
    'depends_on_past': False,
    'email': ['liubing03@tcl.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=10),
}
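# Airflow evaluates the cron expression in the DAG timezone taken from
# start_date, so the 08:40 run is intended to fire in Shanghai local time.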
dag = DAG(
    dag_id='airflow_mv_ls_recall',
    description='Daily movie (MV) long/short-term recall pipeline',
    default_args=default_args,
    schedule_interval='40 08 * * *',
)
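# Shared SparkSubmitOperator settings: a pinned executor pool (min = max = 6)
# and very long network/heartbeat timeouts so that long-running training
# stages are not killed as unresponsive.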
_config = {
    'conf': {
        'spark.executor.extraJavaOptions': '-XX:+UseConcMarkSweepGC',
        'spark.dynamicAllocation.minExecutors': 6,
        'spark.dynamicAllocation.maxExecutors': 6,
        'spark.yarn.executor.memoryOverhead': 2648,
        'spark.driver.maxResultSize': '12g',
        'spark.yarn.maxAppAttempts': 1,
        'spark.network.timeout': 1000000,
        'spark.executor.heartbeatInterval': 1000000,
    },
    'executor_cores': 4,
    'executor_memory': '8g',
    'name': '{{ task_instance.task_id }}',  # templated: each job is submitted under its task_id
    'driver_memory': '8g',
}
#----------MV 0,1,2----------------
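# Per the script names: the step-0 scripts refresh the user id list and the
# cold-start user list, and step 1 refreshes media types.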
aid_up = BashOperator(
    task_id='aid_up',
    dag=dag,
    bash_command='/data/dev/miniconda/envs/deepmatch/bin/python /data/gangyanyuan/liubing/DeepMatch/scripts_movie/0.aid_up.py',
)
filter_aid_cold = BashOperator(
    task_id='filter_aid_cold',
    dag=dag,
    bash_command='/data/dev/miniconda/envs/deepmatch/bin/python /data/gangyanyuan/liubing/DeepMatch/scripts_movie/0.cold_start_user.py',
)
update_media_mv = BashOperator(
    task_id='update_media_mv',
    dag=dag,
    bash_command='/data/dev/miniconda/envs/recsys2/bin/python /data/gangyanyuan/liubing/DeepMatch/scripts_movie/1.get_media_type.py',
)
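# Spark jobs that pull the training data (and its '_ls' variant) to local
# storage; 'spark_local' and 'spark_cluster' are preconfigured Airflow connections.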
get_mv_training_data_to_local = SparkSubmitOperator(
    task_id='get_mv_training_data_to_local',
    dag=dag,
    application='/data/gangyanyuan/liubing/DeepMatch/scripts_movie/1.get_training_data_to_local.py',
    conn_id='spark_local',
    **_config,
)
get_mv_training_data_to_local_ls = SparkSubmitOperator(
    task_id='get_mv_training_data_to_local_ls',
    dag=dag,
    application='/data/gangyanyuan/liubing/DeepMatch/scripts_movie/1.get_training_data_ls_to_local.py',
    conn_id='spark_local',
    **_config,
)
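# Steps 2 and 4: train the SDM (Sequential Deep Matching) models, base run
# plus the '_0178' variant, then merge the recall lists.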
run_sdm_mv = BashOperator(
    task_id='run_sdm_mv',
    dag=dag,
    bash_command='/data/dev/miniconda/envs/deepmatch/bin/python /data/gangyanyuan/liubing/DeepMatch/scripts_movie/2.run_sdm.py',
)
run_sdm_mv_0178 = BashOperator(
    task_id='run_sdm_mv_0178',
    dag=dag,
    bash_command='/data/dev/miniconda/envs/deepmatch/bin/python /data/gangyanyuan/liubing/DeepMatch/scripts_movie/2.run_sdm_0178.py',
)
merge_recall_mv = BashOperator(
    task_id='merge_recall_mv',
    dag=dag,
    bash_command='/data/dev/miniconda/envs/deepmatch/bin/python /data/gangyanyuan/liubing/DeepMatch/scripts_movie/4.merge_recall_1.py',
)
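# Step 5: publish the merged recall results to Hive via the cluster Spark connection.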
upload_recall_to_hive_mv = SparkSubmitOperator(
    task_id='upload_recall_to_hive_mv',
    dag=dag,
    application='/data/gangyanyuan/liubing/DeepMatch/scripts_movie/5.recall_to_hive.py',
    conn_id='spark_cluster',
    **_config,
)
# Disabled step 6: filtered-recall upload, kept for reference.
# upload_filter_recall_to_hive_mv = SparkSubmitOperator(
#     task_id='upload_filter_recall_to_hive_mv',
#     dag=dag,
#     application='/data/gangyanyuan/liubing/DeepMatch/scripts_movie/6.filter_recall_to_hive.py',
#     conn_id='spark_cluster',
#     **_config,
# )
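# Dependency graph: refresh media types first; the two training-data pulls
# then run in parallel before the sequential SDM runs, while the step-0
# helper tasks only gate the final merge.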
update_media_mv >> [get_mv_training_data_to_local, get_mv_training_data_to_local_ls] >> run_sdm_mv >> run_sdm_mv_0178 >> merge_recall_mv >> upload_recall_to_hive_mv
update_media_mv >> [aid_up, filter_aid_cold] >> merge_recall_mv