1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
16 "git.arvados.org/arvados.git/lib/ctrlctx"
17 "git.arvados.org/arvados.git/sdk/go/arvados"
18 "git.arvados.org/arvados.git/sdk/go/arvadostest"
// Register containerSuite with the test runner. Suite (and the *C type used
// below) presumably come from a dot-imported gopkg.in/check.v1 whose import
// line is not visible in this excerpt -- NOTE(review): confirm.
22 var _ = Suite(&containerSuite{})
// containerSuite tests how container priorities are updated via s.localdb.
// Each test starts from a top-level container request (topcr) and the
// container it maps to (topc), both created in SetUpTest.
// NOTE(review): this excerpt omits some fields; the methods below also use
// s.localdbSuite (base-suite setup) and s.starttime.
24 type containerSuite struct {
// topcr is the top-level container request created by SetUpTest.
26 topcr arvados.ContainerRequest
// topc is the container that topcr maps to (topcr.ContainerUUID).
27 topc arvados.Container
// crAttrs returns the attribute map used to create a container request in
// test c. The command embeds c.TestName() and s.starttime (as Unix
// milliseconds), so requests from different tests or different start times
// have distinct commands. This is presumably so the API does not satisfy
// them by reusing an existing container; TODO confirm.
//
// A new map is built on every call, so a caller may overwrite entries
// (e.g. "command") without affecting other requests.
31 func (s *containerSuite) crAttrs(c *C) map[string]interface{} {
32 return map[string]interface{}{
33 "container_image": arvadostest.DockerImage112PDH,
34 "command": []string{c.TestName(), fmt.Sprintf("%d", s.starttime.UnixMilli()), "top"},
35 "output_path": "/out",
38 "container_count_max": 1,
39 "runtime_constraints": arvados.RuntimeConstraints{
// The mount key matches output_path above.
43 "mounts": map[string]arvados.Mount{
44 "/out": arvados.Mount{},
// SetUpTest runs the base localdbSuite setup. It then creates the top-level
// container request s.topcr (using s.userctx) and loads its container into
// s.topc, asserting that the container starts with non-zero priority.
// Finally it records s.starttime, which crAttrs embeds in later commands.
//
// NOTE(review): s.starttime is assigned *after* crAttrs(c) has already been
// used to build s.topcr. The top-level request's command therefore carries
// the previous value of s.starttime: the zero time on the suite's first
// test, assuming the suite value is reused across tests. If that is not
// intentional, move the assignment above ContainerRequestCreate.
49 func (s *containerSuite) SetUpTest(c *C) {
50 s.localdbSuite.SetUpTest(c)
52 s.topcr, err = s.localdb.ContainerRequestCreate(s.userctx, arvados.CreateOptions{Attrs: s.crAttrs(c)})
54 s.topc, err = s.localdb.ContainerGet(s.userctx, arvados.GetOptions{UUID: s.topcr.ContainerUUID})
56 c.Assert(int(s.topc.Priority), Not(Equals), 0)
57 c.Logf("topcr %s topc %s", s.topcr.UUID, s.topc.UUID)
58 s.starttime = time.Now()
// syncUpdatePriority blocks until the priority update triggered by its first
// send on s.localdb.wantContainerPriorityUpdate has finished (see the
// reasoning below).
//
// That reasoning assumes the channel has a buffer of one and a single worker
// receiving from it -- NOTE(review): confirm against the channel's
// declaration. The c parameter is unused.
61 func (s *containerSuite) syncUpdatePriority(c *C) {
62 // Sending 1x to the "update now" channel starts an update;
63 // sending again fills the channel while the first update is
64 // running; sending a third time blocks until the worker
65 // receives the 2nd send, i.e., guarantees that the first
66 // update has finished.
67 s.localdb.wantContainerPriorityUpdate <- struct{}{}
68 s.localdb.wantContainerPriorityUpdate <- struct{}{}
69 s.localdb.wantContainerPriorityUpdate <- struct{}{}
// TestUpdatePriorityShouldBeNonZero checks that a priority update pass
// restores a container priority that was zeroed behind the API's back.
// It zeroes the top container's priority directly in the database, confirms
// via the API that it reads 0, and runs one update pass. Nothing here
// changes the top container request, so the container's priority is then
// expected to be non-zero again.
72 func (s *containerSuite) TestUpdatePriorityShouldBeNonZero(c *C) {
73 _, err := s.db.Exec("update containers set priority=0 where uuid=$1", s.topc.UUID)
75 topc, err := s.localdb.ContainerGet(s.userctx, arvados.GetOptions{UUID: s.topc.UUID})
77 c.Assert(int(topc.Priority), Equals, 0)
78 s.syncUpdatePriority(c)
79 topc, err = s.localdb.ContainerGet(s.userctx, arvados.GetOptions{UUID: s.topc.UUID})
81 c.Check(int(topc.Priority), Not(Equals), 0)
// TestUpdatePriorityShouldBeZero checks that a priority update pass
// propagates a zeroed container *request* priority to its container.
// It zeroes the top request's priority directly in the database. It then
// confirms the container's priority is still non-zero, because the direct
// write does not propagate by itself. Finally it runs one update pass and
// expects the container's priority to be 0.
84 func (s *containerSuite) TestUpdatePriorityShouldBeZero(c *C) {
85 _, err := s.db.Exec("update container_requests set priority=0 where uuid=$1", s.topcr.UUID)
87 topc, err := s.localdb.ContainerGet(s.userctx, arvados.GetOptions{UUID: s.topc.UUID})
89 c.Assert(int(topc.Priority), Not(Equals), 0)
90 s.syncUpdatePriority(c)
91 topc, err = s.localdb.ContainerGet(s.userctx, arvados.GetOptions{UUID: s.topc.UUID})
93 c.Check(int(topc.Priority), Equals, 0)
// TestUpdatePriorityMultiLevelWorkflow builds a tree of container requests
// below s.topcr and runs two phases, all under a 20-second deadline derived
// from s.ctx:
//
//  1. Several Running containers get their priority zeroed directly in the
//     database while ContainerUpdate calls hammer them concurrently. One
//     priority update pass must then give every container in the tree a
//     non-zero priority again.
//  2. The top-level request is cancelled while a helper loop plays the
//     dispatcher's role, cancelling Running containers whose priority is 0.
//     The test then polls until every request and container in the tree
//     has reached priority 0.
96 func (s *containerSuite) TestUpdatePriorityMultiLevelWorkflow(c *C) {
// childCR creates a container request whose command ends with arg. Its
// attrs are presumably built from s.crAttrs(c); that line is not visible
// in this excerpt. It then makes the request a child of parent by writing
// requesting_container_uuid directly in the database.
97 childCR := func(parent arvados.ContainerRequest, arg string) arvados.ContainerRequest {
99 attrs["command"] = []string{c.TestName(), fmt.Sprintf("%d", s.starttime.UnixMilli()), arg}
100 cr, err := s.localdb.ContainerRequestCreate(s.userctx, arvados.CreateOptions{Attrs: attrs})
102 _, err = s.db.Exec("update container_requests set requesting_container_uuid=$1 where uuid=$2", parent.ContainerUUID, cr.UUID)
106 // Build a tree of container requests and containers (3 levels
107 // deep below s.topcr)
// Shape: 2 children of s.topcr, each with 3 children, each with 4
// children. That gives 2 + 6 + 24 = 32 descendants, so 33 entries in
// allcrs including s.topcr at index 0.
108 allcrs := []arvados.ContainerRequest{s.topcr}
109 for i := 0; i < 2; i++ {
110 cri := childCR(s.topcr, fmt.Sprintf("i %d", i))
111 allcrs = append(allcrs, cri)
112 for j := 0; j < 3; j++ {
113 crj := childCR(cri, fmt.Sprintf("i %d j %d", i, j))
114 allcrs = append(allcrs, crj)
115 for k := 0; k < 4; k++ {
116 crk := childCR(crj, fmt.Sprintf("i %d j %d k %d", i, j, k))
117 allcrs = append(allcrs, crk)
// Overall deadline for the rest of the test. The final polling loop checks
// testCtx.Err() to stop waiting.
122 testCtx, testCancel := context.WithDeadline(s.ctx, time.Now().Add(time.Second*20))
125 // Set priority=0 on a parent+child pair (allcrs[1] and allcrs[2]), plus 18
126 // other randomly selected containers; random picks may repeat.
// Container state changes below are made with the system root token.
127 adminCtx := ctrlctx.NewWithToken(testCtx, s.cluster, s.cluster.SystemRootToken)
// needfix[n] is an index into allcrs whose container gets its priority zeroed.
128 needfix := make([]int, 20)
// Presumably records which allcrs indexes have already been moved to
// Running, so a repeated pick is not locked twice. The check itself is not
// visible in this excerpt.
129 running := make(map[int]bool)
130 for n := range needfix {
131 var i int // which container are we going to run & then set priority=0
133 // first two are allcrs[1] (which is "i 0")
134 // and allcrs[2] (which is "i 0 j 0")
// NOTE(review): rand.Intn (presumably math/rand's shared source) makes the
// selection differ between runs. Logging needfix would make a failure
// easier to reproduce.
138 i = rand.Intn(len(allcrs))
142 _, err := s.localdb.ContainerUpdate(adminCtx, arvados.UpdateOptions{
143 UUID: allcrs[i].ContainerUUID,
144 Attrs: map[string]interface{}{"state": "Locked"},
147 _, err = s.localdb.ContainerUpdate(adminCtx, arvados.UpdateOptions{
148 UUID: allcrs[i].ContainerUUID,
149 Attrs: map[string]interface{}{"state": "Running"},
// Zero the priority directly in the database, bypassing the API, and
// insist that exactly one row was changed.
154 res, err := s.db.Exec("update containers set priority=0 where uuid=$1", allcrs[i].ContainerUUID)
156 updated, err := res.RowsAffected()
159 c.Assert(int(updated), Equals, 1)
163 var wg sync.WaitGroup
// The ContainerUpdate flood below runs until chaosCtx is cancelled.
166 chaosCtx, chaosCancel := context.WithCancel(adminCtx)
171 // Flood the api with ContainerUpdate calls for the
172 // same containers that need to have their priority
174 for chaosCtx.Err() == nil {
175 n := rand.Intn(len(needfix))
176 _, err := s.localdb.ContainerUpdate(chaosCtx, arvados.UpdateOptions{
177 UUID: allcrs[needfix[n]].ContainerUUID,
178 Attrs: map[string]interface{}{
179 "runtime_status": map[string]string{
180 "info": time.Now().Format(time.RFC3339Nano),
// context.Canceled is expected once chaosCtx is cancelled. Other errors are
// presumably reported in the lines not visible in this excerpt.
184 if !errors.Is(err, context.Canceled) {
189 // Find and fix the containers with wrong priority
190 s.syncUpdatePriority(c)
191 // Ensure they all got fixed
// Checks every container in the tree, not only the needfix ones.
192 for _, cr := range allcrs {
194 err := s.db.QueryRow("select priority from containers where uuid=$1", cr.ContainerUUID).Scan(&priority)
196 c.Check(priority, Not(Equals), 0)
201 // Simulate cascading cancellation of the entire tree. For
202 // this we need a goroutine that finds containers with
203 // state=Running and priority=0 and cancels them
204 // (this is normally done by a dispatcher).
205 dispCtx, dispCancel := context.WithCancel(adminCtx)
210 for dispCtx.Err() == nil {
211 needcancel, err := s.localdb.ContainerList(dispCtx, arvados.ListOptions{
213 Filters: []arvados.Filter{{"state", "=", "Running"}, {"priority", "=", 0}},
215 if errors.Is(err, context.Canceled) {
219 for _, ctr := range needcancel.Items {
220 _, err := s.localdb.ContainerUpdate(dispCtx, arvados.UpdateOptions{
222 Attrs: map[string]interface{}{
223 "state": "Cancelled",
// Cancel the top-level request, per the log message below. The attributes
// sent are not visible in this excerpt.
231 _, err := s.localdb.ContainerRequestUpdate(s.userctx, arvados.UpdateOptions{
233 Attrs: map[string]interface{}{
239 c.Logf("waiting for all %d containers to have priority=0 after cancelling top level CR", len(allcrs))
// Poll every half second until nothing in the tree has non-zero priority,
// or testCtx expires.
241 time.Sleep(time.Second / 2)
242 if testCtx.Err() != nil {
246 for _, cr := range allcrs {
248 var crstate, command, ctrUUID string
249 var parent sql.NullString
250 err := s.db.QueryRowContext(s.ctx, "select state, priority, command, container_uuid, requesting_container_uuid from container_requests where uuid=$1", cr.UUID).Scan(&crstate, &priority, &command, &ctrUUID, &parent)
251 if errors.Is(err, context.Canceled) {
255 if crstate == "Committed" && priority > 0 {
256 c.Logf("container request %s (%s; parent=%s) still has state %s priority %d", cr.UUID, command, parent.String, crstate, priority)
260 err = s.db.QueryRowContext(s.ctx, "select priority, command from containers where uuid=$1", cr.ContainerUUID).Scan(&priority, &command)
261 if errors.Is(err, context.Canceled) {
266 c.Logf("container %s (%s) still has priority %d", cr.ContainerUUID, command, priority)
272 c.Logf("success -- all %d containers have priority=0", len(allcrs))