X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/df8aa6da0173f77aa9f2fcbe2814a07bcdcbf5d9..45ebdd1005f12c3c18355ab511e7a2e7f623358a:/lib/controller/localdb/container.go diff --git a/lib/controller/localdb/container.go b/lib/controller/localdb/container.go index 82f3c3b0ae..da2e16e703 100644 --- a/lib/controller/localdb/container.go +++ b/lib/controller/localdb/container.go @@ -10,6 +10,7 @@ import ( "fmt" "time" + "git.arvados.org/arvados.git/lib/ctrlctx" "git.arvados.org/arvados.git/sdk/go/arvados" "git.arvados.org/arvados.git/sdk/go/ctxlog" "github.com/sirupsen/logrus" @@ -29,20 +30,25 @@ func (conn *Conn) ContainerUpdate(ctx context.Context, opts arvados.UpdateOption return resp, err } +var containerPriorityUpdateInterval = 5 * time.Minute + // runContainerPriorityUpdateThread periodically (and immediately // after each container update request) corrects any inconsistent // container priorities caused by races. func (conn *Conn) runContainerPriorityUpdateThread(ctx context.Context) { + ctx = ctrlctx.NewWithToken(ctx, conn.cluster, conn.cluster.SystemRootToken) log := ctxlog.FromContext(ctx).WithField("worker", "runContainerPriorityUpdateThread") - ticker := time.NewTicker(5 * time.Minute) - for { - err := conn.containerPriorityUpdate(ctx, log) - if err != nil { - log.WithError(err).Warn("error updating container priorities") - } + ticker := time.NewTicker(containerPriorityUpdateInterval) + for ctx.Err() == nil { select { case <-ticker.C: case <-conn.wantContainerPriorityUpdate: + case <-ctx.Done(): + return + } + err := conn.containerPriorityUpdate(ctx, log) + if err != nil { + log.WithError(err).Warn("error updating container priorities") } } } @@ -52,8 +58,12 @@ func (conn *Conn) containerPriorityUpdate(ctx context.Context, log logrus.FieldL if err != nil { return fmt.Errorf("getdb: %w", err) } + // Stage 1: Fix containers that have priority>0 but should + // have priority=0 because there are no active + // container_requests (unfinished, priority>0) associated with + // them. res, err := db.ExecContext(ctx, ` - UPDATE containers AS c + UPDATE containers SET priority=0 WHERE state IN ('Queued', 'Locked', 'Running') AND priority>0 @@ -67,8 +77,18 @@ func (conn *Conn) containerPriorityUpdate(ctx context.Context, log logrus.FieldL } else if rows, err := res.RowsAffected(); err != nil { return fmt.Errorf("update: %w", err) } else if rows > 0 { - log.Infof("found %d containers with no active requests but priority>0, updated to priority=0", rows) + log.Infof("found %d containers with priority>0 and no active requests, updated to priority=0", rows) } + + // Stage 2: Fix containers that have priority=0 but should + // have priority>0 because there are active container_requests + // (priority>0, unfinished, and not children of cancelled + // containers). + // + // Fixing here means calling out to RailsAPI to compute the + // correct priority for the contianer and (if needed) + // propagate that change to child containers. + // In this loop we look for a single container that needs // fixing, call out to Rails to fix it, and repeat until we // don't find any more. @@ -82,11 +102,14 @@ func (conn *Conn) containerPriorityUpdate(ctx context.Context, log logrus.FieldL err := db.QueryRowxContext(ctx, ` SELECT containers.uuid from containers JOIN container_requests - ON container_requests.container_uuid=containers.uuid + ON container_requests.container_uuid = containers.uuid AND container_requests.state = 'Committed' AND container_requests.priority > 0 + LEFT JOIN containers parent + ON parent.uuid = container_requests.requesting_container_uuid WHERE containers.state IN ('Queued', 'Locked', 'Running') AND containers.priority = 0 - AND container_requests.uuid IS NOT NULL + AND (parent.uuid IS NULL OR parent.priority > 0) + ORDER BY containers.created_at LIMIT 1`).Scan(&uuid) if err == sql.ErrNoRows { break @@ -101,10 +124,11 @@ func (conn *Conn) containerPriorityUpdate(ctx context.Context, log logrus.FieldL return fmt.Errorf("possible lack of progress: container %s still has priority=0 after updating", uuid) } lastUUID = uuid - _, err = conn.railsProxy.ContainerPriorityUpdate(ctx, arvados.UpdateOptions{UUID: uuid, Select: []string{"uuid"}}) + upd, err := conn.railsProxy.ContainerPriorityUpdate(ctx, arvados.UpdateOptions{UUID: uuid, Select: []string{"uuid", "priority"}}) if err != nil { return err } + log.Debugf("updated container %s priority from 0 to %d", uuid, upd.Priority) } return nil }