Merge branch '15305-keep-balance-bytes'
[arvados.git] / services / crunch-run / crunchrun.go
index 46d2d206804e3bf43e1b5ad52a923dd60e8d4af8..d1092e98ef50cd221fdbb9c18df0ad6efe8bdd96 100644 (file)
@@ -95,7 +95,7 @@ type ContainerRunner struct {
        Docker ThinDockerClient
 
        // Dispatcher client is initialized with the Dispatcher token.
-       // This is a priviledged token used to manage container status
+       // This is a privileged token used to manage container status
        // and logs.
        //
        // We have both dispatcherClient and DispatcherArvClient
@@ -222,7 +222,14 @@ var brokenNodeHook *string = flag.String("broken-node-hook", "", "Script to run
 
 func (runner *ContainerRunner) runBrokenNodeHook() {
        if *brokenNodeHook == "" {
-               runner.CrunchLog.Printf("No broken node hook provided, cannot mark node as broken.")
+               path := filepath.Join(lockdir, brokenfile)
+               runner.CrunchLog.Printf("Writing %s to mark node as broken", path)
+               f, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0700)
+               if err != nil {
+                       runner.CrunchLog.Printf("Error writing %s: %s", path, err)
+                       return
+               }
+               f.Close()
        } else {
                runner.CrunchLog.Printf("Running broken node hook %q", *brokenNodeHook)
                // run killme script
@@ -843,21 +850,42 @@ func (runner *ContainerRunner) LogContainerRecord() error {
        return err
 }
 
-// LogNodeRecord logs arvados#node record corresponding to the current host.
+// LogNodeRecord logs the current host's InstanceType config entry (or
+// the arvados#node record, if running via crunch-dispatch-slurm).
 func (runner *ContainerRunner) LogNodeRecord() error {
-       hostname := os.Getenv("SLURMD_NODENAME")
-       if hostname == "" {
-               hostname, _ = os.Hostname()
-       }
-       _, err := runner.logAPIResponse("node", "nodes", map[string]interface{}{"filters": [][]string{{"hostname", "=", hostname}}}, func(resp interface{}) {
-               // The "info" field has admin-only info when obtained
-               // with a privileged token, and should not be logged.
-               node, ok := resp.(map[string]interface{})
-               if ok {
-                       delete(node, "info")
-               }
-       })
-       return err
+       if it := os.Getenv("InstanceType"); it != "" {
+               // Dispatched via arvados-dispatch-cloud. Save
+               // InstanceType config fragment received from
+               // dispatcher on stdin.
+               w, err := runner.LogCollection.OpenFile("node.json", os.O_CREATE|os.O_WRONLY, 0666)
+               if err != nil {
+                       return err
+               }
+               defer w.Close()
+               _, err = io.WriteString(w, it)
+               if err != nil {
+                       return err
+               }
+               return w.Close()
+       } else {
+               // Dispatched via crunch-dispatch-slurm. Look up
+               // apiserver's node record corresponding to
+               // $SLURMD_NODENAME.
+               hostname := os.Getenv("SLURMD_NODENAME")
+               if hostname == "" {
+                       hostname, _ = os.Hostname()
+               }
+               _, err := runner.logAPIResponse("node", "nodes", map[string]interface{}{"filters": [][]string{{"hostname", "=", hostname}}}, func(resp interface{}) {
+                       // The "info" field has admin-only info when
+                       // obtained with a privileged token, and
+                       // should not be logged.
+                       node, ok := resp.(map[string]interface{})
+                       if ok {
+                               delete(node, "info")
+                       }
+               })
+               return err
+       }
 }
 
 func (runner *ContainerRunner) logAPIResponse(label, path string, params map[string]interface{}, munge func(interface{})) (logged bool, err error) {
@@ -980,7 +1008,7 @@ func (runner *ContainerRunner) AttachStreams() (err error) {
                go func() {
                        _, err := io.Copy(response.Conn, stdinRdr)
                        if err != nil {
-                               runner.CrunchLog.Print("While writing stdin collection to docker container %q", err)
+                               runner.CrunchLog.Printf("While writing stdin collection to docker container: %v", err)
                                runner.stop(nil)
                        }
                        stdinRdr.Close()
@@ -990,7 +1018,7 @@ func (runner *ContainerRunner) AttachStreams() (err error) {
                go func() {
                        _, err := io.Copy(response.Conn, bytes.NewReader(stdinJson))
                        if err != nil {
-                               runner.CrunchLog.Print("While writing stdin json to docker container %q", err)
+                               runner.CrunchLog.Printf("While writing stdin json to docker container: %v", err)
                                runner.stop(nil)
                        }
                        response.CloseWrite()
@@ -1518,6 +1546,14 @@ func (runner *ContainerRunner) Run() (err error) {
                runner.CrunchLog.Close()
        }()
 
+       err = runner.fetchContainerRecord()
+       if err != nil {
+               return
+       }
+       if runner.Container.State != "Locked" {
+               return fmt.Errorf("dispatch error detected: container %q has state %q", runner.Container.UUID, runner.Container.State)
+       }
+
        defer func() {
                // checkErr prints e (unless it's nil) and sets err to
                // e (unless err is already non-nil). Thus, if err
@@ -1558,10 +1594,6 @@ func (runner *ContainerRunner) Run() (err error) {
                checkErr("UpdateContainerFinal", runner.UpdateContainerFinal())
        }()
 
-       err = runner.fetchContainerRecord()
-       if err != nil {
-               return
-       }
        runner.setupSignals()
        err = runner.startHoststat()
        if err != nil {
@@ -1733,6 +1765,7 @@ func main() {
        cgroupParentSubsystem := flag.String("cgroup-parent-subsystem", "", "use current cgroup for given subsystem as parent cgroup for container")
        caCertsPath := flag.String("ca-certs", "", "Path to TLS root certificates")
        detach := flag.Bool("detach", false, "Detach from parent process and run in the background")
+       stdinEnv := flag.Bool("stdin-env", false, "Load environment variables from JSON message on stdin")
        sleep := flag.Duration("sleep", 0, "Delay before starting (testing use only)")
        kill := flag.Int("kill", -1, "Send signal to an existing crunch-run process for given UUID")
        list := flag.Bool("list", false, "List UUIDs of existing crunch-run processes")
@@ -1762,6 +1795,13 @@ func main() {
 
        flag.Parse()
 
+       if *stdinEnv && !ignoreDetachFlag {
+               // Load env vars on stdin if asked (but not in a
+               // detached child process, in which case stdin is
+               // /dev/null).
+               loadEnv(os.Stdin)
+       }
+
        switch {
        case *detach && !ignoreDetachFlag:
                os.Exit(Detach(flag.Arg(0), os.Args, os.Stdout, os.Stderr))
@@ -1852,3 +1892,21 @@ func main() {
                log.Fatalf("%s: %v", containerId, runerr)
        }
 }
+
+func loadEnv(rdr io.Reader) {
+       buf, err := ioutil.ReadAll(rdr)
+       if err != nil {
+               log.Fatalf("read stdin: %s", err)
+       }
+       var env map[string]string
+       err = json.Unmarshal(buf, &env)
+       if err != nil {
+               log.Fatalf("decode stdin: %s", err)
+       }
+       for k, v := range env {
+               err = os.Setenv(k, v)
+               if err != nil {
+                       log.Fatalf("setenv(%q): %s", k, err)
+               }
+       }
+}