X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/a3222e35cda68c8e48a17921c33ac37ecb5c3bac..eeeb4aef6e54d5cd3290bdeba91a8009f3e261bc:/services/keepstore/volume.go diff --git a/services/keepstore/volume.go b/services/keepstore/volume.go index b15ef23a85..2e80fac4bc 100644 --- a/services/keepstore/volume.go +++ b/services/keepstore/volume.go @@ -5,135 +5,199 @@ package main import ( - "errors" - "fmt" - "os" - "strings" + "io" + "sync/atomic" + "time" ) type Volume interface { + // Get a block. IFF the returned error is nil, the caller must + // put the returned slice back into the buffer pool when it's + // finished with it. + // + // loc is guaranteed to consist of 32 or more lowercase hex + // digits. + // + // Get should not verify the integrity of the returned data: + // it should just return whatever was found in its backing + // store. + // + // If an error is encountered that prevents it from + // retrieving the data, that error should be returned so the + // caller can log (and send to the client) a more useful + // message. + // + // If the error is "not found", and there's no particular + // reason to expect the block to be found (other than that a + // caller is asking for it), the returned error should satisfy + // os.IsNotExist(err): this is a normal condition and will not + // be logged as an error (except that a 404 will appear in the + // access log if the block is not found on any other volumes + // either). + // + // If the data in the backing store is bigger than BLOCKSIZE, + // Get is permitted to return an error without reading any of + // the data. Get(loc string) ([]byte, error) - Put(loc string, block []byte) error - Index(prefix string) string - Delete(loc string) error - Status() *VolumeStatus - String() string -} - -// MockVolumes are Volumes used to test the Keep front end. -// -// If the Bad field is true, this volume should return an error -// on all writes and puts. 
-// -type MockVolume struct { - Store map[string][]byte - Bad bool -} - -func CreateMockVolume() *MockVolume { - return &MockVolume{make(map[string][]byte), false} -} - -func (v *MockVolume) Get(loc string) ([]byte, error) { - if v.Bad { - return nil, errors.New("Bad volume") - } else if block, ok := v.Store[loc]; ok { - return block, nil - } - return nil, os.ErrNotExist -} -func (v *MockVolume) Put(loc string, block []byte) error { - if v.Bad { - return errors.New("Bad volume") - } - v.Store[loc] = block - return nil -} + // Put writes a block to an underlying storage device. + // + // loc is as described in Get. + // + // len(block) is guaranteed to be between 0 and BLOCKSIZE. + // + // If a block is already stored under the same name (loc) with + // different content, Put must either overwrite the existing + // data with the new data or return a non-nil error. + // + // Put must return a non-nil error unless it can guarantee + // that the entire block has been written and flushed to + // persistent storage. Of course, this guarantee is only as + // good as the underlying storage device, but it is Put's + // responsibility to at least get whatever guarantee is + // offered by the storage device. + // + // Put should not verify that loc==hash(block): this is the + // caller's responsibility. + Put(loc string, block []byte) error -func (v *MockVolume) Index(prefix string) string { - var result string - for loc, block := range v.Store { - if IsValidLocator(loc) && strings.HasPrefix(loc, prefix) { - result = result + fmt.Sprintf("%s+%d %d\n", - loc, len(block), 123456789) - } - } - return result -} + // Touch sets the timestamp for the given locator to the + // current time. + // + // loc is as described in Get. + // + // Touch must return a non-nil error unless it can guarantee + // that a future call to Mtime() will return a timestamp newer + // than {now minus one second}. + Touch(loc string) error + + // Mtime returns the stored timestamp for the given locator. 
+ // + // loc is as described in Get. + // + // Mtime must return a non-nil error if the given block is not + // found or the timestamp could not be retrieved. + Mtime(loc string) (time.Time, error) + + // IndexTo writes a complete list of locators with the given + // prefix for which Get() can retrieve data. + // + // prefix consists of zero or more lowercase hexadecimal + // digits. + // + // Each locator must be written to the given writer using the + // following format: + // + // loc "+" size " " timestamp "\n" + // + // where: + // + // - size is the number of bytes of content, given as a + // decimal number with one or more digits + // + // - timestamp is the timestamp stored for the locator, + // given as a decimal number of seconds after January 1, + // 1970 UTC. + // + // IndexTo must not write any other data to writer: for + // example, it must not write any blank lines. + // + // If an error makes it impossible to provide a complete + // index, IndexTo must return a non-nil error. It is + // acceptable to return a non-nil error after writing a + // partial index to writer. + // + // The resulting index is not expected to be sorted in any + // particular order. + IndexTo(prefix string, writer io.Writer) error + + // Delete deletes the block data from the underlying storage + // device. + // + // loc is as described in Get. + // + // If the timestamp for the given locator is newer than + // blob_signature_ttl, Delete must not delete the data. + // + // If callers in different goroutines invoke overlapping + // Delete() and Touch() operations on the same locator, the + // implementation must guarantee that Touch() returns a + // non-nil error, or Delete() does not delete the block, or + // both. 
+ Delete(loc string) error -func (v *MockVolume) Delete(loc string) error { - if _, ok := v.Store[loc]; ok { - delete(v.Store, loc) - return nil - } - return os.ErrNotExist -} + // Status() returns a *VolumeStatus representing the current + // in-use and available storage capacity and an + // implementation-specific volume identifier (e.g., "mount + // point" for a UnixVolume). + Status() *VolumeStatus -func (v *MockVolume) Status() *VolumeStatus { - var used uint64 - for _, block := range v.Store { - used = used + uint64(len(block)) - } - return &VolumeStatus{"/bogo", 123, 1000000 - used, used} -} + // String() returns an identifying label for this volume, + // suitable for including in log messages. It should contain + // enough information to uniquely identify the underlying + // storage device, but should not contain any credentials or + // secrets. + String() string -func (v *MockVolume) String() string { - return "[MockVolume]" + // Writable() returns false if all future Put(), Mtime(), and + // Delete() calls are expected to fail. + // + // If the volume is only temporarily unwritable -- or if Put() + // will fail because it is full, but Mtime() or Delete() can + // succeed -- then Writable() should return true. + Writable() bool } -// A VolumeManager manages a collection of volumes. -// -// - Volumes is a slice of available Volumes. -// - Choose() returns a Volume suitable for writing to. -// - Quit() instructs the VolumeManager to shut down gracefully. -// +// A VolumeManager tells callers which volumes can read, which volumes +// can write, and on which volume the next write should be attempted. type VolumeManager interface { - Volumes() []Volume - Choose() Volume - Quit() + // AllReadable returns all volumes. + AllReadable() []Volume + // AllWritable returns all volumes that aren't known to be in + // a read-only state. (There is no guarantee that a write to + // one will succeed, though.) 
+ AllWritable() []Volume + // NextWritable returns the volume where the next new block + // should be written. A VolumeManager can select a volume in + // order to distribute activity across spindles, fill up disks + // with more free space, etc. + NextWritable() Volume + // Close shuts down the volume manager cleanly. + Close() } type RRVolumeManager struct { - volumes []Volume - nextwrite chan Volume - quit chan int + readables []Volume + writables []Volume + counter uint32 } -func MakeRRVolumeManager(vols []Volume) *RRVolumeManager { - // Create a new VolumeManager struct with the specified volumes, - // and with new Nextwrite and Quit channels. - // The Quit channel is buffered with a capacity of 1 so that - // another routine may write to it without blocking. - vm := &RRVolumeManager{vols, make(chan Volume), make(chan int, 1)} - - // This goroutine implements round-robin volume selection. - // It sends each available Volume in turn to the Nextwrite - // channel, until receiving a notification on the Quit channel - // that it should terminate. 
- go func() { - var i int = 0 - for { - select { - case <-vm.quit: - return - case vm.nextwrite <- vm.volumes[i]: - i = (i + 1) % len(vm.volumes) - } +func MakeRRVolumeManager(volumes []Volume) *RRVolumeManager { + vm := &RRVolumeManager{} + for _, v := range volumes { + vm.readables = append(vm.readables, v) + if v.Writable() { + vm.writables = append(vm.writables, v) } - }() - + } return vm } -func (vm *RRVolumeManager) Volumes() []Volume { - return vm.volumes +func (vm *RRVolumeManager) AllReadable() []Volume { + return vm.readables } -func (vm *RRVolumeManager) Choose() Volume { - return <-vm.nextwrite +func (vm *RRVolumeManager) AllWritable() []Volume { + return vm.writables +} + +func (vm *RRVolumeManager) NextWritable() Volume { + if len(vm.writables) == 0 { + return nil + } + i := atomic.AddUint32(&vm.counter, 1) + return vm.writables[i%uint32(len(vm.writables))] } -func (vm *RRVolumeManager) Quit() { - vm.quit <- 1 +func (vm *RRVolumeManager) Close() { }