1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: Apache-2.0
5 // Package service provides a cmd.Handler that brings up a system service.
25 "git.arvados.org/arvados.git/sdk/go/arvados"
26 "git.arvados.org/arvados.git/sdk/go/ctxlog"
27 "github.com/prometheus/client_golang/prometheus"
28 check "gopkg.in/check.v1"
// Test is the `go test` entry point for this package.
//
// NOTE(review): the body is not visible in this excerpt; presumably it hands
// control to the gocheck runner so the registered suites are executed --
// confirm against the full file.
31 func Test(t *testing.T) {
// Register Suite with gocheck. The result is assigned to the blank
// identifier because only the registration side effect is wanted.
35 var _ = check.Suite(&Suite{})
// unusedPort opens a TCP listener on ":" (no host and no port, leaving the
// choice of port to the OS) and extracts the port component of the
// listener's address. Any failure aborts the calling test via c.Assert.
//
// NOTE(review): the lines that release the listener and return a value are
// elided from this excerpt; presumably the extracted port string is what is
// returned -- confirm against the full file.
44 func unusedPort(c *check.C) string {
45 // Find an available port on the testing host, so the test
46 // cases don't get confused by "already in use" errors.
47 listener, err := net.Listen("tcp", ":")
48 c.Assert(err, check.IsNil)
// Only the port is of interest; the host part of the address is discarded.
50 _, port, err := net.SplitHostPort(listener.Addr().String())
51 c.Assert(err, check.IsNil)
// TestGetListenAddress is a table-driven test of getListenAddr.
//
// Each trial supplies a set of Controller InternalURLs (map key) with an
// optional ListenURL (map value), plus an optional value for the
// ARVADOS_SERVICE_INTERNAL_URL environment variable. A trial states either
// the listen/internal URL pair that getListenAddr must return, or a pattern
// that the returned error must match. Trials may additionally give a pattern
// that the captured log output must match.
//
// NOTE(review): `port` is used throughout the table but its declaration is
// elided from this excerpt -- presumably it comes from unusedPort(c).
55 func (*Suite) TestGetListenAddress(c *check.C) {
// Make sure the env var set by the trials below does not outlive this test.
57 defer os.Unsetenv("ARVADOS_SERVICE_INTERNAL_URL")
58 for idx, trial := range []struct {
59 // internalURL => listenURL, both with trailing "/"
60 // because config loader always adds it
61 internalURLs map[string]string
63 expectErrorMatch string
64 expectLogsMatch string
69 internalURLs: map[string]string{"http://localhost:" + port + "/": ""},
70 expectListen: "http://localhost:" + port + "/",
71 expectInternal: "http://localhost:" + port + "/",
// NOTE(review): the "permission denied" expectations in this table
// presumably rely on the test process lacking the privilege to bind
// ports 80 and 443 -- confirm for the CI environment.
73 { // implicit port 80 in InternalURLs
74 internalURLs: map[string]string{"http://localhost/": ""},
75 expectErrorMatch: `.*:80: bind: permission denied`,
77 { // implicit port 443 in InternalURLs
78 internalURLs: map[string]string{"https://host.example/": "http://localhost:" + port + "/"},
79 expectListen: "http://localhost:" + port + "/",
80 expectInternal: "https://host.example/",
82 { // implicit port 443 in ListenURL
83 internalURLs: map[string]string{"wss://host.example/": "wss://localhost/"},
84 expectErrorMatch: `.*:443: bind: permission denied`,
87 internalURLs: map[string]string{"https://hostname.example/": "http://localhost:8000/"},
88 expectListen: "http://localhost:8000/",
89 expectInternal: "https://hostname.example/",
92 internalURLs: map[string]string{
93 "https://hostname1.example/": "http://localhost:12435/",
94 "https://hostname2.example/": "http://localhost:" + port + "/",
96 envVar: "https://hostname2.example", // note this works despite missing trailing "/"
97 expectListen: "http://localhost:" + port + "/",
98 expectInternal: "https://hostname2.example/",
100 { // cannot listen on any of the ListenURLs
101 internalURLs: map[string]string{
102 "https://hostname1.example/": "http://1.2.3.4:" + port + "/",
103 "https://hostname2.example/": "http://1.2.3.4:" + port + "/",
105 expectErrorMatch: "configuration does not enable the \"arvados-controller\" service on this host",
107 { // cannot listen on any of the (implied) ListenURLs
108 internalURLs: map[string]string{
109 "https://1.2.3.4/": "",
110 "https://1.2.3.5/": "",
112 expectErrorMatch: "configuration does not enable the \"arvados-controller\" service on this host",
114 { // impossible port number
115 internalURLs: map[string]string{
116 "https://host.example/": "http://0.0.0.0:1234567",
118 expectErrorMatch: `.*:1234567: listen tcp: address 1234567: invalid port`,
121 // env var URL not mentioned in config = obey env var, with warning
122 internalURLs: map[string]string{"https://hostname1.example/": "http://localhost:8000/"},
123 envVar: "https://hostname2.example",
124 expectListen: "https://hostname2.example/",
125 expectInternal: "https://hostname2.example/",
126 expectLogsMatch: `.*\Qpossible configuration error: listening on https://hostname2.example/ (from $ARVADOS_SERVICE_INTERNAL_URL) even though configuration does not have a matching InternalURLs entry\E.*\n`,
129 // env var + empty config = obey env var, with warning
130 envVar: "https://hostname.example",
131 expectListen: "https://hostname.example/",
132 expectInternal: "https://hostname.example/",
133 expectLogsMatch: `.*\Qpossible configuration error: listening on https://hostname.example/ (from $ARVADOS_SERVICE_INTERNAL_URL) even though configuration does not have a matching InternalURLs entry\E.*\n`,
136 c.Logf("trial %d %+v", idx, trial)
// Set unconditionally, so a trial with an empty envVar overwrites whatever
// the previous trial left behind.
137 os.Setenv("ARVADOS_SERVICE_INTERNAL_URL", trial.envVar)
// Log output is captured in memory so it can be matched against
// expectLogsMatch below.
138 var logbuf bytes.Buffer
139 log := ctxlog.New(&logbuf, "text", "info")
// Build a Services value whose only populated entry is
// Controller.InternalURLs, filled from the trial's string map.
140 services := arvados.Services{Controller: arvados.Service{InternalURLs: map[arvados.URL]arvados.ServiceInstance{}}}
141 for k, v := range trial.internalURLs {
142 u, err := url.Parse(k)
143 c.Assert(err, check.IsNil)
144 si := arvados.ServiceInstance{}
// NOTE(review): this `u` is declared with := again, so it must live in an
// inner scope that shadows the key URL above; the enclosing statement is
// elided from this excerpt (presumably a check that v is non-empty).
146 u, err := url.Parse(v)
147 c.Assert(err, check.IsNil)
148 si.ListenURL = arvados.URL(*u)
150 services.Controller.InternalURLs[arvados.URL(*u)] = si
152 listenURL, internalURL, err := getListenAddr(services, "arvados-controller", log)
153 if trial.expectLogsMatch != "" {
154 c.Check(logbuf.String(), check.Matches, trial.expectLogsMatch)
156 if trial.expectErrorMatch != "" {
157 c.Check(err, check.ErrorMatches, trial.expectErrorMatch)
// No error was expected for this trial: report any error that did occur.
// (The statement executed on failure is elided from this excerpt.)
160 if !c.Check(err, check.IsNil) {
163 c.Check(listenURL.String(), check.Equals, trial.expectListen)
164 c.Check(internalURL.String(), check.Equals, trial.expectInternal)
// TestCommand runs a Command for the controller service against a temporary
// config file and checks that:
//
//   - the handler constructor is given a context carrying the value injected
//     into the command, and the SystemRootToken from the config ("abcde");
//   - nothing is written to stdout; and
//   - stderr contains a log entry with the message "CheckHealth called".
//
// NOTE(review): the goroutine and select statements that tie `done`,
// `healthCheck` and `cancel` together are elided from this excerpt.
168 func (*Suite) TestCommand(c *check.C) {
169 cf, err := ioutil.TempFile("", "cmd_test.")
170 c.Assert(err, check.IsNil)
// Remove the temporary config file when the test returns.
171 defer os.Remove(cf.Name())
173 fmt.Fprintf(cf, "Clusters:\n zzzzz:\n SystemRootToken: abcde\n NodeProfiles: {\"*\": {\"arvados-controller\": {Listen: \":1234\"}}}")
// Capacity 1; testHandler.CheckHealth sends on this channel.
175 healthCheck := make(chan bool, 1)
176 ctx, cancel := context.WithCancel(context.Background())
179 cmd := Command(arvados.ServiceNameController, func(ctx context.Context, _ *arvados.Cluster, token string, reg *prometheus.Registry) Handler {
// The constructor must see the marker value stored in the command's
// context below, and the root token from the config file.
180 c.Check(ctx.Value(contextKey), check.Equals, "bar")
181 c.Check(token, check.Equals, "abcde")
182 return &testHandler{ctx: ctx, healthCheck: healthCheck}
// Reach into the concrete *command to replace its context with one that
// carries the marker value checked by the constructor above.
184 cmd.(*command).ctx = context.WithValue(ctx, contextKey, "bar")
186 done := make(chan bool)
187 var stdin, stdout, stderr bytes.Buffer
190 cmd.RunCommand("arvados-controller", []string{"-config", cf.Name()}, &stdin, &stdout, &stderr)
196 c.Error("command exited without health check")
199 c.Check(stdout.String(), check.Equals, "")
200 c.Check(stderr.String(), check.Matches, `(?ms).*"msg":"CheckHealth called".*`)
// TestTunnelPathRegexp pins which request paths reTunnelPath matches.
//
// Paths ending in /gateway_tunnel or /ssh below either
// /arvados/v1/connect/<uuid> or /arvados/v1/containers/<uuid> must match.
// A path with an extra leading segment, or a bare container path with no
// trailing endpoint name, must not.
203 func (s *Suite) TestTunnelPathRegexp(c *check.C) {
// Expected matches.
204 c.Check(reTunnelPath.MatchString(`/arvados/v1/connect/zzzzz-dz642-aaaaaaaaaaaaaaa/gateway_tunnel`), check.Equals, true)
205 c.Check(reTunnelPath.MatchString(`/arvados/v1/containers/zzzzz-dz642-aaaaaaaaaaaaaaa/gateway_tunnel`), check.Equals, true)
206 c.Check(reTunnelPath.MatchString(`/arvados/v1/connect/zzzzz-dz642-aaaaaaaaaaaaaaa/ssh`), check.Equals, true)
207 c.Check(reTunnelPath.MatchString(`/arvados/v1/containers/zzzzz-dz642-aaaaaaaaaaaaaaa/ssh`), check.Equals, true)
// Expected non-matches: extra leading segment, then missing endpoint name.
208 c.Check(reTunnelPath.MatchString(`/blah/arvados/v1/containers/zzzzz-dz642-aaaaaaaaaaaaaaa/ssh`), check.Equals, false)
209 c.Check(reTunnelPath.MatchString(`/arvados/v1/containers/zzzzz-dz642-aaaaaaaaaaaaaaa`), check.Equals, false)
// TestRequestLimitsAndDumpRequests_Keepweb runs the shared request-limit
// test for arvados.ServiceNameKeepweb, using "MaxConcurrentRequests" as the
// config key that carries the request limit.
212 func (s *Suite) TestRequestLimitsAndDumpRequests_Keepweb(c *check.C) {
213 s.testRequestLimitAndDumpRequests(c, arvados.ServiceNameKeepweb, "MaxConcurrentRequests")
// TestRequestLimitsAndDumpRequests_Controller runs the shared request-limit
// test for arvados.ServiceNameController, using "MaxConcurrentRailsRequests"
// as the config key that carries the request limit.
216 func (s *Suite) TestRequestLimitsAndDumpRequests_Controller(c *check.C) {
217 s.testRequestLimitAndDumpRequests(c, arvados.ServiceNameController, "MaxConcurrentRailsRequests")
// testRequestLimitAndDumpRequests starts the service named by serviceName
// with a generated config in which maxReqsConfigKey is set to the API
// request limit, and then:
//
//   - issues max+1 GET requests for /testpath and
//     maxTunnels+extraTunnelReqs POST requests spread over the container
//     ssh / gateway_tunnel endpoint paths;
//   - polls for the file <serviceName>-requests.json in the configured
//     RequestQueueDumpDirectory and, after discarding tunnel entries,
//     expects max+1 dumped requests with /testpath first;
//   - requests /_inspect/requests and /metrics with the management token
//     and expects status 200 plus the relevant content; and
//   - finally expects max+1 API 200s, zero API 503s, maxTunnels tunnel 200s
//     and extraTunnelReqs tunnel 503s.
//
// requestQueueDumpCheckInterval is shortened for the duration of the test
// and restored on return.
//
// NOTE(review): the declarations of max, maxTunnels and tmpdir, the body of
// the HTTP handler, and the `go func()` / `select` scaffolding are elided
// from this excerpt.
220 func (*Suite) testRequestLimitAndDumpRequests(c *check.C, serviceName arvados.ServiceName, maxReqsConfigKey string) {
// Capture the current interval as the deferred func's argument so the
// original value is restored when the test returns.
221 defer func(orig time.Duration) { requestQueueDumpCheckInterval = orig }(requestQueueDumpCheckInterval)
222 requestQueueDumpCheckInterval = time.Second / 10
224 port := unusedPort(c)
226 cf, err := ioutil.TempFile(tmpdir, "cmd_test.")
227 c.Assert(err, check.IsNil)
228 defer os.Remove(cf.Name())
236 SystemRootToken: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
237 ManagementToken: bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb
239 `+maxReqsConfigKey+`: %d
241 MaxGatewayTunnels: %d
242 SystemLogs: {RequestQueueDumpDirectory: %q}
245 ExternalURL: "http://localhost:`+port+`"
246 InternalURLs: {"http://localhost:`+port+`": {}}
248 ExternalURL: "http://localhost:`+port+`"
249 InternalURLs: {"http://localhost:`+port+`": {}}
250 `, max, maxTunnels, tmpdir)
253 started := make(chan bool, max+1)
254 hold := make(chan bool)
255 handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Tunnel endpoints are recognised by a path substring; the branch body is
// elided from this excerpt.
256 if strings.Contains(r.URL.Path, "/ssh") || strings.Contains(r.URL.Path, "/gateway_tunnel") {
263 healthCheck := make(chan bool, 1)
264 ctx, cancel := context.WithCancel(context.Background())
267 cmd := Command(serviceName, func(ctx context.Context, _ *arvados.Cluster, token string, reg *prometheus.Registry) Handler {
268 return &testHandler{ctx: ctx, handler: handler, healthCheck: healthCheck}
270 cmd.(*command).ctx = context.WithValue(ctx, contextKey, "bar")
272 exited := make(chan bool)
273 var stdin, stdout, stderr bytes.Buffer
276 cmd.RunCommand(string(serviceName), []string{"-config", cf.Name()}, &stdin, &stdout, &stderr)
282 c.Logf("%s", stderr.String())
283 c.Error("command exited without health check")
285 client := http.Client{}
// Connection retries in the request loops below give up after this time.
286 deadline := time.Now().Add(time.Second * 2)
287 var activeReqs sync.WaitGroup
289 // Start some API reqs
290 var apiResp200, apiResp503 int64
291 for i := 0; i < max+1; i++ {
294 defer activeReqs.Done()
295 target := "http://localhost:" + port + "/testpath"
296 resp, err := client.Get(target)
// Retry every 10ms while the error mentions "dial tcp", until the deadline
// passes. Presumably this covers the window before the server is
// accepting connections.
297 for err != nil && strings.Contains(err.Error(), "dial tcp") && deadline.After(time.Now()) {
298 time.Sleep(time.Second / 100)
299 resp, err = client.Get(target)
301 if c.Check(err, check.IsNil) {
302 if resp.StatusCode == http.StatusOK {
303 atomic.AddInt64(&apiResp200, 1)
304 } else if resp.StatusCode == http.StatusServiceUnavailable {
305 atomic.AddInt64(&apiResp503, 1)
311 // Start some gateway tunnel reqs that don't count toward our
313 extraTunnelReqs := 20
314 var tunnelResp200, tunnelResp503 int64
// One path per tunnel-style endpoint, each with "{uuid}" replaced by a
// fixed container UUID.
315 var paths = []string{
316 "/" + strings.Replace(arvados.EndpointContainerSSH.Path, "{uuid}", "z1234-dz642-abcdeabcdeabcde", -1),
317 "/" + strings.Replace(arvados.EndpointContainerSSHCompat.Path, "{uuid}", "z1234-dz642-abcdeabcdeabcde", -1),
318 "/" + strings.Replace(arvados.EndpointContainerGatewayTunnel.Path, "{uuid}", "z1234-dz642-abcdeabcdeabcde", -1),
319 "/" + strings.Replace(arvados.EndpointContainerGatewayTunnelCompat.Path, "{uuid}", "z1234-dz642-abcdeabcdeabcde", -1),
321 for i := 0; i < maxTunnels+extraTunnelReqs; i++ {
325 defer activeReqs.Done()
// Cycle through the endpoint paths so each one is exercised.
326 target := "http://localhost:" + port + paths[i%len(paths)]
327 resp, err := client.Post(target, "application/octet-stream", nil)
// Same dial-retry policy as the API request loop above.
328 for err != nil && strings.Contains(err.Error(), "dial tcp") && deadline.After(time.Now()) {
329 time.Sleep(time.Second / 100)
330 resp, err = client.Post(target, "application/octet-stream", nil)
332 if c.Check(err, check.IsNil) {
333 if resp.StatusCode == http.StatusOK {
334 atomic.AddInt64(&tunnelResp200, 1)
335 } else if resp.StatusCode == http.StatusServiceUnavailable {
336 atomic.AddInt64(&tunnelResp503, 1)
338 c.Errorf("tunnel response code %d", resp.StatusCode)
343 for i := 0; i < max; i++ {
// Timeout branch: log stderr and the response tallies to aid debugging.
// NOTE(review): the counters are updated with atomic.AddInt64 above but
// read plainly here; consider atomic.LoadInt64 if requests may still be
// in flight at this point.
346 case <-time.After(time.Second):
347 c.Logf("%s", stderr.String())
348 c.Logf("apiResp200 %d", apiResp200)
349 c.Logf("apiResp503 %d", apiResp503)
350 c.Logf("tunnelResp200 %d", tunnelResp200)
351 c.Logf("tunnelResp503 %d", tunnelResp503)
// Poll for the dump file; the delay value doubles on every iteration.
355 for delay := time.Second / 100; ; delay = delay * 2 {
357 j, err := os.ReadFile(tmpdir + "/" + string(serviceName) + "-requests.json")
// A missing file is tolerated only until the deadline has passed.
358 if os.IsNotExist(err) && deadline.After(time.Now()) {
361 c.Assert(err, check.IsNil)
362 c.Logf("stderr:\n%s", stderr.String())
363 c.Logf("json:\n%s", string(j))
// Only the URL field of each dumped request is needed here.
365 var loaded []struct{ URL string }
366 err = json.Unmarshal(j, &loaded)
367 c.Check(err, check.IsNil)
// Drop tunnel requests from loaded in place: shift the tail left over
// index i, then trim the final element.
369 for i := 0; i < len(loaded); i++ {
370 if strings.Contains(loaded[i].URL, "/ssh") || strings.Contains(loaded[i].URL, "/gateway_tunnel") {
371 // Filter out a gateway tunnel req
372 // that doesn't count toward our API
374 if i < len(loaded)-1 {
375 copy(loaded[i:], loaded[i+1:])
378 loaded = loaded[:len(loaded)-1]
382 if len(loaded) < max {
383 // Dumped when #requests was >90% but <100% of
384 // limit. If we stop now, we won't be able to
385 // confirm (below) that management endpoints
386 // are still accessible when normal requests
388 c.Logf("loaded dumped requests, but len %d < max %d -- still waiting", len(loaded), max)
391 c.Check(loaded, check.HasLen, max+1)
392 c.Check(loaded[0].URL, check.Equals, "/testpath")
396 for _, path := range []string{"/_inspect/requests", "/metrics"} {
// NOTE(review): the +""+ concatenation below is a no-op.
397 req, err := http.NewRequest("GET", "http://localhost:"+port+""+path, nil)
398 c.Assert(err, check.IsNil)
// Same value as ManagementToken in the generated config.
399 req.Header.Set("Authorization", "Bearer bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")
400 resp, err := client.Do(req)
401 if !c.Check(err, check.IsNil) {
404 c.Logf("got response for %s", path)
405 c.Check(resp.StatusCode, check.Equals, http.StatusOK)
406 buf, err := ioutil.ReadAll(resp.Body)
407 c.Check(err, check.IsNil)
// Metrics must report `max` concurrent API requests and one queued
// normal-priority request.
410 c.Check(string(buf), check.Matches, `(?ms).*arvados_concurrent_requests{queue="api"} `+fmt.Sprintf("%d", max)+`\n.*`)
411 c.Check(string(buf), check.Matches, `(?ms).*arvados_queued_requests{priority="normal",queue="api"} 1\n.*`)
412 case "/_inspect/requests":
413 c.Check(string(buf), check.Matches, `(?ms).*"URL":"/testpath".*`)
415 c.Error("oops, testing bug")
// Final tallies: every API request succeeded, exactly maxTunnels tunnel
// requests succeeded, and each of the extra tunnel requests received 503.
420 c.Check(int(apiResp200), check.Equals, max+1)
421 c.Check(int(apiResp503), check.Equals, 0)
422 c.Check(int(tunnelResp200), check.Equals, maxTunnels)
423 c.Check(int(tunnelResp503), check.Equals, extraTunnelReqs)
// TestTLS starts the controller service with an https InternalURL and a
// key/certificate pair loaded from files under ../../services/api/tmp
// (relative to the working directory). The config is supplied on stdin via
// "-config -". A client with certificate verification disabled then requests
// https://localhost:<port> on a ticker, and the response is expected to have
// status 200.
//
// NOTE(review): the goroutines and select statements connecting `called`,
// `exited` and `got` are elided from this excerpt.
427 func (*Suite) TestTLS(c *check.C) {
428 port := unusedPort(c)
429 cwd, err := os.Getwd()
430 c.Assert(err, check.IsNil)
432 stdin := bytes.NewBufferString(`
435 SystemRootToken: abcde
438 ExternalURL: "https://localhost:` + port + `"
439 InternalURLs: {"https://localhost:` + port + `": {}}
441 Key: file://` + cwd + `/../../services/api/tmp/self-signed.key
442 Certificate: file://` + cwd + `/../../services/api/tmp/self-signed.pem
445 called := make(chan bool)
446 cmd := Command(arvados.ServiceNameController, func(ctx context.Context, _ *arvados.Cluster, token string, reg *prometheus.Registry) Handler {
447 return &testHandler{handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
448 w.Write([]byte("ok"))
453 exited := make(chan bool)
454 var stdout, stderr bytes.Buffer
456 cmd.RunCommand("arvados-controller", []string{"-config", "-"}, stdin, &stdout, &stderr)
459 got := make(chan bool)
// The configured certificate is self-signed (per its file name), so the
// client skips verification.
462 client := &http.Client{Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}}}
// NOTE(review): the Ticker is not bound to a variable, so Stop can never
// be called on it.
463 for range time.NewTicker(time.Millisecond).C {
464 resp, err := client.Get("https://localhost:" + port)
469 body, err := ioutil.ReadAll(resp.Body)
470 c.Check(err, check.IsNil)
471 c.Logf("status %d, body %s", resp.StatusCode, string(body))
472 c.Check(resp.StatusCode, check.Equals, http.StatusOK)
479 c.Error("command exited without calling handler")
480 case <-time.After(time.Second):
486 c.Error("command exited before client received response")
487 case <-time.After(time.Second):
// Log the service's stderr for diagnosis.
490 c.Log(stderr.String())
// testHandler is the Handler that the tests in this file return from the
// constructor they pass to Command.
//
// NOTE(review): only some fields are visible in this excerpt; composite
// literals and methods in the file also use `ctx` and `handler`.
493 type testHandler struct {
// healthCheck is sent the value true by CheckHealth.
496 healthCheck chan bool
// Done returns a nil channel. A receive from a nil channel blocks forever,
// so this handler never signals completion on its own.
499 func (th *testHandler) Done() <-chan struct{} { return nil }
// ServeHTTP delegates the request to th.handler. The handler field must be
// set before any request is served: calling a method on a nil interface
// value panics.
500 func (th *testHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { th.handler.ServeHTTP(w, r) }
501 func (th *testHandler) CheckHealth() error {
502 ctxlog.FromContext(th.ctx).Info("CheckHealth called")
504 case th.healthCheck <- true: