X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/262871175bbe08f6ab1b7051bf9c5dfdd74283ac..4a919918a4ce37b5290793f02fa959db1c073590:/services/crunch-run/crunchrun.go
diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go
index 46d2d20680..3261291b53 100644
--- a/services/crunch-run/crunchrun.go
+++ b/services/crunch-run/crunchrun.go
@@ -222,7 +222,14 @@ var brokenNodeHook *string = flag.String("broken-node-hook", "", "Script to run
 
 func (runner *ContainerRunner) runBrokenNodeHook() {
 	if *brokenNodeHook == "" {
-		runner.CrunchLog.Printf("No broken node hook provided, cannot mark node as broken.")
+		path := filepath.Join(lockdir, brokenfile)
+		runner.CrunchLog.Printf("Writing %s to mark node as broken", path)
+		f, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0700)
+		if err != nil {
+			runner.CrunchLog.Printf("Error writing %s: %s", path, err)
+			return
+		}
+		f.Close()
 	} else {
 		runner.CrunchLog.Printf("Running broken node hook %q", *brokenNodeHook)
 		// run killme script
@@ -980,7 +987,7 @@ func (runner *ContainerRunner) AttachStreams() (err error) {
 		go func() {
 			_, err := io.Copy(response.Conn, stdinRdr)
 			if err != nil {
-				runner.CrunchLog.Print("While writing stdin collection to docker container %q", err)
+				runner.CrunchLog.Printf("While writing stdin collection to docker container: %v", err)
 				runner.stop(nil)
 			}
 			stdinRdr.Close()
@@ -990,7 +997,7 @@ func (runner *ContainerRunner) AttachStreams() (err error) {
 		go func() {
 			_, err := io.Copy(response.Conn, bytes.NewReader(stdinJson))
 			if err != nil {
-				runner.CrunchLog.Print("While writing stdin json to docker container %q", err)
+				runner.CrunchLog.Printf("While writing stdin json to docker container: %v", err)
 				runner.stop(nil)
 			}
 			response.CloseWrite()
@@ -1518,6 +1525,14 @@ func (runner *ContainerRunner) Run() (err error) {
 		runner.CrunchLog.Close()
 	}()
 
+	err = runner.fetchContainerRecord()
+	if err != nil {
+		return
+	}
+	if runner.Container.State != "Locked" {
+		return fmt.Errorf("dispatch error detected: container %q has state %q", runner.Container.UUID, runner.Container.State)
+	}
+
 	defer func() {
 		// checkErr prints e (unless it's nil) and sets err to
 		// e (unless err is already non-nil). Thus, if err
@@ -1558,10 +1573,6 @@ func (runner *ContainerRunner) Run() (err error) {
 		checkErr("UpdateContainerFinal", runner.UpdateContainerFinal())
 	}()
 
-	err = runner.fetchContainerRecord()
-	if err != nil {
-		return
-	}
 	runner.setupSignals()
 	err = runner.startHoststat()
 	if err != nil {
@@ -1733,6 +1744,7 @@ func main() {
 	cgroupParentSubsystem := flag.String("cgroup-parent-subsystem", "", "use current cgroup for given subsystem as parent cgroup for container")
 	caCertsPath := flag.String("ca-certs", "", "Path to TLS root certificates")
 	detach := flag.Bool("detach", false, "Detach from parent process and run in the background")
+	stdinEnv := flag.Bool("stdin-env", false, "Load environment variables from JSON message on stdin")
 	sleep := flag.Duration("sleep", 0, "Delay before starting (testing use only)")
 	kill := flag.Int("kill", -1, "Send signal to an existing crunch-run process for given UUID")
 	list := flag.Bool("list", false, "List UUIDs of existing crunch-run processes")
@@ -1762,6 +1774,13 @@ func main() {
 
 	flag.Parse()
 
+	if *stdinEnv && !ignoreDetachFlag {
+		// Load env vars on stdin if asked (but not in a
+		// detached child process, in which case stdin is
+		// /dev/null).
+		loadEnv(os.Stdin)
+	}
+
 	switch {
 	case *detach && !ignoreDetachFlag:
 		os.Exit(Detach(flag.Arg(0), os.Args, os.Stdout, os.Stderr))
@@ -1852,3 +1871,21 @@ func main() {
 		log.Fatalf("%s: %v", containerId, runerr)
 	}
 }
+
+func loadEnv(rdr io.Reader) {
+	buf, err := ioutil.ReadAll(rdr)
+	if err != nil {
+		log.Fatalf("read stdin: %s", err)
+	}
+	var env map[string]string
+	err = json.Unmarshal(buf, &env)
+	if err != nil {
+		log.Fatalf("decode stdin: %s", err)
+	}
+	for k, v := range env {
+		err = os.Setenv(k, v)
+		if err != nil {
+			log.Fatalf("setenv(%q): %s", k, err)
+		}
+	}
+}