5745: Serialize writes and data reads, but allow concurrent requests
authorTom Clegg <tom@curoverse.com>
Wed, 6 May 2015 16:56:34 +0000 (12:56 -0400)
committerTom Clegg <tom@curoverse.com>
Thu, 7 May 2015 16:45:06 +0000 (12:45 -0400)
to do read-only non-data operations (like finding existing blocks and
checking free disk space) which are likely to be cached by the OS and
therefore not involve any disk activity.

Also:
* Serialize Touch and Delete.
* Make sure to close and delete tempfiles on write errors.
* Update comments.

services/keepstore/handlers.go
services/keepstore/keepstore.go
services/keepstore/volume_unix.go
services/keepstore/volume_unix_test.go

index 6492045c68b1f0cbd9de2e00aaa0859ce6ec8b9a..75b56eb827da7880d24fd43befb98e551914b7d5 100644 (file)
@@ -40,35 +40,19 @@ func MakeRESTRouter() *mux.Router {
 
        rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
        rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
-
-       // For IndexHandler we support:
-       //   /index           - returns all locators
-       //   /index/{prefix}  - returns all locators that begin with {prefix}
-       //      {prefix} is a string of hexadecimal digits between 0 and 32 digits.
-       //      If {prefix} is the empty string, return an index of all locators
-       //      (so /index and /index/ behave identically)
-       //      A client may supply a full 32-digit locator string, in which
-       //      case the server will return an index with either zero or one
-       //      entries. This usage allows a client to check whether a block is
-       //      present, and its size and upload time, without retrieving the
-       //      entire block.
-       //
+       // List all blocks stored here. Privileged client only.
        rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
-       rest.HandleFunc(
-               `/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
+       // List blocks stored here whose hash has the given prefix.
+       // Privileged client only.
+       rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
+
+       // List volumes: path, device number, bytes used/avail.
        rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
 
-       // The PullHandler and TrashHandler process "PUT /pull" and "PUT
-       // /trash" requests from Data Manager.  These requests instruct
-       // Keep to replicate or delete blocks; see
-       // https://arvados.org/projects/arvados/wiki/Keep_Design_Doc
-       // for more details.
-       //
-       // Each handler parses the JSON list of block management requests
-       // in the message body, and replaces any existing pull queue or
-       // trash queue with their contentes.
-       //
+       // Replace the current pull queue.
        rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
+
+       // Replace the current trash queue.
        rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
 
        // Any request which does not match any of these routes gets
index c6cb00db0fa28b621dc306e9a1d7ec7775c41282..71e577fe54c7c5bb64c459d61ce78a8adf8951ca 100644 (file)
@@ -129,7 +129,11 @@ func (vs *volumeSet) Set(value string) error {
        if _, err := os.Stat(value); err != nil {
                return err
        }
-       *vs = append(*vs, MakeUnixVolume(value, flagSerializeIO, flagReadonly))
+       *vs = append(*vs, &UnixVolume{
+               root:      value,
+               serialize: flagSerializeIO,
+               readonly:  flagReadonly,
+       })
        return nil
 }
 
index 8d23d116185858975a4dcf267eb5d274a8365206..bcf57c1647ff54b2e5d2efd1432d54ab15be0559 100644 (file)
@@ -11,98 +11,17 @@ import (
        "path/filepath"
        "strconv"
        "strings"
+       "sync"
        "syscall"
        "time"
 )
 
-// IORequests are encapsulated Get or Put requests.  They are used to
-// implement serialized I/O (i.e. only one read/write operation per
-// volume). When running in serialized mode, the Keep front end sends
-// IORequests on a channel to an IORunner, which handles them one at a
-// time and returns an IOResponse.
-//
-type IOMethod int
-
-const (
-       KeepGet IOMethod = iota
-       KeepPut
-)
-
-type IORequest struct {
-       method IOMethod
-       loc    string
-       data   []byte
-       reply  chan *IOResponse
-}
-
-type IOResponse struct {
-       data []byte
-       err  error
-}
-
-// A UnixVolume has the following properties:
-//
-//   root
-//       the path to the volume's root directory
-//   queue
-//       A channel of IORequests. If non-nil, all I/O requests for
-//       this volume should be queued on this channel; the result
-//       will be delivered on the IOResponse channel supplied in the
-//       request.
-//
+// A UnixVolume stores and retrieves blocks in a local directory.
 type UnixVolume struct {
-       root     string // path to this volume
-       queue    chan *IORequest
-       readonly bool
-}
-
-func (v *UnixVolume) IOHandler() {
-       for req := range v.queue {
-               var result IOResponse
-               switch req.method {
-               case KeepGet:
-                       result.data, result.err = v.Read(req.loc)
-               case KeepPut:
-                       result.err = v.Write(req.loc, req.data)
-               }
-               req.reply <- &result
-       }
-}
-
-func MakeUnixVolume(root string, serialize bool, readonly bool) *UnixVolume {
-       v := &UnixVolume{
-               root:     root,
-               queue:    nil,
-               readonly: readonly,
-       }
-       if serialize {
-               v.queue = make(chan *IORequest)
-               go v.IOHandler()
-       }
-       return v
-}
-
-func (v *UnixVolume) Get(loc string) ([]byte, error) {
-       if v.queue == nil {
-               return v.Read(loc)
-       }
-       reply := make(chan *IOResponse)
-       v.queue <- &IORequest{KeepGet, loc, nil, reply}
-       response := <-reply
-       return response.data, response.err
-}
-
-func (v *UnixVolume) Put(loc string, block []byte) error {
-       if v.readonly {
-               return MethodDisabledError
-       }
-       if v.queue == nil {
-               return v.Write(loc, block)
-       }
-       reply := make(chan *IOResponse)
-       v.queue <- &IORequest{KeepPut, loc, block, reply}
-       response := <-reply
-       return response.err
+       root      string // path to the volume's root directory
+       serialize bool
+       readonly  bool
+       mutex     sync.Mutex
 }
 
 func (v *UnixVolume) Touch(loc string) error {
@@ -115,6 +34,10 @@ func (v *UnixVolume) Touch(loc string) error {
                return err
        }
        defer f.Close()
+       if v.serialize {
+               v.mutex.Lock()
+               defer v.mutex.Unlock()
+       }
        if e := lockfile(f); e != nil {
                return e
        }
@@ -133,28 +56,32 @@ func (v *UnixVolume) Mtime(loc string) (time.Time, error) {
        }
 }
 
-// Read retrieves a block identified by the locator string "loc", and
+// Get retrieves a block identified by the locator string "loc", and
 // returns its contents as a byte slice.
 //
-// If the block could not be opened or read, Read returns a nil slice
-// and the os.Error that was generated.
-//
-// If the block is present but its content hash does not match loc,
-// Read returns the block and a CorruptError.  It is the caller's
-// responsibility to decide what (if anything) to do with the
-// corrupted data block.
-//
-func (v *UnixVolume) Read(loc string) ([]byte, error) {
-       buf, err := ioutil.ReadFile(v.blockPath(loc))
+// If the block could not be found, opened, or read, Get returns a nil
+// slice and whatever non-nil error was returned by Stat or ReadFile.
+func (v *UnixVolume) Get(loc string) ([]byte, error) {
+       path := v.blockPath(loc)
+       if _, err := os.Stat(path); err != nil {
+               return nil, err
+       }
+       if v.serialize {
+               v.mutex.Lock()
+               defer v.mutex.Unlock()
+       }
+       buf, err := ioutil.ReadFile(path)
        return buf, err
 }
 
-// Write stores a block of data identified by the locator string
+// Put stores a block of data identified by the locator string
 // "loc".  It returns nil on success.  If the volume is full, it
 // returns a FullError.  If the write fails due to some other error,
 // that error is returned.
-//
-func (v *UnixVolume) Write(loc string, block []byte) error {
+func (v *UnixVolume) Put(loc string, block []byte) error {
+       if v.readonly {
+               return MethodDisabledError
+       }
        if v.IsFull() {
                return FullError
        }
@@ -172,8 +99,14 @@ func (v *UnixVolume) Write(loc string, block []byte) error {
        }
        bpath := v.blockPath(loc)
 
+       if v.serialize {
+               v.mutex.Lock()
+               defer v.mutex.Unlock()
+       }
        if _, err := tmpfile.Write(block); err != nil {
                log.Printf("%s: writing to %s: %s\n", v, bpath, err)
+               tmpfile.Close()
+               os.Remove(tmpfile.Name())
                return err
        }
        if err := tmpfile.Close(); err != nil {
@@ -270,6 +203,10 @@ func (v *UnixVolume) Delete(loc string) error {
        if v.readonly {
                return MethodDisabledError
        }
+       if v.serialize {
+               v.mutex.Lock()
+               defer v.mutex.Unlock()
+       }
        p := v.blockPath(loc)
        f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
        if err != nil {
index 6b39f8ff601000139f324039fc04385f640fef48..1320d315858d83b7c84064e528a43b792ab5f19e 100644 (file)
@@ -15,19 +15,20 @@ func TempUnixVolume(t *testing.T, serialize bool, readonly bool) *UnixVolume {
        if err != nil {
                t.Fatal(err)
        }
-       return MakeUnixVolume(d, serialize, readonly)
+       return &UnixVolume{
+               root:      d,
+               serialize: serialize,
+               readonly:  readonly,
+       }
 }
 
 func _teardown(v *UnixVolume) {
-       if v.queue != nil {
-               close(v.queue)
-       }
        os.RemoveAll(v.root)
 }
 
-// store writes a Keep block directly into a UnixVolume, for testing
-// UnixVolume methods.
-//
+// _store writes a Keep block directly into a UnixVolume, bypassing
+// the overhead and safeguards of Put(). Useful for storing bogus data
+// and isolating unit tests from Put() behavior.
 func _store(t *testing.T, vol *UnixVolume, filename string, block []byte) {
        blockdir := fmt.Sprintf("%s/%s", vol.root, filename[:3])
        if err := os.MkdirAll(blockdir, 0755); err != nil {