15521: Fix truncated index response.
[arvados.git] / services / keepstore / volume.go
index 1b5294952820b5cb2f0be2d53c37eb84f2cefed5..1dea6194d51c88b195f6155392b233b879cbeee8 100644 (file)
-// A Volume is an interface representing a Keep back-end storage unit:
-// for example, a single mounted disk, a RAID array, an Amazon S3 volume,
-// etc.
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
 
 package main
 
 import (
+       "context"
+       "crypto/rand"
+       "fmt"
        "io"
+       "math/big"
        "sync/atomic"
        "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "github.com/sirupsen/logrus"
 )
 
+type BlockWriter interface {
+       // WriteBlock reads all data from r, writes it to a backing
+       // store as "loc", and returns the number of bytes written.
+       WriteBlock(ctx context.Context, loc string, r io.Reader) error
+}
+
+type BlockReader interface {
+       // ReadBlock retrieves data previously stored as "loc" and
+       // writes it to w.
+       ReadBlock(ctx context.Context, loc string, w io.Writer) error
+}
+
+var driver = map[string]func(*arvados.Cluster, arvados.Volume, logrus.FieldLogger, *volumeMetricsVecs) (Volume, error){}
+
+// A Volume is an interface representing a Keep back-end storage unit:
+// for example, a single mounted disk, a RAID array, an Amazon S3 volume,
+// etc.
 type Volume interface {
-       Get(loc string) ([]byte, error)
-       Put(loc string, block []byte) error
+       // Get a block: copy the block data into buf, and return the
+       // number of bytes copied.
+       //
+       // loc is guaranteed to consist of 32 or more lowercase hex
+       // digits.
+       //
+       // Get should not verify the integrity of the data: it should
+       // just return whatever was found in its backing
+       // store. (Integrity checking is the caller's responsibility.)
+       //
+       // If an error is encountered that prevents it from
+       // retrieving the data, that error should be returned so the
+       // caller can log (and send to the client) a more useful
+       // message.
+       //
+       // If the error is "not found", and there's no particular
+       // reason to expect the block to be found (other than that a
+       // caller is asking for it), the returned error should satisfy
+       // os.IsNotExist(err): this is a normal condition and will not
+       // be logged as an error (except that a 404 will appear in the
+       // access log if the block is not found on any other volumes
+       // either).
+       //
+       // If the data in the backing store is bigger than len(buf),
+       // then Get is permitted to return an error without reading
+       // any of the data.
+       //
+       // len(buf) will not exceed BlockSize.
+       Get(ctx context.Context, loc string, buf []byte) (int, error)
+
+       // Compare the given data with the stored data (i.e., what Get
+       // would return). If equal, return nil. If not, return
+       // CollisionError or DiskHashError (depending on whether the
+       // data on disk matches the expected hash), or whatever error
+       // was encountered opening/reading the stored data.
+       Compare(ctx context.Context, loc string, data []byte) error
+
+       // Put writes a block to an underlying storage device.
+       //
+       // loc is as described in Get.
+       //
+       // len(block) is guaranteed to be between 0 and BlockSize.
+       //
+       // If a block is already stored under the same name (loc) with
+       // different content, Put must either overwrite the existing
+       // data with the new data or return a non-nil error. When
+       // overwriting existing data, it must never leave the storage
+       // device in an inconsistent state: a subsequent call to Get
+       // must return either the entire old block, the entire new
+       // block, or an error. (An implementation that cannot peform
+       // atomic updates must leave the old data alone and return an
+       // error.)
+       //
+       // Put also sets the timestamp for the given locator to the
+       // current time.
+       //
+       // Put must return a non-nil error unless it can guarantee
+       // that the entire block has been written and flushed to
+       // persistent storage, and that its timestamp is current. Of
+       // course, this guarantee is only as good as the underlying
+       // storage device, but it is Put's responsibility to at least
+       // get whatever guarantee is offered by the storage device.
+       //
+       // Put should not verify that loc==hash(block): this is the
+       // caller's responsibility.
+       Put(ctx context.Context, loc string, block []byte) error
+
+       // Touch sets the timestamp for the given locator to the
+       // current time.
+       //
+       // loc is as described in Get.
+       //
+       // If invoked at time t0, Touch must guarantee that a
+       // subsequent call to Mtime will return a timestamp no older
+       // than {t0 minus one second}. For example, if Touch is called
+       // at 2015-07-07T01:23:45.67890123Z, it is acceptable for a
+       // subsequent Mtime to return any of the following:
+       //
+       //   - 2015-07-07T01:23:45.00000000Z
+       //   - 2015-07-07T01:23:45.67890123Z
+       //   - 2015-07-07T01:23:46.67890123Z
+       //   - 2015-07-08T00:00:00.00000000Z
+       //
+       // It is not acceptable for a subsequente Mtime to return
+       // either of the following:
+       //
+       //   - 2015-07-07T00:00:00.00000000Z -- ERROR
+       //   - 2015-07-07T01:23:44.00000000Z -- ERROR
+       //
+       // Touch must return a non-nil error if the timestamp cannot
+       // be updated.
        Touch(loc string) error
+
+       // Mtime returns the stored timestamp for the given locator.
+       //
+       // loc is as described in Get.
+       //
+       // Mtime must return a non-nil error if the given block is not
+       // found or the timestamp could not be retrieved.
        Mtime(loc string) (time.Time, error)
+
+       // IndexTo writes a complete list of locators with the given
+       // prefix for which Get() can retrieve data.
+       //
+       // prefix consists of zero or more lowercase hexadecimal
+       // digits.
+       //
+       // Each locator must be written to the given writer using the
+       // following format:
+       //
+       //   loc "+" size " " timestamp "\n"
+       //
+       // where:
+       //
+       //   - size is the number of bytes of content, given as a
+       //     decimal number with one or more digits
+       //
+       //   - timestamp is the timestamp stored for the locator,
+       //     given as a decimal number of seconds after January 1,
+       //     1970 UTC.
+       //
+       // IndexTo must not write any other data to writer: for
+       // example, it must not write any blank lines.
+       //
+       // If an error makes it impossible to provide a complete
+       // index, IndexTo must return a non-nil error. It is
+       // acceptable to return a non-nil error after writing a
+       // partial index to writer.
+       //
+       // The resulting index is not expected to be sorted in any
+       // particular order.
        IndexTo(prefix string, writer io.Writer) error
-       Delete(loc string) error
+
+       // Trash moves the block data from the underlying storage
+       // device to trash area. The block then stays in trash for
+       // BlobTrashLifetime before it is actually deleted.
+       //
+       // loc is as described in Get.
+       //
+       // If the timestamp for the given locator is newer than
+       // BlobSigningTTL, Trash must not trash the data.
+       //
+       // If a Trash operation overlaps with any Touch or Put
+       // operations on the same locator, the implementation must
+       // ensure one of the following outcomes:
+       //
+       //   - Touch and Put return a non-nil error, or
+       //   - Trash does not trash the block, or
+       //   - Both of the above.
+       //
+       // If it is possible for the storage device to be accessed by
+       // a different process or host, the synchronization mechanism
+       // should also guard against races with other processes and
+       // hosts. If such a mechanism is not available, there must be
+       // a mechanism for detecting unsafe configurations, alerting
+       // the operator, and aborting or falling back to a read-only
+       // state. In other words, running multiple keepstore processes
+       // with the same underlying storage device must either work
+       // reliably or fail outright.
+       //
+       // Corollary: A successful Touch or Put guarantees a block
+       // will not be trashed for at least BlobSigningTTL seconds.
+       Trash(loc string) error
+
+       // Untrash moves block from trash back into store
+       Untrash(loc string) error
+
+       // Status returns a *VolumeStatus representing the current
+       // in-use and available storage capacity and an
+       // implementation-specific volume identifier (e.g., "mount
+       // point" for a UnixVolume).
        Status() *VolumeStatus
+
+       // String returns an identifying label for this volume,
+       // suitable for including in log messages. It should contain
+       // enough information to uniquely identify the underlying
+       // storage device, but should not contain any credentials or
+       // secrets.
        String() string
-       Writable() bool
+
+       // EmptyTrash looks for trashed blocks that exceeded
+       // BlobTrashLifetime and deletes them from the volume.
+       EmptyTrash()
+
+       // Return a globally unique ID of the underlying storage
+       // device if possible, otherwise "".
+       GetDeviceID() string
+}
+
+// A VolumeWithExamples provides example configs to display in the
+// -help message.
+type VolumeWithExamples interface {
+       Volume
+       Examples() []Volume
 }
 
 // A VolumeManager tells callers which volumes can read, which volumes
 // can write, and on which volume the next write should be attempted.
 type VolumeManager interface {
-       // AllReadable returns all volumes.
-       AllReadable() []Volume
-       // AllWritable returns all volumes that aren't known to be in
+       // Mounts returns all mounts (volume attachments).
+       Mounts() []*VolumeMount
+
+       // Lookup returns the mount with the given UUID. Returns nil
+       // if the mount does not exist. If write==true, returns nil if
+       // the mount is not writable.
+       Lookup(uuid string, write bool) *VolumeMount
+
+       // AllReadable returns all mounts.
+       AllReadable() []*VolumeMount
+
+       // AllWritable returns all mounts that aren't known to be in
        // a read-only state. (There is no guarantee that a write to
        // one will succeed, though.)
-       AllWritable() []Volume
+       AllWritable() []*VolumeMount
+
        // NextWritable returns the volume where the next new block
        // should be written. A VolumeManager can select a volume in
        // order to distribute activity across spindles, fill up disks
        // with more free space, etc.
-       NextWritable() Volume
+       NextWritable() *VolumeMount
+
+       // VolumeStats returns the ioStats used for tracking stats for
+       // the given Volume.
+       VolumeStats(Volume) *ioStats
+
        // Close shuts down the volume manager cleanly.
        Close()
 }
 
