3 class WhRunPipelineInstance
6 if RUBY_VERSION < '1.9.3' then
8 #{$0.gsub(/^\.\//,'')} requires Ruby version 1.9.3 or higher.
18 require 'google/api_client'
22 #{$0}: fatal: #{l.message}
23 Some runtime dependencies may be missing.
24 Try: gem install arvados pp google-api-client json trollop
# Print a diagnostic line on stderr when the configured debug level is
# high enough.
#
# message   - text to print; it is prefixed with this script's file name
#             and the current process id.
# verbosity - minimum value of $debuglevel at which the message is shown
#             (default 1).
#
# An unset (nil) $debuglevel is treated as 0, so calling this before the
# global has been assigned prints only verbosity-0 messages instead of
# raising NoMethodError.
def debuglog(message, verbosity=1)
  return unless ($debuglevel || 0) >= verbosity
  $stderr.puts "#{File.split($0).last} #{$$}: #{message}"
end
32 # Parse command line options (the kind that control the behavior of
33 # this program, that is, not the pipeline component parameters).
35 p = Trollop::Parser.new do
40 arv-run-pipeline-instance --template TEMPLATE_UUID [options] [--] [parameters]
41 arv-run-pipeline-instance --instance INSTANCE_UUID [options] [--] [parameters]
44 param_name=param_value
45 param_name param_value
46 Set (or override) the default value for every
47 pipeline component parameter with the given
50 component_name::param_name=param_value
51 component_name::param_name param_value
52 --component_name::param_name=param_value
53 --component_name::param_name param_value
54 Set the value of a parameter for a single
60 "Do not start any new jobs or wait for existing jobs to finish. Just find out whether jobs are finished, queued, or running for each component.",
64 "Store plain text status in given file.",
67 :default => '/dev/stdout')
69 "Store json-formatted pipeline in given file.",
72 :default => '/dev/null')
74 "Do not wait for jobs to finish. Just look up status, submit new jobs if needed, and exit.",
78 "Do not reuse existing jobs to satisfy pipeline components. Submit a new job for every component.",
82 "Print extra debugging information on stderr.",
85 "Set debug verbosity level.",
89 "UUID of pipeline template, or path to local pipeline template file.",
93 "UUID of pipeline instance.",
97 "Submit the pipeline instance to the server, and exit. Let the Crunch dispatch service satisfy the components by finding/running jobs.",
100 opt(:run_pipeline_here,
101 "Manage the pipeline instance in-process. Submit jobs to Crunch as needed. Do not exit until the pipeline finishes (or fails).",
105 "Run jobs in the local terminal session instead of submitting them to Crunch. Implies --run-pipeline-here. Note: this results in a significantly different job execution environment, and some Crunch features are not supported. It can be necessary to modify a pipeline in order to make it run this way.",
109 "Synonym for --run-jobs-here.",
113 "Description for the pipeline instance.",
117 "UUID of the project for the pipeline instance.",
122 $options = Trollop::with_standard_exception_handling p do
# Effective debug verbosity: an explicit :debug_level wins, a bare :debug
# means level 1, and anything else means 0.
$debuglevel =
  if $options[:debug_level]
    $options[:debug_level]
  elsif $options[:debug]
    1
  else
    0
  end

# :run_here is the old name of :run_jobs_here.
$options[:run_jobs_here] = $options[:run_here] unless $options[:run_jobs_here]
# Running jobs here requires managing the pipeline here as well.
$options[:run_pipeline_here] = $options[:run_jobs_here] unless $options[:run_pipeline_here]
130 if $options[:instance]
131 if $options[:template] or $options[:submit]
132 abort "#{$0}: syntax error: --instance cannot be combined with --template or --submit."
134 elsif not $options[:template]
135 $stderr.puts "error: you must supply a --template or --instance."
140 if $options[:run_pipeline_here] == $options[:submit]
141 abort "#{$0}: error: you must supply --run-pipeline-here, --run-jobs-here, or --submit."
# Set up the API client. $arv is the Arvados SDK object (API version v1);
# $client and $arvados are its client and arvados_api, which the rest of
# this file uses to issue requests through $client.execute.
$arv = Arvados.new(api_version: 'v1')
$client, $arvados = $arv.client, $arv.arvados_api
150 class PipelineInstance
152 result = $client.execute(:api_method => $arvados.pipeline_instances.get,
156 :authenticated => false,
158 authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
160 j = JSON.parse result.body, :symbolize_names => true
161 unless j.is_a? Hash and j[:uuid]
162 debuglog "Failed to get pipeline_instance: #{j[:errors] rescue nil}", 0
165 debuglog "Retrieved pipeline_instance #{j[:uuid]}"
# Create a pipeline instance on the API server.
#
# attributes - Hash of pipeline instance attributes, sent to the server
#              as the :pipeline_instance member of the request body.
#
# Returns an instance of this class built from the parsed response.
# NOTE(review): assumes the constructor accepts the parsed response Hash;
# confirm against the class's initialize.
# Aborts the program when the response is not a Hash carrying a :uuid.
def self.create(attributes)
  # NOTE(review): the request envelope (:body_object / :headers) follows
  # the JobCache.create call in this file and the google-api-client
  # `execute` options; confirm against the client version in use.
  result = $client.execute(:api_method => $arvados.pipeline_instances.create,
                           :body_object => {
                             :pipeline_instance => attributes
                           },
                           :authenticated => false,
                           :headers => {
                             authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
                           })
  j = JSON.parse result.body, :symbolize_names => true
  unless j.is_a? Hash and j[:uuid]
    # This is a class method, so no @template is set here; reading it
    # would raise NoMethodError and hide the server's error. Report the
    # template UUID from the attributes being submitted instead.
    errors = j.is_a?(Hash) ? j[:errors] : nil
    template_uuid = attributes.is_a?(Hash) ? attributes[:pipeline_template_uuid] : nil
    abort "\n#{Time.now} -- pipeline_template #{template_uuid}\nFailed to create pipeline_instance: #{errors} #{j.inspect}"
  end
  debuglog "Created pipeline instance: #{j[:uuid]}"
  self.new(j)
end
186 result = $client.execute(:api_method => $arvados.pipeline_instances.update,
191 :pipeline_instance => @attributes_to_update
193 :authenticated => false,
195 authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
197 j = JSON.parse result.body, :symbolize_names => true
198 unless j.is_a? Hash and j[:uuid]
199 debuglog "Failed to save pipeline_instance: #{j[:errors] rescue nil}", 0
202 @attributes_to_update = {}
207 @attributes_to_update[x] = y
215 $arv.log.create log: {
216 event_type: 'stderr',
217 object_uuid: self[:uuid],
218 owner_uuid: self[:owner_uuid],
219 properties: {"text" => msg},
225 @attributes_to_update = {}
233 result = $client.execute(:api_method => $arvados.jobs.get,
237 :authenticated => false,
239 authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
241 @cache[uuid] = JSON.parse result.body, :symbolize_names => true
243 def self.where(conditions)
244 result = $client.execute(:api_method => $arvados.jobs.list,
247 :where => conditions.to_json
249 :authenticated => false,
251 authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
253 list = JSON.parse result.body, :symbolize_names => true
254 if list and list[:items].is_a? Array
260 def self.create(pipeline, component, job, create_params)
263 body = {job: no_nil_values(job)}.merge(no_nil_values(create_params))
265 result = $client.execute(:api_method => $arvados.jobs.create,
266 :body_object => body,
267 :authenticated => false,
269 authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
271 j = JSON.parse result.body, :symbolize_names => true
272 if j.is_a? Hash and j[:uuid]
275 debuglog "create job: #{j[:errors] rescue nil} with attributes #{body}", 0
278 j[:errors].each do |err|
279 msg += "Error creating job for component #{component}: #{err}\n"
281 msg += "Job submission was: #{body.to_json}"
283 pipeline.log_stderr(msg)
# Return a copy of +hash+ without the entries whose value is nil.
# Entries whose value is false are kept, and +hash+ itself is not
# modified. A nil +hash+ is treated as empty and yields {}.
def self.no_nil_values(hash)
  (hash || {}).reject { |_key, value| value.nil? }
end
295 class WhRunPipelineInstance
296 attr_reader :instance
298 def initialize(_options)
302 def fetch_template(template)
303 if template.match /[^-0-9a-z]/
304 # Doesn't look like a uuid -- use it as a filename.
305 @template = JSON.parse File.read(template), :symbolize_names => true
307 result = $client.execute(:api_method => $arvados.pipeline_templates.get,
311 :authenticated => false,
313 authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
315 @template = JSON.parse result.body, :symbolize_names => true
317 abort "#{$0}: fatal: failed to retrieve pipeline template #{template} #{@template[:errors].inspect rescue nil}"
# Look up an existing pipeline instance by UUID with PipelineInstance.find
# and use that same object as the template too.
#
# instance_uuid - UUID passed straight to PipelineInstance.find.
#
# Returns whatever PipelineInstance.find returned.
def fetch_instance(instance_uuid)
  found = PipelineInstance.find(instance_uuid)
  @instance = found
  @template = found
end
329 def apply_parameters(params_args)
330 params_args.shift if params_args[0] == '--'
332 while !params_args.empty?
333 if (re = params_args[0].match /^(--)?([^-].*?)=(.+)/)
334 params[re[2]] = re[3]
336 elsif params_args.size > 1
337 param = params_args.shift.sub /^--/, ''
338 params[param] = params_args.shift
340 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: I do not know what to do with arg \"#{params_args[0]}\""
344 if not @template[:components].is_a?(Hash)
345 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Template missing \"components\" hash"
347 @components = @template[:components].dup
349 bad_components = @components.each_pair.select do |cname, cspec|
350 not cspec.is_a?(Hash)
352 if bad_components.any?
353 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Components not specified with hashes: #{bad_components.map(&:first).join(', ')}"
356 bad_components = @components.each_pair.select do |cname, cspec|
357 not cspec[:script_parameters].is_a?(Hash)
359 if bad_components.any?
360 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Components missing \"script_parameters\" hashes: #{bad_components.map(&:first).join(', ')}"
364 @components.each do |componentname, component|
365 component[:script_parameters].each do |parametername, parameter|
366 parameter = { :value => parameter } unless parameter.is_a? Hash
367 if params.has_key?("#{componentname}::#{parametername}")
368 value = params["#{componentname}::#{parametername}"]
369 elsif parameter.has_key?(:value)
370 value = parameter[:value]
371 elsif parameter.has_key?(:output_of)
372 if !@components[parameter[:output_of].intern]
373 errors << [componentname, parametername, "output_of refers to nonexistent component '#{parameter[:output_of]}'"]
375 # value will be filled in later when the upstream
376 # component's output becomes known
379 elsif params.has_key?(parametername.to_s)
380 value = params[parametername.to_s]
381 elsif parameter.has_key?(:default)
382 value = parameter[:default]
383 elsif [false, 'false', 0, '0'].index(parameter[:required])
386 errors << [componentname, parametername, "required parameter is missing"]
389 debuglog "parameter #{componentname}::#{parametername} == #{value}"
391 component[:script_parameters][parametername] =
392 parameter.dup.merge(value: value)
396 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nErrors:\n#{errors.collect { |c,p,e| "#{c}::#{p} - #{e}\n" }.join ""}"
398 debuglog "options=" + @options.pretty_inspect
404 @instance[:properties][:run_options] ||= {}
405 if @options[:no_reuse]
406 # override properties of existing instance
407 @instance[:properties][:run_options][:enable_job_reuse] = false
409 # Default to "enable reuse" if not specified. (This code path
410 # can go away when old clients go away.)
411 if @instance[:properties][:run_options][:enable_job_reuse].nil?
412 @instance[:properties][:run_options][:enable_job_reuse] = true
416 description = $options[:description] ||
417 ("Created at #{Time.now.localtime}" + (@template[:name].andand.size.andand>0 ? " using the pipeline template *#{@template[:name]}*" : ""))
419 components: @components,
422 enable_job_reuse: !@options[:no_reuse]
425 pipeline_template_uuid: @template[:uuid],
426 description: description,
427 state: ($options[:submit] ? 'RunningOnServer' : 'RunningOnClient')
429 if @options[:project_uuid]
430 instance_body[:owner_uuid] = @options[:project_uuid]
432 @instance = PipelineInstance.create(instance_body)
441 if @instance[:started_at].nil?
442 @instance[:started_at] = Time.now
445 job_creation_failed = 0
448 @components.each do |cname, c|
450 owner_uuid = @instance[:owner_uuid]
451 # Is the job satisfying this component already known to be
452 # finished? (Already meaning "before we query API server about
453 # the job's current state")
454 c_already_finished = (c[:job] &&
456 ["Complete", "Failed", "Cancelled"].include?(c[:job][:state]))
458 c[:script_parameters].select { |pname, p| p.is_a? Hash and p[:output_of]}.empty?
# No job is associated with this component yet, and the component's
# inputs are fully specified (any output_of script_parameters have
# already been resolved to real values).
462 my_submit_id = "instance #{@instance[:uuid]} rand #{rand(2**64).to_s(36)}"
463 job = JobCache.create(@instance, cname, {
464 :script => c[:script],
465 :script_parameters => Hash[c[:script_parameters].map do |key, spec|
468 :script_version => c[:script_version],
469 :repository => c[:repository],
470 :nondeterministic => c[:nondeterministic],
471 :runtime_constraints => c[:runtime_constraints],
472 :owner_uuid => owner_uuid,
473 :is_locked_by_uuid => (@options[:run_jobs_here] ? owner_uuid : nil),
474 :submit_id => my_submit_id,
475 :state => (if @options[:run_jobs_here] then "Running" else "Queued" end)
477 # This is the right place to put these attributes when
478 # dealing with new API servers.
479 :minimum_script_version => c[:minimum_script_version],
480 :exclude_script_versions => c[:exclude_minimum_script_versions],
481 :find_or_create => (@instance[:properties][:run_options].andand[:enable_job_reuse] &&
482 !c[:nondeterministic]),
483 :filters => c[:filters]
486 debuglog "component #{cname} new job #{job[:uuid]}"
488 c[:run_in_process] = (@options[:run_jobs_here] and
489 job[:submit_id] == my_submit_id)
491 debuglog "component #{cname} new job failed", 0
492 job_creation_failed += 1
496 if c[:job] and c[:run_in_process] and not ["Complete", "Failed", "Cancelled"].include? c[:job][:state]
500 Open3.popen3("arv-crunch-job", "--force-unlock",
501 "--job", c[:job][:uuid]) do |stdin, stdout, stderr, wait_thr|
502 debuglog "arv-crunch-job pid #{wait_thr.pid} started", 0
505 rready, wready, = IO.select([stdout, stderr], [])
508 buf = rready[0].read_nonblock(2**20)
512 (rready[0] == stdout ? $stdout : $stderr).write(buf)
516 debuglog "arv-crunch-job pid #{wait_thr.pid} exit #{wait_thr.value.to_i}", 0
518 if not $arv.job.get(uuid: c[:job][:uuid])[:finished_at]
519 raise Exception.new("arv-crunch-job did not set finished_at.")
521 rescue Exception => e
522 debuglog "Interrupted (#{e}). Failing job.", 0
523 $arv.job.update(uuid: c[:job][:uuid],
530 if c[:job] and c[:job][:uuid]
531 if ["Running", "Queued"].include?(c[:job][:state])
532 # Job is running (or may be soon) so update copy of job record
533 c[:job] = JobCache.get(c[:job][:uuid])
536 if c[:job][:state] == "Complete"
537 # Populate script_parameters of other components waiting for
539 @components.each do |c2name, c2|
540 c2[:script_parameters].each do |pname, p|
541 if p.is_a? Hash and p[:output_of] == cname.to_s
542 debuglog "parameter #{c2name}::#{pname} == #{c[:job][:output]}"
543 c2[:script_parameters][pname] = {value: c[:job][:output]}
548 unless c_already_finished
549 # This is my first time discovering that the job
550 # succeeded. (At the top of this loop, I was still
551 # waiting for it to finish.)
553 if @instance[:name].andand.length.andand > 0
554 pipeline_name = @instance[:name]
555 elsif @template.andand[:name].andand.length.andand > 0
556 pipeline_name = @template[:name]
558 pipeline_name = @instance[:uuid]
560 if c[:output_name] != false
561 # Create a collection located in the same project as the pipeline with the contents of the output.
562 portable_data_hash = c[:job][:output]
563 collections = $arv.collection.list(limit: 1,
564 filters: [['portable_data_hash', '=', portable_data_hash]],
565 select: ["portable_data_hash", "manifest_text"]
568 name = c[:output_name] || "Output #{portable_data_hash[0..7]} of #{cname} of #{pipeline_name}"
570 # check if there is a name collision.
571 name_collisions = $arv.collection.list(filters: [["owner_uuid", "=", owner_uuid],
572 ["name", "=", name]])[:items]
574 newcollection_actual = nil
575 if name_collisions.any? and name_collisions.first[:portable_data_hash] == portable_data_hash
576 # There is already a collection with the same name and the
577 # same contents, so just point to that.
578 newcollection_actual = name_collisions.first
581 if newcollection_actual.nil?
582 # Did not find a collection with the same name (or the
583 # collection has a different portable data hash) so create
584 # a new collection with ensure_unique_name: true.
586 owner_uuid: owner_uuid,
588 portable_data_hash: collections.first[:portable_data_hash],
589 manifest_text: collections.first[:manifest_text]
591 debuglog "Creating collection #{newcollection}", 0
592 newcollection_actual = $arv.collection.create collection: newcollection, ensure_unique_name: true
595 c[:output_uuid] = newcollection_actual[:uuid]
597 debuglog "Could not find a collection with portable data hash #{portable_data_hash}", 0
601 elsif ["Queued", "Running"].include? c[:job][:state]
602 # Job is running or queued to run, so indicate that pipeline
603 # should continue to run
605 elsif c[:job][:state] == "Cancelled"
606 debuglog "component #{cname} job #{c[:job][:uuid]} cancelled."
608 elsif c[:job][:state] == "Failed"
613 @instance[:components] = @components
616 if @options[:no_wait]
620 # If job creation fails, just give up on this pipeline instance.
621 if job_creation_failed > 0
629 debuglog "interrupt", 0
636 c_in_state = @components.values.group_by { |c|
637 c[:job] and c[:job][:state]
639 succeeded = c_in_state["Complete"].andand.count || 0
640 failed = (c_in_state["Failed"].andand.count || 0) + (c_in_state["Cancelled"].andand.count || 0)
641 ended = succeeded + failed
643 success = (succeeded == @components.length)
645 # A job create call failed. Just give up.
646 if job_creation_failed > 0
647 debuglog "job creation failed - giving up on this pipeline instance", 0
654 @instance[:state] = 'Complete'
656 @instance[:state] = 'Paused'
659 if ended == @components.length or failed > 0
660 @instance[:state] = success ? 'Complete' : 'Failed'
664 if @instance[:finished_at].nil? and ['Complete', 'Failed'].include? @instance[:state]
665 @instance[:finished_at] = Time.now
668 debuglog "pipeline instance state is #{@instance[:state]}"
670 # set components_summary
671 components_summary = {"todo" => @components.length - ended, "done" => succeeded, "failed" => failed}
672 @instance[:components_summary] = components_summary
678 if @instance and @instance[:state] == 'RunningOnClient'
679 @instance[:state] = 'Paused'
# Write the current component map to the file named by the :status_json
# option, unless that file is /dev/null.
if @options[:status_json] != '/dev/null'
  File.open(@options[:status_json], 'w') do |f|
    # The option is described as "json-formatted", so emit real JSON
    # rather than pretty_inspect output, which JSON parsers cannot read.
    f.puts JSON.pretty_generate(@components)
  end
end
699 if @options[:status_text] != '/dev/null'
700 File.open(@options[:status_text], 'w') do |f|
702 f.puts "#{Time.now} -- pipeline_instance #{@instance[:uuid]}"
703 namewidth = @components.collect { |cname, c| cname.size }.max
704 @components.each do |cname, c|
705 jstatus = if !c[:job]
707 else case c[:job][:state]
709 "#{c[:job][:tasks_summary].inspect}"
713 "cancelled #{c[:job][:cancelled_at]}"
715 "failed #{c[:job][:finished_at]}"
717 "queued #{c[:job][:created_at]}"
720 f.puts "#{cname.to_s.ljust namewidth} #{c[:job] ? c[:job][:uuid] : '-'.ljust(27)} #{jstatus}"
728 if ["New", "Ready", "RunningOnClient",
729 "RunningOnServer"].include?(@instance[:state])
730 @instance[:state] = "Failed"
731 @instance[:finished_at] = Time.now
734 @instance.log_stderr(msg)
740 runner = WhRunPipelineInstance.new($options)
742 if $options[:template]
743 runner.fetch_template($options[:template])
745 runner.fetch_instance($options[:instance])
747 runner.apply_parameters(p.leftovers)
748 runner.setup_instance
751 puts runner.instance[:uuid]
755 rescue Exception => e