5 # wh-run-pipeline-instance --template pipeline-template-uuid [options] [--] [parameters]
6 # wh-run-pipeline-instance --instance pipeline-instance-uuid [options]
8 # Satisfy a pipeline template by finding or submitting a mapreduce job
9 # for each pipeline component.
13 # [--template uuid] Use the specified pipeline template.
15 # [--instance uuid] Use the specified pipeline instance.
17 # [-n, --dry-run] Do not start any new jobs or wait for existing jobs
18 # to finish. Just find out whether jobs are finished,
19 # queued, or running for each component
21 # [--create-only] Do not try to satisfy any components. Just create an
22 # instance, print its UUID to stdout, and exit.
24 # [--no-wait] Make only as much progress as possible without waiting for jobs to finish.
27 # [--debug] Print extra debugging information on stderr.
29 # [--debug-level N] Increase amount of debugging information. Default
30 # 1, possible range 0..3.
32 # [--status-text path] Print plain text status report to a file or
33 # fifo. Default: /dev/stdout
35 # [--status-json path] Print JSON status report to a file or
36 # fifo. Default: /dev/null
40 # [param_name=param_value]
42 # [param_name param_value] Set (or override) the default value for
43 # every parameter with the given name.
45 # [component_name::param_name=param_value]
46 # [component_name::param_name param_value]
47 # [--component_name::param_name=param_value]
48 # [--component_name::param_name param_value] Set the value of a
49 # parameter for a single component.
52 class WhRunPipelineInstance
55 $application_version = 1.0
57 if RUBY_VERSION < '1.9.3' then
59 #{$0.gsub(/^\.\//,'')} requires Ruby version 1.9.3 or higher.
63 $arvados_api_version = ENV['ARVADOS_API_VERSION'] || 'v1'
64 $arvados_api_host = ENV['ARVADOS_API_HOST'] or
65 abort "#{$0}: fatal: ARVADOS_API_HOST environment variable not set."
66 $arvados_api_token = ENV['ARVADOS_API_TOKEN'] or
67 abort "#{$0}: fatal: ARVADOS_API_TOKEN environment variable not set."
71 require 'google/api_client'
77 #{$0}: fatal: some runtime dependencies are missing.
78 Try: gem install pp google-api-client json trollop
# Write a diagnostic line to stderr, prefixed with the script name and PID.
#
# message   - text to print.
# verbosity - minimum $debuglevel at which the message is shown (default 1).
#
# Nothing is printed when $debuglevel is below verbosity. An unset (nil)
# $debuglevel counts as 0, so a call made before the level has been assigned
# stays quiet instead of raising NoMethodError on nil.
def debuglog(message, verbosity=1)
  $stderr.puts "#{File.split($0).last} #{$$}: #{message}" if $debuglevel.to_i >= verbosity
