5 # wh-run-pipeline-instance --template pipeline-template-uuid [options] [--] [parameters]
7 # Satisfy a pipeline template by finding or submitting a mapreduce job
8 # for each pipeline component.
12 # [--template uuid] Use the specified pipeline template.
14 # [-n, --dry-run] Do not start any new jobs or wait for existing jobs
15 # to finish. Just find out whether jobs are finished,
16 # queued, or running for each component
18 # [--no-wait] Make only as much progress as possible without entering
21 # [--debug] Print extra debugging information on stderr.
23 # [--debug-level N] Increase amount of debugging information. Default
24 # 1, possible range 0..3.
26 # [--status-text path] Print plain text status report to a file or
27 # fifo. Default: /dev/stdout
29 # [--status-json path] Print JSON status report to a file or
30 # fifo. Default: /dev/null
34 # [param_name=param_value]
36 # [param_name param_value] Set (or override) the default value for
37 # every parameter with the given name.
39 # [component_name::param_name=param_value]
40 # [component_name::param_name param_value]
41 # [--component_name::param_name=param_value]
42 # [--component_name::param_name param_value] Set the value of a
43 # parameter for a single
# Forward declaration: the class is reopened further down in this file,
# where its methods are defined.
46 class WhRunPipelineInstance
# Passed to the API client as :application_version (converted with to_s).
49 $application_version = 1.0
# Refuse to run on Ruby older than 1.9.3.
# NOTE(review): RUBY_VERSION is compared as a String, i.e. lexically, so a
# version such as '1.10.0' would sort before '1.9.3'.
51 if RUBY_VERSION < '1.9.3' then
53 #{$0.gsub(/^\.\//,'')} requires Ruby version 1.9.3 or higher.
# API connection settings are read from the environment. The API version
# falls back to 'v1'; the host and the token are mandatory, and the script
# aborts with a fatal message when either one is unset.
57 $orvos_api_version = ENV['ORVOS_API_VERSION'] || 'v1'
58 $orvos_api_host = ENV['ORVOS_API_HOST'] or
59 abort "#{$0}: fatal: ORVOS_API_HOST environment variable not set."
60 $orvos_api_token = ENV['ORVOS_API_TOKEN'] or
61 abort "#{$0}: fatal: ORVOS_API_TOKEN environment variable not set."
65 require 'google/api_client'
71 #{$0}: fatal: some runtime dependencies are missing.
72 Try: gem install pp google-api-client json trollop
# Print a debugging message on stderr, prefixed with the script's base name
# and the process id, when the global $debuglevel is at least +verbosity+.
#
# message   - text to print.
# verbosity - minimum $debuglevel at which the message is shown (default 1).
#             Callers pass 0 for messages that should always appear.
76 def debuglog(message, verbosity=1)
77 $stderr.puts "#{File.split($0).last} #{$$}: #{message}" if $debuglevel >= verbosity
82 original_verbosity = $VERBOSE
85 $VERBOSE = original_verbosity
# When the API host name contains "local", switch off peer certificate
# verification by redefining OpenSSL::SSL::VERIFY_PEER as VERIFY_NONE.
# NOTE(review): the constant is reassigned for the whole process, so every
# SSL connection made afterwards is affected; the /local/ match is also
# unanchored and matches anywhere in the host name.
90 if $orvos_api_host.match /local/
91 # You probably don't care about SSL certificate checks if you're
92 # testing with a dev server.
93 suppress_warnings { OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE }
# Reopen the Google API client class to supply a discovery_document method.
96 class Google::APIClient
# Return the discovery document for +api+ at +version+. Results are cached
# in @discovery_documents under the key "api:version" (||=). The document
# is requested with an unauthenticated GET of discovery_uri(api, version);
# a String response body is parsed as JSON, any other body object is used
# as it is.
97 def discovery_document(api, version)
99 return @discovery_documents["#{api}:#{version}"] ||=
101 response = self.execute!(
102 :http_method => :get,
103 :uri => self.discovery_uri(api, version),
104 :authenticated => false
106 response.body.class == String ? JSON.parse(response.body) : response.body
112 # Parse command line options (the kind that control the behavior of
113 # this program, that is, not the pipeline component parameters).
# The parser object is kept in +p+; the script's entry point later reads
# p.leftovers to obtain the arguments that were not recognised as options.
115 p = Trollop::Parser.new do
117 "Do not start any new jobs or wait for existing jobs to finish. Just find out whether jobs are finished, queued, or running for each component.",
121 "Store plain text status in given file.",
124 :default => '/dev/stdout')
126 "Store json-formatted pipeline in given file.",
129 :default => '/dev/null')
131 "Do not wait for jobs to finish. Just look up status, submit new jobs if needed, and exit.",
135 "Print extra debugging information on stderr.",
138 "Set debug verbosity level.",
142 "UUID of pipeline template.",
# Run the parser under Trollop's standard exception handling and keep the
# resulting option hash in the global $options.
148 $options = Trollop::with_standard_exception_handling p do
# Effective verbosity: an explicit debug level wins; otherwise --debug alone
# means level 1; otherwise 0.
151 $debuglevel = $options[:debug_level] || ($options[:debug] && 1) || 0
154 # Set up the API client.
# The client is created only once (||=) for $orvos_api_host and identifies
# itself with the script's base name and $application_version. The 'orvos'
# API description for $orvos_api_version is then looked up through it and
# kept in the global $orvos.
156 $client ||= Google::APIClient.
157 new(:host => $orvos_api_host,
158 :application_name => File.split($0).last,
159 :application_version => $application_version.to_s)
160 $orvos = $client.discovered_api('orvos', $orvos_api_version)
# Client-side handle for a pipeline_instance record on the API server.
# Attribute changes are collected in @attributes_to_update and sent with
# the pipeline_instances.update call.
# NOTE(review): every request passes the token as an :api_token parameter
# together with :authenticated => false.
163 class PipelineInstance
# Create a new pipeline instance on the server from +attributes+ (sent
# JSON-encoded as :pipeline_instance). Aborts the program when the parsed
# response is not a Hash carrying a :uuid.
164 def initialize(attributes)
165 @attributes_to_update = {}
166 result = $client.execute(:api_method => $orvos.pipeline_instances.create,
168 :api_token => ENV['ORVOS_API_TOKEN'],
169 :pipeline_instance => attributes.to_json
171 :authenticated => false)
172 j = JSON.parse result.body, :symbolize_names => true
173 unless j.is_a? Hash and j[:uuid]
174 abort "Failed to create pipeline_instance: #{j[:errors] rescue nil}"
176 debuglog "Created pipeline instance: #{j[:uuid]}"
# Send the pending attribute changes with pipeline_instances.update. A
# response without a :uuid is reported through debuglog at verbosity 0.
180 result = $client.execute(:api_method => $orvos.pipeline_instances.update,
182 :api_token => ENV['ORVOS_API_TOKEN'],
184 :pipeline_instance => @attributes_to_update.to_json
186 :authenticated => false)
187 j = JSON.parse result.body, :symbolize_names => true
188 unless j.is_a? Hash and j[:uuid]
189 debuglog "Failed to save pipeline_instance: #{j[:errors] rescue nil}", 0
192 @attributes_to_update = {}
# Record the new value +y+ of attribute +x+ so that it is part of the next
# update request.
197 @attributes_to_update[x] = y
# Fetch one job record with jobs.get and store the parsed response in
# @cache under its uuid.
208 result = $client.execute(:api_method => $orvos.jobs.get,
210 :api_token => ENV['ORVOS_API_TOKEN'],
213 :authenticated => false)
214 @cache[uuid] = JSON.parse result.body, :symbolize_names => true
# List jobs matching +conditions+, which are sent JSON-encoded as the
# :where parameter of jobs.list. The parsed response is only treated as a
# result set when its :items entry is an Array.
216 def self.where(conditions)
217 result = $client.execute(:api_method => $orvos.jobs.list,
219 :api_token => ENV['ORVOS_API_TOKEN'],
221 :where => conditions.to_json
223 :authenticated => false)
224 list = JSON.parse result.body, :symbolize_names => true
225 if list and list[:items].is_a? Array
# Submit a new job described by +attributes+ (sent JSON-encoded as :job).
# A parsed response that is a Hash with a :uuid is distinguished from any
# other response; the response's :errors are written to the debug log
# (presumably on the failure path - confirm).
231 def self.create(attributes)
233 result = $client.execute(:api_method => $orvos.jobs.create,
235 :api_token => ENV['ORVOS_API_TOKEN'],
236 :job => attributes.to_json
238 :authenticated => false)
239 j = JSON.parse result.body, :symbolize_names => true
240 if j.is_a? Hash and j[:uuid]
243 debuglog "create job: #{j[:errors] rescue nil}"
# Driver for a single invocation: fetches a pipeline template, applies the
# command-line parameters to it, and works with the jobs of its components.
249 class WhRunPipelineInstance
# _options - the parsed command-line options.
# NOTE(review): other methods of this class read @options, so this is
# presumably where it is stored - confirm.
250 def initialize(_options)
# Retrieve the pipeline template +template_uuid+ with
# pipeline_templates.get and keep the parsed response (keys symbolized) in
# @template. Aborts with a fatal message, including the response's :errors
# when available, if the template could not be retrieved. The script's
# entry point chains apply_parameters onto the return value.
254 def fetch_template(template_uuid)
255 result = $client.execute(:api_method => $orvos.pipeline_templates.get,
257 :api_token => ENV['ORVOS_API_TOKEN'],
258 :uuid => template_uuid
260 :authenticated => false)
261 @template = JSON.parse result.body, :symbolize_names => true
263 abort "#{$0}: fatal: failed to retrieve pipeline template #{template_uuid} #{@template[:errors].inspect rescue nil}"
# Merge command-line pipeline parameters into the template's components.
#
# params_args - Array of leftover command-line words; it is consumed in
#   place. An optional leading "--" is dropped. Each parameter is given
#   either as one word ("name=value" or "--name=value") or as two words
#   ("name value" or "--name value"). A name may be qualified as
#   "component::name" to address a single component.
#
# For every script parameter of every component the value is chosen in this
# order: a "component::name" argument, the :value stored in the template,
# and then - only for parameters that are not the output of another
# component - a bare "name" argument or the template's :default.
#
# A parameter that is still unset counts as an error unless its :required
# entry is one of false, 'false', 0 or '0'. The exception is a parameter
# declaring :output_of: it keeps its Hash description so that it can be
# filled in once the producing job has an output.
#
# Aborts the program on a dangling argument or when required parameters are
# missing. Returns self so that calls can be chained.
def apply_parameters(params_args)
  params_args.shift if params_args[0] == '--'

  params = {}
  until params_args.empty?
    # Capture everything after the first "=" as the value. A pattern ending
    # in "=(.)" would keep only the first character of the value, storing
    # "input=abc" as "a". The /m flag keeps values that contain newlines
    # intact.
    if (re = params_args[0].match(/\A(--)?([^-].*?)=(.+)\z/m))
      params[re[2]] = re[3]
      params_args.shift
    elsif params_args.size > 1
      param = params_args.shift.sub(/^--/, '')
      params[param] = params_args.shift
    else
      abort "Syntax error: I do not know what to do with arg \"#{params_args[0]}\""
    end
  end

  @components = @template[:components].dup

  errors = []
  @components.each do |componentname, component|
    component[:script_parameters].each do |parametername, parameter|
      # A bare value in the template is shorthand for { :value => ... }.
      parameter = { :value => parameter } unless parameter.is_a? Hash
      value =
        (params["#{componentname}::#{parametername}"] ||
         parameter[:value] ||
         (parameter[:output_of].nil? &&
          (params[parametername.to_s] ||
           parameter[:default])) ||
         nil)
      if value.nil? &&
          ![false, 'false', 0, '0'].include?(parameter[:required])
        # Leave the Hash in place; the value arrives with the output of
        # the component named by :output_of.
        next if parameter[:output_of]
        errors << [componentname, parametername, "required parameter is missing"]
      end
      debuglog "parameter #{componentname}::#{parametername} == #{value}"
      component[:script_parameters][parametername] = value
    end
  end
  unless errors.empty?
    abort "Errors:\n#{errors.collect { |c,p,e| "#{c}::#{p} - #{e}\n" }.join ""}"
  end
  debuglog "options=" + @options.pretty_inspect
  self
