Merge branch 'master' into 4156-cli-tests
[arvados.git] / sdk / cli / bin / arv-run-pipeline-instance
1 #!/usr/bin/env ruby
2
3 # == Synopsis
4 #
5 #  arv-run-pipeline-instance --template pipeline-template-uuid [options] [--] [parameters]
6 #  arv-run-pipeline-instance --instance pipeline-instance-uuid [options]
7 #
8 # Satisfy a pipeline template by finding or submitting a mapreduce job
9 # for each pipeline component.
10 #
11 # == Options
12 #
13 # [--template uuid] Use the specified pipeline template.
14 #
15 # [--template path] Load the pipeline template from the specified
16 #                   local file.
17 #
18 # [--instance uuid] Use the specified pipeline instance.
19 #
20 # [-n, --dry-run] Do not start any new jobs or wait for existing jobs
21 #                 to finish. Just find out whether jobs are finished,
22 #                 queued, or running for each component
23 #
24 # [--submit] Do not try to satisfy any components. Just
25 #                          create an instance, print its UUID to
26 #                          stdout, and exit.
27 #
28 # [--no-wait] Make only as much progress as possible without entering
29 #             a sleep/poll loop.
30 #
31 # [--no-reuse] Do not reuse existing jobs to satisfy pipeline
32 #              components. Submit a new job for every component.
33 #
34 # [--debug] Print extra debugging information on stderr.
35 #
36 # [--debug-level N] Increase amount of debugging information. Default
37 #                   1, possible range 0..3.
38 #
39 # [--status-text path] Print plain text status report to a file or
40 #                      fifo. Default: /dev/stdout
41 #
42 # [--status-json path] Print JSON status report to a file or
43 #                      fifo. Default: /dev/null
44 #
45 # [--description] Description for the pipeline instance.
46 #
47 # == Parameters
48 #
49 # [param_name=param_value]
50 #
51 # [param_name param_value] Set (or override) the default value for
52 #                          every parameter with the given name.
53 #
54 # [component_name::param_name=param_value]
55 # [component_name::param_name param_value]
56 # [--component_name::param_name=param_value]
57 # [--component_name::param_name param_value] Set the value of a
58 #                                            parameter for a single
59 #                                            component.
60 #
61 class WhRunPipelineInstance
62 end
63
64 if RUBY_VERSION < '1.9.3' then
65   abort <<-EOS
66 #{$0.gsub(/^\.\//,'')} requires Ruby version 1.9.3 or higher.
67   EOS
68 end
69
70 $arvados_api_version = ENV['ARVADOS_API_VERSION'] || 'v1'
71 $arvados_api_host = ENV['ARVADOS_API_HOST'] or
72   abort "#{$0}: fatal: ARVADOS_API_HOST environment variable not set."
73 $arvados_api_token = ENV['ARVADOS_API_TOKEN'] or
74   abort "#{$0}: fatal: ARVADOS_API_TOKEN environment variable not set."
75
76 begin
77   require 'arvados'
78   require 'rubygems'
79   require 'json'
80   require 'pp'
81   require 'trollop'
82   require 'google/api_client'
83 rescue LoadError => l
84   puts $:
85   abort <<-EOS
86 #{$0}: fatal: #{l.message}
87 Some runtime dependencies may be missing.
88 Try: gem install arvados pp google-api-client json trollop
89   EOS
90 end
91
92 def debuglog(message, verbosity=1)
93   $stderr.puts "#{File.split($0).last} #{$$}: #{message}" if $debuglevel >= verbosity
94 end
95
96 module Kernel
97   def suppress_warnings
98     original_verbosity = $VERBOSE
99     $VERBOSE = nil
100     result = yield
101     $VERBOSE = original_verbosity
102     return result
103   end
104 end
105
106 if $arvados_api_host.match /local/
107   # You probably don't care about SSL certificate checks if you're
108   # testing with a dev server.
109   suppress_warnings { OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE }
110 end
111
112
113 # Parse command line options (the kind that control the behavior of
114 # this program, that is, not the pipeline component parameters).
115
116 p = Trollop::Parser.new do
117   version __FILE__
118   opt(:dry_run,
119       "Do not start any new jobs or wait for existing jobs to finish. Just find out whether jobs are finished, queued, or running for each component.",
120       :type => :boolean,
121       :short => :n)
122   opt(:status_text,
123       "Store plain text status in given file.",
124       :short => :none,
125       :type => :string,
126       :default => '/dev/stdout')
127   opt(:status_json,
128       "Store json-formatted pipeline in given file.",
129       :short => :none,
130       :type => :string,
131       :default => '/dev/null')
132   opt(:no_wait,
133       "Do not wait for jobs to finish. Just look up status, submit new jobs if needed, and exit.",
134       :short => :none,
135       :type => :boolean)
136   opt(:no_reuse,
137       "Do not reuse existing jobs to satisfy pipeline components. Submit a new job for every component.",
138       :short => :none,
139       :type => :boolean)
140   opt(:debug,
141       "Print extra debugging information on stderr.",
142       :type => :boolean)
143   opt(:debug_level,
144       "Set debug verbosity level.",
145       :short => :none,
146       :type => :integer)
147   opt(:template,
148       "UUID of pipeline template, or path to local pipeline template file.",
149       :short => :none,
150       :type => :string)
151   opt(:instance,
152       "UUID of pipeline instance.",
153       :short => :none,
154       :type => :string)
155   opt(:submit,
156       "Submit the pipeline instance to the server, and exit. Let the Crunch dispatch service satisfy the components by finding/running jobs.",
157       :short => :none,
158       :type => :boolean)
159   opt(:run_pipeline_here,
160       "Manage the pipeline instance in-process. Submit jobs to Crunch as needed. Do not exit until the pipeline finishes (or fails).",
161       :short => :none,
162       :type => :boolean)
163   opt(:run_jobs_here,
164       "Run jobs in the local terminal session instead of submitting them to Crunch. Implies --run-pipeline-here. Note: this results in a significantly different job execution environment, and some Crunch features are not supported. It can be necessary to modify a pipeline in order to make it run this way.",
165       :short => :none,
166       :type => :boolean)
167   opt(:run_here,
168       "Synonym for --run-jobs-here.",
169       :short => :none,
170       :type => :boolean)
171   opt(:description,
172       "Description for the pipeline instance.",
173       :short => :none,
174       :type => :string)
175   stop_on [:'--']
176 end
177 $options = Trollop::with_standard_exception_handling p do
178   p.parse ARGV
179 end
180 $debuglevel = $options[:debug_level] || ($options[:debug] && 1) || 0
181
182 $options[:run_jobs_here] ||= $options[:run_here] # old flag name
183 $options[:run_pipeline_here] ||= $options[:run_jobs_here] # B requires A
184
185 if $options[:instance]
186   if $options[:template] or $options[:submit]
187     abort "#{$0}: syntax error: --instance cannot be combined with --template or --submit."
188   end
189 elsif not $options[:template]
190   puts "error: you must supply a --template or --instance."
191   p.educate
192   abort
193 end
194
195 if $options[:run_pipeline_here] == $options[:submit]
196   abort "#{$0}: error: you must supply --run-pipeline-here, --run-jobs-here, or --submit."
197 end
198
199 # Suppress SSL certificate checks if ARVADOS_API_HOST_INSECURE
200
201 module Kernel
202   def suppress_warnings
203     original_verbosity = $VERBOSE
204     $VERBOSE = nil
205     result = yield
206     $VERBOSE = original_verbosity
207     return result
208   end
209 end
210
211 if ENV['ARVADOS_API_HOST_INSECURE']
212   suppress_warnings { OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE }
213 end
214
215 # Set up the API client.
216
217 $arv = Arvados.new api_version: 'v1'
218 $client = $arv.client
219 $arvados = $arv.arvados_api
220
221 class PipelineInstance
222   def self.find(uuid)
223     result = $client.execute(:api_method => $arvados.pipeline_instances.get,
224                              :parameters => {
225                                :uuid => uuid
226                              },
227                              :authenticated => false,
228                              :headers => {
229                                authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
230                              })
231     j = JSON.parse result.body, :symbolize_names => true
232     unless j.is_a? Hash and j[:uuid]
233       debuglog "Failed to get pipeline_instance: #{j[:errors] rescue nil}", 0
234       nil
235     else
236       debuglog "Retrieved pipeline_instance #{j[:uuid]}"
237       self.new(j)
238     end
239   end
240   def self.create(attributes)
241     result = $client.execute(:api_method => $arvados.pipeline_instances.create,
242                              :body_object => {
243                                :pipeline_instance => attributes
244                              },
245                              :authenticated => false,
246                              :headers => {
247                                authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
248                              })
249     j = JSON.parse result.body, :symbolize_names => true
250     unless j.is_a? Hash and j[:uuid]
251       abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nFailed to create pipeline_instance: #{j[:errors] rescue nil} #{j.inspect}"
252     end
253     debuglog "Created pipeline instance: #{j[:uuid]}"
254     self.new(j)
255   end
256   def save
257     result = $client.execute(:api_method => $arvados.pipeline_instances.update,
258                              :parameters => {
259                                :uuid => @pi[:uuid]
260                              },
261                              :body_object => {
262                                :pipeline_instance => @attributes_to_update
263                              },
264                              :authenticated => false,
265                              :headers => {
266                                authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
267                              })
268     j = JSON.parse result.body, :symbolize_names => true
269     unless j.is_a? Hash and j[:uuid]
270       debuglog "Failed to save pipeline_instance: #{j[:errors] rescue nil}", 0
271       nil
272     else
273       @attributes_to_update = {}
274       @pi = j
275     end
276   end
277   def []=(x,y)
278     @attributes_to_update[x] = y
279     @pi[x] = y
280   end
281   def [](x)
282     @pi[x]
283   end
284
285   def log_stderr(msg)
286     $arv.log.create log: {
287       event_type: 'stderr',
288       object_uuid: self[:uuid],
289       owner_uuid: self[:owner_uuid],
290       properties: {"text" => msg},
291     }
292   end
293
294   protected
295   def initialize(j)
296     @attributes_to_update = {}
297     @pi = j
298   end
299 end
300
301 class JobCache
302   def self.get(uuid)
303     @cache ||= {}
304     result = $client.execute(:api_method => $arvados.jobs.get,
305                              :parameters => {
306                                :uuid => uuid
307                              },
308                              :authenticated => false,
309                              :headers => {
310                                authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
311                              })
312     @cache[uuid] = JSON.parse result.body, :symbolize_names => true
313   end
314   def self.where(conditions)
315     result = $client.execute(:api_method => $arvados.jobs.list,
316                              :parameters => {
317                                :limit => 10000,
318                                :where => conditions.to_json
319                              },
320                              :authenticated => false,
321                              :headers => {
322                                authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
323                              })
324     list = JSON.parse result.body, :symbolize_names => true
325     if list and list[:items].is_a? Array
326       list[:items]
327     else
328       []
329     end
330   end
331   def self.create(pipeline, component, job, create_params)
332     @cache ||= {}
333
334     body = {job: no_nil_values(job)}.merge(no_nil_values(create_params))
335
336     result = $client.execute(:api_method => $arvados.jobs.create,
337                              :body_object => body,
338                              :authenticated => false,
339                              :headers => {
340                                authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
341                              })
342     j = JSON.parse result.body, :symbolize_names => true
343     if j.is_a? Hash and j[:uuid]
344       @cache[j[:uuid]] = j
345     else
346       debuglog "create job: #{j[:errors] rescue nil} with attributes #{body}", 0
347
348       msg = ""
349       j[:errors].each do |err|
350         msg += "Error creating job for component #{component}: #{err}\n"
351       end
352       msg += "Job submission was: #{body.to_json}"
353
354       pipeline.log_stderr(msg)
355       nil
356     end
357   end
358
359   protected
360
361   def self.no_nil_values(hash)
362     hash.reject { |key, value| value.nil? }
363   end
364 end
365
366 class WhRunPipelineInstance
367   attr_reader :instance
368
369   def initialize(_options)
370     @options = _options
371   end
372
373   def fetch_template(template)
374     if template.match /[^-0-9a-z]/
375       # Doesn't look like a uuid -- use it as a filename.
376       @template = JSON.parse File.read(template), :symbolize_names => true
377     else
378       result = $client.execute(:api_method => $arvados.pipeline_templates.get,
379                                :parameters => {
380                                  :uuid => template
381                                },
382                                :authenticated => false,
383                                :headers => {
384                                  authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
385                                })
386       @template = JSON.parse result.body, :symbolize_names => true
387       if !@template[:uuid]
388         abort "#{$0}: fatal: failed to retrieve pipeline template #{template} #{@template[:errors].inspect rescue nil}"
389       end
390     end
391     self
392   end
393
394   def fetch_instance(instance_uuid)
395     @instance = PipelineInstance.find(instance_uuid)
396     @template = @instance
397     self
398   end
399
400   def apply_parameters(params_args)
401     params_args.shift if params_args[0] == '--'
402     params = {}
403     while !params_args.empty?
404       if (re = params_args[0].match /^(--)?([^-].*?)=(.+)/)
405         params[re[2]] = re[3]
406         params_args.shift
407       elsif params_args.size > 1
408         param = params_args.shift.sub /^--/, ''
409         params[param] = params_args.shift
410       else
411         abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: I do not know what to do with arg \"#{params_args[0]}\""
412       end
413     end
414
415     if not @template[:components].is_a?(Hash)
416       abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Template missing \"components\" hash"
417     end
418     @components = @template[:components].dup
419
420     bad_components = @components.each_pair.select do |cname, cspec|
421       not cspec.is_a?(Hash)
422     end
423     if bad_components.any?
424       abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Components not specified with hashes: #{bad_components.map(&:first).join(', ')}"
425     end
426
427     bad_components = @components.each_pair.select do |cname, cspec|
428       not cspec[:script_parameters].is_a?(Hash)
429     end
430     if bad_components.any?
431       abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Components missing \"script_parameters\" hashes: #{bad_components.map(&:first).join(', ')}"
432     end
433
434     errors = []
435     @components.each do |componentname, component|
436       component[:script_parameters].each do |parametername, parameter|
437         parameter = { :value => parameter } unless parameter.is_a? Hash
438         value =
439           (params["#{componentname}::#{parametername}"] ||
440            parameter[:value] ||
441            (parameter[:output_of].nil? &&
442             (params[parametername.to_s] ||
443              parameter[:default])) ||
444            nil)
445         if value.nil? and
446             ![false,'false',0,'0'].index parameter[:required]
447           if parameter[:output_of]
448             if not @components[parameter[:output_of].intern]
449               errors << [componentname, parametername, "output_of refers to nonexistent component '#{parameter[:output_of]}'"]
450             end
451             next
452           end
453           errors << [componentname, parametername, "required parameter is missing"]
454         end
455         debuglog "parameter #{componentname}::#{parametername} == #{value}"
456
457         component[:script_parameters][parametername] =
458           parameter.dup.merge(value: value)
459       end
460     end
461     if !errors.empty?
462       abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nErrors:\n#{errors.collect { |c,p,e| "#{c}::#{p} - #{e}\n" }.join ""}"
463     end
464     debuglog "options=" + @options.pretty_inspect
465     self
466   end
467
468   def setup_instance
469     if @instance
470       @instance[:properties][:run_options] ||= {}
471       if @options[:no_reuse]
472         # override properties of existing instance
473         @instance[:properties][:run_options][:enable_job_reuse] = false
474       else
475         # Default to "enable reuse" if not specified. (This code path
476         # can go away when old clients go away.)
477         if @instance[:properties][:run_options][:enable_job_reuse].nil?
478           @instance[:properties][:run_options][:enable_job_reuse] = true
479         end
480       end
481     else
482       description = $options[:description]
483       description = ("Created at #{Time.now.localtime}" + (@template[:name].andand.size.andand>0 ? " using the pipeline template *#{@template[:name]}*" : "")) if !description
484       @instance = PipelineInstance.
485         create(components: @components,
486                properties: {
487                  run_options: {
488                    enable_job_reuse: !@options[:no_reuse]
489                  }
490                },
491                pipeline_template_uuid: @template[:uuid],
492                description: description,
493                state: ($options[:submit] ? 'RunningOnServer' : 'RunningOnClient'))
494     end
495     self
496   end
497
498   def run
499     moretodo = true
500     interrupted = false
501
502     if @instance[:started_at].nil?
503       @instance[:started_at] = Time.now
504     end
505
506     job_creation_failed = 0
507     while moretodo
508       moretodo = false
509       @components.each do |cname, c|
510         job = nil
511         owner_uuid = @instance[:owner_uuid]
512         # Is the job satisfying this component already known to be
513         # finished? (Already meaning "before we query API server about
514         # the job's current state")
515         c_already_finished = (c[:job] &&
516                               c[:job][:uuid] &&
517                               ["Complete", "Failed", "Cancelled"].include?(c[:job][:state]))
518         if !c[:job] and
519             c[:script_parameters].select { |pname, p| p.is_a? Hash and p[:output_of]}.empty?
520           # No job yet associated with this component and is component inputs
521           # are fully specified (any output_of script_parameters are resolved
522           # to real value)
523           my_submit_id = "instance #{@instance[:uuid]} rand #{rand(2**64).to_s(36)}"
524           job = JobCache.create(@instance, cname, {
525             :script => c[:script],
526             :script_parameters => Hash[c[:script_parameters].map do |key, spec|
527                                          [key, spec[:value]]
528                                        end],
529             :script_version => c[:script_version],
530             :repository => c[:repository],
531             :nondeterministic => c[:nondeterministic],
532             :runtime_constraints => c[:runtime_constraints],
533             :owner_uuid => owner_uuid,
534             :is_locked_by_uuid => (@options[:run_jobs_here] ? owner_uuid : nil),
535             :submit_id => my_submit_id,
536             :state => (if @options[:run_jobs_here] then "Running" else "Queued" end)
537           }, {
538             # This is the right place to put these attributes when
539             # dealing with new API servers.
540             :minimum_script_version => c[:minimum_script_version],
541             :exclude_script_versions => c[:exclude_minimum_script_versions],
542             :find_or_create => (@instance[:properties][:run_options].andand[:enable_job_reuse] &&
543                                 !c[:nondeterministic]),
544             :filters => c[:filters]
545           })
546           if job
547             debuglog "component #{cname} new job #{job[:uuid]}"
548             c[:job] = job
549             c[:run_in_process] = (@options[:run_jobs_here] and
550                                   job[:submit_id] == my_submit_id)
551           else
552             debuglog "component #{cname} new job failed", 0
553             job_creation_failed += 1
554           end
555         end
556
557         if c[:job] and c[:run_in_process] and not ["Complete", "Failed", "Cancelled"].include? c[:job][:state]
558           report_status
559           begin
560             require 'open3'
561             Open3.popen3("arv-crunch-job", "--force-unlock",
562                          "--job", c[:job][:uuid]) do |stdin, stdout, stderr, wait_thr|
563               debuglog "arv-crunch-job pid #{wait_thr.pid} started", 0
564               stdin.close
565               while true
566                 rready, wready, = IO.select([stdout, stderr], [])
567                 break if !rready[0]
568                 begin
569                   buf = rready[0].read_nonblock(2**20)
570                 rescue EOFError
571                   break
572                 end
573                 (rready[0] == stdout ? $stdout : $stderr).write(buf)
574               end
575               stdout.close
576               stderr.close
577               debuglog "arv-crunch-job pid #{wait_thr.pid} exit #{wait_thr.value.to_i}", 0
578             end
579             if not $arv.job.get(uuid: c[:job][:uuid])[:finished_at]
580               raise Exception.new("arv-crunch-job did not set finished_at.")
581             end
582           rescue Exception => e
583             debuglog "Interrupted (#{e}). Failing job.", 0
584             $arv.job.update(uuid: c[:job][:uuid],
585                             job: {
586                               state: "Failed"
587                             })
588           end
589         end
590
591         if c[:job] and c[:job][:uuid]
592           if ["Running", "Queued"].include?(c[:job][:state])
593             # Job is running (or may be soon) so update copy of job record
594             c[:job] = JobCache.get(c[:job][:uuid])
595           end
596
597           if c[:job][:state] == "Complete"
598             # Populate script_parameters of other components waiting for
599             # this job
600             @components.each do |c2name, c2|
601               c2[:script_parameters].each do |pname, p|
602                 if p.is_a? Hash and p[:output_of] == cname.to_s
603                   debuglog "parameter #{c2name}::#{pname} == #{c[:job][:output]}"
604                   c2[:script_parameters][pname] = {value: c[:job][:output]}
605                   moretodo = true
606                 end
607               end
608             end
609             unless c_already_finished
610               # This is my first time discovering that the job
611               # succeeded. (At the top of this loop, I was still
612               # waiting for it to finish.)
613
614               if @instance[:name].andand.length.andand > 0
615                 pipeline_name = @instance[:name]
616               elsif @template.andand[:name].andand.length.andand > 0
617                 pipeline_name = @template[:name]
618               else
619                 pipeline_name = @instance[:uuid]
620               end
621               if c[:output_name] != false
622                 # Create a collection located in the same project as the pipeline with the contents of the output.
623                 portable_data_hash = c[:job][:output]
624                 collections = $arv.collection.list(limit: 1,
625                                                    filters: [['portable_data_hash', '=', portable_data_hash]],
626                                                    select: ["portable_data_hash", "manifest_text"]
627                                                    )[:items]
628                 if collections.any?
629                   name = c[:output_name] || "Output #{portable_data_hash[0..7]} of #{cname} of #{pipeline_name}"
630
631                   # check if there is a name collision.
632                   name_collisions = $arv.collection.list(filters: [["owner_uuid", "=", owner_uuid],
633                                                                    ["name", "=", name]])[:items]
634
635                   newcollection_actual = nil
636                   if name_collisions.any? and name_collisions.first[:portable_data_hash] == portable_data_hash
637                     # There is already a collection with the same name and the
638                     # same contents, so just point to that.
639                     newcollection_actual = name_collisions.first
640                   end
641
642                   if newcollection_actual.nil?
643                     # Did not find a collection with the same name (or the
644                     # collection has a different portable data hash) so create
645                     # a new collection with ensure_unique_name: true.
646                     newcollection = {
647                       owner_uuid: owner_uuid,
648                       name: name,
649                       portable_data_hash: collections.first[:portable_data_hash],
650                       manifest_text: collections.first[:manifest_text]
651                     }
652                     debuglog "Creating collection #{newcollection}", 0
653                     newcollection_actual = $arv.collection.create collection: newcollection, ensure_unique_name: true
654                   end
655
656                   c[:output_uuid] = newcollection_actual[:uuid]
657                 else
658                   debuglog "Could not find a collection with portable data hash #{portable_data_hash}", 0
659                 end
660               end
661             end
662           elsif ["Queued", "Running"].include? c[:job][:state]
663             # Job is running or queued to run, so indicate that pipeline
664             # should continue to run
665             moretodo = true
666           elsif c[:job][:state] == "Cancelled"
667             debuglog "component #{cname} job #{c[:job][:uuid]} cancelled."
668             moretodo = false
669           elsif c[:job][:state] == "Failed"
670             moretodo = false
671           end
672         end
673       end
674       @instance[:components] = @components
675       report_status
676
677       if @options[:no_wait]
678         moretodo = false
679       end
680
681       # If job creation fails, just give up on this pipeline instance.
682       if job_creation_failed > 0
683         moretodo = false
684       end
685
686       if moretodo
687         begin
688           sleep 10
689         rescue Interrupt
690           debuglog "interrupt", 0
691           interrupted = true
692           break
693         end
694       end
695     end
696
697     c_in_state = @components.values.group_by { |c|
698       c[:job] and c[:job][:state]
699     }
700     succeeded = c_in_state["Complete"].andand.count || 0
701     failed = (c_in_state["Failed"].andand.count || 0) + (c_in_state["Cancelled"].andand.count || 0)
702     ended = succeeded + failed
703
704     success = (succeeded == @components.length)
705
706     # A job create call failed. Just give up.
707     if job_creation_failed > 0
708       debuglog "job creation failed - giving up on this pipeline instance", 0
709       success = false
710       failed += 1
711     end
712
713     if interrupted
714      if success
715         @instance[:state] = 'Complete'
716      else
717         @instance[:state] = 'Paused'
718       end
719     else
720       if ended == @components.length or failed > 0
721         @instance[:state] = success ? 'Complete' : 'Failed'
722       end
723     end
724
725     if @instance[:finished_at].nil? and ['Complete', 'Failed'].include? @instance[:state]
726       @instance[:finished_at] = Time.now
727     end
728
729     debuglog "pipeline instance state is #{@instance[:state]}"
730
731     # set components_summary
732     components_summary = {"todo" => @components.length - ended, "done" => succeeded, "failed" => failed}
733     @instance[:components_summary] = components_summary
734
735     @instance.save
736   end
737
738   def cleanup
739     if @instance and @instance[:state] == 'RunningOnClient'
740       @instance[:state] = 'Paused'
741       @instance.save
742     end
743   end
744
745   def uuid
746     @instance[:uuid]
747   end
748
749   protected
750
751   def report_status
752     @instance.save
753
754     if @options[:status_json] != '/dev/null'
755       File.open(@options[:status_json], 'w') do |f|
756         f.puts @components.pretty_inspect
757       end
758     end
759
760     if @options[:status_text] != '/dev/null'
761       File.open(@options[:status_text], 'w') do |f|
762         f.puts ""
763         f.puts "#{Time.now} -- pipeline_instance #{@instance[:uuid]}"
764         namewidth = @components.collect { |cname, c| cname.size }.max
765         @components.each do |cname, c|
766           jstatus = if !c[:job]
767                       "-"
768                     else case c[:job][:state]
769                          when "Running"
770                            "#{c[:job][:tasks_summary].inspect}"
771                          when "Complete"
772                            c[:job][:output]
773                          when "Cancelled"
774                            "cancelled #{c[:job][:cancelled_at]}"
775                          when "Failed"
776                            "failed #{c[:job][:finished_at]}"
777                          when "Queued"
778                            "queued #{c[:job][:created_at]}"
779                          end
780                     end
781           f.puts "#{cname.to_s.ljust namewidth} #{c[:job] ? c[:job][:uuid] : '-'.ljust(27)} #{jstatus}"
782         end
783       end
784     end
785   end
786
787   def abort(msg)
788     if @instance
789       if ["New", "Ready", "RunningOnClient",
790           "RunningOnServer"].include?(@instance[:state])
791         @instance[:state] = "Failed"
792         @instance[:finished_at] = Time.now
793         @instance.save
794       end
795       @instance.log_stderr(msg)
796     end
797     Kernel::abort(msg)
798   end
799 end
800
801 runner = WhRunPipelineInstance.new($options)
802 begin
803   if $options[:template]
804     runner.fetch_template($options[:template])
805   else
806     runner.fetch_instance($options[:instance])
807   end
808   runner.apply_parameters(p.leftovers)
809   runner.setup_instance
810   if $options[:submit]
811     runner.instance.save
812     puts runner.instance[:uuid]
813   else
814     runner.run
815   end
816 rescue Exception => e
817   runner.cleanup
818   raise e
819 end