DAG: multi_interest_tv

schedule: 10 07 * * *


multi_interest_tv

Toggle wrap
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from datetime import timedelta,datetime
import pendulum
import pytz

# Airflow takes a DAG's timezone from the tzinfo of its start_date, so the cron
# schedule on the DAG below is interpreted in Asia/Shanghai local time.
tz = pytz.timezone('Asia/Shanghai')

# Defaults applied to every task in the DAG (individual tasks may override them).
default_args = dict(
    owner='Terry',
    depends_on_past=False,
    # pendulum.datetime() without tz= is midnight UTC; astimezone() re-expresses
    # that same instant in Asia/Shanghai (2024-05-28 08:00 +08:00).
    start_date=pendulum.datetime(2024, 5, 28).astimezone(tz),
    email=['tian23.li@tcl.com'],
    email_on_failure=True,
    email_on_retry=True,  # also sends mail on each of the retries below
    retries=2,
    retry_delay=timedelta(minutes=8),
)

# Keyword arguments shared by the SparkSubmitOperator task(s); unpacked with **_config.
_config = dict(
    conf={
        # JVM garbage collector for the executors.
        'spark.executor.extraJavaOptions': '-XX:+UseConcMarkSweepGC',
        # min == max pins the executor count at 8 (only meaningful if dynamic
        # allocation is enabled on the cluster; it is not switched on here).
        'spark.dynamicAllocation.minExecutors': 8,
        'spark.dynamicAllocation.maxExecutors': 8,
        # Per-executor overhead memory, in MiB. NOTE(review): this key is
        # deprecated since Spark 2.3 in favour of spark.executor.memoryOverhead.
        'spark.yarn.executor.memoryOverhead': 2648,
        'spark.driver.maxResultSize': '4g',
        # No YARN-level resubmission; failures are left to the Airflow task retries.
        'spark.yarn.maxAppAttempts': 1,
        # Very large timeouts. The units differ: network.timeout defaults to seconds
        # and heartbeatInterval to milliseconds, so the interval (1000 s) remains
        # below the timeout, as Spark requires.
        'spark.network.timeout': 1000000,
        'spark.executor.heartbeatInterval': 1000000,
    },
    executor_cores=4,
    executor_memory='6g',
    # Jinja-templated by the operator: the Spark application is named after the task id.
    name='{{ task_instance.task_id }}',
    driver_memory='6g',
)

# Daily multi-interest TV recall pipeline. Airflow evaluates the cron in the DAG's
# timezone (taken from default_args['start_date']), i.e. 07:10 Asia/Shanghai.
dag = DAG(
    dag_id='multi_interest_tv',
    description='multi-interest tv',
    default_args=default_args,
    schedule_interval='10 07 * * *',  # earlier schedule (per the original note): 30 05
    # None of the task commands take the run date ({{ ds }} etc.) as input, so
    # catching up on missed intervals would only re-run the identical pipeline
    # (including model training) once per missed day.
    catchup=False,
    # Every run executes the same commands in the same working directories; do not
    # let runs overlap. NOTE(review): presumably they also share output paths.
    max_active_runs=1,
)

# Every BashOperator below runs one Python script from a fixed working directory
# with one of two conda interpreters. The shared paths live here so that moving the
# project or an environment needs only one edit.
_PROJECT_ROOT = '/data/gangyanyuan/terry/terry-recsys-2024/multi-interest'
_SCRIPT_DIR = _PROJECT_ROOT + '/script'
_MOVIE_TV_DIR = _SCRIPT_DIR + '/movie_tv'
_MOVIE_TV_VIP_DIR = _MOVIE_TV_DIR + '/vip'
_DIFFUREC_DIR = _PROJECT_ROOT + '/DiffuRec/src_v2'
_DEEPMATCH_PYTHON = '/data/dev/miniconda/envs/deepmatch/bin/python'
_PT_PYTHON = '/data/dev/miniconda/envs/pt1.0/bin/python'


def _python_task(task_id, workdir, python, script_args):
    """Return a BashOperator in ``dag`` that runs ``cd <workdir> && <python> <script_args>``.

    Args:
        task_id: Airflow task id (also the module-level variable name used below).
        workdir: directory to ``cd`` into first. The scripts are invoked by bare
            file name, so they must be run from their own directory.
        python: interpreter executable to run the script with.
        script_args: script file name followed by its command-line arguments.
    """
    return BashOperator(
        task_id=task_id,
        bash_command='cd {} && {} {}'.format(workdir, python, script_args),
        depends_on_past=False,  # same as default_args; stated explicitly per task
        dag=dag,
    )


get_vip_users = _python_task(
    'get_vip_users', _MOVIE_TV_DIR, _DEEPMATCH_PYTHON,
    '0_get_vip_users.py')
get_pretrained_text_embedding_vip = _python_task(
    'get_pretrained_text_embedding_vip', _MOVIE_TV_DIR, _DEEPMATCH_PYTHON,
    'get_pretrained_text_embedding_vip.py --channel tv')
# Disabled:
# get_pretrained_image_embedding_vip = _python_task(
#     'get_pretrained_image_embedding_vip', _MOVIE_TV_DIR, _DEEPMATCH_PYTHON,
#     'get_pretrained_image_embedding_vip.py --channel tv')
update_training_data_movie_tv_60days = _python_task(
    'update_training_data_movie_tv_60days', _MOVIE_TV_DIR, _DEEPMATCH_PYTHON,
    '2_1_update_training_data_movie_tv_60days.py --channel tv')

