5 # arv-run-pipeline-instance --template pipeline-template-uuid [options] [--] [parameters]
6 # arv-run-pipeline-instance --instance pipeline-instance-uuid [options]
8 # Satisfy a pipeline template by finding or submitting a mapreduce job
9 # for each pipeline component.
13 # [--template uuid] Use the specified pipeline template.
15 # [--template path] Load the pipeline template from the specified
18 # [--instance uuid] Use the specified pipeline instance.
20 # [-n, --dry-run] Do not start any new jobs or wait for existing jobs
21 # to finish. Just find out whether jobs are finished,
22 # queued, or running for each component
24 # [--submit] Do not try to satisfy any components. Just
25 # create an instance, print its UUID to
28 # [--no-wait] Make only as much progress as possible without entering
31 # [--no-reuse] Do not reuse existing jobs to satisfy pipeline
32 # components. Submit a new job for every component.
34 # [--debug] Print extra debugging information on stderr.
36 # [--debug-level N] Increase amount of debugging information. Default
37 # 1, possible range 0..3.
39 # [--status-text path] Print plain text status report to a file or
40 # fifo. Default: /dev/stdout
42 # [--status-json path] Print JSON status report to a file or
43 # fifo. Default: /dev/null
47 # [param_name=param_value]
49 # [param_name param_value] Set (or override) the default value for
50 # every parameter with the given name.
52 # [component_name::param_name=param_value]
53 # [component_name::param_name param_value]
54 # [--component_name::param_name=param_value]
55 # [--component_name::param_name param_value] Set the value of a
56 # parameter for a single
59 class WhRunPipelineInstance
62 $application_version = 1.0
64 if RUBY_VERSION < '1.9.3' then
66 #{$0.gsub(/^\.\//,'')} requires Ruby version 1.9.3 or higher.
# API version defaults to 'v1' when ARVADOS_API_VERSION is absent.
$arvados_api_version = ENV.fetch('ARVADOS_API_VERSION', 'v1')

# Host and token are mandatory: stop immediately with a clear message if
# either environment variable is missing.
$arvados_api_host = ENV['ARVADOS_API_HOST']
unless $arvados_api_host
  abort "#{$0}: fatal: ARVADOS_API_HOST environment variable not set."
end

$arvados_api_token = ENV['ARVADOS_API_TOKEN']
unless $arvados_api_token
  abort "#{$0}: fatal: ARVADOS_API_TOKEN environment variable not set."
