Merge branch '12684-pysdk-auto-retry'
[arvados.git] / lib / controller / localdb / container_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package localdb
6
7 import (
8         "context"
9         "database/sql"
10         "errors"
11         "fmt"
12         "math/rand"
13         "strings"
14         "sync"
15         "time"
16
17         "git.arvados.org/arvados.git/lib/ctrlctx"
18         "git.arvados.org/arvados.git/sdk/go/arvados"
19         "git.arvados.org/arvados.git/sdk/go/arvadostest"
20         . "gopkg.in/check.v1"
21 )
22
23 var _ = Suite(&containerSuite{})
24
25 type containerSuite struct {
26         localdbSuite
27         topcr     arvados.ContainerRequest
28         topc      arvados.Container
29         starttime time.Time
30 }
31
32 func (s *containerSuite) crAttrs(c *C) map[string]interface{} {
33         return map[string]interface{}{
34                 "container_image":     arvadostest.DockerImage112PDH,
35                 "command":             []string{c.TestName(), fmt.Sprintf("%d", s.starttime.UnixMilli()), "top"},
36                 "output_path":         "/out",
37                 "priority":            1,
38                 "state":               "Committed",
39                 "container_count_max": 1,
40                 "runtime_constraints": arvados.RuntimeConstraints{
41                         RAM:   1,
42                         VCPUs: 1,
43                 },
44                 "mounts": map[string]arvados.Mount{
45                         "/out": arvados.Mount{},
46                 },
47         }
48 }
49
50 func (s *containerSuite) SetUpTest(c *C) {
51         containerPriorityUpdateInterval = 2 * time.Second
52         s.localdbSuite.SetUpTest(c)
53         s.starttime = time.Now()
54         var err error
55         s.topcr, err = s.localdb.ContainerRequestCreate(s.userctx, arvados.CreateOptions{Attrs: s.crAttrs(c)})
56         c.Assert(err, IsNil)
57         s.topc, err = s.localdb.ContainerGet(s.userctx, arvados.GetOptions{UUID: s.topcr.ContainerUUID})
58         c.Assert(err, IsNil)
59         c.Assert(int(s.topc.Priority), Not(Equals), 0)
60         c.Logf("topcr %s topc %s", s.topcr.UUID, s.topc.UUID)
61 }
62
63 func (s *containerSuite) TearDownTest(c *C) {
64         containerPriorityUpdateInterval = 5 * time.Minute
65         s.localdbSuite.TearDownTest(c)
66 }
67
68 func (s *containerSuite) syncUpdatePriority(c *C) {
69         // Sending 1x to the "update now" channel starts an update;
70         // sending again fills the channel while the first update is
71         // running; sending a third time blocks until the worker
72         // receives the 2nd send, i.e., guarantees that the first
73         // update has finished.
74         s.localdb.wantContainerPriorityUpdate <- struct{}{}
75         s.localdb.wantContainerPriorityUpdate <- struct{}{}
76         s.localdb.wantContainerPriorityUpdate <- struct{}{}
77 }
78
79 func (s *containerSuite) TestUpdatePriorityShouldBeNonZero(c *C) {
80         _, err := s.db.Exec("update containers set priority=0 where uuid=$1", s.topc.UUID)
81         c.Assert(err, IsNil)
82         topc, err := s.localdb.ContainerGet(s.userctx, arvados.GetOptions{UUID: s.topc.UUID})
83         c.Assert(err, IsNil)
84         c.Assert(int(topc.Priority), Equals, 0)
85         s.syncUpdatePriority(c)
86         topc, err = s.localdb.ContainerGet(s.userctx, arvados.GetOptions{UUID: s.topc.UUID})
87         c.Assert(err, IsNil)
88         c.Check(int(topc.Priority), Not(Equals), 0)
89 }
90
91 func (s *containerSuite) TestUpdatePriorityShouldBeZero(c *C) {
92         _, err := s.db.Exec("update container_requests set priority=0 where uuid=$1", s.topcr.UUID)
93         c.Assert(err, IsNil)
94         topc, err := s.localdb.ContainerGet(s.userctx, arvados.GetOptions{UUID: s.topc.UUID})
95         c.Assert(err, IsNil)
96         c.Assert(int(topc.Priority), Not(Equals), 0)
97         s.syncUpdatePriority(c)
98         topc, err = s.localdb.ContainerGet(s.userctx, arvados.GetOptions{UUID: s.topc.UUID})
99         c.Assert(err, IsNil)
100         c.Check(int(topc.Priority), Equals, 0)
101 }
102
103 func (s *containerSuite) TestUpdatePriorityMultiLevelWorkflow(c *C) {
104         testCtx, testCancel := context.WithDeadline(s.ctx, time.Now().Add(30*time.Second))
105         defer testCancel()
106         adminCtx := ctrlctx.NewWithToken(testCtx, s.cluster, s.cluster.SystemRootToken)
107
108         childCR := func(parent arvados.ContainerRequest, arg string) arvados.ContainerRequest {
109                 attrs := s.crAttrs(c)
110                 attrs["command"] = []string{c.TestName(), fmt.Sprintf("%d", s.starttime.UnixMilli()), arg}
111                 cr, err := s.localdb.ContainerRequestCreate(s.userctx, arvados.CreateOptions{Attrs: attrs})
112                 c.Assert(err, IsNil)
113                 _, err = s.db.Exec("update container_requests set requesting_container_uuid=$1 where uuid=$2", parent.ContainerUUID, cr.UUID)
114                 c.Assert(err, IsNil)
115                 _, err = s.localdb.ContainerUpdate(adminCtx, arvados.UpdateOptions{
116                         UUID:  cr.ContainerUUID,
117                         Attrs: map[string]interface{}{"state": "Locked"},
118                 })
119                 c.Assert(err, IsNil)
120                 _, err = s.localdb.ContainerUpdate(adminCtx, arvados.UpdateOptions{
121                         UUID:  cr.ContainerUUID,
122                         Attrs: map[string]interface{}{"state": "Running"},
123                 })
124                 c.Assert(err, IsNil)
125                 return cr
126         }
127         // Build a tree of container requests and containers (3 levels
128         // deep below s.topcr)
129         allcrs := []arvados.ContainerRequest{s.topcr}
130         for i := 0; i < 2; i++ {
131                 cri := childCR(s.topcr, fmt.Sprintf("i %d", i))
132                 allcrs = append(allcrs, cri)
133                 for j := 0; j < 3; j++ {
134                         crj := childCR(cri, fmt.Sprintf("i %d j %d", i, j))
135                         allcrs = append(allcrs, crj)
136                         for k := 0; k < 4; k++ {
137                                 crk := childCR(crj, fmt.Sprintf("i %d j %d k %d", i, j, k))
138                                 allcrs = append(allcrs, crk)
139                         }
140                 }
141         }
142
143         // Set priority=0 on a parent+child, plus 18 other randomly
144         // selected containers in the tree
145         //
146         // First entries of needfix are allcrs[1] (which is "i 0") and
147         // allcrs[2] ("i 0 j 0") -- we want to make sure to get at
148         // least one parent/child pair -- and the rest were chosen
149         // randomly.
150         needfix := []int{1, 2, 23, 12, 20, 14, 13, 15, 7, 17, 6, 22, 21, 11, 1, 17, 18}
151         for n, i := range needfix {
152                 needfix[n] = i
153                 res, err := s.db.Exec("update containers set priority=0 where uuid=$1", allcrs[i].ContainerUUID)
154                 c.Assert(err, IsNil)
155                 updated, err := res.RowsAffected()
156                 c.Assert(err, IsNil)
157                 if n == 0 {
158                         c.Assert(int(updated), Equals, 1)
159                 }
160         }
161
162         var wg sync.WaitGroup
163         defer wg.Wait()
164
165         chaosCtx, chaosCancel := context.WithCancel(adminCtx)
166         defer chaosCancel()
167         wg.Add(1)
168         go func() {
169                 defer wg.Done()
170                 // Flood the api with ContainerUpdate calls for the
171                 // same containers that need to have their priority
172                 // fixed
173                 for chaosCtx.Err() == nil {
174                         n := rand.Intn(len(needfix))
175                         _, err := s.localdb.ContainerUpdate(chaosCtx, arvados.UpdateOptions{
176                                 UUID: allcrs[needfix[n]].ContainerUUID,
177                                 Attrs: map[string]interface{}{
178                                         "runtime_status": map[string]string{
179                                                 "info": time.Now().Format(time.RFC3339Nano),
180                                         },
181                                 },
182                         })
183                         if !errors.Is(err, context.Canceled) {
184                                 c.Check(err, IsNil)
185                         }
186                 }
187         }()
188         // Find and fix the containers with wrong priority
189         s.syncUpdatePriority(c)
190         // Ensure they all got fixed
191         for _, cr := range allcrs {
192                 var priority int
193                 err := s.db.QueryRow("select priority from containers where uuid=$1", cr.ContainerUUID).Scan(&priority)
194                 c.Assert(err, IsNil)
195                 c.Check(priority, Not(Equals), 0)
196         }
197         chaosCancel()
198
199         // Flood railsapi with priority updates. This can cause
200         // database deadlock: one call acquires row locks in the order
201         // {i0j0, i0, i0j1}, while another call acquires row locks in
202         // the order {i0j1, i0, i0j0}.
203         deadlockCtx, deadlockCancel := context.WithDeadline(adminCtx, time.Now().Add(30*time.Second))
204         defer deadlockCancel()
205         for _, cr := range allcrs {
206                 if strings.Contains(cr.Command[2], " j ") && !strings.Contains(cr.Command[2], " k ") {
207                         wg.Add(1)
208                         go func() {
209                                 defer wg.Done()
210                                 for _, p := range []int{1, 2, 3, 4} {
211                                         var err error
212                                         for {
213                                                 _, err = s.localdb.ContainerRequestUpdate(deadlockCtx, arvados.UpdateOptions{
214                                                         UUID: cr.UUID,
215                                                         Attrs: map[string]interface{}{
216                                                                 "priority": p,
217                                                         },
218                                                 })
219                                                 c.Check(err, IsNil)
220                                                 break
221                                         }
222                                 }
223                         }()
224                 }
225         }
226         wg.Wait()
227
228         // Simulate cascading cancellation of the entire tree. For
229         // this we need a goroutine to notice and cancel containers
230         // with state=Running and priority=0, and cancel them
231         // (this is normally done by a dispatcher).
232         dispCtx, dispCancel := context.WithCancel(adminCtx)
233         defer dispCancel()
234         wg.Add(1)
235         go func() {
236                 defer wg.Done()
237                 for dispCtx.Err() == nil {
238                         needcancel, err := s.localdb.ContainerList(dispCtx, arvados.ListOptions{
239                                 Limit:   10,
240                                 Filters: []arvados.Filter{{"state", "=", "Running"}, {"priority", "=", 0}},
241                         })
242                         if errors.Is(err, context.Canceled) {
243                                 break
244                         }
245                         c.Assert(err, IsNil)
246                         for _, ctr := range needcancel.Items {
247                                 _, err := s.localdb.ContainerUpdate(dispCtx, arvados.UpdateOptions{
248                                         UUID: ctr.UUID,
249                                         Attrs: map[string]interface{}{
250                                                 "state": "Cancelled",
251                                         },
252                                 })
253                                 c.Assert(err, IsNil)
254                         }
255                         time.Sleep(time.Second / 10)
256                 }
257         }()
258
259         _, err := s.localdb.ContainerRequestUpdate(s.userctx, arvados.UpdateOptions{
260                 UUID: s.topcr.UUID,
261                 Attrs: map[string]interface{}{
262                         "priority": 0,
263                 },
264         })
265         c.Assert(err, IsNil)
266
267         c.Logf("waiting for all %d containers to have priority=0 after cancelling top level CR", len(allcrs))
268         for {
269                 time.Sleep(time.Second / 2)
270                 if testCtx.Err() != nil {
271                         for i, cr := range allcrs {
272                                 var ctr arvados.Container
273                                 var command string
274                                 err = s.db.QueryRowContext(s.ctx, `select cr.priority, cr.state, cr.container_uuid, c.state, c.priority, cr.command
275                                         from container_requests cr
276                                         left join containers c on cr.container_uuid = c.uuid
277                                         where cr.uuid=$1`, cr.UUID).Scan(&cr.Priority, &cr.State, &ctr.UUID, &ctr.State, &ctr.Priority, &command)
278                                 c.Check(err, IsNil)
279                                 c.Logf("allcrs[%d] cr.pri %d %s c.pri %d %s cr.uuid %s c.uuid %s cmd %s", i, cr.Priority, cr.State, ctr.Priority, ctr.State, cr.UUID, ctr.UUID, command)
280                         }
281                         c.Fatal("timed out")
282                 }
283                 done := true
284                 for _, cr := range allcrs {
285                         var priority int
286                         var crstate, command, ctrUUID string
287                         var parent sql.NullString
288                         err := s.db.QueryRowContext(s.ctx, `select state, priority, container_uuid, requesting_container_uuid, command
289                                 from container_requests where uuid=$1`, cr.UUID).Scan(&crstate, &priority, &ctrUUID, &parent, &command)
290                         if errors.Is(err, context.Canceled) {
291                                 break
292                         }
293                         c.Assert(err, IsNil)
294                         if crstate == "Committed" && priority > 0 {
295                                 c.Logf("container request %s (%s; parent=%s) still has state %s priority %d", cr.UUID, command, parent.String, crstate, priority)
296                                 done = false
297                                 break
298                         }
299                         err = s.db.QueryRowContext(s.ctx, "select priority, command from containers where uuid=$1", cr.ContainerUUID).Scan(&priority, &command)
300                         if errors.Is(err, context.Canceled) {
301                                 break
302                         }
303                         c.Assert(err, IsNil)
304                         if priority > 0 {
305                                 c.Logf("container %s (%s) still has priority %d", cr.ContainerUUID, command, priority)
306                                 done = false
307                                 break
308                         }
309                 }
310                 if done {
311                         c.Logf("success -- all %d containers have priority=0", len(allcrs))
312                         break
313                 }
314         }
315 }