2960: Move streaming from volume to keepstore layer.
[arvados.git] / services / keepstore / volume_test.go
index 2de21edde6708faa3aff96ef32f2b61f191a9775..f64041b04852e7fa9ca71235b2716d84843771f3 100644 (file)
@@ -2,28 +2,16 @@
 //
 // SPDX-License-Identifier: AGPL-3.0
 
-package main
+package keepstore
 
 import (
-       "bytes"
-       "context"
-       "crypto/md5"
-       "errors"
-       "fmt"
-       "io"
-       "os"
-       "strings"
        "sync"
        "time"
-
-       "git.arvados.org/arvados.git/sdk/go/arvados"
-       "github.com/sirupsen/logrus"
 )
 
 var (
-       TestBlock       = []byte("The quick brown fox jumps over the lazy dog.")
-       TestHash        = "e4d909c290d0fb1ca068ffaddf22cbd0"
-       TestHashPutResp = "e4d909c290d0fb1ca068ffaddf22cbd0+44\n"
+       TestBlock = []byte("The quick brown fox jumps over the lazy dog.")
+       TestHash  = "e4d909c290d0fb1ca068ffaddf22cbd0"
 
        TestBlock2 = []byte("Pack my box with five dozen liquor jugs.")
        TestHash2  = "f15ac516f788aec4f30932ffb6395c39"
@@ -31,10 +19,6 @@ var (
        TestBlock3 = []byte("Now is the time for all good men to come to the aid of their country.")
        TestHash3  = "eed29bbffbc2dbe5e5ee0bb71888e61f"
 
-       // BadBlock is used to test collisions and corruption.
-       // It must not match any test hashes.
-       BadBlock = []byte("The magic words are squeamish ossifrage.")
-
        EmptyHash  = "d41d8cd98f00b204e9800998ecf8427e"
        EmptyBlock = []byte("")
 )
@@ -43,230 +27,64 @@ var (
 // underlying Volume, in order to test behavior in cases that are
 // impractical to achieve with a sequence of normal Volume operations.
 type TestableVolume interface {
-       Volume
-
-       // [Over]write content for a locator with the given data,
-       // bypassing all constraints like readonly and serialize.
-       PutRaw(locator string, data []byte)
+       volume
 
        // Returns the strings that a driver uses to record read/write operations.
        ReadWriteOperationLabelValues() (r, w string)
 
        // Specify the value Mtime() should return, until the next
-       // call to Touch, TouchWithDate, or Put.
-       TouchWithDate(locator string, lastPut time.Time)
+       // call to Touch, TouchWithDate, or BlockWrite.
+       TouchWithDate(locator string, lastBlockWrite time.Time)
 
        // Clean up, delete temporary files.
        Teardown()
 }
 
-func init() {
-       driver["mock"] = newMockVolume
-}
-
-// MockVolumes are test doubles for Volumes, used to test handlers.
-type MockVolume struct {
-       Store      map[string][]byte
-       Timestamps map[string]time.Time
-
-       // Bad volumes return an error for every operation.
-       Bad            bool
-       BadVolumeError error
-
-       // Touchable volumes' Touch() method succeeds for a locator
-       // that has been Put().
-       Touchable bool
-
-       // Gate is a "starting gate", allowing test cases to pause
-       // volume operations long enough to inspect state. Every
-       // operation (except Status) starts by receiving from
-       // Gate. Sending one value unblocks one operation; closing the
-       // channel unblocks all operations. By default, Gate is a
-       // closed channel, so all operations proceed without
-       // blocking. See trash_worker_test.go for an example.
-       Gate chan struct{} `json:"-"`
-
-       cluster *arvados.Cluster
-       volume  arvados.Volume
-       logger  logrus.FieldLogger
-       metrics *volumeMetricsVecs
-       called  map[string]int
-       mutex   sync.Mutex
-}
-
-// newMockVolume returns a non-Bad, non-Readonly, Touchable mock
-// volume.
-func newMockVolume(cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) (Volume, error) {
-       gate := make(chan struct{})
-       close(gate)
-       return &MockVolume{
-               Store:      make(map[string][]byte),
-               Timestamps: make(map[string]time.Time),
-               Bad:        false,
-               Touchable:  true,
-               called:     map[string]int{},
-               Gate:       gate,
-               cluster:    cluster,
-               volume:     volume,
-               logger:     logger,
-               metrics:    metrics,
-       }, nil
+// brbuffer is like bytes.Buffer, but it implements io.WriterAt.
+// Convenient for testing (volume)BlockRead implementations.
+type brbuffer struct {
+       mtx sync.Mutex
+       buf []byte
 }
 
-// CallCount returns how many times the named method has been called.
-func (v *MockVolume) CallCount(method string) int {
-       v.mutex.Lock()
-       defer v.mutex.Unlock()
-       c, ok := v.called[method]
-       if !ok {
-               return 0
+func (b *brbuffer) WriteAt(p []byte, offset int64) (int, error) {
+       b.mtx.Lock()
+       defer b.mtx.Unlock()
+       if short := int(offset) + len(p) - len(b.buf); short > 0 {
+               b.buf = append(b.buf, make([]byte, short)...)
        }
-       return c
+       return copy(b.buf[offset:], p), nil
 }
 
-func (v *MockVolume) gotCall(method string) {
-       v.mutex.Lock()
-       defer v.mutex.Unlock()
-       if _, ok := v.called[method]; !ok {
-               v.called[method] = 1
-       } else {
-               v.called[method]++
-       }
+func (b *brbuffer) Bytes() []byte {
+       b.mtx.Lock()
+       defer b.mtx.Unlock()
+       return b.buf
 }
 
-func (v *MockVolume) Compare(ctx context.Context, loc string, buf []byte) error {
-       v.gotCall("Compare")
-       <-v.Gate
-       if v.Bad {
-               return v.BadVolumeError
-       } else if block, ok := v.Store[loc]; ok {
-               if fmt.Sprintf("%x", md5.Sum(block)) != loc {
-                       return DiskHashError
-               }
-               if bytes.Compare(buf, block) != 0 {
-                       return CollisionError
-               }
-               return nil
-       } else {
-               return NotFoundError
-       }
+func (b *brbuffer) String() string {
+       b.mtx.Lock()
+       defer b.mtx.Unlock()
+       return string(b.buf)
 }
 
-func (v *MockVolume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
-       v.gotCall("Get")
-       <-v.Gate
-       if v.Bad {
-               return 0, v.BadVolumeError
-       } else if block, ok := v.Store[loc]; ok {
-               copy(buf[:len(block)], block)
-               return len(block), nil
-       }
-       return 0, os.ErrNotExist
+func (b *brbuffer) Len() int {
+       b.mtx.Lock()
+       defer b.mtx.Unlock()
+       return len(b.buf)
 }
 
-func (v *MockVolume) Put(ctx context.Context, loc string, block []byte) error {
-       v.gotCall("Put")
-       <-v.Gate
-       if v.Bad {
-               return v.BadVolumeError
-       }
-       if v.volume.ReadOnly {
-               return MethodDisabledError
-       }
-       v.Store[loc] = block
-       return v.Touch(loc)
+func (b *brbuffer) Reset() {
+       b.mtx.Lock()
+       defer b.mtx.Unlock()
+       b.buf = nil
 }
 
-func (v *MockVolume) Touch(loc string) error {
-       return v.TouchWithDate(loc, time.Now())
-}
+// a brdiscarder is like io.Discard, but it implements
+// io.WriterAt. Convenient for testing (volume)BlockRead
+// implementations when the output is not checked.
+type brdiscarder struct{}
 
-func (v *MockVolume) TouchWithDate(loc string, t time.Time) error {
-       v.gotCall("Touch")
-       <-v.Gate
-       if v.volume.ReadOnly {
-               return MethodDisabledError
-       }
-       if _, exists := v.Store[loc]; !exists {
-               return os.ErrNotExist
-       }
-       if v.Touchable {
-               v.Timestamps[loc] = t
-               return nil
-       }
-       return errors.New("Touch failed")
-}
+func (brdiscarder) WriteAt(p []byte, offset int64) (int, error) { return len(p), nil }
 
-func (v *MockVolume) Mtime(loc string) (time.Time, error) {
-       v.gotCall("Mtime")
-       <-v.Gate
-       var mtime time.Time
-       var err error
-       if v.Bad {
-               err = v.BadVolumeError
-       } else if t, ok := v.Timestamps[loc]; ok {
-               mtime = t
-       } else {
-               err = os.ErrNotExist
-       }
-       return mtime, err
-}
-
-func (v *MockVolume) IndexTo(prefix string, w io.Writer) error {
-       v.gotCall("IndexTo")
-       <-v.Gate
-       for loc, block := range v.Store {
-               if !IsValidLocator(loc) || !strings.HasPrefix(loc, prefix) {
-                       continue
-               }
-               _, err := fmt.Fprintf(w, "%s+%d %d\n",
-                       loc, len(block), 123456789)
-               if err != nil {
-                       return err
-               }
-       }
-       return nil
-}
-
-func (v *MockVolume) Trash(loc string) error {
-       v.gotCall("Delete")
-       <-v.Gate
-       if v.volume.ReadOnly {
-               return MethodDisabledError
-       }
-       if _, ok := v.Store[loc]; ok {
-               if time.Since(v.Timestamps[loc]) < time.Duration(v.cluster.Collections.BlobSigningTTL) {
-                       return nil
-               }
-               delete(v.Store, loc)
-               return nil
-       }
-       return os.ErrNotExist
-}
-
-func (v *MockVolume) GetDeviceID() string {
-       return "mock-device-id"
-}
-
-func (v *MockVolume) Untrash(loc string) error {
-       return nil
-}
-
-func (v *MockVolume) Status() *VolumeStatus {
-       var used uint64
-       for _, block := range v.Store {
-               used = used + uint64(len(block))
-       }
-       return &VolumeStatus{"/bogo", 123, 1000000 - used, used}
-}
-
-func (v *MockVolume) String() string {
-       return "[MockVolume]"
-}
-
-func (v *MockVolume) EmptyTrash() {
-}
-
-func (v *MockVolume) GetStorageClasses() []string {
-       return nil
-}
+var brdiscard = brdiscarder{}