Merge branch '10499-cwl-crunch2-docs' refs #10499
[arvados.git] / services / api / app / models / container.rb
index f8e27e7bbc5ff1a3fafff32755aedd3b49830e1d..52f1cba723ed5744af3bf226265f9bb600d4f61f 100644 (file)
@@ -5,11 +5,13 @@ class Container < ArvadosModel
   include KindAndEtag
   include CommonApiTemplate
   include WhitelistUpdate
+  extend CurrentApiClient
 
   serialize :environment, Hash
   serialize :mounts, Hash
   serialize :runtime_constraints, Hash
   serialize :command, Array
+  serialize :scheduling_parameters, Hash
 
   before_validation :fill_field_defaults, :if => :new_record?
   before_validation :set_timestamps
@@ -17,6 +19,7 @@ class Container < ArvadosModel
   validate :validate_state_change
   validate :validate_change
   validate :validate_lock
+  validate :validate_output
   after_validation :assign_auth
   before_save :sort_serialized_attrs
   after_save :handle_completed
@@ -42,6 +45,7 @@ class Container < ArvadosModel
     t.add :started_at
     t.add :state
     t.add :auth_uuid
+    t.add :scheduling_parameters
   end
 
   # Supported states for a container
@@ -77,20 +81,100 @@ class Container < ArvadosModel
     end
   end
 
-  protected
-
-  def self.deep_sort_hash(x)
-    if x.is_a? Hash
-      x.sort.collect do |k, v|
-        [k, deep_sort_hash(v)]
-      end.to_h
-    elsif x.is_a? Array
-      x.collect { |v| deep_sort_hash(v) }
+  def self.find_reusable(attrs)
+    candidates = Container.
+      where('command = ?', attrs[:command].to_yaml).
+      where('cwd = ?', attrs[:cwd]).
+      where('environment = ?', self.deep_sort_hash(attrs[:environment]).to_yaml).
+      where('output_path = ?', attrs[:output_path]).
+      where('container_image = ?', attrs[:container_image]).
+      where('mounts = ?', self.deep_sort_hash(attrs[:mounts]).to_yaml).
+      where('runtime_constraints = ?', self.deep_sort_hash(attrs[:runtime_constraints]).to_yaml)
+
+    # Check for Completed candidates that had consistent outputs.
+    completed = candidates.where(state: Complete).where(exit_code: 0)
+    outputs = completed.select('output').group('output').limit(2)
+    if outputs.count.count != 1
+      Rails.logger.debug("Found #{outputs.count.length} different outputs")
+    elsif Collection.
+        readable_by(current_user).
+        where(portable_data_hash: outputs.first.output).
+        count < 1
+      Rails.logger.info("Found reusable container(s) " +
+                        "but output #{outputs.first} is not readable " +
+                        "by user #{current_user.uuid}")
     else
-      x
+      # Return the oldest eligible container whose log is still
+      # present and readable by current_user.
+      readable_pdh = Collection.
+        readable_by(current_user).
+        select('portable_data_hash')
+      completed = completed.
+        where("log in (#{readable_pdh.to_sql})").
+        order('finished_at asc').
+        limit(1)
+      if completed.first
+        return completed.first
+      else
+        Rails.logger.info("Found reusable container(s) but none with a log " +
+                          "readable by user #{current_user.uuid}")
+      end
+    end
+
+    # Check for Running candidates and return the most likely to finish sooner.
+    running = candidates.where(state: Running).
+      order('progress desc, started_at asc').limit(1).first
+    return running if not running.nil?
+
+    # Check for Locked or Queued ones and return the most likely to start first.
+    locked_or_queued = candidates.where("state IN (?)", [Locked, Queued]).
+      order('state asc, priority desc, created_at asc').limit(1).first
+    return locked_or_queued if not locked_or_queued.nil?
+
+    # No suitable candidate found.
+    nil
+  end
+
+  def lock
+    with_lock do
+      if self.state == Locked
+        raise AlreadyLockedError
+      end
+      self.state = Locked
+      self.save!
     end
   end
 
+  def unlock
+    with_lock do
+      if self.state == Queued
+        raise InvalidStateTransitionError
+      end
+      self.state = Queued
+      self.save!
+    end
+  end
+
+  def self.readable_by(*users_list)
+    if users_list.select { |u| u.is_admin }.any?
+      return self
+    end
+    user_uuids = users_list.map { |u| u.uuid }
+    uuid_list = user_uuids + users_list.flat_map { |u| u.groups_i_can(:read) }
+    uuid_list.uniq!
+    permitted = "(SELECT head_uuid FROM links WHERE link_class='permission' AND tail_uuid IN (:uuids))"
+    joins(:container_requests).
+      where("container_requests.uuid IN #{permitted} OR "+
+            "container_requests.owner_uuid IN (:uuids)",
+            uuids: uuid_list)
+  end
+
+  def final?
+    [Complete, Cancelled].include?(self.state)
+  end
+
+  protected
+
   def fill_field_defaults
     self.state ||= Queued
     self.environment ||= {}
@@ -98,6 +182,7 @@ class Container < ArvadosModel
     self.mounts ||= {}
     self.cwd ||= "."
     self.priority ||= 1
+    self.scheduling_parameters ||= {}
   end
 
   def permission_to_create
@@ -105,7 +190,23 @@ class Container < ArvadosModel
   end
 
   def permission_to_update
