14807: Drain instances that crunch-run reports broken.
author Tom Clegg <tclegg@veritasgenetics.com>
Mon, 18 Mar 2019 20:32:58 +0000 (16:32 -0400)
committer Tom Clegg <tclegg@veritasgenetics.com>
Wed, 20 Mar 2019 19:27:04 +0000 (15:27 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg@veritasgenetics.com>

lib/dispatchcloud/dispatcher_test.go
lib/dispatchcloud/test/stub_driver.go
lib/dispatchcloud/worker/worker.go
services/crunch-run/background.go
services/crunch-run/crunchrun.go
services/crunch-run/crunchrun_test.go

index 7268f106a9f36ba933da51ecba4465ba760a8820..44d5a0ae75b2b8b4400bdb090e0dfc9baaf90198 100644 (file)
@@ -157,6 +157,7 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
                        stubvm.CrunchRunMissing = true
                default:
                        stubvm.CrunchRunCrashRate = 0.1
+                       stubvm.ReportBroken = time.Now().Add(time.Duration(rand.Int63n(200)+200) * time.Millisecond)
                }
        }
 
index a4521eab7bb9074b02a2f06ef50b781971099d3d..02346a97076d7168869266c8078028a667b39f81 100644 (file)
@@ -181,6 +181,7 @@ func (e RateLimitError) EarliestRetry() time.Time { return e.Retry }
 type StubVM struct {
        Boot                  time.Time
        Broken                time.Time
+       ReportBroken          time.Time
        CrunchRunMissing      bool
        CrunchRunCrashRate    float64
        CrunchRunDetachDelay  time.Duration
@@ -314,6 +315,9 @@ func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader,
                for uuid := range svm.running {
                        fmt.Fprintf(stdout, "%s\n", uuid)
                }
+               if !svm.ReportBroken.IsZero() && svm.ReportBroken.Before(time.Now()) {
+                       fmt.Fprintln(stdout, "broken")
+               }
                return 0
        }
        if strings.HasPrefix(command, "crunch-run --kill ") {
index 41117c1d4eafb5aa2a92c163d3f79d72ace443d3..49c5057b3842e49da945d40c3950f7c2185dfcc5 100644 (file)
@@ -5,7 +5,6 @@
 package worker
 
 import (
-       "bytes"
        "fmt"
        "strings"
        "sync"
@@ -215,11 +214,16 @@ func (wkr *worker) probeAndUpdate() {
                        logger.Info("instance booted; will try probeRunning")
                }
        }
+       reportedBroken := false
        if booted || wkr.state == StateUnknown {
-               ctrUUIDs, ok = wkr.probeRunning()
+               ctrUUIDs, reportedBroken, ok = wkr.probeRunning()
        }
        wkr.mtx.Lock()
        defer wkr.mtx.Unlock()
+       if reportedBroken && wkr.idleBehavior == IdleBehaviorRun {
+               logger.Info("probe reported broken instance")
+               wkr.setIdleBehavior(IdleBehaviorDrain)
+       }
        if !ok || (!booted && len(ctrUUIDs) == 0 && len(wkr.running) == 0) {
                if wkr.state == StateShutdown && wkr.updated.After(updated) {
                        // Skip the logging noise if shutdown was
@@ -313,7 +317,7 @@ func (wkr *worker) probeAndUpdate() {
        go wkr.wp.notify()
 }
 
-func (wkr *worker) probeRunning() (running []string, ok bool) {
+func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) {
        cmd := "crunch-run --list"
        if u := wkr.instance.RemoteUser(); u != "root" {
                cmd = "sudo " + cmd
@@ -325,13 +329,17 @@ func (wkr *worker) probeRunning() (running []string, ok bool) {
                        "stdout":  string(stdout),
                        "stderr":  string(stderr),
                }).WithError(err).Warn("probe failed")
-               return nil, false
+               return
        }
-       stdout = bytes.TrimRight(stdout, "\n")
-       if len(stdout) == 0 {
-               return nil, true
+       ok = true
+       for _, s := range strings.Split(string(stdout), "\n") {
+               if s == "broken" {
+                       reportsBroken = true
+               } else if s != "" {
+                       running = append(running, s)
+               }
        }
-       return strings.Split(string(stdout), "\n"), true
+       return
 }
 
 func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
index 933692bdc55b3bbf9c63e78c29c5615418a33d05..852ccb6ece3979385423f3ceb55fb437f164c6aa 100644 (file)
@@ -20,6 +20,7 @@ var (
        lockdir    = "/var/lock"
        lockprefix = "crunch-run-"
        locksuffix = ".lock"
+       brokenfile = "crunch-run-broken"
 )
 
 // procinfo is saved in each process's lockfile.
@@ -146,7 +147,10 @@ func ListProcesses(stdout, stderr io.Writer) int {
                if info.IsDir() && path != walkdir {
                        return filepath.SkipDir
                }
-               if name := info.Name(); !strings.HasPrefix(name, lockprefix) || !strings.HasSuffix(name, locksuffix) {
+               if name := info.Name(); name == brokenfile {
+                       fmt.Fprintln(stdout, "broken")
+                       return nil
+               } else if !strings.HasPrefix(name, lockprefix) || !strings.HasSuffix(name, locksuffix) {
                        return nil
                }
                if info.Size() == 0 {
index 0576337aa13c280841187db3a7aea2dcf4af65c0..3925b0b7b1f810c9c451c7e756693ba5875bc252 100644 (file)
@@ -222,7 +222,14 @@ var brokenNodeHook *string = flag.String("broken-node-hook", "", "Script to run
 
 func (runner *ContainerRunner) runBrokenNodeHook() {
        if *brokenNodeHook == "" {
-               runner.CrunchLog.Printf("No broken node hook provided, cannot mark node as broken.")
+               path := filepath.Join(lockdir, brokenfile)
+               runner.CrunchLog.Printf("Writing %s to mark node as broken", path)
+               f, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0700)
+               if err != nil {
+                       runner.CrunchLog.Printf("Error writing %s: %s", path, err)
+                       return
+               }
+               f.Close()
        } else {
                runner.CrunchLog.Printf("Running broken node hook %q", *brokenNodeHook)
                // run killme script
index 17e5e145811aba3e587a66d07fb642ec07bef2d8..60729c019b1a1c508cacceb5b4e7d08e8d300bc5 100644 (file)
@@ -2049,7 +2049,7 @@ func (s *TestSuite) TestFullBrokenDocker2(c *C) {
 
        c.Check(api.CalledWith("container.state", "Queued"), NotNil)
        c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*unable to run containers.*")
-       c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*No broken node hook.*")
+       c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*Writing /var/lock/crunch-run-broken to mark node as broken.*")
 }
 
 func (s *TestSuite) TestFullBrokenDocker3(c *C) {