Added -serialize flag.
authorTim Pierce <twp@curoverse.com>
Thu, 24 Apr 2014 22:02:49 +0000 (18:02 -0400)
committerTim Pierce <twp@curoverse.com>
Thu, 24 Apr 2014 22:02:49 +0000 (18:02 -0400)
Added IORequest and IOResponse types for communicating I/O
requests over channels with a goroutine.

New IOHandler method on UnixVolume. IOHandler receives requests on a
command channel, handles them, and delivers responses. Whenever a
UnixVolume is created with a non-nil queue, an IOHandler must be started
to handle its requests.

UnixVolume methods Get and Put now handle external I/O requests.  These
methods serialize I/O requests if serialization is enabled for that
volume; otherwise they call Read and Write directly.

New unit tests: TestGetSerialized and TestPutSerialized.

Refs #2620.

services/keep/src/keep/keep.go
services/keep/src/keep/keep_test.go
services/keep/src/keep/volume.go
services/keep/src/keep/volume_test.go

index 1a70a3fc88d390e6ab6c898c3525c74f0f8fd022..9e0dfec23efa9d4885d162aade84fdf406c3334d 100644 (file)
@@ -87,10 +87,13 @@ func main() {
        //    directories.
 
        var listen, volumearg string
+       var serialize_io bool
        flag.StringVar(&listen, "listen", DEFAULT_ADDR,
                "interface on which to listen for requests, in the format ipaddr:port. e.g. -listen=10.0.1.24:8000. Use -listen=:port to listen on all network interfaces.")
        flag.StringVar(&volumearg, "volumes", "",
                "Comma-separated list of directories to use for Keep volumes, e.g. -volumes=/var/keep1,/var/keep2. If empty or not supplied, Keep will scan mounted filesystems for volumes with a /keep top-level directory.")
+       flag.BoolVar(&serialize_io, "serialize-io", false,
+               "If set, all read and write operations on local Keep volumes will be serialized.")
        flag.Parse()
 
        // Look for local keep volumes.
@@ -109,7 +112,11 @@ func main() {
        for _, v := range keepvols {
                if _, err := os.Stat(v); err == nil {
                        log.Println("adding Keep volume:", v)
-                       KeepVolumes = append(KeepVolumes, &UnixVolume{v})
+                       newvol := &UnixVolume{v, nil}
+                       if serialize_io {
+                               newvol.queue = make(chan *IORequest)
+                       }
+                       KeepVolumes = append(KeepVolumes, newvol)
                } else {
                        log.Printf("bad Keep volume: %s\n", err)
                }
@@ -309,7 +316,7 @@ func GetVolumeStatus(volume string) *VolumeStatus {
 func GetBlock(hash string) ([]byte, error) {
        // Attempt to read the requested hash from a keep volume.
        for _, vol := range KeepVolumes {
-               if buf, err := vol.Read(hash); err != nil {
+               if buf, err := vol.Get(hash); err != nil {
                        // IsNotExist is an expected error and may be ignored.
                        // (If all volumes report IsNotExist, we return a NotFoundError)
                        // A CorruptError should be returned immediately.
@@ -394,7 +401,7 @@ func PutBlock(block []byte, hash string) error {
        // Store the block on the first available Keep volume.
        allFull := true
        for _, vol := range KeepVolumes {
-               err := vol.Write(hash, block)
+               err := vol.Put(hash, block)
                if err == nil {
                        return nil // success!
                }
index d1048f7142a22177e2befdf23d51de55f5badf5d..6c82920853229dea0cb9c93ff72f4dd6300d67ef 100644 (file)
@@ -369,7 +369,7 @@ func setup(t *testing.T, num_volumes int) []Volume {
        for i := range vols {
                if dir, err := ioutil.TempDir(os.TempDir(), "keeptest"); err == nil {
                        root := dir + "/keep"
-                       vols[i] = &UnixVolume{root}
+                       vols[i] = &UnixVolume{root, nil}
                        os.Mkdir(root, 0755)
                } else {
                        t.Fatal(err)
index 9cd62318b57b3e541907d6290673fd7e103e1664..425fcfd470e2b9579b25e7a90790f0e740a7e003 100644 (file)
@@ -19,19 +19,88 @@ import (
 )
 
 type Volume interface {
-       Read(loc string) ([]byte, error)
-       Write(loc string, block []byte) error
+       Get(loc string) ([]byte, error)
+       Put(loc string, block []byte) error
        Index(prefix string) string
        Status() *VolumeStatus
        String() string
 }
 
+// IORequests are encapsulated requests to perform I/O on a Keep volume.
+// When running in serialized mode, the Keep front end sends IORequests
+// on a channel to an IORunner, which handles them one at a time and
+// returns an IOResponse.
+//
+type IOMethod int
+
+const (
+       KeepGet IOMethod = iota
+       KeepPut
+)
+
+type IORequest struct {
+       method IOMethod
+       loc    string
+       data   []byte
+       reply  chan *IOResponse
+}
+
+type IOResponse struct {
+       data []byte
+       err  error
+}
+
+// A UnixVolume is configured with:
+//
+// * root: the path to the volume's root directory
+// * queue: if non-nil, all I/O requests for this volume should be queued
+//   on this channel. The response will be delivered on the IOResponse
+//   channel included in the request.
+//
 type UnixVolume struct {
-       root string // path to this volume
+       root  string // path to this volume
+       queue chan *IORequest
 }
 
-func (v *UnixVolume) String() string {
-       return fmt.Sprintf("[UnixVolume %s]", v.root)
+func (v *UnixVolume) IOHandler() {
+       for req := range v.queue {
+               var result IOResponse
+               switch req.method {
+               case KeepGet:
+                       result.data, result.err = v.Read(req.loc)
+               case KeepPut:
+                       result.err = v.Write(req.loc, req.data)
+               }
+               req.reply <- &result
+       }
+}
+
+func MakeUnixVolume(root string, queue chan *IORequest) UnixVolume {
+       v := UnixVolume{root, queue}
+       if queue != nil {
+               go v.IOHandler()
+       }
+       return v
+}
+
+func (v *UnixVolume) Get(loc string) ([]byte, error) {
+       if v.queue == nil {
+               return v.Read(loc)
+       }
+       reply := make(chan *IOResponse)
+       v.queue <- &IORequest{KeepGet, loc, nil, reply}
+       response := <-reply
+       return response.data, response.err
+}
+
+func (v *UnixVolume) Put(loc string, block []byte) error {
+       if v.queue == nil {
+               return v.Write(loc, block)
+       }
+       reply := make(chan *IOResponse)
+       v.queue <- &IORequest{KeepPut, loc, block, reply}
+       response := <-reply
+       return response.err
 }
 
 // Read retrieves a block identified by the locator string "loc", and
@@ -230,3 +299,7 @@ func (v *UnixVolume) FreeDiskSpace() (free uint64, err error) {
        }
        return
 }
+
+func (v *UnixVolume) String() string {
+       return fmt.Sprintf("[UnixVolume %s]", v.root)
+}
index 9248627d83b6505fd42c37b1bfc438b8f26e490c..e1c628e94006750cc340096e4214f93dda5134e7 100644 (file)
@@ -9,15 +9,18 @@ import (
        "time"
 )
 
-func TempUnixVolume(t *testing.T) UnixVolume {
+func TempUnixVolume(t *testing.T, queue chan *IORequest) UnixVolume {
        d, err := ioutil.TempDir("", "volume_test")
        if err != nil {
                t.Fatal(err)
        }
-       return UnixVolume{d}
+       return MakeUnixVolume(d, queue)
 }
 
 func _teardown(v UnixVolume) {
+       if v.queue != nil {
+               close(v.queue)
+       }
        os.RemoveAll(v.root)
 }
 
@@ -39,12 +42,12 @@ func _store(t *testing.T, vol UnixVolume, filename string, block []byte) {
        }
 }
 
-func TestRead(t *testing.T) {
-       v := TempUnixVolume(t)
+func TestGet(t *testing.T) {
+       v := TempUnixVolume(t, nil)
        defer _teardown(v)
        _store(t, v, TEST_HASH, TEST_BLOCK)
 
-       buf, err := v.Read(TEST_HASH)
+       buf, err := v.Get(TEST_HASH)
        if err != nil {
                t.Error(err)
        }
@@ -53,12 +56,12 @@ func TestRead(t *testing.T) {
        }
 }
 
-func TestReadNotFound(t *testing.T) {
-       v := TempUnixVolume(t)
+func TestGetNotFound(t *testing.T) {
+       v := TempUnixVolume(t, nil)
        defer _teardown(v)
        _store(t, v, TEST_HASH, TEST_BLOCK)
 
-       buf, err := v.Read(TEST_HASH_2)
+       buf, err := v.Get(TEST_HASH_2)
        switch {
        case os.IsNotExist(err):
                break
@@ -69,11 +72,11 @@ func TestReadNotFound(t *testing.T) {
        }
 }
 
-func TestWrite(t *testing.T) {
-       v := TempUnixVolume(t)
+func TestPut(t *testing.T) {
+       v := TempUnixVolume(t, nil)
        defer _teardown(v)
 
-       err := v.Write(TEST_HASH, TEST_BLOCK)
+       err := v.Put(TEST_HASH, TEST_BLOCK)
        if err != nil {
                t.Error(err)
        }
@@ -86,19 +89,135 @@ func TestWrite(t *testing.T) {
        }
 }
 
-func TestWriteBadVolume(t *testing.T) {
-       v := TempUnixVolume(t)
+func TestPutBadVolume(t *testing.T) {
+       v := TempUnixVolume(t, nil)
        defer _teardown(v)
 
        os.Chmod(v.root, 000)
-       err := v.Write(TEST_HASH, TEST_BLOCK)
+       err := v.Put(TEST_HASH, TEST_BLOCK)
        if err == nil {
                t.Error("Write should have failed")
        }
 }
 
+// Serialization tests.
+//
+// TODO(twp): a proper test of I/O serialization requires that
+// a second request start while the first one is still executing.
+// Doing this correctly requires some tricky synchronization.
+// For now we'll just launch a bunch of requests in goroutines
+// and demonstrate that they return accurate results.
+//
+func TestGetSerialized(t *testing.T) {
+       v := TempUnixVolume(t, make(chan *IORequest))
+       defer _teardown(v)
+
+       _store(t, v, TEST_HASH, TEST_BLOCK)
+       _store(t, v, TEST_HASH_2, TEST_BLOCK_2)
+       _store(t, v, TEST_HASH_3, TEST_BLOCK_3)
+
+       sem := make(chan int)
+       go func(sem chan int) {
+               buf, err := v.Get(TEST_HASH)
+               if err != nil {
+                       t.Errorf("err1: %v", err)
+               }
+               if bytes.Compare(buf, TEST_BLOCK) != 0 {
+                       t.Errorf("buf should be %s, is %s", string(TEST_BLOCK), string(buf))
+               }
+               sem <- 1
+       }(sem)
+
+       go func(sem chan int) {
+               buf, err := v.Get(TEST_HASH_2)
+               if err != nil {
+                       t.Errorf("err2: %v", err)
+               }
+               if bytes.Compare(buf, TEST_BLOCK_2) != 0 {
+                       t.Errorf("buf should be %s, is %s", string(TEST_BLOCK_2), string(buf))
+               }
+               sem <- 1
+       }(sem)
+
+       go func(sem chan int) {
+               buf, err := v.Get(TEST_HASH_3)
+               if err != nil {
+                       t.Errorf("err3: %v", err)
+               }
+               if bytes.Compare(buf, TEST_BLOCK_3) != 0 {
+                       t.Errorf("buf should be %s, is %s", string(TEST_BLOCK_3), string(buf))
+               }
+               sem <- 1
+       }(sem)
+
+       // Wait for all goroutines to finish
+       for done := 0; done < 2; {
+               done += <-sem
+       }
+}
+
+func TestPutSerialized(t *testing.T) {
+       v := TempUnixVolume(t, make(chan *IORequest))
+       defer _teardown(v)
+
+       sem := make(chan int)
+       go func(sem chan int) {
+               err := v.Put(TEST_HASH, TEST_BLOCK)
+               if err != nil {
+                       t.Errorf("err1: %v", err)
+               }
+               sem <- 1
+       }(sem)
+
+       go func(sem chan int) {
+               err := v.Put(TEST_HASH_2, TEST_BLOCK_2)
+               if err != nil {
+                       t.Errorf("err2: %v", err)
+               }
+               sem <- 1
+       }(sem)
+
+       go func(sem chan int) {
+               err := v.Put(TEST_HASH_3, TEST_BLOCK_3)
+               if err != nil {
+                       t.Errorf("err3: %v", err)
+               }
+               sem <- 1
+       }(sem)
+
+       // Wait for all goroutines to finish
+       for done := 0; done < 2; {
+               done += <-sem
+       }
+
+       // Double check that we actually wrote the blocks we expected to write.
+       buf, err := v.Get(TEST_HASH)
+       if err != nil {
+               t.Errorf("Get #1: %v", err)
+       }
+       if bytes.Compare(buf, TEST_BLOCK) != 0 {
+               t.Errorf("Get #1: expected %s, got %s", string(TEST_BLOCK), string(buf))
+       }
+
+       buf, err = v.Get(TEST_HASH_2)
+       if err != nil {
+               t.Errorf("Get #2: %v", err)
+       }
+       if bytes.Compare(buf, TEST_BLOCK_2) != 0 {
+               t.Errorf("Get #2: expected %s, got %s", string(TEST_BLOCK_2), string(buf))
+       }
+
+       buf, err = v.Get(TEST_HASH_3)
+       if err != nil {
+               t.Errorf("Get #3: %v", err)
+       }
+       if bytes.Compare(buf, TEST_BLOCK_3) != 0 {
+               t.Errorf("Get #3: expected %s, got %s", string(TEST_BLOCK_3), string(buf))
+       }
+}
+
 func TestIsFull(t *testing.T) {
-       v := TempUnixVolume(t)
+       v := TempUnixVolume(t, nil)
        defer _teardown(v)
 
        full_path := v.root + "/full"