1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
13 "git.arvados.org/arvados.git/lib/ctrlctx"
14 "git.arvados.org/arvados.git/sdk/go/arvados"
15 "git.arvados.org/arvados.git/sdk/go/ctxlog"
16 "github.com/sirupsen/logrus"
19 // ContainerUpdate defers to railsProxy and then notifies the
20 // container priority updater thread.
//
// The notification is a send of an empty struct on
// conn.wantContainerPriorityUpdate, the channel that
// runContainerPriorityUpdateThread receives from.
21 func (conn *Conn) ContainerUpdate(ctx context.Context, opts arvados.UpdateOptions) (arvados.Container, error) {
// Hand ctx and opts to railsProxy as-is; it performs the update.
22 resp, err := conn.railsProxy.ContainerUpdate(ctx, opts)
// Wake the priority updater thread.
// NOTE(review): the statements surrounding this case are not
// visible in this view; presumably it belongs to a non-blocking
// select whose other branch is described by the comment below --
// confirm against the full file.
25 case conn.wantContainerPriorityUpdate <- struct{}{}:
27 // update already pending
33 // runContainerPriorityUpdateThread periodically (and immediately
34 // after each container update request) corrects any inconsistent
35 // container priorities caused by races.
//
// Each correction pass is a call to conn.containerPriorityUpdate. An
// error returned by a pass is reported through the logger at warning
// level.
36 func (conn *Conn) runContainerPriorityUpdateThread(ctx context.Context) {
// Replace ctx with one built from the cluster's SystemRootToken.
// Presumably this makes the calls below run with system-root
// credentials -- confirm against ctrlctx.NewWithToken.
37 ctx = ctrlctx.NewWithToken(ctx, conn.cluster, conn.cluster.SystemRootToken)
// Tag every log entry from this thread with a "worker" field.
38 log := ctxlog.FromContext(ctx).WithField("worker", "runContainerPriorityUpdateThread")
// Periodic trigger with a 5 minute interval.
// NOTE(review): no ticker.Stop() is visible in this view; confirm
// the ticker is released when the loop exits.
39 ticker := time.NewTicker(5 * time.Minute)
// Keep looping for as long as ctx has not been cancelled or expired.
40 for ctx.Err() == nil {
// On-demand trigger: ContainerUpdate sends on this channel after
// forwarding a container update to railsProxy.
43 case <-conn.wantContainerPriorityUpdate:
// Run one correction pass, sharing this thread's ctx and logger.
47 err := conn.containerPriorityUpdate(ctx, log)
49 log.WithError(err).Warn("error updating container priorities")
54 func (conn *Conn) containerPriorityUpdate(ctx context.Context, log logrus.FieldLogger) error {
55 db, err := conn.getdb(ctx)
57 return fmt.Errorf("getdb: %w", err)
59 // Stage 1: Fix containers that have priority>0 but should
60 // have priority=0 because there are no active
61 // container_requests (unfinished, priority>0) associated with
63 res, err := db.ExecContext(ctx, `
66 WHERE state IN ('Queued', 'Locked', 'Running')
70 FROM container_requests
72 AND state = 'Committed')`)
74 return fmt.Errorf("update: %w", err)
75 } else if rows, err := res.RowsAffected(); err != nil {
76 return fmt.Errorf("update: %w", err)
78 log.Infof("found %d containers with priority>0 and no active requests, updated to priority=0", rows)
81 // Stage 2: Fix containers that have priority=0 but should
82 // have priority>0 because there are active container_requests
83 // (priority>0, unfinished, and not children of cancelled
86 // Fixing here means calling out to RailsAPI to compute the
87 // correct priority for the container and (if needed)
88 // propagate that change to child containers.
90 // In this loop we look for a single container that needs
91 // fixing, call out to Rails to fix it, and repeat until we
92 // don't find any more.
94 // We could get a batch of UUIDs that need attention by
95 // increasing LIMIT 1, however, updating priority on one
96 // container typically cascades to other containers, so we
97 // would often end up repeating work.
98 for lastUUID := ""; ; {
100 err := db.QueryRowxContext(ctx, `
101 SELECT containers.uuid from containers
102 JOIN container_requests
103 ON container_requests.container_uuid = containers.uuid
104 AND container_requests.state = 'Committed' AND container_requests.priority > 0
105 LEFT JOIN containers parent
106 ON parent.uuid = container_requests.requesting_container_uuid
107 WHERE containers.state IN ('Queued', 'Locked', 'Running')
108 AND containers.priority = 0
109 AND (parent.uuid IS NULL OR parent.priority > 0)
110 ORDER BY containers.created_at
111 LIMIT 1`).Scan(&uuid)
112 if err == sql.ErrNoRows {
116 return fmt.Errorf("join: %w", err)
118 if uuid == lastUUID {
119 // We don't want to keep hammering this
120 // forever if the ContainerPriorityUpdate call
121 // didn't achieve anything.
122 return fmt.Errorf("possible lack of progress: container %s still has priority=0 after updating", uuid)
125 upd, err := conn.railsProxy.ContainerPriorityUpdate(ctx, arvados.UpdateOptions{UUID: uuid, Select: []string{"uuid", "priority"}})
129 log.Debugf("updated container %s priority from 0 to %d", uuid, upd.Priority)