Use a simple 'open' for fetch_block (freeing up an extra filehandle). refs #2325...
[arvados.git] / sdk / cli / bin / arv-run-pipeline-instance
1 #!/usr/bin/env ruby
2
3 # == Synopsis
4 #
5 #  arv-run-pipeline-instance --template pipeline-template-uuid [options] [--] [parameters]
6 #  arv-run-pipeline-instance --instance pipeline-instance-uuid [options]
7 #
8 # Satisfy a pipeline template by finding or submitting a mapreduce job
9 # for each pipeline component.
10 #
11 # == Options
12 #
13 # [--template uuid] Use the specified pipeline template.
14 #
15 # [--template path] Load the pipeline template from the specified
16 #                   local file.
17 #
18 # [--instance uuid] Use the specified pipeline instance.
19 #
20 # [-n, --dry-run] Do not start any new jobs or wait for existing jobs
21 #                 to finish. Just find out whether jobs are finished,
22 #                 queued, or running for each component
23 #
24 # [--submit] Do not try to satisfy any components. Just
25 #                          create an instance, print its UUID to
26 #                          stdout, and exit.
27 #
28 # [--no-wait] Make only as much progress as possible without entering
29 #             a sleep/poll loop.
30 #
31 # [--no-reuse] Do not reuse existing jobs to satisfy pipeline
32 #              components. Submit a new job for every component.
33 #
34 # [--debug] Print extra debugging information on stderr.
35 #
36 # [--debug-level N] Increase amount of debugging information. Default
37 #                   1, possible range 0..3.
38 #
39 # [--status-text path] Print plain text status report to a file or
40 #                      fifo. Default: /dev/stdout
41 #
42 # [--status-json path] Print JSON status report to a file or
43 #                      fifo. Default: /dev/null
44 #
45 # == Parameters
46 #
47 # [param_name=param_value]
48 #
49 # [param_name param_value] Set (or override) the default value for
50 #                          every parameter with the given name.
51 #
52 # [component_name::param_name=param_value]
53 # [component_name::param_name param_value]
54 # [--component_name::param_name=param_value]
55 # [--component_name::param_name param_value] Set the value of a
56 #                                            parameter for a single
57 #                                            component.
58 #
59 class WhRunPipelineInstance
60 end
61
62 $application_version = 1.0
63
64 if RUBY_VERSION < '1.9.3' then
65   abort <<-EOS
66 #{$0.gsub(/^\.\//,'')} requires Ruby version 1.9.3 or higher.
67   EOS
68 end
69
70 $arvados_api_version = ENV['ARVADOS_API_VERSION'] || 'v1'
71 $arvados_api_host = ENV['ARVADOS_API_HOST'] or
72   abort "#{$0}: fatal: ARVADOS_API_HOST environment variable not set."
73 $arvados_api_token = ENV['ARVADOS_API_TOKEN'] or
74   abort "#{$0}: fatal: ARVADOS_API_TOKEN environment variable not set."
75
76 begin
77   require 'rubygems'
78   require 'json'
79   require 'pp'
80   require 'trollop'
81   require 'google/api_client'
82 rescue LoadError => l
83   puts $:
84   abort <<-EOS
85 #{$0}: fatal: #{l.message}
86 Some runtime dependencies may be missing.
87 Try: gem install pp google-api-client json trollop
88   EOS
89 end
90
91 def debuglog(message, verbosity=1)
92   $stderr.puts "#{File.split($0).last} #{$$}: #{message}" if $debuglevel >= verbosity
93 end
94
95 module Kernel
96   def suppress_warnings
97     original_verbosity = $VERBOSE
98     $VERBOSE = nil
99     result = yield
100     $VERBOSE = original_verbosity
101     return result
102   end
103 end
104
105 if $arvados_api_host.match /local/
106   # You probably don't care about SSL certificate checks if you're
107   # testing with a dev server.
108   suppress_warnings { OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE }
109 end
110
111 class Google::APIClient
112   def discovery_document(api, version)
113     api = api.to_s
114     return @discovery_documents["#{api}:#{version}"] ||=
115       begin
116         response = self.execute!(
117                                  :http_method => :get,
118                                  :uri => self.discovery_uri(api, version),
119                                  :authenticated => false
120                                  )
121         response.body.class == String ? JSON.parse(response.body) : response.body
122       end
123   end
124 end
125
126
127 # Parse command line options (the kind that control the behavior of
128 # this program, that is, not the pipeline component parameters).
129
130 p = Trollop::Parser.new do
131   version __FILE__
132   opt(:dry_run,
133       "Do not start any new jobs or wait for existing jobs to finish. Just find out whether jobs are finished, queued, or running for each component.",
134       :type => :boolean,
135       :short => :n)
136   opt(:status_text,
137       "Store plain text status in given file.",
138       :short => :none,
139       :type => :string,
140       :default => '/dev/stdout')
141   opt(:status_json,
142       "Store json-formatted pipeline in given file.",
143       :short => :none,
144       :type => :string,
145       :default => '/dev/null')
146   opt(:no_wait,
147       "Do not wait for jobs to finish. Just look up status, submit new jobs if needed, and exit.",
148       :short => :none,
149       :type => :boolean)
150   opt(:no_reuse,
151       "Do not reuse existing jobs to satisfy pipeline components. Submit a new job for every component.",
152       :short => :none,
153       :type => :boolean)
154   opt(:debug,
155       "Print extra debugging information on stderr.",
156       :type => :boolean)
157   opt(:debug_level,
158       "Set debug verbosity level.",
159       :short => :none,
160       :type => :integer)
161   opt(:template,
162       "UUID of pipeline template, or path to local pipeline template file.",
163       :short => :none,
164       :type => :string)
165   opt(:instance,
166       "UUID of pipeline instance.",
167       :short => :none,
168       :type => :string)
169   opt(:submit,
170       "Do not try to satisfy any components. Just create a pipeline instance and output its UUID.",
171       :short => :none,
172       :type => :boolean)
173   opt(:run_here,
174       "Manage the pipeline in process.",
175       :short => :none,
176       :type => :boolean)
177   stop_on [:'--']
178 end
179 $options = Trollop::with_standard_exception_handling p do
180   p.parse ARGV
181 end
182 $debuglevel = $options[:debug_level] || ($options[:debug] && 1) || 0
183
184 if $options[:instance]
185   if $options[:template] or $options[:submit]
186     abort "#{$0}: syntax error: --instance cannot be combined with --template or --submit."
187   end
188 elsif not $options[:template]
189   abort "#{$0}: syntax error: you must supply a --template or --instance."
190 end
191
192 if $options[:run_here] == $options[:submit]
193   abort "#{$0}: syntax error: you must supply either --run-here or --submit."
194 end
195
196 # Suppress SSL certificate checks if ARVADOS_API_HOST_INSECURE
197
198 module Kernel
199   def suppress_warnings
200     original_verbosity = $VERBOSE
201     $VERBOSE = nil
202     result = yield
203     $VERBOSE = original_verbosity
204     return result
205   end
206 end
207
208 if ENV['ARVADOS_API_HOST_INSECURE']
209   suppress_warnings { OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE }
210 end
211
212 # Set up the API client.
213
214 $client ||= Google::APIClient.
215   new(:host => $arvados_api_host,
216       :application_name => File.split($0).last,
217       :application_version => $application_version.to_s)
218 $arvados = $client.discovered_api('arvados', $arvados_api_version)
219
220
221 class PipelineInstance
222   def self.find(uuid)
223     result = $client.execute(:api_method => $arvados.pipeline_instances.get,
224                              :parameters => {
225                                :uuid => uuid
226                              },
227                              :body => {
228                                :api_token => ENV['ARVADOS_API_TOKEN']
229                              },
230                              :authenticated => false)
231     j = JSON.parse result.body, :symbolize_names => true
232     unless j.is_a? Hash and j[:uuid]
233       debuglog "Failed to get pipeline_instance: #{j[:errors] rescue nil}", 0
234       nil
235     else
236       debuglog "Retrieved pipeline_instance #{j[:uuid]}"
237       self.new(j)
238     end
239   end
240   def self.create(attributes)
241     result = $client.execute(:api_method => $arvados.pipeline_instances.create,
242                              :body => {
243                                :api_token => ENV['ARVADOS_API_TOKEN'],
244                                :pipeline_instance => attributes
245                              },
246                              :authenticated => false)
247     j = JSON.parse result.body, :symbolize_names => true
248     unless j.is_a? Hash and j[:uuid]
249       abort "Failed to create pipeline_instance: #{j[:errors] rescue nil} #{j.inspect}"
250     end
251     debuglog "Created pipeline instance: #{j[:uuid]}"
252     self.new(j)
253   end
254   def save
255     result = $client.execute(:api_method => $arvados.pipeline_instances.update,
256                              :parameters => {
257                                :uuid => @pi[:uuid]
258                              },
259                              :body => {
260                                :api_token => ENV['ARVADOS_API_TOKEN'],
261                                :pipeline_instance => @attributes_to_update.to_json
262                              },
263                              :authenticated => false)
264     j = JSON.parse result.body, :symbolize_names => true
265     unless j.is_a? Hash and j[:uuid]
266       debuglog "Failed to save pipeline_instance: #{j[:errors] rescue nil}", 0
267       nil
268     else
269       @attributes_to_update = {}
270       @pi = j
271     end
272   end
273   def []=(x,y)
274     @attributes_to_update[x] = y
275     @pi[x] = y
276   end
277   def [](x)
278     @pi[x]
279   end
280   protected
281   def initialize(j)
282     @attributes_to_update = {}
283     @pi = j
284   end
285 end
286
287 class JobCache
288   def self.get(uuid)
289     @cache ||= {}
290     result = $client.execute(:api_method => $arvados.jobs.get,
291                              :parameters => {
292                                :api_token => ENV['ARVADOS_API_TOKEN'],
293                                :uuid => uuid
294                              },
295                              :authenticated => false)
296     @cache[uuid] = JSON.parse result.body, :symbolize_names => true
297   end
298   def self.where(conditions)
299     result = $client.execute(:api_method => $arvados.jobs.list,
300                              :parameters => {
301                                :api_token => ENV['ARVADOS_API_TOKEN'],
302                                :limit => 10000,
303                                :where => conditions.to_json
304                              },
305                              :authenticated => false)
306     list = JSON.parse result.body, :symbolize_names => true
307     if list and list[:items].is_a? Array
308       list[:items]
309     else
310       []
311     end
312   end
313   def self.create(attributes)
314     @cache ||= {}
315     result = $client.execute(:api_method => $arvados.jobs.create,
316                              :parameters => {
317                                :api_token => ENV['ARVADOS_API_TOKEN'],
318                                :job => attributes.to_json
319                              },
320                              :authenticated => false)
321     j = JSON.parse result.body, :symbolize_names => true
322     if j.is_a? Hash and j[:uuid]
323       @cache[j[:uuid]] = j
324     else
325       debuglog "create job: #{j[:errors] rescue nil} with attribute #{attributes}", 0
326       nil
327     end
328   end
329 end
330
331 class WhRunPipelineInstance
332   attr_reader :instance
333
334   def initialize(_options)
335     @options = _options
336   end
337
338   def fetch_template(template)
339     if template.match /[^-0-9a-z]/
340       # Doesn't look like a uuid -- use it as a filename.
341       @template = JSON.parse File.read(template), :symbolize_names => true
342       if !@template[:components]
343         abort ("#{$0}: Template loaded from #{template} " +
344                "does not have a \"components\" key")
345       end
346     else
347       result = $client.execute(:api_method => $arvados.pipeline_templates.get,
348                                :parameters => {
349                                  :api_token => ENV['ARVADOS_API_TOKEN'],
350                                  :uuid => template
351                                },
352                                :authenticated => false)
353       @template = JSON.parse result.body, :symbolize_names => true
354       if !@template[:uuid]
355         abort "#{$0}: fatal: failed to retrieve pipeline template #{template} #{@template[:errors].inspect rescue nil}"
356       end
357     end
358     self
359   end
360
361   def fetch_instance(instance_uuid)
362     @instance = PipelineInstance.find(instance_uuid)
363     @template = @instance
364     self
365   end
366
367   def apply_parameters(params_args)
368     params_args.shift if params_args[0] == '--'
369     params = {}
370     while !params_args.empty?
371       if (re = params_args[0].match /^(--)?([^-].*?)=(.+)/)
372         params[re[2]] = re[3]
373         params_args.shift
374       elsif params_args.size > 1
375         param = params_args.shift.sub /^--/, ''
376         params[param] = params_args.shift
377       else
378         abort "Syntax error: I do not know what to do with arg \"#{params_args[0]}\""
379       end
380     end
381
382     @components = @template[:components].dup
383
384     errors = []
385     @components.each do |componentname, component|
386       component[:script_parameters].each do |parametername, parameter|
387         parameter = { :value => parameter } unless parameter.is_a? Hash
388         value =
389           (params["#{componentname}::#{parametername}"] ||
390            parameter[:value] ||
391            (parameter[:output_of].nil? &&
392             (params[parametername.to_s] ||
393              parameter[:default])) ||
394            nil)
395         if value.nil? and
396             ![false,'false',0,'0'].index parameter[:required]
397           if parameter[:output_of]
398             next
399           end
400           errors << [componentname, parametername, "required parameter is missing"]
401         end
402         debuglog "parameter #{componentname}::#{parametername} == #{value}"
403         component[:script_parameters][parametername] = value
404       end
405     end
406     if !errors.empty?
407       abort "Errors:\n#{errors.collect { |c,p,e| "#{c}::#{p} - #{e}\n" }.join ""}"
408     end
409     debuglog "options=" + @options.pretty_inspect
410     self
411   end
412
413   def setup_instance
414     @instance ||= PipelineInstance.
415       create(:components => @components,
416              :pipeline_template_uuid => @template[:uuid],
417              :active => true)
418     self
419   end
420
421   def run
422     moretodo = true
423     while moretodo
424       moretodo = false
425       @components.each do |cname, c|
426         job = nil
427
428         if !c[:job] and
429             c[:script_parameters].select { |pname, p| p.is_a? Hash and p[:output_of]}.empty?
430           # No job yet associated with this component and is component inputs
431           # are fully specified (any output_of script_parameters are resolved
432           # to real value)
433           job = JobCache.create({:script => c[:script],
434                             :script_parameters => c[:script_parameters],
435                             :script_version => c[:script_version],
436                             :repository => c[:repository],
437                             :minimum_script_version => c[:minimum_script_version],
438                             :exclude_script_versions => c[:exclude_minimum_script_versions],
439                             :nondeterministic => c[:nondeterministic],
440                             :no_reuse => @options[:no_reuse]})
441           if job
442             debuglog "component #{cname} new job #{job[:uuid]}"
443             c[:job] = job
444           else
445             debuglog "component #{cname} new job failed"
446           end
447         end
448
449         if c[:job] and c[:job][:uuid]
450           if (c[:job][:running] or
451               not (c[:job][:finished_at] or c[:job][:cancelled_at]))
452             # Job is running so update copy of job record
453             c[:job] = JobCache.get(c[:job][:uuid])            
454           end
455
456           if c[:job][:success]
457             # Populate script_parameters of other components waiting for
458             # this job
459             @components.each do |c2name, c2|
460               c2[:script_parameters].each do |pname, p|
461                 if p.is_a? Hash and p[:output_of] == cname.to_s
462                   debuglog "parameter #{c2name}::#{pname} == #{c[:job][:output]}"
463                   c2[:script_parameters][pname] = c[:job][:output]
464                   moretodo = true
465                 end
466               end
467             end
468           elsif c[:job][:running] ||
469               (!c[:job][:started_at] && !c[:job][:cancelled_at])
470             # Job is still running
471             moretodo = true
472           elsif c[:job][:cancelled_at]
473             debuglog "component #{cname} job #{c[:job][:uuid]} cancelled."
474           end
475         end
476       end
477       @instance[:components] = @components
478       @instance[:active] = moretodo
479       report_status
480
481       if @options[:no_wait]
482         moretodo = false
483       end
484
485       if moretodo
486         begin
487           sleep 10
488         rescue Interrupt
489           debuglog "interrupt", 0
490           abort
491         end
492       end
493     end
494
495     ended = 0
496     succeeded = 0
497     failed = 0
498     @components.each do |cname, c|
499       if c[:job]
500         if c[:job][:finished_at]
501           ended += 1
502           if c[:job][:success] == true
503             succeeded += 1
504           elsif c[:job][:success] == false
505             failed += 1
506           end
507         end
508       end
509     end
510     
511     if ended == @components.length or failed > 0
512       @instance[:active] = false
513       @instance[:success] = (succeeded == @components.length)
514     end
515
516     @instance.save
517   end
518
519   def cleanup
520     if @instance
521       @instance[:active] = false
522       @instance.save
523     end
524   end
525
526   def uuid
527     @instance[:uuid]
528   end
529
530   protected
531
532   def report_status
533     @instance.save
534
535     if @options[:status_json] != '/dev/null'
536       File.open(@options[:status_json], 'w') do |f|
537         f.puts @components.pretty_inspect
538       end
539     end
540
541     if @options[:status_text] != '/dev/null'
542       File.open(@options[:status_text], 'w') do |f|
543         f.puts ""
544         f.puts "#{Time.now} -- pipeline_instance #{@instance[:uuid]}"
545         namewidth = @components.collect { |cname, c| cname.size }.max
546         @components.each do |cname, c|
547           jstatus = if !c[:job]
548                       "-"
549                     elsif c[:job][:running]
550                       "#{c[:job][:tasks_summary].inspect}"
551                     elsif c[:job][:success]
552                       c[:job][:output]
553                     elsif c[:job][:cancelled_at]
554                       "cancelled #{c[:job][:cancelled_at]}"
555                     elsif c[:job][:finished_at]
556                       "failed #{c[:job][:finished_at]}"
557                     elsif c[:job][:started_at]
558                       "started #{c[:job][:started_at]}"
559                     else
560                       "queued #{c[:job][:created_at]}"
561                     end
562           f.puts "#{cname.to_s.ljust namewidth} #{c[:job] ? c[:job][:uuid] : '-'.ljust(27)} #{jstatus}"
563         end
564       end
565     end
566   end
567 end
568
569 runner = WhRunPipelineInstance.new($options)
570 begin
571   if $options[:template]
572     runner.fetch_template($options[:template])
573   else
574     runner.fetch_instance($options[:instance])
575   end
576   runner.apply_parameters(p.leftovers)
577   runner.setup_instance
578   if $options[:submit]
579     runner.instance.save
580     puts runner.instance[:uuid]
581   else
582     runner.run
583   end
584 rescue Exception => e
585   runner.cleanup
586   raise e
587 end