5043: Use Go's log package to serialize writes. Lose logChan.
authorTom Clegg <tom@curoverse.com>
Tue, 3 Mar 2015 02:35:35 +0000 (21:35 -0500)
committerTom Clegg <tom@curoverse.com>
Tue, 3 Mar 2015 02:35:35 +0000 (21:35 -0500)
services/crunchstat/crunchstat.go
services/crunchstat/crunchstat_test.go

index 4e7e53dced4632a64eb51e073e9535b35fcbc39a..356eb018f4664980e705020ed965726f03861c56 100644 (file)
@@ -34,7 +34,10 @@ type Cgroup struct {
        cid    string
 }
 
-func CopyPipeToChan(in io.ReadCloser, out chan string, done chan<- bool) {
+var childLog = log.New(os.Stderr, "", 0)
+var statLog = log.New(os.Stderr, "crunchstat: ", 0)
+
+func CopyPipeToChildLog(in io.ReadCloser, done chan<- bool) {
        reader := bufio.NewReader(in)
        for {
                line, err := reader.ReadBytes('\n')
@@ -43,38 +46,22 @@ func CopyPipeToChan(in io.ReadCloser, out chan string, done chan<- bool) {
                                // err == nil IFF line ends in \n
                                line = line[:len(line)-1]
                        }
-                       out <- string(line)
+                       childLog.Println(string(line))
                }
-               if err != nil {
-                       if err != io.EOF {
-                               out <- fmt.Sprintf("crunchstat: line buffering error: %s", err)
-                       }
+               if err == io.EOF {
                        break
+               } else if err != nil {
+                       statLog.Fatalln("line buffering error:", err)
                }
        }
        done <- true
        in.Close()
 }
 
-func CopyChanToPipe(in <-chan string, out io.Writer) {
-       for s := range in {
-               fmt.Fprintln(out, s)
-       }
-}
-
-var logChan chan string
-
-func LogPrintf(format string, args ...interface{}) {
-       if logChan == nil {
-               return
-       }
-       logChan <- fmt.Sprintf("crunchstat: "+format, args...)
-}
-
 func ReadAllOrWarn(in *os.File) ([]byte, error) {
        content, err := ioutil.ReadAll(in)
        if err != nil {
-               LogPrintf("read %s: %s", in.Name(), err)
+               statLog.Printf("read %s: %s\n", in.Name(), err)
        }
        return content, err
 }
@@ -116,9 +103,9 @@ func OpenStatFile(cgroup Cgroup, statgroup string, stat string) (*os.File, error
                // [b] after all contained processes have exited.
                reportedStatFile[stat] = path
                if path == "" {
-                       LogPrintf("did not find stats file: stat %s, statgroup %s, cid %s, parent %s, root %s", stat, statgroup, cgroup.cid, cgroup.parent, cgroup.root)
+                       statLog.Printf("did not find stats file: stat %s, statgroup %s, cid %s, parent %s, root %s\n", stat, statgroup, cgroup.cid, cgroup.parent, cgroup.root)
                } else {
-                       LogPrintf("reading stats from %s", path)
+                       statLog.Printf("reading stats from %s\n", path)
                }
        }
        return file, err
@@ -136,7 +123,7 @@ func GetContainerNetStats(cgroup Cgroup) (io.Reader, error) {
                statsFilename := fmt.Sprintf("/proc/%s/net/dev", taskPid)
                stats, err := ioutil.ReadFile(statsFilename)
                if err != nil {
-                       LogPrintf("read %s: %s", statsFilename, err)
+                       statLog.Printf("read %s: %s\n", statsFilename, err)
                        continue
                }
                return strings.NewReader(string(stats)), nil
@@ -189,7 +176,7 @@ func DoBlkIoStats(cgroup Cgroup, lastSample map[string]IoSample) {
                                sample.txBytes-prev.txBytes,
                                sample.rxBytes-prev.rxBytes)
                }
-               LogPrintf("blkio:%s %d write %d read%s", dev, sample.txBytes, sample.rxBytes, delta)
+               statLog.Printf("blkio:%s %d write %d read%s\n", dev, sample.txBytes, sample.rxBytes, delta)
                lastSample[dev] = sample
        }
 }
@@ -222,7 +209,7 @@ func DoMemoryStats(cgroup Cgroup) {
                        outstat.WriteString(fmt.Sprintf(" %d %s", val, key))
                }
        }
-       LogPrintf("mem%s", outstat.String())
+       statLog.Printf("mem%s\n", outstat.String())
 }
 
 func DoNetworkStats(cgroup Cgroup, lastSample map[string]IoSample) {
@@ -264,7 +251,7 @@ func DoNetworkStats(cgroup Cgroup, lastSample map[string]IoSample) {
                                tx-prev.txBytes,
                                rx-prev.rxBytes)
                }
