2879: API server can find_or_create Jobs based on filters.
[arvados.git] / services / api / app / controllers / arvados / v1 / jobs_controller.rb
index b7b0e677d63d5b5d2e5389a27a42ab1f709f34ad..6fddba7a56c39a80dd94c6e46250f4bc39ae9361 100644 (file)
@@ -5,46 +5,102 @@ class Arvados::V1::JobsController < ApplicationController
   skip_before_filter :find_object_by_uuid, :only => :queue
   skip_before_filter :render_404_if_no_object, :only => :queue
 
-  def index
-    return super unless @where.is_a? Hash
-    want_ancestor = @where[:script_version_descends_from]
-    if want_ancestor
-      # Check for missing commit_ancestor rows, and create them if
-      # possible.
-      @objects.
-        dup.
-        includes(:commit_ancestors). # I wish Rails would let me
-                                     # specify here which
-                                     # commit_ancestors I am
-                                     # interested in.
-        each do |o|
-        if o.commit_ancestors.
-            select { |ca| ca.ancestor == want_ancestor }.
-            empty? and !o.script_version.nil?
-          begin
-            o.commit_ancestors << CommitAncestor.find_or_create_by_descendant_and_ancestor(o.script_version, want_ancestor)
-          rescue
+  # Create a Job. With find_or_create set, first look for a reusable Job
+  # matching the request (and filters) and respond with that one via show.
+  def create
+    [:repository, :script, :script_version, :script_parameters].each do |r|
+      if !resource_attrs[r]
+        return render json: {
+          :errors => ["#{r} attribute must be specified"]
+        }, status: :unprocessable_entity
+      end
+    end
+    load_filters_param
+
+    # We used to ask for the minimum_, exclude_, and no_reuse params
+    # in the job resource. Now we advertise them as flags that alter
+    # the behavior of the create action.
+    [:minimum_script_version, :exclude_script_versions].each do |attr|
+      params[attr] = resource_attrs.delete(attr) if resource_attrs.has_key? attr
+    end
+    if resource_attrs.has_key? :no_reuse
+      params[:find_or_create] = !resource_attrs.delete(:no_reuse)
+    end
+
+    if params[:find_or_create]
+      # Convert old special-purpose creation parameters to the new
+      # filters-based method.
+      minimum_script_version = params[:minimum_script_version]
+      exclude_script_versions = params.fetch(:exclude_script_versions, [])
+      # select! (not select): the filters consumed here must really be
+      # removed from @filters before apply_filters runs.
+      @filters.select! do |(col_name, operator, operand)|
+        case col_name
+        when "script_version"
+          case operator
+          when "in range"
+            minimum_script_version = operand
+            false
+          when "not in", "not in range"
+            begin
+              exclude_script_versions += operand
+            rescue TypeError
+              exclude_script_versions << operand
+            end
+            false
+          else
+            true
           end
+        else
+          true
+        end
+      end
+      @filters.append(["script_version", "in",
+                       Commit.find_commit_range(current_user,
+                                                resource_attrs[:repository],
+                                                minimum_script_version,
+                                                resource_attrs[:script_version],
+                                                exclude_script_versions)])
+
+      # Set up default filters for specific parameters.
+      if @filters.select { |f| f.first == "script" }.empty?
+        @filters.append(["script", "=", resource_attrs[:script]])
+      end
+
+      @objects = Job.readable_by(current_user)
+      apply_filters
+      @object = nil
+      incomplete_job = nil
+      @objects.each do |j|
+        if j.nondeterministic != true and
+            ((j.success == true and j.output != nil) or j.running == true) and
+            j.script_parameters == resource_attrs[:script_parameters]
+          if j.running
+            # We'll use this if we don't find a job that has completed
+            incomplete_job ||= j
+          else
+            # Record the first job in the list
+            @object ||= j
+            # Ensure that all candidate jobs actually did produce the same output
+            if @object.output != j.output
+              @object = nil
+              break
+            end
+          end
         end
-        o.commit_ancestors.
-          select { |ca| ca.ancestor == want_ancestor }.
-          select(&:is).
-          first
       end
-      # Now it is safe to do an .includes().where() because we are no
-      # longer interested in jobs that have other ancestors but not
-      # want_ancestor.
-      @objects = @objects.
-        includes(:commit_ancestors).
-        where('commit_ancestors.ancestor = ? and commit_ancestors.is = ?',
-              want_ancestor, true)
+      # Choose only after the scan, so the output comparison above can
+      # veto reuse. Fall back to a running job if no completed one is left.
+      @object ||= incomplete_job
+      return show if @object
     end
+
     super
   end
 
   def cancel
     reload_object_before_update
-    @object.update_attributes cancelled_at: Time.now
+    @object.update_attributes! cancelled_at: Time.now
     show
   end
 
@@ -92,55 +148,12 @@ class Arvados::V1::JobsController < ApplicationController
           @job.reload
         end
       end
-      @redis = Redis.new(:timeout => 0)
-      if @redis.exists @job.uuid
-        # A log buffer exists. Start by showing the last few KB.
-        @redis.
-          getrange(@job.uuid, 0 - [@opts[:buffer_size], 1].max, -1).
-          sub(/^[^\n]*\n?/, '').
-          split("\n").
-          each do |line|
-          yield "#{line}\n"
-        end
-      end
-      # TODO: avoid missing log entries between getrange() above and
-      # subscribe() below.
-      @redis.subscribe(@job.uuid) do |event|
-        event.message do |channel, msg|
-          if msg == "end"
-            @redis.unsubscribe @job.uuid
-          else
-            yield "#{msg}\n"
-          end
-        end
-      end
-    end
-  end
-
-  def self._log_tail_follow_requires_parameters
-    {
-      buffer_size: {type: 'integer', required: false, default: 2**13}
-    }
-  end
-  def log_tail_follow
-    if !@object.andand.uuid
-      return render_not_found
-    end
-    if client_accepts_plain_text_stream
-      self.response.headers['Last-Modified'] = Time.now.ctime.to_s
-      self.response_body = LogStreamer.new @object, {
-        buffer_size: (params[:buffer_size].to_i rescue 2**13)
-      }
-    else
-      render json: {
-        href: url_for(uuid: @object.uuid),
-        comment: ('To retrieve the log stream as plain text, ' +
-                  'use a request header like "Accept: text/plain"')
-      }
     end
   end
 
   def queue
+    params[:order] ||= ['priority desc', 'created_at']
+    load_limit_offset_order_params
     load_where_param
     @where.merge!({
                     started_at: nil,
@@ -148,7 +161,7 @@ class Arvados::V1::JobsController < ApplicationController
                     cancelled_at: nil,
                     success: nil
                   })
-    params[:order] ||= 'priority desc, created_at'
+    load_filters_param
     find_objects_for_index
     index
   end