16314: Merge branch 'master'
[arvados.git] / sdk / go / keepclient / keepclient_test.go
index ff0f57bbe6ce58c091e9c5a94b98cda67e077424..2604b02b17aaeb412b2519e4c09a69264fa8d340 100644 (file)
@@ -1,6 +1,11 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
 package keepclient
 
 import (
+       "bytes"
        "crypto/md5"
        "errors"
        "fmt"
@@ -14,9 +19,8 @@ import (
        "testing"
        "time"
 
-       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-       "git.curoverse.com/arvados.git/sdk/go/arvadostest"
-       "git.curoverse.com/arvados.git/sdk/go/streamer"
+       "git.arvados.org/arvados.git/sdk/go/arvadosclient"
+       "git.arvados.org/arvados.git/sdk/go/arvadostest"
        . "gopkg.in/check.v1"
 )
 
@@ -36,7 +40,7 @@ type ServerRequiredSuite struct{}
 type StandaloneSuite struct{}
 
 func (s *StandaloneSuite) SetUpTest(c *C) {
-       ClearCache()
+       RefreshServiceDiscovery()
 }
 
 func pythonDir() string {
@@ -55,7 +59,7 @@ func (s *ServerRequiredSuite) TearDownSuite(c *C) {
 }
 
 func (s *ServerRequiredSuite) SetUpTest(c *C) {
-       ClearCache()
+       RefreshServiceDiscovery()
 }
 
 func (s *ServerRequiredSuite) TestMakeKeepClient(c *C) {
@@ -76,28 +80,33 @@ func (s *ServerRequiredSuite) TestDefaultReplications(c *C) {
        c.Assert(err, Equals, nil)
 
        kc, err := MakeKeepClient(arv)
+       c.Check(err, IsNil)
        c.Assert(kc.Want_replicas, Equals, 2)
 
        arv.DiscoveryDoc["defaultCollectionReplication"] = 3.0
        kc, err = MakeKeepClient(arv)
+       c.Check(err, IsNil)
        c.Assert(kc.Want_replicas, Equals, 3)
 
        arv.DiscoveryDoc["defaultCollectionReplication"] = 1.0
        kc, err = MakeKeepClient(arv)
+       c.Check(err, IsNil)
        c.Assert(kc.Want_replicas, Equals, 1)
 }
 
 type StubPutHandler struct {
-       c              *C
-       expectPath     string
-       expectApiToken string
-       expectBody     string
-       handled        chan string
+       c                  *C
+       expectPath         string
+       expectApiToken     string
+       expectBody         string
+       expectStorageClass string
+       handled            chan string
 }
 
 func (sph StubPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
        sph.c.Check(req.URL.Path, Equals, "/"+sph.expectPath)
        sph.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", sph.expectApiToken))
+       sph.c.Check(req.Header.Get("X-Keep-Storage-Classes"), Equals, sph.expectStorageClass)
        body, err := ioutil.ReadAll(req.Body)
        sph.c.Check(err, Equals, nil)
        sph.c.Check(body, DeepEquals, []byte(sph.expectBody))
@@ -107,7 +116,9 @@ func (sph StubPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request)
 
 func RunFakeKeepServer(st http.Handler) (ks KeepServer) {
        var err error
-       ks.listener, err = net.ListenTCP("tcp", &net.TCPAddr{Port: 0})
+       // If we don't explicitly bind it to localhost, ks.listener.Addr() will
+       // bind to 0.0.0.0 or [::] which is not a valid address for Dial()
+       ks.listener, err = net.ListenTCP("tcp", &net.TCPAddr{IP: []byte{127, 0, 0, 1}, Port: 0})
        if err != nil {
                panic(fmt.Sprintf("Could not listen on any port"))
        }
@@ -141,12 +152,13 @@ func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
                "acbd18db4cc2f85cedef654fccc4a4d8",
                "abc123",
                "foo",
+               "hot",
                make(chan string)}
 
        UploadToStubHelper(c, st,
                func(kc *KeepClient, url string, reader io.ReadCloser, writer io.WriteCloser, upload_status chan uploadStatus) {
-
-                       go kc.uploadToKeepServer(url, st.expectPath, reader, upload_status, int64(len("foo")), 0)
+                       kc.StorageClasses = []string{"hot"}
+                       go kc.uploadToKeepServer(url, st.expectPath, reader, upload_status, int64(len("foo")), kc.getRequestID())
 
                        writer.Write([]byte("foo"))
                        writer.Close()
@@ -163,21 +175,12 @@ func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
                "acbd18db4cc2f85cedef654fccc4a4d8",
                "abc123",
                "foo",
+               "",
                make(chan string)}
 
        UploadToStubHelper(c, st,
-               func(kc *KeepClient, url string, reader io.ReadCloser,
-                       writer io.WriteCloser, upload_status chan uploadStatus) {
-
-                       tr := streamer.AsyncStreamFromReader(512, reader)
-                       defer tr.Close()
-
-                       br1 := tr.MakeStreamReader()
-
-                       go kc.uploadToKeepServer(url, st.expectPath, br1, upload_status, 3, 0)
-
-                       writer.Write([]byte("foo"))
-                       writer.Close()
+               func(kc *KeepClient, url string, _ io.ReadCloser, _ io.WriteCloser, upload_status chan uploadStatus) {
+                       go kc.uploadToKeepServer(url, st.expectPath, bytes.NewBuffer([]byte("foo")), upload_status, 3, kc.getRequestID())
 
                        <-st.handled
 
@@ -198,10 +201,12 @@ func (fh FailHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
 type FailThenSucceedHandler struct {
        handled        chan string
        count          int
-       successhandler StubGetHandler
+       successhandler http.Handler
+       reqIDs         []string
 }
 
 func (fh *FailThenSucceedHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+       fh.reqIDs = append(fh.reqIDs, req.Header.Get("X-Request-Id"))
        if fh.count == 0 {
                resp.WriteHeader(500)
                fh.count += 1
@@ -230,7 +235,7 @@ func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) {
                func(kc *KeepClient, url string, reader io.ReadCloser,
                        writer io.WriteCloser, upload_status chan uploadStatus) {
 
-                       go kc.uploadToKeepServer(url, hash, reader, upload_status, 3, 0)
+                       go kc.uploadToKeepServer(url, hash, reader, upload_status, 3, kc.getRequestID())
 
                        writer.Write([]byte("foo"))
                        writer.Close()
@@ -266,6 +271,7 @@ func (s *StandaloneSuite) TestPutB(c *C) {
                hash,
                "abc123",
                "foo",
+               "",
                make(chan string, 5)}
 
        arv, _ := arvadosclient.MakeArvadosClient()
@@ -307,6 +313,7 @@ func (s *StandaloneSuite) TestPutHR(c *C) {
                hash,
                "abc123",
                "foo",
+               "",
                make(chan string, 5)}
 
        arv, _ := arvadosclient.MakeArvadosClient()
@@ -355,12 +362,14 @@ func (s *StandaloneSuite) TestPutWithFail(c *C) {
                hash,
                "abc123",
                "foo",
+               "",
                make(chan string, 4)}
 
        fh := FailHandler{
                make(chan string, 1)}
 
        arv, err := arvadosclient.MakeArvadosClient()
+       c.Check(err, IsNil)
        kc, _ := MakeKeepClient(arv)
 
        kc.Want_replicas = 2
@@ -413,12 +422,14 @@ func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) {
                hash,
                "abc123",
                "foo",
+               "",
                make(chan string, 1)}
 
        fh := FailHandler{
                make(chan string, 4)}
 
        arv, err := arvadosclient.MakeArvadosClient()
+       c.Check(err, IsNil)
        kc, _ := MakeKeepClient(arv)
 
        kc.Want_replicas = 2
@@ -480,6 +491,7 @@ func (s *StandaloneSuite) TestGet(c *C) {
        defer ks.listener.Close()
 
        arv, err := arvadosclient.MakeArvadosClient()
+       c.Check(err, IsNil)
        kc, _ := MakeKeepClient(arv)
        arv.ApiToken = "abc123"
        kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
@@ -504,6 +516,7 @@ func (s *StandaloneSuite) TestGet404(c *C) {
        defer ks.listener.Close()
 
        arv, err := arvadosclient.MakeArvadosClient()
+       c.Check(err, IsNil)
        kc, _ := MakeKeepClient(arv)
        arv.ApiToken = "abc123"
        kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
@@ -522,6 +535,7 @@ func (s *StandaloneSuite) TestGetEmptyBlock(c *C) {
        defer ks.listener.Close()
 
        arv, err := arvadosclient.MakeArvadosClient()
+       c.Check(err, IsNil)
        kc, _ := MakeKeepClient(arv)
        arv.ApiToken = "abc123"
        kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
@@ -545,6 +559,7 @@ func (s *StandaloneSuite) TestGetFail(c *C) {
        defer ks.listener.Close()
 
        arv, err := arvadosclient.MakeArvadosClient()
+       c.Check(err, IsNil)
        kc, _ := MakeKeepClient(arv)
        arv.ApiToken = "abc123"
        kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
@@ -563,8 +578,9 @@ func (s *StandaloneSuite) TestGetFail(c *C) {
 func (s *StandaloneSuite) TestGetFailRetry(c *C) {
        hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
 
-       st := &FailThenSucceedHandler{make(chan string, 1), 0,
-               StubGetHandler{
+       st := &FailThenSucceedHandler{
+               handled: make(chan string, 1),
+               successhandler: StubGetHandler{
                        c,
                        hash,
                        "abc123",
@@ -575,6 +591,7 @@ func (s *StandaloneSuite) TestGetFailRetry(c *C) {
        defer ks.listener.Close()
 
        arv, err := arvadosclient.MakeArvadosClient()
+       c.Check(err, IsNil)
        kc, _ := MakeKeepClient(arv)
        arv.ApiToken = "abc123"
        kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
@@ -588,12 +605,20 @@ func (s *StandaloneSuite) TestGetFailRetry(c *C) {
        content, err2 := ioutil.ReadAll(r)
        c.Check(err2, Equals, nil)
        c.Check(content, DeepEquals, []byte("foo"))
+
+       c.Logf("%q", st.reqIDs)
+       c.Assert(len(st.reqIDs) > 1, Equals, true)
+       for _, reqid := range st.reqIDs {
+               c.Check(reqid, Not(Equals), "")
+               c.Check(reqid, Equals, st.reqIDs[0])
+       }
 }
 
 func (s *StandaloneSuite) TestGetNetError(c *C) {
        hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
 
        arv, err := arvadosclient.MakeArvadosClient()
+       c.Check(err, IsNil)
        kc, _ := MakeKeepClient(arv)
        arv.ApiToken = "abc123"
        kc.SetServiceRoots(map[string]string{"x": "http://localhost:62222"}, nil, nil)
@@ -630,6 +655,7 @@ func (s *StandaloneSuite) TestGetWithServiceHint(c *C) {
        defer ks.listener.Close()
 
        arv, err := arvadosclient.MakeArvadosClient()
+       c.Check(err, IsNil)
        kc, _ := MakeKeepClient(arv)
        arv.ApiToken = "abc123"
        kc.SetServiceRoots(
@@ -673,6 +699,7 @@ func (s *StandaloneSuite) TestGetWithLocalServiceHint(c *C) {
        defer ks.listener.Close()
 
        arv, err := arvadosclient.MakeArvadosClient()
+       c.Check(err, IsNil)
        kc, _ := MakeKeepClient(arv)
        arv.ApiToken = "abc123"
        kc.SetServiceRoots(
@@ -680,13 +707,13 @@ func (s *StandaloneSuite) TestGetWithLocalServiceHint(c *C) {
                        "zzzzz-bi6l4-yyyyyyyyyyyyyyy": ks0.url,
                        "zzzzz-bi6l4-xxxxxxxxxxxxxxx": ks0.url,
                        "zzzzz-bi6l4-wwwwwwwwwwwwwww": ks0.url,
-                       uuid: ks.url},
+                       uuid:                          ks.url},
                nil,
                map[string]string{
                        "zzzzz-bi6l4-yyyyyyyyyyyyyyy": ks0.url,
                        "zzzzz-bi6l4-xxxxxxxxxxxxxxx": ks0.url,
                        "zzzzz-bi6l4-wwwwwwwwwwwwwww": ks0.url,
-                       uuid: ks.url},
+                       uuid:                          ks.url},
        )
 
        r, n, uri, err := kc.Get(hash + "+K@" + uuid)
@@ -720,6 +747,7 @@ func (s *StandaloneSuite) TestGetWithServiceHintFailoverToLocals(c *C) {
        defer ksGateway.listener.Close()
 
        arv, err := arvadosclient.MakeArvadosClient()
+       c.Check(err, IsNil)
        kc, _ := MakeKeepClient(arv)
        arv.ApiToken = "abc123"
        kc.SetServiceRoots(
@@ -757,11 +785,13 @@ func (s *StandaloneSuite) TestChecksum(c *C) {
        defer ks.listener.Close()
 
        arv, err := arvadosclient.MakeArvadosClient()
+       c.Check(err, IsNil)
        kc, _ := MakeKeepClient(arv)
        arv.ApiToken = "abc123"
        kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
 
        r, n, _, err := kc.Get(barhash)
+       c.Check(err, IsNil)
        _, err = ioutil.ReadAll(r)
        c.Check(n, Equals, int64(3))
        c.Check(err, Equals, nil)
@@ -769,6 +799,7 @@ func (s *StandaloneSuite) TestChecksum(c *C) {
        <-st.handled
 
        r, n, _, err = kc.Get(foohash)
+       c.Check(err, IsNil)
        _, err = ioutil.ReadAll(r)
        c.Check(n, Equals, int64(3))
        c.Check(err, Equals, BadChecksum)
@@ -791,6 +822,7 @@ func (s *StandaloneSuite) TestGetWithFailures(c *C) {
                content}
 
        arv, err := arvadosclient.MakeArvadosClient()
+       c.Check(err, IsNil)
        kc, _ := MakeKeepClient(arv)
        arv.ApiToken = "abc123"
        localRoots := make(map[string]string)
@@ -837,6 +869,7 @@ func (s *ServerRequiredSuite) TestPutGetHead(c *C) {
        content := []byte("TestPutGetHead")
 
        arv, err := arvadosclient.MakeArvadosClient()
+       c.Check(err, IsNil)
        kc, err := MakeKeepClient(arv)
        c.Assert(err, Equals, nil)
 
@@ -869,6 +902,19 @@ func (s *ServerRequiredSuite) TestPutGetHead(c *C) {
                c.Check(n, Equals, int64(len(content)))
                c.Check(url2, Matches, fmt.Sprintf("http://localhost:\\d+/%s", hash))
        }
+       {
+               loc, err := kc.LocalLocator(hash)
+               c.Check(err, Equals, nil)
+               c.Assert(len(loc) >= 32, Equals, true)
+               c.Check(loc[:32], Equals, hash[:32])
+       }
+       {
+               content := []byte("the perth county conspiracy")
+               loc, err := kc.LocalLocator(fmt.Sprintf("%x+%d+Rzaaaa-abcde@12345", md5.Sum(content), len(content)))
+               c.Check(loc, Equals, "")
+               c.Check(err, ErrorMatches, `.*HEAD .*\+R.*`)
+               c.Check(err, ErrorMatches, `.*HTTP 400.*`)
+       }
 }
 
 type StubProxyHandler struct {
@@ -884,6 +930,7 @@ func (s *StandaloneSuite) TestPutProxy(c *C) {
        st := StubProxyHandler{make(chan string, 1)}
 
        arv, err := arvadosclient.MakeArvadosClient()
+       c.Check(err, IsNil)
        kc, _ := MakeKeepClient(arv)
 
        kc.Want_replicas = 2
@@ -912,6 +959,7 @@ func (s *StandaloneSuite) TestPutProxyInsufficientReplicas(c *C) {
        st := StubProxyHandler{make(chan string, 1)}
 
        arv, err := arvadosclient.MakeArvadosClient()
+       c.Check(err, IsNil)
        kc, _ := MakeKeepClient(arv)
 
        kc.Want_replicas = 3
@@ -982,6 +1030,7 @@ func (s *StandaloneSuite) TestPutBWant2ReplicasWithOnlyOneWritableLocalRoot(c *C
                hash,
                "abc123",
                "foo",
+               "",
                make(chan string, 5)}
 
        arv, _ := arvadosclient.MakeArvadosClient()
@@ -1020,6 +1069,7 @@ func (s *StandaloneSuite) TestPutBWithNoWritableLocalRoots(c *C) {
                hash,
                "abc123",
                "foo",
+               "",
                make(chan string, 5)}
 
        arv, _ := arvadosclient.MakeArvadosClient()
@@ -1103,6 +1153,7 @@ func (s *StandaloneSuite) TestGetIndexWithPrefix(c *C) {
        defer ks.listener.Close()
 
        arv, err := arvadosclient.MakeArvadosClient()
+       c.Check(err, IsNil)
        kc, _ := MakeKeepClient(arv)
        arv.ApiToken = "abc123"
        kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
@@ -1129,6 +1180,7 @@ func (s *StandaloneSuite) TestGetIndexIncomplete(c *C) {
        defer ks.listener.Close()
 
        arv, err := arvadosclient.MakeArvadosClient()
+       c.Check(err, IsNil)
        kc, _ := MakeKeepClient(arv)
        arv.ApiToken = "abc123"
        kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
@@ -1151,6 +1203,7 @@ func (s *StandaloneSuite) TestGetIndexWithNoSuchServer(c *C) {
        defer ks.listener.Close()
 
        arv, err := arvadosclient.MakeArvadosClient()
+       c.Check(err, IsNil)
        kc, _ := MakeKeepClient(arv)
        arv.ApiToken = "abc123"
        kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
@@ -1171,6 +1224,7 @@ func (s *StandaloneSuite) TestGetIndexWithNoSuchPrefix(c *C) {
        defer ks.listener.Close()
 
        arv, err := arvadosclient.MakeArvadosClient()
+       c.Check(err, IsNil)
        kc, _ := MakeKeepClient(arv)
        arv.ApiToken = "abc123"
        kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
@@ -1183,29 +1237,15 @@ func (s *StandaloneSuite) TestGetIndexWithNoSuchPrefix(c *C) {
        c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
 }
 
-type FailThenSucceedPutHandler struct {
-       handled        chan string
-       count          int
-       successhandler StubPutHandler
-}
-
-func (h *FailThenSucceedPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
-       if h.count == 0 {
-               resp.WriteHeader(500)
-               h.count += 1
-               h.handled <- fmt.Sprintf("http://%s", req.Host)
-       } else {
-               h.successhandler.ServeHTTP(resp, req)
-       }
-}
-
 func (s *StandaloneSuite) TestPutBRetry(c *C) {
-       st := &FailThenSucceedPutHandler{make(chan string, 1), 0,
-               StubPutHandler{
+       st := &FailThenSucceedHandler{
+               handled: make(chan string, 1),
+               successhandler: StubPutHandler{
                        c,
                        Md5String("foo"),
                        "abc123",
                        "foo",
+                       "",
                        make(chan string, 5)}}
 
        arv, _ := arvadosclient.MakeArvadosClient()
@@ -1247,7 +1287,7 @@ func (s *ServerRequiredSuite) TestMakeKeepClientWithNonDiskTypeService(c *C) {
                &blobKeepService)
        c.Assert(err, Equals, nil)
        defer func() { arv.Delete("keep_services", blobKeepService["uuid"].(string), nil, nil) }()
-       ClearCache()
+       RefreshServiceDiscovery()
 
        // Make a keepclient and ensure that the testblobstore is included
        kc, err := MakeKeepClient(arv)