Merge remote-tracking branch 'origin/master' into 3899-crunch-use-job-state
author Peter Amstutz <peter.amstutz@curoverse.com>
Fri, 26 Sep 2014 13:01:22 +0000 (09:01 -0400)
committer Peter Amstutz <peter.amstutz@curoverse.com>
Fri, 26 Sep 2014 13:01:22 +0000 (09:01 -0400)
14 files changed:
apps/workbench/app/helpers/pipeline_instances_helper.rb
apps/workbench/app/models/job.rb
apps/workbench/app/views/application/_job_status_label.html.erb
apps/workbench/app/views/jobs/_show_recent.html.erb
apps/workbench/app/views/jobs/show.html.erb
apps/workbench/app/views/pipeline_instances/_running_component.html.erb
apps/workbench/app/views/projects/_show_dashboard.html.erb
apps/workbench/app/views/users/_tables.html.erb
sdk/cli/bin/arv-run-pipeline-instance
sdk/cli/bin/crunch-job
services/api/app/controllers/arvados/v1/jobs_controller.rb
services/api/app/models/job.rb
services/api/script/crunch-dispatch.rb
services/api/test/fixtures/jobs.yml

index 7e5324be7af92b1709739cc01a667b40ecea4f47..78f491a85cf29ce33ddb4526b565e6faf738a32f 100644 (file)
@@ -104,6 +104,19 @@ module PipelineInstancesHelper
         pj[:job] = job[c[:job][:uuid]]
       else
         pj[:job] = c[:job].is_a?(Hash) ? c[:job] : {}
+
+        # Derive state from the legacy fields only when the record lacks one.
+        pj[:job][:state] ||= if pj[:job][:cancelled_at]
+          "Cancelled"
+        elsif pj[:job][:success] == false
+          "Failed"
+        elsif pj[:job][:success] == true
+          "Complete"
+        elsif pj[:job][:running] == true
+          "Running"
+        elsif pj[:job][:uuid]
+          "Queued"
+        end
       end
       pj[:percent_done] = 0
       pj[:percent_running] = 0
@@ -132,19 +145,25 @@ module PipelineInstancesHelper
           pj[:progress] = 0.0
         end
       end
-      if pj[:job][:success]
+
+      case pj[:job][:state]
+        when 'Complete'
         pj[:result] = 'complete'
         pj[:labeltype] = 'success'
         pj[:complete] = true
         pj[:progress] = 1.0
-      elsif pj[:job][:finished_at]
+      when 'Failed'
         pj[:result] = 'failed'
         pj[:labeltype] = 'danger'
         pj[:failed] = true
-      elsif pj[:job][:started_at]
+      when 'Cancelled'
+        pj[:result] = 'cancelled'
+        pj[:labeltype] = 'danger'
+        pj[:failed] = true
+      when 'Running'
         pj[:result] = 'running'
         pj[:labeltype] = 'primary'
-      elsif pj[:job][:uuid]
+      when 'Queued'
         pj[:result] = 'queued'
         pj[:labeltype] = 'default'
       else
index d3d38b0b2dbc6109e54cf94afd1ba890e7b3622e..977eef91bf278ac5e9a528e935262dce478e4fa5 100644 (file)
@@ -46,26 +46,6 @@ class Job < ArvadosBase
     arvados_api_client.unpack_api_response arvados_api_client.api("jobs/", "queue", {"_method"=> "GET"})
   end
 
-  def self.state job
-    if job.respond_to? :state and job.state
-      return job.state
-    end
-
-    if not job[:cancelled_at].nil?
-      "Cancelled"
-    elsif not job[:finished_at].nil? or not job[:success].nil?
-      if job[:success]
-        "Completed"
-      else
-        "Failed"
-      end
-    elsif job[:running]
-      "Running"
-    else
-      "Queued"
-    end
-  end
-
   def textile_attributes
     [ 'description' ]
   end
index ece81676d806bbbb470340581cab71d5a01a24ab..17073fe98cbae839ba24ae12698dc294c2506044 100644 (file)
@@ -1,4 +1,3 @@
-<% status = Job.state j %>
 <% to_label = {
      "Cancelled" => "danger",
      "Complete" => "success",
@@ -8,4 +7,4 @@
      nil => "default"
    } %>
 
