From 0f784d657527c998e7cd1d7aee8cbd8f0d75e04a Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Thu, 22 Jul 2021 10:36:14 -0400 Subject: [PATCH] 17756: Move "no suitable instance type" reporting to dispatch lib. Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- lib/lsf/dispatch.go | 41 +++---------------- sdk/go/dispatch/dispatch.go | 35 ++++++++++++++-- .../crunch-dispatch-local.go | 5 ++- .../crunch-dispatch-local_test.go | 12 +++--- .../crunch-dispatch-slurm.go | 40 +++--------------- .../crunch-dispatch-slurm_test.go | 35 +++++++--------- 6 files changed, 66 insertions(+), 102 deletions(-) diff --git a/lib/lsf/dispatch.go b/lib/lsf/dispatch.go index 760002aa05..7461597c45 100644 --- a/lib/lsf/dispatch.go +++ b/lib/lsf/dispatch.go @@ -5,7 +5,6 @@ package lsf import ( - "bytes" "context" "errors" "fmt" @@ -162,7 +161,7 @@ func (disp *dispatcher) init() { } } -func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) { +func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) error { ctx, cancel := context.WithCancel(disp.Context) defer cancel() @@ -173,38 +172,9 @@ func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain cmd := []string{disp.Cluster.Containers.CrunchRunCommand} cmd = append(cmd, "--runtime-engine="+disp.Cluster.Containers.RuntimeEngine) cmd = append(cmd, disp.Cluster.Containers.CrunchRunArgumentsList...) - if err := disp.submit(ctr, cmd); err != nil { - var text string - switch err := err.(type) { - case dispatchcloud.ConstraintsNotSatisfiableError: - var logBuf bytes.Buffer - fmt.Fprintf(&logBuf, "cannot run container %s: %s\n", ctr.UUID, err) - if len(err.AvailableTypes) == 0 { - fmt.Fprint(&logBuf, "No instance types are configured.\n") - } else { - fmt.Fprint(&logBuf, "Available instance types:\n") - for _, t := range err.AvailableTypes { - fmt.Fprintf(&logBuf, - "Type %q: %d VCPUs, %d RAM, %d Scratch, %f Price\n", - t.Name, t.VCPUs, t.RAM, t.Scratch, t.Price, - ) - } - } - text = logBuf.String() - disp.arvDispatcher.UpdateState(ctr.UUID, dispatch.Cancelled) - default: - text = fmt.Sprintf("Error submitting container %s to LSF: %s", ctr.UUID, err) - } - disp.logger.Print(text) - - lr := arvadosclient.Dict{"log": arvadosclient.Dict{ - "object_uuid": ctr.UUID, - "event_type": "dispatch", - "properties": map[string]string{"text": text}}} - disp.arvDispatcher.Arv.Create("logs", lr, nil) - - disp.arvDispatcher.Unlock(ctr.UUID) - return + err := disp.submit(ctr, cmd) + if err != nil { + return err } } @@ -237,7 +207,7 @@ func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain case dispatch.Locked: disp.arvDispatcher.Unlock(ctr.UUID) } - return + return nil case updated, ok := <-status: if !ok { // status channel is closed, which is @@ -273,6 +243,7 @@ func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain } <-ticker.C } + return nil } func (disp *dispatcher) submit(container arvados.Container, crunchRunCommand []string) error { diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go index df43c2b10d..00c75154f6 100644 --- a/sdk/go/dispatch/dispatch.go +++ b/sdk/go/dispatch/dispatch.go @@ -7,11 +7,13 @@ package dispatch import ( + "bytes" "context" "fmt" "sync" "time" + "git.arvados.org/arvados.git/lib/dispatchcloud" "git.arvados.org/arvados.git/sdk/go/arvados" "git.arvados.org/arvados.git/sdk/go/arvadosclient" "github.com/sirupsen/logrus" @@ -66,7 +68,7 @@ type Dispatcher struct { // running, and return. // // The DispatchFunc should not return until the container is finished. -type DispatchFunc func(*Dispatcher, arvados.Container, <-chan arvados.Container) +type DispatchFunc func(*Dispatcher, arvados.Container, <-chan arvados.Container) error // Run watches the API server's queue for containers that are either // ready to run and available to lock, or are already locked by this @@ -170,9 +172,34 @@ func (d *Dispatcher) start(c arvados.Container) *runTracker { } tracker.updates <- c go func() { - d.RunContainer(d, c, tracker.updates) - // RunContainer blocks for the lifetime of the container. When - // it returns, the tracker should delete itself. + err := d.RunContainer(d, c, tracker.updates) + if err != nil { + text := fmt.Sprintf("Error running container %s: %s", c.UUID, err) + if err, ok := err.(dispatchcloud.ConstraintsNotSatisfiableError); ok { + var logBuf bytes.Buffer + fmt.Fprintf(&logBuf, "cannot run container %s: %s\n", c.UUID, err) + if len(err.AvailableTypes) == 0 { + fmt.Fprint(&logBuf, "No instance types are configured.\n") + } else { + fmt.Fprint(&logBuf, "Available instance types:\n") + for _, t := range err.AvailableTypes { + fmt.Fprintf(&logBuf, + "Type %q: %d VCPUs, %d RAM, %d Scratch, %f Price\n", + t.Name, t.VCPUs, t.RAM, t.Scratch, t.Price) + } + } + text = logBuf.String() + d.UpdateState(c.UUID, Cancelled) + } + d.Logger.Printf("%s", text) + lr := arvadosclient.Dict{"log": arvadosclient.Dict{ + "object_uuid": c.UUID, + "event_type": "dispatch", + "properties": map[string]string{"text": text}}} + d.Arv.Create("logs", lr, nil) + d.Unlock(c.UUID) + } + d.mtx.Lock() delete(d.trackers, c.UUID) d.mtx.Unlock() diff --git a/services/crunch-dispatch-local/crunch-dispatch-local.go b/services/crunch-dispatch-local/crunch-dispatch-local.go index 2922817b55..1486332382 100644 --- a/services/crunch-dispatch-local/crunch-dispatch-local.go +++ b/services/crunch-dispatch-local/crunch-dispatch-local.go @@ -140,7 +140,7 @@ type LocalRun struct { // crunch-run terminates, mark the container as Cancelled. func (lr *LocalRun) run(dispatcher *dispatch.Dispatcher, container arvados.Container, - status <-chan arvados.Container) { + status <-chan arvados.Container) error { uuid := container.UUID @@ -150,7 +150,7 @@ func (lr *LocalRun) run(dispatcher *dispatch.Dispatcher, case lr.concurrencyLimit <- true: break case <-lr.ctx.Done(): - return + return lr.ctx.Err() } defer func() { <-lr.concurrencyLimit }() @@ -241,4 +241,5 @@ Finish: } dispatcher.Logger.Printf("finalized container %v", uuid) + return nil } diff --git a/services/crunch-dispatch-local/crunch-dispatch-local_test.go b/services/crunch-dispatch-local/crunch-dispatch-local_test.go index 5f51134df8..6ec31b1737 100644 --- a/services/crunch-dispatch-local/crunch-dispatch-local_test.go +++ b/services/crunch-dispatch-local/crunch-dispatch-local_test.go @@ -81,9 +81,9 @@ func (s *TestSuite) TestIntegration(c *C) { return cmd.Start() } - dispatcher.RunContainer = func(d *dispatch.Dispatcher, c arvados.Container, s <-chan arvados.Container) { - (&LocalRun{startCmd, make(chan bool, 8), ctx}).run(d, c, s) - cancel() + dispatcher.RunContainer = func(d *dispatch.Dispatcher, c arvados.Container, s <-chan arvados.Container) error { + defer cancel() + return (&LocalRun{startCmd, make(chan bool, 8), ctx}).run(d, c, s) } err = dispatcher.Run(ctx) @@ -184,9 +184,9 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon return cmd.Start() } - dispatcher.RunContainer = func(d *dispatch.Dispatcher, c arvados.Container, s <-chan arvados.Container) { - (&LocalRun{startCmd, make(chan bool, 8), ctx}).run(d, c, s) - cancel() + dispatcher.RunContainer = func(d *dispatch.Dispatcher, c arvados.Container, s <-chan arvados.Container) error { + defer cancel() + return (&LocalRun{startCmd, make(chan bool, 8), ctx}).run(d, c, s) } re := regexp.MustCompile(`(?ms).*` + expected + `.*`) diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go index a5899ce8a7..5129495a06 100644 --- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go +++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go @@ -7,7 +7,6 @@ package main // Dispatcher service for Crunch that submits containers to the slurm queue. import ( - "bytes" "context" "flag" "fmt" @@ -270,7 +269,7 @@ func (disp *Dispatcher) submit(container arvados.Container, crunchRunCommand []s // already in the queue). Cancel the slurm job if the container's // priority changes to zero or its state indicates it's no longer // running. -func (disp *Dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) { +func (disp *Dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -278,38 +277,9 @@ func (disp *Dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain log.Printf("Submitting container %s to slurm", ctr.UUID) cmd := []string{disp.cluster.Containers.CrunchRunCommand} cmd = append(cmd, disp.cluster.Containers.CrunchRunArgumentsList...) - if err := disp.submit(ctr, cmd); err != nil { - var text string - switch err := err.(type) { - case dispatchcloud.ConstraintsNotSatisfiableError: - var logBuf bytes.Buffer - fmt.Fprintf(&logBuf, "cannot run container %s: %s\n", ctr.UUID, err) - if len(err.AvailableTypes) == 0 { - fmt.Fprint(&logBuf, "No instance types are configured.\n") - } else { - fmt.Fprint(&logBuf, "Available instance types:\n") - for _, t := range err.AvailableTypes { - fmt.Fprintf(&logBuf, - "Type %q: %d VCPUs, %d RAM, %d Scratch, %f Price\n", - t.Name, t.VCPUs, t.RAM, t.Scratch, t.Price, - ) - } - } - text = logBuf.String() - disp.UpdateState(ctr.UUID, dispatch.Cancelled) - default: - text = fmt.Sprintf("Error submitting container %s to slurm: %s", ctr.UUID, err) - } - log.Print(text) - - lr := arvadosclient.Dict{"log": arvadosclient.Dict{ - "object_uuid": ctr.UUID, - "event_type": "dispatch", - "properties": map[string]string{"text": text}}} - disp.Arv.Create("logs", lr, nil) - - disp.Unlock(ctr.UUID) - return + err := disp.submit(ctr, cmd) + if err != nil { + return err } } @@ -338,7 +308,7 @@ func (disp *Dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain case dispatch.Locked: disp.Unlock(ctr.UUID) } - return + return nil case updated, ok := <-status: if !ok { log.Printf("container %s is done: cancel slurm job", ctr.UUID) diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go index 480434de65..e7a89db23c 100644 --- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go +++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go @@ -104,7 +104,7 @@ func (sf *slurmFake) Cancel(name string) error { func (s *IntegrationSuite) integrationTest(c *C, expectBatch [][]string, - runContainer func(*dispatch.Dispatcher, arvados.Container)) arvados.Container { + runContainer func(*dispatch.Dispatcher, arvados.Container)) (arvados.Container, error) { arvadostest.ResetEnv() arv, err := arvadosclient.MakeArvadosClient() @@ -123,18 +123,21 @@ func (s *IntegrationSuite) integrationTest(c *C, ctx, cancel := context.WithCancel(context.Background()) doneRun := make(chan struct{}) + doneDispatch := make(chan error) s.disp.Dispatcher = &dispatch.Dispatcher{ Arv: arv, PollPeriod: time.Second, - RunContainer: func(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) { + RunContainer: func(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) error { go func() { runContainer(disp, ctr) s.slurm.queue = "" doneRun <- struct{}{} }() - s.disp.runContainer(disp, ctr, status) + err := s.disp.runContainer(disp, ctr, status) cancel() + doneDispatch <- err + return nil }, } @@ -148,6 +151,7 @@ func (s *IntegrationSuite) integrationTest(c *C, err = s.disp.Dispatcher.Run(ctx) <-doneRun c.Assert(err, Equals, context.Canceled) + errDispatch := <-doneDispatch s.disp.sqCheck.Stop() @@ -162,12 +166,12 @@ func (s *IntegrationSuite) integrationTest(c *C, var container arvados.Container err = arv.Get("containers", "zzzzz-dz642-queuedcontainer", nil, &container) c.Check(err, IsNil) - return container + return container, errDispatch } func (s *IntegrationSuite) TestNormal(c *C) { s.slurm = slurmFake{queue: "zzzzz-dz642-queuedcontainer 10000 100 PENDING Resources\n"} - container := s.integrationTest(c, + container, _ := s.integrationTest(c, nil, func(dispatcher *dispatch.Dispatcher, container arvados.Container) { dispatcher.UpdateState(container.UUID, dispatch.Running) @@ -181,7 +185,7 @@ func (s *IntegrationSuite) TestCancel(c *C) { s.slurm = slurmFake{queue: "zzzzz-dz642-queuedcontainer 10000 100 PENDING Resources\n"} readyToCancel := make(chan bool) s.slurm.onCancel = func() { <-readyToCancel } - container := s.integrationTest(c, + container, _ := s.integrationTest(c, nil, func(dispatcher *dispatch.Dispatcher, container arvados.Container) { dispatcher.UpdateState(container.UUID, dispatch.Running) @@ -199,7 +203,7 @@ func (s *IntegrationSuite) TestCancel(c *C) { } func (s *IntegrationSuite) TestMissingFromSqueue(c *C) { - container := s.integrationTest(c, + container, _ := s.integrationTest(c, [][]string{{ fmt.Sprintf("--job-name=%s", "zzzzz-dz642-queuedcontainer"), fmt.Sprintf("--nice=%d", 10000), @@ -218,24 +222,14 @@ func (s *IntegrationSuite) TestMissingFromSqueue(c *C) { func (s *IntegrationSuite) TestSbatchFail(c *C) { s.slurm = slurmFake{errBatch: errors.New("something terrible happened")} - container := s.integrationTest(c, + container, err := s.integrationTest(c, [][]string{{"--job-name=zzzzz-dz642-queuedcontainer", "--nice=10000", "--no-requeue", "--mem=11445", "--cpus-per-task=4", "--tmp=45777"}}, func(dispatcher *dispatch.Dispatcher, container arvados.Container) { dispatcher.UpdateState(container.UUID, dispatch.Running) dispatcher.UpdateState(container.UUID, dispatch.Complete) }) c.Check(container.State, Equals, arvados.ContainerStateComplete) - - arv, err := arvadosclient.MakeArvadosClient() - c.Assert(err, IsNil) - - var ll arvados.LogList - err = arv.List("logs", arvadosclient.Dict{"filters": [][]string{ - {"object_uuid", "=", container.UUID}, - {"event_type", "=", "dispatch"}, - }}, &ll) - c.Assert(err, IsNil) - c.Assert(len(ll.Items), Equals, 1) + c.Check(err, ErrorMatches, `something terrible happened`) } type StubbedSuite struct { @@ -280,7 +274,7 @@ func (s *StubbedSuite) testWithServerStub(c *C, apiStubResponses map[string]arva dispatcher := dispatch.Dispatcher{ Arv: arv, PollPeriod: time.Second, - RunContainer: func(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) { + RunContainer: func(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) error { go func() { time.Sleep(time.Second) disp.UpdateState(ctr.UUID, dispatch.Running) @@ -288,6 +282,7 @@ func (s *StubbedSuite) testWithServerStub(c *C, apiStubResponses map[string]arva }() s.disp.runContainer(disp, ctr, status) cancel() + return nil }, } -- 2.30.2