Merge branch '10467-client-disconnect' closes #10467
[arvados.git] / services / keepstore / volume_unix.go
index 1c676b12e1b99dc5569c789304e2ee2976fadec6..234eec1d05a1a3babf3cd3c76a5a0841e7803b26 100644 (file)
@@ -7,7 +7,6 @@ import (
        "fmt"
        "io"
        "io/ioutil"
-       "log"
        "os"
        "path/filepath"
        "regexp"
@@ -16,6 +15,8 @@ import (
        "sync"
        "syscall"
        "time"
+
+       log "github.com/Sirupsen/logrus"
 )
 
 type unixVolumeAdder struct {
@@ -159,10 +160,10 @@ func (v *UnixVolume) Touch(loc string) error {
                return err
        }
        defer f.Close()
-       if v.locker != nil {
-               v.locker.Lock()
-               defer v.locker.Unlock()
+       if err := v.lock(context.TODO()); err != nil {
+               return err
        }
+       defer v.unlock()
        if e := lockfile(f); e != nil {
                return e
        }
@@ -183,11 +184,11 @@ func (v *UnixVolume) Mtime(loc string) (time.Time, error) {
 
 // Lock the locker (if one is in use), open the file for reading, and
 // call the given function if and when the file is ready to read.
-func (v *UnixVolume) getFunc(path string, fn func(io.Reader) error) error {
-       if v.locker != nil {
-               v.locker.Lock()
-               defer v.locker.Unlock()
+func (v *UnixVolume) getFunc(ctx context.Context, path string, fn func(io.Reader) error) error {
+       if err := v.lock(ctx); err != nil {
+               return err
        }
+       defer v.unlock()
        f, err := os.Open(path)
        if err != nil {
                return err
@@ -212,33 +213,35 @@ func (v *UnixVolume) stat(path string) (os.FileInfo, error) {
 // Get retrieves a block, copies it to the given slice, and returns
 // the number of bytes copied.
 func (v *UnixVolume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
+       return getWithPipe(ctx, loc, buf, v)
+}
+
+// ReadBlock implements BlockReader.
+func (v *UnixVolume) ReadBlock(ctx context.Context, loc string, w io.Writer) error {
        path := v.blockPath(loc)
        stat, err := v.stat(path)
        if err != nil {
-               return 0, v.translateError(err)
-       }
-       if stat.Size() > int64(len(buf)) {
-               return 0, TooLongError
+               return v.translateError(err)
        }
-       var read int
-       size := int(stat.Size())
-       err = v.getFunc(path, func(rdr io.Reader) error {
-               read, err = io.ReadFull(rdr, buf[:size])
+       return v.getFunc(ctx, path, func(rdr io.Reader) error {
+               n, err := io.Copy(w, rdr)
+               if err == nil && n != stat.Size() {
+                       err = io.ErrUnexpectedEOF
+               }
                return err
        })
-       return read, err
 }
 
 // Compare returns nil if Get(loc) would return the same content as
 // expect. It is functionally equivalent to Get() followed by
 // bytes.Compare(), but uses less memory.
-func (v *UnixVolume) Compare(loc string, expect []byte) error {
+func (v *UnixVolume) Compare(ctx context.Context, loc string, expect []byte) error {
        path := v.blockPath(loc)
        if _, err := v.stat(path); err != nil {
                return v.translateError(err)
        }
-       return v.getFunc(path, func(rdr io.Reader) error {
-               return compareReaderWithBuf(rdr, expect, loc[:32])
+       return v.getFunc(ctx, path, func(rdr io.Reader) error {
+               return compareReaderWithBuf(ctx, rdr, expect, loc[:32])
        })
 }
 
@@ -247,6 +250,11 @@ func (v *UnixVolume) Compare(loc string, expect []byte) error {
 // returns a FullError.  If the write fails due to some other error,
 // that error is returned.
 func (v *UnixVolume) Put(ctx context.Context, loc string, block []byte) error {
+       return putWithPipe(ctx, loc, block, v)
+}
+
+// ReadBlock implements BlockWriter.
+func (v *UnixVolume) WriteBlock(ctx context.Context, loc string, rdr io.Reader) error {
        if v.ReadOnly {
                return MethodDisabledError
        }
@@ -265,18 +273,14 @@ func (v *UnixVolume) Put(ctx context.Context, loc string, block []byte) error {
                log.Printf("ioutil.TempFile(%s, tmp%s): %s", bdir, loc, tmperr)
                return tmperr
        }
+
        bpath := v.blockPath(loc)
 
-       if v.locker != nil {
-               v.locker.Lock()
-               defer v.locker.Unlock()
-       }
-       select {
-       case <-ctx.Done():
-               return ctx.Err()
-       default:
+       if err := v.lock(ctx); err != nil {
+               return err
        }
-       if _, err := tmpfile.Write(block); err != nil {
+       defer v.unlock()
+       if _, err := io.Copy(tmpfile, rdr); err != nil {
                log.Printf("%s: writing to %s: %s\n", v, bpath, err)
                tmpfile.Close()
                os.Remove(tmpfile.Name())
@@ -319,7 +323,12 @@ func (v *UnixVolume) Status() *VolumeStatus {
        // uses fs.Blocks - fs.Bfree.
        free := fs.Bavail * uint64(fs.Bsize)
        used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
-       return &VolumeStatus{v.Root, devnum, free, used}
+       return &VolumeStatus{
+               MountPoint: v.Root,
+               DeviceNum:  devnum,
+               BytesFree:  free,
+               BytesUsed:  used,
+       }
 }
 
 var blockDirRe = regexp.MustCompile(`^[0-9a-f]+$`)
@@ -409,10 +418,10 @@ func (v *UnixVolume) Trash(loc string) error {
        if v.ReadOnly {
                return MethodDisabledError
        }
-       if v.locker != nil {
-               v.locker.Lock()
-               defer v.locker.Unlock()
+       if err := v.lock(context.TODO()); err != nil {
+               return err
        }
+       defer v.unlock()
        p := v.blockPath(loc)
        f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
        if err != nil {
@@ -550,6 +559,38 @@ func (v *UnixVolume) Replication() int {
        return v.DirectoryReplication
 }
 
+// lock acquires the serialize lock, if one is in use. If ctx is done
+// before the lock is acquired, lock returns ctx.Err() instead of
+// acquiring the lock.
+func (v *UnixVolume) lock(ctx context.Context) error {
+       if v.locker == nil {
+               return nil
+       }
+       locked := make(chan struct{})
+       go func() {
+               v.locker.Lock()
+               close(locked)
+       }()
+       select {
+       case <-ctx.Done():
+               go func() {
+                       <-locked
+                       v.locker.Unlock()
+               }()
+               return ctx.Err()
+       case <-locked:
+               return nil
+       }
+}
+
+// unlock releases the serialize lock, if one is in use.
+func (v *UnixVolume) unlock() {
+       if v.locker == nil {
+               return
+       }
+       v.locker.Unlock()
+}
+
 // lockfile and unlockfile use flock(2) to manage kernel file locks.
 func lockfile(f *os.File) error {
        return syscall.Flock(int(f.Fd()), syscall.LOCK_EX)