5 # wh-run-pipeline-instance --template pipeline-template-uuid [options] [--] [parameters]
6 # wh-run-pipeline-instance --instance pipeline-instance-uuid [options]
8 # Satisfy a pipeline template by finding or submitting a mapreduce job
9 # for each pipeline component.
13 # [--template uuid] Use the specified pipeline template.
15 # [--instance uuid] Use the specified pipeline instance.
17 # [-n, --dry-run] Do not start any new jobs or wait for existing jobs
18 # to finish. Just find out whether jobs are finished,
19 # queued, or running for each component
# [--create-instance-only] Do not try to satisfy any components. Just
# create an instance and print its UUID to stdout.
# [--no-wait] Make only as much progress as possible without waiting
# for jobs to finish.
# [--no-reuse-finished] Do not reuse existing outputs to satisfy
# pipeline components. Always submit a new job
# or use an existing job which has not yet
# finished.
33 # [--no-reuse] Do not reuse existing jobs to satisfy pipeline
34 # components. Submit a new job for every component.
36 # [--debug] Print extra debugging information on stderr.
38 # [--debug-level N] Increase amount of debugging information. Default
39 # 1, possible range 0..3.
41 # [--status-text path] Print plain text status report to a file or
42 # fifo. Default: /dev/stdout
44 # [--status-json path] Print JSON status report to a file or
45 # fifo. Default: /dev/null
49 # [param_name=param_value]
51 # [param_name param_value] Set (or override) the default value for
52 # every parameter with the given name.
54 # [component_name::param_name=param_value]
55 # [component_name::param_name param_value]
56 # [--component_name::param_name=param_value]
# [--component_name::param_name param_value] Set the value of a
# parameter for a single
# component.
61 class WhRunPipelineInstance
64 $application_version = 1.0
66 if RUBY_VERSION < '1.9.3' then
68 #{$0.gsub(/^\.\//,'')} requires Ruby version 1.9.3 or higher.
# API endpoint settings are read from the environment. The API version
# falls back to 'v1'; the host and token have no fallback, and the
# script aborts with a message naming the missing variable.
$arvados_api_version = ENV.fetch('ARVADOS_API_VERSION', 'v1')

$arvados_api_host = ENV.fetch('ARVADOS_API_HOST') do
  abort "#{$0}: fatal: ARVADOS_API_HOST environment variable not set."
end

$arvados_api_token = ENV.fetch('ARVADOS_API_TOKEN') do
  abort "#{$0}: fatal: ARVADOS_API_TOKEN environment variable not set."
end
80 require 'google/api_client'
86 #{$0}: fatal: some runtime dependencies are missing.
87 Try: gem install pp google-api-client json trollop
# Write a diagnostic message to stderr, prefixed with the program name
# and the process ID.
#
# message   - text to print.
# verbosity - the message is shown only when $debuglevel is at least
#             this value (default 1). Callers in this file pass 0 for
#             failures that must be reported even without --debug.
#
# Returns nil.
#
# An unset $debuglevel (nil) is treated as 0, so calling this before the
# command-line options have been parsed does not raise NoMethodError.
def debuglog(message, verbosity=1)
  return unless ($debuglevel || 0) >= verbosity
  $stderr.puts "#{File.split($0).last} #{$$}: #{message}"
