Merge branch '18983-warn-unused-local-keep'
[arvados.git] / sdk / go / health / aggregator_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package health
6
7 import (
8         "bytes"
9         "crypto/sha256"
10         "encoding/json"
11         "fmt"
12         "io/ioutil"
13         "net/http"
14         "net/http/httptest"
15         "regexp"
16         "strings"
17         "time"
18
19         "git.arvados.org/arvados.git/lib/config"
20         "git.arvados.org/arvados.git/sdk/go/arvados"
21         "git.arvados.org/arvados.git/sdk/go/arvadostest"
22         "git.arvados.org/arvados.git/sdk/go/ctxlog"
23         "github.com/ghodss/yaml"
24         "gopkg.in/check.v1"
25 )
26
27 type AggregatorSuite struct {
28         handler *Aggregator
29         req     *http.Request
30         resp    *httptest.ResponseRecorder
31 }
32
33 // Gocheck boilerplate
34 var _ = check.Suite(&AggregatorSuite{})
35
36 func (s *AggregatorSuite) TestInterface(c *check.C) {
37         var _ http.Handler = &Aggregator{}
38 }
39
40 func (s *AggregatorSuite) SetUpTest(c *check.C) {
41         ldr := config.NewLoader(bytes.NewBufferString(`Clusters: {zzzzz: {}}`), ctxlog.TestLogger(c))
42         ldr.Path = "-"
43         cfg, err := ldr.Load()
44         c.Assert(err, check.IsNil)
45         cluster, err := cfg.GetCluster("")
46         c.Assert(err, check.IsNil)
47         cluster.ManagementToken = arvadostest.ManagementToken
48         cluster.SystemRootToken = arvadostest.SystemRootToken
49         cluster.Collections.BlobSigningKey = arvadostest.BlobSigningKey
50         cluster.Volumes["z"] = arvados.Volume{StorageClasses: map[string]bool{"default": true}}
51         cluster.Containers.LocalKeepBlobBuffersPerVCPU = 0
52         s.handler = &Aggregator{Cluster: cluster}
53         s.req = httptest.NewRequest("GET", "/_health/all", nil)
54         s.req.Header.Set("Authorization", "Bearer "+arvadostest.ManagementToken)
55         s.resp = httptest.NewRecorder()
56 }
57
58 func (s *AggregatorSuite) TestNoAuth(c *check.C) {
59         s.req.Header.Del("Authorization")
60         s.handler.ServeHTTP(s.resp, s.req)
61         s.checkError(c)
62         c.Check(s.resp.Code, check.Equals, http.StatusUnauthorized)
63 }
64
65 func (s *AggregatorSuite) TestBadAuth(c *check.C) {
66         s.req.Header.Set("Authorization", "xyzzy")
67         s.handler.ServeHTTP(s.resp, s.req)
68         s.checkError(c)
69         c.Check(s.resp.Code, check.Equals, http.StatusUnauthorized)
70 }
71
72 func (s *AggregatorSuite) TestNoServicesConfigured(c *check.C) {
73         s.handler.ServeHTTP(s.resp, s.req)
74         s.checkUnhealthy(c)
75 }
76
77 func (s *AggregatorSuite) stubServer(handler http.Handler) (*httptest.Server, string) {
78         srv := httptest.NewServer(handler)
79         var port string
80         if parts := strings.Split(srv.URL, ":"); len(parts) < 3 {
81                 panic(srv.URL)
82         } else {
83                 port = parts[len(parts)-1]
84         }
85         return srv, ":" + port
86 }
87
88 func (s *AggregatorSuite) TestUnhealthy(c *check.C) {
89         srv, listen := s.stubServer(&unhealthyHandler{})
90         defer srv.Close()
91         arvadostest.SetServiceURL(&s.handler.Cluster.Services.Keepstore, "http://localhost"+listen+"/")
92         s.handler.ServeHTTP(s.resp, s.req)
93         s.checkUnhealthy(c)
94 }
95
96 func (s *AggregatorSuite) TestHealthy(c *check.C) {
97         srv, listen := s.stubServer(&healthyHandler{})
98         defer srv.Close()
99         s.setAllServiceURLs(listen)
100         s.handler.ServeHTTP(s.resp, s.req)
101         resp := s.checkOK(c)
102         svc := "keepstore+http://localhost" + listen + "/_health/ping"
103         c.Logf("%#v", resp)
104         ep := resp.Checks[svc]
105         c.Check(ep.Health, check.Equals, "OK")
106         c.Check(ep.HTTPStatusCode, check.Equals, 200)
107 }
108
109 func (s *AggregatorSuite) TestHealthyAndUnhealthy(c *check.C) {
110         srvH, listenH := s.stubServer(&healthyHandler{})
111         defer srvH.Close()
112         srvU, listenU := s.stubServer(&unhealthyHandler{})
113         defer srvU.Close()
114         s.setAllServiceURLs(listenH)
115         arvadostest.SetServiceURL(&s.handler.Cluster.Services.Keepstore, "http://localhost"+listenH+"/", "http://127.0.0.1"+listenU+"/")
116         s.handler.ServeHTTP(s.resp, s.req)
117         resp := s.checkUnhealthy(c)
118         ep := resp.Checks["keepstore+http://localhost"+listenH+"/_health/ping"]
119         c.Check(ep.Health, check.Equals, "OK")
120         c.Check(ep.HTTPStatusCode, check.Equals, 200)
121         ep = resp.Checks["keepstore+http://127.0.0.1"+listenU+"/_health/ping"]
122         c.Check(ep.Health, check.Equals, "ERROR")
123         c.Check(ep.HTTPStatusCode, check.Equals, 200)
124         c.Logf("%#v", ep)
125 }
126
127 // If an InternalURL host is 0.0.0.0, localhost, 127/8, or ::1 and
128 // nothing is listening there, don't fail the health check -- instead,
129 // assume the relevant component just isn't installed/enabled on this
130 // node, but does work when contacted through ExternalURL.
131 func (s *AggregatorSuite) TestUnreachableLoopbackPort(c *check.C) {
132         srvH, listenH := s.stubServer(&healthyHandler{})
133         defer srvH.Close()
134         s.setAllServiceURLs(listenH)
135         arvadostest.SetServiceURL(&s.handler.Cluster.Services.Keepproxy, "http://localhost:9/")
136         arvadostest.SetServiceURL(&s.handler.Cluster.Services.Workbench1, "http://0.0.0.0:9/")
137         arvadostest.SetServiceURL(&s.handler.Cluster.Services.Keepbalance, "http://127.0.0.127:9/")
138         arvadostest.SetServiceURL(&s.handler.Cluster.Services.WebDAV, "http://[::1]:9/")
139         s.handler.ServeHTTP(s.resp, s.req)
140         s.checkOK(c)
141
142         // If a non-loopback address is unreachable, that's still a
143         // fail.
144         s.resp = httptest.NewRecorder()
145         arvadostest.SetServiceURL(&s.handler.Cluster.Services.WebDAV, "http://172.31.255.254:9/")
146         s.handler.ServeHTTP(s.resp, s.req)
147         s.checkUnhealthy(c)
148 }
149
150 func (s *AggregatorSuite) TestIsLocalHost(c *check.C) {
151         c.Check(isLocalHost("Localhost"), check.Equals, true)
152         c.Check(isLocalHost("localhost"), check.Equals, true)
153         c.Check(isLocalHost("127.0.0.1"), check.Equals, true)
154         c.Check(isLocalHost("127.0.0.127"), check.Equals, true)
155         c.Check(isLocalHost("127.1.2.7"), check.Equals, true)
156         c.Check(isLocalHost("0.0.0.0"), check.Equals, true)
157         c.Check(isLocalHost("::1"), check.Equals, true)
158         c.Check(isLocalHost("1.2.3.4"), check.Equals, false)
159         c.Check(isLocalHost("1::1"), check.Equals, false)
160         c.Check(isLocalHost("example.com"), check.Equals, false)
161         c.Check(isLocalHost("127.0.0"), check.Equals, false)
162         c.Check(isLocalHost(""), check.Equals, false)
163 }
164
165 func (s *AggregatorSuite) TestConfigMismatch(c *check.C) {
166         // time1/hash1: current config
167         time1 := time.Now().Add(time.Second - time.Minute - time.Hour)
168         hash1 := fmt.Sprintf("%x", sha256.Sum256([]byte(`Clusters: {zzzzz: {SystemRootToken: xyzzy}}`)))
169         // time2/hash2: old config
170         time2 := time1.Add(-time.Hour)
171         hash2 := fmt.Sprintf("%x", sha256.Sum256([]byte(`Clusters: {zzzzz: {SystemRootToken: old-token}}`)))
172
173         // srv1: current file
174         handler1 := healthyHandler{configHash: hash1, configTime: time1}
175         srv1, listen1 := s.stubServer(&handler1)
176         defer srv1.Close()
177         // srv2: old file, current content
178         handler2 := healthyHandler{configHash: hash1, configTime: time2}
179         srv2, listen2 := s.stubServer(&handler2)
180         defer srv2.Close()
181         // srv3: old file, old content
182         handler3 := healthyHandler{configHash: hash2, configTime: time2}
183         srv3, listen3 := s.stubServer(&handler3)
184         defer srv3.Close()
185         // srv4: no metrics handler
186         handler4 := healthyHandler{}
187         srv4, listen4 := s.stubServer(&handler4)
188         defer srv4.Close()
189
190         s.setAllServiceURLs(listen1)
191
192         // listen2 => old timestamp, same content => no problem
193         s.resp = httptest.NewRecorder()
194         arvadostest.SetServiceURL(&s.handler.Cluster.Services.DispatchCloud,
195                 "http://localhost"+listen2+"/")
196         s.handler.ServeHTTP(s.resp, s.req)
197         resp := s.checkOK(c)
198
199         // listen4 => no metrics on some services => no problem
200         s.resp = httptest.NewRecorder()
201         arvadostest.SetServiceURL(&s.handler.Cluster.Services.WebDAV,
202                 "http://localhost"+listen4+"/")
203         s.handler.ServeHTTP(s.resp, s.req)
204         resp = s.checkOK(c)
205
206         // listen3 => old timestamp, old content => report discrepancy
207         s.resp = httptest.NewRecorder()
208         arvadostest.SetServiceURL(&s.handler.Cluster.Services.Keepstore,
209                 "http://localhost"+listen1+"/",
210                 "http://localhost"+listen3+"/")
211         s.handler.ServeHTTP(s.resp, s.req)
212         resp = s.checkUnhealthy(c)
213         if c.Check(len(resp.Errors) > 0, check.Equals, true) {
214                 c.Check(resp.Errors[0], check.Matches, `outdated config: \Qkeepstore+http://localhost`+listen3+`\E: config file \(sha256 .*\) does not match latest version with timestamp .*`)
215         }
216
217         // no services report config time (migrating to current version) => no problem
218         s.resp = httptest.NewRecorder()
219         s.setAllServiceURLs(listen4)
220         s.handler.ServeHTTP(s.resp, s.req)
221         s.checkOK(c)
222 }
223
224 func (s *AggregatorSuite) TestClockSkew(c *check.C) {
225         // srv1: report real wall clock time
226         handler1 := healthyHandler{}
227         srv1, listen1 := s.stubServer(&handler1)
228         defer srv1.Close()
229         // srv2: report near-future time
230         handler2 := healthyHandler{headerDate: time.Now().Add(3 * time.Second)}
231         srv2, listen2 := s.stubServer(&handler2)
232         defer srv2.Close()
233         // srv3: report far-future time
234         handler3 := healthyHandler{headerDate: time.Now().Add(3*time.Minute + 3*time.Second)}
235         srv3, listen3 := s.stubServer(&handler3)
236         defer srv3.Close()
237
238         s.setAllServiceURLs(listen1)
239
240         // near-future time => OK
241         s.resp = httptest.NewRecorder()
242         arvadostest.SetServiceURL(&s.handler.Cluster.Services.DispatchCloud,
243                 "http://localhost"+listen2+"/")
244         s.handler.ServeHTTP(s.resp, s.req)
245         s.checkOK(c)
246
247         // far-future time => error
248         s.resp = httptest.NewRecorder()
249         arvadostest.SetServiceURL(&s.handler.Cluster.Services.WebDAV,
250                 "http://localhost"+listen3+"/")
251         s.handler.ServeHTTP(s.resp, s.req)
252         resp := s.checkUnhealthy(c)
253         if c.Check(len(resp.Errors) > 0, check.Equals, true) {
254                 c.Check(resp.Errors[0], check.Matches, `clock skew detected: maximum timestamp spread is 3m.* \(exceeds warning threshold of 1m\)`)
255         }
256 }
257
258 func (s *AggregatorSuite) TestPingTimeout(c *check.C) {
259         s.handler.timeout = arvados.Duration(100 * time.Millisecond)
260         srv, listen := s.stubServer(&slowHandler{})
261         defer srv.Close()
262         arvadostest.SetServiceURL(&s.handler.Cluster.Services.Keepstore, "http://localhost"+listen+"/")
263         s.handler.ServeHTTP(s.resp, s.req)
264         resp := s.checkUnhealthy(c)
265         ep := resp.Checks["keepstore+http://localhost"+listen+"/_health/ping"]
266         c.Check(ep.Health, check.Equals, "ERROR")
267         c.Check(ep.HTTPStatusCode, check.Equals, 0)
268         rt, err := ep.ResponseTime.Float64()
269         c.Check(err, check.IsNil)
270         c.Check(rt > 0.005, check.Equals, true)
271 }
272
273 func (s *AggregatorSuite) TestCheckCommand(c *check.C) {
274         srv, listen := s.stubServer(&healthyHandler{})
275         defer srv.Close()
276         s.setAllServiceURLs(listen)
277         tmpdir := c.MkDir()
278         confdata, err := yaml.Marshal(arvados.Config{Clusters: map[string]arvados.Cluster{s.handler.Cluster.ClusterID: *s.handler.Cluster}})
279         c.Assert(err, check.IsNil)
280         confdata = regexp.MustCompile(`Source(Timestamp|SHA256): [^\n]+\n`).ReplaceAll(confdata, []byte{})
281         err = ioutil.WriteFile(tmpdir+"/config.yml", confdata, 0777)
282         c.Assert(err, check.IsNil)
283
284         var stdout, stderr bytes.Buffer
285
286         exitcode := CheckCommand.RunCommand("check", []string{"-config=" + tmpdir + "/config.yml"}, &bytes.Buffer{}, &stdout, &stderr)
287         c.Check(exitcode, check.Equals, 0)
288         c.Check(stderr.String(), check.Equals, "")
289         c.Check(stdout.String(), check.Equals, "")
290
291         stdout.Reset()
292         stderr.Reset()
293         exitcode = CheckCommand.RunCommand("check", []string{"-config=" + tmpdir + "/config.yml", "-yaml"}, &bytes.Buffer{}, &stdout, &stderr)
294         c.Check(exitcode, check.Equals, 0)
295         c.Check(stderr.String(), check.Equals, "")
296         c.Check(stdout.String(), check.Matches, `(?ms).*(\n|^)health: OK\n.*`)
297 }
298
299 func (s *AggregatorSuite) checkError(c *check.C) {
300         c.Check(s.resp.Code, check.Not(check.Equals), http.StatusOK)
301         var resp ClusterHealthResponse
302         err := json.Unmarshal(s.resp.Body.Bytes(), &resp)
303         c.Check(err, check.IsNil)
304         c.Check(resp.Health, check.Not(check.Equals), "OK")
305 }
306
307 func (s *AggregatorSuite) checkUnhealthy(c *check.C) ClusterHealthResponse {
308         return s.checkResult(c, "ERROR")
309 }
310
311 func (s *AggregatorSuite) checkOK(c *check.C) ClusterHealthResponse {
312         return s.checkResult(c, "OK")
313 }
314
315 func (s *AggregatorSuite) checkResult(c *check.C, health string) ClusterHealthResponse {
316         c.Check(s.resp.Code, check.Equals, http.StatusOK)
317         var resp ClusterHealthResponse
318         c.Log(s.resp.Body.String())
319         err := json.Unmarshal(s.resp.Body.Bytes(), &resp)
320         c.Check(err, check.IsNil)
321         c.Check(resp.Health, check.Equals, health)
322         return resp
323 }
324
325 func (s *AggregatorSuite) setAllServiceURLs(listen string) {
326         svcs := &s.handler.Cluster.Services
327         for _, svc := range []*arvados.Service{
328                 &svcs.Controller,
329                 &svcs.DispatchCloud,
330                 &svcs.DispatchLSF,
331                 &svcs.DispatchSLURM,
332                 &svcs.GitHTTP,
333                 &svcs.Keepbalance,
334                 &svcs.Keepproxy,
335                 &svcs.Keepstore,
336                 &svcs.Health,
337                 &svcs.RailsAPI,
338                 &svcs.WebDAV,
339                 &svcs.Websocket,
340                 &svcs.Workbench1,
341                 &svcs.Workbench2,
342         } {
343                 arvadostest.SetServiceURL(svc, "http://localhost"+listen+"/")
344         }
345 }
346
// unhealthyHandler is a stub service whose health ping always
// reports an error.
type unhealthyHandler struct{}

