21123: Add container status API to cloud dispatcher.
[arvados.git] / lib / dispatchcloud / dispatcher.go
index 47e60abdee4744689f18ecd2094fd606188982d3..611d13306ff261991a49d70307a278847c33d390 100644 (file)
@@ -61,14 +61,22 @@ type dispatcher struct {
        instanceSet cloud.InstanceSet
        pool        pool
        queue       scheduler.ContainerQueue
+       sched       *scheduler.Scheduler
        httpHandler http.Handler
        sshKey      ssh.Signer
 
        setupOnce sync.Once
        stop      chan struct{}
        stopped   chan struct{}
+
+       sQueueMtx       sync.Mutex
+       sQueueRefreshed time.Time
+       sQueue          []scheduler.QueueEnt
+       sQueueMap       map[string]scheduler.QueueEnt
 }
 
+var sQueueRefresh = time.Second
+
 // Start starts the dispatcher. Start can be called multiple times
 // with no ill effect.
 func (disp *dispatcher) Start() {
@@ -155,7 +163,22 @@ func (disp *dispatcher) initialize() {
        dblock.Dispatch.Lock(disp.Context, disp.dbConnector.GetDB)
        disp.instanceSet = instanceSet
        disp.pool = worker.NewPool(disp.logger, disp.ArvClient, disp.Registry, disp.InstanceSetID, disp.instanceSet, disp.newExecutor, installPublicKey, disp.Cluster)
-       disp.queue = container.NewQueue(disp.logger, disp.Registry, disp.typeChooser, disp.ArvClient)
+       if disp.queue == nil {
+               disp.queue = container.NewQueue(disp.logger, disp.Registry, disp.typeChooser, disp.ArvClient)
+       }
+
+       staleLockTimeout := time.Duration(disp.Cluster.Containers.StaleLockTimeout)
+       if staleLockTimeout == 0 {
+               staleLockTimeout = defaultStaleLockTimeout
+       }
+       pollInterval := time.Duration(disp.Cluster.Containers.CloudVMs.PollInterval)
+       if pollInterval <= 0 {
+               pollInterval = defaultPollInterval
+       }
+       disp.sched = scheduler.New(disp.Context, disp.ArvClient, disp.queue, disp.pool, disp.Registry, staleLockTimeout, pollInterval,
+               disp.Cluster.Containers.CloudVMs.InitialQuotaEstimate,
+               disp.Cluster.Containers.CloudVMs.MaxInstances,
+               disp.Cluster.Containers.CloudVMs.SupervisorFraction)
 
        if disp.Cluster.ManagementToken == "" {
                disp.httpHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
@@ -164,6 +187,7 @@ func (disp *dispatcher) initialize() {
        } else {
                mux := httprouter.New()
                mux.HandlerFunc("GET", "/arvados/v1/dispatch/containers", disp.apiContainers)
+               mux.HandlerFunc("GET", "/arvados/v1/dispatch/container", disp.apiContainer)
                mux.HandlerFunc("POST", "/arvados/v1/dispatch/containers/kill", disp.apiContainerKill)
                mux.HandlerFunc("GET", "/arvados/v1/dispatch/instances", disp.apiInstances)
                mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/hold", disp.apiInstanceHold)
@@ -190,36 +214,53 @@ func (disp *dispatcher) run() {
        defer disp.instanceSet.Stop()
        defer disp.pool.Stop()
 
-       staleLockTimeout := time.Duration(disp.Cluster.Containers.StaleLockTimeout)
-       if staleLockTimeout == 0 {
-               staleLockTimeout = defaultStaleLockTimeout
-       }
-       pollInterval := time.Duration(disp.Cluster.Containers.CloudVMs.PollInterval)
-       if pollInterval <= 0 {
-               pollInterval = defaultPollInterval
-       }
-       sched := scheduler.New(disp.Context, disp.ArvClient, disp.queue, disp.pool, disp.Registry, staleLockTimeout, pollInterval,
-               disp.Cluster.Containers.CloudVMs.InitialQuotaEstimate,
-               disp.Cluster.Containers.CloudVMs.MaxInstances,
-               disp.Cluster.Containers.CloudVMs.SupervisorFraction)
-       sched.Start()
-       defer sched.Stop()
+       disp.sched.Start()
+       defer disp.sched.Stop()
 
        <-disp.stop
 }
 
-// Management API: all active and queued containers.
+// Get a snapshot of the scheduler's queue, no older than
+// sQueueRefresh.
+//
+// First return value is in the sorted order used by the scheduler.
+// Second return value is a map of the same entries, for efficiently
+// looking up a single container.
+func (disp *dispatcher) sQueueCurrent() ([]scheduler.QueueEnt, map[string]scheduler.QueueEnt) {
+       disp.sQueueMtx.Lock()
+       defer disp.sQueueMtx.Unlock()
+       if time.Since(disp.sQueueRefreshed) > sQueueRefresh {
+               disp.sQueue = disp.sched.Queue()
+               disp.sQueueMap = make(map[string]scheduler.QueueEnt)
+               for _, ent := range disp.sQueue {
+                       disp.sQueueMap[ent.Container.UUID] = ent
+               }
+               disp.sQueueRefreshed = time.Now()
+       }
+       return disp.sQueue, disp.sQueueMap
+}
+
+// Management API: scheduling queue entries for all active and queued
+// containers.
 func (disp *dispatcher) apiContainers(w http.ResponseWriter, r *http.Request) {
        var resp struct {
-               Items []container.QueueEnt `json:"items"`
-       }
-       qEntries, _ := disp.queue.Entries()
-       for _, ent := range qEntries {
-               resp.Items = append(resp.Items, ent)
+               Items []scheduler.QueueEnt `json:"items"`
        }
+       resp.Items, _ = disp.sQueueCurrent()
        json.NewEncoder(w).Encode(resp)
 }
 
+// Management API: scheduling queue entry for a specified container.
+func (disp *dispatcher) apiContainer(w http.ResponseWriter, r *http.Request) {
+       _, sq := disp.sQueueCurrent()
+       ent, ok := sq[r.FormValue("container_uuid")]
+       if !ok {
+               httpserver.Error(w, "container not found", http.StatusNotFound)
+               return
+       }
+       json.NewEncoder(w).Encode(ent)
+}
+
 // Management API: all active instances (cloud VMs).
 func (disp *dispatcher) apiInstances(w http.ResponseWriter, r *http.Request) {
        var resp struct {