X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/cca1529c082c2111636f9ae60601f22afdfe73ae..7930d7abaabf2fd1f3432eca10f26b821e0ef94f:/services/keepstore/volume.go

diff --git a/services/keepstore/volume.go b/services/keepstore/volume.go
index e7683ee991..d3616d0812 100644
--- a/services/keepstore/volume.go
+++ b/services/keepstore/volume.go
@@ -5,179 +5,81 @@
 package main
 
 import (
-	"errors"
-	"fmt"
-	"os"
-	"strings"
+	"io"
+	"sync/atomic"
 	"time"
 )
 
 type Volume interface {
+	// Get a block. IFF the returned error is nil, the caller must
+	// put the returned slice back into the buffer pool when it's
+	// finished with it.
 	Get(loc string) ([]byte, error)
-	Put(loc string, block []byte) error
+	// Confirm Get() would return buf. If so, return nil. If not,
+	// return CollisionError or DiskHashError (depending on
+	// whether the data on disk matches the expected hash), or
+	// whatever error was encountered opening/reading the file.
+	Compare(loc string, data []byte) error
+	Put(loc string, data []byte) error
 	Touch(loc string) error
 	Mtime(loc string) (time.Time, error)
-	Index(prefix string) string
+	IndexTo(prefix string, writer io.Writer) error
 	Delete(loc string) error
 	Status() *VolumeStatus
 	String() string
+	Writable() bool
 }
 
-// MockVolumes are Volumes used to test the Keep front end.
-//
-// If the Bad field is true, this volume should return an error
-// on all writes and puts.
-//
-// The Touchable field signifies whether the Touch method will
-// succeed. Defaults to true. Note that Bad and Touchable are
-// independent: a MockVolume may be set up so that Put fails but Touch
-// works or vice versa.
-//
-// TODO(twp): rename Bad to something more descriptive, e.g. Writable,
-// and make sure that the tests that rely on it are testing the right
-// thing. We may need to simulate Writable, Touchable and Corrupt
-// volumes in different ways.
-//
-type MockVolume struct {
-	Store      map[string][]byte
-	Timestamps map[string]time.Time
-	Bad        bool
-	Touchable  bool
-}
-
-func CreateMockVolume() *MockVolume {
-	return &MockVolume{
-		Store:      make(map[string][]byte),
-		Timestamps: make(map[string]time.Time),
-		Bad:        false,
-		Touchable:  true,
-	}
-}
-
-func (v *MockVolume) Get(loc string) ([]byte, error) {
-	if v.Bad {
-		return nil, errors.New("Bad volume")
-	} else if block, ok := v.Store[loc]; ok {
-		return block, nil
-	}
-	return nil, os.ErrNotExist
+// A VolumeManager tells callers which volumes can read, which volumes
+// can write, and on which volume the next write should be attempted.
+type VolumeManager interface {
+	// AllReadable returns all volumes.
+	AllReadable() []Volume
+	// AllWritable returns all volumes that aren't known to be in
+	// a read-only state. (There is no guarantee that a write to
+	// one will succeed, though.)
+	AllWritable() []Volume
+	// NextWritable returns the volume where the next new block
+	// should be written. A VolumeManager can select a volume in
+	// order to distribute activity across spindles, fill up disks
+	// with more free space, etc.
+	NextWritable() Volume
+	// Close shuts down the volume manager cleanly.
+	Close()
 }
 
-func (v *MockVolume) Put(loc string, block []byte) error {
-	if v.Bad {
-		return errors.New("Bad volume")
-	}
-	v.Store[loc] = block
-	return v.Touch(loc)
+type RRVolumeManager struct {
+	readables []Volume
+	writables []Volume
+	counter   uint32
 }
 
-func (v *MockVolume) Touch(loc string) error {
-	if v.Touchable {
-		v.Timestamps[loc] = time.Now()
-		return nil
+func MakeRRVolumeManager(volumes []Volume) *RRVolumeManager {
+	vm := &RRVolumeManager{}
+	for _, v := range volumes {
+		vm.readables = append(vm.readables, v)
+		if v.Writable() {
+			vm.writables = append(vm.writables, v)
+		}
 	}
-	return errors.New("Touch failed")
+	return vm
 }
 
-func (v *MockVolume) Mtime(loc string) (time.Time, error) {
-	var mtime time.Time
-	var err error
-	if v.Bad {
-		err = errors.New("Bad volume")
-	} else if t, ok := v.Timestamps[loc]; ok {
-		mtime = t
-	} else {
-		err = os.ErrNotExist
-	}
-	return mtime, err
+func (vm *RRVolumeManager) AllReadable() []Volume {
+	return vm.readables
 }
 
-func (v *MockVolume) Index(prefix string) string {
-	var result string
-	for loc, block := range v.Store {
-		if IsValidLocator(loc) && strings.HasPrefix(loc, prefix) {
-			result = result + fmt.Sprintf("%s+%d %d\n",
-				loc, len(block), 123456789)
-		}
-	}
-	return result
+func (vm *RRVolumeManager) AllWritable() []Volume {
+	return vm.writables
 }
 
-func (v *MockVolume) Delete(loc string) error {
-	if _, ok := v.Store[loc]; ok {
-		if time.Since(v.Timestamps[loc]) < permission_ttl {
-			return nil
-		}
-		delete(v.Store, loc)
+func (vm *RRVolumeManager) NextWritable() Volume {
+	if len(vm.writables) == 0 {
 		return nil
 	}
-	return os.ErrNotExist
-}
-
-func (v *MockVolume) Status() *VolumeStatus {
-	var used uint64
-	for _, block := range v.Store {
-		used = used + uint64(len(block))
-	}
-	return &VolumeStatus{"/bogo", 123, 1000000 - used, used}
-}
-
-func (v *MockVolume) String() string {
-	return "[MockVolume]"
-}
-
-// A VolumeManager manages a collection of volumes.
-//
-// - Volumes is a slice of available Volumes.
-// - Choose() returns a Volume suitable for writing to.
-// - Quit() instructs the VolumeManager to shut down gracefully.
-//
-type VolumeManager interface {
-	Volumes() []Volume
-	Choose() Volume
-	Quit()
-}
-
-type RRVolumeManager struct {
-	volumes   []Volume
-	nextwrite chan Volume
-	quit      chan int
-}
-
-func MakeRRVolumeManager(vols []Volume) *RRVolumeManager {
-	// Create a new VolumeManager struct with the specified volumes,
-	// and with new Nextwrite and Quit channels.
-	// The Quit channel is buffered with a capacity of 1 so that
-	// another routine may write to it without blocking.
-	vm := &RRVolumeManager{vols, make(chan Volume), make(chan int, 1)}
-
-	// This goroutine implements round-robin volume selection.
-	// It sends each available Volume in turn to the Nextwrite
-	// channel, until receiving a notification on the Quit channel
-	// that it should terminate.
-	go func() {
-		var i int = 0
-		for {
-			select {
-			case <-vm.quit:
-				return
-			case vm.nextwrite <- vm.volumes[i]:
-				i = (i + 1) % len(vm.volumes)
-			}
-		}
-	}()
-
-	return vm
-}
-
-func (vm *RRVolumeManager) Volumes() []Volume {
-	return vm.volumes
-}
-
-func (vm *RRVolumeManager) Choose() Volume {
-	return <-vm.nextwrite
+	i := atomic.AddUint32(&vm.counter, 1)
+	return vm.writables[i%uint32(len(vm.writables))]
 }
 
-func (vm *RRVolumeManager) Quit() {
-	vm.quit <- 1
+func (vm *RRVolumeManager) Close() {
 }