# [--no-wait] Make only as much progress as possible without entering
# a sleep/poll loop.
#
-# [--no-reuse-finished] Do not reuse existing outputs to satisfy
-# pipeline components. Always submit a new job
-# or use an existing job which has not yet
-# finished.
-#
# [--no-reuse] Do not reuse existing jobs to satisfy pipeline
# components. Submit a new job for every component.
#
"Do not wait for jobs to finish. Just look up status, submit new jobs if needed, and exit.",
:short => :none,
:type => :boolean)
- opt(:no_reuse_finished,
- "Do not reuse existing outputs to satisfy pipeline components. Always submit a new job or use an existing job which has not yet finished.",
- :short => :none,
- :type => :boolean)
opt(:no_reuse,
"Do not reuse existing jobs to satisfy pipeline components. Submit a new job for every component.",
:short => :none,
if j.is_a? Hash and j[:uuid]
@cache[j[:uuid]] = j
else
- debuglog "create job: #{j[:errors] rescue nil}", 0
+ debuglog "create job: #{j[:errors] rescue nil} with attribute #{attributes}", 0
nil
end
end
moretodo = false
@components.each do |cname, c|
job = nil
- if !c[:job] and
- c[:script_parameters].select { |pname, p| p.is_a? Hash }.empty?
- # Job is fully specified (all parameter values are present) but
- # no particular job has been found.
-
- debuglog "component #{cname} ready to satisfy."
-
- c.delete :wait
- second_place_job = nil # satisfies component, but not finished yet
-
- (@options[:no_reuse] ? [] : JobCache.
- where(script: c[:script],
- script_parameters: c[:script_parameters],
- script_version_descends_from: c[:script_version])
- ).each do |candidate_job|
- candidate_params_downcase = Hash[candidate_job[:script_parameters].
- map { |k,v| [k.downcase,v] }]
- c_params_downcase = Hash[c[:script_parameters].
- map { |k,v| [k.downcase,v] }]
-
- debuglog "component #{cname} considering job #{candidate_job[:uuid]} version #{candidate_job[:script_version]} parameters #{candidate_params_downcase.inspect}", 3
-
- unless candidate_params_downcase == c_params_downcase
- next
- end
-
- if c[:script_version] !=
- candidate_job[:script_version][0,c[:script_version].length]
- debuglog "component #{cname} would be satisfied by job #{candidate_job[:uuid]} if script_version matched.", 2
- next
- end
- unless candidate_job[:success] || candidate_job[:running] ||
- (!candidate_job[:started_at] && !candidate_job[:cancelled_at])
- debuglog "component #{cname} would be satisfied by job #{candidate_job[:uuid]} if it were running or successful.", 2
- next
- end
-
- if candidate_job[:success]
- unless @options[:no_reuse_finished]
- job = candidate_job
- $stderr.puts "using #{job[:uuid]} (finished at #{job[:finished_at]}) for component #{cname}"
- c[:job] = job
- end
- else
- second_place_job ||= candidate_job
- end
- break
- end
- if not c[:job] and second_place_job
- job = second_place_job
- $stderr.puts "using #{job[:uuid]} (running since #{job[:started_at]}) for component #{cname}"
+ if !c[:job] and
+ c[:script_parameters].select { |pname, p| p.is_a? Hash and p[:output_of]}.empty?
+ # No job yet associated with this component and is component inputs
+ # are fully specified (any output_of script_parameters are resolved
+ # to real value)
+ job = JobCache.create({:script => c[:script],
+ :script_parameters => c[:script_parameters],
+ :script_version => c[:script_version],
+ :minimum_script_version => c[:minimum_script_version],
+ :exclude_script_versions => c[:exclude_minimum_script_versions],
+ :nondeterministic => c[:nondeterministic],
+ :no_reuse => @options[:no_reuse]})
+ if job
+ debuglog "component #{cname} new job #{job[:uuid]}"
c[:job] = job
+ else
+ debuglog "component #{cname} new job failed"
end
- if not c[:job]
- debuglog "component #{cname} not satisfied by any existing job."
- if !@options[:dry_run]
- debuglog "component #{cname} new job."
- job = JobCache.create(:script => c[:script],
- :script_parameters => c[:script_parameters],
- :runtime_constraints => c[:runtime_constraints] || {},
- :script_version => c[:script_version] || 'master')
- if job
- debuglog "component #{cname} new job #{job[:uuid]}"
- c[:job] = job
- else
- debuglog "component #{cname} new job failed"
- end
- end
- end
- else
- c[:wait] = true
end
+
if c[:job] and c[:job][:uuid]
if (c[:job][:running] or
not (c[:job][:finished_at] or c[:job][:cancelled_at]))
- c[:job] = JobCache.get(c[:job][:uuid])
+ # Job is running so update copy of job record
+ c[:job] = JobCache.get(c[:job][:uuid])
end
+
if c[:job][:success]
# Populate script_parameters of other components waiting for
# this job
if p.is_a? Hash and p[:output_of] == cname.to_s
debuglog "parameter #{c2name}::#{pname} == #{c[:job][:output]}"
c2[:script_parameters][pname] = c[:job][:output]
+ moretodo = true
end
end
end
elsif c[:job][:running] ||
(!c[:job][:started_at] && !c[:job][:cancelled_at])
- moretodo ||= true
+ # Job is still running
+ moretodo = true
elsif c[:job][:cancelled_at]
debuglog "component #{cname} job #{c[:job][:uuid]} cancelled."
end
ended += 1
if c[:job][:success] == true
succeeded += 1
+ elsif c[:job][:success] == false
+ failed += 1
end
end
end
end
- if ended == @components.length
+ if ended == @components.length or failed > 0
@instance[:active] = false
@instance[:success] = (succeeded == @components.length)
end