Merge branch '3187-start-finish-timestamps-tasks-pipelines' refs #3187
[arvados.git] / sdk / cli / bin / arv-run-pipeline-instance
1 #!/usr/bin/env ruby
2
3 # == Synopsis
4 #
5 #  arv-run-pipeline-instance --template pipeline-template-uuid [options] [--] [parameters]
6 #  arv-run-pipeline-instance --instance pipeline-instance-uuid [options]
7 #
8 # Satisfy a pipeline template by finding or submitting a mapreduce job
9 # for each pipeline component.
10 #
11 # == Options
12 #
13 # [--template uuid] Use the specified pipeline template.
14 #
15 # [--template path] Load the pipeline template from the specified
16 #                   local file.
17 #
18 # [--instance uuid] Use the specified pipeline instance.
19 #
20 # [-n, --dry-run] Do not start any new jobs or wait for existing jobs
21 #                 to finish. Just find out whether jobs are finished,
22 #                 queued, or running for each component
23 #
24 # [--submit] Do not try to satisfy any components. Just
25 #                          create an instance, print its UUID to
26 #                          stdout, and exit.
27 #
28 # [--no-wait] Make only as much progress as possible without entering
29 #             a sleep/poll loop.
30 #
31 # [--no-reuse] Do not reuse existing jobs to satisfy pipeline
32 #              components. Submit a new job for every component.
33 #
34 # [--debug] Print extra debugging information on stderr.
35 #
36 # [--debug-level N] Increase amount of debugging information. Default
37 #                   1, possible range 0..3.
38 #
39 # [--status-text path] Print plain text status report to a file or
40 #                      fifo. Default: /dev/stdout
41 #
42 # [--status-json path] Print JSON status report to a file or
43 #                      fifo. Default: /dev/null
44 #
45 # == Parameters
46 #
47 # [param_name=param_value]
48 #
49 # [param_name param_value] Set (or override) the default value for
50 #                          every parameter with the given name.
51 #
52 # [component_name::param_name=param_value]
53 # [component_name::param_name param_value]
54 # [--component_name::param_name=param_value]
55 # [--component_name::param_name param_value] Set the value of a
56 #                                            parameter for a single
57 #                                            component.
58 #
59 class WhRunPipelineInstance
60 end
61
62 $application_version = 1.0
63
64 if RUBY_VERSION < '1.9.3' then
65   abort <<-EOS
66 #{$0.gsub(/^\.\//,'')} requires Ruby version 1.9.3 or higher.
67   EOS
68 end
69
70 $arvados_api_version = ENV['ARVADOS_API_VERSION'] || 'v1'
71 $arvados_api_host = ENV['ARVADOS_API_HOST'] or
72   abort "#{$0}: fatal: ARVADOS_API_HOST environment variable not set."
73 $arvados_api_token = ENV['ARVADOS_API_TOKEN'] or
74   abort "#{$0}: fatal: ARVADOS_API_TOKEN environment variable not set."
75
76 begin
77   require 'arvados'
78   require 'rubygems'
79   require 'json'
80   require 'pp'
81   require 'trollop'
82   require 'google/api_client'
83 rescue LoadError => l
84   puts $:
85   abort <<-EOS
86 #{$0}: fatal: #{l.message}
87 Some runtime dependencies may be missing.
88 Try: gem install arvados pp google-api-client json trollop
89   EOS
90 end
91
92 def debuglog(message, verbosity=1)
93   $stderr.puts "#{File.split($0).last} #{$$}: #{message}" if $debuglevel >= verbosity
94 end
95
96 module Kernel
97   def suppress_warnings
98     original_verbosity = $VERBOSE
99     $VERBOSE = nil
100     result = yield
101     $VERBOSE = original_verbosity
102     return result
103   end
104 end
105
106 if $arvados_api_host.match /local/
107   # You probably don't care about SSL certificate checks if you're
108   # testing with a dev server.
109   suppress_warnings { OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE }
110 end
111
112 class Google::APIClient
113   def discovery_document(api, version)
114     api = api.to_s
115     return @discovery_documents["#{api}:#{version}"] ||=
116       begin
117         response = self.execute!(
118                                  :http_method => :get,
119                                  :uri => self.discovery_uri(api, version),
120                                  :authenticated => false
121                                  )
122         response.body.class == String ? JSON.parse(response.body) : response.body
123       end
124   end
125 end
126
127
128 # Parse command line options (the kind that control the behavior of
129 # this program, that is, not the pipeline component parameters).
130
131 p = Trollop::Parser.new do
132   version __FILE__
133   opt(:dry_run,
134       "Do not start any new jobs or wait for existing jobs to finish. Just find out whether jobs are finished, queued, or running for each component.",
135       :type => :boolean,
136       :short => :n)
137   opt(:status_text,
138       "Store plain text status in given file.",
139       :short => :none,
140       :type => :string,
141       :default => '/dev/stdout')
142   opt(:status_json,
143       "Store json-formatted pipeline in given file.",
144       :short => :none,
145       :type => :string,
146       :default => '/dev/null')
147   opt(:no_wait,
148       "Do not wait for jobs to finish. Just look up status, submit new jobs if needed, and exit.",
149       :short => :none,
150       :type => :boolean)
151   opt(:no_reuse,
152       "Do not reuse existing jobs to satisfy pipeline components. Submit a new job for every component.",
153       :short => :none,
154       :type => :boolean)
155   opt(:debug,
156       "Print extra debugging information on stderr.",
157       :type => :boolean)
158   opt(:debug_level,
159       "Set debug verbosity level.",
160       :short => :none,
161       :type => :integer)
162   opt(:template,
163       "UUID of pipeline template, or path to local pipeline template file.",
164       :short => :none,
165       :type => :string)
166   opt(:instance,
167       "UUID of pipeline instance.",
168       :short => :none,
169       :type => :string)
170   opt(:submit,
171       "Do not try to satisfy any components. Just create a pipeline instance and output its UUID.",
172       :short => :none,
173       :type => :boolean)
174   opt(:run_here,
175       "Manage the pipeline in process.",
176       :short => :none,
177       :type => :boolean)
178   stop_on [:'--']
179 end
180 $options = Trollop::with_standard_exception_handling p do
181   p.parse ARGV
182 end
183 $debuglevel = $options[:debug_level] || ($options[:debug] && 1) || 0
184
185 if $options[:instance]
186   if $options[:template] or $options[:submit]
187     abort "#{$0}: syntax error: --instance cannot be combined with --template or --submit."
188   end
189 elsif not $options[:template]
190   puts "error: you must supply a --template or --instance."
191   p.educate
192   abort
193 end
194
195 if $options[:run_here] == $options[:submit]
196   abort "#{$0}: syntax error: you must supply either --run-here or --submit."
197 end
198
199 # Suppress SSL certificate checks if ARVADOS_API_HOST_INSECURE
200
201 module Kernel
202   def suppress_warnings
203     original_verbosity = $VERBOSE
204     $VERBOSE = nil
205     result = yield
206     $VERBOSE = original_verbosity
207     return result
208   end
209 end
210
211 if ENV['ARVADOS_API_HOST_INSECURE']
212   suppress_warnings { OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE }
213 end
214
215 # Set up the API client.
216
217 $client ||= Google::APIClient.
218   new(:host => $arvados_api_host,
219       :application_name => File.split($0).last,
220       :application_version => $application_version.to_s)
221 $arvados = $client.discovered_api('arvados', $arvados_api_version)
222 $arv = Arvados.new api_version: 'v1'
223
224
225 class PipelineInstance
226   def self.find(uuid)
227     result = $client.execute(:api_method => $arvados.pipeline_instances.get,
228                              :parameters => {
229                                :uuid => uuid
230                              },
231                              :authenticated => false,
232                              :headers => {
233                                authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
234                              })
235     j = JSON.parse result.body, :symbolize_names => true
236     unless j.is_a? Hash and j[:uuid]
237       debuglog "Failed to get pipeline_instance: #{j[:errors] rescue nil}", 0
238       nil
239     else
240       debuglog "Retrieved pipeline_instance #{j[:uuid]}"
241       self.new(j)
242     end
243   end
244   def self.create(attributes)
245     result = $client.execute(:api_method => $arvados.pipeline_instances.create,
246                              :body_object => {
247                                :pipeline_instance => attributes
248                              },
249                              :authenticated => false,
250                              :headers => {
251                                authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
252                              })
253     j = JSON.parse result.body, :symbolize_names => true
254     unless j.is_a? Hash and j[:uuid]
255       abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nFailed to create pipeline_instance: #{j[:errors] rescue nil} #{j.inspect}"
256     end
257     debuglog "Created pipeline instance: #{j[:uuid]}"
258     self.new(j)
259   end
260   def save
261     result = $client.execute(:api_method => $arvados.pipeline_instances.update,
262                              :parameters => {
263                                :uuid => @pi[:uuid]
264                              },
265                              :body_object => {
266                                :pipeline_instance => @attributes_to_update
267                              },
268                              :authenticated => false,
269                              :headers => {
270                                authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
271                              })
272     j = JSON.parse result.body, :symbolize_names => true
273     unless j.is_a? Hash and j[:uuid]
274       debuglog "Failed to save pipeline_instance: #{j[:errors] rescue nil}", 0
275       nil
276     else
277       @attributes_to_update = {}
278       @pi = j
279     end
280   end
281   def []=(x,y)
282     @attributes_to_update[x] = y
283     @pi[x] = y
284   end
285   def [](x)
286     @pi[x]
287   end
288
289   def log_stderr(msg)
290     $arv.log.create log: {
291       event_type: 'stderr',
292       object_uuid: self[:uuid],
293       owner_uuid: self[:owner_uuid],
294       properties: {"text" => msg},
295     }
296   end
297
298   protected
299   def initialize(j)
300     @attributes_to_update = {}
301     @pi = j
302   end
303 end
304
305 class JobCache
306   def self.get(uuid)
307     @cache ||= {}
308     result = $client.execute(:api_method => $arvados.jobs.get,
309                              :parameters => {
310                                :uuid => uuid
311                              },
312                              :authenticated => false,
313                              :headers => {
314                                authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
315                              })
316     @cache[uuid] = JSON.parse result.body, :symbolize_names => true
317   end
318   def self.where(conditions)
319     result = $client.execute(:api_method => $arvados.jobs.list,
320                              :parameters => {
321                                :limit => 10000,
322                                :where => conditions.to_json
323                              },
324                              :authenticated => false,
325                              :headers => {
326                                authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
327                              })
328     list = JSON.parse result.body, :symbolize_names => true
329     if list and list[:items].is_a? Array
330       list[:items]
331     else
332       []
333     end
334   end
335   def self.create(pipeline, component, job, create_params)
336     @cache ||= {}
337
338     body = {job: no_nil_values(job)}.merge(no_nil_values(create_params))
339
340     result = $client.execute(:api_method => $arvados.jobs.create,
341                              :body_object => body,
342                              :authenticated => false,
343                              :headers => {
344                                authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
345                              })
346     j = JSON.parse result.body, :symbolize_names => true
347     if j.is_a? Hash and j[:uuid]
348       @cache[j[:uuid]] = j
349     else
350       debuglog "create job: #{j[:errors] rescue nil} with attributes #{body}", 0
351
352       msg = ""
353       j[:errors].each do |err|
354         msg += "Error creating job for component #{component}: #{err}\n"
355       end
356       msg += "Job submission was: #{body.to_json}"
357
358       pipeline.log_stderr(msg)
359       nil
360     end
361   end
362
363   protected
364
365   def self.no_nil_values(hash)
366     hash.reject { |key, value| value.nil? }
367   end
368 end
369
370 class WhRunPipelineInstance
371   attr_reader :instance
372
373   def initialize(_options)
374     @options = _options
375   end
376
377   def fetch_template(template)
378     if template.match /[^-0-9a-z]/
379       # Doesn't look like a uuid -- use it as a filename.
380       @template = JSON.parse File.read(template), :symbolize_names => true
381     else
382       result = $client.execute(:api_method => $arvados.pipeline_templates.get,
383                                :parameters => {
384                                  :uuid => template
385                                },
386                                :authenticated => false,
387                                :headers => {
388                                  authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
389                                })
390       @template = JSON.parse result.body, :symbolize_names => true
391       if !@template[:uuid]
392         abort "#{$0}: fatal: failed to retrieve pipeline template #{template} #{@template[:errors].inspect rescue nil}"
393       end
394     end
395     self
396   end
397
398   def fetch_instance(instance_uuid)
399     @instance = PipelineInstance.find(instance_uuid)
400     @template = @instance
401     self
402   end
403
404   def apply_parameters(params_args)
405     params_args.shift if params_args[0] == '--'
406     params = {}
407     while !params_args.empty?
408       if (re = params_args[0].match /^(--)?([^-].*?)=(.+)/)
409         params[re[2]] = re[3]
410         params_args.shift
411       elsif params_args.size > 1
412         param = params_args.shift.sub /^--/, ''
413         params[param] = params_args.shift
414       else
415         abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: I do not know what to do with arg \"#{params_args[0]}\""
416       end
417     end
418
419     if not @template[:components].is_a?(Hash)
420       abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Template missing \"components\" hash"
421     end
422     @components = @template[:components].dup
423
424     bad_components = @components.each_pair.select do |cname, cspec|
425       not cspec.is_a?(Hash)
426     end
427     if bad_components.any?
428       abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Components not specified with hashes: #{bad_components.map(&:first).join(', ')}"
429     end
430
431     bad_components = @components.each_pair.select do |cname, cspec|
432       not cspec[:script_parameters].is_a?(Hash)
433     end
434     if bad_components.any?
435       abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Components missing \"script_parameters\" hashes: #{bad_components.map(&:first).join(', ')}"
436     end
437
438     errors = []
439     @components.each do |componentname, component|
440       component[:script_parameters].each do |parametername, parameter|
441         parameter = { :value => parameter } unless parameter.is_a? Hash
442         value =
443           (params["#{componentname}::#{parametername}"] ||
444            parameter[:value] ||
445            (parameter[:output_of].nil? &&
446             (params[parametername.to_s] ||
447              parameter[:default])) ||
448            nil)
449         if value.nil? and
450             ![false,'false',0,'0'].index parameter[:required]
451           if parameter[:output_of]
452             next
453           end
454           errors << [componentname, parametername, "required parameter is missing"]
455         end
456         debuglog "parameter #{componentname}::#{parametername} == #{value}"
457         component[:script_parameters][parametername] = value
458       end
459     end
460     if !errors.empty?
461       abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nErrors:\n#{errors.collect { |c,p,e| "#{c}::#{p} - #{e}\n" }.join ""}"
462     end
463     debuglog "options=" + @options.pretty_inspect
464     self
465   end
466
467   def setup_instance
468     if @instance
469       @instance[:properties][:run_options] ||= {}
470       if @options[:no_reuse]
471         # override properties of existing instance
472         @instance[:properties][:run_options][:enable_job_reuse] = false
473       else
474         # Default to "enable reuse" if not specified. (This code path
475         # can go away when old clients go away.)
476         if @instance[:properties][:run_options][:enable_job_reuse].nil?
477           @instance[:properties][:run_options][:enable_job_reuse] = true
478         end
479       end
480     else
481       @instance = PipelineInstance.
482         create(components: @components,
483                properties: {
484                  run_options: {
485                    enable_job_reuse: !@options[:no_reuse]
486                  }
487                },
488                pipeline_template_uuid: @template[:uuid],
489                state: ($options[:submit] ? 'RunningOnServer' : 'RunningOnClient'))
490     end
491     self
492   end
493
494   def run
495     moretodo = true
496     interrupted = false
497
498     if @instance[:started_at].nil?
499       @instance[:started_at] = Time.now
500     end
501
502     job_creation_failed = 0
503     while moretodo
504       moretodo = false
505       @components.each do |cname, c|
506         job = nil
507         owner_uuid = @instance[:owner_uuid]
508         # Is the job satisfying this component already known to be
509         # finished? (Already meaning "before we query API server about
510         # the job's current state")
511         c_already_finished = (c[:job] &&
512                               c[:job][:uuid] &&
513                               !c[:job][:success].nil?)
514         if !c[:job] and
515             c[:script_parameters].select { |pname, p| p.is_a? Hash and p[:output_of]}.empty?
516           # No job yet associated with this component and is component inputs
517           # are fully specified (any output_of script_parameters are resolved
518           # to real value)
519           job = JobCache.create(@instance, cname, {
520             :script => c[:script],
521             :script_parameters => c[:script_parameters],
522             :script_version => c[:script_version],
523             :repository => c[:repository],
524             :nondeterministic => c[:nondeterministic],
525             :runtime_constraints => c[:runtime_constraints],
526             :owner_uuid => owner_uuid,
527           }, {
528             # This is the right place to put these attributes when
529             # dealing with new API servers.
530             :minimum_script_version => c[:minimum_script_version],
531             :exclude_script_versions => c[:exclude_minimum_script_versions],
532             :find_or_create => (@instance[:properties][:run_options].andand[:enable_job_reuse] &&
533                                 !c[:nondeterministic]),
534             :filters => c[:filters]
535           })
536           if job
537             debuglog "component #{cname} new job #{job[:uuid]}"
538             c[:job] = job
539           else
540             debuglog "component #{cname} new job failed", 0
541             job_creation_failed += 1
542           end
543         end
544
545         if c[:job] and c[:job][:uuid]
546           if (c[:job][:running] or
547               not (c[:job][:finished_at] or c[:job][:cancelled_at]))
548             # Job is running so update copy of job record
549             c[:job] = JobCache.get(c[:job][:uuid])
550           end
551
552           if c[:job][:success]
553             # Populate script_parameters of other components waiting for
554             # this job
555             @components.each do |c2name, c2|
556               c2[:script_parameters].each do |pname, p|
557                 if p.is_a? Hash and p[:output_of] == cname.to_s
558                   debuglog "parameter #{c2name}::#{pname} == #{c[:job][:output]}"
559                   c2[:script_parameters][pname] = c[:job][:output]
560                   moretodo = true
561                 end
562               end
563             end
564             unless c_already_finished
565               # This is my first time discovering that the job
566               # succeeded. (At the top of this loop, I was still
567               # waiting for it to finish.)
568
569               debuglog "names: #{@instance[:name]} #{@template[:name]}", 0
570               if (not @instance[:name].nil?) and (not @instance[:name].empty?)
571                 pipeline_name = @instance[:name]
572               else
573                 fetch_template(@instance[:pipeline_template_uuid])
574                 pipeline_name = @template[:name]
575               end
576               if c[:output_name] != false
577                 # Create a collection located in the same project as the pipeline with the contents of the output.
578                 portable_data_hash = c[:job][:output]
579                 collections = $arv.collection.list(limit: 1,
580                                                    filters: [['portable_data_hash', '=', portable_data_hash]],
581                                                    select: ["portable_data_hash", "manifest_text"]
582                                                    )[:items]
583                 if collections.any?
584                   name = c[:output_name] || "Output #{portable_data_hash[0..7]} of #{cname} of #{pipeline_name}"
585
586                   # check if there is a name collision.
587                   name_collisions = $arv.collection.list(filters: [["owner_uuid", "=", owner_uuid],
588                                                                    ["name", "=", name]])[:items]
589
590                   newcollection_actual = nil
591                   if name_collisions.any? and name_collisions.first[:portable_data_hash] == portable_data_hash
592                     # There is already a collection with the same name and the
593                     # same contents, so just point to that.
594                     newcollection_actual = name_collisions.first
595                   end
596
597                   if newcollection_actual.nil?
598                     # Did not find a collection with the same name (or the
599                     # collection has a different portable data hash) so create
600                     # a new collection with ensure_unique_name: true.
601                     newcollection = {
602                       owner_uuid: owner_uuid,
603                       name: name,
604                       portable_data_hash: collections.first[:portable_data_hash],
605                       manifest_text: collections.first[:manifest_text]
606                     }
607                     debuglog "Creating collection #{newcollection}", 0
608                     newcollection_actual = $arv.collection.create collection: newcollection, ensure_unique_name: true
609                   end
610
611                   c[:output_uuid] = newcollection_actual[:uuid]
612                 else
613                   debuglog "Could not find a collection with portable data hash #{portable_data_hash}", 0
614                 end
615               end
616             end
617           elsif c[:job][:running] ||
618               (!c[:job][:started_at] && !c[:job][:cancelled_at])
619             # Job is still running
620             moretodo = true
621           elsif c[:job][:cancelled_at]
622             debuglog "component #{cname} job #{c[:job][:uuid]} cancelled."
623           end
624         end
625       end
626       @instance[:components] = @components
627       report_status
628
629       if @options[:no_wait]
630         moretodo = false
631       end
632
633       # If job creation fails, just give up on this pipeline instance.
634       if job_creation_failed > 0
635         moretodo = false
636       end
637
638       if moretodo
639         begin
640           sleep 10
641         rescue Interrupt
642           debuglog "interrupt", 0
643           interrupted = true
644           break
645         end
646       end
647     end
648
649     ended = 0
650     succeeded = 0
651     failed = 0
652     @components.each do |cname, c|
653       if c[:job]
654         if c[:job][:finished_at] or c[:job][:cancelled_at] or (c[:job][:running] == false and c[:job][:success] == false)
655           ended += 1
656           if c[:job][:success] == true
657             succeeded += 1
658           elsif c[:job][:success] == false or c[:job][:cancelled_at]
659             failed += 1
660           end
661         end
662       end
663     end
664
665     success = (succeeded == @components.length)
666
667     # A job create call failed. Just give up.
668     if job_creation_failed > 0
669       debuglog "job creation failed - giving up on this pipeline instance", 0
670       success = false
671       failed += 1
672     end
673
674     if interrupted
675      if success
676         @instance[:state] = 'Complete'
677      else
678         @instance[:state] = 'Paused'
679       end
680     else
681       if ended == @components.length or failed > 0
682         @instance[:state] = success ? 'Complete' : 'Failed'
683       end
684     end
685
686     if @instance[:finished_at].nil? and ['Complete', 'Failed'].include? @instance[:state]
687       @instance[:finished_at] = Time.now
688     end
689
690     debuglog "pipeline instance state is #{@instance[:state]}"
691
692     # set components_summary
693     components_summary = {"todo" => @components.length - ended, "done" => succeeded, "failed" => failed}
694     @instance[:components_summary] = components_summary
695
696     @instance.save
697   end
698
699   def cleanup
700     if @instance and @instance[:state] == 'RunningOnClient'
701       @instance[:state] = 'Paused'
702       @instance.save
703     end
704   end
705
706   def uuid
707     @instance[:uuid]
708   end
709
710   protected
711
712   def report_status
713     @instance.save
714
715     if @options[:status_json] != '/dev/null'
716       File.open(@options[:status_json], 'w') do |f|
717         f.puts @components.pretty_inspect
718       end
719     end
720
721     if @options[:status_text] != '/dev/null'
722       File.open(@options[:status_text], 'w') do |f|
723         f.puts ""
724         f.puts "#{Time.now} -- pipeline_instance #{@instance[:uuid]}"
725         namewidth = @components.collect { |cname, c| cname.size }.max
726         @components.each do |cname, c|
727           jstatus = if !c[:job]
728                       "-"
729                     elsif c[:job][:running]
730                       "#{c[:job][:tasks_summary].inspect}"
731                     elsif c[:job][:success]
732                       c[:job][:output]
733                     elsif c[:job][:cancelled_at]
734                       "cancelled #{c[:job][:cancelled_at]}"
735                     elsif c[:job][:finished_at]
736                       "failed #{c[:job][:finished_at]}"
737                     elsif c[:job][:started_at]
738                       "started #{c[:job][:started_at]}"
739                     else
740                       "queued #{c[:job][:created_at]}"
741                     end
742           f.puts "#{cname.to_s.ljust namewidth} #{c[:job] ? c[:job][:uuid] : '-'.ljust(27)} #{jstatus}"
743         end
744       end
745     end
746   end
747
748   def abort(msg)
749     if @instance
750       if ["New", "Ready", "RunningOnClient",
751           "RunningOnServer"].include?(@instance[:state])
752         @instance[:state] = "Failed"
753         @instance[:finished_at] = Time.now
754         @instance.save
755       end
756       @instance.log_stderr(msg)
757     end
758     Kernel::abort(msg)
759   end
760 end
761
762 runner = WhRunPipelineInstance.new($options)
763 begin
764   if $options[:template]
765     runner.fetch_template($options[:template])
766   else
767     runner.fetch_instance($options[:instance])
768   end
769   runner.apply_parameters(p.leftovers)
770   runner.setup_instance
771   if $options[:submit]
772     runner.instance.save
773     puts runner.instance[:uuid]
774   else
775     runner.run
776   end
777 rescue Exception => e
778   runner.cleanup
779   raise e
780 end