1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
13 "git.arvados.org/arvados.git/lib/ctrlctx"
14 "git.arvados.org/arvados.git/sdk/go/arvados"
15 "git.arvados.org/arvados.git/sdk/go/ctxlog"
16 "github.com/sirupsen/logrus"
19 // ContainerUpdate defers to railsProxy and then notifies the
20 // container priority updater thread.
21 func (conn *Conn) ContainerUpdate(ctx context.Context, opts arvados.UpdateOptions) (arvados.Container, error) {
22 resp, err := conn.railsProxy.ContainerUpdate(ctx, opts)
25 case conn.wantContainerPriorityUpdate <- struct{}{}:
27 // update already pending
33 var containerPriorityUpdateInterval = 5 * time.Minute
35 // runContainerPriorityUpdateThread periodically (and immediately
36 // after each container update request) corrects any inconsistent
37 // container priorities caused by races.
38 func (conn *Conn) runContainerPriorityUpdateThread(ctx context.Context) {
39 ctx = ctrlctx.NewWithToken(ctx, conn.cluster, conn.cluster.SystemRootToken)
40 log := ctxlog.FromContext(ctx).WithField("worker", "runContainerPriorityUpdateThread")
41 ticker := time.NewTicker(containerPriorityUpdateInterval)
42 for ctx.Err() == nil {
45 case <-conn.wantContainerPriorityUpdate:
49 err := conn.containerPriorityUpdate(ctx, log)
51 log.WithError(err).Warn("error updating container priorities")
56 func (conn *Conn) containerPriorityUpdate(ctx context.Context, log logrus.FieldLogger) error {
57 db, err := conn.getdb(ctx)
59 return fmt.Errorf("getdb: %w", err)
61 // Stage 1: Fix containers that have priority>0 but should
62 // have priority=0 because there are no active
63 // container_requests (unfinished, priority>0) associated with
65 res, err := db.ExecContext(ctx, `
68 WHERE state IN ('Queued', 'Locked', 'Running')
72 FROM container_requests
74 AND state = 'Committed')`)
76 return fmt.Errorf("update: %w", err)
77 } else if rows, err := res.RowsAffected(); err != nil {
78 return fmt.Errorf("update: %w", err)
80 log.Infof("found %d containers with priority>0 and no active requests, updated to priority=0", rows)
83 // Stage 2: Fix containers that have priority=0 but should
84 // have priority>0 because there are active container_requests
85 // (priority>0, unfinished, and not children of cancelled
88 // Fixing here means calling out to RailsAPI to compute the
89 // correct priority for the contianer and (if needed)
90 // propagate that change to child containers.
92 // In this loop we look for a single container that needs
93 // fixing, call out to Rails to fix it, and repeat until we
94 // don't find any more.
96 // We could get a batch of UUIDs that need attention by
97 // increasing LIMIT 1, however, updating priority on one
98 // container typically cascades to other containers, so we
99 // would often end up repeating work.
100 for lastUUID := ""; ; {
102 err := db.QueryRowxContext(ctx, `
103 SELECT containers.uuid from containers
104 JOIN container_requests
105 ON container_requests.container_uuid = containers.uuid
106 AND container_requests.state = 'Committed' AND container_requests.priority > 0
107 LEFT JOIN containers parent
108 ON parent.uuid = container_requests.requesting_container_uuid
109 WHERE containers.state IN ('Queued', 'Locked', 'Running')
110 AND containers.priority = 0
111 AND (parent.uuid IS NULL OR parent.priority > 0)
112 ORDER BY containers.created_at
113 LIMIT 1`).Scan(&uuid)
114 if err == sql.ErrNoRows {
118 return fmt.Errorf("join: %w", err)
120 if uuid == lastUUID {
121 // We don't want to keep hammering this
122 // forever if the ContainerPriorityUpdate call
123 // didn't achieve anything.
124 return fmt.Errorf("possible lack of progress: container %s still has priority=0 after updating", uuid)
127 upd, err := conn.railsProxy.ContainerPriorityUpdate(ctx, arvados.UpdateOptions{UUID: uuid, Select: []string{"uuid", "priority"}})
131 log.Debugf("updated container %s priority from 0 to %d", uuid, upd.Priority)