}
func CopyPipeToChan(in io.ReadCloser, out chan string, done chan<- bool) {
- defer in.Close()
-
- // TODO(twp): handle long input records gracefully, if possible
- // without killing the child task (#4889)
- //
- s := bufio.NewScanner(in)
- for s.Scan() {
- out <- s.Text()
- }
- if s.Err() != nil {
- out <- fmt.Sprintf("crunchstat: line buffering error: %s", s.Err())
+ reader := bufio.NewReader(in)
+ for {
+ line, err := reader.ReadBytes('\n')
+ if len(line) > 0 {
+ if err == nil {
+ // err == nil IFF line ends in \n
+ line = line[:len(line)-1]
+ }
+ out <- string(line)
+ }
+ if err != nil {
+ if err != io.EOF {
+ out <- fmt.Sprintf("crunchstat: line buffering error: %s", err)
+ }
+ break
+ }
}
done <- true
+ in.Close()
}
func CopyChanToPipe(in <-chan string, out io.Writer) {
import (
"bufio"
+ "bytes"
"io"
+ "math/rand"
"os"
"regexp"
"testing"
+ "time"
)
func TestReadAllOrWarnFail(t *testing.T) {
}
}
-// Test that if CopyPipeToChan reads a line longer than
-// bufio.MaxScanTokenSize, it emits an error to the output channel.
+// Test that CopyPipeToChan works even on lines longer than
+// bufio.MaxScanTokenSize.
func TestCopyPipeToChanLongLines(t *testing.T) {
logChan := make(chan string)
control := make(chan bool)
pipeIn, pipeOut := io.Pipe()
go CopyPipeToChan(pipeIn, logChan, control)
+ sentBytes := make([]byte, bufio.MaxScanTokenSize + (1 << 22))
go func() {
- long_line := make([]byte, bufio.MaxScanTokenSize+1)
- for i := range long_line {
- long_line[i] = byte('x')
+ for i := range sentBytes {
+ // Some bytes that aren't newlines:
+ sentBytes[i] = byte((rand.Int() & 0xff) | 0x80)
}
- pipeOut.Write(long_line)
+ pipeOut.Write([]byte("before\n"))
+ pipeOut.Write(sentBytes)
+ pipeOut.Write([]byte("\nafter\n"))
+ pipeOut.Close()
}()
- // Expect error message from logChan.
-
- errmsg := <-logChan
- if matched, err := regexp.MatchString("^crunchstat: line buffering error:.*token too long", errmsg); err != nil || !matched {
- t.Fatalf("expected CopyPipeToChan error, got %s", errmsg)
+ if before := <-logChan; before != "before" {
+ t.Fatalf("\"before\" not received (got \"%s\")", before)
+ }
+ receivedString := <-logChan
+ receivedBytes := []byte(receivedString)
+ if bytes.Compare(receivedBytes, sentBytes) != 0 {
+ t.Fatalf("sent %d bytes, got %d different bytes", len(sentBytes), len(receivedBytes))
+ }
+ if after := <-logChan; after != "after" {
+ t.Fatal("\"after\" not received")
+ }
+ select {
+ case <-time.After(time.Second):
+ t.Fatal("Timeout")
+ case <-control:
+ // Done.
}
-
- <-control
}