-// A Volume is an interface representing a Keep back-end storage unit:
-// for example, a single mounted disk, a RAID array, an Amazon S3 volume,
-// etc.
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
package main
import (
- "errors"
+ "context"
+ "crypto/rand"
"fmt"
- "os"
- "strings"
+ "io"
+ "math/big"
+ "sync/atomic"
"time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
)
+type BlockWriter interface {
+ // WriteBlock reads all data from r, writes it to a backing
+ // store as "loc", and returns the number of bytes written.
+ WriteBlock(ctx context.Context, loc string, r io.Reader) error
+}
+
+type BlockReader interface {
+ // ReadBlock retrieves data previously stored as "loc" and
+ // writes it to w.
+ ReadBlock(ctx context.Context, loc string, w io.Writer) error
+}
+
+// A Volume is an interface representing a Keep back-end storage unit:
+// for example, a single mounted disk, a RAID array, an Amazon S3 volume,
+// etc.
type Volume interface {
- Get(loc string) ([]byte, error)
- Put(loc string, block []byte) error
+ // Volume type as specified in config file. Examples: "S3",
+ // "Directory".
+ Type() string
+
+ // Do whatever private setup tasks and configuration checks
+ // are needed. Return non-nil if the volume is unusable (e.g.,
+ // invalid config).
+ Start() error
+
+ // Get a block: copy the block data into buf, and return the
+ // number of bytes copied.
+ //
+ // loc is guaranteed to consist of 32 or more lowercase hex
+ // digits.
+ //
+ // Get should not verify the integrity of the data: it should
+ // just return whatever was found in its backing
+ // store. (Integrity checking is the caller's responsibility.)
+ //
+ // If an error is encountered that prevents it from
+ // retrieving the data, that error should be returned so the
+ // caller can log (and send to the client) a more useful
+ // message.
+ //
+ // If the error is "not found", and there's no particular
+ // reason to expect the block to be found (other than that a
+ // caller is asking for it), the returned error should satisfy
+ // os.IsNotExist(err): this is a normal condition and will not
+ // be logged as an error (except that a 404 will appear in the
+ // access log if the block is not found on any other volumes
+ // either).
+ //
+ // If the data in the backing store is bigger than len(buf),
+ // then Get is permitted to return an error without reading
+ // any of the data.
+ //
+ // len(buf) will not exceed BlockSize.
+ Get(ctx context.Context, loc string, buf []byte) (int, error)
+
+ // Compare the given data with the stored data (i.e., what Get
+ // would return). If equal, return nil. If not, return
+ // CollisionError or DiskHashError (depending on whether the
+ // data on disk matches the expected hash), or whatever error
+ // was encountered opening/reading the stored data.
+ Compare(ctx context.Context, loc string, data []byte) error
+
+ // Put writes a block to an underlying storage device.
+ //
+ // loc is as described in Get.
+ //
+ // len(block) is guaranteed to be between 0 and BlockSize.
+ //
+ // If a block is already stored under the same name (loc) with
+ // different content, Put must either overwrite the existing
+ // data with the new data or return a non-nil error. When
+ // overwriting existing data, it must never leave the storage
+ // device in an inconsistent state: a subsequent call to Get
+ // must return either the entire old block, the entire new
+ // block, or an error. (An implementation that cannot peform
+ // atomic updates must leave the old data alone and return an
+ // error.)
+ //
+ // Put also sets the timestamp for the given locator to the
+ // current time.
+ //
+ // Put must return a non-nil error unless it can guarantee
+ // that the entire block has been written and flushed to
+ // persistent storage, and that its timestamp is current. Of
+ // course, this guarantee is only as good as the underlying
+ // storage device, but it is Put's responsibility to at least
+ // get whatever guarantee is offered by the storage device.
+ //
+ // Put should not verify that loc==hash(block): this is the
+ // caller's responsibility.
+ Put(ctx context.Context, loc string, block []byte) error
+
+ // Touch sets the timestamp for the given locator to the
+ // current time.
+ //
+ // loc is as described in Get.
+ //
+ // If invoked at time t0, Touch must guarantee that a
+ // subsequent call to Mtime will return a timestamp no older
+ // than {t0 minus one second}. For example, if Touch is called
+ // at 2015-07-07T01:23:45.67890123Z, it is acceptable for a
+ // subsequent Mtime to return any of the following:
+ //
+ // - 2015-07-07T01:23:45.00000000Z
+ // - 2015-07-07T01:23:45.67890123Z
+ // - 2015-07-07T01:23:46.67890123Z
+ // - 2015-07-08T00:00:00.00000000Z
+ //
+ // It is not acceptable for a subsequente Mtime to return
+ // either of the following:
+ //
+ // - 2015-07-07T00:00:00.00000000Z -- ERROR
+ // - 2015-07-07T01:23:44.00000000Z -- ERROR
+ //
+ // Touch must return a non-nil error if the timestamp cannot
+ // be updated.
Touch(loc string) error
+
+ // Mtime returns the stored timestamp for the given locator.
+ //
+ // loc is as described in Get.
+ //
+ // Mtime must return a non-nil error if the given block is not
+ // found or the timestamp could not be retrieved.
Mtime(loc string) (time.Time, error)
- Index(prefix string) string
- Delete(loc string) error
+
+ // IndexTo writes a complete list of locators with the given
+ // prefix for which Get() can retrieve data.
+ //
+ // prefix consists of zero or more lowercase hexadecimal
+ // digits.
+ //
+ // Each locator must be written to the given writer using the
+ // following format:
+ //
+ // loc "+" size " " timestamp "\n"
+ //
+ // where:
+ //
+ // - size is the number of bytes of content, given as a
+ // decimal number with one or more digits
+ //
+ // - timestamp is the timestamp stored for the locator,
+ // given as a decimal number of seconds after January 1,
+ // 1970 UTC.
+ //
+ // IndexTo must not write any other data to writer: for
+ // example, it must not write any blank lines.
+ //
+ // If an error makes it impossible to provide a complete
+ // index, IndexTo must return a non-nil error. It is
+ // acceptable to return a non-nil error after writing a
+ // partial index to writer.
+ //
+ // The resulting index is not expected to be sorted in any
+ // particular order.
+ IndexTo(prefix string, writer io.Writer) error
+
+ // Trash moves the block data from the underlying storage
+ // device to trash area. The block then stays in trash for
+ // -trash-lifetime interval before it is actually deleted.
+ //
+ // loc is as described in Get.
+ //
+ // If the timestamp for the given locator is newer than
+ // BlobSignatureTTL, Trash must not trash the data.
+ //
+ // If a Trash operation overlaps with any Touch or Put
+ // operations on the same locator, the implementation must
+ // ensure one of the following outcomes:
+ //
+ // - Touch and Put return a non-nil error, or
+ // - Trash does not trash the block, or
+ // - Both of the above.
+ //
+ // If it is possible for the storage device to be accessed by
+ // a different process or host, the synchronization mechanism
+ // should also guard against races with other processes and
+ // hosts. If such a mechanism is not available, there must be
+ // a mechanism for detecting unsafe configurations, alerting
+ // the operator, and aborting or falling back to a read-only
+ // state. In other words, running multiple keepstore processes
+ // with the same underlying storage device must either work
+ // reliably or fail outright.
+ //
+ // Corollary: A successful Touch or Put guarantees a block
+ // will not be trashed for at least BlobSignatureTTL
+ // seconds.
+ Trash(loc string) error
+
+ // Untrash moves block from trash back into store
+ Untrash(loc string) error
+
+ // Status returns a *VolumeStatus representing the current
+ // in-use and available storage capacity and an
+ // implementation-specific volume identifier (e.g., "mount
+ // point" for a UnixVolume).
Status() *VolumeStatus
+
+ // String returns an identifying label for this volume,
+ // suitable for including in log messages. It should contain
+ // enough information to uniquely identify the underlying
+ // storage device, but should not contain any credentials or
+ // secrets.
String() string
+
+ // Writable returns false if all future Put, Mtime, and Delete
+ // calls are expected to fail.
+ //
+ // If the volume is only temporarily unwritable -- or if Put
+ // will fail because it is full, but Mtime or Delete can
+ // succeed -- then Writable should return false.
+ Writable() bool
+
+ // Replication returns the storage redundancy of the
+ // underlying device. It will be passed on to clients in
+ // responses to PUT requests.
+ Replication() int
+
+ // EmptyTrash looks for trashed blocks that exceeded TrashLifetime
+ // and deletes them from the volume.
+ EmptyTrash()
+
+ // Return a globally unique ID of the underlying storage
+ // device if possible, otherwise "".
+ DeviceID() string
+
+ // Get the storage classes associated with this volume
+ GetStorageClasses() []string
}
-// MockVolumes are Volumes used to test the Keep front end.
-//
-// If the Bad field is true, this volume should return an error
-// on all writes and puts.
-//
-// The Touchable field signifies whether the Touch method will
-// succeed. Defaults to true. Note that Bad and Touchable are
-// independent: a MockVolume may be set up so that Put fails but Touch
-// works or vice versa.
-//
-// TODO(twp): rename Bad to something more descriptive, e.g. Writable,
-// and make sure that the tests that rely on it are testing the right
-// thing. We may need to simulate Writable, Touchable and Corrupt
-// volumes in different ways.
-//
-type MockVolume struct {
- Store map[string][]byte
- Timestamps map[string]time.Time
- Bad bool
- Touchable bool
-}
-
-func CreateMockVolume() *MockVolume {
- return &MockVolume{
- Store: make(map[string][]byte),
- Timestamps: make(map[string]time.Time),
- Bad: false,
- Touchable: true,
- }
+// A VolumeWithExamples provides example configs to display in the
+// -help message.
+type VolumeWithExamples interface {
+ Volume
+ Examples() []Volume
}
-func (v *MockVolume) Get(loc string) ([]byte, error) {
- if v.Bad {
- return nil, errors.New("Bad volume")
- } else if block, ok := v.Store[loc]; ok {
- return block, nil
- }
- return nil, os.ErrNotExist
+// A VolumeManager tells callers which volumes can read, which volumes
+// can write, and on which volume the next write should be attempted.
+type VolumeManager interface {
+ // Mounts returns all mounts (volume attachments).
+ Mounts() []*VolumeMount
+
+ // Lookup returns the volume under the given mount
+ // UUID. Returns nil if the mount does not exist. If
+ // write==true, returns nil if the volume is not writable.
+ Lookup(uuid string, write bool) Volume
+
+ // AllReadable returns all volumes.
+ AllReadable() []Volume
+
+ // AllWritable returns all volumes that aren't known to be in
+ // a read-only state. (There is no guarantee that a write to
+ // one will succeed, though.)
+ AllWritable() []Volume
+
+ // NextWritable returns the volume where the next new block
+ // should be written. A VolumeManager can select a volume in
+ // order to distribute activity across spindles, fill up disks
+ // with more free space, etc.
+ NextWritable() Volume
+
+ // VolumeStats returns the ioStats used for tracking stats for
+ // the given Volume.
+ VolumeStats(Volume) *ioStats
+
+ // Close shuts down the volume manager cleanly.
+ Close()
}
-func (v *MockVolume) Put(loc string, block []byte) error {
- if v.Bad {
- return errors.New("Bad volume")
- }
- v.Store[loc] = block
- return v.Touch(loc)
+// A VolumeMount is an attachment of a Volume to a VolumeManager.
+type VolumeMount struct {
+ arvados.KeepMount
+ volume Volume
}
-func (v *MockVolume) Touch(loc string) error {
- if v.Touchable {
- v.Timestamps[loc] = time.Now()
- return nil
+// Generate a UUID the way API server would for a "KeepVolumeMount"
+// object.
+func (*VolumeMount) generateUUID() string {
+ var max big.Int
+ _, ok := max.SetString("zzzzzzzzzzzzzzz", 36)
+ if !ok {
+ panic("big.Int parse failed")
}
- return errors.New("Touch failed")
+ r, err := rand.Int(rand.Reader, &max)
+ if err != nil {
+ panic(err)
+ }
+ return fmt.Sprintf("zzzzz-ivpuk-%015s", r.Text(36))
}
-func (v *MockVolume) Mtime(loc string) (time.Time, error) {
- var mtime time.Time
- var err error
- if v.Bad {
- err = errors.New("Bad volume")
- } else if t, ok := v.Timestamps[loc]; ok {
- mtime = t
- } else {
- err = os.ErrNotExist
- }
- return mtime, err
+// RRVolumeManager is a round-robin VolumeManager: the Nth call to
+// NextWritable returns the (N % len(writables))th writable Volume
+// (where writables are all Volumes v where v.Writable()==true).
+type RRVolumeManager struct {
+ mounts []*VolumeMount
+ mountMap map[string]*VolumeMount
+ readables []Volume
+ writables []Volume
+ counter uint32
+ iostats map[Volume]*ioStats
}
-func (v *MockVolume) Index(prefix string) string {
- var result string
- for loc, block := range v.Store {
- if IsValidLocator(loc) && strings.HasPrefix(loc, prefix) {
- result = result + fmt.Sprintf("%s+%d %d\n",
- loc, len(block), 123456789)
+// MakeRRVolumeManager initializes RRVolumeManager
+func MakeRRVolumeManager(volumes []Volume) *RRVolumeManager {
+ vm := &RRVolumeManager{
+ iostats: make(map[Volume]*ioStats),
+ }
+ vm.mountMap = make(map[string]*VolumeMount)
+ for _, v := range volumes {
+ sc := v.GetStorageClasses()
+ if len(sc) == 0 {
+ sc = []string{"default"}
+ }
+ mnt := &VolumeMount{
+ KeepMount: arvados.KeepMount{
+ UUID: (*VolumeMount)(nil).generateUUID(),
+ DeviceID: v.DeviceID(),
+ ReadOnly: !v.Writable(),
+ Replication: v.Replication(),
+ StorageClasses: sc,
+ },
+ volume: v,
+ }
+ vm.iostats[v] = &ioStats{}
+ vm.mounts = append(vm.mounts, mnt)
+ vm.mountMap[mnt.UUID] = mnt
+ vm.readables = append(vm.readables, v)
+ if v.Writable() {
+ vm.writables = append(vm.writables, v)
}
}
- return result
+ return vm
}
-func (v *MockVolume) Delete(loc string) error {
- if _, ok := v.Store[loc]; ok {
- if time.Since(v.Timestamps[loc]) < permission_ttl {
- return nil
- }
- delete(v.Store, loc)
+func (vm *RRVolumeManager) Mounts() []*VolumeMount {
+ return vm.mounts
+}
+
+func (vm *RRVolumeManager) Lookup(uuid string, needWrite bool) Volume {
+ if mnt, ok := vm.mountMap[uuid]; ok && (!needWrite || !mnt.ReadOnly) {
+ return mnt.volume
+ } else {
return nil
}
- return os.ErrNotExist
}
-func (v *MockVolume) Status() *VolumeStatus {
- var used uint64
- for _, block := range v.Store {
- used = used + uint64(len(block))
- }
- return &VolumeStatus{"/bogo", 123, 1000000 - used, used}
+// AllReadable returns an array of all readable volumes
+func (vm *RRVolumeManager) AllReadable() []Volume {
+ return vm.readables
}
-func (v *MockVolume) String() string {
- return "[MockVolume]"
+// AllWritable returns an array of all writable volumes
+func (vm *RRVolumeManager) AllWritable() []Volume {
+ return vm.writables
}
-// A VolumeManager manages a collection of volumes.
-//
-// - Volumes is a slice of available Volumes.
-// - Choose() returns a Volume suitable for writing to.
-// - Quit() instructs the VolumeManager to shut down gracefully.
-//
-type VolumeManager interface {
- Volumes() []Volume
- Choose() Volume
- Quit()
+// NextWritable returns the next writable
+func (vm *RRVolumeManager) NextWritable() Volume {
+ if len(vm.writables) == 0 {
+ return nil
+ }
+ i := atomic.AddUint32(&vm.counter, 1)
+ return vm.writables[i%uint32(len(vm.writables))]
}
-type RRVolumeManager struct {
- volumes []Volume
- nextwrite chan Volume
- quit chan int
-}
-
-func MakeRRVolumeManager(vols []Volume) *RRVolumeManager {
- // Create a new VolumeManager struct with the specified volumes,
- // and with new Nextwrite and Quit channels.
- // The Quit channel is buffered with a capacity of 1 so that
- // another routine may write to it without blocking.
- vm := &RRVolumeManager{vols, make(chan Volume), make(chan int, 1)}
-
- // This goroutine implements round-robin volume selection.
- // It sends each available Volume in turn to the Nextwrite
- // channel, until receiving a notification on the Quit channel
- // that it should terminate.
- go func() {
- var i int = 0
- for {
- select {
- case <-vm.quit:
- return
- case vm.nextwrite <- vm.volumes[i]:
- i = (i + 1) % len(vm.volumes)
- }
- }
- }()
+// VolumeStats returns an ioStats for the given volume.
+func (vm *RRVolumeManager) VolumeStats(v Volume) *ioStats {
+ return vm.iostats[v]
+}
- return vm
+// Close the RRVolumeManager
+func (vm *RRVolumeManager) Close() {
}
-func (vm *RRVolumeManager) Volumes() []Volume {
- return vm.volumes
+// VolumeStatus describes the current condition of a volume
+type VolumeStatus struct {
+ MountPoint string
+ DeviceNum uint64
+ BytesFree uint64
+ BytesUsed uint64
}
-func (vm *RRVolumeManager) Choose() Volume {
- return <-vm.nextwrite
+// ioStats tracks I/O statistics for a volume or server
+type ioStats struct {
+ Errors uint64
+ Ops uint64
+ CompareOps uint64
+ GetOps uint64
+ PutOps uint64
+ TouchOps uint64
+ InBytes uint64
+ OutBytes uint64
}
-func (vm *RRVolumeManager) Quit() {
- vm.quit <- 1
+type InternalStatser interface {
+ InternalStats() interface{}
}