16665: Fixes tests.
[arvados.git] / sdk / go / keepclient / keepclient_test.go
index 2604b02b17aaeb412b2519e4c09a69264fa8d340..62268fa463e6dee07083a815e9b97536e83a5c40 100644 (file)
@@ -6,8 +6,8 @@ package keepclient
 
 import (
        "bytes"
+       "context"
        "crypto/md5"
-       "errors"
        "fmt"
        "io"
        "io/ioutil"
@@ -16,12 +16,15 @@ import (
        "net/http"
        "os"
        "strings"
+       "sync"
        "testing"
        "time"
 
+       "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/arvadosclient"
        "git.arvados.org/arvados.git/sdk/go/arvadostest"
        . "gopkg.in/check.v1"
+       check "gopkg.in/check.v1"
 )
 
 // Gocheck boilerplate
@@ -95,21 +98,33 @@ func (s *ServerRequiredSuite) TestDefaultReplications(c *C) {
 }
 
 type StubPutHandler struct {
-       c                  *C
-       expectPath         string
-       expectApiToken     string
-       expectBody         string
-       expectStorageClass string
-       handled            chan string
+       c                    *C // gocheck context used for the assertions made in ServeHTTP
+       expectPath           string // expected request path, without the leading "/"
+       expectAPIToken       string // token expected in the "Authorization: OAuth2 <token>" header
+       expectBody           string // expected request body
+       expectStorageClass   string // expected X-Keep-Storage-Classes request header; "*" skips that check
+       returnStorageClasses string // sent back as X-Keep-Storage-Classes-Confirmed when non-empty
+       handled              chan string // receives "http://<host>" once per handled request
+       requests             []*http.Request // every request seen by ServeHTTP, in arrival order
+       mtx                  sync.Mutex // held by ServeHTTP while appending to requests
 }
 
-func (sph StubPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+func (sph *StubPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { // pointer receiver: appends to sph.requests persist and sph.mtx is not copied
+       sph.mtx.Lock() // record the request before any assertion runs
+       sph.requests = append(sph.requests, req)
+       sph.mtx.Unlock()
        sph.c.Check(req.URL.Path, Equals, "/"+sph.expectPath)
-       sph.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", sph.expectApiToken))
-       sph.c.Check(req.Header.Get("X-Keep-Storage-Classes"), Equals, sph.expectStorageClass)
+       sph.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", sph.expectAPIToken))
+       if sph.expectStorageClass != "*" { // "*" means: do not check the X-Keep-Storage-Classes request header
+               sph.c.Check(req.Header.Get("X-Keep-Storage-Classes"), Equals, sph.expectStorageClass)
+       }
        body, err := ioutil.ReadAll(req.Body)
        sph.c.Check(err, Equals, nil)
        sph.c.Check(body, DeepEquals, []byte(sph.expectBody))
+       resp.Header().Set("X-Keep-Replicas-Stored", "1") // every stubbed write reports one replica stored
+       if sph.returnStorageClasses != "" { // the confirmation header is omitted entirely when unset
+               resp.Header().Set("X-Keep-Storage-Classes-Confirmed", sph.returnStorageClasses)
+       }
        resp.WriteHeader(200)
        sph.handled <- fmt.Sprintf("http://%s", req.Host)
 }
@@ -139,56 +154,162 @@ func UploadToStubHelper(c *C, st http.Handler, f func(*KeepClient, string,
        kc, _ := MakeKeepClient(arv)
 
        reader, writer := io.Pipe()
-       upload_status := make(chan uploadStatus)
+       uploadStatusChan := make(chan uploadStatus)
 
-       f(kc, ks.url, reader, writer, upload_status)
+       f(kc, ks.url, reader, writer, uploadStatusChan)
 }
 
 func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
        log.Printf("TestUploadToStubKeepServer")
 
-       st := StubPutHandler{
-               c,
-               "acbd18db4cc2f85cedef654fccc4a4d8",
-               "abc123",
-               "foo",
-               "hot",
-               make(chan string)}
+       st := &StubPutHandler{ // pointer, because ServeHTTP has a pointer receiver
+               c:                    c,
+               expectPath:           "acbd18db4cc2f85cedef654fccc4a4d8",
+               expectAPIToken:       "abc123",
+               expectBody:           "foo",
+               expectStorageClass:   "", // no X-Keep-Storage-Classes request header is expected
+               returnStorageClasses: "default=1", // stub confirms one replica in class "default"
+               handled:              make(chan string),
+       }
 
        UploadToStubHelper(c, st,
-               func(kc *KeepClient, url string, reader io.ReadCloser, writer io.WriteCloser, upload_status chan uploadStatus) {
-                       kc.StorageClasses = []string{"hot"}
-                       go kc.uploadToKeepServer(url, st.expectPath, reader, upload_status, int64(len("foo")), kc.getRequestID())
+               func(kc *KeepClient, url string, reader io.ReadCloser, writer io.WriteCloser, uploadStatusChan chan uploadStatus) {
+                       go kc.uploadToKeepServer(url, st.expectPath, nil, reader, uploadStatusChan, len("foo"), kc.getRequestID()) // NOTE(review): the nil third argument is presumably the storage-class list — confirm against uploadToKeepServer
 
                        writer.Write([]byte("foo"))
                        writer.Close()
 
                        <-st.handled
-                       status := <-upload_status
-                       c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, ""})
+                       status := <-uploadStatusChan
+                       c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, map[string]int{"default": 1}, ""}) // "default=1" from the stub is expected back as map[string]int{"default": 1}
                })
 }
 
 func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
