5 # wh-run-pipeline-instance --template pipeline-template-uuid [options] [--] [parameters]
6 # wh-run-pipeline-instance --instance pipeline-instance-uuid [options]
8 # Satisfy a pipeline template by finding or submitting a mapreduce job
9 # for each pipeline component.
13 # [--template uuid] Use the specified pipeline template.
15 # [--instance uuid] Use the specified pipeline instance.
17 # [-n, --dry-run] Do not start any new jobs or wait for existing jobs
18 # to finish. Just find out whether jobs are finished,
19 # queued, or running for each component
21 # [--create-instance-only] Do not try to satisfy any components. Just
22 # create an instance, print its UUID to
25 # [--no-wait] Make only as much progress as possible without entering
28 # [--no-reuse-finished] Do not reuse existing outputs to satisfy
29 # pipeline components. Always submit a new job
30 # or use an existing job which has not yet
33 # [--no-reuse] Do not reuse existing jobs to satisfy pipeline
34 # components. Submit a new job for every component.
36 # [--debug] Print extra debugging information on stderr.
38 # [--debug-level N] Increase amount of debugging information. Default
39 # 1, possible range 0..3.
41 # [--status-text path] Print plain text status report to a file or
42 # fifo. Default: /dev/stdout
44 # [--status-json path] Print JSON status report to a file or
45 # fifo. Default: /dev/null
49 # [param_name=param_value]
51 # [param_name param_value] Set (or override) the default value for
52 # every parameter with the given name.
54 # [component_name::param_name=param_value]
55 # [component_name::param_name param_value]
56 # [--component_name::param_name=param_value]
57 # [--component_name::param_name param_value] Set the value of a
58 # parameter for a single
61 class WhRunPipelineInstance
# Version string reported to the API server as application_version when the
# Google API client is constructed.
64 $application_version = 1.0
# Refuse to run on Ruby older than 1.9.3 (a plain string comparison of
# RUBY_VERSION).
66 if RUBY_VERSION < '1.9.3' then
68 #{$0.gsub(/^\.\//,'')} requires Ruby version 1.9.3 or higher.
# API connection settings come from the environment. ARVADOS_API_VERSION
# defaults to 'v1'; ARVADOS_API_HOST and ARVADOS_API_TOKEN are mandatory.
# `or` binds more loosely than `=`, so each assignment happens first and
# abort runs only when the environment variable is unset (nil).
72 $arvados_api_version = ENV['ARVADOS_API_VERSION'] || 'v1'
73 $arvados_api_host = ENV['ARVADOS_API_HOST'] or
74 abort "#{$0}: fatal: ARVADOS_API_HOST environment variable not set."
75 $arvados_api_token = ENV['ARVADOS_API_TOKEN'] or
76 abort "#{$0}: fatal: ARVADOS_API_TOKEN environment variable not set."
# Third-party gem dependencies. The fatal message below presumably belongs to
# the handler that runs when a require fails; it names the gems to install.
80 require 'google/api_client'
86 #{$0}: fatal: some runtime dependencies are missing.
87 Try: gem install pp google-api-client json trollop
# Write +message+ to stderr, prefixed with the program's basename and PID
# ($$), but only when the global $debuglevel is >= +verbosity+ (default 1).
# $debuglevel is never below 0, so verbosity-0 messages are always printed.
# $debuglevel must already be set (during option parsing) before this is called.
91 def debuglog(message, verbosity=1)
92 $stderr.puts "#{File.split($0).last} #{$$}: #{message}" if $debuglevel >= verbosity
# Body of the suppress_warnings helper used by the SSL override: save
# $VERBOSE, run the caller's code with warnings changed (lines not shown
# here), then restore the saved value. Its purpose is to silence Ruby's
# "already initialized constant" warning when a constant is reassigned.
97 original_verbosity = $VERBOSE
100 $VERBOSE = original_verbosity
# When the API host name contains "local", turn off SSL certificate
# verification by redefining OpenSSL::SSL::VERIFY_PEER as VERIFY_NONE.
# NOTE(review): this affects every SSL connection in the process, and the
# regexp matches any host containing "local" (not only localhost); confirm
# that this is acceptable outside development.
105 if $arvados_api_host.match /local/
106 # You probably don't care about SSL certificate checks if you're
107 # testing with a dev server.
108 suppress_warnings { OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE }
# Monkey-patch Google::APIClient#discovery_document. The discovery document
# is fetched with an unauthenticated GET of discovery_uri(api, version). It is
# memoized in @discovery_documents under the key "api:version". A String
# response body is parsed as JSON; any other body is used as-is.
111 class Google::APIClient
112 def discovery_document(api, version)
114 return @discovery_documents["#{api}:#{version}"] ||=
116 response = self.execute!(
117 :http_method => :get,
118 :uri => self.discovery_uri(api, version),
119 :authenticated => false
121 response.body.class == String ? JSON.parse(response.body) : response.body
127 # Parse command line options (the kind that control the behavior of
128 # this program, that is, not the pipeline component parameters).
# The parser is kept in the local `p` (shadowing Kernel#p) because its
# leftovers (the pipeline parameters) are handed to the runner later.
130 p = Trollop::Parser.new do
132 "Do not start any new jobs or wait for existing jobs to finish. Just find out whether jobs are finished, queued, or running for each component.",
136 "Store plain text status in given file.",
139 :default => '/dev/stdout')
141 "Store json-formatted pipeline in given file.",
144 :default => '/dev/null')
146 "Do not wait for jobs to finish. Just look up status, submit new jobs if needed, and exit.",
149 opt(:no_reuse_finished,
150 "Do not reuse existing outputs to satisfy pipeline components. Always submit a new job or use an existing job which has not yet finished.",
154 "Do not reuse existing jobs to satisfy pipeline components. Submit a new job for every component.",
158 "Print extra debugging information on stderr.",
161 "Set debug verbosity level.",
165 "UUID of pipeline template.",
169 "UUID of pipeline instance.",
172 opt(:create_instance_only,
173 "Do not try to satisfy any components. Just create a pipeline instance and output its UUID.",
# Trollop's standard handling prints help/usage errors and exits; the block
# presumably parses ARGV with `p`.
178 $options = Trollop::with_standard_exception_handling p do
# Effective debug level: --debug-level if given, else 1 for --debug, else 0.
181 $debuglevel = $options[:debug_level] || ($options[:debug] && 1) || 0
# Exactly one of --instance / --template is required; --instance also
# excludes --create-instance-only.
183 if $options[:instance]
184 if $options[:template] or $options[:create_instance_only]
185 abort "#{$0}: syntax error: --instance cannot be combined with --template or --create-instance-only."
187 elsif not $options[:template]
188 abort "#{$0}: syntax error: you must supply a --template or --instance."
191 # Set up the API client.
# Reuse $client if one already exists. The program basename and
# $application_version identify this tool to the server. $arvados holds the
# discovered Arvados API, whose resource methods are used by all calls below.
193 $client ||= Google::APIClient.
194 new(:host => $arvados_api_host,
195 :application_name => File.split($0).last,
196 :application_version => $application_version.to_s)
197 $arvados = $client.discovered_api('arvados', $arvados_api_version)
# Thin wrapper around the Arvados pipeline_instances API. Every request passes
# api_token explicitly and disables the client's own authentication
# (:authenticated => false). Responses are parsed with symbolized keys.
# NOTE(review): reads ENV['ARVADOS_API_TOKEN'] directly instead of the
# already-validated $arvados_api_token global (same value); consider the global.
200 class PipelineInstance
# Lookup by UUID (presumably PipelineInstance.find, as called by the runner).
# A response without a :uuid is logged at verbosity 0 together with any
# server :errors.
202 result = $client.execute(:api_method => $arvados.pipeline_instances.get,
204 :api_token => ENV['ARVADOS_API_TOKEN'],
207 :authenticated => false)
208 j = JSON.parse result.body, :symbolize_names => true
209 unless j.is_a? Hash and j[:uuid]
210 debuglog "Failed to get pipeline_instance: #{j[:errors] rescue nil}", 0
213 debuglog "Retrieved pipeline_instance #{j[:uuid]}"
# Create a new pipeline instance from +attributes+ (sent as JSON). Aborts the
# program if the response has no :uuid.
217 def self.create(attributes)
218 result = $client.execute(:api_method => $arvados.pipeline_instances.create,
220 :api_token => ENV['ARVADOS_API_TOKEN'],
221 :pipeline_instance => attributes.to_json
223 :authenticated => false)
224 j = JSON.parse result.body, :symbolize_names => true
225 unless j.is_a? Hash and j[:uuid]
226 abort "Failed to create pipeline_instance: #{j[:errors] rescue nil} #{j.inspect}"
228 debuglog "Created pipeline instance: #{j[:uuid]}"
# Save: send only the attributes changed since the last save
# (@attributes_to_update). A failed update is logged at verbosity 0. The
# pending set is then reset.
232 result = $client.execute(:api_method => $arvados.pipeline_instances.update,
234 :api_token => ENV['ARVADOS_API_TOKEN'],
236 :pipeline_instance => @attributes_to_update.to_json
238 :authenticated => false)
239 j = JSON.parse result.body, :symbolize_names => true
240 unless j.is_a? Hash and j[:uuid]
241 debuglog "Failed to save pipeline_instance: #{j[:errors] rescue nil}", 0
244 @attributes_to_update = {}
# Attribute assignment (presumably []=): records the change so the next save
# sends it.
249 @attributes_to_update[x] = y
# Presumably initialization: start with no pending changes.
257 @attributes_to_update = {}
# Job lookup (presumably JobCache.get, as called by the runner). It fetches
# the job by UUID and stores the parsed record (symbolized keys) in
# @cache[uuid]. The assignment is the final expression, so the record is
# what the caller receives.
265 result = $client.execute(:api_method => $arvados.jobs.get,
267 :api_token => ENV['ARVADOS_API_TOKEN'],
270 :authenticated => false)
271 @cache[uuid] = JSON.parse result.body, :symbolize_names => true
# List jobs matching +conditions+ (a Hash sent as JSON in the :where filter).
# Only a response whose :items is an Array is treated as a result list.
273 def self.where(conditions)
274 result = $client.execute(:api_method => $arvados.jobs.list,
276 :api_token => ENV['ARVADOS_API_TOKEN'],
278 :where => conditions.to_json
280 :authenticated => false)
281 list = JSON.parse result.body, :symbolize_names => true
282 if list and list[:items].is_a? Array
# Submit a new job built from +attributes+ (sent as JSON). A response without
# a :uuid is logged at verbosity 0 with any server :errors.
288 def self.create(attributes)
290 result = $client.execute(:api_method => $arvados.jobs.create,
292 :api_token => ENV['ARVADOS_API_TOKEN'],
293 :job => attributes.to_json
295 :authenticated => false)
296 j = JSON.parse result.body, :symbolize_names => true
297 if j.is_a? Hash and j[:uuid]
300 debuglog "create job: #{j[:errors] rescue nil}", 0
# Drives one pipeline run:
# - fetch a template or an existing instance;
# - apply command-line parameters;
# - create the instance;
# - satisfy each component with an existing or newly submitted job.
306 class WhRunPipelineInstance
# The PipelineInstance being run. It stays nil until fetch_instance or
# setup_instance assigns it.
307 attr_reader :instance
# Presumably stores the parsed command-line options; they are read as @options
# throughout this class.
309 def initialize(_options)
# Load pipeline template +template_uuid+ into @template (parsed JSON,
# symbolized keys). On failure the program aborts, reporting any :errors
# the server returned. The exact failure test is presumably a missing
# template uuid; confirm.
313 def fetch_template(template_uuid)
314 result = $client.execute(:api_method => $arvados.pipeline_templates.get,
316 :api_token => ENV['ARVADOS_API_TOKEN'],
317 :uuid => template_uuid
319 :authenticated => false)
320 @template = JSON.parse result.body, :symbolize_names => true
322 abort "#{$0}: fatal: failed to retrieve pipeline template #{template_uuid} #{@template[:errors].inspect rescue nil}"
# Resume an existing pipeline instance. The instance also serves as the
# template, so apply_parameters reads its :components.
327 def fetch_instance(instance_uuid)
328 @instance = PipelineInstance.find(instance_uuid)
329 @template = @instance
# Apply command-line pipeline parameters to the template's components.
#
# +params_args+ (the parser's leftover args) may start with '--'. Each
# remaining item is either "name=value" or a "name value" pair, and a
# leading "--" on the name is dropped. A name may be "param" (applies to
# every component) or "component::param" (one component). Anything else
# aborts.
333 def apply_parameters(params_args)
334 params_args.shift if params_args[0] == '--'
336 while !params_args.empty?
337 if (re = params_args[0].match /^(--)?([^-].*?)=(.+)/)
338 params[re[2]] = re[3]
340 elsif params_args.size > 1
341 param = params_args.shift.sub /^--/, ''
342 params[param] = params_args.shift
344 abort "Syntax error: I do not know what to do with arg \"#{params_args[0]}\""
# NOTE(review): dup is shallow. The per-component hashes (and their
# :script_parameters) are shared with @template and are modified in place below.
348 @components = @template[:components].dup
351 @components.each do |componentname, component|
352 component[:script_parameters].each do |parametername, parameter|
# Normalize bare values to { :value => ... }. Value resolution:
# - a "component::param" command-line value wins;
# - otherwise, a bare "param" value or the parameter's :default applies,
#   but only when the parameter is not an :output_of placeholder.
# A parameter is required unless :required is false, 'false', 0 or '0'
# (Array#index returns nil for anything else, so `!nil` => required).
# Missing required values that are not :output_of are collected as errors.
353 parameter = { :value => parameter } unless parameter.is_a? Hash
355 (params["#{componentname}::#{parametername}"] ||
357 (parameter[:output_of].nil? &&
358 (params[parametername.to_s] ||
359 parameter[:default])) ||
362 ![false,'false',0,'0'].index parameter[:required]
363 if parameter[:output_of]
366 errors << [componentname, parametername, "required parameter is missing"]
368 debuglog "parameter #{componentname}::#{parametername} == #{value}"
369 component[:script_parameters][parametername] = value
# Report every collected error at once, then abort.
373 abort "Errors:\n#{errors.collect { |c,p,e| "#{c}::#{p} - #{e}\n" }.join ""}"
375 debuglog "options=" + @options.pretty_inspect
# Presumably setup_instance: create a pipeline instance from the resolved
# components and the template's uuid. This is skipped (||=) when
# fetch_instance already loaded an existing instance.
380 @instance ||= PipelineInstance.
381 create(:components => @components,
382 :pipeline_template_uuid => @template[:uuid],
# Presumably the body of the runner's main loop. For each component:
# - find or submit a job;
# - refresh the job's status;
# - pass the job's output to dependent components.
# A component is ready only when none of its script_parameters is still a
# Hash (i.e. an unresolved placeholder such as :output_of).
391 @components.each do |cname, c|
394 c[:script_parameters].select { |pname, p| p.is_a? Hash }.empty?
395 # Job is fully specified (all parameter values are present) but
396 # no particular job has been found.
398 debuglog "component #{cname} ready to satisfy."
401 second_place_job = nil # satisfies component, but not finished yet
# Candidate jobs share the component's script, script_parameters and
# script_version_descends_from. --no-reuse skips the lookup entirely.
403 (@options[:no_reuse] ? [] : JobCache.
404 where(script: c[:script],
405 script_parameters: c[:script_parameters],
406 script_version_descends_from: c[:script_version_descends_from])
407 ).each do |candidate_job|
# Compare parameters with the keys downcased on both sides.
408 candidate_params_downcase = Hash[candidate_job[:script_parameters].
409 map { |k,v| [k.downcase,v] }]
410 c_params_downcase = Hash[c[:script_parameters].
411 map { |k,v| [k.downcase,v] }]
413 debuglog "component #{cname} considering job #{candidate_job[:uuid]} version #{candidate_job[:script_version]} parameters #{candidate_params_downcase.inspect}", 3
415 unless candidate_params_downcase == c_params_downcase
# Only successful, running, or still-queued (neither started nor cancelled)
# candidates are usable.
419 unless candidate_job[:success] || candidate_job[:running] ||
420 (!candidate_job[:started_at] && !candidate_job[:cancelled_at])
421 debuglog "component #{cname} would be satisfied by job #{candidate_job[:uuid]} if it were running or successful.", 2
# A successful job is used right away unless --no-reuse-finished. The first
# other usable candidate is remembered as a fallback.
425 if candidate_job[:success]
426 unless @options[:no_reuse_finished]
428 debuglog "component #{cname} satisfied by job #{job[:uuid]} version #{job[:script_version]}"
432 second_place_job ||= candidate_job
436 if not c[:job] and second_place_job
437 job = second_place_job
438 debuglog "component #{cname} satisfied by job #{job[:uuid]} version #{job[:script_version]}"
# No reusable job: submit a new one unless --dry-run. resource_limits defaults
# to {} and script_version to 'master'.
442 debuglog "component #{cname} not satisfied by any existing job."
443 if !@options[:dry_run]
444 debuglog "component #{cname} new job."
445 job = JobCache.create(:script => c[:script],
446 :script_parameters => c[:script_parameters],
447 :resource_limits => c[:resource_limits] || {},
448 :script_version => c[:script_version] || 'master')
450 debuglog "component #{cname} new job #{job[:uuid]}"
453 debuglog "component #{cname} new job failed"
# Re-fetch the job record while the job is neither finished nor cancelled.
460 if c[:job] and c[:job][:uuid]
461 if not c[:job][:finished_at] and not c[:job][:cancelled_at]
462 c[:job] = JobCache.get(c[:job][:uuid])
465 # Populate script_parameters of other components waiting for
467 @components.each do |c2name, c2|
468 c2[:script_parameters].each do |pname, p|
469 if p.is_a? Hash and p[:output_of] == cname.to_s
470 debuglog "parameter #{c2name}::#{pname} == #{c[:job][:output]}"
471 c2[:script_parameters][pname] = c[:job][:output]
# A running or still-queued job means another pass is needed, unless --no-wait.
475 elsif c[:job][:running] ||
476 (!c[:job][:started_at] && !c[:job][:cancelled_at])
477 moretodo ||= !@options[:no_wait]
478 elsif c[:job][:cancelled_at]
479 debuglog "component #{cname} job #{c[:job][:uuid]} cancelled."
# Record progress on the instance; it stays active while there is more to do.
483 @instance[:components] = @components
484 @instance[:active] = moretodo
490 debuglog "interrupt", 0
# The instance succeeded only if every component has a successful job.
495 @instance[:success] = @components.reject { |cname,c| c[:job] and c[:job][:success] }.empty?
501 @instance[:active] = false
515 if @options[:status_json] != '/dev/null'
516 File.open(@options[:status_json], 'w') do |f|
517 f.puts @components.pretty_inspect
521 if @options[:status_text] != '/dev/null'
522 File.open(@options[:status_text], 'w') do |f|
523 f.puts "#{Time.now} -- pipeline_instance #{@instance[:uuid]}"
524 namewidth = @components.collect { |cname, c| cname.size }.max
525 @components.each do |cname, c|
526 jstatus = if !c[:job]
528 elsif c[:job][:running]
529 "#{c[:job][:tasks_summary].inspect}"
530 elsif c[:job][:success]
532 elsif c[:job][:cancelled_at]
533 "cancelled #{c[:job][:cancelled_at]}"
534 elsif c[:job][:finished_at]
535 "failed #{c[:job][:finished_at]}"
536 elsif c[:job][:started_at]
537 "started #{c[:job][:started_at]}"
539 "queued #{c[:job][:created_at]}"
541 f.puts "#{cname.to_s.ljust namewidth} #{c[:job] ? c[:job][:uuid] : '-'.ljust(27)} #{jstatus}"
# Program entry:
# 1. Build the runner from the parsed options.
# 2. Load either the template or the existing instance.
# 3. Apply the pipeline parameters left over after option parsing.
# 4. Create the instance.
# With --create-instance-only, the new instance's UUID is printed to stdout.
548 runner = WhRunPipelineInstance.new($options)
550 if $options[:template]
551 runner.fetch_template($options[:template])
553 runner.fetch_instance($options[:instance])
555 runner.apply_parameters(p.leftovers)
556 runner.setup_instance
557 if $options[:create_instance_only]
559 puts runner.instance[:uuid]
# NOTE(review): `rescue Exception` also catches SystemExit, Interrupt and other
# signals. Confirm this is intended (e.g. to mark the instance inactive and
# re-raise) rather than rescuing StandardError.
563 rescue Exception => e