from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from datetime import timedelta,datetime
import pendulum
import pytz
# Scheduling timezone. Kept as a module-level name for backward compatibility,
# although the fixed start_date below no longer needs it.
tz = pytz.timezone('Asia/Shanghai')

# Default arguments inherited by every task in this DAG.
default_args = {
    'owner': 'Terry',
    'depends_on_past': False,
    # BUG FIX: the original `pendulum.datetime(2025, 3, 26).astimezone(tz)`
    # created midnight *UTC* and then converted it, yielding a start_date of
    # 2025-03-26 08:00 Asia/Shanghai wall time. Constructing the datetime
    # directly in the target timezone gives the intended local midnight and
    # keeps the DAG's timezone as Asia/Shanghai.
    'start_date': pendulum.datetime(2025, 3, 26, tz='Asia/Shanghai'),
    'email': ['tian23.li@tcl.com'],         # failure/retry notifications go here
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 2,                           # retry each failed task twice...
    'retry_delay': timedelta(minutes=8),    # ...waiting 8 minutes between tries
}
_config = {
'conf': {
'spark.executor.extraJavaOptions': '-XX:+UseConcMarkSweepGC',
"spark.dynamicAllocation.minExecutors":8,
"spark.dynamicAllocation.maxExecutors":8,
"spark.yarn.executor.memoryOverhead":2648,
"spark.driver.maxResultSize": "8g",
"spark.yarn.maxAppAttempts": 1,
"spark.network.timeout": 1000000,
"spark.executor.heartbeatInterval" : 1000000
},
'executor_cores': 4,
'executor_memory': '8g',
'name': '{{ task_instance.task_id }}',
'driver_memory': '8g',
}
# VOD "rr" pipeline DAG: fires once per day at 15:00 (cron '00 15 * * *',
# evaluated in the DAG's timezone, which comes from default_args' start_date).
dag = DAG(
    'vod_rr',
    description='vod_rr',
    schedule_interval='00 15 * * *',
    default_args=default_args,
)
# Step 0: refresh the VOD media "rr" data using the `recsys2` conda env.
update_vod_media_rr = BashOperator(
    dag=dag,
    task_id='update_vod_media_rr',
    # Explicitly independent of previous runs (same as the default_args value).
    depends_on_past=False,
    bash_command=(
        "cd /data/gangyanyuan/terry/terry-recsys-2024/script_rr && "
        "/data/dev/miniconda/envs/recsys2/bin/python 0_update_vod_media_rr.py"
    ),
)
# Step 1: rebuild VOD content embeddings using the `easyrec` conda env.
update_vod_content_embedding = BashOperator(
    dag=dag,
    task_id='update_vod_content_embedding',
    # Explicitly independent of previous runs (same as the default_args value).
    depends_on_past=False,
    bash_command=(
        "cd /data/gangyanyuan/terry/terry-recsys-2024/script_rr && "
        "/data/dev/miniconda/envs/easyrec/bin/python 1_update_vod_content_embedding.py"
    ),
)
# Pipeline order: media refresh must finish before the embedding rebuild
# (equivalent to: update_vod_media_rr >> update_vod_content_embedding).
update_vod_media_rr.set_downstream(update_vod_content_embedding)