from datetime import datetime, timedelta
import pendulum
import pytz
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
# Target timezone for the DAG's start date.
tz = pytz.timezone('Asia/Shanghai')

# Defaults applied to every task in this DAG.
# NOTE(review): pendulum.datetime(...) is UTC-aware by default, so the
# astimezone() below produces 2024-07-17 08:00 Shanghai time, not local
# midnight. Confirm the 08:00 start is intentional; if local midnight was
# meant, pass tz='Asia/Shanghai' to pendulum.datetime directly instead.
default_args = dict(
    owner='LIUBING',
    start_date=pendulum.datetime(year=2024, month=7, day=17).astimezone(tz),
    depends_on_past=False,
    email=['liubing03@tcl.com'],
    email_on_failure=False,
    email_on_retry=False,
    retries=1,
    retry_delay=timedelta(minutes=10),
)
# Daily recall-pipeline DAG; cron '40 03 * * *' fires at 03:40 every day.
dag = DAG(
    'airflow_ldd_ls_recall',
    default_args=default_args,
    description='',
    schedule_interval='40 03 * * *',
)
_config = {
'conf': {
'spark.executor.extraJavaOptions': '-XX:+UseConcMarkSweepGC',
"spark.dynamicAllocation.minExecutors":6,
"spark.dynamicAllocation.maxExecutors":6,
"spark.yarn.executor.memoryOverhead":2648,
"spark.driver.maxResultSize": "12g",
"spark.yarn.maxAppAttempts": 1,
"spark.network.timeout": 1000000,
"spark.executor.heartbeatInterval" : 1000000
},
'executor_cores': 4,
'executor_memory': '8g',
'name': '{{ task_instance.task_id }}',
'driver_memory': '8g',
}
# Step 0: refresh media-type tags before pulling training data.
update_media = BashOperator(
    task_id='update_media',
    bash_command=(
        '/data/dev/miniconda/envs/deepmatch/bin/python '
        '/data/gangyanyuan/liubing/DeepMatch/scripts_ldd/0.get_media_type_tag.py'
    ),
    dag=dag,
)
# Step 1: export the last 180 days of training data to local storage via Spark.
# NOTE(review): "traning" is a typo for "training", but the task_id and
# variable are kept as-is — renaming the task_id would orphan this task's
# run history in the Airflow metadata DB.
get_ldd_traning_data_180days_to_local = SparkSubmitOperator(
    task_id='get_ldd_traning_data_180days_to_local',
    dag=dag,
    **_config,
    application='/data/gangyanyuan/liubing/DeepMatch/scripts_ldd/1.get_training_data_to_local.py',
    conn_id='spark_local',
    # provide_context=True removed: it is a PythonOperator-only argument that
    # SparkSubmitOperator never reads (a deprecated no-op in Airflow 1.10 and
    # a TypeError in Airflow 2.x); the sibling upload_recall_to_hive task
    # does not pass it either.
)
# Step 2: train/run the SDM recall model on the exported data.
run_sdm = BashOperator(
    task_id='run_sdm',
    bash_command=(
        '/data/dev/miniconda/envs/deepmatch/bin/python '
        '/data/gangyanyuan/liubing/DeepMatch/scripts_ldd/2.run_sdm.py'
    ),
    dag=dag,
)
# Step 3: merge the recall results produced by the SDM run.
merge_recall = BashOperator(
    task_id='merge_recall',
    bash_command=(
        '/data/dev/miniconda/envs/deepmatch/bin/python '
        '/data/gangyanyuan/liubing/DeepMatch/scripts_ldd/3.merge_recall.py'
    ),
    dag=dag,
)
# Step 4: publish the merged recall results to Hive via Spark.
upload_recall_to_hive = SparkSubmitOperator(
    task_id='upload_recall_to_hive',
    application='/data/gangyanyuan/liubing/DeepMatch/scripts_ldd/4.recall_to_hive.py',
    conn_id='spark_local',
    dag=dag,
    **_config,
)
# Pipeline order: tag refresh -> data export -> SDM recall -> merge -> Hive upload.
update_media.set_downstream(get_ldd_traning_data_180days_to_local)
get_ldd_traning_data_180days_to_local.set_downstream(run_sdm)
run_sdm.set_downstream(merge_recall)
merge_recall.set_downstream(upload_recall_to_hive)