import (
"crypto/md5"
+ "crypto/rand"
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
+ "strconv"
+ "strings"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
check "gopkg.in/check.v1"
)
-var _ = check.Suite(&IntegrationSuite{})
+var _ = check.Suite(&CollectionReaderUnit{})
-// IntegrationSuite tests need an API server
-type IntegrationSuite struct{}
+// CollectionReaderUnit is the gocheck suite for the collection reader
+// tests. SetUpTest assigns all three fields before each test.
+type CollectionReaderUnit struct {
+ // arv is the client returned by arvadosclient.MakeArvadosClient, with
+ // its ApiToken set to arvadostest.ActiveToken.
+ arv arvadosclient.ArvadosClient
+ // kc is the KeepClient built from arv; its service roots are replaced
+ // with the stub servers' URLs.
+ kc *KeepClient
+ // handler is the stub state passed to the fake keep servers.
+ handler SuccessHandler
+}
+
+// SetUpTest builds a fresh client and stub backend for the suite: an
+// Arvados client carrying arvadostest.ActiveToken, a KeepClient made from
+// it, and a new SuccessHandler passed to RunSomeFakeKeepServers. The URLs
+// of the returned servers replace the KeepClient's service roots.
+func (s *CollectionReaderUnit) SetUpTest(c *check.C) {
+ var err error
+ s.arv, err = arvadosclient.MakeArvadosClient()
+ c.Assert(err, check.IsNil)
+ s.arv.ApiToken = arvadostest.ActiveToken
+
+ s.kc, err = MakeKeepClient(&s.arv)
+ c.Assert(err, check.IsNil)
+
+ // A new handler per test: empty disk map, a lock channel with buffer 1,
+ // and a non-nil ops counter so ServeHTTP counts operations.
+ s.handler = SuccessHandler{
+ disk: make(map[string][]byte),
+ lock: make(chan struct{}, 1),
+ ops: new(int),
+ }
+ localRoots := make(map[string]string)
+ // Give each stub server a distinct fake UUID derived from its index.
+ for i, k := range RunSomeFakeKeepServers(s.handler, 4) {
+ localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
+ }
+ // The same map is passed for the first two arguments.
+ s.kc.SetServiceRoots(localRoots, localRoots, nil)
+}
+// SuccessHandler is the stub state handed to RunSomeFakeKeepServers; its
+// ServeHTTP method answers the fake keep servers' requests.
+// NOTE(review): disk presumably holds the stored block data keyed by
+// locator hash -- confirm against the full ServeHTTP body.
type SuccessHandler struct {
disk map[string][]byte
- lock chan struct{} // channel with buffer==1: full when an operation is in progress.
- ops *int // number of operations completed
+ lock chan struct{} // channel with buffer==1: full when an operation is in progress.
+ ops *int // number of operations completed
}
func (h SuccessHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
if h.ops != nil {
(*h.ops)++
}
- <- h.lock
+ <-h.lock
resp.Write([]byte(pdh))
case "GET":
pdh := req.URL.Path[1:]
if h.ops != nil {
(*h.ops)++
}
- <- h.lock
+ <-h.lock
if !ok {
resp.WriteHeader(http.StatusNotFound)
} else {
want interface{} // error or string to expect
}
-func StubWithFakeServers(kc *KeepClient, h http.Handler) {
- localRoots := make(map[string]string)
- for i, k := range RunSomeFakeKeepServers(h, 4) {
- localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
- }
- kc.SetServiceRoots(localRoots, localRoots, nil)
-}
-
-func (s *ServerRequiredSuite) TestCollectionReaderContent(c *check.C) {
- arv, err := arvadosclient.MakeArvadosClient()
- c.Assert(err, check.IsNil)
- arv.ApiToken = arvadostest.ActiveToken
-
- kc, err := MakeKeepClient(&arv)
- c.Assert(err, check.IsNil)
-
- {
- h := SuccessHandler{
- disk: make(map[string][]byte),
- lock: make(chan struct{}, 1),
- }
- StubWithFakeServers(kc, h)
- kc.PutB([]byte("foo"))
- kc.PutB([]byte("bar"))
- kc.PutB([]byte("Hello world\n"))
- kc.PutB([]byte(""))
- }
+// TestCollectionReaderContent stores a few small blocks through the
+// suite's KeepClient, then opens files from each test case's manifest
+// with CollectionFileReader and checks the outcome against the case's
+// expected value, which is either an error or a string.
+func (s *CollectionReaderUnit) TestCollectionReaderContent(c *check.C) {
+ s.kc.PutB([]byte("foo"))
+ s.kc.PutB([]byte("bar"))
+ s.kc.PutB([]byte("Hello world\n"))
+ s.kc.PutB([]byte(""))
mt := arvadostest.PathologicalManifest
{mt: mt, f: "segmented/frob", want: "frob"},
{mt: mt, f: "segmented/oof", want: "oof"},
} {
- rdr, err := kc.CollectionFileReader(map[string]interface{}{"manifest_text": testCase.mt}, testCase.f)
+ rdr, err := s.kc.CollectionFileReader(map[string]interface{}{"manifest_text": testCase.mt}, testCase.f)
switch want := testCase.want.(type) {
case error:
c.Check(rdr, check.IsNil)
}
}
-func (s *ServerRequiredSuite) TestCollectionReaderCloseEarly(c *check.C) {
- arv, err := arvadosclient.MakeArvadosClient()
- c.Assert(err, check.IsNil)
- arv.ApiToken = arvadostest.ActiveToken
-
- kc, err := MakeKeepClient(&arv)
- c.Assert(err, check.IsNil)
-
- h := SuccessHandler{
- disk: make(map[string][]byte),
- lock: make(chan struct{}, 1),
- ops: new(int),
+// TestCollectionReaderManyBlocks writes 4096 random blocks of sizes 0 to
+// 4095 bytes, joins their locators into a one-file manifest, and verifies
+// that reading the file back yields the same MD5 as the data written,
+// followed by io.EOF on the next read.
+func (s *CollectionReaderUnit) TestCollectionReaderManyBlocks(c *check.C) {
+ h := md5.New()
+ buf := make([]byte, 4096)
+ locs := make([]string, len(buf))
+ filesize := 0
+ for i := 0; i < len(locs); i++ {
+ // Block i is the first i bytes of buf, refilled with random data.
+ _, err := io.ReadFull(rand.Reader, buf[:i])
+ c.Assert(err, check.IsNil)
+ h.Write(buf[:i])
+ locs[i], _, err = s.kc.PutB(buf[:i])
+ c.Assert(err, check.IsNil)
+ filesize += i
}
- StubWithFakeServers(kc, h)
- kc.PutB([]byte("foo"))
+ manifest := "./random " + strings.Join(locs, " ") + " 0:" + strconv.Itoa(filesize) + ":bytes.bin\n"
+ dataMD5 := h.Sum(nil)
+
+ checkMD5 := md5.New()
+ rdr, err := s.kc.CollectionFileReader(map[string]interface{}{"manifest_text": manifest}, "random/bytes.bin")
+ // Assert, not Check: rdr is used immediately below, so a failed open
+ // must stop the test here instead of continuing with an unusable reader.
+ c.Assert(err, check.IsNil)
+ _, err = io.Copy(checkMD5, rdr)
+ c.Check(err, check.IsNil)
+ // After a complete copy, one more read must report EOF.
+ _, err = rdr.Read(make([]byte, 1))
+ c.Check(err, check.Equals, io.EOF)
+ c.Check(checkMD5.Sum(nil), check.DeepEquals, dataMD5)
+}
+
+// TestCollectionReaderCloseEarly holds the stub server's lock so the
+// first GET cannot finish, starts a Read in the background, closes the
+// reader, and only then releases the lock. It checks that Close and
+// Error report nil, that toRead is closed instead of delivering more
+// data, and that the stub handled exactly one GET.
+func (s *CollectionReaderUnit) TestCollectionReaderCloseEarly(c *check.C) {
+ s.kc.PutB([]byte("foo"))
mt := ". "
for i := 0; i < 1000; i++ {
// Grab the stub server's lock, ensuring our cfReader doesn't
// get anything back from its first call to kc.Get() before we
// have a chance to call Close().
- h.lock <- struct{}{}
- opsBeforeRead := *h.ops
+ s.handler.lock <- struct{}{}
+ opsBeforeRead := *s.handler.ops
- rdr, err := kc.CollectionFileReader(map[string]interface{}{"manifest_text": mt}, "foo1000.txt")
+ rdr, err := s.kc.CollectionFileReader(map[string]interface{}{"manifest_text": mt}, "foo1000.txt")
c.Assert(err, check.IsNil)
+
+ firstReadDone := make(chan struct{})
+ go func() {
+ rdr.Read(make([]byte, 6))
+ firstReadDone <- struct{}{}
+ }()
err = rdr.Close()
c.Assert(err, check.IsNil)
- c.Assert(rdr.Error(), check.IsNil)
+ c.Assert(rdr.(*cfReader).Error(), check.IsNil)
// Release the stub server's lock. The first GET operation will proceed.
- <-h.lock
+ <-s.handler.lock
+
+ // Make sure our first read operation consumes the data
+ // received from the first GET.
+ <-firstReadDone
// doGet() should close toRead before sending any more bufs to it.
- if what, ok := <-rdr.toRead; ok {
- c.Errorf("Got %+v, expected toRead to be closed", what)
+ if what, ok := <-rdr.(*cfReader).toRead; ok {
+ c.Errorf("Got %q, expected toRead to be closed", what)
}
// Stub should have handled exactly one GET request.
- c.Assert(*h.ops, check.Equals, opsBeforeRead+1)
+ c.Assert(*s.handler.ops, check.Equals, opsBeforeRead+1)
+}
+
+// TestCollectionReaderDataError opens a file whose only block this test
+// never stores. It expects each read to fail with an error other than
+// io.EOF, and Close to report an error as well.
+func (s *CollectionReaderUnit) TestCollectionReaderDataError(c *check.C) {
+ manifest := ". ffffffffffffffffffffffffffffffff+1 0:1:notfound.txt\n"
+ buf := make([]byte, 1)
+ rdr, err := s.kc.CollectionFileReader(map[string]interface{}{"manifest_text": manifest}, "notfound.txt")
+ // Assert, not Check: rdr is read from below, so a failed open must stop
+ // the test here instead of continuing with an unusable reader.
+ c.Assert(err, check.IsNil)
+ // Read twice: the failure must still be reported on a repeated read.
+ for i := 0; i < 2; i++ {
+ _, err = io.ReadFull(rdr, buf)
+ c.Check(err, check.NotNil)
+ c.Check(err, check.Not(check.Equals), io.EOF)
+ }
+ c.Check(rdr.Close(), check.NotNil)
}