2 # Copyright (C) The Arvados Authors. All rights reserved.
4 # SPDX-License-Identifier: Apache-2.0
6 class WhRunPipelineInstance
9 if RUBY_VERSION < '1.9.3' then
11 #{$0.gsub(/^\.\//,'')} requires Ruby version 1.9.3 or higher.
21 require 'google/api_client'
25 #{$0}: fatal: #{l.message}
26 Some runtime dependencies may be missing.
27 Try: gem install arvados pp google-api-client json trollop
# Print a diagnostic line on stderr when the global debug level permits it.
#
# message   - text to print. It is prefixed with the base name of the running
#             script ($0) and the current process ID ($$).
# verbosity - the message is printed only when $debuglevel is greater than or
#             equal to this value (default 1). Call sites in this file pass 0
#             for failure messages.
#
# An unset (nil) $debuglevel is treated as 0, so calling this before the
# command-line options have been processed no longer raises NoMethodError.
#
# Returns nil.
def debuglog(message, verbosity=1)
  return unless $debuglevel.to_i >= verbosity
  $stderr.puts "#{File.split($0).last} #{$$}: #{message}"
end
35 # Parse command line options (the kind that control the behavior of
36 # this program, that is, not the pipeline component parameters).
38 p = Trollop::Parser.new do
43 arv-run-pipeline-instance --template TEMPLATE_UUID [options] [--] [parameters]
44 arv-run-pipeline-instance --instance INSTANCE_UUID [options] [--] [parameters]
47 param_name=param_value
48 param_name param_value
49 Set (or override) the default value for every
50 pipeline component parameter with the given
53 component_name::param_name=param_value
54 component_name::param_name param_value
55 --component_name::param_name=param_value
56 --component_name::param_name param_value
57 Set the value of a parameter for a single
63 "Do not start any new jobs or wait for existing jobs to finish. Just find out whether jobs are finished, queued, or running for each component.",
67 "Store plain text status in given file.",
70 :default => '/dev/stdout')
72 "Store json-formatted pipeline in given file.",
75 :default => '/dev/null')
77 "Do not wait for jobs to finish. Just look up status, submit new jobs if needed, and exit.",
81 "Do not reuse existing jobs to satisfy pipeline components. Submit a new job for every component.",
85 "Print extra debugging information on stderr.",
88 "Set debug verbosity level.",
92 "UUID of pipeline template, or path to local pipeline template file.",
96 "UUID of pipeline instance.",
100 "Submit the pipeline instance to the server, and exit. Let the Crunch dispatch service satisfy the components by finding/running jobs.",
103 opt(:run_pipeline_here,
104 "Manage the pipeline instance in-process. Submit jobs to Crunch as needed. Do not exit until the pipeline finishes (or fails).",
108 "Run jobs in the local terminal session instead of submitting them to Crunch. Implies --run-pipeline-here. Note: this results in a significantly different job execution environment, and some Crunch features are not supported. It can be necessary to modify a pipeline in order to make it run this way.",
112 "Synonym for --run-jobs-here.",
116 "Description for the pipeline instance.",
120 "UUID of the project for the pipeline instance.",
125 $options = Trollop::with_standard_exception_handling p do
128 $debuglevel = $options[:debug_level] || ($options[:debug] && 1) || 0
130 $options[:run_jobs_here] ||= $options[:run_here] # old flag name
131 $options[:run_pipeline_here] ||= $options[:run_jobs_here] # B requires A
133 if $options[:instance]
134 if $options[:template] or $options[:submit]
135 abort "#{$0}: syntax error: --instance cannot be combined with --template or --submit."
137 elsif not $options[:template]
138 $stderr.puts "error: you must supply a --template or --instance."
143 if $options[:run_pipeline_here] == $options[:submit]
144 abort "#{$0}: error: you must supply --run-pipeline-here, --run-jobs-here, or --submit."
147 # Set up the API client.
149 $arv = Arvados.new api_version: 'v1'
150 $client = $arv.client
151 $arvados = $arv.arvados_api
153 class PipelineInstance
155 result = $client.execute(:api_method => $arvados.pipeline_instances.get,
159 :authenticated => false,
161 authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
163 j = JSON.parse result.body, :symbolize_names => true
164 unless j.is_a? Hash and j[:uuid]
165 debuglog "Failed to get pipeline_instance: #{j[:errors] rescue nil}", 0
168 debuglog "Retrieved pipeline_instance #{j[:uuid]}"
172 def self.create(attributes)
173 result = $client.execute(:api_method => $arvados.pipeline_instances.create,
175 :pipeline_instance => attributes
177 :authenticated => false,
179 authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
181 j = JSON.parse result.body, :symbolize_names => true
182 unless j.is_a? Hash and j[:uuid]
183 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nFailed to create pipeline_instance: #{j[:errors] rescue nil} #{j.inspect}"
185 debuglog "Created pipeline instance: #{j[:uuid]}"
189 result = $client.execute(:api_method => $arvados.pipeline_instances.update,
194 :pipeline_instance => @attributes_to_update
196 :authenticated => false,
198 authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
200 j = JSON.parse result.body, :symbolize_names => true
201 unless j.is_a? Hash and j[:uuid]
202 debuglog "Failed to save pipeline_instance: #{j[:errors] rescue nil}", 0
205 @attributes_to_update = {}
210 @attributes_to_update[x] = y
218 $arv.log.create log: {
219 event_type: 'stderr',
220 object_uuid: self[:uuid],
221 owner_uuid: self[:owner_uuid],
222 properties: {"text" => msg},
228 @attributes_to_update = {}
236 result = $client.execute(:api_method => $arvados.jobs.get,
240 :authenticated => false,
242 authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
244 @cache[uuid] = JSON.parse result.body, :symbolize_names => true
246 def self.where(conditions)
247 result = $client.execute(:api_method => $arvados.jobs.list,
250 :where => conditions.to_json
252 :authenticated => false,
254 authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
256 list = JSON.parse result.body, :symbolize_names => true
257 if list and list[:items].is_a? Array
263 def self.create(pipeline, component, job, create_params)
266 body = {job: no_nil_values(job)}.merge(no_nil_values(create_params))
268 result = $client.execute(:api_method => $arvados.jobs.create,
269 :body_object => body,
270 :authenticated => false,
272 authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
274 j = JSON.parse result.body, :symbolize_names => true
275 if j.is_a? Hash and j[:uuid]
278 debuglog "create job: #{j[:errors] rescue nil} with attributes #{body}", 0
281 j[:errors].each do |err|
282 msg += "Error creating job for component #{component}: #{err}\n"
284 msg += "Job submission was: #{body.to_json}"
286 pipeline.log_stderr(msg)
# Return a copy of +hash+ containing only the entries whose value is not nil.
# Values such as false, 0 and "" are kept. The argument is not modified.
def self.no_nil_values(hash)
  hash.select { |_name, val| !val.nil? }
end
298 class WhRunPipelineInstance
299 attr_reader :instance
301 def initialize(_options)
305 def fetch_template(template)
306 if template.match /[^-0-9a-z]/
307 # Doesn't look like a uuid -- use it as a filename.
308 @template = JSON.parse File.read(template), :symbolize_names => true
310 result = $client.execute(:api_method => $arvados.pipeline_templates.get,
314 :authenticated => false,
316 authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
318 @template = JSON.parse result.body, :symbolize_names => true
320 abort "#{$0}: fatal: failed to retrieve pipeline template #{template} #{@template[:errors].inspect rescue nil}"
326 def fetch_instance(instance_uuid)
327 @instance = PipelineInstance.find(instance_uuid)
328 @template = @instance
332 def apply_parameters(params_args)
333 params_args.shift if params_args[0] == '--'
335 while !params_args.empty?
336 if (re = params_args[0].match /^(--)?([^-].*?)=(.+)/)
337 params[re[2]] = re[3]
339 elsif params_args.size > 1
340 param = params_args.shift.sub /^--/, ''
341 params[param] = params_args.shift
343 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: I do not know what to do with arg \"#{params_args[0]}\""
347 if not @template[:components].is_a?(Hash)
348 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Template missing \"components\" hash"
350 @components = @template[:components].dup
352 bad_components = @components.each_pair.select do |cname, cspec|
353 not cspec.is_a?(Hash)
355 if bad_components.any?
356 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Components not specified with hashes: #{bad_components.map(&:first).join(', ')}"
359 bad_components = @components.each_pair.select do |cname, cspec|
360 not cspec[:script_parameters].is_a?(Hash)
362 if bad_components.any?
363 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Components missing \"script_parameters\" hashes: #{bad_components.map(&:first).join(', ')}"
367 @components.each do |componentname, component|
368 component[:script_parameters].each do |parametername, parameter|
369 parameter = { :value => parameter } unless parameter.is_a? Hash
370 if params.has_key?("#{componentname}::#{parametername}")
371 value = params["#{componentname}::#{parametername}"]
372 elsif parameter.has_key?(:value)
373 value = parameter[:value]
374 elsif parameter.has_key?(:output_of)
375 if !@components[parameter[:output_of].intern]
376 errors << [componentname, parametername, "output_of refers to nonexistent component '#{parameter[:output_of]}'"]
378 # value will be filled in later when the upstream
379 # component's output becomes known
382 elsif params.has_key?(parametername.to_s)
383 value = params[parametername.to_s]
384 elsif parameter.has_key?(:default)
385 value = parameter[:default]
386 elsif [false, 'false', 0, '0'].index(parameter[:required])
389 errors << [componentname, parametername, "required parameter is missing"]
392 debuglog "parameter #{componentname}::#{parametername} == #{value}"
394 component[:script_parameters][parametername] =
395 parameter.dup.merge(value: value)
399 all_errors = errors.collect do |c,p,e|
400 "#{c}::#{p} - #{e}\n"
402 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nErrors:\n#{all_errors}"
404 debuglog "options=" + @options.pretty_inspect
410 @instance[:properties][:run_options] ||= {}
411 if @options[:no_reuse]
412 # override properties of existing instance
413 @instance[:properties][:run_options][:enable_job_reuse] = false
415 # Default to "enable reuse" if not specified. (This code path
416 # can go away when old clients go away.)
417 if @instance[:properties][:run_options][:enable_job_reuse].nil?
418 @instance[:properties][:run_options][:enable_job_reuse] = true
422 description = $options[:description] ||
423 ("Created at #{Time.now.localtime}" + (@template[:name].andand.size.andand>0 ? " using the pipeline template *#{@template[:name]}*" : ""))
425 components: @components,
428 enable_job_reuse: !@options[:no_reuse]
431 pipeline_template_uuid: @template[:uuid],
432 description: description,
433 state: ($options[:submit] ? 'RunningOnServer' : 'RunningOnClient')
435 if @options[:project_uuid]
436 instance_body[:owner_uuid] = @options[:project_uuid]
438 @instance = PipelineInstance.create(instance_body)
447 if @instance[:started_at].nil?
448 @instance[:started_at] = Time.now
451 job_creation_failed = 0
454 @components.each do |cname, c|
456 owner_uuid = @instance[:owner_uuid]
457 # Is the job satisfying this component already known to be
458 # finished? (Already meaning "before we query API server about
459 # the job's current state")
460 c_already_finished = (c[:job] &&
462 ["Complete", "Failed", "Cancelled"].include?(c[:job][:state]))
464 c[:script_parameters].select { |pname, p| p.is_a? Hash and p[:output_of]}.empty?
465 # No job is associated with this component yet, and the component's inputs
466 # are fully specified (any output_of script_parameters are resolved
468 my_submit_id = "instance #{@instance[:uuid]} rand #{rand(2**64).to_s(36)}"
469 job = JobCache.create(@instance, cname, {
470 :script => c[:script],
471 :script_parameters => Hash[c[:script_parameters].map do |key, spec|
474 :script_version => c[:script_version],
475 :repository => c[:repository],
476 :nondeterministic => c[:nondeterministic],
477 :runtime_constraints => c[:runtime_constraints],
478 :owner_uuid => owner_uuid,
479 :is_locked_by_uuid => (@options[:run_jobs_here] ? owner_uuid : nil),
480 :submit_id => my_submit_id,
481 :state => (if @options[:run_jobs_here] then "Running" else "Queued" end)
483 # This is the right place to put these attributes when
484 # dealing with new API servers.
485 :minimum_script_version => c[:minimum_script_version],
486 :exclude_script_versions => c[:exclude_minimum_script_versions],
487 :find_or_create => (@instance[:properties][:run_options].andand[:enable_job_reuse] &&
488 !c[:nondeterministic]),
489 :filters => c[:filters]
492 debuglog "component #{cname} new job #{job[:uuid]}"
494 c[:run_in_process] = (@options[:run_jobs_here] and
495 job[:submit_id] == my_submit_id)
497 debuglog "component #{cname} new job failed", 0
498 job_creation_failed += 1
502 if c[:job] and c[:run_in_process] and not ["Complete", "Failed", "Cancelled"].include? c[:job][:state]
506 Open3.popen3("arv-crunch-job", "--force-unlock",
507 "--job", c[:job][:uuid]) do |stdin, stdout, stderr, wait_thr|
508 debuglog "arv-crunch-job pid #{wait_thr.pid} started", 0
511 rready, wready, = IO.select([stdout, stderr], [])
514 buf = rready[0].read_nonblock(2**20)
518 (rready[0] == stdout ? $stdout : $stderr).write(buf)
522 debuglog "arv-crunch-job pid #{wait_thr.pid} exit #{wait_thr.value.to_i}", 0
524 if not $arv.job.get(uuid: c[:job][:uuid])[:finished_at]
525 raise Exception.new("arv-crunch-job did not set finished_at.")
527 rescue Exception => e
528 debuglog "Interrupted (#{e}). Failing job.", 0
529 $arv.job.update(uuid: c[:job][:uuid],
536 if c[:job] and c[:job][:uuid]
537 if ["Running", "Queued"].include?(c[:job][:state])
538 # Job is running (or may be soon) so update copy of job record
539 c[:job] = JobCache.get(c[:job][:uuid])
542 if c[:job][:state] == "Complete"
543 # Populate script_parameters of other components waiting for
545 @components.each do |c2name, c2|
546 c2[:script_parameters].each do |pname, p|
547 if p.is_a? Hash and p[:output_of] == cname.to_s
548 debuglog "parameter #{c2name}::#{pname} == #{c[:job][:output]}"
549 c2[:script_parameters][pname] = {value: c[:job][:output]}
554 unless c_already_finished
555 # This is my first time discovering that the job
556 # succeeded. (At the top of this loop, I was still
557 # waiting for it to finish.)
559 if @instance[:name].andand.length.andand > 0
560 pipeline_name = @instance[:name]
561 elsif @template.andand[:name].andand.length.andand > 0
562 pipeline_name = @template[:name]
564 pipeline_name = @instance[:uuid]
566 if c[:output_name] != false
567 # Create a collection located in the same project as the pipeline with the contents of the output.
568 portable_data_hash = c[:job][:output]
569 collections = $arv.collection.list(limit: 1,
570 filters: [['portable_data_hash', '=', portable_data_hash]],
571 select: ["portable_data_hash", "manifest_text"]
574 name = c[:output_name] || "Output #{portable_data_hash[0..7]} of #{cname} of #{pipeline_name}"
576 # check if there is a name collision.
577 name_collisions = $arv.collection.list(filters: [["owner_uuid", "=", owner_uuid],
578 ["name", "=", name]])[:items]
580 newcollection_actual = nil
581 if name_collisions.any? and name_collisions.first[:portable_data_hash] == portable_data_hash
582 # There is already a collection with the same name and the
583 # same contents, so just point to that.
584 newcollection_actual = name_collisions.first
587 if newcollection_actual.nil?
588 # Did not find a collection with the same name (or the
589 # collection has a different portable data hash) so create
590 # a new collection with ensure_unique_name: true.
592 owner_uuid: owner_uuid,
594 portable_data_hash: collections.first[:portable_data_hash],
595 manifest_text: collections.first[:manifest_text]
597 debuglog "Creating collection #{newcollection}", 0
598 newcollection_actual = $arv.collection.create collection: newcollection, ensure_unique_name: true
601 c[:output_uuid] = newcollection_actual[:uuid]
603 debuglog "Could not find a collection with portable data hash #{portable_data_hash}", 0
607 elsif ["Queued", "Running"].include? c[:job][:state]
608 # Job is running or queued to run, so indicate that pipeline
609 # should continue to run
611 elsif c[:job][:state] == "Cancelled"
612 debuglog "component #{cname} job #{c[:job][:uuid]} cancelled."
614 elsif c[:job][:state] == "Failed"
619 @instance[:components] = @components
622 if @options[:no_wait]
626 # If job creation fails, just give up on this pipeline instance.
627 if job_creation_failed > 0
635 debuglog "interrupt", 0
642 c_in_state = @components.values.group_by { |c|
643 c[:job] and c[:job][:state]
645 succeeded = c_in_state["Complete"].andand.count || 0
646 failed = (c_in_state["Failed"].andand.count || 0) + (c_in_state["Cancelled"].andand.count || 0)
647 ended = succeeded + failed
649 success = (succeeded == @components.length)
651 # A job create call failed. Just give up.
652 if job_creation_failed > 0
653 debuglog "job creation failed - giving up on this pipeline instance", 0
660 @instance[:state] = 'Complete'
662 @instance[:state] = 'Paused'
665 if ended == @components.length or failed > 0
666 @instance[:state] = success ? 'Complete' : 'Failed'
670 if @instance[:finished_at].nil? and ['Complete', 'Failed'].include? @instance[:state]
671 @instance[:finished_at] = Time.now
674 debuglog "pipeline instance state is #{@instance[:state]}"
676 # set components_summary
677 components_summary = {"todo" => @components.length - ended, "done" => succeeded, "failed" => failed}
678 @instance[:components_summary] = components_summary
684 if @instance and @instance[:state] == 'RunningOnClient'
685 @instance[:state] = 'Paused'
699 if @options[:status_json] != '/dev/null'
700 File.open(@options[:status_json], 'w') do |f|
701 f.puts @components.pretty_inspect
705 if @options[:status_text] != '/dev/null'
706 File.open(@options[:status_text], 'w') do |f|
708 f.puts "#{Time.now} -- pipeline_instance #{@instance[:uuid]}"
709 namewidth = @components.collect { |cname, c| cname.size }.max
710 @components.each do |cname, c|
711 jstatus = if !c[:job]
713 else case c[:job][:state]
715 "#{c[:job][:tasks_summary].inspect}"
719 "cancelled #{c[:job][:cancelled_at]}"
721 "failed #{c[:job][:finished_at]}"
723 "queued #{c[:job][:created_at]}"
726 f.puts "#{cname.to_s.ljust namewidth} #{c[:job] ? c[:job][:uuid] : '-'.ljust(27)} #{jstatus}"
734 if ["New", "Ready", "RunningOnClient",
735 "RunningOnServer"].include?(@instance[:state])
736 @instance[:state] = "Failed"
737 @instance[:finished_at] = Time.now
740 @instance.log_stderr(msg)
746 runner = WhRunPipelineInstance.new($options)
748 if $options[:template]
749 runner.fetch_template($options[:template])
751 runner.fetch_instance($options[:instance])
753 runner.apply_parameters(p.leftovers)
754 runner.setup_instance
757 puts runner.instance[:uuid]
761 rescue Exception => e