17756: Move "no suitable instance type" reporting to dispatch lib.
authorTom Clegg <tom@curii.com>
Thu, 22 Jul 2021 14:36:14 +0000 (10:36 -0400)
committerTom Clegg <tom@curii.com>
Thu, 22 Jul 2021 14:36:14 +0000 (10:36 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

lib/lsf/dispatch.go
sdk/go/dispatch/dispatch.go
services/crunch-dispatch-local/crunch-dispatch-local.go
services/crunch-dispatch-local/crunch-dispatch-local_test.go
services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go

index 760002aa05417b54e955011a3ed65320b8d3463e..7461597c45c8e48cee3d105877dc1d16abd7cf0c 100644 (file)
@@ -5,7 +5,6 @@
 package lsf
 
 import (
-       "bytes"
        "context"
        "errors"
        "fmt"
@@ -162,7 +161,7 @@ func (disp *dispatcher) init() {
        }
 }
 
-func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
+func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) error {
        ctx, cancel := context.WithCancel(disp.Context)
        defer cancel()
 
@@ -173,38 +172,9 @@ func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain
                cmd := []string{disp.Cluster.Containers.CrunchRunCommand}
                cmd = append(cmd, "--runtime-engine="+disp.Cluster.Containers.RuntimeEngine)
                cmd = append(cmd, disp.Cluster.Containers.CrunchRunArgumentsList...)
-               if err := disp.submit(ctr, cmd); err != nil {
-                       var text string
-                       switch err := err.(type) {
-                       case dispatchcloud.ConstraintsNotSatisfiableError:
-                               var logBuf bytes.Buffer
-                               fmt.Fprintf(&logBuf, "cannot run container %s: %s\n", ctr.UUID, err)
-                               if len(err.AvailableTypes) == 0 {
-                                       fmt.Fprint(&logBuf, "No instance types are configured.\n")
-                               } else {
-                                       fmt.Fprint(&logBuf, "Available instance types:\n")
-                                       for _, t := range err.AvailableTypes {
-                                               fmt.Fprintf(&logBuf,
-                                                       "Type %q: %d VCPUs, %d RAM, %d Scratch, %f Price\n",
-                                                       t.Name, t.VCPUs, t.RAM, t.Scratch, t.Price,
-                                               )
-                                       }
-                               }
-                               text = logBuf.String()
-                               disp.arvDispatcher.UpdateState(ctr.UUID, dispatch.Cancelled)
-                       default:
-                               text = fmt.Sprintf("Error submitting container %s to LSF: %s", ctr.UUID, err)
-                       }
-                       disp.logger.Print(text)
-
-                       lr := arvadosclient.Dict{"log": arvadosclient.Dict{
-                               "object_uuid": ctr.UUID,
-                               "event_type":  "dispatch",
-                               "properties":  map[string]string{"text": text}}}
-                       disp.arvDispatcher.Arv.Create("logs", lr, nil)
-
-                       disp.arvDispatcher.Unlock(ctr.UUID)
-                       return
+               err := disp.submit(ctr, cmd)
+               if err != nil {
+                       return err
                }
        }
 
@@ -237,7 +207,7 @@ func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain
                        case dispatch.Locked:
                                disp.arvDispatcher.Unlock(ctr.UUID)
                        }
-                       return
+                       return nil
                case updated, ok := <-status:
                        if !ok {
                                // status channel is closed, which is
@@ -273,6 +243,7 @@ func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain
                }
                <-ticker.C
        }
