1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
20 "git.arvados.org/arvados.git/sdk/go/arvados"
21 "git.arvados.org/arvados.git/sdk/go/ctxlog"
22 "github.com/prometheus/client_golang/prometheus"
23 "github.com/sirupsen/logrus"
24 check "gopkg.in/check.v1"
// TestableUnixVolume is the volume type constructed and exercised by the
// tests in this file.
// NOTE(review): the field list is not visible in this excerpt; the composite
// literal in newTestableUnixVolume populates a UnixVolume field on it.
27 type TestableUnixVolume struct {
32 // PutRaw writes a Keep block directly into a UnixVolume, even if
33 // the volume is readonly.
//
// The volume's ReadOnly flag is cleared before calling Put, and a deferred
// function assigns the flag back from its orig argument on return.
// NOTE(review): the argument passed to the deferred function and the
// handling of err are not visible in this excerpt.
34 func (v *TestableUnixVolume) PutRaw(locator string, data []byte) {
35 defer func(orig bool) {
36 v.volume.ReadOnly = orig
// Make the volume writable for the Put call below.
38 v.volume.ReadOnly = false
39 err := v.Put(context.Background(), locator, data)
// TouchWithDate sets both timestamp fields passed to syscall.Utime for the
// block file at v.blockPath(locator) to lastPut, truncated to whole seconds.
// NOTE(review): handling of err is not visible in this excerpt.
45 func (v *TestableUnixVolume) TouchWithDate(locator string, lastPut time.Time) {
46 err := syscall.Utime(v.blockPath(locator), &syscall.Utimbuf{lastPut.Unix(), lastPut.Unix()})
// Teardown removes the directory tree at v.Root.
// NOTE(review): what happens when RemoveAll fails is not visible in this
// excerpt.
52 func (v *TestableUnixVolume) Teardown() {
53 if err := os.RemoveAll(v.Root); err != nil {
// ReadWriteOperationLabelValues returns the label values "open" (r) and
// "create" (w).
// NOTE(review): presumably these are the metric operation labels for a read
// and a write on this volume type — confirm against the caller.
58 func (v *TestableUnixVolume) ReadWriteOperationLabelValues() (r, w string) {
59 return "open", "create"
// Register UnixVolumeSuite with the gocheck runner.
62 var _ = check.Suite(&UnixVolumeSuite{})
// UnixVolumeSuite holds the state shared by the tests in this file.
64 type UnixVolumeSuite struct {
// cluster is assigned from testCluster(c) in SetUpTest.
65 cluster *arvados.Cluster
// volumes collects every volume created by newTestableUnixVolume;
// TearDownTest iterates over it.
66 volumes []*TestableUnixVolume
// metrics is rebuilt in SetUpTest on a fresh prometheus registry.
67 metrics *volumeMetricsVecs
// SetUpTest gives each test a cluster from testCluster and a new set of
// volume metrics registered on a new prometheus registry.
70 func (s *UnixVolumeSuite) SetUpTest(c *check.C) {
71 s.cluster = testCluster(c)
72 s.metrics = newVolumeMetricsVecs(prometheus.NewRegistry())
// TearDownTest walks every volume recorded in s.volumes.
// NOTE(review): the loop body is not visible in this excerpt; presumably it
// tears each volume down — confirm.
75 func (s *UnixVolumeSuite) TearDownTest(c *check.C) {
76 for _, v := range s.volumes {
// newTestableUnixVolume creates a temporary directory with the prefix
// "volume_test", builds a TestableUnixVolume, checks that v.check() reports
// no error, and appends the volume to s.volumes.
// NOTE(review): the condition guarding the mutex assignment, most of the
// struct literal, and the return statement are not visible in this excerpt.
// Presumably the mutex is installed only when serialize is true and the
// temporary directory becomes the volume root — confirm.
81 func (s *UnixVolumeSuite) newTestableUnixVolume(c *check.C, cluster *arvados.Cluster, volume arvados.Volume, metrics *volumeMetricsVecs, serialize bool) *TestableUnixVolume {
82 d, err := ioutil.TempDir("", "volume_test")
83 c.Check(err, check.IsNil)
84 var locker sync.Locker
86 locker = &sync.Mutex{}
88 v := &TestableUnixVolume{
89 UnixVolume: UnixVolume{
93 logger: ctxlog.TestLogger(c),
// Fail the test early if the freshly built volume does not pass its own check.
99 c.Check(v.check(), check.IsNil)
// Remember the volume so TearDownTest can reach it.
100 s.volumes = append(s.volumes, v)
// TestUnixVolumeWithGenericTests runs DoGenericVolumeTests with a factory
// that builds volumes through newTestableUnixVolume.
104 // serialize = false; readonly = false
105 func (s *UnixVolumeSuite) TestUnixVolumeWithGenericTests(c *check.C) {
106 DoGenericVolumeTests(c, false, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
// The final argument is newTestableUnixVolume's serialize parameter.
107 return s.newTestableUnixVolume(c, cluster, volume, metrics, false)
// TestUnixVolumeWithGenericTestsReadOnly runs DoGenericVolumeTests with true
// as its second argument (presumably the readonly flag) and a factory that
// builds volumes through newTestableUnixVolume.
// NOTE(review): the header below says serialize = false, but the final
// argument of newTestableUnixVolume is its serialize parameter and true is
// passed here. Either the header or the argument is wrong — confirm.
111 // serialize = false; readonly = true
112 func (s *UnixVolumeSuite) TestUnixVolumeWithGenericTestsReadOnly(c *check.C) {
113 DoGenericVolumeTests(c, true, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
114 return s.newTestableUnixVolume(c, cluster, volume, metrics, true)
// TestUnixVolumeWithGenericTestsSerialized runs DoGenericVolumeTests with
// false as its second argument (presumably the readonly flag) and a factory
// that builds volumes through newTestableUnixVolume.
// NOTE(review): the header below says serialize = true, but the final
// argument of newTestableUnixVolume is its serialize parameter and false is
// passed here, so as written this looks identical to
// TestUnixVolumeWithGenericTests. Confirm which is intended.
118 // serialize = true; readonly = false
119 func (s *UnixVolumeSuite) TestUnixVolumeWithGenericTestsSerialized(c *check.C) {
120 DoGenericVolumeTests(c, false, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
121 return s.newTestableUnixVolume(c, cluster, volume, metrics, false)
// TestUnixVolumeHandlersWithGenericVolumeTests runs DoGenericVolumeTests with
// true as its second argument and a factory that builds volumes through
// newTestableUnixVolume with serialize set to true.
125 // serialize = true; readonly = true
126 func (s *UnixVolumeSuite) TestUnixVolumeHandlersWithGenericVolumeTests(c *check.C) {
127 DoGenericVolumeTests(c, true, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
128 return s.newTestableUnixVolume(c, cluster, volume, metrics, true)
// TestGetNotFound stores a block under TestHash, then reads TestHash2 and
// expects an error for which os.IsNotExist is true.
// NOTE(review): the switch header and the remaining case labels are not
// visible in this excerpt.
132 func (s *UnixVolumeSuite) TestGetNotFound(c *check.C) {
133 v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
135 v.Put(context.Background(), TestHash, TestBlock)
137 buf := make([]byte, BlockSize)
// Ask for a different hash than the one stored above.
138 n, err := v.Get(context.Background(), TestHash2, buf)
140 case os.IsNotExist(err):
143 c.Errorf("Read should have failed, returned %+q", buf[:n])
145 c.Errorf("Read expected ErrNotExist, got: %s", err)
// TestPut stores TestBlock under TestHash and then reads the file back
// directly from disk to verify its content equals TestBlock.
// NOTE(review): the handling of the Put error and of a ReadFile failure is
// not visible in this excerpt.
149 func (s *UnixVolumeSuite) TestPut(c *check.C) {
150 v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
153 err := v.Put(context.Background(), TestHash, TestBlock)
// Expected on-disk location: <root>/<first three hash characters>/<hash>.
157 p := fmt.Sprintf("%s/%s/%s", v.Root, TestHash[:3], TestHash)
158 if buf, err := ioutil.ReadFile(p); err != nil {
160 } else if bytes.Compare(buf, TestBlock) != 0 {
161 c.Errorf("Write should have stored %s, did store %s",
162 string(TestBlock), string(buf))
// TestPutBadVolume deletes the volume's root directory and then checks that
// a subsequent Put still returns no error.
166 func (s *UnixVolumeSuite) TestPutBadVolume(c *check.C) {
167 v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
// Remove the root out from under the volume; abort the test if that fails.
170 err := os.RemoveAll(v.Root)
171 c.Assert(err, check.IsNil)
172 err = v.Put(context.Background(), TestHash, TestBlock)
173 c.Check(err, check.IsNil)
// TestUnixVolumeReadonly builds a volume configured with ReadOnly: true,
// seeds it with PutRaw, and then verifies that Get succeeds while Put, Touch
// and Trash each return MethodDisabledError.
// NOTE(review): the condition guarding the Get error report is not visible
// in this excerpt.
176 func (s *UnixVolumeSuite) TestUnixVolumeReadonly(c *check.C) {
177 v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{ReadOnly: true, Replication: 1}, s.metrics, false)
// PutRaw is used to seed the block because the volume is readonly.
180 v.PutRaw(TestHash, TestBlock)
182 buf := make([]byte, BlockSize)
183 _, err := v.Get(context.Background(), TestHash, buf)
185 c.Errorf("got err %v, expected nil", err)
// Every mutating operation must be refused.
188 err = v.Put(context.Background(), TestHash, TestBlock)
189 if err != MethodDisabledError {
190 c.Errorf("got err %v, expected MethodDisabledError", err)
193 err = v.Touch(TestHash)
194 if err != MethodDisabledError {
195 c.Errorf("got err %v, expected MethodDisabledError", err)
198 err = v.Trash(TestHash)
199 if err != MethodDisabledError {
200 c.Errorf("got err %v, expected MethodDisabledError", err)
// TestIsFull creates a symlink named "full" in the volume root whose target
// is a Unix timestamp, first the current time and then one 3605 seconds in
// the past. Per the error messages, the volume should report full in the
// first case and not full in the second.
// NOTE(review): the conditions that trigger the two Errorf calls, and any
// removal of the first symlink before the second is created, are not visible
// in this excerpt. The os.Symlink return values are ignored.
204 func (s *UnixVolumeSuite) TestIsFull(c *check.C) {
205 v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
208 fullPath := v.Root + "/full"
209 now := fmt.Sprintf("%d", time.Now().Unix())
210 os.Symlink(now, fullPath)
212 c.Errorf("%s: claims not to be full", v)
216 // Test with an expired /full link.
217 expired := fmt.Sprintf("%d", time.Now().Unix()-3605)
218 os.Symlink(expired, fullPath)
220 c.Errorf("%s: should no longer be full", v)
// TestNodeStatus calls v.Status() and checks that MountPoint equals the
// volume root and that DeviceNum, BytesFree and BytesUsed are all non-zero.
224 func (s *UnixVolumeSuite) TestNodeStatus(c *check.C) {
225 v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
228 // Get node status and make a basic sanity check.
229 volinfo := v.Status()
230 if volinfo.MountPoint != v.Root {
231 c.Errorf("GetNodeStatus mount_point %s, expected %s", volinfo.MountPoint, v.Root)
// A zero value in any of the following fields is treated as "never filled in".
233 if volinfo.DeviceNum == 0 {
234 c.Errorf("uninitialized device_num in %v", volinfo)
236 if volinfo.BytesFree == 0 {
237 c.Errorf("uninitialized bytes_free in %v", volinfo)
239 if volinfo.BytesUsed == 0 {
240 c.Errorf("uninitialized bytes_used in %v", volinfo)
// TestUnixVolumeGetFuncWorkerError stores a block, calls v.getFunc on its
// path with a worker callback, and compares the returned error against
// mockErr.
// NOTE(review): the callback body and the comparison are not visible in this
// excerpt; presumably the callback returns mockErr and getFunc is expected
// to pass it through — confirm.
244 func (s *UnixVolumeSuite) TestUnixVolumeGetFuncWorkerError(c *check.C) {
245 v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
248 v.Put(context.Background(), TestHash, TestBlock)
249 mockErr := errors.New("Mock error")
250 err := v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error {
254 c.Errorf("Got %v, expected %v", err, mockErr)
// TestUnixVolumeGetFuncFileError calls v.getFunc for TestHash without
// storing a block first (no Put is visible before the call). Per the error
// messages, getFunc should fail to open the file and must not invoke the
// worker callback.
// NOTE(review): the callback body and the conditions guarding the two Errorf
// calls are not visible in this excerpt.
258 func (s *UnixVolumeSuite) TestUnixVolumeGetFuncFileError(c *check.C) {
259 v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
263 err := v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error {
268 c.Errorf("Expected error opening non-existent file")
271 c.Errorf("Worker func should not have been called")
// TestUnixVolumeGetFuncWorkerWaitsOnMutex runs v.getFunc in a goroutine
// against a mock mutex and checks the ordering of three events: the lock is
// granted, the worker callback runs, and the lock is released. Each phase
// fails the test if the wrong event arrives first or if nothing happens
// within five seconds.
// NOTE(review): the statement installing mtx on the volume, the select
// headers and the funcCalled receive cases are not visible in this excerpt.
275 func (s *UnixVolumeSuite) TestUnixVolumeGetFuncWorkerWaitsOnMutex(c *check.C) {
276 v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
279 v.Put(context.Background(), TestHash, TestBlock)
281 mtx := NewMockMutex()
284 funcCalled := make(chan struct{})
// The worker signals on funcCalled so the test can observe when it ran.
285 go v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error {
286 funcCalled <- struct{}{}
// Phase 1: the lock must be requested before the worker is called.
290 case mtx.AllowLock <- struct{}{}:
292 c.Fatal("Function was called before mutex was acquired")
293 case <-time.After(5 * time.Second):
294 c.Fatal("Timed out before mutex was acquired")
// Phase 2: the worker must run before the lock is released.
298 case mtx.AllowUnlock <- struct{}{}:
299 c.Fatal("Mutex was released before function was called")
300 case <-time.After(5 * time.Second):
301 c.Fatal("Timed out waiting for funcCalled")
// Phase 3: getFunc must release the lock once the worker has finished.
304 case mtx.AllowUnlock <- struct{}{}:
305 case <-time.After(5 * time.Second):
306 c.Fatal("Timed out waiting for getFunc() to release mutex")
// TestUnixVolumeCompare exercises v.Compare in four situations: matching
// data, mismatching expected data (CollisionError), stored data that was
// overwritten with different content (DiskHashError), and an unreadable
// block file (an error matching "permission denied").
// NOTE(review): the condition guarding the first Errorf and the else branch
// of the root check are not visible in this excerpt.
310 func (s *UnixVolumeSuite) TestUnixVolumeCompare(c *check.C) {
311 v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
314 v.Put(context.Background(), TestHash, TestBlock)
315 err := v.Compare(context.Background(), TestHash, TestBlock)
317 c.Errorf("Got err %q, expected nil", err)
// Stored block is intact, but the caller's data differs.
320 err = v.Compare(context.Background(), TestHash, []byte("baddata"))
321 if err != CollisionError {
322 c.Errorf("Got err %q, expected %q", err, CollisionError)
// Overwrite the stored block with different content under the same hash.
325 v.Put(context.Background(), TestHash, []byte("baddata"))
326 err = v.Compare(context.Background(), TestHash, TestBlock)
327 if err != DiskHashError {
328 c.Errorf("Got err %q, expected %q", err, DiskHashError)
// The permission check is skipped for uid 0, per the log message below.
331 if os.Getuid() == 0 {
332 c.Log("skipping 'permission denied' check when running as root")
334 p := fmt.Sprintf("%s/%s/%s", v.Root, TestHash[:3], TestHash)
// Strip all permission bits from the block file so it cannot be opened.
335 err = os.Chmod(p, 000)
336 c.Assert(err, check.IsNil)
337 err = v.Compare(context.Background(), TestHash, TestBlock)
338 c.Check(err, check.ErrorMatches, ".*permission denied.*")
// TestUnixVolumeContextCancelPut builds a volume with serialize set to true,
// calls Put with a cancellable context, and expects Put to return
// context.Canceled.
// NOTE(review): the code between the two 50ms sleeps, and whatever makes Put
// block until cancellation, is not visible in this excerpt; presumably a
// goroutine holds the volume's lock and calls cancel — confirm.
342 func (s *UnixVolumeSuite) TestUnixVolumeContextCancelPut(c *check.C) {
343 v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, true)
346 ctx, cancel := context.WithCancel(context.Background())
348 time.Sleep(50 * time.Millisecond)
350 time.Sleep(50 * time.Millisecond)
353 err := v.Put(ctx, TestHash, TestBlock)
354 if err != context.Canceled {
355 c.Errorf("Put() returned %s -- expected short read / canceled", err)
// TestUnixVolumeContextCancelGet places a FIFO at the block path for
// TestHash, calls Get with a cancellable context, and expects a short read
// together with context.Canceled.
// NOTE(review): the step between PutRaw and Mkfifo (presumably removing the
// regular file so the FIFO can be created) and the goroutine that calls
// cancel after the 50ms sleep are not visible in this excerpt.
359 func (s *UnixVolumeSuite) TestUnixVolumeContextCancelGet(c *check.C) {
360 v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
362 bpath := v.blockPath(TestHash)
363 v.PutRaw(TestHash, TestBlock)
365 err := syscall.Mkfifo(bpath, 0600)
367 c.Fatalf("Mkfifo %s: %s", bpath, err)
// Clean the FIFO up when the test returns.
369 defer os.Remove(bpath)
370 ctx, cancel := context.WithCancel(context.Background())
372 time.Sleep(50 * time.Millisecond)
375 buf := make([]byte, len(TestBlock))
376 n, err := v.Get(ctx, TestHash, buf)
// A full-length read or any error other than context.Canceled is a failure.
377 if n == len(TestBlock) || err != context.Canceled {
378 c.Errorf("Get() returned %d, %s -- expected short read / canceled", n, err)
// TestStats checks the counters reported by vol.InternalStats() after each
// of a sequence of volume operations, by matching regular expressions
// against the JSON encoding of the stats.
// NOTE(review): the tail of the stats helper and the operations that produce
// err before the two FlockOps checks are not visible in this excerpt.
382 func (s *UnixVolumeSuite) TestStats(c *check.C) {
383 vol := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
// stats marshals the current internal stats to JSON for the pattern checks.
384 stats := func() string {
385 buf, err := json.Marshal(vol.InternalStats())
386 c.Check(err, check.IsNil)
// Baseline on a freshly created volume.
390 c.Check(stats(), check.Matches, `.*"StatOps":1,.*`) // (*UnixVolume)check() calls Stat() once
391 c.Check(stats(), check.Matches, `.*"Errors":0,.*`)
// A Get for a block that was never stored must fail and be counted as an error.
393 loc := "acbd18db4cc2f85cedef654fccc4a4d8"
394 _, err := vol.Get(context.Background(), loc, make([]byte, 3))
395 c.Check(err, check.NotNil)
396 c.Check(stats(), check.Matches, `.*"StatOps":[^0],.*`)
397 c.Check(stats(), check.Matches, `.*"Errors":[^0],.*`)
398 c.Check(stats(), check.Matches, `.*"\*os\.PathError":[^0].*`)
399 c.Check(stats(), check.Matches, `.*"InBytes":0,.*`)
400 c.Check(stats(), check.Matches, `.*"OpenOps":0,.*`)
401 c.Check(stats(), check.Matches, `.*"CreateOps":0,.*`)
// Storing three bytes counts as one create, three bytes out and one utimes.
403 err = vol.Put(context.Background(), loc, []byte("foo"))
404 c.Check(err, check.IsNil)
405 c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
406 c.Check(stats(), check.Matches, `.*"CreateOps":1,.*`)
407 c.Check(stats(), check.Matches, `.*"OpenOps":0,.*`)
408 c.Check(stats(), check.Matches, `.*"UtimesOps":1,.*`)
// NOTE(review): the call producing err here is not visible; it is expected
// to add one flock, one open and one utimes operation.
411 c.Check(err, check.IsNil)
412 c.Check(stats(), check.Matches, `.*"FlockOps":1,.*`)
413 c.Check(stats(), check.Matches, `.*"OpenOps":1,.*`)
414 c.Check(stats(), check.Matches, `.*"UtimesOps":2,.*`)
// Get and Compare on the 3-byte block together bring InBytes to 6 and
// OpenOps to 3.
416 _, err = vol.Get(context.Background(), loc, make([]byte, 3))
417 c.Check(err, check.IsNil)
418 err = vol.Compare(context.Background(), loc, []byte("foo"))
419 c.Check(err, check.IsNil)
420 c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
421 c.Check(stats(), check.Matches, `.*"OpenOps":3,.*`)
// NOTE(review): the call producing err here is not visible; it is expected
// to bring FlockOps to 2.
424 c.Check(err, check.IsNil)
425 c.Check(stats(), check.Matches, `.*"FlockOps":2,.*`)
// TestSkipUnusedDirs creates one trash file under "aaa" and one under ".aaa"
// in the volume root, then verifies that the file under ".aaa" still exists
// while the one under "aaa" is gone.
// NOTE(review): the call that empties the trash between setup and the Stat
// checks is not visible in this excerpt; the inline comment names EmptyTrash.
428 func (s *UnixVolumeSuite) TestSkipUnusedDirs(c *check.C) {
429 vol := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
431 err := os.Mkdir(vol.UnixVolume.Root+"/aaa", 0777)
432 c.Assert(err, check.IsNil)
433 err = os.Mkdir(vol.UnixVolume.Root+"/.aaa", 0777) // EmptyTrash should not look here
434 c.Assert(err, check.IsNil)
// Identical trash file names in both directories; only the parent differs.
435 deleteme := vol.UnixVolume.Root + "/aaa/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa.trash.1"
436 err = ioutil.WriteFile(deleteme, []byte{1, 2, 3}, 0777)
437 c.Assert(err, check.IsNil)
438 skipme := vol.UnixVolume.Root + "/.aaa/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa.trash.1"
439 err = ioutil.WriteFile(skipme, []byte{1, 2, 3}, 0777)
440 c.Assert(err, check.IsNil)
// The file in the dot-directory must survive.
443 _, err = os.Stat(skipme)
444 c.Check(err, check.IsNil)
// The file in the regular directory must have been removed.
446 _, err = os.Stat(deleteme)
447 c.Check(err, check.NotNil)
448 c.Check(os.IsNotExist(err), check.Equals, true)