1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: Apache-2.0
19 "git.arvados.org/arvados.git/lib/config"
20 "git.arvados.org/arvados.git/sdk/go/arvados"
21 "git.arvados.org/arvados.git/sdk/go/arvadostest"
22 "git.arvados.org/arvados.git/sdk/go/ctxlog"
23 "github.com/ghodss/yaml"
// AggregatorSuite is the gocheck suite holding the Aggregator tests.
// Its methods also use s.handler and s.req; those fields are declared on
// lines that are not visible in this excerpt.
27 type AggregatorSuite struct {
// resp records what the handler writes; SetUpTest assigns a fresh
// recorder, and some tests replace it before re-running the handler.
30 resp *httptest.ResponseRecorder
33 // Gocheck boilerplate
// Hand an AggregatorSuite value to check.Suite so that gocheck knows
// about the suite.
34 var _ = check.Suite(&AggregatorSuite{})
// TestInterface is a compile-time assertion that *Aggregator satisfies
// http.Handler. The visible body makes no runtime checks.
36 func (s *AggregatorSuite) TestInterface(c *check.C) {
37 var _ http.Handler = &Aggregator{}
// SetUpTest builds the fixture used by the tests in this suite: a cluster
// config loaded from an inline literal, an Aggregator for that cluster, a
// GET /_health/all request carrying the management token, and a fresh
// response recorder.
40 func (s *AggregatorSuite) SetUpTest(c *check.C) {
// The loader is given the config as an inline YAML literal wrapped in a
// buffer, plus a logger tied to this test.
41 ldr := config.NewLoader(bytes.NewBufferString(`Clusters: {zzzzz: {}}`), ctxlog.TestLogger(c))
43 cfg, err := ldr.Load()
44 c.Assert(err, check.IsNil)
// An empty cluster ID is passed here; presumably this selects the only
// configured cluster ("zzzzz") -- confirm against GetCluster.
45 cluster, err := cfg.GetCluster("")
46 c.Assert(err, check.IsNil)
// Fill in tokens and the blob signing key from the arvadostest constants.
47 cluster.ManagementToken = arvadostest.ManagementToken
48 cluster.SystemRootToken = arvadostest.SystemRootToken
49 cluster.Collections.BlobSigningKey = arvadostest.BlobSigningKey
// Add a volume "z" that carries the "default" storage class.
50 cluster.Volumes["z"] = arvados.Volume{StorageClasses: map[string]bool{"default": true}}
51 cluster.Containers.LocalKeepBlobBuffersPerVCPU = 0
52 s.handler = &Aggregator{Cluster: cluster}
// Default request: an authorized query of the aggregate health endpoint.
// Individual tests alter the Authorization header where needed.
53 s.req = httptest.NewRequest("GET", "/_health/all", nil)
54 s.req.Header.Set("Authorization", "Bearer "+arvadostest.ManagementToken)
55 s.resp = httptest.NewRecorder()
// TestNoAuth removes the Authorization header from the default request
// and expects the handler to respond with 401 Unauthorized.
58 func (s *AggregatorSuite) TestNoAuth(c *check.C) {
59 s.req.Header.Del("Authorization")
60 s.handler.ServeHTTP(s.resp, s.req)
62 c.Check(s.resp.Code, check.Equals, http.StatusUnauthorized)
// TestBadAuth replaces the Authorization header with a value that is not
// the management bearer token and expects 401 Unauthorized.
65 func (s *AggregatorSuite) TestBadAuth(c *check.C) {
66 s.req.Header.Set("Authorization", "xyzzy")
67 s.handler.ServeHTTP(s.resp, s.req)
69 c.Check(s.resp.Code, check.Equals, http.StatusUnauthorized)
// TestNoServicesConfigured serves the default request against the cluster
// as SetUpTest left it; no test in this function sets a service URL first.
// NOTE(review): the assertion on the response is not visible in this
// excerpt.
72 func (s *AggregatorSuite) TestNoServicesConfigured(c *check.C) {
73 s.handler.ServeHTTP(s.resp, s.req)
// stubServer starts an httptest server backed by handler. It returns the
// server and a listen string of the form ":<port>", where the port is the
// last colon-separated component of srv.URL.
77 func (s *AggregatorSuite) stubServer(handler http.Handler) (*httptest.Server, string) {
78 srv := httptest.NewServer(handler)
// srv.URL is expected to split into at least three parts on ":"
// (scheme, host, port). What happens in the short case, and where port
// is declared, is on lines not visible in this excerpt.
80 if parts := strings.Split(srv.URL, ":"); len(parts) < 3 {
83 port = parts[len(parts)-1]
85 return srv, ":" + port
// TestUnhealthy points the Keepstore service at a stub whose health ping
// reports ERROR and then runs the aggregate health request.
// NOTE(review): the assertion on the response and any use of srv are not
// visible in this excerpt.
88 func (s *AggregatorSuite) TestUnhealthy(c *check.C) {
89 srv, listen := s.stubServer(&unhealthyHandler{})
91 arvadostest.SetServiceURL(&s.handler.Cluster.Services.Keepstore, "http://localhost"+listen+"/")
92 s.handler.ServeHTTP(s.resp, s.req)
// TestHealthy points every service at one healthy stub, runs the
// aggregate health request, and checks the keepstore entry in the result:
// health "OK" with HTTP status 200.
96 func (s *AggregatorSuite) TestHealthy(c *check.C) {
97 srv, listen := s.stubServer(&healthyHandler{})
99 s.setAllServiceURLs(listen)
100 s.handler.ServeHTTP(s.resp, s.req)
// Checks are keyed by "<service>+<ping URL>".
102 svc := "keepstore+http://localhost" + listen + "/_health/ping"
// resp is assigned on a line not visible in this excerpt.
104 ep := resp.Checks[svc]
105 c.Check(ep.Health, check.Equals, "OK")
106 c.Check(ep.HTTPStatusCode, check.Equals, 200)
// TestHealthyAndUnhealthy gives Keepstore two internal URLs, one served
// by a healthy stub and one by an unhealthy stub, while all other
// services use the healthy stub. The overall result must be unhealthy,
// and each keepstore endpoint must be reported individually.
109 func (s *AggregatorSuite) TestHealthyAndUnhealthy(c *check.C) {
110 srvH, listenH := s.stubServer(&healthyHandler{})
112 srvU, listenU := s.stubServer(&unhealthyHandler{})
114 s.setAllServiceURLs(listenH)
// The two URLs use different host spellings (localhost vs 127.0.0.1),
// which also distinguishes them in the Checks keys below.
115 arvadostest.SetServiceURL(&s.handler.Cluster.Services.Keepstore, "http://localhost"+listenH+"/", "http://127.0.0.1"+listenU+"/")
116 s.handler.ServeHTTP(s.resp, s.req)
117 resp := s.checkUnhealthy(c)
118 ep := resp.Checks["keepstore+http://localhost"+listenH+"/_health/ping"]
119 c.Check(ep.Health, check.Equals, "OK")
120 c.Check(ep.HTTPStatusCode, check.Equals, 200)
121 ep = resp.Checks["keepstore+http://127.0.0.1"+listenU+"/_health/ping"]
122 c.Check(ep.Health, check.Equals, "ERROR")
// The unhealthy stub reports its error in the JSON body without setting
// a status code, so the recorded HTTP status is still 200.
123 c.Check(ep.HTTPStatusCode, check.Equals, 200)
127 // If an InternalURL host is 0.0.0.0, localhost, 127/8, or ::1 and
128 // nothing is listening there, don't fail the health check -- instead,
129 // assume the relevant component just isn't installed/enabled on this
130 // node, but does work when contacted through ExternalURL.
131 func (s *AggregatorSuite) TestUnreachableLoopbackPort(c *check.C) {
132 srvH, listenH := s.stubServer(&healthyHandler{})
134 s.setAllServiceURLs(listenH)
// Override four services with port 9 on the host forms listed above:
// localhost, 0.0.0.0, an address in 127/8, and ::1.
135 arvadostest.SetServiceURL(&s.handler.Cluster.Services.Keepproxy, "http://localhost:9/")
136 arvadostest.SetServiceURL(&s.handler.Cluster.Services.Workbench1, "http://0.0.0.0:9/")
137 arvadostest.SetServiceURL(&s.handler.Cluster.Services.Keepbalance, "http://127.0.0.127:9/")
138 arvadostest.SetServiceURL(&s.handler.Cluster.Services.WebDAV, "http://[::1]:9/")
139 s.handler.ServeHTTP(s.resp, s.req)
// NOTE(review): the assertions following each ServeHTTP call are not
// visible in this excerpt; the comment below continues on a hidden line.
142 // If a non-loopback address is unreachable, that's still a
// Second round: fresh recorder, WebDAV moved to a non-loopback address.
144 s.resp = httptest.NewRecorder()
145 arvadostest.SetServiceURL(&s.handler.Cluster.Services.WebDAV, "http://172.31.255.254:9/")
146 s.handler.ServeHTTP(s.resp, s.req)
// TestIsLocalHost pins the classification made by isLocalHost for a set
// of host strings.
150 func (s *AggregatorSuite) TestIsLocalHost(c *check.C) {
// Expected true: "localhost" in either capitalization, addresses in
// 127/8, the unspecified IPv4 address, and the IPv6 loopback.
151 c.Check(isLocalHost("Localhost"), check.Equals, true)
152 c.Check(isLocalHost("localhost"), check.Equals, true)
153 c.Check(isLocalHost("127.0.0.1"), check.Equals, true)
154 c.Check(isLocalHost("127.0.0.127"), check.Equals, true)
155 c.Check(isLocalHost("127.1.2.7"), check.Equals, true)
156 c.Check(isLocalHost("0.0.0.0"), check.Equals, true)
157 c.Check(isLocalHost("::1"), check.Equals, true)
// Expected false: other IPv4/IPv6 addresses, an ordinary hostname, an
// incomplete dotted address, and the empty string.
158 c.Check(isLocalHost("1.2.3.4"), check.Equals, false)
159 c.Check(isLocalHost("1::1"), check.Equals, false)
160 c.Check(isLocalHost("example.com"), check.Equals, false)
161 c.Check(isLocalHost("127.0.0"), check.Equals, false)
162 c.Check(isLocalHost(""), check.Equals, false)
// TestConfigMismatch runs the aggregate health check against stubs that
// report different config hashes and config timestamps through their
// /metrics output. Only a service reporting both an older timestamp and a
// different hash is expected to produce an "outdated config" error.
// NOTE(review): resp is declared, and several intermediate assertions are
// made, on lines not visible in this excerpt.
165 func (s *AggregatorSuite) TestConfigMismatch(c *check.C) {
166 // time1/hash1: current config
// time1 is 1h0m59s before now (+1s -1m -1h).
167 time1 := time.Now().Add(time.Second - time.Minute - time.Hour)
168 hash1 := fmt.Sprintf("%x", sha256.Sum256([]byte(`Clusters: {zzzzz: {SystemRootToken: xyzzy}}`)))
169 // time2/hash2: old config
// time2 is one hour earlier than time1.
170 time2 := time1.Add(-time.Hour)
171 hash2 := fmt.Sprintf("%x", sha256.Sum256([]byte(`Clusters: {zzzzz: {SystemRootToken: old-token}}`)))
173 // srv1: current file
174 handler1 := healthyHandler{configHash: hash1, configTime: time1}
175 srv1, listen1 := s.stubServer(&handler1)
177 // srv2: old file, current content
178 handler2 := healthyHandler{configHash: hash1, configTime: time2}
179 srv2, listen2 := s.stubServer(&handler2)
181 // srv3: old file, old content
182 handler3 := healthyHandler{configHash: hash2, configTime: time2}
183 srv3, listen3 := s.stubServer(&handler3)
185 // srv4: no metrics handler
186 handler4 := healthyHandler{}
187 srv4, listen4 := s.stubServer(&handler4)
// Baseline: every service reports the current config (srv1).
190 s.setAllServiceURLs(listen1)
192 // listen2 => old timestamp, same content => no problem
193 s.resp = httptest.NewRecorder()
194 arvadostest.SetServiceURL(&s.handler.Cluster.Services.DispatchCloud,
195 "http://localhost"+listen2+"/")
196 s.handler.ServeHTTP(s.resp, s.req)
199 // listen4 => no metrics on some services => no problem
200 s.resp = httptest.NewRecorder()
201 arvadostest.SetServiceURL(&s.handler.Cluster.Services.WebDAV,
202 "http://localhost"+listen4+"/")
203 s.handler.ServeHTTP(s.resp, s.req)
206 // listen3 => old timestamp, old content => report discrepancy
207 s.resp = httptest.NewRecorder()
208 arvadostest.SetServiceURL(&s.handler.Cluster.Services.Keepstore,
209 "http://localhost"+listen1+"/",
210 "http://localhost"+listen3+"/")
211 s.handler.ServeHTTP(s.resp, s.req)
212 resp = s.checkUnhealthy(c)
// Guard the index below: only inspect Errors[0] when at least one error
// was reported. The message must name the srv3 keepstore endpoint.
213 if c.Check(len(resp.Errors) > 0, check.Equals, true) {
214 c.Check(resp.Errors[0], check.Matches, `outdated config: \Qkeepstore+http://localhost`+listen3+`\E: config file \(sha256 .*\) does not match latest version with timestamp .*`)
217 // no services report config time (migrating to current version) => no problem
218 s.resp = httptest.NewRecorder()
219 s.setAllServiceURLs(listen4)
220 s.handler.ServeHTTP(s.resp, s.req)
// TestClockSkew runs the aggregate health check against stubs whose Date
// response header is shifted into the future. A 3-second shift is treated
// as acceptable; a shift of a little over 3 minutes must yield a
// "clock skew detected" error that quotes a 1m warning threshold.
// NOTE(review): the assertions after the first two ServeHTTP calls are
// not visible in this excerpt.
224 func (s *AggregatorSuite) TestClockSkew(c *check.C) {
225 // srv1: report real wall clock time
// A zero headerDate leaves the Date header to the HTTP server.
226 handler1 := healthyHandler{}
227 srv1, listen1 := s.stubServer(&handler1)
229 // srv2: report near-future time
230 handler2 := healthyHandler{headerDate: time.Now().Add(3 * time.Second)}
231 srv2, listen2 := s.stubServer(&handler2)
233 // srv3: report far-future time
234 handler3 := healthyHandler{headerDate: time.Now().Add(3*time.Minute + 3*time.Second)}
235 srv3, listen3 := s.stubServer(&handler3)
// Baseline: every service uses the stub with an unmodified clock.
238 s.setAllServiceURLs(listen1)
240 // near-future time => OK
241 s.resp = httptest.NewRecorder()
242 arvadostest.SetServiceURL(&s.handler.Cluster.Services.DispatchCloud,
243 "http://localhost"+listen2+"/")
244 s.handler.ServeHTTP(s.resp, s.req)
247 // far-future time => error
248 s.resp = httptest.NewRecorder()
249 arvadostest.SetServiceURL(&s.handler.Cluster.Services.WebDAV,
250 "http://localhost"+listen3+"/")
251 s.handler.ServeHTTP(s.resp, s.req)
252 resp := s.checkUnhealthy(c)
// Only inspect Errors[0] when at least one error was reported.
253 if c.Check(len(resp.Errors) > 0, check.Equals, true) {
254 c.Check(resp.Errors[0], check.Matches, `clock skew detected: maximum timestamp spread is 3m.* \(exceeds warning threshold of 1m\)`)
// TestPingTimeout sets the handler timeout to 100ms and points Keepstore
// at a stub that sleeps 3 seconds before answering its health ping. The
// endpoint must be reported as ERROR with HTTP status code 0 (no response
// received) and a non-trivial response time.
258 func (s *AggregatorSuite) TestPingTimeout(c *check.C) {
259 s.handler.timeout = arvados.Duration(100 * time.Millisecond)
260 srv, listen := s.stubServer(&slowHandler{})
262 arvadostest.SetServiceURL(&s.handler.Cluster.Services.Keepstore, "http://localhost"+listen+"/")
263 s.handler.ServeHTTP(s.resp, s.req)
264 resp := s.checkUnhealthy(c)
265 ep := resp.Checks["keepstore+http://localhost"+listen+"/_health/ping"]
266 c.Check(ep.Health, check.Equals, "ERROR")
267 c.Check(ep.HTTPStatusCode, check.Equals, 0)
// ResponseTime must convert to a float; presumably it is in seconds, so
// the bound below would be 5ms -- TODO confirm the unit.
268 rt, err := ep.ResponseTime.Float64()
269 c.Check(err, check.IsNil)
270 c.Check(rt > 0.005, check.Equals, true)
// TestCheckCommand writes the current cluster config to a file and runs
// CheckCommand against it twice: once with only -config, expecting exit
// code 0 and no output, and once with -yaml added, expecting exit code 0
// and a "health: OK" line on stdout.
// NOTE(review): tmpdir is defined on a line not visible in this excerpt,
// as is anything done to stdout/stderr between the two runs.
273 func (s *AggregatorSuite) TestCheckCommand(c *check.C) {
274 srv, listen := s.stubServer(&healthyHandler{})
276 s.setAllServiceURLs(listen)
// Serialize the in-memory cluster as a one-cluster config keyed by its
// ClusterID.
278 confdata, err := yaml.Marshal(arvados.Config{Clusters: map[string]arvados.Cluster{s.handler.Cluster.ClusterID: *s.handler.Cluster}})
279 c.Assert(err, check.IsNil)
// Drop the SourceTimestamp and SourceSHA256 lines from the YAML before
// writing it out.
280 confdata = regexp.MustCompile(`Source(Timestamp|SHA256): [^\n]+\n`).ReplaceAll(confdata, []byte{})
// NOTE(review): mode 0777 is broader than a config file holding tokens
// needs; consider 0600.
281 err = ioutil.WriteFile(tmpdir+"/config.yml", confdata, 0777)
282 c.Assert(err, check.IsNil)
284 var stdout, stderr bytes.Buffer
// First run: default output mode, which should be silent on success.
286 exitcode := CheckCommand.RunCommand("check", []string{"-config=" + tmpdir + "/config.yml"}, &bytes.Buffer{}, &stdout, &stderr)
287 c.Check(exitcode, check.Equals, 0)
288 c.Check(stderr.String(), check.Equals, "")
289 c.Check(stdout.String(), check.Equals, "")
// Second run: -yaml output, which should include the overall health.
293 exitcode = CheckCommand.RunCommand("check", []string{"-config=" + tmpdir + "/config.yml", "-yaml"}, &bytes.Buffer{}, &stdout, &stderr)
294 c.Check(exitcode, check.Equals, 0)
295 c.Check(stderr.String(), check.Equals, "")
296 c.Check(stdout.String(), check.Matches, `(?ms).*(\n|^)health: OK\n.*`)
// checkError asserts that the recorded response is a failure: the status
// code is not 200, the body decodes as a ClusterHealthResponse, and its
// Health field is not "OK".
299 func (s *AggregatorSuite) checkError(c *check.C) {
300 c.Check(s.resp.Code, check.Not(check.Equals), http.StatusOK)
301 var resp ClusterHealthResponse
302 err := json.Unmarshal(s.resp.Body.Bytes(), &resp)
303 c.Check(err, check.IsNil)
304 c.Check(resp.Health, check.Not(check.Equals), "OK")
// checkUnhealthy checks the recorded response via checkResult, expecting
// Health "ERROR", and returns the decoded response.
307 func (s *AggregatorSuite) checkUnhealthy(c *check.C) ClusterHealthResponse {
308 return s.checkResult(c, "ERROR")
// checkOK checks the recorded response via checkResult, expecting Health
// "OK", and returns the decoded response.
311 func (s *AggregatorSuite) checkOK(c *check.C) ClusterHealthResponse {
312 return s.checkResult(c, "OK")
// checkResult asserts that the recorded response has status 200, that its
// body decodes as a ClusterHealthResponse, and that the Health field
// equals health. The raw body is logged to help diagnose failures.
// NOTE(review): the return statement is not visible in this excerpt;
// presumably the decoded response is returned.
315 func (s *AggregatorSuite) checkResult(c *check.C, health string) ClusterHealthResponse {
316 c.Check(s.resp.Code, check.Equals, http.StatusOK)
317 var resp ClusterHealthResponse
318 c.Log(s.resp.Body.String())
319 err := json.Unmarshal(s.resp.Body.Bytes(), &resp)
320 c.Check(err, check.IsNil)
321 c.Check(resp.Health, check.Equals, health)
// setAllServiceURLs points each service in a list at
// "http://localhost<listen>/", where listen has the ":<port>" form
// returned by stubServer.
// NOTE(review): the list of services is on lines not visible in this
// excerpt.
325 func (s *AggregatorSuite) setAllServiceURLs(listen string) {
326 svcs := &s.handler.Cluster.Services
327 for _, svc := range []*arvados.Service{
343 arvadostest.SetServiceURL(svc, "http://localhost"+listen+"/")
// unhealthyHandler is a stateless stub HTTP handler whose health ping
// reports an error.
347 type unhealthyHandler struct{}
// ServeHTTP answers /_health/ping with a JSON body reporting health
// "ERROR" and error "the bends"; no status code is set explicitly, so
// that reply goes out as 200. The 404 reply is presumably for every other
// path (the branch structure is not fully visible in this excerpt).
349 func (*unhealthyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
350 if req.URL.Path == "/_health/ping" {
351 resp.Write([]byte(`{"health":"ERROR","error":"the bends"}`))
353 http.Error(resp, "not found", http.StatusNotFound)
// healthyHandler is a stub HTTP handler that reports healthy status.
// Code in this file sets and reads its headerDate, configHash and
// configTime fields; their declarations are on lines not visible in this
// excerpt.
357 type healthyHandler struct {
// ServeHTTP serves two paths. /_health/ping gets a JSON body reporting
// health "OK". /metrics gets a text exposition of two gauges,
// arvados_config_load_timestamp_seconds (current time) and
// arvados_config_source_timestamp_seconds, both labelled with
// h.configHash. Both paths can also answer 401 "unauthorized", and there
// is a 404 "not found" reply, presumably for any other path.
// NOTE(review): the conditions guarding the 401 replies, and the
// definition of the timestamp t used for the second gauge, are on lines
// not visible in this excerpt.
363 func (h *healthyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
// Only override the Date response header when a headerDate was supplied.
// NOTE(review): HTTP Date headers are defined in GMT (http.TimeFormat).
// time.RFC1123 emits the zone abbreviation of headerDate's location, so a
// local-time value may not round-trip reliably -- consider
// h.headerDate.UTC().Format(http.TimeFormat).
364 if !h.headerDate.IsZero() {
365 resp.Header().Set("Date", h.headerDate.Format(time.RFC1123))
// authOK records whether the request carries the management token as a
// bearer token; presumably the 401 replies below depend on it.
367 authOK := req.Header.Get("Authorization") == "Bearer "+arvadostest.ManagementToken
368 if req.URL.Path == "/_health/ping" {
370 http.Error(resp, "unauthorized", http.StatusUnauthorized)
373 resp.Write([]byte(`{"health":"OK"}`))
374 } else if req.URL.Path == "/metrics" {
376 http.Error(resp, "unauthorized", http.StatusUnauthorized)
383 fmt.Fprintf(resp, `# HELP arvados_config_load_timestamp_seconds Time when config file was loaded.
384 # TYPE arvados_config_load_timestamp_seconds gauge
385 arvados_config_load_timestamp_seconds{sha256="%s"} %g
386 # HELP arvados_config_source_timestamp_seconds Timestamp of config file when it was loaded.
387 # TYPE arvados_config_source_timestamp_seconds gauge
388 arvados_config_source_timestamp_seconds{sha256="%s"} %g
390 h.configHash, float64(time.Now().UnixNano())/1e9,
391 h.configHash, float64(t.UnixNano())/1e9)
393 http.Error(resp, "not found", http.StatusNotFound)
// slowHandler is a stateless stub HTTP handler whose health ping is
// delayed by a fixed sleep.
397 type slowHandler struct{}
// ServeHTTP answers /_health/ping with a JSON body reporting health "OK",
// but only after sleeping for 3 seconds. The 404 reply is presumably for
// every other path (the branch structure is not fully visible in this
// excerpt).
399 func (*slowHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
400 if req.URL.Path == "/_health/ping" {
// The delay is far longer than the 100ms timeout used by TestPingTimeout.
401 time.Sleep(3 * time.Second)
402 resp.Write([]byte(`{"health":"OK"}`))
404 http.Error(resp, "not found", http.StatusNotFound)