Merge branch 'master' into 2449-keep-write-blocks
authorTim Pierce <twp@curoverse.com>
Fri, 4 Apr 2014 19:51:43 +0000 (15:51 -0400)
committerTim Pierce <twp@curoverse.com>
Fri, 4 Apr 2014 19:51:43 +0000 (15:51 -0400)
services/keep/keep.go
services/keep/keep_test.go

index 345f95d10b375c124e6b3cff1449f3aa865b9743..daa967bace5aaf4ba29072f7304e51a38a1ca689 100644 (file)
@@ -19,6 +19,15 @@ var PROC_MOUNTS = "/proc/mounts"
 
 var KeepVolumes []string
 
+type KeepError struct {
+       HTTPCode int
+       Err      error
+}
+
+func (e *KeepError) Error() string {
+       return fmt.Sprintf("Error %d: %s", e.HTTPCode, e.Err.Error())
+}
+
 func main() {
        // Look for local keep volumes.
        KeepVolumes = FindKeepVolumes()
@@ -36,6 +45,7 @@ func main() {
        //
        rest := mux.NewRouter()
        rest.HandleFunc("/{hash:[0-9a-f]{32}}", GetBlockHandler).Methods("GET")
+       rest.HandleFunc("/{hash:[0-9a-f]{32}}", PutBlockHandler).Methods("PUT")
 
        // Tell the built-in HTTP server to direct all requests to the REST
        // router.
@@ -93,6 +103,27 @@ func GetBlockHandler(w http.ResponseWriter, req *http.Request) {
        return
 }
 
+func PutBlockHandler(w http.ResponseWriter, req *http.Request) {
+       hash := mux.Vars(req)["hash"]
+
+       // Read the block data to be stored.
+       // TODO(twp): decide what to do when the input stream contains
+       // more than BLOCKSIZE bytes.
+       //
+       buf := make([]byte, BLOCKSIZE)
+       if nread, err := req.Body.Read(buf); err == nil {
+               if err := PutBlock(buf[:nread], hash); err == nil {
+                       w.WriteHeader(http.StatusOK)
+               } else {
+                       ke := err.(*KeepError)
+                       http.Error(w, ke.Error(), ke.HTTPCode)
+               }
+       } else {
+               log.Println("error reading request: ", err)
+               http.Error(w, err.Error(), 500)
+       }
+}
+
 func GetBlock(hash string) ([]byte, error) {
        var buf = make([]byte, BLOCKSIZE)
 
@@ -102,17 +133,21 @@ func GetBlock(hash string) ([]byte, error) {
                var err error
                var nread int
 
-               path := fmt.Sprintf("%s/%s/%s", vol, hash[0:3], hash)
+               blockFilename := fmt.Sprintf("%s/%s/%s", vol, hash[0:3], hash)
 
-               f, err = os.Open(path)
+               f, err = os.Open(blockFilename)
                if err != nil {
-                       log.Printf("%s: opening %s: %s\n", vol, path, err)
+                       if !os.IsNotExist(err) {
+                               // A block is stored on only one Keep disk,
+                               // so os.IsNotExist is expected.  Report any other errors.
+                               log.Printf("%s: opening %s: %s\n", vol, blockFilename, err)
+                       }
                        continue
                }
 
                nread, err = f.Read(buf)
                if err != nil {
-                       log.Printf("%s: reading %s: %s\n", vol, path, err)
+                       log.Printf("%s: reading %s: %s\n", vol, blockFilename, err)
                        continue
                }
 
@@ -126,7 +161,7 @@ func GetBlock(hash string) ([]byte, error) {
                        // priority or logged as urgent problems.
                        //
                        log.Printf("%s: checksum mismatch: %s (actual hash %s)\n",
-                               vol, path, filehash)
+                               vol, blockFilename, filehash)
                        continue
                }
 
@@ -135,5 +170,77 @@ func GetBlock(hash string) ([]byte, error) {
        }
 
        log.Printf("%s: not found on any volumes, giving up\n", hash)
-       return buf, errors.New("not found: " + hash)
+       return buf, &KeepError{404, errors.New("not found: " + hash)}
+}
+
+/* PutBlock(block, hash)
+   Stores the BLOCK (identified by the content id HASH) in Keep.
+
+   The MD5 checksum of the block must be identical to the content id HASH.
+   If not, an error is returned.
+
+   PutBlock stores the BLOCK on the first Keep volume with free space.
+   A failure code is returned to the user only if all volumes fail.
+
+   On success, PutBlock returns nil.
+   On failure, it returns a KeepError with one of the following codes:
+
+   401 MD5Fail
+         -- The MD5 hash of the BLOCK does not match the argument HASH.
+   503 Full
+         -- There was not enough space left in any Keep volume to store
+            the object.
+   500 Fail
+         -- The object could not be stored for some other reason (e.g.
+            all writes failed). The text of the error message should
+            provide as much detail as possible.
+*/
+
+func PutBlock(block []byte, hash string) error {
+       // Check that BLOCK's checksum matches HASH.
+       blockhash := fmt.Sprintf("%x", md5.Sum(block))
+       if blockhash != hash {
+               log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
+               return &KeepError{401, errors.New("MD5Fail")}
+       }
+
+       for _, vol := range KeepVolumes {
+
+               // TODO(twp): check for a full volume here before trying to write.
+
+               blockDir := fmt.Sprintf("%s/%s", vol, hash[0:3])
+               if err := os.MkdirAll(blockDir, 0755); err != nil {
+                       log.Printf("%s: could not create directory %s: %s",
+                               hash, blockDir, err)
+                       continue
+               }
+
+               blockFilename := fmt.Sprintf("%s/%s", blockDir, hash)
+               f, err := os.OpenFile(blockFilename, os.O_CREATE|os.O_WRONLY, 0644)
+               if err != nil {
+                       // if the block already exists, just return success.
+                       // TODO(twp): should we check here whether the file on disk
+                       // matches the file we were asked to store?
+                       if os.IsExist(err) {
+                               return nil
+                       } else {
+                               // Open failed for some other reason.
+                               log.Printf("%s: creating %s: %s\n", vol, blockFilename, err)
+                               continue
+                       }
+               }
+
+               if _, err := f.Write(block); err == nil {
+                       f.Close()
+                       return nil
+               } else {
+                       log.Printf("%s: writing to %s: %s\n", vol, blockFilename, err)
+                       continue
+               }
+       }
+
+       // All volumes failed; report the failure and return an error.
+       //
+       log.Printf("all Keep volumes failed")
+       return &KeepError{500, errors.New("Fail")}
 }
index 437e92cac7d45439d886cc4bfc2c3792c7b7bbd7..bbff19d0fbac0ca272e2a5a11a160589b1f22bc1 100644 (file)
@@ -12,33 +12,17 @@ var TEST_BLOCK = []byte("The quick brown fox jumps over the lazy dog.")
 var TEST_HASH = "e4d909c290d0fb1ca068ffaddf22cbd0"
 var BAD_BLOCK = []byte("The magic words are squeamish ossifrage.")
 
-// Test simple block reads.
-func TestGetBlockOK(t *testing.T) {
-       defer teardown()
-
-       // Create two test Keep volumes and store a block in each of them.
-       KeepVolumes = setup(t, 2)
-       fmt.Println("KeepVolumes = ", KeepVolumes)
-
-       for _, vol := range KeepVolumes {
-               store(t, vol, TEST_HASH, TEST_BLOCK)
-       }
-
-       // Check that GetBlock returns success.
-       result, err := GetBlock(TEST_HASH)
-       if err != nil {
-               t.Errorf("GetBlock error: %s", err)
-       }
-       if fmt.Sprint(result) != fmt.Sprint(TEST_BLOCK) {
-               t.Errorf("expected %s, got %s", TEST_BLOCK, result)
-       }
-}
+// ========================================
+// GetBlock tests.
+// ========================================
 
-// Test block reads when one Keep volume is missing.
-func TestGetBlockOneKeepOK(t *testing.T) {
+// TestGetBlock
+//     Test that simple block reads succeed.
+//
+func TestGetBlock(t *testing.T) {
        defer teardown()
 
-       // Two test Keep volumes, only the second has a block.
+       // Prepare two test Keep volumes. Our block is stored on the second volume.
        KeepVolumes = setup(t, 2)
        store(t, KeepVolumes[1], TEST_HASH, TEST_BLOCK)
 
@@ -52,8 +36,10 @@ func TestGetBlockOneKeepOK(t *testing.T) {
        }
 }
 
-// Test block read failure.
-func TestGetBlockFail(t *testing.T) {
+// TestGetBlockMissing
+//     GetBlock must return an error when the block is not found.
+//
+func TestGetBlockMissing(t *testing.T) {
        defer teardown()
 
        // Create two empty test Keep volumes.
@@ -66,7 +52,10 @@ func TestGetBlockFail(t *testing.T) {
        }
 }
 
-// Test reading a corrupt block.
+// TestGetBlockCorrupt
+//     GetBlock must return an error when a corrupted block is requested
+//     (the contents of the file do not checksum to its hash).
+//
 func TestGetBlockCorrupt(t *testing.T) {
        defer teardown()
 
@@ -84,7 +73,94 @@ func TestGetBlockCorrupt(t *testing.T) {
        }
 }
 
-// Test finding Keep volumes.
+// ========================================
+// PutBlock tests
+// ========================================
+
+// TestPutBlockOK
+//     PutBlock can perform a simple block write and returns success.
+//
+func TestPutBlockOK(t *testing.T) {
+       defer teardown()
+
+       // Create two test Keep volumes.
+       KeepVolumes = setup(t, 2)
+
+       // Check that PutBlock stores the data as expected.
+       if err := PutBlock(TEST_BLOCK, TEST_HASH); err != nil {
+               t.Fatalf("PutBlock: %v", err)
+       }
+
+       result, err := GetBlock(TEST_HASH)
+       if err != nil {
+               t.Fatalf("GetBlock: %s", err.Error())
+       }
+       if string(result) != string(TEST_BLOCK) {
+               t.Error("PutBlock/GetBlock mismatch")
+               t.Fatalf("PutBlock stored '%s', GetBlock retrieved '%s'",
+                       string(TEST_BLOCK), string(result))
+       }
+}
+
+// TestPutBlockOneVol
+//     PutBlock still returns success even when only one of the known
+//     volumes is online.
+//
+func TestPutBlockOneVol(t *testing.T) {
+       defer teardown()
+
+       // Create two test Keep volumes, but cripple one of them.
+       KeepVolumes = setup(t, 2)
+       os.Chmod(KeepVolumes[0], 000)
+
+       // Check that PutBlock stores the data as expected.
+       if err := PutBlock(TEST_BLOCK, TEST_HASH); err != nil {
+               t.Fatalf("PutBlock: %v", err)
+       }
+
+       result, err := GetBlock(TEST_HASH)
+       if err != nil {
+               t.Fatalf("GetBlock: %s", err.Error())
+       }
+       if string(result) != string(TEST_BLOCK) {
+               t.Error("PutBlock/GetBlock mismatch")
+               t.Fatalf("PutBlock stored '%s', GetBlock retrieved '%s'",
+                       string(TEST_BLOCK), string(result))
+       }
+}
+
+// TestPutBlockCorrupt
+//     Check that PutBlock returns an error if passed a block and hash that
+//     do not match.
+//
+func TestPutBlockCorrupt(t *testing.T) {
+       defer teardown()
+
+       // Create two test Keep volumes.
+       KeepVolumes = setup(t, 2)
+
+       // Check that PutBlock returns the expected error when the hash does
+       // not match the block.
+       if err := PutBlock(BAD_BLOCK, TEST_HASH); err == nil {
+               t.Error("PutBlock succeeded despite a block mismatch")
+       } else {
+               ke := err.(*KeepError)
+               if ke.HTTPCode != 401 || ke.Err.Error() != "MD5Fail" {
+                       t.Errorf("PutBlock returned the wrong error (%v)", ke)
+               }
+       }
+
+       // Confirm that GetBlock fails to return anything.
+       if result, err := GetBlock(TEST_HASH); err == nil {
+               t.Errorf("GetBlock succeded after a corrupt block store, returned '%s'",
+                       string(result))
+       }
+}
+
+// TestFindKeepVolumes
+//     Confirms that FindKeepVolumes finds tmpfs volumes with "/keep"
+//     directories at the top level.
+//
 func TestFindKeepVolumes(t *testing.T) {
        defer teardown()
 
@@ -116,8 +192,9 @@ func TestFindKeepVolumes(t *testing.T) {
        }
 }
 
-// Test that FindKeepVolumes returns an empty slice when no Keep volumes
-// are present.
+// TestFindKeepVolumesFail
+//     When no Keep volumes are present, FindKeepVolumes returns an empty slice.
+//
 func TestFindKeepVolumesFail(t *testing.T) {
        defer teardown()
 
@@ -141,6 +218,10 @@ func TestFindKeepVolumesFail(t *testing.T) {
        }
 }
 
+// ========================================
+// Helper functions for unit tests.
+// ========================================
+
 // setup
 //     Create KeepVolumes for testing.
 //     Returns a slice of pathnames to temporary Keep volumes.
@@ -168,6 +249,7 @@ func teardown() {
 }
 
 // store
+//     Low-level code to write Keep blocks directly to disk for testing.
 //
 func store(t *testing.T, keepdir string, filename string, block []byte) error {
        blockdir := fmt.Sprintf("%s/%s", keepdir, filename[:3])