Merge branch '21743-users-include-trash'
[arvados.git] / lib / dispatchcloud / worker / pool.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package worker
6
7 import (
8         "crypto/hmac"
9         "crypto/md5"
10         "crypto/rand"
11         "crypto/sha256"
12         "errors"
13         "fmt"
14         "io"
15         "io/ioutil"
16         mathrand "math/rand"
17         "sort"
18         "strings"
19         "sync"
20         "time"
21
22         "git.arvados.org/arvados.git/lib/cloud"
23         "git.arvados.org/arvados.git/sdk/go/arvados"
24         "github.com/prometheus/client_golang/prometheus"
25         "github.com/sirupsen/logrus"
26         "golang.org/x/crypto/ssh"
27 )
28
29 const (
30         tagKeyInstanceType   = "InstanceType"
31         tagKeyIdleBehavior   = "IdleBehavior"
32         tagKeyInstanceSecret = "InstanceSecret"
33         tagKeyInstanceSetID  = "InstanceSetID"
34 )
35
36 // An InstanceView shows a worker's current state and recent activity.
37 type InstanceView struct {
38         Instance             cloud.InstanceID `json:"instance"`
39         Address              string           `json:"address"`
40         Price                float64          `json:"price"`
41         ArvadosInstanceType  string           `json:"arvados_instance_type"`
42         ProviderInstanceType string           `json:"provider_instance_type"`
43         LastContainerUUID    string           `json:"last_container_uuid"`
44         LastBusy             time.Time        `json:"last_busy"`
45         WorkerState          string           `json:"worker_state"`
46         IdleBehavior         IdleBehavior     `json:"idle_behavior"`
47 }
48
49 // An Executor executes shell commands on a remote host.
50 type Executor interface {
51         // Run cmd on the current target.
52         Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)
53
54         // Use the given target for subsequent operations. The new
55         // target is the same host as the previous target, but it
56         // might return a different address and verify a different
57         // host key.
58         //
59         // SetTarget is called frequently, and in most cases the new
60         // target will behave exactly the same as the old one. An
61         // implementation should optimize accordingly.
62         //
63         // SetTarget must not block on concurrent Execute calls.
64         SetTarget(cloud.ExecutorTarget)
65
66         Close()
67 }
68
69 const (
70         defaultSyncInterval        = time.Minute
71         defaultProbeInterval       = time.Second * 10
72         defaultMaxProbesPerSecond  = 10
73         defaultTimeoutIdle         = time.Minute
74         defaultTimeoutBooting      = time.Minute * 10
75         defaultTimeoutProbe        = time.Minute * 10
76         defaultTimeoutShutdown     = time.Second * 10
77         defaultTimeoutTERM         = time.Minute * 2
78         defaultTimeoutSignal       = time.Second * 5
79         defaultTimeoutStaleRunLock = time.Second * 5
80
81         // Time after a quota error to try again anyway, even if no
82         // instances have been shutdown.
83         quotaErrorTTL = time.Minute
84
85         // Time after a capacity error to try again
86         capacityErrorTTL = time.Minute
87
88         // Time between "X failed because rate limiting" messages
89         logRateLimitErrorInterval = time.Second * 10
90 )
91
92 func duration(conf arvados.Duration, def time.Duration) time.Duration {
93         if conf > 0 {
94                 return time.Duration(conf)
95         }
96         return def
97 }
98
99 // NewPool creates a Pool of workers backed by instanceSet.
100 //
101 // New instances are configured and set up according to the given
102 // cluster configuration.
103 func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
104         wp := &Pool{
105                 logger:                         logger,
106                 arvClient:                      arvClient,
107                 instanceSetID:                  instanceSetID,
108                 instanceSet:                    &throttledInstanceSet{InstanceSet: instanceSet},
109                 newExecutor:                    newExecutor,
110                 cluster:                        cluster,
111                 bootProbeCommand:               cluster.Containers.CloudVMs.BootProbeCommand,
112                 instanceInitCommand:            cloud.InitCommand(cluster.Containers.CloudVMs.InstanceInitCommand),
113                 runnerSource:                   cluster.Containers.CloudVMs.DeployRunnerBinary,
114                 runnerDeployDirectory:          cluster.Containers.CloudVMs.DeployRunnerDirectory,
115                 imageID:                        cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
116                 instanceTypes:                  cluster.InstanceTypes,
117                 maxProbesPerSecond:             cluster.Containers.CloudVMs.MaxProbesPerSecond,
118                 maxConcurrentInstanceCreateOps: cluster.Containers.CloudVMs.MaxConcurrentInstanceCreateOps,
119                 maxInstances:                   cluster.Containers.CloudVMs.MaxInstances,
120                 probeInterval:                  duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
121                 syncInterval:                   duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
122                 timeoutIdle:                    duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
123                 timeoutBooting:                 duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
124                 timeoutProbe:                   duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
125                 timeoutShutdown:                duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
126                 timeoutTERM:                    duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
127                 timeoutSignal:                  duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
128                 timeoutStaleRunLock:            duration(cluster.Containers.CloudVMs.TimeoutStaleRunLock, defaultTimeoutStaleRunLock),
129                 systemRootToken:                cluster.SystemRootToken,
130                 installPublicKey:               installPublicKey,
131                 tagKeyPrefix:                   cluster.Containers.CloudVMs.TagKeyPrefix,
132                 runnerCmdDefault:               cluster.Containers.CrunchRunCommand,
133                 runnerArgs:                     append([]string{"--runtime-engine=" + cluster.Containers.RuntimeEngine}, cluster.Containers.CrunchRunArgumentsList...),
134                 stop:                           make(chan bool),
135         }
136         wp.registerMetrics(reg)
137         go func() {
138                 wp.setupOnce.Do(wp.setup)
139                 go wp.runMetrics()
140                 go wp.runProbes()
141                 go wp.runSync()
142         }()
143         return wp
144 }
145
146 // Pool is a resizable worker pool backed by a cloud.InstanceSet. A
147 // zero Pool should not be used. Call NewPool to create a new Pool.
148 type Pool struct {
149         // configuration
150         logger                         logrus.FieldLogger
151         arvClient                      *arvados.Client
152         instanceSetID                  cloud.InstanceSetID
153         instanceSet                    *throttledInstanceSet
154         newExecutor                    func(cloud.Instance) Executor
155         cluster                        *arvados.Cluster
156         bootProbeCommand               string
157         instanceInitCommand            cloud.InitCommand
158         runnerSource                   string
159         runnerDeployDirectory          string
160         imageID                        cloud.ImageID
161         instanceTypes                  map[string]arvados.InstanceType
162         syncInterval                   time.Duration
163         probeInterval                  time.Duration
164         maxProbesPerSecond             int
165         maxConcurrentInstanceCreateOps int
166         maxInstances                   int
167         timeoutIdle                    time.Duration
168         timeoutBooting                 time.Duration
169         timeoutProbe                   time.Duration
170         timeoutShutdown                time.Duration
171         timeoutTERM                    time.Duration
172         timeoutSignal                  time.Duration
173         timeoutStaleRunLock            time.Duration
174         systemRootToken                string
175         installPublicKey               ssh.PublicKey
176         tagKeyPrefix                   string
177         runnerCmdDefault               string   // crunch-run command to use if not deploying a binary
178         runnerArgs                     []string // extra args passed to crunch-run
179
180         // private state
181         subscribers                map[<-chan struct{}]chan<- struct{}
182         creating                   map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
183         workers                    map[cloud.InstanceID]*worker
184         loaded                     bool                 // loaded list of instances from InstanceSet at least once
185         exited                     map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called
186         atQuotaUntilFewerInstances int
187         atQuotaUntil               time.Time
188         atQuotaErr                 cloud.QuotaError
189         atCapacityUntil            map[interface{}]time.Time
190         stop                       chan bool
191         mtx                        sync.RWMutex
192         setupOnce                  sync.Once
193         runnerData                 []byte
194         runnerMD5                  [md5.Size]byte
195         runnerCmd                  string
196
197         mContainersRunning        prometheus.Gauge
198         mInstances                *prometheus.GaugeVec
199         mInstancesPrice           *prometheus.GaugeVec
200         mVCPUs                    *prometheus.GaugeVec
201         mMemory                   *prometheus.GaugeVec
202         mBootOutcomes             *prometheus.CounterVec
203         mDisappearances           *prometheus.CounterVec
204         mTimeToSSH                prometheus.Summary
205         mTimeToReadyForContainer  prometheus.Summary
206         mTimeFromShutdownToGone   prometheus.Summary
207         mTimeFromQueueToCrunchRun prometheus.Summary
208         mRunProbeDuration         *prometheus.SummaryVec
209         mProbeAgeMax              prometheus.Gauge
210         mProbeAgeMedian           prometheus.Gauge
211 }
212
213 type createCall struct {
214         time         time.Time
215         instanceType arvados.InstanceType
216 }
217
218 func (wp *Pool) CheckHealth() error {
219         wp.setupOnce.Do(wp.setup)
220         if err := wp.loadRunnerData(); err != nil {
221                 return fmt.Errorf("error loading runner binary: %s", err)
222         }
223         return nil
224 }
225
226 // Subscribe returns a buffered channel that becomes ready after any
227 // change to the pool's state that could have scheduling implications:
228 // a worker's state changes, a new worker appears, the cloud
229 // provider's API rate limiting period ends, etc.
230 //
231 // Additional events that occur while the channel is already ready
232 // will be dropped, so it is OK if the caller services the channel
233 // slowly.
234 //
235 // Example:
236 //
237 //      ch := wp.Subscribe()
238 //      defer wp.Unsubscribe(ch)
239 //      for range ch {
240 //              tryScheduling(wp)
241 //              if done {
242 //                      break
243 //              }
244 //      }
245 func (wp *Pool) Subscribe() <-chan struct{} {
246         wp.setupOnce.Do(wp.setup)
247         wp.mtx.Lock()
248         defer wp.mtx.Unlock()
249         ch := make(chan struct{}, 1)
250         wp.subscribers[ch] = ch
251         return ch
252 }
253
254 // Unsubscribe stops sending updates to the given channel.
255 func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
256         wp.setupOnce.Do(wp.setup)
257         wp.mtx.Lock()
258         defer wp.mtx.Unlock()
259         delete(wp.subscribers, ch)
260 }
261
262 // Unallocated returns the number of unallocated (creating + booting +
263 // idle + unknown) workers for each instance type.  Workers in
264 // hold/drain mode are not included.
265 func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
266         wp.setupOnce.Do(wp.setup)
267         wp.mtx.RLock()
268         defer wp.mtx.RUnlock()
269         unalloc := map[arvados.InstanceType]int{}
270         creating := map[arvados.InstanceType]int{}
271         oldestCreate := map[arvados.InstanceType]time.Time{}
272         for _, cc := range wp.creating {
273                 it := cc.instanceType
274                 creating[it]++
275                 if t, ok := oldestCreate[it]; !ok || t.After(cc.time) {
276                         oldestCreate[it] = cc.time
277                 }
278         }
279         for _, wkr := range wp.workers {
280                 // Skip workers that are not expected to become
281                 // available soon. Note len(wkr.running)>0 is not
282                 // redundant here: it can be true even in
283                 // StateUnknown.
284                 if wkr.state == StateShutdown ||
285                         wkr.state == StateRunning ||
286                         wkr.idleBehavior != IdleBehaviorRun ||
287                         len(wkr.running) > 0 {
288                         continue
289                 }
290                 it := wkr.instType
291                 unalloc[it]++
292                 if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) {
293                         // If up to N new workers appear in
294                         // Instances() while we are waiting for N
295                         // Create() calls to complete, we assume we're
296                         // just seeing a race between Instances() and
297                         // Create() responses.
298                         //
299                         // The other common reason why nodes have
300                         // state==Unknown is that they appeared at
301                         // startup, before any Create calls. They
302                         // don't match the above timing condition, so
303                         // we never mistakenly attribute them to
304                         // pending Create calls.
305                         creating[it]--
306                 }
307         }
308         for it, c := range creating {
309                 unalloc[it] += c
310         }
311         return unalloc
312 }
313
314 // Create a new instance with the given type, and add it to the worker
315 // pool. The worker is added immediately; instance creation runs in
316 // the background.
317 //
318 // Create returns false if a pre-existing error or a configuration
319 // setting prevents it from even attempting to create a new
320 // instance. Those errors are logged by the Pool, so the caller does
321 // not need to log anything in such cases.
322 func (wp *Pool) Create(it arvados.InstanceType) bool {
323         logger := wp.logger.WithField("InstanceType", it.Name)
324         wp.setupOnce.Do(wp.setup)
325         if wp.loadRunnerData() != nil {
326                 // Boot probe is certain to fail.
327                 return false
328         }
329         if wp.AtCapacity(it) || wp.AtQuota() || wp.instanceSet.throttleCreate.Error() != nil {
330                 return false
331         }
332         wp.mtx.Lock()
333         defer wp.mtx.Unlock()
334         // The maxConcurrentInstanceCreateOps knob throttles the number of node create
335         // requests in flight. It was added to work around a limitation in Azure's
336         // managed disks, which support no more than 20 concurrent node creation
337         // requests from a single disk image (cf.
338         // https://docs.microsoft.com/en-us/azure/virtual-machines/linux/capture-image).
339         // The code assumes that node creation, from Azure's perspective, means the
340         // period until the instance appears in the "get all instances" list.
341         if wp.maxConcurrentInstanceCreateOps > 0 && len(wp.creating) >= wp.maxConcurrentInstanceCreateOps {
342                 logger.Info("reached MaxConcurrentInstanceCreateOps")
343                 wp.instanceSet.throttleCreate.ErrorUntil(errors.New("reached MaxConcurrentInstanceCreateOps"), time.Now().Add(5*time.Second), wp.notify)
344                 return false
345         }
346         now := time.Now()
347         secret := randomHex(instanceSecretLength)
348         wp.creating[secret] = createCall{time: now, instanceType: it}
349         go func() {
350                 defer wp.notify()
351                 tags := cloud.InstanceTags{
352                         wp.tagKeyPrefix + tagKeyInstanceSetID:  string(wp.instanceSetID),
353                         wp.tagKeyPrefix + tagKeyInstanceType:   it.Name,
354                         wp.tagKeyPrefix + tagKeyIdleBehavior:   string(IdleBehaviorRun),
355                         wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
356                 }
357                 initCmd := TagVerifier{nil, secret, nil}.InitCommand() + "\n" + wp.instanceInitCommand
358                 inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
359                 wp.mtx.Lock()
360                 defer wp.mtx.Unlock()
361                 // delete() is deferred so the updateWorker() call
362                 // below knows to use StateBooting when adding a new
363                 // worker.
364                 defer delete(wp.creating, secret)
365                 if err != nil {
366                         if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
367                                 wp.atQuotaErr = err
368                                 n := len(wp.workers) + len(wp.creating) - 1
369                                 if n < 1 {
370                                         // Quota error with no
371                                         // instances running --
372                                         // nothing to do but wait
373                                         wp.atQuotaUntilFewerInstances = 0
374                                         wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
375                                         time.AfterFunc(quotaErrorTTL, wp.notify)
376                                         logger.WithField("atQuotaUntil", wp.atQuotaUntil).Info("quota error with 0 running -- waiting for quotaErrorTTL")
377                                 } else if n < wp.atQuotaUntilFewerInstances || wp.atQuotaUntilFewerInstances == 0 {
378                                         // Quota error with N
379                                         // instances running -- report
380                                         // AtQuota until some
381                                         // instances shut down
382                                         wp.atQuotaUntilFewerInstances = n
383                                         wp.atQuotaUntil = time.Time{}
384                                         logger.WithField("atQuotaUntilFewerInstances", n).Info("quota error -- waiting for next instance shutdown")
385                                 }
386                         }
387                         if err, ok := err.(cloud.CapacityError); ok && err.IsCapacityError() {
388                                 var capKey interface{} = it.ProviderType
389                                 if err.IsInstanceTypeSpecific() {
390                                         capKey = it.ProviderType
391                                 } else if err.IsInstanceQuotaGroupSpecific() {
392                                         capKey = wp.instanceSet.InstanceQuotaGroup(it)
393                                 } else {
394                                         capKey = ""
395                                 }
396                                 if wp.atCapacityUntil == nil {
397                                         wp.atCapacityUntil = map[interface{}]time.Time{}
398                                 }
399                                 wp.atCapacityUntil[capKey] = time.Now().Add(capacityErrorTTL)
400                                 time.AfterFunc(capacityErrorTTL, wp.notify)
401                         }
402                         logger.WithError(err).Error("create failed")
403                         wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
404                         return
405                 }
406                 wp.updateWorker(inst, it)
407         }()
408         if len(wp.creating)+len(wp.workers) == wp.maxInstances {
409                 logger.Infof("now at MaxInstances limit of %d instances", wp.maxInstances)
410         }
411         return true
412 }
413
414 // AtCapacity returns true if Create() is currently expected to fail
415 // for the given instance type.
416 func (wp *Pool) AtCapacity(it arvados.InstanceType) bool {
417         wp.mtx.Lock()
418         defer wp.mtx.Unlock()
419         for _, capKey := range []interface{}{
420                 "",                                    // all instance types
421                 wp.instanceSet.InstanceQuotaGroup(it), // instance quota group
422                 it.ProviderType,                       // just this instance type
423         } {
424                 if t, ok := wp.atCapacityUntil[capKey]; ok && time.Now().Before(t) {
425                         return true
426                 }
427         }
428         return false
429 }
430
431 // AtQuota returns true if Create is not expected to work at the
432 // moment (e.g., cloud provider has reported quota errors, or we are
433 // already at our own configured quota).
434 func (wp *Pool) AtQuota() bool {
435         wp.mtx.Lock()
436         defer wp.mtx.Unlock()
437         return wp.atQuotaUntilFewerInstances > 0 ||
438                 time.Now().Before(wp.atQuotaUntil) ||
439                 (wp.maxInstances > 0 && wp.maxInstances <= len(wp.workers)+len(wp.creating))
440 }
441
442 // SetIdleBehavior determines how the indicated instance will behave
443 // when it has no containers running.
444 func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
445         wp.mtx.Lock()
446         defer wp.mtx.Unlock()
447         wkr, ok := wp.workers[id]
448         if !ok {
449                 return errors.New("requested instance does not exist")
450         }
451         wkr.setIdleBehavior(idleBehavior)
452         return nil
453 }
454
455 // Successful connection to the SSH daemon, update the mTimeToSSH metric
456 func (wp *Pool) reportSSHConnected(inst cloud.Instance) {
457         wp.mtx.Lock()
458         defer wp.mtx.Unlock()
459         wkr, ok := wp.workers[inst.ID()]
460         if !ok {
461                 // race: inst was removed from the pool
462                 return
463         }
464         if wkr.state != StateBooting || !wkr.firstSSHConnection.IsZero() {
465                 // the node is not in booting state (can happen if
466                 // a-d-c is restarted) OR this is not the first SSH
467                 // connection
468                 return
469         }
470
471         wkr.firstSSHConnection = time.Now()
472         if wp.mTimeToSSH != nil {
473                 wp.mTimeToSSH.Observe(wkr.firstSSHConnection.Sub(wkr.appeared).Seconds())
474         }
475 }
476
477 // Add or update worker attached to the given instance.
478 //
479 // The second return value is true if a new worker is created.
480 //
481 // A newly added instance has state=StateBooting if its tags match an
482 // entry in wp.creating, otherwise StateUnknown.
483 //
484 // Caller must have lock.
485 func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
486         secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
487         inst = TagVerifier{Instance: inst, Secret: secret, ReportVerified: wp.reportSSHConnected}
488         id := inst.ID()
489         if wkr := wp.workers[id]; wkr != nil {
490                 wkr.executor.SetTarget(inst)
491                 wkr.instance = inst
492                 wkr.updated = time.Now()
493                 wkr.saveTags()
494                 return wkr, false
495         }
496
497         state := StateUnknown
498         if _, ok := wp.creating[secret]; ok {
499                 state = StateBooting
500         }
501
502         // If an instance has a valid IdleBehavior tag when it first
503         // appears, initialize the new worker accordingly (this is how
504         // we restore IdleBehavior that was set by a prior dispatch
505         // process); otherwise, default to "run". After this,
506         // wkr.idleBehavior is the source of truth, and will only be
507         // changed via SetIdleBehavior().
508         idleBehavior := IdleBehavior(inst.Tags()[wp.tagKeyPrefix+tagKeyIdleBehavior])
509         if !validIdleBehavior[idleBehavior] {
510                 idleBehavior = IdleBehaviorRun
511         }
512
513         logger := wp.logger.WithFields(logrus.Fields{
514                 "InstanceType": it.Name,
515                 "Instance":     inst.ID(),
516                 "Address":      inst.Address(),
517         })
518         logger.WithFields(logrus.Fields{
519                 "State":        state,
520                 "IdleBehavior": idleBehavior,
521         }).Infof("instance appeared in cloud")
522         now := time.Now()
523         wkr := &worker{
524                 mtx:          &wp.mtx,
525                 wp:           wp,
526                 logger:       logger,
527                 executor:     wp.newExecutor(inst),
528                 state:        state,
529                 idleBehavior: idleBehavior,
530                 instance:     inst,
531                 instType:     it,
532                 appeared:     now,
533                 probed:       now,
534                 busy:         now,
535                 updated:      now,
536                 running:      make(map[string]*remoteRunner),
537                 starting:     make(map[string]*remoteRunner),
538                 probing:      make(chan struct{}, 1),
539         }
540         wp.workers[id] = wkr
541         return wkr, true
542 }
543
544 // Shutdown shuts down a worker with the given type, or returns false
545 // if all workers with the given type are busy.
546 func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
547         wp.setupOnce.Do(wp.setup)
548         wp.mtx.Lock()
549         defer wp.mtx.Unlock()
550         logger := wp.logger.WithField("InstanceType", it.Name)
551         logger.Info("shutdown requested")
552         for _, tryState := range []State{StateBooting, StateIdle} {
553                 // TODO: shutdown the worker with the longest idle
554                 // time (Idle) or the earliest create time (Booting)
555                 for _, wkr := range wp.workers {
556                         if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
557                                 logger.WithField("Instance", wkr.instance.ID()).Info("shutting down")
558                                 wkr.reportBootOutcome(BootOutcomeAborted)
559                                 wkr.shutdown()
560                                 return true
561                         }
562                 }
563         }
564         return false
565 }
566
567 // CountWorkers returns the current number of workers in each state.
568 //
569 // CountWorkers blocks, if necessary, until the initial instance list
570 // has been loaded from the cloud provider.
571 func (wp *Pool) CountWorkers() map[State]int {
572         wp.setupOnce.Do(wp.setup)
573         wp.waitUntilLoaded()
574         wp.mtx.Lock()
575         defer wp.mtx.Unlock()
576         r := map[State]int{}
577         for _, w := range wp.workers {
578                 r[w.state]++
579         }
580         return r
581 }
582
583 // Running returns the container UUIDs being prepared/run on workers.
584 //
585 // In the returned map, the time value indicates when the Pool
586 // observed that the container process had exited. A container that
587 // has not yet exited has a zero time value. The caller should use
588 // ForgetContainer() to garbage-collect the entries for exited
589 // containers.
590 func (wp *Pool) Running() map[string]time.Time {
591         wp.setupOnce.Do(wp.setup)
592         wp.mtx.Lock()
593         defer wp.mtx.Unlock()
594         r := map[string]time.Time{}
595         for _, wkr := range wp.workers {
596                 for uuid := range wkr.running {
597                         r[uuid] = time.Time{}
598                 }
599                 for uuid := range wkr.starting {
600                         r[uuid] = time.Time{}
601                 }
602         }
603         for uuid, exited := range wp.exited {
604                 r[uuid] = exited
605         }
606         return r
607 }
608
609 // StartContainer starts a container on an idle worker immediately if
610 // possible, otherwise returns false.
611 func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
612         wp.setupOnce.Do(wp.setup)
613         wp.mtx.Lock()
614         defer wp.mtx.Unlock()
615         var wkr *worker
616         for _, w := range wp.workers {
617                 if w.instType == it && w.state == StateIdle && w.idleBehavior == IdleBehaviorRun {
618                         if wkr == nil || w.busy.After(wkr.busy) {
619                                 wkr = w
620                         }
621                 }
622         }
623         if wkr == nil {
624                 return false
625         }
626         wkr.startContainer(ctr)
627         return true
628 }
629
630 // KillContainer kills the crunch-run process for the given container
631 // UUID, if it's running on any worker.
632 //
633 // KillContainer returns immediately; the act of killing the container
634 // takes some time, and runs in the background.
635 //
636 // KillContainer returns false if the container has already ended.
637 func (wp *Pool) KillContainer(uuid string, reason string) bool {
638         wp.mtx.Lock()
639         defer wp.mtx.Unlock()
640         logger := wp.logger.WithFields(logrus.Fields{
641                 "ContainerUUID": uuid,
642                 "Reason":        reason,
643         })
644         for _, wkr := range wp.workers {
645                 rr := wkr.running[uuid]
646                 if rr == nil {
647                         rr = wkr.starting[uuid]
648                 }
649                 if rr != nil {
650                         rr.Kill(reason)
651                         return true
652                 }
653         }
654         logger.Debug("cannot kill: already disappeared")
655         return false
656 }
657
658 // ForgetContainer clears the placeholder for the given exited
659 // container, so it isn't returned by subsequent calls to Running().
660 //
661 // ForgetContainer has no effect if the container has not yet exited.
662 //
663 // The "container exited at time T" placeholder (which necessitates
664 // ForgetContainer) exists to make it easier for the caller
665 // (scheduler) to distinguish a container that exited without
666 // finalizing its state from a container that exited too recently for
667 // its final state to have appeared in the scheduler's queue cache.
668 func (wp *Pool) ForgetContainer(uuid string) {
669         wp.mtx.Lock()
670         defer wp.mtx.Unlock()
671         if _, ok := wp.exited[uuid]; ok {
672                 wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
673                 delete(wp.exited, uuid)
674         }
675 }
676
677 func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
678         if reg == nil {
679                 reg = prometheus.NewRegistry()
680         }
681         wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
682                 Namespace: "arvados",
683                 Subsystem: "dispatchcloud",
684                 Name:      "containers_running",
685                 Help:      "Number of containers reported running by cloud VMs.",
686         })
687         reg.MustRegister(wp.mContainersRunning)
688         wp.mProbeAgeMax = prometheus.NewGauge(prometheus.GaugeOpts{
689                 Namespace: "arvados",
690                 Subsystem: "dispatchcloud",
691                 Name:      "probe_age_seconds_max",
692                 Help:      "Maximum number of seconds since an instance's most recent successful probe.",
693         })
694         reg.MustRegister(wp.mProbeAgeMax)
695         wp.mProbeAgeMedian = prometheus.NewGauge(prometheus.GaugeOpts{
696                 Namespace: "arvados",
697                 Subsystem: "dispatchcloud",
698                 Name:      "probe_age_seconds_median",
699                 Help:      "Median number of seconds since an instance's most recent successful probe.",
700         })
701         reg.MustRegister(wp.mProbeAgeMedian)
702         wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
703                 Namespace: "arvados",
704                 Subsystem: "dispatchcloud",
705                 Name:      "instances_total",
706                 Help:      "Number of cloud VMs.",
707         }, []string{"category", "instance_type"})
708         reg.MustRegister(wp.mInstances)
709         wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
710                 Namespace: "arvados",
711                 Subsystem: "dispatchcloud",
712                 Name:      "instances_price",
713                 Help:      "Price of cloud VMs.",
714         }, []string{"category"})
715         reg.MustRegister(wp.mInstancesPrice)
716         wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
717                 Namespace: "arvados",
718                 Subsystem: "dispatchcloud",
719                 Name:      "vcpus_total",
720                 Help:      "Total VCPUs on all cloud VMs.",
721         }, []string{"category"})
722         reg.MustRegister(wp.mVCPUs)
723         wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
724                 Namespace: "arvados",
725                 Subsystem: "dispatchcloud",
726                 Name:      "memory_bytes_total",
727                 Help:      "Total memory on all cloud VMs.",
728         }, []string{"category"})
729         reg.MustRegister(wp.mMemory)
730         wp.mBootOutcomes = prometheus.NewCounterVec(prometheus.CounterOpts{
731                 Namespace: "arvados",
732                 Subsystem: "dispatchcloud",
733                 Name:      "boot_outcomes",
734                 Help:      "Boot outcomes by type.",
735         }, []string{"outcome"})
736         for k := range validBootOutcomes {
737                 wp.mBootOutcomes.WithLabelValues(string(k)).Add(0)
738         }
739         reg.MustRegister(wp.mBootOutcomes)
740         wp.mDisappearances = prometheus.NewCounterVec(prometheus.CounterOpts{
741                 Namespace: "arvados",
742                 Subsystem: "dispatchcloud",
743                 Name:      "instances_disappeared",
744                 Help:      "Number of occurrences of an instance disappearing from the cloud provider's list of instances.",
745         }, []string{"state"})
746         for _, v := range stateString {
747                 wp.mDisappearances.WithLabelValues(v).Add(0)
748         }
749         reg.MustRegister(wp.mDisappearances)
750         wp.mTimeToSSH = prometheus.NewSummary(prometheus.SummaryOpts{
751                 Namespace:  "arvados",
752                 Subsystem:  "dispatchcloud",
753                 Name:       "instances_time_to_ssh_seconds",
754                 Help:       "Number of seconds between instance creation and the first successful SSH connection.",
755                 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
756         })
757         reg.MustRegister(wp.mTimeToSSH)
758         wp.mTimeToReadyForContainer = prometheus.NewSummary(prometheus.SummaryOpts{
759                 Namespace:  "arvados",
760                 Subsystem:  "dispatchcloud",
761                 Name:       "instances_time_to_ready_for_container_seconds",
762                 Help:       "Number of seconds between the first successful SSH connection and ready to run a container.",
763                 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
764         })
765         reg.MustRegister(wp.mTimeToReadyForContainer)
766         wp.mTimeFromShutdownToGone = prometheus.NewSummary(prometheus.SummaryOpts{
767                 Namespace:  "arvados",
768                 Subsystem:  "dispatchcloud",
769                 Name:       "instances_time_from_shutdown_request_to_disappearance_seconds",
770                 Help:       "Number of seconds between the first shutdown attempt and the disappearance of the worker.",
771                 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
772         })
773         reg.MustRegister(wp.mTimeFromShutdownToGone)
774         wp.mTimeFromQueueToCrunchRun = prometheus.NewSummary(prometheus.SummaryOpts{
775                 Namespace:  "arvados",
776                 Subsystem:  "dispatchcloud",
777                 Name:       "containers_time_from_queue_to_crunch_run_seconds",
778                 Help:       "Number of seconds between the queuing of a container and the start of crunch-run.",
779                 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
780         })
781         reg.MustRegister(wp.mTimeFromQueueToCrunchRun)
782         wp.mRunProbeDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{
783                 Namespace:  "arvados",
784                 Subsystem:  "dispatchcloud",
785                 Name:       "instances_run_probe_duration_seconds",
786                 Help:       "Number of seconds per runProbe call.",
787                 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
788         }, []string{"outcome"})
789         reg.MustRegister(wp.mRunProbeDuration)
790 }
791
792 func (wp *Pool) runMetrics() {
793         ch := wp.Subscribe()
794         defer wp.Unsubscribe(ch)
795         wp.updateMetrics()
796         for range ch {
797                 wp.updateMetrics()
798         }
799 }
800
801 func (wp *Pool) updateMetrics() {
802         wp.mtx.RLock()
803         defer wp.mtx.RUnlock()
804
805         type entKey struct {
806                 cat      string
807                 instType string
808         }
809         instances := map[entKey]int64{}
810         price := map[string]float64{}
811         cpu := map[string]int64{}
812         mem := map[string]int64{}
813         var running int64
814         now := time.Now()
815         var probed []time.Time
816         for _, wkr := range wp.workers {
817                 var cat string
818                 switch {
819                 case len(wkr.running)+len(wkr.starting) > 0:
820                         cat = "inuse"
821                 case wkr.idleBehavior == IdleBehaviorHold:
822                         cat = "hold"
823                 case wkr.state == StateBooting:
824                         cat = "booting"
825                 case wkr.state == StateUnknown:
826                         cat = "unknown"
827                 default:
828                         cat = "idle"
829                 }
830                 instances[entKey{cat, wkr.instType.Name}]++
831                 price[cat] += wkr.instType.Price
832                 cpu[cat] += int64(wkr.instType.VCPUs)
833                 mem[cat] += int64(wkr.instType.RAM)
834                 running += int64(len(wkr.running) + len(wkr.starting))
835                 probed = append(probed, wkr.probed)
836         }
837         for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
838                 wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
839                 wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
840                 wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
841                 // make sure to reset gauges for non-existing category/nodetype combinations
842                 for _, it := range wp.instanceTypes {
843                         if _, ok := instances[entKey{cat, it.Name}]; !ok {
844                                 wp.mInstances.WithLabelValues(cat, it.Name).Set(float64(0))
845                         }
846                 }
847         }
848         for k, v := range instances {
849                 wp.mInstances.WithLabelValues(k.cat, k.instType).Set(float64(v))
850         }
851         wp.mContainersRunning.Set(float64(running))
852
853         if len(probed) == 0 {
854                 wp.mProbeAgeMax.Set(0)
855                 wp.mProbeAgeMedian.Set(0)
856         } else {
857                 sort.Slice(probed, func(i, j int) bool { return probed[i].Before(probed[j]) })
858                 wp.mProbeAgeMax.Set(now.Sub(probed[0]).Seconds())
859                 wp.mProbeAgeMedian.Set(now.Sub(probed[len(probed)/2]).Seconds())
860         }
861 }
862
863 func (wp *Pool) runProbes() {
864         maxPPS := wp.maxProbesPerSecond
865         if maxPPS < 1 {
866                 maxPPS = defaultMaxProbesPerSecond
867         }
868         limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
869         defer limitticker.Stop()
870
871         probeticker := time.NewTicker(wp.probeInterval)
872         defer probeticker.Stop()
873
874         workers := []cloud.InstanceID{}
875         for range probeticker.C {
876                 // Add some jitter. Without this, if probeInterval is
877                 // a multiple of syncInterval and sync is
878                 // instantaneous (as with the loopback driver), the
879                 // first few probes race with sync operations and
880                 // don't update the workers.
881                 time.Sleep(time.Duration(mathrand.Int63n(int64(wp.probeInterval) / 23)))
882
883                 workers = workers[:0]
884                 wp.mtx.Lock()
885                 for id, wkr := range wp.workers {
886                         if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
887                                 continue
888                         }
889                         workers = append(workers, id)
890                 }
891                 wp.mtx.Unlock()
892
893                 for _, id := range workers {
894                         wp.mtx.Lock()
895                         wkr, ok := wp.workers[id]
896                         wp.mtx.Unlock()
897                         if !ok {
898                                 // Deleted while we were probing
899                                 // others
900                                 continue
901                         }
902                         go wkr.ProbeAndUpdate()
903                         select {
904                         case <-wp.stop:
905                                 return
906                         case <-limitticker.C:
907                         }
908                 }
909         }
910 }
911
912 func (wp *Pool) runSync() {
913         // sync once immediately, then wait syncInterval, sync again,
914         // etc.
915         timer := time.NewTimer(1)
916         for {
917                 select {
918                 case <-timer.C:
919                         err := wp.getInstancesAndSync()
920                         if err != nil {
921                                 wp.logger.WithError(err).Warn("sync failed")
922                         }
923                         timer.Reset(wp.syncInterval)
924                 case <-wp.stop:
925                         wp.logger.Debug("worker.Pool stopped")
926                         return
927                 }
928         }
929 }
930
931 // Stop synchronizing with the InstanceSet.
932 func (wp *Pool) Stop() {
933         wp.setupOnce.Do(wp.setup)
934         close(wp.stop)
935 }
936
937 // Instances returns an InstanceView for each worker in the pool,
938 // summarizing its current state and recent activity.
939 func (wp *Pool) Instances() []InstanceView {
940         var r []InstanceView
941         wp.setupOnce.Do(wp.setup)
942         wp.mtx.Lock()
943         for _, w := range wp.workers {
944                 r = append(r, InstanceView{
945                         Instance:             w.instance.ID(),
946                         Address:              w.instance.Address(),
947                         Price:                w.instType.Price,
948                         ArvadosInstanceType:  w.instType.Name,
949                         ProviderInstanceType: w.instType.ProviderType,
950                         LastContainerUUID:    w.lastUUID,
951                         LastBusy:             w.busy,
952                         WorkerState:          w.state.String(),
953                         IdleBehavior:         w.idleBehavior,
954                 })
955         }
956         wp.mtx.Unlock()
957         sort.Slice(r, func(i, j int) bool {
958                 return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
959         })
960         return r
961 }
962
963 // KillInstance destroys a cloud VM instance. It returns an error if
964 // the given instance does not exist.
965 func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
966         wp.setupOnce.Do(wp.setup)
967         wp.mtx.Lock()
968         defer wp.mtx.Unlock()
969         wkr, ok := wp.workers[id]
970         if !ok {
971                 return errors.New("instance not found")
972         }
973         wkr.logger.WithField("Reason", reason).Info("shutting down")
974         wkr.reportBootOutcome(BootOutcomeAborted)
975         wkr.shutdown()
976         return nil
977 }
978
979 func (wp *Pool) setup() {
980         wp.creating = map[string]createCall{}
981         wp.exited = map[string]time.Time{}
982         wp.workers = map[cloud.InstanceID]*worker{}
983         wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
984         wp.loadRunnerData()
985 }
986
987 // Load the runner program to be deployed on worker nodes into
988 // wp.runnerData, if necessary. Errors are logged.
989 //
990 // If auto-deploy is disabled, len(wp.runnerData) will be 0.
991 //
992 // Caller must not have lock.
993 func (wp *Pool) loadRunnerData() error {
994         wp.mtx.Lock()
995         defer wp.mtx.Unlock()
996         if wp.runnerData != nil {
997                 return nil
998         } else if wp.runnerSource == "" {
999                 wp.runnerCmd = wp.runnerCmdDefault
1000                 wp.runnerData = []byte{}
1001                 return nil
1002         }
1003         logger := wp.logger.WithField("source", wp.runnerSource)
1004         logger.Debug("loading runner")
1005         buf, err := ioutil.ReadFile(wp.runnerSource)
1006         if err != nil {
1007                 logger.WithError(err).Error("failed to load runner program")
1008                 return err
1009         }
1010         wp.runnerData = buf
1011         wp.runnerMD5 = md5.Sum(buf)
1012         wp.runnerCmd = fmt.Sprintf("%s/crunch-run~%x", wp.runnerDeployDirectory, wp.runnerMD5)
1013         return nil
1014 }
1015
1016 func (wp *Pool) notify() {
1017         wp.mtx.RLock()
1018         defer wp.mtx.RUnlock()
1019         for _, send := range wp.subscribers {
1020                 select {
1021                 case send <- struct{}{}:
1022                 default:
1023                 }
1024         }
1025 }
1026
1027 func (wp *Pool) getInstancesAndSync() error {
1028         wp.setupOnce.Do(wp.setup)
1029         if err := wp.instanceSet.throttleInstances.Error(); err != nil {
1030                 return err
1031         }
1032         wp.logger.Debug("getting instance list")
1033         threshold := time.Now()
1034         instances, err := wp.instanceSet.Instances(cloud.InstanceTags{wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID)})
1035         if err != nil {
1036                 wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
1037                 return err
1038         }
1039         wp.sync(threshold, instances)
1040         wp.logger.Debug("sync done")
1041         return nil
1042 }
1043
1044 // Add/remove/update workers based on instances, which was obtained
1045 // from the instanceSet. However, don't clobber any other updates that
1046 // already happened after threshold.
1047 func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
1048         wp.mtx.Lock()
1049         defer wp.mtx.Unlock()
1050         wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
1051         notify := false
1052
1053         for _, inst := range instances {
1054                 itTag := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceType]
1055                 it, ok := wp.instanceTypes[itTag]
1056                 if !ok {
1057                         wp.logger.WithField("Instance", inst.ID()).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
1058                         continue
1059                 }
1060                 if wkr, isNew := wp.updateWorker(inst, it); isNew {
1061                         notify = true
1062                 } else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
1063                         wp.logger.WithField("Instance", inst.ID()).Info("worker still listed after shutdown; retrying")
1064                         wkr.shutdown()
1065                 }
1066         }
1067
1068         for id, wkr := range wp.workers {
1069                 if wkr.updated.After(threshold) {
1070                         continue
1071                 }
1072                 logger := wp.logger.WithFields(logrus.Fields{
1073                         "Instance":    wkr.instance.ID(),
1074                         "WorkerState": wkr.state,
1075                 })
1076                 logger.Info("instance disappeared in cloud")
1077                 wkr.reportBootOutcome(BootOutcomeDisappeared)
1078                 if wp.mDisappearances != nil {
1079                         wp.mDisappearances.WithLabelValues(stateString[wkr.state]).Inc()
1080                 }
1081                 // wkr.destroyed.IsZero() can happen if instance disappeared but we weren't trying to shut it down
1082                 if wp.mTimeFromShutdownToGone != nil && !wkr.destroyed.IsZero() {
1083                         wp.mTimeFromShutdownToGone.Observe(time.Now().Sub(wkr.destroyed).Seconds())
1084                 }
1085                 delete(wp.workers, id)
1086                 go wkr.Close()
1087                 notify = true
1088         }
1089
1090         if wp.atQuotaUntilFewerInstances > len(wp.workers)+len(wp.creating) {
1091                 // After syncing, there are fewer instances (including
1092                 // pending creates) than there were last time we saw a
1093                 // quota error.  This might mean it's now possible to
1094                 // create new instances.  Reset our "at quota" state.
1095                 wp.atQuotaUntilFewerInstances = 0
1096         }
1097
1098         if !wp.loaded {
1099                 notify = true
1100                 wp.loaded = true
1101                 wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
1102         }
1103
1104         if notify {
1105                 go wp.notify()
1106         }
1107 }
1108
1109 func (wp *Pool) waitUntilLoaded() {
1110         ch := wp.Subscribe()
1111         wp.mtx.RLock()
1112         defer wp.mtx.RUnlock()
1113         for !wp.loaded {
1114                 wp.mtx.RUnlock()
1115                 <-ch
1116                 wp.mtx.RLock()
1117         }
1118 }
1119
1120 func (wp *Pool) gatewayAuthSecret(uuid string) string {
1121         h := hmac.New(sha256.New, []byte(wp.systemRootToken))
1122         fmt.Fprint(h, uuid)
1123         return fmt.Sprintf("%x", h.Sum(nil))
1124 }
1125
1126 // Return a random string of n hexadecimal digits (n*4 random bits). n
1127 // must be even.
1128 func randomHex(n int) string {
1129         buf := make([]byte, n/2)
1130         _, err := rand.Read(buf)
1131         if err != nil {
1132                 panic(err)
1133         }
1134         return fmt.Sprintf("%x", buf)
1135 }