Merge branch 'master' into 5375-preview-collection-text-files
[arvados.git] / sdk / cli / bin / arv-run-pipeline-instance
index 7ce1fa95146dbfdb57069ea94cf364fb1a999d90..d9e00dc22bf7d69ab91aaf2b9efce1616e1842ec 100755 (executable)
@@ -42,6 +42,8 @@
 # [--status-json path] Print JSON status report to a file or
 #                      fifo. Default: /dev/null
 #
+# [--description text] Description for the pipeline instance.
+#
 # == Parameters
 #
 # [param_name=param_value]
 class WhRunPipelineInstance
 end
 
-$application_version = 1.0
-
 if RUBY_VERSION < '1.9.3' then
   abort <<-EOS
 #{$0.gsub(/^\.\//,'')} requires Ruby version 1.9.3 or higher.
   EOS
 end
 
-$arvados_api_version = ENV['ARVADOS_API_VERSION'] || 'v1'
-$arvados_api_host = ENV['ARVADOS_API_HOST'] or
-  abort "#{$0}: fatal: ARVADOS_API_HOST environment variable not set."
-$arvados_api_token = ENV['ARVADOS_API_TOKEN'] or
-  abort "#{$0}: fatal: ARVADOS_API_TOKEN environment variable not set."
-
 begin
   require 'arvados'
   require 'rubygems'
@@ -93,38 +87,6 @@ def debuglog(message, verbosity=1)
   $stderr.puts "#{File.split($0).last} #{$$}: #{message}" if $debuglevel >= verbosity
 end
 
-module Kernel
-  def suppress_warnings
-    original_verbosity = $VERBOSE
-    $VERBOSE = nil
-    result = yield
-    $VERBOSE = original_verbosity
-    return result
-  end
-end
-
-if $arvados_api_host.match /local/
-  # You probably don't care about SSL certificate checks if you're
-  # testing with a dev server.
-  suppress_warnings { OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE }
-end
-
-class Google::APIClient
-  def discovery_document(api, version)
-    api = api.to_s
-    return @discovery_documents["#{api}:#{version}"] ||=
-      begin
-        response = self.execute!(
-                                 :http_method => :get,
-                                 :uri => self.discovery_uri(api, version),
-                                 :authenticated => false
-                                 )
-        response.body.class == String ? JSON.parse(response.body) : response.body
-      end
-  end
-end
-
-
 # Parse command line options (the kind that control the behavior of
 # this program, that is, not the pipeline component parameters).
 
@@ -168,13 +130,25 @@ p = Trollop::Parser.new do
       :short => :none,
       :type => :string)
   opt(:submit,
-      "Do not try to satisfy any components. Just create a pipeline instance and output its UUID.",
+      "Submit the pipeline instance to the server, and exit. Let the Crunch dispatch service satisfy the components by finding/running jobs.",
+      :short => :none,
+      :type => :boolean)
+  opt(:run_pipeline_here,
+      "Manage the pipeline instance in-process. Submit jobs to Crunch as needed. Do not exit until the pipeline finishes (or fails).",
+      :short => :none,
+      :type => :boolean)
+  opt(:run_jobs_here,
+      "Run jobs in the local terminal session instead of submitting them to Crunch. Implies --run-pipeline-here. Note: this results in a significantly different job execution environment, and some Crunch features are not supported. It can be necessary to modify a pipeline in order to make it run this way.",
       :short => :none,
       :type => :boolean)
   opt(:run_here,
-      "Manage the pipeline in process.",
+      "Synonym for --run-jobs-here.",
       :short => :none,
       :type => :boolean)
+  opt(:description,
+      "Description for the pipeline instance.",
+      :short => :none,
+      :type => :string)
   stop_on [:'--']
 end
 $options = Trollop::with_standard_exception_handling p do
@@ -182,6 +156,9 @@ $options = Trollop::with_standard_exception_handling p do
 end
 $debuglevel = $options[:debug_level] || ($options[:debug] && 1) || 0
 
+$options[:run_jobs_here] ||= $options[:run_here] # old flag name
+$options[:run_pipeline_here] ||= $options[:run_jobs_here] # --run-jobs-here implies --run-pipeline-here
+
 if $options[:instance]
   if $options[:template] or $options[:submit]
     abort "#{$0}: syntax error: --instance cannot be combined with --template or --submit."
@@ -192,35 +169,15 @@ elsif not $options[:template]
   abort
 end
 
