a5924cf997f7b4bc9d838134054be58b3bd25127
[arvados.git] / lib / dispatchcloud / worker / pool.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package worker
6
7 import (
8         "crypto/hmac"
9         "crypto/md5"
10         "crypto/rand"
11         "crypto/sha256"
12         "errors"
13         "fmt"
14         "io"
15         "io/ioutil"
16         "sort"
17         "strings"
18         "sync"
19         "time"
20
21         "git.arvados.org/arvados.git/lib/cloud"
22         "git.arvados.org/arvados.git/sdk/go/arvados"
23         "github.com/prometheus/client_golang/prometheus"
24         "github.com/sirupsen/logrus"
25         "golang.org/x/crypto/ssh"
26 )
27
// Tag keys attached to every instance started by (*Pool)Create. Each
// key is combined with the pool's tagKeyPrefix before it is written
// to, or looked up in, an instance's tags.
const (
	// Carries string(wp.instanceSetID).
	tagKeyInstanceSetID = "InstanceSetID"
	// Carries the Name of the requested arvados.InstanceType.
	tagKeyInstanceType = "InstanceType"
	// Carries the IdleBehavior; read back when an instance
	// first appears, so a previously chosen behavior is restored.
	tagKeyIdleBehavior = "IdleBehavior"
	// Carries the random per-instance secret generated by Create.
	tagKeyInstanceSecret = "InstanceSecret"
)
34
35 // An InstanceView shows a worker's current state and recent activity.
//
// Every field is exported and carries a JSON struct tag, so a value
// is encoded with the snake_case keys listed below, in declaration
// order.
36 type InstanceView struct {
37         Instance             cloud.InstanceID `json:"instance"`
38         Address              string           `json:"address"`
39         Price                float64          `json:"price"`
40         ArvadosInstanceType  string           `json:"arvados_instance_type"`
41         ProviderInstanceType string           `json:"provider_instance_type"`
42         LastContainerUUID    string           `json:"last_container_uuid"`
43         LastBusy             time.Time        `json:"last_busy"`
44         WorkerState          string           `json:"worker_state"`
45         IdleBehavior         IdleBehavior     `json:"idle_behavior"`
46 }
47
48 // An Executor executes shell commands on a remote host.
49 type Executor interface {
50         // Run cmd on the current target.
51         Execute(env map[string]string, cmd string, stdin io.Reader) (stdout, stderr []byte, err error)
52
53         // Use the given target for subsequent operations. The new
54         // target is the same host as the previous target, but it
55         // might return a different address and verify a different
56         // host key.
57         //
58         // SetTarget is called frequently, and in most cases the new
59         // target will behave exactly the same as the old one. An
60         // implementation should optimize accordingly.
61         //
62         // SetTarget must not block on concurrent Execute calls.
63         SetTarget(cloud.ExecutorTarget)
64
65         Close()
66 }
67
// Fallback intervals and probe rate. NewPool substitutes the
// interval defaults when the corresponding cluster configuration
// entry is zero or negative.
const (
	defaultSyncInterval       = time.Minute
	defaultProbeInterval      = time.Second * 10
	defaultMaxProbesPerSecond = 10
)

// Fallback timeouts, substituted by NewPool the same way as the
// intervals above.
const (
	defaultTimeoutIdle         = time.Minute
	defaultTimeoutBooting      = time.Minute * 10
	defaultTimeoutProbe        = time.Minute * 10
	defaultTimeoutShutdown     = time.Second * 10
	defaultTimeoutTERM         = time.Minute * 2
	defaultTimeoutSignal       = time.Second * 5
	defaultTimeoutStaleRunLock = time.Second * 5
)