-       st := StubPutHandler{
-               c,
-               "acbd18db4cc2f85cedef654fccc4a4d8",
-               "abc123",
-               "foo",
-               "",
-               make(chan string)}
+       st := &StubPutHandler{ // pointer, because ServeHTTP has a pointer receiver
+               c:                    c,
+               expectPath:           "acbd18db4cc2f85cedef654fccc4a4d8",
+               expectAPIToken:       "abc123",
+               expectBody:           "foo",
+               expectStorageClass:   "", // no X-Keep-Storage-Classes request header is expected
+               returnStorageClasses: "default=1", // stub confirms one replica in class "default"
+               handled:              make(chan string),
+       }
 
        UploadToStubHelper(c, st,
-               func(kc *KeepClient, url string, _ io.ReadCloser, _ io.WriteCloser, upload_status chan uploadStatus) {
-                       go kc.uploadToKeepServer(url, st.expectPath, bytes.NewBuffer([]byte("foo")), upload_status, 3, kc.getRequestID())
+               func(kc *KeepClient, url string, _ io.ReadCloser, _ io.WriteCloser, uploadStatusChan chan uploadStatus) {
+                       go kc.uploadToKeepServer(url, st.expectPath, nil, bytes.NewBuffer([]byte("foo")), uploadStatusChan, 3, kc.getRequestID()) // body comes from an in-memory buffer rather than the helper's pipe
 
                        <-st.handled
 
-                       status := <-upload_status
-                       c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, ""})
+                       status := <-uploadStatusChan
+                       c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, map[string]int{"default": 1}, ""}) // "default=1" from the stub is expected back as map[string]int{"default": 1}
                })
 }
 