end
97 original_verbosity = $VERBOSE
100 $VERBOSE = original_verbosity
105 if $arvados_api_host.match /local/
106 # You probably don't care about SSL certificate checks if you're
107 # testing with a dev server.
108 suppress_warnings { OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE }
111 class Google::APIClient
112 def discovery_document(api, version)
114 return @discovery_documents["#{api}:#{version}"] ||=
116 response = self.execute!(
117 :http_method => :get,
118 :uri => self.discovery_uri(api, version),
119 :authenticated => false
121 response.body.class == String ? JSON.parse(response.body) : response.body
127 # Parse command line options (the kind that control the behavior of
128 # this program, that is, not the pipeline component parameters).
130 p = Trollop::Parser.new do
132 "Do not start any new jobs or wait for existing jobs to finish. Just find out whether jobs are finished, queued, or running for each component.",
136 "Store plain text status in given file.",
139 :default => '/dev/stdout')
141 "Store json-formatted pipeline in given file.",
144 :default => '/dev/null')
146 "Do not wait for jobs to finish. Just look up status, submit new jobs if needed, and exit.",
149 opt(:no_reuse_finished,
150 "Do not reuse existing outputs to satisfy pipeline components. Always submit a new job or use an existing job which has not yet finished.",
154 "Do not reuse existing jobs to satisfy pipeline components. Submit a new job for every component.",
158 "Print extra debugging information on stderr.",
161 "Set debug verbosity level.",
165 "UUID of pipeline template.",
169 "UUID of pipeline instance.",
172 opt(:create_instance_only,
173 "Do not try to satisfy any components. Just create a pipeline instance and output its UUID.",
178 $options = Trollop::with_standard_exception_handling p do
181 $debuglevel = $options[:debug_level] || ($options[:debug] && 1) || 0
# Check how the pipeline was selected on the command line:
# --instance excludes both --template and --create-instance-only, and
# when --instance is absent a --template is mandatory.
if $options[:instance]
  conflicting = $options[:template] || $options[:create_instance_only]
  abort "#{$0}: syntax error: --instance cannot be combined with --template or --create-instance-only." if conflicting
else
  abort "#{$0}: syntax error: you must supply a --template or --instance." unless $options[:template]
end
# Set up the API client.
#
# The client is created only if $client is not already set; it is
# identified by this script's file name and $application_version.
# $arvados then holds the discovered 'arvados' API for the configured
# API version.
$client ||= Google::APIClient.new(
  :host => $arvados_api_host,
  :application_name => File.basename($0),
  :application_version => $application_version.to_s
)
$arvados = $client.discovered_api('arvados', $arvados_api_version)
200 class PipelineInstance
202 result = $client.execute(:api_method => $arvados.pipeline_instances.get,
207 :api_token => ENV['ARVADOS_API_TOKEN']
209 :authenticated => false)
210 j = JSON.parse result.body, :symbolize_names => true
211 unless j.is_a? Hash and j[:uuid]
212 debuglog "Failed to get pipeline_instance: #{j[:errors] rescue nil}", 0
215 debuglog "Retrieved pipeline_instance #{j[:uuid]}"
219 def self.create(attributes)
220 result = $client.execute(:api_method => $arvados.pipeline_instances.create,
222 :api_token => ENV['ARVADOS_API_TOKEN'],
223 :pipeline_instance => attributes
225 :authenticated => false)
226 j = JSON.parse result.body, :symbolize_names => true
227 unless j.is_a? Hash and j[:uuid]
228 abort "Failed to create pipeline_instance: #{j[:errors] rescue nil} #{j.inspect}"
230 debuglog "Created pipeline instance: #{j[:uuid]}"
234 result = $client.execute(:api_method => $arvados.pipeline_instances.update,
239 :api_token => ENV['ARVADOS_API_TOKEN'],
240 :pipeline_instance => @attributes_to_update.to_json
242 :authenticated => false)
243 j = JSON.parse result.body, :symbolize_names => true
244 unless j.is_a? Hash and j[:uuid]
245 debuglog "Failed to save pipeline_instance: #{j[:errors] rescue nil}", 0
248 @attributes_to_update = {}
253 @attributes_to_update[x] = y
261 @attributes_to_update = {}
269 result = $client.execute(:api_method => $arvados.jobs.get,
271 :api_token => ENV['ARVADOS_API_TOKEN'],
274 :authenticated => false)
275 @cache[uuid] = JSON.parse result.body, :symbolize_names => true
277 def self.where(conditions)
278 result = $client.execute(:api_method => $arvados.jobs.list,
280 :api_token => ENV['ARVADOS_API_TOKEN'],
282 :where => conditions.to_json
284 :authenticated => false)
285 list = JSON.parse result.body, :symbolize_names => true
286 if list and list[:items].is_a? Array
292 def self.create(attributes)
294 result = $client.execute(:api_method => $arvados.jobs.create,
296 :api_token => ENV['ARVADOS_API_TOKEN'],
297 :job => attributes.to_json
299 :authenticated => false)
300 j = JSON.parse result.body, :symbolize_names => true
301 if j.is_a? Hash and j[:uuid]
304 debuglog "create job: #{j[:errors] rescue nil}", 0
310 class WhRunPipelineInstance
311 attr_reader :instance
313 def initialize(_options)
317 def fetch_template(template_uuid)
318 result = $client.execute(:api_method => $arvados.pipeline_templates.get,
320 :api_token => ENV['ARVADOS_API_TOKEN'],
321 :uuid => template_uuid
323 :authenticated => false)
324 @template = JSON.parse result.body, :symbolize_names => true
326 abort "#{$0}: fatal: failed to retrieve pipeline template #{template_uuid} #{@template[:errors].inspect rescue nil}"
331 def fetch_instance(instance_uuid)
332 @instance = PipelineInstance.find(instance_uuid)
333 @template = @instance
337 def apply_parameters(params_args)
338 params_args.shift if params_args[0] == '--'
340 while !params_args.empty?
341 if (re = params_args[0].match /^(--)?([^-].*?)=(.+)/)
342 params[re[2]] = re[3]
344 elsif params_args.size > 1
345 param = params_args.shift.sub /^--/, ''
346 params[param] = params_args.shift
348 abort "Syntax error: I do not know what to do with arg \"#{params_args[0]}\""
352 @components = @template[:components].dup
355 @components.each do |componentname, component|
356 component[:script_parameters].each do |parametername, parameter|
357 parameter = { :value => parameter } unless parameter.is_a? Hash
359 (params["#{componentname}::#{parametername}"] ||
361 (parameter[:output_of].nil? &&
362 (params[parametername.to_s] ||
363 parameter[:default])) ||
366 ![false,'false',0,'0'].index parameter[:required]
367 if parameter[:output_of]
370 errors << [componentname, parametername, "required parameter is missing"]
372 debuglog "parameter #{componentname}::#{parametername} == #{value}"
373 component[:script_parameters][parametername] = value
377 abort "Errors:\n#{errors.collect { |c,p,e| "#{c}::#{p} - #{e}\n" }.join ""}"
379 debuglog "options=" + @options.pretty_inspect
384 @instance ||= PipelineInstance.
385 create(:components => @components,
386 :pipeline_template_uuid => @template[:uuid],
395 @components.each do |cname, c|
398 c[:script_parameters].select { |pname, p| p.is_a? Hash }.empty?
399 # Job is fully specified (all parameter values are present) but
400 # no particular job has been found.
402 debuglog "component #{cname} ready to satisfy."
405 second_place_job = nil # satisfies component, but not finished yet
407 (@options[:no_reuse] ? [] : JobCache.
408 where(script: c[:script],
409 script_parameters: c[:script_parameters],
410 script_version_descends_from: c[:script_version_descends_from])
411 ).each do |candidate_job|
412 candidate_params_downcase = Hash[candidate_job[:script_parameters].
413 map { |k,v| [k.downcase,v] }]
414 c_params_downcase = Hash[c[:script_parameters].
415 map { |k,v| [k.downcase,v] }]
417 debuglog "component #{cname} considering job #{candidate_job[:uuid]} version #{candidate_job[:script_version]} parameters #{candidate_params_downcase.inspect}", 3
419 unless candidate_params_downcase == c_params_downcase
423 unless candidate_job[:success] || candidate_job[:running] ||
424 (!candidate_job[:started_at] && !candidate_job[:cancelled_at])
425 debuglog "component #{cname} would be satisfied by job #{candidate_job[:uuid]} if it were running or successful.", 2
429 if candidate_job[:success]
430 unless @options[:no_reuse_finished]
432 debuglog "component #{cname} satisfied by job #{job[:uuid]} version #{job[:script_version]}"
436 second_place_job ||= candidate_job
440 if not c[:job] and second_place_job
441 job = second_place_job
442 debuglog "component #{cname} satisfied by job #{job[:uuid]} version #{job[:script_version]}"
446 debuglog "component #{cname} not satisfied by any existing job."
447 if !@options[:dry_run]
448 debuglog "component #{cname} new job."
449 job = JobCache.create(:script => c[:script],
450 :script_parameters => c[:script_parameters],
451 :runtime_constraints => c[:runtime_constraints] || {},
452 :script_version => c[:script_version] || 'master')
454 debuglog "component #{cname} new job #{job[:uuid]}"
457 debuglog "component #{cname} new job failed"
464 if c[:job] and c[:job][:uuid]
465 if not c[:job][:finished_at] and not c[:job][:cancelled_at]
466 c[:job] = JobCache.get(c[:job][:uuid])
469 # Populate script_parameters of other components waiting for
471 @components.each do |c2name, c2|
472 c2[:script_parameters].each do |pname, p|
473 if p.is_a? Hash and p[:output_of] == cname.to_s
474 debuglog "parameter #{c2name}::#{pname} == #{c[:job][:output]}"
475 c2[:script_parameters][pname] = c[:job][:output]
479 elsif c[:job][:running] ||
480 (!c[:job][:started_at] && !c[:job][:cancelled_at])
481 moretodo ||= !@options[:no_wait]
482 elsif c[:job][:cancelled_at]
483 debuglog "component #{cname} job #{c[:job][:uuid]} cancelled."
487 @instance[:components] = @components
488 @instance[:active] = moretodo
494 debuglog "interrupt", 0
499 @instance[:success] = @components.reject { |cname,c| c[:job] and c[:job][:success] }.empty?
505 @instance[:active] = false
# Write the JSON status report, unless it is directed at /dev/null.
#
# The report is the @components hash serialized as indented JSON, so the
# file named by --status-json can be read by any JSON parser. This used
# to write pretty_inspect output, which is Ruby inspect syntax and not
# JSON.
if @options[:status_json] != '/dev/null'
  File.open(@options[:status_json], 'w') do |f|
    f.puts JSON.pretty_generate(@components)
  end
end
525 if @options[:status_text] != '/dev/null'
526 File.open(@options[:status_text], 'w') do |f|
527 f.puts "#{Time.now} -- pipeline_instance #{@instance[:uuid]}"
528 namewidth = @components.collect { |cname, c| cname.size }.max
529 @components.each do |cname, c|
530 jstatus = if !c[:job]
532 elsif c[:job][:running]
533 "#{c[:job][:tasks_summary].inspect}"
534 elsif c[:job][:success]
536 elsif c[:job][:cancelled_at]
537 "cancelled #{c[:job][:cancelled_at]}"
538 elsif c[:job][:finished_at]
539 "failed #{c[:job][:finished_at]}"
540 elsif c[:job][:started_at]
541 "started #{c[:job][:started_at]}"
543 "queued #{c[:job][:created_at]}"
545 f.puts "#{cname.to_s.ljust namewidth} #{c[:job] ? c[:job][:uuid] : '-'.ljust(27)} #{jstatus}"
552 runner = WhRunPipelineInstance.new($options)
554 if $options[:template]
555 runner.fetch_template($options[:template])
557 runner.fetch_instance($options[:instance])
559 runner.apply_parameters(p.leftovers)
560 runner.setup_instance
561 if $options[:create_instance_only]
563 puts runner.instance[:uuid]
567 rescue Exception => e