end
# Create the server-side pipeline instance on first use (||=) from the
# current components and the template's uuid.
319 @instance ||= PipelineInstance.new(:components => @components,
320 :pipeline_template_uuid => @template[:uuid],
322 @components.each do |cname, c|
325 c[:script_parameters].select { |pname, p| p.is_a? Hash }.empty?
326 # Job is fully specified (all parameter values are present) but
327 # no particular job has been found.
329 debuglog "component #{cname} ready to satisfy."
# Ask the API for existing jobs with the same script and script parameters,
# passing the component's :script_version_descends_from along.
332 JobCache.where(:script => c[:script],
333 :script_parameters => c[:script_parameters],
334 :script_version_descends_from => c[:script_version_descends_from]).
335 each do |candidate_job|
# Parameters are compared with their keys lower-cased on both sides.
336 candidate_params_downcase = Hash[candidate_job[:script_parameters].
337 map { |k,v| [k.downcase,v] }]
338 c_params_downcase = Hash[c[:script_parameters].
339 map { |k,v| [k.downcase,v] }]
341 debuglog "component #{cname} considering job #{candidate_job[:uuid]} version #{candidate_job[:script_version]} parameters #{candidate_params_downcase.inspect}", 3
343 unless candidate_params_downcase == c_params_downcase
# A candidate is acceptable only if it succeeded, is running, or has
# neither been started nor cancelled.
347 unless candidate_job[:success] || candidate_job[:running] ||
348 (!candidate_job[:started_at] && !candidate_job[:cancelled_at])
349 debuglog "component #{cname} would be satisfied by job #{candidate_job[:uuid]} if it were running or successful.", 2
354 debuglog "component #{cname} satisfied by job #{job[:uuid]} version #{job[:script_version]}"
359 debuglog "component #{cname} not satisfied by any existing job."
# Unless --dry-run was given, submit a new job; :resource_limits defaults
# to {} and :script_version to 'master'.
360 if !@options[:dry_run]
361 debuglog "component #{cname} new job."
362 job = JobCache.create(:script => c[:script],
363 :script_parameters => c[:script_parameters],
364 :resource_limits => c[:resource_limits] || {},
365 :script_version => c[:script_version] || 'master')
367 debuglog "component #{cname} new job #{job[:uuid]}"
370 debuglog "component #{cname} new job failed: #{job[:errors]}"
# For a component that already has a job: reload the job record while it
# is neither finished nor cancelled.
377 if c[:job] and c[:job][:uuid]
378 if not c[:job][:finished_at] and not c[:job][:cancelled_at]
379 c[:job] = JobCache.get(c[:job][:uuid])
382 # Populate script_parameters of other components waiting for
384 @components.each do |c2name, c2|
385 c2[:script_parameters].each do |pname, p|
386 if p.is_a? Hash and p[:output_of] == cname.to_s
387 debuglog "parameter #{c2name}::#{pname} == #{c[:job][:output]}"
388 c2[:script_parameters][pname] = c[:job][:output]
# A job that is running, or neither started nor cancelled, means there is
# more to do - unless --no-wait was given.
392 elsif c[:job][:running] ||
393 (!c[:job][:started_at] && !c[:job][:cancelled_at])
394 moretodo ||= !@options[:no_wait]
395 elsif c[:job][:cancelled_at]
396 debuglog "component #{cname} job #{c[:job][:uuid]} cancelled."
# Record the current component state, and whether more work remains, on
# the pipeline instance.
400 @instance[:components] = @components
401 @instance[:active] = moretodo
408 @instance[:active] = false
# Write the component state to the --status-json path unless that path is
# '/dev/null'.
# NOTE(review): the content is produced with pretty_inspect, i.e. Ruby
# inspect notation rather than JSON, although the option is described as
# storing a json-formatted pipeline.
417 if @options[:status_json] != '/dev/null'
418 File.open(@options[:status_json], 'w') do |f|
419 f.puts @components.pretty_inspect
# Write a plain text report to the --status-text path unless that path is
# '/dev/null': a timestamped header naming the pipeline instance, then one
# line per component with its name, job uuid (or '-') and a status string.
423 if @options[:status_text] != '/dev/null'
424 File.open(@options[:status_text], 'w') do |f|
425 f.puts "#{Time.now} -- pipeline_instance #{@instance[:uuid]}"
# Longest component name; used to pad the name column.
426 namewidth = @components.collect { |cname, c| cname.size }.max
427 @components.each do |cname, c|
428 jstatus = if !c[:job]
430 elsif c[:job][:running]
431 "#{c[:job][:tasks_summary].inspect}"
432 elsif c[:job][:success]
434 elsif c[:job][:cancelled_at]
435 "cancelled #{c[:job][:cancelled_at]}"
436 elsif c[:job][:started_at]
437 "started #{c[:job][:started_at]}"
439 "queued #{c[:job][:created_at]}"
# The '-' placeholder is padded to 27 characters (presumably the width of
# a job uuid - confirm).
441 f.puts "#{cname.to_s.ljust namewidth} #{c[:job] ? c[:job][:uuid] : '-'.ljust(27)} #{jstatus}"
# Entry point: build the runner from the parsed options, fetch the template
# named by the :template option, and apply the arguments the option parser
# left over (p.leftovers) as pipeline parameters.
# NOTE(review): the rescue clause below catches Exception, which also
# covers SystemExit (raised by abort) and Interrupt - confirm that this
# breadth is intended.
448 instance = WhRunPipelineInstance.new($options)
450 instance.fetch_template($options[:template]).
451 apply_parameters(p.leftovers).
453 rescue Exception => e