1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
17 "git.arvados.org/arvados.git/lib/ctrlctx"
18 "git.arvados.org/arvados.git/sdk/go/arvados"
19 "git.arvados.org/arvados.git/sdk/go/arvadostest"
// Register containerSuite so that its test methods are run
// (presumably by the gocheck runner that provides Suite -- confirm
// against the file's imports).
23 var _ = Suite(&containerSuite{})
// containerSuite carries the fixtures shared by the container tests
// in this file: a top-level container request (topcr) and the
// container assigned to it (topc). Both are populated by SetUpTest.
25 type containerSuite struct {
27 topcr arvados.ContainerRequest
28 topc arvados.Container
// crAttrs returns the attribute map used to create container requests
// in this suite. The command embeds the current test name and the
// suite start time (in milliseconds), followed by the literal "top";
// callers that need a different command overwrite the "command" entry.
//
// NOTE(review): the test name and timestamp presumably keep each test
// from reusing a container left over from another test or run --
// confirm.
32 func (s *containerSuite) crAttrs(c *C) map[string]interface{} {
33 return map[string]interface{}{
34 "container_image": arvadostest.DockerImage112PDH,
35 "command": []string{c.TestName(), fmt.Sprintf("%d", s.starttime.UnixMilli()), "top"},
36 "output_path": "/out",
39 "container_count_max": 1,
40 "runtime_constraints": arvados.RuntimeConstraints{
44 "mounts": map[string]arvados.Mount{
45 "/out": arvados.Mount{},
// SetUpTest prepares the fixtures for each test: it shortens
// containerPriorityUpdateInterval to 2 seconds, runs the localdbSuite
// setup, records the start time used by crAttrs, creates the
// top-level container request, and loads the container assigned to
// it. It asserts that the new container starts out with a non-zero
// priority, which the tests below rely on.
50 func (s *containerSuite) SetUpTest(c *C) {
51 containerPriorityUpdateInterval = 2 * time.Second
52 s.localdbSuite.SetUpTest(c)
// starttime must be set before crAttrs is called below, because
// crAttrs embeds it in the command.
53 s.starttime = time.Now()
55 s.topcr, err = s.localdb.ContainerRequestCreate(s.userctx, arvados.CreateOptions{Attrs: s.crAttrs(c)})
57 s.topc, err = s.localdb.ContainerGet(s.userctx, arvados.GetOptions{UUID: s.topcr.ContainerUUID})
59 c.Assert(int(s.topc.Priority), Not(Equals), 0)
60 c.Logf("topcr %s topc %s", s.topcr.UUID, s.topc.UUID)
// TearDownTest sets containerPriorityUpdateInterval to 5 minutes,
// replacing the 2-second value installed by SetUpTest, and then runs
// the localdbSuite teardown.
//
// NOTE(review): 5 minutes is presumably the package's normal default
// -- confirm against the declaration of
// containerPriorityUpdateInterval.
63 func (s *containerSuite) TearDownTest(c *C) {
64 containerPriorityUpdateInterval = 5 * time.Minute
65 s.localdbSuite.TearDownTest(c)
// syncUpdatePriority requests a container priority update by sending
// on s.localdb.wantContainerPriorityUpdate three times, and returns
// only after the third send has been accepted. The comment below
// explains why that implies the first update has completed.
//
// NOTE(review): the reasoning assumes the channel buffers exactly one
// pending request and has a single receiving worker -- confirm where
// wantContainerPriorityUpdate is created.
68 func (s *containerSuite) syncUpdatePriority(c *C) {
69 // Sending 1x to the "update now" channel starts an update;
70 // sending again fills the channel while the first update is
71 // running; sending a third time blocks until the worker
72 // receives the 2nd send, i.e., guarantees that the first
73 // update has finished.
74 s.localdb.wantContainerPriorityUpdate <- struct{}{}
75 s.localdb.wantContainerPriorityUpdate <- struct{}{}
76 s.localdb.wantContainerPriorityUpdate <- struct{}{}
// TestUpdatePriorityShouldBeNonZero checks that a priority update
// repairs a container whose priority was wrongly zeroed: the
// container row is set to priority=0 directly in the database (its
// container request is left untouched), and after a synchronous
// update the container must have a non-zero priority again.
79 func (s *containerSuite) TestUpdatePriorityShouldBeNonZero(c *C) {
// Write straight to the containers table, bypassing the API.
80 _, err := s.db.Exec("update containers set priority=0 where uuid=$1", s.topc.UUID)
// Confirm the precondition: the API now reports priority 0.
82 topc, err := s.localdb.ContainerGet(s.userctx, arvados.GetOptions{UUID: s.topc.UUID})
84 c.Assert(int(topc.Priority), Equals, 0)
85 s.syncUpdatePriority(c)
// Re-read the container after the update has finished.
86 topc, err = s.localdb.ContainerGet(s.userctx, arvados.GetOptions{UUID: s.topc.UUID})
88 c.Check(int(topc.Priority), Not(Equals), 0)
// TestUpdatePriorityShouldBeZero checks the opposite repair: the
// container request is set to priority=0 directly in the database,
// which leaves the container row with its old non-zero priority.
// After a synchronous update the container priority must be 0.
91 func (s *containerSuite) TestUpdatePriorityShouldBeZero(c *C) {
// Write straight to the container_requests table, bypassing the API.
92 _, err := s.db.Exec("update container_requests set priority=0 where uuid=$1", s.topcr.UUID)
// Confirm the precondition: the container itself still has a
// non-zero priority at this point.
94 topc, err := s.localdb.ContainerGet(s.userctx, arvados.GetOptions{UUID: s.topc.UUID})
96 c.Assert(int(topc.Priority), Not(Equals), 0)
97 s.syncUpdatePriority(c)
// Re-read the container after the update has finished.
98 topc, err = s.localdb.ContainerGet(s.userctx, arvados.GetOptions{UUID: s.topc.UUID})
100 c.Check(int(topc.Priority), Equals, 0)
103 func (s *containerSuite) TestUpdatePriorityMultiLevelWorkflow(c *C) {
104 testCtx, testCancel := context.WithDeadline(s.ctx, time.Now().Add(30*time.Second))
106 adminCtx := ctrlctx.NewWithToken(testCtx, s.cluster, s.cluster.SystemRootToken)
108 childCR := func(parent arvados.ContainerRequest, arg string) arvados.ContainerRequest {
109 attrs := s.crAttrs(c)
110 attrs["command"] = []string{c.TestName(), fmt.Sprintf("%d", s.starttime.UnixMilli()), arg}
111 cr, err := s.localdb.ContainerRequestCreate(s.userctx, arvados.CreateOptions{Attrs: attrs})
113 _, err = s.db.Exec("update container_requests set requesting_container_uuid=$1 where uuid=$2", parent.ContainerUUID, cr.UUID)
115 _, err = s.localdb.ContainerUpdate(adminCtx, arvados.UpdateOptions{
116 UUID: cr.ContainerUUID,
117 Attrs: map[string]interface{}{"state": "Locked"},
120 _, err = s.localdb.ContainerUpdate(adminCtx, arvados.UpdateOptions{
121 UUID: cr.ContainerUUID,
122 Attrs: map[string]interface{}{"state": "Running"},
127 // Build a tree of container requests and containers (3 levels
128 // deep below s.topcr)
129 allcrs := []arvados.ContainerRequest{s.topcr}
130 for i := 0; i < 2; i++ {
131 cri := childCR(s.topcr, fmt.Sprintf("i %d", i))
132 allcrs = append(allcrs, cri)
133 for j := 0; j < 3; j++ {
134 crj := childCR(cri, fmt.Sprintf("i %d j %d", i, j))
135 allcrs = append(allcrs, crj)
136 for k := 0; k < 4; k++ {
137 crk := childCR(crj, fmt.Sprintf("i %d j %d k %d", i, j, k))
138 allcrs = append(allcrs, crk)
143 // Set priority=0 on a parent+child pair, plus 13 other
144 // randomly selected containers in the tree (15 distinct in all)
146 // First entries of needfix are allcrs[1] (which is "i 0") and
147 // allcrs[2] ("i 0 j 0") -- we want to make sure to get at
148 // least one parent/child pair -- and the rest were chosen
150 needfix := []int{1, 2, 23, 12, 20, 14, 13, 15, 7, 17, 6, 22, 21, 11, 1, 17, 18}
151 for n, i := range needfix {
153 res, err := s.db.Exec("update containers set priority=0 where uuid=$1", allcrs[i].ContainerUUID)
155 updated, err := res.RowsAffected()
158 c.Assert(int(updated), Equals, 1)
162 var wg sync.WaitGroup
165 chaosCtx, chaosCancel := context.WithCancel(adminCtx)
170 // Flood the api with ContainerUpdate calls for the
171 // same containers that need to have their priority
173 for chaosCtx.Err() == nil {
174 n := rand.Intn(len(needfix))
175 _, err := s.localdb.ContainerUpdate(chaosCtx, arvados.UpdateOptions{
176 UUID: allcrs[needfix[n]].ContainerUUID,
177 Attrs: map[string]interface{}{
178 "runtime_status": map[string]string{
179 "info": time.Now().Format(time.RFC3339Nano),
183 if !errors.Is(err, context.Canceled) {
188 // Find and fix the containers with wrong priority
189 s.syncUpdatePriority(c)
190 // Ensure they all got fixed
191 for _, cr := range allcrs {
193 err := s.db.QueryRow("select priority from containers where uuid=$1", cr.ContainerUUID).Scan(&priority)
195 c.Check(priority, Not(Equals), 0)
199 // Flood railsapi with priority updates. This can cause
200 // database deadlock: one call acquires row locks in the order
201 // {i0j0, i0, i0j1}, while another call acquires row locks in
202 // the order {i0j1, i0, i0j0}.
203 deadlockCtx, deadlockCancel := context.WithDeadline(adminCtx, time.Now().Add(30*time.Second))
204 defer deadlockCancel()
205 for _, cr := range allcrs {
206 if strings.Contains(cr.Command[2], " j ") && !strings.Contains(cr.Command[2], " k ") {
211 for _, p := range []int{1, 2, 3, 4} {
214 _, err = s.localdb.ContainerRequestUpdate(deadlockCtx, arvados.UpdateOptions{
216 Attrs: map[string]interface{}{
229 // Simulate cascading cancellation of the entire tree. For
230 // this we need a goroutine to notice containers with
231 // state=Running and priority=0, and cancel them (this is
232 // normally done by a dispatcher).
233 dispCtx, dispCancel := context.WithCancel(adminCtx)
238 for dispCtx.Err() == nil {
239 needcancel, err := s.localdb.ContainerList(dispCtx, arvados.ListOptions{
241 Filters: []arvados.Filter{{"state", "=", "Running"}, {"priority", "=", 0}},
243 if errors.Is(err, context.Canceled) {
247 for _, ctr := range needcancel.Items {
248 _, err := s.localdb.ContainerUpdate(dispCtx, arvados.UpdateOptions{
250 Attrs: map[string]interface{}{
251 "state": "Cancelled",
254 if errors.Is(err, context.Canceled) {
259 time.Sleep(time.Second / 10)
263 _, err := s.localdb.ContainerRequestUpdate(s.userctx, arvados.UpdateOptions{
265 Attrs: map[string]interface{}{
271 c.Logf("waiting for all %d containers to have priority=0 after cancelling top level CR", len(allcrs))
273 time.Sleep(time.Second / 2)
274 if testCtx.Err() != nil {
275 for i, cr := range allcrs {
276 var ctr arvados.Container
278 err = s.db.QueryRowContext(s.ctx, `select cr.priority, cr.state, cr.container_uuid, c.state, c.priority, cr.command
279 from container_requests cr
280 left join containers c on cr.container_uuid = c.uuid
281 where cr.uuid=$1`, cr.UUID).Scan(&cr.Priority, &cr.State, &ctr.UUID, &ctr.State, &ctr.Priority, &command)
283 c.Logf("allcrs[%d] cr.pri %d %s c.pri %d %s cr.uuid %s c.uuid %s cmd %s", i, cr.Priority, cr.State, ctr.Priority, ctr.State, cr.UUID, ctr.UUID, command)
288 for _, cr := range allcrs {
290 var crstate, command, ctrUUID string
291 var parent sql.NullString
292 err := s.db.QueryRowContext(s.ctx, `select state, priority, container_uuid, requesting_container_uuid, command
293 from container_requests where uuid=$1`, cr.UUID).Scan(&crstate, &priority, &ctrUUID, &parent, &command)
294 if errors.Is(err, context.Canceled) {
298 if crstate == "Committed" && priority > 0 {
299 c.Logf("container request %s (%s; parent=%s) still has state %s priority %d", cr.UUID, command, parent.String, crstate, priority)
303 err = s.db.QueryRowContext(s.ctx, "select priority, command from containers where uuid=$1", cr.ContainerUUID).Scan(&priority, &command)
304 if errors.Is(err, context.Canceled) {
309 c.Logf("container %s (%s) still has priority %d", cr.ContainerUUID, command, priority)
315 c.Logf("success -- all %d containers have priority=0", len(allcrs))