-  <span class="label label-<%= to_label[status] %>"><%= if defined? title then title else status.downcase end %></span>
+  <span class="label label-<%= to_label[j[:state]] %>"><%= if defined? title then title else j[:state].to_s.downcase end %></span>
index b19b7d93ed13ac83c9ed6d66871846c727fb4aa8..c823fc590017d5d8ac304445f3d0c01d3065ec74 100644 (file)
@@ -83,7 +83,7 @@
             <td>
               <% if j.finished_at.is_a? Time %>
              <%= raw('ran&nbsp;' + distance_of_time_in_words(j.finished_at, j.started_at).sub('about ','~').sub(' ','&nbsp;')) %>
-              <% elsif j.running %>
+              <% elsif j.state == "Running" %>
               <span class="badge badge-success" title="tasks finished">&#x2714;&nbsp;<%= j.tasks_summary[:done] %></span>
               <span class="badge badge-info" title="tasks running">&#x2708;&nbsp;<%= j.tasks_summary[:running] %></span>
               <span class="badge" title="tasks todo">&#x2709;&nbsp;<%= j.tasks_summary[:todo] %></span>
index d3047ce119b5dbaac80cf40ddd193e2fd4851515..276aec5eb47228a8167d22f54f50eb66d4e4c215 100644 (file)
@@ -1,5 +1,5 @@
 <% content_for :tab_line_buttons do %>
-    <% if @object.running %>
+    <% if @object.state == "Running" %>
     <%= form_tag "/jobs/#{@object.uuid}/cancel", style: "display:inline; padding-left: 1em" do |f| %>
       <%= button_tag "Cancel running job", {class: 'btn btn-sm btn-danger', id: "cancel-job-button"} %>
     <% end %>
index 1d52e281f67a390dc4802f7079e69c52f38270ad..8613d54a100515bd2f1093757a8f62d366c33e80 100644 (file)
@@ -31,7 +31,7 @@
               <% end %>
             </div>
 
-            <% if Job::state(current_job).in? ["Complete", "Failed", "Canceled"] %>
+            <% if current_job[:state].in? ["Complete", "Failed", "Cancelled"] %>
               <div class="col-md-5 text-overflow-ellipsis">
                 <% if pj[:output_uuid] %>
                   <%= link_to_if_arvados_object pj[:output_uuid], friendly_name: true %>
@@ -41,7 +41,7 @@
                   No output.
                 <% end %>
               </div>
-            <% elsif Job::state(current_job) == "Running" %>
+            <% elsif current_job[:state] == "Running" %>
               <div class="col-md-3 pipeline-instance-spacing">
                 <%= pj[:progress_bar] %>
               </div>
@@ -51,7 +51,7 @@
                 <%= button_tag "Cancel", {class: 'btn btn-xs btn-danger', id: "cancel-job-button"} %>
             </div>
             <% end %>
-          <% elsif Job::state(current_job) == "Queued" %>
+          <% elsif current_job[:state] == "Queued" %>
             <div class="col-md-5">
               <% queuetime = Time.now - current_job[:created_at] %>
               Queued for <%= render_runtime(queuetime, true) %>.
index 2841637ded00df136128d4e86d8d4409e380c6e4..d80282544072e34e318c5c7c4709e1231e7920e8 100644 (file)
                     <% queued = [] %>
                     <% p.components.each do |k, v| %>
                       <% if v.is_a? Hash and v[:job] %>
-                        <% if Job::state(v[:job]) == "Running" %>
+                        <% if v[:job][:state] == "Running" %>
                           <% running << k %>
-                        <% elsif Job::state(v[:job]) == "Failed" or Job::state(v[:job]) == "Cancelled" %>
+                        <% elsif v[:job][:state] == "Failed" or v[:job][:state] == "Cancelled" %>
                           <% failed << k %>
-                        <% elsif Job::state(v[:job]) == "Complete" %>
+                        <% elsif v[:job][:state] == "Complete" %>
                           <% completed << k %>
-                        <% elsif Job::state(v[:job]) == "Queued" %>
+                        <% elsif v[:job][:state] == "Queued" %>
                           <% queued << k %>
                         <% end %>
                       <% end %>
index ebb52019a9f57d3912c7cc4c9b2383eec689d9b6..acde5ce8fdd88ee376abfdb294ec1fb225390951 100644 (file)
@@ -59,7 +59,7 @@
 
             <td>
               <small>
-                <% if j.success and j.output %>
+                <% if j.state == "Complete" and j.output %>
                   <a href="<%= collection_path(j.output) %>">
                     <% collections = collections_for_object(j.output) %>
                       <% if collections && !collections.empty? %>
