Merge branch '2291-new-keepd-read-blocks' (fixes #2291)
[arvados.git] / sdk / cli / bin / arv-run-pipeline-instance
1 #!/usr/bin/env ruby
2
3 # == Synopsis
4 #
5 #  arv-run-pipeline-instance --template pipeline-template-uuid [options] [--] [parameters]
6 #  arv-run-pipeline-instance --instance pipeline-instance-uuid [options]
7 #
8 # Satisfy a pipeline template by finding or submitting a mapreduce job
9 # for each pipeline component.
10 #
11 # == Options
12 #
13 # [--template uuid] Use the specified pipeline template.
14 #
15 # [--template path] Load the pipeline template from the specified
16 #                   local file.
17 #
18 # [--instance uuid] Use the specified pipeline instance.
19 #
20 # [-n, --dry-run] Do not start any new jobs or wait for existing jobs
21 #                 to finish. Just find out whether jobs are finished,
22 #                 queued, or running for each component
23 #
24 # [--submit] Do not try to satisfy any components. Just
25 #                          create an instance, print its UUID to
26 #                          stdout, and exit.
27 #
28 # [--no-wait] Make only as much progress as possible without entering
29 #             a sleep/poll loop.
30 #
31 # [--no-reuse-finished] Do not reuse existing outputs to satisfy
32 #                       pipeline components. Always submit a new job
33 #                       or use an existing job which has not yet
34 #                       finished.
35 #
36 # [--no-reuse] Do not reuse existing jobs to satisfy pipeline
37 #              components. Submit a new job for every component.
38 #
39 # [--debug] Print extra debugging information on stderr.
40 #
41 # [--debug-level N] Increase amount of debugging information. Default
42 #                   1, possible range 0..3.
43 #
44 # [--status-text path] Print plain text status report to a file or
45 #                      fifo. Default: /dev/stdout
46 #
47 # [--status-json path] Print JSON status report to a file or
48 #                      fifo. Default: /dev/null
49 #
50 # == Parameters
51 #
52 # [param_name=param_value]
53 #
54 # [param_name param_value] Set (or override) the default value for
55 #                          every parameter with the given name.
56 #
57 # [component_name::param_name=param_value]
58 # [component_name::param_name param_value]
59 # [--component_name::param_name=param_value]
60 # [--component_name::param_name param_value] Set the value of a
61 #                                            parameter for a single
62 #                                            component.
63 #
64 class WhRunPipelineInstance
65 end
66
67 $application_version = 1.0
68
69 if RUBY_VERSION < '1.9.3' then
70   abort <<-EOS
71 #{$0.gsub(/^\.\//,'')} requires Ruby version 1.9.3 or higher.
72   EOS
73 end
74
75 $arvados_api_version = ENV['ARVADOS_API_VERSION'] || 'v1'
76 $arvados_api_host = ENV['ARVADOS_API_HOST'] or
77   abort "#{$0}: fatal: ARVADOS_API_HOST environment variable not set."
78 $arvados_api_token = ENV['ARVADOS_API_TOKEN'] or
79   abort "#{$0}: fatal: ARVADOS_API_TOKEN environment variable not set."
80
81 begin
82   require 'rubygems'
83   require 'json'
84   require 'pp'
85   require 'trollop'
86   require 'google/api_client'
87 rescue LoadError => l
88   puts $:
89   abort <<-EOS
90 #{$0}: fatal: #{l.message}
91 Some runtime dependencies may be missing.
92 Try: gem install pp google-api-client json trollop
93   EOS
94 end
95
96 def debuglog(message, verbosity=1)
97   $stderr.puts "#{File.split($0).last} #{$$}: #{message}" if $debuglevel >= verbosity
98 end
99
100 module Kernel
101   def suppress_warnings
102     original_verbosity = $VERBOSE
103     $VERBOSE = nil
104     result = yield
105     $VERBOSE = original_verbosity
106     return result
107   end
108 end
109
110 if $arvados_api_host.match /local/
111   # You probably don't care about SSL certificate checks if you're
112   # testing with a dev server.
113   suppress_warnings { OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE }
114 end
115
116 class Google::APIClient
117   def discovery_document(api, version)
118     api = api.to_s
119     return @discovery_documents["#{api}:#{version}"] ||=
120       begin
121         response = self.execute!(
122                                  :http_method => :get,
123                                  :uri => self.discovery_uri(api, version),
124                                  :authenticated => false
125                                  )
126         response.body.class == String ? JSON.parse(response.body) : response.body
127       end
128   end
129 end
130
131
132 # Parse command line options (the kind that control the behavior of
133 # this program, that is, not the pipeline component parameters).
134
135 p = Trollop::Parser.new do
136   version __FILE__
137   opt(:dry_run,
138       "Do not start any new jobs or wait for existing jobs to finish. Just find out whether jobs are finished, queued, or running for each component.",
139       :type => :boolean,
140       :short => :n)
141   opt(:status_text,
142       "Store plain text status in given file.",
143       :short => :none,
144       :type => :string,
145       :default => '/dev/stdout')
146   opt(:status_json,
147       "Store json-formatted pipeline in given file.",
148       :short => :none,
149       :type => :string,
150       :default => '/dev/null')
151   opt(:no_wait,
152       "Do not wait for jobs to finish. Just look up status, submit new jobs if needed, and exit.",
153       :short => :none,
154       :type => :boolean)
155   opt(:no_reuse_finished,
156       "Do not reuse existing outputs to satisfy pipeline components. Always submit a new job or use an existing job which has not yet finished.",
157       :short => :none,
158       :type => :boolean)
159   opt(:no_reuse,
160       "Do not reuse existing jobs to satisfy pipeline components. Submit a new job for every component.",
161       :short => :none,
162       :type => :boolean)
163   opt(:debug,
164       "Print extra debugging information on stderr.",
165       :type => :boolean)
166   opt(:debug_level,
167       "Set debug verbosity level.",
168       :short => :none,
169       :type => :integer)
170   opt(:template,
171       "UUID of pipeline template, or path to local pipeline template file.",
172       :short => :none,
173       :type => :string)
174   opt(:instance,
175       "UUID of pipeline instance.",
176       :short => :none,
177       :type => :string)
178   opt(:submit,
179       "Do not try to satisfy any components. Just create a pipeline instance and output its UUID.",
180       :short => :none,
181       :type => :boolean)
182   opt(:run_here,
183       "Manage the pipeline in process.",
184       :short => :none,
185       :type => :boolean)
186   stop_on [:'--']
187 end
188 $options = Trollop::with_standard_exception_handling p do
189   p.parse ARGV
190 end
191 $debuglevel = $options[:debug_level] || ($options[:debug] && 1) || 0
192
193 if $options[:instance]
194   if $options[:template] or $options[:submit]
195     abort "#{$0}: syntax error: --instance cannot be combined with --template or --submit."
196   end
197 elsif not $options[:template]
198   abort "#{$0}: syntax error: you must supply a --template or --instance."
199 end
200
201 if $options[:run_here] == $options[:submit]
202   abort "#{$0}: syntax error: you must supply either --run-here or --submit."
203 end
204
205 # Suppress SSL certificate checks if ARVADOS_API_HOST_INSECURE
206
207 module Kernel
208   def suppress_warnings
209     original_verbosity = $VERBOSE
210     $VERBOSE = nil
211     result = yield
212     $VERBOSE = original_verbosity
213     return result
214   end
215 end
216
217 if ENV['ARVADOS_API_HOST_INSECURE']
218   suppress_warnings { OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE }
219 end
220
221 # Set up the API client.
222
223 $client ||= Google::APIClient.
224   new(:host => $arvados_api_host,
225       :application_name => File.split($0).last,
226       :application_version => $application_version.to_s)
227 $arvados = $client.discovered_api('arvados', $arvados_api_version)
228
229
230 class PipelineInstance
231   def self.find(uuid)
232     result = $client.execute(:api_method => $arvados.pipeline_instances.get,
233                              :parameters => {
234                                :uuid => uuid
235                              },
236                              :body => {
237                                :api_token => ENV['ARVADOS_API_TOKEN']
238                              },
239                              :authenticated => false)
240     j = JSON.parse result.body, :symbolize_names => true
241     unless j.is_a? Hash and j[:uuid]
242       debuglog "Failed to get pipeline_instance: #{j[:errors] rescue nil}", 0
243       nil
244     else
245       debuglog "Retrieved pipeline_instance #{j[:uuid]}"
246       self.new(j)
247     end
248   end
249   def self.create(attributes)
250     result = $client.execute(:api_method => $arvados.pipeline_instances.create,
251                              :body => {
252                                :api_token => ENV['ARVADOS_API_TOKEN'],
253                                :pipeline_instance => attributes
254                              },
255                              :authenticated => false)
256     j = JSON.parse result.body, :symbolize_names => true
257     unless j.is_a? Hash and j[:uuid]
258       abort "Failed to create pipeline_instance: #{j[:errors] rescue nil} #{j.inspect}"
259     end
260     debuglog "Created pipeline instance: #{j[:uuid]}"
261     self.new(j)
262   end
263   def save
264     result = $client.execute(:api_method => $arvados.pipeline_instances.update,
265                              :parameters => {
266                                :uuid => @pi[:uuid]
267                              },
268                              :body => {
269                                :api_token => ENV['ARVADOS_API_TOKEN'],
270                                :pipeline_instance => @attributes_to_update.to_json
271                              },
272                              :authenticated => false)
273     j = JSON.parse result.body, :symbolize_names => true
274     unless j.is_a? Hash and j[:uuid]
275       debuglog "Failed to save pipeline_instance: #{j[:errors] rescue nil}", 0
276       nil
277     else
278       @attributes_to_update = {}
279       @pi = j
280     end
281   end
282   def []=(x,y)
283     @attributes_to_update[x] = y
284     @pi[x] = y
285   end
286   def [](x)
287     @pi[x]
288   end
289   protected
290   def initialize(j)
291     @attributes_to_update = {}
292     @pi = j
293   end
294 end
295
296 class JobCache
297   def self.get(uuid)
298     @cache ||= {}
299     result = $client.execute(:api_method => $arvados.jobs.get,
300                              :parameters => {
301                                :api_token => ENV['ARVADOS_API_TOKEN'],
302                                :uuid => uuid
303                              },
304                              :authenticated => false)
305     @cache[uuid] = JSON.parse result.body, :symbolize_names => true
306   end
307   def self.where(conditions)
308     result = $client.execute(:api_method => $arvados.jobs.list,
309                              :parameters => {
310                                :api_token => ENV['ARVADOS_API_TOKEN'],
311                                :limit => 10000,
312                                :where => conditions.to_json
313                              },
314                              :authenticated => false)
315     list = JSON.parse result.body, :symbolize_names => true
316     if list and list[:items].is_a? Array
317       list[:items]
318     else
319       []
320     end
321   end
322   def self.create(attributes)
323     @cache ||= {}
324     result = $client.execute(:api_method => $arvados.jobs.create,
325                              :parameters => {
326                                :api_token => ENV['ARVADOS_API_TOKEN'],
327                                :job => attributes.to_json
328                              },
329                              :authenticated => false)
330     j = JSON.parse result.body, :symbolize_names => true
331     if j.is_a? Hash and j[:uuid]
332       @cache[j[:uuid]] = j
333     else
334       debuglog "create job: #{j[:errors] rescue nil}", 0
335       nil
336     end
337   end
338 end
339
340 class WhRunPipelineInstance
341   attr_reader :instance
342
343   def initialize(_options)
344     @options = _options
345   end
346
347   def fetch_template(template)
348     if template.match /[^-0-9a-z]/
349       # Doesn't look like a uuid -- use it as a filename.
350       @template = JSON.parse File.read(template), :symbolize_names => true
351       if !@template[:components]
352         abort ("#{$0}: Template loaded from #{template} " +
353                "does not have a \"components\" key")
354       end
355     else
356       result = $client.execute(:api_method => $arvados.pipeline_templates.get,
357                                :parameters => {
358                                  :api_token => ENV['ARVADOS_API_TOKEN'],
359                                  :uuid => template
360                                },
361                                :authenticated => false)
362       @template = JSON.parse result.body, :symbolize_names => true
363       if !@template[:uuid]
364         abort "#{$0}: fatal: failed to retrieve pipeline template #{template} #{@template[:errors].inspect rescue nil}"
365       end
366     end
367     self
368   end
369
370   def fetch_instance(instance_uuid)
371     @instance = PipelineInstance.find(instance_uuid)
372     @template = @instance
373     self
374   end
375
376   def apply_parameters(params_args)
377     params_args.shift if params_args[0] == '--'
378     params = {}
379     while !params_args.empty?
380       if (re = params_args[0].match /^(--)?([^-].*?)=(.+)/)
381         params[re[2]] = re[3]
382         params_args.shift
383       elsif params_args.size > 1
384         param = params_args.shift.sub /^--/, ''
385         params[param] = params_args.shift
386       else
387         abort "Syntax error: I do not know what to do with arg \"#{params_args[0]}\""
388       end
389     end
390
391     @components = @template[:components].dup
392
393     errors = []
394     @components.each do |componentname, component|
395       component[:script_parameters].each do |parametername, parameter|
396         parameter = { :value => parameter } unless parameter.is_a? Hash
397         value =
398           (params["#{componentname}::#{parametername}"] ||
399            parameter[:value] ||
400            (parameter[:output_of].nil? &&
401             (params[parametername.to_s] ||
402              parameter[:default])) ||
403            nil)
404         if value.nil? and
405             ![false,'false',0,'0'].index parameter[:required]
406           if parameter[:output_of]
407             next
408           end
409           errors << [componentname, parametername, "required parameter is missing"]
410         end
411         debuglog "parameter #{componentname}::#{parametername} == #{value}"
412         component[:script_parameters][parametername] = value
413       end
414     end
415     if !errors.empty?
416       abort "Errors:\n#{errors.collect { |c,p,e| "#{c}::#{p} - #{e}\n" }.join ""}"
417     end
418     debuglog "options=" + @options.pretty_inspect
419     self
420   end
421
422   def setup_instance
423     @instance ||= PipelineInstance.
424       create(:components => @components,
425              :pipeline_template_uuid => @template[:uuid],
426              :active => true)
427     self
428   end
429
430   def run
431     moretodo = true
432     while moretodo
433       moretodo = false
434       @components.each do |cname, c|
435         job = nil
436         if !c[:job] and
437             c[:script_parameters].select { |pname, p| p.is_a? Hash }.empty?
438           # Job is fully specified (all parameter values are present) but
439           # no particular job has been found.
440
441           debuglog "component #{cname} ready to satisfy."
442
443           c.delete :wait
444           second_place_job = nil # satisfies component, but not finished yet
445
446           (@options[:no_reuse] ? [] : JobCache.
447            where(script: c[:script],
448                  script_parameters: c[:script_parameters],
449                  script_version_descends_from: c[:script_version])
450            ).each do |candidate_job|
451             candidate_params_downcase = Hash[candidate_job[:script_parameters].
452                                              map { |k,v| [k.downcase,v] }]
453             c_params_downcase = Hash[c[:script_parameters].
454                                      map { |k,v| [k.downcase,v] }]
455
456             debuglog "component #{cname} considering job #{candidate_job[:uuid]} version #{candidate_job[:script_version]} parameters #{candidate_params_downcase.inspect}", 3
457
458             unless candidate_params_downcase == c_params_downcase
459               next
460             end
461
462             if c[:script_version] !=
463                 candidate_job[:script_version][0,c[:script_version].length]
464               debuglog "component #{cname} would be satisfied by job #{candidate_job[:uuid]} if script_version matched.", 2
465               next
466             end
467
468             unless candidate_job[:success] || candidate_job[:running] ||
469                 (!candidate_job[:started_at] && !candidate_job[:cancelled_at])
470               debuglog "component #{cname} would be satisfied by job #{candidate_job[:uuid]} if it were running or successful.", 2
471               next
472             end
473
474             if candidate_job[:success]
475               unless @options[:no_reuse_finished]
476                 job = candidate_job
477                 $stderr.puts "using #{job[:uuid]} (finished at #{job[:finished_at]}) for component #{cname}"
478                 c[:job] = job
479               end
480             else
481               second_place_job ||= candidate_job
482             end
483             break
484           end
485           if not c[:job] and second_place_job
486             job = second_place_job
487             $stderr.puts "using #{job[:uuid]} (running since #{job[:started_at]}) for component #{cname}"
488             c[:job] = job
489           end
490           if not c[:job]
491             debuglog "component #{cname} not satisfied by any existing job."
492             if !@options[:dry_run]
493               debuglog "component #{cname} new job."
494               job = JobCache.create(:script => c[:script],
495                                     :script_parameters => c[:script_parameters],
496                                     :runtime_constraints => c[:runtime_constraints] || {},
497                                     :script_version => c[:script_version] || 'master')
498               if job
499                 debuglog "component #{cname} new job #{job[:uuid]}"
500                 c[:job] = job
501               else
502                 debuglog "component #{cname} new job failed"
503               end
504             end
505           end
506         else
507           c[:wait] = true
508         end
509         if c[:job] and c[:job][:uuid]
510           if (c[:job][:running] or
511               not (c[:job][:finished_at] or c[:job][:cancelled_at]))
512             c[:job] = JobCache.get(c[:job][:uuid])
513           end
514           if c[:job][:success]
515             # Populate script_parameters of other components waiting for
516             # this job
517             @components.each do |c2name, c2|
518               c2[:script_parameters].each do |pname, p|
519                 if p.is_a? Hash and p[:output_of] == cname.to_s
520                   debuglog "parameter #{c2name}::#{pname} == #{c[:job][:output]}"
521                   c2[:script_parameters][pname] = c[:job][:output]
522                   moretodo = true
523                 end
524               end
525             end
526           elsif c[:job][:running] ||
527               (!c[:job][:started_at] && !c[:job][:cancelled_at])
528             moretodo = true
529           elsif c[:job][:cancelled_at]
530             debuglog "component #{cname} job #{c[:job][:uuid]} cancelled."
531           end
532         end
533       end
534       @instance[:components] = @components
535       @instance[:active] = moretodo
536       report_status
537
538       if @options[:no_wait]
539         moretodo = false
540       end
541
542       if moretodo
543         begin
544           sleep 10
545         rescue Interrupt
546           debuglog "interrupt", 0
547           abort
548         end
549       end
550     end
551
552     ended = 0
553     succeeded = 0
554     failed = 0
555     @components.each do |cname, c|
556       if c[:job]
557         if c[:job][:finished_at]
558           ended += 1
559           if c[:job][:success] == true
560             succeeded += 1
561           elsif c[:job][:success] == false
562             failed += 1
563           end
564         end
565       end
566     end
567     
568     if ended == @components.length or failed > 0
569       @instance[:active] = false
570       @instance[:success] = (succeeded == @components.length)
571     end
572
573     @instance.save
574   end
575
576   def cleanup
577     if @instance
578       @instance[:active] = false
579       @instance.save
580     end
581   end
582
583   def uuid
584     @instance[:uuid]
585   end
586
587   protected
588
589   def report_status
590     @instance.save
591
592     if @options[:status_json] != '/dev/null'
593       File.open(@options[:status_json], 'w') do |f|
594         f.puts @components.pretty_inspect
595       end
596     end
597
598     if @options[:status_text] != '/dev/null'
599       File.open(@options[:status_text], 'w') do |f|
600         f.puts ""
601         f.puts "#{Time.now} -- pipeline_instance #{@instance[:uuid]}"
602         namewidth = @components.collect { |cname, c| cname.size }.max
603         @components.each do |cname, c|
604           jstatus = if !c[:job]
605                       "-"
606                     elsif c[:job][:running]
607                       "#{c[:job][:tasks_summary].inspect}"
608                     elsif c[:job][:success]
609                       c[:job][:output]
610                     elsif c[:job][:cancelled_at]
611                       "cancelled #{c[:job][:cancelled_at]}"
612                     elsif c[:job][:finished_at]
613                       "failed #{c[:job][:finished_at]}"
614                     elsif c[:job][:started_at]
615                       "started #{c[:job][:started_at]}"
616                     else
617                       "queued #{c[:job][:created_at]}"
618                     end
619           f.puts "#{cname.to_s.ljust namewidth} #{c[:job] ? c[:job][:uuid] : '-'.ljust(27)} #{jstatus}"
620         end
621       end
622     end
623   end
624 end
625
626 runner = WhRunPipelineInstance.new($options)
627 begin
628   if $options[:template]
629     runner.fetch_template($options[:template])
630   else
631     runner.fetch_instance($options[:instance])
632   end
633   runner.apply_parameters(p.leftovers)
634   runner.setup_instance
635   if $options[:submit]
636     runner.instance.save
637     puts runner.instance[:uuid]
638   else
639     runner.run
640   end
641 rescue Exception => e
642   runner.cleanup
643   raise e
644 end