5043: Split long stderr lines rather than consume unlimited memory.
authorTom Clegg <tom@curoverse.com>
Tue, 3 Mar 2015 15:56:52 +0000 (10:56 -0500)
committerTom Clegg <tom@curoverse.com>
Tue, 3 Mar 2015 15:56:52 +0000 (10:56 -0500)
services/crunchstat/crunchstat.go
services/crunchstat/crunchstat_test.go

index 356eb018f4664980e705020ed965726f03861c56..1a22e264f1ab734dafe052a589f86bfc2be6597e 100644 (file)
@@ -37,21 +37,30 @@ type Cgroup struct {
 var childLog = log.New(os.Stderr, "", 0)
 var statLog = log.New(os.Stderr, "crunchstat: ", 0)
 
+const (
+       MaxLogLine = 1 << 14 // Child stderr lines >16KiB will be split
+)
+
 func CopyPipeToChildLog(in io.ReadCloser, done chan<- bool) {
-       reader := bufio.NewReader(in)
+       reader := bufio.NewReaderSize(in, MaxLogLine)
+       var prefix string
        for {
-               line, err := reader.ReadBytes('\n')
-               if len(line) > 0 {
-                       if err == nil {
-                               // err == nil IFF line ends in \n
-                               line = line[:len(line)-1]
-                       }
-                       childLog.Println(string(line))
-               }
+               line, isPrefix, err := reader.ReadLine()
                if err == io.EOF {
                        break
                } else if err != nil {
-                       statLog.Fatalln("line buffering error:", err)
+                       statLog.Fatal("error reading child stderr:", err)
+               }
+               var suffix string
+               if isPrefix {
+                       suffix = "[...]"
+               }
+               childLog.Print(prefix, string(line), suffix)
+               // Set up prefix for following line
+               if isPrefix {
+                       prefix = "[...]"
+               } else {
+                       prefix = ""
                }
        }
        done <- true
@@ -61,7 +70,7 @@ func CopyPipeToChildLog(in io.ReadCloser, done chan<- bool) {
 func ReadAllOrWarn(in *os.File) ([]byte, error) {
        content, err := ioutil.ReadAll(in)
        if err != nil {
-               statLog.Printf("read %s: %s\n", in.Name(), err)
+               statLog.Printf("error reading %s: %s\n", in.Name(), err)
        }
        return content, err
 }
@@ -103,9 +112,9 @@ func OpenStatFile(cgroup Cgroup, statgroup string, stat string) (*os.File, error
                // [b] after all contained processes have exited.
                reportedStatFile[stat] = path
                if path == "" {
-                       statLog.Printf("did not find stats file: stat %s, statgroup %s, cid %s, parent %s, root %s\n", stat, statgroup, cgroup.cid, cgroup.parent, cgroup.root)
+                       statLog.Printf("error finding stats file: stat %s, statgroup %s, cid %s, parent %s, root %s\n", stat, statgroup, cgroup.cid, cgroup.parent, cgroup.root)
                } else {
-                       statLog.Printf("reading stats from %s\n", path)
+                       statLog.Printf("error reading stats from %s\n", path)
                }
        }
        return file, err
@@ -364,7 +373,7 @@ func run(logger *log.Logger) error {
        flag.Parse()
 
        if cgroup_root == "" {
-               statLog.Fatalln("Must provide -cgroup-root")
+               statLog.Fatal("error: must provide -cgroup-root")
        }
 
        finish_chan := make(chan bool)
@@ -376,7 +385,7 @@ func run(logger *log.Logger) error {
                // Set up subprocess
                cmd = exec.Command(flag.Args()[0], flag.Args()[1:]...)
 
-               statLog.Println("Running ", flag.Args())
+               childLog.Println("Running", flag.Args())
 
                // Child process will use our stdin and stdout pipes
                // (we close our copies below)
@@ -398,13 +407,13 @@ func run(logger *log.Logger) error {
                // Funnel stderr through our channel
                stderr_pipe, err := cmd.StderrPipe()
                if err != nil {
-                       statLog.Fatalln("stderr:", err)
+                       statLog.Fatalln("error in StderrPipe:", err)
                }
                go CopyPipeToChildLog(stderr_pipe, finish_chan)
 
                // Run subprocess
                if err := cmd.Start(); err != nil {
-                       statLog.Fatalln("cmd.Start:", err)
+                       statLog.Fatalln("error in cmd.Start:", err)
                }
 
                // Close stdin/stdout in this (parent) process
@@ -428,7 +437,7 @@ func run(logger *log.Logger) error {
                        time.Sleep(100 * time.Millisecond)
                }
                if !ok {
-                       statLog.Println("Could not read cid file:", cgroup_cidfile)
+                       statLog.Println("error reading cid file:", cgroup_cidfile)
                }
        }
 
@@ -461,7 +470,7 @@ func main() {
                                os.Exit(status.ExitStatus())
                        }
                } else {
-                       statLog.Fatalln("cmd.Wait:", err)
+                       statLog.Fatalln("error in cmd.Wait:", err)
                }
        }
 }
index fe922e99eefb47ca62af90bad7bf33676a8730ee..dff323e71875aee08c11bcddbf364dd5885998d3 100644 (file)
@@ -57,7 +57,7 @@ func TestCopyPipeToChildLogLongLines(t *testing.T) {
        pipeIn, pipeOut := io.Pipe()
        go CopyPipeToChildLog(pipeIn, control)
 
-       sentBytes := make([]byte, bufio.MaxScanTokenSize + (1 << 22))
+       sentBytes := make([]byte, bufio.MaxScanTokenSize+MaxLogLine+(1<<22))
        go func() {
                pipeOut.Write([]byte("before\n"))
 
@@ -75,12 +75,27 @@ func TestCopyPipeToChildLogLongLines(t *testing.T) {
        if before, err := rcv.ReadBytes('\n'); err != nil || string(before) != "before\n" {
                t.Fatalf("\"before\n\" not received (got \"%s\", %s)", before, err)
        }
-       
-       receivedString, err := rcv.ReadBytes('\n')
-       if err != nil {
-               t.Fatal(err)
+
+       var receivedBytes []byte
+       done := false
+       for !done {
+               line, err := rcv.ReadBytes('\n')
+               if err != nil {
+                       t.Fatal(err)
+               }
+               if len(line) >= 5 && string(line[0:5]) == "[...]" {
+                       if receivedBytes == nil {
+                               t.Fatal("Beginning of line reported as continuation")
+                       }
+                       line = line[5:]
+               }
+               if len(line) >= 6 && string(line[len(line)-6:len(line)]) == "[...]\n" {
+                       line = line[:len(line)-6]
+               } else {
+                       done = true
+               }
+               receivedBytes = append(receivedBytes, line...)
        }
-       receivedBytes := []byte(receivedString)
        if bytes.Compare(receivedBytes, sentBytes) != 0 {
                t.Fatalf("sent %d bytes, got %d different bytes", len(sentBytes)+1, len(receivedBytes))
        }
@@ -97,7 +112,7 @@ func TestCopyPipeToChildLogLongLines(t *testing.T) {
        }
 }
 
-func captureLogs() (*bufio.Reader) {
+func captureLogs() *bufio.Reader {
        // Send childLog to our bufio reader instead of stderr
        stderrIn, stderrOut := io.Pipe()
        childLog = log.New(stderrOut, "", 0)