# == Synopsis
#
-# wh-run-pipeline-instance --template pipeline-template-uuid [options] [--] [parameters]
-# wh-run-pipeline-instance --instance pipeline-instance-uuid [options]
+# arv-run-pipeline-instance --template pipeline-template-uuid [options] [--] [parameters]
+# arv-run-pipeline-instance --instance pipeline-instance-uuid [options]
#
# Satisfy a pipeline template by finding or submitting a mapreduce job
# for each pipeline component.
#
# [--template uuid] Use the specified pipeline template.
#
+# [--template path] Load the pipeline template from the specified
+# local file.
+#
# [--instance uuid] Use the specified pipeline instance.
#
# [-n, --dry-run] Do not start any new jobs or wait for existing jobs
# to finish. Just find out whether jobs are finished,
# queued, or running for each component
#
-# [--create-instance-only] Do not try to satisfy any components. Just
+# [--submit] Do not try to satisfy any components. Just
# create an instance, print its UUID to
# stdout, and exit.
#
# [--no-wait] Make only as much progress as possible without entering
# a sleep/poll loop.
#
-# [--no-reuse-finished] Do not reuse existing outputs to satisfy
-# pipeline components. Always submit a new job
-# or use an existing job which has not yet
-# finished.
-#
# [--no-reuse] Do not reuse existing jobs to satisfy pipeline
# components. Submit a new job for every component.
#
abort "#{$0}: fatal: ARVADOS_API_TOKEN environment variable not set."
begin
+ require 'arvados'
require 'rubygems'
- require 'google/api_client'
require 'json'
require 'pp'
require 'trollop'
-rescue LoadError
+ require 'google/api_client'
+rescue LoadError => l
+ puts $:
abort <<-EOS
-#{$0}: fatal: some runtime dependencies are missing.
-Try: gem install pp google-api-client json trollop
+#{$0}: fatal: #{l.message}
+Some runtime dependencies may be missing.
+Try: gem install arvados pp google-api-client json trollop
EOS
end
# this program, that is, not the pipeline component parameters).
p = Trollop::Parser.new do
+ version __FILE__
opt(:dry_run,
"Do not start any new jobs or wait for existing jobs to finish. Just find out whether jobs are finished, queued, or running for each component.",
:type => :boolean,
"Do not wait for jobs to finish. Just look up status, submit new jobs if needed, and exit.",
:short => :none,
:type => :boolean)
- opt(:no_reuse_finished,
- "Do not reuse existing outputs to satisfy pipeline components. Always submit a new job or use an existing job which has not yet finished.",
- :short => :none,
- :type => :boolean)
opt(:no_reuse,
"Do not reuse existing jobs to satisfy pipeline components. Submit a new job for every component.",
:short => :none,
:short => :none,
:type => :integer)
opt(:template,
- "UUID of pipeline template.",
+ "UUID of pipeline template, or path to local pipeline template file.",
:short => :none,
:type => :string)
opt(:instance,
"UUID of pipeline instance.",
:short => :none,
:type => :string)
- opt(:create_instance_only,
+ opt(:submit,
"Do not try to satisfy any components. Just create a pipeline instance and output its UUID.",
:short => :none,
:type => :boolean)
+ opt(:run_here,
+ "Manage the pipeline in process.",
+ :short => :none,
+ :type => :boolean)
stop_on [:'--']
end
$options = Trollop::with_standard_exception_handling p do
$debuglevel = $options[:debug_level] || ($options[:debug] && 1) || 0
if $options[:instance]
- if $options[:template] or $options[:create_instance_only]
- abort "#{$0}: syntax error: --instance cannot be combined with --template or --create-instance-only."
+ if $options[:template] or $options[:submit]
+ abort "#{$0}: syntax error: --instance cannot be combined with --template or --submit."
end
elsif not $options[:template]
- abort "#{$0}: syntax error: you must supply a --template or --instance."
+ puts "error: you must supply a --template or --instance."
+ p.educate
+ abort
+end
+
+if $options[:run_here] == $options[:submit]
+ abort "#{$0}: syntax error: you must supply either --run-here or --submit."
+end
+
+# Suppress SSL certificate checks if ARVADOS_API_HOST_INSECURE is set.
+
+module Kernel
+ # Run the given block with Ruby warnings silenced ($VERBOSE = nil) and
+ # return the block's result. The previous $VERBOSE value is restored in
+ # an ensure clause, so warnings come back even if the block raises.
+ def suppress_warnings
+ original_verbosity = $VERBOSE
+ $VERBOSE = nil
+ yield
+ ensure
+ $VERBOSE = original_verbosity
+ end
+end
+
+if ENV['ARVADOS_API_HOST_INSECURE']
+ suppress_warnings { OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE }
end
# Set up the API client.
:application_name => File.split($0).last,
:application_version => $application_version.to_s)
$arvados = $client.discovered_api('arvados', $arvados_api_version)
+$arv = Arvados.new api_version: 'v1'
class PipelineInstance
def self.find(uuid)
result = $client.execute(:api_method => $arvados.pipeline_instances.get,
:parameters => {
- :api_token => ENV['ARVADOS_API_TOKEN'],
:uuid => uuid
},
- :authenticated => false)
+ :authenticated => false,
+ :headers => {
+ authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
+ })
j = JSON.parse result.body, :symbolize_names => true
unless j.is_a? Hash and j[:uuid]
debuglog "Failed to get pipeline_instance: #{j[:errors] rescue nil}", 0
end
+ # Create a pipeline instance on the API server from the given attributes
+ # hash and return it wrapped in a PipelineInstance. Aborts the program if
+ # the response is not a Hash carrying a :uuid.
def self.create(attributes)
result = $client.execute(:api_method => $arvados.pipeline_instances.create,
- :parameters => {
- :api_token => ENV['ARVADOS_API_TOKEN'],
- :pipeline_instance => attributes.to_json
+ :body_object => {
+ :pipeline_instance => attributes
},
- :authenticated => false)
+ :authenticated => false,
+ :headers => {
+ authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
+ })
j = JSON.parse result.body, :symbolize_names => true
unless j.is_a? Hash and j[:uuid]
- abort "Failed to create pipeline_instance: #{j[:errors] rescue nil} #{j.inspect}"
+ # No @template is set in this class-method context, so the template
+ # uuid is taken from the attributes being submitted.
+ abort "\n#{Time.now} -- pipeline_template #{attributes[:pipeline_template_uuid]}\nFailed to create pipeline_instance: #{j[:errors] rescue nil} #{j.inspect}"
end
debuglog "Created pipeline instance: #{j[:uuid]}"
self.new(j)
def save
result = $client.execute(:api_method => $arvados.pipeline_instances.update,
:parameters => {
- :api_token => ENV['ARVADOS_API_TOKEN'],
- :uuid => @pi[:uuid],
- :pipeline_instance => @attributes_to_update.to_json
+ :uuid => @pi[:uuid]
+ },
+ :body_object => {
+ :pipeline_instance => @attributes_to_update
},
- :authenticated => false)
+ :authenticated => false,
+ :headers => {
+ authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
+ })
j = JSON.parse result.body, :symbolize_names => true
unless j.is_a? Hash and j[:uuid]
debuglog "Failed to save pipeline_instance: #{j[:errors] rescue nil}", 0
+ # Return the value stored under key x in the instance record (@pi).
def [](x)
@pi[x]
end
+
+ # Create a log record of event_type 'stderr' through the $arv client.
+ # The record points at this pipeline instance (object_uuid), copies the
+ # instance's owner_uuid, and stores msg under the "text" property.
+ def log_stderr(msg)
+ $arv.log.create log: {
+ event_type: 'stderr',
+ object_uuid: self[:uuid],
+ owner_uuid: self[:owner_uuid],
+ properties: {"text" => msg},
+ }
+ end
+
protected
def initialize(j)
@attributes_to_update = {}
@cache ||= {}
result = $client.execute(:api_method => $arvados.jobs.get,
:parameters => {
- :api_token => ENV['ARVADOS_API_TOKEN'],
:uuid => uuid
},
- :authenticated => false)
+ :authenticated => false,
+ :headers => {
+ authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
+ })
@cache[uuid] = JSON.parse result.body, :symbolize_names => true
end
def self.where(conditions)
result = $client.execute(:api_method => $arvados.jobs.list,
:parameters => {
- :api_token => ENV['ARVADOS_API_TOKEN'],
:limit => 10000,
:where => conditions.to_json
},
- :authenticated => false)
+ :authenticated => false,
+ :headers => {
+ authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
+ })
list = JSON.parse result.body, :symbolize_names => true
if list and list[:items].is_a? Array
list[:items]
[]
end
end
- def self.create(attributes)
+ # Submit a job-create request and cache the resulting job record.
+ #
+ # pipeline      - object responding to log_stderr(msg); receives the
+ #                 failure report when the job cannot be created.
+ # component     - component name, used only in the failure report.
+ # job           - hash of job attributes; nil values are dropped.
+ # create_params - hash of extra top-level request parameters; nil
+ #                 values are dropped.
+ #
+ # Returns the job record (a Hash with :uuid) on success, nil on failure.
+ def self.create(pipeline, component, job, create_params)
@cache ||= {}
+
+ body = {job: no_nil_values(job)}.merge(no_nil_values(create_params))
+
result = $client.execute(:api_method => $arvados.jobs.create,
- :parameters => {
- :api_token => ENV['ARVADOS_API_TOKEN'],
- :job => attributes.to_json
- },
- :authenticated => false)
+ :body_object => body,
+ :authenticated => false,
+ :headers => {
+ authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
+ })
j = JSON.parse result.body, :symbolize_names => true
if j.is_a? Hash and j[:uuid]
@cache[j[:uuid]] = j
else
- debuglog "create job: #{j[:errors] rescue nil}", 0
+ debuglog "create job: #{j[:errors] rescue nil} with attributes #{body}", 0
+
+ # The response may not be a Hash, or may carry no :errors list; treat
+ # both as "no error details" rather than raising while reporting.
+ errors = j.is_a?(Hash) ? Array(j[:errors]) : []
+ msg = ""
+ errors.each do |err|
+ msg += "Error creating job for component #{component}: #{err}\n"
+ end
+ msg += "Job submission was: #{body.to_json}"
+
+ pipeline.log_stderr(msg)
nil
end
end
+
+ protected
+
+ # Return a copy of hash without the entries whose value is nil.
+ # NOTE(review): a bare `protected` only affects instance methods defined
+ # after it; this `def self.` method remains public. Use
+ # `private_class_method :no_nil_values` if it should be hidden.
+ def self.no_nil_values(hash)
+ hash.reject { |key, value| value.nil? }
+ end
end
class WhRunPipelineInstance
@options = _options
end
- def fetch_template(template_uuid)
- result = $client.execute(:api_method => $arvados.pipeline_templates.get,
- :parameters => {
- :api_token => ENV['ARVADOS_API_TOKEN'],
- :uuid => template_uuid
- },
- :authenticated => false)
- @template = JSON.parse result.body, :symbolize_names => true
- if !@template[:uuid]
- abort "#{$0}: fatal: failed to retrieve pipeline template #{template_uuid} #{@template[:errors].inspect rescue nil}"
+ # Load a pipeline template into @template and return self.
+ #
+ # If `template` contains any character other than lowercase letters,
+ # digits or '-', it is read as a local JSON file. Otherwise it is used as
+ # a UUID and fetched from the API server; the program aborts when the
+ # response carries no :uuid.
+ def fetch_template(template)
+ if template.match /[^-0-9a-z]/
+ # Doesn't look like a uuid -- use it as a filename.
+ @template = JSON.parse File.read(template), :symbolize_names => true
+ else
+ result = $client.execute(:api_method => $arvados.pipeline_templates.get,
+ :parameters => {
+ :uuid => template
+ },
+ :authenticated => false,
+ :headers => {
+ authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
+ })
+ @template = JSON.parse result.body, :symbolize_names => true
+ if !@template[:uuid]
+ abort "#{$0}: fatal: failed to retrieve pipeline template #{template} #{@template[:errors].inspect rescue nil}"
+ end
end
self
end
param = params_args.shift.sub /^--/, ''
params[param] = params_args.shift
else
- abort "Syntax error: I do not know what to do with arg \"#{params_args[0]}\""
+ abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: I do not know what to do with arg \"#{params_args[0]}\""
end
end
+ if not @template[:components].is_a?(Hash)
+ abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Template missing \"components\" hash"
+ end
@components = @template[:components].dup
+ bad_components = @components.each_pair.select do |cname, cspec|
+ not cspec.is_a?(Hash)
+ end
+ if bad_components.any?
+ abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Components not specified with hashes: #{bad_components.map(&:first).join(', ')}"
+ end
+
+ bad_components = @components.each_pair.select do |cname, cspec|
+ not cspec[:script_parameters].is_a?(Hash)
+ end
+ if bad_components.any?
+ abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Components missing \"script_parameters\" hashes: #{bad_components.map(&:first).join(', ')}"
+ end
+
errors = []
@components.each do |componentname, component|
component[:script_parameters].each do |parametername, parameter|
end
end
if !errors.empty?
- abort "Errors:\n#{errors.collect { |c,p,e| "#{c}::#{p} - #{e}\n" }.join ""}"
+ abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nErrors:\n#{errors.collect { |c,p,e| "#{c}::#{p} - #{e}\n" }.join ""}"
end
debuglog "options=" + @options.pretty_inspect
self
end
+ # Make sure @instance is ready to run and return self.
+ #
+ # For an existing instance, set properties.run_options.enable_job_reuse:
+ # false when --no-reuse was given, otherwise true unless already set.
+ # With no instance yet, create one from @components and @template, with
+ # state 'RunningOnServer' for --submit and 'RunningOnClient' otherwise.
def setup_instance
- @instance ||= PipelineInstance.
- create(:components => @components,
- :pipeline_template_uuid => @template[:uuid],
- :active => true)
+ if @instance
+ @instance[:properties][:run_options] ||= {}
+ if @options[:no_reuse]
+ # override properties of existing instance
+ @instance[:properties][:run_options][:enable_job_reuse] = false
+ else
+ # Default to "enable reuse" if not specified. (This code path
+ # can go away when old clients go away.)
+ if @instance[:properties][:run_options][:enable_job_reuse].nil?
+ @instance[:properties][:run_options][:enable_job_reuse] = true
+ end
+ end
+ else
+ # Read the runner's own @options (not the $options global) so the
+ # whole method consults the same option set.
+ @instance = PipelineInstance.
+ create(components: @components,
+ properties: {
+ run_options: {
+ enable_job_reuse: !@options[:no_reuse]
+ }
+ },
+ pipeline_template_uuid: @template[:uuid],
+ state: (@options[:submit] ? 'RunningOnServer' : 'RunningOnClient'))
+ end
self
end
def run
moretodo = true
+ interrupted = false
+
+ if @instance[:started_at].nil?
+ @instance[:started_at] = Time.now
+ end
+
+ job_creation_failed = 0
while moretodo
moretodo = false
@components.each do |cname, c|
job = nil
+ owner_uuid = @instance[:owner_uuid]
+ # Is the job satisfying this component already known to be
+ # finished? (Already meaning "before we query API server about
+ # the job's current state")
+ c_already_finished = (c[:job] &&
+ c[:job][:uuid] &&
+ !c[:job][:success].nil?)
if !c[:job] and
- c[:script_parameters].select { |pname, p| p.is_a? Hash }.empty?
- # Job is fully specified (all parameter values are present) but
- # no particular job has been found.
-
- debuglog "component #{cname} ready to satisfy."
-
- c.delete :wait
- second_place_job = nil # satisfies component, but not finished yet
-
- (@options[:no_reuse] ? [] : JobCache.
- where(script: c[:script],
- script_parameters: c[:script_parameters],
- script_version_descends_from: c[:script_version_descends_from])
- ).each do |candidate_job|
- candidate_params_downcase = Hash[candidate_job[:script_parameters].
- map { |k,v| [k.downcase,v] }]
- c_params_downcase = Hash[c[:script_parameters].
- map { |k,v| [k.downcase,v] }]
-
- debuglog "component #{cname} considering job #{candidate_job[:uuid]} version #{candidate_job[:script_version]} parameters #{candidate_params_downcase.inspect}", 3
-
- unless candidate_params_downcase == c_params_downcase
- next
- end
-
- unless candidate_job[:success] || candidate_job[:running] ||
- (!candidate_job[:started_at] && !candidate_job[:cancelled_at])
- debuglog "component #{cname} would be satisfied by job #{candidate_job[:uuid]} if it were running or successful.", 2
- next
- end
-
- if candidate_job[:success]
- unless @options[:no_reuse_finished]
- job = candidate_job
- debuglog "component #{cname} satisfied by job #{job[:uuid]} version #{job[:script_version]}"
- c[:job] = job
- end
- else
- second_place_job ||= candidate_job
- end
- break
- end
- if not c[:job] and second_place_job
- job = second_place_job
- debuglog "component #{cname} satisfied by job #{job[:uuid]} version #{job[:script_version]}"
+ c[:script_parameters].select { |pname, p| p.is_a? Hash and p[:output_of]}.empty?
+ # No job is associated with this component yet, and the component's
+ # inputs are fully specified (any output_of script_parameters have
+ # been resolved to real values).
+ job = JobCache.create(@instance, cname, {
+ :script => c[:script],
+ :script_parameters => c[:script_parameters],
+ :script_version => c[:script_version],
+ :repository => c[:repository],
+ :nondeterministic => c[:nondeterministic],
+ :runtime_constraints => c[:runtime_constraints],
+ :owner_uuid => owner_uuid,
+ }, {
+ # This is the right place to put these attributes when
+ # dealing with new API servers.
+ :minimum_script_version => c[:minimum_script_version],
+ :exclude_script_versions => c[:exclude_minimum_script_versions],
+ :find_or_create => (@instance[:properties][:run_options].andand[:enable_job_reuse] &&
+ !c[:nondeterministic]),
+ :filters => c[:filters]
+ })
+ if job
+ debuglog "component #{cname} new job #{job[:uuid]}"
c[:job] = job
+ else
+ debuglog "component #{cname} new job failed", 0
+ job_creation_failed += 1
end
- if not c[:job]
- debuglog "component #{cname} not satisfied by any existing job."
- if !@options[:dry_run]
- debuglog "component #{cname} new job."
- job = JobCache.create(:script => c[:script],
- :script_parameters => c[:script_parameters],
- :resource_limits => c[:resource_limits] || {},
- :script_version => c[:script_version] || 'master')
- if job
- debuglog "component #{cname} new job #{job[:uuid]}"
- c[:job] = job
- else
- debuglog "component #{cname} new job failed"
- end
- end
- end
- else
- c[:wait] = true
end
+
if c[:job] and c[:job][:uuid]
- if not c[:job][:finished_at] and not c[:job][:cancelled_at]
+ if (c[:job][:running] or
+ not (c[:job][:finished_at] or c[:job][:cancelled_at]))
+ # Job is running, or has neither finished nor been cancelled, so refresh our copy of the job record
c[:job] = JobCache.get(c[:job][:uuid])
end
+
if c[:job][:success]
# Populate script_parameters of other components waiting for
# this job
if p.is_a? Hash and p[:output_of] == cname.to_s
debuglog "parameter #{c2name}::#{pname} == #{c[:job][:output]}"
c2[:script_parameters][pname] = c[:job][:output]
+ moretodo = true
+ end
+ end
+ end
+ unless c_already_finished
+ # This is my first time discovering that the job
+ # succeeded. (At the top of this loop, I was still
+ # waiting for it to finish.)
+
+ debuglog "names: #{@instance[:name]} #{@template[:name]}", 0
+ if (not @instance[:name].nil?) and (not @instance[:name].empty?)
+ pipeline_name = @instance[:name]
+ else
+ fetch_template(@instance[:pipeline_template_uuid])
+ pipeline_name = @template[:name]
+ end
+ if c[:output_name] != false
+ # Create a collection located in the same project as the pipeline with the contents of the output.
+ portable_data_hash = c[:job][:output]
+ collections = $arv.collection.list(limit: 1,
+ filters: [['portable_data_hash', '=', portable_data_hash]],
+ select: ["portable_data_hash", "manifest_text"]
+ )[:items]
+ if collections.any?
+ name = c[:output_name] || "Output #{portable_data_hash[0..7]} of #{cname} of #{pipeline_name}"
+
+ # check if there is a name collision.
+ name_collisions = $arv.collection.list(filters: [["owner_uuid", "=", owner_uuid],
+ ["name", "=", name]])[:items]
+
+ newcollection_actual = nil
+ if name_collisions.any? and name_collisions.first[:portable_data_hash] == portable_data_hash
+ # There is already a collection with the same name and the
+ # same contents, so just point to that.
+ newcollection_actual = name_collisions.first
+ end
+
+ if newcollection_actual.nil?
+ # Did not find a collection with the same name (or the
+ # collection has a different portable data hash) so create
+ # a new collection with ensure_unique_name: true.
+ newcollection = {
+ owner_uuid: owner_uuid,
+ name: name,
+ portable_data_hash: collections.first[:portable_data_hash],
+ manifest_text: collections.first[:manifest_text]
+ }
+ debuglog "Creating collection #{newcollection}", 0
+ newcollection_actual = $arv.collection.create collection: newcollection, ensure_unique_name: true
+ end
+
+ c[:output_uuid] = newcollection_actual[:uuid]
+ else
+ debuglog "Could not find a collection with portable data hash #{portable_data_hash}", 0
end
end
end
elsif c[:job][:running] ||
(!c[:job][:started_at] && !c[:job][:cancelled_at])
- moretodo ||= !@options[:no_wait]
+ # Job is still running, or has not started and has not been cancelled
+ moretodo = true
elsif c[:job][:cancelled_at]
debuglog "component #{cname} job #{c[:job][:uuid]} cancelled."
end
end
end
@instance[:components] = @components
- @instance[:active] = moretodo
report_status
+
+ if @options[:no_wait]
+ moretodo = false
+ end
+
+ # If job creation fails, just give up on this pipeline instance.
+ if job_creation_failed > 0
+ moretodo = false
+ end
+
if moretodo
begin
sleep 10
rescue Interrupt
debuglog "interrupt", 0
- abort
+ interrupted = true
+ break
+ end
+ end
+ end
+
+ ended = 0
+ succeeded = 0
+ failed = 0
+ @components.each do |cname, c|
+ if c[:job]
+ if c[:job][:finished_at] or c[:job][:cancelled_at] or (c[:job][:running] == false and c[:job][:success] == false)
+ ended += 1
+ if c[:job][:success] == true
+ succeeded += 1
+ elsif c[:job][:success] == false or c[:job][:cancelled_at]
+ failed += 1
+ end
end
end
end
- @instance[:success] = @components.reject { |cname,c| c[:job] and c[:job][:success] }.empty?
+
+ success = (succeeded == @components.length)
+
+ # A job create call failed. Just give up.
+ if job_creation_failed > 0
+ debuglog "job creation failed - giving up on this pipeline instance", 0
+ success = false
+ failed += 1
+ end
+
+ if interrupted
+ if success
+ @instance[:state] = 'Complete'
+ else
+ @instance[:state] = 'Paused'
+ end
+ else
+ if ended == @components.length or failed > 0
+ @instance[:state] = success ? 'Complete' : 'Failed'
+ end
+ end
+
+ if @instance[:finished_at].nil? and ['Complete', 'Failed'].include? @instance[:state]
+ @instance[:finished_at] = Time.now
+ end
+
+ debuglog "pipeline instance state is #{@instance[:state]}"
+
+ # set components_summary
+ components_summary = {"todo" => @components.length - ended, "done" => succeeded, "failed" => failed}
+ @instance[:components_summary] = components_summary
+
@instance.save
end
+ # If an instance exists and its state is still 'RunningOnClient', set the
+ # state to 'Paused' and save it; otherwise do nothing.
def cleanup
- if @instance
- @instance[:active] = false
+ if @instance and @instance[:state] == 'RunningOnClient'
+ @instance[:state] = 'Paused'
@instance.save
end
end
if @options[:status_text] != '/dev/null'
File.open(@options[:status_text], 'w') do |f|
+ f.puts ""
f.puts "#{Time.now} -- pipeline_instance #{@instance[:uuid]}"
namewidth = @components.collect { |cname, c| cname.size }.max
@components.each do |cname, c|
end
end
end
+
+ # Abort the program with msg, recording the failure on the pipeline
+ # instance first when one exists: an instance in state New, Ready,
+ # RunningOnClient or RunningOnServer is marked "Failed", stamped with
+ # finished_at and saved, and msg is written to the instance's stderr log
+ # in every case. The actual exit is delegated to Kernel::abort.
+ def abort(msg)
+ if @instance
+ if ["New", "Ready", "RunningOnClient",
+ "RunningOnServer"].include?(@instance[:state])
+ @instance[:state] = "Failed"
+ @instance[:finished_at] = Time.now
+ @instance.save
+ end
+ @instance.log_stderr(msg)
+ end
+ Kernel::abort(msg)
+ end
end
runner = WhRunPipelineInstance.new($options)
end
runner.apply_parameters(p.leftovers)
runner.setup_instance
- if $options[:create_instance_only]
+ if $options[:submit]
runner.instance.save
puts runner.instance[:uuid]
else