13647: Merge branch 'master' into 13647-keepstore-config
[arvados.git] / lib / dispatchcloud / dispatcher.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package dispatchcloud
6
7 import (
8         "context"
9         "crypto/md5"
10         "encoding/json"
11         "fmt"
12         "net/http"
13         "strings"
14         "sync"
15         "time"
16
17         "git.curoverse.com/arvados.git/lib/cloud"
18         "git.curoverse.com/arvados.git/lib/dispatchcloud/container"
19         "git.curoverse.com/arvados.git/lib/dispatchcloud/scheduler"
20         "git.curoverse.com/arvados.git/lib/dispatchcloud/ssh_executor"
21         "git.curoverse.com/arvados.git/lib/dispatchcloud/worker"
22         "git.curoverse.com/arvados.git/sdk/go/arvados"
23         "git.curoverse.com/arvados.git/sdk/go/auth"
24         "git.curoverse.com/arvados.git/sdk/go/ctxlog"
25         "git.curoverse.com/arvados.git/sdk/go/httpserver"
26         "github.com/julienschmidt/httprouter"
27         "github.com/prometheus/client_golang/prometheus"
28         "github.com/prometheus/client_golang/prometheus/promhttp"
29         "github.com/sirupsen/logrus"
30         "golang.org/x/crypto/ssh"
31 )
32
33 const (
34         defaultPollInterval     = time.Second
35         defaultStaleLockTimeout = time.Minute
36 )
37
38 type pool interface {
39         scheduler.WorkerPool
40         Instances() []worker.InstanceView
41         SetIdleBehavior(cloud.InstanceID, worker.IdleBehavior) error
42         KillInstance(id cloud.InstanceID, reason string) error
43         Stop()
44 }
45
46 type dispatcher struct {
47         Cluster       *arvados.Cluster
48         Context       context.Context
49         ArvClient     *arvados.Client
50         AuthToken     string
51         Registry      *prometheus.Registry
52         InstanceSetID cloud.InstanceSetID
53
54         logger      logrus.FieldLogger
55         instanceSet cloud.InstanceSet
56         pool        pool
57         queue       scheduler.ContainerQueue
58         httpHandler http.Handler
59         sshKey      ssh.Signer
60
61         setupOnce sync.Once
62         stop      chan struct{}
63         stopped   chan struct{}
64 }
65
66 // Start starts the dispatcher. Start can be called multiple times
67 // with no ill effect.
68 func (disp *dispatcher) Start() {
69         disp.setupOnce.Do(disp.setup)
70 }
71
72 // ServeHTTP implements service.Handler.
73 func (disp *dispatcher) ServeHTTP(w http.ResponseWriter, r *http.Request) {
74         disp.Start()
75         disp.httpHandler.ServeHTTP(w, r)
76 }
77
78 // CheckHealth implements service.Handler.
79 func (disp *dispatcher) CheckHealth() error {
80         disp.Start()
81         return nil
82 }
83
84 // Stop dispatching containers and release resources. Typically used
85 // in tests.
86 func (disp *dispatcher) Close() {
87         disp.Start()
88         select {
89         case disp.stop <- struct{}{}:
90         default:
91         }
92         <-disp.stopped
93 }
94
95 // Make a worker.Executor for the given instance.
96 func (disp *dispatcher) newExecutor(inst cloud.Instance) worker.Executor {
97         exr := ssh_executor.New(inst)
98         exr.SetTargetPort(disp.Cluster.Containers.CloudVMs.SSHPort)
99         exr.SetSigners(disp.sshKey)
100         return exr
101 }
102
103 func (disp *dispatcher) typeChooser(ctr *arvados.Container) (arvados.InstanceType, error) {
104         return ChooseInstanceType(disp.Cluster, ctr)
105 }
106
107 func (disp *dispatcher) setup() {
108         disp.initialize()
109         go disp.run()
110 }
111
112 func (disp *dispatcher) initialize() {
113         disp.logger = ctxlog.FromContext(disp.Context)
114
115         disp.ArvClient.AuthToken = disp.AuthToken
116
117         if disp.InstanceSetID == "" {
118                 if strings.HasPrefix(disp.AuthToken, "v2/") {
119                         disp.InstanceSetID = cloud.InstanceSetID(strings.Split(disp.AuthToken, "/")[1])
120                 } else {
121                         // Use some other string unique to this token
122                         // that doesn't reveal the token itself.
123                         disp.InstanceSetID = cloud.InstanceSetID(fmt.Sprintf("%x", md5.Sum([]byte(disp.AuthToken))))
124                 }
125         }
126         disp.stop = make(chan struct{}, 1)
127         disp.stopped = make(chan struct{})
128
129         if key, err := ssh.ParsePrivateKey([]byte(disp.Cluster.Containers.DispatchPrivateKey)); err != nil {
130                 disp.logger.Fatalf("error parsing configured Containers.DispatchPrivateKey: %s", err)
131         } else {
132                 disp.sshKey = key
133         }
134
135         instanceSet, err := newInstanceSet(disp.Cluster, disp.InstanceSetID, disp.logger, disp.Registry)
136         if err != nil {
137                 disp.logger.Fatalf("error initializing driver: %s", err)
138         }
139         disp.instanceSet = instanceSet
140         disp.pool = worker.NewPool(disp.logger, disp.ArvClient, disp.Registry, disp.InstanceSetID, disp.instanceSet, disp.newExecutor, disp.sshKey.PublicKey(), disp.Cluster)
141         disp.queue = container.NewQueue(disp.logger, disp.Registry, disp.typeChooser, disp.ArvClient)
142
143         if disp.Cluster.ManagementToken == "" {
144                 disp.httpHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
145                         http.Error(w, "Management API authentication is not configured", http.StatusForbidden)
146                 })
147         } else {
148                 mux := httprouter.New()
149                 mux.HandlerFunc("GET", "/arvados/v1/dispatch/containers", disp.apiContainers)
150                 mux.HandlerFunc("POST", "/arvados/v1/dispatch/containers/kill", disp.apiInstanceKill)
151                 mux.HandlerFunc("GET", "/arvados/v1/dispatch/instances", disp.apiInstances)
152                 mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/hold", disp.apiInstanceHold)
153                 mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/drain", disp.apiInstanceDrain)
154                 mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/run", disp.apiInstanceRun)
155                 mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/kill", disp.apiInstanceKill)
156                 metricsH := promhttp.HandlerFor(disp.Registry, promhttp.HandlerOpts{
157                         ErrorLog: disp.logger,
158                 })
159                 mux.Handler("GET", "/metrics", metricsH)
160                 mux.Handler("GET", "/metrics.json", metricsH)
161                 disp.httpHandler = auth.RequireLiteralToken(disp.Cluster.ManagementToken, mux)
162         }
163 }
164
165 func (disp *dispatcher) run() {
166         defer close(disp.stopped)
167         defer disp.instanceSet.Stop()
168         defer disp.pool.Stop()
169
170         staleLockTimeout := time.Duration(disp.Cluster.Containers.StaleLockTimeout)
171         if staleLockTimeout == 0 {
172                 staleLockTimeout = defaultStaleLockTimeout
173         }
174         pollInterval := time.Duration(disp.Cluster.Containers.CloudVMs.PollInterval)
175         if pollInterval <= 0 {
176                 pollInterval = defaultPollInterval
177         }
178         sched := scheduler.New(disp.Context, disp.queue, disp.pool, staleLockTimeout, pollInterval)
179         sched.Start()
180         defer sched.Stop()
181
182         <-disp.stop
183 }
184
185 // Management API: all active and queued containers.
186 func (disp *dispatcher) apiContainers(w http.ResponseWriter, r *http.Request) {
187         var resp struct {
188                 Items []container.QueueEnt `json:"items"`
189         }
190         qEntries, _ := disp.queue.Entries()
191         for _, ent := range qEntries {
192                 resp.Items = append(resp.Items, ent)
193         }
194         json.NewEncoder(w).Encode(resp)
195 }
196
197 // Management API: all active instances (cloud VMs).
198 func (disp *dispatcher) apiInstances(w http.ResponseWriter, r *http.Request) {
199         var resp struct {
200                 Items []worker.InstanceView `json:"items"`
201         }
202         resp.Items = disp.pool.Instances()
203         json.NewEncoder(w).Encode(resp)
204 }
205
206 // Management API: set idle behavior to "hold" for specified instance.
207 func (disp *dispatcher) apiInstanceHold(w http.ResponseWriter, r *http.Request) {
208         disp.apiInstanceIdleBehavior(w, r, worker.IdleBehaviorHold)
209 }
210
211 // Management API: set idle behavior to "drain" for specified instance.
212 func (disp *dispatcher) apiInstanceDrain(w http.ResponseWriter, r *http.Request) {
213         disp.apiInstanceIdleBehavior(w, r, worker.IdleBehaviorDrain)
214 }
215
216 // Management API: set idle behavior to "run" for specified instance.
217 func (disp *dispatcher) apiInstanceRun(w http.ResponseWriter, r *http.Request) {
218         disp.apiInstanceIdleBehavior(w, r, worker.IdleBehaviorRun)
219 }
220
221 // Management API: shutdown/destroy specified instance now.
222 func (disp *dispatcher) apiInstanceKill(w http.ResponseWriter, r *http.Request) {
223         id := cloud.InstanceID(r.FormValue("instance_id"))
224         if id == "" {
225                 httpserver.Error(w, "instance_id parameter not provided", http.StatusBadRequest)
226                 return
227         }
228         err := disp.pool.KillInstance(id, "via management API: "+r.FormValue("reason"))
229         if err != nil {
230                 httpserver.Error(w, err.Error(), http.StatusNotFound)
231                 return
232         }
233 }
234
235 // Management API: send SIGTERM to specified container's crunch-run
236 // process now.
237 func (disp *dispatcher) apiContainerKill(w http.ResponseWriter, r *http.Request) {
238         uuid := r.FormValue("container_uuid")
239         if uuid == "" {
240                 httpserver.Error(w, "container_uuid parameter not provided", http.StatusBadRequest)
241                 return
242         }
243         if !disp.pool.KillContainer(uuid, "via management API: "+r.FormValue("reason")) {
244                 httpserver.Error(w, "container not found", http.StatusNotFound)
245                 return
246         }
247 }
248
249 func (disp *dispatcher) apiInstanceIdleBehavior(w http.ResponseWriter, r *http.Request, want worker.IdleBehavior) {
250         id := cloud.InstanceID(r.FormValue("instance_id"))
251         if id == "" {
252                 httpserver.Error(w, "instance_id parameter not provided", http.StatusBadRequest)
253                 return
254         }
255         err := disp.pool.SetIdleBehavior(id, want)
256         if err != nil {
257                 httpserver.Error(w, err.Error(), http.StatusNotFound)
258                 return
259         }
260 }