DAG: xsimgcl_recall_ldd

schedule: 30 21 * * *


xsimgcl_recall_ldd

Toggle wrap
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from datetime import timedelta,datetime
import pendulum
import pytz

# Time zone in which the DAG's start date is expressed.
tz = pytz.timezone('Asia/Shanghai')

# Arguments handed to the DAG as `default_args`, i.e. defaults for its tasks:
# owner, failure/retry e-mail notification, and retry policy (2 retries,
# 8 minutes apart).
default_args = dict(
    owner='Terry',
    depends_on_past=False,
    # NOTE(review): pendulum.datetime() defaults to UTC when no tz is passed,
    # so this is 2024-09-22 00:00 UTC converted to Asia/Shanghai (08:00 local),
    # not local midnight. Confirm that is intended before changing it.
    start_date=pendulum.datetime(2024, 9, 22).astimezone(tz),
    email=['tian23.li@tcl.com'],
    email_on_failure=True,
    email_on_retry=True,
    retries=2,
    retry_delay=timedelta(minutes=8),
)

# Keyword arguments shared by the SparkSubmitOperator tasks in this file;
# each of them receives these settings via `**_config`.
_config = dict(
    conf={
        'spark.executor.extraJavaOptions': '-XX:+UseConcMarkSweepGC',
        # Lower and upper executor bounds are both 8.
        'spark.dynamicAllocation.minExecutors': 8,
        'spark.dynamicAllocation.maxExecutors': 8,
        'spark.yarn.executor.memoryOverhead': 2648,
        'spark.driver.maxResultSize': '4g',
        'spark.yarn.maxAppAttempts': 1,
        # NOTE(review): both values are bare numbers without a unit suffix;
        # confirm which unit Spark assumes for each of these two properties.
        'spark.network.timeout': 1000000,
        'spark.executor.heartbeatInterval': 1000000,
    },
    executor_cores=4,
    executor_memory='6g',
    # Jinja expression; presumably rendered by Airflow to the id of the task
    # being run - verify that `name` is a templated field of the operator.
    name='{{ task_instance.task_id }}',
    driver_memory='6g',
)

# The DAG object every task below attaches to. The cron expression
# '30 21 * * *' fires once a day at 21:30.
# NOTE(review): `catchup` is not passed here, so whatever default the Airflow
# installation uses applies - confirm whether backfilling from start_date is
# wanted.
dag = DAG(
    dag_id='xsimgcl_recall_ldd',
    schedule_interval='30 21 * * *',
    default_args=default_args,
    description='xsimgcl_recall_ldd',
)
# Directory that holds the xsimgcl job scripts referenced by the tasks below.
xsimgcl_dir = "/data/gangyanyuan/terry/terry-recsys-2024/xsimgcl"

# Submits HandleRawLog/user_click_exposure_video_ldd.py through the
# `spark_yarn_cluster` connection.
user_click_exposure_video_ldd = SparkSubmitOperator(
    task_id='user_click_exposure_video_ldd',
    application=f'{xsimgcl_dir}/HandleRawLog/user_click_exposure_video_ldd.py',
    conn_id="spark_yarn_cluster",
    dag=dag,
    **_config,
)

# Submits HandleRawLog/get_user_click_session_seq_ndays_ldd.py through the
# `spark_yarn_cluster` connection.
get_click_seq_30days_ldd_hive = SparkSubmitOperator(
    task_id='get_click_seq_30days_ldd_hive',
    application=f"{xsimgcl_dir}/HandleRawLog/get_user_click_session_seq_ndays_ldd.py",
    conn_id="spark_yarn_cluster",
    dag=dag,
    **_config,
)

