rename jobs.resource_limits to runtime_constraints
[arvados.git] / sdk / cli / bin / arv-run-pipeline-instance
1 #!/usr/bin/env ruby
2
3 # == Synopsis
4 #
5 #  wh-run-pipeline-instance --template pipeline-template-uuid [options] [--] [parameters]
6 #  wh-run-pipeline-instance --instance pipeline-instance-uuid [options]
7 #
8 # Satisfy a pipeline template by finding or submitting a mapreduce job
9 # for each pipeline component.
10 #
11 # == Options
12 #
13 # [--template uuid] Use the specified pipeline template.
14 #
15 # [--instance uuid] Use the specified pipeline instance.
16 #
17 # [-n, --dry-run] Do not start any new jobs or wait for existing jobs
18 #                 to finish. Just find out whether jobs are finished,
19 #                 queued, or running for each component
20 #
21 # [--create-instance-only] Do not try to satisfy any components. Just
22 #                          create an instance, print its UUID to
23 #                          stdout, and exit.
24 #
25 # [--no-wait] Make only as much progress as possible without entering
26 #             a sleep/poll loop.
27 #
28 # [--no-reuse-finished] Do not reuse existing outputs to satisfy
29 #                       pipeline components. Always submit a new job
30 #                       or use an existing job which has not yet
31 #                       finished.
32 #
33 # [--no-reuse] Do not reuse existing jobs to satisfy pipeline
34 #              components. Submit a new job for every component.
35 #
36 # [--debug] Print extra debugging information on stderr.
37 #
38 # [--debug-level N] Increase amount of debugging information. Default
39 #                   1, possible range 0..3.
40 #
41 # [--status-text path] Print plain text status report to a file or
42 #                      fifo. Default: /dev/stdout
43 #
44 # [--status-json path] Print JSON status report to a file or
45 #                      fifo. Default: /dev/null
46 #
47 # == Parameters
48 #
49 # [param_name=param_value]
50 #
51 # [param_name param_value] Set (or override) the default value for
52 #                          every parameter with the given name.
53 #
54 # [component_name::param_name=param_value]
55 # [component_name::param_name param_value]
56 # [--component_name::param_name=param_value]
57 # [--component_name::param_name param_value] Set the value of a
58 #                                            parameter for a single
59 #                                            component.
60 #
61 class WhRunPipelineInstance
62 end
63
64 $application_version = 1.0
65
66 if RUBY_VERSION < '1.9.3' then
67   abort <<-EOS
68 #{$0.gsub(/^\.\//,'')} requires Ruby version 1.9.3 or higher.
69   EOS
70 end
71
72 $arvados_api_version = ENV['ARVADOS_API_VERSION'] || 'v1'
73 $arvados_api_host = ENV['ARVADOS_API_HOST'] or
74   abort "#{$0}: fatal: ARVADOS_API_HOST environment variable not set."
75 $arvados_api_token = ENV['ARVADOS_API_TOKEN'] or
76   abort "#{$0}: fatal: ARVADOS_API_TOKEN environment variable not set."
77
78 begin
79   require 'rubygems'
80   require 'google/api_client'
81   require 'json'
82   require 'pp'
83   require 'trollop'
84 rescue LoadError
85   abort <<-EOS
86 #{$0}: fatal: some runtime dependencies are missing.
87 Try: gem install pp google-api-client json trollop
88   EOS
89 end
90
91 def debuglog(message, verbosity=1)
92   $stderr.puts "#{File.split($0).last} #{$$}: #{message}" if $debuglevel >= verbosity
93 end
94
95 module Kernel
96   def suppress_warnings
97     original_verbosity = $VERBOSE
98     $VERBOSE = nil
99     result = yield
100     $VERBOSE = original_verbosity
101     return result
102   end
103 end
104
105 if $arvados_api_host.match /local/
106   # You probably don't care about SSL certificate checks if you're
107   # testing with a dev server.
108   suppress_warnings { OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE }
109 end
110
111 class Google::APIClient
112   def discovery_document(api, version)
113     api = api.to_s
114     return @discovery_documents["#{api}:#{version}"] ||=
115       begin
116         response = self.execute!(
117                                  :http_method => :get,
118                                  :uri => self.discovery_uri(api, version),
119                                  :authenticated => false
120                                  )
121         response.body.class == String ? JSON.parse(response.body) : response.body
122       end
123   end
124 end
125
126
127 # Parse command line options (the kind that control the behavior of
128 # this program, that is, not the pipeline component parameters).
129
130 p = Trollop::Parser.new do
131   opt(:dry_run,
132       "Do not start any new jobs or wait for existing jobs to finish. Just find out whether jobs are finished, queued, or running for each component.",
133       :type => :boolean,
134       :short => :n)
135   opt(:status_text,
136       "Store plain text status in given file.",
137       :short => :none,
138       :type => :string,
139       :default => '/dev/stdout')
140   opt(:status_json,
141       "Store json-formatted pipeline in given file.",
142       :short => :none,
143       :type => :string,
144       :default => '/dev/null')
145   opt(:no_wait,
146       "Do not wait for jobs to finish. Just look up status, submit new jobs if needed, and exit.",
147       :short => :none,
148       :type => :boolean)
149   opt(:no_reuse_finished,
150       "Do not reuse existing outputs to satisfy pipeline components. Always submit a new job or use an existing job which has not yet finished.",
151       :short => :none,
152       :type => :boolean)
153   opt(:no_reuse,
154       "Do not reuse existing jobs to satisfy pipeline components. Submit a new job for every component.",
155       :short => :none,
156       :type => :boolean)
157   opt(:debug,
158       "Print extra debugging information on stderr.",
159       :type => :boolean)
160   opt(:debug_level,
161       "Set debug verbosity level.",
162       :short => :none,
163       :type => :integer)
164   opt(:template,
165       "UUID of pipeline template.",
166       :short => :none,
167       :type => :string)
168   opt(:instance,
169       "UUID of pipeline instance.",
170       :short => :none,
171       :type => :string)
172   opt(:create_instance_only,
173       "Do not try to satisfy any components. Just create a pipeline instance and output its UUID.",
174       :short => :none,
175       :type => :boolean)
176   stop_on [:'--']
177 end
178 $options = Trollop::with_standard_exception_handling p do
179   p.parse ARGV
180 end
181 $debuglevel = $options[:debug_level] || ($options[:debug] && 1) || 0
182
183 if $options[:instance]
184   if $options[:template] or $options[:create_instance_only]
185     abort "#{$0}: syntax error: --instance cannot be combined with --template or --create-instance-only."
186   end
187 elsif not $options[:template]
188   abort "#{$0}: syntax error: you must supply a --template or --instance."
189 end
190
191 # Set up the API client.
192
193 $client ||= Google::APIClient.
194   new(:host => $arvados_api_host,
195       :application_name => File.split($0).last,
196       :application_version => $application_version.to_s)
197 $arvados = $client.discovered_api('arvados', $arvados_api_version)
198
199
200 class PipelineInstance
201   def self.find(uuid)
202     result = $client.execute(:api_method => $arvados.pipeline_instances.get,
203                              :parameters => {
204                                :uuid => uuid
205                              },
206                              :body => {
207                                :api_token => ENV['ARVADOS_API_TOKEN']
208                              },
209                              :authenticated => false)
210     j = JSON.parse result.body, :symbolize_names => true
211     unless j.is_a? Hash and j[:uuid]
212       debuglog "Failed to get pipeline_instance: #{j[:errors] rescue nil}", 0
213       nil
214     else
215       debuglog "Retrieved pipeline_instance #{j[:uuid]}"
216       self.new(j)
217     end
218   end
219   def self.create(attributes)
220     result = $client.execute(:api_method => $arvados.pipeline_instances.create,
221                              :body => {
222                                :api_token => ENV['ARVADOS_API_TOKEN'],
223                                :pipeline_instance => attributes
224                              },
225                              :authenticated => false)
226     j = JSON.parse result.body, :symbolize_names => true
227     unless j.is_a? Hash and j[:uuid]
228       abort "Failed to create pipeline_instance: #{j[:errors] rescue nil} #{j.inspect}"
229     end
230     debuglog "Created pipeline instance: #{j[:uuid]}"
231     self.new(j)
232   end
233   def save
234     result = $client.execute(:api_method => $arvados.pipeline_instances.update,
235                              :parameters => {
236                                :uuid => @pi[:uuid]
237                              },
238                              :body => {
239                                :api_token => ENV['ARVADOS_API_TOKEN'],
240                                :pipeline_instance => @attributes_to_update.to_json
241                              },
242                              :authenticated => false)
243     j = JSON.parse result.body, :symbolize_names => true
244     unless j.is_a? Hash and j[:uuid]
245       debuglog "Failed to save pipeline_instance: #{j[:errors] rescue nil}", 0
246       nil
247     else
248       @attributes_to_update = {}
249       @pi = j
250     end
251   end
252   def []=(x,y)
253     @attributes_to_update[x] = y
254     @pi[x] = y
255   end
256   def [](x)
257     @pi[x]
258   end
259   protected
260   def initialize(j)
261     @attributes_to_update = {}
262     @pi = j
263   end
264 end
265
266 class JobCache
267   def self.get(uuid)
268     @cache ||= {}
269     result = $client.execute(:api_method => $arvados.jobs.get,
270                              :parameters => {
271                                :api_token => ENV['ARVADOS_API_TOKEN'],
272                                :uuid => uuid
273                              },
274                              :authenticated => false)
275     @cache[uuid] = JSON.parse result.body, :symbolize_names => true
276   end
277   def self.where(conditions)
278     result = $client.execute(:api_method => $arvados.jobs.list,
279                              :parameters => {
280                                :api_token => ENV['ARVADOS_API_TOKEN'],
281                                :limit => 10000,
282                                :where => conditions.to_json
283                              },
284                              :authenticated => false)
285     list = JSON.parse result.body, :symbolize_names => true
286     if list and list[:items].is_a? Array
287       list[:items]
288     else
289       []
290     end
291   end
292   def self.create(attributes)
293     @cache ||= {}
294     result = $client.execute(:api_method => $arvados.jobs.create,
295                              :parameters => {
296                                :api_token => ENV['ARVADOS_API_TOKEN'],
297                                :job => attributes.to_json
298                              },
299                              :authenticated => false)
300     j = JSON.parse result.body, :symbolize_names => true
301     if j.is_a? Hash and j[:uuid]
302       @cache[j[:uuid]] = j
303     else
304       debuglog "create job: #{j[:errors] rescue nil}", 0
305       nil
306     end
307   end
308 end
309
310 class WhRunPipelineInstance
311   attr_reader :instance
312
313   def initialize(_options)
314     @options = _options
315   end
316
317   def fetch_template(template_uuid)
318     result = $client.execute(:api_method => $arvados.pipeline_templates.get,
319                              :parameters => {
320                                :api_token => ENV['ARVADOS_API_TOKEN'],
321                                :uuid => template_uuid
322                              },
323                              :authenticated => false)
324     @template = JSON.parse result.body, :symbolize_names => true
325     if !@template[:uuid]
326       abort "#{$0}: fatal: failed to retrieve pipeline template #{template_uuid} #{@template[:errors].inspect rescue nil}"
327     end
328     self
329   end
330
331   def fetch_instance(instance_uuid)
332     @instance = PipelineInstance.find(instance_uuid)
333     @template = @instance
334     self
335   end
336
337   def apply_parameters(params_args)
338     params_args.shift if params_args[0] == '--'
339     params = {}
340     while !params_args.empty?
341       if (re = params_args[0].match /^(--)?([^-].*?)=(.+)/)
342         params[re[2]] = re[3]
343         params_args.shift
344       elsif params_args.size > 1
345         param = params_args.shift.sub /^--/, ''
346         params[param] = params_args.shift
347       else
348         abort "Syntax error: I do not know what to do with arg \"#{params_args[0]}\""
349       end
350     end
351
352     @components = @template[:components].dup
353
354     errors = []
355     @components.each do |componentname, component|
356       component[:script_parameters].each do |parametername, parameter|
357         parameter = { :value => parameter } unless parameter.is_a? Hash
358         value =
359           (params["#{componentname}::#{parametername}"] ||
360            parameter[:value] ||
361            (parameter[:output_of].nil? &&
362             (params[parametername.to_s] ||
363              parameter[:default])) ||
364            nil)
365         if value.nil? and
366             ![false,'false',0,'0'].index parameter[:required]
367           if parameter[:output_of]
368             next
369           end
370           errors << [componentname, parametername, "required parameter is missing"]
371         end
372         debuglog "parameter #{componentname}::#{parametername} == #{value}"
373         component[:script_parameters][parametername] = value
374       end
375     end
376     if !errors.empty?
377       abort "Errors:\n#{errors.collect { |c,p,e| "#{c}::#{p} - #{e}\n" }.join ""}"
378     end
379     debuglog "options=" + @options.pretty_inspect
380     self
381   end
382
383   def setup_instance
384     @instance ||= PipelineInstance.
385       create(:components => @components,
386              :pipeline_template_uuid => @template[:uuid],
387              :active => true)
388     self
389   end
390
391   def run
392     moretodo = true
393     while moretodo
394       moretodo = false
395       @components.each do |cname, c|
396         job = nil
397         if !c[:job] and
398             c[:script_parameters].select { |pname, p| p.is_a? Hash }.empty?
399           # Job is fully specified (all parameter values are present) but
400           # no particular job has been found.
401
402           debuglog "component #{cname} ready to satisfy."
403
404           c.delete :wait
405           second_place_job = nil # satisfies component, but not finished yet
406
407           (@options[:no_reuse] ? [] : JobCache.
408            where(script: c[:script],
409                  script_parameters: c[:script_parameters],
410                  script_version_descends_from: c[:script_version_descends_from])
411            ).each do |candidate_job|
412             candidate_params_downcase = Hash[candidate_job[:script_parameters].
413                                              map { |k,v| [k.downcase,v] }]
414             c_params_downcase = Hash[c[:script_parameters].
415                                      map { |k,v| [k.downcase,v] }]
416
417             debuglog "component #{cname} considering job #{candidate_job[:uuid]} version #{candidate_job[:script_version]} parameters #{candidate_params_downcase.inspect}", 3
418
419             unless candidate_params_downcase == c_params_downcase
420               next
421             end
422
423             unless candidate_job[:success] || candidate_job[:running] ||
424                 (!candidate_job[:started_at] && !candidate_job[:cancelled_at])
425               debuglog "component #{cname} would be satisfied by job #{candidate_job[:uuid]} if it were running or successful.", 2
426               next
427             end
428
429             if candidate_job[:success]
430               unless @options[:no_reuse_finished]
431                 job = candidate_job
432                 debuglog "component #{cname} satisfied by job #{job[:uuid]} version #{job[:script_version]}"
433                 c[:job] = job
434               end
435             else
436               second_place_job ||= candidate_job
437             end
438             break
439           end
440           if not c[:job] and second_place_job
441             job = second_place_job
442             debuglog "component #{cname} satisfied by job #{job[:uuid]} version #{job[:script_version]}"
443             c[:job] = job
444           end
445           if not c[:job]
446             debuglog "component #{cname} not satisfied by any existing job."
447             if !@options[:dry_run]
448               debuglog "component #{cname} new job."
449               job = JobCache.create(:script => c[:script],
450                                     :script_parameters => c[:script_parameters],
451                                     :runtime_constraints => c[:runtime_constraints] || {},
452                                     :script_version => c[:script_version] || 'master')
453               if job
454                 debuglog "component #{cname} new job #{job[:uuid]}"
455                 c[:job] = job
456               else
457                 debuglog "component #{cname} new job failed"
458               end
459             end
460           end
461         else
462           c[:wait] = true
463         end
464         if c[:job] and c[:job][:uuid]
465           if not c[:job][:finished_at] and not c[:job][:cancelled_at]
466             c[:job] = JobCache.get(c[:job][:uuid])
467           end
468           if c[:job][:success]
469             # Populate script_parameters of other components waiting for
470             # this job
471             @components.each do |c2name, c2|
472               c2[:script_parameters].each do |pname, p|
473                 if p.is_a? Hash and p[:output_of] == cname.to_s
474                   debuglog "parameter #{c2name}::#{pname} == #{c[:job][:output]}"
475                   c2[:script_parameters][pname] = c[:job][:output]
476                 end
477               end
478             end
479           elsif c[:job][:running] ||
480               (!c[:job][:started_at] && !c[:job][:cancelled_at])
481             moretodo ||= !@options[:no_wait]
482           elsif c[:job][:cancelled_at]
483             debuglog "component #{cname} job #{c[:job][:uuid]} cancelled."
484           end
485         end
486       end
487       @instance[:components] = @components
488       @instance[:active] = moretodo
489       report_status
490       if moretodo
491         begin
492           sleep 10
493         rescue Interrupt
494           debuglog "interrupt", 0
495           abort
496         end
497       end
498     end
499     @instance[:success] = @components.reject { |cname,c| c[:job] and c[:job][:success] }.empty?
500     @instance.save
501   end
502
503   def cleanup
504     if @instance
505       @instance[:active] = false
506       @instance.save
507     end
508   end
509
510   def uuid
511     @instance[:uuid]
512   end
513
514   protected
515
516   def report_status
517     @instance.save
518
519     if @options[:status_json] != '/dev/null'
520       File.open(@options[:status_json], 'w') do |f|
521         f.puts @components.pretty_inspect
522       end
523     end
524
525     if @options[:status_text] != '/dev/null'
526       File.open(@options[:status_text], 'w') do |f|
527         f.puts "#{Time.now} -- pipeline_instance #{@instance[:uuid]}"
528         namewidth = @components.collect { |cname, c| cname.size }.max
529         @components.each do |cname, c|
530           jstatus = if !c[:job]
531                       "-"
532                     elsif c[:job][:running]
533                       "#{c[:job][:tasks_summary].inspect}"
534                     elsif c[:job][:success]
535                       c[:job][:output]
536                     elsif c[:job][:cancelled_at]
537                       "cancelled #{c[:job][:cancelled_at]}"
538                     elsif c[:job][:finished_at]
539                       "failed #{c[:job][:finished_at]}"
540                     elsif c[:job][:started_at]
541                       "started #{c[:job][:started_at]}"
542                     else
543                       "queued #{c[:job][:created_at]}"
544                     end
545           f.puts "#{cname.to_s.ljust namewidth} #{c[:job] ? c[:job][:uuid] : '-'.ljust(27)} #{jstatus}"
546         end
547       end
548     end
549   end
550 end
551
552 runner = WhRunPipelineInstance.new($options)
553 begin
554   if $options[:template]
555     runner.fetch_template($options[:template])
556   else
557     runner.fetch_instance($options[:instance])
558   end
559   runner.apply_parameters(p.leftovers)
560   runner.setup_instance
561   if $options[:create_instance_only]
562     runner.instance.save
563     puts runner.instance[:uuid]
564   else
565     runner.run
566   end
567 rescue Exception => e
568   runner.cleanup
569   raise e
570 end