Merge branch 'master' into 13822-nm-delayed-daemon
[arvados.git] / sdk / go / keepclient / keepclient_test.go
index fcae4131fc028e563f5eac4ed1fa1748c5a7f5da..dc80ad7e1d6378ad09da968db62cf038002d0b9c 100644 (file)
@@ -1,6 +1,11 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
 package keepclient
 
 import (
+       "bytes"
        "crypto/md5"
        "errors"
        "fmt"
@@ -16,7 +21,6 @@ import (
 
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/arvadostest"
-       "git.curoverse.com/arvados.git/sdk/go/streamer"
        . "gopkg.in/check.v1"
 )
 
@@ -35,6 +39,10 @@ type ServerRequiredSuite struct{}
 // Standalone tests
 type StandaloneSuite struct{}
 
+func (s *StandaloneSuite) SetUpTest(c *C) {
+       RefreshServiceDiscovery()
+}
+
 func pythonDir() string {
        cwd, _ := os.Getwd()
        return fmt.Sprintf("%s/../../python/tests", cwd)
@@ -50,6 +58,10 @@ func (s *ServerRequiredSuite) TearDownSuite(c *C) {
        arvadostest.StopAPI()
 }
 
+func (s *ServerRequiredSuite) SetUpTest(c *C) {
+       RefreshServiceDiscovery()
+}
+
 func (s *ServerRequiredSuite) TestMakeKeepClient(c *C) {
        arv, err := arvadosclient.MakeArvadosClient()
        c.Assert(err, Equals, nil)
@@ -76,20 +88,23 @@ func (s *ServerRequiredSuite) TestDefaultReplications(c *C) {
 
        arv.DiscoveryDoc["defaultCollectionReplication"] = 1.0
        kc, err = MakeKeepClient(arv)
+       c.Check(err, IsNil)
        c.Assert(kc.Want_replicas, Equals, 1)
 }
 
 type StubPutHandler struct {
-       c              *C
-       expectPath     string
-       expectApiToken string
-       expectBody     string
-       handled        chan string
+       c                  *C
+       expectPath         string
+       expectApiToken     string
+       expectBody         string
+       expectStorageClass string
+       handled            chan string
 }
 
 func (sph StubPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
        sph.c.Check(req.URL.Path, Equals, "/"+sph.expectPath)
        sph.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", sph.expectApiToken))
+       sph.c.Check(req.Header.Get("X-Keep-Storage-Classes"), Equals, sph.expectStorageClass)
        body, err := ioutil.ReadAll(req.Body)
        sph.c.Check(err, Equals, nil)
        sph.c.Check(body, DeepEquals, []byte(sph.expectBody))
@@ -99,7 +114,9 @@ func (sph StubPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request)
 
 func RunFakeKeepServer(st http.Handler) (ks KeepServer) {
        var err error
-       ks.listener, err = net.ListenTCP("tcp", &net.TCPAddr{Port: 0})
+       // If we don't explicitly bind to localhost, ks.listener.Addr() will
+       // report 0.0.0.0 or [::], which is not a valid address for Dial().
+       ks.listener, err = net.ListenTCP("tcp", &net.TCPAddr{IP: []byte{127, 0, 0, 1}, Port: 0})
        if err != nil {
                panic(fmt.Sprintf("Could not listen on any port"))
        }
@@ -133,12 +150,13 @@ func (s *StandaloneSuite) TestUploadToStubKeepServer(c *C) {
                "acbd18db4cc2f85cedef654fccc4a4d8",
                "abc123",
                "foo",
+               "hot",
                make(chan string)}
 
        UploadToStubHelper(c, st,
                func(kc *KeepClient, url string, reader io.ReadCloser, writer io.WriteCloser, upload_status chan uploadStatus) {
-
-                       go kc.uploadToKeepServer(url, st.expectPath, reader, upload_status, int64(len("foo")), 0)
+                       kc.StorageClasses = []string{"hot"}
+                       go kc.uploadToKeepServer(url, st.expectPath, reader, upload_status, int64(len("foo")), kc.getRequestID())
 
                        writer.Write([]byte("foo"))
                        writer.Close()
@@ -155,21 +173,12 @@ func (s *StandaloneSuite) TestUploadToStubKeepServerBufferReader(c *C) {
                "acbd18db4cc2f85cedef654fccc4a4d8",
                "abc123",
                "foo",
+               "",
                make(chan string)}
 
        UploadToStubHelper(c, st,
-               func(kc *KeepClient, url string, reader io.ReadCloser,
-                       writer io.WriteCloser, upload_status chan uploadStatus) {
-
-                       tr := streamer.AsyncStreamFromReader(512, reader)
-                       defer tr.Close()
-
-                       br1 := tr.MakeStreamReader()
-
-                       go kc.uploadToKeepServer(url, st.expectPath, br1, upload_status, 3, 0)
-
-                       writer.Write([]byte("foo"))
-                       writer.Close()
+               func(kc *KeepClient, url string, _ io.ReadCloser, _ io.WriteCloser, upload_status chan uploadStatus) {
+                       go kc.uploadToKeepServer(url, st.expectPath, bytes.NewBuffer([]byte("foo")), upload_status, 3, kc.getRequestID())
 
                        <-st.handled
 
@@ -190,10 +199,12 @@ func (fh FailHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
 type FailThenSucceedHandler struct {
        handled        chan string
        count          int
-       successhandler StubGetHandler
+       successhandler http.Handler
+       reqIDs         []string
 }
 
 func (fh *FailThenSucceedHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+       fh.reqIDs = append(fh.reqIDs, req.Header.Get("X-Request-Id"))
        if fh.count == 0 {
                resp.WriteHeader(500)
                fh.count += 1
@@ -222,7 +233,7 @@ func (s *StandaloneSuite) TestFailedUploadToStubKeepServer(c *C) {
                func(kc *KeepClient, url string, reader io.ReadCloser,
                        writer io.WriteCloser, upload_status chan uploadStatus) {
 
-                       go kc.uploadToKeepServer(url, hash, reader, upload_status, 3, 0)
+                       go kc.uploadToKeepServer(url, hash, reader, upload_status, 3, kc.getRequestID())
 
                        writer.Write([]byte("foo"))
                        writer.Close()
@@ -258,6 +269,7 @@ func (s *StandaloneSuite) TestPutB(c *C) {
                hash,
                "abc123",
                "foo",
+               "",
                make(chan string, 5)}
 
        arv, _ := arvadosclient.MakeArvadosClient()
@@ -299,6 +311,7 @@ func (s *StandaloneSuite) TestPutHR(c *C) {
                hash,
                "abc123",
                "foo",
+               "",
                make(chan string, 5)}
 
        arv, _ := arvadosclient.MakeArvadosClient()
@@ -347,6 +360,7 @@ func (s *StandaloneSuite) TestPutWithFail(c *C) {
                hash,
                "abc123",
                "foo",
+               "",
                make(chan string, 4)}
 
        fh := FailHandler{
@@ -405,6 +419,7 @@ func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) {
                hash,
                "abc123",
                "foo",
+               "",
                make(chan string, 1)}
 
        fh := FailHandler{
@@ -555,8 +570,9 @@ func (s *StandaloneSuite) TestGetFail(c *C) {
 func (s *StandaloneSuite) TestGetFailRetry(c *C) {
        hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
 
-       st := &FailThenSucceedHandler{make(chan string, 1), 0,
-               StubGetHandler{
+       st := &FailThenSucceedHandler{
+               handled: make(chan string, 1),
+               successhandler: StubGetHandler{
                        c,
                        hash,
                        "abc123",
@@ -580,6 +596,13 @@ func (s *StandaloneSuite) TestGetFailRetry(c *C) {
        content, err2 := ioutil.ReadAll(r)
        c.Check(err2, Equals, nil)
        c.Check(content, DeepEquals, []byte("foo"))
+
+       c.Logf("%q", st.reqIDs)
+       c.Assert(len(st.reqIDs) > 1, Equals, true)
+       for _, reqid := range st.reqIDs {
+               c.Check(reqid, Not(Equals), "")
+               c.Check(reqid, Equals, st.reqIDs[0])
+       }
 }
 
 func (s *StandaloneSuite) TestGetNetError(c *C) {
@@ -974,6 +997,7 @@ func (s *StandaloneSuite) TestPutBWant2ReplicasWithOnlyOneWritableLocalRoot(c *C
                hash,
                "abc123",
                "foo",
+               "",
                make(chan string, 5)}
 
        arv, _ := arvadosclient.MakeArvadosClient()
@@ -1012,6 +1036,7 @@ func (s *StandaloneSuite) TestPutBWithNoWritableLocalRoots(c *C) {
                hash,
                "abc123",
                "foo",
+               "",
                make(chan string, 5)}
 
        arv, _ := arvadosclient.MakeArvadosClient()
@@ -1067,12 +1092,14 @@ func (s *StandaloneSuite) TestGetIndexWithNoPrefix(c *C) {
        defer ks.listener.Close()
 
        arv, err := arvadosclient.MakeArvadosClient()
-       kc, _ := MakeKeepClient(arv)
+       c.Assert(err, IsNil)
+       kc, err := MakeKeepClient(arv)
+       c.Assert(err, IsNil)
        arv.ApiToken = "abc123"
        kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
 
        r, err := kc.GetIndex("x", "")
-       c.Check(err, Equals, nil)
+       c.Check(err, IsNil)
 
        content, err2 := ioutil.ReadAll(r)
        c.Check(err2, Equals, nil)
@@ -1098,7 +1125,7 @@ func (s *StandaloneSuite) TestGetIndexWithPrefix(c *C) {
        kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
 
        r, err := kc.GetIndex("x", hash[0:3])
-       c.Check(err, Equals, nil)
+       c.Assert(err, Equals, nil)
 
        content, err2 := ioutil.ReadAll(r)
        c.Check(err2, Equals, nil)
@@ -1173,29 +1200,15 @@ func (s *StandaloneSuite) TestGetIndexWithNoSuchPrefix(c *C) {
        c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
 }
 
-type FailThenSucceedPutHandler struct {
-       handled        chan string
-       count          int
-       successhandler StubPutHandler
-}
-
-func (h *FailThenSucceedPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
-       if h.count == 0 {
-               resp.WriteHeader(500)
-               h.count += 1
-               h.handled <- fmt.Sprintf("http://%s", req.Host)
-       } else {
-               h.successhandler.ServeHTTP(resp, req)
-       }
-}
-
 func (s *StandaloneSuite) TestPutBRetry(c *C) {
-       st := &FailThenSucceedPutHandler{make(chan string, 1), 0,
-               StubPutHandler{
+       st := &FailThenSucceedHandler{
+               handled: make(chan string, 1),
+               successhandler: StubPutHandler{
                        c,
                        Md5String("foo"),
                        "abc123",
                        "foo",
+                       "",
                        make(chan string, 5)}}
 
        arv, _ := arvadosclient.MakeArvadosClient()
@@ -1237,6 +1250,7 @@ func (s *ServerRequiredSuite) TestMakeKeepClientWithNonDiskTypeService(c *C) {
                &blobKeepService)
        c.Assert(err, Equals, nil)
        defer func() { arv.Delete("keep_services", blobKeepService["uuid"].(string), nil, nil) }()
+       RefreshServiceDiscovery()
 
        // Make a keepclient and ensure that the testblobstore is included
        kc, err := MakeKeepClient(arv)
@@ -1265,5 +1279,5 @@ func (s *ServerRequiredSuite) TestMakeKeepClientWithNonDiskTypeService(c *C) {
 
        c.Assert(kc.replicasPerService, Equals, 0)
        c.Assert(kc.foundNonDiskSvc, Equals, true)
-       c.Assert(kc.Client.(*http.Client).Timeout, Equals, 300*time.Second)
+       c.Assert(kc.httpClient().(*http.Client).Timeout, Equals, 300*time.Second)
 }