5 # arv-run-pipeline-instance --template pipeline-template-uuid [options] [--] [parameters]
6 # arv-run-pipeline-instance --instance pipeline-instance-uuid [options]
8 # Satisfy a pipeline template by finding or submitting a mapreduce job
9 # for each pipeline component.
13 # [--template uuid] Use the specified pipeline template.
15 # [--template path] Load the pipeline template from the specified
18 # [--instance uuid] Use the specified pipeline instance.
20 # [-n, --dry-run] Do not start any new jobs or wait for existing jobs
21 # to finish. Just find out whether jobs are finished,
22 # queued, or running for each component
24 # [--submit] Do not try to satisfy any components. Just
25 # create an instance, print its UUID to
28 # [--no-wait] Make only as much progress as possible without entering
31 # [--no-reuse-finished] Do not reuse existing outputs to satisfy
32 # pipeline components. Always submit a new job
33 # or use an existing job which has not yet
36 # [--no-reuse] Do not reuse existing jobs to satisfy pipeline
37 # components. Submit a new job for every component.
39 # [--debug] Print extra debugging information on stderr.
41 # [--debug-level N] Increase amount of debugging information. Default
42 # 1, possible range 0..3.
44 # [--status-text path] Print plain text status report to a file or
45 # fifo. Default: /dev/stdout
47 # [--status-json path] Print JSON status report to a file or
48 # fifo. Default: /dev/null
52 # [param_name=param_value]
54 # [param_name param_value] Set (or override) the default value for
55 # every parameter with the given name.
57 # [component_name::param_name=param_value]
58 # [component_name::param_name param_value]
59 # [--component_name::param_name=param_value]
60 # [--component_name::param_name param_value] Set the value of a
61 # parameter for a single
64 class WhRunPipelineInstance
67 $application_version = 1.0
69 if RUBY_VERSION < '1.9.3' then
71 #{$0.gsub(/^\.\//,'')} requires Ruby version 1.9.3 or higher.
75 $arvados_api_version = ENV['ARVADOS_API_VERSION'] || 'v1'
76 $arvados_api_host = ENV['ARVADOS_API_HOST'] or
77 abort "#{$0}: fatal: ARVADOS_API_HOST environment variable not set."
78 $arvados_api_token = ENV['ARVADOS_API_TOKEN'] or
79 abort "#{$0}: fatal: ARVADOS_API_TOKEN environment variable not set."
86 require 'google/api_client'
90 #{$0}: fatal: #{l.message}
91 Some runtime dependencies may be missing.
92 Try: gem install pp google-api-client json trollop
96 def debuglog(message, verbosity=1)
97 $stderr.puts "#{File.split($0).last} #{$$}: #{message}" if $debuglevel >= verbosity
101 def suppress_warnings
102 original_verbosity = $VERBOSE
105 $VERBOSE = original_verbosity
110 if $arvados_api_host.match /local/
111 # You probably don't care about SSL certificate checks if you're
112 # testing with a dev server.
113 suppress_warnings { OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE }
116 class Google::APIClient
117 def discovery_document(api, version)
119 return @discovery_documents["#{api}:#{version}"] ||=
121 response = self.execute!(
122 :http_method => :get,
123 :uri => self.discovery_uri(api, version),
124 :authenticated => false
126 response.body.class == String ? JSON.parse(response.body) : response.body
132 # Parse command line options (the kind that control the behavior of
133 # this program, that is, not the pipeline component parameters).
135 p = Trollop::Parser.new do
138 "Do not start any new jobs or wait for existing jobs to finish. Just find out whether jobs are finished, queued, or running for each component.",
142 "Store plain text status in given file.",
145 :default => '/dev/stdout')
147 "Store json-formatted pipeline in given file.",
150 :default => '/dev/null')
152 "Do not wait for jobs to finish. Just look up status, submit new jobs if needed, and exit.",
155 opt(:no_reuse_finished,
156 "Do not reuse existing outputs to satisfy pipeline components. Always submit a new job or use an existing job which has not yet finished.",
160 "Do not reuse existing jobs to satisfy pipeline components. Submit a new job for every component.",
164 "Print extra debugging information on stderr.",
167 "Set debug verbosity level.",
171 "UUID of pipeline template, or path to local pipeline template file.",
175 "UUID of pipeline instance.",
179 "Do not try to satisfy any components. Just create a pipeline instance and output its UUID.",
183 "Manage the pipeline in process.",
188 $options = Trollop::with_standard_exception_handling p do
191 $debuglevel = $options[:debug_level] || ($options[:debug] && 1) || 0
193 if $options[:instance]
194 if $options[:template] or $options[:submit]
195 abort "#{$0}: syntax error: --instance cannot be combined with --template or --submit."
197 elsif not $options[:template]
198 abort "#{$0}: syntax error: you must supply a --template or --instance."
201 if $options[:run_here] == $options[:submit]
202 abort "#{$0}: syntax error: you must supply either --run-here or --submit."
205 # Suppress SSL certificate checks if ARVADOS_API_HOST_INSECURE
208 def suppress_warnings
209 original_verbosity = $VERBOSE
212 $VERBOSE = original_verbosity
217 if ENV['ARVADOS_API_HOST_INSECURE']
218 suppress_warnings { OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE }
221 # Set up the API client.
223 $client ||= Google::APIClient.
224 new(:host => $arvados_api_host,
225 :application_name => File.split($0).last,
226 :application_version => $application_version.to_s)
227 $arvados = $client.discovered_api('arvados', $arvados_api_version)
230 class PipelineInstance
232 result = $client.execute(:api_method => $arvados.pipeline_instances.get,
237 :api_token => ENV['ARVADOS_API_TOKEN']
239 :authenticated => false)
240 j = JSON.parse result.body, :symbolize_names => true
241 unless j.is_a? Hash and j[:uuid]
242 debuglog "Failed to get pipeline_instance: #{j[:errors] rescue nil}", 0
245 debuglog "Retrieved pipeline_instance #{j[:uuid]}"
249 def self.create(attributes)
250 result = $client.execute(:api_method => $arvados.pipeline_instances.create,
252 :api_token => ENV['ARVADOS_API_TOKEN'],
253 :pipeline_instance => attributes
255 :authenticated => false)
256 j = JSON.parse result.body, :symbolize_names => true
257 unless j.is_a? Hash and j[:uuid]
258 abort "Failed to create pipeline_instance: #{j[:errors] rescue nil} #{j.inspect}"
260 debuglog "Created pipeline instance: #{j[:uuid]}"
264 result = $client.execute(:api_method => $arvados.pipeline_instances.update,
269 :api_token => ENV['ARVADOS_API_TOKEN'],
270 :pipeline_instance => @attributes_to_update.to_json
272 :authenticated => false)
273 j = JSON.parse result.body, :symbolize_names => true
274 unless j.is_a? Hash and j[:uuid]
275 debuglog "Failed to save pipeline_instance: #{j[:errors] rescue nil}", 0
278 @attributes_to_update = {}
283 @attributes_to_update[x] = y
291 @attributes_to_update = {}
299 result = $client.execute(:api_method => $arvados.jobs.get,
301 :api_token => ENV['ARVADOS_API_TOKEN'],
304 :authenticated => false)
305 @cache[uuid] = JSON.parse result.body, :symbolize_names => true
307 def self.where(conditions)
308 result = $client.execute(:api_method => $arvados.jobs.list,
310 :api_token => ENV['ARVADOS_API_TOKEN'],
312 :where => conditions.to_json
314 :authenticated => false)
315 list = JSON.parse result.body, :symbolize_names => true
316 if list and list[:items].is_a? Array
322 def self.create(attributes)
324 result = $client.execute(:api_method => $arvados.jobs.create,
326 :api_token => ENV['ARVADOS_API_TOKEN'],
327 :job => attributes.to_json
329 :authenticated => false)
330 j = JSON.parse result.body, :symbolize_names => true
331 if j.is_a? Hash and j[:uuid]
334 debuglog "create job: #{j[:errors] rescue nil}", 0
340 class WhRunPipelineInstance
341 attr_reader :instance
343 def initialize(_options)
347 def fetch_template(template)
348 if template.match /[^-0-9a-z]/
349 # Doesn't look like a uuid -- use it as a filename.
350 @template = JSON.parse File.read(template), :symbolize_names => true
351 if !@template[:components]
352 abort ("#{$0}: Template loaded from #{template} " +
353 "does not have a \"components\" key")
356 result = $client.execute(:api_method => $arvados.pipeline_templates.get,
358 :api_token => ENV['ARVADOS_API_TOKEN'],
361 :authenticated => false)
362 @template = JSON.parse result.body, :symbolize_names => true
364 abort "#{$0}: fatal: failed to retrieve pipeline template #{template} #{@template[:errors].inspect rescue nil}"
370 def fetch_instance(instance_uuid)
371 @instance = PipelineInstance.find(instance_uuid)
372 @template = @instance
376 def apply_parameters(params_args)
377 params_args.shift if params_args[0] == '--'
379 while !params_args.empty?
380 if (re = params_args[0].match /^(--)?([^-].*?)=(.+)/)
381 params[re[2]] = re[3]
383 elsif params_args.size > 1
384 param = params_args.shift.sub /^--/, ''
385 params[param] = params_args.shift
387 abort "Syntax error: I do not know what to do with arg \"#{params_args[0]}\""
391 @components = @template[:components].dup
394 @components.each do |componentname, component|
395 component[:script_parameters].each do |parametername, parameter|
396 parameter = { :value => parameter } unless parameter.is_a? Hash
398 (params["#{componentname}::#{parametername}"] ||
400 (parameter[:output_of].nil? &&
401 (params[parametername.to_s] ||
402 parameter[:default])) ||
405 ![false,'false',0,'0'].index parameter[:required]
406 if parameter[:output_of]
409 errors << [componentname, parametername, "required parameter is missing"]
411 debuglog "parameter #{componentname}::#{parametername} == #{value}"
412 component[:script_parameters][parametername] = value
416 abort "Errors:\n#{errors.collect { |c,p,e| "#{c}::#{p} - #{e}\n" }.join ""}"
418 debuglog "options=" + @options.pretty_inspect
423 @instance ||= PipelineInstance.
424 create(:components => @components,
425 :pipeline_template_uuid => @template[:uuid],
434 @components.each do |cname, c|
437 c[:script_parameters].select { |pname, p| p.is_a? Hash }.empty?
438 # Job is fully specified (all parameter values are present) but
439 # no particular job has been found.
441 debuglog "component #{cname} ready to satisfy."
444 second_place_job = nil # satisfies component, but not finished yet
446 (@options[:no_reuse] ? [] : JobCache.
447 where(script: c[:script],
448 script_parameters: c[:script_parameters],
449 script_version_descends_from: c[:script_version])
450 ).each do |candidate_job|
451 candidate_params_downcase = Hash[candidate_job[:script_parameters].
452 map { |k,v| [k.downcase,v] }]
453 c_params_downcase = Hash[c[:script_parameters].
454 map { |k,v| [k.downcase,v] }]
456 debuglog "component #{cname} considering job #{candidate_job[:uuid]} version #{candidate_job[:script_version]} parameters #{candidate_params_downcase.inspect}", 3
458 unless candidate_params_downcase == c_params_downcase
462 if c[:script_version] !=
463 candidate_job[:script_version][0,c[:script_version].length]
464 debuglog "component #{cname} would be satisfied by job #{candidate_job[:uuid]} if script_version matched.", 2
468 unless candidate_job[:success] || candidate_job[:running] ||
469 (!candidate_job[:started_at] && !candidate_job[:cancelled_at])
470 debuglog "component #{cname} would be satisfied by job #{candidate_job[:uuid]} if it were running or successful.", 2
474 if candidate_job[:success]
475 unless @options[:no_reuse_finished]
477 $stderr.puts "using #{job[:uuid]} (finished at #{job[:finished_at]}) for component #{cname}"
481 second_place_job ||= candidate_job
485 if not c[:job] and second_place_job
486 job = second_place_job
487 $stderr.puts "using #{job[:uuid]} (running since #{job[:started_at]}) for component #{cname}"
491 debuglog "component #{cname} not satisfied by any existing job."
492 if !@options[:dry_run]
493 debuglog "component #{cname} new job."
494 job = JobCache.create(:script => c[:script],
495 :script_parameters => c[:script_parameters],
496 :runtime_constraints => c[:runtime_constraints] || {},
497 :script_version => c[:script_version] || 'master')
499 debuglog "component #{cname} new job #{job[:uuid]}"
502 debuglog "component #{cname} new job failed"
509 if c[:job] and c[:job][:uuid]
510 if (c[:job][:running] or
511 not (c[:job][:finished_at] or c[:job][:cancelled_at]))
512 c[:job] = JobCache.get(c[:job][:uuid])
515 # Populate script_parameters of other components waiting for
517 @components.each do |c2name, c2|
518 c2[:script_parameters].each do |pname, p|
519 if p.is_a? Hash and p[:output_of] == cname.to_s
520 debuglog "parameter #{c2name}::#{pname} == #{c[:job][:output]}"
521 c2[:script_parameters][pname] = c[:job][:output]
526 elsif c[:job][:running] ||
527 (!c[:job][:started_at] && !c[:job][:cancelled_at])
529 elsif c[:job][:cancelled_at]
530 debuglog "component #{cname} job #{c[:job][:uuid]} cancelled."
534 @instance[:components] = @components
535 @instance[:active] = moretodo
538 if @options[:no_wait]
546 debuglog "interrupt", 0
555 @components.each do |cname, c|
557 if c[:job][:finished_at]
559 if c[:job][:success] == true
561 elsif c[:job][:success] == false
568 if ended == @components.length or failed > 0
569 @instance[:active] = false
570 @instance[:success] = (succeeded == @components.length)
578 @instance[:active] = false
592 if @options[:status_json] != '/dev/null'
593 File.open(@options[:status_json], 'w') do |f|
594 f.puts @components.pretty_inspect
598 if @options[:status_text] != '/dev/null'
599 File.open(@options[:status_text], 'w') do |f|
601 f.puts "#{Time.now} -- pipeline_instance #{@instance[:uuid]}"
602 namewidth = @components.collect { |cname, c| cname.size }.max
603 @components.each do |cname, c|
604 jstatus = if !c[:job]
606 elsif c[:job][:running]
607 "#{c[:job][:tasks_summary].inspect}"
608 elsif c[:job][:success]
610 elsif c[:job][:cancelled_at]
611 "cancelled #{c[:job][:cancelled_at]}"
612 elsif c[:job][:finished_at]
613 "failed #{c[:job][:finished_at]}"
614 elsif c[:job][:started_at]
615 "started #{c[:job][:started_at]}"
617 "queued #{c[:job][:created_at]}"
619 f.puts "#{cname.to_s.ljust namewidth} #{c[:job] ? c[:job][:uuid] : '-'.ljust(27)} #{jstatus}"
626 runner = WhRunPipelineInstance.new($options)
628 if $options[:template]
629 runner.fetch_template($options[:template])
631 runner.fetch_instance($options[:instance])
633 runner.apply_parameters(p.leftovers)
634 runner.setup_instance
637 puts runner.instance[:uuid]
641 rescue Exception => e