cid string
}
-func CopyPipeToChan(in io.ReadCloser, out chan string, done chan<- bool) {
+var childLog = log.New(os.Stderr, "", 0) // receives the child's stderr lines from CopyPipeToChildLog; no prefix is added
+var statLog = log.New(os.Stderr, "crunchstat: ", 0) // crunchstat's own statistics and diagnostics, prefixed "crunchstat: "
+
+func CopyPipeToChildLog(in io.ReadCloser, done chan<- bool) {
reader := bufio.NewReader(in)
for {
line, err := reader.ReadBytes('\n')
// err == nil IFF line ends in \n
line = line[:len(line)-1]
}
- out <- string(line)
+ childLog.Println(string(line))
}
- if err != nil {
- if err != io.EOF {
- out <- fmt.Sprintf("crunchstat: line buffering error: %s", err)
- }
+ if err == io.EOF {
break
+ } else if err != nil {
+ statLog.Fatalln("line buffering error:", err)
}
}
done <- true
in.Close()
}
-func CopyChanToPipe(in <-chan string, out io.Writer) {
- for s := range in {
- fmt.Fprintln(out, s)
- }
-}
-
-var logChan chan string
-
-func LogPrintf(format string, args ...interface{}) {
- if logChan == nil {
- return
- }
- logChan <- fmt.Sprintf("crunchstat: "+format, args...)
-}
-
func ReadAllOrWarn(in *os.File) ([]byte, error) {
content, err := ioutil.ReadAll(in)
if err != nil {
- LogPrintf("read %s: %s", in.Name(), err)
+ statLog.Printf("read %s: %s\n", in.Name(), err) // warn only: content and err are still returned so the caller decides what to do
}
return content, err
}
// [b] after all contained processes have exited.
reportedStatFile[stat] = path
if path == "" {
- LogPrintf("did not find stats file: stat %s, statgroup %s, cid %s, parent %s, root %s", stat, statgroup, cgroup.cid, cgroup.parent, cgroup.root)
+ statLog.Printf("did not find stats file: stat %s, statgroup %s, cid %s, parent %s, root %s\n", stat, statgroup, cgroup.cid, cgroup.parent, cgroup.root)
} else {
- LogPrintf("reading stats from %s", path)
+ statLog.Printf("reading stats from %s\n", path)
}
}
return file, err
statsFilename := fmt.Sprintf("/proc/%s/net/dev", taskPid)
stats, err := ioutil.ReadFile(statsFilename)
if err != nil {
- LogPrintf("read %s: %s", statsFilename, err)
+ statLog.Printf("read %s: %s\n", statsFilename, err)
continue
}
return strings.NewReader(string(stats)), nil
sample.txBytes-prev.txBytes,
sample.rxBytes-prev.rxBytes)
}
- LogPrintf("blkio:%s %d write %d read%s", dev, sample.txBytes, sample.rxBytes, delta)
+ statLog.Printf("blkio:%s %d write %d read%s\n", dev, sample.txBytes, sample.rxBytes, delta)
lastSample[dev] = sample
}
}
outstat.WriteString(fmt.Sprintf(" %d %s", val, key))
}
}
- LogPrintf("mem%s", outstat.String())
+ statLog.Printf("mem%s\n", outstat.String())
}
func DoNetworkStats(cgroup Cgroup, lastSample map[string]IoSample) {
tx-prev.txBytes,
rx-prev.rxBytes)
}
- LogPrintf("net:%s %d tx %d rx%s", ifName, tx, rx, delta)
+ statLog.Printf("net:%s %d tx %d rx%s\n", ifName, tx, rx, delta)
lastSample[ifName] = nextSample
}
}
nextSample.user-lastSample.user,
nextSample.sys-lastSample.sys)
}
- LogPrintf("cpu %.4f user %.4f sys %d cpus%s",
+ statLog.Printf("cpu %.4f user %.4f sys %d cpus%s\n",
nextSample.user, nextSample.sys, nextSample.cpus, delta)
*lastSample = nextSample
}
flag.Parse()
if cgroup_root == "" {
- logger.Fatal("Must provide -cgroup-root")
+ statLog.Fatalln("Must provide -cgroup-root")
}
- logChan = make(chan string, 1)
- defer close(logChan)
finish_chan := make(chan bool)
defer close(finish_chan)
- go CopyChanToPipe(logChan, os.Stderr)
-
var cmd *exec.Cmd
if len(flag.Args()) > 0 {
// Set up subprocess
cmd = exec.Command(flag.Args()[0], flag.Args()[1:]...)
- logger.Print("Running ", flag.Args())
+ statLog.Println("Running ", flag.Args())
// Child process will use our stdin and stdout pipes
// (we close our copies below)
cmd.Stdout = os.Stdout
// Forward SIGINT and SIGTERM to inner process
- term := make(chan os.Signal, 1)
+ sigChan := make(chan os.Signal, 1)
go func(sig <-chan os.Signal) {
catch := <-sig
if cmd.Process != nil {
cmd.Process.Signal(catch)
}
- logger.Print("caught signal: ", catch)
- }(term)
- signal.Notify(term, syscall.SIGTERM)
- signal.Notify(term, syscall.SIGINT)
+ statLog.Println("caught signal:", catch)
+ }(sigChan)
+ signal.Notify(sigChan, syscall.SIGTERM)
+ signal.Notify(sigChan, syscall.SIGINT)
// Funnel stderr through our channel
stderr_pipe, err := cmd.StderrPipe()
if err != nil {
- logger.Fatal(err)
+ statLog.Fatalln("stderr:", err)
}
- go CopyPipeToChan(stderr_pipe, logChan, finish_chan)
+ go CopyPipeToChildLog(stderr_pipe, finish_chan)
// Run subprocess
if err := cmd.Start(); err != nil {
- logger.Fatal(err)
+ statLog.Fatalln("cmd.Start:", err)
}
// Close stdin/stdout in this (parent) process
time.Sleep(100 * time.Millisecond)
}
if !ok {
- logger.Printf("Could not read cid file %s", cgroup_cidfile)
+ statLog.Println("Could not read cid file:", cgroup_cidfile)
}
}
os.Exit(status.ExitStatus())
}
} else {
- logger.Fatalf("cmd.Wait: %v", err)
+ statLog.Fatalln("cmd.Wait:", err)
}
}
}
"bufio"
"bytes"
"io"
+ "log"
"math/rand"
"os"
"regexp"
)
func TestReadAllOrWarnFail(t *testing.T) {
- logChan = make(chan string)
+ rcv := captureLogs()
+ defer uncaptureLogs()
go func() {
- defer close(logChan)
// The special file /proc/self/mem can be opened for
// reading, but reading from byte 0 returns an error.
f, err := os.Open("/proc/self/mem")
t.Fatalf("Expected error, got %v", x)
}
}()
- if _, ok := <-logChan; !ok {
- t.Fatalf("Expected error message about nonexistent file")
- }
- if msg, ok := <-logChan; ok {
- t.Fatalf("Expected channel to close, got %s", msg)
+ if msg, err := rcv.ReadBytes('\n'); err != nil {
+ t.Fatal(err)
+ } else if matched, err := regexp.MatchString("^crunchstat: .*error.*", string(msg)); err != nil || !matched {
+ t.Fatalf("Expected error message about unreadable file, got \"%s\"", msg)
}
}
+// TestReadAllOrWarnSuccess checks that ReadAllOrWarn returns the contents of
+// a readable file (this test's own source) without reporting an error.
func TestReadAllOrWarnSuccess(t *testing.T) {
- logChan = make(chan string)
- go func() {
- defer close(logChan)
- f, err := os.Open("./crunchstat_test.go")
- if err != nil {
- t.Fatalf("Opening ./crunchstat_test.go: %s", err)
- }
- data, err := ReadAllOrWarn(f)
- if err != nil {
- t.Fatalf("got error %s", err)
- }
- if matched, err := regexp.MatchString("^package main\n", string(data)); err != nil || !matched {
- t.Fatalf("data failed regexp: %s", err)
- }
- }()
- if msg, ok := <-logChan; ok {
- t.Fatalf("Expected channel to close, got %s", msg)
+ f, err := os.Open("./crunchstat_test.go")
+ if err != nil {
+ t.Fatalf("Opening ./crunchstat_test.go: %s", err)
+ }
+ // ReadAllOrWarn does not close the file, so release it here.
+ defer f.Close()
+ data, err := ReadAllOrWarn(f)
+ if err != nil {
+ t.Fatalf("got error %s", err)
+ }
+ if matched, err := regexp.MatchString("^package main\n", string(data)); err != nil || !matched {
+ // err is nil when the pattern simply did not match, hence %v.
+ t.Fatalf("data failed regexp: %v", err)
}
}
-// Test that CopyPipeToChan works even on lines longer than
+// Test that CopyPipeToChildLog works even on lines longer than
// bufio.MaxScanTokenSize.
-func TestCopyPipeToChanLongLines(t *testing.T) {
- logChan := make(chan string)
- control := make(chan bool)
+func TestCopyPipeToChildLogLongLines(t *testing.T) {
+ rcv := captureLogs()
+ defer uncaptureLogs()
+ control := make(chan bool)
pipeIn, pipeOut := io.Pipe()
- go CopyPipeToChan(pipeIn, logChan, control)
+ go CopyPipeToChildLog(pipeIn, control)
sentBytes := make([]byte, bufio.MaxScanTokenSize + (1 << 22))
go func() {
+ pipeOut.Write([]byte("before\n"))
+
for i := range sentBytes {
// Some bytes that aren't newlines:
sentBytes[i] = byte((rand.Int() & 0xff) | 0x80)
}
- pipeOut.Write([]byte("before\n"))
+ // Terminate the long line inside sentBytes itself, so the bytes read
+ // back from the log (newline included) can be compared directly.
+ sentBytes[len(sentBytes)-1] = '\n'
pipeOut.Write(sentBytes)
- pipeOut.Write([]byte("\nafter\n"))
+
+ // No trailing newline: the final partial line must still be logged
+ // once the pipe is closed.
+ pipeOut.Write([]byte("after"))
pipeOut.Close()
}()
- if before := <-logChan; before != "before" {
- t.Fatalf("\"before\" not received (got \"%s\")", before)
+ if before, err := rcv.ReadBytes('\n'); err != nil || string(before) != "before\n" {
+ t.Fatalf("\"before\\n\" not received (got %q, %v)", before, err)
+ }
+
+ receivedString, err := rcv.ReadBytes('\n')
+ if err != nil {
+ t.Fatal(err)
}
- receivedString := <-logChan
receivedBytes := []byte(receivedString)
if bytes.Compare(receivedBytes, sentBytes) != 0 {
t.Fatalf("sent %d bytes, got %d different bytes", len(sentBytes), len(receivedBytes))
}
- if after := <-logChan; after != "after" {
- t.Fatal("\"after\" not received")
+
+ if after, err := rcv.ReadBytes('\n'); err != nil || string(after) != "after\n" {
+ t.Fatalf("\"after\\n\" not received (got %q, %v)", after, err)
}
+
select {
case <-time.After(time.Second):
t.Fatal("Timeout")
// Done.
}
}
+
+// captureLogs redirects both childLog and statLog into an in-memory pipe
+// and returns a buffered reader on the pipe's read end. io.Pipe has no
+// internal buffering, so a log write blocks until the test reads it.
+func captureLogs() *bufio.Reader {
+ stderrIn, stderrOut := io.Pipe()
+ childLog = log.New(stderrOut, "", 0)
+ statLog = log.New(stderrOut, "crunchstat: ", 0)
+ return bufio.NewReader(stderrIn)
+}
+
+// uncaptureLogs points statLog and childLog back at os.Stderr with their
+// usual prefixes, undoing the redirection set up by captureLogs.
+func uncaptureLogs() {
+ statLog = log.New(os.Stderr, "crunchstat: ", 0)
+ childLog = log.New(os.Stderr, "", 0)
+}