+func (s *StandaloneSuite) TestUploadWithStorageClasses(c *C) { // table-driven: each trial's confirmation header value must yield the expected map in uploadStatus
+       for _, trial := range []struct {
+               respHeader string // value the stub sends as X-Keep-Storage-Classes-Confirmed
+               expectMap  map[string]int // map expected in the resulting uploadStatus
+       }{
+               {"", nil}, // empty value: the stub omits the confirmation header altogether
+               {"foo=1", map[string]int{"foo": 1}},
+               {" foo=1 , bar=2 ", map[string]int{"foo": 1, "bar": 2}}, // whitespace around entries is expected to be ignored
+               {" =foo=1 ", nil}, // malformed entry: expected to yield a nil map
+               {"foo", nil}, // class name without "=count": expected to yield a nil map
+       } {
+               st := &StubPutHandler{
+                       c:                    c,
+                       expectPath:           "acbd18db4cc2f85cedef654fccc4a4d8",
+                       expectAPIToken:       "abc123",
+                       expectBody:           "foo",
+                       expectStorageClass:   "", // no X-Keep-Storage-Classes request header is expected
+                       returnStorageClasses: trial.respHeader,
+                       handled:              make(chan string),
+               }
+
+               UploadToStubHelper(c, st,
+                       func(kc *KeepClient, url string, reader io.ReadCloser, writer io.WriteCloser, uploadStatusChan chan uploadStatus) {
+                               go kc.uploadToKeepServer(url, st.expectPath, nil, reader, uploadStatusChan, len("foo"), kc.getRequestID()) // upload runs concurrently; the body is fed through the pipe below
+
+                               writer.Write([]byte("foo"))
+                               writer.Close()
+
+                               <-st.handled // wait until the stub has served the request
+                               status := <-uploadStatusChan
+                               c.Check(status, DeepEquals, uploadStatus{nil, fmt.Sprintf("%s/%s", url, st.expectPath), 200, 1, trial.expectMap, ""})
+                       })
+       }
+}
+
+func (s *StandaloneSuite) TestPutWithStorageClasses(c *C) { // table-driven: runs BlockWrite against nServers stub servers and checks success/failure and the number of requests made
+       nServers := 5 // number of fake keep servers started for each trial
+       for _, trial := range []struct {
+               replicas      int // assigned to kc.Want_replicas
+               clientClasses []string // assigned to kc.StorageClasses
+               putClasses    []string // putClasses takes precedence over clientClasses
+               minRequests   int // lower bound checked against len(st.handled)
+               maxRequests   int // upper bound checked against len(st.handled)
+               success       bool // whether BlockWrite is expected to return a nil error
+       }{
+               {1, []string{"class1"}, nil, 1, 1, true},
+               {2, []string{"class1"}, nil, 1, 2, true},
+               {3, []string{"class1"}, nil, 2, 3, true},
+               {1, []string{"class1", "class2"}, nil, 1, 1, true},
+               {3, nil, []string{"class1"}, 2, 3, true},
+               {1, nil, []string{"class1", "class2"}, 1, 1, true},
+               {1, []string{"class404"}, []string{"class1", "class2"}, 1, 1, true},
+               {1, []string{"class1"}, []string{"class404", "class2"}, nServers, nServers, false},
+               {nServers*2 + 1, []string{"class1"}, nil, nServers, nServers, false}, // 11 replicas wanted; presumably 5 servers confirming class1=2 each cannot satisfy that — TODO confirm
+               {1, []string{"class404"}, nil, nServers, nServers, false}, // the stub never confirms class404
+               {1, []string{"class1", "class404"}, nil, nServers, nServers, false},
+               {1, nil, []string{"class1", "class404"}, nServers, nServers, false},
+       } {
+               c.Logf("%+v", trial)
+               st := &StubPutHandler{
+                       c:                    c,
+                       expectPath:           "acbd18db4cc2f85cedef654fccc4a4d8",
+                       expectAPIToken:       "abc123",
+                       expectBody:           "foo",
+                       expectStorageClass:   "*", // "*" makes StubPutHandler.ServeHTTP skip its X-Keep-Storage-Classes check
+                       returnStorageClasses: "class1=2, class2=2", // every stub response confirms class1 and class2 only
+                       handled:              make(chan string, 100), // buffered and never drained here; len() below serves as the request count
+               }
+               ks := RunSomeFakeKeepServers(st, nServers)
+               arv, _ := arvadosclient.MakeArvadosClient()
+               kc, _ := MakeKeepClient(arv)
+               kc.Want_replicas = trial.replicas
+               kc.StorageClasses = trial.clientClasses
+               arv.ApiToken = "abc123"
+               localRoots := make(map[string]string)
+               writableLocalRoots := make(map[string]string)
+               for i, k := range ks {
+                       localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
+                       writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
+                       defer k.listener.Close() // NOTE(review): deferred inside the trial loop, so every trial's listeners stay open until the whole test function returns
+               }
+               kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
+
+               _, err := kc.BlockWrite(context.Background(), arvados.BlockWriteOptions{
+                       Data:           []byte("foo"),
+                       StorageClasses: trial.putClasses,
+               })
+               if trial.success {
+                       c.Check(err, check.IsNil)
+               } else {
+                       c.Check(err, check.NotNil)
+               }
+               c.Check(len(st.handled) >= trial.minRequests, check.Equals, true, check.Commentf("len(st.handled)==%d, trial.minRequests==%d", len(st.handled), trial.minRequests))
+               c.Check(len(st.handled) <= trial.maxRequests, check.Equals, true, check.Commentf("len(st.handled)==%d, trial.maxRequests==%d", len(st.handled), trial.maxRequests))
+               if !trial.success && trial.replicas == 1 && c.Check(len(st.requests) >= 2, check.Equals, true) { // NOTE(review): st.requests is read here without holding st.mtx
+                       // Max concurrency should be 1. First request
+                       // should have succeeded for class1. Second
+                       // request should only ask for class404.
+                       c.Check(st.requests[1].Header.Get("X-Keep-Storage-Classes"), check.Equals, "class404")
+               }
+       }
+}
+
 type FailHandler struct {
        handled chan string
 }