end
82 require 'google/api_client'
86 #{$0}: fatal: #{l.message}
87 Some runtime dependencies may be missing.
88 Try: gem install arvados pp google-api-client json trollop
92 def debuglog(message, verbosity=1)
93 $stderr.puts "#{File.split($0).last} #{$$}: #{message}" if $debuglevel >= verbosity
98 original_verbosity = $VERBOSE
101 $VERBOSE = original_verbosity
106 if $arvados_api_host.match /local/
107 # You probably don't care about SSL certificate checks if you're
108 # testing with a dev server.
109 suppress_warnings { OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE }
112 class Google::APIClient
113 def discovery_document(api, version)
115 return @discovery_documents["#{api}:#{version}"] ||=
117 response = self.execute!(
118 :http_method => :get,
119 :uri => self.discovery_uri(api, version),
120 :authenticated => false
122 response.body.class == String ? JSON.parse(response.body) : response.body
128 # Parse command line options (the kind that control the behavior of
129 # this program, that is, not the pipeline component parameters).
131 p = Trollop::Parser.new do
134 "Do not start any new jobs or wait for existing jobs to finish. Just find out whether jobs are finished, queued, or running for each component.",
138 "Store plain text status in given file.",
141 :default => '/dev/stdout')
143 "Store json-formatted pipeline in given file.",
146 :default => '/dev/null')
148 "Do not wait for jobs to finish. Just look up status, submit new jobs if needed, and exit.",
152 "Do not reuse existing jobs to satisfy pipeline components. Submit a new job for every component.",
156 "Print extra debugging information on stderr.",
159 "Set debug verbosity level.",
163 "UUID of pipeline template, or path to local pipeline template file.",
167 "UUID of pipeline instance.",
171 "Do not try to satisfy any components. Just create a pipeline instance and output its UUID.",
175 "Manage the pipeline in process.",
180 $options = Trollop::with_standard_exception_handling p do
183 $debuglevel = $options[:debug_level] || ($options[:debug] && 1) || 0
185 if $options[:instance]
186 if $options[:template] or $options[:submit]
187 abort "#{$0}: syntax error: --instance cannot be combined with --template or --submit."
189 elsif not $options[:template]
190 puts "error: you must supply a --template or --instance."
195 if $options[:run_here] == $options[:submit]
196 abort "#{$0}: syntax error: you must supply either --run-here or --submit."
199 # Suppress SSL certificate checks if ARVADOS_API_HOST_INSECURE
202 def suppress_warnings
203 original_verbosity = $VERBOSE
206 $VERBOSE = original_verbosity
211 if ENV['ARVADOS_API_HOST_INSECURE']
212 suppress_warnings { OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE }
# Set up the API clients.
#
# $client/$arvados are built against $arvados_api_version (from
# ARVADOS_API_VERSION, default 'v1'). Pass the same version to the Arvados
# SDK client rather than hard-coding 'v1'. Otherwise the two clients disagree
# whenever ARVADOS_API_VERSION is overridden.
$client ||= Google::APIClient.
  new(:host => $arvados_api_host,
      :application_name => File.split($0).last,
      :application_version => $application_version.to_s)
