Merge branch '17074-optimize-itemsavailable' into main. Closes #17074
[arvados.git] / lib / dispatchcloud / worker / pool.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package worker
6
7 import (
8         "crypto/hmac"
9         "crypto/md5"
10         "crypto/rand"
11         "crypto/sha256"
12         "errors"
13         "fmt"
14         "io"
15         "io/ioutil"
16         mathrand "math/rand"
17         "sort"
18         "strings"
19         "sync"
20         "time"
21
22         "git.arvados.org/arvados.git/lib/cloud"
23         "git.arvados.org/arvados.git/sdk/go/arvados"
24         "github.com/prometheus/client_golang/prometheus"
25         "github.com/sirupsen/logrus"
26         "golang.org/x/crypto/ssh"
27 )
28
29 const (
30         tagKeyInstanceType   = "InstanceType"
31         tagKeyIdleBehavior   = "IdleBehavior"
32         tagKeyInstanceSecret = "InstanceSecret"
33         tagKeyInstanceSetID  = "InstanceSetID"
34 )
35
36 // An InstanceView shows a worker's current state and recent activity.
37 type InstanceView struct {
38         Instance             cloud.InstanceID `json:"instance"`
39         Address              string           `json:"address"`
40         Price                float64          `json:"price"`
41         ArvadosInstanceType  string           `json:"arvados_instance_type"`
42         ProviderInstanceType string           `json:"provider_instance_type"`
43         LastContainerUUID    string           `json:"last_container_uuid"`
44         LastBusy             time.Time        `json:"last_busy"`
45         WorkerState          string           `json:"worker_state"`
46         IdleBehavior         IdleBehavior     `json:"idle_behavior"`
47 }
48
49 // An Executor executes shell commands on a remote host.
50 type Executor interface {
51         // Run cmd on the current target.
52         Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)
53
54         // Use the given target for subsequent operations. The new
55         // target is the same host as the previous target, but it
56         // might return a different address and verify a different
57         // host key.
58         //
59         // SetTarget is called frequently, and in most cases the new
60         // target will behave exactly the same as the old one. An
61         // implementation should optimize accordingly.
62         //
63         // SetTarget must not block on concurrent Execute calls.
64         SetTarget(cloud.ExecutorTarget)
65
66         Close()
67 }
68
69 const (
70         defaultSyncInterval        = time.Minute
71         defaultProbeInterval       = time.Second * 10
72         defaultMaxProbesPerSecond  = 10
73         defaultTimeoutIdle         = time.Minute
74         defaultTimeoutBooting      = time.Minute * 10
75         defaultTimeoutProbe        = time.Minute * 10
76         defaultTimeoutShutdown     = time.Second * 10
77         defaultTimeoutTERM         = time.Minute * 2
78         defaultTimeoutSignal       = time.Second * 5
79         defaultTimeoutStaleRunLock = time.Second * 5
80
81         // Time after a quota error to try again anyway, even if no
82         // instances have been shutdown.
83         quotaErrorTTL = time.Minute
84
85         // Time after a capacity error to try again
86         capacityErrorTTL = time.Minute
87
88         // Time between "X failed because rate limiting" messages
89         logRateLimitErrorInterval = time.Second * 10
90 )
91
92 func duration(conf arvados.Duration, def time.Duration) time.Duration {
93         if conf > 0 {
94                 return time.Duration(conf)
95         }
96         return def
97 }
98
99 // NewPool creates a Pool of workers backed by instanceSet.
100 //
101 // New instances are configured and set up according to the given
102 // cluster configuration.
103 func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
104         wp := &Pool{
105                 logger:                         logger,
106                 arvClient:                      arvClient,
107                 instanceSetID:                  instanceSetID,
108                 instanceSet:                    &throttledInstanceSet{InstanceSet: instanceSet},
109                 newExecutor:                    newExecutor,
110                 cluster:                        cluster,
111                 bootProbeCommand:               cluster.Containers.CloudVMs.BootProbeCommand,
112                 instanceInitCommand:            cloud.InitCommand(cluster.Containers.CloudVMs.InstanceInitCommand),
113                 runnerSource:                   cluster.Containers.CloudVMs.DeployRunnerBinary,
114                 imageID:                        cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
115                 instanceTypes:                  cluster.InstanceTypes,
116                 maxProbesPerSecond:             cluster.Containers.CloudVMs.MaxProbesPerSecond,
117                 maxConcurrentInstanceCreateOps: cluster.Containers.CloudVMs.MaxConcurrentInstanceCreateOps,
118                 maxInstances:                   cluster.Containers.CloudVMs.MaxInstances,
119                 probeInterval:                  duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
120                 syncInterval:                   duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
121                 timeoutIdle:                    duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
122                 timeoutBooting:                 duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
123                 timeoutProbe:                   duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
124                 timeoutShutdown:                duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
125                 timeoutTERM:                    duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
126                 timeoutSignal:                  duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
127                 timeoutStaleRunLock:            duration(cluster.Containers.CloudVMs.TimeoutStaleRunLock, defaultTimeoutStaleRunLock),
128                 systemRootToken:                cluster.SystemRootToken,
129                 installPublicKey:               installPublicKey,
130                 tagKeyPrefix:                   cluster.Containers.CloudVMs.TagKeyPrefix,
131                 runnerCmdDefault:               cluster.Containers.CrunchRunCommand,
132                 runnerArgs:                     append([]string{"--runtime-engine=" + cluster.Containers.RuntimeEngine}, cluster.Containers.CrunchRunArgumentsList...),
133                 stop:                           make(chan bool),
134         }
135         wp.registerMetrics(reg)
136         go func() {
137                 wp.setupOnce.Do(wp.setup)
138                 go wp.runMetrics()
139                 go wp.runProbes()
140                 go wp.runSync()
141         }()
142         return wp
143 }
144
145 // Pool is a resizable worker pool backed by a cloud.InstanceSet. A
146 // zero Pool should not be used. Call NewPool to create a new Pool.
147 type Pool struct {
148         // configuration
149         logger                         logrus.FieldLogger
150         arvClient                      *arvados.Client
151         instanceSetID                  cloud.InstanceSetID
152         instanceSet                    *throttledInstanceSet
153         newExecutor                    func(cloud.Instance) Executor
154         cluster                        *arvados.Cluster
155         bootProbeCommand               string
156         instanceInitCommand            cloud.InitCommand
157         runnerSource                   string
158         imageID                        cloud.ImageID
159         instanceTypes                  map[string]arvados.InstanceType
160         syncInterval                   time.Duration
161         probeInterval                  time.Duration
162         maxProbesPerSecond             int
163         maxConcurrentInstanceCreateOps int
164         maxInstances                   int
165         timeoutIdle                    time.Duration
166         timeoutBooting                 time.Duration
167         timeoutProbe                   time.Duration
168         timeoutShutdown                time.Duration
169         timeoutTERM                    time.Duration
170         timeoutSignal                  time.Duration
171         timeoutStaleRunLock            time.Duration
172         systemRootToken                string
173         installPublicKey               ssh.PublicKey
174         tagKeyPrefix                   string
175         runnerCmdDefault               string   // crunch-run command to use if not deploying a binary
176         runnerArgs                     []string // extra args passed to crunch-run
177
178         // private state
179         subscribers                map[<-chan struct{}]chan<- struct{}
180         creating                   map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
181         workers                    map[cloud.InstanceID]*worker
182         loaded                     bool                 // loaded list of instances from InstanceSet at least once
183         exited                     map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called
184         atQuotaUntilFewerInstances int
185         atQuotaUntil               time.Time
186         atQuotaErr                 cloud.QuotaError
187         atCapacityUntil            map[interface{}]time.Time
188         stop                       chan bool
189         mtx                        sync.RWMutex
190         setupOnce                  sync.Once
191         runnerData                 []byte
192         runnerMD5                  [md5.Size]byte
193         runnerCmd                  string
194
195         mContainersRunning        prometheus.Gauge
196         mInstances                *prometheus.GaugeVec
197         mInstancesPrice           *prometheus.GaugeVec
198         mVCPUs                    *prometheus.GaugeVec
199         mMemory                   *prometheus.GaugeVec
200         mBootOutcomes             *prometheus.CounterVec
201         mDisappearances           *prometheus.CounterVec
202         mTimeToSSH                prometheus.Summary
203         mTimeToReadyForContainer  prometheus.Summary
204         mTimeFromShutdownToGone   prometheus.Summary
205         mTimeFromQueueToCrunchRun prometheus.Summary
206         mRunProbeDuration         *prometheus.SummaryVec
207         mProbeAgeMax              prometheus.Gauge
208         mProbeAgeMedian           prometheus.Gauge
209 }
210
211 type createCall struct {
212         time         time.Time
213         instanceType arvados.InstanceType
214 }
215
216 func (wp *Pool) CheckHealth() error {
217         wp.setupOnce.Do(wp.setup)
218         if err := wp.loadRunnerData(); err != nil {
219                 return fmt.Errorf("error loading runner binary: %s", err)
220         }
221         return nil
222 }
223
224 // Subscribe returns a buffered channel that becomes ready after any
225 // change to the pool's state that could have scheduling implications:
226 // a worker's state changes, a new worker appears, the cloud
227 // provider's API rate limiting period ends, etc.
228 //
229 // Additional events that occur while the channel is already ready
230 // will be dropped, so it is OK if the caller services the channel
231 // slowly.
232 //
233 // Example:
234 //
235 //      ch := wp.Subscribe()
236 //      defer wp.Unsubscribe(ch)
237 //      for range ch {
238 //              tryScheduling(wp)
239 //              if done {
240 //                      break
241 //              }
242 //      }
243 func (wp *Pool) Subscribe() <-chan struct{} {
244         wp.setupOnce.Do(wp.setup)
245         wp.mtx.Lock()
246         defer wp.mtx.Unlock()
247         ch := make(chan struct{}, 1)
248         wp.subscribers[ch] = ch
249         return ch
250 }
251
252 // Unsubscribe stops sending updates to the given channel.
253 func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
254         wp.setupOnce.Do(wp.setup)
255         wp.mtx.Lock()
256         defer wp.mtx.Unlock()
257         delete(wp.subscribers, ch)
258 }
259
260 // Unallocated returns the number of unallocated (creating + booting +
261 // idle + unknown) workers for each instance type.  Workers in
262 // hold/drain mode are not included.
263 func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
264         wp.setupOnce.Do(wp.setup)
265         wp.mtx.RLock()
266         defer wp.mtx.RUnlock()
267         unalloc := map[arvados.InstanceType]int{}
268         creating := map[arvados.InstanceType]int{}
269         oldestCreate := map[arvados.InstanceType]time.Time{}
270         for _, cc := range wp.creating {
271                 it := cc.instanceType
272                 creating[it]++
273                 if t, ok := oldestCreate[it]; !ok || t.After(cc.time) {
274                         oldestCreate[it] = cc.time
275                 }
276         }
277         for _, wkr := range wp.workers {
278                 // Skip workers that are not expected to become
279                 // available soon. Note len(wkr.running)>0 is not
280                 // redundant here: it can be true even in
281                 // StateUnknown.
282                 if wkr.state == StateShutdown ||
283                         wkr.state == StateRunning ||
284                         wkr.idleBehavior != IdleBehaviorRun ||
285                         len(wkr.running) > 0 {
286                         continue
287                 }
288                 it := wkr.instType
289                 unalloc[it]++
290                 if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) {
291                         // If up to N new workers appear in
292                         // Instances() while we are waiting for N
293                         // Create() calls to complete, we assume we're
294                         // just seeing a race between Instances() and
295                         // Create() responses.
296                         //
297                         // The other common reason why nodes have
298                         // state==Unknown is that they appeared at
299                         // startup, before any Create calls. They
300                         // don't match the above timing condition, so
301                         // we never mistakenly attribute them to
302                         // pending Create calls.
303                         creating[it]--
304                 }
305         }
306         for it, c := range creating {
307                 unalloc[it] += c
308         }
309         return unalloc
310 }
311
312 // Create a new instance with the given type, and add it to the worker
313 // pool. The worker is added immediately; instance creation runs in
314 // the background.
315 //
316 // Create returns false if a pre-existing error or a configuration
317 // setting prevents it from even attempting to create a new
318 // instance. Those errors are logged by the Pool, so the caller does
319 // not need to log anything in such cases.
320 func (wp *Pool) Create(it arvados.InstanceType) bool {
321         logger := wp.logger.WithField("InstanceType", it.Name)
322         wp.setupOnce.Do(wp.setup)
323         if wp.loadRunnerData() != nil {
324                 // Boot probe is certain to fail.
325                 return false
326         }
327         if wp.AtCapacity(it) || wp.AtQuota() || wp.instanceSet.throttleCreate.Error() != nil {
328                 return false
329         }
330         wp.mtx.Lock()
331         defer wp.mtx.Unlock()
332         // The maxConcurrentInstanceCreateOps knob throttles the number of node create
333         // requests in flight. It was added to work around a limitation in Azure's
334         // managed disks, which support no more than 20 concurrent node creation
335         // requests from a single disk image (cf.
336         // https://docs.microsoft.com/en-us/azure/virtual-machines/linux/capture-image).
337         // The code assumes that node creation, from Azure's perspective, means the
338         // period until the instance appears in the "get all instances" list.
339         if wp.maxConcurrentInstanceCreateOps > 0 && len(wp.creating) >= wp.maxConcurrentInstanceCreateOps {
340                 logger.Info("reached MaxConcurrentInstanceCreateOps")
341                 wp.instanceSet.throttleCreate.ErrorUntil(errors.New("reached MaxConcurrentInstanceCreateOps"), time.Now().Add(5*time.Second), wp.notify)
342                 return false
343         }
344         now := time.Now()
345         secret := randomHex(instanceSecretLength)
346         wp.creating[secret] = createCall{time: now, instanceType: it}
347         go func() {
348                 defer wp.notify()
349                 tags := cloud.InstanceTags{
350                         wp.tagKeyPrefix + tagKeyInstanceSetID:  string(wp.instanceSetID),
351                         wp.tagKeyPrefix + tagKeyInstanceType:   it.Name,
352                         wp.tagKeyPrefix + tagKeyIdleBehavior:   string(IdleBehaviorRun),
353                         wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
354                 }
355                 initCmd := TagVerifier{nil, secret, nil}.InitCommand() + "\n" + wp.instanceInitCommand
356                 inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
357                 wp.mtx.Lock()
358                 defer wp.mtx.Unlock()
359                 // delete() is deferred so the updateWorker() call
360                 // below knows to use StateBooting when adding a new
361                 // worker.
362                 defer delete(wp.creating, secret)
363                 if err != nil {
364                         if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
365                                 wp.atQuotaErr = err
366                                 n := len(wp.workers) + len(wp.creating) - 1
367                                 if n < 1 {
368                                         // Quota error with no
369                                         // instances running --
370                                         // nothing to do but wait
371                                         wp.atQuotaUntilFewerInstances = 0
372                                         wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
373                                         time.AfterFunc(quotaErrorTTL, wp.notify)
374                                         logger.WithField("atQuotaUntil", wp.atQuotaUntil).Info("quota error with 0 running -- waiting for quotaErrorTTL")
375                                 } else if n < wp.atQuotaUntilFewerInstances || wp.atQuotaUntilFewerInstances == 0 {
376                                         // Quota error with N
377                                         // instances running -- report
378                                         // AtQuota until some
379                                         // instances shut down
380                                         wp.atQuotaUntilFewerInstances = n
381                                         wp.atQuotaUntil = time.Time{}
382                                         logger.WithField("atQuotaUntilFewerInstances", n).Info("quota error -- waiting for next instance shutdown")
383                                 }
384                         }
385                         if err, ok := err.(cloud.CapacityError); ok && err.IsCapacityError() {
386                                 var capKey interface{} = it.ProviderType
387                                 if err.IsInstanceTypeSpecific() {
388                                         capKey = it.ProviderType
389                                 } else if err.IsInstanceQuotaGroupSpecific() {
390                                         capKey = wp.instanceSet.InstanceQuotaGroup(it)
391                                 } else {
392                                         capKey = ""
393                                 }
394                                 if wp.atCapacityUntil == nil {
395                                         wp.atCapacityUntil = map[interface{}]time.Time{}
396                                 }
397                                 wp.atCapacityUntil[capKey] = time.Now().Add(capacityErrorTTL)
398                                 time.AfterFunc(capacityErrorTTL, wp.notify)
399                         }
400                         logger.WithError(err).Error("create failed")
401                         wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
402                         return
403                 }
404                 wp.updateWorker(inst, it)
405         }()
406         if len(wp.creating)+len(wp.workers) == wp.maxInstances {
407                 logger.Infof("now at MaxInstances limit of %d instances", wp.maxInstances)
408         }
409         return true
410 }
411
412 // AtCapacity returns true if Create() is currently expected to fail
413 // for the given instance type.
414 func (wp *Pool) AtCapacity(it arvados.InstanceType) bool {
415         wp.mtx.Lock()
416         defer wp.mtx.Unlock()
417         for _, capKey := range []interface{}{
418                 "",                                    // all instance types
419                 wp.instanceSet.InstanceQuotaGroup(it), // instance quota group
420                 it.ProviderType,                       // just this instance type
421         } {
422                 if t, ok := wp.atCapacityUntil[capKey]; ok && time.Now().Before(t) {
423                         return true
424                 }
425         }
426         return false
427 }
428
429 // AtQuota returns true if Create is not expected to work at the
430 // moment (e.g., cloud provider has reported quota errors, or we are
431 // already at our own configured quota).
432 func (wp *Pool) AtQuota() bool {
433         wp.mtx.Lock()
434         defer wp.mtx.Unlock()
435         return wp.atQuotaUntilFewerInstances > 0 ||
436                 time.Now().Before(wp.atQuotaUntil) ||
437                 (wp.maxInstances > 0 && wp.maxInstances <= len(wp.workers)+len(wp.creating))
438 }
439
440 // SetIdleBehavior determines how the indicated instance will behave
441 // when it has no containers running.
442 func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
443         wp.mtx.Lock()
444         defer wp.mtx.Unlock()
445         wkr, ok := wp.workers[id]
446         if !ok {
447                 return errors.New("requested instance does not exist")
448         }
449         wkr.setIdleBehavior(idleBehavior)
450         return nil
451 }
452
453 // Successful connection to the SSH daemon, update the mTimeToSSH metric
454 func (wp *Pool) reportSSHConnected(inst cloud.Instance) {
455         wp.mtx.Lock()
456         defer wp.mtx.Unlock()
457         wkr, ok := wp.workers[inst.ID()]
458         if !ok {
459                 // race: inst was removed from the pool
460                 return
461         }
462         if wkr.state != StateBooting || !wkr.firstSSHConnection.IsZero() {
463                 // the node is not in booting state (can happen if
464                 // a-d-c is restarted) OR this is not the first SSH
465                 // connection
466                 return
467         }
468
469         wkr.firstSSHConnection = time.Now()
470         if wp.mTimeToSSH != nil {
471                 wp.mTimeToSSH.Observe(wkr.firstSSHConnection.Sub(wkr.appeared).Seconds())
472         }
473 }
474
475 // Add or update worker attached to the given instance.
476 //
477 // The second return value is true if a new worker is created.
478 //
479 // A newly added instance has state=StateBooting if its tags match an
480 // entry in wp.creating, otherwise StateUnknown.
481 //
482 // Caller must have lock.
483 func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
484         secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
485         inst = TagVerifier{Instance: inst, Secret: secret, ReportVerified: wp.reportSSHConnected}
486         id := inst.ID()
487         if wkr := wp.workers[id]; wkr != nil {
488                 wkr.executor.SetTarget(inst)
489                 wkr.instance = inst
490                 wkr.updated = time.Now()
491                 wkr.saveTags()
492                 return wkr, false
493         }
494
495         state := StateUnknown
496         if _, ok := wp.creating[secret]; ok {
497                 state = StateBooting
498         }
499
500         // If an instance has a valid IdleBehavior tag when it first
501         // appears, initialize the new worker accordingly (this is how
502         // we restore IdleBehavior that was set by a prior dispatch
503         // process); otherwise, default to "run". After this,
504         // wkr.idleBehavior is the source of truth, and will only be
505         // changed via SetIdleBehavior().
506         idleBehavior := IdleBehavior(inst.Tags()[wp.tagKeyPrefix+tagKeyIdleBehavior])
507         if !validIdleBehavior[idleBehavior] {
508                 idleBehavior = IdleBehaviorRun
509         }
510
511         logger := wp.logger.WithFields(logrus.Fields{
512                 "InstanceType": it.Name,
513                 "Instance":     inst.ID(),
514                 "Address":      inst.Address(),
515         })
516         logger.WithFields(logrus.Fields{
517                 "State":        state,
518                 "IdleBehavior": idleBehavior,
519         }).Infof("instance appeared in cloud")
520         now := time.Now()
521         wkr := &worker{
522                 mtx:          &wp.mtx,
523                 wp:           wp,
524                 logger:       logger,
525                 executor:     wp.newExecutor(inst),
526                 state:        state,
527                 idleBehavior: idleBehavior,
528                 instance:     inst,
529                 instType:     it,
530                 appeared:     now,
531                 probed:       now,
532                 busy:         now,
533                 updated:      now,
534                 running:      make(map[string]*remoteRunner),
535                 starting:     make(map[string]*remoteRunner),
536                 probing:      make(chan struct{}, 1),
537         }
538         wp.workers[id] = wkr
539         return wkr, true
540 }
541
542 // Shutdown shuts down a worker with the given type, or returns false
543 // if all workers with the given type are busy.
544 func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
545         wp.setupOnce.Do(wp.setup)
546         wp.mtx.Lock()
547         defer wp.mtx.Unlock()
548         logger := wp.logger.WithField("InstanceType", it.Name)
549         logger.Info("shutdown requested")
550         for _, tryState := range []State{StateBooting, StateIdle} {
551                 // TODO: shutdown the worker with the longest idle
552                 // time (Idle) or the earliest create time (Booting)
553                 for _, wkr := range wp.workers {
554                         if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
555                                 logger.WithField("Instance", wkr.instance.ID()).Info("shutting down")
556                                 wkr.reportBootOutcome(BootOutcomeAborted)
557                                 wkr.shutdown()
558                                 return true
559                         }
560                 }
561         }
562         return false
563 }
564
565 // CountWorkers returns the current number of workers in each state.
566 //
567 // CountWorkers blocks, if necessary, until the initial instance list
568 // has been loaded from the cloud provider.
569 func (wp *Pool) CountWorkers() map[State]int {
570         wp.setupOnce.Do(wp.setup)
571         wp.waitUntilLoaded()
572         wp.mtx.Lock()
573         defer wp.mtx.Unlock()
574         r := map[State]int{}
575         for _, w := range wp.workers {
576                 r[w.state]++
577         }
578         return r
579 }
580
581 // Running returns the container UUIDs being prepared/run on workers.
582 //
583 // In the returned map, the time value indicates when the Pool
584 // observed that the container process had exited. A container that
585 // has not yet exited has a zero time value. The caller should use
586 // ForgetContainer() to garbage-collect the entries for exited
587 // containers.
588 func (wp *Pool) Running() map[string]time.Time {
589         wp.setupOnce.Do(wp.setup)
590         wp.mtx.Lock()
591         defer wp.mtx.Unlock()
592         r := map[string]time.Time{}
593         for _, wkr := range wp.workers {
594                 for uuid := range wkr.running {
595                         r[uuid] = time.Time{}
596                 }
597                 for uuid := range wkr.starting {
598                         r[uuid] = time.Time{}
599                 }
600         }
601         for uuid, exited := range wp.exited {
602                 r[uuid] = exited
603         }
604         return r
605 }
606
607 // StartContainer starts a container on an idle worker immediately if
608 // possible, otherwise returns false.
609 func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
610         wp.setupOnce.Do(wp.setup)
611         wp.mtx.Lock()
612         defer wp.mtx.Unlock()
613         var wkr *worker
614         for _, w := range wp.workers {
615                 if w.instType == it && w.state == StateIdle && w.idleBehavior == IdleBehaviorRun {
616                         if wkr == nil || w.busy.After(wkr.busy) {
617                                 wkr = w
618                         }
619                 }
620         }
621         if wkr == nil {
622                 return false
623         }
624         wkr.startContainer(ctr)
625         return true
626 }
627
628 // KillContainer kills the crunch-run process for the given container
629 // UUID, if it's running on any worker.
630 //
631 // KillContainer returns immediately; the act of killing the container
632 // takes some time, and runs in the background.
633 //
634 // KillContainer returns false if the container has already ended.
635 func (wp *Pool) KillContainer(uuid string, reason string) bool {
636         wp.mtx.Lock()
637         defer wp.mtx.Unlock()
638         logger := wp.logger.WithFields(logrus.Fields{
639                 "ContainerUUID": uuid,
640                 "Reason":        reason,
641         })
642         for _, wkr := range wp.workers {
643                 rr := wkr.running[uuid]
644                 if rr == nil {
645                         rr = wkr.starting[uuid]
646                 }
647                 if rr != nil {
648                         rr.Kill(reason)
649                         return true
650                 }
651         }
652         logger.Debug("cannot kill: already disappeared")
653         return false
654 }
655
656 // ForgetContainer clears the placeholder for the given exited
657 // container, so it isn't returned by subsequent calls to Running().
658 //
659 // ForgetContainer has no effect if the container has not yet exited.
660 //
661 // The "container exited at time T" placeholder (which necessitates
662 // ForgetContainer) exists to make it easier for the caller
663 // (scheduler) to distinguish a container that exited without
664 // finalizing its state from a container that exited too recently for
665 // its final state to have appeared in the scheduler's queue cache.
666 func (wp *Pool) ForgetContainer(uuid string) {
667         wp.mtx.Lock()
668         defer wp.mtx.Unlock()
669         if _, ok := wp.exited[uuid]; ok {
670                 wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
671                 delete(wp.exited, uuid)
672         }
673 }
674
675 func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
676         if reg == nil {
677                 reg = prometheus.NewRegistry()
678         }
679         wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
680                 Namespace: "arvados",
681                 Subsystem: "dispatchcloud",
682                 Name:      "containers_running",
683                 Help:      "Number of containers reported running by cloud VMs.",
684         })
685         reg.MustRegister(wp.mContainersRunning)
686         wp.mProbeAgeMax = prometheus.NewGauge(prometheus.GaugeOpts{
687                 Namespace: "arvados",
688                 Subsystem: "dispatchcloud",
689                 Name:      "probe_age_seconds_max",
690                 Help:      "Maximum number of seconds since an instance's most recent successful probe.",
691         })
692         reg.MustRegister(wp.mProbeAgeMax)
693         wp.mProbeAgeMedian = prometheus.NewGauge(prometheus.GaugeOpts{
694                 Namespace: "arvados",
695                 Subsystem: "dispatchcloud",
696                 Name:      "probe_age_seconds_median",
697                 Help:      "Median number of seconds since an instance's most recent successful probe.",
698         })
699         reg.MustRegister(wp.mProbeAgeMedian)
700         wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
701                 Namespace: "arvados",
702                 Subsystem: "dispatchcloud",
703                 Name:      "instances_total",
704                 Help:      "Number of cloud VMs.",
705         }, []string{"category", "instance_type"})
706         reg.MustRegister(wp.mInstances)
707         wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
708                 Namespace: "arvados",
709                 Subsystem: "dispatchcloud",
710                 Name:      "instances_price",
711                 Help:      "Price of cloud VMs.",
712         }, []string{"category"})
713         reg.MustRegister(wp.mInstancesPrice)
714         wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
715                 Namespace: "arvados",
716                 Subsystem: "dispatchcloud",
717                 Name:      "vcpus_total",
718                 Help:      "Total VCPUs on all cloud VMs.",
719         }, []string{"category"})
720         reg.MustRegister(wp.mVCPUs)
721         wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
722                 Namespace: "arvados",
723                 Subsystem: "dispatchcloud",
724                 Name:      "memory_bytes_total",
725                 Help:      "Total memory on all cloud VMs.",
726         }, []string{"category"})
727         reg.MustRegister(wp.mMemory)
728         wp.mBootOutcomes = prometheus.NewCounterVec(prometheus.CounterOpts{
729                 Namespace: "arvados",
730                 Subsystem: "dispatchcloud",
731                 Name:      "boot_outcomes",
732                 Help:      "Boot outcomes by type.",
733         }, []string{"outcome"})
734         for k := range validBootOutcomes {
735                 wp.mBootOutcomes.WithLabelValues(string(k)).Add(0)
736         }
737         reg.MustRegister(wp.mBootOutcomes)
738         wp.mDisappearances = prometheus.NewCounterVec(prometheus.CounterOpts{
739                 Namespace: "arvados",
740                 Subsystem: "dispatchcloud",
741                 Name:      "instances_disappeared",
742                 Help:      "Number of occurrences of an instance disappearing from the cloud provider's list of instances.",
743         }, []string{"state"})
744         for _, v := range stateString {
745                 wp.mDisappearances.WithLabelValues(v).Add(0)
746         }
747         reg.MustRegister(wp.mDisappearances)
748         wp.mTimeToSSH = prometheus.NewSummary(prometheus.SummaryOpts{
749                 Namespace:  "arvados",
750                 Subsystem:  "dispatchcloud",
751                 Name:       "instances_time_to_ssh_seconds",
752                 Help:       "Number of seconds between instance creation and the first successful SSH connection.",
753                 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
754         })
755         reg.MustRegister(wp.mTimeToSSH)
756         wp.mTimeToReadyForContainer = prometheus.NewSummary(prometheus.SummaryOpts{
757                 Namespace:  "arvados",
758                 Subsystem:  "dispatchcloud",
759                 Name:       "instances_time_to_ready_for_container_seconds",
760                 Help:       "Number of seconds between the first successful SSH connection and ready to run a container.",
761                 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
762         })
763         reg.MustRegister(wp.mTimeToReadyForContainer)
764         wp.mTimeFromShutdownToGone = prometheus.NewSummary(prometheus.SummaryOpts{
765                 Namespace:  "arvados",
766                 Subsystem:  "dispatchcloud",
767                 Name:       "instances_time_from_shutdown_request_to_disappearance_seconds",
768                 Help:       "Number of seconds between the first shutdown attempt and the disappearance of the worker.",
769                 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
770         })
771         reg.MustRegister(wp.mTimeFromShutdownToGone)
772         wp.mTimeFromQueueToCrunchRun = prometheus.NewSummary(prometheus.SummaryOpts{
773                 Namespace:  "arvados",
774                 Subsystem:  "dispatchcloud",
775                 Name:       "containers_time_from_queue_to_crunch_run_seconds",
776                 Help:       "Number of seconds between the queuing of a container and the start of crunch-run.",
777                 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
778         })
779         reg.MustRegister(wp.mTimeFromQueueToCrunchRun)
780         wp.mRunProbeDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{
781                 Namespace:  "arvados",
782                 Subsystem:  "dispatchcloud",
783                 Name:       "instances_run_probe_duration_seconds",
784                 Help:       "Number of seconds per runProbe call.",
785                 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
786         }, []string{"outcome"})
787         reg.MustRegister(wp.mRunProbeDuration)
788 }
789
790 func (wp *Pool) runMetrics() {
791         ch := wp.Subscribe()
792         defer wp.Unsubscribe(ch)
793         wp.updateMetrics()
794         for range ch {
795                 wp.updateMetrics()
796         }
797 }
798
799 func (wp *Pool) updateMetrics() {
800         wp.mtx.RLock()
801         defer wp.mtx.RUnlock()
802
803         type entKey struct {
804                 cat      string
805                 instType string
806         }
807         instances := map[entKey]int64{}
808         price := map[string]float64{}
809         cpu := map[string]int64{}
810         mem := map[string]int64{}
811         var running int64
812         now := time.Now()
813         var probed []time.Time
814         for _, wkr := range wp.workers {
815                 var cat string
816                 switch {
817                 case len(wkr.running)+len(wkr.starting) > 0:
818                         cat = "inuse"
819                 case wkr.idleBehavior == IdleBehaviorHold:
820                         cat = "hold"
821                 case wkr.state == StateBooting:
822                         cat = "booting"
823                 case wkr.state == StateUnknown:
824                         cat = "unknown"
825                 default:
826                         cat = "idle"
827                 }
828                 instances[entKey{cat, wkr.instType.Name}]++
829                 price[cat] += wkr.instType.Price
830                 cpu[cat] += int64(wkr.instType.VCPUs)
831                 mem[cat] += int64(wkr.instType.RAM)
832                 running += int64(len(wkr.running) + len(wkr.starting))
833                 probed = append(probed, wkr.probed)
834         }
835         for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
836                 wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
837                 wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
838                 wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
839                 // make sure to reset gauges for non-existing category/nodetype combinations
840                 for _, it := range wp.instanceTypes {
841                         if _, ok := instances[entKey{cat, it.Name}]; !ok {
842                                 wp.mInstances.WithLabelValues(cat, it.Name).Set(float64(0))
843                         }
844                 }
845         }
846         for k, v := range instances {
847                 wp.mInstances.WithLabelValues(k.cat, k.instType).Set(float64(v))
848         }
849         wp.mContainersRunning.Set(float64(running))
850
851         if len(probed) == 0 {
852                 wp.mProbeAgeMax.Set(0)
853                 wp.mProbeAgeMedian.Set(0)
854         } else {
855                 sort.Slice(probed, func(i, j int) bool { return probed[i].Before(probed[j]) })
856                 wp.mProbeAgeMax.Set(now.Sub(probed[0]).Seconds())
857                 wp.mProbeAgeMedian.Set(now.Sub(probed[len(probed)/2]).Seconds())
858         }
859 }
860
861 func (wp *Pool) runProbes() {
862         maxPPS := wp.maxProbesPerSecond
863         if maxPPS < 1 {
864                 maxPPS = defaultMaxProbesPerSecond
865         }
866         limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
867         defer limitticker.Stop()
868
869         probeticker := time.NewTicker(wp.probeInterval)
870         defer probeticker.Stop()
871
872         workers := []cloud.InstanceID{}
873         for range probeticker.C {
874                 // Add some jitter. Without this, if probeInterval is
875                 // a multiple of syncInterval and sync is
876                 // instantaneous (as with the loopback driver), the
877                 // first few probes race with sync operations and
878                 // don't update the workers.
879                 time.Sleep(time.Duration(mathrand.Int63n(int64(wp.probeInterval) / 23)))
880
881                 workers = workers[:0]
882                 wp.mtx.Lock()
883                 for id, wkr := range wp.workers {
884                         if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
885                                 continue
886                         }
887                         workers = append(workers, id)
888                 }
889                 wp.mtx.Unlock()
890
891                 for _, id := range workers {
892                         wp.mtx.Lock()
893                         wkr, ok := wp.workers[id]
894                         wp.mtx.Unlock()
895                         if !ok {
896                                 // Deleted while we were probing
897                                 // others
898                                 continue
899                         }
900                         go wkr.ProbeAndUpdate()
901                         select {
902                         case <-wp.stop:
903                                 return
904                         case <-limitticker.C:
905                         }
906                 }
907         }
908 }
909
910 func (wp *Pool) runSync() {
911         // sync once immediately, then wait syncInterval, sync again,
912         // etc.
913         timer := time.NewTimer(1)
914         for {
915                 select {
916                 case <-timer.C:
917                         err := wp.getInstancesAndSync()
918                         if err != nil {
919                                 wp.logger.WithError(err).Warn("sync failed")
920                         }
921                         timer.Reset(wp.syncInterval)
922                 case <-wp.stop:
923                         wp.logger.Debug("worker.Pool stopped")
924                         return
925                 }
926         }
927 }
928
929 // Stop synchronizing with the InstanceSet.
930 func (wp *Pool) Stop() {
931         wp.setupOnce.Do(wp.setup)
932         close(wp.stop)
933 }
934
935 // Instances returns an InstanceView for each worker in the pool,
936 // summarizing its current state and recent activity.
937 func (wp *Pool) Instances() []InstanceView {
938         var r []InstanceView
939         wp.setupOnce.Do(wp.setup)
940         wp.mtx.Lock()
941         for _, w := range wp.workers {
942                 r = append(r, InstanceView{
943                         Instance:             w.instance.ID(),
944                         Address:              w.instance.Address(),
945                         Price:                w.instType.Price,
946                         ArvadosInstanceType:  w.instType.Name,
947                         ProviderInstanceType: w.instType.ProviderType,
948                         LastContainerUUID:    w.lastUUID,
949                         LastBusy:             w.busy,
950                         WorkerState:          w.state.String(),
951                         IdleBehavior:         w.idleBehavior,
952                 })
953         }
954         wp.mtx.Unlock()
955         sort.Slice(r, func(i, j int) bool {
956                 return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
957         })
958         return r
959 }
960
961 // KillInstance destroys a cloud VM instance. It returns an error if
962 // the given instance does not exist.
963 func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
964         wp.setupOnce.Do(wp.setup)
965         wp.mtx.Lock()
966         defer wp.mtx.Unlock()
967         wkr, ok := wp.workers[id]
968         if !ok {
969                 return errors.New("instance not found")
970         }
971         wkr.logger.WithField("Reason", reason).Info("shutting down")
972         wkr.reportBootOutcome(BootOutcomeAborted)
973         wkr.shutdown()
974         return nil
975 }
976
977 func (wp *Pool) setup() {
978         wp.creating = map[string]createCall{}
979         wp.exited = map[string]time.Time{}
980         wp.workers = map[cloud.InstanceID]*worker{}
981         wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
982         wp.loadRunnerData()
983 }
984
985 // Load the runner program to be deployed on worker nodes into
986 // wp.runnerData, if necessary. Errors are logged.
987 //
988 // If auto-deploy is disabled, len(wp.runnerData) will be 0.
989 //
990 // Caller must not have lock.
991 func (wp *Pool) loadRunnerData() error {
992         wp.mtx.Lock()
993         defer wp.mtx.Unlock()
994         if wp.runnerData != nil {
995                 return nil
996         } else if wp.runnerSource == "" {
997                 wp.runnerCmd = wp.runnerCmdDefault
998                 wp.runnerData = []byte{}
999                 return nil
1000         }
1001         logger := wp.logger.WithField("source", wp.runnerSource)
1002         logger.Debug("loading runner")
1003         buf, err := ioutil.ReadFile(wp.runnerSource)
1004         if err != nil {
1005                 logger.WithError(err).Error("failed to load runner program")
1006                 return err
1007         }
1008         wp.runnerData = buf
1009         wp.runnerMD5 = md5.Sum(buf)
1010         wp.runnerCmd = fmt.Sprintf("/tmp/arvados-crunch-run/crunch-run~%x", wp.runnerMD5)
1011         return nil
1012 }
1013
1014 func (wp *Pool) notify() {
1015         wp.mtx.RLock()
1016         defer wp.mtx.RUnlock()
1017         for _, send := range wp.subscribers {
1018                 select {
1019                 case send <- struct{}{}:
1020                 default:
1021                 }
1022         }
1023 }
1024
1025 func (wp *Pool) getInstancesAndSync() error {
1026         wp.setupOnce.Do(wp.setup)
1027         if err := wp.instanceSet.throttleInstances.Error(); err != nil {
1028                 return err
1029         }
1030         wp.logger.Debug("getting instance list")
1031         threshold := time.Now()
1032         instances, err := wp.instanceSet.Instances(cloud.InstanceTags{wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID)})
1033         if err != nil {
1034                 wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
1035                 return err
1036         }
1037         wp.sync(threshold, instances)
1038         wp.logger.Debug("sync done")
1039         return nil
1040 }
1041
1042 // Add/remove/update workers based on instances, which was obtained
1043 // from the instanceSet. However, don't clobber any other updates that
1044 // already happened after threshold.
1045 func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
1046         wp.mtx.Lock()
1047         defer wp.mtx.Unlock()
1048         wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
1049         notify := false
1050
1051         for _, inst := range instances {
1052                 itTag := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceType]
1053                 it, ok := wp.instanceTypes[itTag]
1054                 if !ok {
1055                         wp.logger.WithField("Instance", inst.ID()).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
1056                         continue
1057                 }
1058                 if wkr, isNew := wp.updateWorker(inst, it); isNew {
1059                         notify = true
1060                 } else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
1061                         wp.logger.WithField("Instance", inst.ID()).Info("worker still listed after shutdown; retrying")
1062                         wkr.shutdown()
1063                 }
1064         }
1065
1066         for id, wkr := range wp.workers {
1067                 if wkr.updated.After(threshold) {
1068                         continue
1069                 }
1070                 logger := wp.logger.WithFields(logrus.Fields{
1071                         "Instance":    wkr.instance.ID(),
1072                         "WorkerState": wkr.state,
1073                 })
1074                 logger.Info("instance disappeared in cloud")
1075                 wkr.reportBootOutcome(BootOutcomeDisappeared)
1076                 if wp.mDisappearances != nil {
1077                         wp.mDisappearances.WithLabelValues(stateString[wkr.state]).Inc()
1078                 }
1079                 // wkr.destroyed.IsZero() can happen if instance disappeared but we weren't trying to shut it down
1080                 if wp.mTimeFromShutdownToGone != nil && !wkr.destroyed.IsZero() {
1081                         wp.mTimeFromShutdownToGone.Observe(time.Now().Sub(wkr.destroyed).Seconds())
1082                 }
1083                 delete(wp.workers, id)
1084                 go wkr.Close()
1085                 notify = true
1086         }
1087
1088         if wp.atQuotaUntilFewerInstances > len(wp.workers)+len(wp.creating) {
1089                 // After syncing, there are fewer instances (including
1090                 // pending creates) than there were last time we saw a
1091                 // quota error.  This might mean it's now possible to
1092                 // create new instances.  Reset our "at quota" state.
1093                 wp.atQuotaUntilFewerInstances = 0
1094         }
1095
1096         if !wp.loaded {
1097                 notify = true
1098                 wp.loaded = true
1099                 wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
1100         }
1101
1102         if notify {
1103                 go wp.notify()
1104         }
1105 }
1106
1107 func (wp *Pool) waitUntilLoaded() {
1108         ch := wp.Subscribe()
1109         wp.mtx.RLock()
1110         defer wp.mtx.RUnlock()
1111         for !wp.loaded {
1112                 wp.mtx.RUnlock()
1113                 <-ch
1114                 wp.mtx.RLock()
1115         }
1116 }
1117
1118 func (wp *Pool) gatewayAuthSecret(uuid string) string {
1119         h := hmac.New(sha256.New, []byte(wp.systemRootToken))
1120         fmt.Fprint(h, uuid)
1121         return fmt.Sprintf("%x", h.Sum(nil))
1122 }
1123
1124 // Return a random string of n hexadecimal digits (n*4 random bits). n
1125 // must be even.
1126 func randomHex(n int) string {
1127         buf := make([]byte, n/2)
1128         _, err := rand.Read(buf)
1129         if err != nil {
1130                 panic(err)
1131         }
1132         return fmt.Sprintf("%x", buf)
1133 }