// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package worker

import (
        "crypto/md5"
        "crypto/rand"
        "errors"
        "fmt"
        "io"
        "io/ioutil"
        "sort"
        "strings"
        "sync"
        "time"

        "git.arvados.org/arvados.git/lib/cloud"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "github.com/prometheus/client_golang/prometheus"
        "github.com/sirupsen/logrus"
        "golang.org/x/crypto/ssh"
)

const (
        tagKeyInstanceType   = "InstanceType"
        tagKeyIdleBehavior   = "IdleBehavior"
        tagKeyInstanceSecret = "InstanceSecret"
        tagKeyInstanceSetID  = "InstanceSetID"
)

// An InstanceView shows a worker's current state and recent activity.
type InstanceView struct {
        Instance             cloud.InstanceID `json:"instance"`
        Address              string           `json:"address"`
        Price                float64          `json:"price"`
        ArvadosInstanceType  string           `json:"arvados_instance_type"`
        ProviderInstanceType string           `json:"provider_instance_type"`
        LastContainerUUID    string           `json:"last_container_uuid"`
        LastBusy             time.Time        `json:"last_busy"`
        WorkerState          string           `json:"worker_state"`
        IdleBehavior         IdleBehavior     `json:"idle_behavior"`
}

// An Executor executes shell commands on a remote host.
type Executor interface {
        // Run cmd on the current target.
        Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)

        // Use the given target for subsequent operations. The new
        // target is the same host as the previous target, but it
        // might return a different address and verify a different
        // host key.
        //
        // SetTarget is called frequently, and in most cases the new
        // target will behave exactly the same as the old one. An
        // implementation should optimize accordingly.
        //
        // SetTarget must not block on concurrent Execute calls.
        SetTarget(cloud.ExecutorTarget)

        Close()
}

const (
        defaultSyncInterval       = time.Minute
        defaultProbeInterval      = time.Second * 10
        defaultMaxProbesPerSecond = 10
        defaultTimeoutIdle        = time.Minute
        defaultTimeoutBooting     = time.Minute * 10
        defaultTimeoutProbe       = time.Minute * 10
        defaultTimeoutShutdown    = time.Second * 10
        defaultTimeoutTERM        = time.Minute * 2
        defaultTimeoutSignal      = time.Second * 5

        // Time to wait after a quota error before trying again
        // anyway, even if no instances have been shut down.
        quotaErrorTTL = time.Minute

        // Time between "X failed because of rate limiting" messages
        logRateLimitErrorInterval = time.Second * 10
)

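// duration returns conf as a time.Duration if conf is positive,
// otherwise the given default def.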
func duration(conf arvados.Duration, def time.Duration) time.Duration {
        if conf > 0 {
                return time.Duration(conf)
        } else {
                return def
        }
}

// NewPool creates a Pool of workers backed by instanceSet.
//
// New instances are configured and set up according to the given
// cluster configuration.
func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
        wp := &Pool{
                logger:             logger,
                arvClient:          arvClient,
                instanceSetID:      instanceSetID,
                instanceSet:        &throttledInstanceSet{InstanceSet: instanceSet},
                newExecutor:        newExecutor,
                bootProbeCommand:   cluster.Containers.CloudVMs.BootProbeCommand,
                runnerSource:       cluster.Containers.CloudVMs.DeployRunnerBinary,
                imageID:            cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
                instanceTypes:      cluster.InstanceTypes,
                maxProbesPerSecond: cluster.Containers.CloudVMs.MaxProbesPerSecond,
                probeInterval:      duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
                syncInterval:       duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
                timeoutIdle:        duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
                timeoutBooting:     duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
                timeoutProbe:       duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
                timeoutShutdown:    duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
                timeoutTERM:        duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
                timeoutSignal:      duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
                installPublicKey:   installPublicKey,
                tagKeyPrefix:       cluster.Containers.CloudVMs.TagKeyPrefix,
                stop:               make(chan bool),
        }
        wp.registerMetrics(reg)
        go func() {
                wp.setupOnce.Do(wp.setup)
                go wp.runMetrics()
                go wp.runProbes()
                go wp.runSync()
        }()
        return wp
}

// Pool is a resizable worker pool backed by a cloud.InstanceSet. A
// zero Pool should not be used. Call NewPool to create a new Pool.
type Pool struct {
        // configuration
        logger             logrus.FieldLogger
        arvClient          *arvados.Client
        instanceSetID      cloud.InstanceSetID
        instanceSet        *throttledInstanceSet
        newExecutor        func(cloud.Instance) Executor
        bootProbeCommand   string
        runnerSource       string
        imageID            cloud.ImageID
        instanceTypes      map[string]arvados.InstanceType
        syncInterval       time.Duration
        probeInterval      time.Duration
        maxProbesPerSecond int
        timeoutIdle        time.Duration
        timeoutBooting     time.Duration
        timeoutProbe       time.Duration
        timeoutShutdown    time.Duration
        timeoutTERM        time.Duration
        timeoutSignal      time.Duration
        installPublicKey   ssh.PublicKey
        tagKeyPrefix       string

        // private state
        subscribers  map[<-chan struct{}]chan<- struct{}
        creating     map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
        workers      map[cloud.InstanceID]*worker
        loaded       bool                 // loaded list of instances from InstanceSet at least once
        exited       map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called
        atQuotaUntil time.Time
        atQuotaErr   cloud.QuotaError
        stop         chan bool
        mtx          sync.RWMutex
        setupOnce    sync.Once
        runnerData   []byte
        runnerMD5    [md5.Size]byte
        runnerCmd    string

        throttleCreate    throttle
        throttleInstances throttle

        mContainersRunning prometheus.Gauge
        mInstances         *prometheus.GaugeVec
        mInstancesPrice    *prometheus.GaugeVec
        mVCPUs             *prometheus.GaugeVec
        mMemory            *prometheus.GaugeVec
        mDisappearances    *prometheus.CounterVec
}

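// createCall records an unfinished (cloud.InstanceSet)Create call:
// when it was issued, and which instance type was requested.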
type createCall struct {
        time         time.Time
        instanceType arvados.InstanceType
}

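// CheckHealth reports an error if the runner binary to be deployed
// on new instances cannot be loaded.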
func (wp *Pool) CheckHealth() error {
        wp.setupOnce.Do(wp.setup)
        if err := wp.loadRunnerData(); err != nil {
                return fmt.Errorf("error loading runner binary: %s", err)
        }
        return nil
}

// Subscribe returns a buffered channel that becomes ready after any
// change to the pool's state that could have scheduling implications:
// a worker's state changes, a new worker appears, the cloud
// provider's API rate limiting period ends, etc.
//
// Additional events that occur while the channel is already ready
// will be dropped, so it is OK if the caller services the channel
// slowly.
//
// Example:
//
//      ch := wp.Subscribe()
//      defer wp.Unsubscribe(ch)
//      for range ch {
//              tryScheduling(wp)
//              if done {
//                      break
//              }
//      }
func (wp *Pool) Subscribe() <-chan struct{} {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        ch := make(chan struct{}, 1)
        wp.subscribers[ch] = ch
        return ch
}

// Unsubscribe stops sending updates to the given channel.
func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        delete(wp.subscribers, ch)
}

// Unallocated returns the number of unallocated (creating + booting +
// idle + unknown) workers for each instance type.  Workers in
// hold/drain mode are not included.
func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.RLock()
        defer wp.mtx.RUnlock()
        unalloc := map[arvados.InstanceType]int{}
        creating := map[arvados.InstanceType]int{}
        oldestCreate := map[arvados.InstanceType]time.Time{}
        for _, cc := range wp.creating {
                it := cc.instanceType
                creating[it]++
                if t, ok := oldestCreate[it]; !ok || t.After(cc.time) {
                        oldestCreate[it] = cc.time
                }
        }
        for _, wkr := range wp.workers {
                // Skip workers that are not expected to become
                // available soon. Note len(wkr.running)>0 is not
                // redundant here: it can be true even in
                // StateUnknown.
                if wkr.state == StateShutdown ||
                        wkr.state == StateRunning ||
                        wkr.idleBehavior != IdleBehaviorRun ||
                        len(wkr.running) > 0 {
                        continue
                }
                it := wkr.instType
                unalloc[it]++
                if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) {
                        // If up to N new workers appear in
                        // Instances() while we are waiting for N
                        // Create() calls to complete, we assume we're
                        // just seeing a race between Instances() and
                        // Create() responses.
                        //
                        // The other common reason why nodes have
                        // state==Unknown is that they appeared at
                        // startup, before any Create calls. They
                        // don't match the above timing condition, so
                        // we never mistakenly attribute them to
                        // pending Create calls.
                        creating[it]--
                }
        }
        for it, c := range creating {
                unalloc[it] += c
        }
        return unalloc
}

// Create a new instance with the given type, and add it to the worker
// pool. The worker is added immediately; instance creation runs in
// the background.
//
// Create returns false if a pre-existing error state prevents it from
// even attempting to create a new instance. Those errors are logged
// by the Pool, so the caller does not need to log anything in such
// cases.
func (wp *Pool) Create(it arvados.InstanceType) bool {
        logger := wp.logger.WithField("InstanceType", it.Name)
        wp.setupOnce.Do(wp.setup)
        if wp.loadRunnerData() != nil {
                // Boot probe is certain to fail.
                return false
        }
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        if time.Now().Before(wp.atQuotaUntil) || wp.throttleCreate.Error() != nil {
                return false
        }
        now := time.Now()
        secret := randomHex(instanceSecretLength)
        wp.creating[secret] = createCall{time: now, instanceType: it}
        go func() {
                defer wp.notify()
                tags := cloud.InstanceTags{
                        wp.tagKeyPrefix + tagKeyInstanceSetID:  string(wp.instanceSetID),
                        wp.tagKeyPrefix + tagKeyInstanceType:   it.Name,
                        wp.tagKeyPrefix + tagKeyIdleBehavior:   string(IdleBehaviorRun),
                        wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
                }
                initCmd := TagVerifier{nil, secret}.InitCommand()
                inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
                wp.mtx.Lock()
                defer wp.mtx.Unlock()
                // delete() is deferred so the updateWorker() call
                // below knows to use StateBooting when adding a new
                // worker.
                defer delete(wp.creating, secret)
                if err != nil {
                        if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
                                wp.atQuotaErr = err
                                wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
                                time.AfterFunc(quotaErrorTTL, wp.notify)
                        }
                        logger.WithError(err).Error("create failed")
                        wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
                        return
                }
                wp.updateWorker(inst, it)
        }()
        return true
}

// AtQuota returns true if Create is not expected to work at the
// moment.
func (wp *Pool) AtQuota() bool {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        return time.Now().Before(wp.atQuotaUntil)
}

// SetIdleBehavior determines how the indicated instance will behave
// when it has no containers running.
func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        wkr, ok := wp.workers[id]
        if !ok {
                return errors.New("requested instance does not exist")
        }
        wkr.setIdleBehavior(idleBehavior)
        return nil
}

// Add or update a worker attached to the given instance.
//
// The second return value is true if a new worker is created.
//
// A newly added instance has state=StateBooting if its tags match an
// entry in wp.creating, otherwise StateUnknown.
//
// Caller must have lock.
func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
        secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
        inst = TagVerifier{inst, secret}
        id := inst.ID()
        if wkr := wp.workers[id]; wkr != nil {
                wkr.executor.SetTarget(inst)
                wkr.instance = inst
                wkr.updated = time.Now()
                wkr.saveTags()
                return wkr, false
        }

        state := StateUnknown
        if _, ok := wp.creating[secret]; ok {
                state = StateBooting
        }

        // If an instance has a valid IdleBehavior tag when it first
        // appears, initialize the new worker accordingly (this is how
        // we restore IdleBehavior that was set by a prior dispatch
        // process); otherwise, default to "run". After this,
        // wkr.idleBehavior is the source of truth, and will only be
        // changed via SetIdleBehavior().
        idleBehavior := IdleBehavior(inst.Tags()[wp.tagKeyPrefix+tagKeyIdleBehavior])
        if !validIdleBehavior[idleBehavior] {
                idleBehavior = IdleBehaviorRun
        }

        logger := wp.logger.WithFields(logrus.Fields{
                "InstanceType": it.Name,
                "Instance":     inst.ID(),
                "Address":      inst.Address(),
        })
        logger.WithFields(logrus.Fields{
                "State":        state,
                "IdleBehavior": idleBehavior,
        }).Info("instance appeared in cloud")
        now := time.Now()
        wkr := &worker{
                mtx:          &wp.mtx,
                wp:           wp,
                logger:       logger,
                executor:     wp.newExecutor(inst),
                state:        state,
                idleBehavior: idleBehavior,
                instance:     inst,
                instType:     it,
                appeared:     now,
                probed:       now,
                busy:         now,
                updated:      now,
                running:      make(map[string]*remoteRunner),
                starting:     make(map[string]*remoteRunner),
                probing:      make(chan struct{}, 1),
        }
        wp.workers[id] = wkr
        return wkr, true
}

// Shutdown shuts down a worker with the given type, or returns false
// if all workers with the given type are busy.
func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        logger := wp.logger.WithField("InstanceType", it.Name)
        logger.Info("shutdown requested")
        for _, tryState := range []State{StateBooting, StateIdle} {
                // TODO: shutdown the worker with the longest idle
                // time (Idle) or the earliest create time (Booting)
                for _, wkr := range wp.workers {
                        if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
                                logger.WithField("Instance", wkr.instance.ID()).Info("shutting down")
                                wkr.shutdown()
                                return true
                        }
                }
        }
        return false
}

// CountWorkers returns the current number of workers in each state.
//
// CountWorkers blocks, if necessary, until the initial instance list
// has been loaded from the cloud provider.
func (wp *Pool) CountWorkers() map[State]int {
        wp.setupOnce.Do(wp.setup)
        wp.waitUntilLoaded()
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        r := map[State]int{}
        for _, w := range wp.workers {
                r[w.state]++
        }
        return r
}

// Running returns the container UUIDs being prepared/run on workers.
//
// In the returned map, the time value indicates when the Pool
// observed that the container process had exited. A container that
// has not yet exited has a zero time value. The caller should use
// ForgetContainer() to garbage-collect the entries for exited
// containers.
func (wp *Pool) Running() map[string]time.Time {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        r := map[string]time.Time{}
        for _, wkr := range wp.workers {
                for uuid := range wkr.running {
                        r[uuid] = time.Time{}
                }
                for uuid := range wkr.starting {
                        r[uuid] = time.Time{}
                }
        }
        for uuid, exited := range wp.exited {
                r[uuid] = exited
        }
        return r
}

// StartContainer starts a container on an idle worker immediately if
// possible, otherwise returns false.
func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        var wkr *worker
        for _, w := range wp.workers {
                if w.instType == it && w.state == StateIdle && w.idleBehavior == IdleBehaviorRun {
                        if wkr == nil || w.busy.After(wkr.busy) {
                                wkr = w
                        }
                }
        }
        if wkr == nil {
                return false
        }
        wkr.startContainer(ctr)
        return true
}

// KillContainer kills the crunch-run process for the given container
// UUID, if it's running on any worker.
//
// KillContainer returns immediately; the act of killing the container
// takes some time, and runs in the background.
//
// KillContainer returns false if the container has already ended.
func (wp *Pool) KillContainer(uuid string, reason string) bool {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        logger := wp.logger.WithFields(logrus.Fields{
                "ContainerUUID": uuid,
                "Reason":        reason,
        })
        for _, wkr := range wp.workers {
                rr := wkr.running[uuid]
                if rr == nil {
                        rr = wkr.starting[uuid]
                }
                if rr != nil {
                        rr.Kill(reason)
                        return true
                }
        }
        logger.Debug("cannot kill: already disappeared")
        return false
}

// ForgetContainer clears the placeholder for the given exited
// container, so it isn't returned by subsequent calls to Running().
//
// ForgetContainer has no effect if the container has not yet exited.
//
// The "container exited at time T" placeholder (which necessitates
// ForgetContainer) exists to make it easier for the caller
// (scheduler) to distinguish a container that exited without
// finalizing its state from a container that exited too recently for
// its final state to have appeared in the scheduler's queue cache.
func (wp *Pool) ForgetContainer(uuid string) {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        if _, ok := wp.exited[uuid]; ok {
                wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
                delete(wp.exited, uuid)
        }
}

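// registerMetrics creates the pool's Prometheus collectors and
// registers them with reg (or with a throwaway registry if reg is
// nil).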
func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
        if reg == nil {
                reg = prometheus.NewRegistry()
        }
        wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "containers_running",
                Help:      "Number of containers reported running by cloud VMs.",
        })
        reg.MustRegister(wp.mContainersRunning)
        wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "instances_total",
                Help:      "Number of cloud VMs.",
        }, []string{"category", "instance_type"})
        reg.MustRegister(wp.mInstances)
        wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "instances_price",
                Help:      "Price of cloud VMs.",
        }, []string{"category"})
        reg.MustRegister(wp.mInstancesPrice)
        wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "vcpus_total",
                Help:      "Total VCPUs on all cloud VMs.",
        }, []string{"category"})
        reg.MustRegister(wp.mVCPUs)
        wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "memory_bytes_total",
                Help:      "Total memory on all cloud VMs.",
        }, []string{"category"})
        reg.MustRegister(wp.mMemory)
        wp.mDisappearances = prometheus.NewCounterVec(prometheus.CounterOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "instances_disappeared",
                Help:      "Number of occurrences of an instance disappearing from the cloud provider's list of instances.",
        }, []string{"state"})
        for _, v := range stateString {
                wp.mDisappearances.WithLabelValues(v).Add(0)
        }
        reg.MustRegister(wp.mDisappearances)
}

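// runMetrics updates the metrics once at startup, then again each
// time the pool's state changes.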
func (wp *Pool) runMetrics() {
        ch := wp.Subscribe()
        defer wp.Unsubscribe(ch)
        wp.updateMetrics()
        for range ch {
                wp.updateMetrics()
        }
}

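// updateMetrics recomputes the instance count, price, VCPU, memory,
// and running-container gauges from the current worker list.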
func (wp *Pool) updateMetrics() {
        wp.mtx.RLock()
        defer wp.mtx.RUnlock()

        type entKey struct {
                cat      string
                instType string
        }
        instances := map[entKey]int64{}
        price := map[string]float64{}
        cpu := map[string]int64{}
        mem := map[string]int64{}
        var running int64
        for _, wkr := range wp.workers {
                var cat string
                switch {
                case len(wkr.running)+len(wkr.starting) > 0:
                        cat = "inuse"
                case wkr.idleBehavior == IdleBehaviorHold:
                        cat = "hold"
                case wkr.state == StateBooting:
                        cat = "booting"
                case wkr.state == StateUnknown:
                        cat = "unknown"
                default:
                        cat = "idle"
                }
                instances[entKey{cat, wkr.instType.Name}]++
                price[cat] += wkr.instType.Price
                cpu[cat] += int64(wkr.instType.VCPUs)
                mem[cat] += int64(wkr.instType.RAM)
                running += int64(len(wkr.running) + len(wkr.starting))
        }
        for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
                wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
                wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
                wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
                // make sure to reset gauges for non-existing category/nodetype combinations
                for _, it := range wp.instanceTypes {
                        if _, ok := instances[entKey{cat, it.Name}]; !ok {
                                wp.mInstances.WithLabelValues(cat, it.Name).Set(float64(0))
                        }
                }
        }
        for k, v := range instances {
                wp.mInstances.WithLabelValues(k.cat, k.instType).Set(float64(v))
        }
        wp.mContainersRunning.Set(float64(running))
}

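// runProbes probes all eligible workers once per probeInterval tick,
// spacing individual probes so as not to exceed maxProbesPerSecond.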
func (wp *Pool) runProbes() {
        maxPPS := wp.maxProbesPerSecond
        if maxPPS < 1 {
                maxPPS = defaultMaxProbesPerSecond
        }
        limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
        defer limitticker.Stop()

        probeticker := time.NewTicker(wp.probeInterval)
        defer probeticker.Stop()

        workers := []cloud.InstanceID{}
        for range probeticker.C {
                workers = workers[:0]
                wp.mtx.Lock()
                for id, wkr := range wp.workers {
                        if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
                                continue
                        }
                        workers = append(workers, id)
                }
                wp.mtx.Unlock()

                for _, id := range workers {
                        wp.mtx.Lock()
                        wkr, ok := wp.workers[id]
                        wp.mtx.Unlock()
                        if !ok {
                                // Deleted while we were probing
                                // others
                                continue
                        }
                        go wkr.ProbeAndUpdate()
                        select {
                        case <-wp.stop:
                                return
                        case <-limitticker.C:
                        }
                }
        }
}

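// runSync reconciles the worker list with the cloud provider's
// instance list once immediately, then every syncInterval, until
// Stop is called.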
func (wp *Pool) runSync() {
        // sync once immediately, then wait syncInterval, sync again,
        // etc.
        timer := time.NewTimer(1)
        for {
                select {
                case <-timer.C:
                        err := wp.getInstancesAndSync()
                        if err != nil {
                                wp.logger.WithError(err).Warn("sync failed")
                        }
                        timer.Reset(wp.syncInterval)
                case <-wp.stop:
                        wp.logger.Debug("worker.Pool stopped")
                        return
                }
        }
}

// Stop synchronizing with the InstanceSet.
func (wp *Pool) Stop() {
        wp.setupOnce.Do(wp.setup)
        close(wp.stop)
}

// Instances returns an InstanceView for each worker in the pool,
// summarizing its current state and recent activity.
func (wp *Pool) Instances() []InstanceView {
        var r []InstanceView
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        for _, w := range wp.workers {
                r = append(r, InstanceView{
                        Instance:             w.instance.ID(),
                        Address:              w.instance.Address(),
                        Price:                w.instType.Price,
                        ArvadosInstanceType:  w.instType.Name,
                        ProviderInstanceType: w.instType.ProviderType,
                        LastContainerUUID:    w.lastUUID,
                        LastBusy:             w.busy,
                        WorkerState:          w.state.String(),
                        IdleBehavior:         w.idleBehavior,
                })
        }
        wp.mtx.Unlock()
        sort.Slice(r, func(i, j int) bool {
                return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
        })
        return r
}

// KillInstance destroys a cloud VM instance. It returns an error if
// the given instance does not exist.
func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
        wkr, ok := wp.workers[id]
        if !ok {
                return errors.New("instance not found")
        }
        wkr.logger.WithField("Reason", reason).Info("shutting down")
        wkr.shutdown()
        return nil
}

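// setup initializes the pool's maps and loads the runner program. It
// is only ever called once, via setupOnce.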
func (wp *Pool) setup() {
        wp.creating = map[string]createCall{}
        wp.exited = map[string]time.Time{}
        wp.workers = map[cloud.InstanceID]*worker{}
        wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
        wp.loadRunnerData()
}

// Load the runner program to be deployed on worker nodes into
// wp.runnerData, if necessary. Errors are logged.
//
// If auto-deploy is disabled, len(wp.runnerData) will be 0.
//
// Caller must not have lock.
func (wp *Pool) loadRunnerData() error {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        if wp.runnerData != nil {
                return nil
        } else if wp.runnerSource == "" {
                wp.runnerCmd = "crunch-run"
                wp.runnerData = []byte{}
                return nil
        }
        logger := wp.logger.WithField("source", wp.runnerSource)
        logger.Debug("loading runner")
        buf, err := ioutil.ReadFile(wp.runnerSource)
        if err != nil {
                logger.WithError(err).Error("failed to load runner program")
                return err
        }
        wp.runnerData = buf
        wp.runnerMD5 = md5.Sum(buf)
        wp.runnerCmd = fmt.Sprintf("/var/lib/arvados/crunch-run~%x", wp.runnerMD5)
        return nil
}

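// notify sends a non-blocking wakeup to every subscriber channel;
// the event is dropped if a subscriber's channel is already full.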
func (wp *Pool) notify() {
        wp.mtx.RLock()
        defer wp.mtx.RUnlock()
        for _, send := range wp.subscribers {
                select {
                case send <- struct{}{}:
                default:
                }
        }
}

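// getInstancesAndSync fetches the instances tagged with this pool's
// InstanceSetID from the cloud provider (unless rate-limited) and
// syncs the worker list with the result.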
func (wp *Pool) getInstancesAndSync() error {
        wp.setupOnce.Do(wp.setup)
        if err := wp.instanceSet.throttleInstances.Error(); err != nil {
                return err
        }
        wp.logger.Debug("getting instance list")
        threshold := time.Now()
        instances, err := wp.instanceSet.Instances(cloud.InstanceTags{wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID)})
        if err != nil {
                wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
                return err
        }
        wp.sync(threshold, instances)
        wp.logger.Debug("sync done")
        return nil
}

// Add/remove/update workers based on instances, which was obtained
// from the instanceSet. However, don't clobber any other updates that
// already happened after threshold.
func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
        notify := false

        for _, inst := range instances {
                itTag := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceType]
                it, ok := wp.instanceTypes[itTag]
                if !ok {
                        wp.logger.WithField("Instance", inst.ID()).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
                        continue
                }
                if wkr, isNew := wp.updateWorker(inst, it); isNew {
                        notify = true
                } else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
                        wp.logger.WithField("Instance", inst.ID()).Info("worker still listed after shutdown; retrying")
                        wkr.shutdown()
                }
        }

        for id, wkr := range wp.workers {
                if wkr.updated.After(threshold) {
                        continue
                }
                logger := wp.logger.WithFields(logrus.Fields{
                        "Instance":    wkr.instance.ID(),
                        "WorkerState": wkr.state,
                })
                logger.Info("instance disappeared in cloud")
                if wp.mDisappearances != nil {
                        wp.mDisappearances.WithLabelValues(stateString[wkr.state]).Inc()
                }
                delete(wp.workers, id)
                go wkr.Close()
                notify = true
        }

        if !wp.loaded {
                notify = true
                wp.loaded = true
                wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
        }

        if notify {
                go wp.notify()
        }
}

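// waitUntilLoaded blocks until the initial instance list has been
// loaded from the cloud provider.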
func (wp *Pool) waitUntilLoaded() {
        ch := wp.Subscribe()
        // Unsubscribe on return so repeated callers (e.g.,
        // CountWorkers) don't accumulate stale subscriber channels.
        defer wp.Unsubscribe(ch)
        wp.mtx.RLock()
        defer wp.mtx.RUnlock()
        for !wp.loaded {
                wp.mtx.RUnlock()
                <-ch
                wp.mtx.RLock()
        }
}

// Return a random string of n hexadecimal digits (n*4 random bits). n
// must be even.
func randomHex(n int) string {
        buf := make([]byte, n/2)
        _, err := rand.Read(buf)
        if err != nil {
                panic(err)
        }
        return fmt.Sprintf("%x", buf)
}