11843: Improve formatting.
[arvados.git] / sdk / cli / bin / arv-run-pipeline-instance
1 #!/usr/bin/env ruby
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: Apache-2.0
5
6 class WhRunPipelineInstance
7 end
8
9 if RUBY_VERSION < '1.9.3' then
10   abort <<-EOS
11 #{$0.gsub(/^\.\//,'')} requires Ruby version 1.9.3 or higher.
12   EOS
13 end
14
15 begin
16   require 'arvados'
17   require 'rubygems'
18   require 'json'
19   require 'pp'
20   require 'trollop'
21   require 'google/api_client'
22 rescue LoadError => l
23   $stderr.puts $:
24   abort <<-EOS
25 #{$0}: fatal: #{l.message}
26 Some runtime dependencies may be missing.
27 Try: gem install arvados pp google-api-client json trollop
28   EOS
29 end
30
31 def debuglog(message, verbosity=1)
32   $stderr.puts "#{File.split($0).last} #{$$}: #{message}" if $debuglevel >= verbosity
33 end
34
35 # Parse command line options (the kind that control the behavior of
36 # this program, that is, not the pipeline component parameters).
37
38 p = Trollop::Parser.new do
39   version __FILE__
40   banner(<<EOF)
41
42 Usage:
43   arv-run-pipeline-instance --template TEMPLATE_UUID [options] [--] [parameters]
44   arv-run-pipeline-instance --instance INSTANCE_UUID [options] [--] [parameters]
45
46 Parameters:
47   param_name=param_value
48   param_name param_value
49                          Set (or override) the default value for every
50                          pipeline component parameter with the given
51                          name.
52
53   component_name::param_name=param_value
54   component_name::param_name param_value
55   --component_name::param_name=param_value
56   --component_name::param_name param_value
57                          Set the value of a parameter for a single
58                          pipeline component.
59
60 Options:
61 EOF
62   opt(:dry_run,
63       "Do not start any new jobs or wait for existing jobs to finish. Just find out whether jobs are finished, queued, or running for each component.",
64       :type => :boolean,
65       :short => :n)
66   opt(:status_text,
67       "Store plain text status in given file.",
68       :short => :none,
69       :type => :string,
70       :default => '/dev/stdout')
71   opt(:status_json,
72       "Store json-formatted pipeline in given file.",
73       :short => :none,
74       :type => :string,
75       :default => '/dev/null')
76   opt(:no_wait,
77       "Do not wait for jobs to finish. Just look up status, submit new jobs if needed, and exit.",
78       :short => :none,
79       :type => :boolean)
80   opt(:no_reuse,
81       "Do not reuse existing jobs to satisfy pipeline components. Submit a new job for every component.",
82       :short => :none,
83       :type => :boolean)
84   opt(:debug,
85       "Print extra debugging information on stderr.",
86       :type => :boolean)
87   opt(:debug_level,
88       "Set debug verbosity level.",
89       :short => :none,
90       :type => :integer)
91   opt(:template,
92       "UUID of pipeline template, or path to local pipeline template file.",
93       :short => :none,
94       :type => :string)
95   opt(:instance,
96       "UUID of pipeline instance.",
97       :short => :none,
98       :type => :string)
99   opt(:submit,
100       "Submit the pipeline instance to the server, and exit. Let the Crunch dispatch service satisfy the components by finding/running jobs.",
101       :short => :none,
102       :type => :boolean)
103   opt(:run_pipeline_here,
104       "Manage the pipeline instance in-process. Submit jobs to Crunch as needed. Do not exit until the pipeline finishes (or fails).",
105       :short => :none,
106       :type => :boolean)
107   opt(:run_jobs_here,
108       "Run jobs in the local terminal session instead of submitting them to Crunch. Implies --run-pipeline-here. Note: this results in a significantly different job execution environment, and some Crunch features are not supported. It can be necessary to modify a pipeline in order to make it run this way.",
109       :short => :none,
110       :type => :boolean)
111   opt(:run_here,
112       "Synonym for --run-jobs-here.",
113       :short => :none,
114       :type => :boolean)
115   opt(:description,
116       "Description for the pipeline instance.",
117       :short => :none,
118       :type => :string)
119   opt(:project_uuid,
120       "UUID of the project for the pipeline instance.",
121       short: :none,
122       type: :string)
123   stop_on [:'--']
124 end
125 $options = Trollop::with_standard_exception_handling p do
126   p.parse ARGV
127 end
128 $debuglevel = $options[:debug_level] || ($options[:debug] && 1) || 0
129
130 $options[:run_jobs_here] ||= $options[:run_here] # old flag name
131 $options[:run_pipeline_here] ||= $options[:run_jobs_here] # B requires A
132
133 if $options[:instance]
134   if $options[:template] or $options[:submit]
135     abort "#{$0}: syntax error: --instance cannot be combined with --template or --submit."
136   end
137 elsif not $options[:template]
138   $stderr.puts "error: you must supply a --template or --instance."
139   p.educate
140   abort
141 end
142
143 if $options[:run_pipeline_here] == $options[:submit]
144   abort "#{$0}: error: you must supply --run-pipeline-here, --run-jobs-here, or --submit."
145 end
146
147 # Set up the API client.
148
149 $arv = Arvados.new api_version: 'v1'
150 $client = $arv.client
151 $arvados = $arv.arvados_api
152
153 class PipelineInstance
154   def self.find(uuid)
155     result = $client.execute(:api_method => $arvados.pipeline_instances.get,
156                              :parameters => {
157                                :uuid => uuid
158                              },
159                              :authenticated => false,
160                              :headers => {
161                                authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
162                              })
163     j = JSON.parse result.body, :symbolize_names => true
164     unless j.is_a? Hash and j[:uuid]
165       debuglog "Failed to get pipeline_instance: #{j[:errors] rescue nil}", 0
166       nil
167     else
168       debuglog "Retrieved pipeline_instance #{j[:uuid]}"
169       self.new(j)
170     end
171   end
172   def self.create(attributes)
173     result = $client.execute(:api_method => $arvados.pipeline_instances.create,
174                              :body_object => {
175                                :pipeline_instance => attributes
176                              },
177                              :authenticated => false,
178                              :headers => {
179                                authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
180                              })
181     j = JSON.parse result.body, :symbolize_names => true
182     unless j.is_a? Hash and j[:uuid]
183       abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nFailed to create pipeline_instance: #{j[:errors] rescue nil} #{j.inspect}"
184     end
185     debuglog "Created pipeline instance: #{j[:uuid]}"
186     self.new(j)
187   end
188   def save
189     result = $client.execute(:api_method => $arvados.pipeline_instances.update,
190                              :parameters => {
191                                :uuid => @pi[:uuid]
192                              },
193                              :body_object => {
194                                :pipeline_instance => @attributes_to_update
195                              },
196                              :authenticated => false,
197                              :headers => {
198                                authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
199                              })
200     j = JSON.parse result.body, :symbolize_names => true
201     unless j.is_a? Hash and j[:uuid]
202       debuglog "Failed to save pipeline_instance: #{j[:errors] rescue nil}", 0
203       nil
204     else
205       @attributes_to_update = {}
206       @pi = j
207     end
208   end
209   def []=(x,y)
210     @attributes_to_update[x] = y
211     @pi[x] = y
212   end
213   def [](x)
214     @pi[x]
215   end
216
217   def log_stderr(msg)
218     $arv.log.create log: {
219       event_type: 'stderr',
220       object_uuid: self[:uuid],
221       owner_uuid: self[:owner_uuid],
222       properties: {"text" => msg},
223     }
224   end
225
226   protected
227   def initialize(j)
228     @attributes_to_update = {}
229     @pi = j
230   end
231 end
232
233 class JobCache
234   def self.get(uuid)
235     @cache ||= {}
236     result = $client.execute(:api_method => $arvados.jobs.get,
237                              :parameters => {
238                                :uuid => uuid
239                              },
240                              :authenticated => false,
241                              :headers => {
242                                authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
243                              })
244     @cache[uuid] = JSON.parse result.body, :symbolize_names => true
245   end
246   def self.where(conditions)
247     result = $client.execute(:api_method => $arvados.jobs.list,
248                              :parameters => {
249                                :limit => 10000,
250                                :where => conditions.to_json
251                              },
252                              :authenticated => false,
253                              :headers => {
254                                authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
255                              })
256     list = JSON.parse result.body, :symbolize_names => true
257     if list and list[:items].is_a? Array
258       list[:items]
259     else
260       []
261     end
262   end
263   def self.create(pipeline, component, job, create_params)
264     @cache ||= {}
265
266     body = {job: no_nil_values(job)}.merge(no_nil_values(create_params))
267
268     result = $client.execute(:api_method => $arvados.jobs.create,
269                              :body_object => body,
270                              :authenticated => false,
271                              :headers => {
272                                authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
273                              })
274     j = JSON.parse result.body, :symbolize_names => true
275     if j.is_a? Hash and j[:uuid]
276       @cache[j[:uuid]] = j
277     else
278       debuglog "create job: #{j[:errors] rescue nil} with attributes #{body}", 0
279
280       msg = ""
281       j[:errors].each do |err|
282         msg += "Error creating job for component #{component}: #{err}\n"
283       end
284       msg += "Job submission was: #{body.to_json}"
285
286       pipeline.log_stderr(msg)
287       nil
288     end
289   end
290
291   protected
292
293   def self.no_nil_values(hash)
294     hash.reject { |key, value| value.nil? }
295   end
296 end
297
298 class WhRunPipelineInstance
299   attr_reader :instance
300
301   def initialize(_options)
302     @options = _options
303   end
304
305   def fetch_template(template)
306     if template.match /[^-0-9a-z]/
307       # Doesn't look like a uuid -- use it as a filename.
308       @template = JSON.parse File.read(template), :symbolize_names => true
309     else
310       result = $client.execute(:api_method => $arvados.pipeline_templates.get,
311                                :parameters => {
312                                  :uuid => template
313                                },
314                                :authenticated => false,
315                                :headers => {
316                                  authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
317                                })
318       @template = JSON.parse result.body, :symbolize_names => true
319       if !@template[:uuid]
320         abort "#{$0}: fatal: failed to retrieve pipeline template #{template} #{@template[:errors].inspect rescue nil}"
321       end
322     end
323     self
324   end
325
326   def fetch_instance(instance_uuid)
327     @instance = PipelineInstance.find(instance_uuid)
328     @template = @instance
329     self
330   end
331
332   def apply_parameters(params_args)
333     params_args.shift if params_args[0] == '--'
334     params = {}
335     while !params_args.empty?
336       if (re = params_args[0].match /^(--)?([^-].*?)=(.+)/)
337         params[re[2]] = re[3]
338         params_args.shift
339       elsif params_args.size > 1
340         param = params_args.shift.sub /^--/, ''
341         params[param] = params_args.shift
342       else
343         abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: I do not know what to do with arg \"#{params_args[0]}\""
344       end
345     end
346
347     if not @template[:components].is_a?(Hash)
348       abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Template missing \"components\" hash"
349     end
350     @components = @template[:components].dup
351
352     bad_components = @components.each_pair.select do |cname, cspec|
353       not cspec.is_a?(Hash)
354     end
355     if bad_components.any?
356       abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Components not specified with hashes: #{bad_components.map(&:first).join(', ')}"
357     end
358
359     bad_components = @components.each_pair.select do |cname, cspec|
360       not cspec[:script_parameters].is_a?(Hash)
361     end
362     if bad_components.any?
363       abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Components missing \"script_parameters\" hashes: #{bad_components.map(&:first).join(', ')}"
364     end
365
366     errors = []
367     @components.each do |componentname, component|
368       component[:script_parameters].each do |parametername, parameter|
369         parameter = { :value => parameter } unless parameter.is_a? Hash
370         if params.has_key?("#{componentname}::#{parametername}")
371           value = params["#{componentname}::#{parametername}"]
372         elsif parameter.has_key?(:value)
373           value = parameter[:value]
374         elsif parameter.has_key?(:output_of)
375           if !@components[parameter[:output_of].intern]
376             errors << [componentname, parametername, "output_of refers to nonexistent component '#{parameter[:output_of]}'"]
377           else
378             # value will be filled in later when the upstream
379             # component's output becomes known
380           end
381           next
382         elsif params.has_key?(parametername.to_s)
383           value = params[parametername.to_s]
384         elsif parameter.has_key?(:default)
385           value = parameter[:default]
386         elsif [false, 'false', 0, '0'].index(parameter[:required])
387           value = nil
388         else
389           errors << [componentname, parametername, "required parameter is missing"]
390           next
391         end
392         debuglog "parameter #{componentname}::#{parametername} == #{value}"
393
394         component[:script_parameters][parametername] =
395           parameter.dup.merge(value: value)
396       end
397     end
398     if !errors.empty?
399       all_errors = errors.collect do |c,p,e|
400         "#{c}::#{p} - #{e}\n"
401       end.join("")
402       abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nErrors:\n#{all_errors}"
403     end
404     debuglog "options=" + @options.pretty_inspect
405     self
406   end
407
408   def setup_instance
409     if @instance
410       @instance[:properties][:run_options] ||= {}
411       if @options[:no_reuse]
412         # override properties of existing instance
413         @instance[:properties][:run_options][:enable_job_reuse] = false
414       else
415         # Default to "enable reuse" if not specified. (This code path
416         # can go away when old clients go away.)
417         if @instance[:properties][:run_options][:enable_job_reuse].nil?
418           @instance[:properties][:run_options][:enable_job_reuse] = true
419         end
420       end
421     else
422       description = $options[:description] ||
423                     ("Created at #{Time.now.localtime}" + (@template[:name].andand.size.andand>0 ? " using the pipeline template *#{@template[:name]}*" : ""))
424       instance_body = {
425         components: @components,
426         properties: {
427           run_options: {
428             enable_job_reuse: !@options[:no_reuse]
429           }
430         },
431         pipeline_template_uuid: @template[:uuid],
432         description: description,
433         state: ($options[:submit] ? 'RunningOnServer' : 'RunningOnClient')
434       }
435       if @options[:project_uuid]
436         instance_body[:owner_uuid] = @options[:project_uuid]
437       end
438       @instance = PipelineInstance.create(instance_body)
439     end
440     self
441   end
442
443   def run
444     moretodo = true
445     interrupted = false
446
447     if @instance[:started_at].nil?
448       @instance[:started_at] = Time.now
449     end
450
451     job_creation_failed = 0
452     while moretodo
453       moretodo = false
454       @components.each do |cname, c|
455         job = nil
456         owner_uuid = @instance[:owner_uuid]
457         # Is the job satisfying this component already known to be
458         # finished? (Already meaning "before we query API server about
459         # the job's current state")
460         c_already_finished = (c[:job] &&
461                               c[:job][:uuid] &&
462                               ["Complete", "Failed", "Cancelled"].include?(c[:job][:state]))
463         if !c[:job] and
464             c[:script_parameters].select { |pname, p| p.is_a? Hash and p[:output_of]}.empty?
465           # No job yet associated with this component and is component inputs
466           # are fully specified (any output_of script_parameters are resolved
467           # to real value)
468           my_submit_id = "instance #{@instance[:uuid]} rand #{rand(2**64).to_s(36)}"
469           job = JobCache.create(@instance, cname, {
470             :script => c[:script],
471             :script_parameters => Hash[c[:script_parameters].map do |key, spec|
472                                          [key, spec[:value]]
473                                        end],
474             :script_version => c[:script_version],
475             :repository => c[:repository],
476             :nondeterministic => c[:nondeterministic],
477             :runtime_constraints => c[:runtime_constraints],
478             :owner_uuid => owner_uuid,
479             :is_locked_by_uuid => (@options[:run_jobs_here] ? owner_uuid : nil),
480             :submit_id => my_submit_id,
481             :state => (if @options[:run_jobs_here] then "Running" else "Queued" end)
482           }, {
483             # This is the right place to put these attributes when
484             # dealing with new API servers.
485             :minimum_script_version => c[:minimum_script_version],
486             :exclude_script_versions => c[:exclude_minimum_script_versions],
487             :find_or_create => (@instance[:properties][:run_options].andand[:enable_job_reuse] &&
488                                 !c[:nondeterministic]),
489             :filters => c[:filters]
490           })
491           if job
492             debuglog "component #{cname} new job #{job[:uuid]}"
493             c[:job] = job
494             c[:run_in_process] = (@options[:run_jobs_here] and
495                                   job[:submit_id] == my_submit_id)
496           else
497             debuglog "component #{cname} new job failed", 0
498             job_creation_failed += 1
499           end
500         end
501
502         if c[:job] and c[:run_in_process] and not ["Complete", "Failed", "Cancelled"].include? c[:job][:state]
503           report_status
504           begin
505             require 'open3'
506             Open3.popen3("arv-crunch-job", "--force-unlock",
507                          "--job", c[:job][:uuid]) do |stdin, stdout, stderr, wait_thr|
508               debuglog "arv-crunch-job pid #{wait_thr.pid} started", 0
509               stdin.close
510               while true
511                 rready, wready, = IO.select([stdout, stderr], [])
512                 break if !rready[0]
513                 begin
514                   buf = rready[0].read_nonblock(2**20)
515                 rescue EOFError
516                   break
517                 end
518                 (rready[0] == stdout ? $stdout : $stderr).write(buf)
519               end
520               stdout.close
521               stderr.close
522               debuglog "arv-crunch-job pid #{wait_thr.pid} exit #{wait_thr.value.to_i}", 0
523             end
524             if not $arv.job.get(uuid: c[:job][:uuid])[:finished_at]
525               raise Exception.new("arv-crunch-job did not set finished_at.")
526             end
527           rescue Exception => e
528             debuglog "Interrupted (#{e}). Failing job.", 0
529             $arv.job.update(uuid: c[:job][:uuid],
530                             job: {
531                               state: "Failed"
532                             })
533           end
534         end
535
536         if c[:job] and c[:job][:uuid]
537           if ["Running", "Queued"].include?(c[:job][:state])
538             # Job is running (or may be soon) so update copy of job record
539             c[:job] = JobCache.get(c[:job][:uuid])
540           end
541
542           if c[:job][:state] == "Complete"
543             # Populate script_parameters of other components waiting for
544             # this job
545             @components.each do |c2name, c2|
546               c2[:script_parameters].each do |pname, p|
547                 if p.is_a? Hash and p[:output_of] == cname.to_s
548                   debuglog "parameter #{c2name}::#{pname} == #{c[:job][:output]}"
549                   c2[:script_parameters][pname] = {value: c[:job][:output]}
550                   moretodo = true
551                 end
552               end
553             end
554             unless c_already_finished
555               # This is my first time discovering that the job
556               # succeeded. (At the top of this loop, I was still
557               # waiting for it to finish.)
558
559               if @instance[:name].andand.length.andand > 0
560                 pipeline_name = @instance[:name]
561               elsif @template.andand[:name].andand.length.andand > 0
562                 pipeline_name = @template[:name]
563               else
564                 pipeline_name = @instance[:uuid]
565               end
566               if c[:output_name] != false
567                 # Create a collection located in the same project as the pipeline with the contents of the output.
568                 portable_data_hash = c[:job][:output]
569                 collections = $arv.collection.list(limit: 1,
570                                                    filters: [['portable_data_hash', '=', portable_data_hash]],
571                                                    select: ["portable_data_hash", "manifest_text"]
572                                                    )[:items]
573                 if collections.any?
574                   name = c[:output_name] || "Output #{portable_data_hash[0..7]} of #{cname} of #{pipeline_name}"
575
576                   # check if there is a name collision.
577                   name_collisions = $arv.collection.list(filters: [["owner_uuid", "=", owner_uuid],
578                                                                    ["name", "=", name]])[:items]
579
580                   newcollection_actual = nil
581                   if name_collisions.any? and name_collisions.first[:portable_data_hash] == portable_data_hash
582                     # There is already a collection with the same name and the
583                     # same contents, so just point to that.
584                     newcollection_actual = name_collisions.first
585                   end
586
587                   if newcollection_actual.nil?
588                     # Did not find a collection with the same name (or the
589                     # collection has a different portable data hash) so create
590                     # a new collection with ensure_unique_name: true.
591                     newcollection = {
592                       owner_uuid: owner_uuid,
593                       name: name,
594                       portable_data_hash: collections.first[:portable_data_hash],
595                       manifest_text: collections.first[:manifest_text]
596                     }
597                     debuglog "Creating collection #{newcollection}", 0
598                     newcollection_actual = $arv.collection.create collection: newcollection, ensure_unique_name: true
599                   end
600
601                   c[:output_uuid] = newcollection_actual[:uuid]
602                 else
603                   debuglog "Could not find a collection with portable data hash #{portable_data_hash}", 0
604                 end
605               end
606             end
607           elsif ["Queued", "Running"].include? c[:job][:state]
608             # Job is running or queued to run, so indicate that pipeline
609             # should continue to run
610             moretodo = true
611           elsif c[:job][:state] == "Cancelled"
612             debuglog "component #{cname} job #{c[:job][:uuid]} cancelled."
613             moretodo = false
614           elsif c[:job][:state] == "Failed"
615             moretodo = false
616           end
617         end
618       end
619       @instance[:components] = @components
620       report_status
621
622       if @options[:no_wait]
623         moretodo = false
624       end
625
626       # If job creation fails, just give up on this pipeline instance.
627       if job_creation_failed > 0
628         moretodo = false
629       end
630
631       if moretodo
632         begin
633           sleep 10
634         rescue Interrupt
635           debuglog "interrupt", 0
636           interrupted = true
637           break
638         end
639       end
640     end
641
642     c_in_state = @components.values.group_by { |c|
643       c[:job] and c[:job][:state]
644     }
645     succeeded = c_in_state["Complete"].andand.count || 0
646     failed = (c_in_state["Failed"].andand.count || 0) + (c_in_state["Cancelled"].andand.count || 0)
647     ended = succeeded + failed
648
649     success = (succeeded == @components.length)
650
651     # A job create call failed. Just give up.
652     if job_creation_failed > 0
653       debuglog "job creation failed - giving up on this pipeline instance", 0
654       success = false
655       failed += 1
656     end
657
658     if interrupted
659      if success
660         @instance[:state] = 'Complete'
661      else
662         @instance[:state] = 'Paused'
663      end
664     else
665       if ended == @components.length or failed > 0
666         @instance[:state] = success ? 'Complete' : 'Failed'
667       end
668     end
669
670     if @instance[:finished_at].nil? and ['Complete', 'Failed'].include? @instance[:state]
671       @instance[:finished_at] = Time.now
672     end
673
674     debuglog "pipeline instance state is #{@instance[:state]}"
675
676     # set components_summary
677     components_summary = {"todo" => @components.length - ended, "done" => succeeded, "failed" => failed}
678     @instance[:components_summary] = components_summary
679
680     @instance.save
681   end
682
683   def cleanup
684     if @instance and @instance[:state] == 'RunningOnClient'
685       @instance[:state] = 'Paused'
686       @instance.save
687     end
688   end
689
690   def uuid
691     @instance[:uuid]
692   end
693
694   protected
695
696   def report_status
697     @instance.save
698
699     if @options[:status_json] != '/dev/null'
700       File.open(@options[:status_json], 'w') do |f|
701         f.puts @components.pretty_inspect
702       end
703     end
704
705     if @options[:status_text] != '/dev/null'
706       File.open(@options[:status_text], 'w') do |f|
707         f.puts ""
708         f.puts "#{Time.now} -- pipeline_instance #{@instance[:uuid]}"
709         namewidth = @components.collect { |cname, c| cname.size }.max
710         @components.each do |cname, c|
711           jstatus = if !c[:job]
712                       "-"
713                     else case c[:job][:state]
714                          when "Running"
715                            "#{c[:job][:tasks_summary].inspect}"
716                          when "Complete"
717                            c[:job][:output]
718                          when "Cancelled"
719                            "cancelled #{c[:job][:cancelled_at]}"
720                          when "Failed"
721                            "failed #{c[:job][:finished_at]}"
722                          when "Queued"
723                            "queued #{c[:job][:created_at]}"
724                          end
725                     end
726           f.puts "#{cname.to_s.ljust namewidth} #{c[:job] ? c[:job][:uuid] : '-'.ljust(27)} #{jstatus}"
727         end
728       end
729     end
730   end
731
732   def abort(msg)
733     if @instance
734       if ["New", "Ready", "RunningOnClient",
735           "RunningOnServer"].include?(@instance[:state])
736         @instance[:state] = "Failed"
737         @instance[:finished_at] = Time.now
738         @instance.save
739       end
740       @instance.log_stderr(msg)
741     end
742     Kernel::abort(msg)
743   end
744 end
745
746 runner = WhRunPipelineInstance.new($options)
747 begin
748   if $options[:template]
749     runner.fetch_template($options[:template])
750   else
751     runner.fetch_instance($options[:instance])
752   end
753   runner.apply_parameters(p.leftovers)
754   runner.setup_instance
755   if $options[:submit]
756     runner.instance.save
757     puts runner.instance[:uuid]
758   else
759     runner.run
760   end
761 rescue Exception => e
762   runner.cleanup
763   raise e
764 end