1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
21 "git.arvados.org/arvados.git/sdk/go/arvados"
22 "git.arvados.org/arvados.git/sdk/go/ctxlog"
23 "github.com/prometheus/client_golang/prometheus"
24 "github.com/sirupsen/logrus"
25 check "gopkg.in/check.v1"
28 type TestableUnixVolume struct {
33 // PutRaw writes a Keep block directly into a UnixVolume, even if
34 // the volume is readonly.
35 func (v *TestableUnixVolume) PutRaw(locator string, data []byte) {
36 defer func(orig bool) {
37 v.volume.ReadOnly = orig
39 v.volume.ReadOnly = false
40 err := v.Put(context.Background(), locator, data)
46 func (v *TestableUnixVolume) TouchWithDate(locator string, lastPut time.Time) {
47 err := syscall.Utime(v.blockPath(locator), &syscall.Utimbuf{lastPut.Unix(), lastPut.Unix()})
53 func (v *TestableUnixVolume) Teardown() {
54 if err := os.RemoveAll(v.Root); err != nil {
59 func (v *TestableUnixVolume) ReadWriteOperationLabelValues() (r, w string) {
60 return "open", "create"
63 var _ = check.Suite(&UnixVolumeSuite{})
65 type UnixVolumeSuite struct {
66 cluster *arvados.Cluster
67 volumes []*TestableUnixVolume
68 metrics *volumeMetricsVecs
71 func (s *UnixVolumeSuite) SetUpTest(c *check.C) {
72 s.cluster = testCluster(c)
73 s.metrics = newVolumeMetricsVecs(prometheus.NewRegistry())
76 func (s *UnixVolumeSuite) TearDownTest(c *check.C) {
77 for _, v := range s.volumes {
82 func (s *UnixVolumeSuite) newTestableUnixVolume(c *check.C, cluster *arvados.Cluster, volume arvados.Volume, metrics *volumeMetricsVecs, serialize bool) *TestableUnixVolume {
83 d, err := ioutil.TempDir("", "volume_test")
84 c.Check(err, check.IsNil)
85 var locker sync.Locker
87 locker = &sync.Mutex{}
89 v := &TestableUnixVolume{
90 UnixVolume: UnixVolume{
94 logger: ctxlog.TestLogger(c),
100 c.Check(v.check(), check.IsNil)
101 s.volumes = append(s.volumes, v)
105 // serialize = false; readonly = false
106 func (s *UnixVolumeSuite) TestUnixVolumeWithGenericTests(c *check.C) {
107 DoGenericVolumeTests(c, false, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
108 return s.newTestableUnixVolume(c, cluster, volume, metrics, false)
112 // serialize = false; readonly = true
113 func (s *UnixVolumeSuite) TestUnixVolumeWithGenericTestsReadOnly(c *check.C) {
114 DoGenericVolumeTests(c, true, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
115 return s.newTestableUnixVolume(c, cluster, volume, metrics, true)
119 // serialize = true; readonly = false
120 func (s *UnixVolumeSuite) TestUnixVolumeWithGenericTestsSerialized(c *check.C) {
121 DoGenericVolumeTests(c, false, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
122 return s.newTestableUnixVolume(c, cluster, volume, metrics, false)
126 // serialize = true; readonly = true
127 func (s *UnixVolumeSuite) TestUnixVolumeHandlersWithGenericVolumeTests(c *check.C) {
128 DoGenericVolumeTests(c, true, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
129 return s.newTestableUnixVolume(c, cluster, volume, metrics, true)
133 func (s *UnixVolumeSuite) TestGetNotFound(c *check.C) {
134 v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
136 v.Put(context.Background(), TestHash, TestBlock)
138 buf := make([]byte, BlockSize)
139 n, err := v.Get(context.Background(), TestHash2, buf)
141 case os.IsNotExist(err):
144 c.Errorf("Read should have failed, returned %+q", buf[:n])
146 c.Errorf("Read expected ErrNotExist, got: %s", err)
150 func (s *UnixVolumeSuite) TestPut(c *check.C) {
151 v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
154 err := v.Put(context.Background(), TestHash, TestBlock)
158 p := fmt.Sprintf("%s/%s/%s", v.Root, TestHash[:3], TestHash)
159 if buf, err := ioutil.ReadFile(p); err != nil {
161 } else if bytes.Compare(buf, TestBlock) != 0 {
162 c.Errorf("Write should have stored %s, did store %s",
163 string(TestBlock), string(buf))
167 func (s *UnixVolumeSuite) TestPutBadVolume(c *check.C) {
168 v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
171 os.Chmod(v.Root, 000)
172 err := v.Put(context.Background(), TestHash, TestBlock)
174 c.Error("Write should have failed")
178 func (s *UnixVolumeSuite) TestUnixVolumeReadonly(c *check.C) {
179 v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{ReadOnly: true, Replication: 1}, s.metrics, false)
182 v.PutRaw(TestHash, TestBlock)
184 buf := make([]byte, BlockSize)
185 _, err := v.Get(context.Background(), TestHash, buf)
187 c.Errorf("got err %v, expected nil", err)
190 err = v.Put(context.Background(), TestHash, TestBlock)
191 if err != MethodDisabledError {
192 c.Errorf("got err %v, expected MethodDisabledError", err)
195 err = v.Touch(TestHash)
196 if err != MethodDisabledError {
197 c.Errorf("got err %v, expected MethodDisabledError", err)
200 err = v.Trash(TestHash)
201 if err != MethodDisabledError {
202 c.Errorf("got err %v, expected MethodDisabledError", err)
206 func (s *UnixVolumeSuite) TestIsFull(c *check.C) {
207 v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
210 fullPath := v.Root + "/full"
211 now := fmt.Sprintf("%d", time.Now().Unix())
212 os.Symlink(now, fullPath)
214 c.Errorf("%s: claims not to be full", v)
218 // Test with an expired /full link.
219 expired := fmt.Sprintf("%d", time.Now().Unix()-3605)
220 os.Symlink(expired, fullPath)
222 c.Errorf("%s: should no longer be full", v)
226 func (s *UnixVolumeSuite) TestNodeStatus(c *check.C) {
227 v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
230 // Get node status and make a basic sanity check.
231 volinfo := v.Status()
232 if volinfo.MountPoint != v.Root {
233 c.Errorf("GetNodeStatus mount_point %s, expected %s", volinfo.MountPoint, v.Root)
235 if volinfo.DeviceNum == 0 {
236 c.Errorf("uninitialized device_num in %v", volinfo)
238 if volinfo.BytesFree == 0 {
239 c.Errorf("uninitialized bytes_free in %v", volinfo)
241 if volinfo.BytesUsed == 0 {
242 c.Errorf("uninitialized bytes_used in %v", volinfo)
246 func (s *UnixVolumeSuite) TestUnixVolumeGetFuncWorkerError(c *check.C) {
247 v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
250 v.Put(context.Background(), TestHash, TestBlock)
251 mockErr := errors.New("Mock error")
252 err := v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error {
256 c.Errorf("Got %v, expected %v", err, mockErr)
260 func (s *UnixVolumeSuite) TestUnixVolumeGetFuncFileError(c *check.C) {
261 v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
265 err := v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error {
270 c.Errorf("Expected error opening non-existent file")
273 c.Errorf("Worker func should not have been called")
277 func (s *UnixVolumeSuite) TestUnixVolumeGetFuncWorkerWaitsOnMutex(c *check.C) {
278 v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
281 v.Put(context.Background(), TestHash, TestBlock)
283 mtx := NewMockMutex()
286 funcCalled := make(chan struct{})
287 go v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error {
288 funcCalled <- struct{}{}
292 case mtx.AllowLock <- struct{}{}:
294 c.Fatal("Function was called before mutex was acquired")
295 case <-time.After(5 * time.Second):
296 c.Fatal("Timed out before mutex was acquired")
300 case mtx.AllowUnlock <- struct{}{}:
301 c.Fatal("Mutex was released before function was called")
302 case <-time.After(5 * time.Second):
303 c.Fatal("Timed out waiting for funcCalled")
306 case mtx.AllowUnlock <- struct{}{}:
307 case <-time.After(5 * time.Second):
308 c.Fatal("Timed out waiting for getFunc() to release mutex")
312 func (s *UnixVolumeSuite) TestUnixVolumeCompare(c *check.C) {
313 v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
316 v.Put(context.Background(), TestHash, TestBlock)
317 err := v.Compare(context.Background(), TestHash, TestBlock)
319 c.Errorf("Got err %q, expected nil", err)
322 err = v.Compare(context.Background(), TestHash, []byte("baddata"))
323 if err != CollisionError {
324 c.Errorf("Got err %q, expected %q", err, CollisionError)
327 v.Put(context.Background(), TestHash, []byte("baddata"))
328 err = v.Compare(context.Background(), TestHash, TestBlock)
329 if err != DiskHashError {
330 c.Errorf("Got err %q, expected %q", err, DiskHashError)
333 p := fmt.Sprintf("%s/%s/%s", v.Root, TestHash[:3], TestHash)
335 err = v.Compare(context.Background(), TestHash, TestBlock)
336 if err == nil || strings.Index(err.Error(), "permission denied") < 0 {
337 c.Errorf("Got err %q, expected %q", err, "permission denied")
341 func (s *UnixVolumeSuite) TestUnixVolumeContextCancelPut(c *check.C) {
342 v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, true)
345 ctx, cancel := context.WithCancel(context.Background())
347 time.Sleep(50 * time.Millisecond)
349 time.Sleep(50 * time.Millisecond)
352 err := v.Put(ctx, TestHash, TestBlock)
353 if err != context.Canceled {
354 c.Errorf("Put() returned %s -- expected short read / canceled", err)
358 func (s *UnixVolumeSuite) TestUnixVolumeContextCancelGet(c *check.C) {
359 v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
361 bpath := v.blockPath(TestHash)
362 v.PutRaw(TestHash, TestBlock)
364 err := syscall.Mkfifo(bpath, 0600)
366 c.Fatalf("Mkfifo %s: %s", bpath, err)
368 defer os.Remove(bpath)
369 ctx, cancel := context.WithCancel(context.Background())
371 time.Sleep(50 * time.Millisecond)
374 buf := make([]byte, len(TestBlock))
375 n, err := v.Get(ctx, TestHash, buf)
376 if n == len(TestBlock) || err != context.Canceled {
377 c.Errorf("Get() returned %d, %s -- expected short read / canceled", n, err)
381 func (s *UnixVolumeSuite) TestStats(c *check.C) {
382 vol := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
383 stats := func() string {
384 buf, err := json.Marshal(vol.InternalStats())
385 c.Check(err, check.IsNil)
389 c.Check(stats(), check.Matches, `.*"StatOps":1,.*`) // (*UnixVolume)check() calls Stat() once
390 c.Check(stats(), check.Matches, `.*"Errors":0,.*`)
392 loc := "acbd18db4cc2f85cedef654fccc4a4d8"
393 _, err := vol.Get(context.Background(), loc, make([]byte, 3))
394 c.Check(err, check.NotNil)
395 c.Check(stats(), check.Matches, `.*"StatOps":[^0],.*`)
396 c.Check(stats(), check.Matches, `.*"Errors":[^0],.*`)
397 c.Check(stats(), check.Matches, `.*"\*os\.PathError":[^0].*`)
398 c.Check(stats(), check.Matches, `.*"InBytes":0,.*`)
399 c.Check(stats(), check.Matches, `.*"OpenOps":0,.*`)
400 c.Check(stats(), check.Matches, `.*"CreateOps":0,.*`)
402 err = vol.Put(context.Background(), loc, []byte("foo"))
403 c.Check(err, check.IsNil)
404 c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
405 c.Check(stats(), check.Matches, `.*"CreateOps":1,.*`)
406 c.Check(stats(), check.Matches, `.*"OpenOps":0,.*`)
407 c.Check(stats(), check.Matches, `.*"UtimesOps":0,.*`)
410 c.Check(err, check.IsNil)
411 c.Check(stats(), check.Matches, `.*"FlockOps":1,.*`)
412 c.Check(stats(), check.Matches, `.*"OpenOps":1,.*`)
413 c.Check(stats(), check.Matches, `.*"UtimesOps":1,.*`)
415 _, err = vol.Get(context.Background(), loc, make([]byte, 3))
416 c.Check(err, check.IsNil)
417 err = vol.Compare(context.Background(), loc, []byte("foo"))
418 c.Check(err, check.IsNil)
419 c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
420 c.Check(stats(), check.Matches, `.*"OpenOps":3,.*`)
423 c.Check(err, check.IsNil)
424 c.Check(stats(), check.Matches, `.*"FlockOps":2,.*`)