Merge branch '8016-crunchrun-crunchstat'
authorTom Clegg <tom@curoverse.com>
Tue, 5 Jul 2016 17:28:51 +0000 (13:28 -0400)
committerTom Clegg <tom@curoverse.com>
Tue, 5 Jul 2016 17:28:51 +0000 (13:28 -0400)
closes #8016

1  2 
services/crunch-run/crunchrun.go
services/crunchstat/crunchstat_test.go

index 2fd002907cb6e5d15c9928049fab2de9684ee06e,2795cb021e143ec1b883822c1baf2a891226d2c7..32d524abca2f59689e56efe59b526d9da8f37181
@@@ -5,6 -5,7 +5,7 @@@ import 
        "errors"
        "flag"
        "fmt"
+       "git.curoverse.com/arvados.git/lib/crunchstat"
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
@@@ -91,6 -92,12 +92,12 @@@ type ContainerRunner struct 
        SigChan        chan os.Signal
        ArvMountExit   chan error
        finalState     string
+       statLogger   io.WriteCloser
+       statReporter *crunchstat.Reporter
+       statInterval time.Duration
+       cgroupRoot   string
+       cgroupParent string
  }
  
  // SetupSignals sets up signal handling to gracefully terminate the underlying
@@@ -102,7 -109,7 +109,7 @@@ func (runner *ContainerRunner) SetupSig
        signal.Notify(runner.SigChan, syscall.SIGQUIT)
  
        go func(sig <-chan os.Signal) {
 -              for _ = range sig {
 +              for range sig {
                        if !runner.Cancelled {
                                runner.CancelLock.Lock()
                                runner.Cancelled = true
@@@ -366,6 -373,14 +373,14 @@@ func (runner *ContainerRunner) ProcessD
                                runner.CrunchLog.Printf("While closing stderr logs: %v", closeerr)
                        }
  
+                       if runner.statReporter != nil {
+                               runner.statReporter.Stop()
+                               closeerr = runner.statLogger.Close()
+                               if closeerr != nil {
+                                       runner.CrunchLog.Printf("While closing crunchstat logs: %v", closeerr)
+                               }
+                       }
                        runner.loggingDone <- true
                        close(runner.loggingDone)
                        return
        }
  }
  
+ func (runner *ContainerRunner) StartCrunchstat() {
+       runner.statLogger = NewThrottledLogger(runner.NewLogWriter("crunchstat"))
+       runner.statReporter = &crunchstat.Reporter{
+               CID:          runner.ContainerID,
+               Logger:       log.New(runner.statLogger, "", 0),
+               CgroupParent: runner.cgroupParent,
+               CgroupRoot:   runner.cgroupRoot,
+               PollPeriod:   runner.statInterval,
+       }
+       runner.statReporter.Start()
+ }
  // AttachLogs connects the docker container stdout and stderr logs to the
  // Arvados logger which logs to Keep and the API server logs table.
  func (runner *ContainerRunner) AttachStreams() (err error) {
@@@ -752,6 -779,8 +779,8 @@@ func (runner *ContainerRunner) Run() (e
                return
        }
  
+       runner.StartCrunchstat()
        if runner.IsCancelled() {
                return
        }
@@@ -792,6 -821,9 +821,9 @@@ func NewContainerRunner(api IArvadosCli
  }
  
  func main() {
+       statInterval := flag.Duration("crunchstat-interval", 10*time.Second, "sampling period for periodic resource usage reporting")
+       cgroupRoot := flag.String("cgroup-root", "/sys/fs/cgroup", "path to sysfs cgroup tree")
+       cgroupParent := flag.String("cgroup-parent", "docker", "name of container's parent cgroup")
        flag.Parse()
  
        containerId := flag.Arg(0)
        }
  
        cr := NewContainerRunner(api, kc, docker, containerId)
+       cr.statInterval = *statInterval
+       cr.cgroupRoot = *cgroupRoot
+       cr.cgroupParent = *cgroupParent
  
        err = cr.Run()
        if err != nil {
index fe95f03175f14df6dcfa391787e547fb466ac5cf,63967d595a1d1e23b15d49f92471c429de2a4941..fe3b56d25876fd832d3596abe3db8e40852ebbf7
@@@ -6,56 -6,21 +6,21 @@@ import 
        "io"
        "log"
        "math/rand"
-       "os"
-       "regexp"
        "testing"
        "time"
  )
  
- func TestReadAllOrWarnFail(t *testing.T) {
-       rcv := captureLogs()
-       defer uncaptureLogs()
-       go func() {
-               // The special file /proc/self/mem can be opened for
-               // reading, but reading from byte 0 returns an error.
-               f, err := os.Open("/proc/self/mem")
-               if err != nil {
-                       t.Fatalf("Opening /proc/self/mem: %s", err)
-               }
-               if x, err := ReadAllOrWarn(f); err == nil {
-                       t.Fatalf("Expected error, got %v", x)
-               }
-       }()
-       if msg, err := rcv.ReadBytes('\n'); err != nil {
-               t.Fatal(err)
-       } else if matched, err := regexp.MatchString("^crunchstat: .*error.*", string(msg)); err != nil || !matched {
-               t.Fatalf("Expected error message about unreadable file, got \"%s\"", msg)
-       }
- }
- func TestReadAllOrWarnSuccess(t *testing.T) {
-       f, err := os.Open("./crunchstat_test.go")
-       if err != nil {
-               t.Fatalf("Opening ./crunchstat_test.go: %s", err)
-       }
-       data, err := ReadAllOrWarn(f)
-       if err != nil {
-               t.Fatalf("got error %s", err)
-       }
-       if matched, err := regexp.MatchString("^package main\n", string(data)); err != nil || !matched {
-               t.Fatalf("data failed regexp: %s", err)
-       }
- }
  // Test that CopyPipeToChildLog works even on lines longer than
  // bufio.MaxScanTokenSize.
  func TestCopyPipeToChildLogLongLines(t *testing.T) {
-       rcv := captureLogs()
-       defer uncaptureLogs()
+       logger, logBuf := bufLogger()
  
-       control := make(chan bool)
        pipeIn, pipeOut := io.Pipe()
-       go CopyPipeToChildLog(pipeIn, control)
+       copied := make(chan bool)
+       go func() {
+               copyPipeToChildLog(pipeIn, logger)
+               close(copied)
+       }()
  
        sentBytes := make([]byte, bufio.MaxScanTokenSize+MaxLogLine+(1<<22))
        go func() {
                pipeOut.Close()
        }()
  
-       if before, err := rcv.ReadBytes('\n'); err != nil || string(before) != "before\n" {
+       if before, err := logBuf.ReadBytes('\n'); err != nil || string(before) != "before\n" {
                t.Fatalf("\"before\n\" not received (got \"%s\", %s)", before, err)
        }
  
        var receivedBytes []byte
        done := false
        for !done {
-               line, err := rcv.ReadBytes('\n')
+               line, err := logBuf.ReadBytes('\n')
                if err != nil {
                        t.Fatal(err)
                }
@@@ -89,7 -54,7 +54,7 @@@
                        }
                        line = line[5:]
                }
 -              if len(line) >= 6 && string(line[len(line)-6:len(line)]) == "[...]\n" {
 +              if len(line) >= 6 && string(line[len(line)-6:]) == "[...]\n" {
                        line = line[:len(line)-6]
                } else {
                        done = true
                t.Fatalf("sent %d bytes, got %d different bytes", len(sentBytes), len(receivedBytes))
        }
  
-       if after, err := rcv.ReadBytes('\n'); err != nil || string(after) != "after\n" {
+       if after, err := logBuf.ReadBytes('\n'); err != nil || string(after) != "after\n" {
                t.Fatalf("\"after\n\" not received (got \"%s\", %s)", after, err)
        }
  
        select {
        case <-time.After(time.Second):
                t.Fatal("Timeout")
-       case <-control:
+       case <-copied:
                // Done.
        }
  }
  
- func captureLogs() *bufio.Reader {
-       // Send childLog to our bufio reader instead of stderr
-       stderrIn, stderrOut := io.Pipe()
-       childLog = log.New(stderrOut, "", 0)
-       statLog = log.New(stderrOut, "crunchstat: ", 0)
-       return bufio.NewReader(stderrIn)
- }
- func uncaptureLogs() {
-       childLog = log.New(os.Stderr, "", 0)
-       statLog = log.New(os.Stderr, "crunchstat: ", 0)
+ func bufLogger() (*log.Logger, *bufio.Reader) {
+       r, w := io.Pipe()
+       logger := log.New(w, "", 0)
+       return logger, bufio.NewReader(r)
  }