# Submits HandleDataToLocal/get_user_click_sess_seq_30days_ldd_rr.py; unlike
# the two tasks above this one uses the `spark_local` connection.
get_click_seq_30days_ldd_local = SparkSubmitOperator(
    task_id='get_click_seq_30days_ldd_local',
    application=f"{xsimgcl_dir}/HandleDataToLocal/get_user_click_sess_seq_30days_ldd_rr.py",
    conn_id="spark_local",
    dag=dag,
    **_config,
)
# Top-300 variant of the training/recall tasks (currently disabled):
# generate_training_data_ldd_top300 = BashOperator(
#     task_id='generate_training_data_ldd_top300',
#     bash_command="cd /data/gangyanyuan/terry/terry-recsys-2024/xsimgcl/ldd && /data/dev/miniconda/envs/deepmatch/bin/python generate_training_data_xsimgcl_ldd_top300.py",
#     depends_on_past =False,
#     dag=dag,
# )
# xsimgcl_train_ldd_top300 = BashOperator(
#     task_id='xsimgcl_train_ldd_top300',
#     bash_command="cd /data/gangyanyuan/terry/terry-recsys-rr/QRec_Single_Channel && /data/dev/miniconda/envs/deepmatch/bin/python /data/gangyanyuan/terry/terry-recsys-rr/QRec_Single_Channel/train_xsimgcl.py",
#     depends_on_past =False,
#     dag=dag,
#)
# generate_xsimgcl_recall_ldd_top300 = BashOperator(
#     task_id='generate_xsimgcl_recall_ldd_top300',
#     bash_command="cd /data/gangyanyuan/terry/terry-recsys-2024/xsimgcl/ldd && /data/dev/miniconda/envs/deepmatch/bin/python generate_xsimgcl_recall_ldd_top300.py",
#     depends_on_past =False,
#     dag=dag,
# )
# Default (non-top300) variant: build training data, train the model, generate recall.
# Paths shared by the three shell tasks below. They are factored out so the
# interpreter and working directories are spelled once instead of being
# repeated inside every command string; the resulting commands are unchanged.
deepmatch_python = "/data/dev/miniconda/envs/deepmatch/bin/python"
ldd_dir = f"{xsimgcl_dir}/ldd"
qrec_dir = "/data/gangyanyuan/terry/terry-recsys-rr/QRec_Single_Channel"

# Runs generate_training_data_xsimgcl_ldd.py from the ldd directory with the
# deepmatch conda interpreter.
generate_training_data_ldd = BashOperator(
    task_id='generate_training_data_ldd',
    bash_command=f"cd {ldd_dir} && {deepmatch_python} generate_training_data_xsimgcl_ldd.py",
    depends_on_past=False,
    dag=dag,
)

# Runs train_xsimgcl.py (addressed by absolute path) from inside the
# QRec_Single_Channel checkout, which lives in a different repository
# directory than the other scripts.
xsimgcl_train_ldd = BashOperator(
    task_id='xsimgcl_train_ldd',
    bash_command=f"cd {qrec_dir} && {deepmatch_python} {qrec_dir}/train_xsimgcl.py",
    depends_on_past=False,
    dag=dag,
)

# Runs generate_xsimgcl_recall_ldd.py from the ldd directory with the
# deepmatch conda interpreter.
generate_xsimgcl_recall_ldd = BashOperator(
    task_id='generate_xsimgcl_recall_ldd',
    bash_command=f"cd {ldd_dir} && {deepmatch_python} generate_xsimgcl_recall_ldd.py",
    depends_on_past=False,
    dag=dag,
)
# Submits ldd/save_ldd_recall_to_hive.py through the `spark_local` connection.
# The application path is derived from `xsimgcl_dir`, like the other Spark
# tasks in this file, rather than repeating the absolute directory.
save_ldd_recall_to_hive = SparkSubmitOperator(
    task_id='save_ldd_recall_to_hive',
    application=f'{xsimgcl_dir}/ldd/save_ldd_recall_to_hive.py',
    conn_id="spark_local",
    dag=dag,
    **_config,
)

# Strictly linear pipeline: each task runs only after the one listed before it.
(
    user_click_exposure_video_ldd
    >> get_click_seq_30days_ldd_hive
    >> get_click_seq_30days_ldd_local
    >> generate_training_data_ldd
    >> xsimgcl_train_ldd
    >> generate_xsimgcl_recall_ldd
    >> save_ldd_recall_to_hive
)