1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
16 "git.arvados.org/arvados.git/lib/ctrlctx"
17 "git.arvados.org/arvados.git/sdk/go/arvados"
18 "git.arvados.org/arvados.git/sdk/go/arvadostest"
// Hand a containerSuite instance to Suite (presumably the test
// framework's suite registration -- the import that provides Suite is
// not visible in this excerpt).
22 var _ = Suite(&containerSuite{})
// containerSuite carries the fixtures shared by the container
// priority tests in this file.
24 type containerSuite struct {
// topcr is the top-level container request created in SetUpTest.
26 topcr arvados.ContainerRequest
// topc is the container fetched in SetUpTest using
// topcr.ContainerUUID.
27 topc arvados.Container
// crAttrs returns the attribute map used when creating a container
// request in these tests (see SetUpTest, which passes it as
// CreateOptions.Attrs).
31 func (s *containerSuite) crAttrs(c *C) map[string]interface{} {
32 return map[string]interface{}{
33 "container_image": arvadostest.DockerImage112PDH,
// The command embeds the test name and s.starttime in milliseconds,
// followed by the literal "top" -- presumably so each test run gets a
// distinct command; confirm against the container reuse rules.
34 "command": []string{c.TestName(), fmt.Sprintf("%d", s.starttime.UnixMilli()), "top"},
35 "output_path": "/out",
38 "container_count_max": 1,
39 "runtime_constraints": arvados.RuntimeConstraints{
// A mount is declared for "/out", the same path as output_path above.
43 "mounts": map[string]arvados.Mount{
44 "/out": arvados.Mount{},
// SetUpTest runs the embedded localdbSuite setup, then creates the
// top-level container request (s.topcr) and loads its container
// (s.topc) for use by the tests.
49 func (s *containerSuite) SetUpTest(c *C) {
50 s.localdbSuite.SetUpTest(c)
52 s.topcr, err = s.localdb.ContainerRequestCreate(s.userctx, arvados.CreateOptions{Attrs: s.crAttrs(c)})
54 s.topc, err = s.localdb.ContainerGet(s.userctx, arvados.GetOptions{UUID: s.topcr.ContainerUUID})
// Every test starts from a top-level container with nonzero priority.
56 c.Assert(int(s.topc.Priority), Not(Equals), 0)
57 c.Logf("topcr %s topc %s", s.topcr.UUID, s.topc.UUID)
// NOTE(review): s.starttime is assigned here, after crAttrs (which
// reads s.starttime) was already called above, so the top-level
// request's command uses whatever value starttime held before this
// call -- confirm that is intended.
58 s.starttime = time.Now()
// syncUpdatePriority triggers a container priority update via
// s.localdb.wantContainerPriorityUpdate and, per the explanation
// below, does not return until that update has finished.
61 func (s *containerSuite) syncUpdatePriority(c *C) {
62 // Sending 1x to the "update now" channel starts an update;
63 // sending again fills the channel while the first update is
64 // running; sending a third time blocks until the worker
65 // receives the 2nd send, i.e., guarantees that the first
66 // update has finished.
67 s.localdb.wantContainerPriorityUpdate <- struct{}{}
68 s.localdb.wantContainerPriorityUpdate <- struct{}{}
69 s.localdb.wantContainerPriorityUpdate <- struct{}{}
// TestUpdatePriorityShouldBeNonZero checks that a container whose
// priority was zeroed directly in the database gets a nonzero
// priority again after a priority update runs.
72 func (s *containerSuite) TestUpdatePriorityShouldBeNonZero(c *C) {
// Zero the container's priority with raw SQL, bypassing localdb.
73 _, err := s.db.Exec("update containers set priority=0 where uuid=$1", s.topc.UUID)
75 topc, err := s.localdb.ContainerGet(s.userctx, arvados.GetOptions{UUID: s.topc.UUID})
// Precondition: the zeroed priority is visible through ContainerGet.
77 c.Assert(int(topc.Priority), Equals, 0)
78 s.syncUpdatePriority(c)
79 topc, err = s.localdb.ContainerGet(s.userctx, arvados.GetOptions{UUID: s.topc.UUID})
81 c.Check(int(topc.Priority), Not(Equals), 0)
// TestUpdatePriorityShouldBeZero checks that when the container
// request's priority is zeroed directly in the database, the
// container's priority becomes zero after a priority update runs.
84 func (s *containerSuite) TestUpdatePriorityShouldBeZero(c *C) {
// Zero the request's priority with raw SQL, bypassing localdb.
85 _, err := s.db.Exec("update container_requests set priority=0 where uuid=$1", s.topcr.UUID)
87 topc, err := s.localdb.ContainerGet(s.userctx, arvados.GetOptions{UUID: s.topc.UUID})
// Precondition: the container itself still has a nonzero priority.
89 c.Assert(int(topc.Priority), Not(Equals), 0)
90 s.syncUpdatePriority(c)
91 topc, err = s.localdb.ContainerGet(s.userctx, arvados.GetOptions{UUID: s.topc.UUID})
93 c.Check(int(topc.Priority), Equals, 0)
// TestUpdatePriorityMultiLevelWorkflow builds a multi-level tree of
// container requests under s.topcr and exercises two scenarios:
// (1) containers whose priority was wrongly zeroed are restored by a
// priority update while concurrent ContainerUpdate calls are in
// flight, and (2) cancelling the top-level request eventually leaves
// every container in the tree with priority=0.
96 func (s *containerSuite) TestUpdatePriorityMultiLevelWorkflow(c *C) {
// childCR creates a container request with a distinguishing command
// argument, then links it to its parent by writing the parent's
// container UUID into requesting_container_uuid with raw SQL.
97 childCR := func(parent arvados.ContainerRequest, arg string) arvados.ContainerRequest {
99 attrs["command"] = []string{c.TestName(), fmt.Sprintf("%d", s.starttime.UnixMilli()), arg}
100 cr, err := s.localdb.ContainerRequestCreate(s.userctx, arvados.CreateOptions{Attrs: attrs})
102 _, err = s.db.Exec("update container_requests set requesting_container_uuid=$1 where uuid=$2", parent.ContainerUUID, cr.UUID)
106 // Build a tree of container requests and containers (3 levels
107 // deep below s.topcr)
// allcrs is filled depth-first: s.topcr, then each "i" request
// followed by its "j" requests, each followed by its "k" requests
// (1 + 2 + 2*3 + 2*3*4 = 33 entries).
108 allcrs := []arvados.ContainerRequest{s.topcr}
109 for i := 0; i < 2; i++ {
110 cri := childCR(s.topcr, fmt.Sprintf("i %d", i))
111 allcrs = append(allcrs, cri)
112 for j := 0; j < 3; j++ {
113 crj := childCR(cri, fmt.Sprintf("i %d j %d", i, j))
114 allcrs = append(allcrs, crj)
115 for k := 0; k < 4; k++ {
116 crk := childCR(crj, fmt.Sprintf("i %d j %d k %d", i, j, k))
117 allcrs = append(allcrs, crk)
// testCtx expires 20 seconds from now; adminCtx below is derived
// from it.
122 testCtx, testCancel := context.WithDeadline(s.ctx, time.Now().Add(time.Second*20))
125 // Set priority=0 on a parent+child, plus 18 other randomly
126 // selected containers in the tree
127 adminCtx := ctrlctx.NewWithToken(testCtx, s.cluster, s.cluster.SystemRootToken)
128 // First entries of needfix are allcrs[1] (which is "i 0") and
129 // allcrs[2] ("i 0 j 0") -- we want to make sure to get at
130 // least one parent/child pair -- and the rest were chosen
133 // Similar randomly chosen sets (e.g., skipping 23 here) are
134 // known to fail. Possibly related to #20240.
// NOTE(review): indices 1 and 17 each appear twice in this list, so
// it names 18 distinct containers -- confirm that is intended.
135 needfix := []int{1, 2, 23, 12, 20, 14, 13, 15, 7, 17, 28, 6, 26, 22, 21, 11, 1, 17, 18, 5}
136 running := make(map[int]bool)
137 for n, i := range needfix {
// Move the container to Locked, then Running, using the admin
// context.
140 _, err := s.localdb.ContainerUpdate(adminCtx, arvados.UpdateOptions{
141 UUID: allcrs[i].ContainerUUID,
142 Attrs: map[string]interface{}{"state": "Locked"},
145 _, err = s.localdb.ContainerUpdate(adminCtx, arvados.UpdateOptions{
146 UUID: allcrs[i].ContainerUUID,
147 Attrs: map[string]interface{}{"state": "Running"},
// Zero the container's priority with raw SQL and require that
// exactly one row was changed.
152 res, err := s.db.Exec("update containers set priority=0 where uuid=$1", allcrs[i].ContainerUUID)
154 updated, err := res.RowsAffected()
157 c.Assert(int(updated), Equals, 1)
161 var wg sync.WaitGroup
// chaosCtx bounds the update-flooding loop below.
164 chaosCtx, chaosCancel := context.WithCancel(adminCtx)
169 // Flood the api with ContainerUpdate calls for the
170 // same containers that need to have their priority
172 for chaosCtx.Err() == nil {
173 n := rand.Intn(len(needfix))
174 _, err := s.localdb.ContainerUpdate(chaosCtx, arvados.UpdateOptions{
175 UUID: allcrs[needfix[n]].ContainerUUID,
176 Attrs: map[string]interface{}{
// Each update writes the current timestamp into runtime_status.
177 "runtime_status": map[string]string{
178 "info": time.Now().Format(time.RFC3339Nano),
182 if !errors.Is(err, context.Canceled) {
187 // Find and fix the containers with wrong priority
188 s.syncUpdatePriority(c)
189 // Ensure they all got fixed
190 for _, cr := range allcrs {
192 err := s.db.QueryRow("select priority from containers where uuid=$1", cr.ContainerUUID).Scan(&priority)
194 c.Check(priority, Not(Equals), 0)
199 // Simulate cascading cancellation of the entire tree. For
200 // this we need a goroutine to notice containers
201 // with state=Running and priority=0, and cancel them
202 // (this is normally done by a dispatcher).
203 dispCtx, dispCancel := context.WithCancel(adminCtx)
208 for dispCtx.Err() == nil {
209 needcancel, err := s.localdb.ContainerList(dispCtx, arvados.ListOptions{
211 Filters: []arvados.Filter{{"state", "=", "Running"}, {"priority", "=", 0}},
213 if errors.Is(err, context.Canceled) {
217 for _, ctr := range needcancel.Items {
218 _, err := s.localdb.ContainerUpdate(dispCtx, arvados.UpdateOptions{
220 Attrs: map[string]interface{}{
221 "state": "Cancelled",
// Update the container request as the regular test user (presumably
// the top-level cancellation, going by the log message below; the
// attributes are not visible in this excerpt).
229 _, err := s.localdb.ContainerRequestUpdate(s.userctx, arvados.UpdateOptions{
231 Attrs: map[string]interface{}{
237 c.Logf("waiting for all %d containers to have priority=0 after cancelling top level CR", len(allcrs))
// Poll: sleep half a second between passes, checking testCtx (the
// 20 second deadline set above) each time.
239 time.Sleep(time.Second / 2)
240 if testCtx.Err() != nil {
244 for _, cr := range allcrs {
246 var crstate, command, ctrUUID string
// requesting_container_uuid is scanned into a NullString, so a NULL
// column value is accepted.
247 var parent sql.NullString
248 err := s.db.QueryRowContext(s.ctx, "select state, priority, command, container_uuid, requesting_container_uuid from container_requests where uuid=$1", cr.UUID).Scan(&crstate, &priority, &command, &ctrUUID, &parent)
249 if errors.Is(err, context.Canceled) {
// A request that is still Committed with priority > 0 is logged as
// not done yet.
253 if crstate == "Committed" && priority > 0 {
254 c.Logf("container request %s (%s; parent=%s) still has state %s priority %d", cr.UUID, command, parent.String, crstate, priority)
258 err = s.db.QueryRowContext(s.ctx, "select priority, command from containers where uuid=$1", cr.ContainerUUID).Scan(&priority, &command)
259 if errors.Is(err, context.Canceled) {
264 c.Logf("container %s (%s) still has priority %d", cr.ContainerUUID, command, priority)
270 c.Logf("success -- all %d containers have priority=0", len(allcrs))