X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/1a0a58c4f22af82e0a37440af3b0948771bca5e1..14a27997ba2a94c5a7250ddde4519d5f68b6eda0:/services/keepstore/volume.go diff --git a/services/keepstore/volume.go b/services/keepstore/volume.go index b15ef23a85..64fea34bfe 100644 --- a/services/keepstore/volume.go +++ b/services/keepstore/volume.go @@ -5,135 +5,76 @@ package main import ( - "errors" - "fmt" - "os" - "strings" + "io" + "sync/atomic" + "time" ) type Volume interface { + // Get a block. IFF the returned error is nil, the caller must + // put the returned slice back into the buffer pool when it's + // finished with it. Get(loc string) ([]byte, error) Put(loc string, block []byte) error - Index(prefix string) string + Touch(loc string) error + Mtime(loc string) (time.Time, error) + IndexTo(prefix string, writer io.Writer) error Delete(loc string) error Status() *VolumeStatus String() string + Writable() bool } -// MockVolumes are Volumes used to test the Keep front end. -// -// If the Bad field is true, this volume should return an error -// on all writes and puts. -// -type MockVolume struct { - Store map[string][]byte - Bad bool +// A VolumeManager tells callers which volumes can read, which volumes +// can write, and on which volume the next write should be attempted. +type VolumeManager interface { + // AllReadable returns all volumes. + AllReadable() []Volume + // AllWritable returns all volumes that aren't known to be in + // a read-only state. (There is no guarantee that a write to + // one will succeed, though.) + AllWritable() []Volume + // NextWritable returns the volume where the next new block + // should be written. A VolumeManager can select a volume in + // order to distribute activity across spindles, fill up disks + // with more free space, etc. + NextWritable() Volume + // Close shuts down the volume manager cleanly. + Close() } -func CreateMockVolume() *MockVolume { - return &MockVolume{make(map[string][]byte), false} +type RRVolumeManager struct { + readables []Volume + writables []Volume + counter uint32 } -func (v *MockVolume) Get(loc string) ([]byte, error) { - if v.Bad { - return nil, errors.New("Bad volume") - } else if block, ok := v.Store[loc]; ok { - return block, nil +func MakeRRVolumeManager(volumes []Volume) *RRVolumeManager { + vm := &RRVolumeManager{} + for _, v := range volumes { + vm.readables = append(vm.readables, v) + if v.Writable() { + vm.writables = append(vm.writables, v) + } } - return nil, os.ErrNotExist + return vm } -func (v *MockVolume) Put(loc string, block []byte) error { - if v.Bad { - return errors.New("Bad volume") - } - v.Store[loc] = block - return nil +func (vm *RRVolumeManager) AllReadable() []Volume { + return vm.readables } -func (v *MockVolume) Index(prefix string) string { - var result string - for loc, block := range v.Store { - if IsValidLocator(loc) && strings.HasPrefix(loc, prefix) { - result = result + fmt.Sprintf("%s+%d %d\n", - loc, len(block), 123456789) - } - } - return result +func (vm *RRVolumeManager) AllWritable() []Volume { + return vm.writables } -func (v *MockVolume) Delete(loc string) error { - if _, ok := v.Store[loc]; ok { - delete(v.Store, loc) +func (vm *RRVolumeManager) NextWritable() Volume { + if len(vm.writables) == 0 { return nil } - return os.ErrNotExist -} - -func (v *MockVolume) Status() *VolumeStatus { - var used uint64 - for _, block := range v.Store { - used = used + uint64(len(block)) - } - return &VolumeStatus{"/bogo", 123, 1000000 - used, used} -} - -func (v *MockVolume) String() string { - return "[MockVolume]" -} - -// A VolumeManager manages a collection of volumes. -// -// - Volumes is a slice of available Volumes. -// - Choose() returns a Volume suitable for writing to. -// - Quit() instructs the VolumeManager to shut down gracefully. -// -type VolumeManager interface { - Volumes() []Volume - Choose() Volume - Quit() -} - -type RRVolumeManager struct { - volumes []Volume - nextwrite chan Volume - quit chan int -} - -func MakeRRVolumeManager(vols []Volume) *RRVolumeManager { - // Create a new VolumeManager struct with the specified volumes, - // and with new Nextwrite and Quit channels. - // The Quit channel is buffered with a capacity of 1 so that - // another routine may write to it without blocking. - vm := &RRVolumeManager{vols, make(chan Volume), make(chan int, 1)} - - // This goroutine implements round-robin volume selection. - // It sends each available Volume in turn to the Nextwrite - // channel, until receiving a notification on the Quit channel - // that it should terminate. - go func() { - var i int = 0 - for { - select { - case <-vm.quit: - return - case vm.nextwrite <- vm.volumes[i]: - i = (i + 1) % len(vm.volumes) - } - } - }() - - return vm -} - -func (vm *RRVolumeManager) Volumes() []Volume { - return vm.volumes -} - -func (vm *RRVolumeManager) Choose() Volume { - return <-vm.nextwrite + i := atomic.AddUint32(&vm.counter, 1) + return vm.writables[i%uint32(len(vm.writables))] } -func (vm *RRVolumeManager) Quit() { - vm.quit <- 1 +func (vm *RRVolumeManager) Close() { }