18794: Fix "check" command test.
[arvados.git] / sdk / go / health / aggregator_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package health
6
7 import (
8         "bytes"
9         "crypto/sha256"
10         "encoding/json"
11         "fmt"
12         "io/ioutil"
13         "net/http"
14         "net/http/httptest"
15         "regexp"
16         "strings"
17         "time"
18
19         "git.arvados.org/arvados.git/lib/config"
20         "git.arvados.org/arvados.git/sdk/go/arvados"
21         "git.arvados.org/arvados.git/sdk/go/arvadostest"
22         "git.arvados.org/arvados.git/sdk/go/ctxlog"
23         "github.com/ghodss/yaml"
24         "gopkg.in/check.v1"
25 )
26
27 type AggregatorSuite struct {
28         handler *Aggregator
29         req     *http.Request
30         resp    *httptest.ResponseRecorder
31 }
32
33 // Gocheck boilerplate
34 var _ = check.Suite(&AggregatorSuite{})
35
36 func (s *AggregatorSuite) TestInterface(c *check.C) {
37         var _ http.Handler = &Aggregator{}
38 }
39
40 func (s *AggregatorSuite) SetUpTest(c *check.C) {
41         ldr := config.NewLoader(bytes.NewBufferString(`Clusters: {zzzzz: {}}`), ctxlog.TestLogger(c))
42         ldr.Path = "-"
43         cfg, err := ldr.Load()
44         c.Assert(err, check.IsNil)
45         cluster, err := cfg.GetCluster("")
46         c.Assert(err, check.IsNil)
47         cluster.ManagementToken = arvadostest.ManagementToken
48         cluster.SystemRootToken = arvadostest.SystemRootToken
49         cluster.Collections.BlobSigningKey = arvadostest.BlobSigningKey
50         cluster.Volumes["z"] = arvados.Volume{StorageClasses: map[string]bool{"default": true}}
51         s.handler = &Aggregator{Cluster: cluster}
52         s.req = httptest.NewRequest("GET", "/_health/all", nil)
53         s.req.Header.Set("Authorization", "Bearer "+arvadostest.ManagementToken)
54         s.resp = httptest.NewRecorder()
55 }
56
57 func (s *AggregatorSuite) TestNoAuth(c *check.C) {
58         s.req.Header.Del("Authorization")
59         s.handler.ServeHTTP(s.resp, s.req)
60         s.checkError(c)
61         c.Check(s.resp.Code, check.Equals, http.StatusUnauthorized)
62 }
63
64 func (s *AggregatorSuite) TestBadAuth(c *check.C) {
65         s.req.Header.Set("Authorization", "xyzzy")
66         s.handler.ServeHTTP(s.resp, s.req)
67         s.checkError(c)
68         c.Check(s.resp.Code, check.Equals, http.StatusUnauthorized)
69 }
70
71 func (s *AggregatorSuite) TestNoServicesConfigured(c *check.C) {
72         s.handler.ServeHTTP(s.resp, s.req)
73         s.checkUnhealthy(c)
74 }
75
76 func (s *AggregatorSuite) stubServer(handler http.Handler) (*httptest.Server, string) {
77         srv := httptest.NewServer(handler)
78         var port string
79         if parts := strings.Split(srv.URL, ":"); len(parts) < 3 {
80                 panic(srv.URL)
81         } else {
82                 port = parts[len(parts)-1]
83         }
84         return srv, ":" + port
85 }
86
87 func (s *AggregatorSuite) TestUnhealthy(c *check.C) {
88         srv, listen := s.stubServer(&unhealthyHandler{})
89         defer srv.Close()
90         arvadostest.SetServiceURL(&s.handler.Cluster.Services.Keepstore, "http://localhost"+listen+"/")
91         s.handler.ServeHTTP(s.resp, s.req)
92         s.checkUnhealthy(c)
93 }
94
95 func (s *AggregatorSuite) TestHealthy(c *check.C) {
96         srv, listen := s.stubServer(&healthyHandler{})
97         defer srv.Close()
98         s.setAllServiceURLs(listen)
99         s.handler.ServeHTTP(s.resp, s.req)
100         resp := s.checkOK(c)
101         svc := "keepstore+http://localhost" + listen + "/_health/ping"
102         c.Logf("%#v", resp)
103         ep := resp.Checks[svc]
104         c.Check(ep.Health, check.Equals, "OK")
105         c.Check(ep.HTTPStatusCode, check.Equals, 200)
106 }
107
108 func (s *AggregatorSuite) TestHealthyAndUnhealthy(c *check.C) {
109         srvH, listenH := s.stubServer(&healthyHandler{})
110         defer srvH.Close()
111         srvU, listenU := s.stubServer(&unhealthyHandler{})
112         defer srvU.Close()
113         s.setAllServiceURLs(listenH)
114         arvadostest.SetServiceURL(&s.handler.Cluster.Services.Keepstore, "http://localhost"+listenH+"/", "http://127.0.0.1"+listenU+"/")
115         s.handler.ServeHTTP(s.resp, s.req)
116         resp := s.checkUnhealthy(c)
117         ep := resp.Checks["keepstore+http://localhost"+listenH+"/_health/ping"]
118         c.Check(ep.Health, check.Equals, "OK")
119         c.Check(ep.HTTPStatusCode, check.Equals, 200)
120         ep = resp.Checks["keepstore+http://127.0.0.1"+listenU+"/_health/ping"]
121         c.Check(ep.Health, check.Equals, "ERROR")
122         c.Check(ep.HTTPStatusCode, check.Equals, 200)
123         c.Logf("%#v", ep)
124 }
125
126 // If an InternalURL host is 0.0.0.0, localhost, 127/8, or ::1 and
127 // nothing is listening there, don't fail the health check -- instead,
128 // assume the relevant component just isn't installed/enabled on this
129 // node, but does work when contacted through ExternalURL.
130 func (s *AggregatorSuite) TestUnreachableLoopbackPort(c *check.C) {
131         srvH, listenH := s.stubServer(&healthyHandler{})
132         defer srvH.Close()
133         s.setAllServiceURLs(listenH)
134         arvadostest.SetServiceURL(&s.handler.Cluster.Services.Keepproxy, "http://localhost:9/")
135         arvadostest.SetServiceURL(&s.handler.Cluster.Services.Workbench1, "http://0.0.0.0:9/")
136         arvadostest.SetServiceURL(&s.handler.Cluster.Services.Keepbalance, "http://127.0.0.127:9/")
137         arvadostest.SetServiceURL(&s.handler.Cluster.Services.WebDAV, "http://[::1]:9/")
138         s.handler.ServeHTTP(s.resp, s.req)
139         s.checkOK(c)
140
141         // If a non-loopback address is unreachable, that's still a
142         // fail.
143         s.resp = httptest.NewRecorder()
144         arvadostest.SetServiceURL(&s.handler.Cluster.Services.WebDAV, "http://172.31.255.254:9/")
145         s.handler.ServeHTTP(s.resp, s.req)
146         s.checkUnhealthy(c)
147 }
148
149 func (s *AggregatorSuite) TestIsLocalHost(c *check.C) {
150         c.Check(isLocalHost("Localhost"), check.Equals, true)
151         c.Check(isLocalHost("localhost"), check.Equals, true)
152         c.Check(isLocalHost("127.0.0.1"), check.Equals, true)
153         c.Check(isLocalHost("127.0.0.127"), check.Equals, true)
154         c.Check(isLocalHost("127.1.2.7"), check.Equals, true)
155         c.Check(isLocalHost("0.0.0.0"), check.Equals, true)
156         c.Check(isLocalHost("::1"), check.Equals, true)
157         c.Check(isLocalHost("1.2.3.4"), check.Equals, false)
158         c.Check(isLocalHost("1::1"), check.Equals, false)
159         c.Check(isLocalHost("example.com"), check.Equals, false)
160         c.Check(isLocalHost("127.0.0"), check.Equals, false)
161         c.Check(isLocalHost(""), check.Equals, false)
162 }
163
164 func (s *AggregatorSuite) TestConfigMismatch(c *check.C) {
165         // time1/hash1: current config
166         time1 := time.Now().Add(time.Second - time.Minute - time.Hour)
167         hash1 := fmt.Sprintf("%x", sha256.Sum256([]byte(`Clusters: {zzzzz: {SystemRootToken: xyzzy}}`)))
168         // time2/hash2: old config
169         time2 := time1.Add(-time.Hour)
170         hash2 := fmt.Sprintf("%x", sha256.Sum256([]byte(`Clusters: {zzzzz: {SystemRootToken: old-token}}`)))
171
172         // srv1: current file
173         handler1 := healthyHandler{configHash: hash1, configTime: time1}
174         srv1, listen1 := s.stubServer(&handler1)
175         defer srv1.Close()
176         // srv2: old file, current content
177         handler2 := healthyHandler{configHash: hash1, configTime: time2}
178         srv2, listen2 := s.stubServer(&handler2)
179         defer srv2.Close()
180         // srv3: old file, old content
181         handler3 := healthyHandler{configHash: hash2, configTime: time2}
182         srv3, listen3 := s.stubServer(&handler3)
183         defer srv3.Close()
184         // srv4: no metrics handler
185         handler4 := healthyHandler{}
186         srv4, listen4 := s.stubServer(&handler4)
187         defer srv4.Close()
188
189         s.setAllServiceURLs(listen1)
190
191         // listen2 => old timestamp, same content => no problem
192         s.resp = httptest.NewRecorder()
193         arvadostest.SetServiceURL(&s.handler.Cluster.Services.DispatchCloud,
194                 "http://localhost"+listen2+"/")
195         s.handler.ServeHTTP(s.resp, s.req)
196         resp := s.checkOK(c)
197
198         // listen4 => no metrics on some services => no problem
199         s.resp = httptest.NewRecorder()
200         arvadostest.SetServiceURL(&s.handler.Cluster.Services.WebDAV,
201                 "http://localhost"+listen4+"/")
202         s.handler.ServeHTTP(s.resp, s.req)
203         resp = s.checkOK(c)
204
205         // listen3 => old timestamp, old content => report discrepancy
206         s.resp = httptest.NewRecorder()
207         arvadostest.SetServiceURL(&s.handler.Cluster.Services.Keepstore,
208                 "http://localhost"+listen1+"/",
209                 "http://localhost"+listen3+"/")
210         s.handler.ServeHTTP(s.resp, s.req)
211         resp = s.checkUnhealthy(c)
212         if c.Check(len(resp.Errors) > 0, check.Equals, true) {
213                 c.Check(resp.Errors[0], check.Matches, `outdated config: \Qkeepstore+http://localhost`+listen3+`\E: config file \(sha256 .*\) does not match latest version with timestamp .*`)
214         }
215
216         // no services report config time (migrating to current version) => no problem
217         s.resp = httptest.NewRecorder()
218         s.setAllServiceURLs(listen4)
219         s.handler.ServeHTTP(s.resp, s.req)
220         s.checkOK(c)
221 }
222
223 func (s *AggregatorSuite) TestPingTimeout(c *check.C) {
224         s.handler.timeout = arvados.Duration(100 * time.Millisecond)
225         srv, listen := s.stubServer(&slowHandler{})
226         defer srv.Close()
227         arvadostest.SetServiceURL(&s.handler.Cluster.Services.Keepstore, "http://localhost"+listen+"/")
228         s.handler.ServeHTTP(s.resp, s.req)
229         resp := s.checkUnhealthy(c)
230         ep := resp.Checks["keepstore+http://localhost"+listen+"/_health/ping"]
231         c.Check(ep.Health, check.Equals, "ERROR")
232         c.Check(ep.HTTPStatusCode, check.Equals, 0)
233         rt, err := ep.ResponseTime.Float64()
234         c.Check(err, check.IsNil)
235         c.Check(rt > 0.005, check.Equals, true)
236 }
237
238 func (s *AggregatorSuite) TestCheckCommand(c *check.C) {
239         srv, listen := s.stubServer(&healthyHandler{})
240         defer srv.Close()
241         s.setAllServiceURLs(listen)
242         tmpdir := c.MkDir()
243         confdata, err := yaml.Marshal(arvados.Config{Clusters: map[string]arvados.Cluster{s.handler.Cluster.ClusterID: *s.handler.Cluster}})
244         c.Assert(err, check.IsNil)
245         confdata = regexp.MustCompile(`Source(Timestamp|SHA256): [^\n]+\n`).ReplaceAll(confdata, []byte{})
246         err = ioutil.WriteFile(tmpdir+"/config.yml", confdata, 0777)
247         c.Assert(err, check.IsNil)
248
249         var stdout, stderr bytes.Buffer
250
251         exitcode := CheckCommand.RunCommand("check", []string{"-config=" + tmpdir + "/config.yml"}, &bytes.Buffer{}, &stdout, &stderr)
252         c.Check(exitcode, check.Equals, 0)
253         c.Check(stderr.String(), check.Equals, "")
254         c.Check(stdout.String(), check.Equals, "")
255
256         stdout.Reset()
257         stderr.Reset()
258         exitcode = CheckCommand.RunCommand("check", []string{"-config=" + tmpdir + "/config.yml", "-yaml"}, &bytes.Buffer{}, &stdout, &stderr)
259         c.Check(exitcode, check.Equals, 0)
260         c.Check(stderr.String(), check.Equals, "")
261         c.Check(stdout.String(), check.Matches, `(?ms).*(\n|^)health: OK\n.*`)
262 }
263
264 func (s *AggregatorSuite) checkError(c *check.C) {
265         c.Check(s.resp.Code, check.Not(check.Equals), http.StatusOK)
266         var resp ClusterHealthResponse
267         err := json.Unmarshal(s.resp.Body.Bytes(), &resp)
268         c.Check(err, check.IsNil)
269         c.Check(resp.Health, check.Not(check.Equals), "OK")
270 }
271
272 func (s *AggregatorSuite) checkUnhealthy(c *check.C) ClusterHealthResponse {
273         return s.checkResult(c, "ERROR")
274 }
275
276 func (s *AggregatorSuite) checkOK(c *check.C) ClusterHealthResponse {
277         return s.checkResult(c, "OK")
278 }
279
280 func (s *AggregatorSuite) checkResult(c *check.C, health string) ClusterHealthResponse {
281         c.Check(s.resp.Code, check.Equals, http.StatusOK)
282         var resp ClusterHealthResponse
283         c.Log(s.resp.Body.String())
284         err := json.Unmarshal(s.resp.Body.Bytes(), &resp)
285         c.Check(err, check.IsNil)
286         c.Check(resp.Health, check.Equals, health)
287         return resp
288 }
289
290 func (s *AggregatorSuite) setAllServiceURLs(listen string) {
291         svcs := &s.handler.Cluster.Services
292         for _, svc := range []*arvados.Service{
293                 &svcs.Controller,
294                 &svcs.DispatchCloud,
295                 &svcs.DispatchLSF,
296                 &svcs.Keepbalance,
297                 &svcs.Keepproxy,
298                 &svcs.Keepstore,
299                 &svcs.Health,
300                 &svcs.RailsAPI,
301                 &svcs.WebDAV,
302                 &svcs.Websocket,
303                 &svcs.Workbench1,
304                 &svcs.Workbench2,
305         } {
306                 arvadostest.SetServiceURL(svc, "http://localhost"+listen+"/")
307         }
308 }
309
// unhealthyHandler is a stub service that always reports itself as
// unhealthy.
type unhealthyHandler struct{}

