"fmt"
"io"
"io/ioutil"
- "log"
"os"
"path/filepath"
"regexp"
"sync"
"syscall"
"time"
+
+ log "github.com/Sirupsen/logrus"
)
type unixVolumeAdder struct {
return err
}
defer f.Close()
- if v.locker != nil {
- v.locker.Lock()
- defer v.locker.Unlock()
+ if err := v.lock(context.TODO()); err != nil {
+ return err
}
+ defer v.unlock()
if e := lockfile(f); e != nil {
return e
}
// Lock the locker (if one is in use), open the file for reading, and
// call the given function if and when the file is ready to read.
func (v *UnixVolume) getFunc(ctx context.Context, path string, fn func(io.Reader) error) error {
- if v.locker != nil {
- v.locker.Lock()
- defer v.locker.Unlock()
- }
- if ctx.Err() != nil {
- return ctx.Err()
+ if err := v.lock(ctx); err != nil {
+ return err
}
+ defer v.unlock()
f, err := os.Open(path)
if err != nil {
return err
// Get retrieves a block, copies it to the given slice, and returns
// the number of bytes copied.
func (v *UnixVolume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
+ return getWithPipe(ctx, loc, buf, v)
+}
+
+// ReadBlock implements BlockReader.
+func (v *UnixVolume) ReadBlock(ctx context.Context, loc string, w io.Writer) error {
path := v.blockPath(loc)
stat, err := v.stat(path)
if err != nil {
- return 0, v.translateError(err)
- }
- if stat.Size() > int64(len(buf)) {
- return 0, TooLongError
+ return v.translateError(err)
}
- var read int
- size := int(stat.Size())
- err = v.getFunc(ctx, path, func(rdr io.Reader) error {
- read, err = io.ReadFull(rdr, buf[:size])
+ return v.getFunc(ctx, path, func(rdr io.Reader) error {
+ n, err := io.Copy(w, rdr)
+ if err == nil && n != stat.Size() {
+ err = io.ErrUnexpectedEOF
+ }
return err
})
- return read, err
}
// Compare returns nil if Get(loc) would return the same content as
// returns a FullError. If the write fails due to some other error,
// that error is returned.
func (v *UnixVolume) Put(ctx context.Context, loc string, block []byte) error {
+ return putWithPipe(ctx, loc, block, v)
+}
+
+// ReadBlock implements BlockWriter.
+func (v *UnixVolume) WriteBlock(ctx context.Context, loc string, rdr io.Reader) error {
if v.ReadOnly {
return MethodDisabledError
}
log.Printf("ioutil.TempFile(%s, tmp%s): %s", bdir, loc, tmperr)
return tmperr
}
+
bpath := v.blockPath(loc)
- if v.locker != nil {
- v.locker.Lock()
- defer v.locker.Unlock()
- }
- select {
- case <-ctx.Done():
- return ctx.Err()
- default:
+ if err := v.lock(ctx); err != nil {
+ return err
}
- if _, err := tmpfile.Write(block); err != nil {
+ defer v.unlock()
+ if _, err := io.Copy(tmpfile, rdr); err != nil {
log.Printf("%s: writing to %s: %s\n", v, bpath, err)
tmpfile.Close()
os.Remove(tmpfile.Name())
if v.ReadOnly {
return MethodDisabledError
}
- if v.locker != nil {
- v.locker.Lock()
- defer v.locker.Unlock()
+ if err := v.lock(context.TODO()); err != nil {
+ return err
}
+ defer v.unlock()
p := v.blockPath(loc)
f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
if err != nil {
return v.DirectoryReplication
}
+// lock acquires the serialize lock, if one is in use. If ctx is done
+// before the lock is acquired, lock returns ctx.Err() instead of
+// acquiring the lock.
+func (v *UnixVolume) lock(ctx context.Context) error {
+ if v.locker == nil {
+ return nil
+ }
+ locked := make(chan struct{})
+ go func() {
+ v.locker.Lock()
+ close(locked)
+ }()
+ select {
+ case <-ctx.Done():
+ go func() {
+ <-locked
+ v.locker.Unlock()
+ }()
+ return ctx.Err()
+ case <-locked:
+ return nil
+ }
+}
+
+// unlock releases the serialize lock, if one is in use.
+func (v *UnixVolume) unlock() {
+ if v.locker == nil {
+ return
+ }
+ v.locker.Unlock()
+}
+
// lockfile and unlockfile use flock(2) to manage kernel file locks.
func lockfile(f *os.File) error {
return syscall.Flock(int(f.Fd()), syscall.LOCK_EX)