Merge branch 'master' of git.clinicalfuture.com:arvados
[arvados.git] / sdk / cli / bin / wh-run-pipeline-instance
1 #!/usr/bin/env ruby
2
3 # == Synopsis
4 #
5 #  wh-run-pipeline-instance --template pipeline-template-uuid [options] [--] [parameters]
6 #  wh-run-pipeline-instance --instance pipeline-instance-uuid [options]
7 #
8 # Satisfy a pipeline template by finding or submitting a mapreduce job
9 # for each pipeline component.
10 #
11 # == Options
12 #
13 # [--template uuid] Use the specified pipeline template.
14 #
15 # [--instance uuid] Use the specified pipeline instance.
16 #
17 # [-n, --dry-run] Do not start any new jobs or wait for existing jobs
18 #                 to finish. Just find out whether jobs are finished,
19 #                 queued, or running for each component
20 #
21 # [--create-only] Do not try to satisfy any components. Just create an
22 #                 instance, print its UUID to stdout, and exit.
23 #
24 # [--no-wait] Make only as much progress as possible without entering
25 #             a sleep/poll loop.
26 #
27 # [--debug] Print extra debugging information on stderr.
28 #
29 # [--debug-level N] Increase amount of debugging information. Default
30 #                   1, possible range 0..3.
31 #
32 # [--status-text path] Print plain text status report to a file or
33 #                      fifo. Default: /dev/stdout
34 #
35 # [--status-json path] Print JSON status report to a file or
36 #                      fifo. Default: /dev/null
37 #
38 # == Parameters
39 #
40 # [param_name=param_value]
41 #
42 # [param_name param_value] Set (or override) the default value for
43 #                          every parameter with the given name.
44 #
45 # [component_name::param_name=param_value]
46 # [component_name::param_name param_value]
47 # [--component_name::param_name=param_value]
48 # [--component_name::param_name param_value] Set the value of a
49 #                                            parameter for a single
50 #                                            component.
51 #
52 class WhRunPipelineInstance
53 end
54
55 $application_version = 1.0
56
57 if RUBY_VERSION < '1.9.3' then
58   abort <<-EOS
59 #{$0.gsub(/^\.\//,'')} requires Ruby version 1.9.3 or higher.
60   EOS
61 end
62
63 $arvados_api_version = ENV['ARVADOS_API_VERSION'] || 'v1'
64 $arvados_api_host = ENV['ARVADOS_API_HOST'] or
65   abort "#{$0}: fatal: ARVADOS_API_HOST environment variable not set."
66 $arvados_api_token = ENV['ARVADOS_API_TOKEN'] or
67   abort "#{$0}: fatal: ARVADOS_API_TOKEN environment variable not set."
68
69 begin
70   require 'rubygems'
71   require 'google/api_client'
72   require 'json'
73   require 'pp'
74   require 'trollop'
75 rescue LoadError
76   abort <<-EOS
77 #{$0}: fatal: some runtime dependencies are missing.
78 Try: gem install pp google-api-client json trollop
79   EOS
80 end
81
82 def debuglog(message, verbosity=1)
83   $stderr.puts "#{File.split($0).last} #{$$}: #{message}" if $debuglevel >= verbosity
84 end
85
86 module Kernel
87   def suppress_warnings
88     original_verbosity = $VERBOSE
89     $VERBOSE = nil
90     result = yield
91     $VERBOSE = original_verbosity
92     return result
93   end
94 end
95
96 if $arvados_api_host.match /local/
97   # You probably don't care about SSL certificate checks if you're
98   # testing with a dev server.
99   suppress_warnings { OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE }
100 end
101
102 class Google::APIClient
103   def discovery_document(api, version)
104     api = api.to_s
105     return @discovery_documents["#{api}:#{version}"] ||=
106       begin
107         response = self.execute!(
108                                  :http_method => :get,
109                                  :uri => self.discovery_uri(api, version),
110                                  :authenticated => false
111                                  )
112         response.body.class == String ? JSON.parse(response.body) : response.body
113       end
114   end
115 end
116
117
118 # Parse command line options (the kind that control the behavior of
119 # this program, that is, not the pipeline component parameters).
120
121 p = Trollop::Parser.new do
122   opt(:dry_run,
123       "Do not start any new jobs or wait for existing jobs to finish. Just find out whether jobs are finished, queued, or running for each component.",
124       :type => :boolean,
125       :short => :n)
126   opt(:status_text,
127       "Store plain text status in given file.",
128       :short => :none,
129       :type => :string,
130       :default => '/dev/stdout')
131   opt(:status_json,
132       "Store json-formatted pipeline in given file.",
133       :short => :none,
134       :type => :string,
135       :default => '/dev/null')
136   opt(:no_wait,
137       "Do not wait for jobs to finish. Just look up status, submit new jobs if needed, and exit.",
138       :short => :none,
139       :type => :boolean)
140   opt(:debug,
141       "Print extra debugging information on stderr.",
142       :type => :boolean)
143   opt(:debug_level,
144       "Set debug verbosity level.",
145       :short => :none,
146       :type => :integer)
147   opt(:template,
148       "UUID of pipeline template.",
149       :short => :none,
150       :type => :string)
151   opt(:instance,
152       "UUID of pipeline instance.",
153       :short => :none,
154       :type => :string)
155   opt(:create_only,
156       "Do not try to satisfy any components. Just create a pipeline instance and output its UUID.",
157       :short => :none,
158       :type => :boolean)
159   stop_on [:'--']
160 end
161 $options = Trollop::with_standard_exception_handling p do
162   p.parse ARGV
163 end
164 $debuglevel = $options[:debug_level] || ($options[:debug] && 1) || 0
165
166 if $options[:instance]
167   if $options[:template] or $options[:create_only]
168     abort "#{$0}: syntax error: --instance cannot be combined with --template or --create-only."
169   end
170 elsif not $options[:template]
171   abort "#{$0}: syntax error: you must supply a --template or --instance."
172 end
173
174 # Set up the API client.
175
176 $client ||= Google::APIClient.
177   new(:host => $arvados_api_host,
178       :application_name => File.split($0).last,
179       :application_version => $application_version.to_s)
180 $arvados = $client.discovered_api('arvados', $arvados_api_version)
181
182
183 class PipelineInstance
184   def self.find(uuid)
185     result = $client.execute(:api_method => $arvados.pipeline_instances.get,
186                              :parameters => {
187                                :api_token => ENV['ARVADOS_API_TOKEN'],
188                                :uuid => uuid
189                              },
190                              :authenticated => false)
191     j = JSON.parse result.body, :symbolize_names => true
192     unless j.is_a? Hash and j[:uuid]
193       debuglog "Failed to get pipeline_instance: #{j[:errors] rescue nil}", 0
194       nil
195     else
196       debuglog "Retrieved pipeline_instance #{j[:uuid]}"
197       self.new(j)
198     end
199   end
200   def self.create(attributes)
201     result = $client.execute(:api_method => $arvados.pipeline_instances.create,
202                              :parameters => {
203                                :api_token => ENV['ARVADOS_API_TOKEN'],
204                                :pipeline_instance => attributes.to_json
205                              },
206                              :authenticated => false)
207     j = JSON.parse result.body, :symbolize_names => true
208     unless j.is_a? Hash and j[:uuid]
209       abort "Failed to create pipeline_instance: #{j[:errors] rescue nil} #{j.inspect}"
210     end
211     debuglog "Created pipeline instance: #{j[:uuid]}"
212     self.new(j)
213   end
214   def save
215     result = $client.execute(:api_method => $arvados.pipeline_instances.update,
216                              :parameters => {
217                                :api_token => ENV['ARVADOS_API_TOKEN'],
218                                :uuid => @pi[:uuid],
219                                :pipeline_instance => @attributes_to_update.to_json
220                              },
221                              :authenticated => false)
222     j = JSON.parse result.body, :symbolize_names => true
223     unless j.is_a? Hash and j[:uuid]
224       debuglog "Failed to save pipeline_instance: #{j[:errors] rescue nil}", 0
225       nil
226     else
227       @attributes_to_update = {}
228       @pi = j
229     end
230   end
231   def []=(x,y)
232     @attributes_to_update[x] = y
233     @pi[x] = y
234   end
235   def [](x)
236     @pi[x]
237   end
238   protected
239   def initialize(j)
240     @attributes_to_update = {}
241     @pi = j
242   end
243 end
244
245 class JobCache
246   def self.get(uuid)
247     @cache ||= {}
248     result = $client.execute(:api_method => $arvados.jobs.get,
249                              :parameters => {
250                                :api_token => ENV['ARVADOS_API_TOKEN'],
251                                :uuid => uuid
252                              },
253                              :authenticated => false)
254     @cache[uuid] = JSON.parse result.body, :symbolize_names => true
255   end
256   def self.where(conditions)
257     result = $client.execute(:api_method => $arvados.jobs.list,
258                              :parameters => {
259                                :api_token => ENV['ARVADOS_API_TOKEN'],
260                                :limit => 10000,
261                                :where => conditions.to_json
262                              },
263                              :authenticated => false)
264     list = JSON.parse result.body, :symbolize_names => true
265     if list and list[:items].is_a? Array
266       list[:items]
267     else
268       []
269     end
270   end
271   def self.create(attributes)
272     @cache ||= {}
273     result = $client.execute(:api_method => $arvados.jobs.create,
274                              :parameters => {
275                                :api_token => ENV['ARVADOS_API_TOKEN'],
276                                :job => attributes.to_json
277                              },
278                              :authenticated => false)
279     j = JSON.parse result.body, :symbolize_names => true
280     if j.is_a? Hash and j[:uuid]
281       @cache[j[:uuid]] = j
282     else
283       debuglog "create job: #{j[:errors] rescue nil}"
284       nil
285     end
286   end
287 end
288
289 class WhRunPipelineInstance
290   attr_reader :instance
291
292   def initialize(_options)
293     @options = _options
294   end
295
296   def fetch_template(template_uuid)
297     result = $client.execute(:api_method => $arvados.pipeline_templates.get,
298                              :parameters => {
299                                :api_token => ENV['ARVADOS_API_TOKEN'],
300                                :uuid => template_uuid
301                              },
302                              :authenticated => false)
303     @template = JSON.parse result.body, :symbolize_names => true
304     if !@template[:uuid]
305       abort "#{$0}: fatal: failed to retrieve pipeline template #{template_uuid} #{@template[:errors].inspect rescue nil}"
306     end
307     self
308   end
309
310   def fetch_instance(instance_uuid)
311     @instance = PipelineInstance.find(instance_uuid)
312     @template = @instance
313     self
314   end
315
316   def apply_parameters(params_args)
317     params_args.shift if params_args[0] == '--'
318     params = {}
319     while !params_args.empty?
320       if (re = params_args[0].match /^(--)?([^-].*?)=(.)/)
321         params[re[2]] = re[3]
322         params_args.shift
323       elsif params_args.size > 1
324         param = params_args.shift.sub /^--/, ''
325         params[param] = params_args.shift
326       else
327         abort "Syntax error: I do not know what to do with arg \"#{params_args[0]}\""
328       end
329     end
330
331     @components = @template[:components].dup
332
333     errors = []
334     @components.each do |componentname, component|
335       component[:script_parameters].each do |parametername, parameter|
336         parameter = { :value => parameter } unless parameter.is_a? Hash
337         value =
338           (params["#{componentname}::#{parametername}"] ||
339            parameter[:value] ||
340            (parameter[:output_of].nil? &&
341             (params[parametername.to_s] ||
342              parameter[:default])) ||
343            nil)
344         if value.nil? and
345             ![false,'false',0,'0'].index parameter[:required]
346           if parameter[:output_of]
347             next
348           end
349           errors << [componentname, parametername, "required parameter is missing"]
350         end
351         debuglog "parameter #{componentname}::#{parametername} == #{value}"
352         component[:script_parameters][parametername] = value
353       end
354     end
355     if !errors.empty?
356       abort "Errors:\n#{errors.collect { |c,p,e| "#{c}::#{p} - #{e}\n" }.join ""}"
357     end
358     debuglog "options=" + @options.pretty_inspect
359     self
360   end
361
362   def setup_instance
363     @instance ||= PipelineInstance.
364       create(:components => @components,
365              :pipeline_template_uuid => @template[:uuid],
366              :active => true)
367     self
368   end
369
370   def run
371     moretodo = true
372     while moretodo
373       moretodo = false
374       @components.each do |cname, c|
375         job = nil
376         if !c[:job] and
377             c[:script_parameters].select { |pname, p| p.is_a? Hash }.empty?
378           # Job is fully specified (all parameter values are present) but
379           # no particular job has been found.
380
381           debuglog "component #{cname} ready to satisfy."
382
383           c.delete :wait
384           second_place_job = nil # satisfies component, but not finished yet
385           JobCache.where(:script => c[:script],
386                          :script_parameters => c[:script_parameters],
387                          :script_version_descends_from => c[:script_version_descends_from]).
388             each do |candidate_job|
389             candidate_params_downcase = Hash[candidate_job[:script_parameters].
390                                              map { |k,v| [k.downcase,v] }]
391             c_params_downcase = Hash[c[:script_parameters].
392                                      map { |k,v| [k.downcase,v] }]
393
394             debuglog "component #{cname} considering job #{candidate_job[:uuid]} version #{candidate_job[:script_version]} parameters #{candidate_params_downcase.inspect}", 3
395
396             unless candidate_params_downcase == c_params_downcase
397               next
398             end
399
400             unless candidate_job[:success] || candidate_job[:running] ||
401                 (!candidate_job[:started_at] && !candidate_job[:cancelled_at])
402               debuglog "component #{cname} would be satisfied by job #{candidate_job[:uuid]} if it were running or successful.", 2
403               next
404             end
405
406             if candidate_job[:success]
407               job = candidate_job
408               debuglog "component #{cname} satisfied by job #{job[:uuid]} version #{job[:script_version]}"
409               c[:job] = job
410             else
411               second_place_job ||= candidate_job
412             end
413             break
414           end
415           if not c[:job] and second_place_job
416             job = second_place_job
417             debuglog "component #{cname} satisfied by job #{job[:uuid]} version #{job[:script_version]}"
418             c[:job] = job
419           end
420           if not c[:job]
421             debuglog "component #{cname} not satisfied by any existing job."
422             if !@options[:dry_run]
423               debuglog "component #{cname} new job."
424               job = JobCache.create(:script => c[:script],
425                                     :script_parameters => c[:script_parameters],
426                                     :resource_limits => c[:resource_limits] || {},
427                                     :script_version => c[:script_version] || 'master')
428               if job
429                 debuglog "component #{cname} new job #{job[:uuid]}"
430                 c[:job] = job
431               else
432                 debuglog "component #{cname} new job failed: #{job[:errors]}"
433               end
434             end
435           end
436         else
437           c[:wait] = true
438         end
439         if c[:job] and c[:job][:uuid]
440           if not c[:job][:finished_at] and not c[:job][:cancelled_at]
441             c[:job] = JobCache.get(c[:job][:uuid])
442           end
443           if c[:job][:success]
444             # Populate script_parameters of other components waiting for
445             # this job
446             @components.each do |c2name, c2|
447               c2[:script_parameters].each do |pname, p|
448                 if p.is_a? Hash and p[:output_of] == cname.to_s
449                   debuglog "parameter #{c2name}::#{pname} == #{c[:job][:output]}"
450                   c2[:script_parameters][pname] = c[:job][:output]
451                 end
452               end
453             end
454           elsif c[:job][:running] ||
455               (!c[:job][:started_at] && !c[:job][:cancelled_at])
456             moretodo ||= !@options[:no_wait]
457           elsif c[:job][:cancelled_at]
458             debuglog "component #{cname} job #{c[:job][:uuid]} cancelled."
459           end
460         end
461       end
462       @instance[:components] = @components
463       @instance[:active] = moretodo
464       report_status
465       sleep 10 if moretodo
466     end
467     @instance[:success] = @components.reject { |cname,c| c[:job] and c[:job][:success] }.empty?
468     @instance.save
469   end
470
471   def cleanup
472     if @instance
473       @instance[:active] = false
474       @instance.save
475     end
476   end
477
478   def uuid
479     @instance[:uuid]
480   end
481
482   protected
483
484   def report_status
485     @instance.save
486
487     if @options[:status_json] != '/dev/null'
488       File.open(@options[:status_json], 'w') do |f|
489         f.puts @components.pretty_inspect
490       end
491     end
492
493     if @options[:status_text] != '/dev/null'
494       File.open(@options[:status_text], 'w') do |f|
495         f.puts "#{Time.now} -- pipeline_instance #{@instance[:uuid]}"
496         namewidth = @components.collect { |cname, c| cname.size }.max
497         @components.each do |cname, c|
498           jstatus = if !c[:job]
499                       "-"
500                     elsif c[:job][:running]
501                       "#{c[:job][:tasks_summary].inspect}"
502                     elsif c[:job][:success]
503                       c[:job][:output]
504                     elsif c[:job][:cancelled_at]
505                       "cancelled #{c[:job][:cancelled_at]}"
506                     elsif c[:job][:finished_at]
507                       "failed #{c[:job][:finished_at]}"
508                     elsif c[:job][:started_at]
509                       "started #{c[:job][:started_at]}"
510                     else
511                       "queued #{c[:job][:created_at]}"
512                     end
513           f.puts "#{cname.to_s.ljust namewidth} #{c[:job] ? c[:job][:uuid] : '-'.ljust(27)} #{jstatus}"
514         end
515       end
516     end
517   end
518 end
519
520 runner = WhRunPipelineInstance.new($options)
521 begin
522   if $options[:template]
523     runner.fetch_template($options[:template])
524   else
525     runner.fetch_instance($options[:instance])
526   end
527   runner.apply_parameters(p.leftovers)
528   runner.setup_instance
529   if $options[:create_only]
530     runner.instance.save
531     puts runner.instance[:uuid]
532   else
533     runner.run
534   end
535 rescue Exception => e
536   runner.cleanup
537   raise e
538 end