# ---- non-VIP pipeline ----
generate_diffurec_dataset_tv = _python_task(
    'generate_diffurec_dataset_tv', _MOVIE_TV_DIR, _DEEPMATCH_PYTHON,
    '3_generate_diffurec_dataset_movie_tv.py --dataset tv --channel tv')
get_pretrained_text_embedding = _python_task(
    'get_pretrained_text_embedding', _MOVIE_TV_DIR, _DEEPMATCH_PYTHON,
    'get_pretrained_text_embedding.py --channel tv')
pca_text = _python_task(
    'pca_text', _SCRIPT_DIR, _DEEPMATCH_PYTHON,
    'pca.py --dataset tv --text_feat')
train_diffurec_tv = _python_task(
    'train_diffurec_tv', _DIFFUREC_DIR, _PT_PYTHON,
    'main_daily_movie_tv.py --dataset tv --max_len 10')
generate_inference_dataset_tv = _python_task(
    'generate_inference_dataset_tv', _MOVIE_TV_DIR, _DEEPMATCH_PYTHON,
    '4_generate_inference_dataset_movie_tv.py --dataset tv --channel tv')
generate_diffurec_recall_result_tv = _python_task(
    'generate_diffurec_recall_result_tv', _MOVIE_TV_DIR, _PT_PYTHON,
    '6_generate_diffurec_recall_result_movie_tv.py --dataset tv --max_len 10')

# ---- VIP pipeline ----
generate_diffurec_dataset_tv_vip = _python_task(
    'generate_diffurec_dataset_tv_vip', _MOVIE_TV_VIP_DIR, _DEEPMATCH_PYTHON,
    '3_generate_diffurec_dataset_movie_tv_vip.py --dataset tv --channel tv')
pca_text_vip = _python_task(
    'pca_text_vip', _SCRIPT_DIR, _DEEPMATCH_PYTHON,
    'pca_vip.py --dataset tv --text_feat')
train_diffurec_tv_vip = _python_task(
    'train_diffurec_tv_vip', _DIFFUREC_DIR, _PT_PYTHON,
    'main_daily_vip.py --dataset tv --max_len 10')
generate_inference_dataset_tv_vip = _python_task(
    'generate_inference_dataset_tv_vip', _MOVIE_TV_VIP_DIR, _DEEPMATCH_PYTHON,
    '4_generate_inference_dataset_movie_tv_vip.py --dataset tv --channel tv')
generate_diffurec_recall_result_tv_vip = _python_task(
    'generate_diffurec_recall_result_tv_vip', _MOVIE_TV_VIP_DIR, _PT_PYTHON,
    '6_generate_diffurec_recall_result_movie_tv_vip.py --dataset tv --max_len 10')

# ---- merge with XSimGCL recall ----
merge_xsimgcl_recall_tv_novip = _python_task(
    'merge_xsimgcl_recall_tv_novip', _MOVIE_TV_DIR, _DEEPMATCH_PYTHON,
    'merge_xsimgcl_recall.py --group novip')
# Disabled:
# merge_xsimgcl_recall_tv_vip = _python_task(
#     'merge_xsimgcl_recall_tv_vip', _MOVIE_TV_DIR, _DEEPMATCH_PYTHON,
#     'merge_xsimgcl_recall.py --group vip')

# Spark job submitted through the "spark_local" connection; per its file name it
# saves the TV recall results to Hive. Executor/driver settings come from _config.
save_tv_recall_to_hive = SparkSubmitOperator(
    task_id="save_tv_recall_to_hive",
    application="/data/gangyanyuan/terry/terry-recsys-2024/multi-interest/script/movie_tv/save_tv_recall_to_hive_v2.py",
    conn_id="spark_local",
    dag=dag,
    **_config
)

# ---- Task dependencies ----
# Each list is a linear chain: every task runs after the one listed before it.
_novip_pipeline = [
    get_vip_users,
    update_training_data_movie_tv_60days,
    generate_diffurec_dataset_tv,
    get_pretrained_text_embedding,
    pca_text,
    train_diffurec_tv,
    generate_inference_dataset_tv,
    generate_diffurec_recall_result_tv,
    merge_xsimgcl_recall_tv_novip,
]
# The VIP chain forks off the non-VIP one after update_training_data_movie_tv_60days.
_vip_pipeline = [
    update_training_data_movie_tv_60days,
    generate_diffurec_dataset_tv_vip,
    get_pretrained_text_embedding_vip,
    pca_text_vip,
    train_diffurec_tv_vip,
    generate_inference_dataset_tv_vip,
    generate_diffurec_recall_result_tv_vip,
    # merge_xsimgcl_recall_tv_vip,  # disabled, like its task definition
]
for _pipeline in (_novip_pipeline, _vip_pipeline):
    for _upstream, _downstream in zip(_pipeline, _pipeline[1:]):
        _upstream >> _downstream

# The Hive export waits for the last task of both chains.
save_tv_recall_to_hive << [merge_xsimgcl_recall_tv_novip, generate_diffurec_recall_result_tv_vip]

# Fan-in to use if merge_xsimgcl_recall_tv_vip is re-enabled:
#[merge_xsimgcl_recall_tv_novip, merge_xsimgcl_recall_tv_vip] >> save_tv_recall_to_hive