5 # arv-run-pipeline-instance --template pipeline-template-uuid [options] [--] [parameters]
6 # arv-run-pipeline-instance --instance pipeline-instance-uuid [options]
8 # Satisfy a pipeline template by finding or submitting a mapreduce job
9 # for each pipeline component.
13 # [--template uuid] Use the specified pipeline template.
15 # [--template path] Load the pipeline template from the specified local file.
18 # [--instance uuid] Use the specified pipeline instance.
20 # [-n, --dry-run] Do not start any new jobs or wait for existing jobs
21 # to finish. Just find out whether jobs are finished,
22 # queued, or running for each component
24 # [--submit] Do not try to satisfy any components. Just
25 # create an instance, print its UUID to stdout, and exit.
28 # [--no-wait] Make only as much progress as possible without waiting for jobs to finish.
31 # [--no-reuse] Do not reuse existing jobs to satisfy pipeline
32 # components. Submit a new job for every component.
34 # [--debug] Print extra debugging information on stderr.
36 # [--debug-level N] Increase amount of debugging information. Default
37 # 1, possible range 0..3.
39 # [--status-text path] Print plain text status report to a file or
40 # fifo. Default: /dev/stdout
42 # [--status-json path] Print JSON status report to a file or
43 # fifo. Default: /dev/null
45 # [--description] Description for the pipeline instance.
49 # [param_name=param_value]
51 # [param_name param_value] Set (or override) the default value for
52 # every parameter with the given name.
54 # [component_name::param_name=param_value]
55 # [component_name::param_name param_value]
56 # [--component_name::param_name=param_value]
57 # [--component_name::param_name param_value] Set the value of a
58 # parameter for a single component.
# Script setup (excerpt: some original lines, including block terminators and
# the statement that emits the version message, are not visible here).
#
# Visible steps, in order:
#   * introduces the WhRunPipelineInstance class name;
#   * records the application version in $application_version;
#   * checks RUBY_VERSION against '1.9.3';
#   * reads the API version (default 'v1'), host and token from the
#     environment. `or` binds more loosely than `=`, so the assignment happens
#     first and `abort` runs only when the variable is unset (nil).
#
# NOTE(review): RUBY_VERSION < '1.9.3' compares strings lexicographically, so a
# version such as '1.10.0' would sort before '1.9.3' -- confirm this is
# acceptable for the Ruby versions this script is expected to run on.
61 class WhRunPipelineInstance
64 $application_version = 1.0
66 if RUBY_VERSION < '1.9.3' then
68 #{$0.gsub(/^\.\//,'')} requires Ruby version 1.9.3 or higher.
72 $arvados_api_version = ENV['ARVADOS_API_VERSION'] || 'v1'
73 $arvados_api_host = ENV['ARVADOS_API_HOST'] or
74 abort "#{$0}: fatal: ARVADOS_API_HOST environment variable not set."
75 $arvados_api_token = ENV['ARVADOS_API_TOKEN'] or
76 abort "#{$0}: fatal: ARVADOS_API_TOKEN environment variable not set."
84 require 'google/api_client'
88 #{$0}: fatal: #{l.message}
89 Some runtime dependencies may be missing.
90 Try: gem install arvados pp google-api-client json trollop
# Print a diagnostic line on stderr when the global $debuglevel is at least
# +verbosity+. Each line is prefixed with the script's base name and the
# current process id.
#
# message   - text to print.
# verbosity - minimum $debuglevel required for the message to be shown
#             (default 1). Callers in this file pass 0 for messages that
#             should be shown even when $debuglevel is 0.
94 def debuglog(message, verbosity=1)
95 $stderr.puts "#{File.split($0).last} #{$$}: #{message}" if $debuglevel >= verbosity
# Excerpt: saves the current $VERBOSE setting and restores it afterwards. The
# enclosing definition header and the lines in between are not visible here;
# the same save/restore pair appears in suppress_warnings later in this file.
100 original_verbosity = $VERBOSE
103 $VERBOSE = original_verbosity
# When the API host name contains "local", SSL peer verification is turned off
# by reassigning the OpenSSL::SSL::VERIFY_PEER constant to VERIFY_NONE.
# Reassigning the constant is process-wide, not limited to this script's own
# connections.
108 if $arvados_api_host.match /local/
109 # You probably don't care about SSL certificate checks if you're
110 # testing with a dev server.
111 suppress_warnings { OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE }
# Reopens Google::APIClient to override discovery_document(api, version)
# (excerpt: some original lines are not visible here).
#
# Visible behavior:
#   * the result is memoized in @discovery_documents under the key
#     "api:version", so the fetch below runs only when no truthy entry exists;
#   * the document is fetched with an HTTP GET on discovery_uri(api, version)
#     with :authenticated => false;
#   * a String response body is parsed as JSON; any other body is returned
#     as-is.
114 class Google::APIClient
115 def discovery_document(api, version)
117 return @discovery_documents["#{api}:#{version}"] ||=
119 response = self.execute!(
120 :http_method => :get,
121 :uri => self.discovery_uri(api, version),
122 :authenticated => false
124 response.body.class == String ? JSON.parse(response.body) : response.body
130 # Parse command line options (the kind that control the behavior of
131 # this program, that is, not the pipeline component parameters).
#
# Excerpt: the individual option declarations are not visible here; only their
# help strings and two defaults ('/dev/stdout' and '/dev/null') remain. Going
# by the help strings and by the $options keys read elsewhere in this file, the
# parser covers dry-run, status-text, status-json, no-wait, no-reuse, debug,
# debug-level, template, instance, submit, run-here and description.
133 p = Trollop::Parser.new do
136 "Do not start any new jobs or wait for existing jobs to finish. Just find out whether jobs are finished, queued, or running for each component.",
140 "Store plain text status in given file.",
143 :default => '/dev/stdout')
145 "Store json-formatted pipeline in given file.",
148 :default => '/dev/null')
150 "Do not wait for jobs to finish. Just look up status, submit new jobs if needed, and exit.",
154 "Do not reuse existing jobs to satisfy pipeline components. Submit a new job for every component.",
158 "Print extra debugging information on stderr.",
161 "Set debug verbosity level.",
165 "UUID of pipeline template, or path to local pipeline template file.",
169 "UUID of pipeline instance.",
173 "Do not try to satisfy any components. Just create a pipeline instance and output its UUID.",
177 "Manage the pipeline in process.",
181 "Description for the pipeline instance.",
# The parsed options are stored in the global $options. The body of the block
# passed to with_standard_exception_handling is not visible here.
186 $options = Trollop::with_standard_exception_handling p do
# Effective debug level: an explicit --debug-level wins; otherwise --debug
# yields 1; otherwise 0.
189 $debuglevel = $options[:debug_level] || ($options[:debug] && 1) || 0
# Validate option combinations (excerpt: block terminators and any statements
# following the `puts` below are not visible here).
#
#   * --instance may not be combined with --template or --submit;
#   * when --instance is absent, --template is required;
#   * --run-here and --submit must differ: supplying both, or neither, is
#     rejected.
#
# NOTE(review): the "you must supply a --template or --instance" message is
# written with `puts` (stdout), unlike the other errors here, which go through
# `abort` (stderr) -- confirm this is intended.
191 if $options[:instance]
192 if $options[:template] or $options[:submit]
193 abort "#{$0}: syntax error: --instance cannot be combined with --template or --submit."
195 elsif not $options[:template]
196 puts "error: you must supply a --template or --instance."
201 if $options[:run_here] == $options[:submit]
202 abort "#{$0}: syntax error: you must supply either --run-here or --submit."
205 # Suppress SSL certificate checks if ARVADOS_API_HOST_INSECURE is set.
#
# suppress_warnings (excerpt: the lines between saving and restoring $VERBOSE
# are not visible here) saves $VERBOSE on entry and restores it afterwards. It
# is called with a block that reassigns a constant, which would otherwise
# produce an "already initialized constant" warning.
208 def suppress_warnings
209 original_verbosity = $VERBOSE
212 $VERBOSE = original_verbosity
# NOTE(review): any value of ARVADOS_API_HOST_INSECURE, including "", "0" or
# "false", is truthy in Ruby and therefore disables peer verification --
# confirm this is intended.
217 if ENV['ARVADOS_API_HOST_INSECURE']
218 suppress_warnings { OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE }
221 # Set up the API client.
#
# $client  - Google::APIClient for the configured host, created only if
#            $client is not already set; the application name is the script's
#            base name and the version is $application_version as a string.
# $arvados - the 'arvados' API discovered at $arvados_api_version.
# $arv     - a second client, Arvados.new, used elsewhere in this file for
#            log and collection calls.
#
# NOTE(review): $arv is pinned to api_version 'v1' while $arvados honours
# ARVADOS_API_VERSION -- confirm the two are meant to be able to differ.
223 $client ||= Google::APIClient.
224 new(:host => $arvados_api_host,
225 :application_name => File.split($0).last,
226 :application_version => $application_version.to_s)
227 $arvados = $client.discovered_api('arvados', $arvados_api_version)
228 $arv = Arvados.new api_version: 'v1'
# Wrapper around a pipeline_instance API record (excerpt: several method
# headers, block terminators and call arguments are not visible here).
#
# Visible behavior:
#   * records are fetched, created and updated through
#     $arvados.pipeline_instances.get / .create / .update via $client.execute,
#     each with :authenticated => false and an explicit authorization value
#     of 'OAuth2 ' + the API token;
#   * every response body is parsed as JSON with symbolized keys and treated
#     as a success only when it is a Hash containing :uuid;
#   * a failed fetch or update is reported through debuglog at level 0, while
#     a failed create aborts the process;
#   * attribute changes are staged in @attributes_to_update, which is sent as
#     the :pipeline_instance payload of the update call and reset to {}
#     afterwards;
#   * a log record with event_type 'stderr' is written through $arv.log.create,
#     carrying the message under the "text" property.
231 class PipelineInstance
233 result = $client.execute(:api_method => $arvados.pipeline_instances.get,
237 :authenticated => false,
239 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
241 j = JSON.parse result.body, :symbolize_names => true
242 unless j.is_a? Hash and j[:uuid]
243 debuglog "Failed to get pipeline_instance: #{j[:errors] rescue nil}", 0
246 debuglog "Retrieved pipeline_instance #{j[:uuid]}"
# NOTE(review): this is a class method, so @template in the abort message below
# is an instance variable of the PipelineInstance class object, not the
# runner's template. Nothing visible in this file assigns it; if it is nil,
# @template[:uuid] would raise NoMethodError before abort is reached -- verify.
250 def self.create(attributes)
251 result = $client.execute(:api_method => $arvados.pipeline_instances.create,
253 :pipeline_instance => attributes
255 :authenticated => false,
257 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
259 j = JSON.parse result.body, :symbolize_names => true
260 unless j.is_a? Hash and j[:uuid]
261 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nFailed to create pipeline_instance: #{j[:errors] rescue nil} #{j.inspect}"
263 debuglog "Created pipeline instance: #{j[:uuid]}"
267 result = $client.execute(:api_method => $arvados.pipeline_instances.update,
272 :pipeline_instance => @attributes_to_update
274 :authenticated => false,
276 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
278 j = JSON.parse result.body, :symbolize_names => true
279 unless j.is_a? Hash and j[:uuid]
280 debuglog "Failed to save pipeline_instance: #{j[:errors] rescue nil}", 0
283 @attributes_to_update = {}
288 @attributes_to_update[x] = y
296 $arv.log.create log: {
297 event_type: 'stderr',
298 object_uuid: self[:uuid],
299 owner_uuid: self[:owner_uuid],
300 properties: {"text" => msg},
306 @attributes_to_update = {}
# Excerpt of a job lookup (the method header is not visible here; presumably
# JobCache.get, which the execution loop calls with a job uuid -- confirm).
# Fetches the job through $arvados.jobs.get and stores the JSON-parsed response
# (symbolized keys) in @cache under the job's uuid. The parsed response is
# stored without the Hash/:uuid check applied to responses elsewhere in this
# file.
314 result = $client.execute(:api_method => $arvados.jobs.get,
318 :authenticated => false,
320 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
322 @cache[uuid] = JSON.parse result.body, :symbolize_names => true
# List jobs matching +conditions+ (excerpt: the lines after the final check are
# not visible here).
#
# conditions - object serialized with to_json and sent as the :where parameter
#              of $arvados.jobs.list.
#
# The response is parsed as JSON with symbolized keys; the visible check
# accepts it only when it is non-nil and its :items entry is an Array.
324 def self.where(conditions)
325 result = $client.execute(:api_method => $arvados.jobs.list,
328 :where => conditions.to_json
330 :authenticated => false,
332 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
334 list = JSON.parse result.body, :symbolize_names => true
335 if list and list[:items].is_a? Array
# Submit a new job for one pipeline component (excerpt: the success branch and
# block terminators are not visible here).
#
# pipeline      - object responding to log_stderr; receives the failure text.
# component     - component name, used only in error messages.
# job           - Hash of job attributes; nil-valued entries are dropped.
# create_params - Hash of extra top-level request parameters; nil-valued
#                 entries are dropped, then merged next to the :job key.
#
# The request body is sent through $arvados.jobs.create. A response that is a
# Hash with :uuid counts as success. Otherwise the errors are reported through
# debuglog at level 0, and a message listing each error plus the submitted body
# (as JSON) is sent to pipeline.log_stderr.
#
# NOTE(review): the failure branch iterates j[:errors] without a guard, while
# the debuglog line just above protects the same access with `rescue nil`. If
# the response has no :errors array, `.each` would raise -- verify.
341 def self.create(pipeline, component, job, create_params)
344 body = {job: no_nil_values(job)}.merge(no_nil_values(create_params))
346 result = $client.execute(:api_method => $arvados.jobs.create,
347 :body_object => body,
348 :authenticated => false,
350 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
352 j = JSON.parse result.body, :symbolize_names => true
353 if j.is_a? Hash and j[:uuid]
356 debuglog "create job: #{j[:errors] rescue nil} with attributes #{body}", 0
359 j[:errors].each do |err|
360 msg += "Error creating job for component #{component}: #{err}\n"
362 msg += "Job submission was: #{body.to_json}"
364 pipeline.log_stderr(msg)
# Return a copy of +hash+ without the entries whose value is nil. Entries whose
# value is false are kept. The argument itself is not modified.
371 def self.no_nil_values(hash)
372 hash.reject { |key, value| value.nil? }
# Drives one pipeline run: the main script at the bottom of this file creates
# an instance with the parsed options and calls its methods in sequence.
# Exposes the current pipeline instance through the +instance+ reader.
#
# The body of initialize is not visible here; other methods read @options, so
# presumably it stores +_options+ there -- confirm.
376 class WhRunPipelineInstance
377 attr_reader :instance
379 def initialize(_options)
# Load the pipeline template into @template (excerpt: the else/end lines and
# the condition guarding the abort are not visible here).
#
# template - either a template UUID or a path to a local JSON file.
#
# An argument containing any character other than lowercase letters, digits
# and '-' is treated as a file path: the file is read and parsed as JSON with
# symbolized keys. Otherwise the template is fetched through
# $arvados.pipeline_templates.get, and a failed retrieval aborts the process
# with the server's errors.
#
# NOTE(review): a relative file name made up only of [-0-9a-z] (no dot, slash
# or uppercase letter) would be treated as a UUID rather than a path.
383 def fetch_template(template)
384 if template.match /[^-0-9a-z]/
385 # Doesn't look like a uuid -- use it as a filename.
386 @template = JSON.parse File.read(template), :symbolize_names => true
388 result = $client.execute(:api_method => $arvados.pipeline_templates.get,
392 :authenticated => false,
394 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
396 @template = JSON.parse result.body, :symbolize_names => true
398 abort "#{$0}: fatal: failed to retrieve pipeline template #{template} #{@template[:errors].inspect rescue nil}"
# Load an existing pipeline instance and use it as the template as well, so
# that later steps read components from the instance itself.
#
# instance_uuid - UUID passed to PipelineInstance.find.
#
# The remainder of the method, including its return value, is not visible here.
404 def fetch_instance(instance_uuid)
405 @instance = PipelineInstance.find(instance_uuid)
406 @template = @instance
# Apply command-line parameter overrides to the template's components and
# validate the result (excerpt: several lines, including the initialisation of
# `params` and `errors` and the block terminators, are not visible here).
#
# params_args - Array of leftover command-line arguments; it is consumed
#               (shifted) while parsing.
#
# Aborts the process with a message naming the template when an argument
# cannot be parsed, when the template has no components hash, when a component
# or its script_parameters is not a Hash, or when required parameters are
# missing.
410 def apply_parameters(params_args)
# A leading "--" separator is dropped.
411 params_args.shift if params_args[0] == '--'
# Accepted forms: "name=value" or "--name=value" (name must not start with '-'
# and value must be non-empty), otherwise a "name value" pair taken from two
# consecutive arguments, with a leading "--" stripped from the name. A single
# trailing argument that fits neither form is a syntax error.
413 while !params_args.empty?
414 if (re = params_args[0].match /^(--)?([^-].*?)=(.+)/)
415 params[re[2]] = re[3]
417 elsif params_args.size > 1
418 param = params_args.shift.sub /^--/, ''
419 params[param] = params_args.shift
421 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: I do not know what to do with arg \"#{params_args[0]}\""
# Structural validation of the template.
425 if not @template[:components].is_a?(Hash)
426 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Template missing \"components\" hash"
# NOTE(review): dup is a shallow copy, so the per-component hashes modified
# below are shared with @template[:components].
428 @components = @template[:components].dup
430 bad_components = @components.each_pair.select do |cname, cspec|
431 not cspec.is_a?(Hash)
433 if bad_components.any?
434 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Components not specified with hashes: #{bad_components.map(&:first).join(', ')}"
437 bad_components = @components.each_pair.select do |cname, cspec|
438 not cspec[:script_parameters].is_a?(Hash)
440 if bad_components.any?
441 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Components missing \"script_parameters\" hashes: #{bad_components.map(&:first).join(', ')}"
# Resolve each script parameter to a concrete value. A non-Hash parameter spec
# is treated as { :value => spec }. In the visible part of the lookup, a
# component-qualified override ("component::param") comes first; the
# unqualified override and then the spec's :default are consulted only when
# the parameter is not an output_of reference. A parameter counts as required
# unless :required is one of false, 'false', 0 or '0'.
445 @components.each do |componentname, component|
446 component[:script_parameters].each do |parametername, parameter|
447 parameter = { :value => parameter } unless parameter.is_a? Hash
449 (params["#{componentname}::#{parametername}"] ||
451 (parameter[:output_of].nil? &&
452 (params[parametername.to_s] ||
453 parameter[:default])) ||
456 ![false,'false',0,'0'].index parameter[:required]
457 if parameter[:output_of]
460 errors << [componentname, parametername, "required parameter is missing"]
462 debuglog "parameter #{componentname}::#{parametername} == #{value}"
463 component[:script_parameters][parametername] = value
467 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nErrors:\n#{errors.collect { |c,p,e| "#{c}::#{p} - #{e}\n" }.join ""}"
469 debuglog "options=" + @options.pretty_inspect
# Excerpt of the instance setup step (the method header and branch keywords are
# not visible here; presumably the body of setup_instance, which the main
# script calls after apply_parameters -- confirm).
#
# For an existing instance: makes sure properties[:run_options] exists, forces
# enable_job_reuse to false when --no-reuse was given, and otherwise defaults
# it to true when it is unset.
#
# When a new instance is created: PipelineInstance.create receives the
# components, an enable_job_reuse flag derived from --no-reuse, the template
# UUID, a description and an initial state of 'RunningOnServer' (--submit) or
# 'RunningOnClient'. The description comes from --description; when that is
# absent it is generated from the current local time and, if available, the
# template name.
#
# NOTE(review): in the generated description, `@template[:name].andand.size>0`
# evaluates to `nil > 0` when the template has no :name, which raises
# NoMethodError -- verify against templates loaded from a file without a name.
# NOTE(review): the description is read from the global $options while the
# reuse flag is read from @options -- presumably the same object; confirm.
475 @instance[:properties][:run_options] ||= {}
476 if @options[:no_reuse]
477 # override properties of existing instance
478 @instance[:properties][:run_options][:enable_job_reuse] = false
480 # Default to "enable reuse" if not specified. (This code path
481 # can go away when old clients go away.)
482 if @instance[:properties][:run_options][:enable_job_reuse].nil?
483 @instance[:properties][:run_options][:enable_job_reuse] = true
487 description = $options[:description]
488 description = ("Created at #{Time.now.localtime}" + (@template[:name].andand.size>0 ? " using the pipeline template *#{@template[:name]}*" : "")) if !description
489 @instance = PipelineInstance.
490 create(components: @components,
493 enable_job_reuse: !@options[:no_reuse]
496 pipeline_template_uuid: @template[:uuid],
497 description: description,
498 state: ($options[:submit] ? 'RunningOnServer' : 'RunningOnClient'))
# Excerpt of the pipeline execution logic (the method header, the outer loop
# construct, many branch keywords and block terminators are not visible here,
# so the nesting described below is inferred from the visible lines and the
# existing comments -- confirm against the full file).
#
# Visible steps:
#   1. stamp @instance[:started_at] when it is unset;
#   2. for each component: create a job when none is associated yet and no
#      script parameter is still an unresolved output_of reference; refresh
#      the job record while it has not finished or been cancelled; copy the
#      job's output into dependent components' parameters; save the output as
#      a named collection; report cancellation;
#   3. write the components back to the instance, then handle --no-wait, job
#      creation failures and an interrupt;
#   4. tally ended / succeeded / failed components, derive the instance state
#      and finished_at, and store a components_summary.
507 if @instance[:started_at].nil?
508 @instance[:started_at] = Time.now
511 job_creation_failed = 0
514 @components.each do |cname, c|
516 owner_uuid = @instance[:owner_uuid]
517 # Is the job satisfying this component already known to be
518 # finished? (Already meaning "before we query API server about
519 # the job's current state")
520 c_already_finished = (c[:job] &&
522 !c[:job][:success].nil?)
524 c[:script_parameters].select { |pname, p| p.is_a? Hash and p[:output_of]}.empty?
525 # No job yet associated with this component and its component inputs
526 # are fully specified (any output_of script_parameters are resolved).
528 job = JobCache.create(@instance, cname, {
529 :script => c[:script],
530 :script_parameters => c[:script_parameters],
531 :script_version => c[:script_version],
532 :repository => c[:repository],
533 :nondeterministic => c[:nondeterministic],
534 :runtime_constraints => c[:runtime_constraints],
535 :owner_uuid => owner_uuid,
537 # This is the right place to put these attributes when
538 # dealing with new API servers.
# NOTE(review): :exclude_script_versions is read from the component key
# :exclude_minimum_script_versions -- confirm that key name is intended.
# find_or_create is requested only when job reuse is enabled for the instance
# and the component is not marked nondeterministic.
539 :minimum_script_version => c[:minimum_script_version],
540 :exclude_script_versions => c[:exclude_minimum_script_versions],
541 :find_or_create => (@instance[:properties][:run_options].andand[:enable_job_reuse] &&
542 !c[:nondeterministic]),
543 :filters => c[:filters]
546 debuglog "component #{cname} new job #{job[:uuid]}"
549 debuglog "component #{cname} new job failed", 0
550 job_creation_failed += 1
554 if c[:job] and c[:job][:uuid]
555 if (c[:job][:running] or
556 not (c[:job][:finished_at] or c[:job][:cancelled_at]))
557 # Job is running so update copy of job record
558 c[:job] = JobCache.get(c[:job][:uuid])
562 # Populate script_parameters of other components waiting for this job's output.
564 @components.each do |c2name, c2|
565 c2[:script_parameters].each do |pname, p|
566 if p.is_a? Hash and p[:output_of] == cname.to_s
567 debuglog "parameter #{c2name}::#{pname} == #{c[:job][:output]}"
568 c2[:script_parameters][pname] = c[:job][:output]
573 unless c_already_finished
574 # This is my first time discovering that the job
575 # succeeded. (At the top of this loop, I was still
576 # waiting for it to finish.)
578 debuglog "names: #{@instance[:name]} #{@template[:name]}", 0
# The pipeline name used in the default output collection name is, in order of
# preference: the instance name, the name of the instance's template (fetched
# here, which replaces @template), or a label built from started_at.
579 if (not @instance[:name].nil?) and (not @instance[:name].empty?)
580 pipeline_name = @instance[:name]
581 elsif @instance[:pipeline_template_uuid]
582 fetch_template(@instance[:pipeline_template_uuid])
583 pipeline_name = @template[:name]
585 pipeline_name = "pipeline started #{@instance[:started_at]}"
# Setting output_name to false in the component disables creation of the
# output collection.
587 if c[:output_name] != false
588 # Create a collection located in the same project as the pipeline with the contents of the output.
589 portable_data_hash = c[:job][:output]
590 collections = $arv.collection.list(limit: 1,
591 filters: [['portable_data_hash', '=', portable_data_hash]],
592 select: ["portable_data_hash", "manifest_text"]
595 name = c[:output_name] || "Output #{portable_data_hash[0..7]} of #{cname} of #{pipeline_name}"
597 # check if there is a name collision.
598 name_collisions = $arv.collection.list(filters: [["owner_uuid", "=", owner_uuid],
599 ["name", "=", name]])[:items]
601 newcollection_actual = nil
602 if name_collisions.any? and name_collisions.first[:portable_data_hash] == portable_data_hash
603 # There is already a collection with the same name and the
604 # same contents, so just point to that.
605 newcollection_actual = name_collisions.first
608 if newcollection_actual.nil?
609 # Did not find a collection with the same name (or the
610 # collection has a different portable data hash) so create
611 # a new collection with ensure_unique_name: true.
613 owner_uuid: owner_uuid,
615 portable_data_hash: collections.first[:portable_data_hash],
616 manifest_text: collections.first[:manifest_text]
618 debuglog "Creating collection #{newcollection}", 0
619 newcollection_actual = $arv.collection.create collection: newcollection, ensure_unique_name: true
622 c[:output_uuid] = newcollection_actual[:uuid]
624 debuglog "Could not find a collection with portable data hash #{portable_data_hash}", 0
628 elsif c[:job][:running] ||
629 (!c[:job][:started_at] && !c[:job][:cancelled_at])
630 # Job is still running
632 elsif c[:job][:cancelled_at]
633 debuglog "component #{cname} job #{c[:job][:uuid]} cancelled."
637 @instance[:components] = @components
640 if @options[:no_wait]
644 # If job creation fails, just give up on this pipeline instance.
645 if job_creation_failed > 0
653 debuglog "interrupt", 0
# Final tally. A component's job counts as ended when it has finished_at or
# cancelled_at, or when it is neither running nor successful; it counts as
# failed when success is false or it was cancelled.
663 @components.each do |cname, c|
665 if c[:job][:finished_at] or c[:job][:cancelled_at] or (c[:job][:running] == false and c[:job][:success] == false)
667 if c[:job][:success] == true
669 elsif c[:job][:success] == false or c[:job][:cancelled_at]
# The pipeline is a success only when every component succeeded.
676 success = (succeeded == @components.length)
678 # A job create call failed. Just give up.
679 if job_creation_failed > 0
680 debuglog "job creation failed - giving up on this pipeline instance", 0
# State selection: the conditions choosing between 'Complete' and 'Paused' are
# not visible here. In the visible branch, once every component has ended or
# any has failed, the state becomes 'Complete' or 'Failed' according to
# `success`.
687 @instance[:state] = 'Complete'
689 @instance[:state] = 'Paused'
692 if ended == @components.length or failed > 0
693 @instance[:state] = success ? 'Complete' : 'Failed'
697 if @instance[:finished_at].nil? and ['Complete', 'Failed'].include? @instance[:state]
698 @instance[:finished_at] = Time.now
701 debuglog "pipeline instance state is #{@instance[:state]}"
703 # set components_summary
704 components_summary = {"todo" => @components.length - ended, "done" => succeeded, "failed" => failed}
705 @instance[:components_summary] = components_summary
# Excerpt (the enclosing method header is not visible here; presumably a
# cleanup step run when the client stops managing the pipeline -- confirm).
# An instance still marked 'RunningOnClient' is moved to 'Paused'. Whether the
# change is saved afterwards is not visible here.
711 if @instance and @instance[:state] == 'RunningOnClient'
712 @instance[:state] = 'Paused'
# Excerpt of the status report (the method header, some branch values and the
# block terminators are not visible here).
#
# Writes up to two reports, each skipped when its path option is exactly
# '/dev/null':
#   * status_json: the components, rendered with pretty_inspect;
#   * status_text: a timestamped header naming the pipeline instance, then one
#     line per component with its name (padded to the longest name), its job
#     UUID (or a padded '-') and a status string derived from the job's
#     running / success / cancelled_at / finished_at / started_at / created_at
#     fields, checked in that order.
#
# NOTE(review): the status_json file receives pretty_inspect output (Ruby
# inspect syntax), not JSON, although the option help says "json-formatted" --
# confirm whether consumers expect real JSON.
# NOTE(review): namewidth is nil when there are no components; that is only
# harmless because the per-component loop then does not run.
726 if @options[:status_json] != '/dev/null'
727 File.open(@options[:status_json], 'w') do |f|
728 f.puts @components.pretty_inspect
732 if @options[:status_text] != '/dev/null'
733 File.open(@options[:status_text], 'w') do |f|
735 f.puts "#{Time.now} -- pipeline_instance #{@instance[:uuid]}"
736 namewidth = @components.collect { |cname, c| cname.size }.max
737 @components.each do |cname, c|
738 jstatus = if !c[:job]
740 elsif c[:job][:running]
741 "#{c[:job][:tasks_summary].inspect}"
742 elsif c[:job][:success]
744 elsif c[:job][:cancelled_at]
745 "cancelled #{c[:job][:cancelled_at]}"
746 elsif c[:job][:finished_at]
747 "failed #{c[:job][:finished_at]}"
748 elsif c[:job][:started_at]
749 "started #{c[:job][:started_at]}"
751 "queued #{c[:job][:created_at]}"
753 f.puts "#{cname.to_s.ljust namewidth} #{c[:job] ? c[:job][:uuid] : '-'.ljust(27)} #{jstatus}"
# Excerpt of a failure handler (the method header and surrounding lines are not
# visible here). An instance still in the New, Ready, RunningOnClient or
# RunningOnServer state is marked "Failed" with finished_at set to the current
# time, and the message is recorded on the instance through log_stderr.
# Instances already in another state (for example Complete or Paused) keep
# that state.
761 if ["New", "Ready", "RunningOnClient",
762 "RunningOnServer"].include?(@instance[:state])
763 @instance[:state] = "Failed"
764 @instance[:finished_at] = Time.now
767 @instance.log_stderr(msg)
# Main script (excerpt: the begin/else/end lines, the branch taken when
# --submit is not given, and the rescue body are not visible here).
#
# Builds the runner from the parsed options, loads either the template or the
# existing instance, applies the leftover command-line arguments as parameter
# overrides, sets up the instance and prints the instance UUID on stdout.
#
# NOTE(review): `rescue Exception` also catches SystemExit (raised by the
# `abort` calls above) and Interrupt -- confirm the handler re-raises or
# otherwise deals with those deliberately.
773 runner = WhRunPipelineInstance.new($options)
775 if $options[:template]
776 runner.fetch_template($options[:template])
778 runner.fetch_instance($options[:instance])
780 runner.apply_parameters(p.leftovers)
781 runner.setup_instance
784 puts runner.instance[:uuid]
788 rescue Exception => e