21123: Add container status API to cloud dispatcher.
authorTom Clegg <tom@curii.com>
Fri, 1 Mar 2024 21:00:09 +0000 (16:00 -0500)
committerTom Clegg <tom@curii.com>
Tue, 5 Mar 2024 22:40:18 +0000 (17:40 -0500)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

doc/api/dispatch.html.textile.liquid
lib/dispatchcloud/dispatcher.go
lib/dispatchcloud/dispatcher_test.go
lib/dispatchcloud/scheduler/run_queue.go
lib/dispatchcloud/scheduler/scheduler.go

index b06136db9a8a219b8a93c27c0decbca19324b60e..488545c7d4ff8702bc8fb0a4a3ce6efaa5c89fd6 100644 (file)
@@ -32,6 +32,7 @@ Return a list of containers that are either ready to dispatch, or being started/
 Each entry in the returned list of @items@ includes:
 * an @instance_type@ entry with the name and attributes of the instance type that will be used to schedule the container (chosen from the @InstanceTypes@ section of your cluster config file); and
 * a @container@ entry with selected attributes of the container itself, including @uuid@, @priority@, @runtime_constraints@, and @state@. Other fields of the container records are not loaded by the dispatcher, and will have empty/zero values here (e.g., @{...,"created_at":"0001-01-01T00:00:00Z","command":[],...}@).
+* a @scheduling_status@ entry: a brief explanation of the container's status in the dispatch queue, or empty if scheduling is not applicable, e.g., the container has already started running.
 
 Example response:
 
@@ -56,12 +57,31 @@ Example response:
         "AddedScratch": 0,
         "Price": 0.146,
         "Preemptible": false
-      }
+      },
+      "scheduling_status": "waiting for new instance to be ready"
     },
     ...
   ]
 }</pre></notextile>
 
+h3. Get specified container
+
+@GET /arvados/v1/dispatch/container?container_uuid={uuid}@
+
+Return the same information as "list containers" above, but for a single specified container.
+
+Example response:
+
+<notextile><pre>{
+  "container": {
+    ...
+  },
+  "instance_type": {
+    ...
+  },
+  "scheduling_status": "waiting for new instance to be ready"
+}</pre></notextile>
+
 h3. Terminate a container
 
 @POST /arvados/v1/dispatch/containers/kill?container_uuid={uuid}&reason={string}@
index 47e60abdee4744689f18ecd2094fd606188982d3..611d13306ff261991a49d70307a278847c33d390 100644 (file)
@@ -61,14 +61,22 @@ type dispatcher struct {
        instanceSet cloud.InstanceSet
        pool        pool
        queue       scheduler.ContainerQueue
+       sched       *scheduler.Scheduler
        httpHandler http.Handler
        sshKey      ssh.Signer
 
        setupOnce sync.Once
        stop      chan struct{}
        stopped   chan struct{}
+
+       sQueueMtx       sync.Mutex
+       sQueueRefreshed time.Time
+       sQueue          []scheduler.QueueEnt
+       sQueueMap       map[string]scheduler.QueueEnt
 }
 