-    current_user.andand.is_admin
+    # Override base permission check to allow auth_uuid to set progress and
+    # output (only).  Whether it is legal to set progress and output in the current
+    # state has already been checked in validate_change.
+    current_user.andand.is_admin ||
+      (!current_api_client_authorization.nil? and
+       [self.auth_uuid, self.locked_by_uuid].include? current_api_client_authorization.uuid)
+  end
+
+  def ensure_owner_uuid_is_permitted
+    # Override base permission check to allow auth_uuid to set progress and
+    # output (only).  Whether it is legal to set progress and output in the current
+    # state has already been checked in validate_change.
+    if !current_api_client_authorization.nil? and self.auth_uuid == current_api_client_authorization.uuid
+      check_update_whitelist [:progress, :output]
+    else
+      super
+    end
   end
 
   def set_timestamps
@@ -124,7 +225,7 @@ class Container < ArvadosModel
     if self.new_record?
       permitted.push(:owner_uuid, :command, :container_image, :cwd,
                      :environment, :mounts, :output_path, :priority,
-                     :runtime_constraints)
+                     :runtime_constraints, :scheduling_parameters)
     end
 
     case self.state
@@ -132,7 +233,7 @@ class Container < ArvadosModel
       permitted.push :priority
 
     when Running
-      permitted.push :priority, :progress
+      permitted.push :priority, :progress, :output
       if self.state_changed?
         permitted.push :started_at
       end
@@ -159,20 +260,10 @@ class Container < ArvadosModel
   end
 
   def validate_lock
-    # If the Container is already locked by someone other than the
-    # current api_client_auth, disallow all changes -- except
-    # priority, which needs to change to reflect max(priority) of
-    # relevant ContainerRequests.
-    if locked_by_uuid_was
-      if locked_by_uuid_was != Thread.current[:api_client_authorization].uuid
-        check_update_whitelist [:priority]
-      end
-    end
-
     if [Locked, Running].include? self.state
       # If the Container was already locked, locked_by_uuid must not
       # changes. Otherwise, the current auth gets the lock.
-      need_lock = locked_by_uuid_was || Thread.current[:api_client_authorization].uuid
+      need_lock = locked_by_uuid_was || current_api_client_authorization.andand.uuid
     else
       need_lock = nil
     end
@@ -188,6 +279,21 @@ class Container < ArvadosModel
     self.locked_by_uuid = need_lock
   end
 
+  def validate_output
+    # Output must exist and be readable by the current user.  This is so
+    # that a container cannot "claim" a collection that it doesn't otherwise
+    # have access to just by setting the output field to the collection PDH.
+    if output_changed?
+      c = Collection.
+          readable_by(current_user).
+          where(portable_data_hash: self.output).
+          first
+      if !c
+        errors.add :output, "collection must exist and be readable by current user."
+      end
+    end
+  end
+
   def assign_auth
     if self.auth_uuid_changed?
       return errors.add :auth_uuid, 'is readonly'
@@ -214,25 +320,62 @@ class Container < ArvadosModel
   end
 
   def sort_serialized_attrs
-    self.environment = self.class.deep_sort_hash(self.environment)
-    self.mounts = self.class.deep_sort_hash(self.mounts)
-    self.runtime_constraints = self.class.deep_sort_hash(self.runtime_constraints)
+    if self.environment_changed?
+      self.environment = self.class.deep_sort_hash(self.environment)
+    end
+    if self.mounts_changed?
+      self.mounts = self.class.deep_sort_hash(self.mounts)
+    end
+    if self.runtime_constraints_changed?
+      self.runtime_constraints = self.class.deep_sort_hash(self.runtime_constraints)
+    end
+    if self.scheduling_parameters_changed?
+      self.scheduling_parameters = self.class.deep_sort_hash(self.scheduling_parameters)
+    end
   end
 
   def handle_completed
     # This container is finished so finalize any associated container requests
     # that are associated with this container.
-    if self.state_changed? and [Complete, Cancelled].include? self.state
+    if self.state_changed? and self.final?
       act_as_system_user do
+
+        if self.state == Cancelled
+          retryable_requests = ContainerRequest.where("priority > 0 and state = 'Committed' and container_count < container_count_max")
+        else
+          retryable_requests = []
+        end
+
+        if retryable_requests.any?
+          c_attrs = {
+            command: self.command,
+            cwd: self.cwd,
+            environment: self.environment,
+            output_path: self.output_path,
+            container_image: self.container_image,
+            mounts: self.mounts,
+            runtime_constraints: self.runtime_constraints,
+            scheduling_parameters: self.scheduling_parameters
+          }
+          c = Container.create! c_attrs
+          retryable_requests.each do |cr|
+            cr.with_lock do
+              # Use row locking because this increments container_count
+              cr.container_uuid = c.uuid
+              cr.save
+            end
+          end
+        end
+
         # Notify container requests associated with this container
         ContainerRequest.where(container_uuid: uuid,
-                               :state => ContainerRequest::Committed).each do |cr|
-          cr.container_completed!
+                               state: ContainerRequest::Committed).each do |cr|
+          cr.finalize!
         end
 
         # Try to cancel any outstanding container requests made by this container.
         ContainerRequest.where(requesting_container_uuid: uuid,
-                               :state => ContainerRequest::Committed).each do |cr|
+                               state: ContainerRequest::Committed).each do |cr|
           cr.priority = 0
           cr.save
         end