Merge branch 'master' into 1971-show-image-thumbnails
[arvados.git] / services / keep / keep_test.go
index 5e3b763d0f0bdc8a267b2c497695e63f9da0f8d7..348445e78d105a79a136ea010c6139aad9ae4beb 100644 (file)
@@ -1,6 +1,7 @@
 package main
 
 import (
+       "bytes"
        "fmt"
        "io/ioutil"
        "os"
@@ -12,32 +13,32 @@ var TEST_BLOCK = []byte("The quick brown fox jumps over the lazy dog.")
 var TEST_HASH = "e4d909c290d0fb1ca068ffaddf22cbd0"
 var BAD_BLOCK = []byte("The magic words are squeamish ossifrage.")
 
-// Test simple block reads.
-func TestGetBlockOK(t *testing.T) {
-       defer teardown()
-
-       // Create two test Keep volumes and store a block in each of them.
-       setup(t, 2)
-       for _, vol := range KeepVolumes {
-               store(t, vol, TEST_HASH, TEST_BLOCK)
-       }
-
-       // Check that GetBlock returns success.
-       result, err := GetBlock(TEST_HASH)
-       if err != nil {
-               t.Errorf("GetBlock error: %s", err)
-       }
-       if fmt.Sprint(result) != fmt.Sprint(TEST_BLOCK) {
-               t.Errorf("expected %s, got %s", TEST_BLOCK, result)
-       }
-}
+// TODO(twp): Tests still to be written
+//
+//   * PutBlockFull
+//       - test that PutBlock returns 503 Full if the filesystem is full.
+//         (must mock FreeDiskSpace or Statfs? use a tmpfs?)
+//
+//   * PutBlockWriteErr
+//       - test the behavior when Write returns an error.
+//           - Possible solutions: use a small tmpfs and a high
+//             MIN_FREE_KILOBYTES to trick PutBlock into attempting
+//             to write a block larger than the amount of space left
+//           - use an interface to mock ioutil.TempFile with a File
+//             object that always returns an error on write
+//
+// ========================================
+// GetBlock tests.
+// ========================================
 
-// Test block reads when one Keep volume is missing.
-func TestGetBlockOneKeepOK(t *testing.T) {
+// TestGetBlock
+//     Test that simple block reads succeed.
+//
+func TestGetBlock(t *testing.T) {
        defer teardown()
 
-       // Two test Keep volumes, only the second has a block.
-       setup(t, 2)
+       // Prepare two test Keep volumes. Our block is stored on the second volume.
+       KeepVolumes = setup(t, 2)
        store(t, KeepVolumes[1], TEST_HASH, TEST_BLOCK)
 
        // Check that GetBlock returns success.
@@ -50,50 +51,254 @@ func TestGetBlockOneKeepOK(t *testing.T) {
        }
 }
 
-// Test block read failure.
-func TestGetBlockFail(t *testing.T) {
+// TestGetBlockMissing
+//     GetBlock must return an error when the block is not found.
+//
+func TestGetBlockMissing(t *testing.T) {
        defer teardown()
 
        // Create two empty test Keep volumes.
-       setup(t, 2)
+       KeepVolumes = setup(t, 2)
 
        // Check that GetBlock returns failure.
        result, err := GetBlock(TEST_HASH)
-       if err == nil {
-               t.Errorf("GetBlock incorrectly returned success: ", result)
+       if err != NotFoundError {
+               t.Errorf("Expected NotFoundError, got %v", result)
        }
 }
 
-// Test reading a corrupt block.
+// TestGetBlockCorrupt
+//     GetBlock must return an error when a corrupted block is requested
+//     (the contents of the file do not checksum to its hash).
+//
 func TestGetBlockCorrupt(t *testing.T) {
        defer teardown()
 
        // Create two test Keep volumes and store a block in each of them,
        // but the hash of the block does not match the filename.
-       setup(t, 2)
+       KeepVolumes = setup(t, 2)
        for _, vol := range KeepVolumes {
                store(t, vol, TEST_HASH, BAD_BLOCK)
        }
 
        // Check that GetBlock returns failure.
        result, err := GetBlock(TEST_HASH)
-       if err == nil {
-               t.Errorf("GetBlock incorrectly returned success: %s", result)
+       if err != CorruptError {
+               t.Errorf("Expected CorruptError, got %v", result)
        }
 }
 
+// ========================================
+// PutBlock tests
+// ========================================
+
+// TestPutBlockOK
+//     PutBlock can perform a simple block write and returns success.
+//
+func TestPutBlockOK(t *testing.T) {
+       defer teardown()
+
+       // Create two test Keep volumes.
+       KeepVolumes = setup(t, 2)
+
+       // Check that PutBlock stores the data as expected.
+       if err := PutBlock(TEST_BLOCK, TEST_HASH); err != nil {
+               t.Fatalf("PutBlock: %v", err)
+       }
+
+       result, err := GetBlock(TEST_HASH)
+       if err != nil {
+               t.Fatalf("GetBlock returned error: %v", err)
+       }
+       if string(result) != string(TEST_BLOCK) {
+               t.Error("PutBlock/GetBlock mismatch")
+               t.Fatalf("PutBlock stored '%s', GetBlock retrieved '%s'",
+                       string(TEST_BLOCK), string(result))
+       }
+}
+
+// TestPutBlockOneVol
+//     PutBlock still returns success even when only one of the known
+//     volumes is online.
+//
+func TestPutBlockOneVol(t *testing.T) {
+       defer teardown()
+
+       // Create two test Keep volumes, but cripple one of them.
+       KeepVolumes = setup(t, 2)
+       os.Chmod(KeepVolumes[0], 000)
+
+       // Check that PutBlock stores the data as expected.
+       if err := PutBlock(TEST_BLOCK, TEST_HASH); err != nil {
+               t.Fatalf("PutBlock: %v", err)
+       }
+
+       result, err := GetBlock(TEST_HASH)
+       if err != nil {
+               t.Fatalf("GetBlock: %v", err)
+       }
+       if string(result) != string(TEST_BLOCK) {
+               t.Error("PutBlock/GetBlock mismatch")
+               t.Fatalf("PutBlock stored '%s', GetBlock retrieved '%s'",
+                       string(TEST_BLOCK), string(result))
+       }
+}
+
+// TestPutBlockMD5Fail
+//     Check that PutBlock returns an error if passed a block and hash that
+//     do not match.
+//
+func TestPutBlockMD5Fail(t *testing.T) {
+       defer teardown()
+
+       // Create two test Keep volumes.
+       KeepVolumes = setup(t, 2)
+
+       // Check that PutBlock returns the expected error when the hash does
+       // not match the block.
+       if err := PutBlock(BAD_BLOCK, TEST_HASH); err != MD5Error {
+               t.Error("Expected MD5Error, got %v", err)
+       }
+
+       // Confirm that GetBlock fails to return anything.
+       if result, err := GetBlock(TEST_HASH); err != NotFoundError {
+               t.Errorf("GetBlock succeeded after a corrupt block store (result = %s, err = %v)",
+                       string(result), err)
+       }
+}
+
+// TestPutBlockCorrupt
+//     PutBlock should overwrite corrupt blocks on disk when given
+//     a PUT request with a good block.
+//
+func TestPutBlockCorrupt(t *testing.T) {
+       defer teardown()
+
+       // Create two test Keep volumes.
+       KeepVolumes = setup(t, 2)
+
+       // Store a corrupted block under TEST_HASH.
+       store(t, KeepVolumes[0], TEST_HASH, BAD_BLOCK)
+       if err := PutBlock(TEST_BLOCK, TEST_HASH); err != nil {
+               t.Errorf("PutBlock: %v", err)
+       }
+
+       // The block on disk should now match TEST_BLOCK.
+       if block, err := GetBlock(TEST_HASH); err != nil {
+               t.Errorf("GetBlock: %v", err)
+       } else if bytes.Compare(block, TEST_BLOCK) != 0 {
+               t.Errorf("GetBlock returned: '%s'", string(block))
+       }
+}
+
+// PutBlockCollision
+//     PutBlock returns a 400 Collision error when attempting to
+//     store a block that collides with another block on disk.
+//
+func TestPutBlockCollision(t *testing.T) {
+       defer teardown()
+
+       // These blocks both hash to the MD5 digest cee9a457e790cf20d4bdaa6d69f01e41.
+       var b1 = []byte("\x0e0eaU\x9a\xa7\x87\xd0\x0b\xc6\xf7\x0b\xbd\xfe4\x04\xcf\x03e\x9epO\x854\xc0\x0f\xfbe\x9cL\x87@\xcc\x94/\xeb-\xa1\x15\xa3\xf4\x15\\\xbb\x86\x07Is\x86em}\x1f4\xa4 Y\xd7\x8fZ\x8d\xd1\xef")
+       var b2 = []byte("\x0e0eaU\x9a\xa7\x87\xd0\x0b\xc6\xf7\x0b\xbd\xfe4\x04\xcf\x03e\x9etO\x854\xc0\x0f\xfbe\x9cL\x87@\xcc\x94/\xeb-\xa1\x15\xa3\xf4\x15\xdc\xbb\x86\x07Is\x86em}\x1f4\xa4 Y\xd7\x8fZ\x8d\xd1\xef")
+       var locator = "cee9a457e790cf20d4bdaa6d69f01e41"
+
+       // Prepare two test Keep volumes. Store one block,
+       // then attempt to store the other.
+       KeepVolumes = setup(t, 2)
+       store(t, KeepVolumes[1], locator, b1)
+
+       if err := PutBlock(b2, locator); err == nil {
+               t.Error("PutBlock did not report a collision")
+       } else if err != CollisionError {
+               t.Errorf("PutBlock returned %v", err)
+       }
+}
+
+// ========================================
+// FindKeepVolumes tests.
+// ========================================
+
+// TestFindKeepVolumes
+//     Confirms that FindKeepVolumes finds tmpfs volumes with "/keep"
+//     directories at the top level.
+//
+func TestFindKeepVolumes(t *testing.T) {
+       defer teardown()
+
+       // Initialize two keep volumes.
+       var tempVols []string = setup(t, 2)
+
+       // Set up a bogus PROC_MOUNTS file.
+       if f, err := ioutil.TempFile("", "keeptest"); err == nil {
+               for _, vol := range tempVols {
+                       fmt.Fprintf(f, "tmpfs %s tmpfs opts\n", path.Dir(vol))
+               }
+               f.Close()
+               PROC_MOUNTS = f.Name()
+
+               // Check that FindKeepVolumes finds the temp volumes.
+               resultVols := FindKeepVolumes()
+               if len(tempVols) != len(resultVols) {
+                       t.Fatalf("set up %d volumes, FindKeepVolumes found %d\n",
+                               len(tempVols), len(resultVols))
+               }
+               for i := range tempVols {
+                       if tempVols[i] != resultVols[i] {
+                               t.Errorf("FindKeepVolumes returned %s, expected %s\n",
+                                       resultVols[i], tempVols[i])
+                       }
+               }
+
+               os.Remove(f.Name())
+       }
+}
+
+// TestFindKeepVolumesFail
+//     When no Keep volumes are present, FindKeepVolumes returns an empty slice.
+//
+func TestFindKeepVolumesFail(t *testing.T) {
+       defer teardown()
+
+       // Set up a bogus PROC_MOUNTS file with no Keep vols.
+       if f, err := ioutil.TempFile("", "keeptest"); err == nil {
+               fmt.Fprintln(f, "rootfs / rootfs opts 0 0")
+               fmt.Fprintln(f, "sysfs /sys sysfs opts 0 0")
+               fmt.Fprintln(f, "proc /proc proc opts 0 0")
+               fmt.Fprintln(f, "udev /dev devtmpfs opts 0 0")
+               fmt.Fprintln(f, "devpts /dev/pts devpts opts 0 0")
+               f.Close()
+               PROC_MOUNTS = f.Name()
+
+               // Check that FindKeepVolumes returns an empty array.
+               resultVols := FindKeepVolumes()
+               if len(resultVols) != 0 {
+                       t.Fatalf("FindKeepVolumes returned %v", resultVols)
+               }
+
+               os.Remove(PROC_MOUNTS)
+       }
+}
+
+// ========================================
+// Helper functions for unit tests.
+// ========================================
+
 // setup
 //     Create KeepVolumes for testing.
+//     Returns a slice of pathnames to temporary Keep volumes.
 //
-func setup(t *testing.T, num_volumes int) {
-       KeepVolumes = make([]string, num_volumes)
-       for i := range KeepVolumes {
+func setup(t *testing.T, num_volumes int) []string {
+       vols := make([]string, num_volumes)
+       for i := range vols {
                if dir, err := ioutil.TempDir(os.TempDir(), "keeptest"); err == nil {
-                       KeepVolumes[i] = dir + "/keep"
+                       vols[i] = dir + "/keep"
+                       os.Mkdir(vols[i], 0755)
                } else {
                        t.Fatal(err)
                }
        }
+       return vols
 }
 
 // teardown
@@ -103,11 +308,13 @@ func teardown() {
        for _, vol := range KeepVolumes {
                os.RemoveAll(path.Dir(vol))
        }
+       KeepVolumes = nil
 }
 
 // store
+//     Low-level code to write Keep blocks directly to disk for testing.
 //
-func store(t *testing.T, keepdir string, filename string, block []byte) error {
+func store(t *testing.T, keepdir string, filename string, block []byte) {
        blockdir := fmt.Sprintf("%s/%s", keepdir, filename[:3])
        if err := os.MkdirAll(blockdir, 0755); err != nil {
                t.Fatal(err)
@@ -120,6 +327,4 @@ func store(t *testing.T, keepdir string, filename string, block []byte) error {
        } else {
                t.Fatal(err)
        }
-
-       return nil
 }