5745: Serialize writes and data reads, but allow concurrent requests
[arvados.git] / services / keepstore / volume_unix.go
index 3b7c9930f780cf0b735016bbaac0b7c243a6d780..bcf57c1647ff54b2e5d2efd1432d54ab15be0559 100644 (file)
@@ -4,104 +4,24 @@ package main
 
 import (
        "fmt"
+       "io"
        "io/ioutil"
        "log"
        "os"
        "path/filepath"
        "strconv"
        "strings"
+       "sync"
        "syscall"
        "time"
 )
 
-// IORequests are encapsulated Get or Put requests.  They are used to
-// implement serialized I/O (i.e. only one read/write operation per
-// volume). When running in serialized mode, the Keep front end sends
-// IORequests on a channel to an IORunner, which handles them one at a
-// time and returns an IOResponse.
-//
-type IOMethod int
-
-const (
-       KeepGet IOMethod = iota
-       KeepPut
-)
-
-type IORequest struct {
-       method IOMethod
-       loc    string
-       data   []byte
-       reply  chan *IOResponse
-}
-
-type IOResponse struct {
-       data []byte
-       err  error
-}
-
-// A UnixVolume has the following properties:
-//
-//   root
-//       the path to the volume's root directory
-//   queue
-//       A channel of IORequests. If non-nil, all I/O requests for
-//       this volume should be queued on this channel; the result
-//       will be delivered on the IOResponse channel supplied in the
-//       request.
-//
+// A UnixVolume stores and retrieves blocks in a local directory.
 type UnixVolume struct {
-       root     string // path to this volume
-       queue    chan *IORequest
-       readonly bool
-}
-
-func (v *UnixVolume) IOHandler() {
-       for req := range v.queue {
-               var result IOResponse
-               switch req.method {
-               case KeepGet:
-                       result.data, result.err = v.Read(req.loc)
-               case KeepPut:
-                       result.err = v.Write(req.loc, req.data)
-               }
-               req.reply <- &result
-       }
-}
-
-func MakeUnixVolume(root string, serialize bool, readonly bool) *UnixVolume {
-       v := &UnixVolume{
-               root:     root,
-               queue:    nil,
-               readonly: readonly,
-       }
-       if serialize {
-               v.queue = make(chan *IORequest)
-               go v.IOHandler()
-       }
-       return v
-}
-
-func (v *UnixVolume) Get(loc string) ([]byte, error) {
-       if v.queue == nil {
-               return v.Read(loc)
-       }
-       reply := make(chan *IOResponse)
-       v.queue <- &IORequest{KeepGet, loc, nil, reply}
-       response := <-reply
-       return response.data, response.err
-}
-
-func (v *UnixVolume) Put(loc string, block []byte) error {
-       if v.readonly {
-               return MethodDisabledError
-       }
-       if v.queue == nil {
-               return v.Write(loc, block)
-       }
-       reply := make(chan *IOResponse)
-       v.queue <- &IORequest{KeepPut, loc, block, reply}
-       response := <-reply
-       return response.err
+       root      string // path to the volume's root directory
+       serialize bool
+       readonly  bool
+       mutex     sync.Mutex
 }
 
 func (v *UnixVolume) Touch(loc string) error {
@@ -114,6 +34,10 @@ func (v *UnixVolume) Touch(loc string) error {
                return err
        }
        defer f.Close()
+       if v.serialize {
+               v.mutex.Lock()
+               defer v.mutex.Unlock()
+       }
        if e := lockfile(f); e != nil {
                return e
        }
@@ -132,28 +56,32 @@ func (v *UnixVolume) Mtime(loc string) (time.Time, error) {
        }
 }
 
-// Read retrieves a block identified by the locator string "loc", and
+// Get retrieves a block identified by the locator string "loc", and
 // returns its contents as a byte slice.
 //
