1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
17 "git.arvados.org/arvados.git/lib/cloud"
18 "git.arvados.org/arvados.git/lib/config"
19 "git.arvados.org/arvados.git/lib/controller/dblock"
20 "git.arvados.org/arvados.git/lib/ctrlctx"
21 "git.arvados.org/arvados.git/lib/dispatchcloud/container"
22 "git.arvados.org/arvados.git/lib/dispatchcloud/scheduler"
23 "git.arvados.org/arvados.git/lib/dispatchcloud/sshexecutor"
24 "git.arvados.org/arvados.git/lib/dispatchcloud/worker"
25 "git.arvados.org/arvados.git/sdk/go/arvados"
26 "git.arvados.org/arvados.git/sdk/go/auth"
27 "git.arvados.org/arvados.git/sdk/go/ctxlog"
28 "git.arvados.org/arvados.git/sdk/go/health"
29 "git.arvados.org/arvados.git/sdk/go/httpserver"
30 "github.com/julienschmidt/httprouter"
31 "github.com/prometheus/client_golang/prometheus"
32 "github.com/prometheus/client_golang/prometheus/promhttp"
33 "github.com/sirupsen/logrus"
34 "golang.org/x/crypto/ssh"
38 defaultPollInterval = time.Second
39 defaultStaleLockTimeout = time.Minute
45 Instances() []worker.InstanceView
46 SetIdleBehavior(cloud.InstanceID, worker.IdleBehavior) error
47 KillInstance(id cloud.InstanceID, reason string) error
51 type dispatcher struct {
52 Cluster *arvados.Cluster
53 Context context.Context
54 ArvClient *arvados.Client
56 Registry *prometheus.Registry
57 InstanceSetID cloud.InstanceSetID
59 dbConnector ctrlctx.DBConnector
60 logger logrus.FieldLogger
61 instanceSet cloud.InstanceSet
63 queue scheduler.ContainerQueue
64 sched *scheduler.Scheduler
65 httpHandler http.Handler
72 schedQueueMtx sync.Mutex
73 schedQueueRefreshed time.Time
74 schedQueue []scheduler.QueueEnt
75 schedQueueMap map[string]scheduler.QueueEnt
// schedQueueRefresh is the maximum age of the scheduler queue
// snapshot that schedQueueCurrent will return without refreshing it.
var schedQueueRefresh = 1 * time.Second
80 // Start starts the dispatcher. Start can be called multiple times
81 // with no ill effect.
82 func (disp *dispatcher) Start() {
83 disp.setupOnce.Do(disp.setup)
86 // ServeHTTP implements service.Handler.
87 func (disp *dispatcher) ServeHTTP(w http.ResponseWriter, r *http.Request) {
89 disp.httpHandler.ServeHTTP(w, r)
92 // CheckHealth implements service.Handler.
93 func (disp *dispatcher) CheckHealth() error {
95 return disp.pool.CheckHealth()
98 // Done implements service.Handler.
99 func (disp *dispatcher) Done() <-chan struct{} {
103 // Stop dispatching containers and release resources. Typically used
105 func (disp *dispatcher) Close() {
108 case disp.stop <- struct{}{}:
114 // Make a worker.Executor for the given instance.
115 func (disp *dispatcher) newExecutor(inst cloud.Instance) worker.Executor {
116 exr := sshexecutor.New(inst)
117 exr.SetTargetPort(disp.Cluster.Containers.CloudVMs.SSHPort)
118 exr.SetSigners(disp.sshKey)
122 func (disp *dispatcher) typeChooser(ctr *arvados.Container) ([]arvados.InstanceType, error) {
123 return ChooseInstanceType(disp.Cluster, ctr)
126 func (disp *dispatcher) setup() {
131 func (disp *dispatcher) initialize() {
132 disp.logger = ctxlog.FromContext(disp.Context)
133 disp.dbConnector = ctrlctx.DBConnector{PostgreSQL: disp.Cluster.PostgreSQL}
135 disp.ArvClient.AuthToken = disp.AuthToken
137 if disp.InstanceSetID == "" {
138 if strings.HasPrefix(disp.AuthToken, "v2/") {
139 disp.InstanceSetID = cloud.InstanceSetID(strings.Split(disp.AuthToken, "/")[1])
141 // Use some other string unique to this token
142 // that doesn't reveal the token itself.
143 disp.InstanceSetID = cloud.InstanceSetID(fmt.Sprintf("%x", md5.Sum([]byte(disp.AuthToken))))
146 disp.stop = make(chan struct{}, 1)
147 disp.stopped = make(chan struct{})
149 if key, err := config.LoadSSHKey(disp.Cluster.Containers.DispatchPrivateKey); err != nil {
150 disp.logger.Fatalf("error parsing configured Containers.DispatchPrivateKey: %s", err)
154 installPublicKey := disp.sshKey.PublicKey()
155 if !disp.Cluster.Containers.CloudVMs.DeployPublicKey {
156 installPublicKey = nil
159 instanceSet, err := newInstanceSet(disp.Cluster, disp.InstanceSetID, disp.logger, disp.Registry)
161 disp.logger.Fatalf("error initializing driver: %s", err)
163 dblock.Dispatch.Lock(disp.Context, disp.dbConnector.GetDB)
164 disp.instanceSet = instanceSet
165 disp.pool = worker.NewPool(disp.logger, disp.ArvClient, disp.Registry, disp.InstanceSetID, disp.instanceSet, disp.newExecutor, installPublicKey, disp.Cluster)
166 if disp.queue == nil {
167 disp.queue = container.NewQueue(disp.logger, disp.Registry, disp.typeChooser, disp.ArvClient)
170 staleLockTimeout := time.Duration(disp.Cluster.Containers.StaleLockTimeout)
171 if staleLockTimeout == 0 {
172 staleLockTimeout = defaultStaleLockTimeout
174 pollInterval := time.Duration(disp.Cluster.Containers.CloudVMs.PollInterval)
175 if pollInterval <= 0 {
176 pollInterval = defaultPollInterval
178 disp.sched = scheduler.New(disp.Context, disp.ArvClient, disp.queue, disp.pool, disp.Registry, staleLockTimeout, pollInterval,
179 disp.Cluster.Containers.CloudVMs.InitialQuotaEstimate,
180 disp.Cluster.Containers.CloudVMs.MaxInstances,
181 disp.Cluster.Containers.CloudVMs.SupervisorFraction)
183 if disp.Cluster.ManagementToken == "" {
184 disp.httpHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
185 http.Error(w, "Management API authentication is not configured", http.StatusForbidden)
188 mux := httprouter.New()
189 mux.HandlerFunc("GET", "/arvados/v1/dispatch/containers", disp.apiContainers)
190 mux.HandlerFunc("GET", "/arvados/v1/dispatch/container", disp.apiContainer)
191 mux.HandlerFunc("POST", "/arvados/v1/dispatch/containers/kill", disp.apiContainerKill)
192 mux.HandlerFunc("GET", "/arvados/v1/dispatch/instances", disp.apiInstances)
193 mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/hold", disp.apiInstanceHold)
194 mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/drain", disp.apiInstanceDrain)
195 mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/run", disp.apiInstanceRun)
196 mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/kill", disp.apiInstanceKill)
197 metricsH := promhttp.HandlerFor(disp.Registry, promhttp.HandlerOpts{
198 ErrorLog: disp.logger,
200 mux.Handler("GET", "/metrics", metricsH)
201 mux.Handler("GET", "/metrics.json", metricsH)
202 mux.Handler("GET", "/_health/:check", &health.Handler{
203 Token: disp.Cluster.ManagementToken,
205 Routes: health.Routes{"ping": disp.CheckHealth},
207 disp.httpHandler = auth.RequireLiteralToken(disp.Cluster.ManagementToken, mux)
211 func (disp *dispatcher) run() {
212 defer dblock.Dispatch.Unlock()
213 defer close(disp.stopped)
214 defer disp.instanceSet.Stop()
215 defer disp.pool.Stop()
218 defer disp.sched.Stop()
223 // Get a snapshot of the scheduler's queue, no older than
224 // schedQueueRefresh.
226 // First return value is in the sorted order used by the scheduler.
227 // Second return value is a map of the same entries, for efficiently
228 // looking up a single container.
229 func (disp *dispatcher) schedQueueCurrent() ([]scheduler.QueueEnt, map[string]scheduler.QueueEnt) {
230 disp.schedQueueMtx.Lock()
231 defer disp.schedQueueMtx.Unlock()
232 if time.Since(disp.schedQueueRefreshed) > schedQueueRefresh {
233 disp.schedQueue = disp.sched.Queue()
234 disp.schedQueueMap = make(map[string]scheduler.QueueEnt)
235 for _, ent := range disp.schedQueue {
236 disp.schedQueueMap[ent.Container.UUID] = ent
238 disp.schedQueueRefreshed = time.Now()
240 return disp.schedQueue, disp.schedQueueMap
243 // Management API: scheduling queue entries for all active and queued
245 func (disp *dispatcher) apiContainers(w http.ResponseWriter, r *http.Request) {
247 Items []scheduler.QueueEnt `json:"items"`
249 resp.Items, _ = disp.schedQueueCurrent()
250 json.NewEncoder(w).Encode(resp)
253 // Management API: scheduling queue entry for a specified container.
254 func (disp *dispatcher) apiContainer(w http.ResponseWriter, r *http.Request) {
255 _, sq := disp.schedQueueCurrent()
256 ent, ok := sq[r.FormValue("container_uuid")]
258 httpserver.Error(w, "container not found", http.StatusNotFound)
261 json.NewEncoder(w).Encode(ent)
264 // Management API: all active instances (cloud VMs).
265 func (disp *dispatcher) apiInstances(w http.ResponseWriter, r *http.Request) {
267 Items []worker.InstanceView `json:"items"`
269 resp.Items = disp.pool.Instances()
270 json.NewEncoder(w).Encode(resp)
273 // Management API: set idle behavior to "hold" for specified instance.
274 func (disp *dispatcher) apiInstanceHold(w http.ResponseWriter, r *http.Request) {
275 disp.apiInstanceIdleBehavior(w, r, worker.IdleBehaviorHold)
278 // Management API: set idle behavior to "drain" for specified instance.
279 func (disp *dispatcher) apiInstanceDrain(w http.ResponseWriter, r *http.Request) {
280 disp.apiInstanceIdleBehavior(w, r, worker.IdleBehaviorDrain)
283 // Management API: set idle behavior to "run" for specified instance.
284 func (disp *dispatcher) apiInstanceRun(w http.ResponseWriter, r *http.Request) {
285 disp.apiInstanceIdleBehavior(w, r, worker.IdleBehaviorRun)
288 // Management API: shutdown/destroy specified instance now.
289 func (disp *dispatcher) apiInstanceKill(w http.ResponseWriter, r *http.Request) {
290 id := cloud.InstanceID(r.FormValue("instance_id"))
292 httpserver.Error(w, "instance_id parameter not provided", http.StatusBadRequest)
295 err := disp.pool.KillInstance(id, "via management API: "+r.FormValue("reason"))
297 httpserver.Error(w, err.Error(), http.StatusNotFound)
302 // Management API: send SIGTERM to specified container's crunch-run
304 func (disp *dispatcher) apiContainerKill(w http.ResponseWriter, r *http.Request) {
305 uuid := r.FormValue("container_uuid")
307 httpserver.Error(w, "container_uuid parameter not provided", http.StatusBadRequest)
310 if !disp.pool.KillContainer(uuid, "via management API: "+r.FormValue("reason")) {
311 httpserver.Error(w, "container not found", http.StatusNotFound)
316 func (disp *dispatcher) apiInstanceIdleBehavior(w http.ResponseWriter, r *http.Request, want worker.IdleBehavior) {
317 id := cloud.InstanceID(r.FormValue("instance_id"))
319 httpserver.Error(w, "instance_id parameter not provided", http.StatusBadRequest)
322 err := disp.pool.SetIdleBehavior(id, want)
324 httpserver.Error(w, err.Error(), http.StatusNotFound)