Merge branch '19937-build-packages-fix'. Closes #19937
[arvados.git] / lib / controller / localdb / container_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package localdb
6
7 import (
8         "context"
9         "database/sql"
10         "errors"
11         "fmt"
12         "math/rand"
13         "sync"
14         "time"
15
16         "git.arvados.org/arvados.git/lib/ctrlctx"
17         "git.arvados.org/arvados.git/sdk/go/arvados"
18         "git.arvados.org/arvados.git/sdk/go/arvadostest"
19         . "gopkg.in/check.v1"
20 )
21
22 var _ = Suite(&containerSuite{})
23
24 type containerSuite struct {
25         localdbSuite
26         topcr     arvados.ContainerRequest
27         topc      arvados.Container
28         starttime time.Time
29 }
30
31 func (s *containerSuite) crAttrs(c *C) map[string]interface{} {
32         return map[string]interface{}{
33                 "container_image":     arvadostest.DockerImage112PDH,
34                 "command":             []string{c.TestName(), fmt.Sprintf("%d", s.starttime.UnixMilli()), "top"},
35                 "output_path":         "/out",
36                 "priority":            1,
37                 "state":               "Committed",
38                 "container_count_max": 1,
39                 "runtime_constraints": arvados.RuntimeConstraints{
40                         RAM:   1,
41                         VCPUs: 1,
42                 },
43                 "mounts": map[string]arvados.Mount{
44                         "/out": arvados.Mount{},
45                 },
46         }
47 }
48
49 func (s *containerSuite) SetUpTest(c *C) {
50         s.localdbSuite.SetUpTest(c)
51         var err error
52         s.topcr, err = s.localdb.ContainerRequestCreate(s.userctx, arvados.CreateOptions{Attrs: s.crAttrs(c)})
53         c.Assert(err, IsNil)
54         s.topc, err = s.localdb.ContainerGet(s.userctx, arvados.GetOptions{UUID: s.topcr.ContainerUUID})
55         c.Assert(err, IsNil)
56         c.Assert(int(s.topc.Priority), Not(Equals), 0)
57         c.Logf("topcr %s topc %s", s.topcr.UUID, s.topc.UUID)
58         s.starttime = time.Now()
59 }
60
61 func (s *containerSuite) syncUpdatePriority(c *C) {
62         // Sending 1x to the "update now" channel starts an update;
63         // sending again fills the channel while the first update is
64         // running; sending a third time blocks until the worker
65         // receives the 2nd send, i.e., guarantees that the first
66         // update has finished.
67         s.localdb.wantContainerPriorityUpdate <- struct{}{}
68         s.localdb.wantContainerPriorityUpdate <- struct{}{}
69         s.localdb.wantContainerPriorityUpdate <- struct{}{}
70 }
71
72 func (s *containerSuite) TestUpdatePriorityShouldBeNonZero(c *C) {
73         _, err := s.db.Exec("update containers set priority=0 where uuid=$1", s.topc.UUID)
74         c.Assert(err, IsNil)
75         topc, err := s.localdb.ContainerGet(s.userctx, arvados.GetOptions{UUID: s.topc.UUID})
76         c.Assert(err, IsNil)
77         c.Assert(int(topc.Priority), Equals, 0)
78         s.syncUpdatePriority(c)
79         topc, err = s.localdb.ContainerGet(s.userctx, arvados.GetOptions{UUID: s.topc.UUID})
80         c.Assert(err, IsNil)
81         c.Check(int(topc.Priority), Not(Equals), 0)
82 }
83
84 func (s *containerSuite) TestUpdatePriorityShouldBeZero(c *C) {
85         _, err := s.db.Exec("update container_requests set priority=0 where uuid=$1", s.topcr.UUID)
86         c.Assert(err, IsNil)
87         topc, err := s.localdb.ContainerGet(s.userctx, arvados.GetOptions{UUID: s.topc.UUID})
88         c.Assert(err, IsNil)
89         c.Assert(int(topc.Priority), Not(Equals), 0)
90         s.syncUpdatePriority(c)
91         topc, err = s.localdb.ContainerGet(s.userctx, arvados.GetOptions{UUID: s.topc.UUID})
92         c.Assert(err, IsNil)
93         c.Check(int(topc.Priority), Equals, 0)
94 }
95
96 func (s *containerSuite) TestUpdatePriorityMultiLevelWorkflow(c *C) {
97         childCR := func(parent arvados.ContainerRequest, arg string) arvados.ContainerRequest {
98                 attrs := s.crAttrs(c)
99                 attrs["command"] = []string{c.TestName(), fmt.Sprintf("%d", s.starttime.UnixMilli()), arg}
100                 cr, err := s.localdb.ContainerRequestCreate(s.userctx, arvados.CreateOptions{Attrs: attrs})
101                 c.Assert(err, IsNil)
102                 _, err = s.db.Exec("update container_requests set requesting_container_uuid=$1 where uuid=$2", parent.ContainerUUID, cr.UUID)
103                 c.Assert(err, IsNil)
104                 return cr
105         }
106         // Build a tree of container requests and containers (3 levels
107         // deep below s.topcr)
108         allcrs := []arvados.ContainerRequest{s.topcr}
109         for i := 0; i < 2; i++ {
110                 cri := childCR(s.topcr, fmt.Sprintf("i %d", i))
111                 allcrs = append(allcrs, cri)
112                 for j := 0; j < 3; j++ {
113                         crj := childCR(cri, fmt.Sprintf("i %d j %d", i, j))
114                         allcrs = append(allcrs, crj)
115                         for k := 0; k < 4; k++ {
116                                 crk := childCR(crj, fmt.Sprintf("i %d j %d k %d", i, j, k))
117                                 allcrs = append(allcrs, crk)
118                         }
119                 }
120         }
121
122         testCtx, testCancel := context.WithDeadline(s.ctx, time.Now().Add(time.Second*20))
123         defer testCancel()
124
125         // Set priority=0 on a parent+child, plus 18 other randomly
126         // selected containers in the tree
127         adminCtx := ctrlctx.NewWithToken(testCtx, s.cluster, s.cluster.SystemRootToken)
128         needfix := make([]int, 20)
129         running := make(map[int]bool)
130         for n := range needfix {
131                 var i int // which container are we going to run & then set priority=0
132                 if n < 2 {
133                         // first two are allcrs[1] (which is "i 0")
134                         // and allcrs[2] (which is "i 0 j 0")
135                         i = n + 1
136                 } else {
137                         // rest are random
138                         i = rand.Intn(len(allcrs))
139                 }
140                 needfix[n] = i
141                 if !running[i] {
142                         _, err := s.localdb.ContainerUpdate(adminCtx, arvados.UpdateOptions{
143                                 UUID:  allcrs[i].ContainerUUID,
144                                 Attrs: map[string]interface{}{"state": "Locked"},
145                         })
146                         c.Assert(err, IsNil)
147                         _, err = s.localdb.ContainerUpdate(adminCtx, arvados.UpdateOptions{
148                                 UUID:  allcrs[i].ContainerUUID,
149                                 Attrs: map[string]interface{}{"state": "Running"},
150                         })
151                         c.Assert(err, IsNil)
152                         running[i] = true
153                 }
154                 res, err := s.db.Exec("update containers set priority=0 where uuid=$1", allcrs[i].ContainerUUID)
155                 c.Assert(err, IsNil)
156                 updated, err := res.RowsAffected()
157                 c.Assert(err, IsNil)
158                 if n == 0 {
159                         c.Assert(int(updated), Equals, 1)
160                 }
161         }
162
163         var wg sync.WaitGroup
164         defer wg.Wait()
165
166         chaosCtx, chaosCancel := context.WithCancel(adminCtx)
167         defer chaosCancel()
168         wg.Add(1)
169         go func() {
170                 defer wg.Done()
171                 // Flood the api with ContainerUpdate calls for the
172                 // same containers that need to have their priority
173                 // fixed
174                 for chaosCtx.Err() == nil {
175                         n := rand.Intn(len(needfix))
176                         _, err := s.localdb.ContainerUpdate(chaosCtx, arvados.UpdateOptions{
177                                 UUID: allcrs[needfix[n]].ContainerUUID,
178                                 Attrs: map[string]interface{}{
179                                         "runtime_status": map[string]string{
180                                                 "info": time.Now().Format(time.RFC3339Nano),
181                                         },
182                                 },
183                         })
184                         if !errors.Is(err, context.Canceled) {
185                                 c.Check(err, IsNil)
186                         }
187                 }
188         }()
189         // Find and fix the containers with wrong priority
190         s.syncUpdatePriority(c)
191         // Ensure they all got fixed
192         for _, cr := range allcrs {
193                 var priority int
194                 err := s.db.QueryRow("select priority from containers where uuid=$1", cr.ContainerUUID).Scan(&priority)
195                 c.Assert(err, IsNil)
196                 c.Check(priority, Not(Equals), 0)
197         }
198
199         chaosCancel()
200
201         // Simulate cascading cancellation of the entire tree. For
202         // this we need a goroutine to notice and cancel containers
203         // with state=Running and priority=0, and cancel them
204         // (this is normally done by a dispatcher).
205         dispCtx, dispCancel := context.WithCancel(adminCtx)
206         defer dispCancel()
207         wg.Add(1)
208         go func() {
209                 defer wg.Done()
210                 for dispCtx.Err() == nil {
211                         needcancel, err := s.localdb.ContainerList(dispCtx, arvados.ListOptions{
212                                 Limit:   1,
213                                 Filters: []arvados.Filter{{"state", "=", "Running"}, {"priority", "=", 0}},
214                         })
215                         if errors.Is(err, context.Canceled) {
216                                 break
217                         }
218                         c.Assert(err, IsNil)
219                         for _, ctr := range needcancel.Items {
220                                 _, err := s.localdb.ContainerUpdate(dispCtx, arvados.UpdateOptions{
221                                         UUID: ctr.UUID,
222                                         Attrs: map[string]interface{}{
223                                                 "state": "Cancelled",
224                                         },
225                                 })
226                                 c.Assert(err, IsNil)
227                         }
228                 }
229         }()
230
231         _, err := s.localdb.ContainerRequestUpdate(s.userctx, arvados.UpdateOptions{
232                 UUID: s.topcr.UUID,
233                 Attrs: map[string]interface{}{
234                         "priority": 0,
235                 },
236         })
237         c.Assert(err, IsNil)
238
239         c.Logf("waiting for all %d containers to have priority=0 after cancelling top level CR", len(allcrs))
240         for {
241                 time.Sleep(time.Second / 2)
242                 if testCtx.Err() != nil {
243                         c.Fatal("timed out")
244                 }
245                 done := true
246                 for _, cr := range allcrs {
247                         var priority int
248                         var crstate, command, ctrUUID string
249                         var parent sql.NullString
250                         err := s.db.QueryRowContext(s.ctx, "select state, priority, command, container_uuid, requesting_container_uuid from container_requests where uuid=$1", cr.UUID).Scan(&crstate, &priority, &command, &ctrUUID, &parent)
251                         if errors.Is(err, context.Canceled) {
252                                 break
253                         }
254                         c.Assert(err, IsNil)
255                         if crstate == "Committed" && priority > 0 {
256                                 c.Logf("container request %s (%s; parent=%s) still has state %s priority %d", cr.UUID, command, parent.String, crstate, priority)
257                                 done = false
258                                 break
259                         }
260                         err = s.db.QueryRowContext(s.ctx, "select priority, command from containers where uuid=$1", cr.ContainerUUID).Scan(&priority, &command)
261                         if errors.Is(err, context.Canceled) {
262                                 break
263                         }
264                         c.Assert(err, IsNil)
265                         if priority > 0 {
266                                 c.Logf("container %s (%s) still has priority %d", cr.ContainerUUID, command, priority)
267                                 done = false
268                                 break
269                         }
270                 }
271                 if done {
272                         c.Logf("success -- all %d containers have priority=0", len(allcrs))
273                         break
274                 }
275         }
276 }