]> git.arvados.org - arvados.git/blob - sdk/cli/bin/arv-run-pipeline-instance
3550: Improve startup time by removing excess api client instantiation.
[arvados.git] / sdk / cli / bin / arv-run-pipeline-instance
1 #!/usr/bin/env ruby
2
3 # == Synopsis
4 #
5 #  arv-run-pipeline-instance --template pipeline-template-uuid [options] [--] [parameters]
6 #  arv-run-pipeline-instance --instance pipeline-instance-uuid [options]
7 #
8 # Satisfy a pipeline template by finding or submitting a mapreduce job
9 # for each pipeline component.
10 #
11 # == Options
12 #
13 # [--template uuid] Use the specified pipeline template.
14 #
15 # [--template path] Load the pipeline template from the specified
16 #                   local file.
17 #
18 # [--instance uuid] Use the specified pipeline instance.
19 #
20 # [-n, --dry-run] Do not start any new jobs or wait for existing jobs
21 #                 to finish. Just find out whether jobs are finished,
22 #                 queued, or running for each component
23 #
24 # [--submit] Do not try to satisfy any components. Just
25 #                          create an instance, print its UUID to
26 #                          stdout, and exit.
27 #
28 # [--no-wait] Make only as much progress as possible without entering
29 #             a sleep/poll loop.
30 #
31 # [--no-reuse] Do not reuse existing jobs to satisfy pipeline
32 #              components. Submit a new job for every component.
33 #
34 # [--debug] Print extra debugging information on stderr.
35 #
36 # [--debug-level N] Increase amount of debugging information. Default
37 #                   1, possible range 0..3.
38 #
39 # [--status-text path] Print plain text status report to a file or
40 #                      fifo. Default: /dev/stdout
41 #
42 # [--status-json path] Print JSON status report to a file or
43 #                      fifo. Default: /dev/null
44 #
45 # == Parameters
46 #
47 # [param_name=param_value]
48 #
49 # [param_name param_value] Set (or override) the default value for
50 #                          every parameter with the given name.
51 #
52 # [component_name::param_name=param_value]
53 # [component_name::param_name param_value]
54 # [--component_name::param_name=param_value]
55 # [--component_name::param_name param_value] Set the value of a
56 #                                            parameter for a single
57 #                                            component.
58 #
59 class WhRunPipelineInstance
60 end
61
62 if RUBY_VERSION < '1.9.3' then
63   abort <<-EOS
64 #{$0.gsub(/^\.\//,'')} requires Ruby version 1.9.3 or higher.
65   EOS
66 end
67
68 $arvados_api_version = ENV['ARVADOS_API_VERSION'] || 'v1'
69 $arvados_api_host = ENV['ARVADOS_API_HOST'] or
70   abort "#{$0}: fatal: ARVADOS_API_HOST environment variable not set."
71 $arvados_api_token = ENV['ARVADOS_API_TOKEN'] or
72   abort "#{$0}: fatal: ARVADOS_API_TOKEN environment variable not set."
73
74 begin
75   require 'arvados'
76   require 'rubygems'
77   require 'json'
78   require 'pp'
79   require 'trollop'
80   require 'google/api_client'
81 rescue LoadError => l
82   puts $:
83   abort <<-EOS
84 #{$0}: fatal: #{l.message}
85 Some runtime dependencies may be missing.
86 Try: gem install arvados pp google-api-client json trollop
87   EOS
88 end
89
90 def debuglog(message, verbosity=1)
91   $stderr.puts "#{File.split($0).last} #{$$}: #{message}" if $debuglevel >= verbosity
92 end
93
94 module Kernel
95   def suppress_warnings
96     original_verbosity = $VERBOSE
97     $VERBOSE = nil
98     result = yield
99     $VERBOSE = original_verbosity
100     return result
101   end
102 end
103
104 if $arvados_api_host.match /local/
105   # You probably don't care about SSL certificate checks if you're
106   # testing with a dev server.
107   suppress_warnings { OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE }
108 end
109
110
111 # Parse command line options (the kind that control the behavior of
112 # this program, that is, not the pipeline component parameters).
113
114 p = Trollop::Parser.new do
115   version __FILE__
116   opt(:dry_run,
117       "Do not start any new jobs or wait for existing jobs to finish. Just find out whether jobs are finished, queued, or running for each component.",
118       :type => :boolean,
119       :short => :n)
120   opt(:status_text,
121       "Store plain text status in given file.",
122       :short => :none,
123       :type => :string,
124       :default => '/dev/stdout')
125   opt(:status_json,
126       "Store json-formatted pipeline in given file.",
127       :short => :none,
128       :type => :string,
129       :default => '/dev/null')
130   opt(:no_wait,
131       "Do not wait for jobs to finish. Just look up status, submit new jobs if needed, and exit.",
132       :short => :none,
133       :type => :boolean)
134   opt(:no_reuse,
135       "Do not reuse existing jobs to satisfy pipeline components. Submit a new job for every component.",
136       :short => :none,
137       :type => :boolean)
138   opt(:debug,
139       "Print extra debugging information on stderr.",
140       :type => :boolean)
141   opt(:debug_level,
142       "Set debug verbosity level.",
143       :short => :none,
144       :type => :integer)
145   opt(:template,
146       "UUID of pipeline template, or path to local pipeline template file.",
147       :short => :none,
148       :type => :string)
149   opt(:instance,
150       "UUID of pipeline instance.",
151       :short => :none,
152       :type => :string)
153   opt(:submit,
154       "Do not try to satisfy any components. Just create a pipeline instance and output its UUID.",
155       :short => :none,
156       :type => :boolean)
157   opt(:run_here,
158       "Manage the pipeline in process.",
159       :short => :none,
160       :type => :boolean)
161   stop_on [:'--']
162 end
163 $options = Trollop::with_standard_exception_handling p do
164   p.parse ARGV
165 end
166 $debuglevel = $options[:debug_level] || ($options[:debug] && 1) || 0
167
168 if $options[:instance]
169   if $options[:template] or $options[:submit]
170     abort "#{$0}: syntax error: --instance cannot be combined with --template or --submit."
171   end
172 elsif not $options[:template]
173   puts "error: you must supply a --template or --instance."
174   p.educate
175   abort
176 end
177
178 if $options[:run_here] == $options[:submit]
179   abort "#{$0}: syntax error: you must supply either --run-here or --submit."
180 end
181
182 # Suppress SSL certificate checks if ARVADOS_API_HOST_INSECURE
183
184 module Kernel
185   def suppress_warnings
186     original_verbosity = $VERBOSE
187     $VERBOSE = nil
188     result = yield
189     $VERBOSE = original_verbosity
190     return result
191   end
192 end
193
194 if ENV['ARVADOS_API_HOST_INSECURE']
195   suppress_warnings { OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE }
196 end
197
198 # Set up the API client.
199
200 $arv = Arvados.new api_version: 'v1'
201 $client = $arv.client
202 $arvados = $arv.arvados_api
203
204 class PipelineInstance
205   def self.find(uuid)
206     result = $client.execute(:api_method => $arvados.pipeline_instances.get,
207                              :parameters => {
208                                :uuid => uuid
209                              },
210                              :authenticated => false,
211                              :headers => {
212                                authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
213                              })
214     j = JSON.parse result.body, :symbolize_names => true
215     unless j.is_a? Hash and j[:uuid]
216       debuglog "Failed to get pipeline_instance: #{j[:errors] rescue nil}", 0
217       nil
218     else
219       debuglog "Retrieved pipeline_instance #{j[:uuid]}"
220       self.new(j)
221     end
222   end
223   def self.create(attributes)
224     result = $client.execute(:api_method => $arvados.pipeline_instances.create,
225                              :body_object => {
226                                :pipeline_instance => attributes
227                              },
228                              :authenticated => false,
229                              :headers => {
230                                authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
231                              })
232     j = JSON.parse result.body, :symbolize_names => true
233     unless j.is_a? Hash and j[:uuid]
234       abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nFailed to create pipeline_instance: #{j[:errors] rescue nil} #{j.inspect}"
235     end
236     debuglog "Created pipeline instance: #{j[:uuid]}"
237     self.new(j)
238   end
239   def save
240     result = $client.execute(:api_method => $arvados.pipeline_instances.update,
241                              :parameters => {
242                                :uuid => @pi[:uuid]
243                              },
244                              :body_object => {
245                                :pipeline_instance => @attributes_to_update
246                              },
247                              :authenticated => false,
248                              :headers => {
249                                authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
250                              })
251     j = JSON.parse result.body, :symbolize_names => true
252     unless j.is_a? Hash and j[:uuid]
253       debuglog "Failed to save pipeline_instance: #{j[:errors] rescue nil}", 0
254       nil
255     else
256       @attributes_to_update = {}
257       @pi = j
258     end
259   end
260   def []=(x,y)
261     @attributes_to_update[x] = y
262     @pi[x] = y
263   end
264   def [](x)
265     @pi[x]
266   end
267
268   def log_stderr(msg)
269     $arv.log.create log: {
270       event_type: 'stderr',
271       object_uuid: self[:uuid],
272       owner_uuid: self[:owner_uuid],
273       properties: {"text" => msg},
274     }
275   end
276
277   protected
278   def initialize(j)
279     @attributes_to_update = {}
280     @pi = j
281   end
282 end
283
284 class JobCache
285   def self.get(uuid)
286     @cache ||= {}
287     result = $client.execute(:api_method => $arvados.jobs.get,
288                              :parameters => {
289                                :uuid => uuid
290                              },
291                              :authenticated => false,
292                              :headers => {
293                                authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
294                              })
295     @cache[uuid] = JSON.parse result.body, :symbolize_names => true
296   end
297   def self.where(conditions)
298     result = $client.execute(:api_method => $arvados.jobs.list,
299                              :parameters => {
300                                :limit => 10000,
301                                :where => conditions.to_json
302                              },
303                              :authenticated => false,
304                              :headers => {
305                                authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
306                              })
307     list = JSON.parse result.body, :symbolize_names => true
308     if list and list[:items].is_a? Array
309       list[:items]
310     else
311       []
312     end
313   end
314   def self.create(pipeline, component, job, create_params)
315     @cache ||= {}
316
317     body = {job: no_nil_values(job)}.merge(no_nil_values(create_params))
318
319     result = $client.execute(:api_method => $arvados.jobs.create,
320                              :body_object => body,
321                              :authenticated => false,
322                              :headers => {
323                                authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
324                              })
325     j = JSON.parse result.body, :symbolize_names => true
326     if j.is_a? Hash and j[:uuid]
327       @cache[j[:uuid]] = j
328     else
329       debuglog "create job: #{j[:errors] rescue nil} with attributes #{body}", 0
330
331       msg = ""
332       j[:errors].each do |err|
333         msg += "Error creating job for component #{component}: #{err}\n"
334       end
335       msg += "Job submission was: #{body.to_json}"
336
337       pipeline.log_stderr(msg)
338       nil
339     end
340   end
341
342   protected
343
344   def self.no_nil_values(hash)
345     hash.reject { |key, value| value.nil? }
346   end
347 end
348
349 class WhRunPipelineInstance
350   attr_reader :instance
351
352   def initialize(_options)
353     @options = _options
354   end
355
356   def fetch_template(template)
357     if template.match /[^-0-9a-z]/
358       # Doesn't look like a uuid -- use it as a filename.
359       @template = JSON.parse File.read(template), :symbolize_names => true
360     else
361       result = $client.execute(:api_method => $arvados.pipeline_templates.get,
362                                :parameters => {
363                                  :uuid => template
364                                },
365                                :authenticated => false,
366                                :headers => {
367                                  authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
368                                })
369       @template = JSON.parse result.body, :symbolize_names => true
370       if !@template[:uuid]
371         abort "#{$0}: fatal: failed to retrieve pipeline template #{template} #{@template[:errors].inspect rescue nil}"
372       end
373     end
374     self
375   end
376
377   def fetch_instance(instance_uuid)
378     @instance = PipelineInstance.find(instance_uuid)
379     @template = @instance
380     self
381   end
382
383   def apply_parameters(params_args)
384     params_args.shift if params_args[0] == '--'
385     params = {}
386     while !params_args.empty?
387       if (re = params_args[0].match /^(--)?([^-].*?)=(.+)/)
388         params[re[2]] = re[3]
389         params_args.shift
390       elsif params_args.size > 1
391         param = params_args.shift.sub /^--/, ''
392         params[param] = params_args.shift
393       else
394         abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: I do not know what to do with arg \"#{params_args[0]}\""
395       end
396     end
397
398     if not @template[:components].is_a?(Hash)
399       abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Template missing \"components\" hash"
400     end
401     @components = @template[:components].dup
402
403     bad_components = @components.each_pair.select do |cname, cspec|
404       not cspec.is_a?(Hash)
405     end
406     if bad_components.any?
407       abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Components not specified with hashes: #{bad_components.map(&:first).join(', ')}"
408     end
409
410     bad_components = @components.each_pair.select do |cname, cspec|
411       not cspec[:script_parameters].is_a?(Hash)
412     end
413     if bad_components.any?
414       abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Components missing \"script_parameters\" hashes: #{bad_components.map(&:first).join(', ')}"
415     end
416
417     errors = []
418     @components.each do |componentname, component|
419       component[:script_parameters].each do |parametername, parameter|
420         parameter = { :value => parameter } unless parameter.is_a? Hash
421         value =
422           (params["#{componentname}::#{parametername}"] ||
423            parameter[:value] ||
424            (parameter[:output_of].nil? &&
425             (params[parametername.to_s] ||
426              parameter[:default])) ||
427            nil)
428         if value.nil? and
429             ![false,'false',0,'0'].index parameter[:required]
430           if parameter[:output_of]
431             next
432           end
433           errors << [componentname, parametername, "required parameter is missing"]
434         end
435         debuglog "parameter #{componentname}::#{parametername} == #{value}"
436         component[:script_parameters][parametername] = value
437       end
438     end
439     if !errors.empty?
440       abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nErrors:\n#{errors.collect { |c,p,e| "#{c}::#{p} - #{e}\n" }.join ""}"
441     end
442     debuglog "options=" + @options.pretty_inspect
443     self
444   end
445
446   def setup_instance
447     if @instance
448       @instance[:properties][:run_options] ||= {}
449       if @options[:no_reuse]
450         # override properties of existing instance
451         @instance[:properties][:run_options][:enable_job_reuse] = false
452       else
453         # Default to "enable reuse" if not specified. (This code path
454         # can go away when old clients go away.)
455         if @instance[:properties][:run_options][:enable_job_reuse].nil?
456           @instance[:properties][:run_options][:enable_job_reuse] = true
457         end
458       end
459     else
460       @instance = PipelineInstance.
461         create(components: @components,
462                properties: {
463                  run_options: {
464                    enable_job_reuse: !@options[:no_reuse]
465                  }
466                },
467                pipeline_template_uuid: @template[:uuid],
468                state: ($options[:submit] ? 'RunningOnServer' : 'RunningOnClient'))
469     end
470     self
471   end
472
473   def run
474     moretodo = true
475     interrupted = false
476
477     job_creation_failed = 0
478     while moretodo
479       moretodo = false
480       @components.each do |cname, c|
481         job = nil
482         owner_uuid = @instance[:owner_uuid]
483         # Is the job satisfying this component already known to be
484         # finished? (Already meaning "before we query API server about
485         # the job's current state")
486         c_already_finished = (c[:job] &&
487                               c[:job][:uuid] &&
488                               !c[:job][:success].nil?)
489         if !c[:job] and
490             c[:script_parameters].select { |pname, p| p.is_a? Hash and p[:output_of]}.empty?
491           # No job yet associated with this component and is component inputs
492           # are fully specified (any output_of script_parameters are resolved
493           # to real value)
494           job = JobCache.create(@instance, cname, {
495             :script => c[:script],
496             :script_parameters => c[:script_parameters],
497             :script_version => c[:script_version],
498             :repository => c[:repository],
499             :nondeterministic => c[:nondeterministic],
500             :runtime_constraints => c[:runtime_constraints],
501             :owner_uuid => owner_uuid,
502           }, {
503             # This is the right place to put these attributes when
504             # dealing with new API servers.
505             :minimum_script_version => c[:minimum_script_version],
506             :exclude_script_versions => c[:exclude_minimum_script_versions],
507             :find_or_create => (@instance[:properties][:run_options].andand[:enable_job_reuse] &&
508                                 !c[:nondeterministic]),
509             :filters => c[:filters]
510           })
511           if job
512             debuglog "component #{cname} new job #{job[:uuid]}"
513             c[:job] = job
514           else
515             debuglog "component #{cname} new job failed", 0
516             job_creation_failed += 1
517           end
518         end
519
520         if c[:job] and c[:job][:uuid]
521           if (c[:job][:running] or
522               not (c[:job][:finished_at] or c[:job][:cancelled_at]))
523             # Job is running so update copy of job record
524             c[:job] = JobCache.get(c[:job][:uuid])
525           end
526
527           if c[:job][:success]
528             # Populate script_parameters of other components waiting for
529             # this job
530             @components.each do |c2name, c2|
531               c2[:script_parameters].each do |pname, p|
532                 if p.is_a? Hash and p[:output_of] == cname.to_s
533                   debuglog "parameter #{c2name}::#{pname} == #{c[:job][:output]}"
534                   c2[:script_parameters][pname] = c[:job][:output]
535                   moretodo = true
536                 end
537               end
538             end
539             unless c_already_finished
540               # This is my first time discovering that the job
541               # succeeded. (At the top of this loop, I was still
542               # waiting for it to finish.)
543
544               debuglog "names: #{@instance[:name]} #{@template[:name]}", 0
545               if (not @instance[:name].nil?) and (not @instance[:name].empty?)
546                 pipeline_name = @instance[:name]
547               else
548                 fetch_template(@instance[:pipeline_template_uuid])
549                 pipeline_name = @template[:name]
550               end
551               if c[:output_name] != false
552                 output_name = c[:output_name] || "Output of #{cname} of #{pipeline_name}"
553                 # Create a collection located in the same project as the pipeline with the contents of the output.
554                 portable_data_hash = c[:job][:output]
555                 collections = $arv.collection.list(limit: 1,
556                                                    filters: [['portable_data_hash', '=', portable_data_hash]],
557                                                    select: ["portable_data_hash", "manifest_text"]
558                                                    )[:items]
559                 if collections.any?
560                   newcollection = {
561                     owner_uuid: owner_uuid,
562                     name: "#{output_name} at #{c[:job][:finished_at]}",
563                     portable_data_hash: collections.first[:portable_data_hash],
564                     manifest_text: collections.first[:manifest_text]
565                   }
566                   debuglog "Creating collection #{newcollection}", 0
567                   newcollection_actual = $arv.collection.create collection: newcollection
568                   c[:output_uuid] = newcollection_actual[:uuid]
569                 else
570                   debuglog "Could not find a collection with portable data hash #{portable_data_hash}", 0
571                 end
572               end
573             end
574           elsif c[:job][:running] ||
575               (!c[:job][:started_at] && !c[:job][:cancelled_at])
576             # Job is still running
577             moretodo = true
578           elsif c[:job][:cancelled_at]
579             debuglog "component #{cname} job #{c[:job][:uuid]} cancelled."
580           end
581         end
582       end
583       @instance[:components] = @components
584       report_status
585
586       if @options[:no_wait]
587         moretodo = false
588       end
589
590       # If job creation fails, just give up on this pipeline instance.
591       if job_creation_failed > 0
592         moretodo = false
593       end
594
595       if moretodo
596         begin
597           sleep 10
598         rescue Interrupt
599           debuglog "interrupt", 0
600           interrupted = true
601           break
602         end
603       end
604     end
605
606     ended = 0
607     succeeded = 0
608     failed = 0
609     @components.each do |cname, c|
610       if c[:job]
611         if c[:job][:finished_at] or c[:job][:cancelled_at] or (c[:job][:running] == false and c[:job][:success] == false)
612           ended += 1
613           if c[:job][:success] == true
614             succeeded += 1
615           elsif c[:job][:success] == false or c[:job][:cancelled_at]
616             failed += 1
617           end
618         end
619       end
620     end
621
622     success = (succeeded == @components.length)
623
624     # A job create call failed. Just give up.
625     if job_creation_failed > 0
626       debuglog "job creation failed - giving up on this pipeline instance", 0
627       success = false
628       failed += 1
629     end
630
631     if interrupted
632      if success
633         @instance[:state] = 'Complete'
634      else
635         @instance[:state] = 'Paused'
636       end
637     else
638       if ended == @components.length or failed > 0
639         @instance[:state] = success ? 'Complete' : 'Failed'
640       end
641     end
642
643     debuglog "pipeline instance state is #{@instance[:state]}"
644
645     # set components_summary
646     components_summary = {"todo" => @components.length - ended, "done" => succeeded, "failed" => failed}
647     @instance[:components_summary] = components_summary
648
649     @instance.save
650   end
651
652   def cleanup
653     if @instance and @instance[:state] == 'RunningOnClient'
654       @instance[:state] = 'Paused'
655       @instance.save
656     end
657   end
658
659   def uuid
660     @instance[:uuid]
661   end
662
663   protected
664
665   def report_status
666     @instance.save
667
668     if @options[:status_json] != '/dev/null'
669       File.open(@options[:status_json], 'w') do |f|
670         f.puts @components.pretty_inspect
671       end
672     end
673
674     if @options[:status_text] != '/dev/null'
675       File.open(@options[:status_text], 'w') do |f|
676         f.puts ""
677         f.puts "#{Time.now} -- pipeline_instance #{@instance[:uuid]}"
678         namewidth = @components.collect { |cname, c| cname.size }.max
679         @components.each do |cname, c|
680           jstatus = if !c[:job]
681                       "-"
682                     elsif c[:job][:running]
683                       "#{c[:job][:tasks_summary].inspect}"
684                     elsif c[:job][:success]
685                       c[:job][:output]
686                     elsif c[:job][:cancelled_at]
687                       "cancelled #{c[:job][:cancelled_at]}"
688                     elsif c[:job][:finished_at]
689                       "failed #{c[:job][:finished_at]}"
690                     elsif c[:job][:started_at]
691                       "started #{c[:job][:started_at]}"
692                     else
693                       "queued #{c[:job][:created_at]}"
694                     end
695           f.puts "#{cname.to_s.ljust namewidth} #{c[:job] ? c[:job][:uuid] : '-'.ljust(27)} #{jstatus}"
696         end
697       end
698     end
699   end
700
701   def abort(msg)
702     if @instance
703       if ["New", "Ready", "RunningOnClient",
704           "RunningOnServer"].include?(@instance[:state])
705         @instance[:state] = "Failed"
706         @instance.save
707       end
708       @instance.log_stderr(msg)
709     end
710     Kernel::abort(msg)
711   end
712 end
713
714 runner = WhRunPipelineInstance.new($options)
715 begin
716   if $options[:template]
717     runner.fetch_template($options[:template])
718   else
719     runner.fetch_instance($options[:instance])
720   end
721   runner.apply_parameters(p.leftovers)
722   runner.setup_instance
723   if $options[:submit]
724     runner.instance.save
725     puts runner.instance[:uuid]
726   else
727     runner.run
728   end
729 rescue Exception => e
730   runner.cleanup
731   raise e
732 end