1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115 | from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from datetime import timedelta,datetime
import pendulum
import pytz
# Timezone used to localize the DAG's start date.
tz = pytz.timezone("Asia/Shanghai")

# Arguments handed to the DAG as ``default_args``; individual tasks may
# still override them.
default_args = {
    "owner": "Terry",
    "depends_on_past": False,
    # NOTE(review): pendulum.datetime() presumably defaults to UTC, so after
    # the conversion this would be 2024-09-22 08:00 in Asia/Shanghai rather
    # than local midnight -- confirm that this is intended.
    "start_date": pendulum.datetime(year=2024, month=9, day=22).astimezone(tz),
    # Notification settings: mail this address on both failures and retries.
    "email": ["tian23.li@tcl.com"],
    "email_on_failure": True,
    "email_on_retry": True,
    # Retry policy: two retries, eight minutes apart.
    "retries": 2,
    "retry_delay": timedelta(minutes=8),
}
# Keyword arguments shared by every SparkSubmitOperator in this file; they
# are unpacked into each operator call with ``**_config``.
_config = {
    # Spark properties forwarded through the operator's ``conf`` argument.
    "conf": {
        "spark.executor.extraJavaOptions": "-XX:+UseConcMarkSweepGC",
        # Min and max are equal, which pins the executor count at 8.
        "spark.dynamicAllocation.minExecutors": 8,
        "spark.dynamicAllocation.maxExecutors": 8,
        "spark.yarn.executor.memoryOverhead": 2648,
        "spark.driver.maxResultSize": "4g",
        "spark.yarn.maxAppAttempts": 1,
        # NOTE(review): both values are unit-less; Spark's default units for
        # these two settings presumably differ -- confirm against the docs.
        "spark.network.timeout": 1000000,
        "spark.executor.heartbeatInterval": 1000000,
    },
    "executor_cores": 4,
    "executor_memory": "6g",
    # Jinja expression; presumably rendered by Airflow to the task's id so
    # the Spark application is named after the task -- TODO confirm.
    "name": "{{ task_instance.task_id }}",
    "driver_memory": "6g",
}
# DAG for the xsimgcl "ldd" recall pipeline. The cron expression fires once
# a day at 21:30.
# NOTE(review): the schedule is presumably evaluated in the start date's
# timezone -- confirm against the Airflow version in use.
dag = DAG(
    dag_id="xsimgcl_recall_ldd",
    description="xsimgcl_recall_ldd",
    schedule_interval="30 21 * * *",
    default_args=default_args,
)
# Root directory of the xsimgcl project; the Spark applications below are
# addressed relative to it.
xsimgcl_dir = "/data/gangyanyuan/terry/terry-recsys-2024/xsimgcl"

# First raw-log job, submitted through the "spark_yarn_cluster" connection.
user_click_exposure_video_ldd = SparkSubmitOperator(
    task_id="user_click_exposure_video_ldd",
    conn_id="spark_yarn_cluster",
    application=f"{xsimgcl_dir}/HandleRawLog/user_click_exposure_video_ldd.py",
    dag=dag,
    **_config,
)

# Second raw-log job, also submitted through "spark_yarn_cluster".
get_click_seq_30days_ldd_hive = SparkSubmitOperator(
    task_id="get_click_seq_30days_ldd_hive",
    conn_id="spark_yarn_cluster",
    application=f"{xsimgcl_dir}/HandleRawLog/get_user_click_session_seq_ndays_ldd.py",
    dag=dag,
    **_config,
)

