# Time before repeating SIGTERM when killing a container.
TimeoutSignal: 5s
+ # Time to give up on a process (most likely arv-mount) that
+ # still holds a container lockfile after its main supervisor
+ # process has exited, and declare the instance broken.
+ TimeoutStaleRunLock: 5s
+
# Time to give up on SIGTERM and write off the worker.
TimeoutTERM: 2m
# Time before repeating SIGTERM when killing a container.
TimeoutSignal: 5s
+ # Time to give up on a process (most likely arv-mount) that
+ # still holds a container lockfile after its main supervisor
+ # process has exited, and declare the instance broken.
+ TimeoutStaleRunLock: 5s
+
# Time to give up on SIGTERM and write off the worker.
TimeoutTERM: 2m
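This knob follows the same pattern as the other CloudVMs timeouts: leaving it unset falls back to a built-in default (defaultTimeoutStaleRunLock, 5s, added below). A minimal sketch of that fallback behavior, assuming the pool's duration() helper works this way and using a local stand-in for arvados.Duration:

```go
package main

import (
	"fmt"
	"time"
)

// Duration stands in for arvados.Duration here (assumed to be a named
// type over time.Duration that unmarshals config strings like "5s").
type Duration time.Duration

// duration mirrors the fallback pattern used in the pool constructor
// below: a zero/unset config value is replaced by the built-in default.
func duration(conf Duration, def time.Duration) time.Duration {
	if conf > 0 {
		return time.Duration(conf)
	}
	return def
}

func main() {
	const defaultTimeoutStaleRunLock = 5 * time.Second
	// Unset in config -> default applies.
	fmt.Println(duration(0, defaultTimeoutStaleRunLock)) // 5s
	// Explicitly configured value wins.
	fmt.Println(duration(Duration(10*time.Second), defaultTimeoutStaleRunLock)) // 10s
}
```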
return nil
}
+ proc, err := os.FindProcess(pi.PID)
+ if err != nil {
+ // FindProcess should have succeeded, even if the
+ // process does not exist.
+ fmt.Fprintf(stderr, "%s: find process %d: %s\n", path, pi.PID, err)
+ return nil
+ }
+ err = proc.Signal(syscall.Signal(0))
+ if err != nil {
+ // Process is dead, even though lockfile was
+ // still locked. Most likely a stuck arv-mount
+ // process that inherited the lock from
+ // crunch-run. Report container UUID as
+ // "stale".
+ fmt.Fprintln(stdout, pi.UUID, "stale")
+ return nil
+ }
+
fmt.Fprintln(stdout, pi.UUID)
return nil
}))
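The check added here is the standard signal-0 liveness probe: on Unix, os.FindProcess always returns a handle even for a dead PID, so the real test is whether Signal(0) succeeds. A self-contained sketch of the same technique (pidAlive is a hypothetical helper, not part of crunch-run):

```go
package main

import (
	"fmt"
	"os"
	"syscall"
)

// pidAlive reports whether a process with the given PID appears to
// exist. Signal(0) delivers no signal; it fails if the PID is gone.
// A permission error would also be reported as "not alive" here,
// which matches the check above, where any Signal error is treated
// as a stale lock holder.
func pidAlive(pid int) bool {
	proc, err := os.FindProcess(pid)
	if err != nil {
		// On Unix, FindProcess never fails, even for a
		// nonexistent PID.
		return false
	}
	return proc.Signal(syscall.Signal(0)) == nil
}

func main() {
	fmt.Println(pidAlive(os.Getpid())) // true: this process exists
	fmt.Println(pidAlive(1 << 30))     // false on typical systems
}
```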
ProbeInterval: arvados.Duration(5 * time.Millisecond),
MaxProbesPerSecond: 1000,
TimeoutSignal: arvados.Duration(3 * time.Millisecond),
+ TimeoutStaleRunLock: arvados.Duration(3 * time.Millisecond),
TimeoutTERM: arvados.Duration(20 * time.Millisecond),
ResourceTags: map[string]string{"testtag": "test value"},
TagKeyPrefix: "test:",
stubvm.ReportBroken = time.Now().Add(time.Duration(rand.Int63n(200)) * time.Millisecond)
default:
stubvm.CrunchRunCrashRate = 0.1
+ stubvm.ArvMountDeadlockRate = 0.1
}
}
s.stubDriver.Bugf = c.Errorf
tags: copyTags(tags),
providerType: it.ProviderType,
initCommand: cmd,
- running: map[string]int64{},
+ running: map[string]stubProcess{},
killing: map[string]bool{},
}
svm.SSHService = SSHService{
CrunchRunMissing bool
CrunchRunCrashRate float64
CrunchRunDetachDelay time.Duration
+ ArvMountMaxExitLag time.Duration
+ ArvMountDeadlockRate float64
ExecuteContainer func(arvados.Container) int
CrashRunningContainer func(arvados.Container)
initCommand cloud.InitCommand
providerType string
SSHService SSHService
- running map[string]int64
+ running map[string]stubProcess
killing map[string]bool
lastPID int64
+ deadlocked string
sync.Mutex
}
+type stubProcess struct {
+ pid int64
+
+ // crunch-run has exited, but an arv-mount process (or some other
+ // process that inherited the lock) still holds the lockfile in /var/run/
+ exited bool
+}
+
func (svm *StubVM) Instance() stubInstance {
svm.Lock()
defer svm.Unlock()
svm.Lock()
svm.lastPID++
pid := svm.lastPID
- svm.running[uuid] = pid
+ svm.running[uuid] = stubProcess{pid: pid}
svm.Unlock()
time.Sleep(svm.CrunchRunDetachDelay)
fmt.Fprintf(stderr, "starting %s\n", uuid)
logger.Print("[test] exiting crunch-run stub")
svm.Lock()
defer svm.Unlock()
- if svm.running[uuid] != pid {
+ if svm.running[uuid].pid != pid {
bugf := svm.sis.driver.Bugf
if bugf == nil {
bugf = logger.Warnf
}
- bugf("[test] StubDriver bug or caller bug: pid %d exiting, running[%s]==%d", pid, uuid, svm.running[uuid])
- } else {
- delete(svm.running, uuid)
+ bugf("[test] StubDriver bug or caller bug: pid %d exiting, running[%s].pid==%d", pid, uuid, svm.running[uuid].pid)
+ return
}
if !completed {
logger.WithField("State", ctr.State).Print("[test] crashing crunch-run stub")
svm.CrashRunningContainer(ctr)
}
}
+ sproc := svm.running[uuid]
+ sproc.exited = true
+ svm.running[uuid] = sproc
+ svm.Unlock()
+ time.Sleep(time.Duration(math_rand.Float64() * float64(svm.ArvMountMaxExitLag)))
+ svm.Lock()
+ if math_rand.Float64() >= svm.ArvMountDeadlockRate {
+ delete(svm.running, uuid)
+ }
}()
crashluck := math_rand.Float64()
if command == "crunch-run --list" {
svm.Lock()
defer svm.Unlock()
- for uuid := range svm.running {
- fmt.Fprintf(stdout, "%s\n", uuid)
+ for uuid, sproc := range svm.running {
+ if sproc.exited {
+ fmt.Fprintf(stdout, "%s stale\n", uuid)
+ } else {
+ fmt.Fprintf(stdout, "%s\n", uuid)
+ }
}
if !svm.ReportBroken.IsZero() && svm.ReportBroken.Before(time.Now()) {
fmt.Fprintln(stdout, "broken")
}
+ fmt.Fprintln(stdout, svm.deadlocked)
return 0
}
if strings.HasPrefix(command, "crunch-run --kill ") {
svm.Lock()
- _, running := svm.running[uuid]
- if running {
+ sproc, running := svm.running[uuid]
+ if running && !sproc.exited {
svm.killing[uuid] = true
svm.Unlock()
time.Sleep(time.Duration(math_rand.Float64()*2) * time.Millisecond)
svm.Lock()
- _, running = svm.running[uuid]
+ sproc, running = svm.running[uuid]
}
svm.Unlock()
- if running {
+ if running && !sproc.exited {
fmt.Fprintf(stderr, "%s: container is running\n", uuid)
return 1
}
}
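Taken together, the stub's `crunch-run --list` output now follows the same line format the worker probe parses further down: one UUID per running container, a " stale" suffix when only the lock holder remains after crunch-run has exited, and a bare "broken" line once ReportBroken has passed. For example (hypothetical container UUIDs):

```
zzzzz-dz642-aaaaaaaaaaaaaaa
zzzzz-dz642-bbbbbbbbbbbbbbb stale
broken
```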
const (
- defaultSyncInterval = time.Minute
- defaultProbeInterval = time.Second * 10
- defaultMaxProbesPerSecond = 10
- defaultTimeoutIdle = time.Minute
- defaultTimeoutBooting = time.Minute * 10
- defaultTimeoutProbe = time.Minute * 10
- defaultTimeoutShutdown = time.Second * 10
- defaultTimeoutTERM = time.Minute * 2
- defaultTimeoutSignal = time.Second * 5
+ defaultSyncInterval = time.Minute
+ defaultProbeInterval = time.Second * 10
+ defaultMaxProbesPerSecond = 10
+ defaultTimeoutIdle = time.Minute
+ defaultTimeoutBooting = time.Minute * 10
+ defaultTimeoutProbe = time.Minute * 10
+ defaultTimeoutShutdown = time.Second * 10
+ defaultTimeoutTERM = time.Minute * 2
+ defaultTimeoutSignal = time.Second * 5
+ defaultTimeoutStaleRunLock = time.Second * 5
// Time after a quota error to try again anyway, even if no
// instances have been shutdown.
timeoutShutdown: duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
timeoutTERM: duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
timeoutSignal: duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
+ timeoutStaleRunLock: duration(cluster.Containers.CloudVMs.TimeoutStaleRunLock, defaultTimeoutStaleRunLock),
installPublicKey: installPublicKey,
tagKeyPrefix: cluster.Containers.CloudVMs.TagKeyPrefix,
stop: make(chan bool),
timeoutShutdown time.Duration
timeoutTERM time.Duration
timeoutSignal time.Duration
+ timeoutStaleRunLock time.Duration
installPublicKey ssh.PublicKey
tagKeyPrefix string
probing chan struct{}
bootOutcomeReported bool
timeToReadyReported bool
+ staleRunLockSince time.Time
}
func (wkr *worker) onUnkillable(uuid string) {
return
}
ok = true
+
+ staleRunLock := false
for _, s := range strings.Split(string(stdout), "\n") {
- if s == "broken" {
+ // Each line of the "crunch-run --list" output is one
+ // of the following:
+ //
+ // * a container UUID, indicating that processes
+ // related to that container are currently running.
+ // Optionally followed by " stale", indicating that
+ // the crunch-run process itself has exited (the
+ // remaining process is probably arv-mount).
+ //
+ // * the string "broken", indicating that the instance
+ // appears incapable of starting containers.
+ //
+ // See ListProcesses() in lib/crunchrun/background.go.
+ if s == "" {
+ // empty string following final newline
+ } else if s == "broken" {
reportsBroken = true
- } else if s != "" {
+ } else if toks := strings.Split(s, " "); len(toks) == 1 {
running = append(running, s)
+ } else if toks[1] == "stale" {
+ wkr.logger.WithField("ContainerUUID", toks[0]).Info("probe reported stale run lock")
+ staleRunLock = true
}
}
+ wkr.mtx.Lock()
+ defer wkr.mtx.Unlock()
+ if !staleRunLock {
+ wkr.staleRunLockSince = time.Time{}
+ } else if wkr.staleRunLockSince.IsZero() {
+ wkr.staleRunLockSince = time.Now()
+ } else if dur := time.Now().Sub(wkr.staleRunLockSince); dur > wkr.wp.timeoutStaleRunLock {
+ wkr.logger.WithField("Duration", dur).Warn("reporting broken after reporting stale run lock for too long")
+ reportsBroken = true
+ }
return
}
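The staleRunLockSince bookkeeping gives the stale-lock signal hysteresis: one stale report only starts a timer, any clean probe resets it, and the worker is reported broken only after the condition has persisted longer than timeoutStaleRunLock. A small sketch of the same pattern in isolation (staleTracker is a hypothetical type, not part of the dispatcher):

```go
package main

import (
	"fmt"
	"time"
)

// staleTracker remembers when a condition was first observed, clears
// the timestamp when the condition goes away, and escalates only once
// it has persisted past a threshold.
type staleTracker struct {
	since   time.Time
	timeout time.Duration
}

func (t *staleTracker) observe(stale bool, now time.Time) (escalate bool) {
	switch {
	case !stale:
		t.since = time.Time{} // condition cleared: reset the timer
	case t.since.IsZero():
		t.since = now // first report: start the timer
	case now.Sub(t.since) > t.timeout:
		escalate = true // persisted longer than the timeout
	}
	return
}

func main() {
	t := staleTracker{timeout: 5 * time.Second}
	t0 := time.Now()
	fmt.Println(t.observe(true, t0))                     // false: timer starts
	fmt.Println(t.observe(true, t0.Add(2*time.Second)))  // false: under threshold
	fmt.Println(t.observe(true, t0.Add(6*time.Second)))  // true: stale for >5s
	fmt.Println(t.observe(false, t0.Add(7*time.Second))) // false: cleared, timer reset
}
```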
TimeoutProbe Duration
TimeoutShutdown Duration
TimeoutSignal Duration
+ TimeoutStaleRunLock Duration
TimeoutTERM Duration
ResourceTags map[string]string
TagKeyPrefix string