16665: Exposes bug through updated test.
[arvados.git] / services / keepproxy / keepproxy_test.go
index 2c75ec1616e3b404a9547b6e2f969ce9fc1fe9f2..f02971d61b22c7ff169f18208b7f159ff94c67a9 100644 (file)
@@ -1,22 +1,33 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
 package main
 
 import (
+       "bytes"
        "crypto/md5"
-       "crypto/tls"
+       "errors"
        "fmt"
-       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-       "git.curoverse.com/arvados.git/sdk/go/arvadostest"
-       "git.curoverse.com/arvados.git/sdk/go/keepclient"
-       . "gopkg.in/check.v1"
-       "io"
        "io/ioutil"
-       "log"
+       "math/rand"
        "net/http"
-       "net/url"
-       "os"
+       "net/http/httptest"
        "strings"
+       "sync"
        "testing"
        "time"
+
+       "git.arvados.org/arvados.git/lib/config"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/arvadosclient"
+       "git.arvados.org/arvados.git/sdk/go/arvadostest"
+       "git.arvados.org/arvados.git/sdk/go/ctxlog"
+       "git.arvados.org/arvados.git/sdk/go/keepclient"
+       log "github.com/sirupsen/logrus"
+
+       "gopkg.in/check.v1"
+       . "gopkg.in/check.v1"
 )
 
 // Gocheck boilerplate
@@ -30,12 +41,20 @@ var _ = Suite(&ServerRequiredSuite{})
 // Tests that require the Keep server running
 type ServerRequiredSuite struct{}
 
+// Gocheck boilerplate
+var _ = Suite(&ServerRequiredConfigYmlSuite{})
+
+// Tests that require the Keep servers running as defined in config.yml
+type ServerRequiredConfigYmlSuite struct{}
+
 // Gocheck boilerplate
 var _ = Suite(&NoKeepServerSuite{})
 
 // Test with no keepserver to simulate errors
 type NoKeepServerSuite struct{}
 
+var TestProxyUUID = "zzzzz-bi6l4-lrixqc4fxofbmzz"
+
 // Wait (up to 1 second) for keepproxy to listen on a port. This
 // avoids a race condition where we hit a "connection refused" error
 // because we start testing the proxy too soon.
