16535: Merge branch 'master'
[arvados.git] / lib / dispatchcloud / worker / pool.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package worker
6
7 import (
8         "crypto/md5"
9         "crypto/rand"
10         "errors"
11         "fmt"
12         "io"
13         "io/ioutil"
14         "sort"
15         "strings"
16         "sync"
17         "time"
18
19         "git.arvados.org/arvados.git/lib/cloud"
20         "git.arvados.org/arvados.git/sdk/go/arvados"
21         "github.com/prometheus/client_golang/prometheus"
22         "github.com/sirupsen/logrus"
23         "golang.org/x/crypto/ssh"
24 )
25
26 const (
27         tagKeyInstanceType   = "InstanceType"
28         tagKeyIdleBehavior   = "IdleBehavior"
29         tagKeyInstanceSecret = "InstanceSecret"
30         tagKeyInstanceSetID  = "InstanceSetID"
31 )
32
33 // An InstanceView shows a worker's current state and recent activity.
34 type InstanceView struct {
35         Instance             cloud.InstanceID `json:"instance"`
36         Address              string           `json:"address"`
37         Price                float64          `json:"price"`
38         ArvadosInstanceType  string           `json:"arvados_instance_type"`
39         ProviderInstanceType string           `json:"provider_instance_type"`
40         LastContainerUUID    string           `json:"last_container_uuid"`
41         LastBusy             time.Time        `json:"last_busy"`
42         WorkerState          string           `json:"worker_state"`
43         IdleBehavior         IdleBehavior     `json:"idle_behavior"`
44 }
45
46 // An Executor executes shell commands on a remote host.
47 type Executor interface {
48         // Run cmd on the current target.
49         Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)
50
51         // Use the given target for subsequent operations. The new
52         // target is the same host as the previous target, but it
53         // might return a different address and verify a different
54         // host key.
55         //
56         // SetTarget is called frequently, and in most cases the new
57         // target will behave exactly the same as the old one. An
58         // implementation should optimize accordingly.
59         //
60         // SetTarget must not block on concurrent Execute calls.
61         SetTarget(cloud.ExecutorTarget)
62
63         Close()
64 }
65
66 const (
67         defaultSyncInterval       = time.Minute
68         defaultProbeInterval      = time.Second * 10
69         defaultMaxProbesPerSecond = 10
70         defaultTimeoutIdle        = time.Minute
71         defaultTimeoutBooting     = time.Minute * 10
72         defaultTimeoutProbe       = time.Minute * 10
73         defaultTimeoutShutdown    = time.Second * 10
74         defaultTimeoutTERM        = time.Minute * 2
75         defaultTimeoutSignal      = time.Second * 5
76
77         // Time after a quota error to try again anyway, even if no
78         // instances have been shutdown.
79         quotaErrorTTL = time.Minute
80
81         // Time between "X failed because rate limiting" messages
82         logRateLimitErrorInterval = time.Second * 10
83 )
84
85 func duration(conf arvados.Duration, def time.Duration) time.Duration {
86         if conf > 0 {
87                 return time.Duration(conf)
88         } else {
89                 return def
90         }
91 }
92
93 // NewPool creates a Pool of workers backed by instanceSet.
94 //
95 // New instances are configured and set up according to the given
96 // cluster configuration.
97 func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
98         wp := &Pool{
99                 logger:             logger,
100                 arvClient:          arvClient,
101                 instanceSetID:      instanceSetID,
102                 instanceSet:        &throttledInstanceSet{InstanceSet: instanceSet},
103                 newExecutor:        newExecutor,
104                 bootProbeCommand:   cluster.Containers.CloudVMs.BootProbeCommand,
105                 runnerSource:       cluster.Containers.CloudVMs.DeployRunnerBinary,
106                 imageID:            cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
107                 instanceTypes:      cluster.InstanceTypes,
108                 maxProbesPerSecond: cluster.Containers.CloudVMs.MaxProbesPerSecond,
109                 probeInterval:      duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
110                 syncInterval:       duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
111                 timeoutIdle:        duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
112                 timeoutBooting:     duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
113                 timeoutProbe:       duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
114                 timeoutShutdown:    duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
115                 timeoutTERM:        duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
116                 timeoutSignal:      duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
117                 installPublicKey:   installPublicKey,
118                 tagKeyPrefix:       cluster.Containers.CloudVMs.TagKeyPrefix,
119                 stop:               make(chan bool),
120         }
121         wp.registerMetrics(reg)
122         go func() {
123                 wp.setupOnce.Do(wp.setup)
124                 go wp.runMetrics()
125                 go wp.runProbes()
126                 go wp.runSync()
127         }()
128         return wp
129 }
130
131 // Pool is a resizable worker pool backed by a cloud.InstanceSet. A
132 // zero Pool should not be used. Call NewPool to create a new Pool.
133 type Pool struct {
134         // configuration
135         logger             logrus.FieldLogger
136         arvClient          *arvados.Client
137         instanceSetID      cloud.InstanceSetID
138         instanceSet        *throttledInstanceSet
139         newExecutor        func(cloud.Instance) Executor
140         bootProbeCommand   string
141         runnerSource       string
142         imageID            cloud.ImageID
143         instanceTypes      map[string]arvados.InstanceType
144         syncInterval       time.Duration
145         probeInterval      time.Duration
146         maxProbesPerSecond int
147         timeoutIdle        time.Duration
148         timeoutBooting     time.Duration
149         timeoutProbe       time.Duration
150         timeoutShutdown    time.Duration
151         timeoutTERM        time.Duration
152         timeoutSignal      time.Duration
153         installPublicKey   ssh.PublicKey
154         tagKeyPrefix       string
155
156         // private state
157         subscribers  map[<-chan struct{}]chan<- struct{}
158         creating     map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
159         workers      map[cloud.InstanceID]*worker
160         loaded       bool                 // loaded list of instances from InstanceSet at least once
161         exited       map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called
162         atQuotaUntil time.Time
163         atQuotaErr   cloud.QuotaError
164         stop         chan bool
165         mtx          sync.RWMutex
166         setupOnce    sync.Once
167         runnerData   []byte
168         runnerMD5    [md5.Size]byte
169         runnerCmd    string
170
171         throttleCreate    throttle
172         throttleInstances throttle
173
174         mContainersRunning prometheus.Gauge
175         mInstances         *prometheus.GaugeVec
176         mInstancesPrice    *prometheus.GaugeVec
177         mVCPUs             *prometheus.GaugeVec
178         mMemory            *prometheus.GaugeVec
179         mBootOutcomes      *prometheus.CounterVec
180         mDisappearances    *prometheus.CounterVec
181 }
182
183 type createCall struct {
184         time         time.Time
185         instanceType arvados.InstanceType
186 }
187
188 func (wp *Pool) CheckHealth() error {
189         wp.setupOnce.Do(wp.setup)
190         if err := wp.loadRunnerData(); err != nil {
191                 return fmt.Errorf("error loading runner binary: %s", err)
192         }
193         return nil
194 }
195
196 // Subscribe returns a buffered channel that becomes ready after any
197 // change to the pool's state that could have scheduling implications:
198 // a worker's state changes, a new worker appears, the cloud
199 // provider's API rate limiting period ends, etc.
200 //
201 // Additional events that occur while the channel is already ready
202 // will be dropped, so it is OK if the caller services the channel
203 // slowly.
204 //
205 // Example:
206 //
207 //      ch := wp.Subscribe()
208 //      defer wp.Unsubscribe(ch)
209 //      for range ch {
210 //              tryScheduling(wp)
211 //              if done {
212 //                      break
213 //              }
214 //      }
215 func (wp *Pool) Subscribe() <-chan struct{} {
216         wp.setupOnce.Do(wp.setup)
217         wp.mtx.Lock()
218         defer wp.mtx.Unlock()
219         ch := make(chan struct{}, 1)
220         wp.subscribers[ch] = ch
221         return ch
222 }
223
224 // Unsubscribe stops sending updates to the given channel.
225 func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
226         wp.setupOnce.Do(wp.setup)
227         wp.mtx.Lock()
228         defer wp.mtx.Unlock()
229         delete(wp.subscribers, ch)
230 }
231
232 // Unallocated returns the number of unallocated (creating + booting +
233 // idle + unknown) workers for each instance type.  Workers in
234 // hold/drain mode are not included.
235 func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
236         wp.setupOnce.Do(wp.setup)
237         wp.mtx.RLock()
238         defer wp.mtx.RUnlock()
239         unalloc := map[arvados.InstanceType]int{}
240         creating := map[arvados.InstanceType]int{}
241         oldestCreate := map[arvados.InstanceType]time.Time{}
242         for _, cc := range wp.creating {
243                 it := cc.instanceType
244                 creating[it]++
245                 if t, ok := oldestCreate[it]; !ok || t.After(cc.time) {
246                         oldestCreate[it] = cc.time
247                 }
248         }
249         for _, wkr := range wp.workers {
250                 // Skip workers that are not expected to become
251                 // available soon. Note len(wkr.running)>0 is not
252                 // redundant here: it can be true even in
253                 // StateUnknown.
254                 if wkr.state == StateShutdown ||
255                         wkr.state == StateRunning ||
256                         wkr.idleBehavior != IdleBehaviorRun ||
257                         len(wkr.running) > 0 {
258                         continue
259                 }
260                 it := wkr.instType
261                 unalloc[it]++
262                 if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) {
263                         // If up to N new workers appear in
264                         // Instances() while we are waiting for N
265                         // Create() calls to complete, we assume we're
266                         // just seeing a race between Instances() and
267                         // Create() responses.
268                         //
269                         // The other common reason why nodes have
270                         // state==Unknown is that they appeared at
271                         // startup, before any Create calls. They
272                         // don't match the above timing condition, so
273                         // we never mistakenly attribute them to
274                         // pending Create calls.
275                         creating[it]--
276                 }
277         }
278         for it, c := range creating {
279                 unalloc[it] += c
280         }
281         return unalloc
282 }
283
284 // Create a new instance with the given type, and add it to the worker
285 // pool. The worker is added immediately; instance creation runs in
286 // the background.
287 //
288 // Create returns false if a pre-existing error state prevents it from
289 // even attempting to create a new instance. Those errors are logged
290 // by the Pool, so the caller does not need to log anything in such
291 // cases.
292 func (wp *Pool) Create(it arvados.InstanceType) bool {
293         logger := wp.logger.WithField("InstanceType", it.Name)
294         wp.setupOnce.Do(wp.setup)
295         if wp.loadRunnerData() != nil {
296                 // Boot probe is certain to fail.
297                 return false
298         }
299         wp.mtx.Lock()
300         defer wp.mtx.Unlock()
301         if time.Now().Before(wp.atQuotaUntil) || wp.throttleCreate.Error() != nil {
302                 return false
303         }
304         now := time.Now()
305         secret := randomHex(instanceSecretLength)
306         wp.creating[secret] = createCall{time: now, instanceType: it}
307         go func() {
308                 defer wp.notify()
309                 tags := cloud.InstanceTags{
310                         wp.tagKeyPrefix + tagKeyInstanceSetID:  string(wp.instanceSetID),
311                         wp.tagKeyPrefix + tagKeyInstanceType:   it.Name,
312                         wp.tagKeyPrefix + tagKeyIdleBehavior:   string(IdleBehaviorRun),
313                         wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
314                 }
315                 initCmd := TagVerifier{nil, secret}.InitCommand()
316                 inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
317                 wp.mtx.Lock()
318                 defer wp.mtx.Unlock()
319                 // delete() is deferred so the updateWorker() call
320                 // below knows to use StateBooting when adding a new
321                 // worker.
322                 defer delete(wp.creating, secret)
323                 if err != nil {
324                         if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
325                                 wp.atQuotaErr = err
326                                 wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
327                                 time.AfterFunc(quotaErrorTTL, wp.notify)
328                         }
329                         logger.WithError(err).Error("create failed")
330                         wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
331                         return
332                 }
333                 wp.updateWorker(inst, it)
334         }()
335         return true
336 }
337
338 // AtQuota returns true if Create is not expected to work at the
339 // moment.
340 func (wp *Pool) AtQuota() bool {
341         wp.mtx.Lock()
342         defer wp.mtx.Unlock()
343         return time.Now().Before(wp.atQuotaUntil)
344 }
345
346 // SetIdleBehavior determines how the indicated instance will behave
347 // when it has no containers running.
348 func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
349         wp.mtx.Lock()
350         defer wp.mtx.Unlock()
351         wkr, ok := wp.workers[id]
352         if !ok {
353                 return errors.New("requested instance does not exist")
354         }
355         wkr.setIdleBehavior(idleBehavior)
356         return nil
357 }
358
359 // Add or update worker attached to the given instance.
360 //
361 // The second return value is true if a new worker is created.
362 //
363 // A newly added instance has state=StateBooting if its tags match an
364 // entry in wp.creating, otherwise StateUnknown.
365 //
366 // Caller must have lock.
367 func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
368         secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
369         inst = TagVerifier{inst, secret}
370         id := inst.ID()
371         if wkr := wp.workers[id]; wkr != nil {
372                 wkr.executor.SetTarget(inst)
373                 wkr.instance = inst
374                 wkr.updated = time.Now()
375                 wkr.saveTags()
376                 return wkr, false
377         }
378
379         state := StateUnknown
380         if _, ok := wp.creating[secret]; ok {
381                 state = StateBooting
382         }
383
384         // If an instance has a valid IdleBehavior tag when it first
385         // appears, initialize the new worker accordingly (this is how
386         // we restore IdleBehavior that was set by a prior dispatch
387         // process); otherwise, default to "run". After this,
388         // wkr.idleBehavior is the source of truth, and will only be
389         // changed via SetIdleBehavior().
390         idleBehavior := IdleBehavior(inst.Tags()[wp.tagKeyPrefix+tagKeyIdleBehavior])
391         if !validIdleBehavior[idleBehavior] {
392                 idleBehavior = IdleBehaviorRun
393         }
394
395         logger := wp.logger.WithFields(logrus.Fields{
396                 "InstanceType": it.Name,
397                 "Instance":     inst.ID(),
398                 "Address":      inst.Address(),
399         })
400         logger.WithFields(logrus.Fields{
401                 "State":        state,
402                 "IdleBehavior": idleBehavior,
403         }).Infof("instance appeared in cloud")
404         now := time.Now()
405         wkr := &worker{
406                 mtx:          &wp.mtx,
407                 wp:           wp,
408                 logger:       logger,
409                 executor:     wp.newExecutor(inst),
410                 state:        state,
411                 idleBehavior: idleBehavior,
412                 instance:     inst,
413                 instType:     it,
414                 appeared:     now,
415                 probed:       now,
416                 busy:         now,
417                 updated:      now,
418                 running:      make(map[string]*remoteRunner),
419                 starting:     make(map[string]*remoteRunner),
420                 probing:      make(chan struct{}, 1),
421         }
422         wp.workers[id] = wkr
423         return wkr, true
424 }
425
426 // Shutdown shuts down a worker with the given type, or returns false
427 // if all workers with the given type are busy.
428 func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
429         wp.setupOnce.Do(wp.setup)
430         wp.mtx.Lock()
431         defer wp.mtx.Unlock()
432         logger := wp.logger.WithField("InstanceType", it.Name)
433         logger.Info("shutdown requested")
434         for _, tryState := range []State{StateBooting, StateIdle} {
435                 // TODO: shutdown the worker with the longest idle
436                 // time (Idle) or the earliest create time (Booting)
437                 for _, wkr := range wp.workers {
438                         if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
439                                 logger.WithField("Instance", wkr.instance.ID()).Info("shutting down")
440                                 wkr.reportBootOutcome(BootOutcomeAborted)
441                                 wkr.shutdown()
442                                 return true
443                         }
444                 }
445         }
446         return false
447 }
448
449 // CountWorkers returns the current number of workers in each state.
450 //
451 // CountWorkers blocks, if necessary, until the initial instance list
452 // has been loaded from the cloud provider.
453 func (wp *Pool) CountWorkers() map[State]int {
454         wp.setupOnce.Do(wp.setup)
455         wp.waitUntilLoaded()
456         wp.mtx.Lock()
457         defer wp.mtx.Unlock()
458         r := map[State]int{}
459         for _, w := range wp.workers {
460                 r[w.state]++
461         }
462         return r
463 }
464
465 // Running returns the container UUIDs being prepared/run on workers.
466 //
467 // In the returned map, the time value indicates when the Pool
468 // observed that the container process had exited. A container that
469 // has not yet exited has a zero time value. The caller should use
470 // ForgetContainer() to garbage-collect the entries for exited
471 // containers.
472 func (wp *Pool) Running() map[string]time.Time {
473         wp.setupOnce.Do(wp.setup)
474         wp.mtx.Lock()
475         defer wp.mtx.Unlock()
476         r := map[string]time.Time{}
477         for _, wkr := range wp.workers {
478                 for uuid := range wkr.running {
479                         r[uuid] = time.Time{}
480                 }
481                 for uuid := range wkr.starting {
482                         r[uuid] = time.Time{}
483                 }
484         }
485         for uuid, exited := range wp.exited {
486                 r[uuid] = exited
487         }
488         return r
489 }
490
491 // StartContainer starts a container on an idle worker immediately if
492 // possible, otherwise returns false.
493 func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
494         wp.setupOnce.Do(wp.setup)
495         wp.mtx.Lock()
496         defer wp.mtx.Unlock()
497         var wkr *worker
498         for _, w := range wp.workers {
499                 if w.instType == it && w.state == StateIdle && w.idleBehavior == IdleBehaviorRun {
500                         if wkr == nil || w.busy.After(wkr.busy) {
501                                 wkr = w
502                         }
503                 }
504         }
505         if wkr == nil {
506                 return false
507         }
508         wkr.startContainer(ctr)
509         return true
510 }
511
512 // KillContainer kills the crunch-run process for the given container
513 // UUID, if it's running on any worker.
514 //
515 // KillContainer returns immediately; the act of killing the container
516 // takes some time, and runs in the background.
517 //
518 // KillContainer returns false if the container has already ended.
519 func (wp *Pool) KillContainer(uuid string, reason string) bool {
520         wp.mtx.Lock()
521         defer wp.mtx.Unlock()
522         logger := wp.logger.WithFields(logrus.Fields{
523                 "ContainerUUID": uuid,
524                 "Reason":        reason,
525         })
526         for _, wkr := range wp.workers {
527                 rr := wkr.running[uuid]
528                 if rr == nil {
529                         rr = wkr.starting[uuid]
530                 }
531                 if rr != nil {
532                         rr.Kill(reason)
533                         return true
534                 }
535         }
536         logger.Debug("cannot kill: already disappeared")
537         return false
538 }
539
540 // ForgetContainer clears the placeholder for the given exited
541 // container, so it isn't returned by subsequent calls to Running().
542 //
543 // ForgetContainer has no effect if the container has not yet exited.
544 //
545 // The "container exited at time T" placeholder (which necessitates
546 // ForgetContainer) exists to make it easier for the caller
547 // (scheduler) to distinguish a container that exited without
548 // finalizing its state from a container that exited too recently for
549 // its final state to have appeared in the scheduler's queue cache.
550 func (wp *Pool) ForgetContainer(uuid string) {
551         wp.mtx.Lock()
552         defer wp.mtx.Unlock()
553         if _, ok := wp.exited[uuid]; ok {
554                 wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
555                 delete(wp.exited, uuid)
556         }
557 }
558
559 func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
560         if reg == nil {
561                 reg = prometheus.NewRegistry()
562         }
563         wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
564                 Namespace: "arvados",
565                 Subsystem: "dispatchcloud",
566                 Name:      "containers_running",
567                 Help:      "Number of containers reported running by cloud VMs.",
568         })
569         reg.MustRegister(wp.mContainersRunning)
570         wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
571                 Namespace: "arvados",
572                 Subsystem: "dispatchcloud",
573                 Name:      "instances_total",
574                 Help:      "Number of cloud VMs.",
575         }, []string{"category", "instance_type"})
576         reg.MustRegister(wp.mInstances)
577         wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
578                 Namespace: "arvados",
579                 Subsystem: "dispatchcloud",
580                 Name:      "instances_price",
581                 Help:      "Price of cloud VMs.",
582         }, []string{"category"})
583         reg.MustRegister(wp.mInstancesPrice)
584         wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
585                 Namespace: "arvados",
586                 Subsystem: "dispatchcloud",
587                 Name:      "vcpus_total",
588                 Help:      "Total VCPUs on all cloud VMs.",
589         }, []string{"category"})
590         reg.MustRegister(wp.mVCPUs)
591         wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
592                 Namespace: "arvados",
593                 Subsystem: "dispatchcloud",
594                 Name:      "memory_bytes_total",
595                 Help:      "Total memory on all cloud VMs.",
596         }, []string{"category"})
597         reg.MustRegister(wp.mMemory)
598         wp.mBootOutcomes = prometheus.NewCounterVec(prometheus.CounterOpts{
599                 Namespace: "arvados",
600                 Subsystem: "dispatchcloud",
601                 Name:      "boot_outcomes",
602                 Help:      "Boot outcomes by type.",
603         }, []string{"outcome"})
604         for k := range validBootOutcomes {
605                 wp.mBootOutcomes.WithLabelValues(string(k)).Add(0)
606         }
607         reg.MustRegister(wp.mBootOutcomes)
608         wp.mDisappearances = prometheus.NewCounterVec(prometheus.CounterOpts{
609                 Namespace: "arvados",
610                 Subsystem: "dispatchcloud",
611                 Name:      "instances_disappeared",
612                 Help:      "Number of occurrences of an instance disappearing from the cloud provider's list of instances.",
613         }, []string{"state"})
614         for _, v := range stateString {
615                 wp.mDisappearances.WithLabelValues(v).Add(0)
616         }
617         reg.MustRegister(wp.mDisappearances)
618 }
619
620 func (wp *Pool) runMetrics() {
621         ch := wp.Subscribe()
622         defer wp.Unsubscribe(ch)
623         wp.updateMetrics()
624         for range ch {
625                 wp.updateMetrics()
626         }
627 }
628
629 func (wp *Pool) updateMetrics() {
630         wp.mtx.RLock()
631         defer wp.mtx.RUnlock()
632
633         type entKey struct {
634                 cat      string
635                 instType string
636         }
637         instances := map[entKey]int64{}
638         price := map[string]float64{}
639         cpu := map[string]int64{}
640         mem := map[string]int64{}
641         var running int64
642         for _, wkr := range wp.workers {
643                 var cat string
644                 switch {
645                 case len(wkr.running)+len(wkr.starting) > 0:
646                         cat = "inuse"
647                 case wkr.idleBehavior == IdleBehaviorHold:
648                         cat = "hold"
649                 case wkr.state == StateBooting:
650                         cat = "booting"
651                 case wkr.state == StateUnknown:
652                         cat = "unknown"
653                 default:
654                         cat = "idle"
655                 }
656                 instances[entKey{cat, wkr.instType.Name}]++
657                 price[cat] += wkr.instType.Price
658                 cpu[cat] += int64(wkr.instType.VCPUs)
659                 mem[cat] += int64(wkr.instType.RAM)
660                 running += int64(len(wkr.running) + len(wkr.starting))
661         }
662         for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
663                 wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
664                 wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
665                 wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
666                 // make sure to reset gauges for non-existing category/nodetype combinations
667                 for _, it := range wp.instanceTypes {
668                         if _, ok := instances[entKey{cat, it.Name}]; !ok {
669                                 wp.mInstances.WithLabelValues(cat, it.Name).Set(float64(0))
670                         }
671                 }
672         }
673         for k, v := range instances {
674                 wp.mInstances.WithLabelValues(k.cat, k.instType).Set(float64(v))
675         }
676         wp.mContainersRunning.Set(float64(running))
677 }
678
679 func (wp *Pool) runProbes() {
680         maxPPS := wp.maxProbesPerSecond
681         if maxPPS < 1 {
682                 maxPPS = defaultMaxProbesPerSecond
683         }
684         limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
685         defer limitticker.Stop()
686
687         probeticker := time.NewTicker(wp.probeInterval)
688         defer probeticker.Stop()
689
690         workers := []cloud.InstanceID{}
691         for range probeticker.C {
692                 workers = workers[:0]
693                 wp.mtx.Lock()
694                 for id, wkr := range wp.workers {
695                         if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
696                                 continue
697                         }
698                         workers = append(workers, id)
699                 }
700                 wp.mtx.Unlock()
701
702                 for _, id := range workers {
703                         wp.mtx.Lock()
704                         wkr, ok := wp.workers[id]
705                         wp.mtx.Unlock()
706                         if !ok {
707                                 // Deleted while we were probing
708                                 // others
709                                 continue
710                         }
711                         go wkr.ProbeAndUpdate()
712                         select {
713                         case <-wp.stop:
714                                 return
715                         case <-limitticker.C:
716                         }
717                 }
718         }
719 }
720
721 func (wp *Pool) runSync() {
722         // sync once immediately, then wait syncInterval, sync again,
723         // etc.
724         timer := time.NewTimer(1)
725         for {
726                 select {
727                 case <-timer.C:
728                         err := wp.getInstancesAndSync()
729                         if err != nil {
730                                 wp.logger.WithError(err).Warn("sync failed")
731                         }
732                         timer.Reset(wp.syncInterval)
733                 case <-wp.stop:
734                         wp.logger.Debug("worker.Pool stopped")
735                         return
736                 }
737         }
738 }
739
740 // Stop synchronizing with the InstanceSet.
741 func (wp *Pool) Stop() {
742         wp.setupOnce.Do(wp.setup)
743         close(wp.stop)
744 }
745
746 // Instances returns an InstanceView for each worker in the pool,
747 // summarizing its current state and recent activity.
748 func (wp *Pool) Instances() []InstanceView {
749         var r []InstanceView
750         wp.setupOnce.Do(wp.setup)
751         wp.mtx.Lock()
752         for _, w := range wp.workers {
753                 r = append(r, InstanceView{
754                         Instance:             w.instance.ID(),
755                         Address:              w.instance.Address(),
756                         Price:                w.instType.Price,
757                         ArvadosInstanceType:  w.instType.Name,
758                         ProviderInstanceType: w.instType.ProviderType,
759                         LastContainerUUID:    w.lastUUID,
760                         LastBusy:             w.busy,
761                         WorkerState:          w.state.String(),
762                         IdleBehavior:         w.idleBehavior,
763                 })
764         }
765         wp.mtx.Unlock()
766         sort.Slice(r, func(i, j int) bool {
767                 return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
768         })
769         return r
770 }
771
772 // KillInstance destroys a cloud VM instance. It returns an error if
773 // the given instance does not exist.
774 func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
775         wkr, ok := wp.workers[id]
776         if !ok {
777                 return errors.New("instance not found")
778         }
779         wkr.logger.WithField("Reason", reason).Info("shutting down")
780         wkr.reportBootOutcome(BootOutcomeAborted)
781         wkr.shutdown()
782         return nil
783 }
784
785 func (wp *Pool) setup() {
786         wp.creating = map[string]createCall{}
787         wp.exited = map[string]time.Time{}
788         wp.workers = map[cloud.InstanceID]*worker{}
789         wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
790         wp.loadRunnerData()
791 }
792
793 // Load the runner program to be deployed on worker nodes into
794 // wp.runnerData, if necessary. Errors are logged.
795 //
796 // If auto-deploy is disabled, len(wp.runnerData) will be 0.
797 //
798 // Caller must not have lock.
799 func (wp *Pool) loadRunnerData() error {
800         wp.mtx.Lock()
801         defer wp.mtx.Unlock()
802         if wp.runnerData != nil {
803                 return nil
804         } else if wp.runnerSource == "" {
805                 wp.runnerCmd = "crunch-run"
806                 wp.runnerData = []byte{}
807                 return nil
808         }
809         logger := wp.logger.WithField("source", wp.runnerSource)
810         logger.Debug("loading runner")
811         buf, err := ioutil.ReadFile(wp.runnerSource)
812         if err != nil {
813                 logger.WithError(err).Error("failed to load runner program")
814                 return err
815         }
816         wp.runnerData = buf
817         wp.runnerMD5 = md5.Sum(buf)
818         wp.runnerCmd = fmt.Sprintf("/var/lib/arvados/crunch-run~%x", wp.runnerMD5)
819         return nil
820 }
821
822 func (wp *Pool) notify() {
823         wp.mtx.RLock()
824         defer wp.mtx.RUnlock()
825         for _, send := range wp.subscribers {
826                 select {
827                 case send <- struct{}{}:
828                 default:
829                 }
830         }
831 }
832
833 func (wp *Pool) getInstancesAndSync() error {
834         wp.setupOnce.Do(wp.setup)
835         if err := wp.instanceSet.throttleInstances.Error(); err != nil {
836                 return err
837         }
838         wp.logger.Debug("getting instance list")
839         threshold := time.Now()
840         instances, err := wp.instanceSet.Instances(cloud.InstanceTags{wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID)})
841         if err != nil {
842                 wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
843                 return err
844         }
845         wp.sync(threshold, instances)
846         wp.logger.Debug("sync done")
847         return nil
848 }
849
850 // Add/remove/update workers based on instances, which was obtained
851 // from the instanceSet. However, don't clobber any other updates that
852 // already happened after threshold.
853 func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
854         wp.mtx.Lock()
855         defer wp.mtx.Unlock()
856         wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
857         notify := false
858
859         for _, inst := range instances {
860                 itTag := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceType]
861                 it, ok := wp.instanceTypes[itTag]
862                 if !ok {
863                         wp.logger.WithField("Instance", inst.ID()).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
864                         continue
865                 }
866                 if wkr, isNew := wp.updateWorker(inst, it); isNew {
867                         notify = true
868                 } else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
869                         wp.logger.WithField("Instance", inst.ID()).Info("worker still listed after shutdown; retrying")
870                         wkr.shutdown()
871                 }
872         }
873
874         for id, wkr := range wp.workers {
875                 if wkr.updated.After(threshold) {
876                         continue
877                 }
878                 logger := wp.logger.WithFields(logrus.Fields{
879                         "Instance":    wkr.instance.ID(),
880                         "WorkerState": wkr.state,
881                 })
882                 logger.Info("instance disappeared in cloud")
883                 wkr.reportBootOutcome(BootOutcomeDisappeared)
884                 if wp.mDisappearances != nil {
885                         wp.mDisappearances.WithLabelValues(stateString[wkr.state]).Inc()
886                 }
887                 delete(wp.workers, id)
888                 go wkr.Close()
889                 notify = true
890         }
891
892         if !wp.loaded {
893                 notify = true
894                 wp.loaded = true
895                 wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
896         }
897
898         if notify {
899                 go wp.notify()
900         }
901 }
902
903 func (wp *Pool) waitUntilLoaded() {
904         ch := wp.Subscribe()
905         wp.mtx.RLock()
906         defer wp.mtx.RUnlock()
907         for !wp.loaded {
908                 wp.mtx.RUnlock()
909                 <-ch
910                 wp.mtx.RLock()
911         }
912 }
913
914 // Return a random string of n hexadecimal digits (n*4 random bits). n
915 // must be even.
916 func randomHex(n int) string {
917         buf := make([]byte, n/2)
918         _, err := rand.Read(buf)
919         if err != nil {
920                 panic(err)
921         }
922         return fmt.Sprintf("%x", buf)
923 }