6146: Ignore SIGPIPE while feeding data to tar. Rely on close() retval instead.
[arvados.git] / sdk / cli / bin / arv-run-pipeline-instance
1 #!/usr/bin/env ruby
2
3 # == Synopsis
4 #
5 #  arv-run-pipeline-instance --template pipeline-template-uuid [options] [--] [parameters]
6 #  arv-run-pipeline-instance --instance pipeline-instance-uuid [options]
7 #
8 # Satisfy a pipeline template by finding or submitting a mapreduce job
9 # for each pipeline component.
10 #
11 # == Options
12 #
13 # [--template uuid] Use the specified pipeline template.
14 #
15 # [--template path] Load the pipeline template from the specified
16 #                   local file.
17 #
18 # [--instance uuid] Use the specified pipeline instance.
19 #
20 # [-n, --dry-run] Do not start any new jobs or wait for existing jobs
21 #                 to finish. Just find out whether jobs are finished,
22 #                 queued, or running for each component
23 #
24 # [--submit] Do not try to satisfy any components. Just
25 #                          create an instance, print its UUID to
26 #                          stdout, and exit.
27 #
28 # [--no-wait] Make only as much progress as possible without entering
29 #             a sleep/poll loop.
30 #
31 # [--no-reuse] Do not reuse existing jobs to satisfy pipeline
32 #              components. Submit a new job for every component.
33 #
34 # [--debug] Print extra debugging information on stderr.
35 #
36 # [--debug-level N] Increase amount of debugging information. Default
37 #                   1, possible range 0..3.
38 #
39 # [--status-text path] Print plain text status report to a file or
40 #                      fifo. Default: /dev/stdout
41 #
42 # [--status-json path] Print JSON status report to a file or
43 #                      fifo. Default: /dev/null
44 #
45 # [--description] Description for the pipeline instance.
46 #
47 # == Parameters
48 #
49 # [param_name=param_value]
50 #
51 # [param_name param_value] Set (or override) the default value for
52 #                          every parameter with the given name.
53 #
54 # [component_name::param_name=param_value]
55 # [component_name::param_name param_value]
56 # [--component_name::param_name=param_value]
57 # [--component_name::param_name param_value] Set the value of a
58 #                                            parameter for a single
59 #                                            component.
60 #
61 class WhRunPipelineInstance
62 end
63
64 if RUBY_VERSION < '1.9.3' then
65   abort <<-EOS
66 #{$0.gsub(/^\.\//,'')} requires Ruby version 1.9.3 or higher.
67   EOS
68 end
69
70 begin
71   require 'arvados'
72   require 'rubygems'
73   require 'json'
74   require 'pp'
75   require 'trollop'
76   require 'google/api_client'
77 rescue LoadError => l
78   puts $:
79   abort <<-EOS
80 #{$0}: fatal: #{l.message}
81 Some runtime dependencies may be missing.
82 Try: gem install arvados pp google-api-client json trollop
83   EOS
84 end
85
86 def debuglog(message, verbosity=1)
87   $stderr.puts "#{File.split($0).last} #{$$}: #{message}" if $debuglevel >= verbosity
88 end
89
90 # Parse command line options (the kind that control the behavior of
91 # this program, that is, not the pipeline component parameters).
92
93 p = Trollop::Parser.new do
94   version __FILE__
95   opt(:dry_run,
96       "Do not start any new jobs or wait for existing jobs to finish. Just find out whether jobs are finished, queued, or running for each component.",
97       :type => :boolean,
98       :short => :n)
99   opt(:status_text,
100       "Store plain text status in given file.",
101       :short => :none,
102       :type => :string,
103       :default => '/dev/stdout')
104   opt(:status_json,
105       "Store json-formatted pipeline in given file.",
106       :short => :none,
107       :type => :string,
108       :default => '/dev/null')
109   opt(:no_wait,
110       "Do not wait for jobs to finish. Just look up status, submit new jobs if needed, and exit.",
111       :short => :none,
112       :type => :boolean)
113   opt(:no_reuse,
114       "Do not reuse existing jobs to satisfy pipeline components. Submit a new job for every component.",
115       :short => :none,
116       :type => :boolean)
117   opt(:debug,
118       "Print extra debugging information on stderr.",
119       :type => :boolean)
120   opt(:debug_level,
121       "Set debug verbosity level.",
122       :short => :none,
123       :type => :integer)
124   opt(:template,
125       "UUID of pipeline template, or path to local pipeline template file.",
126       :short => :none,
127       :type => :string)
128   opt(:instance,
129       "UUID of pipeline instance.",
130       :short => :none,
131       :type => :string)
132   opt(:submit,
133       "Submit the pipeline instance to the server, and exit. Let the Crunch dispatch service satisfy the components by finding/running jobs.",
134       :short => :none,
135       :type => :boolean)
136   opt(:run_pipeline_here,
137       "Manage the pipeline instance in-process. Submit jobs to Crunch as needed. Do not exit until the pipeline finishes (or fails).",
138       :short => :none,
139       :type => :boolean)
140   opt(:run_jobs_here,
141       "Run jobs in the local terminal session instead of submitting them to Crunch. Implies --run-pipeline-here. Note: this results in a significantly different job execution environment, and some Crunch features are not supported. It can be necessary to modify a pipeline in order to make it run this way.",
142       :short => :none,
143       :type => :boolean)
144   opt(:run_here,
145       "Synonym for --run-jobs-here.",
146       :short => :none,
147       :type => :boolean)
148   opt(:description,
149       "Description for the pipeline instance.",
150       :short => :none,
151       :type => :string)
152   stop_on [:'--']
153 end
154 $options = Trollop::with_standard_exception_handling p do
155   p.parse ARGV
156 end
157 $debuglevel = $options[:debug_level] || ($options[:debug] && 1) || 0
158
159 $options[:run_jobs_here] ||= $options[:run_here] # old flag name
160 $options[:run_pipeline_here] ||= $options[:run_jobs_here] # B requires A
161
162 if $options[:instance]
163   if $options[:template] or $options[:submit]
164     abort "#{$0}: syntax error: --instance cannot be combined with --template or --submit."
165   end
166 elsif not $options[:template]
167   puts "error: you must supply a --template or --instance."
168   p.educate
169   abort
170 end
171
172 if $options[:run_pipeline_here] == $options[:submit]
173   abort "#{$0}: error: you must supply --run-pipeline-here, --run-jobs-here, or --submit."
174 end
175
176 # Set up the API client.
177
178 $arv = Arvados.new api_version: 'v1'
179 $client = $arv.client
180 $arvados = $arv.arvados_api
181
182 class PipelineInstance
183   def self.find(uuid)
184     result = $client.execute(:api_method => $arvados.pipeline_instances.get,
185                              :parameters => {
186                                :uuid => uuid
187                              },
188                              :authenticated => false,
189                              :headers => {
190                                authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
191                              })
192     j = JSON.parse result.body, :symbolize_names => true
193     unless j.is_a? Hash and j[:uuid]
194       debuglog "Failed to get pipeline_instance: #{j[:errors] rescue nil}", 0
195       nil
196     else
197       debuglog "Retrieved pipeline_instance #{j[:uuid]}"
198       self.new(j)
199     end
200   end
201   def self.create(attributes)
202     result = $client.execute(:api_method => $arvados.pipeline_instances.create,
203                              :body_object => {
204                                :pipeline_instance => attributes
205                              },
206                              :authenticated => false,
207                              :headers => {
208                                authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
209                              })
210     j = JSON.parse result.body, :symbolize_names => true
211     unless j.is_a? Hash and j[:uuid]
212       abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nFailed to create pipeline_instance: #{j[:errors] rescue nil} #{j.inspect}"
213     end
214     debuglog "Created pipeline instance: #{j[:uuid]}"
215     self.new(j)
216   end
217   def save
218     result = $client.execute(:api_method => $arvados.pipeline_instances.update,
219                              :parameters => {
220                                :uuid => @pi[:uuid]
221                              },
222                              :body_object => {
223                                :pipeline_instance => @attributes_to_update
224                              },
225                              :authenticated => false,
226                              :headers => {
227                                authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
228                              })
229     j = JSON.parse result.body, :symbolize_names => true
230     unless j.is_a? Hash and j[:uuid]
231       debuglog "Failed to save pipeline_instance: #{j[:errors] rescue nil}", 0
232       nil
233     else
234       @attributes_to_update = {}
235       @pi = j
236     end
237   end
238   def []=(x,y)
239     @attributes_to_update[x] = y
240     @pi[x] = y
241   end
242   def [](x)
243     @pi[x]
244   end
245
246   def log_stderr(msg)
247     $arv.log.create log: {
248       event_type: 'stderr',
249       object_uuid: self[:uuid],
250       owner_uuid: self[:owner_uuid],
251       properties: {"text" => msg},
252     }
253   end
254
255   protected
256   def initialize(j)
257     @attributes_to_update = {}
258     @pi = j
259   end
260 end
261
262 class JobCache
263   def self.get(uuid)
264     @cache ||= {}
265     result = $client.execute(:api_method => $arvados.jobs.get,
266                              :parameters => {
267                                :uuid => uuid
268                              },
269                              :authenticated => false,
270                              :headers => {
271                                authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
272                              })
273     @cache[uuid] = JSON.parse result.body, :symbolize_names => true
274   end
275   def self.where(conditions)
276     result = $client.execute(:api_method => $arvados.jobs.list,
277                              :parameters => {
278                                :limit => 10000,
279                                :where => conditions.to_json
280                              },
281                              :authenticated => false,
282                              :headers => {
283                                authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
284                              })
285     list = JSON.parse result.body, :symbolize_names => true
286     if list and list[:items].is_a? Array
287       list[:items]
288     else
289       []
290     end
291   end
292   def self.create(pipeline, component, job, create_params)
293     @cache ||= {}
294
295     body = {job: no_nil_values(job)}.merge(no_nil_values(create_params))
296
297     result = $client.execute(:api_method => $arvados.jobs.create,
298                              :body_object => body,
299                              :authenticated => false,
300                              :headers => {
301                                authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
302                              })
303     j = JSON.parse result.body, :symbolize_names => true
304     if j.is_a? Hash and j[:uuid]
305       @cache[j[:uuid]] = j
306     else
307       debuglog "create job: #{j[:errors] rescue nil} with attributes #{body}", 0
308
309       msg = ""
310       j[:errors].each do |err|
311         msg += "Error creating job for component #{component}: #{err}\n"
312       end
313       msg += "Job submission was: #{body.to_json}"
314
315       pipeline.log_stderr(msg)
316       nil
317     end
318   end
319
320   protected
321
322   def self.no_nil_values(hash)
323     hash.reject { |key, value| value.nil? }
324   end
325 end
326
327 class WhRunPipelineInstance
328   attr_reader :instance
329
330   def initialize(_options)
331     @options = _options
332   end
333
334   def fetch_template(template)
335     if template.match /[^-0-9a-z]/
336       # Doesn't look like a uuid -- use it as a filename.
337       @template = JSON.parse File.read(template), :symbolize_names => true
338     else
339       result = $client.execute(:api_method => $arvados.pipeline_templates.get,
340                                :parameters => {
341                                  :uuid => template
342                                },
343                                :authenticated => false,
344                                :headers => {
345                                  authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
346                                })
347       @template = JSON.parse result.body, :symbolize_names => true
348       if !@template[:uuid]
349         abort "#{$0}: fatal: failed to retrieve pipeline template #{template} #{@template[:errors].inspect rescue nil}"
350       end
351     end
352     self
353   end
354
355   def fetch_instance(instance_uuid)
356     @instance = PipelineInstance.find(instance_uuid)
357     @template = @instance
358     self
359   end
360
361   def apply_parameters(params_args)
362     params_args.shift if params_args[0] == '--'
363     params = {}
364     while !params_args.empty?
365       if (re = params_args[0].match /^(--)?([^-].*?)=(.+)/)
366         params[re[2]] = re[3]
367         params_args.shift
368       elsif params_args.size > 1
369         param = params_args.shift.sub /^--/, ''
370         params[param] = params_args.shift
371       else
372         abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: I do not know what to do with arg \"#{params_args[0]}\""
373       end
374     end
375
376     if not @template[:components].is_a?(Hash)
377       abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Template missing \"components\" hash"
378     end
379     @components = @template[:components].dup
380
381     bad_components = @components.each_pair.select do |cname, cspec|
382       not cspec.is_a?(Hash)
383     end
384     if bad_components.any?
385       abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Components not specified with hashes: #{bad_components.map(&:first).join(', ')}"
386     end
387
388     bad_components = @components.each_pair.select do |cname, cspec|
389       not cspec[:script_parameters].is_a?(Hash)
390     end
391     if bad_components.any?
392       abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Components missing \"script_parameters\" hashes: #{bad_components.map(&:first).join(', ')}"
393     end
394
395     errors = []
396     @components.each do |componentname, component|
397       component[:script_parameters].each do |parametername, parameter|
398         parameter = { :value => parameter } unless parameter.is_a? Hash
399         value =
400           (params["#{componentname}::#{parametername}"] ||
401            parameter[:value] ||
402            (parameter[:output_of].nil? &&
403             (params[parametername.to_s] ||
404              parameter[:default])) ||
405            nil)
406         if value.nil? and
407             ![false,'false',0,'0'].index parameter[:required]
408           if parameter[:output_of]
409             if not @components[parameter[:output_of].intern]
410               errors << [componentname, parametername, "output_of refers to nonexistent component '#{parameter[:output_of]}'"]
411             end
412             next
413           end
414           errors << [componentname, parametername, "required parameter is missing"]
415         end
416         debuglog "parameter #{componentname}::#{parametername} == #{value}"
417
418         component[:script_parameters][parametername] =
419           parameter.dup.merge(value: value)
420       end
421     end
422     if !errors.empty?
423       abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nErrors:\n#{errors.collect { |c,p,e| "#{c}::#{p} - #{e}\n" }.join ""}"
424     end
425     debuglog "options=" + @options.pretty_inspect
426     self
427   end
428
429   def setup_instance
430     if @instance
431       @instance[:properties][:run_options] ||= {}
432       if @options[:no_reuse]
433         # override properties of existing instance
434         @instance[:properties][:run_options][:enable_job_reuse] = false
435       else
436         # Default to "enable reuse" if not specified. (This code path
437         # can go away when old clients go away.)
438         if @instance[:properties][:run_options][:enable_job_reuse].nil?
439           @instance[:properties][:run_options][:enable_job_reuse] = true
440         end
441       end
442     else
443       description = $options[:description]
444       description = ("Created at #{Time.now.localtime}" + (@template[:name].andand.size.andand>0 ? " using the pipeline template *#{@template[:name]}*" : "")) if !description
445       @instance = PipelineInstance.
446         create(components: @components,
447                properties: {
448                  run_options: {
449                    enable_job_reuse: !@options[:no_reuse]
450                  }
451                },
452                pipeline_template_uuid: @template[:uuid],
453                description: description,
454                state: ($options[:submit] ? 'RunningOnServer' : 'RunningOnClient'))
455     end
456     self
457   end
458
459   def run
460     moretodo = true
461     interrupted = false
462
463     if @instance[:started_at].nil?
464       @instance[:started_at] = Time.now
465     end
466
467     job_creation_failed = 0
468     while moretodo
469       moretodo = false
470       @components.each do |cname, c|
471         job = nil
472         owner_uuid = @instance[:owner_uuid]
473         # Is the job satisfying this component already known to be
474         # finished? (Already meaning "before we query API server about
475         # the job's current state")
476         c_already_finished = (c[:job] &&
477                               c[:job][:uuid] &&
478                               ["Complete", "Failed", "Cancelled"].include?(c[:job][:state]))
479         if !c[:job] and
480             c[:script_parameters].select { |pname, p| p.is_a? Hash and p[:output_of]}.empty?
481           # No job yet associated with this component and is component inputs
482           # are fully specified (any output_of script_parameters are resolved
483           # to real value)
484           my_submit_id = "instance #{@instance[:uuid]} rand #{rand(2**64).to_s(36)}"
485           job = JobCache.create(@instance, cname, {
486             :script => c[:script],
487             :script_parameters => Hash[c[:script_parameters].map do |key, spec|
488                                          [key, spec[:value]]
489                                        end],
490             :script_version => c[:script_version],
491             :repository => c[:repository],
492             :nondeterministic => c[:nondeterministic],
493             :runtime_constraints => c[:runtime_constraints],
494             :owner_uuid => owner_uuid,
495             :is_locked_by_uuid => (@options[:run_jobs_here] ? owner_uuid : nil),
496             :submit_id => my_submit_id,
497             :state => (if @options[:run_jobs_here] then "Running" else "Queued" end)
498           }, {
499             # This is the right place to put these attributes when
500             # dealing with new API servers.
501             :minimum_script_version => c[:minimum_script_version],
502             :exclude_script_versions => c[:exclude_minimum_script_versions],
503             :find_or_create => (@instance[:properties][:run_options].andand[:enable_job_reuse] &&
504                                 !c[:nondeterministic]),
505             :filters => c[:filters]
506           })
507           if job
508             debuglog "component #{cname} new job #{job[:uuid]}"
509             c[:job] = job
510             c[:run_in_process] = (@options[:run_jobs_here] and
511                                   job[:submit_id] == my_submit_id)
512           else
513             debuglog "component #{cname} new job failed", 0
514             job_creation_failed += 1
515           end
516         end
517
518         if c[:job] and c[:run_in_process] and not ["Complete", "Failed", "Cancelled"].include? c[:job][:state]
519           report_status
520           begin
521             require 'open3'
522             Open3.popen3("arv-crunch-job", "--force-unlock",
523                          "--job", c[:job][:uuid]) do |stdin, stdout, stderr, wait_thr|
524               debuglog "arv-crunch-job pid #{wait_thr.pid} started", 0
525               stdin.close
526               while true
527                 rready, wready, = IO.select([stdout, stderr], [])
528                 break if !rready[0]
529                 begin
530                   buf = rready[0].read_nonblock(2**20)
531                 rescue EOFError
532                   break
533                 end
534                 (rready[0] == stdout ? $stdout : $stderr).write(buf)
535               end
536               stdout.close
537               stderr.close
538               debuglog "arv-crunch-job pid #{wait_thr.pid} exit #{wait_thr.value.to_i}", 0
539             end
540             if not $arv.job.get(uuid: c[:job][:uuid])[:finished_at]
541               raise Exception.new("arv-crunch-job did not set finished_at.")
542             end
543           rescue Exception => e
544             debuglog "Interrupted (#{e}). Failing job.", 0
545             $arv.job.update(uuid: c[:job][:uuid],
546                             job: {
547                               state: "Failed"
548                             })
549           end
550         end
551
552         if c[:job] and c[:job][:uuid]
553           if ["Running", "Queued"].include?(c[:job][:state])
554             # Job is running (or may be soon) so update copy of job record
555             c[:job] = JobCache.get(c[:job][:uuid])
556           end
557
558           if c[:job][:state] == "Complete"
559             # Populate script_parameters of other components waiting for
560             # this job
561             @components.each do |c2name, c2|
562               c2[:script_parameters].each do |pname, p|
563                 if p.is_a? Hash and p[:output_of] == cname.to_s
564                   debuglog "parameter #{c2name}::#{pname} == #{c[:job][:output]}"
565                   c2[:script_parameters][pname] = {value: c[:job][:output]}
566                   moretodo = true
567                 end
568               end
569             end
570             unless c_already_finished
571               # This is my first time discovering that the job
572               # succeeded. (At the top of this loop, I was still
573               # waiting for it to finish.)
574
575               if @instance[:name].andand.length.andand > 0
576                 pipeline_name = @instance[:name]
577               elsif @template.andand[:name].andand.length.andand > 0
578                 pipeline_name = @template[:name]
579               else
580                 pipeline_name = @instance[:uuid]
581               end
582               if c[:output_name] != false
583                 # Create a collection located in the same project as the pipeline with the contents of the output.
584                 portable_data_hash = c[:job][:output]
585                 collections = $arv.collection.list(limit: 1,
586                                                    filters: [['portable_data_hash', '=', portable_data_hash]],
587                                                    select: ["portable_data_hash", "manifest_text"]
588                                                    )[:items]
589                 if collections.any?
590                   name = c[:output_name] || "Output #{portable_data_hash[0..7]} of #{cname} of #{pipeline_name}"
591
592                   # check if there is a name collision.
593                   name_collisions = $arv.collection.list(filters: [["owner_uuid", "=", owner_uuid],
594                                                                    ["name", "=", name]])[:items]
595
596                   newcollection_actual = nil
597                   if name_collisions.any? and name_collisions.first[:portable_data_hash] == portable_data_hash
598                     # There is already a collection with the same name and the
599                     # same contents, so just point to that.
600                     newcollection_actual = name_collisions.first
601                   end
602
603                   if newcollection_actual.nil?
604                     # Did not find a collection with the same name (or the
605                     # collection has a different portable data hash) so create
606                     # a new collection with ensure_unique_name: true.
607                     newcollection = {
608                       owner_uuid: owner_uuid,
609                       name: name,
610                       portable_data_hash: collections.first[:portable_data_hash],
611                       manifest_text: collections.first[:manifest_text]
612                     }
613                     debuglog "Creating collection #{newcollection}", 0
614                     newcollection_actual = $arv.collection.create collection: newcollection, ensure_unique_name: true
615                   end
616
617                   c[:output_uuid] = newcollection_actual[:uuid]
618                 else
619                   debuglog "Could not find a collection with portable data hash #{portable_data_hash}", 0
620                 end
621               end
622             end
623           elsif ["Queued", "Running"].include? c[:job][:state]
624             # Job is running or queued to run, so indicate that pipeline
625             # should continue to run
626             moretodo = true
627           elsif c[:job][:state] == "Cancelled"
628             debuglog "component #{cname} job #{c[:job][:uuid]} cancelled."
629             moretodo = false
630           elsif c[:job][:state] == "Failed"
631             moretodo = false
632           end
633         end
634       end
635       @instance[:components] = @components
636       report_status
637
638       if @options[:no_wait]
639         moretodo = false
640       end
641
642       # If job creation fails, just give up on this pipeline instance.
643       if job_creation_failed > 0
644         moretodo = false
645       end
646
647       if moretodo
648         begin
649           sleep 10
650         rescue Interrupt
651           debuglog "interrupt", 0
652           interrupted = true
653           break
654         end
655       end
656     end
657
658     c_in_state = @components.values.group_by { |c|
659       c[:job] and c[:job][:state]
660     }
661     succeeded = c_in_state["Complete"].andand.count || 0
662     failed = (c_in_state["Failed"].andand.count || 0) + (c_in_state["Cancelled"].andand.count || 0)
663     ended = succeeded + failed
664
665     success = (succeeded == @components.length)
666
667     # A job create call failed. Just give up.
668     if job_creation_failed > 0
669       debuglog "job creation failed - giving up on this pipeline instance", 0
670       success = false
671       failed += 1
672     end
673
674     if interrupted
675      if success
676         @instance[:state] = 'Complete'
677      else
678         @instance[:state] = 'Paused'
679       end
680     else
681       if ended == @components.length or failed > 0
682         @instance[:state] = success ? 'Complete' : 'Failed'
683       end
684     end
685
686     if @instance[:finished_at].nil? and ['Complete', 'Failed'].include? @instance[:state]
687       @instance[:finished_at] = Time.now
688     end
689
690     debuglog "pipeline instance state is #{@instance[:state]}"
691
692     # set components_summary
693     components_summary = {"todo" => @components.length - ended, "done" => succeeded, "failed" => failed}
694     @instance[:components_summary] = components_summary
695
696     @instance.save
697   end
698
699   def cleanup
700     if @instance and @instance[:state] == 'RunningOnClient'
701       @instance[:state] = 'Paused'
702       @instance.save
703     end
704   end
705
706   def uuid
707     @instance[:uuid]
708   end
709
710   protected
711
712   def report_status
713     @instance.save
714
715     if @options[:status_json] != '/dev/null'
716       File.open(@options[:status_json], 'w') do |f|
717         f.puts @components.pretty_inspect
718       end
719     end
720
721     if @options[:status_text] != '/dev/null'
722       File.open(@options[:status_text], 'w') do |f|
723         f.puts ""
724         f.puts "#{Time.now} -- pipeline_instance #{@instance[:uuid]}"
725         namewidth = @components.collect { |cname, c| cname.size }.max
726         @components.each do |cname, c|
727           jstatus = if !c[:job]
728                       "-"
729                     else case c[:job][:state]
730                          when "Running"
731                            "#{c[:job][:tasks_summary].inspect}"
732                          when "Complete"
733                            c[:job][:output]
734                          when "Cancelled"
735                            "cancelled #{c[:job][:cancelled_at]}"
736                          when "Failed"
737                            "failed #{c[:job][:finished_at]}"
738                          when "Queued"
739                            "queued #{c[:job][:created_at]}"
740                          end
741                     end
742           f.puts "#{cname.to_s.ljust namewidth} #{c[:job] ? c[:job][:uuid] : '-'.ljust(27)} #{jstatus}"
743         end
744       end
745     end
746   end
747
748   def abort(msg)
749     if @instance
750       if ["New", "Ready", "RunningOnClient",
751           "RunningOnServer"].include?(@instance[:state])
752         @instance[:state] = "Failed"
753         @instance[:finished_at] = Time.now
754         @instance.save
755       end
756       @instance.log_stderr(msg)
757     end
758     Kernel::abort(msg)
759   end
760 end
761
762 runner = WhRunPipelineInstance.new($options)
763 begin
764   if $options[:template]
765     runner.fetch_template($options[:template])
766   else
767     runner.fetch_instance($options[:instance])
768   end
769   runner.apply_parameters(p.leftovers)
770   runner.setup_instance
771   if $options[:submit]
772     runner.instance.save
773     puts runner.instance[:uuid]
774   else
775     runner.run
776   end
777 rescue Exception => e
778   runner.cleanup
779   raise e
780 end