-// If the block could not be opened or read, Read returns a nil slice
-// and the os.Error that was generated.
-//
-// If the block is present but its content hash does not match loc,
-// Read returns the block and a CorruptError.  It is the caller's
-// responsibility to decide what (if anything) to do with the
-// corrupted data block.
-//
-func (v *UnixVolume) Read(loc string) ([]byte, error) {
-       buf, err := ioutil.ReadFile(v.blockPath(loc))
+// If the block could not be found, opened, or read, Get returns a nil
+// slice and whatever non-nil error was returned by Stat or ReadFile.
+func (v *UnixVolume) Get(loc string) ([]byte, error) {
+       path := v.blockPath(loc)
+       if _, err := os.Stat(path); err != nil {
+               return nil, err
+       }
+       if v.serialize {
+               v.mutex.Lock()
+               defer v.mutex.Unlock()
+       }
+       buf, err := ioutil.ReadFile(path)
        return buf, err
 }
 
-// Write stores a block of data identified by the locator string
+// Put stores a block of data identified by the locator string
 // "loc".  It returns nil on success.  If the volume is full, it
 // returns a FullError.  If the write fails due to some other error,
 // that error is returned.
-//
-func (v *UnixVolume) Write(loc string, block []byte) error {
+func (v *UnixVolume) Put(loc string, block []byte) error {
+       if v.readonly {
+               return MethodDisabledError
+       }
        if v.IsFull() {
                return FullError
        }
@@ -171,8 +99,14 @@ func (v *UnixVolume) Write(loc string, block []byte) error {
        }
        bpath := v.blockPath(loc)
 
+       if v.serialize {
+               v.mutex.Lock()
+               defer v.mutex.Unlock()
+       }
        if _, err := tmpfile.Write(block); err != nil {
                log.Printf("%s: writing to %s: %s\n", v, bpath, err)
+               tmpfile.Close()
+               os.Remove(tmpfile.Name())
                return err
        }
        if err := tmpfile.Close(); err != nil {
@@ -215,14 +149,13 @@ func (v *UnixVolume) Status() *VolumeStatus {
        return &VolumeStatus{v.root, devnum, free, used}
 }
 
-// Index returns a list of blocks found on this volume which begin with
-// the specified prefix. If the prefix is an empty string, Index returns
-// a complete list of blocks.
+// IndexTo writes (to the given Writer) a list of blocks found on this
+// volume which begin with the specified prefix. If the prefix is an
+// empty string, IndexTo writes a complete list of blocks.
 //
-// The return value is a multiline string (separated by
-// newlines). Each line is in the format
+// Each block is given in the format
 //
-//     locator+size modification-time
+//     locator+size modification-time {newline}
 //
 // e.g.:
 //
@@ -230,38 +163,32 @@ func (v *UnixVolume) Status() *VolumeStatus {
 //     e4d41e6fd68460e0e3fc18cc746959d2+67108864 1377796043
 //     e4de7a2810f5554cd39b36d8ddb132ff+67108864 1388701136
 //
-func (v *UnixVolume) Index(prefix string) (output string) {
-       filepath.Walk(v.root,
+func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
+       return filepath.Walk(v.root,
                func(path string, info os.FileInfo, err error) error {
-                       // This WalkFunc inspects each path in the volume
-                       // and prints an index line for all files that begin
-                       // with prefix.
                        if err != nil {
-                               log.Printf("IndexHandler: %s: walking to %s: %s",
+                               log.Printf("%s: IndexTo Walk error at %s: %s",
                                        v, path, err)
                                return nil
                        }
-                       locator := filepath.Base(path)
-                       // Skip directories that do not match prefix.
-                       // We know there is nothing interesting inside.
+                       basename := filepath.Base(path)
                        if info.IsDir() &&
-                               !strings.HasPrefix(locator, prefix) &&
-                               !strings.HasPrefix(prefix, locator) {
+                               !strings.HasPrefix(basename, prefix) &&
+                               !strings.HasPrefix(prefix, basename) {
+                               // Skip directories that do not match
+                               // prefix. We know there is nothing
+                               // interesting inside.
                                return filepath.SkipDir
                        }
-                       // Skip any file that is not apparently a locator, e.g. .meta files
-                       if !IsValidLocator(locator) {
+                       if info.IsDir() ||
+                               !IsValidLocator(basename) ||
+                               !strings.HasPrefix(basename, prefix) {
                                return nil
                        }
-                       // Print filenames beginning with prefix
-                       if !info.IsDir() && strings.HasPrefix(locator, prefix) {
-                               output = output + fmt.Sprintf(
-                                       "%s+%d %d\n", locator, info.Size(), info.ModTime().Unix())
-                       }
-                       return nil
+                       _, err = fmt.Fprintf(w, "%s+%d %d\n",
+                               basename, info.Size(), info.ModTime().Unix())
+                       return err
                })
-
-       return
 }
 
 func (v *UnixVolume) Delete(loc string) error {
@@ -276,6 +203,10 @@ func (v *UnixVolume) Delete(loc string) error {
        if v.readonly {
                return MethodDisabledError
        }
+       if v.serialize {
+               v.mutex.Lock()
+               defer v.mutex.Unlock()
+       }
        p := v.blockPath(loc)
        f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
        if err != nil {