Skip to content

Commit aa307c6

Browse files
committed
add restart option
1 parent ace3c1a commit aa307c6

File tree

9 files changed

+320
-94
lines changed

9 files changed

+320
-94
lines changed

src/DiskArrayEngine.jl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ include("buffers.jl")
77
include("userfuncs.jl")
88
include("mwops.jl")
99
include("executionplan.jl")
10+
include("restart.jl")
1011
include("runner.jl")
1112
include("distribute.jl")
1213
include("enginearrays.jl")

src/distribute.jl

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,11 @@ function is_output_chunk_overlap(spec,outar,idim,lr)
1111
looprange = lr.members[idim]
1212
length(looprange) == 1 && return false
1313
!all(looprange) do r
14-
# w1 = first(windows[first(r)])
15-
# w2 = last(windows[last(r)])
1614
w1 = inner_index(windows,first(r))
17-
w2 = inner_index(windows,first(r))
15+
w2 = inner_index(windows,last(r))
1816
cr = DiskArrays.findchunk(cs,first(w1):last(w2))
1917
#check if start and end are on a chunk boundary
20-
first(cs[first(cr)])==first(ii) && last(cs[last(cr)])==last(ii)
18+
first(cs[first(cr)])==first(w1) && last(cs[last(cr)])==last(w2)
2119
end
2220
else
2321
false
@@ -40,6 +38,7 @@ function split_dim_reasons(op,lr,outars)
4038
for (spec,ar) in zip(op.outspecs,outars)
4139
foreach(1:ndims(lr)) do idim
4240
if is_output_chunk_overlap(spec,ar,idim,lr)
41+
@warn "Overlapping output chunks in dimension $idim"
4342
push!(ret[idim],:output_chunk)
4443
end
4544
if is_output_reducedim(spec,ar,idim)
@@ -53,7 +52,7 @@ reason_priority = Dict(
5352
:foldl => 1,
5453
:reducedim => 2,
5554
:output_chunk => 3,
56-
:overlapinputs =>4,
55+
:overlapinputs => 4,
5756
)
5857

5958
function get_procgroups(op, lr,outars)

