20183: Move update_priority tests. Fix updater starvation.
authorTom Clegg <tom@curii.com>
Mon, 6 Mar 2023 00:27:22 +0000 (19:27 -0500)
committerTom Clegg <tom@curii.com>
Mon, 6 Mar 2023 00:27:22 +0000 (19:27 -0500)
Previously (in the Rails implementation) the "find containers that
have priority=0 but need priority>0" query was returning all
containers with active (committed, priority>0) requests. However, it
is possible for every such request to have a parent container
(requesting_container_uuid) that itself has priority=0, in which
case Container.update_priority!() leaves the container priority at 0.

With the controller implementation, this was manifesting as lack of
progress, and warnings in logs.

This commit fixes it by no longer treating such containers as needing
priority>0.

With the previous Rails setup, this could cause infinite recursion:
the updater thread called update_priority!(), which left
priority=0 but still triggered an after_commit hook, which
(if the updater thread had already been running for >5s) started a new
updater thread, which called update_priority!() on the same container,
etc.

Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

lib/controller/localdb/container.go
lib/controller/localdb/container_test.go [new file with mode: 0644]
services/api/app/controllers/arvados/v1/containers_controller.rb
services/api/test/functional/arvados/v1/containers_controller_test.rb
services/api/test/unit/update_priority_test.rb [deleted file]

