//
// SPDX-License-Identifier: AGPL-3.0
-package main
+package keepstore
import (
"context"
- "crypto/rand"
- "fmt"
"io"
- "math/big"
- "sync/atomic"
"time"
"git.arvados.org/arvados.git/sdk/go/arvados"
"github.com/sirupsen/logrus"
)
-type BlockWriter interface {
- // WriteBlock reads all data from r, writes it to a backing
- // store as "loc", and returns the number of bytes written.
- WriteBlock(ctx context.Context, loc string, r io.Reader) error
-}
-
-type BlockReader interface {
- // ReadBlock retrieves data previously stored as "loc" and
- // writes it to w.
- ReadBlock(ctx context.Context, loc string, w io.Writer) error
-}
-
-var driver = map[string]func(*arvados.Cluster, arvados.Volume, logrus.FieldLogger, *volumeMetricsVecs) (Volume, error){}
-
-// A Volume is an interface representing a Keep back-end storage unit:
-// for example, a single mounted disk, a RAID array, an Amazon S3 volume,
-// etc.
-type Volume interface {
- // Get a block: copy the block data into buf, and return the
- // number of bytes copied.
- //
- // loc is guaranteed to consist of 32 or more lowercase hex
- // digits.
- //
- // Get should not verify the integrity of the data: it should
- // just return whatever was found in its backing
- // store. (Integrity checking is the caller's responsibility.)
- //
- // If an error is encountered that prevents it from
- // retrieving the data, that error should be returned so the
- // caller can log (and send to the client) a more useful
- // message.
- //
- // If the error is "not found", and there's no particular
- // reason to expect the block to be found (other than that a
- // caller is asking for it), the returned error should satisfy
- // os.IsNotExist(err): this is a normal condition and will not
- // be logged as an error (except that a 404 will appear in the
- // access log if the block is not found on any other volumes
- // either).
- //
- // If the data in the backing store is bigger than len(buf),
- // then Get is permitted to return an error without reading
- // any of the data.
- //
- // len(buf) will not exceed BlockSize.
- Get(ctx context.Context, loc string, buf []byte) (int, error)
-
- // Compare the given data with the stored data (i.e., what Get
- // would return). If equal, return nil. If not, return
- // CollisionError or DiskHashError (depending on whether the
- // data on disk matches the expected hash), or whatever error
- // was encountered opening/reading the stored data.
- Compare(ctx context.Context, loc string, data []byte) error
-
- // Put writes a block to an underlying storage device.
- //
- // loc is as described in Get.
- //
- // len(block) is guaranteed to be between 0 and BlockSize.
- //
- // If a block is already stored under the same name (loc) with
- // different content, Put must either overwrite the existing
- // data with the new data or return a non-nil error. When
- // overwriting existing data, it must never leave the storage
- // device in an inconsistent state: a subsequent call to Get
- // must return either the entire old block, the entire new
- // block, or an error. (An implementation that cannot peform
- // atomic updates must leave the old data alone and return an
- // error.)
- //
- // Put also sets the timestamp for the given locator to the
- // current time.
- //
- // Put must return a non-nil error unless it can guarantee
- // that the entire block has been written and flushed to
- // persistent storage, and that its timestamp is current. Of
- // course, this guarantee is only as good as the underlying
- // storage device, but it is Put's responsibility to at least
- // get whatever guarantee is offered by the storage device.
+// volume is the interface to a back-end storage device.
+type volume interface {
+ // Return a unique identifier for the backend device. If
+ // possible, this should be chosen such that keepstore
+ // processes running on different hosts, and accessing the
+ // same backend device, will return the same string.
//
- // Put should not verify that loc==hash(block): this is the
- // caller's responsibility.
- Put(ctx context.Context, loc string, block []byte) error
+ // This helps keep-balance avoid redundantly downloading
+ // multiple index listings for the same backend device.
+ DeviceID() string
- // Touch sets the timestamp for the given locator to the
- // current time.
- //
- // loc is as described in Get.
- //
- // If invoked at time t0, Touch must guarantee that a
- // subsequent call to Mtime will return a timestamp no older
- // than {t0 minus one second}. For example, if Touch is called
- // at 2015-07-07T01:23:45.67890123Z, it is acceptable for a
- // subsequent Mtime to return any of the following:
+ // Copy a block from the backend device to writeTo.
//
- // - 2015-07-07T01:23:45.00000000Z
- // - 2015-07-07T01:23:45.67890123Z
- // - 2015-07-07T01:23:46.67890123Z
- // - 2015-07-08T00:00:00.00000000Z
+ // As with all volume methods, the hash argument is a
+ // 32-character hexadecimal string.
//
- // It is not acceptable for a subsequente Mtime to return
- // either of the following:
+ // Data can be written to writeTo in any order, and concurrent
+ // calls to writeTo.WriteAt() are allowed. However, BlockRead
+ // must not do multiple writes that intersect with any given
+ // byte offset.
//
- // - 2015-07-07T00:00:00.00000000Z -- ERROR
- // - 2015-07-07T01:23:44.00000000Z -- ERROR
+ // BlockRead is not expected to verify data integrity.
//
- // Touch must return a non-nil error if the timestamp cannot
- // be updated.
- Touch(loc string) error
+ // If the indicated block does not exist, or has been trashed,
+ // BlockRead must return os.ErrNotExist.
+ BlockRead(ctx context.Context, hash string, writeTo io.WriterAt) error
- // Mtime returns the stored timestamp for the given locator.
+ // Store a block on the backend device, and set its timestamp
+ // to the current time.
//
- // loc is as described in Get.
- //
- // Mtime must return a non-nil error if the given block is not
- // found or the timestamp could not be retrieved.
- Mtime(loc string) (time.Time, error)
+ // The implementation must ensure that regardless of any
+ // errors encountered while writing, a partially written block
+ // is not left behind: a subsequent BlockRead call must return
+ // either a) the data previously stored under the given hash,
+ // if any, or b) os.ErrNotExist.
+ BlockWrite(ctx context.Context, hash string, data []byte) error
- // IndexTo writes a complete list of locators with the given
- // prefix for which Get() can retrieve data.
- //
- // prefix consists of zero or more lowercase hexadecimal
- // digits.
- //
- // Each locator must be written to the given writer using the
- // following format:
- //
- // loc "+" size " " timestamp "\n"
- //
- // where:
- //
- // - size is the number of bytes of content, given as a
- // decimal number with one or more digits
- //
- // - timestamp is the timestamp stored for the locator,
- // given as a decimal number of seconds after January 1,
- // 1970 UTC.
- //
- // IndexTo must not write any other data to writer: for
- // example, it must not write any blank lines.
- //
- // If an error makes it impossible to provide a complete
- // index, IndexTo must return a non-nil error. It is
- // acceptable to return a non-nil error after writing a
- // partial index to writer.
- //
- // The resulting index is not expected to be sorted in any
- // particular order.
- IndexTo(prefix string, writer io.Writer) error
-
- // Trash moves the block data from the underlying storage
- // device to trash area. The block then stays in trash for
- // BlobTrashLifetime before it is actually deleted.
- //
- // loc is as described in Get.
- //
- // If the timestamp for the given locator is newer than
- // BlobSigningTTL, Trash must not trash the data.
- //
- // If a Trash operation overlaps with any Touch or Put
- // operations on the same locator, the implementation must
- // ensure one of the following outcomes:
- //
- // - Touch and Put return a non-nil error, or
- // - Trash does not trash the block, or
- // - Both of the above.
- //
- // If it is possible for the storage device to be accessed by
- // a different process or host, the synchronization mechanism
- // should also guard against races with other processes and
- // hosts. If such a mechanism is not available, there must be
- // a mechanism for detecting unsafe configurations, alerting
- // the operator, and aborting or falling back to a read-only
- // state. In other words, running multiple keepstore processes
- // with the same underlying storage device must either work
- // reliably or fail outright.
- //
- // Corollary: A successful Touch or Put guarantees a block
- // will not be trashed for at least BlobSigningTTL seconds.
- Trash(loc string) error
+ // Update the indicated block's stored timestamp to the
+ // current time.
+ BlockTouch(hash string) error
- // Untrash moves block from trash back into store
- Untrash(loc string) error
+ // Return the indicated block's stored timestamp.
+ Mtime(hash string) (time.Time, error)
- // Status returns a *VolumeStatus representing the current
- // in-use and available storage capacity and an
- // implementation-specific volume identifier (e.g., "mount
- // point" for a UnixVolume).
- Status() *VolumeStatus
+ // Mark the indicated block as trash, such that -- unless it
+ // is untrashed before time.Now() + BlobTrashLifetime --
+ // BlockRead returns os.ErrNotExist and the block is not
+ // listed by Index.
+ BlockTrash(hash string) error
- // String returns an identifying label for this volume,
- // suitable for including in log messages. It should contain
- // enough information to uniquely identify the underlying
- // storage device, but should not contain any credentials or
- // secrets.
- String() string
+ // Un-mark the indicated block as trash. If the block has not
+ // been trashed, return os.ErrNotExist.
+ BlockUntrash(hash string) error
- // EmptyTrash looks for trashed blocks that exceeded
- // BlobTrashLifetime and deletes them from the volume.
+ // Permanently delete all blocks that have been marked as
+ // trash for BlobTrashLifetime or longer.
EmptyTrash()
- // Return a globally unique ID of the underlying storage
- // device if possible, otherwise "".
- GetDeviceID() string
-}
-
-// A VolumeWithExamples provides example configs to display in the
-// -help message.
-type VolumeWithExamples interface {
- Volume
- Examples() []Volume
-}
-
-// A VolumeManager tells callers which volumes can read, which volumes
-// can write, and on which volume the next write should be attempted.
-type VolumeManager interface {
- // Mounts returns all mounts (volume attachments).
- Mounts() []*VolumeMount
-
- // Lookup returns the mount with the given UUID. Returns nil
- // if the mount does not exist. If write==true, returns nil if
- // the mount is not writable.
- Lookup(uuid string, write bool) *VolumeMount
-
- // AllReadable returns all mounts.
- AllReadable() []*VolumeMount
-
- // AllWritable returns all mounts that aren't known to be in
- // a read-only state. (There is no guarantee that a write to
- // one will succeed, though.)
- AllWritable() []*VolumeMount
-
- // NextWritable returns the volume where the next new block
- // should be written. A VolumeManager can select a volume in
- // order to distribute activity across spindles, fill up disks
- // with more free space, etc.
- NextWritable() *VolumeMount
-
- // VolumeStats returns the ioStats used for tracking stats for
- // the given Volume.
- VolumeStats(Volume) *ioStats
-
- // Close shuts down the volume manager cleanly.
- Close()
-}
-
-// A VolumeMount is an attachment of a Volume to a VolumeManager.
-type VolumeMount struct {
- arvados.KeepMount
- Volume
-}
-
-// Generate a UUID the way API server would for a "KeepVolumeMount"
-// object.
-func (*VolumeMount) generateUUID() string {
- var max big.Int
- _, ok := max.SetString("zzzzzzzzzzzzzzz", 36)
- if !ok {
- panic("big.Int parse failed")
- }
- r, err := rand.Int(rand.Reader, &max)
- if err != nil {
- panic(err)
- }
- return fmt.Sprintf("zzzzz-ivpuk-%015s", r.Text(36))
-}
-
-// RRVolumeManager is a round-robin VolumeManager: the Nth call to
-// NextWritable returns the (N % len(writables))th writable Volume
-// (where writables are all Volumes v where v.Writable()==true).
-type RRVolumeManager struct {
- mounts []*VolumeMount
- mountMap map[string]*VolumeMount
- readables []*VolumeMount
- writables []*VolumeMount
- counter uint32
- iostats map[Volume]*ioStats
-}
-
-func makeRRVolumeManager(logger logrus.FieldLogger, cluster *arvados.Cluster, myURL arvados.URL, metrics *volumeMetricsVecs) (*RRVolumeManager, error) {
- vm := &RRVolumeManager{
- iostats: make(map[Volume]*ioStats),
- }
- vm.mountMap = make(map[string]*VolumeMount)
- for uuid, cfgvol := range cluster.Volumes {
- va, ok := cfgvol.AccessViaHosts[myURL]
- if !ok && len(cfgvol.AccessViaHosts) > 0 {
- continue
- }
- dri, ok := driver[cfgvol.Driver]
- if !ok {
- return nil, fmt.Errorf("volume %s: invalid driver %q", uuid, cfgvol.Driver)
- }
- vol, err := dri(cluster, cfgvol, logger, metrics)
- if err != nil {
- return nil, fmt.Errorf("error initializing volume %s: %s", uuid, err)
- }
- logger.Printf("started volume %s (%s), ReadOnly=%v", uuid, vol, cfgvol.ReadOnly)
-
- sc := cfgvol.StorageClasses
- if len(sc) == 0 {
- sc = map[string]bool{"default": true}
- }
- repl := cfgvol.Replication
- if repl < 1 {
- repl = 1
- }
- mnt := &VolumeMount{
- KeepMount: arvados.KeepMount{
- UUID: uuid,
- DeviceID: vol.GetDeviceID(),
- ReadOnly: cfgvol.ReadOnly || va.ReadOnly,
- Replication: repl,
- StorageClasses: sc,
- },
- Volume: vol,
- }
- vm.iostats[vol] = &ioStats{}
- vm.mounts = append(vm.mounts, mnt)
- vm.mountMap[uuid] = mnt
- vm.readables = append(vm.readables, mnt)
- if !mnt.KeepMount.ReadOnly {
- vm.writables = append(vm.writables, mnt)
- }
- }
- return vm, nil
-}
-
-func (vm *RRVolumeManager) Mounts() []*VolumeMount {
- return vm.mounts
-}
-
-func (vm *RRVolumeManager) Lookup(uuid string, needWrite bool) *VolumeMount {
- if mnt, ok := vm.mountMap[uuid]; ok && (!needWrite || !mnt.ReadOnly) {
- return mnt
- } else {
- return nil
- }
-}
-
-// AllReadable returns an array of all readable volumes
-func (vm *RRVolumeManager) AllReadable() []*VolumeMount {
- return vm.readables
-}
-
-// AllWritable returns an array of all writable volumes
-func (vm *RRVolumeManager) AllWritable() []*VolumeMount {
- return vm.writables
-}
-
-// NextWritable returns the next writable
-func (vm *RRVolumeManager) NextWritable() *VolumeMount {
- if len(vm.writables) == 0 {
- return nil
- }
- i := atomic.AddUint32(&vm.counter, 1)
- return vm.writables[i%uint32(len(vm.writables))]
-}
-
-// VolumeStats returns an ioStats for the given volume.
-func (vm *RRVolumeManager) VolumeStats(v Volume) *ioStats {
- return vm.iostats[v]
+ // Write an index of all non-trashed blocks available on the
+ // backend device whose hash begins with the given prefix
+ // (prefix is a string of zero or more hexadecimal digits).
+ //
+ // Each block is written as "{hash}+{size} {timestamp}\n"
+ // where timestamp is a decimal-formatted number of
+ // nanoseconds since the UTC Unix epoch.
+ //
+ // Index should abort and return ctx.Err() if ctx is cancelled
+ // before indexing is complete.
+ Index(ctx context.Context, prefix string, writeTo io.Writer) error
}
-// Close the RRVolumeManager
-func (vm *RRVolumeManager) Close() {
-}
+type volumeDriver func(newVolumeParams) (volume, error)
-// VolumeStatus describes the current condition of a volume
-type VolumeStatus struct {
- MountPoint string
- DeviceNum uint64
- BytesFree uint64
- BytesUsed uint64
+type newVolumeParams struct {
+ UUID string
+ Cluster *arvados.Cluster
+ ConfigVolume arvados.Volume
+ Logger logrus.FieldLogger
+ MetricsVecs *volumeMetricsVecs
+ BufferPool *bufferPool
}
// ioStats tracks I/O statistics for a volume or server
InBytes uint64
OutBytes uint64
}
-
-type InternalStatser interface {
- InternalStats() interface{}
-}