1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: Apache-2.0
19 "git.arvados.org/arvados.git/lib/config"
20 "git.arvados.org/arvados.git/sdk/go/arvados"
21 "git.arvados.org/arvados.git/sdk/go/arvadostest"
22 "git.arvados.org/arvados.git/sdk/go/ctxlog"
23 "github.com/ghodss/yaml"
// AggregatorSuite is the gocheck suite that exercises Aggregator.
27 type AggregatorSuite struct {
// resp records whatever the handler under test writes for the
// current request.
30 resp *httptest.ResponseRecorder
33 // Gocheck boilerplate: register AggregatorSuite with the gocheck runner.
34 var _ = check.Suite(&AggregatorSuite{})
36 func (s *AggregatorSuite) TestInterface(c *check.C) {
37 var _ http.Handler = &Aggregator{}
// SetUpTest is the gocheck per-test fixture. It loads a minimal
// one-cluster config, fills in the test tokens, the blob signing key
// and one default-class volume, and then prepares the Aggregator, an
// authorized GET /_health/all request, and a fresh response recorder.
40 func (s *AggregatorSuite) SetUpTest(c *check.C) {
// The config is read from an in-memory YAML document.
41 ldr := config.NewLoader(bytes.NewBufferString(`Clusters: {zzzzz: {}}`), ctxlog.TestLogger(c))
43 cfg, err := ldr.Load()
44 c.Assert(err, check.IsNil)
// NOTE(review): an empty cluster ID presumably selects the only
// configured cluster (zzzzz) -- confirm against GetCluster.
45 cluster, err := cfg.GetCluster("")
46 c.Assert(err, check.IsNil)
47 cluster.ManagementToken = arvadostest.ManagementToken
48 cluster.SystemRootToken = arvadostest.SystemRootToken
49 cluster.Collections.BlobSigningKey = arvadostest.BlobSigningKey
50 cluster.Volumes["z"] = arvados.Volume{StorageClasses: map[string]bool{"default": true}}
51 s.handler = &Aggregator{Cluster: cluster}
// Default request: authorized with the management token. Individual
// tests remove or replace the header as needed.
52 s.req = httptest.NewRequest("GET", "/_health/all", nil)
53 s.req.Header.Set("Authorization", "Bearer "+arvadostest.ManagementToken)
54 s.resp = httptest.NewRecorder()
// TestNoAuth checks that a request without an Authorization header
// is answered with 401 Unauthorized.
57 func (s *AggregatorSuite) TestNoAuth(c *check.C) {
58 s.req.Header.Del("Authorization")
59 s.handler.ServeHTTP(s.resp, s.req)
61 c.Check(s.resp.Code, check.Equals, http.StatusUnauthorized)
// TestBadAuth checks that a request whose Authorization header is
// just "xyzzy" (not the management bearer token) is answered with
// 401 Unauthorized.
64 func (s *AggregatorSuite) TestBadAuth(c *check.C) {
65 s.req.Header.Set("Authorization", "xyzzy")
66 s.handler.ServeHTTP(s.resp, s.req)
68 c.Check(s.resp.Code, check.Equals, http.StatusUnauthorized)
// TestNoServicesConfigured sends the default authorized request to a
// handler for which the test itself has configured no service URLs.
71 func (s *AggregatorSuite) TestNoServicesConfigured(c *check.C) {
72 s.handler.ServeHTTP(s.resp, s.req)
// stubServer starts an httptest server running handler and returns
// it together with a ":port" string built from the last
// colon-separated component of the server URL.
76 func (s *AggregatorSuite) stubServer(handler http.Handler) (*httptest.Server, string) {
77 srv := httptest.NewServer(handler)
// A URL of the form scheme://host:port splits into at least three
// colon-separated parts; fewer means there is no port to extract.
79 if parts := strings.Split(srv.URL, ":"); len(parts) < 3 {
82 port = parts[len(parts)-1]
84 return srv, ":" + port
// TestUnhealthy points the Keepstore service at a stub whose health
// check reports ERROR, then serves the default request.
87 func (s *AggregatorSuite) TestUnhealthy(c *check.C) {
88 srv, listen := s.stubServer(&unhealthyHandler{})
90 arvadostest.SetServiceURL(&s.handler.Cluster.Services.Keepstore, "http://localhost"+listen+"/")
91 s.handler.ServeHTTP(s.resp, s.req)
// TestHealthy points every service at a single healthy stub and
// checks that the keepstore ping entry reports "OK" with HTTP
// status 200.
95 func (s *AggregatorSuite) TestHealthy(c *check.C) {
96 srv, listen := s.stubServer(&healthyHandler{})
98 s.setAllServiceURLs(listen)
99 s.handler.ServeHTTP(s.resp, s.req)
// svc is the key under which the keepstore ping result is looked up
// in resp.Checks.
101 svc := "keepstore+http://localhost" + listen + "/_health/ping"
103 ep := resp.Checks[svc]
104 c.Check(ep.Health, check.Equals, "OK")
105 c.Check(ep.HTTPStatusCode, check.Equals, 200)
// TestHealthyAndUnhealthy configures Keepstore with two URLs, one
// served by a healthy stub and one by an unhealthy stub, and checks
// that the aggregate result is unhealthy while each endpoint's own
// entry reflects what its stub reported.
108 func (s *AggregatorSuite) TestHealthyAndUnhealthy(c *check.C) {
109 srvH, listenH := s.stubServer(&healthyHandler{})
111 srvU, listenU := s.stubServer(&unhealthyHandler{})
113 s.setAllServiceURLs(listenH)
// The two URLs use different host spellings (localhost vs 127.0.0.1),
// which keeps their keys in resp.Checks distinct.
114 arvadostest.SetServiceURL(&s.handler.Cluster.Services.Keepstore, "http://localhost"+listenH+"/", "http://127.0.0.1"+listenU+"/")
115 s.handler.ServeHTTP(s.resp, s.req)
116 resp := s.checkUnhealthy(c)
117 ep := resp.Checks["keepstore+http://localhost"+listenH+"/_health/ping"]
118 c.Check(ep.Health, check.Equals, "OK")
119 c.Check(ep.HTTPStatusCode, check.Equals, 200)
120 ep = resp.Checks["keepstore+http://127.0.0.1"+listenU+"/_health/ping"]
121 c.Check(ep.Health, check.Equals, "ERROR")
// unhealthyHandler writes its ERROR document without setting a
// status, so the HTTP status is still 200.
122 c.Check(ep.HTTPStatusCode, check.Equals, 200)
// TestConfigMismatch checks how the config hash and config timestamp
// reported by individual services affect the aggregate result. It
// uses stub servers that report different combinations of hash and
// timestamp, or none at all, and expects only a service with both an
// older timestamp and different content to be reported as outdated.
126 func (s *AggregatorSuite) TestConfigMismatch(c *check.C) {
127 // time1/hash1: current config
// time1 is a little under an hour and a minute in the past
// (now + 1s - 1m - 1h).
128 time1 := time.Now().Add(time.Second - time.Minute - time.Hour)
129 hash1 := fmt.Sprintf("%x", sha256.Sum256([]byte(`Clusters: {zzzzz: {SystemRootToken: xyzzy}}`)))
130 // time2/hash2: old config
131 time2 := time1.Add(-time.Hour)
132 hash2 := fmt.Sprintf("%x", sha256.Sum256([]byte(`Clusters: {zzzzz: {SystemRootToken: old-token}}`)))
134 // srv1: current file
135 handler1 := healthyHandler{configHash: hash1, configTime: time1}
136 srv1, listen1 := s.stubServer(&handler1)
138 // srv2: old file, current content
139 handler2 := healthyHandler{configHash: hash1, configTime: time2}
140 srv2, listen2 := s.stubServer(&handler2)
142 // srv3: old file, old content
143 handler3 := healthyHandler{configHash: hash2, configTime: time2}
144 srv3, listen3 := s.stubServer(&handler3)
146 // srv4: no metrics handler
147 handler4 := healthyHandler{}
148 srv4, listen4 := s.stubServer(&handler4)
// Start with every service pointing at srv1 (current config).
151 s.setAllServiceURLs(listen1)
153 // listen2 => old timestamp, same content => no problem
// A fresh recorder is used for every request so that earlier
// responses do not accumulate in the body.
154 s.resp = httptest.NewRecorder()
155 arvadostest.SetServiceURL(&s.handler.Cluster.Services.DispatchCloud,
156 "http://localhost"+listen2+"/")
157 s.handler.ServeHTTP(s.resp, s.req)
160 // listen4 => no metrics on some services => no problem
161 s.resp = httptest.NewRecorder()
162 arvadostest.SetServiceURL(&s.handler.Cluster.Services.WebDAV,
163 "http://localhost"+listen4+"/")
164 s.handler.ServeHTTP(s.resp, s.req)
167 // listen3 => old timestamp, old content => report discrepancy
168 s.resp = httptest.NewRecorder()
169 arvadostest.SetServiceURL(&s.handler.Cluster.Services.Keepstore,
170 "http://localhost"+listen1+"/",
171 "http://localhost"+listen3+"/")
172 s.handler.ServeHTTP(s.resp, s.req)
173 resp = s.checkUnhealthy(c)
// Only inspect Errors[0] when the list is non-empty, to avoid an
// index panic if the length check has already failed.
174 if c.Check(len(resp.Errors) > 0, check.Equals, true) {
175 c.Check(resp.Errors[0], check.Matches, `outdated config: \Qkeepstore+http://localhost`+listen3+`\E: config file \(sha256 .*\) does not match latest version with timestamp .*`)
178 // no services report config time (migrating to current version) => no problem
179 s.resp = httptest.NewRecorder()
180 s.setAllServiceURLs(listen4)
181 s.handler.ServeHTTP(s.resp, s.req)
// TestPingTimeout sets the aggregator timeout to 100ms and points
// Keepstore at a stub that delays its ping response by 3 seconds. It
// expects an unhealthy result whose keepstore entry has health
// "ERROR", HTTP status code 0, and a response time above 5ms.
185 func (s *AggregatorSuite) TestPingTimeout(c *check.C) {
186 s.handler.timeout = arvados.Duration(100 * time.Millisecond)
187 srv, listen := s.stubServer(&slowHandler{})
189 arvadostest.SetServiceURL(&s.handler.Cluster.Services.Keepstore, "http://localhost"+listen+"/")
190 s.handler.ServeHTTP(s.resp, s.req)
191 resp := s.checkUnhealthy(c)
192 ep := resp.Checks["keepstore+http://localhost"+listen+"/_health/ping"]
193 c.Check(ep.Health, check.Equals, "ERROR")
// No HTTP response was received, so no status code is recorded.
194 c.Check(ep.HTTPStatusCode, check.Equals, 0)
195 rt, err := ep.ResponseTime.Float64()
196 c.Check(err, check.IsNil)
// NOTE(review): the 0.005 threshold presumably means seconds --
// confirm the unit of ResponseTime.
197 c.Check(rt > 0.005, check.Equals, true)
// TestCheckCommand points every service at a healthy stub, writes
// the suite's cluster config to a YAML file, and runs CheckCommand
// against that file. It expects exit code 0, empty stderr, and a
// "health: OK" line on stdout.
200 func (s *AggregatorSuite) TestCheckCommand(c *check.C) {
201 srv, listen := s.stubServer(&healthyHandler{})
203 s.setAllServiceURLs(listen)
205 confdata, err := yaml.Marshal(arvados.Config{Clusters: map[string]arvados.Cluster{s.handler.Cluster.ClusterID: *s.handler.Cluster}})
206 c.Assert(err, check.IsNil)
// Remove any SourceTimestamp / SourceSHA256 lines from the marshaled
// config before writing it out.
207 confdata = regexp.MustCompile(`Source(Timestamp|SHA256): [^\n]+\n`).ReplaceAll(confdata, []byte{})
208 err = ioutil.WriteFile(tmpdir+"/config.yml", confdata, 0777)
209 c.Assert(err, check.IsNil)
210 var stdout, stderr bytes.Buffer
// The command gets an empty stdin and in-memory stdout/stderr.
211 exitcode := CheckCommand.RunCommand("check", []string{"-config=" + tmpdir + "/config.yml"}, &bytes.Buffer{}, &stdout, &stderr)
212 c.Check(exitcode, check.Equals, 0)
213 c.Check(stderr.String(), check.Equals, "")
214 c.Check(stdout.String(), check.Matches, `(?ms).*(\n|^)health: OK\n.*`)
217 func (s *AggregatorSuite) checkError(c *check.C) {
218 c.Check(s.resp.Code, check.Not(check.Equals), http.StatusOK)
219 var resp ClusterHealthResponse
220 err := json.Unmarshal(s.resp.Body.Bytes(), &resp)
221 c.Check(err, check.IsNil)
222 c.Check(resp.Health, check.Not(check.Equals), "OK")
225 func (s *AggregatorSuite) checkUnhealthy(c *check.C) ClusterHealthResponse {
226 return s.checkResult(c, "ERROR")
229 func (s *AggregatorSuite) checkOK(c *check.C) ClusterHealthResponse {
230 return s.checkResult(c, "OK")
// checkResult asserts that the recorded response has status 200 and
// a JSON body that decodes into a ClusterHealthResponse whose Health
// equals health. The raw body is also written to the test log.
233 func (s *AggregatorSuite) checkResult(c *check.C, health string) ClusterHealthResponse {
234 c.Check(s.resp.Code, check.Equals, http.StatusOK)
235 var resp ClusterHealthResponse
236 c.Log(s.resp.Body.String())
237 err := json.Unmarshal(s.resp.Body.Bytes(), &resp)
238 c.Check(err, check.IsNil)
239 c.Check(resp.Health, check.Equals, health)
// setAllServiceURLs calls arvadostest.SetServiceURL with
// http://localhost<listen>/ for each service in its list. listen is
// a ":port" suffix such as the one stubServer returns.
243 func (s *AggregatorSuite) setAllServiceURLs(listen string) {
244 svcs := &s.handler.Cluster.Services
245 for _, svc := range []*arvados.Service{
259 arvadostest.SetServiceURL(svc, "http://localhost"+listen+"/")
// unhealthyHandler is a stub service whose health check always
// reports a failure.
type unhealthyHandler struct{}

// ServeHTTP answers /_health/ping with an "ERROR" health document
// (with the implicit 200 status) and every other path with 404.
func (*unhealthyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	switch req.URL.Path {
	case "/_health/ping":
		resp.Write([]byte(`{"health":"ERROR","error":"the bends"}`))
	default:
		http.Error(resp, "not found", http.StatusNotFound)
	}
}
// healthyHandler is a stub service that reports "OK" on its health
// check and serves config metrics built from its configHash field
// and a timestamp.
273 type healthyHandler struct {
// ServeHTTP serves /_health/ping and /metrics, and answers every
// other path with 404.
278 func (h *healthyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
// authOK is true only when the request carries the management token
// as a bearer token.
279 authOK := req.Header.Get("Authorization") == "Bearer "+arvadostest.ManagementToken
280 if req.URL.Path == "/_health/ping" {
// NOTE(review): presumably sent only when authOK is false -- confirm.
282 http.Error(resp, "unauthorized", http.StatusUnauthorized)
285 resp.Write([]byte(`{"health":"OK"}`))
286 } else if req.URL.Path == "/metrics" {
// NOTE(review): presumably sent only when authOK is false -- confirm.
288 http.Error(resp, "unauthorized", http.StatusUnauthorized)
// Emit two gauges labelled with the config hash: the load timestamp
// is always the current time, and the source timestamp comes from t.
295 fmt.Fprintf(resp, `# HELP arvados_config_load_timestamp_seconds Time when config file was loaded.
296 # TYPE arvados_config_load_timestamp_seconds gauge
297 arvados_config_load_timestamp_seconds{sha256="%s"} %g
298 # HELP arvados_config_source_timestamp_seconds Timestamp of config file when it was loaded.
299 # TYPE arvados_config_source_timestamp_seconds gauge
300 arvados_config_source_timestamp_seconds{sha256="%s"} %g
302 h.configHash, float64(time.Now().UnixNano())/1e9,
303 h.configHash, float64(t.UnixNano())/1e9)
305 http.Error(resp, "not found", http.StatusNotFound)
// slowHandler is a stub service whose health check responds too
// slowly to be useful, for exercising client-side timeouts.
type slowHandler struct{}

// ServeHTTP answers /_health/ping with an "OK" health document after
// a 3 second delay, and every other path with 404.
//
// The delay is abandoned as soon as the request context is done (for
// example, when the client gives up and closes the connection). This
// lets an httptest.Server running this handler be closed promptly
// instead of waiting out the full delay.
func (*slowHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
	if req.URL.Path != "/_health/ping" {
		http.Error(resp, "not found", http.StatusNotFound)
		return
	}
	delay := time.NewTimer(3 * time.Second)
	defer delay.Stop()
	select {
	case <-delay.C:
		resp.Write([]byte(`{"health":"OK"}`))
	case <-req.Context().Done():
		// Nobody is waiting for this response any more.
	}
}