from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from datetime import timedelta,datetime
import pendulum
import pytz
# Business timezone; the DAG start_date below is anchored against it.
tz = pytz.timezone('Asia/Shanghai')

# Defaults applied to every task in this DAG.
default_args = dict(
    owner='Terry',
    depends_on_past=False,
    # NOTE(review): pendulum.datetime() with no tz is UTC midnight, then
    # astimezone() converts the same instant to Asia/Shanghai (08:00 local).
    # Confirm this is intended rather than Shanghai-local midnight.
    start_date=pendulum.datetime(year=2025, month=2, day=17).astimezone(tz),
    email=['tian23.li@tcl.com'],
    email_on_failure=True,
    email_on_retry=True,
    retries=2,
    retry_delay=timedelta(minutes=8),
)
_config = {
'conf': {
'spark.executor.extraJavaOptions': '-XX:+UseConcMarkSweepGC',
"spark.dynamicAllocation.minExecutors":8,
"spark.dynamicAllocation.maxExecutors":8,
"spark.yarn.executor.memoryOverhead":2648,
"spark.driver.maxResultSize": "8g",
"spark.yarn.maxAppAttempts": 1,
"spark.network.timeout": 1000000,
"spark.executor.heartbeatInterval" : 1000000
},
'executor_cores': 4,
'executor_memory': '8g',
'name': '{{ task_instance.task_id }}',
'driver_memory': '8g',
}
# Daily pipeline DAG, fired at 11:00 every day (cron '00 11 * * *').
# NOTE(review): catchup is not disabled here, so runs since start_date will
# be backfilled on first deploy — confirm that is intended.
dag = DAG(
    'user_rationales',
    default_args=default_args,
    description='user_rationales',
    schedule_interval='00 11 * * *',
)
# Step 1: user clustering, executed with the `easyrec` conda env's Python.
user_clustering = BashOperator(
    dag=dag,
    task_id='user_clustering',
    depends_on_past=False,
    bash_command="cd /data/gangyanyuan/terry/terry-recsys-2024/Reland/script && /data/dev/miniconda/envs/easyrec/bin/python 1_user_clustering.py",
)
# Step 2: generate recommendation rationales for the clustered users.
rec_rationales = BashOperator(
    dag=dag,
    task_id='rec_rationales',
    depends_on_past=False,
    bash_command="cd /data/gangyanyuan/terry/terry-recsys-2024/Reland/script && /data/dev/miniconda/envs/easyrec/bin/python 2_rec_rationales.py",
)
# Step 3: persist the rationales to Hive via spark-submit (shared _config).
save_rationales_to_hive = SparkSubmitOperator(
    task_id='save_rationales_to_hive',
    application='/data/gangyanyuan/terry/terry-recsys-2024/Reland/script/3_save_rationales_to_hive.py',
    conn_id='spark_local',
    dag=dag,
    **_config,
)
# VIP step 1: user clustering for the VIP segment (separate script dir).
user_clustering_vip = BashOperator(
    dag=dag,
    task_id='user_clustering_vip',
    depends_on_past=False,
    bash_command="cd /data/gangyanyuan/terry/terry-recsys-2024/Reland/script/vip && /data/dev/miniconda/envs/easyrec/bin/python 1_user_clustering_vip.py",
)
# VIP step 2: recommendation rationales for the VIP segment.
rec_rationales_vip = BashOperator(
    dag=dag,
    task_id='rec_rationales_vip',
    depends_on_past=False,
    bash_command="cd /data/gangyanyuan/terry/terry-recsys-2024/Reland/script/vip && /data/dev/miniconda/envs/easyrec/bin/python 2_rec_rationales_vip.py",
)
# VIP step 3: persist VIP rationales to Hive via spark-submit (shared _config).
save_rationales_vip_to_hive = SparkSubmitOperator(
    task_id='save_rationales_vip_to_hive',
    application='/data/gangyanyuan/terry/terry-recsys-2024/Reland/script/vip/3_save_rationales_vip_to_hive.py',
    conn_id='spark_local',
    dag=dag,
    **_config,
)
# Pipeline wiring (same edge set as two chained statements):
#   user_clustering -> rec_rationales -> save_rationales_to_hive
#   rec_rationales  -> user_clustering_vip -> rec_rationales_vip -> save_rationales_vip_to_hive
user_clustering >> rec_rationales
rec_rationales >> [save_rationales_to_hive, user_clustering_vip]
user_clustering_vip >> rec_rationales_vip >> save_rationales_vip_to_hive