1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
21 "git.curoverse.com/arvados.git/sdk/go/arvados"
22 "github.com/prometheus/client_golang/prometheus"
23 "github.com/sirupsen/logrus"
24 check "gopkg.in/check.v1"
27 type TestableUnixVolume struct {
32 // PutRaw writes a Keep block directly into a UnixVolume, even if
33 // the volume is readonly.
34 func (v *TestableUnixVolume) PutRaw(locator string, data []byte) {
35 defer func(orig bool) {
36 v.volume.ReadOnly = orig
38 v.volume.ReadOnly = false
39 err := v.Put(context.Background(), locator, data)
45 func (v *TestableUnixVolume) TouchWithDate(locator string, lastPut time.Time) {
46 err := syscall.Utime(v.blockPath(locator), &syscall.Utimbuf{lastPut.Unix(), lastPut.Unix()})
52 func (v *TestableUnixVolume) Teardown() {
53 if err := os.RemoveAll(v.Root); err != nil {
58 func (v *TestableUnixVolume) ReadWriteOperationLabelValues() (r, w string) {
59 return "open", "create"
62 var _ = check.Suite(&UnixVolumeSuite{})
64 type UnixVolumeSuite struct {
65 cluster *arvados.Cluster
66 volumes []*TestableUnixVolume
67 metrics *volumeMetricsVecs
70 func (s *UnixVolumeSuite) SetUpTest(c *check.C) {
71 s.cluster = testCluster(c)
72 s.metrics = newVolumeMetricsVecs(prometheus.NewRegistry())
75 func (s *UnixVolumeSuite) TearDownTest(c *check.C) {
76 for _, v := range s.volumes {
81 func (s *UnixVolumeSuite) newTestableUnixVolume(c *check.C, cluster *arvados.Cluster, volume arvados.Volume, metrics *volumeMetricsVecs, serialize bool) *TestableUnixVolume {
82 d, err := ioutil.TempDir("", "volume_test")
83 c.Check(err, check.IsNil)
84 var locker sync.Locker
86 locker = &sync.Mutex{}
88 v := &TestableUnixVolume{
89 UnixVolume: UnixVolume{
98 c.Check(v.check(), check.IsNil)
99 s.volumes = append(s.volumes, v)
103 // serialize = false; readonly = false
104 func (s *UnixVolumeSuite) TestUnixVolumeWithGenericTests(c *check.C) {
105 DoGenericVolumeTests(c, false, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
106 return s.newTestableUnixVolume(c, cluster, volume, metrics, false)
110 // serialize = false; readonly = true
111 func (s *UnixVolumeSuite) TestUnixVolumeWithGenericTestsReadOnly(c *check.C) {
112 DoGenericVolumeTests(c, true, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
113 return s.newTestableUnixVolume(c, cluster, volume, metrics, true)
117 // serialize = true; readonly = false
118 func (s *UnixVolumeSuite) TestUnixVolumeWithGenericTestsSerialized(c *check.C) {
119 DoGenericVolumeTests(c, false, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
120 return s.newTestableUnixVolume(c, cluster, volume, metrics, false)
124 // serialize = true; readonly = true
125 func (s *UnixVolumeSuite) TestUnixVolumeHandlersWithGenericVolumeTests(c *check.C) {
126 DoGenericVolumeTests(c, true, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
127 return s.newTestableUnixVolume(c, cluster, volume, metrics, true)
131 func (s *UnixVolumeSuite) TestGetNotFound(c *check.C) {
132 v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
134 v.Put(context.Background(), TestHash, TestBlock)
136 buf := make([]byte, BlockSize)
137 n, err := v.Get(context.Background(), TestHash2, buf)
139 case os.IsNotExist(err):
142 c.Errorf("Read should have failed, returned %+q", buf[:n])
144 c.Errorf("Read expected ErrNotExist, got: %s", err)
148 func (s *UnixVolumeSuite) TestPut(c *check.C) {
149 v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
152 err := v.Put(context.Background(), TestHash, TestBlock)
156 p := fmt.Sprintf("%s/%s/%s", v.Root, TestHash[:3], TestHash)
157 if buf, err := ioutil.ReadFile(p); err != nil {
159 } else if bytes.Compare(buf, TestBlock) != 0 {
160 c.Errorf("Write should have stored %s, did store %s",
161 string(TestBlock), string(buf))
165 func (s *UnixVolumeSuite) TestPutBadVolume(c *check.C) {
166 v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
169 os.Chmod(v.Root, 000)
170 err := v.Put(context.Background(), TestHash, TestBlock)
172 c.Error("Write should have failed")
176 func (s *UnixVolumeSuite) TestUnixVolumeReadonly(c *check.C) {
177 v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{ReadOnly: true, Replication: 1}, s.metrics, false)
180 v.PutRaw(TestHash, TestBlock)
182 buf := make([]byte, BlockSize)
183 _, err := v.Get(context.Background(), TestHash, buf)
185 c.Errorf("got err %v, expected nil", err)
188 err = v.Put(context.Background(), TestHash, TestBlock)
189 if err != MethodDisabledError {
190 c.Errorf("got err %v, expected MethodDisabledError", err)
193 err = v.Touch(TestHash)
194 if err != MethodDisabledError {
195 c.Errorf("got err %v, expected MethodDisabledError", err)
198 err = v.Trash(TestHash)
199 if err != MethodDisabledError {
200 c.Errorf("got err %v, expected MethodDisabledError", err)
204 func (s *UnixVolumeSuite) TestIsFull(c *check.C) {
205 v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
208 fullPath := v.Root + "/full"
209 now := fmt.Sprintf("%d", time.Now().Unix())
210 os.Symlink(now, fullPath)
212 c.Errorf("%s: claims not to be full", v)
216 // Test with an expired /full link.
217 expired := fmt.Sprintf("%d", time.Now().Unix()-3605)
218 os.Symlink(expired, fullPath)
220 c.Errorf("%s: should no longer be full", v)
224 func (s *UnixVolumeSuite) TestNodeStatus(c *check.C) {
225 v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
228 // Get node status and make a basic sanity check.
229 volinfo := v.Status()
230 if volinfo.MountPoint != v.Root {
231 c.Errorf("GetNodeStatus mount_point %s, expected %s", volinfo.MountPoint, v.Root)
233 if volinfo.DeviceNum == 0 {
234 c.Errorf("uninitialized device_num in %v", volinfo)
236 if volinfo.BytesFree == 0 {
237 c.Errorf("uninitialized bytes_free in %v", volinfo)
239 if volinfo.BytesUsed == 0 {
240 c.Errorf("uninitialized bytes_used in %v", volinfo)
244 func (s *UnixVolumeSuite) TestUnixVolumeGetFuncWorkerError(c *check.C) {
245 v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
248 v.Put(context.Background(), TestHash, TestBlock)
249 mockErr := errors.New("Mock error")
250 err := v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error {
254 c.Errorf("Got %v, expected %v", err, mockErr)
258 func (s *UnixVolumeSuite) TestUnixVolumeGetFuncFileError(c *check.C) {
259 v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
263 err := v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error {
268 c.Errorf("Expected error opening non-existent file")
271 c.Errorf("Worker func should not have been called")
275 func (s *UnixVolumeSuite) TestUnixVolumeGetFuncWorkerWaitsOnMutex(c *check.C) {
276 v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
279 v.Put(context.Background(), TestHash, TestBlock)
281 mtx := NewMockMutex()
284 funcCalled := make(chan struct{})
285 go v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error {
286 funcCalled <- struct{}{}
290 case mtx.AllowLock <- struct{}{}:
292 c.Fatal("Function was called before mutex was acquired")
293 case <-time.After(5 * time.Second):
294 c.Fatal("Timed out before mutex was acquired")
298 case mtx.AllowUnlock <- struct{}{}:
299 c.Fatal("Mutex was released before function was called")
300 case <-time.After(5 * time.Second):
301 c.Fatal("Timed out waiting for funcCalled")
304 case mtx.AllowUnlock <- struct{}{}:
305 case <-time.After(5 * time.Second):
306 c.Fatal("Timed out waiting for getFunc() to release mutex")
310 func (s *UnixVolumeSuite) TestUnixVolumeCompare(c *check.C) {
311 v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
314 v.Put(context.Background(), TestHash, TestBlock)
315 err := v.Compare(context.Background(), TestHash, TestBlock)
317 c.Errorf("Got err %q, expected nil", err)
320 err = v.Compare(context.Background(), TestHash, []byte("baddata"))
321 if err != CollisionError {
322 c.Errorf("Got err %q, expected %q", err, CollisionError)
325 v.Put(context.Background(), TestHash, []byte("baddata"))
326 err = v.Compare(context.Background(), TestHash, TestBlock)
327 if err != DiskHashError {
328 c.Errorf("Got err %q, expected %q", err, DiskHashError)
331 p := fmt.Sprintf("%s/%s/%s", v.Root, TestHash[:3], TestHash)
333 err = v.Compare(context.Background(), TestHash, TestBlock)
334 if err == nil || strings.Index(err.Error(), "permission denied") < 0 {
335 c.Errorf("Got err %q, expected %q", err, "permission denied")
339 func (s *UnixVolumeSuite) TestUnixVolumeContextCancelPut(c *check.C) {
340 v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, true)
343 ctx, cancel := context.WithCancel(context.Background())
345 time.Sleep(50 * time.Millisecond)
347 time.Sleep(50 * time.Millisecond)
350 err := v.Put(ctx, TestHash, TestBlock)
351 if err != context.Canceled {
352 c.Errorf("Put() returned %s -- expected short read / canceled", err)
356 func (s *UnixVolumeSuite) TestUnixVolumeContextCancelGet(c *check.C) {
357 v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
359 bpath := v.blockPath(TestHash)
360 v.PutRaw(TestHash, TestBlock)
362 err := syscall.Mkfifo(bpath, 0600)
364 c.Fatalf("Mkfifo %s: %s", bpath, err)
366 defer os.Remove(bpath)
367 ctx, cancel := context.WithCancel(context.Background())
369 time.Sleep(50 * time.Millisecond)
372 buf := make([]byte, len(TestBlock))
373 n, err := v.Get(ctx, TestHash, buf)
374 if n == len(TestBlock) || err != context.Canceled {
375 c.Errorf("Get() returned %d, %s -- expected short read / canceled", n, err)
379 func (s *UnixVolumeSuite) TestStats(c *check.C) {
380 vol := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
381 stats := func() string {
382 buf, err := json.Marshal(vol.InternalStats())
383 c.Check(err, check.IsNil)
387 c.Check(stats(), check.Matches, `.*"StatOps":1,.*`) // (*UnixVolume)check() calls Stat() once
388 c.Check(stats(), check.Matches, `.*"Errors":0,.*`)
390 loc := "acbd18db4cc2f85cedef654fccc4a4d8"
391 _, err := vol.Get(context.Background(), loc, make([]byte, 3))
392 c.Check(err, check.NotNil)
393 c.Check(stats(), check.Matches, `.*"StatOps":[^0],.*`)
394 c.Check(stats(), check.Matches, `.*"Errors":[^0],.*`)
395 c.Check(stats(), check.Matches, `.*"\*os\.PathError":[^0].*`)
396 c.Check(stats(), check.Matches, `.*"InBytes":0,.*`)
397 c.Check(stats(), check.Matches, `.*"OpenOps":0,.*`)
398 c.Check(stats(), check.Matches, `.*"CreateOps":0,.*`)
400 err = vol.Put(context.Background(), loc, []byte("foo"))
401 c.Check(err, check.IsNil)
402 c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
403 c.Check(stats(), check.Matches, `.*"CreateOps":1,.*`)
404 c.Check(stats(), check.Matches, `.*"OpenOps":0,.*`)
405 c.Check(stats(), check.Matches, `.*"UtimesOps":0,.*`)
408 c.Check(err, check.IsNil)
409 c.Check(stats(), check.Matches, `.*"FlockOps":1,.*`)
410 c.Check(stats(), check.Matches, `.*"OpenOps":1,.*`)
411 c.Check(stats(), check.Matches, `.*"UtimesOps":1,.*`)
413 _, err = vol.Get(context.Background(), loc, make([]byte, 3))
414 c.Check(err, check.IsNil)
415 err = vol.Compare(context.Background(), loc, []byte("foo"))
416 c.Check(err, check.IsNil)
417 c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
418 c.Check(stats(), check.Matches, `.*"OpenOps":3,.*`)
421 c.Check(err, check.IsNil)
422 c.Check(stats(), check.Matches, `.*"FlockOps":2,.*`)