// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package worker

import (
	"bytes"
	"strings"
	"sync"
	"time"

	"git.curoverse.com/arvados.git/lib/cloud"
	"git.curoverse.com/arvados.git/sdk/go/arvados"
	"github.com/sirupsen/logrus"
)

// State indicates whether a worker is available to do work, and (if
// not) whether/when it is expected to become ready.
type State int

const (
	StateUnknown  State = iota // might be running a container already
	StateBooting               // instance is booting
	StateIdle                  // instance booted, no containers are running
	StateRunning               // instance is running one or more containers
	StateShutdown              // worker has stopped monitoring the instance
	StateHold                  // running, but not available to run new containers
)

const (
	// TODO: configurable
	maxPingFailTime = 10 * time.Minute
)

var stateString = map[State]string{
	StateUnknown:  "unknown",
	StateBooting:  "booting",
	StateIdle:     "idle",
	StateRunning:  "running",
	StateShutdown: "shutdown",
	StateHold:     "hold",
}

// String implements fmt.Stringer.
func (s State) String() string {
	return stateString[s]
}

// MarshalText implements encoding.TextMarshaler so a JSON encoding of
// map[State]anything uses the state's string representation.
func (s State) MarshalText() ([]byte, error) {
	return []byte(stateString[s]), nil
}

type worker struct {
	logger    logrus.FieldLogger
	executor  Executor
	wp        *Pool
	mtx       sync.Locker // must be wp's Locker.
	state     State
	instance  cloud.Instance
	instType  arvados.InstanceType
	vcpus     int64
	memory    int64
	appeared  time.Time
	probed    time.Time // last time a probe completed successfully
	updated   time.Time // last time any of these fields changed
	busy      time.Time // last time a container was running or starting on this worker
	destroyed time.Time // time shutdown() was called
	lastUUID  string    // UUID of the most recently started or observed container
	running   map[string]struct{} // remember to update state idle<->running when this changes
	starting  map[string]struct{} // remember to update state idle<->running when this changes
	probing   chan struct{}       // 1-token semaphore: holds a value while a probe is in progress
}

// caller must have lock.
func (wkr *worker) startContainer(ctr arvados.Container) {
	logger := wkr.logger.WithFields(logrus.Fields{
		"ContainerUUID": ctr.UUID,
		"Priority":      ctr.Priority,
	})
	logger = logger.WithField("Instance", wkr.instance)
	logger.Debug("starting container")
	wkr.starting[ctr.UUID] = struct{}{}
	wkr.state = StateRunning
	go func() {
		stdout, stderr, err := wkr.executor.Execute("crunch-run --detach '"+ctr.UUID+"'", nil)
		wkr.mtx.Lock()
		defer wkr.mtx.Unlock()
		now := time.Now()
		wkr.updated = now
		wkr.busy = now
		delete(wkr.starting, ctr.UUID)
		wkr.running[ctr.UUID] = struct{}{}
		wkr.lastUUID = ctr.UUID
		if err != nil {
			logger.WithField("stdout", string(stdout)).
				WithField("stderr", string(stderr)).
				WithError(err).
				Error("error starting crunch-run process")
			// Leave uuid in wkr.running, though: it's
			// possible the error was just a communication
			// failure and the process was in fact
			// started. Wait for next probe to find out.
			return
		}
		logger.Info("crunch-run process started")
	}()
}

// ProbeAndUpdate conducts appropriate boot/running probes (if any)
// for the worker's current state. If a previous probe is still
// running, it does nothing.
//
// It should be called in a new goroutine.
func (wkr *worker) ProbeAndUpdate() {
	select {
	case wkr.probing <- struct{}{}:
		wkr.probeAndUpdate()
		<-wkr.probing
	default:
		wkr.logger.Debug("still waiting for last probe to finish")
	}
}

// probeAndUpdate runs the boot and/or running-containers probes
// appropriate for the worker's current state, then updates state and
// bookkeeping fields accordingly. It is called by ProbeAndUpdate once
// the probing token has been acquired. Caller must not have lock.
func (wkr *worker) probeAndUpdate() {
	wkr.mtx.Lock()
	updated := wkr.updated
	needProbeRunning := wkr.state == StateRunning || wkr.state == StateIdle
	needProbeBooted := wkr.state == StateUnknown || wkr.state == StateBooting
	wkr.mtx.Unlock()
	if !needProbeBooted && !needProbeRunning {
		return
	}

	var (
		ctrUUIDs []string
		ok       bool
		stderr   []byte
	)
	if needProbeBooted {
		ok, stderr = wkr.probeBooted()
		wkr.mtx.Lock()
		if ok || wkr.state == StateRunning || wkr.state == StateIdle {
			wkr.logger.Info("instance booted; will try probeRunning")
			needProbeRunning = true
		}
		wkr.mtx.Unlock()
	}
	if needProbeRunning {
		ctrUUIDs, ok, stderr = wkr.probeRunning()
	}
	logger := wkr.logger.WithField("stderr", string(stderr))
	wkr.mtx.Lock()
	defer wkr.mtx.Unlock()
	if !ok {
		if wkr.state == StateShutdown && wkr.updated.After(updated) {
			// Skip the logging noise if shutdown was
			// initiated during probe.
			return
		}
		dur := time.Since(wkr.probed)
		logger := logger.WithFields(logrus.Fields{
			"Duration": dur,
			"State":    wkr.state,
		})
		if wkr.state == StateBooting && !needProbeRunning {
			// If we know the instance has never passed a
			// boot probe, it's not noteworthy that it
			// hasn't passed this probe.
			logger.Debug("new instance not responding")
		} else {
			logger.Info("instance not responding")
		}
		wkr.shutdownIfBroken(dur)
		return
	}

	updateTime := time.Now()
	wkr.probed = updateTime

	if updated != wkr.updated {
		// Worker was updated after the probe began, so
		// wkr.running might have a container UUID that was
		// not yet running when ctrUUIDs was generated. Leave
		// wkr.running alone and wait for the next probe to
		// catch up on any changes.
		return
	}

	if len(ctrUUIDs) > 0 {
		wkr.busy = updateTime
		wkr.lastUUID = ctrUUIDs[0]
	} else if len(wkr.running) > 0 {
		// Actual last-busy time was sometime between wkr.busy
		// and now. Now is the earliest opportunity to take
		// advantage of the non-busy state, though.
		wkr.busy = updateTime
	}
	running := map[string]struct{}{}
	changed := false
	for _, uuid := range ctrUUIDs {
		running[uuid] = struct{}{}
		if _, ok := wkr.running[uuid]; !ok {
			changed = true
		}
	}
	for uuid := range wkr.running {
		if _, ok := running[uuid]; !ok {
			logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
			wkr.wp.notifyExited(uuid, updateTime)
			changed = true
		}
	}
	if wkr.state == StateUnknown || wkr.state == StateBooting {
		wkr.state = StateIdle
		changed = true
	}
	if changed {
		wkr.running = running
		if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
			wkr.state = StateRunning
		} else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
			wkr.state = StateIdle
		}
		wkr.updated = updateTime
		go wkr.wp.notify()
	}
}

func (wkr *worker) probeRunning() (running []string, ok bool, stderr []byte) {
	cmd := "crunch-run --list"
	stdout, stderr, err := wkr.executor.Execute(cmd, nil)
	if err != nil {
		wkr.logger.WithFields(logrus.Fields{
			"Command": cmd,
			"stdout":  string(stdout),
			"stderr":  string(stderr),
		}).WithError(err).Warn("probe failed")
		return nil, false, stderr
	}
	stdout = bytes.TrimRight(stdout, "\n")
	if len(stdout) == 0 {
		return nil, true, stderr
	}
	return strings.Split(string(stdout), "\n"), true, stderr
}

func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
	cmd := wkr.wp.bootProbeCommand
	if cmd == "" {
		cmd = "true"
	}
	stdout, stderr, err := wkr.executor.Execute(cmd, nil)
	logger := wkr.logger.WithFields(logrus.Fields{
		"Command": cmd,
		"stdout":  string(stdout),
		"stderr":  string(stderr),
	})
	if err != nil {
		logger.WithError(err).Debug("boot probe failed")
		return false, stderr
	}
	logger.Info("boot probe succeeded")
	return true, stderr
}

// caller must have lock.
func (wkr *worker) shutdownIfBroken(dur time.Duration) {
	if wkr.state == StateHold {
		return
	}
	label, threshold := "", wkr.wp.timeoutProbe
	if wkr.state == StateBooting {
		label, threshold = "new ", wkr.wp.timeoutBooting
	}
	if dur < threshold {
		return
	}
	wkr.logger.WithFields(logrus.Fields{
		"Duration": dur,
		"Since":    wkr.probed,
		"State":    wkr.state,
	}).Warnf("%sinstance unresponsive, shutting down", label)
	wkr.shutdown()
}

// caller must have lock.
func (wkr *worker) shutdownIfIdle() bool {
	if wkr.state != StateIdle {
		return false
	}
	age := time.Since(wkr.busy)
	if age < wkr.wp.timeoutIdle {
		return false
	}
	wkr.logger.WithField("Age", age).Info("shutdown idle worker")
	wkr.shutdown()
	return true
}

// caller must have lock.
func (wkr *worker) shutdown() {
	now := time.Now()
	wkr.updated = now
	wkr.destroyed = now
	wkr.state = StateShutdown
	go wkr.wp.notify()
	go func() {
		err := wkr.instance.Destroy()
		if err != nil {
			wkr.logger.WithError(err).Warn("shutdown failed")
			return
		}
	}()
}
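
// The sketch below is illustrative only and is not part of this file's
// API: it shows how a caller (for example the pool's sync loop) might
// drive ProbeAndUpdate, which is designed to be launched in a new
// goroutine per worker and to skip overlapping probes via the probing
// channel. The names "runProbes" and "workers" are assumptions made for
// the sketch, not identifiers defined here.
//
//	func (wp *Pool) runProbes() {
//		for _, wkr := range wp.workers {
//			// Each call returns quickly: it either starts a
//			// probe or logs that one is still in progress.
//			go wkr.ProbeAndUpdate()
//		}
//	}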