5 # arv-run-pipeline-instance --template pipeline-template-uuid [options] [--] [parameters]
6 # arv-run-pipeline-instance --instance pipeline-instance-uuid [options]
8 # Satisfy a pipeline template by finding or submitting a mapreduce job
9 # for each pipeline component.
13 # [--template uuid] Use the specified pipeline template.
15 # [--template path] Load the pipeline template from the specified
18 # [--instance uuid] Use the specified pipeline instance.
20 # [-n, --dry-run] Do not start any new jobs or wait for existing jobs
21 # to finish. Just find out whether jobs are finished,
22 # queued, or running for each component
24 # [--submit] Do not try to satisfy any components. Just
25 # create an instance, print its UUID to
28 # [--no-wait] Make only as much progress as possible without entering
31 # [--no-reuse] Do not reuse existing jobs to satisfy pipeline
32 # components. Submit a new job for every component.
34 # [--debug] Print extra debugging information on stderr.
36 # [--debug-level N] Increase amount of debugging information. Default
37 # 1, possible range 0..3.
39 # [--status-text path] Print plain text status report to a file or
40 # fifo. Default: /dev/stdout
42 # [--status-json path] Print JSON status report to a file or
43 # fifo. Default: /dev/null
45 # [--description] Description for the pipeline instance.
49 # [param_name=param_value]
51 # [param_name param_value] Set (or override) the default value for
52 # every parameter with the given name.
54 # [component_name::param_name=param_value]
55 # [component_name::param_name param_value]
56 # [--component_name::param_name=param_value]
57 # [--component_name::param_name param_value] Set the value of a
58 # parameter for a single
61 class WhRunPipelineInstance
64 if RUBY_VERSION < '1.9.3' then
66 #{$0.gsub(/^\.\//,'')} requires Ruby version 1.9.3 or higher.
76 require 'google/api_client'
80 #{$0}: fatal: #{l.message}
81 Some runtime dependencies may be missing.
82 Try: gem install arvados pp google-api-client json trollop
86 def debuglog(message, verbosity=1)
87 $stderr.puts "#{File.split($0).last} #{$$}: #{message}" if $debuglevel >= verbosity
90 # Parse command line options (the kind that control the behavior of
91 # this program, that is, not the pipeline component parameters).
93 p = Trollop::Parser.new do
96 "Do not start any new jobs or wait for existing jobs to finish. Just find out whether jobs are finished, queued, or running for each component.",
100 "Store plain text status in given file.",
103 :default => '/dev/stdout')
105 "Store json-formatted pipeline in given file.",
108 :default => '/dev/null')
110 "Do not wait for jobs to finish. Just look up status, submit new jobs if needed, and exit.",
114 "Do not reuse existing jobs to satisfy pipeline components. Submit a new job for every component.",
118 "Print extra debugging information on stderr.",
121 "Set debug verbosity level.",
125 "UUID of pipeline template, or path to local pipeline template file.",
129 "UUID of pipeline instance.",
133 "Submit the pipeline instance to the server, and exit. Let the Crunch dispatch service satisfy the components by finding/running jobs.",
136 opt(:run_pipeline_here,
137 "Manage the pipeline instance in-process. Submit jobs to Crunch as needed. Do not exit until the pipeline finishes (or fails).",
141 "Run jobs in the local terminal session instead of submitting them to Crunch. Implies --run-pipeline-here. Note: this results in a significantly different job execution environment, and some Crunch features are not supported. It can be necessary to modify a pipeline in order to make it run this way.",
145 "Synonym for --run-jobs-here.",
149 "Description for the pipeline instance.",
153 "UUID of the project for the pipeline instance.",
158 $options = Trollop::with_standard_exception_handling p do
161 $debuglevel = $options[:debug_level] || ($options[:debug] && 1) || 0
163 $options[:run_jobs_here] ||= $options[:run_here] # old flag name
164 $options[:run_pipeline_here] ||= $options[:run_jobs_here] # B requires A
166 if $options[:instance]
167 if $options[:template] or $options[:submit]
168 abort "#{$0}: syntax error: --instance cannot be combined with --template or --submit."
170 elsif not $options[:template]
171 puts "error: you must supply a --template or --instance."
176 if $options[:run_pipeline_here] == $options[:submit]
177 abort "#{$0}: error: you must supply --run-pipeline-here, --run-jobs-here, or --submit."
180 # Set up the API client.
182 $arv = Arvados.new api_version: 'v1'
183 $client = $arv.client
184 $arvados = $arv.arvados_api
186 class PipelineInstance
188 result = $client.execute(:api_method => $arvados.pipeline_instances.get,
192 :authenticated => false,
194 authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
196 j = JSON.parse result.body, :symbolize_names => true
197 unless j.is_a? Hash and j[:uuid]
198 debuglog "Failed to get pipeline_instance: #{j[:errors] rescue nil}", 0
201 debuglog "Retrieved pipeline_instance #{j[:uuid]}"
205 def self.create(attributes)
206 result = $client.execute(:api_method => $arvados.pipeline_instances.create,
208 :pipeline_instance => attributes
210 :authenticated => false,
212 authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
214 j = JSON.parse result.body, :symbolize_names => true
215 unless j.is_a? Hash and j[:uuid]
216 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nFailed to create pipeline_instance: #{j[:errors] rescue nil} #{j.inspect}"
218 debuglog "Created pipeline instance: #{j[:uuid]}"
222 result = $client.execute(:api_method => $arvados.pipeline_instances.update,
227 :pipeline_instance => @attributes_to_update
229 :authenticated => false,
231 authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
233 j = JSON.parse result.body, :symbolize_names => true
234 unless j.is_a? Hash and j[:uuid]
235 debuglog "Failed to save pipeline_instance: #{j[:errors] rescue nil}", 0
238 @attributes_to_update = {}
243 @attributes_to_update[x] = y
251 $arv.log.create log: {
252 event_type: 'stderr',
253 object_uuid: self[:uuid],
254 owner_uuid: self[:owner_uuid],
255 properties: {"text" => msg},
261 @attributes_to_update = {}
269 result = $client.execute(:api_method => $arvados.jobs.get,
273 :authenticated => false,
275 authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
277 @cache[uuid] = JSON.parse result.body, :symbolize_names => true
279 def self.where(conditions)
280 result = $client.execute(:api_method => $arvados.jobs.list,
283 :where => conditions.to_json
285 :authenticated => false,
287 authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
289 list = JSON.parse result.body, :symbolize_names => true
290 if list and list[:items].is_a? Array
296 def self.create(pipeline, component, job, create_params)
299 body = {job: no_nil_values(job)}.merge(no_nil_values(create_params))
301 result = $client.execute(:api_method => $arvados.jobs.create,
302 :body_object => body,
303 :authenticated => false,
305 authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
307 j = JSON.parse result.body, :symbolize_names => true
308 if j.is_a? Hash and j[:uuid]
311 debuglog "create job: #{j[:errors] rescue nil} with attributes #{body}", 0
314 j[:errors].each do |err|
315 msg += "Error creating job for component #{component}: #{err}\n"
317 msg += "Job submission was: #{body.to_json}"
319 pipeline.log_stderr(msg)
326 def self.no_nil_values(hash)
327 hash.reject { |key, value| value.nil? }
331 class WhRunPipelineInstance
332 attr_reader :instance
334 def initialize(_options)
338 def fetch_template(template)
339 if template.match /[^-0-9a-z]/
340 # Doesn't look like a uuid -- use it as a filename.
341 @template = JSON.parse File.read(template), :symbolize_names => true
343 result = $client.execute(:api_method => $arvados.pipeline_templates.get,
347 :authenticated => false,
349 authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
351 @template = JSON.parse result.body, :symbolize_names => true
353 abort "#{$0}: fatal: failed to retrieve pipeline template #{template} #{@template[:errors].inspect rescue nil}"
359 def fetch_instance(instance_uuid)
360 @instance = PipelineInstance.find(instance_uuid)
361 @template = @instance
365 def apply_parameters(params_args)
366 params_args.shift if params_args[0] == '--'
368 while !params_args.empty?
369 if (re = params_args[0].match /^(--)?([^-].*?)=(.+)/)
370 params[re[2]] = re[3]
372 elsif params_args.size > 1
373 param = params_args.shift.sub /^--/, ''
374 params[param] = params_args.shift
376 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: I do not know what to do with arg \"#{params_args[0]}\""
380 if not @template[:components].is_a?(Hash)
381 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Template missing \"components\" hash"
383 @components = @template[:components].dup
385 bad_components = @components.each_pair.select do |cname, cspec|
386 not cspec.is_a?(Hash)
388 if bad_components.any?
389 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Components not specified with hashes: #{bad_components.map(&:first).join(', ')}"
392 bad_components = @components.each_pair.select do |cname, cspec|
393 not cspec[:script_parameters].is_a?(Hash)
395 if bad_components.any?
396 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Components missing \"script_parameters\" hashes: #{bad_components.map(&:first).join(', ')}"
400 @components.each do |componentname, component|
401 component[:script_parameters].each do |parametername, parameter|
402 parameter = { :value => parameter } unless parameter.is_a? Hash
404 (params["#{componentname}::#{parametername}"] ||
406 (parameter[:output_of].nil? &&
407 (params[parametername.to_s] ||
408 parameter[:default])) ||
411 ![false,'false',0,'0'].index parameter[:required]
412 if parameter[:output_of]
413 if not @components[parameter[:output_of].intern]
414 errors << [componentname, parametername, "output_of refers to nonexistent component '#{parameter[:output_of]}'"]
418 errors << [componentname, parametername, "required parameter is missing"]
420 debuglog "parameter #{componentname}::#{parametername} == #{value}"
422 component[:script_parameters][parametername] =
423 parameter.dup.merge(value: value)
427 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nErrors:\n#{errors.collect { |c,p,e| "#{c}::#{p} - #{e}\n" }.join ""}"
429 debuglog "options=" + @options.pretty_inspect
435 @instance[:properties][:run_options] ||= {}
436 if @options[:no_reuse]
437 # override properties of existing instance
438 @instance[:properties][:run_options][:enable_job_reuse] = false
440 # Default to "enable reuse" if not specified. (This code path
441 # can go away when old clients go away.)
442 if @instance[:properties][:run_options][:enable_job_reuse].nil?
443 @instance[:properties][:run_options][:enable_job_reuse] = true
447 description = $options[:description] ||
448 ("Created at #{Time.now.localtime}" + (@template[:name].andand.size.andand>0 ? " using the pipeline template *#{@template[:name]}*" : ""))
450 components: @components,
453 enable_job_reuse: !@options[:no_reuse]
456 pipeline_template_uuid: @template[:uuid],
457 description: description,
458 state: ($options[:submit] ? 'RunningOnServer' : 'RunningOnClient')
460 if @options[:project_uuid]
461 instance_body[:owner_uuid] = @options[:project_uuid]
463 @instance = PipelineInstance.create(instance_body)
472 if @instance[:started_at].nil?
473 @instance[:started_at] = Time.now
476 job_creation_failed = 0
479 @components.each do |cname, c|
481 owner_uuid = @instance[:owner_uuid]
482 # Is the job satisfying this component already known to be
483 # finished? (Already meaning "before we query API server about
484 # the job's current state")
485 c_already_finished = (c[:job] &&
487 ["Complete", "Failed", "Cancelled"].include?(c[:job][:state]))
489 c[:script_parameters].select { |pname, p| p.is_a? Hash and p[:output_of]}.empty?
490 # No job yet associated with this component and is component inputs
491 # are fully specified (any output_of script_parameters are resolved
493 my_submit_id = "instance #{@instance[:uuid]} rand #{rand(2**64).to_s(36)}"
494 job = JobCache.create(@instance, cname, {
495 :script => c[:script],
496 :script_parameters => Hash[c[:script_parameters].map do |key, spec|
499 :script_version => c[:script_version],
500 :repository => c[:repository],
501 :nondeterministic => c[:nondeterministic],
502 :runtime_constraints => c[:runtime_constraints],
503 :owner_uuid => owner_uuid,
504 :is_locked_by_uuid => (@options[:run_jobs_here] ? owner_uuid : nil),
505 :submit_id => my_submit_id,
506 :state => (if @options[:run_jobs_here] then "Running" else "Queued" end)
508 # This is the right place to put these attributes when
509 # dealing with new API servers.
510 :minimum_script_version => c[:minimum_script_version],
511 :exclude_script_versions => c[:exclude_minimum_script_versions],
512 :find_or_create => (@instance[:properties][:run_options].andand[:enable_job_reuse] &&
513 !c[:nondeterministic]),
514 :filters => c[:filters]
517 debuglog "component #{cname} new job #{job[:uuid]}"
519 c[:run_in_process] = (@options[:run_jobs_here] and
520 job[:submit_id] == my_submit_id)
522 debuglog "component #{cname} new job failed", 0
523 job_creation_failed += 1
527 if c[:job] and c[:run_in_process] and not ["Complete", "Failed", "Cancelled"].include? c[:job][:state]
531 Open3.popen3("arv-crunch-job", "--force-unlock",
532 "--job", c[:job][:uuid]) do |stdin, stdout, stderr, wait_thr|
533 debuglog "arv-crunch-job pid #{wait_thr.pid} started", 0
536 rready, wready, = IO.select([stdout, stderr], [])
539 buf = rready[0].read_nonblock(2**20)
543 (rready[0] == stdout ? $stdout : $stderr).write(buf)
547 debuglog "arv-crunch-job pid #{wait_thr.pid} exit #{wait_thr.value.to_i}", 0
549 if not $arv.job.get(uuid: c[:job][:uuid])[:finished_at]
550 raise Exception.new("arv-crunch-job did not set finished_at.")
552 rescue Exception => e
553 debuglog "Interrupted (#{e}). Failing job.", 0
554 $arv.job.update(uuid: c[:job][:uuid],
561 if c[:job] and c[:job][:uuid]
562 if ["Running", "Queued"].include?(c[:job][:state])
563 # Job is running (or may be soon) so update copy of job record
564 c[:job] = JobCache.get(c[:job][:uuid])
567 if c[:job][:state] == "Complete"
568 # Populate script_parameters of other components waiting for
570 @components.each do |c2name, c2|
571 c2[:script_parameters].each do |pname, p|
572 if p.is_a? Hash and p[:output_of] == cname.to_s
573 debuglog "parameter #{c2name}::#{pname} == #{c[:job][:output]}"
574 c2[:script_parameters][pname] = {value: c[:job][:output]}
579 unless c_already_finished
580 # This is my first time discovering that the job
581 # succeeded. (At the top of this loop, I was still
582 # waiting for it to finish.)
584 if @instance[:name].andand.length.andand > 0
585 pipeline_name = @instance[:name]
586 elsif @template.andand[:name].andand.length.andand > 0
587 pipeline_name = @template[:name]
589 pipeline_name = @instance[:uuid]
591 if c[:output_name] != false
592 # Create a collection located in the same project as the pipeline with the contents of the output.
593 portable_data_hash = c[:job][:output]
594 collections = $arv.collection.list(limit: 1,
595 filters: [['portable_data_hash', '=', portable_data_hash]],
596 select: ["portable_data_hash", "manifest_text"]
599 name = c[:output_name] || "Output #{portable_data_hash[0..7]} of #{cname} of #{pipeline_name}"
601 # check if there is a name collision.
602 name_collisions = $arv.collection.list(filters: [["owner_uuid", "=", owner_uuid],
603 ["name", "=", name]])[:items]
605 newcollection_actual = nil
606 if name_collisions.any? and name_collisions.first[:portable_data_hash] == portable_data_hash
607 # There is already a collection with the same name and the
608 # same contents, so just point to that.
609 newcollection_actual = name_collisions.first
612 if newcollection_actual.nil?
613 # Did not find a collection with the same name (or the
614 # collection has a different portable data hash) so create
615 # a new collection with ensure_unique_name: true.
617 owner_uuid: owner_uuid,
619 portable_data_hash: collections.first[:portable_data_hash],
620 manifest_text: collections.first[:manifest_text]
622 debuglog "Creating collection #{newcollection}", 0
623 newcollection_actual = $arv.collection.create collection: newcollection, ensure_unique_name: true
626 c[:output_uuid] = newcollection_actual[:uuid]
628 debuglog "Could not find a collection with portable data hash #{portable_data_hash}", 0
632 elsif ["Queued", "Running"].include? c[:job][:state]
633 # Job is running or queued to run, so indicate that pipeline
634 # should continue to run
636 elsif c[:job][:state] == "Cancelled"
637 debuglog "component #{cname} job #{c[:job][:uuid]} cancelled."
639 elsif c[:job][:state] == "Failed"
644 @instance[:components] = @components
647 if @options[:no_wait]
651 # If job creation fails, just give up on this pipeline instance.
652 if job_creation_failed > 0
660 debuglog "interrupt", 0
667 c_in_state = @components.values.group_by { |c|
668 c[:job] and c[:job][:state]
670 succeeded = c_in_state["Complete"].andand.count || 0
671 failed = (c_in_state["Failed"].andand.count || 0) + (c_in_state["Cancelled"].andand.count || 0)
672 ended = succeeded + failed
674 success = (succeeded == @components.length)
676 # A job create call failed. Just give up.
677 if job_creation_failed > 0
678 debuglog "job creation failed - giving up on this pipeline instance", 0
685 @instance[:state] = 'Complete'
687 @instance[:state] = 'Paused'
690 if ended == @components.length or failed > 0
691 @instance[:state] = success ? 'Complete' : 'Failed'
695 if @instance[:finished_at].nil? and ['Complete', 'Failed'].include? @instance[:state]
696 @instance[:finished_at] = Time.now
699 debuglog "pipeline instance state is #{@instance[:state]}"
701 # set components_summary
702 components_summary = {"todo" => @components.length - ended, "done" => succeeded, "failed" => failed}
703 @instance[:components_summary] = components_summary
709 if @instance and @instance[:state] == 'RunningOnClient'
710 @instance[:state] = 'Paused'
724 if @options[:status_json] != '/dev/null'
725 File.open(@options[:status_json], 'w') do |f|
726 f.puts @components.pretty_inspect
730 if @options[:status_text] != '/dev/null'
731 File.open(@options[:status_text], 'w') do |f|
733 f.puts "#{Time.now} -- pipeline_instance #{@instance[:uuid]}"
734 namewidth = @components.collect { |cname, c| cname.size }.max
735 @components.each do |cname, c|
736 jstatus = if !c[:job]
738 else case c[:job][:state]
740 "#{c[:job][:tasks_summary].inspect}"
744 "cancelled #{c[:job][:cancelled_at]}"
746 "failed #{c[:job][:finished_at]}"
748 "queued #{c[:job][:created_at]}"
751 f.puts "#{cname.to_s.ljust namewidth} #{c[:job] ? c[:job][:uuid] : '-'.ljust(27)} #{jstatus}"
759 if ["New", "Ready", "RunningOnClient",
760 "RunningOnServer"].include?(@instance[:state])
761 @instance[:state] = "Failed"
762 @instance[:finished_at] = Time.now
765 @instance.log_stderr(msg)
771 runner = WhRunPipelineInstance.new($options)
773 if $options[:template]
774 runner.fetch_template($options[:template])
776 runner.fetch_instance($options[:instance])
778 runner.apply_parameters(p.leftovers)
779 runner.setup_instance
782 puts runner.instance[:uuid]
786 rescue Exception => e