closes #2875
[arvados.git] / sdk / cli / bin / arv-run-pipeline-instance
1 #!/usr/bin/env ruby
2
3 # == Synopsis
4 #
5 #  arv-run-pipeline-instance --template pipeline-template-uuid [options] [--] [parameters]
6 #  arv-run-pipeline-instance --instance pipeline-instance-uuid [options]
7 #
8 # Satisfy a pipeline template by finding or submitting a mapreduce job
9 # for each pipeline component.
10 #
11 # == Options
12 #
13 # [--template uuid] Use the specified pipeline template.
14 #
15 # [--template path] Load the pipeline template from the specified
16 #                   local file.
17 #
18 # [--instance uuid] Use the specified pipeline instance.
19 #
20 # [-n, --dry-run] Do not start any new jobs or wait for existing jobs
21 #                 to finish. Just find out whether jobs are finished,
22 #                 queued, or running for each component
23 #
24 # [--submit] Do not try to satisfy any components. Just
25 #                          create an instance, print its UUID to
26 #                          stdout, and exit.
27 #
28 # [--no-wait] Make only as much progress as possible without entering
29 #             a sleep/poll loop.
30 #
31 # [--no-reuse] Do not reuse existing jobs to satisfy pipeline
32 #              components. Submit a new job for every component.
33 #
34 # [--debug] Print extra debugging information on stderr.
35 #
36 # [--debug-level N] Increase amount of debugging information. Default
37 #                   1, possible range 0..3.
38 #
39 # [--status-text path] Print plain text status report to a file or
40 #                      fifo. Default: /dev/stdout
41 #
42 # [--status-json path] Print JSON status report to a file or
43 #                      fifo. Default: /dev/null
44 #
45 # [--description] Description for the pipeline instance.
46 #
47 # == Parameters
48 #
49 # [param_name=param_value]
50 #
51 # [param_name param_value] Set (or override) the default value for
52 #                          every parameter with the given name.
53 #
54 # [component_name::param_name=param_value]
55 # [component_name::param_name param_value]
56 # [--component_name::param_name=param_value]
57 # [--component_name::param_name param_value] Set the value of a
58 #                                            parameter for a single
59 #                                            component.
60 #
61 class WhRunPipelineInstance
62 end
63
64 $application_version = 1.0
65
66 if RUBY_VERSION < '1.9.3' then
67   abort <<-EOS
68 #{$0.gsub(/^\.\//,'')} requires Ruby version 1.9.3 or higher.
69   EOS
70 end
71
72 $arvados_api_version = ENV['ARVADOS_API_VERSION'] || 'v1'
73 $arvados_api_host = ENV['ARVADOS_API_HOST'] or
74   abort "#{$0}: fatal: ARVADOS_API_HOST environment variable not set."
75 $arvados_api_token = ENV['ARVADOS_API_TOKEN'] or
76   abort "#{$0}: fatal: ARVADOS_API_TOKEN environment variable not set."
77
78 begin
79   require 'arvados'
80   require 'rubygems'
81   require 'json'
82   require 'pp'
83   require 'trollop'
84   require 'google/api_client'
85 rescue LoadError => l
86   puts $:
87   abort <<-EOS
88 #{$0}: fatal: #{l.message}
89 Some runtime dependencies may be missing.
90 Try: gem install arvados pp google-api-client json trollop
91   EOS
92 end
93
94 def debuglog(message, verbosity=1)
95   $stderr.puts "#{File.split($0).last} #{$$}: #{message}" if $debuglevel >= verbosity
96 end
97
98 module Kernel
99   def suppress_warnings
100     original_verbosity = $VERBOSE
101     $VERBOSE = nil
102     result = yield
103     $VERBOSE = original_verbosity
104     return result
105   end
106 end
107
108 if $arvados_api_host.match /local/
109   # You probably don't care about SSL certificate checks if you're
110   # testing with a dev server.
111   suppress_warnings { OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE }
112 end
113
114 class Google::APIClient
115   def discovery_document(api, version)
116     api = api.to_s
117     return @discovery_documents["#{api}:#{version}"] ||=
118       begin
119         response = self.execute!(
120                                  :http_method => :get,
121                                  :uri => self.discovery_uri(api, version),
122                                  :authenticated => false
123                                  )
124         response.body.class == String ? JSON.parse(response.body) : response.body
125       end
126   end
127 end
128
129
130 # Parse command line options (the kind that control the behavior of
131 # this program, that is, not the pipeline component parameters).
132
133 p = Trollop::Parser.new do
134   version __FILE__
135   opt(:dry_run,
136       "Do not start any new jobs or wait for existing jobs to finish. Just find out whether jobs are finished, queued, or running for each component.",
137       :type => :boolean,
138       :short => :n)
139   opt(:status_text,
140       "Store plain text status in given file.",
141       :short => :none,
142       :type => :string,
143       :default => '/dev/stdout')
144   opt(:status_json,
145       "Store json-formatted pipeline in given file.",
146       :short => :none,
147       :type => :string,
148       :default => '/dev/null')
149   opt(:no_wait,
150       "Do not wait for jobs to finish. Just look up status, submit new jobs if needed, and exit.",
151       :short => :none,
152       :type => :boolean)
153   opt(:no_reuse,
154       "Do not reuse existing jobs to satisfy pipeline components. Submit a new job for every component.",
155       :short => :none,
156       :type => :boolean)
157   opt(:debug,
158       "Print extra debugging information on stderr.",
159       :type => :boolean)
160   opt(:debug_level,
161       "Set debug verbosity level.",
162       :short => :none,
163       :type => :integer)
164   opt(:template,
165       "UUID of pipeline template, or path to local pipeline template file.",
166       :short => :none,
167       :type => :string)
168   opt(:instance,
169       "UUID of pipeline instance.",
170       :short => :none,
171       :type => :string)
172   opt(:submit,
173       "Do not try to satisfy any components. Just create a pipeline instance and output its UUID.",
174       :short => :none,
175       :type => :boolean)
176   opt(:run_here,
177       "Manage the pipeline in process.",
178       :short => :none,
179       :type => :boolean)
180   opt(:description,
181       "Description for the pipeline instance.",
182       :short => :none,
183       :type => :string)
184   stop_on [:'--']
185 end
186 $options = Trollop::with_standard_exception_handling p do
187   p.parse ARGV
188 end
189 $debuglevel = $options[:debug_level] || ($options[:debug] && 1) || 0
190
191 if $options[:instance]
192   if $options[:template] or $options[:submit]
193     abort "#{$0}: syntax error: --instance cannot be combined with --template or --submit."
194   end
195 elsif not $options[:template]
196   puts "error: you must supply a --template or --instance."
197   p.educate
198   abort
199 end
200
201 if $options[:run_here] == $options[:submit]
202   abort "#{$0}: syntax error: you must supply either --run-here or --submit."
203 end
204
205 # Suppress SSL certificate checks if ARVADOS_API_HOST_INSECURE
206
207 module Kernel
208   def suppress_warnings
209     original_verbosity = $VERBOSE
210     $VERBOSE = nil
211     result = yield
212     $VERBOSE = original_verbosity
213     return result
214   end
215 end
216
217 if ENV['ARVADOS_API_HOST_INSECURE']
218   suppress_warnings { OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE }
219 end
220
221 # Set up the API client.
222
223 $client ||= Google::APIClient.
224   new(:host => $arvados_api_host,
225       :application_name => File.split($0).last,
226       :application_version => $application_version.to_s)
227 $arvados = $client.discovered_api('arvados', $arvados_api_version)
228 $arv = Arvados.new api_version: 'v1'
229
230
231 class PipelineInstance
232   def self.find(uuid)
233     result = $client.execute(:api_method => $arvados.pipeline_instances.get,
234                              :parameters => {
235                                :uuid => uuid
236                              },
237                              :authenticated => false,
238                              :headers => {
239                                authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
240                              })
241     j = JSON.parse result.body, :symbolize_names => true
242     unless j.is_a? Hash and j[:uuid]
243       debuglog "Failed to get pipeline_instance: #{j[:errors] rescue nil}", 0
244       nil
245     else
246       debuglog "Retrieved pipeline_instance #{j[:uuid]}"
247       self.new(j)
248     end
249   end
250   def self.create(attributes)
251     result = $client.execute(:api_method => $arvados.pipeline_instances.create,
252                              :body_object => {
253                                :pipeline_instance => attributes
254                              },
255                              :authenticated => false,
256                              :headers => {
257                                authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
258                              })
259     j = JSON.parse result.body, :symbolize_names => true
260     unless j.is_a? Hash and j[:uuid]
261       abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nFailed to create pipeline_instance: #{j[:errors] rescue nil} #{j.inspect}"
262     end
263     debuglog "Created pipeline instance: #{j[:uuid]}"
264     self.new(j)
265   end
266   def save
267     result = $client.execute(:api_method => $arvados.pipeline_instances.update,
268                              :parameters => {
269                                :uuid => @pi[:uuid]
270                              },
271                              :body_object => {
272                                :pipeline_instance => @attributes_to_update
273                              },
274                              :authenticated => false,
275                              :headers => {
276                                authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
277                              })
278     j = JSON.parse result.body, :symbolize_names => true
279     unless j.is_a? Hash and j[:uuid]
280       debuglog "Failed to save pipeline_instance: #{j[:errors] rescue nil}", 0
281       nil
282     else
283       @attributes_to_update = {}
284       @pi = j
285     end
286   end
287   def []=(x,y)
288     @attributes_to_update[x] = y
289     @pi[x] = y
290   end
291   def [](x)
292     @pi[x]
293   end
294
295   def log_stderr(msg)
296     $arv.log.create log: {
297       event_type: 'stderr',
298       object_uuid: self[:uuid],
299       owner_uuid: self[:owner_uuid],
300       properties: {"text" => msg},
301     }
302   end
303
304   protected
305   def initialize(j)
306     @attributes_to_update = {}
307     @pi = j
308   end
309 end
310
311 class JobCache
312   def self.get(uuid)
313     @cache ||= {}
314     result = $client.execute(:api_method => $arvados.jobs.get,
315                              :parameters => {
316                                :uuid => uuid
317                              },
318                              :authenticated => false,
319                              :headers => {
320                                authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
321                              })
322     @cache[uuid] = JSON.parse result.body, :symbolize_names => true
323   end
324   def self.where(conditions)
325     result = $client.execute(:api_method => $arvados.jobs.list,
326                              :parameters => {
327                                :limit => 10000,
328                                :where => conditions.to_json
329                              },
330                              :authenticated => false,
331                              :headers => {
332                                authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
333                              })
334     list = JSON.parse result.body, :symbolize_names => true
335     if list and list[:items].is_a? Array
336       list[:items]
337     else
338       []
339     end
340   end
341   def self.create(pipeline, component, job, create_params)
342     @cache ||= {}
343
344     body = {job: no_nil_values(job)}.merge(no_nil_values(create_params))
345
346     result = $client.execute(:api_method => $arvados.jobs.create,
347                              :body_object => body,
348                              :authenticated => false,
349                              :headers => {
350                                authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
351                              })
352     j = JSON.parse result.body, :symbolize_names => true
353     if j.is_a? Hash and j[:uuid]
354       @cache[j[:uuid]] = j
355     else
356       debuglog "create job: #{j[:errors] rescue nil} with attributes #{body}", 0
357
358       msg = ""
359       j[:errors].each do |err|
360         msg += "Error creating job for component #{component}: #{err}\n"
361       end
362       msg += "Job submission was: #{body.to_json}"
363
364       pipeline.log_stderr(msg)
365       nil
366     end
367   end
368
369   protected
370
371   def self.no_nil_values(hash)
372     hash.reject { |key, value| value.nil? }
373   end
374 end
375
376 class WhRunPipelineInstance
377   attr_reader :instance
378
379   def initialize(_options)
380     @options = _options
381   end
382
383   def fetch_template(template)
384     if template.match /[^-0-9a-z]/
385       # Doesn't look like a uuid -- use it as a filename.
386       @template = JSON.parse File.read(template), :symbolize_names => true
387     else
388       result = $client.execute(:api_method => $arvados.pipeline_templates.get,
389                                :parameters => {
390                                  :uuid => template
391                                },
392                                :authenticated => false,
393                                :headers => {
394                                  authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
395                                })
396       @template = JSON.parse result.body, :symbolize_names => true
397       if !@template[:uuid]
398         abort "#{$0}: fatal: failed to retrieve pipeline template #{template} #{@template[:errors].inspect rescue nil}"
399       end
400     end
401     self
402   end
403
404   def fetch_instance(instance_uuid)
405     @instance = PipelineInstance.find(instance_uuid)
406     @template = @instance
407     self
408   end
409
410   def apply_parameters(params_args)
411     params_args.shift if params_args[0] == '--'
412     params = {}
413     while !params_args.empty?
414       if (re = params_args[0].match /^(--)?([^-].*?)=(.+)/)
415         params[re[2]] = re[3]
416         params_args.shift
417       elsif params_args.size > 1
418         param = params_args.shift.sub /^--/, ''
419         params[param] = params_args.shift
420       else
421         abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: I do not know what to do with arg \"#{params_args[0]}\""
422       end
423     end
424
425     if not @template[:components].is_a?(Hash)
426       abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Template missing \"components\" hash"
427     end
428     @components = @template[:components].dup
429
430     bad_components = @components.each_pair.select do |cname, cspec|
431       not cspec.is_a?(Hash)
432     end
433     if bad_components.any?
434       abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Components not specified with hashes: #{bad_components.map(&:first).join(', ')}"
435     end
436
437     bad_components = @components.each_pair.select do |cname, cspec|
438       not cspec[:script_parameters].is_a?(Hash)
439     end
440     if bad_components.any?
441       abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Components missing \"script_parameters\" hashes: #{bad_components.map(&:first).join(', ')}"
442     end
443
444     errors = []
445     @components.each do |componentname, component|
446       component[:script_parameters].each do |parametername, parameter|
447         parameter = { :value => parameter } unless parameter.is_a? Hash
448         value =
449           (params["#{componentname}::#{parametername}"] ||
450            parameter[:value] ||
451            (parameter[:output_of].nil? &&
452             (params[parametername.to_s] ||
453              parameter[:default])) ||
454            nil)
455         if value.nil? and
456             ![false,'false',0,'0'].index parameter[:required]
457           if parameter[:output_of]
458             next
459           end
460           errors << [componentname, parametername, "required parameter is missing"]
461         end
462         debuglog "parameter #{componentname}::#{parametername} == #{value}"
463         component[:script_parameters][parametername] = value
464       end
465     end
466     if !errors.empty?
467       abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nErrors:\n#{errors.collect { |c,p,e| "#{c}::#{p} - #{e}\n" }.join ""}"
468     end
469     debuglog "options=" + @options.pretty_inspect
470     self
471   end
472
473   def setup_instance
474     if @instance
475       @instance[:properties][:run_options] ||= {}
476       if @options[:no_reuse]
477         # override properties of existing instance
478         @instance[:properties][:run_options][:enable_job_reuse] = false
479       else
480         # Default to "enable reuse" if not specified. (This code path
481         # can go away when old clients go away.)
482         if @instance[:properties][:run_options][:enable_job_reuse].nil?
483           @instance[:properties][:run_options][:enable_job_reuse] = true
484         end
485       end
486     else
487       description = $options[:description]
488       description = ("Created at #{Time.now.localtime}" + (@template[:name].andand.size>0 ? " using the pipeline template *#{@template[:name]}*" : "")) if !description
489       @instance = PipelineInstance.
490         create(components: @components,
491                properties: {
492                  run_options: {
493                    enable_job_reuse: !@options[:no_reuse]
494                  }
495                },
496                pipeline_template_uuid: @template[:uuid],
497                description: description,
498                state: ($options[:submit] ? 'RunningOnServer' : 'RunningOnClient'))
499     end
500     self
501   end
502
503   def run
504     moretodo = true
505     interrupted = false
506
507     if @instance[:started_at].nil?
508       @instance[:started_at] = Time.now
509     end
510
511     job_creation_failed = 0
512     while moretodo
513       moretodo = false
514       @components.each do |cname, c|
515         job = nil
516         owner_uuid = @instance[:owner_uuid]
517         # Is the job satisfying this component already known to be
518         # finished? (Already meaning "before we query API server about
519         # the job's current state")
520         c_already_finished = (c[:job] &&
521                               c[:job][:uuid] &&
522                               !c[:job][:success].nil?)
523         if !c[:job] and
524             c[:script_parameters].select { |pname, p| p.is_a? Hash and p[:output_of]}.empty?
525           # No job yet associated with this component and is component inputs
526           # are fully specified (any output_of script_parameters are resolved
527           # to real value)
528           job = JobCache.create(@instance, cname, {
529             :script => c[:script],
530             :script_parameters => c[:script_parameters],
531             :script_version => c[:script_version],
532             :repository => c[:repository],
533             :nondeterministic => c[:nondeterministic],
534             :runtime_constraints => c[:runtime_constraints],
535             :owner_uuid => owner_uuid,
536           }, {
537             # This is the right place to put these attributes when
538             # dealing with new API servers.
539             :minimum_script_version => c[:minimum_script_version],
540             :exclude_script_versions => c[:exclude_minimum_script_versions],
541             :find_or_create => (@instance[:properties][:run_options].andand[:enable_job_reuse] &&
542                                 !c[:nondeterministic]),
543             :filters => c[:filters]
544           })
545           if job
546             debuglog "component #{cname} new job #{job[:uuid]}"
547             c[:job] = job
548           else
549             debuglog "component #{cname} new job failed", 0
550             job_creation_failed += 1
551           end
552         end
553
554         if c[:job] and c[:job][:uuid]
555           if (c[:job][:running] or
556               not (c[:job][:finished_at] or c[:job][:cancelled_at]))
557             # Job is running so update copy of job record
558             c[:job] = JobCache.get(c[:job][:uuid])
559           end
560
561           if c[:job][:success]
562             # Populate script_parameters of other components waiting for
563             # this job
564             @components.each do |c2name, c2|
565               c2[:script_parameters].each do |pname, p|
566                 if p.is_a? Hash and p[:output_of] == cname.to_s
567                   debuglog "parameter #{c2name}::#{pname} == #{c[:job][:output]}"
568                   c2[:script_parameters][pname] = c[:job][:output]
569                   moretodo = true
570                 end
571               end
572             end
573             unless c_already_finished
574               # This is my first time discovering that the job
575               # succeeded. (At the top of this loop, I was still
576               # waiting for it to finish.)
577
578               debuglog "names: #{@instance[:name]} #{@template[:name]}", 0
579               if (not @instance[:name].nil?) and (not @instance[:name].empty?)
580                 pipeline_name = @instance[:name]
581               else
582                 fetch_template(@instance[:pipeline_template_uuid])
583                 pipeline_name = @template[:name]
584               end
585               if c[:output_name] != false
586                 # Create a collection located in the same project as the pipeline with the contents of the output.
587                 portable_data_hash = c[:job][:output]
588                 collections = $arv.collection.list(limit: 1,
589                                                    filters: [['portable_data_hash', '=', portable_data_hash]],
590                                                    select: ["portable_data_hash", "manifest_text"]
591                                                    )[:items]
592                 if collections.any?
593                   name = c[:output_name] || "Output #{portable_data_hash[0..7]} of #{cname} of #{pipeline_name}"
594
595                   # check if there is a name collision.
596                   name_collisions = $arv.collection.list(filters: [["owner_uuid", "=", owner_uuid],
597                                                                    ["name", "=", name]])[:items]
598
599                   newcollection_actual = nil
600                   if name_collisions.any? and name_collisions.first[:portable_data_hash] == portable_data_hash
601                     # There is already a collection with the same name and the
602                     # same contents, so just point to that.
603                     newcollection_actual = name_collisions.first
604                   end
605
606                   if newcollection_actual.nil?
607                     # Did not find a collection with the same name (or the
608                     # collection has a different portable data hash) so create
609                     # a new collection with ensure_unique_name: true.
610                     newcollection = {
611                       owner_uuid: owner_uuid,
612                       name: name,
613                       portable_data_hash: collections.first[:portable_data_hash],
614                       manifest_text: collections.first[:manifest_text]
615                     }
616                     debuglog "Creating collection #{newcollection}", 0
617                     newcollection_actual = $arv.collection.create collection: newcollection, ensure_unique_name: true
618                   end
619
620                   c[:output_uuid] = newcollection_actual[:uuid]
621                 else
622                   debuglog "Could not find a collection with portable data hash #{portable_data_hash}", 0
623                 end
624               end
625             end
626           elsif c[:job][:running] ||
627               (!c[:job][:started_at] && !c[:job][:cancelled_at])
628             # Job is still running
629             moretodo = true
630           elsif c[:job][:cancelled_at]
631             debuglog "component #{cname} job #{c[:job][:uuid]} cancelled."
632           end
633         end
634       end
635       @instance[:components] = @components
636       report_status
637
638       if @options[:no_wait]
639         moretodo = false
640       end
641
642       # If job creation fails, just give up on this pipeline instance.
643       if job_creation_failed > 0
644         moretodo = false
645       end
646
647       if moretodo
648         begin
649           sleep 10
650         rescue Interrupt
651           debuglog "interrupt", 0
652           interrupted = true
653           break
654         end
655       end
656     end
657
658     ended = 0
659     succeeded = 0
660     failed = 0
661     @components.each do |cname, c|
662       if c[:job]
663         if c[:job][:finished_at] or c[:job][:cancelled_at] or (c[:job][:running] == false and c[:job][:success] == false)
664           ended += 1
665           if c[:job][:success] == true
666             succeeded += 1
667           elsif c[:job][:success] == false or c[:job][:cancelled_at]
668             failed += 1
669           end
670         end
671       end
672     end
673
674     success = (succeeded == @components.length)
675
676     # A job create call failed. Just give up.
677     if job_creation_failed > 0
678       debuglog "job creation failed - giving up on this pipeline instance", 0
679       success = false
680       failed += 1
681     end
682
683     if interrupted
684      if success
685         @instance[:state] = 'Complete'
686      else
687         @instance[:state] = 'Paused'
688       end
689     else
690       if ended == @components.length or failed > 0
691         @instance[:state] = success ? 'Complete' : 'Failed'
692       end
693     end
694
695     if @instance[:finished_at].nil? and ['Complete', 'Failed'].include? @instance[:state]
696       @instance[:finished_at] = Time.now
697     end
698
699     debuglog "pipeline instance state is #{@instance[:state]}"
700
701     # set components_summary
702     components_summary = {"todo" => @components.length - ended, "done" => succeeded, "failed" => failed}
703     @instance[:components_summary] = components_summary
704
705     @instance.save
706   end
707
708   def cleanup
709     if @instance and @instance[:state] == 'RunningOnClient'
710       @instance[:state] = 'Paused'
711       @instance.save
712     end
713   end
714
715   def uuid
716     @instance[:uuid]
717   end
718
719   protected
720
721   def report_status
722     @instance.save
723
724     if @options[:status_json] != '/dev/null'
725       File.open(@options[:status_json], 'w') do |f|
726         f.puts @components.pretty_inspect
727       end
728     end
729
730     if @options[:status_text] != '/dev/null'
731       File.open(@options[:status_text], 'w') do |f|
732         f.puts ""
733         f.puts "#{Time.now} -- pipeline_instance #{@instance[:uuid]}"
734         namewidth = @components.collect { |cname, c| cname.size }.max
735         @components.each do |cname, c|
736           jstatus = if !c[:job]
737                       "-"
738                     elsif c[:job][:running]
739                       "#{c[:job][:tasks_summary].inspect}"
740                     elsif c[:job][:success]
741                       c[:job][:output]
742                     elsif c[:job][:cancelled_at]
743                       "cancelled #{c[:job][:cancelled_at]}"
744                     elsif c[:job][:finished_at]
745                       "failed #{c[:job][:finished_at]}"
746                     elsif c[:job][:started_at]
747                       "started #{c[:job][:started_at]}"
748                     else
749                       "queued #{c[:job][:created_at]}"
750                     end
751           f.puts "#{cname.to_s.ljust namewidth} #{c[:job] ? c[:job][:uuid] : '-'.ljust(27)} #{jstatus}"
752         end
753       end
754     end
755   end
756
757   def abort(msg)
758     if @instance
759       if ["New", "Ready", "RunningOnClient",
760           "RunningOnServer"].include?(@instance[:state])
761         @instance[:state] = "Failed"
762         @instance[:finished_at] = Time.now
763         @instance.save
764       end
765       @instance.log_stderr(msg)
766     end
767     Kernel::abort(msg)
768   end
769 end
770
771 runner = WhRunPipelineInstance.new($options)
772 begin
773   if $options[:template]
774     runner.fetch_template($options[:template])
775   else
776     runner.fetch_instance($options[:instance])
777   end
778   runner.apply_parameters(p.leftovers)
779   runner.setup_instance
780   if $options[:submit]
781     runner.instance.save
782     puts runner.instance[:uuid]
783   else
784     runner.run
785   end
786 rescue Exception => e
787   runner.cleanup
788   raise e
789 end