5 # arv-run-pipeline-instance --template pipeline-template-uuid [options] [--] [parameters]
6 # arv-run-pipeline-instance --instance pipeline-instance-uuid [options]
8 # Satisfy a pipeline template by finding or submitting a mapreduce job
9 # for each pipeline component.
13 # [--template uuid] Use the specified pipeline template.
15 # [--template path] Load the pipeline template from the specified
18 # [--instance uuid] Use the specified pipeline instance.
20 # [-n, --dry-run] Do not start any new jobs or wait for existing jobs
21 # to finish. Just find out whether jobs are finished,
22 # queued, or running for each component
24 # [--submit] Do not try to satisfy any components. Just
25 # create an instance, print its UUID to
28 # [--no-wait] Make only as much progress as possible without entering
31 # [--no-reuse] Do not reuse existing jobs to satisfy pipeline
32 # components. Submit a new job for every component.
34 # [--debug] Print extra debugging information on stderr.
36 # [--debug-level N] Increase amount of debugging information. Default
37 # 1, possible range 0..3.
39 # [--status-text path] Print plain text status report to a file or
40 # fifo. Default: /dev/stdout
42 # [--status-json path] Print JSON status report to a file or
43 # fifo. Default: /dev/null
47 # [param_name=param_value]
49 # [param_name param_value] Set (or override) the default value for
50 # every parameter with the given name.
52 # [component_name::param_name=param_value]
53 # [component_name::param_name param_value]
54 # [--component_name::param_name=param_value]
55 # [--component_name::param_name param_value] Set the value of a
56 # parameter for a single
59 class WhRunPipelineInstance
62 $application_version = 1.0
64 if RUBY_VERSION < '1.9.3' then
66 #{$0.gsub(/^\.\//,'')} requires Ruby version 1.9.3 or higher.
70 $arvados_api_version = ENV['ARVADOS_API_VERSION'] || 'v1'
71 $arvados_api_host = ENV['ARVADOS_API_HOST'] or
72 abort "#{$0}: fatal: ARVADOS_API_HOST environment variable not set."
73 $arvados_api_token = ENV['ARVADOS_API_TOKEN'] or
74 abort "#{$0}: fatal: ARVADOS_API_TOKEN environment variable not set."
82 require 'google/api_client'
86 #{$0}: fatal: #{l.message}
87 Some runtime dependencies may be missing.
88 Try: gem install arvados pp google-api-client json trollop
92 def debuglog(message, verbosity=1)
93 $stderr.puts "#{File.split($0).last} #{$$}: #{message}" if $debuglevel >= verbosity
98 original_verbosity = $VERBOSE
101 $VERBOSE = original_verbosity
106 if $arvados_api_host.match /local/
107 # You probably don't care about SSL certificate checks if you're
108 # testing with a dev server.
109 suppress_warnings { OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE }
112 class Google::APIClient
113 def discovery_document(api, version)
115 return @discovery_documents["#{api}:#{version}"] ||=
117 response = self.execute!(
118 :http_method => :get,
119 :uri => self.discovery_uri(api, version),
120 :authenticated => false
122 response.body.class == String ? JSON.parse(response.body) : response.body
128 # Parse command line options (the kind that control the behavior of
129 # this program, that is, not the pipeline component parameters).
131 p = Trollop::Parser.new do
134 "Do not start any new jobs or wait for existing jobs to finish. Just find out whether jobs are finished, queued, or running for each component.",
138 "Store plain text status in given file.",
141 :default => '/dev/stdout')
143 "Store json-formatted pipeline in given file.",
146 :default => '/dev/null')
148 "Do not wait for jobs to finish. Just look up status, submit new jobs if needed, and exit.",
152 "Do not reuse existing jobs to satisfy pipeline components. Submit a new job for every component.",
156 "Print extra debugging information on stderr.",
159 "Set debug verbosity level.",
163 "UUID of pipeline template, or path to local pipeline template file.",
167 "UUID of pipeline instance.",
171 "Do not try to satisfy any components. Just create a pipeline instance and output its UUID.",
175 "Manage the pipeline in process.",
180 $options = Trollop::with_standard_exception_handling p do
183 $debuglevel = $options[:debug_level] || ($options[:debug] && 1) || 0
185 if $options[:instance]
186 if $options[:template] or $options[:submit]
187 abort "#{$0}: syntax error: --instance cannot be combined with --template or --submit."
189 elsif not $options[:template]
190 puts "error: you must supply a --template or --instance."
195 if $options[:run_here] == $options[:submit]
196 abort "#{$0}: syntax error: you must supply either --run-here or --submit."
199 # Suppress SSL certificate checks if ARVADOS_API_HOST_INSECURE
202 def suppress_warnings
203 original_verbosity = $VERBOSE
206 $VERBOSE = original_verbosity
211 if ENV['ARVADOS_API_HOST_INSECURE']
212 suppress_warnings { OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE }
215 # Set up the API client.
217 $client ||= Google::APIClient.
218 new(:host => $arvados_api_host,
219 :application_name => File.split($0).last,
220 :application_version => $application_version.to_s)
221 $arvados = $client.discovered_api('arvados', $arvados_api_version)
222 $arv = Arvados.new api_version: 'v1'
225 class PipelineInstance
227 result = $client.execute(:api_method => $arvados.pipeline_instances.get,
231 :authenticated => false,
233 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
235 j = JSON.parse result.body, :symbolize_names => true
236 unless j.is_a? Hash and j[:uuid]
237 debuglog "Failed to get pipeline_instance: #{j[:errors] rescue nil}", 0
240 debuglog "Retrieved pipeline_instance #{j[:uuid]}"
244 def self.create(attributes)
245 result = $client.execute(:api_method => $arvados.pipeline_instances.create,
247 :pipeline_instance => attributes
249 :authenticated => false,
251 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
253 j = JSON.parse result.body, :symbolize_names => true
254 unless j.is_a? Hash and j[:uuid]
255 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nFailed to create pipeline_instance: #{j[:errors] rescue nil} #{j.inspect}"
257 debuglog "Created pipeline instance: #{j[:uuid]}"
261 result = $client.execute(:api_method => $arvados.pipeline_instances.update,
266 :pipeline_instance => @attributes_to_update
268 :authenticated => false,
270 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
272 j = JSON.parse result.body, :symbolize_names => true
273 unless j.is_a? Hash and j[:uuid]
274 debuglog "Failed to save pipeline_instance: #{j[:errors] rescue nil}", 0
277 @attributes_to_update = {}
282 @attributes_to_update[x] = y
290 $arv.log.create log: {
291 event_type: 'stderr',
292 object_uuid: self[:uuid],
293 owner_uuid: self[:owner_uuid],
294 properties: {"text" => msg},
300 @attributes_to_update = {}
308 result = $client.execute(:api_method => $arvados.jobs.get,
312 :authenticated => false,
314 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
316 @cache[uuid] = JSON.parse result.body, :symbolize_names => true
318 def self.where(conditions)
319 result = $client.execute(:api_method => $arvados.jobs.list,
322 :where => conditions.to_json
324 :authenticated => false,
326 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
328 list = JSON.parse result.body, :symbolize_names => true
329 if list and list[:items].is_a? Array
335 def self.create(pipeline, component, job, create_params)
338 body = {job: no_nil_values(job)}.merge(no_nil_values(create_params))
340 result = $client.execute(:api_method => $arvados.jobs.create,
341 :body_object => body,
342 :authenticated => false,
344 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
346 j = JSON.parse result.body, :symbolize_names => true
347 if j.is_a? Hash and j[:uuid]
350 debuglog "create job: #{j[:errors] rescue nil} with attributes #{body}", 0
353 j[:errors].each do |err|
354 msg += "Error creating job for component #{component}: #{err}\n"
356 msg += "Job submission was: #{body.to_json}"
358 pipeline.log_stderr(msg)
365 def self.no_nil_values(hash)
366 hash.reject { |key, value| value.nil? }
370 class WhRunPipelineInstance
371 attr_reader :instance
373 def initialize(_options)
377 def fetch_template(template)
378 if template.match /[^-0-9a-z]/
379 # Doesn't look like a uuid -- use it as a filename.
380 @template = JSON.parse File.read(template), :symbolize_names => true
382 result = $client.execute(:api_method => $arvados.pipeline_templates.get,
386 :authenticated => false,
388 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
390 @template = JSON.parse result.body, :symbolize_names => true
392 abort "#{$0}: fatal: failed to retrieve pipeline template #{template} #{@template[:errors].inspect rescue nil}"
398 def fetch_instance(instance_uuid)
399 @instance = PipelineInstance.find(instance_uuid)
400 @template = @instance
404 def apply_parameters(params_args)
405 params_args.shift if params_args[0] == '--'
407 while !params_args.empty?
408 if (re = params_args[0].match /^(--)?([^-].*?)=(.+)/)
409 params[re[2]] = re[3]
411 elsif params_args.size > 1
412 param = params_args.shift.sub /^--/, ''
413 params[param] = params_args.shift
415 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: I do not know what to do with arg \"#{params_args[0]}\""
419 if not @template[:components].is_a?(Hash)
420 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Template missing \"components\" hash"
422 @components = @template[:components].dup
424 bad_components = @components.each_pair.select do |cname, cspec|
425 not cspec.is_a?(Hash)
427 if bad_components.any?
428 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Components not specified with hashes: #{bad_components.map(&:first).join(', ')}"
431 bad_components = @components.each_pair.select do |cname, cspec|
432 not cspec[:script_parameters].is_a?(Hash)
434 if bad_components.any?
435 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Components missing \"script_parameters\" hashes: #{bad_components.map(&:first).join(', ')}"
439 @components.each do |componentname, component|
440 component[:script_parameters].each do |parametername, parameter|
441 parameter = { :value => parameter } unless parameter.is_a? Hash
443 (params["#{componentname}::#{parametername}"] ||
445 (parameter[:output_of].nil? &&
446 (params[parametername.to_s] ||
447 parameter[:default])) ||
450 ![false,'false',0,'0'].index parameter[:required]
451 if parameter[:output_of]
454 errors << [componentname, parametername, "required parameter is missing"]
456 debuglog "parameter #{componentname}::#{parametername} == #{value}"
457 component[:script_parameters][parametername] = value
461 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nErrors:\n#{errors.collect { |c,p,e| "#{c}::#{p} - #{e}\n" }.join ""}"
463 debuglog "options=" + @options.pretty_inspect
469 @instance[:properties][:run_options] ||= {}
470 if @options[:no_reuse]
471 # override properties of existing instance
472 @instance[:properties][:run_options][:enable_job_reuse] = false
474 # Default to "enable reuse" if not specified. (This code path
475 # can go away when old clients go away.)
476 if @instance[:properties][:run_options][:enable_job_reuse].nil?
477 @instance[:properties][:run_options][:enable_job_reuse] = true
481 @instance = PipelineInstance.
482 create(components: @components,
485 enable_job_reuse: !@options[:no_reuse]
488 pipeline_template_uuid: @template[:uuid],
489 state: ($options[:submit] ? 'RunningOnServer' : 'RunningOnClient'))
498 if @instance[:started_at].nil?
499 @instance[:started_at] = Time.now
502 job_creation_failed = 0
505 @components.each do |cname, c|
507 owner_uuid = @instance[:owner_uuid]
508 # Is the job satisfying this component already known to be
509 # finished? (Already meaning "before we query API server about
510 # the job's current state")
511 c_already_finished = (c[:job] &&
513 !c[:job][:success].nil?)
515 c[:script_parameters].select { |pname, p| p.is_a? Hash and p[:output_of]}.empty?
516 # No job yet associated with this component and its component inputs
517 # are fully specified (any output_of script_parameters are resolved
519 job = JobCache.create(@instance, cname, {
520 :script => c[:script],
521 :script_parameters => c[:script_parameters],
522 :script_version => c[:script_version],
523 :repository => c[:repository],
524 :nondeterministic => c[:nondeterministic],
525 :runtime_constraints => c[:runtime_constraints],
526 :owner_uuid => owner_uuid,
528 # This is the right place to put these attributes when
529 # dealing with new API servers.
530 :minimum_script_version => c[:minimum_script_version],
531 :exclude_script_versions => c[:exclude_minimum_script_versions],
532 :find_or_create => (@instance[:properties][:run_options].andand[:enable_job_reuse] &&
533 !c[:nondeterministic]),
534 :filters => c[:filters]
537 debuglog "component #{cname} new job #{job[:uuid]}"
540 debuglog "component #{cname} new job failed", 0
541 job_creation_failed += 1
545 if c[:job] and c[:job][:uuid]
546 if (c[:job][:running] or
547 not (c[:job][:finished_at] or c[:job][:cancelled_at]))
548 # Job is running so update copy of job record
549 c[:job] = JobCache.get(c[:job][:uuid])
553 # Populate script_parameters of other components waiting for
555 @components.each do |c2name, c2|
556 c2[:script_parameters].each do |pname, p|
557 if p.is_a? Hash and p[:output_of] == cname.to_s
558 debuglog "parameter #{c2name}::#{pname} == #{c[:job][:output]}"
559 c2[:script_parameters][pname] = c[:job][:output]
564 unless c_already_finished
565 # This is my first time discovering that the job
566 # succeeded. (At the top of this loop, I was still
567 # waiting for it to finish.)
569 debuglog "names: #{@instance[:name]} #{@template[:name]}", 0
570 if (not @instance[:name].nil?) and (not @instance[:name].empty?)
571 pipeline_name = @instance[:name]
573 fetch_template(@instance[:pipeline_template_uuid])
574 pipeline_name = @template[:name]
576 if c[:output_name] != false
577 # Create a collection located in the same project as the pipeline with the contents of the output.
578 portable_data_hash = c[:job][:output]
579 collections = $arv.collection.list(limit: 1,
580 filters: [['portable_data_hash', '=', portable_data_hash]],
581 select: ["portable_data_hash", "manifest_text"]
584 name = c[:output_name] || "Output #{portable_data_hash[0..7]} of #{cname} of #{pipeline_name}"
586 # check if there is a name collision.
587 name_collisions = $arv.collection.list(filters: [["owner_uuid", "=", owner_uuid],
588 ["name", "=", name]])[:items]
590 newcollection_actual = nil
591 if name_collisions.any? and name_collisions.first[:portable_data_hash] == portable_data_hash
592 # There is already a collection with the same name and the
593 # same contents, so just point to that.
594 newcollection_actual = name_collisions.first
597 if newcollection_actual.nil?
598 # Did not find a collection with the same name (or the
599 # collection has a different portable data hash) so create
600 # a new collection with ensure_unique_name: true.
602 owner_uuid: owner_uuid,
604 portable_data_hash: collections.first[:portable_data_hash],
605 manifest_text: collections.first[:manifest_text]
607 debuglog "Creating collection #{newcollection}", 0
608 newcollection_actual = $arv.collection.create collection: newcollection, ensure_unique_name: true
611 c[:output_uuid] = newcollection_actual[:uuid]
613 debuglog "Could not find a collection with portable data hash #{portable_data_hash}", 0
617 elsif c[:job][:running] ||
618 (!c[:job][:started_at] && !c[:job][:cancelled_at])
619 # Job is still running
621 elsif c[:job][:cancelled_at]
622 debuglog "component #{cname} job #{c[:job][:uuid]} cancelled."
626 @instance[:components] = @components
629 if @options[:no_wait]
633 # If job creation fails, just give up on this pipeline instance.
634 if job_creation_failed > 0
642 debuglog "interrupt", 0
652 @components.each do |cname, c|
654 if c[:job][:finished_at] or c[:job][:cancelled_at] or (c[:job][:running] == false and c[:job][:success] == false)
656 if c[:job][:success] == true
658 elsif c[:job][:success] == false or c[:job][:cancelled_at]
665 success = (succeeded == @components.length)
667 # A job create call failed. Just give up.
668 if job_creation_failed > 0
669 debuglog "job creation failed - giving up on this pipeline instance", 0
676 @instance[:state] = 'Complete'
678 @instance[:state] = 'Paused'
681 if ended == @components.length or failed > 0
682 @instance[:state] = success ? 'Complete' : 'Failed'
686 if @instance[:finished_at].nil? and ['Complete', 'Failed'].include? @instance[:state]
687 @instance[:finished_at] = Time.now
690 debuglog "pipeline instance state is #{@instance[:state]}"
692 # set components_summary
693 components_summary = {"todo" => @components.length - ended, "done" => succeeded, "failed" => failed}
694 @instance[:components_summary] = components_summary
700 if @instance and @instance[:state] == 'RunningOnClient'
701 @instance[:state] = 'Paused'
715 if @options[:status_json] != '/dev/null'
716 File.open(@options[:status_json], 'w') do |f|
717 f.puts @components.pretty_inspect
721 if @options[:status_text] != '/dev/null'
722 File.open(@options[:status_text], 'w') do |f|
724 f.puts "#{Time.now} -- pipeline_instance #{@instance[:uuid]}"
725 namewidth = @components.collect { |cname, c| cname.size }.max
726 @components.each do |cname, c|
727 jstatus = if !c[:job]
729 elsif c[:job][:running]
730 "#{c[:job][:tasks_summary].inspect}"
731 elsif c[:job][:success]
733 elsif c[:job][:cancelled_at]
734 "cancelled #{c[:job][:cancelled_at]}"
735 elsif c[:job][:finished_at]
736 "failed #{c[:job][:finished_at]}"
737 elsif c[:job][:started_at]
738 "started #{c[:job][:started_at]}"
740 "queued #{c[:job][:created_at]}"
742 f.puts "#{cname.to_s.ljust namewidth} #{c[:job] ? c[:job][:uuid] : '-'.ljust(27)} #{jstatus}"
750 if ["New", "Ready", "RunningOnClient",
751 "RunningOnServer"].include?(@instance[:state])
752 @instance[:state] = "Failed"
753 @instance[:finished_at] = Time.now
756 @instance.log_stderr(msg)
762 runner = WhRunPipelineInstance.new($options)
764 if $options[:template]
765 runner.fetch_template($options[:template])
767 runner.fetch_instance($options[:instance])
769 runner.apply_parameters(p.leftovers)
770 runner.setup_instance
773 puts runner.instance[:uuid]
777 rescue Exception => e