X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/defaae08f226bb9efbb76a0e15d392f46e929479..15a36415708345e2361741606cc0e06bbd9e40c6:/lib/crunchrun/crunchrun.go diff --git a/lib/crunchrun/crunchrun.go b/lib/crunchrun/crunchrun.go index 556a3bfe13..0fe8237bc7 100644 --- a/lib/crunchrun/crunchrun.go +++ b/lib/crunchrun/crunchrun.go @@ -40,7 +40,6 @@ import ( "git.arvados.org/arvados.git/sdk/go/arvadosclient" "git.arvados.org/arvados.git/sdk/go/ctxlog" "git.arvados.org/arvados.git/sdk/go/keepclient" - "git.arvados.org/arvados.git/sdk/go/manifest" "golang.org/x/sys/unix" ) @@ -76,14 +75,10 @@ var ErrCancelled = errors.New("Cancelled") type IKeepClient interface { BlockWrite(context.Context, arvados.BlockWriteOptions) (arvados.BlockWriteResponse, error) ReadAt(locator string, p []byte, off int) (int, error) - ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error) LocalLocator(locator string) (string, error) SetStorageClasses(sc []string) } -// NewLogWriter is a factory function to create a new log writer. -type NewLogWriter func(name string) (io.WriteCloser, error) - type RunArvMount func(cmdline []string, tok string) (*exec.Cmd, error) type MkTempDir func(string, string) (string, error) @@ -125,8 +120,7 @@ type ContainerRunner struct { Container arvados.Container token string ExitCode *int - NewLogWriter NewLogWriter - CrunchLog *ThrottledLogger + CrunchLog *logWriter logUUID string logMtx sync.Mutex LogCollection arvados.CollectionFileSystem @@ -168,7 +162,7 @@ type ContainerRunner struct { enableNetwork string // one of "default" or "always" networkMode string // "none", "host", or "" -- passed through to executor brokenNodeHook string // script to run if node appears to be broken - arvMountLog *ThrottledLogger + arvMountLog io.WriteCloser containerWatchdogInterval time.Duration @@ -303,11 +297,10 @@ func (runner *ContainerRunner) ArvMountCmd(cmdline []string, token string) (c *e } c.Env = append(c.Env, "ARVADOS_API_TOKEN="+token) - w, err := runner.NewLogWriter("arv-mount") + runner.arvMountLog, err = runner.openLogFile("arv-mount") if err != nil { return nil, err } - runner.arvMountLog = NewThrottledLogger(w) scanner := logScanner{ Patterns: []string{ "Keep write error", @@ -626,17 +619,6 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) { // OutputPath is a staging directory. bindmounts[bind] = bindmount{HostPath: tmpfn, ReadOnly: true} } - - case mnt.Kind == "git_tree": - tmpdir, err := runner.MkTempDir(runner.parentTemp, "git_tree") - if err != nil { - return nil, fmt.Errorf("creating temp dir: %v", err) - } - err = gitMount(mnt).extractTree(runner.containerClient, tmpdir, token) - if err != nil { - return nil, err - } - bindmounts[bind] = bindmount{HostPath: tmpdir, ReadOnly: true} } } @@ -746,13 +728,13 @@ func (runner *ContainerRunner) stopHoststat() error { } func (runner *ContainerRunner) startHoststat() error { - w, err := runner.NewLogWriter("hoststat") + var err error + runner.hoststatLogger, err = runner.openLogFile("hoststat") if err != nil { return err } - runner.hoststatLogger = NewThrottledLogger(w) runner.hoststatReporter = &crunchstat.Reporter{ - Logger: log.New(runner.hoststatLogger, "", 0), + Logger: newLogWriter(runner.hoststatLogger), // Our own cgroup is the "host" cgroup, in the sense // that it accounts for resource usage outside the // container. It doesn't count _all_ resource usage on @@ -770,15 +752,15 @@ func (runner *ContainerRunner) startHoststat() error { } func (runner *ContainerRunner) startCrunchstat() error { - w, err := runner.NewLogWriter("crunchstat") + var err error + runner.statLogger, err = runner.openLogFile("crunchstat") if err != nil { return err } - runner.statLogger = NewThrottledLogger(w) runner.statReporter = &crunchstat.Reporter{ Pid: runner.executor.Pid, FS: runner.crunchstatFakeFS, - Logger: log.New(runner.statLogger, "", 0), + Logger: newLogWriter(runner.statLogger), MemThresholds: map[string][]crunchstat.Threshold{ "rss": crunchstat.NewThresholdsFromPercentages(runner.Container.RuntimeConstraints.RAM, []int64{90, 95, 99}), }, @@ -801,7 +783,7 @@ type infoCommand struct { // might differ from what's described in the node record (see // LogNodeRecord). func (runner *ContainerRunner) LogHostInfo() (err error) { - w, err := runner.NewLogWriter("node-info") + w, err := runner.openLogFile("node-info") if err != nil { return } @@ -852,62 +834,40 @@ func (runner *ContainerRunner) LogHostInfo() (err error) { // LogContainerRecord gets and saves the raw JSON container record from the API server func (runner *ContainerRunner) LogContainerRecord() error { - logged, err := runner.logAPIResponse("container", "containers", map[string]interface{}{"filters": [][]string{{"uuid", "=", runner.Container.UUID}}}, nil) + logged, err := runner.logAPIResponse("container", "containers", map[string]interface{}{"filters": [][]string{{"uuid", "=", runner.Container.UUID}}}) if !logged && err == nil { err = fmt.Errorf("error: no container record found for %s", runner.Container.UUID) } return err } -// LogNodeRecord logs the current host's InstanceType config entry (or -// the arvados#node record, if running via crunch-dispatch-slurm). +// LogNodeRecord logs the current host's InstanceType config entry, if +// running via arvados-dispatch-cloud. func (runner *ContainerRunner) LogNodeRecord() error { - if it := os.Getenv("InstanceType"); it != "" { - // Dispatched via arvados-dispatch-cloud. Save - // InstanceType config fragment received from - // dispatcher on stdin. - w, err := runner.LogCollection.OpenFile("node.json", os.O_CREATE|os.O_WRONLY, 0666) - if err != nil { - return err - } - defer w.Close() - _, err = io.WriteString(w, it) - if err != nil { - return err - } - return w.Close() + it := os.Getenv("InstanceType") + if it == "" { + // Not dispatched by arvados-dispatch-cloud. + return nil } - // Dispatched via crunch-dispatch-slurm. Look up - // apiserver's node record corresponding to - // $SLURMD_NODENAME. - hostname := os.Getenv("SLURMD_NODENAME") - if hostname == "" { - hostname, _ = os.Hostname() + // Save InstanceType config fragment received from dispatcher + // on stdin. + w, err := runner.LogCollection.OpenFile("node.json", os.O_CREATE|os.O_WRONLY, 0666) + if err != nil { + return err } - _, err := runner.logAPIResponse("node", "nodes", map[string]interface{}{"filters": [][]string{{"hostname", "=", hostname}}}, func(resp interface{}) { - // The "info" field has admin-only info when - // obtained with a privileged token, and - // should not be logged. - node, ok := resp.(map[string]interface{}) - if ok { - delete(node, "info") - } - }) - return err + defer w.Close() + _, err = io.WriteString(w, it) + if err != nil { + return err + } + return w.Close() } -func (runner *ContainerRunner) logAPIResponse(label, path string, params map[string]interface{}, munge func(interface{})) (logged bool, err error) { +func (runner *ContainerRunner) logAPIResponse(label, path string, params map[string]interface{}) (logged bool, err error) { writer, err := runner.LogCollection.OpenFile(label+".json", os.O_CREATE|os.O_WRONLY, 0666) if err != nil { return false, err } - w := &ArvLogWriter{ - ArvClient: runner.DispatcherArvClient, - UUID: runner.Container.UUID, - loggingStream: label, - writeCloser: writer, - } - reader, err := runner.DispatcherArvClient.CallRaw("GET", path, "", "", arvadosclient.Dict(params)) if err != nil { return false, fmt.Errorf("error getting %s record: %v", label, err) @@ -926,16 +886,13 @@ func (runner *ContainerRunner) logAPIResponse(label, path string, params map[str } else if len(items) < 1 { return false, nil } - if munge != nil { - munge(items[0]) - } // Re-encode it using indentation to improve readability - enc := json.NewEncoder(w) + enc := json.NewEncoder(writer) enc.SetIndent("", " ") if err = enc.Encode(items[0]); err != nil { return false, fmt.Errorf("error logging %s record: %v", label, err) } - err = w.Close() + err = writer.Close() if err != nil { return false, fmt.Errorf("error closing %s.json in log collection: %v", label, err) } @@ -969,7 +926,7 @@ func (runner *ContainerRunner) getStdoutFile(mntPath string) (*os.File, error) { // CreateContainer creates the docker container. func (runner *ContainerRunner) CreateContainer(imageID string, bindmounts map[string]bindmount) error { - var stdin io.ReadCloser = ioutil.NopCloser(bytes.NewReader(nil)) + var stdin io.Reader if mnt, ok := runner.Container.Mounts["stdin"]; ok { switch mnt.Kind { case "collection": @@ -985,28 +942,35 @@ func (runner *ContainerRunner) CreateContainer(imageID string, bindmounts map[st return err } stdin = f + runner.executorStdin = f case "json": j, err := json.Marshal(mnt.Content) if err != nil { return fmt.Errorf("error encoding stdin json data: %v", err) } - stdin = ioutil.NopCloser(bytes.NewReader(j)) + stdin = bytes.NewReader(j) + runner.executorStdin = io.NopCloser(nil) default: return fmt.Errorf("stdin mount has unsupported kind %q", mnt.Kind) } + } else { + stdin = bytes.NewReader(nil) + runner.executorStdin = ioutil.NopCloser(nil) } - var stdout, stderr io.WriteCloser + var stdout, stderr io.Writer if mnt, ok := runner.Container.Mounts["stdout"]; ok { f, err := runner.getStdoutFile(mnt.Path) if err != nil { return err } stdout = f - } else if w, err := runner.NewLogWriter("stdout"); err != nil { + runner.executorStdout = f + } else if w, err := runner.openLogFile("stdout"); err != nil { return err } else { - stdout = NewThrottledLogger(w) + stdout = newTimestamper(w) + runner.executorStdout = w } if mnt, ok := runner.Container.Mounts["stderr"]; ok { @@ -1015,10 +979,12 @@ func (runner *ContainerRunner) CreateContainer(imageID string, bindmounts map[st return err } stderr = f - } else if w, err := runner.NewLogWriter("stderr"); err != nil { + runner.executorStderr = f + } else if w, err := runner.openLogFile("stderr"); err != nil { return err } else { - stderr = NewThrottledLogger(w) + stderr = newTimestamper(w) + runner.executorStderr = w } env := runner.Container.Environment @@ -1047,9 +1013,6 @@ func (runner *ContainerRunner) CreateContainer(imageID string, bindmounts map[st if !runner.enableMemoryLimit { ram = 0 } - runner.executorStdin = stdin - runner.executorStdout = stdout - runner.executorStderr = stderr if runner.Container.RuntimeConstraints.CUDA.DeviceCount > 0 { nvidiaModprobe(runner.CrunchLog) @@ -1365,6 +1328,7 @@ func (runner *ContainerRunner) CaptureOutput(bindmounts map[string]bindmount) er keepClient: runner.ContainerKeepClient, hostOutputDir: runner.HostOutputDir, ctrOutputDir: runner.Container.OutputPath, + globs: runner.Container.OutputGlob, bindmounts: bindmounts, mounts: runner.Container.Mounts, secretMounts: runner.SecretMounts, @@ -1470,19 +1434,11 @@ func (runner *ContainerRunner) CommitLogs() error { if runner.arvMountLog != nil { runner.arvMountLog.Close() } - runner.CrunchLog.Close() - - // Closing CrunchLog above allows them to be committed to Keep at this - // point, but re-open crunch log with ArvClient in case there are any - // other further errors (such as failing to write the log to Keep!) - // while shutting down - runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{ - ArvClient: runner.DispatcherArvClient, - UUID: runner.Container.UUID, - loggingStream: "crunch-run", - writeCloser: nil, - }) - runner.CrunchLog.Immediate = log.New(os.Stderr, runner.Container.UUID+" ", 0) + + // From now on just log to stderr, in case there are + // any other further errors (such as failing to write + // the log to Keep!) while shutting down + runner.CrunchLog = newLogWriter(newTimestamper(newStringPrefixer(os.Stderr, runner.Container.UUID+" "))) }() if runner.keepstoreLogger != nil { @@ -1642,18 +1598,8 @@ func (runner *ContainerRunner) IsCancelled() bool { return runner.cCancelled } -// NewArvLogWriter creates an ArvLogWriter -func (runner *ContainerRunner) NewArvLogWriter(name string) (io.WriteCloser, error) { - writer, err := runner.LogCollection.OpenFile(name+".txt", os.O_CREATE|os.O_WRONLY, 0666) - if err != nil { - return nil, err - } - return &ArvLogWriter{ - ArvClient: runner.DispatcherArvClient, - UUID: runner.Container.UUID, - loggingStream: name, - writeCloser: writer, - }, nil +func (runner *ContainerRunner) openLogFile(name string) (io.WriteCloser, error) { + return runner.LogCollection.OpenFile(name+".txt", os.O_CREATE|os.O_WRONLY, 0666) } // Run the full container lifecycle. @@ -1683,9 +1629,7 @@ func (runner *ContainerRunner) Run() (err error) { defer func() { runner.CleanupDirs() - runner.CrunchLog.Printf("crunch-run finished") - runner.CrunchLog.Close() }() err = runner.fetchContainerRecord() @@ -1879,7 +1823,6 @@ func NewContainerRunner(dispatcherClient *arvados.Client, DispatcherArvClient: dispatcherArvClient, DispatcherKeepClient: dispatcherKeepClient, } - cr.NewLogWriter = cr.NewArvLogWriter cr.RunArvMount = cr.ArvMountCmd cr.MkTempDir = ioutil.TempDir cr.MkArvClient = func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error) { @@ -1902,14 +1845,12 @@ func NewContainerRunner(dispatcherClient *arvados.Client, return nil, err } cr.Container.UUID = containerUUID - w, err := cr.NewLogWriter("crunch-run") + f, err := cr.openLogFile("crunch-run") if err != nil { return nil, err } - cr.CrunchLog = NewThrottledLogger(w) - cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0) + cr.CrunchLog = newLogWriter(newTimestamper(io.MultiWriter(f, newStringPrefixer(os.Stderr, cr.Container.UUID+" ")))) - loadLogThrottleParams(dispatcherArvClient) go cr.updateLogs() return cr, nil @@ -2054,12 +1995,11 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s keepstoreLogbuf.SetWriter(io.Discard) } else { cr.CrunchLog.Printf("using local keepstore process (pid %d) at %s, writing logs to keepstore.txt in log collection", keepstore.Process.Pid, os.Getenv("ARVADOS_KEEP_SERVICES")) - logwriter, err := cr.NewLogWriter("keepstore") + cr.keepstoreLogger, err = cr.openLogFile("keepstore") if err != nil { log.Print(err) return 1 } - cr.keepstoreLogger = NewThrottledLogger(logwriter) var writer io.WriteCloser = cr.keepstoreLogger if logWhat == "errors" { @@ -2085,13 +2025,11 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s cr.executor, err = newSingularityExecutor(cr.CrunchLog.Printf) default: cr.CrunchLog.Printf("%s: unsupported RuntimeEngine %q", containerUUID, *runtimeEngine) - cr.CrunchLog.Close() return 1 } if err != nil { cr.CrunchLog.Printf("%s: %v", containerUUID, err) cr.checkBrokenNode(err) - cr.CrunchLog.Close() return 1 } defer cr.executor.Close()