Make sure persistence links get added for persistent components, even
[arvados.git] / sdk / cli / bin / arv-run-pipeline-instance
1 #!/usr/bin/env ruby
2
3 # == Synopsis
4 #
5 #  arv-run-pipeline-instance --template pipeline-template-uuid [options] [--] [parameters]
6 #  arv-run-pipeline-instance --instance pipeline-instance-uuid [options]
7 #
8 # Satisfy a pipeline template by finding or submitting a mapreduce job
9 # for each pipeline component.
10 #
11 # == Options
12 #
13 # [--template uuid] Use the specified pipeline template.
14 #
15 # [--template path] Load the pipeline template from the specified
16 #                   local file.
17 #
18 # [--instance uuid] Use the specified pipeline instance.
19 #
20 # [-n, --dry-run] Do not start any new jobs or wait for existing jobs
21 #                 to finish. Just find out whether jobs are finished,
22 #                 queued, or running for each component
23 #
24 # [--submit] Do not try to satisfy any components. Just
25 #                          create an instance, print its UUID to
26 #                          stdout, and exit.
27 #
28 # [--no-wait] Make only as much progress as possible without entering
29 #             a sleep/poll loop.
30 #
31 # [--no-reuse-finished] Do not reuse existing outputs to satisfy
32 #                       pipeline components. Always submit a new job
33 #                       or use an existing job which has not yet
34 #                       finished.
35 #
36 # [--no-reuse] Do not reuse existing jobs to satisfy pipeline
37 #              components. Submit a new job for every component.
38 #
39 # [--debug] Print extra debugging information on stderr.
40 #
41 # [--debug-level N] Increase amount of debugging information. Default
42 #                   1, possible range 0..3.
43 #
44 # [--status-text path] Print plain text status report to a file or
45 #                      fifo. Default: /dev/stdout
46 #
47 # [--status-json path] Print JSON status report to a file or
48 #                      fifo. Default: /dev/null
49 #
50 # == Parameters
51 #
52 # [param_name=param_value]
53 #
54 # [param_name param_value] Set (or override) the default value for
55 #                          every parameter with the given name.
56 #
57 # [component_name::param_name=param_value]
58 # [component_name::param_name param_value]
59 # [--component_name::param_name=param_value]
60 # [--component_name::param_name param_value] Set the value of a
61 #                                            parameter for a single
62 #                                            component.
63 #
64 class WhRunPipelineInstance
65 end
66
67 $application_version = 1.0
68
69 if RUBY_VERSION < '1.9.3' then
70   abort <<-EOS
71 #{$0.gsub(/^\.\//,'')} requires Ruby version 1.9.3 or higher.
72   EOS
73 end
74
75 $arvados_api_version = ENV['ARVADOS_API_VERSION'] || 'v1'
76 $arvados_api_host = ENV['ARVADOS_API_HOST'] or
77   abort "#{$0}: fatal: ARVADOS_API_HOST environment variable not set."
78 $arvados_api_token = ENV['ARVADOS_API_TOKEN'] or
79   abort "#{$0}: fatal: ARVADOS_API_TOKEN environment variable not set."
80
81 begin
82   require 'arvados'
83   require 'rubygems'
84   require 'json'
85   require 'pp'
86   require 'trollop'
87   require 'google/api_client'
88 rescue LoadError => l
89   puts $:
90   abort <<-EOS
91 #{$0}: fatal: #{l.message}
92 Some runtime dependencies may be missing.
93 Try: gem install arvados pp google-api-client json trollop
94   EOS
95 end
96
97 def debuglog(message, verbosity=1)
98   $stderr.puts "#{File.split($0).last} #{$$}: #{message}" if $debuglevel >= verbosity
99 end
100
101 module Kernel
102   def suppress_warnings
103     original_verbosity = $VERBOSE
104     $VERBOSE = nil
105     result = yield
106     $VERBOSE = original_verbosity
107     return result
108   end
109 end
110
111 if $arvados_api_host.match /local/
112   # You probably don't care about SSL certificate checks if you're
113   # testing with a dev server.
114   suppress_warnings { OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE }
115 end
116
117 class Google::APIClient
118   def discovery_document(api, version)
119     api = api.to_s
120     return @discovery_documents["#{api}:#{version}"] ||=
121       begin
122         response = self.execute!(
123                                  :http_method => :get,
124                                  :uri => self.discovery_uri(api, version),
125                                  :authenticated => false
126                                  )
127         response.body.class == String ? JSON.parse(response.body) : response.body
128       end
129   end
130 end
131
132
133 # Parse command line options (the kind that control the behavior of
134 # this program, that is, not the pipeline component parameters).
135
136 p = Trollop::Parser.new do
137   version __FILE__
138   opt(:dry_run,
139       "Do not start any new jobs or wait for existing jobs to finish. Just find out whether jobs are finished, queued, or running for each component.",
140       :type => :boolean,
141       :short => :n)
142   opt(:status_text,
143       "Store plain text status in given file.",
144       :short => :none,
145       :type => :string,
146       :default => '/dev/stdout')
147   opt(:status_json,
148       "Store json-formatted pipeline in given file.",
149       :short => :none,
150       :type => :string,
151       :default => '/dev/null')
152   opt(:no_wait,
153       "Do not wait for jobs to finish. Just look up status, submit new jobs if needed, and exit.",
154       :short => :none,
155       :type => :boolean)
156   opt(:no_reuse_finished,
157       "Do not reuse existing outputs to satisfy pipeline components. Always submit a new job or use an existing job which has not yet finished.",
158       :short => :none,
159       :type => :boolean)
160   opt(:no_reuse,
161       "Do not reuse existing jobs to satisfy pipeline components. Submit a new job for every component.",
162       :short => :none,
163       :type => :boolean)
164   opt(:debug,
165       "Print extra debugging information on stderr.",
166       :type => :boolean)
167   opt(:debug_level,
168       "Set debug verbosity level.",
169       :short => :none,
170       :type => :integer)
171   opt(:template,
172       "UUID of pipeline template, or path to local pipeline template file.",
173       :short => :none,
174       :type => :string)
175   opt(:instance,
176       "UUID of pipeline instance.",
177       :short => :none,
178       :type => :string)
179   opt(:submit,
180       "Do not try to satisfy any components. Just create a pipeline instance and output its UUID.",
181       :short => :none,
182       :type => :boolean)
183   opt(:run_here,
184       "Manage the pipeline in process.",
185       :short => :none,
186       :type => :boolean)
187   stop_on [:'--']
188 end
189 $options = Trollop::with_standard_exception_handling p do
190   p.parse ARGV
191 end
192 $debuglevel = $options[:debug_level] || ($options[:debug] && 1) || 0
193
194 if $options[:instance]
195   if $options[:template] or $options[:submit]
196     abort "#{$0}: syntax error: --instance cannot be combined with --template or --submit."
197   end
198 elsif not $options[:template]
199   abort "#{$0}: syntax error: you must supply a --template or --instance."
200 end
201
202 if $options[:run_here] == $options[:submit]
203   abort "#{$0}: syntax error: you must supply either --run-here or --submit."
204 end
205
206 # Suppress SSL certificate checks if ARVADOS_API_HOST_INSECURE
207
208 module Kernel
209   def suppress_warnings
210     original_verbosity = $VERBOSE
211     $VERBOSE = nil
212     result = yield
213     $VERBOSE = original_verbosity
214     return result
215   end
216 end
217
218 if ENV['ARVADOS_API_HOST_INSECURE']
219   suppress_warnings { OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE }
220 end
221
222 # Set up the API client.
223
224 $client ||= Google::APIClient.
225   new(:host => $arvados_api_host,
226       :application_name => File.split($0).last,
227       :application_version => $application_version.to_s)
228 $arvados = $client.discovered_api('arvados', $arvados_api_version)
229 $arv = Arvados.new api_version: 'v1'
230
231
232 class PipelineInstance
233   def self.find(uuid)
234     result = $client.execute(:api_method => $arvados.pipeline_instances.get,
235                              :parameters => {
236                                :uuid => uuid
237                              },
238                              :body => {
239                                :api_token => ENV['ARVADOS_API_TOKEN']
240                              },
241                              :authenticated => false)
242     j = JSON.parse result.body, :symbolize_names => true
243     unless j.is_a? Hash and j[:uuid]
244       debuglog "Failed to get pipeline_instance: #{j[:errors] rescue nil}", 0
245       nil
246     else
247       debuglog "Retrieved pipeline_instance #{j[:uuid]}"
248       self.new(j)
249     end
250   end
251   def self.create(attributes)
252     result = $client.execute(:api_method => $arvados.pipeline_instances.create,
253                              :body => {
254                                :api_token => ENV['ARVADOS_API_TOKEN'],
255                                :pipeline_instance => attributes
256                              },
257                              :authenticated => false)
258     j = JSON.parse result.body, :symbolize_names => true
259     unless j.is_a? Hash and j[:uuid]
260       abort "Failed to create pipeline_instance: #{j[:errors] rescue nil} #{j.inspect}"
261     end
262     debuglog "Created pipeline instance: #{j[:uuid]}"
263     self.new(j)
264   end
265   def save
266     result = $client.execute(:api_method => $arvados.pipeline_instances.update,
267                              :parameters => {
268                                :uuid => @pi[:uuid]
269                              },
270                              :body => {
271                                :api_token => ENV['ARVADOS_API_TOKEN'],
272                                :pipeline_instance => @attributes_to_update.to_json
273                              },
274                              :authenticated => false)
275     j = JSON.parse result.body, :symbolize_names => true
276     unless j.is_a? Hash and j[:uuid]
277       debuglog "Failed to save pipeline_instance: #{j[:errors] rescue nil}", 0
278       nil
279     else
280       @attributes_to_update = {}
281       @pi = j
282     end
283   end
284   def []=(x,y)
285     @attributes_to_update[x] = y
286     @pi[x] = y
287   end
288   def [](x)
289     @pi[x]
290   end
291   protected
292   def initialize(j)
293     @attributes_to_update = {}
294     @pi = j
295   end
296 end
297
298 class JobCache
299   def self.get(uuid)
300     @cache ||= {}
301     result = $client.execute(:api_method => $arvados.jobs.get,
302                              :parameters => {
303                                :api_token => ENV['ARVADOS_API_TOKEN'],
304                                :uuid => uuid
305                              },
306                              :authenticated => false)
307     @cache[uuid] = JSON.parse result.body, :symbolize_names => true
308   end
309   def self.where(conditions)
310     result = $client.execute(:api_method => $arvados.jobs.list,
311                              :parameters => {
312                                :api_token => ENV['ARVADOS_API_TOKEN'],
313                                :limit => 10000,
314                                :where => conditions.to_json
315                              },
316                              :authenticated => false)
317     list = JSON.parse result.body, :symbolize_names => true
318     if list and list[:items].is_a? Array
319       list[:items]
320     else
321       []
322     end
323   end
324   def self.create(attributes)
325     @cache ||= {}
326     result = $client.execute(:api_method => $arvados.jobs.create,
327                              :parameters => {
328                                :api_token => ENV['ARVADOS_API_TOKEN'],
329                                :job => attributes.to_json
330                              },
331                              :authenticated => false)
332     j = JSON.parse result.body, :symbolize_names => true
333     if j.is_a? Hash and j[:uuid]
334       @cache[j[:uuid]] = j
335     else
336       debuglog "create job: #{j[:errors] rescue nil}", 0
337       nil
338     end
339   end
340 end
341
342 class WhRunPipelineInstance
343   attr_reader :instance
344
345   def initialize(_options)
346     @options = _options
347   end
348
349   def fetch_template(template)
350     if template.match /[^-0-9a-z]/
351       # Doesn't look like a uuid -- use it as a filename.
352       @template = JSON.parse File.read(template), :symbolize_names => true
353       if !@template[:components]
354         abort ("#{$0}: Template loaded from #{template} " +
355                "does not have a \"components\" key")
356       end
357     else
358       result = $client.execute(:api_method => $arvados.pipeline_templates.get,
359                                :parameters => {
360                                  :api_token => ENV['ARVADOS_API_TOKEN'],
361                                  :uuid => template
362                                },
363                                :authenticated => false)
364       @template = JSON.parse result.body, :symbolize_names => true
365       if !@template[:uuid]
366         abort "#{$0}: fatal: failed to retrieve pipeline template #{template} #{@template[:errors].inspect rescue nil}"
367       end
368     end
369     self
370   end
371
372   def fetch_instance(instance_uuid)
373     @instance = PipelineInstance.find(instance_uuid)
374     @template = @instance
375     self
376   end
377
378   def apply_parameters(params_args)
379     params_args.shift if params_args[0] == '--'
380     params = {}
381     while !params_args.empty?
382       if (re = params_args[0].match /^(--)?([^-].*?)=(.+)/)
383         params[re[2]] = re[3]
384         params_args.shift
385       elsif params_args.size > 1
386         param = params_args.shift.sub /^--/, ''
387         params[param] = params_args.shift
388       else
389         abort "Syntax error: I do not know what to do with arg \"#{params_args[0]}\""
390       end
391     end
392
393     @components = @template[:components].dup
394
395     errors = []
396     @components.each do |componentname, component|
397       component[:script_parameters].each do |parametername, parameter|
398         parameter = { :value => parameter } unless parameter.is_a? Hash
399         value =
400           (params["#{componentname}::#{parametername}"] ||
401            parameter[:value] ||
402            (parameter[:output_of].nil? &&
403             (params[parametername.to_s] ||
404              parameter[:default])) ||
405            nil)
406         if value.nil? and
407             ![false,'false',0,'0'].index parameter[:required]
408           if parameter[:output_of]
409             next
410           end
411           errors << [componentname, parametername, "required parameter is missing"]
412         end
413         debuglog "parameter #{componentname}::#{parametername} == #{value}"
414         component[:script_parameters][parametername] = value
415       end
416     end
417     if !errors.empty?
418       abort "Errors:\n#{errors.collect { |c,p,e| "#{c}::#{p} - #{e}\n" }.join ""}"
419     end
420     debuglog "options=" + @options.pretty_inspect
421     self
422   end
423
424   def setup_instance
425     @instance ||= PipelineInstance.
426       create(:components => @components,
427              :pipeline_template_uuid => @template[:uuid],
428              :active => true)
429     self
430   end
431
432   def run
433     moretodo = true
434     while moretodo
435       moretodo = false
436       @components.each do |cname, c|
437         job = nil
438         c_already_finished = (c[:job] &&
439                               c[:job][:uuid] &&
440                               !c[:job][:success].nil?)
441         if !c[:job] and
442             c[:script_parameters].select { |pname, p| p.is_a? Hash }.empty?
443           # Job is fully specified (all parameter values are present) but
444           # no particular job has been found.
445
446           debuglog "component #{cname} ready to satisfy."
447
448           c.delete :wait
449           second_place_job = nil # satisfies component, but not finished yet
450
451           (@options[:no_reuse] ? [] : JobCache.
452            where(script: c[:script],
453                  script_parameters: c[:script_parameters],
454                  script_version_descends_from: c[:script_version])
455            ).each do |candidate_job|
456             candidate_params_downcase = Hash[candidate_job[:script_parameters].
457                                              map { |k,v| [k.downcase,v] }]
458             c_params_downcase = Hash[c[:script_parameters].
459                                      map { |k,v| [k.downcase,v] }]
460
461             debuglog "component #{cname} considering job #{candidate_job[:uuid]} version #{candidate_job[:script_version]} parameters #{candidate_params_downcase.inspect}", 3
462
463             unless candidate_params_downcase == c_params_downcase
464               next
465             end
466
467             if c[:script_version] !=
468                 candidate_job[:script_version][0,c[:script_version].length]
469               debuglog "component #{cname} would be satisfied by job #{candidate_job[:uuid]} if script_version matched.", 2
470               next
471             end
472
473             unless candidate_job[:success] || candidate_job[:running] ||
474                 (!candidate_job[:started_at] && !candidate_job[:cancelled_at])
475               debuglog "component #{cname} would be satisfied by job #{candidate_job[:uuid]} if it were running or successful.", 2
476               next
477             end
478
479             if candidate_job[:success]
480               unless @options[:no_reuse_finished]
481                 job = candidate_job
482                 $stderr.puts "using #{job[:uuid]} (finished at #{job[:finished_at]}) for component #{cname}"
483                 c[:job] = job
484               end
485             else
486               second_place_job ||= candidate_job
487             end
488             break
489           end
490           if not c[:job] and second_place_job
491             job = second_place_job
492             $stderr.puts "using #{job[:uuid]} (running since #{job[:started_at]}) for component #{cname}"
493             c[:job] = job
494           end
495           if not c[:job]
496             debuglog "component #{cname} not satisfied by any existing job."
497             if !@options[:dry_run]
498               debuglog "component #{cname} new job."
499               job = JobCache.create(:script => c[:script],
500                                     :script_parameters => c[:script_parameters],
501                                     :runtime_constraints => c[:runtime_constraints] || {},
502                                     :script_version => c[:script_version] || 'master',
503                                     :output_is_persistent => c[:output_is_persistent] || false)
504               if job
505                 debuglog "component #{cname} new job #{job[:uuid]}"
506                 c[:job] = job
507               else
508                 debuglog "component #{cname} new job failed"
509               end
510             end
511           end
512         else
513           c[:wait] = true
514         end
515         if c[:job] and c[:job][:uuid]
516           if (c[:job][:running] or
517               not (c[:job][:finished_at] or c[:job][:cancelled_at]))
518             c[:job] = JobCache.get(c[:job][:uuid])
519           end
520           if c[:job][:success]
521             # Populate script_parameters of other components waiting for
522             # this job
523             @components.each do |c2name, c2|
524               c2[:script_parameters].each do |pname, p|
525                 if p.is_a? Hash and p[:output_of] == cname.to_s
526                   debuglog "parameter #{c2name}::#{pname} == #{c[:job][:output]}"
527                   c2[:script_parameters][pname] = c[:job][:output]
528                   moretodo = true
529                 end
530               end
531             end
532             unless c_already_finished
533               if c[:output_is_persistent]
534                 # This is my first time discovering that the job
535                 # succeeded. I need to make sure a resources/wants
536                 # link is in place to protect the output from garbage
537                 # collection. (Normally Crunch does this for me, but
538                 # here I might be reusing the output of someone else's
539                 # job and I need to make sure it's understood that the
540                 # output is valuable to me, too.)
541                 wanted = c[:job][:output]
542                 debuglog "checking for existing persistence link for #{wanted}"
543                 @my_user_uuid ||= $arv.user.current[:uuid]
544                 links = $arv.link.list(limit: 1,
545                                        filters:
546                                        [%w(link_class = resources),
547                                         %w(name = wants),
548                                         %w(tail_uuid =) + [@my_user_uuid],
549                                         %w(head_uuid =) + [wanted]
550                                        ])[:items]
551                 if links.any?
552                   debuglog "link already exists, uuid #{links.first[:uuid]}"
553                 else
554                   newlink = $arv.link.create link: \
555                   {
556                     link_class: 'resources',
557                     name: 'wants',
558                     tail_kind: 'arvados#user',
559                     tail_uuid: @my_user_uuid,
560                     head_kind: 'arvados#collection',
561                     head_uuid: wanted
562                   }
563                   debuglog "added link, uuid #{newlink[:uuid]}"
564                 end
565               end
566             end
567           elsif c[:job][:running] ||
568               (!c[:job][:started_at] && !c[:job][:cancelled_at])
569             moretodo = true
570           elsif c[:job][:cancelled_at]
571             debuglog "component #{cname} job #{c[:job][:uuid]} cancelled."
572           end
573         end
574       end
575       @instance[:components] = @components
576       @instance[:active] = moretodo
577       report_status
578
579       if @options[:no_wait]
580         moretodo = false
581       end
582
583       if moretodo
584         begin
585           sleep 10
586         rescue Interrupt
587           debuglog "interrupt", 0
588           abort
589         end
590       end
591     end
592
593     ended = 0
594     succeeded = 0
595     failed = 0
596     @components.each do |cname, c|
597       if c[:job]
598         if c[:job][:finished_at]
599           ended += 1
600           if c[:job][:success] == true
601             succeeded += 1
602           elsif c[:job][:success] == false
603             failed += 1
604           end
605         end
606       end
607     end
608     
609     if ended == @components.length or failed > 0
610       @instance[:active] = false
611       @instance[:success] = (succeeded == @components.length)
612     end
613
614     @instance.save
615   end
616
617   def cleanup
618     if @instance
619       @instance[:active] = false
620       @instance.save
621     end
622   end
623
624   def uuid
625     @instance[:uuid]
626   end
627
628   protected
629
630   def report_status
631     @instance.save
632
633     if @options[:status_json] != '/dev/null'
634       File.open(@options[:status_json], 'w') do |f|
635         f.puts @components.pretty_inspect
636       end
637     end
638
639     if @options[:status_text] != '/dev/null'
640       File.open(@options[:status_text], 'w') do |f|
641         f.puts ""
642         f.puts "#{Time.now} -- pipeline_instance #{@instance[:uuid]}"
643         namewidth = @components.collect { |cname, c| cname.size }.max
644         @components.each do |cname, c|
645           jstatus = if !c[:job]
646                       "-"
647                     elsif c[:job][:running]
648                       "#{c[:job][:tasks_summary].inspect}"
649                     elsif c[:job][:success]
650                       c[:job][:output]
651                     elsif c[:job][:cancelled_at]
652                       "cancelled #{c[:job][:cancelled_at]}"
653                     elsif c[:job][:finished_at]
654                       "failed #{c[:job][:finished_at]}"
655                     elsif c[:job][:started_at]
656                       "started #{c[:job][:started_at]}"
657                     else
658                       "queued #{c[:job][:created_at]}"
659                     end
660           f.puts "#{cname.to_s.ljust namewidth} #{c[:job] ? c[:job][:uuid] : '-'.ljust(27)} #{jstatus}"
661         end
662       end
663     end
664   end
665 end
666
667 runner = WhRunPipelineInstance.new($options)
668 begin
669   if $options[:template]
670     runner.fetch_template($options[:template])
671   else
672     runner.fetch_instance($options[:instance])
673   end
674   runner.apply_parameters(p.leftovers)
675   runner.setup_instance
676   if $options[:submit]
677     runner.instance.save
678     puts runner.instance[:uuid]
679   else
680     runner.run
681   end
682 rescue Exception => e
683   runner.cleanup
684   raise e
685 end