3826: Merge branch 'master' into 3826-crunchstat-netstats
[arvados.git] / sdk / cli / bin / arv-run-pipeline-instance
index c6fd61f9e3c90fe4836b9918635db9bdde984a1e..63313fc8082a18b7c637a222d92cabafc76e4ab3 100755 (executable)
@@ -61,8 +61,6 @@
 class WhRunPipelineInstance
 end
 
-$application_version = 1.0
-
 if RUBY_VERSION < '1.9.3' then
   abort <<-EOS
 #{$0.gsub(/^\.\//,'')} requires Ruby version 1.9.3 or higher.
@@ -111,21 +109,6 @@ if $arvados_api_host.match /local/
   suppress_warnings { OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE }
 end
 
-class Google::APIClient
-  def discovery_document(api, version)
-    api = api.to_s
-    return @discovery_documents["#{api}:#{version}"] ||=
-      begin
-        response = self.execute!(
-                                 :http_method => :get,
-                                 :uri => self.discovery_uri(api, version),
-                                 :authenticated => false
-                                 )
-        response.body.class == String ? JSON.parse(response.body) : response.body
-      end
-  end
-end
-
 
 # Parse command line options (the kind that control the behavior of
 # this program, that is, not the pipeline component parameters).
@@ -170,11 +153,19 @@ p = Trollop::Parser.new do
       :short => :none,
       :type => :string)
   opt(:submit,
-      "Do not try to satisfy any components. Just create a pipeline instance and output its UUID.",
+      "Submit the pipeline instance to the server, and exit. Let the Crunch dispatch service satisfy the components by finding/running jobs.",
+      :short => :none,
+      :type => :boolean)
+  opt(:run_pipeline_here,
+      "Manage the pipeline instance in-process. Submit jobs to Crunch as needed. Do not exit until the pipeline finishes (or fails).",
+      :short => :none,
+      :type => :boolean)
+  opt(:run_jobs_here,
+      "Run jobs in the local terminal session instead of submitting them to Crunch. Implies --run-pipeline-here. Note: this results in a significantly different job execution environment, and some Crunch features are not supported. It can be necessary to modify a pipeline in order to make it run this way.",
       :short => :none,
       :type => :boolean)
   opt(:run_here,
-      "Manage the pipeline in process.",
+      "Synonym for --run-jobs-here.",
       :short => :none,
       :type => :boolean)
   opt(:description,
@@ -188,6 +179,9 @@ $options = Trollop::with_standard_exception_handling p do
 end
 $debuglevel = $options[:debug_level] || ($options[:debug] && 1) || 0
 
+$options[:run_jobs_here] ||= $options[:run_here] # old flag name
+$options[:run_pipeline_here] ||= $options[:run_jobs_here] # B requires A
+
 if $options[:instance]
   if $options[:template] or $options[:submit]
     abort "#{$0}: syntax error: --instance cannot be combined with --template or --submit."
@@ -198,8 +192,8 @@ elsif not $options[:template]
   abort
 end
 
-if $options[:run_here] == $options[:submit]
-  abort "#{$0}: syntax error: you must supply either --run-here or --submit."
+if $options[:run_pipeline_here] == $options[:submit]
+  abort "#{$0}: error: you must supply --run-pipeline-here, --run-jobs-here, or --submit."
 end
 
 # Suppress SSL certificate checks if ARVADOS_API_HOST_INSECURE
@@ -220,13 +214,9 @@ end
 
 # Set up the API client.
 
-$client ||= Google::APIClient.
-  new(:host => $arvados_api_host,
-      :application_name => File.split($0).last,
-      :application_version => $application_version.to_s)
-$arvados = $client.discovered_api('arvados', $arvados_api_version)
 $arv = Arvados.new api_version: 'v1'
-
+$client = $arv.client
+$arvados = $arv.arvados_api
 
 class PipelineInstance
   def self.find(uuid)
@@ -455,12 +445,17 @@ class WhRunPipelineInstance
         if value.nil? and
             ![false,'false',0,'0'].index parameter[:required]
           if parameter[:output_of]
+            if not @components[parameter[:output_of].intern]
+              errors << [componentname, parametername, "output_of refers to nonexistent component '#{parameter[:output_of]}'"]
+            end
             next
           end
           errors << [componentname, parametername, "required parameter is missing"]
         end
         debuglog "parameter #{componentname}::#{parametername} == #{value}"
-        component[:script_parameters][parametername] = value
+
+        component[:script_parameters][parametername] =
+          parameter.dup.merge(value: value)
       end
     end
     if !errors.empty?
@@ -485,7 +480,7 @@ class WhRunPipelineInstance
       end
     else
       description = $options[:description]
-      description = ("Created at #{Time.now.localtime}" + (@template[:name].andand.size>0 ? " using the pipeline template *#{@template[:name]}*" : "")) if !description
+      description = ("Created at #{Time.now.localtime}" + (@template[:name].andand.size.andand>0 ? " using the pipeline template *#{@template[:name]}*" : "")) if !description
       @instance = PipelineInstance.
         create(components: @components,
                properties: {
@@ -519,20 +514,26 @@ class WhRunPipelineInstance
         # the job's current state")
         c_already_finished = (c[:job] &&
                               c[:job][:uuid] &&
-                              !c[:job][:success].nil?)
+                              ["Complete", "Failed", "Cancelled"].include?(c[:job][:state]))
         if !c[:job] and
             c[:script_parameters].select { |pname, p| p.is_a? Hash and p[:output_of]}.empty?
           # No job yet associated with this component and its component inputs
           # are fully specified (any output_of script_parameters are resolved
           # to real values)
+          my_submit_id = "instance #{@instance[:uuid]} rand #{rand(2**64).to_s(36)}"
           job = JobCache.create(@instance, cname, {
             :script => c[:script],
-            :script_parameters => c[:script_parameters],
+            :script_parameters => Hash[c[:script_parameters].map do |key, spec|
+                                         [key, spec[:value]]
+                                       end],
             :script_version => c[:script_version],
             :repository => c[:repository],
             :nondeterministic => c[:nondeterministic],
             :runtime_constraints => c[:runtime_constraints],
             :owner_uuid => owner_uuid,
+            :is_locked_by_uuid => (@options[:run_jobs_here] ? owner_uuid : nil),
+            :submit_id => my_submit_id,
+            :state => (if @options[:run_jobs_here] then "Running" else "Queued" end)
           }, {
             # This is the right place to put these attributes when
             # dealing with new API servers.
@@ -545,27 +546,62 @@ class WhRunPipelineInstance
           if job
             debuglog "component #{cname} new job #{job[:uuid]}"
             c[:job] = job
+            c[:run_in_process] = (@options[:run_jobs_here] and
+                                  job[:submit_id] == my_submit_id)
           else
             debuglog "component #{cname} new job failed", 0
             job_creation_failed += 1
           end
         end
 
+        if c[:job] and c[:run_in_process] and not ["Complete", "Failed", "Cancelled"].include? c[:job][:state]
+          report_status
+          begin
+            require 'open3'
+            Open3.popen3("arv-crunch-job", "--force-unlock",
+                         "--job", c[:job][:uuid]) do |stdin, stdout, stderr, wait_thr|
+              debuglog "arv-crunch-job pid #{wait_thr.pid} started", 0
+              stdin.close
+              while true
+                rready, wready, = IO.select([stdout, stderr], [])
+                break if !rready[0]
+                begin
+                  buf = rready[0].read_nonblock(2**20)
+                rescue EOFError
+                  break
+                end
+                (rready[0] == stdout ? $stdout : $stderr).write(buf)
+              end
+              stdout.close
+              stderr.close
+              debuglog "arv-crunch-job pid #{wait_thr.pid} exit #{wait_thr.value.to_i}", 0
+            end
+            if not $arv.job.get(uuid: c[:job][:uuid])[:finished_at]
+              raise Exception.new("arv-crunch-job did not set finished_at.")
+            end
+          rescue Exception => e
+            debuglog "Interrupted (#{e}). Failing job.", 0
+            $arv.job.update(uuid: c[:job][:uuid],
+                            job: {
+                              state: "Failed"
+                            })
+          end
+        end
+
         if c[:job] and c[:job][:uuid]
-          if (c[:job][:running] or
-              not (c[:job][:finished_at] or c[:job][:cancelled_at]))
-            # Job is running so update copy of job record
+          if ["Running", "Queued"].include?(c[:job][:state])
+            # Job is running (or may be soon) so update copy of job record
             c[:job] = JobCache.get(c[:job][:uuid])
           end
 
-          if c[:job][:success]
+          if c[:job][:state] == "Complete"
             # Populate script_parameters of other components waiting for
             # this job
             @components.each do |c2name, c2|
               c2[:script_parameters].each do |pname, p|
                 if p.is_a? Hash and p[:output_of] == cname.to_s
                   debuglog "parameter #{c2name}::#{pname} == #{c[:job][:output]}"
-                  c2[:script_parameters][pname] = c[:job][:output]
+                  c2[:script_parameters][pname] = {value: c[:job][:output]}
                   moretodo = true
                 end
               end
@@ -575,14 +611,12 @@ class WhRunPipelineInstance
               # succeeded. (At the top of this loop, I was still
               # waiting for it to finish.)
 
-              debuglog "names: #{@instance[:name]} #{@template[:name]}", 0
-              if (not @instance[:name].nil?) and (not @instance[:name].empty?)
+              if @instance[:name].andand.length.andand > 0
                 pipeline_name = @instance[:name]
-              elsif @instance[:pipeline_template_uuid]
-                fetch_template(@instance[:pipeline_template_uuid])
+              elsif @template.andand[:name].andand.length.andand > 0
                 pipeline_name = @template[:name]
               else
-                pipeline_name = "pipeline started #{@instance[:started_at]}"
+                pipeline_name = @instance[:uuid]
               end
               if c[:output_name] != false
                 # Create a collection located in the same project as the pipeline with the contents of the output.
@@ -625,12 +659,15 @@ class WhRunPipelineInstance
                 end
               end
             end
-          elsif c[:job][:running] ||
-              (!c[:job][:started_at] && !c[:job][:cancelled_at])
-            # Job is still running
+          elsif ["Queued", "Running"].include? c[:job][:state]
+            # Job is running or queued to run, so indicate that pipeline
+            # should continue to run
             moretodo = true
-          elsif c[:job][:cancelled_at]
+          elsif c[:job][:state] == "Cancelled"
             debuglog "component #{cname} job #{c[:job][:uuid]} cancelled."
+            moretodo = false
+          elsif c[:job][:state] == "Failed"
+            moretodo = false
           end
         end
       end
@@ -657,21 +694,12 @@ class WhRunPipelineInstance
       end
     end
 
-    ended = 0
-    succeeded = 0
-    failed = 0
-    @components.each do |cname, c|
-      if c[:job]
-        if c[:job][:finished_at] or c[:job][:cancelled_at] or (c[:job][:running] == false and c[:job][:success] == false)
-          ended += 1
-          if c[:job][:success] == true
-            succeeded += 1
-          elsif c[:job][:success] == false or c[:job][:cancelled_at]
-            failed += 1
-          end
-        end
-      end
-    end
+    c_in_state = @components.values.group_by { |c|
+      c[:job] and c[:job][:state]
+    }
+    succeeded = c_in_state["Complete"].andand.count || 0
+    failed = (c_in_state["Failed"].andand.count || 0) + (c_in_state["Cancelled"].andand.count || 0)
+    ended = succeeded + failed
 
     success = (succeeded == @components.length)
 
@@ -737,18 +765,18 @@ class WhRunPipelineInstance
         @components.each do |cname, c|
           jstatus = if !c[:job]
                       "-"
-                    elsif c[:job][:running]
-                      "#{c[:job][:tasks_summary].inspect}"
-                    elsif c[:job][:success]
-                      c[:job][:output]
-                    elsif c[:job][:cancelled_at]
-                      "cancelled #{c[:job][:cancelled_at]}"
-                    elsif c[:job][:finished_at]
-                      "failed #{c[:job][:finished_at]}"
-                    elsif c[:job][:started_at]
-                      "started #{c[:job][:started_at]}"
-                    else
-                      "queued #{c[:job][:created_at]}"
+                    else case c[:job][:state]
+                         when "Running"
+                           "#{c[:job][:tasks_summary].inspect}"
+                         when "Complete"
+                           c[:job][:output]
+                         when "Cancelled"
+                           "cancelled #{c[:job][:cancelled_at]}"
+                         when "Failed"
+                           "failed #{c[:job][:finished_at]}"
+                         when "Queued"
+                           "queued #{c[:job][:created_at]}"
+                         end
                     end
           f.puts "#{cname.to_s.ljust namewidth} #{c[:job] ? c[:job][:uuid] : '-'.ljust(27)} #{jstatus}"
         end