1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141 | from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from datetime import timedelta,datetime
import pendulum
import pytz
# Time zone object applied to the DAG's start_date below.
tz = pytz.timezone("Asia/Shanghai")

# Arguments passed to the DAG as ``default_args``.
default_args = {
    "owner": "Terry",
    "depends_on_past": False,
    # NOTE(review): if pendulum.datetime() defaults to UTC when no tz is
    # given, this is midnight UTC converted to Asia/Shanghai (08:00 local),
    # not local midnight -- confirm that is what is intended.
    "start_date": pendulum.datetime(year=2025, month=3, day=26).astimezone(tz),
    # Mail is requested both on failure and on every retry.
    "email": ["tian23.li@tcl.com"],
    "email_on_failure": True,
    "email_on_retry": True,
    # Two retries, eight minutes apart.
    "retries": 2,
    "retry_delay": timedelta(minutes=8),
}
# Keyword arguments shared by the SparkSubmitOperator tasks in this file;
# each of them unpacks this dict with ``**_config``.
_config = {
    "conf": {
        "spark.executor.extraJavaOptions": "-XX:+UseConcMarkSweepGC",
        # Both dynamic-allocation bounds are set to the same value (8).
        "spark.dynamicAllocation.minExecutors": 8,
        "spark.dynamicAllocation.maxExecutors": 8,
        "spark.yarn.executor.memoryOverhead": 2648,
        "spark.driver.maxResultSize": "8g",
        "spark.yarn.maxAppAttempts": 1,
        # NOTE(review): both values are unit-less; Spark presumably applies a
        # different default unit to each setting -- confirm the effective
        # durations are the intended ones.
        "spark.network.timeout": 1000000,
        "spark.executor.heartbeatInterval": 1000000,
    },
    "executor_cores": 4,
    "executor_memory": "8g",
    # Template string; presumably rendered by Airflow to the task's id.
    "name": "{{ task_instance.task_id }}",
    "driver_memory": "8g",
}
# The "vod_rr" DAG; every task in this file attaches to it via ``dag=dag``.
# The cron expression "30 16 * * *" fires once a day at 16:30.
# NOTE(review): presumably evaluated in the start_date's time zone -- confirm.
dag = DAG(
    dag_id="vod_rr",
    default_args=default_args,
    description="vod_rr",
    schedule_interval="30 16 * * *",
)
# Runs 0_update_vod_media_rr.py from the script_rr directory using the
# "recsys2" conda environment's interpreter.
update_vod_media_rr = BashOperator(
    dag=dag,
    task_id="update_vod_media_rr",
    depends_on_past=False,
    bash_command=(
        "cd /data/gangyanyuan/terry/terry-recsys-2024/script_rr"
        " && /data/dev/miniconda/envs/recsys2/bin/python 0_update_vod_media_rr.py"
    ),
)
# Runs 0_1_update_ldd_media_rr.py from the script_rr directory using the
# "recsys2" conda environment's interpreter.
update_ldd_media_rr = BashOperator(
    dag=dag,
    task_id="update_ldd_media_rr",
    depends_on_past=False,
    bash_command=(
        "cd /data/gangyanyuan/terry/terry-recsys-2024/script_rr"
        " && /data/dev/miniconda/envs/recsys2/bin/python 0_1_update_ldd_media_rr.py"
    ),
)
# Runs 1_update_vod_content_embedding.py from the script_rr directory using
# the "easyrec" conda environment's interpreter.
update_vod_content_embedding = BashOperator(
    dag=dag,
    task_id="update_vod_content_embedding",
    depends_on_past=False,
    bash_command=(
        "cd /data/gangyanyuan/terry/terry-recsys-2024/script_rr"
        " && /data/dev/miniconda/envs/easyrec/bin/python 1_update_vod_content_embedding.py"
    ),
)
# Submits the HandleRawLog "ndays_v2" script through the "spark_local"
# connection with the shared Spark settings from _config.
# Note that the task_id carries a "_v2" suffix the variable name does not.
get_base_aid_cid_click_expose_count_ndays = SparkSubmitOperator(
    task_id="get_base_aid_cid_click_expose_count_ndays_v2",
    application="/data/gangyanyuan/terry/terry-recsys-2024/xsimgcl/HandleRawLog/get_base_aid_cid_click_expose_count_ndays_v2.py",
    conn_id="spark_local",
    dag=dag,
    **_config,
)
# Submits the HandleDataToLocal "1_7_days_rr_v2" script through the
# "spark_local" connection with the shared Spark settings from _config.
get_base_aid_cid_click_expose_count_1_7_days_local = SparkSubmitOperator(
    task_id="get_base_aid_cid_click_expose_count_1_7_days_local",
    application="/data/gangyanyuan/terry/terry-recsys-2024/xsimgcl/HandleDataToLocal/get_base_aid_cid_click_expose_count_1_7_days_rr_v2.py",
    conn_id="spark_local",
    dag=dag,
    **_config,
)
# Submits the HandleRawLog "ndays_ldd" script through the "spark_local"
# connection with the shared Spark settings from _config.
get_base_aid_cid_click_expose_count_ndays_ldd = SparkSubmitOperator(
    task_id="get_base_aid_cid_click_expose_count_ndays_ldd",
    application="/data/gangyanyuan/terry/terry-recsys-2024/xsimgcl/HandleRawLog/get_base_aid_cid_click_expose_count_ndays_ldd.py",
    conn_id="spark_local",
    dag=dag,
    **_config,
)
# Submits the HandleDataToLocal "1_7_days_rr_ldd" script through the
# "spark_local" connection with the shared Spark settings from _config.
get_base_aid_cid_click_expose_count_1_7_days_local_ldd = SparkSubmitOperator(
    task_id="get_base_aid_cid_click_expose_count_1_7_days_local_ldd",
    application="/data/gangyanyuan/terry/terry-recsys-2024/xsimgcl/HandleDataToLocal/get_base_aid_cid_click_expose_count_1_7_days_rr_ldd.py",
    conn_id="spark_local",
    dag=dag,
    **_config,
)
# Root directory of the data-processing repository that holds the script
# submitted by the task below.
py_root_dir = "/data/gangyanyuan/recommender_system_data_processing_relevant_recommendation"

