10979: Check for orphans only once at startup. Add missing Lock() in
authorTom Clegg <tom@curoverse.com>
Fri, 24 Feb 2017 21:46:46 +0000 (16:46 -0500)
committerTom Clegg <tom@curoverse.com>
Fri, 24 Feb 2017 21:46:46 +0000 (16:46 -0500)
squeue checker. Avoid holding mtx while waiting for API response.
Ensure RunContainer actually gets called in test case.

refs #10979

sdk/go/arvadostest/fixtures.go
sdk/go/dispatch/dispatch.go
sdk/go/dispatch/dispatch_test.go
services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
services/crunch-dispatch-slurm/squeue.go

index a7ab0fe23d7c7592b29231069f0561d98cb8ced8..70393a6ae30a8108dfa29fbb7cab27bbcbcfa647 100644 (file)
@@ -16,6 +16,8 @@ const (
 
        Dispatch1Token    = "kwi8oowusvbutahacwk2geulqewy5oaqmpalczfna4b6bb0hfw"
        Dispatch1AuthUUID = "zzzzz-gj3su-k9dvestay1plssr"
+
+       QueuedContainerUUID = "zzzzz-dz642-queuedcontainer"
 )
 
 // PathologicalManifest : A valid manifest designed to test
@@ -39,5 +41,3 @@ var (
 
 // BlobSigningKey used by the test servers
 const BlobSigningKey = "zfhgfenhffzltr9dixws36j1yhksjoll2grmku38mi7yxd66h5j4q9w4jzanezacp8s6q0ro3hxakfye02152hncy6zml2ed0uc"
-
-const QueuedContainerUuid = "zzzzz-dz642-queuedcontainer"
index 49c756e892f232e26c9fffa74feeffed46400309..261444a05fd8b91166245a98c35fe6da680175c2 100644 (file)
@@ -203,28 +203,41 @@ func (d *Dispatcher) Unlock(uuid string) error {
        return d.Arv.Call("POST", "containers", uuid, "unlock", nil, nil)
 }
 
-// TrackContainer starts a tracker for given uuid if one is not already existing, despite its state.
-func (d *Dispatcher) TrackContainer(uuid string) {
-       d.mtx.Lock()
-       defer d.mtx.Unlock()
-
-       if d.trackers == nil {
-               d.trackers = make(map[string]*runTracker)
-       }
-
-       _, alreadyTracking := d.trackers[uuid]
-       if alreadyTracking {
-               return
-       }
-
+// TrackContainer ensures a tracker is running for the given UUID,
+// regardless of the current state of the container (except: if the
+// container is locked by a different dispatcher, a tracker will not
+// be started). If the container is not in Locked or Running state,
+// the new tracker will close down immediately.
+//
+// This allows the dispatcher to put its own RunContainer func into a
+// cleanup phase (for example, to kill local processes created by a
+// prevous dispatch process that are still running even though the
+// container state is final) without the risk of having multiple
+// goroutines monitoring the same UUID.
+func (d *Dispatcher) TrackContainer(uuid string) error {
        var cntr arvados.Container
        err := d.Arv.Call("GET", "containers", uuid, "", nil, &cntr)
        if err != nil {
-               log.Printf("Error getting container %s: %s", uuid, err)
-               return
+               return err
+       }
+       if cntr.LockedByUUID != "" && cntr.LockedByUUID != d.auth.UUID {
+               return nil
        }
 
+       d.mtx.Lock()
+       defer d.mtx.Unlock()
+       if _, alreadyTracking := d.trackers[uuid]; alreadyTracking {
+               return nil
+       }
+       if d.trackers == nil {
+               d.trackers = make(map[string]*runTracker)
+       }
        d.trackers[uuid] = d.start(cntr)
+       switch cntr.State {
+       case Queued, Cancelled, Complete:
+               d.trackers[uuid].close()
+       }
+       return nil
 }
 
 type runTracker struct {
index 57b6126738c15dc96b72dde225ca4d6ae3d51f51..08ce512dea409b340e3398ef82257007f2bec277 100644 (file)
@@ -1,38 +1,43 @@
 package dispatch
 
 import (
+       "time"
+
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/arvadostest"
-       "os/exec"
-
        . "gopkg.in/check.v1"
 )
 
 // Gocheck boilerplate
-var _ = Suite(&DispatchTestSuite{})
+var _ = Suite(&suite{})
 
-type DispatchTestSuite struct{}
+type suite struct{}
 
-func (s *DispatchTestSuite) SetUpSuite(c *C) {
+func (s *suite) SetUpSuite(c *C) {
        arvadostest.StartAPI()
 }
 
-func (s *DispatchTestSuite) TearDownSuite(c *C) {
+func (s *suite) TearDownSuite(c *C) {
        arvadostest.StopAPI()
 }
 
-func (s *DispatchTestSuite) TestTrackContainer(c *C) {
+func (s *suite) TestTrackContainer(c *C) {
        arv, err := arvadosclient.MakeArvadosClient()
        c.Assert(err, Equals, nil)
-
-       runContainer := func(d *Dispatcher, ctr arvados.Container) *exec.Cmd { return exec.Command("echo") }
-       d := &Dispatcher{Arv: arv, RunContainer: func(dsp *Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
-               go runContainer(dsp, ctr)
-       }}
-       d.trackers = make(map[string]*runTracker)
-
-       d.TrackContainer(arvadostest.QueuedContainerUuid)
-       _, tracking := d.trackers[arvadostest.QueuedContainerUuid]
-       c.Assert(tracking, Equals, true)
+       arv.ApiToken = arvadostest.Dispatch1Token
+
+       done := make(chan bool, 1)
+       time.AfterFunc(10*time.Second, func() { done <- false })
+       d := &Dispatcher{
+               Arv: arv,
+               RunContainer: func(dsp *Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
+                       for ctr := range status {
+                               c.Logf("%#v", ctr)
+                       }
+                       done <- true
+               },
+       }
+       d.TrackContainer(arvadostest.QueuedContainerUUID)
+       c.Assert(<-done, Equals, true)
 }
index 7cb14fab15cc051103ea355996b246e0e6f43645..d84d461c30184e12b45cd87611580d03004a343b 100644 (file)
@@ -123,32 +123,28 @@ func doMain() error {
                log.Printf("Error notifying init daemon: %v", err)
        }
 
-       containerTrackerTicker := trackContainers(dispatcher)
-       defer containerTrackerTicker.Stop()
+       go checkSqueueForOrphans(dispatcher, sqCheck)
 
        return dispatcher.Run(context.Background())
 }
 
-var containerUuidPattern = regexp.MustCompile(`[a-z0-9]{5}-dz642-[a-z0-9]{15}$`)
-
-// Start a goroutine to check squeue report periodically, and
-// invoke TrackContainer for all the containers in the report.
-func trackContainers(dispatcher *dispatch.Dispatcher) *time.Ticker {
-       ticker := time.NewTicker(sqCheck.Period)
-       go func() {
-               for {
-                       select {
-                       case <-ticker.C:
-                               for uuid := range sqCheck.AllUuids() {
-                                       match := containerUuidPattern.MatchString(uuid)
-                                       if match {
-                                               dispatcher.TrackContainer(uuid)
-                                       }
-                               }
-                       }
+var containerUuidPattern = regexp.MustCompile(`^[a-z0-9]{5}-dz642-[a-z0-9]{15}$`)
+
+// Check the next squeue report, and invoke TrackContainer for all the
+// containers in the report. This gives us a chance to cancel slurm
+// jobs started by a previous dispatch process that never released
+// their slurm allocations even though their container states are
+// Cancelled or Complete. See https://dev.arvados.org/issues/10979
+func checkSqueueForOrphans(dispatcher *dispatch.Dispatcher, sqCheck *SqueueChecker) {
+       for _, uuid := range sqCheck.All() {
+               if !containerUuidPattern.MatchString(uuid) {
+                       continue
+               }
+               err := dispatcher.TrackContainer(uuid)
+               if err != nil {
+                       log.Printf("checkSqueueForOrphans: TrackContainer(%s): %s", uuid, err)
                }
-       }()
-       return ticker
+       }
 }
 
 // sbatchCmd
index 0c90679d3fa6d309655953abadf2bea3c337beee..85fadbdd996c4a076a259f2f19fda62101c0adc3 100644 (file)
@@ -49,7 +49,7 @@ func (sqc *SqueueChecker) Stop() {
 
 // check gets the names of jobs in the SLURM queue (running and
 // queued). If it succeeds, it updates squeue.uuids and wakes up any
-// goroutines that are waiting in HasUUID().
+// goroutines that are waiting in HasUUID() or All().
 func (sqc *SqueueChecker) check() {
        // Mutex between squeue sync and running sbatch or scancel.  This
        // establishes a sequence so that squeue doesn't run concurrently with
@@ -93,7 +93,15 @@ func (sqc *SqueueChecker) start() {
        }()
 }
 
-// All Uuids in squeue
-func (sqc *SqueueChecker) AllUuids() map[string]bool {
-       return sqc.uuids
+// All waits for the next squeue invocation, and returns all job
+// names reported by squeue.
+func (sqc *SqueueChecker) All() []string {
+       sqc.L.Lock()
+       defer sqc.L.Unlock()
+       sqc.Wait()
+       var uuids []string
+       for uuid := range sqc.uuids {
+               uuids = append(uuids, uuid)
+       }
+       return uuids
 }