17574: Merge branch 'main'
[arvados.git] / services / keepstore / volume.go
index 6fb1a1e0876324aac78046932dbfe511f3384cf4..26e6b731828f9be0861044cb6a7c4e10d097d05f 100644 (file)
-// A Volume is an interface representing a Keep back-end storage unit:
-// for example, a single mounted disk, a RAID array, an Amazon S3 volume,
-// etc.
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
 
 package main
 
 import (
-       "errors"
+       "context"
+       "crypto/rand"
        "fmt"
-       "os"
-       "strings"
+       "io"
+       "math/big"
+       "sync/atomic"
        "time"
+
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "github.com/sirupsen/logrus"
 )
 
+type BlockWriter interface {
+       // WriteBlock reads all data from r, writes it to a backing
+       // store as "loc", and returns the number of bytes written.
+       WriteBlock(ctx context.Context, loc string, r io.Reader) error
+}
+
+type BlockReader interface {
+       // ReadBlock retrieves data previously stored as "loc" and
+       // writes it to w.
+       ReadBlock(ctx context.Context, loc string, w io.Writer) error
+}
+
+var driver = map[string]func(*arvados.Cluster, arvados.Volume, logrus.FieldLogger, *volumeMetricsVecs) (Volume, error){}
+
+// A Volume is an interface representing a Keep back-end storage unit:
+// for example, a single mounted disk, a RAID array, an Amazon S3 volume,
+// etc.
 type Volume interface {
-       Get(loc string) ([]byte, error)
-       Put(loc string, block []byte) error
+       // Get a block: copy the block data into buf, and return the
+       // number of bytes copied.
+       //
+       // loc is guaranteed to consist of 32 or more lowercase hex
+       // digits.
+       //
+       // Get should not verify the integrity of the data: it should
+       // just return whatever was found in its backing
+       // store. (Integrity checking is the caller's responsibility.)
+       //
+       // If an error is encountered that prevents it from
+       // retrieving the data, that error should be returned so the
+       // caller can log (and send to the client) a more useful
+       // message.
+       //
+       // If the error is "not found", and there's no particular
+       // reason to expect the block to be found (other than that a
+       // caller is asking for it), the returned error should satisfy
+       // os.IsNotExist(err): this is a normal condition and will not
+       // be logged as an error (except that a 404 will appear in the
+       // access log if the block is not found on any other volumes
+       // either).
+       //
+       // If the data in the backing store is bigger than len(buf),
+       // then Get is permitted to return an error without reading
+       // any of the data.
+       //
+       // len(buf) will not exceed BlockSize.
+       Get(ctx context.Context, loc string, buf []byte) (int, error)
+
+       // Compare the given data with the stored data (i.e., what Get
+       // would return). If equal, return nil. If not, return
+       // CollisionError or DiskHashError (depending on whether the
+       // data on disk matches the expected hash), or whatever error
+       // was encountered opening/reading the stored data.
+       Compare(ctx context.Context, loc string, data []byte) error
+
+       // Put writes a block to an underlying storage device.
+       //
+       // loc is as described in Get.
+       //
+       // len(block) is guaranteed to be between 0 and BlockSize.
+       //
+       // If a block is already stored under the same name (loc) with
+       // different content, Put must either overwrite the existing
+       // data with the new data or return a non-nil error. When
+       // overwriting existing data, it must never leave the storage
+       // device in an inconsistent state: a subsequent call to Get
+       // must return either the entire old block, the entire new
+       // block, or an error. (An implementation that cannot peform
+       // atomic updates must leave the old data alone and return an
+       // error.)
+       //
+       // Put also sets the timestamp for the given locator to the
+       // current time.
+       //
+       // Put must return a non-nil error unless it can guarantee
+       // that the entire block has been written and flushed to
+       // persistent storage, and that its timestamp is current. Of
+       // course, this guarantee is only as good as the underlying
+       // storage device, but it is Put's responsibility to at least
+       // get whatever guarantee is offered by the storage device.
+       //
+       // Put should not verify that loc==hash(block): this is the
+       // caller's responsibility.
+       Put(ctx context.Context, loc string, block []byte) error
+
+       // Touch sets the timestamp for the given locator to the
+       // current time.
+       //
+       // loc is as described in Get.
+       //
+       // If invoked at time t0, Touch must guarantee that a
+       // subsequent call to Mtime will return a timestamp no older
+       // than {t0 minus one second}. For example, if Touch is called
+       // at 2015-07-07T01:23:45.67890123Z, it is acceptable for a
+       // subsequent Mtime to return any of the following:
+       //
+       //   - 2015-07-07T01:23:45.00000000Z
+       //   - 2015-07-07T01:23:45.67890123Z
+       //   - 2015-07-07T01:23:46.67890123Z
+       //   - 2015-07-08T00:00:00.00000000Z
+       //
+       // It is not acceptable for a subsequente Mtime to return
+       // either of the following:
+       //
+       //   - 2015-07-07T00:00:00.00000000Z -- ERROR
+       //   - 2015-07-07T01:23:44.00000000Z -- ERROR
+       //
+       // Touch must return a non-nil error if the timestamp cannot
+       // be updated.
        Touch(loc string) error
+
+       // Mtime returns the stored timestamp for the given locator.
+       //
+       // loc is as described in Get.
+       //
+       // Mtime must return a non-nil error if the given block is not
+       // found or the timestamp could not be retrieved.
        Mtime(loc string) (time.Time, error)
-       Index(prefix string) string
-       Delete(loc string) error
+
+       // IndexTo writes a complete list of locators with the given
+       // prefix for which Get() can retrieve data.
+       //
+       // prefix consists of zero or more lowercase hexadecimal
+       // digits.
+       //
+       // Each locator must be written to the given writer using the
+       // following format:
+       //
+       //   loc "+" size " " timestamp "\n"
+       //
+       // where:
+       //
+       //   - size is the number of bytes of content, given as a
+       //     decimal number with one or more digits
+       //
+       //   - timestamp is the timestamp stored for the locator,
+       //     given as a decimal number of seconds after January 1,
+       //     1970 UTC.
+       //
+       // IndexTo must not write any other data to writer: for
+       // example, it must not write any blank lines.
+       //
+       // If an error makes it impossible to provide a complete
+       // index, IndexTo must return a non-nil error. It is
+       // acceptable to return a non-nil error after writing a
+       // partial index to writer.
+       //
+       // The resulting index is not expected to be sorted in any
+       // particular order.
+       IndexTo(prefix string, writer io.Writer) error
+
+       // Trash moves the block data from the underlying storage
+       // device to trash area. The block then stays in trash for
+       // BlobTrashLifetime before it is actually deleted.
+       //
+       // loc is as described in Get.
+       //
+       // If the timestamp for the given locator is newer than
+       // BlobSigningTTL, Trash must not trash the data.
+       //
+       // If a Trash operation overlaps with any Touch or Put
+       // operations on the same locator, the implementation must
+       // ensure one of the following outcomes:
+       //
+       //   - Touch and Put return a non-nil error, or
+       //   - Trash does not trash the block, or
+       //   - Both of the above.
+       //
+       // If it is possible for the storage device to be accessed by
+       // a different process or host, the synchronization mechanism
+       // should also guard against races with other processes and
+       // hosts. If such a mechanism is not available, there must be
+       // a mechanism for detecting unsafe configurations, alerting
+       // the operator, and aborting or falling back to a read-only
+       // state. In other words, running multiple keepstore processes
+       // with the same underlying storage device must either work
+       // reliably or fail outright.
+       //
+       // Corollary: A successful Touch or Put guarantees a block
+       // will not be trashed for at least BlobSigningTTL seconds.
+       Trash(loc string) error
+
+       // Untrash moves block from trash back into store
+       Untrash(loc string) error
+
+       // Status returns a *VolumeStatus representing the current
+       // in-use and available storage capacity and an
+       // implementation-specific volume identifier (e.g., "mount
+       // point" for a UnixVolume).
        Status() *VolumeStatus
+
+       // String returns an identifying label for this volume,
+       // suitable for including in log messages. It should contain
+       // enough information to uniquely identify the underlying
+       // storage device, but should not contain any credentials or
+       // secrets.
        String() string
+
+       // EmptyTrash looks for trashed blocks that exceeded
+       // BlobTrashLifetime and deletes them from the volume.
+       EmptyTrash()
+
+       // Return a globally unique ID of the underlying storage
+       // device if possible, otherwise "".
+       GetDeviceID() string
 }
 
