Skip to content

Commit 449a27d

Browse files
Merge pull request #290 from andrew-saydjari/airflow
Airflow
2 parents ad3a096 + d724ca7 commit 449a27d

File tree

6 files changed

+33
-14
lines changed

6 files changed

+33
-14
lines changed

dags/ar_main.py

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import time
55
from datetime import datetime, timedelta
66
from astropy.time import Time
7+
import pytz
78
from airflow import DAG
89
from airflow.sensors.filesystem import FileSensor
910
from airflow.operators.bash import BashOperator
@@ -74,7 +75,7 @@ def poke(self, context) -> bool:
7475
catchup=False,
7576
on_failure_callback=[
7677
send_slack_notification_partial(
77-
text="ApogeeReduction-airflow DAG failed on {{ (ds | string | to_datetime - timedelta(days=1)).strftime('%Y-%m-%d') }}",
78+
text="ApogeeReduction-airflow DAG failed on {{ task_instance.xcom_pull(task_ids='setup.date_mjd') }}",
7879
)
7980
]
8081
) as dag:
@@ -98,7 +99,7 @@ def poke(self, context) -> bool:
9899
bash_command=(
99100
"ORIG_PWD=$(pwd)\n"
100101
"cd /mnt/ceph/users/sdssv/raw/APOGEE/sdsscore/\n"
101-
"./update.sh\n"
102+
# "./update.sh\n"
102103
"cd $ORIG_PWD"
103104
)
104105
)
@@ -108,7 +109,12 @@ def poke(self, context) -> bool:
108109
with TaskGroup(group_id="setup") as group_setup:
109110
mjd = PythonOperator(
110111
task_id="mjd",
111-
python_callable=lambda data_interval_start, **_: int(Time(data_interval_start).mjd) -1 # +1 offset to get the most recent day
112+
python_callable=lambda data_interval_start, **_: int(Time(data_interval_start).mjd) - 2
113+
)
114+
115+
date_mjd = PythonOperator(
116+
task_id="date_mjd",
117+
python_callable=lambda data_interval_start, **_: (data_interval_start.astimezone(pytz.timezone('America/New_York')) - timedelta(days=2)).strftime('%Y-%m-%d')
112118
)
113119

114120
observatory_groups = []
@@ -119,7 +125,7 @@ def poke(self, context) -> bool:
119125
python_callable=lambda **_: None, # Simple no-op function
120126
on_success_callback=[
121127
send_slack_notification_partial(
122-
text="Starting reduction for " + observatory + " for SJD {{ task_instance.xcom_pull(task_ids='setup.mjd') }} (night of {{ (ds | string | to_datetime - timedelta(days=1)).strftime('%Y-%m-%d') }}). Exposure list can be found at https://users.flatironinstitute.org/~asaydjari/" + slack_token + "/gitcode/ApogeeReduction.jl/metadata/observing_log_viewer/?sjd={{ task_instance.xcom_pull(task_ids='setup.mjd') }}&site=" + observatory
128+
text="Starting reduction for " + observatory + " for SJD {{ task_instance.xcom_pull(task_ids='setup.mjd') }} (night of {{ task_instance.xcom_pull(task_ids='setup.date_mjd') }}). Exposure list can be found at https://users.flatironinstitute.org/~asaydjari/" + slack_token + "/gitcode/ApogeeReduction.jl/metadata/observing_log_viewer/?sjd={{ task_instance.xcom_pull(task_ids='setup.mjd') }}&site=" + observatory
123129
)
124130
]
125131
)
@@ -130,16 +136,16 @@ def poke(self, context) -> bool:
130136
task_id="science",
131137
python_callable=submit_and_wait,
132138
op_kwargs={
133-
'bash_command': f"{sbatch_prefix} --job-name=ar_all_{observatory}_{{{{ ti.xcom_pull(task_ids='setup.mjd') }}}} /mnt/home/sdssv/gitcode/ApogeeReduction.jl/scripts/daily/run_all.sh {observatory} {{{{ ti.xcom_pull(task_ids='setup.mjd') }}}} {OUT_DIR} /mnt/home/sdssv/gitcode/arMADGICS.jl false"
139+
'bash_command': f"{sbatch_prefix} --job-name=ar_all_{observatory}_{{{{ ti.xcom_pull(task_ids='setup.mjd') }}}} /mnt/home/sdssv/gitcode/ApogeeReduction.jl/scripts/daily/run_all.sh {observatory} {{{{ ti.xcom_pull(task_ids='setup.mjd') }}}} {OUT_DIR} /mnt/home/sdssv/gitcode/arMADGICS.jl/ false"
134140
},
135141
on_success_callback=[
136142
send_slack_notification_partial(
137-
text=observatory + " science frames reduced for SJD {{ ti.xcom_pull(task_ids='setup.mjd') }} (night of {{ (ds | string | to_datetime - timedelta(days=1)).strftime('%Y-%m-%d') }}).",
143+
text=observatory + " science frames reduced for SJD {{ ti.xcom_pull(task_ids='setup.mjd') }} (night of {{ task_instance.xcom_pull(task_ids='setup.date_mjd') }}).",
138144
)
139145
],
140146
on_failure_callback=[
141147
send_slack_notification_partial(
142-
text=observatory + " science frame reduction failed for SJD {{ ti.xcom_pull(task_ids='setup.mjd') }} (night of {{ (ds | string | to_datetime - timedelta(days=1)).strftime('%Y-%m-%d') }}). :picard_facepalm:",
148+
text=observatory + " science frame reduction failed for SJD {{ ti.xcom_pull(task_ids='setup.mjd') }} (night of {{ task_instance.xcom_pull(task_ids='setup.date_mjd') }}). :picard_facepalm:",
143149
)
144150
]
145151
)
@@ -153,7 +159,7 @@ def poke(self, context) -> bool:
153159
python_callable=lambda **_: None, # dummy function that does nothing
154160
on_success_callback=[
155161
send_slack_notification_partial(
156-
text="ApogeeReduction pipeline completed successfully for SJD {{ ti.xcom_pull(task_ids='setup.mjd') }} (night of {{ (ds | string | to_datetime - timedelta(days=1)).strftime('%Y-%m-%d') }}). Both observatories processed."
162+
text="ApogeeReduction pipeline completed successfully for SJD {{ ti.xcom_pull(task_ids='setup.mjd') }} (night of {{ task_instance.xcom_pull(task_ids='setup.date_mjd') }}). Both observatories processed."
157163
)
158164
],
159165
dag=dag

