5 # arv-run-pipeline-instance --template pipeline-template-uuid [options] [--] [parameters]
6 # arv-run-pipeline-instance --instance pipeline-instance-uuid [options]
8 # Satisfy a pipeline template by finding or submitting a mapreduce job
9 # for each pipeline component.
13 # [--template uuid] Use the specified pipeline template.
15 # [--template path] Load the pipeline template from the specified
18 # [--instance uuid] Use the specified pipeline instance.
20 # [-n, --dry-run] Do not start any new jobs or wait for existing jobs
21 # to finish. Just find out whether jobs are finished,
22 # queued, or running for each component
24 # [--submit] Do not try to satisfy any components. Just
25 # create an instance, print its UUID to
28 # [--no-wait] Make only as much progress as possible without entering
31 # [--no-reuse] Do not reuse existing jobs to satisfy pipeline
32 # components. Submit a new job for every component.
34 # [--debug] Print extra debugging information on stderr.
36 # [--debug-level N] Increase amount of debugging information. Default
37 # 1, possible range 0..3.
39 # [--status-text path] Print plain text status report to a file or
40 # fifo. Default: /dev/stdout
42 # [--status-json path] Print JSON status report to a file or
43 # fifo. Default: /dev/null
45 # [--description] Description for the pipeline instance.
49 # [param_name=param_value]
51 # [param_name param_value] Set (or override) the default value for
52 # every parameter with the given name.
54 # [component_name::param_name=param_value]
55 # [component_name::param_name param_value]
56 # [--component_name::param_name=param_value]
57 # [--component_name::param_name param_value] Set the value of a
58 # parameter for a single
61 class WhRunPipelineInstance
64 if RUBY_VERSION < '1.9.3' then
66 #{$0.gsub(/^\.\//,'')} requires Ruby version 1.9.3 or higher.
76 require 'google/api_client'
80 #{$0}: fatal: #{l.message}
81 Some runtime dependencies may be missing.
82 Try: gem install arvados pp google-api-client json trollop
86 def debuglog(message, verbosity=1)
87 $stderr.puts "#{File.split($0).last} #{$$}: #{message}" if $debuglevel >= verbosity
90 # Parse command line options (the kind that control the behavior of
91 # this program, that is, not the pipeline component parameters).
93 p = Trollop::Parser.new do
96 "Do not start any new jobs or wait for existing jobs to finish. Just find out whether jobs are finished, queued, or running for each component.",
100 "Store plain text status in given file.",
103 :default => '/dev/stdout')
105 "Store json-formatted pipeline in given file.",
108 :default => '/dev/null')
110 "Do not wait for jobs to finish. Just look up status, submit new jobs if needed, and exit.",
114 "Do not reuse existing jobs to satisfy pipeline components. Submit a new job for every component.",
118 "Print extra debugging information on stderr.",
121 "Set debug verbosity level.",
125 "UUID of pipeline template, or path to local pipeline template file.",
129 "UUID of pipeline instance.",
133 "Submit the pipeline instance to the server, and exit. Let the Crunch dispatch service satisfy the components by finding/running jobs.",
136 opt(:run_pipeline_here,
137 "Manage the pipeline instance in-process. Submit jobs to Crunch as needed. Do not exit until the pipeline finishes (or fails).",
141 "Run jobs in the local terminal session instead of submitting them to Crunch. Implies --run-pipeline-here. Note: this results in a significantly different job execution environment, and some Crunch features are not supported. It can be necessary to modify a pipeline in order to make it run this way.",
145 "Synonym for --run-jobs-here.",
149 "Description for the pipeline instance.",
154 $options = Trollop::with_standard_exception_handling p do
157 $debuglevel = $options[:debug_level] || ($options[:debug] && 1) || 0
159 $options[:run_jobs_here] ||= $options[:run_here] # old flag name
160 $options[:run_pipeline_here] ||= $options[:run_jobs_here] # B requires A
162 if $options[:instance]
163 if $options[:template] or $options[:submit]
164 abort "#{$0}: syntax error: --instance cannot be combined with --template or --submit."
166 elsif not $options[:template]
167 puts "error: you must supply a --template or --instance."
172 if $options[:run_pipeline_here] == $options[:submit]
173 abort "#{$0}: error: you must supply --run-pipeline-here, --run-jobs-here, or --submit."
176 # Set up the API client.
178 $arv = Arvados.new api_version: 'v1'
179 $client = $arv.client
180 $arvados = $arv.arvados_api
182 class PipelineInstance
184 result = $client.execute(:api_method => $arvados.pipeline_instances.get,
188 :authenticated => false,
190 authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
192 j = JSON.parse result.body, :symbolize_names => true
193 unless j.is_a? Hash and j[:uuid]
194 debuglog "Failed to get pipeline_instance: #{j[:errors] rescue nil}", 0
197 debuglog "Retrieved pipeline_instance #{j[:uuid]}"
201 def self.create(attributes)
202 result = $client.execute(:api_method => $arvados.pipeline_instances.create,
204 :pipeline_instance => attributes
206 :authenticated => false,
208 authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
210 j = JSON.parse result.body, :symbolize_names => true
211 unless j.is_a? Hash and j[:uuid]
212 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nFailed to create pipeline_instance: #{j[:errors] rescue nil} #{j.inspect}"
214 debuglog "Created pipeline instance: #{j[:uuid]}"
218 result = $client.execute(:api_method => $arvados.pipeline_instances.update,
223 :pipeline_instance => @attributes_to_update
225 :authenticated => false,
227 authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
229 j = JSON.parse result.body, :symbolize_names => true
230 unless j.is_a? Hash and j[:uuid]
231 debuglog "Failed to save pipeline_instance: #{j[:errors] rescue nil}", 0
234 @attributes_to_update = {}
239 @attributes_to_update[x] = y
247 $arv.log.create log: {
248 event_type: 'stderr',
249 object_uuid: self[:uuid],
250 owner_uuid: self[:owner_uuid],
251 properties: {"text" => msg},
257 @attributes_to_update = {}
265 result = $client.execute(:api_method => $arvados.jobs.get,
269 :authenticated => false,
271 authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
273 @cache[uuid] = JSON.parse result.body, :symbolize_names => true
275 def self.where(conditions)
276 result = $client.execute(:api_method => $arvados.jobs.list,
279 :where => conditions.to_json
281 :authenticated => false,
283 authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
285 list = JSON.parse result.body, :symbolize_names => true
286 if list and list[:items].is_a? Array
292 def self.create(pipeline, component, job, create_params)
295 body = {job: no_nil_values(job)}.merge(no_nil_values(create_params))
297 result = $client.execute(:api_method => $arvados.jobs.create,
298 :body_object => body,
299 :authenticated => false,
301 authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
303 j = JSON.parse result.body, :symbolize_names => true
304 if j.is_a? Hash and j[:uuid]
307 debuglog "create job: #{j[:errors] rescue nil} with attributes #{body}", 0
310 j[:errors].each do |err|
311 msg += "Error creating job for component #{component}: #{err}\n"
313 msg += "Job submission was: #{body.to_json}"
315 pipeline.log_stderr(msg)
322 def self.no_nil_values(hash)
323 hash.reject { |key, value| value.nil? }
327 class WhRunPipelineInstance
328 attr_reader :instance
330 def initialize(_options)
334 def fetch_template(template)
335 if template.match /[^-0-9a-z]/
336 # Doesn't look like a uuid -- use it as a filename.
337 @template = JSON.parse File.read(template), :symbolize_names => true
339 result = $client.execute(:api_method => $arvados.pipeline_templates.get,
343 :authenticated => false,
345 authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
347 @template = JSON.parse result.body, :symbolize_names => true
349 abort "#{$0}: fatal: failed to retrieve pipeline template #{template} #{@template[:errors].inspect rescue nil}"
355 def fetch_instance(instance_uuid)
356 @instance = PipelineInstance.find(instance_uuid)
357 @template = @instance
361 def apply_parameters(params_args)
362 params_args.shift if params_args[0] == '--'
364 while !params_args.empty?
365 if (re = params_args[0].match /^(--)?([^-].*?)=(.+)/)
366 params[re[2]] = re[3]
368 elsif params_args.size > 1
369 param = params_args.shift.sub /^--/, ''
370 params[param] = params_args.shift
372 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: I do not know what to do with arg \"#{params_args[0]}\""
376 if not @template[:components].is_a?(Hash)
377 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Template missing \"components\" hash"
379 @components = @template[:components].dup
381 bad_components = @components.each_pair.select do |cname, cspec|
382 not cspec.is_a?(Hash)
384 if bad_components.any?
385 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Components not specified with hashes: #{bad_components.map(&:first).join(', ')}"
388 bad_components = @components.each_pair.select do |cname, cspec|
389 not cspec[:script_parameters].is_a?(Hash)
391 if bad_components.any?
392 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Components missing \"script_parameters\" hashes: #{bad_components.map(&:first).join(', ')}"
396 @components.each do |componentname, component|
397 component[:script_parameters].each do |parametername, parameter|
398 parameter = { :value => parameter } unless parameter.is_a? Hash
400 (params["#{componentname}::#{parametername}"] ||
402 (parameter[:output_of].nil? &&
403 (params[parametername.to_s] ||
404 parameter[:default])) ||
407 ![false,'false',0,'0'].index parameter[:required]
408 if parameter[:output_of]
409 if not @components[parameter[:output_of].intern]
410 errors << [componentname, parametername, "output_of refers to nonexistent component '#{parameter[:output_of]}'"]
414 errors << [componentname, parametername, "required parameter is missing"]
416 debuglog "parameter #{componentname}::#{parametername} == #{value}"
418 component[:script_parameters][parametername] =
419 parameter.dup.merge(value: value)
423 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nErrors:\n#{errors.collect { |c,p,e| "#{c}::#{p} - #{e}\n" }.join ""}"
425 debuglog "options=" + @options.pretty_inspect
431 @instance[:properties][:run_options] ||= {}
432 if @options[:no_reuse]
433 # override properties of existing instance
434 @instance[:properties][:run_options][:enable_job_reuse] = false
436 # Default to "enable reuse" if not specified. (This code path
437 # can go away when old clients go away.)
438 if @instance[:properties][:run_options][:enable_job_reuse].nil?
439 @instance[:properties][:run_options][:enable_job_reuse] = true
443 description = $options[:description]
444 description = ("Created at #{Time.now.localtime}" + (@template[:name].andand.size.andand>0 ? " using the pipeline template *#{@template[:name]}*" : "")) if !description
445 @instance = PipelineInstance.
446 create(components: @components,
449 enable_job_reuse: !@options[:no_reuse]
452 pipeline_template_uuid: @template[:uuid],
453 description: description,
454 state: ($options[:submit] ? 'RunningOnServer' : 'RunningOnClient'))
463 if @instance[:started_at].nil?
464 @instance[:started_at] = Time.now
467 job_creation_failed = 0
470 @components.each do |cname, c|
472 owner_uuid = @instance[:owner_uuid]
473 # Is the job satisfying this component already known to be
474 # finished? (Already meaning "before we query API server about
475 # the job's current state")
476 c_already_finished = (c[:job] &&
478 ["Complete", "Failed", "Cancelled"].include?(c[:job][:state]))
480 c[:script_parameters].select { |pname, p| p.is_a? Hash and p[:output_of]}.empty?
481 # No job yet is associated with this component, and its inputs
482 # are fully specified (any output_of script_parameters are resolved
484 my_submit_id = "instance #{@instance[:uuid]} rand #{rand(2**64).to_s(36)}"
485 job = JobCache.create(@instance, cname, {
486 :script => c[:script],
487 :script_parameters => Hash[c[:script_parameters].map do |key, spec|
490 :script_version => c[:script_version],
491 :repository => c[:repository],
492 :nondeterministic => c[:nondeterministic],
493 :runtime_constraints => c[:runtime_constraints],
494 :owner_uuid => owner_uuid,
495 :is_locked_by_uuid => (@options[:run_jobs_here] ? owner_uuid : nil),
496 :submit_id => my_submit_id,
497 :state => (if @options[:run_jobs_here] then "Running" else "Queued" end)
499 # This is the right place to put these attributes when
500 # dealing with new API servers.
501 :minimum_script_version => c[:minimum_script_version],
502 :exclude_script_versions => c[:exclude_minimum_script_versions],
503 :find_or_create => (@instance[:properties][:run_options].andand[:enable_job_reuse] &&
504 !c[:nondeterministic]),
505 :filters => c[:filters]
508 debuglog "component #{cname} new job #{job[:uuid]}"
510 c[:run_in_process] = (@options[:run_jobs_here] and
511 job[:submit_id] == my_submit_id)
513 debuglog "component #{cname} new job failed", 0
514 job_creation_failed += 1
518 if c[:job] and c[:run_in_process] and not ["Complete", "Failed", "Cancelled"].include? c[:job][:state]
522 Open3.popen3("arv-crunch-job", "--force-unlock",
523 "--job", c[:job][:uuid]) do |stdin, stdout, stderr, wait_thr|
524 debuglog "arv-crunch-job pid #{wait_thr.pid} started", 0
527 rready, wready, = IO.select([stdout, stderr], [])
530 buf = rready[0].read_nonblock(2**20)
534 (rready[0] == stdout ? $stdout : $stderr).write(buf)
538 debuglog "arv-crunch-job pid #{wait_thr.pid} exit #{wait_thr.value.to_i}", 0
540 if not $arv.job.get(uuid: c[:job][:uuid])[:finished_at]
541 raise Exception.new("arv-crunch-job did not set finished_at.")
543 rescue Exception => e
544 debuglog "Interrupted (#{e}). Failing job.", 0
545 $arv.job.update(uuid: c[:job][:uuid],
552 if c[:job] and c[:job][:uuid]
553 if ["Running", "Queued"].include?(c[:job][:state])
554 # Job is running (or may be soon), so refresh our copy of the job record
555 c[:job] = JobCache.get(c[:job][:uuid])
558 if c[:job][:state] == "Complete"
559 # Populate script_parameters of other components waiting for
561 @components.each do |c2name, c2|
562 c2[:script_parameters].each do |pname, p|
563 if p.is_a? Hash and p[:output_of] == cname.to_s
564 debuglog "parameter #{c2name}::#{pname} == #{c[:job][:output]}"
565 c2[:script_parameters][pname] = {value: c[:job][:output]}
570 unless c_already_finished
571 # This is my first time discovering that the job
572 # succeeded. (At the top of this loop, I was still
573 # waiting for it to finish.)
575 if @instance[:name].andand.length.andand > 0
576 pipeline_name = @instance[:name]
577 elsif @template.andand[:name].andand.length.andand > 0
578 pipeline_name = @template[:name]
580 pipeline_name = @instance[:uuid]
582 if c[:output_name] != false
583 # Create a collection located in the same project as the pipeline with the contents of the output.
584 portable_data_hash = c[:job][:output]
585 collections = $arv.collection.list(limit: 1,
586 filters: [['portable_data_hash', '=', portable_data_hash]],
587 select: ["portable_data_hash", "manifest_text"]
590 name = c[:output_name] || "Output #{portable_data_hash[0..7]} of #{cname} of #{pipeline_name}"
592 # check if there is a name collision.
593 name_collisions = $arv.collection.list(filters: [["owner_uuid", "=", owner_uuid],
594 ["name", "=", name]])[:items]
596 newcollection_actual = nil
597 if name_collisions.any? and name_collisions.first[:portable_data_hash] == portable_data_hash
598 # There is already a collection with the same name and the
599 # same contents, so just point to that.
600 newcollection_actual = name_collisions.first
603 if newcollection_actual.nil?
604 # Did not find a collection with the same name (or the
605 # collection has a different portable data hash) so create
606 # a new collection with ensure_unique_name: true.
608 owner_uuid: owner_uuid,
610 portable_data_hash: collections.first[:portable_data_hash],
611 manifest_text: collections.first[:manifest_text]
613 debuglog "Creating collection #{newcollection}", 0
614 newcollection_actual = $arv.collection.create collection: newcollection, ensure_unique_name: true
617 c[:output_uuid] = newcollection_actual[:uuid]
619 debuglog "Could not find a collection with portable data hash #{portable_data_hash}", 0
623 elsif ["Queued", "Running"].include? c[:job][:state]
624 # Job is running or queued to run, so indicate that pipeline
625 # should continue to run
627 elsif c[:job][:state] == "Cancelled"
628 debuglog "component #{cname} job #{c[:job][:uuid]} cancelled."
630 elsif c[:job][:state] == "Failed"
635 @instance[:components] = @components
638 if @options[:no_wait]
642 # If job creation fails, just give up on this pipeline instance.
643 if job_creation_failed > 0
651 debuglog "interrupt", 0
658 c_in_state = @components.values.group_by { |c|
659 c[:job] and c[:job][:state]
661 succeeded = c_in_state["Complete"].andand.count || 0
662 failed = (c_in_state["Failed"].andand.count || 0) + (c_in_state["Cancelled"].andand.count || 0)
663 ended = succeeded + failed
665 success = (succeeded == @components.length)
667 # A job create call failed. Just give up.
668 if job_creation_failed > 0
669 debuglog "job creation failed - giving up on this pipeline instance", 0
676 @instance[:state] = 'Complete'
678 @instance[:state] = 'Paused'
681 if ended == @components.length or failed > 0
682 @instance[:state] = success ? 'Complete' : 'Failed'
686 if @instance[:finished_at].nil? and ['Complete', 'Failed'].include? @instance[:state]
687 @instance[:finished_at] = Time.now
690 debuglog "pipeline instance state is #{@instance[:state]}"
692 # set components_summary
693 components_summary = {"todo" => @components.length - ended, "done" => succeeded, "failed" => failed}
694 @instance[:components_summary] = components_summary
700 if @instance and @instance[:state] == 'RunningOnClient'
701 @instance[:state] = 'Paused'
715 if @options[:status_json] != '/dev/null'
716 File.open(@options[:status_json], 'w') do |f|
717 f.puts @components.pretty_inspect
721 if @options[:status_text] != '/dev/null'
722 File.open(@options[:status_text], 'w') do |f|
724 f.puts "#{Time.now} -- pipeline_instance #{@instance[:uuid]}"
725 namewidth = @components.collect { |cname, c| cname.size }.max
726 @components.each do |cname, c|
727 jstatus = if !c[:job]
729 else case c[:job][:state]
731 "#{c[:job][:tasks_summary].inspect}"
735 "cancelled #{c[:job][:cancelled_at]}"
737 "failed #{c[:job][:finished_at]}"
739 "queued #{c[:job][:created_at]}"
742 f.puts "#{cname.to_s.ljust namewidth} #{c[:job] ? c[:job][:uuid] : '-'.ljust(27)} #{jstatus}"
750 if ["New", "Ready", "RunningOnClient",
751 "RunningOnServer"].include?(@instance[:state])
752 @instance[:state] = "Failed"
753 @instance[:finished_at] = Time.now
756 @instance.log_stderr(msg)
762 runner = WhRunPipelineInstance.new($options)
764 if $options[:template]
765 runner.fetch_template($options[:template])
767 runner.fetch_instance($options[:instance])
769 runner.apply_parameters(p.leftovers)
770 runner.setup_instance
773 puts runner.instance[:uuid]
777 rescue Exception => e