From: Tom Clegg Date: Fri, 24 Feb 2017 21:46:46 +0000 (-0500) Subject: 10979: Check for orphans only once at startup. Add missing Lock() in X-Git-Tag: 1.1.0~399 X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/17f4a03d71d8d6130f796d61fa49b8480bf555b6 10979: Check for orphans only once at startup. Add missing Lock() in squeue checker. Avoid holding mtx while waiting for API response. Ensure RunContainer actually gets called in test case. refs #10979 --- diff --git a/sdk/go/arvadostest/fixtures.go b/sdk/go/arvadostest/fixtures.go index a7ab0fe23d..70393a6ae3 100644 --- a/sdk/go/arvadostest/fixtures.go +++ b/sdk/go/arvadostest/fixtures.go @@ -16,6 +16,8 @@ const ( Dispatch1Token = "kwi8oowusvbutahacwk2geulqewy5oaqmpalczfna4b6bb0hfw" Dispatch1AuthUUID = "zzzzz-gj3su-k9dvestay1plssr" + + QueuedContainerUUID = "zzzzz-dz642-queuedcontainer" ) // PathologicalManifest : A valid manifest designed to test @@ -39,5 +41,3 @@ var ( // BlobSigningKey used by the test servers const BlobSigningKey = "zfhgfenhffzltr9dixws36j1yhksjoll2grmku38mi7yxd66h5j4q9w4jzanezacp8s6q0ro3hxakfye02152hncy6zml2ed0uc" - -const QueuedContainerUuid = "zzzzz-dz642-queuedcontainer" diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go index 49c756e892..261444a05f 100644 --- a/sdk/go/dispatch/dispatch.go +++ b/sdk/go/dispatch/dispatch.go @@ -203,28 +203,41 @@ func (d *Dispatcher) Unlock(uuid string) error { return d.Arv.Call("POST", "containers", uuid, "unlock", nil, nil) } -// TrackContainer starts a tracker for given uuid if one is not already existing, despite its state. 
-func (d *Dispatcher) TrackContainer(uuid string) { - d.mtx.Lock() - defer d.mtx.Unlock() - - if d.trackers == nil { - d.trackers = make(map[string]*runTracker) - } - - _, alreadyTracking := d.trackers[uuid] - if alreadyTracking { - return - } - +// TrackContainer ensures a tracker is running for the given UUID, +// regardless of the current state of the container (except: if the +// container is locked by a different dispatcher, a tracker will not +// be started). If the container is not in Locked or Running state, +// the new tracker will close down immediately. +// +// This allows the dispatcher to put its own RunContainer func into a +// cleanup phase (for example, to kill local processes created by a +// previous dispatch process that are still running even though the +// container state is final) without the risk of having multiple +// goroutines monitoring the same UUID. +func (d *Dispatcher) TrackContainer(uuid string) error { var cntr arvados.Container err := d.Arv.Call("GET", "containers", uuid, "", nil, &cntr) if err != nil { - log.Printf("Error getting container %s: %s", uuid, err) - return + return err + } + if cntr.LockedByUUID != "" && cntr.LockedByUUID != d.auth.UUID { + return nil } + d.mtx.Lock() + defer d.mtx.Unlock() + if _, alreadyTracking := d.trackers[uuid]; alreadyTracking { + return nil + } + if d.trackers == nil { + d.trackers = make(map[string]*runTracker) + } d.trackers[uuid] = d.start(cntr) + switch cntr.State { + case Queued, Cancelled, Complete: + d.trackers[uuid].close() + } + return nil } type runTracker struct { diff --git a/sdk/go/dispatch/dispatch_test.go b/sdk/go/dispatch/dispatch_test.go index 57b6126738..08ce512dea 100644 --- a/sdk/go/dispatch/dispatch_test.go +++ b/sdk/go/dispatch/dispatch_test.go @@ -1,38 +1,43 @@ package dispatch import ( + "time" + "git.curoverse.com/arvados.git/sdk/go/arvados" "git.curoverse.com/arvados.git/sdk/go/arvadosclient" "git.curoverse.com/arvados.git/sdk/go/arvadostest" - "os/exec" - . 
"gopkg.in/check.v1" ) // Gocheck boilerplate -var _ = Suite(&DispatchTestSuite{}) +var _ = Suite(&suite{}) -type DispatchTestSuite struct{} +type suite struct{} -func (s *DispatchTestSuite) SetUpSuite(c *C) { +func (s *suite) SetUpSuite(c *C) { arvadostest.StartAPI() } -func (s *DispatchTestSuite) TearDownSuite(c *C) { +func (s *suite) TearDownSuite(c *C) { arvadostest.StopAPI() } -func (s *DispatchTestSuite) TestTrackContainer(c *C) { +func (s *suite) TestTrackContainer(c *C) { arv, err := arvadosclient.MakeArvadosClient() c.Assert(err, Equals, nil) - - runContainer := func(d *Dispatcher, ctr arvados.Container) *exec.Cmd { return exec.Command("echo") } - d := &Dispatcher{Arv: arv, RunContainer: func(dsp *Dispatcher, ctr arvados.Container, status <-chan arvados.Container) { - go runContainer(dsp, ctr) - }} - d.trackers = make(map[string]*runTracker) - - d.TrackContainer(arvadostest.QueuedContainerUuid) - _, tracking := d.trackers[arvadostest.QueuedContainerUuid] - c.Assert(tracking, Equals, true) + arv.ApiToken = arvadostest.Dispatch1Token + + done := make(chan bool, 1) + time.AfterFunc(10*time.Second, func() { done <- false }) + d := &Dispatcher{ + Arv: arv, + RunContainer: func(dsp *Dispatcher, ctr arvados.Container, status <-chan arvados.Container) { + for ctr := range status { + c.Logf("%#v", ctr) + } + done <- true + }, + } + d.TrackContainer(arvadostest.QueuedContainerUUID) + c.Assert(<-done, Equals, true) } diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go index 7cb14fab15..d84d461c30 100644 --- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go +++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go @@ -123,32 +123,28 @@ func doMain() error { log.Printf("Error notifying init daemon: %v", err) } - containerTrackerTicker := trackContainers(dispatcher) - defer containerTrackerTicker.Stop() + go checkSqueueForOrphans(dispatcher, sqCheck) return 
dispatcher.Run(context.Background()) } -var containerUuidPattern = regexp.MustCompile(`[a-z0-9]{5}-dz642-[a-z0-9]{15}$`) - -// Start a goroutine to check squeue report periodically, and -// invoke TrackContainer for all the containers in the report. -func trackContainers(dispatcher *dispatch.Dispatcher) *time.Ticker { - ticker := time.NewTicker(sqCheck.Period) - go func() { - for { - select { - case <-ticker.C: - for uuid := range sqCheck.AllUuids() { - match := containerUuidPattern.MatchString(uuid) - if match { - dispatcher.TrackContainer(uuid) - } - } - } +var containerUuidPattern = regexp.MustCompile(`^[a-z0-9]{5}-dz642-[a-z0-9]{15}$`) + +// Check the next squeue report, and invoke TrackContainer for all the +// containers in the report. This gives us a chance to cancel slurm +// jobs started by a previous dispatch process that never released +// their slurm allocations even though their container states are +// Cancelled or Complete. See https://dev.arvados.org/issues/10979 +func checkSqueueForOrphans(dispatcher *dispatch.Dispatcher, sqCheck *SqueueChecker) { + for _, uuid := range sqCheck.All() { + if !containerUuidPattern.MatchString(uuid) { + continue + } + err := dispatcher.TrackContainer(uuid) + if err != nil { + log.Printf("checkSqueueForOrphans: TrackContainer(%s): %s", uuid, err) } - }() - return ticker + } } // sbatchCmd diff --git a/services/crunch-dispatch-slurm/squeue.go b/services/crunch-dispatch-slurm/squeue.go index 0c90679d3f..85fadbdd99 100644 --- a/services/crunch-dispatch-slurm/squeue.go +++ b/services/crunch-dispatch-slurm/squeue.go @@ -49,7 +49,7 @@ func (sqc *SqueueChecker) Stop() { // check gets the names of jobs in the SLURM queue (running and // queued). If it succeeds, it updates squeue.uuids and wakes up any -// goroutines that are waiting in HasUUID(). +// goroutines that are waiting in HasUUID() or All(). func (sqc *SqueueChecker) check() { // Mutex between squeue sync and running sbatch or scancel. 
This // establishes a sequence so that squeue doesn't run concurrently with @@ -93,7 +93,15 @@ func (sqc *SqueueChecker) start() { }() } -// All Uuids in squeue -func (sqc *SqueueChecker) AllUuids() map[string]bool { - return sqc.uuids +// All waits for the next squeue invocation, and returns all job +// names reported by squeue. +func (sqc *SqueueChecker) All() []string { + sqc.L.Lock() + defer sqc.L.Unlock() + sqc.Wait() + var uuids []string + for uuid := range sqc.uuids { + uuids = append(uuids, uuid) + } + return uuids }