1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: Apache-2.0
20 "git.arvados.org/arvados.git/lib/cmd"
21 "git.arvados.org/arvados.git/lib/config"
22 "git.arvados.org/arvados.git/sdk/go/arvados"
23 "git.arvados.org/arvados.git/sdk/go/arvadostest"
24 "git.arvados.org/arvados.git/sdk/go/ctxlog"
25 "github.com/ghodss/yaml"
29 type AggregatorSuite struct {
32 resp *httptest.ResponseRecorder
35 // Gocheck boilerplate
36 var _ = check.Suite(&AggregatorSuite{})
38 func (s *AggregatorSuite) TestInterface(c *check.C) {
39 var _ http.Handler = &Aggregator{}
42 func (s *AggregatorSuite) SetUpTest(c *check.C) {
43 ldr := config.NewLoader(bytes.NewBufferString(`Clusters: {zzzzz: {}}`), ctxlog.TestLogger(c))
45 cfg, err := ldr.Load()
46 c.Assert(err, check.IsNil)
47 cluster, err := cfg.GetCluster("")
48 c.Assert(err, check.IsNil)
49 cluster.ManagementToken = arvadostest.ManagementToken
50 cluster.SystemRootToken = arvadostest.SystemRootToken
51 cluster.Collections.BlobSigningKey = arvadostest.BlobSigningKey
52 cluster.Volumes["z"] = arvados.Volume{StorageClasses: map[string]bool{"default": true}}
53 cluster.Containers.LocalKeepBlobBuffersPerVCPU = 0
54 s.handler = &Aggregator{Cluster: cluster}
55 s.req = httptest.NewRequest("GET", "/_health/all", nil)
56 s.req.Header.Set("Authorization", "Bearer "+arvadostest.ManagementToken)
57 s.resp = httptest.NewRecorder()
60 func (s *AggregatorSuite) TestNoAuth(c *check.C) {
61 s.req.Header.Del("Authorization")
62 s.handler.ServeHTTP(s.resp, s.req)
64 c.Check(s.resp.Code, check.Equals, http.StatusUnauthorized)
67 func (s *AggregatorSuite) TestBadAuth(c *check.C) {
68 s.req.Header.Set("Authorization", "xyzzy")
69 s.handler.ServeHTTP(s.resp, s.req)
71 c.Check(s.resp.Code, check.Equals, http.StatusUnauthorized)
74 func (s *AggregatorSuite) TestNoServicesConfigured(c *check.C) {
75 s.handler.ServeHTTP(s.resp, s.req)
79 func (s *AggregatorSuite) stubServer(handler http.Handler) (*httptest.Server, string) {
80 srv := httptest.NewServer(handler)
82 if parts := strings.Split(srv.URL, ":"); len(parts) < 3 {
85 port = parts[len(parts)-1]
87 return srv, ":" + port
90 func (s *AggregatorSuite) TestUnhealthy(c *check.C) {
91 srv, listen := s.stubServer(&unhealthyHandler{})
93 arvadostest.SetServiceURL(&s.handler.Cluster.Services.Keepstore, "http://localhost"+listen+"/")
94 s.handler.ServeHTTP(s.resp, s.req)
98 func (s *AggregatorSuite) TestHealthy(c *check.C) {
99 srv, listen := s.stubServer(&healthyHandler{})
101 s.setAllServiceURLs(listen)
102 s.handler.ServeHTTP(s.resp, s.req)
104 svc := "keepstore+http://localhost" + listen + "/_health/ping"
106 ep := resp.Checks[svc]
107 c.Check(ep.Health, check.Equals, "OK")
108 c.Check(ep.HTTPStatusCode, check.Equals, 200)
111 func (s *AggregatorSuite) TestHealthyAndUnhealthy(c *check.C) {
112 srvH, listenH := s.stubServer(&healthyHandler{})
114 srvU, listenU := s.stubServer(&unhealthyHandler{})
116 s.setAllServiceURLs(listenH)
117 arvadostest.SetServiceURL(&s.handler.Cluster.Services.Keepstore, "http://localhost"+listenH+"/", "http://127.0.0.1"+listenU+"/")
118 s.handler.ServeHTTP(s.resp, s.req)
119 resp := s.checkUnhealthy(c)
120 ep := resp.Checks["keepstore+http://localhost"+listenH+"/_health/ping"]
121 c.Check(ep.Health, check.Equals, "OK")
122 c.Check(ep.HTTPStatusCode, check.Equals, 200)
123 ep = resp.Checks["keepstore+http://127.0.0.1"+listenU+"/_health/ping"]
124 c.Check(ep.Health, check.Equals, "ERROR")
125 c.Check(ep.HTTPStatusCode, check.Equals, 200)
129 // If an InternalURL host is 0.0.0.0, localhost, 127/8, or ::1 and
130 // nothing is listening there, don't fail the health check -- instead,
131 // assume the relevant component just isn't installed/enabled on this
132 // node, but does work when contacted through ExternalURL.
133 func (s *AggregatorSuite) TestUnreachableLoopbackPort(c *check.C) {
134 srvH, listenH := s.stubServer(&healthyHandler{})
136 s.setAllServiceURLs(listenH)
137 arvadostest.SetServiceURL(&s.handler.Cluster.Services.Keepproxy, "http://localhost:9/")
138 arvadostest.SetServiceURL(&s.handler.Cluster.Services.Workbench1, "http://0.0.0.0:9/")
139 arvadostest.SetServiceURL(&s.handler.Cluster.Services.Keepbalance, "http://127.0.0.127:9/")
140 arvadostest.SetServiceURL(&s.handler.Cluster.Services.WebDAV, "http://[::1]:9/")
141 s.handler.ServeHTTP(s.resp, s.req)
144 // If a non-loopback address is unreachable, that's still a
146 s.resp = httptest.NewRecorder()
147 arvadostest.SetServiceURL(&s.handler.Cluster.Services.WebDAV, "http://172.31.255.254:9/")
148 s.handler.ServeHTTP(s.resp, s.req)
152 func (s *AggregatorSuite) TestIsLocalHost(c *check.C) {
153 c.Check(isLocalHost("Localhost"), check.Equals, true)
154 c.Check(isLocalHost("localhost"), check.Equals, true)
155 c.Check(isLocalHost("127.0.0.1"), check.Equals, true)
156 c.Check(isLocalHost("127.0.0.127"), check.Equals, true)
157 c.Check(isLocalHost("127.1.2.7"), check.Equals, true)
158 c.Check(isLocalHost("0.0.0.0"), check.Equals, true)
159 c.Check(isLocalHost("::1"), check.Equals, true)
160 c.Check(isLocalHost("1.2.3.4"), check.Equals, false)
161 c.Check(isLocalHost("1::1"), check.Equals, false)
162 c.Check(isLocalHost("example.com"), check.Equals, false)
163 c.Check(isLocalHost("127.0.0"), check.Equals, false)
164 c.Check(isLocalHost(""), check.Equals, false)
167 func (s *AggregatorSuite) TestConfigMismatch(c *check.C) {
168 // time1/hash1: current config
169 time1 := time.Now().Add(time.Second - time.Minute - time.Hour)
170 hash1 := fmt.Sprintf("%x", sha256.Sum256([]byte(`Clusters: {zzzzz: {SystemRootToken: xyzzy}}`)))
171 // time2/hash2: old config
172 time2 := time1.Add(-time.Hour)
173 hash2 := fmt.Sprintf("%x", sha256.Sum256([]byte(`Clusters: {zzzzz: {SystemRootToken: old-token}}`)))
175 // srv1: current file
176 handler1 := healthyHandler{configHash: hash1, configTime: time1}
177 srv1, listen1 := s.stubServer(&handler1)
179 // srv2: old file, current content
180 handler2 := healthyHandler{configHash: hash1, configTime: time2}
181 srv2, listen2 := s.stubServer(&handler2)
183 // srv3: old file, old content
184 handler3 := healthyHandler{configHash: hash2, configTime: time2}
185 srv3, listen3 := s.stubServer(&handler3)
187 // srv4: no metrics handler
188 handler4 := healthyHandler{}
189 srv4, listen4 := s.stubServer(&handler4)
192 s.setAllServiceURLs(listen1)
194 // listen2 => old timestamp, same content => no problem
195 s.resp = httptest.NewRecorder()
196 arvadostest.SetServiceURL(&s.handler.Cluster.Services.DispatchCloud,
197 "http://localhost"+listen2+"/")
198 s.handler.ServeHTTP(s.resp, s.req)
201 // listen4 => no metrics on some services => no problem
202 s.resp = httptest.NewRecorder()
203 arvadostest.SetServiceURL(&s.handler.Cluster.Services.WebDAV,
204 "http://localhost"+listen4+"/")
205 s.handler.ServeHTTP(s.resp, s.req)
208 // listen3 => old timestamp, old content => report discrepancy
209 s.resp = httptest.NewRecorder()
210 arvadostest.SetServiceURL(&s.handler.Cluster.Services.Keepstore,
211 "http://localhost"+listen1+"/",
212 "http://localhost"+listen3+"/")
213 s.handler.ServeHTTP(s.resp, s.req)
214 resp = s.checkUnhealthy(c)
215 if c.Check(len(resp.Errors) > 0, check.Equals, true) {
216 c.Check(resp.Errors[0], check.Matches, `outdated config: \Qkeepstore+http://localhost`+listen3+`\E: config file \(sha256 .*\) does not match latest version with timestamp .*`)
219 // no services report config time (migrating to current version) => no problem
220 s.resp = httptest.NewRecorder()
221 s.setAllServiceURLs(listen4)
222 s.handler.ServeHTTP(s.resp, s.req)
226 func (s *AggregatorSuite) TestClockSkew(c *check.C) {
227 // srv1: report real wall clock time
228 handler1 := healthyHandler{}
229 srv1, listen1 := s.stubServer(&handler1)
231 // srv2: report near-future time
232 handler2 := healthyHandler{headerDate: time.Now().Add(3 * time.Second)}
233 srv2, listen2 := s.stubServer(&handler2)
235 // srv3: report far-future time
236 handler3 := healthyHandler{headerDate: time.Now().Add(3*time.Minute + 3*time.Second)}
237 srv3, listen3 := s.stubServer(&handler3)
240 s.setAllServiceURLs(listen1)
242 // near-future time => OK
243 s.resp = httptest.NewRecorder()
244 arvadostest.SetServiceURL(&s.handler.Cluster.Services.DispatchCloud,
245 "http://localhost"+listen2+"/")
246 s.handler.ServeHTTP(s.resp, s.req)
249 // far-future time => error
250 s.resp = httptest.NewRecorder()
251 arvadostest.SetServiceURL(&s.handler.Cluster.Services.WebDAV,
252 "http://localhost"+listen3+"/")
253 s.handler.ServeHTTP(s.resp, s.req)
254 resp := s.checkUnhealthy(c)
255 if c.Check(len(resp.Errors) > 0, check.Equals, true) {
256 c.Check(resp.Errors[0], check.Matches, `clock skew detected: maximum timestamp spread is 3m.* \(exceeds warning threshold of 1m\)`)
260 func (s *AggregatorSuite) TestVersionSkew(c *check.C) {
261 // srv1: report same version
262 handler1 := healthyHandler{version: cmd.Version.String()}
263 srv1, listen1 := s.stubServer(&handler1)
265 // srv2: report same version but without " (go1.2.3)" part
266 handler2 := healthyHandler{version: strings.Fields(cmd.Version.String())[0]}
267 srv2, listen2 := s.stubServer(&handler2)
269 // srv3: report different version
270 handler3 := healthyHandler{version: "1.2.3~4 (" + runtime.Version() + ")"}
271 srv3, listen3 := s.stubServer(&handler3)
274 s.setAllServiceURLs(listen1)
276 // same version but without go1.2.3 part => OK
277 s.resp = httptest.NewRecorder()
278 arvadostest.SetServiceURL(&s.handler.Cluster.Services.RailsAPI,
279 "http://localhost"+listen2+"/")
280 s.handler.ServeHTTP(s.resp, s.req)
283 // different version => error
284 s.resp = httptest.NewRecorder()
285 arvadostest.SetServiceURL(&s.handler.Cluster.Services.WebDAV,
286 "http://localhost"+listen3+"/")
287 s.handler.ServeHTTP(s.resp, s.req)
288 resp := s.checkUnhealthy(c)
289 if c.Check(len(resp.Errors) > 0, check.Equals, true) {
290 c.Check(resp.Errors[0], check.Matches, `version mismatch: \Qkeep-web+http://localhost`+listen3+`\E is running 1.2.3~4 (.*) -- expected \Q`+cmd.Version.String()+`\E`)
294 func (s *AggregatorSuite) TestPingTimeout(c *check.C) {
295 s.handler.timeout = arvados.Duration(100 * time.Millisecond)
296 srv, listen := s.stubServer(&slowHandler{})
298 arvadostest.SetServiceURL(&s.handler.Cluster.Services.Keepstore, "http://localhost"+listen+"/")
299 s.handler.ServeHTTP(s.resp, s.req)
300 resp := s.checkUnhealthy(c)
301 ep := resp.Checks["keepstore+http://localhost"+listen+"/_health/ping"]
302 c.Check(ep.Health, check.Equals, "ERROR")
303 c.Check(ep.HTTPStatusCode, check.Equals, 0)
304 rt, err := ep.ResponseTime.Float64()
305 c.Check(err, check.IsNil)
306 c.Check(rt > 0.005, check.Equals, true)
309 func (s *AggregatorSuite) TestCheckCommand(c *check.C) {
310 srv, listen := s.stubServer(&healthyHandler{})
312 s.setAllServiceURLs(listen)
314 confdata, err := yaml.Marshal(arvados.Config{Clusters: map[string]arvados.Cluster{s.handler.Cluster.ClusterID: *s.handler.Cluster}})
315 c.Assert(err, check.IsNil)
316 confdata = regexp.MustCompile(`Source(Timestamp|SHA256): [^\n]+\n`).ReplaceAll(confdata, []byte{})
317 err = ioutil.WriteFile(tmpdir+"/config.yml", confdata, 0777)
318 c.Assert(err, check.IsNil)
320 var stdout, stderr bytes.Buffer
322 exitcode := CheckCommand.RunCommand("check", []string{"-config=" + tmpdir + "/config.yml"}, &bytes.Buffer{}, &stdout, &stderr)
323 c.Check(exitcode, check.Equals, 0)
324 c.Check(stderr.String(), check.Equals, "")
325 c.Check(stdout.String(), check.Equals, "")
329 exitcode = CheckCommand.RunCommand("check", []string{"-config=" + tmpdir + "/config.yml", "-yaml"}, &bytes.Buffer{}, &stdout, &stderr)
330 c.Check(exitcode, check.Equals, 0)
331 c.Check(stderr.String(), check.Equals, "")
332 c.Check(stdout.String(), check.Matches, `(?ms).*(\n|^)Health: OK\n.*`)
335 func (s *AggregatorSuite) checkError(c *check.C) {
336 c.Check(s.resp.Code, check.Not(check.Equals), http.StatusOK)
337 var resp ClusterHealthResponse
338 err := json.Unmarshal(s.resp.Body.Bytes(), &resp)
339 c.Check(err, check.IsNil)
340 c.Check(resp.Health, check.Not(check.Equals), "OK")
343 func (s *AggregatorSuite) checkUnhealthy(c *check.C) ClusterHealthResponse {
344 return s.checkResult(c, "ERROR")
347 func (s *AggregatorSuite) checkOK(c *check.C) ClusterHealthResponse {
348 return s.checkResult(c, "OK")
351 func (s *AggregatorSuite) checkResult(c *check.C, health string) ClusterHealthResponse {
352 c.Check(s.resp.Code, check.Equals, http.StatusOK)
353 var resp ClusterHealthResponse
354 c.Log(s.resp.Body.String())
355 err := json.Unmarshal(s.resp.Body.Bytes(), &resp)
356 c.Check(err, check.IsNil)
357 c.Check(resp.Health, check.Equals, health)
361 func (s *AggregatorSuite) setAllServiceURLs(listen string) {
362 svcs := &s.handler.Cluster.Services
363 for _, svc := range []*arvados.Service{
379 arvadostest.SetServiceURL(svc, "http://localhost"+listen+"/")
383 type unhealthyHandler struct{}
385 func (*unhealthyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
386 if req.URL.Path == "/_health/ping" {
387 resp.Write([]byte(`{"health":"ERROR","error":"the bends"}`))
389 http.Error(resp, "not found", http.StatusNotFound)
393 type healthyHandler struct {
400 func (h *healthyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
401 if !h.headerDate.IsZero() {
402 resp.Header().Set("Date", h.headerDate.Format(time.RFC1123))
404 authOK := req.Header.Get("Authorization") == "Bearer "+arvadostest.ManagementToken
405 if req.URL.Path == "/_health/ping" {
407 http.Error(resp, "unauthorized", http.StatusUnauthorized)
410 resp.Write([]byte(`{"health":"OK"}`))
411 } else if req.URL.Path == "/metrics" {
413 http.Error(resp, "unauthorized", http.StatusUnauthorized)
420 fmt.Fprintf(resp, `# HELP arvados_config_load_timestamp_seconds Time when config file was loaded.
421 # TYPE arvados_config_load_timestamp_seconds gauge
422 arvados_config_load_timestamp_seconds{sha256="%s"} %g
423 # HELP arvados_config_source_timestamp_seconds Timestamp of config file when it was loaded.
424 # TYPE arvados_config_source_timestamp_seconds gauge
425 arvados_config_source_timestamp_seconds{sha256="%s"} %g
426 # HELP arvados_version_running Indicated version is running.
427 # TYPE arvados_version_running gauge
428 arvados_version_running{version="%s"} 1
430 h.configHash, float64(time.Now().UnixNano())/1e9,
431 h.configHash, float64(t.UnixNano())/1e9,
434 http.Error(resp, "not found", http.StatusNotFound)
438 type slowHandler struct{}
440 func (*slowHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
441 if req.URL.Path == "/_health/ping" {
442 time.Sleep(3 * time.Second)
443 resp.Write([]byte(`{"health":"OK"}`))
445 http.Error(resp, "not found", http.StatusNotFound)