16636: add instances_time_from_shutdown_request_to_disappearance_seconds
[arvados.git] / lib / dispatchcloud / worker / pool.go
// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package worker

import (
	"crypto/md5"
	"crypto/rand"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"sort"
	"strings"
	"sync"
	"time"

	"git.arvados.org/arvados.git/lib/cloud"
	"git.arvados.org/arvados.git/sdk/go/arvados"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/sirupsen/logrus"
	"golang.org/x/crypto/ssh"
)

const (
	tagKeyInstanceType   = "InstanceType"
	tagKeyIdleBehavior   = "IdleBehavior"
	tagKeyInstanceSecret = "InstanceSecret"
	tagKeyInstanceSetID  = "InstanceSetID"
)

// An InstanceView shows a worker's current state and recent activity.
type InstanceView struct {
	Instance             cloud.InstanceID `json:"instance"`
	Address              string           `json:"address"`
	Price                float64          `json:"price"`
	ArvadosInstanceType  string           `json:"arvados_instance_type"`
	ProviderInstanceType string           `json:"provider_instance_type"`
	LastContainerUUID    string           `json:"last_container_uuid"`
	LastBusy             time.Time        `json:"last_busy"`
	WorkerState          string           `json:"worker_state"`
	IdleBehavior         IdleBehavior     `json:"idle_behavior"`
}

// An Executor executes shell commands on a remote host.
type Executor interface {
	// Run cmd on the current target.
	Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)

	// Use the given target for subsequent operations. The new
	// target is the same host as the previous target, but it
	// might return a different address and verify a different
	// host key.
	//
	// SetTarget is called frequently, and in most cases the new
	// target will behave exactly the same as the old one. An
	// implementation should optimize accordingly.
	//
	// SetTarget must not block on concurrent Execute calls.
	SetTarget(cloud.ExecutorTarget)

	Close()
}

const (
	defaultSyncInterval       = time.Minute
	defaultProbeInterval      = time.Second * 10
	defaultMaxProbesPerSecond = 10
	defaultTimeoutIdle        = time.Minute
	defaultTimeoutBooting     = time.Minute * 10
	defaultTimeoutProbe       = time.Minute * 10
	defaultTimeoutShutdown    = time.Second * 10
	defaultTimeoutTERM        = time.Minute * 2
	defaultTimeoutSignal      = time.Second * 5

	// Time after a quota error to try again anyway, even if no
	// instances have been shut down.
	quotaErrorTTL = time.Minute

	// Time between "X failed because of rate limiting" messages
	logRateLimitErrorInterval = time.Second * 10
)

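// duration returns conf as a time.Duration if it is a positive
// value; otherwise it returns the fallback def.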
func duration(conf arvados.Duration, def time.Duration) time.Duration {
	if conf > 0 {
		return time.Duration(conf)
	}
	return def
}

// NewPool creates a Pool of workers backed by instanceSet.
//
// New instances are configured and set up according to the given
// cluster configuration.
func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
	wp := &Pool{
		logger:                         logger,
		arvClient:                      arvClient,
		instanceSetID:                  instanceSetID,
		instanceSet:                    &throttledInstanceSet{InstanceSet: instanceSet},
		newExecutor:                    newExecutor,
		bootProbeCommand:               cluster.Containers.CloudVMs.BootProbeCommand,
		runnerSource:                   cluster.Containers.CloudVMs.DeployRunnerBinary,
		imageID:                        cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
		instanceTypes:                  cluster.InstanceTypes,
		maxProbesPerSecond:             cluster.Containers.CloudVMs.MaxProbesPerSecond,
		maxConcurrentInstanceCreateOps: cluster.Containers.CloudVMs.MaxConcurrentInstanceCreateOps,
		probeInterval:                  duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
		syncInterval:                   duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
		timeoutIdle:                    duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
		timeoutBooting:                 duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
		timeoutProbe:                   duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
		timeoutShutdown:                duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
		timeoutTERM:                    duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
		timeoutSignal:                  duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
		installPublicKey:               installPublicKey,
		tagKeyPrefix:                   cluster.Containers.CloudVMs.TagKeyPrefix,
		stop:                           make(chan bool),
	}
	wp.registerMetrics(reg)
	go func() {
		wp.setupOnce.Do(wp.setup)
		go wp.runMetrics()
		go wp.runProbes()
		go wp.runSync()
	}()
	return wp
}

// Pool is a resizable worker pool backed by a cloud.InstanceSet. A
// zero Pool should not be used. Call NewPool to create a new Pool.
type Pool struct {
	// configuration
	logger                         logrus.FieldLogger
	arvClient                      *arvados.Client
	instanceSetID                  cloud.InstanceSetID
	instanceSet                    *throttledInstanceSet
	newExecutor                    func(cloud.Instance) Executor
	bootProbeCommand               string
	runnerSource                   string
	imageID                        cloud.ImageID
	instanceTypes                  map[string]arvados.InstanceType
	syncInterval                   time.Duration
	probeInterval                  time.Duration
	maxProbesPerSecond             int
	maxConcurrentInstanceCreateOps int
	timeoutIdle                    time.Duration
	timeoutBooting                 time.Duration
	timeoutProbe                   time.Duration
	timeoutShutdown                time.Duration
	timeoutTERM                    time.Duration
	timeoutSignal                  time.Duration
	installPublicKey               ssh.PublicKey
	tagKeyPrefix                   string

	// private state
	subscribers  map[<-chan struct{}]chan<- struct{}
	creating     map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
	workers      map[cloud.InstanceID]*worker
	loaded       bool                 // loaded list of instances from InstanceSet at least once
	exited       map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called
	atQuotaUntil time.Time
	atQuotaErr   cloud.QuotaError
	stop         chan bool
	mtx          sync.RWMutex
	setupOnce    sync.Once
	runnerData   []byte
	runnerMD5    [md5.Size]byte
	runnerCmd    string

	mContainersRunning       prometheus.Gauge
	mInstances               *prometheus.GaugeVec
	mInstancesPrice          *prometheus.GaugeVec
	mVCPUs                   *prometheus.GaugeVec
	mMemory                  *prometheus.GaugeVec
	mBootOutcomes            *prometheus.CounterVec
	mDisappearances          *prometheus.CounterVec
	mTimeToSSH               prometheus.Summary
	mTimeToReadyForContainer prometheus.Summary
	mTimeFromShutdownToGone  prometheus.Summary
}

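// A createCall records an unfinished (cloud.InstanceSet)Create call:
// the time it was issued and the instance type being created.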
type createCall struct {
	time         time.Time
	instanceType arvados.InstanceType
}

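// CheckHealth reports an error if the runner binary that is to be
// deployed on new instances cannot be loaded.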
func (wp *Pool) CheckHealth() error {
	wp.setupOnce.Do(wp.setup)
	if err := wp.loadRunnerData(); err != nil {
		return fmt.Errorf("error loading runner binary: %s", err)
	}
	return nil
}

// Subscribe returns a buffered channel that becomes ready after any
// change to the pool's state that could have scheduling implications:
// a worker's state changes, a new worker appears, the cloud
// provider's API rate limiting period ends, etc.
//
// Additional events that occur while the channel is already ready
// will be dropped, so it is OK if the caller services the channel
// slowly.
//
// Example:
//
//	ch := wp.Subscribe()
//	defer wp.Unsubscribe(ch)
//	for range ch {
//		tryScheduling(wp)
//		if done {
//			break
//		}
//	}
func (wp *Pool) Subscribe() <-chan struct{} {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	ch := make(chan struct{}, 1)
	wp.subscribers[ch] = ch
	return ch
}

// Unsubscribe stops sending updates to the given channel.
func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	delete(wp.subscribers, ch)
}

// Unallocated returns the number of unallocated (creating + booting +
// idle + unknown) workers for each instance type.  Workers in
// hold/drain mode are not included.
func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	unalloc := map[arvados.InstanceType]int{}
	creating := map[arvados.InstanceType]int{}
	oldestCreate := map[arvados.InstanceType]time.Time{}
	for _, cc := range wp.creating {
		it := cc.instanceType
		creating[it]++
		if t, ok := oldestCreate[it]; !ok || t.After(cc.time) {
			oldestCreate[it] = cc.time
		}
	}
	for _, wkr := range wp.workers {
		// Skip workers that are not expected to become
		// available soon. Note len(wkr.running)>0 is not
		// redundant here: it can be true even in
		// StateUnknown.
		if wkr.state == StateShutdown ||
			wkr.state == StateRunning ||
			wkr.idleBehavior != IdleBehaviorRun ||
			len(wkr.running) > 0 {
			continue
		}
		it := wkr.instType
		unalloc[it]++
		if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) {
			// If up to N new workers appear in
			// Instances() while we are waiting for N
			// Create() calls to complete, we assume we're
			// just seeing a race between Instances() and
			// Create() responses.
			//
			// The other common reason why nodes have
			// state==Unknown is that they appeared at
			// startup, before any Create calls. They
			// don't match the above timing condition, so
			// we never mistakenly attribute them to
			// pending Create calls.
			creating[it]--
		}
	}
	for it, c := range creating {
		unalloc[it] += c
	}
	return unalloc
}

// Create a new instance with the given type, and add it to the worker
// pool. The worker is added immediately; instance creation runs in
// the background.
//
// Create returns false if a pre-existing error state prevents it from
// even attempting to create a new instance. Those errors are logged
// by the Pool, so the caller does not need to log anything in such
// cases.
func (wp *Pool) Create(it arvados.InstanceType) bool {
	logger := wp.logger.WithField("InstanceType", it.Name)
	wp.setupOnce.Do(wp.setup)
	if wp.loadRunnerData() != nil {
		// Boot probe is certain to fail.
		return false
	}
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if time.Now().Before(wp.atQuotaUntil) || wp.instanceSet.throttleCreate.Error() != nil {
		return false
	}
	// The maxConcurrentInstanceCreateOps knob throttles the number of node create
	// requests in flight. It was added to work around a limitation in Azure's
	// managed disks, which support no more than 20 concurrent node creation
	// requests from a single disk image (cf.
	// https://docs.microsoft.com/en-us/azure/virtual-machines/linux/capture-image).
	// The code assumes that node creation, from Azure's perspective, means the
	// period until the instance appears in the "get all instances" list.
	if wp.maxConcurrentInstanceCreateOps > 0 && len(wp.creating) >= wp.maxConcurrentInstanceCreateOps {
		logger.Info("reached MaxConcurrentInstanceCreateOps")
		wp.instanceSet.throttleCreate.ErrorUntil(errors.New("reached MaxConcurrentInstanceCreateOps"), time.Now().Add(5*time.Second), wp.notify)
		return false
	}
	now := time.Now()
	secret := randomHex(instanceSecretLength)
	wp.creating[secret] = createCall{time: now, instanceType: it}
	go func() {
		defer wp.notify()
		tags := cloud.InstanceTags{
			wp.tagKeyPrefix + tagKeyInstanceSetID:  string(wp.instanceSetID),
			wp.tagKeyPrefix + tagKeyInstanceType:   it.Name,
			wp.tagKeyPrefix + tagKeyIdleBehavior:   string(IdleBehaviorRun),
			wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
		}
		initCmd := TagVerifier{nil, secret, nil}.InitCommand()
		inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
		wp.mtx.Lock()
		defer wp.mtx.Unlock()
		// delete() is deferred so the updateWorker() call
		// below knows to use StateBooting when adding a new
		// worker.
		defer delete(wp.creating, secret)
		if err != nil {
			if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
				wp.atQuotaErr = err
				wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
				time.AfterFunc(quotaErrorTTL, wp.notify)
			}
			logger.WithError(err).Error("create failed")
			wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
			return
		}
		wp.updateWorker(inst, it)
	}()
	return true
}

// AtQuota returns true if Create is not expected to work at the
// moment.
func (wp *Pool) AtQuota() bool {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	return time.Now().Before(wp.atQuotaUntil)
}

// SetIdleBehavior determines how the indicated instance will behave
// when it has no containers running.
func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wkr, ok := wp.workers[id]
	if !ok {
		return errors.New("requested instance does not exist")
	}
	wkr.setIdleBehavior(idleBehavior)
	return nil
}

// reportSSHConnected records the first successful SSH connection to a
// booting instance and updates the mTimeToSSH metric.
func (wp *Pool) reportSSHConnected(inst cloud.Instance) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wkr := wp.workers[inst.ID()]
	if wkr.state != StateBooting || !wkr.firstSSHConnection.IsZero() {
		// the node is not in booting state (can happen if a-d-c is restarted) OR
		// this is not the first SSH connection
		return
	}

	wkr.firstSSHConnection = time.Now()
	if wp.mTimeToSSH != nil {
		wp.mTimeToSSH.Observe(wkr.firstSSHConnection.Sub(wkr.appeared).Seconds())
	}
}

// Add or update worker attached to the given instance.
//
// The second return value is true if a new worker is created.
//
// A newly added instance has state=StateBooting if its tags match an
// entry in wp.creating, otherwise StateUnknown.
//
// Caller must have lock.
func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
	secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
	inst = TagVerifier{Instance: inst, Secret: secret, ReportVerified: wp.reportSSHConnected}
	id := inst.ID()
	if wkr := wp.workers[id]; wkr != nil {
		wkr.executor.SetTarget(inst)
		wkr.instance = inst
		wkr.updated = time.Now()
		wkr.saveTags()
		return wkr, false
	}

	state := StateUnknown
	if _, ok := wp.creating[secret]; ok {
		state = StateBooting
	}

	// If an instance has a valid IdleBehavior tag when it first
	// appears, initialize the new worker accordingly (this is how
	// we restore IdleBehavior that was set by a prior dispatch
	// process); otherwise, default to "run". After this,
	// wkr.idleBehavior is the source of truth, and will only be
	// changed via SetIdleBehavior().
	idleBehavior := IdleBehavior(inst.Tags()[wp.tagKeyPrefix+tagKeyIdleBehavior])
	if !validIdleBehavior[idleBehavior] {
		idleBehavior = IdleBehaviorRun
	}

	logger := wp.logger.WithFields(logrus.Fields{
		"InstanceType": it.Name,
		"Instance":     inst.ID(),
		"Address":      inst.Address(),
	})
	logger.WithFields(logrus.Fields{
		"State":        state,
		"IdleBehavior": idleBehavior,
	}).Infof("instance appeared in cloud")
	now := time.Now()
	wkr := &worker{
		mtx:          &wp.mtx,
		wp:           wp,
		logger:       logger,
		executor:     wp.newExecutor(inst),
		state:        state,
		idleBehavior: idleBehavior,
		instance:     inst,
		instType:     it,
		appeared:     now,
		probed:       now,
		busy:         now,
		updated:      now,
		running:      make(map[string]*remoteRunner),
		starting:     make(map[string]*remoteRunner),
		probing:      make(chan struct{}, 1),
	}
	wp.workers[id] = wkr
	return wkr, true
}

// Shutdown shuts down a worker with the given type, or returns false
// if all workers with the given type are busy.
func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	logger := wp.logger.WithField("InstanceType", it.Name)
	logger.Info("shutdown requested")
	for _, tryState := range []State{StateBooting, StateIdle} {
		// TODO: shutdown the worker with the longest idle
		// time (Idle) or the earliest create time (Booting)
		for _, wkr := range wp.workers {
			if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
				logger.WithField("Instance", wkr.instance.ID()).Info("shutting down")
				wkr.reportBootOutcome(BootOutcomeAborted)
				wkr.shutdown()
				return true
			}
		}
	}
	return false
}

// CountWorkers returns the current number of workers in each state.
//
// CountWorkers blocks, if necessary, until the initial instance list
// has been loaded from the cloud provider.
func (wp *Pool) CountWorkers() map[State]int {
	wp.setupOnce.Do(wp.setup)
	wp.waitUntilLoaded()
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	r := map[State]int{}
	for _, w := range wp.workers {
		r[w.state]++
	}
	return r
}

// Running returns the container UUIDs being prepared/run on workers.
//
// In the returned map, the time value indicates when the Pool
// observed that the container process had exited. A container that
// has not yet exited has a zero time value. The caller should use
// ForgetContainer() to garbage-collect the entries for exited
// containers.
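//
// For example, a caller might garbage-collect exited entries like
// this (sketch):
//
//	for uuid, exited := range wp.Running() {
//		if !exited.IsZero() {
//			wp.ForgetContainer(uuid)
//		}
//	}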
func (wp *Pool) Running() map[string]time.Time {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	r := map[string]time.Time{}
	for _, wkr := range wp.workers {
		for uuid := range wkr.running {
			r[uuid] = time.Time{}
		}
		for uuid := range wkr.starting {
			r[uuid] = time.Time{}
		}
	}
	for uuid, exited := range wp.exited {
		r[uuid] = exited
	}
	return r
}

// StartContainer starts a container on an idle worker immediately if
// possible, otherwise returns false.
func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	var wkr *worker
	for _, w := range wp.workers {
		if w.instType == it && w.state == StateIdle && w.idleBehavior == IdleBehaviorRun {
			if wkr == nil || w.busy.After(wkr.busy) {
				wkr = w
			}
		}
	}
	if wkr == nil {
		return false
	}
	wkr.startContainer(ctr)
	return true
}

// KillContainer kills the crunch-run process for the given container
// UUID, if it's running on any worker.
//
// KillContainer returns immediately; the act of killing the container
// takes some time, and runs in the background.
//
// KillContainer returns false if the container has already ended.
func (wp *Pool) KillContainer(uuid string, reason string) bool {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	logger := wp.logger.WithFields(logrus.Fields{
		"ContainerUUID": uuid,
		"Reason":        reason,
	})
	for _, wkr := range wp.workers {
		rr := wkr.running[uuid]
		if rr == nil {
			rr = wkr.starting[uuid]
		}
		if rr != nil {
			rr.Kill(reason)
			return true
		}
	}
	logger.Debug("cannot kill: already disappeared")
	return false
}

// ForgetContainer clears the placeholder for the given exited
// container, so it isn't returned by subsequent calls to Running().
//
// ForgetContainer has no effect if the container has not yet exited.
//
// The "container exited at time T" placeholder (which necessitates
// ForgetContainer) exists to make it easier for the caller
// (scheduler) to distinguish a container that exited without
// finalizing its state from a container that exited too recently for
// its final state to have appeared in the scheduler's queue cache.
func (wp *Pool) ForgetContainer(uuid string) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if _, ok := wp.exited[uuid]; ok {
		wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
		delete(wp.exited, uuid)
	}
}

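// registerMetrics creates the pool's Prometheus collectors and
// registers them with reg; if reg is nil, a throwaway registry is
// used, so the metrics are effectively discarded.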
func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
	if reg == nil {
		reg = prometheus.NewRegistry()
	}
	wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "containers_running",
		Help:      "Number of containers reported running by cloud VMs.",
	})
	reg.MustRegister(wp.mContainersRunning)
	wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "instances_total",
		Help:      "Number of cloud VMs.",
	}, []string{"category", "instance_type"})
	reg.MustRegister(wp.mInstances)
	wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "instances_price",
		Help:      "Price of cloud VMs.",
	}, []string{"category"})
	reg.MustRegister(wp.mInstancesPrice)
	wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "vcpus_total",
		Help:      "Total VCPUs on all cloud VMs.",
	}, []string{"category"})
	reg.MustRegister(wp.mVCPUs)
	wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "memory_bytes_total",
		Help:      "Total memory on all cloud VMs.",
	}, []string{"category"})
	reg.MustRegister(wp.mMemory)
	wp.mBootOutcomes = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "boot_outcomes",
		Help:      "Boot outcomes by type.",
	}, []string{"outcome"})
	for k := range validBootOutcomes {
		wp.mBootOutcomes.WithLabelValues(string(k)).Add(0)
	}
	reg.MustRegister(wp.mBootOutcomes)
	wp.mDisappearances = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "arvados",
		Subsystem: "dispatchcloud",
		Name:      "instances_disappeared",
		Help:      "Number of occurrences of an instance disappearing from the cloud provider's list of instances.",
	}, []string{"state"})
	for _, v := range stateString {
		wp.mDisappearances.WithLabelValues(v).Add(0)
	}
	reg.MustRegister(wp.mDisappearances)
	wp.mTimeToSSH = prometheus.NewSummary(prometheus.SummaryOpts{
		Namespace:  "arvados",
		Subsystem:  "dispatchcloud",
		Name:       "instances_time_to_ssh_seconds",
		Help:       "Number of seconds between instance creation and the first successful SSH connection.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	})
	reg.MustRegister(wp.mTimeToSSH)
	wp.mTimeToReadyForContainer = prometheus.NewSummary(prometheus.SummaryOpts{
		Namespace:  "arvados",
		Subsystem:  "dispatchcloud",
		Name:       "instances_time_to_ready_for_container_seconds",
		Help:       "Number of seconds between the first successful SSH connection and ready to run a container.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	})
	reg.MustRegister(wp.mTimeToReadyForContainer)
	wp.mTimeFromShutdownToGone = prometheus.NewSummary(prometheus.SummaryOpts{
		Namespace:  "arvados",
		Subsystem:  "dispatchcloud",
		Name:       "instances_time_from_shutdown_request_to_disappearance_seconds",
		Help:       "Number of seconds between the first shutdown attempt and the disappearance of the worker.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
	})
	reg.MustRegister(wp.mTimeFromShutdownToGone)
}

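// runMetrics updates the Prometheus metrics once at startup and again
// after every subsequent pool state change notification.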
func (wp *Pool) runMetrics() {
	ch := wp.Subscribe()
	defer wp.Unsubscribe(ch)
	wp.updateMetrics()
	for range ch {
		wp.updateMetrics()
	}
}

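// updateMetrics recomputes the instance, price, VCPU, and memory
// gauges from the current worker list, grouping workers into the
// categories inuse, hold, booting, unknown, and idle.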
func (wp *Pool) updateMetrics() {
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()

	type entKey struct {
		cat      string
		instType string
	}
	instances := map[entKey]int64{}
	price := map[string]float64{}
	cpu := map[string]int64{}
	mem := map[string]int64{}
	var running int64
	for _, wkr := range wp.workers {
		var cat string
		switch {
		case len(wkr.running)+len(wkr.starting) > 0:
			cat = "inuse"
		case wkr.idleBehavior == IdleBehaviorHold:
			cat = "hold"
		case wkr.state == StateBooting:
			cat = "booting"
		case wkr.state == StateUnknown:
			cat = "unknown"
		default:
			cat = "idle"
		}
		instances[entKey{cat, wkr.instType.Name}]++
		price[cat] += wkr.instType.Price
		cpu[cat] += int64(wkr.instType.VCPUs)
		mem[cat] += int64(wkr.instType.RAM)
		running += int64(len(wkr.running) + len(wkr.starting))
	}
	for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
		wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
		wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
		wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
		// make sure to reset gauges for non-existing category/nodetype combinations
		for _, it := range wp.instanceTypes {
			if _, ok := instances[entKey{cat, it.Name}]; !ok {
				wp.mInstances.WithLabelValues(cat, it.Name).Set(float64(0))
			}
		}
	}
	for k, v := range instances {
		wp.mInstances.WithLabelValues(k.cat, k.instType).Set(float64(v))
	}
	wp.mContainersRunning.Set(float64(running))
}

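// runProbes probes all eligible workers on every probeInterval tick,
// rate-limited to at most maxProbesPerSecond probes per second, until
// wp.stop is closed.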
func (wp *Pool) runProbes() {
	maxPPS := wp.maxProbesPerSecond
	if maxPPS < 1 {
		maxPPS = defaultMaxProbesPerSecond
	}
	limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
	defer limitticker.Stop()

	probeticker := time.NewTicker(wp.probeInterval)
	defer probeticker.Stop()

	workers := []cloud.InstanceID{}
	for range probeticker.C {
		workers = workers[:0]
		wp.mtx.Lock()
		for id, wkr := range wp.workers {
			if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
				continue
			}
			workers = append(workers, id)
		}
		wp.mtx.Unlock()

		for _, id := range workers {
			wp.mtx.Lock()
			wkr, ok := wp.workers[id]
			wp.mtx.Unlock()
			if !ok {
				// Deleted while we were probing
				// others
				continue
			}
			go wkr.ProbeAndUpdate()
			select {
			case <-wp.stop:
				return
			case <-limitticker.C:
			}
		}
	}
}

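// runSync reconciles the worker pool with the cloud provider's
// instance list, once immediately and then every syncInterval, until
// wp.stop is closed.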
func (wp *Pool) runSync() {
	// sync once immediately, then wait syncInterval, sync again,
	// etc.
	timer := time.NewTimer(1)
	for {
		select {
		case <-timer.C:
			err := wp.getInstancesAndSync()
			if err != nil {
				wp.logger.WithError(err).Warn("sync failed")
			}
			timer.Reset(wp.syncInterval)
		case <-wp.stop:
			wp.logger.Debug("worker.Pool stopped")
			return
		}
	}
}

// Stop synchronizing with the InstanceSet.
func (wp *Pool) Stop() {
	wp.setupOnce.Do(wp.setup)
	close(wp.stop)
}

// Instances returns an InstanceView for each worker in the pool,
// summarizing its current state and recent activity.
func (wp *Pool) Instances() []InstanceView {
	var r []InstanceView
	wp.setupOnce.Do(wp.setup)
	wp.mtx.Lock()
	for _, w := range wp.workers {
		r = append(r, InstanceView{
			Instance:             w.instance.ID(),
			Address:              w.instance.Address(),
			Price:                w.instType.Price,
			ArvadosInstanceType:  w.instType.Name,
			ProviderInstanceType: w.instType.ProviderType,
			LastContainerUUID:    w.lastUUID,
			LastBusy:             w.busy,
			WorkerState:          w.state.String(),
			IdleBehavior:         w.idleBehavior,
		})
	}
	wp.mtx.Unlock()
	sort.Slice(r, func(i, j int) bool {
		return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
	})
	return r
}

// KillInstance destroys a cloud VM instance. It returns an error if
// the given instance does not exist.
func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
	wkr, ok := wp.workers[id]
	if !ok {
		return errors.New("instance not found")
	}
	wkr.logger.WithField("Reason", reason).Info("shutting down")
	wkr.reportBootOutcome(BootOutcomeAborted)
	wkr.shutdown()
	return nil
}

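// setup initializes the pool's internal maps and loads the runner
// binary. It is called exactly once, via wp.setupOnce.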
func (wp *Pool) setup() {
	wp.creating = map[string]createCall{}
	wp.exited = map[string]time.Time{}
	wp.workers = map[cloud.InstanceID]*worker{}
	wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
	wp.loadRunnerData()
}

// Load the runner program to be deployed on worker nodes into
// wp.runnerData, if necessary. Errors are logged.
//
// If auto-deploy is disabled, len(wp.runnerData) will be 0.
//
// Caller must not have lock.
func (wp *Pool) loadRunnerData() error {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	if wp.runnerData != nil {
		return nil
	} else if wp.runnerSource == "" {
		wp.runnerCmd = "crunch-run"
		wp.runnerData = []byte{}
		return nil
	}
	logger := wp.logger.WithField("source", wp.runnerSource)
	logger.Debug("loading runner")
	buf, err := ioutil.ReadFile(wp.runnerSource)
	if err != nil {
		logger.WithError(err).Error("failed to load runner program")
		return err
	}
	wp.runnerData = buf
	wp.runnerMD5 = md5.Sum(buf)
	wp.runnerCmd = fmt.Sprintf("/var/lib/arvados/crunch-run~%x", wp.runnerMD5)
	return nil
}

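// notify does a non-blocking send on every subscriber channel, so a
// slow subscriber never blocks the pool.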
func (wp *Pool) notify() {
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	for _, send := range wp.subscribers {
		select {
		case send <- struct{}{}:
		default:
		}
	}
}

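// getInstancesAndSync fetches the current instance list from the
// cloud provider (unless the provider's rate limit is still in
// effect) and passes it to sync().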
func (wp *Pool) getInstancesAndSync() error {
	wp.setupOnce.Do(wp.setup)
	if err := wp.instanceSet.throttleInstances.Error(); err != nil {
		return err
	}
	wp.logger.Debug("getting instance list")
	threshold := time.Now()
	instances, err := wp.instanceSet.Instances(cloud.InstanceTags{wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID)})
	if err != nil {
		wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
		return err
	}
	wp.sync(threshold, instances)
	wp.logger.Debug("sync done")
	return nil
}

// Add/remove/update workers based on instances, which was obtained
// from the instanceSet. However, don't clobber any other updates that
// already happened after threshold.
func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
	notify := false

	for _, inst := range instances {
		itTag := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceType]
		it, ok := wp.instanceTypes[itTag]
		if !ok {
			wp.logger.WithField("Instance", inst.ID()).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
			continue
		}
		if wkr, isNew := wp.updateWorker(inst, it); isNew {
			notify = true
		} else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
			wp.logger.WithField("Instance", inst.ID()).Info("worker still listed after shutdown; retrying")
			wkr.shutdown()
		}
	}

	for id, wkr := range wp.workers {
		if wkr.updated.After(threshold) {
			continue
		}
		logger := wp.logger.WithFields(logrus.Fields{
			"Instance":    wkr.instance.ID(),
			"WorkerState": wkr.state,
		})
		logger.Info("instance disappeared in cloud")
		wkr.reportBootOutcome(BootOutcomeDisappeared)
		if wp.mDisappearances != nil {
			wp.mDisappearances.WithLabelValues(stateString[wkr.state]).Inc()
		}
		if wp.mTimeFromShutdownToGone != nil {
			wp.mTimeFromShutdownToGone.Observe(time.Since(wkr.destroyed).Seconds())
		}
		delete(wp.workers, id)
		go wkr.Close()
		notify = true
	}

	if !wp.loaded {
		notify = true
		wp.loaded = true
		wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
	}

	if notify {
		go wp.notify()
	}
}

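// waitUntilLoaded blocks until the initial instance list has been
// loaded from the cloud provider.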
func (wp *Pool) waitUntilLoaded() {
	ch := wp.Subscribe()
	wp.mtx.RLock()
	defer wp.mtx.RUnlock()
	for !wp.loaded {
		wp.mtx.RUnlock()
		<-ch
		wp.mtx.RLock()
	}
}

// Return a random string of n hexadecimal digits (n*4 random bits). n
// must be even.
func randomHex(n int) string {
	buf := make([]byte, n/2)
	_, err := rand.Read(buf)
	if err != nil {
		panic(err)
	}
	return fmt.Sprintf("%x", buf)
}