21225: Fix check for empty object
[arvados.git] / sdk / go / health / aggregator_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package health
6
7 import (
8         "bytes"
9         "crypto/sha256"
10         "encoding/json"
11         "fmt"
12         "io/ioutil"
13         "net/http"
14         "net/http/httptest"
15         "regexp"
16         "runtime"
17         "strings"
18         "time"
19
20         "git.arvados.org/arvados.git/lib/cmd"
21         "git.arvados.org/arvados.git/lib/config"
22         "git.arvados.org/arvados.git/sdk/go/arvados"
23         "git.arvados.org/arvados.git/sdk/go/arvadostest"
24         "git.arvados.org/arvados.git/sdk/go/ctxlog"
25         "github.com/ghodss/yaml"
26         "gopkg.in/check.v1"
27 )
28
29 type AggregatorSuite struct {
30         handler *Aggregator
31         req     *http.Request
32         resp    *httptest.ResponseRecorder
33 }
34
35 // Gocheck boilerplate
36 var _ = check.Suite(&AggregatorSuite{})
37
38 func (s *AggregatorSuite) TestInterface(c *check.C) {
39         var _ http.Handler = &Aggregator{}
40 }
41
42 func (s *AggregatorSuite) SetUpTest(c *check.C) {
43         ldr := config.NewLoader(bytes.NewBufferString(`Clusters: {zzzzz: {}}`), ctxlog.TestLogger(c))
44         ldr.Path = "-"
45         cfg, err := ldr.Load()
46         c.Assert(err, check.IsNil)
47         cluster, err := cfg.GetCluster("")
48         c.Assert(err, check.IsNil)
49         cluster.ManagementToken = arvadostest.ManagementToken
50         cluster.SystemRootToken = arvadostest.SystemRootToken
51         cluster.Collections.BlobSigningKey = arvadostest.BlobSigningKey
52         cluster.Volumes["z"] = arvados.Volume{StorageClasses: map[string]bool{"default": true}}
53         cluster.Containers.LocalKeepBlobBuffersPerVCPU = 0
54         s.handler = &Aggregator{Cluster: cluster}
55         s.req = httptest.NewRequest("GET", "/_health/all", nil)
56         s.req.Header.Set("Authorization", "Bearer "+arvadostest.ManagementToken)
57         s.resp = httptest.NewRecorder()
58 }
59
60 func (s *AggregatorSuite) TestNoAuth(c *check.C) {
61         s.req.Header.Del("Authorization")
62         s.handler.ServeHTTP(s.resp, s.req)
63         s.checkError(c)
64         c.Check(s.resp.Code, check.Equals, http.StatusUnauthorized)
65 }
66
67 func (s *AggregatorSuite) TestBadAuth(c *check.C) {
68         s.req.Header.Set("Authorization", "xyzzy")
69         s.handler.ServeHTTP(s.resp, s.req)
70         s.checkError(c)
71         c.Check(s.resp.Code, check.Equals, http.StatusUnauthorized)
72 }
73
74 func (s *AggregatorSuite) TestNoServicesConfigured(c *check.C) {
75         s.handler.ServeHTTP(s.resp, s.req)
76         s.checkUnhealthy(c)
77 }
78
79 func (s *AggregatorSuite) stubServer(handler http.Handler) (*httptest.Server, string) {
80         srv := httptest.NewServer(handler)
81         var port string
82         if parts := strings.Split(srv.URL, ":"); len(parts) < 3 {
83                 panic(srv.URL)
84         } else {
85                 port = parts[len(parts)-1]
86         }
87         return srv, ":" + port
88 }
89
90 func (s *AggregatorSuite) TestUnhealthy(c *check.C) {
91         srv, listen := s.stubServer(&unhealthyHandler{})
92         defer srv.Close()
93         arvadostest.SetServiceURL(&s.handler.Cluster.Services.Keepstore, "http://localhost"+listen+"/")
94         s.handler.ServeHTTP(s.resp, s.req)
95         s.checkUnhealthy(c)
96 }
97
98 func (s *AggregatorSuite) TestHealthy(c *check.C) {
99         srv, listen := s.stubServer(&healthyHandler{})
100         defer srv.Close()
101         s.setAllServiceURLs(listen)
102         s.handler.ServeHTTP(s.resp, s.req)
103         resp := s.checkOK(c)
104         svc := "keepstore+http://localhost" + listen + "/_health/ping"
105         c.Logf("%#v", resp)
106         ep := resp.Checks[svc]
107         c.Check(ep.Health, check.Equals, "OK")
108         c.Check(ep.HTTPStatusCode, check.Equals, 200)
109 }
110
111 func (s *AggregatorSuite) TestHealthyAndUnhealthy(c *check.C) {
112         srvH, listenH := s.stubServer(&healthyHandler{})
113         defer srvH.Close()
114         srvU, listenU := s.stubServer(&unhealthyHandler{})
115         defer srvU.Close()
116         s.setAllServiceURLs(listenH)
117         arvadostest.SetServiceURL(&s.handler.Cluster.Services.Keepstore, "http://localhost"+listenH+"/", "http://127.0.0.1"+listenU+"/")
118         s.handler.ServeHTTP(s.resp, s.req)
119         resp := s.checkUnhealthy(c)
120         ep := resp.Checks["keepstore+http://localhost"+listenH+"/_health/ping"]
121         c.Check(ep.Health, check.Equals, "OK")
122         c.Check(ep.HTTPStatusCode, check.Equals, 200)
123         ep = resp.Checks["keepstore+http://127.0.0.1"+listenU+"/_health/ping"]
124         c.Check(ep.Health, check.Equals, "ERROR")
125         c.Check(ep.HTTPStatusCode, check.Equals, 200)
126         c.Logf("%#v", ep)
127 }
128
129 // If an InternalURL host is 0.0.0.0, localhost, 127/8, or ::1 and
130 // nothing is listening there, don't fail the health check -- instead,
131 // assume the relevant component just isn't installed/enabled on this
132 // node, but does work when contacted through ExternalURL.
133 func (s *AggregatorSuite) TestUnreachableLoopbackPort(c *check.C) {
134         srvH, listenH := s.stubServer(&healthyHandler{})
135         defer srvH.Close()
136         s.setAllServiceURLs(listenH)
137         arvadostest.SetServiceURL(&s.handler.Cluster.Services.Keepproxy, "http://localhost:9/")
138         arvadostest.SetServiceURL(&s.handler.Cluster.Services.Workbench1, "http://0.0.0.0:9/")
139         arvadostest.SetServiceURL(&s.handler.Cluster.Services.Keepbalance, "http://127.0.0.127:9/")
140         arvadostest.SetServiceURL(&s.handler.Cluster.Services.WebDAV, "http://[::1]:9/")
141         s.handler.ServeHTTP(s.resp, s.req)
142         s.checkOK(c)
143
144         // If a non-loopback address is unreachable, that's still a
145         // fail.
146         s.resp = httptest.NewRecorder()
147         arvadostest.SetServiceURL(&s.handler.Cluster.Services.WebDAV, "http://172.31.255.254:9/")
148         s.handler.ServeHTTP(s.resp, s.req)
149         s.checkUnhealthy(c)
150 }
151
152 func (s *AggregatorSuite) TestIsLocalHost(c *check.C) {
153         c.Check(isLocalHost("Localhost"), check.Equals, true)
154         c.Check(isLocalHost("localhost"), check.Equals, true)
155         c.Check(isLocalHost("127.0.0.1"), check.Equals, true)
156         c.Check(isLocalHost("127.0.0.127"), check.Equals, true)
157         c.Check(isLocalHost("127.1.2.7"), check.Equals, true)
158         c.Check(isLocalHost("0.0.0.0"), check.Equals, true)
159         c.Check(isLocalHost("::1"), check.Equals, true)
160         c.Check(isLocalHost("1.2.3.4"), check.Equals, false)
161         c.Check(isLocalHost("1::1"), check.Equals, false)
162         c.Check(isLocalHost("example.com"), check.Equals, false)
163         c.Check(isLocalHost("127.0.0"), check.Equals, false)
164         c.Check(isLocalHost(""), check.Equals, false)
165 }
166
167 func (s *AggregatorSuite) TestConfigMismatch(c *check.C) {
168         // time1/hash1: current config
169         time1 := time.Now().Add(time.Second - time.Minute - time.Hour)
170         hash1 := fmt.Sprintf("%x", sha256.Sum256([]byte(`Clusters: {zzzzz: {SystemRootToken: xyzzy}}`)))
171         // time2/hash2: old config
172         time2 := time1.Add(-time.Hour)
173         hash2 := fmt.Sprintf("%x", sha256.Sum256([]byte(`Clusters: {zzzzz: {SystemRootToken: old-token}}`)))
174
175         // srv1: current file
176         handler1 := healthyHandler{configHash: hash1, configTime: time1}
177         srv1, listen1 := s.stubServer(&handler1)
178         defer srv1.Close()
179         // srv2: old file, current content
180         handler2 := healthyHandler{configHash: hash1, configTime: time2}
181         srv2, listen2 := s.stubServer(&handler2)
182         defer srv2.Close()
183         // srv3: old file, old content
184         handler3 := healthyHandler{configHash: hash2, configTime: time2}
185         srv3, listen3 := s.stubServer(&handler3)
186         defer srv3.Close()
187         // srv4: no metrics handler
188         handler4 := healthyHandler{}
189         srv4, listen4 := s.stubServer(&handler4)
190         defer srv4.Close()
191
192         s.setAllServiceURLs(listen1)
193
194         // listen2 => old timestamp, same content => no problem
195         s.resp = httptest.NewRecorder()
196         arvadostest.SetServiceURL(&s.handler.Cluster.Services.DispatchCloud,
197                 "http://localhost"+listen2+"/")
198         s.handler.ServeHTTP(s.resp, s.req)
199         resp := s.checkOK(c)
200
201         // listen4 => no metrics on some services => no problem
202         s.resp = httptest.NewRecorder()
203         arvadostest.SetServiceURL(&s.handler.Cluster.Services.WebDAV,
204                 "http://localhost"+listen4+"/")
205         s.handler.ServeHTTP(s.resp, s.req)
206         resp = s.checkOK(c)
207
208         // listen3 => old timestamp, old content => report discrepancy
209         s.resp = httptest.NewRecorder()
210         arvadostest.SetServiceURL(&s.handler.Cluster.Services.Keepstore,
211                 "http://localhost"+listen1+"/",
212                 "http://localhost"+listen3+"/")
213         s.handler.ServeHTTP(s.resp, s.req)
214         resp = s.checkUnhealthy(c)
215         if c.Check(len(resp.Errors) > 0, check.Equals, true) {
216                 c.Check(resp.Errors[0], check.Matches, `outdated config: \Qkeepstore+http://localhost`+listen3+`\E: config file \(sha256 .*\) does not match latest version with timestamp .*`)
217         }
218
219         // no services report config time (migrating to current version) => no problem
220         s.resp = httptest.NewRecorder()
221         s.setAllServiceURLs(listen4)
222         s.handler.ServeHTTP(s.resp, s.req)
223         s.checkOK(c)
224 }
225
226 func (s *AggregatorSuite) TestClockSkew(c *check.C) {
227         // srv1: report real wall clock time
228         handler1 := healthyHandler{}
229         srv1, listen1 := s.stubServer(&handler1)
230         defer srv1.Close()
231         // srv2: report near-future time
232         handler2 := healthyHandler{headerDate: time.Now().Add(3 * time.Second)}
233         srv2, listen2 := s.stubServer(&handler2)
234         defer srv2.Close()
235         // srv3: report far-future time
236         handler3 := healthyHandler{headerDate: time.Now().Add(3*time.Minute + 3*time.Second)}
237         srv3, listen3 := s.stubServer(&handler3)
238         defer srv3.Close()
239
240         s.setAllServiceURLs(listen1)
241
242         // near-future time => OK
243         s.resp = httptest.NewRecorder()
244         arvadostest.SetServiceURL(&s.handler.Cluster.Services.DispatchCloud,
245                 "http://localhost"+listen2+"/")
246         s.handler.ServeHTTP(s.resp, s.req)
247         s.checkOK(c)
248
249         // far-future time => error
250         s.resp = httptest.NewRecorder()
251         arvadostest.SetServiceURL(&s.handler.Cluster.Services.WebDAV,
252                 "http://localhost"+listen3+"/")
253         s.handler.ServeHTTP(s.resp, s.req)
254         resp := s.checkUnhealthy(c)
255         if c.Check(len(resp.Errors) > 0, check.Equals, true) {
256                 c.Check(resp.Errors[0], check.Matches, `clock skew detected: maximum timestamp spread is 3m.* \(exceeds warning threshold of 1m\)`)
257         }
258 }
259
260 func (s *AggregatorSuite) TestVersionSkew(c *check.C) {
261         // srv1: report same version
262         handler1 := healthyHandler{version: cmd.Version.String()}
263         srv1, listen1 := s.stubServer(&handler1)
264         defer srv1.Close()
265         // srv2: report same version but without " (go1.2.3)" part
266         handler2 := healthyHandler{version: strings.Fields(cmd.Version.String())[0]}
267         srv2, listen2 := s.stubServer(&handler2)
268         defer srv2.Close()
269         // srv3: report different version
270         handler3 := healthyHandler{version: "1.2.3~4 (" + runtime.Version() + ")"}
271         srv3, listen3 := s.stubServer(&handler3)
272         defer srv3.Close()
273
274         s.setAllServiceURLs(listen1)
275
276         // same version but without go1.2.3 part => OK
277         s.resp = httptest.NewRecorder()
278         arvadostest.SetServiceURL(&s.handler.Cluster.Services.RailsAPI,
279                 "http://localhost"+listen2+"/")
280         s.handler.ServeHTTP(s.resp, s.req)
281         s.checkOK(c)
282
283         // different version => error
284         s.resp = httptest.NewRecorder()
285         arvadostest.SetServiceURL(&s.handler.Cluster.Services.WebDAV,
286                 "http://localhost"+listen3+"/")
287         s.handler.ServeHTTP(s.resp, s.req)
288         resp := s.checkUnhealthy(c)
289         if c.Check(len(resp.Errors) > 0, check.Equals, true) {
290                 c.Check(resp.Errors[0], check.Matches, `version mismatch: \Qkeep-web+http://localhost`+listen3+`\E is running 1.2.3~4 (.*) -- expected \Q`+cmd.Version.String()+`\E`)
291         }
292 }
293
294 func (s *AggregatorSuite) TestPingTimeout(c *check.C) {
295         s.handler.timeout = arvados.Duration(100 * time.Millisecond)
296         srv, listen := s.stubServer(&slowHandler{})
297         defer srv.Close()
298         arvadostest.SetServiceURL(&s.handler.Cluster.Services.Keepstore, "http://localhost"+listen+"/")
299         s.handler.ServeHTTP(s.resp, s.req)
300         resp := s.checkUnhealthy(c)
301         ep := resp.Checks["keepstore+http://localhost"+listen+"/_health/ping"]
302         c.Check(ep.Health, check.Equals, "ERROR")
303         c.Check(ep.HTTPStatusCode, check.Equals, 0)
304         rt, err := ep.ResponseTime.Float64()
305         c.Check(err, check.IsNil)
306         c.Check(rt > 0.005, check.Equals, true)
307 }
308
309 func (s *AggregatorSuite) TestCheckCommand(c *check.C) {
310         srv, listen := s.stubServer(&healthyHandler{})
311         defer srv.Close()
312         s.setAllServiceURLs(listen)
313         tmpdir := c.MkDir()
314         confdata, err := yaml.Marshal(arvados.Config{Clusters: map[string]arvados.Cluster{s.handler.Cluster.ClusterID: *s.handler.Cluster}})
315         c.Assert(err, check.IsNil)
316         confdata = regexp.MustCompile(`Source(Timestamp|SHA256): [^\n]+\n`).ReplaceAll(confdata, []byte{})
317         err = ioutil.WriteFile(tmpdir+"/config.yml", confdata, 0777)
318         c.Assert(err, check.IsNil)
319
320         var stdout, stderr bytes.Buffer
321
322         exitcode := CheckCommand.RunCommand("check", []string{"-config=" + tmpdir + "/config.yml"}, &bytes.Buffer{}, &stdout, &stderr)
323         c.Check(exitcode, check.Equals, 0)
324         c.Check(stderr.String(), check.Equals, "health check OK\n")
325         c.Check(stdout.String(), check.Equals, "")
326
327         stdout.Reset()
328         stderr.Reset()
329         exitcode = CheckCommand.RunCommand("check", []string{"-quiet", "-config=" + tmpdir + "/config.yml"}, &bytes.Buffer{}, &stdout, &stderr)
330         c.Check(exitcode, check.Equals, 0)
331         c.Check(stderr.String(), check.Equals, "")
332         c.Check(stdout.String(), check.Equals, "")
333
334         stdout.Reset()
335         stderr.Reset()
336         exitcode = CheckCommand.RunCommand("check", []string{"-config=" + tmpdir + "/config.yml", "-yaml"}, &bytes.Buffer{}, &stdout, &stderr)
337         c.Check(exitcode, check.Equals, 0)
338         c.Check(stderr.String(), check.Equals, "")
339         c.Check(stdout.String(), check.Matches, `(?ms).*(\n|^)Health: OK\n.*`)
340 }
341
342 func (s *AggregatorSuite) checkError(c *check.C) {
343         c.Check(s.resp.Code, check.Not(check.Equals), http.StatusOK)
344         var resp ClusterHealthResponse
345         err := json.Unmarshal(s.resp.Body.Bytes(), &resp)
346         c.Check(err, check.IsNil)
347         c.Check(resp.Health, check.Not(check.Equals), "OK")
348 }
349
350 func (s *AggregatorSuite) checkUnhealthy(c *check.C) ClusterHealthResponse {
351         return s.checkResult(c, "ERROR")
352 }
353
354 func (s *AggregatorSuite) checkOK(c *check.C) ClusterHealthResponse {
355         return s.checkResult(c, "OK")
356 }
357
358 func (s *AggregatorSuite) checkResult(c *check.C, health string) ClusterHealthResponse {
359         c.Check(s.resp.Code, check.Equals, http.StatusOK)
360         var resp ClusterHealthResponse
361         c.Log(s.resp.Body.String())
362         err := json.Unmarshal(s.resp.Body.Bytes(), &resp)
363         c.Check(err, check.IsNil)
364         c.Check(resp.Health, check.Equals, health)
365         return resp
366 }
367
368 func (s *AggregatorSuite) setAllServiceURLs(listen string) {
369         svcs := &s.handler.Cluster.Services
370         for _, svc := range []*arvados.Service{
371                 &svcs.Controller,
372                 &svcs.DispatchCloud,
373                 &svcs.DispatchLSF,
374                 &svcs.DispatchSLURM,
375                 &svcs.Keepbalance,
376                 &svcs.Keepproxy,
377                 &svcs.Keepstore,
378                 &svcs.Health,
379                 &svcs.RailsAPI,
380                 &svcs.WebDAV,
381                 &svcs.Websocket,
382                 &svcs.Workbench1,
383                 &svcs.Workbench2,
384         } {
385                 arvadostest.SetServiceURL(svc, "http://localhost"+listen+"/")
386         }
387 }
388
389 type unhealthyHandler struct{}
390
391 func (*unhealthyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
392         if req.URL.Path == "/_health/ping" {
393                 resp.Write([]byte(`{"health":"ERROR","error":"the bends"}`))
394         } else {
395                 http.Error(resp, "not found", http.StatusNotFound)
396         }
397 }
398
399 type healthyHandler struct {
400         version    string
401         configHash string
402         configTime time.Time
403         headerDate time.Time
404 }
405
406 func (h *healthyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
407         if !h.headerDate.IsZero() {
408                 resp.Header().Set("Date", h.headerDate.Format(time.RFC1123))
409         }
410         authOK := req.Header.Get("Authorization") == "Bearer "+arvadostest.ManagementToken
411         if req.URL.Path == "/_health/ping" {
412                 if !authOK {
413                         http.Error(resp, "unauthorized", http.StatusUnauthorized)
414                         return
415                 }
416                 resp.Write([]byte(`{"health":"OK"}`))
417         } else if req.URL.Path == "/metrics" {
418                 if !authOK {
419                         http.Error(resp, "unauthorized", http.StatusUnauthorized)
420                         return
421                 }
422                 t := h.configTime
423                 if t.IsZero() {
424                         t = time.Now()
425                 }
426                 fmt.Fprintf(resp, `# HELP arvados_config_load_timestamp_seconds Time when config file was loaded.
427 # TYPE arvados_config_load_timestamp_seconds gauge
428 arvados_config_load_timestamp_seconds{sha256="%s"} %g
429 # HELP arvados_config_source_timestamp_seconds Timestamp of config file when it was loaded.
430 # TYPE arvados_config_source_timestamp_seconds gauge
431 arvados_config_source_timestamp_seconds{sha256="%s"} %g
432 # HELP arvados_version_running Indicated version is running.
433 # TYPE arvados_version_running gauge
434 arvados_version_running{version="%s"} 1
435 `,
436                         h.configHash, float64(time.Now().UnixNano())/1e9,
437                         h.configHash, float64(t.UnixNano())/1e9,
438                         h.version)
439         } else {
440                 http.Error(resp, "not found", http.StatusNotFound)
441         }
442 }
443
444 type slowHandler struct{}
445
446 func (*slowHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
447         if req.URL.Path == "/_health/ping" {
448                 time.Sleep(3 * time.Second)
449                 resp.Write([]byte(`{"health":"OK"}`))
450         } else {
451                 http.Error(resp, "not found", http.StatusNotFound)
452         }
453 }