-if $options[:run_here] == $options[:submit]
-  abort "#{$0}: syntax error: you must supply either --run-here or --submit."
-end
-
-# Suppress SSL certificate checks if ARVADOS_API_HOST_INSECURE
-
-module Kernel
-  def suppress_warnings
-    original_verbosity = $VERBOSE
-    $VERBOSE = nil
-    result = yield
-    $VERBOSE = original_verbosity
-    return result
-  end
-end
-
-if ENV['ARVADOS_API_HOST_INSECURE']
-  suppress_warnings { OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE }
+if $options[:run_pipeline_here] == $options[:submit]
+  abort "#{$0}: error: you must supply --run-pipeline-here, --run-jobs-here, or --submit."
 end
 
 # Set up the API client.
 
-$client ||= Google::APIClient.
-  new(:host => $arvados_api_host,
-      :application_name => File.split($0).last,
-      :application_version => $application_version.to_s)
-$arvados = $client.discovered_api('arvados', $arvados_api_version)
 $arv = Arvados.new api_version: 'v1'
-
+$client = $arv.client
+$arvados = $arv.arvados_api
 
 class PipelineInstance
   def self.find(uuid)
@@ -230,7 +187,7 @@ class PipelineInstance
                              },
                              :authenticated => false,
                              :headers => {
-                               authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
+                               authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
                              })
     j = JSON.parse result.body, :symbolize_names => true
     unless j.is_a? Hash and j[:uuid]
@@ -248,7 +205,7 @@ class PipelineInstance
                              },
                              :authenticated => false,
                              :headers => {
-                               authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
+                               authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
                              })
     j = JSON.parse result.body, :symbolize_names => true
     unless j.is_a? Hash and j[:uuid]
@@ -267,7 +224,7 @@ class PipelineInstance
                              },
                              :authenticated => false,
                              :headers => {
-                               authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
+                               authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
                              })
     j = JSON.parse result.body, :symbolize_names => true
     unless j.is_a? Hash and j[:uuid]
@@ -311,7 +268,7 @@ class JobCache
                              },
                              :authenticated => false,
                              :headers => {
-                               authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
+                               authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
                              })
     @cache[uuid] = JSON.parse result.body, :symbolize_names => true
   end
@@ -323,7 +280,7 @@ class JobCache
                              },
                              :authenticated => false,
                              :headers => {
-                               authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
+                               authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
                              })
     list = JSON.parse result.body, :symbolize_names => true
     if list and list[:items].is_a? Array
@@ -341,7 +298,7 @@ class JobCache
                              :body_object => body,
                              :authenticated => false,
                              :headers => {
-                               authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
+                               authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
                              })
     j = JSON.parse result.body, :symbolize_names => true
     if j.is_a? Hash and j[:uuid]
@@ -385,7 +342,7 @@ class WhRunPipelineInstance
                                },
                                :authenticated => false,
                                :headers => {
-                                 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
+                                 authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
                                })
       @template = JSON.parse result.body, :symbolize_names => true
       if !@template[:uuid]
@@ -449,12 +406,17 @@ class WhRunPipelineInstance
         if value.nil? and
             ![false,'false',0,'0'].index parameter[:required]
           if parameter[:output_of]
+            if not @components[parameter[:output_of].intern]
+              errors << [componentname, parametername, "output_of refers to nonexistent component '#{parameter[:output_of]}'"]
+            end
             next
           end
           errors << [componentname, parametername, "required parameter is missing"]
         end
         debuglog "parameter #{componentname}::#{parametername} == #{value}"
-        component[:script_parameters][parametername] = value
+
+        component[:script_parameters][parametername] =
+          parameter.dup.merge(value: value)
       end
     end
     if !errors.empty?
@@ -478,6 +440,8 @@ class WhRunPipelineInstance
         end
       end
     else
+      description = $options[:description]
+      description = ("Created at #{Time.now.localtime}" + (@template[:name].andand.size.andand>0 ? " using the pipeline template *#{@template[:name]}*" : "")) if !description
       @instance = PipelineInstance.
         create(components: @components,
                properties: {
@@ -486,6 +450,7 @@ class WhRunPipelineInstance
                  }
                },
                pipeline_template_uuid: @template[:uuid],
+               description: description,
                state: ($options[:submit] ? 'RunningOnServer' : 'RunningOnClient'))
     end
     self
@@ -495,6 +460,10 @@ class WhRunPipelineInstance
     moretodo = true
     interrupted = false
 
+    if @instance[:started_at].nil?
+      @instance[:started_at] = Time.now
+    end
+
     job_creation_failed = 0
     while moretodo
       moretodo = false
