14360: Merge branch 'master'
[arvados.git] / services / crunch-run / crunchrun.go
index 7247b339d77942892945ea231f200a4e14004ac1..1c6c58009fac71bd5fa5015a4922821937e61d5a 100644 (file)
@@ -32,7 +32,6 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
        "git.curoverse.com/arvados.git/sdk/go/manifest"
-       "github.com/shirou/gopsutil/process"
        "golang.org/x/net/context"
 
        dockertypes "github.com/docker/docker/api/types"
@@ -147,8 +146,6 @@ type ContainerRunner struct {
        finalState      string
        parentTemp      string
 
-       ListProcesses func() ([]PsProcess, error)
-
        statLogger       io.WriteCloser
        statReporter     *crunchstat.Reporter
        hoststatLogger   io.WriteCloser
@@ -173,10 +170,9 @@ type ContainerRunner struct {
        cCancelled bool // StopContainer() invoked
        cRemoved   bool // docker confirmed the container no longer exists
 
-       enableNetwork   string // one of "default" or "always"
-       networkMode     string // passed through to HostConfig.NetworkMode
-       arvMountLog     *ThrottledLogger
-       checkContainerd time.Duration
+       enableNetwork string // one of "default" or "always"
+       networkMode   string // passed through to HostConfig.NetworkMode
+       arvMountLog   *ThrottledLogger
 
        containerWatchdogInterval time.Duration
 }
@@ -1121,27 +1117,6 @@ func (runner *ContainerRunner) StartContainer() error {
        return nil
 }
 
-// checkContainerd checks if "containerd" is present in the process list.
-func (runner *ContainerRunner) CheckContainerd() error {
-       if runner.checkContainerd == 0 {
-               return nil
-       }
-       p, _ := runner.ListProcesses()
-       for _, i := range p {
-               e, _ := i.CmdlineSlice()
-               if len(e) > 0 {
-                       if strings.Index(e[0], "containerd") > -1 {
-                               return nil
-                       }
-               }
-       }
-
-       // Not found
-       runner.runBrokenNodeHook()
-       runner.stop(nil)
-       return fmt.Errorf("'containerd' not found in process list.")
-}
-
 // WaitFinish waits for the container to terminate, capture the exit code, and
 // close the stdout/stderr logging.
 func (runner *ContainerRunner) WaitFinish() error {
@@ -1180,27 +1155,6 @@ func (runner *ContainerRunner) WaitFinish() error {
                }
        }()
 
-       containerdGone := make(chan error)
-       defer close(containerdGone)
-       if runner.checkContainerd > 0 {
-               go func() {
-                       ticker := time.NewTicker(time.Duration(runner.checkContainerd))
-                       defer ticker.Stop()
-                       for {
-                               select {
-                               case <-ticker.C:
-                                       if ck := runner.CheckContainerd(); ck != nil {
-                                               containerdGone <- ck
-                                               return
-                                       }
-                               case <-containerdGone:
-                                       // Channel closed, quit goroutine
-                                       return
-                               }
-                       }
-               }()
-       }
-
        for {
                select {
                case waitBody := <-waitOk:
@@ -1229,9 +1183,6 @@ func (runner *ContainerRunner) WaitFinish() error {
 
                case <-containerGone:
                        return errors.New("docker client never returned status")
-
-               case err := <-containerdGone:
-                       return err
                }
        }
 }
@@ -1617,12 +1568,6 @@ func (runner *ContainerRunner) Run() (err error) {
                return
        }
 
-       // Sanity check that containerd is running.
-       err = runner.CheckContainerd()
-       if err != nil {
-               return
-       }
-
        // check for and/or load image
        err = runner.LoadImage()
        if err != nil {
@@ -1748,17 +1693,6 @@ func NewContainerRunner(dispatcherClient *arvados.Client,
        cr.NewLogWriter = cr.NewArvLogWriter
        cr.RunArvMount = cr.ArvMountCmd
        cr.MkTempDir = ioutil.TempDir
-       cr.ListProcesses = func() ([]PsProcess, error) {
-               pr, err := process.Processes()
-               if err != nil {
-                       return nil, err
-               }
-               ps := make([]PsProcess, len(pr))
-               for i, j := range pr {
-                       ps[i] = j
-               }
-               return ps, nil
-       }
        cr.MkArvClient = func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error) {
                cl, err := arvadosclient.MakeArvadosClient()
                if err != nil {
@@ -1798,6 +1732,10 @@ func main() {
        cgroupParent := flag.String("cgroup-parent", "docker", "name of container's parent cgroup (ignored if -cgroup-parent-subsystem is used)")
        cgroupParentSubsystem := flag.String("cgroup-parent-subsystem", "", "use current cgroup for given subsystem as parent cgroup for container")
        caCertsPath := flag.String("ca-certs", "", "Path to TLS root certificates")
+       detach := flag.Bool("detach", false, "Detach from parent process and run in the background")
+       sleep := flag.Duration("sleep", 0, "Delay before starting (testing use only)")
+       kill := flag.Int("kill", -1, "Send signal to an existing crunch-run process for given UUID")
+       list := flag.Bool("list", false, "List UUIDs of existing crunch-run processes")
        enableNetwork := flag.String("container-enable-networking", "default",
                `Specify if networking should be enabled for container.  One of 'default', 'always':
        default: only enable networking if container requests it.
@@ -1808,9 +1746,31 @@ func main() {
        `)
        memprofile := flag.String("memprofile", "", "write memory profile to `file` after running container")
        getVersion := flag.Bool("version", false, "Print version information and exit.")
-       checkContainerd := flag.Duration("check-containerd", 60*time.Second, "Periodic check if (docker-)containerd is running (use 0s to disable).")
+       flag.Duration("check-containerd", 0, "Ignored. Exists for compatibility with older versions.")
+
+       detached := false
+       if len(os.Args) > 1 && os.Args[1] == "-detached" {
+               // This process was invoked by a parent process, which
+               // has passed along its own arguments, including
+               // -detach, after the leading -detached flag.  Strip
+               // the leading -detached flag (it's not recognized by
+               // flag.Parse()) ... and remember not to detach all
+               // over again in this process.
+               os.Args = append([]string{os.Args[0]}, os.Args[2:]...)
+               detached = true
+       }
+
        flag.Parse()
 
+       switch {
+       case *detach && !detached:
+               os.Exit(Detach(flag.Arg(0), os.Args, os.Stdout, os.Stderr))
+       case *kill >= 0:
+               os.Exit(KillProcess(flag.Arg(0), syscall.Signal(*kill), os.Stdout, os.Stderr))
+       case *list:
+               os.Exit(ListProcesses(os.Stdout, os.Stderr))
+       }
+
        // Print version information if requested
        if *getVersion {
                fmt.Printf("crunch-run %s\n", version)
@@ -1818,6 +1778,7 @@ func main() {
        }
 
        log.Printf("crunch-run %s started", version)
+       time.Sleep(*sleep)
 
        containerId := flag.Arg(0)
 
@@ -1864,7 +1825,6 @@ func main() {
        cr.expectCgroupParent = *cgroupParent
        cr.enableNetwork = *enableNetwork
        cr.networkMode = *networkMode
-       cr.checkContainerd = *checkContainerd
        if *cgroupParentSubsystem != "" {
                p := findCgroup(*cgroupParentSubsystem)
                cr.setCgroupParent = p