Merge branch '2756-eventbus-in-workbench' into 2678-owned_by
[arvados.git] / sdk / cli / bin / arv-run-pipeline-instance
1 #!/usr/bin/env ruby
2
3 # == Synopsis
4 #
5 #  arv-run-pipeline-instance --template pipeline-template-uuid [options] [--] [parameters]
6 #  arv-run-pipeline-instance --instance pipeline-instance-uuid [options]
7 #
8 # Satisfy a pipeline template by finding or submitting a mapreduce job
9 # for each pipeline component.
10 #
11 # == Options
12 #
13 # [--template uuid] Use the specified pipeline template.
14 #
15 # [--template path] Load the pipeline template from the specified
16 #                   local file.
17 #
18 # [--instance uuid] Use the specified pipeline instance.
19 #
20 # [-n, --dry-run] Do not start any new jobs or wait for existing jobs
21 #                 to finish. Just find out whether jobs are finished,
22 #                 queued, or running for each component
23 #
24 # [--submit] Do not try to satisfy any components. Just
25 #                          create an instance, print its UUID to
26 #                          stdout, and exit.
27 #
28 # [--no-wait] Make only as much progress as possible without entering
29 #             a sleep/poll loop.
30 #
31 # [--no-reuse] Do not reuse existing jobs to satisfy pipeline
32 #              components. Submit a new job for every component.
33 #
34 # [--debug] Print extra debugging information on stderr.
35 #
36 # [--debug-level N] Increase amount of debugging information. Default
37 #                   1, possible range 0..3.
38 #
39 # [--status-text path] Print plain text status report to a file or
40 #                      fifo. Default: /dev/stdout
41 #
42 # [--status-json path] Print JSON status report to a file or
43 #                      fifo. Default: /dev/null
44 #
45 # == Parameters
46 #
47 # [param_name=param_value]
48 #
49 # [param_name param_value] Set (or override) the default value for
50 #                          every parameter with the given name.
51 #
52 # [component_name::param_name=param_value]
53 # [component_name::param_name param_value]
54 # [--component_name::param_name=param_value]
55 # [--component_name::param_name param_value] Set the value of a
56 #                                            parameter for a single
57 #                                            component.
58 #
59 class WhRunPipelineInstance
60 end
61
62 $application_version = 1.0
63
64 if RUBY_VERSION < '1.9.3' then
65   abort <<-EOS
66 #{$0.gsub(/^\.\//,'')} requires Ruby version 1.9.3 or higher.
67   EOS
68 end
69
70 $arvados_api_version = ENV['ARVADOS_API_VERSION'] || 'v1'
71 $arvados_api_host = ENV['ARVADOS_API_HOST'] or
72   abort "#{$0}: fatal: ARVADOS_API_HOST environment variable not set."
73 $arvados_api_token = ENV['ARVADOS_API_TOKEN'] or
74   abort "#{$0}: fatal: ARVADOS_API_TOKEN environment variable not set."
75
76 begin
77   require 'arvados'
78   require 'rubygems'
79   require 'json'
80   require 'pp'
81   require 'trollop'
82   require 'google/api_client'
83 rescue LoadError => l
84   puts $:
85   abort <<-EOS
86 #{$0}: fatal: #{l.message}
87 Some runtime dependencies may be missing.
88 Try: gem install arvados pp google-api-client json trollop
89   EOS
90 end
91
92 def debuglog(message, verbosity=1)
93   $stderr.puts "#{File.split($0).last} #{$$}: #{message}" if $debuglevel >= verbosity
94 end
95
96 module Kernel
97   def suppress_warnings
98     original_verbosity = $VERBOSE
99     $VERBOSE = nil
100     result = yield
101     $VERBOSE = original_verbosity
102     return result
103   end
104 end
105
106 if $arvados_api_host.match /local/
107   # You probably don't care about SSL certificate checks if you're
108   # testing with a dev server.
109   suppress_warnings { OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE }
110 end
111
112 class Google::APIClient
113   def discovery_document(api, version)
114     api = api.to_s
115     return @discovery_documents["#{api}:#{version}"] ||=
116       begin
117         response = self.execute!(
118                                  :http_method => :get,
119                                  :uri => self.discovery_uri(api, version),
120                                  :authenticated => false
121                                  )
122         response.body.class == String ? JSON.parse(response.body) : response.body
123       end
124   end
125 end
126
127
128 # Parse command line options (the kind that control the behavior of
129 # this program, that is, not the pipeline component parameters).
130
131 p = Trollop::Parser.new do
132   version __FILE__
133   opt(:dry_run,
134       "Do not start any new jobs or wait for existing jobs to finish. Just find out whether jobs are finished, queued, or running for each component.",
135       :type => :boolean,
136       :short => :n)
137   opt(:status_text,
138       "Store plain text status in given file.",
139       :short => :none,
140       :type => :string,
141       :default => '/dev/stdout')
142   opt(:status_json,
143       "Store json-formatted pipeline in given file.",
144       :short => :none,
145       :type => :string,
146       :default => '/dev/null')
147   opt(:no_wait,
148       "Do not wait for jobs to finish. Just look up status, submit new jobs if needed, and exit.",
149       :short => :none,
150       :type => :boolean)
151   opt(:no_reuse,
152       "Do not reuse existing jobs to satisfy pipeline components. Submit a new job for every component.",
153       :short => :none,
154       :type => :boolean)
155   opt(:debug,
156       "Print extra debugging information on stderr.",
157       :type => :boolean)
158   opt(:debug_level,
159       "Set debug verbosity level.",
160       :short => :none,
161       :type => :integer)
162   opt(:template,
163       "UUID of pipeline template, or path to local pipeline template file.",
164       :short => :none,
165       :type => :string)
166   opt(:instance,
167       "UUID of pipeline instance.",
168       :short => :none,
169       :type => :string)
170   opt(:submit,
171       "Do not try to satisfy any components. Just create a pipeline instance and output its UUID.",
172       :short => :none,
173       :type => :boolean)
174   opt(:run_here,
175       "Manage the pipeline in process.",
176       :short => :none,
177       :type => :boolean)
178   stop_on [:'--']
179 end
180 $options = Trollop::with_standard_exception_handling p do
181   p.parse ARGV
182 end
183 $debuglevel = $options[:debug_level] || ($options[:debug] && 1) || 0
184
185 if $options[:instance]
186   if $options[:template] or $options[:submit]
187     abort "#{$0}: syntax error: --instance cannot be combined with --template or --submit."
188   end
189 elsif not $options[:template]
190   abort "#{$0}: syntax error: you must supply a --template or --instance."
191 end
192
193 if $options[:run_here] == $options[:submit]
194   abort "#{$0}: syntax error: you must supply either --run-here or --submit."
195 end
196
197 # Suppress SSL certificate checks if ARVADOS_API_HOST_INSECURE
198
199 module Kernel
200   def suppress_warnings
201     original_verbosity = $VERBOSE
202     $VERBOSE = nil
203     result = yield
204     $VERBOSE = original_verbosity
205     return result
206   end
207 end
208
209 if ENV['ARVADOS_API_HOST_INSECURE']
210   suppress_warnings { OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE }
211 end
212
213 # Set up the API client.
214
215 $client ||= Google::APIClient.
216   new(:host => $arvados_api_host,
217       :application_name => File.split($0).last,
218       :application_version => $application_version.to_s)
219 $arvados = $client.discovered_api('arvados', $arvados_api_version)
220 $arv = Arvados.new api_version: 'v1'
221
222
223 class PipelineInstance
224   def self.find(uuid)
225     result = $client.execute(:api_method => $arvados.pipeline_instances.get,
226                              :parameters => {
227                                :uuid => uuid
228                              },
229                              :authenticated => false,
230                              :headers => {
231                                authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
232                              })
233     j = JSON.parse result.body, :symbolize_names => true
234     unless j.is_a? Hash and j[:uuid]
235       debuglog "Failed to get pipeline_instance: #{j[:errors] rescue nil}", 0
236       nil
237     else
238       debuglog "Retrieved pipeline_instance #{j[:uuid]}"
239       self.new(j)
240     end
241   end
242   def self.create(attributes)
243     result = $client.execute(:api_method => $arvados.pipeline_instances.create,
244                              :body => {
245                                :pipeline_instance => attributes
246                              },
247                              :authenticated => false,
248                              :headers => {
249                                authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
250                              })
251     j = JSON.parse result.body, :symbolize_names => true
252     unless j.is_a? Hash and j[:uuid]
253       abort "Failed to create pipeline_instance: #{j[:errors] rescue nil} #{j.inspect}"
254     end
255     debuglog "Created pipeline instance: #{j[:uuid]}"
256     self.new(j)
257   end
258   def save
259     result = $client.execute(:api_method => $arvados.pipeline_instances.update,
260                              :parameters => {
261                                :uuid => @pi[:uuid]
262                              },
263                              :body => {
264                                :pipeline_instance => @attributes_to_update.to_json
265                              },
266                              :authenticated => false,
267                              :headers => {
268                                authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
269                              })
270     j = JSON.parse result.body, :symbolize_names => true
271     unless j.is_a? Hash and j[:uuid]
272       debuglog "Failed to save pipeline_instance: #{j[:errors] rescue nil}", 0
273       nil
274     else
275       @attributes_to_update = {}
276       @pi = j
277     end
278   end
279   def []=(x,y)
280     @attributes_to_update[x] = y
281     @pi[x] = y
282   end
283   def [](x)
284     @pi[x]
285   end
286   protected
287   def initialize(j)
288     @attributes_to_update = {}
289     @pi = j
290   end
291 end
292
293 class JobCache
294   def self.get(uuid)
295     @cache ||= {}
296     result = $client.execute(:api_method => $arvados.jobs.get,
297                              :parameters => {
298                                :uuid => uuid
299                              },
300                              :authenticated => false,
301                              :headers => {
302                                authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
303                              })
304     @cache[uuid] = JSON.parse result.body, :symbolize_names => true
305   end
306   def self.where(conditions)
307     result = $client.execute(:api_method => $arvados.jobs.list,
308                              :parameters => {
309                                :limit => 10000,
310                                :where => conditions.to_json
311                              },
312                              :authenticated => false,
313                              :headers => {
314                                authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
315                              })
316     list = JSON.parse result.body, :symbolize_names => true
317     if list and list[:items].is_a? Array
318       list[:items]
319     else
320       []
321     end
322   end
323   def self.create(job, create_params)
324     @cache ||= {}
325     result = $client.execute(:api_method => $arvados.jobs.create,
326                              :body => {
327                                :job => job.to_json
328                              }.merge(create_params),
329                              :authenticated => false,
330                              :headers => {
331                                authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
332                              })
333     j = JSON.parse result.body, :symbolize_names => true
334     if j.is_a? Hash and j[:uuid]
335       @cache[j[:uuid]] = j
336     else
337       debuglog "create job: #{j[:errors] rescue nil} with attributes #{job}", 0
338       nil
339     end
340   end
341 end
342
343 class WhRunPipelineInstance
344   attr_reader :instance
345
346   def initialize(_options)
347     @options = _options
348   end
349
350   def fetch_template(template)
351     if template.match /[^-0-9a-z]/
352       # Doesn't look like a uuid -- use it as a filename.
353       @template = JSON.parse File.read(template), :symbolize_names => true
354       if !@template[:components]
355         abort ("#{$0}: Template loaded from #{template} " +
356                "does not have a \"components\" key")
357       end
358     else
359       result = $client.execute(:api_method => $arvados.pipeline_templates.get,
360                                :parameters => {
361                                  :uuid => template
362                                },
363                                :authenticated => false,
364                                :headers => {
365                                  authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
366                                })
367       @template = JSON.parse result.body, :symbolize_names => true
368       if !@template[:uuid]
369         abort "#{$0}: fatal: failed to retrieve pipeline template #{template} #{@template[:errors].inspect rescue nil}"
370       end
371     end
372     self
373   end
374
375   def fetch_instance(instance_uuid)
376     @instance = PipelineInstance.find(instance_uuid)
377     @template = @instance
378     self
379   end
380
381   def apply_parameters(params_args)
382     params_args.shift if params_args[0] == '--'
383     params = {}
384     while !params_args.empty?
385       if (re = params_args[0].match /^(--)?([^-].*?)=(.+)/)
386         params[re[2]] = re[3]
387         params_args.shift
388       elsif params_args.size > 1
389         param = params_args.shift.sub /^--/, ''
390         params[param] = params_args.shift
391       else
392         abort "Syntax error: I do not know what to do with arg \"#{params_args[0]}\""
393       end
394     end
395
396     @components = @template[:components].dup
397
398     errors = []
399     @components.each do |componentname, component|
400       component[:script_parameters].each do |parametername, parameter|
401         parameter = { :value => parameter } unless parameter.is_a? Hash
402         value =
403           (params["#{componentname}::#{parametername}"] ||
404            parameter[:value] ||
405            (parameter[:output_of].nil? &&
406             (params[parametername.to_s] ||
407              parameter[:default])) ||
408            nil)
409         if value.nil? and
410             ![false,'false',0,'0'].index parameter[:required]
411           if parameter[:output_of]
412             next
413           end
414           errors << [componentname, parametername, "required parameter is missing"]
415         end
416         debuglog "parameter #{componentname}::#{parametername} == #{value}"
417         component[:script_parameters][parametername] = value
418       end
419     end
420     if !errors.empty?
421       abort "Errors:\n#{errors.collect { |c,p,e| "#{c}::#{p} - #{e}\n" }.join ""}"
422     end
423     debuglog "options=" + @options.pretty_inspect
424     self
425   end
426
427   def setup_instance
428     if $options[:submit]
429       @instance ||= PipelineInstance.
430         create(:components => @components,
431               :pipeline_template_uuid => @template[:uuid],
432               :state => 'New')
433     else
434       @instance ||= PipelineInstance.
435         create(:components => @components,
436              :pipeline_template_uuid => @template[:uuid],
437              :state => 'RunningOnClient')
438     end
439     self
440   end
441
442   def run
443     moretodo = true
444     interrupted = false
445
446     # check if the pipeline os owned by a group
447     owner_uuid = nil
448     group_result = $client.execute(:api_method => $arvados.groups.get,
449                                :parameters => {
450                                  :uuid => @instance[:owner_uuid]
451                                },
452                                :authenticated => false,
453                                :headers => {
454                                  authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
455                                })
456     group = JSON.parse group_result.body, :symbolize_names => true
457     if group[:uuid]
458       owner_uuid = group[:uuid]
459     end
460
461     while moretodo
462       moretodo = false
463       @components.each do |cname, c|
464         job = nil
465         # Is the job satisfying this component already known to be
466         # finished? (Already meaning "before we query API server about
467         # the job's current state")
468         c_already_finished = (c[:job] &&
469                               c[:job][:uuid] &&
470                               !c[:job][:success].nil?)
471         if !c[:job] and
472             c[:script_parameters].select { |pname, p| p.is_a? Hash and p[:output_of]}.empty?
473           # No job yet associated with this component and is component inputs
474           # are fully specified (any output_of script_parameters are resolved
475           # to real value)
476           job = JobCache.create({
477             :script => c[:script],
478             :script_parameters => c[:script_parameters],
479             :script_version => c[:script_version],
480             :repository => c[:repository],
481             :nondeterministic => c[:nondeterministic],
482             :output_is_persistent => c[:output_is_persistent] || false,
483             :owner_uuid => owner_uuid,
484             # TODO: Delete the following three attributes when
485             # supporting pre-20140418 API servers is no longer
486             # important. New API servers take these as flags that
487             # control behavior of create, rather than job attributes.
488             :minimum_script_version => c[:minimum_script_version],
489             :exclude_script_versions => c[:exclude_minimum_script_versions],
490             :no_reuse => @options[:no_reuse] || c[:nondeterministic],
491           }, {
492             # This is the right place to put these attributes when
493             # dealing with new API servers.
494             :minimum_script_version => c[:minimum_script_version],
495             :exclude_script_versions => c[:exclude_minimum_script_versions],
496             :find_or_create => !(@options[:no_reuse] || c[:nondeterministic]),
497           })
498           if job
499             debuglog "component #{cname} new job #{job[:uuid]}"
500             c[:job] = job
501           else
502             debuglog "component #{cname} new job failed"
503           end
504         end
505
506         if c[:job] and c[:job][:uuid]
507           if (c[:job][:running] or
508               not (c[:job][:finished_at] or c[:job][:cancelled_at]))
509             # Job is running so update copy of job record
510             c[:job] = JobCache.get(c[:job][:uuid])
511           end
512
513           if c[:job][:success]
514             # Populate script_parameters of other components waiting for
515             # this job
516             @components.each do |c2name, c2|
517               c2[:script_parameters].each do |pname, p|
518                 if p.is_a? Hash and p[:output_of] == cname.to_s
519                   debuglog "parameter #{c2name}::#{pname} == #{c[:job][:output]}"
520                   c2[:script_parameters][pname] = c[:job][:output]
521                   moretodo = true
522                 end
523               end
524             end
525             unless c_already_finished
526               # This is my first time discovering that the job
527               # succeeded. (At the top of this loop, I was still
528               # waiting for it to finish.)
529               if c[:output_is_persistent]
530                 # I need to make sure a resources/wants link is in
531                 # place to protect the output from garbage
532                 # collection. (Normally Crunch does this for me, but
533                 # here I might be reusing the output of someone else's
534                 # job and I need to make sure it's understood that the
535                 # output is valuable to me, too.)
536                 wanted = c[:job][:output]
537                 debuglog "checking for existing persistence link for #{wanted}"
538                 @my_user_uuid ||= $arv.user.current[:uuid]
539                 links = $arv.link.list(limit: 1,
540                                        filters:
541                                        [%w(link_class = resources),
542                                         %w(name = wants),
543                                         %w(tail_uuid =) + [@my_user_uuid],
544                                         %w(head_uuid =) + [wanted]
545                                        ])[:items]
546                 if links.any?
547                   debuglog "link already exists, uuid #{links.first[:uuid]}"
548                 else
549                   newlink = $arv.link.create link: \
550                   {
551                     link_class: 'resources',
552                     name: 'wants',
553                     tail_kind: 'arvados#user',
554                     tail_uuid: @my_user_uuid,
555                     head_kind: 'arvados#collection',
556                     head_uuid: wanted,
557                     owner_uuid: owner_uuid
558                   }
559                   debuglog "added link, uuid #{newlink[:uuid]}"
560                 end
561               end
562             end
563           elsif c[:job][:running] ||
564               (!c[:job][:started_at] && !c[:job][:cancelled_at])
565             # Job is still running
566             moretodo = true
567           elsif c[:job][:cancelled_at]
568             debuglog "component #{cname} job #{c[:job][:uuid]} cancelled."
569           end
570         end
571       end
572       @instance[:components] = @components
573       report_status
574
575       if @options[:no_wait]
576         moretodo = false
577       end
578
579       if moretodo
580         begin
581           sleep 10
582         rescue Interrupt
583           debuglog "interrupt", 0
584           interrupted = true
585           break
586         end
587       end
588     end
589
590     ended = 0
591     succeeded = 0
592     failed = 0
593     @components.each do |cname, c|
594       if c[:job]
595         if c[:job][:finished_at]
596           ended += 1
597           if c[:job][:success] == true
598             succeeded += 1
599           elsif c[:job][:success] == false
600             failed += 1
601           end
602         end
603       end
604     end
605
606     success = (succeeded == @components.length)
607
608     if interrupted
609      if success
610         @instance[:state] = 'Complete'
611      else
612         @instance[:state] = 'Paused'
613       end
614     else
615       if ended == @components.length or failed > 0
616         @instance[:state] = success ? 'Complete' : 'Failed'
617       end
618     end
619
620     # set components_summary
621     components_summary = {"todo" => @components.length - ended, "done" => succeeded, "failed" => failed}
622     @instance[:components_summary] = components_summary
623
624     @instance.save
625   end
626
627   def cleanup
628     if @instance and @instance[:state] == 'RunningOnClient'
629       @instance[:state] = 'Paused'
630       @instance.save
631     end
632   end
633
634   def uuid
635     @instance[:uuid]
636   end
637
638   protected
639
640   def report_status
641     @instance.save
642
643     if @options[:status_json] != '/dev/null'
644       File.open(@options[:status_json], 'w') do |f|
645         f.puts @components.pretty_inspect
646       end
647     end
648
649     if @options[:status_text] != '/dev/null'
650       File.open(@options[:status_text], 'w') do |f|
651         f.puts ""
652         f.puts "#{Time.now} -- pipeline_instance #{@instance[:uuid]}"
653         namewidth = @components.collect { |cname, c| cname.size }.max
654         @components.each do |cname, c|
655           jstatus = if !c[:job]
656                       "-"
657                     elsif c[:job][:running]
658                       "#{c[:job][:tasks_summary].inspect}"
659                     elsif c[:job][:success]
660                       c[:job][:output]
661                     elsif c[:job][:cancelled_at]
662                       "cancelled #{c[:job][:cancelled_at]}"
663                     elsif c[:job][:finished_at]
664                       "failed #{c[:job][:finished_at]}"
665                     elsif c[:job][:started_at]
666                       "started #{c[:job][:started_at]}"
667                     else
668                       "queued #{c[:job][:created_at]}"
669                     end
670           f.puts "#{cname.to_s.ljust namewidth} #{c[:job] ? c[:job][:uuid] : '-'.ljust(27)} #{jstatus}"
671         end
672       end
673     end
674   end
675 end
676
677 runner = WhRunPipelineInstance.new($options)
678 begin
679   if $options[:template]
680     runner.fetch_template($options[:template])
681   else
682     runner.fetch_instance($options[:instance])
683   end
684   runner.apply_parameters(p.leftovers)
685   runner.setup_instance
686   if $options[:submit]
687     runner.instance.save
688     puts runner.instance[:uuid]
689   else
690     runner.run
691   end
692 rescue Exception => e
693   runner.cleanup
694   raise e
695 end