16739: a-d-c: add a concurrent node creation throttle option.
[arvados.git] lib/dispatchcloud/worker/pool.go
// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package worker

import (
	"crypto/md5"
	"crypto/rand"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"sort"
	"strings"
	"sync"
	"time"

	"git.arvados.org/arvados.git/lib/cloud"
	"git.arvados.org/arvados.git/sdk/go/arvados"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/sirupsen/logrus"
	"golang.org/x/crypto/ssh"
)

const (
	tagKeyInstanceType   = "InstanceType"
	tagKeyIdleBehavior   = "IdleBehavior"
	tagKeyInstanceSecret = "InstanceSecret"
	tagKeyInstanceSetID  = "InstanceSetID"
)

// An InstanceView shows a worker's current state and recent activity.
type InstanceView struct {
	Instance             cloud.InstanceID `json:"instance"`
	Address              string           `json:"address"`
	Price                float64          `json:"price"`
	ArvadosInstanceType  string           `json:"arvados_instance_type"`
	ProviderInstanceType string           `json:"provider_instance_type"`
	LastContainerUUID    string           `json:"last_container_uuid"`
	LastBusy             time.Time        `json:"last_busy"`
	WorkerState          string           `json:"worker_state"`
	IdleBehavior         IdleBehavior     `json:"idle_behavior"`
}

// An Executor executes shell commands on a remote host.
type Executor interface {
	// Run cmd on the current target.
	Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)

	// Use the given target for subsequent operations. The new
	// target is the same host as the previous target, but it
	// might return a different address and verify a different
	// host key.
	//
	// SetTarget is called frequently, and in most cases the new
	// target will behave exactly the same as the old one. An
	// implementation should optimize accordingly.
	//
	// SetTarget must not block on concurrent Execute calls.
	SetTarget(cloud.ExecutorTarget)

	Close()
}

const (
	defaultSyncInterval       = time.Minute
	defaultProbeInterval      = time.Second * 10
	defaultMaxProbesPerSecond = 10
	defaultTimeoutIdle        = time.Minute
	defaultTimeoutBooting     = time.Minute * 10
	defaultTimeoutProbe       = time.Minute * 10
	defaultTimeoutShutdown    = time.Second * 10
	defaultTimeoutTERM        = time.Minute * 2
	defaultTimeoutSignal      = time.Second * 5

	// Time after a quota error to try again anyway, even if no
	// instances have been shut down.
	quotaErrorTTL = time.Minute

	// Time between "X failed because of rate limiting" messages
	logRateLimitErrorInterval = time.Second * 10
)

func duration(conf arvados.Duration, def time.Duration) time.Duration {
	if conf > 0 {
		return time.Duration(conf)
	}
	return def
}

// NewPool creates a Pool of workers backed by instanceSet.
//
// New instances are configured and set up according to the given
// cluster configuration.
func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
	wp := &Pool{
		logger:                     logger,
		arvClient:                  arvClient,
		instanceSetID:              instanceSetID,
		instanceSet:                &throttledInstanceSet{InstanceSet: instanceSet},
		newExecutor:                newExecutor,
		bootProbeCommand:           cluster.Containers.CloudVMs.BootProbeCommand,
		runnerSource:               cluster.Containers.CloudVMs.DeployRunnerBinary,
		imageID:                    cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
		instanceTypes:              cluster.InstanceTypes,
		maxProbesPerSecond:         cluster.Containers.CloudVMs.MaxProbesPerSecond,
		maxConcurrentNodeCreateOps: cluster.Containers.CloudVMs.MaxConcurrentNodeCreateOps,
		probeInterval:              duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
		syncInterval:               duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
		timeoutIdle:                duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
		timeoutBooting:             duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
		timeoutProbe:               duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
		timeoutShutdown:            duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
		timeoutTERM:                duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
		timeoutSignal:              duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
		installPublicKey:           installPublicKey,
		tagKeyPrefix:               cluster.Containers.CloudVMs.TagKeyPrefix,
		stop:                       make(chan bool),
	}
	wp.registerMetrics(reg)
	go func() {
		wp.setupOnce.Do(wp.setup)
		go wp.runMetrics()
		go wp.runProbes()
		go wp.runSync()
	}()
	return wp
}

// Pool is a resizable worker pool backed by a cloud.InstanceSet. A
// zero Pool should not be used. Call NewPool to create a new Pool.
type Pool struct {
	// configuration
	logger                     logrus.FieldLogger
	arvClient                  *arvados.Client
	instanceSetID              cloud.InstanceSetID
	instanceSet                *throttledInstanceSet
	newExecutor                func(cloud.Instance) Executor
	bootProbeCommand           string
	runnerSource               string
	imageID                    cloud.ImageID
	instanceTypes              map[string]arvados.InstanceType
	syncInterval               time.Duration
	probeInterval              time.Duration
	maxProbesPerSecond         int
	maxConcurrentNodeCreateOps int
	timeoutIdle                time.Duration
	timeoutBooting             time.Duration
	timeoutProbe               time.Duration
	timeoutShutdown            time.Duration
	timeoutTERM                time.Duration
	timeoutSignal              time.Duration
	installPublicKey           ssh.PublicKey
	tagKeyPrefix               string

	// private state
	subscribers  map[<-chan struct{}]chan<- struct{}
	creating     map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
	workers      map[cloud.InstanceID]*worker
	loaded       bool                 // loaded list of instances from InstanceSet at least once
	exited       map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called
	atQuotaUntil time.Time
	atQuotaErr   cloud.QuotaError
	stop         chan bool
	mtx          sync.RWMutex
	setupOnce    sync.Once
	runnerData   []byte
	runnerMD5    [md5.Size]byte
	runnerCmd    string

	throttleCreate    throttle
	throttleInstances throttle

	mContainersRunning prometheus.Gauge
	mInstances         *prometheus.GaugeVec
	mInstancesPrice    *prometheus.GaugeVec
	mVCPUs             *prometheus.GaugeVec
	mMemory            *prometheus.GaugeVec
	mBootOutcomes      *prometheus.CounterVec
	mDisappearances    *prometheus.CounterVec
}

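// A createCall records an unfinished (cloud.InstanceSet)Create call:
// when it started and which instance type was requested.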
type createCall struct {
	time         time.Time
	instanceType arvados.InstanceType
}

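// CheckHealth reports an error if the configured runner binary
// cannot be loaded; otherwise the pool is considered healthy.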
func (wp *Pool) CheckHealth() error {
	wp.setupOnce.Do(wp.setup)
	if err := wp.loadRunnerData(); err != nil {
		return fmt.Errorf("error loading runner binary: %s", err)
	}
	return nil
}

// Subscribe returns a buffered channel that becomes ready after any
// change to the pool's state that could have scheduling implications:
// a worker's state changes, a new worker appears, the cloud
// provider's API rate limiting period ends, etc.
//
// Additional events that occur while the channel is already ready
// will be dropped, so it is OK if the caller services the channel
// slowly.
//
// Example:
//
//	ch := wp.Subscribe()
//	defer wp.Unsubscribe(ch)
//	for range ch {
//		tryScheduling(wp)
//		if done {
//			break
//		}
//	}
func (wp *Pool) Subscribe() <-chan struct{} {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	ch := make(chan struct{}, 1)
	wp.subscribers[ch] = ch
	return ch
}

// Unsubscribe stops sending updates to the given channel.
func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	delete(wp.subscribers, ch)
}

// Unallocated returns the number of unallocated (creating + booting +
// idle + unknown) workers for each instance type. Workers in
// hold/drain mode are not included.
func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	unalloc := map[arvados.InstanceType]int{}
	creating := map[arvados.InstanceType]int{}
	oldestCreate := map[arvados.InstanceType]time.Time{}
	for _, cc := range wp.creating {
		it := cc.instanceType
		creating[it]++
		if t, ok := oldestCreate[it]; !ok || t.After(cc.time) {
			oldestCreate[it] = cc.time
		}
	}
	for _, wkr := range wp.workers {
		// Skip workers that are not expected to become
		// available soon. Note len(wkr.running)>0 is not
		// redundant here: it can be true even in
		// StateUnknown.
		if wkr.state == StateShutdown ||
			wkr.state == StateRunning ||
			wkr.idleBehavior != IdleBehaviorRun ||
			len(wkr.running) > 0 {
			continue
		}
		it := wkr.instType
		unalloc[it]++
		if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) {
			// If up to N new workers appear in
			// Instances() while we are waiting for N
			// Create() calls to complete, we assume we're
			// just seeing a race between Instances() and
			// Create() responses.
			//
			// The other common reason why nodes have
			// state==Unknown is that they appeared at
			// startup, before any Create calls. They
			// don't match the above timing condition, so
			// we never mistakenly attribute them to
			// pending Create calls.
			creating[it]--
		}
	}
	for it, c := range creating {
		unalloc[it] += c
	}
	return unalloc
}

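// A RateLimitError is used to put the pool's create throttle into an
// error state when the MaxConcurrentNodeCreateOps limit is reached.
// EarliestRetry reports when instance creation may be attempted
// again.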
type RateLimitError struct{ Retry time.Time }

func (e RateLimitError) Error() string {
	return fmt.Sprintf("node creation request failed, hit maxConcurrentNodeCreateOps, wait until %s", e.Retry)
}
func (e RateLimitError) EarliestRetry() time.Time { return e.Retry }

// Create a new instance with the given type, and add it to the worker
// pool. The worker is added immediately; instance creation runs in
// the background.
//
// Create returns false if a pre-existing error state prevents it from
// even attempting to create a new instance. Those errors are logged
// by the Pool, so the caller does not need to log anything in such
// cases.
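//
// A minimal caller sketch (illustrative only; the real scheduler
// logic is more involved):
//
//	if !wp.AtQuota() {
//		wp.Create(it)
//	}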
func (wp *Pool) Create(it arvados.InstanceType) bool {
	logger := wp.logger.WithField("InstanceType", it.Name)
	wp.setupOnce.Do(wp.setup)
	if wp.loadRunnerData() != nil {
		// Boot probe is certain to fail.
		return false
	}
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if time.Now().Before(wp.atQuotaUntil) || wp.throttleCreate.Error() != nil {
		return false
	}
	// The maxConcurrentNodeCreateOps knob throttles the number of node create
	// requests in flight. It was added to work around a limitation in Azure's
	// managed disks, which support no more than 20 concurrent node creation
	// requests from a single disk image (cf.
	// https://docs.microsoft.com/en-us/azure/virtual-machines/linux/capture-image).
	// The code assumes that, from Azure's perspective, node creation is in
	// progress until the instance appears in the "get all instances" list.
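	//
	// For reference, the knob corresponds to the cluster config
	// entry Containers.CloudVMs.MaxConcurrentNodeCreateOps; a
	// hypothetical example value:
	//
	//	Containers:
	//	  CloudVMs:
	//	    MaxConcurrentNodeCreateOps: 20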
	if wp.maxConcurrentNodeCreateOps > 0 && len(wp.creating) >= wp.maxConcurrentNodeCreateOps {
		wp.instanceSet.throttleCreate.CheckRateLimitError(RateLimitError{Retry: time.Now().Add(5 * time.Second)}, wp.logger, "create instance", wp.notify)
		return false
	}
	now := time.Now()
	secret := randomHex(instanceSecretLength)
	wp.creating[secret] = createCall{time: now, instanceType: it}
	go func() {
		defer wp.notify()
		tags := cloud.InstanceTags{
			wp.tagKeyPrefix + tagKeyInstanceSetID:  string(wp.instanceSetID),
			wp.tagKeyPrefix + tagKeyInstanceType:   it.Name,
			wp.tagKeyPrefix + tagKeyIdleBehavior:   string(IdleBehaviorRun),
			wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
		}
		initCmd := TagVerifier{nil, secret}.InitCommand()
		inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
		wp.mtx.Lock()
		defer wp.mtx.Unlock()
		// delete() is deferred so the updateWorker() call
		// below knows to use StateBooting when adding a new
		// worker.
		defer delete(wp.creating, secret)
		if err != nil {
			if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
				wp.atQuotaErr = err
				wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
				time.AfterFunc(quotaErrorTTL, wp.notify)
			}
			logger.WithError(err).Error("create failed")
			wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
			return
		}
		wp.updateWorker(inst, it)
	}()
	return true
}

// AtQuota returns true if Create is not expected to work at the
// moment.
func (wp *Pool) AtQuota() bool {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	return time.Now().Before(wp.atQuotaUntil)
}

// SetIdleBehavior determines how the indicated instance will behave
// when it has no containers running.
func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wkr, ok := wp.workers[id]
	if !ok {
		return errors.New("requested instance does not exist")
	}
	wkr.setIdleBehavior(idleBehavior)
	return nil
}

// Add or update worker attached to the given instance.
//
// The second return value is true if a new worker is created.
//
// A newly added instance has state=StateBooting if its tags match an
// entry in wp.creating, otherwise StateUnknown.
//
// Caller must have lock.
func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
	secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
	inst = TagVerifier{inst, secret}
	id := inst.ID()
	if wkr := wp.workers[id]; wkr != nil {
		wkr.executor.SetTarget(inst)
		wkr.instance = inst
		wkr.updated = time.Now()
		wkr.saveTags()
		return wkr, false
	}

	state := StateUnknown
	if _, ok := wp.creating[secret]; ok {
		state = StateBooting
	}

	// If an instance has a valid IdleBehavior tag when it first
	// appears, initialize the new worker accordingly (this is how
	// we restore IdleBehavior that was set by a prior dispatch
	// process); otherwise, default to "run". After this,
	// wkr.idleBehavior is the source of truth, and will only be
	// changed via SetIdleBehavior().
	idleBehavior := IdleBehavior(inst.Tags()[wp.tagKeyPrefix+tagKeyIdleBehavior])
	if !validIdleBehavior[idleBehavior] {
		idleBehavior = IdleBehaviorRun
	}

	logger := wp.logger.WithFields(logrus.Fields{
		"InstanceType": it.Name,
		"Instance":     inst.ID(),
		"Address":      inst.Address(),
	})
	logger.WithFields(logrus.Fields{
		"State":        state,
		"IdleBehavior": idleBehavior,
	}).Info("instance appeared in cloud")
	now := time.Now()
	wkr := &worker{
		mtx:          &wp.mtx,
		wp:           wp,
		logger:       logger,
		executor:     wp.newExecutor(inst),
		state:        state,
		idleBehavior: idleBehavior,
		instance:     inst,
		instType:     it,
		appeared:     now,
		probed:       now,
		busy:         now,
		updated:      now,
		running:      make(map[string]*remoteRunner),
		starting:     make(map[string]*remoteRunner),
		probing:      make(chan struct{}, 1),
	}
	wp.workers[id] = wkr
	return wkr, true
}

// Shutdown shuts down a worker with the given type, or returns false
// if all workers with the given type are busy.
func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	logger := wp.logger.WithField("InstanceType", it.Name)
	logger.Info("shutdown requested")
	for _, tryState := range []State{StateBooting, StateIdle} {
		// TODO: shutdown the worker with the longest idle
		// time (Idle) or the earliest create time (Booting)
		for _, wkr := range wp.workers {
			if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
				logger.WithField("Instance", wkr.instance.ID()).Info("shutting down")
				wkr.reportBootOutcome(BootOutcomeAborted)
				wkr.shutdown()
				return true
			}
		}
	}
	return false
}

// CountWorkers returns the current number of workers in each state.
//
// CountWorkers blocks, if necessary, until the initial instance list
// has been loaded from the cloud provider.
func (wp *Pool) CountWorkers() map[State]int {
	wp.setupOnce.Do(wp.setup)
	wp.waitUntilLoaded()
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	r := map[State]int{}
	for _, w := range wp.workers {
		r[w.state]++
	}
	return r
}

// Running returns the container UUIDs being prepared/run on workers.
//
// In the returned map, the time value indicates when the Pool
// observed that the container process had exited. A container that
// has not yet exited has a zero time value. The caller should use
// ForgetContainer() to garbage-collect the entries for exited
// containers.
func (wp *Pool) Running() map[string]time.Time {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	r := map[string]time.Time{}
	for _, wkr := range wp.workers {
		for uuid := range wkr.running {
			r[uuid] = time.Time{}
		}
		for uuid := range wkr.starting {
			r[uuid] = time.Time{}
		}
	}
	for uuid, exited := range wp.exited {
		r[uuid] = exited
	}
	return r
}

// StartContainer starts a container on an idle worker immediately if
// possible, otherwise returns false.
func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	var wkr *worker
	for _, w := range wp.workers {
		if w.instType == it && w.state == StateIdle && w.idleBehavior == IdleBehaviorRun {
			if wkr == nil || w.busy.After(wkr.busy) {
				wkr = w
			}
		}
	}
	if wkr == nil {
		return false
	}
	wkr.startContainer(ctr)
	return true
}

// KillContainer kills the crunch-run process for the given container
// UUID, if it's running on any worker.
//
// KillContainer returns immediately; the act of killing the container
// takes some time, and runs in the background.
//
// KillContainer returns false if the container has already ended.
func (wp *Pool) KillContainer(uuid string, reason string) bool {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	logger := wp.logger.WithFields(logrus.Fields{
		"ContainerUUID": uuid,
		"Reason":        reason,
	})
	for _, wkr := range wp.workers {
		rr := wkr.running[uuid]
		if rr == nil {
			rr = wkr.starting[uuid]
		}
		if rr != nil {
			rr.Kill(reason)
			return true
		}
	}
	logger.Debug("cannot kill: already disappeared")
	return false
}

// ForgetContainer clears the placeholder for the given exited
// container, so it isn't returned by subsequent calls to Running().
//
// ForgetContainer has no effect if the container has not yet exited.
//
// The "container exited at time T" placeholder (which necessitates
// ForgetContainer) exists to make it easier for the caller
// (scheduler) to distinguish a container that exited without
// finalizing its state from a container that exited too recently for
// its final state to have appeared in the scheduler's queue cache.
func (wp *Pool) ForgetContainer(uuid string) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if _, ok := wp.exited[uuid]; ok {
		wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
		delete(wp.exited, uuid)
	}
}

func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
	if reg == nil {
		reg = prometheus.NewRegistry()
	}
	wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "containers_running",
		Help:      "Number of containers reported running by cloud VMs.",
	})
	reg.MustRegister(wp.mContainersRunning)
	wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "instances_total",
		Help:      "Number of cloud VMs.",
	}, []string{"category", "instance_type"})
	reg.MustRegister(wp.mInstances)
	wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "instances_price",
		Help:      "Price of cloud VMs.",
	}, []string{"category"})
	reg.MustRegister(wp.mInstancesPrice)
	wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "vcpus_total",
		Help:      "Total VCPUs on all cloud VMs.",
	}, []string{"category"})
	reg.MustRegister(wp.mVCPUs)
	wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "memory_bytes_total",
		Help:      "Total memory on all cloud VMs.",
	}, []string{"category"})
	reg.MustRegister(wp.mMemory)
	wp.mBootOutcomes = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "boot_outcomes",
		Help:      "Boot outcomes by type.",
	}, []string{"outcome"})
	for k := range validBootOutcomes {
		wp.mBootOutcomes.WithLabelValues(string(k)).Add(0)
	}
	reg.MustRegister(wp.mBootOutcomes)
	wp.mDisappearances = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "instances_disappeared",
		Help:      "Number of occurrences of an instance disappearing from the cloud provider's list of instances.",
	}, []string{"state"})
	for _, v := range stateString {
		wp.mDisappearances.WithLabelValues(v).Add(0)
	}
	reg.MustRegister(wp.mDisappearances)
}

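// runMetrics recomputes the Prometheus metrics whenever the pool's
// state changes.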
func (wp *Pool) runMetrics() {
	ch := wp.Subscribe()
	defer wp.Unsubscribe(ch)
	wp.updateMetrics()
	for range ch {
		wp.updateMetrics()
	}
}

func (wp *Pool) updateMetrics() {
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()

	type entKey struct {
		cat      string
		instType string
	}
	instances := map[entKey]int64{}
	price := map[string]float64{}
	cpu := map[string]int64{}
	mem := map[string]int64{}
	var running int64
	for _, wkr := range wp.workers {
		var cat string
		switch {
		case len(wkr.running)+len(wkr.starting) > 0:
			cat = "inuse"
		case wkr.idleBehavior == IdleBehaviorHold:
			cat = "hold"
		case wkr.state == StateBooting:
			cat = "booting"
		case wkr.state == StateUnknown:
			cat = "unknown"
		default:
			cat = "idle"
		}
		instances[entKey{cat, wkr.instType.Name}]++
		price[cat] += wkr.instType.Price
		cpu[cat] += int64(wkr.instType.VCPUs)
		mem[cat] += int64(wkr.instType.RAM)
		running += int64(len(wkr.running) + len(wkr.starting))
	}
	for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
		wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
		wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
		wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
		// make sure to reset gauges for non-existing category/nodetype combinations
		for _, it := range wp.instanceTypes {
			if _, ok := instances[entKey{cat, it.Name}]; !ok {
				wp.mInstances.WithLabelValues(cat, it.Name).Set(float64(0))
			}
		}
	}
	for k, v := range instances {
		wp.mInstances.WithLabelValues(k.cat, k.instType).Set(float64(v))
	}
	wp.mContainersRunning.Set(float64(running))
}

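// runProbes periodically probes every worker that is not shutting
// down, rate-limited to maxProbesPerSecond, until the pool is
// stopped.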
func (wp *Pool) runProbes() {
	maxPPS := wp.maxProbesPerSecond
	if maxPPS < 1 {
		maxPPS = defaultMaxProbesPerSecond
	}
	limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
	defer limitticker.Stop()

	probeticker := time.NewTicker(wp.probeInterval)
	defer probeticker.Stop()

	workers := []cloud.InstanceID{}
	for range probeticker.C {
		workers = workers[:0]
		wp.mtx.Lock()
		for id, wkr := range wp.workers {
			if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
				continue
			}
			workers = append(workers, id)
		}
		wp.mtx.Unlock()

		for _, id := range workers {
			wp.mtx.Lock()
			wkr, ok := wp.workers[id]
			wp.mtx.Unlock()
			if !ok {
				// Deleted while we were probing
				// others
				continue
			}
			go wkr.ProbeAndUpdate()
			select {
			case <-wp.stop:
				return
			case <-limitticker.C:
			}
		}
	}
}

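// runSync syncs the worker list with the cloud provider's instance
// list immediately and then every syncInterval, until the pool is
// stopped.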
func (wp *Pool) runSync() {
	// sync once immediately, then wait syncInterval, sync again,
	// etc.
	timer := time.NewTimer(1)
	for {
		select {
		case <-timer.C:
			err := wp.getInstancesAndSync()
			if err != nil {
				wp.logger.WithError(err).Warn("sync failed")
			}
			timer.Reset(wp.syncInterval)
		case <-wp.stop:
			wp.logger.Debug("worker.Pool stopped")
			return
		}
	}
}

// Stop synchronizing with the InstanceSet.
func (wp *Pool) Stop() {
	wp.setupOnce.Do(wp.setup)
	close(wp.stop)
}

// Instances returns an InstanceView for each worker in the pool,
// summarizing its current state and recent activity.
func (wp *Pool) Instances() []InstanceView {
	var r []InstanceView
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	for _, w := range wp.workers {
		r = append(r, InstanceView{
			Instance:             w.instance.ID(),
			Address:              w.instance.Address(),
			Price:                w.instType.Price,
			ArvadosInstanceType:  w.instType.Name,
			ProviderInstanceType: w.instType.ProviderType,
			LastContainerUUID:    w.lastUUID,
			LastBusy:             w.busy,
			WorkerState:          w.state.String(),
			IdleBehavior:         w.idleBehavior,
		})
	}
	wp.mtx.Unlock()
	sort.Slice(r, func(i, j int) bool {
		return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
	})
	return r
}

// KillInstance destroys a cloud VM instance. It returns an error if
// the given instance does not exist.
func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
	wkr, ok := wp.workers[id]
	if !ok {
		return errors.New("instance not found")
	}
	wkr.logger.WithField("Reason", reason).Info("shutting down")
	wkr.reportBootOutcome(BootOutcomeAborted)
	wkr.shutdown()
	return nil
}

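// setup initializes the pool's maps and loads the runner binary; it
// runs exactly once, via setupOnce.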
func (wp *Pool) setup() {
	wp.creating = map[string]createCall{}
	wp.exited = map[string]time.Time{}
	wp.workers = map[cloud.InstanceID]*worker{}
	wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
	wp.loadRunnerData()
}

// Load the runner program to be deployed on worker nodes into
// wp.runnerData, if necessary. Errors are logged.
//
// If auto-deploy is disabled, len(wp.runnerData) will be 0.
//
// Caller must not have lock.
func (wp *Pool) loadRunnerData() error {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if wp.runnerData != nil {
		return nil
	} else if wp.runnerSource == "" {
		wp.runnerCmd = "crunch-run"
		wp.runnerData = []byte{}
		return nil
	}
	logger := wp.logger.WithField("source", wp.runnerSource)
	logger.Debug("loading runner")
	buf, err := ioutil.ReadFile(wp.runnerSource)
	if err != nil {
		logger.WithError(err).Error("failed to load runner program")
		return err
	}
	wp.runnerData = buf
	wp.runnerMD5 = md5.Sum(buf)
	wp.runnerCmd = fmt.Sprintf("/var/lib/arvados/crunch-run~%x", wp.runnerMD5)
	return nil
}

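// notify sends a non-blocking wakeup to every subscriber channel.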
func (wp *Pool) notify() {
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	for _, send := range wp.subscribers {
		select {
		case send <- struct{}{}:
		default:
		}
	}
}

func (wp *Pool) getInstancesAndSync() error {
	wp.setupOnce.Do(wp.setup)
	if err := wp.instanceSet.throttleInstances.Error(); err != nil {
		return err
	}
	wp.logger.Debug("getting instance list")
	threshold := time.Now()
	instances, err := wp.instanceSet.Instances(cloud.InstanceTags{wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID)})
	if err != nil {
		wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
		return err
	}
	wp.sync(threshold, instances)
	wp.logger.Debug("sync done")
	return nil
}

// Add/remove/update workers based on instances, which was obtained
// from the instanceSet. However, don't clobber any other updates that
// already happened after threshold.
func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
	notify := false

	for _, inst := range instances {
		itTag := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceType]
		it, ok := wp.instanceTypes[itTag]
		if !ok {
			wp.logger.WithField("Instance", inst.ID()).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
			continue
		}
		if wkr, isNew := wp.updateWorker(inst, it); isNew {
			notify = true
		} else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
			wp.logger.WithField("Instance", inst.ID()).Info("worker still listed after shutdown; retrying")
			wkr.shutdown()
		}
	}

	for id, wkr := range wp.workers {
		if wkr.updated.After(threshold) {
			continue
		}
		logger := wp.logger.WithFields(logrus.Fields{
			"Instance":    wkr.instance.ID(),
			"WorkerState": wkr.state,
		})
		logger.Info("instance disappeared in cloud")
		wkr.reportBootOutcome(BootOutcomeDisappeared)
		if wp.mDisappearances != nil {
			wp.mDisappearances.WithLabelValues(stateString[wkr.state]).Inc()
		}
		delete(wp.workers, id)
		go wkr.Close()
		notify = true
	}

	if !wp.loaded {
		notify = true
		wp.loaded = true
		wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
	}

	if notify {
		go wp.notify()
	}
}

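// waitUntilLoaded blocks until the initial instance list has been
// loaded from the cloud provider.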
func (wp *Pool) waitUntilLoaded() {
	ch := wp.Subscribe()
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	for !wp.loaded {
		wp.mtx.RUnlock()
		<-ch
		wp.mtx.RLock()
	}
}

// Return a random string of n hexadecimal digits (n*4 random bits). n
// must be even.
func randomHex(n int) string {
	buf := make([]byte, n/2)
	_, err := rand.Read(buf)
	if err != nil {
		panic(err)
	}
	return fmt.Sprintf("%x", buf)
}