+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
package main
import (
+ "bytes"
"crypto/md5"
+ "errors"
"fmt"
- "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
- "git.curoverse.com/arvados.git/sdk/go/arvadostest"
- "git.curoverse.com/arvados.git/sdk/go/keepclient"
"io/ioutil"
- "log"
+ "math/rand"
"net/http"
+ "net/http/httptest"
"os"
"strings"
+ "sync"
"testing"
"time"
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
+
. "gopkg.in/check.v1"
)
const (
ms = 5
)
- for i := 0; listener == nil && i < 1000; i += ms {
+ for i := 0; listener == nil && i < 10000; i += ms {
time.Sleep(ms * time.Millisecond)
}
if listener == nil {
- log.Fatalf("Timed out waiting for listener to start")
+ panic("Timed out waiting for listener to start")
}
}
if bogusClientToken {
arv.ApiToken = "bogus-token"
}
- kc := keepclient.New(&arv)
+ kc := keepclient.New(arv)
sr := map[string]string{
TestProxyUUID: "http://" + listener.Addr().String(),
}
return kc
}
+// TestResponseViaHeader checks that responses served through the proxy
+// listener carry a "Via: HTTP/1.1 keepproxy" header, both for a POST
+// and for a follow-up GET that uses the POST response body as the
+// locator in its path.
+func (s *ServerRequiredSuite) TestResponseViaHeader(c *C) {
+	runProxy(c, nil, false)
+	defer closeListener()
+
+	req, err := http.NewRequest("POST",
+		"http://"+listener.Addr().String()+"/",
+		strings.NewReader("TestViaHeader"))
+	// Stop here if the request could not be built: req would be nil
+	// and the Header.Add call below would panic, and the error would
+	// otherwise be silently overwritten by the Do call.
+	c.Assert(err, Equals, nil)
+	req.Header.Add("Authorization", "OAuth2 "+arvadostest.ActiveToken)
+	resp, err := (&http.Client{}).Do(req)
+	c.Assert(err, Equals, nil)
+	c.Check(resp.Header.Get("Via"), Equals, "HTTP/1.1 keepproxy")
+	// The POST response body is used as the locator for the GET below.
+	locator, err := ioutil.ReadAll(resp.Body)
+	c.Assert(err, Equals, nil)
+	resp.Body.Close()
+
+	req, err = http.NewRequest("GET",
+		"http://"+listener.Addr().String()+"/"+string(locator),
+		nil)
+	c.Assert(err, Equals, nil)
+	resp, err = (&http.Client{}).Do(req)
+	c.Assert(err, Equals, nil)
+	c.Check(resp.Header.Get("Via"), Equals, "HTTP/1.1 keepproxy")
+	resp.Body.Close()
+}
+
+func (s *ServerRequiredSuite) TestLoopDetection(c *C) {
+ kc := runProxy(c, nil, false)
+ defer closeListener()
+
+ sr := map[string]string{
+ TestProxyUUID: "http://" + listener.Addr().String(),
+ }
+ router.(*proxyHandler).KeepClient.SetServiceRoots(sr, sr, sr)
+
+ content := []byte("TestLoopDetection")
+ _, _, err := kc.PutB(content)
+ c.Check(err, ErrorMatches, `.*loop detected.*`)
+
+ hash := fmt.Sprintf("%x", md5.Sum(content))
+ _, _, _, err = kc.Get(hash)
+ c.Check(err, ErrorMatches, `.*loop detected.*`)
+}
+
+// TestDesiredReplicas writes the same block with Want_replicas set to
+// 0, 1 and 2 in turn. Each PutB must succeed and report exactly the
+// requested replica count; when at least one replica was written, the
+// returned locator must be "<md5>+<size>" with optional "+hint"
+// suffixes.
+func (s *ServerRequiredSuite) TestDesiredReplicas(c *C) {
+	client := runProxy(c, nil, false)
+	defer closeListener()
+
+	data := []byte("TestDesiredReplicas")
+	digest := fmt.Sprintf("%x", md5.Sum(data))
+	locatorPattern := fmt.Sprintf(`^%s\+%d(\+.+)?$`, digest, len(data))
+
+	for _, want := range []int{0, 1, 2} {
+		client.Want_replicas = want
+		locator, got, err := client.PutB(data)
+		c.Check(err, Equals, nil)
+		c.Check(got, Equals, client.Want_replicas)
+		if got <= 0 {
+			// No replica reported, so the locator is not checked.
+			continue
+		}
+		c.Check(locator, Matches, locatorPattern)
+	}
+}
+
+func (s *ServerRequiredSuite) TestPutWrongContentLength(c *C) {
+ kc := runProxy(c, nil, false)
+ defer closeListener()
+
+ content := []byte("TestPutWrongContentLength")
+ hash := fmt.Sprintf("%x", md5.Sum(content))
+
+ // If we use http.Client to send these requests to the network
+ // server we just started, the Go http library automatically
+ // fixes the invalid Content-Length header. In order to test
+ // our server behavior, we have to call the handler directly
+ // using an httptest.ResponseRecorder.
+ rtr := MakeRESTRouter(true, true, kc, 10*time.Second, "")
+
+ type testcase struct {
+ sendLength string
+ expectStatus int
+ }
+
+ for _, t := range []testcase{
+ {"1", http.StatusBadRequest},
+ {"", http.StatusLengthRequired},
+ {"-1", http.StatusLengthRequired},
+ {"abcdef", http.StatusLengthRequired},
+ } {
+ req, err := http.NewRequest("PUT",
+ fmt.Sprintf("http://%s/%s+%d", listener.Addr().String(), hash, len(content)),
+ bytes.NewReader(content))
+ c.Assert(err, IsNil)
+ req.Header.Set("Content-Length", t.sendLength)
+ req.Header.Set("Authorization", "OAuth2 "+arvadostest.ActiveToken)
+ req.Header.Set("Content-Type", "application/octet-stream")
+
+ resp := httptest.NewRecorder()
+ rtr.ServeHTTP(resp, req)
+ c.Check(resp.Code, Equals, t.expectStatus)
+ }
+}
+
+// TestManyFailedPuts sets the proxy handler timeout to one nanosecond,
+// launches 128 concurrent PutB calls with a 1 MiB random buffer, and
+// reports an error unless all of them return within 10 seconds. The
+// PutB results themselves are ignored; presumably every put fails
+// because of the tiny timeout.
+func (s *ServerRequiredSuite) TestManyFailedPuts(c *C) {
+	kc := runProxy(c, nil, false)
+	defer closeListener()
+	router.(*proxyHandler).timeout = time.Nanosecond
+
+	const writers = 128
+	payload := make([]byte, 1<<20)
+	rand.Read(payload)
+
+	var pending sync.WaitGroup
+	pending.Add(writers)
+	for n := 0; n < writers; n++ {
+		go func() {
+			defer pending.Done()
+			kc.PutB(payload)
+		}()
+	}
+
+	// Closed once every writer goroutine has returned.
+	finished := make(chan bool)
+	go func() {
+		pending.Wait()
+		close(finished)
+	}()
+
+	deadline := time.After(10 * time.Second)
+	select {
+	case <-deadline:
+		c.Error("timeout")
+	case <-finished:
+	}
+}
+
func (s *ServerRequiredSuite) TestPutAskGet(c *C) {
kc := runProxy(c, nil, false)
defer closeListener()
{
_, _, err := kc.Ask(hash)
c.Check(err, Equals, keepclient.BlockNotFound)
- log.Print("Finished Ask (expected BlockNotFound)")
+ c.Log("Finished Ask (expected BlockNotFound)")
}
{
reader, _, _, err := kc.Get(hash)
c.Check(reader, Equals, nil)
c.Check(err, Equals, keepclient.BlockNotFound)
- log.Print("Finished Get (expected BlockNotFound)")
+ c.Log("Finished Get (expected BlockNotFound)")
}
// Note in bug #5309 among other errors keepproxy would set
c.Check(hash2, Matches, fmt.Sprintf(`^%s\+3(\+.+)?$`, hash))
c.Check(rep, Equals, 2)
c.Check(err, Equals, nil)
- log.Print("Finished PutB (expected success)")
+ c.Log("Finished PutB (expected success)")
}
{
blocklen, _, err := kc.Ask(hash2)
c.Assert(err, Equals, nil)
c.Check(blocklen, Equals, int64(3))
- log.Print("Finished Ask (expected success)")
+ c.Log("Finished Ask (expected success)")
}
{
all, err := ioutil.ReadAll(reader)
c.Check(all, DeepEquals, []byte("foo"))
c.Check(blocklen, Equals, int64(3))
- log.Print("Finished Get (expected success)")
+ c.Log("Finished Get (expected success)")
}
{
c.Check(hash2, Matches, `^d41d8cd98f00b204e9800998ecf8427e\+0(\+.+)?$`)
c.Check(rep, Equals, 2)
c.Check(err, Equals, nil)
- log.Print("Finished PutB zero block")
+ c.Log("Finished PutB zero block")
}
{
all, err := ioutil.ReadAll(reader)
c.Check(all, DeepEquals, []byte(""))
c.Check(blocklen, Equals, int64(0))
- log.Print("Finished Get zero block")
+ c.Log("Finished Get zero block")
}
}
errNotFound, _ := err.(keepclient.ErrNotFound)
c.Check(errNotFound, NotNil)
c.Assert(strings.Contains(err.Error(), "HTTP 403"), Equals, true)
- log.Print("Ask 1")
+ c.Log("Ask 1")
}
{
hash2, rep, err := kc.PutB([]byte("bar"))
c.Check(hash2, Equals, "")
c.Check(rep, Equals, 0)
- c.Check(err, Equals, keepclient.InsufficientReplicasError)
- log.Print("PutB")
+ c.Check(err, FitsTypeOf, keepclient.InsufficientReplicasError(errors.New("")))
+ c.Log("PutB")
}
{
c.Check(errNotFound, NotNil)
c.Assert(strings.Contains(err.Error(), "HTTP 403"), Equals, true)
c.Check(blocklen, Equals, int64(0))
- log.Print("Ask 2")
+ c.Log("Ask 2")
}
{
c.Check(errNotFound, NotNil)
c.Assert(strings.Contains(err.Error(), "HTTP 403"), Equals, true)
c.Check(blocklen, Equals, int64(0))
- log.Print("Get")
+ c.Log("Get")
}
}
errNotFound, _ := err.(keepclient.ErrNotFound)
c.Check(errNotFound, NotNil)
c.Assert(strings.Contains(err.Error(), "HTTP 400"), Equals, true)
- log.Print("Ask 1")
+ c.Log("Ask 1")
}
{
c.Check(hash2, Matches, fmt.Sprintf(`^%s\+3(\+.+)?$`, hash))
c.Check(rep, Equals, 2)
c.Check(err, Equals, nil)
- log.Print("PutB")
+ c.Log("PutB")
}
{
c.Check(errNotFound, NotNil)
c.Assert(strings.Contains(err.Error(), "HTTP 400"), Equals, true)
c.Check(blocklen, Equals, int64(0))
- log.Print("Ask 2")
+ c.Log("Ask 2")
}
{
c.Check(errNotFound, NotNil)
c.Assert(strings.Contains(err.Error(), "HTTP 400"), Equals, true)
c.Check(blocklen, Equals, int64(0))
- log.Print("Get")
+ c.Log("Get")
}
}
hash2, rep, err := kc.PutB([]byte("quux"))
c.Check(hash2, Equals, "")
c.Check(rep, Equals, 0)
- c.Check(err, Equals, keepclient.InsufficientReplicasError)
+ c.Check(err, FitsTypeOf, keepclient.InsufficientReplicasError(errors.New("")))
}
func (s *ServerRequiredSuite) TestCorsHeaders(c *C) {
req, err := http.NewRequest("POST",
"http://"+listener.Addr().String()+"/",
strings.NewReader("qux"))
- req.Header.Add("Authorization", "OAuth2 4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
+ req.Header.Add("Authorization", "OAuth2 "+arvadostest.ActiveToken)
req.Header.Add("Content-Type", "application/octet-stream")
resp, err := client.Do(req)
c.Check(err, Equals, nil)
c.Assert(err, Equals, nil)
// keepclient with no such keep server
- kc := keepclient.New(&arv)
+ kc := keepclient.New(arv)
locals := map[string]string{
TestProxyUUID: "http://localhost:12345",
}
c.Check(err, ErrorMatches, `.*HTTP 502.*`)
}
}
+
+// TestPing builds a router configured with the management token and
+// checks that GET /_health/ping, sent with that token as a Bearer
+// credential, returns status 200 and a body containing
+// {"health":"OK"}.
+func (s *ServerRequiredSuite) TestPing(c *C) {
+	kc := runProxy(c, nil, false)
+	defer closeListener()
+
+	handler := MakeRESTRouter(true, true, kc, 10*time.Second, arvadostest.ManagementToken)
+
+	pingURL := "http://" + listener.Addr().String() + "/_health/ping"
+	req, err := http.NewRequest("GET", pingURL, nil)
+	c.Assert(err, IsNil)
+	req.Header.Set("Authorization", "Bearer "+arvadostest.ManagementToken)
+
+	// The handler is called directly; the response is captured by the recorder.
+	rec := httptest.NewRecorder()
+	handler.ServeHTTP(rec, req)
+	c.Check(rec.Code, Equals, 200)
+
+	body := rec.Body.String()
+	c.Assert(strings.Contains(body, `{"health":"OK"}`), Equals, true)
+}