X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/5b0962883553bb3573787151dafea8d92988d712..c7dfdc3f58e993abad5ef7fb898ac137cca62e02:/services/keepstore/volume.go diff --git a/services/keepstore/volume.go b/services/keepstore/volume.go index 52b9b1b244..cf0b7a3902 100644 --- a/services/keepstore/volume.go +++ b/services/keepstore/volume.go @@ -10,10 +10,12 @@ import ( "fmt" "io" "math/big" + "sort" "sync/atomic" "time" - "git.curoverse.com/arvados.git/sdk/go/arvados" + "git.arvados.org/arvados.git/sdk/go/arvados" + "github.com/sirupsen/logrus" ) type BlockWriter interface { @@ -28,19 +30,12 @@ type BlockReader interface { ReadBlock(ctx context.Context, loc string, w io.Writer) error } +var driver = map[string]func(*arvados.Cluster, arvados.Volume, logrus.FieldLogger, *volumeMetricsVecs) (Volume, error){} + // A Volume is an interface representing a Keep back-end storage unit: // for example, a single mounted disk, a RAID array, an Amazon S3 volume, // etc. type Volume interface { - // Volume type as specified in config file. Examples: "S3", - // "Directory". - Type() string - - // Do whatever private setup tasks and configuration checks - // are needed. Return non-nil if the volume is unusable (e.g., - // invalid config). - Start(vm *volumeMetricsVecs) error - // Get a block: copy the block data into buf, and return the // number of bytes copied. // @@ -176,12 +171,12 @@ type Volume interface { // Trash moves the block data from the underlying storage // device to trash area. The block then stays in trash for - // -trash-lifetime interval before it is actually deleted. + // BlobTrashLifetime before it is actually deleted. // // loc is as described in Get. // // If the timestamp for the given locator is newer than - // BlobSignatureTTL, Trash must not trash the data. + // BlobSigningTTL, Trash must not trash the data. // // If a Trash operation overlaps with any Touch or Put // operations on the same locator, the implementation must @@ -202,8 +197,7 @@ type Volume interface { // reliably or fail outright. // // Corollary: A successful Touch or Put guarantees a block - // will not be trashed for at least BlobSignatureTTL - // seconds. + // will not be trashed for at least BlobSigningTTL seconds. Trash(loc string) error // Untrash moves block from trash back into store @@ -222,29 +216,13 @@ type Volume interface { // secrets. String() string - // Writable returns false if all future Put, Mtime, and Delete - // calls are expected to fail. - // - // If the volume is only temporarily unwritable -- or if Put - // will fail because it is full, but Mtime or Delete can - // succeed -- then Writable should return false. - Writable() bool - - // Replication returns the storage redundancy of the - // underlying device. It will be passed on to clients in - // responses to PUT requests. - Replication() int - - // EmptyTrash looks for trashed blocks that exceeded TrashLifetime - // and deletes them from the volume. + // EmptyTrash looks for trashed blocks that exceeded + // BlobTrashLifetime and deletes them from the volume. EmptyTrash() // Return a globally unique ID of the underlying storage // device if possible, otherwise "". - DeviceID() string - - // Get the storage classes associated with this volume - GetStorageClasses() []string + GetDeviceID() string } // A VolumeWithExamples provides example configs to display in the @@ -260,24 +238,24 @@ type VolumeManager interface { // Mounts returns all mounts (volume attachments). Mounts() []*VolumeMount - // Lookup returns the volume under the given mount - // UUID. Returns nil if the mount does not exist. If - // write==true, returns nil if the volume is not writable. - Lookup(uuid string, write bool) Volume + // Lookup returns the mount with the given UUID. Returns nil + // if the mount does not exist. If write==true, returns nil if + // the mount is not writable. + Lookup(uuid string, write bool) *VolumeMount - // AllReadable returns all volumes. - AllReadable() []Volume + // AllReadable returns all mounts. + AllReadable() []*VolumeMount - // AllWritable returns all volumes that aren't known to be in + // AllWritable returns all mounts that aren't known to be in // a read-only state. (There is no guarantee that a write to // one will succeed, though.) - AllWritable() []Volume + AllWritable() []*VolumeMount // NextWritable returns the volume where the next new block // should be written. A VolumeManager can select a volume in // order to distribute activity across spindles, fill up disks // with more free space, etc. - NextWritable() Volume + NextWritable() *VolumeMount // VolumeStats returns the ioStats used for tracking stats for // the given Volume. @@ -290,7 +268,7 @@ type VolumeManager interface { // A VolumeMount is an attachment of a Volume to a VolumeManager. type VolumeMount struct { arvados.KeepMount - volume Volume + Volume } // Generate a UUID the way API server would for a "KeepVolumeMount" @@ -314,73 +292,123 @@ func (*VolumeMount) generateUUID() string { type RRVolumeManager struct { mounts []*VolumeMount mountMap map[string]*VolumeMount - readables []Volume - writables []Volume + readables []*VolumeMount + writables []*VolumeMount counter uint32 iostats map[Volume]*ioStats } -// MakeRRVolumeManager initializes RRVolumeManager -func MakeRRVolumeManager(volumes []Volume) *RRVolumeManager { +func makeRRVolumeManager(logger logrus.FieldLogger, cluster *arvados.Cluster, myURL arvados.URL, metrics *volumeMetricsVecs) (*RRVolumeManager, error) { vm := &RRVolumeManager{ iostats: make(map[Volume]*ioStats), } vm.mountMap = make(map[string]*VolumeMount) - for _, v := range volumes { - sc := v.GetStorageClasses() + for uuid, cfgvol := range cluster.Volumes { + va, ok := cfgvol.AccessViaHosts[myURL] + if !ok && len(cfgvol.AccessViaHosts) > 0 { + continue + } + dri, ok := driver[cfgvol.Driver] + if !ok { + return nil, fmt.Errorf("volume %s: invalid driver %q", uuid, cfgvol.Driver) + } + vol, err := dri(cluster, cfgvol, logger, metrics) + if err != nil { + return nil, fmt.Errorf("error initializing volume %s: %s", uuid, err) + } + logger.Printf("started volume %s (%s), ReadOnly=%v", uuid, vol, cfgvol.ReadOnly || va.ReadOnly) + + sc := cfgvol.StorageClasses if len(sc) == 0 { - sc = []string{"default"} + sc = map[string]bool{"default": true} + } + repl := cfgvol.Replication + if repl < 1 { + repl = 1 } mnt := &VolumeMount{ KeepMount: arvados.KeepMount{ - UUID: (*VolumeMount)(nil).generateUUID(), - DeviceID: v.DeviceID(), - ReadOnly: !v.Writable(), - Replication: v.Replication(), + UUID: uuid, + DeviceID: vol.GetDeviceID(), + ReadOnly: cfgvol.ReadOnly || va.ReadOnly, + Replication: repl, StorageClasses: sc, }, - volume: v, + Volume: vol, } - vm.iostats[v] = &ioStats{} + vm.iostats[vol] = &ioStats{} vm.mounts = append(vm.mounts, mnt) - vm.mountMap[mnt.UUID] = mnt - vm.readables = append(vm.readables, v) - if v.Writable() { - vm.writables = append(vm.writables, v) + vm.mountMap[uuid] = mnt + vm.readables = append(vm.readables, mnt) + if !mnt.KeepMount.ReadOnly { + vm.writables = append(vm.writables, mnt) } } - return vm + // pri(mnt): return highest priority of any storage class + // offered by mnt + pri := func(mnt *VolumeMount) int { + any, best := false, 0 + for class := range mnt.KeepMount.StorageClasses { + if p := cluster.StorageClasses[class].Priority; !any || best < p { + best = p + any = true + } + } + return best + } + // less(a,b): sort first by highest priority of any offered + // storage class (highest->lowest), then by volume UUID + less := func(a, b *VolumeMount) bool { + if pa, pb := pri(a), pri(b); pa != pb { + return pa > pb + } else { + return a.KeepMount.UUID < b.KeepMount.UUID + } + } + sort.Slice(vm.readables, func(i, j int) bool { + return less(vm.readables[i], vm.readables[j]) + }) + sort.Slice(vm.writables, func(i, j int) bool { + return less(vm.writables[i], vm.writables[j]) + }) + sort.Slice(vm.mounts, func(i, j int) bool { + return less(vm.mounts[i], vm.mounts[j]) + }) + return vm, nil } func (vm *RRVolumeManager) Mounts() []*VolumeMount { return vm.mounts } -func (vm *RRVolumeManager) Lookup(uuid string, needWrite bool) Volume { +func (vm *RRVolumeManager) Lookup(uuid string, needWrite bool) *VolumeMount { if mnt, ok := vm.mountMap[uuid]; ok && (!needWrite || !mnt.ReadOnly) { - return mnt.volume - } else { - return nil + return mnt } + return nil } // AllReadable returns an array of all readable volumes -func (vm *RRVolumeManager) AllReadable() []Volume { +func (vm *RRVolumeManager) AllReadable() []*VolumeMount { return vm.readables } -// AllWritable returns an array of all writable volumes -func (vm *RRVolumeManager) AllWritable() []Volume { +// AllWritable returns writable volumes, sorted by priority/uuid. Used +// by CompareAndTouch to ensure higher-priority volumes are checked +// first. +func (vm *RRVolumeManager) AllWritable() []*VolumeMount { return vm.writables } -// NextWritable returns the next writable -func (vm *RRVolumeManager) NextWritable() Volume { +// NextWritable returns writable volumes, rotated by vm.counter so +// each volume gets a turn to be first. Used by PutBlock to distribute +// new data across available volumes. +func (vm *RRVolumeManager) NextWritable() []*VolumeMount { if len(vm.writables) == 0 { return nil } - i := atomic.AddUint32(&vm.counter, 1) - return vm.writables[i%uint32(len(vm.writables))] + offset := (int(atomic.AddUint32(&vm.counter, 1)) - 1) % len(vm.writables) + return append(append([]*VolumeMount(nil), vm.writables[offset:]...), vm.writables[:offset]...) } // VolumeStats returns an ioStats for the given volume.