5737: Merge branch 'master' into 5737-ruby231
[arvados.git] / sdk / cli / bin / arv-run-pipeline-instance
1 #!/usr/bin/env ruby
2
3 class WhRunPipelineInstance
4 end
5
6 if RUBY_VERSION < '1.9.3' then
7   abort <<-EOS
8 #{$0.gsub(/^\.\//,'')} requires Ruby version 1.9.3 or higher.
9   EOS
10 end
11
12 begin
13   require 'arvados'
14   require 'rubygems'
15   require 'json'
16   require 'pp'
17   require 'trollop'
18   require 'google/api_client'
19 rescue LoadError => l
20   $stderr.puts $:
21   abort <<-EOS
22 #{$0}: fatal: #{l.message}
23 Some runtime dependencies may be missing.
24 Try: gem install arvados pp google-api-client json trollop
25   EOS
26 end
27
28 def debuglog(message, verbosity=1)
29   $stderr.puts "#{File.split($0).last} #{$$}: #{message}" if $debuglevel >= verbosity
30 end
31
32 # Parse command line options (the kind that control the behavior of
33 # this program, that is, not the pipeline component parameters).
34
35 p = Trollop::Parser.new do
36   version __FILE__
37   banner(<<EOF)
38
39 Usage:
40   arv-run-pipeline-instance --template TEMPLATE_UUID [options] [--] [parameters]
41   arv-run-pipeline-instance --instance INSTANCE_UUID [options] [--] [parameters]
42
43 Parameters:
44   param_name=param_value
45   param_name param_value
46                          Set (or override) the default value for every
47                          pipeline component parameter with the given
48                          name.
49
50   component_name::param_name=param_value
51   component_name::param_name param_value
52   --component_name::param_name=param_value
53   --component_name::param_name param_value
54                          Set the value of a parameter for a single
55                          pipeline component.
56
57 Options:
58 EOF
59   opt(:dry_run,
60       "Do not start any new jobs or wait for existing jobs to finish. Just find out whether jobs are finished, queued, or running for each component.",
61       :type => :boolean,
62       :short => :n)
63   opt(:status_text,
64       "Store plain text status in given file.",
65       :short => :none,
66       :type => :string,
67       :default => '/dev/stdout')
68   opt(:status_json,
69       "Store json-formatted pipeline in given file.",
70       :short => :none,
71       :type => :string,
72       :default => '/dev/null')
73   opt(:no_wait,
74       "Do not wait for jobs to finish. Just look up status, submit new jobs if needed, and exit.",
75       :short => :none,
76       :type => :boolean)
77   opt(:no_reuse,
78       "Do not reuse existing jobs to satisfy pipeline components. Submit a new job for every component.",
79       :short => :none,
80       :type => :boolean)
81   opt(:debug,
82       "Print extra debugging information on stderr.",
83       :type => :boolean)
84   opt(:debug_level,
85       "Set debug verbosity level.",
86       :short => :none,
87       :type => :integer)
88   opt(:template,
89       "UUID of pipeline template, or path to local pipeline template file.",
90       :short => :none,
91       :type => :string)
92   opt(:instance,
93       "UUID of pipeline instance.",
94       :short => :none,
95       :type => :string)
96   opt(:submit,
97       "Submit the pipeline instance to the server, and exit. Let the Crunch dispatch service satisfy the components by finding/running jobs.",
98       :short => :none,
99       :type => :boolean)
100   opt(:run_pipeline_here,
101       "Manage the pipeline instance in-process. Submit jobs to Crunch as needed. Do not exit until the pipeline finishes (or fails).",
102       :short => :none,
103       :type => :boolean)
104   opt(:run_jobs_here,
105       "Run jobs in the local terminal session instead of submitting them to Crunch. Implies --run-pipeline-here. Note: this results in a significantly different job execution environment, and some Crunch features are not supported. It can be necessary to modify a pipeline in order to make it run this way.",
106       :short => :none,
107       :type => :boolean)
108   opt(:run_here,
109       "Synonym for --run-jobs-here.",
110       :short => :none,
111       :type => :boolean)
112   opt(:description,
113       "Description for the pipeline instance.",
114       :short => :none,
115       :type => :string)
116   opt(:project_uuid,
117       "UUID of the project for the pipeline instance.",
118       short: :none,
119       type: :string)
120   stop_on [:'--']
121 end
122 $options = Trollop::with_standard_exception_handling p do
123   p.parse ARGV
124 end
125 $debuglevel = $options[:debug_level] || ($options[:debug] && 1) || 0
126
127 $options[:run_jobs_here] ||= $options[:run_here] # old flag name
128 $options[:run_pipeline_here] ||= $options[:run_jobs_here] # B requires A
129
130 if $options[:instance]
131   if $options[:template] or $options[:submit]
132     abort "#{$0}: syntax error: --instance cannot be combined with --template or --submit."
133   end
134 elsif not $options[:template]
135   $stderr.puts "error: you must supply a --template or --instance."
136   p.educate
137   abort
138 end
139
140 if $options[:run_pipeline_here] == $options[:submit]
141   abort "#{$0}: error: you must supply --run-pipeline-here, --run-jobs-here, or --submit."
142 end
143
144 # Set up the API client.
145
146 $arv = Arvados.new api_version: 'v1'
147 $client = $arv.client
148 $arvados = $arv.arvados_api
149
150 class PipelineInstance
151   def self.find(uuid)
152     result = $client.execute(:api_method => $arvados.pipeline_instances.get,
153                              :parameters => {
154                                :uuid => uuid
155                              },
156                              :authenticated => false,
157                              :headers => {
158                                authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
159                              })
160     j = JSON.parse result.body, :symbolize_names => true
161     unless j.is_a? Hash and j[:uuid]
162       debuglog "Failed to get pipeline_instance: #{j[:errors] rescue nil}", 0
163       nil
164     else
165       debuglog "Retrieved pipeline_instance #{j[:uuid]}"
166       self.new(j)
167     end
168   end
169   def self.create(attributes)
170     result = $client.execute(:api_method => $arvados.pipeline_instances.create,
171                              :body_object => {
172                                :pipeline_instance => attributes
173                              },
174                              :authenticated => false,
175                              :headers => {
176                                authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
177                              })
178     j = JSON.parse result.body, :symbolize_names => true
179     unless j.is_a? Hash and j[:uuid]
180       abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nFailed to create pipeline_instance: #{j[:errors] rescue nil} #{j.inspect}"
181     end
182     debuglog "Created pipeline instance: #{j[:uuid]}"
183     self.new(j)
184   end
185   def save
186     result = $client.execute(:api_method => $arvados.pipeline_instances.update,
187                              :parameters => {
188                                :uuid => @pi[:uuid]
189                              },
190                              :body_object => {
191                                :pipeline_instance => @attributes_to_update
192                              },
193                              :authenticated => false,
194                              :headers => {
195                                authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
196                              })
197     j = JSON.parse result.body, :symbolize_names => true
198     unless j.is_a? Hash and j[:uuid]
199       debuglog "Failed to save pipeline_instance: #{j[:errors] rescue nil}", 0
200       nil
201     else
202       @attributes_to_update = {}
203       @pi = j
204     end
205   end
206   def []=(x,y)
207     @attributes_to_update[x] = y
208     @pi[x] = y
209   end
210   def [](x)
211     @pi[x]
212   end
213
214   def log_stderr(msg)
215     $arv.log.create log: {
216       event_type: 'stderr',
217       object_uuid: self[:uuid],
218       owner_uuid: self[:owner_uuid],
219       properties: {"text" => msg},
220     }
221   end
222
223   protected
224   def initialize(j)
225     @attributes_to_update = {}
226     @pi = j
227   end
228 end
229
230 class JobCache
231   def self.get(uuid)
232     @cache ||= {}
233     result = $client.execute(:api_method => $arvados.jobs.get,
234                              :parameters => {
235                                :uuid => uuid
236                              },
237                              :authenticated => false,
238                              :headers => {
239                                authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
240                              })
241     @cache[uuid] = JSON.parse result.body, :symbolize_names => true
242   end
243   def self.where(conditions)
244     result = $client.execute(:api_method => $arvados.jobs.list,
245                              :parameters => {
246                                :limit => 10000,
247                                :where => conditions.to_json
248                              },
249                              :authenticated => false,
250                              :headers => {
251                                authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
252                              })
253     list = JSON.parse result.body, :symbolize_names => true
254     if list and list[:items].is_a? Array
255       list[:items]
256     else
257       []
258     end
259   end
260   def self.create(pipeline, component, job, create_params)
261     @cache ||= {}
262
263     body = {job: no_nil_values(job)}.merge(no_nil_values(create_params))
264
265     result = $client.execute(:api_method => $arvados.jobs.create,
266                              :body_object => body,
267                              :authenticated => false,
268                              :headers => {
269                                authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
270                              })
271     j = JSON.parse result.body, :symbolize_names => true
272     if j.is_a? Hash and j[:uuid]
273       @cache[j[:uuid]] = j
274     else
275       debuglog "create job: #{j[:errors] rescue nil} with attributes #{body}", 0
276
277       msg = ""
278       j[:errors].each do |err|
279         msg += "Error creating job for component #{component}: #{err}\n"
280       end
281       msg += "Job submission was: #{body.to_json}"
282
283       pipeline.log_stderr(msg)
284       nil
285     end
286   end
287
288   protected
289
290   def self.no_nil_values(hash)
291     hash.reject { |key, value| value.nil? }
292   end
293 end
294
295 class WhRunPipelineInstance
296   attr_reader :instance
297
298   def initialize(_options)
299     @options = _options
300   end
301
302   def fetch_template(template)
303     if template.match /[^-0-9a-z]/
304       # Doesn't look like a uuid -- use it as a filename.
305       @template = JSON.parse File.read(template), :symbolize_names => true
306     else
307       result = $client.execute(:api_method => $arvados.pipeline_templates.get,
308                                :parameters => {
309                                  :uuid => template
310                                },
311                                :authenticated => false,
312                                :headers => {
313                                  authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
314                                })
315       @template = JSON.parse result.body, :symbolize_names => true
316       if !@template[:uuid]
317         abort "#{$0}: fatal: failed to retrieve pipeline template #{template} #{@template[:errors].inspect rescue nil}"
318       end
319     end
320     self
321   end
322
323   def fetch_instance(instance_uuid)
324     @instance = PipelineInstance.find(instance_uuid)
325     @template = @instance
326     self
327   end
328
329   def apply_parameters(params_args)
330     params_args.shift if params_args[0] == '--'
331     params = {}
332     while !params_args.empty?
333       if (re = params_args[0].match /^(--)?([^-].*?)=(.+)/)
334         params[re[2]] = re[3]
335         params_args.shift
336       elsif params_args.size > 1
337         param = params_args.shift.sub /^--/, ''
338         params[param] = params_args.shift
339       else
340         abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: I do not know what to do with arg \"#{params_args[0]}\""
341       end
342     end
343
344     if not @template[:components].is_a?(Hash)
345       abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Template missing \"components\" hash"
346     end
347     @components = @template[:components].dup
348
349     bad_components = @components.each_pair.select do |cname, cspec|
350       not cspec.is_a?(Hash)
351     end
352     if bad_components.any?
353       abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Components not specified with hashes: #{bad_components.map(&:first).join(', ')}"
354     end
355
356     bad_components = @components.each_pair.select do |cname, cspec|
357       not cspec[:script_parameters].is_a?(Hash)
358     end
359     if bad_components.any?
360       abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Components missing \"script_parameters\" hashes: #{bad_components.map(&:first).join(', ')}"
361     end
362
363     errors = []
364     @components.each do |componentname, component|
365       component[:script_parameters].each do |parametername, parameter|
366         parameter = { :value => parameter } unless parameter.is_a? Hash
367         if params.has_key?("#{componentname}::#{parametername}")
368           value = params["#{componentname}::#{parametername}"]
369         elsif parameter.has_key?(:value)
370           value = parameter[:value]
371         elsif parameter.has_key?(:output_of)
372           if !@components[parameter[:output_of].intern]
373             errors << [componentname, parametername, "output_of refers to nonexistent component '#{parameter[:output_of]}'"]
374           else
375             # value will be filled in later when the upstream
376             # component's output becomes known
377           end
378           next
379         elsif params.has_key?(parametername.to_s)
380           value = params[parametername.to_s]
381         elsif parameter.has_key?(:default)
382           value = parameter[:default]
383         elsif [false, 'false', 0, '0'].index(parameter[:required])
384           value = nil
385         else
386           errors << [componentname, parametername, "required parameter is missing"]
387           next
388         end
389         debuglog "parameter #{componentname}::#{parametername} == #{value}"
390
391         component[:script_parameters][parametername] =
392           parameter.dup.merge(value: value)
393       end
394     end
395     if !errors.empty?
396       abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nErrors:\n#{errors.collect { |c,p,e| "#{c}::#{p} - #{e}\n" }.join ""}"
397     end
398     debuglog "options=" + @options.pretty_inspect
399     self
400   end
401
402   def setup_instance
403     if @instance
404       @instance[:properties][:run_options] ||= {}
405       if @options[:no_reuse]
406         # override properties of existing instance
407         @instance[:properties][:run_options][:enable_job_reuse] = false
408       else
409         # Default to "enable reuse" if not specified. (This code path
410         # can go away when old clients go away.)
411         if @instance[:properties][:run_options][:enable_job_reuse].nil?
412           @instance[:properties][:run_options][:enable_job_reuse] = true
413         end
414       end
415     else
416       description = $options[:description] ||
417                     ("Created at #{Time.now.localtime}" + (@template[:name].andand.size.andand>0 ? " using the pipeline template *#{@template[:name]}*" : ""))
418       instance_body = {
419         components: @components,
420         properties: {
421           run_options: {
422             enable_job_reuse: !@options[:no_reuse]
423           }
424         },
425         pipeline_template_uuid: @template[:uuid],
426         description: description,
427         state: ($options[:submit] ? 'RunningOnServer' : 'RunningOnClient')
428       }
429       if @options[:project_uuid]
430         instance_body[:owner_uuid] = @options[:project_uuid]
431       end
432       @instance = PipelineInstance.create(instance_body)
433     end
434     self
435   end
436
437   def run
438     moretodo = true
439     interrupted = false
440
441     if @instance[:started_at].nil?
442       @instance[:started_at] = Time.now
443     end
444
445     job_creation_failed = 0
446     while moretodo
447       moretodo = false
448       @components.each do |cname, c|
449         job = nil
450         owner_uuid = @instance[:owner_uuid]
451         # Is the job satisfying this component already known to be
452         # finished? (Already meaning "before we query API server about
453         # the job's current state")
454         c_already_finished = (c[:job] &&
455                               c[:job][:uuid] &&
456                               ["Complete", "Failed", "Cancelled"].include?(c[:job][:state]))
457         if !c[:job] and
458             c[:script_parameters].select { |pname, p| p.is_a? Hash and p[:output_of]}.empty?
459           # No job yet associated with this component and is component inputs
460           # are fully specified (any output_of script_parameters are resolved
461           # to real value)
462           my_submit_id = "instance #{@instance[:uuid]} rand #{rand(2**64).to_s(36)}"
463           job = JobCache.create(@instance, cname, {
464             :script => c[:script],
465             :script_parameters => Hash[c[:script_parameters].map do |key, spec|
466                                          [key, spec[:value]]
467                                        end],
468             :script_version => c[:script_version],
469             :repository => c[:repository],
470             :nondeterministic => c[:nondeterministic],
471             :runtime_constraints => c[:runtime_constraints],
472             :owner_uuid => owner_uuid,
473             :is_locked_by_uuid => (@options[:run_jobs_here] ? owner_uuid : nil),
474             :submit_id => my_submit_id,
475             :state => (if @options[:run_jobs_here] then "Running" else "Queued" end)
476           }, {
477             # This is the right place to put these attributes when
478             # dealing with new API servers.
479             :minimum_script_version => c[:minimum_script_version],
480             :exclude_script_versions => c[:exclude_minimum_script_versions],
481             :find_or_create => (@instance[:properties][:run_options].andand[:enable_job_reuse] &&
482                                 !c[:nondeterministic]),
483             :filters => c[:filters]
484           })
485           if job
486             debuglog "component #{cname} new job #{job[:uuid]}"
487             c[:job] = job
488             c[:run_in_process] = (@options[:run_jobs_here] and
489                                   job[:submit_id] == my_submit_id)
490           else
491             debuglog "component #{cname} new job failed", 0
492             job_creation_failed += 1
493           end
494         end
495
496         if c[:job] and c[:run_in_process] and not ["Complete", "Failed", "Cancelled"].include? c[:job][:state]
497           report_status
498           begin
499             require 'open3'
500             Open3.popen3("arv-crunch-job", "--force-unlock",
501                          "--job", c[:job][:uuid]) do |stdin, stdout, stderr, wait_thr|
502               debuglog "arv-crunch-job pid #{wait_thr.pid} started", 0
503               stdin.close
504               while true
505                 rready, wready, = IO.select([stdout, stderr], [])
506                 break if !rready[0]
507                 begin
508                   buf = rready[0].read_nonblock(2**20)
509                 rescue EOFError
510                   break
511                 end
512                 (rready[0] == stdout ? $stdout : $stderr).write(buf)
513               end
514               stdout.close
515               stderr.close
516               debuglog "arv-crunch-job pid #{wait_thr.pid} exit #{wait_thr.value.to_i}", 0
517             end
518             if not $arv.job.get(uuid: c[:job][:uuid])[:finished_at]
519               raise Exception.new("arv-crunch-job did not set finished_at.")
520             end
521           rescue Exception => e
522             debuglog "Interrupted (#{e}). Failing job.", 0
523             $arv.job.update(uuid: c[:job][:uuid],
524                             job: {
525                               state: "Failed"
526                             })
527           end
528         end
529
530         if c[:job] and c[:job][:uuid]
531           if ["Running", "Queued"].include?(c[:job][:state])
532             # Job is running (or may be soon) so update copy of job record
533             c[:job] = JobCache.get(c[:job][:uuid])
534           end
535
536           if c[:job][:state] == "Complete"
537             # Populate script_parameters of other components waiting for
538             # this job
539             @components.each do |c2name, c2|
540               c2[:script_parameters].each do |pname, p|
541                 if p.is_a? Hash and p[:output_of] == cname.to_s
542                   debuglog "parameter #{c2name}::#{pname} == #{c[:job][:output]}"
543                   c2[:script_parameters][pname] = {value: c[:job][:output]}
544                   moretodo = true
545                 end
546               end
547             end
548             unless c_already_finished
549               # This is my first time discovering that the job
550               # succeeded. (At the top of this loop, I was still
551               # waiting for it to finish.)
552
553               if @instance[:name].andand.length.andand > 0
554                 pipeline_name = @instance[:name]
555               elsif @template.andand[:name].andand.length.andand > 0
556                 pipeline_name = @template[:name]
557               else
558                 pipeline_name = @instance[:uuid]
559               end
560               if c[:output_name] != false
561                 # Create a collection located in the same project as the pipeline with the contents of the output.
562                 portable_data_hash = c[:job][:output]
563                 collections = $arv.collection.list(limit: 1,
564                                                    filters: [['portable_data_hash', '=', portable_data_hash]],
565                                                    select: ["portable_data_hash", "manifest_text"]
566                                                    )[:items]
567                 if collections.any?
568                   name = c[:output_name] || "Output #{portable_data_hash[0..7]} of #{cname} of #{pipeline_name}"
569
570                   # check if there is a name collision.
571                   name_collisions = $arv.collection.list(filters: [["owner_uuid", "=", owner_uuid],
572                                                                    ["name", "=", name]])[:items]
573
574                   newcollection_actual = nil
575                   if name_collisions.any? and name_collisions.first[:portable_data_hash] == portable_data_hash
576                     # There is already a collection with the same name and the
577                     # same contents, so just point to that.
578                     newcollection_actual = name_collisions.first
579                   end
580
581                   if newcollection_actual.nil?
582                     # Did not find a collection with the same name (or the
583                     # collection has a different portable data hash) so create
584                     # a new collection with ensure_unique_name: true.
585                     newcollection = {
586                       owner_uuid: owner_uuid,
587                       name: name,
588                       portable_data_hash: collections.first[:portable_data_hash],
589                       manifest_text: collections.first[:manifest_text]
590                     }
591                     debuglog "Creating collection #{newcollection}", 0
592                     newcollection_actual = $arv.collection.create collection: newcollection, ensure_unique_name: true
593                   end
594
595                   c[:output_uuid] = newcollection_actual[:uuid]
596                 else
597                   debuglog "Could not find a collection with portable data hash #{portable_data_hash}", 0
598                 end
599               end
600             end
601           elsif ["Queued", "Running"].include? c[:job][:state]
602             # Job is running or queued to run, so indicate that pipeline
603             # should continue to run
604             moretodo = true
605           elsif c[:job][:state] == "Cancelled"
606             debuglog "component #{cname} job #{c[:job][:uuid]} cancelled."
607             moretodo = false
608           elsif c[:job][:state] == "Failed"
609             moretodo = false
610           end
611         end
612       end
613       @instance[:components] = @components
614       report_status
615
616       if @options[:no_wait]
617         moretodo = false
618       end
619
620       # If job creation fails, just give up on this pipeline instance.
621       if job_creation_failed > 0
622         moretodo = false
623       end
624
625       if moretodo
626         begin
627           sleep 10
628         rescue Interrupt
629           debuglog "interrupt", 0
630           interrupted = true
631           break
632         end
633       end
634     end
635
636     c_in_state = @components.values.group_by { |c|
637       c[:job] and c[:job][:state]
638     }
639     succeeded = c_in_state["Complete"].andand.count || 0
640     failed = (c_in_state["Failed"].andand.count || 0) + (c_in_state["Cancelled"].andand.count || 0)
641     ended = succeeded + failed
642
643     success = (succeeded == @components.length)
644
645     # A job create call failed. Just give up.
646     if job_creation_failed > 0
647       debuglog "job creation failed - giving up on this pipeline instance", 0
648       success = false
649       failed += 1
650     end
651
652     if interrupted
653      if success
654         @instance[:state] = 'Complete'
655      else
656         @instance[:state] = 'Paused'
657       end
658     else
659       if ended == @components.length or failed > 0
660         @instance[:state] = success ? 'Complete' : 'Failed'
661       end
662     end
663
664     if @instance[:finished_at].nil? and ['Complete', 'Failed'].include? @instance[:state]
665       @instance[:finished_at] = Time.now
666     end
667
668     debuglog "pipeline instance state is #{@instance[:state]}"
669
670     # set components_summary
671     components_summary = {"todo" => @components.length - ended, "done" => succeeded, "failed" => failed}
672     @instance[:components_summary] = components_summary
673
674     @instance.save
675   end
676
677   def cleanup
678     if @instance and @instance[:state] == 'RunningOnClient'
679       @instance[:state] = 'Paused'
680       @instance.save
681     end
682   end
683
684   def uuid
685     @instance[:uuid]
686   end
687
688   protected
689
690   def report_status
691     @instance.save
692
693     if @options[:status_json] != '/dev/null'
694       File.open(@options[:status_json], 'w') do |f|
695         f.puts @components.pretty_inspect
696       end
697     end
698
699     if @options[:status_text] != '/dev/null'
700       File.open(@options[:status_text], 'w') do |f|
701         f.puts ""
702         f.puts "#{Time.now} -- pipeline_instance #{@instance[:uuid]}"
703         namewidth = @components.collect { |cname, c| cname.size }.max
704         @components.each do |cname, c|
705           jstatus = if !c[:job]
706                       "-"
707                     else case c[:job][:state]
708                          when "Running"
709                            "#{c[:job][:tasks_summary].inspect}"
710                          when "Complete"
711                            c[:job][:output]
712                          when "Cancelled"
713                            "cancelled #{c[:job][:cancelled_at]}"
714                          when "Failed"
715                            "failed #{c[:job][:finished_at]}"
716                          when "Queued"
717                            "queued #{c[:job][:created_at]}"
718                          end
719                     end
720           f.puts "#{cname.to_s.ljust namewidth} #{c[:job] ? c[:job][:uuid] : '-'.ljust(27)} #{jstatus}"
721         end
722       end
723     end
724   end
725
726   def abort(msg)
727     if @instance
728       if ["New", "Ready", "RunningOnClient",
729           "RunningOnServer"].include?(@instance[:state])
730         @instance[:state] = "Failed"
731         @instance[:finished_at] = Time.now
732         @instance.save
733       end
734       @instance.log_stderr(msg)
735     end
736     Kernel::abort(msg)
737   end
738 end
739
740 runner = WhRunPipelineInstance.new($options)
741 begin
742   if $options[:template]
743     runner.fetch_template($options[:template])
744   else
745     runner.fetch_instance($options[:instance])
746   end
747   runner.apply_parameters(p.leftovers)
748   runner.setup_instance
749   if $options[:submit]
750     runner.instance.save
751     puts runner.instance[:uuid]
752   else
753     runner.run
754   end
755 rescue Exception => e
756   runner.cleanup
757   raise e
758 end