Skip to content

Commit fa9a068

Browse files
Merge pull request #181 from andrew-saydjari/2025_04_28
plate era synergizing code
2 parents 35f3442 + fef58f9 commit fa9a068

27 files changed

+254
-207
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ slurm_logs/*.sh
99
*.err
1010
*.out
1111
*.txt
12+
*.log
1213

1314
stash/
1415
.gitignore

dags/main.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,14 @@ def poke(self, context) -> bool:
100100
' git fetch origin main\n'
101101
' git merge origin/main --no-edit\n'
102102
"fi\n"
103+
# --- Separate check for unpushed commits (ahead of origin) ---
104+
'read behind ahead < <(git rev-list --left-right --count origin/' + f'{REPO_BRANCH}' + '...HEAD)\n'
105+
'if [[ "$ahead" -gt 0 ]]; then\n'
106+
f' echo "Branch is ahead of origin/{REPO_BRANCH} by $ahead commits. Pushing..."\n'
107+
f' git push origin HEAD:{REPO_BRANCH}\n'
108+
"else\n"
109+
' echo "Branch is up to date with origin/{REPO_BRANCH}"\n'
110+
"fi\n"
103111
),
104112
)
105113
) >> (

pipeline.jl

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -135,14 +135,12 @@ flush(stdout);
135135
mkpath(dirName)
136136
end
137137

138-
df = h5open(joinpath(outdir, "almanac/$(runname).h5")) do f
139-
DataFrame(read(f["$(parg["tele"])/$(mjd)/exposures"]))
140-
end
138+
df = read_almanac_exp_df(joinpath(outdir, "almanac/$(runname).h5"), parg["tele"], mjd)
141139

142140
# check if chip is in the llist of chips in df.something[expid] (waiting on Andy Casey to update alamanc)
143141
rawpath = build_raw_path(
144-
df.observatory[expid], df.mjd[expid], chip, df.exposure[expid])
145-
cartid = parseCartID(df.cartid[expid])
142+
df.observatory[expid], chip, df.mjd[expid], lpad(df.exposure_int[expid], 8, "0"))
143+
cartid = df.cartidInt[expid]
146144
# decompress and convert apz data format to a standard 3D cube of reads
147145
cubedat, hdr_dict = apz2cube(rawpath)
148146

@@ -242,7 +240,7 @@ flush(stdout);
242240
# need to clean up exptype to account for FPI versus ARCLAMP
243241
outfname = join(
244242
["ar2D", df.observatory[expid], df.mjd[expid],
245-
chip, df.exposure[expid], df.exptype[expid]],
243+
last(df.exposure_str[expid],4), chip, df.exptype[expid]],
246244
"_")
247245
# probably change to FITS to make astronomers happy (this JLD2, which is HDF5, is just for debugging)
248246

@@ -262,8 +260,8 @@ flush(stdout);
262260

263261
# come back to tuning the chi2perdofcut once more rigorously establish noise model
264262
function process_2Dcal(fname; chi2perdofcut = 100)
265-
sname = split(fname, "_")
266-
tele, mjd, chip, expid = sname[(end - 4):(end - 1)]
263+
sname = split(split(split(fname, "/")[end],".h5")[1], "_")
264+
fnameType, tele, mjd, expnum, chip, exptype = sname[(end - 5):end]
267265

268266
dimage = load(fname, "dimage")
269267
ivarimage = load(fname, "ivarimage")

pipeline_2d_1d.jl

Lines changed: 17 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ flush(stdout);
105105
using DataFrames, EllipsisNotation, StatsBase
106106
using AstroTime # can remove after Adam merges the PR to recast as Float
107107
using ParallelDataTransfer, ProgressMeter
108+
using ApogeeReduction
108109

109110
src_dir = "./"
110111
include(src_dir * "src/ar1D.jl")
@@ -126,16 +127,15 @@ flush(stdout);
126127
##### 1D stage
127128
@everywhere begin
128129
function process_1D(fname)
129-
sname = split(split(fname, "/")[end], "_")
130-
fnameType, tele, mjd, chip, expid = sname[(end - 5):(end - 1)]
130+
sname = split(split(split(fname, "/")[end],".h5")[1], "_")
131+
fnameType, tele, mjd, expnum, chip, exptype = sname[(end - 5):end]
131132

132133
# how worried should I be about loading this every time?
133-
falm = h5open(parg["outdir"] * "almanac/$(parg["runname"]).h5")
134-
dfalmanac = DataFrame(read(falm["$(parg["tele"])/$(mjd)/exposures"]))
135-
dfalmanac.cartidInt = parseCartID.(dfalmanac.cartid)
134+
falm = h5open(joinpath(parg["outdir"], "almanac/$(parg["runname"]).h5"))
135+
dfalmanac = read_almanac_exp_df(falm, parg["tele"], mjd)
136+
136137
med_center_to_fiber_func, x_prof_min, x_prof_max_ind, n_sub, min_prof_fib, max_prof_fib,
137-
all_y_prof, all_y_prof_deriv = gh_profiles(
138-
tele, mjd, chip, expid; n_sub = 100)
138+
all_y_prof, all_y_prof_deriv = gh_profiles(tele, mjd, expnum, chip; n_sub = 100)
139139

140140
fnamecal = if (fnameType == "ar2D")
141141
replace(fname, "ar2D" => "ar2Dcal")
@@ -180,24 +180,23 @@ flush(stdout);
180180
# this is the path to the underlying fluxing file.
181181
# it is symlinked below to an exposure-specific file (linkPath).
182182
calPath = get_fluxing_file(
183-
dfalmanac, parg["outdir"], mjd, tele, expid, fluxing_chip = "c")
184-
expid_num = parse(Int, last(expid, 4)) #this is silly because we translate right back
185-
fibtargDict = get_fibTargDict(falm, tele, parse(Int, mjd), expid_num)
183+
dfalmanac, parg["outdir"], tele, mjd, expnum, fluxing_chip = "c")
184+
expid_num = parse(Int, last(expnum, 4)) #this is silly because we translate right back
185+
fibtargDict = get_fibTargDict(falm, tele, mjd, expid_num)
186186
fiberTypeList = map(x -> fibtargDict[x], 1:300)
187187

188188
if isnothing(calPath)
189189
# TODO uncomment this
190-
@warn "No fluxing file available for $(tele) $(mjd) $(chip) $(expid)"
190+
@warn "No fluxing file available for $(tele) $(mjd) $(expnum) $(chip)"
191191
relthrpt = ones(size(flux_1d, 2))
192192
bitmsk_relthrpt = 2^2 * ones(Int, size(flux_1d, 2))
193193
elseif !isfile(calPath)
194-
error("Fluxing file for $(tele) $(mjd) $(chip) $(expid) does not exist")
194+
error("Fluxing file $(calPath) for $(tele) $(mjd) $(expnum) $(chip) does not exist")
195195
else
196-
calPath = abspath(calPath)
197196
linkPath = abspath(joinpath(
198-
dirname(fname), "relFlux_$(tele)_$(mjd)_$(chip)_$(expid).h5"))
197+
dirname(fname), "relFlux_$(tele)_$(mjd)_$(expnum)_$(chip).h5"))
199198
if !islink(linkPath)
200-
symlink(calPath, linkPath)
199+
symlink(abspath(calPath), linkPath)
201200
end
202201
relthrpt = load(linkPath, "relthrpt")
203202
relthrptr = reshape(relthrpt, (1, length(relthrpt)))
@@ -246,9 +245,7 @@ end
246245

247246
list2Dexp = []
248247
for mjd in unique_mjds
249-
f = h5open(parg["outdir"] * "almanac/$(parg["runname"]).h5")
250-
df = DataFrame(read(f["$(parg["tele"])/$(mjd)/exposures"]))
251-
close(f)
248+
df = read_almanac_exp_df(joinpath(parg["outdir"], "almanac/$(parg["runname"]).h5"), parg["tele"], mjd)
252249
function get_2d_name_partial(expid)
253250
parg["outdir"] * "/apred/$(mjd)/" *
254251
replace(get_1d_name(expid, df), "ar1D" => "ar2D") * ".h5"
@@ -302,9 +299,7 @@ all2Dcal = replace.(all2D, "ar2D" => "ar2Dcal")
302299
## get all OBJECT files (happy to add any other types that see sky?)
303300
list1DexpObject = []
304301
for mjd in unique_mjds
305-
f = h5open(parg["outdir"] * "almanac/$(parg["runname"]).h5")
306-
df = DataFrame(read(f["$(parg["tele"])/$(mjd)/exposures"]))
307-
close(f)
302+
df = read_almanac_exp_df(joinpath(parg["outdir"], "almanac/$(parg["runname"]).h5"), parg["tele"], mjd)
308303
function get_1d_name_partial(expid)
309304
if df.imagetyp[expid] == "Object"
310305
return parg["outdir"] * "/apred/$(mjd)/" * get_1d_name(expid, df, cal = true) * ".h5"
@@ -342,7 +337,7 @@ flush(stdout);
342337
println("Solving skyline wavelength solution:");
343338
flush(stdout);
344339
all1DObjectSkyPeaks = replace.(
345-
replace.(all1DObject, "ar1Dcal" => "skyLine_peaks"), "ar1D" => "skyLine_peaks")
340+
replace.(all1DObject, "ar1Dcal" => "skyLinePeaks"), "ar1D" => "skyLinePeaks")
346341
@showprogress pmap(get_and_save_sky_wavecal, all1DObjectSkyPeaks)
347342

348343
## TODO when are we going to split into individual fiber files? Then we should be writing fiber type to the file name

src/ApogeeReduction.jl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
module ApogeeReduction
22

3+
export N_FIBERS, N_XPIX, N_CHIPS
34
const N_FIBERS = 300
45
const N_XPIX = 2048
6+
const N_CHIPS = 3
57

68
include("utils.jl")
79
include("ar3D.jl")

src/ar1D.jl

Lines changed: 53 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -278,11 +278,11 @@ function extract_optimal_iter(dimage, ivarimage, pix_bitmask, trace_params,
278278
end
279279

280280
"""
281-
Given an open HDF.file, `f`, and the telescope, mjd, and a "short" expid, return a dictionary
281+
Given an open HDF.file, `f`, and the telescope, mjd, and expnum, return a dictionary
282282
mapping fiber id to fiber type.
283283
"""
284-
function get_fibTargDict(f, tele, mjd, exposure_id)
285-
exposure_id = short_expid_to_long(mjd, exposure_id)
284+
function get_fibTargDict(f, tele, mjd, expnum)
285+
exposure_id = short_expid_to_long(mjd, expnum)
286286

287287
# translate confSummary/almanac terminology to AR.jl terminology
288288
fiber_type_names = Dict(
@@ -304,18 +304,19 @@ function get_fibTargDict(f, tele, mjd, exposure_id)
304304
# TODO Andrew thinks the fibers with category "" might be serendipitous targets
305305

306306
mjdfps2plate = get_fps_plate_divide(tele)
307-
configName, configIdCol, target_type_col = if mjd > mjdfps2plate
307+
configName, configIdCol, target_type_col = if parse(Int, mjd) > mjdfps2plate
308308
"fps", "configid", "category"
309309
else
310310
"plates", "plateid", "target_type" # TODO should this be source_type?
311311
end
312312

313-
df_exp = DataFrame(read(f["$(tele)/$(mjd)/exposures"]))
314-
if !(exposure_id in df_exp.exposure)
313+
df_exp = read_almanac_exp_df(f, tele, mjd)
314+
315+
if !(exposure_id in df_exp.exposure_str)
315316
@warn "Exposure $(exposure_id) not found in $(tele)/$(mjd)/exposures"
316317
return Dict(1:300 .=> "fiberTypeFail")
317318
end
318-
exposure_info = df_exp[findfirst(df_exp[!, "exposure"] .== exposure_id), :]
319+
exposure_info = df_exp[findfirst(df_exp[!, "exposure_str"] .== exposure_id), :]
319320
configid = exposure_info[configIdCol]
320321

321322
fibtargDict = if exposure_info.exptype == "OBJECT"
@@ -336,13 +337,22 @@ function get_fibTargDict(f, tele, mjd, exposure_id)
336337
fiber_types = map(df_fib[!, target_type_col]) do t
337338
if t in keys(fiber_type_names)
338339
fiber_type_names[t]
339-
340340
else
341341
# @warn "Unknown fiber type for $(tele)/$(mjd)/fibers/$(configName)/$(configid): $(repr(t))"
342342
"fiberTypeFail"
343343
end
344344
end
345-
Dict(df_fib[!, fiberid_col] .=> fiber_types)
345+
fibernum_col = df_fib[!, fiberid_col]
346+
# println(typeof(fibernum_col))
347+
fibernumvec = if fibernum_col isa AbstractVector{<:Integer}
348+
fibernum_col
349+
elseif fibernum_col isa AbstractVector{<:String}
350+
parse.(Int, fibernum_col)
351+
else
352+
@warn "Fiber numbers are neither integers or strings"
353+
fibernum_col
354+
end
355+
Dict(fibernumvec .=> fiber_types)
346356
catch e
347357
rethrow(e)
348358
@warn "Failed to get any fiber type information for $(tele)/$(mjd)/fibers/$(configName)/$(configid) (exposure $(exposure_id)). Returning fiberTypeFail for all fibers."
@@ -353,7 +363,7 @@ function get_fibTargDict(f, tele, mjd, exposure_id)
353363
Dict(1:300 .=> "cal")
354364
end
355365

356-
if mjd > mjdfps2plate
366+
if parse(Int, mjd) > mjdfps2plate
357367
fpifib1, fpifib2 = get_fpi_guide_fiberID(tele)
358368
fibtargDict[fpifib1] = "fpiguide"
359369
fibtargDict[fpifib2] = "fpiguide"
@@ -363,15 +373,17 @@ end
363373

364374
# hardcoded to use chip c only for now
365375
# must use dome flats, not quartz flats (need fiber runs to telescope)
366-
function get_fluxing_file(dfalmanac, parent_dir, mjd, tele, expidstr; fluxing_chip = "c")
367-
expidfull = parse(Int, expidstr)
376+
# use full exposure_id
377+
function get_fluxing_file(dfalmanac, parent_dir, tele, mjd, expnum; fluxing_chip = "c")
378+
exposure_id = parse(Int, short_expid_to_long(mjd, expnum))
368379
df_mjd = sort(
369380
dfalmanac[(dfalmanac.mjd .== parse(Int, mjd)) .& (dfalmanac.observatory .== tele), :],
370381
:exposure)
371-
expIndex = findfirst(df_mjd.exposure .== expidfull)
382+
expIndex = findfirst(df_mjd.exposure_int .== exposure_id)
372383
cartId = df_mjd.cartidInt[expIndex]
373-
expIndex_before = findlast((df_mjd.imagetyp .== "DomeFlat") .& (df_mjd.exposure .< expidfull))
374-
expIndex_after = findfirst((df_mjd.imagetyp .== "DomeFlat") .& (df_mjd.exposure .> expidfull))
384+
# this needs to have cuts that match those in make_runlist_dome_flats.jl
385+
expIndex_before = findlast((df_mjd.imagetyp .== "DomeFlat") .& (df_mjd.exposure_int .< exposure_id) .& (df_mjd.nreadInt .> 3))
386+
expIndex_after = findfirst((df_mjd.imagetyp .== "DomeFlat") .& (df_mjd.exposure_int .> exposure_id) .& (df_mjd.nreadInt .> 3))
375387
valid_before = if !isnothing(expIndex_before)
376388
all(df_mjd.cartidInt[expIndex_before:expIndex] .== cartId) * 1
377389
elseif !isnothing(expIndex_before)
@@ -389,43 +401,43 @@ function get_fluxing_file(dfalmanac, parent_dir, mjd, tele, expidstr; fluxing_ch
389401

390402
if valid_before == 1
391403
return get_fluxing_file_name(
392-
parent_dir, mjd, tele, fluxing_chip, df_mjd.exposure[expIndex_before], cartId)
404+
parent_dir, tele, mjd, last(df_mjd.exposure_str[expIndex_before], 4), fluxing_chip, cartId)
393405
elseif valid_after == 1
394406
return get_fluxing_file_name(
395-
parent_dir, mjd, tele, fluxing_chip, df_mjd.exposure[expIndex_after], cartId)
407+
parent_dir, tele, mjd, last(df_mjd.exposure_str[expIndex_after], 4), fluxing_chip, cartId)
396408
# any of the cases below here we could consider using a global file
397409
elseif valid_before == 2
398410
return get_fluxing_file_name(
399-
parent_dir, mjd, tele, fluxing_chip, df_mjd.exposure[expIndex_before], cartId)
411+
parent_dir, tele, mjd, last(df_mjd.exposure_str[expIndex_before], 4), fluxing_chip, cartId)
400412
elseif valid_after == 2
401413
return get_fluxing_file_name(
402-
parent_dir, mjd, tele, fluxing_chip, df_mjd.exposure[expIndex_after], cartId)
414+
parent_dir, tele, mjd, last(df_mjd.exposure_str[expIndex_after], 4), fluxing_chip, cartId)
403415
else
404416
return nothing
405417
end
406418
end
407419

408420
# TODO: switch to meta data dict and then save wavecal flags etc.
409-
function reinterp_spectra(fname; wavecal_type = "wavecal_skyline")
421+
function reinterp_spectra(fname; wavecal_type = "waveCalSkyLine")
410422
# might need to add in telluric div functionality here?
411423

412-
sname = split(split(fname, "/")[end], "_")
413-
fnameType, tele, mjd, chip, expid = sname[(end - 5):(end - 1)]
424+
sname = split(split(split(fname, "/")[end],".h5")[1], "_")
425+
fnameType, tele, mjd, expnum, chip, exptype = sname[(end - 5):end]
414426

415427
# could shift this to a preallocation step
416-
outflux = zeros(length(logUniWaveAPOGEE), 300)
417-
outvar = zeros(length(logUniWaveAPOGEE), 300)
418-
outmsk = zeros(Int, length(logUniWaveAPOGEE), 300)
419-
cntvec = zeros(Int, length(logUniWaveAPOGEE), 300)
428+
outflux = zeros(length(logUniWaveAPOGEE), N_FIBERS)
429+
outvar = zeros(length(logUniWaveAPOGEE), N_FIBERS)
430+
outmsk = zeros(Int, length(logUniWaveAPOGEE), N_FIBERS)
431+
cntvec = zeros(Int, length(logUniWaveAPOGEE), N_FIBERS)
420432

421-
pixvec = 1:(3 * 2048)
422-
flux_stack = zeros(3 * 2048, 300)
423-
ivar_stack = zeros(3 * 2048, 300)
424-
mask_stack = zeros(Int, 3 * 2048, 300)
425-
wave_stack = zeros(3 * 2048, 300)
426-
chipBit_stack = zeros(Int, 3 * 2048, 300)
433+
pixvec = 1:(N_CHIPS * N_XPIX)
434+
flux_stack = zeros(N_CHIPS * N_XPIX, N_FIBERS)
435+
ivar_stack = zeros(N_CHIPS * N_XPIX, N_FIBERS)
436+
mask_stack = zeros(Int, N_CHIPS * N_XPIX, N_FIBERS)
437+
wave_stack = zeros(N_CHIPS * N_XPIX, N_FIBERS)
438+
chipBit_stack = zeros(Int, N_CHIPS * N_XPIX, N_FIBERS)
427439

428-
ingestBit = zeros(Int, 300)
440+
ingestBit = zeros(Int, N_FIBERS)
429441

430442
# add a for loop over the exposures (stop thinking about "visits" for now)
431443
# probably just generate ap1D file names from the alamanc files
@@ -440,10 +452,10 @@ function reinterp_spectra(fname; wavecal_type = "wavecal_skyline")
440452
chipWaveSoln = f["chipWaveSoln"]
441453
close(f)
442454
else #this is a terrible global fallback, just so we get something to look at
443-
chipWaveSoln = zeros(2048, 300, 3)
455+
chipWaveSoln = zeros(N_XPIX, N_FIBERS, N_CHIPS)
444456
for (chipind, chip) in enumerate(["a", "b", "c"])
445457
chipWaveSoln[:, :, chipind] .= rough_linear_wave.(
446-
1:2048, a = roughwave_dict[tele][chip][1], b = roughwave_dict[tele][chip][2])
458+
1:N_XPIX, a = roughwave_dict[tele][chip][1], b = roughwave_dict[tele][chip][2])
447459
end
448460
println("No wavecal found for $(fname), using fallback")
449461
flush(stdout)
@@ -458,11 +470,11 @@ function reinterp_spectra(fname; wavecal_type = "wavecal_skyline")
458470
mask_1d = f["mask_1d"]
459471
close(f)
460472

461-
flux_stack[(1:2048) .+ (3 - chipind) * 2048, :] .= flux_1d[end:-1:1, :]
462-
ivar_stack[(1:2048) .+ (3 - chipind) * 2048, :] .= ivar_1d[end:-1:1, :]
463-
mask_stack[(1:2048) .+ (3 - chipind) * 2048, :] .= mask_1d[end:-1:1, :]
464-
wave_stack[(1:2048) .+ (3 - chipind) * 2048, :] .= chipWaveSoln[end:-1:1, :, chipind]
465-
chipBit_stack[(1:2048) .+ (3 - chipind) * 2048, :] .+= 2^(chipind)
473+
flux_stack[(1:N_XPIX) .+ (3 - chipind) * N_XPIX, :] .= flux_1d[end:-1:1, :]
474+
ivar_stack[(1:N_XPIX) .+ (3 - chipind) * N_XPIX, :] .= ivar_1d[end:-1:1, :]
475+
mask_stack[(1:N_XPIX) .+ (3 - chipind) * N_XPIX, :] .= mask_1d[end:-1:1, :]
476+
wave_stack[(1:N_XPIX) .+ (3 - chipind) * N_XPIX, :] .= chipWaveSoln[end:-1:1, :, chipind]
477+
chipBit_stack[(1:N_XPIX) .+ (3 - chipind) * N_XPIX, :] .+= 2^(chipind)
466478
end
467479

468480
noBadBits = (mask_stack .& bad_pix_bits .== 0)
@@ -474,7 +486,7 @@ function reinterp_spectra(fname; wavecal_type = "wavecal_skyline")
474486
(ivar_stack .> (10^-20))
475487

476488
## need to propagate the bit mask
477-
for fiberindx in 1:300
489+
for fiberindx in 1:N_FIBERS
478490
good_pix_fiber = good_pix[:, fiberindx]
479491
flux_fiber = flux_stack[good_pix_fiber, fiberindx]
480492
ivar_fiber = ivar_stack[good_pix_fiber, fiberindx]

src/ar2Dcal.jl

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
using DataFrames
22

33
function calStrip(fname)
4-
calType, telescope, chip, mjdstart, mjdend = split(
5-
split(split(fname, "/")[end], ".")[1], "_")
6-
return vcat(
7-
abspath(fname), calType, telescope, chip, parse(Int, mjdstart), parse(Int, mjdend))
4+
sname = split(split(split(fname, "/")[end],".h5")[1], "_")
5+
calType, tele, chip, mjdstart, mjdend = sname[(end - 4):end]
6+
return vcat(abspath(fname), calType, tele, chip, parse(Int, mjdstart), parse(Int, mjdend))
87
end
98

109
function cal2df(flist)

0 commit comments

Comments
 (0)