1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: Apache-2.0
5 // Package service provides a cmd.Handler that brings up a system service.
23 "git.arvados.org/arvados.git/sdk/go/arvados"
24 "git.arvados.org/arvados.git/sdk/go/ctxlog"
25 "github.com/prometheus/client_golang/prometheus"
26 check "gopkg.in/check.v1"
// Test is the "go test" entry point for this package's test suites.
// NOTE(review): the body is not visible in this excerpt; presumably it
// hands control to gocheck (e.g. check.TestingT) -- confirm.
29 func Test(t *testing.T) {
// Register Suite with gocheck so that its Test* methods are run.
33 var _ = check.Suite(&Suite{})
// unusedPort probes the test host for a TCP port that is currently
// free. It aborts the calling test (via c.Assert) if listening or
// parsing the listener's address fails.
//
// NOTE(review): the lines that release the listener and return the
// port are not visible in this excerpt; presumably the listener is
// closed and the port string from SplitHostPort is returned -- confirm.
42 func unusedPort(c *check.C) string {
43 // Find an available port on the testing host, so the test
44 // cases don't get confused by "already in use" errors.
// An address with an empty port (":") asks the OS to choose a free
// port automatically.
45 listener, err := net.Listen("tcp", ":")
46 c.Assert(err, check.IsNil)
// Only the port component of the listener's address is of interest.
48 _, port, err := net.SplitHostPort(listener.Addr().String())
49 c.Assert(err, check.IsNil)
// TestGetListenAddress is a table-driven test of getListenAddr. Each
// trial builds a Controller service config from a map of
// InternalURL => ListenURL strings, optionally sets
// $ARVADOS_SERVICE_INTERNAL_URL, and then checks the listen URL,
// internal URL, error, and log output returned/produced by
// getListenAddr for the "arvados-controller" service.
//
// NOTE(review): the definition of `port` and some struct fields
// (envVar, expectListen, expectInternal) are on lines not visible in
// this excerpt; port presumably comes from unusedPort(c) -- confirm.
53 func (*Suite) TestGetListenAddress(c *check.C) {
// Make sure the env var set by the trials below is cleared when the
// test returns.
55 defer os.Unsetenv("ARVADOS_SERVICE_INTERNAL_URL")
56 for idx, trial := range []struct {
57 // internalURL => listenURL, both with trailing "/"
58 // because config loader always adds it
59 internalURLs map[string]string
// expectErrorMatch, when non-empty, is a pattern the returned error
// must match.
61 expectErrorMatch string
// expectLogsMatch, when non-empty, is a pattern the captured log
// output must match.
62 expectLogsMatch string
67 internalURLs: map[string]string{"http://localhost:" + port + "/": ""},
68 expectListen: "http://localhost:" + port + "/",
69 expectInternal: "http://localhost:" + port + "/",
71 { // implicit port 80 in InternalURLs
72 internalURLs: map[string]string{"http://localhost/": ""},
73 expectErrorMatch: `.*:80: bind: permission denied`,
75 { // implicit port 443 in InternalURLs
76 internalURLs: map[string]string{"https://host.example/": "http://localhost:" + port + "/"},
77 expectListen: "http://localhost:" + port + "/",
78 expectInternal: "https://host.example/",
80 { // implicit port 443 in ListenURL
81 internalURLs: map[string]string{"wss://host.example/": "wss://localhost/"},
82 expectErrorMatch: `.*:443: bind: permission denied`,
85 internalURLs: map[string]string{"https://hostname.example/": "http://localhost:8000/"},
86 expectListen: "http://localhost:8000/",
87 expectInternal: "https://hostname.example/",
90 internalURLs: map[string]string{
91 "https://hostname1.example/": "http://localhost:12435/",
92 "https://hostname2.example/": "http://localhost:" + port + "/",
94 envVar: "https://hostname2.example", // note this works despite missing trailing "/"
95 expectListen: "http://localhost:" + port + "/",
96 expectInternal: "https://hostname2.example/",
98 { // cannot listen on any of the ListenURLs
99 internalURLs: map[string]string{
100 "https://hostname1.example/": "http://1.2.3.4:" + port + "/",
101 "https://hostname2.example/": "http://1.2.3.4:" + port + "/",
103 expectErrorMatch: "configuration does not enable the \"arvados-controller\" service on this host",
105 { // cannot listen on any of the (implied) ListenURLs
106 internalURLs: map[string]string{
107 "https://1.2.3.4/": "",
108 "https://1.2.3.5/": "",
110 expectErrorMatch: "configuration does not enable the \"arvados-controller\" service on this host",
112 { // impossible port number
113 internalURLs: map[string]string{
114 "https://host.example/": "http://0.0.0.0:1234567",
116 expectErrorMatch: `.*:1234567: listen tcp: address 1234567: invalid port`,
119 // env var URL not mentioned in config = obey env var, with warning
120 internalURLs: map[string]string{"https://hostname1.example/": "http://localhost:8000/"},
121 envVar: "https://hostname2.example",
122 expectListen: "https://hostname2.example/",
123 expectInternal: "https://hostname2.example/",
124 expectLogsMatch: `.*\Qpossible configuration error: listening on https://hostname2.example/ (from $ARVADOS_SERVICE_INTERNAL_URL) even though configuration does not have a matching InternalURLs entry\E.*\n`,
127 // env var + empty config = obey env var, with warning
128 envVar: "https://hostname.example",
129 expectListen: "https://hostname.example/",
130 expectInternal: "https://hostname.example/",
131 expectLogsMatch: `.*\Qpossible configuration error: listening on https://hostname.example/ (from $ARVADOS_SERVICE_INTERNAL_URL) even though configuration does not have a matching InternalURLs entry\E.*\n`,
134 c.Logf("trial %d %+v", idx, trial)
// The env var is set on every trial; trials that leave envVar unset
// set it to the empty string.
135 os.Setenv("ARVADOS_SERVICE_INTERNAL_URL", trial.envVar)
// Capture log output in a buffer so it can be matched against
// expectLogsMatch below.
136 var logbuf bytes.Buffer
137 log := ctxlog.New(&logbuf, "text", "info")
// Translate the trial's string map into the typed config structure
// passed to getListenAddr.
138 services := arvados.Services{Controller: arvados.Service{InternalURLs: map[arvados.URL]arvados.ServiceInstance{}}}
139 for k, v := range trial.internalURLs {
140 u, err := url.Parse(k)
141 c.Assert(err, check.IsNil)
142 si := arvados.ServiceInstance{}
// NOTE(review): this second `u` is declared with := and so must live
// in a nested scope opened on a line not visible here (presumably a
// check that v is non-empty) -- confirm.
144 u, err := url.Parse(v)
145 c.Assert(err, check.IsNil)
146 si.ListenURL = arvados.URL(*u)
148 services.Controller.InternalURLs[arvados.URL(*u)] = si
150 listenURL, internalURL, err := getListenAddr(services, "arvados-controller", log)
// Log output is checked only when the trial specifies a pattern.
151 if trial.expectLogsMatch != "" {
152 c.Check(logbuf.String(), check.Matches, trial.expectLogsMatch)
154 if trial.expectErrorMatch != "" {
155 c.Check(err, check.ErrorMatches, trial.expectErrorMatch)
158 if !c.Check(err, check.IsNil) {
161 c.Check(listenURL.String(), check.Equals, trial.expectListen)
162 c.Check(internalURL.String(), check.Equals, trial.expectInternal)
// TestCommand runs the controller service command against a temporary
// config file and verifies that (a) the handler constructor receives
// the context installed on the command (including the contextKey
// value) and the cluster's SystemRootToken, (b) nothing is written to
// stdout, and (c) stderr contains the "CheckHealth called" log entry
// emitted by testHandler.
//
// NOTE(review): the goroutine/select scaffolding around RunCommand and
// the health-check wait is on lines not visible in this excerpt.
166 func (*Suite) TestCommand(c *check.C) {
167 cf, err := ioutil.TempFile("", "cmd_test.")
168 c.Assert(err, check.IsNil)
169 defer os.Remove(cf.Name())
// Minimal cluster config: one cluster with a root token and a
// controller listen address.
171 fmt.Fprintf(cf, "Clusters:\n zzzzz:\n SystemRootToken: abcde\n NodeProfiles: {\"*\": {\"arvados-controller\": {Listen: \":1234\"}}}")
// healthCheck is handed to testHandler, whose CheckHealth method sends
// on it.
173 healthCheck := make(chan bool, 1)
174 ctx, cancel := context.WithCancel(context.Background())
177 cmd := Command(arvados.ServiceNameController, func(ctx context.Context, _ *arvados.Cluster, token string, reg *prometheus.Registry) Handler {
// The constructor must see the value stored in the command's context
// and the SystemRootToken from the config file.
178 c.Check(ctx.Value(contextKey), check.Equals, "bar")
179 c.Check(token, check.Equals, "abcde")
180 return &testHandler{ctx: ctx, healthCheck: healthCheck}
// Replace the command's base context so the check above has a marker
// value to look for.
182 cmd.(*command).ctx = context.WithValue(ctx, contextKey, "bar")
184 done := make(chan bool)
185 var stdin, stdout, stderr bytes.Buffer
188 cmd.RunCommand("arvados-controller", []string{"-config", cf.Name()}, &stdin, &stdout, &stderr)
194 c.Error("command exited without health check")
197 c.Check(stdout.String(), check.Equals, "")
198 c.Check(stderr.String(), check.Matches, `(?ms).*"msg":"CheckHealth called".*`)
// TestDumpRequestsKeepweb runs the shared request-dump scenario for
// the keepweb service, using "MaxConcurrentRequests" as the config key
// that sets the concurrency limit.
201 func (s *Suite) TestDumpRequestsKeepweb(c *check.C) {
202 s.testDumpRequests(c, arvados.ServiceNameKeepweb, "MaxConcurrentRequests")
// TestDumpRequestsController runs the shared request-dump scenario for
// the controller service, using "MaxConcurrentRailsRequests" as the
// config key that sets the concurrency limit.
205 func (s *Suite) TestDumpRequestsController(c *check.C) {
206 s.testDumpRequests(c, arvados.ServiceNameController, "MaxConcurrentRailsRequests")
// testDumpRequests is the shared driver for the TestDumpRequests*
// cases. It starts the service named by serviceName with a generated
// config in which maxReqsConfigKey carries the concurrency limit,
// issues max+1 GET requests for /testpath, and then polls for
// "<serviceName>-requests.json" in tmpdir. Once a dump containing max
// entries appears, it checks that the first entry's URL is /testpath
// and that the /_inspect/requests and /metrics endpoints still return
// 200 when called with the management token.
//
// NOTE(review): tmpdir, max, the handler body, and the goroutine/select
// scaffolding are defined on lines not visible in this excerpt.
// Presumably the handler blocks on `hold` so requests stay in flight,
// and tmpdir is the configured RequestQueueDumpDirectory -- confirm.
209 func (*Suite) testDumpRequests(c *check.C, serviceName arvados.ServiceName, maxReqsConfigKey string) {
// Shorten the package-level dump check interval for the duration of
// this test and restore the original value on return.
210 defer func(orig time.Duration) { requestQueueDumpCheckInterval = orig }(requestQueueDumpCheckInterval)
211 requestQueueDumpCheckInterval = time.Second / 10
213 port := unusedPort(c)
215 cf, err := ioutil.TempFile(tmpdir, "cmd_test.")
216 c.Assert(err, check.IsNil)
217 defer os.Remove(cf.Name())
224 SystemRootToken: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
225 ManagementToken: bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb
227 `+maxReqsConfigKey+`: %d
229 SystemLogs: {RequestQueueDumpDirectory: %q}
232 ExternalURL: "http://localhost:`+port+`"
233 InternalURLs: {"http://localhost:`+port+`": {}}
235 ExternalURL: "http://localhost:`+port+`"
236 InternalURLs: {"http://localhost:`+port+`": {}}
// started is buffered for max+1 entries, matching the number of
// requests issued below.
240 started := make(chan bool, max+1)
241 hold := make(chan bool)
242 handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
246 healthCheck := make(chan bool, 1)
247 ctx, cancel := context.WithCancel(context.Background())
250 cmd := Command(serviceName, func(ctx context.Context, _ *arvados.Cluster, token string, reg *prometheus.Registry) Handler {
251 return &testHandler{ctx: ctx, handler: handler, healthCheck: healthCheck}
253 cmd.(*command).ctx = context.WithValue(ctx, contextKey, "bar")
255 exited := make(chan bool)
256 var stdin, stdout, stderr bytes.Buffer
259 cmd.RunCommand(string(serviceName), []string{"-config", cf.Name()}, &stdin, &stdout, &stderr)
265 c.Logf("%s", stderr.String())
266 c.Error("command exited without health check")
268 client := http.Client{}
// All retry/poll loops below share this two-second deadline.
269 deadline := time.Now().Add(time.Second * 2)
// Issue one more request than the configured limit.
270 for i := 0; i < max+1; i++ {
272 resp, err := client.Get("http://localhost:" + port + "/testpath")
// Retry on connection ("dial tcp") errors until the deadline passes.
273 for err != nil && strings.Contains(err.Error(), "dial tcp") && deadline.After(time.Now()) {
274 time.Sleep(time.Second / 100)
275 resp, err = client.Get("http://localhost:" + port + "/testpath")
277 if c.Check(err, check.IsNil) {
278 c.Logf("resp StatusCode %d", resp.StatusCode)
282 for i := 0; i < max; i++ {
285 case <-time.After(time.Second):
286 c.Logf("%s", stderr.String())
// Poll for the dump file, doubling the delay on each iteration.
290 for delay := time.Second / 100; ; delay = delay * 2 {
292 j, err := os.ReadFile(tmpdir + "/" + string(serviceName) + "-requests.json")
// A missing file is tolerated until the deadline passes.
293 if os.IsNotExist(err) && deadline.After(time.Now()) {
296 c.Assert(err, check.IsNil)
297 c.Logf("stderr:\n%s", stderr.String())
298 c.Logf("json:\n%s", string(j))
// Only the URL field of each dumped request is inspected.
300 var loaded []struct{ URL string }
301 err = json.Unmarshal(j, &loaded)
302 c.Check(err, check.IsNil)
303 if len(loaded) < max {
304 // Dumped when #requests was >90% but <100% of
305 // limit. If we stop now, we won't be able to
306 // confirm (below) that management endpoints
307 // are still accessible when normal requests
309 c.Logf("loaded dumped requests, but len %d < max %d -- still waiting", len(loaded), max)
312 c.Check(loaded, check.HasLen, max)
313 c.Check(loaded[0].URL, check.Equals, "/testpath")
// Management endpoints must still respond while normal requests are
// at the limit; they are called with the ManagementToken from the
// config above.
317 for _, path := range []string{"/_inspect/requests", "/metrics"} {
318 req, err := http.NewRequest("GET", "http://localhost:"+port+""+path, nil)
319 c.Assert(err, check.IsNil)
320 req.Header.Set("Authorization", "Bearer bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")
321 resp, err := client.Do(req)
322 if !c.Check(err, check.IsNil) {
325 c.Logf("got response for %s", path)
326 c.Check(resp.StatusCode, check.Equals, http.StatusOK)
327 buf, err := ioutil.ReadAll(resp.Body)
328 c.Check(err, check.IsNil)
// The metrics output must report exactly max concurrent requests.
331 c.Check(string(buf), check.Matches, `(?ms).*arvados_concurrent_requests `+fmt.Sprintf("%d", max)+`\n.*`)
332 case "/_inspect/requests":
333 c.Check(string(buf), check.Matches, `(?ms).*"URL":"/testpath".*`)
// Reaching this means the path list above and the switch cases are
// out of sync.
335 c.Error("oops, testing bug")
// TestTLS starts the controller service with an https InternalURL and
// a TLS key/certificate loaded from files under services/api/tmp, with
// the config supplied on stdin ("-config -"). It then polls
// https://localhost:<port> until a response arrives and checks for a
// 200 status.
//
// NOTE(review): the goroutine/select scaffolding, the handler's use of
// `called`, and the error handling inside the polling loop are on
// lines not visible in this excerpt.
342 func (*Suite) TestTLS(c *check.C) {
343 port := unusedPort(c)
// The key and certificate paths in the config below are built
// relative to the current working directory.
344 cwd, err := os.Getwd()
345 c.Assert(err, check.IsNil)
347 stdin := bytes.NewBufferString(`
350 SystemRootToken: abcde
353 ExternalURL: "https://localhost:` + port + `"
354 InternalURLs: {"https://localhost:` + port + `": {}}
356 Key: file://` + cwd + `/../../services/api/tmp/self-signed.key
357 Certificate: file://` + cwd + `/../../services/api/tmp/self-signed.pem
360 called := make(chan bool)
361 cmd := Command(arvados.ServiceNameController, func(ctx context.Context, _ *arvados.Cluster, token string, reg *prometheus.Registry) Handler {
362 return &testHandler{handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
363 w.Write([]byte("ok"))
368 exited := make(chan bool)
369 var stdout, stderr bytes.Buffer
// "-config -" makes the command read its configuration from stdin.
371 cmd.RunCommand("arvados-controller", []string{"-config", "-"}, stdin, &stdout, &stderr)
374 got := make(chan bool)
// The certificate file is named self-signed.pem, so the client skips
// certificate verification.
377 client := &http.Client{Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}}}
// Poll once per millisecond.
// NOTE(review): the ticker is created inline and is never stopped.
378 for range time.NewTicker(time.Millisecond).C {
379 resp, err := client.Get("https://localhost:" + port)
384 body, err := ioutil.ReadAll(resp.Body)
385 c.Check(err, check.IsNil)
386 c.Logf("status %d, body %s", resp.StatusCode, string(body))
387 c.Check(resp.StatusCode, check.Equals, http.StatusOK)
394 c.Error("command exited without calling handler")
395 case <-time.After(time.Second):
401 c.Error("command exited before client received response")
402 case <-time.After(time.Second):
405 c.Log(stderr.String())
// testHandler is the Handler stub returned by the service constructors
// in these tests.
// NOTE(review): some fields are on lines not visible in this excerpt;
// other code in this file also sets/reads `ctx` and `handler`.
408 type testHandler struct {
// healthCheck receives true when CheckHealth is called.
411 healthCheck chan bool
// Done returns a nil channel. Receiving from a nil channel blocks
// forever, so this handler never signals completion on its own.
414 func (th *testHandler) Done() <-chan struct{} { return nil }
// ServeHTTP forwards every request to th.handler unchanged.
// NOTE(review): th.handler presumably must be non-nil for this to be
// callable; TestCommand constructs a testHandler without one -- confirm
// no requests are served in that test.
415 func (th *testHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { th.handler.ServeHTTP(w, r) }
416 func (th *testHandler) CheckHealth() error {
417 ctxlog.FromContext(th.ctx).Info("CheckHealth called")
419 case th.healthCheck <- true: