1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
13 "git.arvados.org/arvados.git/lib/ctrlctx"
14 "git.arvados.org/arvados.git/sdk/go/arvados"
15 "git.arvados.org/arvados.git/sdk/go/ctxlog"
16 "github.com/sirupsen/logrus"
19 // ContainerUpdate defers to railsProxy and then notifies the
20 // container priority updater thread.
21 func (conn *Conn) ContainerUpdate(ctx context.Context, opts arvados.UpdateOptions) (arvados.Container, error) {
22 resp, err := conn.railsProxy.ContainerUpdate(ctx, opts)
25 case conn.wantContainerPriorityUpdate <- struct{}{}:
27 // update already pending
33 // runContainerPriorityUpdateThread periodically (and immediately
34 // after each container update request) corrects any inconsistent
35 // container priorities caused by races.
36 func (conn *Conn) runContainerPriorityUpdateThread(ctx context.Context) {
37 ctx = ctrlctx.NewWithToken(ctx, conn.cluster, conn.cluster.SystemRootToken)
38 log := ctxlog.FromContext(ctx).WithField("worker", "runContainerPriorityUpdateThread")
39 ticker := time.NewTicker(5 * time.Minute)
40 for ctx.Err() == nil {
43 case <-conn.wantContainerPriorityUpdate:
47 err := conn.containerPriorityUpdate(ctx, log)
49 log.WithError(err).Warn("error updating container priorities")
54 func (conn *Conn) containerPriorityUpdate(ctx context.Context, log logrus.FieldLogger) error {
55 db, err := conn.getdb(ctx)
57 return fmt.Errorf("getdb: %w", err)
59 res, err := db.ExecContext(ctx, `
62 WHERE state IN ('Queued', 'Locked', 'Running')
66 FROM container_requests
68 AND state = 'Committed')`)
70 return fmt.Errorf("update: %w", err)
71 } else if rows, err := res.RowsAffected(); err != nil {
72 return fmt.Errorf("update: %w", err)
74 log.Infof("found %d containers with priority>0 and no active requests, updated to priority=0", rows)
76 // In this loop we look for a single container that needs
77 // fixing, call out to Rails to fix it, and repeat until we
78 // don't find any more.
80 // We could get a batch of UUIDs that need attention by
81 // increasing LIMIT 1, however, updating priority on one
82 // container typically cascades to other containers, so we
83 // would often end up repeating work.
84 for lastUUID := ""; ; {
86 err := db.QueryRowxContext(ctx, `
87 SELECT containers.uuid from containers
88 JOIN container_requests
89 ON container_requests.container_uuid=containers.uuid
90 AND container_requests.state = 'Committed' AND container_requests.priority > 0
91 LEFT JOIN containers parent
92 ON parent.uuid = container_requests.requesting_container_uuid
93 WHERE containers.state IN ('Queued', 'Locked', 'Running')
94 AND containers.priority = 0
95 AND container_requests.uuid IS NOT NULL
96 AND (parent.uuid IS NULL OR parent.priority > 0)
98 if err == sql.ErrNoRows {
102 return fmt.Errorf("join: %w", err)
104 if uuid == lastUUID {
105 // We don't want to keep hammering this
106 // forever if the ContainerPriorityUpdate call
107 // didn't achieve anything.
108 return fmt.Errorf("possible lack of progress: container %s still has priority=0 after updating", uuid)
111 upd, err := conn.railsProxy.ContainerPriorityUpdate(ctx, arvados.UpdateOptions{UUID: uuid, Select: []string{"uuid", "priority"}})
115 log.Debugf("updated container %s priority from 0 to %d", uuid, upd.Priority)