From: Tom Clegg
Date: Fri, 10 Feb 2017 06:25:14 +0000 (-0500)
Subject: 10701: Refactor dispatch library.
X-Git-Tag: 1.1.0~423^2~6
X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/ddee3839f8a82b889f84171e2354108cb20f93e0?ds=sidebyside

10701: Refactor dispatch library.
---

diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go
index ce960c0772..7342c3b8cb 100644
--- a/sdk/go/dispatch/dispatch.go
+++ b/sdk/go/dispatch/dispatch.go
@@ -4,11 +4,14 @@ package dispatch
 
 import (
-	"git.curoverse.com/arvados.git/sdk/go/arvados"
-	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+	"context"
+	"fmt"
 	"log"
 	"sync"
 	"time"
+
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
 )
 
 const (
@@ -19,231 +22,173 @@ const (
 	Cancelled = arvados.ContainerStateCancelled
 )
 
-// Dispatcher holds the state of the dispatcher
-type Dispatcher struct {
-	// The Arvados client
-	Arv *arvadosclient.ArvadosClient
-
-	// When a new queued container appears and is either already owned by
-	// this dispatcher or is successfully locked, the dispatcher will call
-	// go RunContainer(). The RunContainer() goroutine gets a channel over
-	// which it will receive updates to the container state. The
-	// RunContainer() goroutine should only assume status updates come when
-	// the container record changes on the API server; if it needs to
-	// monitor the job submission to the underlying slurm/grid engine/etc
-	// queue it should spin up its own polling goroutines. When the
-	// channel is closed, that means the container is no longer being
-	// handled by this dispatcher and the goroutine should terminate. The
-	// goroutine is responsible for draining the 'status' channel, failure
-	// to do so may deadlock the dispatcher.
-	RunContainer func(*Dispatcher, arvados.Container, chan arvados.Container)
-
-	// Amount of time to wait between polling for updates.
-	PollPeriod time.Duration
-
-	// Minimum time between two attempts to run the same container
-	MinRetryPeriod time.Duration
-
-	mineMutex sync.Mutex
-	mineMap   map[string]chan arvados.Container
-	Auth      arvados.APIClientAuthorization
-
-	throttle throttle
-
-	stop chan struct{}
+type runner struct {
+	closing bool
+	updates chan arvados.Container
 }
 
-// Goroutine-safely add/remove uuid to the set of "my" containers, i.e., ones
-// for which this process is actively starting/monitoring. Returns channel to
-// be used to send container status updates.
-func (dispatcher *Dispatcher) setMine(uuid string) chan arvados.Container {
-	dispatcher.mineMutex.Lock()
-	defer dispatcher.mineMutex.Unlock()
-	if ch, ok := dispatcher.mineMap[uuid]; ok {
-		return ch
+func (ex *runner) close() {
+	if !ex.closing {
+		close(ex.updates)
 	}
-
-	ch := make(chan arvados.Container)
-	dispatcher.mineMap[uuid] = ch
-	return ch
+	ex.closing = true
 }
 
-// Release a container which is no longer being monitored.
-func (dispatcher *Dispatcher) notMine(uuid string) {
-	dispatcher.mineMutex.Lock()
-	defer dispatcher.mineMutex.Unlock()
-	if ch, ok := dispatcher.mineMap[uuid]; ok {
-		close(ch)
-		delete(dispatcher.mineMap, uuid)
+func (ex *runner) update(c arvados.Container) {
+	if ex.closing {
+		return
 	}
-}
-
-// checkMine returns true if there is a channel for updates associated
-// with container c. If update is true, also send the container record on
-// the channel.
-func (dispatcher *Dispatcher) checkMine(c arvados.Container, update bool) bool {
-	dispatcher.mineMutex.Lock()
-	defer dispatcher.mineMutex.Unlock()
-	ch, ok := dispatcher.mineMap[c.UUID]
-	if ok {
-		if update {
-			ch <- c
-		}
-		return true
+	select {
+	case <-ex.updates:
+		log.Printf("debug: executor is handling updates slowly, discarded previous update for %s", c.UUID)
+	default:
 	}
-	return false
+	ex.updates <- c
 }
 
-func (dispatcher *Dispatcher) getContainers(params arvadosclient.Dict, touched map[string]bool) {
-	var containers arvados.ContainerList
-	err := dispatcher.Arv.List("containers", params, &containers)
-	if err != nil {
-		log.Printf("Error getting list of containers: %q", err)
-		return
-	}
+type Dispatcher struct {
+	Arv            *arvadosclient.ArvadosClient
+	PollPeriod     time.Duration
+	MinRetryPeriod time.Duration
+	RunContainer   Runner
 
-	if containers.ItemsAvailable > len(containers.Items) {
-		// TODO: support paging
-		log.Printf("Warning! %d containers are available but only received %d, paged requests are not yet supported, some containers may be ignored.",
-			containers.ItemsAvailable,
-			len(containers.Items))
-	}
-	for _, container := range containers.Items {
-		touched[container.UUID] = true
-		dispatcher.handleUpdate(container)
-	}
+	auth     arvados.APIClientAuthorization
+	mtx      sync.Mutex
+	running  map[string]*runner
+	throttle throttle
 }
 
-func (dispatcher *Dispatcher) pollContainers(stop chan struct{}) {
-	ticker := time.NewTicker(dispatcher.PollPeriod)
-	defer ticker.Stop()
+// A Runner executes a container. If it starts any goroutines, it must
+// not return until it can guarantee that none of those goroutines
+// will do anything with this container.
+type Runner func(*Dispatcher, arvados.Container, <-chan arvados.Container)
 
-	paramsQ := arvadosclient.Dict{
-		"filters": [][]interface{}{{"state", "=", "Queued"}, {"priority", ">", "0"}},
-		"order":   []string{"priority desc"},
-		"limit":   "1000"}
-	paramsP := arvadosclient.Dict{
-		"filters": [][]interface{}{{"locked_by_uuid", "=", dispatcher.Auth.UUID}},
-		"limit":   "1000"}
+func (d *Dispatcher) Run(ctx context.Context) error {
+	err := d.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &d.auth)
+	if err != nil {
+		return fmt.Errorf("error getting my token UUID: %v", err)
+	}
+
+	poll := time.NewTicker(d.PollPeriod)
+	defer poll.Stop()
 
 	for {
-		touched := make(map[string]bool)
-		dispatcher.getContainers(paramsQ, touched)
-		dispatcher.getContainers(paramsP, touched)
-		dispatcher.mineMutex.Lock()
-		var monitored []string
-		for k := range dispatcher.mineMap {
-			if _, ok := touched[k]; !ok {
-				monitored = append(monitored, k)
-			}
+		running := make([]string, 0, len(d.running))
+		d.mtx.Lock()
+		for uuid := range d.running {
+			running = append(running, uuid)
 		}
-		dispatcher.mineMutex.Unlock()
-		if monitored != nil {
-			dispatcher.getContainers(arvadosclient.Dict{
-				"filters": [][]interface{}{{"uuid", "in", monitored}}}, touched)
+		d.mtx.Unlock()
+		if len(running) == 0 {
+			// API bug: ["uuid", "not in", []] does not match everything
+			running = []string{"X"}
 		}
+		d.checkForUpdates([][]interface{}{
+			{"uuid", "in", running}})
+		d.checkForUpdates([][]interface{}{
+			{"state", "=", Queued},
+			{"priority", ">", "0"},
+			{"uuid", "not in", running}})
+		d.checkForUpdates([][]interface{}{
+			{"locked_by_uuid", "=", d.auth.UUID},
+			{"uuid", "not in", running}})
 		select {
-		case <-ticker.C:
-		case <-stop:
-			return
+		case <-poll.C:
+			continue
+		case <-ctx.Done():
+			return ctx.Err()
 		}
 	}
 }
 
-func (dispatcher *Dispatcher) handleUpdate(container arvados.Container) {
-	if container.State == Queued && dispatcher.checkMine(container, false) {
-		// If we previously started the job, something failed, and it
-		// was re-queued, this dispatcher might still be monitoring it.
-		// Stop the existing monitor, then try to lock and run it
-		// again.
-		dispatcher.notMine(container.UUID)
-	}
+func (d *Dispatcher) start(c arvados.Container) *runner {
+	ex := &runner{
+		updates: make(chan arvados.Container, 1),
+	}
+	if d.running == nil {
+		d.running = make(map[string]*runner)
+	}
+	d.running[c.UUID] = ex
+	go func() {
+		d.RunContainer(d, c, ex.updates)
+		d.mtx.Lock()
+		delete(d.running, c.UUID)
+		d.mtx.Unlock()
+	}()
+	return ex
+}
 
-	if container.LockedByUUID != dispatcher.Auth.UUID && container.State != Queued {
-		// If container is Complete, Cancelled, or Queued, LockedByUUID
-		// will be nil. If the container was formerly Locked, moved
-		// back to Queued and then locked by another dispatcher,
-		// LockedByUUID will be different. In either case, we want
-		// to stop monitoring it.
-		log.Printf("Container %v now in state %q with locked_by_uuid %q", container.UUID, container.State, container.LockedByUUID)
-		dispatcher.notMine(container.UUID)
-		return
-	}
+func (d *Dispatcher) checkForUpdates(filters [][]interface{}) {
+	params := arvadosclient.Dict{
+		"filters": filters,
+		"order":   []string{"priority desc"},
+		"limit":   "1000"}
 
-	if dispatcher.checkMine(container, true) {
-		// Already monitored, sent status update
+	var list arvados.ContainerList
+	err := d.Arv.List("containers", params, &list)
+	if err != nil {
+		log.Printf("Error getting list of containers: %q", err)
 		return
 	}
 
-	if container.State == Queued && container.Priority > 0 {
-		if !dispatcher.throttle.Check(container.UUID) {
-			return
-		}
-		// Try to take the lock
-		if err := dispatcher.Lock(container.UUID); err != nil {
-			return
+	if list.ItemsAvailable > len(list.Items) {
+		// TODO: support paging
+		log.Printf("Warning! %d containers are available but only received %d, paged requests are not yet supported, some containers may be ignored.",
+			list.ItemsAvailable,
+			len(list.Items))
+	}
+
+	d.mtx.Lock()
+	defer d.mtx.Unlock()
+	for _, c := range list.Items {
+		ex, running := d.running[c.UUID]
+		if c.LockedByUUID != "" && c.LockedByUUID != d.auth.UUID {
+			log.Printf("debug: ignoring %s locked by %s", c.UUID, c.LockedByUUID)
+		} else if running {
+			switch c.State {
+			case Queued:
+				ex.close()
+			case Locked, Running:
+				ex.update(c)
+			case Cancelled, Complete:
+				ex.close()
+			}
+		} else {
+			switch c.State {
+			case Queued:
+				if err := d.lock(c.UUID); err != nil {
+					log.Printf("Error locking container %s: %s", c.UUID, err)
+				} else {
+					c.State = Locked
+					d.start(c).update(c)
+				}
+			case Locked, Running:
+				d.start(c).update(c)
+			case Cancelled, Complete:
+				// Nothing to do: no runner is active for this
+				// container (ex is nil here, so ex.close()
+				// would panic).
+			}
 		}
-		container.State = Locked
-	}
-
-	if container.State == Locked || container.State == Running {
-		// Not currently monitored but in Locked or Running state and
-		// owned by this dispatcher, so start monitoring.
-		go dispatcher.RunContainer(dispatcher, container, dispatcher.setMine(container.UUID))
 	}
 }
 
 // UpdateState makes an API call to change the state of a container.
-func (dispatcher *Dispatcher) UpdateState(uuid string, newState arvados.ContainerState) error {
-	err := dispatcher.Arv.Update("containers", uuid,
+func (d *Dispatcher) UpdateState(uuid string, state arvados.ContainerState) error {
+	err := d.Arv.Update("containers", uuid,
 		arvadosclient.Dict{
-			"container": arvadosclient.Dict{"state": newState}},
-		nil)
+			"container": arvadosclient.Dict{"state": state},
+		}, nil)
 	if err != nil {
-		log.Printf("Error updating container %s to state %q: %q", uuid, newState, err)
+		log.Printf("Error updating container %s to state %q: %s", uuid, state, err)
 	}
 	return err
 }
 
 // Lock makes the lock API call which updates the state of a container to Locked.
-func (dispatcher *Dispatcher) Lock(uuid string) error {
-	err := dispatcher.Arv.Call("POST", "containers", uuid, "lock", nil, nil)
-	if err != nil {
-		log.Printf("Error locking container %s: %q", uuid, err)
-	}
-	return err
+func (d *Dispatcher) lock(uuid string) error {
+	return d.Arv.Call("POST", "containers", uuid, "lock", nil, nil)
 }
 
 // Unlock makes the unlock API call which updates the state of a container to Queued.
-func (dispatcher *Dispatcher) Unlock(uuid string) error {
-	err := dispatcher.Arv.Call("POST", "containers", uuid, "unlock", nil, nil)
-	if err != nil {
-		log.Printf("Error unlocking container %s: %q", uuid, err)
-	}
-	return err
-}
-
-// Stop causes Run to return after the current polling cycle.
-func (dispatcher *Dispatcher) Stop() {
-	if dispatcher.stop == nil {
-		// already stopped
-		return
-	}
-	close(dispatcher.stop)
-	dispatcher.stop = nil
-}
-
-// Run runs the main loop of the dispatcher.
-func (dispatcher *Dispatcher) Run() (err error) {
-	err = dispatcher.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &dispatcher.Auth)
-	if err != nil {
-		log.Printf("Error getting my token UUID: %v", err)
-		return
-	}
-
-	dispatcher.mineMap = make(map[string]chan arvados.Container)
-	dispatcher.stop = make(chan struct{})
-	dispatcher.throttle.hold = dispatcher.MinRetryPeriod
-	dispatcher.pollContainers(dispatcher.stop)
-	return nil
+func (d *Dispatcher) Unlock(uuid string) error {
+	return d.Arv.Call("POST", "containers", uuid, "unlock", nil, nil)
 }
diff --git a/services/crunch-dispatch-local/crunch-dispatch-local.go b/services/crunch-dispatch-local/crunch-dispatch-local.go
index bb3c05c7eb..22f7d8b646 100644
--- a/services/crunch-dispatch-local/crunch-dispatch-local.go
+++ b/services/crunch-dispatch-local/crunch-dispatch-local.go
@@ -3,10 +3,8 @@ package main
 // Dispatcher service for Crunch that runs containers locally.
 
 import (
+	"context"
 	"flag"
-	"git.curoverse.com/arvados.git/sdk/go/arvados"
-	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-	"git.curoverse.com/arvados.git/sdk/go/dispatch"
 	"log"
 	"os"
 	"os/exec"
@@ -14,6 +12,10 @@ import (
 	"sync"
 	"syscall"
 	"time"
+
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+	"git.curoverse.com/arvados.git/sdk/go/dispatch"
 )
 
 func main() {
@@ -61,7 +63,8 @@ func doMain() error {
 		PollPeriod: time.Duration(*pollInterval) * time.Second,
 	}
 
-	err = dispatcher.Run()
+	ctx, cancel := context.WithCancel(context.Background())
+	err = dispatcher.Run(ctx)
 	if err != nil {
 		return err
 	}
@@ -72,7 +75,7 @@ func doMain() error {
 	log.Printf("Received %s, shutting down", sig)
 	signal.Stop(c)
 
-	dispatcher.Stop()
+	cancel()
 
 	runningCmdsMutex.Lock()
 	// Finished dispatching; interrupt any crunch jobs that are still running
@@ -103,7 +106,7 @@ var startCmd = startFunc
 // crunch-run terminates, mark the container as Cancelled.
 func run(dispatcher *dispatch.Dispatcher,
 	container arvados.Container,
-	status chan arvados.Container) {
+	status <-chan arvados.Container) {
 
 	uuid := container.UUID
 
@@ -170,8 +173,7 @@ func run(dispatcher *dispatch.Dispatcher,
 	if err != nil {
 		log.Printf("Error getting final container state: %v", err)
 	}
-	if container.LockedByUUID == dispatcher.Auth.UUID &&
-		(container.State == dispatch.Locked || container.State == dispatch.Running) {
+	if container.State == dispatch.Locked || container.State == dispatch.Running {
 		log.Printf("After %s process termination, container state for %v is %q. Updating it to %q",
 			*crunchRunCommand, container.State, uuid, dispatch.Cancelled)
 		dispatcher.UpdateState(uuid, dispatch.Cancelled)
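The local dispatcher's shutdown path thus becomes: receive a signal, cancel the context, then reap the remaining crunch-run children (the cleanup under runningCmdsMutex, visible in the context lines above, is unchanged). Isolated from the service specifics, which the hunks above partly elide, the signal-to-cancel wiring follows the usual Go pattern, roughly:

	package main

	import (
		"context"
		"log"
		"os"
		"os/signal"
		"syscall"
	)

	func main() {
		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()

		c := make(chan os.Signal, 1)
		signal.Notify(c, os.Interrupt, syscall.SIGTERM, syscall.SIGQUIT)
		go func() {
			sig := <-c
			log.Printf("Received %s, shutting down", sig)
			signal.Stop(c)
			cancel() // makes Dispatcher.Run(ctx) return context.Canceled
		}()

		<-ctx.Done() // stand-in for dispatcher.Run(ctx) in the real service
	}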
"gopkg.in/check.v1" ) // Gocheck boilerplate @@ -62,14 +64,13 @@ func (s *TestSuite) TestIntegration(c *C) { echo := "echo" crunchRunCommand = &echo + ctx, cancel := context.WithCancel(context.Background()) dispatcher := dispatch.Dispatcher{ Arv: arv, PollPeriod: time.Second, - RunContainer: func(dispatcher *dispatch.Dispatcher, - container arvados.Container, - status chan arvados.Container) { - run(dispatcher, container, status) - dispatcher.Stop() + RunContainer: func(d *dispatch.Dispatcher, c arvados.Container, s <-chan arvados.Container) { + run(d, c, s) + cancel() }, } @@ -79,8 +80,8 @@ func (s *TestSuite) TestIntegration(c *C) { return cmd.Start() } - err = dispatcher.Run() - c.Assert(err, IsNil) + err = dispatcher.Run(ctx) + c.Assert(err, Equals, context.Canceled) // Wait for all running crunch jobs to complete / terminate waitGroup.Wait() @@ -165,14 +166,13 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon *crunchRunCommand = crunchCmd + ctx, cancel := context.WithCancel(context.Background()) dispatcher := dispatch.Dispatcher{ Arv: arv, PollPeriod: time.Duration(1) * time.Second, - RunContainer: func(dispatcher *dispatch.Dispatcher, - container arvados.Container, - status chan arvados.Container) { - run(dispatcher, container, status) - dispatcher.Stop() + RunContainer: func(d *dispatch.Dispatcher, c arvados.Container, s <-chan arvados.Container) { + run(d, c, s) + cancel() }, } @@ -186,11 +186,11 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon for i := 0; i < 80 && !strings.Contains(buf.String(), expected); i++ { time.Sleep(100 * time.Millisecond) } - dispatcher.Stop() + cancel() }() - err := dispatcher.Run() - c.Assert(err, IsNil) + err := dispatcher.Run(ctx) + c.Assert(err, Equals, context.Canceled) // Wait for all running crunch jobs to complete / terminate waitGroup.Wait() diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go index 476ca1f4a1..617b076da2 100644 --- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go +++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go @@ -4,6 +4,7 @@ package main import ( "bytes" + "context" "flag" "fmt" "log" @@ -46,7 +47,7 @@ func main() { var ( theConfig Config - sqCheck SqueueChecker + sqCheck = &SqueueChecker{} ) const defaultConfigPath = "/etc/arvados/crunch-dispatch-slurm/crunch-dispatch-slurm.yml" @@ -107,10 +108,10 @@ func doMain() error { } arv.Retries = 25 - sqCheck = SqueueChecker{Period: time.Duration(theConfig.PollPeriod)} + sqCheck = &SqueueChecker{Period: time.Duration(theConfig.PollPeriod)} defer sqCheck.Stop() - dispatcher := dispatch.Dispatcher{ + dispatcher := &dispatch.Dispatcher{ Arv: arv, RunContainer: run, PollPeriod: time.Duration(theConfig.PollPeriod), @@ -121,7 +122,7 @@ func doMain() error { log.Printf("Error notifying init daemon: %v", err) } - return dispatcher.Run() + return dispatcher.Run(context.Background()) } // sbatchCmd @@ -184,97 +185,74 @@ func submit(dispatcher *dispatch.Dispatcher, container arvados.Container, crunch } } -// If the container is marked as Locked, check if it is already in the slurm -// queue. If not, submit it. -// -// If the container is marked as Running, check if it is in the slurm queue. -// If not, mark it as Cancelled. 
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index 476ca1f4a1..617b076da2 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -4,6 +4,7 @@ package main
 
 import (
 	"bytes"
+	"context"
 	"flag"
 	"fmt"
 	"log"
@@ -46,7 +47,7 @@ func main() {
 
 var (
 	theConfig Config
-	sqCheck   SqueueChecker
+	sqCheck   = &SqueueChecker{}
 )
 
 const defaultConfigPath = "/etc/arvados/crunch-dispatch-slurm/crunch-dispatch-slurm.yml"
@@ -107,10 +108,10 @@ func doMain() error {
 	}
 	arv.Retries = 25
 
-	sqCheck = SqueueChecker{Period: time.Duration(theConfig.PollPeriod)}
+	sqCheck = &SqueueChecker{Period: time.Duration(theConfig.PollPeriod)}
 	defer sqCheck.Stop()
 
-	dispatcher := dispatch.Dispatcher{
+	dispatcher := &dispatch.Dispatcher{
 		Arv:          arv,
 		RunContainer: run,
 		PollPeriod:   time.Duration(theConfig.PollPeriod),
@@ -121,7 +122,7 @@ func doMain() error {
 		log.Printf("Error notifying init daemon: %v", err)
 	}
 
-	return dispatcher.Run()
+	return dispatcher.Run(context.Background())
 }
 
 // sbatchCmd
@@ -184,97 +185,74 @@ func submit(dispatcher *dispatch.Dispatcher, container arvados.Container, crunch
 	}
 }
 
-// If the container is marked as Locked, check if it is already in the slurm
-// queue.  If not, submit it.
-//
-// If the container is marked as Running, check if it is in the slurm queue.
-// If not, mark it as Cancelled.
-func monitorSubmitOrCancel(dispatcher *dispatch.Dispatcher, container arvados.Container, monitorDone *bool) {
-	submitted := false
-	for !*monitorDone {
-		if sqCheck.HasUUID(container.UUID) {
-			// Found in the queue, so continue monitoring
-			submitted = true
-		} else if container.State == dispatch.Locked && !submitted {
-			// Not in queue but in Locked state and we haven't
-			// submitted it yet, so submit it.
-
-			log.Printf("About to submit queued container %v", container.UUID)
-
-			if err := submit(dispatcher, container, theConfig.CrunchRunCommand); err != nil {
-				log.Printf("Error submitting container %s to slurm: %v",
-					container.UUID, err)
-				// maybe sbatch is broken, put it back to queued
-				dispatcher.Unlock(container.UUID)
-			}
-			submitted = true
-		} else {
-			// Not in queue and we are not going to submit it.
-			// Refresh the container state. If it is
-			// Complete/Cancelled, do nothing, if it is Locked then
-			// release it back to the Queue, if it is Running then
-			// clean up the record.
-
-			var con arvados.Container
-			err := dispatcher.Arv.Get("containers", container.UUID, nil, &con)
-			if err != nil {
-				log.Printf("Error getting final container state: %v", err)
-			}
+// Submit a container to the slurm queue (or resume monitoring if it's
+// already in the queue). Cancel the slurm job if the container's
+// priority changes to zero or its state indicates it's no longer
+// running.
+func run(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	if ctr.State == dispatch.Locked && !sqCheck.HasUUID(ctr.UUID) {
+		log.Printf("Submitting container %s to slurm", ctr.UUID)
+		if err := submit(disp, ctr, theConfig.CrunchRunCommand); err != nil {
+			log.Printf("Error submitting container %s to slurm: %s", ctr.UUID, err)
+			disp.Unlock(ctr.UUID)
+			return
+		}
+	}
 
-			switch con.State {
-			case dispatch.Locked:
-				log.Printf("Container %s in state %v but missing from slurm queue, changing to %v.",
-					container.UUID, con.State, dispatch.Queued)
-				dispatcher.Unlock(container.UUID)
+	log.Printf("Start monitoring container %s", ctr.UUID)
+	defer log.Printf("Done monitoring container %s", ctr.UUID)
+
+	// If the container disappears from the slurm queue, there is
+	// no point in waiting for further dispatch updates: just
+	// clean up and return.
+	go func(uuid string) {
+		for ctx.Err() == nil && sqCheck.HasUUID(uuid) {
+		}
+		cancel()
+	}(ctr.UUID)
+
+	for {
+		select {
+		case <-ctx.Done():
+			// Disappeared from squeue
+			if err := disp.Arv.Get("containers", ctr.UUID, nil, &ctr); err != nil {
+				log.Printf("Error getting final container state for %s: %s", ctr.UUID, err)
+			}
+			switch ctr.State {
 			case dispatch.Running:
-				st := dispatch.Cancelled
-				log.Printf("Container %s in state %v but missing from slurm queue, changing to %v.",
-					container.UUID, con.State, st)
-				dispatcher.UpdateState(container.UUID, st)
-			default:
-				// Container state is Queued, Complete or Cancelled so stop monitoring it.
-				return
+				disp.UpdateState(ctr.UUID, dispatch.Cancelled)
+			case dispatch.Locked:
+				disp.Unlock(ctr.UUID)
+			}
+			return
+		case updated, ok := <-status:
+			if !ok {
+				log.Printf("Dispatcher says container %s is done: cancel slurm job", ctr.UUID)
+				scancel(ctr)
+			} else if updated.Priority == 0 {
+				log.Printf("Container %s has state %q, priority %d: cancel slurm job", ctr.UUID, updated.State, updated.Priority)
+				scancel(ctr)
 			}
 		}
 	}
 }
 
-// Run or monitor a container.
-//
-// Monitor status updates.  If the priority changes to zero, cancel the
-// container using scancel.
-func run(dispatcher *dispatch.Dispatcher,
-	container arvados.Container,
-	status chan arvados.Container) {
-
-	log.Printf("Monitoring container %v started", container.UUID)
-	defer log.Printf("Monitoring container %v finished", container.UUID)
-
-	monitorDone := false
-	go monitorSubmitOrCancel(dispatcher, container, &monitorDone)
-
-	for container = range status {
-		if container.Priority == 0 && (container.State == dispatch.Locked || container.State == dispatch.Running) {
-			log.Printf("Canceling container %s", container.UUID)
-			// Mutex between squeue sync and running sbatch or scancel.
-			sqCheck.L.Lock()
-			cmd := scancelCmd(container)
-			msg, err := cmd.CombinedOutput()
-			sqCheck.L.Unlock()
-
-			if err != nil {
-				log.Printf("Error stopping container %s with %v %v: %v %v", container.UUID, cmd.Path, cmd.Args, err, string(msg))
-				if sqCheck.HasUUID(container.UUID) {
-					log.Printf("Container %s is still in squeue after scancel.", container.UUID)
-					continue
-				}
-			}
+func scancel(ctr arvados.Container) {
+	sqCheck.L.Lock()
+	cmd := scancelCmd(ctr)
+	msg, err := cmd.CombinedOutput()
+	sqCheck.L.Unlock()
 
-			// Ignore errors; if necessary, we'll try again next time
-			dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
-		}
+	if err != nil {
+		log.Printf("%q %q: %s %q", cmd.Path, cmd.Args, err, msg)
+		time.Sleep(time.Second)
+	} else if sqCheck.HasUUID(ctr.UUID) {
+		log.Printf("container %s is still in squeue after scancel", ctr.UUID)
+		time.Sleep(time.Second)
 	}
-	monitorDone = true
 }
 
 func readConfig(dst interface{}, path string) error {
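One subtlety in the new run() above: once the dispatcher closes the status channel, the "updated, ok := <-status" arm fires immediately on every pass through the loop, so scancel is effectively retried until the squeue-watching goroutine sees the job disappear and cancels ctx. That is why scancel() sleeps for a second both on error and while the job is still visible. Reduced to its essentials, the retry loop looks like this (hypothetical helper names; --name is a standard scancel option):

	package main

	import (
		"context"
		"log"
		"os/exec"
		"time"
	)

	// cancelSlurmJob keeps invoking scancel until the job disappears
	// from squeue or ctx is cancelled, pausing between attempts so a
	// stale squeue cache cannot turn this into a busy loop.
	func cancelSlurmJob(ctx context.Context, jobName string, inQueue func(string) bool) {
		for ctx.Err() == nil && inQueue(jobName) {
			if msg, err := exec.Command("scancel", "--name="+jobName).CombinedOutput(); err != nil {
				log.Printf("scancel --name=%s: %s %q", jobName, err, msg)
			}
			time.Sleep(time.Second)
		}
	}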
"gopkg.in/check.v1" ) @@ -59,30 +60,50 @@ func (s *MockArvadosServerSuite) TearDownTest(c *C) { } func (s *TestSuite) TestIntegrationNormal(c *C) { - container := s.integrationTest(c, func() *exec.Cmd { return exec.Command("echo", "zzzzz-dz642-queuedcontainer") }, + done := false + container := s.integrationTest(c, + func() *exec.Cmd { + if done { + return exec.Command("true") + } else { + return exec.Command("echo", "zzzzz-dz642-queuedcontainer") + } + }, []string(nil), func(dispatcher *dispatch.Dispatcher, container arvados.Container) { dispatcher.UpdateState(container.UUID, dispatch.Running) time.Sleep(3 * time.Second) dispatcher.UpdateState(container.UUID, dispatch.Complete) + done = true }) c.Check(container.State, Equals, arvados.ContainerStateComplete) } func (s *TestSuite) TestIntegrationCancel(c *C) { - - // Override sbatchCmd + var cmd *exec.Cmd var scancelCmdLine []string defer func(orig func(arvados.Container) *exec.Cmd) { scancelCmd = orig }(scancelCmd) + attempt := 0 scancelCmd = func(container arvados.Container) *exec.Cmd { - scancelCmdLine = scancelFunc(container).Args - return exec.Command("echo") + if attempt++; attempt == 1 { + return exec.Command("false") + } else { + scancelCmdLine = scancelFunc(container).Args + cmd = exec.Command("echo") + return cmd + } } container := s.integrationTest(c, - func() *exec.Cmd { return exec.Command("echo", "zzzzz-dz642-queuedcontainer") }, + func() *exec.Cmd { + if cmd != nil && cmd.ProcessState != nil { + return exec.Command("true") + } else { + return exec.Command("echo", "zzzzz-dz642-queuedcontainer") + } + }, []string(nil), func(dispatcher *dispatch.Dispatcher, container arvados.Container) { dispatcher.UpdateState(container.UUID, dispatch.Running) @@ -146,22 +167,21 @@ func (s *TestSuite) integrationTest(c *C, theConfig.CrunchRunCommand = []string{"echo"} + ctx, cancel := context.WithCancel(context.Background()) dispatcher := dispatch.Dispatcher{ Arv: arv, PollPeriod: time.Duration(1) * time.Second, - RunContainer: func(dispatcher *dispatch.Dispatcher, - container arvados.Container, - status chan arvados.Container) { - go runContainer(dispatcher, container) - run(dispatcher, container, status) - dispatcher.Stop() + RunContainer: func(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) { + go runContainer(disp, ctr) + run(disp, ctr, status) + cancel() }, } - sqCheck = SqueueChecker{Period: 500 * time.Millisecond} + sqCheck = &SqueueChecker{Period: 500 * time.Millisecond} - err = dispatcher.Run() - c.Assert(err, IsNil) + err = dispatcher.Run(ctx) + c.Assert(err, Equals, context.Canceled) sqCheck.Stop() @@ -207,19 +227,18 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon theConfig.CrunchRunCommand = []string{crunchCmd} + ctx, cancel := context.WithCancel(context.Background()) dispatcher := dispatch.Dispatcher{ Arv: arv, PollPeriod: time.Duration(1) * time.Second, - RunContainer: func(dispatcher *dispatch.Dispatcher, - container arvados.Container, - status chan arvados.Container) { + RunContainer: func(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) { go func() { time.Sleep(1 * time.Second) - dispatcher.UpdateState(container.UUID, dispatch.Running) - dispatcher.UpdateState(container.UUID, dispatch.Complete) + disp.UpdateState(ctr.UUID, dispatch.Running) + disp.UpdateState(ctr.UUID, dispatch.Complete) }() - run(dispatcher, container, status) - dispatcher.Stop() + run(disp, ctr, status) + cancel() }, } @@ -227,11 +246,11 @@ func 
testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon for i := 0; i < 80 && !strings.Contains(buf.String(), expected); i++ { time.Sleep(100 * time.Millisecond) } - dispatcher.Stop() + cancel() }() - err := dispatcher.Run() - c.Assert(err, IsNil) + err := dispatcher.Run(ctx) + c.Assert(err, Equals, context.Canceled) c.Check(buf.String(), Matches, `(?ms).*`+expected+`.*`) }
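TestIntegrationCancel exercises the retry path by stubbing scancelCmd: the first invocation fails ("false"), the second succeeds ("echo") and records the command line production code would have run. The seam itself, in isolation (a sketch with hypothetical command-line details; the diff does not show scancelFunc's real arguments):

	package main

	import (
		"os/exec"

		"git.curoverse.com/arvados.git/sdk/go/arvados"
	)

	// Production code builds the real command...
	var scancelFunc = func(ctr arvados.Container) *exec.Cmd {
		return exec.Command("scancel", "--name="+ctr.UUID)
	}

	// ...but always invokes it through a replaceable variable, so tests
	// can interpose failures without a slurm installation.
	var scancelCmd = scancelFunc

	func stubScancelFailOnce() {
		attempt := 0
		scancelCmd = func(ctr arvados.Container) *exec.Cmd {
			if attempt++; attempt == 1 {
				return exec.Command("false") // simulate a failed scancel
			}
			return exec.Command("echo") // then succeed
		}
	}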