3550: Fix running local job repeatedly on subsequent loop iterations.
[arvados.git] / sdk / cli / bin / arv-run-pipeline-instance
1 #!/usr/bin/env ruby
2
3 # == Synopsis
4 #
5 #  arv-run-pipeline-instance --template pipeline-template-uuid [options] [--] [parameters]
6 #  arv-run-pipeline-instance --instance pipeline-instance-uuid [options]
7 #
8 # Satisfy a pipeline template by finding or submitting a mapreduce job
9 # for each pipeline component.
10 #
11 # == Options
12 #
13 # [--template uuid] Use the specified pipeline template.
14 #
15 # [--template path] Load the pipeline template from the specified
16 #                   local file.
17 #
18 # [--instance uuid] Use the specified pipeline instance.
19 #
20 # [-n, --dry-run] Do not start any new jobs or wait for existing jobs
21 #                 to finish. Just find out whether jobs are finished,
22 #                 queued, or running for each component
23 #
24 # [--submit] Do not try to satisfy any components. Just
25 #                          create an instance, print its UUID to
26 #                          stdout, and exit.
27 #
28 # [--no-wait] Make only as much progress as possible without entering
29 #             a sleep/poll loop.
30 #
31 # [--no-reuse] Do not reuse existing jobs to satisfy pipeline
32 #              components. Submit a new job for every component.
33 #
34 # [--debug] Print extra debugging information on stderr.
35 #
36 # [--debug-level N] Increase amount of debugging information. Default
37 #                   1, possible range 0..3.
38 #
39 # [--status-text path] Print plain text status report to a file or
40 #                      fifo. Default: /dev/stdout
41 #
42 # [--status-json path] Print JSON status report to a file or
43 #                      fifo. Default: /dev/null
44 #
45 # == Parameters
46 #
47 # [param_name=param_value]
48 #
49 # [param_name param_value] Set (or override) the default value for
50 #                          every parameter with the given name.
51 #
52 # [component_name::param_name=param_value]
53 # [component_name::param_name param_value]
54 # [--component_name::param_name=param_value]
55 # [--component_name::param_name param_value] Set the value of a
56 #                                            parameter for a single
57 #                                            component.
58 #
59 class WhRunPipelineInstance
60 end
61
62 if RUBY_VERSION < '1.9.3' then
63   abort <<-EOS
64 #{$0.gsub(/^\.\//,'')} requires Ruby version 1.9.3 or higher.
65   EOS
66 end
67
68 $arvados_api_version = ENV['ARVADOS_API_VERSION'] || 'v1'
69 $arvados_api_host = ENV['ARVADOS_API_HOST'] or
70   abort "#{$0}: fatal: ARVADOS_API_HOST environment variable not set."
71 $arvados_api_token = ENV['ARVADOS_API_TOKEN'] or
72   abort "#{$0}: fatal: ARVADOS_API_TOKEN environment variable not set."
73
74 begin
75   require 'arvados'
76   require 'rubygems'
77   require 'json'
78   require 'pp'
79   require 'trollop'
80   require 'google/api_client'
81 rescue LoadError => l
82   puts $:
83   abort <<-EOS
84 #{$0}: fatal: #{l.message}
85 Some runtime dependencies may be missing.
86 Try: gem install arvados pp google-api-client json trollop
87   EOS
88 end
89
90 def debuglog(message, verbosity=1)
91   $stderr.puts "#{File.split($0).last} #{$$}: #{message}" if $debuglevel >= verbosity
92 end
93
94 module Kernel
95   def suppress_warnings
96     original_verbosity = $VERBOSE
97     $VERBOSE = nil
98     result = yield
99     $VERBOSE = original_verbosity
100     return result
101   end
102 end
103
104 if $arvados_api_host.match /local/
105   # You probably don't care about SSL certificate checks if you're
106   # testing with a dev server.
107   suppress_warnings { OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE }
108 end
109
110
111 # Parse command line options (the kind that control the behavior of
112 # this program, that is, not the pipeline component parameters).
113
114 p = Trollop::Parser.new do
115   version __FILE__
116   opt(:dry_run,
117       "Do not start any new jobs or wait for existing jobs to finish. Just find out whether jobs are finished, queued, or running for each component.",
118       :type => :boolean,
119       :short => :n)
120   opt(:status_text,
121       "Store plain text status in given file.",
122       :short => :none,
123       :type => :string,
124       :default => '/dev/stdout')
125   opt(:status_json,
126       "Store json-formatted pipeline in given file.",
127       :short => :none,
128       :type => :string,
129       :default => '/dev/null')
130   opt(:no_wait,
131       "Do not wait for jobs to finish. Just look up status, submit new jobs if needed, and exit.",
132       :short => :none,
133       :type => :boolean)
134   opt(:no_reuse,
135       "Do not reuse existing jobs to satisfy pipeline components. Submit a new job for every component.",
136       :short => :none,
137       :type => :boolean)
138   opt(:debug,
139       "Print extra debugging information on stderr.",
140       :type => :boolean)
141   opt(:debug_level,
142       "Set debug verbosity level.",
143       :short => :none,
144       :type => :integer)
145   opt(:template,
146       "UUID of pipeline template, or path to local pipeline template file.",
147       :short => :none,
148       :type => :string)
149   opt(:instance,
150       "UUID of pipeline instance.",
151       :short => :none,
152       :type => :string)
153   opt(:submit,
154       "Submit the pipeline instance to the server, and exit. Let the Crunch dispatch service satisfy the components by finding/running jobs.",
155       :short => :none,
156       :type => :boolean)
157   opt(:run_pipeline_here,
158       "Manage the pipeline instance in-process. Submit jobs to Crunch as needed. Do not exit until the pipeline finishes (or fails).",
159       :short => :none,
160       :type => :boolean)
161   opt(:run_jobs_here,
162       "Run jobs in the local terminal session instead of submitting them to Crunch. Implies --run-pipeline-here. Note: this results in a significantly different job execution environment, and some Crunch features are not supported. It can be necessary to modify a pipeline in order to make it run this way.",
163       :short => :none,
164       :type => :boolean)
165   opt(:run_here,
166       "Synonym for --run-jobs-here.",
167       :short => :none,
168       :type => :boolean)
169   stop_on [:'--']
170 end
171 $options = Trollop::with_standard_exception_handling p do
172   p.parse ARGV
173 end
174 $debuglevel = $options[:debug_level] || ($options[:debug] && 1) || 0
175
176 $options[:run_jobs_here] ||= $options[:run_here] # old flag name
177 $options[:run_pipeline_here] ||= $options[:run_jobs_here] # B requires A
178
179 if $options[:instance]
180   if $options[:template] or $options[:submit]
181     abort "#{$0}: syntax error: --instance cannot be combined with --template or --submit."
182   end
183 elsif not $options[:template]
184   puts "error: you must supply a --template or --instance."
185   p.educate
186   abort
187 end
188
189 if $options[:run_pipeline_here] == $options[:submit]
190   abort "#{$0}: error: you must supply --run-pipeline-here, --run-jobs-here, or --submit."
191 end
192
193 # Suppress SSL certificate checks if ARVADOS_API_HOST_INSECURE
194
195 module Kernel
196   def suppress_warnings
197     original_verbosity = $VERBOSE
198     $VERBOSE = nil
199     result = yield
200     $VERBOSE = original_verbosity
201     return result
202   end
203 end
204
205 if ENV['ARVADOS_API_HOST_INSECURE']
206   suppress_warnings { OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE }
207 end
208
209 # Set up the API client.
210
211 $arv = Arvados.new api_version: 'v1'
212 $client = $arv.client
213 $arvados = $arv.arvados_api
214
215 class PipelineInstance
216   def self.find(uuid)
217     result = $client.execute(:api_method => $arvados.pipeline_instances.get,
218                              :parameters => {
219                                :uuid => uuid
220                              },
221                              :authenticated => false,
222                              :headers => {
223                                authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
224                              })
225     j = JSON.parse result.body, :symbolize_names => true
226     unless j.is_a? Hash and j[:uuid]
227       debuglog "Failed to get pipeline_instance: #{j[:errors] rescue nil}", 0
228       nil
229     else
230       debuglog "Retrieved pipeline_instance #{j[:uuid]}"
231       self.new(j)
232     end
233   end
234   def self.create(attributes)
235     result = $client.execute(:api_method => $arvados.pipeline_instances.create,
236                              :body_object => {
237                                :pipeline_instance => attributes
238                              },
239                              :authenticated => false,
240                              :headers => {
241                                authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
242                              })
243     j = JSON.parse result.body, :symbolize_names => true
244     unless j.is_a? Hash and j[:uuid]
245       abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nFailed to create pipeline_instance: #{j[:errors] rescue nil} #{j.inspect}"
246     end
247     debuglog "Created pipeline instance: #{j[:uuid]}"
248     self.new(j)
249   end
250   def save
251     result = $client.execute(:api_method => $arvados.pipeline_instances.update,
252                              :parameters => {
253                                :uuid => @pi[:uuid]
254                              },
255                              :body_object => {
256                                :pipeline_instance => @attributes_to_update
257                              },
258                              :authenticated => false,
259                              :headers => {
260                                authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
261                              })
262     j = JSON.parse result.body, :symbolize_names => true
263     unless j.is_a? Hash and j[:uuid]
264       debuglog "Failed to save pipeline_instance: #{j[:errors] rescue nil}", 0
265       nil
266     else
267       @attributes_to_update = {}
268       @pi = j
269     end
270   end
271   def []=(x,y)
272     @attributes_to_update[x] = y
273     @pi[x] = y
274   end
275   def [](x)
276     @pi[x]
277   end
278
279   def log_stderr(msg)
280     $arv.log.create log: {
281       event_type: 'stderr',
282       object_uuid: self[:uuid],
283       owner_uuid: self[:owner_uuid],
284       properties: {"text" => msg},
285     }
286   end
287
288   protected
289   def initialize(j)
290     @attributes_to_update = {}
291     @pi = j
292   end
293 end
294
295 class JobCache
296   def self.get(uuid)
297     @cache ||= {}
298     result = $client.execute(:api_method => $arvados.jobs.get,
299                              :parameters => {
300                                :uuid => uuid
301                              },
302                              :authenticated => false,
303                              :headers => {
304                                authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
305                              })
306     @cache[uuid] = JSON.parse result.body, :symbolize_names => true
307   end
308   def self.where(conditions)
309     result = $client.execute(:api_method => $arvados.jobs.list,
310                              :parameters => {
311                                :limit => 10000,
312                                :where => conditions.to_json
313                              },
314                              :authenticated => false,
315                              :headers => {
316                                authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
317                              })
318     list = JSON.parse result.body, :symbolize_names => true
319     if list and list[:items].is_a? Array
320       list[:items]
321     else
322       []
323     end
324   end
325   def self.create(pipeline, component, job, create_params)
326     @cache ||= {}
327
328     body = {job: no_nil_values(job)}.merge(no_nil_values(create_params))
329
330     result = $client.execute(:api_method => $arvados.jobs.create,
331                              :body_object => body,
332                              :authenticated => false,
333                              :headers => {
334                                authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
335                              })
336     j = JSON.parse result.body, :symbolize_names => true
337     if j.is_a? Hash and j[:uuid]
338       @cache[j[:uuid]] = j
339     else
340       debuglog "create job: #{j[:errors] rescue nil} with attributes #{body}", 0
341
342       msg = ""
343       j[:errors].each do |err|
344         msg += "Error creating job for component #{component}: #{err}\n"
345       end
346       msg += "Job submission was: #{body.to_json}"
347
348       pipeline.log_stderr(msg)
349       nil
350     end
351   end
352
353   protected
354
355   def self.no_nil_values(hash)
356     hash.reject { |key, value| value.nil? }
357   end
358 end
359
360 class WhRunPipelineInstance
361   attr_reader :instance
362
363   def initialize(_options)
364     @options = _options
365   end
366
367   def fetch_template(template)
368     if template.match /[^-0-9a-z]/
369       # Doesn't look like a uuid -- use it as a filename.
370       @template = JSON.parse File.read(template), :symbolize_names => true
371     else
372       result = $client.execute(:api_method => $arvados.pipeline_templates.get,
373                                :parameters => {
374                                  :uuid => template
375                                },
376                                :authenticated => false,
377                                :headers => {
378                                  authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
379                                })
380       @template = JSON.parse result.body, :symbolize_names => true
381       if !@template[:uuid]
382         abort "#{$0}: fatal: failed to retrieve pipeline template #{template} #{@template[:errors].inspect rescue nil}"
383       end
384     end
385     self
386   end
387
388   def fetch_instance(instance_uuid)
389     @instance = PipelineInstance.find(instance_uuid)
390     @template = @instance
391     self
392   end
393
394   def apply_parameters(params_args)
395     params_args.shift if params_args[0] == '--'
396     params = {}
397     while !params_args.empty?
398       if (re = params_args[0].match /^(--)?([^-].*?)=(.+)/)
399         params[re[2]] = re[3]
400         params_args.shift
401       elsif params_args.size > 1
402         param = params_args.shift.sub /^--/, ''
403         params[param] = params_args.shift
404       else
405         abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: I do not know what to do with arg \"#{params_args[0]}\""
406       end
407     end
408
409     if not @template[:components].is_a?(Hash)
410       abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Template missing \"components\" hash"
411     end
412     @components = @template[:components].dup
413
414     bad_components = @components.each_pair.select do |cname, cspec|
415       not cspec.is_a?(Hash)
416     end
417     if bad_components.any?
418       abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Components not specified with hashes: #{bad_components.map(&:first).join(', ')}"
419     end
420
421     bad_components = @components.each_pair.select do |cname, cspec|
422       not cspec[:script_parameters].is_a?(Hash)
423     end
424     if bad_components.any?
425       abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Components missing \"script_parameters\" hashes: #{bad_components.map(&:first).join(', ')}"
426     end
427
428     errors = []
429     @components.each do |componentname, component|
430       component[:script_parameters].each do |parametername, parameter|
431         parameter = { :value => parameter } unless parameter.is_a? Hash
432         value =
433           (params["#{componentname}::#{parametername}"] ||
434            parameter[:value] ||
435            (parameter[:output_of].nil? &&
436             (params[parametername.to_s] ||
437              parameter[:default])) ||
438            nil)
439         if value.nil? and
440             ![false,'false',0,'0'].index parameter[:required]
441           if parameter[:output_of]
442             next
443           end
444           errors << [componentname, parametername, "required parameter is missing"]
445         end
446         debuglog "parameter #{componentname}::#{parametername} == #{value}"
447         component[:script_parameters][parametername] = value
448       end
449     end
450     if !errors.empty?
451       abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nErrors:\n#{errors.collect { |c,p,e| "#{c}::#{p} - #{e}\n" }.join ""}"
452     end
453     debuglog "options=" + @options.pretty_inspect
454     self
455   end
456
457   def setup_instance
458     if @instance
459       @instance[:properties][:run_options] ||= {}
460       if @options[:no_reuse]
461         # override properties of existing instance
462         @instance[:properties][:run_options][:enable_job_reuse] = false
463       else
464         # Default to "enable reuse" if not specified. (This code path
465         # can go away when old clients go away.)
466         if @instance[:properties][:run_options][:enable_job_reuse].nil?
467           @instance[:properties][:run_options][:enable_job_reuse] = true
468         end
469       end
470     else
471       @instance = PipelineInstance.
472         create(components: @components,
473                properties: {
474                  run_options: {
475                    enable_job_reuse: !@options[:no_reuse]
476                  }
477                },
478                pipeline_template_uuid: @template[:uuid],
479                state: ($options[:submit] ? 'RunningOnServer' : 'RunningOnClient'))
480     end
481     self
482   end
483
484   def run
485     moretodo = true
486     interrupted = false
487
488     if @instance[:started_at].nil?
489       @instance[:started_at] = Time.now
490     end
491
492     job_creation_failed = 0
493     while moretodo
494       moretodo = false
495       @components.each do |cname, c|
496         job = nil
497         owner_uuid = @instance[:owner_uuid]
498         # Is the job satisfying this component already known to be
499         # finished? (Already meaning "before we query API server about
500         # the job's current state")
501         c_already_finished = (c[:job] &&
502                               c[:job][:uuid] &&
503                               !c[:job][:success].nil?)
504         if !c[:job] and
505             c[:script_parameters].select { |pname, p| p.is_a? Hash and p[:output_of]}.empty?
506           # No job yet associated with this component and is component inputs
507           # are fully specified (any output_of script_parameters are resolved
508           # to real value)
509           my_submit_id = "instance #{@instance[:uuid]} rand #{rand(2**64).to_s(36)}"
510           job = JobCache.create(@instance, cname, {
511             :script => c[:script],
512             :script_parameters => c[:script_parameters],
513             :script_version => c[:script_version],
514             :repository => c[:repository],
515             :nondeterministic => c[:nondeterministic],
516             :runtime_constraints => c[:runtime_constraints],
517             :owner_uuid => owner_uuid,
518             :is_locked_by_uuid => (@options[:run_jobs_here] ? owner_uuid : nil),
519             :submit_id => my_submit_id,
520           }, {
521             # This is the right place to put these attributes when
522             # dealing with new API servers.
523             :minimum_script_version => c[:minimum_script_version],
524             :exclude_script_versions => c[:exclude_minimum_script_versions],
525             :find_or_create => (@instance[:properties][:run_options].andand[:enable_job_reuse] &&
526                                 !c[:nondeterministic]),
527             :filters => c[:filters]
528           })
529           if job
530             debuglog "component #{cname} new job #{job[:uuid]}"
531             c[:job] = job
532             c[:run_in_process] = (@options[:run_jobs_here] and
533                                   job[:submit_id] == my_submit_id)
534           else
535             debuglog "component #{cname} new job failed", 0
536             job_creation_failed += 1
537           end
538         end
539
540         if c[:job] and c[:run_in_process] and c[:job][:success].nil?
541           report_status
542           begin
543             require 'open3'
544             Open3.popen3("arv-crunch-job", "--force-unlock",
545                          "--job", c[:job][:uuid]) do |stdin, stdout, stderr, wait_thr|
546               debuglog "arv-crunch-job pid #{wait_thr.pid} started", 0
547               stdin.close
548               while true
549                 rready, wready, = IO.select([stdout, stderr], [])
550                 break if !rready[0]
551                 begin
552                   buf = rready[0].read_nonblock(2**20)
553                 rescue EOFError
554                   break
555                 end
556                 (rready[0] == stdout ? $stdout : $stderr).write(buf)
557               end
558               stdout.close
559               stderr.close
560               debuglog "arv-crunch-job pid #{wait_thr.pid} exit #{wait_thr.value.to_i}", 0
561             end
562             if not $arv.job.get(uuid: c[:job][:uuid])[:finished_at]
563               raise Exception.new("arv-crunch-job did not set finished_at.")
564             end
565           rescue Exception => e
566             debuglog "Interrupted (#{e}). Failing job.", 0
567             $arv.job.update(uuid: c[:job][:uuid],
568                             job: {
569                               finished_at: Time.now,
570                               running: false,
571                               success: false
572                             })
573           end
574         end
575
576         if c[:job] and c[:job][:uuid]
577           if (c[:job][:running] or
578               not (c[:job][:finished_at] or c[:job][:cancelled_at]))
579             # Job is running so update copy of job record
580             c[:job] = JobCache.get(c[:job][:uuid])
581           end
582
583           if c[:job][:success]
584             # Populate script_parameters of other components waiting for
585             # this job
586             @components.each do |c2name, c2|
587               c2[:script_parameters].each do |pname, p|
588                 if p.is_a? Hash and p[:output_of] == cname.to_s
589                   debuglog "parameter #{c2name}::#{pname} == #{c[:job][:output]}"
590                   c2[:script_parameters][pname] = c[:job][:output]
591                   moretodo = true
592                 end
593               end
594             end
595             unless c_already_finished
596               # This is my first time discovering that the job
597               # succeeded. (At the top of this loop, I was still
598               # waiting for it to finish.)
599
600               if @instance[:name].andand.length.andand > 0
601                 pipeline_name = @instance[:name]
602               elsif @template.andand[:name].andand.length.andand > 0
603                 pipeline_name = @template[:name]
604               else
605                 pipeline_name = @instance[:uuid]
606               end
607               if c[:output_name] != false
608                 # Create a collection located in the same project as the pipeline with the contents of the output.
609                 portable_data_hash = c[:job][:output]
610                 collections = $arv.collection.list(limit: 1,
611                                                    filters: [['portable_data_hash', '=', portable_data_hash]],
612                                                    select: ["portable_data_hash", "manifest_text"]
613                                                    )[:items]
614                 if collections.any?
615                   name = c[:output_name] || "Output #{portable_data_hash[0..7]} of #{cname} of #{pipeline_name}"
616
617                   # check if there is a name collision.
618                   name_collisions = $arv.collection.list(filters: [["owner_uuid", "=", owner_uuid],
619                                                                    ["name", "=", name]])[:items]
620
621                   newcollection_actual = nil
622                   if name_collisions.any? and name_collisions.first[:portable_data_hash] == portable_data_hash
623                     # There is already a collection with the same name and the
624                     # same contents, so just point to that.
625                     newcollection_actual = name_collisions.first
626                   end
627
628                   if newcollection_actual.nil?
629                     # Did not find a collection with the same name (or the
630                     # collection has a different portable data hash) so create
631                     # a new collection with ensure_unique_name: true.
632                     newcollection = {
633                       owner_uuid: owner_uuid,
634                       name: name,
635                       portable_data_hash: collections.first[:portable_data_hash],
636                       manifest_text: collections.first[:manifest_text]
637                     }
638                     debuglog "Creating collection #{newcollection}", 0
639                     newcollection_actual = $arv.collection.create collection: newcollection, ensure_unique_name: true
640                   end
641
642                   c[:output_uuid] = newcollection_actual[:uuid]
643                 else
644                   debuglog "Could not find a collection with portable data hash #{portable_data_hash}", 0
645                 end
646               end
647             end
648           elsif c[:job][:running] ||
649               (!c[:job][:started_at] && !c[:job][:cancelled_at])
650             # Job is still running
651             moretodo = true
652           elsif c[:job][:cancelled_at]
653             debuglog "component #{cname} job #{c[:job][:uuid]} cancelled."
654           end
655         end
656       end
657       @instance[:components] = @components
658       report_status
659
660       if @options[:no_wait]
661         moretodo = false
662       end
663
664       # If job creation fails, just give up on this pipeline instance.
665       if job_creation_failed > 0
666         moretodo = false
667       end
668
669       if moretodo
670         begin
671           sleep 10
672         rescue Interrupt
673           debuglog "interrupt", 0
674           interrupted = true
675           break
676         end
677       end
678     end
679
680     ended = 0
681     succeeded = 0
682     failed = 0
683     @components.each do |cname, c|
684       if c[:job]
685         if c[:job][:finished_at] or c[:job][:cancelled_at] or (c[:job][:running] == false and c[:job][:success] == false)
686           ended += 1
687           if c[:job][:success] == true
688             succeeded += 1
689           elsif c[:job][:success] == false or c[:job][:cancelled_at]
690             failed += 1
691           end
692         end
693       end
694     end
695
696     success = (succeeded == @components.length)
697
698     # A job create call failed. Just give up.
699     if job_creation_failed > 0
700       debuglog "job creation failed - giving up on this pipeline instance", 0
701       success = false
702       failed += 1
703     end
704
705     if interrupted
706      if success
707         @instance[:state] = 'Complete'
708      else
709         @instance[:state] = 'Paused'
710       end
711     else
712       if ended == @components.length or failed > 0
713         @instance[:state] = success ? 'Complete' : 'Failed'
714       end
715     end
716
717     if @instance[:finished_at].nil? and ['Complete', 'Failed'].include? @instance[:state]
718       @instance[:finished_at] = Time.now
719     end
720
721     debuglog "pipeline instance state is #{@instance[:state]}"
722
723     # set components_summary
724     components_summary = {"todo" => @components.length - ended, "done" => succeeded, "failed" => failed}
725     @instance[:components_summary] = components_summary
726
727     @instance.save
728   end
729
730   def cleanup
731     if @instance and @instance[:state] == 'RunningOnClient'
732       @instance[:state] = 'Paused'
733       @instance.save
734     end
735   end
736
737   def uuid
738     @instance[:uuid]
739   end
740
741   protected
742
743   def report_status
744     @instance.save
745
746     if @options[:status_json] != '/dev/null'
747       File.open(@options[:status_json], 'w') do |f|
748         f.puts @components.pretty_inspect
749       end
750     end
751
752     if @options[:status_text] != '/dev/null'
753       File.open(@options[:status_text], 'w') do |f|
754         f.puts ""
755         f.puts "#{Time.now} -- pipeline_instance #{@instance[:uuid]}"
756         namewidth = @components.collect { |cname, c| cname.size }.max
757         @components.each do |cname, c|
758           jstatus = if !c[:job]
759                       "-"
760                     elsif c[:job][:running]
761                       "#{c[:job][:tasks_summary].inspect}"
762                     elsif c[:job][:success]
763                       c[:job][:output]
764                     elsif c[:job][:cancelled_at]
765                       "cancelled #{c[:job][:cancelled_at]}"
766                     elsif c[:job][:finished_at]
767                       "failed #{c[:job][:finished_at]}"
768                     elsif c[:job][:started_at]
769                       "started #{c[:job][:started_at]}"
770                     elsif c[:job][:is_locked_by_uuid]
771                       "starting #{c[:job][:started_at]}"
772                     else
773                       "queued #{c[:job][:created_at]}"
774                     end
775           f.puts "#{cname.to_s.ljust namewidth} #{c[:job] ? c[:job][:uuid] : '-'.ljust(27)} #{jstatus}"
776         end
777       end
778     end
779   end
780
781   def abort(msg)
782     if @instance
783       if ["New", "Ready", "RunningOnClient",
784           "RunningOnServer"].include?(@instance[:state])
785         @instance[:state] = "Failed"
786         @instance[:finished_at] = Time.now
787         @instance.save
788       end
789       @instance.log_stderr(msg)
790     end
791     Kernel::abort(msg)
792   end
793 end
794
795 runner = WhRunPipelineInstance.new($options)
796 begin
797   if $options[:template]
798     runner.fetch_template($options[:template])
799   else
800     runner.fetch_instance($options[:instance])
801   end
802   runner.apply_parameters(p.leftovers)
803   runner.setup_instance
804   if $options[:submit]
805     runner.instance.save
806     puts runner.instance[:uuid]
807   else
808     runner.run
809   end
810 rescue Exception => e
811   runner.cleanup
812   raise e
813 end