Merge branch 'master' into 16811-public-favs
[arvados.git] / lib / dispatchcloud / dispatcher.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package dispatchcloud
6
7 import (
8         "context"
9         "crypto/md5"
10         "encoding/json"
11         "fmt"
12         "net/http"
13         "strings"
14         "sync"
15         "time"
16
17         "git.arvados.org/arvados.git/lib/cloud"
18         "git.arvados.org/arvados.git/lib/dispatchcloud/container"
19         "git.arvados.org/arvados.git/lib/dispatchcloud/scheduler"
20         "git.arvados.org/arvados.git/lib/dispatchcloud/sshexecutor"
21         "git.arvados.org/arvados.git/lib/dispatchcloud/worker"
22         "git.arvados.org/arvados.git/sdk/go/arvados"
23         "git.arvados.org/arvados.git/sdk/go/auth"
24         "git.arvados.org/arvados.git/sdk/go/ctxlog"
25         "git.arvados.org/arvados.git/sdk/go/httpserver"
26         "github.com/julienschmidt/httprouter"
27         "github.com/prometheus/client_golang/prometheus"
28         "github.com/prometheus/client_golang/prometheus/promhttp"
29         "github.com/sirupsen/logrus"
30         "golang.org/x/crypto/ssh"
31 )
32
33 const (
34         defaultPollInterval     = time.Second
35         defaultStaleLockTimeout = time.Minute
36 )
37
38 type pool interface {
39         scheduler.WorkerPool
40         CheckHealth() error
41         Instances() []worker.InstanceView
42         SetIdleBehavior(cloud.InstanceID, worker.IdleBehavior) error
43         KillInstance(id cloud.InstanceID, reason string) error
44         Stop()
45 }
46
47 type dispatcher struct {
48         Cluster       *arvados.Cluster
49         Context       context.Context
50         ArvClient     *arvados.Client
51         AuthToken     string
52         Registry      *prometheus.Registry
53         InstanceSetID cloud.InstanceSetID
54
55         logger      logrus.FieldLogger
56         instanceSet cloud.InstanceSet
57         pool        pool
58         queue       scheduler.ContainerQueue
59         httpHandler http.Handler
60         sshKey      ssh.Signer
61
62         setupOnce sync.Once
63         stop      chan struct{}
64         stopped   chan struct{}
65 }
66
67 // Start starts the dispatcher. Start can be called multiple times
68 // with no ill effect.
69 func (disp *dispatcher) Start() {
70         disp.setupOnce.Do(disp.setup)
71 }
72
73 // ServeHTTP implements service.Handler.
74 func (disp *dispatcher) ServeHTTP(w http.ResponseWriter, r *http.Request) {
75         disp.Start()
76         disp.httpHandler.ServeHTTP(w, r)
77 }
78
79 // CheckHealth implements service.Handler.
80 func (disp *dispatcher) CheckHealth() error {
81         disp.Start()
82         return disp.pool.CheckHealth()
83 }
84
85 // Done implements service.Handler.
86 func (disp *dispatcher) Done() <-chan struct{} {
87         return disp.stopped
88 }
89
90 // Stop dispatching containers and release resources. Typically used
91 // in tests.
92 func (disp *dispatcher) Close() {
93         disp.Start()
94         select {
95         case disp.stop <- struct{}{}:
96         default:
97         }
98         <-disp.stopped
99 }
100
101 // Make a worker.Executor for the given instance.
102 func (disp *dispatcher) newExecutor(inst cloud.Instance) worker.Executor {
103         exr := sshexecutor.New(inst)
104         exr.SetTargetPort(disp.Cluster.Containers.CloudVMs.SSHPort)
105         exr.SetSigners(disp.sshKey)
106         return exr
107 }
108
109 func (disp *dispatcher) typeChooser(ctr *arvados.Container) (arvados.InstanceType, error) {
110         return ChooseInstanceType(disp.Cluster, ctr)
111 }
112
113 func (disp *dispatcher) setup() {
114         disp.initialize()
115         go disp.run()
116 }
117
118 func (disp *dispatcher) initialize() {
119         disp.logger = ctxlog.FromContext(disp.Context)
120
121         disp.ArvClient.AuthToken = disp.AuthToken
122
123         if disp.InstanceSetID == "" {
124                 if strings.HasPrefix(disp.AuthToken, "v2/") {
125                         disp.InstanceSetID = cloud.InstanceSetID(strings.Split(disp.AuthToken, "/")[1])
126                 } else {
127                         // Use some other string unique to this token
128                         // that doesn't reveal the token itself.
129                         disp.InstanceSetID = cloud.InstanceSetID(fmt.Sprintf("%x", md5.Sum([]byte(disp.AuthToken))))
130                 }
131         }
132         disp.stop = make(chan struct{}, 1)
133         disp.stopped = make(chan struct{})
134
135         if key, err := ssh.ParsePrivateKey([]byte(disp.Cluster.Containers.DispatchPrivateKey)); err != nil {
136                 disp.logger.Fatalf("error parsing configured Containers.DispatchPrivateKey: %s", err)
137         } else {
138                 disp.sshKey = key
139         }
140
141         instanceSet, err := newInstanceSet(disp.Cluster, disp.InstanceSetID, disp.logger, disp.Registry)
142         if err != nil {
143                 disp.logger.Fatalf("error initializing driver: %s", err)
144         }
145         disp.instanceSet = instanceSet
146         disp.pool = worker.NewPool(disp.logger, disp.ArvClient, disp.Registry, disp.InstanceSetID, disp.instanceSet, disp.newExecutor, disp.sshKey.PublicKey(), disp.Cluster)
147         disp.queue = container.NewQueue(disp.logger, disp.Registry, disp.typeChooser, disp.ArvClient)
148
149         if disp.Cluster.ManagementToken == "" {
150                 disp.httpHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
151                         http.Error(w, "Management API authentication is not configured", http.StatusForbidden)
152                 })
153         } else {
154                 mux := httprouter.New()
155                 mux.HandlerFunc("GET", "/arvados/v1/dispatch/containers", disp.apiContainers)
156                 mux.HandlerFunc("POST", "/arvados/v1/dispatch/containers/kill", disp.apiContainerKill)
157                 mux.HandlerFunc("GET", "/arvados/v1/dispatch/instances", disp.apiInstances)
158                 mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/hold", disp.apiInstanceHold)
159                 mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/drain", disp.apiInstanceDrain)
160                 mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/run", disp.apiInstanceRun)
161                 mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/kill", disp.apiInstanceKill)
162                 metricsH := promhttp.HandlerFor(disp.Registry, promhttp.HandlerOpts{
163                         ErrorLog: disp.logger,
164                 })
165                 mux.Handler("GET", "/metrics", metricsH)
166                 mux.Handler("GET", "/metrics.json", metricsH)
167                 disp.httpHandler = auth.RequireLiteralToken(disp.Cluster.ManagementToken, mux)
168         }
169 }
170
171 func (disp *dispatcher) run() {
172         defer close(disp.stopped)
173         defer disp.instanceSet.Stop()
174         defer disp.pool.Stop()
175
176         staleLockTimeout := time.Duration(disp.Cluster.Containers.StaleLockTimeout)
177         if staleLockTimeout == 0 {
178                 staleLockTimeout = defaultStaleLockTimeout
179         }
180         pollInterval := time.Duration(disp.Cluster.Containers.CloudVMs.PollInterval)
181         if pollInterval <= 0 {
182                 pollInterval = defaultPollInterval
183         }
184         sched := scheduler.New(disp.Context, disp.queue, disp.pool, disp.Registry, staleLockTimeout, pollInterval)
185         sched.Start()
186         defer sched.Stop()
187
188         <-disp.stop
189 }
190
191 // Management API: all active and queued containers.
192 func (disp *dispatcher) apiContainers(w http.ResponseWriter, r *http.Request) {
193         var resp struct {
194                 Items []container.QueueEnt `json:"items"`
195         }
196         qEntries, _ := disp.queue.Entries()
197         for _, ent := range qEntries {
198                 resp.Items = append(resp.Items, ent)
199         }
200         json.NewEncoder(w).Encode(resp)
201 }
202
203 // Management API: all active instances (cloud VMs).
204 func (disp *dispatcher) apiInstances(w http.ResponseWriter, r *http.Request) {
205         var resp struct {
206                 Items []worker.InstanceView `json:"items"`
207         }
208         resp.Items = disp.pool.Instances()
209         json.NewEncoder(w).Encode(resp)
210 }
211
212 // Management API: set idle behavior to "hold" for specified instance.
213 func (disp *dispatcher) apiInstanceHold(w http.ResponseWriter, r *http.Request) {
214         disp.apiInstanceIdleBehavior(w, r, worker.IdleBehaviorHold)
215 }
216
217 // Management API: set idle behavior to "drain" for specified instance.
218 func (disp *dispatcher) apiInstanceDrain(w http.ResponseWriter, r *http.Request) {
219         disp.apiInstanceIdleBehavior(w, r, worker.IdleBehaviorDrain)
220 }
221
222 // Management API: set idle behavior to "run" for specified instance.
223 func (disp *dispatcher) apiInstanceRun(w http.ResponseWriter, r *http.Request) {
224         disp.apiInstanceIdleBehavior(w, r, worker.IdleBehaviorRun)
225 }
226
227 // Management API: shutdown/destroy specified instance now.
228 func (disp *dispatcher) apiInstanceKill(w http.ResponseWriter, r *http.Request) {
229         id := cloud.InstanceID(r.FormValue("instance_id"))
230         if id == "" {
231                 httpserver.Error(w, "instance_id parameter not provided", http.StatusBadRequest)
232                 return
233         }
234         err := disp.pool.KillInstance(id, "via management API: "+r.FormValue("reason"))
235         if err != nil {
236                 httpserver.Error(w, err.Error(), http.StatusNotFound)
237                 return
238         }
239 }
240
241 // Management API: send SIGTERM to specified container's crunch-run
242 // process now.
243 func (disp *dispatcher) apiContainerKill(w http.ResponseWriter, r *http.Request) {
244         uuid := r.FormValue("container_uuid")
245         if uuid == "" {
246                 httpserver.Error(w, "container_uuid parameter not provided", http.StatusBadRequest)
247                 return
248         }
249         if !disp.pool.KillContainer(uuid, "via management API: "+r.FormValue("reason")) {
250                 httpserver.Error(w, "container not found", http.StatusNotFound)
251                 return
252         }
253 }
254
255 func (disp *dispatcher) apiInstanceIdleBehavior(w http.ResponseWriter, r *http.Request, want worker.IdleBehavior) {
256         id := cloud.InstanceID(r.FormValue("instance_id"))
257         if id == "" {
258                 httpserver.Error(w, "instance_id parameter not provided", http.StatusBadRequest)
259                 return
260         }
261         err := disp.pool.SetIdleBehavior(id, want)
262         if err != nil {
263                 httpserver.Error(w, err.Error(), http.StatusNotFound)
264                 return
265         }
266 }