1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: Apache-2.0
20 "git.arvados.org/arvados.git/lib/cmd"
21 "git.arvados.org/arvados.git/lib/config"
22 "git.arvados.org/arvados.git/sdk/go/arvados"
23 "git.arvados.org/arvados.git/sdk/go/arvadostest"
24 "git.arvados.org/arvados.git/sdk/go/ctxlog"
25 "github.com/ghodss/yaml"
// AggregatorSuite holds per-test fixtures for the health Aggregator
// tests. Besides resp, tests use s.handler (an *Aggregator) and s.req
// (an *http.Request); those field declarations are not visible in this
// listing. SetUpTest rebuilds all three before each test.
29 type AggregatorSuite struct {
// resp captures the Aggregator's response for the current test.
32 resp *httptest.ResponseRecorder
35 // Register AggregatorSuite with gocheck so its Test* methods are run.
36 var _ = check.Suite(&AggregatorSuite{})
// TestInterface fails at compile time, not at run time, if *Aggregator
// stops implementing http.Handler.
38 func (s *AggregatorSuite) TestInterface(c *check.C) {
39 var _ http.Handler = &Aggregator{}
// SetUpTest runs before each test. It loads a minimal config containing
// only cluster zzzzz and fills in values from arvadostest:
//   - the management token, system root token and blob signing key;
//   - volume "z" with the "default" storage class;
//   - Containers.LocalKeepBlobBuffersPerVCPU set to 0.
// It then creates a fresh Aggregator for that cluster, a GET
// /_health/all request with the management token as a Bearer token,
// and an empty response recorder.
42 func (s *AggregatorSuite) SetUpTest(c *check.C) {
43 ldr := config.NewLoader(bytes.NewBufferString(`Clusters: {zzzzz: {}}`), ctxlog.TestLogger(c))
45 cfg, err := ldr.Load()
46 c.Assert(err, check.IsNil)
// The config defines exactly one cluster, so an empty ID selects it.
47 cluster, err := cfg.GetCluster("")
48 c.Assert(err, check.IsNil)
49 cluster.ManagementToken = arvadostest.ManagementToken
50 cluster.SystemRootToken = arvadostest.SystemRootToken
51 cluster.Collections.BlobSigningKey = arvadostest.BlobSigningKey
52 cluster.Volumes["z"] = arvados.Volume{StorageClasses: map[string]bool{"default": true}}
53 cluster.Containers.LocalKeepBlobBuffersPerVCPU = 0
54 s.handler = &Aggregator{Cluster: cluster}
55 s.req = httptest.NewRequest("GET", "/_health/all", nil)
56 s.req.Header.Set("Authorization", "Bearer "+arvadostest.ManagementToken)
57 s.resp = httptest.NewRecorder()
// TestSameVersion pins the comparison rules of sameVersion shown by the
// cases below:
//   - the " (goX.Y.Z)" suffix and the ~devTIMESTAMP build stamp are
//     ignored;
//   - the base release number must match;
//   - a ~dev build never matches the corresponding plain release.
60 func (s *AggregatorSuite) TestSameVersion(c *check.C) {
// Different base release => not the same version.
61 c.Check(sameVersion("2.8.0~dev20240610194320 (go1.21.10)", "2.8.1~dev20240610194320"), check.Equals, false)
62 c.Check(sameVersion("2.8.0~dev20240610194320 (go1.21.10)", "2.8.1~dev20240610194320 (go1.21.10)"), check.Equals, false)
63 c.Check(sameVersion("2.8.0~dev20240610194320 (go1.21.10)", "2.8.1~dev20240610194320 (go1.21.9)"), check.Equals, false)
// Same base release and dev build: go version and dev timestamp don't matter.
64 c.Check(sameVersion("2.8.0~dev20240610194320 (go1.21.10)", "2.8.0~dev20240610194320 (go1.21.9)"), check.Equals, true)
65 c.Check(sameVersion("2.8.0~dev20240610194320 (go1.21.10)", "2.8.0~dev20240611211146 (go1.21.10)"), check.Equals, true)
66 c.Check(sameVersion("2.8.0~dev20240610194320 (go1.21.10)", "2.8.0~dev20240611211146"), check.Equals, true)
// A dev build does not match the plain release it precedes.
67 c.Check(sameVersion("2.8.0~dev20240610194320 (go1.21.10)", "2.8.0"), check.Equals, false)
68 c.Check(sameVersion("2.8.0~dev20240610194320", "2.8.0"), check.Equals, false)
// Plain releases compare by release number.
69 c.Check(sameVersion("2.8.0", "2.8.0"), check.Equals, true)
70 c.Check(sameVersion("2.8.0", "2.8.1"), check.Equals, false)
// TestNoAuth: a request with no Authorization header is rejected with
// 401 Unauthorized.
73 func (s *AggregatorSuite) TestNoAuth(c *check.C) {
74 s.req.Header.Del("Authorization")
75 s.handler.ServeHTTP(s.resp, s.req)
77 c.Check(s.resp.Code, check.Equals, http.StatusUnauthorized)
// TestBadAuth: an Authorization header other than the expected
// "Bearer <ManagementToken>" (here, the bare string "xyzzy") is rejected
// with 401 Unauthorized.
80 func (s *AggregatorSuite) TestBadAuth(c *check.C) {
81 s.req.Header.Set("Authorization", "xyzzy")
82 s.handler.ServeHTTP(s.resp, s.req)
84 c.Check(s.resp.Code, check.Equals, http.StatusUnauthorized)
// TestNoServicesConfigured runs the aggregator without adding any service
// URLs beyond what the minimal config from SetUpTest provides.
// NOTE(review): no check on s.resp is visible in this listing; confirm
// the expected outcome is asserted.
87 func (s *AggregatorSuite) TestNoServicesConfigured(c *check.C) {
88 s.handler.ServeHTTP(s.resp, s.req)
// stubServer starts an httptest.Server serving handler. It returns the
// server and ":PORT", the port from the server's URL. Tests use the port
// to build URLs with a different host name (localhost, 127.0.0.1) that
// reach the same listener. The caller owns srv and should close it.
92 func (s *AggregatorSuite) stubServer(handler http.Handler) (*httptest.Server, string) {
93 srv := httptest.NewServer(handler)
// srv.URL has the form "http://HOST:PORT", so splitting on ":" should
// yield at least three parts, and the last one is the port.
// NOTE(review): url.Parse plus net.SplitHostPort would express this
// more directly.
95 if parts := strings.Split(srv.URL, ":"); len(parts) < 3 {
98 port = parts[len(parts)-1]
100 return srv, ":" + port
// TestUnhealthy points the keepstore URL at a stub whose /_health/ping
// reports ERROR, then runs the aggregator.
// NOTE(review): the assertions on the result are not visible in this
// listing.
103 func (s *AggregatorSuite) TestUnhealthy(c *check.C) {
104 srv, listen := s.stubServer(&unhealthyHandler{})
106 arvadostest.SetServiceURL(&s.handler.Cluster.Services.Keepstore, "http://localhost"+listen+"/")
107 s.handler.ServeHTTP(s.resp, s.req)
// TestHealthy points every service at a healthy stub and checks that the
// keepstore ping entry in the aggregated result is OK with HTTP 200.
// Entries in resp.Checks are keyed "<service>+<URL>/_health/ping".
// (resp is assigned on a line not visible in this listing.)
111 func (s *AggregatorSuite) TestHealthy(c *check.C) {
112 srv, listen := s.stubServer(&healthyHandler{})
114 s.setAllServiceURLs(listen)
115 s.handler.ServeHTTP(s.resp, s.req)
117 svc := "keepstore+http://localhost" + listen + "/_health/ping"
119 ep := resp.Checks[svc]
120 c.Check(ep.Health, check.Equals, "OK")
121 c.Check(ep.HTTPStatusCode, check.Equals, 200)
// TestHealthyAndUnhealthy configures two keepstore URLs, one healthy and
// one unhealthy, and expects an overall ERROR result. The healthy
// endpoint is reported as OK. The unhealthy one is reported as ERROR but
// still with HTTP status 200, because unhealthyHandler answers 200 and
// signals ERROR only in its JSON body.
124 func (s *AggregatorSuite) TestHealthyAndUnhealthy(c *check.C) {
125 srvH, listenH := s.stubServer(&healthyHandler{})
127 srvU, listenU := s.stubServer(&unhealthyHandler{})
129 s.setAllServiceURLs(listenH)
130 arvadostest.SetServiceURL(&s.handler.Cluster.Services.Keepstore, "http://localhost"+listenH+"/", "http://127.0.0.1"+listenU+"/")
131 s.handler.ServeHTTP(s.resp, s.req)
132 resp := s.checkUnhealthy(c)
133 ep := resp.Checks["keepstore+http://localhost"+listenH+"/_health/ping"]
134 c.Check(ep.Health, check.Equals, "OK")
135 c.Check(ep.HTTPStatusCode, check.Equals, 200)
136 ep = resp.Checks["keepstore+http://127.0.0.1"+listenU+"/_health/ping"]
137 c.Check(ep.Health, check.Equals, "ERROR")
138 c.Check(ep.HTTPStatusCode, check.Equals, 200)
142 // If an InternalURL host is 0.0.0.0, localhost, 127.0.0.0/8, or ::1 and
143 // nothing is listening there, don't fail the health check -- instead,
144 // assume the relevant component just isn't installed/enabled on this
145 // node, but does work when contacted through ExternalURL.
// NOTE(review): the checks on both aggregator runs below are not
// visible in this listing.
146 func (s *AggregatorSuite) TestUnreachableLoopbackPort(c *check.C) {
147 srvH, listenH := s.stubServer(&healthyHandler{})
149 s.setAllServiceURLs(listenH)
// Port 9 is assumed to have no listener, so each of these loopback-style
// URLs (one per host form) is unreachable.
150 arvadostest.SetServiceURL(&s.handler.Cluster.Services.Keepproxy, "http://localhost:9/")
151 arvadostest.SetServiceURL(&s.handler.Cluster.Services.Workbench1, "http://0.0.0.0:9/")
152 arvadostest.SetServiceURL(&s.handler.Cluster.Services.Keepbalance, "http://127.0.0.127:9/")
153 arvadostest.SetServiceURL(&s.handler.Cluster.Services.WebDAV, "http://[::1]:9/")
154 s.handler.ServeHTTP(s.resp, s.req)
157 // If a non-loopback address is unreachable, that's still a failure.
159 s.resp = httptest.NewRecorder()
// 172.31.255.254 is a private, non-loopback address, assumed to be
// unreachable from the test host.
160 arvadostest.SetServiceURL(&s.handler.Cluster.Services.WebDAV, "http://172.31.255.254:9/")
161 s.handler.ServeHTTP(s.resp, s.req)
// TestIsLocalHost checks which host strings isLocalHost treats as local.
// Local: "localhost" (including "Localhost"), any 127.x.y.z address,
// 0.0.0.0 and ::1. Not local: other IPv4/IPv6 addresses, ordinary host
// names, the incomplete address "127.0.0", and the empty string.
165 func (s *AggregatorSuite) TestIsLocalHost(c *check.C) {
166 c.Check(isLocalHost("Localhost"), check.Equals, true)
167 c.Check(isLocalHost("localhost"), check.Equals, true)
168 c.Check(isLocalHost("127.0.0.1"), check.Equals, true)
169 c.Check(isLocalHost("127.0.0.127"), check.Equals, true)
170 c.Check(isLocalHost("127.1.2.7"), check.Equals, true)
171 c.Check(isLocalHost("0.0.0.0"), check.Equals, true)
172 c.Check(isLocalHost("::1"), check.Equals, true)
173 c.Check(isLocalHost("1.2.3.4"), check.Equals, false)
174 c.Check(isLocalHost("1::1"), check.Equals, false)
175 c.Check(isLocalHost("example.com"), check.Equals, false)
176 c.Check(isLocalHost("127.0.0"), check.Equals, false)
177 c.Check(isLocalHost(""), check.Equals, false)
// TestConfigMismatch checks how the aggregator compares the config
// metrics (config sha256 and source timestamp) that each stub reports
// on /metrics:
//   - an older source timestamp with the same sha256 as the newest
//     config is not a problem;
//   - services that expose no config metrics are not a problem;
//   - a sha256 that differs from the newest-timestamped config is
//     reported as "outdated config";
//   - if no service reports a config time, nothing is reported.
180 func (s *AggregatorSuite) TestConfigMismatch(c *check.C) {
181 // time1/hash1: current config
// time1 is roughly an hour in the past (now - 1h - 1m + 1s); time2 is
// one hour earlier than time1.
182 time1 := time.Now().Add(time.Second - time.Minute - time.Hour)
183 hash1 := fmt.Sprintf("%x", sha256.Sum256([]byte(`Clusters: {zzzzz: {SystemRootToken: xyzzy}}`)))
184 // time2/hash2: old config
185 time2 := time1.Add(-time.Hour)
186 hash2 := fmt.Sprintf("%x", sha256.Sum256([]byte(`Clusters: {zzzzz: {SystemRootToken: old-token}}`)))
188 // srv1: current file
189 handler1 := healthyHandler{configHash: hash1, configTime: time1}
190 srv1, listen1 := s.stubServer(&handler1)
192 // srv2: old file, current content
193 handler2 := healthyHandler{configHash: hash1, configTime: time2}
194 srv2, listen2 := s.stubServer(&handler2)
196 // srv3: old file, old content
197 handler3 := healthyHandler{configHash: hash2, configTime: time2}
198 srv3, listen3 := s.stubServer(&handler3)
200 // srv4: no metrics handler
201 handler4 := healthyHandler{}
202 srv4, listen4 := s.stubServer(&handler4)
205 s.setAllServiceURLs(listen1)
207 // listen2 => old timestamp, same content => no problem
208 s.resp = httptest.NewRecorder()
209 arvadostest.SetServiceURL(&s.handler.Cluster.Services.DispatchCloud,
210 "http://localhost"+listen2+"/")
211 s.handler.ServeHTTP(s.resp, s.req)
214 // listen4 => no metrics on some services => no problem
215 s.resp = httptest.NewRecorder()
216 arvadostest.SetServiceURL(&s.handler.Cluster.Services.WebDAV,
217 "http://localhost"+listen4+"/")
218 s.handler.ServeHTTP(s.resp, s.req)
221 // listen3 => old timestamp, old content => report discrepancy
222 s.resp = httptest.NewRecorder()
223 arvadostest.SetServiceURL(&s.handler.Cluster.Services.Keepstore,
224 "http://localhost"+listen1+"/",
225 "http://localhost"+listen3+"/")
226 s.handler.ServeHTTP(s.resp, s.req)
227 resp = s.checkUnhealthy(c)
228 if c.Check(len(resp.Errors) > 0, check.Equals, true) {
229 c.Check(resp.Errors[0], check.Matches, `outdated config: \Qkeepstore+http://localhost`+listen3+`\E: config file \(sha256 .*\) does not match latest version with timestamp .*`)
232 // no services report config time (migrating to current version) => no problem
233 s.resp = httptest.NewRecorder()
234 s.setAllServiceURLs(listen4)
235 s.handler.ServeHTTP(s.resp, s.req)
// TestClockSkew uses stubs that override the HTTP Date header. A skew of
// 3 seconds is tolerated. A skew of just over 3 minutes produces a
// "clock skew detected" error that cites the 1m warning threshold.
239 func (s *AggregatorSuite) TestClockSkew(c *check.C) {
240 // srv1: report real wall clock time
241 handler1 := healthyHandler{}
242 srv1, listen1 := s.stubServer(&handler1)
244 // srv2: report near-future time
245 handler2 := healthyHandler{headerDate: time.Now().Add(3 * time.Second)}
246 srv2, listen2 := s.stubServer(&handler2)
248 // srv3: report far-future time
249 handler3 := healthyHandler{headerDate: time.Now().Add(3*time.Minute + 3*time.Second)}
250 srv3, listen3 := s.stubServer(&handler3)
253 s.setAllServiceURLs(listen1)
255 // near-future time => OK
256 s.resp = httptest.NewRecorder()
257 arvadostest.SetServiceURL(&s.handler.Cluster.Services.DispatchCloud,
258 "http://localhost"+listen2+"/")
259 s.handler.ServeHTTP(s.resp, s.req)
262 // far-future time => error
263 s.resp = httptest.NewRecorder()
264 arvadostest.SetServiceURL(&s.handler.Cluster.Services.WebDAV,
265 "http://localhost"+listen3+"/")
266 s.handler.ServeHTTP(s.resp, s.req)
267 resp := s.checkUnhealthy(c)
268 if c.Check(len(resp.Errors) > 0, check.Equals, true) {
269 c.Check(resp.Errors[0], check.Matches, `clock skew detected: maximum timestamp spread is 3m.* \(exceeds warning threshold of 1m\)`)
// TestVersionSkew uses stubs that report a running version on /metrics.
// The current version is accepted with or without its " (goX.Y.Z)"
// suffix. An unrelated version produces a "version mismatch" error.
273 func (s *AggregatorSuite) TestVersionSkew(c *check.C) {
274 // srv1: report same version
275 handler1 := healthyHandler{version: cmd.Version.String()}
276 srv1, listen1 := s.stubServer(&handler1)
278 // srv2: report same version but without " (go1.2.3)" part
279 handler2 := healthyHandler{version: strings.Fields(cmd.Version.String())[0]}
280 srv2, listen2 := s.stubServer(&handler2)
282 // srv3: report different version
283 handler3 := healthyHandler{version: "1.2.3~4 (" + runtime.Version() + ")"}
284 srv3, listen3 := s.stubServer(&handler3)
287 s.setAllServiceURLs(listen1)
289 // same version but without go1.2.3 part => OK
290 s.resp = httptest.NewRecorder()
291 arvadostest.SetServiceURL(&s.handler.Cluster.Services.RailsAPI,
292 "http://localhost"+listen2+"/")
293 s.handler.ServeHTTP(s.resp, s.req)
296 // different version => error
297 s.resp = httptest.NewRecorder()
298 arvadostest.SetServiceURL(&s.handler.Cluster.Services.WebDAV,
299 "http://localhost"+listen3+"/")
300 s.handler.ServeHTTP(s.resp, s.req)
301 resp := s.checkUnhealthy(c)
302 if c.Check(len(resp.Errors) > 0, check.Equals, true) {
// NOTE(review): in this pattern "1.2.3~4 (.*)" is regexp syntax. The
// dots match any character, and the parentheses form a group instead of
// matching literal "(" and ")". It still matches the expected message,
// but \Q1.2.3~4 (\E.*\) would be stricter.
303 c.Check(resp.Errors[0], check.Matches, `version mismatch: \Qkeep-web+http://localhost`+listen3+`\E is running 1.2.3~4 (.*) -- expected \Q`+cmd.Version.String()+`\E`)
// TestPingTimeout sets a 100ms aggregator timeout and points keepstore at
// a stub that sleeps 3s before answering /_health/ping. The endpoint must
// be reported as ERROR with HTTPStatusCode 0 (presumably because no
// response arrived before the timeout) and a positive response time.
307 func (s *AggregatorSuite) TestPingTimeout(c *check.C) {
308 s.handler.timeout = arvados.Duration(100 * time.Millisecond)
309 srv, listen := s.stubServer(&slowHandler{})
311 arvadostest.SetServiceURL(&s.handler.Cluster.Services.Keepstore, "http://localhost"+listen+"/")
312 s.handler.ServeHTTP(s.resp, s.req)
313 resp := s.checkUnhealthy(c)
314 ep := resp.Checks["keepstore+http://localhost"+listen+"/_health/ping"]
315 c.Check(ep.Health, check.Equals, "ERROR")
316 c.Check(ep.HTTPStatusCode, check.Equals, 0)
317 rt, err := ep.ResponseTime.Float64()
318 c.Check(err, check.IsNil)
// NOTE(review): 0.005s is far below the 100ms timeout; a bound closer to
// the timeout would better show that the aggregator actually waited.
319 c.Check(rt > 0.005, check.Equals, true)
// TestCheckCommand writes the suite's cluster config, with all services
// pointed at a healthy stub, to a YAML file. It then runs CheckCommand
// against that file three ways, and each run must exit 0:
//   - default: "health check OK" on stderr;
//   - -quiet: no output;
//   - -yaml: a YAML report containing "Health: OK" on stdout.
322 func (s *AggregatorSuite) TestCheckCommand(c *check.C) {
323 srv, listen := s.stubServer(&healthyHandler{})
325 s.setAllServiceURLs(listen)
327 confdata, err := yaml.Marshal(arvados.Config{Clusters: map[string]arvados.Cluster{s.handler.Cluster.ClusterID: *s.handler.Cluster}})
328 c.Assert(err, check.IsNil)
// Drop SourceTimestamp/SourceSHA256 lines from the marshaled config.
// NOTE(review): presumably these are values computed by the loader that
// should not appear in a config file on disk -- confirm.
329 confdata = regexp.MustCompile(`Source(Timestamp|SHA256): [^\n]+\n`).ReplaceAll(confdata, []byte{})
// NOTE(review): ioutil.WriteFile is deprecated in favor of os.WriteFile
// (Go 1.16+). Mode 0777 is also wider than needed for a file containing
// tokens; 0600 would suffice.
330 err = ioutil.WriteFile(tmpdir+"/config.yml", confdata, 0777)
331 c.Assert(err, check.IsNil)
333 var stdout, stderr bytes.Buffer
335 exitcode := CheckCommand.RunCommand("check", []string{"-config=" + tmpdir + "/config.yml"}, &bytes.Buffer{}, &stdout, &stderr)
336 c.Check(exitcode, check.Equals, 0)
337 c.Check(stderr.String(), check.Equals, "health check OK\n")
338 c.Check(stdout.String(), check.Equals, "")
342 exitcode = CheckCommand.RunCommand("check", []string{"-quiet", "-config=" + tmpdir + "/config.yml"}, &bytes.Buffer{}, &stdout, &stderr)
343 c.Check(exitcode, check.Equals, 0)
344 c.Check(stderr.String(), check.Equals, "")
345 c.Check(stdout.String(), check.Equals, "")
349 exitcode = CheckCommand.RunCommand("check", []string{"-config=" + tmpdir + "/config.yml", "-yaml"}, &bytes.Buffer{}, &stdout, &stderr)
350 c.Check(exitcode, check.Equals, 0)
351 c.Check(stderr.String(), check.Equals, "")
352 c.Check(stdout.String(), check.Matches, `(?ms).*(\n|^)Health: OK\n.*`)
// checkError asserts that the aggregator answered with a status other
// than 200 OK, and that the body still decodes as a
// ClusterHealthResponse whose Health is not "OK".
355 func (s *AggregatorSuite) checkError(c *check.C) {
356 c.Check(s.resp.Code, check.Not(check.Equals), http.StatusOK)
357 var resp ClusterHealthResponse
358 err := json.Unmarshal(s.resp.Body.Bytes(), &resp)
359 c.Check(err, check.IsNil)
360 c.Check(resp.Health, check.Not(check.Equals), "OK")
// checkUnhealthy asserts a 200 response whose aggregated Health is
// "ERROR" and returns the result of checkResult.
363 func (s *AggregatorSuite) checkUnhealthy(c *check.C) ClusterHealthResponse {
364 return s.checkResult(c, "ERROR")
// checkOK asserts a 200 response whose aggregated Health is "OK" and
// returns the result of checkResult.
367 func (s *AggregatorSuite) checkOK(c *check.C) ClusterHealthResponse {
368 return s.checkResult(c, "OK")
// checkResult asserts that the aggregator answered 200 OK even when the
// cluster is unhealthy: the overall status is carried in the JSON body,
// not in the HTTP status. It decodes the body and checks that its Health
// field equals health. The return statement is not visible in this
// listing; presumably it returns the decoded response.
371 func (s *AggregatorSuite) checkResult(c *check.C, health string) ClusterHealthResponse {
372 c.Check(s.resp.Code, check.Equals, http.StatusOK)
373 var resp ClusterHealthResponse
// Log the raw body so a failing check shows what the aggregator returned.
374 c.Log(s.resp.Body.String())
375 err := json.Unmarshal(s.resp.Body.Bytes(), &resp)
376 c.Check(err, check.IsNil)
377 c.Check(resp.Health, check.Equals, health)
// setAllServiceURLs points each service in a fixed list of
// s.handler.Cluster.Services entries at http://localhost<listen>/.
// listen is a ":PORT" string as returned by stubServer. The list itself
// is not visible in this listing.
381 func (s *AggregatorSuite) setAllServiceURLs(listen string) {
382 svcs := &s.handler.Cluster.Services
383 for _, svc := range []*arvados.Service{
398 arvadostest.SetServiceURL(svc, "http://localhost"+listen+"/")
// unhealthyHandler is a stub service whose /_health/ping answers with the
// default 200 status and a JSON body reporting health "ERROR". Other
// paths presumably get 404 (the branch structure is partly not visible).
402 type unhealthyHandler struct{}
404 func (*unhealthyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
405 if req.URL.Path == "/_health/ping" {
406 resp.Write([]byte(`{"health":"ERROR","error":"the bends"}`))
408 http.Error(resp, "not found", http.StatusNotFound)
// healthyHandler is a stub service that answers an authorized
// /_health/ping with {"health":"OK"}. Its field declarations are not
// visible in this listing; the code uses these fields:
//   headerDate - if non-zero, sent as the HTTP Date header;
//   configHash, configTime, version - presumably feed the /metrics output.
// Both /_health/ping and /metrics presumably answer 401 unless the
// request carries "Bearer <ManagementToken>". Other paths get 404.
412 type healthyHandler struct {
419 func (h *healthyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
420 if !h.headerDate.IsZero() {
// NOTE(review): time.RFC1123 renders headerDate in its own zone (local
// time when built from time.Now()), but HTTP Date headers are defined in
// GMT (http.TimeFormat). Confirm the aggregator's Date parsing accepts
// this; otherwise use h.headerDate.UTC().Format(http.TimeFormat).
421 resp.Header().Set("Date", h.headerDate.Format(time.RFC1123))
423 authOK := req.Header.Get("Authorization") == "Bearer "+arvadostest.ManagementToken
424 if req.URL.Path == "/_health/ping" {
426 http.Error(resp, "unauthorized", http.StatusUnauthorized)
429 resp.Write([]byte(`{"health":"OK"}`))
430 } else if req.URL.Path == "/metrics" {
432 http.Error(resp, "unauthorized", http.StatusUnauthorized)
// Prometheus text-format output. The config load timestamp is always
// "now"; the source timestamp comes from t, which is set on lines not
// visible here. Both are labelled with configHash.
439 fmt.Fprintf(resp, `# HELP arvados_config_load_timestamp_seconds Time when config file was loaded.
440 # TYPE arvados_config_load_timestamp_seconds gauge
441 arvados_config_load_timestamp_seconds{sha256="%s"} %g
442 # HELP arvados_config_source_timestamp_seconds Timestamp of config file when it was loaded.
443 # TYPE arvados_config_source_timestamp_seconds gauge
444 arvados_config_source_timestamp_seconds{sha256="%s"} %g
445 # HELP arvados_version_running Indicated version is running.
446 # TYPE arvados_version_running gauge
447 arvados_version_running{version="%s"} 1
449 h.configHash, float64(time.Now().UnixNano())/1e9,
450 h.configHash, float64(t.UnixNano())/1e9,
453 http.Error(resp, "not found", http.StatusNotFound)
// slowHandler is a stub service whose /_health/ping waits 3 seconds
// before answering {"health":"OK"}, longer than the timeout used in
// TestPingTimeout. Other paths presumably get 404 (the branch structure
// is partly not visible).
457 type slowHandler struct{}
459 func (*slowHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
460 if req.URL.Path == "/_health/ping" {
461 time.Sleep(3 * time.Second)
462 resp.Write([]byte(`{"health":"OK"}`))
464 http.Error(resp, "not found", http.StatusNotFound)