import (
"bytes"
"crypto/md5"
- "errors"
"fmt"
"io/ioutil"
"math/rand"
"git.arvados.org/arvados.git/sdk/go/keepclient"
log "github.com/sirupsen/logrus"
+ "gopkg.in/check.v1"
. "gopkg.in/check.v1"
)
// Tests that require the Keep server running
type ServerRequiredSuite struct{}
+// Gocheck boilerplate
+var _ = Suite(&ServerRequiredConfigYmlSuite{})
+
+// Tests that require the Keep servers running as defined in config.yml
+type ServerRequiredConfigYmlSuite struct{}
+
// Gocheck boilerplate
var _ = Suite(&NoKeepServerSuite{})
}
func (s *ServerRequiredSuite) SetUpSuite(c *C) {
- arvadostest.StartAPI()
arvadostest.StartKeep(2, false)
}
func (s *ServerRequiredSuite) TearDownSuite(c *C) {
arvadostest.StopKeep(2)
- arvadostest.StopAPI()
+}
+
+// SetUpSuite is the gocheck suite fixture for ServerRequiredConfigYmlSuite.
+// It calls arvadostest.StartKeep with a count of 4.
+// NOTE(review): the meaning of the second StartKeep argument (false) is not
+// visible in this file -- confirm against arvadostest.
+func (s *ServerRequiredConfigYmlSuite) SetUpSuite(c *C) {
+ // config.yml defines 4 keepstores
+ arvadostest.StartKeep(4, false)
+}
+
+// SetUpTest is the gocheck per-test fixture for ServerRequiredConfigYmlSuite.
+// It calls arvadostest.ResetEnv.
+func (s *ServerRequiredConfigYmlSuite) SetUpTest(c *C) {
+ arvadostest.ResetEnv()
+}
+
+// TearDownSuite is the gocheck suite teardown for
+// ServerRequiredConfigYmlSuite. It calls arvadostest.StopKeep with the same
+// count (4) that SetUpSuite passed to StartKeep.
+func (s *ServerRequiredConfigYmlSuite) TearDownSuite(c *C) {
+ arvadostest.StopKeep(4)
}
func (s *NoKeepServerSuite) SetUpSuite(c *C) {
- arvadostest.StartAPI()
// We need API to have some keep services listed, but the
// services themselves should be unresponsive.
arvadostest.StartKeep(2, false)
arvadostest.ResetEnv()
}
-func (s *NoKeepServerSuite) TearDownSuite(c *C) {
- arvadostest.StopAPI()
-}
-
-func runProxy(c *C, bogusClientToken bool) *keepclient.KeepClient {
+func runProxy(c *C, bogusClientToken bool, loadKeepstoresFromConfig bool, kp *arvados.UploadDownloadRolePermissions) (*keepclient.KeepClient, *bytes.Buffer) {
cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
c.Assert(err, Equals, nil)
cluster, err := cfg.GetCluster("")
c.Assert(err, Equals, nil)
- cluster.Services.Keepproxy.InternalURLs = map[arvados.URL]arvados.ServiceInstance{arvados.URL{Host: ":0"}: arvados.ServiceInstance{}}
+ if !loadKeepstoresFromConfig {
+ // Do not load Keepstore InternalURLs from the config file
+ cluster.Services.Keepstore.InternalURLs = make(map[arvados.URL]arvados.ServiceInstance)
+ }
+
+ cluster.Services.Keepproxy.InternalURLs = map[arvados.URL]arvados.ServiceInstance{{Host: ":0"}: {}}
+
+ if kp != nil {
+ cluster.Collections.KeepproxyPermission = *kp
+ }
listener = nil
+ logbuf := &bytes.Buffer{}
+ logger := log.New()
+ logger.Out = logbuf
go func() {
- run(log.New(), cluster)
+ run(logger, cluster)
defer closeListener()
}()
waitForListener()
kc.SetServiceRoots(sr, sr, sr)
kc.Arvados.External = true
- return kc
+ return kc, logbuf
}
func (s *ServerRequiredSuite) TestResponseViaHeader(c *C) {
- runProxy(c, false)
+ runProxy(c, false, false, nil)
defer closeListener()
req, err := http.NewRequest("POST",
resp, err := (&http.Client{}).Do(req)
c.Assert(err, Equals, nil)
c.Check(resp.Header.Get("Via"), Equals, "HTTP/1.1 keepproxy")
+ c.Assert(resp.StatusCode, Equals, http.StatusOK)
locator, err := ioutil.ReadAll(resp.Body)
c.Assert(err, Equals, nil)
resp.Body.Close()
}
func (s *ServerRequiredSuite) TestLoopDetection(c *C) {
- kc := runProxy(c, false)
+ kc, _ := runProxy(c, false, false, nil)
defer closeListener()
sr := map[string]string{
}
func (s *ServerRequiredSuite) TestStorageClassesHeader(c *C) {
- kc := runProxy(c, false)
+ kc, _ := runProxy(c, false, false, nil)
defer closeListener()
// Set up fake keepstore to record request headers
c.Check(hdr.Get("X-Keep-Storage-Classes"), Equals, "secure")
}
+// TestStorageClassesConfirmedHeader PUTs a small block through the proxy
+// with "X-Keep-Storage-Classes: default" and checks that the response is
+// 200 OK and carries "X-Keep-Storage-Classes-Confirmed: default=2".
+// NOTE(review): the "2" presumably reflects the two keepstores this suite
+// starts in SetUpSuite -- confirm.
+func (s *ServerRequiredSuite) TestStorageClassesConfirmedHeader(c *C) {
+	runProxy(c, false, false, nil)
+	defer closeListener()
+
+	content := []byte("foo")
+	hash := fmt.Sprintf("%x", md5.Sum(content))
+	client := &http.Client{}
+
+	req, err := http.NewRequest("PUT",
+		fmt.Sprintf("http://%s/%s", listener.Addr().String(), hash),
+		bytes.NewReader(content))
+	c.Assert(err, IsNil)
+	req.Header.Set("X-Keep-Storage-Classes", "default")
+	req.Header.Set("Authorization", "OAuth2 "+arvadostest.ActiveToken)
+	req.Header.Set("Content-Type", "application/octet-stream")
+
+	resp, err := client.Do(req)
+	c.Assert(err, IsNil)
+	// Deferred so the body is released even if an Assert below aborts the
+	// test; it runs before the deferred closeListener (LIFO order).
+	defer resp.Body.Close()
+	c.Assert(resp.StatusCode, Equals, http.StatusOK)
+	c.Assert(resp.Header.Get("X-Keep-Storage-Classes-Confirmed"), Equals, "default=2")
+}
+
func (s *ServerRequiredSuite) TestDesiredReplicas(c *C) {
- kc := runProxy(c, false)
+ kc, _ := runProxy(c, false, false, nil)
defer closeListener()
content := []byte("TestDesiredReplicas")
hash := fmt.Sprintf("%x", md5.Sum(content))
- for _, kc.Want_replicas = range []int{0, 1, 2} {
+ for _, kc.Want_replicas = range []int{0, 1, 2, 3} {
locator, rep, err := kc.PutB(content)
- c.Check(err, Equals, nil)
- c.Check(rep, Equals, kc.Want_replicas)
- if rep > 0 {
- c.Check(locator, Matches, fmt.Sprintf(`^%s\+%d(\+.+)?$`, hash, len(content)))
+ if kc.Want_replicas < 3 {
+ c.Check(err, Equals, nil)
+ c.Check(rep, Equals, kc.Want_replicas)
+ if rep > 0 {
+ c.Check(locator, Matches, fmt.Sprintf(`^%s\+%d(\+.+)?$`, hash, len(content)))
+ }
+ } else {
+ c.Check(err, ErrorMatches, ".*503.*")
}
}
}
func (s *ServerRequiredSuite) TestPutWrongContentLength(c *C) {
- kc := runProxy(c, false)
+ kc, _ := runProxy(c, false, false, nil)
defer closeListener()
content := []byte("TestPutWrongContentLength")
// fixes the invalid Content-Length header. In order to test
// our server behavior, we have to call the handler directly
// using an httptest.ResponseRecorder.
- rtr := MakeRESTRouter(kc, 10*time.Second, "")
+ rtr, err := MakeRESTRouter(kc, 10*time.Second, &arvados.Cluster{}, log.New())
+ c.Assert(err, check.IsNil)
type testcase struct {
sendLength string
}
func (s *ServerRequiredSuite) TestManyFailedPuts(c *C) {
- kc := runProxy(c, false)
+ kc, _ := runProxy(c, false, false, nil)
defer closeListener()
router.(*proxyHandler).timeout = time.Nanosecond
}
func (s *ServerRequiredSuite) TestPutAskGet(c *C) {
- kc := runProxy(c, false)
+ kc, logbuf := runProxy(c, false, false, nil)
defer closeListener()
hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
c.Check(rep, Equals, 2)
c.Check(err, Equals, nil)
c.Log("Finished PutB (expected success)")
+
+ c.Check(logbuf.String(), Matches, `(?ms).*msg="Block upload" locator=acbd18db4cc2f85cedef654fccc4a4d8\+3 user_full_name="TestCase Administrator" user_uuid=zzzzz-tpzed-d9tiejq69daie8f.*`)
+ logbuf.Reset()
}
{
c.Assert(err, Equals, nil)
c.Check(blocklen, Equals, int64(3))
c.Log("Finished Ask (expected success)")
+ c.Check(logbuf.String(), Matches, `(?ms).*msg="Block download" locator=acbd18db4cc2f85cedef654fccc4a4d8\+3 user_full_name="TestCase Administrator" user_uuid=zzzzz-tpzed-d9tiejq69daie8f.*`)
+ logbuf.Reset()
}
{
c.Check(all, DeepEquals, []byte("foo"))
c.Check(blocklen, Equals, int64(3))
c.Log("Finished Get (expected success)")
+ c.Check(logbuf.String(), Matches, `(?ms).*msg="Block download" locator=acbd18db4cc2f85cedef654fccc4a4d8\+3 user_full_name="TestCase Administrator" user_uuid=zzzzz-tpzed-d9tiejq69daie8f.*`)
+ logbuf.Reset()
}
{
}
func (s *ServerRequiredSuite) TestPutAskGetForbidden(c *C) {
- kc := runProxy(c, true)
+ kc, _ := runProxy(c, true, false, nil)
defer closeListener()
hash := fmt.Sprintf("%x+3", md5.Sum([]byte("bar")))
hash2, rep, err := kc.PutB([]byte("bar"))
c.Check(hash2, Equals, "")
c.Check(rep, Equals, 0)
- c.Check(err, FitsTypeOf, keepclient.InsufficientReplicasError(errors.New("")))
+ c.Check(err, FitsTypeOf, keepclient.InsufficientReplicasError{})
blocklen, _, err := kc.Ask(hash)
c.Check(err, FitsTypeOf, &keepclient.ErrNotFound{})
- c.Check(err, ErrorMatches, ".*not found.*")
+ c.Check(err, ErrorMatches, ".*HTTP 403.*")
c.Check(blocklen, Equals, int64(0))
_, blocklen, _, err = kc.Get(hash)
c.Check(err, FitsTypeOf, &keepclient.ErrNotFound{})
- c.Check(err, ErrorMatches, ".*not found.*")
+ c.Check(err, ErrorMatches, ".*HTTP 403.*")
c.Check(blocklen, Equals, int64(0))
+}
+
+func testPermission(c *C, admin bool, perm arvados.UploadDownloadPermission) {
+ kp := arvados.UploadDownloadRolePermissions{}
+ if admin {
+ kp.Admin = perm
+ kp.User = arvados.UploadDownloadPermission{Upload: true, Download: true}
+ } else {
+ kp.Admin = arvados.UploadDownloadPermission{Upload: true, Download: true}
+ kp.User = perm
+ }
+
+ kc, logbuf := runProxy(c, false, false, &kp)
+ defer closeListener()
+ if admin {
+ kc.Arvados.ApiToken = arvadostest.AdminToken
+ } else {
+ kc.Arvados.ApiToken = arvadostest.ActiveToken
+ }
+
+ hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+ var hash2 string
+
+ {
+ var rep int
+ var err error
+ hash2, rep, err = kc.PutB([]byte("foo"))
+
+ if perm.Upload {
+ c.Check(hash2, Matches, fmt.Sprintf(`^%s\+3(\+.+)?$`, hash))
+ c.Check(rep, Equals, 2)
+ c.Check(err, Equals, nil)
+ c.Log("Finished PutB (expected success)")
+ if admin {
+ c.Check(logbuf.String(), Matches, `(?ms).*msg="Block upload" locator=acbd18db4cc2f85cedef654fccc4a4d8\+3 user_full_name="TestCase Administrator" user_uuid=zzzzz-tpzed-d9tiejq69daie8f.*`)
+ } else {
+
+ c.Check(logbuf.String(), Matches, `(?ms).*msg="Block upload" locator=acbd18db4cc2f85cedef654fccc4a4d8\+3 user_full_name="Active User" user_uuid=zzzzz-tpzed-xurymjxw79nv3jz.*`)
+ }
+ } else {
+ c.Check(hash2, Equals, "")
+ c.Check(rep, Equals, 0)
+ c.Check(err, FitsTypeOf, keepclient.InsufficientReplicasError{})
+ }
+ logbuf.Reset()
+ }
+ if perm.Upload {
+ // can't test download without upload.
+
+ reader, blocklen, _, err := kc.Get(hash2)
+ if perm.Download {
+ c.Assert(err, Equals, nil)
+ all, err := ioutil.ReadAll(reader)
+ c.Check(err, IsNil)
+ c.Check(all, DeepEquals, []byte("foo"))
+ c.Check(blocklen, Equals, int64(3))
+ c.Log("Finished Get (expected success)")
+ if admin {
+ c.Check(logbuf.String(), Matches, `(?ms).*msg="Block download" locator=acbd18db4cc2f85cedef654fccc4a4d8\+3 user_full_name="TestCase Administrator" user_uuid=zzzzz-tpzed-d9tiejq69daie8f.*`)
+ } else {
+ c.Check(logbuf.String(), Matches, `(?ms).*msg="Block download" locator=acbd18db4cc2f85cedef654fccc4a4d8\+3 user_full_name="Active User" user_uuid=zzzzz-tpzed-xurymjxw79nv3jz.*`)
+ }
+ } else {
+ c.Check(err, FitsTypeOf, &keepclient.ErrNotFound{})
+ c.Check(err, ErrorMatches, ".*Missing or invalid Authorization header, or method not allowed.*")
+ c.Check(blocklen, Equals, int64(0))
+ }
+ logbuf.Reset()
+ }
}
+// TestPutGetPermission runs testPermission for both roles (admin and
+// regular user), restricting first upload and then download, with the
+// restricted permission both granted and denied.
+//
+// Every combination is exercised once per role: the upload-restricted
+// case for the regular user is included, and no case is repeated by an
+// unrelated outer loop variable.
+func (s *ServerRequiredSuite) TestPutGetPermission(c *C) {
+	for _, allowed := range []bool{true, false} {
+		for _, admin := range []bool{true, false} {
+			// Vary upload, keep download allowed.
+			testPermission(c, admin,
+				arvados.UploadDownloadPermission{
+					Upload:   allowed,
+					Download: true,
+				})
+			// Vary download, keep upload allowed (download cannot
+			// be tested without a successful upload).
+			testPermission(c, admin,
+				arvados.UploadDownloadPermission{
+					Upload:   true,
+					Download: allowed,
+				})
+		}
+	}
+}
+
func (s *ServerRequiredSuite) TestCorsHeaders(c *C) {
- runProxy(c, false)
+ runProxy(c, false, false, nil)
defer closeListener()
{
}
func (s *ServerRequiredSuite) TestPostWithoutHash(c *C) {
- runProxy(c, false)
+ runProxy(c, false, false, nil)
defer closeListener()
{
// With a valid but non-existing prefix (expect "\n")
// With an invalid prefix (expect error)
func (s *ServerRequiredSuite) TestGetIndex(c *C) {
- kc := runProxy(c, false)
+ getIndexWorker(c, false)
+}
+
+// Test GetIndex
+// Uses config.yml
+// Put one block, with 2 replicas
+// With no prefix (expect the block locator, twice)
+// With an existing prefix (expect the block locator, twice)
+// With a valid but non-existing prefix (expect "\n")
+// With an invalid prefix (expect error)
+func (s *ServerRequiredConfigYmlSuite) TestGetIndex(c *C) {
+ getIndexWorker(c, true)
+}
+
+func getIndexWorker(c *C, useConfig bool) {
+ kc, _ := runProxy(c, false, useConfig, nil)
defer closeListener()
// Put "index-data" blocks
}
func (s *ServerRequiredSuite) TestCollectionSharingToken(c *C) {
- kc := runProxy(c, false)
+ kc, _ := runProxy(c, false, false, nil)
defer closeListener()
hash, _, err := kc.PutB([]byte("shareddata"))
c.Check(err, IsNil)
}
func (s *ServerRequiredSuite) TestPutAskGetInvalidToken(c *C) {
- kc := runProxy(c, false)
+ kc, _ := runProxy(c, false, false, nil)
defer closeListener()
// Put a test block
_, _, _, err = kc.Get(hash)
c.Assert(err, FitsTypeOf, &keepclient.ErrNotFound{})
c.Check(err.(*keepclient.ErrNotFound).Temporary(), Equals, false)
- c.Check(err, ErrorMatches, ".*HTTP 403 \"Missing or invalid Authorization header\".*")
+ c.Check(err, ErrorMatches, ".*HTTP 403 \"Missing or invalid Authorization header, or method not allowed\".*")
}
_, _, err = kc.PutB([]byte("foo"))
- c.Check(err, ErrorMatches, ".*403.*Missing or invalid Authorization header")
+ c.Check(err, ErrorMatches, ".*403.*Missing or invalid Authorization header, or method not allowed")
}
}
func (s *ServerRequiredSuite) TestAskGetKeepProxyConnectionError(c *C) {
- kc := runProxy(c, false)
+ kc, _ := runProxy(c, false, false, nil)
defer closeListener()
// Point keepproxy at a non-existent keepstore
}
func (s *NoKeepServerSuite) TestAskGetNoKeepServerError(c *C) {
- kc := runProxy(c, false)
+ kc, _ := runProxy(c, false, false, nil)
defer closeListener()
hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
}
func (s *ServerRequiredSuite) TestPing(c *C) {
- kc := runProxy(c, false)
+ kc, _ := runProxy(c, false, false, nil)
defer closeListener()
- rtr := MakeRESTRouter(kc, 10*time.Second, arvadostest.ManagementToken)
+ rtr, err := MakeRESTRouter(kc, 10*time.Second, &arvados.Cluster{ManagementToken: arvadostest.ManagementToken}, log.New())
+ c.Assert(err, check.IsNil)
req, err := http.NewRequest("GET",
"http://"+listener.Addr().String()+"/_health/ping",