Merge branch '8784-dir-listings'
[arvados.git] / services / keepstore / volume.go
index 58710c04b269a57af236fbb36f5a6aaa61d9b256..69802abdd1b5c4e22422293331d6cb0eec371896 100644 (file)
@@ -1,26 +1,52 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
 package main
 
 import (
+       "context"
+       "crypto/rand"
+       "fmt"
        "io"
+       "math/big"
        "sync/atomic"
        "time"
 )
 
+type BlockWriter interface {
+       // WriteBlock reads all data from r, writes it to a backing
+       // store as "loc", and returns the number of bytes written.
+       WriteBlock(ctx context.Context, loc string, r io.Reader) error
+}
+
+type BlockReader interface {
+       // ReadBlock retrieves data previously stored as "loc" and
+       // writes it to w.
+       ReadBlock(ctx context.Context, loc string, w io.Writer) error
+}
+
 // A Volume is an interface representing a Keep back-end storage unit:
 // for example, a single mounted disk, a RAID array, an Amazon S3 volume,
 // etc.
 type Volume interface {
-       // Get a block. IFF the returned error is nil, the caller must
-       // put the returned slice back into the buffer pool when it's
-       // finished with it. (Otherwise, the buffer pool will be
-       // depleted and eventually -- when all available buffers are
-       // used and not returned -- operations will reach deadlock.)
+       // Volume type as specified in config file. Examples: "S3",
+       // "Directory".
+       Type() string
+
+       // Do whatever private setup tasks and configuration checks
+       // are needed. Return non-nil if the volume is unusable (e.g.,
+       // invalid config).
+       Start() error
+
+       // Get a block: copy the block data into buf, and return the
+       // number of bytes copied.
        //
        // loc is guaranteed to consist of 32 or more lowercase hex
        // digits.
        //
-       // Get should not verify the integrity of the returned data:
-       // it should just return whatever was found in its backing
+       // Get should not verify the integrity of the data: it should
+       // just return whatever was found in its backing
        // store. (Integrity checking is the caller's responsibility.)
        //
        // If an error is encountered that prevents it from
@@ -36,17 +62,19 @@ type Volume interface {
        // access log if the block is not found on any other volumes
        // either).
        //
-       // If the data in the backing store is bigger than BlockSize,
-       // Get is permitted to return an error without reading any of
-       // the data.
-       Get(loc string) ([]byte, error)
+       // If the data in the backing store is bigger than len(buf),
+       // then Get is permitted to return an error without reading
+       // any of the data.
+       //
+       // len(buf) will not exceed BlockSize.
+       Get(ctx context.Context, loc string, buf []byte) (int, error)
 
        // Compare the given data with the stored data (i.e., what Get
        // would return). If equal, return nil. If not, return
        // CollisionError or DiskHashError (depending on whether the
        // data on disk matches the expected hash), or whatever error
        // was encountered opening/reading the stored data.
-       Compare(loc string, data []byte) error
+       Compare(ctx context.Context, loc string, data []byte) error
 
        // Put writes a block to an underlying storage device.
        //
@@ -76,7 +104,7 @@ type Volume interface {
        //
        // Put should not verify that loc==hash(block): this is the
        // caller's responsibility.
-       Put(loc string, block []byte) error
+       Put(ctx context.Context, loc string, block []byte) error
 
        // Touch sets the timestamp for the given locator to the
        // current time.
@@ -151,7 +179,7 @@ type Volume interface {
        // loc is as described in Get.
        //
        // If the timestamp for the given locator is newer than
-       // blobSignatureTTL, Trash must not trash the data.
+       // BlobSignatureTTL, Trash must not trash the data.
        //
        // If a Trash operation overlaps with any Touch or Put
        // operations on the same locator, the implementation must
@@ -172,7 +200,7 @@ type Volume interface {
        // reliably or fail outright.
        //
        // Corollary: A successful Touch or Put guarantees a block
-       // will not be trashed for at least blobSignatureTTL
+       // will not be trashed for at least BlobSignatureTTL
        // seconds.
        Trash(loc string) error
 
@@ -204,11 +232,34 @@ type Volume interface {
        // underlying device. It will be passed on to clients in
        // responses to PUT requests.
        Replication() int
+
+       // EmptyTrash looks for trashed blocks that exceeded TrashLifetime
+       // and deletes them from the volume.
+       EmptyTrash()
+
+       // Return a globally unique ID of the underlying storage
+       // device if possible, otherwise "".
+       DeviceID() string
+}
+
+// A VolumeWithExamples provides example configs to display in the
+// -help message.
+type VolumeWithExamples interface {
+       Volume
+       Examples() []Volume
 }
 
 // A VolumeManager tells callers which volumes can read, which volumes
 // can write, and on which volume the next write should be attempted.
 type VolumeManager interface {
+       // Mounts returns all mounts (volume attachments).
+       Mounts() []*VolumeMount
+
+       // Lookup returns the volume under the given mount
+       // UUID. Returns nil if the mount does not exist. If
+       // write==true, returns nil if the volume is not writable.
+       Lookup(uuid string, write bool) Volume
+
        // AllReadable returns all volumes.
        AllReadable() []Volume
 
@@ -223,23 +274,69 @@ type VolumeManager interface {
        // with more free space, etc.
        NextWritable() Volume
 
+       // VolumeStats returns the ioStats used for tracking stats for
+       // the given Volume.
+       VolumeStats(Volume) *ioStats
+
        // Close shuts down the volume manager cleanly.
        Close()
 }
 
+// A VolumeMount is an attachment of a Volume to a VolumeManager.
+type VolumeMount struct {
+       UUID        string
+       DeviceID    string
+       ReadOnly    bool
+       Replication int
+       Tier        int
+       volume      Volume
+}
+
+// Generate a UUID the way API server would for a "KeepVolumeMount"
+// object.
+func (*VolumeMount) generateUUID() string {
+       var max big.Int
+       _, ok := max.SetString("zzzzzzzzzzzzzzz", 36)
+       if !ok {
+               panic("big.Int parse failed")
+       }
+       r, err := rand.Int(rand.Reader, &max)
+       if err != nil {
+               panic(err)
+       }
+       return fmt.Sprintf("zzzzz-ivpuk-%015s", r.Text(36))
+}
+
 // RRVolumeManager is a round-robin VolumeManager: the Nth call to
 // NextWritable returns the (N % len(writables))th writable Volume
 // (where writables are all Volumes v where v.Writable()==true).
 type RRVolumeManager struct {
+       mounts    []*VolumeMount
+       mountMap  map[string]*VolumeMount
        readables []Volume
        writables []Volume
        counter   uint32
+       iostats   map[Volume]*ioStats
 }
 
 // MakeRRVolumeManager initializes RRVolumeManager
 func MakeRRVolumeManager(volumes []Volume) *RRVolumeManager {
-       vm := &RRVolumeManager{}
+       vm := &RRVolumeManager{
+               iostats: make(map[Volume]*ioStats),
+       }
+       vm.mountMap = make(map[string]*VolumeMount)
        for _, v := range volumes {
+               mnt := &VolumeMount{
+                       UUID:        (*VolumeMount)(nil).generateUUID(),
+                       DeviceID:    v.DeviceID(),
+                       ReadOnly:    !v.Writable(),
+                       Replication: v.Replication(),
+                       Tier:        1,
+                       volume:      v,
+               }
+               vm.iostats[v] = &ioStats{}
+               vm.mounts = append(vm.mounts, mnt)
+               vm.mountMap[mnt.UUID] = mnt
                vm.readables = append(vm.readables, v)
                if v.Writable() {
                        vm.writables = append(vm.writables, v)
@@ -248,6 +345,18 @@ func MakeRRVolumeManager(volumes []Volume) *RRVolumeManager {
        return vm
 }
 
+func (vm *RRVolumeManager) Mounts() []*VolumeMount {
+       return vm.mounts
+}
+
+func (vm *RRVolumeManager) Lookup(uuid string, needWrite bool) Volume {
+       if mnt, ok := vm.mountMap[uuid]; ok && (!needWrite || !mnt.ReadOnly) {
+               return mnt.volume
+       } else {
+               return nil
+       }
+}
+
 // AllReadable returns an array of all readable volumes
 func (vm *RRVolumeManager) AllReadable() []Volume {
        return vm.readables
@@ -267,18 +376,35 @@ func (vm *RRVolumeManager) NextWritable() Volume {
        return vm.writables[i%uint32(len(vm.writables))]
 }
 
+// VolumeStats returns an ioStats for the given volume.
+func (vm *RRVolumeManager) VolumeStats(v Volume) *ioStats {
+       return vm.iostats[v]
+}
+
 // Close the RRVolumeManager
 func (vm *RRVolumeManager) Close() {
 }
 
-// VolumeStatus provides status information of the volume consisting of:
-//   * mount_point
-//   * device_num (an integer identifying the underlying storage system)
-//   * bytes_free
-//   * bytes_used
+// VolumeStatus describes the current condition of a volume
 type VolumeStatus struct {
-       MountPoint string `json:"mount_point"`
-       DeviceNum  uint64 `json:"device_num"`
-       BytesFree  uint64 `json:"bytes_free"`
-       BytesUsed  uint64 `json:"bytes_used"`
+       MountPoint string
+       DeviceNum  uint64
+       BytesFree  uint64
+       BytesUsed  uint64
+}
+
+// ioStats tracks I/O statistics for a volume or server
+type ioStats struct {
+       Errors     uint64
+       Ops        uint64
+       CompareOps uint64
+       GetOps     uint64
+       PutOps     uint64
+       TouchOps   uint64
+       InBytes    uint64
+       OutBytes   uint64
+}
+
+type InternalStatser interface {
+       InternalStats() interface{}
 }