2960: Refactor keepstore into a streaming server.
[arvados.git] / services / keepstore / volume.go
index f597ff578106544c54763c9847a3190b53154130..41a0eba86f5c281b265e0a605ab4f34e7ff20a5d 100644 (file)
@@ -6,426 +6,35 @@ package keepstore
 
 import (
        "context"
-       "crypto/rand"
-       "fmt"
        "io"
-       "math/big"
-       "sort"
-       "sync/atomic"
        "time"
 
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "github.com/sirupsen/logrus"
 )
 
-type BlockWriter interface {
-       // WriteBlock reads all data from r, writes it to a backing
-       // store as "loc", and returns the number of bytes written.
-       WriteBlock(ctx context.Context, loc string, r io.Reader) error
-}
-
-type BlockReader interface {
-       // ReadBlock retrieves data previously stored as "loc" and
-       // writes it to w.
-       ReadBlock(ctx context.Context, loc string, w io.Writer) error
-}
-
-var driver = map[string]func(*arvados.Cluster, arvados.Volume, logrus.FieldLogger, *volumeMetricsVecs) (Volume, error){}
-
-// A Volume is an interface representing a Keep back-end storage unit:
-// for example, a single mounted disk, a RAID array, an Amazon S3 volume,
-// etc.
-type Volume interface {
-       // Get a block: copy the block data into buf, and return the
-       // number of bytes copied.
-       //
-       // loc is guaranteed to consist of 32 or more lowercase hex
-       // digits.
-       //
-       // Get should not verify the integrity of the data: it should
-       // just return whatever was found in its backing
-       // store. (Integrity checking is the caller's responsibility.)
-       //
-       // If an error is encountered that prevents it from
-       // retrieving the data, that error should be returned so the
-       // caller can log (and send to the client) a more useful
-       // message.
-       //
-       // If the error is "not found", and there's no particular
-       // reason to expect the block to be found (other than that a
-       // caller is asking for it), the returned error should satisfy
-       // os.IsNotExist(err): this is a normal condition and will not
-       // be logged as an error (except that a 404 will appear in the
-       // access log if the block is not found on any other volumes
-       // either).
-       //
-       // If the data in the backing store is bigger than len(buf),
-       // then Get is permitted to return an error without reading
-       // any of the data.
-       //
-       // len(buf) will not exceed BlockSize.
-       Get(ctx context.Context, loc string, buf []byte) (int, error)
-
-       // Compare the given data with the stored data (i.e., what Get
-       // would return). If equal, return nil. If not, return
-       // CollisionError or DiskHashError (depending on whether the
-       // data on disk matches the expected hash), or whatever error
-       // was encountered opening/reading the stored data.
-       Compare(ctx context.Context, loc string, data []byte) error
-
-       // Put writes a block to an underlying storage device.
-       //
-       // loc is as described in Get.
-       //
-       // len(block) is guaranteed to be between 0 and BlockSize.
-       //
-       // If a block is already stored under the same name (loc) with
-       // different content, Put must either overwrite the existing
-       // data with the new data or return a non-nil error. When
-       // overwriting existing data, it must never leave the storage
-       // device in an inconsistent state: a subsequent call to Get
-       // must return either the entire old block, the entire new
-       // block, or an error. (An implementation that cannot peform
-       // atomic updates must leave the old data alone and return an
-       // error.)
-       //
-       // Put also sets the timestamp for the given locator to the
-       // current time.
-       //
-       // Put must return a non-nil error unless it can guarantee
-       // that the entire block has been written and flushed to
-       // persistent storage, and that its timestamp is current. Of
-       // course, this guarantee is only as good as the underlying
-       // storage device, but it is Put's responsibility to at least
-       // get whatever guarantee is offered by the storage device.
-       //
-       // Put should not verify that loc==hash(block): this is the
-       // caller's responsibility.
-       Put(ctx context.Context, loc string, block []byte) error
-
-       // Touch sets the timestamp for the given locator to the
-       // current time.
-       //
-       // loc is as described in Get.
-       //
-       // If invoked at time t0, Touch must guarantee that a
-       // subsequent call to Mtime will return a timestamp no older
-       // than {t0 minus one second}. For example, if Touch is called
-       // at 2015-07-07T01:23:45.67890123Z, it is acceptable for a
-       // subsequent Mtime to return any of the following:
-       //
-       //   - 2015-07-07T01:23:45.00000000Z
-       //   - 2015-07-07T01:23:45.67890123Z
-       //   - 2015-07-07T01:23:46.67890123Z
-       //   - 2015-07-08T00:00:00.00000000Z
-       //
-       // It is not acceptable for a subsequente Mtime to return
-       // either of the following:
-       //
-       //   - 2015-07-07T00:00:00.00000000Z -- ERROR
-       //   - 2015-07-07T01:23:44.00000000Z -- ERROR
-       //
-       // Touch must return a non-nil error if the timestamp cannot
-       // be updated.
-       Touch(loc string) error
-
-       // Mtime returns the stored timestamp for the given locator.
-       //
-       // loc is as described in Get.
-       //
-       // Mtime must return a non-nil error if the given block is not
-       // found or the timestamp could not be retrieved.
-       Mtime(loc string) (time.Time, error)
-
-       // IndexTo writes a complete list of locators with the given
-       // prefix for which Get() can retrieve data.
-       //
-       // prefix consists of zero or more lowercase hexadecimal
-       // digits.
-       //
-       // Each locator must be written to the given writer using the
-       // following format:
-       //
-       //   loc "+" size " " timestamp "\n"
-       //
-       // where:
-       //
-       //   - size is the number of bytes of content, given as a
-       //     decimal number with one or more digits
-       //
-       //   - timestamp is the timestamp stored for the locator,
-       //     given as a decimal number of seconds after January 1,
-       //     1970 UTC.
-       //
-       // IndexTo must not write any other data to writer: for
-       // example, it must not write any blank lines.
-       //
-       // If an error makes it impossible to provide a complete
-       // index, IndexTo must return a non-nil error. It is
-       // acceptable to return a non-nil error after writing a
-       // partial index to writer.
-       //
-       // The resulting index is not expected to be sorted in any
-       // particular order.
-       IndexTo(prefix string, writer io.Writer) error
-
-       // Trash moves the block data from the underlying storage
-       // device to trash area. The block then stays in trash for
-       // BlobTrashLifetime before it is actually deleted.
-       //
-       // loc is as described in Get.
-       //
-       // If the timestamp for the given locator is newer than
-       // BlobSigningTTL, Trash must not trash the data.
-       //
-       // If a Trash operation overlaps with any Touch or Put
-       // operations on the same locator, the implementation must
-       // ensure one of the following outcomes:
-       //
-       //   - Touch and Put return a non-nil error, or
-       //   - Trash does not trash the block, or
-       //   - Both of the above.
-       //
-       // If it is possible for the storage device to be accessed by
-       // a different process or host, the synchronization mechanism
-       // should also guard against races with other processes and
-       // hosts. If such a mechanism is not available, there must be
-       // a mechanism for detecting unsafe configurations, alerting
-       // the operator, and aborting or falling back to a read-only
-       // state. In other words, running multiple keepstore processes
-       // with the same underlying storage device must either work
-       // reliably or fail outright.
-       //
-       // Corollary: A successful Touch or Put guarantees a block
-       // will not be trashed for at least BlobSigningTTL seconds.
-       Trash(loc string) error
-
-       // Untrash moves block from trash back into store
-       Untrash(loc string) error
-
-       // Status returns a *VolumeStatus representing the current
-       // in-use and available storage capacity and an
-       // implementation-specific volume identifier (e.g., "mount
-       // point" for a UnixVolume).
-       Status() *VolumeStatus
-
-       // String returns an identifying label for this volume,
-       // suitable for including in log messages. It should contain
-       // enough information to uniquely identify the underlying
-       // storage device, but should not contain any credentials or
-       // secrets.
-       String() string
-
-       // EmptyTrash looks for trashed blocks that exceeded
-       // BlobTrashLifetime and deletes them from the volume.
+// volume is the interface to a back-end storage device.
+type volume interface {
+       BlockRead(ctx context.Context, hash string, writeTo io.Writer) (int, error)
+       BlockWrite(ctx context.Context, hash string, data []byte) error
+       DeviceID() string
+       BlockTouch(hash string) error
+       BlockTrash(hash string) error
+       BlockUntrash(hash string) error
+       Index(ctx context.Context, prefix string, writeTo io.Writer) error
+       Mtime(hash string) (time.Time, error)
        EmptyTrash()
-
-       // Return a globally unique ID of the underlying storage
-       // device if possible, otherwise "".
-       GetDeviceID() string
-}
-
-// A VolumeWithExamples provides example configs to display in the
-// -help message.
-type VolumeWithExamples interface {
-       Volume
-       Examples() []Volume
 }
 
-// A VolumeManager tells callers which volumes can read, which volumes
-// can write, and on which volume the next write should be attempted.
-type VolumeManager interface {
-       // Mounts returns all mounts (volume attachments).
-       Mounts() []*VolumeMount
-
-       // Lookup returns the mount with the given UUID. Returns nil
-       // if the mount does not exist. If write==true, returns nil if
-       // the mount is not writable.
-       Lookup(uuid string, write bool) *VolumeMount
-
-       // AllReadable returns all mounts.
-       AllReadable() []*VolumeMount
-
-       // AllWritable returns all mounts that aren't known to be in
-       // a read-only state. (There is no guarantee that a write to
-       // one will succeed, though.)
-       AllWritable() []*VolumeMount
-
-       // NextWritable returns the volume where the next new block
-       // should be written. A VolumeManager can select a volume in
-       // order to distribute activity across spindles, fill up disks
-       // with more free space, etc.
-       NextWritable() *VolumeMount
-
-       // VolumeStats returns the ioStats used for tracking stats for
-       // the given Volume.
-       VolumeStats(Volume) *ioStats
-
-       // Close shuts down the volume manager cleanly.
-       Close()
-}
-
-// A VolumeMount is an attachment of a Volume to a VolumeManager.
-type VolumeMount struct {
-       arvados.KeepMount
-       Volume
-}
-
-// Generate a UUID the way API server would for a "KeepVolumeMount"
-// object.
-func (*VolumeMount) generateUUID() string {
-       var max big.Int
-       _, ok := max.SetString("zzzzzzzzzzzzzzz", 36)
-       if !ok {
-               panic("big.Int parse failed")
-       }
-       r, err := rand.Int(rand.Reader, &max)
-       if err != nil {
-               panic(err)
-       }
-       return fmt.Sprintf("zzzzz-ivpuk-%015s", r.Text(36))
-}
-
-// RRVolumeManager is a round-robin VolumeManager: the Nth call to
-// NextWritable returns the (N % len(writables))th writable Volume
-// (where writables are all Volumes v where v.Writable()==true).
-type RRVolumeManager struct {
-       mounts    []*VolumeMount
-       mountMap  map[string]*VolumeMount
-       readables []*VolumeMount
-       writables []*VolumeMount
-       counter   uint32
-       iostats   map[Volume]*ioStats
-}
-
-func makeRRVolumeManager(logger logrus.FieldLogger, cluster *arvados.Cluster, myURL arvados.URL, metrics *volumeMetricsVecs) (*RRVolumeManager, error) {
-       vm := &RRVolumeManager{
-               iostats: make(map[Volume]*ioStats),
-       }
-       vm.mountMap = make(map[string]*VolumeMount)
-       for uuid, cfgvol := range cluster.Volumes {
-               va, ok := cfgvol.AccessViaHosts[myURL]
-               if !ok && len(cfgvol.AccessViaHosts) > 0 {
-                       continue
-               }
-               dri, ok := driver[cfgvol.Driver]
-               if !ok {
-                       return nil, fmt.Errorf("volume %s: invalid driver %q", uuid, cfgvol.Driver)
-               }
-               vol, err := dri(cluster, cfgvol, logger, metrics)
-               if err != nil {
-                       return nil, fmt.Errorf("error initializing volume %s: %s", uuid, err)
-               }
-               sc := cfgvol.StorageClasses
-               if len(sc) == 0 {
-                       sc = map[string]bool{"default": true}
-               }
-               repl := cfgvol.Replication
-               if repl < 1 {
-                       repl = 1
-               }
-               mnt := &VolumeMount{
-                       KeepMount: arvados.KeepMount{
-                               UUID:           uuid,
-                               DeviceID:       vol.GetDeviceID(),
-                               AllowWrite:     !va.ReadOnly && !cfgvol.ReadOnly,
-                               AllowTrash:     !va.ReadOnly && (!cfgvol.ReadOnly || cfgvol.AllowTrashWhenReadOnly),
-                               Replication:    repl,
-                               StorageClasses: sc,
-                       },
-                       Volume: vol,
-               }
-               vm.iostats[vol] = &ioStats{}
-               vm.mounts = append(vm.mounts, mnt)
-               vm.mountMap[uuid] = mnt
-               vm.readables = append(vm.readables, mnt)
-               if mnt.KeepMount.AllowWrite {
-                       vm.writables = append(vm.writables, mnt)
-               }
-               logger.Printf("started volume %s (%s), AllowWrite=%v, AllowTrash=%v", uuid, vol, mnt.AllowWrite, mnt.AllowTrash)
-       }
-       // pri(mnt): return highest priority of any storage class
-       // offered by mnt
-       pri := func(mnt *VolumeMount) int {
-               any, best := false, 0
-               for class := range mnt.KeepMount.StorageClasses {
-                       if p := cluster.StorageClasses[class].Priority; !any || best < p {
-                               best = p
-                               any = true
-                       }
-               }
-               return best
-       }
-       // less(a,b): sort first by highest priority of any offered
-       // storage class (highest->lowest), then by volume UUID
-       less := func(a, b *VolumeMount) bool {
-               if pa, pb := pri(a), pri(b); pa != pb {
-                       return pa > pb
-               } else {
-                       return a.KeepMount.UUID < b.KeepMount.UUID
-               }
-       }
-       sort.Slice(vm.readables, func(i, j int) bool {
-               return less(vm.readables[i], vm.readables[j])
-       })
-       sort.Slice(vm.writables, func(i, j int) bool {
-               return less(vm.writables[i], vm.writables[j])
-       })
-       sort.Slice(vm.mounts, func(i, j int) bool {
-               return less(vm.mounts[i], vm.mounts[j])
-       })
-       return vm, nil
-}
-
-func (vm *RRVolumeManager) Mounts() []*VolumeMount {
-       return vm.mounts
-}
-
-func (vm *RRVolumeManager) Lookup(uuid string, needWrite bool) *VolumeMount {
-       if mnt, ok := vm.mountMap[uuid]; ok && (!needWrite || mnt.AllowWrite) {
-               return mnt
-       }
-       return nil
-}
-
-// AllReadable returns an array of all readable volumes
-func (vm *RRVolumeManager) AllReadable() []*VolumeMount {
-       return vm.readables
-}
-
-// AllWritable returns writable volumes, sorted by priority/uuid. Used
-// by CompareAndTouch to ensure higher-priority volumes are checked
-// first.
-func (vm *RRVolumeManager) AllWritable() []*VolumeMount {
-       return vm.writables
-}
-
-// NextWritable returns writable volumes, rotated by vm.counter so
-// each volume gets a turn to be first. Used by PutBlock to distribute
-// new data across available volumes.
-func (vm *RRVolumeManager) NextWritable() []*VolumeMount {
-       if len(vm.writables) == 0 {
-               return nil
-       }
-       offset := (int(atomic.AddUint32(&vm.counter, 1)) - 1) % len(vm.writables)
-       return append(append([]*VolumeMount(nil), vm.writables[offset:]...), vm.writables[:offset]...)
-}
-
-// VolumeStats returns an ioStats for the given volume.
-func (vm *RRVolumeManager) VolumeStats(v Volume) *ioStats {
-       return vm.iostats[v]
-}
-
-// Close the RRVolumeManager
-func (vm *RRVolumeManager) Close() {
-}
+type volumeDriver func(newVolumeParams) (volume, error)
 
-// VolumeStatus describes the current condition of a volume
-type VolumeStatus struct {
-       MountPoint string
-       DeviceNum  uint64
-       BytesFree  uint64
-       BytesUsed  uint64
+type newVolumeParams struct {
+       UUID         string
+       Cluster      *arvados.Cluster
+       ConfigVolume arvados.Volume
+       Logger       logrus.FieldLogger
+       MetricsVecs  *volumeMetricsVecs
+       BufferPool   *bufferPool
 }
 
 // ioStats tracks I/O statistics for a volume or server