Merge branch 'master' into 12479-wb-structured-vocabulary
[arvados.git] / sdk / cli / bin / arv-run-pipeline-instance
1 #!/usr/bin/env ruby
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: Apache-2.0
5
6 class WhRunPipelineInstance
7 end
8
9 if RUBY_VERSION < '1.9.3' then
10   abort <<-EOS
11 #{$0.gsub(/^\.\//,'')} requires Ruby version 1.9.3 or higher.
12   EOS
13 end
14
15 begin
16   require 'arvados'
17   require 'rubygems'
18   require 'json'
19   require 'pp'
20   require 'trollop'
21   require 'google/api_client'
22 rescue LoadError => l
23   $stderr.puts $:
24   abort <<-EOS
25 #{$0}: fatal: #{l.message}
26 Some runtime dependencies may be missing.
27 Try: gem install arvados pp google-api-client json trollop
28   EOS
29 end
30
31 def debuglog(message, verbosity=1)
32   $stderr.puts "#{File.split($0).last} #{$$}: #{message}" if $debuglevel >= verbosity
33 end
34
35 # Parse command line options (the kind that control the behavior of
36 # this program, that is, not the pipeline component parameters).
37
38 p = Trollop::Parser.new do
39   version __FILE__
40   banner(<<EOF)
41
42 Usage:
43   arv-run-pipeline-instance --template TEMPLATE_UUID [options] [--] [parameters]
44   arv-run-pipeline-instance --instance INSTANCE_UUID [options] [--] [parameters]
45
46 Parameters:
47   param_name=param_value
48   param_name param_value
49                          Set (or override) the default value for every
50                          pipeline component parameter with the given
51                          name.
52
53   component_name::param_name=param_value
54   component_name::param_name param_value
55   --component_name::param_name=param_value
56   --component_name::param_name param_value
57                          Set the value of a parameter for a single
58                          pipeline component.
59
60 Options:
61 EOF
62   opt(:dry_run,
63       "Do not start any new jobs or wait for existing jobs to finish. Just find out whether jobs are finished, queued, or running for each component.",
64       :type => :boolean,
65       :short => :n)
66   opt(:status_text,
67       "Store plain text status in given file.",
68       :short => :none,
69       :type => :string,
70       :default => '/dev/stdout')
71   opt(:status_json,
72       "Store json-formatted pipeline in given file.",
73       :short => :none,
74       :type => :string,
75       :default => '/dev/null')
76   opt(:no_wait,
77       "Do not wait for jobs to finish. Just look up status, submit new jobs if needed, and exit.",
78       :short => :none,
79       :type => :boolean)
80   opt(:no_reuse,
81       "Do not reuse existing jobs to satisfy pipeline components. Submit a new job for every component.",
82       :short => :none,
83       :type => :boolean)
84   opt(:debug,
85       "Print extra debugging information on stderr.",
86       :type => :boolean)
87   opt(:debug_level,
88       "Set debug verbosity level.",
89       :short => :none,
90       :type => :integer)
91   opt(:template,
92       "UUID of pipeline template, or path to local pipeline template file.",
93       :short => :none,
94       :type => :string)
95   opt(:instance,
96       "UUID of pipeline instance.",
97       :short => :none,
98       :type => :string)
99   opt(:submit,
100       "Submit the pipeline instance to the server, and exit. Let the Crunch dispatch service satisfy the components by finding/running jobs.",
101       :short => :none,
102       :type => :boolean)
103   opt(:run_pipeline_here,
104       "Manage the pipeline instance in-process. Submit jobs to Crunch as needed. Do not exit until the pipeline finishes (or fails).",
105       :short => :none,
106       :type => :boolean)
107   opt(:run_jobs_here,
108       "Run jobs in the local terminal session instead of submitting them to Crunch. Implies --run-pipeline-here. Note: this results in a significantly different job execution environment, and some Crunch features are not supported. It can be necessary to modify a pipeline in order to make it run this way.",
109       :short => :none,
110       :type => :boolean)
111   opt(:run_here,
112       "Synonym for --run-jobs-here.",
113       :short => :none,
114       :type => :boolean)
115   opt(:description,
116       "Description for the pipeline instance.",
117       :short => :none,
118       :type => :string)
119   opt(:project_uuid,
120       "UUID of the project for the pipeline instance.",
121       short: :none,
122       type: :string)
123   stop_on [:'--']
124 end
125 $options = Trollop::with_standard_exception_handling p do
126   p.parse ARGV
127 end
128 $debuglevel = $options[:debug_level] || ($options[:debug] && 1) || 0
129
130 $options[:run_jobs_here] ||= $options[:run_here] # old flag name
131 $options[:run_pipeline_here] ||= $options[:run_jobs_here] # B requires A
132
133 if $options[:instance]
134   if $options[:template] or $options[:submit]
135     abort "#{$0}: syntax error: --instance cannot be combined with --template or --submit."
136   end
137 elsif not $options[:template]
138   $stderr.puts "error: you must supply a --template or --instance."
139   p.educate
140   abort
141 end
142
143 if $options[:run_pipeline_here] == $options[:submit]
144   abort "#{$0}: error: you must supply --run-pipeline-here, --run-jobs-here, or --submit."
145 end
146
147 # Set up the API client.
148
149 $arv = Arvados.new api_version: 'v1'
150 $client = $arv.client
151 $arvados = $arv.arvados_api
152
153 class PipelineInstance
154   def self.find(uuid)
155     result = $client.execute(:api_method => $arvados.pipeline_instances.get,
156                              :parameters => {
157                                :uuid => uuid
158                              },
159                              :authenticated => false,
160                              :headers => {
161                                authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
162                              })
163     j = JSON.parse result.body, :symbolize_names => true
164     unless j.is_a? Hash and j[:uuid]
165       debuglog "Failed to get pipeline_instance: #{j[:errors] rescue nil}", 0
166       nil
167     else
168       debuglog "Retrieved pipeline_instance #{j[:uuid]}"
169       self.new(j)
170     end
171   end
172   def self.create(attributes)
173     result = $client.execute(:api_method => $arvados.pipeline_instances.create,
174                              :body_object => {
175                                :pipeline_instance => attributes
176                              },
177                              :authenticated => false,
178                              :headers => {
179                                authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
180                              })
181     j = JSON.parse result.body, :symbolize_names => true
182     unless j.is_a? Hash and j[:uuid]
183       abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nFailed to create pipeline_instance: #{j[:errors] rescue nil} #{j.inspect}"
184     end
185     debuglog "Created pipeline instance: #{j[:uuid]}"
186     self.new(j)
187   end
188   def save
189     result = $client.execute(:api_method => $arvados.pipeline_instances.update,
190                              :parameters => {
191                                :uuid => @pi[:uuid]
192                              },
193                              :body_object => {
194                                :pipeline_instance => @attributes_to_update
195                              },
196                              :authenticated => false,
197                              :headers => {
198                                authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
199                              })
200     j = JSON.parse result.body, :symbolize_names => true
201     unless j.is_a? Hash and j[:uuid]
202       debuglog "Failed to save pipeline_instance: #{j[:errors] rescue nil}", 0
203       nil
204     else
205       @attributes_to_update = {}
206       @pi = j
207     end
208   end
209   def []=(x,y)
210     @attributes_to_update[x] = y
211     @pi[x] = y
212   end
213   def [](x)
214     @pi[x]
215   end
216
217   def log_stderr(msg)
218     $arv.log.create log: {
219       event_type: 'stderr',
220       object_uuid: self[:uuid],
221       owner_uuid: self[:owner_uuid],
222       properties: {"text" => msg},
223     }
224   end
225
226   protected
227   def initialize(j)
228     @attributes_to_update = {}
229     @pi = j
230   end
231 end
232
233 class JobCache
234   def self.get(uuid)
235     @cache ||= {}
236     result = $client.execute(:api_method => $arvados.jobs.get,
237                              :parameters => {
238                                :uuid => uuid
239                              },
240                              :authenticated => false,
241                              :headers => {
242                                authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
243                              })
244     @cache[uuid] = JSON.parse result.body, :symbolize_names => true
245   end
246   def self.where(conditions)
247     result = $client.execute(:api_method => $arvados.jobs.list,
248                              :parameters => {
249                                :limit => 10000,
250                                :where => conditions.to_json
251                              },
252                              :authenticated => false,
253                              :headers => {
254                                authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
255                              })
256     list = JSON.parse result.body, :symbolize_names => true
257     if list and list[:items].is_a? Array
258       list[:items]
259     else
260       []
261     end
262   end
263
264   # create() returns [job, exception]. If both job and exception are
265   # nil, there was a non-retryable error and the call should not be
266   # attempted again.
267   def self.create(pipeline, component, job, create_params)
268     @cache ||= {}
269
270     body = {job: no_nil_values(job)}.merge(no_nil_values(create_params))
271
272     result = nil
273     begin
274       result = $client.execute(
275         :api_method => $arvados.jobs.create,
276         :body_object => body,
277         :authenticated => false,
278         :headers => {
279           authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
280         })
281       if result.status == 429 || result.status >= 500
282         raise Exception.new("HTTP status #{result.status}")
283       end
284     rescue Exception => e
285       return nil, e
286     end
287     j = JSON.parse(result.body, :symbolize_names => true) rescue nil
288     if result.status == 200 && j.is_a?(Hash) && j[:uuid]
289       @cache[j[:uuid]] = j
290       return j, nil
291     else
292       errors = j[:errors] rescue []
293       debuglog "create job: [#{result.status}] #{errors.inspect} with attributes #{body}", 0
294
295       msg = ""
296       errors.each do |err|
297         msg += "Error creating job for component #{component}: #{err}\n"
298       end
299       msg += "Job submission was: #{body.to_json}"
300
301       pipeline.log_stderr(msg)
302       return nil, nil
303     end
304   end
305
306   protected
307
308   def self.no_nil_values(hash)
309     hash.reject { |key, value| value.nil? }
310   end
311 end
312
313 class WhRunPipelineInstance
314   attr_reader :instance
315
316   def initialize(_options)
317     @options = _options
318   end
319
320   def fetch_template(template)
321     if template.match /[^-0-9a-z]/
322       # Doesn't look like a uuid -- use it as a filename.
323       @template = JSON.parse File.read(template), :symbolize_names => true
324     else
325       result = $client.execute(:api_method => $arvados.pipeline_templates.get,
326                                :parameters => {
327                                  :uuid => template
328                                },
329                                :authenticated => false,
330                                :headers => {
331                                  authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
332                                })
333       @template = JSON.parse result.body, :symbolize_names => true
334       if !@template[:uuid]
335         abort "#{$0}: fatal: failed to retrieve pipeline template #{template} #{@template[:errors].inspect rescue nil}"
336       end
337     end
338     self
339   end
340
341   def fetch_instance(instance_uuid)
342     @instance = PipelineInstance.find(instance_uuid)
343     @template = @instance
344     self
345   end
346
347   def apply_parameters(params_args)
348     params_args.shift if params_args[0] == '--'
349     params = {}
350     while !params_args.empty?
351       if (re = params_args[0].match /^(--)?([^-].*?)=(.+)/)
352         params[re[2]] = re[3]
353         params_args.shift
354       elsif params_args.size > 1
355         param = params_args.shift.sub /^--/, ''
356         params[param] = params_args.shift
357       else
358         abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: I do not know what to do with arg \"#{params_args[0]}\""
359       end
360     end
361
362     if not @template[:components].is_a?(Hash)
363       abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Template missing \"components\" hash"
364     end
365     @components = @template[:components].dup
366
367     bad_components = @components.each_pair.select do |cname, cspec|
368       not cspec.is_a?(Hash)
369     end
370     if bad_components.any?
371       abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Components not specified with hashes: #{bad_components.map(&:first).join(', ')}"
372     end
373
374     bad_components = @components.each_pair.select do |cname, cspec|
375       not cspec[:script_parameters].is_a?(Hash)
376     end
377     if bad_components.any?
378       abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Components missing \"script_parameters\" hashes: #{bad_components.map(&:first).join(', ')}"
379     end
380
381     errors = []
382     @components.each do |componentname, component|
383       component[:script_parameters].each do |parametername, parameter|
384         parameter = { :value => parameter } unless parameter.is_a? Hash
385         if params.has_key?("#{componentname}::#{parametername}")
386           value = params["#{componentname}::#{parametername}"]
387         elsif parameter.has_key?(:value)
388           value = parameter[:value]
389         elsif parameter.has_key?(:output_of)
390           if !@components[parameter[:output_of].intern]
391             errors << [componentname, parametername, "output_of refers to nonexistent component '#{parameter[:output_of]}'"]
392           else
393             # value will be filled in later when the upstream
394             # component's output becomes known
395           end
396           next
397         elsif params.has_key?(parametername.to_s)
398           value = params[parametername.to_s]
399         elsif parameter.has_key?(:default)
400           value = parameter[:default]
401         elsif [false, 'false', 0, '0'].index(parameter[:required])
402           value = nil
403         else
404           errors << [componentname, parametername, "required parameter is missing"]
405           next
406         end
407         debuglog "parameter #{componentname}::#{parametername} == #{value}"
408
409         component[:script_parameters][parametername] =
410           parameter.dup.merge(value: value)
411       end
412     end
413     if !errors.empty?
414       all_errors = errors.collect do |c,p,e|
415         "#{c}::#{p} - #{e}\n"
416       end.join("")
417       abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nErrors:\n#{all_errors}"
418     end
419     debuglog "options=" + @options.pretty_inspect
420     self
421   end
422
423   def setup_instance
424     if @instance
425       @instance[:properties][:run_options] ||= {}
426       if @options[:no_reuse]
427         # override properties of existing instance
428         @instance[:properties][:run_options][:enable_job_reuse] = false
429       else
430         # Default to "enable reuse" if not specified. (This code path
431         # can go away when old clients go away.)
432         if @instance[:properties][:run_options][:enable_job_reuse].nil?
433           @instance[:properties][:run_options][:enable_job_reuse] = true
434         end
435       end
436     else
437       description = $options[:description] ||
438                     ("Created at #{Time.now.localtime}" + (@template[:name].andand.size.andand>0 ? " using the pipeline template *#{@template[:name]}*" : ""))
439       instance_body = {
440         components: @components,
441         properties: {
442           run_options: {
443             enable_job_reuse: !@options[:no_reuse]
444           }
445         },
446         pipeline_template_uuid: @template[:uuid],
447         description: description,
448         state: ($options[:submit] ? 'RunningOnServer' : 'RunningOnClient')
449       }
450       if @options[:project_uuid]
451         instance_body[:owner_uuid] = @options[:project_uuid]
452       end
453       @instance = PipelineInstance.create(instance_body)
454     end
455     self
456   end
457
458   def run
459     moretodo = true
460     interrupted = false
461
462     if @instance[:started_at].nil?
463       @instance[:started_at] = Time.now
464     end
465
466     job_creation_failed = 0
467     while moretodo
468       moretodo = false
469       @components.each do |cname, c|
470         job = nil
471         owner_uuid = @instance[:owner_uuid]
472         # Is the job satisfying this component already known to be
473         # finished? (Already meaning "before we query API server about
474         # the job's current state")
475         c_already_finished = (c[:job] &&
476                               c[:job][:uuid] &&
477                               ["Complete", "Failed", "Cancelled"].include?(c[:job][:state]))
478         if !c[:job] and
479             c[:script_parameters].select { |pname, p| p.is_a? Hash and p[:output_of]}.empty?
480           # No job yet associated with this component and is component inputs
481           # are fully specified (any output_of script_parameters are resolved
482           # to real value)
483           my_submit_id = "instance #{@instance[:uuid]} rand #{rand(2**64).to_s(36)}"
484           job, err = JobCache.create(@instance, cname, {
485             :script => c[:script],
486             :script_parameters => Hash[c[:script_parameters].map do |key, spec|
487                                          [key, spec[:value]]
488                                        end],
489             :script_version => c[:script_version],
490             :repository => c[:repository],
491             :nondeterministic => c[:nondeterministic],
492             :runtime_constraints => c[:runtime_constraints],
493             :owner_uuid => owner_uuid,
494             :is_locked_by_uuid => (@options[:run_jobs_here] ? owner_uuid : nil),
495             :submit_id => my_submit_id,
496             :state => (if @options[:run_jobs_here] then "Running" else "Queued" end)
497           }, {
498             # This is the right place to put these attributes when
499             # dealing with new API servers.
500             :minimum_script_version => c[:minimum_script_version],
501             :exclude_script_versions => c[:exclude_minimum_script_versions],
502             :find_or_create => (@instance[:properties][:run_options].andand[:enable_job_reuse] &&
503                                 !c[:nondeterministic]),
504             :filters => c[:filters]
505           })
506           if job
507             debuglog "component #{cname} new job #{job[:uuid]}"
508             c[:job] = job
509             c[:run_in_process] = (@options[:run_jobs_here] and
510                                   job[:submit_id] == my_submit_id)
511           elsif err.nil?
512             debuglog "component #{cname} new job failed", 0
513             job_creation_failed += 1
514           else
515             debuglog "component #{cname} new job failed, err=#{err}", 0
516           end
517         end
518
519         if c[:job] and c[:run_in_process] and not ["Complete", "Failed", "Cancelled"].include? c[:job][:state]
520           report_status
521           begin
522             require 'open3'
523             Open3.popen3("arv-crunch-job", "--force-unlock",
524                          "--job", c[:job][:uuid]) do |stdin, stdout, stderr, wait_thr|
525               debuglog "arv-crunch-job pid #{wait_thr.pid} started", 0
526               stdin.close
527               while true
528                 rready, wready, = IO.select([stdout, stderr], [])
529                 break if !rready[0]
530                 begin
531                   buf = rready[0].read_nonblock(2**20)
532                 rescue EOFError
533                   break
534                 end
535                 (rready[0] == stdout ? $stdout : $stderr).write(buf)
536               end
537               stdout.close
538               stderr.close
539               debuglog "arv-crunch-job pid #{wait_thr.pid} exit #{wait_thr.value.to_i}", 0
540             end
541             if not $arv.job.get(uuid: c[:job][:uuid])[:finished_at]
542               raise Exception.new("arv-crunch-job did not set finished_at.")
543             end
544           rescue Exception => e
545             debuglog "Interrupted (#{e}). Failing job.", 0
546             $arv.job.update(uuid: c[:job][:uuid],
547                             job: {
548                               state: "Failed"
549                             })
550           end
551         end
552
553         if c[:job] and c[:job][:uuid]
554           if ["Running", "Queued"].include?(c[:job][:state])
555             # Job is running (or may be soon) so update copy of job record
556             c[:job] = JobCache.get(c[:job][:uuid])
557           end
558
559           if c[:job][:state] == "Complete"
560             # Populate script_parameters of other components waiting for
561             # this job
562             @components.each do |c2name, c2|
563               c2[:script_parameters].each do |pname, p|
564                 if p.is_a? Hash and p[:output_of] == cname.to_s
565                   debuglog "parameter #{c2name}::#{pname} == #{c[:job][:output]}"
566                   c2[:script_parameters][pname] = {value: c[:job][:output]}
567                   moretodo = true
568                 end
569               end
570             end
571             unless c_already_finished
572               # This is my first time discovering that the job
573               # succeeded. (At the top of this loop, I was still
574               # waiting for it to finish.)
575
576               if @instance[:name].andand.length.andand > 0
577                 pipeline_name = @instance[:name]
578               elsif @template.andand[:name].andand.length.andand > 0
579                 pipeline_name = @template[:name]
580               else
581                 pipeline_name = @instance[:uuid]
582               end
583               if c[:output_name] != false
584                 # Create a collection located in the same project as the pipeline with the contents of the output.
585                 portable_data_hash = c[:job][:output]
586                 collections = $arv.collection.list(limit: 1,
587                                                    filters: [['portable_data_hash', '=', portable_data_hash]],
588                                                    select: ["portable_data_hash", "manifest_text"]
589                                                    )[:items]
590                 if collections.any?
591                   name = c[:output_name] || "Output #{portable_data_hash[0..7]} of #{cname} of #{pipeline_name}"
592
593                   # check if there is a name collision.
594                   name_collisions = $arv.collection.list(filters: [["owner_uuid", "=", owner_uuid],
595                                                                    ["name", "=", name]])[:items]
596
597                   newcollection_actual = nil
598                   if name_collisions.any? and name_collisions.first[:portable_data_hash] == portable_data_hash
599                     # There is already a collection with the same name and the
600                     # same contents, so just point to that.
601                     newcollection_actual = name_collisions.first
602                   end
603
604                   if newcollection_actual.nil?
605                     # Did not find a collection with the same name (or the
606                     # collection has a different portable data hash) so create
607                     # a new collection with ensure_unique_name: true.
608                     newcollection = {
609                       owner_uuid: owner_uuid,
610                       name: name,
611                       portable_data_hash: collections.first[:portable_data_hash],
612                       manifest_text: collections.first[:manifest_text]
613                     }
614                     debuglog "Creating collection #{newcollection}", 0
615                     newcollection_actual = $arv.collection.create collection: newcollection, ensure_unique_name: true
616                   end
617
618                   c[:output_uuid] = newcollection_actual[:uuid]
619                 else
620                   debuglog "Could not find a collection with portable data hash #{portable_data_hash}", 0
621                 end
622               end
623             end
624           elsif ["Queued", "Running"].include? c[:job][:state]
625             # Job is running or queued to run, so indicate that pipeline
626             # should continue to run
627             moretodo = true
628           elsif c[:job][:state] == "Cancelled"
629             debuglog "component #{cname} job #{c[:job][:uuid]} cancelled."
630             moretodo = false
631           elsif c[:job][:state] == "Failed"
632             moretodo = false
633           end
634         end
635       end
636       @instance[:components] = @components
637       report_status
638
639       if @options[:no_wait]
640         moretodo = false
641       end
642
643       # If job creation fails, just give up on this pipeline instance.
644       if job_creation_failed > 0
645         moretodo = false
646       end
647
648       if moretodo
649         begin
650           sleep 10
651         rescue Interrupt
652           debuglog "interrupt", 0
653           interrupted = true
654           break
655         end
656       end
657     end
658
659     c_in_state = @components.values.group_by { |c|
660       c[:job] and c[:job][:state]
661     }
662     succeeded = c_in_state["Complete"].andand.count || 0
663     failed = (c_in_state["Failed"].andand.count || 0) + (c_in_state["Cancelled"].andand.count || 0)
664     ended = succeeded + failed
665
666     success = (succeeded == @components.length)
667
668     # A job create call failed. Just give up.
669     if job_creation_failed > 0
670       debuglog "job creation failed - giving up on this pipeline instance", 0
671       success = false
672       failed += 1
673     end
674
675     if interrupted
676      if success
677         @instance[:state] = 'Complete'
678      else
679         @instance[:state] = 'Paused'
680      end
681     else
682       if ended == @components.length or failed > 0
683         @instance[:state] = success ? 'Complete' : 'Failed'
684       end
685     end
686
687     if @instance[:finished_at].nil? and ['Complete', 'Failed'].include? @instance[:state]
688       @instance[:finished_at] = Time.now
689     end
690
691     debuglog "pipeline instance state is #{@instance[:state]}"
692
693     # set components_summary
694     components_summary = {"todo" => @components.length - ended, "done" => succeeded, "failed" => failed}
695     @instance[:components_summary] = components_summary
696
697     @instance.save
698   end
699
700   def cleanup
701     if @instance and @instance[:state] == 'RunningOnClient'
702       @instance[:state] = 'Paused'
703       @instance.save
704     end
705   end
706
707   def uuid
708     @instance[:uuid]
709   end
710
711   protected
712
713   def report_status
714     @instance.save
715
716     if @options[:status_json] != '/dev/null'
717       File.open(@options[:status_json], 'w') do |f|
718         f.puts @components.pretty_inspect
719       end
720     end
721
722     if @options[:status_text] != '/dev/null'
723       File.open(@options[:status_text], 'w') do |f|
724         f.puts ""
725         f.puts "#{Time.now} -- pipeline_instance #{@instance[:uuid]}"
726         namewidth = @components.collect { |cname, c| cname.size }.max
727         @components.each do |cname, c|
728           jstatus = if !c[:job]
729                       "-"
730                     else case c[:job][:state]
731                          when "Running"
732                            "#{c[:job][:tasks_summary].inspect}"
733                          when "Complete"
734                            c[:job][:output]
735                          when "Cancelled"
736                            "cancelled #{c[:job][:cancelled_at]}"
737                          when "Failed"
738                            "failed #{c[:job][:finished_at]}"
739                          when "Queued"
740                            "queued #{c[:job][:created_at]}"
741                          end
742                     end
743           f.puts "#{cname.to_s.ljust namewidth} #{c[:job] ? c[:job][:uuid] : '-'.ljust(27)} #{jstatus}"
744         end
745       end
746     end
747   end
748
749   def abort(msg)
750     if @instance
751       if ["New", "Ready", "RunningOnClient",
752           "RunningOnServer"].include?(@instance[:state])
753         @instance[:state] = "Failed"
754         @instance[:finished_at] = Time.now
755         @instance.save
756       end
757       @instance.log_stderr(msg)
758     end
759     Kernel::abort(msg)
760   end
761 end
762
763 runner = WhRunPipelineInstance.new($options)
764 begin
765   if $options[:template]
766     runner.fetch_template($options[:template])
767   else
768     runner.fetch_instance($options[:instance])
769   end
770   runner.apply_parameters(p.leftovers)
771   runner.setup_instance
772   if $options[:submit]
773     runner.instance.save
774     puts runner.instance[:uuid]
775   else
776     runner.run
777   end
778 rescue Exception => e
779   runner.cleanup
780   raise e
781 end