10903: support need_transaction for job and pipeline_instance cancel methods.
[arvados.git] / services / crunchstat / crunchstat.go
1 package main
2
3 import (
4         "bufio"
5         "flag"
6         "io"
7         "log"
8         "os"
9         "os/exec"
10         "os/signal"
11         "syscall"
12         "time"
13
14         "git.curoverse.com/arvados.git/lib/crunchstat"
15 )
16
17 const MaxLogLine = 1 << 14 // Child stderr lines >16KiB will be split
18
19 var (
20         signalOnDeadPPID  int = 15
21         ppidCheckInterval     = time.Second
22 )
23
24 func main() {
25         reporter := crunchstat.Reporter{
26                 Logger: log.New(os.Stderr, "crunchstat: ", 0),
27         }
28
29         flag.StringVar(&reporter.CgroupRoot, "cgroup-root", "", "Root of cgroup tree")
30         flag.StringVar(&reporter.CgroupParent, "cgroup-parent", "", "Name of container parent under cgroup")
31         flag.StringVar(&reporter.CIDFile, "cgroup-cid", "", "Path to container id file")
32         flag.IntVar(&signalOnDeadPPID, "signal-on-dead-ppid", signalOnDeadPPID, "Signal to send child if crunchstat's parent process disappears (0 to disable)")
33         flag.DurationVar(&ppidCheckInterval, "ppid-check-interval", ppidCheckInterval, "Time between checks for parent process disappearance")
34         pollMsec := flag.Int64("poll", 1000, "Reporting interval, in milliseconds")
35
36         flag.Parse()
37
38         if reporter.CgroupRoot == "" {
39                 reporter.Logger.Fatal("error: must provide -cgroup-root")
40         } else if signalOnDeadPPID < 0 {
41                 reporter.Logger.Fatalf("-signal-on-dead-ppid=%d is invalid (use a positive signal number, or 0 to disable)", signalOnDeadPPID)
42         }
43         reporter.PollPeriod = time.Duration(*pollMsec) * time.Millisecond
44
45         reporter.Start()
46         err := runCommand(flag.Args(), reporter.Logger)
47         reporter.Stop()
48
49         if err, ok := err.(*exec.ExitError); ok {
50                 // The program has exited with an exit code != 0
51
52                 // This works on both Unix and Windows. Although
53                 // package syscall is generally platform dependent,
54                 // WaitStatus is defined for both Unix and Windows and
55                 // in both cases has an ExitStatus() method with the
56                 // same signature.
57                 if status, ok := err.Sys().(syscall.WaitStatus); ok {
58                         os.Exit(status.ExitStatus())
59                 } else {
60                         reporter.Logger.Fatalln("ExitError without WaitStatus:", err)
61                 }
62         } else if err != nil {
63                 reporter.Logger.Fatalln("error in cmd.Wait:", err)
64         }
65 }
66
67 func runCommand(argv []string, logger *log.Logger) error {
68         cmd := exec.Command(argv[0], argv[1:]...)
69
70         logger.Println("Running", argv)
71
72         // Child process will use our stdin and stdout pipes
73         // (we close our copies below)
74         cmd.Stdin = os.Stdin
75         cmd.Stdout = os.Stdout
76
77         // Forward SIGINT and SIGTERM to child process
78         sigChan := make(chan os.Signal, 1)
79         go func(sig <-chan os.Signal) {
80                 catch := <-sig
81                 if cmd.Process != nil {
82                         cmd.Process.Signal(catch)
83                 }
84                 logger.Println("notice: caught signal:", catch)
85         }(sigChan)
86         signal.Notify(sigChan, syscall.SIGTERM)
87         signal.Notify(sigChan, syscall.SIGINT)
88
89         // Kill our child proc if our parent process disappears
90         if signalOnDeadPPID != 0 {
91                 go sendSignalOnDeadPPID(ppidCheckInterval, signalOnDeadPPID, os.Getppid(), cmd, logger)
92         }
93
94         // Funnel stderr through our channel
95         stderr_pipe, err := cmd.StderrPipe()
96         if err != nil {
97                 logger.Fatalln("error in StderrPipe:", err)
98         }
99
100         // Run subprocess
101         if err := cmd.Start(); err != nil {
102                 logger.Fatalln("error in cmd.Start:", err)
103         }
104
105         // Close stdin/stdout in this (parent) process
106         os.Stdin.Close()
107         os.Stdout.Close()
108
109         copyPipeToChildLog(stderr_pipe, log.New(os.Stderr, "", 0))
110
111         return cmd.Wait()
112 }
113
114 func sendSignalOnDeadPPID(intvl time.Duration, signum, ppidOrig int, cmd *exec.Cmd, logger *log.Logger) {
115         ticker := time.NewTicker(intvl)
116         for _ = range ticker.C {
117                 ppid := os.Getppid()
118                 if ppid == ppidOrig {
119                         continue
120                 }
121                 if cmd.Process == nil {
122                         // Child process isn't running yet
123                         continue
124                 }
125                 logger.Printf("notice: crunchstat ppid changed from %d to %d -- killing child pid %d with signal %d", ppidOrig, ppid, cmd.Process.Pid, signum)
126                 err := cmd.Process.Signal(syscall.Signal(signum))
127                 if err != nil {
128                         logger.Printf("error: sending signal: %s", err)
129                         continue
130                 }
131                 ticker.Stop()
132                 break
133         }
134 }
135
136 func copyPipeToChildLog(in io.ReadCloser, logger *log.Logger) {
137         reader := bufio.NewReaderSize(in, MaxLogLine)
138         var prefix string
139         for {
140                 line, isPrefix, err := reader.ReadLine()
141                 if err == io.EOF {
142                         break
143                 } else if err != nil {
144                         logger.Fatal("error reading child stderr:", err)
145                 }
146                 var suffix string
147                 if isPrefix {
148                         suffix = "[...]"
149                 }
150                 logger.Print(prefix, string(line), suffix)
151                 // Set up prefix for following line
152                 if isPrefix {
153                         prefix = "[...]"
154                 } else {
155                         prefix = ""
156                 }
157         }
158         in.Close()
159 }