7121: Add collisionOrCorrupt->DiskHashError test cases where the mismatched data...
[arvados.git] / services / keepstore / volume_unix_test.go
index 6bafa7c1ca03a23ff39037e95a656b27222696c9..08ca31cc5b639d6da8a32db79386e9c17000e976 100644 (file)
@@ -2,9 +2,15 @@ package main
 
 import (
        "bytes"
+       "errors"
        "fmt"
+       "io"
        "io/ioutil"
        "os"
+       "regexp"
+       "sort"
+       "strings"
+       "sync"
        "syscall"
        "testing"
        "time"
@@ -15,10 +21,14 @@ func TempUnixVolume(t *testing.T, serialize bool, readonly bool) *UnixVolume {
        if err != nil {
                t.Fatal(err)
        }
+       var locker sync.Locker
+       if serialize {
+               locker = &sync.Mutex{}
+       }
        return &UnixVolume{
-               root:      d,
-               serialize: serialize,
-               readonly:  readonly,
+               root:     d,
+               locker:   locker,
+               readonly: readonly,
        }
 }
 
@@ -74,6 +84,42 @@ func TestGetNotFound(t *testing.T) {
        }
 }
 
+func TestIndexTo(t *testing.T) {
+       v := TempUnixVolume(t, false, false)
+       defer _teardown(v)
+
+       _store(t, v, TEST_HASH, TEST_BLOCK)
+       _store(t, v, TEST_HASH_2, TEST_BLOCK_2)
+       _store(t, v, TEST_HASH_3, TEST_BLOCK_3)
+
+       buf := new(bytes.Buffer)
+       v.IndexTo("", buf)
+       index_rows := strings.Split(string(buf.Bytes()), "\n")
+       sort.Strings(index_rows)
+       sorted_index := strings.Join(index_rows, "\n")
+       m, err := regexp.MatchString(
+               `^\n`+TEST_HASH+`\+\d+ \d+\n`+
+                       TEST_HASH_3+`\+\d+ \d+\n`+
+                       TEST_HASH_2+`\+\d+ \d+$`,
+               sorted_index)
+       if err != nil {
+               t.Error(err)
+       } else if !m {
+               t.Errorf("Got index %q for empty prefix", sorted_index)
+       }
+
+       for _, prefix := range []string{"f", "f15", "f15ac"} {
+               buf = new(bytes.Buffer)
+               v.IndexTo(prefix, buf)
+               m, err := regexp.MatchString(`^`+TEST_HASH_2+`\+\d+ \d+\n$`, string(buf.Bytes()))
+               if err != nil {
+                       t.Error(err)
+               } else if !m {
+                       t.Errorf("Got index %q for prefix %q", string(buf.Bytes()), prefix)
+               }
+       }
+}
+
 func TestPut(t *testing.T) {
        v := TempUnixVolume(t, false, false)
        defer _teardown(v)
@@ -277,7 +323,7 @@ func TestPutSerialized(t *testing.T) {
        }(sem)
 
        // Wait for all goroutines to finish
-       for done := 0; done < 2; {
+       for done := 0; done < 3; {
                done += <-sem
        }
 
@@ -346,3 +392,98 @@ func TestNodeStatus(t *testing.T) {
                t.Errorf("uninitialized bytes_used in %v", volinfo)
        }
 }
+
+func TestUnixVolumeGetFuncWorkerError(t *testing.T) {
+       v := TempUnixVolume(t, false, false)
+       defer _teardown(v)
+
+       v.Put(TEST_HASH, TEST_BLOCK)
+       mockErr := errors.New("Mock error")
+       err := v.getFunc(v.blockPath(TEST_HASH), func(rdr io.Reader) error {
+               return mockErr
+       })
+       if err != mockErr {
+               t.Errorf("Got %v, expected %v", err, mockErr)
+       }
+}
+
+func TestUnixVolumeGetFuncFileError(t *testing.T) {
+       v := TempUnixVolume(t, false, false)
+       defer _teardown(v)
+
+       funcCalled := false
+       err := v.getFunc(v.blockPath(TEST_HASH), func(rdr io.Reader) error {
+               funcCalled = true
+               return nil
+       })
+       if err == nil {
+               t.Errorf("Expected error opening non-existent file")
+       }
+       if funcCalled {
+               t.Errorf("Worker func should not have been called")
+       }
+}
+
+func TestUnixVolumeGetFuncWorkerWaitsOnMutex(t *testing.T) {
+       v := TempUnixVolume(t, false, false)
+       defer _teardown(v)
+
+       v.Put(TEST_HASH, TEST_BLOCK)
+
+       mtx := NewMockMutex()
+       v.locker = mtx
+
+       funcCalled := make(chan struct{})
+       go v.getFunc(v.blockPath(TEST_HASH), func(rdr io.Reader) error {
+               funcCalled <- struct{}{}
+               return nil
+       })
+       select {
+       case mtx.AllowLock <- struct{}{}:
+       case <-funcCalled:
+               t.Fatal("Function was called before mutex was acquired")
+       case <-time.After(5 * time.Second):
+               t.Fatal("Timed out before mutex was acquired")
+       }
+       select {
+       case <-funcCalled:
+       case mtx.AllowUnlock <- struct{}{}:
+               t.Fatal("Mutex was released before function was called")
+       case <-time.After(5 * time.Second):
+               t.Fatal("Timed out waiting for funcCalled")
+       }
+       select {
+       case mtx.AllowUnlock <- struct{}{}:
+       case <-time.After(5 * time.Second):
+               t.Fatal("Timed out waiting for getFunc() to release mutex")
+       }
+}
+
+func TestUnixVolumeCompare(t *testing.T) {
+       v := TempUnixVolume(t, false, false)
+       defer _teardown(v)
+
+       v.Put(TEST_HASH, TEST_BLOCK)
+       err := v.Compare(TEST_HASH, TEST_BLOCK)
+       if err != nil {
+               t.Errorf("Got err %q, expected nil", err)
+       }
+
+       err = v.Compare(TEST_HASH, []byte("baddata"))
+       if err != CollisionError {
+               t.Errorf("Got err %q, expected %q", err, CollisionError)
+       }
+
+       _store(t, v, TEST_HASH, []byte("baddata"))
+       err = v.Compare(TEST_HASH, TEST_BLOCK)
+       if err != DiskHashError {
+               t.Errorf("Got err %q, expected %q", err, DiskHashError)
+       }
+
+       p := fmt.Sprintf("%s/%s/%s", v.root, TEST_HASH[:3], TEST_HASH)
+       os.Chmod(p, 000)
+       err = v.Compare(TEST_HASH, TEST_BLOCK)
+       if err == nil || strings.Index(err.Error(), "permission denied") < 0 {
+               t.Errorf("Got err %q, expected %q", err, "permission denied")
+       }
+}