DAG: multi_modal_movie

schedule: 00 08 * * * (daily at 08:00)


from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from datetime import timedelta
import pendulum
import pytz

tz = pytz.timezone('Asia/Shanghai')
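# Making start_date timezone-aware (Asia/Shanghai) lets Airflow evaluate the cron
# schedule in local time rather than UTC.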

default_args = {
    'owner': 'Terry',
    'depends_on_past': False,
    'start_date': pendulum.datetime(year=2025, month=1, day=12).astimezone(tz),
    'email': ['tian23.li@tcl.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

# Spark submit settings for save_movie_recall_to_hive: a fixed pool of 8 executors
# (min == max), CMS GC, and very long network/heartbeat timeouts for slow stages.
_config = {
    'conf': {
        'spark.executor.extraJavaOptions': '-XX:+UseConcMarkSweepGC',
        'spark.dynamicAllocation.minExecutors': 8,
        'spark.dynamicAllocation.maxExecutors': 8,
        'spark.yarn.executor.memoryOverhead': 2648,
        'spark.driver.maxResultSize': '4g',
        'spark.yarn.maxAppAttempts': 1,
        'spark.network.timeout': 1000000,
        'spark.executor.heartbeatInterval': 1000000,
    },
    'executor_cores': 4,
    'executor_memory': '6g',
    'name': '{{ task_instance.task_id }}',
    'driver_memory': '6g',
}
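# For reference only — roughly the spark-submit invocation these kwargs translate to
# (a sketch; the exact command is assembled by SparkSubmitOperator at runtime):
#   spark-submit \
#     --conf spark.dynamicAllocation.minExecutors=8 --conf spark.dynamicAllocation.maxExecutors=8 \
#     --conf spark.yarn.executor.memoryOverhead=2648 --conf spark.driver.maxResultSize=4g \
#     --executor-cores 4 --executor-memory 6g --driver-memory 6g \
#     --name <task_id> save_movie_recall_to_hive.py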

dag = DAG(
    dag_id='multi_modal_movie',
    description='multi-modal-movie',
    default_args=default_args,
    schedule_interval='00 08 * * *',
)
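# Note: catchup is left at Airflow's default (normally True), so a fresh deployment
# will backfill runs from start_date; pass catchup=False to the DAG if that is not wanted.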
# Movie data flow: refresh training data, train DiffuRec, generate recall results,
# merge with XSimGCL recall, and save to Hive.

update_training_data_movie_tv_60days = BashOperator(
    task_id='update_training_data_movie_tv_60days',
    bash_command="cd /data/gangyanyuan/terry/terry-recsys-2024/multi-interest/script/movie_tv && /data/dev/miniconda/envs/deepmatch/bin/python 2_1_update_training_data_movie_tv_60days.py --channel movie",
    depends_on_past=False,
    dag=dag,
)
# Build the DiffuRec training and test datasets.
generate_diffurec_dataset_movie = BashOperator(
    task_id='generate_diffurec_dataset_movie',
    bash_command="cd /data/gangyanyuan/terry/terry-recsys-2024/multi-interest/script/movie_tv && /data/dev/miniconda/envs/deepmatch/bin/python 3_generate_diffurec_dataset_movie_tv.py --dataset movie --channel movie",
    depends_on_past=False,
    dag=dag,
)

get_pretrained_text_embedding = BashOperator(
    task_id='get_pretrained_text_embedding',
    bash_command="cd /data/gangyanyuan/terry/terry-recsys-2024/multi-interest/script/movie_tv && /data/dev/miniconda/envs/deepmatch/bin/python get_pretrained_text_embedding.py --channel movie",
    depends_on_past=False,
    dag=dag,
)
# get_pretrained_image_embedding = BashOperator(
#     task_id='get_pretrained_image_embedding',
#     bash_command="cd /data/gangyanyuan/terry/terry-recsys-2024/multi-interest/script/movie_tv && /data/dev/miniconda/envs/deepmatch/bin/python get_pretrained_image_embedding.py --channel movie",
#     depends_on_past =False,
#     dag=dag,
# )

pca_text = BashOperator(
    task_id='pca_text',
    bash_command="cd /data/gangyanyuan/terry/terry-recsys-2024/multi-interest/script && /data/dev/miniconda/envs/deepmatch/bin/python pca.py --dataset movie --text_feat",
    depends_on_past=False,
    dag=dag,
)

# train_diffurec_movie_text = BashOperator(
#     task_id='train_diffurec_movie_text',
#     bash_command="cd /data/gangyanyuan/terry/terry-recsys-2024/multi-interest/DiffuRec/src_v2 && /data/dev/miniconda/envs/pt1.0/bin/python main_daily_movie_tv.py --dataset movie --text_feat --epochs 15",
#     depends_on_past =False,
#     dag=dag,
# )
train_diffurec_movie = BashOperator(
    task_id='train_diffurec_movie',
    bash_command="cd /data/gangyanyuan/terry/terry-recsys-2024/multi-interest/DiffuRec/src_v2 && /data/dev/miniconda/envs/pt1.0/bin/python main_daily_movie_tv.py --dataset movie --epochs 15",
    depends_on_past=False,
    dag=dag,
)
generate_inference_dataset_movie = BashOperator(
    task_id='generate_inference_dataset_movie',
    bash_command="cd /data/gangyanyuan/terry/terry-recsys-2024/multi-interest/script/movie_tv && /data/dev/miniconda/envs/deepmatch/bin/python 4_generate_inference_dataset_movie_tv.py --dataset movie --channel movie",
    depends_on_past=False,
    dag=dag,
)

generate_diffurec_recall_result_movie = BashOperator(
    task_id='generate_diffurec_recall_result_movie',
    bash_command="cd /data/gangyanyuan/terry/terry-recsys-2024/multi-interest/script/movie_tv && /data/dev/miniconda/envs/pt1.0/bin/python 7_generate_diffurec_recall_result_movie_tv_text.py --dataset movie",
    depends_on_past=False,
    dag=dag,
)

merge_xsimgcl_recall_movie = BashOperator(
    task_id='merge_xsimgcl_recall_movie',
    bash_command="cd /data/gangyanyuan/terry/terry-recsys-2024/multi-interest/script/movie_tv && /data/dev/miniconda/envs/deepmatch/bin/python merge_xsimgcl_recall_movie.py",
    depends_on_past=False,
    dag=dag,
)
# Export the merged recall results to Hive; requires a Spark connection named
# 'spark_local' to be configured in Airflow.
save_movie_recall_to_hive = SparkSubmitOperator(
    task_id='save_movie_recall_to_hive',
    dag=dag,
    **_config,
    application='/data/gangyanyuan/terry/terry-recsys-2024/multi-interest/script/movie_tv/save_movie_recall_to_hive.py',
    conn_id='spark_local',
)
(
    update_training_data_movie_tv_60days
    >> generate_diffurec_dataset_movie
    >> get_pretrained_text_embedding
    >> pca_text
    >> train_diffurec_movie
    >> generate_inference_dataset_movie
    >> generate_diffurec_recall_result_movie
    >> merge_xsimgcl_recall_movie
    >> save_movie_recall_to_hive
)
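# To dry-run a single task locally without touching scheduler state, something like
# the following can be used (Airflow 2.x syntax; 1.10 releases use `airflow test` instead):
#   airflow tasks test multi_modal_movie train_diffurec_movie 2025-01-12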