from datetime import timedelta
import pendulum
import pytz
from airflow import DAG
# Airflow 1.10.x import paths; on Airflow 2.x these live in
# airflow.operators.bash and airflow.providers.apache.spark.operators.spark_submit.
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator

tz = pytz.timezone('Asia/Shanghai')
default_args = {
    'owner': 'LIUBING',
    # pendulum.datetime() defaults to UTC, so this is 2024-05-14 00:00 UTC
    # rendered in Shanghai time (08:00 local); pass tz='Asia/Shanghai' to
    # pendulum.datetime() instead to pin local midnight.
    'start_date': pendulum.datetime(year=2024, month=5, day=14).astimezone(tz),
    'depends_on_past': False,
    'email': ['liubing03@tcl.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=10),
}
dag = DAG(
    dag_id='airflow_xxl_ls_recall',
    description='XXL recall pipeline: pull training data, run the SDM recall jobs, merge results, and publish to Hive',
    default_args=default_args,
    schedule_interval='30 04 * * *',  # daily at 04:30
)
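# NOTE (assumption): no explicit catchup flag is set, so this DAG uses the
# Airflow default (catchup=True unless catchup_by_default is overridden in
# airflow.cfg). On first deployment the scheduler will therefore backfill
# every 04:30 interval since the 2024-05-14 start_date; pass catchup=False
# to DAG(...) if only the most recent interval should run.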
_config = {
    'conf': {
        'spark.executor.extraJavaOptions': '-XX:+UseConcMarkSweepGC',
        'spark.dynamicAllocation.minExecutors': 6,
        'spark.dynamicAllocation.maxExecutors': 6,
        'spark.yarn.executor.memoryOverhead': 2648,
        'spark.driver.maxResultSize': '12g',
        'spark.yarn.maxAppAttempts': 1,
        'spark.network.timeout': 1000000,
        'spark.executor.heartbeatInterval': 1000000,
    },
    'executor_cores': 4,
    'executor_memory': '8g',
    'name': '{{ task_instance.task_id }}',
    'driver_memory': '8g',
}
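# _config is splatted into each SparkSubmitOperator below via **_config:
# the 'conf' dict becomes per-job --conf key=value pairs on spark-submit,
# and 'name' is a templated field, so every Spark application is named
# after its own task_id at render time. One expansion is equivalent to the
# following (hypothetical task shown for illustration):
#
#   SparkSubmitOperator(
#       task_id='example_job',          # hypothetical
#       dag=dag,
#       conf=_config['conf'],
#       executor_cores=4,
#       executor_memory='8g',
#       driver_memory='8g',
#       name='{{ task_instance.task_id }}',
#       application='/path/to/job.py',  # placeholder
#       conn_id='spark_local',
#   )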
# kill_large_processes = BashOperator(
#     task_id='kill_large_processes',
#     bash_command='cd /data/gangyanyuan/scheduled_jobs && /data/gangyanyuan/scheduled_jobs/kill-large-processes.sh ',
#     dag=dag,
# )

# ---------jx_sp 0,1--------------------------
# get_xxl_traning_data_30days_to_local = SparkSubmitOperator(
#     task_id='get_xxl_traning_data_30days_to_local',
#     dag=dag,
#     **_config,
#     application='/data/gangyanyuan/liubing/DeepMatch/scripts_xxl/2.get_training_data_30days_to_local.py',
#     conn_id='spark_local',
#     provide_context=True,
# )
get_xxl_traning_data_180days_to_local = SparkSubmitOperator(
    task_id='get_xxl_traning_data_180days_to_local',
    dag=dag,
    **_config,
    application='/data/gangyanyuan/liubing/DeepMatch/scripts_xxl/2.get_training_data_180days_to_local.py',
    conn_id='spark_local',
    provide_context=True,
)
get_xxl_traning_data_180days_to_local_ls = SparkSubmitOperator(
    task_id='get_xxl_traning_data_180days_to_local_ls',
    dag=dag,
    **_config,
    application='/data/gangyanyuan/liubing/DeepMatch/scripts_xxl/2.get_training_data_180days_to_local_ls.py',
    conn_id='spark_local',
    provide_context=True,
)
update_media_xxl = BashOperator(
    task_id='update_media_xxl',
    dag=dag,
    bash_command='/data/dev/miniconda/envs/deepmatch/bin/python /data/gangyanyuan/liubing/DeepMatch/scripts_xxl/1.update_media_xxl.py',
)
update_media_xxl_all = BashOperator(
    task_id='update_media_xxl_all',
    dag=dag,
    bash_command='/data/dev/miniconda/envs/deepmatch/bin/python /data/gangyanyuan/liubing/DeepMatch/scripts_xxl/1.update_media_xxl_all.py',
)
run_sdm_12 = BashOperator(
    task_id='run_sdm_12',
    dag=dag,
    bash_command='/data/dev/miniconda/envs/deepmatch/bin/python /data/gangyanyuan/liubing/DeepMatch/scripts_xxl/3.run_sdm_1.py',
)
run_sdm_0789 = BashOperator(
    task_id='run_sdm_0789',
    dag=dag,
    bash_command='/data/dev/miniconda/envs/deepmatch/bin/python /data/gangyanyuan/liubing/DeepMatch/scripts_xxl/3.run_sdm_789.py',
)
run_sdm_34 = BashOperator(
    task_id='run_sdm_34',
    dag=dag,
    bash_command='/data/dev/miniconda/envs/deepmatch/bin/python /data/gangyanyuan/liubing/DeepMatch/scripts_xxl/3.run_sdm_34.py',
)
merge_recall = BashOperator(
    task_id='merge_recall',
    dag=dag,
    bash_command='/data/dev/miniconda/envs/deepmatch/bin/python /data/gangyanyuan/liubing/DeepMatch/scripts_xxl/4.merge_recall.py',
)
upload_recall_to_hive = SparkSubmitOperator(
    task_id='upload_recall_to_hive',
    dag=dag,
    **_config,
    application='/data/gangyanyuan/liubing/DeepMatch/scripts_xxl/5.recall_to_hive.py',
    conn_id='spark_local',
)
merge_xxl = SparkSubmitOperator(
    task_id='merge_xxl',
    dag=dag,
    **_config,
    application='/data/gangyanyuan/liubing/DeepMatch/scripts_xxl/6.merge_xxl.py',
    conn_id='spark_local',
)
(
    get_xxl_traning_data_180days_to_local
    >> get_xxl_traning_data_180days_to_local_ls
    >> [update_media_xxl, update_media_xxl_all]
    >> run_sdm_12
    >> run_sdm_0789
    >> run_sdm_34
    >> merge_recall
    >> upload_recall_to_hive
    >> merge_xxl
)
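# Resulting task graph: a linear chain, except that update_media_xxl and
# update_media_xxl_all run in parallel and both must finish before run_sdm_12:
#
#   get_xxl_traning_data_180days_to_local
#     -> get_xxl_traning_data_180days_to_local_ls
#     -> [update_media_xxl | update_media_xxl_all]
#     -> run_sdm_12 -> run_sdm_0789 -> run_sdm_34
#     -> merge_recall -> upload_recall_to_hive -> merge_xxl

if __name__ == '__main__':
    # Local smoke test, a minimal sketch (not part of the scheduled DAG):
    # parse this folder through DagBag, fail on import errors, and print the
    # task ids. Runs only when the file is executed directly, never during
    # scheduler parsing.
    import os
    from airflow.models import DagBag

    bag = DagBag(dag_folder=os.path.dirname(os.path.abspath(__file__)),
                 include_examples=False)
    assert not bag.import_errors, bag.import_errors
    print(sorted(bag.dags['airflow_xxl_ls_recall'].task_ids))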