DAG: image_embedding

schedule: 0 14 * * *


image_embedding

Toggle wrap
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
import pendulum
import pytz

# Timezone object that the DAG start date below is converted into.
tz = pytz.timezone("Asia/Shanghai")

# Task-level defaults passed to the DAG through its `default_args` parameter.
# NOTE(review): pendulum.datetime() is documented to default to UTC when no tz
# is supplied, so after .astimezone(tz) the start date is presumably
# 2024-07-24 08:00 Asia/Shanghai rather than local midnight -- confirm that
# this is the intended start.
default_args = dict(
    owner="Terry",
    depends_on_past=False,
    start_date=pendulum.datetime(year=2024, month=7, day=24).astimezone(tz),
    # Failure e-mails are enabled for the address below; retry e-mails are not.
    email=["garyfan@tcl.com"],
    email_on_failure=True,
    email_on_retry=False,
    # Two retries, spaced ten minutes apart.
    retries=2,
    retry_delay=timedelta(minutes=10),
)

# The upstream media funnel table finishes its preparation at around 8:47,
# which is presumably why the daily run is scheduled later, at 14:00
# (cron: minute 0, hour 14, every day).
# NOTE(review): neither time names a timezone here -- confirm both are meant
# in the same one.
dag = DAG(
    dag_id="image_embedding",
    schedule_interval="0 14 * * *",
    default_args=default_args,
    description="update image embedding",
)

# Spark submission settings. Nothing in the visible part of this file reads
# `_config`; it is kept as-is in case code outside this view uses it.
_config = dict(
    # Raw Spark properties, keyed by their full property name.
    conf={
        "spark.executor.extraJavaOptions": "-XX:+UseConcMarkSweepGC",
        # Lower and upper executor bounds are both 8.
        "spark.dynamicAllocation.minExecutors": 8,
        "spark.dynamicAllocation.maxExecutors": 8,
        "spark.yarn.executor.memoryOverhead": 2648,
        "spark.driver.maxResultSize": "4g",
        "spark.sql.pivotMaxValues": 1000000,
        "spark.yarn.maxAppAttempts": 1,
        # NOTE(review): both values below are unit-less; Spark's docs ask for
        # the heartbeat interval to stay well below the network timeout --
        # confirm the units these two numbers are interpreted in.
        "spark.network.timeout": 1000000,
        "spark.executor.heartbeatInterval": 1000000,
    },
    executor_cores=4,
    executor_memory="6g",
    # Jinja expression string; stored here verbatim.
    name="{{ task_instance.task_id }}",
    driver_memory="4g",
)

# download_images = BashOperator(
#     task_id="download_images",
#     bash_command="cd /data/gangyanyuan/terry/terry-recsys-2024/script/image_emb_v2 && /data/dev/miniconda/envs/deepmatch/bin/python incrementally_download_vod_images.py",
#     depends_on_past=False,
#     dag=dag,
# )
# Image download tasks. Each one changes into the image_emb_v2 script
# directory and runs 1_incrementally_download_images.py with the deepmatch
# environment's interpreter; only the --dataset argument differs.
# The command strings are split across adjacent literals purely for line
# length; the resulting text is unchanged.

# --dataset jx_lst
incrementally_download_vod_images = BashOperator(
    dag=dag,
    task_id="incrementally_download_vod_images",
    depends_on_past=False,
    bash_command=(
        "cd /data/gangyanyuan/terry/terry-recsys-2024/script/image_emb_v2"
        " && /data/dev/miniconda/envs/deepmatch/bin/python"
        " 1_incrementally_download_images.py --dataset jx_lst"
    ),
)

# --dataset youku_jx_lst
incrementally_download_youku_images = BashOperator(
    dag=dag,
    task_id="incrementally_download_youku_images",
    depends_on_past=False,
    bash_command=(
        "cd /data/gangyanyuan/terry/terry-recsys-2024/script/image_emb_v2"
        " && /data/dev/miniconda/envs/deepmatch/bin/python"
        " 1_incrementally_download_images.py --dataset youku_jx_lst"
    ),
)