@@ -506,20 +475,26 @@ class WhRunPipelineInstance
         # the job's current state")
         c_already_finished = (c[:job] &&
                               c[:job][:uuid] &&
-                              !c[:job][:success].nil?)
+                              ["Complete", "Failed", "Cancelled"].include?(c[:job][:state]))
         if !c[:job] and
             c[:script_parameters].select { |pname, p| p.is_a? Hash and p[:output_of]}.empty?
           # No job is associated with this component yet, and its inputs
           # are fully specified (any output_of script_parameters have been
           # resolved to real values)
+          my_submit_id = "instance #{@instance[:uuid]} rand #{rand(2**64).to_s(36)}"
           job = JobCache.create(@instance, cname, {
             :script => c[:script],
-            :script_parameters => c[:script_parameters],
+            :script_parameters => Hash[c[:script_parameters].map do |key, spec|
+                                         [key, spec[:value]]
+                                       end],
             :script_version => c[:script_version],
             :repository => c[:repository],
             :nondeterministic => c[:nondeterministic],
             :runtime_constraints => c[:runtime_constraints],
             :owner_uuid => owner_uuid,
+            :is_locked_by_uuid => (@options[:run_jobs_here] ? owner_uuid : nil),
+            :submit_id => my_submit_id,
+            :state => (if @options[:run_jobs_here] then "Running" else "Queued" end)
           }, {
             # This is the right place to put these attributes when
             # dealing with new API servers.
@@ -532,27 +507,62 @@ class WhRunPipelineInstance
           if job
             debuglog "component #{cname} new job #{job[:uuid]}"
             c[:job] = job
+            c[:run_in_process] = (@options[:run_jobs_here] and
+                                  job[:submit_id] == my_submit_id)
           else
             debuglog "component #{cname} new job failed", 0
             job_creation_failed += 1
           end
         end
 
+        if c[:job] and c[:run_in_process] and not ["Complete", "Failed", "Cancelled"].include? c[:job][:state]
+          report_status
+          begin
+            require 'open3'
+            Open3.popen3("arv-crunch-job", "--force-unlock",
+                         "--job", c[:job][:uuid]) do |stdin, stdout, stderr, wait_thr|
+              debuglog "arv-crunch-job pid #{wait_thr.pid} started", 0
+              stdin.close
+              while true
+                rready, wready, = IO.select([stdout, stderr], [])
+                break if !rready[0]
+                begin
+                  buf = rready[0].read_nonblock(2**20)
+                rescue EOFError
+                  break
+                end
+                (rready[0] == stdout ? $stdout : $stderr).write(buf)
+              end
+              stdout.close
+              stderr.close
+              debuglog "arv-crunch-job pid #{wait_thr.pid} exit #{wait_thr.value.to_i}", 0
+            end
+            if not $arv.job.get(uuid: c[:job][:uuid])[:finished_at]
+              raise Exception.new("arv-crunch-job did not set finished_at.")
+            end
+          rescue Exception => e
+            debuglog "Interrupted (#{e}). Failing job.", 0
+            $arv.job.update(uuid: c[:job][:uuid],
+                            job: {
+                              state: "Failed"
+                            })
+          end
+        end
+
         if c[:job] and c[:job][:uuid]
-          if (c[:job][:running] or
-              not (c[:job][:finished_at] or c[:job][:cancelled_at]))
-            # Job is running so update copy of job record
+          if ["Running", "Queued"].include?(c[:job][:state])
+            # Job is running (or may be soon) so update copy of job record
             c[:job] = JobCache.get(c[:job][:uuid])
           end
 
-          if c[:job][:success]
+          if c[:job][:state] == "Complete"
             # Populate script_parameters of other components waiting for
             # this job
             @components.each do |c2name, c2|
               c2[:script_parameters].each do |pname, p|
                 if p.is_a? Hash and p[:output_of] == cname.to_s
                   debuglog "parameter #{c2name}::#{pname} == #{c[:job][:output]}"
-                  c2[:script_parameters][pname] = c[:job][:output]
+                  c2[:script_parameters][pname] = {value: c[:job][:output]}
                   moretodo = true
                 end
               end
@@ -562,12 +572,12 @@ class WhRunPipelineInstance
               # succeeded. (At the top of this loop, I was still
               # waiting for it to finish.)
 
-              debuglog "names: #{@instance[:name]} #{@template[:name]}", 0
-              if (not @instance[:name].nil?) and (not @instance[:name].empty?)
+              if @instance[:name].andand.length.andand > 0
                 pipeline_name = @instance[:name]
-              else
-                fetch_template(@instance[:pipeline_template_uuid])
+              elsif @template.andand[:name].andand.length.andand > 0
                 pipeline_name = @template[:name]
+              else
+                pipeline_name = @instance[:uuid]
               end
               if c[:output_name] != false
                 # Create a collection located in the same project as the pipeline with the contents of the output.
@@ -598,7 +608,7 @@ class WhRunPipelineInstance
                       owner_uuid: owner_uuid,
                       name: name,
                       portable_data_hash: collections.first[:portable_data_hash],
-                      manifest_text: collections.first[:manifest_text]                      
+                      manifest_text: collections.first[:manifest_text]
                     }
                     debuglog "Creating collection #{newcollection}", 0
                     newcollection_actual = $arv.collection.create collection: newcollection, ensure_unique_name: true
@@ -610,12 +620,15 @@ class WhRunPipelineInstance
                 end
               end
             end
-          elsif c[:job][:running] ||
-              (!c[:job][:started_at] && !c[:job][:cancelled_at])
-            # Job is still running
+          elsif ["Queued", "Running"].include? c[:job][:state]
+            # Job is running or queued to run, so indicate that pipeline
+            # should continue to run
             moretodo = true
-          elsif c[:job][:cancelled_at]
+          elsif c[:job][:state] == "Cancelled"
             debuglog "component #{cname} job #{c[:job][:uuid]} cancelled."
+            moretodo = false
+          elsif c[:job][:state] == "Failed"
+            moretodo = false
           end
         end
       end
@@ -642,21 +655,12 @@ class WhRunPipelineInstance
       end
     end
 
-    ended = 0
-    succeeded = 0
-    failed = 0
-    @components.each do |cname, c|
-      if c[:job]
-        if c[:job][:finished_at] or c[:job][:cancelled_at] or (c[:job][:running] == false and c[:job][:success] == false)
-          ended += 1
-          if c[:job][:success] == true
-            succeeded += 1
-          elsif c[:job][:success] == false or c[:job][:cancelled_at]
-            failed += 1
-          end
-        end
-      end
-    end
+    c_in_state = @components.values.group_by { |c|
+      c[:job] and c[:job][:state]
+    }
+    succeeded = c_in_state["Complete"].andand.count || 0
+    failed = (c_in_state["Failed"].andand.count || 0) + (c_in_state["Cancelled"].andand.count || 0)
+    ended = succeeded + failed
 
     success = (succeeded == @components.length)
 
@@ -679,6 +683,10 @@ class WhRunPipelineInstance
       end
     end
 
+    if @instance[:finished_at].nil? and ['Complete', 'Failed'].include? @instance[:state]
+      @instance[:finished_at] = Time.now
+    end
+
     debuglog "pipeline instance state is #{@instance[:state]}"
 
     # set components_summary
@@ -718,18 +726,18 @@ class WhRunPipelineInstance
         @components.each do |cname, c|
           jstatus = if !c[:job]
                       "-"
-                    elsif c[:job][:running]
-                      "#{c[:job][:tasks_summary].inspect}"
-                    elsif c[:job][:success]
-                      c[:job][:output]
-                    elsif c[:job][:cancelled_at]
-                      "cancelled #{c[:job][:cancelled_at]}"
-                    elsif c[:job][:finished_at]
-                      "failed #{c[:job][:finished_at]}"
-                    elsif c[:job][:started_at]
-                      "started #{c[:job][:started_at]}"
-                    else
-                      "queued #{c[:job][:created_at]}"
+                    else case c[:job][:state]
+                         when "Running"
+                           "#{c[:job][:tasks_summary].inspect}"
+                         when "Complete"
+                           c[:job][:output]
+                         when "Cancelled"
+                           "cancelled #{c[:job][:cancelled_at]}"
+                         when "Failed"
+                           "failed #{c[:job][:finished_at]}"
+                         when "Queued"
+                           "queued #{c[:job][:created_at]}"
+                         end
                     end
           f.puts "#{cname.to_s.ljust namewidth} #{c[:job] ? c[:job][:uuid] : '-'.ljust(27)} #{jstatus}"
         end
@@ -742,6 +750,7 @@ class WhRunPipelineInstance
       if ["New", "Ready", "RunningOnClient",
           "RunningOnServer"].include?(@instance[:state])
         @instance[:state] = "Failed"
+        @instance[:finished_at] = Time.now
         @instance.save
       end
       @instance.log_stderr(msg)