10467: Abort S3 and release buffer if caller disconnects during S3 PUT request.
[arvados.git] / services / keepstore / volume_unix_test.go
index 08ca31cc5b639d6da8a32db79386e9c17000e976..fad1f1216465210c2a6af3d4b7041963d631d0bb 100644 (file)
@@ -2,13 +2,12 @@ package main
 
 import (
        "bytes"
+       "context"
        "errors"
        "fmt"
        "io"
        "io/ioutil"
        "os"
-       "regexp"
-       "sort"
        "strings"
        "sync"
        "syscall"
@@ -16,7 +15,12 @@ import (
        "time"
 )
 
-func TempUnixVolume(t *testing.T, serialize bool, readonly bool) *UnixVolume {
+type TestableUnixVolume struct {
+       UnixVolume
+       t TB
+}
+
+func NewTestableUnixVolume(t TB, serialize bool, readonly bool) *TestableUnixVolume {
        d, err := ioutil.TempDir("", "volume_test")
        if err != nil {
                t.Fatal(err)
@@ -25,362 +29,193 @@ func TempUnixVolume(t *testing.T, serialize bool, readonly bool) *UnixVolume {
        if serialize {
                locker = &sync.Mutex{}
        }
-       return &UnixVolume{
-               root:     d,
-               locker:   locker,
-               readonly: readonly,
+       return &TestableUnixVolume{
+               UnixVolume: UnixVolume{
+                       Root:     d,
+                       ReadOnly: readonly,
+                       locker:   locker,
+               },
+               t: t,
        }
 }
 
-func _teardown(v *UnixVolume) {
-       os.RemoveAll(v.root)
+// PutRaw writes a Keep block directly into a UnixVolume, even if
+// the volume is readonly.
+func (v *TestableUnixVolume) PutRaw(locator string, data []byte) {
+       defer func(orig bool) {
+               v.ReadOnly = orig
+       }(v.ReadOnly)
+       v.ReadOnly = false
+       err := v.Put(context.TODO(), locator, data)
+       if err != nil {
+               v.t.Fatal(err)
+       }
 }
 
-// _store writes a Keep block directly into a UnixVolume, bypassing
-// the overhead and safeguards of Put(). Useful for storing bogus data
-// and isolating unit tests from Put() behavior.
-func _store(t *testing.T, vol *UnixVolume, filename string, block []byte) {
-       blockdir := fmt.Sprintf("%s/%s", vol.root, filename[:3])
-       if err := os.MkdirAll(blockdir, 0755); err != nil {
-               t.Fatal(err)
+func (v *TestableUnixVolume) TouchWithDate(locator string, lastPut time.Time) {
+       err := syscall.Utime(v.blockPath(locator), &syscall.Utimbuf{lastPut.Unix(), lastPut.Unix()})
+       if err != nil {
+               v.t.Fatal(err)
        }
+}
 
-       blockpath := fmt.Sprintf("%s/%s", blockdir, filename)
-       if f, err := os.Create(blockpath); err == nil {
-               f.Write(block)
-               f.Close()
-       } else {
-               t.Fatal(err)
+func (v *TestableUnixVolume) Teardown() {
+       if err := os.RemoveAll(v.Root); err != nil {
+               v.t.Fatal(err)
        }
 }
 
-func TestGet(t *testing.T) {
-       v := TempUnixVolume(t, false, false)
-       defer _teardown(v)
-       _store(t, v, TEST_HASH, TEST_BLOCK)
+// serialize = false; readonly = false
+func TestUnixVolumeWithGenericTests(t *testing.T) {
+       DoGenericVolumeTests(t, func(t TB) TestableVolume {
+               return NewTestableUnixVolume(t, false, false)
+       })
+}
 
-       buf, err := v.Get(TEST_HASH)
-       if err != nil {
+// serialize = false; readonly = true
+func TestUnixVolumeWithGenericTestsReadOnly(t *testing.T) {
+       DoGenericVolumeTests(t, func(t TB) TestableVolume {
+               return NewTestableUnixVolume(t, false, true)
+       })
+}
+
+// serialize = true; readonly = false
+func TestUnixVolumeWithGenericTestsSerialized(t *testing.T) {
+       DoGenericVolumeTests(t, func(t TB) TestableVolume {
+               return NewTestableUnixVolume(t, true, false)
+       })
+}
+
+// serialize = false; readonly = false
+func TestUnixVolumeHandlersWithGenericVolumeTests(t *testing.T) {
+       DoHandlersWithGenericVolumeTests(t, func(t TB) (*RRVolumeManager, []TestableVolume) {
+               vols := make([]Volume, 2)
+               testableUnixVols := make([]TestableVolume, 2)
+
+               for i := range vols {
+                       v := NewTestableUnixVolume(t, false, false)
+                       vols[i] = v
+                       testableUnixVols[i] = v
+               }
+
+               return MakeRRVolumeManager(vols), testableUnixVols
+       })
+}
+
+func TestReplicationDefault1(t *testing.T) {
+       v := &UnixVolume{
+               Root:     "/",
+               ReadOnly: true,
+       }
+       if err := v.Start(); err != nil {
                t.Error(err)
        }
-       if bytes.Compare(buf, TEST_BLOCK) != 0 {
-               t.Errorf("expected %s, got %s", string(TEST_BLOCK), string(buf))
+       if got := v.Replication(); got != 1 {
+               t.Errorf("Replication() returned %d, expected 1 if no config given", got)
        }
 }
 
 func TestGetNotFound(t *testing.T) {
-       v := TempUnixVolume(t, false, false)
-       defer _teardown(v)
-       _store(t, v, TEST_HASH, TEST_BLOCK)
+       v := NewTestableUnixVolume(t, false, false)
+       defer v.Teardown()
+       v.Put(context.TODO(), TestHash, TestBlock)
 
-       buf, err := v.Get(TEST_HASH_2)
+       buf := make([]byte, BlockSize)
+       n, err := v.Get(context.TODO(), TestHash2, buf)
        switch {
        case os.IsNotExist(err):
                break
        case err == nil:
-               t.Errorf("Read should have failed, returned %s", string(buf))
+               t.Errorf("Read should have failed, returned %+q", buf[:n])
        default:
                t.Errorf("Read expected ErrNotExist, got: %s", err)
        }
 }
 
-func TestIndexTo(t *testing.T) {
-       v := TempUnixVolume(t, false, false)
-       defer _teardown(v)
-
-       _store(t, v, TEST_HASH, TEST_BLOCK)
-       _store(t, v, TEST_HASH_2, TEST_BLOCK_2)
-       _store(t, v, TEST_HASH_3, TEST_BLOCK_3)
-
-       buf := new(bytes.Buffer)
-       v.IndexTo("", buf)
-       index_rows := strings.Split(string(buf.Bytes()), "\n")
-       sort.Strings(index_rows)
-       sorted_index := strings.Join(index_rows, "\n")
-       m, err := regexp.MatchString(
-               `^\n`+TEST_HASH+`\+\d+ \d+\n`+
-                       TEST_HASH_3+`\+\d+ \d+\n`+
-                       TEST_HASH_2+`\+\d+ \d+$`,
-               sorted_index)
-       if err != nil {
-               t.Error(err)
-       } else if !m {
-               t.Errorf("Got index %q for empty prefix", sorted_index)
-       }
-
-       for _, prefix := range []string{"f", "f15", "f15ac"} {
-               buf = new(bytes.Buffer)
-               v.IndexTo(prefix, buf)
-               m, err := regexp.MatchString(`^`+TEST_HASH_2+`\+\d+ \d+\n$`, string(buf.Bytes()))
-               if err != nil {
-                       t.Error(err)
-               } else if !m {
-                       t.Errorf("Got index %q for prefix %q", string(buf.Bytes()), prefix)
-               }
-       }
-}
-
 func TestPut(t *testing.T) {
-       v := TempUnixVolume(t, false, false)
-       defer _teardown(v)
+       v := NewTestableUnixVolume(t, false, false)
+       defer v.Teardown()
 
-       err := v.Put(TEST_HASH, TEST_BLOCK)
+       err := v.Put(context.TODO(), TestHash, TestBlock)
        if err != nil {
                t.Error(err)
        }
-       p := fmt.Sprintf("%s/%s/%s", v.root, TEST_HASH[:3], TEST_HASH)
+       p := fmt.Sprintf("%s/%s/%s", v.Root, TestHash[:3], TestHash)
        if buf, err := ioutil.ReadFile(p); err != nil {
                t.Error(err)
-       } else if bytes.Compare(buf, TEST_BLOCK) != 0 {
+       } else if bytes.Compare(buf, TestBlock) != 0 {
                t.Errorf("Write should have stored %s, did store %s",
-                       string(TEST_BLOCK), string(buf))
+                       string(TestBlock), string(buf))
        }
 }
 
 func TestPutBadVolume(t *testing.T) {
-       v := TempUnixVolume(t, false, false)
-       defer _teardown(v)
+       v := NewTestableUnixVolume(t, false, false)
+       defer v.Teardown()
 
-       os.Chmod(v.root, 000)
-       err := v.Put(TEST_HASH, TEST_BLOCK)
+       os.Chmod(v.Root, 000)
+       err := v.Put(context.TODO(), TestHash, TestBlock)
        if err == nil {
                t.Error("Write should have failed")
        }
 }
 
 func TestUnixVolumeReadonly(t *testing.T) {
-       v := TempUnixVolume(t, false, false)
-       defer _teardown(v)
+       v := NewTestableUnixVolume(t, false, true)
+       defer v.Teardown()
 
-       // First write something before marking readonly
-       err := v.Put(TEST_HASH, TEST_BLOCK)
-       if err != nil {
-               t.Error("got err %v, expected nil", err)
-       }
+       v.PutRaw(TestHash, TestBlock)
 
-       v.readonly = true
-
-       _, err = v.Get(TEST_HASH)
+       buf := make([]byte, BlockSize)
+       _, err := v.Get(context.TODO(), TestHash, buf)
        if err != nil {
-               t.Error("got err %v, expected nil", err)
+               t.Errorf("got err %v, expected nil", err)
        }
 
-       err = v.Put(TEST_HASH, TEST_BLOCK)
+       err = v.Put(context.TODO(), TestHash, TestBlock)
        if err != MethodDisabledError {
-               t.Error("got err %v, expected MethodDisabledError", err)
+               t.Errorf("got err %v, expected MethodDisabledError", err)
        }
 
-       err = v.Touch(TEST_HASH)
+       err = v.Touch(TestHash)
        if err != MethodDisabledError {
-               t.Error("got err %v, expected MethodDisabledError", err)
+               t.Errorf("got err %v, expected MethodDisabledError", err)
        }
 
-       err = v.Delete(TEST_HASH)
+       err = v.Trash(TestHash)
        if err != MethodDisabledError {
-               t.Error("got err %v, expected MethodDisabledError", err)
-       }
-}
-
-// TestPutTouch
-//     Test that when applying PUT to a block that already exists,
-//     the block's modification time is updated.
-func TestPutTouch(t *testing.T) {
-       v := TempUnixVolume(t, false, false)
-       defer _teardown(v)
-
-       if err := v.Put(TEST_HASH, TEST_BLOCK); err != nil {
-               t.Error(err)
-       }
-
-       // We'll verify { t0 < threshold < t1 }, where t0 is the
-       // existing block's timestamp on disk before Put() and t1 is
-       // its timestamp after Put().
-       threshold := time.Now().Add(-time.Second)
-
-       // Set the stored block's mtime far enough in the past that we
-       // can see the difference between "timestamp didn't change"
-       // and "timestamp granularity is too low".
-       {
-               oldtime := time.Now().Add(-20 * time.Second).Unix()
-               if err := syscall.Utime(v.blockPath(TEST_HASH),
-                       &syscall.Utimbuf{oldtime, oldtime}); err != nil {
-                       t.Error(err)
-               }
-
-               // Make sure v.Mtime() agrees the above Utime really worked.
-               if t0, err := v.Mtime(TEST_HASH); err != nil || t0.IsZero() || !t0.Before(threshold) {
-                       t.Errorf("Setting mtime failed: %v, %v", t0, err)
-               }
-       }
-
-       // Write the same block again.
-       if err := v.Put(TEST_HASH, TEST_BLOCK); err != nil {
-               t.Error(err)
-       }
-
-       // Verify threshold < t1
-       t1, err := v.Mtime(TEST_HASH)
-       if err != nil {
-               t.Error(err)
-       }
-       if t1.Before(threshold) {
-               t.Errorf("t1 %v must be >= threshold %v after v.Put ",
-                       t1, threshold)
-       }
-}
-
-// Serialization tests: launch a bunch of concurrent
-//
-// TODO(twp): show that the underlying Read/Write operations executed
-// serially and not concurrently. The easiest way to do this is
-// probably to activate verbose or debug logging, capture log output
-// and examine it to confirm that Reads and Writes did not overlap.
-//
-// TODO(twp): a proper test of I/O serialization requires that a
-// second request start while the first one is still underway.
-// Guaranteeing that the test behaves this way requires some tricky
-// synchronization and mocking.  For now we'll just launch a bunch of
-// requests simultaenously in goroutines and demonstrate that they
-// return accurate results.
-//
-func TestGetSerialized(t *testing.T) {
-       // Create a volume with I/O serialization enabled.
-       v := TempUnixVolume(t, true, false)
-       defer _teardown(v)
-
-       _store(t, v, TEST_HASH, TEST_BLOCK)
-       _store(t, v, TEST_HASH_2, TEST_BLOCK_2)
-       _store(t, v, TEST_HASH_3, TEST_BLOCK_3)
-
-       sem := make(chan int)
-       go func(sem chan int) {
-               buf, err := v.Get(TEST_HASH)
-               if err != nil {
-                       t.Errorf("err1: %v", err)
-               }
-               if bytes.Compare(buf, TEST_BLOCK) != 0 {
-                       t.Errorf("buf should be %s, is %s", string(TEST_BLOCK), string(buf))
-               }
-               sem <- 1
-       }(sem)
-
-       go func(sem chan int) {
-               buf, err := v.Get(TEST_HASH_2)
-               if err != nil {
-                       t.Errorf("err2: %v", err)
-               }
-               if bytes.Compare(buf, TEST_BLOCK_2) != 0 {
-                       t.Errorf("buf should be %s, is %s", string(TEST_BLOCK_2), string(buf))
-               }
-               sem <- 1
-       }(sem)
-
-       go func(sem chan int) {
-               buf, err := v.Get(TEST_HASH_3)
-               if err != nil {
-                       t.Errorf("err3: %v", err)
-               }
-               if bytes.Compare(buf, TEST_BLOCK_3) != 0 {
-                       t.Errorf("buf should be %s, is %s", string(TEST_BLOCK_3), string(buf))
-               }
-               sem <- 1
-       }(sem)
-
-       // Wait for all goroutines to finish
-       for done := 0; done < 3; {
-               done += <-sem
-       }
-}
-
-func TestPutSerialized(t *testing.T) {
-       // Create a volume with I/O serialization enabled.
-       v := TempUnixVolume(t, true, false)
-       defer _teardown(v)
-
-       sem := make(chan int)
-       go func(sem chan int) {
-               err := v.Put(TEST_HASH, TEST_BLOCK)
-               if err != nil {
-                       t.Errorf("err1: %v", err)
-               }
-               sem <- 1
-       }(sem)
-
-       go func(sem chan int) {
-               err := v.Put(TEST_HASH_2, TEST_BLOCK_2)
-               if err != nil {
-                       t.Errorf("err2: %v", err)
-               }
-               sem <- 1
-       }(sem)
-
-       go func(sem chan int) {
-               err := v.Put(TEST_HASH_3, TEST_BLOCK_3)
-               if err != nil {
-                       t.Errorf("err3: %v", err)
-               }
-               sem <- 1
-       }(sem)
-
-       // Wait for all goroutines to finish
-       for done := 0; done < 3; {
-               done += <-sem
-       }
-
-       // Double check that we actually wrote the blocks we expected to write.
-       buf, err := v.Get(TEST_HASH)
-       if err != nil {
-               t.Errorf("Get #1: %v", err)
-       }
-       if bytes.Compare(buf, TEST_BLOCK) != 0 {
-               t.Errorf("Get #1: expected %s, got %s", string(TEST_BLOCK), string(buf))
-       }
-
-       buf, err = v.Get(TEST_HASH_2)
-       if err != nil {
-               t.Errorf("Get #2: %v", err)
-       }
-       if bytes.Compare(buf, TEST_BLOCK_2) != 0 {
-               t.Errorf("Get #2: expected %s, got %s", string(TEST_BLOCK_2), string(buf))
-       }
-
-       buf, err = v.Get(TEST_HASH_3)
-       if err != nil {
-               t.Errorf("Get #3: %v", err)
-       }
-       if bytes.Compare(buf, TEST_BLOCK_3) != 0 {
-               t.Errorf("Get #3: expected %s, got %s", string(TEST_BLOCK_3), string(buf))
+               t.Errorf("got err %v, expected MethodDisabledError", err)
        }
 }
 
 func TestIsFull(t *testing.T) {
-       v := TempUnixVolume(t, false, false)
-       defer _teardown(v)
+       v := NewTestableUnixVolume(t, false, false)
+       defer v.Teardown()
 
-       full_path := v.root + "/full"
+       fullPath := v.Root + "/full"
        now := fmt.Sprintf("%d", time.Now().Unix())
-       os.Symlink(now, full_path)
+       os.Symlink(now, fullPath)
        if !v.IsFull() {
                t.Errorf("%s: claims not to be full", v)
        }
-       os.Remove(full_path)
+       os.Remove(fullPath)
 
        // Test with an expired /full link.
        expired := fmt.Sprintf("%d", time.Now().Unix()-3605)
-       os.Symlink(expired, full_path)
+       os.Symlink(expired, fullPath)
        if v.IsFull() {
                t.Errorf("%s: should no longer be full", v)
        }
 }
 
 func TestNodeStatus(t *testing.T) {
-       v := TempUnixVolume(t, false, false)
-       defer _teardown(v)
+       v := NewTestableUnixVolume(t, false, false)
+       defer v.Teardown()
 
        // Get node status and make a basic sanity check.
        volinfo := v.Status()
-       if volinfo.MountPoint != v.root {
-               t.Errorf("GetNodeStatus mount_point %s, expected %s", volinfo.MountPoint, v.root)
+       if volinfo.MountPoint != v.Root {
+               t.Errorf("GetNodeStatus mount_point %s, expected %s", volinfo.MountPoint, v.Root)
        }
        if volinfo.DeviceNum == 0 {
                t.Errorf("uninitialized device_num in %v", volinfo)
@@ -394,12 +229,12 @@ func TestNodeStatus(t *testing.T) {
 }
 
 func TestUnixVolumeGetFuncWorkerError(t *testing.T) {
-       v := TempUnixVolume(t, false, false)
-       defer _teardown(v)
+       v := NewTestableUnixVolume(t, false, false)
+       defer v.Teardown()
 
-       v.Put(TEST_HASH, TEST_BLOCK)
+       v.Put(context.TODO(), TestHash, TestBlock)
        mockErr := errors.New("Mock error")
-       err := v.getFunc(v.blockPath(TEST_HASH), func(rdr io.Reader) error {
+       err := v.getFunc(v.blockPath(TestHash), func(rdr io.Reader) error {
                return mockErr
        })
        if err != mockErr {
@@ -408,11 +243,11 @@ func TestUnixVolumeGetFuncWorkerError(t *testing.T) {
 }
 
 func TestUnixVolumeGetFuncFileError(t *testing.T) {
-       v := TempUnixVolume(t, false, false)
-       defer _teardown(v)
+       v := NewTestableUnixVolume(t, false, false)
+       defer v.Teardown()
 
        funcCalled := false
-       err := v.getFunc(v.blockPath(TEST_HASH), func(rdr io.Reader) error {
+       err := v.getFunc(v.blockPath(TestHash), func(rdr io.Reader) error {
                funcCalled = true
                return nil
        })
@@ -425,16 +260,16 @@ func TestUnixVolumeGetFuncFileError(t *testing.T) {
 }
 
 func TestUnixVolumeGetFuncWorkerWaitsOnMutex(t *testing.T) {
-       v := TempUnixVolume(t, false, false)
-       defer _teardown(v)
+       v := NewTestableUnixVolume(t, false, false)
+       defer v.Teardown()
 
-       v.Put(TEST_HASH, TEST_BLOCK)
+       v.Put(context.TODO(), TestHash, TestBlock)
 
        mtx := NewMockMutex()
        v.locker = mtx
 
        funcCalled := make(chan struct{})
-       go v.getFunc(v.blockPath(TEST_HASH), func(rdr io.Reader) error {
+       go v.getFunc(v.blockPath(TestHash), func(rdr io.Reader) error {
                funcCalled <- struct{}{}
                return nil
        })
@@ -460,30 +295,42 @@ func TestUnixVolumeGetFuncWorkerWaitsOnMutex(t *testing.T) {
 }
 
 func TestUnixVolumeCompare(t *testing.T) {
-       v := TempUnixVolume(t, false, false)
-       defer _teardown(v)
+       v := NewTestableUnixVolume(t, false, false)
+       defer v.Teardown()
 
-       v.Put(TEST_HASH, TEST_BLOCK)
-       err := v.Compare(TEST_HASH, TEST_BLOCK)
+       v.Put(context.TODO(), TestHash, TestBlock)
+       err := v.Compare(TestHash, TestBlock)
        if err != nil {
                t.Errorf("Got err %q, expected nil", err)
        }
 
-       err = v.Compare(TEST_HASH, []byte("baddata"))
+       err = v.Compare(TestHash, []byte("baddata"))
        if err != CollisionError {
                t.Errorf("Got err %q, expected %q", err, CollisionError)
        }
 
-       _store(t, v, TEST_HASH, []byte("baddata"))
-       err = v.Compare(TEST_HASH, TEST_BLOCK)
+       v.Put(context.TODO(), TestHash, []byte("baddata"))
+       err = v.Compare(TestHash, TestBlock)
        if err != DiskHashError {
                t.Errorf("Got err %q, expected %q", err, DiskHashError)
        }
 
-       p := fmt.Sprintf("%s/%s/%s", v.root, TEST_HASH[:3], TEST_HASH)
+       p := fmt.Sprintf("%s/%s/%s", v.Root, TestHash[:3], TestHash)
        os.Chmod(p, 000)
-       err = v.Compare(TEST_HASH, TEST_BLOCK)
+       err = v.Compare(TestHash, TestBlock)
        if err == nil || strings.Index(err.Error(), "permission denied") < 0 {
                t.Errorf("Got err %q, expected %q", err, "permission denied")
        }
 }
+
+// TODO(twp): show that the underlying Read/Write operations executed
+// serially and not concurrently. The easiest way to do this is
+// probably to activate verbose or debug logging, capture log output
+// and examine it to confirm that Reads and Writes did not overlap.
+//
+// TODO(twp): a proper test of I/O serialization requires that a
+// second request start while the first one is still underway.
+// Guaranteeing that the test behaves this way requires some tricky
+// synchronization and mocking.  For now we'll just launch a bunch of
+// requests simultaenously in goroutines and demonstrate that they
+// return accurate results.