# DAG: xsimgcl_recall
# Schedule: 00 21 * * * (cron: daily at 21:00 in the DAG's time zone)
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from datetime import timedelta,datetime
import pendulum
import pytz

# Time zone the DAG is scheduled in. Kept as a pytz object for any existing users of
# this name; start_date below is built with pendulum, as Airflow recommends for
# time-zone-aware DAGs.
tz = pytz.timezone('Asia/Shanghai')

# Defaults inherited by every operator in this DAG (individual tasks may override them).
default_args = {
    'owner': 'Terry',
    'depends_on_past': False,
    # Local midnight on 2024-09-12. Build the date directly in the target zone: do not
    # write pendulum.datetime(2024, 9, 12).astimezone(tz) -- pendulum.datetime()
    # defaults to UTC, so that value is 08:00 Asia/Shanghai, not midnight.
    # Airflow also takes the DAG's time zone (used to evaluate the cron schedule)
    # from this value.
    'start_date': pendulum.datetime(2024, 9, 12, tz=tz.zone),
    'email': ['tian23.li@tcl.com'],
    'email_on_failure': True,
    # With retries=2 this also sends a mail on every retry, not only on final failure.
    'email_on_retry': True,
    'retries': 2,
    'retry_delay': timedelta(minutes=8),
}

# Common keyword arguments splatted into every SparkSubmitOperator in this DAG.
_config = {
    'conf': {
        # CMS garbage collector on executors; needs a JVM that still ships CMS
        # (it was removed in JDK 14).
        'spark.executor.extraJavaOptions': '-XX:+UseConcMarkSweepGC',
        # min == max pins the executor count at 8. These only matter when dynamic
        # allocation is enabled, which is not set here (presumably a cluster default
        # -- confirm).
        "spark.dynamicAllocation.minExecutors": 8,
        "spark.dynamicAllocation.maxExecutors": 8,
        # Per-executor memory overhead in MiB. This is the legacy key; Spark 2.3+
        # documents it as spark.executor.memoryOverhead.
        "spark.yarn.executor.memoryOverhead": 2648,
        "spark.driver.maxResultSize": "4g",
        # YARN makes a single attempt; failures surface to Airflow, whose task
        # retries (default_args above) then apply.
        "spark.yarn.maxAppAttempts": 1,
        # Very large timeouts. Per the Spark config docs a unitless
        # spark.network.timeout is read as seconds and a unitless heartbeatInterval
        # as milliseconds; the heartbeat interval must stay below the network timeout.
        "spark.network.timeout": 1000000,
        "spark.executor.heartbeatInterval": 1000000,
    },
    'executor_cores': 4,
    'executor_memory': '6g',
    # Jinja template, rendered per task so each Spark application is named after
    # the Airflow task that submits it.
    'name': '{{ task_instance.task_id }}',
    'driver_memory': '6g',
}

# Daily pipeline: build click sequences, train XSimGCL per user group (novip / vip),
# generate recalls and store them in Hive. Runs at 21:00 in the DAG's time zone,
# which Airflow takes from default_args['start_date'].
dag = DAG(
    dag_id='xsimgcl_recall',
    description='xsimgcl_recall',
    default_args=default_args,
    schedule_interval='00 21 * * *',
    # No task in this DAG is passed the logical date (no templated dates or
    # application arguments), so catching up missed intervals would just re-run the
    # same pipeline back-to-back over the same local working directories.
    # Set explicitly instead of relying on the global catchup_by_default (True).
    catchup=False,
)

# Working directory holding the xsimgcl data-preparation and recall scripts.
xsimgcl_dir = "/data/gangyanyuan/terry/terry-recsys-2024/xsimgcl"

# Interpreter of the "deepmatch" conda environment used by every local Python step.
_deepmatch_python = "/data/dev/miniconda/envs/deepmatch/bin/python"
# Checkout containing the QRec-based XSimGCL training scripts.
_qrec_dir = "/data/gangyanyuan/terry/terry-recsys-rr/QRec"


