22017: Comment test.
[arvados.git] / lib / dispatchcloud / dispatcher.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package dispatchcloud
6
7 import (
8         "context"
9         "crypto/md5"
10         "encoding/json"
11         "fmt"
12         "net/http"
13         "strings"
14         "sync"
15         "time"
16
17         "git.arvados.org/arvados.git/lib/cloud"
18         "git.arvados.org/arvados.git/lib/config"
19         "git.arvados.org/arvados.git/lib/controller/dblock"
20         "git.arvados.org/arvados.git/lib/ctrlctx"
21         "git.arvados.org/arvados.git/lib/dispatchcloud/container"
22         "git.arvados.org/arvados.git/lib/dispatchcloud/scheduler"
23         "git.arvados.org/arvados.git/lib/dispatchcloud/sshexecutor"
24         "git.arvados.org/arvados.git/lib/dispatchcloud/worker"
25         "git.arvados.org/arvados.git/sdk/go/arvados"
26         "git.arvados.org/arvados.git/sdk/go/auth"
27         "git.arvados.org/arvados.git/sdk/go/ctxlog"
28         "git.arvados.org/arvados.git/sdk/go/health"
29         "git.arvados.org/arvados.git/sdk/go/httpserver"
30         "github.com/julienschmidt/httprouter"
31         "github.com/prometheus/client_golang/prometheus"
32         "github.com/prometheus/client_golang/prometheus/promhttp"
33         "github.com/sirupsen/logrus"
34         "golang.org/x/crypto/ssh"
35 )
36
37 const (
38         defaultPollInterval     = time.Second
39         defaultStaleLockTimeout = time.Minute
40 )
41
42 type pool interface {
43         scheduler.WorkerPool
44         CheckHealth() error
45         Instances() []worker.InstanceView
46         SetIdleBehavior(cloud.InstanceID, worker.IdleBehavior) error
47         KillInstance(id cloud.InstanceID, reason string) error
48         Stop()
49 }
50
51 type dispatcher struct {
52         Cluster       *arvados.Cluster
53         Context       context.Context
54         ArvClient     *arvados.Client
55         AuthToken     string
56         Registry      *prometheus.Registry
57         InstanceSetID cloud.InstanceSetID
58
59         dbConnector ctrlctx.DBConnector
60         logger      logrus.FieldLogger
61         instanceSet cloud.InstanceSet
62         pool        pool
63         queue       scheduler.ContainerQueue
64         sched       *scheduler.Scheduler
65         httpHandler http.Handler
66         sshKey      ssh.Signer
67
68         setupOnce sync.Once
69         stop      chan struct{}
70         stopped   chan struct{}
71
72         schedQueueMtx       sync.Mutex
73         schedQueueRefreshed time.Time
74         schedQueue          []scheduler.QueueEnt
75         schedQueueMap       map[string]scheduler.QueueEnt
76 }
77
78 var schedQueueRefresh = time.Second
79
80 // Start starts the dispatcher. Start can be called multiple times
81 // with no ill effect.
82 func (disp *dispatcher) Start() {
83         disp.setupOnce.Do(disp.setup)
84 }
85
86 // ServeHTTP implements service.Handler.
87 func (disp *dispatcher) ServeHTTP(w http.ResponseWriter, r *http.Request) {
88         disp.Start()
89         disp.httpHandler.ServeHTTP(w, r)
90 }
91
92 // CheckHealth implements service.Handler.
93 func (disp *dispatcher) CheckHealth() error {
94         disp.Start()
95         return disp.pool.CheckHealth()
96 }
97
98 // Done implements service.Handler.
99 func (disp *dispatcher) Done() <-chan struct{} {
100         return disp.stopped
101 }
102
103 // Stop dispatching containers and release resources. Typically used
104 // in tests.
105 func (disp *dispatcher) Close() {
106         disp.Start()
107         select {
108         case disp.stop <- struct{}{}:
109         default:
110         }
111         <-disp.stopped
112 }
113
114 // Make a worker.Executor for the given instance.
115 func (disp *dispatcher) newExecutor(inst cloud.Instance) worker.Executor {
116         exr := sshexecutor.New(inst)
117         exr.SetTargetPort(disp.Cluster.Containers.CloudVMs.SSHPort)
118         exr.SetSigners(disp.sshKey)
119         return exr
120 }
121
122 func (disp *dispatcher) typeChooser(ctr *arvados.Container) ([]arvados.InstanceType, error) {
123         return ChooseInstanceType(disp.Cluster, ctr)
124 }
125
126 func (disp *dispatcher) setup() {
127         disp.initialize()
128         go disp.run()
129 }
130
131 func (disp *dispatcher) initialize() {
132         disp.logger = ctxlog.FromContext(disp.Context)
133         disp.dbConnector = ctrlctx.DBConnector{PostgreSQL: disp.Cluster.PostgreSQL}
134
135         disp.ArvClient.AuthToken = disp.AuthToken
136
137         if disp.InstanceSetID == "" {
138                 if strings.HasPrefix(disp.AuthToken, "v2/") {
139                         disp.InstanceSetID = cloud.InstanceSetID(strings.Split(disp.AuthToken, "/")[1])
140                 } else {
141                         // Use some other string unique to this token
142                         // that doesn't reveal the token itself.
143                         disp.InstanceSetID = cloud.InstanceSetID(fmt.Sprintf("%x", md5.Sum([]byte(disp.AuthToken))))
144                 }
145         }
146         disp.stop = make(chan struct{}, 1)
147         disp.stopped = make(chan struct{})
148
149         if key, err := config.LoadSSHKey(disp.Cluster.Containers.DispatchPrivateKey); err != nil {
150                 disp.logger.Fatalf("error parsing configured Containers.DispatchPrivateKey: %s", err)
151         } else {
152                 disp.sshKey = key
153         }
154         installPublicKey := disp.sshKey.PublicKey()
155         if !disp.Cluster.Containers.CloudVMs.DeployPublicKey {
156                 installPublicKey = nil
157         }
158
159         instanceSet, err := newInstanceSet(disp.Cluster, disp.InstanceSetID, disp.logger, disp.Registry)
160         if err != nil {
161                 disp.logger.Fatalf("error initializing driver: %s", err)
162         }
163         dblock.Dispatch.Lock(disp.Context, disp.dbConnector.GetDB)
164         disp.instanceSet = instanceSet
165         disp.pool = worker.NewPool(disp.logger, disp.ArvClient, disp.Registry, disp.InstanceSetID, disp.instanceSet, disp.newExecutor, installPublicKey, disp.Cluster)
166         if disp.queue == nil {
167                 disp.queue = container.NewQueue(disp.logger, disp.Registry, disp.typeChooser, disp.ArvClient)
168         }
169
170         staleLockTimeout := time.Duration(disp.Cluster.Containers.StaleLockTimeout)
171         if staleLockTimeout == 0 {
172                 staleLockTimeout = defaultStaleLockTimeout
173         }
174         pollInterval := time.Duration(disp.Cluster.Containers.CloudVMs.PollInterval)
175         if pollInterval <= 0 {
176                 pollInterval = defaultPollInterval
177         }
178         disp.sched = scheduler.New(disp.Context, disp.ArvClient, disp.queue, disp.pool, disp.Registry, staleLockTimeout, pollInterval,
179                 disp.Cluster.Containers.CloudVMs.InitialQuotaEstimate,
180                 disp.Cluster.Containers.CloudVMs.MaxInstances,
181                 disp.Cluster.Containers.CloudVMs.SupervisorFraction)
182
183         if disp.Cluster.ManagementToken == "" {
184                 disp.httpHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
185                         http.Error(w, "Management API authentication is not configured", http.StatusForbidden)
186                 })
187         } else {
188                 mux := httprouter.New()
189                 mux.HandlerFunc("GET", "/arvados/v1/dispatch/containers", disp.apiContainers)
190                 mux.HandlerFunc("GET", "/arvados/v1/dispatch/container", disp.apiContainer)
191                 mux.HandlerFunc("POST", "/arvados/v1/dispatch/containers/kill", disp.apiContainerKill)
192                 mux.HandlerFunc("GET", "/arvados/v1/dispatch/instances", disp.apiInstances)
193                 mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/hold", disp.apiInstanceHold)
194                 mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/drain", disp.apiInstanceDrain)
195                 mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/run", disp.apiInstanceRun)
196                 mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/kill", disp.apiInstanceKill)
197                 metricsH := promhttp.HandlerFor(disp.Registry, promhttp.HandlerOpts{
198                         ErrorLog: disp.logger,
199                 })
200                 mux.Handler("GET", "/metrics", metricsH)
201                 mux.Handler("GET", "/metrics.json", metricsH)
202                 mux.Handler("GET", "/_health/:check", &health.Handler{
203                         Token:  disp.Cluster.ManagementToken,
204                         Prefix: "/_health/",
205                         Routes: health.Routes{"ping": disp.CheckHealth},
206                 })
207                 disp.httpHandler = auth.RequireLiteralToken(disp.Cluster.ManagementToken, mux)
208         }
209 }
210
211 func (disp *dispatcher) run() {
212         defer dblock.Dispatch.Unlock()
213         defer close(disp.stopped)
214         defer disp.instanceSet.Stop()
215         defer disp.pool.Stop()
216
217         disp.sched.Start()
218         defer disp.sched.Stop()
219
220         <-disp.stop
221 }
222
223 // Get a snapshot of the scheduler's queue, no older than
224 // schedQueueRefresh.
225 //
226 // First return value is in the sorted order used by the scheduler.
227 // Second return value is a map of the same entries, for efficiently
228 // looking up a single container.
229 func (disp *dispatcher) schedQueueCurrent() ([]scheduler.QueueEnt, map[string]scheduler.QueueEnt) {
230         disp.schedQueueMtx.Lock()
231         defer disp.schedQueueMtx.Unlock()
232         if time.Since(disp.schedQueueRefreshed) > schedQueueRefresh {
233                 disp.schedQueue = disp.sched.Queue()
234                 disp.schedQueueMap = make(map[string]scheduler.QueueEnt)
235                 for _, ent := range disp.schedQueue {
236                         disp.schedQueueMap[ent.Container.UUID] = ent
237                 }
238                 disp.schedQueueRefreshed = time.Now()
239         }
240         return disp.schedQueue, disp.schedQueueMap
241 }
242
243 // Management API: scheduling queue entries for all active and queued
244 // containers.
245 func (disp *dispatcher) apiContainers(w http.ResponseWriter, r *http.Request) {
246         var resp struct {
247                 Items []scheduler.QueueEnt `json:"items"`
248         }
249         resp.Items, _ = disp.schedQueueCurrent()
250         json.NewEncoder(w).Encode(resp)
251 }
252
253 // Management API: scheduling queue entry for a specified container.
254 func (disp *dispatcher) apiContainer(w http.ResponseWriter, r *http.Request) {
255         _, sq := disp.schedQueueCurrent()
256         ent, ok := sq[r.FormValue("container_uuid")]
257         if !ok {
258                 httpserver.Error(w, "container not found", http.StatusNotFound)
259                 return
260         }
261         json.NewEncoder(w).Encode(ent)
262 }
263
264 // Management API: all active instances (cloud VMs).
265 func (disp *dispatcher) apiInstances(w http.ResponseWriter, r *http.Request) {
266         var resp struct {
267                 Items []worker.InstanceView `json:"items"`
268         }
269         resp.Items = disp.pool.Instances()
270         json.NewEncoder(w).Encode(resp)
271 }
272
273 // Management API: set idle behavior to "hold" for specified instance.
274 func (disp *dispatcher) apiInstanceHold(w http.ResponseWriter, r *http.Request) {
275         disp.apiInstanceIdleBehavior(w, r, worker.IdleBehaviorHold)
276 }
277
278 // Management API: set idle behavior to "drain" for specified instance.
279 func (disp *dispatcher) apiInstanceDrain(w http.ResponseWriter, r *http.Request) {
280         disp.apiInstanceIdleBehavior(w, r, worker.IdleBehaviorDrain)
281 }
282
283 // Management API: set idle behavior to "run" for specified instance.
284 func (disp *dispatcher) apiInstanceRun(w http.ResponseWriter, r *http.Request) {
285         disp.apiInstanceIdleBehavior(w, r, worker.IdleBehaviorRun)
286 }
287
288 // Management API: shutdown/destroy specified instance now.
289 func (disp *dispatcher) apiInstanceKill(w http.ResponseWriter, r *http.Request) {
290         id := cloud.InstanceID(r.FormValue("instance_id"))
291         if id == "" {
292                 httpserver.Error(w, "instance_id parameter not provided", http.StatusBadRequest)
293                 return
294         }
295         err := disp.pool.KillInstance(id, "via management API: "+r.FormValue("reason"))
296         if err != nil {
297                 httpserver.Error(w, err.Error(), http.StatusNotFound)
298                 return
299         }
300 }
301
302 // Management API: send SIGTERM to specified container's crunch-run
303 // process now.
304 func (disp *dispatcher) apiContainerKill(w http.ResponseWriter, r *http.Request) {
305         uuid := r.FormValue("container_uuid")
306         if uuid == "" {
307                 httpserver.Error(w, "container_uuid parameter not provided", http.StatusBadRequest)
308                 return
309         }
310         if !disp.pool.KillContainer(uuid, "via management API: "+r.FormValue("reason")) {
311                 httpserver.Error(w, "container not found", http.StatusNotFound)
312                 return
313         }
314 }
315
316 func (disp *dispatcher) apiInstanceIdleBehavior(w http.ResponseWriter, r *http.Request, want worker.IdleBehavior) {
317         id := cloud.InstanceID(r.FormValue("instance_id"))
318         if id == "" {
319                 httpserver.Error(w, "instance_id parameter not provided", http.StatusBadRequest)
320                 return
321         }
322         err := disp.pool.SetIdleBehavior(id, want)
323         if err != nil {
324                 httpserver.Error(w, err.Error(), http.StatusNotFound)
325                 return
326         }
327 }