Merge branch '16834-stale-run-lock'
authorTom Clegg <tom@tomclegg.ca>
Tue, 15 Sep 2020 14:49:44 +0000 (10:49 -0400)
committerTom Clegg <tom@tomclegg.ca>
Tue, 15 Sep 2020 14:49:44 +0000 (10:49 -0400)
fixes #16834

Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@tomclegg.ca>

lib/config/config.default.yml
lib/config/generated_config.go
lib/crunchrun/background.go
lib/dispatchcloud/dispatcher_test.go
lib/dispatchcloud/test/stub_driver.go
lib/dispatchcloud/worker/pool.go
lib/dispatchcloud/worker/worker.go
sdk/go/arvados/config.go

index b1865a2217ce99c48a13ae8a17e4cf10d336cbf0..15e7c7c06ce723114cafc86d8e4c6ec0c2b99dff 100644 (file)
@@ -938,6 +938,11 @@ Clusters:
         # Time before repeating SIGTERM when killing a container.
         TimeoutSignal: 5s
 
+        # Time to give up on a process (most likely arv-mount) that
+        # still holds a container lockfile after its main supervisor
+        # process has exited, and declare the instance broken.
+        TimeoutStaleRunLock: 5s
+
         # Time to give up on SIGTERM and write off the worker.
         TimeoutTERM: 2m
 
index 201ae3604537f9f44a9e788320b7262685944f98..7ed332151b8bbacaa81fa7352e8251058296b8c8 100644 (file)
@@ -944,6 +944,11 @@ Clusters:
         # Time before repeating SIGTERM when killing a container.
         TimeoutSignal: 5s
 
+        # Time to give up on a process (most likely arv-mount) that
+        # still holds a container lockfile after its main supervisor
+        # process has exited, and declare the instance broken.
+        TimeoutStaleRunLock: 5s
+
         # Time to give up on SIGTERM and write off the worker.
         TimeoutTERM: 2m
 
index bf039afa0ad53799183607fe9795b5556f615bad..8cdba72c10d3c5902225456de9389bcc70b6dbca 100644 (file)
@@ -218,6 +218,24 @@ func ListProcesses(stdout, stderr io.Writer) int {
                        return nil
                }
 
+               proc, err := os.FindProcess(pi.PID)
+               if err != nil {
+                       // FindProcess should have succeeded, even if the
+                       // process does not exist.
+                       fmt.Fprintf(stderr, "%s: find process %d: %s", path, pi.PID, err)
+                       return nil
+               }
+               err = proc.Signal(syscall.Signal(0))
+               if err != nil {
+                       // Process is dead, even though lockfile was
+                       // still locked. Most likely a stuck arv-mount
+                       // process that inherited the lock from
+                       // crunch-run. Report container UUID as
+                       // "stale".
+                       fmt.Fprintln(stdout, pi.UUID, "stale")
+                       return nil
+               }
+
                fmt.Fprintln(stdout, pi.UUID)
                return nil
        }))
index 6e1850410b28bf3394ec4e29c4416a9551ec6d91..cb5cdf1cf4e87ba8b9e741fe60775276587d5f0d 100644 (file)
@@ -66,6 +66,7 @@ func (s *DispatcherSuite) SetUpTest(c *check.C) {
                                ProbeInterval:        arvados.Duration(5 * time.Millisecond),
                                MaxProbesPerSecond:   1000,
                                TimeoutSignal:        arvados.Duration(3 * time.Millisecond),
+                               TimeoutStaleRunLock:  arvados.Duration(3 * time.Millisecond),
                                TimeoutTERM:          arvados.Duration(20 * time.Millisecond),
                                ResourceTags:         map[string]string{"testtag": "test value"},
                                TagKeyPrefix:         "test:",
@@ -169,6 +170,7 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
                        stubvm.ReportBroken = time.Now().Add(time.Duration(rand.Int63n(200)) * time.Millisecond)
                default:
                        stubvm.CrunchRunCrashRate = 0.1
+                       stubvm.ArvMountDeadlockRate = 0.1
                }
        }
        s.stubDriver.Bugf = c.Errorf
index 132bd4d695f0ef88095951b151be592029c31328..4d32cf221ce49461e092a834ad192460bc37a49d 100644 (file)
@@ -131,7 +131,7 @@ func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID,
                tags:         copyTags(tags),
                providerType: it.ProviderType,
                initCommand:  cmd,
