Merge branch 'master' into 5375-preview-collection-text-files
[arvados.git] / sdk / cli / bin / arv-run-pipeline-instance
index 472c20bd73a283feb3149c44b6b3f534d3ed50bb..d9e00dc22bf7d69ab91aaf2b9efce1616e1842ec 100755 (executable)
@@ -67,12 +67,6 @@ if RUBY_VERSION < '1.9.3' then
   EOS
 end
 
-$arvados_api_version = ENV['ARVADOS_API_VERSION'] || 'v1'
-$arvados_api_host = ENV['ARVADOS_API_HOST'] or
-  abort "#{$0}: fatal: ARVADOS_API_HOST environment variable not set."
-$arvados_api_token = ENV['ARVADOS_API_TOKEN'] or
-  abort "#{$0}: fatal: ARVADOS_API_TOKEN environment variable not set."
-
 begin
   require 'arvados'
   require 'rubygems'
@@ -93,23 +87,6 @@ def debuglog(message, verbosity=1)
   $stderr.puts "#{File.split($0).last} #{$$}: #{message}" if $debuglevel >= verbosity
 end
 
-module Kernel
-  def suppress_warnings
-    original_verbosity = $VERBOSE
-    $VERBOSE = nil
-    result = yield
-    $VERBOSE = original_verbosity
-    return result
-  end
-end
-
-if $arvados_api_host.match /local/
-  # You probably don't care about SSL certificate checks if you're
-  # testing with a dev server.
-  suppress_warnings { OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE }
-end
-
-
 # Parse command line options (the kind that control the behavior of
 # this program, that is, not the pipeline component parameters).
 
@@ -196,22 +173,6 @@ if $options[:run_pipeline_here] == $options[:submit]
   abort "#{$0}: error: you must supply --run-pipeline-here, --run-jobs-here, or --submit."
 end
 
-# Suppress SSL certificate checks if ARVADOS_API_HOST_INSECURE
-
-module Kernel
-  def suppress_warnings
-    original_verbosity = $VERBOSE
-    $VERBOSE = nil
-    result = yield
-    $VERBOSE = original_verbosity
-    return result
-  end
-end
-
-if ENV['ARVADOS_API_HOST_INSECURE']
-  suppress_warnings { OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE }
-end
-
 # Set up the API client.
 
 $arv = Arvados.new api_version: 'v1'
@@ -226,7 +187,7 @@ class PipelineInstance
                              },
                              :authenticated => false,
                              :headers => {
-                               authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
+                               authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
                              })
     j = JSON.parse result.body, :symbolize_names => true
     unless j.is_a? Hash and j[:uuid]
@@ -244,7 +205,7 @@ class PipelineInstance
                              },
                              :authenticated => false,
                              :headers => {
-                               authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
+                               authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
                              })
     j = JSON.parse result.body, :symbolize_names => true
     unless j.is_a? Hash and j[:uuid]
@@ -263,7 +224,7 @@ class PipelineInstance
                              },
                              :authenticated => false,
                              :headers => {
-                               authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
+                               authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
                              })
     j = JSON.parse result.body, :symbolize_names => true
     unless j.is_a? Hash and j[:uuid]
@@ -307,7 +268,7 @@ class JobCache
                              },
                              :authenticated => false,
                              :headers => {
-                               authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
+                               authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
                              })
     @cache[uuid] = JSON.parse result.body, :symbolize_names => true
   end
@@ -319,7 +280,7 @@ class JobCache
                              },
                              :authenticated => false,
                              :headers => {
-                               authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
+                               authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
                              })
     list = JSON.parse result.body, :symbolize_names => true
     if list and list[:items].is_a? Array
@@ -337,7 +298,7 @@ class JobCache
                              :body_object => body,
                              :authenticated => false,
                              :headers => {
-                               authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
+                               authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
                              })
     j = JSON.parse result.body, :symbolize_names => true
     if j.is_a? Hash and j[:uuid]
@@ -381,7 +342,7 @@ class WhRunPipelineInstance
                                },
                                :authenticated => false,
                                :headers => {
-                                 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
+                                 authorization: 'OAuth2 '+$arv.config['ARVADOS_API_TOKEN']
                                })
       @template = JSON.parse result.body, :symbolize_names => true
       if !@template[:uuid]
@@ -445,12 +406,17 @@ class WhRunPipelineInstance
         if value.nil? and
             ![false,'false',0,'0'].index parameter[:required]
           if parameter[:output_of]
+            if not @components[parameter[:output_of].intern]
+              errors << [componentname, parametername, "output_of refers to nonexistent component '#{parameter[:output_of]}'"]
+            end
             next
           end
           errors << [componentname, parametername, "required parameter is missing"]
         end
         debuglog "parameter #{componentname}::#{parametername} == #{value}"
-        component[:script_parameters][parametername] = value
+
+        component[:script_parameters][parametername] =
+          parameter.dup.merge(value: value)
       end
     end
     if !errors.empty?
@@ -509,7 +475,7 @@ class WhRunPipelineInstance
         # the job's current state")
         c_already_finished = (c[:job] &&
                               c[:job][:uuid] &&
-                              !c[:job][:success].nil?)
+                              ["Complete", "Failed", "Cancelled"].include?(c[:job][:state]))
         if !c[:job] and
             c[:script_parameters].select { |pname, p| p.is_a? Hash and p[:output_of]}.empty?
           # No job yet associated with this component and is component inputs
