21611: Remove log-to-logs-table code.
[arvados.git] / lib / crunchrun / crunchrun.go
index f8941162ec089515fd8ad2708f32e72162bc771e..8627045411a56dba53a61ba01c3b9e22d721812c 100644 (file)
@@ -81,9 +81,6 @@ type IKeepClient interface {
        SetStorageClasses(sc []string)
 }
 
-// NewLogWriter is a factory function to create a new log writer.
-type NewLogWriter func(name string) (io.WriteCloser, error)
-
 type RunArvMount func(cmdline []string, tok string) (*exec.Cmd, error)
 
 type MkTempDir func(string, string) (string, error)
@@ -125,8 +122,7 @@ type ContainerRunner struct {
        Container     arvados.Container
        token         string
        ExitCode      *int
-       NewLogWriter  NewLogWriter
-       CrunchLog     *ThrottledLogger
+       CrunchLog     *logWriter
        logUUID       string
        logMtx        sync.Mutex
        LogCollection arvados.CollectionFileSystem
@@ -168,7 +164,7 @@ type ContainerRunner struct {
        enableNetwork     string // one of "default" or "always"
        networkMode       string // "none", "host", or "" -- passed through to executor
        brokenNodeHook    string // script to run if node appears to be broken
-       arvMountLog       *ThrottledLogger
+       arvMountLog       io.WriteCloser
 
        containerWatchdogInterval time.Duration
 
@@ -303,11 +299,10 @@ func (runner *ContainerRunner) ArvMountCmd(cmdline []string, token string) (c *e
        }
        c.Env = append(c.Env, "ARVADOS_API_TOKEN="+token)
 
-       w, err := runner.NewLogWriter("arv-mount")
+       runner.arvMountLog, err = runner.openLogFile("arv-mount")
        if err != nil {
                return nil, err
        }
-       runner.arvMountLog = NewThrottledLogger(w)
        scanner := logScanner{
                Patterns: []string{
                        "Keep write error",
@@ -735,13 +730,13 @@ func (runner *ContainerRunner) stopHoststat() error {
 }
 
 func (runner *ContainerRunner) startHoststat() error {
-       w, err := runner.NewLogWriter("hoststat")
+       var err error
+       runner.hoststatLogger, err = runner.openLogFile("hoststat")
        if err != nil {
                return err
        }
-       runner.hoststatLogger = NewThrottledLogger(w)
        runner.hoststatReporter = &crunchstat.Reporter{
-               Logger: log.New(runner.hoststatLogger, "", 0),
+               Logger: newLogWriter(runner.hoststatLogger),
                // Our own cgroup is the "host" cgroup, in the sense
                // that it accounts for resource usage outside the
                // container. It doesn't count _all_ resource usage on
@@ -759,15 +754,15 @@ func (runner *ContainerRunner) startHoststat() error {
 }
 
 func (runner *ContainerRunner) startCrunchstat() error {
-       w, err := runner.NewLogWriter("crunchstat")
+       var err error
+       runner.statLogger, err = runner.openLogFile("crunchstat")
        if err != nil {
                return err
        }
-       runner.statLogger = NewThrottledLogger(w)
        runner.statReporter = &crunchstat.Reporter{
                Pid:    runner.executor.Pid,
                FS:     runner.crunchstatFakeFS,
-               Logger: log.New(runner.statLogger, "", 0),
+               Logger: newLogWriter(runner.statLogger),
                MemThresholds: map[string][]crunchstat.Threshold{
                        "rss": crunchstat.NewThresholdsFromPercentages(runner.Container.RuntimeConstraints.RAM, []int64{90, 95, 99}),
                },
@@ -790,7 +785,7 @@ type infoCommand struct {
 // might differ from what's described in the node record (see
 // LogNodeRecord).
 func (runner *ContainerRunner) LogHostInfo() (err error) {
-       w, err := runner.NewLogWriter("node-info")
+       w, err := runner.openLogFile("node-info")
        if err != nil {
                return
        }
@@ -875,13 +870,6 @@ func (runner *ContainerRunner) logAPIResponse(label, path string, params map[str
        if err != nil {
                return false, err
        }
-       w := &ArvLogWriter{
-               ArvClient:     runner.DispatcherArvClient,
-               UUID:          runner.Container.UUID,
-               loggingStream: label,
-               writeCloser:   writer,
-       }
-
        reader, err := runner.DispatcherArvClient.CallRaw("GET", path, "", "", arvadosclient.Dict(params))
        if err != nil {
                return false, fmt.Errorf("error getting %s record: %v", label, err)
@@ -901,12 +889,12 @@ func (runner *ContainerRunner) logAPIResponse(label, path string, params map[str
                return false, nil
        }
        // Re-encode it using indentation to improve readability
-       enc := json.NewEncoder(w)
+       enc := json.NewEncoder(writer)
        enc.SetIndent("", "    ")
        if err = enc.Encode(items[0]); err != nil {
                return false, fmt.Errorf("error logging %s record: %v", label, err)
        }
-       err = w.Close()
+       err = writer.Close()
        if err != nil {
                return false, fmt.Errorf("error closing %s.json in log collection: %v", label, err)
        }
@@ -974,10 +962,10 @@ func (runner *ContainerRunner) CreateContainer(imageID string, bindmounts map[st
                        return err
                }
                stdout = f
-       } else if w, err := runner.NewLogWriter("stdout"); err != nil {
+       } else if w, err := runner.openLogFile("stdout"); err != nil {
                return err
        } else {
-               stdout = NewThrottledLogger(w)
+               stdout = w
        }
 
        if mnt, ok := runner.Container.Mounts["stderr"]; ok {
@@ -986,10 +974,10 @@ func (runner *ContainerRunner) CreateContainer(imageID string, bindmounts map[st
                        return err
                }
                stderr = f
-       } else if w, err := runner.NewLogWriter("stderr"); err != nil {
+       } else if w, err := runner.openLogFile("stderr"); err != nil {
                return err
        } else {
-               stderr = NewThrottledLogger(w)
+               stderr = w
        }
 
        env := runner.Container.Environment
@@ -1442,19 +1430,11 @@ func (runner *ContainerRunner) CommitLogs() error {
                if runner.arvMountLog != nil {
                        runner.arvMountLog.Close()
                }
-               runner.CrunchLog.Close()
-
-               // Closing CrunchLog above allows them to be committed to Keep at this
-               // point, but re-open crunch log with ArvClient in case there are any
-               // other further errors (such as failing to write the log to Keep!)
-               // while shutting down
-               runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{
-                       ArvClient:     runner.DispatcherArvClient,
-                       UUID:          runner.Container.UUID,
-                       loggingStream: "crunch-run",
-                       writeCloser:   nil,
-               })
-               runner.CrunchLog.Immediate = log.New(os.Stderr, runner.Container.UUID+" ", 0)
+
+               // From now on just log to stderr, in case there are
+               // any other further errors (such as failing to write
+               // the log to Keep!)  while shutting down
+               runner.CrunchLog = newLogWriter(newTimestamper(newStringPrefixer(os.Stderr, runner.Container.UUID+" ")))
        }()
 
        if runner.keepstoreLogger != nil {
@@ -1614,18 +1594,8 @@ func (runner *ContainerRunner) IsCancelled() bool {
        return runner.cCancelled
 }
 
-// NewArvLogWriter creates an ArvLogWriter
-func (runner *ContainerRunner) NewArvLogWriter(name string) (io.WriteCloser, error) {
-       writer, err := runner.LogCollection.OpenFile(name+".txt", os.O_CREATE|os.O_WRONLY, 0666)
-       if err != nil {
-               return nil, err
-       }
-       return &ArvLogWriter{
-               ArvClient:     runner.DispatcherArvClient,
-               UUID:          runner.Container.UUID,
-               loggingStream: name,
-               writeCloser:   writer,
-       }, nil
+func (runner *ContainerRunner) openLogFile(name string) (io.WriteCloser, error) {
+       return runner.LogCollection.OpenFile(name+".txt", os.O_CREATE|os.O_WRONLY, 0666)
 }
 
 // Run the full container lifecycle.
@@ -1655,9 +1625,7 @@ func (runner *ContainerRunner) Run() (err error) {
 
        defer func() {
                runner.CleanupDirs()
-
                runner.CrunchLog.Printf("crunch-run finished")
-               runner.CrunchLog.Close()
        }()
 
        err = runner.fetchContainerRecord()
@@ -1851,7 +1819,6 @@ func NewContainerRunner(dispatcherClient *arvados.Client,
                DispatcherArvClient:  dispatcherArvClient,
                DispatcherKeepClient: dispatcherKeepClient,
        }
-       cr.NewLogWriter = cr.NewArvLogWriter
        cr.RunArvMount = cr.ArvMountCmd
        cr.MkTempDir = ioutil.TempDir
        cr.MkArvClient = func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error) {
@@ -1874,14 +1841,12 @@ func NewContainerRunner(dispatcherClient *arvados.Client,
                return nil, err
        }
        cr.Container.UUID = containerUUID
-       w, err := cr.NewLogWriter("crunch-run")
+       f, err := cr.openLogFile("crunch-run")
        if err != nil {
                return nil, err
        }
-       cr.CrunchLog = NewThrottledLogger(w)
-       cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0)
+       cr.CrunchLog = newLogWriter(newTimestamper(io.MultiWriter(f, newStringPrefixer(os.Stderr, cr.Container.UUID+" "))))
 
-       loadLogThrottleParams(dispatcherArvClient)
        go cr.updateLogs()
 
        return cr, nil
@@ -2026,12 +1991,11 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
                keepstoreLogbuf.SetWriter(io.Discard)
        } else {
                cr.CrunchLog.Printf("using local keepstore process (pid %d) at %s, writing logs to keepstore.txt in log collection", keepstore.Process.Pid, os.Getenv("ARVADOS_KEEP_SERVICES"))
-               logwriter, err := cr.NewLogWriter("keepstore")
+               cr.keepstoreLogger, err = cr.openLogFile("keepstore")
                if err != nil {
                        log.Print(err)
                        return 1
                }
-               cr.keepstoreLogger = NewThrottledLogger(logwriter)
 
                var writer io.WriteCloser = cr.keepstoreLogger
                if logWhat == "errors" {
@@ -2057,13 +2021,11 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
                cr.executor, err = newSingularityExecutor(cr.CrunchLog.Printf)
        default:
                cr.CrunchLog.Printf("%s: unsupported RuntimeEngine %q", containerUUID, *runtimeEngine)
-               cr.CrunchLog.Close()
                return 1
        }
        if err != nil {
                cr.CrunchLog.Printf("%s: %v", containerUUID, err)
                cr.checkBrokenNode(err)
-               cr.CrunchLog.Close()
                return 1
        }
        defer cr.executor.Close()