#
# [--template uuid] Use the specified pipeline template.
#
+# [--template path] Load the pipeline template from the specified
+# local file.
+#
# [--instance uuid] Use the specified pipeline instance.
#
# [-n, --dry-run] Do not start any new jobs or wait for existing jobs
# to finish. Just find out whether jobs are finished,
# queued, or running for each component.
#
-# [--create-only] Do not try to satisfy any components. Just create an
-# instance, print its UUID to stdout, and exit.
+# [--create-instance-only] Do not try to satisfy any components. Just
+# create an instance, print its UUID to
+# stdout, and exit.
#
# [--no-wait] Make only as much progress as possible without entering
# a sleep/poll loop.
#
+# [--no-reuse-finished] Do not reuse existing outputs to satisfy
+# pipeline components. Always submit a new job
+# or use an existing job which has not yet
+# finished.
+#
+# [--no-reuse] Do not reuse existing jobs to satisfy pipeline
+# components. Submit a new job for every component.
+#
# [--debug] Print extra debugging information on stderr.
#
# [--debug-level N] Increase amount of debugging information. Default
"Do not wait for jobs to finish. Just look up status, submit new jobs if needed, and exit.",
:short => :none,
:type => :boolean)
+ opt(:no_reuse_finished,
+ "Do not reuse existing outputs to satisfy pipeline components. Always submit a new job or use an existing job which has not yet finished.",
+ :short => :none,
+ :type => :boolean)
+ opt(:no_reuse,
+ "Do not reuse existing jobs to satisfy pipeline components. Submit a new job for every component.",
+ :short => :none,
+ :type => :boolean)
opt(:debug,
"Print extra debugging information on stderr.",
:type => :boolean)
:short => :none,
:type => :integer)
opt(:template,
- "UUID of pipeline template.",
+ "UUID of pipeline template, or path to local pipeline template file.",
:short => :none,
:type => :string)
opt(:instance,
"UUID of pipeline instance.",
:short => :none,
:type => :string)
- opt(:create_only,
+ opt(:create_instance_only,
"Do not try to satisfy any components. Just create a pipeline instance and output its UUID.",
:short => :none,
:type => :boolean)
$debuglevel = $options[:debug_level] || ($options[:debug] && 1) || 0
if $options[:instance]
- if $options[:template] or $options[:create_only]
- abort "#{$0}: syntax error: --instance cannot be combined with --template or --create-only."
+ if $options[:template] or $options[:create_instance_only]
+ abort "#{$0}: syntax error: --instance cannot be combined with --template or --create-instance-only."
end
elsif not $options[:template]
abort "#{$0}: syntax error: you must supply a --template or --instance."
def self.find(uuid)
result = $client.execute(:api_method => $arvados.pipeline_instances.get,
:parameters => {
- :api_token => ENV['ARVADOS_API_TOKEN'],
:uuid => uuid
},
+ :body => {
+ :api_token => ENV['ARVADOS_API_TOKEN']
+ },
:authenticated => false)
j = JSON.parse result.body, :symbolize_names => true
unless j.is_a? Hash and j[:uuid]
end
def self.create(attributes)
result = $client.execute(:api_method => $arvados.pipeline_instances.create,
- :parameters => {
+ :body => {
:api_token => ENV['ARVADOS_API_TOKEN'],
- :pipeline_instance => attributes.to_json
+ :pipeline_instance => attributes
},
:authenticated => false)
j = JSON.parse result.body, :symbolize_names => true
def save
result = $client.execute(:api_method => $arvados.pipeline_instances.update,
:parameters => {
+ :uuid => @pi[:uuid]
+ },
+ :body => {
:api_token => ENV['ARVADOS_API_TOKEN'],
- :uuid => @pi[:uuid],
:pipeline_instance => @attributes_to_update.to_json
},
:authenticated => false)
if j.is_a? Hash and j[:uuid]
@cache[j[:uuid]] = j
else
- debuglog "create job: #{j[:errors] rescue nil}"
+ debuglog "create job: #{j[:errors] rescue nil}", 0
nil
end
end
@options = _options
end
- def fetch_template(template_uuid)
- result = $client.execute(:api_method => $arvados.pipeline_templates.get,
- :parameters => {
- :api_token => ENV['ARVADOS_API_TOKEN'],
- :uuid => template_uuid
- },
- :authenticated => false)
- @template = JSON.parse result.body, :symbolize_names => true
- if !@template[:uuid]
- abort "#{$0}: fatal: failed to retrieve pipeline template #{template_uuid} #{@template[:errors].inspect rescue nil}"
+ # Load a pipeline template into @template and return self.
+ #
+ # template - a string. If it contains any character other than
+ #            lowercase letters, digits and dashes it is treated as
+ #            the path of a local JSON file; otherwise it is treated
+ #            as a pipeline template UUID and fetched through the API.
+ #
+ # Aborts with a message on stderr when the local file cannot be read
+ # or parsed, when the loaded template has no "components" key, or
+ # when the API response does not carry a uuid.
+ def fetch_template(template)
+   if template =~ /[^-0-9a-z]/
+     # Doesn't look like a uuid -- use it as a filename.
+     begin
+       @template = JSON.parse File.read(template), :symbolize_names => true
+     rescue SystemCallError, IOError, JSON::ParserError => e
+       # Report a missing/unreadable/malformed file the same way other
+       # fatal conditions are reported, instead of a raw backtrace.
+       abort "#{$0}: fatal: failed to load pipeline template from #{template}: #{e.message}"
+     end
+     # Valid JSON need not be an object; check before indexing by symbol.
+     unless @template.is_a? Hash and @template[:components]
+       abort("#{$0}: Template loaded from #{template} " +
+             "does not have a \"components\" key")
+     end
+   else
+     result = $client.execute(:api_method => $arvados.pipeline_templates.get,
+                              :parameters => {
+                                :api_token => ENV['ARVADOS_API_TOKEN'],
+                                :uuid => template
+                              },
+                              :authenticated => false)
+     @template = JSON.parse result.body, :symbolize_names => true
+     # Same shape check as PipelineInstance.find: a usable response is
+     # a Hash carrying a uuid.
+     unless @template.is_a? Hash and @template[:uuid]
+       abort "#{$0}: fatal: failed to retrieve pipeline template #{template} #{@template[:errors].inspect rescue nil}"
+     end
+   end
    self
  end
params_args.shift if params_args[0] == '--'
params = {}
while !params_args.empty?
- if (re = params_args[0].match /^(--)?([^-].*?)=(.)/)
+ if (re = params_args[0].match /^(--)?([^-].*?)=(.+)/)
params[re[2]] = re[3]
params_args.shift
elsif params_args.size > 1
c.delete :wait
second_place_job = nil # satisfies component, but not finished yet
- JobCache.where(:script => c[:script],
- :script_parameters => c[:script_parameters],
- :script_version_descends_from => c[:script_version_descends_from]).
- each do |candidate_job|
+
+ (@options[:no_reuse] ? [] : JobCache.
+ where(script: c[:script],
+ script_parameters: c[:script_parameters],
+ script_version_descends_from: c[:script_version])
+ ).each do |candidate_job|
candidate_params_downcase = Hash[candidate_job[:script_parameters].
map { |k,v| [k.downcase,v] }]
c_params_downcase = Hash[c[:script_parameters].
next
end
+ if c[:script_version] !=
+ candidate_job[:script_version][0,c[:script_version].length]
+ debuglog "component #{cname} would be satisfied by job #{candidate_job[:uuid]} if script_version matched.", 2
+ next
+ end
+
unless candidate_job[:success] || candidate_job[:running] ||
(!candidate_job[:started_at] && !candidate_job[:cancelled_at])
debuglog "component #{cname} would be satisfied by job #{candidate_job[:uuid]} if it were running or successful.", 2
end
if candidate_job[:success]
- job = candidate_job
- debuglog "component #{cname} satisfied by job #{job[:uuid]} version #{job[:script_version]}"
- c[:job] = job
+ unless @options[:no_reuse_finished]
+ job = candidate_job
+ debuglog "component #{cname} satisfied by job #{job[:uuid]} version #{job[:script_version]}"
+ c[:job] = job
+ end
else
second_place_job ||= candidate_job
end
debuglog "component #{cname} new job."
job = JobCache.create(:script => c[:script],
:script_parameters => c[:script_parameters],
- :resource_limits => c[:resource_limits] || {},
+ :runtime_constraints => c[:runtime_constraints] || {},
:script_version => c[:script_version] || 'master')
if job
debuglog "component #{cname} new job #{job[:uuid]}"
c[:job] = job
else
- debuglog "component #{cname} new job failed: #{job[:errors]}"
+ debuglog "component #{cname} new job failed"
end
end
end
c[:wait] = true
end
if c[:job] and c[:job][:uuid]
- if not c[:job][:finished_at] and not c[:job][:cancelled_at]
+ if (c[:job][:running] or
+ not (c[:job][:finished_at] or c[:job][:cancelled_at]))
c[:job] = JobCache.get(c[:job][:uuid])
end
if c[:job][:success]
@instance[:components] = @components
@instance[:active] = moretodo
report_status
- sleep 10 if moretodo
+ if moretodo
+ begin
+ sleep 10
+ rescue Interrupt
+ debuglog "interrupt", 0
+ abort
+ end
+ end
end
@instance[:success] = @components.reject { |cname,c| c[:job] and c[:job][:success] }.empty?
@instance.save
end
runner.apply_parameters(p.leftovers)
runner.setup_instance
- if $options[:create_only]
+ if $options[:create_instance_only]
runner.instance.save
puts runner.instance[:uuid]
else