5 # arv-run-pipeline-instance --template pipeline-template-uuid [options] [--] [parameters]
6 # arv-run-pipeline-instance --instance pipeline-instance-uuid [options]
8 # Satisfy a pipeline template by finding or submitting a mapreduce job
9 # for each pipeline component.
13 # [--template uuid] Use the specified pipeline template.
15 # [--template path] Load the pipeline template from the specified
18 # [--instance uuid] Use the specified pipeline instance.
20 # [-n, --dry-run] Do not start any new jobs or wait for existing jobs
21 # to finish. Just find out whether jobs are finished,
22 # queued, or running for each component
24 # [--submit] Do not try to satisfy any components. Just
25 # create an instance, print its UUID to
28 # [--no-wait] Make only as much progress as possible without entering
31 # [--no-reuse] Do not reuse existing jobs to satisfy pipeline
32 # components. Submit a new job for every component.
34 # [--debug] Print extra debugging information on stderr.
36 # [--debug-level N] Increase amount of debugging information. Default
37 # 1, possible range 0..3.
39 # [--status-text path] Print plain text status report to a file or
40 # fifo. Default: /dev/stdout
42 # [--status-json path] Print JSON status report to a file or
43 # fifo. Default: /dev/null
45 # [--description] Description for the pipeline instance.
49 # [param_name=param_value]
51 # [param_name param_value] Set (or override) the default value for
52 # every parameter with the given name.
54 # [component_name::param_name=param_value]
55 # [component_name::param_name param_value]
56 # [--component_name::param_name=param_value]
57 # [--component_name::param_name param_value] Set the value of a
58 # parameter for a single
61 class WhRunPipelineInstance
64 if RUBY_VERSION < '1.9.3' then
66 #{$0.gsub(/^\.\//,'')} requires Ruby version 1.9.3 or higher.
70 $arvados_api_version = ENV['ARVADOS_API_VERSION'] || 'v1'
71 $arvados_api_host = ENV['ARVADOS_API_HOST'] or
72 abort "#{$0}: fatal: ARVADOS_API_HOST environment variable not set."
73 $arvados_api_token = ENV['ARVADOS_API_TOKEN'] or
74 abort "#{$0}: fatal: ARVADOS_API_TOKEN environment variable not set."
82 require 'google/api_client'
86 #{$0}: fatal: #{l.message}
87 Some runtime dependencies may be missing.
88 Try: gem install arvados pp google-api-client json trollop
92 def debuglog(message, verbosity=1)
93 $stderr.puts "#{File.split($0).last} #{$$}: #{message}" if $debuglevel >= verbosity
98 original_verbosity = $VERBOSE
101 $VERBOSE = original_verbosity
106 if $arvados_api_host.match /local/
107 # You probably don't care about SSL certificate checks if you're
108 # testing with a dev server.
109 suppress_warnings { OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE }
113 # Parse command line options (the kind that control the behavior of
114 # this program, that is, not the pipeline component parameters).
116 p = Trollop::Parser.new do
119 "Do not start any new jobs or wait for existing jobs to finish. Just find out whether jobs are finished, queued, or running for each component.",
123 "Store plain text status in given file.",
126 :default => '/dev/stdout')
128 "Store json-formatted pipeline in given file.",
131 :default => '/dev/null')
133 "Do not wait for jobs to finish. Just look up status, submit new jobs if needed, and exit.",
137 "Do not reuse existing jobs to satisfy pipeline components. Submit a new job for every component.",
141 "Print extra debugging information on stderr.",
144 "Set debug verbosity level.",
148 "UUID of pipeline template, or path to local pipeline template file.",
152 "UUID of pipeline instance.",
156 "Submit the pipeline instance to the server, and exit. Let the Crunch dispatch service satisfy the components by finding/running jobs.",
159 opt(:run_pipeline_here,
160 "Manage the pipeline instance in-process. Submit jobs to Crunch as needed. Do not exit until the pipeline finishes (or fails).",
164 "Run jobs in the local terminal session instead of submitting them to Crunch. Implies --run-pipeline-here. Note: this results in a significantly different job execution environment, and some Crunch features are not supported. It can be necessary to modify a pipeline in order to make it run this way.",
168 "Synonym for --run-jobs-here.",
172 "Description for the pipeline instance.",
177 $options = Trollop::with_standard_exception_handling p do
180 $debuglevel = $options[:debug_level] || ($options[:debug] && 1) || 0
182 $options[:run_jobs_here] ||= $options[:run_here] # old flag name
183 $options[:run_pipeline_here] ||= $options[:run_jobs_here] # B requires A
185 if $options[:instance]
186 if $options[:template] or $options[:submit]
187 abort "#{$0}: syntax error: --instance cannot be combined with --template or --submit."
189 elsif not $options[:template]
190 puts "error: you must supply a --template or --instance."
195 if $options[:run_pipeline_here] == $options[:submit]
196 abort "#{$0}: error: you must supply --run-pipeline-here, --run-jobs-here, or --submit."
199 # Suppress SSL certificate checks if ARVADOS_API_HOST_INSECURE
202 def suppress_warnings
203 original_verbosity = $VERBOSE
206 $VERBOSE = original_verbosity
211 if ENV['ARVADOS_API_HOST_INSECURE']
212 suppress_warnings { OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE }
215 # Set up the API client.
217 $arv = Arvados.new api_version: 'v1'
218 $client = $arv.client
219 $arvados = $arv.arvados_api
221 class PipelineInstance
223 result = $client.execute(:api_method => $arvados.pipeline_instances.get,
227 :authenticated => false,
229 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
231 j = JSON.parse result.body, :symbolize_names => true
232 unless j.is_a? Hash and j[:uuid]
233 debuglog "Failed to get pipeline_instance: #{j[:errors] rescue nil}", 0
236 debuglog "Retrieved pipeline_instance #{j[:uuid]}"
240 def self.create(attributes)
241 result = $client.execute(:api_method => $arvados.pipeline_instances.create,
243 :pipeline_instance => attributes
245 :authenticated => false,
247 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
249 j = JSON.parse result.body, :symbolize_names => true
250 unless j.is_a? Hash and j[:uuid]
251 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nFailed to create pipeline_instance: #{j[:errors] rescue nil} #{j.inspect}"
253 debuglog "Created pipeline instance: #{j[:uuid]}"
257 result = $client.execute(:api_method => $arvados.pipeline_instances.update,
262 :pipeline_instance => @attributes_to_update
264 :authenticated => false,
266 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
268 j = JSON.parse result.body, :symbolize_names => true
269 unless j.is_a? Hash and j[:uuid]
270 debuglog "Failed to save pipeline_instance: #{j[:errors] rescue nil}", 0
273 @attributes_to_update = {}
278 @attributes_to_update[x] = y
286 $arv.log.create log: {
287 event_type: 'stderr',
288 object_uuid: self[:uuid],
289 owner_uuid: self[:owner_uuid],
290 properties: {"text" => msg},
296 @attributes_to_update = {}
304 result = $client.execute(:api_method => $arvados.jobs.get,
308 :authenticated => false,
310 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
312 @cache[uuid] = JSON.parse result.body, :symbolize_names => true
314 def self.where(conditions)
315 result = $client.execute(:api_method => $arvados.jobs.list,
318 :where => conditions.to_json
320 :authenticated => false,
322 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
324 list = JSON.parse result.body, :symbolize_names => true
325 if list and list[:items].is_a? Array
331 def self.create(pipeline, component, job, create_params)
334 body = {job: no_nil_values(job)}.merge(no_nil_values(create_params))
336 result = $client.execute(:api_method => $arvados.jobs.create,
337 :body_object => body,
338 :authenticated => false,
340 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
342 j = JSON.parse result.body, :symbolize_names => true
343 if j.is_a? Hash and j[:uuid]
346 debuglog "create job: #{j[:errors] rescue nil} with attributes #{body}", 0
349 j[:errors].each do |err|
350 msg += "Error creating job for component #{component}: #{err}\n"
352 msg += "Job submission was: #{body.to_json}"
354 pipeline.log_stderr(msg)
361 def self.no_nil_values(hash)
362 hash.reject { |key, value| value.nil? }
366 class WhRunPipelineInstance
367 attr_reader :instance
369 def initialize(_options)
373 def fetch_template(template)
374 if template.match /[^-0-9a-z]/
375 # Doesn't look like a uuid -- use it as a filename.
376 @template = JSON.parse File.read(template), :symbolize_names => true
378 result = $client.execute(:api_method => $arvados.pipeline_templates.get,
382 :authenticated => false,
384 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
386 @template = JSON.parse result.body, :symbolize_names => true
388 abort "#{$0}: fatal: failed to retrieve pipeline template #{template} #{@template[:errors].inspect rescue nil}"
394 def fetch_instance(instance_uuid)
395 @instance = PipelineInstance.find(instance_uuid)
396 @template = @instance
400 def apply_parameters(params_args)
401 params_args.shift if params_args[0] == '--'
403 while !params_args.empty?
404 if (re = params_args[0].match /^(--)?([^-].*?)=(.+)/)
405 params[re[2]] = re[3]
407 elsif params_args.size > 1
408 param = params_args.shift.sub /^--/, ''
409 params[param] = params_args.shift
411 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: I do not know what to do with arg \"#{params_args[0]}\""
415 if not @template[:components].is_a?(Hash)
416 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Template missing \"components\" hash"
418 @components = @template[:components].dup
420 bad_components = @components.each_pair.select do |cname, cspec|
421 not cspec.is_a?(Hash)
423 if bad_components.any?
424 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Components not specified with hashes: #{bad_components.map(&:first).join(', ')}"
427 bad_components = @components.each_pair.select do |cname, cspec|
428 not cspec[:script_parameters].is_a?(Hash)
430 if bad_components.any?
431 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Components missing \"script_parameters\" hashes: #{bad_components.map(&:first).join(', ')}"
435 @components.each do |componentname, component|
436 component[:script_parameters].each do |parametername, parameter|
437 parameter = { :value => parameter } unless parameter.is_a? Hash
439 (params["#{componentname}::#{parametername}"] ||
441 (parameter[:output_of].nil? &&
442 (params[parametername.to_s] ||
443 parameter[:default])) ||
446 ![false,'false',0,'0'].index parameter[:required]
447 if parameter[:output_of]
448 if not @components[parameter[:output_of].intern]
449 errors << [componentname, parametername, "output_of refers to nonexistent component '#{parameter[:output_of]}'"]
453 errors << [componentname, parametername, "required parameter is missing"]
455 debuglog "parameter #{componentname}::#{parametername} == #{value}"
457 component[:script_parameters][parametername] =
458 parameter.dup.merge(value: value)
462 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nErrors:\n#{errors.collect { |c,p,e| "#{c}::#{p} - #{e}\n" }.join ""}"
464 debuglog "options=" + @options.pretty_inspect
470 @instance[:properties][:run_options] ||= {}
471 if @options[:no_reuse]
472 # override properties of existing instance
473 @instance[:properties][:run_options][:enable_job_reuse] = false
475 # Default to "enable reuse" if not specified. (This code path
476 # can go away when old clients go away.)
477 if @instance[:properties][:run_options][:enable_job_reuse].nil?
478 @instance[:properties][:run_options][:enable_job_reuse] = true
482 description = $options[:description]
483 description = ("Created at #{Time.now.localtime}" + (@template[:name].andand.size.andand>0 ? " using the pipeline template *#{@template[:name]}*" : "")) if !description
484 @instance = PipelineInstance.
485 create(components: @components,
488 enable_job_reuse: !@options[:no_reuse]
491 pipeline_template_uuid: @template[:uuid],
492 description: description,
493 state: ($options[:submit] ? 'RunningOnServer' : 'RunningOnClient'))
502 if @instance[:started_at].nil?
503 @instance[:started_at] = Time.now
506 job_creation_failed = 0
509 @components.each do |cname, c|
511 owner_uuid = @instance[:owner_uuid]
512 # Is the job satisfying this component already known to be
513 # finished? (Already meaning "before we query API server about
514 # the job's current state")
515 c_already_finished = (c[:job] &&
517 ["Complete", "Failed", "Cancelled"].include?(c[:job][:state]))
519 c[:script_parameters].select { |pname, p| p.is_a? Hash and p[:output_of]}.empty?
520 # No job yet associated with this component and is component inputs
521 # are fully specified (any output_of script_parameters are resolved
523 my_submit_id = "instance #{@instance[:uuid]} rand #{rand(2**64).to_s(36)}"
524 job = JobCache.create(@instance, cname, {
525 :script => c[:script],
526 :script_parameters => Hash[c[:script_parameters].map do |key, spec|
529 :script_version => c[:script_version],
530 :repository => c[:repository],
531 :nondeterministic => c[:nondeterministic],
532 :runtime_constraints => c[:runtime_constraints],
533 :owner_uuid => owner_uuid,
534 :is_locked_by_uuid => (@options[:run_jobs_here] ? owner_uuid : nil),
535 :submit_id => my_submit_id,
536 :state => (if @options[:run_jobs_here] then "Running" else "Queued" end)
538 # This is the right place to put these attributes when
539 # dealing with new API servers.
540 :minimum_script_version => c[:minimum_script_version],
541 :exclude_script_versions => c[:exclude_minimum_script_versions],
542 :find_or_create => (@instance[:properties][:run_options].andand[:enable_job_reuse] &&
543 !c[:nondeterministic]),
544 :filters => c[:filters]
547 debuglog "component #{cname} new job #{job[:uuid]}"
549 c[:run_in_process] = (@options[:run_jobs_here] and
550 job[:submit_id] == my_submit_id)
552 debuglog "component #{cname} new job failed", 0
553 job_creation_failed += 1
557 if c[:job] and c[:run_in_process] and not ["Complete", "Failed", "Cancelled"].include? c[:job][:state]
561 Open3.popen3("arv-crunch-job", "--force-unlock",
562 "--job", c[:job][:uuid]) do |stdin, stdout, stderr, wait_thr|
563 debuglog "arv-crunch-job pid #{wait_thr.pid} started", 0
566 rready, wready, = IO.select([stdout, stderr], [])
569 buf = rready[0].read_nonblock(2**20)
573 (rready[0] == stdout ? $stdout : $stderr).write(buf)
577 debuglog "arv-crunch-job pid #{wait_thr.pid} exit #{wait_thr.value.to_i}", 0
579 if not $arv.job.get(uuid: c[:job][:uuid])[:finished_at]
580 raise Exception.new("arv-crunch-job did not set finished_at.")
582 rescue Exception => e
583 debuglog "Interrupted (#{e}). Failing job.", 0
584 $arv.job.update(uuid: c[:job][:uuid],
591 if c[:job] and c[:job][:uuid]
592 if ["Running", "Queued"].include?(c[:job][:state])
593 # Job is running (or may be soon) so update copy of job record
594 c[:job] = JobCache.get(c[:job][:uuid])
597 if c[:job][:state] == "Complete"
598 # Populate script_parameters of other components waiting for
600 @components.each do |c2name, c2|
601 c2[:script_parameters].each do |pname, p|
602 if p.is_a? Hash and p[:output_of] == cname.to_s
603 debuglog "parameter #{c2name}::#{pname} == #{c[:job][:output]}"
604 c2[:script_parameters][pname] = {value: c[:job][:output]}
609 unless c_already_finished
610 # This is my first time discovering that the job
611 # succeeded. (At the top of this loop, I was still
612 # waiting for it to finish.)
614 if @instance[:name].andand.length.andand > 0
615 pipeline_name = @instance[:name]
616 elsif @template.andand[:name].andand.length.andand > 0
617 pipeline_name = @template[:name]
619 pipeline_name = @instance[:uuid]
621 if c[:output_name] != false
622 # Create a collection located in the same project as the pipeline with the contents of the output.
623 portable_data_hash = c[:job][:output]
624 collections = $arv.collection.list(limit: 1,
625 filters: [['portable_data_hash', '=', portable_data_hash]],
626 select: ["portable_data_hash", "manifest_text"]
629 name = c[:output_name] || "Output #{portable_data_hash[0..7]} of #{cname} of #{pipeline_name}"
631 # check if there is a name collision.
632 name_collisions = $arv.collection.list(filters: [["owner_uuid", "=", owner_uuid],
633 ["name", "=", name]])[:items]
635 newcollection_actual = nil
636 if name_collisions.any? and name_collisions.first[:portable_data_hash] == portable_data_hash
637 # There is already a collection with the same name and the
638 # same contents, so just point to that.
639 newcollection_actual = name_collisions.first
642 if newcollection_actual.nil?
643 # Did not find a collection with the same name (or the
644 # collection has a different portable data hash) so create
645 # a new collection with ensure_unique_name: true.
647 owner_uuid: owner_uuid,
649 portable_data_hash: collections.first[:portable_data_hash],
650 manifest_text: collections.first[:manifest_text]
652 debuglog "Creating collection #{newcollection}", 0
653 newcollection_actual = $arv.collection.create collection: newcollection, ensure_unique_name: true
656 c[:output_uuid] = newcollection_actual[:uuid]
658 debuglog "Could not find a collection with portable data hash #{portable_data_hash}", 0
662 elsif ["Queued", "Running"].include? c[:job][:state]
663 # Job is running or queued to run, so indicate that pipeline
664 # should continue to run
666 elsif c[:job][:state] == "Cancelled"
667 debuglog "component #{cname} job #{c[:job][:uuid]} cancelled."
669 elsif c[:job][:state] == "Failed"
674 @instance[:components] = @components
677 if @options[:no_wait]
681 # If job creation fails, just give up on this pipeline instance.
682 if job_creation_failed > 0
690 debuglog "interrupt", 0
697 c_in_state = @components.values.group_by { |c|
698 c[:job] and c[:job][:state]
700 succeeded = c_in_state["Complete"].andand.count || 0
701 failed = (c_in_state["Failed"].andand.count || 0) + (c_in_state["Cancelled"].andand.count || 0)
702 ended = succeeded + failed
704 success = (succeeded == @components.length)
706 # A job create call failed. Just give up.
707 if job_creation_failed > 0
708 debuglog "job creation failed - giving up on this pipeline instance", 0
715 @instance[:state] = 'Complete'
717 @instance[:state] = 'Paused'
720 if ended == @components.length or failed > 0
721 @instance[:state] = success ? 'Complete' : 'Failed'
725 if @instance[:finished_at].nil? and ['Complete', 'Failed'].include? @instance[:state]
726 @instance[:finished_at] = Time.now
729 debuglog "pipeline instance state is #{@instance[:state]}"
731 # set components_summary
732 components_summary = {"todo" => @components.length - ended, "done" => succeeded, "failed" => failed}
733 @instance[:components_summary] = components_summary
739 if @instance and @instance[:state] == 'RunningOnClient'
740 @instance[:state] = 'Paused'
754 if @options[:status_json] != '/dev/null'
755 File.open(@options[:status_json], 'w') do |f|
756 f.puts @components.pretty_inspect
760 if @options[:status_text] != '/dev/null'
761 File.open(@options[:status_text], 'w') do |f|
763 f.puts "#{Time.now} -- pipeline_instance #{@instance[:uuid]}"
764 namewidth = @components.collect { |cname, c| cname.size }.max
765 @components.each do |cname, c|
766 jstatus = if !c[:job]
768 else case c[:job][:state]
770 "#{c[:job][:tasks_summary].inspect}"
774 "cancelled #{c[:job][:cancelled_at]}"
776 "failed #{c[:job][:finished_at]}"
778 "queued #{c[:job][:created_at]}"
781 f.puts "#{cname.to_s.ljust namewidth} #{c[:job] ? c[:job][:uuid] : '-'.ljust(27)} #{jstatus}"
789 if ["New", "Ready", "RunningOnClient",
790 "RunningOnServer"].include?(@instance[:state])
791 @instance[:state] = "Failed"
792 @instance[:finished_at] = Time.now
795 @instance.log_stderr(msg)
801 runner = WhRunPipelineInstance.new($options)
803 if $options[:template]
804 runner.fetch_template($options[:template])
806 runner.fetch_instance($options[:instance])
808 runner.apply_parameters(p.leftovers)
809 runner.setup_instance
812 puts runner.instance[:uuid]
816 rescue Exception => e