@@ -43,11 +62,11 @@ func waitForListener() {
        const (
                ms = 5
        )
-       for i := 0; listener == nil && i < 1000; i += ms {
+       for i := 0; listener == nil && i < 10000; i += ms {
                time.Sleep(ms * time.Millisecond)
        }
        if listener == nil {
-               log.Fatalf("Timed out waiting for listener to start")
+               panic("Timed out waiting for listener to start")
        }
 }
 
@@ -71,8 +90,27 @@ func (s *ServerRequiredSuite) TearDownSuite(c *C) {
        arvadostest.StopAPI()
 }
 
+// SetUpSuite is the gocheck suite fixture: it calls
+// arvadostest.StartAPI and then starts the four keepstores that
+// config.yml defines.
+func (s *ServerRequiredConfigYmlSuite) SetUpSuite(c *C) {
+       arvadostest.StartAPI()
+       // config.yml defines 4 keepstores
+       arvadostest.StartKeep(4, false)
+}
+
+// SetUpTest is the gocheck per-test fixture: it calls
+// arvadostest.ResetEnv so each test starts from a reset environment.
+func (s *ServerRequiredConfigYmlSuite) SetUpTest(c *C) {
+       arvadostest.ResetEnv()
+}
+
+// TearDownSuite stops the four keepstores and then the API server,
+// mirroring what SetUpSuite started.
+func (s *ServerRequiredConfigYmlSuite) TearDownSuite(c *C) {
+       arvadostest.StopKeep(4)
+       arvadostest.StopAPI()
+}
+
 func (s *NoKeepServerSuite) SetUpSuite(c *C) {
        arvadostest.StartAPI()
+       // We need API to have some keep services listed, but the
+       // services themselves should be unresponsive.
+       arvadostest.StartKeep(2, false)
+       arvadostest.StopKeep(2)
 }
 
 func (s *NoKeepServerSuite) SetUpTest(c *C) {
@@ -83,106 +121,233 @@ func (s *NoKeepServerSuite) TearDownSuite(c *C) {
        arvadostest.StopAPI()
 }
 
-func setupProxyService() {
-
-       client := &http.Client{Transport: &http.Transport{
-               TLSClientConfig: &tls.Config{InsecureSkipVerify: true}}}
+// runProxy starts keepproxy in a background goroutine listening on
+// ":0", waits for its listener to come up, and returns a keepclient
+// whose service roots all point at that listener, together with the
+// buffer that receives the proxy's log output.
+//
+// bogusClientToken replaces the returned client's API token with
+// "bogus-token". When loadKeepstoresFromConfig is false, the
+// keepstore InternalURLs loaded from the config file are replaced
+// with an empty map. A non-nil kp overrides the cluster's
+// Collections.KeepproxyPermission.
+//
+// The tests in this file pair every call with "defer closeListener()".
+func runProxy(c *C, bogusClientToken bool, loadKeepstoresFromConfig bool, kp *arvados.UploadDownloadRolePermissions) (*keepclient.KeepClient, *bytes.Buffer) {
+       cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
+       c.Assert(err, Equals, nil)
+       cluster, err := cfg.GetCluster("")
+       c.Assert(err, Equals, nil)
 
-       var req *http.Request
-       var err error
-       if req, err = http.NewRequest("POST", fmt.Sprintf("https://%s/arvados/v1/keep_services", os.Getenv("ARVADOS_API_HOST")), nil); err != nil {
-               panic(err.Error())
+       if !loadKeepstoresFromConfig {
+               // Do not load Keepstore InternalURLs from the config file
+               cluster.Services.Keepstore.InternalURLs = make(map[arvados.URL]arvados.ServiceInstance)
        }
-       req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", os.Getenv("ARVADOS_API_TOKEN")))
 
-       reader, writer := io.Pipe()
+       cluster.Services.Keepproxy.InternalURLs = map[arvados.URL]arvados.ServiceInstance{{Host: ":0"}: {}}
 
-       req.Body = reader
+       if kp != nil {
+               cluster.Collections.KeepproxyPermission = *kp
+       }
 
+       listener = nil
+       logbuf := &bytes.Buffer{}
+       logger := log.New()
+       logger.Out = logbuf
        go func() {
-               data := url.Values{}
-               data.Set("keep_service", `{
-  "service_host": "localhost",
-  "service_port": 29950,
-  "service_ssl_flag": false,
-  "service_type": "proxy"
-}`)
-
-               writer.Write([]byte(data.Encode()))
-               writer.Close()
+               run(logger, cluster)
+               // NOTE(review): this defer is registered only after run
+               // has returned, so it behaves like a plain call at the
+               // end of the goroutine rather than a guard around run.
+               defer closeListener()
        }()
+       waitForListener()
 
-       var resp *http.Response
-       if resp, err = client.Do(req); err != nil {
-               panic(err.Error())
+       client := arvados.NewClientFromEnv()
+       arv, err := arvadosclient.New(client)
+       c.Assert(err, Equals, nil)
+       if bogusClientToken {
+               arv.ApiToken = "bogus-token"
        }
-       if resp.StatusCode != 200 {
-               panic(resp.Status)
+       kc := keepclient.New(arv)
+       sr := map[string]string{
+               TestProxyUUID: "http://" + listener.Addr().String(),
        }
+       kc.SetServiceRoots(sr, sr, sr)
+       kc.Arvados.External = true
+
+       return kc, logbuf
 }
 
-func runProxy(c *C, args []string, port int, bogusClientToken bool) *keepclient.KeepClient {
-       if bogusClientToken {
-               os.Setenv("ARVADOS_API_TOKEN", "bogus-token")
-       }
-       arv, err := arvadosclient.MakeArvadosClient()
+func (s *ServerRequiredSuite) TestResponseViaHeader(c *C) {
+       runProxy(c, false, false, nil)
+       defer closeListener()
+
+       req, err := http.NewRequest("POST",
+               "http://"+listener.Addr().String()+"/",
+               strings.NewReader("TestViaHeader"))
        c.Assert(err, Equals, nil)
-       kc := keepclient.KeepClient{
-               Arvados:       &arv,
-               Want_replicas: 2,
-               Using_proxy:   true,
-               Client:        &http.Client{},
-       }
-       locals := map[string]string{
-               "proxy": fmt.Sprintf("http://localhost:%v", port),
-       }
-       writableLocals := map[string]string{
-               "proxy": fmt.Sprintf("http://localhost:%v", port),
-       }
-       kc.SetServiceRoots(locals, writableLocals, nil)
-       c.Check(kc.Using_proxy, Equals, true)
-       c.Check(len(kc.LocalRoots()), Equals, 1)
-       for _, root := range kc.LocalRoots() {
-               c.Check(root, Equals, fmt.Sprintf("http://localhost:%v", port))
-       }
-       log.Print("keepclient created")
-       if bogusClientToken {
-               arvadostest.ResetEnv()
-       }
+       req.Header.Add("Authorization", "OAuth2 "+arvadostest.ActiveToken)
+       resp, err := (&http.Client{}).Do(req)
+       c.Assert(err, Equals, nil)
+       c.Check(resp.Header.Get("Via"), Equals, "HTTP/1.1 keepproxy")
+       c.Assert(resp.StatusCode, Equals, http.StatusOK)
+       locator, err := ioutil.ReadAll(resp.Body)
+       c.Assert(err, Equals, nil)
+       resp.Body.Close()
 
-       {
-               os.Args = append(args, fmt.Sprintf("-listen=:%v", port))
-               listener = nil
-               go main()
+       req, err = http.NewRequest("GET",
+               "http://"+listener.Addr().String()+"/"+string(locator),
+               nil)
+       c.Assert(err, Equals, nil)
+       resp, err = (&http.Client{}).Do(req)
+       c.Assert(err, Equals, nil)
+       c.Check(resp.Header.Get("Via"), Equals, "HTTP/1.1 keepproxy")
+       resp.Body.Close()
+}
+
+func (s *ServerRequiredSuite) TestLoopDetection(c *C) {
+       kc, _ := runProxy(c, false, false, nil)
+       defer closeListener()
+
+       sr := map[string]string{
+               TestProxyUUID: "http://" + listener.Addr().String(),
        }
+       router.(*proxyHandler).KeepClient.SetServiceRoots(sr, sr, sr)
+
+       content := []byte("TestLoopDetection")
+       _, _, err := kc.PutB(content)
+       c.Check(err, ErrorMatches, `.*loop detected.*`)
 
-       return &kc
+       hash := fmt.Sprintf("%x", md5.Sum(content))
+       _, _, _, err = kc.Get(hash)
+       c.Check(err, ErrorMatches, `.*loop detected.*`)
 }
 
-func (s *ServerRequiredSuite) TestPutAskGet(c *C) {
-       log.Print("TestPutAndGet start")
+func (s *ServerRequiredSuite) TestStorageClassesHeader(c *C) {
+       kc, _ := runProxy(c, false, false, nil)
+       defer closeListener()
 
-       os.Args = []string{"keepproxy", "-listen=:29950"}
-       listener = nil
-       go main()
-       time.Sleep(100 * time.Millisecond)
+       // Set up fake keepstore to record request headers
+       var hdr http.Header
+       ts := httptest.NewServer(http.HandlerFunc(
+               func(w http.ResponseWriter, r *http.Request) {
+                       hdr = r.Header
+                       http.Error(w, "Error", http.StatusInternalServerError)
+               }))
+       defer ts.Close()
+
+       // Point keepproxy router's keepclient to the fake keepstore
+       sr := map[string]string{
+               TestProxyUUID: ts.URL,
+       }
+       router.(*proxyHandler).KeepClient.SetServiceRoots(sr, sr, sr)
+
+       // Set up client to ask for storage classes to keepproxy
+       kc.StorageClasses = []string{"secure"}
+       content := []byte("Very important data")
+       _, _, err := kc.PutB(content)
+       c.Check(err, NotNil)
+       c.Check(hdr.Get("X-Keep-Storage-Classes"), Equals, "secure")
+}
 
-       setupProxyService()
+func (s *ServerRequiredSuite) TestStorageClassesConfirmedHeader(c *C) {
+       runProxy(c, false, false, nil)
+       defer closeListener()
 
-       os.Setenv("ARVADOS_EXTERNAL_CLIENT", "true")
-       arv, err := arvadosclient.MakeArvadosClient()
-       c.Assert(err, Equals, nil)
-       kc, err := keepclient.MakeKeepClient(&arv)
-       c.Assert(err, Equals, nil)
-       c.Check(kc.Arvados.External, Equals, true)
-       c.Check(kc.Using_proxy, Equals, true)
-       c.Check(len(kc.LocalRoots()), Equals, 1)
-       for _, root := range kc.LocalRoots() {
-               c.Check(root, Equals, "http://localhost:29950")
+       content := []byte("foo")
+       hash := fmt.Sprintf("%x", md5.Sum(content))
+       client := &http.Client{}
+
+       req, err := http.NewRequest("PUT",
+               fmt.Sprintf("http://%s/%s", listener.Addr().String(), hash),
+               bytes.NewReader(content))
+       c.Assert(err, IsNil)
+       req.Header.Set("X-Keep-Storage-Classes", "default")
+       req.Header.Set("Authorization", "OAuth2 "+arvadostest.ActiveToken)
+       req.Header.Set("Content-Type", "application/octet-stream")
+
+       resp, err := client.Do(req)
+       c.Assert(err, IsNil)
+       c.Assert(resp.StatusCode, Equals, http.StatusOK)
+       c.Assert(resp.Header.Get("X-Keep-Storage-Classes-Confirmed"), Equals, "default=2")
+}
+
+func (s *ServerRequiredSuite) TestDesiredReplicas(c *C) {
+       kc, _ := runProxy(c, false, false, nil)
+       defer closeListener()
+
+       content := []byte("TestDesiredReplicas")
+       hash := fmt.Sprintf("%x", md5.Sum(content))
+
+       for _, kc.Want_replicas = range []int{0, 1, 2, 3} {
+               locator, rep, err := kc.PutB(content)
+               if kc.Want_replicas < 3 {
+                       c.Check(err, Equals, nil)
+                       c.Check(rep, Equals, kc.Want_replicas)
+                       if rep > 0 {
+                               c.Check(locator, Matches, fmt.Sprintf(`^%s\+%d(\+.+)?$`, hash, len(content)))
+                       }
+               } else {
+                       c.Check(err, ErrorMatches, ".*503.*")
+               }
        }
-       os.Setenv("ARVADOS_EXTERNAL_CLIENT", "")
+}
 
-       waitForListener()
+func (s *ServerRequiredSuite) TestPutWrongContentLength(c *C) {
+       kc, _ := runProxy(c, false, false, nil)
+       defer closeListener()
+
+       content := []byte("TestPutWrongContentLength")
+       hash := fmt.Sprintf("%x", md5.Sum(content))
+
+       // If we use http.Client to send these requests to the network
+       // server we just started, the Go http library automatically
+       // fixes the invalid Content-Length header. In order to test
+       // our server behavior, we have to call the handler directly
+       // using an httptest.ResponseRecorder.
+       rtr, err := MakeRESTRouter(kc, 10*time.Second, &arvados.Cluster{}, log.New())
+       c.Assert(err, check.IsNil)
+
+       type testcase struct {
+               sendLength   string
+               expectStatus int
+       }
+
+       for _, t := range []testcase{
+               {"1", http.StatusBadRequest},
+               {"", http.StatusLengthRequired},
+               {"-1", http.StatusLengthRequired},
+               {"abcdef", http.StatusLengthRequired},
+       } {
+               req, err := http.NewRequest("PUT",
+                       fmt.Sprintf("http://%s/%s+%d", listener.Addr().String(), hash, len(content)),
+                       bytes.NewReader(content))
+               c.Assert(err, IsNil)
+               req.Header.Set("Content-Length", t.sendLength)
+               req.Header.Set("Authorization", "OAuth2 "+arvadostest.ActiveToken)
+               req.Header.Set("Content-Type", "application/octet-stream")
+
+               resp := httptest.NewRecorder()
+               rtr.ServeHTTP(resp, req)
+               c.Check(resp.Code, Equals, t.expectStatus)
+       }
+}
+
+// TestManyFailedPuts sets the proxy handler's timeout to one
+// nanosecond, issues 128 concurrent PutB calls of a 1 MiB random
+// block, and reports an error if they have not all returned within
+// 10 seconds. The PutB results themselves are not inspected.
+func (s *ServerRequiredSuite) TestManyFailedPuts(c *C) {
+       kc, _ := runProxy(c, false, false, nil)
+       defer closeListener()
+       router.(*proxyHandler).timeout = time.Nanosecond
+
+       const writers = 128
+       block := make([]byte, 1<<20)
+       rand.Read(block)
+
+       var pending sync.WaitGroup
+       pending.Add(writers)
+       for n := 0; n < writers; n++ {
+               go func() {
+                       defer pending.Done()
+                       kc.PutB(block)
+               }()
+       }
+
+       // Closed once every writer goroutine has returned.
+       finished := make(chan bool)
+       go func() {
+               defer close(finished)
+               pending.Wait()
+       }()
+
+       select {
+       case <-time.After(10 * time.Second):
+               c.Error("timeout")
+       case <-finished:
+       }
+}
+
+func (s *ServerRequiredSuite) TestPutAskGet(c *C) {
+       kc, logbuf := runProxy(c, false, false, nil)
        defer closeListener()
 
        hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
@@ -191,14 +356,14 @@ func (s *ServerRequiredSuite) TestPutAskGet(c *C) {
        {
                _, _, err := kc.Ask(hash)
                c.Check(err, Equals, keepclient.BlockNotFound)
-               log.Print("Finished Ask (expected BlockNotFound)")
+               c.Log("Finished Ask (expected BlockNotFound)")
        }
 
        {
                reader, _, _, err := kc.Get(hash)
                c.Check(reader, Equals, nil)
                c.Check(err, Equals, keepclient.BlockNotFound)
-               log.Print("Finished Get (expected BlockNotFound)")
+               c.Log("Finished Get (expected BlockNotFound)")
        }
 
        // Note in bug #5309 among other errors keepproxy would set
@@ -217,23 +382,31 @@ func (s *ServerRequiredSuite) TestPutAskGet(c *C) {
                c.Check(hash2, Matches, fmt.Sprintf(`^%s\+3(\+.+)?$`, hash))
                c.Check(rep, Equals, 2)
                c.Check(err, Equals, nil)
-               log.Print("Finished PutB (expected success)")
+               c.Log("Finished PutB (expected success)")
+
+               c.Check(logbuf.String(), Matches, `(?ms).*msg="Block upload" locator=acbd18db4cc2f85cedef654fccc4a4d8\+3 user_full_name="TestCase Administrator" user_uuid=zzzzz-tpzed-d9tiejq69daie8f.*`)
+               logbuf.Reset()
        }
 
        {
                blocklen, _, err := kc.Ask(hash2)
                c.Assert(err, Equals, nil)
                c.Check(blocklen, Equals, int64(3))
-               log.Print("Finished Ask (expected success)")
+               c.Log("Finished Ask (expected success)")
+               c.Check(logbuf.String(), Matches, `(?ms).*msg="Block download" locator=acbd18db4cc2f85cedef654fccc4a4d8\+3 user_full_name="TestCase Administrator" user_uuid=zzzzz-tpzed-d9tiejq69daie8f.*`)
+               logbuf.Reset()
        }
 
        {
                reader, blocklen, _, err := kc.Get(hash2)
                c.Assert(err, Equals, nil)
                all, err := ioutil.ReadAll(reader)
+               c.Check(err, IsNil)
                c.Check(all, DeepEquals, []byte("foo"))
                c.Check(blocklen, Equals, int64(3))
-               log.Print("Finished Get (expected success)")
+               c.Log("Finished Get (expected success)")
+               c.Check(logbuf.String(), Matches, `(?ms).*msg="Block download" locator=acbd18db4cc2f85cedef654fccc4a4d8\+3 user_full_name="TestCase Administrator" user_uuid=zzzzz-tpzed-d9tiejq69daie8f.*`)
+               logbuf.Reset()
        }
 
        {
@@ -243,148 +416,161 @@ func (s *ServerRequiredSuite) TestPutAskGet(c *C) {
                c.Check(hash2, Matches, `^d41d8cd98f00b204e9800998ecf8427e\+0(\+.+)?$`)
                c.Check(rep, Equals, 2)
                c.Check(err, Equals, nil)
-               log.Print("Finished PutB zero block")
+               c.Log("Finished PutB zero block")
        }
 
        {
                reader, blocklen, _, err := kc.Get("d41d8cd98f00b204e9800998ecf8427e")
                c.Assert(err, Equals, nil)
                all, err := ioutil.ReadAll(reader)
+               c.Check(err, IsNil)
                c.Check(all, DeepEquals, []byte(""))
                c.Check(blocklen, Equals, int64(0))
-               log.Print("Finished Get zero block")
+               c.Log("Finished Get zero block")
        }
-
-       log.Print("TestPutAndGet done")
 }
 
 func (s *ServerRequiredSuite) TestPutAskGetForbidden(c *C) {
-       log.Print("TestPutAskGetForbidden start")
-
-       kc := runProxy(c, []string{"keepproxy"}, 29951, true)
-       waitForListener()
+       kc, _ := runProxy(c, true, false, nil)
        defer closeListener()
 
-       hash := fmt.Sprintf("%x", md5.Sum([]byte("bar")))
-
-       {
-               _, _, err := kc.Ask(hash)
-               errNotFound, _ := err.(keepclient.ErrNotFound)
-               c.Check(errNotFound, NotNil)
-               c.Assert(strings.Contains(err.Error(), "HTTP 403"), Equals, true)
-               log.Print("Ask 1")
-       }
-
-       {
-               hash2, rep, err := kc.PutB([]byte("bar"))
-               c.Check(hash2, Equals, "")
-               c.Check(rep, Equals, 0)
-               c.Check(err, Equals, keepclient.InsufficientReplicasError)
-               log.Print("PutB")
-       }
+       hash := fmt.Sprintf("%x+3", md5.Sum([]byte("bar")))
 
-       {
-               blocklen, _, err := kc.Ask(hash)
-               errNotFound, _ := err.(keepclient.ErrNotFound)
-               c.Check(errNotFound, NotNil)
-               c.Assert(strings.Contains(err.Error(), "HTTP 403"), Equals, true)
-               c.Check(blocklen, Equals, int64(0))
-               log.Print("Ask 2")
-       }
-
-       {
-               _, blocklen, _, err := kc.Get(hash)
-               errNotFound, _ := err.(keepclient.ErrNotFound)
-               c.Check(errNotFound, NotNil)
-               c.Assert(strings.Contains(err.Error(), "HTTP 403"), Equals, true)
-               c.Check(blocklen, Equals, int64(0))
-               log.Print("Get")
-       }
-
-       log.Print("TestPutAskGetForbidden done")
+       _, _, err := kc.Ask(hash)
+       c.Check(err, FitsTypeOf, &keepclient.ErrNotFound{})
+
+       hash2, rep, err := kc.PutB([]byte("bar"))
+       c.Check(hash2, Equals, "")
+       c.Check(rep, Equals, 0)
+       c.Check(err, FitsTypeOf, keepclient.InsufficientReplicasError(errors.New("")))
+
+       blocklen, _, err := kc.Ask(hash)
+       c.Check(err, FitsTypeOf, &keepclient.ErrNotFound{})
+       c.Check(err, ErrorMatches, ".*HTTP 403.*")
+       c.Check(blocklen, Equals, int64(0))
+
+       _, blocklen, _, err = kc.Get(hash)
+       c.Check(err, FitsTypeOf, &keepclient.ErrNotFound{})
+       c.Check(err, ErrorMatches, ".*HTTP 403.*")
+       c.Check(blocklen, Equals, int64(0))
 }
 
-func (s *ServerRequiredSuite) TestGetDisabled(c *C) {
-       log.Print("TestGetDisabled start")
+func testPermission(c *C, admin bool, perm arvados.UploadDownloadPermission) {
+       kp := arvados.UploadDownloadRolePermissions{}
+       if admin {
+               kp.Admin = perm
+               kp.User = arvados.UploadDownloadPermission{Upload: true, Download: true}
+       } else {
+               kp.Admin = arvados.UploadDownloadPermission{Upload: true, Download: true}
+               kp.User = perm
+       }
 
-       kc := runProxy(c, []string{"keepproxy", "-no-get"}, 29952, false)
-       waitForListener()
+       kc, logbuf := runProxy(c, false, false, &kp)
        defer closeListener()
+       if admin {
+               kc.Arvados.ApiToken = arvadostest.AdminToken
+       } else {
+               kc.Arvados.ApiToken = arvadostest.ActiveToken
+       }
 
-       hash := fmt.Sprintf("%x", md5.Sum([]byte("baz")))
+       hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+       var hash2 string
 
        {
-               _, _, err := kc.Ask(hash)
-               errNotFound, _ := err.(keepclient.ErrNotFound)
-               c.Check(errNotFound, NotNil)
-               c.Assert(strings.Contains(err.Error(), "HTTP 400"), Equals, true)
-               log.Print("Ask 1")
-       }
+               var rep int
+               var err error
+               hash2, rep, err = kc.PutB([]byte("foo"))
 
-       {
-               hash2, rep, err := kc.PutB([]byte("baz"))
-               c.Check(hash2, Matches, fmt.Sprintf(`^%s\+3(\+.+)?$`, hash))
-               c.Check(rep, Equals, 2)
-               c.Check(err, Equals, nil)
-               log.Print("PutB")
-       }
+               if perm.Upload {
+                       c.Check(hash2, Matches, fmt.Sprintf(`^%s\+3(\+.+)?$`, hash))
+                       c.Check(rep, Equals, 2)
+                       c.Check(err, Equals, nil)
+                       c.Log("Finished PutB (expected success)")
+                       if admin {
+                               c.Check(logbuf.String(), Matches, `(?ms).*msg="Block upload" locator=acbd18db4cc2f85cedef654fccc4a4d8\+3 user_full_name="TestCase Administrator" user_uuid=zzzzz-tpzed-d9tiejq69daie8f.*`)
+                       } else {
 
-       {
-               blocklen, _, err := kc.Ask(hash)
-               errNotFound, _ := err.(keepclient.ErrNotFound)
-               c.Check(errNotFound, NotNil)
-               c.Assert(strings.Contains(err.Error(), "HTTP 400"), Equals, true)
-               c.Check(blocklen, Equals, int64(0))
-               log.Print("Ask 2")
+                               c.Check(logbuf.String(), Matches, `(?ms).*msg="Block upload" locator=acbd18db4cc2f85cedef654fccc4a4d8\+3 user_full_name="Active User" user_uuid=zzzzz-tpzed-xurymjxw79nv3jz.*`)
+                       }
+               } else {
+                       c.Check(hash2, Equals, "")
+                       c.Check(rep, Equals, 0)
+                       c.Check(err, FitsTypeOf, keepclient.InsufficientReplicasError(errors.New("")))
+               }
+               logbuf.Reset()
        }
+       if perm.Upload {
+               // can't test download without upload.
 
-       {
-               _, blocklen, _, err := kc.Get(hash)
-               errNotFound, _ := err.(keepclient.ErrNotFound)
-               c.Check(errNotFound, NotNil)
-               c.Assert(strings.Contains(err.Error(), "HTTP 400"), Equals, true)
-               c.Check(blocklen, Equals, int64(0))
-               log.Print("Get")
+               reader, blocklen, _, err := kc.Get(hash2)
+               if perm.Download {
+                       c.Assert(err, Equals, nil)
+                       all, err := ioutil.ReadAll(reader)
+                       c.Check(err, IsNil)
+                       c.Check(all, DeepEquals, []byte("foo"))
+                       c.Check(blocklen, Equals, int64(3))
+                       c.Log("Finished Get (expected success)")
+                       if admin {
+                               c.Check(logbuf.String(), Matches, `(?ms).*msg="Block download" locator=acbd18db4cc2f85cedef654fccc4a4d8\+3 user_full_name="TestCase Administrator" user_uuid=zzzzz-tpzed-d9tiejq69daie8f.*`)
+                       } else {
+                               c.Check(logbuf.String(), Matches, `(?ms).*msg="Block download" locator=acbd18db4cc2f85cedef654fccc4a4d8\+3 user_full_name="Active User" user_uuid=zzzzz-tpzed-xurymjxw79nv3jz.*`)
+                       }
+               } else {
+                       c.Check(err, FitsTypeOf, &keepclient.ErrNotFound{})
+                       c.Check(err, ErrorMatches, ".*Missing or invalid Authorization header, or method not allowed.*")
+                       c.Check(blocklen, Equals, int64(0))
+               }
+               logbuf.Reset()
        }
 
-       log.Print("TestGetDisabled done")
 }
 
-func (s *ServerRequiredSuite) TestPutDisabled(c *C) {
-       log.Print("TestPutDisabled start")
-
-       kc := runProxy(c, []string{"keepproxy", "-no-put"}, 29953, false)
-       waitForListener()
-       defer closeListener()
-
-       {
-               hash2, rep, err := kc.PutB([]byte("quux"))
-               c.Check(hash2, Equals, "")
-               c.Check(rep, Equals, 0)
-               c.Check(err, Equals, keepclient.InsufficientReplicasError)
-               log.Print("PutB")
+func (s *ServerRequiredSuite) TestPutGetPermission(c *C) {
+
+       for _, adminperm := range []bool{true, false} {
+               for _, userperm := range []bool{true, false} {
+
+                       testPermission(c, true,
+                               arvados.UploadDownloadPermission{
+                                       Upload:   adminperm,
+                                       Download: true,
+                               })
+                       testPermission(c, true,
+                               arvados.UploadDownloadPermission{
+                                       Upload:   true,
+                                       Download: adminperm,
+                               })
+                       testPermission(c, false,
+                               arvados.UploadDownloadPermission{
+                                       Upload:   true,
+                                       Download: userperm,
+                               })
+                       testPermission(c, false,
+                               arvados.UploadDownloadPermission{
+                                       Upload:   true,
+                                       Download: userperm,
+                               })
+               }
        }
-
-       log.Print("TestPutDisabled done")
 }
 
 func (s *ServerRequiredSuite) TestCorsHeaders(c *C) {
-       runProxy(c, []string{"keepproxy"}, 29954, false)
-       waitForListener()
+       runProxy(c, false, false, nil)
        defer closeListener()
 
        {
                client := http.Client{}
                req, err := http.NewRequest("OPTIONS",
-                       fmt.Sprintf("http://localhost:29954/%x+3",
-                               md5.Sum([]byte("foo"))),
+                       fmt.Sprintf("http://%s/%x+3", listener.Addr().String(), md5.Sum([]byte("foo"))),
                        nil)
+               c.Assert(err, IsNil)
                req.Header.Add("Access-Control-Request-Method", "PUT")
                req.Header.Add("Access-Control-Request-Headers", "Authorization, X-Keep-Desired-Replicas")
                resp, err := client.Do(req)
                c.Check(err, Equals, nil)
                c.Check(resp.StatusCode, Equals, 200)
                body, err := ioutil.ReadAll(resp.Body)
+               c.Check(err, IsNil)
                c.Check(string(body), Equals, "")
                c.Check(resp.Header.Get("Access-Control-Allow-Methods"), Equals, "GET, HEAD, POST, PUT, OPTIONS")
                c.Check(resp.Header.Get("Access-Control-Allow-Origin"), Equals, "*")
@@ -392,8 +578,7 @@ func (s *ServerRequiredSuite) TestCorsHeaders(c *C) {
 
        {
                resp, err := http.Get(
-                       fmt.Sprintf("http://localhost:29954/%x+3",
-                               md5.Sum([]byte("foo"))))
+                       fmt.Sprintf("http://%s/%x+3", listener.Addr().String(), md5.Sum([]byte("foo"))))
                c.Check(err, Equals, nil)
                c.Check(resp.Header.Get("Access-Control-Allow-Headers"), Equals, "Authorization, Content-Length, Content-Type, X-Keep-Desired-Replicas")
                c.Check(resp.Header.Get("Access-Control-Allow-Origin"), Equals, "*")
@@ -401,16 +586,16 @@ func (s *ServerRequiredSuite) TestCorsHeaders(c *C) {
 }
 
 func (s *ServerRequiredSuite) TestPostWithoutHash(c *C) {
-       runProxy(c, []string{"keepproxy"}, 29955, false)
-       waitForListener()
+       runProxy(c, false, false, nil)
        defer closeListener()
 
        {
                client := http.Client{}
                req, err := http.NewRequest("POST",
-                       "http://localhost:29955/",
+                       "http://"+listener.Addr().String()+"/",
                        strings.NewReader("qux"))
-               req.Header.Add("Authorization", "OAuth2 4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h")
+               c.Check(err, IsNil)
+               req.Header.Add("Authorization", "OAuth2 "+arvadostest.ActiveToken)
                req.Header.Add("Content-Type", "application/octet-stream")
                resp, err := client.Do(req)
                c.Check(err, Equals, nil)
@@ -444,8 +629,22 @@ func (s *ServerRequiredSuite) TestStripHint(c *C) {
 //   With a valid but non-existing prefix (expect "\n")
 //   With an invalid prefix (expect error)
 func (s *ServerRequiredSuite) TestGetIndex(c *C) {
-       kc := runProxy(c, []string{"keepproxy"}, 28852, false)
-       waitForListener()
+       getIndexWorker(c, false)
+}
+
+// Test GetIndex
+//   Uses config.yml
+//   Put one block, with 2 replicas
+//   With no prefix (expect the block locator, twice)
+//   With an existing prefix (expect the block locator, twice)
+//   With a valid but non-existing prefix (expect "\n")
+//   With an invalid prefix (expect error)
+func (s *ServerRequiredConfigYmlSuite) TestGetIndex(c *C) {
+       getIndexWorker(c, true)
+}
+
+func getIndexWorker(c *C, useConfig bool) {
+       kc, _ := runProxy(c, false, useConfig, nil)
        defer closeListener()
 
        // Put "index-data" blocks
@@ -458,14 +657,17 @@ func (s *ServerRequiredSuite) TestGetIndex(c *C) {
        c.Check(err, Equals, nil)
 
        reader, blocklen, _, err := kc.Get(hash)
-       c.Assert(err, Equals, nil)
+       c.Assert(err, IsNil)
        c.Check(blocklen, Equals, int64(10))
        all, err := ioutil.ReadAll(reader)
+       c.Assert(err, IsNil)
        c.Check(all, DeepEquals, data)
 
        // Put some more blocks
-       _, rep, err = kc.PutB([]byte("some-more-index-data"))
-       c.Check(err, Equals, nil)
+       _, _, err = kc.PutB([]byte("some-more-index-data"))
+       c.Check(err, IsNil)
+
+       kc.Arvados.ApiToken = arvadostest.SystemRootToken
 
        // Invoke GetIndex
        for _, spec := range []struct {
@@ -477,7 +679,7 @@ func (s *ServerRequiredSuite) TestGetIndex(c *C) {
                {hash[:3], true, false},  // with matching prefix
                {"abcdef", false, false}, // with no such prefix
        } {
-               indexReader, err := kc.GetIndex("proxy", spec.prefix)
+               indexReader, err := kc.GetIndex(TestProxyUUID, spec.prefix)
                c.Assert(err, Equals, nil)
                indexResp, err := ioutil.ReadAll(indexReader)
                c.Assert(err, Equals, nil)
@@ -500,87 +702,124 @@ func (s *ServerRequiredSuite) TestGetIndex(c *C) {
        }
 
        // GetIndex with invalid prefix
-       _, err = kc.GetIndex("proxy", "xyz")
+       _, err = kc.GetIndex(TestProxyUUID, "xyz")
        c.Assert((err != nil), Equals, true)
 }
 
+func (s *ServerRequiredSuite) TestCollectionSharingToken(c *C) {
+       kc, _ := runProxy(c, false, false, nil)
+       defer closeListener()
+       locator, _, putErr := kc.PutB([]byte("shareddata"))
+       c.Check(putErr, IsNil)
+       kc.Arvados.ApiToken = arvadostest.FooCollectionSharingToken // switch tokens before reading back
+       blockReader, _, _, getErr := kc.Get(locator)
+       c.Assert(getErr, IsNil)
+       content, readErr := ioutil.ReadAll(blockReader)
+       c.Check(readErr, IsNil)
+       c.Check(content, DeepEquals, []byte("shareddata")) // same bytes that were written above
+}
+
+// TestPutAskGetInvalidToken checks that writes with an unknown or
+// expired token are rejected with a 403.
 func (s *ServerRequiredSuite) TestPutAskGetInvalidToken(c *C) {
-       kc := runProxy(c, []string{"keepproxy"}, 28852, false)
-       waitForListener()
+       kc, _ := runProxy(c, false, false, nil)
        defer closeListener()
 
        // Put a test block
        hash, rep, err := kc.PutB([]byte("foo"))
-       c.Check(err, Equals, nil)
+       c.Check(err, IsNil)
        c.Check(rep, Equals, 2)
 
-       for _, token := range []string{
+       for _, badToken := range []string{
                "nosuchtoken",
                "2ym314ysp27sk7h943q6vtc378srb06se3pq6ghurylyf3pdmx", // expired
        } {
-               // Change token to given bad token
-               kc.Arvados.ApiToken = token
-
-               // Ask should result in error
-               _, _, err = kc.Ask(hash)
-               c.Check(err, NotNil)
-               errNotFound, _ := err.(keepclient.ErrNotFound)
-               c.Check(errNotFound.Temporary(), Equals, false)
-               c.Assert(strings.Contains(err.Error(), "HTTP 403"), Equals, true)
+               kc.Arvados.ApiToken = badToken
+
+               // Disabled below: Ask and Get fail only if the upstream
+               // keepstore checks signatures. Without the blob signing
+               // key, keepproxy cannot tell if a token may read a block.
+               if false {
+                       _, _, err = kc.Ask(hash)
+                       c.Assert(err, FitsTypeOf, &keepclient.ErrNotFound{})
+                       c.Check(err.(*keepclient.ErrNotFound).Temporary(), Equals, false)
+                       c.Check(err, ErrorMatches, ".*HTTP 403.*")
+
+                       _, _, _, err = kc.Get(hash)
+                       c.Assert(err, FitsTypeOf, &keepclient.ErrNotFound{})
+                       c.Check(err.(*keepclient.ErrNotFound).Temporary(), Equals, false)
+                       c.Check(err, ErrorMatches, ".*HTTP 403 \"Missing or invalid Authorization header, or method not allowed\".*")
+               }
 
-               // Get should result in error
-               _, _, _, err = kc.Get(hash)
-               c.Check(err, NotNil)
-               errNotFound, _ = err.(keepclient.ErrNotFound)
-               c.Check(errNotFound.Temporary(), Equals, false)
-               c.Assert(strings.Contains(err.Error(), "HTTP 403 \"Missing or invalid Authorization header\""), Equals, true)
+               _, _, err = kc.PutB([]byte("foo"))
+               c.Check(err, ErrorMatches, ".*403.*Missing or invalid Authorization header, or method not allowed")
        }
 }
 
+// TestAskGetKeepProxyConnectionError checks that Ask and Get return a
+// temporary "HTTP 502" ErrNotFound when keepproxy cannot reach keepstore.
 func (s *ServerRequiredSuite) TestAskGetKeepProxyConnectionError(c *C) {
-       arv, err := arvadosclient.MakeArvadosClient()
-       c.Assert(err, Equals, nil)
+       kc, _ := runProxy(c, false, false, nil)
+       defer closeListener()
 
-       // keepclient with no such keep server
-       kc := keepclient.New(&arv)
+       // Point keepproxy at a non-existent keepstore
        locals := map[string]string{
-               "proxy": "http://localhost:12345",
+               TestProxyUUID: "http://localhost:12345",
        }
-       kc.SetServiceRoots(locals, nil, nil)
+       router.(*proxyHandler).KeepClient.SetServiceRoots(locals, nil, nil)
 
-       // Ask should result in temporary connection refused error
+       // Ask should result in temporary bad gateway error
        hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
-       _, _, err = kc.Ask(hash)
-       c.Check(err, NotNil)
-       errNotFound, _ := err.(*keepclient.ErrNotFound)
-       c.Check(errNotFound.Temporary(), Equals, true)
-       c.Assert(strings.Contains(err.Error(), "connection refused"), Equals, true)
+       _, _, err := kc.Ask(hash)
+       c.Assert(err, FitsTypeOf, &keepclient.ErrNotFound{}) // fails cleanly on nil or another error type
+       c.Check(err.(*keepclient.ErrNotFound).Temporary(), Equals, true)
+       c.Assert(err, ErrorMatches, ".*HTTP 502.*")
 
-       // Get should result in temporary connection refused error
+       // Get should result in temporary bad gateway error
        _, _, _, err = kc.Get(hash)
-       c.Check(err, NotNil)
-       errNotFound, _ = err.(*keepclient.ErrNotFound)
-       c.Check(errNotFound.Temporary(), Equals, true)
-       c.Assert(strings.Contains(err.Error(), "connection refused"), Equals, true)
+       c.Assert(err, FitsTypeOf, &keepclient.ErrNotFound{}) // fails cleanly on nil or another error type
+       c.Check(err.(*keepclient.ErrNotFound).Temporary(), Equals, true)
+       c.Assert(err, ErrorMatches, ".*HTTP 502.*")
 }
 
+// TestAskGetNoKeepServerError expects a temporary HTTP 502 error from both Ask and Get.
 func (s *NoKeepServerSuite) TestAskGetNoKeepServerError(c *C) {
-       kc := runProxy(c, []string{"keepproxy"}, 29999, false)
-       waitForListener()
+       kc, _ := runProxy(c, false, false, nil)
        defer closeListener()
 
-       // Ask should result in temporary connection refused error
        hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
-       _, _, err := kc.Ask(hash)
-       c.Check(err, NotNil)
-       errNotFound, _ := err.(*keepclient.ErrNotFound)
-       c.Check(errNotFound.Temporary(), Equals, true)
-       c.Assert(strings.Contains(err.Error(), "HTTP 502"), Equals, true)
+       for _, f := range []func() error{
+               func() error {
+                       _, _, err := kc.Ask(hash)
+                       return err
+               },
+               func() error {
+                       _, _, _, err := kc.Get(hash)
+                       return err
+               },
+       } {
+               err := f()
+               c.Assert(err, FitsTypeOf, &keepclient.ErrNotFound{}) // fails cleanly on nil or another error type
+               c.Check(err.(*keepclient.ErrNotFound).Temporary(), Equals, true)
+               c.Check(err, ErrorMatches, `.*HTTP 502.*`)
+       }
+}
 
-       // Get should result in temporary connection refused error
-       _, _, _, err = kc.Get(hash)
-       c.Check(err, NotNil)
-       errNotFound, _ = err.(*keepclient.ErrNotFound)
-       c.Check(errNotFound.Temporary(), Equals, true)
-       c.Assert(strings.Contains(err.Error(), "HTTP 502"), Equals, true)
+// TestPing checks that a router built by MakeRESTRouter answers
+// GET /_health/ping with 200 and {"health":"OK"} for the management token.
+func (s *ServerRequiredSuite) TestPing(c *C) {
+       kc, _ := runProxy(c, false, false, nil)
+       defer closeListener()
+
+       cluster := &arvados.Cluster{ManagementToken: arvadostest.ManagementToken}
+       healthRouter, err := MakeRESTRouter(kc, 10*time.Second, cluster, log.New())
+       c.Assert(err, check.IsNil)
+
+       req, err := http.NewRequest("GET", "http://"+listener.Addr().String()+"/_health/ping", nil)
+       c.Assert(err, IsNil)
+       req.Header.Set("Authorization", "Bearer "+arvadostest.ManagementToken)
+       recorder := httptest.NewRecorder()
+       healthRouter.ServeHTTP(recorder, req)
+       c.Check(recorder.Code, Equals, 200)
+       c.Assert(recorder.Body.String(), Matches, `{"health":"OK"}\n?`)
 }