end
88 original_verbosity = $VERBOSE
91 $VERBOSE = original_verbosity
# Turn off SSL certificate verification when the API host looks like a
# development server, by rebinding OpenSSL::SSL::VERIFY_PEER to VERIFY_NONE
# (wrapped in suppress_warnings to silence the constant-reassignment warning).
# NOTE(review): /local/ is unanchored, so any host name that merely contains
# "local" (not only localhost-style dev hosts) disables verification --
# confirm this is intended before relying on it outside testing.
96 if $arvados_api_host.match /local/
97 # You probably don't care about SSL certificate checks if you're
98 # testing with a dev server.
99 suppress_warnings { OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE }
# Reopen Google::APIClient to override how discovery documents are fetched.
102 class Google::APIClient
# Return the discovery document for +api+ at +version+.
# The result is memoized in @discovery_documents under the key "api:version",
# so the request below runs only when no entry is cached yet.
103 def discovery_document(api, version)
105 return @discovery_documents["#{api}:#{version}"] ||=
# Fetch the document with an unauthenticated GET on discovery_uri.
107 response = self.execute!(
108 :http_method => :get,
109 :uri => self.discovery_uri(api, version),
110 :authenticated => false
# A String body is parsed as JSON; any other body object is used as-is.
112 response.body.class == String ? JSON.parse(response.body) : response.body
118 # Parse command line options (the kind that control the behavior of
119 # this program, that is, not the pipeline component parameters).
121 p = Trollop::Parser.new do
123 "Do not start any new jobs or wait for existing jobs to finish. Just find out whether jobs are finished, queued, or running for each component.",
127 "Store plain text status in given file.",
130 :default => '/dev/stdout')
132 "Store json-formatted pipeline in given file.",
135 :default => '/dev/null')
137 "Do not wait for jobs to finish. Just look up status, submit new jobs if needed, and exit.",
141 "Print extra debugging information on stderr.",
144 "Set debug verbosity level.",
148 "UUID of pipeline template.",
152 "UUID of pipeline instance.",
156 "Do not try to satisfy any components. Just create a pipeline instance and output its UUID.",
161 $options = Trollop::with_standard_exception_handling p do
164 $debuglevel = $options[:debug_level] || ($options[:debug] && 1) || 0
166 if $options[:instance]
167 if $options[:template] or $options[:create_only]
168 abort "#{$0}: syntax error: --instance cannot be combined with --template or --create-only."
170 elsif not $options[:template]
171 abort "#{$0}: syntax error: you must supply a --template or --instance."
174 # Set up the API client.
176 $client ||= Google::APIClient.
177 new(:host => $arvados_api_host,
178 :application_name => File.split($0).last,
179 :application_version => $application_version.to_s)
180 $arvados = $client.discovered_api('arvados', $arvados_api_version)
183 class PipelineInstance
185 result = $client.execute(:api_method => $arvados.pipeline_instances.get,
187 :api_token => ENV['ARVADOS_API_TOKEN'],
190 :authenticated => false)
191 j = JSON.parse result.body, :symbolize_names => true
192 unless j.is_a? Hash and j[:uuid]
193 debuglog "Failed to get pipeline_instance: #{j[:errors] rescue nil}", 0
196 debuglog "Retrieved pipeline_instance #{j[:uuid]}"
200 def self.create(attributes)
201 result = $client.execute(:api_method => $arvados.pipeline_instances.create,
203 :api_token => ENV['ARVADOS_API_TOKEN'],
204 :pipeline_instance => attributes.to_json
206 :authenticated => false)
207 j = JSON.parse result.body, :symbolize_names => true
208 unless j.is_a? Hash and j[:uuid]
209 abort "Failed to create pipeline_instance: #{j[:errors] rescue nil} #{j.inspect}"
211 debuglog "Created pipeline instance: #{j[:uuid]}"
215 result = $client.execute(:api_method => $arvados.pipeline_instances.update,
217 :api_token => ENV['ARVADOS_API_TOKEN'],
219 :pipeline_instance => @attributes_to_update.to_json
221 :authenticated => false)
222 j = JSON.parse result.body, :symbolize_names => true
223 unless j.is_a? Hash and j[:uuid]
224 debuglog "Failed to save pipeline_instance: #{j[:errors] rescue nil}", 0
227 @attributes_to_update = {}
232 @attributes_to_update[x] = y
240 @attributes_to_update = {}
248 result = $client.execute(:api_method => $arvados.jobs.get,
250 :api_token => ENV['ARVADOS_API_TOKEN'],
253 :authenticated => false)
254 @cache[uuid] = JSON.parse result.body, :symbolize_names => true
256 def self.where(conditions)
257 result = $client.execute(:api_method => $arvados.jobs.list,
259 :api_token => ENV['ARVADOS_API_TOKEN'],
261 :where => conditions.to_json
263 :authenticated => false)
264 list = JSON.parse result.body, :symbolize_names => true
265 if list and list[:items].is_a? Array
271 def self.create(attributes)
273 result = $client.execute(:api_method => $arvados.jobs.create,
275 :api_token => ENV['ARVADOS_API_TOKEN'],
276 :job => attributes.to_json
278 :authenticated => false)
279 j = JSON.parse result.body, :symbolize_names => true
280 if j.is_a? Hash and j[:uuid]
283 debuglog "create job: #{j[:errors] rescue nil}"
289 class WhRunPipelineInstance
290 attr_reader :instance
292 def initialize(_options)
# Retrieve the pipeline template identified by +template_uuid+ through the
# pipeline_templates.get API method and store the parsed response (keys
# symbolized) in @template.
296 def fetch_template(template_uuid)
297 result = $client.execute(:api_method => $arvados.pipeline_templates.get,
299 :api_token => ENV['ARVADOS_API_TOKEN'],
300 :uuid => template_uuid
302 :authenticated => false)
303 @template = JSON.parse result.body, :symbolize_names => true
# Failure path: terminate with the template UUID and whatever :errors the
# response carried (the inline rescue yields nil if @template cannot be
# indexed). The condition guarding this abort is not shown in this listing.
305 abort "#{$0}: fatal: failed to retrieve pipeline template #{template_uuid} #{@template[:errors].inspect rescue nil}"
# Load an existing pipeline instance by UUID via PipelineInstance.find and
# keep it in @instance.
310 def fetch_instance(instance_uuid)
311 @instance = PipelineInstance.find(instance_uuid)
# The instance doubles as the template: later reads of @template[:components]
# and @template[:uuid] therefore operate on the instance itself.
312 @template = @instance
# Merge command-line parameter settings into the template's components.
#
# params_args - array of leftover command-line words; it is consumed
#               (shifted) while parsing. A leading "--" is discarded.
#               Accepted forms are "name=value", "--name=value",
#               "name value" and "--name value"; a name may be qualified
#               as "component::name".
#
# For every script parameter the value is chosen in this order:
#   1. a component-qualified command-line setting,
#   2. the value already present in the template,
#   3. only when the parameter is not the output of another component:
#      an unqualified command-line setting, then the template default.
#
# A parameter that ends up without a value and is not marked optional
# (:required is one of false, 'false', 0, '0') is reported as an error,
# unless it has :output_of, in which case it is left untouched so it can be
# filled in once the producing component has output.
#
# Side effects: sets @components (a shallow copy of @template[:components])
# and rewrites each component's :script_parameters in place.
# Aborts the process on an unparseable argument or any missing required
# parameter. Returns self.
def apply_parameters(params_args)
  params_args.shift if params_args[0] == '--'

  params = {}
  until params_args.empty?
    # Capture everything after the first "=" as the value. The previous
    # pattern ended in "(.)" and therefore kept only the first character.
    if (re = params_args[0].match(/\A(--)?([^-].*?)=(.+)/m))
      params[re[2]] = re[3]
      params_args.shift
    elsif params_args.size > 1
      param = params_args.shift.sub(/^--/, '')
      params[param] = params_args.shift
    else
      abort "Syntax error: I do not know what to do with arg \"#{params_args[0]}\""
    end
  end

  @components = @template[:components].dup

  errors = []
  @components.each do |componentname, component|
    component[:script_parameters].each do |parametername, parameter|
      # Bare values are treated as { :value => ... } so both shapes share
      # the lookup below.
      parameter = { :value => parameter } unless parameter.is_a? Hash
      value =
        (params["#{componentname}::#{parametername}"] ||
         parameter[:value] ||
         (parameter[:output_of].nil? &&
          (params[parametername.to_s] ||
           parameter[:default])) ||
         nil)
      if value.nil? &&
         ![false,'false',0,'0'].index(parameter[:required])
        # Will be supplied by another component's output; keep the spec.
        next if parameter[:output_of]
        errors << [componentname, parametername, "required parameter is missing"]
      end
      debuglog "parameter #{componentname}::#{parametername} == #{value}"
      component[:script_parameters][parametername] = value
    end
  end
  unless errors.empty?
    abort "Errors:\n#{errors.collect { |c,p,e| "#{c}::#{p} - #{e}\n" }.join ""}"
  end
  debuglog "options=" + @options.pretty_inspect
  self