// ServeHTTP answers /_health/ping with a JSON body whose health is
// "ERROR" (with the default 200 status), and any other path with
// 404.
func (*unhealthyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	if req.URL.Path != "/_health/ping" {
		http.Error(resp, "not found", http.StatusNotFound)
		return
	}
	resp.Write([]byte(`{"health":"ERROR","error":"the bends"}`))
}
319
320 type healthyHandler struct {
321         configHash string
322         configTime time.Time
323 }
324
325 func (h *healthyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
326         authOK := req.Header.Get("Authorization") == "Bearer "+arvadostest.ManagementToken
327         if req.URL.Path == "/_health/ping" {
328                 if !authOK {
329                         http.Error(resp, "unauthorized", http.StatusUnauthorized)
330                         return
331                 }
332                 resp.Write([]byte(`{"health":"OK"}`))
333         } else if req.URL.Path == "/metrics" {
334                 if !authOK {
335                         http.Error(resp, "unauthorized", http.StatusUnauthorized)
336                         return
337                 }
338                 t := h.configTime
339                 if t.IsZero() {
340                         t = time.Now()
341                 }
342                 fmt.Fprintf(resp, `# HELP arvados_config_load_timestamp_seconds Time when config file was loaded.
343 # TYPE arvados_config_load_timestamp_seconds gauge
344 arvados_config_load_timestamp_seconds{sha256="%s"} %g
345 # HELP arvados_config_source_timestamp_seconds Timestamp of config file when it was loaded.
346 # TYPE arvados_config_source_timestamp_seconds gauge
347 arvados_config_source_timestamp_seconds{sha256="%s"} %g
348 `,
349                         h.configHash, float64(time.Now().UnixNano())/1e9,
350                         h.configHash, float64(t.UnixNano())/1e9)
351         } else {
352                 http.Error(resp, "not found", http.StatusNotFound)
353         }
354 }
355
// slowHandler is a stub service whose health ping takes 3 seconds to
// answer.
type slowHandler struct{}

// ServeHTTP answers /_health/ping with {"health":"OK"} after a 3
// second delay, and any other path with 404.
//
// If the request's context is done before the delay elapses (for
// example because the client gave up and closed the connection), it
// returns right away without writing a response. Otherwise the
// handler would keep running for the full delay and hold up
// httptest.Server.Close, which waits for outstanding requests.
func (*slowHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	if req.URL.Path != "/_health/ping" {
		http.Error(resp, "not found", http.StatusNotFound)
		return
	}
	delay := time.NewTimer(3 * time.Second)
	defer delay.Stop()
	select {
	case <-delay.C:
		resp.Write([]byte(`{"health":"OK"}`))
	case <-req.Context().Done():
		// Nobody is waiting for this response any more.
	}
}