index 472c20bd73a283feb3149c44b6b3f534d3ed50bb..ded7ab152049308b8dfd685a2190672f72ad89ab 100755 (executable)
@@ -509,7 +509,7 @@ class WhRunPipelineInstance
         # the job's current state")
         c_already_finished = (c[:job] &&
                               c[:job][:uuid] &&
-                              !c[:job][:success].nil?)
+                              ["Complete", "Failed", "Cancelled"].include?(c[:job][:state]))
         if !c[:job] and
             c[:script_parameters].select { |pname, p| p.is_a? Hash and p[:output_of]}.empty?
           # No job yet associated with this component and is component inputs
@@ -526,6 +526,7 @@ class WhRunPipelineInstance
             :owner_uuid => owner_uuid,
             :is_locked_by_uuid => (@options[:run_jobs_here] ? owner_uuid : nil),
             :submit_id => my_submit_id,
+            :state => (if @options[:run_jobs_here] then "Running" else "Queued" end)
           }, {
             # This is the right place to put these attributes when
             # dealing with new API servers.
@@ -546,7 +547,7 @@ class WhRunPipelineInstance
           end
         end
 
-        if c[:job] and c[:run_in_process] and c[:job][:success].nil?
+        if c[:job] and c[:run_in_process] and not ["Complete", "Failed", "Cancelled"].include? c[:job][:state]
           report_status
           begin
             require 'open3'
@@ -575,21 +576,18 @@ class WhRunPipelineInstance
             debuglog "Interrupted (#{e}). Failing job.", 0
             $arv.job.update(uuid: c[:job][:uuid],
                             job: {
-                              finished_at: Time.now,
-                              running: false,
-                              success: false
+                              state: "Failed"
                             })
           end
         end
 
         if c[:job] and c[:job][:uuid]
-          if (c[:job][:running] or
-              not (c[:job][:finished_at] or c[:job][:cancelled_at]))
-            # Job is running so update copy of job record
+          if ["Running", "Queued"].include?(c[:job][:state])
+            # Job is running (or may be soon) so update copy of job record
             c[:job] = JobCache.get(c[:job][:uuid])
           end
 
-          if c[:job][:success]
+          if c[:job][:state] == "Complete"
             # Populate script_parameters of other components waiting for
             # this job
             @components.each do |c2name, c2|
@@ -654,12 +652,14 @@ class WhRunPipelineInstance
                 end
               end
             end
-          elsif c[:job][:running] ||
-              (!c[:job][:started_at] && !c[:job][:cancelled_at])
+          elsif ["Running", "Queued"].include?(c[:job][:state]) # queued jobs will still run, keep polling
             # Job is still running
             moretodo = true
-          elsif c[:job][:cancelled_at]
+          elsif c[:job][:state] == "Cancelled"
             debuglog "component #{cname} job #{c[:job][:uuid]} cancelled."
+            moretodo = false
+          elsif c[:job][:state] == "Failed"
+            moretodo = false
           end
         end
       end
@@ -686,21 +686,12 @@ class WhRunPipelineInstance
       end
     end
 
-    ended = 0
-    succeeded = 0
-    failed = 0
-    @components.each do |cname, c|
-      if c[:job]
-        if c[:job][:finished_at] or c[:job][:cancelled_at] or (c[:job][:running] == false and c[:job][:success] == false)
-          ended += 1
-          if c[:job][:success] == true
-            succeeded += 1
-          elsif c[:job][:success] == false or c[:job][:cancelled_at]
-            failed += 1
-          end
-        end
-      end
-    end
+    # Tally finished components by job state; a state with no components
+    # counts as zero (group_by would leave that key nil and crash on .count).
+    job_states = @components.values.map { |c| c[:job] && c[:job][:state] }
+    succeeded = job_states.count("Complete")
+    failed = job_states.count("Failed") + job_states.count("Cancelled")
+    ended = succeeded + failed
 
     success = (succeeded == @components.length)
 
@@ -766,20 +757,18 @@ class WhRunPipelineInstance
         @components.each do |cname, c|
           jstatus = if !c[:job]
                       "-"
-                    elsif c[:job][:running]
-                      "#{c[:job][:tasks_summary].inspect}"
-                    elsif c[:job][:success]
-                      c[:job][:output]
-                    elsif c[:job][:cancelled_at]
-                      "cancelled #{c[:job][:cancelled_at]}"
-                    elsif c[:job][:finished_at]
-                      "failed #{c[:job][:finished_at]}"
-                    elsif c[:job][:started_at]
-                      "started #{c[:job][:started_at]}"
-                    elsif c[:job][:is_locked_by_uuid]
-                      "starting #{c[:job][:started_at]}"
-                    else
-                      "queued #{c[:job][:created_at]}"
+                    else case c[:job][:state]
+                         when "Running"
+                           "#{c[:job][:tasks_summary].inspect}"
+                         when "Complete"
+                           c[:job][:output]
+                         when "Cancelled"
+                           "cancelled #{c[:job][:cancelled_at]}"
+                         when "Failed"
+                           "failed #{c[:job][:finished_at]}"
+                         else # "Queued", or a missing/unrecognized state
+                           "queued #{c[:job][:created_at]}"
+                         end
                     end
           f.puts "#{cname.to_s.ljust namewidth} #{c[:job] ? c[:job][:uuid] : '-'.ljust(27)} #{jstatus}"
         end
index 70f379e53fd9cc307bd933bc1b21276097863e4a..f56099d1ad36d34024977ac0e4a8b9c1529c6a74 100755 (executable)
@@ -161,6 +161,10 @@ if ($job_has_uuid)
       Log(undef, "Job is locked by " . $Job->{'is_locked_by_uuid'});
       exit EX_TEMPFAIL;
     }
