21891: Use bytes.Buffer to reduce string copies.
[arvados.git] / sdk / go / health / aggregator_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package health
6
7 import (
8         "bytes"
9         "crypto/sha256"
10         "encoding/json"
11         "fmt"
12         "io/ioutil"
13         "net/http"
14         "net/http/httptest"
15         "regexp"
16         "runtime"
17         "strings"
18         "time"
19
20         "git.arvados.org/arvados.git/lib/cmd"
21         "git.arvados.org/arvados.git/lib/config"
22         "git.arvados.org/arvados.git/sdk/go/arvados"
23         "git.arvados.org/arvados.git/sdk/go/arvadostest"
24         "git.arvados.org/arvados.git/sdk/go/ctxlog"
25         "github.com/ghodss/yaml"
26         "gopkg.in/check.v1"
27 )
28
29 type AggregatorSuite struct {
30         handler *Aggregator
31         req     *http.Request
32         resp    *httptest.ResponseRecorder
33 }
34
35 // Gocheck boilerplate
36 var _ = check.Suite(&AggregatorSuite{})
37
38 func (s *AggregatorSuite) TestInterface(c *check.C) {
39         var _ http.Handler = &Aggregator{}
40 }
41
42 func (s *AggregatorSuite) SetUpTest(c *check.C) {
43         ldr := config.NewLoader(bytes.NewBufferString(`Clusters: {zzzzz: {}}`), ctxlog.TestLogger(c))
44         ldr.Path = "-"
45         cfg, err := ldr.Load()
46         c.Assert(err, check.IsNil)
47         cluster, err := cfg.GetCluster("")
48         c.Assert(err, check.IsNil)
49         cluster.ManagementToken = arvadostest.ManagementToken
50         cluster.SystemRootToken = arvadostest.SystemRootToken
51         cluster.Collections.BlobSigningKey = arvadostest.BlobSigningKey
52         cluster.Volumes["z"] = arvados.Volume{StorageClasses: map[string]bool{"default": true}}
53         cluster.Containers.LocalKeepBlobBuffersPerVCPU = 0
54         s.handler = &Aggregator{Cluster: cluster}
55         s.req = httptest.NewRequest("GET", "/_health/all", nil)
56         s.req.Header.Set("Authorization", "Bearer "+arvadostest.ManagementToken)
57         s.resp = httptest.NewRecorder()
58 }
59
60 func (s *AggregatorSuite) TestSameVersion(c *check.C) {
61         c.Check(sameVersion("2.8.0~dev20240610194320 (go1.21.10)", "2.8.1~dev20240610194320"), check.Equals, false)
62         c.Check(sameVersion("2.8.0~dev20240610194320 (go1.21.10)", "2.8.1~dev20240610194320 (go1.21.10)"), check.Equals, false)
63         c.Check(sameVersion("2.8.0~dev20240610194320 (go1.21.10)", "2.8.1~dev20240610194320 (go1.21.9)"), check.Equals, false)
64         c.Check(sameVersion("2.8.0~dev20240610194320 (go1.21.10)", "2.8.0~dev20240610194320 (go1.21.9)"), check.Equals, true)
65         c.Check(sameVersion("2.8.0~dev20240610194320 (go1.21.10)", "2.8.0~dev20240611211146 (go1.21.10)"), check.Equals, true)
66         c.Check(sameVersion("2.8.0~dev20240610194320 (go1.21.10)", "2.8.0~dev20240611211146"), check.Equals, true)
67         c.Check(sameVersion("2.8.0~dev20240610194320 (go1.21.10)", "2.8.0"), check.Equals, false)
68         c.Check(sameVersion("2.8.0~dev20240610194320", "2.8.0"), check.Equals, false)
69         c.Check(sameVersion("2.8.0", "2.8.0"), check.Equals, true)
70         c.Check(sameVersion("2.8.0", "2.8.1"), check.Equals, false)
71 }
72
73 func (s *AggregatorSuite) TestNoAuth(c *check.C) {
74         s.req.Header.Del("Authorization")
75         s.handler.ServeHTTP(s.resp, s.req)
76         s.checkError(c)
77         c.Check(s.resp.Code, check.Equals, http.StatusUnauthorized)
78 }
79
80 func (s *AggregatorSuite) TestBadAuth(c *check.C) {
81         s.req.Header.Set("Authorization", "xyzzy")
82         s.handler.ServeHTTP(s.resp, s.req)
83         s.checkError(c)
84         c.Check(s.resp.Code, check.Equals, http.StatusUnauthorized)
85 }
86
87 func (s *AggregatorSuite) TestNoServicesConfigured(c *check.C) {
88         s.handler.ServeHTTP(s.resp, s.req)
89         s.checkUnhealthy(c)
90 }
91
92 func (s *AggregatorSuite) stubServer(handler http.Handler) (*httptest.Server, string) {
93         srv := httptest.NewServer(handler)
94         var port string
95         if parts := strings.Split(srv.URL, ":"); len(parts) < 3 {
96                 panic(srv.URL)
97         } else {
98                 port = parts[len(parts)-1]
99         }
100         return srv, ":" + port
101 }
102
103 func (s *AggregatorSuite) TestUnhealthy(c *check.C) {
104         srv, listen := s.stubServer(&unhealthyHandler{})
105         defer srv.Close()
106         arvadostest.SetServiceURL(&s.handler.Cluster.Services.Keepstore, "http://localhost"+listen+"/")
107         s.handler.ServeHTTP(s.resp, s.req)
108         s.checkUnhealthy(c)
109 }
110
111 func (s *AggregatorSuite) TestHealthy(c *check.C) {
112         srv, listen := s.stubServer(&healthyHandler{})
113         defer srv.Close()
114         s.setAllServiceURLs(listen)
115         s.handler.ServeHTTP(s.resp, s.req)
116         resp := s.checkOK(c)
117         svc := "keepstore+http://localhost" + listen + "/_health/ping"
118         c.Logf("%#v", resp)
119         ep := resp.Checks[svc]
120         c.Check(ep.Health, check.Equals, "OK")
121         c.Check(ep.HTTPStatusCode, check.Equals, 200)
122 }
123
124 func (s *AggregatorSuite) TestHealthyAndUnhealthy(c *check.C) {
125         srvH, listenH := s.stubServer(&healthyHandler{})
126         defer srvH.Close()
127         srvU, listenU := s.stubServer(&unhealthyHandler{})
128         defer srvU.Close()
129         s.setAllServiceURLs(listenH)
130         arvadostest.SetServiceURL(&s.handler.Cluster.Services.Keepstore, "http://localhost"+listenH+"/", "http://127.0.0.1"+listenU+"/")
131         s.handler.ServeHTTP(s.resp, s.req)
132         resp := s.checkUnhealthy(c)
133         ep := resp.Checks["keepstore+http://localhost"+listenH+"/_health/ping"]
134         c.Check(ep.Health, check.Equals, "OK")
135         c.Check(ep.HTTPStatusCode, check.Equals, 200)
136         ep = resp.Checks["keepstore+http://127.0.0.1"+listenU+"/_health/ping"]
137         c.Check(ep.Health, check.Equals, "ERROR")
138         c.Check(ep.HTTPStatusCode, check.Equals, 200)
139         c.Logf("%#v", ep)
140 }
141
142 // If an InternalURL host is 0.0.0.0, localhost, 127/8, or ::1 and
143 // nothing is listening there, don't fail the health check -- instead,
144 // assume the relevant component just isn't installed/enabled on this
145 // node, but does work when contacted through ExternalURL.
146 func (s *AggregatorSuite) TestUnreachableLoopbackPort(c *check.C) {
147         srvH, listenH := s.stubServer(&healthyHandler{})
148         defer srvH.Close()
149         s.setAllServiceURLs(listenH)
150         arvadostest.SetServiceURL(&s.handler.Cluster.Services.Keepproxy, "http://localhost:9/")
151         arvadostest.SetServiceURL(&s.handler.Cluster.Services.Workbench1, "http://0.0.0.0:9/")
152         arvadostest.SetServiceURL(&s.handler.Cluster.Services.Keepbalance, "http://127.0.0.127:9/")
153         arvadostest.SetServiceURL(&s.handler.Cluster.Services.WebDAV, "http://[::1]:9/")
154         s.handler.ServeHTTP(s.resp, s.req)
155         s.checkOK(c)
156
157         // If a non-loopback address is unreachable, that's still a
158         // fail.
159         s.resp = httptest.NewRecorder()
160         arvadostest.SetServiceURL(&s.handler.Cluster.Services.WebDAV, "http://172.31.255.254:9/")
161         s.handler.ServeHTTP(s.resp, s.req)
162         s.checkUnhealthy(c)
163 }
164
165 func (s *AggregatorSuite) TestIsLocalHost(c *check.C) {
166         c.Check(isLocalHost("Localhost"), check.Equals, true)
167         c.Check(isLocalHost("localhost"), check.Equals, true)
168         c.Check(isLocalHost("127.0.0.1"), check.Equals, true)
169         c.Check(isLocalHost("127.0.0.127"), check.Equals, true)
170         c.Check(isLocalHost("127.1.2.7"), check.Equals, true)
171         c.Check(isLocalHost("0.0.0.0"), check.Equals, true)
172         c.Check(isLocalHost("::1"), check.Equals, true)
173         c.Check(isLocalHost("1.2.3.4"), check.Equals, false)
174         c.Check(isLocalHost("1::1"), check.Equals, false)
175         c.Check(isLocalHost("example.com"), check.Equals, false)
176         c.Check(isLocalHost("127.0.0"), check.Equals, false)
177         c.Check(isLocalHost(""), check.Equals, false)
178 }
179
180 func (s *AggregatorSuite) TestConfigMismatch(c *check.C) {
181         // time1/hash1: current config
182         time1 := time.Now().Add(time.Second - time.Minute - time.Hour)
183         hash1 := fmt.Sprintf("%x", sha256.Sum256([]byte(`Clusters: {zzzzz: {SystemRootToken: xyzzy}}`)))
184         // time2/hash2: old config
185         time2 := time1.Add(-time.Hour)
186         hash2 := fmt.Sprintf("%x", sha256.Sum256([]byte(`Clusters: {zzzzz: {SystemRootToken: old-token}}`)))
187
188         // srv1: current file
189         handler1 := healthyHandler{configHash: hash1, configTime: time1}
190         srv1, listen1 := s.stubServer(&handler1)
191         defer srv1.Close()
192         // srv2: old file, current content
193         handler2 := healthyHandler{configHash: hash1, configTime: time2}
194         srv2, listen2 := s.stubServer(&handler2)
195         defer srv2.Close()
196         // srv3: old file, old content
197         handler3 := healthyHandler{configHash: hash2, configTime: time2}
198         srv3, listen3 := s.stubServer(&handler3)
199         defer srv3.Close()
200         // srv4: no metrics handler
201         handler4 := healthyHandler{}
202         srv4, listen4 := s.stubServer(&handler4)
203         defer srv4.Close()
204
205         s.setAllServiceURLs(listen1)
206
207         // listen2 => old timestamp, same content => no problem
208         s.resp = httptest.NewRecorder()
209         arvadostest.SetServiceURL(&s.handler.Cluster.Services.DispatchCloud,
210                 "http://localhost"+listen2+"/")
211         s.handler.ServeHTTP(s.resp, s.req)
212         resp := s.checkOK(c)
213
214         // listen4 => no metrics on some services => no problem
215         s.resp = httptest.NewRecorder()
216         arvadostest.SetServiceURL(&s.handler.Cluster.Services.WebDAV,
217                 "http://localhost"+listen4+"/")
218         s.handler.ServeHTTP(s.resp, s.req)
219         resp = s.checkOK(c)
220
221         // listen3 => old timestamp, old content => report discrepancy
222         s.resp = httptest.NewRecorder()
223         arvadostest.SetServiceURL(&s.handler.Cluster.Services.Keepstore,
224                 "http://localhost"+listen1+"/",
225                 "http://localhost"+listen3+"/")
226         s.handler.ServeHTTP(s.resp, s.req)
227         resp = s.checkUnhealthy(c)
228         if c.Check(len(resp.Errors) > 0, check.Equals, true) {
229                 c.Check(resp.Errors[0], check.Matches, `outdated config: \Qkeepstore+http://localhost`+listen3+`\E: config file \(sha256 .*\) does not match latest version with timestamp .*`)
230         }
231
232         // no services report config time (migrating to current version) => no problem
233         s.resp = httptest.NewRecorder()
234         s.setAllServiceURLs(listen4)
235         s.handler.ServeHTTP(s.resp, s.req)
236         s.checkOK(c)
237 }
238
239 func (s *AggregatorSuite) TestClockSkew(c *check.C) {
240         // srv1: report real wall clock time
241         handler1 := healthyHandler{}
242         srv1, listen1 := s.stubServer(&handler1)
243         defer srv1.Close()
244         // srv2: report near-future time
245         handler2 := healthyHandler{headerDate: time.Now().Add(3 * time.Second)}
246         srv2, listen2 := s.stubServer(&handler2)
247         defer srv2.Close()
248         // srv3: report far-future time
249         handler3 := healthyHandler{headerDate: time.Now().Add(3*time.Minute + 3*time.Second)}
250         srv3, listen3 := s.stubServer(&handler3)
251         defer srv3.Close()
252
253         s.setAllServiceURLs(listen1)
254
255         // near-future time => OK
256         s.resp = httptest.NewRecorder()
257         arvadostest.SetServiceURL(&s.handler.Cluster.Services.DispatchCloud,
258                 "http://localhost"+listen2+"/")
259         s.handler.ServeHTTP(s.resp, s.req)
260         s.checkOK(c)
261
262         // far-future time => error
263         s.resp = httptest.NewRecorder()
264         arvadostest.SetServiceURL(&s.handler.Cluster.Services.WebDAV,
265                 "http://localhost"+listen3+"/")
266         s.handler.ServeHTTP(s.resp, s.req)
267         resp := s.checkUnhealthy(c)
268         if c.Check(len(resp.Errors) > 0, check.Equals, true) {
269                 c.Check(resp.Errors[0], check.Matches, `clock skew detected: maximum timestamp spread is 3m.* \(exceeds warning threshold of 1m\)`)
270         }
271 }
272
273 func (s *AggregatorSuite) TestVersionSkew(c *check.C) {
274         // srv1: report same version
275         handler1 := healthyHandler{version: cmd.Version.String()}
276         srv1, listen1 := s.stubServer(&handler1)
277         defer srv1.Close()
278         // srv2: report same version but without " (go1.2.3)" part
279         handler2 := healthyHandler{version: strings.Fields(cmd.Version.String())[0]}
280         srv2, listen2 := s.stubServer(&handler2)
281         defer srv2.Close()
282         // srv3: report different version
283         handler3 := healthyHandler{version: "1.2.3~4 (" + runtime.Version() + ")"}
284         srv3, listen3 := s.stubServer(&handler3)
285         defer srv3.Close()
286
287         s.setAllServiceURLs(listen1)
288
289         // same version but without go1.2.3 part => OK
290         s.resp = httptest.NewRecorder()
291         arvadostest.SetServiceURL(&s.handler.Cluster.Services.RailsAPI,
292                 "http://localhost"+listen2+"/")
293         s.handler.ServeHTTP(s.resp, s.req)
294         s.checkOK(c)
295
296         // different version => error
297         s.resp = httptest.NewRecorder()
298         arvadostest.SetServiceURL(&s.handler.Cluster.Services.WebDAV,
299                 "http://localhost"+listen3+"/")
300         s.handler.ServeHTTP(s.resp, s.req)
301         resp := s.checkUnhealthy(c)
302         if c.Check(len(resp.Errors) > 0, check.Equals, true) {
303                 c.Check(resp.Errors[0], check.Matches, `version mismatch: \Qkeep-web+http://localhost`+listen3+`\E is running 1.2.3~4 (.*) -- expected \Q`+cmd.Version.String()+`\E`)
304         }
305 }
306
307 func (s *AggregatorSuite) TestPingTimeout(c *check.C) {
308         s.handler.timeout = arvados.Duration(100 * time.Millisecond)
309         srv, listen := s.stubServer(&slowHandler{})
310         defer srv.Close()
311         arvadostest.SetServiceURL(&s.handler.Cluster.Services.Keepstore, "http://localhost"+listen+"/")
312         s.handler.ServeHTTP(s.resp, s.req)
313         resp := s.checkUnhealthy(c)
314         ep := resp.Checks["keepstore+http://localhost"+listen+"/_health/ping"]
315         c.Check(ep.Health, check.Equals, "ERROR")
316         c.Check(ep.HTTPStatusCode, check.Equals, 0)
317         rt, err := ep.ResponseTime.Float64()
318         c.Check(err, check.IsNil)
319         c.Check(rt > 0.005, check.Equals, true)
320 }
321
322 func (s *AggregatorSuite) TestCheckCommand(c *check.C) {
323         srv, listen := s.stubServer(&healthyHandler{})
324         defer srv.Close()
325         s.setAllServiceURLs(listen)
326         tmpdir := c.MkDir()
327         confdata, err := yaml.Marshal(arvados.Config{Clusters: map[string]arvados.Cluster{s.handler.Cluster.ClusterID: *s.handler.Cluster}})
328         c.Assert(err, check.IsNil)
329         confdata = regexp.MustCompile(`Source(Timestamp|SHA256): [^\n]+\n`).ReplaceAll(confdata, []byte{})
330         err = ioutil.WriteFile(tmpdir+"/config.yml", confdata, 0777)
331         c.Assert(err, check.IsNil)
332
333         var stdout, stderr bytes.Buffer
334
335         exitcode := CheckCommand.RunCommand("check", []string{"-config=" + tmpdir + "/config.yml"}, &bytes.Buffer{}, &stdout, &stderr)
336         c.Check(exitcode, check.Equals, 0)
337         c.Check(stderr.String(), check.Equals, "health check OK\n")
338         c.Check(stdout.String(), check.Equals, "")
339
340         stdout.Reset()
341         stderr.Reset()
342         exitcode = CheckCommand.RunCommand("check", []string{"-quiet", "-config=" + tmpdir + "/config.yml"}, &bytes.Buffer{}, &stdout, &stderr)
343         c.Check(exitcode, check.Equals, 0)
344         c.Check(stderr.String(), check.Equals, "")
345         c.Check(stdout.String(), check.Equals, "")
346
347         stdout.Reset()
348         stderr.Reset()
349         exitcode = CheckCommand.RunCommand("check", []string{"-config=" + tmpdir + "/config.yml", "-yaml"}, &bytes.Buffer{}, &stdout, &stderr)
350         c.Check(exitcode, check.Equals, 0)
351         c.Check(stderr.String(), check.Equals, "")
352         c.Check(stdout.String(), check.Matches, `(?ms).*(\n|^)Health: OK\n.*`)
353 }
354
355 func (s *AggregatorSuite) checkError(c *check.C) {
356         c.Check(s.resp.Code, check.Not(check.Equals), http.StatusOK)
357         var resp ClusterHealthResponse
358         err := json.Unmarshal(s.resp.Body.Bytes(), &resp)
359         c.Check(err, check.IsNil)
360         c.Check(resp.Health, check.Not(check.Equals), "OK")
361 }
362
363 func (s *AggregatorSuite) checkUnhealthy(c *check.C) ClusterHealthResponse {
364         return s.checkResult(c, "ERROR")
365 }
366
367 func (s *AggregatorSuite) checkOK(c *check.C) ClusterHealthResponse {
368         return s.checkResult(c, "OK")
369 }
370
371 func (s *AggregatorSuite) checkResult(c *check.C, health string) ClusterHealthResponse {
372         c.Check(s.resp.Code, check.Equals, http.StatusOK)
373         var resp ClusterHealthResponse
374         c.Log(s.resp.Body.String())
375         err := json.Unmarshal(s.resp.Body.Bytes(), &resp)
376         c.Check(err, check.IsNil)
377         c.Check(resp.Health, check.Equals, health)
378         return resp
379 }
380
381 func (s *AggregatorSuite) setAllServiceURLs(listen string) {
382         svcs := &s.handler.Cluster.Services
383         for _, svc := range []*arvados.Service{
384                 &svcs.Controller,
385                 &svcs.DispatchCloud,
386                 &svcs.DispatchLSF,
387                 &svcs.DispatchSLURM,
388                 &svcs.Keepbalance,
389                 &svcs.Keepproxy,
390                 &svcs.Keepstore,
391                 &svcs.Health,
392                 &svcs.RailsAPI,
393                 &svcs.WebDAV,
394                 &svcs.Websocket,
395                 &svcs.Workbench1,
396                 &svcs.Workbench2,
397         } {
398                 arvadostest.SetServiceURL(svc, "http://localhost"+listen+"/")
399         }
400 }
401
// unhealthyHandler is a stub service. Its /_health/ping answers with the
// default 200 status and a JSON body reporting ERROR; every other path
// gets 404. It performs no authorization check.
type unhealthyHandler struct{}

