11917: Do not clear rails cache at boot time.
[arvados.git] / sdk / cli / bin / arv-run-pipeline-instance
1 #!/usr/bin/env ruby
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: Apache-2.0
5
6 class WhRunPipelineInstance
7 end
8
9 if RUBY_VERSION < '1.9.3' then
10   abort <<-EOS
11 #{$0.gsub(/^\.\//,'')} requires Ruby version 1.9.3 or higher.
12   EOS
13 end
14
15 begin
16   require 'arvados'
17   require 'rubygems'
18   require 'json'
19   require 'pp'
20   require 'trollop'
21   require 'google/api_client'
22 rescue LoadError => l
23   $stderr.puts $:
24   abort <<-EOS
25 #{$0}: fatal: #{l.message}
26 Some runtime dependencies may be missing.
27 Try: gem install arvados pp google-api-client json trollop
28   EOS
29 end
30
31 def debuglog(message, verbosity=1)
32   $stderr.puts "#{File.split($0).last} #{$$}: #{message}" if $debuglevel >= verbosity
33 end
34
35 # Parse command line options (the kind that control the behavior of
36 # this program, that is, not the pipeline component parameters).
37
38 p = Trollop::Parser.new do
39   version __FILE__
40   banner(<<EOF)
41
42 Usage:
43   arv-run-pipeline-instance --template TEMPLATE_UUID [options] [--] [parameters]
44   arv-run-pipeline-instance --instance INSTANCE_UUID [options] [--] [parameters]
45
46 Parameters:
47   param_name=param_value
48   param_name param_value
49                          Set (or override) the default value for every
50                          pipeline component parameter with the given
51                          name.
52
53   component_name::param_name=param_value
54   component_name::param_name param_value
55   --component_name::param_name=param_value
56   --component_name::param_name param_value
57                          Set the value of a parameter for a single
58                          pipeline component.
59
60 Options:
61 EOF
62   opt(:dry_run,
63       "Do not start any new jobs or wait for existing jobs to finish. Just find out whether jobs are finished, queued, or running for each component.",
64       :type => :boolean,
65       :short => :n)
66   opt(:status_text,
67       "Store plain text status in given file.",
68       :short => :none,
69       :type => :string,
70       :default => '/dev/stdout')
71   opt(:status_json,
72       "Store json-formatted pipeline in given file.",
73       :short => :none,
74       :type => :string,
75       :default => '/dev/null')
76   opt(:no_wait,
77       "Do not wait for jobs to finish. Just look up status, submit new jobs if needed, and exit.",
78       :short => :none,
79       :type => :boolean)
80   opt(:no_reuse,
81       "Do not reuse existing jobs to satisfy pipeline components. Submit a new job for every component.",
82       :short => :none,
83       :type => :boolean)
84   opt(:debug,
85       "Print extra debugging information on stderr.",
86       :type => :boolean)
87   opt(:debug_level,
88       "Set debug verbosity level.",
89       :short => :none,
90       :type => :integer)
91   opt(:template,
92       "UUID of pipeline template, or path to local pipeline template file.",
93       :short => :none,
94       :type => :string)
95   opt(:instance,
96       "UUID of pipeline instance.",
97       :short => :none,
98       :type => :string)
99   opt(:submit,
100       "Submit the pipeline instance to the server, and exit. Let the Crunch dispatch service satisfy the components by finding/running jobs.",
101       :short => :none,
102       :type => :boolean)
103   opt(:run_pipeline_here,
104       "Manage the pipeline instance in-process. Submit jobs to Crunch as needed. Do not exit until the pipeline finishes (or fails).",
105       :short => :none,
106       :type => :boolean)
107   opt(:run_jobs_here,
108       "Run jobs in the local terminal session instead of submitting them to Crunch. Implies --run-pipeline-here. Note: this results in a significantly different job execution environment, and some Crunch features are not supported. It can be necessary to modify a pipeline in order to make it run this way.",
109       :short => :none,
110       :type => :boolean)
111   opt(:run_here,
112       "Synonym for --run-jobs-here.",
113       :short => :none,
114       :type => :boolean)
115   opt(:description,
116       "Description for the pipeline instance.",
117       :short => :none,
118       :type => :string)
119   opt(:project_uuid,
120       "UUID of the project for the pipeline instance.",
121       short: :none,
122       type: :string)
123   stop_on [:'--']
124 end
125 $options = Trollop::with_standard_exception_handling p do
126   p.parse ARGV
127 end
128 $debuglevel = $options[:debug_level] || ($options[:debug] && 1) || 0
129
130 $options[:run_jobs_here] ||= $options[:run_here] # old flag name
131 $options[:run_pipeline_here] ||= $options[:run_jobs_here] # B requires A
132
133 if $options[:instance]
134   if $options[:template] or $options[:submit]
135     abort "#{$0}: syntax error: --instance cannot be combined with --template or --submit."
136   end
137 elsif not $options[:template]
138   $stderr.puts "error: you must supply a --template or --instance."
139   p.educate
140   abort
141 end
142
143 if $options[:run_pipeline_here] == $options[:submit]
144   abort "#{$0}: error: you must supply --run-pipeline-here, --run-jobs-here, or --submit."
145 end
146
147 # Set up the API client.
148
149 $arv = Arvados.new api_version: 'v1'
150 $client = $arv.client
151 $arvados = $arv.arvados_api
152
153 class PipelineInstance
154   def self.find(uuid)
155     result = $client.execute(:api_method => $arvados.pipeline_instances.get,
156                              :parameters => {
157                                :uuid => uuid
158                              },
159                              :authenticated => false,
160                              :headers => {
161                                authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
162                              })
163     j = JSON.parse result.body, :symbolize_names => true
164     unless j.is_a? Hash and j[:uuid]
165       debuglog "Failed to get pipeline_instance: #{j[:errors] rescue nil}", 0
166       nil
167     else
168       debuglog "Retrieved pipeline_instance #{j[:uuid]}"
169       self.new(j)
170     end
171   end
172   def self.create(attributes)
173     result = $client.execute(:api_method => $arvados.pipeline_instances.create,
174                              :body_object => {
175                                :pipeline_instance => attributes
176                              },
177                              :authenticated => false,
178                              :headers => {
179                                authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
180                              })
181     j = JSON.parse result.body, :symbolize_names => true
182     unless j.is_a? Hash and j[:uuid]
183       abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nFailed to create pipeline_instance: #{j[:errors] rescue nil} #{j.inspect}"
184     end
185     debuglog "Created pipeline instance: #{j[:uuid]}"
186     self.new(j)
187   end
188   def save
189     result = $client.execute(:api_method => $arvados.pipeline_instances.update,
190                              :parameters => {
191                                :uuid => @pi[:uuid]
192                              },
193                              :body_object => {
194                                :pipeline_instance => @attributes_to_update
195                              },
196                              :authenticated => false,
197                              :headers => {
198                                authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
199                              })
200     j = JSON.parse result.body, :symbolize_names => true
201     unless j.is_a? Hash and j[:uuid]
202       debuglog "Failed to save pipeline_instance: #{j[:errors] rescue nil}", 0
203       nil
204     else
205       @attributes_to_update = {}
206       @pi = j
207     end
208   end
209   def []=(x,y)
210     @attributes_to_update[x] = y
211     @pi[x] = y
212   end
213   def [](x)
214     @pi[x]
215   end
216
217   def log_stderr(msg)
218     $arv.log.create log: {
219       event_type: 'stderr',
220       object_uuid: self[:uuid],
221       owner_uuid: self[:owner_uuid],
222       properties: {"text" => msg},
223     }
224   end
225
226   protected
227   def initialize(j)
228     @attributes_to_update = {}
229     @pi = j
230   end
231 end
232
233 class JobCache
234   def self.get(uuid)
235     @cache ||= {}
236     result = $client.execute(:api_method => $arvados.jobs.get,
237                              :parameters => {
238                                :uuid => uuid
239                              },
240                              :authenticated => false,
241                              :headers => {
242                                authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
243                              })
244     @cache[uuid] = JSON.parse result.body, :symbolize_names => true
245   end
246   def self.where(conditions)
247     result = $client.execute(:api_method => $arvados.jobs.list,
248                              :parameters => {
249                                :limit => 10000,
250                                :where => conditions.to_json
251                              },
252                              :authenticated => false,
253                              :headers => {
254                                authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
255                              })
256     list = JSON.parse result.body, :symbolize_names => true
257     if list and list[:items].is_a? Array
258       list[:items]
259     else
260       []
261     end
262   end
263   def self.create(pipeline, component, job, create_params)
264     @cache ||= {}
265
266     body = {job: no_nil_values(job)}.merge(no_nil_values(create_params))
267
268     result = $client.execute(:api_method => $arvados.jobs.create,
269                              :body_object => body,
270                              :authenticated => false,
271                              :headers => {
272                                authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
273                              })
274     j = JSON.parse result.body, :symbolize_names => true
275     if j.is_a? Hash and j[:uuid]
276       @cache[j[:uuid]] = j
277     else
278       debuglog "create job: #{j[:errors] rescue nil} with attributes #{body}", 0
279
280       msg = ""
281       j[:errors].each do |err|
282         msg += "Error creating job for component #{component}: #{err}\n"
283       end
284       msg += "Job submission was: #{body.to_json}"
285
286       pipeline.log_stderr(msg)
287       nil
288     end
289   end
290
291   protected
292
293   def self.no_nil_values(hash)
294     hash.reject { |key, value| value.nil? }
295   end
296 end
297
298 class WhRunPipelineInstance
299   attr_reader :instance
300
301   def initialize(_options)
302     @options = _options
303   end
304
305   def fetch_template(template)
306     if template.match /[^-0-9a-z]/
307       # Doesn't look like a uuid -- use it as a filename.
308       @template = JSON.parse File.read(template), :symbolize_names => true
309     else
310       result = $client.execute(:api_method => $arvados.pipeline_templates.get,
311                                :parameters => {
312                                  :uuid => template
313                                },
314                                :authenticated => false,
315                                :headers => {
316                                  authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
317                                })
318       @template = JSON.parse result.body, :symbolize_names => true
319       if !@template[:uuid]
320         abort "#{$0}: fatal: failed to retrieve pipeline template #{template} #{@template[:errors].inspect rescue nil}"
321       end
322     end
323     self
324   end
325
326   def fetch_instance(instance_uuid)
327     @instance = PipelineInstance.find(instance_uuid)
328     @template = @instance
329     self
330   end
331
332   def apply_parameters(params_args)
333     params_args.shift if params_args[0] == '--'
334     params = {}
335     while !params_args.empty?
336       if (re = params_args[0].match /^(--)?([^-].*?)=(.+)/)
337         params[re[2]] = re[3]
338         params_args.shift
339       elsif params_args.size > 1
340         param = params_args.shift.sub /^--/, ''
341         params[param] = params_args.shift
342       else
343         abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: I do not know what to do with arg \"#{params_args[0]}\""
344       end
345     end
346
347     if not @template[:components].is_a?(Hash)
348       abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Template missing \"components\" hash"
349     end
350     @components = @template[:components].dup
351
352     bad_components = @components.each_pair.select do |cname, cspec|
353       not cspec.is_a?(Hash)
354     end
355     if bad_components.any?
356       abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Components not specified with hashes: #{bad_components.map(&:first).join(', ')}"
357     end
358
359     bad_components = @components.each_pair.select do |cname, cspec|
360       not cspec[:script_parameters].is_a?(Hash)
361     end
362     if bad_components.any?
363       abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Components missing \"script_parameters\" hashes: #{bad_components.map(&:first).join(', ')}"
364     end
365
366     errors = []
367     @components.each do |componentname, component|
368       component[:script_parameters].each do |parametername, parameter|
369         parameter = { :value => parameter } unless parameter.is_a? Hash
370         if params.has_key?("#{componentname}::#{parametername}")
371           value = params["#{componentname}::#{parametername}"]
372         elsif parameter.has_key?(:value)
373           value = parameter[:value]
374         elsif parameter.has_key?(:output_of)
375           if !@components[parameter[:output_of].intern]
376             errors << [componentname, parametername, "output_of refers to nonexistent component '#{parameter[:output_of]}'"]
377           else
378             # value will be filled in later when the upstream
379             # component's output becomes known
380           end
381           next
382         elsif params.has_key?(parametername.to_s)
383           value = params[parametername.to_s]
384         elsif parameter.has_key?(:default)
385           value = parameter[:default]
386         elsif [false, 'false', 0, '0'].index(parameter[:required])
387           value = nil
388         else
389           errors << [componentname, parametername, "required parameter is missing"]
390           next
391         end
392         debuglog "parameter #{componentname}::#{parametername} == #{value}"
393
394         component[:script_parameters][parametername] =
395           parameter.dup.merge(value: value)
396       end
397     end
398     if !errors.empty?
399       abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nErrors:\n#{errors.collect { |c,p,e| "#{c}::#{p} - #{e}\n" }.join ""}"
400     end
401     debuglog "options=" + @options.pretty_inspect
402     self
403   end
404
405   def setup_instance
406     if @instance
407       @instance[:properties][:run_options] ||= {}
408       if @options[:no_reuse]
409         # override properties of existing instance
410         @instance[:properties][:run_options][:enable_job_reuse] = false
411       else
412         # Default to "enable reuse" if not specified. (This code path
413         # can go away when old clients go away.)
414         if @instance[:properties][:run_options][:enable_job_reuse].nil?
415           @instance[:properties][:run_options][:enable_job_reuse] = true
416         end
417       end
418     else
419       description = $options[:description] ||
420                     ("Created at #{Time.now.localtime}" + (@template[:name].andand.size.andand>0 ? " using the pipeline template *#{@template[:name]}*" : ""))
421       instance_body = {
422         components: @components,
423         properties: {
424           run_options: {
425             enable_job_reuse: !@options[:no_reuse]
426           }
427         },
428         pipeline_template_uuid: @template[:uuid],
429         description: description,
430         state: ($options[:submit] ? 'RunningOnServer' : 'RunningOnClient')
431       }
432       if @options[:project_uuid]
433         instance_body[:owner_uuid] = @options[:project_uuid]
434       end
435       @instance = PipelineInstance.create(instance_body)
436     end
437     self
438   end
439
440   def run
441     moretodo = true
442     interrupted = false
443
444     if @instance[:started_at].nil?
445       @instance[:started_at] = Time.now
446     end
447
448     job_creation_failed = 0
449     while moretodo
450       moretodo = false
451       @components.each do |cname, c|
452         job = nil
453         owner_uuid = @instance[:owner_uuid]
454         # Is the job satisfying this component already known to be
455         # finished? (Already meaning "before we query API server about
456         # the job's current state")
457         c_already_finished = (c[:job] &&
458                               c[:job][:uuid] &&
459                               ["Complete", "Failed", "Cancelled"].include?(c[:job][:state]))
460         if !c[:job] and
461             c[:script_parameters].select { |pname, p| p.is_a? Hash and p[:output_of]}.empty?
462           # No job yet associated with this component and is component inputs
463           # are fully specified (any output_of script_parameters are resolved
464           # to real value)
465           my_submit_id = "instance #{@instance[:uuid]} rand #{rand(2**64).to_s(36)}"
466           job = JobCache.create(@instance, cname, {
467             :script => c[:script],
468             :script_parameters => Hash[c[:script_parameters].map do |key, spec|
469                                          [key, spec[:value]]
470                                        end],
471             :script_version => c[:script_version],
472             :repository => c[:repository],
473             :nondeterministic => c[:nondeterministic],
474             :runtime_constraints => c[:runtime_constraints],
475             :owner_uuid => owner_uuid,
476             :is_locked_by_uuid => (@options[:run_jobs_here] ? owner_uuid : nil),
477             :submit_id => my_submit_id,
478             :state => (if @options[:run_jobs_here] then "Running" else "Queued" end)
479           }, {
480             # This is the right place to put these attributes when
481             # dealing with new API servers.
482             :minimum_script_version => c[:minimum_script_version],
483             :exclude_script_versions => c[:exclude_minimum_script_versions],
484             :find_or_create => (@instance[:properties][:run_options].andand[:enable_job_reuse] &&
485                                 !c[:nondeterministic]),
486             :filters => c[:filters]
487           })
488           if job
489             debuglog "component #{cname} new job #{job[:uuid]}"
490             c[:job] = job
491             c[:run_in_process] = (@options[:run_jobs_here] and
492                                   job[:submit_id] == my_submit_id)
493           else
494             debuglog "component #{cname} new job failed", 0
495             job_creation_failed += 1
496           end
497         end
498
499         if c[:job] and c[:run_in_process] and not ["Complete", "Failed", "Cancelled"].include? c[:job][:state]
500           report_status
501           begin
502             require 'open3'
503             Open3.popen3("arv-crunch-job", "--force-unlock",
504                          "--job", c[:job][:uuid]) do |stdin, stdout, stderr, wait_thr|
505               debuglog "arv-crunch-job pid #{wait_thr.pid} started", 0
506               stdin.close
507               while true
508                 rready, wready, = IO.select([stdout, stderr], [])
509                 break if !rready[0]
510                 begin
511                   buf = rready[0].read_nonblock(2**20)
512                 rescue EOFError
513                   break
514                 end
515                 (rready[0] == stdout ? $stdout : $stderr).write(buf)
516               end
517               stdout.close
518               stderr.close
519               debuglog "arv-crunch-job pid #{wait_thr.pid} exit #{wait_thr.value.to_i}", 0
520             end
521             if not $arv.job.get(uuid: c[:job][:uuid])[:finished_at]
522               raise Exception.new("arv-crunch-job did not set finished_at.")
523             end
524           rescue Exception => e
525             debuglog "Interrupted (#{e}). Failing job.", 0
526             $arv.job.update(uuid: c[:job][:uuid],
527                             job: {
528                               state: "Failed"
529                             })
530           end
531         end
532
533         if c[:job] and c[:job][:uuid]
534           if ["Running", "Queued"].include?(c[:job][:state])
535             # Job is running (or may be soon) so update copy of job record
536             c[:job] = JobCache.get(c[:job][:uuid])
537           end
538
539           if c[:job][:state] == "Complete"
540             # Populate script_parameters of other components waiting for
541             # this job
542             @components.each do |c2name, c2|
543               c2[:script_parameters].each do |pname, p|
544                 if p.is_a? Hash and p[:output_of] == cname.to_s
545                   debuglog "parameter #{c2name}::#{pname} == #{c[:job][:output]}"
546                   c2[:script_parameters][pname] = {value: c[:job][:output]}
547                   moretodo = true
548                 end
549               end
550             end
551             unless c_already_finished
552               # This is my first time discovering that the job
553               # succeeded. (At the top of this loop, I was still
554               # waiting for it to finish.)
555
556               if @instance[:name].andand.length.andand > 0
557                 pipeline_name = @instance[:name]
558               elsif @template.andand[:name].andand.length.andand > 0
559                 pipeline_name = @template[:name]
560               else
561                 pipeline_name = @instance[:uuid]
562               end
563               if c[:output_name] != false
564                 # Create a collection located in the same project as the pipeline with the contents of the output.
565                 portable_data_hash = c[:job][:output]
566                 collections = $arv.collection.list(limit: 1,
567                                                    filters: [['portable_data_hash', '=', portable_data_hash]],
568                                                    select: ["portable_data_hash", "manifest_text"]
569                                                    )[:items]
570                 if collections.any?
571                   name = c[:output_name] || "Output #{portable_data_hash[0..7]} of #{cname} of #{pipeline_name}"
572
573                   # check if there is a name collision.
574                   name_collisions = $arv.collection.list(filters: [["owner_uuid", "=", owner_uuid],
575                                                                    ["name", "=", name]])[:items]
576
577                   newcollection_actual = nil
578                   if name_collisions.any? and name_collisions.first[:portable_data_hash] == portable_data_hash
579                     # There is already a collection with the same name and the
580                     # same contents, so just point to that.
581                     newcollection_actual = name_collisions.first
582                   end
583
584                   if newcollection_actual.nil?
585                     # Did not find a collection with the same name (or the
586                     # collection has a different portable data hash) so create
587                     # a new collection with ensure_unique_name: true.
588                     newcollection = {
589                       owner_uuid: owner_uuid,
590                       name: name,
591                       portable_data_hash: collections.first[:portable_data_hash],
592                       manifest_text: collections.first[:manifest_text]
593                     }
594                     debuglog "Creating collection #{newcollection}", 0
595                     newcollection_actual = $arv.collection.create collection: newcollection, ensure_unique_name: true
596                   end
597
598                   c[:output_uuid] = newcollection_actual[:uuid]
599                 else
600                   debuglog "Could not find a collection with portable data hash #{portable_data_hash}", 0
601                 end
602               end
603             end
604           elsif ["Queued", "Running"].include? c[:job][:state]
605             # Job is running or queued to run, so indicate that pipeline
606             # should continue to run
607             moretodo = true
608           elsif c[:job][:state] == "Cancelled"
609             debuglog "component #{cname} job #{c[:job][:uuid]} cancelled."
610             moretodo = false
611           elsif c[:job][:state] == "Failed"
612             moretodo = false
613           end
614         end
615       end
616       @instance[:components] = @components
617       report_status
618
619       if @options[:no_wait]
620         moretodo = false
621       end
622
623       # If job creation fails, just give up on this pipeline instance.
624       if job_creation_failed > 0
625         moretodo = false
626       end
627
628       if moretodo
629         begin
630           sleep 10
631         rescue Interrupt
632           debuglog "interrupt", 0
633           interrupted = true
634           break
635         end
636       end
637     end
638
639     c_in_state = @components.values.group_by { |c|
640       c[:job] and c[:job][:state]
641     }
642     succeeded = c_in_state["Complete"].andand.count || 0
643     failed = (c_in_state["Failed"].andand.count || 0) + (c_in_state["Cancelled"].andand.count || 0)
644     ended = succeeded + failed
645
646     success = (succeeded == @components.length)
647
648     # A job create call failed. Just give up.
649     if job_creation_failed > 0
650       debuglog "job creation failed - giving up on this pipeline instance", 0
651       success = false
652       failed += 1
653     end
654
655     if interrupted
656      if success
657         @instance[:state] = 'Complete'
658      else
659         @instance[:state] = 'Paused'
660       end
661     else
662       if ended == @components.length or failed > 0
663         @instance[:state] = success ? 'Complete' : 'Failed'
664       end
665     end
666
667     if @instance[:finished_at].nil? and ['Complete', 'Failed'].include? @instance[:state]
668       @instance[:finished_at] = Time.now
669     end
670
671     debuglog "pipeline instance state is #{@instance[:state]}"
672
673     # set components_summary
674     components_summary = {"todo" => @components.length - ended, "done" => succeeded, "failed" => failed}
675     @instance[:components_summary] = components_summary
676
677     @instance.save
678   end
679
680   def cleanup
681     if @instance and @instance[:state] == 'RunningOnClient'
682       @instance[:state] = 'Paused'
683       @instance.save
684     end
685   end
686
687   def uuid
688     @instance[:uuid]
689   end
690
691   protected
692
693   def report_status
694     @instance.save
695
696     if @options[:status_json] != '/dev/null'
697       File.open(@options[:status_json], 'w') do |f|
698         f.puts @components.pretty_inspect
699       end
700     end
701
702     if @options[:status_text] != '/dev/null'
703       File.open(@options[:status_text], 'w') do |f|
704         f.puts ""
705         f.puts "#{Time.now} -- pipeline_instance #{@instance[:uuid]}"
706         namewidth = @components.collect { |cname, c| cname.size }.max
707         @components.each do |cname, c|
708           jstatus = if !c[:job]
709                       "-"
710                     else case c[:job][:state]
711                          when "Running"
712                            "#{c[:job][:tasks_summary].inspect}"
713                          when "Complete"
714                            c[:job][:output]
715                          when "Cancelled"
716                            "cancelled #{c[:job][:cancelled_at]}"
717                          when "Failed"
718                            "failed #{c[:job][:finished_at]}"
719                          when "Queued"
720                            "queued #{c[:job][:created_at]}"
721                          end
722                     end
723           f.puts "#{cname.to_s.ljust namewidth} #{c[:job] ? c[:job][:uuid] : '-'.ljust(27)} #{jstatus}"
724         end
725       end
726     end
727   end
728
729   def abort(msg)
730     if @instance
731       if ["New", "Ready", "RunningOnClient",
732           "RunningOnServer"].include?(@instance[:state])
733         @instance[:state] = "Failed"
734         @instance[:finished_at] = Time.now
735         @instance.save
736       end
737       @instance.log_stderr(msg)
738     end
739     Kernel::abort(msg)
740   end
741 end
742
743 runner = WhRunPipelineInstance.new($options)
744 begin
745   if $options[:template]
746     runner.fetch_template($options[:template])
747   else
748     runner.fetch_instance($options[:instance])
749   end
750   runner.apply_parameters(p.leftovers)
751   runner.setup_instance
752   if $options[:submit]
753     runner.instance.save
754     puts runner.instance[:uuid]
755   else
756     runner.run
757   end
758 rescue Exception => e
759   runner.cleanup
760   raise e
761 end