$arvados = $client.discovered_api('arvados', $arvados_api_version)
$arv = Arvados.new api_version: $arvados_api_version
225 class PipelineInstance
227 result = $client.execute(:api_method => $arvados.pipeline_instances.get,
231 :authenticated => false,
233 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
235 j = JSON.parse result.body, :symbolize_names => true
236 unless j.is_a? Hash and j[:uuid]
237 debuglog "Failed to get pipeline_instance: #{j[:errors] rescue nil}", 0
240 debuglog "Retrieved pipeline_instance #{j[:uuid]}"
244 def self.create(attributes)
245 result = $client.execute(:api_method => $arvados.pipeline_instances.create,
247 :pipeline_instance => attributes
249 :authenticated => false,
251 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
253 j = JSON.parse result.body, :symbolize_names => true
254 unless j.is_a? Hash and j[:uuid]
# This runs inside the class method PipelineInstance.create, where @template
# is the (unassigned, nil) class-level ivar. @template is only ever set on
# WhRunPipelineInstance instances, so reading @template[:uuid] here raised
# NoMethodError and hid this message. Take the template UUID from the
# attributes being submitted instead.
abort "\n#{Time.now} -- pipeline_template #{attributes[:pipeline_template_uuid]}\nFailed to create pipeline_instance: #{j[:errors] rescue nil} #{j.inspect}"
257 debuglog "Created pipeline instance: #{j[:uuid]}"
261 result = $client.execute(:api_method => $arvados.pipeline_instances.update,
266 :pipeline_instance => @attributes_to_update
268 :authenticated => false,
270 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
272 j = JSON.parse result.body, :symbolize_names => true
273 unless j.is_a? Hash and j[:uuid]
274 debuglog "Failed to save pipeline_instance: #{j[:errors] rescue nil}", 0
277 @attributes_to_update = {}
282 @attributes_to_update[x] = y
290 $arv.log.create log: {
291 event_type: 'stderr',
292 object_uuid: self[:uuid],
293 owner_uuid: self[:owner_uuid],
294 properties: {"text" => msg},
300 @attributes_to_update = {}
308 result = $client.execute(:api_method => $arvados.jobs.get,
312 :authenticated => false,
314 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
316 @cache[uuid] = JSON.parse result.body, :symbolize_names => true
318 def self.where(conditions)
319 result = $client.execute(:api_method => $arvados.jobs.list,
322 :where => conditions.to_json
324 :authenticated => false,
326 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
328 list = JSON.parse result.body, :symbolize_names => true
329 if list and list[:items].is_a? Array
335 def self.create(pipeline, component, job, create_params)
338 body = {job: no_nil_values(job)}.merge(no_nil_values(create_params))
340 result = $client.execute(:api_method => $arvados.jobs.create,
341 :body_object => body,
342 :authenticated => false,
344 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
346 j = JSON.parse result.body, :symbolize_names => true
347 if j.is_a? Hash and j[:uuid]
350 debuglog "create job: #{j[:errors] rescue nil} with attributes #{body}", 0
353 j[:errors].each do |err|
354 msg += "Error creating job for component #{component}: #{err}\n"
356 msg += "Job submission was: #{body.to_json}"
358 pipeline.log_stderr(msg)
365 def self.no_nil_values(hash)
366 hash.reject { |key, value| value.nil? }
370 class WhRunPipelineInstance
371 attr_reader :instance
373 def initialize(_options)
377 def fetch_template(template)
378 if template.match /[^-0-9a-z]/
379 # Doesn't look like a uuid -- use it as a filename.
380 @template = JSON.parse File.read(template), :symbolize_names => true
382 result = $client.execute(:api_method => $arvados.pipeline_templates.get,
386 :authenticated => false,
388 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
390 @template = JSON.parse result.body, :symbolize_names => true
392 abort "#{$0}: fatal: failed to retrieve pipeline template #{template} #{@template[:errors].inspect rescue nil}"
398 def fetch_instance(instance_uuid)
399 @instance = PipelineInstance.find(instance_uuid)
400 @template = @instance
404 def apply_parameters(params_args)
405 params_args.shift if params_args[0] == '--'
407 while !params_args.empty?
408 if (re = params_args[0].match /^(--)?([^-].*?)=(.+)/)
409 params[re[2]] = re[3]
411 elsif params_args.size > 1
412 param = params_args.shift.sub /^--/, ''
413 params[param] = params_args.shift
415 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: I do not know what to do with arg \"#{params_args[0]}\""
419 if not @template[:components].is_a?(Hash)
420 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Template missing \"components\" hash"
422 @components = @template[:components].dup
424 bad_components = @components.each_pair.select do |cname, cspec|
425 not cspec.is_a?(Hash)
427 if bad_components.any?
428 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Components not specified with hashes: #{bad_components.map(&:first).join(', ')}"
431 bad_components = @components.each_pair.select do |cname, cspec|
432 not cspec[:script_parameters].is_a?(Hash)
434 if bad_components.any?
435 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Components missing \"script_parameters\" hashes: #{bad_components.map(&:first).join(', ')}"
439 @components.each do |componentname, component|
440 component[:script_parameters].each do |parametername, parameter|
441 parameter = { :value => parameter } unless parameter.is_a? Hash
443 (params["#{componentname}::#{parametername}"] ||
445 (parameter[:output_of].nil? &&
446 (params[parametername.to_s] ||
447 parameter[:default])) ||
450 ![false,'false',0,'0'].index parameter[:required]
451 if parameter[:output_of]
454 errors << [componentname, parametername, "required parameter is missing"]
456 debuglog "parameter #{componentname}::#{parametername} == #{value}"
457 component[:script_parameters][parametername] = value
461 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nErrors:\n#{errors.collect { |c,p,e| "#{c}::#{p} - #{e}\n" }.join ""}"
463 debuglog "options=" + @options.pretty_inspect
469 @instance[:properties][:run_options] ||= {}
470 if @options[:no_reuse]
471 # override properties of existing instance
472 @instance[:properties][:run_options][:enable_job_reuse] = false
474 # Default to "enable reuse" if not specified. (This code path
475 # can go away when old clients go away.)
476 if @instance[:properties][:run_options][:enable_job_reuse].nil?
477 @instance[:properties][:run_options][:enable_job_reuse] = true
481 @instance = PipelineInstance.
482 create(components: @components,
485 enable_job_reuse: !@options[:no_reuse]
488 pipeline_template_uuid: @template[:uuid],
489 state: ($options[:submit] ? 'RunningOnServer' : 'RunningOnClient'))
498 job_creation_failed = 0
501 @components.each do |cname, c|
503 owner_uuid = @instance[:owner_uuid]
504 # Is the job satisfying this component already known to be
505 # finished? (Already meaning "before we query API server about
506 # the job's current state")
507 c_already_finished = (c[:job] &&
509 !c[:job][:success].nil?)
511 c[:script_parameters].select { |pname, p| p.is_a? Hash and p[:output_of]}.empty?
512 # No job yet associated with this component and is component inputs
513 # are fully specified (any output_of script_parameters are resolved
515 job = JobCache.create(@instance, cname, {
516 :script => c[:script],
517 :script_parameters => c[:script_parameters],
518 :script_version => c[:script_version],
519 :repository => c[:repository],
520 :nondeterministic => c[:nondeterministic],
521 :runtime_constraints => c[:runtime_constraints],
522 :owner_uuid => owner_uuid,
524 # This is the right place to put these attributes when
525 # dealing with new API servers.
526 :minimum_script_version => c[:minimum_script_version],
527 :exclude_script_versions => c[:exclude_minimum_script_versions],
528 :find_or_create => (@instance[:properties][:run_options].andand[:enable_job_reuse] &&
529 !c[:nondeterministic]),
530 :filters => c[:filters]
533 debuglog "component #{cname} new job #{job[:uuid]}"
536 debuglog "component #{cname} new job failed", 0
537 job_creation_failed += 1
541 if c[:job] and c[:job][:uuid]
542 if (c[:job][:running] or
543 not (c[:job][:finished_at] or c[:job][:cancelled_at]))
544 # Job is running so update copy of job record
545 c[:job] = JobCache.get(c[:job][:uuid])
549 # Populate script_parameters of other components waiting for
551 @components.each do |c2name, c2|
552 c2[:script_parameters].each do |pname, p|
553 if p.is_a? Hash and p[:output_of] == cname.to_s
554 debuglog "parameter #{c2name}::#{pname} == #{c[:job][:output]}"
555 c2[:script_parameters][pname] = c[:job][:output]
560 unless c_already_finished
561 # This is my first time discovering that the job
562 # succeeded. (At the top of this loop, I was still
563 # waiting for it to finish.)
565 debuglog "names: #{@instance[:name]} #{@template[:name]}", 0
566 if (not @instance[:name].nil?) and (not @instance[:name].empty?)
567 pipeline_name = @instance[:name]
569 fetch_template(@instance[:pipeline_template_uuid])
570 pipeline_name = @template[:name]
572 if c[:output_name] != false
573 # Create a collection located in the same project as the pipeline with the contents of the output.
574 portable_data_hash = c[:job][:output]
575 collections = $arv.collection.list(limit: 1,
576 filters: [['portable_data_hash', '=', portable_data_hash]],
577 select: ["portable_data_hash", "manifest_text"]
580 name = c[:output_name] || "Output #{portable_data_hash[0..7]} of #{cname} of #{pipeline_name}"
582 # check if there is a name collision.
583 name_collisions = $arv.collection.list(filters: [["owner_uuid", "=", owner_uuid],
584 ["name", "=", name]])[:items]
586 newcollection_actual = nil
587 if name_collisions.any? and name_collisions.first[:portable_data_hash] == portable_data_hash
588 # There is already a collection with the same name and the
589 # same contents, so just point to that.
590 newcollection_actual = name_collisions.first
593 if newcollection_actual.nil?
594 # Did not find a collection with the same name (or the
595 # collection has a different portable data hash) so create
596 # a new collection with ensure_unique_name: true.
598 owner_uuid: owner_uuid,
600 portable_data_hash: collections.first[:portable_data_hash],
601 manifest_text: collections.first[:manifest_text]
603 debuglog "Creating collection #{newcollection}", 0
604 newcollection_actual = $arv.collection.create collection: newcollection, ensure_unique_name: true
607 c[:output_uuid] = newcollection_actual[:uuid]
609 debuglog "Could not find a collection with portable data hash #{portable_data_hash}", 0
613 elsif c[:job][:running] ||
614 (!c[:job][:started_at] && !c[:job][:cancelled_at])
615 # Job is still running
617 elsif c[:job][:cancelled_at]
618 debuglog "component #{cname} job #{c[:job][:uuid]} cancelled."
622 @instance[:components] = @components
625 if @options[:no_wait]
629 # If job creation fails, just give up on this pipeline instance.
630 if job_creation_failed > 0
638 debuglog "interrupt", 0
648 @components.each do |cname, c|
650 if c[:job][:finished_at] or c[:job][:cancelled_at] or (c[:job][:running] == false and c[:job][:success] == false)
652 if c[:job][:success] == true
654 elsif c[:job][:success] == false or c[:job][:cancelled_at]
661 success = (succeeded == @components.length)
663 # A job create call failed. Just give up.
664 if job_creation_failed > 0
665 debuglog "job creation failed - giving up on this pipeline instance", 0
672 @instance[:state] = 'Complete'
674 @instance[:state] = 'Paused'
677 if ended == @components.length or failed > 0
678 @instance[:state] = success ? 'Complete' : 'Failed'
682 debuglog "pipeline instance state is #{@instance[:state]}"
684 # set components_summary
685 components_summary = {"todo" => @components.length - ended, "done" => succeeded, "failed" => failed}
686 @instance[:components_summary] = components_summary
692 if @instance and @instance[:state] == 'RunningOnClient'
693 @instance[:state] = 'Paused'
707 if @options[:status_json] != '/dev/null'
708 File.open(@options[:status_json], 'w') do |f|
# --status-json is documented as "Store json-formatted pipeline in given
# file". pretty_inspect produced Ruby inspect syntax (e.g. {:a=>1}), which
# JSON consumers cannot parse, so emit real JSON instead.
f.puts JSON.pretty_generate(@components)
713 if @options[:status_text] != '/dev/null'
714 File.open(@options[:status_text], 'w') do |f|
716 f.puts "#{Time.now} -- pipeline_instance #{@instance[:uuid]}"
717 namewidth = @components.collect { |cname, c| cname.size }.max
718 @components.each do |cname, c|
719 jstatus = if !c[:job]
721 elsif c[:job][:running]
722 "#{c[:job][:tasks_summary].inspect}"
723 elsif c[:job][:success]
725 elsif c[:job][:cancelled_at]
726 "cancelled #{c[:job][:cancelled_at]}"
727 elsif c[:job][:finished_at]
728 "failed #{c[:job][:finished_at]}"
729 elsif c[:job][:started_at]
730 "started #{c[:job][:started_at]}"
732 "queued #{c[:job][:created_at]}"
734 f.puts "#{cname.to_s.ljust namewidth} #{c[:job] ? c[:job][:uuid] : '-'.ljust(27)} #{jstatus}"
742 if ["New", "Ready", "RunningOnClient",
743 "RunningOnServer"].include?(@instance[:state])
744 @instance[:state] = "Failed"
747 @instance.log_stderr(msg)
753 runner = WhRunPipelineInstance.new($options)
755 if $options[:template]
756 runner.fetch_template($options[:template])
758 runner.fetch_instance($options[:instance])
760 runner.apply_parameters(p.leftovers)
761 runner.setup_instance
764 puts runner.instance[:uuid]
768 rescue Exception => e