// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package worker

import (
        "crypto/hmac"
        "crypto/md5"
        "crypto/rand"
        "crypto/sha256"
        "errors"
        "fmt"
        "io"
        "io/ioutil"
        mathrand "math/rand"
        "sort"
        "strings"
        "sync"
        "time"

        "git.arvados.org/arvados.git/lib/cloud"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "github.com/prometheus/client_golang/prometheus"
        "github.com/sirupsen/logrus"
        "golang.org/x/crypto/ssh"
)

const (
        tagKeyInstanceType   = "InstanceType"
        tagKeyIdleBehavior   = "IdleBehavior"
        tagKeyInstanceSecret = "InstanceSecret"
        tagKeyInstanceSetID  = "InstanceSetID"
)

// An InstanceView shows a worker's current state and recent activity.
type InstanceView struct {
        Instance             cloud.InstanceID `json:"instance"`
        Address              string           `json:"address"`
        Price                float64          `json:"price"`
        ArvadosInstanceType  string           `json:"arvados_instance_type"`
        ProviderInstanceType string           `json:"provider_instance_type"`
        LastContainerUUID    string           `json:"last_container_uuid"`
        LastBusy             time.Time        `json:"last_busy"`
        WorkerState          string           `json:"worker_state"`
        IdleBehavior         IdleBehavior     `json:"idle_behavior"`
}

// An Executor executes shell commands on a remote host.
type Executor interface {
        // Run cmd on the current target.
        Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)

        // Use the given target for subsequent operations. The new
        // target is the same host as the previous target, but it
        // might return a different address and verify a different
        // host key.
        //
        // SetTarget is called frequently, and in most cases the new
        // target will behave exactly the same as the old one. An
        // implementation should optimize accordingly.
        //
        // SetTarget must not block on concurrent Execute calls.
        SetTarget(cloud.ExecutorTarget)

        Close()
}

const (
        defaultSyncInterval        = time.Minute
        defaultProbeInterval       = time.Second * 10
        defaultMaxProbesPerSecond  = 10
        defaultTimeoutIdle         = time.Minute
        defaultTimeoutBooting      = time.Minute * 10
        defaultTimeoutProbe        = time.Minute * 10
        defaultTimeoutShutdown     = time.Second * 10
        defaultTimeoutTERM         = time.Minute * 2
        defaultTimeoutSignal       = time.Second * 5
        defaultTimeoutStaleRunLock = time.Second * 5

        // Time after a quota error when we will try again anyway,
        // even if no instances have been shut down.
        quotaErrorTTL = time.Minute

        // Time between "X failed because of rate limiting" messages
        logRateLimitErrorInterval = time.Second * 10
)

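// duration returns conf as a time.Duration if conf is positive;
// otherwise it returns def. This is how zero-valued config fields
// fall back to the default* constants above.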
func duration(conf arvados.Duration, def time.Duration) time.Duration {
        if conf > 0 {
                return time.Duration(conf)
        }
        return def
}

// NewPool creates a Pool of workers backed by instanceSet.
//
// New instances are configured and set up according to the given
// cluster configuration.
func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
        wp := &Pool{
                logger:                         logger,
                arvClient:                      arvClient,
                instanceSetID:                  instanceSetID,
                instanceSet:                    &throttledInstanceSet{InstanceSet: instanceSet},
                newExecutor:                    newExecutor,
                cluster:                        cluster,
                bootProbeCommand:               cluster.Containers.CloudVMs.BootProbeCommand,
                instanceInitCommand:            cloud.InitCommand(cluster.Containers.CloudVMs.InstanceInitCommand),
                runnerSource:                   cluster.Containers.CloudVMs.DeployRunnerBinary,
                imageID:                        cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
                instanceTypes:                  cluster.InstanceTypes,
                maxProbesPerSecond:             cluster.Containers.CloudVMs.MaxProbesPerSecond,
                maxConcurrentInstanceCreateOps: cluster.Containers.CloudVMs.MaxConcurrentInstanceCreateOps,
                maxInstances:                   cluster.Containers.CloudVMs.MaxInstances,
                probeInterval:                  duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
                syncInterval:                   duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
                timeoutIdle:                    duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
                timeoutBooting:                 duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
                timeoutProbe:                   duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
                timeoutShutdown:                duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
                timeoutTERM:                    duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
                timeoutSignal:                  duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
                timeoutStaleRunLock:            duration(cluster.Containers.CloudVMs.TimeoutStaleRunLock, defaultTimeoutStaleRunLock),
                systemRootToken:                cluster.SystemRootToken,
                installPublicKey:               installPublicKey,
                tagKeyPrefix:                   cluster.Containers.CloudVMs.TagKeyPrefix,
                runnerCmdDefault:               cluster.Containers.CrunchRunCommand,
                runnerArgs:                     append([]string{"--runtime-engine=" + cluster.Containers.RuntimeEngine}, cluster.Containers.CrunchRunArgumentsList...),
                stop:                           make(chan bool),
        }
        wp.registerMetrics(reg)
        go func() {
                wp.setupOnce.Do(wp.setup)
                go wp.runMetrics()
                go wp.runProbes()
                go wp.runSync()
        }()
        return wp
}

// Pool is a resizable worker pool backed by a cloud.InstanceSet. A
// zero Pool should not be used. Call NewPool to create a new Pool.
type Pool struct {
        // configuration
        logger                         logrus.FieldLogger
        arvClient                      *arvados.Client
        instanceSetID                  cloud.InstanceSetID
        instanceSet                    *throttledInstanceSet
        newExecutor                    func(cloud.Instance) Executor
        cluster                        *arvados.Cluster
        bootProbeCommand               string
        instanceInitCommand            cloud.InitCommand
        runnerSource                   string
        imageID                        cloud.ImageID
        instanceTypes                  map[string]arvados.InstanceType
        syncInterval                   time.Duration
        probeInterval                  time.Duration
        maxProbesPerSecond             int
        maxConcurrentInstanceCreateOps int
        maxInstances                   int
        timeoutIdle                    time.Duration
        timeoutBooting                 time.Duration
        timeoutProbe                   time.Duration
        timeoutShutdown                time.Duration
        timeoutTERM                    time.Duration
        timeoutSignal                  time.Duration
        timeoutStaleRunLock            time.Duration
        systemRootToken                string
        installPublicKey               ssh.PublicKey
        tagKeyPrefix                   string
        runnerCmdDefault               string   // crunch-run command to use if not deploying a binary
        runnerArgs                     []string // extra args passed to crunch-run

        // private state
        subscribers                map[<-chan struct{}]chan<- struct{}
        creating                   map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
        workers                    map[cloud.InstanceID]*worker
        loaded                     bool                 // loaded list of instances from InstanceSet at least once
        exited                     map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called
        atQuotaUntilFewerInstances int
        atQuotaUntil               time.Time
        atQuotaErr                 cloud.QuotaError
        stop                       chan bool
        mtx                        sync.RWMutex
        setupOnce                  sync.Once
        runnerData                 []byte
        runnerMD5                  [md5.Size]byte
        runnerCmd                  string

        mContainersRunning        prometheus.Gauge
        mInstances                *prometheus.GaugeVec
        mInstancesPrice           *prometheus.GaugeVec
        mVCPUs                    *prometheus.GaugeVec
        mMemory                   *prometheus.GaugeVec
        mBootOutcomes             *prometheus.CounterVec
        mDisappearances           *prometheus.CounterVec
        mTimeToSSH                prometheus.Summary
        mTimeToReadyForContainer  prometheus.Summary
        mTimeFromShutdownToGone   prometheus.Summary
        mTimeFromQueueToCrunchRun prometheus.Summary
        mRunProbeDuration         *prometheus.SummaryVec
        mProbeAgeMax              prometheus.Gauge
        mProbeAgeMedian           prometheus.Gauge
}

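// createCall records an unfinished (cloud.InstanceSet)Create call:
// when it was issued, and which instance type was requested.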
type createCall struct {
        time         time.Time
        instanceType arvados.InstanceType
}

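// CheckHealth reports an error if the configured runner binary
// cannot be loaded, in which case the pool cannot deploy crunch-run
// to new instances.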
func (wp *Pool) CheckHealth() error {
        wp.setupOnce.Do(wp.setup)
        if err := wp.loadRunnerData(); err != nil {
                return fmt.Errorf("error loading runner binary: %s", err)
        }
        return nil
}

// Subscribe returns a buffered channel that becomes ready after any
// change to the pool's state that could have scheduling implications:
// a worker's state changes, a new worker appears, the cloud
// provider's API rate limiting period ends, etc.
//
// Additional events that occur while the channel is already ready
// will be dropped, so it is OK if the caller services the channel
// slowly.
//
// Example:
//
//      ch := wp.Subscribe()
//      defer wp.Unsubscribe(ch)
//      for range ch {
//              tryScheduling(wp)
//              if done {
//                      break
//              }
//      }
func (wp *Pool) Subscribe() <-chan struct{} {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        ch := make(chan struct{}, 1)
        wp.subscribers[ch] = ch
        return ch
}

// Unsubscribe stops sending updates to the given channel.
func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        delete(wp.subscribers, ch)
}

// Unallocated returns the number of unallocated (creating + booting +
// idle + unknown) workers for each instance type.  Workers in
// hold/drain mode are not included.
func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.RLock()
        defer wp.mtx.RUnlock()
        unalloc := map[arvados.InstanceType]int{}
        creating := map[arvados.InstanceType]int{}
        oldestCreate := map[arvados.InstanceType]time.Time{}
        for _, cc := range wp.creating {
                it := cc.instanceType
                creating[it]++
                if t, ok := oldestCreate[it]; !ok || t.After(cc.time) {
                        oldestCreate[it] = cc.time
                }
        }
        for _, wkr := range wp.workers {
                // Skip workers that are not expected to become
                // available soon. Note len(wkr.running)>0 is not
                // redundant here: it can be true even in
                // StateUnknown.
                if wkr.state == StateShutdown ||
                        wkr.state == StateRunning ||
                        wkr.idleBehavior != IdleBehaviorRun ||
                        len(wkr.running) > 0 {
                        continue
                }
                it := wkr.instType
                unalloc[it]++
                if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) {
                        // If up to N new workers appear in
                        // Instances() while we are waiting for N
                        // Create() calls to complete, we assume we're
                        // just seeing a race between Instances() and
                        // Create() responses.
                        //
                        // The other common reason why nodes have
                        // state==Unknown is that they appeared at
                        // startup, before any Create calls. They
                        // don't match the above timing condition, so
                        // we never mistakenly attribute them to
                        // pending Create calls.
                        creating[it]--
                }
        }
        for it, c := range creating {
                unalloc[it] += c
        }
        return unalloc
}

// Create a new instance with the given type, and add it to the worker
// pool. The worker is added immediately; instance creation runs in
// the background.
//
// Create returns false if a pre-existing error or a configuration
// setting prevents it from even attempting to create a new
// instance. Those errors are logged by the Pool, so the caller does
// not need to log anything in such cases.
func (wp *Pool) Create(it arvados.InstanceType) bool {
        logger := wp.logger.WithField("InstanceType", it.Name)
        wp.setupOnce.Do(wp.setup)
        if wp.loadRunnerData() != nil {
                // Boot probe is certain to fail.
                return false
        }
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        if time.Now().Before(wp.atQuotaUntil) ||
                wp.atQuotaUntilFewerInstances > 0 ||
                wp.instanceSet.throttleCreate.Error() != nil ||
                (wp.maxInstances > 0 && wp.maxInstances <= len(wp.workers)+len(wp.creating)) {
                return false
        }
        // The maxConcurrentInstanceCreateOps knob throttles the number of node create
        // requests in flight. It was added to work around a limitation in Azure's
        // managed disks, which support no more than 20 concurrent node creation
        // requests from a single disk image (cf.
        // https://docs.microsoft.com/en-us/azure/virtual-machines/linux/capture-image).
        // The code assumes that, from Azure's perspective, node creation lasts until
        // the instance appears in the "get all instances" list.
        if wp.maxConcurrentInstanceCreateOps > 0 && len(wp.creating) >= wp.maxConcurrentInstanceCreateOps {
                logger.Info("reached MaxConcurrentInstanceCreateOps")
                wp.instanceSet.throttleCreate.ErrorUntil(errors.New("reached MaxConcurrentInstanceCreateOps"), time.Now().Add(5*time.Second), wp.notify)
                return false
        }
        now := time.Now()
        secret := randomHex(instanceSecretLength)
        wp.creating[secret] = createCall{time: now, instanceType: it}
        go func() {
                defer wp.notify()
                tags := cloud.InstanceTags{
                        wp.tagKeyPrefix + tagKeyInstanceSetID:  string(wp.instanceSetID),
                        wp.tagKeyPrefix + tagKeyInstanceType:   it.Name,
                        wp.tagKeyPrefix + tagKeyIdleBehavior:   string(IdleBehaviorRun),
                        wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
                }
                initCmd := TagVerifier{nil, secret, nil}.InitCommand() + "\n" + wp.instanceInitCommand
                inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
                wp.mtx.Lock()
                defer wp.mtx.Unlock()
                // delete() is deferred so the updateWorker() call
                // below knows to use StateBooting when adding a new
                // worker.
                defer delete(wp.creating, secret)
                if err != nil {
                        if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
                                wp.atQuotaErr = err
                                n := len(wp.workers) + len(wp.creating) - 1
                                if n < 1 {
                                        // Quota error with no
                                        // instances running --
                                        // nothing to do but wait
                                        wp.atQuotaUntilFewerInstances = 0
                                        wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
                                        time.AfterFunc(quotaErrorTTL, wp.notify)
                                        logger.WithField("atQuotaUntil", wp.atQuotaUntil).Info("quota error with 0 running -- waiting for quotaErrorTTL")
                                } else if n < wp.atQuotaUntilFewerInstances || wp.atQuotaUntilFewerInstances == 0 {
                                        // Quota error with N
                                        // instances running -- report
                                        // AtQuota until some
                                        // instances shut down
                                        wp.atQuotaUntilFewerInstances = n
                                        wp.atQuotaUntil = time.Time{}
                                        logger.WithField("atQuotaUntilFewerInstances", n).Info("quota error -- waiting for next instance shutdown")
                                }
                        }
                        logger.WithError(err).Error("create failed")
                        wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
                        return
                }
                wp.updateWorker(inst, it)
        }()
        if len(wp.creating)+len(wp.workers) == wp.maxInstances {
                logger.Infof("now at MaxInstances limit of %d instances", wp.maxInstances)
        }
        return true
}

// AtQuota returns true if Create is not expected to work at the
// moment (e.g., cloud provider has reported quota errors, or we are
// already at our own configured quota).
func (wp *Pool) AtQuota() bool {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        return wp.atQuotaUntilFewerInstances > 0 ||
                time.Now().Before(wp.atQuotaUntil) ||
                (wp.maxInstances > 0 && wp.maxInstances <= len(wp.workers)+len(wp.creating))
}

// SetIdleBehavior determines how the indicated instance will behave
// when it has no containers running.
func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        wkr, ok := wp.workers[id]
        if !ok {
                return errors.New("requested instance does not exist")
        }
        wkr.setIdleBehavior(idleBehavior)
        return nil
}

// reportSSHConnected records the first successful SSH connection to
// an instance, updating the mTimeToSSH metric.
func (wp *Pool) reportSSHConnected(inst cloud.Instance) {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        wkr, ok := wp.workers[inst.ID()]
        if !ok {
                // race: inst was removed from the pool
                return
        }
        if wkr.state != StateBooting || !wkr.firstSSHConnection.IsZero() {
                // the node is not in booting state (can happen if
                // a-d-c is restarted) OR this is not the first SSH
                // connection
                return
        }

        wkr.firstSSHConnection = time.Now()
        if wp.mTimeToSSH != nil {
                wp.mTimeToSSH.Observe(wkr.firstSSHConnection.Sub(wkr.appeared).Seconds())
        }
}

// Add or update worker attached to the given instance.
//
// The second return value is true if a new worker is created.
//
// A newly added instance has state=StateBooting if its tags match an
// entry in wp.creating, otherwise StateUnknown.
//
// Caller must have lock.
func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
        secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
        inst = TagVerifier{Instance: inst, Secret: secret, ReportVerified: wp.reportSSHConnected}
        id := inst.ID()
        if wkr := wp.workers[id]; wkr != nil {
                wkr.executor.SetTarget(inst)
                wkr.instance = inst
                wkr.updated = time.Now()
                wkr.saveTags()
                return wkr, false
        }

        state := StateUnknown
        if _, ok := wp.creating[secret]; ok {
                state = StateBooting
        }

        // If an instance has a valid IdleBehavior tag when it first
        // appears, initialize the new worker accordingly (this is how
        // we restore IdleBehavior that was set by a prior dispatch
        // process); otherwise, default to "run". After this,
        // wkr.idleBehavior is the source of truth, and will only be
        // changed via SetIdleBehavior().
        idleBehavior := IdleBehavior(inst.Tags()[wp.tagKeyPrefix+tagKeyIdleBehavior])
        if !validIdleBehavior[idleBehavior] {
                idleBehavior = IdleBehaviorRun
        }

        logger := wp.logger.WithFields(logrus.Fields{
                "InstanceType": it.Name,
                "Instance":     inst.ID(),
                "Address":      inst.Address(),
        })
        logger.WithFields(logrus.Fields{
                "State":        state,
                "IdleBehavior": idleBehavior,
        }).Info("instance appeared in cloud")
        now := time.Now()
        wkr := &worker{
                mtx:          &wp.mtx,
                wp:           wp,
                logger:       logger,
                executor:     wp.newExecutor(inst),
                state:        state,
                idleBehavior: idleBehavior,
                instance:     inst,
                instType:     it,
                appeared:     now,
                probed:       now,
                busy:         now,
                updated:      now,
                running:      make(map[string]*remoteRunner),
                starting:     make(map[string]*remoteRunner),
                probing:      make(chan struct{}, 1),
        }
        wp.workers[id] = wkr
        return wkr, true
}

// Shutdown shuts down a worker with the given type, or returns false
// if all workers with the given type are busy.
func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        logger := wp.logger.WithField("InstanceType", it.Name)
        logger.Info("shutdown requested")
        for _, tryState := range []State{StateBooting, StateIdle} {
                // TODO: shutdown the worker with the longest idle
                // time (Idle) or the earliest create time (Booting)
                for _, wkr := range wp.workers {
                        if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
                                logger.WithField("Instance", wkr.instance.ID()).Info("shutting down")
                                wkr.reportBootOutcome(BootOutcomeAborted)
                                wkr.shutdown()
                                return true
                        }
                }
        }
        return false
}

// CountWorkers returns the current number of workers in each state.
//
// CountWorkers blocks, if necessary, until the initial instance list
// has been loaded from the cloud provider.
func (wp *Pool) CountWorkers() map[State]int {
        wp.setupOnce.Do(wp.setup)
        wp.waitUntilLoaded()
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        r := map[State]int{}
        for _, w := range wp.workers {
                r[w.state]++
        }
        return r
}

// Running returns the container UUIDs being prepared/run on workers.
//
// In the returned map, the time value indicates when the Pool
// observed that the container process had exited. A container that
// has not yet exited has a zero time value. The caller should use
// ForgetContainer() to garbage-collect the entries for exited
// containers.
func (wp *Pool) Running() map[string]time.Time {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        r := map[string]time.Time{}
        for _, wkr := range wp.workers {
                for uuid := range wkr.running {
                        r[uuid] = time.Time{}
                }
                for uuid := range wkr.starting {
                        r[uuid] = time.Time{}
                }
        }
        for uuid, exited := range wp.exited {
                r[uuid] = exited
        }
        return r
}

// StartContainer starts a container on an idle worker immediately if
// possible, otherwise returns false.
func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        var wkr *worker
        for _, w := range wp.workers {
                if w.instType == it && w.state == StateIdle && w.idleBehavior == IdleBehaviorRun {
                        if wkr == nil || w.busy.After(wkr.busy) {
                                wkr = w
                        }
                }
        }
        if wkr == nil {
                return false
        }
        wkr.startContainer(ctr)
        return true
}

// KillContainer kills the crunch-run process for the given container
// UUID, if it's running on any worker.
//
// KillContainer returns immediately; the act of killing the container
// takes some time, and runs in the background.
//
// KillContainer returns false if the container has already ended.
func (wp *Pool) KillContainer(uuid string, reason string) bool {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        logger := wp.logger.WithFields(logrus.Fields{
                "ContainerUUID": uuid,
                "Reason":        reason,
        })
        for _, wkr := range wp.workers {
                rr := wkr.running[uuid]
                if rr == nil {
                        rr = wkr.starting[uuid]
                }
                if rr != nil {
                        rr.Kill(reason)
                        return true
                }
        }
        logger.Debug("cannot kill: already disappeared")
        return false
}

// ForgetContainer clears the placeholder for the given exited
// container, so it isn't returned by subsequent calls to Running().
//
// ForgetContainer has no effect if the container has not yet exited.
//
// The "container exited at time T" placeholder (which necessitates
// ForgetContainer) exists to make it easier for the caller
// (scheduler) to distinguish a container that exited without
// finalizing its state from a container that exited too recently for
// its final state to have appeared in the scheduler's queue cache.
func (wp *Pool) ForgetContainer(uuid string) {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        if _, ok := wp.exited[uuid]; ok {
                wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
                delete(wp.exited, uuid)
        }
}

func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
        if reg == nil {
                reg = prometheus.NewRegistry()
        }
        wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "containers_running",
                Help:      "Number of containers reported running by cloud VMs.",
        })
        reg.MustRegister(wp.mContainersRunning)
        wp.mProbeAgeMax = prometheus.NewGauge(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "probe_age_seconds_max",
                Help:      "Maximum number of seconds since an instance's most recent successful probe.",
        })
        reg.MustRegister(wp.mProbeAgeMax)
        wp.mProbeAgeMedian = prometheus.NewGauge(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "probe_age_seconds_median",
                Help:      "Median number of seconds since an instance's most recent successful probe.",
        })
        reg.MustRegister(wp.mProbeAgeMedian)
        wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "instances_total",
                Help:      "Number of cloud VMs.",
        }, []string{"category", "instance_type"})
        reg.MustRegister(wp.mInstances)
        wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "instances_price",
                Help:      "Price of cloud VMs.",
        }, []string{"category"})
        reg.MustRegister(wp.mInstancesPrice)
        wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "vcpus_total",
                Help:      "Total VCPUs on all cloud VMs.",
        }, []string{"category"})
        reg.MustRegister(wp.mVCPUs)
        wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "memory_bytes_total",
                Help:      "Total memory on all cloud VMs.",
        }, []string{"category"})
        reg.MustRegister(wp.mMemory)
        wp.mBootOutcomes = prometheus.NewCounterVec(prometheus.CounterOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "boot_outcomes",
                Help:      "Boot outcomes by type.",
        }, []string{"outcome"})
        for k := range validBootOutcomes {
                wp.mBootOutcomes.WithLabelValues(string(k)).Add(0)
        }
        reg.MustRegister(wp.mBootOutcomes)
        wp.mDisappearances = prometheus.NewCounterVec(prometheus.CounterOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "instances_disappeared",
                Help:      "Number of occurrences of an instance disappearing from the cloud provider's list of instances.",
        }, []string{"state"})
        for _, v := range stateString {
                wp.mDisappearances.WithLabelValues(v).Add(0)
        }
        reg.MustRegister(wp.mDisappearances)
        wp.mTimeToSSH = prometheus.NewSummary(prometheus.SummaryOpts{
                Namespace:  "arvados",
                Subsystem:  "dispatchcloud",
                Name:       "instances_time_to_ssh_seconds",
                Help:       "Number of seconds between instance creation and the first successful SSH connection.",
                Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
        })
        reg.MustRegister(wp.mTimeToSSH)
        wp.mTimeToReadyForContainer = prometheus.NewSummary(prometheus.SummaryOpts{
                Namespace:  "arvados",
                Subsystem:  "dispatchcloud",
                Name:       "instances_time_to_ready_for_container_seconds",
                Help:       "Number of seconds between the first successful SSH connection and ready to run a container.",
                Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
        })
        reg.MustRegister(wp.mTimeToReadyForContainer)
        wp.mTimeFromShutdownToGone = prometheus.NewSummary(prometheus.SummaryOpts{
                Namespace:  "arvados",
                Subsystem:  "dispatchcloud",
                Name:       "instances_time_from_shutdown_request_to_disappearance_seconds",
                Help:       "Number of seconds between the first shutdown attempt and the disappearance of the worker.",
                Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
        })
        reg.MustRegister(wp.mTimeFromShutdownToGone)
        wp.mTimeFromQueueToCrunchRun = prometheus.NewSummary(prometheus.SummaryOpts{
                Namespace:  "arvados",
                Subsystem:  "dispatchcloud",
                Name:       "containers_time_from_queue_to_crunch_run_seconds",
                Help:       "Number of seconds between the queuing of a container and the start of crunch-run.",
                Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
        })
        reg.MustRegister(wp.mTimeFromQueueToCrunchRun)
        wp.mRunProbeDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{
                Namespace:  "arvados",
                Subsystem:  "dispatchcloud",
                Name:       "instances_run_probe_duration_seconds",
                Help:       "Number of seconds per runProbe call.",
                Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
        }, []string{"outcome"})
        reg.MustRegister(wp.mRunProbeDuration)
}

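// runMetrics updates the Prometheus metrics once at startup and then
// again after every pool state change notification.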
func (wp *Pool) runMetrics() {
        ch := wp.Subscribe()
        defer wp.Unsubscribe(ch)
        wp.updateMetrics()
        for range ch {
                wp.updateMetrics()
        }
}

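// updateMetrics recomputes all gauges and counters from the current
// worker list. It takes the read lock itself, so the caller must not
// hold the lock.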
func (wp *Pool) updateMetrics() {
        wp.mtx.RLock()
        defer wp.mtx.RUnlock()

        type entKey struct {
                cat      string
                instType string
        }
        instances := map[entKey]int64{}
        price := map[string]float64{}
        cpu := map[string]int64{}
        mem := map[string]int64{}
        var running int64
        now := time.Now()
        var probed []time.Time
        for _, wkr := range wp.workers {
                var cat string
                switch {
                case len(wkr.running)+len(wkr.starting) > 0:
                        cat = "inuse"
                case wkr.idleBehavior == IdleBehaviorHold:
                        cat = "hold"
                case wkr.state == StateBooting:
                        cat = "booting"
                case wkr.state == StateUnknown:
                        cat = "unknown"
                default:
                        cat = "idle"
                }
                instances[entKey{cat, wkr.instType.Name}]++
                price[cat] += wkr.instType.Price
                cpu[cat] += int64(wkr.instType.VCPUs)
                mem[cat] += int64(wkr.instType.RAM)
                running += int64(len(wkr.running) + len(wkr.starting))
                probed = append(probed, wkr.probed)
        }
        for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
                wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
                wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
                wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
                // make sure to reset gauges for nonexistent category/node type combinations
                for _, it := range wp.instanceTypes {
                        if _, ok := instances[entKey{cat, it.Name}]; !ok {
                                wp.mInstances.WithLabelValues(cat, it.Name).Set(float64(0))
                        }
                }
        }
        for k, v := range instances {
                wp.mInstances.WithLabelValues(k.cat, k.instType).Set(float64(v))
        }
        wp.mContainersRunning.Set(float64(running))

        if len(probed) == 0 {
                wp.mProbeAgeMax.Set(0)
                wp.mProbeAgeMedian.Set(0)
        } else {
                sort.Slice(probed, func(i, j int) bool { return probed[i].Before(probed[j]) })
                wp.mProbeAgeMax.Set(now.Sub(probed[0]).Seconds())
                wp.mProbeAgeMedian.Set(now.Sub(probed[len(probed)/2]).Seconds())
        }
}

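// runProbes triggers periodic probes of all workers, rate-limited to
// maxProbesPerSecond, until Stop is called.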
func (wp *Pool) runProbes() {
        maxPPS := wp.maxProbesPerSecond
        if maxPPS < 1 {
                maxPPS = defaultMaxProbesPerSecond
        }
        limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
        defer limitticker.Stop()

        probeticker := time.NewTicker(wp.probeInterval)
        defer probeticker.Stop()

        workers := []cloud.InstanceID{}
        for range probeticker.C {
                // Add some jitter. Without this, if probeInterval is
                // a multiple of syncInterval and sync is
                // instantaneous (as with the loopback driver), the
                // first few probes race with sync operations and
                // don't update the workers.
                time.Sleep(time.Duration(mathrand.Int63n(int64(wp.probeInterval) / 23)))

                workers = workers[:0]
                wp.mtx.Lock()
                for id, wkr := range wp.workers {
                        if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
                                continue
                        }
                        workers = append(workers, id)
                }
                wp.mtx.Unlock()

                for _, id := range workers {
                        wp.mtx.Lock()
                        wkr, ok := wp.workers[id]
                        wp.mtx.Unlock()
                        if !ok {
                                // Deleted while we were probing
                                // others
                                continue
                        }
                        go wkr.ProbeAndUpdate()
                        select {
                        case <-wp.stop:
                                return
                        case <-limitticker.C:
                        }
                }
        }
}

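// runSync reconciles the worker list with the cloud provider's
// instance list every syncInterval, until Stop is called.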
func (wp *Pool) runSync() {
        // sync once immediately, then wait syncInterval, sync again,
        // etc.
        timer := time.NewTimer(1)
        for {
                select {
                case <-timer.C:
                        err := wp.getInstancesAndSync()
                        if err != nil {
                                wp.logger.WithError(err).Warn("sync failed")
                        }
                        timer.Reset(wp.syncInterval)
                case <-wp.stop:
                        wp.logger.Debug("worker.Pool stopped")
                        return
                }
        }
}

// Stop synchronizing with the InstanceSet.
func (wp *Pool) Stop() {
        wp.setupOnce.Do(wp.setup)
        close(wp.stop)
}

// Instances returns an InstanceView for each worker in the pool,
// summarizing its current state and recent activity.
func (wp *Pool) Instances() []InstanceView {
        var r []InstanceView
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        for _, w := range wp.workers {
                r = append(r, InstanceView{
                        Instance:             w.instance.ID(),
                        Address:              w.instance.Address(),
                        Price:                w.instType.Price,
                        ArvadosInstanceType:  w.instType.Name,
                        ProviderInstanceType: w.instType.ProviderType,
                        LastContainerUUID:    w.lastUUID,
                        LastBusy:             w.busy,
                        WorkerState:          w.state.String(),
                        IdleBehavior:         w.idleBehavior,
                })
        }
        wp.mtx.Unlock()
        sort.Slice(r, func(i, j int) bool {
                return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
        })
        return r
}

// KillInstance destroys a cloud VM instance. It returns an error if
// the given instance does not exist.
func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        wkr, ok := wp.workers[id]
        if !ok {
                return errors.New("instance not found")
        }
        wkr.logger.WithField("Reason", reason).Info("shutting down")
        wkr.reportBootOutcome(BootOutcomeAborted)
        wkr.shutdown()
        return nil
}

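// setup initializes the pool's internal maps and loads the runner
// program. It runs exactly once, via setupOnce.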
func (wp *Pool) setup() {
        wp.creating = map[string]createCall{}
        wp.exited = map[string]time.Time{}
        wp.workers = map[cloud.InstanceID]*worker{}
        wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
        wp.loadRunnerData()
}

// Load the runner program to be deployed on worker nodes into
// wp.runnerData, if necessary. Errors are logged.
//
// If auto-deploy is disabled, len(wp.runnerData) will be 0.
//
// Caller must not have lock.
func (wp *Pool) loadRunnerData() error {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        if wp.runnerData != nil {
                return nil
        } else if wp.runnerSource == "" {
                wp.runnerCmd = wp.runnerCmdDefault
                wp.runnerData = []byte{}
                return nil
        }
        logger := wp.logger.WithField("source", wp.runnerSource)
        logger.Debug("loading runner")
        buf, err := ioutil.ReadFile(wp.runnerSource)
        if err != nil {
                logger.WithError(err).Error("failed to load runner program")
                return err
        }
        wp.runnerData = buf
        wp.runnerMD5 = md5.Sum(buf)
        wp.runnerCmd = fmt.Sprintf("/tmp/arvados-crunch-run/crunch-run~%x", wp.runnerMD5)
        return nil
}

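// notify does a non-blocking send on every subscriber channel, so a
// slow subscriber cannot block pool state changes.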
func (wp *Pool) notify() {
        wp.mtx.RLock()
        defer wp.mtx.RUnlock()
        for _, send := range wp.subscribers {
                select {
                case send <- struct{}{}:
                default:
                }
        }
}

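// getInstancesAndSync fetches the current instance list from the
// cloud provider (unless rate-limited) and reconciles the worker
// pool with it.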
func (wp *Pool) getInstancesAndSync() error {
        wp.setupOnce.Do(wp.setup)
        if err := wp.instanceSet.throttleInstances.Error(); err != nil {
                return err
        }
        wp.logger.Debug("getting instance list")
        threshold := time.Now()
        instances, err := wp.instanceSet.Instances(cloud.InstanceTags{wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID)})
        if err != nil {
                wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
                return err
        }
        wp.sync(threshold, instances)
        wp.logger.Debug("sync done")
        return nil
}

// Add/remove/update workers based on instances, which was obtained
// from the instanceSet. However, don't clobber any other updates that
// already happened after threshold.
func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
        notify := false

        for _, inst := range instances {
                itTag := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceType]
                it, ok := wp.instanceTypes[itTag]
                if !ok {
                        wp.logger.WithField("Instance", inst.ID()).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
                        continue
                }
                if wkr, isNew := wp.updateWorker(inst, it); isNew {
                        notify = true
                } else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
                        wp.logger.WithField("Instance", inst.ID()).Info("worker still listed after shutdown; retrying")
                        wkr.shutdown()
                }
        }

        for id, wkr := range wp.workers {
                if wkr.updated.After(threshold) {
                        continue
                }
                logger := wp.logger.WithFields(logrus.Fields{
                        "Instance":    wkr.instance.ID(),
                        "WorkerState": wkr.state,
                })
                logger.Info("instance disappeared from cloud")
                wkr.reportBootOutcome(BootOutcomeDisappeared)
                if wp.mDisappearances != nil {
                        wp.mDisappearances.WithLabelValues(stateString[wkr.state]).Inc()
                }
                // wkr.destroyed.IsZero() can happen if instance disappeared but we weren't trying to shut it down
                if wp.mTimeFromShutdownToGone != nil && !wkr.destroyed.IsZero() {
                        wp.mTimeFromShutdownToGone.Observe(time.Since(wkr.destroyed).Seconds())
                }
                delete(wp.workers, id)
                go wkr.Close()
                notify = true
        }

        if wp.atQuotaUntilFewerInstances > len(wp.workers)+len(wp.creating) {
                // After syncing, there are fewer instances (including
                // pending creates) than there were last time we saw a
                // quota error.  This might mean it's now possible to
                // create new instances.  Reset our "at quota" state.
                wp.atQuotaUntilFewerInstances = 0
        }

        if !wp.loaded {
                notify = true
                wp.loaded = true
                wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
        }

        if notify {
                go wp.notify()
        }
}

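// waitUntilLoaded blocks until the initial instance list has been
// loaded from the cloud provider.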
func (wp *Pool) waitUntilLoaded() {
        ch := wp.Subscribe()
        // Unsubscribe when done, so repeated callers (e.g.,
        // CountWorkers) don't leak subscriber channels.
        defer wp.Unsubscribe(ch)
        wp.mtx.RLock()
        defer wp.mtx.RUnlock()
        for !wp.loaded {
                wp.mtx.RUnlock()
                <-ch
                wp.mtx.RLock()
        }
}

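// gatewayAuthSecret returns the gateway auth secret for the given
// container UUID: the hex-encoded HMAC-SHA256 of the UUID, keyed
// with the cluster's SystemRootToken.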
func (wp *Pool) gatewayAuthSecret(uuid string) string {
        h := hmac.New(sha256.New, []byte(wp.systemRootToken))
        fmt.Fprint(h, uuid)
        return fmt.Sprintf("%x", h.Sum(nil))
}

// Return a random string of n hexadecimal digits (n*4 random bits). n
// must be even.
func randomHex(n int) string {
        buf := make([]byte, n/2)
        _, err := rand.Read(buf)
        if err != nil {
                panic(err)
        }
        return fmt.Sprintf("%x", buf)
}