16739: Fix using two different throttles.
lib/dispatchcloud/worker/pool.go
// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package worker

import (
	"crypto/md5"
	"crypto/rand"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"sort"
	"strings"
	"sync"
	"time"

	"git.arvados.org/arvados.git/lib/cloud"
	"git.arvados.org/arvados.git/sdk/go/arvados"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/sirupsen/logrus"
	"golang.org/x/crypto/ssh"
)

const (
	tagKeyInstanceType   = "InstanceType"
	tagKeyIdleBehavior   = "IdleBehavior"
	tagKeyInstanceSecret = "InstanceSecret"
	tagKeyInstanceSetID  = "InstanceSetID"
)

// An InstanceView shows a worker's current state and recent activity.
type InstanceView struct {
	Instance             cloud.InstanceID `json:"instance"`
	Address              string           `json:"address"`
	Price                float64          `json:"price"`
	ArvadosInstanceType  string           `json:"arvados_instance_type"`
	ProviderInstanceType string           `json:"provider_instance_type"`
	LastContainerUUID    string           `json:"last_container_uuid"`
	LastBusy             time.Time        `json:"last_busy"`
	WorkerState          string           `json:"worker_state"`
	IdleBehavior         IdleBehavior     `json:"idle_behavior"`
}

// An Executor executes shell commands on a remote host.
type Executor interface {
	// Run cmd on the current target.
	Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)

	// Use the given target for subsequent operations. The new
	// target is the same host as the previous target, but it
	// might return a different address and verify a different
	// host key.
	//
	// SetTarget is called frequently, and in most cases the new
	// target will behave exactly the same as the old one. An
	// implementation should optimize accordingly.
	//
	// SetTarget must not block on concurrent Execute calls.
	SetTarget(cloud.ExecutorTarget)

	Close()
}
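
// Example (an illustrative sketch, not part of this package):
// newSSHExecutor is a hypothetical constructor for some Executor
// implementation; the pool obtains an equivalent one via the
// newExecutor argument to NewPool.
//
//	exec := newSSHExecutor(inst)
//	defer exec.Close()
//	stdout, stderr, err := exec.Execute(nil, "crunch-run --list", nil)
//	exec.SetTarget(updatedInst) // same host, possibly new address/key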

const (
	defaultSyncInterval       = time.Minute
	defaultProbeInterval      = time.Second * 10
	defaultMaxProbesPerSecond = 10
	defaultTimeoutIdle        = time.Minute
	defaultTimeoutBooting     = time.Minute * 10
	defaultTimeoutProbe       = time.Minute * 10
	defaultTimeoutShutdown    = time.Second * 10
	defaultTimeoutTERM        = time.Minute * 2
	defaultTimeoutSignal      = time.Second * 5

	// Time after a quota error to try again anyway, even if no
	// instances have been shut down.
	quotaErrorTTL = time.Minute

	// Time between "X failed because of rate limiting" messages
	logRateLimitErrorInterval = time.Second * 10
)

func duration(conf arvados.Duration, def time.Duration) time.Duration {
	if conf > 0 {
		return time.Duration(conf)
	}
	return def
}

// NewPool creates a Pool of workers backed by instanceSet.
//
// New instances are configured and set up according to the given
// cluster configuration.
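//
// A typical call looks like this (sketch; the arguments are assumed
// to be prepared by the caller, and newExecutor is any
// func(cloud.Instance) Executor):
//
//	wp := NewPool(logger, arvClient, reg, instanceSetID, instanceSet,
//		newExecutor, installPublicKey, cluster)
//	defer wp.Stop()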
func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
	wp := &Pool{
		logger:                     logger,
		arvClient:                  arvClient,
		instanceSetID:              instanceSetID,
		instanceSet:                &throttledInstanceSet{InstanceSet: instanceSet},
		newExecutor:                newExecutor,
		bootProbeCommand:           cluster.Containers.CloudVMs.BootProbeCommand,
		runnerSource:               cluster.Containers.CloudVMs.DeployRunnerBinary,
		imageID:                    cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
		instanceTypes:              cluster.InstanceTypes,
		maxProbesPerSecond:         cluster.Containers.CloudVMs.MaxProbesPerSecond,
		maxConcurrentNodeCreateOps: cluster.Containers.CloudVMs.MaxConcurrentNodeCreateOps,
		probeInterval:              duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
		syncInterval:               duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
		timeoutIdle:                duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
		timeoutBooting:             duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
		timeoutProbe:               duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
		timeoutShutdown:            duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
		timeoutTERM:                duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
		timeoutSignal:              duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
		installPublicKey:           installPublicKey,
		tagKeyPrefix:               cluster.Containers.CloudVMs.TagKeyPrefix,
		stop:                       make(chan bool),
	}
	wp.registerMetrics(reg)
	go func() {
		wp.setupOnce.Do(wp.setup)
		go wp.runMetrics()
		go wp.runProbes()
		go wp.runSync()
	}()
	return wp
}

// Pool is a resizable worker pool backed by a cloud.InstanceSet. A
// zero Pool should not be used. Call NewPool to create a new Pool.
type Pool struct {
	// configuration
	logger                     logrus.FieldLogger
	arvClient                  *arvados.Client
	instanceSetID              cloud.InstanceSetID
	instanceSet                *throttledInstanceSet
	newExecutor                func(cloud.Instance) Executor
	bootProbeCommand           string
	runnerSource               string
	imageID                    cloud.ImageID
	instanceTypes              map[string]arvados.InstanceType
	syncInterval               time.Duration
	probeInterval              time.Duration
	maxProbesPerSecond         int
	maxConcurrentNodeCreateOps int
	timeoutIdle                time.Duration
	timeoutBooting             time.Duration
	timeoutProbe               time.Duration
	timeoutShutdown            time.Duration
	timeoutTERM                time.Duration
	timeoutSignal              time.Duration
	installPublicKey           ssh.PublicKey
	tagKeyPrefix               string

	// private state
	subscribers  map[<-chan struct{}]chan<- struct{}
	creating     map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
	workers      map[cloud.InstanceID]*worker
	loaded       bool                 // loaded list of instances from InstanceSet at least once
	exited       map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called
	atQuotaUntil time.Time
	atQuotaErr   cloud.QuotaError
	stop         chan bool
	mtx          sync.RWMutex
	setupOnce    sync.Once
	runnerData   []byte
	runnerMD5    [md5.Size]byte
	runnerCmd    string

	mContainersRunning prometheus.Gauge
	mInstances         *prometheus.GaugeVec
	mInstancesPrice    *prometheus.GaugeVec
	mVCPUs             *prometheus.GaugeVec
	mMemory            *prometheus.GaugeVec
	mBootOutcomes      *prometheus.CounterVec
	mDisappearances    *prometheus.CounterVec
}

type createCall struct {
	time         time.Time
	instanceType arvados.InstanceType
}

// CheckHealth reports an error if the runner binary cannot be
// loaded; otherwise the pool is considered healthy.
func (wp *Pool) CheckHealth() error {
	wp.setupOnce.Do(wp.setup)
	if err := wp.loadRunnerData(); err != nil {
		return fmt.Errorf("error loading runner binary: %s", err)
	}
	return nil
}

// Subscribe returns a buffered channel that becomes ready after any
// change to the pool's state that could have scheduling implications:
// a worker's state changes, a new worker appears, the cloud
// provider's API rate limiting period ends, etc.
//
// Additional events that occur while the channel is already ready
// will be dropped, so it is OK if the caller services the channel
// slowly.
//
// Example:
//
//	ch := wp.Subscribe()
//	defer wp.Unsubscribe(ch)
//	for range ch {
//		tryScheduling(wp)
//		if done {
//			break
//		}
//	}
func (wp *Pool) Subscribe() <-chan struct{} {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	ch := make(chan struct{}, 1)
	wp.subscribers[ch] = ch
	return ch
}

// Unsubscribe stops sending updates to the given channel.
func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	delete(wp.subscribers, ch)
}

// Unallocated returns the number of unallocated (creating + booting +
// idle + unknown) workers for each instance type.  Workers in
// hold/drain mode are not included.
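//
// Sketch of intended use by a scheduler (needed is a caller-side
// demand estimate, not part of this package):
//
//	unalloc := wp.Unallocated()
//	for it, n := range needed {
//		for i := unalloc[it]; i < n; i++ {
//			wp.Create(it)
//		}
//	}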
func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	unalloc := map[arvados.InstanceType]int{}
	creating := map[arvados.InstanceType]int{}
	oldestCreate := map[arvados.InstanceType]time.Time{}
	for _, cc := range wp.creating {
		it := cc.instanceType
		creating[it]++
		if t, ok := oldestCreate[it]; !ok || t.After(cc.time) {
			oldestCreate[it] = cc.time
		}
	}
	for _, wkr := range wp.workers {
		// Skip workers that are not expected to become
		// available soon. Note len(wkr.running)>0 is not
		// redundant here: it can be true even in
		// StateUnknown.
		if wkr.state == StateShutdown ||
			wkr.state == StateRunning ||
			wkr.idleBehavior != IdleBehaviorRun ||
			len(wkr.running) > 0 {
			continue
		}
		it := wkr.instType
		unalloc[it]++
		if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) {
			// If up to N new workers appear in
			// Instances() while we are waiting for N
			// Create() calls to complete, we assume we're
			// just seeing a race between Instances() and
			// Create() responses.
			//
			// The other common reason why nodes have
			// state==Unknown is that they appeared at
			// startup, before any Create calls. They
			// don't match the above timing condition, so
			// we never mistakenly attribute them to
			// pending Create calls.
			creating[it]--
		}
	}
	for it, c := range creating {
		unalloc[it] += c
	}
	return unalloc
}

// RateLimitError is reported when the pool itself throttles Create
// calls (see maxConcurrentNodeCreateOps); EarliestRetry indicates
// when a new call is worth attempting.
type RateLimitError struct{ Retry time.Time }

func (e RateLimitError) Error() string {
	return fmt.Sprintf("node creation request failed, hit maxConcurrentNodeCreateOps, wait until %s", e.Retry)
}
func (e RateLimitError) EarliestRetry() time.Time { return e.Retry }

// Create a new instance with the given type, and add it to the worker
// pool. The worker is added immediately; instance creation runs in
// the background.
//
// Create returns false if a pre-existing error state prevents it from
// even attempting to create a new instance. Those errors are logged
// by the Pool, so the caller does not need to log anything in such
// cases.
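//
// Sketch of the retry pattern this implies, where ch comes from
// wp.Subscribe():
//
//	if !wp.Create(it) {
//		// At quota or rate limited; wait for a pool state
//		// change before trying again.
//		<-ch
//	}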
func (wp *Pool) Create(it arvados.InstanceType) bool {
	logger := wp.logger.WithField("InstanceType", it.Name)
	wp.setupOnce.Do(wp.setup)
	if wp.loadRunnerData() != nil {
		// Boot probe is certain to fail.
		return false
	}
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if time.Now().Before(wp.atQuotaUntil) || wp.instanceSet.throttleCreate.Error() != nil {
		return false
	}
	// The maxConcurrentNodeCreateOps knob throttles the number of node create
	// requests in flight. It was added to work around a limitation in Azure's
	// managed disks, which support no more than 20 concurrent node creation
	// requests from a single disk image (cf.
	// https://docs.microsoft.com/en-us/azure/virtual-machines/linux/capture-image).
	// The code assumes that, from Azure's perspective, a node creation request
	// is in progress until the new instance appears in the "get all instances"
	// list.
	if wp.maxConcurrentNodeCreateOps > 0 && len(wp.creating) >= wp.maxConcurrentNodeCreateOps {
		wp.instanceSet.throttleCreate.CheckRateLimitError(RateLimitError{Retry: time.Now().Add(5 * time.Second)}, wp.logger, "create instance", wp.notify)
		return false
	}
	now := time.Now()
	secret := randomHex(instanceSecretLength)
	wp.creating[secret] = createCall{time: now, instanceType: it}
	go func() {
		defer wp.notify()
		tags := cloud.InstanceTags{
			wp.tagKeyPrefix + tagKeyInstanceSetID:  string(wp.instanceSetID),
			wp.tagKeyPrefix + tagKeyInstanceType:   it.Name,
			wp.tagKeyPrefix + tagKeyIdleBehavior:   string(IdleBehaviorRun),
			wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
		}
		initCmd := TagVerifier{nil, secret}.InitCommand()
		inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
		wp.mtx.Lock()
		defer wp.mtx.Unlock()
		// delete() is deferred so the updateWorker() call
		// below knows to use StateBooting when adding a new
		// worker.
		defer delete(wp.creating, secret)
		if err != nil {
			if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
				wp.atQuotaErr = err
				wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
				time.AfterFunc(quotaErrorTTL, wp.notify)
			}
			logger.WithError(err).Error("create failed")
			wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
			return
		}
		wp.updateWorker(inst, it)
	}()
	return true
}

// AtQuota returns true if Create is not expected to work at the
// moment.
func (wp *Pool) AtQuota() bool {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	return time.Now().Before(wp.atQuotaUntil)
}

// SetIdleBehavior determines how the indicated instance will behave
// when it has no containers running.
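//
// Sketch: drain an instance ahead of planned maintenance
// (IdleBehaviorDrain is one of the IdleBehavior values defined
// elsewhere in this package):
//
//	if err := wp.SetIdleBehavior(id, IdleBehaviorDrain); err != nil {
//		logger.WithError(err).Warn("cannot drain instance")
//	}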
func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wkr, ok := wp.workers[id]
	if !ok {
		return errors.New("requested instance does not exist")
	}
	wkr.setIdleBehavior(idleBehavior)
	return nil
}

// Add or update worker attached to the given instance.
//
// The second return value is true if a new worker is created.
//
// A newly added instance has state=StateBooting if its tags match an
// entry in wp.creating, otherwise StateUnknown.
//
// Caller must have lock.
func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
	secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
	inst = TagVerifier{inst, secret}
	id := inst.ID()
	if wkr := wp.workers[id]; wkr != nil {
		wkr.executor.SetTarget(inst)
		wkr.instance = inst
		wkr.updated = time.Now()
		wkr.saveTags()
		return wkr, false
	}

	state := StateUnknown
	if _, ok := wp.creating[secret]; ok {
		state = StateBooting
	}

	// If an instance has a valid IdleBehavior tag when it first
	// appears, initialize the new worker accordingly (this is how
	// we restore IdleBehavior that was set by a prior dispatch
	// process); otherwise, default to "run". After this,
	// wkr.idleBehavior is the source of truth, and will only be
	// changed via SetIdleBehavior().
	idleBehavior := IdleBehavior(inst.Tags()[wp.tagKeyPrefix+tagKeyIdleBehavior])
	if !validIdleBehavior[idleBehavior] {
		idleBehavior = IdleBehaviorRun
	}

	logger := wp.logger.WithFields(logrus.Fields{
		"InstanceType": it.Name,
		"Instance":     inst.ID(),
		"Address":      inst.Address(),
	})
	logger.WithFields(logrus.Fields{
		"State":        state,
		"IdleBehavior": idleBehavior,
	}).Info("instance appeared in cloud")
	now := time.Now()
	wkr := &worker{
		mtx:          &wp.mtx,
		wp:           wp,
		logger:       logger,
		executor:     wp.newExecutor(inst),
		state:        state,
		idleBehavior: idleBehavior,
		instance:     inst,
		instType:     it,
		appeared:     now,
		probed:       now,
		busy:         now,
		updated:      now,
		running:      make(map[string]*remoteRunner),
		starting:     make(map[string]*remoteRunner),
		probing:      make(chan struct{}, 1),
	}
	wp.workers[id] = wkr
	return wkr, true
}

// Shutdown shuts down a worker with the given type, or returns false
// if all workers with the given type are busy.
func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	logger := wp.logger.WithField("InstanceType", it.Name)
	logger.Info("shutdown requested")
	for _, tryState := range []State{StateBooting, StateIdle} {
		// TODO: shutdown the worker with the longest idle
		// time (Idle) or the earliest create time (Booting)
		for _, wkr := range wp.workers {
			if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
				logger.WithField("Instance", wkr.instance.ID()).Info("shutting down")
				wkr.reportBootOutcome(BootOutcomeAborted)
				wkr.shutdown()
				return true
			}
		}
	}
	return false
}

// CountWorkers returns the current number of workers in each state.
//
// CountWorkers blocks, if necessary, until the initial instance list
// has been loaded from the cloud provider.
func (wp *Pool) CountWorkers() map[State]int {
	wp.setupOnce.Do(wp.setup)
	wp.waitUntilLoaded()
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	r := map[State]int{}
	for _, w := range wp.workers {
		r[w.state]++
	}
	return r
}

// Running returns the container UUIDs being prepared/run on workers.
//
// In the returned map, the time value indicates when the Pool
// observed that the container process had exited. A container that
// has not yet exited has a zero time value. The caller should use
// ForgetContainer() to garbage-collect the entries for exited
// containers.
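//
// Sketch of that cleanup pattern (grace is a caller-chosen cutoff,
// not part of this package):
//
//	for uuid, exited := range wp.Running() {
//		if !exited.IsZero() && time.Since(exited) > grace {
//			wp.ForgetContainer(uuid)
//		}
//	}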
func (wp *Pool) Running() map[string]time.Time {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	r := map[string]time.Time{}
	for _, wkr := range wp.workers {
		for uuid := range wkr.running {
			r[uuid] = time.Time{}
		}
		for uuid := range wkr.starting {
			r[uuid] = time.Time{}
		}
	}
	for uuid, exited := range wp.exited {
		r[uuid] = exited
	}
	return r
}

// StartContainer starts a container on an idle worker immediately if
// possible, otherwise returns false.
func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	var wkr *worker
	for _, w := range wp.workers {
		if w.instType == it && w.state == StateIdle && w.idleBehavior == IdleBehaviorRun {
			if wkr == nil || w.busy.After(wkr.busy) {
				wkr = w
			}
		}
	}
	if wkr == nil {
		return false
	}
	wkr.startContainer(ctr)
	return true
}

// KillContainer kills the crunch-run process for the given container
// UUID, if it's running on any worker.
//
// KillContainer returns immediately; the act of killing the container
// takes some time, and runs in the background.
//
// KillContainer returns false if the container has already ended.
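//
// Sketch:
//
//	if !wp.KillContainer(uuid, "deadline exceeded") {
//		// Already gone; just clean up our own records.
//	}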
func (wp *Pool) KillContainer(uuid string, reason string) bool {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	logger := wp.logger.WithFields(logrus.Fields{
		"ContainerUUID": uuid,
		"Reason":        reason,
	})
	for _, wkr := range wp.workers {
		rr := wkr.running[uuid]
		if rr == nil {
			rr = wkr.starting[uuid]
		}
		if rr != nil {
			rr.Kill(reason)
			return true
		}
	}
	logger.Debug("cannot kill: already disappeared")
	return false
}

// ForgetContainer clears the placeholder for the given exited
// container, so it isn't returned by subsequent calls to Running().
//
// ForgetContainer has no effect if the container has not yet exited.
//
// The "container exited at time T" placeholder (which necessitates
// ForgetContainer) exists to make it easier for the caller
// (scheduler) to distinguish a container that exited without
// finalizing its state from a container that exited too recently for
// its final state to have appeared in the scheduler's queue cache.
func (wp *Pool) ForgetContainer(uuid string) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if _, ok := wp.exited[uuid]; ok {
		wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
		delete(wp.exited, uuid)
	}
}

func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
	if reg == nil {
		reg = prometheus.NewRegistry()
	}
	wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "containers_running",
		Help:      "Number of containers reported running by cloud VMs.",
	})
	reg.MustRegister(wp.mContainersRunning)
	wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "instances_total",
		Help:      "Number of cloud VMs.",
	}, []string{"category", "instance_type"})
	reg.MustRegister(wp.mInstances)
	wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "instances_price",
		Help:      "Price of cloud VMs.",
	}, []string{"category"})
	reg.MustRegister(wp.mInstancesPrice)
	wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "vcpus_total",
		Help:      "Total VCPUs on all cloud VMs.",
	}, []string{"category"})
	reg.MustRegister(wp.mVCPUs)
	wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "memory_bytes_total",
		Help:      "Total memory on all cloud VMs.",
	}, []string{"category"})
	reg.MustRegister(wp.mMemory)
	wp.mBootOutcomes = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "boot_outcomes",
		Help:      "Boot outcomes by type.",
	}, []string{"outcome"})
	for k := range validBootOutcomes {
		wp.mBootOutcomes.WithLabelValues(string(k)).Add(0)
	}
	reg.MustRegister(wp.mBootOutcomes)
	wp.mDisappearances = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "instances_disappeared",
		Help:      "Number of occurrences of an instance disappearing from the cloud provider's list of instances.",
	}, []string{"state"})
	for _, v := range stateString {
		wp.mDisappearances.WithLabelValues(v).Add(0)
	}
	reg.MustRegister(wp.mDisappearances)
}

func (wp *Pool) runMetrics() {
	ch := wp.Subscribe()
	defer wp.Unsubscribe(ch)
	wp.updateMetrics()
	for range ch {
		wp.updateMetrics()
	}
}

func (wp *Pool) updateMetrics() {
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()

	type entKey struct {
		cat      string
		instType string
	}
	instances := map[entKey]int64{}
	price := map[string]float64{}
	cpu := map[string]int64{}
	mem := map[string]int64{}
	var running int64
	for _, wkr := range wp.workers {
		var cat string
		switch {
		case len(wkr.running)+len(wkr.starting) > 0:
			cat = "inuse"
		case wkr.idleBehavior == IdleBehaviorHold:
			cat = "hold"
		case wkr.state == StateBooting:
			cat = "booting"
		case wkr.state == StateUnknown:
			cat = "unknown"
		default:
			cat = "idle"
		}
		instances[entKey{cat, wkr.instType.Name}]++
		price[cat] += wkr.instType.Price
		cpu[cat] += int64(wkr.instType.VCPUs)
		mem[cat] += int64(wkr.instType.RAM)
		running += int64(len(wkr.running) + len(wkr.starting))
	}
	for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
		wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
		wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
		wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
		// Make sure to reset gauges for non-existing category/nodetype combinations.
		for _, it := range wp.instanceTypes {
			if _, ok := instances[entKey{cat, it.Name}]; !ok {
				wp.mInstances.WithLabelValues(cat, it.Name).Set(float64(0))
			}
		}
	}
	for k, v := range instances {
		wp.mInstances.WithLabelValues(k.cat, k.instType).Set(float64(v))
	}
	wp.mContainersRunning.Set(float64(running))
}

func (wp *Pool) runProbes() {
	maxPPS := wp.maxProbesPerSecond
	if maxPPS < 1 {
		maxPPS = defaultMaxProbesPerSecond
	}
	limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
	defer limitticker.Stop()

	probeticker := time.NewTicker(wp.probeInterval)
	defer probeticker.Stop()

	workers := []cloud.InstanceID{}
	for range probeticker.C {
		workers = workers[:0]
		wp.mtx.Lock()
		for id, wkr := range wp.workers {
			if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
				continue
			}
			workers = append(workers, id)
		}
		wp.mtx.Unlock()

		for _, id := range workers {
			wp.mtx.Lock()
			wkr, ok := wp.workers[id]
			wp.mtx.Unlock()
			if !ok {
				// Deleted while we were probing others.
				continue
			}
			go wkr.ProbeAndUpdate()
			select {
			case <-wp.stop:
				return
			case <-limitticker.C:
			}
		}
	}
}

func (wp *Pool) runSync() {
	// Sync once immediately, then wait syncInterval, sync again,
	// etc.
	timer := time.NewTimer(1)
	for {
		select {
		case <-timer.C:
			err := wp.getInstancesAndSync()
			if err != nil {
				wp.logger.WithError(err).Warn("sync failed")
			}
			timer.Reset(wp.syncInterval)
		case <-wp.stop:
			wp.logger.Debug("worker.Pool stopped")
			return
		}
	}
}

// Stop synchronizing with the InstanceSet.
func (wp *Pool) Stop() {
	wp.setupOnce.Do(wp.setup)
	close(wp.stop)
}

// Instances returns an InstanceView for each worker in the pool,
// summarizing its current state and recent activity.
func (wp *Pool) Instances() []InstanceView {
	var r []InstanceView
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	for _, w := range wp.workers {
		r = append(r, InstanceView{
			Instance:             w.instance.ID(),
			Address:              w.instance.Address(),
			Price:                w.instType.Price,
			ArvadosInstanceType:  w.instType.Name,
			ProviderInstanceType: w.instType.ProviderType,
			LastContainerUUID:    w.lastUUID,
			LastBusy:             w.busy,
			WorkerState:          w.state.String(),
			IdleBehavior:         w.idleBehavior,
		})
	}
	wp.mtx.Unlock()
	sort.Slice(r, func(i, j int) bool {
		return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
	})
	return r
}

// KillInstance destroys a cloud VM instance. It returns an error if
// the given instance does not exist.
func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
	wp.setupOnce.Do(wp.setup)
	// Take the lock: wp.workers must not be read, and
	// wkr.shutdown() must not be called, without it (cf.
	// Shutdown above).
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wkr, ok := wp.workers[id]
	if !ok {
		return errors.New("instance not found")
	}
	wkr.logger.WithField("Reason", reason).Info("shutting down")
	wkr.reportBootOutcome(BootOutcomeAborted)
	wkr.shutdown()
	return nil
}

func (wp *Pool) setup() {
	wp.creating = map[string]createCall{}
	wp.exited = map[string]time.Time{}
	wp.workers = map[cloud.InstanceID]*worker{}
	wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
	wp.loadRunnerData()
}

// Load the runner program to be deployed on worker nodes into
// wp.runnerData, if necessary. Errors are logged.
//
// If auto-deploy is disabled, len(wp.runnerData) will be 0.
//
// Caller must not have lock.
func (wp *Pool) loadRunnerData() error {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if wp.runnerData != nil {
		return nil
	} else if wp.runnerSource == "" {
		wp.runnerCmd = "crunch-run"
		wp.runnerData = []byte{}
		return nil
	}
	logger := wp.logger.WithField("source", wp.runnerSource)
	logger.Debug("loading runner")
	buf, err := ioutil.ReadFile(wp.runnerSource)
	if err != nil {
		logger.WithError(err).Error("failed to load runner program")
		return err
	}
	wp.runnerData = buf
	wp.runnerMD5 = md5.Sum(buf)
	wp.runnerCmd = fmt.Sprintf("/var/lib/arvados/crunch-run~%x", wp.runnerMD5)
	return nil
}

func (wp *Pool) notify() {
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	for _, send := range wp.subscribers {
		select {
		case send <- struct{}{}:
		default:
		}
	}
}

func (wp *Pool) getInstancesAndSync() error {
	wp.setupOnce.Do(wp.setup)
	if err := wp.instanceSet.throttleInstances.Error(); err != nil {
		return err
	}
	wp.logger.Debug("getting instance list")
	threshold := time.Now()
	instances, err := wp.instanceSet.Instances(cloud.InstanceTags{wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID)})
	if err != nil {
		wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
		return err
	}
	wp.sync(threshold, instances)
	wp.logger.Debug("sync done")
	return nil
}

// Add/remove/update workers based on instances, which was obtained
// from the instanceSet. However, don't clobber any other updates that
// already happened after threshold.
func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
	notify := false

	for _, inst := range instances {
		itTag := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceType]
		it, ok := wp.instanceTypes[itTag]
		if !ok {
			wp.logger.WithField("Instance", inst.ID()).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
			continue
		}
		if wkr, isNew := wp.updateWorker(inst, it); isNew {
			notify = true
		} else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
			wp.logger.WithField("Instance", inst.ID()).Info("worker still listed after shutdown; retrying")
			wkr.shutdown()
		}
	}

	for id, wkr := range wp.workers {
		if wkr.updated.After(threshold) {
			continue
		}
		logger := wp.logger.WithFields(logrus.Fields{
			"Instance":    wkr.instance.ID(),
			"WorkerState": wkr.state,
		})
		logger.Info("instance disappeared in cloud")
		wkr.reportBootOutcome(BootOutcomeDisappeared)
		if wp.mDisappearances != nil {
			wp.mDisappearances.WithLabelValues(stateString[wkr.state]).Inc()
		}
		delete(wp.workers, id)
		go wkr.Close()
		notify = true
	}

	if !wp.loaded {
		notify = true
		wp.loaded = true
		wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
	}

	if notify {
		go wp.notify()
	}
}

func (wp *Pool) waitUntilLoaded() {
	ch := wp.Subscribe()
	// Unsubscribe on return so the subscription doesn't leak.
	// This deferred call runs after the deferred RUnlock below,
	// so it can safely take the write lock.
	defer wp.Unsubscribe(ch)
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	for !wp.loaded {
		wp.mtx.RUnlock()
		<-ch
		wp.mtx.RLock()
	}
}

// Return a random string of n hexadecimal digits (n*4 random bits). n
// must be even.
func randomHex(n int) string {
	buf := make([]byte, n/2)
	_, err := rand.Read(buf)
	if err != nil {
		panic(err)
	}
	return fmt.Sprintf("%x", buf)
}