Merge branch '21126-trash-when-ro'
[arvados.git] / lib / controller / localdb / container.go
index 82f3c3b0ae3a02955f22dd67dcbae0de963d5e95..da2e16e7036667fc62d8b5173f08931dce261507 100644 (file)
@@ -10,6 +10,7 @@ import (
        "fmt"
        "time"
 
+       "git.arvados.org/arvados.git/lib/ctrlctx"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/ctxlog"
        "github.com/sirupsen/logrus"
@@ -29,20 +30,25 @@ func (conn *Conn) ContainerUpdate(ctx context.Context, opts arvados.UpdateOption
        return resp, err
 }
 
+var containerPriorityUpdateInterval = 5 * time.Minute
+
 // runContainerPriorityUpdateThread periodically (and immediately
 // after each container update request) corrects any inconsistent
 // container priorities caused by races.
 func (conn *Conn) runContainerPriorityUpdateThread(ctx context.Context) {
+       ctx = ctrlctx.NewWithToken(ctx, conn.cluster, conn.cluster.SystemRootToken)
        log := ctxlog.FromContext(ctx).WithField("worker", "runContainerPriorityUpdateThread")
-       ticker := time.NewTicker(5 * time.Minute)
-       for {
-               err := conn.containerPriorityUpdate(ctx, log)
-               if err != nil {
-                       log.WithError(err).Warn("error updating container priorities")
-               }
+       ticker := time.NewTicker(containerPriorityUpdateInterval)
+       for ctx.Err() == nil {
                select {
                case <-ticker.C:
                case <-conn.wantContainerPriorityUpdate:
+               case <-ctx.Done():
+                       return
+               }
+               err := conn.containerPriorityUpdate(ctx, log)
+               if err != nil {
+                       log.WithError(err).Warn("error updating container priorities")
                }
        }
 }
@@ -52,8 +58,12 @@ func (conn *Conn) containerPriorityUpdate(ctx context.Context, log logrus.FieldL
        if err != nil {
                return fmt.Errorf("getdb: %w", err)
        }
+       // Stage 1: Fix containers that have priority>0 but should
+       // have priority=0 because there are no active
+       // container_requests (unfinished, priority>0) associated with
+       // them.
        res, err := db.ExecContext(ctx, `
-               UPDATE containers AS c
+               UPDATE containers
                SET priority=0
                WHERE state IN ('Queued', 'Locked', 'Running')
                 AND priority>0
@@ -67,8 +77,18 @@ func (conn *Conn) containerPriorityUpdate(ctx context.Context, log logrus.FieldL
        } else if rows, err := res.RowsAffected(); err != nil {
                return fmt.Errorf("update: %w", err)
        } else if rows > 0 {
-               log.Infof("found %d containers with no active requests but priority>0, updated to priority=0", rows)
+               log.Infof("found %d containers with priority>0 and no active requests, updated to priority=0", rows)
        }
+
+       // Stage 2: Fix containers that have priority=0 but should
+       // have priority>0 because there are active container_requests
+       // (priority>0, unfinished, and not children of cancelled
+       // containers).
+       //
+       // Fixing here means calling out to RailsAPI to compute the
+       // correct priority for the contianer and (if needed)
+       // propagate that change to child containers.
+
        // In this loop we look for a single container that needs
        // fixing, call out to Rails to fix it, and repeat until we
        // don't find any more.
@@ -82,11 +102,14 @@ func (conn *Conn) containerPriorityUpdate(ctx context.Context, log logrus.FieldL
                err := db.QueryRowxContext(ctx, `
                        SELECT containers.uuid from containers
                        JOIN container_requests
-                        ON container_requests.container_uuid=containers.uuid
+                        ON container_requests.container_uuid = containers.uuid
                         AND container_requests.state = 'Committed' AND container_requests.priority > 0
+                       LEFT JOIN containers parent
+                        ON parent.uuid = container_requests.requesting_container_uuid
                        WHERE containers.state IN ('Queued', 'Locked', 'Running')
                         AND containers.priority = 0
-                        AND container_requests.uuid IS NOT NULL
+                        AND (parent.uuid IS NULL OR parent.priority > 0)
+                       ORDER BY containers.created_at
                        LIMIT 1`).Scan(&uuid)
                if err == sql.ErrNoRows {
                        break
@@ -101,10 +124,11 @@ func (conn *Conn) containerPriorityUpdate(ctx context.Context, log logrus.FieldL
                        return fmt.Errorf("possible lack of progress: container %s still has priority=0 after updating", uuid)
                }
                lastUUID = uuid
-               _, err = conn.railsProxy.ContainerPriorityUpdate(ctx, arvados.UpdateOptions{UUID: uuid, Select: []string{"uuid"}})
+               upd, err := conn.railsProxy.ContainerPriorityUpdate(ctx, arvados.UpdateOptions{UUID: uuid, Select: []string{"uuid", "priority"}})
                if err != nil {
                        return err
                }
+               log.Debugf("updated container %s priority from 0 to %d", uuid, upd.Priority)
        }
        return nil
 }