19388: Add tests for activity logs.
[arvados.git] / services / crunchstat / crunchstat.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bufio"
9         "flag"
10         "fmt"
11         "io"
12         "log"
13         "os"
14         "os/exec"
15         "os/signal"
16         "syscall"
17         "time"
18
19         "git.arvados.org/arvados.git/lib/cmd"
20         "git.arvados.org/arvados.git/lib/crunchstat"
21 )
22
23 const MaxLogLine = 1 << 14 // Child stderr lines >16KiB will be split
24
25 var (
26         signalOnDeadPPID  int = 15
27         ppidCheckInterval     = time.Second
28         version               = "dev"
29 )
30
31 func main() {
32         reporter := crunchstat.Reporter{
33                 Logger: log.New(os.Stderr, "crunchstat: ", 0),
34         }
35
36         flags := flag.NewFlagSet(os.Args[0], flag.ExitOnError)
37         flags.StringVar(&reporter.CgroupRoot, "cgroup-root", "", "Root of cgroup tree")
38         flags.StringVar(&reporter.CgroupParent, "cgroup-parent", "", "Name of container parent under cgroup")
39         flags.StringVar(&reporter.CIDFile, "cgroup-cid", "", "Path to container id file")
40         flags.IntVar(&signalOnDeadPPID, "signal-on-dead-ppid", signalOnDeadPPID, "Signal to send child if crunchstat's parent process disappears (0 to disable)")
41         flags.DurationVar(&ppidCheckInterval, "ppid-check-interval", ppidCheckInterval, "Time between checks for parent process disappearance")
42         pollMsec := flags.Int64("poll", 1000, "Reporting interval, in milliseconds")
43         getVersion := flags.Bool("version", false, "Print version information and exit.")
44
45         if ok, code := cmd.ParseFlags(flags, os.Args[0], os.Args[1:], "program [args ...]", os.Stderr); !ok {
46                 os.Exit(code)
47         } else if *getVersion {
48                 fmt.Printf("crunchstat %s\n", version)
49                 return
50         } else if flags.NArg() == 0 {
51                 fmt.Fprintf(os.Stderr, "missing required argument: program (try -help)\n")
52                 os.Exit(2)
53         }
54
55         reporter.Logger.Printf("crunchstat %s started", version)
56
57         if reporter.CgroupRoot == "" {
58                 reporter.Logger.Fatal("error: must provide -cgroup-root")
59         } else if signalOnDeadPPID < 0 {
60                 reporter.Logger.Fatalf("-signal-on-dead-ppid=%d is invalid (use a positive signal number, or 0 to disable)", signalOnDeadPPID)
61         }
62         reporter.PollPeriod = time.Duration(*pollMsec) * time.Millisecond
63
64         reporter.Start()
65         err := runCommand(flags.Args(), reporter.Logger)
66         reporter.Stop()
67
68         if err, ok := err.(*exec.ExitError); ok {
69                 // The program has exited with an exit code != 0
70
71                 // This works on both Unix and Windows. Although
72                 // package syscall is generally platform dependent,
73                 // WaitStatus is defined for both Unix and Windows and
74                 // in both cases has an ExitStatus() method with the
75                 // same signature.
76                 if status, ok := err.Sys().(syscall.WaitStatus); ok {
77                         os.Exit(status.ExitStatus())
78                 } else {
79                         reporter.Logger.Fatalln("ExitError without WaitStatus:", err)
80                 }
81         } else if err != nil {
82                 reporter.Logger.Fatalln("error in cmd.Wait:", err)
83         }
84 }
85
86 func runCommand(argv []string, logger *log.Logger) error {
87         cmd := exec.Command(argv[0], argv[1:]...)
88
89         logger.Println("Running", argv)
90
91         // Child process will use our stdin and stdout pipes
92         // (we close our copies below)
93         cmd.Stdin = os.Stdin
94         cmd.Stdout = os.Stdout
95
96         // Forward SIGINT and SIGTERM to child process
97         sigChan := make(chan os.Signal, 1)
98         go func(sig <-chan os.Signal) {
99                 catch := <-sig
100                 if cmd.Process != nil {
101                         cmd.Process.Signal(catch)
102                 }
103                 logger.Println("notice: caught signal:", catch)
104         }(sigChan)
105         signal.Notify(sigChan, syscall.SIGTERM)
106         signal.Notify(sigChan, syscall.SIGINT)
107
108         // Kill our child proc if our parent process disappears
109         if signalOnDeadPPID != 0 {
110                 go sendSignalOnDeadPPID(ppidCheckInterval, signalOnDeadPPID, os.Getppid(), cmd, logger)
111         }
112
113         // Funnel stderr through our channel
114         stderrPipe, err := cmd.StderrPipe()
115         if err != nil {
116                 logger.Fatalln("error in StderrPipe:", err)
117         }
118
119         // Run subprocess
120         if err := cmd.Start(); err != nil {
121                 logger.Fatalln("error in cmd.Start:", err)
122         }
123
124         // Close stdin/stdout in this (parent) process
125         os.Stdin.Close()
126         os.Stdout.Close()
127
128         copyPipeToChildLog(stderrPipe, log.New(os.Stderr, "", 0))
129
130         return cmd.Wait()
131 }
132
133 func sendSignalOnDeadPPID(intvl time.Duration, signum, ppidOrig int, cmd *exec.Cmd, logger *log.Logger) {
134         ticker := time.NewTicker(intvl)
135         for range ticker.C {
136                 ppid := os.Getppid()
137                 if ppid == ppidOrig {
138                         continue
139                 }
140                 if cmd.Process == nil {
141                         // Child process isn't running yet
142                         continue
143                 }
144                 logger.Printf("notice: crunchstat ppid changed from %d to %d -- killing child pid %d with signal %d", ppidOrig, ppid, cmd.Process.Pid, signum)
145                 err := cmd.Process.Signal(syscall.Signal(signum))
146                 if err != nil {
147                         logger.Printf("error: sending signal: %s", err)
148                         continue
149                 }
150                 ticker.Stop()
151                 break
152         }
153 }
154
155 func copyPipeToChildLog(in io.ReadCloser, logger *log.Logger) {
156         reader := bufio.NewReaderSize(in, MaxLogLine)
157         var prefix string
158         for {
159                 line, isPrefix, err := reader.ReadLine()
160                 if err == io.EOF {
161                         break
162                 } else if err != nil {
163                         logger.Fatal("error reading child stderr:", err)
164                 }
165                 var suffix string
166                 if isPrefix {
167                         suffix = "[...]"
168                 }
169                 logger.Print(prefix, string(line), suffix)
170                 // Set up prefix for following line
171                 if isPrefix {
172                         prefix = "[...]"
173                 } else {
174                         prefix = ""
175                 }
176         }
177         in.Close()
178 }