//
// SPDX-License-Identifier: AGPL-3.0
-package main
+package keepproxy
import (
"bytes"
+ "context"
"crypto/md5"
"fmt"
"io/ioutil"
"math/rand"
+ "net"
"net/http"
"net/http/httptest"
"strings"
"git.arvados.org/arvados.git/sdk/go/arvadosclient"
"git.arvados.org/arvados.git/sdk/go/arvadostest"
"git.arvados.org/arvados.git/sdk/go/ctxlog"
+ "git.arvados.org/arvados.git/sdk/go/httpserver"
"git.arvados.org/arvados.git/sdk/go/keepclient"
log "github.com/sirupsen/logrus"
. "gopkg.in/check.v1"
)
-// Gocheck boilerplate
// Test is the `go test` entry point. It hands control to gocheck via
// TestingT, which runs the gocheck suites (their registration is not
// shown in this excerpt).
func Test(t *testing.T) {
// Shrink the keepclient retry delay to one millisecond before any suite
// runs (presumably so tests that trigger retries finish quickly).
+ keepclient.DefaultRetryDelay = time.Millisecond
TestingT(t)
}
// TestProxyUUID is the service UUID used as the key in the service-root
// maps that tests hand to keepclient.SetServiceRoots (see runProxy).
var TestProxyUUID = "zzzzz-bi6l4-lrixqc4fxofbmzz"
-// Wait (up to 1 second) for keepproxy to listen on a port. This
-// avoids a race condition where we hit a "connection refused" error
-// because we start testing the proxy too soon.
-func waitForListener() {
- const (
- ms = 5
- )
- for i := 0; listener == nil && i < 10000; i += ms {
- time.Sleep(ms * time.Millisecond)
- }
- if listener == nil {
- panic("Timed out waiting for listener to start")
- }
-}
-
-func closeListener() {
- if listener != nil {
- listener.Close()
- }
-}
-
// SetUpSuite starts the keepstore test servers for this suite via
// arvadostest.StartKeep(2, false). This change drops the StartAPI call.
// NOTE(review): the meaning of StartKeep's second argument is not visible
// here — confirm against arvadostest.
func (s *ServerRequiredSuite) SetUpSuite(c *C) {
- arvadostest.StartAPI()
arvadostest.StartKeep(2, false)
}
// TearDownSuite stops the keepstore test servers started by SetUpSuite
// (same count, 2). This change drops the StopAPI call.
func (s *ServerRequiredSuite) TearDownSuite(c *C) {
arvadostest.StopKeep(2)
- arvadostest.StopAPI()
}
// SetUpSuite starts four keepstore test servers, matching the number
// declared in config.yml. This change drops the StartAPI call.
func (s *ServerRequiredConfigYmlSuite) SetUpSuite(c *C) {
- arvadostest.StartAPI()
// config.yml defines 4 keepstores
arvadostest.StartKeep(4, false)
}
// TearDownSuite stops the four keepstore test servers started by
// SetUpSuite. This change drops the StopAPI call.
func (s *ServerRequiredConfigYmlSuite) TearDownSuite(c *C) {
arvadostest.StopKeep(4)
- arvadostest.StopAPI()
}
// SetUpSuite prepares a state in which keep services are listed by the
// API but do not respond: it calls arvadostest.StartKeep(2, false) and
// then arvadostest.ResetEnv(). This change drops the StartAPI call.
// NOTE(review): how StartKeep followed by ResetEnv leaves the services
// unresponsive is not visible in this excerpt — confirm in arvadostest.
func (s *NoKeepServerSuite) SetUpSuite(c *C) {
- arvadostest.StartAPI()
// We need API to have some keep services listed, but the
// services themselves should be unresponsive.
arvadostest.StartKeep(2, false)
arvadostest.ResetEnv()
}
-func (s *NoKeepServerSuite) TearDownSuite(c *C) {
- arvadostest.StopAPI()
// testServer pairs the HTTP server started by runProxy with the
// proxyHandler it serves, so tests can reach handler fields such as
// KeepClient and timeout directly. The embedded *httpserver.Server
// supplies Start, Close and Addr as used by the tests in this file.
+type testServer struct {
+ *httpserver.Server
+ proxyHandler *proxyHandler
}
-func runProxy(c *C, bogusClientToken bool, loadKeepstoresFromConfig bool, kp *arvados.UploadDownloadRolePermissions) (*keepclient.KeepClient, *bytes.Buffer) {
// runProxy starts a keepproxy handler on an in-process HTTP server and
// returns the server, a keepclient whose service roots point at that
// server, and the buffer that captures the proxy's log output.
//
// Parameters:
//   - bogusClientToken: when true, the returned client uses the API token
//     "bogus-token" instead of the one from the environment.
//   - loadKeepstoresFromConfig: handled in lines not shown in this excerpt.
//   - kp: copied into cluster.Collections.KeepproxyPermission (the guard
//     around that assignment is elided here).
//
// Callers are expected to close the returned server (tests in this file
// do `defer srv.Close()`).
+func runProxy(c *C, bogusClientToken bool, loadKeepstoresFromConfig bool, kp *arvados.UploadDownloadRolePermissions) (*testServer, *keepclient.KeepClient, *bytes.Buffer) {
cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
c.Assert(err, Equals, nil)
cluster, err := cfg.GetCluster("")
cluster.Collections.KeepproxyPermission = *kp
}
- listener = nil
// Send the proxy's log output to an in-memory buffer so tests can match
// against it.
logbuf := &bytes.Buffer{}
logger := log.New()
logger.Out = logbuf
- go func() {
- run(logger, cluster)
- defer closeListener()
- }()
- waitForListener()
// Attach the capturing logger to the context passed to the handler and
// used as the server's base context.
+ ctx := ctxlog.Context(context.Background(), logger)
+
// The type assertion panics if newHandlerOrErrorHandler returned something
// other than a *proxyHandler (presumably an error handler).
+ handler := newHandlerOrErrorHandler(ctx, cluster, cluster.SystemRootToken, nil).(*proxyHandler)
+ srv := &testServer{
+ Server: &httpserver.Server{
+ Server: http.Server{
+ BaseContext: func(net.Listener) context.Context { return ctx },
+ Handler: httpserver.AddRequestIDs(
+ httpserver.LogRequests(handler)),
+ },
// NOTE(review): ":" presumably requests any free port, with srv.Addr
// updated to the real address by Start — confirm in httpserver.Server.
+ Addr: ":",
+ },
+ proxyHandler: handler,
+ }
+ err = srv.Start()
+ c.Assert(err, IsNil)
client := arvados.NewClientFromEnv()
arv, err := arvadosclient.New(client)
- c.Assert(err, Equals, nil)
+ c.Assert(err, IsNil)
if bogusClientToken {
arv.ApiToken = "bogus-token"
}
kc := keepclient.New(arv)
+ kc.DiskCacheSize = keepclient.DiskCacheDisabled
// Route every class of keepclient request to the proxy under test: the
// same map is passed for all three SetServiceRoots arguments.
sr := map[string]string{
- TestProxyUUID: "http://" + listener.Addr().String(),
+ TestProxyUUID: "http://" + srv.Addr,
}
kc.SetServiceRoots(sr, sr, sr)
- kc.Arvados.External = true
-
- return kc, logbuf
+ return srv, kc, logbuf
}
// TestResponseViaHeader POSTs a block to the proxy and checks that the
// response carries the header "Via: HTTP/1.1 keepproxy", then issues a
// GET for `locator`. The assignment of `locator` and the checks on the
// GET response are in lines not shown in this excerpt.
func (s *ServerRequiredSuite) TestResponseViaHeader(c *C) {
- runProxy(c, false, false, nil)
- defer closeListener()
+ srv, _, _ := runProxy(c, false, false, nil)
+ defer srv.Close()
req, err := http.NewRequest("POST",
- "http://"+listener.Addr().String()+"/",
+ "http://"+srv.Addr+"/",
strings.NewReader("TestViaHeader"))
c.Assert(err, Equals, nil)
- req.Header.Add("Authorization", "OAuth2 "+arvadostest.ActiveToken)
+ req.Header.Add("Authorization", "Bearer "+arvadostest.ActiveToken)
resp, err := (&http.Client{}).Do(req)
c.Assert(err, Equals, nil)
c.Check(resp.Header.Get("Via"), Equals, "HTTP/1.1 keepproxy")
resp.Body.Close()
req, err = http.NewRequest("GET",
- "http://"+listener.Addr().String()+"/"+string(locator),
+ "http://"+srv.Addr+"/"+string(locator),
nil)
c.Assert(err, Equals, nil)
resp, err = (&http.Client{}).Do(req)
}
// TestLoopDetection points the proxy's own KeepClient at the proxy
// itself and then attempts a PutB through it. The assertion on the
// resulting error is not shown in this excerpt.
func (s *ServerRequiredSuite) TestLoopDetection(c *C) {
- kc, _ := runProxy(c, false, false, nil)
- defer closeListener()
+ srv, kc, _ := runProxy(c, false, false, nil)
+ defer srv.Close()
// Service roots that send the proxy's backend requests back to the proxy.
sr := map[string]string{
- TestProxyUUID: "http://" + listener.Addr().String(),
+ TestProxyUUID: "http://" + srv.Addr,
}
- router.(*proxyHandler).KeepClient.SetServiceRoots(sr, sr, sr)
+ srv.proxyHandler.KeepClient.SetServiceRoots(sr, sr, sr)
content := []byte("TestLoopDetection")
_, _, err := kc.PutB(content)
}
// TestStorageClassesHeader redirects the proxy's backend traffic to a
// fake keepstore (`ts`, created in lines not shown here) that records
// request headers into `hdr`, and configures the client to request the
// "secure" storage class. The request and the header assertions are
// elided in this excerpt.
func (s *ServerRequiredSuite) TestStorageClassesHeader(c *C) {
- kc, _ := runProxy(c, false, false, nil)
- defer closeListener()
+ srv, kc, _ := runProxy(c, false, false, nil)
+ defer srv.Close()
// Set up fake keepstore to record request headers
var hdr http.Header
sr := map[string]string{
TestProxyUUID: ts.URL,
}
- router.(*proxyHandler).KeepClient.SetServiceRoots(sr, sr, sr)
+ srv.proxyHandler.KeepClient.SetServiceRoots(sr, sr, sr)
// Set up client to ask for storage classes to keepproxy
kc.StorageClasses = []string{"secure"}
}
// TestStorageClassesConfirmedHeader PUTs the block "foo" straight to the
// proxy over HTTP with "X-Keep-Storage-Classes: default". The checks on
// the response are not shown in this excerpt.
func (s *ServerRequiredSuite) TestStorageClassesConfirmedHeader(c *C) {
- runProxy(c, false, false, nil)
- defer closeListener()
+ srv, _, _ := runProxy(c, false, false, nil)
+ defer srv.Close()
content := []byte("foo")
// The request path is the hex MD5 of the content.
hash := fmt.Sprintf("%x", md5.Sum(content))
client := &http.Client{}
req, err := http.NewRequest("PUT",
- fmt.Sprintf("http://%s/%s", listener.Addr().String(), hash),
+ fmt.Sprintf("http://%s/%s", srv.Addr, hash),
bytes.NewReader(content))
c.Assert(err, IsNil)
req.Header.Set("X-Keep-Storage-Classes", "default")
- req.Header.Set("Authorization", "OAuth2 "+arvadostest.ActiveToken)
+ req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
req.Header.Set("Content-Type", "application/octet-stream")
resp, err := client.Do(req)
}
// TestDesiredReplicas starts a proxy and prepares a test block and its
// hex MD5 hash. The replica-count requests and assertions are in lines
// not shown in this excerpt.
func (s *ServerRequiredSuite) TestDesiredReplicas(c *C) {
- kc, _ := runProxy(c, false, false, nil)
- defer closeListener()
+ srv, kc, _ := runProxy(c, false, false, nil)
+ defer srv.Close()
content := []byte("TestDesiredReplicas")
hash := fmt.Sprintf("%x", md5.Sum(content))
}
// TestPutWrongContentLength exercises PUT requests whose Content-Length
// header is overridden with each test case's sendLength value. It builds
// a handler with newHandler and records responses with
// httptest.NewRecorder instead of going through the network. Most of the
// test-case table and the status assertions are elided in this excerpt.
func (s *ServerRequiredSuite) TestPutWrongContentLength(c *C) {
- kc, _ := runProxy(c, false, false, nil)
- defer closeListener()
+ srv, kc, _ := runProxy(c, false, false, nil)
+ defer srv.Close()
content := []byte("TestPutWrongContentLength")
hash := fmt.Sprintf("%x", md5.Sum(content))
// fixes the invalid Content-Length header. In order to test
// our server behavior, we have to call the handler directly
// using an httptest.ResponseRecorder.
- rtr, err := MakeRESTRouter(kc, 10*time.Second, &arvados.Cluster{}, log.New())
+ rtr, err := newHandler(context.Background(), kc, 10*time.Second, &arvados.Cluster{})
c.Assert(err, check.IsNil)
type testcase struct {
{"abcdef", http.StatusLengthRequired},
} {
req, err := http.NewRequest("PUT",
- fmt.Sprintf("http://%s/%s+%d", listener.Addr().String(), hash, len(content)),
+ fmt.Sprintf("http://%s/%s+%d", srv.Addr, hash, len(content)),
bytes.NewReader(content))
c.Assert(err, IsNil)
// Overwrite the header with the (possibly invalid) value under test.
req.Header.Set("Content-Length", t.sendLength)
- req.Header.Set("Authorization", "OAuth2 "+arvadostest.ActiveToken)
+ req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
req.Header.Set("Content-Type", "application/octet-stream")
resp := httptest.NewRecorder()
}
// TestManyFailedPuts sets the proxy handler's timeout to one nanosecond
// (presumably so backend requests fail) and fills a 1 MiB buffer with
// random bytes. The PUT loop and assertions are in lines not shown in
// this excerpt.
func (s *ServerRequiredSuite) TestManyFailedPuts(c *C) {
- kc, _ := runProxy(c, false, false, nil)
- defer closeListener()
- router.(*proxyHandler).timeout = time.Nanosecond
+ srv, kc, _ := runProxy(c, false, false, nil)
+ defer srv.Close()
+ srv.proxyHandler.timeout = time.Nanosecond
buf := make([]byte, 1<<20)
rand.Read(buf)
}
// TestPutAskGet walks a block ("foo") through the proxy: a Get before
// the block exists (expects BlockNotFound), then PutB, Ask and Get
// (each expected to succeed), and finally a Get of the empty block. After
// each successful step it matches the captured proxy log against the
// block locator and the requesting user's name and UUID, then resets the
// log buffer. Several statements between the visible lines are elided in
// this excerpt.
func (s *ServerRequiredSuite) TestPutAskGet(c *C) {
- kc, logbuf := runProxy(c, false, false, nil)
- defer closeListener()
+ srv, kc, logbuf := runProxy(c, false, false, nil)
+ defer srv.Close()
hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
var hash2 string
}
{
// The lookup now includes the "+3" size hint for the 3-byte block.
- reader, _, _, err := kc.Get(hash)
+ reader, _, _, err := kc.Get(hash + "+3")
c.Check(reader, Equals, nil)
c.Check(err, Equals, keepclient.BlockNotFound)
c.Log("Finished Get (expected BlockNotFound)")
c.Check(err, Equals, nil)
c.Log("Finished PutB (expected success)")
// The new pattern no longer pins the msg text or field adjacency; it only
// requires locator, userFullName and userUUID to appear in that order.
- c.Check(logbuf.String(), Matches, `(?ms).*msg="Block upload" locator=acbd18db4cc2f85cedef654fccc4a4d8\+3 user_full_name="TestCase Administrator" user_uuid=zzzzz-tpzed-d9tiejq69daie8f.*`)
+ c.Check(logbuf.String(), Matches, `(?ms).* locator=acbd18db4cc2f85cedef654fccc4a4d8\+3.* userFullName="TestCase Administrator".* userUUID=zzzzz-tpzed-d9tiejq69daie8f.*`)
logbuf.Reset()
}
c.Assert(err, Equals, nil)
c.Check(blocklen, Equals, int64(3))
c.Log("Finished Ask (expected success)")
- c.Check(logbuf.String(), Matches, `(?ms).*msg="Block download" locator=acbd18db4cc2f85cedef654fccc4a4d8\+3 user_full_name="TestCase Administrator" user_uuid=zzzzz-tpzed-d9tiejq69daie8f.*`)
+ c.Check(logbuf.String(), Matches, `(?ms).* locator=acbd18db4cc2f85cedef654fccc4a4d8\+3.* userFullName="TestCase Administrator".* userUUID=zzzzz-tpzed-d9tiejq69daie8f.*`)
logbuf.Reset()
}
c.Check(all, DeepEquals, []byte("foo"))
c.Check(blocklen, Equals, int64(3))
c.Log("Finished Get (expected success)")
- c.Check(logbuf.String(), Matches, `(?ms).*msg="Block download" locator=acbd18db4cc2f85cedef654fccc4a4d8\+3 user_full_name="TestCase Administrator" user_uuid=zzzzz-tpzed-d9tiejq69daie8f.*`)
+ c.Check(logbuf.String(), Matches, `(?ms).* locator=acbd18db4cc2f85cedef654fccc4a4d8\+3.* userFullName="TestCase Administrator".* userUUID=zzzzz-tpzed-d9tiejq69daie8f.*`)
logbuf.Reset()
}
{
// d41d8cd98f00b204e9800998ecf8427e is the MD5 of the empty string; the
// read below is expected to return zero bytes.
reader, blocklen, _, err := kc.Get("d41d8cd98f00b204e9800998ecf8427e")
- c.Assert(err, Equals, nil)
+ c.Assert(err, IsNil)
all, err := ioutil.ReadAll(reader)
c.Check(err, IsNil)
c.Check(all, DeepEquals, []byte(""))
}
// TestPutAskGetForbidden runs the proxy with a client that uses the
// bogus API token (runProxy's bogusClientToken=true) and prepares the
// locator of the block "bar". The requests and their expected failures
// are in lines not shown in this excerpt.
func (s *ServerRequiredSuite) TestPutAskGetForbidden(c *C) {
- kc, _ := runProxy(c, true, false, nil)
- defer closeListener()
+ srv, kc, _ := runProxy(c, true, false, nil)
+ defer srv.Close()
hash := fmt.Sprintf("%x+3", md5.Sum([]byte("bar")))
kp.User = perm
}
- kc, logbuf := runProxy(c, false, false, &kp)
- defer closeListener()
+ srv, kc, logbuf := runProxy(c, false, false, &kp)
+ defer srv.Close()
if admin {
kc.Arvados.ApiToken = arvadostest.AdminToken
} else {
c.Check(err, Equals, nil)
c.Log("Finished PutB (expected success)")
if admin {
- c.Check(logbuf.String(), Matches, `(?ms).*msg="Block upload" locator=acbd18db4cc2f85cedef654fccc4a4d8\+3 user_full_name="TestCase Administrator" user_uuid=zzzzz-tpzed-d9tiejq69daie8f.*`)
+ c.Check(logbuf.String(), Matches, `(?ms).* locator=acbd18db4cc2f85cedef654fccc4a4d8\+3.* userFullName="TestCase Administrator".* userUUID=zzzzz-tpzed-d9tiejq69daie8f.*`)
} else {
- c.Check(logbuf.String(), Matches, `(?ms).*msg="Block upload" locator=acbd18db4cc2f85cedef654fccc4a4d8\+3 user_full_name="Active User" user_uuid=zzzzz-tpzed-xurymjxw79nv3jz.*`)
+ c.Check(logbuf.String(), Matches, `(?ms).* locator=acbd18db4cc2f85cedef654fccc4a4d8\+3.* userFullName="Active User".* userUUID=zzzzz-tpzed-xurymjxw79nv3jz.*`)
}
} else {
c.Check(hash2, Equals, "")
c.Check(blocklen, Equals, int64(3))
c.Log("Finished Get (expected success)")
if admin {
- c.Check(logbuf.String(), Matches, `(?ms).*msg="Block download" locator=acbd18db4cc2f85cedef654fccc4a4d8\+3 user_full_name="TestCase Administrator" user_uuid=zzzzz-tpzed-d9tiejq69daie8f.*`)
+ c.Check(logbuf.String(), Matches, `(?ms).* locator=acbd18db4cc2f85cedef654fccc4a4d8\+3.* userFullName="TestCase Administrator".* userUUID=zzzzz-tpzed-d9tiejq69daie8f.*`)
} else {
- c.Check(logbuf.String(), Matches, `(?ms).*msg="Block download" locator=acbd18db4cc2f85cedef654fccc4a4d8\+3 user_full_name="Active User" user_uuid=zzzzz-tpzed-xurymjxw79nv3jz.*`)
+ c.Check(logbuf.String(), Matches, `(?ms).* locator=acbd18db4cc2f85cedef654fccc4a4d8\+3.* userFullName="Active User".* userUUID=zzzzz-tpzed-xurymjxw79nv3jz.*`)
}
} else {
c.Check(err, FitsTypeOf, &keepclient.ErrNotFound{})
}
// TestCorsHeaders checks the CORS response headers emitted by the proxy:
// an OPTIONS preflight must return an empty body with the expected
// Access-Control-Allow-Methods and Allow-Origin values, and a plain GET
// must return the expected Access-Control-Allow-Headers and Allow-Origin
// values. The line that sends the OPTIONS request is elided in this
// excerpt.
func (s *ServerRequiredSuite) TestCorsHeaders(c *C) {
- runProxy(c, false, false, nil)
- defer closeListener()
+ srv, _, _ := runProxy(c, false, false, nil)
+ defer srv.Close()
{
client := http.Client{}
req, err := http.NewRequest("OPTIONS",
- fmt.Sprintf("http://%s/%x+3", listener.Addr().String(), md5.Sum([]byte("foo"))),
+ fmt.Sprintf("http://%s/%x+3", srv.Addr, md5.Sum([]byte("foo"))),
nil)
c.Assert(err, IsNil)
req.Header.Add("Access-Control-Request-Method", "PUT")
body, err := ioutil.ReadAll(resp.Body)
c.Check(err, IsNil)
c.Check(string(body), Equals, "")
// The expected method list is compared as an exact string, so order
// matters.
- c.Check(resp.Header.Get("Access-Control-Allow-Methods"), Equals, "GET, HEAD, POST, PUT, OPTIONS")
+ c.Check(resp.Header.Get("Access-Control-Allow-Methods"), Equals, "GET, HEAD, PUT, OPTIONS, POST")
c.Check(resp.Header.Get("Access-Control-Allow-Origin"), Equals, "*")
}
{
- resp, err := http.Get(
- fmt.Sprintf("http://%s/%x+3", listener.Addr().String(), md5.Sum([]byte("foo"))))
+ resp, err := http.Get(fmt.Sprintf("http://%s/%x+3", srv.Addr, md5.Sum([]byte("foo"))))
// NOTE(review): c.Check does not abort the test, so if http.Get fails,
// resp is nil and the next line panics; c.Assert would be safer. The
// response body is also not closed in the visible lines.
c.Check(err, Equals, nil)
- c.Check(resp.Header.Get("Access-Control-Allow-Headers"), Equals, "Authorization, Content-Length, Content-Type, X-Keep-Desired-Replicas")
+ c.Check(resp.Header.Get("Access-Control-Allow-Headers"), Equals, "Authorization, Content-Length, Content-Type, X-Keep-Desired-Replicas, X-Keep-Signature, X-Keep-Storage-Classes")
c.Check(resp.Header.Get("Access-Control-Allow-Origin"), Equals, "*")
}
}
// TestPostWithoutHash POSTs the body "qux" to the proxy's root path,
// i.e. without a locator in the URL, and expects the request to succeed
// at the HTTP level. The checks on the response are in lines not shown
// in this excerpt.
func (s *ServerRequiredSuite) TestPostWithoutHash(c *C) {
- runProxy(c, false, false, nil)
- defer closeListener()
+ srv, _, _ := runProxy(c, false, false, nil)
+ defer srv.Close()
{
client := http.Client{}
req, err := http.NewRequest("POST",
- "http://"+listener.Addr().String()+"/",
+ "http://"+srv.Addr+"/",
strings.NewReader("qux"))
c.Check(err, IsNil)
- req.Header.Add("Authorization", "OAuth2 "+arvadostest.ActiveToken)
+ req.Header.Add("Authorization", "Bearer "+arvadostest.ActiveToken)
req.Header.Add("Content-Type", "application/octet-stream")
resp, err := client.Do(req)
c.Check(err, Equals, nil)
}
// Test GetIndex
-// Put one block, with 2 replicas
-// With no prefix (expect the block locator, twice)
-// With an existing prefix (expect the block locator, twice)
-// With a valid but non-existing prefix (expect "\n")
-// With an invalid prefix (expect error)
+// - Put one block, with 2 replicas
+// - With no prefix (expect the block locator, twice)
+// - With an existing prefix (expect the block locator, twice)
+// - With a valid but non-existing prefix (expect "\n")
+// - With an invalid prefix (expect error)
func (s *ServerRequiredSuite) TestGetIndex(c *C) {
// useConfig=false: getIndexWorker passes this to runProxy as
// loadKeepstoresFromConfig.
getIndexWorker(c, false)
}
// Test GetIndex
-// Uses config.yml
-// Put one block, with 2 replicas
-// With no prefix (expect the block locator, twice)
-// With an existing prefix (expect the block locator, twice)
-// With a valid but non-existing prefix (expect "\n")
-// With an invalid prefix (expect error)
+// - Uses config.yml
+// - Put one block, with 2 replicas
+// - With no prefix (expect the block locator, twice)
+// - With an existing prefix (expect the block locator, twice)
+// - With a valid but non-existing prefix (expect "\n")
+// - With an invalid prefix (expect error)
func (s *ServerRequiredConfigYmlSuite) TestGetIndex(c *C) {
// useConfig=true: getIndexWorker passes this to runProxy as
// loadKeepstoresFromConfig.
getIndexWorker(c, true)
}
// getIndexWorker is the shared body of the two TestGetIndex tests.
// useConfig is forwarded to runProxy as loadKeepstoresFromConfig. The
// visible part stores an "index-data" block, expects 2 replicas, and
// reads the block back expecting a length of 10 bytes. The PutB call and
// the index queries are in lines not shown in this excerpt.
func getIndexWorker(c *C, useConfig bool) {
- kc, _ := runProxy(c, false, useConfig, nil)
- defer closeListener()
+ srv, kc, _ := runProxy(c, false, useConfig, nil)
+ defer srv.Close()
// Put "index-data" blocks
data := []byte("index-data")
c.Check(rep, Equals, 2)
c.Check(err, Equals, nil)
// The read-back now uses hash2 instead of hash; hash2 is assigned in
// elided lines (presumably the locator returned by PutB — confirm).
- reader, blocklen, _, err := kc.Get(hash)
+ reader, blocklen, _, err := kc.Get(hash2)
c.Assert(err, IsNil)
c.Check(blocklen, Equals, int64(10))
all, err := ioutil.ReadAll(reader)
}
// TestCollectionSharingToken stores a block, then switches the client to
// a collection sharing token and verifies that the block can still be
// fetched through the proxy. The check on the returned data is not shown
// in this excerpt.
func (s *ServerRequiredSuite) TestCollectionSharingToken(c *C) {
- kc, _ := runProxy(c, false, false, nil)
- defer closeListener()
+ srv, kc, _ := runProxy(c, false, false, nil)
+ defer srv.Close()
hash, _, err := kc.PutB([]byte("shareddata"))
c.Check(err, IsNil)
// Subsequent requests authenticate with the sharing token fixture.
- kc.Arvados.ApiToken = arvadostest.FooCollectionSharingToken
+ kc.Arvados.ApiToken = arvadostest.FooFileCollectionSharingToken
rdr, _, _, err := kc.Get(hash)
c.Assert(err, IsNil)
data, err := ioutil.ReadAll(rdr)
}
// TestPutAskGetInvalidToken first stores the block "foo" through the
// proxy. The requests made with invalid tokens, and their assertions,
// are in lines not shown in this excerpt.
func (s *ServerRequiredSuite) TestPutAskGetInvalidToken(c *C) {
- kc, _ := runProxy(c, false, false, nil)
- defer closeListener()
+ srv, kc, _ := runProxy(c, false, false, nil)
+ defer srv.Close()
// Put a test block
hash, rep, err := kc.PutB([]byte("foo"))
}
// TestAskGetKeepProxyConnectionError points the proxy's backend
// KeepClient at http://localhost:12345, where no keepstore is expected to
// be listening, so Ask/Get through the proxy should fail with a temporary
// bad-gateway error. The requests and assertions are in lines not shown
// in this excerpt.
func (s *ServerRequiredSuite) TestAskGetKeepProxyConnectionError(c *C) {
- kc, _ := runProxy(c, false, false, nil)
- defer closeListener()
+ srv, kc, _ := runProxy(c, false, false, nil)
+ defer srv.Close()
// Point keepproxy at a non-existent keepstore
locals := map[string]string{
TestProxyUUID: "http://localhost:12345",
}
// Only the first argument gets the map; the other two are nil.
- router.(*proxyHandler).KeepClient.SetServiceRoots(locals, nil, nil)
+ srv.proxyHandler.KeepClient.SetServiceRoots(locals, nil, nil)
// Ask should result in temporary bad gateway error
hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
}
// TestAskGetNoKeepServerError runs a list of client operations (the
// function literals are elided in this excerpt) against a proxy with no
// responsive keepstores and expects each to fail with a temporary
// *keepclient.ErrNotFound whose message mentions HTTP 502.
func (s *NoKeepServerSuite) TestAskGetNoKeepServerError(c *C) {
- kc, _ := runProxy(c, false, false, nil)
- defer closeListener()
+ srv, kc, _ := runProxy(c, false, false, nil)
+ defer srv.Close()
hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
for _, f := range []func() error{
},
} {
err := f()
// Check (not Assert) lets the remaining cases run after a failure.
- c.Assert(err, NotNil)
+ c.Check(err, NotNil)
errNotFound, _ := err.(*keepclient.ErrNotFound)
- c.Check(errNotFound.Temporary(), Equals, true)
- c.Check(err, ErrorMatches, `.*HTTP 502.*`)
// Guard on the type-assertion result: if err is not an ErrNotFound,
// errNotFound is nil and calling Temporary on it must be avoided.
+ if c.Check(errNotFound, NotNil) {
+ c.Check(errNotFound.Temporary(), Equals, true)
+ c.Check(err, ErrorMatches, `.*HTTP 502.*`)
+ }
}
}
func (s *ServerRequiredSuite) TestPing(c *C) {
- kc, _ := runProxy(c, false, false, nil)
- defer closeListener()
+ srv, kc, _ := runProxy(c, false, false, nil)
+ defer srv.Close()
- rtr, err := MakeRESTRouter(kc, 10*time.Second, &arvados.Cluster{ManagementToken: arvadostest.ManagementToken}, log.New())
+ rtr, err := newHandler(context.Background(), kc, 10*time.Second, &arvados.Cluster{ManagementToken: arvadostest.ManagementToken})
c.Assert(err, check.IsNil)
req, err := http.NewRequest("GET",
- "http://"+listener.Addr().String()+"/_health/ping",
+ "http://"+srv.Addr+"/_health/ping",
nil)
c.Assert(err, IsNil)
req.Header.Set("Authorization", "Bearer "+arvadostest.ManagementToken)