Merge branch 'github-pr-223'
[arvados.git] / lib / controller / localdb / container.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package localdb
6
7 import (
8         "context"
9         "database/sql"
10         "fmt"
11         "time"
12
13         "git.arvados.org/arvados.git/lib/ctrlctx"
14         "git.arvados.org/arvados.git/sdk/go/arvados"
15         "git.arvados.org/arvados.git/sdk/go/ctxlog"
16         "github.com/sirupsen/logrus"
17 )
18
19 // ContainerUpdate defers to railsProxy and then notifies the
20 // container priority updater thread.
21 func (conn *Conn) ContainerUpdate(ctx context.Context, opts arvados.UpdateOptions) (arvados.Container, error) {
22         resp, err := conn.railsProxy.ContainerUpdate(ctx, opts)
23         if err == nil {
24                 select {
25                 case conn.wantContainerPriorityUpdate <- struct{}{}:
26                 default:
27                         // update already pending
28                 }
29         }
30         return resp, err
31 }
32
33 var containerPriorityUpdateInterval = 5 * time.Minute
34
35 // runContainerPriorityUpdateThread periodically (and immediately
36 // after each container update request) corrects any inconsistent
37 // container priorities caused by races.
38 func (conn *Conn) runContainerPriorityUpdateThread(ctx context.Context) {
39         ctx = ctrlctx.NewWithToken(ctx, conn.cluster, conn.cluster.SystemRootToken)
40         log := ctxlog.FromContext(ctx).WithField("worker", "runContainerPriorityUpdateThread")
41         ticker := time.NewTicker(containerPriorityUpdateInterval)
42         for ctx.Err() == nil {
43                 select {
44                 case <-ticker.C:
45                 case <-conn.wantContainerPriorityUpdate:
46                 case <-ctx.Done():
47                         return
48                 }
49                 err := conn.containerPriorityUpdate(ctx, log)
50                 if err != nil {
51                         log.WithError(err).Warn("error updating container priorities")
52                 }
53         }
54 }
55
56 func (conn *Conn) containerPriorityUpdate(ctx context.Context, log logrus.FieldLogger) error {
57         db, err := conn.getdb(ctx)
58         if err != nil {
59                 return fmt.Errorf("getdb: %w", err)
60         }
61         // Stage 1: Fix containers that have priority>0 but should
62         // have priority=0 because there are no active
63         // container_requests (unfinished, priority>0) associated with
64         // them.
65         res, err := db.ExecContext(ctx, `
66                 UPDATE containers
67                 SET priority=0
68                 WHERE state IN ('Queued', 'Locked', 'Running')
69                  AND priority>0
70                  AND uuid NOT IN (
71                         SELECT container_uuid
72                         FROM container_requests
73                         WHERE priority > 0
74                          AND state = 'Committed')`)
75         if err != nil {
76                 return fmt.Errorf("update: %w", err)
77         } else if rows, err := res.RowsAffected(); err != nil {
78                 return fmt.Errorf("update: %w", err)
79         } else if rows > 0 {
80                 log.Infof("found %d containers with priority>0 and no active requests, updated to priority=0", rows)
81         }
82
83         // Stage 2: Fix containers that have priority=0 but should
84         // have priority>0 because there are active container_requests
85         // (priority>0, unfinished, and not children of cancelled
86         // containers).
87         //
88         // Fixing here means calling out to RailsAPI to compute the
89         // correct priority for the contianer and (if needed)
90         // propagate that change to child containers.
91
92         // In this loop we look for a single container that needs
93         // fixing, call out to Rails to fix it, and repeat until we
94         // don't find any more.
95         //
96         // We could get a batch of UUIDs that need attention by
97         // increasing LIMIT 1, however, updating priority on one
98         // container typically cascades to other containers, so we
99         // would often end up repeating work.
100         for lastUUID := ""; ; {
101                 var uuid string
102                 err := db.QueryRowxContext(ctx, `
103                         SELECT containers.uuid from containers
104                         JOIN container_requests
105                          ON container_requests.container_uuid = containers.uuid
106                          AND container_requests.state = 'Committed' AND container_requests.priority > 0
107                         LEFT JOIN containers parent
108                          ON parent.uuid = container_requests.requesting_container_uuid
109                         WHERE containers.state IN ('Queued', 'Locked', 'Running')
110                          AND containers.priority = 0
111                          AND (parent.uuid IS NULL OR parent.priority > 0)
112                         ORDER BY containers.created_at
113                         LIMIT 1`).Scan(&uuid)
114                 if err == sql.ErrNoRows {
115                         break
116                 }
117                 if err != nil {
118                         return fmt.Errorf("join: %w", err)
119                 }
120                 if uuid == lastUUID {
121                         // We don't want to keep hammering this
122                         // forever if the ContainerPriorityUpdate call
123                         // didn't achieve anything.
124                         return fmt.Errorf("possible lack of progress: container %s still has priority=0 after updating", uuid)
125                 }
126                 lastUUID = uuid
127                 upd, err := conn.railsProxy.ContainerPriorityUpdate(ctx, arvados.UpdateOptions{UUID: uuid, Select: []string{"uuid", "priority"}})
128                 if err != nil {
129                         return err
130                 }
131                 log.Debugf("updated container %s priority from 0 to %d", uuid, upd.Priority)
132         }
133         return nil
134 }