10979: trackContainers func in crunch-dispatch-slurm.go
authorradhika <radhika@curoverse.com>
Tue, 14 Feb 2017 21:26:39 +0000 (16:26 -0500)
committerradhika <radhika@curoverse.com>
Tue, 14 Feb 2017 21:26:39 +0000 (16:26 -0500)
sdk/go/dispatch/dispatch.go
services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
services/crunch-dispatch-slurm/squeue.go

index 5d85c1bdfd0dbb77d09d94c0695ac6c7451e3e62..e489ac79f3c3cbe59aab8b031eeb74d257081f0e 100644 (file)
@@ -225,3 +225,8 @@ func (tracker *runTracker) update(c arvados.Container) {
        }
        tracker.updates <- c
 }
+
+// Start a tracker for the given uuid if one is not already existing, despite its state.
+// its vs. it's -- episode 5 from Series 1 of Netflix' "A Series of Unfortunate Events"
+func (dispatcher *Dispatcher) TrackContainer(uuid string) {
+}
index 617b076da281a982b81f24ae9b7b7fe4d3897aee..7cb14fab15cc051103ea355996b246e0e6f43645 100644 (file)
@@ -11,6 +11,7 @@ import (
        "math"
        "os"
        "os/exec"
+       "regexp"
        "strings"
        "time"
 
@@ -122,9 +123,34 @@ func doMain() error {
                log.Printf("Error notifying init daemon: %v", err)
        }
 
+       containerTrackerTicker := trackContainers(dispatcher)
+       defer containerTrackerTicker.Stop()
+
        return dispatcher.Run(context.Background())
 }
 
+var containerUuidPattern = regexp.MustCompile(`[a-z0-9]{5}-dz642-[a-z0-9]{15}$`)
+
+// Start a goroutine to check squeue report periodically, and
+// invoke TrackContainer for all the containers in the report.
+func trackContainers(dispatcher *dispatch.Dispatcher) *time.Ticker {
+       ticker := time.NewTicker(sqCheck.Period)
+       go func() {
+               for {
+                       select {
+                       case <-ticker.C:
+                               for uuid := range sqCheck.AllUuids() {
+                                       match := containerUuidPattern.MatchString(uuid)
+                                       if match {
+                                               dispatcher.TrackContainer(uuid)
+                                       }
+                               }
+                       }
+               }
+       }()
+       return ticker
+}
+
 // sbatchCmd
 func sbatchFunc(container arvados.Container) *exec.Cmd {
        memPerCPU := math.Ceil(float64(container.RuntimeConstraints.RAM) / (float64(container.RuntimeConstraints.VCPUs) * 1048576))
index 3bebe561c380d12a3ca8b5724a7a705f27bff3ef..0c90679d3fa6d309655953abadf2bea3c337beee 100644 (file)
@@ -13,7 +13,7 @@ import (
 // command 'squeue'.
 type SqueueChecker struct {
        Period    time.Duration
-       hasUUID   map[string]bool
+       uuids     map[string]bool
        startOnce sync.Once
        done      chan struct{}
        sync.Cond
@@ -36,7 +36,7 @@ func (sqc *SqueueChecker) HasUUID(uuid string) bool {
 
        // block until next squeue broadcast signaling an update.
        sqc.Wait()
-       return sqc.hasUUID[uuid]
+       return sqc.uuids[uuid]
 }
 
 // Stop stops the squeue monitoring goroutine. Do not call HasUUID
@@ -48,7 +48,7 @@ func (sqc *SqueueChecker) Stop() {
 }
 
 // check gets the names of jobs in the SLURM queue (running and
-// queued). If it succeeds, it updates squeue.hasUUID and wakes up any
+// queued). If it succeeds, it updates squeue.uuids and wakes up any
 // goroutines that are waiting in HasUUID().
 func (sqc *SqueueChecker) check() {
        // Mutex between squeue sync and running sbatch or scancel.  This
@@ -67,9 +67,9 @@ func (sqc *SqueueChecker) check() {
        }
 
        uuids := strings.Split(stdout.String(), "\n")
-       sqc.hasUUID = make(map[string]bool, len(uuids))
+       sqc.uuids = make(map[string]bool, len(uuids))
        for _, uuid := range uuids {
-               sqc.hasUUID[uuid] = true
+               sqc.uuids[uuid] = true
        }
        sqc.Broadcast()
 }
@@ -92,3 +92,8 @@ func (sqc *SqueueChecker) start() {
                }
        }()
 }
+
+// All Uuids in squeue
+func (sqc *SqueueChecker) AllUuids() map[string]bool {
+       return sqc.uuids
+}