Merge branch '15305-keep-balance-bytes'
[arvados.git] / services / keepstore / unix_volume_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "context"
10         "encoding/json"
11         "errors"
12         "fmt"
13         "io"
14         "io/ioutil"
15         "os"
16         "strings"
17         "sync"
18         "syscall"
19         "time"
20
21         "git.curoverse.com/arvados.git/sdk/go/arvados"
22         "git.curoverse.com/arvados.git/sdk/go/ctxlog"
23         "github.com/prometheus/client_golang/prometheus"
24         "github.com/sirupsen/logrus"
25         check "gopkg.in/check.v1"
26 )
27
28 type TestableUnixVolume struct {
29         UnixVolume
30         t TB
31 }
32
33 // PutRaw writes a Keep block directly into a UnixVolume, even if
34 // the volume is readonly.
35 func (v *TestableUnixVolume) PutRaw(locator string, data []byte) {
36         defer func(orig bool) {
37                 v.volume.ReadOnly = orig
38         }(v.volume.ReadOnly)
39         v.volume.ReadOnly = false
40         err := v.Put(context.Background(), locator, data)
41         if err != nil {
42                 v.t.Fatal(err)
43         }
44 }
45
46 func (v *TestableUnixVolume) TouchWithDate(locator string, lastPut time.Time) {
47         err := syscall.Utime(v.blockPath(locator), &syscall.Utimbuf{lastPut.Unix(), lastPut.Unix()})
48         if err != nil {
49                 v.t.Fatal(err)
50         }
51 }
52
53 func (v *TestableUnixVolume) Teardown() {
54         if err := os.RemoveAll(v.Root); err != nil {
55                 v.t.Error(err)
56         }
57 }
58
59 func (v *TestableUnixVolume) ReadWriteOperationLabelValues() (r, w string) {
60         return "open", "create"
61 }
62
63 var _ = check.Suite(&UnixVolumeSuite{})
64
65 type UnixVolumeSuite struct {
66         cluster *arvados.Cluster
67         volumes []*TestableUnixVolume
68         metrics *volumeMetricsVecs
69 }
70
71 func (s *UnixVolumeSuite) SetUpTest(c *check.C) {
72         s.cluster = testCluster(c)
73         s.metrics = newVolumeMetricsVecs(prometheus.NewRegistry())
74 }
75
76 func (s *UnixVolumeSuite) TearDownTest(c *check.C) {
77         for _, v := range s.volumes {
78                 v.Teardown()
79         }
80 }
81
82 func (s *UnixVolumeSuite) newTestableUnixVolume(c *check.C, cluster *arvados.Cluster, volume arvados.Volume, metrics *volumeMetricsVecs, serialize bool) *TestableUnixVolume {
83         d, err := ioutil.TempDir("", "volume_test")
84         c.Check(err, check.IsNil)
85         var locker sync.Locker
86         if serialize {
87                 locker = &sync.Mutex{}
88         }
89         v := &TestableUnixVolume{
90                 UnixVolume: UnixVolume{
91                         Root:    d,
92                         locker:  locker,
93                         cluster: cluster,
94                         logger:  ctxlog.TestLogger(c),
95                         volume:  volume,
96                         metrics: metrics,
97                 },
98                 t: c,
99         }
100         c.Check(v.check(), check.IsNil)
101         s.volumes = append(s.volumes, v)
102         return v
103 }
104
105 // serialize = false; readonly = false
106 func (s *UnixVolumeSuite) TestUnixVolumeWithGenericTests(c *check.C) {
107         DoGenericVolumeTests(c, false, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
108                 return s.newTestableUnixVolume(c, cluster, volume, metrics, false)
109         })
110 }
111
112 // serialize = false; readonly = true
113 func (s *UnixVolumeSuite) TestUnixVolumeWithGenericTestsReadOnly(c *check.C) {
114         DoGenericVolumeTests(c, true, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
115                 return s.newTestableUnixVolume(c, cluster, volume, metrics, true)
116         })
117 }
118
119 // serialize = true; readonly = false
120 func (s *UnixVolumeSuite) TestUnixVolumeWithGenericTestsSerialized(c *check.C) {
121         DoGenericVolumeTests(c, false, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
122                 return s.newTestableUnixVolume(c, cluster, volume, metrics, false)
123         })
124 }
125
126 // serialize = true; readonly = true
127 func (s *UnixVolumeSuite) TestUnixVolumeHandlersWithGenericVolumeTests(c *check.C) {
128         DoGenericVolumeTests(c, true, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
129                 return s.newTestableUnixVolume(c, cluster, volume, metrics, true)
130         })
131 }
132
133 func (s *UnixVolumeSuite) TestGetNotFound(c *check.C) {
134         v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
135         defer v.Teardown()
136         v.Put(context.Background(), TestHash, TestBlock)
137
138         buf := make([]byte, BlockSize)
139         n, err := v.Get(context.Background(), TestHash2, buf)
140         switch {
141         case os.IsNotExist(err):
142                 break
143         case err == nil:
144                 c.Errorf("Read should have failed, returned %+q", buf[:n])
145         default:
146                 c.Errorf("Read expected ErrNotExist, got: %s", err)
147         }
148 }
149
150 func (s *UnixVolumeSuite) TestPut(c *check.C) {
151         v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
152         defer v.Teardown()
153
154         err := v.Put(context.Background(), TestHash, TestBlock)
155         if err != nil {
156                 c.Error(err)
157         }
158         p := fmt.Sprintf("%s/%s/%s", v.Root, TestHash[:3], TestHash)
159         if buf, err := ioutil.ReadFile(p); err != nil {
160                 c.Error(err)
161         } else if bytes.Compare(buf, TestBlock) != 0 {
162                 c.Errorf("Write should have stored %s, did store %s",
163                         string(TestBlock), string(buf))
164         }
165 }
166
167 func (s *UnixVolumeSuite) TestPutBadVolume(c *check.C) {
168         v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
169         defer v.Teardown()
170
171         os.Chmod(v.Root, 000)
172         err := v.Put(context.Background(), TestHash, TestBlock)
173         if err == nil {
174                 c.Error("Write should have failed")
175         }
176 }
177
178 func (s *UnixVolumeSuite) TestUnixVolumeReadonly(c *check.C) {
179         v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{ReadOnly: true, Replication: 1}, s.metrics, false)
180         defer v.Teardown()
181
182         v.PutRaw(TestHash, TestBlock)
183
184         buf := make([]byte, BlockSize)
185         _, err := v.Get(context.Background(), TestHash, buf)
186         if err != nil {
187                 c.Errorf("got err %v, expected nil", err)
188         }
189
190         err = v.Put(context.Background(), TestHash, TestBlock)
191         if err != MethodDisabledError {
192                 c.Errorf("got err %v, expected MethodDisabledError", err)
193         }
194
195         err = v.Touch(TestHash)
196         if err != MethodDisabledError {
197                 c.Errorf("got err %v, expected MethodDisabledError", err)
198         }
199
200         err = v.Trash(TestHash)
201         if err != MethodDisabledError {
202                 c.Errorf("got err %v, expected MethodDisabledError", err)
203         }
204 }
205
206 func (s *UnixVolumeSuite) TestIsFull(c *check.C) {
207         v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
208         defer v.Teardown()
209
210         fullPath := v.Root + "/full"
211         now := fmt.Sprintf("%d", time.Now().Unix())
212         os.Symlink(now, fullPath)
213         if !v.IsFull() {
214                 c.Errorf("%s: claims not to be full", v)
215         }
216         os.Remove(fullPath)
217
218         // Test with an expired /full link.
219         expired := fmt.Sprintf("%d", time.Now().Unix()-3605)
220         os.Symlink(expired, fullPath)
221         if v.IsFull() {
222                 c.Errorf("%s: should no longer be full", v)
223         }
224 }
225
226 func (s *UnixVolumeSuite) TestNodeStatus(c *check.C) {
227         v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
228         defer v.Teardown()
229
230         // Get node status and make a basic sanity check.
231         volinfo := v.Status()
232         if volinfo.MountPoint != v.Root {
233                 c.Errorf("GetNodeStatus mount_point %s, expected %s", volinfo.MountPoint, v.Root)
234         }
235         if volinfo.DeviceNum == 0 {
236                 c.Errorf("uninitialized device_num in %v", volinfo)
237         }
238         if volinfo.BytesFree == 0 {
239                 c.Errorf("uninitialized bytes_free in %v", volinfo)
240         }
241         if volinfo.BytesUsed == 0 {
242                 c.Errorf("uninitialized bytes_used in %v", volinfo)
243         }
244 }
245
246 func (s *UnixVolumeSuite) TestUnixVolumeGetFuncWorkerError(c *check.C) {
247         v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
248         defer v.Teardown()
249
250         v.Put(context.Background(), TestHash, TestBlock)
251         mockErr := errors.New("Mock error")
252         err := v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error {
253                 return mockErr
254         })
255         if err != mockErr {
256                 c.Errorf("Got %v, expected %v", err, mockErr)
257         }
258 }
259
260 func (s *UnixVolumeSuite) TestUnixVolumeGetFuncFileError(c *check.C) {
261         v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
262         defer v.Teardown()
263
264         funcCalled := false
265         err := v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error {
266                 funcCalled = true
267                 return nil
268         })
269         if err == nil {
270                 c.Errorf("Expected error opening non-existent file")
271         }
272         if funcCalled {
273                 c.Errorf("Worker func should not have been called")
274         }
275 }
276
277 func (s *UnixVolumeSuite) TestUnixVolumeGetFuncWorkerWaitsOnMutex(c *check.C) {
278         v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
279         defer v.Teardown()
280
281         v.Put(context.Background(), TestHash, TestBlock)
282
283         mtx := NewMockMutex()
284         v.locker = mtx
285
286         funcCalled := make(chan struct{})
287         go v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error {
288                 funcCalled <- struct{}{}
289                 return nil
290         })
291         select {
292         case mtx.AllowLock <- struct{}{}:
293         case <-funcCalled:
294                 c.Fatal("Function was called before mutex was acquired")
295         case <-time.After(5 * time.Second):
296                 c.Fatal("Timed out before mutex was acquired")
297         }
298         select {
299         case <-funcCalled:
300         case mtx.AllowUnlock <- struct{}{}:
301                 c.Fatal("Mutex was released before function was called")
302         case <-time.After(5 * time.Second):
303                 c.Fatal("Timed out waiting for funcCalled")
304         }
305         select {
306         case mtx.AllowUnlock <- struct{}{}:
307         case <-time.After(5 * time.Second):
308                 c.Fatal("Timed out waiting for getFunc() to release mutex")
309         }
310 }
311
312 func (s *UnixVolumeSuite) TestUnixVolumeCompare(c *check.C) {
313         v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
314         defer v.Teardown()
315
316         v.Put(context.Background(), TestHash, TestBlock)
317         err := v.Compare(context.Background(), TestHash, TestBlock)
318         if err != nil {
319                 c.Errorf("Got err %q, expected nil", err)
320         }
321
322         err = v.Compare(context.Background(), TestHash, []byte("baddata"))
323         if err != CollisionError {
324                 c.Errorf("Got err %q, expected %q", err, CollisionError)
325         }
326
327         v.Put(context.Background(), TestHash, []byte("baddata"))
328         err = v.Compare(context.Background(), TestHash, TestBlock)
329         if err != DiskHashError {
330                 c.Errorf("Got err %q, expected %q", err, DiskHashError)
331         }
332
333         p := fmt.Sprintf("%s/%s/%s", v.Root, TestHash[:3], TestHash)
334         os.Chmod(p, 000)
335         err = v.Compare(context.Background(), TestHash, TestBlock)
336         if err == nil || strings.Index(err.Error(), "permission denied") < 0 {
337                 c.Errorf("Got err %q, expected %q", err, "permission denied")
338         }
339 }
340
341 func (s *UnixVolumeSuite) TestUnixVolumeContextCancelPut(c *check.C) {
342         v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, true)
343         defer v.Teardown()
344         v.locker.Lock()
345         ctx, cancel := context.WithCancel(context.Background())
346         go func() {
347                 time.Sleep(50 * time.Millisecond)
348                 cancel()
349                 time.Sleep(50 * time.Millisecond)
350                 v.locker.Unlock()
351         }()
352         err := v.Put(ctx, TestHash, TestBlock)
353         if err != context.Canceled {
354                 c.Errorf("Put() returned %s -- expected short read / canceled", err)
355         }
356 }
357
358 func (s *UnixVolumeSuite) TestUnixVolumeContextCancelGet(c *check.C) {
359         v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
360         defer v.Teardown()
361         bpath := v.blockPath(TestHash)
362         v.PutRaw(TestHash, TestBlock)
363         os.Remove(bpath)
364         err := syscall.Mkfifo(bpath, 0600)
365         if err != nil {
366                 c.Fatalf("Mkfifo %s: %s", bpath, err)
367         }
368         defer os.Remove(bpath)
369         ctx, cancel := context.WithCancel(context.Background())
370         go func() {
371                 time.Sleep(50 * time.Millisecond)
372                 cancel()
373         }()
374         buf := make([]byte, len(TestBlock))
375         n, err := v.Get(ctx, TestHash, buf)
376         if n == len(TestBlock) || err != context.Canceled {
377                 c.Errorf("Get() returned %d, %s -- expected short read / canceled", n, err)
378         }
379 }
380
381 func (s *UnixVolumeSuite) TestStats(c *check.C) {
382         vol := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
383         stats := func() string {
384                 buf, err := json.Marshal(vol.InternalStats())
385                 c.Check(err, check.IsNil)
386                 return string(buf)
387         }
388
389         c.Check(stats(), check.Matches, `.*"StatOps":1,.*`) // (*UnixVolume)check() calls Stat() once
390         c.Check(stats(), check.Matches, `.*"Errors":0,.*`)
391
392         loc := "acbd18db4cc2f85cedef654fccc4a4d8"
393         _, err := vol.Get(context.Background(), loc, make([]byte, 3))
394         c.Check(err, check.NotNil)
395         c.Check(stats(), check.Matches, `.*"StatOps":[^0],.*`)
396         c.Check(stats(), check.Matches, `.*"Errors":[^0],.*`)
397         c.Check(stats(), check.Matches, `.*"\*os\.PathError":[^0].*`)
398         c.Check(stats(), check.Matches, `.*"InBytes":0,.*`)
399         c.Check(stats(), check.Matches, `.*"OpenOps":0,.*`)
400         c.Check(stats(), check.Matches, `.*"CreateOps":0,.*`)
401
402         err = vol.Put(context.Background(), loc, []byte("foo"))
403         c.Check(err, check.IsNil)
404         c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
405         c.Check(stats(), check.Matches, `.*"CreateOps":1,.*`)
406         c.Check(stats(), check.Matches, `.*"OpenOps":0,.*`)
407         c.Check(stats(), check.Matches, `.*"UtimesOps":0,.*`)
408
409         err = vol.Touch(loc)
410         c.Check(err, check.IsNil)
411         c.Check(stats(), check.Matches, `.*"FlockOps":1,.*`)
412         c.Check(stats(), check.Matches, `.*"OpenOps":1,.*`)
413         c.Check(stats(), check.Matches, `.*"UtimesOps":1,.*`)
414
415         _, err = vol.Get(context.Background(), loc, make([]byte, 3))
416         c.Check(err, check.IsNil)
417         err = vol.Compare(context.Background(), loc, []byte("foo"))
418         c.Check(err, check.IsNil)
419         c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
420         c.Check(stats(), check.Matches, `.*"OpenOps":3,.*`)
421
422         err = vol.Trash(loc)
423         c.Check(err, check.IsNil)
424         c.Check(stats(), check.Matches, `.*"FlockOps":2,.*`)
425 }