# Submits handle_doubaninfo_detail_v2.py with the shared Spark settings from
# _config. Unlike the other Spark tasks in this file, it uses the
# "spark_yarn_cluster" connection.
handle_doubaninfo_detail_v2 = SparkSubmitOperator(
    task_id="handle_doubaninfo_detail_v2",
    application=f"{py_root_dir}/HandleItemFeature/handle_doubaninfo_detail_v2.py",
    conn_id="spark_yarn_cluster",
    dag=dag,
    **_config,
)
# Update media embedding: runs update_media_text_embedding_v2.py from the
# terry-recsys-rr/script directory using the "recsys2" conda environment.
update_media_text_embedding_v2 = BashOperator(
    dag=dag,
    task_id="update_media_text_embedding_v2",
    depends_on_past=False,
    bash_command=(
        "cd /data/gangyanyuan/terry/terry-recsys-rr/script"
        " && /data/dev/miniconda/envs/recsys2/bin/python"
        " /data/gangyanyuan/terry/terry-recsys-rr/script/update_media_text_embedding_v2.py"
    ),
)
# Runs script_2025/generate_training_label.py with the "deepmatch" conda
# environment. The working directory is terry-recsys-rr/script, which is
# not the script_2025 directory the file lives in.
generate_training_label = BashOperator(
    dag=dag,
    task_id="generate_training_label",
    depends_on_past=False,
    bash_command=(
        "cd /data/gangyanyuan/terry/terry-recsys-rr/script"
        " && /data/dev/miniconda/envs/deepmatch/bin/python"
        " /data/gangyanyuan/terry/terry-recsys-rr/script_2025/generate_training_label.py"
    ),
)
# Runs script_2025/feature_engineering_v2.py with the "deepmatch" conda
# environment. The working directory is terry-recsys-rr/script, which is
# not the script_2025 directory the file lives in.
feature_engineering = BashOperator(
    dag=dag,
    task_id="feature_engineering",
    depends_on_past=False,
    bash_command=(
        "cd /data/gangyanyuan/terry/terry-recsys-rr/script"
        " && /data/dev/miniconda/envs/deepmatch/bin/python"
        " /data/gangyanyuan/terry/terry-recsys-rr/script_2025/feature_engineering_v2.py"
    ),
)
# Runs script_2025/generate_joint_features.py with the "deepmatch" conda
# environment. The working directory is terry-recsys-rr/script, which is
# not the script_2025 directory the file lives in.
generate_joint_features = BashOperator(
    dag=dag,
    task_id="generate_joint_features",
    depends_on_past=False,
    bash_command=(
        "cd /data/gangyanyuan/terry/terry-recsys-rr/script"
        " && /data/dev/miniconda/envs/deepmatch/bin/python"
        " /data/gangyanyuan/terry/terry-recsys-rr/script_2025/generate_joint_features.py"
    ),
)
# train_inttower = BashOperator(
# task_id='train_inttower',
# bash_command="cd /data/gangyanyuan/recommender_system_ranking_vod/vod-ranking/script && /data/dev/miniconda/envs/deepmatch/bin/python /data/gangyanyuan/recommender_system_ranking_vod/vod-ranking/script/train_inttower.py",
# depends_on_past =False,
# dag=dag,
# )
# Chain 1: VOD media refresh -> content embedding -> training label
# -> feature engineering -> joint features.
(
    update_vod_media_rr
    >> update_vod_content_embedding
    >> generate_training_label
    >> feature_engineering
    >> generate_joint_features
)

# Chain 2: douban detail -> media text embedding -> click/expose counts
# (ndays, then 1-7 days local), followed by the same two steps for "ldd".
(
    handle_doubaninfo_detail_v2
    >> update_media_text_embedding_v2
    >> get_base_aid_cid_click_expose_count_ndays
    >> get_base_aid_cid_click_expose_count_1_7_days_local
    >> get_base_aid_cid_click_expose_count_ndays_ldd
    >> get_base_aid_cid_click_expose_count_1_7_days_local_ldd
)

# update_ldd_media_rr is wired to no other task. The bare expression below
# has no effect; the task was already attached to the DAG through ``dag=dag``.
update_ldd_media_rr
|