Merge branch '3381-job-progress-bar-bug' closes #3381
[arvados.git] / services / crunchstat / crunchstat.go
index d61871da6475dcdde342789069492e9e8dcfedf2..387f6478d929941066f94c34ce18c33fb075c862 100644 (file)
@@ -15,30 +15,17 @@ import (
        "time"
 )
 
-func ReadLineByLine(inp io.ReadCloser, out chan string, finish chan bool) {
-       s := bufio.NewScanner(inp)
+func CopyPipeToChan(in io.Reader, out chan string, done chan<- bool) {
+       s := bufio.NewScanner(in)
        for s.Scan() {
                out <- s.Text()
        }
-       finish <- true
+       done <- true
 }
 
-func OutputChannel(stdout chan string, stderr chan string) {
-       for {
-               select {
-               case s, ok := <-stdout:
-                       if ok {
-                               fmt.Fprintln(os.Stdout, s)
-                       } else {
-                               return
-                       }
-               case s, ok := <-stderr:
-                       if ok {
-                               fmt.Fprintln(os.Stderr, s)
-                       } else {
-                               return
-                       }
-               }
+func CopyChanToPipe(in <-chan string, out io.Writer) {
+       for s := range in {
+               fmt.Fprintln(out, s)
        }
 }
 
@@ -63,10 +50,10 @@ func FindStat(cgroup_root string, cgroup_parent string, container_id string, sta
        return ""
 }
 
-func PollCgroupStats(cgroup_root string, cgroup_parent string, container_id string, stderr chan string, poll int64) {
+func PollCgroupStats(cgroup_root string, cgroup_parent string, container_id string, stderr chan string, poll int64, stop_poll_chan <-chan bool) {
        //var last_usage int64 = 0
-       var last_user int64 = 0
-       var last_sys int64 = 0
+       var last_user int64 = -1
+       var last_sys int64 = -1
        var last_cpucount int64 = 0
 
        type Disk struct {
@@ -97,9 +84,25 @@ func PollCgroupStats(cgroup_root string, cgroup_parent string, container_id stri
                stderr <- fmt.Sprintf("crunchstat: reading stats from %s", memory_stat)
        }
 
-       var elapsed int64 = poll
-
+       poll_chan := make(chan bool, 1)
+       go func() {
+               // Send periodic poll events.
+               poll_chan <- true
+               for {
+                       time.Sleep(time.Duration(poll) * time.Millisecond)
+                       poll_chan <- true
+               }
+       }()
        for {
+               bedtime := time.Now()
+               select {
+               case <-stop_poll_chan:
+                       return
+               case <-poll_chan:
+                       // Emit stats, then select again.
+               }
+               morning := time.Now()
+               elapsed := morning.Sub(bedtime).Nanoseconds() / int64(time.Millisecond)
                /*{
                        c, _ := os.Open(cpuacct_usage)
                        b, _ := ioutil.ReadAll(c)
@@ -114,7 +117,12 @@ func PollCgroupStats(cgroup_root string, cgroup_parent string, container_id stri
                }*/
                var cpus int64 = 0
                if cpuset_cpus != "" {
-                       c, _ := os.Open(cpuset_cpus)
+                       c, err := os.Open(cpuset_cpus)
+                       if err != nil {
+                               stderr <- fmt.Sprintf("open %s: %s", cpuset_cpus, err)
+                               // cgroup probably gone -- skip other stats too.
+                               continue
+                       }
                        b, _ := ioutil.ReadAll(c)
                        sp := strings.Split(string(b), ",")
                        for _, v := range sp {
@@ -138,18 +146,28 @@ func PollCgroupStats(cgroup_root string, cgroup_parent string, container_id stri
                        cpus = 1
                }
                if cpuacct_stat != "" {
-                       c, _ := os.Open(cpuacct_stat)
+                       c, err := os.Open(cpuacct_stat)
+                       if err != nil {
+                               stderr <- fmt.Sprintf("open %s: %s", cpuacct_stat, err)
+                               // Next time around, last_user would
+                               // be >1 interval old, so stats will
+                               // be incorrect. Start over instead.
+                               last_user = -1
+
+                               // cgroup probably gone -- skip other stats too.
+                               continue
+                       }
                        b, _ := ioutil.ReadAll(c)
                        var next_user int64
                        var next_sys int64
                        fmt.Sscanf(string(b), "user %d\nsystem %d", &next_user, &next_sys)
                        c.Close()
 
-                       if last_user != 0 {
+                       if elapsed > 0 && last_user != -1 {
                                user_diff := next_user - last_user
                                sys_diff := next_sys - last_sys
                                // Assume we're reading stats based on 100
-                               // jiffies per second.  Because the ellaspsed
+                               // jiffies per second.  Because the elapsed
                                // time is in milliseconds, we need to boost
                                // that to 1000 jiffies per second, then boost
                                // it by another 100x to get a percentage, then
@@ -170,7 +188,12 @@ func PollCgroupStats(cgroup_root string, cgroup_parent string, container_id stri
                        last_sys = next_sys
                }
                if blkio_io_service_bytes != "" {
-                       c, _ := os.Open(blkio_io_service_bytes)
+                       c, err := os.Open(blkio_io_service_bytes)
+                       if err != nil {
+                               stderr <- fmt.Sprintf("open %s: %s", blkio_io_service_bytes, err)
+                               // cgroup probably gone -- skip other stats too.
+                               continue
+                       }
                        b := bufio.NewScanner(c)
                        var device, op string
                        var next int64
@@ -199,7 +222,12 @@ func PollCgroupStats(cgroup_root string, cgroup_parent string, container_id stri
                }
 
                if memory_stat != "" {
-                       c, _ := os.Open(memory_stat)
+                       c, err := os.Open(memory_stat)
+                       if err != nil {
+                               stderr <- fmt.Sprintf("open %s: %s", memory_stat, err)
+                               // cgroup probably gone -- skip other stats too.
+                               continue
+                       }
                        b := bufio.NewScanner(c)
                        var stat string
                        var val int64
@@ -212,15 +240,10 @@ func PollCgroupStats(cgroup_root string, cgroup_parent string, container_id stri
                        }
                        c.Close()
                }
-
-               bedtime := time.Now()
-               time.Sleep(time.Duration(poll) * time.Millisecond)
-               morning := time.Now()
-               elapsed = morning.Sub(bedtime).Nanoseconds() / int64(time.Millisecond)
        }
 }
 
-func main() {
+func run(logger *log.Logger) error {
 
        var (
                cgroup_root    string
@@ -238,21 +261,16 @@ func main() {
 
        flag.Parse()
 
-       logger := log.New(os.Stderr, "crunchstat: ", 0)
-
        if cgroup_root == "" {
-               logger.Fatal("Must provide either -cgroup-root")
+               logger.Fatal("Must provide -cgroup-root")
        }
 
-       // Make output channel
-       stdout_chan := make(chan string)
-       stderr_chan := make(chan string)
-       finish_chan := make(chan bool)
-       defer close(stdout_chan)
+       stderr_chan := make(chan string, 1)
        defer close(stderr_chan)
+       finish_chan := make(chan bool)
        defer close(finish_chan)
 
-       go OutputChannel(stdout_chan, stderr_chan)
+       go CopyChanToPipe(stderr_chan, os.Stderr)
 
        var cmd *exec.Cmd
 
@@ -262,6 +280,11 @@ func main() {
 
                logger.Print("Running ", flag.Args())
 
+               // Child process will use our stdin and stdout pipes
+               // (we close our copies below)
+               cmd.Stdin = os.Stdin
+               cmd.Stdout = os.Stdout
+
                // Forward SIGINT and SIGTERM to inner process
                term := make(chan os.Signal, 1)
                go func(sig <-chan os.Signal) {
@@ -269,66 +292,76 @@ func main() {
                        if cmd.Process != nil {
                                cmd.Process.Signal(catch)
                        }
-                       logger.Print("caught signal:", catch)
+                       logger.Print("caught signal: ", catch)
                }(term)
                signal.Notify(term, syscall.SIGTERM)
                signal.Notify(term, syscall.SIGINT)
 
-               // Funnel stdout and stderr from subprocess to output channels
-               stdout_pipe, err := cmd.StdoutPipe()
-               if err != nil {
-                       logger.Fatal(err)
-               }
-               go ReadLineByLine(stdout_pipe, stdout_chan, finish_chan)
-
+               // Funnel stderr through our channel
                stderr_pipe, err := cmd.StderrPipe()
                if err != nil {
                        logger.Fatal(err)
                }
-               go ReadLineByLine(stderr_pipe, stderr_chan, finish_chan)
+               go CopyPipeToChan(stderr_pipe, stderr_chan, finish_chan)
 
                // Run subprocess
                if err := cmd.Start(); err != nil {
                        logger.Fatal(err)
                }
+
+               // Close stdin/stdout in this (parent) process
+               os.Stdin.Close()
+               os.Stdout.Close()
        }
 
        // Read the cid file
        var container_id string
        if cgroup_cidfile != "" {
                // wait up to 'wait' seconds for the cid file to appear
+               ok := false
                var i time.Duration
                for i = 0; i < time.Duration(wait)*time.Second; i += (100 * time.Millisecond) {
                        f, err := os.Open(cgroup_cidfile)
                        if err == nil {
+                               defer f.Close()
                                cid, err2 := ioutil.ReadAll(f)
                                if err2 == nil && len(cid) > 0 {
+                                       ok = true
                                        container_id = string(cid)
-                                       f.Close()
                                        break
                                }
                        }
                        time.Sleep(100 * time.Millisecond)
                }
-               if cgroup_root == "" {
+               if !ok {
                        logger.Printf("Could not read cid file %s", cgroup_cidfile)
                }
        }
 
-       go PollCgroupStats(cgroup_root, cgroup_parent, container_id, stderr_chan, poll)
+       stop_poll_chan := make(chan bool, 1)
+       go PollCgroupStats(cgroup_root, cgroup_parent, container_id, stderr_chan, poll, stop_poll_chan)
 
-       // Wait for each of stdout and stderr to drain
-       <-finish_chan
+       // When the child exits, tell the polling goroutine to stop.
+       defer func() { stop_poll_chan <- true }()
+
+       // Wait for CopyPipeToChan to consume child's stderr pipe
        <-finish_chan
 
-       if err := cmd.Wait(); err != nil {
+       return cmd.Wait()
+}
+
+func main() {
+       logger := log.New(os.Stderr, "crunchstat: ", 0)
+       if err := run(logger); err != nil {
                if exiterr, ok := err.(*exec.ExitError); ok {
                        // The program has exited with an exit code != 0
 
-                       // This works on both Unix and Windows. Although package
-                       // syscall is generally platform dependent, WaitStatus is
-                       // defined for both Unix and Windows and in both cases has
-                       // an ExitStatus() method with the same signature.
+                       // This works on both Unix and
+                       // Windows. Although package syscall is
+                       // generally platform dependent, WaitStatus is
+                       // defined for both Unix and Windows and in
+                       // both cases has an ExitStatus() method with
+                       // the same signature.
                        if status, ok := exiterr.Sys().(syscall.WaitStatus); ok {
                                os.Exit(status.ExitStatus())
                        }