X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/c75359c0116392ea13ebf578c74e495c9e158f62..202d4e06c61dbeaeed7d2e5196cd65c7f788b103:/lib/controller/localdb/container_test.go diff --git a/lib/controller/localdb/container_test.go b/lib/controller/localdb/container_test.go index 437e30b144..65d9fac5bb 100644 --- a/lib/controller/localdb/container_test.go +++ b/lib/controller/localdb/container_test.go @@ -10,6 +10,7 @@ import ( "errors" "fmt" "math/rand" + "strings" "sync" "time" @@ -47,7 +48,9 @@ func (s *containerSuite) crAttrs(c *C) map[string]interface{} { } func (s *containerSuite) SetUpTest(c *C) { + containerPriorityUpdateInterval = 2 * time.Second s.localdbSuite.SetUpTest(c) + s.starttime = time.Now() var err error s.topcr, err = s.localdb.ContainerRequestCreate(s.userctx, arvados.CreateOptions{Attrs: s.crAttrs(c)}) c.Assert(err, IsNil) @@ -55,7 +58,11 @@ func (s *containerSuite) SetUpTest(c *C) { c.Assert(err, IsNil) c.Assert(int(s.topc.Priority), Not(Equals), 0) c.Logf("topcr %s topc %s", s.topcr.UUID, s.topc.UUID) - s.starttime = time.Now() +} + +func (s *containerSuite) TearDownTest(c *C) { + containerPriorityUpdateInterval = 5 * time.Minute + s.localdbSuite.TearDownTest(c) } func (s *containerSuite) syncUpdatePriority(c *C) { @@ -94,6 +101,10 @@ func (s *containerSuite) TestUpdatePriorityShouldBeZero(c *C) { } func (s *containerSuite) TestUpdatePriorityMultiLevelWorkflow(c *C) { + testCtx, testCancel := context.WithDeadline(s.ctx, time.Now().Add(30*time.Second)) + defer testCancel() + adminCtx := ctrlctx.NewWithToken(testCtx, s.cluster, s.cluster.SystemRootToken) + childCR := func(parent arvados.ContainerRequest, arg string) arvados.ContainerRequest { attrs := s.crAttrs(c) attrs["command"] = []string{c.TestName(), fmt.Sprintf("%d", s.starttime.UnixMilli()), arg} @@ -101,6 +112,16 @@ func (s *containerSuite) TestUpdatePriorityMultiLevelWorkflow(c *C) { c.Assert(err, IsNil) _, err = s.db.Exec("update container_requests set requesting_container_uuid=$1 where uuid=$2", parent.ContainerUUID, cr.UUID) c.Assert(err, IsNil) + _, err = s.localdb.ContainerUpdate(adminCtx, arvados.UpdateOptions{ + UUID: cr.ContainerUUID, + Attrs: map[string]interface{}{"state": "Locked"}, + }) + c.Assert(err, IsNil) + _, err = s.localdb.ContainerUpdate(adminCtx, arvados.UpdateOptions{ + UUID: cr.ContainerUUID, + Attrs: map[string]interface{}{"state": "Running"}, + }) + c.Assert(err, IsNil) return cr } // Build a tree of container requests and containers (3 levels @@ -119,38 +140,16 @@ func (s *containerSuite) TestUpdatePriorityMultiLevelWorkflow(c *C) { } } - testCtx, testCancel := context.WithDeadline(s.ctx, time.Now().Add(time.Second*20)) - defer testCancel() - // Set priority=0 on a parent+child, plus 18 other randomly // selected containers in the tree - adminCtx := ctrlctx.NewWithToken(testCtx, s.cluster, s.cluster.SystemRootToken) - needfix := make([]int, 20) - running := make(map[int]bool) - for n := range needfix { - var i int // which container are we going to run & then set priority=0 - if n < 2 { - // first two are allcrs[1] (which is "i 0") - // and allcrs[2] (which is "i 0 j 0") - i = n + 1 - } else { - // rest are random - i = rand.Intn(len(allcrs)) - } + // + // First entries of needfix are allcrs[1] (which is "i 0") and + // allcrs[2] ("i 0 j 0") -- we want to make sure to get at + // least one parent/child pair -- and the rest were chosen + // randomly. + needfix := []int{1, 2, 23, 12, 20, 14, 13, 15, 7, 17, 6, 22, 21, 11, 1, 17, 18} + for n, i := range needfix { needfix[n] = i - if !running[i] { - _, err := s.localdb.ContainerUpdate(adminCtx, arvados.UpdateOptions{ - UUID: allcrs[i].ContainerUUID, - Attrs: map[string]interface{}{"state": "Locked"}, - }) - c.Assert(err, IsNil) - _, err = s.localdb.ContainerUpdate(adminCtx, arvados.UpdateOptions{ - UUID: allcrs[i].ContainerUUID, - Attrs: map[string]interface{}{"state": "Running"}, - }) - c.Assert(err, IsNil) - running[i] = true - } res, err := s.db.Exec("update containers set priority=0 where uuid=$1", allcrs[i].ContainerUUID) c.Assert(err, IsNil) updated, err := res.RowsAffected() @@ -195,9 +194,37 @@ func (s *containerSuite) TestUpdatePriorityMultiLevelWorkflow(c *C) { c.Assert(err, IsNil) c.Check(priority, Not(Equals), 0) } - chaosCancel() + // Flood railsapi with priority updates. This can cause + // database deadlock: one call acquires row locks in the order + // {i0j0, i0, i0j1}, while another call acquires row locks in + // the order {i0j1, i0, i0j0}. + deadlockCtx, deadlockCancel := context.WithDeadline(adminCtx, time.Now().Add(30*time.Second)) + defer deadlockCancel() + for _, cr := range allcrs { + if strings.Contains(cr.Command[2], " j ") && !strings.Contains(cr.Command[2], " k ") { + wg.Add(1) + go func() { + defer wg.Done() + for _, p := range []int{1, 2, 3, 4} { + var err error + for { + _, err = s.localdb.ContainerRequestUpdate(deadlockCtx, arvados.UpdateOptions{ + UUID: cr.UUID, + Attrs: map[string]interface{}{ + "priority": p, + }, + }) + c.Check(err, IsNil) + break + } + } + }() + } + } + wg.Wait() + // Simulate cascading cancellation of the entire tree. For // this we need a goroutine to notice and cancel containers // with state=Running and priority=0, and cancel them @@ -209,7 +236,7 @@ func (s *containerSuite) TestUpdatePriorityMultiLevelWorkflow(c *C) { defer wg.Done() for dispCtx.Err() == nil { needcancel, err := s.localdb.ContainerList(dispCtx, arvados.ListOptions{ - Limit: 1, + Limit: 10, Filters: []arvados.Filter{{"state", "=", "Running"}, {"priority", "=", 0}}, }) if errors.Is(err, context.Canceled) { @@ -225,6 +252,7 @@ func (s *containerSuite) TestUpdatePriorityMultiLevelWorkflow(c *C) { }) c.Assert(err, IsNil) } + time.Sleep(time.Second / 10) } }() @@ -240,6 +268,16 @@ func (s *containerSuite) TestUpdatePriorityMultiLevelWorkflow(c *C) { for { time.Sleep(time.Second / 2) if testCtx.Err() != nil { + for i, cr := range allcrs { + var ctr arvados.Container + var command string + err = s.db.QueryRowContext(s.ctx, `select cr.priority, cr.state, cr.container_uuid, c.state, c.priority, cr.command + from container_requests cr + left join containers c on cr.container_uuid = c.uuid + where cr.uuid=$1`, cr.UUID).Scan(&cr.Priority, &cr.State, &ctr.UUID, &ctr.State, &ctr.Priority, &command) + c.Check(err, IsNil) + c.Logf("allcrs[%d] cr.pri %d %s c.pri %d %s cr.uuid %s c.uuid %s cmd %s", i, cr.Priority, cr.State, ctr.Priority, ctr.State, cr.UUID, ctr.UUID, command) + } c.Fatal("timed out") } done := true @@ -247,7 +285,8 @@ func (s *containerSuite) TestUpdatePriorityMultiLevelWorkflow(c *C) { var priority int var crstate, command, ctrUUID string var parent sql.NullString - err := s.db.QueryRowContext(s.ctx, "select state, priority, command, container_uuid, requesting_container_uuid from container_requests where uuid=$1", cr.UUID).Scan(&crstate, &priority, &command, &ctrUUID, &parent) + err := s.db.QueryRowContext(s.ctx, `select state, priority, container_uuid, requesting_container_uuid, command + from container_requests where uuid=$1`, cr.UUID).Scan(&crstate, &priority, &ctrUUID, &parent, &command) if errors.Is(err, context.Canceled) { break }