X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/b290de5604e7ccfad230cd1e0547f0c09cc2fe01..cb68d4e34688abd308d7adffc288c82a5deb6c85:/lib/dispatchcloud/dispatcher.go diff --git a/lib/dispatchcloud/dispatcher.go b/lib/dispatchcloud/dispatcher.go index 47e60abdee..04283df48f 100644 --- a/lib/dispatchcloud/dispatcher.go +++ b/lib/dispatchcloud/dispatcher.go @@ -61,14 +61,22 @@ type dispatcher struct { instanceSet cloud.InstanceSet pool pool queue scheduler.ContainerQueue + sched *scheduler.Scheduler httpHandler http.Handler sshKey ssh.Signer setupOnce sync.Once stop chan struct{} stopped chan struct{} + + schedQueueMtx sync.Mutex + schedQueueRefreshed time.Time + schedQueue []scheduler.QueueEnt + schedQueueMap map[string]scheduler.QueueEnt } +var schedQueueRefresh = time.Second + // Start starts the dispatcher. Start can be called multiple times // with no ill effect. func (disp *dispatcher) Start() { @@ -155,7 +163,22 @@ func (disp *dispatcher) initialize() { dblock.Dispatch.Lock(disp.Context, disp.dbConnector.GetDB) disp.instanceSet = instanceSet disp.pool = worker.NewPool(disp.logger, disp.ArvClient, disp.Registry, disp.InstanceSetID, disp.instanceSet, disp.newExecutor, installPublicKey, disp.Cluster) - disp.queue = container.NewQueue(disp.logger, disp.Registry, disp.typeChooser, disp.ArvClient) + if disp.queue == nil { + disp.queue = container.NewQueue(disp.logger, disp.Registry, disp.typeChooser, disp.ArvClient) + } + + staleLockTimeout := time.Duration(disp.Cluster.Containers.StaleLockTimeout) + if staleLockTimeout == 0 { + staleLockTimeout = defaultStaleLockTimeout + } + pollInterval := time.Duration(disp.Cluster.Containers.CloudVMs.PollInterval) + if pollInterval <= 0 { + pollInterval = defaultPollInterval + } + disp.sched = scheduler.New(disp.Context, disp.ArvClient, disp.queue, disp.pool, disp.Registry, staleLockTimeout, pollInterval, + disp.Cluster.Containers.CloudVMs.InitialQuotaEstimate, + disp.Cluster.Containers.CloudVMs.MaxInstances, + disp.Cluster.Containers.CloudVMs.SupervisorFraction) if disp.Cluster.ManagementToken == "" { disp.httpHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -164,6 +187,7 @@ func (disp *dispatcher) initialize() { } else { mux := httprouter.New() mux.HandlerFunc("GET", "/arvados/v1/dispatch/containers", disp.apiContainers) + mux.HandlerFunc("GET", "/arvados/v1/dispatch/container", disp.apiContainer) mux.HandlerFunc("POST", "/arvados/v1/dispatch/containers/kill", disp.apiContainerKill) mux.HandlerFunc("GET", "/arvados/v1/dispatch/instances", disp.apiInstances) mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/hold", disp.apiInstanceHold) @@ -190,36 +214,53 @@ func (disp *dispatcher) run() { defer disp.instanceSet.Stop() defer disp.pool.Stop() - staleLockTimeout := time.Duration(disp.Cluster.Containers.StaleLockTimeout) - if staleLockTimeout == 0 { - staleLockTimeout = defaultStaleLockTimeout - } - pollInterval := time.Duration(disp.Cluster.Containers.CloudVMs.PollInterval) - if pollInterval <= 0 { - pollInterval = defaultPollInterval - } - sched := scheduler.New(disp.Context, disp.ArvClient, disp.queue, disp.pool, disp.Registry, staleLockTimeout, pollInterval, - disp.Cluster.Containers.CloudVMs.InitialQuotaEstimate, - disp.Cluster.Containers.CloudVMs.MaxInstances, - disp.Cluster.Containers.CloudVMs.SupervisorFraction) - sched.Start() - defer sched.Stop() + disp.sched.Start() + defer disp.sched.Stop() <-disp.stop } -// Management API: all active and queued containers. +// Get a snapshot of the scheduler's queue, no older than +// schedQueueRefresh. +// +// First return value is in the sorted order used by the scheduler. +// Second return value is a map of the same entries, for efficiently +// looking up a single container. +func (disp *dispatcher) schedQueueCurrent() ([]scheduler.QueueEnt, map[string]scheduler.QueueEnt) { + disp.schedQueueMtx.Lock() + defer disp.schedQueueMtx.Unlock() + if time.Since(disp.schedQueueRefreshed) > schedQueueRefresh { + disp.schedQueue = disp.sched.Queue() + disp.schedQueueMap = make(map[string]scheduler.QueueEnt) + for _, ent := range disp.schedQueue { + disp.schedQueueMap[ent.Container.UUID] = ent + } + disp.schedQueueRefreshed = time.Now() + } + return disp.schedQueue, disp.schedQueueMap +} + +// Management API: scheduling queue entries for all active and queued +// containers. func (disp *dispatcher) apiContainers(w http.ResponseWriter, r *http.Request) { var resp struct { - Items []container.QueueEnt `json:"items"` - } - qEntries, _ := disp.queue.Entries() - for _, ent := range qEntries { - resp.Items = append(resp.Items, ent) + Items []scheduler.QueueEnt `json:"items"` } + resp.Items, _ = disp.schedQueueCurrent() json.NewEncoder(w).Encode(resp) } +// Management API: scheduling queue entry for a specified container. +func (disp *dispatcher) apiContainer(w http.ResponseWriter, r *http.Request) { + _, sq := disp.schedQueueCurrent() + ent, ok := sq[r.FormValue("container_uuid")] + if !ok { + httpserver.Error(w, "container not found", http.StatusNotFound) + return + } + json.NewEncoder(w).Encode(ent) +} + // Management API: all active instances (cloud VMs). func (disp *dispatcher) apiInstances(w http.ResponseWriter, r *http.Request) { var resp struct {