Merge branch '1965-fuse-support-directories'
[arvados.git] / sdk / cli / bin / arv-run-pipeline-instance
1 #!/usr/bin/env ruby
2
3 # == Synopsis
4 #
5 #  wh-run-pipeline-instance --template pipeline-template-uuid [options] [--] [parameters]
6 #  wh-run-pipeline-instance --instance pipeline-instance-uuid [options]
7 #
8 # Satisfy a pipeline template by finding or submitting a mapreduce job
9 # for each pipeline component.
10 #
11 # == Options
12 #
13 # [--template uuid] Use the specified pipeline template.
14 #
15 # [--template path] Load the pipeline template from the specified
16 #                   local file.
17 #
18 # [--instance uuid] Use the specified pipeline instance.
19 #
20 # [-n, --dry-run] Do not start any new jobs or wait for existing jobs
21 #                 to finish. Just find out whether jobs are finished,
22 #                 queued, or running for each component
23 #
24 # [--create-instance-only] Do not try to satisfy any components. Just
25 #                          create an instance, print its UUID to
26 #                          stdout, and exit.
27 #
28 # [--no-wait] Make only as much progress as possible without entering
29 #             a sleep/poll loop.
30 #
31 # [--no-reuse-finished] Do not reuse existing outputs to satisfy
32 #                       pipeline components. Always submit a new job
33 #                       or use an existing job which has not yet
34 #                       finished.
35 #
36 # [--no-reuse] Do not reuse existing jobs to satisfy pipeline
37 #              components. Submit a new job for every component.
38 #
39 # [--debug] Print extra debugging information on stderr.
40 #
41 # [--debug-level N] Increase amount of debugging information. Default
42 #                   1, possible range 0..3.
43 #
44 # [--status-text path] Print plain text status report to a file or
45 #                      fifo. Default: /dev/stdout
46 #
47 # [--status-json path] Print JSON status report to a file or
48 #                      fifo. Default: /dev/null
49 #
50 # == Parameters
51 #
52 # [param_name=param_value]
53 #
54 # [param_name param_value] Set (or override) the default value for
55 #                          every parameter with the given name.
56 #
57 # [component_name::param_name=param_value]
58 # [component_name::param_name param_value]
59 # [--component_name::param_name=param_value]
60 # [--component_name::param_name param_value] Set the value of a
61 #                                            parameter for a single
62 #                                            component.
63 #
64 class WhRunPipelineInstance
65 end
66
67 $application_version = 1.0
68
69 if RUBY_VERSION < '1.9.3' then
70   abort <<-EOS
71 #{$0.gsub(/^\.\//,'')} requires Ruby version 1.9.3 or higher.
72   EOS
73 end
74
75 $arvados_api_version = ENV['ARVADOS_API_VERSION'] || 'v1'
76 $arvados_api_host = ENV['ARVADOS_API_HOST'] or
77   abort "#{$0}: fatal: ARVADOS_API_HOST environment variable not set."
78 $arvados_api_token = ENV['ARVADOS_API_TOKEN'] or
79   abort "#{$0}: fatal: ARVADOS_API_TOKEN environment variable not set."
80
81 begin
82   require 'rubygems'
83   require 'google/api_client'
84   require 'json'
85   require 'pp'
86   require 'trollop'
87 rescue LoadError
88   abort <<-EOS
89 #{$0}: fatal: some runtime dependencies are missing.
90 Try: gem install pp google-api-client json trollop
91   EOS
92 end
93
94 def debuglog(message, verbosity=1)
95   $stderr.puts "#{File.split($0).last} #{$$}: #{message}" if $debuglevel >= verbosity
96 end
97
98 module Kernel
99   def suppress_warnings
100     original_verbosity = $VERBOSE
101     $VERBOSE = nil
102     result = yield
103     $VERBOSE = original_verbosity
104     return result
105   end
106 end
107
108 if $arvados_api_host.match /local/
109   # You probably don't care about SSL certificate checks if you're
110   # testing with a dev server.
111   suppress_warnings { OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE }
112 end
113
114 class Google::APIClient
115   def discovery_document(api, version)
116     api = api.to_s
117     return @discovery_documents["#{api}:#{version}"] ||=
118       begin
119         response = self.execute!(
120                                  :http_method => :get,
121                                  :uri => self.discovery_uri(api, version),
122                                  :authenticated => false
123                                  )
124         response.body.class == String ? JSON.parse(response.body) : response.body
125       end
126   end
127 end
128
129
130 # Parse command line options (the kind that control the behavior of
131 # this program, that is, not the pipeline component parameters).
132
133 p = Trollop::Parser.new do
134   version __FILE__
135   opt(:dry_run,
136       "Do not start any new jobs or wait for existing jobs to finish. Just find out whether jobs are finished, queued, or running for each component.",
137       :type => :boolean,
138       :short => :n)
139   opt(:status_text,
140       "Store plain text status in given file.",
141       :short => :none,
142       :type => :string,
143       :default => '/dev/stdout')
144   opt(:status_json,
145       "Store json-formatted pipeline in given file.",
146       :short => :none,
147       :type => :string,
148       :default => '/dev/null')
149   opt(:no_wait,
150       "Do not wait for jobs to finish. Just look up status, submit new jobs if needed, and exit.",
151       :short => :none,
152       :type => :boolean)
153   opt(:no_reuse_finished,
154       "Do not reuse existing outputs to satisfy pipeline components. Always submit a new job or use an existing job which has not yet finished.",
155       :short => :none,
156       :type => :boolean)
157   opt(:no_reuse,
158       "Do not reuse existing jobs to satisfy pipeline components. Submit a new job for every component.",
159       :short => :none,
160       :type => :boolean)
161   opt(:debug,
162       "Print extra debugging information on stderr.",
163       :type => :boolean)
164   opt(:debug_level,
165       "Set debug verbosity level.",
166       :short => :none,
167       :type => :integer)
168   opt(:template,
169       "UUID of pipeline template, or path to local pipeline template file.",
170       :short => :none,
171       :type => :string)
172   opt(:instance,
173       "UUID of pipeline instance.",
174       :short => :none,
175       :type => :string)
176   opt(:create_instance_only,
177       "Do not try to satisfy any components. Just create a pipeline instance and output its UUID.",
178       :short => :none,
179       :type => :boolean)
180   stop_on [:'--']
181 end
182 $options = Trollop::with_standard_exception_handling p do
183   p.parse ARGV
184 end
185 $debuglevel = $options[:debug_level] || ($options[:debug] && 1) || 0
186
187 if $options[:instance]
188   if $options[:template] or $options[:create_instance_only]
189     abort "#{$0}: syntax error: --instance cannot be combined with --template or --create-instance-only."
190   end
191 elsif not $options[:template]
192   abort "#{$0}: syntax error: you must supply a --template or --instance."
193 end
194
195 # Set up the API client.
196
197 $client ||= Google::APIClient.
198   new(:host => $arvados_api_host,
199       :application_name => File.split($0).last,
200       :application_version => $application_version.to_s)
201 $arvados = $client.discovered_api('arvados', $arvados_api_version)
202
203
204 class PipelineInstance
205   def self.find(uuid)
206     result = $client.execute(:api_method => $arvados.pipeline_instances.get,
207                              :parameters => {
208                                :uuid => uuid
209                              },
210                              :body => {
211                                :api_token => ENV['ARVADOS_API_TOKEN']
212                              },
213                              :authenticated => false)
214     j = JSON.parse result.body, :symbolize_names => true
215     unless j.is_a? Hash and j[:uuid]
216       debuglog "Failed to get pipeline_instance: #{j[:errors] rescue nil}", 0
217       nil
218     else
219       debuglog "Retrieved pipeline_instance #{j[:uuid]}"
220       self.new(j)
221     end
222   end
223   def self.create(attributes)
224     result = $client.execute(:api_method => $arvados.pipeline_instances.create,
225                              :body => {
226                                :api_token => ENV['ARVADOS_API_TOKEN'],
227                                :pipeline_instance => attributes
228                              },
229                              :authenticated => false)
230     j = JSON.parse result.body, :symbolize_names => true
231     unless j.is_a? Hash and j[:uuid]
232       abort "Failed to create pipeline_instance: #{j[:errors] rescue nil} #{j.inspect}"
233     end
234     debuglog "Created pipeline instance: #{j[:uuid]}"
235     self.new(j)
236   end
237   def save
238     result = $client.execute(:api_method => $arvados.pipeline_instances.update,
239                              :parameters => {
240                                :uuid => @pi[:uuid]
241                              },
242                              :body => {
243                                :api_token => ENV['ARVADOS_API_TOKEN'],
244                                :pipeline_instance => @attributes_to_update.to_json
245                              },
246                              :authenticated => false)
247     j = JSON.parse result.body, :symbolize_names => true
248     unless j.is_a? Hash and j[:uuid]
249       debuglog "Failed to save pipeline_instance: #{j[:errors] rescue nil}", 0
250       nil
251     else
252       @attributes_to_update = {}
253       @pi = j
254     end
255   end
256   def []=(x,y)
257     @attributes_to_update[x] = y
258     @pi[x] = y
259   end
260   def [](x)
261     @pi[x]
262   end
263   protected
264   def initialize(j)
265     @attributes_to_update = {}
266     @pi = j
267   end
268 end
269
270 class JobCache
271   def self.get(uuid)
272     @cache ||= {}
273     result = $client.execute(:api_method => $arvados.jobs.get,
274                              :parameters => {
275                                :api_token => ENV['ARVADOS_API_TOKEN'],
276                                :uuid => uuid
277                              },
278                              :authenticated => false)
279     @cache[uuid] = JSON.parse result.body, :symbolize_names => true
280   end
281   def self.where(conditions)
282     result = $client.execute(:api_method => $arvados.jobs.list,
283                              :parameters => {
284                                :api_token => ENV['ARVADOS_API_TOKEN'],
285                                :limit => 10000,
286                                :where => conditions.to_json
287                              },
288                              :authenticated => false)
289     list = JSON.parse result.body, :symbolize_names => true
290     if list and list[:items].is_a? Array
291       list[:items]
292     else
293       []
294     end
295   end
296   def self.create(attributes)
297     @cache ||= {}
298     result = $client.execute(:api_method => $arvados.jobs.create,
299                              :parameters => {
300                                :api_token => ENV['ARVADOS_API_TOKEN'],
301                                :job => attributes.to_json
302                              },
303                              :authenticated => false)
304     j = JSON.parse result.body, :symbolize_names => true
305     if j.is_a? Hash and j[:uuid]
306       @cache[j[:uuid]] = j
307     else
308       debuglog "create job: #{j[:errors] rescue nil}", 0
309       nil
310     end
311   end
312 end
313
314 class WhRunPipelineInstance
315   attr_reader :instance
316
317   def initialize(_options)
318     @options = _options
319   end
320
321   def fetch_template(template)
322     if template.match /[^-0-9a-z]/
323       # Doesn't look like a uuid -- use it as a filename.
324       @template = JSON.parse File.read(template), :symbolize_names => true
325       if !@template[:components]
326         abort ("#{$0}: Template loaded from #{template} " +
327                "does not have a \"components\" key")
328       end
329     else
330       result = $client.execute(:api_method => $arvados.pipeline_templates.get,
331                                :parameters => {
332                                  :api_token => ENV['ARVADOS_API_TOKEN'],
333                                  :uuid => template
334                                },
335                                :authenticated => false)
336       @template = JSON.parse result.body, :symbolize_names => true
337       if !@template[:uuid]
338         abort "#{$0}: fatal: failed to retrieve pipeline template #{template} #{@template[:errors].inspect rescue nil}"
339       end
340     end
341     self
342   end
343
344   def fetch_instance(instance_uuid)
345     @instance = PipelineInstance.find(instance_uuid)
346     @template = @instance
347     self
348   end
349
350   def apply_parameters(params_args)
351     params_args.shift if params_args[0] == '--'
352     params = {}
353     while !params_args.empty?
354       if (re = params_args[0].match /^(--)?([^-].*?)=(.+)/)
355         params[re[2]] = re[3]
356         params_args.shift
357       elsif params_args.size > 1
358         param = params_args.shift.sub /^--/, ''
359         params[param] = params_args.shift
360       else
361         abort "Syntax error: I do not know what to do with arg \"#{params_args[0]}\""
362       end
363     end
364
365     @components = @template[:components].dup
366
367     errors = []
368     @components.each do |componentname, component|
369       component[:script_parameters].each do |parametername, parameter|
370         parameter = { :value => parameter } unless parameter.is_a? Hash
371         value =
372           (params["#{componentname}::#{parametername}"] ||
373            parameter[:value] ||
374            (parameter[:output_of].nil? &&
375             (params[parametername.to_s] ||
376              parameter[:default])) ||
377            nil)
378         if value.nil? and
379             ![false,'false',0,'0'].index parameter[:required]
380           if parameter[:output_of]
381             next
382           end
383           errors << [componentname, parametername, "required parameter is missing"]
384         end
385         debuglog "parameter #{componentname}::#{parametername} == #{value}"
386         component[:script_parameters][parametername] = value
387       end
388     end
389     if !errors.empty?
390       abort "Errors:\n#{errors.collect { |c,p,e| "#{c}::#{p} - #{e}\n" }.join ""}"
391     end
392     debuglog "options=" + @options.pretty_inspect
393     self
394   end
395
396   def setup_instance
397     @instance ||= PipelineInstance.
398       create(:components => @components,
399              :pipeline_template_uuid => @template[:uuid],
400              :active => true)
401     self
402   end
403
404   def run
405     moretodo = true
406     while moretodo
407       moretodo = false
408       @components.each do |cname, c|
409         job = nil
410         if !c[:job] and
411             c[:script_parameters].select { |pname, p| p.is_a? Hash }.empty?
412           # Job is fully specified (all parameter values are present) but
413           # no particular job has been found.
414
415           debuglog "component #{cname} ready to satisfy."
416
417           c.delete :wait
418           second_place_job = nil # satisfies component, but not finished yet
419
420           (@options[:no_reuse] ? [] : JobCache.
421            where(script: c[:script],
422                  script_parameters: c[:script_parameters],
423                  script_version_descends_from: c[:script_version])
424            ).each do |candidate_job|
425             candidate_params_downcase = Hash[candidate_job[:script_parameters].
426                                              map { |k,v| [k.downcase,v] }]
427             c_params_downcase = Hash[c[:script_parameters].
428                                      map { |k,v| [k.downcase,v] }]
429
430             debuglog "component #{cname} considering job #{candidate_job[:uuid]} version #{candidate_job[:script_version]} parameters #{candidate_params_downcase.inspect}", 3
431
432             unless candidate_params_downcase == c_params_downcase
433               next
434             end
435
436             if c[:script_version] !=
437                 candidate_job[:script_version][0,c[:script_version].length]
438               debuglog "component #{cname} would be satisfied by job #{candidate_job[:uuid]} if script_version matched.", 2
439               next
440             end
441
442             unless candidate_job[:success] || candidate_job[:running] ||
443                 (!candidate_job[:started_at] && !candidate_job[:cancelled_at])
444               debuglog "component #{cname} would be satisfied by job #{candidate_job[:uuid]} if it were running or successful.", 2
445               next
446             end
447
448             if candidate_job[:success]
449               unless @options[:no_reuse_finished]
450                 job = candidate_job
451                 $stderr.puts "using #{job[:uuid]} (finished at #{job[:finished_at]}) for component #{cname}"
452                 c[:job] = job
453               end
454             else
455               second_place_job ||= candidate_job
456             end
457             break
458           end
459           if not c[:job] and second_place_job
460             job = second_place_job
461             $stderr.puts "using #{job[:uuid]} (running since #{job[:started_at]}) for component #{cname}"
462             c[:job] = job
463           end
464           if not c[:job]
465             debuglog "component #{cname} not satisfied by any existing job."
466             if !@options[:dry_run]
467               debuglog "component #{cname} new job."
468               job = JobCache.create(:script => c[:script],
469                                     :script_parameters => c[:script_parameters],
470                                     :runtime_constraints => c[:runtime_constraints] || {},
471                                     :script_version => c[:script_version] || 'master')
472               if job
473                 debuglog "component #{cname} new job #{job[:uuid]}"
474                 c[:job] = job
475               else
476                 debuglog "component #{cname} new job failed"
477               end
478             end
479           end
480         else
481           c[:wait] = true
482         end
483         if c[:job] and c[:job][:uuid]
484           if (c[:job][:running] or
485               not (c[:job][:finished_at] or c[:job][:cancelled_at]))
486             c[:job] = JobCache.get(c[:job][:uuid])
487           end
488           if c[:job][:success]
489             # Populate script_parameters of other components waiting for
490             # this job
491             @components.each do |c2name, c2|
492               c2[:script_parameters].each do |pname, p|
493                 if p.is_a? Hash and p[:output_of] == cname.to_s
494                   debuglog "parameter #{c2name}::#{pname} == #{c[:job][:output]}"
495                   c2[:script_parameters][pname] = c[:job][:output]
496                 end
497               end
498             end
499           elsif c[:job][:running] ||
500               (!c[:job][:started_at] && !c[:job][:cancelled_at])
501             moretodo ||= !@options[:no_wait]
502           elsif c[:job][:cancelled_at]
503             debuglog "component #{cname} job #{c[:job][:uuid]} cancelled."
504           end
505         end
506       end
507       @instance[:components] = @components
508       @instance[:active] = moretodo
509       report_status
510       if moretodo
511         begin
512           sleep 10
513         rescue Interrupt
514           debuglog "interrupt", 0
515           abort
516         end
517       end
518     end
519     @instance[:success] = @components.reject { |cname,c| c[:job] and c[:job][:success] }.empty?
520     @instance.save
521   end
522
523   def cleanup
524     if @instance
525       @instance[:active] = false
526       @instance.save
527     end
528   end
529
530   def uuid
531     @instance[:uuid]
532   end
533
534   protected
535
536   def report_status
537     @instance.save
538
539     if @options[:status_json] != '/dev/null'
540       File.open(@options[:status_json], 'w') do |f|
541         f.puts @components.pretty_inspect
542       end
543     end
544
545     if @options[:status_text] != '/dev/null'
546       File.open(@options[:status_text], 'w') do |f|
547         f.puts ""
548         f.puts "#{Time.now} -- pipeline_instance #{@instance[:uuid]}"
549         namewidth = @components.collect { |cname, c| cname.size }.max
550         @components.each do |cname, c|
551           jstatus = if !c[:job]
552                       "-"
553                     elsif c[:job][:running]
554                       "#{c[:job][:tasks_summary].inspect}"
555                     elsif c[:job][:success]
556                       c[:job][:output]
557                     elsif c[:job][:cancelled_at]
558                       "cancelled #{c[:job][:cancelled_at]}"
559                     elsif c[:job][:finished_at]
560                       "failed #{c[:job][:finished_at]}"
561                     elsif c[:job][:started_at]
562                       "started #{c[:job][:started_at]}"
563                     else
564                       "queued #{c[:job][:created_at]}"
565                     end
566           f.puts "#{cname.to_s.ljust namewidth} #{c[:job] ? c[:job][:uuid] : '-'.ljust(27)} #{jstatus}"
567         end
568       end
569     end
570   end
571 end
572
573 runner = WhRunPipelineInstance.new($options)
574 begin
575   if $options[:template]
576     runner.fetch_template($options[:template])
577   else
578     runner.fetch_instance($options[:instance])
579   end
580   runner.apply_parameters(p.leftovers)
581   runner.setup_instance
582   if $options[:create_instance_only]
583     runner.instance.save
584     puts runner.instance[:uuid]
585   else
586     runner.run
587   end
588 rescue Exception => e
589   runner.cleanup
590   raise e
591 end