// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package worker

import (
        "crypto/rand"
        "errors"
        "fmt"
        "io"
        "sort"
        "strings"
        "sync"
        "time"

        "git.arvados.org/arvados.git/lib/cloud"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "github.com/prometheus/client_golang/prometheus"
        "github.com/sirupsen/logrus"
        "golang.org/x/crypto/ssh"
)

const (
        tagKeyInstanceType   = "InstanceType"
        tagKeyIdleBehavior   = "IdleBehavior"
        tagKeyInstanceSecret = "InstanceSecret"
        tagKeyInstanceSetID  = "InstanceSetID"
)

// An InstanceView shows a worker's current state and recent activity.
type InstanceView struct {
        Instance             cloud.InstanceID `json:"instance"`
        Address              string           `json:"address"`
        Price                float64          `json:"price"`
        ArvadosInstanceType  string           `json:"arvados_instance_type"`
        ProviderInstanceType string           `json:"provider_instance_type"`
        LastContainerUUID    string           `json:"last_container_uuid"`
        LastBusy             time.Time        `json:"last_busy"`
        WorkerState          string           `json:"worker_state"`
        IdleBehavior         IdleBehavior     `json:"idle_behavior"`
}

// An Executor executes shell commands on a remote host.
type Executor interface {
        // Run cmd on the current target.
        Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)

        // Use the given target for subsequent operations. The new
        // target is the same host as the previous target, but it
        // might return a different address and verify a different
        // host key.
        //
        // SetTarget is called frequently, and in most cases the new
        // target will behave exactly the same as the old one. An
        // implementation should optimize accordingly.
        //
        // SetTarget must not block on concurrent Execute calls.
        SetTarget(cloud.ExecutorTarget)

        Close()
}
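
// A minimal sketch of a fake Executor, e.g. for tests; the
// stubExecutor type and its canned output below are hypothetical,
// not part of this package. A real implementation runs cmd over SSH
// on the worker's current target.
//
//      type stubExecutor struct{ target cloud.ExecutorTarget }
//
//      func (e *stubExecutor) Execute(env map[string]string, cmd string, stdin io.Reader) ([]byte, []byte, error) {
//              return []byte("ok\n"), nil, nil // pretend every command succeeds
//      }
//
//      func (e *stubExecutor) SetTarget(t cloud.ExecutorTarget) { e.target = t }
//
//      func (e *stubExecutor) Close() {}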

const (
        defaultSyncInterval       = time.Minute
        defaultProbeInterval      = time.Second * 10
        defaultMaxProbesPerSecond = 10
        defaultTimeoutIdle        = time.Minute
        defaultTimeoutBooting     = time.Minute * 10
        defaultTimeoutProbe       = time.Minute * 10
        defaultTimeoutShutdown    = time.Second * 10
        defaultTimeoutTERM        = time.Minute * 2
        defaultTimeoutSignal      = time.Second * 5

        // Time after a quota error to try again anyway, even if no
        // instances have been shut down.
        quotaErrorTTL = time.Minute

        // Time between "X failed because of rate limiting" messages
        logRateLimitErrorInterval = time.Second * 10
)

func duration(conf arvados.Duration, def time.Duration) time.Duration {
        if conf > 0 {
                return time.Duration(conf)
        } else {
                return def
        }
}

// NewPool creates a Pool of workers backed by instanceSet.
//
// New instances are configured and set up according to the given
// cluster configuration.
func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
        wp := &Pool{
                logger:             logger,
                arvClient:          arvClient,
                instanceSetID:      instanceSetID,
                instanceSet:        &throttledInstanceSet{InstanceSet: instanceSet},
                newExecutor:        newExecutor,
                bootProbeCommand:   cluster.Containers.CloudVMs.BootProbeCommand,
                imageID:            cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
                instanceTypes:      cluster.InstanceTypes,
                maxProbesPerSecond: cluster.Containers.CloudVMs.MaxProbesPerSecond,
                probeInterval:      duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
                syncInterval:       duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
                timeoutIdle:        duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
                timeoutBooting:     duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
                timeoutProbe:       duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
                timeoutShutdown:    duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
                timeoutTERM:        duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
                timeoutSignal:      duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
                installPublicKey:   installPublicKey,
                tagKeyPrefix:       cluster.Containers.CloudVMs.TagKeyPrefix,
                stop:               make(chan bool),
        }
        wp.registerMetrics(reg)
        go func() {
                wp.setupOnce.Do(wp.setup)
                go wp.runMetrics()
                go wp.runProbes()
                go wp.runSync()
        }()
        return wp
}
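
// A rough sketch of how a dispatcher might construct a Pool; the
// instance set "is", the executor constructor newExec, and the other
// arguments are assumed to come from the caller's configuration and
// are not defined in this file.
//
//      // is: a cloud.InstanceSet obtained from the configured driver
//      // newExec: func(cloud.Instance) Executor, e.g. an SSH-based executor
//      wp := NewPool(logger, arvClient, prometheus.NewRegistry(), instanceSetID,
//              is, newExec, installPublicKey, cluster)
//      defer wp.Stop()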

// Pool is a resizable worker pool backed by a cloud.InstanceSet. A
// zero Pool should not be used. Call NewPool to create a new Pool.
type Pool struct {
        // configuration
        logger             logrus.FieldLogger
        arvClient          *arvados.Client
        instanceSetID      cloud.InstanceSetID
        instanceSet        *throttledInstanceSet
        newExecutor        func(cloud.Instance) Executor
        bootProbeCommand   string
        imageID            cloud.ImageID
        instanceTypes      map[string]arvados.InstanceType
        syncInterval       time.Duration
        probeInterval      time.Duration
        maxProbesPerSecond int
        timeoutIdle        time.Duration
        timeoutBooting     time.Duration
        timeoutProbe       time.Duration
        timeoutShutdown    time.Duration
        timeoutTERM        time.Duration
        timeoutSignal      time.Duration
        installPublicKey   ssh.PublicKey
        tagKeyPrefix       string

        // private state
        subscribers  map[<-chan struct{}]chan<- struct{}
        creating     map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
        workers      map[cloud.InstanceID]*worker
        loaded       bool                 // loaded list of instances from InstanceSet at least once
        exited       map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called
        atQuotaUntil time.Time
        atQuotaErr   cloud.QuotaError
        stop         chan bool
        mtx          sync.RWMutex
        setupOnce    sync.Once

        throttleCreate    throttle
        throttleInstances throttle

        mContainersRunning prometheus.Gauge
        mInstances         *prometheus.GaugeVec
        mInstancesPrice    *prometheus.GaugeVec
        mVCPUs             *prometheus.GaugeVec
        mMemory            *prometheus.GaugeVec
        mDisappearances    *prometheus.CounterVec
}

type createCall struct {
        time         time.Time
        instanceType arvados.InstanceType
}

// Subscribe returns a buffered channel that becomes ready after any
// change to the pool's state that could have scheduling implications:
// a worker's state changes, a new worker appears, the cloud
// provider's API rate limiting period ends, etc.
//
// Additional events that occur while the channel is already ready
// will be dropped, so it is OK if the caller services the channel
// slowly.
//
// Example:
//
//      ch := wp.Subscribe()
//      defer wp.Unsubscribe(ch)
//      for range ch {
//              tryScheduling(wp)
//              if done {
//                      break
//              }
//      }
func (wp *Pool) Subscribe() <-chan struct{} {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        ch := make(chan struct{}, 1)
        wp.subscribers[ch] = ch
        return ch
}

// Unsubscribe stops sending updates to the given channel.
func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        delete(wp.subscribers, ch)
}

// Unallocated returns the number of unallocated (creating + booting +
// idle + unknown) workers for each instance type.  Workers in
// hold/drain mode are not included.
func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.RLock()
        defer wp.mtx.RUnlock()
        unalloc := map[arvados.InstanceType]int{}
        creating := map[arvados.InstanceType]int{}
        oldestCreate := map[arvados.InstanceType]time.Time{}
        for _, cc := range wp.creating {
                it := cc.instanceType
                creating[it]++
                if t, ok := oldestCreate[it]; !ok || t.After(cc.time) {
                        oldestCreate[it] = cc.time
                }
        }
        for _, wkr := range wp.workers {
                // Skip workers that are not expected to become
                // available soon. Note len(wkr.running)>0 is not
                // redundant here: it can be true even in
                // StateUnknown.
                if wkr.state == StateShutdown ||
                        wkr.state == StateRunning ||
                        wkr.idleBehavior != IdleBehaviorRun ||
                        len(wkr.running) > 0 {
                        continue
                }
                it := wkr.instType
                unalloc[it]++
                if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) {
                        // If up to N new workers appear in
                        // Instances() while we are waiting for N
                        // Create() calls to complete, we assume we're
                        // just seeing a race between Instances() and
                        // Create() responses.
                        //
                        // The other common reason why nodes have
                        // state==Unknown is that they appeared at
                        // startup, before any Create calls. They
                        // don't match the above timing condition, so
                        // we never mistakenly attribute them to
                        // pending Create calls.
                        creating[it]--
                }
        }
        for it, c := range creating {
                unalloc[it] += c
        }
        return unalloc
}
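
// A sketch of how a scheduler might read these counts; the queue,
// typeForContainer, and needed bookkeeping are hypothetical stand-ins
// for the caller's own state.
//
//      unalloc := wp.Unallocated()
//      needed := map[arvados.InstanceType]int{}
//      for _, ctr := range queuedContainers {
//              it := typeForContainer(ctr)
//              if unalloc[it] > 0 {
//                      unalloc[it]-- // an existing or incoming worker can take this container
//              } else {
//                      needed[it]++ // will need a new instance (see Create)
//              }
//      }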

// Create a new instance with the given type, and add it to the worker
// pool. The worker is added immediately; instance creation runs in
// the background.
//
// Create returns false if a pre-existing error state prevents it from
// even attempting to create a new instance. Those errors are logged
// by the Pool, so the caller does not need to log anything in such
// cases.
func (wp *Pool) Create(it arvados.InstanceType) bool {
        logger := wp.logger.WithField("InstanceType", it.Name)
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        if time.Now().Before(wp.atQuotaUntil) || wp.throttleCreate.Error() != nil {
                return false
        }
        now := time.Now()
        secret := randomHex(instanceSecretLength)
        wp.creating[secret] = createCall{time: now, instanceType: it}
        go func() {
                defer wp.notify()
                tags := cloud.InstanceTags{
                        wp.tagKeyPrefix + tagKeyInstanceSetID:  string(wp.instanceSetID),
                        wp.tagKeyPrefix + tagKeyInstanceType:   it.Name,
                        wp.tagKeyPrefix + tagKeyIdleBehavior:   string(IdleBehaviorRun),
                        wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
                }
                initCmd := TagVerifier{nil, secret}.InitCommand()
                inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
                wp.mtx.Lock()
                defer wp.mtx.Unlock()
                // delete() is deferred so the updateWorker() call
                // below knows to use StateBooting when adding a new
                // worker.
                defer delete(wp.creating, secret)
                if err != nil {
                        if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
                                wp.atQuotaErr = err
                                wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
                                time.AfterFunc(quotaErrorTTL, wp.notify)
                        }
                        logger.WithError(err).Error("create failed")
                        wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
                        return
                }
                wp.updateWorker(inst, it)
        }()
        return true
}
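
// A sketch of the intended calling pattern: request new instances
// only while the pool is under quota, and rely on Subscribe for a
// wakeup when a quota or rate-limit pause ends. The needed map is the
// hypothetical count from the sketch above.
//
//      for it, n := range needed {
//              for i := 0; i < n && !wp.AtQuota(); i++ {
//                      if !wp.Create(it) {
//                              break // error already logged by the Pool
//                      }
//              }
//      }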

// AtQuota returns true if Create is not expected to work at the
// moment.
func (wp *Pool) AtQuota() bool {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        return time.Now().Before(wp.atQuotaUntil)
}

// SetIdleBehavior determines how the indicated instance will behave
// when it has no containers running.
func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        wkr, ok := wp.workers[id]
        if !ok {
                return errors.New("requested instance does not exist")
        }
        wkr.setIdleBehavior(idleBehavior)
        return nil
}
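
// For example, a management API handler might drain a worker before
// planned maintenance (a sketch, assuming the package's
// IdleBehaviorDrain value, which is defined elsewhere):
//
//      if err := wp.SetIdleBehavior(instID, IdleBehaviorDrain); err != nil {
//              // the instance is no longer in the pool
//      }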

// Add or update worker attached to the given instance.
//
// The second return value is true if a new worker is created.
//
// A newly added instance has state=StateBooting if its tags match an
// entry in wp.creating, otherwise StateUnknown.
//
// Caller must have lock.
func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
        secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
        inst = TagVerifier{inst, secret}
        id := inst.ID()
        if wkr := wp.workers[id]; wkr != nil {
                wkr.executor.SetTarget(inst)
                wkr.instance = inst
                wkr.updated = time.Now()
                wkr.saveTags()
                return wkr, false
        }

        state := StateUnknown
        if _, ok := wp.creating[secret]; ok {
                state = StateBooting
        }

        // If an instance has a valid IdleBehavior tag when it first
        // appears, initialize the new worker accordingly (this is how
        // we restore IdleBehavior that was set by a prior dispatch
        // process); otherwise, default to "run". After this,
        // wkr.idleBehavior is the source of truth, and will only be
        // changed via SetIdleBehavior().
        idleBehavior := IdleBehavior(inst.Tags()[wp.tagKeyPrefix+tagKeyIdleBehavior])
        if !validIdleBehavior[idleBehavior] {
                idleBehavior = IdleBehaviorRun
        }

        logger := wp.logger.WithFields(logrus.Fields{
                "InstanceType": it.Name,
                "Instance":     inst.ID(),
                "Address":      inst.Address(),
        })
        logger.WithFields(logrus.Fields{
                "State":        state,
                "IdleBehavior": idleBehavior,
        }).Infof("instance appeared in cloud")
        now := time.Now()
        wkr := &worker{
                mtx:          &wp.mtx,
                wp:           wp,
                logger:       logger,
                executor:     wp.newExecutor(inst),
                state:        state,
                idleBehavior: idleBehavior,
                instance:     inst,
                instType:     it,
                appeared:     now,
                probed:       now,
                busy:         now,
                updated:      now,
                running:      make(map[string]*remoteRunner),
                starting:     make(map[string]*remoteRunner),
                probing:      make(chan struct{}, 1),
        }
        wp.workers[id] = wkr
        return wkr, true
}

// Shutdown shuts down a worker with the given type, or returns false
// if all workers with the given type are busy.
func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        logger := wp.logger.WithField("InstanceType", it.Name)
        logger.Info("shutdown requested")
        for _, tryState := range []State{StateBooting, StateIdle} {
                // TODO: shut down the worker with the longest idle
                // time (Idle) or the earliest create time (Booting)
                for _, wkr := range wp.workers {
                        if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
                                logger.WithField("Instance", wkr.instance).Info("shutting down")
                                wkr.shutdown()
                                return true
                        }
                }
        }
        return false
}

// CountWorkers returns the current number of workers in each state.
//
// CountWorkers blocks, if necessary, until the initial instance list
// has been loaded from the cloud provider.
func (wp *Pool) CountWorkers() map[State]int {
        wp.setupOnce.Do(wp.setup)
        wp.waitUntilLoaded()
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        r := map[State]int{}
        for _, w := range wp.workers {
                r[w.state]++
        }
        return r
}

// Running returns the container UUIDs being prepared/run on workers.
//
// In the returned map, the time value indicates when the Pool
// observed that the container process had exited. A container that
// has not yet exited has a zero time value. The caller should use
// ForgetContainer() to garbage-collect the entries for exited
// containers.
func (wp *Pool) Running() map[string]time.Time {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        r := map[string]time.Time{}
        for _, wkr := range wp.workers {
                for uuid := range wkr.running {
                        r[uuid] = time.Time{}
                }
                for uuid := range wkr.starting {
                        r[uuid] = time.Time{}
                }
        }
        for uuid, exited := range wp.exited {
                r[uuid] = exited
        }
        return r
}
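
// A sketch of the garbage-collection pattern described above;
// queueHasFinalState is a hypothetical check against the caller's
// queue cache.
//
//      for uuid, exited := range wp.Running() {
//              if !exited.IsZero() && queueHasFinalState(uuid) {
//                      wp.ForgetContainer(uuid)
//              }
//      }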

// StartContainer starts a container on an idle worker immediately if
// possible, otherwise returns false.
func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        var wkr *worker
        for _, w := range wp.workers {
                if w.instType == it && w.state == StateIdle {
                        if wkr == nil || w.busy.After(wkr.busy) {
                                wkr = w
                        }
                }
        }
        if wkr == nil {
                return false
        }
        wkr.startContainer(ctr)
        return true
}
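
// A sketch of a scheduler's inner loop: try to start each container
// on an idle worker, and fall back to requesting capacity when none
// is available (ctr and it come from the caller's queue).
//
//      if !wp.StartContainer(it, ctr) {
//              // no idle worker of this type right now; create one
//              // and retry after the next Subscribe() wakeup
//              wp.Create(it)
//      }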

// KillContainer kills the crunch-run process for the given container
// UUID, if it's running on any worker.
//
// KillContainer returns immediately; the act of killing the container
// takes some time, and runs in the background.
//
// KillContainer returns false if the container has already ended.
func (wp *Pool) KillContainer(uuid string, reason string) bool {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        logger := wp.logger.WithFields(logrus.Fields{
                "ContainerUUID": uuid,
                "Reason":        reason,
        })
        for _, wkr := range wp.workers {
                rr := wkr.running[uuid]
                if rr == nil {
                        rr = wkr.starting[uuid]
                }
                if rr != nil {
                        rr.Kill(reason)
                        return true
                }
        }
        logger.Debug("cannot kill: already disappeared")
        return false
}

// ForgetContainer clears the placeholder for the given exited
// container, so it isn't returned by subsequent calls to Running().
//
// ForgetContainer has no effect if the container has not yet exited.
//
// The "container exited at time T" placeholder (which necessitates
// ForgetContainer) exists to make it easier for the caller
// (scheduler) to distinguish a container that exited without
// finalizing its state from a container that exited too recently for
// its final state to have appeared in the scheduler's queue cache.
func (wp *Pool) ForgetContainer(uuid string) {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        if _, ok := wp.exited[uuid]; ok {
                wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
                delete(wp.exited, uuid)
        }
}

func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
        if reg == nil {
                reg = prometheus.NewRegistry()
        }
        wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "containers_running",
                Help:      "Number of containers reported running by cloud VMs.",
        })
        reg.MustRegister(wp.mContainersRunning)
        wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "instances_total",
                Help:      "Number of cloud VMs.",
        }, []string{"category"})
        reg.MustRegister(wp.mInstances)
        wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "instances_price",
                Help:      "Price of cloud VMs.",
        }, []string{"category"})
        reg.MustRegister(wp.mInstancesPrice)
        wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "vcpus_total",
                Help:      "Total VCPUs on all cloud VMs.",
        }, []string{"category"})
        reg.MustRegister(wp.mVCPUs)
        wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "memory_bytes_total",
                Help:      "Total memory on all cloud VMs.",
        }, []string{"category"})
        reg.MustRegister(wp.mMemory)
        wp.mDisappearances = prometheus.NewCounterVec(prometheus.CounterOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "instances_disappeared",
                Help:      "Number of occurrences of an instance disappearing from the cloud provider's list of instances.",
        }, []string{"state"})
        for _, v := range stateString {
                wp.mDisappearances.WithLabelValues(v).Add(0)
        }
        reg.MustRegister(wp.mDisappearances)
}
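
// The registry passed to NewPool is typically exported over HTTP by
// the caller; a minimal sketch using promhttp (an assumption about
// the caller's setup, not something this package does):
//
//      reg := prometheus.NewRegistry()
//      wp := NewPool(logger, arvClient, reg, instanceSetID, is, newExec, installPublicKey, cluster)
//      http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))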

func (wp *Pool) runMetrics() {
        ch := wp.Subscribe()
        defer wp.Unsubscribe(ch)
        wp.updateMetrics()
        for range ch {
                wp.updateMetrics()
        }
}

func (wp *Pool) updateMetrics() {
        wp.mtx.RLock()
        defer wp.mtx.RUnlock()

        instances := map[string]int64{}
        price := map[string]float64{}
        cpu := map[string]int64{}
        mem := map[string]int64{}
        var running int64
        for _, wkr := range wp.workers {
                var cat string
                switch {
                case len(wkr.running)+len(wkr.starting) > 0:
                        cat = "inuse"
                case wkr.idleBehavior == IdleBehaviorHold:
                        cat = "hold"
                case wkr.state == StateBooting:
                        cat = "booting"
                case wkr.state == StateUnknown:
                        cat = "unknown"
                default:
                        cat = "idle"
                }
                instances[cat]++
                price[cat] += wkr.instType.Price
                cpu[cat] += int64(wkr.instType.VCPUs)
                mem[cat] += int64(wkr.instType.RAM)
                running += int64(len(wkr.running) + len(wkr.starting))
        }
        for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
                wp.mInstances.WithLabelValues(cat).Set(float64(instances[cat]))
                wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
                wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
                wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
        }
        wp.mContainersRunning.Set(float64(running))
}

func (wp *Pool) runProbes() {
        maxPPS := wp.maxProbesPerSecond
        if maxPPS < 1 {
                maxPPS = defaultMaxProbesPerSecond
        }
        limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
        defer limitticker.Stop()

        probeticker := time.NewTicker(wp.probeInterval)
        defer probeticker.Stop()

        workers := []cloud.InstanceID{}
        for range probeticker.C {
                workers = workers[:0]
                wp.mtx.Lock()
                for id, wkr := range wp.workers {
                        if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
                                continue
                        }
                        workers = append(workers, id)
                }
                wp.mtx.Unlock()

                for _, id := range workers {
                        wp.mtx.Lock()
                        wkr, ok := wp.workers[id]
                        wp.mtx.Unlock()
                        if !ok {
                                // Deleted while we were probing
                                // others
                                continue
                        }
                        go wkr.ProbeAndUpdate()
                        select {
                        case <-wp.stop:
                                return
                        case <-limitticker.C:
                        }
                }
        }
}

func (wp *Pool) runSync() {
        // sync once immediately, then wait syncInterval, sync again,
        // etc.
        timer := time.NewTimer(1)
        for {
                select {
                case <-timer.C:
                        err := wp.getInstancesAndSync()
                        if err != nil {
                                wp.logger.WithError(err).Warn("sync failed")
                        }
                        timer.Reset(wp.syncInterval)
                case <-wp.stop:
                        wp.logger.Debug("worker.Pool stopped")
                        return
                }
        }
}

// Stop synchronizing with the InstanceSet.
func (wp *Pool) Stop() {
        wp.setupOnce.Do(wp.setup)
        close(wp.stop)
}

// Instances returns an InstanceView for each worker in the pool,
// summarizing its current state and recent activity.
func (wp *Pool) Instances() []InstanceView {
        var r []InstanceView
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        for _, w := range wp.workers {
                r = append(r, InstanceView{
                        Instance:             w.instance.ID(),
                        Address:              w.instance.Address(),
                        Price:                w.instType.Price,
                        ArvadosInstanceType:  w.instType.Name,
                        ProviderInstanceType: w.instType.ProviderType,
                        LastContainerUUID:    w.lastUUID,
                        LastBusy:             w.busy,
                        WorkerState:          w.state.String(),
                        IdleBehavior:         w.idleBehavior,
                })
        }
        wp.mtx.Unlock()
        sort.Slice(r, func(i, j int) bool {
                return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
        })
        return r
}
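
// A sketch of serving this list as a management API response; the
// HTTP handler is an assumption, and only the InstanceView JSON tags
// are defined in this file.
//
//      func handleInstances(w http.ResponseWriter, r *http.Request) {
//              json.NewEncoder(w).Encode(map[string]interface{}{"items": wp.Instances()})
//      }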

// KillInstance destroys a cloud VM instance. It returns an error if
// the given instance does not exist.
func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
        wkr, ok := wp.workers[id]
        if !ok {
                return errors.New("instance not found")
        }
        wkr.logger.WithField("Reason", reason).Info("shutting down")
        wkr.shutdown()
        return nil
}

func (wp *Pool) setup() {
        wp.creating = map[string]createCall{}
        wp.exited = map[string]time.Time{}
        wp.workers = map[cloud.InstanceID]*worker{}
        wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
}

func (wp *Pool) notify() {
        wp.mtx.RLock()
        defer wp.mtx.RUnlock()
        for _, send := range wp.subscribers {
                select {
                case send <- struct{}{}:
                default:
                }
        }
}

func (wp *Pool) getInstancesAndSync() error {
        wp.setupOnce.Do(wp.setup)
        if err := wp.instanceSet.throttleInstances.Error(); err != nil {
                return err
        }
        wp.logger.Debug("getting instance list")
        threshold := time.Now()
        instances, err := wp.instanceSet.Instances(cloud.InstanceTags{wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID)})
        if err != nil {
                wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
                return err
        }
        wp.sync(threshold, instances)
        wp.logger.Debug("sync done")
        return nil
}

// Add/remove/update workers based on instances, which was obtained
// from the instanceSet. However, don't clobber any other updates that
// already happened after threshold.
func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
        notify := false

        for _, inst := range instances {
                itTag := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceType]
                it, ok := wp.instanceTypes[itTag]
                if !ok {
                        wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
                        continue
                }
                if wkr, isNew := wp.updateWorker(inst, it); isNew {
                        notify = true
                } else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
                        wp.logger.WithField("Instance", inst).Info("worker still listed after shutdown; retrying")
                        wkr.shutdown()
                }
        }

        for id, wkr := range wp.workers {
                if wkr.updated.After(threshold) {
                        continue
                }
                logger := wp.logger.WithFields(logrus.Fields{
                        "Instance":    wkr.instance.ID(),
                        "WorkerState": wkr.state,
                })
                logger.Info("instance disappeared in cloud")
                if wp.mDisappearances != nil {
                        wp.mDisappearances.WithLabelValues(stateString[wkr.state]).Inc()
                }
                delete(wp.workers, id)
                go wkr.Close()
                notify = true
        }

        if !wp.loaded {
                notify = true
                wp.loaded = true
                wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
        }

        if notify {
                go wp.notify()
        }
}

func (wp *Pool) waitUntilLoaded() {
        ch := wp.Subscribe()
        wp.mtx.RLock()
        defer wp.mtx.RUnlock()
        for !wp.loaded {
                wp.mtx.RUnlock()
                <-ch
                wp.mtx.RLock()
        }
}

// Return a random string of n hexadecimal digits (n*4 random bits). n
// must be even.
func randomHex(n int) string {
        buf := make([]byte, n/2)
        _, err := rand.Read(buf)
        if err != nil {
                panic(err)
        }
        return fmt.Sprintf("%x", buf)
}