-               LogPrintf("net:%s %d tx %d rx%s", ifName, tx, rx, delta)
+               statLog.Printf("net:%s %d tx %d rx%s\n", ifName, tx, rx, delta)
                lastSample[ifName] = nextSample
        }
 }
@@ -325,7 +312,7 @@ func DoCpuStats(cgroup Cgroup, lastSample *CpuSample) {
                        nextSample.user-lastSample.user,
                        nextSample.sys-lastSample.sys)
        }
-       LogPrintf("cpu %.4f user %.4f sys %d cpus%s",
+       statLog.Printf("cpu %.4f user %.4f sys %d cpus%s\n",
                nextSample.user, nextSample.sys, nextSample.cpus, delta)
        *lastSample = nextSample
 }
@@ -377,23 +364,19 @@ func run(logger *log.Logger) error {
        flag.Parse()
 
        if cgroup_root == "" {
-               logger.Fatal("Must provide -cgroup-root")
+               statLog.Fatalln("Must provide -cgroup-root")
        }
 
-       logChan = make(chan string, 1)
-       defer close(logChan)
        finish_chan := make(chan bool)
        defer close(finish_chan)
 
-       go CopyChanToPipe(logChan, os.Stderr)
-
        var cmd *exec.Cmd
 
        if len(flag.Args()) > 0 {
                // Set up subprocess
                cmd = exec.Command(flag.Args()[0], flag.Args()[1:]...)
 
-               logger.Print("Running ", flag.Args())
+               statLog.Println("Running ", flag.Args())
 
                // Child process will use our stdin and stdout pipes
                // (we close our copies below)
@@ -401,27 +384,27 @@ func run(logger *log.Logger) error {
                cmd.Stdout = os.Stdout
 
                // Forward SIGINT and SIGTERM to inner process
-               term := make(chan os.Signal, 1)
+               sigChan := make(chan os.Signal, 1)
                go func(sig <-chan os.Signal) {
                        catch := <-sig
                        if cmd.Process != nil {
                                cmd.Process.Signal(catch)
                        }
-                       logger.Print("caught signal: ", catch)
-               }(term)
-               signal.Notify(term, syscall.SIGTERM)
-               signal.Notify(term, syscall.SIGINT)
+                       statLog.Println("caught signal:", catch)
+               }(sigChan)
+               signal.Notify(sigChan, syscall.SIGTERM)
+               signal.Notify(sigChan, syscall.SIGINT)
 
                // Funnel stderr through our channel
                stderr_pipe, err := cmd.StderrPipe()
                if err != nil {
-                       logger.Fatal(err)
+                       statLog.Fatalln("stderr:", err)
                }
-               go CopyPipeToChan(stderr_pipe, logChan, finish_chan)
+               go CopyPipeToChildLog(stderr_pipe, finish_chan)
 
                // Run subprocess
                if err := cmd.Start(); err != nil {
-                       logger.Fatal(err)
+                       statLog.Fatalln("cmd.Start:", err)
                }
 
                // Close stdin/stdout in this (parent) process
@@ -445,7 +428,7 @@ func run(logger *log.Logger) error {
                        time.Sleep(100 * time.Millisecond)
                }
                if !ok {
-                       logger.Printf("Could not read cid file %s", cgroup_cidfile)
+                       statLog.Println("Could not read cid file:", cgroup_cidfile)
                }
        }
 
@@ -478,7 +461,7 @@ func main() {
                                os.Exit(status.ExitStatus())
                        }
                } else {
-                       logger.Fatalf("cmd.Wait: %v", err)
+                       statLog.Fatalln("cmd.Wait:", err)
                }
        }
 }
