X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/b205525d0b7c7b9042513fe77d2e8061534208ae..916ec66d0caeeb37983043810bea22e0bc41751f:/services/crunch-run/crunchrun.go diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go index 46d2d20680..082ca0ce9b 100644 --- a/services/crunch-run/crunchrun.go +++ b/services/crunch-run/crunchrun.go @@ -95,7 +95,7 @@ type ContainerRunner struct { Docker ThinDockerClient // Dispatcher client is initialized with the Dispatcher token. - // This is a priviledged token used to manage container status + // This is a privileged token used to manage container status // and logs. // // We have both dispatcherClient and DispatcherArvClient @@ -116,6 +116,9 @@ type ContainerRunner struct { ContainerArvClient IArvadosClient ContainerKeepClient IKeepClient + // environment provided by arvados-dispatch-cloud + dispatchEnv map[string]interface{} + Container arvados.Container ContainerConfig dockercontainer.Config HostConfig dockercontainer.HostConfig @@ -222,7 +225,14 @@ var brokenNodeHook *string = flag.String("broken-node-hook", "", "Script to run func (runner *ContainerRunner) runBrokenNodeHook() { if *brokenNodeHook == "" { - runner.CrunchLog.Printf("No broken node hook provided, cannot mark node as broken.") + path := filepath.Join(lockdir, brokenfile) + runner.CrunchLog.Printf("Writing %s to mark node as broken", path) + f, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0700) + if err != nil { + runner.CrunchLog.Printf("Error writing %s: %s", path, err) + return + } + f.Close() } else { runner.CrunchLog.Printf("Running broken node hook %q", *brokenNodeHook) // run killme script @@ -843,21 +853,55 @@ func (runner *ContainerRunner) LogContainerRecord() error { return err } -// LogNodeRecord logs arvados#node record corresponding to the current host. +// LogNodeRecord logs the current host's InstanceType config entry (or +// the arvados#node record, if running via crunch-dispatch-slurm). func (runner *ContainerRunner) LogNodeRecord() error { - hostname := os.Getenv("SLURMD_NODENAME") - if hostname == "" { - hostname, _ = os.Hostname() - } - _, err := runner.logAPIResponse("node", "nodes", map[string]interface{}{"filters": [][]string{{"hostname", "=", hostname}}}, func(resp interface{}) { - // The "info" field has admin-only info when obtained - // with a privileged token, and should not be logged. - node, ok := resp.(map[string]interface{}) - if ok { - delete(node, "info") - } - }) - return err + if it, ok := runner.dispatchEnv["InstanceType"]; ok { + // Dispatched via arvados-dispatch-cloud. Save + // InstanceType config fragment received from + // dispatcher on stdin. + w, err := runner.LogCollection.OpenFile("node.json", os.O_CREATE|os.O_WRONLY, 0666) + if err != nil { + return err + } + defer w.Close() + if it, ok := it.(string); ok { + // dispatcher supplied JSON data (in order to + // stay compatible with old crunch-run + // versions) + _, err = io.WriteString(w, it) + if err != nil { + return err + } + } else { + // dispatcher supplied struct + enc := json.NewEncoder(w) + enc.SetIndent("", " ") + err = enc.Encode(it) + if err != nil { + return err + } + } + return w.Close() + } else { + // Dispatched via crunch-dispatch-slurm. Look up + // apiserver's node record corresponding to + // $SLURMD_NODENAME. + hostname := os.Getenv("SLURMD_NODENAME") + if hostname == "" { + hostname, _ = os.Hostname() + } + _, err := runner.logAPIResponse("node", "nodes", map[string]interface{}{"filters": [][]string{{"hostname", "=", hostname}}}, func(resp interface{}) { + // The "info" field has admin-only info when + // obtained with a privileged token, and + // should not be logged. + node, ok := resp.(map[string]interface{}) + if ok { + delete(node, "info") + } + }) + return err + } } func (runner *ContainerRunner) logAPIResponse(label, path string, params map[string]interface{}, munge func(interface{})) (logged bool, err error) { @@ -980,7 +1024,7 @@ func (runner *ContainerRunner) AttachStreams() (err error) { go func() { _, err := io.Copy(response.Conn, stdinRdr) if err != nil { - runner.CrunchLog.Print("While writing stdin collection to docker container %q", err) + runner.CrunchLog.Printf("While writing stdin collection to docker container: %v", err) runner.stop(nil) } stdinRdr.Close() @@ -990,7 +1034,7 @@ func (runner *ContainerRunner) AttachStreams() (err error) { go func() { _, err := io.Copy(response.Conn, bytes.NewReader(stdinJson)) if err != nil { - runner.CrunchLog.Print("While writing stdin json to docker container %q", err) + runner.CrunchLog.Printf("While writing stdin json to docker container: %v", err) runner.stop(nil) } response.CloseWrite() @@ -1518,6 +1562,14 @@ func (runner *ContainerRunner) Run() (err error) { runner.CrunchLog.Close() }() + err = runner.fetchContainerRecord() + if err != nil { + return + } + if runner.Container.State != "Locked" { + return fmt.Errorf("dispatch error detected: container %q has state %q", runner.Container.UUID, runner.Container.State) + } + defer func() { // checkErr prints e (unless it's nil) and sets err to // e (unless err is already non-nil). Thus, if err @@ -1558,10 +1610,6 @@ func (runner *ContainerRunner) Run() (err error) { checkErr("UpdateContainerFinal", runner.UpdateContainerFinal()) }() - err = runner.fetchContainerRecord() - if err != nil { - return - } runner.setupSignals() err = runner.startHoststat() if err != nil { @@ -1733,6 +1781,7 @@ func main() { cgroupParentSubsystem := flag.String("cgroup-parent-subsystem", "", "use current cgroup for given subsystem as parent cgroup for container") caCertsPath := flag.String("ca-certs", "", "Path to TLS root certificates") detach := flag.Bool("detach", false, "Detach from parent process and run in the background") + stdinEnv := flag.Bool("stdin-env", false, "Load environment variables from JSON message on stdin") sleep := flag.Duration("sleep", 0, "Delay before starting (testing use only)") kill := flag.Int("kill", -1, "Send signal to an existing crunch-run process for given UUID") list := flag.Bool("list", false, "List UUIDs of existing crunch-run processes") @@ -1762,6 +1811,14 @@ func main() { flag.Parse() + var env map[string]interface{} + if *stdinEnv && !ignoreDetachFlag { + // Load env vars on stdin if asked (but not in a + // detached child process, in which case stdin is + // /dev/null). + env = loadEnv(os.Stdin) + } + switch { case *detach && !ignoreDetachFlag: os.Exit(Detach(flag.Arg(0), os.Args, os.Stdout, os.Stderr)) @@ -1814,6 +1871,8 @@ func main() { os.Exit(1) } + cr.dispatchEnv = env + parentTemp, tmperr := cr.MkTempDir("", "crunch-run."+containerId+".") if tmperr != nil { log.Fatalf("%s: %v", containerId, tmperr) @@ -1852,3 +1911,24 @@ func main() { log.Fatalf("%s: %v", containerId, runerr) } } + +func loadEnv(rdr io.Reader) map[string]interface{} { + buf, err := ioutil.ReadAll(rdr) + if err != nil { + log.Fatalf("read stdin: %s", err) + } + var env map[string]interface{} + err = json.Unmarshal(buf, &env) + if err != nil { + log.Fatalf("decode stdin: %s", err) + } + for k, v := range env { + if v, ok := v.(string); ok { + err = os.Setenv(k, v) + if err != nil { + log.Fatalf("setenv(%q): %s", k, err) + } + } + } + return env +}