5824: Merge branch 'master' into 5824-keep-web-workbench
[arvados.git] / sdk / cli / bin / arv-run-pipeline-instance
1 #!/usr/bin/env ruby
2
3 # == Synopsis
4 #
5 #  arv-run-pipeline-instance --template pipeline-template-uuid [options] [--] [parameters]
6 #  arv-run-pipeline-instance --instance pipeline-instance-uuid [options]
7 #
8 # Satisfy a pipeline template by finding or submitting a mapreduce job
9 # for each pipeline component.
10 #
11 # == Options
12 #
13 # [--template uuid] Use the specified pipeline template.
14 #
15 # [--template path] Load the pipeline template from the specified
16 #                   local file.
17 #
18 # [--instance uuid] Use the specified pipeline instance.
19 #
20 # [-n, --dry-run] Do not start any new jobs or wait for existing jobs
21 #                 to finish. Just find out whether jobs are finished,
22 #                 queued, or running for each component
23 #
24 # [--submit] Do not try to satisfy any components. Just
25 #                          create an instance, print its UUID to
26 #                          stdout, and exit.
27 #
28 # [--no-wait] Make only as much progress as possible without entering
29 #             a sleep/poll loop.
30 #
31 # [--no-reuse] Do not reuse existing jobs to satisfy pipeline
32 #              components. Submit a new job for every component.
33 #
34 # [--debug] Print extra debugging information on stderr.
35 #
36 # [--debug-level N] Increase amount of debugging information. Default
37 #                   1, possible range 0..3.
38 #
39 # [--status-text path] Print plain text status report to a file or
40 #                      fifo. Default: /dev/stdout
41 #
42 # [--status-json path] Print JSON status report to a file or
43 #                      fifo. Default: /dev/null
44 #
45 # [--description] Description for the pipeline instance.
46 #
47 # == Parameters
48 #
49 # [param_name=param_value]
50 #
51 # [param_name param_value] Set (or override) the default value for
52 #                          every parameter with the given name.
53 #
54 # [component_name::param_name=param_value]
55 # [component_name::param_name param_value]
56 # [--component_name::param_name=param_value]
57 # [--component_name::param_name param_value] Set the value of a
58 #                                            parameter for a single
59 #                                            component.
60 #
61 class WhRunPipelineInstance
62 end
63
64 if RUBY_VERSION < '1.9.3' then
65   abort <<-EOS
66 #{$0.gsub(/^\.\//,'')} requires Ruby version 1.9.3 or higher.
67   EOS
68 end
69
70 begin
71   require 'arvados'
72   require 'rubygems'
73   require 'json'
74   require 'pp'
75   require 'trollop'
76   require 'google/api_client'
77 rescue LoadError => l
78   puts $:
79   abort <<-EOS
80 #{$0}: fatal: #{l.message}
81 Some runtime dependencies may be missing.
82 Try: gem install arvados pp google-api-client json trollop
83   EOS
84 end
85
86 def debuglog(message, verbosity=1)
87   $stderr.puts "#{File.split($0).last} #{$$}: #{message}" if $debuglevel >= verbosity
88 end
89
90 # Parse command line options (the kind that control the behavior of
91 # this program, that is, not the pipeline component parameters).
92
93 p = Trollop::Parser.new do
94   version __FILE__
95   opt(:dry_run,
96       "Do not start any new jobs or wait for existing jobs to finish. Just find out whether jobs are finished, queued, or running for each component.",
97       :type => :boolean,
98       :short => :n)
99   opt(:status_text,
100       "Store plain text status in given file.",
101       :short => :none,
102       :type => :string,
103       :default => '/dev/stdout')
104   opt(:status_json,
105       "Store json-formatted pipeline in given file.",
106       :short => :none,
107       :type => :string,
108       :default => '/dev/null')
109   opt(:no_wait,
110       "Do not wait for jobs to finish. Just look up status, submit new jobs if needed, and exit.",
111       :short => :none,
112       :type => :boolean)
113   opt(:no_reuse,
114       "Do not reuse existing jobs to satisfy pipeline components. Submit a new job for every component.",
115       :short => :none,
116       :type => :boolean)
117   opt(:debug,
118       "Print extra debugging information on stderr.",
119       :type => :boolean)
120   opt(:debug_level,
121       "Set debug verbosity level.",
122       :short => :none,
123       :type => :integer)
124   opt(:template,
125       "UUID of pipeline template, or path to local pipeline template file.",
126       :short => :none,
127       :type => :string)
128   opt(:instance,
129       "UUID of pipeline instance.",
130       :short => :none,
131       :type => :string)
132   opt(:submit,
133       "Submit the pipeline instance to the server, and exit. Let the Crunch dispatch service satisfy the components by finding/running jobs.",
134       :short => :none,
135       :type => :boolean)
136   opt(:run_pipeline_here,
137       "Manage the pipeline instance in-process. Submit jobs to Crunch as needed. Do not exit until the pipeline finishes (or fails).",
138       :short => :none,
139       :type => :boolean)
140   opt(:run_jobs_here,
141       "Run jobs in the local terminal session instead of submitting them to Crunch. Implies --run-pipeline-here. Note: this results in a significantly different job execution environment, and some Crunch features are not supported. It can be necessary to modify a pipeline in order to make it run this way.",
142       :short => :none,
143       :type => :boolean)
144   opt(:run_here,
145       "Synonym for --run-jobs-here.",
146       :short => :none,
147       :type => :boolean)
148   opt(:description,
149       "Description for the pipeline instance.",
150       :short => :none,
151       :type => :string)
152   opt(:project_uuid,
153       "UUID of the project for the pipeline instance.",
154       short: :none,
155       type: :string)
156   stop_on [:'--']
157 end
158 $options = Trollop::with_standard_exception_handling p do
159   p.parse ARGV
160 end
161 $debuglevel = $options[:debug_level] || ($options[:debug] && 1) || 0
162
163 $options[:run_jobs_here] ||= $options[:run_here] # old flag name
164 $options[:run_pipeline_here] ||= $options[:run_jobs_here] # B requires A
165
166 if $options[:instance]
167   if $options[:template] or $options[:submit]
168     abort "#{$0}: syntax error: --instance cannot be combined with --template or --submit."
169   end
170 elsif not $options[:template]
171   puts "error: you must supply a --template or --instance."
172   p.educate
173   abort
174 end
175
176 if $options[:run_pipeline_here] == $options[:submit]
177   abort "#{$0}: error: you must supply --run-pipeline-here, --run-jobs-here, or --submit."
178 end
179
180 # Set up the API client.
181
182 $arv = Arvados.new api_version: 'v1'
183 $client = $arv.client
184 $arvados = $arv.arvados_api
185
186 class PipelineInstance
187   def self.find(uuid)
188     result = $client.execute(:api_method => $arvados.pipeline_instances.get,
189                              :parameters => {
190                                :uuid => uuid
191                              },
192                              :authenticated => false,
193                              :headers => {
194                                authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
195                              })
196     j = JSON.parse result.body, :symbolize_names => true
197     unless j.is_a? Hash and j[:uuid]
198       debuglog "Failed to get pipeline_instance: #{j[:errors] rescue nil}", 0
199       nil
200     else
201       debuglog "Retrieved pipeline_instance #{j[:uuid]}"
202       self.new(j)
203     end
204   end
205   def self.create(attributes)
206     result = $client.execute(:api_method => $arvados.pipeline_instances.create,
207                              :body_object => {
208                                :pipeline_instance => attributes
209                              },
210                              :authenticated => false,
211                              :headers => {
212                                authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
213                              })
214     j = JSON.parse result.body, :symbolize_names => true
215     unless j.is_a? Hash and j[:uuid]
216       abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nFailed to create pipeline_instance: #{j[:errors] rescue nil} #{j.inspect}"
217     end
218     debuglog "Created pipeline instance: #{j[:uuid]}"
219     self.new(j)
220   end
221   def save
222     result = $client.execute(:api_method => $arvados.pipeline_instances.update,
223                              :parameters => {
224                                :uuid => @pi[:uuid]
225                              },
226                              :body_object => {
227                                :pipeline_instance => @attributes_to_update
228                              },
229                              :authenticated => false,
230                              :headers => {
231                                authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
232                              })
233     j = JSON.parse result.body, :symbolize_names => true
234     unless j.is_a? Hash and j[:uuid]
235       debuglog "Failed to save pipeline_instance: #{j[:errors] rescue nil}", 0
236       nil
237     else
238       @attributes_to_update = {}
239       @pi = j
240     end
241   end
242   def []=(x,y)
243     @attributes_to_update[x] = y
244     @pi[x] = y
245   end
246   def [](x)
247     @pi[x]
248   end
249
250   def log_stderr(msg)
251     $arv.log.create log: {
252       event_type: 'stderr',
253       object_uuid: self[:uuid],
254       owner_uuid: self[:owner_uuid],
255       properties: {"text" => msg},
256     }
257   end
258
259   protected
260   def initialize(j)
261     @attributes_to_update = {}
262     @pi = j
263   end
264 end
265
266 class JobCache
267   def self.get(uuid)
268     @cache ||= {}
269     result = $client.execute(:api_method => $arvados.jobs.get,
270                              :parameters => {
271                                :uuid => uuid
272                              },
273                              :authenticated => false,
274                              :headers => {
275                                authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
276                              })
277     @cache[uuid] = JSON.parse result.body, :symbolize_names => true
278   end
279   def self.where(conditions)
280     result = $client.execute(:api_method => $arvados.jobs.list,
281                              :parameters => {
282                                :limit => 10000,
283                                :where => conditions.to_json
284                              },
285                              :authenticated => false,
286                              :headers => {
287                                authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
288                              })
289     list = JSON.parse result.body, :symbolize_names => true
290     if list and list[:items].is_a? Array
291       list[:items]
292     else
293       []
294     end
295   end
296   def self.create(pipeline, component, job, create_params)
297     @cache ||= {}
298
299     body = {job: no_nil_values(job)}.merge(no_nil_values(create_params))
300
301     result = $client.execute(:api_method => $arvados.jobs.create,
302                              :body_object => body,
303                              :authenticated => false,
304                              :headers => {
305                                authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
306                              })
307     j = JSON.parse result.body, :symbolize_names => true
308     if j.is_a? Hash and j[:uuid]
309       @cache[j[:uuid]] = j
310     else
311       debuglog "create job: #{j[:errors] rescue nil} with attributes #{body}", 0
312
313       msg = ""
314       j[:errors].each do |err|
315         msg += "Error creating job for component #{component}: #{err}\n"
316       end
317       msg += "Job submission was: #{body.to_json}"
318
319       pipeline.log_stderr(msg)
320       nil
321     end
322   end
323
324   protected
325
326   def self.no_nil_values(hash)
327     hash.reject { |key, value| value.nil? }
328   end
329 end
330
331 class WhRunPipelineInstance
332   attr_reader :instance
333
334   def initialize(_options)
335     @options = _options
336   end
337
338   def fetch_template(template)
339     if template.match /[^-0-9a-z]/
340       # Doesn't look like a uuid -- use it as a filename.
341       @template = JSON.parse File.read(template), :symbolize_names => true
342     else
343       result = $client.execute(:api_method => $arvados.pipeline_templates.get,
344                                :parameters => {
345                                  :uuid => template
346                                },
347                                :authenticated => false,
348                                :headers => {
349                                  authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
350                                })
351       @template = JSON.parse result.body, :symbolize_names => true
352       if !@template[:uuid]
353         abort "#{$0}: fatal: failed to retrieve pipeline template #{template} #{@template[:errors].inspect rescue nil}"
354       end
355     end
356     self
357   end
358
359   def fetch_instance(instance_uuid)
360     @instance = PipelineInstance.find(instance_uuid)
361     @template = @instance
362     self
363   end
364
365   def apply_parameters(params_args)
366     params_args.shift if params_args[0] == '--'
367     params = {}
368     while !params_args.empty?
369       if (re = params_args[0].match /^(--)?([^-].*?)=(.+)/)
370         params[re[2]] = re[3]
371         params_args.shift
372       elsif params_args.size > 1
373         param = params_args.shift.sub /^--/, ''
374         params[param] = params_args.shift
375       else
376         abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: I do not know what to do with arg \"#{params_args[0]}\""
377       end
378     end
379
380     if not @template[:components].is_a?(Hash)
381       abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Template missing \"components\" hash"
382     end
383     @components = @template[:components].dup
384
385     bad_components = @components.each_pair.select do |cname, cspec|
386       not cspec.is_a?(Hash)
387     end
388     if bad_components.any?
389       abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Components not specified with hashes: #{bad_components.map(&:first).join(', ')}"
390     end
391
392     bad_components = @components.each_pair.select do |cname, cspec|
393       not cspec[:script_parameters].is_a?(Hash)
394     end
395     if bad_components.any?
396       abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Components missing \"script_parameters\" hashes: #{bad_components.map(&:first).join(', ')}"
397     end
398
399     errors = []
400     @components.each do |componentname, component|
401       component[:script_parameters].each do |parametername, parameter|
402         parameter = { :value => parameter } unless parameter.is_a? Hash
403         value =
404           (params["#{componentname}::#{parametername}"] ||
405            parameter[:value] ||
406            (parameter[:output_of].nil? &&
407             (params[parametername.to_s] ||
408              parameter[:default])) ||
409            nil)
410         if value.nil? and
411             ![false,'false',0,'0'].index parameter[:required]
412           if parameter[:output_of]
413             if not @components[parameter[:output_of].intern]
414               errors << [componentname, parametername, "output_of refers to nonexistent component '#{parameter[:output_of]}'"]
415             end
416             next
417           end
418           errors << [componentname, parametername, "required parameter is missing"]
419         end
420         debuglog "parameter #{componentname}::#{parametername} == #{value}"
421
422         component[:script_parameters][parametername] =
423           parameter.dup.merge(value: value)
424       end
425     end
426     if !errors.empty?
427       abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nErrors:\n#{errors.collect { |c,p,e| "#{c}::#{p} - #{e}\n" }.join ""}"
428     end
429     debuglog "options=" + @options.pretty_inspect
430     self
431   end
432
433   def setup_instance
434     if @instance
435       @instance[:properties][:run_options] ||= {}
436       if @options[:no_reuse]
437         # override properties of existing instance
438         @instance[:properties][:run_options][:enable_job_reuse] = false
439       else
440         # Default to "enable reuse" if not specified. (This code path
441         # can go away when old clients go away.)
442         if @instance[:properties][:run_options][:enable_job_reuse].nil?
443           @instance[:properties][:run_options][:enable_job_reuse] = true
444         end
445       end
446     else
447       description = $options[:description] ||
448                     ("Created at #{Time.now.localtime}" + (@template[:name].andand.size.andand>0 ? " using the pipeline template *#{@template[:name]}*" : ""))
449       instance_body = {
450         components: @components,
451         properties: {
452           run_options: {
453             enable_job_reuse: !@options[:no_reuse]
454           }
455         },
456         pipeline_template_uuid: @template[:uuid],
457         description: description,
458         state: ($options[:submit] ? 'RunningOnServer' : 'RunningOnClient')
459       }
460       if @options[:project_uuid]
461         instance_body[:owner_uuid] = @options[:project_uuid]
462       end
463       @instance = PipelineInstance.create(instance_body)
464     end
465     self
466   end
467
468   def run
469     moretodo = true
470     interrupted = false
471
472     if @instance[:started_at].nil?
473       @instance[:started_at] = Time.now
474     end
475
476     job_creation_failed = 0
477     while moretodo
478       moretodo = false
479       @components.each do |cname, c|
480         job = nil
481         owner_uuid = @instance[:owner_uuid]
482         # Is the job satisfying this component already known to be
483         # finished? (Already meaning "before we query API server about
484         # the job's current state")
485         c_already_finished = (c[:job] &&
486                               c[:job][:uuid] &&
487                               ["Complete", "Failed", "Cancelled"].include?(c[:job][:state]))
488         if !c[:job] and
489             c[:script_parameters].select { |pname, p| p.is_a? Hash and p[:output_of]}.empty?
490           # No job yet associated with this component and is component inputs
491           # are fully specified (any output_of script_parameters are resolved
492           # to real value)
493           my_submit_id = "instance #{@instance[:uuid]} rand #{rand(2**64).to_s(36)}"
494           job = JobCache.create(@instance, cname, {
495             :script => c[:script],
496             :script_parameters => Hash[c[:script_parameters].map do |key, spec|
497                                          [key, spec[:value]]
498                                        end],
499             :script_version => c[:script_version],
500             :repository => c[:repository],
501             :nondeterministic => c[:nondeterministic],
502             :runtime_constraints => c[:runtime_constraints],
503             :owner_uuid => owner_uuid,
504             :is_locked_by_uuid => (@options[:run_jobs_here] ? owner_uuid : nil),
505             :submit_id => my_submit_id,
506             :state => (if @options[:run_jobs_here] then "Running" else "Queued" end)
507           }, {
508             # This is the right place to put these attributes when
509             # dealing with new API servers.
510             :minimum_script_version => c[:minimum_script_version],
511             :exclude_script_versions => c[:exclude_minimum_script_versions],
512             :find_or_create => (@instance[:properties][:run_options].andand[:enable_job_reuse] &&
513                                 !c[:nondeterministic]),
514             :filters => c[:filters]
515           })
516           if job
517             debuglog "component #{cname} new job #{job[:uuid]}"
518             c[:job] = job
519             c[:run_in_process] = (@options[:run_jobs_here] and
520                                   job[:submit_id] == my_submit_id)
521           else
522             debuglog "component #{cname} new job failed", 0
523             job_creation_failed += 1
524           end
525         end
526
527         if c[:job] and c[:run_in_process] and not ["Complete", "Failed", "Cancelled"].include? c[:job][:state]
528           report_status
529           begin
530             require 'open3'
531             Open3.popen3("arv-crunch-job", "--force-unlock",
532                          "--job", c[:job][:uuid]) do |stdin, stdout, stderr, wait_thr|
533               debuglog "arv-crunch-job pid #{wait_thr.pid} started", 0
534               stdin.close
535               while true
536                 rready, wready, = IO.select([stdout, stderr], [])
537                 break if !rready[0]
538                 begin
539                   buf = rready[0].read_nonblock(2**20)
540                 rescue EOFError
541                   break
542                 end
543                 (rready[0] == stdout ? $stdout : $stderr).write(buf)
544               end
545               stdout.close
546               stderr.close
547               debuglog "arv-crunch-job pid #{wait_thr.pid} exit #{wait_thr.value.to_i}", 0
548             end
549             if not $arv.job.get(uuid: c[:job][:uuid])[:finished_at]
550               raise Exception.new("arv-crunch-job did not set finished_at.")
551             end
552           rescue Exception => e
553             debuglog "Interrupted (#{e}). Failing job.", 0
554             $arv.job.update(uuid: c[:job][:uuid],
555                             job: {
556                               state: "Failed"
557                             })
558           end
559         end
560
561         if c[:job] and c[:job][:uuid]
562           if ["Running", "Queued"].include?(c[:job][:state])
563             # Job is running (or may be soon) so update copy of job record
564             c[:job] = JobCache.get(c[:job][:uuid])
565           end
566
567           if c[:job][:state] == "Complete"
568             # Populate script_parameters of other components waiting for
569             # this job
570             @components.each do |c2name, c2|
571               c2[:script_parameters].each do |pname, p|
572                 if p.is_a? Hash and p[:output_of] == cname.to_s
573                   debuglog "parameter #{c2name}::#{pname} == #{c[:job][:output]}"
574                   c2[:script_parameters][pname] = {value: c[:job][:output]}
575                   moretodo = true
576                 end
577               end
578             end
579             unless c_already_finished
580               # This is my first time discovering that the job
581               # succeeded. (At the top of this loop, I was still
582               # waiting for it to finish.)
583
584               if @instance[:name].andand.length.andand > 0
585                 pipeline_name = @instance[:name]
586               elsif @template.andand[:name].andand.length.andand > 0
587                 pipeline_name = @template[:name]
588               else
589                 pipeline_name = @instance[:uuid]
590               end
591               if c[:output_name] != false
592                 # Create a collection located in the same project as the pipeline with the contents of the output.
593                 portable_data_hash = c[:job][:output]
594                 collections = $arv.collection.list(limit: 1,
595                                                    filters: [['portable_data_hash', '=', portable_data_hash]],
596                                                    select: ["portable_data_hash", "manifest_text"]
597                                                    )[:items]
598                 if collections.any?
599                   name = c[:output_name] || "Output #{portable_data_hash[0..7]} of #{cname} of #{pipeline_name}"
600
601                   # check if there is a name collision.
602                   name_collisions = $arv.collection.list(filters: [["owner_uuid", "=", owner_uuid],
603                                                                    ["name", "=", name]])[:items]
604
605                   newcollection_actual = nil
606                   if name_collisions.any? and name_collisions.first[:portable_data_hash] == portable_data_hash
607                     # There is already a collection with the same name and the
608                     # same contents, so just point to that.
609                     newcollection_actual = name_collisions.first
610                   end
611
612                   if newcollection_actual.nil?
613                     # Did not find a collection with the same name (or the
614                     # collection has a different portable data hash) so create
615                     # a new collection with ensure_unique_name: true.
616                     newcollection = {
617                       owner_uuid: owner_uuid,
618                       name: name,
619                       portable_data_hash: collections.first[:portable_data_hash],
620                       manifest_text: collections.first[:manifest_text]
621                     }
622                     debuglog "Creating collection #{newcollection}", 0
623                     newcollection_actual = $arv.collection.create collection: newcollection, ensure_unique_name: true
624                   end
625
626                   c[:output_uuid] = newcollection_actual[:uuid]
627                 else
628                   debuglog "Could not find a collection with portable data hash #{portable_data_hash}", 0
629                 end
630               end
631             end
632           elsif ["Queued", "Running"].include? c[:job][:state]
633             # Job is running or queued to run, so indicate that pipeline
634             # should continue to run
635             moretodo = true
636           elsif c[:job][:state] == "Cancelled"
637             debuglog "component #{cname} job #{c[:job][:uuid]} cancelled."
638             moretodo = false
639           elsif c[:job][:state] == "Failed"
640             moretodo = false
641           end
642         end
643       end
644       @instance[:components] = @components
645       report_status
646
647       if @options[:no_wait]
648         moretodo = false
649       end
650
651       # If job creation fails, just give up on this pipeline instance.
652       if job_creation_failed > 0
653         moretodo = false
654       end
655
656       if moretodo
657         begin
658           sleep 10
659         rescue Interrupt
660           debuglog "interrupt", 0
661           interrupted = true
662           break
663         end
664       end
665     end
666
667     c_in_state = @components.values.group_by { |c|
668       c[:job] and c[:job][:state]
669     }
670     succeeded = c_in_state["Complete"].andand.count || 0
671     failed = (c_in_state["Failed"].andand.count || 0) + (c_in_state["Cancelled"].andand.count || 0)
672     ended = succeeded + failed
673
674     success = (succeeded == @components.length)
675
676     # A job create call failed. Just give up.
677     if job_creation_failed > 0
678       debuglog "job creation failed - giving up on this pipeline instance", 0
679       success = false
680       failed += 1
681     end
682
683     if interrupted
684      if success
685         @instance[:state] = 'Complete'
686      else
687         @instance[:state] = 'Paused'
688       end
689     else
690       if ended == @components.length or failed > 0
691         @instance[:state] = success ? 'Complete' : 'Failed'
692       end
693     end
694
695     if @instance[:finished_at].nil? and ['Complete', 'Failed'].include? @instance[:state]
696       @instance[:finished_at] = Time.now
697     end
698
699     debuglog "pipeline instance state is #{@instance[:state]}"
700
701     # set components_summary
702     components_summary = {"todo" => @components.length - ended, "done" => succeeded, "failed" => failed}
703     @instance[:components_summary] = components_summary
704
705     @instance.save
706   end
707
708   def cleanup
709     if @instance and @instance[:state] == 'RunningOnClient'
710       @instance[:state] = 'Paused'
711       @instance.save
712     end
713   end
714
715   def uuid
716     @instance[:uuid]
717   end
718
719   protected
720
721   def report_status
722     @instance.save
723
724     if @options[:status_json] != '/dev/null'
725       File.open(@options[:status_json], 'w') do |f|
726         f.puts @components.pretty_inspect
727       end
728     end
729
730     if @options[:status_text] != '/dev/null'
731       File.open(@options[:status_text], 'w') do |f|
732         f.puts ""
733         f.puts "#{Time.now} -- pipeline_instance #{@instance[:uuid]}"
734         namewidth = @components.collect { |cname, c| cname.size }.max
735         @components.each do |cname, c|
736           jstatus = if !c[:job]
737                       "-"
738                     else case c[:job][:state]
739                          when "Running"
740                            "#{c[:job][:tasks_summary].inspect}"
741                          when "Complete"
742                            c[:job][:output]
743                          when "Cancelled"
744                            "cancelled #{c[:job][:cancelled_at]}"
745                          when "Failed"
746                            "failed #{c[:job][:finished_at]}"
747                          when "Queued"
748                            "queued #{c[:job][:created_at]}"
749                          end
750                     end
751           f.puts "#{cname.to_s.ljust namewidth} #{c[:job] ? c[:job][:uuid] : '-'.ljust(27)} #{jstatus}"
752         end
753       end
754     end
755   end
756
757   def abort(msg)
758     if @instance
759       if ["New", "Ready", "RunningOnClient",
760           "RunningOnServer"].include?(@instance[:state])
761         @instance[:state] = "Failed"
762         @instance[:finished_at] = Time.now
763         @instance.save
764       end
765       @instance.log_stderr(msg)
766     end
767     Kernel::abort(msg)
768   end
769 end
770
771 runner = WhRunPipelineInstance.new($options)
772 begin
773   if $options[:template]
774     runner.fetch_template($options[:template])
775   else
776     runner.fetch_instance($options[:instance])
777   end
778   runner.apply_parameters(p.leftovers)
779   runner.setup_instance
780   if $options[:submit]
781     runner.instance.save
782     puts runner.instance[:uuid]
783   else
784     runner.run
785   end
786 rescue Exception => e
787   runner.cleanup
788   raise e
789 end