Merge branch '14807-prod-blockers'
author Tom Clegg <tclegg@veritasgenetics.com>
Thu, 21 Mar 2019 20:50:32 +0000 (16:50 -0400)
committer Tom Clegg <tclegg@veritasgenetics.com>
Thu, 21 Mar 2019 20:50:32 +0000 (16:50 -0400)
refs #14807

Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg@veritasgenetics.com>

12 files changed:
lib/dispatchcloud/container/queue.go
lib/dispatchcloud/dispatcher.go
lib/dispatchcloud/dispatcher_test.go
lib/dispatchcloud/driver.go
lib/dispatchcloud/test/stub_driver.go
lib/dispatchcloud/worker/pool.go
lib/dispatchcloud/worker/worker.go
sdk/go/arvados/config.go
sdk/go/keepclient/discover.go
services/crunch-run/background.go
services/crunch-run/crunchrun.go
services/crunch-run/crunchrun_test.go

diff --git a/lib/dispatchcloud/container/queue.go b/lib/dispatchcloud/container/queue.go
index 4e807a12ab0cb55c27e7f7e3319136c904644f2a..af17aaf3927ce9f3b8b94a03ca289201c11640d2 100644
@@ -53,7 +53,6 @@ func (c *QueueEnt) String() string {
 // cache up to date.
 type Queue struct {
        logger     logrus.FieldLogger
-       reg        *prometheus.Registry
        chooseType typeChooser
        client     APIClient
 
@@ -79,14 +78,17 @@ type Queue struct {
 // Arvados cluster's queue during Update, chooseType will be called to
 // assign an appropriate arvados.InstanceType for the queue entry.
 func NewQueue(logger logrus.FieldLogger, reg *prometheus.Registry, chooseType typeChooser, client APIClient) *Queue {
-       return &Queue{
+       cq := &Queue{
                logger:      logger,
-               reg:         reg,
                chooseType:  chooseType,
                client:      client,
                current:     map[string]QueueEnt{},
                subscribers: map[<-chan struct{}]chan struct{}{},
        }
+       if reg != nil {
+               go cq.runMetrics(reg)
+       }
+       return cq
 }
 
 // Subscribe returns a channel that becomes ready to receive when an
@@ -487,3 +489,34 @@ func (cq *Queue) fetchAll(initialParams arvados.ResourceListParams) ([]arvados.Container, error) {
        }
        return results, nil
 }
+
+func (cq *Queue) runMetrics(reg *prometheus.Registry) {
+       mEntries := prometheus.NewGaugeVec(prometheus.GaugeOpts{
+               Namespace: "arvados",
+               Subsystem: "dispatchcloud",
+               Name:      "queue_entries",
+               Help:      "Number of active container entries in the controller database.",
+       }, []string{"state", "instance_type"})
+       reg.MustRegister(mEntries)
+
+       type entKey struct {
+               state arvados.ContainerState
+               inst  string
+       }
+       count := map[entKey]int{}
+
+       ch := cq.Subscribe()
+       defer cq.Unsubscribe(ch)
+       for range ch {
+               for k := range count {
+                       count[k] = 0
+               }
+               ents, _ := cq.Entries()
+               for _, ent := range ents {
+                       count[entKey{ent.Container.State, ent.InstanceType.Name}]++
+               }
+               for k, v := range count {
+                       mEntries.WithLabelValues(string(k.state), k.inst).Set(float64(v))
+               }
+       }
+}
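
Note: instead of keeping the *prometheus.Registry in the Queue struct, NewQueue
now registers the gauge vector once and hands it to a dedicated goroutine that
wakes on every queue-change notification. Zeroing all previously seen label
combinations before recounting keeps emptied states at 0 instead of leaving
stale values behind. A /metrics scrape would then include lines along these
lines (the instance type name is invented for illustration):

    arvados_dispatchcloud_queue_entries{instance_type="Standard_D2s_v3",state="Queued"} 3
    arvados_dispatchcloud_queue_entries{instance_type="Standard_D2s_v3",state="Running"} 1
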
diff --git a/lib/dispatchcloud/dispatcher.go b/lib/dispatchcloud/dispatcher.go
index adf1028b35fe16ab13afbfcb4f0c91672ec17849..9245d5de30928e038da47172dd526cb6d5ca3b8a 100644
@@ -39,6 +39,7 @@ type pool interface {
        scheduler.WorkerPool
        Instances() []worker.InstanceView
        SetIdleBehavior(cloud.InstanceID, worker.IdleBehavior) error
+       KillInstance(id cloud.InstanceID, reason string) error
        Stop()
 }
 
@@ -147,6 +148,7 @@ func (disp *dispatcher) initialize() {
                mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/hold", disp.apiInstanceHold)
                mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/drain", disp.apiInstanceDrain)
                mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/run", disp.apiInstanceRun)
+               mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/kill", disp.apiInstanceKill)
                metricsH := promhttp.HandlerFor(disp.reg, promhttp.HandlerOpts{
                        ErrorLog: disp.logger,
                })
@@ -212,6 +214,20 @@ func (disp *dispatcher) apiInstanceRun(w http.ResponseWriter, r *http.Request) {
        disp.apiInstanceIdleBehavior(w, r, worker.IdleBehaviorRun)
 }
 
+// Management API: shutdown/destroy specified instance now.
+func (disp *dispatcher) apiInstanceKill(w http.ResponseWriter, r *http.Request) {
+       id := cloud.InstanceID(r.FormValue("instance_id"))
+       if id == "" {
+               httpserver.Error(w, "instance_id parameter not provided", http.StatusBadRequest)
+               return
+       }
+       err := disp.pool.KillInstance(id, "via management API: "+r.FormValue("reason"))
+       if err != nil {
+               httpserver.Error(w, err.Error(), http.StatusNotFound)
+               return
+       }
+}
+
 func (disp *dispatcher) apiInstanceIdleBehavior(w http.ResponseWriter, r *http.Request, want worker.IdleBehavior) {
        id := cloud.InstanceID(r.FormValue("instance_id"))
        if id == "" {
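
Note: the kill endpoint takes the same instance_id form parameter as the
hold/drain/run endpoints, plus an optional reason that is recorded in the
worker's log before shutdown. A hypothetical invocation (host, port, and
token are placeholders; the management API normally expects an authorization
token):

    curl -X POST \
         -H "Authorization: Bearer $MANAGEMENT_TOKEN" \
         -d instance_id=i-0abcdef1234567890 \
         -d reason="stuck kernel" \
         http://localhost:9006/arvados/v1/dispatch/instances/kill
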
diff --git a/lib/dispatchcloud/dispatcher_test.go b/lib/dispatchcloud/dispatcher_test.go
index 7268f106a9f36ba933da51ecba4465ba760a8820..b0033353c2e822d1e6883f7d72d117ac8474e84f 100644
@@ -49,12 +49,13 @@ func (s *DispatcherSuite) SetUpTest(c *check.C) {
 
        s.cluster = &arvados.Cluster{
                CloudVMs: arvados.CloudVMs{
-                       Driver:          "test",
-                       SyncInterval:    arvados.Duration(10 * time.Millisecond),
-                       TimeoutIdle:     arvados.Duration(150 * time.Millisecond),
-                       TimeoutBooting:  arvados.Duration(150 * time.Millisecond),
-                       TimeoutProbe:    arvados.Duration(15 * time.Millisecond),
-                       TimeoutShutdown: arvados.Duration(5 * time.Millisecond),
+                       Driver:               "test",
+                       SyncInterval:         arvados.Duration(10 * time.Millisecond),
+                       TimeoutIdle:          arvados.Duration(150 * time.Millisecond),
+                       TimeoutBooting:       arvados.Duration(150 * time.Millisecond),
+                       TimeoutProbe:         arvados.Duration(15 * time.Millisecond),
+                       TimeoutShutdown:      arvados.Duration(5 * time.Millisecond),
+                       MaxCloudOpsPerSecond: 500,
                },
                Dispatch: arvados.Dispatch{
                        PrivateKey:         string(dispatchprivraw),
@@ -155,6 +156,8 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
                        stubvm.Broken = time.Now().Add(time.Duration(rand.Int63n(90)) * time.Millisecond)
                case 1:
                        stubvm.CrunchRunMissing = true
+               case 2:
+                       stubvm.ReportBroken = time.Now().Add(time.Duration(rand.Int63n(200)) * time.Millisecond)
                default:
                        stubvm.CrunchRunCrashRate = 0.1
                }
diff --git a/lib/dispatchcloud/driver.go b/lib/dispatchcloud/driver.go
index 0343f85b91a7bc63d20034e111e04487608bef9c..eb1e48737c8b131cbb919ca71e8f6bbc377c553a 100644
@@ -6,12 +6,14 @@ package dispatchcloud
 
 import (
        "fmt"
+       "time"
 
        "git.curoverse.com/arvados.git/lib/cloud"
        "git.curoverse.com/arvados.git/lib/cloud/azure"
        "git.curoverse.com/arvados.git/lib/cloud/ec2"
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "github.com/sirupsen/logrus"
+       "golang.org/x/crypto/ssh"
 )
 
 var drivers = map[string]cloud.Driver{
@@ -24,5 +26,33 @@ func newInstanceSet(cluster *arvados.Cluster, setID cloud.InstanceSetID, logger logrus.FieldLogger) (cloud.InstanceSet, error) {
        if !ok {
                return nil, fmt.Errorf("unsupported cloud driver %q", cluster.CloudVMs.Driver)
        }
-       return driver.InstanceSet(cluster.CloudVMs.DriverParameters, setID, logger)
+       is, err := driver.InstanceSet(cluster.CloudVMs.DriverParameters, setID, logger)
+       if maxops := cluster.CloudVMs.MaxCloudOpsPerSecond; maxops > 0 {
+               is = &rateLimitedInstanceSet{
+                       InstanceSet: is,
+                       ticker:      time.NewTicker(time.Second / time.Duration(maxops)),
+               }
+       }
+       return is, err
+}
+
+type rateLimitedInstanceSet struct {
+       cloud.InstanceSet
+       ticker *time.Ticker
+}
+
+func (is rateLimitedInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, init cloud.InitCommand, pk ssh.PublicKey) (cloud.Instance, error) {
+       <-is.ticker.C
+       inst, err := is.InstanceSet.Create(it, image, tags, init, pk)
+       return &rateLimitedInstance{inst, is.ticker}, err
+}
+
+type rateLimitedInstance struct {
+       cloud.Instance
+       ticker *time.Ticker
+}
+
+func (inst *rateLimitedInstance) Destroy() error {
+       <-inst.ticker.C
+       return inst.Instance.Destroy()
 }
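
Note: only the mutating calls are throttled (Create on the set, Destroy on
each instance), and both draw from one shared ticker, so the combined
create/destroy rate cannot exceed MaxCloudOpsPerSecond. A ticker gives a
near-burstless hard ceiling, unlike a token bucket (e.g. golang.org/x/time/rate),
which would permit short bursts. A minimal self-contained sketch of the same
idea, with maxops invented for illustration:

    package main

    import (
            "fmt"
            "time"
    )

    func main() {
            const maxops = 4 // cap at 4 ops/second
            ticker := time.NewTicker(time.Second / maxops)
            defer ticker.Stop()
            for i := 0; i < 8; i++ {
                    <-ticker.C // each op waits for the next 250ms slot
                    fmt.Printf("op %d at %s\n", i, time.Now().Format("15:04:05.000"))
            }
    }
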
diff --git a/lib/dispatchcloud/test/stub_driver.go b/lib/dispatchcloud/test/stub_driver.go
index a4521eab7bb9074b02a2f06ef50b781971099d3d..02346a97076d7168869266c8078028a667b39f81 100644
@@ -181,6 +181,7 @@ func (e RateLimitError) EarliestRetry() time.Time { return e.Retry }
 type StubVM struct {
        Boot                  time.Time
        Broken                time.Time
+       ReportBroken          time.Time
        CrunchRunMissing      bool
        CrunchRunCrashRate    float64
        CrunchRunDetachDelay  time.Duration
@@ -314,6 +315,9 @@ func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader, stdout, stderr io.Writer) int {
                for uuid := range svm.running {
                        fmt.Fprintf(stdout, "%s\n", uuid)
                }
+               if !svm.ReportBroken.IsZero() && svm.ReportBroken.Before(time.Now()) {
+                       fmt.Fprintln(stdout, "broken")
+               }
                return 0
        }
        if strings.HasPrefix(command, "crunch-run --kill ") {
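
Note: this teaches the stub the new probe protocol: once ReportBroken has
passed, every "crunch-run --list" response carries a "broken" marker after the
running-container UUIDs, e.g. (UUID invented for illustration):

    zzzzz-dz642-abcdefghijklmno
    broken
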
diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go
index 81a658535ea593a7a0a0c2d9231fd66f3055bdbd..014ab93bfe9c7289bcd99286379a3a26bbc38b18 100644
@@ -691,6 +691,18 @@ func (wp *Pool) Instances() []InstanceView {
        return r
 }
 
+// KillInstance destroys a cloud VM instance. It returns an error if
+// the given instance does not exist.
+func (wp *Pool) KillInstance(id cloud.InstanceID, reason string) error {
+       wkr, ok := wp.workers[id]
+       if !ok {
+               return errors.New("instance not found")
+       }
+       wkr.logger.WithField("Reason", reason).Info("shutting down")
+       wkr.shutdown()
+       return nil
+}
+
 func (wp *Pool) setup() {
        wp.creating = map[string]createCall{}
        wp.exited = map[string]time.Time{}
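
Note: unlike the hold/drain idle behaviors, KillInstance starts the worker's
shutdown path immediately; the reason string is only logged. Actual instance
destruction then follows the pool's normal shutdown handling, which retries
per TimeoutShutdown until the cloud instance is gone.
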
diff --git a/lib/dispatchcloud/worker/worker.go b/lib/dispatchcloud/worker/worker.go
index 41117c1d4eafb5aa2a92c163d3f79d72ace443d3..49c5057b3842e49da945d40c3950f7c2185dfcc5 100644
@@ -5,7 +5,6 @@
 package worker
 
 import (
-       "bytes"
        "fmt"
        "strings"
        "sync"
@@ -215,11 +214,16 @@ func (wkr *worker) probeAndUpdate() {
                        logger.Info("instance booted; will try probeRunning")
                }
        }
+       reportedBroken := false
        if booted || wkr.state == StateUnknown {
-               ctrUUIDs, ok = wkr.probeRunning()
+               ctrUUIDs, reportedBroken, ok = wkr.probeRunning()
        }
        wkr.mtx.Lock()
        defer wkr.mtx.Unlock()
+       if reportedBroken && wkr.idleBehavior == IdleBehaviorRun {
+               logger.Info("probe reported broken instance")
+               wkr.setIdleBehavior(IdleBehaviorDrain)
+       }
        if !ok || (!booted && len(ctrUUIDs) == 0 && len(wkr.running) == 0) {
                if wkr.state == StateShutdown && wkr.updated.After(updated) {
                        // Skip the logging noise if shutdown was
@@ -313,7 +317,7 @@ func (wkr *worker) probeAndUpdate() {
        go wkr.wp.notify()
 }
 
-func (wkr *worker) probeRunning() (running []string, ok bool) {
+func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) {
        cmd := "crunch-run --list"
        if u := wkr.instance.RemoteUser(); u != "root" {
                cmd = "sudo " + cmd
@@ -325,13 +329,17 @@ func (wkr *worker) probeRunning() (running []string, ok bool) {
                        "stdout":  string(stdout),
                        "stderr":  string(stderr),
                }).WithError(err).Warn("probe failed")
-               return nil, false
+               return
        }
-       stdout = bytes.TrimRight(stdout, "\n")
-       if len(stdout) == 0 {
-               return nil, true
+       ok = true
+       for _, s := range strings.Split(string(stdout), "\n") {
+               if s == "broken" {
+                       reportsBroken = true
+               } else if s != "" {
+                       running = append(running, s)
+               }
        }
-       return strings.Split(string(stdout), "\n"), true
+       return
 }
 
 func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
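
Note: a single probe now reports both the running containers and the broken
marker. A "broken" line switches the worker's idle behavior to Drain, but only
when it is currently Run, so an operator-imposed Hold (or an existing Drain)
is left untouched. On a failed probe the function returns early with all zero
values, since ok is only set to true after the command succeeds.
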
diff --git a/sdk/go/arvados/config.go b/sdk/go/arvados/config.go
index 73addb739cd0e25b47212300d420fa4f2b8c8e7d..7c87ff0293052762641019f29d7ff442aa09e75d 100644
@@ -154,6 +154,9 @@ type CloudVMs struct {
        // Time after shutdown to retry shutdown
        TimeoutShutdown Duration
 
+       // Maximum create/destroy-instance operations per second
+       MaxCloudOpsPerSecond int
+
        ImageID string
 
        Driver           string
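
Note: the new field defaults to 0, which driver.go treats as "no limit" (the
rate-limiting wrapper is only installed when maxops > 0). An illustrative
cluster config snippet (cluster ID and value invented):

    Clusters:
      zzzzz:
        CloudVMs:
          MaxCloudOpsPerSecond: 10
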
diff --git a/sdk/go/keepclient/discover.go b/sdk/go/keepclient/discover.go
index 2392fcde7bdc7475068bfac3452665daa2ef5a61..62936e71831fb1fa055b213fac470f2adeb5ea16 100644
@@ -6,6 +6,7 @@ package keepclient
 
 import (
        "encoding/json"
+       "errors"
        "fmt"
        "log"
        "os"
@@ -150,7 +151,12 @@ func (kc *KeepClient) discoverServices() error {
        }
        svcListCacheMtx.Unlock()
 
-       return kc.loadKeepServers(<-cacheEnt.latest)
+       select {
+       case <-time.After(time.Minute):
+               return errors.New("timed out while getting initial list of keep services")
+       case sl := <-cacheEnt.latest:
+               return kc.loadKeepServers(sl)
+       }
 }
 
 func (kc *KeepClient) RefreshServiceDiscovery() {
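
Note: a bare receive on cacheEnt.latest could block forever if the discovery
goroutine never delivered; the select with time.After bounds the wait to one
minute. A minimal self-contained sketch of the pattern, with the producer
delay and the shortened deadline invented for the demo:

    package main

    import (
            "errors"
            "fmt"
            "time"
    )

    func main() {
            latest := make(chan string, 1)
            go func() {
                    time.Sleep(2 * time.Second) // stand-in for slow service discovery
                    latest <- "keep services list"
            }()
            select {
            case <-time.After(time.Second):
                    fmt.Println(errors.New("timed out while getting initial list of keep services"))
            case sl := <-latest:
                    fmt.Println("got:", sl)
            }
    }
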
diff --git a/services/crunch-run/background.go b/services/crunch-run/background.go
index 933692bdc55b3bbf9c63e78c29c5615418a33d05..852ccb6ece3979385423f3ceb55fb437f164c6aa 100644
@@ -20,6 +20,7 @@ var (
        lockdir    = "/var/lock"
        lockprefix = "crunch-run-"
        locksuffix = ".lock"
+       brokenfile = "crunch-run-broken"
 )
 
 // procinfo is saved in each process's lockfile.
@@ -146,7 +147,10 @@ func ListProcesses(stdout, stderr io.Writer) int {
                if info.IsDir() && path != walkdir {
                        return filepath.SkipDir
                }
-               if name := info.Name(); !strings.HasPrefix(name, lockprefix) || !strings.HasSuffix(name, locksuffix) {
+               if name := info.Name(); name == brokenfile {
+                       fmt.Fprintln(stdout, "broken")
+                       return nil
+               } else if !strings.HasPrefix(name, lockprefix) || !strings.HasSuffix(name, locksuffix) {
                        return nil
                }
                if info.Size() == 0 {
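
Note: the sentinel is an ordinary file next to the crunch-run lockfiles, so no
new RPC is needed; the dispatcher's existing periodic probe sees it on the
next "crunch-run --list". Hypothetical shell session on a compute node:

    $ touch /var/lock/crunch-run-broken
    $ crunch-run --list
    broken
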
diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go
index 0576337aa13c280841187db3a7aea2dcf4af65c0..3925b0b7b1f810c9c451c7e756693ba5875bc252 100644
@@ -222,7 +222,14 @@ var brokenNodeHook *string = flag.String("broken-node-hook", "", "Script to run
 
 func (runner *ContainerRunner) runBrokenNodeHook() {
        if *brokenNodeHook == "" {
-               runner.CrunchLog.Printf("No broken node hook provided, cannot mark node as broken.")
+               path := filepath.Join(lockdir, brokenfile)
+               runner.CrunchLog.Printf("Writing %s to mark node as broken", path)
+               f, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0700)
+               if err != nil {
+                       runner.CrunchLog.Printf("Error writing %s: %s", path, err)
+                       return
+               }
+               f.Close()
        } else {
                runner.CrunchLog.Printf("Running broken node hook %q", *brokenNodeHook)
                // run killme script
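
Note: this closes the loop for clusters with no broken-node hook configured:
instead of merely logging that the node cannot be marked broken, crunch-run
now creates /var/lock/crunch-run-broken itself, which "crunch-run --list"
reports as "broken" and the dispatcher (per the worker and pool changes above)
turns into a drain.
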
diff --git a/services/crunch-run/crunchrun_test.go b/services/crunch-run/crunchrun_test.go
index 17e5e145811aba3e587a66d07fb642ec07bef2d8..60729c019b1a1c508cacceb5b4e7d08e8d300bc5 100644
@@ -2049,7 +2049,7 @@ func (s *TestSuite) TestFullBrokenDocker2(c *C) {
 
        c.Check(api.CalledWith("container.state", "Queued"), NotNil)
        c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*unable to run containers.*")
-       c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*No broken node hook.*")
+       c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*Writing /var/lock/crunch-run-broken to mark node as broken.*")
 }
 
 func (s *TestSuite) TestFullBrokenDocker3(c *C) {