+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
package main
import (
"context"
+ "crypto/rand"
+ "fmt"
"io"
+ "math/big"
"sync/atomic"
"time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
)
+type BlockWriter interface {
+ // WriteBlock reads all data from r, writes it to a backing
+ // store as "loc", and returns the number of bytes written.
+ WriteBlock(ctx context.Context, loc string, r io.Reader) error
+}
+
+type BlockReader interface {
+ // ReadBlock retrieves data previously stored as "loc" and
+ // writes it to w.
+ ReadBlock(ctx context.Context, loc string, w io.Writer) error
+}
+
// A Volume is an interface representing a Keep back-end storage unit:
// for example, a single mounted disk, a RAID array, an Amazon S3 volume,
// etc.
// EmptyTrash looks for trashed blocks that exceeded TrashLifetime
// and deletes them from the volume.
EmptyTrash()
+
+ // Return a globally unique ID of the underlying storage
+ // device if possible, otherwise "".
+ DeviceID() string
+
+ // Get the storage classes associated with this volume
+ GetStorageClasses() []string
}
// A VolumeWithExamples provides example configs to display in the
// A VolumeManager tells callers which volumes can read, which volumes
// can write, and on which volume the next write should be attempted.
type VolumeManager interface {
+ // Mounts returns all mounts (volume attachments).
+ Mounts() []*VolumeMount
+
+ // Lookup returns the volume under the given mount
+ // UUID. Returns nil if the mount does not exist. If
+ // write==true, returns nil if the volume is not writable.
+ Lookup(uuid string, write bool) Volume
+
// AllReadable returns all volumes.
AllReadable() []Volume
Close()
}
+// A VolumeMount is an attachment of a Volume to a VolumeManager.
+type VolumeMount struct {
+ arvados.KeepMount
+ volume Volume
+}
+
+// Generate a UUID the way API server would for a "KeepVolumeMount"
+// object.
+func (*VolumeMount) generateUUID() string {
+ var max big.Int
+ _, ok := max.SetString("zzzzzzzzzzzzzzz", 36)
+ if !ok {
+ panic("big.Int parse failed")
+ }
+ r, err := rand.Int(rand.Reader, &max)
+ if err != nil {
+ panic(err)
+ }
+ return fmt.Sprintf("zzzzz-ivpuk-%015s", r.Text(36))
+}
+
// RRVolumeManager is a round-robin VolumeManager: the Nth call to
// NextWritable returns the (N % len(writables))th writable Volume
// (where writables are all Volumes v where v.Writable()==true).
type RRVolumeManager struct {
+ mounts []*VolumeMount
+ mountMap map[string]*VolumeMount
readables []Volume
writables []Volume
counter uint32
vm := &RRVolumeManager{
iostats: make(map[Volume]*ioStats),
}
+ vm.mountMap = make(map[string]*VolumeMount)
for _, v := range volumes {
+ sc := v.GetStorageClasses()
+ if len(sc) == 0 {
+ sc = []string{"default"}
+ }
+ mnt := &VolumeMount{
+ KeepMount: arvados.KeepMount{
+ UUID: (*VolumeMount)(nil).generateUUID(),
+ DeviceID: v.DeviceID(),
+ ReadOnly: !v.Writable(),
+ Replication: v.Replication(),
+ StorageClasses: sc,
+ },
+ volume: v,
+ }
vm.iostats[v] = &ioStats{}
+ vm.mounts = append(vm.mounts, mnt)
+ vm.mountMap[mnt.UUID] = mnt
vm.readables = append(vm.readables, v)
if v.Writable() {
vm.writables = append(vm.writables, v)
return vm
}
+func (vm *RRVolumeManager) Mounts() []*VolumeMount {
+ return vm.mounts
+}
+
+func (vm *RRVolumeManager) Lookup(uuid string, needWrite bool) Volume {
+ if mnt, ok := vm.mountMap[uuid]; ok && (!needWrite || !mnt.ReadOnly) {
+ return mnt.volume
+ } else {
+ return nil
+ }
+}
+
// AllReadable returns an array of all readable volumes
func (vm *RRVolumeManager) AllReadable() []Volume {
return vm.readables