X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/1476fbd67813d5fbead1ad614f5317db9bd0e0bb..8a6bd5b851e0e947b74710c4c2dd1ea59dce606a:/lib/crunchrun/crunchrun.go diff --git a/lib/crunchrun/crunchrun.go b/lib/crunchrun/crunchrun.go index 556a3bfe13..8627045411 100644 --- a/lib/crunchrun/crunchrun.go +++ b/lib/crunchrun/crunchrun.go @@ -81,9 +81,6 @@ type IKeepClient interface { SetStorageClasses(sc []string) } -// NewLogWriter is a factory function to create a new log writer. -type NewLogWriter func(name string) (io.WriteCloser, error) - type RunArvMount func(cmdline []string, tok string) (*exec.Cmd, error) type MkTempDir func(string, string) (string, error) @@ -125,8 +122,7 @@ type ContainerRunner struct { Container arvados.Container token string ExitCode *int - NewLogWriter NewLogWriter - CrunchLog *ThrottledLogger + CrunchLog *logWriter logUUID string logMtx sync.Mutex LogCollection arvados.CollectionFileSystem @@ -168,7 +164,7 @@ type ContainerRunner struct { enableNetwork string // one of "default" or "always" networkMode string // "none", "host", or "" -- passed through to executor brokenNodeHook string // script to run if node appears to be broken - arvMountLog *ThrottledLogger + arvMountLog io.WriteCloser containerWatchdogInterval time.Duration @@ -303,11 +299,10 @@ func (runner *ContainerRunner) ArvMountCmd(cmdline []string, token string) (c *e } c.Env = append(c.Env, "ARVADOS_API_TOKEN="+token) - w, err := runner.NewLogWriter("arv-mount") + runner.arvMountLog, err = runner.openLogFile("arv-mount") if err != nil { return nil, err } - runner.arvMountLog = NewThrottledLogger(w) scanner := logScanner{ Patterns: []string{ "Keep write error", @@ -626,17 +621,6 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) { // OutputPath is a staging directory. bindmounts[bind] = bindmount{HostPath: tmpfn, ReadOnly: true} } - - case mnt.Kind == "git_tree": - tmpdir, err := runner.MkTempDir(runner.parentTemp, "git_tree") - if err != nil { - return nil, fmt.Errorf("creating temp dir: %v", err) - } - err = gitMount(mnt).extractTree(runner.containerClient, tmpdir, token) - if err != nil { - return nil, err - } - bindmounts[bind] = bindmount{HostPath: tmpdir, ReadOnly: true} } } @@ -746,13 +730,13 @@ func (runner *ContainerRunner) stopHoststat() error { } func (runner *ContainerRunner) startHoststat() error { - w, err := runner.NewLogWriter("hoststat") + var err error + runner.hoststatLogger, err = runner.openLogFile("hoststat") if err != nil { return err } - runner.hoststatLogger = NewThrottledLogger(w) runner.hoststatReporter = &crunchstat.Reporter{ - Logger: log.New(runner.hoststatLogger, "", 0), + Logger: newLogWriter(runner.hoststatLogger), // Our own cgroup is the "host" cgroup, in the sense // that it accounts for resource usage outside the // container. It doesn't count _all_ resource usage on @@ -770,15 +754,15 @@ func (runner *ContainerRunner) startHoststat() error { } func (runner *ContainerRunner) startCrunchstat() error { - w, err := runner.NewLogWriter("crunchstat") + var err error + runner.statLogger, err = runner.openLogFile("crunchstat") if err != nil { return err } - runner.statLogger = NewThrottledLogger(w) runner.statReporter = &crunchstat.Reporter{ Pid: runner.executor.Pid, FS: runner.crunchstatFakeFS, - Logger: log.New(runner.statLogger, "", 0), + Logger: newLogWriter(runner.statLogger), MemThresholds: map[string][]crunchstat.Threshold{ "rss": crunchstat.NewThresholdsFromPercentages(runner.Container.RuntimeConstraints.RAM, []int64{90, 95, 99}), }, @@ -801,7 +785,7 @@ type infoCommand struct { // might differ from what's described in the node record (see // LogNodeRecord). func (runner *ContainerRunner) LogHostInfo() (err error) { - w, err := runner.NewLogWriter("node-info") + w, err := runner.openLogFile("node-info") if err != nil { return } @@ -852,62 +836,40 @@ func (runner *ContainerRunner) LogHostInfo() (err error) { // LogContainerRecord gets and saves the raw JSON container record from the API server func (runner *ContainerRunner) LogContainerRecord() error { - logged, err := runner.logAPIResponse("container", "containers", map[string]interface{}{"filters": [][]string{{"uuid", "=", runner.Container.UUID}}}, nil) + logged, err := runner.logAPIResponse("container", "containers", map[string]interface{}{"filters": [][]string{{"uuid", "=", runner.Container.UUID}}}) if !logged && err == nil { err = fmt.Errorf("error: no container record found for %s", runner.Container.UUID) } return err } -// LogNodeRecord logs the current host's InstanceType config entry (or -// the arvados#node record, if running via crunch-dispatch-slurm). +// LogNodeRecord logs the current host's InstanceType config entry, if +// running via arvados-dispatch-cloud. func (runner *ContainerRunner) LogNodeRecord() error { - if it := os.Getenv("InstanceType"); it != "" { - // Dispatched via arvados-dispatch-cloud. Save - // InstanceType config fragment received from - // dispatcher on stdin. - w, err := runner.LogCollection.OpenFile("node.json", os.O_CREATE|os.O_WRONLY, 0666) - if err != nil { - return err - } - defer w.Close() - _, err = io.WriteString(w, it) - if err != nil { - return err - } - return w.Close() + it := os.Getenv("InstanceType") + if it == "" { + // Not dispatched by arvados-dispatch-cloud. + return nil + } + // Save InstanceType config fragment received from dispatcher + // on stdin. + w, err := runner.LogCollection.OpenFile("node.json", os.O_CREATE|os.O_WRONLY, 0666) + if err != nil { + return err } - // Dispatched via crunch-dispatch-slurm. Look up - // apiserver's node record corresponding to - // $SLURMD_NODENAME. - hostname := os.Getenv("SLURMD_NODENAME") - if hostname == "" { - hostname, _ = os.Hostname() + defer w.Close() + _, err = io.WriteString(w, it) + if err != nil { + return err } - _, err := runner.logAPIResponse("node", "nodes", map[string]interface{}{"filters": [][]string{{"hostname", "=", hostname}}}, func(resp interface{}) { - // The "info" field has admin-only info when - // obtained with a privileged token, and - // should not be logged. - node, ok := resp.(map[string]interface{}) - if ok { - delete(node, "info") - } - }) - return err + return w.Close() } -func (runner *ContainerRunner) logAPIResponse(label, path string, params map[string]interface{}, munge func(interface{})) (logged bool, err error) { +func (runner *ContainerRunner) logAPIResponse(label, path string, params map[string]interface{}) (logged bool, err error) { writer, err := runner.LogCollection.OpenFile(label+".json", os.O_CREATE|os.O_WRONLY, 0666) if err != nil { return false, err } - w := &ArvLogWriter{ - ArvClient: runner.DispatcherArvClient, - UUID: runner.Container.UUID, - loggingStream: label, - writeCloser: writer, - } - reader, err := runner.DispatcherArvClient.CallRaw("GET", path, "", "", arvadosclient.Dict(params)) if err != nil { return false, fmt.Errorf("error getting %s record: %v", label, err) @@ -926,16 +888,13 @@ func (runner *ContainerRunner) logAPIResponse(label, path string, params map[str } else if len(items) < 1 { return false, nil } - if munge != nil { - munge(items[0]) - } // Re-encode it using indentation to improve readability - enc := json.NewEncoder(w) + enc := json.NewEncoder(writer) enc.SetIndent("", " ") if err = enc.Encode(items[0]); err != nil { return false, fmt.Errorf("error logging %s record: %v", label, err) } - err = w.Close() + err = writer.Close() if err != nil { return false, fmt.Errorf("error closing %s.json in log collection: %v", label, err) } @@ -1003,10 +962,10 @@ func (runner *ContainerRunner) CreateContainer(imageID string, bindmounts map[st return err } stdout = f - } else if w, err := runner.NewLogWriter("stdout"); err != nil { + } else if w, err := runner.openLogFile("stdout"); err != nil { return err } else { - stdout = NewThrottledLogger(w) + stdout = w } if mnt, ok := runner.Container.Mounts["stderr"]; ok { @@ -1015,10 +974,10 @@ func (runner *ContainerRunner) CreateContainer(imageID string, bindmounts map[st return err } stderr = f - } else if w, err := runner.NewLogWriter("stderr"); err != nil { + } else if w, err := runner.openLogFile("stderr"); err != nil { return err } else { - stderr = NewThrottledLogger(w) + stderr = w } env := runner.Container.Environment @@ -1365,6 +1324,7 @@ func (runner *ContainerRunner) CaptureOutput(bindmounts map[string]bindmount) er keepClient: runner.ContainerKeepClient, hostOutputDir: runner.HostOutputDir, ctrOutputDir: runner.Container.OutputPath, + globs: runner.Container.OutputGlob, bindmounts: bindmounts, mounts: runner.Container.Mounts, secretMounts: runner.SecretMounts, @@ -1470,19 +1430,11 @@ func (runner *ContainerRunner) CommitLogs() error { if runner.arvMountLog != nil { runner.arvMountLog.Close() } - runner.CrunchLog.Close() - - // Closing CrunchLog above allows them to be committed to Keep at this - // point, but re-open crunch log with ArvClient in case there are any - // other further errors (such as failing to write the log to Keep!) - // while shutting down - runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{ - ArvClient: runner.DispatcherArvClient, - UUID: runner.Container.UUID, - loggingStream: "crunch-run", - writeCloser: nil, - }) - runner.CrunchLog.Immediate = log.New(os.Stderr, runner.Container.UUID+" ", 0) + + // From now on just log to stderr, in case there are + // any other further errors (such as failing to write + // the log to Keep!) while shutting down + runner.CrunchLog = newLogWriter(newTimestamper(newStringPrefixer(os.Stderr, runner.Container.UUID+" "))) }() if runner.keepstoreLogger != nil { @@ -1642,18 +1594,8 @@ func (runner *ContainerRunner) IsCancelled() bool { return runner.cCancelled } -// NewArvLogWriter creates an ArvLogWriter -func (runner *ContainerRunner) NewArvLogWriter(name string) (io.WriteCloser, error) { - writer, err := runner.LogCollection.OpenFile(name+".txt", os.O_CREATE|os.O_WRONLY, 0666) - if err != nil { - return nil, err - } - return &ArvLogWriter{ - ArvClient: runner.DispatcherArvClient, - UUID: runner.Container.UUID, - loggingStream: name, - writeCloser: writer, - }, nil +func (runner *ContainerRunner) openLogFile(name string) (io.WriteCloser, error) { + return runner.LogCollection.OpenFile(name+".txt", os.O_CREATE|os.O_WRONLY, 0666) } // Run the full container lifecycle. @@ -1683,9 +1625,7 @@ func (runner *ContainerRunner) Run() (err error) { defer func() { runner.CleanupDirs() - runner.CrunchLog.Printf("crunch-run finished") - runner.CrunchLog.Close() }() err = runner.fetchContainerRecord() @@ -1879,7 +1819,6 @@ func NewContainerRunner(dispatcherClient *arvados.Client, DispatcherArvClient: dispatcherArvClient, DispatcherKeepClient: dispatcherKeepClient, } - cr.NewLogWriter = cr.NewArvLogWriter cr.RunArvMount = cr.ArvMountCmd cr.MkTempDir = ioutil.TempDir cr.MkArvClient = func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error) { @@ -1902,14 +1841,12 @@ func NewContainerRunner(dispatcherClient *arvados.Client, return nil, err } cr.Container.UUID = containerUUID - w, err := cr.NewLogWriter("crunch-run") + f, err := cr.openLogFile("crunch-run") if err != nil { return nil, err } - cr.CrunchLog = NewThrottledLogger(w) - cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0) + cr.CrunchLog = newLogWriter(newTimestamper(io.MultiWriter(f, newStringPrefixer(os.Stderr, cr.Container.UUID+" ")))) - loadLogThrottleParams(dispatcherArvClient) go cr.updateLogs() return cr, nil @@ -2054,12 +1991,11 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s keepstoreLogbuf.SetWriter(io.Discard) } else { cr.CrunchLog.Printf("using local keepstore process (pid %d) at %s, writing logs to keepstore.txt in log collection", keepstore.Process.Pid, os.Getenv("ARVADOS_KEEP_SERVICES")) - logwriter, err := cr.NewLogWriter("keepstore") + cr.keepstoreLogger, err = cr.openLogFile("keepstore") if err != nil { log.Print(err) return 1 } - cr.keepstoreLogger = NewThrottledLogger(logwriter) var writer io.WriteCloser = cr.keepstoreLogger if logWhat == "errors" { @@ -2085,13 +2021,11 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s cr.executor, err = newSingularityExecutor(cr.CrunchLog.Printf) default: cr.CrunchLog.Printf("%s: unsupported RuntimeEngine %q", containerUUID, *runtimeEngine) - cr.CrunchLog.Close() return 1 } if err != nil { cr.CrunchLog.Printf("%s: %v", containerUUID, err) cr.checkBrokenNode(err) - cr.CrunchLog.Close() return 1 } defer cr.executor.Close()