21123: Rename sQueue* to schedQueue*.
[arvados.git] / lib / dispatchcloud / dispatcher.go
index 3403c50c972987e7f6f21a927a6db592fac9f6fc..04283df48f6faf60bd0968b327a6e953c41d6d18 100644 (file)
@@ -15,6 +15,7 @@ import (
        "time"
 
        "git.arvados.org/arvados.git/lib/cloud"
+       "git.arvados.org/arvados.git/lib/config"
        "git.arvados.org/arvados.git/lib/controller/dblock"
        "git.arvados.org/arvados.git/lib/ctrlctx"
        "git.arvados.org/arvados.git/lib/dispatchcloud/container"
@@ -60,14 +61,22 @@ type dispatcher struct {
        instanceSet cloud.InstanceSet
        pool        pool
        queue       scheduler.ContainerQueue
+       sched       *scheduler.Scheduler
        httpHandler http.Handler
        sshKey      ssh.Signer
 
        setupOnce sync.Once
        stop      chan struct{}
        stopped   chan struct{}
+
+       schedQueueMtx       sync.Mutex
+       schedQueueRefreshed time.Time
+       schedQueue          []scheduler.QueueEnt
+       schedQueueMap       map[string]scheduler.QueueEnt
 }
 
+var schedQueueRefresh = time.Second
+
 // Start starts the dispatcher. Start can be called multiple times
 // with no ill effect.
 func (disp *dispatcher) Start() {
@@ -110,7 +119,7 @@ func (disp *dispatcher) newExecutor(inst cloud.Instance) worker.Executor {
        return exr
 }
 
-func (disp *dispatcher) typeChooser(ctr *arvados.Container) (arvados.InstanceType, error) {
+func (disp *dispatcher) typeChooser(ctr *arvados.Container) ([]arvados.InstanceType, error) {
        return ChooseInstanceType(disp.Cluster, ctr)
 }
 
@@ -137,11 +146,15 @@ func (disp *dispatcher) initialize() {
        disp.stop = make(chan struct{}, 1)
        disp.stopped = make(chan struct{})
 
-       if key, err := ssh.ParsePrivateKey([]byte(disp.Cluster.Containers.DispatchPrivateKey)); err != nil {
+       if key, err := config.LoadSSHKey(disp.Cluster.Containers.DispatchPrivateKey); err != nil {
                disp.logger.Fatalf("error parsing configured Containers.DispatchPrivateKey: %s", err)
        } else {
                disp.sshKey = key
        }
+       installPublicKey := disp.sshKey.PublicKey()
+       if !disp.Cluster.Containers.CloudVMs.DeployPublicKey {
+               installPublicKey = nil
+       }
 
        instanceSet, err := newInstanceSet(disp.Cluster, disp.InstanceSetID, disp.logger, disp.Registry)
        if err != nil {
@@ -149,8 +162,23 @@ func (disp *dispatcher) initialize() {
        }
        dblock.Dispatch.Lock(disp.Context, disp.dbConnector.GetDB)
        disp.instanceSet = instanceSet
-       disp.pool = worker.NewPool(disp.logger, disp.ArvClient, disp.Registry, disp.InstanceSetID, disp.instanceSet, disp.newExecutor, disp.sshKey.PublicKey(), disp.Cluster)
-       disp.queue = container.NewQueue(disp.logger, disp.Registry, disp.typeChooser, disp.ArvClient)
+       disp.pool = worker.NewPool(disp.logger, disp.ArvClient, disp.Registry, disp.InstanceSetID, disp.instanceSet, disp.newExecutor, installPublicKey, disp.Cluster)
+       if disp.queue == nil {
+               disp.queue = container.NewQueue(disp.logger, disp.Registry, disp.typeChooser, disp.ArvClient)
+       }
+
+       staleLockTimeout := time.Duration(disp.Cluster.Containers.StaleLockTimeout)
+       if staleLockTimeout == 0 {
+               staleLockTimeout = defaultStaleLockTimeout
+       }
+       pollInterval := time.Duration(disp.Cluster.Containers.CloudVMs.PollInterval)
+       if pollInterval <= 0 {
+               pollInterval = defaultPollInterval
+       }
+       disp.sched = scheduler.New(disp.Context, disp.ArvClient, disp.queue, disp.pool, disp.Registry, staleLockTimeout, pollInterval,
+               disp.Cluster.Containers.CloudVMs.InitialQuotaEstimate,
+               disp.Cluster.Containers.CloudVMs.MaxInstances,
+               disp.Cluster.Containers.CloudVMs.SupervisorFraction)
 
        if disp.Cluster.ManagementToken == "" {
                disp.httpHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
@@ -159,6 +187,7 @@ func (disp *dispatcher) initialize() {
        } else {
                mux := httprouter.New()
                mux.HandlerFunc("GET", "/arvados/v1/dispatch/containers", disp.apiContainers)
+               mux.HandlerFunc("GET", "/arvados/v1/dispatch/container", disp.apiContainer)
                mux.HandlerFunc("POST", "/arvados/v1/dispatch/containers/kill", disp.apiContainerKill)
                mux.HandlerFunc("GET", "/arvados/v1/dispatch/instances", disp.apiInstances)
                mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/hold", disp.apiInstanceHold)
@@ -185,33 +214,53 @@ func (disp *dispatcher) run() {
        defer disp.instanceSet.Stop()
        defer disp.pool.Stop()
 
-       staleLockTimeout := time.Duration(disp.Cluster.Containers.StaleLockTimeout)
-       if staleLockTimeout == 0 {
-               staleLockTimeout = defaultStaleLockTimeout
-       }
-       pollInterval := time.Duration(disp.Cluster.Containers.CloudVMs.PollInterval)
-       if pollInterval <= 0 {
-               pollInterval = defaultPollInterval
-       }
-       sched := scheduler.New(disp.Context, disp.queue, disp.pool, disp.Registry, staleLockTimeout, pollInterval)
-       sched.Start()
-       defer sched.Stop()
+       disp.sched.Start()
+       defer disp.sched.Stop()
 
        <-disp.stop
 }
 
-// Management API: all active and queued containers.
+// Get a snapshot of the scheduler's queue, no older than
+// schedQueueRefresh.
+//
+// First return value is in the sorted order used by the scheduler.
+// Second return value is a map of the same entries, for efficiently
+// looking up a single container.
+func (disp *dispatcher) schedQueueCurrent() ([]scheduler.QueueEnt, map[string]scheduler.QueueEnt) {
+       disp.schedQueueMtx.Lock()
+       defer disp.schedQueueMtx.Unlock()
+       if time.Since(disp.schedQueueRefreshed) > schedQueueRefresh {
+               disp.schedQueue = disp.sched.Queue()
+               disp.schedQueueMap = make(map[string]scheduler.QueueEnt)
+               for _, ent := range disp.schedQueue {
+                       disp.schedQueueMap[ent.Container.UUID] = ent
+               }
+               disp.schedQueueRefreshed = time.Now()
+       }
+       return disp.schedQueue, disp.schedQueueMap
+}
+
+// Management API: scheduling queue entries for all active and queued
+// containers.
 func (disp *dispatcher) apiContainers(w http.ResponseWriter, r *http.Request) {
        var resp struct {
-               Items []container.QueueEnt `json:"items"`
-       }
-       qEntries, _ := disp.queue.Entries()
-       for _, ent := range qEntries {
-               resp.Items = append(resp.Items, ent)
+               Items []scheduler.QueueEnt `json:"items"`
        }
+       resp.Items, _ = disp.schedQueueCurrent()
        json.NewEncoder(w).Encode(resp)
 }
 
+// Management API: scheduling queue entry for a specified container.
+func (disp *dispatcher) apiContainer(w http.ResponseWriter, r *http.Request) {
+       _, sq := disp.schedQueueCurrent()
+       ent, ok := sq[r.FormValue("container_uuid")]
+       if !ok {
+               httpserver.Error(w, "container not found", http.StatusNotFound)
+               return
+       }
+       json.NewEncoder(w).Encode(ent)
+}
+
 // Management API: all active instances (cloud VMs).
 func (disp *dispatcher) apiInstances(w http.ResponseWriter, r *http.Request) {
        var resp struct {