Merge branch '5523-stats-error' closes #5523
[arvados.git] / services / keepstore / volume_unix.go
index 75a75229a6861f83f45b9264bab9d577d7d0f880..8d23d116185858975a4dcf267eb5d274a8365206 100644 (file)
@@ -4,6 +4,7 @@ package main
 
 import (
        "fmt"
+       "io"
        "io/ioutil"
        "log"
        "os"
@@ -50,8 +51,9 @@ type IOResponse struct {
 //       request.
 //
 type UnixVolume struct {
-       root  string // path to this volume
-       queue chan *IORequest
+       root     string // path to this volume
+       queue    chan *IORequest
+       readonly bool
 }
 
 func (v *UnixVolume) IOHandler() {
@@ -67,14 +69,17 @@ func (v *UnixVolume) IOHandler() {
        }
 }
 
-func MakeUnixVolume(root string, serialize bool) (v UnixVolume) {
+func MakeUnixVolume(root string, serialize bool, readonly bool) *UnixVolume {
+       v := &UnixVolume{
+               root:     root,
+               queue:    nil,
+               readonly: readonly,
+       }
        if serialize {
-               v = UnixVolume{root, make(chan *IORequest)}
+               v.queue = make(chan *IORequest)
                go v.IOHandler()
-       } else {
-               v = UnixVolume{root, nil}
        }
-       return
+       return v
 }
 
 func (v *UnixVolume) Get(loc string) ([]byte, error) {
@@ -88,6 +93,9 @@ func (v *UnixVolume) Get(loc string) ([]byte, error) {
 }
 
 func (v *UnixVolume) Put(loc string, block []byte) error {
+       if v.readonly {
+               return MethodDisabledError
+       }
        if v.queue == nil {
                return v.Write(loc, block)
        }
@@ -98,11 +106,15 @@ func (v *UnixVolume) Put(loc string, block []byte) error {
 }
 
 func (v *UnixVolume) Touch(loc string) error {
+       if v.readonly {
+               return MethodDisabledError
+       }
        p := v.blockPath(loc)
        f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
        if err != nil {
                return err
        }
+       defer f.Close()
        if e := lockfile(f); e != nil {
                return e
        }
@@ -204,14 +216,13 @@ func (v *UnixVolume) Status() *VolumeStatus {
        return &VolumeStatus{v.root, devnum, free, used}
 }
 
-// Index returns a list of blocks found on this volume which begin with
-// the specified prefix. If the prefix is an empty string, Index returns
-// a complete list of blocks.
+// IndexTo writes (to the given Writer) a list of blocks found on this
+// volume which begin with the specified prefix. If the prefix is an
+// empty string, IndexTo writes a complete list of blocks.
 //
-// The return value is a multiline string (separated by
-// newlines). Each line is in the format
+// Each block is given in the format
 //
-//     locator+size modification-time
+//     locator+size modification-time {newline}
 //
 // e.g.:
 //
@@ -219,60 +230,66 @@ func (v *UnixVolume) Status() *VolumeStatus {
 //     e4d41e6fd68460e0e3fc18cc746959d2+67108864 1377796043
 //     e4de7a2810f5554cd39b36d8ddb132ff+67108864 1388701136
 //
-func (v *UnixVolume) Index(prefix string) (output string) {
-       filepath.Walk(v.root,
+func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
+       return filepath.Walk(v.root,
                func(path string, info os.FileInfo, err error) error {
-                       // This WalkFunc inspects each path in the volume
-                       // and prints an index line for all files that begin
-                       // with prefix.
                        if err != nil {
-                               log.Printf("IndexHandler: %s: walking to %s: %s",
+                               log.Printf("%s: IndexTo Walk error at %s: %s",
                                        v, path, err)
                                return nil
                        }
-                       locator := filepath.Base(path)
-                       // Skip directories that do not match prefix.
-                       // We know there is nothing interesting inside.
+                       basename := filepath.Base(path)
                        if info.IsDir() &&
-                               !strings.HasPrefix(locator, prefix) &&
-                               !strings.HasPrefix(prefix, locator) {
+                               !strings.HasPrefix(basename, prefix) &&
+                               !strings.HasPrefix(prefix, basename) {
+                               // Skip directories that do not match
+                               // prefix. We know there is nothing
+                               // interesting inside.
                                return filepath.SkipDir
                        }
-                       // Skip any file that is not apparently a locator, e.g. .meta files
-                       if !IsValidLocator(locator) {
+                       if info.IsDir() ||
+                               !IsValidLocator(basename) ||
+                               !strings.HasPrefix(basename, prefix) {
                                return nil
                        }
-                       // Print filenames beginning with prefix
-                       if !info.IsDir() && strings.HasPrefix(locator, prefix) {
-                               output = output + fmt.Sprintf(
-                                       "%s+%d %d\n", locator, info.Size(), info.ModTime().Unix())
-                       }
-                       return nil
+                       _, err = fmt.Fprintf(w, "%s+%d %d\n",
+                               basename, info.Size(), info.ModTime().Unix())
+                       return err
                })
-
-       return
 }
 
 func (v *UnixVolume) Delete(loc string) error {
+       // Touch() must be called before calling Write() on a block.  Touch()
+       // also uses lockfile().  This avoids a race condition between Write()
+       // and Delete() because either (a) the file will be deleted and Touch()
+       // will signal to the caller that the file is not present (and needs to
+       // be re-written), or (b) Touch() will update the file's timestamp and
+       // Delete() will read the correct up-to-date timestamp and choose not to
+       // delete the file.
+
+       if v.readonly {
+               return MethodDisabledError
+       }
        p := v.blockPath(loc)
        f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
        if err != nil {
                return err
        }
+       defer f.Close()
        if e := lockfile(f); e != nil {
                return e
        }
        defer unlockfile(f)
 
-       // If the block has been PUT more recently than -permission_ttl,
-       // return success without removing the block.  This guards against
-       // a race condition where a block is old enough that Data Manager
-       // has added it to the trash list, but the user submitted a PUT
-       // for the block since then.
+       // If the block has been PUT in the last blob_signature_ttl
+       // seconds, return success without removing the block. This
+       // protects data from garbage collection until it is no longer
+       // possible for clients to retrieve the unreferenced blocks
+       // anyway (because the permission signatures have expired).
        if fi, err := os.Stat(p); err != nil {
                return err
        } else {
-               if time.Since(fi.ModTime()) < permission_ttl {
+               if time.Since(fi.ModTime()) < blob_signature_ttl {
                        return nil
                }
        }
@@ -340,6 +357,10 @@ func (v *UnixVolume) String() string {
        return fmt.Sprintf("[UnixVolume %s]", v.root)
 }
 
+func (v *UnixVolume) Writable() bool {
+       return !v.readonly
+}
+
 // lockfile and unlockfile use flock(2) to manage kernel file locks.
 func lockfile(f *os.File) error {
        return syscall.Flock(int(f.Fd()), syscall.LOCK_EX)