X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/f12350f25275fbf4c6c8692122f5eacce89794ee..f00aabd51e64355ca9f6001bd0f87fd162812915:/services/keepstore/volume.go diff --git a/services/keepstore/volume.go b/services/keepstore/volume.go index e7683ee991..1dea6194d5 100644 --- a/services/keepstore/volume.go +++ b/services/keepstore/volume.go @@ -1,183 +1,411 @@ -// A Volume is an interface representing a Keep back-end storage unit: -// for example, a single mounted disk, a RAID array, an Amazon S3 volume, -// etc. +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 package main import ( - "errors" + "context" + "crypto/rand" "fmt" - "os" - "strings" + "io" + "math/big" + "sync/atomic" "time" + + "git.curoverse.com/arvados.git/sdk/go/arvados" + "github.com/sirupsen/logrus" ) +type BlockWriter interface { + // WriteBlock reads all data from r and writes it to a backing + // store as "loc". Its only result is an error value. + WriteBlock(ctx context.Context, loc string, r io.Reader) error +} + +type BlockReader interface { + // ReadBlock retrieves data previously stored as "loc" and + // writes it to w. + ReadBlock(ctx context.Context, loc string, w io.Writer) error +} + +var driver = map[string]func(*arvados.Cluster, arvados.Volume, logrus.FieldLogger, *volumeMetricsVecs) (Volume, error){} + +// A Volume is an interface representing a Keep back-end storage unit: +// for example, a single mounted disk, a RAID array, an Amazon S3 volume, +// etc. type Volume interface { - Get(loc string) ([]byte, error) - Put(loc string, block []byte) error + // Get a block: copy the block data into buf, and return the + // number of bytes copied. + // + // loc is guaranteed to consist of 32 or more lowercase hex + // digits. + // + // Get should not verify the integrity of the data: it should + // just return whatever was found in its backing + // store. (Integrity checking is the caller's responsibility.) 
+ // + // If an error is encountered that prevents it from + // retrieving the data, that error should be returned so the + // caller can log (and send to the client) a more useful + // message. + // + // If the error is "not found", and there's no particular + // reason to expect the block to be found (other than that a + // caller is asking for it), the returned error should satisfy + // os.IsNotExist(err): this is a normal condition and will not + // be logged as an error (except that a 404 will appear in the + // access log if the block is not found on any other volumes + // either). + // + // If the data in the backing store is bigger than len(buf), + // then Get is permitted to return an error without reading + // any of the data. + // + // len(buf) will not exceed BlockSize. + Get(ctx context.Context, loc string, buf []byte) (int, error) + + // Compare the given data with the stored data (i.e., what Get + // would return). If equal, return nil. If not, return + // CollisionError or DiskHashError (depending on whether the + // data on disk matches the expected hash), or whatever error + // was encountered opening/reading the stored data. + Compare(ctx context.Context, loc string, data []byte) error + + // Put writes a block to an underlying storage device. + // + // loc is as described in Get. + // + // len(block) is guaranteed to be between 0 and BlockSize. + // + // If a block is already stored under the same name (loc) with + // different content, Put must either overwrite the existing + // data with the new data or return a non-nil error. When + // overwriting existing data, it must never leave the storage + // device in an inconsistent state: a subsequent call to Get + // must return either the entire old block, the entire new + // block, or an error. (An implementation that cannot perform + // atomic updates must leave the old data alone and return an + // error.) + // + // Put also sets the timestamp for the given locator to the + // current time. 
+ // + // Put must return a non-nil error unless it can guarantee + // that the entire block has been written and flushed to + // persistent storage, and that its timestamp is current. Of + // course, this guarantee is only as good as the underlying + // storage device, but it is Put's responsibility to at least + // get whatever guarantee is offered by the storage device. + // + // Put should not verify that loc==hash(block): this is the + // caller's responsibility. + Put(ctx context.Context, loc string, block []byte) error + + // Touch sets the timestamp for the given locator to the + // current time. + // + // loc is as described in Get. + // + // If invoked at time t0, Touch must guarantee that a + // subsequent call to Mtime will return a timestamp no older + // than {t0 minus one second}. For example, if Touch is called + // at 2015-07-07T01:23:45.67890123Z, it is acceptable for a + // subsequent Mtime to return any of the following: + // + // - 2015-07-07T01:23:45.00000000Z + // - 2015-07-07T01:23:45.67890123Z + // - 2015-07-07T01:23:46.67890123Z + // - 2015-07-08T00:00:00.00000000Z + // + // It is not acceptable for a subsequent Mtime to return + // either of the following: + // + // - 2015-07-07T00:00:00.00000000Z -- ERROR + // - 2015-07-07T01:23:44.00000000Z -- ERROR + // + // Touch must return a non-nil error if the timestamp cannot + // be updated. Touch(loc string) error + + // Mtime returns the stored timestamp for the given locator. + // + // loc is as described in Get. + // + // Mtime must return a non-nil error if the given block is not + // found or the timestamp could not be retrieved. Mtime(loc string) (time.Time, error) - Index(prefix string) string - Delete(loc string) error + + // IndexTo writes a complete list of locators with the given + // prefix for which Get() can retrieve data. + // + // prefix consists of zero or more lowercase hexadecimal + // digits. 
+ // + // Each locator must be written to the given writer using the + // following format: + // + // loc "+" size " " timestamp "\n" + // + // where: + // + // - size is the number of bytes of content, given as a + // decimal number with one or more digits + // + // - timestamp is the timestamp stored for the locator, + // given as a decimal number of seconds after January 1, + // 1970 UTC. + // + // IndexTo must not write any other data to writer: for + // example, it must not write any blank lines. + // + // If an error makes it impossible to provide a complete + // index, IndexTo must return a non-nil error. It is + // acceptable to return a non-nil error after writing a + // partial index to writer. + // + // The resulting index is not expected to be sorted in any + // particular order. + IndexTo(prefix string, writer io.Writer) error + + // Trash moves the block data from the underlying storage + // device to trash area. The block then stays in trash for + // BlobTrashLifetime before it is actually deleted. + // + // loc is as described in Get. + // + // If the timestamp for the given locator is newer than + // BlobSigningTTL, Trash must not trash the data. + // + // If a Trash operation overlaps with any Touch or Put + // operations on the same locator, the implementation must + // ensure one of the following outcomes: + // + // - Touch and Put return a non-nil error, or + // - Trash does not trash the block, or + // - Both of the above. + // + // If it is possible for the storage device to be accessed by + // a different process or host, the synchronization mechanism + // should also guard against races with other processes and + // hosts. If such a mechanism is not available, there must be + // a mechanism for detecting unsafe configurations, alerting + // the operator, and aborting or falling back to a read-only + // state. 
In other words, running multiple keepstore processes + // with the same underlying storage device must either work + // reliably or fail outright. + // + // Corollary: A successful Touch or Put guarantees a block + // will not be trashed for at least BlobSigningTTL seconds. + Trash(loc string) error + + // Untrash moves block from trash back into store + Untrash(loc string) error + + // Status returns a *VolumeStatus representing the current + // in-use and available storage capacity and an + // implementation-specific volume identifier (e.g., "mount + // point" for a UnixVolume). Status() *VolumeStatus + + // String returns an identifying label for this volume, + // suitable for including in log messages. It should contain + // enough information to uniquely identify the underlying + // storage device, but should not contain any credentials or + // secrets. String() string + + // EmptyTrash looks for trashed blocks that exceeded + // BlobTrashLifetime and deletes them from the volume. + EmptyTrash() + + // Return a globally unique ID of the underlying storage + // device if possible, otherwise "". + GetDeviceID() string } -// MockVolumes are Volumes used to test the Keep front end. -// -// If the Bad field is true, this volume should return an error -// on all writes and puts. -// -// The Touchable field signifies whether the Touch method will -// succeed. Defaults to true. Note that Bad and Touchable are -// independent: a MockVolume may be set up so that Put fails but Touch -// works or vice versa. -// -// TODO(twp): rename Bad to something more descriptive, e.g. Writable, -// and make sure that the tests that rely on it are testing the right -// thing. We may need to simulate Writable, Touchable and Corrupt -// volumes in different ways. 
-// -type MockVolume struct { - Store map[string][]byte - Timestamps map[string]time.Time - Bad bool - Touchable bool -} - -func CreateMockVolume() *MockVolume { - return &MockVolume{ - Store: make(map[string][]byte), - Timestamps: make(map[string]time.Time), - Bad: false, - Touchable: true, - } +// A VolumeWithExamples provides example configs to display in the +// -help message. +type VolumeWithExamples interface { + Volume + Examples() []Volume } -func (v *MockVolume) Get(loc string) ([]byte, error) { - if v.Bad { - return nil, errors.New("Bad volume") - } else if block, ok := v.Store[loc]; ok { - return block, nil - } - return nil, os.ErrNotExist +// A VolumeManager tells callers which volumes can read, which volumes +// can write, and on which volume the next write should be attempted. +type VolumeManager interface { + // Mounts returns all mounts (volume attachments). + Mounts() []*VolumeMount + + // Lookup returns the mount with the given UUID. Returns nil + // if the mount does not exist. If write==true, returns nil if + // the mount is not writable. + Lookup(uuid string, write bool) *VolumeMount + + // AllReadable returns all mounts. + AllReadable() []*VolumeMount + + // AllWritable returns all mounts that aren't known to be in + // a read-only state. (There is no guarantee that a write to + // one will succeed, though.) + AllWritable() []*VolumeMount + + // NextWritable returns the volume where the next new block + // should be written. A VolumeManager can select a volume in + // order to distribute activity across spindles, fill up disks + // with more free space, etc. + NextWritable() *VolumeMount + + // VolumeStats returns the ioStats used for tracking stats for + // the given Volume. + VolumeStats(Volume) *ioStats + + // Close shuts down the volume manager cleanly. 
+ Close() } -func (v *MockVolume) Put(loc string, block []byte) error { - if v.Bad { - return errors.New("Bad volume") - } - v.Store[loc] = block - return v.Touch(loc) +// A VolumeMount is an attachment of a Volume to a VolumeManager. +type VolumeMount struct { + arvados.KeepMount + Volume } -func (v *MockVolume) Touch(loc string) error { - if v.Touchable { - v.Timestamps[loc] = time.Now() - return nil +// Generate a UUID the way the API server would for a "KeepVolumeMount" +// object. +func (*VolumeMount) generateUUID() string { + var max big.Int + _, ok := max.SetString("zzzzzzzzzzzzzzz", 36) + if !ok { + panic("big.Int parse failed") + } + r, err := rand.Int(rand.Reader, &max) + if err != nil { + panic(err) } - return errors.New("Touch failed") + return fmt.Sprintf("zzzzz-ivpuk-%015s", r.Text(36)) } -func (v *MockVolume) Mtime(loc string) (time.Time, error) { - var mtime time.Time - var err error - if v.Bad { - err = errors.New("Bad volume") - } else if t, ok := v.Timestamps[loc]; ok { - mtime = t - } else { - err = os.ErrNotExist - } - return mtime, err +// RRVolumeManager is a round-robin VolumeManager: the Nth call to +// NextWritable returns the (N % len(writables))th writable mount +// (where writables are all mounts whose ReadOnly flag is false). 
+type RRVolumeManager struct { + mounts []*VolumeMount + mountMap map[string]*VolumeMount + readables []*VolumeMount + writables []*VolumeMount + counter uint32 + iostats map[Volume]*ioStats } -func (v *MockVolume) Index(prefix string) string { - var result string - for loc, block := range v.Store { - if IsValidLocator(loc) && strings.HasPrefix(loc, prefix) { - result = result + fmt.Sprintf("%s+%d %d\n", - loc, len(block), 123456789) +func makeRRVolumeManager(logger logrus.FieldLogger, cluster *arvados.Cluster, myURL arvados.URL, metrics *volumeMetricsVecs) (*RRVolumeManager, error) { + vm := &RRVolumeManager{ + iostats: make(map[Volume]*ioStats), + } + vm.mountMap = make(map[string]*VolumeMount) + for uuid, cfgvol := range cluster.Volumes { + va, ok := cfgvol.AccessViaHosts[myURL] + if !ok && len(cfgvol.AccessViaHosts) > 0 { + continue + } + dri, ok := driver[cfgvol.Driver] + if !ok { + return nil, fmt.Errorf("volume %s: invalid driver %q", uuid, cfgvol.Driver) + } + vol, err := dri(cluster, cfgvol, logger, metrics) + if err != nil { + return nil, fmt.Errorf("error initializing volume %s: %s", uuid, err) + } + logger.Printf("started volume %s (%s), ReadOnly=%v", uuid, vol, cfgvol.ReadOnly) + + sc := cfgvol.StorageClasses + if len(sc) == 0 { + sc = map[string]bool{"default": true} + } + repl := cfgvol.Replication + if repl < 1 { + repl = 1 + } + mnt := &VolumeMount{ + KeepMount: arvados.KeepMount{ + UUID: uuid, + DeviceID: vol.GetDeviceID(), + ReadOnly: cfgvol.ReadOnly || va.ReadOnly, + Replication: repl, + StorageClasses: sc, + }, + Volume: vol, + } + vm.iostats[vol] = &ioStats{} + vm.mounts = append(vm.mounts, mnt) + vm.mountMap[uuid] = mnt + vm.readables = append(vm.readables, mnt) + if !mnt.KeepMount.ReadOnly { + vm.writables = append(vm.writables, mnt) } } - return result + return vm, nil } -func (v *MockVolume) Delete(loc string) error { - if _, ok := v.Store[loc]; ok { - if time.Since(v.Timestamps[loc]) < permission_ttl { - return nil - } - delete(v.Store, 
loc) +func (vm *RRVolumeManager) Mounts() []*VolumeMount { + return vm.mounts +} + +func (vm *RRVolumeManager) Lookup(uuid string, needWrite bool) *VolumeMount { + if mnt, ok := vm.mountMap[uuid]; ok && (!needWrite || !mnt.ReadOnly) { + return mnt + } else { return nil } - return os.ErrNotExist } -func (v *MockVolume) Status() *VolumeStatus { - var used uint64 - for _, block := range v.Store { - used = used + uint64(len(block)) - } - return &VolumeStatus{"/bogo", 123, 1000000 - used, used} +// AllReadable returns a slice of all readable mounts +func (vm *RRVolumeManager) AllReadable() []*VolumeMount { + return vm.readables } -func (v *MockVolume) String() string { - return "[MockVolume]" +// AllWritable returns a slice of all writable mounts +func (vm *RRVolumeManager) AllWritable() []*VolumeMount { + return vm.writables } -// A VolumeManager manages a collection of volumes. -// -// - Volumes is a slice of available Volumes. -// - Choose() returns a Volume suitable for writing to. -// - Quit() instructs the VolumeManager to shut down gracefully. -// -type VolumeManager interface { - Volumes() []Volume - Choose() Volume - Quit() +// NextWritable returns the next writable mount in round-robin order, or nil if there are none +func (vm *RRVolumeManager) NextWritable() *VolumeMount { + if len(vm.writables) == 0 { + return nil + } + i := atomic.AddUint32(&vm.counter, 1) + return vm.writables[i%uint32(len(vm.writables))] } -type RRVolumeManager struct { - volumes []Volume - nextwrite chan Volume - quit chan int -} - -func MakeRRVolumeManager(vols []Volume) *RRVolumeManager { - // Create a new VolumeManager struct with the specified volumes, - // and with new Nextwrite and Quit channels. - // The Quit channel is buffered with a capacity of 1 so that - // another routine may write to it without blocking. - vm := &RRVolumeManager{vols, make(chan Volume), make(chan int, 1)} - - // This goroutine implements round-robin volume selection. 
- // It sends each available Volume in turn to the Nextwrite - // channel, until receiving a notification on the Quit channel - // that it should terminate. - go func() { - var i int = 0 - for { - select { - case <-vm.quit: - return - case vm.nextwrite <- vm.volumes[i]: - i = (i + 1) % len(vm.volumes) - } - } - }() +// VolumeStats returns an ioStats for the given volume. +func (vm *RRVolumeManager) VolumeStats(v Volume) *ioStats { + return vm.iostats[v] +} - return vm +// Close the RRVolumeManager +func (vm *RRVolumeManager) Close() { } -func (vm *RRVolumeManager) Volumes() []Volume { - return vm.volumes +// VolumeStatus describes the current condition of a volume +type VolumeStatus struct { + MountPoint string + DeviceNum uint64 + BytesFree uint64 + BytesUsed uint64 } -func (vm *RRVolumeManager) Choose() Volume { - return <-vm.nextwrite +// ioStats tracks I/O statistics for a volume or server +type ioStats struct { + Errors uint64 + Ops uint64 + CompareOps uint64 + GetOps uint64 + PutOps uint64 + TouchOps uint64 + InBytes uint64 + OutBytes uint64 } -func (vm *RRVolumeManager) Quit() { - vm.quit <- 1 +type InternalStatser interface { + InternalStats() interface{} }