X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/f04693da1811e670d4cbb981debeecf14d79137c..HEAD:/services/keepstore/unix_volume_test.go diff --git a/services/keepstore/unix_volume_test.go b/services/keepstore/unix_volume_test.go index 1ffc46513c..bcdb5f6358 100644 --- a/services/keepstore/unix_volume_test.go +++ b/services/keepstore/unix_volume_test.go @@ -2,96 +2,88 @@ // // SPDX-License-Identifier: AGPL-3.0 -package main +package keepstore import ( "bytes" "context" "encoding/json" - "errors" "fmt" - "io" "io/ioutil" "os" - "strings" "sync" "syscall" "time" - "git.curoverse.com/arvados.git/sdk/go/arvados" + "git.arvados.org/arvados.git/sdk/go/ctxlog" "github.com/prometheus/client_golang/prometheus" - "github.com/sirupsen/logrus" check "gopkg.in/check.v1" ) -type TestableUnixVolume struct { - UnixVolume +type testableUnixVolume struct { + unixVolume t TB } -// PutRaw writes a Keep block directly into a UnixVolume, even if -// the volume is readonly. -func (v *TestableUnixVolume) PutRaw(locator string, data []byte) { - defer func(orig bool) { - v.volume.ReadOnly = orig - }(v.volume.ReadOnly) - v.volume.ReadOnly = false - err := v.Put(context.Background(), locator, data) +func (v *testableUnixVolume) TouchWithDate(locator string, lastPut time.Time) { + err := syscall.Utime(v.blockPath(locator), &syscall.Utimbuf{Actime: lastPut.Unix(), Modtime: lastPut.Unix()}) if err != nil { v.t.Fatal(err) } } -func (v *TestableUnixVolume) TouchWithDate(locator string, lastPut time.Time) { - err := syscall.Utime(v.blockPath(locator), &syscall.Utimbuf{lastPut.Unix(), lastPut.Unix()}) - if err != nil { - v.t.Fatal(err) - } -} - -func (v *TestableUnixVolume) Teardown() { +func (v *testableUnixVolume) Teardown() { if err := os.RemoveAll(v.Root); err != nil { v.t.Error(err) } } -func (v *TestableUnixVolume) ReadWriteOperationLabelValues() (r, w string) { +func (v *testableUnixVolume) ReadWriteOperationLabelValues() (r, w string) { return "open", "create" } -var _ = check.Suite(&UnixVolumeSuite{}) +var _ = check.Suite(&unixVolumeSuite{}) -type UnixVolumeSuite struct { - cluster *arvados.Cluster - volumes []*TestableUnixVolume - metrics *volumeMetricsVecs +type unixVolumeSuite struct { + params newVolumeParams + volumes []*testableUnixVolume } -func (s *UnixVolumeSuite) SetUpTest(c *check.C) { - s.cluster = testCluster(c) - s.metrics = newVolumeMetricsVecs(prometheus.NewRegistry()) +func (s *unixVolumeSuite) SetUpTest(c *check.C) { + logger := ctxlog.TestLogger(c) + reg := prometheus.NewRegistry() + s.params = newVolumeParams{ + UUID: "zzzzz-nyw5e-999999999999999", + Cluster: testCluster(c), + Logger: logger, + MetricsVecs: newVolumeMetricsVecs(reg), + BufferPool: newBufferPool(logger, 8, reg), + } } -func (s *UnixVolumeSuite) TearDownTest(c *check.C) { +func (s *unixVolumeSuite) TearDownTest(c *check.C) { for _, v := range s.volumes { v.Teardown() } } -func (s *UnixVolumeSuite) newTestableUnixVolume(c *check.C, cluster *arvados.Cluster, volume arvados.Volume, metrics *volumeMetricsVecs, serialize bool) *TestableUnixVolume { +func (s *unixVolumeSuite) newTestableUnixVolume(c *check.C, params newVolumeParams, serialize bool) *testableUnixVolume { d, err := ioutil.TempDir("", "volume_test") c.Check(err, check.IsNil) var locker sync.Locker if serialize { locker = &sync.Mutex{} } - v := &TestableUnixVolume{ - UnixVolume: UnixVolume{ - Root: d, - locker: locker, - cluster: cluster, - volume: volume, - metrics: metrics, + v := &testableUnixVolume{ + unixVolume: unixVolume{ + Root: d, + locker: locker, + uuid: params.UUID, + cluster: params.Cluster, + logger: params.Logger, + volume: params.ConfigVolume, + metrics: params.MetricsVecs, + bufferPool: params.BufferPool, }, t: c, } @@ -100,56 +92,45 @@ func (s *UnixVolumeSuite) newTestableUnixVolume(c *check.C, cluster *arvados.Clu return v } -// serialize = false; readonly = false -func (s *UnixVolumeSuite) TestUnixVolumeWithGenericTests(c *check.C) { - DoGenericVolumeTests(c, false, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume { - return s.newTestableUnixVolume(c, cluster, volume, metrics, false) +func (s *unixVolumeSuite) TestUnixVolumeWithGenericTests(c *check.C) { + DoGenericVolumeTests(c, false, func(t TB, params newVolumeParams) TestableVolume { + return s.newTestableUnixVolume(c, params, false) }) } -// serialize = false; readonly = true -func (s *UnixVolumeSuite) TestUnixVolumeWithGenericTestsReadOnly(c *check.C) { - DoGenericVolumeTests(c, true, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume { - return s.newTestableUnixVolume(c, cluster, volume, metrics, true) +func (s *unixVolumeSuite) TestUnixVolumeWithGenericTests_ReadOnly(c *check.C) { + DoGenericVolumeTests(c, true, func(t TB, params newVolumeParams) TestableVolume { + return s.newTestableUnixVolume(c, params, false) }) } -// serialize = true; readonly = false -func (s *UnixVolumeSuite) TestUnixVolumeWithGenericTestsSerialized(c *check.C) { - DoGenericVolumeTests(c, false, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume { - return s.newTestableUnixVolume(c, cluster, volume, metrics, false) +func (s *unixVolumeSuite) TestUnixVolumeWithGenericTests_Serialized(c *check.C) { + DoGenericVolumeTests(c, false, func(t TB, params newVolumeParams) TestableVolume { + return s.newTestableUnixVolume(c, params, true) }) } -// serialize = true; readonly = true -func (s *UnixVolumeSuite) TestUnixVolumeHandlersWithGenericVolumeTests(c *check.C) { - DoGenericVolumeTests(c, true, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume { - return s.newTestableUnixVolume(c, cluster, volume, metrics, true) +func (s *unixVolumeSuite) TestUnixVolumeWithGenericTests_Readonly_Serialized(c *check.C) { + DoGenericVolumeTests(c, true, func(t TB, params newVolumeParams) TestableVolume { + return s.newTestableUnixVolume(c, params, true) }) } -func (s *UnixVolumeSuite) TestGetNotFound(c *check.C) { - v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false) +func (s *unixVolumeSuite) TestGetNotFound(c *check.C) { + v := s.newTestableUnixVolume(c, s.params, true) defer v.Teardown() - v.Put(context.Background(), TestHash, TestBlock) - - buf := make([]byte, BlockSize) - n, err := v.Get(context.Background(), TestHash2, buf) - switch { - case os.IsNotExist(err): - break - case err == nil: - c.Errorf("Read should have failed, returned %+q", buf[:n]) - default: - c.Errorf("Read expected ErrNotExist, got: %s", err) - } + v.BlockWrite(context.Background(), TestHash, TestBlock) + + buf := &brbuffer{} + err := v.BlockRead(context.Background(), TestHash2, buf) + c.Check(err, check.FitsTypeOf, os.ErrNotExist) } -func (s *UnixVolumeSuite) TestPut(c *check.C) { - v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false) +func (s *unixVolumeSuite) TestPut(c *check.C) { + v := s.newTestableUnixVolume(c, s.params, false) defer v.Teardown() - err := v.Put(context.Background(), TestHash, TestBlock) + err := v.BlockWrite(context.Background(), TestHash, TestBlock) if err != nil { c.Error(err) } @@ -162,262 +143,137 @@ func (s *UnixVolumeSuite) TestPut(c *check.C) { } } -func (s *UnixVolumeSuite) TestPutBadVolume(c *check.C) { - v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false) - defer v.Teardown() - - os.Chmod(v.Root, 000) - err := v.Put(context.Background(), TestHash, TestBlock) - if err == nil { - c.Error("Write should have failed") - } -} - -func (s *UnixVolumeSuite) TestUnixVolumeReadonly(c *check.C) { - v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{ReadOnly: true, Replication: 1}, s.metrics, false) +func (s *unixVolumeSuite) TestPutBadVolume(c *check.C) { + v := s.newTestableUnixVolume(c, s.params, false) defer v.Teardown() - v.PutRaw(TestHash, TestBlock) - - buf := make([]byte, BlockSize) - _, err := v.Get(context.Background(), TestHash, buf) - if err != nil { - c.Errorf("got err %v, expected nil", err) - } - - err = v.Put(context.Background(), TestHash, TestBlock) - if err != MethodDisabledError { - c.Errorf("got err %v, expected MethodDisabledError", err) - } - - err = v.Touch(TestHash) - if err != MethodDisabledError { - c.Errorf("got err %v, expected MethodDisabledError", err) - } - - err = v.Trash(TestHash) - if err != MethodDisabledError { - c.Errorf("got err %v, expected MethodDisabledError", err) - } + err := os.RemoveAll(v.Root) + c.Assert(err, check.IsNil) + err = v.BlockWrite(context.Background(), TestHash, TestBlock) + c.Check(err, check.IsNil) } -func (s *UnixVolumeSuite) TestIsFull(c *check.C) { - v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false) +func (s *unixVolumeSuite) TestIsFull(c *check.C) { + v := s.newTestableUnixVolume(c, s.params, false) defer v.Teardown() fullPath := v.Root + "/full" now := fmt.Sprintf("%d", time.Now().Unix()) os.Symlink(now, fullPath) - if !v.IsFull() { - c.Errorf("%s: claims not to be full", v) + if !v.isFull() { + c.Error("volume claims not to be full") } os.Remove(fullPath) // Test with an expired /full link. expired := fmt.Sprintf("%d", time.Now().Unix()-3605) os.Symlink(expired, fullPath) - if v.IsFull() { - c.Errorf("%s: should no longer be full", v) + if v.isFull() { + c.Error("volume should no longer be full") } } -func (s *UnixVolumeSuite) TestNodeStatus(c *check.C) { - v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false) - defer v.Teardown() - - // Get node status and make a basic sanity check. - volinfo := v.Status() - if volinfo.MountPoint != v.Root { - c.Errorf("GetNodeStatus mount_point %s, expected %s", volinfo.MountPoint, v.Root) - } - if volinfo.DeviceNum == 0 { - c.Errorf("uninitialized device_num in %v", volinfo) - } - if volinfo.BytesFree == 0 { - c.Errorf("uninitialized bytes_free in %v", volinfo) - } - if volinfo.BytesUsed == 0 { - c.Errorf("uninitialized bytes_used in %v", volinfo) - } -} - -func (s *UnixVolumeSuite) TestUnixVolumeGetFuncWorkerError(c *check.C) { - v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false) - defer v.Teardown() - - v.Put(context.Background(), TestHash, TestBlock) - mockErr := errors.New("Mock error") - err := v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error { - return mockErr - }) - if err != mockErr { - c.Errorf("Got %v, expected %v", err, mockErr) - } -} - -func (s *UnixVolumeSuite) TestUnixVolumeGetFuncFileError(c *check.C) { - v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false) - defer v.Teardown() - - funcCalled := false - err := v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error { - funcCalled = true - return nil - }) - if err == nil { - c.Errorf("Expected error opening non-existent file") - } - if funcCalled { - c.Errorf("Worker func should not have been called") - } -} - -func (s *UnixVolumeSuite) TestUnixVolumeGetFuncWorkerWaitsOnMutex(c *check.C) { - v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false) - defer v.Teardown() - - v.Put(context.Background(), TestHash, TestBlock) - - mtx := NewMockMutex() - v.locker = mtx - - funcCalled := make(chan struct{}) - go v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error { - funcCalled <- struct{}{} - return nil - }) - select { - case mtx.AllowLock <- struct{}{}: - case <-funcCalled: - c.Fatal("Function was called before mutex was acquired") - case <-time.After(5 * time.Second): - c.Fatal("Timed out before mutex was acquired") - } - select { - case <-funcCalled: - case mtx.AllowUnlock <- struct{}{}: - c.Fatal("Mutex was released before function was called") - case <-time.After(5 * time.Second): - c.Fatal("Timed out waiting for funcCalled") - } - select { - case mtx.AllowUnlock <- struct{}{}: - case <-time.After(5 * time.Second): - c.Fatal("Timed out waiting for getFunc() to release mutex") - } -} - -func (s *UnixVolumeSuite) TestUnixVolumeCompare(c *check.C) { - v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false) - defer v.Teardown() - - v.Put(context.Background(), TestHash, TestBlock) - err := v.Compare(context.Background(), TestHash, TestBlock) - if err != nil { - c.Errorf("Got err %q, expected nil", err) - } - - err = v.Compare(context.Background(), TestHash, []byte("baddata")) - if err != CollisionError { - c.Errorf("Got err %q, expected %q", err, CollisionError) - } - - v.Put(context.Background(), TestHash, []byte("baddata")) - err = v.Compare(context.Background(), TestHash, TestBlock) - if err != DiskHashError { - c.Errorf("Got err %q, expected %q", err, DiskHashError) - } - - p := fmt.Sprintf("%s/%s/%s", v.Root, TestHash[:3], TestHash) - os.Chmod(p, 000) - err = v.Compare(context.Background(), TestHash, TestBlock) - if err == nil || strings.Index(err.Error(), "permission denied") < 0 { - c.Errorf("Got err %q, expected %q", err, "permission denied") - } -} - -func (s *UnixVolumeSuite) TestUnixVolumeContextCancelPut(c *check.C) { - v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, true) +func (s *unixVolumeSuite) TestUnixVolumeContextCancelBlockWrite(c *check.C) { + v := s.newTestableUnixVolume(c, s.params, true) defer v.Teardown() v.locker.Lock() + defer v.locker.Unlock() ctx, cancel := context.WithCancel(context.Background()) go func() { time.Sleep(50 * time.Millisecond) cancel() - time.Sleep(50 * time.Millisecond) - v.locker.Unlock() }() - err := v.Put(ctx, TestHash, TestBlock) + err := v.BlockWrite(ctx, TestHash, TestBlock) if err != context.Canceled { - c.Errorf("Put() returned %s -- expected short read / canceled", err) + c.Errorf("BlockWrite() returned %s -- expected short read / canceled", err) } } -func (s *UnixVolumeSuite) TestUnixVolumeContextCancelGet(c *check.C) { - v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false) +func (s *unixVolumeSuite) TestUnixVolumeContextCancelBlockRead(c *check.C) { + v := s.newTestableUnixVolume(c, s.params, true) defer v.Teardown() - bpath := v.blockPath(TestHash) - v.PutRaw(TestHash, TestBlock) - os.Remove(bpath) - err := syscall.Mkfifo(bpath, 0600) + err := v.BlockWrite(context.Background(), TestHash, TestBlock) if err != nil { - c.Fatalf("Mkfifo %s: %s", bpath, err) + c.Fatal(err) } - defer os.Remove(bpath) ctx, cancel := context.WithCancel(context.Background()) + v.locker.Lock() + defer v.locker.Unlock() go func() { time.Sleep(50 * time.Millisecond) cancel() }() - buf := make([]byte, len(TestBlock)) - n, err := v.Get(ctx, TestHash, buf) - if n == len(TestBlock) || err != context.Canceled { - c.Errorf("Get() returned %d, %s -- expected short read / canceled", n, err) + buf := &brbuffer{} + err = v.BlockRead(ctx, TestHash, buf) + if buf.Len() != 0 || err != context.Canceled { + c.Errorf("BlockRead() returned %q, %s -- expected short read / canceled", buf.String(), err) } } -func (s *UnixVolumeSuite) TestStats(c *check.C) { - vol := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false) +func (s *unixVolumeSuite) TestStats(c *check.C) { + vol := s.newTestableUnixVolume(c, s.params, false) stats := func() string { buf, err := json.Marshal(vol.InternalStats()) c.Check(err, check.IsNil) return string(buf) } - c.Check(stats(), check.Matches, `.*"StatOps":1,.*`) // (*UnixVolume)check() calls Stat() once + c.Check(stats(), check.Matches, `.*"StatOps":1,.*`) // (*unixVolume)check() calls Stat() once c.Check(stats(), check.Matches, `.*"Errors":0,.*`) - loc := "acbd18db4cc2f85cedef654fccc4a4d8" - _, err := vol.Get(context.Background(), loc, make([]byte, 3)) + err := vol.BlockRead(context.Background(), fooHash, brdiscard) c.Check(err, check.NotNil) c.Check(stats(), check.Matches, `.*"StatOps":[^0],.*`) c.Check(stats(), check.Matches, `.*"Errors":[^0],.*`) - c.Check(stats(), check.Matches, `.*"\*os\.PathError":[^0].*`) + c.Check(stats(), check.Matches, `.*"\*(fs|os)\.PathError":[^0].*`) // os.PathError changed to fs.PathError in Go 1.16 c.Check(stats(), check.Matches, `.*"InBytes":0,.*`) c.Check(stats(), check.Matches, `.*"OpenOps":0,.*`) c.Check(stats(), check.Matches, `.*"CreateOps":0,.*`) - err = vol.Put(context.Background(), loc, []byte("foo")) + err = vol.BlockWrite(context.Background(), fooHash, []byte("foo")) c.Check(err, check.IsNil) c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`) c.Check(stats(), check.Matches, `.*"CreateOps":1,.*`) c.Check(stats(), check.Matches, `.*"OpenOps":0,.*`) - c.Check(stats(), check.Matches, `.*"UtimesOps":0,.*`) + c.Check(stats(), check.Matches, `.*"UtimesOps":1,.*`) - err = vol.Touch(loc) + err = vol.BlockTouch(fooHash) c.Check(err, check.IsNil) c.Check(stats(), check.Matches, `.*"FlockOps":1,.*`) c.Check(stats(), check.Matches, `.*"OpenOps":1,.*`) - c.Check(stats(), check.Matches, `.*"UtimesOps":1,.*`) + c.Check(stats(), check.Matches, `.*"UtimesOps":2,.*`) - _, err = vol.Get(context.Background(), loc, make([]byte, 3)) + buf := &brbuffer{} + err = vol.BlockRead(context.Background(), fooHash, buf) c.Check(err, check.IsNil) - err = vol.Compare(context.Background(), loc, []byte("foo")) - c.Check(err, check.IsNil) - c.Check(stats(), check.Matches, `.*"InBytes":6,.*`) - c.Check(stats(), check.Matches, `.*"OpenOps":3,.*`) + c.Check(buf.String(), check.Equals, "foo") + c.Check(stats(), check.Matches, `.*"InBytes":3,.*`) + c.Check(stats(), check.Matches, `.*"OpenOps":2,.*`) - err = vol.Trash(loc) + err = vol.BlockTrash(fooHash) c.Check(err, check.IsNil) c.Check(stats(), check.Matches, `.*"FlockOps":2,.*`) } + +func (s *unixVolumeSuite) TestSkipUnusedDirs(c *check.C) { + vol := s.newTestableUnixVolume(c, s.params, false) + + err := os.Mkdir(vol.unixVolume.Root+"/aaa", 0777) + c.Assert(err, check.IsNil) + err = os.Mkdir(vol.unixVolume.Root+"/.aaa", 0777) // EmptyTrash should not look here + c.Assert(err, check.IsNil) + deleteme := vol.unixVolume.Root + "/aaa/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa.trash.1" + err = ioutil.WriteFile(deleteme, []byte{1, 2, 3}, 0777) + c.Assert(err, check.IsNil) + skipme := vol.unixVolume.Root + "/.aaa/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa.trash.1" + err = ioutil.WriteFile(skipme, []byte{1, 2, 3}, 0777) + c.Assert(err, check.IsNil) + vol.EmptyTrash() + + _, err = os.Stat(skipme) + c.Check(err, check.IsNil) + + _, err = os.Stat(deleteme) + c.Check(err, check.NotNil) + c.Check(os.IsNotExist(err), check.Equals, true) +}