src/executionplan.jl

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,6 @@ function optimize_loopranges(op::GMDWop,max_cache;tol_low=0.2,tol_high = 0.05,ma
278278
optprob = OptimizationFunction(compute_time, Optimization.AutoForwardDiff(), cons = all_constraints!)
279279
prob = OptimizationProblem(optprob, x0, chunkspecs, lcons = lb, ucons = ub)
280280
sol = solve(prob, OptimizationOptimJL.IPNewton())
281-
@show sol.u
282281
@debug "Optimized Loop sizes: ", sol.u
283282
lr = adjust_loopranges(op,sol.u;tol_low,tol_high,max_order)
284283
ExecutionPlan(input_chunkspecs, output_chunkspecs,(sol.u...,),totsize,sol.objective,lr)
@@ -316,7 +315,7 @@ function find_adjust_candidates(optires,smax,intsizes;reltol_low=0.2,reltol_high
316315
end
317316
if length(intsizes) > 1
318317
#Simply try with less input arrays, to at least align a few of them, this could be further optimized
319-
return find_adjust_candidates(optires,smax,Base.tail(intsizes);reltol_low, reltol_high,max_order)
318+
return find_adjust_candidates(optires,smax,Base.front(intsizes);reltol_low, reltol_high,max_order)
320319
end
321320
cand = round(Int,optires)//1
322321
is_possible_candidate(cand,smax,optires,reltol_low,reltol_high) && return cand

src/restart.jl

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
# Magic number written as the first UInt64 of a restart file; readers reject
# files that do not start with it ("Not a valid Restart file").
const K_RestartHeader = UInt64(19021983)
2+
# One-byte type tags written in front of each serialized loop-range item so the
# reader knows which kind of object follows.
const K_ProductArray = UInt8(0)
3+
const K_RegularChunks = UInt8(1)
4+
const K_IrregularChunks = UInt8(2)
5+
6+
"""
    Restarter{LRT,R}

Handle on a restart file.

# Fields
- `file`: path of the restart file.
- `remaining_loopranges`: either `nothing` or a `Vector{LRT}` of loop-range
  entries.
- `LRT`: the element type of the loop ranges, stored so that it is available as
  a type parameter.
"""
struct Restarter{LRT, R <: Union{Nothing, Vector{LRT}}}
    file::String
    remaining_loopranges::R
    LRT::Type{LRT}
end

"""
    ndims(r::Restarter)

Return the number of fields of the loop-range element type `LRT`.
"""
function Base.ndims(::Restarter{LRT}) where {LRT}
    return fieldcount(LRT)
end
12+
13+
# Without a restart file there is nothing to set up.
create_restarter(::Nothing, _, _) = nothing

"""
    create_restarter(filename, lr, restartmode)

Create a `Restarter` for the restart file `filename` and the loop ranges `lr`.

Returns `nothing` when `filename` is `nothing`. `restartmode` must be
`:continue` or `:overwrite`; anything else raises an error.

If the file already exists and `restartmode == :continue`, the loop ranges
stored in the file are compared with `lr` (an error is raised on mismatch) and
the entries already recorded in the file are removed from `lr` to form the
`remaining_loopranges` of the returned `Restarter`. When no entries have been
recorded yet, `remaining_loopranges` is `nothing`.

Otherwise the file is (re)written: the magic header, a `UInt64` byte offset at
which the recorded entries start, and the serialized loop ranges.
"""
function create_restarter(filename, lr, restartmode)
    isnothing(filename) && return nothing
    if !(restartmode in (:continue, :overwrite))
        error("Unknown restartmode")
    end
    LRT = eltype(lr)

    if !(isfile(filename) && restartmode == :continue)
        # Start a fresh restart file.
        open(filename, "w") do io
            write(io, K_RestartHeader)
            buf = IOBuffer()
            putitem(buf, lr)
            payload = take!(buf)
            # 16 = size of the two UInt64 header fields preceding the payload.
            write(io, UInt64(length(payload) + 16))
            write(io, payload)
        end
        return Restarter(filename, nothing, LRT)
    end

    # Continue from an existing restart file.
    restarter = Restarter(filename, nothing, LRT)
    stored = orig_loopranges(restarter)
    if stored != lr
        error("Loopranges in file do not match")
    end
    done = finished_entries(restarter)
    isempty(done) && return restarter
    return Restarter(filename, setdiff(lr, done), LRT)
end
43+
44+
"""
    putitem(f::IO, lr::ProductArray)

Serialize `lr` to `f`: the `K_ProductArray` tag, the number of members as a
`UInt64`, then every member via `putitem`.
"""
function putitem(f::IO, lr::ProductArray)
    members = lr.members
    write(f, K_ProductArray)
    write(f, UInt64(length(members)))
    for member in members
        putitem(f, member)
    end
end
51+
"""
    putitem(f::IO, g::DiskArrays.RegularChunks)

Serialize `g` to `f`: the `K_RegularChunks` tag followed by the fields `cs`,
`offset` and `s`, in that order.
"""
function putitem(f::IO, g::DiskArrays.RegularChunks)
    cs, offset, s = g.cs, g.offset, g.s
    write(f, K_RegularChunks)
    write(f, cs)
    write(f, offset)
    return write(f, s)
end
57+
"""
    putitem(f::IO, g::DiskArrays.IrregularChunks)

Serialize `g` to `f`: the `K_IrregularChunks` tag, the number of offsets as a
`UInt64`, then the offsets themselves.
"""
function putitem(f::IO, g::DiskArrays.IrregularChunks)
    offsets = g.offsets
    write(f, K_IrregularChunks)
    write(f, UInt64(length(offsets)))
    return write(f, offsets)
end
62+
"""
    putitem(f::IO, i::UnitRange{Int})

Write the first and the last element of `i` to `f` as two consecutive `Int`s.
"""
function putitem(f::IO, i::UnitRange{Int})
    lo, hi = first(i), last(i)
    write(f, lo)
    return write(f, hi)
end
66+
67+
"""
    orig_loopranges(r::Restarter)

Read back the loop ranges that were stored in the header of `r.file`.

Raises an error if the file does not start with `K_RestartHeader` or if the
stored item is not tagged as a `ProductArray`.
"""
function orig_loopranges(r::Restarter)
    return open(r.file, "r") do io
        magic = read(io, UInt64)
        magic == K_RestartHeader || error("Not a valid Restart file")
        # The second header field (offset of the recorded entries) is read to
        # advance the stream but is not needed here.
        read(io, UInt64)
        tag = read(io, UInt8)
        tag == K_ProductArray || error("Unknown Looprange type")
        nmembers = Int(read(io, UInt64))
        ProductArray(ntuple(_ -> readmember(io), nmembers))
    end
end
83+
84+
"""
    finished_entries(r::Restarter{LRT})

Return a `Vector{LRT}` with every entry recorded in `r.file` after the header.

Each entry consists of `fieldcount(LRT)` ranges, each stored as two `Int`s
(start and stop). Raises an error if the file does not start with
`K_RestartHeader`.
"""
function finished_entries(r::Restarter{LRT}) where {LRT}
    nranges = fieldcount(LRT)
    return open(r.file, "r") do io
        read(io, UInt64) == K_RestartHeader || error("Not a valid Restart file")
        # The second header field is the byte offset at which entries start.
        entries_start = read(io, UInt64)
        seek(io, entries_start)
        finished = LRT[]
        while !eof(io)
            entry = ntuple(nranges) do _
                lo = read(io, Int)
                hi = read(io, Int)
                lo:hi
            end
            push!(finished, entry)
        end
        finished
    end
end
100+
101+
"""
    readmember(f)

Read one tagged loop-range member from `f`.

The leading `UInt8` tag selects between `DiskArrays.RegularChunks` (three
`Int`s) and `DiskArrays.IrregularChunks` (a `UInt64` count followed by that many
`Int`s). Any other tag raises an error.
"""
function readmember(f)
    tag = read(f, UInt8)
    if tag == K_RegularChunks
        cs = read(f, Int)
        offs = read(f, Int)
        s = read(f, Int)
        return DiskArrays.RegularChunks(cs, offs, s)
    end
    if tag == K_IrregularChunks
        count = Int(read(f, UInt64))
        offsets = Vector{Int}(undef, count)
        read!(f, offsets)
        return DiskArrays.IrregularChunks(offsets)
    end
    error("Unknown membertype")
end
115+
116+
117+
118+
"""
    add_entry(r::Restarter, inow::Tuple)

Append the entry `inow` to `r.file` by writing each of its elements with
`putitem`.
"""
function add_entry(r::Restarter, inow::Tuple)
    open(r.file, "a") do io
        for rng in inow
            putitem(io, rng)
        end
    end
end

src/runner.jl

Lines changed: 54 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -121,34 +121,56 @@ struct LocalRunner
121121
inbuffers_pure
122122
outbuffers
123123
progress
124-
end
125-
function LocalRunner(op,exec_plan,outars=create_outars(op,exec_plan);threaded=true,showprogress=true)
124+
restarter
125+
end
126+
"""
    LocalRunner(op, exec_plan, outars=create_outars(op, exec_plan); kwargs...)

Build a `LocalRunner` for `op` from the execution plan `exec_plan`.

# Keywords
- `threaded=true`: stored in the runner and forwarded to `run_block`.
- `showprogress=true`: when `true`, a `Progress` meter with one step per loop
  range is created; otherwise no progress is reported.
- `restartfile=nothing`: path of a restart file, or `nothing` to disable
  restarting.
- `restartmode=:continue`: passed on to `create_restarter`.
"""
function LocalRunner(op, exec_plan,
    outars=create_outars(op, exec_plan);
    threaded=true,
    showprogress=true,
    restartfile=nothing,
    restartmode=:continue,
    )
    # Derive the loop ranges once and reuse them for buffers, progress and restart.
    loopranges = plan_to_loopranges(exec_plan)
    inbuffers_pure = generate_inbuffers(op.inars, loopranges)
    outbuffers = generate_outbuffers(op.outspecs, op.f, loopranges)
    pm = showprogress ? Progress(length(loopranges)) : nothing
    restarter = create_restarter(restartfile, loopranges, restartmode)
    return LocalRunner(
        op, loopranges, outars, threaded, inbuffers_pure, outbuffers, pm, restarter,
    )
end
132141

133142
# Progress reporting is disabled: nothing to do.
function update_progress!(::Nothing)
    return nothing
end

# Advance the progress meter by one step.
function update_progress!(pm)
    return next!(pm)
end
144+
# Decide whether the loop block `inow` still has to be processed.

# A `Restarter` delegates to its list of remaining loop ranges.
function need_run(inow, restarter::Restarter)
    return need_run(inow, restarter.remaining_loopranges)
end

# No restart information available: every block has to run.
function need_run(inow, ::Nothing)
    return true
end

# Otherwise only blocks contained in `remaining_loopranges` have to run.
function need_run(inow, remaining_loopranges)
    return inow in remaining_loopranges
end
147+
135148

136149
"""
    run_loop(runner::LocalRunner, loopranges=runner.loopranges; groupspecs=nothing)

Unpack the fields of `runner` and forward them, together with `loopranges` and
the runner's restarter, to the worker method of `run_loop`.
"""
function run_loop(runner::LocalRunner, loopranges = runner.loopranges; groupspecs = nothing)
    return run_loop(
        runner,
        runner.op,
        runner.inbuffers_pure,
        runner.outbuffers,
        runner.threaded,
        runner.outars,
        runner.progress,
        loopranges,
        runner.restarter;
        groupspecs = groupspecs,
    )
end
141154

142-
@noinline function run_loop(::LocalRunner,op,inbuffers_pure,outbuffers,threaded,outars,progress,loopranges;groupspecs=nothing)
143-
for inow in loopranges
144-
@debug "inow = ", inow
155+
"""
    default_loopbody(inow, re, op, inbuffers_pure, outbuffers, threaded, outars, progress)

Process the single loop block `inow`, unless `need_run(inow, re)` reports that
it can be skipped.

For a block that has to run: read the inputs, extract the output buffers, run
the block, write the buffers to `outars`, clean the aggregators, and finally
update the progress meter and the restarter `re`.
"""
function default_loopbody(inow, re, op, inbuffers_pure, outbuffers, threaded, outars, progress)
    @debug "inow = ", inow
    need_run(inow, re) || return nothing

    # Wrapped in a 1-tuple so that broadcasting treats `inow` as a scalar.
    block = (inow,)
    inbuffers_wrapped = read_range.(block, op.inars, inbuffers_pure)
    outbuffers_now = extract_outbuffer.(
        block, op.outspecs, op.f.init, op.f.buftype, outbuffers,
    )
    run_block(op, inow, inbuffers_wrapped, outbuffers_now, threaded)
    put_buffer.(block, outbuffers_now, outars, nothing)
    clean_aggregator.(outbuffers)
    update_progress!(progress)
    return update_restarter(re, inow)
end
167+
168+
# Worker method for `LocalRunner`: run `default_loopbody` for every element of
# `loopranges` in order, then signal completion to the progress meter and to the
# restarter `re`.
@noinline function run_loop(::LocalRunner, op, inbuffers_pure, outbuffers, threaded, outars, progress, loopranges, re; groupspecs = nothing)
    foreach(loopranges) do inow
        default_loopbody(inow, re, op, inbuffers_pure, outbuffers, threaded, outars, progress)
    end
    finish_progress(progress)
    return finish_restarter(re)
end
153175

154176
using Distributed
@@ -161,8 +183,9 @@ struct PMapRunner
161183
inbuffers_pure
162184
outbuffers
163185
progress_channel
186+
restarter
164187
end
165-
function PMapRunner(op,exec_plan,outars=create_outars(op,exec_plan);threaded=true,showprogress=true)
188+
function PMapRunner(op,exec_plan,outars=create_outars(op,exec_plan);threaded=true,showprogress=true,restartfile=nothing,restartmode=:continue)
166189
all(isnothing,op.f.red) || error("PMapRunner can not be used for reductions. Use DaggerRunner instead")
167190
loopranges = plan_to_loopranges(exec_plan)
168191
inbuffers_pure = generate_inbuffers(op.inars, loopranges)
@@ -174,30 +197,42 @@ function PMapRunner(op,exec_plan,outars=create_outars(op,exec_plan);threaded=tru
174197
next!(progress)
175198
end
176199
channel
200+
else
201+
nothing
177202
end
178-
PMapRunner(op,plan_to_loopranges(loopranges),outars, threaded, inbuffers_pure,outbuffers,progress_channel)
203+
restarter = create_restarter(restartfile,loopranges,restartmode)
204+
restart_channel = if isnothing(restarter)
205+
nothing
206+
else
207+
nd = ndims(restarter)
208+
channel = Distributed.RemoteChannel(()->Channel{Union{Nothing,NTuple{nd,Int}}}(), 1)
209+
@async while true
210+
update = take!(channel)
211+
isnothing(update) && break
212+
add_entry(restarter,update)
213+
end
214+
end
215+
PMapRunner(op,loopranges, outars, threaded, inbuffers_pure,outbuffers,progress_channel,restart_channel)
179216
end
180217

181218
# Report one finished step by putting `true` on the progress channel.
function update_progress!(pm::RemoteChannel)
    return put!(pm, true)
end
182-
219+
# Record the finished loop block `i`.

# Channel-backed restarter: put the entry on the channel.
function update_restarter(re::RemoteChannel, i)
    return put!(re, i)
end

# Restarting disabled: nothing to record.
function update_restarter(::Nothing, i)
    return nothing
end

# File-backed restarter: append the entry to the restart file.
function update_restarter(re::Restarter, i)
    return add_entry(re, i)
end
183222
# By default there is nothing to finalize for a progress object.
function finish_progress(::Any)
    return nothing
end

# A progress channel is told to stop by putting `false` on it.
function finish_progress(pm::RemoteChannel)
    return put!(pm, false)
end
224+
# By default there is nothing to finalize for a restarter.
function finish_restarter(::Any)
    return nothing
end

# A restart channel is told to stop by putting `nothing` on it.
function finish_restarter(re::RemoteChannel)
    return put!(re, nothing)
end
185226

186227
"""
    run_loop(runner::PMapRunner, loopranges=runner.loopranges; groupspecs=nothing)

Unpack the fields of `runner` and forward them, together with the runner's
restarter and `loopranges`, to the worker method of `run_loop`.
"""
function run_loop(runner::PMapRunner, loopranges = runner.loopranges; groupspecs = nothing)
    return run_loop(
        runner,
        runner.op,
        runner.inbuffers_pure,
        runner.outbuffers,
        runner.threaded,
        runner.outars,
        runner.progress_channel,
        runner.restarter,
        loopranges;
        groupspecs = groupspecs,
    )
end
191232

192-
@noinline function run_loop(::PMapRunner,op,inbuffers_pure,outbuffers,threaded,outars,progress,loopranges;groupspecs=nothing)
233+
# Worker method for `PMapRunner`: run `default_loopbody` for every element of
# `loopranges` via `pmap` on a `CachingPool` of all workers, then signal
# completion to the restarter and to the progress channel.
#
# `finish_restarter` has to be called here as well: a channel-backed restarter
# is only told to stop by the `nothing` that `finish_restarter` puts on it.
# It is called before `finish_progress` so that the return value of this
# method stays the result of `finish_progress`, as before.
@noinline function run_loop(::PMapRunner, op, inbuffers_pure, outbuffers, threaded, outars, progress, restarter, loopranges; groupspecs = nothing)
    pmap(CachingPool(workers()), loopranges) do inow
        default_loopbody(inow, restarter, op, inbuffers_pure, outbuffers, threaded, outars, progress)
    end
    finish_restarter(restarter)
    finish_progress(progress)
end
end

test/runtests.jl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,4 @@ include("test_auxtypes.jl")
66
include("test_optimise.jl")
77
include("test_buffers.jl")
88
include("test_distribute.jl")
9+
include("test_restart.jl")

0 commit comments

Comments
 (0)