17170: Merge branch 'master'
[arvados.git] / lib / dispatchcloud / worker / pool.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package worker
6
7 import (
8         "crypto/hmac"
9         "crypto/md5"
10         "crypto/rand"
11         "crypto/sha256"
12         "errors"
13         "fmt"
14         "io"
15         "io/ioutil"
16         "sort"
17         "strings"
18         "sync"
19         "time"
20
21         "git.arvados.org/arvados.git/lib/cloud"
22         "git.arvados.org/arvados.git/sdk/go/arvados"
23         "github.com/prometheus/client_golang/prometheus"
24         "github.com/sirupsen/logrus"
25         "golang.org/x/crypto/ssh"
26 )
27
// Tag key suffixes. In Create, each suffix is appended to the pool's
// tagKeyPrefix to form the name of a tag attached to a new instance.
28 const (
           // Value is the Arvados instance type name (it.Name).
29         tagKeyInstanceType   = "InstanceType"
           // Value is the idle behavior; read back by updateWorker when
           // an instance first appears.
30         tagKeyIdleBehavior   = "IdleBehavior"
           // Value is the random per-instance secret generated in
           // Create; updateWorker uses it to match an instance to a
           // pending entry in wp.creating.
31         tagKeyInstanceSecret = "InstanceSecret"
           // Value is the pool's instanceSetID.
32         tagKeyInstanceSetID  = "InstanceSetID"
33 )
34
35 // An InstanceView shows a worker's current state and recent activity.
//
// The struct tags give the key used for each field when the view is
// encoded as JSON.
// NOTE(review): the code that fills in these fields is not in this
// part of the file; the field meanings below are inferred from names
// and types only -- confirm against the producer.
36 type InstanceView struct {
37         Instance             cloud.InstanceID `json:"instance"`
38         Address              string           `json:"address"`
39         Price                float64          `json:"price"`
40         ArvadosInstanceType  string           `json:"arvados_instance_type"`
41         ProviderInstanceType string           `json:"provider_instance_type"`
42         LastContainerUUID    string           `json:"last_container_uuid"`
43         LastBusy             time.Time        `json:"last_busy"`
44         WorkerState          string           `json:"worker_state"`
45         IdleBehavior         IdleBehavior     `json:"idle_behavior"`
46 }
47
48 // An Executor executes shell commands on a remote host.
//
// The pool obtains one Executor per worker from its newExecutor
// function and calls SetTarget on it each time the worker's instance
// is refreshed (see updateWorker).
49 type Executor interface {
50         // Run cmd on the current target.
           //
           // Execute takes the environment for the command, the command
           // line, and an optional stdin stream; it returns the captured
           // stdout and stderr along with any error.
51         Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)
52
53         // Use the given target for subsequent operations. The new
54         // target is the same host as the previous target, but it
55         // might return a different address and verify a different
56         // host key.
57         //
58         // SetTarget is called frequently, and in most cases the new
59         // target will behave exactly the same as the old one. An
60         // implementation should optimize accordingly.
61         //
62         // SetTarget must not block on concurrent Execute calls.
63         SetTarget(cloud.ExecutorTarget)
64
           // NOTE(review): Close is undocumented in the original;
           // presumably it releases any connection held for the current
           // target -- confirm against the implementations.
