X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/41887dd213cfa165925e94d4f3bb120edeb1a30d..a74c81b035c67d299e2a7298f8db3d368a578510:/services/crunchstat/crunchstat.go diff --git a/services/crunchstat/crunchstat.go b/services/crunchstat/crunchstat.go index 728f1a4374..6bce3258d9 100644 --- a/services/crunchstat/crunchstat.go +++ b/services/crunchstat/crunchstat.go @@ -34,32 +34,43 @@ type Cgroup struct { cid string } -func CopyPipeToChan(in io.Reader, out chan string, done chan<- bool) { - s := bufio.NewScanner(in) - for s.Scan() { - out <- s.Text() - } - done <- true -} +var childLog = log.New(os.Stderr, "", 0) +var statLog = log.New(os.Stderr, "crunchstat: ", 0) -func CopyChanToPipe(in <-chan string, out io.Writer) { - for s := range in { - fmt.Fprintln(out, s) - } -} +const ( + MaxLogLine = 1 << 14 // Child stderr lines >16KiB will be split +) -var logChan chan string -func LogPrintf(format string, args ...interface{}) { - if logChan == nil { - return +func CopyPipeToChildLog(in io.ReadCloser, done chan<- bool) { + reader := bufio.NewReaderSize(in, MaxLogLine) + var prefix string + for { + line, isPrefix, err := reader.ReadLine() + if err == io.EOF { + break + } else if err != nil { + statLog.Fatal("error reading child stderr:", err) + } + var suffix string + if isPrefix { + suffix = "[...]" + } + childLog.Print(prefix, string(line), suffix) + // Set up prefix for following line + if isPrefix { + prefix = "[...]" + } else { + prefix = "" + } } - logChan <- fmt.Sprintf("crunchstat: " + format, args...) + done <- true + in.Close() } func ReadAllOrWarn(in *os.File) ([]byte, error) { content, err := ioutil.ReadAll(in) if err != nil { - LogPrintf("read %s: %s", in.Name(), err) + statLog.Printf("error reading %s: %s\n", in.Name(), err) } return content, err } @@ -75,11 +86,19 @@ var reportedStatFile = map[string]string{} // cgroup root for the given statgroup. (This will avoid falling back // to host-level stats during container setup and teardown.) func OpenStatFile(cgroup Cgroup, statgroup string, stat string) (*os.File, error) { - var paths = []string{ - fmt.Sprintf("%s/%s/%s/%s/%s", cgroup.root, statgroup, cgroup.parent, cgroup.cid, stat), - fmt.Sprintf("%s/%s/%s/%s", cgroup.root, cgroup.parent, cgroup.cid, stat), - fmt.Sprintf("%s/%s/%s", cgroup.root, statgroup, stat), - fmt.Sprintf("%s/%s", cgroup.root, stat), + var paths []string + if cgroup.cid != "" { + // Collect container's stats + paths = []string{ + fmt.Sprintf("%s/%s/%s/%s/%s", cgroup.root, statgroup, cgroup.parent, cgroup.cid, stat), + fmt.Sprintf("%s/%s/%s/%s", cgroup.root, cgroup.parent, cgroup.cid, stat), + } + } else { + // Collect this host's stats + paths = []string{ + fmt.Sprintf("%s/%s/%s", cgroup.root, statgroup, stat), + fmt.Sprintf("%s/%s", cgroup.root, stat), + } } var path string var file *os.File @@ -99,12 +118,14 @@ func OpenStatFile(cgroup Cgroup, statgroup string, stat string) (*os.File, error // whether we happen to collect stats [a] before any // processes have been created in the container and // [b] after all contained processes have exited. - reportedStatFile[stat] = path if path == "" { - LogPrintf("did not find stats file: stat %s, statgroup %s, cid %s, parent %s, root %s", stat, statgroup, cgroup.cid, cgroup.parent, cgroup.root) + statLog.Printf("notice: stats not available: stat %s, statgroup %s, cid %s, parent %s, root %s\n", stat, statgroup, cgroup.cid, cgroup.parent, cgroup.root) + } else if ok { + statLog.Printf("notice: stats moved from %s to %s\n", reportedStatFile[stat], path) } else { - LogPrintf("reading stats from %s", path) + statLog.Printf("notice: reading stats from %s\n", path) } + reportedStatFile[stat] = path } return file, err } @@ -121,7 +142,7 @@ func GetContainerNetStats(cgroup Cgroup) (io.Reader, error) { statsFilename := fmt.Sprintf("/proc/%s/net/dev", taskPid) stats, err := ioutil.ReadFile(statsFilename) if err != nil { - LogPrintf("read %s: %s", statsFilename, err) + statLog.Printf("error reading %s: %s\n", statsFilename, err) continue } return strings.NewReader(string(stats)), nil @@ -174,7 +195,7 @@ func DoBlkIoStats(cgroup Cgroup, lastSample map[string]IoSample) { sample.txBytes-prev.txBytes, sample.rxBytes-prev.rxBytes) } - LogPrintf("blkio:%s %d write %d read%s", dev, sample.txBytes, sample.rxBytes, delta) + statLog.Printf("blkio:%s %d write %d read%s\n", dev, sample.txBytes, sample.rxBytes, delta) lastSample[dev] = sample } } @@ -192,7 +213,7 @@ func DoMemoryStats(cgroup Cgroup) { defer c.Close() b := bufio.NewScanner(c) thisSample := MemSample{time.Now(), make(map[string]int64)} - wantStats := [...]string{"cache", "pgmajfault", "rss"} + wantStats := [...]string{"cache", "swap", "pgmajfault", "rss"} for b.Scan() { var stat string var val int64 @@ -207,7 +228,7 @@ func DoMemoryStats(cgroup Cgroup) { outstat.WriteString(fmt.Sprintf(" %d %s", val, key)) } } - LogPrintf("mem%s", outstat.String()) + statLog.Printf("mem%s\n", outstat.String()) } func DoNetworkStats(cgroup Cgroup, lastSample map[string]IoSample) { @@ -249,7 +270,7 @@ func DoNetworkStats(cgroup Cgroup, lastSample map[string]IoSample) { tx-prev.txBytes, rx-prev.rxBytes) } - LogPrintf("net:%s %d tx %d rx%s", ifName, tx, rx, delta) + statLog.Printf("net:%s %d tx %d rx%s\n", ifName, tx, rx, delta) lastSample[ifName] = nextSample } } @@ -310,7 +331,7 @@ func DoCpuStats(cgroup Cgroup, lastSample *CpuSample) { nextSample.user-lastSample.user, nextSample.sys-lastSample.sys) } - LogPrintf("cpu %.4f user %.4f sys %d cpus%s", + statLog.Printf("cpu %.4f user %.4f sys %d cpus%s\n", nextSample.user, nextSample.sys, nextSample.cpus, delta) *lastSample = nextSample } @@ -362,23 +383,19 @@ func run(logger *log.Logger) error { flag.Parse() if cgroup_root == "" { - logger.Fatal("Must provide -cgroup-root") + statLog.Fatal("error: must provide -cgroup-root") } - logChan = make(chan string, 1) - defer close(logChan) finish_chan := make(chan bool) defer close(finish_chan) - go CopyChanToPipe(logChan, os.Stderr) - var cmd *exec.Cmd if len(flag.Args()) > 0 { // Set up subprocess cmd = exec.Command(flag.Args()[0], flag.Args()[1:]...) - logger.Print("Running ", flag.Args()) + childLog.Println("Running", flag.Args()) // Child process will use our stdin and stdout pipes // (we close our copies below) @@ -386,27 +403,27 @@ func run(logger *log.Logger) error { cmd.Stdout = os.Stdout // Forward SIGINT and SIGTERM to inner process - term := make(chan os.Signal, 1) + sigChan := make(chan os.Signal, 1) go func(sig <-chan os.Signal) { catch := <-sig if cmd.Process != nil { cmd.Process.Signal(catch) } - logger.Print("caught signal: ", catch) - }(term) - signal.Notify(term, syscall.SIGTERM) - signal.Notify(term, syscall.SIGINT) + statLog.Println("notice: caught signal:", catch) + }(sigChan) + signal.Notify(sigChan, syscall.SIGTERM) + signal.Notify(sigChan, syscall.SIGINT) // Funnel stderr through our channel stderr_pipe, err := cmd.StderrPipe() if err != nil { - logger.Fatal(err) + statLog.Fatalln("error in StderrPipe:", err) } - go CopyPipeToChan(stderr_pipe, logChan, finish_chan) + go CopyPipeToChildLog(stderr_pipe, finish_chan) // Run subprocess if err := cmd.Start(); err != nil { - logger.Fatal(err) + statLog.Fatalln("error in cmd.Start:", err) } // Close stdin/stdout in this (parent) process @@ -430,7 +447,7 @@ func run(logger *log.Logger) error { time.Sleep(100 * time.Millisecond) } if !ok { - logger.Printf("Could not read cid file %s", cgroup_cidfile) + statLog.Println("error reading cid file:", cgroup_cidfile) } } @@ -463,7 +480,7 @@ func main() { os.Exit(status.ExitStatus()) } } else { - logger.Fatalf("cmd.Wait: %v", err) + statLog.Fatalln("error in cmd.Wait:", err) } } }