5 # arv-run-pipeline-instance --template pipeline-template-uuid [options] [--] [parameters]
6 # arv-run-pipeline-instance --instance pipeline-instance-uuid [options]
8 # Satisfy a pipeline template by finding or submitting a mapreduce job
9 # for each pipeline component.
13 # [--template uuid] Use the specified pipeline template.
15 # [--template path] Load the pipeline template from the specified
18 # [--instance uuid] Use the specified pipeline instance.
20 # [-n, --dry-run] Do not start any new jobs or wait for existing jobs
21 # to finish. Just find out whether jobs are finished,
22 # queued, or running for each component
24 # [--submit] Do not try to satisfy any components. Just
25 # create an instance, print its UUID to
28 # [--no-wait] Make only as much progress as possible without entering
31 # [--no-reuse] Do not reuse existing jobs to satisfy pipeline
32 # components. Submit a new job for every component.
34 # [--debug] Print extra debugging information on stderr.
36 # [--debug-level N] Increase amount of debugging information. Default
37 # 1, possible range 0..3.
39 # [--status-text path] Print plain text status report to a file or
40 # fifo. Default: /dev/stdout
42 # [--status-json path] Print JSON status report to a file or
43 # fifo. Default: /dev/null
47 # [param_name=param_value]
49 # [param_name param_value] Set (or override) the default value for
50 # every parameter with the given name.
52 # [component_name::param_name=param_value]
53 # [component_name::param_name param_value]
54 # [--component_name::param_name=param_value]
55 # [--component_name::param_name param_value] Set the value of a
56 # parameter for a single
59 class WhRunPipelineInstance
62 $application_version = 1.0
64 if RUBY_VERSION < '1.9.3' then
66 #{$0.gsub(/^\.\//,'')} requires Ruby version 1.9.3 or higher.
70 $arvados_api_version = ENV['ARVADOS_API_VERSION'] || 'v1'
71 $arvados_api_host = ENV['ARVADOS_API_HOST'] or
72 abort "#{$0}: fatal: ARVADOS_API_HOST environment variable not set."
73 $arvados_api_token = ENV['ARVADOS_API_TOKEN'] or
74 abort "#{$0}: fatal: ARVADOS_API_TOKEN environment variable not set."
82 require 'google/api_client'
86 #{$0}: fatal: #{l.message}
87 Some runtime dependencies may be missing.
88 Try: gem install arvados pp google-api-client json trollop
92 def debuglog(message, verbosity=1)
93 $stderr.puts "#{File.split($0).last} #{$$}: #{message}" if $debuglevel >= verbosity
98 original_verbosity = $VERBOSE
101 $VERBOSE = original_verbosity
106 if $arvados_api_host.match /local/
107 # You probably don't care about SSL certificate checks if you're
108 # testing with a dev server.
109 suppress_warnings { OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE }
112 class Google::APIClient
113 def discovery_document(api, version)
115 return @discovery_documents["#{api}:#{version}"] ||=
117 response = self.execute!(
118 :http_method => :get,
119 :uri => self.discovery_uri(api, version),
120 :authenticated => false
122 response.body.class == String ? JSON.parse(response.body) : response.body
128 # Parse command line options (the kind that control the behavior of
129 # this program, that is, not the pipeline component parameters).
131 p = Trollop::Parser.new do
134 "Do not start any new jobs or wait for existing jobs to finish. Just find out whether jobs are finished, queued, or running for each component.",
138 "Store plain text status in given file.",
141 :default => '/dev/stdout')
143 "Store json-formatted pipeline in given file.",
146 :default => '/dev/null')
148 "Do not wait for jobs to finish. Just look up status, submit new jobs if needed, and exit.",
152 "Do not reuse existing jobs to satisfy pipeline components. Submit a new job for every component.",
156 "Print extra debugging information on stderr.",
159 "Set debug verbosity level.",
163 "UUID of pipeline template, or path to local pipeline template file.",
167 "UUID of pipeline instance.",
171 "Do not try to satisfy any components. Just create a pipeline instance and output its UUID.",
175 "Manage the pipeline in process.",
180 $options = Trollop::with_standard_exception_handling p do
183 $debuglevel = $options[:debug_level] || ($options[:debug] && 1) || 0
185 if $options[:instance]
186 if $options[:template] or $options[:submit]
187 abort "#{$0}: syntax error: --instance cannot be combined with --template or --submit."
189 elsif not $options[:template]
190 puts "error: you must supply a --template or --instance."
195 if $options[:run_here] == $options[:submit]
196 abort "#{$0}: syntax error: you must supply either --run-here or --submit."
199 # Suppress SSL certificate checks if ARVADOS_API_HOST_INSECURE
202 def suppress_warnings
203 original_verbosity = $VERBOSE
206 $VERBOSE = original_verbosity
211 if ENV['ARVADOS_API_HOST_INSECURE']
212 suppress_warnings { OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE }
215 # Set up the API client.
217 $client ||= Google::APIClient.
218 new(:host => $arvados_api_host,
219 :application_name => File.split($0).last,
220 :application_version => $application_version.to_s)
221 $arvados = $client.discovered_api('arvados', $arvados_api_version)
222 $arv = Arvados.new api_version: 'v1'
225 class PipelineInstance
227 result = $client.execute(:api_method => $arvados.pipeline_instances.get,
231 :authenticated => false,
233 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
235 j = JSON.parse result.body, :symbolize_names => true
236 unless j.is_a? Hash and j[:uuid]
237 debuglog "Failed to get pipeline_instance: #{j[:errors] rescue nil}", 0
240 debuglog "Retrieved pipeline_instance #{j[:uuid]}"
244 def self.create(attributes)
245 result = $client.execute(:api_method => $arvados.pipeline_instances.create,
247 :pipeline_instance => attributes
249 :authenticated => false,
251 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
253 j = JSON.parse result.body, :symbolize_names => true
254 unless j.is_a? Hash and j[:uuid]
255 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nFailed to create pipeline_instance: #{j[:errors] rescue nil} #{j.inspect}"
257 debuglog "Created pipeline instance: #{j[:uuid]}"
261 result = $client.execute(:api_method => $arvados.pipeline_instances.update,
266 :pipeline_instance => @attributes_to_update
268 :authenticated => false,
270 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
272 j = JSON.parse result.body, :symbolize_names => true
273 unless j.is_a? Hash and j[:uuid]
274 debuglog "Failed to save pipeline_instance: #{j[:errors] rescue nil}", 0
277 @attributes_to_update = {}
282 @attributes_to_update[x] = y
290 $arv.log.create log: {
291 event_type: 'stderr',
292 object_uuid: self[:uuid],
293 owner_uuid: self[:owner_uuid],
294 properties: {"text" => msg},
300 @attributes_to_update = {}
308 result = $client.execute(:api_method => $arvados.jobs.get,
312 :authenticated => false,
314 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
316 @cache[uuid] = JSON.parse result.body, :symbolize_names => true
318 def self.where(conditions)
319 result = $client.execute(:api_method => $arvados.jobs.list,
322 :where => conditions.to_json
324 :authenticated => false,
326 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
328 list = JSON.parse result.body, :symbolize_names => true
329 if list and list[:items].is_a? Array
335 def self.create(pipeline, component, job, create_params)
338 body = {job: no_nil_values(job)}.merge(no_nil_values(create_params))
340 result = $client.execute(:api_method => $arvados.jobs.create,
341 :body_object => body,
342 :authenticated => false,
344 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
346 j = JSON.parse result.body, :symbolize_names => true
347 if j.is_a? Hash and j[:uuid]
350 debuglog "create job: #{j[:errors] rescue nil} with attributes #{body}", 0
353 j[:errors].each do |err|
354 msg += "Error creating job for component #{component}: #{err}\n"
356 msg += "Job submission was: #{body.to_json}"
358 pipeline.log_stderr(msg)
365 def self.no_nil_values(hash)
366 hash.reject { |key, value| value.nil? }
370 class WhRunPipelineInstance
371 attr_reader :instance
373 def initialize(_options)
377 def fetch_template(template)
378 if template.match /[^-0-9a-z]/
379 # Doesn't look like a uuid -- use it as a filename.
380 @template = JSON.parse File.read(template), :symbolize_names => true
382 result = $client.execute(:api_method => $arvados.pipeline_templates.get,
386 :authenticated => false,
388 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
390 @template = JSON.parse result.body, :symbolize_names => true
392 abort "#{$0}: fatal: failed to retrieve pipeline template #{template} #{@template[:errors].inspect rescue nil}"
398 def fetch_instance(instance_uuid)
399 @instance = PipelineInstance.find(instance_uuid)
400 @template = @instance
404 def apply_parameters(params_args)
405 params_args.shift if params_args[0] == '--'
407 while !params_args.empty?
408 if (re = params_args[0].match /^(--)?([^-].*?)=(.+)/)
409 params[re[2]] = re[3]
411 elsif params_args.size > 1
412 param = params_args.shift.sub /^--/, ''
413 params[param] = params_args.shift
415 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: I do not know what to do with arg \"#{params_args[0]}\""
419 if not @template[:components].is_a?(Hash)
420 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Template missing \"components\" hash"
422 @components = @template[:components].dup
424 bad_components = @components.each_pair.select do |cname, cspec|
425 not cspec.is_a?(Hash)
427 if bad_components.any?
428 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Components not specified with hashes: #{bad_components.map(&:first).join(', ')}"
431 bad_components = @components.each_pair.select do |cname, cspec|
432 not cspec[:script_parameters].is_a?(Hash)
434 if bad_components.any?
435 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Components missing \"script_parameters\" hashes: #{bad_components.map(&:first).join(', ')}"
439 @components.each do |componentname, component|
440 component[:script_parameters].each do |parametername, parameter|
441 parameter = { :value => parameter } unless parameter.is_a? Hash
443 (params["#{componentname}::#{parametername}"] ||
445 (parameter[:output_of].nil? &&
446 (params[parametername.to_s] ||
447 parameter[:default])) ||
450 ![false,'false',0,'0'].index parameter[:required]
451 if parameter[:output_of]
454 errors << [componentname, parametername, "required parameter is missing"]
456 debuglog "parameter #{componentname}::#{parametername} == #{value}"
457 component[:script_parameters][parametername] = value
461 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nErrors:\n#{errors.collect { |c,p,e| "#{c}::#{p} - #{e}\n" }.join ""}"
463 debuglog "options=" + @options.pretty_inspect
469 @instance[:properties][:run_options] ||= {}
470 if @options[:no_reuse]
471 # override properties of existing instance
472 @instance[:properties][:run_options][:enable_job_reuse] = false
474 # Default to "enable reuse" if not specified. (This code path
475 # can go away when old clients go away.)
476 if @instance[:properties][:run_options][:enable_job_reuse].nil?
477 @instance[:properties][:run_options][:enable_job_reuse] = true
481 @instance = PipelineInstance.
482 create(components: @components,
485 enable_job_reuse: !@options[:no_reuse]
488 pipeline_template_uuid: @template[:uuid],
489 description: "Created at #{Time.now.localtime}" + (@template[:name].andand.size>0 ? " using the pipeline template *#{@template[:name]}*" : ""),
490 state: ($options[:submit] ? 'RunningOnServer' : 'RunningOnClient'))
499 if @instance[:started_at].nil?
500 @instance[:started_at] = Time.now
503 job_creation_failed = 0
506 @components.each do |cname, c|
508 owner_uuid = @instance[:owner_uuid]
509 # Is the job satisfying this component already known to be
510 # finished? (Already meaning "before we query API server about
511 # the job's current state")
512 c_already_finished = (c[:job] &&
514 !c[:job][:success].nil?)
516 c[:script_parameters].select { |pname, p| p.is_a? Hash and p[:output_of]}.empty?
517 # No job yet associated with this component and is component inputs
518 # are fully specified (any output_of script_parameters are resolved
520 job = JobCache.create(@instance, cname, {
521 :script => c[:script],
522 :script_parameters => c[:script_parameters],
523 :script_version => c[:script_version],
524 :repository => c[:repository],
525 :nondeterministic => c[:nondeterministic],
526 :runtime_constraints => c[:runtime_constraints],
527 :owner_uuid => owner_uuid,
529 # This is the right place to put these attributes when
530 # dealing with new API servers.
531 :minimum_script_version => c[:minimum_script_version],
532 :exclude_script_versions => c[:exclude_minimum_script_versions],
533 :find_or_create => (@instance[:properties][:run_options].andand[:enable_job_reuse] &&
534 !c[:nondeterministic]),
535 :filters => c[:filters]
538 debuglog "component #{cname} new job #{job[:uuid]}"
541 debuglog "component #{cname} new job failed", 0
542 job_creation_failed += 1
546 if c[:job] and c[:job][:uuid]
547 if (c[:job][:running] or
548 not (c[:job][:finished_at] or c[:job][:cancelled_at]))
549 # Job is running so update copy of job record
550 c[:job] = JobCache.get(c[:job][:uuid])
554 # Populate script_parameters of other components waiting for
556 @components.each do |c2name, c2|
557 c2[:script_parameters].each do |pname, p|
558 if p.is_a? Hash and p[:output_of] == cname.to_s
559 debuglog "parameter #{c2name}::#{pname} == #{c[:job][:output]}"
560 c2[:script_parameters][pname] = c[:job][:output]
565 unless c_already_finished
566 # This is my first time discovering that the job
567 # succeeded. (At the top of this loop, I was still
568 # waiting for it to finish.)
570 debuglog "names: #{@instance[:name]} #{@template[:name]}", 0
571 if (not @instance[:name].nil?) and (not @instance[:name].empty?)
572 pipeline_name = @instance[:name]
574 fetch_template(@instance[:pipeline_template_uuid])
575 pipeline_name = @template[:name]
577 if c[:output_name] != false
578 # Create a collection located in the same project as the pipeline with the contents of the output.
579 portable_data_hash = c[:job][:output]
580 collections = $arv.collection.list(limit: 1,
581 filters: [['portable_data_hash', '=', portable_data_hash]],
582 select: ["portable_data_hash", "manifest_text"]
585 name = c[:output_name] || "Output #{portable_data_hash[0..7]} of #{cname} of #{pipeline_name}"
587 # check if there is a name collision.
588 name_collisions = $arv.collection.list(filters: [["owner_uuid", "=", owner_uuid],
589 ["name", "=", name]])[:items]
591 newcollection_actual = nil
592 if name_collisions.any? and name_collisions.first[:portable_data_hash] == portable_data_hash
593 # There is already a collection with the same name and the
594 # same contents, so just point to that.
595 newcollection_actual = name_collisions.first
598 if newcollection_actual.nil?
599 # Did not find a collection with the same name (or the
600 # collection has a different portable data hash) so create
601 # a new collection with ensure_unique_name: true.
603 owner_uuid: owner_uuid,
605 portable_data_hash: collections.first[:portable_data_hash],
606 manifest_text: collections.first[:manifest_text]
608 debuglog "Creating collection #{newcollection}", 0
609 newcollection_actual = $arv.collection.create collection: newcollection, ensure_unique_name: true
612 c[:output_uuid] = newcollection_actual[:uuid]
614 debuglog "Could not find a collection with portable data hash #{portable_data_hash}", 0
618 elsif c[:job][:running] ||
619 (!c[:job][:started_at] && !c[:job][:cancelled_at])
620 # Job is still running
622 elsif c[:job][:cancelled_at]
623 debuglog "component #{cname} job #{c[:job][:uuid]} cancelled."
627 @instance[:components] = @components
630 if @options[:no_wait]
634 # If job creation fails, just give up on this pipeline instance.
635 if job_creation_failed > 0
643 debuglog "interrupt", 0
653 @components.each do |cname, c|
655 if c[:job][:finished_at] or c[:job][:cancelled_at] or (c[:job][:running] == false and c[:job][:success] == false)
657 if c[:job][:success] == true
659 elsif c[:job][:success] == false or c[:job][:cancelled_at]
666 success = (succeeded == @components.length)
668 # A job create call failed. Just give up.
669 if job_creation_failed > 0
670 debuglog "job creation failed - giving up on this pipeline instance", 0
677 @instance[:state] = 'Complete'
679 @instance[:state] = 'Paused'
682 if ended == @components.length or failed > 0
683 @instance[:state] = success ? 'Complete' : 'Failed'
687 if @instance[:finished_at].nil? and ['Complete', 'Failed'].include? @instance[:state]
688 @instance[:finished_at] = Time.now
691 debuglog "pipeline instance state is #{@instance[:state]}"
693 # set components_summary
694 components_summary = {"todo" => @components.length - ended, "done" => succeeded, "failed" => failed}
695 @instance[:components_summary] = components_summary
701 if @instance and @instance[:state] == 'RunningOnClient'
702 @instance[:state] = 'Paused'
716 if @options[:status_json] != '/dev/null'
717 File.open(@options[:status_json], 'w') do |f|
718 f.puts @components.pretty_inspect
722 if @options[:status_text] != '/dev/null'
723 File.open(@options[:status_text], 'w') do |f|
725 f.puts "#{Time.now} -- pipeline_instance #{@instance[:uuid]}"
726 namewidth = @components.collect { |cname, c| cname.size }.max
727 @components.each do |cname, c|
728 jstatus = if !c[:job]
730 elsif c[:job][:running]
731 "#{c[:job][:tasks_summary].inspect}"
732 elsif c[:job][:success]
734 elsif c[:job][:cancelled_at]
735 "cancelled #{c[:job][:cancelled_at]}"
736 elsif c[:job][:finished_at]
737 "failed #{c[:job][:finished_at]}"
738 elsif c[:job][:started_at]
739 "started #{c[:job][:started_at]}"
741 "queued #{c[:job][:created_at]}"
743 f.puts "#{cname.to_s.ljust namewidth} #{c[:job] ? c[:job][:uuid] : '-'.ljust(27)} #{jstatus}"
751 if ["New", "Ready", "RunningOnClient",
752 "RunningOnServer"].include?(@instance[:state])
753 @instance[:state] = "Failed"
754 @instance[:finished_at] = Time.now
757 @instance.log_stderr(msg)
763 runner = WhRunPipelineInstance.new($options)
765 if $options[:template]
766 runner.fetch_template($options[:template])
768 runner.fetch_instance($options[:instance])
770 runner.apply_parameters(p.leftovers)
771 runner.setup_instance
774 puts runner.instance[:uuid]
778 rescue Exception => e