lib/dispatchcloud/worker/pool.go
// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package worker

import (
	"crypto/md5"
	"crypto/rand"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"sort"
	"strings"
	"sync"
	"time"

	"git.arvados.org/arvados.git/lib/cloud"
	"git.arvados.org/arvados.git/sdk/go/arvados"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/sirupsen/logrus"
	"golang.org/x/crypto/ssh"
)

const (
	tagKeyInstanceType   = "InstanceType"
	tagKeyIdleBehavior   = "IdleBehavior"
	tagKeyInstanceSecret = "InstanceSecret"
	tagKeyInstanceSetID  = "InstanceSetID"
)

// An InstanceView shows a worker's current state and recent activity.
type InstanceView struct {
	Instance             cloud.InstanceID `json:"instance"`
	Address              string           `json:"address"`
	Price                float64          `json:"price"`
	ArvadosInstanceType  string           `json:"arvados_instance_type"`
	ProviderInstanceType string           `json:"provider_instance_type"`
	LastContainerUUID    string           `json:"last_container_uuid"`
	LastBusy             time.Time        `json:"last_busy"`
	WorkerState          string           `json:"worker_state"`
	IdleBehavior         IdleBehavior     `json:"idle_behavior"`
}

// An Executor executes shell commands on a remote host.
type Executor interface {
	// Run cmd on the current target.
	Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)

	// Use the given target for subsequent operations. The new
	// target is the same host as the previous target, but it
	// might return a different address and verify a different
	// host key.
	//
	// SetTarget is called frequently, and in most cases the new
	// target will behave exactly the same as the old one. An
	// implementation should optimize accordingly.
	//
	// SetTarget must not block on concurrent Execute calls.
	SetTarget(cloud.ExecutorTarget)

	Close()
}
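
// A minimal sketch of how the pool drives an Executor. Here inst,
// newExecutor, and bootProbeCommand stand in for values supplied by
// the caller (via NewPool and the cluster configuration):
//
//	exr := newExecutor(inst)
//	defer exr.Close()
//	exr.SetTarget(inst) // re-point after an address or host key change
//	stdout, stderr, err := exr.Execute(nil, bootProbeCommand, nil)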

const (
	defaultSyncInterval       = time.Minute
	defaultProbeInterval      = time.Second * 10
	defaultMaxProbesPerSecond = 10
	defaultTimeoutIdle        = time.Minute
	defaultTimeoutBooting     = time.Minute * 10
	defaultTimeoutProbe       = time.Minute * 10
	defaultTimeoutShutdown    = time.Second * 10
	defaultTimeoutTERM        = time.Minute * 2
	defaultTimeoutSignal      = time.Second * 5

	// Time after a quota error to try again anyway, even if no
	// instances have been shut down.
	quotaErrorTTL = time.Minute

	// Time between "X failed because rate limiting" messages
	logRateLimitErrorInterval = time.Second * 10
)

func duration(conf arvados.Duration, def time.Duration) time.Duration {
	if conf > 0 {
		return time.Duration(conf)
	} else {
		return def
	}
}

// NewPool creates a Pool of workers backed by instanceSet.
//
// New instances are configured and set up according to the given
// cluster configuration.
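//
// A construction sketch (logger, arvClient, instanceSet, newExecutor,
// installKey, and cluster are assumed to be prepared by the caller;
// the InstanceSetID value shown is illustrative):
//
//	wp := NewPool(logger, arvClient, prometheus.NewRegistry(),
//		cloud.InstanceSetID("dispatch-1"), instanceSet,
//		newExecutor, installKey, cluster)
//	defer wp.Stop()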
func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
	wp := &Pool{
		logger:                         logger,
		arvClient:                      arvClient,
		instanceSetID:                  instanceSetID,
		instanceSet:                    &throttledInstanceSet{InstanceSet: instanceSet},
		newExecutor:                    newExecutor,
		bootProbeCommand:               cluster.Containers.CloudVMs.BootProbeCommand,
		runnerSource:                   cluster.Containers.CloudVMs.DeployRunnerBinary,
		imageID:                        cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
		instanceTypes:                  cluster.InstanceTypes,
		maxProbesPerSecond:             cluster.Containers.CloudVMs.MaxProbesPerSecond,
		maxConcurrentInstanceCreateOps: cluster.Containers.CloudVMs.MaxConcurrentInstanceCreateOps,
		probeInterval:                  duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
		syncInterval:                   duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
		timeoutIdle:                    duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
		timeoutBooting:                 duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
		timeoutProbe:                   duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
		timeoutShutdown:                duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
		timeoutTERM:                    duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
		timeoutSignal:                  duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
		installPublicKey:               installPublicKey,
		tagKeyPrefix:                   cluster.Containers.CloudVMs.TagKeyPrefix,
		stop:                           make(chan bool),
	}
	wp.registerMetrics(reg)
	go func() {
		wp.setupOnce.Do(wp.setup)
		go wp.runMetrics()
		go wp.runProbes()
		go wp.runSync()
	}()
	return wp
}

// Pool is a resizable worker pool backed by a cloud.InstanceSet. A
// zero Pool should not be used. Call NewPool to create a new Pool.
type Pool struct {
	// configuration
	logger                         logrus.FieldLogger
	arvClient                      *arvados.Client
	instanceSetID                  cloud.InstanceSetID
	instanceSet                    *throttledInstanceSet
	newExecutor                    func(cloud.Instance) Executor
	bootProbeCommand               string
	runnerSource                   string
	imageID                        cloud.ImageID
	instanceTypes                  map[string]arvados.InstanceType
	syncInterval                   time.Duration
	probeInterval                  time.Duration
	maxProbesPerSecond             int
	maxConcurrentInstanceCreateOps int
	timeoutIdle                    time.Duration
	timeoutBooting                 time.Duration
	timeoutProbe                   time.Duration
	timeoutShutdown                time.Duration
	timeoutTERM                    time.Duration
	timeoutSignal                  time.Duration
	installPublicKey               ssh.PublicKey
	tagKeyPrefix                   string

	// private state
	subscribers  map[<-chan struct{}]chan<- struct{}
	creating     map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
	workers      map[cloud.InstanceID]*worker
	loaded       bool                 // loaded list of instances from InstanceSet at least once
	exited       map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called
	atQuotaUntil time.Time
	atQuotaErr   cloud.QuotaError
	stop         chan bool
	mtx          sync.RWMutex
	setupOnce    sync.Once
	runnerData   []byte
	runnerMD5    [md5.Size]byte
	runnerCmd    string

	mContainersRunning        prometheus.Gauge
	mInstances                *prometheus.GaugeVec
	mInstancesPrice           *prometheus.GaugeVec
	mVCPUs                    *prometheus.GaugeVec
	mMemory                   *prometheus.GaugeVec
	mBootOutcomes             *prometheus.CounterVec
	mDisappearances           *prometheus.CounterVec
	mTimeToSSH                prometheus.Summary
	mTimeToReadyForContainer  prometheus.Summary
	mTimeFromShutdownToGone   prometheus.Summary
	mTimeFromQueueToCrunchRun prometheus.Summary
}

type createCall struct {
	time         time.Time
	instanceType arvados.InstanceType
}

// CheckHealth reports an error if the worker pool cannot load the
// runner program that it deploys to new instances.
func (wp *Pool) CheckHealth() error {
	wp.setupOnce.Do(wp.setup)
	if err := wp.loadRunnerData(); err != nil {
		return fmt.Errorf("error loading runner binary: %s", err)
	}
	return nil
}

// Subscribe returns a buffered channel that becomes ready after any
// change to the pool's state that could have scheduling implications:
// a worker's state changes, a new worker appears, the cloud
// provider's API rate limiting period ends, etc.
//
// Additional events that occur while the channel is already ready
// will be dropped, so it is OK if the caller services the channel
// slowly.
//
// Example:
//
//	ch := wp.Subscribe()
//	defer wp.Unsubscribe(ch)
//	for range ch {
//		tryScheduling(wp)
//		if done {
//			break
//		}
//	}
func (wp *Pool) Subscribe() <-chan struct{} {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	ch := make(chan struct{}, 1)
	wp.subscribers[ch] = ch
	return ch
}

// Unsubscribe stops sending updates to the given channel.
func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	delete(wp.subscribers, ch)
}

// Unallocated returns the number of unallocated (creating + booting +
// idle + unknown) workers for each instance type.  Workers in
// hold/drain mode are not included.
func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	unalloc := map[arvados.InstanceType]int{}
	creating := map[arvados.InstanceType]int{}
	oldestCreate := map[arvados.InstanceType]time.Time{}
	for _, cc := range wp.creating {
		it := cc.instanceType
		creating[it]++
		if t, ok := oldestCreate[it]; !ok || t.After(cc.time) {
			oldestCreate[it] = cc.time
		}
	}
	for _, wkr := range wp.workers {
		// Skip workers that are not expected to become
		// available soon. Note len(wkr.running)>0 is not
		// redundant here: it can be true even in
		// StateUnknown.
		if wkr.state == StateShutdown ||
			wkr.state == StateRunning ||
			wkr.idleBehavior != IdleBehaviorRun ||
			len(wkr.running) > 0 {
			continue
		}
		it := wkr.instType
		unalloc[it]++
		if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) {
			// If up to N new workers appear in
			// Instances() while we are waiting for N
			// Create() calls to complete, we assume we're
			// just seeing a race between Instances() and
			// Create() responses.
			//
			// The other common reason why nodes have
			// state==Unknown is that they appeared at
			// startup, before any Create calls. They
			// don't match the above timing condition, so
			// we never mistakenly attribute them to
			// pending Create calls.
			creating[it]--
		}
	}
	for it, c := range creating {
		unalloc[it] += c
	}
	return unalloc
}

// Create a new instance with the given type, and add it to the worker
// pool. The worker is added immediately; instance creation runs in
// the background.
//
// Create returns false if a pre-existing error state prevents it from
// even attempting to create a new instance. Those errors are logged
// by the Pool, so the caller does not need to log anything in such
// cases.
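//
// A typical caller loop (sketch; wantNew is a hypothetical count of
// additional instances needed for instance type it):
//
//	for i := 0; i < wantNew && !wp.AtQuota(); i++ {
//		if !wp.Create(it) {
//			break
//		}
//	}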
func (wp *Pool) Create(it arvados.InstanceType) bool {
	logger := wp.logger.WithField("InstanceType", it.Name)
	wp.setupOnce.Do(wp.setup)
	if wp.loadRunnerData() != nil {
		// Boot probe is certain to fail.
		return false
	}
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if time.Now().Before(wp.atQuotaUntil) || wp.instanceSet.throttleCreate.Error() != nil {
		return false
	}
	// The maxConcurrentInstanceCreateOps knob throttles the number of node create
	// requests in flight. It was added to work around a limitation in Azure's
	// managed disks, which support no more than 20 concurrent node creation
	// requests from a single disk image (cf.
	// https://docs.microsoft.com/en-us/azure/virtual-machines/linux/capture-image).
	// The code assumes that node creation, from Azure's perspective, means the
	// period until the instance appears in the "get all instances" list.
	if wp.maxConcurrentInstanceCreateOps > 0 && len(wp.creating) >= wp.maxConcurrentInstanceCreateOps {
		logger.Info("reached MaxConcurrentInstanceCreateOps")
		wp.instanceSet.throttleCreate.ErrorUntil(errors.New("reached MaxConcurrentInstanceCreateOps"), time.Now().Add(5*time.Second), wp.notify)
		return false
	}
	now := time.Now()
	secret := randomHex(instanceSecretLength)
	wp.creating[secret] = createCall{time: now, instanceType: it}
	go func() {
		defer wp.notify()
		tags := cloud.InstanceTags{
			wp.tagKeyPrefix + tagKeyInstanceSetID:  string(wp.instanceSetID),
			wp.tagKeyPrefix + tagKeyInstanceType:   it.Name,
			wp.tagKeyPrefix + tagKeyIdleBehavior:   string(IdleBehaviorRun),
			wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
		}
		initCmd := TagVerifier{nil, secret, nil}.InitCommand()
		inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
		wp.mtx.Lock()
		defer wp.mtx.Unlock()
		// delete() is deferred so the updateWorker() call
		// below knows to use StateBooting when adding a new
		// worker.
		defer delete(wp.creating, secret)
		if err != nil {
			if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
				wp.atQuotaErr = err
				wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
				time.AfterFunc(quotaErrorTTL, wp.notify)
			}
			logger.WithError(err).Error("create failed")
			wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
			return
		}
		wp.updateWorker(inst, it)
	}()
	return true
}

// AtQuota returns true if Create is not expected to work at the
// moment.
func (wp *Pool) AtQuota() bool {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	return time.Now().Before(wp.atQuotaUntil)
}

// SetIdleBehavior determines how the indicated instance will behave
// when it has no containers running.
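//
// For example, to drain an instance (the ID shown is hypothetical):
//
//	err := wp.SetIdleBehavior(cloud.InstanceID("inst-00000"), IdleBehaviorDrain)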
func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wkr, ok := wp.workers[id]
	if !ok {
		return errors.New("requested instance does not exist")
	}
	wkr.setIdleBehavior(idleBehavior)
	return nil
}

// reportSSHConnected records the first successful SSH connection to a
// booting instance and updates the mTimeToSSH metric accordingly.
func (wp *Pool) reportSSHConnected(inst cloud.Instance) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wkr := wp.workers[inst.ID()]
	if wkr.state != StateBooting || !wkr.firstSSHConnection.IsZero() {
		// the node is not in booting state (can happen if a-d-c is restarted) OR
		// this is not the first SSH connection
		return
	}

	wkr.firstSSHConnection = time.Now()
	if wp.mTimeToSSH != nil {
		wp.mTimeToSSH.Observe(wkr.firstSSHConnection.Sub(wkr.appeared).Seconds())
	}
}

// Add or update worker attached to the given instance.
//
// The second return value is true if a new worker is created.
//
// A newly added instance has state=StateBooting if its tags match an
// entry in wp.creating, otherwise StateUnknown.
//
// Caller must have lock.
func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
	secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
	inst = TagVerifier{Instance: inst, Secret: secret, ReportVerified: wp.reportSSHConnected}
	id := inst.ID()
	if wkr := wp.workers[id]; wkr != nil {
		wkr.executor.SetTarget(inst)
		wkr.instance = inst
		wkr.updated = time.Now()
		wkr.saveTags()
		return wkr, false
	}

	state := StateUnknown
	if _, ok := wp.creating[secret]; ok {
		state = StateBooting
	}

	// If an instance has a valid IdleBehavior tag when it first
	// appears, initialize the new worker accordingly (this is how
	// we restore IdleBehavior that was set by a prior dispatch
	// process); otherwise, default to "run". After this,
	// wkr.idleBehavior is the source of truth, and will only be
	// changed via SetIdleBehavior().
	idleBehavior := IdleBehavior(inst.Tags()[wp.tagKeyPrefix+tagKeyIdleBehavior])
	if !validIdleBehavior[idleBehavior] {
		idleBehavior = IdleBehaviorRun
	}

	logger := wp.logger.WithFields(logrus.Fields{
		"InstanceType": it.Name,
		"Instance":     inst.ID(),
		"Address":      inst.Address(),
	})
	logger.WithFields(logrus.Fields{
		"State":        state,
		"IdleBehavior": idleBehavior,
	}).Infof("instance appeared in cloud")
	now := time.Now()
	wkr := &worker{
		mtx:          &wp.mtx,
		wp:           wp,
		logger:       logger,
		executor:     wp.newExecutor(inst),
		state:        state,
		idleBehavior: idleBehavior,
		instance:     inst,
		instType:     it,
		appeared:     now,
		probed:       now,
		busy:         now,
		updated:      now,
		running:      make(map[string]*remoteRunner),
		starting:     make(map[string]*remoteRunner),
		probing:      make(chan struct{}, 1),
	}
	wp.workers[id] = wkr
	return wkr, true
}

// Shutdown shuts down a worker with the given type, or returns false
// if all workers with the given type are busy.
func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	logger := wp.logger.WithField("InstanceType", it.Name)
	logger.Info("shutdown requested")
	for _, tryState := range []State{StateBooting, StateIdle} {
		// TODO: shut down the worker with the longest idle
		// time (Idle) or the earliest create time (Booting)
		for _, wkr := range wp.workers {
			if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
				logger.WithField("Instance", wkr.instance.ID()).Info("shutting down")
				wkr.reportBootOutcome(BootOutcomeAborted)
				wkr.shutdown()
				return true
			}
		}
	}
	return false
}

// CountWorkers returns the current number of workers in each state.
//
// CountWorkers blocks, if necessary, until the initial instance list
// has been loaded from the cloud provider.
func (wp *Pool) CountWorkers() map[State]int {
	wp.setupOnce.Do(wp.setup)
	wp.waitUntilLoaded()
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	r := map[State]int{}
	for _, w := range wp.workers {
		r[w.state]++
	}
	return r
}

// Running returns the container UUIDs being prepared/run on workers.
//
// In the returned map, the time value indicates when the Pool
// observed that the container process had exited. A container that
// has not yet exited has a zero time value. The caller should use
// ForgetContainer() to garbage-collect the entries for exited
// containers.
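//
// A typical cleanup pattern in the caller (sketch; hasFinalState is a
// hypothetical check against the caller's own container queue):
//
//	for uuid, exited := range wp.Running() {
//		if !exited.IsZero() && hasFinalState(uuid) {
//			wp.ForgetContainer(uuid)
//		}
//	}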
func (wp *Pool) Running() map[string]time.Time {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	r := map[string]time.Time{}
	for _, wkr := range wp.workers {
		for uuid := range wkr.running {
			r[uuid] = time.Time{}
		}
		for uuid := range wkr.starting {
			r[uuid] = time.Time{}
		}
	}
	for uuid, exited := range wp.exited {
		r[uuid] = exited
	}
	return r
}

// StartContainer starts a container on an idle worker immediately if
// possible, otherwise returns false.
func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	var wkr *worker
	for _, w := range wp.workers {
		if w.instType == it && w.state == StateIdle && w.idleBehavior == IdleBehaviorRun {
			if wkr == nil || w.busy.After(wkr.busy) {
				wkr = w
			}
		}
	}
	if wkr == nil {
		return false
	}
	wkr.startContainer(ctr)
	return true
}

// KillContainer kills the crunch-run process for the given container
// UUID, if it's running on any worker.
//
// KillContainer returns immediately; the act of killing the container
// takes some time, and runs in the background.
//
// KillContainer returns false if the container has already ended.
func (wp *Pool) KillContainer(uuid string, reason string) bool {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	logger := wp.logger.WithFields(logrus.Fields{
		"ContainerUUID": uuid,
		"Reason":        reason,
	})
	for _, wkr := range wp.workers {
		rr := wkr.running[uuid]
		if rr == nil {
			rr = wkr.starting[uuid]
		}
		if rr != nil {
			rr.Kill(reason)
			return true
		}
	}
	logger.Debug("cannot kill: already disappeared")
	return false
}

// ForgetContainer clears the placeholder for the given exited
// container, so it isn't returned by subsequent calls to Running().
//
// ForgetContainer has no effect if the container has not yet exited.
//
// The "container exited at time T" placeholder (which necessitates
// ForgetContainer) exists to make it easier for the caller
// (scheduler) to distinguish a container that exited without
// finalizing its state from a container that exited too recently for
// its final state to have appeared in the scheduler's queue cache.
func (wp *Pool) ForgetContainer(uuid string) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if _, ok := wp.exited[uuid]; ok {
		wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
		delete(wp.exited, uuid)
	}
}

func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
	if reg == nil {
		reg = prometheus.NewRegistry()
	}
	wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "containers_running",
		Help:      "Number of containers reported running by cloud VMs.",
	})
	reg.MustRegister(wp.mContainersRunning)
	wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "instances_total",
		Help:      "Number of cloud VMs.",
	}, []string{"category", "instance_type"})
	reg.MustRegister(wp.mInstances)
	wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "instances_price",
		Help:      "Price of cloud VMs.",
	}, []string{"category"})
	reg.MustRegister(wp.mInstancesPrice)
	wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "vcpus_total",
		Help:      "Total VCPUs on all cloud VMs.",
	}, []string{"category"})
	reg.MustRegister(wp.mVCPUs)
	wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "memory_bytes_total",
		Help:      "Total memory on all cloud VMs.",
	}, []string{"category"})
	reg.MustRegister(wp.mMemory)
	wp.mBootOutcomes = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "boot_outcomes",
		Help:      "Boot outcomes by type.",
	}, []string{"outcome"})
	for k := range validBootOutcomes {
		wp.mBootOutcomes.WithLabelValues(string(k)).Add(0)
	}
	reg.MustRegister(wp.mBootOutcomes)
	wp.mDisappearances = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "instances_disappeared",
		Help:      "Number of occurrences of an instance disappearing from the cloud provider's list of instances.",
	}, []string{"state"})
	for _, v := range stateString {
		wp.mDisappearances.WithLabelValues(v).Add(0)
	}
	reg.MustRegister(wp.mDisappearances)
	wp.mTimeToSSH = prometheus.NewSummary(prometheus.SummaryOpts{
		Namespace:  "arvados",
		Subsystem:  "dispatchcloud",
		Name:       "instances_time_to_ssh_seconds",
		Help:       "Number of seconds between instance creation and the first successful SSH connection.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	})
	reg.MustRegister(wp.mTimeToSSH)
	wp.mTimeToReadyForContainer = prometheus.NewSummary(prometheus.SummaryOpts{
		Namespace:  "arvados",
		Subsystem:  "dispatchcloud",
		Name:       "instances_time_to_ready_for_container_seconds",
		Help:       "Number of seconds between the first successful SSH connection and ready to run a container.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	})
	reg.MustRegister(wp.mTimeToReadyForContainer)
	wp.mTimeFromShutdownToGone = prometheus.NewSummary(prometheus.SummaryOpts{
		Namespace:  "arvados",
		Subsystem:  "dispatchcloud",
		Name:       "instances_time_from_shutdown_request_to_disappearance_seconds",
		Help:       "Number of seconds between the first shutdown attempt and the disappearance of the worker.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	})
	reg.MustRegister(wp.mTimeFromShutdownToGone)
	wp.mTimeFromQueueToCrunchRun = prometheus.NewSummary(prometheus.SummaryOpts{
		Namespace:  "arvados",
		Subsystem:  "dispatchcloud",
		Name:       "containers_time_from_queue_to_crunch_run_seconds",
		Help:       "Number of seconds between the queuing of a container and the start of crunch-run.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	})
	reg.MustRegister(wp.mTimeFromQueueToCrunchRun)
}

func (wp *Pool) runMetrics() {
	ch := wp.Subscribe()
	defer wp.Unsubscribe(ch)
	wp.updateMetrics()
	for range ch {
		wp.updateMetrics()
	}
}

func (wp *Pool) updateMetrics() {
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()

	type entKey struct {
		cat      string
		instType string
	}
	instances := map[entKey]int64{}
	price := map[string]float64{}
	cpu := map[string]int64{}
	mem := map[string]int64{}
	var running int64
	for _, wkr := range wp.workers {
		var cat string
		switch {
		case len(wkr.running)+len(wkr.starting) > 0:
			cat = "inuse"
		case wkr.idleBehavior == IdleBehaviorHold:
			cat = "hold"
		case wkr.state == StateBooting:
			cat = "booting"
		case wkr.state == StateUnknown:
			cat = "unknown"
		default:
			cat = "idle"
		}
		instances[entKey{cat, wkr.instType.Name}]++
		price[cat] += wkr.instType.Price
		cpu[cat] += int64(wkr.instType.VCPUs)
		mem[cat] += int64(wkr.instType.RAM)
		running += int64(len(wkr.running) + len(wkr.starting))
	}
	for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
		wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
		wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
		wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
		// make sure to reset gauges for nonexistent category/nodetype combinations
		for _, it := range wp.instanceTypes {
			if _, ok := instances[entKey{cat, it.Name}]; !ok {
				wp.mInstances.WithLabelValues(cat, it.Name).Set(float64(0))
			}
		}
	}
	for k, v := range instances {
		wp.mInstances.WithLabelValues(k.cat, k.instType).Set(float64(v))
	}
	wp.mContainersRunning.Set(float64(running))
}

func (wp *Pool) runProbes() {
	maxPPS := wp.maxProbesPerSecond
	if maxPPS < 1 {
		maxPPS = defaultMaxProbesPerSecond
	}
	limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
	defer limitticker.Stop()

	probeticker := time.NewTicker(wp.probeInterval)
	defer probeticker.Stop()

	workers := []cloud.InstanceID{}
	for range probeticker.C {
		workers = workers[:0]
		wp.mtx.Lock()
		for id, wkr := range wp.workers {
			if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
				continue
			}
			workers = append(workers, id)
		}
		wp.mtx.Unlock()

		for _, id := range workers {
			wp.mtx.Lock()
			wkr, ok := wp.workers[id]
			wp.mtx.Unlock()
			if !ok {
				// Deleted while we were probing
				// others
				continue
			}
			go wkr.ProbeAndUpdate()
			select {
			case <-wp.stop:
				return
			case <-limitticker.C:
			}
		}
	}
}

func (wp *Pool) runSync() {
	// sync once immediately, then wait syncInterval, sync again,
	// etc.
	timer := time.NewTimer(1)
	for {
		select {
		case <-timer.C:
			err := wp.getInstancesAndSync()
			if err != nil {
				wp.logger.WithError(err).Warn("sync failed")
			}
			timer.Reset(wp.syncInterval)
		case <-wp.stop:
			wp.logger.Debug("worker.Pool stopped")
			return
		}
	}
}

// Stop synchronizing with the InstanceSet.
func (wp *Pool) Stop() {
	wp.setupOnce.Do(wp.setup)
	close(wp.stop)
}

// Instances returns an InstanceView for each worker in the pool,
// summarizing its current state and recent activity.
func (wp *Pool) Instances() []InstanceView {
	var r []InstanceView
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	for _, w := range wp.workers {
		r = append(r, InstanceView{
			Instance:             w.instance.ID(),
			Address:              w.instance.Address(),
			Price:                w.instType.Price,
			ArvadosInstanceType:  w.instType.Name,
			ProviderInstanceType: w.instType.ProviderType,
			LastContainerUUID:    w.lastUUID,
			LastBusy:             w.busy,
			WorkerState:          w.state.String(),
			IdleBehavior:         w.idleBehavior,
		})
	}
	wp.mtx.Unlock()
	sort.Slice(r, func(i, j int) bool {
		return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
	})
	return r
}

// KillInstance destroys a cloud VM instance. It returns an error if
// the given instance does not exist.
func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
	wkr, ok := wp.workers[id]
	if !ok {
		return errors.New("instance not found")
	}
	wkr.logger.WithField("Reason", reason).Info("shutting down")
	wkr.reportBootOutcome(BootOutcomeAborted)
	wkr.shutdown()
	return nil
}

func (wp *Pool) setup() {
	wp.creating = map[string]createCall{}
	wp.exited = map[string]time.Time{}
	wp.workers = map[cloud.InstanceID]*worker{}
	wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
	wp.loadRunnerData()
}

// Load the runner program to be deployed on worker nodes into
// wp.runnerData, if necessary. Errors are logged.
//
// If auto-deploy is disabled, len(wp.runnerData) will be 0.
//
// Caller must not have lock.
func (wp *Pool) loadRunnerData() error {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if wp.runnerData != nil {
		return nil
	} else if wp.runnerSource == "" {
		wp.runnerCmd = "crunch-run"
		wp.runnerData = []byte{}
		return nil
	}
	logger := wp.logger.WithField("source", wp.runnerSource)
	logger.Debug("loading runner")
	buf, err := ioutil.ReadFile(wp.runnerSource)
	if err != nil {
		logger.WithError(err).Error("failed to load runner program")
		return err
	}
	wp.runnerData = buf
	wp.runnerMD5 = md5.Sum(buf)
	wp.runnerCmd = fmt.Sprintf("/var/lib/arvados/crunch-run~%x", wp.runnerMD5)
	return nil
}

func (wp *Pool) notify() {
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	for _, send := range wp.subscribers {
		select {
		case send <- struct{}{}:
		default:
		}
	}
}

func (wp *Pool) getInstancesAndSync() error {
	wp.setupOnce.Do(wp.setup)
	if err := wp.instanceSet.throttleInstances.Error(); err != nil {
		return err
	}
	wp.logger.Debug("getting instance list")
	threshold := time.Now()
	instances, err := wp.instanceSet.Instances(cloud.InstanceTags{wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID)})
	if err != nil {
		wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
		return err
	}
	wp.sync(threshold, instances)
	wp.logger.Debug("sync done")
	return nil
}

// Add/remove/update workers based on instances, which was obtained
// from the instanceSet. However, don't clobber any other updates that
// already happened after threshold.
func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
	notify := false

	for _, inst := range instances {
		itTag := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceType]
		it, ok := wp.instanceTypes[itTag]
		if !ok {
			wp.logger.WithField("Instance", inst.ID()).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
			continue
		}
		if wkr, isNew := wp.updateWorker(inst, it); isNew {
			notify = true
		} else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
			wp.logger.WithField("Instance", inst.ID()).Info("worker still listed after shutdown; retrying")
			wkr.shutdown()
		}
	}

	for id, wkr := range wp.workers {
		if wkr.updated.After(threshold) {
			continue
		}
		logger := wp.logger.WithFields(logrus.Fields{
			"Instance":    wkr.instance.ID(),
			"WorkerState": wkr.state,
		})
		logger.Info("instance disappeared in cloud")
		wkr.reportBootOutcome(BootOutcomeDisappeared)
		if wp.mDisappearances != nil {
			wp.mDisappearances.WithLabelValues(stateString[wkr.state]).Inc()
		}
		if wp.mTimeFromShutdownToGone != nil {
			wp.mTimeFromShutdownToGone.Observe(time.Now().Sub(wkr.destroyed).Seconds())
		}
		delete(wp.workers, id)
		go wkr.Close()
		notify = true
	}

	if !wp.loaded {
		notify = true
		wp.loaded = true
		wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
	}

	if notify {
		go wp.notify()
	}
}

func (wp *Pool) waitUntilLoaded() {
	ch := wp.Subscribe()
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	for !wp.loaded {
		wp.mtx.RUnlock()
		<-ch
		wp.mtx.RLock()
	}
}

// Return a random string of n hexadecimal digits (n*4 random bits). n
// must be even.
func randomHex(n int) string {
	buf := make([]byte, n/2)
	_, err := rand.Read(buf)
	if err != nil {
		panic(err)
	}
	return fmt.Sprintf("%x", buf)
}