@@ -209,7 +330,7 @@ func (fh *FailThenSucceedHandler) ServeHTTP(resp http.ResponseWriter, req *http.
        fh.reqIDs = append(fh.reqIDs, req.Header.Get("X-Request-Id"))
        if fh.count == 0 {
                resp.WriteHeader(500)
-               fh.count += 1
+               fh.count++
                fh.handled <- fmt.Sprintf("http://%s", req.Host)
        } else {
                fh.successhandler.ServeHTTP(resp, req)
@@ -233,16 +354,16 @@ func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) {
 
        UploadToStubHelper(c, st,
                func(kc *KeepClient, url string, reader io.ReadCloser,
-                       writer io.WriteCloser, upload_status chan uploadStatus) {
+                       writer io.WriteCloser, uploadStatusChan chan uploadStatus) {
 
-                       go kc.uploadToKeepServer(url, hash, reader, upload_status, 3, kc.getRequestID())
+                       go kc.uploadToKeepServer(url, hash, nil, reader, uploadStatusChan, 3, kc.getRequestID())
 
                        writer.Write([]byte("foo"))
                        writer.Close()
 
                        <-st.handled
 
-                       status := <-upload_status
+                       status := <-uploadStatusChan
                        c.Check(status.url, Equals, fmt.Sprintf("%s/%s", url, hash))
                        c.Check(status.statusCode, Equals, 500)
                })
@@ -256,7 +377,7 @@ type KeepServer struct {
 func RunSomeFakeKeepServers(st http.Handler, n int) (ks []KeepServer) {
        ks = make([]KeepServer, n)
 
-       for i := 0; i < n; i += 1 {
+       for i := 0; i < n; i++ {
                ks[i] = RunFakeKeepServer(st)
        }
 
@@ -266,13 +387,15 @@ func RunSomeFakeKeepServers(st http.Handler, n int) (ks []KeepServer) {
 func (s *StandaloneSuite) TestPutB(c *C) {
        hash := Md5String("foo")
 
-       st := StubPutHandler{
-               c,
-               hash,
-               "abc123",
-               "foo",
-               "",
-               make(chan string, 5)}
+       st := &StubPutHandler{
+               c:                    c,
+               expectPath:           hash,
+               expectAPIToken:       "abc123",
+               expectBody:           "foo",
+               expectStorageClass:   "",
+               returnStorageClasses: "",
+               handled:              make(chan string, 5),
+       }
 
        arv, _ := arvadosclient.MakeArvadosClient()
        kc, _ := MakeKeepClient(arv)
@@ -308,13 +431,15 @@ func (s *StandaloneSuite) TestPutB(c *C) {
 func (s *StandaloneSuite) TestPutHR(c *C) {
        hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
 
-       st := StubPutHandler{
-               c,
-               hash,
-               "abc123",
-               "foo",
-               "",
-               make(chan string, 5)}
+       st := &StubPutHandler{
+               c:                    c,
+               expectPath:           hash,
+               expectAPIToken:       "abc123",
+               expectBody:           "foo",
+               expectStorageClass:   "",
+               returnStorageClasses: "",
+               handled:              make(chan string, 5),
+       }
 
        arv, _ := arvadosclient.MakeArvadosClient()
        kc, _ := MakeKeepClient(arv)
@@ -357,13 +482,15 @@ func (s *StandaloneSuite) TestPutHR(c *C) {
 func (s *StandaloneSuite) TestPutWithFail(c *C) {
        hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
 
-       st := StubPutHandler{
-               c,
-               hash,
-               "abc123",
-               "foo",
-               "",
-               make(chan string, 4)}
+       st := &StubPutHandler{
+               c:                    c,
+               expectPath:           hash,
+               expectAPIToken:       "abc123",
+               expectBody:           "foo",
+               expectStorageClass:   "",
+               returnStorageClasses: "",
+               handled:              make(chan string, 4),
+       }
 
        fh := FailHandler{
                make(chan string, 1)}
@@ -417,13 +544,15 @@ func (s *StandaloneSuite) TestPutWithFail(c *C) {
 func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) {
        hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
 
-       st := StubPutHandler{
-               c,
-               hash,
-               "abc123",
-               "foo",
-               "",
-               make(chan string, 1)}
+       st := &StubPutHandler{
+               c:                    c,
+               expectPath:           hash,
+               expectAPIToken:       "abc123",
+               expectBody:           "foo",
+               expectStorageClass:   "",
+               returnStorageClasses: "",
+               handled:              make(chan string, 1),
+       }
 
        fh := FailHandler{
                make(chan string, 4)}
@@ -456,7 +585,7 @@ func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) {
 
        _, replicas, err := kc.PutB([]byte("foo"))
 
-       c.Check(err, FitsTypeOf, InsufficientReplicasError(errors.New("")))
+       c.Check(err, FitsTypeOf, InsufficientReplicasError{})
        c.Check(replicas, Equals, 1)
        c.Check(<-st.handled, Equals, ks1[0].url)
 }
@@ -464,14 +593,14 @@ func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) {
 type StubGetHandler struct {
        c              *C
        expectPath     string
-       expectApiToken string
+       expectAPIToken string // token expected in the request's "Authorization: OAuth2 <token>" header
        httpStatus     int
        body           []byte
 }
 
 func (sgh StubGetHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
        sgh.c.Check(req.URL.Path, Equals, "/"+sgh.expectPath)
-       sgh.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", sgh.expectApiToken))
+       sgh.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", sgh.expectAPIToken))
        resp.WriteHeader(sgh.httpStatus)
        resp.Header().Set("Content-Length", fmt.Sprintf("%d", len(sgh.body)))
        resp.Write(sgh.body)
@@ -770,9 +899,9 @@ type BarHandler struct {
        handled chan string
 }
 
-func (this BarHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+func (h BarHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { // answers every request with the body "bar"
        resp.Write([]byte("bar"))
-       this.handled <- fmt.Sprintf("http://%s", req.Host)
+       h.handled <- fmt.Sprintf("http://%s", req.Host) // report which host served the request
 }
 
 func (s *StandaloneSuite) TestChecksum(c *C) {
@@ -860,9 +989,9 @@ func (s *StandaloneSuite) TestGetWithFailures(c *C) {
        c.Check(n, Equals, int64(3))
        c.Check(url2, Equals, fmt.Sprintf("%s/%s", ks1[0].url, hash))
 
-       read_content, err2 := ioutil.ReadAll(r)
+       readContent, err2 := ioutil.ReadAll(r)
        c.Check(err2, Equals, nil)
-       c.Check(read_content, DeepEquals, content)
+       c.Check(readContent, DeepEquals, content)
 }
 
 func (s *ServerRequiredSuite) TestPutGetHead(c *C) {
@@ -892,9 +1021,9 @@ func (s *ServerRequiredSuite) TestPutGetHead(c *C) {
                c.Check(n, Equals, int64(len(content)))
                c.Check(url2, Matches, fmt.Sprintf("http://localhost:\\d+/%s", hash))
 
-               read_content, err2 := ioutil.ReadAll(r)
+               readContent, err2 := ioutil.ReadAll(r)
                c.Check(err2, Equals, nil)
-               c.Check(read_content, DeepEquals, content)
+               c.Check(readContent, DeepEquals, content)
        }
        {
                n, url2, err := kc.Ask(hash)
@@ -921,9 +1050,9 @@ type StubProxyHandler struct {
        handled chan string
 }
 
-func (this StubProxyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+func (h StubProxyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { // reports two stored replicas via X-Keep-Replicas-Stored; writes no body itself
        resp.Header().Set("X-Keep-Replicas-Stored", "2")
-       this.handled <- fmt.Sprintf("http://%s", req.Host)
+       h.handled <- fmt.Sprintf("http://%s", req.Host) // report which host served the request
 }
 
 func (s *StandaloneSuite) TestPutProxy(c *C) {
@@ -979,7 +1108,7 @@ func (s *StandaloneSuite) TestPutProxyInsufficientReplicas(c *C) {
        _, replicas, err := kc.PutB([]byte("foo"))
        <-st.handled
 
-       c.Check(err, FitsTypeOf, InsufficientReplicasError(errors.New("")))
+       c.Check(err, FitsTypeOf, InsufficientReplicasError{})
        c.Check(replicas, Equals, 2)
 }
 
@@ -1025,13 +1154,15 @@ func (s *StandaloneSuite) TestMakeLocatorInvalidInput(c *C) {
 func (s *StandaloneSuite) TestPutBWant2ReplicasWithOnlyOneWritableLocalRoot(c *C) {
        hash := Md5String("foo")
 
-       st := StubPutHandler{
-               c,
-               hash,
-               "abc123",
-               "foo",
-               "",
-               make(chan string, 5)}
+       st := &StubPutHandler{
+               c:                    c,
+               expectPath:           hash,
+               expectAPIToken:       "abc123",
+               expectBody:           "foo",
+               expectStorageClass:   "",
+               returnStorageClasses: "",
+               handled:              make(chan string, 5),
+       }
 
        arv, _ := arvadosclient.MakeArvadosClient()
        kc, _ := MakeKeepClient(arv)
@@ -1055,7 +1186,7 @@ func (s *StandaloneSuite) TestPutBWant2ReplicasWithOnlyOneWritableLocalRoot(c *C
 
        _, replicas, err := kc.PutB([]byte("foo"))
 
-       c.Check(err, FitsTypeOf, InsufficientReplicasError(errors.New("")))
+       c.Check(err, FitsTypeOf, InsufficientReplicasError{})
        c.Check(replicas, Equals, 1)
 
        c.Check(<-st.handled, Equals, localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", 0)])
@@ -1064,13 +1195,15 @@ func (s *StandaloneSuite) TestPutBWant2ReplicasWithOnlyOneWritableLocalRoot(c *C
 func (s *StandaloneSuite) TestPutBWithNoWritableLocalRoots(c *C) {
        hash := Md5String("foo")
 
-       st := StubPutHandler{
-               c,
-               hash,
-               "abc123",
-               "foo",
-               "",
-               make(chan string, 5)}
+       st := &StubPutHandler{
+               c:                    c,
+               expectPath:           hash,
+               expectAPIToken:       "abc123",
+               expectBody:           "foo",
+               expectStorageClass:   "",
+               returnStorageClasses: "",
+               handled:              make(chan string, 5),
+       }
 
        arv, _ := arvadosclient.MakeArvadosClient()
        kc, _ := MakeKeepClient(arv)
@@ -1091,7 +1224,7 @@ func (s *StandaloneSuite) TestPutBWithNoWritableLocalRoots(c *C) {
 
        _, replicas, err := kc.PutB([]byte("foo"))
 
-       c.Check(err, FitsTypeOf, InsufficientReplicasError(errors.New("")))
+       c.Check(err, FitsTypeOf, InsufficientReplicasError{})
        c.Check(replicas, Equals, 0)
 }
 
@@ -1240,13 +1373,16 @@ func (s *StandaloneSuite) TestGetIndexWithNoSuchPrefix(c *C) {
 func (s *StandaloneSuite) TestPutBRetry(c *C) {
        st := &FailThenSucceedHandler{
                handled: make(chan string, 1),
-               successhandler: StubPutHandler{
-                       c,
-                       Md5String("foo"),
-                       "abc123",
-                       "foo",
-                       "",
-                       make(chan string, 5)}}
+               successhandler: &StubPutHandler{
+                       c:                    c,
+                       expectPath:           Md5String("foo"),
+                       expectAPIToken:       "abc123",
+                       expectBody:           "foo",
+                       expectStorageClass:   "",
+                       returnStorageClasses: "",
+                       handled:              make(chan string, 5),
+               },
+       }
 
        arv, _ := arvadosclient.MakeArvadosClient()
        kc, _ := MakeKeepClient(arv)