Merge branch '15305-keep-balance-bytes'
[arvados.git] / services / crunch-run / crunchrun.go
index 7247b339d77942892945ea231f200a4e14004ac1..d1092e98ef50cd221fdbb9c18df0ad6efe8bdd96 100644 (file)
@@ -32,7 +32,6 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
        "git.curoverse.com/arvados.git/sdk/go/manifest"
-       "github.com/shirou/gopsutil/process"
        "golang.org/x/net/context"
 
        dockertypes "github.com/docker/docker/api/types"
@@ -96,7 +95,7 @@ type ContainerRunner struct {
        Docker ThinDockerClient
 
        // Dispatcher client is initialized with the Dispatcher token.
-       // This is a priviledged token used to manage container status
+       // This is a privileged token used to manage container status
        // and logs.
        //
        // We have both dispatcherClient and DispatcherArvClient
@@ -147,8 +146,6 @@ type ContainerRunner struct {
        finalState      string
        parentTemp      string
 
-       ListProcesses func() ([]PsProcess, error)
-
        statLogger       io.WriteCloser
        statReporter     *crunchstat.Reporter
        hoststatLogger   io.WriteCloser
@@ -173,10 +170,9 @@ type ContainerRunner struct {
        cCancelled bool // StopContainer() invoked
        cRemoved   bool // docker confirmed the container no longer exists
 
-       enableNetwork   string // one of "default" or "always"
-       networkMode     string // passed through to HostConfig.NetworkMode
-       arvMountLog     *ThrottledLogger
-       checkContainerd time.Duration
+       enableNetwork string // one of "default" or "always"
+       networkMode   string // passed through to HostConfig.NetworkMode
+       arvMountLog   *ThrottledLogger
 
        containerWatchdogInterval time.Duration
 }
@@ -226,7 +222,14 @@ var brokenNodeHook *string = flag.String("broken-node-hook", "", "Script to run
 
 func (runner *ContainerRunner) runBrokenNodeHook() {
        if *brokenNodeHook == "" {
-               runner.CrunchLog.Printf("No broken node hook provided, cannot mark node as broken.")
+               path := filepath.Join(lockdir, brokenfile)
+               runner.CrunchLog.Printf("Writing %s to mark node as broken", path)
+               f, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0700)
+               if err != nil {
+                       runner.CrunchLog.Printf("Error writing %s: %s", path, err)
+                       return
+               }
+               f.Close()
        } else {
                runner.CrunchLog.Printf("Running broken node hook %q", *brokenNodeHook)
                // run killme script
@@ -847,21 +850,42 @@ func (runner *ContainerRunner) LogContainerRecord() error {
        return err
 }
 
-// LogNodeRecord logs arvados#node record corresponding to the current host.
+// LogNodeRecord logs the current host's InstanceType config entry (or
+// the arvados#node record, if running via crunch-dispatch-slurm).
 func (runner *ContainerRunner) LogNodeRecord() error {
-       hostname := os.Getenv("SLURMD_NODENAME")
-       if hostname == "" {
-               hostname, _ = os.Hostname()
-       }
-       _, err := runner.logAPIResponse("node", "nodes", map[string]interface{}{"filters": [][]string{{"hostname", "=", hostname}}}, func(resp interface{}) {
-               // The "info" field has admin-only info when obtained
-               // with a privileged token, and should not be logged.
-               node, ok := resp.(map[string]interface{})
-               if ok {
-                       delete(node, "info")
-               }
-       })
-       return err
+       if it := os.Getenv("InstanceType"); it != "" {
+               // Dispatched via arvados-dispatch-cloud. Save
+               // InstanceType config fragment received from
+               // dispatcher on stdin.
+               w, err := runner.LogCollection.OpenFile("node.json", os.O_CREATE|os.O_WRONLY, 0666)
+               if err != nil {
+                       return err
+               }
+               defer w.Close()
+               _, err = io.WriteString(w, it)
+               if err != nil {
+                       return err
+               }
+               return w.Close()
+       } else {
+               // Dispatched via crunch-dispatch-slurm. Look up
+               // apiserver's node record corresponding to
+               // $SLURMD_NODENAME.
+               hostname := os.Getenv("SLURMD_NODENAME")
+               if hostname == "" {
+                       hostname, _ = os.Hostname()
+               }
+               _, err := runner.logAPIResponse("node", "nodes", map[string]interface{}{"filters": [][]string{{"hostname", "=", hostname}}}, func(resp interface{}) {
+                       // The "info" field has admin-only info when
+                       // obtained with a privileged token, and
+                       // should not be logged.
+                       node, ok := resp.(map[string]interface{})
+                       if ok {
+                               delete(node, "info")
+                       }
+               })
+               return err
+       }
 }
 
 func (runner *ContainerRunner) logAPIResponse(label, path string, params map[string]interface{}, munge func(interface{})) (logged bool, err error) {
@@ -984,7 +1008,7 @@ func (runner *ContainerRunner) AttachStreams() (err error) {
                go func() {
                        _, err := io.Copy(response.Conn, stdinRdr)
                        if err != nil {
-                               runner.CrunchLog.Print("While writing stdin collection to docker container %q", err)
+                               runner.CrunchLog.Printf("While writing stdin collection to docker container: %v", err)
                                runner.stop(nil)
                        }
                        stdinRdr.Close()
@@ -994,7 +1018,7 @@ func (runner *ContainerRunner) AttachStreams() (err error) {
                go func() {
                        _, err := io.Copy(response.Conn, bytes.NewReader(stdinJson))
                        if err != nil {
-                               runner.CrunchLog.Print("While writing stdin json to docker container %q", err)
+                               runner.CrunchLog.Printf("While writing stdin json to docker container: %v", err)
                                runner.stop(nil)
                        }
                        response.CloseWrite()
@@ -1121,27 +1145,6 @@ func (runner *ContainerRunner) StartContainer() error {
        return nil
 }
 
-// checkContainerd checks if "containerd" is present in the process list.
-func (runner *ContainerRunner) CheckContainerd() error {
-       if runner.checkContainerd == 0 {
-               return nil
-       }
-       p, _ := runner.ListProcesses()
-       for _, i := range p {
-               e, _ := i.CmdlineSlice()
-               if len(e) > 0 {
-                       if strings.Index(e[0], "containerd") > -1 {
-                               return nil
-                       }
-               }
-       }
-
-       // Not found
-       runner.runBrokenNodeHook()
-       runner.stop(nil)
-       return fmt.Errorf("'containerd' not found in process list.")
-}
-
 // WaitFinish waits for the container to terminate, capture the exit code, and
 // close the stdout/stderr logging.
 func (runner *ContainerRunner) WaitFinish() error {
@@ -1180,27 +1183,6 @@ func (runner *ContainerRunner) WaitFinish() error {
                }
        }()
 
-       containerdGone := make(chan error)
-       defer close(containerdGone)
-       if runner.checkContainerd > 0 {
-               go func() {
-                       ticker := time.NewTicker(time.Duration(runner.checkContainerd))
-                       defer ticker.Stop()
-                       for {
-                               select {
-                               case <-ticker.C:
-                                       if ck := runner.CheckContainerd(); ck != nil {
-                                               containerdGone <- ck
-                                               return
-                                       }
-                               case <-containerdGone:
-                                       // Channel closed, quit goroutine
-                                       return
-                               }
-                       }
-               }()
-       }
-
        for {
                select {
                case waitBody := <-waitOk:
@@ -1229,9 +1211,6 @@ func (runner *ContainerRunner) WaitFinish() error {
 
                case <-containerGone:
                        return errors.New("docker client never returned status")
-
-               case err := <-containerdGone:
-                       return err
                }
        }
 }
@@ -1567,6 +1546,14 @@ func (runner *ContainerRunner) Run() (err error) {
                runner.CrunchLog.Close()
        }()
 
+       err = runner.fetchContainerRecord()
+       if err != nil {
+               return
+       }
+       if runner.Container.State != "Locked" {
+               return fmt.Errorf("dispatch error detected: container %q has state %q", runner.Container.UUID, runner.Container.State)
+       }
+
        defer func() {
                // checkErr prints e (unless it's nil) and sets err to
                // e (unless err is already non-nil). Thus, if err
@@ -1607,22 +1594,12 @@ func (runner *ContainerRunner) Run() (err error) {
                checkErr("UpdateContainerFinal", runner.UpdateContainerFinal())
        }()
 
-       err = runner.fetchContainerRecord()
-       if err != nil {
-               return
-       }
        runner.setupSignals()
        err = runner.startHoststat()
        if err != nil {
                return
        }
 
-       // Sanity check that containerd is running.
-       err = runner.CheckContainerd()
-       if err != nil {
-               return
-       }
-
        // check for and/or load image
        err = runner.LoadImage()
        if err != nil {
@@ -1748,17 +1725,6 @@ func NewContainerRunner(dispatcherClient *arvados.Client,
        cr.NewLogWriter = cr.NewArvLogWriter
        cr.RunArvMount = cr.ArvMountCmd
        cr.MkTempDir = ioutil.TempDir
-       cr.ListProcesses = func() ([]PsProcess, error) {
-               pr, err := process.Processes()
-               if err != nil {
-                       return nil, err
-               }
-               ps := make([]PsProcess, len(pr))
-               for i, j := range pr {
-                       ps[i] = j
-               }
-               return ps, nil
-       }
        cr.MkArvClient = func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error) {
                cl, err := arvadosclient.MakeArvadosClient()
                if err != nil {
@@ -1798,6 +1764,11 @@ func main() {
        cgroupParent := flag.String("cgroup-parent", "docker", "name of container's parent cgroup (ignored if -cgroup-parent-subsystem is used)")
        cgroupParentSubsystem := flag.String("cgroup-parent-subsystem", "", "use current cgroup for given subsystem as parent cgroup for container")
        caCertsPath := flag.String("ca-certs", "", "Path to TLS root certificates")
+       detach := flag.Bool("detach", false, "Detach from parent process and run in the background")
+       stdinEnv := flag.Bool("stdin-env", false, "Load environment variables from JSON message on stdin")
+       sleep := flag.Duration("sleep", 0, "Delay before starting (testing use only)")
+       kill := flag.Int("kill", -1, "Send signal to an existing crunch-run process for given UUID")
+       list := flag.Bool("list", false, "List UUIDs of existing crunch-run processes")
        enableNetwork := flag.String("container-enable-networking", "default",
                `Specify if networking should be enabled for container.  One of 'default', 'always':
        default: only enable networking if container requests it.
@@ -1808,9 +1779,38 @@ func main() {
        `)
        memprofile := flag.String("memprofile", "", "write memory profile to `file` after running container")
        getVersion := flag.Bool("version", false, "Print version information and exit.")
-       checkContainerd := flag.Duration("check-containerd", 60*time.Second, "Periodic check if (docker-)containerd is running (use 0s to disable).")
+       flag.Duration("check-containerd", 0, "Ignored. Exists for compatibility with older versions.")
+
+       ignoreDetachFlag := false
+       if len(os.Args) > 1 && os.Args[1] == "-no-detach" {
+               // This process was invoked by a parent process, which
+               // has passed along its own arguments, including
+               // -detach, after the leading -no-detach flag.  Strip
+               // the leading -no-detach flag (it's not recognized by
+               // flag.Parse()) and ignore the -detach flag that
+               // comes later.
+               os.Args = append([]string{os.Args[0]}, os.Args[2:]...)
+               ignoreDetachFlag = true
+       }
+
        flag.Parse()
 
+       if *stdinEnv && !ignoreDetachFlag {
+               // Load env vars on stdin if asked (but not in a
+               // detached child process, in which case stdin is
+               // /dev/null).
+               loadEnv(os.Stdin)
+       }
+
+       switch {
+       case *detach && !ignoreDetachFlag:
+               os.Exit(Detach(flag.Arg(0), os.Args, os.Stdout, os.Stderr))
+       case *kill >= 0:
+               os.Exit(KillProcess(flag.Arg(0), syscall.Signal(*kill), os.Stdout, os.Stderr))
+       case *list:
+               os.Exit(ListProcesses(os.Stdout, os.Stderr))
+       }
+
        // Print version information if requested
        if *getVersion {
                fmt.Printf("crunch-run %s\n", version)
@@ -1818,6 +1818,7 @@ func main() {
        }
 
        log.Printf("crunch-run %s started", version)
+       time.Sleep(*sleep)
 
        containerId := flag.Arg(0)
 
@@ -1864,7 +1865,6 @@ func main() {
        cr.expectCgroupParent = *cgroupParent
        cr.enableNetwork = *enableNetwork
        cr.networkMode = *networkMode
-       cr.checkContainerd = *checkContainerd
        if *cgroupParentSubsystem != "" {
                p := findCgroup(*cgroupParentSubsystem)
                cr.setCgroupParent = p
@@ -1892,3 +1892,21 @@ func main() {
                log.Fatalf("%s: %v", containerId, runerr)
        }
 }
+
+func loadEnv(rdr io.Reader) {
+       buf, err := ioutil.ReadAll(rdr)
+       if err != nil {
+               log.Fatalf("read stdin: %s", err)
+       }
+       var env map[string]string
+       err = json.Unmarshal(buf, &env)
+       if err != nil {
+               log.Fatalf("decode stdin: %s", err)
+       }
+       for k, v := range env {
+               err = os.Setenv(k, v)
+               if err != nil {
+                       log.Fatalf("setenv(%q): %s", k, err)
+               }
+       }
+}