scripts/bulk/make_runlist_all.jl

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,14 @@ tele2do = if parg["tele"] == "both"
4040
else
4141
[parg["tele"]]
4242
end
43+
44+
# Check if almanac file is empty and exit with specific code
45+
if length(keys(f)) == 0
46+
println("WARNING: No exposures found for the specified parameters")
47+
println("Exiting with code 16 to indicate empty runlist")
48+
exit(16)
49+
end
50+
4351
for tele in tele2do
4452
mjd_list = keys(f[tele])
4553
for tstmjd in mjd_list

scripts/bulk/run_bulk.sh

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,8 +101,10 @@ fi
101101
almanac -p 12 -v --mjd-start $mjd_start --mjd-end $mjd_end --output $almanac_file --fibers
102102

103103
print_elapsed_time "Building Runlist"
104+
set +e # Temporarily disable exit on error
104105
julia +$julia_version --project=$base_dir $base_dir/scripts/bulk/make_runlist_all.jl --almanac_file $almanac_file --output $runlist
105106
exit_code=$?
107+
set -e # Re-enable exit on error
106108
if [ $exit_code -eq 16 ]; then
107109
echo "No exposures found for this night. Exiting gracefully."
108110
exit 0

scripts/cal/make_relFlux.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -247,5 +247,5 @@ if length(all1Da) > 0
247247
end
248248
else
249249
thread = SlackThread()
250-
thread("No files of at least one cal type found for relFluxing")
250+
thread("No files of at least one cal type found for $(parg["tele"]) relFluxing")
251251
end

scripts/daily/generate_dashboard.jl

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -408,9 +408,10 @@ function main()
408408
# post a link to slack
409409
# the link to the dashboard is relative to the current directory
410410
dir = abspath(joinpath(pwd(), parg["outdir"], "plots", string(mjd), "dashboard.html"))
411-
url = replace(
412-
replace(dir, "/uufs/chpc.utah.edu/common/home/sdss42/" => "https://data.sdss5.org/sas/"),
413-
"/mnt/ceph/users/sdssv/work/asaydjari/" => "https://users.flatironinstitute.org/~asaydjari/$(ENV["SLACK_TOKEN"])/sdsswork/"
411+
url = replace(replace(replace(dir,
412+
"/uufs/chpc.utah.edu/common/home/sdss42/" => "https://data.sdss5.org/sas/"),
413+
"/mnt/ceph/users/sdssv/work/asaydjari/" => "https://users.flatironinstitute.org/~asaydjari/$(ENV["SLACK_TOKEN"])/sdsswork/"),
414+
"/mnt/ceph/users/sdssv/work/" => "https://users.flatironinstitute.org/~asaydjari/$(ENV["SLACK_TOKEN"])/"
414415
)
415416
thread = SlackThread()
416417
msg = "The reduction plots dashboard for SJD $(mjd) has been generated and is available at: $url"

scripts/daily/run_all.sh

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,8 +98,10 @@ fi
9898
almanac -v --mjd-start $mjd --mjd-end $mjd --$tele --output $almanac_file --fibers
9999

100100
print_elapsed_time "Building Runlist"
101-
julia +$julia_version --project=$base_dir $base_dir/scripts/bulk/make_runlist_all.jl --almanac_file $almanac_file --output $runlist
101+
set +e # Temporarily disable exit on error
102+
julia +$julia_version --project=$base_dir $base_dir/scripts/bulk/make_runlist_all.jl --tele $tele --almanac_file $almanac_file --output $runlist
102103
exit_code=$?
104+
set -e # Re-enable exit on error
103105
if [ $exit_code -eq 16 ]; then
104106
echo "No exposures found for this night. Exiting gracefully."
105107
exit 0
@@ -131,7 +133,7 @@ if [ "$run_2d_only" != "true" ]; then
131133
julia +$julia_version --project=$base_dir $base_dir/pipeline_2d_1d.jl --tele $tele --runlist $flatrunlist --outdir $outdir --runname $runname --relFlux false --waveSoln false --checkpoint_mode $checkpoint_mode
132134

133135
print_elapsed_time "Making relFlux for $flat_type Flats"
134-
julia +$julia_version --project=$base_dir $base_dir/scripts/cal/make_relFlux.jl --trace_dir ${outdir} --runlist $flatrunlist --runname $runname
136+
julia +$julia_version --project=$base_dir $base_dir/scripts/cal/make_relFlux.jl --trace_dir ${outdir} --runlist $flatrunlist --runname $runname --tele $tele
135137
done
136138

137139
## 2D->1D

0 commit comments

Comments
 (0)