Merge branch '13647-keepstore-config'
[arvados.git] / services / keepstore / unix_volume_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "context"
10         "encoding/json"
11         "errors"
12         "fmt"
13         "io"
14         "io/ioutil"
15         "os"
16         "strings"
17         "sync"
18         "syscall"
19         "time"
20
21         "git.curoverse.com/arvados.git/sdk/go/arvados"
22         "github.com/prometheus/client_golang/prometheus"
23         "github.com/sirupsen/logrus"
24         check "gopkg.in/check.v1"
25 )
26
27 type TestableUnixVolume struct {
28         UnixVolume
29         t TB
30 }
31
32 // PutRaw writes a Keep block directly into a UnixVolume, even if
33 // the volume is readonly.
34 func (v *TestableUnixVolume) PutRaw(locator string, data []byte) {
35         defer func(orig bool) {
36                 v.volume.ReadOnly = orig
37         }(v.volume.ReadOnly)
38         v.volume.ReadOnly = false
39         err := v.Put(context.Background(), locator, data)
40         if err != nil {
41                 v.t.Fatal(err)
42         }
43 }
44
45 func (v *TestableUnixVolume) TouchWithDate(locator string, lastPut time.Time) {
46         err := syscall.Utime(v.blockPath(locator), &syscall.Utimbuf{lastPut.Unix(), lastPut.Unix()})
47         if err != nil {
48                 v.t.Fatal(err)
49         }
50 }
51
52 func (v *TestableUnixVolume) Teardown() {
53         if err := os.RemoveAll(v.Root); err != nil {
54                 v.t.Error(err)
55         }
56 }
57
58 func (v *TestableUnixVolume) ReadWriteOperationLabelValues() (r, w string) {
59         return "open", "create"
60 }
61
62 var _ = check.Suite(&UnixVolumeSuite{})
63
64 type UnixVolumeSuite struct {
65         cluster *arvados.Cluster
66         volumes []*TestableUnixVolume
67         metrics *volumeMetricsVecs
68 }
69
70 func (s *UnixVolumeSuite) SetUpTest(c *check.C) {
71         s.cluster = testCluster(c)
72         s.metrics = newVolumeMetricsVecs(prometheus.NewRegistry())
73 }
74
75 func (s *UnixVolumeSuite) TearDownTest(c *check.C) {
76         for _, v := range s.volumes {
77                 v.Teardown()
78         }
79 }
80
81 func (s *UnixVolumeSuite) newTestableUnixVolume(c *check.C, cluster *arvados.Cluster, volume arvados.Volume, metrics *volumeMetricsVecs, serialize bool) *TestableUnixVolume {
82         d, err := ioutil.TempDir("", "volume_test")
83         c.Check(err, check.IsNil)
84         var locker sync.Locker
85         if serialize {
86                 locker = &sync.Mutex{}
87         }
88         v := &TestableUnixVolume{
89                 UnixVolume: UnixVolume{
90                         Root:    d,
91                         locker:  locker,
92                         cluster: cluster,
93                         volume:  volume,
94                         metrics: metrics,
95                 },
96                 t: c,
97         }
98         c.Check(v.check(), check.IsNil)
99         s.volumes = append(s.volumes, v)
100         return v
101 }
102
103 // serialize = false; readonly = false
104 func (s *UnixVolumeSuite) TestUnixVolumeWithGenericTests(c *check.C) {
105         DoGenericVolumeTests(c, false, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
106                 return s.newTestableUnixVolume(c, cluster, volume, metrics, false)
107         })
108 }
109
110 // serialize = false; readonly = true
111 func (s *UnixVolumeSuite) TestUnixVolumeWithGenericTestsReadOnly(c *check.C) {
112         DoGenericVolumeTests(c, true, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
113                 return s.newTestableUnixVolume(c, cluster, volume, metrics, true)
114         })
115 }
116
117 // serialize = true; readonly = false
118 func (s *UnixVolumeSuite) TestUnixVolumeWithGenericTestsSerialized(c *check.C) {
119         DoGenericVolumeTests(c, false, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
120                 return s.newTestableUnixVolume(c, cluster, volume, metrics, false)
121         })
122 }
123
124 // serialize = true; readonly = true
125 func (s *UnixVolumeSuite) TestUnixVolumeHandlersWithGenericVolumeTests(c *check.C) {
126         DoGenericVolumeTests(c, true, func(t TB, cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) TestableVolume {
127                 return s.newTestableUnixVolume(c, cluster, volume, metrics, true)
128         })
129 }
130
131 func (s *UnixVolumeSuite) TestGetNotFound(c *check.C) {
132         v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
133         defer v.Teardown()
134         v.Put(context.Background(), TestHash, TestBlock)
135
136         buf := make([]byte, BlockSize)
137         n, err := v.Get(context.Background(), TestHash2, buf)
138         switch {
139         case os.IsNotExist(err):
140                 break
141         case err == nil:
142                 c.Errorf("Read should have failed, returned %+q", buf[:n])
143         default:
144                 c.Errorf("Read expected ErrNotExist, got: %s", err)
145         }
146 }
147
148 func (s *UnixVolumeSuite) TestPut(c *check.C) {
149         v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
150         defer v.Teardown()
151
152         err := v.Put(context.Background(), TestHash, TestBlock)
153         if err != nil {
154                 c.Error(err)
155         }
156         p := fmt.Sprintf("%s/%s/%s", v.Root, TestHash[:3], TestHash)
157         if buf, err := ioutil.ReadFile(p); err != nil {
158                 c.Error(err)
159         } else if bytes.Compare(buf, TestBlock) != 0 {
160                 c.Errorf("Write should have stored %s, did store %s",
161                         string(TestBlock), string(buf))
162         }
163 }
164
165 func (s *UnixVolumeSuite) TestPutBadVolume(c *check.C) {
166         v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
167         defer v.Teardown()
168
169         os.Chmod(v.Root, 000)
170         err := v.Put(context.Background(), TestHash, TestBlock)
171         if err == nil {
172                 c.Error("Write should have failed")
173         }
174 }
175
176 func (s *UnixVolumeSuite) TestUnixVolumeReadonly(c *check.C) {
177         v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{ReadOnly: true, Replication: 1}, s.metrics, false)
178         defer v.Teardown()
179
180         v.PutRaw(TestHash, TestBlock)
181
182         buf := make([]byte, BlockSize)
183         _, err := v.Get(context.Background(), TestHash, buf)
184         if err != nil {
185                 c.Errorf("got err %v, expected nil", err)
186         }
187
188         err = v.Put(context.Background(), TestHash, TestBlock)
189         if err != MethodDisabledError {
190                 c.Errorf("got err %v, expected MethodDisabledError", err)
191         }
192
193         err = v.Touch(TestHash)
194         if err != MethodDisabledError {
195                 c.Errorf("got err %v, expected MethodDisabledError", err)
196         }
197
198         err = v.Trash(TestHash)
199         if err != MethodDisabledError {
200                 c.Errorf("got err %v, expected MethodDisabledError", err)
201         }
202 }
203
204 func (s *UnixVolumeSuite) TestIsFull(c *check.C) {
205         v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
206         defer v.Teardown()
207
208         fullPath := v.Root + "/full"
209         now := fmt.Sprintf("%d", time.Now().Unix())
210         os.Symlink(now, fullPath)
211         if !v.IsFull() {
212                 c.Errorf("%s: claims not to be full", v)
213         }
214         os.Remove(fullPath)
215
216         // Test with an expired /full link.
217         expired := fmt.Sprintf("%d", time.Now().Unix()-3605)
218         os.Symlink(expired, fullPath)
219         if v.IsFull() {
220                 c.Errorf("%s: should no longer be full", v)
221         }
222 }
223
224 func (s *UnixVolumeSuite) TestNodeStatus(c *check.C) {
225         v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
226         defer v.Teardown()
227
228         // Get node status and make a basic sanity check.
229         volinfo := v.Status()
230         if volinfo.MountPoint != v.Root {
231                 c.Errorf("GetNodeStatus mount_point %s, expected %s", volinfo.MountPoint, v.Root)
232         }
233         if volinfo.DeviceNum == 0 {
234                 c.Errorf("uninitialized device_num in %v", volinfo)
235         }
236         if volinfo.BytesFree == 0 {
237                 c.Errorf("uninitialized bytes_free in %v", volinfo)
238         }
239         if volinfo.BytesUsed == 0 {
240                 c.Errorf("uninitialized bytes_used in %v", volinfo)
241         }
242 }
243
244 func (s *UnixVolumeSuite) TestUnixVolumeGetFuncWorkerError(c *check.C) {
245         v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
246         defer v.Teardown()
247
248         v.Put(context.Background(), TestHash, TestBlock)
249         mockErr := errors.New("Mock error")
250         err := v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error {
251                 return mockErr
252         })
253         if err != mockErr {
254                 c.Errorf("Got %v, expected %v", err, mockErr)
255         }
256 }
257
258 func (s *UnixVolumeSuite) TestUnixVolumeGetFuncFileError(c *check.C) {
259         v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
260         defer v.Teardown()
261
262         funcCalled := false
263         err := v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error {
264                 funcCalled = true
265                 return nil
266         })
267         if err == nil {
268                 c.Errorf("Expected error opening non-existent file")
269         }
270         if funcCalled {
271                 c.Errorf("Worker func should not have been called")
272         }
273 }
274
275 func (s *UnixVolumeSuite) TestUnixVolumeGetFuncWorkerWaitsOnMutex(c *check.C) {
276         v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
277         defer v.Teardown()
278
279         v.Put(context.Background(), TestHash, TestBlock)
280
281         mtx := NewMockMutex()
282         v.locker = mtx
283
284         funcCalled := make(chan struct{})
285         go v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error {
286                 funcCalled <- struct{}{}
287                 return nil
288         })
289         select {
290         case mtx.AllowLock <- struct{}{}:
291         case <-funcCalled:
292                 c.Fatal("Function was called before mutex was acquired")
293         case <-time.After(5 * time.Second):
294                 c.Fatal("Timed out before mutex was acquired")
295         }
296         select {
297         case <-funcCalled:
298         case mtx.AllowUnlock <- struct{}{}:
299                 c.Fatal("Mutex was released before function was called")
300         case <-time.After(5 * time.Second):
301                 c.Fatal("Timed out waiting for funcCalled")
302         }
303         select {
304         case mtx.AllowUnlock <- struct{}{}:
305         case <-time.After(5 * time.Second):
306                 c.Fatal("Timed out waiting for getFunc() to release mutex")
307         }
308 }
309
310 func (s *UnixVolumeSuite) TestUnixVolumeCompare(c *check.C) {
311         v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
312         defer v.Teardown()
313
314         v.Put(context.Background(), TestHash, TestBlock)
315         err := v.Compare(context.Background(), TestHash, TestBlock)
316         if err != nil {
317                 c.Errorf("Got err %q, expected nil", err)
318         }
319
320         err = v.Compare(context.Background(), TestHash, []byte("baddata"))
321         if err != CollisionError {
322                 c.Errorf("Got err %q, expected %q", err, CollisionError)
323         }
324
325         v.Put(context.Background(), TestHash, []byte("baddata"))
326         err = v.Compare(context.Background(), TestHash, TestBlock)
327         if err != DiskHashError {
328                 c.Errorf("Got err %q, expected %q", err, DiskHashError)
329         }
330
331         p := fmt.Sprintf("%s/%s/%s", v.Root, TestHash[:3], TestHash)
332         os.Chmod(p, 000)
333         err = v.Compare(context.Background(), TestHash, TestBlock)
334         if err == nil || strings.Index(err.Error(), "permission denied") < 0 {
335                 c.Errorf("Got err %q, expected %q", err, "permission denied")
336         }
337 }
338
339 func (s *UnixVolumeSuite) TestUnixVolumeContextCancelPut(c *check.C) {
340         v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, true)
341         defer v.Teardown()
342         v.locker.Lock()
343         ctx, cancel := context.WithCancel(context.Background())
344         go func() {
345                 time.Sleep(50 * time.Millisecond)
346                 cancel()
347                 time.Sleep(50 * time.Millisecond)
348                 v.locker.Unlock()
349         }()
350         err := v.Put(ctx, TestHash, TestBlock)
351         if err != context.Canceled {
352                 c.Errorf("Put() returned %s -- expected short read / canceled", err)
353         }
354 }
355
356 func (s *UnixVolumeSuite) TestUnixVolumeContextCancelGet(c *check.C) {
357         v := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
358         defer v.Teardown()
359         bpath := v.blockPath(TestHash)
360         v.PutRaw(TestHash, TestBlock)
361         os.Remove(bpath)
362         err := syscall.Mkfifo(bpath, 0600)
363         if err != nil {
364                 c.Fatalf("Mkfifo %s: %s", bpath, err)
365         }
366         defer os.Remove(bpath)
367         ctx, cancel := context.WithCancel(context.Background())
368         go func() {
369                 time.Sleep(50 * time.Millisecond)
370                 cancel()
371         }()
372         buf := make([]byte, len(TestBlock))
373         n, err := v.Get(ctx, TestHash, buf)
374         if n == len(TestBlock) || err != context.Canceled {
375                 c.Errorf("Get() returned %d, %s -- expected short read / canceled", n, err)
376         }
377 }
378
379 func (s *UnixVolumeSuite) TestStats(c *check.C) {
380         vol := s.newTestableUnixVolume(c, s.cluster, arvados.Volume{Replication: 1}, s.metrics, false)
381         stats := func() string {
382                 buf, err := json.Marshal(vol.InternalStats())
383                 c.Check(err, check.IsNil)
384                 return string(buf)
385         }
386
387         c.Check(stats(), check.Matches, `.*"StatOps":1,.*`) // (*UnixVolume)check() calls Stat() once
388         c.Check(stats(), check.Matches, `.*"Errors":0,.*`)
389
390         loc := "acbd18db4cc2f85cedef654fccc4a4d8"
391         _, err := vol.Get(context.Background(), loc, make([]byte, 3))
392         c.Check(err, check.NotNil)
393         c.Check(stats(), check.Matches, `.*"StatOps":[^0],.*`)
394         c.Check(stats(), check.Matches, `.*"Errors":[^0],.*`)
395         c.Check(stats(), check.Matches, `.*"\*os\.PathError":[^0].*`)
396         c.Check(stats(), check.Matches, `.*"InBytes":0,.*`)
397         c.Check(stats(), check.Matches, `.*"OpenOps":0,.*`)
398         c.Check(stats(), check.Matches, `.*"CreateOps":0,.*`)
399
400         err = vol.Put(context.Background(), loc, []byte("foo"))
401         c.Check(err, check.IsNil)
402         c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
403         c.Check(stats(), check.Matches, `.*"CreateOps":1,.*`)
404         c.Check(stats(), check.Matches, `.*"OpenOps":0,.*`)
405         c.Check(stats(), check.Matches, `.*"UtimesOps":0,.*`)
406
407         err = vol.Touch(loc)
408         c.Check(err, check.IsNil)
409         c.Check(stats(), check.Matches, `.*"FlockOps":1,.*`)
410         c.Check(stats(), check.Matches, `.*"OpenOps":1,.*`)
411         c.Check(stats(), check.Matches, `.*"UtimesOps":1,.*`)
412
413         _, err = vol.Get(context.Background(), loc, make([]byte, 3))
414         c.Check(err, check.IsNil)
415         err = vol.Compare(context.Background(), loc, []byte("foo"))
416         c.Check(err, check.IsNil)
417         c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
418         c.Check(stats(), check.Matches, `.*"OpenOps":3,.*`)
419
420         err = vol.Trash(loc)
421         c.Check(err, check.IsNil)
422         c.Check(stats(), check.Matches, `.*"FlockOps":2,.*`)
423 }