// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0
-package main
+package keepstore
import (
"context"
"fmt"
"io"
"math/big"
+ "sort"
"sync/atomic"
"time"
- "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ "github.com/sirupsen/logrus"
)
// A BlockReader can retrieve a stored block, writing its data to an
// io.Writer.
type BlockReader interface {
ReadBlock(ctx context.Context, loc string, w io.Writer) error
}
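+// driver is a registry of volume driver constructors, keyed by the
+// Driver name used in the cluster config (e.g. "Directory" or "S3").
+// Each driver implementation registers itself here, typically from
+// an init() function, e.g. driver["Directory"] = newDirectoryVolume
+// (constructor name illustrative).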
+var driver = map[string]func(*arvados.Cluster, arvados.Volume, logrus.FieldLogger, *volumeMetricsVecs) (Volume, error){}
+
// A Volume is an interface representing a Keep back-end storage unit:
// for example, a single mounted disk, a RAID array, an Amazon S3 volume,
// etc.
type Volume interface {
- // Volume type as specified in config file. Examples: "S3",
- // "Directory".
- Type() string
-
- // Do whatever private setup tasks and configuration checks
- // are needed. Return non-nil if the volume is unusable (e.g.,
- // invalid config).
- Start(vm *volumeMetricsVecs) error
-
// Get a block: copy the block data into buf, and return the
// number of bytes copied.
Get(ctx context.Context, loc string, buf []byte) (int, error)
// Trash moves the block data from the underlying storage
// device to trash area. The block then stays in trash for
- // -trash-lifetime interval before it is actually deleted.
+ // BlobTrashLifetime before it is actually deleted.
//
// loc is as described in Get.
//
// If the timestamp for the given locator is newer than
- // BlobSignatureTTL, Trash must not trash the data.
+ // BlobSigningTTL, Trash must not trash the data.
//
// If a Trash operation overlaps with any Touch or Put
// operations on the same locator, the implementation must
// either ensure the block is reliably preserved, or make the
// overlapping Touch or Put fail outright.
//
// Corollary: A successful Touch or Put guarantees a block
- // will not be trashed for at least BlobSignatureTTL
- // seconds.
+ // will not be trashed for at least BlobSigningTTL seconds.
Trash(loc string) error
// Untrash moves block from trash back into store
Untrash(loc string) error
// String returns a brief, identifying label for the volume,
// suitable for use in log messages. It must not reveal any
// credentials or secrets.
String() string
- // Writable returns false if all future Put, Mtime, and Delete
- // calls are expected to fail.
- //
- // If the volume is only temporarily unwritable -- or if Put
- // will fail because it is full, but Mtime or Delete can
- // succeed -- then Writable should return false.
- Writable() bool
-
- // Replication returns the storage redundancy of the
- // underlying device. It will be passed on to clients in
- // responses to PUT requests.
- Replication() int
-
- // EmptyTrash looks for trashed blocks that exceeded TrashLifetime
- // and deletes them from the volume.
+ // EmptyTrash looks for trashed blocks that exceeded
+ // BlobTrashLifetime and deletes them from the volume.
EmptyTrash()
// Return a globally unique ID of the underlying storage
// device if possible, otherwise "".
- DeviceID() string
-
- // Get the storage classes associated with this volume
- GetStorageClasses() []string
+ GetDeviceID() string
}
// A VolumeManager tells callers which volumes can read, which
// volumes can write, and on which volume the next write should
// be attempted.
type VolumeManager interface {
// Mounts returns all mounts (volume attachments).
Mounts() []*VolumeMount
- // Lookup returns the volume under the given mount
- // UUID. Returns nil if the mount does not exist. If
- // write==true, returns nil if the volume is not writable.
- Lookup(uuid string, write bool) Volume
+ // Lookup returns the mount with the given UUID. Returns nil
+ // if the mount does not exist. If write==true, returns nil if
+ // the mount is not writable.
+ Lookup(uuid string, write bool) *VolumeMount
- // AllReadable returns all volumes.
- AllReadable() []Volume
+ // AllReadable returns all mounts.
+ AllReadable() []*VolumeMount
- // AllWritable returns all volumes that aren't known to be in
+ // AllWritable returns all mounts that aren't known to be in
// a read-only state. (There is no guarantee that a write to
// one will succeed, though.)
- AllWritable() []Volume
+ AllWritable() []*VolumeMount
-	// NextWritable returns the volume where the next new block
-	// should be written. A VolumeManager can select a volume in
-	// order to distribute activity across spindles, fill up disks
-	// with more free space, etc.
-	NextWritable() Volume
+	// NextWritable returns the writable mounts to use for the next
+	// new block, in the order they should be tried. A VolumeManager
+	// can rotate or reorder the mounts to distribute activity across
+	// spindles, fill up disks with more free space, etc.
+	NextWritable() []*VolumeMount
// VolumeStats returns the ioStats used for tracking stats for
// the given Volume.
VolumeStats(Volume) *ioStats
}
// A VolumeMount is an attachment of a Volume to a VolumeManager.
type VolumeMount struct {
arvados.KeepMount
- volume Volume
+ Volume
}
// Generate a UUID the way API server would for a "KeepVolumeMount"
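+
+// RRVolumeManager is a round-robin VolumeManager: NextWritable
+// rotates through the writable mounts so each mount takes a turn
+// receiving new data.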
type RRVolumeManager struct {
mounts []*VolumeMount
mountMap map[string]*VolumeMount
- readables []Volume
- writables []Volume
+ readables []*VolumeMount
+ writables []*VolumeMount
counter uint32
iostats map[Volume]*ioStats
}
-// MakeRRVolumeManager initializes RRVolumeManager
-func MakeRRVolumeManager(volumes []Volume) *RRVolumeManager {
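+// makeRRVolumeManager sets up an RRVolumeManager with one mount per
+// volume in the cluster config that is accessible via myURL (or has
+// no AccessViaHosts restriction), using the registered driver named
+// by each volume's Driver setting.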
+func makeRRVolumeManager(logger logrus.FieldLogger, cluster *arvados.Cluster, myURL arvados.URL, metrics *volumeMetricsVecs) (*RRVolumeManager, error) {
vm := &RRVolumeManager{
iostats: make(map[Volume]*ioStats),
}
vm.mountMap = make(map[string]*VolumeMount)
- for _, v := range volumes {
- sc := v.GetStorageClasses()
+ for uuid, cfgvol := range cluster.Volumes {
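+	// Skip volumes that are restricted to other keepstore
+	// servers. A volume with no AccessViaHosts entries is
+	// accessible from every keepstore server in the cluster.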
+ va, ok := cfgvol.AccessViaHosts[myURL]
+ if !ok && len(cfgvol.AccessViaHosts) > 0 {
+ continue
+ }
+ dri, ok := driver[cfgvol.Driver]
+ if !ok {
+ return nil, fmt.Errorf("volume %s: invalid driver %q", uuid, cfgvol.Driver)
+ }
+ vol, err := dri(cluster, cfgvol, logger, metrics)
+ if err != nil {
+ return nil, fmt.Errorf("error initializing volume %s: %s", uuid, err)
+ }
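+	// A volume with no configured storage classes is treated as
+	// offering the "default" class.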
+ sc := cfgvol.StorageClasses
if len(sc) == 0 {
- sc = []string{"default"}
+ sc = map[string]bool{"default": true}
+ }
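+	// Report a replication level of at least 1, even if the config
+	// leaves Replication unset.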
+ repl := cfgvol.Replication
+ if repl < 1 {
+ repl = 1
}
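+	// AllowWrite requires both the AccessViaHosts entry and the
+	// volume itself to be writable; AllowTrash additionally honors
+	// AllowTrashWhenReadOnly so trash operations can proceed on
+	// read-only volumes configured that way.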
mnt := &VolumeMount{
KeepMount: arvados.KeepMount{
- UUID: (*VolumeMount)(nil).generateUUID(),
- DeviceID: v.DeviceID(),
- ReadOnly: !v.Writable(),
- Replication: v.Replication(),
+ UUID: uuid,
+ DeviceID: vol.GetDeviceID(),
+ AllowWrite: !va.ReadOnly && !cfgvol.ReadOnly,
+ AllowTrash: !va.ReadOnly && (!cfgvol.ReadOnly || cfgvol.AllowTrashWhenReadOnly),
+ Replication: repl,
StorageClasses: sc,
},
- volume: v,
+ Volume: vol,
}
- vm.iostats[v] = &ioStats{}
+ vm.iostats[vol] = &ioStats{}
vm.mounts = append(vm.mounts, mnt)
- vm.mountMap[mnt.UUID] = mnt
- vm.readables = append(vm.readables, v)
- if v.Writable() {
- vm.writables = append(vm.writables, v)
+ vm.mountMap[uuid] = mnt
+ vm.readables = append(vm.readables, mnt)
+ if mnt.KeepMount.AllowWrite {
+ vm.writables = append(vm.writables, mnt)
+ }
+ logger.Printf("started volume %s (%s), AllowWrite=%v, AllowTrash=%v", uuid, vol, mnt.AllowWrite, mnt.AllowTrash)
+ }
+ // pri(mnt): return highest priority of any storage class
+ // offered by mnt
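+	// Classes not listed in cluster.StorageClasses have priority 0.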
+ pri := func(mnt *VolumeMount) int {
+ any, best := false, 0
+ for class := range mnt.KeepMount.StorageClasses {
+ if p := cluster.StorageClasses[class].Priority; !any || best < p {
+ best = p
+ any = true
+ }
+ }
+ return best
+ }
+ // less(a,b): sort first by highest priority of any offered
+ // storage class (highest->lowest), then by volume UUID
+ less := func(a, b *VolumeMount) bool {
+ if pa, pb := pri(a), pri(b); pa != pb {
+ return pa > pb
+ } else {
+ return a.KeepMount.UUID < b.KeepMount.UUID
}
}
- return vm
+ sort.Slice(vm.readables, func(i, j int) bool {
+ return less(vm.readables[i], vm.readables[j])
+ })
+ sort.Slice(vm.writables, func(i, j int) bool {
+ return less(vm.writables[i], vm.writables[j])
+ })
+ sort.Slice(vm.mounts, func(i, j int) bool {
+ return less(vm.mounts[i], vm.mounts[j])
+ })
+ return vm, nil
}
func (vm *RRVolumeManager) Mounts() []*VolumeMount {
return vm.mounts
}
-func (vm *RRVolumeManager) Lookup(uuid string, needWrite bool) Volume {
- if mnt, ok := vm.mountMap[uuid]; ok && (!needWrite || !mnt.ReadOnly) {
- return mnt.volume
- } else {
- return nil
+func (vm *RRVolumeManager) Lookup(uuid string, needWrite bool) *VolumeMount {
+ if mnt, ok := vm.mountMap[uuid]; ok && (!needWrite || mnt.AllowWrite) {
+ return mnt
}
+ return nil
}
-// AllReadable returns an array of all readable volumes
-func (vm *RRVolumeManager) AllReadable() []Volume {
+// AllReadable returns all readable mounts, sorted by priority/uuid.
+func (vm *RRVolumeManager) AllReadable() []*VolumeMount {
return vm.readables
}
-// AllWritable returns an array of all writable volumes
-func (vm *RRVolumeManager) AllWritable() []Volume {
+// AllWritable returns writable volumes, sorted by priority/uuid. Used
+// by CompareAndTouch to ensure higher-priority volumes are checked
+// first.
+func (vm *RRVolumeManager) AllWritable() []*VolumeMount {
return vm.writables
}
-// NextWritable returns the next writable
-func (vm *RRVolumeManager) NextWritable() Volume {
+// NextWritable returns writable volumes, rotated by vm.counter so
+// each volume gets a turn to be first. Used by PutBlock to distribute
+// new data across available volumes.
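+//
+// A caller storing a new block typically tries each returned mount
+// in order until a write succeeds, e.g. (illustrative sketch,
+// assuming a Put(ctx, loc, block) method on Volume):
+//
+//	for _, mnt := range vm.NextWritable() {
+//		if err := mnt.Put(ctx, loc, block); err == nil {
+//			break
+//		}
+//	}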
+func (vm *RRVolumeManager) NextWritable() []*VolumeMount {
if len(vm.writables) == 0 {
return nil
}
- i := atomic.AddUint32(&vm.counter, 1)
- return vm.writables[i%uint32(len(vm.writables))]
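+	// Advance the round-robin counter and return a rotated copy of
+	// vm.writables, so each mount takes a turn at the front and
+	// callers cannot modify the underlying slice.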
+ offset := (int(atomic.AddUint32(&vm.counter, 1)) - 1) % len(vm.writables)
+ return append(append([]*VolumeMount(nil), vm.writables[offset:]...), vm.writables[:offset]...)
}
// VolumeStats returns an ioStats for the given volume.