10701: Refactor dispatch library.
author     Tom Clegg <tom@curoverse.com>
           Fri, 10 Feb 2017 06:25:14 +0000 (01:25 -0500)
committer  Tom Clegg <tom@curoverse.com>
           Fri, 10 Feb 2017 06:25:14 +0000 (01:25 -0500)
sdk/go/dispatch/dispatch.go
services/crunch-dispatch-local/crunch-dispatch-local.go
services/crunch-dispatch-local/crunch-dispatch-local_test.go
services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go

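The refactored sdk/go/dispatch API (first diff below) drops the exported Auth field, the Stop() method, and the bidirectional status channel in favor of a context passed to Run and a receive-only update channel passed to each Runner. A rough caller sketch based on that diff follows; startDispatcher and runOne are illustrative names, not part of the SDK:

    package main

    import (
        "context"
        "log"
        "time"

        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/dispatch"
    )

    // runOne matches the new dispatch.Runner signature. It receives fresh
    // copies of the container record as the dispatcher polls the API server,
    // and must drain the channel until the dispatcher closes it.
    func runOne(d *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
        for ctr = range status {
            // React to updates here, e.g. cancel local work and call
            // d.UpdateState(ctr.UUID, dispatch.Cancelled) when
            // ctr.Priority drops to zero.
        }
        // Closed channel: this dispatcher no longer handles the container.
        log.Printf("done with %s", ctr.UUID)
    }

    // startDispatcher shows the new Run(ctx) call pattern; cancelling the
    // context replaces the old dispatcher.Stop().
    func startDispatcher(ctx context.Context, arv *arvadosclient.ArvadosClient) error {
        d := &dispatch.Dispatcher{
            Arv:          arv,
            RunContainer: runOne,
            PollPeriod:   time.Second,
        }
        return d.Run(ctx) // returns ctx.Err() once ctx is cancelled
    }
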
index ce960c0772eebe9f28fb314a9248b1f1490d8496..7342c3b8cb9d98577d8639ef24d7de96c48e8f47 100644 (file)
@@ -4,11 +4,14 @@
 package dispatch
 
 import (
-       "git.curoverse.com/arvados.git/sdk/go/arvados"
-       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+       "context"
+       "fmt"
        "log"
        "sync"
        "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
 )
 
 const (
@@ -19,231 +22,173 @@ const (
        Cancelled = arvados.ContainerStateCancelled
 )
 
-// Dispatcher holds the state of the dispatcher
-type Dispatcher struct {
-       // The Arvados client
-       Arv *arvadosclient.ArvadosClient
-
-       // When a new queued container appears and is either already owned by
-       // this dispatcher or is successfully locked, the dispatcher will call
-       // go RunContainer().  The RunContainer() goroutine gets a channel over
-       // which it will receive updates to the container state.  The
-       // RunContainer() goroutine should only assume status updates come when
-       // the container record changes on the API server; if it needs to
-       // monitor the job submission to the underlying slurm/grid engine/etc
-       // queue it should spin up its own polling goroutines.  When the
-       // channel is closed, that means the container is no longer being
-       // handled by this dispatcher and the goroutine should terminate.  The
-       // goroutine is responsible for draining the 'status' channel, failure
-       // to do so may deadlock the dispatcher.
-       RunContainer func(*Dispatcher, arvados.Container, chan arvados.Container)
-
-       // Amount of time to wait between polling for updates.
-       PollPeriod time.Duration
-
-       // Minimum time between two attempts to run the same container
-       MinRetryPeriod time.Duration
-
-       mineMutex sync.Mutex
-       mineMap   map[string]chan arvados.Container
-       Auth      arvados.APIClientAuthorization
-
-       throttle throttle
-
-       stop chan struct{}
+type runner struct {
+       closing bool
+       updates chan arvados.Container
 }
 
-// Goroutine-safely add/remove uuid to the set of "my" containers, i.e., ones
-// for which this process is actively starting/monitoring.  Returns channel to
-// be used to send container status updates.
-func (dispatcher *Dispatcher) setMine(uuid string) chan arvados.Container {
-       dispatcher.mineMutex.Lock()
-       defer dispatcher.mineMutex.Unlock()
-       if ch, ok := dispatcher.mineMap[uuid]; ok {
-               return ch
+func (ex *runner) close() {
+       if !ex.closing {
+               close(ex.updates)
        }
-
-       ch := make(chan arvados.Container)
-       dispatcher.mineMap[uuid] = ch
-       return ch
+       ex.closing = true
 }
 
-// Release a container which is no longer being monitored.
-func (dispatcher *Dispatcher) notMine(uuid string) {
-       dispatcher.mineMutex.Lock()
-       defer dispatcher.mineMutex.Unlock()
-       if ch, ok := dispatcher.mineMap[uuid]; ok {
-               close(ch)
-               delete(dispatcher.mineMap, uuid)
+func (ex *runner) update(c arvados.Container) {
+       if ex.closing {
+               return
        }
-}
-
-// checkMine returns true if there is a channel for updates associated
-// with container c.  If update is true, also send the container record on
-// the channel.
-func (dispatcher *Dispatcher) checkMine(c arvados.Container, update bool) bool {
-       dispatcher.mineMutex.Lock()
-       defer dispatcher.mineMutex.Unlock()
-       ch, ok := dispatcher.mineMap[c.UUID]
-       if ok {
-               if update {
-                       ch <- c
-               }
-               return true
+       select {
+       case <-ex.updates:
+               log.Printf("debug: executor is handling updates slowly, discarded previous update for %s", c.UUID)
+       default:
        }
-       return false
+       ex.updates <- c
 }
 
-func (dispatcher *Dispatcher) getContainers(params arvadosclient.Dict, touched map[string]bool) {
-       var containers arvados.ContainerList
-       err := dispatcher.Arv.List("containers", params, &containers)
-       if err != nil {
-               log.Printf("Error getting list of containers: %q", err)
-               return
-       }
+type Dispatcher struct {
+       Arv            *arvadosclient.ArvadosClient
+       PollPeriod     time.Duration
+       MinRetryPeriod time.Duration
+       RunContainer   Runner
 
-       if containers.ItemsAvailable > len(containers.Items) {
-               // TODO: support paging
-               log.Printf("Warning!  %d containers are available but only received %d, paged requests are not yet supported, some containers may be ignored.",
-                       containers.ItemsAvailable,
-                       len(containers.Items))
-       }
-       for _, container := range containers.Items {
-               touched[container.UUID] = true
-               dispatcher.handleUpdate(container)
-       }
+       auth     arvados.APIClientAuthorization
+       mtx      sync.Mutex
+       running  map[string]*runner
+       throttle throttle
 }
 
-func (dispatcher *Dispatcher) pollContainers(stop chan struct{}) {
-       ticker := time.NewTicker(dispatcher.PollPeriod)
-       defer ticker.Stop()
+// A Runner executes a container. If it starts any goroutines, it must
+// not return until it can guarantee that none of those goroutines
+// will do anything with this container.
+type Runner func(*Dispatcher, arvados.Container, <-chan arvados.Container)
 
-       paramsQ := arvadosclient.Dict{
-               "filters": [][]interface{}{{"state", "=", "Queued"}, {"priority", ">", "0"}},
-               "order":   []string{"priority desc"},
-               "limit":   "1000"}
-       paramsP := arvadosclient.Dict{
-               "filters": [][]interface{}{{"locked_by_uuid", "=", dispatcher.Auth.UUID}},
-               "limit":   "1000"}
+func (d *Dispatcher) Run(ctx context.Context) error {
+       err := d.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &d.auth)
+       if err != nil {
+               return fmt.Errorf("error getting my token UUID: %v", err)
+       }
+
+       poll := time.NewTicker(d.PollPeriod)
+       defer poll.Stop()
 
        for {
-               touched := make(map[string]bool)
-               dispatcher.getContainers(paramsQ, touched)
-               dispatcher.getContainers(paramsP, touched)
-               dispatcher.mineMutex.Lock()
-               var monitored []string
-               for k := range dispatcher.mineMap {
-                       if _, ok := touched[k]; !ok {
-                               monitored = append(monitored, k)
-                       }
+               running := make([]string, 0, len(d.running))
+               d.mtx.Lock()
+               for uuid := range d.running {
+                       running = append(running, uuid)
                }
-               dispatcher.mineMutex.Unlock()
-               if monitored != nil {
-                       dispatcher.getContainers(arvadosclient.Dict{
-                               "filters": [][]interface{}{{"uuid", "in", monitored}}}, touched)
+               d.mtx.Unlock()
+               if len(running) == 0 {
+                       // API bug: ["uuid", "not in", []] does not match everything
+                       running = []string{"X"}
                }
+               d.checkForUpdates([][]interface{}{
+                       {"uuid", "in", running}})
+               d.checkForUpdates([][]interface{}{
+                       {"state", "=", Queued},
+                       {"priority", ">", "0"},
+                       {"uuid", "not in", running}})
+               d.checkForUpdates([][]interface{}{
+                       {"locked_by_uuid", "=", d.auth.UUID},
+                       {"uuid", "not in", running}})
                select {
-               case <-ticker.C:
-               case <-stop:
-                       return
+               case <-poll.C:
+                       continue
+               case <-ctx.Done():
+                       return ctx.Err()
                }
        }
 }
 
-func (dispatcher *Dispatcher) handleUpdate(container arvados.Container) {
-       if container.State == Queued && dispatcher.checkMine(container, false) {
-               // If we previously started the job, something failed, and it
-               // was re-queued, this dispatcher might still be monitoring it.
-               // Stop the existing monitor, then try to lock and run it
-               // again.
-               dispatcher.notMine(container.UUID)
-       }
+func (d *Dispatcher) start(c arvados.Container) *runner {
+       ex := &runner{
+               updates: make(chan arvados.Container, 1),
+       }
+       if d.running == nil {
+               d.running = make(map[string]*runner)
+       }
+       d.running[c.UUID] = ex
+       go func() {
+               d.RunContainer(d, c, ex.updates)
+               d.mtx.Lock()
+               delete(d.running, c.UUID)
+               d.mtx.Unlock()
+       }()
+       return ex
+}
 
-       if container.LockedByUUID != dispatcher.Auth.UUID && container.State != Queued {
-               // If container is Complete, Cancelled, or Queued, LockedByUUID
-               // will be nil.  If the container was formerly Locked, moved
-               // back to Queued and then locked by another dispatcher,
-               // LockedByUUID will be different.  In either case, we want
-               // to stop monitoring it.
-               log.Printf("Container %v now in state %q with locked_by_uuid %q", container.UUID, container.State, container.LockedByUUID)
-               dispatcher.notMine(container.UUID)
-               return
-       }
+func (d *Dispatcher) checkForUpdates(filters [][]interface{}) {
+       params := arvadosclient.Dict{
+               "filters": filters,
+               "order":   []string{"priority desc"},
+               "limit":   "1000"}
 
-       if dispatcher.checkMine(container, true) {
-               // Already monitored, sent status update
+       var list arvados.ContainerList
+       err := d.Arv.List("containers", params, &list)
+       if err != nil {
+               log.Printf("Error getting list of containers: %q", err)
                return
        }
 
-       if container.State == Queued && container.Priority > 0 {
-               if !dispatcher.throttle.Check(container.UUID) {
-                       return
-               }
-               // Try to take the lock
-               if err := dispatcher.Lock(container.UUID); err != nil {
-                       return
+       if list.ItemsAvailable > len(list.Items) {
+               // TODO: support paging
+               log.Printf("Warning!  %d containers are available but only received %d, paged requests are not yet supported, some containers may be ignored.",
+                       list.ItemsAvailable,
+                       len(list.Items))
+       }
+
+       d.mtx.Lock()
+       defer d.mtx.Unlock()
+       for _, c := range list.Items {
+               ex, running := d.running[c.UUID]
+               if c.LockedByUUID != "" && c.LockedByUUID != d.auth.UUID {
+                       log.Printf("debug: ignoring %s locked by %s", c.UUID, c.LockedByUUID)
+               } else if running {
+                       switch c.State {
+                       case Queued:
+                               ex.close()
+                       case Locked, Running:
+                               ex.update(c)
+                       case Cancelled, Complete:
+                               ex.close()
+                       }
+               } else {
+                       switch c.State {
+                       case Queued:
+                               if err := d.lock(c.UUID); err != nil {
+                                       log.Printf("Error locking container %s: %s", c.UUID, err)
+                               } else {
+                                       c.State = Locked
+                                       d.start(c).update(c)
+                               }
+                       case Locked, Running:
+                               d.start(c).update(c)
+                       case Cancelled, Complete:
+                               // nothing to do: this container is not being run here (ex is nil in this branch)
+                       }
                }
-               container.State = Locked
-       }
-
-       if container.State == Locked || container.State == Running {
-               // Not currently monitored but in Locked or Running state and
-               // owned by this dispatcher, so start monitoring.
-               go dispatcher.RunContainer(dispatcher, container, dispatcher.setMine(container.UUID))
        }
 }
 
 // UpdateState makes an API call to change the state of a container.
-func (dispatcher *Dispatcher) UpdateState(uuid string, newState arvados.ContainerState) error {
-       err := dispatcher.Arv.Update("containers", uuid,
+func (d *Dispatcher) UpdateState(uuid string, state arvados.ContainerState) error {
+       err := d.Arv.Update("containers", uuid,
                arvadosclient.Dict{
-                       "container": arvadosclient.Dict{"state": newState}},
-               nil)
+                       "container": arvadosclient.Dict{"state": state},
+               }, nil)
        if err != nil {
-               log.Printf("Error updating container %s to state %q: %q", uuid, newState, err)
+               log.Printf("Error updating container %s to state %q: %s", uuid, state, err)
        }
        return err
 }
 
 // Lock makes the lock API call which updates the state of a container to Locked.
-func (dispatcher *Dispatcher) Lock(uuid string) error {
-       err := dispatcher.Arv.Call("POST", "containers", uuid, "lock", nil, nil)
-       if err != nil {
-               log.Printf("Error locking container %s: %q", uuid, err)
-       }
-       return err
+func (d *Dispatcher) lock(uuid string) error {
+       return d.Arv.Call("POST", "containers", uuid, "lock", nil, nil)
 }
 
 // Unlock makes the unlock API call which updates the state of a container to Queued.
-func (dispatcher *Dispatcher) Unlock(uuid string) error {
-       err := dispatcher.Arv.Call("POST", "containers", uuid, "unlock", nil, nil)
-       if err != nil {
-               log.Printf("Error unlocking container %s: %q", uuid, err)
-       }
-       return err
-}
-
-// Stop causes Run to return after the current polling cycle.
-func (dispatcher *Dispatcher) Stop() {
-       if dispatcher.stop == nil {
-               // already stopped
-               return
-       }
-       close(dispatcher.stop)
-       dispatcher.stop = nil
-}
-
-// Run runs the main loop of the dispatcher.
-func (dispatcher *Dispatcher) Run() (err error) {
-       err = dispatcher.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &dispatcher.Auth)
-       if err != nil {
-               log.Printf("Error getting my token UUID: %v", err)
-               return
-       }
-
-       dispatcher.mineMap = make(map[string]chan arvados.Container)
-       dispatcher.stop = make(chan struct{})
-       dispatcher.throttle.hold = dispatcher.MinRetryPeriod
-       dispatcher.pollContainers(dispatcher.stop)
-       return nil
+func (d *Dispatcher) Unlock(uuid string) error {
+       return d.Arv.Call("POST", "containers", uuid, "unlock", nil, nil)
 }
index bb3c05c7eb88b9abed4ff8278ff3fbdfcf367be8..22f7d8b6469bfdd65497a18eff738850d94bc1c6 100644 (file)
@@ -3,10 +3,8 @@ package main
 // Dispatcher service for Crunch that runs containers locally.
 
 import (
+       "context"
        "flag"
-       "git.curoverse.com/arvados.git/sdk/go/arvados"
-       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-       "git.curoverse.com/arvados.git/sdk/go/dispatch"
        "log"
        "os"
        "os/exec"
@@ -14,6 +12,10 @@ import (
        "sync"
        "syscall"
        "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+       "git.curoverse.com/arvados.git/sdk/go/dispatch"
 )
 
 func main() {
@@ -61,7 +63,8 @@ func doMain() error {
                PollPeriod:   time.Duration(*pollInterval) * time.Second,
        }
 
-       err = dispatcher.Run()
+       ctx, cancel := context.WithCancel(context.Background())
+       err = dispatcher.Run(ctx)
        if err != nil {
                return err
        }
@@ -72,7 +75,7 @@ func doMain() error {
        log.Printf("Received %s, shutting down", sig)
        signal.Stop(c)
 
-       dispatcher.Stop()
+       cancel()
 
        runningCmdsMutex.Lock()
        // Finished dispatching; interrupt any crunch jobs that are still running
@@ -103,7 +106,7 @@ var startCmd = startFunc
 // crunch-run terminates, mark the container as Cancelled.
 func run(dispatcher *dispatch.Dispatcher,
        container arvados.Container,
-       status chan arvados.Container) {
+       status <-chan arvados.Container) {
 
        uuid := container.UUID
 
@@ -170,8 +173,7 @@ func run(dispatcher *dispatch.Dispatcher,
        if err != nil {
                log.Printf("Error getting final container state: %v", err)
        }
-       if container.LockedByUUID == dispatcher.Auth.UUID &&
-               (container.State == dispatch.Locked || container.State == dispatch.Running) {
+       if container.State == dispatch.Locked || container.State == dispatch.Running {
                log.Printf("After %s process termination, container state for %v is %q.  Updating it to %q",
                        *crunchRunCommand, container.State, uuid, dispatch.Cancelled)
                dispatcher.UpdateState(uuid, dispatch.Cancelled)
index ed13a4109c4dc829581d0fb36d935de972f7a46e..a72d65b6ec64c52ef0348fd3d127095f20ffb1e1 100644 (file)
@@ -2,11 +2,7 @@ package main
 
 import (
        "bytes"
-       "git.curoverse.com/arvados.git/sdk/go/arvados"
-       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-       "git.curoverse.com/arvados.git/sdk/go/arvadostest"
-       "git.curoverse.com/arvados.git/sdk/go/dispatch"
-       . "gopkg.in/check.v1"
+       "context"
        "io"
        "log"
        "net/http"
@@ -16,6 +12,12 @@ import (
        "strings"
        "testing"
        "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+       "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+       "git.curoverse.com/arvados.git/sdk/go/dispatch"
+       . "gopkg.in/check.v1"
 )
 
 // Gocheck boilerplate
@@ -62,14 +64,13 @@ func (s *TestSuite) TestIntegration(c *C) {
        echo := "echo"
        crunchRunCommand = &echo
 
+       ctx, cancel := context.WithCancel(context.Background())
        dispatcher := dispatch.Dispatcher{
                Arv:        arv,
                PollPeriod: time.Second,
-               RunContainer: func(dispatcher *dispatch.Dispatcher,
-                       container arvados.Container,
-                       status chan arvados.Container) {
-                       run(dispatcher, container, status)
-                       dispatcher.Stop()
+               RunContainer: func(d *dispatch.Dispatcher, c arvados.Container, s <-chan arvados.Container) {
+                       run(d, c, s)
+                       cancel()
                },
        }
 
@@ -79,8 +80,8 @@ func (s *TestSuite) TestIntegration(c *C) {
                return cmd.Start()
        }
 
-       err = dispatcher.Run()
-       c.Assert(err, IsNil)
+       err = dispatcher.Run(ctx)
+       c.Assert(err, Equals, context.Canceled)
 
        // Wait for all running crunch jobs to complete / terminate
        waitGroup.Wait()
@@ -165,14 +166,13 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
 
        *crunchRunCommand = crunchCmd
 
+       ctx, cancel := context.WithCancel(context.Background())
        dispatcher := dispatch.Dispatcher{
                Arv:        arv,
                PollPeriod: time.Duration(1) * time.Second,
-               RunContainer: func(dispatcher *dispatch.Dispatcher,
-                       container arvados.Container,
-                       status chan arvados.Container) {
-                       run(dispatcher, container, status)
-                       dispatcher.Stop()
+               RunContainer: func(d *dispatch.Dispatcher, c arvados.Container, s <-chan arvados.Container) {
+                       run(d, c, s)
+                       cancel()
                },
        }
 
@@ -186,11 +186,11 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
                for i := 0; i < 80 && !strings.Contains(buf.String(), expected); i++ {
                        time.Sleep(100 * time.Millisecond)
                }
-               dispatcher.Stop()
+               cancel()
        }()
 
-       err := dispatcher.Run()
-       c.Assert(err, IsNil)
+       err := dispatcher.Run(ctx)
+       c.Assert(err, Equals, context.Canceled)
 
        // Wait for all running crunch jobs to complete / terminate
        waitGroup.Wait()
index 476ca1f4a117cd5021460a5c8c63ef6b5d198d80..617b076da281a982b81f24ae9b7b7fe4d3897aee 100644 (file)
@@ -4,6 +4,7 @@ package main
 
 import (
        "bytes"
+       "context"
        "flag"
        "fmt"
        "log"
@@ -46,7 +47,7 @@ func main() {
 
 var (
        theConfig Config
-       sqCheck   SqueueChecker
+       sqCheck   = &SqueueChecker{}
 )
 
 const defaultConfigPath = "/etc/arvados/crunch-dispatch-slurm/crunch-dispatch-slurm.yml"
@@ -107,10 +108,10 @@ func doMain() error {
        }
        arv.Retries = 25
 
-       sqCheck = SqueueChecker{Period: time.Duration(theConfig.PollPeriod)}
+       sqCheck = &SqueueChecker{Period: time.Duration(theConfig.PollPeriod)}
        defer sqCheck.Stop()
 
-       dispatcher := dispatch.Dispatcher{
+       dispatcher := &dispatch.Dispatcher{
                Arv:            arv,
                RunContainer:   run,
                PollPeriod:     time.Duration(theConfig.PollPeriod),
@@ -121,7 +122,7 @@ func doMain() error {
                log.Printf("Error notifying init daemon: %v", err)
        }
 
-       return dispatcher.Run()
+       return dispatcher.Run(context.Background())
 }
 
 // sbatchCmd
@@ -184,97 +185,74 @@ func submit(dispatcher *dispatch.Dispatcher, container arvados.Container, crunch
        }
 }
 
-// If the container is marked as Locked, check if it is already in the slurm
-// queue.  If not, submit it.
-//
-// If the container is marked as Running, check if it is in the slurm queue.
-// If not, mark it as Cancelled.
-func monitorSubmitOrCancel(dispatcher *dispatch.Dispatcher, container arvados.Container, monitorDone *bool) {
-       submitted := false
-       for !*monitorDone {
-               if sqCheck.HasUUID(container.UUID) {
-                       // Found in the queue, so continue monitoring
-                       submitted = true
-               } else if container.State == dispatch.Locked && !submitted {
-                       // Not in queue but in Locked state and we haven't
-                       // submitted it yet, so submit it.
-
-                       log.Printf("About to submit queued container %v", container.UUID)
-
-                       if err := submit(dispatcher, container, theConfig.CrunchRunCommand); err != nil {
-                               log.Printf("Error submitting container %s to slurm: %v",
-                                       container.UUID, err)
-                               // maybe sbatch is broken, put it back to queued
-                               dispatcher.Unlock(container.UUID)
-                       }
-                       submitted = true
-               } else {
-                       // Not in queue and we are not going to submit it.
-                       // Refresh the container state. If it is
-                       // Complete/Cancelled, do nothing, if it is Locked then
-                       // release it back to the Queue, if it is Running then
-                       // clean up the record.
-
-                       var con arvados.Container
-                       err := dispatcher.Arv.Get("containers", container.UUID, nil, &con)
-                       if err != nil {
-                               log.Printf("Error getting final container state: %v", err)
-                       }
+// Submit a container to the slurm queue (or resume monitoring if it's
+// already in the queue).  Cancel the slurm job if the container's
+// priority changes to zero or its state indicates it's no longer
+// running.
+func run(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
+       ctx, cancel := context.WithCancel(context.Background())
+       defer cancel()
+
+       if ctr.State == dispatch.Locked && !sqCheck.HasUUID(ctr.UUID) {
+               log.Printf("Submitting container %s to slurm", ctr.UUID)
+               if err := submit(disp, ctr, theConfig.CrunchRunCommand); err != nil {
+                       log.Printf("Error submitting container %s to slurm: %s", ctr.UUID, err)
+                       disp.Unlock(ctr.UUID)
+                       return
+               }
+       }
 
-                       switch con.State {
-                       case dispatch.Locked:
-                               log.Printf("Container %s in state %v but missing from slurm queue, changing to %v.",
-                                       container.UUID, con.State, dispatch.Queued)
-                               dispatcher.Unlock(container.UUID)
+       log.Printf("Start monitoring container %s", ctr.UUID)
+       defer log.Printf("Done monitoring container %s", ctr.UUID)
+
+       // If the container disappears from the slurm queue, there is
+       // no point in waiting for further dispatch updates: just
+       // clean up and return.
+       go func(uuid string) {
+               for ctx.Err() == nil && sqCheck.HasUUID(uuid) {
+               }
+               cancel()
+       }(ctr.UUID)
+
+       for {
+               select {
+               case <-ctx.Done():
+                       // Disappeared from squeue
+                       if err := disp.Arv.Get("containers", ctr.UUID, nil, &ctr); err != nil {
+                               log.Printf("Error getting final container state for %s: %s", ctr.UUID, err)
+                       }
+                       switch ctr.State {
                        case dispatch.Running:
-                               st := dispatch.Cancelled
-                               log.Printf("Container %s in state %v but missing from slurm queue, changing to %v.",
-                                       container.UUID, con.State, st)
-                               dispatcher.UpdateState(container.UUID, st)
-                       default:
-                               // Container state is Queued, Complete or Cancelled so stop monitoring it.
-                               return
+                               disp.UpdateState(ctr.UUID, dispatch.Cancelled)
+                       case dispatch.Locked:
+                               disp.Unlock(ctr.UUID)
+                       }
+                       return
+               case updated, ok := <-status:
+                       if !ok {
+                               log.Printf("Dispatcher says container %s is done: cancel slurm job", ctr.UUID)
+                               scancel(ctr)
+                       } else if updated.Priority == 0 {
+                               log.Printf("Container %s has state %q, priority %d: cancel slurm job", ctr.UUID, updated.State, updated.Priority)
+                               scancel(ctr)
                        }
                }
        }
 }
 
-// Run or monitor a container.
-//
-// Monitor status updates.  If the priority changes to zero, cancel the
-// container using scancel.
-func run(dispatcher *dispatch.Dispatcher,
-       container arvados.Container,
-       status chan arvados.Container) {
-
-       log.Printf("Monitoring container %v started", container.UUID)
-       defer log.Printf("Monitoring container %v finished", container.UUID)
-
-       monitorDone := false
-       go monitorSubmitOrCancel(dispatcher, container, &monitorDone)
-
-       for container = range status {
-               if container.Priority == 0 && (container.State == dispatch.Locked || container.State == dispatch.Running) {
-                       log.Printf("Canceling container %s", container.UUID)
-                       // Mutex between squeue sync and running sbatch or scancel.
-                       sqCheck.L.Lock()
-                       cmd := scancelCmd(container)
-                       msg, err := cmd.CombinedOutput()
-                       sqCheck.L.Unlock()
-
-                       if err != nil {
-                               log.Printf("Error stopping container %s with %v %v: %v %v", container.UUID, cmd.Path, cmd.Args, err, string(msg))
-                               if sqCheck.HasUUID(container.UUID) {
-                                       log.Printf("Container %s is still in squeue after scancel.", container.UUID)
-                                       continue
-                               }
-                       }
+func scancel(ctr arvados.Container) {
+       sqCheck.L.Lock()
+       cmd := scancelCmd(ctr)
+       msg, err := cmd.CombinedOutput()
+       sqCheck.L.Unlock()
 
-                       // Ignore errors; if necessary, we'll try again next time
-                       dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
-               }
+       if err != nil {
+               log.Printf("%q %q: %s %q", cmd.Path, cmd.Args, err, msg)
+               time.Sleep(time.Second)
+       } else if sqCheck.HasUUID(ctr.UUID) {
+               log.Printf("container %s is still in squeue after scancel", ctr.UUID)
+               time.Sleep(time.Second)
        }
-       monitorDone = true
 }
 
 func readConfig(dst interface{}, path string) error {
index 8809e7bcc6fc10847ab84da1270387c6c9eb3dd3..e6abb1650ec1007d6f44906d7fdeba7c977318f9 100644 (file)
@@ -2,11 +2,8 @@ package main
 
 import (
        "bytes"
+       "context"
        "fmt"
-       "git.curoverse.com/arvados.git/sdk/go/arvados"
-       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-       "git.curoverse.com/arvados.git/sdk/go/arvadostest"
-       "git.curoverse.com/arvados.git/sdk/go/dispatch"
        "io"
        "io/ioutil"
        "log"
@@ -18,6 +15,10 @@ import (
        "testing"
        "time"
 
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+       "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+       "git.curoverse.com/arvados.git/sdk/go/dispatch"
        . "gopkg.in/check.v1"
 )
 
@@ -59,30 +60,50 @@ func (s *MockArvadosServerSuite) TearDownTest(c *C) {
 }
 
 func (s *TestSuite) TestIntegrationNormal(c *C) {
-       container := s.integrationTest(c, func() *exec.Cmd { return exec.Command("echo", "zzzzz-dz642-queuedcontainer") },
+       done := false
+       container := s.integrationTest(c,
+               func() *exec.Cmd {
+                       if done {
+                               return exec.Command("true")
+                       } else {
+                               return exec.Command("echo", "zzzzz-dz642-queuedcontainer")
+                       }
+               },
                []string(nil),
                func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
                        dispatcher.UpdateState(container.UUID, dispatch.Running)
                        time.Sleep(3 * time.Second)
                        dispatcher.UpdateState(container.UUID, dispatch.Complete)
+                       done = true
                })
        c.Check(container.State, Equals, arvados.ContainerStateComplete)
 }
 
 func (s *TestSuite) TestIntegrationCancel(c *C) {
-
-       // Override sbatchCmd
+       var cmd *exec.Cmd
        var scancelCmdLine []string
        defer func(orig func(arvados.Container) *exec.Cmd) {
                scancelCmd = orig
        }(scancelCmd)
+       attempt := 0
        scancelCmd = func(container arvados.Container) *exec.Cmd {
-               scancelCmdLine = scancelFunc(container).Args
-               return exec.Command("echo")
+               if attempt++; attempt == 1 {
+                       return exec.Command("false")
+               } else {
+                       scancelCmdLine = scancelFunc(container).Args
+                       cmd = exec.Command("echo")
+                       return cmd
+               }
        }
 
        container := s.integrationTest(c,
-               func() *exec.Cmd { return exec.Command("echo", "zzzzz-dz642-queuedcontainer") },
+               func() *exec.Cmd {
+                       if cmd != nil && cmd.ProcessState != nil {
+                               return exec.Command("true")
+                       } else {
+                               return exec.Command("echo", "zzzzz-dz642-queuedcontainer")
+                       }
+               },
                []string(nil),
                func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
                        dispatcher.UpdateState(container.UUID, dispatch.Running)
@@ -146,22 +167,21 @@ func (s *TestSuite) integrationTest(c *C,
 
        theConfig.CrunchRunCommand = []string{"echo"}
 
+       ctx, cancel := context.WithCancel(context.Background())
        dispatcher := dispatch.Dispatcher{
                Arv:        arv,
                PollPeriod: time.Duration(1) * time.Second,
-               RunContainer: func(dispatcher *dispatch.Dispatcher,
-                       container arvados.Container,
-                       status chan arvados.Container) {
-                       go runContainer(dispatcher, container)
-                       run(dispatcher, container, status)
-                       dispatcher.Stop()
+               RunContainer: func(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
+                       go runContainer(disp, ctr)
+                       run(disp, ctr, status)
+                       cancel()
                },
        }
 
-       sqCheck = SqueueChecker{Period: 500 * time.Millisecond}
+       sqCheck = &SqueueChecker{Period: 500 * time.Millisecond}
 
-       err = dispatcher.Run()
-       c.Assert(err, IsNil)
+       err = dispatcher.Run(ctx)
+       c.Assert(err, Equals, context.Canceled)
 
        sqCheck.Stop()
 
@@ -207,19 +227,18 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
 
        theConfig.CrunchRunCommand = []string{crunchCmd}
 
+       ctx, cancel := context.WithCancel(context.Background())
        dispatcher := dispatch.Dispatcher{
                Arv:        arv,
                PollPeriod: time.Duration(1) * time.Second,
-               RunContainer: func(dispatcher *dispatch.Dispatcher,
-                       container arvados.Container,
-                       status chan arvados.Container) {
+               RunContainer: func(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
                        go func() {
                                time.Sleep(1 * time.Second)
-                               dispatcher.UpdateState(container.UUID, dispatch.Running)
-                               dispatcher.UpdateState(container.UUID, dispatch.Complete)
+                               disp.UpdateState(ctr.UUID, dispatch.Running)
+                               disp.UpdateState(ctr.UUID, dispatch.Complete)
                        }()
-                       run(dispatcher, container, status)
-                       dispatcher.Stop()
+                       run(disp, ctr, status)
+                       cancel()
                },
        }
 
@@ -227,11 +246,11 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
                for i := 0; i < 80 && !strings.Contains(buf.String(), expected); i++ {
                        time.Sleep(100 * time.Millisecond)
                }
-               dispatcher.Stop()
+               cancel()
        }()
 
-       err := dispatcher.Run()
-       c.Assert(err, IsNil)
+       err := dispatcher.Run(ctx)
+       c.Assert(err, Equals, context.Canceled)
 
        c.Check(buf.String(), Matches, `(?ms).*`+expected+`.*`)
 }