X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/e231fef37079916d0dc6babf93d669d474598ced..6036c55e1239281746152e85dfabbc9ed3cb6864:/lib/dispatchcloud/worker/worker.go

diff --git a/lib/dispatchcloud/worker/worker.go b/lib/dispatchcloud/worker/worker.go
index 9be9f41f43..6878bb0655 100644
--- a/lib/dispatchcloud/worker/worker.go
+++ b/lib/dispatchcloud/worker/worker.go
@@ -6,15 +6,15 @@ package worker
 
 import (
 	"bytes"
-	"encoding/json"
 	"fmt"
+	"path/filepath"
 	"strings"
 	"sync"
 	"time"
 
-	"git.curoverse.com/arvados.git/lib/cloud"
-	"git.curoverse.com/arvados.git/sdk/go/arvados"
-	"git.curoverse.com/arvados.git/sdk/go/stats"
+	"git.arvados.org/arvados.git/lib/cloud"
+	"git.arvados.org/arvados.git/sdk/go/arvados"
+	"git.arvados.org/arvados.git/sdk/go/stats"
 	"github.com/sirupsen/logrus"
 )
 
@@ -43,6 +43,33 @@ var stateString = map[State]string{
 	StateShutdown: "shutdown",
 }
 
+// BootOutcome is the result of a worker boot. It is used as a label in a metric.
+type BootOutcome string
+
+const (
+	BootOutcomeFailed       BootOutcome = "failure"
+	BootOutcomeSucceeded    BootOutcome = "success"
+	BootOutcomeIdleShutdown BootOutcome = "idle shutdown"
+	BootOutcomeDisappeared  BootOutcome = "disappeared"
+)
+
+var validBootOutcomes = map[BootOutcome]bool{
+	BootOutcomeFailed:       true,
+	BootOutcomeSucceeded:    true,
+	BootOutcomeIdleShutdown: true,
+	BootOutcomeDisappeared:  true,
+}
+
+func (wkr *worker) reportBootOutcome(outcome BootOutcome) {
+	if wkr.bootOutcomeReported {
+		return
+	}
+	if wkr.wp.mBootOutcomes != nil {
+		wkr.wp.mBootOutcomes.WithLabelValues(string(outcome)).Inc()
+	}
+	wkr.bootOutcomeReported = true
+}
+
 // String implements fmt.Stringer.
 func (s State) String() string {
 	return stateString[s]
@@ -74,22 +101,50 @@ type worker struct {
 	executor Executor
 	wp       *Pool
 
-	mtx          sync.Locker // must be wp's Locker.
-	state        State
-	idleBehavior IdleBehavior
-	instance     cloud.Instance
-	instType     arvados.InstanceType
-	vcpus        int64
-	memory       int64
-	appeared     time.Time
-	probed       time.Time
-	updated      time.Time
-	busy         time.Time
-	destroyed    time.Time
-	lastUUID     string
-	running      map[string]struct{} // remember to update state idle<->running when this changes
-	starting     map[string]struct{} // remember to update state idle<->running when this changes
-	probing      chan struct{}
+	mtx                 sync.Locker // must be wp's Locker.
+	state               State
+	idleBehavior        IdleBehavior
+	instance            cloud.Instance
+	instType            arvados.InstanceType
+	vcpus               int64
+	memory              int64
+	appeared            time.Time
+	probed              time.Time
+	updated             time.Time
+	busy                time.Time
+	destroyed           time.Time
+	lastUUID            string
+	running             map[string]*remoteRunner // remember to update state idle<->running when this changes
+	starting            map[string]*remoteRunner // remember to update state idle<->running when this changes
+	probing             chan struct{}
+	bootOutcomeReported bool
+}
+
+func (wkr *worker) onUnkillable(uuid string) {
+	wkr.mtx.Lock()
+	defer wkr.mtx.Unlock()
+	logger := wkr.logger.WithField("ContainerUUID", uuid)
+	if wkr.idleBehavior == IdleBehaviorHold {
+		logger.Warn("unkillable container, but worker has IdleBehavior=Hold")
+		return
+	}
+	logger.Warn("unkillable container, draining worker")
+	wkr.setIdleBehavior(IdleBehaviorDrain)
+}
+
+func (wkr *worker) onKilled(uuid string) {
+	wkr.mtx.Lock()
+	defer wkr.mtx.Unlock()
+	wkr.closeRunner(uuid)
+	go wkr.wp.notify()
+}
+
+// caller must have lock.
+func (wkr *worker) setIdleBehavior(idleBehavior IdleBehavior) {
+	wkr.logger.WithField("IdleBehavior", idleBehavior).Info("set idle behavior")
+	wkr.idleBehavior = idleBehavior
+	wkr.saveTags()
+	wkr.shutdownIfIdle()
 }
 
 // caller must have lock.
@@ -98,48 +153,22 @@ func (wkr *worker) startContainer(ctr arvados.Container) {
 		"ContainerUUID": ctr.UUID,
 		"Priority":      ctr.Priority,
 	})
