Merge branch '21189-changeset-limit'
[arvados.git] / services / keepstore / unix_volume_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepstore
6
7 import (
8         "bytes"
9         "context"
10         "encoding/json"
11         "errors"
12         "fmt"
13         "io"
14         "io/ioutil"
15         "os"
16         "sync"
17         "syscall"
18         "time"
19
20         "git.arvados.org/arvados.git/sdk/go/arvados"
21         "git.arvados.org/arvados.git/sdk/go/ctxlog"
22         "github.com/prometheus/client_golang/prometheus"
23         "github.com/sirupsen/logrus"
24         check "gopkg.in/check.v1"
25 )
26
27 type TestableUnixVolume struct {
28         UnixVolume
29         t TB
30 }
31
32 // PutRaw writes a Keep block directly into a UnixVolume, even if
33 // the volume is readonly.
34 func (v *TestableUnixVolume) PutRaw(locator string, data []byte) {
35         defer func(orig bool) {
36                 v.volume.ReadOnly = orig
37         }(v.volume.ReadOnly)
38         v.volume.ReadOnly = false
39         err := v.Put(context.Background(), locator, data)
40         if err != nil {
41                 v.t.Fatal(err)
42         }
43 }
44
45 func (v *TestableUnixVolume) TouchWithDate(locator string, lastPut time.Time) {
46         err := syscall.Utime(v.blockPath(locator), &syscall.Utimbuf{lastPut.Unix(), lastPut.Unix()})
47         if err != nil {
48                 v.t.Fatal(err)
49         }
50 }
51
52 func (v *TestableUnixVolume) Teardown() {
53         if err := os.RemoveAll(v.Root); err != nil {
54                 v.t.Error(err)
55         }
56 }
57
58 func (v *TestableUnixVolume) ReadWriteOperationLabelValues() (r, w string) {
59         return "open", "create"
60 }
61
62 var _ = check.Suite(&UnixVolumeSuite{})
63
64 type UnixVolumeSuite struct {
65         cluster *arvados.Cluster
66         volumes []*TestableUnixVolume
67         metrics *volumeMetricsVecs
68 }
69
70 func (s *UnixVolumeSuite) SetUpTest(c *check.C) {
71         s.cluster = testCluster(c)
72         s.metrics = newVolumeMetricsVecs(prometheus.NewRegistry())
73 }
74
75 func (s *UnixVolumeSuite) TearDownTest(c *check.C) {
76         for _, v := range s.volumes {
77                 v.Teardown()
78         }
79 }
80
81 func (s *UnixVolumeSuite) newTestableUnixVolume(c *check.C, cluster *arvados.Cluster, volume arvados.Volume, metrics *volumeMetricsVecs, serialize bool) *TestableUnixVolume {
82         d, err := ioutil.TempDir("", "volume_test")
83         c.Check(err, check.IsNil)
84         var locker sync.Locker
85         if serialize {
86                 locker = &sync.Mutex{}
87         }
88         v := &TestableUnixVolume{
89                 UnixVolume: UnixVolume{
90                         Root:    d,
91                         locker:  locker,
92                         cluster: cluster,
93                         logger:  ctxlog.TestLogger(c),
94                         volume:  volume,
95                         metrics: metrics,
96                 },
97                 t: c,
98         }
99         c.Check(v.check(), check.IsNil)
100         s.volumes = append(s.volumes, v)
101         return v
102 }
103
104 // serialize = false; readonly = false
105 func (s *UnixVolumeSuite) TestUnixVolumeWithGenericTests(c *check.C) {
106         DoGenericVolumeTests(c, false, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
107                 return s.newTestableUnixVolume(c, cluster, volume, metrics, false)
108         })
109 }
110
111 // serialize = false; readonly = true
112 func (s *UnixVolumeSuite) TestUnixVolumeWithGenericTestsReadOnly(c *check.C) {
113         DoGenericVolumeTests(c, true, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
114                 return s.newTestableUnixVolume(c, cluster, volume, metrics, true)
115         })
116 }
117
118 // serialize = true; readonly = false
119 func (s *UnixVolumeSuite) TestUnixVolumeWithGenericTestsSerialized(c *check.C) {
120         DoGenericVolumeTests(c, false, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
121                 return s.newTestableUnixVolume(c, cluster, volume, metrics, false)
122         })
123 }
124
125 // serialize = true; readonly = true
126 func (s *UnixVolumeSuite) TestUnixVolumeHandlersWithGenericVolumeTests(c *check.C) {
127         DoGenericVolumeTests(c, true, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
128                 return s.newTestableUnixVolume(c, cluster, volume, metrics, true)
129         })
130 }
131
132 func (s *UnixVolumeSuite) TestGetNotFound(c *check.C) {
133         v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
134         defer v.Teardown()
135         v.Put(context.Background(), TestHash, TestBlock)
136
137         buf := make([]byte, BlockSize)
138         n, err := v.Get(context.Background(), TestHash2, buf)
139         switch {
140         case os.IsNotExist(err):
141                 break
142         case err == nil:
143                 c.Errorf("Read should have failed, returned %+q", buf[:n])
144         default:
145                 c.Errorf("Read expected ErrNotExist, got: %s", err)
146         }
147 }
148
149 func (s *UnixVolumeSuite) TestPut(c *check.C) {
150         v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
151         defer v.Teardown()
152
153         err := v.Put(context.Background(), TestHash, TestBlock)
154         if err != nil {
155                 c.Error(err)
156         }
157         p := fmt.Sprintf("%s/%s/%s", v.Root, TestHash[:3], TestHash)
158         if buf, err := ioutil.ReadFile(p); err != nil {
159                 c.Error(err)
160         } else if bytes.Compare(buf, TestBlock) != 0 {
161                 c.Errorf("Write should have stored %s, did store %s",
162                         string(TestBlock), string(buf))
163         }
164 }
165
166 func (s *UnixVolumeSuite) TestPutBadVolume(c *check.C) {
167         v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
168         defer v.Teardown()
169
170         err := os.RemoveAll(v.Root)
171         c.Assert(err, check.IsNil)
172         err = v.Put(context.Background(), TestHash, TestBlock)
173         c.Check(err, check.IsNil)
174 }
175
176 func (s *UnixVolumeSuite) TestUnixVolumeReadonly(c *check.C) {
177         v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{ReadOnly: true, Replication: 1}, s.metrics, false)
178         defer v.Teardown()
179
180         v.PutRaw(TestHash, TestBlock)
181
182         buf := make([]byte, BlockSize)
183         _, err := v.Get(context.Background(), TestHash, buf)
184         if err != nil {
185                 c.Errorf("got err %v, expected nil", err)
186         }
187
188         err = v.Put(context.Background(), TestHash, TestBlock)
189         if err != MethodDisabledError {
190                 c.Errorf("got err %v, expected MethodDisabledError", err)
191         }
192
193         err = v.Touch(TestHash)
194         if err != MethodDisabledError {
195                 c.Errorf("got err %v, expected MethodDisabledError", err)
196         }
197
198         err = v.Trash(TestHash)
199         if err != MethodDisabledError {
200                 c.Errorf("got err %v, expected MethodDisabledError", err)
201         }
202 }
203
204 func (s *UnixVolumeSuite) TestIsFull(c *check.C) {
205         v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
206         defer v.Teardown()
207
208         fullPath := v.Root + "/full"
209         now := fmt.Sprintf("%d", time.Now().Unix())
210         os.Symlink(now, fullPath)
211         if !v.IsFull() {
212                 c.Errorf("%s: claims not to be full", v)
213         }
214         os.Remove(fullPath)
215
216         // Test with an expired /full link.
217         expired := fmt.Sprintf("%d", time.Now().Unix()-3605)
218         os.Symlink(expired, fullPath)
219         if v.IsFull() {
220                 c.Errorf("%s: should no longer be full", v)
221         }
222 }
223
224 func (s *UnixVolumeSuite) TestNodeStatus(c *check.C) {
225         v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
226         defer v.Teardown()
227
228         // Get node status and make a basic sanity check.
229         volinfo := v.Status()
230         if volinfo.MountPoint != v.Root {
231                 c.Errorf("GetNodeStatus mount_point %s, expected %s", volinfo.MountPoint, v.Root)
232         }
233         if volinfo.DeviceNum == 0 {
234                 c.Errorf("uninitialized device_num in %v", volinfo)
235         }
236         if volinfo.BytesFree == 0 {
237                 c.Errorf("uninitialized bytes_free in %v", volinfo)
238         }
239         if volinfo.BytesUsed == 0 {
240                 c.Errorf("uninitialized bytes_used in %v", volinfo)
241         }
242 }
243
244 func (s *UnixVolumeSuite) TestUnixVolumeGetFuncWorkerError(c *check.C) {
245         v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
246         defer v.Teardown()
247
248         v.Put(context.Background(), TestHash, TestBlock)
249         mockErr := errors.New("Mock error")
250         err := v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error {
251                 return mockErr
252         })
253         if err != mockErr {
254                 c.Errorf("Got %v, expected %v", err, mockErr)
255         }
256 }
257
258 func (s *UnixVolumeSuite) TestUnixVolumeGetFuncFileError(c *check.C) {
259         v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
260         defer v.Teardown()
261
262         funcCalled := false
263         err := v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error {
264                 funcCalled = true
265                 return nil
266         })
267         if err == nil {
268                 c.Errorf("Expected error opening non-existent file")
269         }
270         if funcCalled {
271                 c.Errorf("Worker func should not have been called")
272         }
273 }
274
275 func (s *UnixVolumeSuite) TestUnixVolumeGetFuncWorkerWaitsOnMutex(c *check.C) {
276         v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
277         defer v.Teardown()
278
279         v.Put(context.Background(), TestHash, TestBlock)
280
281         mtx := NewMockMutex()
282         v.locker = mtx
283
284         funcCalled := make(chan struct{})
285         go v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error {
286                 funcCalled <- struct{}{}
287                 return nil
288         })
289         select {
290         case mtx.AllowLock <- struct{}{}:
291         case <-funcCalled:
292                 c.Fatal("Function was called before mutex was acquired")
293         case <-time.After(5 * time.Second):
294                 c.Fatal("Timed out before mutex was acquired")
295         }
296         select {
297         case <-funcCalled:
298         case mtx.AllowUnlock <- struct{}{}:
299                 c.Fatal("Mutex was released before function was called")
300         case <-time.After(5 * time.Second):
301                 c.Fatal("Timed out waiting for funcCalled")
302         }
303         select {
304         case mtx.AllowUnlock <- struct{}{}:
305         case <-time.After(5 * time.Second):
306                 c.Fatal("Timed out waiting for getFunc() to release mutex")
307         }
308 }
309
310 func (s *UnixVolumeSuite) TestUnixVolumeCompare(c *check.C) {
311         v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
312         defer v.Teardown()
313
314         v.Put(context.Background(), TestHash, TestBlock)
315         err := v.Compare(context.Background(), TestHash, TestBlock)
316         if err != nil {
317                 c.Errorf("Got err %q, expected nil", err)
318         }
319
320         err = v.Compare(context.Background(), TestHash, []byte("baddata"))
321         if err != CollisionError {
322                 c.Errorf("Got err %q, expected %q", err, CollisionError)
323         }
324
325         v.Put(context.Background(), TestHash, []byte("baddata"))
326         err = v.Compare(context.Background(), TestHash, TestBlock)
327         if err != DiskHashError {
328                 c.Errorf("Got err %q, expected %q", err, DiskHashError)
329         }
330
331         if os.Getuid() == 0 {
332                 c.Log("skipping 'permission denied' check when running as root")
333         } else {
334                 p := fmt.Sprintf("%s/%s/%s", v.Root, TestHash[:3], TestHash)
335                 err = os.Chmod(p, 000)
336                 c.Assert(err, check.IsNil)
337                 err = v.Compare(context.Background(), TestHash, TestBlock)
338                 c.Check(err, check.ErrorMatches, ".*permission denied.*")
339         }
340 }
341
342 func (s *UnixVolumeSuite) TestUnixVolumeContextCancelPut(c *check.C) {
343         v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, true)
344         defer v.Teardown()
345         v.locker.Lock()
346         ctx, cancel := context.WithCancel(context.Background())
347         go func() {
348                 time.Sleep(50 * time.Millisecond)
349                 cancel()
350                 time.Sleep(50 * time.Millisecond)
351                 v.locker.Unlock()
352         }()
353         err := v.Put(ctx, TestHash, TestBlock)
354         if err != context.Canceled {
355                 c.Errorf("Put() returned %s -- expected short read / canceled", err)
356         }
357 }
358
359 func (s *UnixVolumeSuite) TestUnixVolumeContextCancelGet(c *check.C) {
360         v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
361         defer v.Teardown()
362         bpath := v.blockPath(TestHash)
363         v.PutRaw(TestHash, TestBlock)
364         os.Remove(bpath)
365         err := syscall.Mkfifo(bpath, 0600)
366         if err != nil {
367                 c.Fatalf("Mkfifo %s: %s", bpath, err)
368         }
369         defer os.Remove(bpath)
370         ctx, cancel := context.WithCancel(context.Background())
371         go func() {
372                 time.Sleep(50 * time.Millisecond)
373                 cancel()
374         }()
375         buf := make([]byte, len(TestBlock))
376         n, err := v.Get(ctx, TestHash, buf)
377         if n == len(TestBlock) || err != context.Canceled {
378                 c.Errorf("Get() returned %d, %s -- expected short read / canceled", n, err)
379         }
380 }
381
382 func (s *UnixVolumeSuite) TestStats(c *check.C) {
383         vol := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
384         stats := func() string {
385                 buf, err := json.Marshal(vol.InternalStats())
386                 c.Check(err, check.IsNil)
387                 return string(buf)
388         }
389
390         c.Check(stats(), check.Matches, `.*"StatOps":1,.*`) // (*UnixVolume)check() calls Stat() once
391         c.Check(stats(), check.Matches, `.*"Errors":0,.*`)
392
393         loc := "acbd18db4cc2f85cedef654fccc4a4d8"
394         _, err := vol.Get(context.Background(), loc, make([]byte, 3))
395         c.Check(err, check.NotNil)
396         c.Check(stats(), check.Matches, `.*"StatOps":[^0],.*`)
397         c.Check(stats(), check.Matches, `.*"Errors":[^0],.*`)
398         c.Check(stats(), check.Matches, `.*"\*(fs|os)\.PathError":[^0].*`) // os.PathError changed to fs.PathError in Go 1.16
399         c.Check(stats(), check.Matches, `.*"InBytes":0,.*`)
400         c.Check(stats(), check.Matches, `.*"OpenOps":0,.*`)
401         c.Check(stats(), check.Matches, `.*"CreateOps":0,.*`)
402
403         err = vol.Put(context.Background(), loc, []byte("foo"))
404         c.Check(err, check.IsNil)
405         c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
406         c.Check(stats(), check.Matches, `.*"CreateOps":1,.*`)
407         c.Check(stats(), check.Matches, `.*"OpenOps":0,.*`)
408         c.Check(stats(), check.Matches, `.*"UtimesOps":1,.*`)
409
410         err = vol.Touch(loc)
411         c.Check(err, check.IsNil)
412         c.Check(stats(), check.Matches, `.*"FlockOps":1,.*`)
413         c.Check(stats(), check.Matches, `.*"OpenOps":1,.*`)
414         c.Check(stats(), check.Matches, `.*"UtimesOps":2,.*`)
415
416         _, err = vol.Get(context.Background(), loc, make([]byte, 3))
417         c.Check(err, check.IsNil)
418         err = vol.Compare(context.Background(), loc, []byte("foo"))
419         c.Check(err, check.IsNil)
420         c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
421         c.Check(stats(), check.Matches, `.*"OpenOps":3,.*`)
422
423         err = vol.Trash(loc)
424         c.Check(err, check.IsNil)
425         c.Check(stats(), check.Matches, `.*"FlockOps":2,.*`)
426 }
427
428 func (s *UnixVolumeSuite) TestSkipUnusedDirs(c *check.C) {
429         vol := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
430
431         err := os.Mkdir(vol.UnixVolume.Root+"/aaa", 0777)
432         c.Assert(err, check.IsNil)
433         err = os.Mkdir(vol.UnixVolume.Root+"/.aaa", 0777) // EmptyTrash should not look here
434         c.Assert(err, check.IsNil)
435         deleteme := vol.UnixVolume.Root + "/aaa/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa.trash.1"
436         err = ioutil.WriteFile(deleteme, []byte{1, 2, 3}, 0777)
437         c.Assert(err, check.IsNil)
438         skipme := vol.UnixVolume.Root + "/.aaa/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa.trash.1"
439         err = ioutil.WriteFile(skipme, []byte{1, 2, 3}, 0777)
440         c.Assert(err, check.IsNil)
441         vol.EmptyTrash()
442
443         _, err = os.Stat(skipme)
444         c.Check(err, check.IsNil)
445
446         _, err = os.Stat(deleteme)
447         c.Check(err, check.NotNil)
448         c.Check(os.IsNotExist(err), check.Equals, true)
449 }