65         Close()
66 }
67
// Default intervals and timeouts. NewPool uses the interval and
// timeout defaults below (via duration) whenever the corresponding
// cluster configuration value is zero or negative.
// NOTE(review): defaultMaxProbesPerSecond and
// logRateLimitErrorInterval are not referenced in this part of the
// file; confirm where they are applied.
68 const (
69         defaultSyncInterval        = time.Minute
70         defaultProbeInterval       = time.Second * 10
71         defaultMaxProbesPerSecond  = 10
72         defaultTimeoutIdle         = time.Minute
73         defaultTimeoutBooting      = time.Minute * 10
74         defaultTimeoutProbe        = time.Minute * 10
75         defaultTimeoutShutdown     = time.Second * 10
76         defaultTimeoutTERM         = time.Minute * 2
77         defaultTimeoutSignal       = time.Second * 5
78         defaultTimeoutStaleRunLock = time.Second * 5
79
80         // Time after a quota error to try again anyway, even if no
81         // instances have been shutdown.
           //
           // Create sets atQuotaUntil this far in the future when the
           // cloud provider reports a quota error.
82         quotaErrorTTL = time.Minute
83
84         // Time between "X failed because rate limiting" messages
85         logRateLimitErrorInterval = time.Second * 10
86 )
87
88 func duration(conf arvados.Duration, def time.Duration) time.Duration {
89         if conf > 0 {
90                 return time.Duration(conf)
91         }
92         return def
93 }
94
95 // NewPool creates a Pool of workers backed by instanceSet.
96 //
97 // New instances are configured and set up according to the given
98 // cluster configuration.
99 func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
100         wp := &Pool{
101                 logger:                         logger,
102                 arvClient:                      arvClient,
103                 instanceSetID:                  instanceSetID,
104                 instanceSet:                    &throttledInstanceSet{InstanceSet: instanceSet},
105                 newExecutor:                    newExecutor,
106                 bootProbeCommand:               cluster.Containers.CloudVMs.BootProbeCommand,
107                 runnerSource:                   cluster.Containers.CloudVMs.DeployRunnerBinary,
108                 imageID:                        cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
109                 instanceTypes:                  cluster.InstanceTypes,
110                 maxProbesPerSecond:             cluster.Containers.CloudVMs.MaxProbesPerSecond,
111                 maxConcurrentInstanceCreateOps: cluster.Containers.CloudVMs.MaxConcurrentInstanceCreateOps,
112                 probeInterval:                  duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
113                 syncInterval:                   duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
114                 timeoutIdle:                    duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
115                 timeoutBooting:                 duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
116                 timeoutProbe:                   duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
117                 timeoutShutdown:                duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
118                 timeoutTERM:                    duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
119                 timeoutSignal:                  duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
120                 timeoutStaleRunLock:            duration(cluster.Containers.CloudVMs.TimeoutStaleRunLock, defaultTimeoutStaleRunLock),
121                 systemRootToken:                cluster.SystemRootToken,
122                 installPublicKey:               installPublicKey,
123                 tagKeyPrefix:                   cluster.Containers.CloudVMs.TagKeyPrefix,
124                 stop:                           make(chan bool),
125         }
126         wp.registerMetrics(reg)
127         go func() {
128                 wp.setupOnce.Do(wp.setup)
129                 go wp.runMetrics()
130                 go wp.runProbes()
131                 go wp.runSync()
132         }()
133         return wp
134 }
135
136 // Pool is a resizable worker pool backed by a cloud.InstanceSet. A
137 // zero Pool should not be used. Call NewPool to create a new Pool.
138 type Pool struct {
139         // configuration
            //
            // These fields are filled in by NewPool from its arguments
            // and from the cluster configuration.
140         logger                         logrus.FieldLogger
141         arvClient                      *arvados.Client
142         instanceSetID                  cloud.InstanceSetID
143         instanceSet                    *throttledInstanceSet
144         newExecutor                    func(cloud.Instance) Executor
145         bootProbeCommand               string
146         runnerSource                   string
147         imageID                        cloud.ImageID
148         instanceTypes                  map[string]arvados.InstanceType
149         syncInterval                   time.Duration
150         probeInterval                  time.Duration
151         maxProbesPerSecond             int
152         maxConcurrentInstanceCreateOps int
153         timeoutIdle                    time.Duration
154         timeoutBooting                 time.Duration
155         timeoutProbe                   time.Duration
156         timeoutShutdown                time.Duration
157         timeoutTERM                    time.Duration
158         timeoutSignal                  time.Duration
159         timeoutStaleRunLock            time.Duration
160         systemRootToken                string
161         installPublicKey               ssh.PublicKey
162         tagKeyPrefix                   string
163
164         // private state
            //
            // The methods in this file hold mtx while reading or
            // modifying the fields below. Each worker is also given a
            // pointer to the same mutex (see updateWorker).
            //
            // subscribers maps the receive side of each channel returned
            // by Subscribe to its send side. workers is keyed by cloud
            // instance ID. atQuotaUntil and atQuotaErr are set by Create
            // when the cloud provider returns a quota error.
            // NOTE(review): stop, runnerData, runnerMD5 and runnerCmd
            // are not written in this part of the file (apart from stop
            // being allocated in NewPool); presumably the runner fields
            // are populated by loadRunnerData -- confirm.
165         subscribers  map[<-chan struct{}]chan<- struct{}
166         creating     map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
167         workers      map[cloud.InstanceID]*worker
168         loaded       bool                 // loaded list of instances from InstanceSet at least once
169         exited       map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called
170         atQuotaUntil time.Time
171         atQuotaErr   cloud.QuotaError
172         stop         chan bool
173         mtx          sync.RWMutex
174         setupOnce    sync.Once
175         runnerData   []byte
176         runnerMD5    [md5.Size]byte
177         runnerCmd    string
178
            // Prometheus collectors, created and registered by
            // registerMetrics.
179         mContainersRunning        prometheus.Gauge
180         mInstances                *prometheus.GaugeVec
181         mInstancesPrice           *prometheus.GaugeVec
182         mVCPUs                    *prometheus.GaugeVec
183         mMemory                   *prometheus.GaugeVec
184         mBootOutcomes             *prometheus.CounterVec
185         mDisappearances           *prometheus.CounterVec
186         mTimeToSSH                prometheus.Summary
187         mTimeToReadyForContainer  prometheus.Summary
188         mTimeFromShutdownToGone   prometheus.Summary
189         mTimeFromQueueToCrunchRun prometheus.Summary
190         mRunProbeDuration         *prometheus.SummaryVec
191 }
192
// createCall describes an instance creation request that has been
// started by (*Pool)Create but has not yet finished. Entries are kept
// in Pool.creating, keyed by the instance secret.
193 type createCall struct {
            // time is when Create started the request.
194         time         time.Time
            // instanceType is the instance type that was requested.
195         instanceType arvados.InstanceType
196 }
197
198 func (wp *Pool) CheckHealth() error {
199         wp.setupOnce.Do(wp.setup)
200         if err := wp.loadRunnerData(); err != nil {
201                 return fmt.Errorf("error loading runner binary: %s", err)
202         }
203         return nil
204 }
205
206 // Subscribe returns a buffered channel that becomes ready after any
207 // change to the pool's state that could have scheduling implications:
208 // a worker's state changes, a new worker appears, the cloud
209 // provider's API rate limiting period ends, etc.
210 //
211 // Additional events that occur while the channel is already ready
212 // will be dropped, so it is OK if the caller services the channel
213 // slowly.
214 //
215 // Example:
216 //
217 //      ch := wp.Subscribe()
218 //      defer wp.Unsubscribe(ch)
219 //      for range ch {
220 //              tryScheduling(wp)
221 //              if done {
222 //                      break
223 //              }
224 //      }
225 func (wp *Pool) Subscribe() <-chan struct{} {
226         wp.setupOnce.Do(wp.setup)
227         wp.mtx.Lock()
228         defer wp.mtx.Unlock()
229         ch := make(chan struct{}, 1)
230         wp.subscribers[ch] = ch
231         return ch
232 }
233
234 // Unsubscribe stops sending updates to the given channel.
235 func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
236         wp.setupOnce.Do(wp.setup)
237         wp.mtx.Lock()
238         defer wp.mtx.Unlock()
239         delete(wp.subscribers, ch)
240 }
241
242 // Unallocated returns the number of unallocated (creating + booting +
243 // idle + unknown) workers for each instance type.  Workers in
244 // hold/drain mode are not included.
245 func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
246         wp.setupOnce.Do(wp.setup)
247         wp.mtx.RLock()
248         defer wp.mtx.RUnlock()
249         unalloc := map[arvados.InstanceType]int{}
250         creating := map[arvados.InstanceType]int{}
251         oldestCreate := map[arvados.InstanceType]time.Time{}
252         for _, cc := range wp.creating {
253                 it := cc.instanceType
254                 creating[it]++
255                 if t, ok := oldestCreate[it]; !ok || t.After(cc.time) {
256                         oldestCreate[it] = cc.time
257                 }
258         }
259         for _, wkr := range wp.workers {
260                 // Skip workers that are not expected to become
261                 // available soon. Note len(wkr.running)>0 is not
262                 // redundant here: it can be true even in
263                 // StateUnknown.
264                 if wkr.state == StateShutdown ||
265                         wkr.state == StateRunning ||
266                         wkr.idleBehavior != IdleBehaviorRun ||
267                         len(wkr.running) > 0 {
268                         continue
269                 }
270                 it := wkr.instType
271                 unalloc[it]++
272                 if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) {
273                         // If up to N new workers appear in
274                         // Instances() while we are waiting for N
275                         // Create() calls to complete, we assume we're
276                         // just seeing a race between Instances() and
277                         // Create() responses.
278                         //
279                         // The other common reason why nodes have
280                         // state==Unknown is that they appeared at
281                         // startup, before any Create calls. They
282                         // don't match the above timing condition, so
283                         // we never mistakenly attribute them to
284                         // pending Create calls.
285                         creating[it]--
286                 }
287         }
288         for it, c := range creating {
289                 unalloc[it] += c
290         }
291         return unalloc
292 }
293
294 // Create a new instance with the given type, and add it to the worker
295 // pool. The worker is added immediately; instance creation runs in
296 // the background.
297 //
298 // Create returns false if a pre-existing error state prevents it from
299 // even attempting to create a new instance. Those errors are logged
300 // by the Pool, so the caller does not need to log anything in such
301 // cases.
302 func (wp *Pool) Create(it arvados.InstanceType) bool {
303         logger := wp.logger.WithField("InstanceType", it.Name)
304         wp.setupOnce.Do(wp.setup)
305         if wp.loadRunnerData() != nil {
306                 // Boot probe is certain to fail.
307                 return false
308         }
309         wp.mtx.Lock()
310         defer wp.mtx.Unlock()
311         if time.Now().Before(wp.atQuotaUntil) || wp.instanceSet.throttleCreate.Error() != nil {
312                 return false
313         }
314         // The maxConcurrentInstanceCreateOps knob throttles the number of node create
315         // requests in flight. It was added to work around a limitation in Azure's
316         // managed disks, which support no more than 20 concurrent node creation
317         // requests from a single disk image (cf.
318         // https://docs.microsoft.com/en-us/azure/virtual-machines/linux/capture-image).
319         // The code assumes that node creation, from Azure's perspective, means the
320         // period until the instance appears in the "get all instances" list.
321         if wp.maxConcurrentInstanceCreateOps > 0 && len(wp.creating) >= wp.maxConcurrentInstanceCreateOps {
322                 logger.Info("reached MaxConcurrentInstanceCreateOps")
323                 wp.instanceSet.throttleCreate.ErrorUntil(errors.New("reached MaxConcurrentInstanceCreateOps"), time.Now().Add(5*time.Second), wp.notify)
324                 return false
325         }
326         now := time.Now()
327         secret := randomHex(instanceSecretLength)
328         wp.creating[secret] = createCall{time: now, instanceType: it}
329         go func() {
330                 defer wp.notify()
331                 tags := cloud.InstanceTags{
332                         wp.tagKeyPrefix + tagKeyInstanceSetID:  string(wp.instanceSetID),
333                         wp.tagKeyPrefix + tagKeyInstanceType:   it.Name,
334                         wp.tagKeyPrefix + tagKeyIdleBehavior:   string(IdleBehaviorRun),
335                         wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
336                 }
337                 initCmd := TagVerifier{nil, secret, nil}.InitCommand()
338                 inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
339                 wp.mtx.Lock()
340                 defer wp.mtx.Unlock()
341                 // delete() is deferred so the updateWorker() call
342                 // below knows to use StateBooting when adding a new
343                 // worker.
344                 defer delete(wp.creating, secret)
345                 if err != nil {
346                         if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
347                                 wp.atQuotaErr = err
348                                 wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
349                                 time.AfterFunc(quotaErrorTTL, wp.notify)
350                         }
351                         logger.WithError(err).Error("create failed")
352                         wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
353                         return
354                 }
355                 wp.updateWorker(inst, it)
356         }()
357         return true
358 }
359
360 // AtQuota returns true if Create is not expected to work at the
361 // moment.
362 func (wp *Pool) AtQuota() bool {
363         wp.mtx.Lock()
364         defer wp.mtx.Unlock()
365         return time.Now().Before(wp.atQuotaUntil)
366 }
367
368 // SetIdleBehavior determines how the indicated instance will behave
369 // when it has no containers running.
370 func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
371         wp.mtx.Lock()
372         defer wp.mtx.Unlock()
373         wkr, ok := wp.workers[id]
374         if !ok {
375                 return errors.New("requested instance does not exist")
376         }
377         wkr.setIdleBehavior(idleBehavior)
378         return nil
379 }
380
381 // Successful connection to the SSH daemon, update the mTimeToSSH metric
382 func (wp *Pool) reportSSHConnected(inst cloud.Instance) {
383         wp.mtx.Lock()
384         defer wp.mtx.Unlock()
385         wkr := wp.workers[inst.ID()]
386         if wkr.state != StateBooting || !wkr.firstSSHConnection.IsZero() {
387                 // the node is not in booting state (can happen if a-d-c is restarted) OR
388                 // this is not the first SSH connection
389                 return
390         }
391
392         wkr.firstSSHConnection = time.Now()
393         if wp.mTimeToSSH != nil {
394                 wp.mTimeToSSH.Observe(wkr.firstSSHConnection.Sub(wkr.appeared).Seconds())
395         }
396 }
397
398 // Add or update worker attached to the given instance.
399 //
400 // The second return value is true if a new worker is created.
401 //
402 // A newly added instance has state=StateBooting if its tags match an
403 // entry in wp.creating, otherwise StateUnknown.
404 //
405 // Caller must have lock.
406 func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
407         secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
408         inst = TagVerifier{Instance: inst, Secret: secret, ReportVerified: wp.reportSSHConnected}
409         id := inst.ID()
410         if wkr := wp.workers[id]; wkr != nil {
411                 wkr.executor.SetTarget(inst)
412                 wkr.instance = inst
413                 wkr.updated = time.Now()
414                 wkr.saveTags()
415                 return wkr, false
416         }
417
418         state := StateUnknown
419         if _, ok := wp.creating[secret]; ok {
420                 state = StateBooting
421         }
422
423         // If an instance has a valid IdleBehavior tag when it first
424         // appears, initialize the new worker accordingly (this is how
425         // we restore IdleBehavior that was set by a prior dispatch
426         // process); otherwise, default to "run". After this,
427         // wkr.idleBehavior is the source of truth, and will only be
428         // changed via SetIdleBehavior().
429         idleBehavior := IdleBehavior(inst.Tags()[wp.tagKeyPrefix+tagKeyIdleBehavior])
430         if !validIdleBehavior[idleBehavior] {
431                 idleBehavior = IdleBehaviorRun
432         }
433
434         logger := wp.logger.WithFields(logrus.Fields{
435                 "InstanceType": it.Name,
436                 "Instance":     inst.ID(),
437                 "Address":      inst.Address(),
438         })
439         logger.WithFields(logrus.Fields{
440                 "State":        state,
441                 "IdleBehavior": idleBehavior,
442         }).Infof("instance appeared in cloud")
443         now := time.Now()
444         wkr := &worker{
445                 mtx:          &wp.mtx,
446                 wp:           wp,
447                 logger:       logger,
448                 executor:     wp.newExecutor(inst),
449                 state:        state,
450                 idleBehavior: idleBehavior,
451                 instance:     inst,
452                 instType:     it,
453                 appeared:     now,
454                 probed:       now,
455                 busy:         now,
456                 updated:      now,
457                 running:      make(map[string]*remoteRunner),
458                 starting:     make(map[string]*remoteRunner),
459                 probing:      make(chan struct{}, 1),
460         }
461         wp.workers[id] = wkr
462         return wkr, true
463 }
464
465 // Shutdown shuts down a worker with the given type, or returns false
466 // if all workers with the given type are busy.
467 func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
468         wp.setupOnce.Do(wp.setup)
469         wp.mtx.Lock()
470         defer wp.mtx.Unlock()
471         logger := wp.logger.WithField("InstanceType", it.Name)
472         logger.Info("shutdown requested")
473         for _, tryState := range []State{StateBooting, StateIdle} {
474                 // TODO: shutdown the worker with the longest idle
475                 // time (Idle) or the earliest create time (Booting)
476                 for _, wkr := range wp.workers {
477                         if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
478                                 logger.WithField("Instance", wkr.instance.ID()).Info("shutting down")
479                                 wkr.reportBootOutcome(BootOutcomeAborted)
480                                 wkr.shutdown()
481                                 return true
482                         }
483                 }
484         }
485         return false
486 }
487
488 // CountWorkers returns the current number of workers in each state.
489 //
490 // CountWorkers blocks, if necessary, until the initial instance list
491 // has been loaded from the cloud provider.
492 func (wp *Pool) CountWorkers() map[State]int {
493         wp.setupOnce.Do(wp.setup)
494         wp.waitUntilLoaded()
495         wp.mtx.Lock()
496         defer wp.mtx.Unlock()
497         r := map[State]int{}
498         for _, w := range wp.workers {
499                 r[w.state]++
500         }
501         return r
502 }
503
504 // Running returns the container UUIDs being prepared/run on workers.
505 //
506 // In the returned map, the time value indicates when the Pool
507 // observed that the container process had exited. A container that
508 // has not yet exited has a zero time value. The caller should use
509 // ForgetContainer() to garbage-collect the entries for exited
510 // containers.
511 func (wp *Pool) Running() map[string]time.Time {
512         wp.setupOnce.Do(wp.setup)
513         wp.mtx.Lock()
514         defer wp.mtx.Unlock()
515         r := map[string]time.Time{}
516         for _, wkr := range wp.workers {
517                 for uuid := range wkr.running {
518                         r[uuid] = time.Time{}
519                 }
520                 for uuid := range wkr.starting {
521                         r[uuid] = time.Time{}
522                 }
523         }
524         for uuid, exited := range wp.exited {
525                 r[uuid] = exited
526         }
527         return r
528 }
529
530 // StartContainer starts a container on an idle worker immediately if
531 // possible, otherwise returns false.
532 func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
533         wp.setupOnce.Do(wp.setup)
534         wp.mtx.Lock()
535         defer wp.mtx.Unlock()
536         var wkr *worker
537         for _, w := range wp.workers {
538                 if w.instType == it && w.state == StateIdle && w.idleBehavior == IdleBehaviorRun {
539                         if wkr == nil || w.busy.After(wkr.busy) {
540                                 wkr = w
541                         }
542                 }
543         }
544         if wkr == nil {
545                 return false
546         }
547         wkr.startContainer(ctr)
548         return true
549 }
550
551 // KillContainer kills the crunch-run process for the given container
552 // UUID, if it's running on any worker.
553 //
554 // KillContainer returns immediately; the act of killing the container
555 // takes some time, and runs in the background.
556 //
557 // KillContainer returns false if the container has already ended.
558 func (wp *Pool) KillContainer(uuid string, reason string) bool {
559         wp.mtx.Lock()
560         defer wp.mtx.Unlock()
561         logger := wp.logger.WithFields(logrus.Fields{
562                 "ContainerUUID": uuid,
563                 "Reason":        reason,
564         })
565         for _, wkr := range wp.workers {
566                 rr := wkr.running[uuid]
567                 if rr == nil {
568                         rr = wkr.starting[uuid]
569                 }
570                 if rr != nil {
571                         rr.Kill(reason)
572                         return true
573                 }
574         }
575         logger.Debug("cannot kill: already disappeared")
576         return false
577 }
578
579 // ForgetContainer clears the placeholder for the given exited
580 // container, so it isn't returned by subsequent calls to Running().
581 //
582 // ForgetContainer has no effect if the container has not yet exited.
583 //
584 // The "container exited at time T" placeholder (which necessitates
585 // ForgetContainer) exists to make it easier for the caller
586 // (scheduler) to distinguish a container that exited without
587 // finalizing its state from a container that exited too recently for
588 // its final state to have appeared in the scheduler's queue cache.
589 func (wp *Pool) ForgetContainer(uuid string) {
590         wp.mtx.Lock()
591         defer wp.mtx.Unlock()
592         if _, ok := wp.exited[uuid]; ok {
593                 wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
594                 delete(wp.exited, uuid)
595         }
596 }
597
598 func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
599         if reg == nil {
600                 reg = prometheus.NewRegistry()
601         }
602         wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
603                 Namespace: "arvados",
604                 Subsystem: "dispatchcloud",
605                 Name:      "containers_running",
606                 Help:      "Number of containers reported running by cloud VMs.",
607         })
608         reg.MustRegister(wp.mContainersRunning)
609         wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
610                 Namespace: "arvados",
611                 Subsystem: "dispatchcloud",
612                 Name:      "instances_total",
613                 Help:      "Number of cloud VMs.",
614         }, []string{"category", "instance_type"})
615         reg.MustRegister(wp.mInstances)
616         wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
617                 Namespace: "arvados",
618                 Subsystem: "dispatchcloud",
619                 Name:      "instances_price",
620                 Help:      "Price of cloud VMs.",
621         }, []string{"category"})
622         reg.MustRegister(wp.mInstancesPrice)
623         wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
624                 Namespace: "arvados",
625                 Subsystem: "dispatchcloud",
626                 Name:      "vcpus_total",
627                 Help:      "Total VCPUs on all cloud VMs.",
628         }, []string{"category"})
629         reg.MustRegister(wp.mVCPUs)
630         wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
631                 Namespace: "arvados",
632                 Subsystem: "dispatchcloud",
633                 Name:      "memory_bytes_total",
634                 Help:      "Total memory on all cloud VMs.",
635         }, []string{"category"})
636         reg.MustRegister(wp.mMemory)
637         wp.mBootOutcomes = prometheus.NewCounterVec(prometheus.CounterOpts{
638                 Namespace: "arvados",
639                 Subsystem: "dispatchcloud",
640                 Name:      "boot_outcomes",
641                 Help:      "Boot outcomes by type.",
642         }, []string{"outcome"})
643         for k := range validBootOutcomes {
644                 wp.mBootOutcomes.WithLabelValues(string(k)).Add(0)
645         }
646         reg.MustRegister(wp.mBootOutcomes)
647         wp.mDisappearances = prometheus.NewCounterVec(prometheus.CounterOpts{
648                 Namespace: "arvados",
649                 Subsystem: "dispatchcloud",
650                 Name:      "instances_disappeared",
651                 Help:      "Number of occurrences of an instance disappearing from the cloud provider's list of instances.",
652         }, []string{"state"})
653         for _, v := range stateString {
654                 wp.mDisappearances.WithLabelValues(v).Add(0)
655         }
656         reg.MustRegister(wp.mDisappearances)
657         wp.mTimeToSSH = prometheus.NewSummary(prometheus.SummaryOpts{
658                 Namespace:  "arvados",
659                 Subsystem:  "dispatchcloud",
660                 Name:       "instances_time_to_ssh_seconds",
661                 Help:       "Number of seconds between instance creation and the first successful SSH connection.",
662                 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
663         })
664         reg.MustRegister(wp.mTimeToSSH)
665         wp.mTimeToReadyForContainer = prometheus.NewSummary(prometheus.SummaryOpts{
666                 Namespace:  "arvados",
667                 Subsystem:  "dispatchcloud",
668                 Name:       "instances_time_to_ready_for_container_seconds",
669                 Help:       "Number of seconds between the first successful SSH connection and ready to run a container.",
670                 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
671         })
672         reg.MustRegister(wp.mTimeToReadyForContainer)
673         wp.mTimeFromShutdownToGone = prometheus.NewSummary(prometheus.SummaryOpts{
674                 Namespace:  "arvados",
675                 Subsystem:  "dispatchcloud",
676                 Name:       "instances_time_from_shutdown_request_to_disappearance_seconds",
677                 Help:       "Number of seconds between the first shutdown attempt and the disappearance of the worker.",
678                 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
679         })
680         reg.MustRegister(wp.mTimeFromShutdownToGone)
681         wp.mTimeFromQueueToCrunchRun = prometheus.NewSummary(prometheus.SummaryOpts{
682                 Namespace:  "arvados",
683                 Subsystem:  "dispatchcloud",
684                 Name:       "containers_time_from_queue_to_crunch_run_seconds",
685                 Help:       "Number of seconds between the queuing of a container and the start of crunch-run.",
686                 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
687         })
688         reg.MustRegister(wp.mTimeFromQueueToCrunchRun)
689         wp.mRunProbeDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{
690                 Namespace:  "arvados",
691                 Subsystem:  "dispatchcloud",
692                 Name:       "instances_run_probe_duration_seconds",
693                 Help:       "Number of seconds per runProbe call.",
694                 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
695         }, []string{"outcome"})
696         reg.MustRegister(wp.mRunProbeDuration)
697 }
698
699 func (wp *Pool) runMetrics() {
700         ch := wp.Subscribe()
701         defer wp.Unsubscribe(ch)
702         wp.updateMetrics()
703         for range ch {
704                 wp.updateMetrics()
705         }
706 }
707
708 func (wp *Pool) updateMetrics() {
709         wp.mtx.RLock()
710         defer wp.mtx.RUnlock()
711
712         type entKey struct {
713                 cat      string
714                 instType string
715         }
716         instances := map[entKey]int64{}
717         price := map[string]float64{}
718         cpu := map[string]int64{}
719         mem := map[string]int64{}
720         var running int64
721         for _, wkr := range wp.workers {
722                 var cat string
723                 switch {
724                 case len(wkr.running)+len(wkr.starting) > 0:
725                         cat = "inuse"
726                 case wkr.idleBehavior == IdleBehaviorHold:
727                         cat = "hold"
728                 case wkr.state == StateBooting:
729                         cat = "booting"
730                 case wkr.state == StateUnknown:
731                         cat = "unknown"
732                 default:
733                         cat = "idle"
734                 }
735                 instances[entKey{cat, wkr.instType.Name}]++
736                 price[cat] += wkr.instType.Price
737                 cpu[cat] += int64(wkr.instType.VCPUs)
738                 mem[cat] += int64(wkr.instType.RAM)
739                 running += int64(len(wkr.running) + len(wkr.starting))
740         }
741         for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
742                 wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
743                 wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
744                 wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
745                 // make sure to reset gauges for non-existing category/nodetype combinations
746                 for _, it := range wp.instanceTypes {
747                         if _, ok := instances[entKey{cat, it.Name}]; !ok {
748                                 wp.mInstances.WithLabelValues(cat, it.Name).Set(float64(0))
749                         }
750                 }
751         }
752         for k, v := range instances {
753                 wp.mInstances.WithLabelValues(k.cat, k.instType).Set(float64(v))
754         }
755         wp.mContainersRunning.Set(float64(running))
756 }
757
758 func (wp *Pool) runProbes() {
759         maxPPS := wp.maxProbesPerSecond
760         if maxPPS < 1 {
761                 maxPPS = defaultMaxProbesPerSecond
762         }
763         limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
764         defer limitticker.Stop()
765
766         probeticker := time.NewTicker(wp.probeInterval)
767         defer probeticker.Stop()
768
769         workers := []cloud.InstanceID{}
770         for range probeticker.C {
771                 workers = workers[:0]
772                 wp.mtx.Lock()
773                 for id, wkr := range wp.workers {
774                         if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
775                                 continue
776                         }
777                         workers = append(workers, id)
778                 }
779                 wp.mtx.Unlock()
780
781                 for _, id := range workers {
782                         wp.mtx.Lock()
783                         wkr, ok := wp.workers[id]
784                         wp.mtx.Unlock()
785                         if !ok {
786                                 // Deleted while we were probing
787                                 // others
788                                 continue
789                         }
790                         go wkr.ProbeAndUpdate()
791                         select {
792                         case <-wp.stop:
793                                 return
794                         case <-limitticker.C:
795                         }
796                 }
797         }
798 }
799
800 func (wp *Pool) runSync() {
801         // sync once immediately, then wait syncInterval, sync again,
802         // etc.
803         timer := time.NewTimer(1)
804         for {
805                 select {
806                 case <-timer.C:
807                         err := wp.getInstancesAndSync()
808                         if err != nil {
809                                 wp.logger.WithError(err).Warn("sync failed")
810                         }
811                         timer.Reset(wp.syncInterval)
812                 case <-wp.stop:
813                         wp.logger.Debug("worker.Pool stopped")
814                         return
815                 }
816         }
817 }
818
819 // Stop synchronizing with the InstanceSet.
820 func (wp *Pool) Stop() {
821         wp.setupOnce.Do(wp.setup)
822         close(wp.stop)
823 }
824
825 // Instances returns an InstanceView for each worker in the pool,
826 // summarizing its current state and recent activity.
827 func (wp *Pool) Instances() []InstanceView {
828         var r []InstanceView
829         wp.setupOnce.Do(wp.setup)
830         wp.mtx.Lock()
831         for _, w := range wp.workers {
832                 r = append(r, InstanceView{
833                         Instance:             w.instance.ID(),
834                         Address:              w.instance.Address(),
835                         Price:                w.instType.Price,
836                         ArvadosInstanceType:  w.instType.Name,
837                         ProviderInstanceType: w.instType.ProviderType,
838                         LastContainerUUID:    w.lastUUID,
839                         LastBusy:             w.busy,
840                         WorkerState:          w.state.String(),
841                         IdleBehavior:         w.idleBehavior,
842                 })
843         }
844         wp.mtx.Unlock()
845         sort.Slice(r, func(i, j int) bool {
846                 return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
847         })
848         return r
849 }
850
851 // KillInstance destroys a cloud VM instance. It returns an error if
852 // the given instance does not exist.
853 func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
854         wkr, ok := wp.workers[id]
855         if !ok {
856                 return errors.New("instance not found")
857         }
858         wkr.logger.WithField("Reason", reason).Info("shutting down")
859         wkr.reportBootOutcome(BootOutcomeAborted)
860         wkr.shutdown()
861         return nil
862 }
863
864 func (wp *Pool) setup() {
865         wp.creating = map[string]createCall{}
866         wp.exited = map[string]time.Time{}
867         wp.workers = map[cloud.InstanceID]*worker{}
868         wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
869         wp.loadRunnerData()
870 }
871
872 // Load the runner program to be deployed on worker nodes into
873 // wp.runnerData, if necessary. Errors are logged.
874 //
875 // If auto-deploy is disabled, len(wp.runnerData) will be 0.
876 //
877 // Caller must not have lock.
878 func (wp *Pool) loadRunnerData() error {
879         wp.mtx.Lock()
880         defer wp.mtx.Unlock()
881         if wp.runnerData != nil {
882                 return nil
883         } else if wp.runnerSource == "" {
884                 wp.runnerCmd = "crunch-run"
885                 wp.runnerData = []byte{}
886                 return nil
887         }
888         logger := wp.logger.WithField("source", wp.runnerSource)
889         logger.Debug("loading runner")
890         buf, err := ioutil.ReadFile(wp.runnerSource)
891         if err != nil {
892                 logger.WithError(err).Error("failed to load runner program")
893                 return err
894         }
895         wp.runnerData = buf
896         wp.runnerMD5 = md5.Sum(buf)
897         wp.runnerCmd = fmt.Sprintf("/var/lib/arvados/crunch-run~%x", wp.runnerMD5)
898         return nil
899 }
900
901 func (wp *Pool) notify() {
902         wp.mtx.RLock()
903         defer wp.mtx.RUnlock()
904         for _, send := range wp.subscribers {
905                 select {
906                 case send <- struct{}{}:
907                 default:
908                 }
909         }
910 }
911
912 func (wp *Pool) getInstancesAndSync() error {
913         wp.setupOnce.Do(wp.setup)
914         if err := wp.instanceSet.throttleInstances.Error(); err != nil {
915                 return err
916         }
917         wp.logger.Debug("getting instance list")
918         threshold := time.Now()
919         instances, err := wp.instanceSet.Instances(cloud.InstanceTags{wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID)})
920         if err != nil {
921                 wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
922                 return err
923         }
924         wp.sync(threshold, instances)
925         wp.logger.Debug("sync done")
926         return nil
927 }
928
929 // Add/remove/update workers based on instances, which was obtained
930 // from the instanceSet. However, don't clobber any other updates that
931 // already happened after threshold.
932 func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
933         wp.mtx.Lock()
934         defer wp.mtx.Unlock()
935         wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
936         notify := false
937
938         for _, inst := range instances {
939                 itTag := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceType]
940                 it, ok := wp.instanceTypes[itTag]
941                 if !ok {
942                         wp.logger.WithField("Instance", inst.ID()).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
943                         continue
944                 }
945                 if wkr, isNew := wp.updateWorker(inst, it); isNew {
946                         notify = true
947                 } else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
948                         wp.logger.WithField("Instance", inst.ID()).Info("worker still listed after shutdown; retrying")
949                         wkr.shutdown()
950                 }
951         }
952
953         for id, wkr := range wp.workers {
954                 if wkr.updated.After(threshold) {
955                         continue
956                 }
957                 logger := wp.logger.WithFields(logrus.Fields{
958                         "Instance":    wkr.instance.ID(),
959                         "WorkerState": wkr.state,
960                 })
961                 logger.Info("instance disappeared in cloud")
962                 wkr.reportBootOutcome(BootOutcomeDisappeared)
963                 if wp.mDisappearances != nil {
964                         wp.mDisappearances.WithLabelValues(stateString[wkr.state]).Inc()
965                 }
966                 // wkr.destroyed.IsZero() can happen if instance disappeared but we weren't trying to shut it down
967                 if wp.mTimeFromShutdownToGone != nil && !wkr.destroyed.IsZero() {
968                         wp.mTimeFromShutdownToGone.Observe(time.Now().Sub(wkr.destroyed).Seconds())
969                 }
970                 delete(wp.workers, id)
971                 go wkr.Close()
972                 notify = true
973         }
974
975         if !wp.loaded {
976                 notify = true
977                 wp.loaded = true
978                 wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
979         }
980
981         if notify {
982                 go wp.notify()
983         }
984 }
985
986 func (wp *Pool) waitUntilLoaded() {
987         ch := wp.Subscribe()
988         wp.mtx.RLock()
989         defer wp.mtx.RUnlock()
990         for !wp.loaded {
991                 wp.mtx.RUnlock()
992                 <-ch
993                 wp.mtx.RLock()
994         }
995 }
996
997 func (wp *Pool) gatewayAuthSecret(uuid string) string {
998         h := hmac.New(sha256.New, []byte(wp.systemRootToken))
999         fmt.Fprint(h, uuid)
1000         return fmt.Sprintf("%x", h.Sum(nil))
1001 }
1002
// Return a random string of n hexadecimal digits (n*4 random bits).
//
// n may be even or odd: for odd n, one extra random byte is
// generated and the surplus digit is dropped, so the result always
// has exactly n digits. randomHex(0) returns "".
//
// Panics if the system's random source fails.
func randomHex(n int) string {
	nbytes := n / 2
	odd := n > 0 && n%2 == 1
	if odd {
		// Round up so there are enough bytes for n digits.
		nbytes++
	}
	buf := make([]byte, nbytes)
	if _, err := rand.Read(buf); err != nil {
		panic(err)
	}
	digits := fmt.Sprintf("%x", buf)
	if odd {
		digits = digits[:n]
	}
	return digits
}