+    if ($Job->{'state'} ne 'Queued') {
+      Log(undef, "Job state is " . $Job->{'state'} . ", but I can only start queued jobs.");
+      exit EX_TEMPFAIL;
+    }
     if ($Job->{'success'} ne undef) {
       Log(undef, "Job 'success' flag (" . $Job->{'success'} . ") is not null");
       exit EX_TEMPFAIL;
@@ -287,9 +291,7 @@ if ($job_has_uuid)
     Log(undef, "Error while updating / locking job, exiting ".EX_TEMPFAIL);
     exit EX_TEMPFAIL;
   }
-  $Job->update_attributes('started_at' => scalar gmtime,
-                          'running' => 1,
-                          'success' => undef,
+  $Job->update_attributes('state' => 'Running',
                           'tasks_summary' => { 'failed' => 0,
                                                'todo' => 1,
                                                'running' => 0,
@@ -876,12 +878,14 @@ Log (undef, "finish");
 save_meta();
 
 if ($job_has_uuid) {
-  $Job->update_attributes('running' => 0,
-                          'success' => $collated_output && $main::success,
-                          'finished_at' => scalar gmtime)
+  if ($collated_output && $main::success) {
+    $Job->update_attributes('state' => 'Complete')
+  } else {
+    $Job->update_attributes('state' => 'Failed')
+  }
 }
 
-exit ($Job->{'success'} ? 1 : 0);
+exit ($Job->{'state'} ne 'Complete' ? 1 : 0);
 
 
 
@@ -1033,12 +1037,16 @@ sub check_refresh_wanted
       my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
       for my $attr ('cancelled_at',
                     'cancelled_by_user_uuid',
-                    'cancelled_by_client_uuid') {
+                    'cancelled_by_client_uuid',
+                    'state') {
         $Job->{$attr} = $Job2->{$attr};
       }
-      if ($Job->{'cancelled_at'}) {
-        Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
-             " by user " . $Job->{cancelled_by_user_uuid});
+      if ($Job->{'state'} ne "Running") {
+        if ($Job->{'state'} eq "Cancelled") {
+          Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
+        } else {
+          Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
+        }
         $main::success = 0;
         $main::please_freeze = 1;
       }
@@ -1336,9 +1344,11 @@ sub croak
 sub cleanup
 {
   return if !$job_has_uuid;
-  $Job->update_attributes('running' => 0,
-                          'success' => 0,
-                          'finished_at' => scalar gmtime);
+  if ($Job->{'state'} eq 'Cancelled') {
+    $Job->update_attributes('finished_at' => scalar gmtime);
+  } else {
+    $Job->update_attributes('state' => 'Failed');
+  }
 }
 
 
index d8ceb850f8a7c0dc7e1290b4b39f66f48961671b..b157de42fc8130fb8b1a33f694f0bdd71b58c049 100644 (file)
@@ -151,12 +151,7 @@ class Arvados::V1::JobsController < ApplicationController
     params[:order] ||= ['priority desc', 'created_at']
     load_limit_offset_order_params
     load_where_param
-    @where.merge!({
-                    started_at: nil,
-                    is_locked_by_uuid: nil,
-                    cancelled_at: nil,
-                    success: nil
-                  })
+    @where.merge!({state: Job::Queued})
     return if false.equal?(load_filters_param)
     find_objects_for_index
     index
index 7da6852ee0143940866db0961641caf0015a202b..a09ddb2aad32c338df9cae499113bdfe60a6ea19 100644 (file)
@@ -70,9 +70,7 @@ class Job < ArvadosModel
   end
 
   def self.queue
-    self.where('started_at is ? and is_locked_by_uuid is ? and cancelled_at is ? and success is ?',
-               nil, nil, nil, nil).
-      order('priority desc, created_at')
+    self.where('state = ?', Queued).order('priority desc, created_at')
   end
 
   def queue_position
index 45fb6dd33f42b58450c6d0b152dfd5ba1e0e8493..56fd6e45df96ce6c007bd932ed127cdc1f8e7743 100755 (executable)
@@ -422,14 +422,11 @@ class Dispatcher
     exit_status = j_done[:wait_thr].value
 
     jobrecord = Job.find_by_uuid(job_done.uuid)
-    if exit_status.to_i != 75 and jobrecord.started_at
-      # Clean up state fields in case crunch-job exited without
-      # putting the job in a suitable "finished" state.
-      jobrecord.running = false
-      jobrecord.finished_at ||= Time.now
-      if jobrecord.success.nil?
-        jobrecord.success = false
-      end
+    if exit_status.to_i != 75 and jobrecord.state == "Running"
+      # crunch-job did not return exit code 75 (see below) and left the job in
+      # the "Running" state, which means there was an unhandled error.  Fail
+      # the job.
+      jobrecord.state = "Failed"
       jobrecord.save!
     else
       # Don't fail the job if crunch-job didn't even get as far as
index e6772404f5c0ae41df6bb4d99160cd9b412c83d9..a591e673366eee46603e5eb5199a024dfc7dcbb7 100644 (file)
@@ -20,6 +20,7 @@ running:
     running: 1
     done: 1
   runtime_constraints: {}
+  state: Running
 
 running_cancelled:
   uuid: zzzzz-8i9sb-4cf0nhn6xte809j
@@ -43,6 +44,7 @@ running_cancelled:
     running: 1
     done: 1
   runtime_constraints: {}
+  state: Cancelled
 
 uses_nonexistent_script_version:
   uuid: zzzzz-8i9sb-7m339pu0x9mla88
@@ -66,6 +68,7 @@ uses_nonexistent_script_version:
     running: 0
     done: 1
   runtime_constraints: {}
+  state: Complete
 
 foobar:
   uuid: zzzzz-8i9sb-aceg2bnq7jt7kon
@@ -91,6 +94,7 @@ foobar:
     running: 0
     done: 1
   runtime_constraints: {}
+  state: Complete
 
 barbaz:
   uuid: zzzzz-8i9sb-cjs4pklxxjykyuq
@@ -118,6 +122,7 @@ barbaz:
     running: 0
     done: 1
   runtime_constraints: {}
+  state: Complete
 
 previous_job_run:
   uuid: zzzzz-8i9sb-cjs4pklxxjykqqq
@@ -131,6 +136,7 @@ previous_job_run:
     an_integer: "1"
   success: true
   output: ea10d51bcf88862dbcc36eb292017dfd+45
+  state: Complete
 
 previous_docker_job_run:
   uuid: zzzzz-8i9sb-k6emstgk4kw4yhi
@@ -145,6 +151,7 @@ previous_docker_job_run:
   success: true
   output: ea10d51bcf88862dbcc36eb292017dfd+45
   docker_image_locator: fa3c1a9cb6783f85f2ecda037e07b8c3+167
+  state: Complete
 
 previous_job_run_no_output:
   uuid: zzzzz-8i9sb-cjs4pklxxjykppp
@@ -158,6 +165,7 @@ previous_job_run_no_output:
     an_integer: "2"
   success: true
   output: ~
+  state: Complete
 
 nondeterminisic_job_run:
   uuid: zzzzz-8i9sb-cjs4pklxxjykyyy
@@ -171,6 +179,7 @@ nondeterminisic_job_run:
     an_integer: "1"
   success: true
   nondeterministic: true
+  state: Complete
 
 nearly_finished_job:
   uuid: zzzzz-8i9sb-2gx6rz0pjl033w3
@@ -191,6 +200,7 @@ nearly_finished_job:
     running: 1
     done: 0
   runtime_constraints: {}
+  state: Running
 
 queued:
   uuid: zzzzz-8i9sb-grx15v5mjnsyxk7
@@ -212,3 +222,4 @@ queued:
   is_locked_by_uuid: ~
   tasks_summary: {}
   runtime_constraints: {}
+  state: Queued
\ No newline at end of file