5562: Use static method. Fixes "TypeError: _socket_open() takes exactly 5 arguments...
[arvados.git] / services / keepstore / volume.go
index 7d791f2be5638dc70c46d28aab4f2ce4984ef862..0f9fcffe52327435540df00692d0c587dd2981b6 100644 (file)
@@ -5,10 +5,7 @@
 package main
 
 import (
-       "errors"
-       "fmt"
-       "os"
-       "strings"
+       "sync/atomic"
        "time"
 )
 
@@ -16,139 +13,64 @@ type Volume interface {
        Get(loc string) ([]byte, error)
        Put(loc string, block []byte) error
        Touch(loc string) error
+       Mtime(loc string) (time.Time, error)
        Index(prefix string) string
        Delete(loc string) error
        Status() *VolumeStatus
        String() string
+       Writable() bool
 }
 
-// MockVolumes are Volumes used to test the Keep front end.
-//
-// If the Bad field is true, this volume should return an error
-// on all writes and puts.
-//
-type MockVolume struct {
-       Store      map[string][]byte
-       Timestamps map[string]time.Time
-       Bad        bool
-}
-
-func CreateMockVolume() *MockVolume {
-       return &MockVolume{
-               make(map[string][]byte),
-               make(map[string]time.Time),
-               false,
-       }
+// A VolumeManager tells callers which volumes can read, which volumes
+// can write, and on which volume the next write should be attempted.
+type VolumeManager interface {
+       // AllReadable returns all volumes.
+       AllReadable() []Volume
+       // AllWritable returns all volumes that aren't known to be in
+       // a read-only state. (There is no guarantee that a write to
+       // one will succeed, though.)
+       AllWritable() []Volume
+       // NextWritable returns the volume where the next new block
+       // should be written. A VolumeManager can select a volume in
+       // order to distribute activity across spindles, fill up disks
+       // with more free space, etc.
+       NextWritable() Volume
+       // Close shuts down the volume manager cleanly.
+       Close()
 }
 
-func (v *MockVolume) Get(loc string) ([]byte, error) {
-       if v.Bad {
-               return nil, errors.New("Bad volume")
-       } else if block, ok := v.Store[loc]; ok {
-               return block, nil
-       }
-       return nil, os.ErrNotExist
+type RRVolumeManager struct {
+       readables []Volume
+       writables []Volume
+       counter   uint32
 }
 
-func (v *MockVolume) Put(loc string, block []byte) error {
-       if v.Bad {
-               return errors.New("Bad volume")
+func MakeRRVolumeManager(volumes []Volume) *RRVolumeManager {
+       vm := &RRVolumeManager{}
+       for _, v := range volumes {
+               vm.readables = append(vm.readables, v)
+               if v.Writable() {
+                       vm.writables = append(vm.writables, v)
+               }
        }
-       v.Store[loc] = block
-       return Touch(loc)
+       return vm
 }
 
-func (v *MockVolume) Touch(loc string) error {
-       if v.Bad {
-               return errors.New("Bad volume")
-       }
-       v.Timestamps[loc] = time.Now()
-       return nil
+func (vm *RRVolumeManager) AllReadable() []Volume {
+       return vm.readables
 }
 
-func (v *MockVolume) Index(prefix string) string {
-       var result string
-       for loc, block := range v.Store {
-               if IsValidLocator(loc) && strings.HasPrefix(loc, prefix) {
-                       result = result + fmt.Sprintf("%s+%d %d\n",
-                               loc, len(block), 123456789)
-               }
-       }
-       return result
+func (vm *RRVolumeManager) AllWritable() []Volume {
+       return vm.writables
 }
 
-func (v *MockVolume) Delete(loc string) error {
-       if _, ok := v.Store[loc]; ok {
-               delete(v.Store, loc)
+func (vm *RRVolumeManager) NextWritable() Volume {
+       if len(vm.writables) == 0 {
                return nil
        }
-       return os.ErrNotExist
-}
-
-func (v *MockVolume) Status() *VolumeStatus {
-       var used uint64
-       for _, block := range v.Store {
-               used = used + uint64(len(block))
-       }
-       return &VolumeStatus{"/bogo", 123, 1000000 - used, used}
-}
-
-func (v *MockVolume) String() string {
-       return "[MockVolume]"
-}
-
-// A VolumeManager manages a collection of volumes.
-//
-// - Volumes is a slice of available Volumes.
-// - Choose() returns a Volume suitable for writing to.
-// - Quit() instructs the VolumeManager to shut down gracefully.
-//
-type VolumeManager interface {
-       Volumes() []Volume
-       Choose() Volume
-       Quit()
-}
-
-type RRVolumeManager struct {
-       volumes   []Volume
-       nextwrite chan Volume
-       quit      chan int
-}
-
-func MakeRRVolumeManager(vols []Volume) *RRVolumeManager {
-       // Create a new VolumeManager struct with the specified volumes,
-       // and with new Nextwrite and Quit channels.
-       // The Quit channel is buffered with a capacity of 1 so that
-       // another routine may write to it without blocking.
-       vm := &RRVolumeManager{vols, make(chan Volume), make(chan int, 1)}
-
-       // This goroutine implements round-robin volume selection.
-       // It sends each available Volume in turn to the Nextwrite
-       // channel, until receiving a notification on the Quit channel
-       // that it should terminate.
-       go func() {
-               var i int = 0
-               for {
-                       select {
-                       case <-vm.quit:
-                               return
-                       case vm.nextwrite <- vm.volumes[i]:
-                               i = (i + 1) % len(vm.volumes)
-                       }
-               }
-       }()
-
-       return vm
-}
-
-func (vm *RRVolumeManager) Volumes() []Volume {
-       return vm.volumes
-}
-
-func (vm *RRVolumeManager) Choose() Volume {
-       return <-vm.nextwrite
+       i := atomic.AddUint32(&vm.counter, 1)
+       return vm.writables[i % uint32(len(vm.writables))]
 }
 
-func (vm *RRVolumeManager) Quit() {
-       vm.quit <- 1
+func (vm *RRVolumeManager) Close() {
 }