// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package worker

import (
        "crypto/md5"
        "crypto/rand"
        "errors"
        "fmt"
        "io"
        "io/ioutil"
        "sort"
        "strings"
        "sync"
        "time"

        "git.arvados.org/arvados.git/lib/cloud"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "github.com/prometheus/client_golang/prometheus"
        "github.com/sirupsen/logrus"
        "golang.org/x/crypto/ssh"
)

const (
        tagKeyInstanceType   = "InstanceType"
        tagKeyIdleBehavior   = "IdleBehavior"
        tagKeyInstanceSecret = "InstanceSecret"
        tagKeyInstanceSetID  = "InstanceSetID"
)

// An InstanceView shows a worker's current state and recent activity.
type InstanceView struct {
        Instance             cloud.InstanceID `json:"instance"`
        Address              string           `json:"address"`
        Price                float64          `json:"price"`
        ArvadosInstanceType  string           `json:"arvados_instance_type"`
        ProviderInstanceType string           `json:"provider_instance_type"`
        LastContainerUUID    string           `json:"last_container_uuid"`
        LastBusy             time.Time        `json:"last_busy"`
        WorkerState          string           `json:"worker_state"`
        IdleBehavior         IdleBehavior     `json:"idle_behavior"`
}

// An Executor executes shell commands on a remote host.
type Executor interface {
        // Run cmd on the current target.
        Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)

        // Use the given target for subsequent operations. The new
        // target is the same host as the previous target, but it
        // might return a different address and verify a different
        // host key.
        //
        // SetTarget is called frequently, and in most cases the new
        // target will behave exactly the same as the old one. An
        // implementation should optimize accordingly.
        //
        // SetTarget must not block on concurrent Execute calls.
        SetTarget(cloud.ExecutorTarget)

        Close()
}
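
// A minimal Executor stub, of the sort a test might use, could look
// like the following sketch (illustrative only, not part of this
// package; the production implementation is the SSH-based executor
// elsewhere in lib/dispatchcloud):
//
//      type stubExecutor struct{ target cloud.ExecutorTarget }
//
//      func (e *stubExecutor) SetTarget(t cloud.ExecutorTarget) { e.target = t }
//      func (e *stubExecutor) Close()                           {}
//      func (e *stubExecutor) Execute(env map[string]string, cmd string, stdin io.Reader) ([]byte, []byte, error) {
//              return []byte("stub stdout"), nil, nil
//      }
//
// A real implementation would dial e.target, and must tolerate
// SetTarget being called while Execute is in progress.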

const (
        defaultSyncInterval        = time.Minute
        defaultProbeInterval       = time.Second * 10
        defaultMaxProbesPerSecond  = 10
        defaultTimeoutIdle         = time.Minute
        defaultTimeoutBooting      = time.Minute * 10
        defaultTimeoutProbe        = time.Minute * 10
        defaultTimeoutShutdown     = time.Second * 10
        defaultTimeoutTERM         = time.Minute * 2
        defaultTimeoutSignal       = time.Second * 5
        defaultTimeoutStaleRunLock = time.Second * 5

        // Time to wait after a quota error before trying to create
        // more instances anyway, even if no instances have been shut
        // down.
        quotaErrorTTL = time.Minute

        // Time between "X failed because of rate limiting" messages
        logRateLimitErrorInterval = time.Second * 10
)

// duration returns conf as a time.Duration if it is positive,
// otherwise the default def.
func duration(conf arvados.Duration, def time.Duration) time.Duration {
        if conf > 0 {
                return time.Duration(conf)
        }
        return def
}
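
// For example, duration(cluster.Containers.CloudVMs.SyncInterval,
// defaultSyncInterval) yields the configured sync interval, falling
// back to one minute when the config value is unset (zero), as in the
// NewPool initialization below.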

// NewPool creates a Pool of workers backed by instanceSet.
//
// New instances are configured and set up according to the given
// cluster configuration.
func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
        wp := &Pool{
                logger:                         logger,
                arvClient:                      arvClient,
                instanceSetID:                  instanceSetID,
                instanceSet:                    &throttledInstanceSet{InstanceSet: instanceSet},
                newExecutor:                    newExecutor,
                bootProbeCommand:               cluster.Containers.CloudVMs.BootProbeCommand,
                runnerSource:                   cluster.Containers.CloudVMs.DeployRunnerBinary,
                imageID:                        cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
                instanceTypes:                  cluster.InstanceTypes,
                maxProbesPerSecond:             cluster.Containers.CloudVMs.MaxProbesPerSecond,
                maxConcurrentInstanceCreateOps: cluster.Containers.CloudVMs.MaxConcurrentInstanceCreateOps,
                probeInterval:                  duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
                syncInterval:                   duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
                timeoutIdle:                    duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
                timeoutBooting:                 duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
                timeoutProbe:                   duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
                timeoutShutdown:                duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
                timeoutTERM:                    duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
                timeoutSignal:                  duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
                timeoutStaleRunLock:            duration(cluster.Containers.CloudVMs.TimeoutStaleRunLock, defaultTimeoutStaleRunLock),
                installPublicKey:               installPublicKey,
                tagKeyPrefix:                   cluster.Containers.CloudVMs.TagKeyPrefix,
                stop:                           make(chan bool),
        }
        wp.registerMetrics(reg)
        go func() {
                wp.setupOnce.Do(wp.setup)
                go wp.runMetrics()
                go wp.runProbes()
                go wp.runSync()
        }()
        return wp
}
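
// Typical construction, as an illustrative sketch (the instanceSet
// and the executor constructor newSSHExecutor are assumptions
// standing in for whatever the caller's cloud driver setup provides):
//
//      reg := prometheus.NewRegistry()
//      wp := NewPool(logger, arvClient, reg, instanceSetID, instanceSet,
//              func(inst cloud.Instance) Executor { return newSSHExecutor(inst) },
//              installPublicKey, cluster)
//      defer wp.Stop()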

// Pool is a resizable worker pool backed by a cloud.InstanceSet. A
// zero Pool should not be used. Call NewPool to create a new Pool.
type Pool struct {
        // configuration
        logger                         logrus.FieldLogger
        arvClient                      *arvados.Client
        instanceSetID                  cloud.InstanceSetID
        instanceSet                    *throttledInstanceSet
        newExecutor                    func(cloud.Instance) Executor
        bootProbeCommand               string
        runnerSource                   string
        imageID                        cloud.ImageID
        instanceTypes                  map[string]arvados.InstanceType
        syncInterval                   time.Duration
        probeInterval                  time.Duration
        maxProbesPerSecond             int
        maxConcurrentInstanceCreateOps int
        timeoutIdle                    time.Duration
        timeoutBooting                 time.Duration
        timeoutProbe                   time.Duration
        timeoutShutdown                time.Duration
        timeoutTERM                    time.Duration
        timeoutSignal                  time.Duration
        timeoutStaleRunLock            time.Duration
        installPublicKey               ssh.PublicKey
        tagKeyPrefix                   string

        // private state
        subscribers  map[<-chan struct{}]chan<- struct{}
        creating     map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
        workers      map[cloud.InstanceID]*worker
        loaded       bool                 // loaded list of instances from InstanceSet at least once
        exited       map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called
        atQuotaUntil time.Time
        atQuotaErr   cloud.QuotaError
        stop         chan bool
        mtx          sync.RWMutex
        setupOnce    sync.Once
        runnerData   []byte
        runnerMD5    [md5.Size]byte
        runnerCmd    string

        mContainersRunning        prometheus.Gauge
        mInstances                *prometheus.GaugeVec
        mInstancesPrice           *prometheus.GaugeVec
        mVCPUs                    *prometheus.GaugeVec
        mMemory                   *prometheus.GaugeVec
        mBootOutcomes             *prometheus.CounterVec
        mDisappearances           *prometheus.CounterVec
        mTimeToSSH                prometheus.Summary
        mTimeToReadyForContainer  prometheus.Summary
        mTimeFromShutdownToGone   prometheus.Summary
        mTimeFromQueueToCrunchRun prometheus.Summary
}

// A createCall tracks a Create call that has been issued to the cloud
// provider but has not finished yet.
type createCall struct {
        time         time.Time
        instanceType arvados.InstanceType
}

// CheckHealth reports an error if the runner binary cannot be loaded.
func (wp *Pool) CheckHealth() error {
        wp.setupOnce.Do(wp.setup)
        if err := wp.loadRunnerData(); err != nil {
                return fmt.Errorf("error loading runner binary: %s", err)
        }
        return nil
}

// Subscribe returns a buffered channel that becomes ready after any
// change to the pool's state that could have scheduling implications:
// a worker's state changes, a new worker appears, the cloud
// provider's API rate limiting period ends, etc.
//
// Additional events that occur while the channel is already ready
// will be dropped, so it is OK if the caller services the channel
// slowly.
//
// Example:
//
//      ch := wp.Subscribe()
//      defer wp.Unsubscribe(ch)
//      for range ch {
//              tryScheduling(wp)
//              if done {
//                      break
//              }
//      }
func (wp *Pool) Subscribe() <-chan struct{} {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        ch := make(chan struct{}, 1)
        wp.subscribers[ch] = ch
        return ch
}

// Unsubscribe stops sending updates to the given channel.
func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        delete(wp.subscribers, ch)
}

// Unallocated returns the number of unallocated (creating + booting +
// idle + unknown) workers for each instance type. Workers in
// hold/drain mode are not included.
func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.RLock()
        defer wp.mtx.RUnlock()
        unalloc := map[arvados.InstanceType]int{}
        creating := map[arvados.InstanceType]int{}
        oldestCreate := map[arvados.InstanceType]time.Time{}
        for _, cc := range wp.creating {
                it := cc.instanceType
                creating[it]++
                if t, ok := oldestCreate[it]; !ok || t.After(cc.time) {
                        oldestCreate[it] = cc.time
                }
        }
        for _, wkr := range wp.workers {
                // Skip workers that are not expected to become
                // available soon. Note len(wkr.running)>0 is not
                // redundant here: it can be true even in
                // StateUnknown.
                if wkr.state == StateShutdown ||
                        wkr.state == StateRunning ||
                        wkr.idleBehavior != IdleBehaviorRun ||
                        len(wkr.running) > 0 {
                        continue
                }
                it := wkr.instType
                unalloc[it]++
                if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) {
                        // If up to N new workers appear in
                        // Instances() while we are waiting for N
                        // Create() calls to complete, we assume we're
                        // just seeing a race between Instances() and
                        // Create() responses.
                        //
                        // The other common reason why nodes have
                        // state==Unknown is that they appeared at
                        // startup, before any Create calls. They
                        // don't match the above timing condition, so
                        // we never mistakenly attribute them to
                        // pending Create calls.
                        creating[it]--
                }
        }
        for it, c := range creating {
                unalloc[it] += c
        }
        return unalloc
}
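
// A scheduler can compare Unallocated against queued demand to decide
// how many additional instances to request. A sketch, assuming a
// hypothetical "queued" map of demand per instance type:
//
//      unalloc := wp.Unallocated()
//      for it, want := range queued {
//              for have := unalloc[it]; have < want; have++ {
//                      if !wp.Create(it) {
//                              break // quota or rate-limit trouble; Pool already logged it
//                      }
//              }
//      }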

// Create a new instance with the given type, and add it to the worker
// pool. The worker is added immediately; instance creation runs in
// the background.
//
// Create returns false if a pre-existing error state prevents it from
// even attempting to create a new instance. Those errors are logged
// by the Pool, so the caller does not need to log anything in such
// cases.
func (wp *Pool) Create(it arvados.InstanceType) bool {
        logger := wp.logger.WithField("InstanceType", it.Name)
        wp.setupOnce.Do(wp.setup)
        if wp.loadRunnerData() != nil {
                // Boot probe is certain to fail.
                return false
        }
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        if time.Now().Before(wp.atQuotaUntil) || wp.instanceSet.throttleCreate.Error() != nil {
                return false
        }
        // The maxConcurrentInstanceCreateOps knob throttles the number of node create
        // requests in flight. It was added to work around a limitation in Azure's
        // managed disks, which support no more than 20 concurrent node creation
        // requests from a single disk image (cf.
        // https://docs.microsoft.com/en-us/azure/virtual-machines/linux/capture-image).
        // The code assumes that node creation, from Azure's perspective, means the
        // period until the instance appears in the "get all instances" list.
        if wp.maxConcurrentInstanceCreateOps > 0 && len(wp.creating) >= wp.maxConcurrentInstanceCreateOps {
                logger.Info("reached MaxConcurrentInstanceCreateOps")
                wp.instanceSet.throttleCreate.ErrorUntil(errors.New("reached MaxConcurrentInstanceCreateOps"), time.Now().Add(5*time.Second), wp.notify)
                return false
        }
        now := time.Now()
        secret := randomHex(instanceSecretLength)
        wp.creating[secret] = createCall{time: now, instanceType: it}
        go func() {
                defer wp.notify()
                tags := cloud.InstanceTags{
                        wp.tagKeyPrefix + tagKeyInstanceSetID:  string(wp.instanceSetID),
                        wp.tagKeyPrefix + tagKeyInstanceType:   it.Name,
                        wp.tagKeyPrefix + tagKeyIdleBehavior:   string(IdleBehaviorRun),
                        wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
                }
                initCmd := TagVerifier{nil, secret, nil}.InitCommand()
                inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
                wp.mtx.Lock()
                defer wp.mtx.Unlock()
                // delete() is deferred so the updateWorker() call
                // below knows to use StateBooting when adding a new
                // worker.
                defer delete(wp.creating, secret)
                if err != nil {
                        if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
                                wp.atQuotaErr = err
                                wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
                                time.AfterFunc(quotaErrorTTL, wp.notify)
                        }
                        logger.WithError(err).Error("create failed")
                        wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
                        return
                }
                wp.updateWorker(inst, it)
        }()
        return true
}
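
// Because Create returns immediately and reports trouble via its
// return value, a caller's scheduling loop can simply retry on the
// next notification. An illustrative sketch:
//
//      if !wp.AtQuota() && !wp.Create(it) {
//              // Pool has already logged the reason; wait for a
//              // Subscribe() notification before trying again.
//      }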

// AtQuota returns true if Create is not expected to work at the
// moment.
func (wp *Pool) AtQuota() bool {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        return time.Now().Before(wp.atQuotaUntil)
}

// SetIdleBehavior determines how the indicated instance will behave
// when it has no containers running.
func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        wkr, ok := wp.workers[id]
        if !ok {
                return errors.New("requested instance does not exist")
        }
        wkr.setIdleBehavior(idleBehavior)
        return nil
}

// reportSSHConnected records the first successful SSH connection to
// an instance, updating the mTimeToSSH metric.
func (wp *Pool) reportSSHConnected(inst cloud.Instance) {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        wkr := wp.workers[inst.ID()]
        if wkr.state != StateBooting || !wkr.firstSSHConnection.IsZero() {
                // the node is not in booting state (which can happen
                // if a-d-c is restarted), or this is not the first
                // SSH connection
                return
        }

        wkr.firstSSHConnection = time.Now()
        if wp.mTimeToSSH != nil {
                wp.mTimeToSSH.Observe(wkr.firstSSHConnection.Sub(wkr.appeared).Seconds())
        }
}

// updateWorker adds or updates the worker attached to the given
// instance.
//
// The second return value is true if a new worker is created.
//
// A newly added instance has state=StateBooting if its tags match an
// entry in wp.creating, otherwise StateUnknown.
//
// Caller must have lock.
func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
        secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
        inst = TagVerifier{Instance: inst, Secret: secret, ReportVerified: wp.reportSSHConnected}
        id := inst.ID()
        if wkr := wp.workers[id]; wkr != nil {
                wkr.executor.SetTarget(inst)
                wkr.instance = inst
                wkr.updated = time.Now()
                wkr.saveTags()
                return wkr, false
        }

        state := StateUnknown
        if _, ok := wp.creating[secret]; ok {
                state = StateBooting
        }

        // If an instance has a valid IdleBehavior tag when it first
        // appears, initialize the new worker accordingly (this is how
        // we restore IdleBehavior that was set by a prior dispatch
        // process); otherwise, default to "run". After this,
        // wkr.idleBehavior is the source of truth, and will only be
        // changed via SetIdleBehavior().
        idleBehavior := IdleBehavior(inst.Tags()[wp.tagKeyPrefix+tagKeyIdleBehavior])
        if !validIdleBehavior[idleBehavior] {
                idleBehavior = IdleBehaviorRun
        }

        logger := wp.logger.WithFields(logrus.Fields{
                "InstanceType": it.Name,
                "Instance":     inst.ID(),
                "Address":      inst.Address(),
        })
        logger.WithFields(logrus.Fields{
                "State":        state,
                "IdleBehavior": idleBehavior,
        }).Info("instance appeared in cloud")
        now := time.Now()
        wkr := &worker{
                mtx:          &wp.mtx,
                wp:           wp,
                logger:       logger,
                executor:     wp.newExecutor(inst),
                state:        state,
                idleBehavior: idleBehavior,
                instance:     inst,
                instType:     it,
                appeared:     now,
                probed:       now,
                busy:         now,
                updated:      now,
                running:      make(map[string]*remoteRunner),
                starting:     make(map[string]*remoteRunner),
                probing:      make(chan struct{}, 1),
        }
        wp.workers[id] = wkr
        return wkr, true
}

// Shutdown shuts down a worker with the given type, or returns false
// if all workers with the given type are busy.
func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        logger := wp.logger.WithField("InstanceType", it.Name)
        logger.Info("shutdown requested")
        for _, tryState := range []State{StateBooting, StateIdle} {
                // TODO: shutdown the worker with the longest idle
                // time (Idle) or the earliest create time (Booting)
                for _, wkr := range wp.workers {
                        if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
                                logger.WithField("Instance", wkr.instance.ID()).Info("shutting down")
                                wkr.reportBootOutcome(BootOutcomeAborted)
                                wkr.shutdown()
                                return true
                        }
                }
        }
        return false
}

// CountWorkers returns the current number of workers in each state.
//
// CountWorkers blocks, if necessary, until the initial instance list
// has been loaded from the cloud provider.
func (wp *Pool) CountWorkers() map[State]int {
        wp.setupOnce.Do(wp.setup)
        wp.waitUntilLoaded()
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        r := map[State]int{}
        for _, w := range wp.workers {
                r[w.state]++
        }
        return r
}

// Running returns the container UUIDs being prepared/run on workers.
//
// In the returned map, the time value indicates when the Pool
// observed that the container process had exited. A container that
// has not yet exited has a zero time value. The caller should use
// ForgetContainer() to garbage-collect the entries for exited
// containers.
func (wp *Pool) Running() map[string]time.Time {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        r := map[string]time.Time{}
        for _, wkr := range wp.workers {
                for uuid := range wkr.running {
                        r[uuid] = time.Time{}
                }
                for uuid := range wkr.starting {
                        r[uuid] = time.Time{}
                }
        }
        for uuid, exited := range wp.exited {
                r[uuid] = exited
        }
        return r
}

// StartContainer starts a container on an idle worker immediately if
// possible, otherwise returns false.
func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        var wkr *worker
        for _, w := range wp.workers {
                if w.instType == it && w.state == StateIdle && w.idleBehavior == IdleBehaviorRun {
                        if wkr == nil || w.busy.After(wkr.busy) {
                                wkr = w
                        }
                }
        }
        if wkr == nil {
                return false
        }
        wkr.startContainer(ctr)
        return true
}

// KillContainer kills the crunch-run process for the given container
// UUID, if it's running on any worker.
//
// KillContainer returns immediately; the act of killing the container
// takes some time, and runs in the background.
//
// KillContainer returns false if the container has already ended.
func (wp *Pool) KillContainer(uuid string, reason string) bool {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        logger := wp.logger.WithFields(logrus.Fields{
                "ContainerUUID": uuid,
                "Reason":        reason,
        })
        for _, wkr := range wp.workers {
                rr := wkr.running[uuid]
                if rr == nil {
                        rr = wkr.starting[uuid]
                }
                if rr != nil {
                        rr.Kill(reason)
                        return true
                }
        }
        logger.Debug("cannot kill: already disappeared")
        return false
}

// ForgetContainer clears the placeholder for the given exited
// container, so it isn't returned by subsequent calls to Running().
//
// ForgetContainer has no effect if the container has not yet exited.
//
// The "container exited at time T" placeholder (which necessitates
// ForgetContainer) exists to make it easier for the caller
// (scheduler) to distinguish a container that exited without
// finalizing its state from a container that exited too recently for
// its final state to have appeared in the scheduler's queue cache.
func (wp *Pool) ForgetContainer(uuid string) {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        if _, ok := wp.exited[uuid]; ok {
                wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
                delete(wp.exited, uuid)
        }
}
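
// Running and ForgetContainer together form a small garbage-collection
// protocol for exited containers. A sketch, where finalized() stands
// in for the caller's check that the container's final state has
// reached its queue:
//
//      for uuid, exited := range wp.Running() {
//              if !exited.IsZero() && finalized(uuid) {
//                      wp.ForgetContainer(uuid)
//              }
//      }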

func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
        if reg == nil {
                reg = prometheus.NewRegistry()
        }
        wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "containers_running",
                Help:      "Number of containers reported running by cloud VMs.",
        })
        reg.MustRegister(wp.mContainersRunning)
        wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "instances_total",
                Help:      "Number of cloud VMs.",
        }, []string{"category", "instance_type"})
        reg.MustRegister(wp.mInstances)
        wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "instances_price",
                Help:      "Price of cloud VMs.",
        }, []string{"category"})
        reg.MustRegister(wp.mInstancesPrice)
        wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "vcpus_total",
                Help:      "Total VCPUs on all cloud VMs.",
        }, []string{"category"})
        reg.MustRegister(wp.mVCPUs)
        wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "memory_bytes_total",
                Help:      "Total memory on all cloud VMs.",
        }, []string{"category"})
        reg.MustRegister(wp.mMemory)
        wp.mBootOutcomes = prometheus.NewCounterVec(prometheus.CounterOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "boot_outcomes",
                Help:      "Boot outcomes by type.",
        }, []string{"outcome"})
        for k := range validBootOutcomes {
                wp.mBootOutcomes.WithLabelValues(string(k)).Add(0)
        }
        reg.MustRegister(wp.mBootOutcomes)
        wp.mDisappearances = prometheus.NewCounterVec(prometheus.CounterOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "instances_disappeared",
                Help:      "Number of occurrences of an instance disappearing from the cloud provider's list of instances.",
        }, []string{"state"})
        for _, v := range stateString {
                wp.mDisappearances.WithLabelValues(v).Add(0)
        }
        reg.MustRegister(wp.mDisappearances)
        wp.mTimeToSSH = prometheus.NewSummary(prometheus.SummaryOpts{
                Namespace:  "arvados",
                Subsystem:  "dispatchcloud",
                Name:       "instances_time_to_ssh_seconds",
                Help:       "Number of seconds between instance creation and the first successful SSH connection.",
                Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
        })
        reg.MustRegister(wp.mTimeToSSH)
        wp.mTimeToReadyForContainer = prometheus.NewSummary(prometheus.SummaryOpts{
                Namespace:  "arvados",
                Subsystem:  "dispatchcloud",
                Name:       "instances_time_to_ready_for_container_seconds",
                Help:       "Number of seconds between the first successful SSH connection and ready to run a container.",
                Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
        })
        reg.MustRegister(wp.mTimeToReadyForContainer)
        wp.mTimeFromShutdownToGone = prometheus.NewSummary(prometheus.SummaryOpts{
                Namespace:  "arvados",
                Subsystem:  "dispatchcloud",
                Name:       "instances_time_from_shutdown_request_to_disappearance_seconds",
                Help:       "Number of seconds between the first shutdown attempt and the disappearance of the worker.",
                Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
        })
        reg.MustRegister(wp.mTimeFromShutdownToGone)
        wp.mTimeFromQueueToCrunchRun = prometheus.NewSummary(prometheus.SummaryOpts{
                Namespace:  "arvados",
                Subsystem:  "dispatchcloud",
                Name:       "containers_time_from_queue_to_crunch_run_seconds",
                Help:       "Number of seconds between the queuing of a container and the start of crunch-run.",
                Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
        })
        reg.MustRegister(wp.mTimeFromQueueToCrunchRun)
}

func (wp *Pool) runMetrics() {
        ch := wp.Subscribe()
        defer wp.Unsubscribe(ch)
        wp.updateMetrics()
        for range ch {
                wp.updateMetrics()
        }
}

func (wp *Pool) updateMetrics() {
        wp.mtx.RLock()
        defer wp.mtx.RUnlock()

        type entKey struct {
                cat      string
                instType string
        }
        instances := map[entKey]int64{}
        price := map[string]float64{}
        cpu := map[string]int64{}
        mem := map[string]int64{}
        var running int64
        for _, wkr := range wp.workers {
                var cat string
                switch {
                case len(wkr.running)+len(wkr.starting) > 0:
                        cat = "inuse"
                case wkr.idleBehavior == IdleBehaviorHold:
                        cat = "hold"
                case wkr.state == StateBooting:
                        cat = "booting"
                case wkr.state == StateUnknown:
                        cat = "unknown"
                default:
                        cat = "idle"
                }
                instances[entKey{cat, wkr.instType.Name}]++
                price[cat] += wkr.instType.Price
                cpu[cat] += int64(wkr.instType.VCPUs)
                mem[cat] += int64(wkr.instType.RAM)
                running += int64(len(wkr.running) + len(wkr.starting))
        }
        for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
                wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
                wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
                wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
                // Reset the gauge for any category/instance type
                // combination that currently has no instances.
                for _, it := range wp.instanceTypes {
                        if _, ok := instances[entKey{cat, it.Name}]; !ok {
                                wp.mInstances.WithLabelValues(cat, it.Name).Set(float64(0))
                        }
                }
        }
        for k, v := range instances {
                wp.mInstances.WithLabelValues(k.cat, k.instType).Set(float64(v))
        }
        wp.mContainersRunning.Set(float64(running))
}

func (wp *Pool) runProbes() {
        maxPPS := wp.maxProbesPerSecond
        if maxPPS < 1 {
                maxPPS = defaultMaxProbesPerSecond
        }
        limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
        defer limitticker.Stop()

        probeticker := time.NewTicker(wp.probeInterval)
        defer probeticker.Stop()

        workers := []cloud.InstanceID{}
        for range probeticker.C {
                workers = workers[:0]
                wp.mtx.Lock()
                for id, wkr := range wp.workers {
                        if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
                                continue
                        }
                        workers = append(workers, id)
                }
                wp.mtx.Unlock()

                for _, id := range workers {
                        wp.mtx.Lock()
                        wkr, ok := wp.workers[id]
                        wp.mtx.Unlock()
                        if !ok {
                                // Deleted while we were probing
                                // others
                                continue
                        }
                        go wkr.ProbeAndUpdate()
                        select {
                        case <-wp.stop:
                                return
                        case <-limitticker.C:
                        }
                }
        }
}

func (wp *Pool) runSync() {
        // sync once immediately, then wait syncInterval, sync again,
        // etc.
        timer := time.NewTimer(1)
        for {
                select {
                case <-timer.C:
                        err := wp.getInstancesAndSync()
                        if err != nil {
                                wp.logger.WithError(err).Warn("sync failed")
                        }
                        timer.Reset(wp.syncInterval)
                case <-wp.stop:
                        wp.logger.Debug("worker.Pool stopped")
                        return
                }
        }
}

// Stop synchronizing with the InstanceSet.
func (wp *Pool) Stop() {
        wp.setupOnce.Do(wp.setup)
        close(wp.stop)
}

// Instances returns an InstanceView for each worker in the pool,
// summarizing its current state and recent activity.
func (wp *Pool) Instances() []InstanceView {
        var r []InstanceView
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        for _, w := range wp.workers {
                r = append(r, InstanceView{
                        Instance:             w.instance.ID(),
                        Address:              w.instance.Address(),
                        Price:                w.instType.Price,
                        ArvadosInstanceType:  w.instType.Name,
                        ProviderInstanceType: w.instType.ProviderType,
                        LastContainerUUID:    w.lastUUID,
                        LastBusy:             w.busy,
                        WorkerState:          w.state.String(),
                        IdleBehavior:         w.idleBehavior,
                })
        }
        wp.mtx.Unlock()
        sort.Slice(r, func(i, j int) bool {
                return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
        })
        return r
}

// KillInstance destroys a cloud VM instance. It returns an error if
// the given instance does not exist.
func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
        wkr, ok := wp.workers[id]
        if !ok {
                return errors.New("instance not found")
        }
        wkr.logger.WithField("Reason", reason).Info("shutting down")
        wkr.reportBootOutcome(BootOutcomeAborted)
        wkr.shutdown()
        return nil
}

func (wp *Pool) setup() {
        wp.creating = map[string]createCall{}
        wp.exited = map[string]time.Time{}
        wp.workers = map[cloud.InstanceID]*worker{}
        wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
        wp.loadRunnerData()
}

// loadRunnerData loads the runner program to be deployed on worker
// nodes into wp.runnerData, if necessary. Errors are logged.
//
// If auto-deploy is disabled, len(wp.runnerData) will be 0.
//
// Caller must not have lock.
func (wp *Pool) loadRunnerData() error {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        if wp.runnerData != nil {
                return nil
        } else if wp.runnerSource == "" {
                wp.runnerCmd = "crunch-run"
                wp.runnerData = []byte{}
                return nil
        }
        logger := wp.logger.WithField("source", wp.runnerSource)
        logger.Debug("loading runner")
        buf, err := ioutil.ReadFile(wp.runnerSource)
        if err != nil {
                logger.WithError(err).Error("failed to load runner program")
                return err
        }
        wp.runnerData = buf
        wp.runnerMD5 = md5.Sum(buf)
        wp.runnerCmd = fmt.Sprintf("/var/lib/arvados/crunch-run~%x", wp.runnerMD5)
        return nil
}

func (wp *Pool) notify() {
        wp.mtx.RLock()
        defer wp.mtx.RUnlock()
        for _, send := range wp.subscribers {
                select {
                case send <- struct{}{}:
                default:
                }
        }
}

func (wp *Pool) getInstancesAndSync() error {
        wp.setupOnce.Do(wp.setup)
        if err := wp.instanceSet.throttleInstances.Error(); err != nil {
                return err
        }
        wp.logger.Debug("getting instance list")
        threshold := time.Now()
        instances, err := wp.instanceSet.Instances(cloud.InstanceTags{wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID)})
        if err != nil {
                wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
                return err
        }
        wp.sync(threshold, instances)
        wp.logger.Debug("sync done")
        return nil
}

// sync adds, removes, and updates workers based on instances, which
// were obtained from the instanceSet. It does not clobber any other
// updates that already happened after threshold.
func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
        notify := false

        for _, inst := range instances {
                itTag := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceType]
                it, ok := wp.instanceTypes[itTag]
                if !ok {
                        wp.logger.WithField("Instance", inst.ID()).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
                        continue
                }
                if wkr, isNew := wp.updateWorker(inst, it); isNew {
                        notify = true
                } else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
                        wp.logger.WithField("Instance", inst.ID()).Info("worker still listed after shutdown; retrying")
                        wkr.shutdown()
                }
        }

        for id, wkr := range wp.workers {
                if wkr.updated.After(threshold) {
                        continue
                }
                logger := wp.logger.WithFields(logrus.Fields{
                        "Instance":    wkr.instance.ID(),
                        "WorkerState": wkr.state,
                })
                logger.Info("instance disappeared in cloud")
                wkr.reportBootOutcome(BootOutcomeDisappeared)
                if wp.mDisappearances != nil {
                        wp.mDisappearances.WithLabelValues(stateString[wkr.state]).Inc()
                }
                // wkr.destroyed.IsZero() can happen if the instance
                // disappeared but we weren't trying to shut it down
                if wp.mTimeFromShutdownToGone != nil && !wkr.destroyed.IsZero() {
                        wp.mTimeFromShutdownToGone.Observe(time.Since(wkr.destroyed).Seconds())
                }
                delete(wp.workers, id)
                go wkr.Close()
                notify = true
        }

        if !wp.loaded {
                notify = true
                wp.loaded = true
                wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
        }

        if notify {
                go wp.notify()
        }
}

func (wp *Pool) waitUntilLoaded() {
        ch := wp.Subscribe()
        // Unsubscribe on return so the subscription doesn't leak.
        defer wp.Unsubscribe(ch)
        wp.mtx.RLock()
        defer wp.mtx.RUnlock()
        for !wp.loaded {
                wp.mtx.RUnlock()
                <-ch
                wp.mtx.RLock()
        }
}

// randomHex returns a random string of n hexadecimal digits (n*4
// random bits). n must be even.
func randomHex(n int) string {
        buf := make([]byte, n/2)
        _, err := rand.Read(buf)
        if err != nil {
                panic(err)
        }
        return fmt.Sprintf("%x", buf)
}