Merge branch '20457-logs-and-mem-usage'
author Tom Clegg <tom@curii.com>
Mon, 1 May 2023 20:11:07 +0000 (16:11 -0400)
committer Tom Clegg <tom@curii.com>
Mon, 1 May 2023 20:11:07 +0000 (16:11 -0400)
refs #20457

Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

lib/dispatchcloud/container/queue.go
lib/dispatchcloud/container/queue_test.go
lib/dispatchcloud/dispatcher_test.go
lib/dispatchcloud/scheduler/run_queue.go
lib/dispatchcloud/test/queue.go
sdk/go/arvados/container.go

index 938ef915f251e4d27e1ea4f714b82f10425d4224..ab686e85c11cc5e94d9ec4af9ec57a093ba600cd 100644 (file)
@@ -233,6 +233,11 @@ func (cq *Queue) delEnt(uuid string, state arvados.ContainerState) {
 // Caller must have lock.
 func (cq *Queue) addEnt(uuid string, ctr arvados.Container) {
        it, err := cq.chooseType(&ctr)
+
+       // Avoid wasting memory on a large Mounts attr (we don't need
+       // it after choosing type).
+       ctr.Mounts = nil
+
        if err != nil && (ctr.State == arvados.ContainerStateQueued || ctr.State == arvados.ContainerStateLocked) {
                // We assume here that any chooseType error is a hard
                // error: it wouldn't help to try again, or to leave
@@ -490,6 +495,16 @@ func (cq *Queue) fetchAll(initialParams arvados.ResourceListParams) ([]arvados.C
                        break
                }
 
+               // Conserve memory by deleting mounts that aren't
+               // relevant to choosing the instance type.
+               for _, c := range list.Items {
+                       for path, mnt := range c.Mounts {
+                               if mnt.Kind != "tmp" {
+                                       delete(c.Mounts, path)
+                               }
+                       }
+               }
+
                results = append(results, list.Items...)
                if len(params.Order) == 1 && params.Order == "uuid" {
                        params.Filters = append(initialParams.Filters, arvados.Filter{"uuid", ">", list.Items[len(list.Items)-1].UUID})
index 0075ee324ef8eb1d10a54207af97ddbf6a70b6bf..ca10983534a3579e79e8a02a6601126af9d32ae0 100644 (file)
@@ -41,6 +41,7 @@ func (suite *IntegrationSuite) TearDownTest(c *check.C) {
 
 func (suite *IntegrationSuite) TestGetLockUnlockCancel(c *check.C) {
        typeChooser := func(ctr *arvados.Container) (arvados.InstanceType, error) {
+               c.Check(ctr.Mounts["/tmp"].Capacity, check.Equals, int64(24000000000))
                return arvados.InstanceType{Name: "testType"}, nil
        }
 
@@ -64,6 +65,8 @@ func (suite *IntegrationSuite) TestGetLockUnlockCancel(c *check.C) {
                c.Check(ent.InstanceType.Name, check.Equals, "testType")
                c.Check(ent.Container.State, check.Equals, arvados.ContainerStateQueued)
                c.Check(ent.Container.Priority > 0, check.Equals, true)
+               // Mounts should be deleted to avoid wasting memory
+               c.Check(ent.Container.Mounts, check.IsNil)
 
                ctr, ok := cq.Get(uuid)
                c.Check(ok, check.Equals, true)
index 7454b5784e54a8be36b5c4b027460b3bf72e3d08..273a3836dc4f8f65a82a0ef40c2d8b6d0bfab162 100644 (file)
@@ -105,7 +105,10 @@ func (s *DispatcherSuite) SetUpTest(c *check.C) {
        // Disable auto-retry
        arvClient.Timeout = 0
 
-       s.error503Server = httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusServiceUnavailable) }))
+       s.error503Server = httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+               c.Logf("503 stub: returning 503")
+               w.WriteHeader(http.StatusServiceUnavailable)
+       }))
        arvClient.Client = &http.Client{
                Transport: &http.Transport{
                        Proxy: s.arvClientProxy(c),
@@ -136,6 +139,7 @@ func (s *DispatcherSuite) TearDownTest(c *check.C) {
 func (s *DispatcherSuite) arvClientProxy(c *check.C) func(*http.Request) (*url.URL, error) {
        return func(req *http.Request) (*url.URL, error) {
                if req.URL.Path == "/503" {
+                       c.Logf("arvClientProxy: proxying to 503 stub")
                        return url.Parse(s.error503Server.URL)
                } else {
                        return nil, nil
@@ -151,6 +155,7 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
        Drivers["test"] = s.stubDriver
        s.disp.setupOnce.Do(s.disp.initialize)
        queue := &test.Queue{
+               MaxDispatchAttempts: 5,
                ChooseType: func(ctr *arvados.Container) (arvados.InstanceType, error) {
                        return ChooseInstanceType(s.cluster, ctr)
                },
@@ -185,6 +190,7 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
                delete(waiting, ctr.UUID)
                if len(waiting) == 100 {
                        // trigger scheduler maxConcurrency limit
+                       c.Logf("test: requesting 503 in order to trigger maxConcurrency limit")
                        s.disp.ArvClient.RequestAndDecode(nil, "GET", "503", nil, nil)
                }
                if len(waiting) == 0 {
index b8158579a3a3a1b30a0eccc80b421f8abe8bdecf..dda3630ee7b88c6888b8eb2d1ba7db4735ebdd74 100644 (file)
@@ -185,7 +185,7 @@ tryrun:
                        _, toolate := running[ctr.UUID]
                        if ctr.State == arvados.ContainerStateLocked && !toolate {
                                logger := sch.logger.WithField("ContainerUUID", ctr.UUID)
-                               logger.Debug("unlock because pool capacity is used by higher priority containers")
+                               logger.Info("unlock because pool capacity is used by higher priority containers")
                                err := sch.queue.Unlock(ctr.UUID)
                                if err != nil {
                                        logger.WithError(err).Warn("error unlocking")
index fcb2cfb33b31627ca85ccadc2c5705c18f1e055e..2be8246bd619640ee7cd6ffd43bfacf42236370c 100644 (file)
@@ -24,6 +24,9 @@ type Queue struct {
        // must not be nil.
        ChooseType func(*arvados.Container) (arvados.InstanceType, error)
 
+       // Mimic railsapi implementation of MaxDispatchAttempts config
+       MaxDispatchAttempts int
+
        Logger logrus.FieldLogger
 
        entries      map[string]container.QueueEnt
@@ -133,7 +136,15 @@ func (q *Queue) changeState(uuid string, from, to arvados.ContainerState) error
        q.entries[uuid] = ent
        for i, ctr := range q.Containers {
                if ctr.UUID == uuid {
-                       q.Containers[i].State = to
+                       if max := q.MaxDispatchAttempts; max > 0 && ctr.LockCount >= max && to == arvados.ContainerStateQueued {
+                               q.Containers[i].State = arvados.ContainerStateCancelled
+                               q.Containers[i].RuntimeStatus = map[string]interface{}{"error": fmt.Sprintf("Failed to start: lock_count == %d", ctr.LockCount)}
+                       } else {
+                               q.Containers[i].State = to
+                               if to == arvados.ContainerStateLocked {
+                                       q.Containers[i].LockCount++
+                               }
+                       }
                        break
                }
        }
@@ -157,6 +168,7 @@ func (q *Queue) Update() error {
                        upd[ctr.UUID] = ent
                } else {
                        it, _ := q.ChooseType(&ctr)
+                       ctr.Mounts = nil
                        upd[ctr.UUID] = container.QueueEnt{
                                Container:    ctr,
                                InstanceType: it,
index 7b31726aa06f1d2fcd46eae82f51e6aec97cf290..2467e807a1253e2764ae657bb0ce78ee10399ee1 100644 (file)
@@ -19,6 +19,7 @@ type Container struct {
        Cwd                       string                 `json:"cwd"`
        Environment               map[string]string      `json:"environment"`
        LockedByUUID              string                 `json:"locked_by_uuid"`
+       LockCount                 int                    `json:"lock_count"`
        Mounts                    map[string]Mount       `json:"mounts"`
        Output                    string                 `json:"output"`
        OutputPath                string                 `json:"output_path"`