X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/b9d2799dfebae724dda3b3e28641116ca5daf5c7..b9e5c8b32858338850da3e12ce27570b828898b3:/services/keepstore/volume_unix.go diff --git a/services/keepstore/volume_unix.go b/services/keepstore/volume_unix.go index 92c897eac0..fff02aac26 100644 --- a/services/keepstore/volume_unix.go +++ b/services/keepstore/volume_unix.go @@ -2,11 +2,11 @@ package main import ( "bufio" + "context" "flag" "fmt" "io" "io/ioutil" - "log" "os" "path/filepath" "regexp" @@ -15,6 +15,8 @@ import ( "sync" "syscall" "time" + + log "github.com/Sirupsen/logrus" ) type unixVolumeAdder struct { @@ -101,9 +103,10 @@ func (vs *unixVolumeAdder) Discover() int { // A UnixVolume stores and retrieves blocks in a local directory. type UnixVolume struct { - Root string // path to the volume's root directory - ReadOnly bool - Serialize bool + Root string // path to the volume's root directory + ReadOnly bool + Serialize bool + DirectoryReplication int // something to lock during IO, typically a sync.Mutex (or nil // to skip locking) @@ -114,12 +117,14 @@ type UnixVolume struct { func (*UnixVolume) Examples() []Volume { return []Volume{ &UnixVolume{ - Root: "/mnt/local-disk", - Serialize: true, + Root: "/mnt/local-disk", + Serialize: true, + DirectoryReplication: 1, }, &UnixVolume{ - Root: "/mnt/network-disk", - Serialize: false, + Root: "/mnt/network-disk", + Serialize: false, + DirectoryReplication: 2, }, } } @@ -137,6 +142,9 @@ func (v *UnixVolume) Start() error { if !strings.HasPrefix(v.Root, "/") { return fmt.Errorf("volume root does not start with '/': %q", v.Root) } + if v.DirectoryReplication == 0 { + v.DirectoryReplication = 1 + } _, err := os.Stat(v.Root) return err } @@ -176,11 +184,14 @@ func (v *UnixVolume) Mtime(loc string) (time.Time, error) { // Lock the locker (if one is in use), open the file for reading, and // call the given function if and when the file is ready to read. -func (v *UnixVolume) getFunc(path string, fn func(io.Reader) error) error { +func (v *UnixVolume) getFunc(ctx context.Context, path string, fn func(io.Reader) error) error { if v.locker != nil { v.locker.Lock() defer v.locker.Unlock() } + if ctx.Err() != nil { + return ctx.Err() + } f, err := os.Open(path) if err != nil { return err @@ -204,7 +215,7 @@ func (v *UnixVolume) stat(path string) (os.FileInfo, error) { // Get retrieves a block, copies it to the given slice, and returns // the number of bytes copied. -func (v *UnixVolume) Get(loc string, buf []byte) (int, error) { +func (v *UnixVolume) Get(ctx context.Context, loc string, buf []byte) (int, error) { path := v.blockPath(loc) stat, err := v.stat(path) if err != nil { @@ -215,7 +226,7 @@ func (v *UnixVolume) Get(loc string, buf []byte) (int, error) { } var read int size := int(stat.Size()) - err = v.getFunc(path, func(rdr io.Reader) error { + err = v.getFunc(ctx, path, func(rdr io.Reader) error { read, err = io.ReadFull(rdr, buf[:size]) return err }) @@ -225,13 +236,13 @@ func (v *UnixVolume) Get(loc string, buf []byte) (int, error) { // Compare returns nil if Get(loc) would return the same content as // expect. It is functionally equivalent to Get() followed by // bytes.Compare(), but uses less memory. -func (v *UnixVolume) Compare(loc string, expect []byte) error { +func (v *UnixVolume) Compare(ctx context.Context, loc string, expect []byte) error { path := v.blockPath(loc) if _, err := v.stat(path); err != nil { return v.translateError(err) } - return v.getFunc(path, func(rdr io.Reader) error { - return compareReaderWithBuf(rdr, expect, loc[:32]) + return v.getFunc(ctx, path, func(rdr io.Reader) error { + return compareReaderWithBuf(ctx, rdr, expect, loc[:32]) }) } @@ -239,7 +250,7 @@ func (v *UnixVolume) Compare(loc string, expect []byte) error { // "loc". It returns nil on success. If the volume is full, it // returns a FullError. If the write fails due to some other error, // that error is returned. -func (v *UnixVolume) Put(loc string, block []byte) error { +func (v *UnixVolume) Put(ctx context.Context, loc string, block []byte) error { if v.ReadOnly { return MethodDisabledError } @@ -264,6 +275,11 @@ func (v *UnixVolume) Put(loc string, block []byte) error { v.locker.Lock() defer v.locker.Unlock() } + select { + case <-ctx.Done(): + return ctx.Err() + default: + } if _, err := tmpfile.Write(block); err != nil { log.Printf("%s: writing to %s: %s\n", v, bpath, err) tmpfile.Close() @@ -307,7 +323,12 @@ func (v *UnixVolume) Status() *VolumeStatus { // uses fs.Blocks - fs.Bfree. free := fs.Bavail * uint64(fs.Bsize) used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize) - return &VolumeStatus{v.Root, devnum, free, used} + return &VolumeStatus{ + MountPoint: v.Root, + DeviceNum: devnum, + BytesFree: free, + BytesUsed: used, + } } var blockDirRe = regexp.MustCompile(`^[0-9a-f]+$`) @@ -533,9 +554,9 @@ func (v *UnixVolume) Writable() bool { } // Replication returns the number of replicas promised by the -// underlying device (currently assumed to be 1). +// underlying device (as specified in configuration). func (v *UnixVolume) Replication() int { - return 1 + return v.DirectoryReplication } // lockfile and unlockfile use flock(2) to manage kernel file locks.