"net/url"
"os"
"strings"
+ "sync"
+ "sync/atomic"
"testing"
"time"
c.Check(stderr.String(), check.Matches, `(?ms).*"msg":"CheckHealth called".*`)
}
-func (s *Suite) TestDumpRequestsKeepweb(c *check.C) {
- s.testDumpRequests(c, arvados.ServiceNameKeepweb, "MaxConcurrentRequests")
+func (s *Suite) TestTunnelPathRegexp(c *check.C) {
+ c.Check(reTunnelPath.MatchString(`/arvados/v1/connect/zzzzz-dz642-aaaaaaaaaaaaaaa/gateway_tunnel`), check.Equals, true)
+ c.Check(reTunnelPath.MatchString(`/arvados/v1/containers/zzzzz-dz642-aaaaaaaaaaaaaaa/gateway_tunnel`), check.Equals, true)
+ c.Check(reTunnelPath.MatchString(`/arvados/v1/connect/zzzzz-dz642-aaaaaaaaaaaaaaa/ssh`), check.Equals, true)
+ c.Check(reTunnelPath.MatchString(`/arvados/v1/containers/zzzzz-dz642-aaaaaaaaaaaaaaa/ssh`), check.Equals, true)
+ c.Check(reTunnelPath.MatchString(`/blah/arvados/v1/containers/zzzzz-dz642-aaaaaaaaaaaaaaa/ssh`), check.Equals, false)
+ c.Check(reTunnelPath.MatchString(`/arvados/v1/containers/zzzzz-dz642-aaaaaaaaaaaaaaa`), check.Equals, false)
}
-func (s *Suite) TestDumpRequestsController(c *check.C) {
- s.testDumpRequests(c, arvados.ServiceNameController, "MaxConcurrentRailsRequests")
+func (s *Suite) TestRequestLimitsAndDumpRequests_Keepweb(c *check.C) {
+ s.testRequestLimitAndDumpRequests(c, arvados.ServiceNameKeepweb, "MaxConcurrentRequests")
}
-func (*Suite) testDumpRequests(c *check.C, serviceName arvados.ServiceName, maxReqsConfigKey string) {
+func (s *Suite) TestRequestLimitsAndDumpRequests_Controller(c *check.C) {
+ s.testRequestLimitAndDumpRequests(c, arvados.ServiceNameController, "MaxConcurrentRailsRequests")
+}
+
+func (*Suite) testRequestLimitAndDumpRequests(c *check.C, serviceName arvados.ServiceName, maxReqsConfigKey string) {
defer func(orig time.Duration) { requestQueueDumpCheckInterval = orig }(requestQueueDumpCheckInterval)
requestQueueDumpCheckInterval = time.Second / 10
defer cf.Close()
max := 24
+ maxTunnels := 30
fmt.Fprintf(cf, `
Clusters:
zzzzz:
ManagementToken: bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb
API:
`+maxReqsConfigKey+`: %d
- MaxQueuedRequests: 0
+ MaxQueuedRequests: 1
+ MaxGatewayTunnels: %d
SystemLogs: {RequestQueueDumpDirectory: %q}
Services:
Controller:
WebDAV:
ExternalURL: "http://localhost:`+port+`"
InternalURLs: {"http://localhost:`+port+`": {}}
-`, max, tmpdir)
+`, max, maxTunnels, tmpdir)
cf.Close()
started := make(chan bool, max+1)
hold := make(chan bool)
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- started <- true
- <-hold
+ if strings.Contains(r.URL.Path, "/ssh") || strings.Contains(r.URL.Path, "/gateway_tunnel") {
+ <-hold
+ } else {
+ started <- true
+ <-hold
+ }
})
healthCheck := make(chan bool, 1)
ctx, cancel := context.WithCancel(context.Background())
}
client := http.Client{}
deadline := time.Now().Add(time.Second * 2)
+ var activeReqs sync.WaitGroup
+
+ // Start some API reqs
+ var apiResp200, apiResp503 int64
for i := 0; i < max+1; i++ {
+ activeReqs.Add(1)
+ go func() {
+ defer activeReqs.Done()
+ target := "http://localhost:" + port + "/testpath"
+ resp, err := client.Get(target)
+ for err != nil && strings.Contains(err.Error(), "dial tcp") && deadline.After(time.Now()) {
+ time.Sleep(time.Second / 100)
+ resp, err = client.Get(target)
+ }
+ if c.Check(err, check.IsNil) {
+ if resp.StatusCode == http.StatusOK {
+ atomic.AddInt64(&apiResp200, 1)
+ } else if resp.StatusCode == http.StatusServiceUnavailable {
+ atomic.AddInt64(&apiResp503, 1)
+ }
+ }
+ }()
+ }
+
+ // Start some gateway tunnel reqs that don't count toward our
+ // API req limit
+ extraTunnelReqs := 20
+ var tunnelResp200, tunnelResp503 int64
+ var paths = []string{
+ "/" + strings.Replace(arvados.EndpointContainerSSH.Path, "{uuid}", "z1234-dz642-abcdeabcdeabcde", -1),
+ "/" + strings.Replace(arvados.EndpointContainerSSHCompat.Path, "{uuid}", "z1234-dz642-abcdeabcdeabcde", -1),
+ "/" + strings.Replace(arvados.EndpointContainerGatewayTunnel.Path, "{uuid}", "z1234-dz642-abcdeabcdeabcde", -1),
+ "/" + strings.Replace(arvados.EndpointContainerGatewayTunnelCompat.Path, "{uuid}", "z1234-dz642-abcdeabcdeabcde", -1),
+ }
+ for i := 0; i < maxTunnels+extraTunnelReqs; i++ {
+ i := i
+ activeReqs.Add(1)
go func() {
- resp, err := client.Get("http://localhost:" + port + "/testpath")
+ defer activeReqs.Done()
+ target := "http://localhost:" + port + paths[i%len(paths)]
+ resp, err := client.Post(target, "application/octet-stream", nil)
for err != nil && strings.Contains(err.Error(), "dial tcp") && deadline.After(time.Now()) {
time.Sleep(time.Second / 100)
- resp, err = client.Get("http://localhost:" + port + "/testpath")
+ resp, err = client.Post(target, "application/octet-stream", nil)
}
if c.Check(err, check.IsNil) {
- c.Logf("resp StatusCode %d", resp.StatusCode)
+ if resp.StatusCode == http.StatusOK {
+ atomic.AddInt64(&tunnelResp200, 1)
+ } else if resp.StatusCode == http.StatusServiceUnavailable {
+ atomic.AddInt64(&tunnelResp503, 1)
+ } else {
+ c.Errorf("tunnel response code %d", resp.StatusCode)
+ }
}
}()
}
case <-started:
case <-time.After(time.Second):
c.Logf("%s", stderr.String())
+ c.Logf("apiResp200 %d", apiResp200)
+ c.Logf("apiResp503 %d", apiResp503)
+ c.Logf("tunnelResp200 %d", tunnelResp200)
+ c.Logf("tunnelResp503 %d", tunnelResp503)
c.Fatal("timed out")
}
}
var loaded []struct{ URL string }
err = json.Unmarshal(j, &loaded)
c.Check(err, check.IsNil)
+
+ for i := 0; i < len(loaded); i++ {
+ if strings.Contains(loaded[i].URL, "/ssh") || strings.Contains(loaded[i].URL, "/gateway_tunnel") {
+ // Filter out a gateway tunnel req
+ // that doesn't count toward our API
+ // req limit
+ if i < len(loaded)-1 {
+ copy(loaded[i:], loaded[i+1:])
+ i--
+ }
+ loaded = loaded[:len(loaded)-1]
+ }
+ }
+
if len(loaded) < max {
// Dumped when #requests was >90% but <100% of
// limit. If we stop now, we won't be able to
c.Logf("loaded dumped requests, but len %d < max %d -- still waiting", len(loaded), max)
continue
}
- c.Check(loaded, check.HasLen, max)
+ c.Check(loaded, check.HasLen, max+1)
c.Check(loaded[0].URL, check.Equals, "/testpath")
break
}
c.Check(err, check.IsNil)
switch path {
case "/metrics":
- c.Check(string(buf), check.Matches, `(?ms).*arvados_concurrent_requests `+fmt.Sprintf("%d", max)+`\n.*`)
+ c.Check(string(buf), check.Matches, `(?ms).*arvados_concurrent_requests{queue="api"} `+fmt.Sprintf("%d", max)+`\n.*`)
+ c.Check(string(buf), check.Matches, `(?ms).*arvados_queued_requests{priority="normal",queue="api"} 1\n.*`)
case "/_inspect/requests":
c.Check(string(buf), check.Matches, `(?ms).*"URL":"/testpath".*`)
default:
}
}
close(hold)
+ activeReqs.Wait()
+ c.Check(int(apiResp200), check.Equals, max+1)
+ c.Check(int(apiResp503), check.Equals, 0)
+ c.Check(int(tunnelResp200), check.Equals, maxTunnels)
+ c.Check(int(tunnelResp503), check.Equals, extraTunnelReqs)
cancel()
}