-	logger = logger.WithField("Instance", wkr.instance.ID())
 	logger.Debug("starting container")
-	wkr.starting[ctr.UUID] = struct{}{}
-	wkr.state = StateRunning
+	rr := newRemoteRunner(ctr.UUID, wkr)
+	wkr.starting[ctr.UUID] = rr
+	if wkr.state != StateRunning {
+		wkr.state = StateRunning
+		go wkr.wp.notify()
+	}
 	go func() {
-		env := map[string]string{
-			"ARVADOS_API_HOST":  wkr.wp.arvClient.APIHost,
-			"ARVADOS_API_TOKEN": wkr.wp.arvClient.AuthToken,
-		}
-		if wkr.wp.arvClient.Insecure {
-			env["ARVADOS_API_HOST_INSECURE"] = "1"
-		}
-		envJSON, err := json.Marshal(env)
-		if err != nil {
-			panic(err)
-		}
-		stdin := bytes.NewBuffer(envJSON)
-		cmd := "crunch-run --detach --stdin-env '" + ctr.UUID + "'"
-		if u := wkr.instance.RemoteUser(); u != "root" {
-			cmd = "sudo " + cmd
-		}
-		stdout, stderr, err := wkr.executor.Execute(nil, cmd, stdin)
+		rr.Start()
 		wkr.mtx.Lock()
 		defer wkr.mtx.Unlock()
 		now := time.Now()
 		wkr.updated = now
 		wkr.busy = now
 		delete(wkr.starting, ctr.UUID)
-		wkr.running[ctr.UUID] = struct{}{}
-		wkr.lastUUID = ctr.UUID
-		if err != nil {
-			logger.WithField("stdout", string(stdout)).
-				WithField("stderr", string(stderr)).
-				WithError(err).
-				Error("error starting crunch-run process")
-			// Leave uuid in wkr.running, though: it's
-			// possible the error was just a communication
-			// failure and the process was in fact
-			// started. Wait for next probe to find out.
-			return
-		}
-		logger.Info("crunch-run process started")
+		wkr.running[ctr.UUID] = rr
 		wkr.lastUUID = ctr.UUID
 	}()
 }
@@ -215,11 +244,17 @@ func (wkr *worker) probeAndUpdate() {
 			logger.Info("instance booted; will try probeRunning")
 		}
 	}
+	reportedBroken := false
 	if booted || wkr.state == StateUnknown {
-		ctrUUIDs, ok = wkr.probeRunning()
+		ctrUUIDs, reportedBroken, ok = wkr.probeRunning()
 	}
 	wkr.mtx.Lock()
 	defer wkr.mtx.Unlock()
+	if reportedBroken && wkr.idleBehavior == IdleBehaviorRun {
+		logger.Info("probe reported broken instance")
+		wkr.reportBootOutcome(BootOutcomeFailed)
+		wkr.setIdleBehavior(IdleBehaviorDrain)
+	}
 	if !ok || (!booted && len(ctrUUIDs) == 0 && len(wkr.running) == 0) {
 		if wkr.state == StateShutdown && wkr.updated.After(updated) {
 			// Skip the logging noise if shutdown was
@@ -241,6 +276,7 @@ func (wkr *worker) probeAndUpdate() {
 			// some evidence about why the node never
 			// booted, even in non-debug mode.
 			if !booted {
+				wkr.reportBootOutcome(BootOutcomeFailed)
 				logger.WithFields(logrus.Fields{
 					"Duration": dur,
 					"stderr":   string(stderr),
@@ -271,31 +307,8 @@ func (wkr *worker) probeAndUpdate() {
 		// advantage of the non-busy state, though.
 		wkr.busy = updateTime
 	}
 
-	changed := false
-	// Build a new "running" map. Set changed=true if it differs
-	// from the existing map (wkr.running) to ensure the scheduler
-	// gets notified below.
-	running := map[string]struct{}{}
-	for _, uuid := range ctrUUIDs {
-		running[uuid] = struct{}{}
-		if _, ok := wkr.running[uuid]; !ok {
-			if _, ok := wkr.starting[uuid]; !ok {
-				// We didn't start it -- it must have
-				// been started by a previous
-				// dispatcher process.
-				logger.WithField("ContainerUUID", uuid).Info("crunch-run process detected")
-			}
-			changed = true
-		}
-	}
-	for uuid := range wkr.running {
-		if _, ok := running[uuid]; !ok {
-			logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
-			wkr.wp.notifyExited(uuid, updateTime)
-			changed = true
-		}
-	}
+	changed := wkr.updateRunning(ctrUUIDs)
 
 	// Update state if this was the first successful boot-probe.
 	if booted && (wkr.state == StateUnknown || wkr.state == StateBooting) {
@@ -314,14 +327,13 @@ func (wkr *worker) probeAndUpdate() {
 
 	// Log whenever a run-probe reveals crunch-run processes
 	// appearing/disappearing before boot-probe succeeds.
-	if wkr.state == StateUnknown && len(running) != len(wkr.running) {
+	if wkr.state == StateUnknown && changed {
 		logger.WithFields(logrus.Fields{
-			"RunningContainers": len(running),
+			"RunningContainers": len(wkr.running),
 			"State":             wkr.state,
 		}).Info("crunch-run probe succeeded, but boot probe is still failing")
 	}
 
-	wkr.running = running
 	if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
 		wkr.state = StateRunning
 	} else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
@@ -329,16 +341,17 @@ func (wkr *worker) probeAndUpdate() {
 	}
 	wkr.updated = updateTime
 	if booted && (initialState == StateUnknown || initialState == StateBooting) {
+		wkr.reportBootOutcome(BootOutcomeSucceeded)
 		logger.WithFields(logrus.Fields{
-			"RunningContainers": len(running),
+			"RunningContainers": len(wkr.running),
 			"State":             wkr.state,
 		}).Info("probes succeeded, instance is in service")
 	}
 	go wkr.wp.notify()
 }
 
-func (wkr *worker) probeRunning() (running []string, ok bool) {
-	cmd := "crunch-run --list"
+func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) {
+	cmd := wkr.wp.runnerCmd + " --list"
 	if u := wkr.instance.RemoteUser(); u != "root" {
 		cmd = "sudo " + cmd
 	}
@@ -349,13 +362,17 @@ func (wkr *worker) probeRunning() (running []string, ok bool) {
 			"stdout": string(stdout),
 			"stderr": string(stderr),
 		}).WithError(err).Warn("probe failed")
-		return nil, false
+		return
 	}
-	stdout = bytes.TrimRight(stdout, "\n")
-	if len(stdout) == 0 {
-		return nil, true
+	ok = true
+	for _, s := range strings.Split(string(stdout), "\n") {
+		if s == "broken" {
+			reportsBroken = true
+		} else if s != "" {
+			running = append(running, s)
+		}
 	}
-	return strings.Split(string(stdout), "\n"), true
+	return
 }
 
 func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
@@ -374,9 +391,46 @@ func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
 		return false, stderr
 	}
 	logger.Info("boot probe succeeded")
+	if err = wkr.wp.loadRunnerData(); err != nil {
+		wkr.logger.WithError(err).Warn("cannot boot worker: error loading runner binary")
+		return false, stderr
+	} else if len(wkr.wp.runnerData) == 0 {
+		// Assume crunch-run is already installed
+	} else if _, stderr2, err := wkr.copyRunnerData(); err != nil {
+		wkr.logger.WithError(err).WithField("stderr", string(stderr2)).Warn("error copying runner binary")
+		return false, stderr2
+	} else {
+		stderr = append(stderr, stderr2...)
+	}
 	return true, stderr
 }
 
+func (wkr *worker) copyRunnerData() (stdout, stderr []byte, err error) {
+	hash := fmt.Sprintf("%x", wkr.wp.runnerMD5)
+	dstdir, _ := filepath.Split(wkr.wp.runnerCmd)
+	logger := wkr.logger.WithFields(logrus.Fields{
+		"hash": hash,
+		"path": wkr.wp.runnerCmd,
+	})
+
+	stdout, stderr, err = wkr.executor.Execute(nil, `md5sum `+wkr.wp.runnerCmd, nil)
+	if err == nil && len(stderr) == 0 && bytes.Equal(stdout, []byte(hash+"  "+wkr.wp.runnerCmd+"\n")) {
+		logger.Info("runner binary already exists on worker, with correct hash")
+		return
+	}
+
+	// Note touch+chmod come before writing data, to avoid the
+	// possibility of md5 being correct while file mode is
+	// incorrect.
+	cmd := `set -e; dstdir="` + dstdir + `"; dstfile="` + wkr.wp.runnerCmd + `"; mkdir -p "$dstdir"; touch "$dstfile"; chmod 0755 "$dstdir" "$dstfile"; cat >"$dstfile"`
+	if wkr.instance.RemoteUser() != "root" {
+		cmd = `sudo sh -c '` + strings.Replace(cmd, "'", "'\\''", -1) + `'`
+	}
+	logger.WithField("cmd", cmd).Info("installing runner binary on worker")
+	stdout, stderr, err = wkr.executor.Execute(nil, cmd, bytes.NewReader(wkr.wp.runnerData))
+	return
+}
+
 // caller must have lock.
 func (wkr *worker) shutdownIfBroken(dur time.Duration) bool {
 	if wkr.idleBehavior == IdleBehaviorHold {
@@ -399,27 +453,53 @@ func (wkr *worker) shutdownIfBroken(dur time.Duration) bool {
 	return true
 }
 
+// Returns true if the instance is eligible for shutdown: either it's
+// been idle too long, or idleBehavior=Drain and nothing is running.
+//
 // caller must have lock.
-func (wkr *worker) shutdownIfIdle() bool {
+func (wkr *worker) eligibleForShutdown() bool {
 	if wkr.idleBehavior == IdleBehaviorHold {
-		// Never shut down.
 		return false
 	}
-	age := time.Since(wkr.busy)
-
-	old := age >= wkr.wp.timeoutIdle
 	draining := wkr.idleBehavior == IdleBehaviorDrain
-	shouldShutdown := ((old || draining) && wkr.state == StateIdle) ||
-		(draining && wkr.state == StateBooting)
-	if !shouldShutdown {
+	switch wkr.state {
+	case StateBooting:
+		return draining
+	case StateIdle:
+		return draining || time.Since(wkr.busy) >= wkr.wp.timeoutIdle
+	case StateRunning:
+		if !draining {
+			return false
+		}
+		for _, rr := range wkr.running {
+			if !rr.givenup {
+				return false
+			}
+		}
+		for _, rr := range wkr.starting {
+			if !rr.givenup {
+				return false
+			}
+		}
+		// draining, and all remaining runners are just trying
+		// to force-kill their crunch-run procs
+		return true
+	default:
 		return false
 	}
+}
 
+// caller must have lock.
+func (wkr *worker) shutdownIfIdle() bool {
+	if !wkr.eligibleForShutdown() {
+		return false
+	}
 	wkr.logger.WithFields(logrus.Fields{
 		"State":        wkr.state,
-		"IdleDuration": stats.Duration(age),
+		"IdleDuration": stats.Duration(time.Since(wkr.busy)),
 		"IdleBehavior": wkr.idleBehavior,
-	}).Info("shutdown idle worker")
+	}).Info("shutdown worker")
+	wkr.reportBootOutcome(BootOutcomeIdleShutdown)
 	wkr.shutdown()
 	return true
 }
@@ -446,8 +526,8 @@ func (wkr *worker) saveTags() {
 	instance := wkr.instance
 	tags := instance.Tags()
 	update := cloud.InstanceTags{
-		tagKeyInstanceType: wkr.instType.Name,
-		tagKeyIdleBehavior: string(wkr.idleBehavior),
+		wkr.wp.tagKeyPrefix + tagKeyInstanceType: wkr.instType.Name,
+		wkr.wp.tagKeyPrefix + tagKeyIdleBehavior: string(wkr.idleBehavior),
 	}
 	save := false
 	for k, v := range update {
@@ -465,3 +545,68 @@ func (wkr *worker) saveTags() {
 		}()
 	}
 }
+
+func (wkr *worker) Close() {
+	// This might take time, so do it after unlocking mtx.
+	defer wkr.executor.Close()
+
+	wkr.mtx.Lock()
+	defer wkr.mtx.Unlock()
+	for uuid, rr := range wkr.running {
+		wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
+		rr.Close()
+	}
+	for uuid, rr := range wkr.starting {
+		wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process abandoned")
+		rr.Close()
+	}
+}
+
+// Add/remove entries in wkr.running to match ctrUUIDs returned by a
+// probe. Returns true if anything was added or removed.
+//
+// Caller must have lock.
+func (wkr *worker) updateRunning(ctrUUIDs []string) (changed bool) {
+	alive := map[string]bool{}
+	for _, uuid := range ctrUUIDs {
+		alive[uuid] = true
+		if _, ok := wkr.running[uuid]; ok {
+			// unchanged
+		} else if rr, ok := wkr.starting[uuid]; ok {
+			wkr.running[uuid] = rr
+			delete(wkr.starting, uuid)
+			changed = true
+		} else {
+			// We didn't start it -- it must have been
+			// started by a previous dispatcher process.
+			wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process detected")
+			wkr.running[uuid] = newRemoteRunner(uuid, wkr)
+			changed = true
+		}
+	}
+	for uuid := range wkr.running {
+		if !alive[uuid] {
+			wkr.closeRunner(uuid)
+			changed = true
+		}
+	}
+	return
+}
+
+// caller must have lock.
+func (wkr *worker) closeRunner(uuid string) {
+	rr := wkr.running[uuid]
+	if rr == nil {
+		return
+	}
+	wkr.logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
+	delete(wkr.running, uuid)
+	rr.Close()
+
+	now := time.Now()
+	wkr.updated = now
+	wkr.wp.exited[uuid] = now
+	if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
+		wkr.state = StateIdle
+	}
+}