pass component[:script_parameters] to server when searching for jobs. refs #1447
[arvados.git] / cli / wh-run-pipeline-instance
1 #!/usr/bin/env ruby
2
3 # == Synopsis
4 #
5 #  wh-run-pipeline-instance --template pipeline-template-uuid [options] [--] [parameters]
6 #
7 # Satisfy a pipeline template by finding or submitting a mapreduce job
8 # for each pipeline component.
9 #
10 # == Options
11 #
12 # [--template uuid] Use the specified pipeline template.
13 #
14 # [-n, --dry-run] Do not start any new jobs or wait for existing jobs
15 #                 to finish. Just find out whether jobs are finished,
16 #                 queued, or running for each component
17 #
18 # [--no-wait] Make only as much progress as possible without entering
19 #             a sleep/poll loop.
20 #
21 # [--debug] Print extra debugging information on stderr.
22 #
23 # [--debug-level N] Increase amount of debugging information. Default
24 #                   1, possible range 0..3.
25 #
26 # [--status-text path] Print plain text status report to a file or
27 #                      fifo. Default: /dev/stdout
28 #
29 # [--status-json path] Print JSON status report to a file or
30 #                      fifo. Default: /dev/null
31 #
32 # == Parameters
33 #
34 # [param_name=param_value]
35 #
36 # [param_name param_value] Set (or override) the default value for
37 #                          every parameter with the given name.
38 #
39 # [component_name::param_name=param_value]
40 # [component_name::param_name param_value]
41 # [--component_name::param_name=param_value]
42 # [--component_name::param_name param_value] Set the value of a
43 #                                            parameter for a single
44 #                                            component.
45 #
46 class WhRunPipelineInstance
47 end
48
49 $application_version = 1.0
50
51 if RUBY_VERSION < '1.9.3' then
52   abort <<-EOS
53 #{$0.gsub(/^\.\//,'')} requires Ruby version 1.9.3 or higher.
54   EOS
55 end
56
57 $orvos_api_version = ENV['ORVOS_API_VERSION'] || 'v1'
58 $orvos_api_host = ENV['ORVOS_API_HOST'] or
59   abort "#{$0}: fatal: ORVOS_API_HOST environment variable not set."
60 $orvos_api_token = ENV['ORVOS_API_TOKEN'] or
61   abort "#{$0}: fatal: ORVOS_API_TOKEN environment variable not set."
62
63 begin
64   require 'rubygems'
65   require 'google/api_client'
66   require 'json'
67   require 'pp'
68   require 'trollop'
69 rescue LoadError
70   abort <<-EOS
71 #{$0}: fatal: some runtime dependencies are missing.
72 Try: gem install pp google-api-client json trollop
73   EOS
74 end
75
76 def debuglog(message, verbosity=1)
77   $stderr.puts "#{File.split($0).last} #{$$}: #{message}" if $debuglevel >= verbosity
78 end
79
80 module Kernel
81   def suppress_warnings
82     original_verbosity = $VERBOSE
83     $VERBOSE = nil
84     result = yield
85     $VERBOSE = original_verbosity
86     return result
87   end
88 end
89
90 if $orvos_api_host.match /local/
91   # You probably don't care about SSL certificate checks if you're
92   # testing with a dev server.
93   suppress_warnings { OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE }
94 end
95
96 class Google::APIClient
97   def discovery_document(api, version)
98     api = api.to_s
99     return @discovery_documents["#{api}:#{version}"] ||=
100       begin
101         response = self.execute!(
102                                  :http_method => :get,
103                                  :uri => self.discovery_uri(api, version),
104                                  :authenticated => false
105                                  )
106         response.body.class == String ? JSON.parse(response.body) : response.body
107       end
108   end
109 end
110
111
112 # Parse command line options (the kind that control the behavior of
113 # this program, that is, not the pipeline component parameters).
114
115 p = Trollop::Parser.new do
116   opt(:dry_run,
117       "Do not start any new jobs or wait for existing jobs to finish. Just find out whether jobs are finished, queued, or running for each component.",
118       :type => :boolean,
119       :short => :n)
120   opt(:status_text,
121       "Store plain text status in given file.",
122       :short => :none,
123       :type => :string,
124       :default => '/dev/stdout')
125   opt(:status_json,
126       "Store json-formatted pipeline in given file.",
127       :short => :none,
128       :type => :string,
129       :default => '/dev/null')
130   opt(:no_wait,
131       "Do not wait for jobs to finish. Just look up status, submit new jobs if needed, and exit.",
132       :short => :none,
133       :type => :boolean)
134   opt(:debug,
135       "Print extra debugging information on stderr.",
136       :type => :boolean)
137   opt(:debug_level,
138       "Set debug verbosity level.",
139       :short => :none,
140       :type => :integer)
141   opt(:template,
142       "UUID of pipeline template.",
143       :short => :none,
144       :type => :string,
145       :required => true)
146   stop_on [:'--']
147 end
148 $options = Trollop::with_standard_exception_handling p do
149   p.parse ARGV
150 end
151 $debuglevel = $options[:debug_level] || ($options[:debug] && 1) || 0
152
153
154 # Set up the API client.
155
156 $client ||= Google::APIClient.
157   new(:host => $orvos_api_host,
158       :application_name => File.split($0).last,
159       :application_version => $application_version.to_s)
160 $orvos = $client.discovered_api('orvos', $orvos_api_version)
161
162
163 class PipelineInstance
164   def initialize(attributes)
165     @attributes_to_update = {}
166     result = $client.execute(:api_method => $orvos.pipeline_instances.create,
167                              :parameters => {
168                                :api_token => ENV['ORVOS_API_TOKEN'],
169                                :pipeline_instance => attributes.to_json
170                              },
171                              :authenticated => false)
172     j = JSON.parse result.body, :symbolize_names => true
173     unless j.is_a? Hash and j[:uuid]
174       abort "Failed to create pipeline_instance: #{j[:errors] rescue nil}"
175     end
176     debuglog "Created pipeline instance: #{j[:uuid]}"
177     @pi = j
178   end
179   def save
180     result = $client.execute(:api_method => $orvos.pipeline_instances.update,
181                              :parameters => {
182                                :api_token => ENV['ORVOS_API_TOKEN'],
183                                :uuid => @pi[:uuid],
184                                :pipeline_instance => @attributes_to_update.to_json
185                              },
186                              :authenticated => false)
187     j = JSON.parse result.body, :symbolize_names => true
188     unless j.is_a? Hash and j[:uuid]
189       debuglog "Failed to save pipeline_instance: #{j[:errors] rescue nil}", 0
190       nil
191     else
192       @attributes_to_update = {}
193       @pi = j
194     end
195   end
196   def []=(x,y)
197     @attributes_to_update[x] = y
198     @pi[x] = y
199   end
200   def [](x)
201     @pi[x]
202   end
203 end
204
205 class JobCache
206   def self.get(uuid)
207     @cache ||= {}
208     result = $client.execute(:api_method => $orvos.jobs.get,
209                              :parameters => {
210                                :api_token => ENV['ORVOS_API_TOKEN'],
211                                :uuid => uuid
212                              },
213                              :authenticated => false)
214     @cache[uuid] = JSON.parse result.body, :symbolize_names => true
215   end
216   def self.where(conditions)
217     result = $client.execute(:api_method => $orvos.jobs.list,
218                              :parameters => {
219                                :api_token => ENV['ORVOS_API_TOKEN'],
220                                :limit => 10000,
221                                :where => conditions.to_json
222                              },
223                              :authenticated => false)
224     list = JSON.parse result.body, :symbolize_names => true
225     if list and list[:items].is_a? Array
226       list[:items]
227     else
228       []
229     end
230   end
231   def self.create(attributes)
232     @cache ||= {}
233     result = $client.execute(:api_method => $orvos.jobs.create,
234                              :parameters => {
235                                :api_token => ENV['ORVOS_API_TOKEN'],
236                                :job => attributes.to_json
237                              },
238                              :authenticated => false)
239     j = JSON.parse result.body, :symbolize_names => true
240     if j.is_a? Hash and j[:uuid]
241       @cache[j[:uuid]] = j
242     else
243       debuglog "create job: #{j[:errors] rescue nil}"
244       nil
245     end
246   end
247 end
248
249 class WhRunPipelineInstance
250   def initialize(_options)
251     @options = _options
252   end
253
254   def fetch_template(template_uuid)
255     result = $client.execute(:api_method => $orvos.pipeline_templates.get,
256                              :parameters => {
257                                :api_token => ENV['ORVOS_API_TOKEN'],
258                                :uuid => template_uuid
259                              },
260                              :authenticated => false)
261     @template = JSON.parse result.body, :symbolize_names => true
262     if !@template[:uuid]
263       abort "#{$0}: fatal: failed to retrieve pipeline template #{template_uuid} #{@template[:errors].inspect rescue nil}"
264     end
265     self
266   end
267
268   def apply_parameters(params_args)
269     params_args.shift if params_args[0] == '--'
270     params = {}
271     while !params_args.empty?
272       if (re = params_args[0].match /^(--)?([^-].*?)=(.)/)
273         params[re[2]] = re[3]
274         params_args.shift
275       elsif params_args.size > 1
276         param = params_args.shift.sub /^--/, ''
277         params[param] = params_args.shift
278       else
279         abort "Syntax error: I do not know what to do with arg \"#{params_args[0]}\""
280       end
281     end
282
283     @components = @template[:components].dup
284
285     errors = []
286     @components.each do |componentname, component|
287       component[:script_parameters].each do |parametername, parameter|
288         parameter = { :value => parameter } unless parameter.is_a? Hash
289         value =
290           (params["#{componentname}::#{parametername}"] ||
291            parameter[:value] ||
292            (parameter[:output_of].nil? &&
293             (params[parametername.to_s] ||
294              parameter[:default])) ||
295            nil)
296         if value.nil? and
297             ![false,'false',0,'0'].index parameter[:required]
298           if parameter[:output_of]
299             next
300           end
301           errors << [componentname, parametername, "required parameter is missing"]
302         end
303         debuglog "parameter #{componentname}::#{parametername} == #{value}"
304         component[:script_parameters][parametername] = value
305       end
306     end
307     if !errors.empty?
308       abort "Errors:\n#{errors.collect { |c,p,e| "#{c}::#{p} - #{e}\n" }.join ""}"
309     end
310     debuglog "options=" + @options.pretty_inspect
311     self
312   end
313
314   def run
315     moretodo = true
316     while moretodo
317       moretodo = false
318
319       @instance ||= PipelineInstance.new(:components => @components,
320                                          :pipeline_template_uuid => @template[:uuid],
321                                          :active => true)
322       @components.each do |cname, c|
323         job = nil
324         if !c[:job] and
325             c[:script_parameters].select { |pname, p| p.is_a? Hash }.empty?
326           # Job is fully specified (all parameter values are present) but
327           # no particular job has been found.
328
329           debuglog "component #{cname} ready to satisfy."
330
331           c.delete :wait
332           JobCache.where(:script => c[:script],
333                          :script_parameters => c[:script_parameters],
334                          :script_version_descends_from => c[:script_version_descends_from]).
335             each do |candidate_job|
336             candidate_params_downcase = Hash[candidate_job[:script_parameters].
337                                              map { |k,v| [k.downcase,v] }]
338             c_params_downcase = Hash[c[:script_parameters].
339                                      map { |k,v| [k.downcase,v] }]
340
341             debuglog "component #{cname} considering job #{candidate_job[:uuid]} version #{candidate_job[:script_version]} parameters #{candidate_params_downcase.inspect}", 3
342
343             unless candidate_params_downcase == c_params_downcase
344               next
345             end
346
347             unless candidate_job[:success] || candidate_job[:running] ||
348                 (!candidate_job[:started_at] && !candidate_job[:cancelled_at])
349               debuglog "component #{cname} would be satisfied by job #{candidate_job[:uuid]} if it were running or successful.", 2
350               next
351             end
352
353             job = candidate_job
354             debuglog "component #{cname} satisfied by job #{job[:uuid]} version #{job[:script_version]}"
355             c[:job] = job
356             break
357           end
358           if not c[:job]
359             debuglog "component #{cname} not satisfied by any existing job."
360             if !@options[:dry_run]
361               debuglog "component #{cname} new job."
362               job = JobCache.create(:script => c[:script],
363                                     :script_parameters => c[:script_parameters],
364                                     :resource_limits => c[:resource_limits] || {},
365                                     :script_version => c[:script_version] || 'master')
366               if job
367                 debuglog "component #{cname} new job #{job[:uuid]}"
368                 c[:job] = job
369               else
370                 debuglog "component #{cname} new job failed: #{job[:errors]}"
371               end
372             end
373           end
374         else
375           c[:wait] = true
376         end
377         if c[:job] and c[:job][:uuid]
378           if not c[:job][:finished_at] and not c[:job][:cancelled_at]
379             c[:job] = JobCache.get(c[:job][:uuid])
380           end
381           if c[:job][:success]
382             # Populate script_parameters of other components waiting for
383             # this job
384             @components.each do |c2name, c2|
385               c2[:script_parameters].each do |pname, p|
386                 if p.is_a? Hash and p[:output_of] == cname.to_s
387                   debuglog "parameter #{c2name}::#{pname} == #{c[:job][:output]}"
388                   c2[:script_parameters][pname] = c[:job][:output]
389                 end
390               end
391             end
392           elsif c[:job][:running] ||
393               (!c[:job][:started_at] && !c[:job][:cancelled_at])
394             moretodo ||= !@options[:no_wait]
395           elsif c[:job][:cancelled_at]
396             debuglog "component #{cname} job #{c[:job][:uuid]} cancelled."
397           end
398         end
399       end
400       @instance[:components] = @components
401       @instance[:active] = moretodo
402       report_status
403       sleep 10 if moretodo
404     end
405   end
406
407   def cleanup
408     @instance[:active] = false
409     @instance.save
410   end
411
412   protected
413
414   def report_status
415     @instance.save
416
417     if @options[:status_json] != '/dev/null'
418       File.open(@options[:status_json], 'w') do |f|
419         f.puts @components.pretty_inspect
420       end
421     end
422
423     if @options[:status_text] != '/dev/null'
424       File.open(@options[:status_text], 'w') do |f|
425         f.puts "#{Time.now} -- pipeline_instance #{@instance[:uuid]}"
426         namewidth = @components.collect { |cname, c| cname.size }.max
427         @components.each do |cname, c|
428           jstatus = if !c[:job]
429                       "-"
430                     elsif c[:job][:running]
431                       "#{c[:job][:tasks_summary].inspect}"
432                     elsif c[:job][:success]
433                       c[:job][:output]
434                     elsif c[:job][:cancelled_at]
435                       "cancelled #{c[:job][:cancelled_at]}"
436                     elsif c[:job][:started_at]
437                       "started #{c[:job][:started_at]}"
438                     else
439                       "queued #{c[:job][:created_at]}"
440                     end
441           f.puts "#{cname.to_s.ljust namewidth} #{c[:job] ? c[:job][:uuid] : '-'.ljust(27)} #{jstatus}"
442         end
443       end
444     end
445   end
446 end
447
448 instance = WhRunPipelineInstance.new($options)
449 begin
450   instance.fetch_template($options[:template]).
451     apply_parameters(p.leftovers).
452     run
453 rescue Exception => e
454   instance.cleanup
455   raise e
456 end