from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from datetime import timedelta,datetime
import pendulum
import pytz
# Timezone used to localize the DAG's start date (China Standard Time).
tz = pytz.timezone('Asia/Shanghai')
# Defaults applied to every task in this DAG.
default_args = {
'owner': 'Terry',
# Each run is scheduled independently of whether the previous run succeeded.
'depends_on_past': False,
# NOTE(review): pendulum.datetime(2024, 7, 25) is midnight *UTC*;
# .astimezone(tz) only re-labels that same instant as 08:00 Asia/Shanghai.
# If the intent was to anchor the schedule at Shanghai midnight, this should
# be pendulum.datetime(..., tz='Asia/Shanghai') instead — confirm which
# instant the schedule is meant to start from.
'start_date': pendulum.datetime(year=2024, month=7, day=25).astimezone(tz),
# Recipients for failure/retry notification emails.
'email': ['tian23.li@tcl.com'],
'email_on_failure': True,
'email_on_retry': True,
# Failed tasks are retried twice, 8 minutes apart, before being marked failed.
'retries': 2,
'retry_delay': timedelta(minutes=8),
}
# Shared spark-submit settings for SparkSubmitOperator tasks.
# NOTE(review): _config is not referenced anywhere in this file as shown —
# presumably consumed by SparkSubmitOperator tasks defined elsewhere (the
# operator is imported above); verify before removing.
_config = {
'conf': {
'spark.executor.extraJavaOptions': '-XX:+UseConcMarkSweepGC',
# min == max pins the job to exactly 8 executors despite dynamic allocation.
"spark.dynamicAllocation.minExecutors":8,
"spark.dynamicAllocation.maxExecutors":8,
# Off-heap overhead per executor, in MB.
"spark.yarn.executor.memoryOverhead":2648,
'spark.driver.maxResultSize': "4g",
# Fail fast: no YARN-level application re-attempts (retries are handled by
# Airflow via default_args instead).
"spark.yarn.maxAppAttempts": 1,
# Very large timeouts (ms) to keep long-running stages from being killed.
"spark.network.timeout": 1000000,
"spark.executor.heartbeatInterval" : 1000000
},
'executor_cores': 4,
'executor_memory': '6g',
# Jinja-templated: the Spark app is named after the Airflow task id.
'name': '{{ task_instance.task_id }}',
'driver_memory': '6g',
}
# Daily maintenance DAG: fires once a day at 02:00 (cron '00 02 * * *').
dag = DAG(
    dag_id='kill_large_processes',
    default_args=default_args,
    schedule_interval='00 02 * * *',
    description='flow for kill_large_processes',
)
# Nightly cleanup task: runs the kill-large-processes shell script.
kill_large_processes = BashOperator(
    task_id='kill_large_processes',
    # BUG FIX: the original command was "cd <dir> <script>" — bash's `cd`
    # takes a single directory argument, so it failed with
    # "cd: too many arguments" and the script never ran. Chain the two
    # commands with && so the script executes only after a successful cd.
    # The trailing space is deliberate: it prevents Airflow's Jinja engine
    # from treating a bash_command ending in ".sh" as a template file path.
    bash_command="cd /data/gangyanyuan/scheduled_jobs && /data/gangyanyuan/scheduled_jobs/kill-large-processes.sh ",
    dag=dag,
)