# --dataset ldd
incrementally_download_ldd_images = BashOperator(
    dag=dag,
    task_id="incrementally_download_ldd_images",
    depends_on_past=False,
    bash_command=(
        "cd /data/gangyanyuan/terry/terry-recsys-2024/script/image_emb_v2"
        " && /data/dev/miniconda/envs/deepmatch/bin/python"
        " 1_incrementally_download_images.py --dataset ldd"
    ),
)

# --dataset jx_feed
incrementally_download_jx_feed_images = BashOperator(
    dag=dag,
    task_id="incrementally_download_jx_feed_images",
    depends_on_past=False,
    bash_command=(
        "cd /data/gangyanyuan/terry/terry-recsys-2024/script/image_emb_v2"
        " && /data/dev/miniconda/envs/deepmatch/bin/python"
        " 1_incrementally_download_images.py --dataset jx_feed"
    ),
)

# Runs 3_update_image_embedding.py from the image_emb_v2 script directory
# using the deepmatch environment's interpreter. The command is split across
# adjacent literals for line length only; the resulting text is unchanged.
update_image_embedding = BashOperator(
    dag=dag,
    task_id="update_image_embedding",
    depends_on_past=False,
    bash_command=(
        "cd /data/gangyanyuan/terry/terry-recsys-2024/script/image_emb_v2"
        " && /data/dev/miniconda/envs/deepmatch/bin/python"
        " 3_update_image_embedding.py"
    ),
)

# MiniCPM embedding update tasks. Each one changes into the top-level script
# directory and runs its own update_minicpm_image_embedding*.py script with
# the easyrec environment's interpreter.
# The command strings are split across adjacent literals purely for line
# length; the resulting text is unchanged.

# Script without a dataset suffix.
update_minicpm_image_embedding = BashOperator(
    dag=dag,
    task_id="update_minicpm_image_embedding",
    depends_on_past=False,
    bash_command=(
        "cd /data/gangyanyuan/terry/terry-recsys-2024/script"
        " && /data/dev/miniconda/envs/easyrec/bin/python"
        " update_minicpm_image_embedding.py"
    ),
)

# "_youku" script.
update_minicpm_image_embedding_youku = BashOperator(
    dag=dag,
    task_id="update_minicpm_image_embedding_youku",
    depends_on_past=False,
    bash_command=(
        "cd /data/gangyanyuan/terry/terry-recsys-2024/script"
        " && /data/dev/miniconda/envs/easyrec/bin/python"
        " update_minicpm_image_embedding_youku.py"
    ),
)

# "_ldd" script.
update_minicpm_image_embedding_ldd = BashOperator(
    dag=dag,
    task_id="update_minicpm_image_embedding_ldd",
    depends_on_past=False,
    bash_command=(
        "cd /data/gangyanyuan/terry/terry-recsys-2024/script"
        " && /data/dev/miniconda/envs/easyrec/bin/python"
        " update_minicpm_image_embedding_ldd.py"
    ),
)

# "_jx_feed" script.
update_minicpm_image_embedding_jx_feed = BashOperator(
    dag=dag,
    task_id="update_minicpm_image_embedding_jx_feed",
    depends_on_past=False,
    bash_command=(
        "cd /data/gangyanyuan/terry/terry-recsys-2024/script"
        " && /data/dev/miniconda/envs/easyrec/bin/python"
        " update_minicpm_image_embedding_jx_feed.py"
    ),
)

# Task ordering. Two chains are declared here, with no edge between them; in
# each chain every task is downstream of the one listed before it.

# Chain 1: jx_lst download, then the image embedding update, then the
# un-suffixed MiniCPM embedding update.
(
    incrementally_download_vod_images
    >> update_image_embedding
    >> update_minicpm_image_embedding
)

# Chain 2: the youku, ldd and jx_feed downloads one after another, followed by
# their MiniCPM embedding updates in the same order.
(
    incrementally_download_youku_images
    >> incrementally_download_ldd_images
    >> incrementally_download_jx_feed_images
    >> update_minicpm_image_embedding_youku
    >> update_minicpm_image_embedding_ldd
    >> update_minicpm_image_embedding_jx_feed
)