5 # wh-run-pipeline-instance --template pipeline-template-uuid [options] [--] [parameters]
6 # wh-run-pipeline-instance --instance pipeline-instance-uuid [options]
8 # Satisfy a pipeline template by finding or submitting a mapreduce job
9 # for each pipeline component.
13 # [--template uuid] Use the specified pipeline template.
15 # [--template path] Load the pipeline template from the specified local file.
18 # [--instance uuid] Use the specified pipeline instance.
20 # [-n, --dry-run] Do not start any new jobs or wait for existing jobs
21 # to finish. Just find out whether jobs are finished,
22 # queued, or running for each component
24 # [--create-instance-only] Do not try to satisfy any components. Just
25 # create an instance, print its UUID to stdout.
28 # [--no-wait] Make only as much progress as possible without waiting for jobs to finish.
31 # [--no-reuse-finished] Do not reuse existing outputs to satisfy
32 # pipeline components. Always submit a new job
33 # or use an existing job which has not yet finished.
36 # [--no-reuse] Do not reuse existing jobs to satisfy pipeline
37 # components. Submit a new job for every component.
39 # [--debug] Print extra debugging information on stderr.
41 # [--debug-level N] Increase amount of debugging information. Default
42 # 1, possible range 0..3.
44 # [--status-text path] Print plain text status report to a file or
45 # fifo. Default: /dev/stdout
47 # [--status-json path] Print JSON status report to a file or
48 # fifo. Default: /dev/null
52 # [param_name=param_value]
54 # [param_name param_value] Set (or override) the default value for
55 # every parameter with the given name.
57 # [component_name::param_name=param_value]
58 # [component_name::param_name param_value]
59 # [--component_name::param_name=param_value]
60 # [--component_name::param_name param_value] Set the value of a
61 # parameter for a single component.
# First mention of the runner class; its methods are defined where the
# class is opened again further down.
64 class WhRunPipelineInstance
# Version string source: passed (via to_s) to the API client as
# :application_version.
67 $application_version = 1.0
# Refuse to run on Ruby older than 1.9.3.
# NOTE(review): this compares version strings lexicographically, not
# numerically -- confirm that is acceptable.
69 if RUBY_VERSION < '1.9.3' then
71 #{$0.gsub(/^\.\//,'')} requires Ruby version 1.9.3 or higher.
# API connection settings come from the environment. The API version
# falls back to 'v1'; a missing host or token aborts the program.
75 $arvados_api_version = ENV['ARVADOS_API_VERSION'] || 'v1'
76 $arvados_api_host = ENV['ARVADOS_API_HOST'] or
77 abort "#{$0}: fatal: ARVADOS_API_HOST environment variable not set."
78 $arvados_api_token = ENV['ARVADOS_API_TOKEN'] or
79 abort "#{$0}: fatal: ARVADOS_API_TOKEN environment variable not set."
# Third-party API client library. The text after it tells the user how
# to install missing runtime dependencies (presumably shown when a
# require fails -- TODO confirm).
83 require 'google/api_client'
89 #{$0}: fatal: some runtime dependencies are missing.
90 Try: gem install pp google-api-client json trollop
# Write +message+ to stderr, prefixed with the script's file name and
# the process id, but only when the global $debuglevel is at least
# +verbosity+ (default 1).
94 def debuglog(message, verbosity=1)
95 $stderr.puts "#{File.split($0).last} #{$$}: #{message}" if $debuglevel >= verbosity
# Remember the current $VERBOSE setting (presumably inside the
# suppress_warnings helper that is called with a block below -- TODO
# confirm).
100 original_verbosity = $VERBOSE
# Put the remembered $VERBOSE setting back.
103 $VERBOSE = original_verbosity
# When the API host name contains "local", OpenSSL's VERIFY_PEER
# constant is reassigned to VERIFY_NONE inside suppress_warnings.
# NOTE(review): the pattern is unanchored, so any host name that merely
# contains "local" matches -- confirm this is intended.
108 if $arvados_api_host.match /local/
109 # You probably don't care about SSL certificate checks if you're
110 # testing with a dev server.
111 suppress_warnings { OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE }
# Reopens Google::APIClient to define discovery_document (presumably
# overriding the library's own version -- TODO confirm).
114 class Google::APIClient
# Return the discovery document for +api+ / +version+, memoized in
# @discovery_documents under the key "api:version". On a cache miss the
# document is requested with an unauthenticated GET of
# discovery_uri(api, version).
115 def discovery_document(api, version)
117 return @discovery_documents["#{api}:#{version}"] ||=
119 response = self.execute!(
120 :http_method => :get,
121 :uri => self.discovery_uri(api, version),
122 :authenticated => false
# Parse the body as JSON only when it is a String; otherwise use it
# unchanged.
124 response.body.class == String ? JSON.parse(response.body) : response.body
130 # Parse command line options (the kind that control the behavior of
131 # this program, that is, not the pipeline component parameters).
# The strings below are the help texts of the individual options. The
# arguments left over after option parsing are read later through
# p.leftovers and treated as pipeline parameters.
133 p = Trollop::Parser.new do
136 "Do not start any new jobs or wait for existing jobs to finish. Just find out whether jobs are finished, queued, or running for each component.",
140 "Store plain text status in given file.",
143 :default => '/dev/stdout')
145 "Store json-formatted pipeline in given file.",
148 :default => '/dev/null')
150 "Do not wait for jobs to finish. Just look up status, submit new jobs if needed, and exit.",
153 opt(:no_reuse_finished,
154 "Do not reuse existing outputs to satisfy pipeline components. Always submit a new job or use an existing job which has not yet finished.",
158 "Do not reuse existing jobs to satisfy pipeline components. Submit a new job for every component.",
162 "Print extra debugging information on stderr.",
165 "Set debug verbosity level.",
169 "UUID of pipeline template, or path to local pipeline template file.",
173 "UUID of pipeline instance.",
176 opt(:create_instance_only,
177 "Do not try to satisfy any components. Just create a pipeline instance and output its UUID.",
# Run the parser with Trollop's standard exception handling; the result
# is kept in the global $options.
182 $options = Trollop::with_standard_exception_handling p do
# Effective debug level: an explicit --debug-level wins, --debug alone
# means 1, and otherwise the level is 0.
185 $debuglevel = $options[:debug_level] || ($options[:debug] && 1) || 0
# --instance excludes both --template and --create-instance-only, and
# one of --template / --instance is mandatory; any violation aborts
# with a syntax error message.
187 if $options[:instance]
188 if $options[:template] or $options[:create_instance_only]
189 abort "#{$0}: syntax error: --instance cannot be combined with --template or --create-instance-only."
191 elsif not $options[:template]
192 abort "#{$0}: syntax error: you must supply a --template or --instance."
# Build the shared API client once (an already existing $client is
# kept) and obtain the discovered 'arvados' API for the configured API
# version.
$client ||= Google::APIClient.new(
  host: $arvados_api_host,
  application_name: File.split($0).last,
  application_version: $application_version.to_s
)
$arvados = $client.discovered_api('arvados', $arvados_api_version)
# Wrapper around the pipeline_instances API resource. Attribute changes
# are collected in @attributes_to_update and sent with the update call.
204 class PipelineInstance
# Fetch a pipeline instance through pipeline_instances.get (presumably
# the body of the find class method used by fetch_instance -- TODO
# confirm).
206 result = $client.execute(:api_method => $arvados.pipeline_instances.get,
211 :api_token => ENV['ARVADOS_API_TOKEN']
213 :authenticated => false)
# A usable response is a JSON object carrying a :uuid; anything else is
# logged at verbosity 0 as a failure.
214 j = JSON.parse result.body, :symbolize_names => true
215 unless j.is_a? Hash and j[:uuid]
216 debuglog "Failed to get pipeline_instance: #{j[:errors] rescue nil}", 0
219 debuglog "Retrieved pipeline_instance #{j[:uuid]}"
# Create a pipeline instance on the server from +attributes+. Aborts
# the program when the response is not a JSON object with a :uuid.
223 def self.create(attributes)
224 result = $client.execute(:api_method => $arvados.pipeline_instances.create,
226 :api_token => ENV['ARVADOS_API_TOKEN'],
227 :pipeline_instance => attributes
229 :authenticated => false)
230 j = JSON.parse result.body, :symbolize_names => true
231 unless j.is_a? Hash and j[:uuid]
232 abort "Failed to create pipeline_instance: #{j[:errors] rescue nil} #{j.inspect}"
234 debuglog "Created pipeline instance: #{j[:uuid]}"
# Send the pending attribute changes through pipeline_instances.update.
# NOTE(review): the changes are serialized with to_json here, whereas
# self.create passes its attributes unserialized -- confirm both forms
# are accepted by the API.
238 result = $client.execute(:api_method => $arvados.pipeline_instances.update,
243 :api_token => ENV['ARVADOS_API_TOKEN'],
244 :pipeline_instance => @attributes_to_update.to_json
246 :authenticated => false)
247 j = JSON.parse result.body, :symbolize_names => true
248 unless j.is_a? Hash and j[:uuid]
249 debuglog "Failed to save pipeline_instance: #{j[:errors] rescue nil}", 0
# Pending changes are cleared (presumably only after a successful
# update -- TODO confirm).
252 @attributes_to_update = {}
# Queue the new value y of attribute x for the next update call.
257 @attributes_to_update[x] = y
# Start with no pending changes (presumably in the constructor -- TODO
# confirm).
265 @attributes_to_update = {}
# Fetch a job through jobs.get (this looks like the body of
# JobCache.get, which is called elsewhere to refresh a job's state --
# TODO confirm).
273 result = $client.execute(:api_method => $arvados.jobs.get,
275 :api_token => ENV['ARVADOS_API_TOKEN'],
278 :authenticated => false)
# The parsed response is stored in @cache under the job's uuid.
279 @cache[uuid] = JSON.parse result.body, :symbolize_names => true
# Query jobs.list for jobs matching +conditions+; the conditions are
# sent JSON-encoded as the :where parameter.
281 def self.where(conditions)
282 result = $client.execute(:api_method => $arvados.jobs.list,
284 :api_token => ENV['ARVADOS_API_TOKEN'],
286 :where => conditions.to_json
288 :authenticated => false)
# Only a response whose :items is an Array is treated as a job list.
289 list = JSON.parse result.body, :symbolize_names => true
290 if list and list[:items].is_a? Array
# Submit a new job through jobs.create; +attributes+ is sent
# JSON-encoded as the :job parameter.
296 def self.create(attributes)
298 result = $client.execute(:api_method => $arvados.jobs.create,
300 :api_token => ENV['ARVADOS_API_TOKEN'],
301 :job => attributes.to_json
303 :authenticated => false)
# A JSON object with a :uuid means the job was created; the log call
# below reports the server's errors at verbosity 0 (presumably in the
# else branch -- TODO confirm).
304 j = JSON.parse result.body, :symbolize_names => true
305 if j.is_a? Hash and j[:uuid]
308 debuglog "create job: #{j[:errors] rescue nil}", 0
# Drives one pipeline run: loads a template or an existing instance,
# applies parameters, finds or submits a job for each component and
# writes status reports.
314 class WhRunPipelineInstance
# The pipeline instance being worked on; the main script reads it to
# print the instance uuid.
315 attr_reader :instance
# _options: the parsed command line options (presumably kept as
# @options, which the other methods read -- TODO confirm).
317 def initialize(_options)
# Load the pipeline template into @template. +template+ is either a
# template uuid or the path of a local JSON file.
321 def fetch_template(template)
# Any character other than a lowercase letter, a digit or '-' makes the
# argument count as a file path.
322 if template.match /[^-0-9a-z]/
323 # Doesn't look like a uuid -- use it as a filename.
324 @template = JSON.parse File.read(template), :symbolize_names => true
# A template read from a file must define :components.
325 if !@template[:components]
326 abort ("#{$0}: Template loaded from #{template} " +
327 "does not have a \"components\" key")
# Otherwise fetch the template by uuid through pipeline_templates.get.
330 result = $client.execute(:api_method => $arvados.pipeline_templates.get,
332 :api_token => ENV['ARVADOS_API_TOKEN'],
335 :authenticated => false)
336 @template = JSON.parse result.body, :symbolize_names => true
# Fatal: report the failure together with any :errors in the response.
338 abort "#{$0}: fatal: failed to retrieve pipeline template #{template} #{@template[:errors].inspect rescue nil}"
# Load an existing pipeline instance by uuid. The instance also serves
# as the template, so apply_parameters takes its :components from it.
344 def fetch_instance(instance_uuid)
345 @instance = PipelineInstance.find(instance_uuid)
346 @template = @instance
# Parse the pipeline parameter arguments and resolve every script
# parameter of every component. Accepted forms are name=value,
# --name=value, and "name value" / "--name value" pairs.
350 def apply_parameters(params_args)
# A leading "--" separator is dropped.
351 params_args.shift if params_args[0] == '--'
353 while !params_args.empty?
# name=value form with an optional leading "--": the name may not start
# with '-' and the value may not be empty.
354 if (re = params_args[0].match /^(--)?([^-].*?)=(.+)/)
355 params[re[2]] = re[3]
# "name value" form: consume two arguments and strip a leading "--"
# from the name.
357 elsif params_args.size > 1
358 param = params_args.shift.sub /^--/, ''
359 params[param] = params_args.shift
361 abort "Syntax error: I do not know what to do with arg \"#{params_args[0]}\""
# Work on a copy of the template's component list.
# NOTE(review): dup is shallow, so the nested :script_parameters hashes
# are shared with @template and are modified in place below.
365 @components = @template[:components].dup
368 @components.each do |componentname, component|
369 component[:script_parameters].each do |parametername, parameter|
# Plain values are wrapped so every parameter can be handled as a hash.
# A component-specific setting ("component::param") takes precedence; a
# setting by bare parameter name or the parameter's :default is only
# considered when the parameter has no :output_of. A parameter counts
# as required unless :required is false, 'false', 0 or '0'.
370 parameter = { :value => parameter } unless parameter.is_a? Hash
372 (params["#{componentname}::#{parametername}"] ||
374 (parameter[:output_of].nil? &&
375 (params[parametername.to_s] ||
376 parameter[:default])) ||
379 ![false,'false',0,'0'].index parameter[:required]
380 if parameter[:output_of]
383 errors << [componentname, parametername, "required parameter is missing"]
385 debuglog "parameter #{componentname}::#{parametername} == #{value}"
386 component[:script_parameters][parametername] = value
# Report all collected errors in one message and stop.
390 abort "Errors:\n#{errors.collect { |c,p,e| "#{c}::#{p} - #{e}\n" }.join ""}"
392 debuglog "options=" + @options.pretty_inspect
# Create a new pipeline instance from the resolved components unless
# one is already present (this looks like the body of setup_instance,
# which the main script calls after apply_parameters -- TODO confirm).
397 @instance ||= PipelineInstance.
398 create(:components => @components,
399 :pipeline_template_uuid => @template[:uuid],
# Pass over all components: a component whose parameters are all
# resolved and which has no job yet is matched against existing jobs or
# gets a newly submitted job; afterwards job state is refreshed and
# finished outputs are handed to the components waiting for them.
408 @components.each do |cname, c|
411 c[:script_parameters].select { |pname, p| p.is_a? Hash }.empty?
412 # Job is fully specified (all parameter values are present) but
413 # no particular job has been found.
415 debuglog "component #{cname} ready to satisfy."
418 second_place_job = nil # satisfies component, but not finished yet
# Candidate jobs come from JobCache.where; with --no-reuse no existing
# job is considered at all.
420 (@options[:no_reuse] ? [] : JobCache.
421 where(script: c[:script],
422 script_parameters: c[:script_parameters],
423 script_version_descends_from: c[:script_version])
424 ).each do |candidate_job|
# Script parameters are compared with the keys lowercased on both
# sides.
425 candidate_params_downcase = Hash[candidate_job[:script_parameters].
426 map { |k,v| [k.downcase,v] }]
427 c_params_downcase = Hash[c[:script_parameters].
428 map { |k,v| [k.downcase,v] }]
430 debuglog "component #{cname} considering job #{candidate_job[:uuid]} version #{candidate_job[:script_version]} parameters #{candidate_params_downcase.inspect}", 3
432 unless candidate_params_downcase == c_params_downcase
# The candidate's script_version must begin with the component's
# script_version (prefix comparison).
# NOTE(review): this calls .length on c[:script_version], which raises
# if it is nil, although the job submission below allows for a missing
# version by falling back to 'master' -- confirm it is always set here.
436 if c[:script_version] !=
437 candidate_job[:script_version][0,c[:script_version].length]
438 debuglog "component #{cname} would be satisfied by job #{candidate_job[:uuid]} if script_version matched.", 2
# Skip jobs that are neither successful, nor running, nor still queued
# (queued meaning not started and not cancelled).
442 unless candidate_job[:success] || candidate_job[:running] ||
443 (!candidate_job[:started_at] && !candidate_job[:cancelled_at])
444 debuglog "component #{cname} would be satisfied by job #{candidate_job[:uuid]} if it were running or successful.", 2
# A successful job is used unless --no-reuse-finished was given; an
# unfinished match is remembered as the fallback choice.
448 if candidate_job[:success]
449 unless @options[:no_reuse_finished]
451 $stderr.puts "using #{job[:uuid]} (finished at #{job[:finished_at]}) for component #{cname}"
455 second_place_job ||= candidate_job
# No finished job was chosen: fall back to the unfinished match, if
# there is one.
459 if not c[:job] and second_place_job
460 job = second_place_job
461 $stderr.puts "using #{job[:uuid]} (running since #{job[:started_at]}) for component #{cname}"
# Nothing reusable was found: submit a new job unless this is a dry
# run.
465 debuglog "component #{cname} not satisfied by any existing job."
466 if !@options[:dry_run]
467 debuglog "component #{cname} new job."
468 job = JobCache.create(:script => c[:script],
469 :script_parameters => c[:script_parameters],
470 :runtime_constraints => c[:runtime_constraints] || {},
471 :script_version => c[:script_version] || 'master')
473 debuglog "component #{cname} new job #{job[:uuid]}"
476 debuglog "component #{cname} new job failed"
# Refresh the state of a job that is running, or that has neither
# finished nor been cancelled.
483 if c[:job] and c[:job][:uuid]
484 if (c[:job][:running] or
485 not (c[:job][:finished_at] or c[:job][:cancelled_at]))
486 c[:job] = JobCache.get(c[:job][:uuid])
489 # Populate script_parameters of other components waiting for this job's output
491 @components.each do |c2name, c2|
492 c2[:script_parameters].each do |pname, p|
493 if p.is_a? Hash and p[:output_of] == cname.to_s
494 debuglog "parameter #{c2name}::#{pname} == #{c[:job][:output]}"
495 c2[:script_parameters][pname] = c[:job][:output]
# Still running or queued: another round is needed unless --no-wait was
# given.
499 elsif c[:job][:running] ||
500 (!c[:job][:started_at] && !c[:job][:cancelled_at])
501 moretodo ||= !@options[:no_wait]
502 elsif c[:job][:cancelled_at]
503 debuglog "component #{cname} job #{c[:job][:uuid]} cancelled."
# Record the current component state on the instance, and whether more
# work remains.
507 @instance[:components] = @components
508 @instance[:active] = moretodo
# Logged at verbosity 0 on interruption (presumably while waiting
# between rounds -- TODO confirm).
514 debuglog "interrupt", 0
# The pipeline counts as successful only when every component has a
# successful job.
519 @instance[:success] = @components.reject { |cname,c| c[:job] and c[:job][:success] }.empty?
# Mark the instance as no longer active (presumably part of a separate
# cleanup path -- TODO confirm).
525 @instance[:active] = false
# Write the current component state to the --status-json target.
# Nothing is written when the target is /dev/null.
if @options[:status_json] != '/dev/null'
  File.open(@options[:status_json], 'w') do |f|
    # The option promises a JSON report, so emit real JSON. The previous
    # pretty_inspect output was Ruby inspect syntax, which JSON parsers
    # cannot read.
    f.puts JSON.pretty_generate(@components)
  end
end
# Write a plain text status report, one line per component, unless the
# target is /dev/null.
545 if @options[:status_text] != '/dev/null'
546 File.open(@options[:status_text], 'w') do |f|
548 f.puts "#{Time.now} -- pipeline_instance #{@instance[:uuid]}"
# Length of the longest component name, used to align the name column.
549 namewidth = @components.collect { |cname, c| cname.size }.max
550 @components.each do |cname, c|
# Status text per component. The branches are tested in this order: no
# job, running (shows the task summary), successful, cancelled,
# finished without success (reported as failed), started, and finally
# queued.
551 jstatus = if !c[:job]
553 elsif c[:job][:running]
554 "#{c[:job][:tasks_summary].inspect}"
555 elsif c[:job][:success]
557 elsif c[:job][:cancelled_at]
558 "cancelled #{c[:job][:cancelled_at]}"
559 elsif c[:job][:finished_at]
560 "failed #{c[:job][:finished_at]}"
561 elsif c[:job][:started_at]
562 "started #{c[:job][:started_at]}"
564 "queued #{c[:job][:created_at]}"
# Without a job, a '-' padded to 27 characters stands in for the job
# uuid.
566 f.puts "#{cname.to_s.ljust namewidth} #{c[:job] ? c[:job][:uuid] : '-'.ljust(27)} #{jstatus}"
# Main program: build the runner, load either the template or the
# existing instance, apply the leftover command line arguments as
# pipeline parameters, and make sure a pipeline instance exists.
573 runner = WhRunPipelineInstance.new($options)
575 if $options[:template]
576 runner.fetch_template($options[:template])
578 runner.fetch_instance($options[:instance])
580 runner.apply_parameters(p.leftovers)
581 runner.setup_instance
# With --create-instance-only the instance uuid is printed on stdout.
582 if $options[:create_instance_only]
584 puts runner.instance[:uuid]
# NOTE(review): rescuing Exception also catches Interrupt and the
# SystemExit raised by abort/exit -- confirm that the handler
# re-raises after doing its work.
588 rescue Exception => e