//
// SPDX-License-Identifier: AGPL-3.0
-package main
+package keepstore
import (
"bytes"
"io"
"io/ioutil"
"os"
- "strings"
"sync"
"syscall"
- "testing"
"time"
- "github.com/ghodss/yaml"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
"github.com/prometheus/client_golang/prometheus"
+ "github.com/sirupsen/logrus"
check "gopkg.in/check.v1"
)
t TB
}
-func NewTestableUnixVolume(t TB, serialize bool, readonly bool) *TestableUnixVolume {
- d, err := ioutil.TempDir("", "volume_test")
- if err != nil {
- t.Fatal(err)
- }
- var locker sync.Locker
- if serialize {
- locker = &sync.Mutex{}
- }
- return &TestableUnixVolume{
- UnixVolume: UnixVolume{
- Root: d,
- ReadOnly: readonly,
- locker: locker,
- },
- t: t,
- }
-}
-
// PutRaw writes a Keep block directly into a UnixVolume, even if
// the volume is readonly.
func (v *TestableUnixVolume) PutRaw(locator string, data []byte) {
defer func(orig bool) {
- v.ReadOnly = orig
- }(v.ReadOnly)
- v.ReadOnly = false
+ v.volume.ReadOnly = orig
+ }(v.volume.ReadOnly)
+ v.volume.ReadOnly = false
err := v.Put(context.Background(), locator, data)
if err != nil {
v.t.Fatal(err)
+// Teardown removes the volume's root directory and everything
+// under it.
func (v *TestableUnixVolume) Teardown() {
if err := os.RemoveAll(v.Root); err != nil {
- v.t.Fatal(err)
+ // A cleanup failure is reported with Error (previously Fatal).
+ v.t.Error(err)
+ }
+}
+
+// ReadWriteOperationLabelValues reports the operation label values
+// for this volume type: "open" for reads and "create" for writes.
+func (v *TestableUnixVolume) ReadWriteOperationLabelValues() (r, w string) {
+ r = "open"
+ w = "create"
+ return r, w
+}
+
+var _ = check.Suite(&UnixVolumeSuite{})
+
+// UnixVolumeSuite is the gocheck suite holding the UnixVolume tests.
+type UnixVolumeSuite struct {
+ // cluster is assigned from testCluster in SetUpTest.
+ cluster *arvados.Cluster
+ // volumes collects every volume returned by
+ // newTestableUnixVolume; TearDownTest calls Teardown on each.
+ volumes []*TestableUnixVolume
+ // metrics is rebuilt on a new prometheus registry in SetUpTest.
+ metrics *volumeMetricsVecs
+}
+
+// SetUpTest runs before each test: it loads the cluster config via
+// testCluster and builds volume metrics on a new prometheus registry.
+func (s *UnixVolumeSuite) SetUpTest(c *check.C) {
+ s.cluster = testCluster(c)
+ reg := prometheus.NewRegistry()
+ s.metrics = newVolumeMetricsVecs(reg)
+}
+
+// TearDownTest runs after each test and tears down every volume
+// that was created through newTestableUnixVolume during that test.
+func (s *UnixVolumeSuite) TearDownTest(c *check.C) {
+ for _, v := range s.volumes {
+ v.Teardown()
+ }
+ // A single suite value is registered with check.Suite, so the
+ // slice would otherwise keep growing and every later test would
+ // tear down all earlier (already removed) volumes again.
+ s.volumes = nil
+}
+
+// newTestableUnixVolume builds a TestableUnixVolume rooted in a new
+// temporary directory and registers it in s.volumes so TearDownTest
+// removes it. When serialize is true the volume gets a sync.Mutex as
+// its locker; otherwise the locker is left nil. The test is aborted
+// if the directory cannot be created or the volume fails check().
+func (s *UnixVolumeSuite) newTestableUnixVolume(c *check.C, cluster *arvados.Cluster, volume arvados.Volume, metrics *volumeMetricsVecs, serialize bool) *TestableUnixVolume {
+ d, err := ioutil.TempDir("", "volume_test")
+ // Stop here on failure: continuing would build a volume whose
+ // Root is the empty string.
+ c.Assert(err, check.IsNil)
+ var locker sync.Locker
+ if serialize {
+ locker = &sync.Mutex{}
+ }
+ v := &TestableUnixVolume{
+ UnixVolume: UnixVolume{
+ Root: d,
+ locker: locker,
+ cluster: cluster,
+ logger: ctxlog.TestLogger(c),
+ volume: volume,
+ metrics: metrics,
+ },
+ t: c,
 }
+ // Register before validating so the temporary directory is
+ // still cleaned up when check() fails and the test aborts.
+ s.volumes = append(s.volumes, v)
+ c.Assert(v.check(), check.IsNil)
+ return v
}
// serialize = false; readonly = false
-func TestUnixVolumeWithGenericTests(t *testing.T) {
- DoGenericVolumeTests(t, func(t TB) TestableVolume {
- return NewTestableUnixVolume(t, false, false)
+func (s *UnixVolumeSuite) TestUnixVolumeWithGenericTests(c *check.C) {
+ DoGenericVolumeTests(c, false, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
+ // The trailing false is newTestableUnixVolume's serialize argument.
+ return s.newTestableUnixVolume(c, cluster, volume, metrics, false)
 })
}
// serialize = false; readonly = true
-func TestUnixVolumeWithGenericTestsReadOnly(t *testing.T) {
- DoGenericVolumeTests(t, func(t TB) TestableVolume {
- return NewTestableUnixVolume(t, false, true)
+func (s *UnixVolumeSuite) TestUnixVolumeWithGenericTestsReadOnly(c *check.C) {
+ // The second argument to DoGenericVolumeTests is presumably the
+ // readonly flag (it tracks "readonly" in all four variants).
+ DoGenericVolumeTests(c, true, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
+ // The last argument of newTestableUnixVolume is serialize,
+ // which must stay false for this variant.
+ return s.newTestableUnixVolume(c, cluster, volume, metrics, false)
 })
}
// serialize = true; readonly = false
-func TestUnixVolumeWithGenericTestsSerialized(t *testing.T) {
- DoGenericVolumeTests(t, func(t TB) TestableVolume {
- return NewTestableUnixVolume(t, true, false)
+func (s *UnixVolumeSuite) TestUnixVolumeWithGenericTestsSerialized(c *check.C) {
+ DoGenericVolumeTests(c, false, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
+ // The last argument of newTestableUnixVolume is serialize; it
+ // must be true here so this variant runs with a locker, as
+ // the removed version did.
+ return s.newTestableUnixVolume(c, cluster, volume, metrics, true)
 })
}
-// serialize = false; readonly = false
-func TestUnixVolumeHandlersWithGenericVolumeTests(t *testing.T) {
- DoHandlersWithGenericVolumeTests(t, func(t TB) (*RRVolumeManager, []TestableVolume) {
- vols := make([]Volume, 2)
- testableUnixVols := make([]TestableVolume, 2)
-
- for i := range vols {
- v := NewTestableUnixVolume(t, false, false)
- vols[i] = v
- testableUnixVols[i] = v
- }
-
- return MakeRRVolumeManager(vols), testableUnixVols
+// serialize = true; readonly = true
+//
+// Despite the "Handlers" in its name, this test now runs
+// DoGenericVolumeTests; the DoHandlersWithGenericVolumeTests call
+// was removed.
+func (s *UnixVolumeSuite) TestUnixVolumeHandlersWithGenericVolumeTests(c *check.C) {
+ DoGenericVolumeTests(c, true, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
+ return s.newTestableUnixVolume(c, cluster, volume, metrics, true)
 })
}
-func TestReplicationDefault1(t *testing.T) {
- v := &UnixVolume{
- Root: "/",
- ReadOnly: true,
- }
- metrics := newVolumeMetricsVecs(prometheus.NewRegistry())
- if err := v.Start(metrics); err != nil {
- t.Error(err)
- }
- if got := v.Replication(); got != 1 {
- t.Errorf("Replication() returned %d, expected 1 if no config given", got)
- }
-}
-
-func TestGetNotFound(t *testing.T) {
- v := NewTestableUnixVolume(t, false, false)
+func (s *UnixVolumeSuite) TestGetNotFound(c *check.C) {
+ v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
defer v.Teardown()
v.Put(context.Background(), TestHash, TestBlock)
case os.IsNotExist(err):
break
case err == nil:
- t.Errorf("Read should have failed, returned %+q", buf[:n])
+ c.Errorf("Read should have failed, returned %+q", buf[:n])
default:
- t.Errorf("Read expected ErrNotExist, got: %s", err)
+ c.Errorf("Read expected ErrNotExist, got: %s", err)
}
}
-func TestPut(t *testing.T) {
- v := NewTestableUnixVolume(t, false, false)
+// TestPut stores TestBlock with Put and then reads the block file
+// directly from disk to confirm its contents match.
+func (s *UnixVolumeSuite) TestPut(c *check.C) {
+ v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
defer v.Teardown()
err := v.Put(context.Background(), TestHash, TestBlock)
if err != nil {
- t.Error(err)
+ c.Error(err)
 }
p := fmt.Sprintf("%s/%s/%s", v.Root, TestHash[:3], TestHash)
if buf, err := ioutil.ReadFile(p); err != nil {
- t.Error(err)
+ c.Error(err)
} else if bytes.Compare(buf, TestBlock) != 0 {
- t.Errorf("Write should have stored %s, did store %s",
+ c.Errorf("Write should have stored %s, did store %s",
string(TestBlock), string(buf))
 }
}
-func TestPutBadVolume(t *testing.T) {
- v := NewTestableUnixVolume(t, false, false)
+// TestPutBadVolume deletes the volume's root directory and then
+// checks that a subsequent Put returns no error.
+//
+// NOTE(review): the removed version made the root unreadable with
+// chmod 000 and expected Put to fail. Presumably Put recreates
+// missing directories -- confirm that success is the intended
+// contract here.
+func (s *UnixVolumeSuite) TestPutBadVolume(c *check.C) {
+ v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
defer v.Teardown()
- os.Chmod(v.Root, 000)
- err := v.Put(context.Background(), TestHash, TestBlock)
- if err == nil {
- t.Error("Write should have failed")
- }
+ err := os.RemoveAll(v.Root)
+ c.Assert(err, check.IsNil)
+ err = v.Put(context.Background(), TestHash, TestBlock)
+ c.Check(err, check.IsNil)
}
-func TestUnixVolumeReadonly(t *testing.T) {
- v := NewTestableUnixVolume(t, false, true)
+// TestUnixVolumeReadonly writes a block with PutRaw to a volume
+// configured with ReadOnly: true, then checks that Get succeeds
+// while Put, Touch and Trash each return MethodDisabledError.
+func (s *UnixVolumeSuite) TestUnixVolumeReadonly(c *check.C) {
+ v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{ReadOnly: true, Replication: 1}, s.metrics, false)
defer v.Teardown()
v.PutRaw(TestHash, TestBlock)
buf := make([]byte, BlockSize)
_, err := v.Get(context.Background(), TestHash, buf)
if err != nil {
- t.Errorf("got err %v, expected nil", err)
+ c.Errorf("got err %v, expected nil", err)
 }
err = v.Put(context.Background(), TestHash, TestBlock)
if err != MethodDisabledError {
- t.Errorf("got err %v, expected MethodDisabledError", err)
+ c.Errorf("got err %v, expected MethodDisabledError", err)
 }
err = v.Touch(TestHash)
if err != MethodDisabledError {
- t.Errorf("got err %v, expected MethodDisabledError", err)
+ c.Errorf("got err %v, expected MethodDisabledError", err)
 }
err = v.Trash(TestHash)
if err != MethodDisabledError {
- t.Errorf("got err %v, expected MethodDisabledError", err)
+ c.Errorf("got err %v, expected MethodDisabledError", err)
 }
}
-func TestIsFull(t *testing.T) {
- v := NewTestableUnixVolume(t, false, false)
+// TestIsFull drives IsFull through a "full" symlink in the volume
+// root whose target is a Unix timestamp: the volume must report
+// full when the timestamp is current, and not full once the
+// timestamp is 3605 seconds in the past.
+func (s *UnixVolumeSuite) TestIsFull(c *check.C) {
+ v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
defer v.Teardown()
fullPath := v.Root + "/full"
now := fmt.Sprintf("%d", time.Now().Unix())
os.Symlink(now, fullPath)
if !v.IsFull() {
- t.Errorf("%s: claims not to be full", v)
+ c.Errorf("%s: claims not to be full", v)
 }
os.Remove(fullPath)
expired := fmt.Sprintf("%d", time.Now().Unix()-3605)
os.Symlink(expired, fullPath)
if v.IsFull() {
- t.Errorf("%s: should no longer be full", v)
+ c.Errorf("%s: should no longer be full", v)
 }
}
-func TestNodeStatus(t *testing.T) {
- v := NewTestableUnixVolume(t, false, false)
+// TestNodeStatus checks that Status reports the volume root as its
+// mount point and non-zero device number, free bytes and used bytes.
+func (s *UnixVolumeSuite) TestNodeStatus(c *check.C) {
+ v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
defer v.Teardown()
// Get node status and make a basic sanity check.
volinfo := v.Status()
if volinfo.MountPoint != v.Root {
- t.Errorf("GetNodeStatus mount_point %s, expected %s", volinfo.MountPoint, v.Root)
+ c.Errorf("GetNodeStatus mount_point %s, expected %s", volinfo.MountPoint, v.Root)
 }
if volinfo.DeviceNum == 0 {
- t.Errorf("uninitialized device_num in %v", volinfo)
+ c.Errorf("uninitialized device_num in %v", volinfo)
 }
if volinfo.BytesFree == 0 {
- t.Errorf("uninitialized bytes_free in %v", volinfo)
+ c.Errorf("uninitialized bytes_free in %v", volinfo)
 }
if volinfo.BytesUsed == 0 {
- t.Errorf("uninitialized bytes_used in %v", volinfo)
+ c.Errorf("uninitialized bytes_used in %v", volinfo)
 }
}
-func TestUnixVolumeGetFuncWorkerError(t *testing.T) {
- v := NewTestableUnixVolume(t, false, false)
+func (s *UnixVolumeSuite) TestUnixVolumeGetFuncWorkerError(c *check.C) {
+ v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
defer v.Teardown()
v.Put(context.Background(), TestHash, TestBlock)
return mockErr
})
if err != mockErr {
- t.Errorf("Got %v, expected %v", err, mockErr)
+ c.Errorf("Got %v, expected %v", err, mockErr)
}
}
-func TestUnixVolumeGetFuncFileError(t *testing.T) {
- v := NewTestableUnixVolume(t, false, false)
+func (s *UnixVolumeSuite) TestUnixVolumeGetFuncFileError(c *check.C) {
+ v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
defer v.Teardown()
funcCalled := false
return nil
})
if err == nil {
- t.Errorf("Expected error opening non-existent file")
+ c.Errorf("Expected error opening non-existent file")
}
if funcCalled {
- t.Errorf("Worker func should not have been called")
+ c.Errorf("Worker func should not have been called")
}
}
-func TestUnixVolumeGetFuncWorkerWaitsOnMutex(t *testing.T) {
- v := NewTestableUnixVolume(t, false, false)
+func (s *UnixVolumeSuite) TestUnixVolumeGetFuncWorkerWaitsOnMutex(c *check.C) {
+ v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
defer v.Teardown()
v.Put(context.Background(), TestHash, TestBlock)
select {
case mtx.AllowLock <- struct{}{}:
case <-funcCalled:
- t.Fatal("Function was called before mutex was acquired")
+ c.Fatal("Function was called before mutex was acquired")
case <-time.After(5 * time.Second):
- t.Fatal("Timed out before mutex was acquired")
+ c.Fatal("Timed out before mutex was acquired")
}
select {
case <-funcCalled:
case mtx.AllowUnlock <- struct{}{}:
- t.Fatal("Mutex was released before function was called")
+ c.Fatal("Mutex was released before function was called")
case <-time.After(5 * time.Second):
- t.Fatal("Timed out waiting for funcCalled")
+ c.Fatal("Timed out waiting for funcCalled")
}
select {
case mtx.AllowUnlock <- struct{}{}:
case <-time.After(5 * time.Second):
- t.Fatal("Timed out waiting for getFunc() to release mutex")
+ c.Fatal("Timed out waiting for getFunc() to release mutex")
}
}
-func TestUnixVolumeCompare(t *testing.T) {
- v := NewTestableUnixVolume(t, false, false)
+// TestUnixVolumeCompare exercises Compare in four situations:
+// matching data (nil), different caller data (CollisionError),
+// on-disk data that does not match the hash (DiskHashError), and an
+// unreadable block file (an error mentioning "permission denied").
+func (s *UnixVolumeSuite) TestUnixVolumeCompare(c *check.C) {
+ v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
defer v.Teardown()
v.Put(context.Background(), TestHash, TestBlock)
err := v.Compare(context.Background(), TestHash, TestBlock)
if err != nil {
- t.Errorf("Got err %q, expected nil", err)
+ c.Errorf("Got err %q, expected nil", err)
 }
err = v.Compare(context.Background(), TestHash, []byte("baddata"))
if err != CollisionError {
- t.Errorf("Got err %q, expected %q", err, CollisionError)
+ c.Errorf("Got err %q, expected %q", err, CollisionError)
 }
v.Put(context.Background(), TestHash, []byte("baddata"))
err = v.Compare(context.Background(), TestHash, TestBlock)
if err != DiskHashError {
- t.Errorf("Got err %q, expected %q", err, DiskHashError)
+ c.Errorf("Got err %q, expected %q", err, DiskHashError)
 }
- p := fmt.Sprintf("%s/%s/%s", v.Root, TestHash[:3], TestHash)
- os.Chmod(p, 000)
- err = v.Compare(context.Background(), TestHash, TestBlock)
- if err == nil || strings.Index(err.Error(), "permission denied") < 0 {
- t.Errorf("Got err %q, expected %q", err, "permission denied")
+ // The chmod-based check is skipped for uid 0.
+ if os.Getuid() == 0 {
+ c.Log("skipping 'permission denied' check when running as root")
+ } else {
+ p := fmt.Sprintf("%s/%s/%s", v.Root, TestHash[:3], TestHash)
+ err = os.Chmod(p, 000)
+ c.Assert(err, check.IsNil)
+ err = v.Compare(context.Background(), TestHash, TestBlock)
+ c.Check(err, check.ErrorMatches, ".*permission denied.*")
 }
}
-func TestUnixVolumeContextCancelPut(t *testing.T) {
- v := NewTestableUnixVolume(t, true, false)
+func (s *UnixVolumeSuite) TestUnixVolumeContextCancelPut(c *check.C) {
+ v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, true)
defer v.Teardown()
v.locker.Lock()
ctx, cancel := context.WithCancel(context.Background())
}()
err := v.Put(ctx, TestHash, TestBlock)
if err != context.Canceled {
- t.Errorf("Put() returned %s -- expected short read / canceled", err)
+ c.Errorf("Put() returned %s -- expected short read / canceled", err)
}
}
-func TestUnixVolumeContextCancelGet(t *testing.T) {
- v := NewTestableUnixVolume(t, false, false)
+func (s *UnixVolumeSuite) TestUnixVolumeContextCancelGet(c *check.C) {
+ v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
defer v.Teardown()
bpath := v.blockPath(TestHash)
v.PutRaw(TestHash, TestBlock)
os.Remove(bpath)
err := syscall.Mkfifo(bpath, 0600)
if err != nil {
- t.Fatalf("Mkfifo %s: %s", bpath, err)
+ c.Fatalf("Mkfifo %s: %s", bpath, err)
}
defer os.Remove(bpath)
ctx, cancel := context.WithCancel(context.Background())
buf := make([]byte, len(TestBlock))
n, err := v.Get(ctx, TestHash, buf)
if n == len(TestBlock) || err != context.Canceled {
- t.Errorf("Get() returned %d, %s -- expected short read / canceled", n, err)
- }
-}
-
-var _ = check.Suite(&UnixVolumeSuite{})
-
-type UnixVolumeSuite struct {
- volume *TestableUnixVolume
-}
-
-func (s *UnixVolumeSuite) TearDownTest(c *check.C) {
- if s.volume != nil {
- s.volume.Teardown()
+ c.Errorf("Get() returned %d, %s -- expected short read / canceled", n, err)
}
}
+// TestStats runs a failed Get, then Put, Touch, Get, Compare and
+// Trash against one block, and after each step matches the
+// JSON-encoded InternalStats against the expected operation and
+// byte counters.
func (s *UnixVolumeSuite) TestStats(c *check.C) {
- s.volume = NewTestableUnixVolume(c, false, false)
+ vol := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
stats := func() string {
- buf, err := json.Marshal(s.volume.InternalStats())
+ buf, err := json.Marshal(vol.InternalStats())
c.Check(err, check.IsNil)
return string(buf)
}
- c.Check(stats(), check.Matches, `.*"StatOps":0,.*`)
+ c.Check(stats(), check.Matches, `.*"StatOps":1,.*`) // (*UnixVolume)check() calls Stat() once
c.Check(stats(), check.Matches, `.*"Errors":0,.*`)
loc := "acbd18db4cc2f85cedef654fccc4a4d8"
- _, err := s.volume.Get(context.Background(), loc, make([]byte, 3))
+ _, err := vol.Get(context.Background(), loc, make([]byte, 3))
c.Check(err, check.NotNil)
c.Check(stats(), check.Matches, `.*"StatOps":[^0],.*`)
c.Check(stats(), check.Matches, `.*"Errors":[^0],.*`)
- c.Check(stats(), check.Matches, `.*"\*os\.PathError":[^0].*`)
+ c.Check(stats(), check.Matches, `.*"\*(fs|os)\.PathError":[^0].*`) // os.PathError changed to fs.PathError in Go 1.16
c.Check(stats(), check.Matches, `.*"InBytes":0,.*`)
c.Check(stats(), check.Matches, `.*"OpenOps":0,.*`)
c.Check(stats(), check.Matches, `.*"CreateOps":0,.*`)
- err = s.volume.Put(context.Background(), loc, []byte("foo"))
+ err = vol.Put(context.Background(), loc, []byte("foo"))
c.Check(err, check.IsNil)
c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
c.Check(stats(), check.Matches, `.*"CreateOps":1,.*`)
c.Check(stats(), check.Matches, `.*"OpenOps":0,.*`)
- c.Check(stats(), check.Matches, `.*"UtimesOps":0,.*`)
+ c.Check(stats(), check.Matches, `.*"UtimesOps":1,.*`)
- err = s.volume.Touch(loc)
+ err = vol.Touch(loc)
c.Check(err, check.IsNil)
c.Check(stats(), check.Matches, `.*"FlockOps":1,.*`)
c.Check(stats(), check.Matches, `.*"OpenOps":1,.*`)
- c.Check(stats(), check.Matches, `.*"UtimesOps":1,.*`)
+ c.Check(stats(), check.Matches, `.*"UtimesOps":2,.*`)
- _, err = s.volume.Get(context.Background(), loc, make([]byte, 3))
+ _, err = vol.Get(context.Background(), loc, make([]byte, 3))
c.Check(err, check.IsNil)
- err = s.volume.Compare(context.Background(), loc, []byte("foo"))
+ err = vol.Compare(context.Background(), loc, []byte("foo"))
c.Check(err, check.IsNil)
c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
c.Check(stats(), check.Matches, `.*"OpenOps":3,.*`)
- err = s.volume.Trash(loc)
+ err = vol.Trash(loc)
c.Check(err, check.IsNil)
c.Check(stats(), check.Matches, `.*"FlockOps":2,.*`)
}
-func (s *UnixVolumeSuite) TestConfig(c *check.C) {
- var cfg Config
- err := yaml.Unmarshal([]byte(`
-Volumes:
- - Type: Directory
- StorageClasses: ["class_a", "class_b"]
-`), &cfg)
-
+// TestSkipUnusedDirs places identical ".trash.1" files in an "aaa"
+// directory and a ".aaa" directory under the volume root, runs
+// EmptyTrash, and checks that only the file under "aaa" was removed.
+func (s *UnixVolumeSuite) TestSkipUnusedDirs(c *check.C) {
+ vol := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
+
+ err := os.Mkdir(vol.UnixVolume.Root+"/aaa", 0777)
+ c.Assert(err, check.IsNil)
+ err = os.Mkdir(vol.UnixVolume.Root+"/.aaa", 0777) // EmptyTrash should not look here
+ c.Assert(err, check.IsNil)
+ deleteme := vol.UnixVolume.Root + "/aaa/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa.trash.1"
+ err = ioutil.WriteFile(deleteme, []byte{1, 2, 3}, 0777)
+ c.Assert(err, check.IsNil)
+ skipme := vol.UnixVolume.Root + "/.aaa/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa.trash.1"
+ err = ioutil.WriteFile(skipme, []byte{1, 2, 3}, 0777)
+ c.Assert(err, check.IsNil)
+ vol.EmptyTrash()
+
+ // The file in the dot-directory must still exist.
+ _, err = os.Stat(skipme)
c.Check(err, check.IsNil)
- c.Check(cfg.Volumes[0].GetStorageClasses(), check.DeepEquals, []string{"class_a", "class_b"})
+
+ // The file in the regular directory must be gone.
+ _, err = os.Stat(deleteme)
+ c.Check(err, check.NotNil)
+ c.Check(os.IsNotExist(err), check.Equals, true)
}