Merge branch '2291-new-keepd-read-blocks' (fixes #2291)
authorTim Pierce <twp@curoverse.com>
Wed, 26 Mar 2014 21:34:50 +0000 (17:34 -0400)
committerTim Pierce <twp@curoverse.com>
Wed, 26 Mar 2014 21:34:50 +0000 (17:34 -0400)
services/keep/keep.go [new file with mode: 0644]
services/keep/keep_test.go [new file with mode: 0644]

diff --git a/services/keep/keep.go b/services/keep/keep.go
new file mode 100644 (file)
index 0000000..06a81b6
--- /dev/null
@@ -0,0 +1,137 @@
+package main
+
+import (
+       "bufio"
+       "crypto/md5"
+       "errors"
+       "fmt"
+       "github.com/gorilla/mux"
+       "log"
+       "net/http"
+       "os"
+       "strings"
+)
+
+const DEFAULT_PORT = 25107
+const BLOCKSIZE = 64 * 1024 * 1024
+
+var KeepVolumes []string
+
+func main() {
+       // Look for local keep volumes.
+       KeepVolumes = FindKeepVolumes()
+       if len(KeepVolumes) == 0 {
+               log.Fatal("could not find any keep volumes")
+       }
+       for _, v := range KeepVolumes {
+               log.Println("keep volume:", v)
+       }
+
+       // Set up REST handlers.
+       //
+       // Start with a router that will route each URL path to an
+       // appropriate handler.
+       //
+       rest := mux.NewRouter()
+       rest.HandleFunc("/{hash:[0-9a-f]{32}}", GetBlockHandler).Methods("GET")
+
+       // Tell the built-in HTTP server to direct all requests to the REST
+       // router.
+       http.Handle("/", rest)
+
+       // Start listening for requests.
+       port := fmt.Sprintf(":%d", DEFAULT_PORT)
+       http.ListenAndServe(port, nil)
+}
+
+// FindKeepVolumes
+//     Returns a list of Keep volumes mounted on this system.
+//
+//     A Keep volume is a normal or tmpfs volume with a /keep
+//     directory at the top level of the mount point.
+//
+func FindKeepVolumes() []string {
+       vols := make([]string, 0)
+
+       if f, err := os.Open("/proc/mounts"); err != nil {
+               log.Fatal("could not read /proc/mounts: ", err)
+       } else {
+               scanner := bufio.NewScanner(f)
+               for scanner.Scan() {
+                       args := strings.Fields(scanner.Text())
+                       dev, mount := args[0], args[1]
+                       if (dev == "tmpfs" || strings.HasPrefix(dev, "/dev/")) && mount != "/" {
+                               keep := mount + "/keep"
+                               if st, err := os.Stat(keep); err == nil && st.IsDir() {
+                                       vols = append(vols, keep)
+                               }
+                       }
+               }
+               if err := scanner.Err(); err != nil {
+                       log.Fatal(err)
+               }
+       }
+       return vols
+}
+
+func GetBlockHandler(w http.ResponseWriter, req *http.Request) {
+       hash := mux.Vars(req)["hash"]
+
+       block, err := GetBlock(hash)
+       if err != nil {
+               http.Error(w, err.Error(), 404)
+               return
+       }
+
+       _, err = w.Write(block)
+       if err != nil {
+               log.Printf("GetBlockHandler: writing response: %s", err)
+       }
+
+       return
+}
+
+func GetBlock(hash string) ([]byte, error) {
+       var buf = make([]byte, BLOCKSIZE)
+
+       // Attempt to read the requested hash from a keep volume.
+       for _, vol := range KeepVolumes {
+               var f *os.File
+               var err error
+               var nread int
+
+               path := fmt.Sprintf("%s/%s/%s", vol, hash[0:3], hash)
+
+               f, err = os.Open(path)
+               if err != nil {
+                       log.Printf("%s: opening %s: %s\n", vol, path, err)
+                       continue
+               }
+
+               nread, err = f.Read(buf)
+               if err != nil {
+                       log.Printf("%s: reading %s: %s\n", vol, path, err)
+                       continue
+               }
+
+               // Double check the file checksum.
+               //
+               filehash := fmt.Sprintf("%x", md5.Sum(buf[:nread]))
+               if filehash != hash {
+                       // TODO(twp): this condition probably represents a bad disk and
+                       // should raise major alarm bells for an administrator: e.g.
+                       // they should be sent directly to an event manager at high
+                       // priority or logged as urgent problems.
+                       //
+                       log.Printf("%s: checksum mismatch: %s (actual hash %s)\n",
+                               vol, path, filehash)
+                       continue
+               }
+
+               // Success!
+               return buf[:nread], nil
+       }
+
+       log.Printf("%s: not found on any volumes, giving up\n", hash)
+       return buf, errors.New("not found: " + hash)
+}
diff --git a/services/keep/keep_test.go b/services/keep/keep_test.go
new file mode 100644 (file)
index 0000000..5e3b763
--- /dev/null
@@ -0,0 +1,125 @@
+package main
+
+import (
+       "fmt"
+       "io/ioutil"
+       "os"
+       "path"
+       "testing"
+)
+
+var TEST_BLOCK = []byte("The quick brown fox jumps over the lazy dog.")
+var TEST_HASH = "e4d909c290d0fb1ca068ffaddf22cbd0"
+var BAD_BLOCK = []byte("The magic words are squeamish ossifrage.")
+
+// Test simple block reads.
+func TestGetBlockOK(t *testing.T) {
+       defer teardown()
+
+       // Create two test Keep volumes and store a block in each of them.
+       setup(t, 2)
+       for _, vol := range KeepVolumes {
+               store(t, vol, TEST_HASH, TEST_BLOCK)
+       }
+
+       // Check that GetBlock returns success.
+       result, err := GetBlock(TEST_HASH)
+       if err != nil {
+               t.Errorf("GetBlock error: %s", err)
+       }
+       if fmt.Sprint(result) != fmt.Sprint(TEST_BLOCK) {
+               t.Errorf("expected %s, got %s", TEST_BLOCK, result)
+       }
+}
+
+// Test block reads when one Keep volume is missing.
+func TestGetBlockOneKeepOK(t *testing.T) {
+       defer teardown()
+
+       // Two test Keep volumes, only the second has a block.
+       setup(t, 2)
+       store(t, KeepVolumes[1], TEST_HASH, TEST_BLOCK)
+
+       // Check that GetBlock returns success.
+       result, err := GetBlock(TEST_HASH)
+       if err != nil {
+               t.Errorf("GetBlock error: %s", err)
+       }
+       if fmt.Sprint(result) != fmt.Sprint(TEST_BLOCK) {
+               t.Errorf("expected %s, got %s", TEST_BLOCK, result)
+       }
+}
+
+// Test block read failure.
+func TestGetBlockFail(t *testing.T) {
+       defer teardown()
+
+       // Create two empty test Keep volumes.
+       setup(t, 2)
+
+       // Check that GetBlock returns failure.
+       result, err := GetBlock(TEST_HASH)
+       if err == nil {
+               t.Errorf("GetBlock incorrectly returned success: ", result)
+       }
+}
+
+// Test reading a corrupt block.
+func TestGetBlockCorrupt(t *testing.T) {
+       defer teardown()
+
+       // Create two test Keep volumes and store a block in each of them,
+       // but the hash of the block does not match the filename.
+       setup(t, 2)
+       for _, vol := range KeepVolumes {
+               store(t, vol, TEST_HASH, BAD_BLOCK)
+       }
+
+       // Check that GetBlock returns failure.
+       result, err := GetBlock(TEST_HASH)
+       if err == nil {
+               t.Errorf("GetBlock incorrectly returned success: %s", result)
+       }
+}
+
+// setup
+//     Create KeepVolumes for testing.
+//
+func setup(t *testing.T, num_volumes int) {
+       KeepVolumes = make([]string, num_volumes)
+       for i := range KeepVolumes {
+               if dir, err := ioutil.TempDir(os.TempDir(), "keeptest"); err == nil {
+                       KeepVolumes[i] = dir + "/keep"
+               } else {
+                       t.Fatal(err)
+               }
+       }
+}
+
+// teardown
+//     Cleanup to perform after each test.
+//
+func teardown() {
+       for _, vol := range KeepVolumes {
+               os.RemoveAll(path.Dir(vol))
+       }
+}
+
+// store
+//
+func store(t *testing.T, keepdir string, filename string, block []byte) error {
+       blockdir := fmt.Sprintf("%s/%s", keepdir, filename[:3])
+       if err := os.MkdirAll(blockdir, 0755); err != nil {
+               t.Fatal(err)
+       }
+
+       blockpath := fmt.Sprintf("%s/%s", blockdir, filename)
+       if f, err := os.Create(blockpath); err == nil {
+               f.Write(block)
+               f.Close()
+       } else {
+               t.Fatal(err)
+       }
+
+       return nil
+}