X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/56d363875a42c1abd485c67e030f625299548aa0..f69d2824c997c53caa11d30ba816768bad52e12b:/services/keepstore/volume_unix.go diff --git a/services/keepstore/volume_unix.go b/services/keepstore/volume_unix.go index e137045ef6..20bc9c50af 100644 --- a/services/keepstore/volume_unix.go +++ b/services/keepstore/volume_unix.go @@ -50,8 +50,9 @@ type IOResponse struct { // request. // type UnixVolume struct { - root string // path to this volume - queue chan *IORequest + root string // path to this volume + queue chan *IORequest + readonly bool } func (v *UnixVolume) IOHandler() { @@ -67,14 +68,17 @@ func (v *UnixVolume) IOHandler() { } } -func MakeUnixVolume(root string, serialize bool) (v UnixVolume) { +func MakeUnixVolume(root string, serialize bool, readonly bool) *UnixVolume { + v := &UnixVolume{ + root: root, + queue: nil, + readonly: readonly, + } if serialize { - v = UnixVolume{root, make(chan *IORequest)} + v.queue =make(chan *IORequest) go v.IOHandler() - } else { - v = UnixVolume{root, nil} } - return + return v } func (v *UnixVolume) Get(loc string) ([]byte, error) { @@ -88,6 +92,9 @@ func (v *UnixVolume) Get(loc string) ([]byte, error) { } func (v *UnixVolume) Put(loc string, block []byte) error { + if v.readonly { + return MethodDisabledError + } if v.queue == nil { return v.Write(loc, block) } @@ -98,18 +105,33 @@ func (v *UnixVolume) Put(loc string, block []byte) error { } func (v *UnixVolume) Touch(loc string) error { + if v.readonly { + return MethodDisabledError + } p := v.blockPath(loc) f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644) if err != nil { return err } - lockfile(f) + defer f.Close() + if e := lockfile(f); e != nil { + return e + } defer unlockfile(f) now := time.Now().Unix() utime := syscall.Utimbuf{now, now} return syscall.Utime(p, &utime) } +func (v *UnixVolume) Mtime(loc string) (time.Time, error) { + p := v.blockPath(loc) + if fi, err := os.Stat(p); err != nil { + return time.Time{}, err + } else { + return fi.ModTime(), nil + } +} + // Read retrieves a block identified by the locator string "loc", and // returns its contents as a byte slice. // @@ -243,24 +265,38 @@ func (v *UnixVolume) Index(prefix string) (output string) { } func (v *UnixVolume) Delete(loc string) error { + // Touch() must be called before calling Write() on a block. Touch() + // also uses lockfile(). This avoids a race condition between Write() + // and Delete() because either (a) the file will be deleted and Touch() + // will signal to the caller that the file is not present (and needs to + // be re-written), or (b) Touch() will update the file's timestamp and + // Delete() will read the correct up-to-date timestamp and choose not to + // delete the file. + + if v.readonly { + return MethodDisabledError + } p := v.blockPath(loc) f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644) if err != nil { return err } - lockfile(f) + defer f.Close() + if e := lockfile(f); e != nil { + return e + } defer unlockfile(f) - // Return PermissionError if the block has been PUT more recently - // than -permission_ttl. This guards against a race condition - // where a block is old enough that Data Manager has added it to - // the trash list, but the user submitted a PUT for the block - // since then. + // If the block has been PUT more recently than -permission_ttl, + // return success without removing the block. This guards against + // a race condition where a block is old enough that Data Manager + // has added it to the trash list, but the user submitted a PUT + // for the block since then. if fi, err := os.Stat(p); err != nil { return err } else { if time.Since(fi.ModTime()) < permission_ttl { - return PermissionError + return nil } } return os.Remove(p) @@ -327,11 +363,15 @@ func (v *UnixVolume) String() string { return fmt.Sprintf("[UnixVolume %s]", v.root) } +func (v *UnixVolume) Writable() bool { + return !v.readonly +} + // lockfile and unlockfile use flock(2) to manage kernel file locks. -func lockfile(f os.File) error { +func lockfile(f *os.File) error { return syscall.Flock(int(f.Fd()), syscall.LOCK_EX) } -func unlockfile(f os.File) error { +func unlockfile(f *os.File) error { return syscall.Flock(int(f.Fd()), syscall.LOCK_UN) }