end
363 @instance ||= PipelineInstance.
364 create(:components => @components,
365 :pipeline_template_uuid => @template[:uuid],
374 @components.each do |cname, c|
377 c[:script_parameters].select { |pname, p| p.is_a? Hash }.empty?
378 # Job is fully specified (all parameter values are present) but
379 # no particular job has been found.
381 debuglog "component #{cname} ready to satisfy."
384 second_place_job = nil # satisfies component, but not finished yet
385 JobCache.where(:script => c[:script],
386 :script_parameters => c[:script_parameters],
387 :script_version_descends_from => c[:script_version_descends_from]).
388 each do |candidate_job|
389 candidate_params_downcase = Hash[candidate_job[:script_parameters].
390 map { |k,v| [k.downcase,v] }]
391 c_params_downcase = Hash[c[:script_parameters].
392 map { |k,v| [k.downcase,v] }]
394 debuglog "component #{cname} considering job #{candidate_job[:uuid]} version #{candidate_job[:script_version]} parameters #{candidate_params_downcase.inspect}", 3
396 unless candidate_params_downcase == c_params_downcase
400 unless candidate_job[:success] || candidate_job[:running] ||
401 (!candidate_job[:started_at] && !candidate_job[:cancelled_at])
402 debuglog "component #{cname} would be satisfied by job #{candidate_job[:uuid]} if it were running or successful.", 2
406 if candidate_job[:success]
408 debuglog "component #{cname} satisfied by job #{job[:uuid]} version #{job[:script_version]}"
411 second_place_job ||= candidate_job
415 if not c[:job] and second_place_job
416 job = second_place_job
417 debuglog "component #{cname} satisfied by job #{job[:uuid]} version #{job[:script_version]}"
421 debuglog "component #{cname} not satisfied by any existing job."
422 if !@options[:dry_run]
423 debuglog "component #{cname} new job."
424 job = JobCache.create(:script => c[:script],
425 :script_parameters => c[:script_parameters],
426 :resource_limits => c[:resource_limits] || {},
427 :script_version => c[:script_version] || 'master')
429 debuglog "component #{cname} new job #{job[:uuid]}"
432 debuglog "component #{cname} new job failed: #{job[:errors]}"
439 if c[:job] and c[:job][:uuid]
440 if not c[:job][:finished_at] and not c[:job][:cancelled_at]
441 c[:job] = JobCache.get(c[:job][:uuid])
444 # Populate script_parameters of other components waiting for
446 @components.each do |c2name, c2|
447 c2[:script_parameters].each do |pname, p|
448 if p.is_a? Hash and p[:output_of] == cname.to_s
449 debuglog "parameter #{c2name}::#{pname} == #{c[:job][:output]}"
450 c2[:script_parameters][pname] = c[:job][:output]
454 elsif c[:job][:running] ||
455 (!c[:job][:started_at] && !c[:job][:cancelled_at])
456 moretodo ||= !@options[:no_wait]
457 elsif c[:job][:cancelled_at]
458 debuglog "component #{cname} job #{c[:job][:uuid]} cancelled."
462 @instance[:components] = @components
463 @instance[:active] = moretodo
467 @instance[:success] = @components.reject { |cname,c| c[:job] and c[:job][:success] }.empty?
473 @instance[:active] = false
# Write the component status to the --status-json path, unless that path is
# /dev/null.
# NOTE(review): the payload is @components.pretty_inspect, i.e. Ruby inspect
# syntax rather than JSON, although the option is documented as a JSON
# status report -- confirm whether a JSON serialization was intended.
487 if @options[:status_json] != '/dev/null'
488 File.open(@options[:status_json], 'w') do |f|
489 f.puts @components.pretty_inspect
493 if @options[:status_text] != '/dev/null'
494 File.open(@options[:status_text], 'w') do |f|
495 f.puts "#{Time.now} -- pipeline_instance #{@instance[:uuid]}"
496 namewidth = @components.collect { |cname, c| cname.size }.max
497 @components.each do |cname, c|
498 jstatus = if !c[:job]
500 elsif c[:job][:running]
501 "#{c[:job][:tasks_summary].inspect}"
502 elsif c[:job][:success]
504 elsif c[:job][:cancelled_at]
505 "cancelled #{c[:job][:cancelled_at]}"
506 elsif c[:job][:finished_at]
507 "failed #{c[:job][:finished_at]}"
508 elsif c[:job][:started_at]
509 "started #{c[:job][:started_at]}"
511 "queued #{c[:job][:created_at]}"
513 f.puts "#{cname.to_s.ljust namewidth} #{c[:job] ? c[:job][:uuid] : '-'.ljust(27)} #{jstatus}"
520 runner = WhRunPipelineInstance.new($options)
522 if $options[:template]
523 runner.fetch_template($options[:template])
525 runner.fetch_instance($options[:instance])
527 runner.apply_parameters(p.leftovers)
528 runner.setup_instance
529 if $options[:create_only]
531 puts runner.instance[:uuid]
535 rescue Exception => e