a43b96ed82f6f0f05628bdc87fef9518db2cc78a
[arvados.git] / lib / dispatchcloud / worker / pool.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package worker
6
7 import (
8         "io"
9         "sort"
10         "strings"
11         "sync"
12         "time"
13
14         "git.curoverse.com/arvados.git/lib/cloud"
15         "git.curoverse.com/arvados.git/sdk/go/arvados"
16         "github.com/Sirupsen/logrus"
17         "github.com/prometheus/client_golang/prometheus"
18 )
19
// tagKeyInstanceType is the cloud instance tag that carries the name
// of the Arvados instance type. Create sets it on new instances, and
// sync uses it to look up the instance type of listed instances.
const tagKeyInstanceType = "InstanceType"

// tagKeyHold is the cloud instance tag that, when non-empty on a
// newly discovered instance, makes its worker start out in StateHold.
const tagKeyHold = "Hold"
24
// An InstanceView shows a worker's current state and recent activity.
//
// Values are snapshots taken by (*Pool)Instances; they are not
// updated after the view is returned.
type InstanceView struct {
	// String form of the worker's cloud instance.
	Instance             string
	// Price, name, and provider type are copied from the worker's
	// Arvados instance type.
	Price                float64
	ArvadosInstanceType  string
	ProviderInstanceType string
	// Copied from the worker's lastUUID field.
	LastContainerUUID    string
	// Copied from the worker's busy timestamp.
	LastBusy             time.Time
	// String form of the worker's State.
	WorkerState          string
}
35
// An Executor executes shell commands on a remote host.
//
// The pool obtains one Executor per worker from the newExecutor
// function given to NewPool.
type Executor interface {
	// Run cmd on the current target, feeding it stdin, and return
	// whatever it wrote to stdout and stderr. The pool passes a
	// nil stdin when it has no input to send.
	Execute(cmd string, stdin io.Reader) (stdout, stderr []byte, err error)

	// Use the given target for subsequent operations. The new
	// target is the same host as the previous target, but it
	// might return a different address and verify a different
	// host key.
	//
	// SetTarget is called frequently, and in most cases the new
	// target will behave exactly the same as the old one. An
	// implementation should optimize accordingly.
	//
	// SetTarget must not block on concurrent Execute calls.
	SetTarget(cloud.ExecutorTarget)

	// Close is called by the pool, in its own goroutine, after
	// the worker's instance has disappeared from the cloud.
	// NOTE(review): presumably this releases any connection held
	// for the target -- confirm against the implementations.
	Close()
}
55
// Fallback settings. NewPool substitutes the interval and timeout
// values for cluster configuration entries that are not positive;
// runProbes uses defaultMaxProbesPerSecond when the configured rate
// is less than 1.
const (
	defaultMaxProbesPerSecond = 10

	defaultSyncInterval  = 1 * time.Minute
	defaultProbeInterval = 10 * time.Second

	defaultTimeoutIdle     = 1 * time.Minute
	defaultTimeoutBooting  = 10 * time.Minute
	defaultTimeoutProbe    = 10 * time.Minute
	defaultTimeoutShutdown = 10 * time.Second
)
65
66 func duration(conf arvados.Duration, def time.Duration) time.Duration {
67         if conf > 0 {
68                 return time.Duration(conf)
69         } else {
70                 return def
71         }
72 }
73
74 // NewPool creates a Pool of workers backed by instanceSet.
75 //
76 // New instances are configured and set up according to the given
77 // cluster configuration.
78 func NewPool(logger logrus.FieldLogger, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, cluster *arvados.Cluster) *Pool {
79         wp := &Pool{
80                 logger:             logger,
81                 instanceSet:        instanceSet,
82                 newExecutor:        newExecutor,
83                 bootProbeCommand:   cluster.CloudVMs.BootProbeCommand,
84                 imageID:            cloud.ImageID(cluster.CloudVMs.ImageID),
85                 instanceTypes:      cluster.InstanceTypes,
86                 maxProbesPerSecond: cluster.Dispatch.MaxProbesPerSecond,
87                 probeInterval:      duration(cluster.Dispatch.ProbeInterval, defaultProbeInterval),
88                 syncInterval:       duration(cluster.CloudVMs.SyncInterval, defaultSyncInterval),
89                 timeoutIdle:        duration(cluster.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
90                 timeoutBooting:     duration(cluster.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
91                 timeoutProbe:       duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
92                 timeoutShutdown:    duration(cluster.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
93         }
94         wp.registerMetrics(reg)
95         go func() {
96                 wp.setupOnce.Do(wp.setup)
97                 go wp.runMetrics()
98                 go wp.runProbes()
99                 go wp.runSync()
100         }()
101         return wp
102 }
103
// Pool is a resizable worker pool backed by a cloud.InstanceSet. A
// zero Pool should not be used. Call NewPool to create a new Pool.
type Pool struct {
	// configuration (set once by NewPool)
	logger             logrus.FieldLogger
	instanceSet        cloud.InstanceSet
	newExecutor        func(cloud.Instance) Executor
	bootProbeCommand   string
	imageID            cloud.ImageID
	// known instance types, keyed by the value of the
	// InstanceType tag
	instanceTypes      map[string]arvados.InstanceType
	syncInterval       time.Duration
	probeInterval      time.Duration
	maxProbesPerSecond int
	timeoutIdle        time.Duration
	timeoutBooting     time.Duration
	timeoutProbe       time.Duration
	timeoutShutdown    time.Duration

	// private state (maps are allocated by setup; guarded by mtx)

	// receive end -> send end of each channel handed out by
	// Subscribe
	subscribers  map[<-chan struct{}]chan<- struct{}
	creating     map[arvados.InstanceType]int // goroutines waiting for (InstanceSet)Create to return
	workers      map[cloud.InstanceID]*worker
	loaded       bool                 // loaded list of instances from InstanceSet at least once
	exited       map[string]time.Time // containers whose crunch-run proc has exited, but KillContainer has not been called
	// Create returns atQuotaErr without contacting the cloud
	// until atQuotaUntil has passed.
	atQuotaUntil time.Time
	atQuotaErr   cloud.QuotaError
	// closed by Stop; the background loops select on it
	stop         chan bool
	// mtx is also shared with every worker created by this pool.
	mtx          sync.RWMutex
	// ensures setup runs exactly once
	setupOnce    sync.Once

	// gauges created by registerMetrics and refreshed by
	// updateMetrics
	mInstances         prometheus.Gauge
	mContainersRunning prometheus.Gauge
	mVCPUs             prometheus.Gauge
	mVCPUsInuse        prometheus.Gauge
	mMemory            prometheus.Gauge
	mMemoryInuse       prometheus.Gauge
}
141
142 // Subscribe returns a channel that becomes ready whenever a worker's
143 // state changes.
144 //
145 // Example:
146 //
147 //      ch := wp.Subscribe()
148 //      defer wp.Unsubscribe(ch)
149 //      for range ch {
150 //              // ...try scheduling some work...
151 //              if done {
152 //                      break
153 //              }
154 //      }
155 func (wp *Pool) Subscribe() <-chan struct{} {
156         wp.setupOnce.Do(wp.setup)
157         wp.mtx.Lock()
158         defer wp.mtx.Unlock()
159         ch := make(chan struct{}, 1)
160         wp.subscribers[ch] = ch
161         return ch
162 }
163
164 // Unsubscribe stops sending updates to the given channel.
165 func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
166         wp.setupOnce.Do(wp.setup)
167         wp.mtx.Lock()
168         defer wp.mtx.Unlock()
169         delete(wp.subscribers, ch)
170 }
171
172 // Unallocated returns the number of unallocated (creating + booting +
173 // idle + unknown) workers for each instance type.
174 //
175 // The returned counts should be interpreted as upper bounds, rather
176 // than exact counts: they are sometimes artificially high when a
177 // newly created instance appears in the driver's Instances() list
178 // before the Create() call returns.
179 func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
180         wp.setupOnce.Do(wp.setup)
181         wp.mtx.RLock()
182         defer wp.mtx.RUnlock()
183         u := map[arvados.InstanceType]int{}
184         for it, c := range wp.creating {
185                 u[it] = c
186         }
187         for _, wkr := range wp.workers {
188                 if wkr.state == StateIdle || wkr.state == StateBooting || wkr.state == StateUnknown {
189                         u[wkr.instType]++
190                 }
191         }
192         return u
193 }
194
195 // Create a new instance with the given type, and add it to the worker
196 // pool. The worker is added immediately; instance creation runs in
197 // the background.
198 func (wp *Pool) Create(it arvados.InstanceType) error {
199         logger := wp.logger.WithField("InstanceType", it.Name)
200         wp.setupOnce.Do(wp.setup)
201         wp.mtx.Lock()
202         defer wp.mtx.Unlock()
203         if time.Now().Before(wp.atQuotaUntil) {
204                 return wp.atQuotaErr
205         }
206         tags := cloud.InstanceTags{tagKeyInstanceType: it.Name}
207         wp.creating[it]++
208         go func() {
209                 defer wp.notify()
210                 inst, err := wp.instanceSet.Create(it, wp.imageID, tags, nil)
211                 wp.mtx.Lock()
212                 defer wp.mtx.Unlock()
213                 wp.creating[it]--
214                 if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
215                         wp.atQuotaErr = err
216                         wp.atQuotaUntil = time.Now().Add(time.Minute)
217                 }
218                 if err != nil {
219                         logger.WithError(err).Error("create failed")
220                         return
221                 }
222                 wp.updateWorker(inst, it, StateBooting)
223         }()
224         return nil
225 }
226
227 // AtQuota returns true if Create is not expected to work at the
228 // moment.
229 func (wp *Pool) AtQuota() bool {
230         wp.mtx.Lock()
231         defer wp.mtx.Unlock()
232         return time.Now().Before(wp.atQuotaUntil)
233 }
234
235 // Add or update worker attached to the given instance. Use
236 // initialState if a new worker is created.
237 //
238 // The second return value is true if a new worker is created.
239 //
240 // Caller must have lock.
241 func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initialState State) (*worker, bool) {
242         id := inst.ID()
243         if wkr := wp.workers[id]; wkr != nil {
244                 wkr.executor.SetTarget(inst)
245                 wkr.instance = inst
246                 wkr.updated = time.Now()
247                 if initialState == StateBooting && wkr.state == StateUnknown {
248                         wkr.state = StateBooting
249                 }
250                 return wkr, false
251         }
252         if initialState == StateUnknown && inst.Tags()[tagKeyHold] != "" {
253                 initialState = StateHold
254         }
255         logger := wp.logger.WithFields(logrus.Fields{
256                 "InstanceType": it.Name,
257                 "Instance":     inst,
258         })
259         logger.WithField("State", initialState).Infof("instance appeared in cloud")
260         now := time.Now()
261         wkr := &worker{
262                 mtx:      &wp.mtx,
263                 wp:       wp,
264                 logger:   logger,
265                 executor: wp.newExecutor(inst),
266                 state:    initialState,
267                 instance: inst,
268                 instType: it,
269                 probed:   now,
270                 busy:     now,
271                 updated:  now,
272                 running:  make(map[string]struct{}),
273                 starting: make(map[string]struct{}),
274                 probing:  make(chan struct{}, 1),
275         }
276         wp.workers[id] = wkr
277         return wkr, true
278 }
279
// notifyExited records t as the exit time of the crunch-run process
// for the container with the given UUID. The entry is reported by
// Running and stays in place until KillContainer is called for that
// UUID.
//
// Caller must have lock.
func (wp *Pool) notifyExited(uuid string, t time.Time) {
	wp.exited[uuid] = t
}
284
285 // Shutdown shuts down a worker with the given type, or returns false
286 // if all workers with the given type are busy.
287 func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
288         wp.setupOnce.Do(wp.setup)
289         wp.mtx.Lock()
290         defer wp.mtx.Unlock()
291         logger := wp.logger.WithField("InstanceType", it.Name)
292         logger.Info("shutdown requested")
293         for _, tryState := range []State{StateBooting, StateIdle} {
294                 // TODO: shutdown the worker with the longest idle
295                 // time (Idle) or the earliest create time (Booting)
296                 for _, wkr := range wp.workers {
297                         if wkr.state == tryState && wkr.instType == it {
298                                 logger.WithField("Instance", wkr.instance).Info("shutting down")
299                                 wkr.shutdown()
300                                 return true
301                         }
302                 }
303         }
304         return false
305 }
306
307 // CountWorkers returns the current number of workers in each state.
308 func (wp *Pool) CountWorkers() map[State]int {
309         wp.setupOnce.Do(wp.setup)
310         wp.mtx.Lock()
311         defer wp.mtx.Unlock()
312         r := map[State]int{}
313         for _, w := range wp.workers {
314                 r[w.state]++
315         }
316         return r
317 }
318
319 // Running returns the container UUIDs being prepared/run on workers.
320 func (wp *Pool) Running() map[string]time.Time {
321         wp.setupOnce.Do(wp.setup)
322         wp.mtx.Lock()
323         defer wp.mtx.Unlock()
324         r := map[string]time.Time{}
325         for _, wkr := range wp.workers {
326                 for uuid := range wkr.running {
327                         r[uuid] = time.Time{}
328                 }
329                 for uuid := range wkr.starting {
330                         r[uuid] = time.Time{}
331                 }
332         }
333         for uuid, exited := range wp.exited {
334                 r[uuid] = exited
335         }
336         return r
337 }
338
339 // StartContainer starts a container on an idle worker immediately if
340 // possible, otherwise returns false.
341 func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
342         wp.setupOnce.Do(wp.setup)
343         wp.mtx.Lock()
344         defer wp.mtx.Unlock()
345         var wkr *worker
346         for _, w := range wp.workers {
347                 if w.instType == it && w.state == StateIdle {
348                         if wkr == nil || w.busy.After(wkr.busy) {
349                                 wkr = w
350                         }
351                 }
352         }
353         if wkr == nil {
354                 return false
355         }
356         wkr.startContainer(ctr)
357         return true
358 }
359
360 // KillContainer kills the crunch-run process for the given container
361 // UUID, if it's running on any worker.
362 //
363 // KillContainer returns immediately; the act of killing the container
364 // takes some time, and runs in the background.
365 func (wp *Pool) KillContainer(uuid string) {
366         wp.mtx.Lock()
367         defer wp.mtx.Unlock()
368         if _, ok := wp.exited[uuid]; ok {
369                 wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
370                 delete(wp.exited, uuid)
371                 return
372         }
373         for _, wkr := range wp.workers {
374                 if _, ok := wkr.running[uuid]; ok {
375                         go wp.kill(wkr, uuid)
376                         return
377                 }
378         }
379         wp.logger.WithField("ContainerUUID", uuid).Debug("cannot kill: already disappeared")
380 }
381
382 func (wp *Pool) kill(wkr *worker, uuid string) {
383         logger := wp.logger.WithFields(logrus.Fields{
384                 "ContainerUUID": uuid,
385                 "Instance":      wkr.instance,
386         })
387         logger.Debug("killing process")
388         stdout, stderr, err := wkr.executor.Execute("crunch-run --kill 15 "+uuid, nil)
389         if err != nil {
390                 logger.WithFields(logrus.Fields{
391                         "stderr": string(stderr),
392                         "stdout": string(stdout),
393                         "error":  err,
394                 }).Warn("kill failed")
395                 return
396         }
397         logger.Debug("killing process succeeded")
398         wp.mtx.Lock()
399         defer wp.mtx.Unlock()
400         if _, ok := wkr.running[uuid]; ok {
401                 delete(wkr.running, uuid)
402                 if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
403                         wkr.state = StateIdle
404                 }
405                 wkr.updated = time.Now()
406                 go wp.notify()
407         }
408 }
409
410 func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
411         if reg == nil {
412                 reg = prometheus.NewRegistry()
413         }
414         wp.mInstances = prometheus.NewGauge(prometheus.GaugeOpts{
415                 Namespace: "arvados",
416                 Subsystem: "dispatchcloud",
417                 Name:      "instances_total",
418                 Help:      "Number of cloud VMs including pending, booting, running, held, and shutting down.",
419         })
420         reg.MustRegister(wp.mInstances)
421         wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
422                 Namespace: "arvados",
423                 Subsystem: "dispatchcloud",
424                 Name:      "containers_running",
425                 Help:      "Number of containers reported running by cloud VMs.",
426         })
427         reg.MustRegister(wp.mContainersRunning)
428
429         wp.mVCPUs = prometheus.NewGauge(prometheus.GaugeOpts{
430                 Namespace: "arvados",
431                 Subsystem: "dispatchcloud",
432                 Name:      "vcpus_total",
433                 Help:      "Total VCPUs on all cloud VMs.",
434         })
435         reg.MustRegister(wp.mVCPUs)
436         wp.mVCPUsInuse = prometheus.NewGauge(prometheus.GaugeOpts{
437                 Namespace: "arvados",
438                 Subsystem: "dispatchcloud",
439                 Name:      "vcpus_inuse",
440                 Help:      "VCPUs on cloud VMs that are running containers.",
441         })
442         reg.MustRegister(wp.mVCPUsInuse)
443         wp.mMemory = prometheus.NewGauge(prometheus.GaugeOpts{
444                 Namespace: "arvados",
445                 Subsystem: "dispatchcloud",
446                 Name:      "memory_bytes_total",
447                 Help:      "Total memory on all cloud VMs.",
448         })
449         reg.MustRegister(wp.mMemory)
450         wp.mMemoryInuse = prometheus.NewGauge(prometheus.GaugeOpts{
451                 Namespace: "arvados",
452                 Subsystem: "dispatchcloud",
453                 Name:      "memory_bytes_inuse",
454                 Help:      "Memory on cloud VMs that are running containers.",
455         })
456         reg.MustRegister(wp.mMemoryInuse)
457 }
458
459 func (wp *Pool) runMetrics() {
460         ch := wp.Subscribe()
461         defer wp.Unsubscribe(ch)
462         for range ch {
463                 wp.updateMetrics()
464         }
465 }
466
467 func (wp *Pool) updateMetrics() {
468         wp.mtx.RLock()
469         defer wp.mtx.RUnlock()
470
471         var alloc, cpu, cpuInuse, mem, memInuse int64
472         for _, wkr := range wp.workers {
473                 cpu += int64(wkr.instType.VCPUs)
474                 mem += int64(wkr.instType.RAM)
475                 if len(wkr.running)+len(wkr.starting) == 0 {
476                         continue
477                 }
478                 alloc += int64(len(wkr.running) + len(wkr.starting))
479                 cpuInuse += int64(wkr.instType.VCPUs)
480                 memInuse += int64(wkr.instType.RAM)
481         }
482         wp.mInstances.Set(float64(len(wp.workers)))
483         wp.mContainersRunning.Set(float64(alloc))
484         wp.mVCPUs.Set(float64(cpu))
485         wp.mMemory.Set(float64(mem))
486         wp.mVCPUsInuse.Set(float64(cpuInuse))
487         wp.mMemoryInuse.Set(float64(memInuse))
488 }
489
490 func (wp *Pool) runProbes() {
491         maxPPS := wp.maxProbesPerSecond
492         if maxPPS < 1 {
493                 maxPPS = defaultMaxProbesPerSecond
494         }
495         limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
496         defer limitticker.Stop()
497
498         probeticker := time.NewTicker(wp.probeInterval)
499         defer probeticker.Stop()
500
501         workers := []cloud.InstanceID{}
502         for range probeticker.C {
503                 workers = workers[:0]
504                 wp.mtx.Lock()
505                 for id, wkr := range wp.workers {
506                         if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
507                                 continue
508                         }
509                         workers = append(workers, id)
510                 }
511                 wp.mtx.Unlock()
512
513                 for _, id := range workers {
514                         wp.mtx.Lock()
515                         wkr, ok := wp.workers[id]
516                         wp.mtx.Unlock()
517                         if !ok {
518                                 // Deleted while we were probing
519                                 // others
520                                 continue
521                         }
522                         go wkr.ProbeAndUpdate()
523                         select {
524                         case <-wp.stop:
525                                 return
526                         case <-limitticker.C:
527                         }
528                 }
529         }
530 }
531
532 func (wp *Pool) runSync() {
533         // sync once immediately, then wait syncInterval, sync again,
534         // etc.
535         timer := time.NewTimer(1)
536         for {
537                 select {
538                 case <-timer.C:
539                         err := wp.getInstancesAndSync()
540                         if err != nil {
541                                 wp.logger.WithError(err).Warn("sync failed")
542                         }
543                         timer.Reset(wp.syncInterval)
544                 case <-wp.stop:
545                         wp.logger.Debug("worker.Pool stopped")
546                         return
547                 }
548         }
549 }
550
551 // Stop synchronizing with the InstanceSet.
552 func (wp *Pool) Stop() {
553         wp.setupOnce.Do(wp.setup)
554         close(wp.stop)
555 }
556
557 // Instances returns an InstanceView for each worker in the pool,
558 // summarizing its current state and recent activity.
559 func (wp *Pool) Instances() []InstanceView {
560         var r []InstanceView
561         wp.setupOnce.Do(wp.setup)
562         wp.mtx.Lock()
563         for _, w := range wp.workers {
564                 r = append(r, InstanceView{
565                         Instance:             w.instance.String(),
566                         Price:                w.instType.Price,
567                         ArvadosInstanceType:  w.instType.Name,
568                         ProviderInstanceType: w.instType.ProviderType,
569                         LastContainerUUID:    w.lastUUID,
570                         LastBusy:             w.busy,
571                         WorkerState:          w.state.String(),
572                 })
573         }
574         wp.mtx.Unlock()
575         sort.Slice(r, func(i, j int) bool {
576                 return strings.Compare(r[i].Instance, r[j].Instance) < 0
577         })
578         return r
579 }
580
581 func (wp *Pool) setup() {
582         wp.creating = map[arvados.InstanceType]int{}
583         wp.exited = map[string]time.Time{}
584         wp.workers = map[cloud.InstanceID]*worker{}
585         wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
586 }
587
588 func (wp *Pool) notify() {
589         wp.mtx.RLock()
590         defer wp.mtx.RUnlock()
591         for _, send := range wp.subscribers {
592                 select {
593                 case send <- struct{}{}:
594                 default:
595                 }
596         }
597 }
598
599 func (wp *Pool) getInstancesAndSync() error {
600         wp.setupOnce.Do(wp.setup)
601         wp.logger.Debug("getting instance list")
602         threshold := time.Now()
603         instances, err := wp.instanceSet.Instances(cloud.InstanceTags{})
604         if err != nil {
605                 return err
606         }
607         wp.sync(threshold, instances)
608         wp.logger.Debug("sync done")
609         return nil
610 }
611
612 // Add/remove/update workers based on instances, which was obtained
613 // from the instanceSet. However, don't clobber any other updates that
614 // already happened after threshold.
615 func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
616         wp.mtx.Lock()
617         defer wp.mtx.Unlock()
618         wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
619         notify := false
620
621         for _, inst := range instances {
622                 itTag := inst.Tags()[tagKeyInstanceType]
623                 it, ok := wp.instanceTypes[itTag]
624                 if !ok {
625                         wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
626                         continue
627                 }
628                 if wkr, isNew := wp.updateWorker(inst, it, StateUnknown); isNew {
629                         notify = true
630                 } else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
631                         wp.logger.WithField("Instance", inst).Info("worker still listed after shutdown; retrying")
632                         wkr.shutdown()
633                 }
634         }
635
636         for id, wkr := range wp.workers {
637                 if wkr.updated.After(threshold) {
638                         continue
639                 }
640                 logger := wp.logger.WithFields(logrus.Fields{
641                         "Instance":    wkr.instance,
642                         "WorkerState": wkr.state,
643                 })
644                 logger.Info("instance disappeared in cloud")
645                 delete(wp.workers, id)
646                 go wkr.executor.Close()
647                 notify = true
648         }
649
650         if !wp.loaded {
651                 wp.loaded = true
652                 wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
653         }
654
655         if notify {
656                 go wp.notify()
657         }
658 }