Merge branch 'master' into 4951-request-vm
[arvados.git] / sdk / cli / bin / arv-run-pipeline-instance
index bc87c5deabc899e1d1bd02b787877755c2a32f44..63313fc8082a18b7c637a222d92cabafc76e4ab3 100755 (executable)
@@ -445,12 +445,17 @@ class WhRunPipelineInstance
         if value.nil? and
             ![false,'false',0,'0'].index parameter[:required]
           if parameter[:output_of]
+            if not @components[parameter[:output_of].intern]
+              errors << [componentname, parametername, "output_of refers to nonexistent component '#{parameter[:output_of]}'"]
+            end
             next
           end
           errors << [componentname, parametername, "required parameter is missing"]
         end
         debuglog "parameter #{componentname}::#{parametername} == #{value}"
-        component[:script_parameters][parametername] = value
+
+        component[:script_parameters][parametername] =
+          parameter.dup.merge(value: value)
       end
     end
     if !errors.empty?
@@ -509,7 +514,7 @@ class WhRunPipelineInstance
         # the job's current state")
         c_already_finished = (c[:job] &&
                               c[:job][:uuid] &&
-                              ["Complete", "Failed", "Cancelled"].include? c[:job][:state])
+                              ["Complete", "Failed", "Cancelled"].include?(c[:job][:state]))
         if !c[:job] and
             c[:script_parameters].select { |pname, p| p.is_a? Hash and p[:output_of]}.empty?
           # No job yet associated with this component and is component inputs
@@ -518,7 +523,9 @@ class WhRunPipelineInstance
           my_submit_id = "instance #{@instance[:uuid]} rand #{rand(2**64).to_s(36)}"
           job = JobCache.create(@instance, cname, {
             :script => c[:script],
-            :script_parameters => c[:script_parameters],
+            :script_parameters => Hash[c[:script_parameters].map do |key, spec|
+                                         [key, spec[:value]]
+                                       end],
             :script_version => c[:script_version],
             :repository => c[:repository],
             :nondeterministic => c[:nondeterministic],
@@ -526,7 +533,7 @@ class WhRunPipelineInstance
             :owner_uuid => owner_uuid,
             :is_locked_by_uuid => (@options[:run_jobs_here] ? owner_uuid : nil),
             :submit_id => my_submit_id,
-            :state => (if @options[:run_jobs_here] then "Running" else "Queued")
+            :state => (if @options[:run_jobs_here] then "Running" else "Queued" end)
           }, {
             # This is the right place to put these attributes when
             # dealing with new API servers.
@@ -582,8 +589,8 @@ class WhRunPipelineInstance
         end
 
         if c[:job] and c[:job][:uuid]
-          if c[:job][:state] == "Running"
-            # Job is running so update copy of job record
+          if ["Running", "Queued"].include?(c[:job][:state])
+            # Job is running (or may be soon) so update copy of job record
             c[:job] = JobCache.get(c[:job][:uuid])
           end
 
@@ -594,7 +601,7 @@ class WhRunPipelineInstance
               c2[:script_parameters].each do |pname, p|
                 if p.is_a? Hash and p[:output_of] == cname.to_s
                   debuglog "parameter #{c2name}::#{pname} == #{c[:job][:output]}"
-                  c2[:script_parameters][pname] = c[:job][:output]
+                  c2[:script_parameters][pname] = {value: c[:job][:output]}
                   moretodo = true
                 end
               end
@@ -652,11 +659,15 @@ class WhRunPipelineInstance
                 end
               end
             end
-          elsif c[:job][:state] == "Running"
-            # Job is still running
+          elsif ["Queued", "Running"].include? c[:job][:state]
+            # Job is running or queued to run, so indicate that pipeline
+            # should continue to run
             moretodo = true
-          elsif c[:job][:cancelled_at]
+          elsif c[:job][:state] == "Cancelled"
             debuglog "component #{cname} job #{c[:job][:uuid]} cancelled."
+            moretodo = false
+          elsif c[:job][:state] == "Failed"
+            moretodo = false
           end
         end
       end
@@ -683,17 +694,12 @@ class WhRunPipelineInstance
       end
     end
 
-    ended = @components.map { |cname, c| 
-      if c[:job] and ["Complete", "Failed", "Cancelled"].include? c[:job][:state] then 1 else 0 end 
-    }.reduce(:+) || 0
-
-    succeeded = @components.map { |cname, c| 
-      if c[:job] and ["Complete"].include? c[:job][:state] then 1 else 0 end 
-    }.reduce(:+) || 0
-
-    failed = @components.map { |cname, c| 
-      if c[:job] and ["Failed", "Cancelled"].include? c[:job][:state] then 1 else 0  end 
-    }.reduce(:+) || 0
+    c_in_state = @components.values.group_by { |c|
+      c[:job] and c[:job][:state]
+    }
+    succeeded = c_in_state["Complete"].andand.count || 0
+    failed = (c_in_state["Failed"].andand.count || 0) + (c_in_state["Cancelled"].andand.count || 0)
+    ended = succeeded + failed
 
     success = (succeeded == @components.length)
 
@@ -759,16 +765,18 @@ class WhRunPipelineInstance
         @components.each do |cname, c|
           jstatus = if !c[:job]
                       "-"
-                    elsif c[:job][:state] == "Running"
-                      "#{c[:job][:tasks_summary].inspect}"
-                    elsif c[:job][:state] == "Complete"
-                      c[:job][:output]
-                    elsif c[:job][:state] == "Cancelled"
-                      "cancelled #{c[:job][:cancelled_at]}"
-                    elsif c[:job][:state] == "Failed"
-                      "failed #{c[:job][:finished_at]}"
-                    elsif c[:job][:state] == "Queued"
-                      "queued #{c[:job][:created_at]}"
+                    else case c[:job][:state]
+                         when "Running"
+                           "#{c[:job][:tasks_summary].inspect}"
+                         when "Complete"
+                           c[:job][:output]
+                         when "Cancelled"
+                           "cancelled #{c[:job][:cancelled_at]}"
+                         when "Failed"
+                           "failed #{c[:job][:finished_at]}"
+                         when "Queued"
+                           "queued #{c[:job][:created_at]}"
+                         end
                     end
           f.puts "#{cname.to_s.ljust namewidth} #{c[:job] ? c[:job][:uuid] : '-'.ljust(27)} #{jstatus}"
         end