2960: Move streaming from volume to keepstore layer.
[arvados.git] / services / keepstore / unix_volume_test.go
index 715e23a9eaaac42329ced3d6b91d69da70f3c26e..bcdb5f6358652eb02fe8024b268d02b23f2eb8cf 100644 (file)
@@ -8,9 +8,7 @@ import (
        "bytes"
        "context"
        "encoding/json"
-       "errors"
        "fmt"
-       "io"
        "io/ioutil"
        "os"
        "sync"
@@ -123,16 +121,9 @@ func (s *unixVolumeSuite) TestGetNotFound(c *check.C) {
        defer v.Teardown()
        v.BlockWrite(context.Background(), TestHash, TestBlock)
 
-       buf := bytes.NewBuffer(nil)
-       _, err := v.BlockRead(context.Background(), TestHash2, buf)
-       switch {
-       case os.IsNotExist(err):
-               break
-       case err == nil:
-               c.Errorf("Read should have failed, returned %+q", buf.Bytes())
-       default:
-               c.Errorf("Read expected ErrNotExist, got: %s", err)
-       }
+       buf := &brbuffer{}
+       err := v.BlockRead(context.Background(), TestHash2, buf)
+       c.Check(err, check.FitsTypeOf, os.ErrNotExist)
 }
 
 func (s *unixVolumeSuite) TestPut(c *check.C) {
@@ -182,94 +173,6 @@ func (s *unixVolumeSuite) TestIsFull(c *check.C) {
        }
 }
 
-func (s *unixVolumeSuite) TestUnixVolumeGetFuncWorkerError(c *check.C) {
-       v := s.newTestableUnixVolume(c, s.params, false)
-       defer v.Teardown()
-
-       v.BlockWrite(context.Background(), TestHash, TestBlock)
-       mockErr := errors.New("Mock error")
-       err := v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error {
-               return mockErr
-       })
-       if err != mockErr {
-               c.Errorf("Got %v, expected %v", err, mockErr)
-       }
-}
-
-func (s *unixVolumeSuite) TestUnixVolumeGetFuncFileError(c *check.C) {
-       v := s.newTestableUnixVolume(c, s.params, false)
-       defer v.Teardown()
-
-       funcCalled := false
-       err := v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error {
-               funcCalled = true
-               return nil
-       })
-       if err == nil {
-               c.Errorf("Expected error opening non-existent file")
-       }
-       if funcCalled {
-               c.Errorf("Worker func should not have been called")
-       }
-}
-
-func (s *unixVolumeSuite) TestUnixVolumeGetFuncWorkerWaitsOnMutex(c *check.C) {
-       v := s.newTestableUnixVolume(c, s.params, false)
-       defer v.Teardown()
-
-       v.BlockWrite(context.Background(), TestHash, TestBlock)
-
-       mtx := NewMockMutex()
-       v.locker = mtx
-
-       funcCalled := make(chan struct{})
-       go v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error {
-               funcCalled <- struct{}{}
-               return nil
-       })
-       select {
-       case mtx.AllowLock <- struct{}{}:
-       case <-funcCalled:
-               c.Fatal("Function was called before mutex was acquired")
-       case <-time.After(5 * time.Second):
-               c.Fatal("Timed out before mutex was acquired")
-       }
-       select {
-       case <-funcCalled:
-       case mtx.AllowUnlock <- struct{}{}:
-               c.Fatal("Mutex was released before function was called")
-       case <-time.After(5 * time.Second):
-               c.Fatal("Timed out waiting for funcCalled")
-       }
-       select {
-       case mtx.AllowUnlock <- struct{}{}:
-       case <-time.After(5 * time.Second):
-               c.Fatal("Timed out waiting for getFunc() to release mutex")
-       }
-}
-
-type MockMutex struct {
-       AllowLock   chan struct{}
-       AllowUnlock chan struct{}
-}
-
-func NewMockMutex() *MockMutex {
-       return &MockMutex{
-               AllowLock:   make(chan struct{}),
-               AllowUnlock: make(chan struct{}),
-       }
-}
-
-// Lock waits for someone to send to AllowLock.
-func (m *MockMutex) Lock() {
-       <-m.AllowLock
-}
-
-// Unlock waits for someone to send to AllowUnlock.
-func (m *MockMutex) Unlock() {
-       <-m.AllowUnlock
-}
-
 func (s *unixVolumeSuite) TestUnixVolumeContextCancelBlockWrite(c *check.C) {
        v := s.newTestableUnixVolume(c, s.params, true)
        defer v.Teardown()
@@ -300,9 +203,10 @@ func (s *unixVolumeSuite) TestUnixVolumeContextCancelBlockRead(c *check.C) {
                time.Sleep(50 * time.Millisecond)
                cancel()
        }()
-       n, err := v.BlockRead(ctx, TestHash, io.Discard)
-       if n > 0 || err != context.Canceled {
-               c.Errorf("BlockRead() returned %d, %s -- expected short read / canceled", n, err)
+       buf := &brbuffer{}
+       err = v.BlockRead(ctx, TestHash, buf)
+       if buf.Len() != 0 || err != context.Canceled {
+               c.Errorf("BlockRead() returned %q, %s -- expected short read / canceled", buf.String(), err)
        }
 }
 
@@ -317,7 +221,7 @@ func (s *unixVolumeSuite) TestStats(c *check.C) {
        c.Check(stats(), check.Matches, `.*"StatOps":1,.*`) // (*unixVolume)check() calls Stat() once
        c.Check(stats(), check.Matches, `.*"Errors":0,.*`)
 
-       _, err := vol.BlockRead(context.Background(), fooHash, io.Discard)
+       err := vol.BlockRead(context.Background(), fooHash, brdiscard)
        c.Check(err, check.NotNil)
        c.Check(stats(), check.Matches, `.*"StatOps":[^0],.*`)
        c.Check(stats(), check.Matches, `.*"Errors":[^0],.*`)
@@ -339,8 +243,8 @@ func (s *unixVolumeSuite) TestStats(c *check.C) {
        c.Check(stats(), check.Matches, `.*"OpenOps":1,.*`)
        c.Check(stats(), check.Matches, `.*"UtimesOps":2,.*`)
 
-       buf := bytes.NewBuffer(nil)
-       _, err = vol.BlockRead(context.Background(), fooHash, buf)
+       buf := &brbuffer{}
+       err = vol.BlockRead(context.Background(), fooHash, buf)
        c.Check(err, check.IsNil)
        c.Check(buf.String(), check.Equals, "foo")
        c.Check(stats(), check.Matches, `.*"InBytes":3,.*`)