Merge branch '9550-keep-services-env'
[arvados.git] / sdk / go / keepclient / collectionreader_test.go
index 51710b7bea40ccd39148e9286ab441fb9ec60e7d..2cc23738855dfeab3cd8ab2ef33cb27055a35fa1 100644 (file)
@@ -2,26 +2,53 @@ package keepclient
 
 import (
        "crypto/md5"
+       "crypto/rand"
        "fmt"
        "io"
        "io/ioutil"
        "net/http"
        "os"
+       "strconv"
+       "strings"
 
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/arvadostest"
        check "gopkg.in/check.v1"
 )
 
-var _ = check.Suite(&IntegrationSuite{})
+var _ = check.Suite(&CollectionReaderUnit{})
 
-// IntegrationSuite tests need an API server
-type IntegrationSuite struct{}
+type CollectionReaderUnit struct {
+       arv     arvadosclient.ArvadosClient // API client; SetUpTest gives it arvadostest.ActiveToken
+       kc      *KeepClient                 // Keep client; SetUpTest points its service roots at fake servers
+       handler SuccessHandler              // stub HTTP handler served by those fake Keep servers
+}
+
+func (s *CollectionReaderUnit) SetUpTest(c *check.C) {
+       var err error
+       s.arv, err = arvadosclient.MakeArvadosClient()
+       c.Assert(err, check.IsNil)
+       s.arv.ApiToken = arvadostest.ActiveToken
+       // Build the Keep client on top of the API client configured above.
+       s.kc, err = MakeKeepClient(&s.arv)
+       c.Assert(err, check.IsNil)
+       // Fresh stub state: empty block store, free lock, zeroed operation counter.
+       s.handler = SuccessHandler{
+               disk: make(map[string][]byte),
+               lock: make(chan struct{}, 1),
+               ops:  new(int),
+       }
+       localRoots := make(map[string]string)
+       for i, k := range RunSomeFakeKeepServers(s.handler, 4) { // the servers share s.handler's map, channel and counter
+               localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
+       }
+       s.kc.SetServiceRoots(localRoots, localRoots, nil) // hand the fake servers' URLs to the Keep client
+}
 
 type SuccessHandler struct {
        disk map[string][]byte
-       lock chan struct{}      // channel with buffer==1: full when an operation is in progress.
-       ops  *int               // number of operations completed
+       lock chan struct{} // channel with buffer==1: full when an operation is in progress; a test can fill it to stall the stub.
+       ops  *int          // number of operations completed; a pointer, so value copies of the handler share one counter
 }
 
 func (h SuccessHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
@@ -38,7 +65,7 @@ func (h SuccessHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
                if h.ops != nil {
                        (*h.ops)++
                }
-               <- h.lock
+               <-h.lock
                resp.Write([]byte(pdh))
        case "GET":
                pdh := req.URL.Path[1:]
@@ -47,7 +74,7 @@ func (h SuccessHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
                if h.ops != nil {
                        (*h.ops)++
                }
-               <- h.lock
+               <-h.lock
                if !ok {
                        resp.WriteHeader(http.StatusNotFound)
                } else {
@@ -64,33 +91,11 @@ type rdrTest struct {
        want interface{} // error or string to expect
 }
 
-func StubWithFakeServers(kc *KeepClient, h http.Handler) {
-       localRoots := make(map[string]string)
-       for i, k := range RunSomeFakeKeepServers(h, 4) {
-               localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
-       }
-       kc.SetServiceRoots(localRoots, localRoots, nil)
-}
-
-func (s *ServerRequiredSuite) TestCollectionReaderContent(c *check.C) {
-       arv, err := arvadosclient.MakeArvadosClient()
-       c.Assert(err, check.IsNil)
-       arv.ApiToken = arvadostest.ActiveToken
-
-       kc, err := MakeKeepClient(&arv)
-       c.Assert(err, check.IsNil)
-
-       {
-               h := SuccessHandler{
-                       disk: make(map[string][]byte),
-                       lock: make(chan struct{}, 1),
-               }
-               StubWithFakeServers(kc, h)
-               kc.PutB([]byte("foo"))
-               kc.PutB([]byte("bar"))
-               kc.PutB([]byte("Hello world\n"))
-               kc.PutB([]byte(""))
-       }
+func (s *CollectionReaderUnit) TestCollectionReaderContent(c *check.C) {
+       s.kc.PutB([]byte("foo"))
+       s.kc.PutB([]byte("bar"))
+       s.kc.PutB([]byte("Hello world\n"))
+       s.kc.PutB([]byte(""))
 
        mt := arvadostest.PathologicalManifest
 
@@ -116,7 +121,7 @@ func (s *ServerRequiredSuite) TestCollectionReaderContent(c *check.C) {
                {mt: mt, f: "segmented/frob", want: "frob"},
                {mt: mt, f: "segmented/oof", want: "oof"},
        } {
-               rdr, err := kc.CollectionFileReader(map[string]interface{}{"manifest_text": testCase.mt}, testCase.f)
+               rdr, err := s.kc.CollectionFileReader(map[string]interface{}{"manifest_text": testCase.mt}, testCase.f)
                switch want := testCase.want.(type) {
                case error:
                        c.Check(rdr, check.IsNil)
@@ -136,21 +141,34 @@ func (s *ServerRequiredSuite) TestCollectionReaderContent(c *check.C) {
        }
 }
 
-func (s *ServerRequiredSuite) TestCollectionReaderCloseEarly(c *check.C) {
-       arv, err := arvadosclient.MakeArvadosClient()
-       c.Assert(err, check.IsNil)
-       arv.ApiToken = arvadostest.ActiveToken
-
-       kc, err := MakeKeepClient(&arv)
-       c.Assert(err, check.IsNil)
-
-       h := SuccessHandler{
-               disk: make(map[string][]byte),
-               lock: make(chan struct{}, 1),
-               ops: new(int),
+func (s *CollectionReaderUnit) TestCollectionReaderManyBlocks(c *check.C) {
+       h := md5.New()
+       buf := make([]byte, 4096)
+       locs := make([]string, len(buf))
+       filesize := 0
+       for i := 0; i < len(locs); i++ { // store one random block of every size from 0 to len(buf)-1
+               _, err := io.ReadFull(rand.Reader, buf[:i])
+               c.Assert(err, check.IsNil)
+               h.Write(buf[:i])
+               locs[i], _, err = s.kc.PutB(buf[:i])
+               c.Assert(err, check.IsNil)
+               filesize += i
        }
-       StubWithFakeServers(kc, h)
-       kc.PutB([]byte("foo"))
+       manifest := "./random " + strings.Join(locs, " ") + " 0:" + strconv.Itoa(filesize) + ":bytes.bin\n"
+       dataMD5 := h.Sum(nil)
+       // Read the whole file back through the reader and compare MD5 digests.
+       checkMD5 := md5.New()
+       rdr, err := s.kc.CollectionFileReader(map[string]interface{}{"manifest_text": manifest}, "random/bytes.bin")
+       c.Assert(err, check.IsNil) // rdr is unusable on error: stop here instead of failing obscurely in io.Copy
+       _, err = io.Copy(checkMD5, rdr)
+       c.Check(err, check.IsNil)
+       _, err = rdr.Read(make([]byte, 1))
+       c.Check(err, check.Equals, io.EOF)
+       c.Check(checkMD5.Sum(nil), check.DeepEquals, dataMD5)
+}
+
+func (s *CollectionReaderUnit) TestCollectionReaderCloseEarly(c *check.C) {
+       s.kc.PutB([]byte("foo"))
 
        mt := ". "
        for i := 0; i < 1000; i++ {
@@ -161,23 +179,46 @@ func (s *ServerRequiredSuite) TestCollectionReaderCloseEarly(c *check.C) {
        // Grab the stub server's lock, ensuring our cfReader doesn't
        // get anything back from its first call to kc.Get() before we
        // have a chance to call Close().
-       h.lock <- struct{}{}
-       opsBeforeRead := *h.ops
+       s.handler.lock <- struct{}{}
+       opsBeforeRead := *s.handler.ops
 
-       rdr, err := kc.CollectionFileReader(map[string]interface{}{"manifest_text": mt}, "foo1000.txt")
+       rdr, err := s.kc.CollectionFileReader(map[string]interface{}{"manifest_text": mt}, "foo1000.txt")
        c.Assert(err, check.IsNil)
+
+       firstReadDone := make(chan struct{})
+       go func() {
+               rdr.Read(make([]byte, 6))
+               firstReadDone <- struct{}{}
+       }()
        err = rdr.Close()
        c.Assert(err, check.IsNil)
-       c.Assert(rdr.Error(), check.IsNil)
+       c.Assert(rdr.(*cfReader).Error(), check.IsNil)
 
        // Release the stub server's lock. The first GET operation will proceed.
-       <-h.lock
+       <-s.handler.lock
+
+       // Make sure our first read operation consumes the data
+       // received from the first GET.
+       <-firstReadDone
 
        // doGet() should close toRead before sending any more bufs to it.
-       if what, ok := <-rdr.toRead;  ok {
-               c.Errorf("Got %+v, expected toRead to be closed", what)
+       if what, ok := <-rdr.(*cfReader).toRead; ok {
+               c.Errorf("Got %q, expected toRead to be closed", what)
        }
 
        // Stub should have handled exactly one GET request.
-       c.Assert(*h.ops, check.Equals, opsBeforeRead+1)
+       c.Assert(*s.handler.ops, check.Equals, opsBeforeRead+1)
+}
+
+func (s *CollectionReaderUnit) TestCollectionReaderDataError(c *check.C) {
+       manifest := ". ffffffffffffffffffffffffffffffff+1 0:1:notfound.txt\n" // this test never stores that block
+       buf := make([]byte, 1)
+       rdr, err := s.kc.CollectionFileReader(map[string]interface{}{"manifest_text": manifest}, "notfound.txt")
+       c.Assert(err, check.IsNil) // rdr is unusable on error: stop here instead of failing obscurely in io.ReadFull
+       for i := 0; i < 2; i++ { // the read error must persist on a repeated read, and must not be io.EOF
+               _, err = io.ReadFull(rdr, buf)
+               c.Check(err, check.NotNil)
+               c.Check(err, check.Not(check.Equals), io.EOF)
+       }
+       c.Check(rdr.Close(), check.NotNil)
 }