16838: a-d-c: metrics: add runProbe success/failure duration metrics.
[arvados.git] / lib / dispatchcloud / worker / pool.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package worker
6
7 import (
8         "crypto/md5"
9         "crypto/rand"
10         "errors"
11         "fmt"
12         "io"
13         "io/ioutil"
14         "sort"
15         "strings"
16         "sync"
17         "time"
18
19         "git.arvados.org/arvados.git/lib/cloud"
20         "git.arvados.org/arvados.git/sdk/go/arvados"
21         "github.com/prometheus/client_golang/prometheus"
22         "github.com/sirupsen/logrus"
23         "golang.org/x/crypto/ssh"
24 )
25
// Tag key suffixes for the metadata the pool stores in cloud instance
// tags. Code in this file always combines them with the pool's
// tagKeyPrefix (wp.tagKeyPrefix + key) when writing or reading tags.
const (
	tagKeyInstanceType   = "InstanceType"
	tagKeyIdleBehavior   = "IdleBehavior"
	tagKeyInstanceSecret = "InstanceSecret"
	tagKeyInstanceSetID  = "InstanceSetID"
)
32
// An InstanceView shows a worker's current state and recent activity.
//
// Every field carries a JSON struct tag, so the view serializes with
// the snake_case key names given in those tags.
type InstanceView struct {
	Instance             cloud.InstanceID `json:"instance"`
	Address              string           `json:"address"`
	Price                float64          `json:"price"`
	ArvadosInstanceType  string           `json:"arvados_instance_type"`
	ProviderInstanceType string           `json:"provider_instance_type"`
	LastContainerUUID    string           `json:"last_container_uuid"`
	LastBusy             time.Time        `json:"last_busy"`
	WorkerState          string           `json:"worker_state"`
	IdleBehavior         IdleBehavior     `json:"idle_behavior"`
}
45
// An Executor executes shell commands on a remote host.
//
// The pool obtains one Executor per worker by calling the newExecutor
// function passed to NewPool.
type Executor interface {
	// Run cmd on the current target.
	Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)

	// Use the given target for subsequent operations. The new
	// target is the same host as the previous target, but it
	// might return a different address and verify a different
	// host key.
	//
	// SetTarget is called frequently, and in most cases the new
	// target will behave exactly the same as the old one. An
	// implementation should optimize accordingly.
	//
	// SetTarget must not block on concurrent Execute calls.
	SetTarget(cloud.ExecutorTarget)

	// NOTE(review): Close is undocumented in this file; presumably
	// it releases whatever resources the executor holds for its
	// target -- confirm against the implementations.
	Close()
}
65
// Built-in defaults for pool timing and throttling.
//
// NewPool substitutes the duration-valued defaults (via duration())
// whenever the corresponding cluster configuration entry is not a
// positive value. defaultMaxProbesPerSecond and defaultTimeoutIdle's
// siblings that are not durations are not referenced by NewPool
// itself; presumably they are applied elsewhere in the package.
const (
	defaultSyncInterval        = time.Minute
	defaultProbeInterval       = time.Second * 10
	defaultMaxProbesPerSecond  = 10
	defaultTimeoutIdle         = time.Minute
	defaultTimeoutBooting      = time.Minute * 10
	defaultTimeoutProbe        = time.Minute * 10
	defaultTimeoutShutdown     = time.Second * 10
	defaultTimeoutTERM         = time.Minute * 2
	defaultTimeoutSignal       = time.Second * 5
	defaultTimeoutStaleRunLock = time.Second * 5

	// Time after a quota error to try again anyway, even if no
	// instances have been shutdown.
	quotaErrorTTL = time.Minute

	// Time between "X failed because rate limiting" messages
	logRateLimitErrorInterval = time.Second * 10
)
85
86 func duration(conf arvados.Duration, def time.Duration) time.Duration {
87         if conf > 0 {
88                 return time.Duration(conf)
89         } else {
90                 return def
91         }
92 }
93
94 // NewPool creates a Pool of workers backed by instanceSet.
95 //
96 // New instances are configured and set up according to the given
97 // cluster configuration.
98 func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
99         wp := &Pool{
100                 logger:                         logger,
101                 arvClient:                      arvClient,
102                 instanceSetID:                  instanceSetID,
103                 instanceSet:                    &throttledInstanceSet{InstanceSet: instanceSet},
104                 newExecutor:                    newExecutor,
105                 bootProbeCommand:               cluster.Containers.CloudVMs.BootProbeCommand,
106                 runnerSource:                   cluster.Containers.CloudVMs.DeployRunnerBinary,
107                 imageID:                        cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
108                 instanceTypes:                  cluster.InstanceTypes,
109                 maxProbesPerSecond:             cluster.Containers.CloudVMs.MaxProbesPerSecond,
110                 maxConcurrentInstanceCreateOps: cluster.Containers.CloudVMs.MaxConcurrentInstanceCreateOps,
111                 probeInterval:                  duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
112                 syncInterval:                   duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
113                 timeoutIdle:                    duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
114                 timeoutBooting:                 duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
115                 timeoutProbe:                   duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
116                 timeoutShutdown:                duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
117                 timeoutTERM:                    duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
118                 timeoutSignal:                  duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
119                 timeoutStaleRunLock:            duration(cluster.Containers.CloudVMs.TimeoutStaleRunLock, defaultTimeoutStaleRunLock),
120                 installPublicKey:               installPublicKey,
121                 tagKeyPrefix:                   cluster.Containers.CloudVMs.TagKeyPrefix,
122                 stop:                           make(chan bool),
123         }
124         wp.registerMetrics(reg)
125         go func() {
126                 wp.setupOnce.Do(wp.setup)
127                 go wp.runMetrics()
128                 go wp.runProbes()
129                 go wp.runSync()
130         }()
131         return wp
132 }
133
// Pool is a resizable worker pool backed by a cloud.InstanceSet. A
// zero Pool should not be used. Call NewPool to create a new Pool.
type Pool struct {
	// configuration (assigned once by NewPool)
	logger                         logrus.FieldLogger
	arvClient                      *arvados.Client
	instanceSetID                  cloud.InstanceSetID
	instanceSet                    *throttledInstanceSet
	newExecutor                    func(cloud.Instance) Executor
	bootProbeCommand               string
	runnerSource                   string
	imageID                        cloud.ImageID
	instanceTypes                  map[string]arvados.InstanceType
	syncInterval                   time.Duration
	probeInterval                  time.Duration
	maxProbesPerSecond             int
	maxConcurrentInstanceCreateOps int
	timeoutIdle                    time.Duration
	timeoutBooting                 time.Duration
	timeoutProbe                   time.Duration
	timeoutShutdown                time.Duration
	timeoutTERM                    time.Duration
	timeoutSignal                  time.Duration
	timeoutStaleRunLock            time.Duration
	installPublicKey               ssh.PublicKey
	tagKeyPrefix                   string

	// private state (methods in this file hold mtx while reading
	// or modifying subscribers, creating, workers, exited, and
	// the atQuota* fields)
	subscribers  map[<-chan struct{}]chan<- struct{}
	creating     map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
	workers      map[cloud.InstanceID]*worker
	loaded       bool                 // loaded list of instances from InstanceSet at least once
	exited       map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called
	atQuotaUntil time.Time            // Create refuses to start new instances until this time
	atQuotaErr   cloud.QuotaError     // most recent quota error returned by instanceSet.Create
	stop         chan bool
	mtx          sync.RWMutex
	setupOnce    sync.Once // ensures wp.setup runs exactly once
	runnerData   []byte
	runnerMD5    [md5.Size]byte
	runnerCmd    string

	// Prometheus collectors, created and registered by
	// registerMetrics.
	mContainersRunning        prometheus.Gauge
	mInstances                *prometheus.GaugeVec
	mInstancesPrice           *prometheus.GaugeVec
	mVCPUs                    *prometheus.GaugeVec
	mMemory                   *prometheus.GaugeVec
	mBootOutcomes             *prometheus.CounterVec
	mDisappearances           *prometheus.CounterVec
	mTimeToSSH                prometheus.Summary
	mTimeToReadyForContainer  prometheus.Summary
	mTimeFromShutdownToGone   prometheus.Summary
	mTimeFromQueueToCrunchRun prometheus.Summary
	mRunProbeDuration         *prometheus.SummaryVec
}
189
// createCall describes an unfinished (cloud.InstanceSet)Create call:
// the time the pool started it and the instance type that was
// requested. Entries are stored in Pool.creating, keyed by instance
// secret.
type createCall struct {
	time         time.Time
	instanceType arvados.InstanceType
}
194
195 func (wp *Pool) CheckHealth() error {
196         wp.setupOnce.Do(wp.setup)
197         if err := wp.loadRunnerData(); err != nil {
198                 return fmt.Errorf("error loading runner binary: %s", err)
199         }
200         return nil
201 }
202
203 // Subscribe returns a buffered channel that becomes ready after any
204 // change to the pool's state that could have scheduling implications:
205 // a worker's state changes, a new worker appears, the cloud
206 // provider's API rate limiting period ends, etc.
207 //
208 // Additional events that occur while the channel is already ready
209 // will be dropped, so it is OK if the caller services the channel
210 // slowly.
211 //
212 // Example:
213 //
214 //      ch := wp.Subscribe()
215 //      defer wp.Unsubscribe(ch)
216 //      for range ch {
217 //              tryScheduling(wp)
218 //              if done {
219 //                      break
220 //              }
221 //      }
222 func (wp *Pool) Subscribe() <-chan struct{} {
223         wp.setupOnce.Do(wp.setup)
224         wp.mtx.Lock()
225         defer wp.mtx.Unlock()
226         ch := make(chan struct{}, 1)
227         wp.subscribers[ch] = ch
228         return ch
229 }
230
231 // Unsubscribe stops sending updates to the given channel.
232 func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
233         wp.setupOnce.Do(wp.setup)
234         wp.mtx.Lock()
235         defer wp.mtx.Unlock()
236         delete(wp.subscribers, ch)
237 }
238
239 // Unallocated returns the number of unallocated (creating + booting +
240 // idle + unknown) workers for each instance type.  Workers in
241 // hold/drain mode are not included.
242 func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
243         wp.setupOnce.Do(wp.setup)
244         wp.mtx.RLock()
245         defer wp.mtx.RUnlock()
246         unalloc := map[arvados.InstanceType]int{}
247         creating := map[arvados.InstanceType]int{}
248         oldestCreate := map[arvados.InstanceType]time.Time{}
249         for _, cc := range wp.creating {
250                 it := cc.instanceType
251                 creating[it]++
252                 if t, ok := oldestCreate[it]; !ok || t.After(cc.time) {
253                         oldestCreate[it] = cc.time
254                 }
255         }
256         for _, wkr := range wp.workers {
257                 // Skip workers that are not expected to become
258                 // available soon. Note len(wkr.running)>0 is not
259                 // redundant here: it can be true even in
260                 // StateUnknown.
261                 if wkr.state == StateShutdown ||
262                         wkr.state == StateRunning ||
263                         wkr.idleBehavior != IdleBehaviorRun ||
264                         len(wkr.running) > 0 {
265                         continue
266                 }
267                 it := wkr.instType
268                 unalloc[it]++
269                 if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) {
270                         // If up to N new workers appear in
271                         // Instances() while we are waiting for N
272                         // Create() calls to complete, we assume we're
273                         // just seeing a race between Instances() and
274                         // Create() responses.
275                         //
276                         // The other common reason why nodes have
277                         // state==Unknown is that they appeared at
278                         // startup, before any Create calls. They
279                         // don't match the above timing condition, so
280                         // we never mistakenly attribute them to
281                         // pending Create calls.
282                         creating[it]--
283                 }
284         }
285         for it, c := range creating {
286                 unalloc[it] += c
287         }
288         return unalloc
289 }
290
291 // Create a new instance with the given type, and add it to the worker
292 // pool. The worker is added immediately; instance creation runs in
293 // the background.
294 //
295 // Create returns false if a pre-existing error state prevents it from
296 // even attempting to create a new instance. Those errors are logged
297 // by the Pool, so the caller does not need to log anything in such
298 // cases.
299 func (wp *Pool) Create(it arvados.InstanceType) bool {
300         logger := wp.logger.WithField("InstanceType", it.Name)
301         wp.setupOnce.Do(wp.setup)
302         if wp.loadRunnerData() != nil {
303                 // Boot probe is certain to fail.
304                 return false
305         }
306         wp.mtx.Lock()
307         defer wp.mtx.Unlock()
308         if time.Now().Before(wp.atQuotaUntil) || wp.instanceSet.throttleCreate.Error() != nil {
309                 return false
310         }
311         // The maxConcurrentInstanceCreateOps knob throttles the number of node create
312         // requests in flight. It was added to work around a limitation in Azure's
313         // managed disks, which support no more than 20 concurrent node creation
314         // requests from a single disk image (cf.
315         // https://docs.microsoft.com/en-us/azure/virtual-machines/linux/capture-image).
316         // The code assumes that node creation, from Azure's perspective, means the
317         // period until the instance appears in the "get all instances" list.
318         if wp.maxConcurrentInstanceCreateOps > 0 && len(wp.creating) >= wp.maxConcurrentInstanceCreateOps {
319                 logger.Info("reached MaxConcurrentInstanceCreateOps")
320                 wp.instanceSet.throttleCreate.ErrorUntil(errors.New("reached MaxConcurrentInstanceCreateOps"), time.Now().Add(5*time.Second), wp.notify)
321                 return false
322         }
323         now := time.Now()
324         secret := randomHex(instanceSecretLength)
325         wp.creating[secret] = createCall{time: now, instanceType: it}
326         go func() {
327                 defer wp.notify()
328                 tags := cloud.InstanceTags{
329                         wp.tagKeyPrefix + tagKeyInstanceSetID:  string(wp.instanceSetID),
330                         wp.tagKeyPrefix + tagKeyInstanceType:   it.Name,
331                         wp.tagKeyPrefix + tagKeyIdleBehavior:   string(IdleBehaviorRun),
332                         wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
333                 }
334                 initCmd := TagVerifier{nil, secret, nil}.InitCommand()
335                 inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
336                 wp.mtx.Lock()
337                 defer wp.mtx.Unlock()
338                 // delete() is deferred so the updateWorker() call
339                 // below knows to use StateBooting when adding a new
340                 // worker.
341                 defer delete(wp.creating, secret)
342                 if err != nil {
343                         if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
344                                 wp.atQuotaErr = err
345                                 wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
346                                 time.AfterFunc(quotaErrorTTL, wp.notify)
347                         }
348                         logger.WithError(err).Error("create failed")
349                         wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
350                         return
351                 }
352                 wp.updateWorker(inst, it)
353         }()
354         return true
355 }
356
357 // AtQuota returns true if Create is not expected to work at the
358 // moment.
359 func (wp *Pool) AtQuota() bool {
360         wp.mtx.Lock()
361         defer wp.mtx.Unlock()
362         return time.Now().Before(wp.atQuotaUntil)
363 }
364
365 // SetIdleBehavior determines how the indicated instance will behave
366 // when it has no containers running.
367 func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
368         wp.mtx.Lock()
369         defer wp.mtx.Unlock()
370         wkr, ok := wp.workers[id]
371         if !ok {
372                 return errors.New("requested instance does not exist")
373         }
374         wkr.setIdleBehavior(idleBehavior)
375         return nil
376 }
377
378 // Successful connection to the SSH daemon, update the mTimeToSSH metric
379 func (wp *Pool) reportSSHConnected(inst cloud.Instance) {
380         wp.mtx.Lock()
381         defer wp.mtx.Unlock()
382         wkr := wp.workers[inst.ID()]
383         if wkr.state != StateBooting || !wkr.firstSSHConnection.IsZero() {
384                 // the node is not in booting state (can happen if a-d-c is restarted) OR
385                 // this is not the first SSH connection
386                 return
387         }
388
389         wkr.firstSSHConnection = time.Now()
390         if wp.mTimeToSSH != nil {
391                 wp.mTimeToSSH.Observe(wkr.firstSSHConnection.Sub(wkr.appeared).Seconds())
392         }
393 }
394
395 // Add or update worker attached to the given instance.
396 //
397 // The second return value is true if a new worker is created.
398 //
399 // A newly added instance has state=StateBooting if its tags match an
400 // entry in wp.creating, otherwise StateUnknown.
401 //
402 // Caller must have lock.
403 func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
404         secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
405         inst = TagVerifier{Instance: inst, Secret: secret, ReportVerified: wp.reportSSHConnected}
406         id := inst.ID()
407         if wkr := wp.workers[id]; wkr != nil {
408                 wkr.executor.SetTarget(inst)
409                 wkr.instance = inst
410                 wkr.updated = time.Now()
411                 wkr.saveTags()
412                 return wkr, false
413         }
414
415         state := StateUnknown
416         if _, ok := wp.creating[secret]; ok {
417                 state = StateBooting
418         }
419
420         // If an instance has a valid IdleBehavior tag when it first
421         // appears, initialize the new worker accordingly (this is how
422         // we restore IdleBehavior that was set by a prior dispatch
423         // process); otherwise, default to "run". After this,
424         // wkr.idleBehavior is the source of truth, and will only be
425         // changed via SetIdleBehavior().
426         idleBehavior := IdleBehavior(inst.Tags()[wp.tagKeyPrefix+tagKeyIdleBehavior])
427         if !validIdleBehavior[idleBehavior] {
428                 idleBehavior = IdleBehaviorRun
429         }
430
431         logger := wp.logger.WithFields(logrus.Fields{
432                 "InstanceType": it.Name,
433                 "Instance":     inst.ID(),
434                 "Address":      inst.Address(),
435         })
436         logger.WithFields(logrus.Fields{
437                 "State":        state,
438                 "IdleBehavior": idleBehavior,
439         }).Infof("instance appeared in cloud")
440         now := time.Now()
441         wkr := &worker{
442                 mtx:          &wp.mtx,
443                 wp:           wp,
444                 logger:       logger,
445                 executor:     wp.newExecutor(inst),
446                 state:        state,
447                 idleBehavior: idleBehavior,
448                 instance:     inst,
449                 instType:     it,
450                 appeared:     now,
451                 probed:       now,
452                 busy:         now,
453                 updated:      now,
454                 running:      make(map[string]*remoteRunner),
455                 starting:     make(map[string]*remoteRunner),
456                 probing:      make(chan struct{}, 1),
457         }
458         wp.workers[id] = wkr
459         return wkr, true
460 }
461
462 // Shutdown shuts down a worker with the given type, or returns false
463 // if all workers with the given type are busy.
464 func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
465         wp.setupOnce.Do(wp.setup)
466         wp.mtx.Lock()
467         defer wp.mtx.Unlock()
468         logger := wp.logger.WithField("InstanceType", it.Name)
469         logger.Info("shutdown requested")
470         for _, tryState := range []State{StateBooting, StateIdle} {
471                 // TODO: shutdown the worker with the longest idle
472                 // time (Idle) or the earliest create time (Booting)
473                 for _, wkr := range wp.workers {
474                         if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
475                                 logger.WithField("Instance", wkr.instance.ID()).Info("shutting down")
476                                 wkr.reportBootOutcome(BootOutcomeAborted)
477                                 wkr.shutdown()
478                                 return true
479                         }
480                 }
481         }
482         return false
483 }
484
485 // CountWorkers returns the current number of workers in each state.
486 //
487 // CountWorkers blocks, if necessary, until the initial instance list
488 // has been loaded from the cloud provider.
489 func (wp *Pool) CountWorkers() map[State]int {
490         wp.setupOnce.Do(wp.setup)
491         wp.waitUntilLoaded()
492         wp.mtx.Lock()
493         defer wp.mtx.Unlock()
494         r := map[State]int{}
495         for _, w := range wp.workers {
496                 r[w.state]++
497         }
498         return r
499 }
500
501 // Running returns the container UUIDs being prepared/run on workers.
502 //
503 // In the returned map, the time value indicates when the Pool
504 // observed that the container process had exited. A container that
505 // has not yet exited has a zero time value. The caller should use
506 // ForgetContainer() to garbage-collect the entries for exited
507 // containers.
508 func (wp *Pool) Running() map[string]time.Time {
509         wp.setupOnce.Do(wp.setup)
510         wp.mtx.Lock()
511         defer wp.mtx.Unlock()
512         r := map[string]time.Time{}
513         for _, wkr := range wp.workers {
514                 for uuid := range wkr.running {
515                         r[uuid] = time.Time{}
516                 }
517                 for uuid := range wkr.starting {
518                         r[uuid] = time.Time{}
519                 }
520         }
521         for uuid, exited := range wp.exited {
522                 r[uuid] = exited
523         }
524         return r
525 }
526
527 // StartContainer starts a container on an idle worker immediately if
528 // possible, otherwise returns false.
529 func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
530         wp.setupOnce.Do(wp.setup)
531         wp.mtx.Lock()
532         defer wp.mtx.Unlock()
533         var wkr *worker
534         for _, w := range wp.workers {
535                 if w.instType == it && w.state == StateIdle && w.idleBehavior == IdleBehaviorRun {
536                         if wkr == nil || w.busy.After(wkr.busy) {
537                                 wkr = w
538                         }
539                 }
540         }
541         if wkr == nil {
542                 return false
543         }
544         wkr.startContainer(ctr)
545         return true
546 }
547
548 // KillContainer kills the crunch-run process for the given container
549 // UUID, if it's running on any worker.
550 //
551 // KillContainer returns immediately; the act of killing the container
552 // takes some time, and runs in the background.
553 //
554 // KillContainer returns false if the container has already ended.
555 func (wp *Pool) KillContainer(uuid string, reason string) bool {
556         wp.mtx.Lock()
557         defer wp.mtx.Unlock()
558         logger := wp.logger.WithFields(logrus.Fields{
559                 "ContainerUUID": uuid,
560                 "Reason":        reason,
561         })
562         for _, wkr := range wp.workers {
563                 rr := wkr.running[uuid]
564                 if rr == nil {
565                         rr = wkr.starting[uuid]
566                 }
567                 if rr != nil {
568                         rr.Kill(reason)
569                         return true
570                 }
571         }
572         logger.Debug("cannot kill: already disappeared")
573         return false
574 }
575
576 // ForgetContainer clears the placeholder for the given exited
577 // container, so it isn't returned by subsequent calls to Running().
578 //
579 // ForgetContainer has no effect if the container has not yet exited.
580 //
581 // The "container exited at time T" placeholder (which necessitates
582 // ForgetContainer) exists to make it easier for the caller
583 // (scheduler) to distinguish a container that exited without
584 // finalizing its state from a container that exited too recently for
585 // its final state to have appeared in the scheduler's queue cache.
586 func (wp *Pool) ForgetContainer(uuid string) {
587         wp.mtx.Lock()
588         defer wp.mtx.Unlock()
589         if _, ok := wp.exited[uuid]; ok {
590                 wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
591                 delete(wp.exited, uuid)
592         }
593 }
594
595 func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
596         if reg == nil {
597                 reg = prometheus.NewRegistry()
598         }
599         wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
600                 Namespace: "arvados",
601                 Subsystem: "dispatchcloud",
602                 Name:      "containers_running",
603                 Help:      "Number of containers reported running by cloud VMs.",
604         })
605         reg.MustRegister(wp.mContainersRunning)
606         wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
607                 Namespace: "arvados",
608                 Subsystem: "dispatchcloud",
609                 Name:      "instances_total",
610                 Help:      "Number of cloud VMs.",
611         }, []string{"category", "instance_type"})
612         reg.MustRegister(wp.mInstances)
613         wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
614                 Namespace: "arvados",
615                 Subsystem: "dispatchcloud",
616                 Name:      "instances_price",
617                 Help:      "Price of cloud VMs.",
618         }, []string{"category"})
619         reg.MustRegister(wp.mInstancesPrice)
620         wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
621                 Namespace: "arvados",
622                 Subsystem: "dispatchcloud",
623                 Name:      "vcpus_total",
624                 Help:      "Total VCPUs on all cloud VMs.",
625         }, []string{"category"})
626         reg.MustRegister(wp.mVCPUs)
627         wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
628                 Namespace: "arvados",
629                 Subsystem: "dispatchcloud",
630                 Name:      "memory_bytes_total",
631                 Help:      "Total memory on all cloud VMs.",
632         }, []string{"category"})
633         reg.MustRegister(wp.mMemory)
634         wp.mBootOutcomes = prometheus.NewCounterVec(prometheus.CounterOpts{
635                 Namespace: "arvados",
636                 Subsystem: "dispatchcloud",
637                 Name:      "boot_outcomes",
638                 Help:      "Boot outcomes by type.",
639         }, []string{"outcome"})
640         for k := range validBootOutcomes {
641                 wp.mBootOutcomes.WithLabelValues(string(k)).Add(0)
642         }
643         reg.MustRegister(wp.mBootOutcomes)
644         wp.mDisappearances = prometheus.NewCounterVec(prometheus.CounterOpts{
645                 Namespace: "arvados",
646                 Subsystem: "dispatchcloud",
647                 Name:      "instances_disappeared",
648                 Help:      "Number of occurrences of an instance disappearing from the cloud provider's list of instances.",
649         }, []string{"state"})
650         for _, v := range stateString {
651                 wp.mDisappearances.WithLabelValues(v).Add(0)
652         }
653         reg.MustRegister(wp.mDisappearances)
654         wp.mTimeToSSH = prometheus.NewSummary(prometheus.SummaryOpts{
655                 Namespace:  "arvados",
656                 Subsystem:  "dispatchcloud",
657                 Name:       "instances_time_to_ssh_seconds",
658                 Help:       "Number of seconds between instance creation and the first successful SSH connection.",
659                 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
660         })
661         reg.MustRegister(wp.mTimeToSSH)
662         wp.mTimeToReadyForContainer = prometheus.NewSummary(prometheus.SummaryOpts{
663                 Namespace:  "arvados",
664                 Subsystem:  "dispatchcloud",
665                 Name:       "instances_time_to_ready_for_container_seconds",
666                 Help:       "Number of seconds between the first successful SSH connection and ready to run a container.",
667                 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
668         })
669         reg.MustRegister(wp.mTimeToReadyForContainer)
670         wp.mTimeFromShutdownToGone = prometheus.NewSummary(prometheus.SummaryOpts{
671                 Namespace:  "arvados",
672                 Subsystem:  "dispatchcloud",
673                 Name:       "instances_time_from_shutdown_request_to_disappearance_seconds",
674                 Help:       "Number of seconds between the first shutdown attempt and the disappearance of the worker.",
675                 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
676         })
677         reg.MustRegister(wp.mTimeFromShutdownToGone)
678         wp.mTimeFromQueueToCrunchRun = prometheus.NewSummary(prometheus.SummaryOpts{
679                 Namespace:  "arvados",
680                 Subsystem:  "dispatchcloud",
681                 Name:       "containers_time_from_queue_to_crunch_run_seconds",
682                 Help:       "Number of seconds between the queuing of a container and the start of crunch-run.",
683                 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
684         })
685         reg.MustRegister(wp.mTimeFromQueueToCrunchRun)
686         wp.mRunProbeDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{
687                 Namespace:  "arvados",
688                 Subsystem:  "dispatchcloud",
689                 Name:       "instances_run_probe_duration_seconds",
690                 Help:       "Number of seconds per runProbe call.",
691                 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
692         }, []string{"outcome"})
693         for _, v := range []string{"success", "fail"} {
694                 wp.mRunProbeDuration.WithLabelValues(v).Observe(0)
695         }
696         reg.MustRegister(wp.mRunProbeDuration)
697 }
698
699 func (wp *Pool) runMetrics() {
700         ch := wp.Subscribe()
701         defer wp.Unsubscribe(ch)
702         wp.updateMetrics()
703         for range ch {
704                 wp.updateMetrics()
705         }
706 }
707
708 func (wp *Pool) updateMetrics() {
709         wp.mtx.RLock()
710         defer wp.mtx.RUnlock()
711
712         type entKey struct {
713                 cat      string
714                 instType string
715         }
716         instances := map[entKey]int64{}
717         price := map[string]float64{}
718         cpu := map[string]int64{}
719         mem := map[string]int64{}
720         var running int64
721         for _, wkr := range wp.workers {
722                 var cat string
723                 switch {
724                 case len(wkr.running)+len(wkr.starting) > 0:
725                         cat = "inuse"
726                 case wkr.idleBehavior == IdleBehaviorHold:
727                         cat = "hold"
728                 case wkr.state == StateBooting:
729                         cat = "booting"
730                 case wkr.state == StateUnknown:
731                         cat = "unknown"
732                 default:
733                         cat = "idle"
734                 }
735                 instances[entKey{cat, wkr.instType.Name}]++
736                 price[cat] += wkr.instType.Price
737                 cpu[cat] += int64(wkr.instType.VCPUs)
738                 mem[cat] += int64(wkr.instType.RAM)
739                 running += int64(len(wkr.running) + len(wkr.starting))
740         }
741         for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
742                 wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
743                 wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
744                 wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
745                 // make sure to reset gauges for non-existing category/nodetype combinations
746                 for _, it := range wp.instanceTypes {
747                         if _, ok := instances[entKey{cat, it.Name}]; !ok {
748                                 wp.mInstances.WithLabelValues(cat, it.Name).Set(float64(0))
749                         }
750                 }
751         }
752         for k, v := range instances {
753                 wp.mInstances.WithLabelValues(k.cat, k.instType).Set(float64(v))
754         }
755         wp.mContainersRunning.Set(float64(running))
756 }
757
758 func (wp *Pool) runProbes() {
759         maxPPS := wp.maxProbesPerSecond
760         if maxPPS < 1 {
761                 maxPPS = defaultMaxProbesPerSecond
762         }
763         limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
764         defer limitticker.Stop()
765
766         probeticker := time.NewTicker(wp.probeInterval)
767         defer probeticker.Stop()
768
769         workers := []cloud.InstanceID{}
770         for range probeticker.C {
771                 workers = workers[:0]
772                 wp.mtx.Lock()
773                 for id, wkr := range wp.workers {
774                         if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
775                                 continue
776                         }
777                         workers = append(workers, id)
778                 }
779                 wp.mtx.Unlock()
780
781                 for _, id := range workers {
782                         wp.mtx.Lock()
783                         wkr, ok := wp.workers[id]
784                         wp.mtx.Unlock()
785                         if !ok {
786                                 // Deleted while we were probing
787                                 // others
788                                 continue
789                         }
790                         go wkr.ProbeAndUpdate()
791                         select {
792                         case <-wp.stop:
793                                 return
794                         case <-limitticker.C:
795                         }
796                 }
797         }
798 }
799
800 func (wp *Pool) runSync() {
801         // sync once immediately, then wait syncInterval, sync again,
802         // etc.
803         timer := time.NewTimer(1)
804         for {
805                 select {
806                 case <-timer.C:
807                         err := wp.getInstancesAndSync()
808                         if err != nil {
809                                 wp.logger.WithError(err).Warn("sync failed")
810                         }
811                         timer.Reset(wp.syncInterval)
812                 case <-wp.stop:
813                         wp.logger.Debug("worker.Pool stopped")
814                         return
815                 }
816         }
817 }
818
819 // Stop synchronizing with the InstanceSet.
820 func (wp *Pool) Stop() {
821         wp.setupOnce.Do(wp.setup)
822         close(wp.stop)
823 }
824
825 // Instances returns an InstanceView for each worker in the pool,
826 // summarizing its current state and recent activity.
827 func (wp *Pool) Instances() []InstanceView {
828         var r []InstanceView
829         wp.setupOnce.Do(wp.setup)
830         wp.mtx.Lock()
831         for _, w := range wp.workers {
832                 r = append(r, InstanceView{
833                         Instance:             w.instance.ID(),
834                         Address:              w.instance.Address(),
835                         Price:                w.instType.Price,
836                         ArvadosInstanceType:  w.instType.Name,
837                         ProviderInstanceType: w.instType.ProviderType,
838                         LastContainerUUID:    w.lastUUID,
839                         LastBusy:             w.busy,
840                         WorkerState:          w.state.String(),
841                         IdleBehavior:         w.idleBehavior,
842                 })
843         }
844         wp.mtx.Unlock()
845         sort.Slice(r, func(i, j int) bool {
846                 return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
847         })
848         return r
849 }
850
851 // KillInstance destroys a cloud VM instance. It returns an error if
852 // the given instance does not exist.
853 func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
854         wkr, ok := wp.workers[id]
855         if !ok {
856                 return errors.New("instance not found")
857         }
858         wkr.logger.WithField("Reason", reason).Info("shutting down")
859         wkr.reportBootOutcome(BootOutcomeAborted)
860         wkr.shutdown()
861         return nil
862 }
863
864 func (wp *Pool) setup() {
865         wp.creating = map[string]createCall{}
866         wp.exited = map[string]time.Time{}
867         wp.workers = map[cloud.InstanceID]*worker{}
868         wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
869         wp.loadRunnerData()
870 }
871
872 // Load the runner program to be deployed on worker nodes into
873 // wp.runnerData, if necessary. Errors are logged.
874 //
875 // If auto-deploy is disabled, len(wp.runnerData) will be 0.
876 //
877 // Caller must not have lock.
878 func (wp *Pool) loadRunnerData() error {
879         wp.mtx.Lock()
880         defer wp.mtx.Unlock()
881         if wp.runnerData != nil {
882                 return nil
883         } else if wp.runnerSource == "" {
884                 wp.runnerCmd = "crunch-run"
885                 wp.runnerData = []byte{}
886                 return nil
887         }
888         logger := wp.logger.WithField("source", wp.runnerSource)
889         logger.Debug("loading runner")
890         buf, err := ioutil.ReadFile(wp.runnerSource)
891         if err != nil {
892                 logger.WithError(err).Error("failed to load runner program")
893                 return err
894         }
895         wp.runnerData = buf
896         wp.runnerMD5 = md5.Sum(buf)
897         wp.runnerCmd = fmt.Sprintf("/var/lib/arvados/crunch-run~%x", wp.runnerMD5)
898         return nil
899 }
900
901 func (wp *Pool) notify() {
902         wp.mtx.RLock()
903         defer wp.mtx.RUnlock()
904         for _, send := range wp.subscribers {
905                 select {
906                 case send <- struct{}{}:
907                 default:
908                 }
909         }
910 }
911
912 func (wp *Pool) getInstancesAndSync() error {
913         wp.setupOnce.Do(wp.setup)
914         if err := wp.instanceSet.throttleInstances.Error(); err != nil {
915                 return err
916         }
917         wp.logger.Debug("getting instance list")
918         threshold := time.Now()
919         instances, err := wp.instanceSet.Instances(cloud.InstanceTags{wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID)})
920         if err != nil {
921                 wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
922                 return err
923         }
924         wp.sync(threshold, instances)
925         wp.logger.Debug("sync done")
926         return nil
927 }
928
929 // Add/remove/update workers based on instances, which was obtained
930 // from the instanceSet. However, don't clobber any other updates that
931 // already happened after threshold.
932 func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
933         wp.mtx.Lock()
934         defer wp.mtx.Unlock()
935         wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
936         notify := false
937
938         for _, inst := range instances {
939                 itTag := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceType]
940                 it, ok := wp.instanceTypes[itTag]
941                 if !ok {
942                         wp.logger.WithField("Instance", inst.ID()).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
943                         continue
944                 }
945                 if wkr, isNew := wp.updateWorker(inst, it); isNew {
946                         notify = true
947                 } else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
948                         wp.logger.WithField("Instance", inst.ID()).Info("worker still listed after shutdown; retrying")
949                         wkr.shutdown()
950                 }
951         }
952
953         for id, wkr := range wp.workers {
954                 if wkr.updated.After(threshold) {
955                         continue
956                 }
957                 logger := wp.logger.WithFields(logrus.Fields{
958                         "Instance":    wkr.instance.ID(),
959                         "WorkerState": wkr.state,
960                 })
961                 logger.Info("instance disappeared in cloud")
962                 wkr.reportBootOutcome(BootOutcomeDisappeared)
963                 if wp.mDisappearances != nil {
964                         wp.mDisappearances.WithLabelValues(stateString[wkr.state]).Inc()
965                 }
966                 // wkr.destroyed.IsZero() can happen if instance disappeared but we weren't trying to shut it down
967                 if wp.mTimeFromShutdownToGone != nil && !wkr.destroyed.IsZero() {
968                         wp.mTimeFromShutdownToGone.Observe(time.Now().Sub(wkr.destroyed).Seconds())
969                 }
970                 delete(wp.workers, id)
971                 go wkr.Close()
972                 notify = true
973         }
974
975         if !wp.loaded {
976                 notify = true
977                 wp.loaded = true
978                 wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
979         }
980
981         if notify {
982                 go wp.notify()
983         }
984 }
985
986 func (wp *Pool) waitUntilLoaded() {
987         ch := wp.Subscribe()
988         wp.mtx.RLock()
989         defer wp.mtx.RUnlock()
990         for !wp.loaded {
991                 wp.mtx.RUnlock()
992                 <-ch
993                 wp.mtx.RLock()
994         }
995 }
996
// randomHex returns a string of n lowercase hexadecimal digits (n*4
// random bits), built from n/2 bytes read from crypto/rand. n must be
// even; an odd n yields n-1 digits. It panics if the random source
// returns an error.
func randomHex(n int) string {
	raw := make([]byte, n/2)
	if _, err := rand.Read(raw); err != nil {
		panic(err)
	}
	return fmt.Sprintf("%x", raw)
}