1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
from datetime import datetime, timedelta

import pendulum
import pytz
from airflow import DAG
# NOTE(review): airflow.contrib.* and airflow.operators.bash_operator are
# Airflow 1.x module paths (deprecated in Airflow 2) — update if upgrading.
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from airflow.operators.bash_operator import BashOperator
# Timezone used for this DAG's dates (China Standard Time).
tz = pytz.timezone("Asia/Shanghai")

# Defaults inherited by every task attached to `dag`.
default_args = dict(
    owner="Terry",
    depends_on_past=False,
    # NOTE(review): pendulum.datetime() builds the value in UTC by default, so
    # after .astimezone(tz) this is 2024-07-24 08:00 Asia/Shanghai, not local
    # midnight. Airflow derives the DAG timezone from this tzinfo.
    start_date=pendulum.datetime(2024, 7, 24).astimezone(tz),
    email=["garyfan@tcl.com"],
    # Mail when a task finally fails, not on each intermediate retry.
    email_on_failure=True,
    email_on_retry=False,
    # Each task gets up to 2 retries, spaced 10 minutes apart.
    retries=2,
    retry_delay=timedelta(minutes=10),
)
# Runs once a day at 14:00 (cron "0 14 * * *"), evaluated in the DAG's
# timezone, which Airflow takes from default_args["start_date"].
# The media funnel table (presumably an upstream input) is usually ready
# at around 8:47.
# NOTE(review): `catchup` is not set here, so the scheduler's
# catchup_by_default setting decides whether intervals missed since
# start_date get backfilled — confirm that is intended.
dag = DAG(
    "image_embedding",
    schedule_interval="0 14 * * *",
    description="update image embedding",
    default_args=default_args,
)
_config = {
"conf": {
"spark.executor.extraJavaOptions": "-XX:+UseConcMarkSweepGC",
"spark.dynamicAllocation.minExecutors": 8,
"spark.dynamicAllocation.maxExecutors": 8,
"spark.yarn.executor.memoryOverhead": 2648,
"spark.driver.maxResultSize": "4g",
"spark.sql.pivotMaxValues": 1000000,
"spark.yarn.maxAppAttempts": 1,
"spark.network.timeout": 1000000,
"spark.executor.heartbeatInterval": 1000000,
},
"executor_cores": 4,
"executor_memory": "6g",
"name": "{{ task_instance.task_id }}",
"driver_memory": "4g",
}
# Working directories and interpreters shared by the task commands below.
_IMAGE_EMB_V2_DIR = "/data/gangyanyuan/terry/terry-recsys-2024/script/image_emb_v2"
_SCRIPT_DIR = "/data/gangyanyuan/terry/terry-recsys-2024/script"
_DEEPMATCH_PYTHON = "/data/dev/miniconda/envs/deepmatch/bin/python"
_EASYREC_PYTHON = "/data/dev/miniconda/envs/easyrec/bin/python"


def _python_task(task_id, workdir, python, script_and_args):
    """Create a BashOperator on ``dag`` that runs a Python script from *workdir*.

    The generated command is ``cd <workdir> && <python> <script_and_args>``;
    because of ``&&`` the script is not started if the ``cd`` fails.

    Args:
        task_id: Airflow task id.
        workdir: Directory to ``cd`` into before running the script.
        python: Absolute path of the interpreter (selects the conda env).
        script_and_args: Script file name followed by any CLI arguments.

    Returns:
        The BashOperator, already attached to ``dag``.
    """
    return BashOperator(
        task_id=task_id,
        bash_command="cd {} && {} {}".format(workdir, python, script_and_args),
        depends_on_past=False,
        dag=dag,
    )


# Incremental image downloads: 1_incrementally_download_images.py, once per
# --dataset (deepmatch env, run from script/image_emb_v2).
incrementally_download_vod_images = _python_task(
    "incrementally_download_vod_images",
    _IMAGE_EMB_V2_DIR,
    _DEEPMATCH_PYTHON,
    "1_incrementally_download_images.py --dataset jx_lst",
)
incrementally_download_youku_images = _python_task(
    "incrementally_download_youku_images",
    _IMAGE_EMB_V2_DIR,
    _DEEPMATCH_PYTHON,
    "1_incrementally_download_images.py --dataset youku_jx_lst",
)
incrementally_download_ldd_images = _python_task(
    "incrementally_download_ldd_images",
    _IMAGE_EMB_V2_DIR,
    _DEEPMATCH_PYTHON,
    "1_incrementally_download_images.py --dataset ldd",
)
incrementally_download_jx_feed_images = _python_task(
    "incrementally_download_jx_feed_images",
    _IMAGE_EMB_V2_DIR,
    _DEEPMATCH_PYTHON,
    "1_incrementally_download_images.py --dataset jx_feed",
)

# Embedding refresh via 3_update_image_embedding.py (deepmatch env).
update_image_embedding = _python_task(
    "update_image_embedding",
    _IMAGE_EMB_V2_DIR,
    _DEEPMATCH_PYTHON,
    "3_update_image_embedding.py",
)

# MiniCPM embedding refresh, one script per dataset (easyrec env, run from script/).
update_minicpm_image_embedding = _python_task(
    "update_minicpm_image_embedding",
    _SCRIPT_DIR,
    _EASYREC_PYTHON,
    "update_minicpm_image_embedding.py",
)
update_minicpm_image_embedding_youku = _python_task(
    "update_minicpm_image_embedding_youku",
    _SCRIPT_DIR,
    _EASYREC_PYTHON,
    "update_minicpm_image_embedding_youku.py",
)
update_minicpm_image_embedding_ldd = _python_task(
    "update_minicpm_image_embedding_ldd",
    _SCRIPT_DIR,
    _EASYREC_PYTHON,
    "update_minicpm_image_embedding_ldd.py",
)
update_minicpm_image_embedding_jx_feed = _python_task(
    "update_minicpm_image_embedding_jx_feed",
    _SCRIPT_DIR,
    _EASYREC_PYTHON,
    "update_minicpm_image_embedding_jx_feed.py",
)
# Task dependencies.

# jx_lst (vod): download -> 3_update_image_embedding.py -> MiniCPM embedding.
incrementally_download_vod_images >> update_image_embedding >> update_minicpm_image_embedding

# youku / ldd / jx_feed: the three downloads and then their three MiniCPM
# embeddings all run strictly one after another.
# NOTE(review): with the default trigger rule, a failure anywhere in this chain
# (e.g. the ldd download) also blocks the youku embedding. If the serial order
# only exists to limit resource contention, per-dataset chains plus an Airflow
# pool may be preferable — confirm intent before changing.
(
    incrementally_download_youku_images
    >> incrementally_download_ldd_images
    >> incrementally_download_jx_feed_images
    >> update_minicpm_image_embedding_youku
    >> update_minicpm_image_embedding_ldd
    >> update_minicpm_image_embedding_jx_feed
)