18947: Move arvados-dispatch-slurm into arvados-server binary.
[arvados.git] / sdk / go / health / aggregator.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package health
6
import (
	"bufio"
	"bytes"
	"context"
	"crypto/subtle"
	"crypto/tls"
	"encoding/json"
	"errors"
	"flag"
	"fmt"
	"io"
	"net"
	"net/http"
	"net/url"
	"regexp"
	"strconv"
	"strings"
	"sync"
	"time"

	"git.arvados.org/arvados.git/lib/cmd"
	"git.arvados.org/arvados.git/lib/config"
	"git.arvados.org/arvados.git/sdk/go/arvados"
	"git.arvados.org/arvados.git/sdk/go/auth"
	"git.arvados.org/arvados.git/sdk/go/ctxlog"
	"github.com/ghodss/yaml"
	"github.com/sirupsen/logrus"
)
34
35 const defaultTimeout = arvados.Duration(2 * time.Second)
36
37 // Aggregator implements service.Handler. It handles "GET /_health/all"
38 // by checking the health of all configured services on the cluster
39 // and responding 200 if everything is healthy.
40 type Aggregator struct {
41         setupOnce  sync.Once
42         httpClient *http.Client
43         timeout    arvados.Duration
44
45         Cluster *arvados.Cluster
46
47         // If non-nil, Log is called after handling each request.
48         Log func(*http.Request, error)
49 }
50
51 func (agg *Aggregator) setup() {
52         agg.httpClient = &http.Client{
53                 Transport: &http.Transport{
54                         TLSClientConfig: &tls.Config{
55                                 InsecureSkipVerify: agg.Cluster.TLS.Insecure,
56                         },
57                 },
58         }
59         if agg.timeout == 0 {
60                 // this is always the case, except in the test suite
61                 agg.timeout = defaultTimeout
62         }
63 }
64
65 func (agg *Aggregator) CheckHealth() error {
66         return nil
67 }
68
69 func (agg *Aggregator) Done() <-chan struct{} {
70         return nil
71 }
72
73 func (agg *Aggregator) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
74         agg.setupOnce.Do(agg.setup)
75         sendErr := func(statusCode int, err error) {
76                 resp.WriteHeader(statusCode)
77                 json.NewEncoder(resp).Encode(map[string]string{"error": err.Error()})
78                 if agg.Log != nil {
79                         agg.Log(req, err)
80                 }
81         }
82
83         resp.Header().Set("Content-Type", "application/json")
84
85         if !agg.checkAuth(req) {
86                 sendErr(http.StatusUnauthorized, errUnauthorized)
87                 return
88         }
89         if req.URL.Path == "/_health/all" {
90                 json.NewEncoder(resp).Encode(agg.ClusterHealth())
91         } else if req.URL.Path == "/_health/ping" {
92                 resp.Write(healthyBody)
93         } else {
94                 sendErr(http.StatusNotFound, errNotFound)
95                 return
96         }
97         if agg.Log != nil {
98                 agg.Log(req, nil)
99         }
100 }
101
102 type ClusterHealthResponse struct {
103         // "OK" if all needed services are OK, otherwise "ERROR".
104         Health string `json:"health"`
105
106         // An entry for each known health check of each known instance
107         // of each needed component: "instance of service S on node N
108         // reports health-check C is OK."
109         Checks map[string]CheckResult `json:"checks"`
110
111         // An entry for each service type: "service S is OK." This
112         // exposes problems that can't be expressed in Checks, like
113         // "service S is needed, but isn't configured to run
114         // anywhere."
115         Services map[arvados.ServiceName]ServiceHealth `json:"services"`
116
117         Errors []string `json:"errors"`
118 }
119
120 type CheckResult struct {
121         Health         string                 `json:"health"`
122         Error          string                 `json:"error,omitempty"`
123         HTTPStatusCode int                    `json:",omitempty"`
124         HTTPStatusText string                 `json:",omitempty"`
125         Response       map[string]interface{} `json:"response"`
126         ResponseTime   json.Number            `json:"responseTime"`
127         Metrics        Metrics                `json:"-"`
128 }
129
130 type Metrics struct {
131         ConfigSourceTimestamp time.Time
132         ConfigSourceSHA256    string
133 }
134
135 type ServiceHealth struct {
136         Health string `json:"health"` // "OK", "ERROR", or "SKIP"
137         N      int    `json:"n"`
138 }
139
140 func (agg *Aggregator) ClusterHealth() ClusterHealthResponse {
141         agg.setupOnce.Do(agg.setup)
142         resp := ClusterHealthResponse{
143                 Health:   "OK",
144                 Checks:   make(map[string]CheckResult),
145                 Services: make(map[arvados.ServiceName]ServiceHealth),
146         }
147
148         mtx := sync.Mutex{}
149         wg := sync.WaitGroup{}
150         for svcName, svc := range agg.Cluster.Services.Map() {
151                 // Ensure svc is listed in resp.Services.
152                 mtx.Lock()
153                 if _, ok := resp.Services[svcName]; !ok {
154                         resp.Services[svcName] = ServiceHealth{Health: "MISSING"}
155                 }
156                 mtx.Unlock()
157
158                 checkURLs := map[arvados.URL]bool{}
159                 for addr := range svc.InternalURLs {
160                         checkURLs[addr] = true
161                 }
162                 if len(checkURLs) == 0 && svc.ExternalURL.Host != "" {
163                         checkURLs[svc.ExternalURL] = true
164                 }
165                 for addr := range checkURLs {
166                         wg.Add(1)
167                         go func(svcName arvados.ServiceName, addr arvados.URL) {
168                                 defer wg.Done()
169                                 var result CheckResult
170                                 pingURL, err := agg.pingURL(addr)
171                                 if err != nil {
172                                         result = CheckResult{
173                                                 Health: "ERROR",
174                                                 Error:  err.Error(),
175                                         }
176                                 } else {
177                                         result = agg.ping(pingURL)
178                                         if result.Health != "SKIP" {
179                                                 m, err := agg.metrics(pingURL)
180                                                 if err != nil && result.Error == "" {
181                                                         result.Error = "metrics: " + err.Error()
182                                                 }
183                                                 result.Metrics = m
184                                         }
185                                 }
186
187                                 mtx.Lock()
188                                 defer mtx.Unlock()
189                                 resp.Checks[fmt.Sprintf("%s+%s", svcName, pingURL)] = result
190                                 if result.Health == "OK" || result.Health == "SKIP" {
191                                         h := resp.Services[svcName]
192                                         h.N++
193                                         if result.Health == "OK" || h.N == 1 {
194                                                 // "" => "SKIP" or "OK"
195                                                 // "SKIP" => "OK"
196                                                 h.Health = result.Health
197                                         }
198                                         resp.Services[svcName] = h
199                                 } else {
200                                         resp.Health = "ERROR"
201                                         resp.Errors = append(resp.Errors, fmt.Sprintf("%s: %s: %s", svcName, result.Health, result.Error))
202                                 }
203                         }(svcName, addr)
204                 }
205         }
206         wg.Wait()
207
208         // Report ERROR if a needed service didn't fail any checks
209         // merely because it isn't configured to run anywhere.
210         for svcName, sh := range resp.Services {
211                 switch svcName {
212                 case arvados.ServiceNameDispatchCloud,
213                         arvados.ServiceNameDispatchLSF:
214                         // ok to not run any given dispatcher
215                 case arvados.ServiceNameHealth,
216                         arvados.ServiceNameWorkbench1,
217                         arvados.ServiceNameWorkbench2:
218                         // typically doesn't have InternalURLs in config
219                 default:
220                         if sh.Health != "OK" && sh.Health != "SKIP" {
221                                 resp.Health = "ERROR"
222                                 resp.Errors = append(resp.Errors, fmt.Sprintf("%s: %s: no InternalURLs configured", svcName, sh.Health))
223                                 continue
224                         }
225                 }
226         }
227
228         var newest Metrics
229         for _, result := range resp.Checks {
230                 if result.Metrics.ConfigSourceTimestamp.After(newest.ConfigSourceTimestamp) {
231                         newest = result.Metrics
232                 }
233         }
234         var mismatches []string
235         for target, result := range resp.Checks {
236                 if hash := result.Metrics.ConfigSourceSHA256; hash != "" && hash != newest.ConfigSourceSHA256 {
237                         mismatches = append(mismatches, target)
238                 }
239         }
240         for _, target := range mismatches {
241                 msg := fmt.Sprintf("outdated config: %s: config file (sha256 %s) does not match latest version with timestamp %s",
242                         strings.TrimSuffix(target, "/_health/ping"),
243                         resp.Checks[target].Metrics.ConfigSourceSHA256,
244                         newest.ConfigSourceTimestamp.Format(time.RFC3339))
245                 resp.Errors = append(resp.Errors, msg)
246                 resp.Health = "ERROR"
247         }
248         return resp
249 }
250
251 func (agg *Aggregator) pingURL(svcURL arvados.URL) (*url.URL, error) {
252         base := url.URL(svcURL)
253         return base.Parse("/_health/ping")
254 }
255
256 func (agg *Aggregator) ping(target *url.URL) (result CheckResult) {
257         t0 := time.Now()
258         defer func() {
259                 result.ResponseTime = json.Number(fmt.Sprintf("%.6f", time.Since(t0).Seconds()))
260         }()
261         result.Health = "ERROR"
262
263         ctx, cancel := context.WithTimeout(context.Background(), time.Duration(agg.timeout))
264         defer cancel()
265         req, err := http.NewRequestWithContext(ctx, "GET", target.String(), nil)
266         if err != nil {
267                 result.Error = err.Error()
268                 return
269         }
270         req.Header.Set("Authorization", "Bearer "+agg.Cluster.ManagementToken)
271
272         // Avoid workbench1's redirect-http-to-https feature
273         req.Header.Set("X-Forwarded-Proto", "https")
274
275         resp, err := agg.httpClient.Do(req)
276         if urlerr, ok := err.(*url.Error); ok {
277                 if neterr, ok := urlerr.Err.(*net.OpError); ok && isLocalHost(target.Hostname()) {
278                         result = CheckResult{
279                                 Health: "SKIP",
280                                 Error:  neterr.Error(),
281                         }
282                         err = nil
283                         return
284                 }
285         }
286         if err != nil {
287                 result.Error = err.Error()
288                 return
289         }
290         result.HTTPStatusCode = resp.StatusCode
291         result.HTTPStatusText = resp.Status
292         err = json.NewDecoder(resp.Body).Decode(&result.Response)
293         if err != nil {
294                 result.Error = fmt.Sprintf("cannot decode response: %s", err)
295         } else if resp.StatusCode != http.StatusOK {
296                 result.Error = fmt.Sprintf("HTTP %d %s", resp.StatusCode, resp.Status)
297         } else if h, _ := result.Response["health"].(string); h != "OK" {
298                 if e, ok := result.Response["error"].(string); ok && e != "" {
299                         result.Error = e
300                         return
301                 } else {
302                         result.Error = fmt.Sprintf("health=%q in ping response", h)
303                         return
304                 }
305         }
306         result.Health = "OK"
307         return
308 }
309
310 var reMetric = regexp.MustCompile(`([a-z_]+){sha256="([0-9a-f]+)"} (\d[\d\.e\+]+)`)
311
312 func (agg *Aggregator) metrics(pingURL *url.URL) (result Metrics, err error) {
313         metricsURL, err := pingURL.Parse("/metrics")
314         if err != nil {
315                 return
316         }
317         ctx, cancel := context.WithTimeout(context.Background(), time.Duration(agg.timeout))
318         defer cancel()
319         req, err := http.NewRequestWithContext(ctx, "GET", metricsURL.String(), nil)
320         if err != nil {
321                 return
322         }
323         req.Header.Set("Authorization", "Bearer "+agg.Cluster.ManagementToken)
324
325         // Avoid workbench1's redirect-http-to-https feature
326         req.Header.Set("X-Forwarded-Proto", "https")
327
328         resp, err := agg.httpClient.Do(req)
329         if err != nil {
330                 return
331         } else if resp.StatusCode != http.StatusOK {
332                 err = fmt.Errorf("%s: HTTP %d %s", metricsURL.String(), resp.StatusCode, resp.Status)
333                 return
334         }
335
336         scanner := bufio.NewScanner(resp.Body)
337         for scanner.Scan() {
338                 m := reMetric.FindSubmatch(scanner.Bytes())
339                 if len(m) != 4 || string(m[1]) != "arvados_config_source_timestamp_seconds" {
340                         continue
341                 }
342                 result.ConfigSourceSHA256 = string(m[2])
343                 unixtime, _ := strconv.ParseFloat(string(m[3]), 64)
344                 result.ConfigSourceTimestamp = time.UnixMicro(int64(unixtime * 1e6))
345         }
346         if err = scanner.Err(); err != nil {
347                 err = fmt.Errorf("error parsing response from %s: %w", metricsURL.String(), err)
348                 return
349         }
350         return
351 }
352
// isLocalHost reports whether host is an easily recognizable local
// address: "localhost" (any letter case), any loopback IP (127.x.x.x,
// ::1, or their IPv4-mapped forms), or 0.0.0.0. Other hostnames are
// not resolved, so they always yield false.
func isLocalHost(host string) bool {
	if strings.EqualFold(host, "localhost") {
		return true
	}
	addr := net.ParseIP(host) // nil if host is not an IP literal
	if addr.IsLoopback() {
		return true
	}
	return bytes.Equal(addr.To4(), []byte{0, 0, 0, 0})
}
359
360 func (agg *Aggregator) checkAuth(req *http.Request) bool {
361         creds := auth.CredentialsFromRequest(req)
362         for _, token := range creds.Tokens {
363                 if token != "" && token == agg.Cluster.ManagementToken {
364                         return true
365                 }
366         }
367         return false
368 }
369
370 var errSilent = errors.New("")
371
372 var CheckCommand cmd.Handler = checkCommand{}
373
374 type checkCommand struct{}
375
376 func (ccmd checkCommand) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
377         logger := ctxlog.New(stderr, "json", "info")
378         ctx := ctxlog.Context(context.Background(), logger)
379         err := ccmd.run(ctx, prog, args, stdin, stdout, stderr)
380         if err != nil {
381                 if err != errSilent {
382                         fmt.Fprintln(stdout, err.Error())
383                 }
384                 return 1
385         }
386         return 0
387 }
388
389 func (ccmd checkCommand) run(ctx context.Context, prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) error {
390         flags := flag.NewFlagSet("", flag.ContinueOnError)
391         flags.SetOutput(stderr)
392         loader := config.NewLoader(stdin, ctxlog.New(stderr, "text", "info"))
393         loader.SetupFlags(flags)
394         versionFlag := flags.Bool("version", false, "Write version information to stdout and exit 0")
395         timeout := flags.Duration("timeout", defaultTimeout.Duration(), "Maximum time to wait for health responses")
396         outputYAML := flags.Bool("yaml", false, "Output full health report in YAML format (default mode shows errors as plain text, is silent on success)")
397         if ok, _ := cmd.ParseFlags(flags, prog, args, "", stderr); !ok {
398                 // cmd.ParseFlags already reported the error
399                 return errSilent
400         } else if *versionFlag {
401                 cmd.Version.RunCommand(prog, args, stdin, stdout, stderr)
402                 return nil
403         }
404         cfg, err := loader.Load()
405         if err != nil {
406                 return err
407         }
408         cluster, err := cfg.GetCluster("")
409         if err != nil {
410                 return err
411         }
412         logger := ctxlog.New(stderr, cluster.SystemLogs.Format, cluster.SystemLogs.LogLevel).WithFields(logrus.Fields{
413                 "ClusterID": cluster.ClusterID,
414         })
415         ctx = ctxlog.Context(ctx, logger)
416         agg := Aggregator{Cluster: cluster, timeout: arvados.Duration(*timeout)}
417         resp := agg.ClusterHealth()
418         if *outputYAML {
419                 y, err := yaml.Marshal(resp)
420                 if err != nil {
421                         return err
422                 }
423                 stdout.Write(y)
424                 if resp.Health != "OK" {
425                         return errSilent
426                 }
427                 return nil
428         }
429         if resp.Health != "OK" {
430                 for _, msg := range resp.Errors {
431                         fmt.Fprintln(stdout, msg)
432                 }
433                 fmt.Fprintln(stderr, "health check failed")
434                 return errSilent
435         }
436         return nil
437 }