@@ -518,7 +484,9 @@ class WhRunPipelineInstance
           my_submit_id = "instance #{@instance[:uuid]} rand #{rand(2**64).to_s(36)}"
           job = JobCache.create(@instance, cname, {
             :script => c[:script],
-            :script_parameters => c[:script_parameters],
+            :script_parameters => Hash[c[:script_parameters].map do |key, spec|
+                                         [key, spec[:value]]
+                                       end],
             :script_version => c[:script_version],
             :repository => c[:repository],
             :nondeterministic => c[:nondeterministic],
@@ -526,6 +494,7 @@ class WhRunPipelineInstance
             :owner_uuid => owner_uuid,
             :is_locked_by_uuid => (@options[:run_jobs_here] ? owner_uuid : nil),
             :submit_id => my_submit_id,
+            :state => (if @options[:run_jobs_here] then "Running" else "Queued" end)
           }, {
             # This is the right place to put these attributes when
             # dealing with new API servers.
@@ -546,7 +515,7 @@ class WhRunPipelineInstance
           end
         end
 
-        if c[:job] and c[:run_in_process] and c[:job][:success].nil?
+        if c[:job] and c[:run_in_process] and not ["Complete", "Failed", "Cancelled"].include? c[:job][:state]
           report_status
           begin
             require 'open3'
@@ -575,28 +544,25 @@ class WhRunPipelineInstance
             debuglog "Interrupted (#{e}). Failing job.", 0
             $arv.job.update(uuid: c[:job][:uuid],
                             job: {
-                              finished_at: Time.now,
-                              running: false,
-                              success: false
+                              state: "Failed"
                             })
           end
         end
 
         if c[:job] and c[:job][:uuid]
-          if (c[:job][:running] or
-              not (c[:job][:finished_at] or c[:job][:cancelled_at]))
-            # Job is running so update copy of job record
+          if ["Running", "Queued"].include?(c[:job][:state])
+            # Job is running (or may be soon) so update copy of job record
             c[:job] = JobCache.get(c[:job][:uuid])
           end
 
-          if c[:job][:success]
+          if c[:job][:state] == "Complete"
             # Populate script_parameters of other components waiting for
             # this job
             @components.each do |c2name, c2|
               c2[:script_parameters].each do |pname, p|
                 if p.is_a? Hash and p[:output_of] == cname.to_s
                   debuglog "parameter #{c2name}::#{pname} == #{c[:job][:output]}"
-                  c2[:script_parameters][pname] = c[:job][:output]
+                  c2[:script_parameters][pname] = {value: c[:job][:output]}
                   moretodo = true
                 end
               end
@@ -654,12 +620,15 @@ class WhRunPipelineInstance
                 end
               end
             end
-          elsif c[:job][:running] ||
-              (!c[:job][:started_at] && !c[:job][:cancelled_at])
-            # Job is still running
+          elsif ["Queued", "Running"].include? c[:job][:state]
+            # Job is running or queued to run, so indicate that pipeline
+            # should continue to run
             moretodo = true
-          elsif c[:job][:cancelled_at]
+          elsif c[:job][:state] == "Cancelled"
             debuglog "component #{cname} job #{c[:job][:uuid]} cancelled."
+            moretodo = false
+          elsif c[:job][:state] == "Failed"
+            moretodo = false
           end
         end
       end
@@ -686,21 +655,12 @@ class WhRunPipelineInstance
       end
     end
 
-    ended = 0
-    succeeded = 0
-    failed = 0
-    @components.each do |cname, c|
-      if c[:job]
-        if c[:job][:finished_at] or c[:job][:cancelled_at] or (c[:job][:running] == false and c[:job][:success] == false)
-          ended += 1
-          if c[:job][:success] == true
-            succeeded += 1
-          elsif c[:job][:success] == false or c[:job][:cancelled_at]
-            failed += 1
-          end
-        end
-      end
-    end
+    c_in_state = @components.values.group_by { |c|
+      c[:job] and c[:job][:state]
+    }
+    succeeded = c_in_state["Complete"].andand.count || 0
+    failed = (c_in_state["Failed"].andand.count || 0) + (c_in_state["Cancelled"].andand.count || 0)
+    ended = succeeded + failed
 
     success = (succeeded == @components.length)
 
@@ -766,20 +726,18 @@ class WhRunPipelineInstance
         @components.each do |cname, c|
           jstatus = if !c[:job]
                       "-"
-                    elsif c[:job][:running]
-                      "#{c[:job][:tasks_summary].inspect}"
-                    elsif c[:job][:success]
-                      c[:job][:output]
-                    elsif c[:job][:cancelled_at]
-                      "cancelled #{c[:job][:cancelled_at]}"
-                    elsif c[:job][:finished_at]
-                      "failed #{c[:job][:finished_at]}"
-                    elsif c[:job][:started_at]
-                      "started #{c[:job][:started_at]}"
-                    elsif c[:job][:is_locked_by_uuid]
-                      "starting #{c[:job][:started_at]}"
-                    else
-                      "queued #{c[:job][:created_at]}"
+                    else case c[:job][:state]
+                         when "Running"
+                           "#{c[:job][:tasks_summary].inspect}"
+                         when "Complete"
+                           c[:job][:output]
+                         when "Cancelled"
+                           "cancelled #{c[:job][:cancelled_at]}"
+                         when "Failed"
+                           "failed #{c[:job][:finished_at]}"
+                         when "Queued"
+                           "queued #{c[:job][:created_at]}"
+                         end
                     end
           f.puts "#{cname.to_s.ljust namewidth} #{c[:job] ? c[:job][:uuid] : '-'.ljust(27)} #{jstatus}"
         end