+var sQueueRefresh = time.Second
+
 // Start starts the dispatcher. Start can be called multiple times
 // with no ill effect.
 func (disp *dispatcher) Start() {
@@ -155,7 +163,22 @@ func (disp *dispatcher) initialize() {
        dblock.Dispatch.Lock(disp.Context, disp.dbConnector.GetDB)
        disp.instanceSet = instanceSet
        disp.pool = worker.NewPool(disp.logger, disp.ArvClient, disp.Registry, disp.InstanceSetID, disp.instanceSet, disp.newExecutor, installPublicKey, disp.Cluster)
-       disp.queue = container.NewQueue(disp.logger, disp.Registry, disp.typeChooser, disp.ArvClient)
+       if disp.queue == nil {
+               disp.queue = container.NewQueue(disp.logger, disp.Registry, disp.typeChooser, disp.ArvClient)
+       }
+
+       staleLockTimeout := time.Duration(disp.Cluster.Containers.StaleLockTimeout)
+       if staleLockTimeout == 0 {
+               staleLockTimeout = defaultStaleLockTimeout
+       }
+       pollInterval := time.Duration(disp.Cluster.Containers.CloudVMs.PollInterval)
+       if pollInterval <= 0 {
+               pollInterval = defaultPollInterval
+       }
+       disp.sched = scheduler.New(disp.Context, disp.ArvClient, disp.queue, disp.pool, disp.Registry, staleLockTimeout, pollInterval,
+               disp.Cluster.Containers.CloudVMs.InitialQuotaEstimate,
+               disp.Cluster.Containers.CloudVMs.MaxInstances,
+               disp.Cluster.Containers.CloudVMs.SupervisorFraction)
 
        if disp.Cluster.ManagementToken == "" {
                disp.httpHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
@@ -164,6 +187,7 @@ func (disp *dispatcher) initialize() {
        } else {
                mux := httprouter.New()
                mux.HandlerFunc("GET", "/arvados/v1/dispatch/containers", disp.apiContainers)
+               mux.HandlerFunc("GET", "/arvados/v1/dispatch/container", disp.apiContainer)
                mux.HandlerFunc("POST", "/arvados/v1/dispatch/containers/kill", disp.apiContainerKill)
                mux.HandlerFunc("GET", "/arvados/v1/dispatch/instances", disp.apiInstances)
                mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/hold", disp.apiInstanceHold)
@@ -190,36 +214,53 @@ func (disp *dispatcher) run() {
        defer disp.instanceSet.Stop()
        defer disp.pool.Stop()
 
-       staleLockTimeout := time.Duration(disp.Cluster.Containers.StaleLockTimeout)
-       if staleLockTimeout == 0 {
-               staleLockTimeout = defaultStaleLockTimeout
-       }
-       pollInterval := time.Duration(disp.Cluster.Containers.CloudVMs.PollInterval)
-       if pollInterval <= 0 {
-               pollInterval = defaultPollInterval
-       }
-       sched := scheduler.New(disp.Context, disp.ArvClient, disp.queue, disp.pool, disp.Registry, staleLockTimeout, pollInterval,
-               disp.Cluster.Containers.CloudVMs.InitialQuotaEstimate,
-               disp.Cluster.Containers.CloudVMs.MaxInstances,
-               disp.Cluster.Containers.CloudVMs.SupervisorFraction)
-       sched.Start()
-       defer sched.Stop()
+       disp.sched.Start()
+       defer disp.sched.Stop()
 
        <-disp.stop
 }
 
-// Management API: all active and queued containers.
+// Get a snapshot of the scheduler's queue, no older than
+// sQueueRefresh.
+//
+// First return value is in the sorted order used by the scheduler.
+// Second return value is a map of the same entries, for efficiently
+// looking up a single container.
+func (disp *dispatcher) sQueueCurrent() ([]scheduler.QueueEnt, map[string]scheduler.QueueEnt) {
+       disp.sQueueMtx.Lock()
+       defer disp.sQueueMtx.Unlock()
+       if time.Since(disp.sQueueRefreshed) > sQueueRefresh {
+               disp.sQueue = disp.sched.Queue()
+               disp.sQueueMap = make(map[string]scheduler.QueueEnt)
+               for _, ent := range disp.sQueue {
+                       disp.sQueueMap[ent.Container.UUID] = ent
+               }
+               disp.sQueueRefreshed = time.Now()
+       }
+       return disp.sQueue, disp.sQueueMap
+}
+
+// Management API: scheduling queue entries for all active and queued
+// containers.
 func (disp *dispatcher) apiContainers(w http.ResponseWriter, r *http.Request) {
        var resp struct {
-               Items []container.QueueEnt `json:"items"`
-       }
-       qEntries, _ := disp.queue.Entries()
-       for _, ent := range qEntries {
-               resp.Items = append(resp.Items, ent)
+               Items []scheduler.QueueEnt `json:"items"`
        }
+       resp.Items, _ = disp.sQueueCurrent()
        json.NewEncoder(w).Encode(resp)
 }
 
+// Management API: scheduling queue entry for a specified container.
+func (disp *dispatcher) apiContainer(w http.ResponseWriter, r *http.Request) {
+       _, sq := disp.sQueueCurrent()
+       ent, ok := sq[r.FormValue("container_uuid")]
+       if !ok {
+               httpserver.Error(w, "container not found", http.StatusNotFound)
+               return
+       }
+       json.NewEncoder(w).Encode(ent)
+}
+
 // Management API: all active instances (cloud VMs).
 func (disp *dispatcher) apiInstances(w http.ResponseWriter, r *http.Request) {
        var resp struct {
index 20185554b8b1828fc92e24b1c1f7ecbc8603b6fc..e7465d65b1673b35b29566c8ab176347f5931d32 100644 (file)
@@ -8,12 +8,14 @@ import (
        "context"
        "crypto/tls"
        "encoding/json"
+       "fmt"
        "io/ioutil"
        "math/rand"
        "net/http"
        "net/http/httptest"
        "net/url"
        "os"
+       "strings"
        "sync"
        "sync/atomic"
        "time"
@@ -159,7 +161,6 @@ func (s *DispatcherSuite) arvClientProxy(c *check.C) func(*http.Request) (*url.U
 // artificial errors in order to exercise a variety of code paths.
 func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
        Drivers["test"] = s.stubDriver
-       s.disp.setupOnce.Do(s.disp.initialize)
        queue := &test.Queue{
                MaxDispatchAttempts: 5,
                ChooseType: func(ctr *arvados.Container) ([]arvados.InstanceType, error) {
@@ -179,6 +180,7 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
                })
        }
        s.disp.queue = queue
+       s.disp.setupOnce.Do(s.disp.initialize)
 
        var mtx sync.Mutex
        done := make(chan struct{})
@@ -323,7 +325,7 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
        c.Check(resp.Body.String(), check.Matches, `(?ms).*max_concurrent_containers [1-9][0-9e+.]*`)
 }
 
-func (s *DispatcherSuite) TestAPIPermissions(c *check.C) {
+func (s *DispatcherSuite) TestManagementAPI_Permissions(c *check.C) {
        s.cluster.ManagementToken = "abcdefgh"
        Drivers["test"] = s.stubDriver
        s.disp.setupOnce.Do(s.disp.initialize)
@@ -345,7 +347,7 @@ func (s *DispatcherSuite) TestAPIPermissions(c *check.C) {
        }
 }
 
-func (s *DispatcherSuite) TestAPIDisabled(c *check.C) {
+func (s *DispatcherSuite) TestManagementAPI_Disabled(c *check.C) {
        s.cluster.ManagementToken = ""
        Drivers["test"] = s.stubDriver
        s.disp.setupOnce.Do(s.disp.initialize)
@@ -363,13 +365,122 @@ func (s *DispatcherSuite) TestAPIDisabled(c *check.C) {
        }
 }
 
-func (s *DispatcherSuite) TestInstancesAPI(c *check.C) {
+func (s *DispatcherSuite) TestManagementAPI_Containers(c *check.C) {
+       s.cluster.ManagementToken = "abcdefgh"
+       s.cluster.Containers.CloudVMs.InitialQuotaEstimate = 4
+       Drivers["test"] = s.stubDriver
+       queue := &test.Queue{
+               MaxDispatchAttempts: 5,
+               ChooseType: func(ctr *arvados.Container) ([]arvados.InstanceType, error) {
+                       return ChooseInstanceType(s.cluster, ctr)
+               },
+               Logger: ctxlog.TestLogger(c),
+       }
+       s.stubDriver.Queue = queue
+       s.stubDriver.QuotaMaxInstances = 4
+       s.stubDriver.SetupVM = func(stubvm *test.StubVM) error {
+               if stubvm.Instance().ProviderType() >= test.InstanceType(4).ProviderType {
+                       return test.CapacityError{InstanceTypeSpecific: true}
+               }
+               stubvm.ExecuteContainer = func(ctr arvados.Container) int {
+                       time.Sleep(5 * time.Second)
+                       return 0
+               }
+               return nil
+       }
+       s.disp.queue = queue
+       s.disp.setupOnce.Do(s.disp.initialize)
+
+       go s.disp.run()
+
+       type queueEnt struct {
+               Container        arvados.Container
+               InstanceType     arvados.InstanceType `json:"instance_type"`
+               SchedulingStatus string               `json:"scheduling_status"`
+       }
+       type containersResponse struct {
+               Items []queueEnt
+       }
+       getContainers := func() containersResponse {
+               sQueueRefresh = time.Millisecond
+               req := httptest.NewRequest("GET", "/arvados/v1/dispatch/containers", nil)
+               req.Header.Set("Authorization", "Bearer abcdefgh")
+               resp := httptest.NewRecorder()
+               s.disp.ServeHTTP(resp, req)
+               var cresp containersResponse
+               c.Check(resp.Code, check.Equals, http.StatusOK)
+               err := json.Unmarshal(resp.Body.Bytes(), &cresp)
+               c.Check(err, check.IsNil)
+               return cresp
+       }
+
+       c.Check(getContainers().Items, check.HasLen, 0)
+
+       for i := 0; i < 20; i++ {
+               queue.Containers = append(queue.Containers, arvados.Container{
+                       UUID:     test.ContainerUUID(i),
+                       State:    arvados.ContainerStateQueued,
+                       Priority: int64(100 - i),
+                       RuntimeConstraints: arvados.RuntimeConstraints{
+                               RAM:   int64(i%3+1) << 30,
+                               VCPUs: i%8 + 1,
+                       },
+               })
+       }
+       queue.Update()
+
+       expect := `
+ 0 zzzzz-dz642-000000000000000 (Running) ""
+ 1 zzzzz-dz642-000000000000001 (Running) ""
+ 2 zzzzz-dz642-000000000000002 (Locked) "waiting for suitable instance type to become available: queue position 1"
+ 3 zzzzz-dz642-000000000000003 (Locked) "waiting for suitable instance type to become available: queue position 2"
+ 4 zzzzz-dz642-000000000000004 (Queued) "waiting while cluster is running at capacity: queue position 3"
+ 5 zzzzz-dz642-000000000000005 (Queued) "waiting while cluster is running at capacity: queue position 4"
+ 6 zzzzz-dz642-000000000000006 (Queued) "waiting while cluster is running at capacity: queue position 5"
+ 7 zzzzz-dz642-000000000000007 (Queued) "waiting while cluster is running at capacity: queue position 6"
+ 8 zzzzz-dz642-000000000000008 (Queued) "waiting while cluster is running at capacity: queue position 7"
+ 9 zzzzz-dz642-000000000000009 (Queued) "waiting while cluster is running at capacity: queue position 8"
+ 10 zzzzz-dz642-000000000000010 (Queued) "waiting while cluster is running at capacity: queue position 9"
+ 11 zzzzz-dz642-000000000000011 (Queued) "waiting while cluster is running at capacity: queue position 10"
+ 12 zzzzz-dz642-000000000000012 (Queued) "waiting while cluster is running at capacity: queue position 11"
+ 13 zzzzz-dz642-000000000000013 (Queued) "waiting while cluster is running at capacity: queue position 12"
+ 14 zzzzz-dz642-000000000000014 (Queued) "waiting while cluster is running at capacity: queue position 13"
+ 15 zzzzz-dz642-000000000000015 (Queued) "waiting while cluster is running at capacity: queue position 14"
+ 16 zzzzz-dz642-000000000000016 (Queued) "waiting while cluster is running at capacity: queue position 15"
+ 17 zzzzz-dz642-000000000000017 (Queued) "waiting while cluster is running at capacity: queue position 16"
+ 18 zzzzz-dz642-000000000000018 (Queued) "waiting while cluster is running at capacity: queue position 17"
+ 19 zzzzz-dz642-000000000000019 (Queued) "waiting while cluster is running at capacity: queue position 18"
+`
+       sequence := make(map[string][]string)
+       var summary string
+       for deadline := time.Now().Add(time.Second); time.Now().Before(deadline); time.Sleep(time.Millisecond) {
+               cresp := getContainers()
+               summary = "\n"
+               for i, ent := range cresp.Items {
+                       summary += fmt.Sprintf("% 2d %s (%s) %q\n", i, ent.Container.UUID, ent.Container.State, ent.SchedulingStatus)
+                       s := sequence[ent.Container.UUID]
+                       if len(s) == 0 || s[len(s)-1] != ent.SchedulingStatus {
+                               sequence[ent.Container.UUID] = append(s, ent.SchedulingStatus)
+                       }
+               }
+               if summary == expect {
+                       break
+               }
+       }
+       c.Check(summary, check.Equals, expect)
+       for i := 0; i < 5; i++ {
+               c.Logf("sequence for container %d:\n... %s", i, strings.Join(sequence[test.ContainerUUID(i)], "\n... "))
+       }
+}
+
+func (s *DispatcherSuite) TestManagementAPI_Instances(c *check.C) {
        s.cluster.ManagementToken = "abcdefgh"
        s.cluster.Containers.CloudVMs.TimeoutBooting = arvados.Duration(time.Second)
        Drivers["test"] = s.stubDriver
        s.disp.setupOnce.Do(s.disp.initialize)
        s.disp.queue = &test.Queue{}
        go s.disp.run()
+       defer s.disp.Close()
 
        type instance struct {
                Instance             string
index 03fa592777e6fa7c09eb57031bb19c3bdeb80029..2f4bce8987b86418ab3389db45073fbccc2e576f 100644 (file)
@@ -5,6 +5,7 @@
 package scheduler
 
 import (
+       "fmt"
        "sort"
        "time"
 
@@ -15,6 +16,20 @@ import (
 
 var quietAfter503 = time.Minute
 
+type QueueEnt struct {
+       container.QueueEnt
+
+       // Human-readable scheduling status as of the last scheduling
+       // iteration.
+       SchedulingStatus string `json:"scheduling_status"`
+}
+
+// Queue returns the sorted queue from the last scheduling iteration.
+func (sch *Scheduler) Queue() []QueueEnt {
+       ents, _ := sch.lastQueue.Load().([]QueueEnt)
+       return ents
+}
+
 func (sch *Scheduler) runQueue() {
        running := sch.pool.Running()
        unalloc := sch.pool.Unallocated()
@@ -25,9 +40,9 @@ func (sch *Scheduler) runQueue() {
        }
 
        unsorted, _ := sch.queue.Entries()
-       sorted := make([]container.QueueEnt, 0, len(unsorted))
+       sorted := make([]QueueEnt, 0, len(unsorted))
        for _, ent := range unsorted {
-               sorted = append(sorted, ent)
+               sorted = append(sorted, QueueEnt{QueueEnt: ent})
        }
        sort.Slice(sorted, func(i, j int) bool {
                _, irunning := running[sorted[i].Container.UUID]
@@ -149,9 +164,9 @@ func (sch *Scheduler) runQueue() {
        }).Debug("runQueue")
 
        dontstart := map[arvados.InstanceType]bool{}
-       var atcapacity = map[string]bool{}    // ProviderTypes reported as AtCapacity during this runQueue() invocation
-       var overquota []container.QueueEnt    // entries that are unmappable because of worker pool quota
-       var overmaxsuper []container.QueueEnt // unmappable because max supervisors (these are not included in overquota)
+       var atcapacity = map[string]bool{} // ProviderTypes reported as AtCapacity during this runQueue() invocation
+       var overquota []QueueEnt           // entries that are unmappable because of worker pool quota
+       var overmaxsuper []QueueEnt        // unmappable because max supervisors (these are not included in overquota)
        var containerAllocatedWorkerBootingCount int
 
        // trying is #containers running + #containers we're trying to
@@ -159,6 +174,7 @@ func (sch *Scheduler) runQueue() {
        // reaches the dynamic maxConcurrency limit.
        trying := len(running)
 
+       qpos := 0
        supervisors := 0
 
 tryrun:
@@ -169,12 +185,20 @@ tryrun:
                })
                if ctr.SchedulingParameters.Supervisor {
                        supervisors += 1
-                       if maxSupervisors > 0 && supervisors > maxSupervisors {
-                               overmaxsuper = append(overmaxsuper, sorted[i])
-                               continue
+               }
+               if _, running := running[ctr.UUID]; running {
+                       if ctr.State == arvados.ContainerStateQueued || ctr.State == arvados.ContainerStateLocked {
+                               sorted[i].SchedulingStatus = "preparing runtime environment"
                        }
+                       continue
                }
-               if _, running := running[ctr.UUID]; running || ctr.Priority < 1 {
+               if ctr.Priority < 1 {
+                       sorted[i].SchedulingStatus = "not scheduling: priority 0, state " + string(ctr.State)
+                       continue
+               }
+               if ctr.SchedulingParameters.Supervisor && maxSupervisors > 0 && supervisors > maxSupervisors {
+                       overmaxsuper = append(overmaxsuper, sorted[i])
+                       sorted[i].SchedulingStatus = "not starting: supervisor container limit has been reached"
                        continue
                }
                // If we have unalloc instances of any of the eligible
@@ -214,7 +238,7 @@ tryrun:
                        }
                        trying++
                        if !unallocOK && sch.pool.AtQuota() {
-                               logger.Trace("not locking: AtQuota and no unalloc workers")
+                               logger.Trace("not starting: AtQuota and no unalloc workers")
                                overquota = sorted[i:]
                                break tryrun
                        }
@@ -246,10 +270,13 @@ tryrun:
                                        // same instance type. Don't let this
                                        // one sneak in ahead of it.
                                } else if sch.pool.KillContainer(ctr.UUID, "about to start") {
+                                       sorted[i].SchedulingStatus = "waiting for previous attempt to exit"
                                        logger.Info("not restarting yet: crunch-run process from previous attempt has not exited")
                                } else if sch.pool.StartContainer(unallocType, ctr) {
+                                       sorted[i].SchedulingStatus = "preparing runtime environment"
                                        logger.Trace("StartContainer => true")
                                } else {
+                                       sorted[i].SchedulingStatus = "waiting for new instance to be ready"
                                        logger.Trace("StartContainer => false")
                                        containerAllocatedWorkerBootingCount += 1
                                        dontstart[unallocType] = true
@@ -279,6 +306,8 @@ tryrun:
                                // container A on the next call to
                                // runQueue(), rather than run
                                // container B now.
+                               qpos++
+                               sorted[i].SchedulingStatus = fmt.Sprintf("waiting for suitable instance type to become available: queue position %d", qpos)
                                logger.Trace("all eligible types at capacity")
                                continue
                        }
@@ -293,6 +322,7 @@ tryrun:
                        // asynchronously and does its own logging
                        // about the eventual outcome, so we don't
                        // need to.)
+                       sorted[i].SchedulingStatus = "waiting for new instance to be ready"
                        logger.Info("creating new instance")
                        // Don't bother trying to start the container
                        // yet -- obviously the instance will take
@@ -305,12 +335,26 @@ tryrun:
        sch.mContainersAllocatedNotStarted.Set(float64(containerAllocatedWorkerBootingCount))
        sch.mContainersNotAllocatedOverQuota.Set(float64(len(overquota) + len(overmaxsuper)))
 
+       var qreason string
+       if sch.pool.AtQuota() {
+               qreason = "waiting for cloud resources"
+       } else {
+               qreason = "waiting while cluster is running at capacity"
+       }
+       for i, ent := range sorted {
+               if ent.SchedulingStatus == "" && (ent.Container.State == arvados.ContainerStateQueued || ent.Container.State == arvados.ContainerStateLocked) {
+                       qpos++
+                       sorted[i].SchedulingStatus = fmt.Sprintf("%s: queue position %d", qreason, qpos)
+               }
+       }
+       sch.lastQueue.Store(sorted)
+
        if len(overquota)+len(overmaxsuper) > 0 {
                // Unlock any containers that are unmappable while
                // we're at quota (but if they have already been
                // scheduled and they're loading docker images etc.,
                // let them run).
-               var unlock []container.QueueEnt
+               var unlock []QueueEnt
                unlock = append(unlock, overmaxsuper...)
                if totalInstances > 0 && len(overquota) > 1 {
                        // We don't unlock the next-in-line container
index ee7ab508839622af6f108cf6b98e6b48f6def603..bc6574a21a538134c618320f9e97511b84d9b307 100644 (file)
@@ -9,6 +9,7 @@ package scheduler
 import (
        "context"
        "sync"
+       "sync/atomic"
        "time"
 
        "git.arvados.org/arvados.git/sdk/go/arvados"
@@ -57,6 +58,8 @@ type Scheduler struct {
        mLongestWaitTimeSinceQueue       prometheus.Gauge
        mLast503Time                     prometheus.Gauge
        mMaxContainerConcurrency         prometheus.Gauge
+
+       lastQueue atomic.Value // stores a []QueueEnt
 }
 
 // New returns a new unstarted Scheduler.