var KeepVolumes []string
+// KeepError pairs an error with the HTTP status code that should be
+// reported for it (for example, GetBlock returns one with code 404 when
+// a block is not found on any volume).
+type KeepError struct {
+ // HTTPCode is the HTTP status code associated with the error.
+ HTTPCode int
+ // Err is the underlying error; Error() includes its message.
+ Err error
+}
+
+// Error implements the error interface. The message has the form
+// "Error <HTTPCode>: <underlying message>". When no underlying error is
+// set, only "Error <HTTPCode>" is returned.
+func (e *KeepError) Error() string {
+	if e.Err == nil {
+		// Calling Error() on a nil error interface would panic, so
+		// report the status code alone.
+		return fmt.Sprintf("Error %d", e.HTTPCode)
+	}
+	return fmt.Sprintf("Error %d: %s", e.HTTPCode, e.Err.Error())
+}
+
func main() {
// Look for local keep volumes.
KeepVolumes = FindKeepVolumes()
//
rest := mux.NewRouter()
rest.HandleFunc("/{hash:[0-9a-f]{32}}", GetBlockHandler).Methods("GET")
+ rest.HandleFunc("/{hash:[0-9a-f]{32}}", PutBlockHandler).Methods("PUT")
// Tell the built-in HTTP server to direct all requests to the REST
// router.
return
}
+// PutBlockHandler handles PUT requests routed by hash: it reads the
+// block data from the request body and stores it with PutBlock.
+//
+// Responses:
+//   - 200 when PutBlock succeeds.
+//   - the KeepError's HTTPCode (with its message) when PutBlock fails
+//     with a *KeepError.
+//   - 500 when the request body cannot be read, or when PutBlock fails
+//     with any other error type.
+func PutBlockHandler(w http.ResponseWriter, req *http.Request) {
+	hash := mux.Vars(req)["hash"]
+
+	// Read the block data to be stored.
+	// TODO(twp): decide what to do when the input stream contains
+	// more than BLOCKSIZE bytes.
+	//
+	// NOTE(review): a single Read call is not guaranteed to return the
+	// whole body, and a Reader is allowed to return the final bytes
+	// together with an end-of-file error, which this code reports as a
+	// failure. Reading with io.ReadFull (accepting io.ErrUnexpectedEOF
+	// for blocks shorter than BLOCKSIZE) would address both, but needs
+	// the "io" import.
+	buf := make([]byte, BLOCKSIZE)
+	nread, err := req.Body.Read(buf)
+	if err != nil {
+		log.Println("error reading request: ", err)
+		http.Error(w, err.Error(), 500)
+		return
+	}
+
+	err = PutBlock(buf[:nread], hash)
+	if err == nil {
+		w.WriteHeader(http.StatusOK)
+		return
+	}
+
+	// Use the checked form of the type assertion so that an unexpected
+	// error type yields a 500 response instead of a panic.
+	if ke, ok := err.(*KeepError); ok {
+		http.Error(w, ke.Error(), ke.HTTPCode)
+		return
+	}
+	http.Error(w, err.Error(), http.StatusInternalServerError)
+}
+
func GetBlock(hash string) ([]byte, error) {
var buf = make([]byte, BLOCKSIZE)
var err error
var nread int
- path := fmt.Sprintf("%s/%s/%s", vol, hash[0:3], hash)
+ blockFilename := fmt.Sprintf("%s/%s/%s", vol, hash[0:3], hash)
- f, err = os.Open(path)
+ f, err = os.Open(blockFilename)
if err != nil {
- log.Printf("%s: opening %s: %s\n", vol, path, err)
+ if !os.IsNotExist(err) {
+ // A block is stored on only one Keep disk,
+ // so os.IsNotExist is expected. Report any other errors.
+ log.Printf("%s: opening %s: %s\n", vol, blockFilename, err)
+ }
continue
}
nread, err = f.Read(buf)
if err != nil {
- log.Printf("%s: reading %s: %s\n", vol, path, err)
+ log.Printf("%s: reading %s: %s\n", vol, blockFilename, err)
continue
}
// priority or logged as urgent problems.
//
log.Printf("%s: checksum mismatch: %s (actual hash %s)\n",
- vol, path, filehash)
+ vol, blockFilename, filehash)
continue
}
}
log.Printf("%s: not found on any volumes, giving up\n", hash)
- return buf, errors.New("not found: " + hash)
+ return buf, &KeepError{404, errors.New("not found: " + hash)}
+}
+
+// PutBlock stores block (identified by the content id hash) in Keep.
+//
+// The MD5 checksum of block must be identical to hash. If not, an error
+// is returned and nothing is written.
+//
+// PutBlock tries the volumes in KeepVolumes in order and stores the
+// block on the first one where the write succeeds. If a file for hash
+// already exists on a volume, PutBlock returns success without rewriting
+// or verifying it. A file left behind by a failed write is removed
+// before the next volume is tried. A failure is returned to the caller
+// only if all volumes fail.
+//
+// On success, PutBlock returns nil.
+// On failure, it returns a *KeepError with one of the following codes:
+//
+//	401 MD5Fail
+//	    -- The MD5 hash of block does not match the argument hash.
+//	500 Fail
+//	    -- The block could not be stored on any volume (e.g. all
+//	       writes failed). Details are written to the log.
+//
+// A "503 Full" code (not enough space left in any Keep volume) is
+// planned, but this code does not check for full volumes yet and never
+// returns it.
+func PutBlock(block []byte, hash string) error {
+	// Check that block's checksum matches hash.
+	blockhash := fmt.Sprintf("%x", md5.Sum(block))
+	if blockhash != hash {
+		log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
+		return &KeepError{401, errors.New("MD5Fail")}
+	}
+
+	for _, vol := range KeepVolumes {
+
+		// TODO(twp): check for a full volume here before trying to write.
+
+		blockDir := fmt.Sprintf("%s/%s", vol, hash[0:3])
+		if err := os.MkdirAll(blockDir, 0755); err != nil {
+			log.Printf("%s: could not create directory %s: %s",
+				hash, blockDir, err)
+			continue
+		}
+
+		blockFilename := fmt.Sprintf("%s/%s", blockDir, hash)
+
+		// O_EXCL makes the open fail with an "already exists" error
+		// when the block file is present. Without it the os.IsExist
+		// check below can never be true, and an existing file would be
+		// overwritten in place without being truncated.
+		f, err := os.OpenFile(blockFilename, os.O_CREATE|os.O_EXCL|os.O_WRONLY, 0644)
+		if err != nil {
+			// If the block already exists, just return success.
+			// TODO(twp): should we check here whether the file on disk
+			// matches the file we were asked to store?
+			if os.IsExist(err) {
+				return nil
+			}
+			// Open failed for some other reason.
+			log.Printf("%s: creating %s: %s\n", vol, blockFilename, err)
+			continue
+		}
+
+		// Always close the file, and treat a Close error as a write
+		// failure: buffered data may only be reported as lost at close.
+		_, err = f.Write(block)
+		if cerr := f.Close(); err == nil {
+			err = cerr
+		}
+		if err == nil {
+			return nil
+		}
+		log.Printf("%s: writing to %s: %s\n", vol, blockFilename, err)
+
+		// Do not leave an incomplete file behind: a later PutBlock for
+		// the same hash would find it and report success.
+		if rerr := os.Remove(blockFilename); rerr != nil {
+			log.Printf("%s: removing incomplete %s: %s\n", vol, blockFilename, rerr)
+		}
+	}
+
+	// All volumes failed; report the failure and return an error.
+	//
+	log.Printf("all Keep volumes failed")
+	return &KeepError{500, errors.New("Fail")}
}
var TEST_HASH = "e4d909c290d0fb1ca068ffaddf22cbd0"
var BAD_BLOCK = []byte("The magic words are squeamish ossifrage.")
-// Test simple block reads.
-func TestGetBlockOK(t *testing.T) {
- defer teardown()
-
- // Create two test Keep volumes and store a block in each of them.
- KeepVolumes = setup(t, 2)
- fmt.Println("KeepVolumes = ", KeepVolumes)
-
- for _, vol := range KeepVolumes {
- store(t, vol, TEST_HASH, TEST_BLOCK)
- }
-
- // Check that GetBlock returns success.
- result, err := GetBlock(TEST_HASH)
- if err != nil {
- t.Errorf("GetBlock error: %s", err)
- }
- if fmt.Sprint(result) != fmt.Sprint(TEST_BLOCK) {
- t.Errorf("expected %s, got %s", TEST_BLOCK, result)
- }
-}
+// ========================================
+// GetBlock tests.
+// ========================================
-// Test block reads when one Keep volume is missing.
-func TestGetBlockOneKeepOK(t *testing.T) {
+// TestGetBlock
+// Test that simple block reads succeed.
+//
+func TestGetBlock(t *testing.T) {
defer teardown()
- // Two test Keep volumes, only the second has a block.
+ // Prepare two test Keep volumes. Our block is stored on the second volume.
KeepVolumes = setup(t, 2)
store(t, KeepVolumes[1], TEST_HASH, TEST_BLOCK)
}
}
-// Test block read failure.
-func TestGetBlockFail(t *testing.T) {
+// TestGetBlockMissing
+// GetBlock must return an error when the block is not found.
+//
+func TestGetBlockMissing(t *testing.T) {
defer teardown()
// Create two empty test Keep volumes.
}
}
-// Test reading a corrupt block.
+// TestGetBlockCorrupt
+// GetBlock must return an error when a corrupted block is requested
+// (the contents of the file do not checksum to its hash).
+//
func TestGetBlockCorrupt(t *testing.T) {
defer teardown()
}
}
-// Test finding Keep volumes.
+// ========================================
+// PutBlock tests
+// ========================================
+
+// TestPutBlockOK
+//     A plain PutBlock call reports success, and the stored block can
+//     be read back unchanged with GetBlock.
+//
+func TestPutBlockOK(t *testing.T) {
+	defer teardown()
+
+	// Start from two fresh test Keep volumes.
+	KeepVolumes = setup(t, 2)
+
+	// Store the block; any error aborts the test.
+	putErr := PutBlock(TEST_BLOCK, TEST_HASH)
+	if putErr != nil {
+		t.Fatalf("PutBlock: %v", putErr)
+	}
+
+	// Read it back and compare with what was stored.
+	stored, getErr := GetBlock(TEST_HASH)
+	if getErr != nil {
+		t.Fatalf("GetBlock: %s", getErr.Error())
+	}
+	want, got := string(TEST_BLOCK), string(stored)
+	if got != want {
+		t.Error("PutBlock/GetBlock mismatch")
+		t.Fatalf("PutBlock stored '%s', GetBlock retrieved '%s'",
+			want, got)
+	}
+}
+
+// TestPutBlockOneVol
+//     PutBlock still returns success even when only one of the known
+//     volumes is online.
+//
+func TestPutBlockOneVol(t *testing.T) {
+	defer teardown()
+
+	// Create two test Keep volumes, but cripple one of them.
+	KeepVolumes = setup(t, 2)
+	crippled := KeepVolumes[0]
+	if err := os.Chmod(crippled, 000); err != nil {
+		// Without the chmod the test would not exercise the
+		// one-volume-offline case at all.
+		t.Fatalf("chmod %s: %v", crippled, err)
+	}
+	// Deferred calls run last-in first-out, so access is restored before
+	// teardown runs (presumably it removes the volumes -- confirm).
+	defer os.Chmod(crippled, 0755)
+
+	// Check that PutBlock stores the data as expected.
+	if err := PutBlock(TEST_BLOCK, TEST_HASH); err != nil {
+		t.Fatalf("PutBlock: %v", err)
+	}
+
+	result, err := GetBlock(TEST_HASH)
+	if err != nil {
+		t.Fatalf("GetBlock: %s", err.Error())
+	}
+	if string(result) != string(TEST_BLOCK) {
+		t.Error("PutBlock/GetBlock mismatch")
+		t.Fatalf("PutBlock stored '%s', GetBlock retrieved '%s'",
+			string(TEST_BLOCK), string(result))
+	}
+}
+
+// TestPutBlockCorrupt
+//     Check that PutBlock returns an error if passed a block and hash
+//     that do not match, and that nothing retrievable is stored.
+//
+func TestPutBlockCorrupt(t *testing.T) {
+	defer teardown()
+
+	// Create two test Keep volumes.
+	KeepVolumes = setup(t, 2)
+
+	// Check that PutBlock returns the expected error when the hash does
+	// not match the block.
+	err := PutBlock(BAD_BLOCK, TEST_HASH)
+	if err == nil {
+		t.Error("PutBlock succeeded despite a block mismatch")
+	} else if ke, ok := err.(*KeepError); !ok {
+		// Checked assertion: an unexpected error type is reported as a
+		// test failure instead of panicking.
+		t.Errorf("PutBlock returned the wrong error (%v)", err)
+	} else if ke.HTTPCode != 401 || ke.Err == nil || ke.Err.Error() != "MD5Fail" {
+		t.Errorf("PutBlock returned the wrong error (%v)", ke)
+	}
+
+	// Confirm that GetBlock fails to return anything.
+	if result, err := GetBlock(TEST_HASH); err == nil {
+		t.Errorf("GetBlock succeded after a corrupt block store, returned '%s'",
+			string(result))
+	}
+}
+
+// TestFindKeepVolumes
+// Confirms that FindKeepVolumes finds tmpfs volumes with "/keep"
+// directories at the top level.
+//
func TestFindKeepVolumes(t *testing.T) {
defer teardown()
}
}
-// Test that FindKeepVolumes returns an empty slice when no Keep volumes
-// are present.
+// TestFindKeepVolumesFail
+// When no Keep volumes are present, FindKeepVolumes returns an empty slice.
+//
func TestFindKeepVolumesFail(t *testing.T) {
defer teardown()
}
}
+// ========================================
+// Helper functions for unit tests.
+// ========================================
+
// setup
// Create KeepVolumes for testing.
// Returns a slice of pathnames to temporary Keep volumes.
}
// store
+// Low-level code to write Keep blocks directly to disk for testing.
//
func store(t *testing.T, keepdir string, filename string, block []byte) error {
blockdir := fmt.Sprintf("%s/%s", keepdir, filename[:3])