21578: Merge branch 'main' into 21578-mount-debug
[arvados.git] / lib / dispatchcloud / dispatcher.go
index 47e60abdee4744689f18ecd2094fd606188982d3..04283df48f6faf60bd0968b327a6e953c41d6d18 100644 (file)
@@ -61,14 +61,22 @@ type dispatcher struct {
        instanceSet cloud.InstanceSet
        pool        pool
        queue       scheduler.ContainerQueue
+       sched       *scheduler.Scheduler
        httpHandler http.Handler
        sshKey      ssh.Signer
 
        setupOnce sync.Once
        stop      chan struct{}
        stopped   chan struct{}
+
+       schedQueueMtx       sync.Mutex
+       schedQueueRefreshed time.Time
+       schedQueue          []scheduler.QueueEnt
+       schedQueueMap       map[string]scheduler.QueueEnt
 }
 
+var schedQueueRefresh = time.Second
+
 // Start starts the dispatcher. Start can be called multiple times
 // with no ill effect.
 func (disp *dispatcher) Start() {
@@ -155,7 +163,22 @@ func (disp *dispatcher) initialize() {
        dblock.Dispatch.Lock(disp.Context, disp.dbConnector.GetDB)
        disp.instanceSet = instanceSet
        disp.pool = worker.NewPool(disp.logger, disp.ArvClient, disp.Registry, disp.InstanceSetID, disp.instanceSet, disp.newExecutor, installPublicKey, disp.Cluster)
-       disp.queue = container.NewQueue(disp.logger, disp.Registry, disp.typeChooser, disp.ArvClient)
+       if disp.queue == nil {
+               disp.queue = container.NewQueue(disp.logger, disp.Registry, disp.typeChooser, disp.ArvClient)
+       }
+
+       staleLockTimeout := time.Duration(disp.Cluster.Containers.StaleLockTimeout)
+       if staleLockTimeout == 0 {
+               staleLockTimeout = defaultStaleLockTimeout
+       }
+       pollInterval := time.Duration(disp.Cluster.Containers.CloudVMs.PollInterval)
+       if pollInterval <= 0 {
+               pollInterval = defaultPollInterval
+       }
+       disp.sched = scheduler.New(disp.Context, disp.ArvClient, disp.queue, disp.pool, disp.Registry, staleLockTimeout, pollInterval,
+               disp.Cluster.Containers.CloudVMs.InitialQuotaEstimate,
+               disp.Cluster.Containers.CloudVMs.MaxInstances,
+               disp.Cluster.Containers.CloudVMs.SupervisorFraction)
 
        if disp.Cluster.ManagementToken == "" {
                disp.httpHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
@@ -164,6 +187,7 @@ func (disp *dispatcher) initialize() {
        } else {
                mux := httprouter.New()
                mux.HandlerFunc("GET", "/arvados/v1/dispatch/containers", disp.apiContainers)
+               mux.HandlerFunc("GET", "/arvados/v1/dispatch/container", disp.apiContainer)
                mux.HandlerFunc("POST", "/arvados/v1/dispatch/containers/kill", disp.apiContainerKill)
                mux.HandlerFunc("GET", "/arvados/v1/dispatch/instances", disp.apiInstances)
                mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/hold", disp.apiInstanceHold)
@@ -190,36 +214,53 @@ func (disp *dispatcher) run() {
        defer disp.instanceSet.Stop()
        defer disp.pool.Stop()
 
-       staleLockTimeout := time.Duration(disp.Cluster.Containers.StaleLockTimeout)
-       if staleLockTimeout == 0 {
-               staleLockTimeout = defaultStaleLockTimeout
-       }
-       pollInterval := time.Duration(disp.Cluster.Containers.CloudVMs.PollInterval)
-       if pollInterval <= 0 {
-               pollInterval = defaultPollInterval
-       }
-       sched := scheduler.New(disp.Context, disp.ArvClient, disp.queue, disp.pool, disp.Registry, staleLockTimeout, pollInterval,
-               disp.Cluster.Containers.CloudVMs.InitialQuotaEstimate,
-               disp.Cluster.Containers.CloudVMs.MaxInstances,
-               disp.Cluster.Containers.CloudVMs.SupervisorFraction)
-       sched.Start()
-       defer sched.Stop()
+       disp.sched.Start()
+       defer disp.sched.Stop()
 
        <-disp.stop
 }
 
-// Management API: all active and queued containers.
+// Get a snapshot of the scheduler's queue, no older than
+// schedQueueRefresh.
+//
+// First return value is in the sorted order used by the scheduler.
+// Second return value is a map of the same entries, for efficiently
+// looking up a single container.
+func (disp *dispatcher) schedQueueCurrent() ([]scheduler.QueueEnt, map[string]scheduler.QueueEnt) {
+       disp.schedQueueMtx.Lock()
+       defer disp.schedQueueMtx.Unlock()
+       if time.Since(disp.schedQueueRefreshed) > schedQueueRefresh {
+               disp.schedQueue = disp.sched.Queue()
+               disp.schedQueueMap = make(map[string]scheduler.QueueEnt)
+               for _, ent := range disp.schedQueue {
+                       disp.schedQueueMap[ent.Container.UUID] = ent
+               }
+               disp.schedQueueRefreshed = time.Now()
+       }
+       return disp.schedQueue, disp.schedQueueMap
+}
+
+// Management API: scheduling queue entries for all active and queued
+// containers.
 func (disp *dispatcher) apiContainers(w http.ResponseWriter, r *http.Request) {
        var resp struct {
-               Items []container.QueueEnt `json:"items"`
-       }
-       qEntries, _ := disp.queue.Entries()
-       for _, ent := range qEntries {
-               resp.Items = append(resp.Items, ent)
+               Items []scheduler.QueueEnt `json:"items"`
        }
+       resp.Items, _ = disp.schedQueueCurrent()
        json.NewEncoder(w).Encode(resp)
 }
 
+// Management API: scheduling queue entry for a specified container.
+func (disp *dispatcher) apiContainer(w http.ResponseWriter, r *http.Request) {
+       _, sq := disp.schedQueueCurrent()
+       ent, ok := sq[r.FormValue("container_uuid")]
+       if !ok {
+               httpserver.Error(w, "container not found", http.StatusNotFound)
+               return
+       }
+       json.NewEncoder(w).Encode(ent)
+}
+
 // Management API: all active instances (cloud VMs).
 func (disp *dispatcher) apiInstances(w http.ResponseWriter, r *http.Request) {
        var resp struct {