// ServeHTTP answers /_health/ping with a JSON body reporting health
// "ERROR" (using the default 200 status) and every other path with
// 404.
func (*unhealthyHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	if r.URL.Path != "/_health/ping" {
		http.Error(w, "not found", http.StatusNotFound)
		return
	}
	w.Write([]byte(`{"health":"ERROR","error":"the bends"}`))
}
356
357 type healthyHandler struct {
358         configHash string
359         configTime time.Time
360         headerDate time.Time
361 }
362
363 func (h *healthyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
364         if !h.headerDate.IsZero() {
365                 resp.Header().Set("Date", h.headerDate.Format(time.RFC1123))
366         }
367         authOK := req.Header.Get("Authorization") == "Bearer "+arvadostest.ManagementToken
368         if req.URL.Path == "/_health/ping" {
369                 if !authOK {
370                         http.Error(resp, "unauthorized", http.StatusUnauthorized)
371                         return
372                 }
373                 resp.Write([]byte(`{"health":"OK"}`))
374         } else if req.URL.Path == "/metrics" {
375                 if !authOK {
376                         http.Error(resp, "unauthorized", http.StatusUnauthorized)
377                         return
378                 }
379                 t := h.configTime
380                 if t.IsZero() {
381                         t = time.Now()
382                 }
383                 fmt.Fprintf(resp, `# HELP arvados_config_load_timestamp_seconds Time when config file was loaded.
384 # TYPE arvados_config_load_timestamp_seconds gauge
385 arvados_config_load_timestamp_seconds{sha256="%s"} %g
386 # HELP arvados_config_source_timestamp_seconds Timestamp of config file when it was loaded.
387 # TYPE arvados_config_source_timestamp_seconds gauge
388 arvados_config_source_timestamp_seconds{sha256="%s"} %g
389 `,
390                         h.configHash, float64(time.Now().UnixNano())/1e9,
391                         h.configHash, float64(t.UnixNano())/1e9)
392         } else {
393                 http.Error(resp, "not found", http.StatusNotFound)
394         }
395 }
396
// slowHandler is a stub service whose health ping takes 3 seconds to
// answer, for exercising client-side timeouts.
type slowHandler struct{}

// ServeHTTP answers /_health/ping with {"health":"OK"} after a 3
// second delay, and every other path with an immediate 404.
//
// If the request's context is cancelled during the delay (for
// example because the client timed out and hung up), the handler
// returns right away without writing a response. Otherwise closing
// the stub server would have to wait out the remainder of the delay.
func (*slowHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	if req.URL.Path != "/_health/ping" {
		http.Error(resp, "not found", http.StatusNotFound)
		return
	}
	delay := time.NewTimer(3 * time.Second)
	defer delay.Stop()
	select {
	case <-delay.C:
		resp.Write([]byte(`{"health":"OK"}`))
	case <-req.Context().Done():
		// Nobody is waiting for the answer any more.
	}
}
406 }