index 91fe851e0f98be016f0c8ff7a5f398e6ecaee43e..fe922e99eefb47ca62af90bad7bf33676a8730ee 100644 (file)
@@ -4,6 +4,7 @@ import (
        "bufio"
        "bytes"
        "io"
+       "log"
        "math/rand"
        "os"
        "regexp"
@@ -12,9 +13,9 @@ import (
 )
 
 func TestReadAllOrWarnFail(t *testing.T) {
-       logChan = make(chan string)
+       rcv := captureLogs()
+       defer uncaptureLogs()
        go func() {
-               defer close(logChan)
                // The special file /proc/self/mem can be opened for
                // reading, but reading from byte 0 returns an error.
                f, err := os.Open("/proc/self/mem")
@@ -25,67 +26,69 @@ func TestReadAllOrWarnFail(t *testing.T) {
                        t.Fatalf("Expected error, got %v", x)
                }
        }()
-       if _, ok := <-logChan; !ok {
-               t.Fatalf("Expected error message about nonexistent file")
-       }
-       if msg, ok := <-logChan; ok {
-               t.Fatalf("Expected channel to close, got %s", msg)
+       if msg, err := rcv.ReadBytes('\n'); err != nil {
+               t.Fatal(err)
+       } else if matched, err := regexp.MatchString("^crunchstat: .*error.*", string(msg)); err != nil || !matched {
+               t.Fatalf("Expected error message about unreadable file, got \"%s\"", msg)
        }
 }
 
 func TestReadAllOrWarnSuccess(t *testing.T) {
-       logChan = make(chan string)
-       go func() {
-               defer close(logChan)
-               f, err := os.Open("./crunchstat_test.go")
-               if err != nil {
-                       t.Fatalf("Opening ./crunchstat_test.go: %s", err)
-               }
-               data, err := ReadAllOrWarn(f)
-               if err != nil {
-                       t.Fatalf("got error %s", err)
-               }
-               if matched, err := regexp.MatchString("^package main\n", string(data)); err != nil || !matched {
-                       t.Fatalf("data failed regexp: %s", err)
-               }
-       }()
-       if msg, ok := <-logChan; ok {
-               t.Fatalf("Expected channel to close, got %s", msg)
+       f, err := os.Open("./crunchstat_test.go")
+       if err != nil {
+               t.Fatalf("Opening ./crunchstat_test.go: %s", err)
+       }
+       data, err := ReadAllOrWarn(f)
+       if err != nil {
+               t.Fatalf("got error %s", err)
+       }
+       if matched, err := regexp.MatchString("^package main\n", string(data)); err != nil || !matched {
+               t.Fatalf("data failed regexp: %s", err)
        }
 }
 
-// Test that CopyPipeToChan works even on lines longer than
+// Test that CopyPipeToChildLog works even on lines longer than
 // bufio.MaxScanTokenSize.
-func TestCopyPipeToChanLongLines(t *testing.T) {
-       logChan := make(chan string)
-       control := make(chan bool)
+func TestCopyPipeToChildLogLongLines(t *testing.T) {
+       rcv := captureLogs()
+       defer uncaptureLogs()
 
+       control := make(chan bool)
        pipeIn, pipeOut := io.Pipe()
-       go CopyPipeToChan(pipeIn, logChan, control)
+       go CopyPipeToChildLog(pipeIn, control)
 
        sentBytes := make([]byte, bufio.MaxScanTokenSize + (1 << 22))
        go func() {
+               pipeOut.Write([]byte("before\n"))
+
                for i := range sentBytes {
                        // Some bytes that aren't newlines:
                        sentBytes[i] = byte((rand.Int() & 0xff) | 0x80)
                }
-               pipeOut.Write([]byte("before\n"))
+               sentBytes[len(sentBytes)-1] = '\n'
                pipeOut.Write(sentBytes)
-               pipeOut.Write([]byte("\nafter\n"))
+
+               pipeOut.Write([]byte("after"))
                pipeOut.Close()
        }()
 
-       if before := <-logChan; before != "before" {
-               t.Fatalf("\"before\" not received (got \"%s\")", before)
+       if before, err := rcv.ReadBytes('\n'); err != nil || string(before) != "before\n" {
+               t.Fatalf("\"before\n\" not received (got \"%s\", %s)", before, err)
+       }
+       
+       receivedString, err := rcv.ReadBytes('\n')
+       if err != nil {
+               t.Fatal(err)
        }
-       receivedString := <-logChan
        receivedBytes := []byte(receivedString)
        if bytes.Compare(receivedBytes, sentBytes) != 0 {
-               t.Fatalf("sent %d bytes, got %d different bytes", len(sentBytes), len(receivedBytes))
+               t.Fatalf("sent %d bytes, got %d different bytes", len(sentBytes)+1, len(receivedBytes))
        }
-       if after := <-logChan; after != "after" {
-               t.Fatal("\"after\" not received")
+
+       if after, err := rcv.ReadBytes('\n'); err != nil || string(after) != "after\n" {
+               t.Fatal("\"after\n\" not received (got \"%s\", %s)", after, err)
        }
+
        select {
        case <-time.After(time.Second):
                t.Fatal("Timeout")
@@ -93,3 +96,16 @@ func TestCopyPipeToChanLongLines(t *testing.T) {
                // Done.
        }
 }
+
+func captureLogs() (*bufio.Reader) {
+       // Send childLog to our bufio reader instead of stderr
+       stderrIn, stderrOut := io.Pipe()
+       childLog = log.New(stderrOut, "", 0)
+       statLog = log.New(stderrOut, "crunchstat: ", 0)
+       return bufio.NewReader(stderrIn)
+}
+
+func uncaptureLogs() {
+       childLog = log.New(os.Stderr, "", 0)
+       statLog = log.New(os.Stderr, "crunchstat: ", 0)
+}