index e689cc51cd3904c9d9cda70ea606ae4f7fb0d275..b87f9033d5ef370cd7ac76e380ca4d9c301716b7 100644 (file)
@@ -37,7 +37,7 @@ func (conn *Conn) runContainerPriorityUpdateThread(ctx context.Context) {
        ctx = ctrlctx.NewWithToken(ctx, conn.cluster, conn.cluster.SystemRootToken)
        log := ctxlog.FromContext(ctx).WithField("worker", "runContainerPriorityUpdateThread")
        ticker := time.NewTicker(5 * time.Minute)
-       for {
+       for ctx.Err() == nil {
                err := conn.containerPriorityUpdate(ctx, log)
                if err != nil {
                        log.WithError(err).Warn("error updating container priorities")
@@ -45,6 +45,8 @@ func (conn *Conn) runContainerPriorityUpdateThread(ctx context.Context) {
                select {
                case <-ticker.C:
                case <-conn.wantContainerPriorityUpdate:
+               case <-ctx.Done():
+                       return
                }
        }
 }
@@ -55,7 +57,7 @@ func (conn *Conn) containerPriorityUpdate(ctx context.Context, log logrus.FieldL
                return fmt.Errorf("getdb: %w", err)
        }
        res, err := db.ExecContext(ctx, `
-               UPDATE containers AS c
+               UPDATE containers
                SET priority=0
                WHERE state IN ('Queued', 'Locked', 'Running')
                 AND priority>0
@@ -69,7 +71,7 @@ func (conn *Conn) containerPriorityUpdate(ctx context.Context, log logrus.FieldL
        } else if rows, err := res.RowsAffected(); err != nil {
                return fmt.Errorf("update: %w", err)
        } else if rows > 0 {
-               log.Infof("found %d containers with no active requests but priority>0, updated to priority=0", rows)
+               log.Infof("found %d containers with priority>0 and no active requests, updated to priority=0", rows)
        }
        // In this loop we look for a single container that needs
        // fixing, call out to Rails to fix it, and repeat until we
@@ -86,9 +88,12 @@ func (conn *Conn) containerPriorityUpdate(ctx context.Context, log logrus.FieldL
                        JOIN container_requests
                         ON container_requests.container_uuid=containers.uuid
                         AND container_requests.state = 'Committed' AND container_requests.priority > 0
+                       LEFT JOIN containers parent
+                        ON parent.uuid = container_requests.requesting_container_uuid
                        WHERE containers.state IN ('Queued', 'Locked', 'Running')
                         AND containers.priority = 0
                         AND container_requests.uuid IS NOT NULL
+                        AND (parent.uuid IS NULL OR parent.priority > 0)
                        LIMIT 1`).Scan(&uuid)
                if err == sql.ErrNoRows {
                        break
@@ -103,10 +108,11 @@ func (conn *Conn) containerPriorityUpdate(ctx context.Context, log logrus.FieldL
                        return fmt.Errorf("possible lack of progress: container %s still has priority=0 after updating", uuid)
                }
                lastUUID = uuid
-               _, err = conn.railsProxy.ContainerPriorityUpdate(ctx, arvados.UpdateOptions{UUID: uuid, Select: []string{"uuid"}})
+               upd, err := conn.railsProxy.ContainerPriorityUpdate(ctx, arvados.UpdateOptions{UUID: uuid, Select: []string{"uuid", "priority"}})
                if err != nil {
                        return err
                }
+               log.Debugf("updated container %s priority from 0 to %d", uuid, upd.Priority)
        }
        return nil
 }
diff --git a/lib/controller/localdb/container_test.go b/lib/controller/localdb/container_test.go
new file mode 100644 (file)
index 0000000..437e30b
--- /dev/null
@@ -0,0 +1,276 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package localdb
+
+import (
+       "context"
+       "database/sql"
+       "errors"
+       "fmt"
+       "math/rand"
+       "sync"
+       "time"
+
+       "git.arvados.org/arvados.git/lib/ctrlctx"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/arvadostest"
+       . "gopkg.in/check.v1"
+)
+
+var _ = Suite(&containerSuite{})
+
+type containerSuite struct {
+       localdbSuite
+       topcr     arvados.ContainerRequest
+       topc      arvados.Container
+       starttime time.Time
+}
+
+func (s *containerSuite) crAttrs(c *C) map[string]interface{} {
+       return map[string]interface{}{
+               "container_image":     arvadostest.DockerImage112PDH,
+               "command":             []string{c.TestName(), fmt.Sprintf("%d", s.starttime.UnixMilli()), "top"},
+               "output_path":         "/out",
+               "priority":            1,
+               "state":               "Committed",
+               "container_count_max": 1,
+               "runtime_constraints": arvados.RuntimeConstraints{
+                       RAM:   1,
+                       VCPUs: 1,
+               },
+               "mounts": map[string]arvados.Mount{
+                       "/out": arvados.Mount{},
+               },
+       }
+}
+
+func (s *containerSuite) SetUpTest(c *C) {
+       s.localdbSuite.SetUpTest(c)
+       var err error
+       s.topcr, err = s.localdb.ContainerRequestCreate(s.userctx, arvados.CreateOptions{Attrs: s.crAttrs(c)})
+       c.Assert(err, IsNil)
+       s.topc, err = s.localdb.ContainerGet(s.userctx, arvados.GetOptions{UUID: s.topcr.ContainerUUID})
+       c.Assert(err, IsNil)
+       c.Assert(int(s.topc.Priority), Not(Equals), 0)
+       c.Logf("topcr %s topc %s", s.topcr.UUID, s.topc.UUID)
+       s.starttime = time.Now()
+}
+
+func (s *containerSuite) syncUpdatePriority(c *C) {
+       // Sending 1x to the "update now" channel starts an update;
+       // sending again fills the channel while the first update is
+       // running; sending a third time blocks until the worker
+       // receives the 2nd send, i.e., guarantees that the first
+       // update has finished.
+       s.localdb.wantContainerPriorityUpdate <- struct{}{}
+       s.localdb.wantContainerPriorityUpdate <- struct{}{}
+       s.localdb.wantContainerPriorityUpdate <- struct{}{}
+}
+
+func (s *containerSuite) TestUpdatePriorityShouldBeNonZero(c *C) {
+       _, err := s.db.Exec("update containers set priority=0 where uuid=$1", s.topc.UUID)
+       c.Assert(err, IsNil)
+       topc, err := s.localdb.ContainerGet(s.userctx, arvados.GetOptions{UUID: s.topc.UUID})
+       c.Assert(err, IsNil)
+       c.Assert(int(topc.Priority), Equals, 0)
+       s.syncUpdatePriority(c)
+       topc, err = s.localdb.ContainerGet(s.userctx, arvados.GetOptions{UUID: s.topc.UUID})
+       c.Assert(err, IsNil)
+       c.Check(int(topc.Priority), Not(Equals), 0)
+}
+
+func (s *containerSuite) TestUpdatePriorityShouldBeZero(c *C) {
+       _, err := s.db.Exec("update container_requests set priority=0 where uuid=$1", s.topcr.UUID)
+       c.Assert(err, IsNil)
+       topc, err := s.localdb.ContainerGet(s.userctx, arvados.GetOptions{UUID: s.topc.UUID})
+       c.Assert(err, IsNil)
+       c.Assert(int(topc.Priority), Not(Equals), 0)
+       s.syncUpdatePriority(c)
+       topc, err = s.localdb.ContainerGet(s.userctx, arvados.GetOptions{UUID: s.topc.UUID})
+       c.Assert(err, IsNil)
+       c.Check(int(topc.Priority), Equals, 0)
+}
+
+func (s *containerSuite) TestUpdatePriorityMultiLevelWorkflow(c *C) {
+       childCR := func(parent arvados.ContainerRequest, arg string) arvados.ContainerRequest {
+               attrs := s.crAttrs(c)
+               attrs["command"] = []string{c.TestName(), fmt.Sprintf("%d", s.starttime.UnixMilli()), arg}
+               cr, err := s.localdb.ContainerRequestCreate(s.userctx, arvados.CreateOptions{Attrs: attrs})
+               c.Assert(err, IsNil)
+               _, err = s.db.Exec("update container_requests set requesting_container_uuid=$1 where uuid=$2", parent.ContainerUUID, cr.UUID)
+               c.Assert(err, IsNil)
+               return cr
+       }
+       // Build a tree of container requests and containers (3 levels
+       // deep below s.topcr)
+       allcrs := []arvados.ContainerRequest{s.topcr}
+       for i := 0; i < 2; i++ {
+               cri := childCR(s.topcr, fmt.Sprintf("i %d", i))
+               allcrs = append(allcrs, cri)
+               for j := 0; j < 3; j++ {
+                       crj := childCR(cri, fmt.Sprintf("i %d j %d", i, j))
+                       allcrs = append(allcrs, crj)
+                       for k := 0; k < 4; k++ {
+                               crk := childCR(crj, fmt.Sprintf("i %d j %d k %d", i, j, k))
+                               allcrs = append(allcrs, crk)
+                       }
+               }
+       }
+
+       testCtx, testCancel := context.WithDeadline(s.ctx, time.Now().Add(time.Second*20))
+       defer testCancel()
+
+       // Set priority=0 on a parent+child, plus 18 other randomly
+       // selected containers in the tree
+       adminCtx := ctrlctx.NewWithToken(testCtx, s.cluster, s.cluster.SystemRootToken)
+       needfix := make([]int, 20)
+       running := make(map[int]bool)
+       for n := range needfix {
+               var i int // which container are we going to run & then set priority=0
+               if n < 2 {
+                       // first two are allcrs[1] (which is "i 0")
+                       // and allcrs[2] (which is "i 0 j 0")
+                       i = n + 1
+               } else {
+                       // rest are random
+                       i = rand.Intn(len(allcrs))
+               }
+               needfix[n] = i
+               if !running[i] {
+                       _, err := s.localdb.ContainerUpdate(adminCtx, arvados.UpdateOptions{
+                               UUID:  allcrs[i].ContainerUUID,
+                               Attrs: map[string]interface{}{"state": "Locked"},
+                       })
+                       c.Assert(err, IsNil)
+                       _, err = s.localdb.ContainerUpdate(adminCtx, arvados.UpdateOptions{
+                               UUID:  allcrs[i].ContainerUUID,
+                               Attrs: map[string]interface{}{"state": "Running"},
+                       })
+                       c.Assert(err, IsNil)
+                       running[i] = true
+               }
+               res, err := s.db.Exec("update containers set priority=0 where uuid=$1", allcrs[i].ContainerUUID)
+               c.Assert(err, IsNil)
+               updated, err := res.RowsAffected()
+               c.Assert(err, IsNil)
+               if n == 0 {
+                       c.Assert(int(updated), Equals, 1)
+               }
+       }
+
+       var wg sync.WaitGroup
+       defer wg.Wait()
+
+       chaosCtx, chaosCancel := context.WithCancel(adminCtx)
+       defer chaosCancel()
+       wg.Add(1)
+       go func() {
+               defer wg.Done()
+               // Flood the api with ContainerUpdate calls for the
+               // same containers that need to have their priority
+               // fixed
+               for chaosCtx.Err() == nil {
+                       n := rand.Intn(len(needfix))
+                       _, err := s.localdb.ContainerUpdate(chaosCtx, arvados.UpdateOptions{
+                               UUID: allcrs[needfix[n]].ContainerUUID,
+                               Attrs: map[string]interface{}{
+                                       "runtime_status": map[string]string{
+                                               "info": time.Now().Format(time.RFC3339Nano),
+                                       },
+                               },
+                       })
+                       if !errors.Is(err, context.Canceled) {
+                               c.Check(err, IsNil)
+                       }
+               }
+       }()
+       // Find and fix the containers with wrong priority
+       s.syncUpdatePriority(c)
+       // Ensure they all got fixed
+       for _, cr := range allcrs {
+               var priority int
+               err := s.db.QueryRow("select priority from containers where uuid=$1", cr.ContainerUUID).Scan(&priority)
+               c.Assert(err, IsNil)
+               c.Check(priority, Not(Equals), 0)
+       }
+
+       chaosCancel()
+
+       // Simulate cascading cancellation of the entire tree. For
+       // this we need a goroutine to notice containers with
+       // state=Running and priority=0, and cancel them
+       // (this is normally done by a dispatcher).
+       dispCtx, dispCancel := context.WithCancel(adminCtx)
+       defer dispCancel()
+       wg.Add(1)
+       go func() {
+               defer wg.Done()
+               for dispCtx.Err() == nil {
+                       needcancel, err := s.localdb.ContainerList(dispCtx, arvados.ListOptions{
+                               Limit:   1,
+                               Filters: []arvados.Filter{{"state", "=", "Running"}, {"priority", "=", 0}},
+                       })
+                       if errors.Is(err, context.Canceled) {
+                               break
+                       }
+                       c.Assert(err, IsNil)
+                       for _, ctr := range needcancel.Items {
+                               _, err := s.localdb.ContainerUpdate(dispCtx, arvados.UpdateOptions{
+                                       UUID: ctr.UUID,
+                                       Attrs: map[string]interface{}{
+                                               "state": "Cancelled",
+                                       },
+                               })
+                               c.Assert(err, IsNil)
+                       }
+               }
+       }()
+
+       _, err := s.localdb.ContainerRequestUpdate(s.userctx, arvados.UpdateOptions{
+               UUID: s.topcr.UUID,
+               Attrs: map[string]interface{}{
+                       "priority": 0,
+               },
+       })
+       c.Assert(err, IsNil)
+
+       c.Logf("waiting for all %d containers to have priority=0 after cancelling top level CR", len(allcrs))
+       for {
+               time.Sleep(time.Second / 2)
+               if testCtx.Err() != nil {
+                       c.Fatal("timed out")
+               }
+               done := true
+               for _, cr := range allcrs {
+                       var priority int
+                       var crstate, command, ctrUUID string
+                       var parent sql.NullString
+                       err := s.db.QueryRowContext(s.ctx, "select state, priority, command, container_uuid, requesting_container_uuid from container_requests where uuid=$1", cr.UUID).Scan(&crstate, &priority, &command, &ctrUUID, &parent)
+                       if errors.Is(err, context.Canceled) {
+                               break
+                       }
+                       c.Assert(err, IsNil)
+                       if crstate == "Committed" && priority > 0 {
+                               c.Logf("container request %s (%s; parent=%s) still has state %s priority %d", cr.UUID, command, parent.String, crstate, priority)
+                               done = false
+                               break
+                       }
+                       err = s.db.QueryRowContext(s.ctx, "select priority, command from containers where uuid=$1", cr.ContainerUUID).Scan(&priority, &command)
+                       if errors.Is(err, context.Canceled) {
+                               break
+                       }
+                       c.Assert(err, IsNil)
+                       if priority > 0 {
+                               c.Logf("container %s (%s) still has priority %d", cr.ContainerUUID, command, priority)
+                               done = false
+                               break
+                       }
+               }
+               if done {
+                       c.Logf("success -- all %d containers have priority=0", len(allcrs))
+                       break
+               }
+       }
+}
index 11e74ea82f3f0fc7961361082ba0c91f5a5168d5..20e7d6272e1e1c1d0deafa94272fddb39b755e6f 100644 (file)
@@ -41,8 +41,10 @@ class Arvados::V1::ContainersController < ApplicationController
       @objects = @objects.select(:id, :uuid, :state, :priority, :auth_uuid, :locked_by_uuid, :lock_count)
       @select = %w(uuid state priority auth_uuid locked_by_uuid)
     elsif action_name == 'update_priority'
+      # We're going to reload(lock: true) in the handler, which will
+      # select all attributes, but will fail if we don't select :id
+      # now.
       @objects = @objects.select(:id, :uuid)
-      @select = %w(uuid)
     end
   end
 
@@ -57,6 +59,7 @@ class Arvados::V1::ContainersController < ApplicationController
   end
 
   def update_priority
+    @object.reload(lock: true)
     @object.update_priority!
     show
   end
index 5b8ec0f6383c7dd4042e8197b7853786eae63afe..8c2919b97102db2e3007938e1fe32405c4251928 100644 (file)
@@ -160,4 +160,12 @@ class Arvados::V1::ContainersControllerTest < ActionController::TestCase
     assert_equal "v2/#{json_response['uuid']}/#{json_response['api_token']}", api_client_authorizations(:container_runtime_token).token
     assert_equal 'arvados#apiClientAuthorization', json_response['kind']
   end
+
+  test 'update_priority' do
+    ActiveRecord::Base.connection.execute "update containers set priority=0 where uuid='#{containers(:running).uuid}'"
+    authorize_with :admin
+    post :update_priority, params: {id: containers(:running).uuid}
+    assert_response :success
+    assert_not_equal 0, Container.find_by_uuid(containers(:running).uuid).priority
+  end
 end
diff --git a/services/api/test/unit/update_priority_test.rb b/services/api/test/unit/update_priority_test.rb
deleted file mode 100644 (file)
index 054fe40..0000000
+++ /dev/null
@@ -1,29 +0,0 @@
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: AGPL-3.0
-
-require 'test_helper'
-
-class UpdatePriorityTest < ActiveSupport::TestCase
-  test 'priority 0 but should be >0' do
-    uuid = containers(:running).uuid
-    ActiveRecord::Base.connection.exec_query('UPDATE containers SET priority=0 WHERE uuid=$1', 'test-setup', [[nil, uuid]])
-    assert_equal 0, Container.find_by_uuid(uuid).priority
-    UpdatePriority.update_priority(nolock: true)
-    assert_operator 0, :<, Container.find_by_uuid(uuid).priority
-
-    uuid = containers(:queued).uuid
-    ActiveRecord::Base.connection.exec_query('UPDATE containers SET priority=0 WHERE uuid=$1', 'test-setup', [[nil, uuid]])
-    assert_equal 0, Container.find_by_uuid(uuid).priority
-    UpdatePriority.update_priority(nolock: true)
-    assert_operator 0, :<, Container.find_by_uuid(uuid).priority
-  end
-
-  test 'priority>0 but should be 0' do
-    uuid = containers(:running).uuid
-    ActiveRecord::Base.connection.exec_query('DELETE FROM container_requests WHERE container_uuid=$1', 'test-setup', [[nil, uuid]])
-    assert_operator 0, :<, Container.find_by_uuid(uuid).priority
-    UpdatePriority.update_priority(nolock: true)
-    assert_equal 0, Container.find_by_uuid(uuid).priority
-  end
-end