21700: Install Bundler system-wide in Rails postinst
[arvados.git] / lib / dispatchcloud / worker / pool.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package worker
6
7 import (
8         "crypto/hmac"
9         "crypto/md5"
10         "crypto/rand"
11         "crypto/sha256"
12         "errors"
13         "fmt"
14         "io"
15         "io/ioutil"
16         mathrand "math/rand"
17         "sort"
18         "strings"
19         "sync"
20         "time"
21
22         "git.arvados.org/arvados.git/lib/cloud"
23         "git.arvados.org/arvados.git/sdk/go/arvados"
24         "github.com/prometheus/client_golang/prometheus"
25         "github.com/sirupsen/logrus"
26         "golang.org/x/crypto/ssh"
27 )
28
// Base names of the tags the pool attaches to cloud instances. The
// pool's tagKeyPrefix is prepended to each name before it is written
// or looked up.
const (
	// Identifies the instance set (pool) that created the instance.
	tagKeyInstanceSetID = "InstanceSetID"
	// Name of the Arvados instance type requested for the instance.
	tagKeyInstanceType = "InstanceType"
	// Idle behavior recorded for the instance.
	tagKeyIdleBehavior = "IdleBehavior"
	// Random per-instance secret generated when the instance is
	// created.
	tagKeyInstanceSecret = "InstanceSecret"
)
35
// An InstanceView shows a worker's current state and recent activity.
//
// The struct tags give the field names used when a view is encoded
// as JSON. The code that fills in an InstanceView is not part of
// this section of the file, so the per-field notes below are
// assumptions to be confirmed against that code.
type InstanceView struct {
	// presumably the cloud provider's ID for the instance -- confirm
	Instance cloud.InstanceID `json:"instance"`
	// presumably the address used to reach the instance -- confirm
	Address string `json:"address"`
	// NOTE(review): units/period of Price are not established here
	Price                float64 `json:"price"`
	ArvadosInstanceType  string  `json:"arvados_instance_type"`
	ProviderInstanceType string  `json:"provider_instance_type"`
	// presumably the UUID of the container most recently run on
	// the worker -- confirm
	LastContainerUUID string    `json:"last_container_uuid"`
	LastBusy          time.Time `json:"last_busy"`
	// WorkerState is a string; presumably the textual form of the
	// worker's State -- confirm
	WorkerState  string       `json:"worker_state"`
	IdleBehavior IdleBehavior `json:"idle_behavior"`
}
48
// An Executor executes shell commands on a remote host.
//
// The pool obtains one Executor per worker by calling the
// newExecutor function that was passed to NewPool.
type Executor interface {
	// Run cmd on the current target.
	//
	// The results are the command's stdout and stderr, plus an
	// error value. NOTE(review): presumably env holds environment
	// variables for the command and stdin supplies its standard
	// input -- confirm against an implementation.
	Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)

	// Use the given target for subsequent operations. The new
	// target is the same host as the previous target, but it
	// might return a different address and verify a different
	// host key.
	//
	// SetTarget is called frequently, and in most cases the new
	// target will behave exactly the same as the old one. An
	// implementation should optimize accordingly.
	//
	// SetTarget must not block on concurrent Execute calls.
	SetTarget(cloud.ExecutorTarget)

	// Close is not called anywhere in the visible part of this
	// file. NOTE(review): presumably it releases whatever
	// resources the executor holds -- confirm against callers.
	Close()
}
68
// Fallback settings. NewPool uses each interval/timeout default
// below when the corresponding cluster configuration value is not
// positive.
const (
	defaultSyncInterval  = time.Minute
	defaultProbeInterval = 10 * time.Second
	// Not referenced in the visible part of this file.
	defaultMaxProbesPerSecond  = 10
	defaultTimeoutIdle         = time.Minute
	defaultTimeoutBooting      = 10 * time.Minute
	defaultTimeoutProbe        = 10 * time.Minute
	defaultTimeoutShutdown     = 10 * time.Second
	defaultTimeoutTERM         = 2 * time.Minute
	defaultTimeoutSignal       = 5 * time.Second
	defaultTimeoutStaleRunLock = 5 * time.Second

	// How long after a quota error the pool waits before trying
	// again anyway, even if no instances have been shut down in
	// the meantime.
	quotaErrorTTL = time.Minute

	// How long after a capacity error the pool waits before
	// trying again.
	capacityErrorTTL = time.Minute

	// Minimum spacing between "X failed because rate limiting"
	// log messages.
	logRateLimitErrorInterval = 10 * time.Second
)
91
92 func duration(conf arvados.Duration, def time.Duration) time.Duration {
93         if conf > 0 {
94                 return time.Duration(conf)
95         }
96         return def
97 }
98
99 // NewPool creates a Pool of workers backed by instanceSet.
100 //
101 // New instances are configured and set up according to the given
102 // cluster configuration.
103 func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
104         wp := &Pool{
105                 logger:                         logger,
106                 arvClient:                      arvClient,
107                 instanceSetID:                  instanceSetID,
108                 instanceSet:                    &throttledInstanceSet{InstanceSet: instanceSet},
109                 newExecutor:                    newExecutor,
110                 cluster:                        cluster,
111                 bootProbeCommand:               cluster.Containers.CloudVMs.BootProbeCommand,
112                 instanceInitCommand:            cloud.InitCommand(cluster.Containers.CloudVMs.InstanceInitCommand),
113                 runnerSource:                   cluster.Containers.CloudVMs.DeployRunnerBinary,
114                 imageID:                        cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
115                 instanceTypes:                  cluster.InstanceTypes,
116                 maxProbesPerSecond:             cluster.Containers.CloudVMs.MaxProbesPerSecond,
117                 maxConcurrentInstanceCreateOps: cluster.Containers.CloudVMs.MaxConcurrentInstanceCreateOps,
118                 maxInstances:                   cluster.Containers.CloudVMs.MaxInstances,
119                 probeInterval:                  duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
120                 syncInterval:                   duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
121                 timeoutIdle:                    duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
122                 timeoutBooting:                 duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
123                 timeoutProbe:                   duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
124                 timeoutShutdown:                duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
125                 timeoutTERM:                    duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
126                 timeoutSignal:                  duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
127                 timeoutStaleRunLock:            duration(cluster.Containers.CloudVMs.TimeoutStaleRunLock, defaultTimeoutStaleRunLock),
128                 systemRootToken:                cluster.SystemRootToken,
129                 installPublicKey:               installPublicKey,
130                 tagKeyPrefix:                   cluster.Containers.CloudVMs.TagKeyPrefix,
131                 runnerCmdDefault:               cluster.Containers.CrunchRunCommand,
132                 runnerArgs:                     append([]string{"--runtime-engine=" + cluster.Containers.RuntimeEngine}, cluster.Containers.CrunchRunArgumentsList...),
133                 stop:                           make(chan bool),
134         }
135         wp.registerMetrics(reg)
136         go func() {
137                 wp.setupOnce.Do(wp.setup)
138                 go wp.runMetrics()
139                 go wp.runProbes()
140                 go wp.runSync()
141         }()
142         return wp
143 }
144
// Pool is a resizable worker pool backed by a cloud.InstanceSet. A
// zero Pool should not be used. Call NewPool to create a new Pool.
type Pool struct {
	// configuration
	//
	// These fields are assigned by NewPool, either from its
	// arguments or from the cluster configuration.
	logger        logrus.FieldLogger
	arvClient     *arvados.Client
	instanceSetID cloud.InstanceSetID
	// instanceSet wraps the InstanceSet that was given to NewPool.
	instanceSet                    *throttledInstanceSet
	newExecutor                    func(cloud.Instance) Executor
	cluster                        *arvados.Cluster
	bootProbeCommand               string
	instanceInitCommand            cloud.InitCommand
	runnerSource                   string
	imageID                        cloud.ImageID
	instanceTypes                  map[string]arvados.InstanceType
	syncInterval                   time.Duration
	probeInterval                  time.Duration
	maxProbesPerSecond             int
	maxConcurrentInstanceCreateOps int
	maxInstances                   int
	timeoutIdle                    time.Duration
	timeoutBooting                 time.Duration
	timeoutProbe                   time.Duration
	timeoutShutdown                time.Duration
	timeoutTERM                    time.Duration
	timeoutSignal                  time.Duration
	timeoutStaleRunLock            time.Duration
	systemRootToken                string
	installPublicKey               ssh.PublicKey
	tagKeyPrefix                   string
	runnerCmdDefault               string   // crunch-run command to use if not deploying a binary
	runnerArgs                     []string // extra args passed to crunch-run

	// private state
	//
	// Methods in this file access the maps and quota/capacity
	// fields below while holding mtx.
	subscribers map[<-chan struct{}]chan<- struct{}
	creating    map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
	workers     map[cloud.InstanceID]*worker
	loaded      bool                 // loaded list of instances from InstanceSet at least once
	exited      map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called
	// atQuotaUntilFewerInstances, when positive, makes AtQuota
	// return true. Create sets it to the number of instances that
	// existed (or were being created) when a quota error arrived.
	atQuotaUntilFewerInstances int
	// atQuotaUntil is set by Create after a quota error that
	// occurred with no other instances; AtQuota returns true
	// until that time.
	atQuotaUntil time.Time
	// atQuotaErr is the most recent quota error seen by Create.
	atQuotaErr cloud.QuotaError
	// atCapacityUntil maps a provider instance type (or "" for
	// all instance types) to the time until which AtCapacity
	// returns true. It stays nil until the first capacity error.
	atCapacityUntil map[string]time.Time
	stop            chan bool
	// mtx is also handed to every worker created by updateWorker,
	// so the pool and its workers share this one lock.
	mtx sync.RWMutex
	// setupOnce ensures wp.setup runs only once, whichever entry
	// point gets there first.
	setupOnce  sync.Once
	runnerData []byte
	runnerMD5  [md5.Size]byte
	runnerCmd  string

	// metrics
	mContainersRunning        prometheus.Gauge
	mInstances                *prometheus.GaugeVec
	mInstancesPrice           *prometheus.GaugeVec
	mVCPUs                    *prometheus.GaugeVec
	mMemory                   *prometheus.GaugeVec
	mBootOutcomes             *prometheus.CounterVec
	mDisappearances           *prometheus.CounterVec
	mTimeToSSH                prometheus.Summary
	mTimeToReadyForContainer  prometheus.Summary
	mTimeFromShutdownToGone   prometheus.Summary
	mTimeFromQueueToCrunchRun prometheus.Summary
	mRunProbeDuration         *prometheus.SummaryVec
	mProbeAgeMax              prometheus.Gauge
	mProbeAgeMedian           prometheus.Gauge
}
210
// createCall describes one (cloud.InstanceSet)Create call that has
// been started by (*Pool)Create and has not finished yet.
type createCall struct {
	// time is when (*Pool)Create recorded the call.
	time time.Time
	// instanceType is the instance type that was requested.
	instanceType arvados.InstanceType
}
215
216 func (wp *Pool) CheckHealth() error {
217         wp.setupOnce.Do(wp.setup)
218         if err := wp.loadRunnerData(); err != nil {
219                 return fmt.Errorf("error loading runner binary: %s", err)
220         }
221         return nil
222 }
223
224 // Subscribe returns a buffered channel that becomes ready after any
225 // change to the pool's state that could have scheduling implications:
226 // a worker's state changes, a new worker appears, the cloud
227 // provider's API rate limiting period ends, etc.
228 //
229 // Additional events that occur while the channel is already ready
230 // will be dropped, so it is OK if the caller services the channel
231 // slowly.
232 //
233 // Example:
234 //
235 //      ch := wp.Subscribe()
236 //      defer wp.Unsubscribe(ch)
237 //      for range ch {
238 //              tryScheduling(wp)
239 //              if done {
240 //                      break
241 //              }
242 //      }
243 func (wp *Pool) Subscribe() <-chan struct{} {
244         wp.setupOnce.Do(wp.setup)
245         wp.mtx.Lock()
246         defer wp.mtx.Unlock()
247         ch := make(chan struct{}, 1)
248         wp.subscribers[ch] = ch
249         return ch
250 }
251
252 // Unsubscribe stops sending updates to the given channel.
253 func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
254         wp.setupOnce.Do(wp.setup)
255         wp.mtx.Lock()
256         defer wp.mtx.Unlock()
257         delete(wp.subscribers, ch)
258 }
259
260 // Unallocated returns the number of unallocated (creating + booting +
261 // idle + unknown) workers for each instance type.  Workers in
262 // hold/drain mode are not included.
263 func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
264         wp.setupOnce.Do(wp.setup)
265         wp.mtx.RLock()
266         defer wp.mtx.RUnlock()
267         unalloc := map[arvados.InstanceType]int{}
268         creating := map[arvados.InstanceType]int{}
269         oldestCreate := map[arvados.InstanceType]time.Time{}
270         for _, cc := range wp.creating {
271                 it := cc.instanceType
272                 creating[it]++
273                 if t, ok := oldestCreate[it]; !ok || t.After(cc.time) {
274                         oldestCreate[it] = cc.time
275                 }
276         }
277         for _, wkr := range wp.workers {
278                 // Skip workers that are not expected to become
279                 // available soon. Note len(wkr.running)>0 is not
280                 // redundant here: it can be true even in
281                 // StateUnknown.
282                 if wkr.state == StateShutdown ||
283                         wkr.state == StateRunning ||
284                         wkr.idleBehavior != IdleBehaviorRun ||
285                         len(wkr.running) > 0 {
286                         continue
287                 }
288                 it := wkr.instType
289                 unalloc[it]++
290                 if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) {
291                         // If up to N new workers appear in
292                         // Instances() while we are waiting for N
293                         // Create() calls to complete, we assume we're
294                         // just seeing a race between Instances() and
295                         // Create() responses.
296                         //
297                         // The other common reason why nodes have
298                         // state==Unknown is that they appeared at
299                         // startup, before any Create calls. They
300                         // don't match the above timing condition, so
301                         // we never mistakenly attribute them to
302                         // pending Create calls.
303                         creating[it]--
304                 }
305         }
306         for it, c := range creating {
307                 unalloc[it] += c
308         }
309         return unalloc
310 }
311
// Create a new instance with the given type, and add it to the worker
// pool. The pending call is recorded in wp.creating immediately;
// instance creation runs in the background, and the worker itself is
// added (via updateWorker) once the cloud Create call has succeeded.
//
// Create returns false if a pre-existing error or a configuration
// setting prevents it from even attempting to create a new
// instance. Those errors are logged by the Pool, so the caller does
// not need to log anything in such cases.
//
// Specifically, Create returns false without starting anything if
// loadRunnerData fails, if AtCapacity(it) or AtQuota() is true, if
// the create throttle currently reports an error, or if
// maxConcurrentInstanceCreateOps create calls are already pending.
// Otherwise it returns true, whether or not the background create
// call eventually succeeds.
func (wp *Pool) Create(it arvados.InstanceType) bool {
	logger := wp.logger.WithField("InstanceType", it.Name)
	wp.setupOnce.Do(wp.setup)
	if wp.loadRunnerData() != nil {
		// Boot probe is certain to fail.
		return false
	}
	// AtCapacity and AtQuota take wp.mtx themselves, so they are
	// called before the lock is acquired below.
	if wp.AtCapacity(it) || wp.AtQuota() || wp.instanceSet.throttleCreate.Error() != nil {
		return false
	}
	wp.mtx.Lock()
	defer wp.mtx.Unlock()
	// The maxConcurrentInstanceCreateOps knob throttles the number of node create
	// requests in flight. It was added to work around a limitation in Azure's
	// managed disks, which support no more than 20 concurrent node creation
	// requests from a single disk image (cf.
	// https://docs.microsoft.com/en-us/azure/virtual-machines/linux/capture-image).
	// The code assumes that node creation, from Azure's perspective, means the
	// period until the instance appears in the "get all instances" list.
	if wp.maxConcurrentInstanceCreateOps > 0 && len(wp.creating) >= wp.maxConcurrentInstanceCreateOps {
		logger.Info("reached MaxConcurrentInstanceCreateOps")
		// NOTE(review): presumably this makes
		// throttleCreate.Error() report the error for the
		// next 5 seconds and then calls wp.notify -- confirm
		// against the throttle implementation.
		wp.instanceSet.throttleCreate.ErrorUntil(errors.New("reached MaxConcurrentInstanceCreateOps"), time.Now().Add(5*time.Second), wp.notify)
		return false
	}
	now := time.Now()
	secret := randomHex(instanceSecretLength)
	// Record the pending call under the pool lock, before the
	// goroutine starts.
	wp.creating[secret] = createCall{time: now, instanceType: it}
	go func() {
		// Deferred calls run in reverse order: the
		// wp.creating entry is deleted first, then wp.mtx is
		// released, and wp.notify runs last.
		defer wp.notify()
		tags := cloud.InstanceTags{
			wp.tagKeyPrefix + tagKeyInstanceSetID:  string(wp.instanceSetID),
			wp.tagKeyPrefix + tagKeyInstanceType:   it.Name,
			wp.tagKeyPrefix + tagKeyIdleBehavior:   string(IdleBehaviorRun),
			wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
		}
		initCmd := TagVerifier{nil, secret, nil}.InitCommand() + "\n" + wp.instanceInitCommand
		// The cloud Create call is made without holding
		// wp.mtx; the lock is taken only to record the
		// outcome.
		inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
		wp.mtx.Lock()
		defer wp.mtx.Unlock()
		// delete() is deferred so the updateWorker() call
		// below knows to use StateBooting when adding a new
		// worker.
		defer delete(wp.creating, secret)
		if err != nil {
			if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
				wp.atQuotaErr = err
				// This failed call still has its entry
				// in wp.creating (the delete is
				// deferred), hence the -1.
				n := len(wp.workers) + len(wp.creating) - 1
				if n < 1 {
					// Quota error with no
					// instances running --
					// nothing to do but wait
					wp.atQuotaUntilFewerInstances = 0
					wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
					time.AfterFunc(quotaErrorTTL, wp.notify)
					logger.WithField("atQuotaUntil", wp.atQuotaUntil).Info("quota error with 0 running -- waiting for quotaErrorTTL")
				} else if n < wp.atQuotaUntilFewerInstances || wp.atQuotaUntilFewerInstances == 0 {
					// Quota error with N
					// instances running -- report
					// AtQuota until some
					// instances shut down
					wp.atQuotaUntilFewerInstances = n
					wp.atQuotaUntil = time.Time{}
					logger.WithField("atQuotaUntilFewerInstances", n).Info("quota error -- waiting for next instance shutdown")
				}
			}
			if err, ok := err.(cloud.CapacityError); ok && err.IsCapacityError() {
				capKey := it.ProviderType
				if !err.IsInstanceTypeSpecific() {
					// set capacity flag for all
					// instance types
					capKey = ""
				}
				// The map is created lazily, on the
				// first capacity error.
				if wp.atCapacityUntil == nil {
					wp.atCapacityUntil = map[string]time.Time{}
				}
				wp.atCapacityUntil[capKey] = time.Now().Add(capacityErrorTTL)
				// Notify when the capacity flag
				// expires.
				time.AfterFunc(capacityErrorTTL, wp.notify)
			}
			// err here is the original error from
			// instanceSet.Create; the type-asserted values
			// above were scoped to their if statements.
			logger.WithError(err).Error("create failed")
			wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
			return
		}
		wp.updateWorker(inst, it)
	}()
	// wp.creating already includes the call that was just started.
	if len(wp.creating)+len(wp.workers) == wp.maxInstances {
		logger.Infof("now at MaxInstances limit of %d instances", wp.maxInstances)
	}
	return true
}
409
410 // AtCapacity returns true if Create() is currently expected to fail
411 // for the given instance type.
412 func (wp *Pool) AtCapacity(it arvados.InstanceType) bool {
413         wp.mtx.Lock()
414         defer wp.mtx.Unlock()
415         if t, ok := wp.atCapacityUntil[it.ProviderType]; ok && time.Now().Before(t) {
416                 // at capacity for this instance type
417                 return true
418         }
419         if t, ok := wp.atCapacityUntil[""]; ok && time.Now().Before(t) {
420                 // at capacity for all instance types
421                 return true
422         }
423         return false
424 }
425
426 // AtQuota returns true if Create is not expected to work at the
427 // moment (e.g., cloud provider has reported quota errors, or we are
428 // already at our own configured quota).
429 func (wp *Pool) AtQuota() bool {
430         wp.mtx.Lock()
431         defer wp.mtx.Unlock()
432         return wp.atQuotaUntilFewerInstances > 0 ||
433                 time.Now().Before(wp.atQuotaUntil) ||
434                 (wp.maxInstances > 0 && wp.maxInstances <= len(wp.workers)+len(wp.creating))
435 }
436
437 // SetIdleBehavior determines how the indicated instance will behave
438 // when it has no containers running.
439 func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
440         wp.mtx.Lock()
441         defer wp.mtx.Unlock()
442         wkr, ok := wp.workers[id]
443         if !ok {
444                 return errors.New("requested instance does not exist")
445         }
446         wkr.setIdleBehavior(idleBehavior)
447         return nil
448 }
449
450 // Successful connection to the SSH daemon, update the mTimeToSSH metric
451 func (wp *Pool) reportSSHConnected(inst cloud.Instance) {
452         wp.mtx.Lock()
453         defer wp.mtx.Unlock()
454         wkr, ok := wp.workers[inst.ID()]
455         if !ok {
456                 // race: inst was removed from the pool
457                 return
458         }
459         if wkr.state != StateBooting || !wkr.firstSSHConnection.IsZero() {
460                 // the node is not in booting state (can happen if
461                 // a-d-c is restarted) OR this is not the first SSH
462                 // connection
463                 return
464         }
465
466         wkr.firstSSHConnection = time.Now()
467         if wp.mTimeToSSH != nil {
468                 wp.mTimeToSSH.Observe(wkr.firstSSHConnection.Sub(wkr.appeared).Seconds())
469         }
470 }
471
472 // Add or update worker attached to the given instance.
473 //
474 // The second return value is true if a new worker is created.
475 //
476 // A newly added instance has state=StateBooting if its tags match an
477 // entry in wp.creating, otherwise StateUnknown.
478 //
479 // Caller must have lock.
480 func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
481         secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
482         inst = TagVerifier{Instance: inst, Secret: secret, ReportVerified: wp.reportSSHConnected}
483         id := inst.ID()
484         if wkr := wp.workers[id]; wkr != nil {
485                 wkr.executor.SetTarget(inst)
486                 wkr.instance = inst
487                 wkr.updated = time.Now()
488                 wkr.saveTags()
489                 return wkr, false
490         }
491
492         state := StateUnknown
493         if _, ok := wp.creating[secret]; ok {
494                 state = StateBooting
495         }
496
497         // If an instance has a valid IdleBehavior tag when it first
498         // appears, initialize the new worker accordingly (this is how
499         // we restore IdleBehavior that was set by a prior dispatch
500         // process); otherwise, default to "run". After this,
501         // wkr.idleBehavior is the source of truth, and will only be
502         // changed via SetIdleBehavior().
503         idleBehavior := IdleBehavior(inst.Tags()[wp.tagKeyPrefix+tagKeyIdleBehavior])
504         if !validIdleBehavior[idleBehavior] {
505                 idleBehavior = IdleBehaviorRun
506         }
507
508         logger := wp.logger.WithFields(logrus.Fields{
509                 "InstanceType": it.Name,
510                 "Instance":     inst.ID(),
511                 "Address":      inst.Address(),
512         })
513         logger.WithFields(logrus.Fields{
514                 "State":        state,
515                 "IdleBehavior": idleBehavior,
516         }).Infof("instance appeared in cloud")
517         now := time.Now()
518         wkr := &worker{
519                 mtx:          &wp.mtx,
520                 wp:           wp,
521                 logger:       logger,
522                 executor:     wp.newExecutor(inst),
523                 state:        state,
524                 idleBehavior: idleBehavior,
525                 instance:     inst,
526                 instType:     it,
527                 appeared:     now,
528                 probed:       now,
529                 busy:         now,
530                 updated:      now,
531                 running:      make(map[string]*remoteRunner),
532                 starting:     make(map[string]*remoteRunner),
533                 probing:      make(chan struct{}, 1),
534         }
535         wp.workers[id] = wkr
536         return wkr, true
537 }
538
539 // Shutdown shuts down a worker with the given type, or returns false
540 // if all workers with the given type are busy.
541 func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
542         wp.setupOnce.Do(wp.setup)
543         wp.mtx.Lock()
544         defer wp.mtx.Unlock()
545         logger := wp.logger.WithField("InstanceType", it.Name)
546         logger.Info("shutdown requested")
547         for _, tryState := range []State{StateBooting, StateIdle} {
548                 // TODO: shutdown the worker with the longest idle
549                 // time (Idle) or the earliest create time (Booting)
550                 for _, wkr := range wp.workers {
551                         if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
552                                 logger.WithField("Instance", wkr.instance.ID()).Info("shutting down")
553                                 wkr.reportBootOutcome(BootOutcomeAborted)
554                                 wkr.shutdown()
555                                 return true
556                         }
557                 }
558         }
559         return false
560 }
561
562 // CountWorkers returns the current number of workers in each state.
563 //
564 // CountWorkers blocks, if necessary, until the initial instance list
565 // has been loaded from the cloud provider.
566 func (wp *Pool) CountWorkers() map[State]int {
567         wp.setupOnce.Do(wp.setup)
568         wp.waitUntilLoaded()
569         wp.mtx.Lock()
570         defer wp.mtx.Unlock()
571         r := map[State]int{}
572         for _, w := range wp.workers {
573                 r[w.state]++
574         }
575         return r
576 }
577
578 // Running returns the container UUIDs being prepared/run on workers.
579 //
580 // In the returned map, the time value indicates when the Pool
581 // observed that the container process had exited. A container that
582 // has not yet exited has a zero time value. The caller should use
583 // ForgetContainer() to garbage-collect the entries for exited
584 // containers.
585 func (wp *Pool) Running() map[string]time.Time {
586         wp.setupOnce.Do(wp.setup)
587         wp.mtx.Lock()
588         defer wp.mtx.Unlock()
589         r := map[string]time.Time{}
590         for _, wkr := range wp.workers {
591                 for uuid := range wkr.running {
592                         r[uuid] = time.Time{}
593                 }
594                 for uuid := range wkr.starting {
595                         r[uuid] = time.Time{}
596                 }
597         }
598         for uuid, exited := range wp.exited {
599                 r[uuid] = exited
600         }
601         return r
602 }
603
604 // StartContainer starts a container on an idle worker immediately if
605 // possible, otherwise returns false.
606 func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
607         wp.setupOnce.Do(wp.setup)
608         wp.mtx.Lock()
609         defer wp.mtx.Unlock()
610         var wkr *worker
611         for _, w := range wp.workers {
612                 if w.instType == it && w.state == StateIdle && w.idleBehavior == IdleBehaviorRun {
613                         if wkr == nil || w.busy.After(wkr.busy) {
614                                 wkr = w
615                         }
616                 }
617         }
618         if wkr == nil {
619                 return false
620         }
621         wkr.startContainer(ctr)
622         return true
623 }
624
625 // KillContainer kills the crunch-run process for the given container
626 // UUID, if it's running on any worker.
627 //
628 // KillContainer returns immediately; the act of killing the container
629 // takes some time, and runs in the background.
630 //
631 // KillContainer returns false if the container has already ended.
632 func (wp *Pool) KillContainer(uuid string, reason string) bool {
633         wp.mtx.Lock()
634         defer wp.mtx.Unlock()
635         logger := wp.logger.WithFields(logrus.Fields{
636                 "ContainerUUID": uuid,
637                 "Reason":        reason,
638         })
639         for _, wkr := range wp.workers {
640                 rr := wkr.running[uuid]
641                 if rr == nil {
642                         rr = wkr.starting[uuid]
643                 }
644                 if rr != nil {
645                         rr.Kill(reason)
646                         return true
647                 }
648         }
649         logger.Debug("cannot kill: already disappeared")
650         return false
651 }
652
653 // ForgetContainer clears the placeholder for the given exited
654 // container, so it isn't returned by subsequent calls to Running().
655 //
656 // ForgetContainer has no effect if the container has not yet exited.
657 //
658 // The "container exited at time T" placeholder (which necessitates
659 // ForgetContainer) exists to make it easier for the caller
660 // (scheduler) to distinguish a container that exited without
661 // finalizing its state from a container that exited too recently for
662 // its final state to have appeared in the scheduler's queue cache.
663 func (wp *Pool) ForgetContainer(uuid string) {
664         wp.mtx.Lock()
665         defer wp.mtx.Unlock()
666         if _, ok := wp.exited[uuid]; ok {
667                 wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
668                 delete(wp.exited, uuid)
669         }
670 }
671
672 func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
673         if reg == nil {
674                 reg = prometheus.NewRegistry()
675         }
676         wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
677                 Namespace: "arvados",
678                 Subsystem: "dispatchcloud",
679                 Name:      "containers_running",
680                 Help:      "Number of containers reported running by cloud VMs.",
681         })
682         reg.MustRegister(wp.mContainersRunning)
683         wp.mProbeAgeMax = prometheus.NewGauge(prometheus.GaugeOpts{
684                 Namespace: "arvados",
685                 Subsystem: "dispatchcloud",
686                 Name:      "probe_age_seconds_max",
687                 Help:      "Maximum number of seconds since an instance's most recent successful probe.",
688         })
689         reg.MustRegister(wp.mProbeAgeMax)
690         wp.mProbeAgeMedian = prometheus.NewGauge(prometheus.GaugeOpts{
691                 Namespace: "arvados",
692                 Subsystem: "dispatchcloud",
693                 Name:      "probe_age_seconds_median",
694                 Help:      "Median number of seconds since an instance's most recent successful probe.",
695         })
696         reg.MustRegister(wp.mProbeAgeMedian)
697         wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
698                 Namespace: "arvados",
699                 Subsystem: "dispatchcloud",
700                 Name:      "instances_total",
701                 Help:      "Number of cloud VMs.",
702         }, []string{"category", "instance_type"})
703         reg.MustRegister(wp.mInstances)
704         wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
705                 Namespace: "arvados",
706                 Subsystem: "dispatchcloud",
707                 Name:      "instances_price",
708                 Help:      "Price of cloud VMs.",
709         }, []string{"category"})
710         reg.MustRegister(wp.mInstancesPrice)
711         wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
712                 Namespace: "arvados",
713                 Subsystem: "dispatchcloud",
714                 Name:      "vcpus_total",
715                 Help:      "Total VCPUs on all cloud VMs.",
716         }, []string{"category"})
717         reg.MustRegister(wp.mVCPUs)
718         wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
719                 Namespace: "arvados",
720                 Subsystem: "dispatchcloud",
721                 Name:      "memory_bytes_total",
722                 Help:      "Total memory on all cloud VMs.",
723         }, []string{"category"})
724         reg.MustRegister(wp.mMemory)
725         wp.mBootOutcomes = prometheus.NewCounterVec(prometheus.CounterOpts{
726                 Namespace: "arvados",
727                 Subsystem: "dispatchcloud",
728                 Name:      "boot_outcomes",
729                 Help:      "Boot outcomes by type.",
730         }, []string{"outcome"})
731         for k := range validBootOutcomes {
732                 wp.mBootOutcomes.WithLabelValues(string(k)).Add(0)
733         }
734         reg.MustRegister(wp.mBootOutcomes)
735         wp.mDisappearances = prometheus.NewCounterVec(prometheus.CounterOpts{
736                 Namespace: "arvados",
737                 Subsystem: "dispatchcloud",
738                 Name:      "instances_disappeared",
739                 Help:      "Number of occurrences of an instance disappearing from the cloud provider's list of instances.",
740         }, []string{"state"})
741         for _, v := range stateString {
742                 wp.mDisappearances.WithLabelValues(v).Add(0)
743         }
744         reg.MustRegister(wp.mDisappearances)
745         wp.mTimeToSSH = prometheus.NewSummary(prometheus.SummaryOpts{
746                 Namespace:  "arvados",
747                 Subsystem:  "dispatchcloud",
748                 Name:       "instances_time_to_ssh_seconds",
749                 Help:       "Number of seconds between instance creation and the first successful SSH connection.",
750                 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
751         })
752         reg.MustRegister(wp.mTimeToSSH)
753         wp.mTimeToReadyForContainer = prometheus.NewSummary(prometheus.SummaryOpts{
754                 Namespace:  "arvados",
755                 Subsystem:  "dispatchcloud",
756                 Name:       "instances_time_to_ready_for_container_seconds",
757                 Help:       "Number of seconds between the first successful SSH connection and ready to run a container.",
758                 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
759         })
760         reg.MustRegister(wp.mTimeToReadyForContainer)
761         wp.mTimeFromShutdownToGone = prometheus.NewSummary(prometheus.SummaryOpts{
762                 Namespace:  "arvados",
763                 Subsystem:  "dispatchcloud",
764                 Name:       "instances_time_from_shutdown_request_to_disappearance_seconds",
765                 Help:       "Number of seconds between the first shutdown attempt and the disappearance of the worker.",
766                 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
767         })
768         reg.MustRegister(wp.mTimeFromShutdownToGone)
769         wp.mTimeFromQueueToCrunchRun = prometheus.NewSummary(prometheus.SummaryOpts{
770                 Namespace:  "arvados",
771                 Subsystem:  "dispatchcloud",
772                 Name:       "containers_time_from_queue_to_crunch_run_seconds",
773                 Help:       "Number of seconds between the queuing of a container and the start of crunch-run.",
774                 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
775         })
776         reg.MustRegister(wp.mTimeFromQueueToCrunchRun)
777         wp.mRunProbeDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{
778                 Namespace:  "arvados",
779                 Subsystem:  "dispatchcloud",
780                 Name:       "instances_run_probe_duration_seconds",
781                 Help:       "Number of seconds per runProbe call.",
782                 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
783         }, []string{"outcome"})
784         reg.MustRegister(wp.mRunProbeDuration)
785 }
786
787 func (wp *Pool) runMetrics() {
788         ch := wp.Subscribe()
789         defer wp.Unsubscribe(ch)
790         wp.updateMetrics()
791         for range ch {
792                 wp.updateMetrics()
793         }
794 }
795
796 func (wp *Pool) updateMetrics() {
797         wp.mtx.RLock()
798         defer wp.mtx.RUnlock()
799
800         type entKey struct {
801                 cat      string
802                 instType string
803         }
804         instances := map[entKey]int64{}
805         price := map[string]float64{}
806         cpu := map[string]int64{}
807         mem := map[string]int64{}
808         var running int64
809         now := time.Now()
810         var probed []time.Time
811         for _, wkr := range wp.workers {
812                 var cat string
813                 switch {
814                 case len(wkr.running)+len(wkr.starting) > 0:
815                         cat = "inuse"
816                 case wkr.idleBehavior == IdleBehaviorHold:
817                         cat = "hold"
818                 case wkr.state == StateBooting:
819                         cat = "booting"
820                 case wkr.state == StateUnknown:
821                         cat = "unknown"
822                 default:
823                         cat = "idle"
824                 }
825                 instances[entKey{cat, wkr.instType.Name}]++
826                 price[cat] += wkr.instType.Price
827                 cpu[cat] += int64(wkr.instType.VCPUs)
828                 mem[cat] += int64(wkr.instType.RAM)
829                 running += int64(len(wkr.running) + len(wkr.starting))
830                 probed = append(probed, wkr.probed)
831         }
832         for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
833                 wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
834                 wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
835                 wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
836                 // make sure to reset gauges for non-existing category/nodetype combinations
837                 for _, it := range wp.instanceTypes {
838                         if _, ok := instances[entKey{cat, it.Name}]; !ok {
839                                 wp.mInstances.WithLabelValues(cat, it.Name).Set(float64(0))
840                         }
841                 }
842         }
843         for k, v := range instances {
844                 wp.mInstances.WithLabelValues(k.cat, k.instType).Set(float64(v))
845         }
846         wp.mContainersRunning.Set(float64(running))
847
848         if len(probed) == 0 {
849                 wp.mProbeAgeMax.Set(0)
850                 wp.mProbeAgeMedian.Set(0)
851         } else {
852                 sort.Slice(probed, func(i, j int) bool { return probed[i].Before(probed[j]) })
853                 wp.mProbeAgeMax.Set(now.Sub(probed[0]).Seconds())
854                 wp.mProbeAgeMedian.Set(now.Sub(probed[len(probed)/2]).Seconds())
855         }
856 }
857
858 func (wp *Pool) runProbes() {
859         maxPPS := wp.maxProbesPerSecond
860         if maxPPS < 1 {
861                 maxPPS = defaultMaxProbesPerSecond
862         }
863         limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
864         defer limitticker.Stop()
865
866         probeticker := time.NewTicker(wp.probeInterval)
867         defer probeticker.Stop()
868
869         workers := []cloud.InstanceID{}
870         for range probeticker.C {
871                 // Add some jitter. Without this, if probeInterval is
872                 // a multiple of syncInterval and sync is
873                 // instantaneous (as with the loopback driver), the
874                 // first few probes race with sync operations and
875                 // don't update the workers.
876                 time.Sleep(time.Duration(mathrand.Int63n(int64(wp.probeInterval) / 23)))
877
878                 workers = workers[:0]
879                 wp.mtx.Lock()
880                 for id, wkr := range wp.workers {
881                         if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
882                                 continue
883                         }
884                         workers = append(workers, id)
885                 }
886                 wp.mtx.Unlock()
887
888                 for _, id := range workers {
889                         wp.mtx.Lock()
890                         wkr, ok := wp.workers[id]
891                         wp.mtx.Unlock()
892                         if !ok {
893                                 // Deleted while we were probing
894                                 // others
895                                 continue
896                         }
897                         go wkr.ProbeAndUpdate()
898                         select {
899                         case <-wp.stop:
900                                 return
901                         case <-limitticker.C:
902                         }
903                 }
904         }
905 }
906
907 func (wp *Pool) runSync() {
908         // sync once immediately, then wait syncInterval, sync again,
909         // etc.
910         timer := time.NewTimer(1)
911         for {
912                 select {
913                 case <-timer.C:
914                         err := wp.getInstancesAndSync()
915                         if err != nil {
916                                 wp.logger.WithError(err).Warn("sync failed")
917                         }
918                         timer.Reset(wp.syncInterval)
919                 case <-wp.stop:
920                         wp.logger.Debug("worker.Pool stopped")
921                         return
922                 }
923         }
924 }
925
// Stop synchronizing with the InstanceSet.
//
// Stop closes wp.stop, the channel that runSync and runProbes select
// on to end their loops. The pool's one-time setup is run first if it
// has not happened yet.
//
// NOTE(review): close panics on an already-closed channel, so Stop
// presumably must be called at most once per Pool -- confirm with
// callers.
func (wp *Pool) Stop() {
	wp.setupOnce.Do(wp.setup)
	close(wp.stop)
}
931
932 // Instances returns an InstanceView for each worker in the pool,
933 // summarizing its current state and recent activity.
934 func (wp *Pool) Instances() []InstanceView {
935         var r []InstanceView
936         wp.setupOnce.Do(wp.setup)
937         wp.mtx.Lock()
938         for _, w := range wp.workers {
939                 r = append(r, InstanceView{
940                         Instance:             w.instance.ID(),
941                         Address:              w.instance.Address(),
942                         Price:                w.instType.Price,
943                         ArvadosInstanceType:  w.instType.Name,
944                         ProviderInstanceType: w.instType.ProviderType,
945                         LastContainerUUID:    w.lastUUID,
946                         LastBusy:             w.busy,
947                         WorkerState:          w.state.String(),
948                         IdleBehavior:         w.idleBehavior,
949                 })
950         }
951         wp.mtx.Unlock()
952         sort.Slice(r, func(i, j int) bool {
953                 return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
954         })
955         return r
956 }
957
958 // KillInstance destroys a cloud VM instance. It returns an error if
959 // the given instance does not exist.
960 func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
961         wp.setupOnce.Do(wp.setup)
962         wp.mtx.Lock()
963         defer wp.mtx.Unlock()
964         wkr, ok := wp.workers[id]
965         if !ok {
966                 return errors.New("instance not found")
967         }
968         wkr.logger.WithField("Reason", reason).Info("shutting down")
969         wkr.reportBootOutcome(BootOutcomeAborted)
970         wkr.shutdown()
971         return nil
972 }
973
974 func (wp *Pool) setup() {
975         wp.creating = map[string]createCall{}
976         wp.exited = map[string]time.Time{}
977         wp.workers = map[cloud.InstanceID]*worker{}
978         wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
979         wp.loadRunnerData()
980 }
981
982 // Load the runner program to be deployed on worker nodes into
983 // wp.runnerData, if necessary. Errors are logged.
984 //
985 // If auto-deploy is disabled, len(wp.runnerData) will be 0.
986 //
987 // Caller must not have lock.
988 func (wp *Pool) loadRunnerData() error {
989         wp.mtx.Lock()
990         defer wp.mtx.Unlock()
991         if wp.runnerData != nil {
992                 return nil
993         } else if wp.runnerSource == "" {
994                 wp.runnerCmd = wp.runnerCmdDefault
995                 wp.runnerData = []byte{}
996                 return nil
997         }
998         logger := wp.logger.WithField("source", wp.runnerSource)
999         logger.Debug("loading runner")
1000         buf, err := ioutil.ReadFile(wp.runnerSource)
1001         if err != nil {
1002                 logger.WithError(err).Error("failed to load runner program")
1003                 return err
1004         }
1005         wp.runnerData = buf
1006         wp.runnerMD5 = md5.Sum(buf)
1007         wp.runnerCmd = fmt.Sprintf("/tmp/arvados-crunch-run/crunch-run~%x", wp.runnerMD5)
1008         return nil
1009 }
1010
1011 func (wp *Pool) notify() {
1012         wp.mtx.RLock()
1013         defer wp.mtx.RUnlock()
1014         for _, send := range wp.subscribers {
1015                 select {
1016                 case send <- struct{}{}:
1017                 default:
1018                 }
1019         }
1020 }
1021
1022 func (wp *Pool) getInstancesAndSync() error {
1023         wp.setupOnce.Do(wp.setup)
1024         if err := wp.instanceSet.throttleInstances.Error(); err != nil {
1025                 return err
1026         }
1027         wp.logger.Debug("getting instance list")
1028         threshold := time.Now()
1029         instances, err := wp.instanceSet.Instances(cloud.InstanceTags{wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID)})
1030         if err != nil {
1031                 wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
1032                 return err
1033         }
1034         wp.sync(threshold, instances)
1035         wp.logger.Debug("sync done")
1036         return nil
1037 }
1038
1039 // Add/remove/update workers based on instances, which was obtained
1040 // from the instanceSet. However, don't clobber any other updates that
1041 // already happened after threshold.
1042 func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
1043         wp.mtx.Lock()
1044         defer wp.mtx.Unlock()
1045         wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
1046         notify := false
1047
1048         for _, inst := range instances {
1049                 itTag := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceType]
1050                 it, ok := wp.instanceTypes[itTag]
1051                 if !ok {
1052                         wp.logger.WithField("Instance", inst.ID()).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
1053                         continue
1054                 }
1055                 if wkr, isNew := wp.updateWorker(inst, it); isNew {
1056                         notify = true
1057                 } else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
1058                         wp.logger.WithField("Instance", inst.ID()).Info("worker still listed after shutdown; retrying")
1059                         wkr.shutdown()
1060                 }
1061         }
1062
1063         for id, wkr := range wp.workers {
1064                 if wkr.updated.After(threshold) {
1065                         continue
1066                 }
1067                 logger := wp.logger.WithFields(logrus.Fields{
1068                         "Instance":    wkr.instance.ID(),
1069                         "WorkerState": wkr.state,
1070                 })
1071                 logger.Info("instance disappeared in cloud")
1072                 wkr.reportBootOutcome(BootOutcomeDisappeared)
1073                 if wp.mDisappearances != nil {
1074                         wp.mDisappearances.WithLabelValues(stateString[wkr.state]).Inc()
1075                 }
1076                 // wkr.destroyed.IsZero() can happen if instance disappeared but we weren't trying to shut it down
1077                 if wp.mTimeFromShutdownToGone != nil && !wkr.destroyed.IsZero() {
1078                         wp.mTimeFromShutdownToGone.Observe(time.Now().Sub(wkr.destroyed).Seconds())
1079                 }
1080                 delete(wp.workers, id)
1081                 go wkr.Close()
1082                 notify = true
1083         }
1084
1085         if wp.atQuotaUntilFewerInstances > len(wp.workers)+len(wp.creating) {
1086                 // After syncing, there are fewer instances (including
1087                 // pending creates) than there were last time we saw a
1088                 // quota error.  This might mean it's now possible to
1089                 // create new instances.  Reset our "at quota" state.
1090                 wp.atQuotaUntilFewerInstances = 0
1091         }
1092
1093         if !wp.loaded {
1094                 notify = true
1095                 wp.loaded = true
1096                 wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
1097         }
1098
1099         if notify {
1100                 go wp.notify()
1101         }
1102 }
1103
1104 func (wp *Pool) waitUntilLoaded() {
1105         ch := wp.Subscribe()
1106         wp.mtx.RLock()
1107         defer wp.mtx.RUnlock()
1108         for !wp.loaded {
1109                 wp.mtx.RUnlock()
1110                 <-ch
1111                 wp.mtx.RLock()
1112         }
1113 }
1114
1115 func (wp *Pool) gatewayAuthSecret(uuid string) string {
1116         h := hmac.New(sha256.New, []byte(wp.systemRootToken))
1117         fmt.Fprint(h, uuid)
1118         return fmt.Sprintf("%x", h.Sum(nil))
1119 }
1120
// randomHex returns a random string of exactly n lowercase
// hexadecimal digits (n*4 random bits), read from the
// cryptographically secure random source.
//
// n may be odd: an extra random nibble is generated and discarded.
// If n is zero or negative, the empty string is returned.
//
// randomHex panics if the random source fails.
func randomHex(n int) string {
	if n <= 0 {
		return ""
	}
	// Round up to whole bytes; each byte yields two hex digits.
	buf := make([]byte, (n+1)/2)
	if _, err := rand.Read(buf); err != nil {
		panic(err)
	}
	return fmt.Sprintf("%x", buf)[:n]
}