1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
22 "github.com/ghodss/yaml"
23 "github.com/prometheus/client_golang/prometheus"
24 check "gopkg.in/check.v1"
27 type TestableUnixVolume struct {
32 func NewTestableUnixVolume(t TB, serialize bool, readonly bool) *TestableUnixVolume {
33 d, err := ioutil.TempDir("", "volume_test")
37 var locker sync.Locker
39 locker = &sync.Mutex{}
41 return &TestableUnixVolume{
42 UnixVolume: UnixVolume{
51 // PutRaw writes a Keep block directly into a UnixVolume, even if
52 // the volume is readonly.
53 func (v *TestableUnixVolume) PutRaw(locator string, data []byte) {
54 defer func(orig bool) {
58 err := v.Put(context.Background(), locator, data)
64 func (v *TestableUnixVolume) TouchWithDate(locator string, lastPut time.Time) {
65 err := syscall.Utime(v.blockPath(locator), &syscall.Utimbuf{lastPut.Unix(), lastPut.Unix()})
71 func (v *TestableUnixVolume) Teardown() {
72 if err := os.RemoveAll(v.Root); err != nil {
77 func (v *TestableUnixVolume) GetMetricsVecs() (opsCounters, errCounters, ioBytes *prometheus.CounterVec) {
78 opsCounters = v.os.stats.opsCounters
79 errCounters = v.os.stats.errCounters
80 ioBytes = v.os.stats.ioBytes
84 // serialize = false; readonly = false
85 func TestUnixVolumeWithGenericTests(t *testing.T) {
86 DoGenericVolumeTests(t, func(t TB) TestableVolume {
87 return NewTestableUnixVolume(t, false, false)
91 // serialize = false; readonly = true
92 func TestUnixVolumeWithGenericTestsReadOnly(t *testing.T) {
93 DoGenericVolumeTests(t, func(t TB) TestableVolume {
94 return NewTestableUnixVolume(t, false, true)
98 // serialize = true; readonly = false
99 func TestUnixVolumeWithGenericTestsSerialized(t *testing.T) {
100 DoGenericVolumeTests(t, func(t TB) TestableVolume {
101 return NewTestableUnixVolume(t, true, false)
105 // serialize = false; readonly = false
106 func TestUnixVolumeHandlersWithGenericVolumeTests(t *testing.T) {
107 DoHandlersWithGenericVolumeTests(t, func(t TB) (*RRVolumeManager, []TestableVolume) {
108 vols := make([]Volume, 2)
109 testableUnixVols := make([]TestableVolume, 2)
111 for i := range vols {
112 v := NewTestableUnixVolume(t, false, false)
114 testableUnixVols[i] = v
117 return MakeRRVolumeManager(vols), testableUnixVols
121 func TestReplicationDefault1(t *testing.T) {
126 metrics := newVolumeMetricsVecs(prometheus.NewRegistry())
127 if err := v.Start(metrics); err != nil {
130 if got := v.Replication(); got != 1 {
131 t.Errorf("Replication() returned %d, expected 1 if no config given", got)
135 func TestGetNotFound(t *testing.T) {
136 v := NewTestableUnixVolume(t, false, false)
138 v.Put(context.Background(), TestHash, TestBlock)
140 buf := make([]byte, BlockSize)
141 n, err := v.Get(context.Background(), TestHash2, buf)
143 case os.IsNotExist(err):
146 t.Errorf("Read should have failed, returned %+q", buf[:n])
148 t.Errorf("Read expected ErrNotExist, got: %s", err)
152 func TestPut(t *testing.T) {
153 v := NewTestableUnixVolume(t, false, false)
156 err := v.Put(context.Background(), TestHash, TestBlock)
160 p := fmt.Sprintf("%s/%s/%s", v.Root, TestHash[:3], TestHash)
161 if buf, err := ioutil.ReadFile(p); err != nil {
163 } else if bytes.Compare(buf, TestBlock) != 0 {
164 t.Errorf("Write should have stored %s, did store %s",
165 string(TestBlock), string(buf))
169 func TestPutBadVolume(t *testing.T) {
170 v := NewTestableUnixVolume(t, false, false)
173 os.Chmod(v.Root, 000)
174 err := v.Put(context.Background(), TestHash, TestBlock)
176 t.Error("Write should have failed")
180 func TestUnixVolumeReadonly(t *testing.T) {
181 v := NewTestableUnixVolume(t, false, true)
184 v.PutRaw(TestHash, TestBlock)
186 buf := make([]byte, BlockSize)
187 _, err := v.Get(context.Background(), TestHash, buf)
189 t.Errorf("got err %v, expected nil", err)
192 err = v.Put(context.Background(), TestHash, TestBlock)
193 if err != MethodDisabledError {
194 t.Errorf("got err %v, expected MethodDisabledError", err)
197 err = v.Touch(TestHash)
198 if err != MethodDisabledError {
199 t.Errorf("got err %v, expected MethodDisabledError", err)
202 err = v.Trash(TestHash)
203 if err != MethodDisabledError {
204 t.Errorf("got err %v, expected MethodDisabledError", err)
208 func TestIsFull(t *testing.T) {
209 v := NewTestableUnixVolume(t, false, false)
212 fullPath := v.Root + "/full"
213 now := fmt.Sprintf("%d", time.Now().Unix())
214 os.Symlink(now, fullPath)
216 t.Errorf("%s: claims not to be full", v)
220 // Test with an expired /full link.
221 expired := fmt.Sprintf("%d", time.Now().Unix()-3605)
222 os.Symlink(expired, fullPath)
224 t.Errorf("%s: should no longer be full", v)
228 func TestNodeStatus(t *testing.T) {
229 v := NewTestableUnixVolume(t, false, false)
232 // Get node status and make a basic sanity check.
233 volinfo := v.Status()
234 if volinfo.MountPoint != v.Root {
235 t.Errorf("GetNodeStatus mount_point %s, expected %s", volinfo.MountPoint, v.Root)
237 if volinfo.DeviceNum == 0 {
238 t.Errorf("uninitialized device_num in %v", volinfo)
240 if volinfo.BytesFree == 0 {
241 t.Errorf("uninitialized bytes_free in %v", volinfo)
243 if volinfo.BytesUsed == 0 {
244 t.Errorf("uninitialized bytes_used in %v", volinfo)
248 func TestUnixVolumeGetFuncWorkerError(t *testing.T) {
249 v := NewTestableUnixVolume(t, false, false)
252 v.Put(context.Background(), TestHash, TestBlock)
253 mockErr := errors.New("Mock error")
254 err := v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error {
258 t.Errorf("Got %v, expected %v", err, mockErr)
262 func TestUnixVolumeGetFuncFileError(t *testing.T) {
263 v := NewTestableUnixVolume(t, false, false)
267 err := v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error {
272 t.Errorf("Expected error opening non-existent file")
275 t.Errorf("Worker func should not have been called")
279 func TestUnixVolumeGetFuncWorkerWaitsOnMutex(t *testing.T) {
280 v := NewTestableUnixVolume(t, false, false)
283 v.Put(context.Background(), TestHash, TestBlock)
285 mtx := NewMockMutex()
288 funcCalled := make(chan struct{})
289 go v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error {
290 funcCalled <- struct{}{}
294 case mtx.AllowLock <- struct{}{}:
296 t.Fatal("Function was called before mutex was acquired")
297 case <-time.After(5 * time.Second):
298 t.Fatal("Timed out before mutex was acquired")
302 case mtx.AllowUnlock <- struct{}{}:
303 t.Fatal("Mutex was released before function was called")
304 case <-time.After(5 * time.Second):
305 t.Fatal("Timed out waiting for funcCalled")
308 case mtx.AllowUnlock <- struct{}{}:
309 case <-time.After(5 * time.Second):
310 t.Fatal("Timed out waiting for getFunc() to release mutex")
314 func TestUnixVolumeCompare(t *testing.T) {
315 v := NewTestableUnixVolume(t, false, false)
318 v.Put(context.Background(), TestHash, TestBlock)
319 err := v.Compare(context.Background(), TestHash, TestBlock)
321 t.Errorf("Got err %q, expected nil", err)
324 err = v.Compare(context.Background(), TestHash, []byte("baddata"))
325 if err != CollisionError {
326 t.Errorf("Got err %q, expected %q", err, CollisionError)
329 v.Put(context.Background(), TestHash, []byte("baddata"))
330 err = v.Compare(context.Background(), TestHash, TestBlock)
331 if err != DiskHashError {
332 t.Errorf("Got err %q, expected %q", err, DiskHashError)
335 p := fmt.Sprintf("%s/%s/%s", v.Root, TestHash[:3], TestHash)
337 err = v.Compare(context.Background(), TestHash, TestBlock)
338 if err == nil || strings.Index(err.Error(), "permission denied") < 0 {
339 t.Errorf("Got err %q, expected %q", err, "permission denied")
343 func TestUnixVolumeContextCancelPut(t *testing.T) {
344 v := NewTestableUnixVolume(t, true, false)
347 ctx, cancel := context.WithCancel(context.Background())
349 time.Sleep(50 * time.Millisecond)
351 time.Sleep(50 * time.Millisecond)
354 err := v.Put(ctx, TestHash, TestBlock)
355 if err != context.Canceled {
356 t.Errorf("Put() returned %s -- expected short read / canceled", err)
360 func TestUnixVolumeContextCancelGet(t *testing.T) {
361 v := NewTestableUnixVolume(t, false, false)
363 bpath := v.blockPath(TestHash)
364 v.PutRaw(TestHash, TestBlock)
366 err := syscall.Mkfifo(bpath, 0600)
368 t.Fatalf("Mkfifo %s: %s", bpath, err)
370 defer os.Remove(bpath)
371 ctx, cancel := context.WithCancel(context.Background())
373 time.Sleep(50 * time.Millisecond)
376 buf := make([]byte, len(TestBlock))
377 n, err := v.Get(ctx, TestHash, buf)
378 if n == len(TestBlock) || err != context.Canceled {
379 t.Errorf("Get() returned %d, %s -- expected short read / canceled", n, err)
383 var _ = check.Suite(&UnixVolumeSuite{})
385 type UnixVolumeSuite struct {
386 volume *TestableUnixVolume
389 func (s *UnixVolumeSuite) TearDownTest(c *check.C) {
395 func (s *UnixVolumeSuite) TestStats(c *check.C) {
396 s.volume = NewTestableUnixVolume(c, false, false)
397 stats := func() string {
398 buf, err := json.Marshal(s.volume.InternalStats())
399 c.Check(err, check.IsNil)
403 c.Check(stats(), check.Matches, `.*"StatOps":0,.*`)
404 c.Check(stats(), check.Matches, `.*"Errors":0,.*`)
406 loc := "acbd18db4cc2f85cedef654fccc4a4d8"
407 _, err := s.volume.Get(context.Background(), loc, make([]byte, 3))
408 c.Check(err, check.NotNil)
409 c.Check(stats(), check.Matches, `.*"StatOps":[^0],.*`)
410 c.Check(stats(), check.Matches, `.*"Errors":[^0],.*`)
411 c.Check(stats(), check.Matches, `.*"\*os\.PathError":[^0].*`)
412 c.Check(stats(), check.Matches, `.*"InBytes":0,.*`)
413 c.Check(stats(), check.Matches, `.*"OpenOps":0,.*`)
414 c.Check(stats(), check.Matches, `.*"CreateOps":0,.*`)
416 err = s.volume.Put(context.Background(), loc, []byte("foo"))
417 c.Check(err, check.IsNil)
418 c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
419 c.Check(stats(), check.Matches, `.*"CreateOps":1,.*`)
420 c.Check(stats(), check.Matches, `.*"OpenOps":0,.*`)
421 c.Check(stats(), check.Matches, `.*"UtimesOps":0,.*`)
423 err = s.volume.Touch(loc)
424 c.Check(err, check.IsNil)
425 c.Check(stats(), check.Matches, `.*"FlockOps":1,.*`)
426 c.Check(stats(), check.Matches, `.*"OpenOps":1,.*`)
427 c.Check(stats(), check.Matches, `.*"UtimesOps":1,.*`)
429 _, err = s.volume.Get(context.Background(), loc, make([]byte, 3))
430 c.Check(err, check.IsNil)
431 err = s.volume.Compare(context.Background(), loc, []byte("foo"))
432 c.Check(err, check.IsNil)
433 c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
434 c.Check(stats(), check.Matches, `.*"OpenOps":3,.*`)
436 err = s.volume.Trash(loc)
437 c.Check(err, check.IsNil)
438 c.Check(stats(), check.Matches, `.*"FlockOps":2,.*`)
441 func (s *UnixVolumeSuite) TestConfig(c *check.C) {
443 err := yaml.Unmarshal([]byte(`
446 StorageClasses: ["class_a", "class_b"]
449 c.Check(err, check.IsNil)
450 c.Check(cfg.Volumes[0].GetStorageClasses(), check.DeepEquals, []string{"class_a", "class_b"})