16636: add boot outcome metrics.
[arvados.git] / lib / dispatchcloud / worker / pool.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package worker
6
7 import (
8         "crypto/md5"
9         "crypto/rand"
10         "errors"
11         "fmt"
12         "io"
13         "io/ioutil"
14         "sort"
15         "strings"
16         "sync"
17         "time"
18
19         "git.arvados.org/arvados.git/lib/cloud"
20         "git.arvados.org/arvados.git/sdk/go/arvados"
21         "github.com/prometheus/client_golang/prometheus"
22         "github.com/sirupsen/logrus"
23         "golang.org/x/crypto/ssh"
24 )
25
// Instance tag keys written by the pool when it creates an instance.
// Each key is prepended with the pool's configured tag key prefix
// before being sent to (or read from) the cloud provider.
const (
	// Name of the Arvados instance type the instance was created as.
	tagKeyInstanceType = "InstanceType"
	// Idle behavior assigned to the instance's worker.
	tagKeyIdleBehavior = "IdleBehavior"
	// Random secret generated for the instance at creation time.
	tagKeyInstanceSecret = "InstanceSecret"
	// ID of the instance set that created the instance.
	tagKeyInstanceSetID = "InstanceSetID"
)
32
33 // An InstanceView shows a worker's current state and recent activity.
34 type InstanceView struct {
35         Instance             cloud.InstanceID `json:"instance"`
36         Address              string           `json:"address"`
37         Price                float64          `json:"price"`
38         ArvadosInstanceType  string           `json:"arvados_instance_type"`
39         ProviderInstanceType string           `json:"provider_instance_type"`
40         LastContainerUUID    string           `json:"last_container_uuid"`
41         LastBusy             time.Time        `json:"last_busy"`
42         WorkerState          string           `json:"worker_state"`
43         IdleBehavior         IdleBehavior     `json:"idle_behavior"`
44 }
45
46 // An Executor executes shell commands on a remote host.
47 type Executor interface {
48         // Run cmd on the current target.
49         Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)
50
51         // Use the given target for subsequent operations. The new
52         // target is the same host as the previous target, but it
53         // might return a different address and verify a different
54         // host key.
55         //
56         // SetTarget is called frequently, and in most cases the new
57         // target will behave exactly the same as the old one. An
58         // implementation should optimize accordingly.
59         //
60         // SetTarget must not block on concurrent Execute calls.
61         SetTarget(cloud.ExecutorTarget)
62
63         Close()
64 }
65
// Fallback values used when the corresponding cluster configuration
// entry is not set (see NewPool and runProbes).
const (
	defaultSyncInterval       = time.Minute
	defaultProbeInterval      = 10 * time.Second
	defaultMaxProbesPerSecond = 10
	defaultTimeoutIdle        = time.Minute
	defaultTimeoutBooting     = 10 * time.Minute
	defaultTimeoutProbe       = 10 * time.Minute
	defaultTimeoutShutdown    = 10 * time.Second
	defaultTimeoutTERM        = 2 * time.Minute
	defaultTimeoutSignal      = 5 * time.Second

	// How long after a quota error to try creating instances
	// again anyway, even if no instances have been shut down.
	quotaErrorTTL = time.Minute

	// Minimum time between "X failed because rate limiting"
	// log messages.
	logRateLimitErrorInterval = 10 * time.Second
)
84
85 func duration(conf arvados.Duration, def time.Duration) time.Duration {
86         if conf > 0 {
87                 return time.Duration(conf)
88         } else {
89                 return def
90         }
91 }
92
93 // NewPool creates a Pool of workers backed by instanceSet.
94 //
95 // New instances are configured and set up according to the given
96 // cluster configuration.
97 func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
98         wp := &Pool{
99                 logger:             logger,
100                 arvClient:          arvClient,
101                 instanceSetID:      instanceSetID,
102                 instanceSet:        &throttledInstanceSet{InstanceSet: instanceSet},
103                 newExecutor:        newExecutor,
104                 bootProbeCommand:   cluster.Containers.CloudVMs.BootProbeCommand,
105                 runnerSource:       cluster.Containers.CloudVMs.DeployRunnerBinary,
106                 imageID:            cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
107                 instanceTypes:      cluster.InstanceTypes,
108                 maxProbesPerSecond: cluster.Containers.CloudVMs.MaxProbesPerSecond,
109                 probeInterval:      duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
110                 syncInterval:       duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
111                 timeoutIdle:        duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
112                 timeoutBooting:     duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
113                 timeoutProbe:       duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
114                 timeoutShutdown:    duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
115                 timeoutTERM:        duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
116                 timeoutSignal:      duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
117                 installPublicKey:   installPublicKey,
118                 tagKeyPrefix:       cluster.Containers.CloudVMs.TagKeyPrefix,
119                 stop:               make(chan bool),
120         }
121         wp.registerMetrics(reg)
122         go func() {
123                 wp.setupOnce.Do(wp.setup)
124                 go wp.runMetrics()
125                 go wp.runProbes()
126                 go wp.runSync()
127         }()
128         return wp
129 }
130
131 // Pool is a resizable worker pool backed by a cloud.InstanceSet. A
132 // zero Pool should not be used. Call NewPool to create a new Pool.
133 type Pool struct {
134         // configuration
135         logger             logrus.FieldLogger
136         arvClient          *arvados.Client
137         instanceSetID      cloud.InstanceSetID
138         instanceSet        *throttledInstanceSet
139         newExecutor        func(cloud.Instance) Executor
140         bootProbeCommand   string
141         runnerSource       string
142         imageID            cloud.ImageID
143         instanceTypes      map[string]arvados.InstanceType
144         syncInterval       time.Duration
145         probeInterval      time.Duration
146         maxProbesPerSecond int
147         timeoutIdle        time.Duration
148         timeoutBooting     time.Duration
149         timeoutProbe       time.Duration
150         timeoutShutdown    time.Duration
151         timeoutTERM        time.Duration
152         timeoutSignal      time.Duration
153         installPublicKey   ssh.PublicKey
154         tagKeyPrefix       string
155
156         // private state
157         subscribers  map[<-chan struct{}]chan<- struct{}
158         creating     map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
159         workers      map[cloud.InstanceID]*worker
160         loaded       bool                 // loaded list of instances from InstanceSet at least once
161         exited       map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called
162         atQuotaUntil time.Time
163         atQuotaErr   cloud.QuotaError
164         stop         chan bool
165         mtx          sync.RWMutex
166         setupOnce    sync.Once
167         runnerData   []byte
168         runnerMD5    [md5.Size]byte
169         runnerCmd    string
170
171         throttleCreate    throttle
172         throttleInstances throttle
173
174         mContainersRunning prometheus.Gauge
175         mInstances         *prometheus.GaugeVec
176         mInstancesPrice    *prometheus.GaugeVec
177         mVCPUs             *prometheus.GaugeVec
178         mMemory            *prometheus.GaugeVec
179         mBootOutcomes      *prometheus.CounterVec
180         mDisappearances    *prometheus.CounterVec
181 }
182
183 type createCall struct {
184         time         time.Time
185         instanceType arvados.InstanceType
186 }
187
188 func (wp *Pool) CheckHealth() error {
189         wp.setupOnce.Do(wp.setup)
190         if err := wp.loadRunnerData(); err != nil {
191                 return fmt.Errorf("error loading runner binary: %s", err)
192         }
193         return nil
194 }
195
196 // Subscribe returns a buffered channel that becomes ready after any
197 // change to the pool's state that could have scheduling implications:
198 // a worker's state changes, a new worker appears, the cloud
199 // provider's API rate limiting period ends, etc.
200 //
201 // Additional events that occur while the channel is already ready
202 // will be dropped, so it is OK if the caller services the channel
203 // slowly.
204 //
205 // Example:
206 //
207 //      ch := wp.Subscribe()
208 //      defer wp.Unsubscribe(ch)
209 //      for range ch {
210 //              tryScheduling(wp)
211 //              if done {
212 //                      break
213 //              }
214 //      }
215 func (wp *Pool) Subscribe() <-chan struct{} {
216         wp.setupOnce.Do(wp.setup)
217         wp.mtx.Lock()
218         defer wp.mtx.Unlock()
219         ch := make(chan struct{}, 1)
220         wp.subscribers[ch] = ch
221         return ch
222 }
223
224 // Unsubscribe stops sending updates to the given channel.
225 func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
226         wp.setupOnce.Do(wp.setup)
227         wp.mtx.Lock()
228         defer wp.mtx.Unlock()
229         delete(wp.subscribers, ch)
230 }
231
232 // Unallocated returns the number of unallocated (creating + booting +
233 // idle + unknown) workers for each instance type.  Workers in
234 // hold/drain mode are not included.
235 func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
236         wp.setupOnce.Do(wp.setup)
237         wp.mtx.RLock()
238         defer wp.mtx.RUnlock()
239         unalloc := map[arvados.InstanceType]int{}
240         creating := map[arvados.InstanceType]int{}
241         oldestCreate := map[arvados.InstanceType]time.Time{}
242         for _, cc := range wp.creating {
243                 it := cc.instanceType
244                 creating[it]++
245                 if t, ok := oldestCreate[it]; !ok || t.After(cc.time) {
246                         oldestCreate[it] = cc.time
247                 }
248         }
249         for _, wkr := range wp.workers {
250                 // Skip workers that are not expected to become
251                 // available soon. Note len(wkr.running)>0 is not
252                 // redundant here: it can be true even in
253                 // StateUnknown.
254                 if wkr.state == StateShutdown ||
255                         wkr.state == StateRunning ||
256                         wkr.idleBehavior != IdleBehaviorRun ||
257                         len(wkr.running) > 0 {
258                         continue
259                 }
260                 it := wkr.instType
261                 unalloc[it]++
262                 if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) {
263                         // If up to N new workers appear in
264                         // Instances() while we are waiting for N
265                         // Create() calls to complete, we assume we're
266                         // just seeing a race between Instances() and
267                         // Create() responses.
268                         //
269                         // The other common reason why nodes have
270                         // state==Unknown is that they appeared at
271                         // startup, before any Create calls. They
272                         // don't match the above timing condition, so
273                         // we never mistakenly attribute them to
274                         // pending Create calls.
275                         creating[it]--
276                 }
277         }
278         for it, c := range creating {
279                 unalloc[it] += c
280         }
281         return unalloc
282 }
283
284 // Create a new instance with the given type, and add it to the worker
285 // pool. The worker is added immediately; instance creation runs in
286 // the background.
287 //
288 // Create returns false if a pre-existing error state prevents it from
289 // even attempting to create a new instance. Those errors are logged
290 // by the Pool, so the caller does not need to log anything in such
291 // cases.
292 func (wp *Pool) Create(it arvados.InstanceType) bool {
293         logger := wp.logger.WithField("InstanceType", it.Name)
294         wp.setupOnce.Do(wp.setup)
295         if wp.loadRunnerData() != nil {
296                 // Boot probe is certain to fail.
297                 return false
298         }
299         wp.mtx.Lock()
300         defer wp.mtx.Unlock()
301         if time.Now().Before(wp.atQuotaUntil) || wp.throttleCreate.Error() != nil {
302                 return false
303         }
304         now := time.Now()
305         secret := randomHex(instanceSecretLength)
306         wp.creating[secret] = createCall{time: now, instanceType: it}
307         go func() {
308                 defer wp.notify()
309                 tags := cloud.InstanceTags{
310                         wp.tagKeyPrefix + tagKeyInstanceSetID:  string(wp.instanceSetID),
311                         wp.tagKeyPrefix + tagKeyInstanceType:   it.Name,
312                         wp.tagKeyPrefix + tagKeyIdleBehavior:   string(IdleBehaviorRun),
313                         wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
314                 }
315                 initCmd := TagVerifier{nil, secret}.InitCommand()
316                 inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
317                 wp.mtx.Lock()
318                 defer wp.mtx.Unlock()
319                 // delete() is deferred so the updateWorker() call
320                 // below knows to use StateBooting when adding a new
321                 // worker.
322                 defer delete(wp.creating, secret)
323                 if err != nil {
324                         if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
325                                 wp.atQuotaErr = err
326                                 wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
327                                 time.AfterFunc(quotaErrorTTL, wp.notify)
328                         }
329                         logger.WithError(err).Error("create failed")
330                         wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
331                         return
332                 }
333                 wp.updateWorker(inst, it)
334         }()
335         return true
336 }
337
338 // AtQuota returns true if Create is not expected to work at the
339 // moment.
340 func (wp *Pool) AtQuota() bool {
341         wp.mtx.Lock()
342         defer wp.mtx.Unlock()
343         return time.Now().Before(wp.atQuotaUntil)
344 }
345
346 // SetIdleBehavior determines how the indicated instance will behave
347 // when it has no containers running.
348 func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
349         wp.mtx.Lock()
350         defer wp.mtx.Unlock()
351         wkr, ok := wp.workers[id]
352         if !ok {
353                 return errors.New("requested instance does not exist")
354         }
355         wkr.setIdleBehavior(idleBehavior)
356         return nil
357 }
358
359 // Add or update worker attached to the given instance.
360 //
361 // The second return value is true if a new worker is created.
362 //
363 // A newly added instance has state=StateBooting if its tags match an
364 // entry in wp.creating, otherwise StateUnknown.
365 //
366 // Caller must have lock.
367 func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
368         secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
369         inst = TagVerifier{inst, secret}
370         id := inst.ID()
371         if wkr := wp.workers[id]; wkr != nil {
372                 wkr.executor.SetTarget(inst)
373                 wkr.instance = inst
374                 wkr.updated = time.Now()
375                 wkr.saveTags()
376                 return wkr, false
377         }
378
379         state := StateUnknown
380         if _, ok := wp.creating[secret]; ok {
381                 state = StateBooting
382         }
383
384         // If an instance has a valid IdleBehavior tag when it first
385         // appears, initialize the new worker accordingly (this is how
386         // we restore IdleBehavior that was set by a prior dispatch
387         // process); otherwise, default to "run". After this,
388         // wkr.idleBehavior is the source of truth, and will only be
389         // changed via SetIdleBehavior().
390         idleBehavior := IdleBehavior(inst.Tags()[wp.tagKeyPrefix+tagKeyIdleBehavior])
391         if !validIdleBehavior[idleBehavior] {
392                 idleBehavior = IdleBehaviorRun
393         }
394
395         logger := wp.logger.WithFields(logrus.Fields{
396                 "InstanceType": it.Name,
397                 "Instance":     inst.ID(),
398                 "Address":      inst.Address(),
399         })
400         logger.WithFields(logrus.Fields{
401                 "State":        state,
402                 "IdleBehavior": idleBehavior,
403         }).Infof("instance appeared in cloud")
404         now := time.Now()
405         wkr := &worker{
406                 mtx:          &wp.mtx,
407                 wp:           wp,
408                 logger:       logger,
409                 executor:     wp.newExecutor(inst),
410                 state:        state,
411                 idleBehavior: idleBehavior,
412                 instance:     inst,
413                 instType:     it,
414                 appeared:     now,
415                 probed:       now,
416                 busy:         now,
417                 updated:      now,
418                 running:      make(map[string]*remoteRunner),
419                 starting:     make(map[string]*remoteRunner),
420                 probing:      make(chan struct{}, 1),
421         }
422         wp.workers[id] = wkr
423         return wkr, true
424 }
425
426 // Shutdown shuts down a worker with the given type, or returns false
427 // if all workers with the given type are busy.
428 func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
429         wp.setupOnce.Do(wp.setup)
430         wp.mtx.Lock()
431         defer wp.mtx.Unlock()
432         logger := wp.logger.WithField("InstanceType", it.Name)
433         logger.Info("shutdown requested")
434         for _, tryState := range []State{StateBooting, StateIdle} {
435                 // TODO: shutdown the worker with the longest idle
436                 // time (Idle) or the earliest create time (Booting)
437                 for _, wkr := range wp.workers {
438                         if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
439                                 logger.WithField("Instance", wkr.instance.ID()).Info("shutting down")
440                                 wkr.shutdown()
441                                 return true
442                         }
443                 }
444         }
445         return false
446 }
447
448 // CountWorkers returns the current number of workers in each state.
449 //
450 // CountWorkers blocks, if necessary, until the initial instance list
451 // has been loaded from the cloud provider.
452 func (wp *Pool) CountWorkers() map[State]int {
453         wp.setupOnce.Do(wp.setup)
454         wp.waitUntilLoaded()
455         wp.mtx.Lock()
456         defer wp.mtx.Unlock()
457         r := map[State]int{}
458         for _, w := range wp.workers {
459                 r[w.state]++
460         }
461         return r
462 }
463
464 // Running returns the container UUIDs being prepared/run on workers.
465 //
466 // In the returned map, the time value indicates when the Pool
467 // observed that the container process had exited. A container that
468 // has not yet exited has a zero time value. The caller should use
469 // ForgetContainer() to garbage-collect the entries for exited
470 // containers.
471 func (wp *Pool) Running() map[string]time.Time {
472         wp.setupOnce.Do(wp.setup)
473         wp.mtx.Lock()
474         defer wp.mtx.Unlock()
475         r := map[string]time.Time{}
476         for _, wkr := range wp.workers {
477                 for uuid := range wkr.running {
478                         r[uuid] = time.Time{}
479                 }
480                 for uuid := range wkr.starting {
481                         r[uuid] = time.Time{}
482                 }
483         }
484         for uuid, exited := range wp.exited {
485                 r[uuid] = exited
486         }
487         return r
488 }
489
490 // StartContainer starts a container on an idle worker immediately if
491 // possible, otherwise returns false.
492 func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
493         wp.setupOnce.Do(wp.setup)
494         wp.mtx.Lock()
495         defer wp.mtx.Unlock()
496         var wkr *worker
497         for _, w := range wp.workers {
498                 if w.instType == it && w.state == StateIdle && w.idleBehavior == IdleBehaviorRun {
499                         if wkr == nil || w.busy.After(wkr.busy) {
500                                 wkr = w
501                         }
502                 }
503         }
504         if wkr == nil {
505                 return false
506         }
507         wkr.startContainer(ctr)
508         return true
509 }
510
511 // KillContainer kills the crunch-run process for the given container
512 // UUID, if it's running on any worker.
513 //
514 // KillContainer returns immediately; the act of killing the container
515 // takes some time, and runs in the background.
516 //
517 // KillContainer returns false if the container has already ended.
518 func (wp *Pool) KillContainer(uuid string, reason string) bool {
519         wp.mtx.Lock()
520         defer wp.mtx.Unlock()
521         logger := wp.logger.WithFields(logrus.Fields{
522                 "ContainerUUID": uuid,
523                 "Reason":        reason,
524         })
525         for _, wkr := range wp.workers {
526                 rr := wkr.running[uuid]
527                 if rr == nil {
528                         rr = wkr.starting[uuid]
529                 }
530                 if rr != nil {
531                         rr.Kill(reason)
532                         return true
533                 }
534         }
535         logger.Debug("cannot kill: already disappeared")
536         return false
537 }
538
539 // ForgetContainer clears the placeholder for the given exited
540 // container, so it isn't returned by subsequent calls to Running().
541 //
542 // ForgetContainer has no effect if the container has not yet exited.
543 //
544 // The "container exited at time T" placeholder (which necessitates
545 // ForgetContainer) exists to make it easier for the caller
546 // (scheduler) to distinguish a container that exited without
547 // finalizing its state from a container that exited too recently for
548 // its final state to have appeared in the scheduler's queue cache.
549 func (wp *Pool) ForgetContainer(uuid string) {
550         wp.mtx.Lock()
551         defer wp.mtx.Unlock()
552         if _, ok := wp.exited[uuid]; ok {
553                 wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
554                 delete(wp.exited, uuid)
555         }
556 }
557
558 func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
559         if reg == nil {
560                 reg = prometheus.NewRegistry()
561         }
562         wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
563                 Namespace: "arvados",
564                 Subsystem: "dispatchcloud",
565                 Name:      "containers_running",
566                 Help:      "Number of containers reported running by cloud VMs.",
567         })
568         reg.MustRegister(wp.mContainersRunning)
569         wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
570                 Namespace: "arvados",
571                 Subsystem: "dispatchcloud",
572                 Name:      "instances_total",
573                 Help:      "Number of cloud VMs.",
574         }, []string{"category", "instance_type"})
575         reg.MustRegister(wp.mInstances)
576         wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
577                 Namespace: "arvados",
578                 Subsystem: "dispatchcloud",
579                 Name:      "instances_price",
580                 Help:      "Price of cloud VMs.",
581         }, []string{"category"})
582         reg.MustRegister(wp.mInstancesPrice)
583         wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
584                 Namespace: "arvados",
585                 Subsystem: "dispatchcloud",
586                 Name:      "vcpus_total",
587                 Help:      "Total VCPUs on all cloud VMs.",
588         }, []string{"category"})
589         reg.MustRegister(wp.mVCPUs)
590         wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
591                 Namespace: "arvados",
592                 Subsystem: "dispatchcloud",
593                 Name:      "memory_bytes_total",
594                 Help:      "Total memory on all cloud VMs.",
595         }, []string{"category"})
596         reg.MustRegister(wp.mMemory)
597         wp.mBootOutcomes = prometheus.NewCounterVec(prometheus.CounterOpts{
598                 Namespace: "arvados",
599                 Subsystem: "dispatchcloud",
600                 Name:      "boot_outcomes",
601                 Help:      "Boot outcomes by type.",
602         }, []string{"state"})
603         for k := range validBootOutcomes {
604                 wp.mBootOutcomes.WithLabelValues(string(k)).Add(0)
605         }
606         reg.MustRegister(wp.mBootOutcomes)
607         wp.mDisappearances = prometheus.NewCounterVec(prometheus.CounterOpts{
608                 Namespace: "arvados",
609                 Subsystem: "dispatchcloud",
610                 Name:      "instances_disappeared",
611                 Help:      "Number of occurrences of an instance disappearing from the cloud provider's list of instances.",
612         }, []string{"state"})
613         for _, v := range stateString {
614                 wp.mDisappearances.WithLabelValues(v).Add(0)
615         }
616         reg.MustRegister(wp.mDisappearances)
617 }
618
619 func (wp *Pool) runMetrics() {
620         ch := wp.Subscribe()
621         defer wp.Unsubscribe(ch)
622         wp.updateMetrics()
623         for range ch {
624                 wp.updateMetrics()
625         }
626 }
627
628 func (wp *Pool) updateMetrics() {
629         wp.mtx.RLock()
630         defer wp.mtx.RUnlock()
631
632         type entKey struct {
633                 cat      string
634                 instType string
635         }
636         instances := map[entKey]int64{}
637         price := map[string]float64{}
638         cpu := map[string]int64{}
639         mem := map[string]int64{}
640         var running int64
641         for _, wkr := range wp.workers {
642                 var cat string
643                 switch {
644                 case len(wkr.running)+len(wkr.starting) > 0:
645                         cat = "inuse"
646                 case wkr.idleBehavior == IdleBehaviorHold:
647                         cat = "hold"
648                 case wkr.state == StateBooting:
649                         cat = "booting"
650                 case wkr.state == StateUnknown:
651                         cat = "unknown"
652                 default:
653                         cat = "idle"
654                 }
655                 instances[entKey{cat, wkr.instType.Name}]++
656                 price[cat] += wkr.instType.Price
657                 cpu[cat] += int64(wkr.instType.VCPUs)
658                 mem[cat] += int64(wkr.instType.RAM)
659                 running += int64(len(wkr.running) + len(wkr.starting))
660         }
661         for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
662                 wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
663                 wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
664                 wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
665                 // make sure to reset gauges for non-existing category/nodetype combinations
666                 for _, it := range wp.instanceTypes {
667                         if _, ok := instances[entKey{cat, it.Name}]; !ok {
668                                 wp.mInstances.WithLabelValues(cat, it.Name).Set(float64(0))
669                         }
670                 }
671         }
672         for k, v := range instances {
673                 wp.mInstances.WithLabelValues(k.cat, k.instType).Set(float64(v))
674         }
675         wp.mContainersRunning.Set(float64(running))
676 }
677
678 func (wp *Pool) runProbes() {
679         maxPPS := wp.maxProbesPerSecond
680         if maxPPS < 1 {
681                 maxPPS = defaultMaxProbesPerSecond
682         }
683         limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
684         defer limitticker.Stop()
685
686         probeticker := time.NewTicker(wp.probeInterval)
687         defer probeticker.Stop()
688
689         workers := []cloud.InstanceID{}
690         for range probeticker.C {
691                 workers = workers[:0]
692                 wp.mtx.Lock()
693                 for id, wkr := range wp.workers {
694                         if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
695                                 continue
696                         }
697                         workers = append(workers, id)
698                 }
699                 wp.mtx.Unlock()
700
701                 for _, id := range workers {
702                         wp.mtx.Lock()
703                         wkr, ok := wp.workers[id]
704                         wp.mtx.Unlock()
705                         if !ok {
706                                 // Deleted while we were probing
707                                 // others
708                                 continue
709                         }
710                         go wkr.ProbeAndUpdate()
711                         select {
712                         case <-wp.stop:
713                                 return
714                         case <-limitticker.C:
715                         }
716                 }
717         }
718 }
719
720 func (wp *Pool) runSync() {
721         // sync once immediately, then wait syncInterval, sync again,
722         // etc.
723         timer := time.NewTimer(1)
724         for {
725                 select {
726                 case <-timer.C:
727                         err := wp.getInstancesAndSync()
728                         if err != nil {
729                                 wp.logger.WithError(err).Warn("sync failed")
730                         }
731                         timer.Reset(wp.syncInterval)
732                 case <-wp.stop:
733                         wp.logger.Debug("worker.Pool stopped")
734                         return
735                 }
736         }
737 }
738
739 // Stop synchronizing with the InstanceSet.
740 func (wp *Pool) Stop() {
741         wp.setupOnce.Do(wp.setup)
742         close(wp.stop)
743 }
744
745 // Instances returns an InstanceView for each worker in the pool,
746 // summarizing its current state and recent activity.
747 func (wp *Pool) Instances() []InstanceView {
748         var r []InstanceView
749         wp.setupOnce.Do(wp.setup)
750         wp.mtx.Lock()
751         for _, w := range wp.workers {
752                 r = append(r, InstanceView{
753                         Instance:             w.instance.ID(),
754                         Address:              w.instance.Address(),
755                         Price:                w.instType.Price,
756                         ArvadosInstanceType:  w.instType.Name,
757                         ProviderInstanceType: w.instType.ProviderType,
758                         LastContainerUUID:    w.lastUUID,
759                         LastBusy:             w.busy,
760                         WorkerState:          w.state.String(),
761                         IdleBehavior:         w.idleBehavior,
762                 })
763         }
764         wp.mtx.Unlock()
765         sort.Slice(r, func(i, j int) bool {
766                 return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
767         })
768         return r
769 }
770
771 // KillInstance destroys a cloud VM instance. It returns an error if
772 // the given instance does not exist.
773 func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
774         wkr, ok := wp.workers[id]
775         if !ok {
776                 return errors.New("instance not found")
777         }
778         wkr.logger.WithField("Reason", reason).Info("shutting down")
779         wkr.shutdown()
780         return nil
781 }
782
783 func (wp *Pool) setup() {
784         wp.creating = map[string]createCall{}
785         wp.exited = map[string]time.Time{}
786         wp.workers = map[cloud.InstanceID]*worker{}
787         wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
788         wp.loadRunnerData()
789 }
790
791 // Load the runner program to be deployed on worker nodes into
792 // wp.runnerData, if necessary. Errors are logged.
793 //
794 // If auto-deploy is disabled, len(wp.runnerData) will be 0.
795 //
796 // Caller must not have lock.
797 func (wp *Pool) loadRunnerData() error {
798         wp.mtx.Lock()
799         defer wp.mtx.Unlock()
800         if wp.runnerData != nil {
801                 return nil
802         } else if wp.runnerSource == "" {
803                 wp.runnerCmd = "crunch-run"
804                 wp.runnerData = []byte{}
805                 return nil
806         }
807         logger := wp.logger.WithField("source", wp.runnerSource)
808         logger.Debug("loading runner")
809         buf, err := ioutil.ReadFile(wp.runnerSource)
810         if err != nil {
811                 logger.WithError(err).Error("failed to load runner program")
812                 return err
813         }
814         wp.runnerData = buf
815         wp.runnerMD5 = md5.Sum(buf)
816         wp.runnerCmd = fmt.Sprintf("/var/lib/arvados/crunch-run~%x", wp.runnerMD5)
817         return nil
818 }
819
820 func (wp *Pool) notify() {
821         wp.mtx.RLock()
822         defer wp.mtx.RUnlock()
823         for _, send := range wp.subscribers {
824                 select {
825                 case send <- struct{}{}:
826                 default:
827                 }
828         }
829 }
830
831 func (wp *Pool) getInstancesAndSync() error {
832         wp.setupOnce.Do(wp.setup)
833         if err := wp.instanceSet.throttleInstances.Error(); err != nil {
834                 return err
835         }
836         wp.logger.Debug("getting instance list")
837         threshold := time.Now()
838         instances, err := wp.instanceSet.Instances(cloud.InstanceTags{wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID)})
839         if err != nil {
840                 wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
841                 return err
842         }
843         wp.sync(threshold, instances)
844         wp.logger.Debug("sync done")
845         return nil
846 }
847
848 // Add/remove/update workers based on instances, which was obtained
849 // from the instanceSet. However, don't clobber any other updates that
850 // already happened after threshold.
851 func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
852         wp.mtx.Lock()
853         defer wp.mtx.Unlock()
854         wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
855         notify := false
856
857         for _, inst := range instances {
858                 itTag := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceType]
859                 it, ok := wp.instanceTypes[itTag]
860                 if !ok {
861                         wp.logger.WithField("Instance", inst.ID()).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
862                         continue
863                 }
864                 if wkr, isNew := wp.updateWorker(inst, it); isNew {
865                         notify = true
866                 } else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
867                         wp.logger.WithField("Instance", inst.ID()).Info("worker still listed after shutdown; retrying")
868                         wkr.shutdown()
869                 }
870         }
871
872         for id, wkr := range wp.workers {
873                 if wkr.updated.After(threshold) {
874                         continue
875                 }
876                 logger := wp.logger.WithFields(logrus.Fields{
877                         "Instance":    wkr.instance.ID(),
878                         "WorkerState": wkr.state,
879                 })
880                 logger.Info("instance disappeared in cloud")
881                 wkr.reportBootOutcome(BootOutcomeDisappeared)
882                 if wp.mDisappearances != nil {
883                         wp.mDisappearances.WithLabelValues(stateString[wkr.state]).Inc()
884                 }
885                 delete(wp.workers, id)
886                 go wkr.Close()
887                 notify = true
888         }
889
890         if !wp.loaded {
891                 notify = true
892                 wp.loaded = true
893                 wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
894         }
895
896         if notify {
897                 go wp.notify()
898         }
899 }
900
901 func (wp *Pool) waitUntilLoaded() {
902         ch := wp.Subscribe()
903         wp.mtx.RLock()
904         defer wp.mtx.RUnlock()
905         for !wp.loaded {
906                 wp.mtx.RUnlock()
907                 <-ch
908                 wp.mtx.RLock()
909         }
910 }
911
// Return a random string of exactly n lowercase hexadecimal digits
// (n*4 random bits), using crypto/rand as the source.
//
// n may be odd: enough random bytes are read to cover n digits and
// the surplus digit is dropped. If n <= 0 the result is "".
//
// randomHex panics if the random source returns an error.
func randomHex(n int) string {
	if n <= 0 {
		return ""
	}
	// Each byte yields two hex digits; round up so an odd n is
	// fully covered.
	buf := make([]byte, (n+1)/2)
	if _, err := rand.Read(buf); err != nil {
		panic(err)
	}
	return fmt.Sprintf("%x", buf)[:n]
}