func (*unhealthyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	if req.URL.Path != "/_health/ping" {
		http.Error(resp, "not found", http.StatusNotFound)
		return
	}
	resp.Write([]byte(`{"health":"ERROR","error":"the bends"}`))
}
411
412 type healthyHandler struct {
413         version    string
414         configHash string
415         configTime time.Time
416         headerDate time.Time
417 }
418
419 func (h *healthyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
420         if !h.headerDate.IsZero() {
421                 resp.Header().Set("Date", h.headerDate.Format(time.RFC1123))
422         }
423         authOK := req.Header.Get("Authorization") == "Bearer "+arvadostest.ManagementToken
424         if req.URL.Path == "/_health/ping" {
425                 if !authOK {
426                         http.Error(resp, "unauthorized", http.StatusUnauthorized)
427                         return
428                 }
429                 resp.Write([]byte(`{"health":"OK"}`))
430         } else if req.URL.Path == "/metrics" {
431                 if !authOK {
432                         http.Error(resp, "unauthorized", http.StatusUnauthorized)
433                         return
434                 }
435                 t := h.configTime
436                 if t.IsZero() {
437                         t = time.Now()
438                 }
439                 fmt.Fprintf(resp, `# HELP arvados_config_load_timestamp_seconds Time when config file was loaded.
440 # TYPE arvados_config_load_timestamp_seconds gauge
441 arvados_config_load_timestamp_seconds{sha256="%s"} %g
442 # HELP arvados_config_source_timestamp_seconds Timestamp of config file when it was loaded.
443 # TYPE arvados_config_source_timestamp_seconds gauge
444 arvados_config_source_timestamp_seconds{sha256="%s"} %g
445 # HELP arvados_version_running Indicated version is running.
446 # TYPE arvados_version_running gauge
447 arvados_version_running{version="%s"} 1
448 `,
449                         h.configHash, float64(time.Now().UnixNano())/1e9,
450                         h.configHash, float64(t.UnixNano())/1e9,
451                         h.version)
452         } else {
453                 http.Error(resp, "not found", http.StatusNotFound)
454         }
455 }
456
// slowHandler is a stub service whose /_health/ping takes 3 seconds to
// answer OK, for exercising the aggregator's ping timeout. Every other
// path gets 404.
//
// The delay is abandoned as soon as the request context is done, e.g.
// when the timed-out client disconnects. A plain time.Sleep would keep
// the connection active after the client gave up, making
// httptest.Server.Close (deferred in TestPingTimeout) block for the full
// 3 seconds.
type slowHandler struct{}

func (*slowHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	if req.URL.Path != "/_health/ping" {
		http.Error(resp, "not found", http.StatusNotFound)
		return
	}
	delay := time.NewTimer(3 * time.Second)
	defer delay.Stop()
	select {
	case <-delay.C:
		resp.Write([]byte(`{"health":"OK"}`))
	case <-req.Context().Done():
		// Client went away (or the server is shutting down); there is
		// nobody left to answer.
	}
}