X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/d9e48a19b82fe1d957686e0803c759553666237e..3384d541acca51c346f8ebfe08b0dc93a33936ff:/services/crunch-run/crunchrun.go diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go index 7247b339d7..0576337aa1 100644 --- a/services/crunch-run/crunchrun.go +++ b/services/crunch-run/crunchrun.go @@ -32,7 +32,6 @@ import ( "git.curoverse.com/arvados.git/sdk/go/arvadosclient" "git.curoverse.com/arvados.git/sdk/go/keepclient" "git.curoverse.com/arvados.git/sdk/go/manifest" - "github.com/shirou/gopsutil/process" "golang.org/x/net/context" dockertypes "github.com/docker/docker/api/types" @@ -147,8 +146,6 @@ type ContainerRunner struct { finalState string parentTemp string - ListProcesses func() ([]PsProcess, error) - statLogger io.WriteCloser statReporter *crunchstat.Reporter hoststatLogger io.WriteCloser @@ -173,10 +170,9 @@ type ContainerRunner struct { cCancelled bool // StopContainer() invoked cRemoved bool // docker confirmed the container no longer exists - enableNetwork string // one of "default" or "always" - networkMode string // passed through to HostConfig.NetworkMode - arvMountLog *ThrottledLogger - checkContainerd time.Duration + enableNetwork string // one of "default" or "always" + networkMode string // passed through to HostConfig.NetworkMode + arvMountLog *ThrottledLogger containerWatchdogInterval time.Duration } @@ -1121,27 +1117,6 @@ func (runner *ContainerRunner) StartContainer() error { return nil } -// checkContainerd checks if "containerd" is present in the process list. -func (runner *ContainerRunner) CheckContainerd() error { - if runner.checkContainerd == 0 { - return nil - } - p, _ := runner.ListProcesses() - for _, i := range p { - e, _ := i.CmdlineSlice() - if len(e) > 0 { - if strings.Index(e[0], "containerd") > -1 { - return nil - } - } - } - - // Not found - runner.runBrokenNodeHook() - runner.stop(nil) - return fmt.Errorf("'containerd' not found in process list.") -} - // WaitFinish waits for the container to terminate, capture the exit code, and // close the stdout/stderr logging. func (runner *ContainerRunner) WaitFinish() error { @@ -1180,27 +1155,6 @@ func (runner *ContainerRunner) WaitFinish() error { } }() - containerdGone := make(chan error) - defer close(containerdGone) - if runner.checkContainerd > 0 { - go func() { - ticker := time.NewTicker(time.Duration(runner.checkContainerd)) - defer ticker.Stop() - for { - select { - case <-ticker.C: - if ck := runner.CheckContainerd(); ck != nil { - containerdGone <- ck - return - } - case <-containerdGone: - // Channel closed, quit goroutine - return - } - } - }() - } - for { select { case waitBody := <-waitOk: @@ -1229,9 +1183,6 @@ func (runner *ContainerRunner) WaitFinish() error { case <-containerGone: return errors.New("docker client never returned status") - - case err := <-containerdGone: - return err } } } @@ -1567,6 +1518,14 @@ func (runner *ContainerRunner) Run() (err error) { runner.CrunchLog.Close() }() + err = runner.fetchContainerRecord() + if err != nil { + return + } + if runner.Container.State != "Locked" { + return fmt.Errorf("dispatch error detected: container %q has state %q", runner.Container.UUID, runner.Container.State) + } + defer func() { // checkErr prints e (unless it's nil) and sets err to // e (unless err is already non-nil). Thus, if err @@ -1607,22 +1566,12 @@ func (runner *ContainerRunner) Run() (err error) { checkErr("UpdateContainerFinal", runner.UpdateContainerFinal()) }() - err = runner.fetchContainerRecord() - if err != nil { - return - } runner.setupSignals() err = runner.startHoststat() if err != nil { return } - // Sanity check that containerd is running. - err = runner.CheckContainerd() - if err != nil { - return - } - // check for and/or load image err = runner.LoadImage() if err != nil { @@ -1748,17 +1697,6 @@ func NewContainerRunner(dispatcherClient *arvados.Client, cr.NewLogWriter = cr.NewArvLogWriter cr.RunArvMount = cr.ArvMountCmd cr.MkTempDir = ioutil.TempDir - cr.ListProcesses = func() ([]PsProcess, error) { - pr, err := process.Processes() - if err != nil { - return nil, err - } - ps := make([]PsProcess, len(pr)) - for i, j := range pr { - ps[i] = j - } - return ps, nil - } cr.MkArvClient = func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error) { cl, err := arvadosclient.MakeArvadosClient() if err != nil { @@ -1798,6 +1736,11 @@ func main() { cgroupParent := flag.String("cgroup-parent", "docker", "name of container's parent cgroup (ignored if -cgroup-parent-subsystem is used)") cgroupParentSubsystem := flag.String("cgroup-parent-subsystem", "", "use current cgroup for given subsystem as parent cgroup for container") caCertsPath := flag.String("ca-certs", "", "Path to TLS root certificates") + detach := flag.Bool("detach", false, "Detach from parent process and run in the background") + stdinEnv := flag.Bool("stdin-env", false, "Load environment variables from JSON message on stdin") + sleep := flag.Duration("sleep", 0, "Delay before starting (testing use only)") + kill := flag.Int("kill", -1, "Send signal to an existing crunch-run process for given UUID") + list := flag.Bool("list", false, "List UUIDs of existing crunch-run processes") enableNetwork := flag.String("container-enable-networking", "default", `Specify if networking should be enabled for container. One of 'default', 'always': default: only enable networking if container requests it. @@ -1808,9 +1751,38 @@ func main() { `) memprofile := flag.String("memprofile", "", "write memory profile to `file` after running container") getVersion := flag.Bool("version", false, "Print version information and exit.") - checkContainerd := flag.Duration("check-containerd", 60*time.Second, "Periodic check if (docker-)containerd is running (use 0s to disable).") + flag.Duration("check-containerd", 0, "Ignored. Exists for compatibility with older versions.") + + ignoreDetachFlag := false + if len(os.Args) > 1 && os.Args[1] == "-no-detach" { + // This process was invoked by a parent process, which + // has passed along its own arguments, including + // -detach, after the leading -no-detach flag. Strip + // the leading -no-detach flag (it's not recognized by + // flag.Parse()) and ignore the -detach flag that + // comes later. + os.Args = append([]string{os.Args[0]}, os.Args[2:]...) + ignoreDetachFlag = true + } + flag.Parse() + if *stdinEnv && !ignoreDetachFlag { + // Load env vars on stdin if asked (but not in a + // detached child process, in which case stdin is + // /dev/null). + loadEnv(os.Stdin) + } + + switch { + case *detach && !ignoreDetachFlag: + os.Exit(Detach(flag.Arg(0), os.Args, os.Stdout, os.Stderr)) + case *kill >= 0: + os.Exit(KillProcess(flag.Arg(0), syscall.Signal(*kill), os.Stdout, os.Stderr)) + case *list: + os.Exit(ListProcesses(os.Stdout, os.Stderr)) + } + // Print version information if requested if *getVersion { fmt.Printf("crunch-run %s\n", version) @@ -1818,6 +1790,7 @@ func main() { } log.Printf("crunch-run %s started", version) + time.Sleep(*sleep) containerId := flag.Arg(0) @@ -1864,7 +1837,6 @@ func main() { cr.expectCgroupParent = *cgroupParent cr.enableNetwork = *enableNetwork cr.networkMode = *networkMode - cr.checkContainerd = *checkContainerd if *cgroupParentSubsystem != "" { p := findCgroup(*cgroupParentSubsystem) cr.setCgroupParent = p @@ -1892,3 +1864,21 @@ func main() { log.Fatalf("%s: %v", containerId, runerr) } } + +func loadEnv(rdr io.Reader) { + buf, err := ioutil.ReadAll(rdr) + if err != nil { + log.Fatalf("read stdin: %s", err) + } + var env map[string]string + err = json.Unmarshal(buf, &env) + if err != nil { + log.Fatalf("decode stdin: %s", err) + } + for k, v := range env { + err = os.Setenv(k, v) + if err != nil { + log.Fatalf("setenv(%q): %s", k, err) + } + } +}