Docker ThinDockerClient
// Dispatcher client is initialized with the Dispatcher token.
- // This is a priviledged token used to manage container status
+ // This is a privileged token used to manage container status
// and logs.
//
// We have both dispatcherClient and DispatcherArvClient
func (runner *ContainerRunner) runBrokenNodeHook() {
if *brokenNodeHook == "" {
- runner.CrunchLog.Printf("No broken node hook provided, cannot mark node as broken.")
+ path := filepath.Join(lockdir, brokenfile)
+ runner.CrunchLog.Printf("Writing %s to mark node as broken", path)
+ f, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0700)
+ if err != nil {
+ runner.CrunchLog.Printf("Error writing %s: %s", path, err)
+ return
+ }
+ f.Close()
} else {
runner.CrunchLog.Printf("Running broken node hook %q", *brokenNodeHook)
// run killme script
return err
}
-// LogNodeRecord logs arvados#node record corresponding to the current host.
+// LogNodeRecord records information about the current host. When $InstanceType is non-empty, its value is saved as node.json in the log collection;
+// otherwise the arvados#node record matching this hostname is logged (crunch-dispatch-slurm case). It returns the first error encountered.
func (runner *ContainerRunner) LogNodeRecord() error {
- hostname := os.Getenv("SLURMD_NODENAME")
- if hostname == "" {
- hostname, _ = os.Hostname()
- }
- _, err := runner.logAPIResponse("node", "nodes", map[string]interface{}{"filters": [][]string{{"hostname", "=", hostname}}}, func(resp interface{}) {
- // The "info" field has admin-only info when obtained
- // with a privileged token, and should not be logged.
- node, ok := resp.(map[string]interface{})
- if ok {
- delete(node, "info")
- }
- })
- return err
+ if it := os.Getenv("InstanceType"); it != "" {
+ // Dispatched via arvados-dispatch-cloud. Save
+ // InstanceType config fragment received from
+ // dispatcher on stdin (it reaches us as the $InstanceType value).
+ w, err := runner.LogCollection.OpenFile("node.json", os.O_CREATE|os.O_WRONLY, 0666)
+ if err != nil {
+ return err
+ }
+ defer w.Close() // covers the early return taken when the write below fails
+ _, err = io.WriteString(w, it) // the environment value is written verbatim, without re-encoding
+ if err != nil {
+ return err
+ }
+ return w.Close() // explicit close so that a close error is reported to the caller
+ } else {
+ // Dispatched via crunch-dispatch-slurm. Look up
+ // apiserver's node record corresponding to
+ // $SLURMD_NODENAME.
+ hostname := os.Getenv("SLURMD_NODENAME")
+ if hostname == "" {
+ hostname, _ = os.Hostname() // fall back to the OS hostname; a lookup error is ignored here
+ }
+ _, err := runner.logAPIResponse("node", "nodes", map[string]interface{}{"filters": [][]string{{"hostname", "=", hostname}}}, func(resp interface{}) {
+ // The "info" field has admin-only info when
+ // obtained with a privileged token, and
+ // should not be logged.
+ node, ok := resp.(map[string]interface{})
+ if ok {
+ delete(node, "info")
+ }
+ })
+ return err // the "logged" result of logAPIResponse is discarded; only the error is propagated
+ }
}
func (runner *ContainerRunner) logAPIResponse(label, path string, params map[string]interface{}, munge func(interface{})) (logged bool, err error) {
go func() {
_, err := io.Copy(response.Conn, stdinRdr)
if err != nil {
- runner.CrunchLog.Print("While writing stdin collection to docker container %q", err)
+ runner.CrunchLog.Printf("While writing stdin collection to docker container: %v", err)
runner.stop(nil)
}
stdinRdr.Close()
go func() {
_, err := io.Copy(response.Conn, bytes.NewReader(stdinJson))
if err != nil {
- runner.CrunchLog.Print("While writing stdin json to docker container %q", err)
+ runner.CrunchLog.Printf("While writing stdin json to docker container: %v", err)
runner.stop(nil)
}
response.CloseWrite()
runner.CrunchLog.Close()
}()
+ err = runner.fetchContainerRecord()
+ if err != nil {
+ return
+ }
+ if runner.Container.State != "Locked" {
+ return fmt.Errorf("dispatch error detected: container %q has state %q", runner.Container.UUID, runner.Container.State)
+ }
+
defer func() {
// checkErr prints e (unless it's nil) and sets err to
// e (unless err is already non-nil). Thus, if err
checkErr("UpdateContainerFinal", runner.UpdateContainerFinal())
}()
- err = runner.fetchContainerRecord()
- if err != nil {
- return
- }
runner.setupSignals()
err = runner.startHoststat()
if err != nil {
cgroupParentSubsystem := flag.String("cgroup-parent-subsystem", "", "use current cgroup for given subsystem as parent cgroup for container")
caCertsPath := flag.String("ca-certs", "", "Path to TLS root certificates")
detach := flag.Bool("detach", false, "Detach from parent process and run in the background")
+ stdinEnv := flag.Bool("stdin-env", false, "Load environment variables from JSON message on stdin")
sleep := flag.Duration("sleep", 0, "Delay before starting (testing use only)")
kill := flag.Int("kill", -1, "Send signal to an existing crunch-run process for given UUID")
list := flag.Bool("list", false, "List UUIDs of existing crunch-run processes")
flag.Parse()
+ if *stdinEnv && !ignoreDetachFlag {
+ // Load env vars on stdin if asked (but not in a
+ // detached child process, in which case stdin is
+ // /dev/null).
+ loadEnv(os.Stdin)
+ }
+
switch {
case *detach && !ignoreDetachFlag:
os.Exit(Detach(flag.Arg(0), os.Args, os.Stdout, os.Stderr))
log.Fatalf("%s: %v", containerId, runerr)
}
}
+
+func loadEnv(rdr io.Reader) { // loadEnv sets one process environment variable per entry of the JSON document read from rdr; any failure ends the process via log.Fatalf.
+ buf, err := ioutil.ReadAll(rdr) // consume rdr until EOF before decoding
+ if err != nil {
+ log.Fatalf("read stdin: %s", err)
+ }
+ var env map[string]string // decode target: keys are variable names, values are their string contents
+ err = json.Unmarshal(buf, &env)
+ if err != nil {
+ log.Fatalf("decode stdin: %s", err)
+ }
+ for k, v := range env {
+ err = os.Setenv(k, v) // applies to the current process environment
+ if err != nil {
+ log.Fatalf("setenv(%q): %s", k, err)
+ }
+ }
+}