X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/8e6cd14b7884a691a110110b0f366577437c6d9e..e1df29a3d682b28400ca35e490aa4a95aa564246:/services/keepstore/volume.go diff --git a/services/keepstore/volume.go b/services/keepstore/volume.go index 778f27fcde..3f7c9cb79b 100644 --- a/services/keepstore/volume.go +++ b/services/keepstore/volume.go @@ -1,10 +1,21 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + package main import ( "context" + "crypto/rand" + "fmt" "io" + "math/big" + "sort" "sync/atomic" "time" + + "git.arvados.org/arvados.git/sdk/go/arvados" + "github.com/sirupsen/logrus" ) type BlockWriter interface { @@ -19,19 +30,12 @@ type BlockReader interface { ReadBlock(ctx context.Context, loc string, w io.Writer) error } +var driver = map[string]func(*arvados.Cluster, arvados.Volume, logrus.FieldLogger, *volumeMetricsVecs) (Volume, error){} + // A Volume is an interface representing a Keep back-end storage unit: // for example, a single mounted disk, a RAID array, an Amazon S3 volume, // etc. type Volume interface { - // Volume type as specified in config file. Examples: "S3", - // "Directory". - Type() string - - // Do whatever private setup tasks and configuration checks - // are needed. Return non-nil if the volume is unusable (e.g., - // invalid config). - Start() error - // Get a block: copy the block data into buf, and return the // number of bytes copied. // @@ -167,12 +171,12 @@ type Volume interface { // Trash moves the block data from the underlying storage // device to trash area. The block then stays in trash for - // -trash-lifetime interval before it is actually deleted. + // BlobTrashLifetime before it is actually deleted. // // loc is as described in Get. // // If the timestamp for the given locator is newer than - // BlobSignatureTTL, Trash must not trash the data. + // BlobSigningTTL, Trash must not trash the data. // // If a Trash operation overlaps with any Touch or Put // operations on the same locator, the implementation must @@ -193,8 +197,7 @@ type Volume interface { // reliably or fail outright. // // Corollary: A successful Touch or Put guarantees a block - // will not be trashed for at least BlobSignatureTTL - // seconds. + // will not be trashed for at least BlobSigningTTL seconds. Trash(loc string) error // Untrash moves block from trash back into store @@ -213,22 +216,13 @@ type Volume interface { // secrets. String() string - // Writable returns false if all future Put, Mtime, and Delete - // calls are expected to fail. - // - // If the volume is only temporarily unwritable -- or if Put - // will fail because it is full, but Mtime or Delete can - // succeed -- then Writable should return false. - Writable() bool - - // Replication returns the storage redundancy of the - // underlying device. It will be passed on to clients in - // responses to PUT requests. - Replication() int - - // EmptyTrash looks for trashed blocks that exceeded TrashLifetime - // and deletes them from the volume. + // EmptyTrash looks for trashed blocks that exceeded + // BlobTrashLifetime and deletes them from the volume. EmptyTrash() + + // Return a globally unique ID of the underlying storage + // device if possible, otherwise "". + GetDeviceID() string } // A VolumeWithExamples provides example configs to display in the @@ -241,19 +235,27 @@ type VolumeWithExamples interface { // A VolumeManager tells callers which volumes can read, which volumes // can write, and on which volume the next write should be attempted. type VolumeManager interface { - // AllReadable returns all volumes. - AllReadable() []Volume + // Mounts returns all mounts (volume attachments). + Mounts() []*VolumeMount - // AllWritable returns all volumes that aren't known to be in + // Lookup returns the mount with the given UUID. Returns nil + // if the mount does not exist. If write==true, returns nil if + // the mount is not writable. + Lookup(uuid string, write bool) *VolumeMount + + // AllReadable returns all mounts. + AllReadable() []*VolumeMount + + // AllWritable returns all mounts that aren't known to be in // a read-only state. (There is no guarantee that a write to // one will succeed, though.) - AllWritable() []Volume + AllWritable() []*VolumeMount // NextWritable returns the volume where the next new block // should be written. A VolumeManager can select a volume in // order to distribute activity across spindles, fill up disks // with more free space, etc. - NextWritable() Volume + NextWritable() *VolumeMount // VolumeStats returns the ioStats used for tracking stats for // the given Volume. @@ -263,48 +265,147 @@ type VolumeManager interface { Close() } +// A VolumeMount is an attachment of a Volume to a VolumeManager. +type VolumeMount struct { + arvados.KeepMount + Volume +} + +// Generate a UUID the way API server would for a "KeepVolumeMount" +// object. +func (*VolumeMount) generateUUID() string { + var max big.Int + _, ok := max.SetString("zzzzzzzzzzzzzzz", 36) + if !ok { + panic("big.Int parse failed") + } + r, err := rand.Int(rand.Reader, &max) + if err != nil { + panic(err) + } + return fmt.Sprintf("zzzzz-ivpuk-%015s", r.Text(36)) +} + // RRVolumeManager is a round-robin VolumeManager: the Nth call to // NextWritable returns the (N % len(writables))th writable Volume // (where writables are all Volumes v where v.Writable()==true). type RRVolumeManager struct { - readables []Volume - writables []Volume + mounts []*VolumeMount + mountMap map[string]*VolumeMount + readables []*VolumeMount + writables []*VolumeMount counter uint32 iostats map[Volume]*ioStats } -// MakeRRVolumeManager initializes RRVolumeManager -func MakeRRVolumeManager(volumes []Volume) *RRVolumeManager { +func makeRRVolumeManager(logger logrus.FieldLogger, cluster *arvados.Cluster, myURL arvados.URL, metrics *volumeMetricsVecs) (*RRVolumeManager, error) { vm := &RRVolumeManager{ iostats: make(map[Volume]*ioStats), } - for _, v := range volumes { - vm.iostats[v] = &ioStats{} - vm.readables = append(vm.readables, v) - if v.Writable() { - vm.writables = append(vm.writables, v) + vm.mountMap = make(map[string]*VolumeMount) + for uuid, cfgvol := range cluster.Volumes { + va, ok := cfgvol.AccessViaHosts[myURL] + if !ok && len(cfgvol.AccessViaHosts) > 0 { + continue + } + dri, ok := driver[cfgvol.Driver] + if !ok { + return nil, fmt.Errorf("volume %s: invalid driver %q", uuid, cfgvol.Driver) + } + vol, err := dri(cluster, cfgvol, logger, metrics) + if err != nil { + return nil, fmt.Errorf("error initializing volume %s: %s", uuid, err) + } + logger.Printf("started volume %s (%s), ReadOnly=%v", uuid, vol, cfgvol.ReadOnly || va.ReadOnly) + + sc := cfgvol.StorageClasses + if len(sc) == 0 { + sc = map[string]bool{"default": true} } + repl := cfgvol.Replication + if repl < 1 { + repl = 1 + } + mnt := &VolumeMount{ + KeepMount: arvados.KeepMount{ + UUID: uuid, + DeviceID: vol.GetDeviceID(), + ReadOnly: cfgvol.ReadOnly || va.ReadOnly, + Replication: repl, + StorageClasses: sc, + }, + Volume: vol, + } + vm.iostats[vol] = &ioStats{} + vm.mounts = append(vm.mounts, mnt) + vm.mountMap[uuid] = mnt + vm.readables = append(vm.readables, mnt) + if !mnt.KeepMount.ReadOnly { + vm.writables = append(vm.writables, mnt) + } + } + // pri(mnt): return highest priority of any storage class + // offered by mnt + pri := func(mnt *VolumeMount) int { + any, best := false, 0 + for class := range mnt.KeepMount.StorageClasses { + if p := cluster.StorageClasses[class].Priority; !any || best < p { + best = p + any = true + } + } + return best + } + // less(a,b): sort first by highest priority of any offered + // storage class (highest->lowest), then by volume UUID + less := func(a, b *VolumeMount) bool { + if pa, pb := pri(a), pri(b); pa != pb { + return pa > pb + } else { + return a.KeepMount.UUID < b.KeepMount.UUID + } + } + sort.Slice(vm.readables, func(i, j int) bool { + return less(vm.readables[i], vm.readables[j]) + }) + sort.Slice(vm.writables, func(i, j int) bool { + return less(vm.writables[i], vm.writables[j]) + }) + return vm, nil +} + +func (vm *RRVolumeManager) Mounts() []*VolumeMount { + return vm.mounts +} + +func (vm *RRVolumeManager) Lookup(uuid string, needWrite bool) *VolumeMount { + if mnt, ok := vm.mountMap[uuid]; ok && (!needWrite || !mnt.ReadOnly) { + return mnt } - return vm + return nil } // AllReadable returns an array of all readable volumes -func (vm *RRVolumeManager) AllReadable() []Volume { +func (vm *RRVolumeManager) AllReadable() []*VolumeMount { return vm.readables } -// AllWritable returns an array of all writable volumes -func (vm *RRVolumeManager) AllWritable() []Volume { +// AllWritable returns writable volumes, sorted by priority/uuid. Used +// by CompareAndTouch to ensure higher-priority volumes are checked +// first. +func (vm *RRVolumeManager) AllWritable() []*VolumeMount { return vm.writables } -// NextWritable returns the next writable -func (vm *RRVolumeManager) NextWritable() Volume { +// NextWritable returns writable volumes, rotated by vm.counter so +// each volume gets a turn to be first. Used by PutBlock to distribute +// new data across available volumes. +func (vm *RRVolumeManager) NextWritable() []*VolumeMount { if len(vm.writables) == 0 { return nil } - i := atomic.AddUint32(&vm.counter, 1) - return vm.writables[i%uint32(len(vm.writables))] + offset := (int(atomic.AddUint32(&vm.counter, 1)) - 1) % len(vm.writables) + return append(append([]*VolumeMount(nil), vm.writables[offset:]...), vm.writables[:offset]...) } // VolumeStats returns an ioStats for the given volume.