-               running:      map[string]int64{},
+               running:      map[string]stubProcess{},
                killing:      map[string]bool{},
        }
        svm.SSHService = SSHService{
@@ -189,6 +189,8 @@ type StubVM struct {
        CrunchRunMissing      bool
        CrunchRunCrashRate    float64
        CrunchRunDetachDelay  time.Duration
+       ArvMountMaxExitLag    time.Duration
+       ArvMountDeadlockRate  float64
        ExecuteContainer      func(arvados.Container) int
        CrashRunningContainer func(arvados.Container)
 
@@ -198,12 +200,21 @@ type StubVM struct {
        initCommand  cloud.InitCommand
        providerType string
        SSHService   SSHService
-       running      map[string]int64
+       running      map[string]stubProcess
        killing      map[string]bool
        lastPID      int64
+       deadlocked   string
        sync.Mutex
 }
 
+type stubProcess struct {
+       pid int64
+
+       // crunch-run has exited, but arv-mount process (or something)
+       // still holds lock in /var/run/
+       exited bool
+}
+
 func (svm *StubVM) Instance() stubInstance {
        svm.Lock()
        defer svm.Unlock()
@@ -256,7 +267,7 @@ func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader,
                svm.Lock()
                svm.lastPID++
                pid := svm.lastPID
-               svm.running[uuid] = pid
+               svm.running[uuid] = stubProcess{pid: pid}
                svm.Unlock()
                time.Sleep(svm.CrunchRunDetachDelay)
                fmt.Fprintf(stderr, "starting %s\n", uuid)
@@ -273,14 +284,13 @@ func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader,
                                logger.Print("[test] exiting crunch-run stub")
                                svm.Lock()
                                defer svm.Unlock()
-                               if svm.running[uuid] != pid {
+                               if svm.running[uuid].pid != pid {
                                        bugf := svm.sis.driver.Bugf
                                        if bugf == nil {
                                                bugf = logger.Warnf
                                        }
-                                       bugf("[test] StubDriver bug or caller bug: pid %d exiting, running[%s]==%d", pid, uuid, svm.running[uuid])
-                               } else {
-                                       delete(svm.running, uuid)
+                                       bugf("[test] StubDriver bug or caller bug: pid %d exiting, running[%s].pid==%d", pid, uuid, svm.running[uuid].pid)
+                                       return
                                }
                                if !completed {
                                        logger.WithField("State", ctr.State).Print("[test] crashing crunch-run stub")
@@ -288,6 +298,15 @@ func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader,
                                                svm.CrashRunningContainer(ctr)
                                        }
                                }
+                               sproc := svm.running[uuid]
+                               sproc.exited = true
+                               svm.running[uuid] = sproc
+                               svm.Unlock()
+                               time.Sleep(svm.ArvMountMaxExitLag * time.Duration(math_rand.Float64()))
+                               svm.Lock()
+                               if math_rand.Float64() >= svm.ArvMountDeadlockRate {
+                                       delete(svm.running, uuid)
+                               }
                        }()
 
                        crashluck := math_rand.Float64()
@@ -333,26 +352,31 @@ func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader,
        if command == "crunch-run --list" {
                svm.Lock()
                defer svm.Unlock()
-               for uuid := range svm.running {
-                       fmt.Fprintf(stdout, "%s\n", uuid)
+               for uuid, sproc := range svm.running {
+                       if sproc.exited {
+                               fmt.Fprintf(stdout, "%s stale\n", uuid)
+                       } else {
+                               fmt.Fprintf(stdout, "%s\n", uuid)
+                       }
                }
                if !svm.ReportBroken.IsZero() && svm.ReportBroken.Before(time.Now()) {
                        fmt.Fprintln(stdout, "broken")
                }
+               fmt.Fprintln(stdout, svm.deadlocked)
                return 0
        }
        if strings.HasPrefix(command, "crunch-run --kill ") {
                svm.Lock()
-               _, running := svm.running[uuid]
-               if running {
+               sproc, running := svm.running[uuid]
+               if running && !sproc.exited {
                        svm.killing[uuid] = true
                        svm.Unlock()
                        time.Sleep(time.Duration(math_rand.Float64()*2) * time.Millisecond)
                        svm.Lock()
-                       _, running = svm.running[uuid]
+                       sproc, running = svm.running[uuid]
                }
                svm.Unlock()
-               if running {
+               if running && !sproc.exited {
                        fmt.Fprintf(stderr, "%s: container is running\n", uuid)
                        return 1
                }
index 086887cb44176f05c9446341a68d7176dd5ed7aa..953195c59d39718b6a179277ae8a323ad75f868f 100644 (file)
@@ -64,15 +64,16 @@ type Executor interface {
 }
 
 const (
-       defaultSyncInterval       = time.Minute
-       defaultProbeInterval      = time.Second * 10
-       defaultMaxProbesPerSecond = 10
-       defaultTimeoutIdle        = time.Minute
-       defaultTimeoutBooting     = time.Minute * 10
-       defaultTimeoutProbe       = time.Minute * 10
-       defaultTimeoutShutdown    = time.Second * 10
-       defaultTimeoutTERM        = time.Minute * 2
-       defaultTimeoutSignal      = time.Second * 5
+       defaultSyncInterval        = time.Minute
+       defaultProbeInterval       = time.Second * 10
+       defaultMaxProbesPerSecond  = 10
+       defaultTimeoutIdle         = time.Minute
+       defaultTimeoutBooting      = time.Minute * 10
+       defaultTimeoutProbe        = time.Minute * 10
+       defaultTimeoutShutdown     = time.Second * 10
+       defaultTimeoutTERM         = time.Minute * 2
+       defaultTimeoutSignal       = time.Second * 5
+       defaultTimeoutStaleRunLock = time.Second * 5
 
        // Time after a quota error to try again anyway, even if no
        // instances have been shutdown.
@@ -115,6 +116,7 @@ func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *promethe
                timeoutShutdown:                duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
                timeoutTERM:                    duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
                timeoutSignal:                  duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
+               timeoutStaleRunLock:            duration(cluster.Containers.CloudVMs.TimeoutStaleRunLock, defaultTimeoutStaleRunLock),
                installPublicKey:               installPublicKey,
                tagKeyPrefix:                   cluster.Containers.CloudVMs.TagKeyPrefix,
                stop:                           make(chan bool),
@@ -152,6 +154,7 @@ type Pool struct {
        timeoutShutdown                time.Duration
        timeoutTERM                    time.Duration
        timeoutSignal                  time.Duration
+       timeoutStaleRunLock            time.Duration
        installPublicKey               ssh.PublicKey
        tagKeyPrefix                   string
 
index 9199d4bafe764d806312638328cf13fd3b422e4d..517a5d193e328b8f9f2ae2f1ce9d9b4db718ddf6 100644 (file)
@@ -110,6 +110,7 @@ type worker struct {
        probing             chan struct{}
        bootOutcomeReported bool
        timeToReadyReported bool
+       staleRunLockSince   time.Time
 }
 
 func (wkr *worker) onUnkillable(uuid string) {
@@ -382,13 +383,43 @@ func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) {
                return
        }
        ok = true
+
+       staleRunLock := false
        for _, s := range strings.Split(string(stdout), "\n") {
-               if s == "broken" {
+               // Each line of the "crunch-run --list" output is one
+               // of the following:
+               //
+               // * a container UUID, indicating that processes
+               //   related to that container are currently running.
+               //   Optionally followed by " stale", indicating that
+               //   the crunch-run process itself has exited (the
+               //   remaining process is probably arv-mount).
+               //
+               // * the string "broken", indicating that the instance
+               //   appears incapable of starting containers.
+               //
+               // See ListProcesses() in lib/crunchrun/background.go.
+               if s == "" {
+                       // empty string following final newline
+               } else if s == "broken" {
                        reportsBroken = true
-               } else if s != "" {
+               } else if toks := strings.Split(s, " "); len(toks) == 1 {
                        running = append(running, s)
+               } else if toks[1] == "stale" {
+                       wkr.logger.WithField("ContainerUUID", toks[0]).Info("probe reported stale run lock")
+                       staleRunLock = true
                }
        }
+       wkr.mtx.Lock()
+       defer wkr.mtx.Unlock()
+       if !staleRunLock {
+               wkr.staleRunLockSince = time.Time{}
+       } else if wkr.staleRunLockSince.IsZero() {
+               wkr.staleRunLockSince = time.Now()
+       } else if dur := time.Now().Sub(wkr.staleRunLockSince); dur > wkr.wp.timeoutStaleRunLock {
+               wkr.logger.WithField("Duration", dur).Warn("reporting broken after reporting stale run lock for too long")
+               reportsBroken = true
+       }
        return
 }
 
index 363d09dafb5452b350077d6933ed2698689fb513..394e30a737e2aff13128df90d96decfc12428863 100644 (file)
@@ -462,6 +462,7 @@ type CloudVMsConfig struct {
        TimeoutProbe                   Duration
        TimeoutShutdown                Duration
        TimeoutSignal                  Duration
+       TimeoutStaleRunLock            Duration
        TimeoutTERM                    Duration
        ResourceTags                   map[string]string
        TagKeyPrefix                   string