13647: Use cluster config instead of custom keepstore config.
[arvados.git] / services / keepstore / unix_volume.go
index 4d9e798ac67c71c2a81f51abeb2128b340a6cda6..918555c2b12f6d21a172558c581f1a34c3c68870 100644 (file)
@@ -5,12 +5,13 @@
 package main
 
 import (
-       "bufio"
        "context"
-       "flag"
+       "encoding/json"
+       "errors"
        "fmt"
        "io"
        "io/ioutil"
+       "log"
        "os"
        "os/exec"
        "path/filepath"
@@ -22,98 +23,52 @@ import (
        "syscall"
        "time"
 
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
        "github.com/prometheus/client_golang/prometheus"
+       "github.com/sirupsen/logrus"
 )
 
-type unixVolumeAdder struct {
-       *Config
-}
-
-// String implements flag.Value
-func (vs *unixVolumeAdder) String() string {
-       return "-"
+func init() {
+       driver["Directory"] = newDirectoryVolume
 }
 
-func (vs *unixVolumeAdder) Set(path string) error {
-       if dirs := strings.Split(path, ","); len(dirs) > 1 {
-               log.Print("DEPRECATED: using comma-separated volume list.")
-               for _, dir := range dirs {
-                       if err := vs.Set(dir); err != nil {
-                               return err
-                       }
-               }
-               return nil
+func newDirectoryVolume(cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) (Volume, error) {
+       v := &UnixVolume{cluster: cluster, volume: volume, logger: logger, metrics: metrics}
+       err := json.Unmarshal(volume.DriverParameters, &v)
+       if err != nil {
+               return nil, err
        }
-       vs.Config.Volumes = append(vs.Config.Volumes, &UnixVolume{
-               Root:      path,
-               ReadOnly:  deprecated.flagReadonly,
-               Serialize: deprecated.flagSerializeIO,
-       })
-       return nil
-}
-
-func init() {
-       VolumeTypes = append(VolumeTypes, func() VolumeWithExamples { return &UnixVolume{} })
-
-       flag.Var(&unixVolumeAdder{theConfig}, "volumes", "see Volumes configuration")
-       flag.Var(&unixVolumeAdder{theConfig}, "volume", "see Volumes configuration")
+       return v, v.check()
 }
 
-// Discover adds a UnixVolume for every directory named "keep" that is
-// located at the top level of a device- or tmpfs-backed mount point
-// other than "/". It returns the number of volumes added.
-func (vs *unixVolumeAdder) Discover() int {
-       added := 0
-       f, err := os.Open(ProcMounts)
-       if err != nil {
-               log.Fatalf("opening %s: %s", ProcMounts, err)
+func (v *UnixVolume) check() error {
+       if v.Root == "" {
+               return errors.New("DriverParameters.Root was not provided")
        }
-       scanner := bufio.NewScanner(f)
-       for scanner.Scan() {
-               args := strings.Fields(scanner.Text())
-               if err := scanner.Err(); err != nil {
-                       log.Fatalf("reading %s: %s", ProcMounts, err)
-               }
-               dev, mount := args[0], args[1]
-               if mount == "/" {
-                       continue
-               }
-               if dev != "tmpfs" && !strings.HasPrefix(dev, "/dev/") {
-                       continue
-               }
-               keepdir := mount + "/keep"
-               if st, err := os.Stat(keepdir); err != nil || !st.IsDir() {
-                       continue
-               }
-               // Set the -readonly flag (but only for this volume)
-               // if the filesystem is mounted readonly.
-               flagReadonlyWas := deprecated.flagReadonly
-               for _, fsopt := range strings.Split(args[3], ",") {
-                       if fsopt == "ro" {
-                               deprecated.flagReadonly = true
-                               break
-                       }
-                       if fsopt == "rw" {
-                               break
-                       }
-               }
-               if err := vs.Set(keepdir); err != nil {
-                       log.Printf("adding %q: %s", keepdir, err)
-               } else {
-                       added++
-               }
-               deprecated.flagReadonly = flagReadonlyWas
+       if v.Serialize {
+               v.locker = &sync.Mutex{}
+       }
+       if !strings.HasPrefix(v.Root, "/") {
+               return fmt.Errorf("DriverParameters.Root %q does not start with '/'", v.Root)
        }
-       return added
+
+       // Set up prometheus metrics
+       lbls := prometheus.Labels{"device_id": v.GetDeviceID()}
+       v.os.stats.opsCounters, v.os.stats.errCounters, v.os.stats.ioBytes = v.metrics.getCounterVecsFor(lbls)
+
+       _, err := v.os.Stat(v.Root)
+       return err
 }
 
 // A UnixVolume stores and retrieves blocks in a local directory.
 type UnixVolume struct {
-       Root                 string // path to the volume's root directory
-       ReadOnly             bool
-       Serialize            bool
-       DirectoryReplication int
-       StorageClasses       []string
+       Root      string // path to the volume's root directory
+       Serialize bool
+
+       cluster *arvados.Cluster
+       volume  arvados.Volume
+       logger  logrus.FieldLogger
+       metrics *volumeMetricsVecs
 
        // something to lock during IO, typically a sync.Mutex (or nil
        // to skip locking)
@@ -122,12 +77,12 @@ type UnixVolume struct {
        os osWithStats
 }
 
-// DeviceID returns a globally unique ID for the volume's root
+// GetDeviceID returns a globally unique ID for the volume's root
 // directory, consisting of the filesystem's UUID and the path from
 // filesystem root to storage directory, joined by "/". For example,
-// the DeviceID for a local directory "/mnt/xvda1/keep" might be
+// the device ID for a local directory "/mnt/xvda1/keep" might be
 // "fa0b6166-3b55-4994-bd3f-92f4e00a1bb0/keep".
-func (v *UnixVolume) DeviceID() string {
+func (v *UnixVolume) GetDeviceID() string {
        giveup := func(f string, args ...interface{}) string {
                log.Printf(f+"; using blank DeviceID for volume %s", append(args, v)...)
                return ""
@@ -198,50 +153,9 @@ func (v *UnixVolume) DeviceID() string {
        return giveup("could not find entry in %q matching %q", udir, dev)
 }
 
-// Examples implements VolumeWithExamples.
-func (*UnixVolume) Examples() []Volume {
-       return []Volume{
-               &UnixVolume{
-                       Root:                 "/mnt/local-disk",
-                       Serialize:            true,
-                       DirectoryReplication: 1,
-               },
-               &UnixVolume{
-                       Root:                 "/mnt/network-disk",
-                       Serialize:            false,
-                       DirectoryReplication: 2,
-               },
-       }
-}
-
-// Type implements Volume
-func (v *UnixVolume) Type() string {
-       return "Directory"
-}
-
-// Start implements Volume
-func (v *UnixVolume) Start(vm *volumeMetricsVecs) error {
-       if v.Serialize {
-               v.locker = &sync.Mutex{}
-       }
-       if !strings.HasPrefix(v.Root, "/") {
-               return fmt.Errorf("volume root does not start with '/': %q", v.Root)
-       }
-       if v.DirectoryReplication == 0 {
-               v.DirectoryReplication = 1
-       }
-       // Set up prometheus metrics
-       lbls := prometheus.Labels{"device_id": v.DeviceID()}
-       v.os.stats.opsCounters, v.os.stats.errCounters, v.os.stats.ioBytes = vm.getCounterVecsFor(lbls)
-
-       _, err := v.os.Stat(v.Root)
-
-       return err
-}
-
 // Touch sets the timestamp for the given locator to the current time
 func (v *UnixVolume) Touch(loc string) error {
-       if v.ReadOnly {
+       if v.volume.ReadOnly {
                return MethodDisabledError
        }
        p := v.blockPath(loc)
@@ -349,7 +263,7 @@ func (v *UnixVolume) Put(ctx context.Context, loc string, block []byte) error {
 
 // WriteBlock implements BlockWriter.
 func (v *UnixVolume) WriteBlock(ctx context.Context, loc string, rdr io.Reader) error {
-       if v.ReadOnly {
+       if v.volume.ReadOnly {
                return MethodDisabledError
        }
        if v.IsFull() {
@@ -516,7 +430,7 @@ func (v *UnixVolume) Trash(loc string) error {
        // Trash() will read the correct up-to-date timestamp and choose not to
        // trash the file.
 
-       if v.ReadOnly {
+       if v.volume.ReadOnly || !v.cluster.Collections.BlobTrash {
                return MethodDisabledError
        }
        if err := v.lock(context.TODO()); err != nil {
@@ -541,21 +455,21 @@ func (v *UnixVolume) Trash(loc string) error {
        // anyway (because the permission signatures have expired).
        if fi, err := v.os.Stat(p); err != nil {
                return err
-       } else if time.Since(fi.ModTime()) < time.Duration(theConfig.BlobSignatureTTL) {
+       } else if time.Since(fi.ModTime()) < v.cluster.Collections.BlobSigningTTL.Duration() {
                return nil
        }
 
-       if theConfig.TrashLifetime == 0 {
+       if v.cluster.Collections.BlobTrashLifetime == 0 {
                return v.os.Remove(p)
        }
-       return v.os.Rename(p, fmt.Sprintf("%v.trash.%d", p, time.Now().Add(theConfig.TrashLifetime.Duration()).Unix()))
+       return v.os.Rename(p, fmt.Sprintf("%v.trash.%d", p, time.Now().Add(v.cluster.Collections.BlobTrashLifetime.Duration()).Unix()))
 }
 
 // Untrash moves block from trash back into store
 // Look for path/{loc}.trash.{deadline} in storage,
 // and rename the first such file as path/{loc}
 func (v *UnixVolume) Untrash(loc string) (err error) {
-       if v.ReadOnly {
+       if v.volume.ReadOnly {
                return MethodDisabledError
        }
 
@@ -650,23 +564,6 @@ func (v *UnixVolume) String() string {
        return fmt.Sprintf("[UnixVolume %s]", v.Root)
 }
 
-// Writable returns false if all future Put, Mtime, and Delete calls
-// are expected to fail.
-func (v *UnixVolume) Writable() bool {
-       return !v.ReadOnly
-}
-
-// Replication returns the number of replicas promised by the
-// underlying device (as specified in configuration).
-func (v *UnixVolume) Replication() int {
-       return v.DirectoryReplication
-}
-
-// GetStorageClasses implements Volume
-func (v *UnixVolume) GetStorageClasses() []string {
-       return v.StorageClasses
-}
-
 // InternalStats returns I/O and filesystem ops counters.
 func (v *UnixVolume) InternalStats() interface{} {
        return &v.os.stats
@@ -739,6 +636,10 @@ var unixTrashLocRegexp = regexp.MustCompile(`/([0-9a-f]{32})\.trash\.(\d+)$`)
 // EmptyTrash walks hierarchy looking for {hash}.trash.*
 // and deletes those with deadline < now.
 func (v *UnixVolume) EmptyTrash() {
+       if v.cluster.Collections.BlobDeleteConcurrency < 1 {
+               return
+       }
+
        var bytesDeleted, bytesInTrash int64
        var blocksDeleted, blocksInTrash int64
 
@@ -774,8 +675,8 @@ func (v *UnixVolume) EmptyTrash() {
                info os.FileInfo
        }
        var wg sync.WaitGroup
-       todo := make(chan dirent, theConfig.EmptyTrashWorkers)
-       for i := 0; i < 1 || i < theConfig.EmptyTrashWorkers; i++ {
+       todo := make(chan dirent, v.cluster.Collections.BlobDeleteConcurrency)
+       for i := 0; i < v.cluster.Collections.BlobDeleteConcurrency; i++ {
                wg.Add(1)
                go func() {
                        defer wg.Done()