import (
"crypto/md5"
+ "crypto/rand"
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
+ "strconv"
+ "strings"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
check "gopkg.in/check.v1"
)
-var _ = check.Suite(&IntegrationSuite{})
+var _ = check.Suite(&CollectionReaderUnit{})
-// IntegrationSuite tests need an API server
-type IntegrationSuite struct{}
+type CollectionReaderUnit struct {
+ arv *arvadosclient.ArvadosClient
+ kc *KeepClient
+ handler SuccessHandler
+}
+
+func (s *CollectionReaderUnit) SetUpTest(c *check.C) {
+ var err error
+ s.arv, err = arvadosclient.MakeArvadosClient()
+ c.Assert(err, check.IsNil)
+ s.arv.ApiToken = arvadostest.ActiveToken
+
+ s.kc, err = MakeKeepClient(s.arv)
+ c.Assert(err, check.IsNil)
+
+ s.handler = SuccessHandler{
+ disk: make(map[string][]byte),
+ lock: make(chan struct{}, 1),
+ ops: new(int),
+ }
+ localRoots := make(map[string]string)
+ for i, k := range RunSomeFakeKeepServers(s.handler, 4) {
+ localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
+ }
+ s.kc.SetServiceRoots(localRoots, localRoots, nil)
+}
type SuccessHandler struct {
disk map[string][]byte
- lock chan struct{}
+ lock chan struct{} // channel with buffer==1: full when an operation is in progress.
+ ops *int // number of operations completed
}
func (h SuccessHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
pdh := fmt.Sprintf("%x+%d", md5.Sum(buf), len(buf))
h.lock <- struct{}{}
h.disk[pdh] = buf
- <- h.lock
+ if h.ops != nil {
+ (*h.ops)++
+ }
+ <-h.lock
resp.Write([]byte(pdh))
case "GET":
pdh := req.URL.Path[1:]
h.lock <- struct{}{}
buf, ok := h.disk[pdh]
- <- h.lock
+ if h.ops != nil {
+ (*h.ops)++
+ }
+ <-h.lock
if !ok {
resp.WriteHeader(http.StatusNotFound)
} else {
want interface{} // error or string to expect
}
-func (s *ServerRequiredSuite) TestCollectionReaderContent(c *check.C) {
- arv, err := arvadosclient.MakeArvadosClient()
- c.Assert(err, check.IsNil)
- arv.ApiToken = arvadostest.ActiveToken
-
- kc, err := MakeKeepClient(&arv)
- c.Assert(err, check.IsNil)
-
- {
- localRoots := make(map[string]string)
- h := SuccessHandler{disk: make(map[string][]byte), lock: make(chan struct{}, 1)}
- for i, k := range RunSomeFakeKeepServers(h, 4) {
- localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
- }
- kc.SetServiceRoots(localRoots, localRoots, nil)
- kc.PutB([]byte("foo"))
- kc.PutB([]byte("bar"))
- kc.PutB([]byte("Hello world\n"))
- kc.PutB([]byte(""))
- }
+func (s *CollectionReaderUnit) TestCollectionReaderContent(c *check.C) {
+ s.kc.PutB([]byte("foo"))
+ s.kc.PutB([]byte("bar"))
+ s.kc.PutB([]byte("Hello world\n"))
+ s.kc.PutB([]byte(""))
mt := arvadostest.PathologicalManifest
{mt: mt, f: "segmented/frob", want: "frob"},
{mt: mt, f: "segmented/oof", want: "oof"},
} {
- rdr, err := kc.CollectionFileReader(map[string]interface{}{"manifest_text": testCase.mt}, testCase.f)
+ rdr, err := s.kc.CollectionFileReader(map[string]interface{}{"manifest_text": testCase.mt}, testCase.f)
switch want := testCase.want.(type) {
case error:
c.Check(rdr, check.IsNil)
}
}
}
+
+func (s *CollectionReaderUnit) TestCollectionReaderManyBlocks(c *check.C) {
+ h := md5.New()
+ buf := make([]byte, 4096)
+ locs := make([]string, len(buf))
+ filesize := 0
+ for i := 0; i < len(locs); i++ {
+ _, err := io.ReadFull(rand.Reader, buf[:i])
+ c.Assert(err, check.IsNil)
+ h.Write(buf[:i])
+ locs[i], _, err = s.kc.PutB(buf[:i])
+ c.Assert(err, check.IsNil)
+ filesize += i
+ }
+ manifest := "./random " + strings.Join(locs, " ") + " 0:" + strconv.Itoa(filesize) + ":bytes.bin\n"
+ dataMD5 := h.Sum(nil)
+
+ checkMD5 := md5.New()
+ rdr, err := s.kc.CollectionFileReader(map[string]interface{}{"manifest_text": manifest}, "random/bytes.bin")
+ c.Check(err, check.IsNil)
+ _, err = io.Copy(checkMD5, rdr)
+ c.Check(err, check.IsNil)
+ _, err = rdr.Read(make([]byte, 1))
+ c.Check(err, check.Equals, io.EOF)
+ c.Check(checkMD5.Sum(nil), check.DeepEquals, dataMD5)
+}
+
+func (s *CollectionReaderUnit) TestCollectionReaderCloseEarly(c *check.C) {
+ s.kc.PutB([]byte("foo"))
+
+ mt := ". "
+ for i := 0; i < 1000; i++ {
+ mt += "acbd18db4cc2f85cedef654fccc4a4d8+3 "
+ }
+ mt += "0:3000:foo1000.txt\n"
+
+ // Grab the stub server's lock, ensuring our cfReader doesn't
+ // get anything back from its first call to kc.Get() before we
+ // have a chance to call Close().
+ s.handler.lock <- struct{}{}
+ opsBeforeRead := *s.handler.ops
+
+ rdr, err := s.kc.CollectionFileReader(map[string]interface{}{"manifest_text": mt}, "foo1000.txt")
+ c.Assert(err, check.IsNil)
+
+ firstReadDone := make(chan struct{})
+ go func() {
+ rdr.Read(make([]byte, 6))
+ firstReadDone <- struct{}{}
+ }()
+ err = rdr.Close()
+ c.Assert(err, check.IsNil)
+ c.Assert(rdr.(*cfReader).Error(), check.IsNil)
+
+ // Release the stub server's lock. The first GET operation will proceed.
+ <-s.handler.lock
+
+ // Make sure our first read operation consumes the data
+ // received from the first GET.
+ <-firstReadDone
+
+ // doGet() should close toRead before sending any more bufs to it.
+ if what, ok := <-rdr.(*cfReader).toRead; ok {
+ c.Errorf("Got %q, expected toRead to be closed", what)
+ }
+
+ // Stub should have handled exactly one GET request.
+ c.Assert(*s.handler.ops, check.Equals, opsBeforeRead+1)
+}
+
+func (s *CollectionReaderUnit) TestCollectionReaderDataError(c *check.C) {
+ manifest := ". ffffffffffffffffffffffffffffffff+1 0:1:notfound.txt\n"
+ buf := make([]byte, 1)
+ rdr, err := s.kc.CollectionFileReader(map[string]interface{}{"manifest_text": manifest}, "notfound.txt")
+ c.Check(err, check.IsNil)
+ for i := 0; i < 2; i++ {
+ _, err = io.ReadFull(rdr, buf)
+ c.Check(err, check.NotNil)
+ c.Check(err, check.Not(check.Equals), io.EOF)
+ }
+ c.Check(rdr.Close(), check.NotNil)
+}