[arvados.git] lib/dispatchcloud/worker/pool.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package worker
6
7 import (
8         "crypto/md5"
9         "crypto/rand"
10         "errors"
11         "fmt"
12         "io"
13         "io/ioutil"
14         "sort"
15         "strings"
16         "sync"
17         "time"
18
19         "git.arvados.org/arvados.git/lib/cloud"
20         "git.arvados.org/arvados.git/sdk/go/arvados"
21         "github.com/prometheus/client_golang/prometheus"
22         "github.com/sirupsen/logrus"
23         "golang.org/x/crypto/ssh"
24 )
25
26 const (
27         tagKeyInstanceType   = "InstanceType"
28         tagKeyIdleBehavior   = "IdleBehavior"
29         tagKeyInstanceSecret = "InstanceSecret"
30         tagKeyInstanceSetID  = "InstanceSetID"
31 )
32
33 // An InstanceView shows a worker's current state and recent activity.
34 type InstanceView struct {
35         Instance             cloud.InstanceID `json:"instance"`
36         Address              string           `json:"address"`
37         Price                float64          `json:"price"`
38         ArvadosInstanceType  string           `json:"arvados_instance_type"`
39         ProviderInstanceType string           `json:"provider_instance_type"`
40         LastContainerUUID    string           `json:"last_container_uuid"`
41         LastBusy             time.Time        `json:"last_busy"`
42         WorkerState          string           `json:"worker_state"`
43         IdleBehavior         IdleBehavior     `json:"idle_behavior"`
44 }
45
46 // An Executor executes shell commands on a remote host.
47 type Executor interface {
48         // Run cmd on the current target.
49         Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)
50
51         // Use the given target for subsequent operations. The new
52         // target is the same host as the previous target, but it
53         // might return a different address and verify a different
54         // host key.
55         //
56         // SetTarget is called frequently, and in most cases the new
57         // target will behave exactly the same as the old one. An
58         // implementation should optimize accordingly.
59         //
60         // SetTarget must not block on concurrent Execute calls.
61         SetTarget(cloud.ExecutorTarget)
62
63         Close()
64 }
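
// The following is an illustrative, minimal Executor sketch (not part
// of the original file): it records the current target and returns
// canned output, which is roughly what a test double for this
// interface looks like. A production implementation (the dispatcher
// normally supplies an SSH-based one) must also keep SetTarget cheap
// and non-blocking with respect to concurrent Execute calls, as
// required above.
//
//      type stubExecutor struct {
//              mtx    sync.Mutex
//              target cloud.ExecutorTarget
//      }
//
//      func (e *stubExecutor) SetTarget(t cloud.ExecutorTarget) {
//              e.mtx.Lock()
//              defer e.mtx.Unlock()
//              e.target = t
//      }
//
//      func (e *stubExecutor) Execute(env map[string]string, cmd string, stdin io.Reader) ([]byte, []byte, error) {
//              // Pretend every command succeeds and echoes its name.
//              return []byte(cmd + "\n"), nil, nil
//      }
//
//      func (e *stubExecutor) Close() {}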
65
66 const (
67         defaultSyncInterval       = time.Minute
68         defaultProbeInterval      = time.Second * 10
69         defaultMaxProbesPerSecond = 10
70         defaultTimeoutIdle        = time.Minute
71         defaultTimeoutBooting     = time.Minute * 10
72         defaultTimeoutProbe       = time.Minute * 10
73         defaultTimeoutShutdown    = time.Second * 10
74         defaultTimeoutTERM        = time.Minute * 2
75         defaultTimeoutSignal      = time.Second * 5
76
77         // Time after a quota error to try again anyway, even if no
78         // instances have been shut down.
79         quotaErrorTTL = time.Minute
80
81         // Time between "X failed because of rate limiting" messages
82         logRateLimitErrorInterval = time.Second * 10
83 )
84
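// duration returns conf as a time.Duration if it is greater than
// zero, otherwise def.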
85 func duration(conf arvados.Duration, def time.Duration) time.Duration {
86         if conf > 0 {
87                 return time.Duration(conf)
88         } else {
89                 return def
90         }
91 }
92
93 // NewPool creates a Pool of workers backed by instanceSet.
94 //
95 // New instances are configured and set up according to the given
96 // cluster configuration.
97 func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
98         wp := &Pool{
99                 logger:             logger,
100                 arvClient:          arvClient,
101                 instanceSetID:      instanceSetID,
102                 instanceSet:        &throttledInstanceSet{InstanceSet: instanceSet},
103                 newExecutor:        newExecutor,
104                 bootProbeCommand:   cluster.Containers.CloudVMs.BootProbeCommand,
105                 runnerSource:       cluster.Containers.CloudVMs.DeployRunnerBinary,
106                 imageID:            cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
107                 instanceTypes:      cluster.InstanceTypes,
108                 maxProbesPerSecond: cluster.Containers.CloudVMs.MaxProbesPerSecond,
109                 probeInterval:      duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
110                 syncInterval:       duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
111                 timeoutIdle:        duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
112                 timeoutBooting:     duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
113                 timeoutProbe:       duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
114                 timeoutShutdown:    duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
115                 timeoutTERM:        duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
116                 timeoutSignal:      duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
117                 installPublicKey:   installPublicKey,
118                 tagKeyPrefix:       cluster.Containers.CloudVMs.TagKeyPrefix,
119                 stop:               make(chan bool),
120         }
121         wp.registerMetrics(reg)
122         go func() {
123                 wp.setupOnce.Do(wp.setup)
124                 go wp.runMetrics()
125                 go wp.runProbes()
126                 go wp.runSync()
127         }()
128         return wp
129 }
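
// Illustrative call sketch (not part of the original file). All of the
// identifiers on the right-hand side below are placeholders standing
// for values the caller is expected to have already: a configured
// cloud.InstanceSet from the cluster's cloud driver, an Executor
// factory, and the cluster configuration itself.
//
//      wp := NewPool(
//              logger,           // logrus.FieldLogger
//              arvClient,        // *arvados.Client
//              prometheus.NewRegistry(),
//              instanceSetID,    // cloud.InstanceSetID chosen by the caller
//              instanceSet,      // cloud.InstanceSet from the cloud driver
//              newExecutor,      // func(cloud.Instance) Executor
//              installPublicKey, // ssh.PublicKey to install on new instances
//              cluster,          // *arvados.Cluster
//      )
//      defer wp.Stop()
//      if err := wp.CheckHealth(); err != nil {
//              // e.g. the configured runner binary could not be loaded
//              return err
//      }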
130
131 // Pool is a resizable worker pool backed by a cloud.InstanceSet. A
132 // zero Pool should not be used. Call NewPool to create a new Pool.
133 type Pool struct {
134         // configuration
135         logger             logrus.FieldLogger
136         arvClient          *arvados.Client
137         instanceSetID      cloud.InstanceSetID
138         instanceSet        *throttledInstanceSet
139         newExecutor        func(cloud.Instance) Executor
140         bootProbeCommand   string
141         runnerSource       string
142         imageID            cloud.ImageID
143         instanceTypes      map[string]arvados.InstanceType
144         syncInterval       time.Duration
145         probeInterval      time.Duration
146         maxProbesPerSecond int
147         timeoutIdle        time.Duration
148         timeoutBooting     time.Duration
149         timeoutProbe       time.Duration
150         timeoutShutdown    time.Duration
151         timeoutTERM        time.Duration
152         timeoutSignal      time.Duration
153         installPublicKey   ssh.PublicKey
154         tagKeyPrefix       string
155
156         // private state
157         subscribers  map[<-chan struct{}]chan<- struct{}
158         creating     map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
159         workers      map[cloud.InstanceID]*worker
160         loaded       bool                 // loaded list of instances from InstanceSet at least once
161         exited       map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called
162         atQuotaUntil time.Time
163         atQuotaErr   cloud.QuotaError
164         stop         chan bool
165         mtx          sync.RWMutex
166         setupOnce    sync.Once
167         runnerData   []byte
168         runnerMD5    [md5.Size]byte
169         runnerCmd    string
170
171         throttleCreate    throttle
172         throttleInstances throttle
173
174         mContainersRunning prometheus.Gauge
175         mInstances         *prometheus.GaugeVec
176         mInstancesPrice    *prometheus.GaugeVec
177         mVCPUs             *prometheus.GaugeVec
178         mMemory            *prometheus.GaugeVec
179         mDisappearances    *prometheus.CounterVec
180 }
181
182 type createCall struct {
183         time         time.Time
184         instanceType arvados.InstanceType
185 }
186
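// CheckHealth reports whether the Pool is able to set up new workers:
// it returns an error if the configured runner binary cannot be
// loaded, and nil otherwise.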
187 func (wp *Pool) CheckHealth() error {
188         wp.setupOnce.Do(wp.setup)
189         if err := wp.loadRunnerData(); err != nil {
190                 return fmt.Errorf("error loading runner binary: %s", err)
191         }
192         return nil
193 }
194
195 // Subscribe returns a buffered channel that becomes ready after any
196 // change to the pool's state that could have scheduling implications:
197 // a worker's state changes, a new worker appears, the cloud
198 // provider's API rate limiting period ends, etc.
199 //
200 // Additional events that occur while the channel is already ready
201 // will be dropped, so it is OK if the caller services the channel
202 // slowly.
203 //
204 // Example:
205 //
206 //      ch := wp.Subscribe()
207 //      defer wp.Unsubscribe(ch)
208 //      for range ch {
209 //              tryScheduling(wp)
210 //              if done {
211 //                      break
212 //              }
213 //      }
214 func (wp *Pool) Subscribe() <-chan struct{} {
215         wp.setupOnce.Do(wp.setup)
216         wp.mtx.Lock()
217         defer wp.mtx.Unlock()
218         ch := make(chan struct{}, 1)
219         wp.subscribers[ch] = ch
220         return ch
221 }
222
223 // Unsubscribe stops sending updates to the given channel.
224 func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
225         wp.setupOnce.Do(wp.setup)
226         wp.mtx.Lock()
227         defer wp.mtx.Unlock()
228         delete(wp.subscribers, ch)
229 }
230
231 // Unallocated returns the number of unallocated (creating + booting +
232 // idle + unknown) workers for each instance type.  Workers in
233 // hold/drain mode are not included.
234 func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
235         wp.setupOnce.Do(wp.setup)
236         wp.mtx.RLock()
237         defer wp.mtx.RUnlock()
238         unalloc := map[arvados.InstanceType]int{}
239         creating := map[arvados.InstanceType]int{}
240         oldestCreate := map[arvados.InstanceType]time.Time{}
241         for _, cc := range wp.creating {
242                 it := cc.instanceType
243                 creating[it]++
244                 if t, ok := oldestCreate[it]; !ok || t.After(cc.time) {
245                         oldestCreate[it] = cc.time
246                 }
247         }
248         for _, wkr := range wp.workers {
249                 // Skip workers that are not expected to become
250                 // available soon. Note len(wkr.running)>0 is not
251                 // redundant here: it can be true even in
252                 // StateUnknown.
253                 if wkr.state == StateShutdown ||
254                         wkr.state == StateRunning ||
255                         wkr.idleBehavior != IdleBehaviorRun ||
256                         len(wkr.running) > 0 {
257                         continue
258                 }
259                 it := wkr.instType
260                 unalloc[it]++
261                 if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) {
262                         // If up to N new workers appear in
263                         // Instances() while we are waiting for N
264                         // Create() calls to complete, we assume we're
265                         // just seeing a race between Instances() and
266                         // Create() responses.
267                         //
268                         // The other common reason why nodes have
269                         // state==Unknown is that they appeared at
270                         // startup, before any Create calls. They
271                         // don't match the above timing condition, so
272                         // we never mistakenly attribute them to
273                         // pending Create calls.
274                         creating[it]--
275                 }
276         }
277         for it, c := range creating {
278                 unalloc[it] += c
279         }
280         return unalloc
281 }
282
283 // Create a new instance with the given type, and add it to the worker
284 // pool. The worker is added immediately; instance creation runs in
285 // the background.
286 //
287 // Create returns false if a pre-existing error state prevents it from
288 // even attempting to create a new instance. Those errors are logged
289 // by the Pool, so the caller does not need to log anything in such
290 // cases.
291 func (wp *Pool) Create(it arvados.InstanceType) bool {
292         logger := wp.logger.WithField("InstanceType", it.Name)
293         wp.setupOnce.Do(wp.setup)
294         if wp.loadRunnerData() != nil {
295                 // Boot probe is certain to fail.
296                 return false
297         }
298         wp.mtx.Lock()
299         defer wp.mtx.Unlock()
300         if time.Now().Before(wp.atQuotaUntil) || wp.throttleCreate.Error() != nil {
301                 return false
302         }
303         now := time.Now()
304         secret := randomHex(instanceSecretLength)
305         wp.creating[secret] = createCall{time: now, instanceType: it}
306         go func() {
307                 defer wp.notify()
308                 tags := cloud.InstanceTags{
309                         wp.tagKeyPrefix + tagKeyInstanceSetID:  string(wp.instanceSetID),
310                         wp.tagKeyPrefix + tagKeyInstanceType:   it.Name,
311                         wp.tagKeyPrefix + tagKeyIdleBehavior:   string(IdleBehaviorRun),
312                         wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
313                 }
314                 initCmd := TagVerifier{nil, secret}.InitCommand()
315                 inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
316                 wp.mtx.Lock()
317                 defer wp.mtx.Unlock()
318                 // delete() is deferred so the updateWorker() call
319                 // below knows to use StateBooting when adding a new
320                 // worker.
321                 defer delete(wp.creating, secret)
322                 if err != nil {
323                         if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
324                                 wp.atQuotaErr = err
325                                 wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
326                                 time.AfterFunc(quotaErrorTTL, wp.notify)
327                         }
328                         logger.WithError(err).Error("create failed")
329                         wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
330                         return
331                 }
332                 wp.updateWorker(inst, it)
333         }()
334         return true
335 }
336
337 // AtQuota returns true if Create is not expected to work at the
338 // moment.
339 func (wp *Pool) AtQuota() bool {
340         wp.mtx.Lock()
341         defer wp.mtx.Unlock()
342         return time.Now().Before(wp.atQuotaUntil)
343 }
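
// Illustrative caller-side sketch (not part of the original file)
// tying together Subscribe, Unallocated, Create and AtQuota: a
// scheduler-like loop that tops up unallocated workers whenever the
// pool reports a change. The demand() helper is hypothetical; the
// real scheduling logic lives outside this package.
//
//      ch := wp.Subscribe()
//      defer wp.Unsubscribe(ch)
//      for range ch {
//              unalloc := wp.Unallocated()
//              for it, want := range demand() { // demand() is a hypothetical helper
//                      for have := unalloc[it]; have < want && !wp.AtQuota(); have++ {
//                              if !wp.Create(it) {
//                                      break // failure already logged by the pool
//                              }
//                      }
//              }
//      }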
344
345 // SetIdleBehavior determines how the indicated instance will behave
346 // when it has no containers running.
347 func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
348         wp.mtx.Lock()
349         defer wp.mtx.Unlock()
350         wkr, ok := wp.workers[id]
351         if !ok {
352                 return errors.New("requested instance does not exist")
353         }
354         wkr.setIdleBehavior(idleBehavior)
355         return nil
356 }
357
358 // Add or update worker attached to the given instance.
359 //
360 // The second return value is true if a new worker is created.
361 //
362 // A newly added instance has state=StateBooting if its tags match an
363 // entry in wp.creating, otherwise StateUnknown.
364 //
365 // Caller must have lock.
366 func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
367         secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
368         inst = TagVerifier{inst, secret}
369         id := inst.ID()
370         if wkr := wp.workers[id]; wkr != nil {
371                 wkr.executor.SetTarget(inst)
372                 wkr.instance = inst
373                 wkr.updated = time.Now()
374                 wkr.saveTags()
375                 return wkr, false
376         }
377
378         state := StateUnknown
379         if _, ok := wp.creating[secret]; ok {
380                 state = StateBooting
381         }
382
383         // If an instance has a valid IdleBehavior tag when it first
384         // appears, initialize the new worker accordingly (this is how
385         // we restore IdleBehavior that was set by a prior dispatch
386         // process); otherwise, default to "run". After this,
387         // wkr.idleBehavior is the source of truth, and will only be
388         // changed via SetIdleBehavior().
389         idleBehavior := IdleBehavior(inst.Tags()[wp.tagKeyPrefix+tagKeyIdleBehavior])
390         if !validIdleBehavior[idleBehavior] {
391                 idleBehavior = IdleBehaviorRun
392         }
393
394         logger := wp.logger.WithFields(logrus.Fields{
395                 "InstanceType": it.Name,
396                 "Instance":     inst.ID(),
397                 "Address":      inst.Address(),
398         })
399         logger.WithFields(logrus.Fields{
400                 "State":        state,
401                 "IdleBehavior": idleBehavior,
402         }).Infof("instance appeared in cloud")
403         now := time.Now()
404         wkr := &worker{
405                 mtx:          &wp.mtx,
406                 wp:           wp,
407                 logger:       logger,
408                 executor:     wp.newExecutor(inst),
409                 state:        state,
410                 idleBehavior: idleBehavior,
411                 instance:     inst,
412                 instType:     it,
413                 appeared:     now,
414                 probed:       now,
415                 busy:         now,
416                 updated:      now,
417                 running:      make(map[string]*remoteRunner),
418                 starting:     make(map[string]*remoteRunner),
419                 probing:      make(chan struct{}, 1),
420         }
421         wp.workers[id] = wkr
422         return wkr, true
423 }
424
425 // Shutdown shuts down a worker with the given type, or returns false
426 // if all workers with the given type are busy.
427 func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
428         wp.setupOnce.Do(wp.setup)
429         wp.mtx.Lock()
430         defer wp.mtx.Unlock()
431         logger := wp.logger.WithField("InstanceType", it.Name)
432         logger.Info("shutdown requested")
433         for _, tryState := range []State{StateBooting, StateIdle} {
434         // TODO: shut down the worker with the longest idle
435                 // time (Idle) or the earliest create time (Booting)
436                 for _, wkr := range wp.workers {
437                         if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
438                                 logger.WithField("Instance", wkr.instance).Info("shutting down")
439                                 wkr.shutdown()
440                                 return true
441                         }
442                 }
443         }
444         return false
445 }
446
447 // CountWorkers returns the current number of workers in each state.
448 //
449 // CountWorkers blocks, if necessary, until the initial instance list
450 // has been loaded from the cloud provider.
451 func (wp *Pool) CountWorkers() map[State]int {
452         wp.setupOnce.Do(wp.setup)
453         wp.waitUntilLoaded()
454         wp.mtx.Lock()
455         defer wp.mtx.Unlock()
456         r := map[State]int{}
457         for _, w := range wp.workers {
458                 r[w.state]++
459         }
460         return r
461 }
462
463 // Running returns the container UUIDs being prepared/run on workers.
464 //
465 // In the returned map, the time value indicates when the Pool
466 // observed that the container process had exited. A container that
467 // has not yet exited has a zero time value. The caller should use
468 // ForgetContainer() to garbage-collect the entries for exited
469 // containers.
470 func (wp *Pool) Running() map[string]time.Time {
471         wp.setupOnce.Do(wp.setup)
472         wp.mtx.Lock()
473         defer wp.mtx.Unlock()
474         r := map[string]time.Time{}
475         for _, wkr := range wp.workers {
476                 for uuid := range wkr.running {
477                         r[uuid] = time.Time{}
478                 }
479                 for uuid := range wkr.starting {
480                         r[uuid] = time.Time{}
481                 }
482         }
483         for uuid, exited := range wp.exited {
484                 r[uuid] = exited
485         }
486         return r
487 }
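
// Illustrative caller-side sketch (not part of the original file) of
// the Running/ForgetContainer cycle described above: entries with a
// non-zero time are exited crunch-run processes, and the caller clears
// them once it has dealt with the container's final state. The
// handledFinalState() helper is hypothetical.
//
//      for uuid, exited := range wp.Running() {
//              if exited.IsZero() {
//                      continue // still starting or running
//              }
//              if handledFinalState(uuid) {
//                      wp.ForgetContainer(uuid)
//              }
//      }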
488
489 // StartContainer starts a container on an idle worker immediately if
490 // possible, otherwise returns false.
491 func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
492         wp.setupOnce.Do(wp.setup)
493         wp.mtx.Lock()
494         defer wp.mtx.Unlock()
495         var wkr *worker
496         for _, w := range wp.workers {
497                 if w.instType == it && w.state == StateIdle {
498                         if wkr == nil || w.busy.After(wkr.busy) {
499                                 wkr = w
500                         }
501                 }
502         }
503         if wkr == nil {
504                 return false
505         }
506         wkr.startContainer(ctr)
507         return true
508 }
509
510 // KillContainer kills the crunch-run process for the given container
511 // UUID, if it's running on any worker.
512 //
513 // KillContainer returns immediately; the act of killing the container
514 // takes some time, and runs in the background.
515 //
516 // KillContainer returns false if the container has already ended.
517 func (wp *Pool) KillContainer(uuid string, reason string) bool {
518         wp.mtx.Lock()
519         defer wp.mtx.Unlock()
520         logger := wp.logger.WithFields(logrus.Fields{
521                 "ContainerUUID": uuid,
522                 "Reason":        reason,
523         })
524         for _, wkr := range wp.workers {
525                 rr := wkr.running[uuid]
526                 if rr == nil {
527                         rr = wkr.starting[uuid]
528                 }
529                 if rr != nil {
530                         rr.Kill(reason)
531                         return true
532                 }
533         }
534         logger.Debug("cannot kill: already disappeared")
535         return false
536 }
537
538 // ForgetContainer clears the placeholder for the given exited
539 // container, so it isn't returned by subsequent calls to Running().
540 //
541 // ForgetContainer has no effect if the container has not yet exited.
542 //
543 // The "container exited at time T" placeholder (which necessitates
544 // ForgetContainer) exists to make it easier for the caller
545 // (scheduler) to distinguish a container that exited without
546 // finalizing its state from a container that exited too recently for
547 // its final state to have appeared in the scheduler's queue cache.
548 func (wp *Pool) ForgetContainer(uuid string) {
549         wp.mtx.Lock()
550         defer wp.mtx.Unlock()
551         if _, ok := wp.exited[uuid]; ok {
552                 wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
553                 delete(wp.exited, uuid)
554         }
555 }
556
557 func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
558         if reg == nil {
559                 reg = prometheus.NewRegistry()
560         }
561         wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
562                 Namespace: "arvados",
563                 Subsystem: "dispatchcloud",
564                 Name:      "containers_running",
565                 Help:      "Number of containers reported running by cloud VMs.",
566         })
567         reg.MustRegister(wp.mContainersRunning)
568         wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
569                 Namespace: "arvados",
570                 Subsystem: "dispatchcloud",
571                 Name:      "instances_total",
572                 Help:      "Number of cloud VMs.",
573         }, []string{"category"})
574         reg.MustRegister(wp.mInstances)
575         wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
576                 Namespace: "arvados",
577                 Subsystem: "dispatchcloud",
578                 Name:      "instances_price",
579                 Help:      "Price of cloud VMs.",
580         }, []string{"category"})
581         reg.MustRegister(wp.mInstancesPrice)
582         wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
583                 Namespace: "arvados",
584                 Subsystem: "dispatchcloud",
585                 Name:      "vcpus_total",
586                 Help:      "Total VCPUs on all cloud VMs.",
587         }, []string{"category"})
588         reg.MustRegister(wp.mVCPUs)
589         wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
590                 Namespace: "arvados",
591                 Subsystem: "dispatchcloud",
592                 Name:      "memory_bytes_total",
593                 Help:      "Total memory on all cloud VMs.",
594         }, []string{"category"})
595         reg.MustRegister(wp.mMemory)
596         wp.mDisappearances = prometheus.NewCounterVec(prometheus.CounterOpts{
597                 Namespace: "arvados",
598                 Subsystem: "dispatchcloud",
599                 Name:      "instances_disappeared",
600                 Help:      "Number of occurrences of an instance disappearing from the cloud provider's list of instances.",
601         }, []string{"state"})
602         for _, v := range stateString {
603                 wp.mDisappearances.WithLabelValues(v).Add(0)
604         }
605         reg.MustRegister(wp.mDisappearances)
606 }
607
608 func (wp *Pool) runMetrics() {
609         ch := wp.Subscribe()
610         defer wp.Unsubscribe(ch)
611         wp.updateMetrics()
612         for range ch {
613                 wp.updateMetrics()
614         }
615 }
616
617 func (wp *Pool) updateMetrics() {
618         wp.mtx.RLock()
619         defer wp.mtx.RUnlock()
620
621         instances := map[string]int64{}
622         price := map[string]float64{}
623         cpu := map[string]int64{}
624         mem := map[string]int64{}
625         var running int64
626         for _, wkr := range wp.workers {
627                 var cat string
628                 switch {
629                 case len(wkr.running)+len(wkr.starting) > 0:
630                         cat = "inuse"
631                 case wkr.idleBehavior == IdleBehaviorHold:
632                         cat = "hold"
633                 case wkr.state == StateBooting:
634                         cat = "booting"
635                 case wkr.state == StateUnknown:
636                         cat = "unknown"
637                 default:
638                         cat = "idle"
639                 }
640                 instances[cat]++
641                 price[cat] += wkr.instType.Price
642                 cpu[cat] += int64(wkr.instType.VCPUs)
643                 mem[cat] += int64(wkr.instType.RAM)
644                 running += int64(len(wkr.running) + len(wkr.starting))
645         }
646         for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
647                 wp.mInstances.WithLabelValues(cat).Set(float64(instances[cat]))
648                 wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
649                 wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
650                 wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
651         }
652         wp.mContainersRunning.Set(float64(running))
653 }
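
// With the registration options above and the categories assigned in
// updateMetrics, the registry exposes series along these lines (sample
// values and label values are illustrative only):
//
//      arvados_dispatchcloud_containers_running 3
//      arvados_dispatchcloud_instances_total{category="idle"} 2
//      arvados_dispatchcloud_instances_price{category="inuse"} 1.25
//      arvados_dispatchcloud_vcpus_total{category="inuse"} 16
//      arvados_dispatchcloud_memory_bytes_total{category="inuse"} 6.8719476736e+10
//      arvados_dispatchcloud_instances_disappeared{state="shutdown"} 0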
654
655 func (wp *Pool) runProbes() {
656         maxPPS := wp.maxProbesPerSecond
657         if maxPPS < 1 {
658                 maxPPS = defaultMaxProbesPerSecond
659         }
660         limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
661         defer limitticker.Stop()
662
663         probeticker := time.NewTicker(wp.probeInterval)
664         defer probeticker.Stop()
665
666         workers := []cloud.InstanceID{}
667         for range probeticker.C {
668                 workers = workers[:0]
669                 wp.mtx.Lock()
670                 for id, wkr := range wp.workers {
671                         if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
672                                 continue
673                         }
674                         workers = append(workers, id)
675                 }
676                 wp.mtx.Unlock()
677
678                 for _, id := range workers {
679                         wp.mtx.Lock()
680                         wkr, ok := wp.workers[id]
681                         wp.mtx.Unlock()
682                         if !ok {
683                                 // Deleted while we were probing
684                                 // others
685                                 continue
686                         }
687                         go wkr.ProbeAndUpdate()
688                         select {
689                         case <-wp.stop:
690                                 return
691                         case <-limitticker.C:
692                         }
693                 }
694         }
695 }
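
// Probe pacing, worked through with the defaults above: with
// maxProbesPerSecond=10, limitticker fires every 100ms, so each pass
// over the worker list starts at most 10 probes per second. A pool of
// 50 workers therefore needs about 5s to launch a full round, well
// inside the default 10s probeInterval; for much larger pools the
// effective probe period is governed by maxPPS rather than
// probeInterval, because a new probeticker tick is not handled until
// the previous round has finished.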
696
697 func (wp *Pool) runSync() {
698         // sync once immediately, then wait syncInterval, sync again,
699         // etc.
700         timer := time.NewTimer(1)
701         for {
702                 select {
703                 case <-timer.C:
704                         err := wp.getInstancesAndSync()
705                         if err != nil {
706                                 wp.logger.WithError(err).Warn("sync failed")
707                         }
708                         timer.Reset(wp.syncInterval)
709                 case <-wp.stop:
710                         wp.logger.Debug("worker.Pool stopped")
711                         return
712                 }
713         }
714 }
715
716 // Stop synchronizing with the InstanceSet.
717 func (wp *Pool) Stop() {
718         wp.setupOnce.Do(wp.setup)
719         close(wp.stop)
720 }
721
722 // Instances returns an InstanceView for each worker in the pool,
723 // summarizing its current state and recent activity.
724 func (wp *Pool) Instances() []InstanceView {
725         var r []InstanceView
726         wp.setupOnce.Do(wp.setup)
727         wp.mtx.Lock()
728         for _, w := range wp.workers {
729                 r = append(r, InstanceView{
730                         Instance:             w.instance.ID(),
731                         Address:              w.instance.Address(),
732                         Price:                w.instType.Price,
733                         ArvadosInstanceType:  w.instType.Name,
734                         ProviderInstanceType: w.instType.ProviderType,
735                         LastContainerUUID:    w.lastUUID,
736                         LastBusy:             w.busy,
737                         WorkerState:          w.state.String(),
738                         IdleBehavior:         w.idleBehavior,
739                 })
740         }
741         wp.mtx.Unlock()
742         sort.Slice(r, func(i, j int) bool {
743                 return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
744         })
745         return r
746 }
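
// Illustrative sketch (not part of the original file): because
// InstanceView carries JSON tags, the slice returned by Instances()
// can be encoded directly, e.g. in a hypothetical status handler
// (net/http and encoding/json are not imported by this file):
//
//      http.HandleFunc("/instances", func(w http.ResponseWriter, r *http.Request) {
//              w.Header().Set("Content-Type", "application/json")
//              json.NewEncoder(w).Encode(map[string]interface{}{"items": wp.Instances()})
//      })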
747
748 // KillInstance destroys a cloud VM instance. It returns an error if
749 // the given instance does not exist.
750 func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
751         wkr, ok := wp.workers[id]
752         if !ok {
753                 return errors.New("instance not found")
754         }
755         wkr.logger.WithField("Reason", reason).Info("shutting down")
756         wkr.shutdown()
757         return nil
758 }
759
760 func (wp *Pool) setup() {
761         wp.creating = map[string]createCall{}
762         wp.exited = map[string]time.Time{}
763         wp.workers = map[cloud.InstanceID]*worker{}
764         wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
765         wp.loadRunnerData()
766 }
767
768 // Load the runner program to be deployed on worker nodes into
769 // wp.runnerData, if necessary. Errors are logged.
770 //
771 // If auto-deploy is disabled, len(wp.runnerData) will be 0.
772 //
773 // Caller must not have lock.
774 func (wp *Pool) loadRunnerData() error {
775         wp.mtx.Lock()
776         defer wp.mtx.Unlock()
777         if wp.runnerData != nil {
778                 return nil
779         } else if wp.runnerSource == "" {
780                 wp.runnerCmd = "crunch-run"
781                 wp.runnerData = []byte{}
782                 return nil
783         }
784         logger := wp.logger.WithField("source", wp.runnerSource)
785         logger.Debug("loading runner")
786         buf, err := ioutil.ReadFile(wp.runnerSource)
787         if err != nil {
788                 logger.WithError(err).Error("failed to load runner program")
789                 return err
790         }
791         wp.runnerData = buf
792         wp.runnerMD5 = md5.Sum(buf)
793         wp.runnerCmd = fmt.Sprintf("/var/lib/arvados/crunch-run~%x", wp.runnerMD5)
794         return nil
795 }
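
// Note on the path format above (illustrative): the deployed filename
// embeds the binary's MD5 sum, e.g.
// /var/lib/arvados/crunch-run~d41d8cd98f00b204e9800998ecf8427e (the
// MD5 of an empty file, shown only to illustrate the shape),
// presumably so that binaries deployed by dispatchers running
// different versions never overwrite one another on a worker.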
796
797 func (wp *Pool) notify() {
798         wp.mtx.RLock()
799         defer wp.mtx.RUnlock()
800         for _, send := range wp.subscribers {
801                 select {
802                 case send <- struct{}{}:
803                 default:
804                 }
805         }
806 }
807
808 func (wp *Pool) getInstancesAndSync() error {
809         wp.setupOnce.Do(wp.setup)
810         if err := wp.instanceSet.throttleInstances.Error(); err != nil {
811                 return err
812         }
813         wp.logger.Debug("getting instance list")
814         threshold := time.Now()
815         instances, err := wp.instanceSet.Instances(cloud.InstanceTags{wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID)})
816         if err != nil {
817                 wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
818                 return err
819         }
820         wp.sync(threshold, instances)
821         wp.logger.Debug("sync done")
822         return nil
823 }
824
825 // Add/remove/update workers based on the given instances list, which was
826 // obtained from the instanceSet. However, don't clobber any other updates that
827 // already happened after threshold.
828 func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
829         wp.mtx.Lock()
830         defer wp.mtx.Unlock()
831         wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
832         notify := false
833
834         for _, inst := range instances {
835                 itTag := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceType]
836                 it, ok := wp.instanceTypes[itTag]
837                 if !ok {
838                         wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
839                         continue
840                 }
841                 if wkr, isNew := wp.updateWorker(inst, it); isNew {
842                         notify = true
843                 } else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
844                         wp.logger.WithField("Instance", inst).Info("worker still listed after shutdown; retrying")
845                         wkr.shutdown()
846                 }
847         }
848
849         for id, wkr := range wp.workers {
850                 if wkr.updated.After(threshold) {
851                         continue
852                 }
853                 logger := wp.logger.WithFields(logrus.Fields{
854                         "Instance":    wkr.instance.ID(),
855                         "WorkerState": wkr.state,
856                 })
857                 logger.Info("instance disappeared in cloud")
858                 if wp.mDisappearances != nil {
859                         wp.mDisappearances.WithLabelValues(stateString[wkr.state]).Inc()
860                 }
861                 delete(wp.workers, id)
862                 go wkr.Close()
863                 notify = true
864         }
865
866         if !wp.loaded {
867                 notify = true
868                 wp.loaded = true
869                 wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
870         }
871
872         if notify {
873                 go wp.notify()
874         }
875 }
876
877 func (wp *Pool) waitUntilLoaded() {
878         ch := wp.Subscribe()
879         wp.mtx.RLock()
880         defer wp.mtx.RUnlock()
881         for !wp.loaded {
882                 wp.mtx.RUnlock()
883                 <-ch
884                 wp.mtx.RLock()
885         }
886 }
887
888 // Return a random string of n hexadecimal digits (n*4 random bits). n
889 // must be even.
890 func randomHex(n int) string {
891         buf := make([]byte, n/2)
892         _, err := rand.Read(buf)
893         if err != nil {
894                 panic(err)
895         }
896         return fmt.Sprintf("%x", buf)
897 }