-// MockVolumes are Volumes used to test the Keep front end.
-//
-// If the Bad field is true, this volume should return an error
-// on all writes and puts.
-//
-type MockVolume struct {
-       Store      map[string][]byte
-       Timestamps map[string]time.Time
-       Bad        bool
+// A VolumeWithExamples provides example configs to display in the
+// -help message.
+type VolumeWithExamples interface {
+       Volume
+       Examples() []Volume
 }
 
-func CreateMockVolume() *MockVolume {
-       return &MockVolume{
-               make(map[string][]byte),
-               make(map[string]time.Time),
-               false,
-       }
+// A VolumeManager tells callers which volumes can read, which volumes
+// can write, and on which volume the next write should be attempted.
+type VolumeManager interface {
+       // Mounts returns all mounts (volume attachments).
+       Mounts() []*VolumeMount
+
+       // Lookup returns the mount with the given UUID. Returns nil
+       // if the mount does not exist. If write==true, returns nil if
+       // the mount is not writable.
+       Lookup(uuid string, write bool) *VolumeMount
+
+       // AllReadable returns all mounts.
+       AllReadable() []*VolumeMount
+
+       // AllWritable returns all mounts that aren't known to be in
+       // a read-only state. (There is no guarantee that a write to
+       // one will succeed, though.)
+       AllWritable() []*VolumeMount
+
+       // NextWritable returns the volume where the next new block
+       // should be written. A VolumeManager can select a volume in
+       // order to distribute activity across spindles, fill up disks
+       // with more free space, etc.
+       NextWritable() *VolumeMount
+
+       // VolumeStats returns the ioStats used for tracking stats for
+       // the given Volume.
+       VolumeStats(Volume) *ioStats
+
+       // Close shuts down the volume manager cleanly.
+       Close()
 }
 
-func (v *MockVolume) Get(loc string) ([]byte, error) {
-       if v.Bad {
-               return nil, errors.New("Bad volume")
-       } else if block, ok := v.Store[loc]; ok {
-               return block, nil
-       }
-       return nil, os.ErrNotExist
+// A VolumeMount is an attachment of a Volume to a VolumeManager.
+type VolumeMount struct {
+       arvados.KeepMount
+       Volume
 }
 
-func (v *MockVolume) Put(loc string, block []byte) error {
-       if v.Bad {
-               return errors.New("Bad volume")
+// Generate a UUID the way API server would for a "KeepVolumeMount"
+// object.
+func (*VolumeMount) generateUUID() string {
+       var max big.Int
+       _, ok := max.SetString("zzzzzzzzzzzzzzz", 36)
+       if !ok {
+               panic("big.Int parse failed")
+       }
+       r, err := rand.Int(rand.Reader, &max)
+       if err != nil {
+               panic(err)
        }
-       v.Store[loc] = block
-       return v.Touch(loc)
+       return fmt.Sprintf("zzzzz-ivpuk-%015s", r.Text(36))
 }
 
-func (v *MockVolume) Touch(loc string) error {
-       if v.Bad {
-               return errors.New("Bad volume")
-       }
-       v.Timestamps[loc] = time.Now()
-       return nil
+// RRVolumeManager is a round-robin VolumeManager: the Nth call to
+// NextWritable returns the (N % len(writables))th writable Volume
+// (where writables are all Volumes v where v.Writable()==true).
+type RRVolumeManager struct {
+       mounts    []*VolumeMount
+       mountMap  map[string]*VolumeMount
+       readables []*VolumeMount
+       writables []*VolumeMount
+       counter   uint32
+       iostats   map[Volume]*ioStats
 }
 
-func (v *MockVolume) Mtime(loc string) (time.Time, error) {
-       var mtime time.Time
-       var err error
-       if v.Bad {
-               err = errors.New("Bad volume")
-       } else if t, ok := v.Timestamps[loc]; ok {
-               mtime = t
-       } else {
-               err = os.ErrNotExist
+func makeRRVolumeManager(logger logrus.FieldLogger, cluster *arvados.Cluster, myURL arvados.URL, metrics *volumeMetricsVecs) (*RRVolumeManager, error) {
+       vm := &RRVolumeManager{
+               iostats: make(map[Volume]*ioStats),
        }
-       return mtime, err
-}
+       vm.mountMap = make(map[string]*VolumeMount)
+       for uuid, cfgvol := range cluster.Volumes {
+               va, ok := cfgvol.AccessViaHosts[myURL]
+               if !ok && len(cfgvol.AccessViaHosts) > 0 {
+                       continue
+               }
+               dri, ok := driver[cfgvol.Driver]
+               if !ok {
+                       return nil, fmt.Errorf("volume %s: invalid driver %q", uuid, cfgvol.Driver)
+               }
+               vol, err := dri(cluster, cfgvol, logger, metrics)
+               if err != nil {
+                       return nil, fmt.Errorf("error initializing volume %s: %s", uuid, err)
+               }
+               logger.Printf("started volume %s (%s), ReadOnly=%v", uuid, vol, cfgvol.ReadOnly || va.ReadOnly)
 
-func (v *MockVolume) Index(prefix string) string {
-       var result string
-       for loc, block := range v.Store {
-               if IsValidLocator(loc) && strings.HasPrefix(loc, prefix) {
-                       result = result + fmt.Sprintf("%s+%d %d\n",
-                               loc, len(block), 123456789)
+               sc := cfgvol.StorageClasses
+               if len(sc) == 0 {
+                       sc = map[string]bool{"default": true}
+               }
+               repl := cfgvol.Replication
+               if repl < 1 {
+                       repl = 1
+               }
+               mnt := &VolumeMount{
+                       KeepMount: arvados.KeepMount{
+                               UUID:           uuid,
+                               DeviceID:       vol.GetDeviceID(),
+                               ReadOnly:       cfgvol.ReadOnly || va.ReadOnly,
+                               Replication:    repl,
+                               StorageClasses: sc,
+                       },
+                       Volume: vol,
+               }
+               vm.iostats[vol] = &ioStats{}
+               vm.mounts = append(vm.mounts, mnt)
+               vm.mountMap[uuid] = mnt
+               vm.readables = append(vm.readables, mnt)
+               if !mnt.KeepMount.ReadOnly {
+                       vm.writables = append(vm.writables, mnt)
                }
        }
-       return result
+       return vm, nil
 }
 
-func (v *MockVolume) Delete(loc string) error {
-       if _, ok := v.Store[loc]; ok {
-               if time.Since(v.Timestamps[loc]) < permission_ttl {
-                       return nil
-               }
-               delete(v.Store, loc)
-               return nil
-       }
-       return os.ErrNotExist
+func (vm *RRVolumeManager) Mounts() []*VolumeMount {
+       return vm.mounts
 }
 
-func (v *MockVolume) Status() *VolumeStatus {
-       var used uint64
-       for _, block := range v.Store {
-               used = used + uint64(len(block))
+func (vm *RRVolumeManager) Lookup(uuid string, needWrite bool) *VolumeMount {
+       if mnt, ok := vm.mountMap[uuid]; ok && (!needWrite || !mnt.ReadOnly) {
+               return mnt
        }
-       return &VolumeStatus{"/bogo", 123, 1000000 - used, used}
+       return nil
 }
 
-func (v *MockVolume) String() string {
-       return "[MockVolume]"
+// AllReadable returns an array of all readable volumes
+func (vm *RRVolumeManager) AllReadable() []*VolumeMount {
+       return vm.readables
 }
 
-// A VolumeManager manages a collection of volumes.
-//
-// - Volumes is a slice of available Volumes.
-// - Choose() returns a Volume suitable for writing to.
-// - Quit() instructs the VolumeManager to shut down gracefully.
-//
-type VolumeManager interface {
-       Volumes() []Volume
-       Choose() Volume
-       Quit()
+// AllWritable returns an array of all writable volumes
+func (vm *RRVolumeManager) AllWritable() []*VolumeMount {
+       return vm.writables
 }
 
-type RRVolumeManager struct {
-       volumes   []Volume
-       nextwrite chan Volume
-       quit      chan int
-}
-
-func MakeRRVolumeManager(vols []Volume) *RRVolumeManager {
-       // Create a new VolumeManager struct with the specified volumes,
-       // and with new Nextwrite and Quit channels.
-       // The Quit channel is buffered with a capacity of 1 so that
-       // another routine may write to it without blocking.
-       vm := &RRVolumeManager{vols, make(chan Volume), make(chan int, 1)}
-
-       // This goroutine implements round-robin volume selection.
-       // It sends each available Volume in turn to the Nextwrite
-       // channel, until receiving a notification on the Quit channel
-       // that it should terminate.
-       go func() {
-               var i int = 0
-               for {
-                       select {
-                       case <-vm.quit:
-                               return
-                       case vm.nextwrite <- vm.volumes[i]:
-                               i = (i + 1) % len(vm.volumes)
-                       }
-               }
-       }()
+// NextWritable returns the next writable
+func (vm *RRVolumeManager) NextWritable() *VolumeMount {
+       if len(vm.writables) == 0 {
+               return nil
+       }
+       i := atomic.AddUint32(&vm.counter, 1)
+       return vm.writables[i%uint32(len(vm.writables))]
+}
+
+// VolumeStats returns an ioStats for the given volume.
+func (vm *RRVolumeManager) VolumeStats(v Volume) *ioStats {
+       return vm.iostats[v]
+}
 
-       return vm
+// Close the RRVolumeManager
+func (vm *RRVolumeManager) Close() {
 }
 
-func (vm *RRVolumeManager) Volumes() []Volume {
-       return vm.volumes
+// VolumeStatus describes the current condition of a volume
+type VolumeStatus struct {
+       MountPoint string
+       DeviceNum  uint64
+       BytesFree  uint64
+       BytesUsed  uint64
 }
 
-func (vm *RRVolumeManager) Choose() Volume {
-       return <-vm.nextwrite
+// ioStats tracks I/O statistics for a volume or server
+type ioStats struct {
+       Errors     uint64
+       Ops        uint64
+       CompareOps uint64
+       GetOps     uint64
+       PutOps     uint64
+       TouchOps   uint64
+       InBytes    uint64
+       OutBytes   uint64
 }
 
-func (vm *RRVolumeManager) Quit() {
-       vm.quit <- 1
+type InternalStatser interface {
+       InternalStats() interface{}
 }