5043: Accept long stderr lines from crunch tasks.
authorTom Clegg <tom@curoverse.com>
Mon, 2 Mar 2015 19:42:35 +0000 (14:42 -0500)
committerTom Clegg <tom@curoverse.com>
Mon, 2 Mar 2015 19:42:35 +0000 (14:42 -0500)
services/crunchstat/crunchstat.go
services/crunchstat/crunchstat_test.go

index e35e98aa59dedba5848d2a6cc1013fd322c15f09..4e7e53dced4632a64eb51e073e9535b35fcbc39a 100644 (file)
@@ -35,19 +35,25 @@ type Cgroup struct {
 }
 
 func CopyPipeToChan(in io.ReadCloser, out chan string, done chan<- bool) {
-       defer in.Close()
-
-       // TODO(twp): handle long input records gracefully, if possible
-       // without killing the child task (#4889)
-       //
-       s := bufio.NewScanner(in)
-       for s.Scan() {
-               out <- s.Text()
-       }
-       if s.Err() != nil {
-               out <- fmt.Sprintf("crunchstat: line buffering error: %s", s.Err())
+       reader := bufio.NewReader(in)
+       for {
+               line, err := reader.ReadBytes('\n')
+               if len(line) > 0 {
+                       if err == nil {
+                               // err == nil IFF line ends in \n
+                               line = line[:len(line)-1]
+                       }
+                       out <- string(line)
+               }
+               if err != nil {
+                       if err != io.EOF {
+                               out <- fmt.Sprintf("crunchstat: line buffering error: %s", err)
+                       }
+                       break
+               }
        }
        done <- true
+       in.Close()
 }
 
 func CopyChanToPipe(in <-chan string, out io.Writer) {
index e3c3a597eacb640557e0e9fdd789a017ab35a8c8..91fe851e0f98be016f0c8ff7a5f398e6ecaee43e 100644 (file)
@@ -2,10 +2,13 @@ package main
 
 import (
        "bufio"
+       "bytes"
        "io"
+       "math/rand"
        "os"
        "regexp"
        "testing"
+       "time"
 )
 
 func TestReadAllOrWarnFail(t *testing.T) {
@@ -51,8 +54,8 @@ func TestReadAllOrWarnSuccess(t *testing.T) {
        }
 }
 
-// Test that if CopyPipeToChan reads a line longer than
-// bufio.MaxScanTokenSize, it emits an error to the output channel.
+// Test that CopyPipeToChan works even on lines longer than
+// bufio.MaxScanTokenSize.
 func TestCopyPipeToChanLongLines(t *testing.T) {
        logChan := make(chan string)
        control := make(chan bool)
@@ -60,20 +63,33 @@ func TestCopyPipeToChanLongLines(t *testing.T) {
        pipeIn, pipeOut := io.Pipe()
        go CopyPipeToChan(pipeIn, logChan, control)
 
+       sentBytes := make([]byte, bufio.MaxScanTokenSize + (1 << 22))
        go func() {
-               long_line := make([]byte, bufio.MaxScanTokenSize+1)
-               for i := range long_line {
-                       long_line[i] = byte('x')
+               for i := range sentBytes {
+                       // Some bytes that aren't newlines:
+                       sentBytes[i] = byte((rand.Int() & 0xff) | 0x80)
                }
-               pipeOut.Write(long_line)
+               pipeOut.Write([]byte("before\n"))
+               pipeOut.Write(sentBytes)
+               pipeOut.Write([]byte("\nafter\n"))
+               pipeOut.Close()
        }()
 
-       // Expect error message from logChan.
-
-       errmsg := <-logChan
-       if matched, err := regexp.MatchString("^crunchstat: line buffering error:.*token too long", errmsg); err != nil || !matched {
-               t.Fatalf("expected CopyPipeToChan error, got %s", errmsg)
+       if before := <-logChan; before != "before" {
+               t.Fatalf("\"before\" not received (got \"%s\")", before)
+       }
+       receivedString := <-logChan
+       receivedBytes := []byte(receivedString)
+       if bytes.Compare(receivedBytes, sentBytes) != 0 {
+               t.Fatalf("sent %d bytes, got %d different bytes", len(sentBytes), len(receivedBytes))
+       }
+       if after := <-logChan; after != "after" {
+               t.Fatal("\"after\" not received")
+       }
+       select {
+       case <-time.After(time.Second):
+               t.Fatal("Timeout")
+       case <-control:
+               // Done.
        }
-
-       <-control
 }