X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/244159419c42341baeb388236ad29cc546b7eca1..3a3d67ccee068a85aa3b79c5abd40170223071e3:/services/keepstore/volume_unix_test.go diff --git a/services/keepstore/volume_unix_test.go b/services/keepstore/volume_unix_test.go index 278e656066..05c7a93ae4 100644 --- a/services/keepstore/volume_unix_test.go +++ b/services/keepstore/volume_unix_test.go @@ -1,245 +1,445 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + package main import ( "bytes" + "context" + "encoding/json" + "errors" "fmt" + "io" "io/ioutil" "os" + "strings" + "sync" + "syscall" "testing" "time" + + "github.com/ghodss/yaml" + "github.com/prometheus/client_golang/prometheus" + check "gopkg.in/check.v1" ) -func TempUnixVolume(t *testing.T, serialize bool) UnixVolume { +type TestableUnixVolume struct { + UnixVolume + t TB +} + +func NewTestableUnixVolume(t TB, serialize bool, readonly bool) *TestableUnixVolume { d, err := ioutil.TempDir("", "volume_test") if err != nil { t.Fatal(err) } - return MakeUnixVolume(d, serialize) + var locker sync.Locker + if serialize { + locker = &sync.Mutex{} + } + return &TestableUnixVolume{ + UnixVolume: UnixVolume{ + Root: d, + ReadOnly: readonly, + locker: locker, + }, + t: t, + } } -func _teardown(v UnixVolume) { - if v.queue != nil { - close(v.queue) +// PutRaw writes a Keep block directly into a UnixVolume, even if +// the volume is readonly. +func (v *TestableUnixVolume) PutRaw(locator string, data []byte) { + defer func(orig bool) { + v.ReadOnly = orig + }(v.ReadOnly) + v.ReadOnly = false + err := v.Put(context.Background(), locator, data) + if err != nil { + v.t.Fatal(err) } - os.RemoveAll(v.root) } -// store writes a Keep block directly into a UnixVolume, for testing -// UnixVolume methods. -// -func _store(t *testing.T, vol UnixVolume, filename string, block []byte) { - blockdir := fmt.Sprintf("%s/%s", vol.root, filename[:3]) - if err := os.MkdirAll(blockdir, 0755); err != nil { - t.Fatal(err) +func (v *TestableUnixVolume) TouchWithDate(locator string, lastPut time.Time) { + err := syscall.Utime(v.blockPath(locator), &syscall.Utimbuf{lastPut.Unix(), lastPut.Unix()}) + if err != nil { + v.t.Fatal(err) } +} - blockpath := fmt.Sprintf("%s/%s", blockdir, filename) - if f, err := os.Create(blockpath); err == nil { - f.Write(block) - f.Close() - } else { - t.Fatal(err) +func (v *TestableUnixVolume) Teardown() { + if err := os.RemoveAll(v.Root); err != nil { + v.t.Fatal(err) } } -func TestGet(t *testing.T) { - v := TempUnixVolume(t, false) - defer _teardown(v) - _store(t, v, TEST_HASH, TEST_BLOCK) +// serialize = false; readonly = false +func TestUnixVolumeWithGenericTests(t *testing.T) { + DoGenericVolumeTests(t, func(t TB) TestableVolume { + return NewTestableUnixVolume(t, false, false) + }) +} - buf, err := v.Get(TEST_HASH) - if err != nil { +// serialize = false; readonly = true +func TestUnixVolumeWithGenericTestsReadOnly(t *testing.T) { + DoGenericVolumeTests(t, func(t TB) TestableVolume { + return NewTestableUnixVolume(t, false, true) + }) +} + +// serialize = true; readonly = false +func TestUnixVolumeWithGenericTestsSerialized(t *testing.T) { + DoGenericVolumeTests(t, func(t TB) TestableVolume { + return NewTestableUnixVolume(t, true, false) + }) +} + +// serialize = false; readonly = false +func TestUnixVolumeHandlersWithGenericVolumeTests(t *testing.T) { + DoHandlersWithGenericVolumeTests(t, func(t TB) (*RRVolumeManager, []TestableVolume) { + vols := make([]Volume, 2) + testableUnixVols := make([]TestableVolume, 2) + + for i := range vols { + v := NewTestableUnixVolume(t, false, false) + vols[i] = v + testableUnixVols[i] = v + } + + return MakeRRVolumeManager(vols), testableUnixVols + }) +} + +func TestReplicationDefault1(t *testing.T) { + v := &UnixVolume{ + Root: "/", + ReadOnly: true, + } + metrics := newVolumeMetricsVecs(prometheus.NewRegistry()).curryWith( + v.String(), v.Status().MountPoint, fmt.Sprintf("%d", v.Status().DeviceNum)) + if err := v.Start(metrics); err != nil { t.Error(err) } - if bytes.Compare(buf, TEST_BLOCK) != 0 { - t.Errorf("expected %s, got %s", string(TEST_BLOCK), string(buf)) + if got := v.Replication(); got != 1 { + t.Errorf("Replication() returned %d, expected 1 if no config given", got) } } func TestGetNotFound(t *testing.T) { - v := TempUnixVolume(t, false) - defer _teardown(v) - _store(t, v, TEST_HASH, TEST_BLOCK) + v := NewTestableUnixVolume(t, false, false) + defer v.Teardown() + v.Put(context.Background(), TestHash, TestBlock) - buf, err := v.Get(TEST_HASH_2) + buf := make([]byte, BlockSize) + n, err := v.Get(context.Background(), TestHash2, buf) switch { case os.IsNotExist(err): break case err == nil: - t.Errorf("Read should have failed, returned %s", string(buf)) + t.Errorf("Read should have failed, returned %+q", buf[:n]) default: t.Errorf("Read expected ErrNotExist, got: %s", err) } } func TestPut(t *testing.T) { - v := TempUnixVolume(t, false) - defer _teardown(v) + v := NewTestableUnixVolume(t, false, false) + defer v.Teardown() - err := v.Put(TEST_HASH, TEST_BLOCK) + err := v.Put(context.Background(), TestHash, TestBlock) if err != nil { t.Error(err) } - p := fmt.Sprintf("%s/%s/%s", v.root, TEST_HASH[:3], TEST_HASH) + p := fmt.Sprintf("%s/%s/%s", v.Root, TestHash[:3], TestHash) if buf, err := ioutil.ReadFile(p); err != nil { t.Error(err) - } else if bytes.Compare(buf, TEST_BLOCK) != 0 { + } else if bytes.Compare(buf, TestBlock) != 0 { t.Errorf("Write should have stored %s, did store %s", - string(TEST_BLOCK), string(buf)) + string(TestBlock), string(buf)) } } func TestPutBadVolume(t *testing.T) { - v := TempUnixVolume(t, false) - defer _teardown(v) + v := NewTestableUnixVolume(t, false, false) + defer v.Teardown() - os.Chmod(v.root, 000) - err := v.Put(TEST_HASH, TEST_BLOCK) + os.Chmod(v.Root, 000) + err := v.Put(context.Background(), TestHash, TestBlock) if err == nil { t.Error("Write should have failed") } } -// Serialization tests: launch a bunch of concurrent -// -// TODO(twp): show that the underlying Read/Write operations executed -// serially and not concurrently. The easiest way to do this is -// probably to activate verbose or debug logging, capture log output -// and examine it to confirm that Reads and Writes did not overlap. -// -// TODO(twp): a proper test of I/O serialization requires that a -// second request start while the first one is still underway. -// Guaranteeing that the test behaves this way requires some tricky -// synchronization and mocking. For now we'll just launch a bunch of -// requests simultaenously in goroutines and demonstrate that they -// return accurate results. -// -func TestGetSerialized(t *testing.T) { - // Create a volume with I/O serialization enabled. - v := TempUnixVolume(t, true) - defer _teardown(v) - - _store(t, v, TEST_HASH, TEST_BLOCK) - _store(t, v, TEST_HASH_2, TEST_BLOCK_2) - _store(t, v, TEST_HASH_3, TEST_BLOCK_3) - - sem := make(chan int) - go func(sem chan int) { - buf, err := v.Get(TEST_HASH) - if err != nil { - t.Errorf("err1: %v", err) - } - if bytes.Compare(buf, TEST_BLOCK) != 0 { - t.Errorf("buf should be %s, is %s", string(TEST_BLOCK), string(buf)) - } - sem <- 1 - }(sem) +func TestUnixVolumeReadonly(t *testing.T) { + v := NewTestableUnixVolume(t, false, true) + defer v.Teardown() - go func(sem chan int) { - buf, err := v.Get(TEST_HASH_2) - if err != nil { - t.Errorf("err2: %v", err) - } - if bytes.Compare(buf, TEST_BLOCK_2) != 0 { - t.Errorf("buf should be %s, is %s", string(TEST_BLOCK_2), string(buf)) - } - sem <- 1 - }(sem) + v.PutRaw(TestHash, TestBlock) - go func(sem chan int) { - buf, err := v.Get(TEST_HASH_3) - if err != nil { - t.Errorf("err3: %v", err) - } - if bytes.Compare(buf, TEST_BLOCK_3) != 0 { - t.Errorf("buf should be %s, is %s", string(TEST_BLOCK_3), string(buf)) - } - sem <- 1 - }(sem) + buf := make([]byte, BlockSize) + _, err := v.Get(context.Background(), TestHash, buf) + if err != nil { + t.Errorf("got err %v, expected nil", err) + } + + err = v.Put(context.Background(), TestHash, TestBlock) + if err != MethodDisabledError { + t.Errorf("got err %v, expected MethodDisabledError", err) + } + + err = v.Touch(TestHash) + if err != MethodDisabledError { + t.Errorf("got err %v, expected MethodDisabledError", err) + } - // Wait for all goroutines to finish - for done := 0; done < 3; { - done += <-sem + err = v.Trash(TestHash) + if err != MethodDisabledError { + t.Errorf("got err %v, expected MethodDisabledError", err) } } -func TestPutSerialized(t *testing.T) { - // Create a volume with I/O serialization enabled. - v := TempUnixVolume(t, true) - defer _teardown(v) +func TestIsFull(t *testing.T) { + v := NewTestableUnixVolume(t, false, false) + defer v.Teardown() - sem := make(chan int) - go func(sem chan int) { - err := v.Put(TEST_HASH, TEST_BLOCK) - if err != nil { - t.Errorf("err1: %v", err) - } - sem <- 1 - }(sem) + fullPath := v.Root + "/full" + now := fmt.Sprintf("%d", time.Now().Unix()) + os.Symlink(now, fullPath) + if !v.IsFull() { + t.Errorf("%s: claims not to be full", v) + } + os.Remove(fullPath) - go func(sem chan int) { - err := v.Put(TEST_HASH_2, TEST_BLOCK_2) - if err != nil { - t.Errorf("err2: %v", err) - } - sem <- 1 - }(sem) + // Test with an expired /full link. + expired := fmt.Sprintf("%d", time.Now().Unix()-3605) + os.Symlink(expired, fullPath) + if v.IsFull() { + t.Errorf("%s: should no longer be full", v) + } +} - go func(sem chan int) { - err := v.Put(TEST_HASH_3, TEST_BLOCK_3) - if err != nil { - t.Errorf("err3: %v", err) - } - sem <- 1 - }(sem) +func TestNodeStatus(t *testing.T) { + v := NewTestableUnixVolume(t, false, false) + defer v.Teardown() - // Wait for all goroutines to finish - for done := 0; done < 2; { - done += <-sem + // Get node status and make a basic sanity check. + volinfo := v.Status() + if volinfo.MountPoint != v.Root { + t.Errorf("GetNodeStatus mount_point %s, expected %s", volinfo.MountPoint, v.Root) + } + if volinfo.DeviceNum == 0 { + t.Errorf("uninitialized device_num in %v", volinfo) + } + if volinfo.BytesFree == 0 { + t.Errorf("uninitialized bytes_free in %v", volinfo) } + if volinfo.BytesUsed == 0 { + t.Errorf("uninitialized bytes_used in %v", volinfo) + } +} - // Double check that we actually wrote the blocks we expected to write. - buf, err := v.Get(TEST_HASH) - if err != nil { - t.Errorf("Get #1: %v", err) +func TestUnixVolumeGetFuncWorkerError(t *testing.T) { + v := NewTestableUnixVolume(t, false, false) + defer v.Teardown() + + v.Put(context.Background(), TestHash, TestBlock) + mockErr := errors.New("Mock error") + err := v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error { + return mockErr + }) + if err != mockErr { + t.Errorf("Got %v, expected %v", err, mockErr) + } +} + +func TestUnixVolumeGetFuncFileError(t *testing.T) { + v := NewTestableUnixVolume(t, false, false) + defer v.Teardown() + + funcCalled := false + err := v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error { + funcCalled = true + return nil + }) + if err == nil { + t.Errorf("Expected error opening non-existent file") } - if bytes.Compare(buf, TEST_BLOCK) != 0 { - t.Errorf("Get #1: expected %s, got %s", string(TEST_BLOCK), string(buf)) + if funcCalled { + t.Errorf("Worker func should not have been called") } +} + +func TestUnixVolumeGetFuncWorkerWaitsOnMutex(t *testing.T) { + v := NewTestableUnixVolume(t, false, false) + defer v.Teardown() + + v.Put(context.Background(), TestHash, TestBlock) + + mtx := NewMockMutex() + v.locker = mtx - buf, err = v.Get(TEST_HASH_2) + funcCalled := make(chan struct{}) + go v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error { + funcCalled <- struct{}{} + return nil + }) + select { + case mtx.AllowLock <- struct{}{}: + case <-funcCalled: + t.Fatal("Function was called before mutex was acquired") + case <-time.After(5 * time.Second): + t.Fatal("Timed out before mutex was acquired") + } + select { + case <-funcCalled: + case mtx.AllowUnlock <- struct{}{}: + t.Fatal("Mutex was released before function was called") + case <-time.After(5 * time.Second): + t.Fatal("Timed out waiting for funcCalled") + } + select { + case mtx.AllowUnlock <- struct{}{}: + case <-time.After(5 * time.Second): + t.Fatal("Timed out waiting for getFunc() to release mutex") + } +} + +func TestUnixVolumeCompare(t *testing.T) { + v := NewTestableUnixVolume(t, false, false) + defer v.Teardown() + + v.Put(context.Background(), TestHash, TestBlock) + err := v.Compare(context.Background(), TestHash, TestBlock) if err != nil { - t.Errorf("Get #2: %v", err) + t.Errorf("Got err %q, expected nil", err) + } + + err = v.Compare(context.Background(), TestHash, []byte("baddata")) + if err != CollisionError { + t.Errorf("Got err %q, expected %q", err, CollisionError) + } + + v.Put(context.Background(), TestHash, []byte("baddata")) + err = v.Compare(context.Background(), TestHash, TestBlock) + if err != DiskHashError { + t.Errorf("Got err %q, expected %q", err, DiskHashError) } - if bytes.Compare(buf, TEST_BLOCK_2) != 0 { - t.Errorf("Get #2: expected %s, got %s", string(TEST_BLOCK_2), string(buf)) + + p := fmt.Sprintf("%s/%s/%s", v.Root, TestHash[:3], TestHash) + os.Chmod(p, 000) + err = v.Compare(context.Background(), TestHash, TestBlock) + if err == nil || strings.Index(err.Error(), "permission denied") < 0 { + t.Errorf("Got err %q, expected %q", err, "permission denied") } +} - buf, err = v.Get(TEST_HASH_3) +func TestUnixVolumeContextCancelPut(t *testing.T) { + v := NewTestableUnixVolume(t, true, false) + defer v.Teardown() + v.locker.Lock() + ctx, cancel := context.WithCancel(context.Background()) + go func() { + time.Sleep(50 * time.Millisecond) + cancel() + time.Sleep(50 * time.Millisecond) + v.locker.Unlock() + }() + err := v.Put(ctx, TestHash, TestBlock) + if err != context.Canceled { + t.Errorf("Put() returned %s -- expected short read / canceled", err) + } +} + +func TestUnixVolumeContextCancelGet(t *testing.T) { + v := NewTestableUnixVolume(t, false, false) + defer v.Teardown() + bpath := v.blockPath(TestHash) + v.PutRaw(TestHash, TestBlock) + os.Remove(bpath) + err := syscall.Mkfifo(bpath, 0600) if err != nil { - t.Errorf("Get #3: %v", err) + t.Fatalf("Mkfifo %s: %s", bpath, err) } - if bytes.Compare(buf, TEST_BLOCK_3) != 0 { - t.Errorf("Get #3: expected %s, got %s", string(TEST_BLOCK_3), string(buf)) + defer os.Remove(bpath) + ctx, cancel := context.WithCancel(context.Background()) + go func() { + time.Sleep(50 * time.Millisecond) + cancel() + }() + buf := make([]byte, len(TestBlock)) + n, err := v.Get(ctx, TestHash, buf) + if n == len(TestBlock) || err != context.Canceled { + t.Errorf("Get() returned %d, %s -- expected short read / canceled", n, err) } } -func TestIsFull(t *testing.T) { - v := TempUnixVolume(t, false) - defer _teardown(v) +var _ = check.Suite(&UnixVolumeSuite{}) - full_path := v.root + "/full" - now := fmt.Sprintf("%d", time.Now().Unix()) - os.Symlink(now, full_path) - if !v.IsFull() { - t.Errorf("%s: claims not to be full", v) +type UnixVolumeSuite struct { + volume *TestableUnixVolume +} + +func (s *UnixVolumeSuite) TearDownTest(c *check.C) { + if s.volume != nil { + s.volume.Teardown() } - os.Remove(full_path) +} - // Test with an expired /full link. - expired := fmt.Sprintf("%d", time.Now().Unix()-3605) - os.Symlink(expired, full_path) - if v.IsFull() { - t.Errorf("%s: should no longer be full", v) +func (s *UnixVolumeSuite) TestStats(c *check.C) { + s.volume = NewTestableUnixVolume(c, false, false) + stats := func() string { + buf, err := json.Marshal(s.volume.InternalStats()) + c.Check(err, check.IsNil) + return string(buf) } + + c.Check(stats(), check.Matches, `.*"StatOps":0,.*`) + c.Check(stats(), check.Matches, `.*"Errors":0,.*`) + + loc := "acbd18db4cc2f85cedef654fccc4a4d8" + _, err := s.volume.Get(context.Background(), loc, make([]byte, 3)) + c.Check(err, check.NotNil) + c.Check(stats(), check.Matches, `.*"StatOps":[^0],.*`) + c.Check(stats(), check.Matches, `.*"Errors":[^0],.*`) + c.Check(stats(), check.Matches, `.*"\*os\.PathError":[^0].*`) + c.Check(stats(), check.Matches, `.*"InBytes":0,.*`) + c.Check(stats(), check.Matches, `.*"OpenOps":0,.*`) + c.Check(stats(), check.Matches, `.*"CreateOps":0,.*`) + + err = s.volume.Put(context.Background(), loc, []byte("foo")) + c.Check(err, check.IsNil) + c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`) + c.Check(stats(), check.Matches, `.*"CreateOps":1,.*`) + c.Check(stats(), check.Matches, `.*"OpenOps":0,.*`) + c.Check(stats(), check.Matches, `.*"UtimesOps":0,.*`) + + err = s.volume.Touch(loc) + c.Check(err, check.IsNil) + c.Check(stats(), check.Matches, `.*"FlockOps":1,.*`) + c.Check(stats(), check.Matches, `.*"OpenOps":1,.*`) + c.Check(stats(), check.Matches, `.*"UtimesOps":1,.*`) + + _, err = s.volume.Get(context.Background(), loc, make([]byte, 3)) + c.Check(err, check.IsNil) + err = s.volume.Compare(context.Background(), loc, []byte("foo")) + c.Check(err, check.IsNil) + c.Check(stats(), check.Matches, `.*"InBytes":6,.*`) + c.Check(stats(), check.Matches, `.*"OpenOps":3,.*`) + + err = s.volume.Trash(loc) + c.Check(err, check.IsNil) + c.Check(stats(), check.Matches, `.*"FlockOps":2,.*`) +} + +func (s *UnixVolumeSuite) TestConfig(c *check.C) { + var cfg Config + err := yaml.Unmarshal([]byte(` +Volumes: + - Type: Directory + StorageClasses: ["class_a", "class_b"] +`), &cfg) + + c.Check(err, check.IsNil) + c.Check(cfg.Volumes[0].GetStorageClasses(), check.DeepEquals, []string{"class_a", "class_b"}) }