18790: Improve status update messages.
[arvados.git] / lib / controller / localdb / container_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package localdb
6
7 import (
8         "context"
9         "database/sql"
10         "errors"
11         "fmt"
12         "math/rand"
13         "sync"
14         "time"
15
16         "git.arvados.org/arvados.git/lib/ctrlctx"
17         "git.arvados.org/arvados.git/sdk/go/arvados"
18         "git.arvados.org/arvados.git/sdk/go/arvadostest"
19         . "gopkg.in/check.v1"
20 )
21
22 var _ = Suite(&containerSuite{})
23
24 type containerSuite struct {
25         localdbSuite
26         topcr     arvados.ContainerRequest
27         topc      arvados.Container
28         starttime time.Time
29 }
30
31 func (s *containerSuite) crAttrs(c *C) map[string]interface{} {
32         return map[string]interface{}{
33                 "container_image":     arvadostest.DockerImage112PDH,
34                 "command":             []string{c.TestName(), fmt.Sprintf("%d", s.starttime.UnixMilli()), "top"},
35                 "output_path":         "/out",
36                 "priority":            1,
37                 "state":               "Committed",
38                 "container_count_max": 1,
39                 "runtime_constraints": arvados.RuntimeConstraints{
40                         RAM:   1,
41                         VCPUs: 1,
42                 },
43                 "mounts": map[string]arvados.Mount{
44                         "/out": arvados.Mount{},
45                 },
46         }
47 }
48
49 func (s *containerSuite) SetUpTest(c *C) {
50         s.localdbSuite.SetUpTest(c)
51         var err error
52         s.topcr, err = s.localdb.ContainerRequestCreate(s.userctx, arvados.CreateOptions{Attrs: s.crAttrs(c)})
53         c.Assert(err, IsNil)
54         s.topc, err = s.localdb.ContainerGet(s.userctx, arvados.GetOptions{UUID: s.topcr.ContainerUUID})
55         c.Assert(err, IsNil)
56         c.Assert(int(s.topc.Priority), Not(Equals), 0)
57         c.Logf("topcr %s topc %s", s.topcr.UUID, s.topc.UUID)
58         s.starttime = time.Now()
59 }
60
61 func (s *containerSuite) syncUpdatePriority(c *C) {
62         // Sending 1x to the "update now" channel starts an update;
63         // sending again fills the channel while the first update is
64         // running; sending a third time blocks until the worker
65         // receives the 2nd send, i.e., guarantees that the first
66         // update has finished.
67         s.localdb.wantContainerPriorityUpdate <- struct{}{}
68         s.localdb.wantContainerPriorityUpdate <- struct{}{}
69         s.localdb.wantContainerPriorityUpdate <- struct{}{}
70 }
71
72 func (s *containerSuite) TestUpdatePriorityShouldBeNonZero(c *C) {
73         _, err := s.db.Exec("update containers set priority=0 where uuid=$1", s.topc.UUID)
74         c.Assert(err, IsNil)
75         topc, err := s.localdb.ContainerGet(s.userctx, arvados.GetOptions{UUID: s.topc.UUID})
76         c.Assert(err, IsNil)
77         c.Assert(int(topc.Priority), Equals, 0)
78         s.syncUpdatePriority(c)
79         topc, err = s.localdb.ContainerGet(s.userctx, arvados.GetOptions{UUID: s.topc.UUID})
80         c.Assert(err, IsNil)
81         c.Check(int(topc.Priority), Not(Equals), 0)
82 }
83
84 func (s *containerSuite) TestUpdatePriorityShouldBeZero(c *C) {
85         _, err := s.db.Exec("update container_requests set priority=0 where uuid=$1", s.topcr.UUID)
86         c.Assert(err, IsNil)
87         topc, err := s.localdb.ContainerGet(s.userctx, arvados.GetOptions{UUID: s.topc.UUID})
88         c.Assert(err, IsNil)
89         c.Assert(int(topc.Priority), Not(Equals), 0)
90         s.syncUpdatePriority(c)
91         topc, err = s.localdb.ContainerGet(s.userctx, arvados.GetOptions{UUID: s.topc.UUID})
92         c.Assert(err, IsNil)
93         c.Check(int(topc.Priority), Equals, 0)
94 }
95
96 func (s *containerSuite) TestUpdatePriorityMultiLevelWorkflow(c *C) {
97         childCR := func(parent arvados.ContainerRequest, arg string) arvados.ContainerRequest {
98                 attrs := s.crAttrs(c)
99                 attrs["command"] = []string{c.TestName(), fmt.Sprintf("%d", s.starttime.UnixMilli()), arg}
100                 cr, err := s.localdb.ContainerRequestCreate(s.userctx, arvados.CreateOptions{Attrs: attrs})
101                 c.Assert(err, IsNil)
102                 _, err = s.db.Exec("update container_requests set requesting_container_uuid=$1 where uuid=$2", parent.ContainerUUID, cr.UUID)
103                 c.Assert(err, IsNil)
104                 return cr
105         }
106         // Build a tree of container requests and containers (3 levels
107         // deep below s.topcr)
108         allcrs := []arvados.ContainerRequest{s.topcr}
109         for i := 0; i < 2; i++ {
110                 cri := childCR(s.topcr, fmt.Sprintf("i %d", i))
111                 allcrs = append(allcrs, cri)
112                 for j := 0; j < 3; j++ {
113                         crj := childCR(cri, fmt.Sprintf("i %d j %d", i, j))
114                         allcrs = append(allcrs, crj)
115                         for k := 0; k < 4; k++ {
116                                 crk := childCR(crj, fmt.Sprintf("i %d j %d k %d", i, j, k))
117                                 allcrs = append(allcrs, crk)
118                         }
119                 }
120         }
121
122         testCtx, testCancel := context.WithDeadline(s.ctx, time.Now().Add(time.Second*20))
123         defer testCancel()
124
125         // Set priority=0 on a parent+child, plus 18 other randomly
126         // selected containers in the tree
127         adminCtx := ctrlctx.NewWithToken(testCtx, s.cluster, s.cluster.SystemRootToken)
128         // First entries of needfix are allcrs[1] (which is "i 0") and
129         // allcrs[2] ("i 0 j 0") -- we want to make sure to get at
130         // least one parent/child pair -- and the rest were chosen
131         // randomly.
132         //
133         // Similar randomly chosen sets (e.g., skipping 23 here) are
134         // known to fail. Possibly related to #20240.
135         needfix := []int{1, 2, 23, 12, 20, 14, 13, 15, 7, 17, 28, 6, 26, 22, 21, 11, 1, 17, 18, 5}
136         running := make(map[int]bool)
137         for n, i := range needfix {
138                 needfix[n] = i
139                 if !running[i] {
140                         _, err := s.localdb.ContainerUpdate(adminCtx, arvados.UpdateOptions{
141                                 UUID:  allcrs[i].ContainerUUID,
142                                 Attrs: map[string]interface{}{"state": "Locked"},
143                         })
144                         c.Assert(err, IsNil)
145                         _, err = s.localdb.ContainerUpdate(adminCtx, arvados.UpdateOptions{
146                                 UUID:  allcrs[i].ContainerUUID,
147                                 Attrs: map[string]interface{}{"state": "Running"},
148                         })
149                         c.Assert(err, IsNil)
150                         running[i] = true
151                 }
152                 res, err := s.db.Exec("update containers set priority=0 where uuid=$1", allcrs[i].ContainerUUID)
153                 c.Assert(err, IsNil)
154                 updated, err := res.RowsAffected()
155                 c.Assert(err, IsNil)
156                 if n == 0 {
157                         c.Assert(int(updated), Equals, 1)
158                 }
159         }
160
161         var wg sync.WaitGroup
162         defer wg.Wait()
163
164         chaosCtx, chaosCancel := context.WithCancel(adminCtx)
165         defer chaosCancel()
166         wg.Add(1)
167         go func() {
168                 defer wg.Done()
169                 // Flood the api with ContainerUpdate calls for the
170                 // same containers that need to have their priority
171                 // fixed
172                 for chaosCtx.Err() == nil {
173                         n := rand.Intn(len(needfix))
174                         _, err := s.localdb.ContainerUpdate(chaosCtx, arvados.UpdateOptions{
175                                 UUID: allcrs[needfix[n]].ContainerUUID,
176                                 Attrs: map[string]interface{}{
177                                         "runtime_status": map[string]string{
178                                                 "info": time.Now().Format(time.RFC3339Nano),
179                                         },
180                                 },
181                         })
182                         if !errors.Is(err, context.Canceled) {
183                                 c.Check(err, IsNil)
184                         }
185                 }
186         }()
187         // Find and fix the containers with wrong priority
188         s.syncUpdatePriority(c)
189         // Ensure they all got fixed
190         for _, cr := range allcrs {
191                 var priority int
192                 err := s.db.QueryRow("select priority from containers where uuid=$1", cr.ContainerUUID).Scan(&priority)
193                 c.Assert(err, IsNil)
194                 c.Check(priority, Not(Equals), 0)
195         }
196
197         chaosCancel()
198
199         // Simulate cascading cancellation of the entire tree. For
200         // this we need a goroutine to notice and cancel containers
201         // with state=Running and priority=0, and cancel them
202         // (this is normally done by a dispatcher).
203         dispCtx, dispCancel := context.WithCancel(adminCtx)
204         defer dispCancel()
205         wg.Add(1)
206         go func() {
207                 defer wg.Done()
208                 for dispCtx.Err() == nil {
209                         needcancel, err := s.localdb.ContainerList(dispCtx, arvados.ListOptions{
210                                 Limit:   1,
211                                 Filters: []arvados.Filter{{"state", "=", "Running"}, {"priority", "=", 0}},
212                         })
213                         if errors.Is(err, context.Canceled) {
214                                 break
215                         }
216                         c.Assert(err, IsNil)
217                         for _, ctr := range needcancel.Items {
218                                 _, err := s.localdb.ContainerUpdate(dispCtx, arvados.UpdateOptions{
219                                         UUID: ctr.UUID,
220                                         Attrs: map[string]interface{}{
221                                                 "state": "Cancelled",
222                                         },
223                                 })
224                                 c.Assert(err, IsNil)
225                         }
226                 }
227         }()
228
229         _, err := s.localdb.ContainerRequestUpdate(s.userctx, arvados.UpdateOptions{
230                 UUID: s.topcr.UUID,
231                 Attrs: map[string]interface{}{
232                         "priority": 0,
233                 },
234         })
235         c.Assert(err, IsNil)
236
237         c.Logf("waiting for all %d containers to have priority=0 after cancelling top level CR", len(allcrs))
238         for {
239                 time.Sleep(time.Second / 2)
240                 if testCtx.Err() != nil {
241                         c.Fatal("timed out")
242                 }
243                 done := true
244                 for _, cr := range allcrs {
245                         var priority int
246                         var crstate, command, ctrUUID string
247                         var parent sql.NullString
248                         err := s.db.QueryRowContext(s.ctx, "select state, priority, command, container_uuid, requesting_container_uuid from container_requests where uuid=$1", cr.UUID).Scan(&crstate, &priority, &command, &ctrUUID, &parent)
249                         if errors.Is(err, context.Canceled) {
250                                 break
251                         }
252                         c.Assert(err, IsNil)
253                         if crstate == "Committed" && priority > 0 {
254                                 c.Logf("container request %s (%s; parent=%s) still has state %s priority %d", cr.UUID, command, parent.String, crstate, priority)
255                                 done = false
256                                 break
257                         }
258                         err = s.db.QueryRowContext(s.ctx, "select priority, command from containers where uuid=$1", cr.ContainerUUID).Scan(&priority, &command)
259                         if errors.Is(err, context.Canceled) {
260                                 break
261                         }
262                         c.Assert(err, IsNil)
263                         if priority > 0 {
264                                 c.Logf("container %s (%s) still has priority %d", cr.ContainerUUID, command, priority)
265                                 done = false
266                                 break
267                         }
268                 }
269                 if done {
270                         c.Logf("success -- all %d containers have priority=0", len(allcrs))
271                         break
272                 }
273         }
274 }