const (
	// How long after a quota error Create is attempted again
	// anyway, even if no instances have been shut down in the
	// meantime.
	quotaErrorTTL = time.Minute

	// Minimum spacing between "X failed because rate limiting"
	// log messages.
	logRateLimitErrorInterval = time.Second * 10
)
87
88 func duration(conf arvados.Duration, def time.Duration) time.Duration {
89         if conf > 0 {
90                 return time.Duration(conf)
91         }
92         return def
93 }
94
95 // NewPool creates a Pool of workers backed by instanceSet.
96 //
97 // New instances are configured and set up according to the given
98 // cluster configuration.
99 func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
100         wp := &Pool{
101                 logger:                         logger,
102                 arvClient:                      arvClient,
103                 instanceSetID:                  instanceSetID,
104                 instanceSet:                    &throttledInstanceSet{InstanceSet: instanceSet},
105                 newExecutor:                    newExecutor,
106                 bootProbeCommand:               cluster.Containers.CloudVMs.BootProbeCommand,
107                 runnerSource:                   cluster.Containers.CloudVMs.DeployRunnerBinary,
108                 imageID:                        cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
109                 instanceTypes:                  cluster.InstanceTypes,
110                 maxProbesPerSecond:             cluster.Containers.CloudVMs.MaxProbesPerSecond,
111                 maxConcurrentInstanceCreateOps: cluster.Containers.CloudVMs.MaxConcurrentInstanceCreateOps,
112                 probeInterval:                  duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
113                 syncInterval:                   duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
114                 timeoutIdle:                    duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
115                 timeoutBooting:                 duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
116                 timeoutProbe:                   duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
117                 timeoutShutdown:                duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
118                 timeoutTERM:                    duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
119                 timeoutSignal:                  duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
120                 timeoutStaleRunLock:            duration(cluster.Containers.CloudVMs.TimeoutStaleRunLock, defaultTimeoutStaleRunLock),
121                 systemRootToken:                cluster.SystemRootToken,
122                 installPublicKey:               installPublicKey,
123                 tagKeyPrefix:                   cluster.Containers.CloudVMs.TagKeyPrefix,
124                 runnerCmdDefault:               cluster.Containers.CrunchRunCommand,
125                 runnerArgs:                     append([]string{"--runtime-engine=" + cluster.Containers.RuntimeEngine}, cluster.Containers.CrunchRunArgumentsList...),
126                 stop:                           make(chan bool),
127         }
128         wp.registerMetrics(reg)
129         go func() {
130                 wp.setupOnce.Do(wp.setup)
131                 go wp.runMetrics()
132                 go wp.runProbes()
133                 go wp.runSync()
134         }()
135         return wp
136 }
137
138 // Pool is a resizable worker pool backed by a cloud.InstanceSet. A
139 // zero Pool should not be used. Call NewPool to create a new Pool.
140 type Pool struct {
141         // configuration
142         logger                         logrus.FieldLogger
143         arvClient                      *arvados.Client
144         instanceSetID                  cloud.InstanceSetID
145         instanceSet                    *throttledInstanceSet
146         newExecutor                    func(cloud.Instance) Executor
147         bootProbeCommand               string
148         runnerSource                   string
149         imageID                        cloud.ImageID
150         instanceTypes                  map[string]arvados.InstanceType
151         syncInterval                   time.Duration
152         probeInterval                  time.Duration
153         maxProbesPerSecond             int
154         maxConcurrentInstanceCreateOps int
155         timeoutIdle                    time.Duration
156         timeoutBooting                 time.Duration
157         timeoutProbe                   time.Duration
158         timeoutShutdown                time.Duration
159         timeoutTERM                    time.Duration
160         timeoutSignal                  time.Duration
161         timeoutStaleRunLock            time.Duration
162         systemRootToken                string
163         installPublicKey               ssh.PublicKey
164         tagKeyPrefix                   string
165         runnerCmdDefault               string   // crunch-run command to use if not deploying a binary
166         runnerArgs                     []string // extra args passed to crunch-run
167
168         // private state
169         subscribers  map[<-chan struct{}]chan<- struct{}
170         creating     map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
171         workers      map[cloud.InstanceID]*worker
172         loaded       bool                 // loaded list of instances from InstanceSet at least once
173         exited       map[string]time.Time // containers whose crunch-run proc has exited, but ForgetContainer has not been called
174         atQuotaUntil time.Time
175         atQuotaErr   cloud.QuotaError
176         stop         chan bool
177         mtx          sync.RWMutex
178         setupOnce    sync.Once
179         runnerData   []byte
180         runnerMD5    [md5.Size]byte
181         runnerCmd    string
182
183         mContainersRunning        prometheus.Gauge
184         mInstances                *prometheus.GaugeVec
185         mInstancesPrice           *prometheus.GaugeVec
186         mVCPUs                    *prometheus.GaugeVec
187         mMemory                   *prometheus.GaugeVec
188         mBootOutcomes             *prometheus.CounterVec
189         mDisappearances           *prometheus.CounterVec
190         mTimeToSSH                prometheus.Summary
191         mTimeToReadyForContainer  prometheus.Summary
192         mTimeFromShutdownToGone   prometheus.Summary
193         mTimeFromQueueToCrunchRun prometheus.Summary
194         mRunProbeDuration         *prometheus.SummaryVec
195 }
196
// createCall describes one unfinished instance creation request:
// the time (*Pool)Create recorded it and the instance type that was
// requested. Entries live in Pool.creating, keyed by instance secret.
197 type createCall struct {
198         time         time.Time
199         instanceType arvados.InstanceType
200 }
201
202 func (wp *Pool) CheckHealth() error {
203         wp.setupOnce.Do(wp.setup)
204         if err := wp.loadRunnerData(); err != nil {
205                 return fmt.Errorf("error loading runner binary: %s", err)
206         }
207         return nil
208 }
209
210 // Subscribe returns a buffered channel that becomes ready after any
211 // change to the pool's state that could have scheduling implications:
212 // a worker's state changes, a new worker appears, the cloud
213 // provider's API rate limiting period ends, etc.
214 //
215 // Additional events that occur while the channel is already ready
216 // will be dropped, so it is OK if the caller services the channel
217 // slowly.
218 //
219 // Example:
220 //
221 //      ch := wp.Subscribe()
222 //      defer wp.Unsubscribe(ch)
223 //      for range ch {
224 //              tryScheduling(wp)
225 //              if done {
226 //                      break
227 //              }
228 //      }
229 func (wp *Pool) Subscribe() <-chan struct{} {
230         wp.setupOnce.Do(wp.setup)
231         wp.mtx.Lock()
232         defer wp.mtx.Unlock()
233         ch := make(chan struct{}, 1)
234         wp.subscribers[ch] = ch
235         return ch
236 }
237
238 // Unsubscribe stops sending updates to the given channel.
239 func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
240         wp.setupOnce.Do(wp.setup)
241         wp.mtx.Lock()
242         defer wp.mtx.Unlock()
243         delete(wp.subscribers, ch)
244 }
245
246 // Unallocated returns the number of unallocated (creating + booting +
247 // idle + unknown) workers for each instance type.  Workers in
248 // hold/drain mode are not included.
249 func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
250         wp.setupOnce.Do(wp.setup)
251         wp.mtx.RLock()
252         defer wp.mtx.RUnlock()
253         unalloc := map[arvados.InstanceType]int{}
254         creating := map[arvados.InstanceType]int{}
255         oldestCreate := map[arvados.InstanceType]time.Time{}
256         for _, cc := range wp.creating {
257                 it := cc.instanceType
258                 creating[it]++
259                 if t, ok := oldestCreate[it]; !ok || t.After(cc.time) {
260                         oldestCreate[it] = cc.time
261                 }
262         }
263         for _, wkr := range wp.workers {
264                 // Skip workers that are not expected to become
265                 // available soon. Note len(wkr.running)>0 is not
266                 // redundant here: it can be true even in
267                 // StateUnknown.
268                 if wkr.state == StateShutdown ||
269                         wkr.state == StateRunning ||
270                         wkr.idleBehavior != IdleBehaviorRun ||
271                         len(wkr.running) > 0 {
272                         continue
273                 }
274                 it := wkr.instType
275                 unalloc[it]++
276                 if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) {
277                         // If up to N new workers appear in
278                         // Instances() while we are waiting for N
279                         // Create() calls to complete, we assume we're
280                         // just seeing a race between Instances() and
281                         // Create() responses.
282                         //
283                         // The other common reason why nodes have
284                         // state==Unknown is that they appeared at
285                         // startup, before any Create calls. They
286                         // don't match the above timing condition, so
287                         // we never mistakenly attribute them to
288                         // pending Create calls.
289                         creating[it]--
290                 }
291         }
292         for it, c := range creating {
293                 unalloc[it] += c
294         }
295         return unalloc
296 }
297
298 // Create a new instance with the given type, and add it to the worker
299 // pool. The worker is added immediately; instance creation runs in
300 // the background.
301 //
302 // Create returns false if a pre-existing error state prevents it from
303 // even attempting to create a new instance. Those errors are logged
304 // by the Pool, so the caller does not need to log anything in such
305 // cases.
306 func (wp *Pool) Create(it arvados.InstanceType) bool {
307         logger := wp.logger.WithField("InstanceType", it.Name)
308         wp.setupOnce.Do(wp.setup)
309         if wp.loadRunnerData() != nil {
310                 // Boot probe is certain to fail.
311                 return false
312         }
313         wp.mtx.Lock()
314         defer wp.mtx.Unlock()
315         if time.Now().Before(wp.atQuotaUntil) || wp.instanceSet.throttleCreate.Error() != nil {
316                 return false
317         }
318         // The maxConcurrentInstanceCreateOps knob throttles the number of node create
319         // requests in flight. It was added to work around a limitation in Azure's
320         // managed disks, which support no more than 20 concurrent node creation
321         // requests from a single disk image (cf.
322         // https://docs.microsoft.com/en-us/azure/virtual-machines/linux/capture-image).
323         // The code assumes that node creation, from Azure's perspective, means the
324         // period until the instance appears in the "get all instances" list.
325         if wp.maxConcurrentInstanceCreateOps > 0 && len(wp.creating) >= wp.maxConcurrentInstanceCreateOps {
326                 logger.Info("reached MaxConcurrentInstanceCreateOps")
327                 wp.instanceSet.throttleCreate.ErrorUntil(errors.New("reached MaxConcurrentInstanceCreateOps"), time.Now().Add(5*time.Second), wp.notify)
328                 return false
329         }
330         now := time.Now()
331         secret := randomHex(instanceSecretLength)
332         wp.creating[secret] = createCall{time: now, instanceType: it}
333         go func() {
334                 defer wp.notify()
335                 tags := cloud.InstanceTags{
336                         wp.tagKeyPrefix + tagKeyInstanceSetID:  string(wp.instanceSetID),
337                         wp.tagKeyPrefix + tagKeyInstanceType:   it.Name,
338                         wp.tagKeyPrefix + tagKeyIdleBehavior:   string(IdleBehaviorRun),
339                         wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
340                 }
341                 initCmd := TagVerifier{nil, secret, nil}.InitCommand()
342                 inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
343                 wp.mtx.Lock()
344                 defer wp.mtx.Unlock()
345                 // delete() is deferred so the updateWorker() call
346                 // below knows to use StateBooting when adding a new
347                 // worker.
348                 defer delete(wp.creating, secret)
349                 if err != nil {
350                         if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
351                                 wp.atQuotaErr = err
352                                 wp.atQuotaUntil = time.Now().Add(quotaErrorTTL)
353                                 time.AfterFunc(quotaErrorTTL, wp.notify)
354                         }
355                         logger.WithError(err).Error("create failed")
356                         wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
357                         return
358                 }
359                 wp.updateWorker(inst, it)
360         }()
361         return true
362 }
363
364 // AtQuota returns true if Create is not expected to work at the
365 // moment.
366 func (wp *Pool) AtQuota() bool {
367         wp.mtx.Lock()
368         defer wp.mtx.Unlock()
369         return time.Now().Before(wp.atQuotaUntil)
370 }
371
372 // SetIdleBehavior determines how the indicated instance will behave
373 // when it has no containers running.
374 func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior) error {
375         wp.mtx.Lock()
376         defer wp.mtx.Unlock()
377         wkr, ok := wp.workers[id]
378         if !ok {
379                 return errors.New("requested instance does not exist")
380         }
381         wkr.setIdleBehavior(idleBehavior)
382         return nil
383 }
384
385 // Successful connection to the SSH daemon, update the mTimeToSSH metric
386 func (wp *Pool) reportSSHConnected(inst cloud.Instance) {
387         wp.mtx.Lock()
388         defer wp.mtx.Unlock()
389         wkr := wp.workers[inst.ID()]
390         if wkr.state != StateBooting || !wkr.firstSSHConnection.IsZero() {
391                 // the node is not in booting state (can happen if a-d-c is restarted) OR
392                 // this is not the first SSH connection
393                 return
394         }
395
396         wkr.firstSSHConnection = time.Now()
397         if wp.mTimeToSSH != nil {
398                 wp.mTimeToSSH.Observe(wkr.firstSSHConnection.Sub(wkr.appeared).Seconds())
399         }
400 }
401
402 // Add or update worker attached to the given instance.
403 //
404 // The second return value is true if a new worker is created.
405 //
406 // A newly added instance has state=StateBooting if its tags match an
407 // entry in wp.creating, otherwise StateUnknown.
408 //
409 // Caller must have lock.
410 func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
411         secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
412         inst = TagVerifier{Instance: inst, Secret: secret, ReportVerified: wp.reportSSHConnected}
413         id := inst.ID()
414         if wkr := wp.workers[id]; wkr != nil {
415                 wkr.executor.SetTarget(inst)
416                 wkr.instance = inst
417                 wkr.updated = time.Now()
418                 wkr.saveTags()
419                 return wkr, false
420         }
421
422         state := StateUnknown
423         if _, ok := wp.creating[secret]; ok {
424                 state = StateBooting
425         }
426
427         // If an instance has a valid IdleBehavior tag when it first
428         // appears, initialize the new worker accordingly (this is how
429         // we restore IdleBehavior that was set by a prior dispatch
430         // process); otherwise, default to "run". After this,
431         // wkr.idleBehavior is the source of truth, and will only be
432         // changed via SetIdleBehavior().
433         idleBehavior := IdleBehavior(inst.Tags()[wp.tagKeyPrefix+tagKeyIdleBehavior])
434         if !validIdleBehavior[idleBehavior] {
435                 idleBehavior = IdleBehaviorRun
436         }
437
438         logger := wp.logger.WithFields(logrus.Fields{
439                 "InstanceType": it.Name,
440                 "Instance":     inst.ID(),
441                 "Address":      inst.Address(),
442         })
443         logger.WithFields(logrus.Fields{
444                 "State":        state,
445                 "IdleBehavior": idleBehavior,
446         }).Infof("instance appeared in cloud")
447         now := time.Now()
448         wkr := &worker{
449                 mtx:          &wp.mtx,
450                 wp:           wp,
451                 logger:       logger,
452                 executor:     wp.newExecutor(inst),
453                 state:        state,
454                 idleBehavior: idleBehavior,
455                 instance:     inst,
456                 instType:     it,
457                 appeared:     now,
458                 probed:       now,
459                 busy:         now,
460                 updated:      now,
461                 running:      make(map[string]*remoteRunner),
462                 starting:     make(map[string]*remoteRunner),
463                 probing:      make(chan struct{}, 1),
464         }
465         wp.workers[id] = wkr
466         return wkr, true
467 }
468
469 // Shutdown shuts down a worker with the given type, or returns false
470 // if all workers with the given type are busy.
471 func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
472         wp.setupOnce.Do(wp.setup)
473         wp.mtx.Lock()
474         defer wp.mtx.Unlock()
475         logger := wp.logger.WithField("InstanceType", it.Name)
476         logger.Info("shutdown requested")
477         for _, tryState := range []State{StateBooting, StateIdle} {
478                 // TODO: shutdown the worker with the longest idle
479                 // time (Idle) or the earliest create time (Booting)
480                 for _, wkr := range wp.workers {
481                         if wkr.idleBehavior != IdleBehaviorHold && wkr.state == tryState && wkr.instType == it {
482                                 logger.WithField("Instance", wkr.instance.ID()).Info("shutting down")
483                                 wkr.reportBootOutcome(BootOutcomeAborted)
484                                 wkr.shutdown()
485                                 return true
486                         }
487                 }
488         }
489         return false
490 }
491
492 // CountWorkers returns the current number of workers in each state.
493 //
494 // CountWorkers blocks, if necessary, until the initial instance list
495 // has been loaded from the cloud provider.
496 func (wp *Pool) CountWorkers() map[State]int {
497         wp.setupOnce.Do(wp.setup)
498         wp.waitUntilLoaded()
499         wp.mtx.Lock()
500         defer wp.mtx.Unlock()
501         r := map[State]int{}
502         for _, w := range wp.workers {
503                 r[w.state]++
504         }
505         return r
506 }
507
508 // Running returns the container UUIDs being prepared/run on workers.
509 //
510 // In the returned map, the time value indicates when the Pool
511 // observed that the container process had exited. A container that
512 // has not yet exited has a zero time value. The caller should use
513 // ForgetContainer() to garbage-collect the entries for exited
514 // containers.
515 func (wp *Pool) Running() map[string]time.Time {
516         wp.setupOnce.Do(wp.setup)
517         wp.mtx.Lock()
518         defer wp.mtx.Unlock()
519         r := map[string]time.Time{}
520         for _, wkr := range wp.workers {
521                 for uuid := range wkr.running {
522                         r[uuid] = time.Time{}
523                 }
524                 for uuid := range wkr.starting {
525                         r[uuid] = time.Time{}
526                 }
527         }
528         for uuid, exited := range wp.exited {
529                 r[uuid] = exited
530         }
531         return r
532 }
533
534 // StartContainer starts a container on an idle worker immediately if
535 // possible, otherwise returns false.
536 func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
537         wp.setupOnce.Do(wp.setup)
538         wp.mtx.Lock()
539         defer wp.mtx.Unlock()
540         var wkr *worker
541         for _, w := range wp.workers {
542                 if w.instType == it && w.state == StateIdle && w.idleBehavior == IdleBehaviorRun {
543                         if wkr == nil || w.busy.After(wkr.busy) {
544                                 wkr = w
545                         }
546                 }
547         }
548         if wkr == nil {
549                 return false
550         }
551         wkr.startContainer(ctr)
552         return true
553 }
554
555 // KillContainer kills the crunch-run process for the given container
556 // UUID, if it's running on any worker.
557 //
558 // KillContainer returns immediately; the act of killing the container
559 // takes some time, and runs in the background.
560 //
561 // KillContainer returns false if the container has already ended.
562 func (wp *Pool) KillContainer(uuid string, reason string) bool {
563         wp.mtx.Lock()
564         defer wp.mtx.Unlock()
565         logger := wp.logger.WithFields(logrus.Fields{
566                 "ContainerUUID": uuid,
567                 "Reason":        reason,
568         })
569         for _, wkr := range wp.workers {
570                 rr := wkr.running[uuid]
571                 if rr == nil {
572                         rr = wkr.starting[uuid]
573                 }
574                 if rr != nil {
575                         rr.Kill(reason)
576                         return true
577                 }
578         }
579         logger.Debug("cannot kill: already disappeared")
580         return false
581 }
582
583 // ForgetContainer clears the placeholder for the given exited
584 // container, so it isn't returned by subsequent calls to Running().
585 //
586 // ForgetContainer has no effect if the container has not yet exited.
587 //
588 // The "container exited at time T" placeholder (which necessitates
589 // ForgetContainer) exists to make it easier for the caller
590 // (scheduler) to distinguish a container that exited without
591 // finalizing its state from a container that exited too recently for
592 // its final state to have appeared in the scheduler's queue cache.
593 func (wp *Pool) ForgetContainer(uuid string) {
594         wp.mtx.Lock()
595         defer wp.mtx.Unlock()
596         if _, ok := wp.exited[uuid]; ok {
597                 wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
598                 delete(wp.exited, uuid)
599         }
600 }
601
602 func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
603         if reg == nil {
604                 reg = prometheus.NewRegistry()
605         }
606         wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
607                 Namespace: "arvados",
608                 Subsystem: "dispatchcloud",
609                 Name:      "containers_running",
610                 Help:      "Number of containers reported running by cloud VMs.",
611         })
612         reg.MustRegister(wp.mContainersRunning)
613         wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
614                 Namespace: "arvados",
615                 Subsystem: "dispatchcloud",
616                 Name:      "instances_total",
617                 Help:      "Number of cloud VMs.",
618         }, []string{"category", "instance_type"})
619         reg.MustRegister(wp.mInstances)
620         wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
621                 Namespace: "arvados",
622                 Subsystem: "dispatchcloud",
623                 Name:      "instances_price",
624                 Help:      "Price of cloud VMs.",
625         }, []string{"category"})
626         reg.MustRegister(wp.mInstancesPrice)
627         wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
628                 Namespace: "arvados",
629                 Subsystem: "dispatchcloud",
630                 Name:      "vcpus_total",
631                 Help:      "Total VCPUs on all cloud VMs.",
632         }, []string{"category"})
633         reg.MustRegister(wp.mVCPUs)
634         wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
635                 Namespace: "arvados",
636                 Subsystem: "dispatchcloud",
637                 Name:      "memory_bytes_total",
638                 Help:      "Total memory on all cloud VMs.",
639         }, []string{"category"})
640         reg.MustRegister(wp.mMemory)
641         wp.mBootOutcomes = prometheus.NewCounterVec(prometheus.CounterOpts{
642                 Namespace: "arvados",
643                 Subsystem: "dispatchcloud",
644                 Name:      "boot_outcomes",
645                 Help:      "Boot outcomes by type.",
646         }, []string{"outcome"})
647         for k := range validBootOutcomes {
648                 wp.mBootOutcomes.WithLabelValues(string(k)).Add(0)
649         }
650         reg.MustRegister(wp.mBootOutcomes)
651         wp.mDisappearances = prometheus.NewCounterVec(prometheus.CounterOpts{
652                 Namespace: "arvados",
653                 Subsystem: "dispatchcloud",
654                 Name:      "instances_disappeared",
655                 Help:      "Number of occurrences of an instance disappearing from the cloud provider's list of instances.",
656         }, []string{"state"})
657         for _, v := range stateString {
658                 wp.mDisappearances.WithLabelValues(v).Add(0)
659         }
660         reg.MustRegister(wp.mDisappearances)
661         wp.mTimeToSSH = prometheus.NewSummary(prometheus.SummaryOpts{
662                 Namespace:  "arvados",
663                 Subsystem:  "dispatchcloud",
664                 Name:       "instances_time_to_ssh_seconds",
665                 Help:       "Number of seconds between instance creation and the first successful SSH connection.",
666                 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
667         })
668         reg.MustRegister(wp.mTimeToSSH)
669         wp.mTimeToReadyForContainer = prometheus.NewSummary(prometheus.SummaryOpts{
670                 Namespace:  "arvados",
671                 Subsystem:  "dispatchcloud",
672                 Name:       "instances_time_to_ready_for_container_seconds",
673                 Help:       "Number of seconds between the first successful SSH connection and ready to run a container.",
674                 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
675         })
676         reg.MustRegister(wp.mTimeToReadyForContainer)
677         wp.mTimeFromShutdownToGone = prometheus.NewSummary(prometheus.SummaryOpts{
678                 Namespace:  "arvados",
679                 Subsystem:  "dispatchcloud",
680                 Name:       "instances_time_from_shutdown_request_to_disappearance_seconds",
681                 Help:       "Number of seconds between the first shutdown attempt and the disappearance of the worker.",
682                 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
683         })
684         reg.MustRegister(wp.mTimeFromShutdownToGone)
685         wp.mTimeFromQueueToCrunchRun = prometheus.NewSummary(prometheus.SummaryOpts{
686                 Namespace:  "arvados",
687                 Subsystem:  "dispatchcloud",
688                 Name:       "containers_time_from_queue_to_crunch_run_seconds",
689                 Help:       "Number of seconds between the queuing of a container and the start of crunch-run.",
690                 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
691         })
692         reg.MustRegister(wp.mTimeFromQueueToCrunchRun)
693         wp.mRunProbeDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{
694                 Namespace:  "arvados",
695                 Subsystem:  "dispatchcloud",
696                 Name:       "instances_run_probe_duration_seconds",
697                 Help:       "Number of seconds per runProbe call.",
698                 Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
699         }, []string{"outcome"})
700         reg.MustRegister(wp.mRunProbeDuration)
701 }
702
703 func (wp *Pool) runMetrics() {
704         ch := wp.Subscribe()
705         defer wp.Unsubscribe(ch)
706         wp.updateMetrics()
707         for range ch {
708                 wp.updateMetrics()
709         }
710 }
711
712 func (wp *Pool) updateMetrics() {
713         wp.mtx.RLock()
714         defer wp.mtx.RUnlock()
715
716         type entKey struct {
717                 cat      string
718                 instType string
719         }
720         instances := map[entKey]int64{}
721         price := map[string]float64{}
722         cpu := map[string]int64{}
723         mem := map[string]int64{}
724         var running int64
725         for _, wkr := range wp.workers {
726                 var cat string
727                 switch {
728                 case len(wkr.running)+len(wkr.starting) > 0:
729                         cat = "inuse"
730                 case wkr.idleBehavior == IdleBehaviorHold:
731                         cat = "hold"
732                 case wkr.state == StateBooting:
733                         cat = "booting"
734                 case wkr.state == StateUnknown:
735                         cat = "unknown"
736                 default:
737                         cat = "idle"
738                 }
739                 instances[entKey{cat, wkr.instType.Name}]++
740                 price[cat] += wkr.instType.Price
741                 cpu[cat] += int64(wkr.instType.VCPUs)
742                 mem[cat] += int64(wkr.instType.RAM)
743                 running += int64(len(wkr.running) + len(wkr.starting))
744         }
745         for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
746                 wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
747                 wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
748                 wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
749                 // make sure to reset gauges for non-existing category/nodetype combinations
750                 for _, it := range wp.instanceTypes {
751                         if _, ok := instances[entKey{cat, it.Name}]; !ok {
752                                 wp.mInstances.WithLabelValues(cat, it.Name).Set(float64(0))
753                         }
754                 }
755         }
756         for k, v := range instances {
757                 wp.mInstances.WithLabelValues(k.cat, k.instType).Set(float64(v))
758         }
759         wp.mContainersRunning.Set(float64(running))
760 }
761
762 func (wp *Pool) runProbes() {
763         maxPPS := wp.maxProbesPerSecond
764         if maxPPS < 1 {
765                 maxPPS = defaultMaxProbesPerSecond
766         }
767         limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
768         defer limitticker.Stop()
769
770         probeticker := time.NewTicker(wp.probeInterval)
771         defer probeticker.Stop()
772
773         workers := []cloud.InstanceID{}
774         for range probeticker.C {
775                 workers = workers[:0]
776                 wp.mtx.Lock()
777                 for id, wkr := range wp.workers {
778                         if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
779                                 continue
780                         }
781                         workers = append(workers, id)
782                 }
783                 wp.mtx.Unlock()
784
785                 for _, id := range workers {
786                         wp.mtx.Lock()
787                         wkr, ok := wp.workers[id]
788                         wp.mtx.Unlock()
789                         if !ok {
790                                 // Deleted while we were probing
791                                 // others
792                                 continue
793                         }
794                         go wkr.ProbeAndUpdate()
795                         select {
796                         case <-wp.stop:
797                                 return
798                         case <-limitticker.C:
799                         }
800                 }
801         }
802 }
803
804 func (wp *Pool) runSync() {
805         // sync once immediately, then wait syncInterval, sync again,
806         // etc.
807         timer := time.NewTimer(1)
808         for {
809                 select {
810                 case <-timer.C:
811                         err := wp.getInstancesAndSync()
812                         if err != nil {
813                                 wp.logger.WithError(err).Warn("sync failed")
814                         }
815                         timer.Reset(wp.syncInterval)
816                 case <-wp.stop:
817                         wp.logger.Debug("worker.Pool stopped")
818                         return
819                 }
820         }
821 }
822
823 // Stop synchronizing with the InstanceSet.
824 func (wp *Pool) Stop() {
825         wp.setupOnce.Do(wp.setup)
826         close(wp.stop)
827 }
828
829 // Instances returns an InstanceView for each worker in the pool,
830 // summarizing its current state and recent activity.
831 func (wp *Pool) Instances() []InstanceView {
832         var r []InstanceView
833         wp.setupOnce.Do(wp.setup)
834         wp.mtx.Lock()
835         for _, w := range wp.workers {
836                 r = append(r, InstanceView{
837                         Instance:             w.instance.ID(),
838                         Address:              w.instance.Address(),
839                         Price:                w.instType.Price,
840                         ArvadosInstanceType:  w.instType.Name,
841                         ProviderInstanceType: w.instType.ProviderType,
842                         LastContainerUUID:    w.lastUUID,
843                         LastBusy:             w.busy,
844                         WorkerState:          w.state.String(),
845                         IdleBehavior:         w.idleBehavior,
846                 })
847         }
848         wp.mtx.Unlock()
849         sort.Slice(r, func(i, j int) bool {
850                 return strings.Compare(string(r[i].Instance), string(r[j].Instance)) < 0
851         })
852         return r
853 }
854
855 // KillInstance destroys a cloud VM instance. It returns an error if
856 // the given instance does not exist.
857 func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
858         wkr, ok := wp.workers[id]
859         if !ok {
860                 return errors.New("instance not found")
861         }
862         wkr.logger.WithField("Reason", reason).Info("shutting down")
863         wkr.reportBootOutcome(BootOutcomeAborted)
864         wkr.shutdown()
865         return nil
866 }
867
868 func (wp *Pool) setup() {
869         wp.creating = map[string]createCall{}
870         wp.exited = map[string]time.Time{}
871         wp.workers = map[cloud.InstanceID]*worker{}
872         wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
873         wp.loadRunnerData()
874 }
875
876 // Load the runner program to be deployed on worker nodes into
877 // wp.runnerData, if necessary. Errors are logged.
878 //
879 // If auto-deploy is disabled, len(wp.runnerData) will be 0.
880 //
881 // Caller must not have lock.
882 func (wp *Pool) loadRunnerData() error {
883         wp.mtx.Lock()
884         defer wp.mtx.Unlock()
885         if wp.runnerData != nil {
886                 return nil
887         } else if wp.runnerSource == "" {
888                 wp.runnerCmd = wp.runnerCmdDefault
889                 wp.runnerData = []byte{}
890                 return nil
891         }
892         logger := wp.logger.WithField("source", wp.runnerSource)
893         logger.Debug("loading runner")
894         buf, err := ioutil.ReadFile(wp.runnerSource)
895         if err != nil {
896                 logger.WithError(err).Error("failed to load runner program")
897                 return err
898         }
899         wp.runnerData = buf
900         wp.runnerMD5 = md5.Sum(buf)
901         wp.runnerCmd = fmt.Sprintf("/var/lib/arvados/crunch-run~%x", wp.runnerMD5)
902         return nil
903 }
904
905 func (wp *Pool) notify() {
906         wp.mtx.RLock()
907         defer wp.mtx.RUnlock()
908         for _, send := range wp.subscribers {
909                 select {
910                 case send <- struct{}{}:
911                 default:
912                 }
913         }
914 }
915
916 func (wp *Pool) getInstancesAndSync() error {
917         wp.setupOnce.Do(wp.setup)
918         if err := wp.instanceSet.throttleInstances.Error(); err != nil {
919                 return err
920         }
921         wp.logger.Debug("getting instance list")
922         threshold := time.Now()
923         instances, err := wp.instanceSet.Instances(cloud.InstanceTags{wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID)})
924         if err != nil {
925                 wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
926                 return err
927         }
928         wp.sync(threshold, instances)
929         wp.logger.Debug("sync done")
930         return nil
931 }
932
933 // Add/remove/update workers based on instances, which was obtained
934 // from the instanceSet. However, don't clobber any other updates that
935 // already happened after threshold.
936 func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
937         wp.mtx.Lock()
938         defer wp.mtx.Unlock()
939         wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
940         notify := false
941
942         for _, inst := range instances {
943                 itTag := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceType]
944                 it, ok := wp.instanceTypes[itTag]
945                 if !ok {
946                         wp.logger.WithField("Instance", inst.ID()).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
947                         continue
948                 }
949                 if wkr, isNew := wp.updateWorker(inst, it); isNew {
950                         notify = true
951                 } else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
952                         wp.logger.WithField("Instance", inst.ID()).Info("worker still listed after shutdown; retrying")
953                         wkr.shutdown()
954                 }
955         }
956
957         for id, wkr := range wp.workers {
958                 if wkr.updated.After(threshold) {
959                         continue
960                 }
961                 logger := wp.logger.WithFields(logrus.Fields{
962                         "Instance":    wkr.instance.ID(),
963                         "WorkerState": wkr.state,
964                 })
965                 logger.Info("instance disappeared in cloud")
966                 wkr.reportBootOutcome(BootOutcomeDisappeared)
967                 if wp.mDisappearances != nil {
968                         wp.mDisappearances.WithLabelValues(stateString[wkr.state]).Inc()
969                 }
970                 // wkr.destroyed.IsZero() can happen if instance disappeared but we weren't trying to shut it down
971                 if wp.mTimeFromShutdownToGone != nil && !wkr.destroyed.IsZero() {
972                         wp.mTimeFromShutdownToGone.Observe(time.Now().Sub(wkr.destroyed).Seconds())
973                 }
974                 delete(wp.workers, id)
975                 go wkr.Close()
976                 notify = true
977         }
978
979         if !wp.loaded {
980                 notify = true
981                 wp.loaded = true
982                 wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
983         }
984
985         if notify {
986                 go wp.notify()
987         }
988 }
989
990 func (wp *Pool) waitUntilLoaded() {
991         ch := wp.Subscribe()
992         wp.mtx.RLock()
993         defer wp.mtx.RUnlock()
994         for !wp.loaded {
995                 wp.mtx.RUnlock()
996                 <-ch
997                 wp.mtx.RLock()
998         }
999 }
1000
1001 func (wp *Pool) gatewayAuthSecret(uuid string) string {
1002         h := hmac.New(sha256.New, []byte(wp.systemRootToken))
1003         fmt.Fprint(h, uuid)
1004         return fmt.Sprintf("%x", h.Sum(nil))
1005 }
1006
// randomHex returns a random string of exactly n lowercase
// hexadecimal digits (n*4 random bits), read from crypto/rand.
//
// n may be odd: enough random bytes are generated to cover n digits
// and the surplus digit is dropped. If n <= 0 the result is "".
//
// It panics if the system's random source fails.
func randomHex(n int) string {
	if n <= 0 {
		return ""
	}
	// Each byte yields two hex digits; round up so an odd n is not
	// silently truncated to n-1 digits.
	buf := make([]byte, (n+1)/2)
	if _, err := rand.Read(buf); err != nil {
		panic(err)
	}
	return fmt.Sprintf("%x", buf)[:n]
}