X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/8adcf378579655c4653cbcdae92c9d9eec154ea8..6b3a880d607ee3e3dd273f019981fd6cae62373c:/lib/crunchrun/crunchrun.go diff --git a/lib/crunchrun/crunchrun.go b/lib/crunchrun/crunchrun.go index 412f1bbfbf..7a2afeacca 100644 --- a/lib/crunchrun/crunchrun.go +++ b/lib/crunchrun/crunchrun.go @@ -6,6 +6,7 @@ package crunchrun import ( "bytes" + "context" "encoding/json" "errors" "flag" @@ -13,6 +14,8 @@ import ( "io" "io/ioutil" "log" + "net" + "net/http" "os" "os/exec" "os/signal" @@ -33,13 +36,20 @@ import ( "git.arvados.org/arvados.git/sdk/go/arvadosclient" "git.arvados.org/arvados.git/sdk/go/keepclient" "git.arvados.org/arvados.git/sdk/go/manifest" - "golang.org/x/net/context" ) type command struct{} var Command = command{} +// ConfigData contains environment variables and (when needed) cluster +// configuration, passed from dispatchcloud to crunch-run on stdin. +type ConfigData struct { + Env map[string]string + KeepBuffers int + Cluster *arvados.Cluster +} + // IArvadosClient is the minimal Arvados API methods used by crunch-run. type IArvadosClient interface { Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error @@ -66,7 +76,7 @@ type IKeepClient interface { // NewLogWriter is a factory function to create a new log writer. type NewLogWriter func(name string) (io.WriteCloser, error) -type RunArvMount func(args []string, tok string) (*exec.Cmd, error) +type RunArvMount func(cmdline []string, tok string) (*exec.Cmd, error) type MkTempDir func(string, string) (string, error) @@ -260,23 +270,21 @@ func (runner *ContainerRunner) LoadImage() (string, error) { return "", fmt.Errorf("cannot choose from multiple tar files in image collection: %v", tarfiles) } imageID := tarfiles[0][:len(tarfiles[0])-4] - imageFile := runner.ArvMountPoint + "/by_id/" + runner.Container.ContainerImage + "/" + tarfiles[0] + imageTarballPath := runner.ArvMountPoint + "/by_id/" + runner.Container.ContainerImage + "/" + imageID + ".tar" runner.CrunchLog.Printf("Using Docker image id %q", imageID) - if !runner.executor.ImageLoaded(imageID) { - runner.CrunchLog.Print("Loading Docker image from keep") - err = runner.executor.LoadImage(imageFile) - if err != nil { - return "", err - } - } else { - runner.CrunchLog.Print("Docker image is available") + runner.CrunchLog.Print("Loading Docker image from keep") + err = runner.executor.LoadImage(imageID, imageTarballPath, runner.Container, runner.ArvMountPoint, + runner.containerClient) + if err != nil { + return "", err } + return imageID, nil } -func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string, token string) (c *exec.Cmd, err error) { - c = exec.Command("arv-mount", arvMountCmd...) +func (runner *ContainerRunner) ArvMountCmd(cmdline []string, token string) (c *exec.Cmd, err error) { + c = exec.Command(cmdline[0], cmdline[1:]...) // Copy our environment, but override ARVADOS_API_TOKEN with // the container auth token. @@ -293,8 +301,16 @@ func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string, token string) ( return nil, err } runner.arvMountLog = NewThrottledLogger(w) + scanner := logScanner{ + Patterns: []string{ + "Keep write error", + "Block not found error", + "Unhandled exception during FUSE operation", + }, + ReportFunc: runner.reportArvMountWarning, + } c.Stdout = runner.arvMountLog - c.Stderr = io.MultiWriter(runner.arvMountLog, os.Stderr) + c.Stderr = io.MultiWriter(runner.arvMountLog, os.Stderr, &scanner) runner.CrunchLog.Printf("Running %v", c.Args) @@ -394,6 +410,7 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) { pdhOnly := true tmpcount := 0 arvMountCmd := []string{ + "arv-mount", "--foreground", "--allow-other", "--read-write", @@ -599,6 +616,7 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) { } else { arvMountCmd = append(arvMountCmd, "--mount-by-id", "by_id") } + arvMountCmd = append(arvMountCmd, "--mount-by-id", "by_uuid") arvMountCmd = append(arvMountCmd, runner.ArvMountPoint) runner.ArvMount, err = runner.RunArvMount(arvMountCmd, token) @@ -1101,6 +1119,21 @@ func (runner *ContainerRunner) updateLogs() { } } +func (runner *ContainerRunner) reportArvMountWarning(pattern, text string) { + var updated arvados.Container + err := runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{ + "container": arvadosclient.Dict{ + "runtime_status": arvadosclient.Dict{ + "warning": "arv-mount: " + pattern, + "warningDetail": text, + }, + }, + }, &updated) + if err != nil { + runner.CrunchLog.Printf("error updating container runtime_status: %s", err) + } +} + // CaptureOutput saves data from the container's output directory if // needed, and updates the container output accordingly. func (runner *ContainerRunner) CaptureOutput(bindmounts map[string]bindmount) error { @@ -1171,6 +1204,7 @@ func (runner *ContainerRunner) CleanupDirs() { if umnterr != nil { runner.CrunchLog.Printf("Error unmounting: %v", umnterr) + runner.ArvMount.Process.Kill() } else { // If arv-mount --unmount gets stuck for any reason, we // don't want to wait for it forever. Do Wait() in a goroutine @@ -1201,12 +1235,14 @@ func (runner *ContainerRunner) CleanupDirs() { } } } + runner.ArvMount = nil } if runner.ArvMountPoint != "" { if rmerr := os.Remove(runner.ArvMountPoint); rmerr != nil { runner.CrunchLog.Printf("While cleaning up arv-mount directory %s: %v", runner.ArvMountPoint, rmerr) } + runner.ArvMountPoint = "" } if rmerr := os.RemoveAll(runner.parentTemp); rmerr != nil { @@ -1375,7 +1411,7 @@ func (runner *ContainerRunner) NewArvLogWriter(name string) (io.WriteCloser, err // Run the full container lifecycle. func (runner *ContainerRunner) Run() (err error) { runner.CrunchLog.Printf("crunch-run %s started", cmd.Version.String()) - runner.CrunchLog.Printf("Executing container '%s'", runner.Container.UUID) + runner.CrunchLog.Printf("Executing container '%s' using %s runtime", runner.Container.UUID, runner.executor.Runtime()) hostname, hosterr := os.Hostname() if hosterr != nil { @@ -1441,6 +1477,7 @@ func (runner *ContainerRunner) Run() (err error) { } checkErr("stopHoststat", runner.stopHoststat()) checkErr("CommitLogs", runner.CommitLogs()) + runner.CleanupDirs() checkErr("UpdateContainerFinal", runner.UpdateContainerFinal()) }() @@ -1617,7 +1654,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s cgroupParentSubsystem := flags.String("cgroup-parent-subsystem", "", "use current cgroup for given subsystem as parent cgroup for container") caCertsPath := flags.String("ca-certs", "", "Path to TLS root certificates") detach := flags.Bool("detach", false, "Detach from parent process and run in the background") - stdinEnv := flags.Bool("stdin-env", false, "Load environment variables from JSON message on stdin") + stdinConfig := flags.Bool("stdin-config", false, "Load config and environment variables from JSON message on stdin") sleep := flags.Duration("sleep", 0, "Delay before starting (testing use only)") kill := flags.Int("kill", -1, "Send signal to an existing crunch-run process for given UUID") list := flags.Bool("list", false, "List UUIDs of existing crunch-run processes") @@ -1647,33 +1684,45 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s return 1 } - if *stdinEnv && !ignoreDetachFlag { - // Load env vars on stdin if asked (but not in a - // detached child process, in which case stdin is - // /dev/null). - err := loadEnv(os.Stdin) - if err != nil { - log.Print(err) - return 1 - } - } - containerUUID := flags.Arg(0) switch { case *detach && !ignoreDetachFlag: - return Detach(containerUUID, prog, args, os.Stdout, os.Stderr) + return Detach(containerUUID, prog, args, os.Stdin, os.Stdout, os.Stderr) case *kill >= 0: return KillProcess(containerUUID, syscall.Signal(*kill), os.Stdout, os.Stderr) case *list: return ListProcesses(os.Stdout, os.Stderr) } - if containerUUID == "" { + if len(containerUUID) != 27 { log.Printf("usage: %s [options] UUID", prog) return 1 } + var conf ConfigData + if *stdinConfig { + err := json.NewDecoder(os.Stdin).Decode(&conf) + if err != nil { + log.Print(err) + return 1 + } + for k, v := range conf.Env { + err = os.Setenv(k, v) + if err != nil { + log.Printf("setenv(%q): %s", k, err) + return 1 + } + } + if conf.Cluster != nil { + // ClusterID is missing from the JSON + // representation, but we need it to generate + // a valid config file for keepstore, so we + // fill it using the container UUID prefix. + conf.Cluster.ClusterID = containerUUID[:5] + } + } + log.Printf("crunch-run %s started", cmd.Version.String()) time.Sleep(*sleep) @@ -1681,6 +1730,16 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s arvadosclient.CertFiles = []string{*caCertsPath} } + var keepstoreLog bufThenWrite + keepstore, err := startLocalKeepstore(conf, io.MultiWriter(&keepstoreLog, stderr)) + if err != nil { + log.Print(err) + return 1 + } + if keepstore != nil { + defer keepstore.Process.Kill() + } + api, err := arvadosclient.MakeArvadosClient() if err != nil { log.Printf("%s: %v", containerUUID, err) @@ -1702,6 +1761,19 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s return 1 } + if keepstore != nil { + w, err := cr.NewLogWriter("keepstore") + if err != nil { + log.Print(err) + return 1 + } + err = keepstoreLog.SetWriter(NewThrottledLogger(w)) + if err != nil { + log.Print(err) + return 1 + } + } + switch *runtimeEngine { case "docker": cr.executor, err = newDockerExecutor(containerUUID, cr.CrunchLog.Printf, cr.containerWatchdogInterval) @@ -1789,21 +1861,73 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s return 0 } -func loadEnv(rdr io.Reader) error { - buf, err := ioutil.ReadAll(rdr) +func startLocalKeepstore(configData ConfigData, logbuf io.Writer) (*exec.Cmd, error) { + if configData.Cluster == nil || configData.KeepBuffers < 1 { + return nil, nil + } + + // Rather than have an alternate way to tell keepstore how + // many buffers to use when starting it this way, we just + // modify the cluster configuration that we feed it on stdin. + configData.Cluster.API.MaxKeepBlobBuffers = configData.KeepBuffers + + ln, err := net.Listen("tcp", "localhost:0") if err != nil { - return fmt.Errorf("read stdin: %s", err) + return nil, err } - var env map[string]string - err = json.Unmarshal(buf, &env) + _, port, err := net.SplitHostPort(ln.Addr().String()) if err != nil { - return fmt.Errorf("decode stdin: %s", err) + ln.Close() + return nil, err } - for k, v := range env { - err = os.Setenv(k, v) + ln.Close() + url := "http://localhost:" + port + + fmt.Fprintf(logbuf, "starting keepstore on %s\n", url) + + var confJSON bytes.Buffer + err = json.NewEncoder(&confJSON).Encode(arvados.Config{ + Clusters: map[string]arvados.Cluster{ + configData.Cluster.ClusterID: *configData.Cluster, + }, + }) + if err != nil { + return nil, err + } + cmd := exec.Command("/proc/self/exe", "keepstore", "-config=-") + cmd.Stdin = &confJSON + cmd.Stdout = logbuf + cmd.Stderr = logbuf + cmd.Env = []string{ + "GOGC=10", + "ARVADOS_SERVICE_INTERNAL_URL=" + url, + } + err = cmd.Start() + if err != nil { + return nil, fmt.Errorf("error starting keepstore process: %w", err) + } + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*10)) + defer cancel() + poll := time.NewTicker(time.Second / 10) + defer poll.Stop() + client := http.Client{} + for range poll.C { + testReq, err := http.NewRequestWithContext(ctx, "GET", url, nil) if err != nil { - return fmt.Errorf("setenv(%q): %s", k, err) + return nil, err + } + resp, err := client.Do(testReq) + if err == nil { + // Success -- don't need to check the + // response, we just need to know it's + // accepting requests. + resp.Body.Close() + break + } + if ctx.Err() != nil { + return nil, fmt.Errorf("timed out waiting for new keepstore process to accept a request") } } - return nil + os.Setenv("ARVADOS_KEEP_SERVICES", url) + return cmd, nil }