1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121 | from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from datetime import timedelta,datetime
import pendulum
import pytz
# DAG timezone. Kept as a pytz object for any code that still expects one; the
# start date below is built with pendulum from the same zone name.
tz = pytz.timezone('Asia/Shanghai')
# Defaults applied to every task created with this DAG.
default_args = {
    'owner': 'Terry',
    'depends_on_past': False,
    # 2025-01-12 08:00 in Asia/Shanghai, i.e. 2025-01-12 00:00 UTC. This is the
    # same instant as pendulum.datetime(2025, 1, 12).astimezone(tz), but the
    # wall-clock time is now explicit, and the tzinfo is a pendulum timezone as
    # Airflow's time-zone documentation recommends (instead of pytz).
    'start_date': pendulum.datetime(2025, 1, 12, 8, tz=tz.zone),
    'email': ['tian23.li@tcl.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}
# Keyword arguments shared by SparkSubmitOperator tasks (unpacked with **_config).
_config = {
    'conf': {
        # NOTE(review): the CMS collector was removed in JDK 14; with this flag
        # executor JVMs will not start on JDK 14+ -- confirm the cluster's Java.
        'spark.executor.extraJavaOptions': '-XX:+UseConcMarkSweepGC',
        # min == max pins the executor count at 8.
        "spark.dynamicAllocation.minExecutors": 8,
        "spark.dynamicAllocation.maxExecutors": 8,
        # MiB. Deprecated since Spark 2.3 in favour of
        # spark.executor.memoryOverhead (still accepted as an alias).
        "spark.yarn.executor.memoryOverhead": 2648,
        "spark.driver.maxResultSize": "4g",
        "spark.yarn.maxAppAttempts": 1,
        # Units are explicit on purpose: without a suffix, spark.network.timeout
        # is read in seconds (~11.6 days here) but heartbeatInterval in
        # milliseconds (~16.7 minutes). Spark refuses to start unless the
        # network timeout exceeds the heartbeat interval.
        "spark.network.timeout": "1000000s",
        "spark.executor.heartbeatInterval": "1000000ms",
    },
    'executor_cores': 4,
    'executor_memory': '6g',
    # Jinja template; presumably rendered per task since SparkSubmitOperator
    # lists `name` among its template fields -- names the Spark app after the task.
    'name': '{{ task_instance.task_id }}',
    'driver_memory': '6g',
}
# Daily pipeline that refreshes the movie recall results.
dag = DAG(
    dag_id='multi_modal_movie',
    description='multi-modal-movie',
    default_args=default_args,
    # 08:00 every day; presumably in the start_date's timezone (Asia/Shanghai),
    # per Airflow's time-zone handling -- confirm on the deployed version.
    schedule_interval='00 08 * * *',
    # No task command uses the logical date, so every run does identical work;
    # back-filling missed intervals since start_date would only repeat it.
    catchup=False,
    # Every run cds into the same directories and runs the same scripts, so
    # overlapping runs could interfere if those scripts write shared outputs.
    max_active_runs=1,
)
# Absolute locations shared by the tasks below.
_PROJECT_ROOT = '/data/gangyanyuan/terry/terry-recsys-2024/multi-interest'
_SCRIPT_DIR = _PROJECT_ROOT + '/script'
_MOVIE_TV_DIR = _SCRIPT_DIR + '/movie_tv'
_DIFFUREC_DIR = _PROJECT_ROOT + '/DiffuRec/src_v2'
# Interpreters of the two conda environments the scripts run under.
_DEEPMATCH_PY = '/data/dev/miniconda/envs/deepmatch/bin/python'
_PT_PY = '/data/dev/miniconda/envs/pt1.0/bin/python'


def _python_task(task_id, workdir, python_bin, script_args):
    """Create a BashOperator on ``dag`` running a Python script from ``workdir``.

    The generated command is ``cd <workdir> && <python_bin> <script_args>``,
    so the script runs only if the ``cd`` succeeds.
    """
    return BashOperator(
        task_id=task_id,
        bash_command='cd {} && {} {}'.format(workdir, python_bin, script_args),
        depends_on_past=False,
        dag=dag,
    )


# Movie data flow.
update_training_data_movie_tv_60days = _python_task(
    'update_training_data_movie_tv_60days', _MOVIE_TV_DIR, _DEEPMATCH_PY,
    '2_1_update_training_data_movie_tv_60days.py --channel movie',
)

# Training and test datasets.
generate_diffurec_dataset_movie = _python_task(
    'generate_diffurec_dataset_movie', _MOVIE_TV_DIR, _DEEPMATCH_PY,
    '3_generate_diffurec_dataset_movie_tv.py --dataset movie --channel movie',
)

get_pretrained_text_embedding = _python_task(
    'get_pretrained_text_embedding', _MOVIE_TV_DIR, _DEEPMATCH_PY,
    'get_pretrained_text_embedding.py --channel movie',
)

# Disabled task, kept for easy re-enabling:
# get_pretrained_image_embedding = _python_task(
#     'get_pretrained_image_embedding', _MOVIE_TV_DIR, _DEEPMATCH_PY,
#     'get_pretrained_image_embedding.py --channel movie',
# )

pca_text = _python_task(
    'pca_text', _SCRIPT_DIR, _DEEPMATCH_PY,
    'pca.py --dataset movie --text_feat',
)

# Disabled task, kept for easy re-enabling:
# train_diffurec_movie_text = _python_task(
#     'train_diffurec_movie_text', _DIFFUREC_DIR, _PT_PY,
#     'main_daily_movie_tv.py --dataset movie --text_feat --epochs 15',
# )

train_diffurec_movie = _python_task(
    'train_diffurec_movie', _DIFFUREC_DIR, _PT_PY,
    'main_daily_movie_tv.py --dataset movie --epochs 15',
)

generate_inference_dataset_movie = _python_task(
    'generate_inference_dataset_movie', _MOVIE_TV_DIR, _DEEPMATCH_PY,
    '4_generate_inference_dataset_movie_tv.py --dataset movie --channel movie',
)

# NOTE(review): this runs the *_text recall script while the text-feature
# training task above is disabled (training runs without --text_feat) --
# confirm this pairing is intended.
generate_diffurec_recall_result_movie = _python_task(
    'generate_diffurec_recall_result_movie', _MOVIE_TV_DIR, _PT_PY,
    '7_generate_diffurec_recall_result_movie_tv_text.py --dataset movie',
)

merge_xsimgcl_recall_movie = _python_task(
    'merge_xsimgcl_recall_movie', _MOVIE_TV_DIR, _DEEPMATCH_PY,
    'merge_xsimgcl_recall_movie.py',
)

# Final step: Spark job using the shared _config settings and the
# "spark_local" connection.
save_movie_recall_to_hive = SparkSubmitOperator(
    task_id='save_movie_recall_to_hive',
    dag=dag,
    **_config,
    application=_MOVIE_TV_DIR + '/save_movie_recall_to_hive.py',
    conn_id="spark_local",
)
# Linear pipeline: each task is set downstream of the one listed before it.
(
    update_training_data_movie_tv_60days
    >> generate_diffurec_dataset_movie
    >> get_pretrained_text_embedding
    >> pca_text
    >> train_diffurec_movie
    >> generate_inference_dataset_movie
    >> generate_diffurec_recall_result_movie
    >> merge_xsimgcl_recall_movie
    >> save_movie_recall_to_hive
)
|