2 # Copyright (C) The Arvados Authors. All rights reserved.
4 # SPDX-License-Identifier: Apache-2.0
6 class WhRunPipelineInstance
9 if RUBY_VERSION < '1.9.3' then
11 #{$0.gsub(/^\.\//,'')} requires Ruby version 1.9.3 or higher.
21 require 'google/api_client'
25 #{$0}: fatal: #{l.message}
26 Some runtime dependencies may be missing.
27 Try: gem install arvados pp google-api-client json trollop
# Write +message+ to stderr, prefixed with this program's file name and
# process id, when the global $debuglevel is at least +verbosity+
# (+verbosity+ defaults to 1).
31 def debuglog(message, verbosity=1)
32 $stderr.puts "#{File.split($0).last} #{$$}: #{message}" if $debuglevel >= verbosity
35 # Parse command line options (the kind that control the behavior of
36 # this program, that is, not the pipeline component parameters).
38 p = Trollop::Parser.new do
43 arv-run-pipeline-instance --template TEMPLATE_UUID [options] [--] [parameters]
44 arv-run-pipeline-instance --instance INSTANCE_UUID [options] [--] [parameters]
47 param_name=param_value
48 param_name param_value
49 Set (or override) the default value for every
50 pipeline component parameter with the given
53 component_name::param_name=param_value
54 component_name::param_name param_value
55 --component_name::param_name=param_value
56 --component_name::param_name param_value
57 Set the value of a parameter for a single
63 "Do not start any new jobs or wait for existing jobs to finish. Just find out whether jobs are finished, queued, or running for each component.",
67 "Store plain text status in given file.",
70 :default => '/dev/stdout')
72 "Store json-formatted pipeline in given file.",
75 :default => '/dev/null')
77 "Do not wait for jobs to finish. Just look up status, submit new jobs if needed, and exit.",
81 "Do not reuse existing jobs to satisfy pipeline components. Submit a new job for every component.",
85 "Print extra debugging information on stderr.",
88 "Set debug verbosity level.",
92 "UUID of pipeline template, or path to local pipeline template file.",
96 "UUID of pipeline instance.",
100 "Submit the pipeline instance to the server, and exit. Let the Crunch dispatch service satisfy the components by finding/running jobs.",
103 opt(:run_pipeline_here,
104 "Manage the pipeline instance in-process. Submit jobs to Crunch as needed. Do not exit until the pipeline finishes (or fails).",
108 "Run jobs in the local terminal session instead of submitting them to Crunch. Implies --run-pipeline-here. Note: this results in a significantly different job execution environment, and some Crunch features are not supported. It can be necessary to modify a pipeline in order to make it run this way.",
112 "Synonym for --run-jobs-here.",
116 "Description for the pipeline instance.",
120 "UUID of the project for the pipeline instance.",
125 $options = Trollop::with_standard_exception_handling p do
# Debug level: an explicit :debug_level wins; otherwise :debug means 1;
# otherwise 0.
128 $debuglevel = $options[:debug_level] || ($options[:debug] && 1) || 0
# Propagate the flag aliases: :run_here sets :run_jobs_here, which in
# turn sets :run_pipeline_here.
130 $options[:run_jobs_here] ||= $options[:run_here] # old flag name
131 $options[:run_pipeline_here] ||= $options[:run_jobs_here] # B requires A
# --instance is mutually exclusive with --template and --submit; without
# --instance, a --template is required.
133 if $options[:instance]
134 if $options[:template] or $options[:submit]
135 abort "#{$0}: syntax error: --instance cannot be combined with --template or --submit."
137 elsif not $options[:template]
138 $stderr.puts "error: you must supply a --template or --instance."
# The two mode options must differ; equal values (e.g. both given, or
# both left at the same default) are rejected.
143 if $options[:run_pipeline_here] == $options[:submit]
144 abort "#{$0}: error: you must supply --run-pipeline-here, --run-jobs-here, or --submit."
147 # Set up the API client.
149 $arv = Arvados.new api_version: 'v1'
150 $client = $arv.client
151 $arvados = $arv.arvados_api
153 class PipelineInstance
155 result = $client.execute(:api_method => $arvados.pipeline_instances.get,
159 :authenticated => false,
161 authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
163 j = JSON.parse result.body, :symbolize_names => true
164 unless j.is_a? Hash and j[:uuid]
165 debuglog "Failed to get pipeline_instance: #{j[:errors] rescue nil}", 0
168 debuglog "Retrieved pipeline_instance #{j[:uuid]}"
# Create a pipeline instance through the pipeline_instances.create API
# method, sending +attributes+ as the :pipeline_instance value. Aborts
# the program when the parsed response is not a Hash carrying a :uuid.
172 def self.create(attributes)
173 result = $client.execute(:api_method => $arvados.pipeline_instances.create,
175 :pipeline_instance => attributes
177 :authenticated => false,
179 authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
# The response body is parsed as JSON with symbolized keys.
181 j = JSON.parse result.body, :symbolize_names => true
182 unless j.is_a? Hash and j[:uuid]
# NOTE(review): this is a class method and no visible line assigns
# @template here; if it is nil, the @template[:uuid] interpolation raises
# before abort can print this message -- confirm and fix.
183 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nFailed to create pipeline_instance: #{j[:errors] rescue nil} #{j.inspect}"
185 debuglog "Created pipeline instance: #{j[:uuid]}"
189 result = $client.execute(:api_method => $arvados.pipeline_instances.update,
194 :pipeline_instance => @attributes_to_update
196 :authenticated => false,
198 authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
200 j = JSON.parse result.body, :symbolize_names => true
201 unless j.is_a? Hash and j[:uuid]
202 debuglog "Failed to save pipeline_instance: #{j[:errors] rescue nil}", 0
205 @attributes_to_update = {}
210 @attributes_to_update[x] = y
218 $arv.log.create log: {
219 event_type: 'stderr',
220 object_uuid: self[:uuid],
221 owner_uuid: self[:owner_uuid],
222 properties: {"text" => msg},
228 @attributes_to_update = {}
236 result = $client.execute(:api_method => $arvados.jobs.get,
240 :authenticated => false,
242 authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
244 @cache[uuid] = JSON.parse result.body, :symbolize_names => true
# Query the jobs.list API method, passing +conditions+ JSON-encoded as
# the :where value.
246 def self.where(conditions)
247 result = $client.execute(:api_method => $arvados.jobs.list,
250 :where => conditions.to_json
252 :authenticated => false,
254 authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
# Parse the response with symbolized keys and check that it carries an
# :items Array before going further.
256 list = JSON.parse result.body, :symbolize_names => true
257 if list and list[:items].is_a? Array
264 # create() returns [job, exception]. If both job and exception are
265 # nil, there was a non-retryable error and the call should not be
267 def self.create(pipeline, component, job, create_params)
# Request body: the job attributes under :job, merged with the extra
# create parameters; nil-valued entries are dropped from both.
270 body = {job: no_nil_values(job)}.merge(no_nil_values(create_params))
274 result = $client.execute(
275 :api_method => $arvados.jobs.create,
276 :body_object => body,
277 :authenticated => false,
279 authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
# HTTP 429 and 5xx responses are raised as exceptions so that the rescue
# below handles them.
281 if result.status == 429 || result.status >= 500
282 raise Exception.new("HTTP status #{result.status}")
284 rescue Exception => e
# An unparseable response body leaves j as nil instead of raising.
287 j = JSON.parse(result.body, :symbolize_names => true) rescue nil
# Success requires HTTP 200 and a Hash response carrying a :uuid.
288 if result.status == 200 && j.is_a?(Hash) && j[:uuid]
# Falls back to [] when j cannot be indexed (e.g. j is nil).
292 errors = j[:errors] rescue []
293 debuglog "create job: [#{result.status}] #{errors.inspect} with attributes #{body}", 0
297 msg += "Error creating job for component #{component}: #{err}\n"
299 msg += "Job submission was: #{body.to_json}"
# The failure text is also recorded on the pipeline via log_stderr.
301 pipeline.log_stderr(msg)
# Return a copy of +hash+ without the entries whose value is nil.
# The argument itself is not modified.
308 def self.no_nil_values(hash)
309 hash.reject { |key, value| value.nil? }
313 class WhRunPipelineInstance
314 attr_reader :instance
316 def initialize(_options)
# Load a pipeline template into @template. +template+ is either a path to
# a local JSON file or a value passed to the pipeline_templates.get API
# method. Aborts when the template cannot be retrieved.
320 def fetch_template(template)
# Any character outside [-0-9a-z] means the argument is not a UUID.
321 if template.match /[^-0-9a-z]/
322 # Doesn't look like a uuid -- use it as a filename.
323 @template = JSON.parse File.read(template), :symbolize_names => true
325 result = $client.execute(:api_method => $arvados.pipeline_templates.get,
329 :authenticated => false,
331 authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
# The API response body is parsed as JSON with symbolized keys.
333 @template = JSON.parse result.body, :symbolize_names => true
335 abort "#{$0}: fatal: failed to retrieve pipeline template #{template} #{@template[:errors].inspect rescue nil}"
# Look up an existing pipeline instance by +instance_uuid+ and store it in
# @instance. The same object is also assigned to @template, so code that
# reads @template sees the instance.
341 def fetch_instance(instance_uuid)
342 @instance = PipelineInstance.find(instance_uuid)
343 @template = @instance
# Parse the leftover command line arguments in +params_args+ into
# parameter values, validate the template's components, and resolve a
# value for every script parameter of every component. Aborts with a
# message on syntax errors or when errors were collected.
347 def apply_parameters(params_args)
# A leading "--" separator is dropped.
348 params_args.shift if params_args[0] == '--'
350 while !params_args.empty?
# Single-argument form: name=value, optionally prefixed with "--".
351 if (re = params_args[0].match /^(--)?([^-].*?)=(.+)/)
352 params[re[2]] = re[3]
# Two-argument form: the name (leading "--" removed), then the value.
354 elsif params_args.size > 1
355 param = params_args.shift.sub /^--/, ''
356 params[param] = params_args.shift
358 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: I do not know what to do with arg \"#{params_args[0]}\""
362 if not @template[:components].is_a?(Hash)
363 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Template missing \"components\" hash"
# NOTE(review): dup is shallow, so the per-component hashes are still
# shared with @template; the assignment into
# component[:script_parameters] further down changes those shared hashes.
365 @components = @template[:components].dup
# Every component spec must be a Hash ...
367 bad_components = @components.each_pair.select do |cname, cspec|
368 not cspec.is_a?(Hash)
370 if bad_components.any?
371 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Components not specified with hashes: #{bad_components.map(&:first).join(', ')}"
# ... and must carry a :script_parameters Hash.
374 bad_components = @components.each_pair.select do |cname, cspec|
375 not cspec[:script_parameters].is_a?(Hash)
377 if bad_components.any?
378 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Components missing \"script_parameters\" hashes: #{bad_components.map(&:first).join(', ')}"
# Resolve each parameter. The elsif chain below fixes the precedence:
# "component::param" from the command line, the spec's own :value,
# :output_of, a bare "param" from the command line, then :default.
382 @components.each do |componentname, component|
383 component[:script_parameters].each do |parametername, parameter|
# A bare (non-Hash) parameter spec is treated as its literal value.
384 parameter = { :value => parameter } unless parameter.is_a? Hash
385 if params.has_key?("#{componentname}::#{parametername}")
386 value = params["#{componentname}::#{parametername}"]
387 elsif parameter.has_key?(:value)
388 value = parameter[:value]
389 elsif parameter.has_key?(:output_of)
390 if !@components[parameter[:output_of].intern]
391 errors << [componentname, parametername, "output_of refers to nonexistent component '#{parameter[:output_of]}'"]
393 # value will be filled in later when the upstream
394 # component's output becomes known
397 elsif params.has_key?(parametername.to_s)
398 value = params[parametername.to_s]
399 elsif parameter.has_key?(:default)
400 value = parameter[:default]
# Array#index returns a position, and 0 is truthy in Ruby, so any of the
# four listed values (including false itself) selects this branch.
401 elsif [false, 'false', 0, '0'].index(parameter[:required])
404 errors << [componentname, parametername, "required parameter is missing"]
407 debuglog "parameter #{componentname}::#{parametername} == #{value}"
# Store the spec back with the resolved value under :value.
409 component[:script_parameters][parametername] =
410 parameter.dup.merge(value: value)
# Each collected error is rendered as "component::param - message".
414 all_errors = errors.collect do |c,p,e|
415 "#{c}::#{p} - #{e}\n"
417 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nErrors:\n#{all_errors}"
419 debuglog "options=" + @options.pretty_inspect
425 @instance[:properties][:run_options] ||= {}
426 if @options[:no_reuse]
427 # override properties of existing instance
428 @instance[:properties][:run_options][:enable_job_reuse] = false
430 # Default to "enable reuse" if not specified. (This code path
431 # can go away when old clients go away.)
432 if @instance[:properties][:run_options][:enable_job_reuse].nil?
433 @instance[:properties][:run_options][:enable_job_reuse] = true
437 description = $options[:description] ||
438 ("Created at #{Time.now.localtime}" + (@template[:name].andand.size.andand>0 ? " using the pipeline template *#{@template[:name]}*" : ""))
440 components: @components,
443 enable_job_reuse: !@options[:no_reuse]
446 pipeline_template_uuid: @template[:uuid],
447 description: description,
448 state: ($options[:submit] ? 'RunningOnServer' : 'RunningOnClient')
450 if @options[:project_uuid]
451 instance_body[:owner_uuid] = @options[:project_uuid]
453 @instance = PipelineInstance.create(instance_body)
462 if @instance[:started_at].nil?
463 @instance[:started_at] = Time.now
466 job_creation_failed = 0
469 @components.each do |cname, c|
471 owner_uuid = @instance[:owner_uuid]
472 # Is the job satisfying this component already known to be
473 # finished? (Already meaning "before we query API server about
474 # the job's current state")
475 c_already_finished = (c[:job] &&
477 ["Complete", "Failed", "Cancelled"].include?(c[:job][:state]))
479 c[:script_parameters].select { |pname, p| p.is_a? Hash and p[:output_of]}.empty?
480 # No job yet associated with this component, and the component's inputs
481 # are fully specified (any output_of script_parameters are resolved
483 my_submit_id = "instance #{@instance[:uuid]} rand #{rand(2**64).to_s(36)}"
484 job, err = JobCache.create(@instance, cname, {
485 :script => c[:script],
486 :script_parameters => Hash[c[:script_parameters].map do |key, spec|
489 :script_version => c[:script_version],
490 :repository => c[:repository],
491 :nondeterministic => c[:nondeterministic],
492 :runtime_constraints => c[:runtime_constraints],
493 :owner_uuid => owner_uuid,
494 :is_locked_by_uuid => (@options[:run_jobs_here] ? owner_uuid : nil),
495 :submit_id => my_submit_id,
496 :state => (if @options[:run_jobs_here] then "Running" else "Queued" end)
498 # This is the right place to put these attributes when
499 # dealing with new API servers.
500 :minimum_script_version => c[:minimum_script_version],
501 :exclude_script_versions => c[:exclude_minimum_script_versions],
502 :find_or_create => (@instance[:properties][:run_options].andand[:enable_job_reuse] &&
503 !c[:nondeterministic]),
504 :filters => c[:filters]
507 debuglog "component #{cname} new job #{job[:uuid]}"
509 c[:run_in_process] = (@options[:run_jobs_here] and
510 job[:submit_id] == my_submit_id)
512 debuglog "component #{cname} new job failed", 0
513 job_creation_failed += 1
515 debuglog "component #{cname} new job failed, err=#{err}", 0
519 if c[:job] and c[:run_in_process] and not ["Complete", "Failed", "Cancelled"].include? c[:job][:state]
523 Open3.popen3("arv-crunch-job", "--force-unlock",
524 "--job", c[:job][:uuid]) do |stdin, stdout, stderr, wait_thr|
525 debuglog "arv-crunch-job pid #{wait_thr.pid} started", 0
528 rready, wready, = IO.select([stdout, stderr], [])
531 buf = rready[0].read_nonblock(2**20)
535 (rready[0] == stdout ? $stdout : $stderr).write(buf)
539 debuglog "arv-crunch-job pid #{wait_thr.pid} exit #{wait_thr.value.to_i}", 0
541 if not $arv.job.get(uuid: c[:job][:uuid])[:finished_at]
542 raise Exception.new("arv-crunch-job did not set finished_at.")
544 rescue Exception => e
545 debuglog "Interrupted (#{e}). Failing job.", 0
546 $arv.job.update(uuid: c[:job][:uuid],
553 if c[:job] and c[:job][:uuid]
554 if ["Running", "Queued"].include?(c[:job][:state])
555 # Job is running (or may be soon) so update copy of job record
556 c[:job] = JobCache.get(c[:job][:uuid])
559 if c[:job][:state] == "Complete"
560 # Populate script_parameters of other components waiting for
562 @components.each do |c2name, c2|
563 c2[:script_parameters].each do |pname, p|
564 if p.is_a? Hash and p[:output_of] == cname.to_s
565 debuglog "parameter #{c2name}::#{pname} == #{c[:job][:output]}"
566 c2[:script_parameters][pname] = {value: c[:job][:output]}
571 unless c_already_finished
572 # This is my first time discovering that the job
573 # succeeded. (At the top of this loop, I was still
574 # waiting for it to finish.)
576 if @instance[:name].andand.length.andand > 0
577 pipeline_name = @instance[:name]
578 elsif @template.andand[:name].andand.length.andand > 0
579 pipeline_name = @template[:name]
581 pipeline_name = @instance[:uuid]
583 if c[:output_name] != false
584 # Create a collection located in the same project as the pipeline with the contents of the output.
585 portable_data_hash = c[:job][:output]
586 collections = $arv.collection.list(limit: 1,
587 filters: [['portable_data_hash', '=', portable_data_hash]],
588 select: ["portable_data_hash", "manifest_text"]
591 name = c[:output_name] || "Output #{portable_data_hash[0..7]} of #{cname} of #{pipeline_name}"
593 # check if there is a name collision.
594 name_collisions = $arv.collection.list(filters: [["owner_uuid", "=", owner_uuid],
595 ["name", "=", name]])[:items]
597 newcollection_actual = nil
598 if name_collisions.any? and name_collisions.first[:portable_data_hash] == portable_data_hash
599 # There is already a collection with the same name and the
600 # same contents, so just point to that.
601 newcollection_actual = name_collisions.first
604 if newcollection_actual.nil?
605 # Did not find a collection with the same name (or the
606 # collection has a different portable data hash) so create
607 # a new collection with ensure_unique_name: true.
609 owner_uuid: owner_uuid,
611 portable_data_hash: collections.first[:portable_data_hash],
612 manifest_text: collections.first[:manifest_text]
614 debuglog "Creating collection #{newcollection}", 0
615 newcollection_actual = $arv.collection.create collection: newcollection, ensure_unique_name: true
618 c[:output_uuid] = newcollection_actual[:uuid]
620 debuglog "Could not find a collection with portable data hash #{portable_data_hash}", 0
624 elsif ["Queued", "Running"].include? c[:job][:state]
625 # Job is running or queued to run, so indicate that pipeline
626 # should continue to run
628 elsif c[:job][:state] == "Cancelled"
629 debuglog "component #{cname} job #{c[:job][:uuid]} cancelled."
631 elsif c[:job][:state] == "Failed"
636 @instance[:components] = @components
639 if @options[:no_wait]
643 # If job creation fails, just give up on this pipeline instance.
644 if job_creation_failed > 0
652 debuglog "interrupt", 0
659 c_in_state = @components.values.group_by { |c|
660 c[:job] and c[:job][:state]
662 succeeded = c_in_state["Complete"].andand.count || 0
663 failed = (c_in_state["Failed"].andand.count || 0) + (c_in_state["Cancelled"].andand.count || 0)
664 ended = succeeded + failed
666 success = (succeeded == @components.length)
668 # A job create call failed. Just give up.
669 if job_creation_failed > 0
670 debuglog "job creation failed - giving up on this pipeline instance", 0
677 @instance[:state] = 'Complete'
679 @instance[:state] = 'Paused'
682 if ended == @components.length or failed > 0
683 @instance[:state] = success ? 'Complete' : 'Failed'
687 if @instance[:finished_at].nil? and ['Complete', 'Failed'].include? @instance[:state]
688 @instance[:finished_at] = Time.now
691 debuglog "pipeline instance state is #{@instance[:state]}"
693 # set components_summary
694 components_summary = {"todo" => @components.length - ended, "done" => succeeded, "failed" => failed}
695 @instance[:components_summary] = components_summary
701 if @instance and @instance[:state] == 'RunningOnClient'
702 @instance[:state] = 'Paused'
716 if @options[:status_json] != '/dev/null'
717 File.open(@options[:status_json], 'w') do |f|
718 f.puts @components.pretty_inspect
722 if @options[:status_text] != '/dev/null'
723 File.open(@options[:status_text], 'w') do |f|
725 f.puts "#{Time.now} -- pipeline_instance #{@instance[:uuid]}"
726 namewidth = @components.collect { |cname, c| cname.size }.max
727 @components.each do |cname, c|
728 jstatus = if !c[:job]
730 else case c[:job][:state]
732 "#{c[:job][:tasks_summary].inspect}"
736 "cancelled #{c[:job][:cancelled_at]}"
738 "failed #{c[:job][:finished_at]}"
740 "queued #{c[:job][:created_at]}"
743 f.puts "#{cname.to_s.ljust namewidth} #{c[:job] ? c[:job][:uuid] : '-'.ljust(27)} #{jstatus}"
751 if ["New", "Ready", "RunningOnClient",
752 "RunningOnServer"].include?(@instance[:state])
753 @instance[:state] = "Failed"
754 @instance[:finished_at] = Time.now
757 @instance.log_stderr(msg)
763 runner = WhRunPipelineInstance.new($options)
765 if $options[:template]
766 runner.fetch_template($options[:template])
768 runner.fetch_instance($options[:instance])
770 runner.apply_parameters(p.leftovers)
771 runner.setup_instance
774 puts runner.instance[:uuid]
778 rescue Exception => e