+// A VolumeMount is an attachment of a Volume to a VolumeManager.
+type VolumeMount struct {
+       arvados.KeepMount
+       Volume
+}
+
+// Generate a UUID the way API server would for a "KeepVolumeMount"
+// object.
+func (*VolumeMount) generateUUID() string {
+       var max big.Int
+       _, ok := max.SetString("zzzzzzzzzzzzzzz", 36)
+       if !ok {
+               panic("big.Int parse failed")
+       }
+       r, err := rand.Int(rand.Reader, &max)
+       if err != nil {
+               panic(err)
+       }
+       return fmt.Sprintf("zzzzz-ivpuk-%015s", r.Text(36))
+}
+
+// RRVolumeManager is a round-robin VolumeManager: the Nth call to
+// NextWritable returns the (N % len(writables))th writable Volume
+// (where writables are all Volumes v where v.Writable()==true).
 type RRVolumeManager struct {
-       readables []Volume
-       writables []Volume
+       mounts    []*VolumeMount
+       mountMap  map[string]*VolumeMount
+       readables []*VolumeMount
+       writables []*VolumeMount
        counter   uint32
+       iostats   map[Volume]*ioStats
 }
 
-func MakeRRVolumeManager(volumes []Volume) *RRVolumeManager {
-       vm := &RRVolumeManager{}
-       for _, v := range volumes {
-               vm.readables = append(vm.readables, v)
-               if v.Writable() {
-                       vm.writables = append(vm.writables, v)
+func makeRRVolumeManager(logger logrus.FieldLogger, cluster *arvados.Cluster, myURL arvados.URL, metrics *volumeMetricsVecs) (*RRVolumeManager, error) {
+       vm := &RRVolumeManager{
+               iostats: make(map[Volume]*ioStats),
+       }
+       vm.mountMap = make(map[string]*VolumeMount)
+       for uuid, cfgvol := range cluster.Volumes {
+               va, ok := cfgvol.AccessViaHosts[myURL]
+               if !ok && len(cfgvol.AccessViaHosts) > 0 {
+                       continue
+               }
+               dri, ok := driver[cfgvol.Driver]
+               if !ok {
+                       return nil, fmt.Errorf("volume %s: invalid driver %q", uuid, cfgvol.Driver)
+               }
+               vol, err := dri(cluster, cfgvol, logger, metrics)
+               if err != nil {
+                       return nil, fmt.Errorf("error initializing volume %s: %s", uuid, err)
+               }
+               logger.Printf("started volume %s (%s), ReadOnly=%v", uuid, vol, cfgvol.ReadOnly)
+
+               sc := cfgvol.StorageClasses
+               if len(sc) == 0 {
+                       sc = map[string]bool{"default": true}
+               }
+               repl := cfgvol.Replication
+               if repl < 1 {
+                       repl = 1
+               }
+               mnt := &VolumeMount{
+                       KeepMount: arvados.KeepMount{
+                               UUID:           uuid,
+                               DeviceID:       vol.GetDeviceID(),
+                               ReadOnly:       cfgvol.ReadOnly || va.ReadOnly,
+                               Replication:    repl,
+                               StorageClasses: sc,
+                       },
+                       Volume: vol,
+               }
+               vm.iostats[vol] = &ioStats{}
+               vm.mounts = append(vm.mounts, mnt)
+               vm.mountMap[uuid] = mnt
+               vm.readables = append(vm.readables, mnt)
+               if !mnt.KeepMount.ReadOnly {
+                       vm.writables = append(vm.writables, mnt)
                }
        }
-       return vm
+       return vm, nil
 }
 
-func (vm *RRVolumeManager) AllReadable() []Volume {
+func (vm *RRVolumeManager) Mounts() []*VolumeMount {
+       return vm.mounts
+}
+
+func (vm *RRVolumeManager) Lookup(uuid string, needWrite bool) *VolumeMount {
+       if mnt, ok := vm.mountMap[uuid]; ok && (!needWrite || !mnt.ReadOnly) {
+               return mnt
+       } else {
+               return nil
+       }
+}
+
+// AllReadable returns an array of all readable volumes
+func (vm *RRVolumeManager) AllReadable() []*VolumeMount {
        return vm.readables
 }
 
-func (vm *RRVolumeManager) AllWritable() []Volume {
+// AllWritable returns an array of all writable volumes
+func (vm *RRVolumeManager) AllWritable() []*VolumeMount {
        return vm.writables
 }
 
-func (vm *RRVolumeManager) NextWritable() Volume {
+// NextWritable returns the next writable
+func (vm *RRVolumeManager) NextWritable() *VolumeMount {
        if len(vm.writables) == 0 {
                return nil
        }
@@ -73,5 +377,35 @@ func (vm *RRVolumeManager) NextWritable() Volume {
        return vm.writables[i%uint32(len(vm.writables))]
 }
 
+// VolumeStats returns an ioStats for the given volume.
+func (vm *RRVolumeManager) VolumeStats(v Volume) *ioStats {
+       return vm.iostats[v]
+}
+
+// Close the RRVolumeManager
 func (vm *RRVolumeManager) Close() {
 }
+
+// VolumeStatus describes the current condition of a volume
+type VolumeStatus struct {
+       MountPoint string
+       DeviceNum  uint64
+       BytesFree  uint64
+       BytesUsed  uint64
+}
+
+// ioStats tracks I/O statistics for a volume or server
+type ioStats struct {
+       Errors     uint64
+       Ops        uint64
+       CompareOps uint64
+       GetOps     uint64
+       PutOps     uint64
+       TouchOps   uint64
+       InBytes    uint64
+       OutBytes   uint64
+}
+
+type InternalStatser interface {
+       InternalStats() interface{}
+}