# HandleDataToLocal job; unlike the two above it uses the "spark_local"
# connection.
get_click_seq_30days_ldd_local = SparkSubmitOperator(
    task_id="get_click_seq_30days_ldd_local",
    conn_id="spark_local",
    application=f"{xsimgcl_dir}/HandleDataToLocal/get_user_click_sess_seq_30days_ldd_rr.py",
    dag=dag,
    **_config,
)
# Top-300 variant of the training/recall tasks (currently disabled; kept commented out below).
# generate_training_data_ldd_top300 = BashOperator(
# task_id='generate_training_data_ldd_top300',
# bash_command="cd /data/gangyanyuan/terry/terry-recsys-2024/xsimgcl/ldd && /data/dev/miniconda/envs/deepmatch/bin/python generate_training_data_xsimgcl_ldd_top300.py",
# depends_on_past =False,
# dag=dag,
# )
# xsimgcl_train_ldd_top300 = BashOperator(
# task_id='xsimgcl_train_ldd_top300',
# bash_command="cd /data/gangyanyuan/terry/terry-recsys-rr/QRec_Single_Channel && /data/dev/miniconda/envs/deepmatch/bin/python /data/gangyanyuan/terry/terry-recsys-rr/QRec_Single_Channel/train_xsimgcl.py",
# depends_on_past =False,
# dag=dag,
#)
# generate_xsimgcl_recall_ldd_top300 = BashOperator(
# task_id='generate_xsimgcl_recall_ldd_top300',
# bash_command="cd /data/gangyanyuan/terry/terry-recsys-2024/xsimgcl/ldd && /data/dev/miniconda/envs/deepmatch/bin/python generate_xsimgcl_recall_ldd_top300.py",
# depends_on_past =False,
# dag=dag,
# )
# Active (non-top300) training and recall tasks.
# Paths shared by the shell commands below. They are defined once so the
# interpreter and working directories cannot drift apart between tasks.
_DEEPMATCH_PYTHON = "/data/dev/miniconda/envs/deepmatch/bin/python"
_LDD_DIR = "/data/gangyanyuan/terry/terry-recsys-2024/xsimgcl/ldd"
_QREC_DIR = "/data/gangyanyuan/terry/terry-recsys-rr/QRec_Single_Channel"

# Runs generate_training_data_xsimgcl_ldd.py from inside the ldd directory.
generate_training_data_ldd = BashOperator(
    task_id="generate_training_data_ldd",
    bash_command=(
        f"cd {_LDD_DIR} && "
        f"{_DEEPMATCH_PYTHON} generate_training_data_xsimgcl_ldd.py"
    ),
    depends_on_past=False,
    dag=dag,
)

# Runs train_xsimgcl.py (given by absolute path) from inside the
# QRec_Single_Channel directory.
xsimgcl_train_ldd = BashOperator(
    task_id="xsimgcl_train_ldd",
    bash_command=(
        f"cd {_QREC_DIR} && "
        f"{_DEEPMATCH_PYTHON} {_QREC_DIR}/train_xsimgcl.py"
    ),
    depends_on_past=False,
    dag=dag,
)

# Runs generate_xsimgcl_recall_ldd.py from inside the ldd directory.
generate_xsimgcl_recall_ldd = BashOperator(
    task_id="generate_xsimgcl_recall_ldd",
    bash_command=(
        f"cd {_LDD_DIR} && "
        f"{_DEEPMATCH_PYTHON} generate_xsimgcl_recall_ldd.py"
    ),
    depends_on_past=False,
    dag=dag,
)
# Final Spark job: submits save_ldd_recall_to_hive.py through the
# "spark_local" connection, with the shared Spark settings from _config.
save_ldd_recall_to_hive = SparkSubmitOperator(
    task_id="save_ldd_recall_to_hive",
    conn_id="spark_local",
    application="/data/gangyanyuan/terry/terry-recsys-2024/xsimgcl/ldd/save_ldd_recall_to_hive.py",
    dag=dag,
    **_config,
)
# Strictly linear pipeline: each task runs only after the one listed above it.
(
    user_click_exposure_video_ldd
    >> get_click_seq_30days_ldd_hive
    >> get_click_seq_30days_ldd_local
    >> generate_training_data_ldd
    >> xsimgcl_train_ldd
    >> generate_xsimgcl_recall_ldd
    >> save_ldd_recall_to_hive
)
|