Merge branch 'main' into 21357-favorites-names
[arvados.git] / services / keepstore / unix_volume.go
index 98edfae14d7e602d79e055a77d698dc8a6b466d2..92cf12ac189803d4f72f120708aced520a252c7f 100644 (file)
@@ -32,12 +32,13 @@ func init() {
 }
 
 func newUnixVolume(params newVolumeParams) (volume, error) {
-       v := &UnixVolume{
-               uuid:    params.UUID,
-               cluster: params.Cluster,
-               volume:  params.ConfigVolume,
-               logger:  params.Logger,
-               metrics: params.MetricsVecs,
+       v := &unixVolume{
+               uuid:       params.UUID,
+               cluster:    params.Cluster,
+               volume:     params.ConfigVolume,
+               logger:     params.Logger,
+               metrics:    params.MetricsVecs,
+               bufferPool: params.BufferPool,
        }
        err := json.Unmarshal(params.ConfigVolume.DriverParameters, &v)
        if err != nil {
@@ -47,7 +48,7 @@ func newUnixVolume(params newVolumeParams) (volume, error) {
        return v, v.check()
 }
 
-func (v *UnixVolume) check() error {
+func (v *unixVolume) check() error {
        if v.Root == "" {
                return errors.New("DriverParameters.Root was not provided")
        }
@@ -66,16 +67,17 @@ func (v *UnixVolume) check() error {
        return err
 }
 
-// A UnixVolume stores and retrieves blocks in a local directory.
-type UnixVolume struct {
+// A unixVolume stores and retrieves blocks in a local directory.
+type unixVolume struct {
        Root      string // path to the volume's root directory
        Serialize bool
 
-       uuid    string
-       cluster *arvados.Cluster
-       volume  arvados.Volume
-       logger  logrus.FieldLogger
-       metrics *volumeMetricsVecs
+       uuid       string
+       cluster    *arvados.Cluster
+       volume     arvados.Volume
+       logger     logrus.FieldLogger
+       metrics    *volumeMetricsVecs
+       bufferPool *bufferPool
 
        // something to lock during IO, typically a sync.Mutex (or nil
        // to skip locking)
@@ -89,7 +91,7 @@ type UnixVolume struct {
 // filesystem root to storage directory, joined by "/". For example,
 // the device ID for a local directory "/mnt/xvda1/keep" might be
 // "fa0b6166-3b55-4994-bd3f-92f4e00a1bb0/keep".
-func (v *UnixVolume) DeviceID() string {
+func (v *unixVolume) DeviceID() string {
        giveup := func(f string, args ...interface{}) string {
                v.logger.Infof(f+"; using hostname:path for volume %s", append(args, v.uuid)...)
                host, _ := os.Hostname()
@@ -163,7 +165,7 @@ func (v *UnixVolume) DeviceID() string {
 }
 
 // BlockTouch sets the timestamp for the given locator to the current time
-func (v *UnixVolume) BlockTouch(hash string) error {
+func (v *unixVolume) BlockTouch(hash string) error {
        p := v.blockPath(hash)
        f, err := v.os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
        if err != nil {
@@ -187,7 +189,7 @@ func (v *UnixVolume) BlockTouch(hash string) error {
 }
 
 // Mtime returns the stored timestamp for the given locator.
-func (v *UnixVolume) Mtime(loc string) (time.Time, error) {
+func (v *unixVolume) Mtime(loc string) (time.Time, error) {
        p := v.blockPath(loc)
        fi, err := v.os.Stat(p)
        if err != nil {
@@ -196,23 +198,8 @@ func (v *UnixVolume) Mtime(loc string) (time.Time, error) {
        return fi.ModTime(), nil
 }
 
-// Lock the locker (if one is in use), open the file for reading, and
-// call the given function if and when the file is ready to read.
-func (v *UnixVolume) getFunc(ctx context.Context, path string, fn func(io.Reader) error) error {
-       if err := v.lock(ctx); err != nil {
-               return err
-       }
-       defer v.unlock()
-       f, err := v.os.Open(path)
-       if err != nil {
-               return err
-       }
-       defer f.Close()
-       return fn(newCountingReader(ioutil.NopCloser(f), v.os.stats.TickInBytes))
-}
-
 // stat is os.Stat() with some extra sanity checks.
-func (v *UnixVolume) stat(path string) (os.FileInfo, error) {
+func (v *unixVolume) stat(path string) (os.FileInfo, error) {
        stat, err := v.os.Stat(path)
        if err == nil {
                if stat.Size() < 0 {
@@ -225,26 +212,33 @@ func (v *UnixVolume) stat(path string) (os.FileInfo, error) {
 }
 
 // BlockRead reads a block from the volume.
-func (v *UnixVolume) BlockRead(ctx context.Context, hash string, w io.Writer) (int, error) {
+func (v *unixVolume) BlockRead(ctx context.Context, hash string, w io.WriterAt) error {
        path := v.blockPath(hash)
        stat, err := v.stat(path)
        if err != nil {
-               return 0, v.translateError(err)
+               return v.translateError(err)
        }
-       var n int64
-       err = v.getFunc(ctx, path, func(rdr io.Reader) error {
-               n, err = io.Copy(w, rdr)
-               if err == nil && n != stat.Size() {
-                       err = io.ErrUnexpectedEOF
-               }
+       if err := v.lock(ctx); err != nil {
                return err
-       })
-       return int(n), err
+       }
+       defer v.unlock()
+       f, err := v.os.Open(path)
+       if err != nil {
+               return err
+       }
+       defer f.Close()
+       src := newCountingReader(ioutil.NopCloser(f), v.os.stats.TickInBytes)
+       dst := io.NewOffsetWriter(w, 0)
+       n, err := io.Copy(dst, src)
+       if err == nil && n != stat.Size() {
+               err = io.ErrUnexpectedEOF
+       }
+       return err
 }
 
 // BlockWrite stores a block on the volume. If it already exists, its
 // timestamp is updated.
-func (v *UnixVolume) BlockWrite(ctx context.Context, hash string, data []byte) error {
+func (v *unixVolume) BlockWrite(ctx context.Context, hash string, data []byte) error {
        if v.isFull() {
                return errFull
        }
@@ -293,7 +287,7 @@ func (v *UnixVolume) BlockWrite(ctx context.Context, hash string, data []byte) e
 var blockDirRe = regexp.MustCompile(`^[0-9a-f]+$`)
 var blockFileRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
 
-func (v *UnixVolume) Index(ctx context.Context, prefix string, w io.Writer) error {
+func (v *unixVolume) Index(ctx context.Context, prefix string, w io.Writer) error {
        rootdir, err := v.os.Open(v.Root)
        if err != nil {
                return err
@@ -374,7 +368,7 @@ func (v *UnixVolume) Index(ctx context.Context, prefix string, w io.Writer) erro
 // BlobTrashLifetime == 0, the block is deleted; otherwise, the block
 // is renamed as path/{loc}.trash.{deadline}, where deadline = now +
 // BlobTrashLifetime.
-func (v *UnixVolume) BlockTrash(loc string) error {
+func (v *unixVolume) BlockTrash(loc string) error {
        // Touch() must be called before calling Write() on a block.  Touch()
        // also uses lockfile().  This avoids a race condition between Write()
        // and Trash() because either (a) the file will be trashed and Touch()
@@ -417,7 +411,7 @@ func (v *UnixVolume) BlockTrash(loc string) error {
 // BlockUntrash moves block from trash back into store
 // Look for path/{loc}.trash.{deadline} in storage,
 // and rename the first such file as path/{loc}
-func (v *UnixVolume) BlockUntrash(hash string) error {
+func (v *unixVolume) BlockUntrash(hash string) error {
        v.os.stats.TickOps("readdir")
        v.os.stats.Tick(&v.os.stats.ReaddirOps)
        files, err := ioutil.ReadDir(v.blockDir(hash))
@@ -450,19 +444,19 @@ func (v *UnixVolume) BlockUntrash(hash string) error {
 
 // blockDir returns the fully qualified directory name for the directory
 // where loc is (or would be) stored on this volume.
-func (v *UnixVolume) blockDir(loc string) string {
+func (v *unixVolume) blockDir(loc string) string {
        return filepath.Join(v.Root, loc[0:3])
 }
 
 // blockPath returns the fully qualified pathname for the path to loc
 // on this volume.
-func (v *UnixVolume) blockPath(loc string) string {
+func (v *unixVolume) blockPath(loc string) string {
        return filepath.Join(v.blockDir(loc), loc)
 }
 
 // isFull returns true if the free space on the volume is less than
 // MinFreeKilobytes.
-func (v *UnixVolume) isFull() (isFull bool) {
+func (v *unixVolume) isFull() (isFull bool) {
        fullSymlink := v.Root + "/full"
 
        // Check if the volume has been marked as full in the last hour.
@@ -492,7 +486,7 @@ func (v *UnixVolume) isFull() (isFull bool) {
 
 // FreeDiskSpace returns the number of unused 1k blocks available on
 // the volume.
-func (v *UnixVolume) FreeDiskSpace() (free uint64, err error) {
+func (v *unixVolume) FreeDiskSpace() (free uint64, err error) {
        var fs syscall.Statfs_t
        err = syscall.Statfs(v.Root, &fs)
        if err == nil {
@@ -504,14 +498,14 @@ func (v *UnixVolume) FreeDiskSpace() (free uint64, err error) {
 }
 
 // InternalStats returns I/O and filesystem ops counters.
-func (v *UnixVolume) InternalStats() interface{} {
+func (v *unixVolume) InternalStats() interface{} {
        return &v.os.stats
 }
 
 // lock acquires the serialize lock, if one is in use. If ctx is done
 // before the lock is acquired, lock returns ctx.Err() instead of
 // acquiring the lock.
-func (v *UnixVolume) lock(ctx context.Context) error {
+func (v *unixVolume) lock(ctx context.Context) error {
        if v.locker == nil {
                return nil
        }
@@ -535,7 +529,7 @@ func (v *UnixVolume) lock(ctx context.Context) error {
 }
 
 // unlock releases the serialize lock, if one is in use.
-func (v *UnixVolume) unlock() {
+func (v *unixVolume) unlock() {
        if v.locker == nil {
                return
        }
@@ -543,7 +537,7 @@ func (v *UnixVolume) unlock() {
 }
 
 // lockfile and unlockfile use flock(2) to manage kernel file locks.
-func (v *UnixVolume) lockfile(f *os.File) error {
+func (v *unixVolume) lockfile(f *os.File) error {
        v.os.stats.TickOps("flock")
        v.os.stats.Tick(&v.os.stats.FlockOps)
        err := syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
@@ -551,7 +545,7 @@ func (v *UnixVolume) lockfile(f *os.File) error {
        return err
 }
 
-func (v *UnixVolume) unlockfile(f *os.File) error {
+func (v *unixVolume) unlockfile(f *os.File) error {
        err := syscall.Flock(int(f.Fd()), syscall.LOCK_UN)
        v.os.stats.TickErr(err)
        return err
@@ -559,7 +553,7 @@ func (v *UnixVolume) unlockfile(f *os.File) error {
 
 // Where appropriate, translate a more specific filesystem error to an
 // error recognized by handlers, like os.ErrNotExist.
-func (v *UnixVolume) translateError(err error) error {
+func (v *unixVolume) translateError(err error) error {
        switch err.(type) {
        case *os.PathError:
                // stat() returns a PathError if the parent directory
@@ -574,7 +568,7 @@ var unixTrashLocRegexp = regexp.MustCompile(`/([0-9a-f]{32})\.trash\.(\d+)$`)
 
 // EmptyTrash walks hierarchy looking for {hash}.trash.*
 // and deletes those with deadline < now.
-func (v *UnixVolume) EmptyTrash() {
+func (v *unixVolume) EmptyTrash() {
        var bytesDeleted, bytesInTrash int64
        var blocksDeleted, blocksInTrash int64