def _python_step(task_id, workdir, command):
    """Create a BashOperator that runs a script with the deepmatch interpreter.

    The generated shell line is ``cd <workdir> && <deepmatch python> <command>``.

    Args:
        task_id: Airflow task id.
        workdir: directory to ``cd`` into before running the script.
        command: script path (relative to ``workdir`` or absolute) followed by any
            CLI arguments, appended verbatim after the interpreter.

    Returns:
        The BashOperator, attached to ``dag``.
    """
    return BashOperator(
        task_id=task_id,
        bash_command=f"cd {workdir} && {_deepmatch_python} {command}",
        depends_on_past=False,
        dag=dag,
    )


# --- Spark jobs: click / exposure logs and 30-day click-session sequences ---------
user_click_exposure_video = SparkSubmitOperator(
    task_id='user_click_exposure_video',
    dag=dag,
    **_config,
    application=f'{xsimgcl_dir}/HandleRawLog/user_click_exposure_video.py',
    conn_id="spark_yarn_cluster",
)
get_click_seq_30days_hive = SparkSubmitOperator(
    task_id='get_click_seq_30days_hive',
    dag=dag,
    **_config,
    application=f"{xsimgcl_dir}/HandleRawLog/get_user_click_session_seq_ndays.py",
    conn_id="spark_yarn_cluster",
)
# Task id is 'get_click_seq_30days_local'; submitted through the "spark_local"
# connection rather than "spark_yarn_cluster".
get_click_30days_local = SparkSubmitOperator(
    task_id='get_click_seq_30days_local',
    dag=dag,
    **_config,
    application=f"{xsimgcl_dir}/HandleDataToLocal/get_user_click_sess_seq_30days_rr.py",
    conn_id="spark_local",
)

# --- Local Python steps (deepmatch env) ----------------------------------------
# Alternative training-data script in the same directory:
# generate_training_data_xsimgcl_all_platform.py
generate_training_data = _python_step(
    'generate_training_data', xsimgcl_dir, "generate_training_data_xsimgcl.py")

# Model training, one task per user group; runs from the QRec checkout.
xsimgcl_train_novip = _python_step(
    'xsimgcl_train_novip', _qrec_dir, f"{_qrec_dir}/train_xsimgcl_cross_novip.py")
xsimgcl_train_vip = _python_step(
    'xsimgcl_train_vip', _qrec_dir, f"{_qrec_dir}/train_xsimgcl_cross_vip.py")

# XSimGCL recall generation per user group.
generate_xsimgcl_recall_novip = _python_step(
    'generate_xsimgcl_recall_novip', xsimgcl_dir, "generate_xsimgcl_recall.py --group novip")
generate_xsimgcl_recall_vip = _python_step(
    'generate_xsimgcl_recall_vip', xsimgcl_dir, "generate_xsimgcl_recall.py --group vip")

# Spark job that writes the recall results to Hive.
save_recall_to_hive = SparkSubmitOperator(
    task_id='save_recall_to_hive',
    dag=dag,
    **_config,
    application=f"{xsimgcl_dir}/save_recall_to_hive.py",
    conn_id="spark_local",
)

# dnum TV recall, per user group.
generate_dnum_tv_recall_novip = _python_step(
    'generate_dnum_tv_recall_novip', xsimgcl_dir, "generate_dnum_tv_recall.py --group novip")
generate_dnum_tv_recall_vip = _python_step(
    'generate_dnum_tv_recall_vip', xsimgcl_dir, "generate_dnum_tv_recall.py --group vip")


# Shared upstream chain that ends in generate_training_data.
user_click_exposure_video >> get_click_seq_30days_hive
get_click_seq_30days_hive >> get_click_30days_local
get_click_30days_local >> generate_training_data

# Fan out into one train -> recall -> dnum-TV-recall branch per user group.
generate_training_data >> [xsimgcl_train_novip, xsimgcl_train_vip]
xsimgcl_train_novip >> generate_xsimgcl_recall_novip >> generate_dnum_tv_recall_novip
xsimgcl_train_vip >> generate_xsimgcl_recall_vip >> generate_dnum_tv_recall_vip

# Fan in: both branches must finish before save_recall_to_hive runs.
[generate_dnum_tv_recall_novip, generate_dnum_tv_recall_vip] >> save_recall_to_hive