+       return nil
 }
 
 func (disp *dispatcher) submit(container arvados.Container, crunchRunCommand []string) error {
index df43c2b10d9778d7c62befc8a3f7e71babacb168..00c75154f656a70e0b42deed7ef0e34fa7a01d7d 100644 (file)
@@ -7,11 +7,13 @@
 package dispatch
 
 import (
+       "bytes"
        "context"
        "fmt"
        "sync"
        "time"
 
+       "git.arvados.org/arvados.git/lib/dispatchcloud"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/arvadosclient"
        "github.com/sirupsen/logrus"
@@ -66,7 +68,7 @@ type Dispatcher struct {
 // running, and return.
 //
 // The DispatchFunc should not return until the container is finished.
-type DispatchFunc func(*Dispatcher, arvados.Container, <-chan arvados.Container)
+type DispatchFunc func(*Dispatcher, arvados.Container, <-chan arvados.Container) error
 
 // Run watches the API server's queue for containers that are either
 // ready to run and available to lock, or are already locked by this
@@ -170,9 +172,34 @@ func (d *Dispatcher) start(c arvados.Container) *runTracker {
        }
        tracker.updates <- c
        go func() {
-               d.RunContainer(d, c, tracker.updates)
-               // RunContainer blocks for the lifetime of the container.  When
-               // it returns, the tracker should delete itself.
+               err := d.RunContainer(d, c, tracker.updates)
+               if err != nil {
+                       text := fmt.Sprintf("Error running container %s: %s", c.UUID, err)
+                       if err, ok := err.(dispatchcloud.ConstraintsNotSatisfiableError); ok {
+                               var logBuf bytes.Buffer
+                               fmt.Fprintf(&logBuf, "cannot run container %s: %s\n", c.UUID, err)
+                               if len(err.AvailableTypes) == 0 {
+                                       fmt.Fprint(&logBuf, "No instance types are configured.\n")
+                               } else {
+                                       fmt.Fprint(&logBuf, "Available instance types:\n")
+                                       for _, t := range err.AvailableTypes {
+                                               fmt.Fprintf(&logBuf,
+                                                       "Type %q: %d VCPUs, %d RAM, %d Scratch, %f Price\n",
+                                                       t.Name, t.VCPUs, t.RAM, t.Scratch, t.Price)
+                                       }
+                               }
+                               text = logBuf.String()
+                               d.UpdateState(c.UUID, Cancelled)
+                       }
+                       d.Logger.Printf("%s", text)
+                       lr := arvadosclient.Dict{"log": arvadosclient.Dict{
+                               "object_uuid": c.UUID,
+                               "event_type":  "dispatch",
+                               "properties":  map[string]string{"text": text}}}
+                       d.Arv.Create("logs", lr, nil)
+                       d.Unlock(c.UUID)
+               }
+
                d.mtx.Lock()
                delete(d.trackers, c.UUID)
                d.mtx.Unlock()
index 2922817b557ccef70fa25f32c66b8447575abb5d..1486332382c3e370437542063e69719392a941ee 100644 (file)
@@ -140,7 +140,7 @@ type LocalRun struct {
 // crunch-run terminates, mark the container as Cancelled.
 func (lr *LocalRun) run(dispatcher *dispatch.Dispatcher,
        container arvados.Container,
-       status <-chan arvados.Container) {
+       status <-chan arvados.Container) error {
 
        uuid := container.UUID
 
@@ -150,7 +150,7 @@ func (lr *LocalRun) run(dispatcher *dispatch.Dispatcher,
                case lr.concurrencyLimit <- true:
                        break
                case <-lr.ctx.Done():
-                       return
+                       return lr.ctx.Err()
                }
 
                defer func() { <-lr.concurrencyLimit }()
@@ -241,4 +241,5 @@ Finish:
        }
 
        dispatcher.Logger.Printf("finalized container %v", uuid)
+       return nil
 }
index 5f51134df8a9866a126fc32922787fd2be7c8957..6ec31b1737f5a21004226f4bad31bf8fc504a970 100644 (file)
@@ -81,9 +81,9 @@ func (s *TestSuite) TestIntegration(c *C) {
                return cmd.Start()
        }
 
-       dispatcher.RunContainer = func(d *dispatch.Dispatcher, c arvados.Container, s <-chan arvados.Container) {
-               (&LocalRun{startCmd, make(chan bool, 8), ctx}).run(d, c, s)
-               cancel()
+       dispatcher.RunContainer = func(d *dispatch.Dispatcher, c arvados.Container, s <-chan arvados.Container) error {
+               defer cancel()
+               return (&LocalRun{startCmd, make(chan bool, 8), ctx}).run(d, c, s)
        }
 
        err = dispatcher.Run(ctx)
@@ -184,9 +184,9 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
                return cmd.Start()
        }
 
-       dispatcher.RunContainer = func(d *dispatch.Dispatcher, c arvados.Container, s <-chan arvados.Container) {
-               (&LocalRun{startCmd, make(chan bool, 8), ctx}).run(d, c, s)
-               cancel()
+       dispatcher.RunContainer = func(d *dispatch.Dispatcher, c arvados.Container, s <-chan arvados.Container) error {
+               defer cancel()
+               return (&LocalRun{startCmd, make(chan bool, 8), ctx}).run(d, c, s)
        }
 
        re := regexp.MustCompile(`(?ms).*` + expected + `.*`)
index a5899ce8a7cc0809a57b64a9588d8e227846c274..5129495a0656633d146272bf8d60ff9d16a59918 100644 (file)
@@ -7,7 +7,6 @@ package main
 // Dispatcher service for Crunch that submits containers to the slurm queue.
 
 import (
-       "bytes"
        "context"
        "flag"
        "fmt"
@@ -270,7 +269,7 @@ func (disp *Dispatcher) submit(container arvados.Container, crunchRunCommand []s
 // already in the queue).  Cancel the slurm job if the container's
 // priority changes to zero or its state indicates it's no longer
 // running.
-func (disp *Dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
+func (disp *Dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) error {
        ctx, cancel := context.WithCancel(context.Background())
        defer cancel()
 
@@ -278,38 +277,9 @@ func (disp *Dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain
                log.Printf("Submitting container %s to slurm", ctr.UUID)
                cmd := []string{disp.cluster.Containers.CrunchRunCommand}
                cmd = append(cmd, disp.cluster.Containers.CrunchRunArgumentsList...)
-               if err := disp.submit(ctr, cmd); err != nil {
-                       var text string
-                       switch err := err.(type) {
-                       case dispatchcloud.ConstraintsNotSatisfiableError:
-                               var logBuf bytes.Buffer
-                               fmt.Fprintf(&logBuf, "cannot run container %s: %s\n", ctr.UUID, err)
-                               if len(err.AvailableTypes) == 0 {
-                                       fmt.Fprint(&logBuf, "No instance types are configured.\n")
-                               } else {
-                                       fmt.Fprint(&logBuf, "Available instance types:\n")
-                                       for _, t := range err.AvailableTypes {
-                                               fmt.Fprintf(&logBuf,
-                                                       "Type %q: %d VCPUs, %d RAM, %d Scratch, %f Price\n",
-                                                       t.Name, t.VCPUs, t.RAM, t.Scratch, t.Price,
-                                               )
-                                       }
-                               }
-                               text = logBuf.String()
-                               disp.UpdateState(ctr.UUID, dispatch.Cancelled)
-                       default:
-                               text = fmt.Sprintf("Error submitting container %s to slurm: %s", ctr.UUID, err)
-                       }
-                       log.Print(text)
-
-                       lr := arvadosclient.Dict{"log": arvadosclient.Dict{
-                               "object_uuid": ctr.UUID,
-                               "event_type":  "dispatch",
-                               "properties":  map[string]string{"text": text}}}
-                       disp.Arv.Create("logs", lr, nil)
-
-                       disp.Unlock(ctr.UUID)
-                       return
+               err := disp.submit(ctr, cmd)
+               if err != nil {
+                       return err
                }
        }
 
@@ -338,7 +308,7 @@ func (disp *Dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain
                        case dispatch.Locked:
                                disp.Unlock(ctr.UUID)
                        }
-                       return
+                       return nil
                case updated, ok := <-status:
                        if !ok {
                                log.Printf("container %s is done: cancel slurm job", ctr.UUID)
index 480434de65d291fad8ac2ac8ff8fbb6092ece327..e7a89db23c8743b3e2934a5c26f12a446a5bd6a9 100644 (file)
@@ -104,7 +104,7 @@ func (sf *slurmFake) Cancel(name string) error {
 
 func (s *IntegrationSuite) integrationTest(c *C,
        expectBatch [][]string,
-       runContainer func(*dispatch.Dispatcher, arvados.Container)) arvados.Container {
+       runContainer func(*dispatch.Dispatcher, arvados.Container)) (arvados.Container, error) {
        arvadostest.ResetEnv()
 
        arv, err := arvadosclient.MakeArvadosClient()
@@ -123,18 +123,21 @@ func (s *IntegrationSuite) integrationTest(c *C,
 
        ctx, cancel := context.WithCancel(context.Background())
        doneRun := make(chan struct{})
+       doneDispatch := make(chan error)
 
        s.disp.Dispatcher = &dispatch.Dispatcher{
                Arv:        arv,
                PollPeriod: time.Second,
-               RunContainer: func(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
+               RunContainer: func(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) error {
                        go func() {
                                runContainer(disp, ctr)
                                s.slurm.queue = ""
                                doneRun <- struct{}{}
                        }()
-                       s.disp.runContainer(disp, ctr, status)
+                       err := s.disp.runContainer(disp, ctr, status)
                        cancel()
+                       doneDispatch <- err
+                       return nil
                },
        }
 
@@ -148,6 +151,7 @@ func (s *IntegrationSuite) integrationTest(c *C,
        err = s.disp.Dispatcher.Run(ctx)
        <-doneRun
        c.Assert(err, Equals, context.Canceled)
+       errDispatch := <-doneDispatch
 
        s.disp.sqCheck.Stop()
 
@@ -162,12 +166,12 @@ func (s *IntegrationSuite) integrationTest(c *C,
        var container arvados.Container
        err = arv.Get("containers", "zzzzz-dz642-queuedcontainer", nil, &container)
        c.Check(err, IsNil)
-       return container
+       return container, errDispatch
 }
 
 func (s *IntegrationSuite) TestNormal(c *C) {
        s.slurm = slurmFake{queue: "zzzzz-dz642-queuedcontainer 10000 100 PENDING Resources\n"}
-       container := s.integrationTest(c,
+       container, _ := s.integrationTest(c,
                nil,
                func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
                        dispatcher.UpdateState(container.UUID, dispatch.Running)
@@ -181,7 +185,7 @@ func (s *IntegrationSuite) TestCancel(c *C) {
        s.slurm = slurmFake{queue: "zzzzz-dz642-queuedcontainer 10000 100 PENDING Resources\n"}
        readyToCancel := make(chan bool)
        s.slurm.onCancel = func() { <-readyToCancel }
-       container := s.integrationTest(c,
+       container, _ := s.integrationTest(c,
                nil,
                func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
                        dispatcher.UpdateState(container.UUID, dispatch.Running)
@@ -199,7 +203,7 @@ func (s *IntegrationSuite) TestCancel(c *C) {
 }
 
 func (s *IntegrationSuite) TestMissingFromSqueue(c *C) {
-       container := s.integrationTest(c,
+       container, _ := s.integrationTest(c,
                [][]string{{
                        fmt.Sprintf("--job-name=%s", "zzzzz-dz642-queuedcontainer"),
                        fmt.Sprintf("--nice=%d", 10000),
@@ -218,24 +222,14 @@ func (s *IntegrationSuite) TestMissingFromSqueue(c *C) {
 
 func (s *IntegrationSuite) TestSbatchFail(c *C) {
        s.slurm = slurmFake{errBatch: errors.New("something terrible happened")}
-       container := s.integrationTest(c,
+       container, err := s.integrationTest(c,
                [][]string{{"--job-name=zzzzz-dz642-queuedcontainer", "--nice=10000", "--no-requeue", "--mem=11445", "--cpus-per-task=4", "--tmp=45777"}},
                func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
                        dispatcher.UpdateState(container.UUID, dispatch.Running)
                        dispatcher.UpdateState(container.UUID, dispatch.Complete)
                })
        c.Check(container.State, Equals, arvados.ContainerStateComplete)
-
-       arv, err := arvadosclient.MakeArvadosClient()
-       c.Assert(err, IsNil)
-
-       var ll arvados.LogList
-       err = arv.List("logs", arvadosclient.Dict{"filters": [][]string{
-               {"object_uuid", "=", container.UUID},
-               {"event_type", "=", "dispatch"},
-       }}, &ll)
-       c.Assert(err, IsNil)
-       c.Assert(len(ll.Items), Equals, 1)
+       c.Check(err, ErrorMatches, `something terrible happened`)
 }
 
 type StubbedSuite struct {
@@ -280,7 +274,7 @@ func (s *StubbedSuite) testWithServerStub(c *C, apiStubResponses map[string]arva
        dispatcher := dispatch.Dispatcher{
                Arv:        arv,
                PollPeriod: time.Second,
-               RunContainer: func(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
+               RunContainer: func(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) error {
                        go func() {
                                time.Sleep(time.Second)
                                disp.UpdateState(ctr.UUID, dispatch.Running)
@@ -288,6 +282,7 @@ func (s *StubbedSuite) testWithServerStub(c *C, apiStubResponses map[string]arva
                        }()
                        s.disp.runContainer(disp, ctr, status)
                        cancel()
+                       return nil
                },
        }