3550: Merge branch 'master' into 3550-local-pipeline
[arvados.git] / sdk / cli / bin / arv-run-pipeline-instance
index 35b2cd59428c67239d78fbcf70d75c378c071889..472c20bd73a283feb3149c44b6b3f534d3ed50bb 100755 (executable)
@@ -61,8 +61,6 @@
 class WhRunPipelineInstance
 end
 
-$application_version = 1.0
-
 if RUBY_VERSION < '1.9.3' then
   abort <<-EOS
 #{$0.gsub(/^\.\//,'')} requires Ruby version 1.9.3 or higher.
@@ -111,21 +109,6 @@ if $arvados_api_host.match /local/
   suppress_warnings { OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE }
 end
 
-class Google::APIClient
-  def discovery_document(api, version)
-    api = api.to_s
-    return @discovery_documents["#{api}:#{version}"] ||=
-      begin
-        response = self.execute!(
-                                 :http_method => :get,
-                                 :uri => self.discovery_uri(api, version),
-                                 :authenticated => false
-                                 )
-        response.body.class == String ? JSON.parse(response.body) : response.body
-      end
-  end
-end
-
 
 # Parse command line options (the kind that control the behavior of
 # this program, that is, not the pipeline component parameters).
@@ -170,11 +153,19 @@ p = Trollop::Parser.new do
       :short => :none,
       :type => :string)
   opt(:submit,
-      "Do not try to satisfy any components. Just create a pipeline instance and output its UUID.",
+      "Submit the pipeline instance to the server, and exit. Let the Crunch dispatch service satisfy the components by finding/running jobs.",
+      :short => :none,
+      :type => :boolean)
+  opt(:run_pipeline_here,
+      "Manage the pipeline instance in-process. Submit jobs to Crunch as needed. Do not exit until the pipeline finishes (or fails).",
+      :short => :none,
+      :type => :boolean)
+  opt(:run_jobs_here,
+      "Run jobs in the local terminal session instead of submitting them to Crunch. Implies --run-pipeline-here. Note: this results in a significantly different job execution environment, and some Crunch features are not supported. It can be necessary to modify a pipeline in order to make it run this way.",
       :short => :none,
       :type => :boolean)
   opt(:run_here,
-      "Manage the pipeline in process.",
+      "Synonym for --run-jobs-here.",
       :short => :none,
       :type => :boolean)
   opt(:description,
@@ -188,6 +179,9 @@ $options = Trollop::with_standard_exception_handling p do
 end
 $debuglevel = $options[:debug_level] || ($options[:debug] && 1) || 0
 
+$options[:run_jobs_here] ||= $options[:run_here] # old flag name
+$options[:run_pipeline_here] ||= $options[:run_jobs_here] # B requires A
+
 if $options[:instance]
   if $options[:template] or $options[:submit]
     abort "#{$0}: syntax error: --instance cannot be combined with --template or --submit."
@@ -198,8 +192,8 @@ elsif not $options[:template]
   abort
 end
 
-if $options[:run_here] == $options[:submit]
-  abort "#{$0}: syntax error: you must supply either --run-here or --submit."
+if $options[:run_pipeline_here] == $options[:submit]
+  abort "#{$0}: error: you must supply --run-pipeline-here, --run-jobs-here, or --submit."
 end
 
 # Suppress SSL certificate checks if ARVADOS_API_HOST_INSECURE
@@ -220,13 +214,9 @@ end
 
 # Set up the API client.
 
-$client ||= Google::APIClient.
-  new(:host => $arvados_api_host,
-      :application_name => File.split($0).last,
-      :application_version => $application_version.to_s)
-$arvados = $client.discovered_api('arvados', $arvados_api_version)
 $arv = Arvados.new api_version: 'v1'
-
+$client = $arv.client
+$arvados = $arv.arvados_api
 
 class PipelineInstance
   def self.find(uuid)
@@ -485,7 +475,7 @@ class WhRunPipelineInstance
       end
     else
       description = $options[:description]
-      description = ("Created at #{Time.now.localtime}" + (@template[:name].andand.size>0 ? " using the pipeline template *#{@template[:name]}*" : "")) if !description
+      description = ("Created at #{Time.now.localtime}" + (@template[:name].andand.size.andand>0 ? " using the pipeline template *#{@template[:name]}*" : "")) if !description
       @instance = PipelineInstance.
         create(components: @components,
                properties: {
@@ -525,6 +515,7 @@ class WhRunPipelineInstance
           # No job yet associated with this component and is component inputs
           # are fully specified (any output_of script_parameters are resolved
           # to real value)
+          my_submit_id = "instance #{@instance[:uuid]} rand #{rand(2**64).to_s(36)}"
           job = JobCache.create(@instance, cname, {
             :script => c[:script],
             :script_parameters => c[:script_parameters],
@@ -533,6 +524,8 @@ class WhRunPipelineInstance
             :nondeterministic => c[:nondeterministic],
             :runtime_constraints => c[:runtime_constraints],
             :owner_uuid => owner_uuid,
+            :is_locked_by_uuid => (@options[:run_jobs_here] ? owner_uuid : nil),
+            :submit_id => my_submit_id,
           }, {
             # This is the right place to put these attributes when
             # dealing with new API servers.
@@ -545,12 +538,50 @@ class WhRunPipelineInstance
           if job
             debuglog "component #{cname} new job #{job[:uuid]}"
             c[:job] = job
+            c[:run_in_process] = (@options[:run_jobs_here] and
+                                  job[:submit_id] == my_submit_id)
           else
             debuglog "component #{cname} new job failed", 0
             job_creation_failed += 1
           end
         end
 
+        if c[:job] and c[:run_in_process] and c[:job][:success].nil?
+          report_status
+          begin
+            require 'open3'
+            Open3.popen3("arv-crunch-job", "--force-unlock",
+                         "--job", c[:job][:uuid]) do |stdin, stdout, stderr, wait_thr|
+              debuglog "arv-crunch-job pid #{wait_thr.pid} started", 0
+              stdin.close
+              while true
+                rready, wready, = IO.select([stdout, stderr], [])
+                break if !rready[0]
+                begin
+                  buf = rready[0].read_nonblock(2**20)
+                rescue EOFError
+                  break
+                end
+                (rready[0] == stdout ? $stdout : $stderr).write(buf)
+              end
+              stdout.close
+              stderr.close
+              debuglog "arv-crunch-job pid #{wait_thr.pid} exit #{wait_thr.value.to_i}", 0
+            end
+            if not $arv.job.get(uuid: c[:job][:uuid])[:finished_at]
+              raise Exception.new("arv-crunch-job did not set finished_at.")
+            end
+          rescue Exception => e
+            debuglog "Interrupted (#{e}). Failing job.", 0
+            $arv.job.update(uuid: c[:job][:uuid],
+                            job: {
+                              finished_at: Time.now,
+                              running: false,
+                              success: false
+                            })
+          end
+        end
+
         if c[:job] and c[:job][:uuid]
           if (c[:job][:running] or
               not (c[:job][:finished_at] or c[:job][:cancelled_at]))
@@ -575,12 +606,12 @@ class WhRunPipelineInstance
               # succeeded. (At the top of this loop, I was still
               # waiting for it to finish.)
 
-              debuglog "names: #{@instance[:name]} #{@template[:name]}", 0
-              if (not @instance[:name].nil?) and (not @instance[:name].empty?)
+              if @instance[:name].andand.length.andand > 0
                 pipeline_name = @instance[:name]
-              else
-                fetch_template(@instance[:pipeline_template_uuid])
+              elsif @template.andand[:name].andand.length.andand > 0
                 pipeline_name = @template[:name]
+              else
+                pipeline_name = @instance[:uuid]
               end
               if c[:output_name] != false
                 # Create a collection located in the same project as the pipeline with the contents of the output.
@@ -745,6 +776,8 @@ class WhRunPipelineInstance
                       "failed #{c[:job][:finished_at]}"
                     elsif c[:job][:started_at]
                       "started #{c[:job][:started_at]}"
+                    elsif c[:job][:is_locked_by_uuid]
+                      "starting #{c[:job][:started_at]}"
                     else
                       "queued #{c[:job][:created_at]}"
                     end