"errors"
"fmt"
"io/ioutil"
- "log"
"math/rand"
"net/http"
"net/http/httptest"
- "os"
"strings"
"sync"
"testing"
"time"
- "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
- "git.curoverse.com/arvados.git/sdk/go/arvadostest"
- "git.curoverse.com/arvados.git/sdk/go/keepclient"
+ "git.arvados.org/arvados.git/lib/config"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/arvadosclient"
+ "git.arvados.org/arvados.git/sdk/go/arvadostest"
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
+ "git.arvados.org/arvados.git/sdk/go/keepclient"
+ log "github.com/sirupsen/logrus"
. "gopkg.in/check.v1"
)
time.Sleep(ms * time.Millisecond)
}
if listener == nil {
- log.Fatalf("Timed out waiting for listener to start")
+ panic("Timed out waiting for listener to start")
}
}
arvadostest.StopAPI()
}
-func runProxy(c *C, args []string, bogusClientToken bool) *keepclient.KeepClient {
- args = append([]string{"keepproxy"}, args...)
- os.Args = append(args, "-listen=:0")
+func runProxy(c *C, bogusClientToken bool) *keepclient.KeepClient {
+ cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
+ c.Assert(err, Equals, nil)
+ cluster, err := cfg.GetCluster("")
+ c.Assert(err, Equals, nil)
+
+ cluster.Services.Keepproxy.InternalURLs = map[arvados.URL]arvados.ServiceInstance{arvados.URL{Host: ":0"}: arvados.ServiceInstance{}}
+
listener = nil
- go main()
+ go func() {
+ run(log.New(), cluster)
+ defer closeListener()
+ }()
waitForListener()
- arv, err := arvadosclient.MakeArvadosClient()
+ client := arvados.NewClientFromEnv()
+ arv, err := arvadosclient.New(client)
c.Assert(err, Equals, nil)
if bogusClientToken {
arv.ApiToken = "bogus-token"
}
func (s *ServerRequiredSuite) TestResponseViaHeader(c *C) {
- runProxy(c, nil, false)
+ runProxy(c, false)
defer closeListener()
req, err := http.NewRequest("POST",
"http://"+listener.Addr().String()+"/",
strings.NewReader("TestViaHeader"))
+ c.Assert(err, Equals, nil)
req.Header.Add("Authorization", "OAuth2 "+arvadostest.ActiveToken)
resp, err := (&http.Client{}).Do(req)
c.Assert(err, Equals, nil)
c.Check(resp.Header.Get("Via"), Equals, "HTTP/1.1 keepproxy")
+ c.Assert(resp.StatusCode, Equals, http.StatusOK)
locator, err := ioutil.ReadAll(resp.Body)
c.Assert(err, Equals, nil)
resp.Body.Close()
}
func (s *ServerRequiredSuite) TestLoopDetection(c *C) {
- kc := runProxy(c, nil, false)
+ kc := runProxy(c, false)
defer closeListener()
sr := map[string]string{
c.Check(err, ErrorMatches, `.*loop detected.*`)
}
+func (s *ServerRequiredSuite) TestStorageClassesHeader(c *C) {
+ kc := runProxy(c, false)
+ defer closeListener()
+
+ // Set up fake keepstore to record request headers
+ var hdr http.Header
+ ts := httptest.NewServer(http.HandlerFunc(
+ func(w http.ResponseWriter, r *http.Request) {
+ hdr = r.Header
+ http.Error(w, "Error", http.StatusInternalServerError)
+ }))
+ defer ts.Close()
+
+ // Point keepproxy router's keepclient to the fake keepstore
+ sr := map[string]string{
+ TestProxyUUID: ts.URL,
+ }
+ router.(*proxyHandler).KeepClient.SetServiceRoots(sr, sr, sr)
+
+ // Set up client to ask for storage classes to keepproxy
+ kc.StorageClasses = []string{"secure"}
+ content := []byte("Very important data")
+ _, _, err := kc.PutB(content)
+ c.Check(err, NotNil)
+ c.Check(hdr.Get("X-Keep-Storage-Classes"), Equals, "secure")
+}
+
func (s *ServerRequiredSuite) TestDesiredReplicas(c *C) {
- kc := runProxy(c, nil, false)
+ kc := runProxy(c, false)
defer closeListener()
content := []byte("TestDesiredReplicas")
}
func (s *ServerRequiredSuite) TestPutWrongContentLength(c *C) {
- kc := runProxy(c, nil, false)
+ kc := runProxy(c, false)
defer closeListener()
content := []byte("TestPutWrongContentLength")
// fixes the invalid Content-Length header. In order to test
// our server behavior, we have to call the handler directly
// using an httptest.ResponseRecorder.
- rtr := MakeRESTRouter(true, true, kc, 10*time.Second, "")
+ rtr := MakeRESTRouter(kc, 10*time.Second, "")
type testcase struct {
sendLength string
}
func (s *ServerRequiredSuite) TestManyFailedPuts(c *C) {
- kc := runProxy(c, nil, false)
+ kc := runProxy(c, false)
defer closeListener()
router.(*proxyHandler).timeout = time.Nanosecond
}
func (s *ServerRequiredSuite) TestPutAskGet(c *C) {
- kc := runProxy(c, nil, false)
+ kc := runProxy(c, false)
defer closeListener()
hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
{
_, _, err := kc.Ask(hash)
c.Check(err, Equals, keepclient.BlockNotFound)
- log.Print("Finished Ask (expected BlockNotFound)")
+ c.Log("Finished Ask (expected BlockNotFound)")
}
{
reader, _, _, err := kc.Get(hash)
c.Check(reader, Equals, nil)
c.Check(err, Equals, keepclient.BlockNotFound)
- log.Print("Finished Get (expected BlockNotFound)")
+ c.Log("Finished Get (expected BlockNotFound)")
}
// Note in bug #5309 among other errors keepproxy would set
c.Check(hash2, Matches, fmt.Sprintf(`^%s\+3(\+.+)?$`, hash))
c.Check(rep, Equals, 2)
c.Check(err, Equals, nil)
- log.Print("Finished PutB (expected success)")
+ c.Log("Finished PutB (expected success)")
}
{
blocklen, _, err := kc.Ask(hash2)
c.Assert(err, Equals, nil)
c.Check(blocklen, Equals, int64(3))
- log.Print("Finished Ask (expected success)")
+ c.Log("Finished Ask (expected success)")
}
{
reader, blocklen, _, err := kc.Get(hash2)
c.Assert(err, Equals, nil)
all, err := ioutil.ReadAll(reader)
+ c.Check(err, IsNil)
c.Check(all, DeepEquals, []byte("foo"))
c.Check(blocklen, Equals, int64(3))
- log.Print("Finished Get (expected success)")
+ c.Log("Finished Get (expected success)")
}
{
c.Check(hash2, Matches, `^d41d8cd98f00b204e9800998ecf8427e\+0(\+.+)?$`)
c.Check(rep, Equals, 2)
c.Check(err, Equals, nil)
- log.Print("Finished PutB zero block")
+ c.Log("Finished PutB zero block")
}
{
reader, blocklen, _, err := kc.Get("d41d8cd98f00b204e9800998ecf8427e")
c.Assert(err, Equals, nil)
all, err := ioutil.ReadAll(reader)
+ c.Check(err, IsNil)
c.Check(all, DeepEquals, []byte(""))
c.Check(blocklen, Equals, int64(0))
- log.Print("Finished Get zero block")
+ c.Log("Finished Get zero block")
}
}
func (s *ServerRequiredSuite) TestPutAskGetForbidden(c *C) {
- kc := runProxy(c, nil, true)
+ kc := runProxy(c, true)
defer closeListener()
- hash := fmt.Sprintf("%x", md5.Sum([]byte("bar")))
-
- {
- _, _, err := kc.Ask(hash)
- errNotFound, _ := err.(keepclient.ErrNotFound)
- c.Check(errNotFound, NotNil)
- c.Assert(strings.Contains(err.Error(), "HTTP 403"), Equals, true)
- log.Print("Ask 1")
- }
-
- {
- hash2, rep, err := kc.PutB([]byte("bar"))
- c.Check(hash2, Equals, "")
- c.Check(rep, Equals, 0)
- c.Check(err, FitsTypeOf, keepclient.InsufficientReplicasError(errors.New("")))
- log.Print("PutB")
- }
+ hash := fmt.Sprintf("%x+3", md5.Sum([]byte("bar")))
- {
- blocklen, _, err := kc.Ask(hash)
- errNotFound, _ := err.(keepclient.ErrNotFound)
- c.Check(errNotFound, NotNil)
- c.Assert(strings.Contains(err.Error(), "HTTP 403"), Equals, true)
- c.Check(blocklen, Equals, int64(0))
- log.Print("Ask 2")
- }
+ _, _, err := kc.Ask(hash)
+ c.Check(err, FitsTypeOf, &keepclient.ErrNotFound{})
- {
- _, blocklen, _, err := kc.Get(hash)
- errNotFound, _ := err.(keepclient.ErrNotFound)
- c.Check(errNotFound, NotNil)
- c.Assert(strings.Contains(err.Error(), "HTTP 403"), Equals, true)
- c.Check(blocklen, Equals, int64(0))
- log.Print("Get")
- }
-}
-
-func (s *ServerRequiredSuite) TestGetDisabled(c *C) {
- kc := runProxy(c, []string{"-no-get"}, false)
- defer closeListener()
-
- hash := fmt.Sprintf("%x", md5.Sum([]byte("baz")))
-
- {
- _, _, err := kc.Ask(hash)
- errNotFound, _ := err.(keepclient.ErrNotFound)
- c.Check(errNotFound, NotNil)
- c.Assert(strings.Contains(err.Error(), "HTTP 400"), Equals, true)
- log.Print("Ask 1")
- }
-
- {
- hash2, rep, err := kc.PutB([]byte("baz"))
- c.Check(hash2, Matches, fmt.Sprintf(`^%s\+3(\+.+)?$`, hash))
- c.Check(rep, Equals, 2)
- c.Check(err, Equals, nil)
- log.Print("PutB")
- }
-
- {
- blocklen, _, err := kc.Ask(hash)
- errNotFound, _ := err.(keepclient.ErrNotFound)
- c.Check(errNotFound, NotNil)
- c.Assert(strings.Contains(err.Error(), "HTTP 400"), Equals, true)
- c.Check(blocklen, Equals, int64(0))
- log.Print("Ask 2")
- }
-
- {
- _, blocklen, _, err := kc.Get(hash)
- errNotFound, _ := err.(keepclient.ErrNotFound)
- c.Check(errNotFound, NotNil)
- c.Assert(strings.Contains(err.Error(), "HTTP 400"), Equals, true)
- c.Check(blocklen, Equals, int64(0))
- log.Print("Get")
- }
-}
-
-func (s *ServerRequiredSuite) TestPutDisabled(c *C) {
- kc := runProxy(c, []string{"-no-put"}, false)
- defer closeListener()
-
- hash2, rep, err := kc.PutB([]byte("quux"))
+ hash2, rep, err := kc.PutB([]byte("bar"))
c.Check(hash2, Equals, "")
c.Check(rep, Equals, 0)
c.Check(err, FitsTypeOf, keepclient.InsufficientReplicasError(errors.New("")))
+
+ blocklen, _, err := kc.Ask(hash)
+ c.Check(err, FitsTypeOf, &keepclient.ErrNotFound{})
+ c.Check(err, ErrorMatches, ".*not found.*")
+ c.Check(blocklen, Equals, int64(0))
+
+ _, blocklen, _, err = kc.Get(hash)
+ c.Check(err, FitsTypeOf, &keepclient.ErrNotFound{})
+ c.Check(err, ErrorMatches, ".*not found.*")
+ c.Check(blocklen, Equals, int64(0))
+
}
func (s *ServerRequiredSuite) TestCorsHeaders(c *C) {
- runProxy(c, nil, false)
+ runProxy(c, false)
defer closeListener()
{
req, err := http.NewRequest("OPTIONS",
fmt.Sprintf("http://%s/%x+3", listener.Addr().String(), md5.Sum([]byte("foo"))),
nil)
+ c.Assert(err, IsNil)
req.Header.Add("Access-Control-Request-Method", "PUT")
req.Header.Add("Access-Control-Request-Headers", "Authorization, X-Keep-Desired-Replicas")
resp, err := client.Do(req)
c.Check(err, Equals, nil)
c.Check(resp.StatusCode, Equals, 200)
body, err := ioutil.ReadAll(resp.Body)
+ c.Check(err, IsNil)
c.Check(string(body), Equals, "")
c.Check(resp.Header.Get("Access-Control-Allow-Methods"), Equals, "GET, HEAD, POST, PUT, OPTIONS")
c.Check(resp.Header.Get("Access-Control-Allow-Origin"), Equals, "*")
}
func (s *ServerRequiredSuite) TestPostWithoutHash(c *C) {
- runProxy(c, nil, false)
+ runProxy(c, false)
defer closeListener()
{
req, err := http.NewRequest("POST",
"http://"+listener.Addr().String()+"/",
strings.NewReader("qux"))
+ c.Check(err, IsNil)
req.Header.Add("Authorization", "OAuth2 "+arvadostest.ActiveToken)
req.Header.Add("Content-Type", "application/octet-stream")
resp, err := client.Do(req)
// With a valid but non-existing prefix (expect "\n")
// With an invalid prefix (expect error)
func (s *ServerRequiredSuite) TestGetIndex(c *C) {
- kc := runProxy(c, nil, false)
+ kc := runProxy(c, false)
defer closeListener()
// Put "index-data" blocks
c.Check(err, Equals, nil)
reader, blocklen, _, err := kc.Get(hash)
- c.Assert(err, Equals, nil)
+ c.Assert(err, IsNil)
c.Check(blocklen, Equals, int64(10))
all, err := ioutil.ReadAll(reader)
+ c.Assert(err, IsNil)
c.Check(all, DeepEquals, data)
// Put some more blocks
- _, rep, err = kc.PutB([]byte("some-more-index-data"))
- c.Check(err, Equals, nil)
+ _, _, err = kc.PutB([]byte("some-more-index-data"))
+ c.Check(err, IsNil)
- kc.Arvados.ApiToken = arvadostest.DataManagerToken
+ kc.Arvados.ApiToken = arvadostest.SystemRootToken
// Invoke GetIndex
for _, spec := range []struct {
c.Assert((err != nil), Equals, true)
}
+func (s *ServerRequiredSuite) TestCollectionSharingToken(c *C) {
+ kc := runProxy(c, false)
+ defer closeListener()
+ hash, _, err := kc.PutB([]byte("shareddata"))
+ c.Check(err, IsNil)
+ kc.Arvados.ApiToken = arvadostest.FooCollectionSharingToken
+ rdr, _, _, err := kc.Get(hash)
+ c.Assert(err, IsNil)
+ data, err := ioutil.ReadAll(rdr)
+ c.Check(err, IsNil)
+ c.Check(data, DeepEquals, []byte("shareddata"))
+}
+
func (s *ServerRequiredSuite) TestPutAskGetInvalidToken(c *C) {
- kc := runProxy(c, nil, false)
+ kc := runProxy(c, false)
defer closeListener()
// Put a test block
hash, rep, err := kc.PutB([]byte("foo"))
- c.Check(err, Equals, nil)
+ c.Check(err, IsNil)
c.Check(rep, Equals, 2)
- for _, token := range []string{
+ for _, badToken := range []string{
"nosuchtoken",
"2ym314ysp27sk7h943q6vtc378srb06se3pq6ghurylyf3pdmx", // expired
} {
- // Change token to given bad token
- kc.Arvados.ApiToken = token
-
- // Ask should result in error
- _, _, err = kc.Ask(hash)
- c.Check(err, NotNil)
- errNotFound, _ := err.(keepclient.ErrNotFound)
- c.Check(errNotFound.Temporary(), Equals, false)
- c.Assert(strings.Contains(err.Error(), "HTTP 403"), Equals, true)
-
- // Get should result in error
- _, _, _, err = kc.Get(hash)
- c.Check(err, NotNil)
- errNotFound, _ = err.(keepclient.ErrNotFound)
- c.Check(errNotFound.Temporary(), Equals, false)
- c.Assert(strings.Contains(err.Error(), "HTTP 403 \"Missing or invalid Authorization header\""), Equals, true)
+ kc.Arvados.ApiToken = badToken
+
+ // Ask and Get will fail only if the upstream
+ // keepstore server checks for valid signatures.
+ // Without knowing the blob signing key, there is no
+ // way for keepproxy to know whether a given token is
+ // permitted to read a block. So these tests fail:
+ if false {
+ _, _, err = kc.Ask(hash)
+ c.Assert(err, FitsTypeOf, &keepclient.ErrNotFound{})
+ c.Check(err.(*keepclient.ErrNotFound).Temporary(), Equals, false)
+ c.Check(err, ErrorMatches, ".*HTTP 403.*")
+
+ _, _, _, err = kc.Get(hash)
+ c.Assert(err, FitsTypeOf, &keepclient.ErrNotFound{})
+ c.Check(err.(*keepclient.ErrNotFound).Temporary(), Equals, false)
+ c.Check(err, ErrorMatches, ".*HTTP 403 \"Missing or invalid Authorization header\".*")
+ }
+
+ _, _, err = kc.PutB([]byte("foo"))
+ c.Check(err, ErrorMatches, ".*403.*Missing or invalid Authorization header")
}
}
func (s *ServerRequiredSuite) TestAskGetKeepProxyConnectionError(c *C) {
- arv, err := arvadosclient.MakeArvadosClient()
- c.Assert(err, Equals, nil)
+ kc := runProxy(c, false)
+ defer closeListener()
- // keepclient with no such keep server
- kc := keepclient.New(arv)
+ // Point keepproxy at a non-existent keepstore
locals := map[string]string{
TestProxyUUID: "http://localhost:12345",
}
- kc.SetServiceRoots(locals, nil, nil)
+ router.(*proxyHandler).KeepClient.SetServiceRoots(locals, nil, nil)
- // Ask should result in temporary connection refused error
+ // Ask should result in temporary bad gateway error
hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
- _, _, err = kc.Ask(hash)
+ _, _, err := kc.Ask(hash)
c.Check(err, NotNil)
errNotFound, _ := err.(*keepclient.ErrNotFound)
c.Check(errNotFound.Temporary(), Equals, true)
- c.Assert(strings.Contains(err.Error(), "connection refused"), Equals, true)
+ c.Assert(err, ErrorMatches, ".*HTTP 502.*")
- // Get should result in temporary connection refused error
+ // Get should result in temporary bad gateway error
_, _, _, err = kc.Get(hash)
c.Check(err, NotNil)
errNotFound, _ = err.(*keepclient.ErrNotFound)
c.Check(errNotFound.Temporary(), Equals, true)
- c.Assert(strings.Contains(err.Error(), "connection refused"), Equals, true)
+ c.Assert(err, ErrorMatches, ".*HTTP 502.*")
}
func (s *NoKeepServerSuite) TestAskGetNoKeepServerError(c *C) {
- kc := runProxy(c, nil, false)
+ kc := runProxy(c, false)
defer closeListener()
hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
}
func (s *ServerRequiredSuite) TestPing(c *C) {
- kc := runProxy(c, nil, false)
+ kc := runProxy(c, false)
defer closeListener()
- rtr := MakeRESTRouter(true, true, kc, 10*time.Second, arvadostest.ManagementToken)
+ rtr := MakeRESTRouter(kc, 10*time.Second, arvadostest.ManagementToken)
req, err := http.NewRequest("GET",
"http://"+listener.Addr().String()+"/_health/ping",
resp := httptest.NewRecorder()
rtr.ServeHTTP(resp, req)
c.Check(resp.Code, Equals, 200)
- c.Assert(strings.Contains(resp.Body.String(), `{"health":"OK"}`), Equals, true)
+ c.Assert(resp.Body.String(), Matches, `{"health":"OK"}\n?`)
}