13937: Adds tests for operation & I/O counters.
[arvados.git] / services / keepstore / unix_volume_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "context"
10         "encoding/json"
11         "errors"
12         "fmt"
13         "io"
14         "io/ioutil"
15         "os"
16         "strings"
17         "sync"
18         "syscall"
19         "testing"
20         "time"
21
22         "github.com/ghodss/yaml"
23         "github.com/prometheus/client_golang/prometheus"
24         check "gopkg.in/check.v1"
25 )
26
27 type TestableUnixVolume struct {
28         UnixVolume
29         t TB
30 }
31
32 func NewTestableUnixVolume(t TB, serialize bool, readonly bool) *TestableUnixVolume {
33         d, err := ioutil.TempDir("", "volume_test")
34         if err != nil {
35                 t.Fatal(err)
36         }
37         var locker sync.Locker
38         if serialize {
39                 locker = &sync.Mutex{}
40         }
41         return &TestableUnixVolume{
42                 UnixVolume: UnixVolume{
43                         Root:     d,
44                         ReadOnly: readonly,
45                         locker:   locker,
46                 },
47                 t: t,
48         }
49 }
50
51 // PutRaw writes a Keep block directly into a UnixVolume, even if
52 // the volume is readonly.
53 func (v *TestableUnixVolume) PutRaw(locator string, data []byte) {
54         defer func(orig bool) {
55                 v.ReadOnly = orig
56         }(v.ReadOnly)
57         v.ReadOnly = false
58         err := v.Put(context.Background(), locator, data)
59         if err != nil {
60                 v.t.Fatal(err)
61         }
62 }
63
64 func (v *TestableUnixVolume) TouchWithDate(locator string, lastPut time.Time) {
65         err := syscall.Utime(v.blockPath(locator), &syscall.Utimbuf{lastPut.Unix(), lastPut.Unix()})
66         if err != nil {
67                 v.t.Fatal(err)
68         }
69 }
70
71 func (v *TestableUnixVolume) Teardown() {
72         if err := os.RemoveAll(v.Root); err != nil {
73                 v.t.Fatal(err)
74         }
75 }
76
77 func (v *TestableUnixVolume) GetMetricsVecs() (opsCounters, errCounters, ioBytes *prometheus.CounterVec) {
78         opsCounters = v.os.stats.opsCounters
79         errCounters = v.os.stats.errCounters
80         ioBytes = v.os.stats.ioBytes
81         return
82 }
83
84 // serialize = false; readonly = false
85 func TestUnixVolumeWithGenericTests(t *testing.T) {
86         DoGenericVolumeTests(t, func(t TB) TestableVolume {
87                 return NewTestableUnixVolume(t, false, false)
88         })
89 }
90
91 // serialize = false; readonly = true
92 func TestUnixVolumeWithGenericTestsReadOnly(t *testing.T) {
93         DoGenericVolumeTests(t, func(t TB) TestableVolume {
94                 return NewTestableUnixVolume(t, false, true)
95         })
96 }
97
98 // serialize = true; readonly = false
99 func TestUnixVolumeWithGenericTestsSerialized(t *testing.T) {
100         DoGenericVolumeTests(t, func(t TB) TestableVolume {
101                 return NewTestableUnixVolume(t, true, false)
102         })
103 }
104
105 // serialize = false; readonly = false
106 func TestUnixVolumeHandlersWithGenericVolumeTests(t *testing.T) {
107         DoHandlersWithGenericVolumeTests(t, func(t TB) (*RRVolumeManager, []TestableVolume) {
108                 vols := make([]Volume, 2)
109                 testableUnixVols := make([]TestableVolume, 2)
110
111                 for i := range vols {
112                         v := NewTestableUnixVolume(t, false, false)
113                         vols[i] = v
114                         testableUnixVols[i] = v
115                 }
116
117                 return MakeRRVolumeManager(vols), testableUnixVols
118         })
119 }
120
121 func TestReplicationDefault1(t *testing.T) {
122         v := &UnixVolume{
123                 Root:     "/",
124                 ReadOnly: true,
125         }
126         metrics := newVolumeMetricsVecs(prometheus.NewRegistry())
127         if err := v.Start(metrics); err != nil {
128                 t.Error(err)
129         }
130         if got := v.Replication(); got != 1 {
131                 t.Errorf("Replication() returned %d, expected 1 if no config given", got)
132         }
133 }
134
135 func TestGetNotFound(t *testing.T) {
136         v := NewTestableUnixVolume(t, false, false)
137         defer v.Teardown()
138         v.Put(context.Background(), TestHash, TestBlock)
139
140         buf := make([]byte, BlockSize)
141         n, err := v.Get(context.Background(), TestHash2, buf)
142         switch {
143         case os.IsNotExist(err):
144                 break
145         case err == nil:
146                 t.Errorf("Read should have failed, returned %+q", buf[:n])
147         default:
148                 t.Errorf("Read expected ErrNotExist, got: %s", err)
149         }
150 }
151
152 func TestPut(t *testing.T) {
153         v := NewTestableUnixVolume(t, false, false)
154         defer v.Teardown()
155
156         err := v.Put(context.Background(), TestHash, TestBlock)
157         if err != nil {
158                 t.Error(err)
159         }
160         p := fmt.Sprintf("%s/%s/%s", v.Root, TestHash[:3], TestHash)
161         if buf, err := ioutil.ReadFile(p); err != nil {
162                 t.Error(err)
163         } else if bytes.Compare(buf, TestBlock) != 0 {
164                 t.Errorf("Write should have stored %s, did store %s",
165                         string(TestBlock), string(buf))
166         }
167 }
168
169 func TestPutBadVolume(t *testing.T) {
170         v := NewTestableUnixVolume(t, false, false)
171         defer v.Teardown()
172
173         os.Chmod(v.Root, 000)
174         err := v.Put(context.Background(), TestHash, TestBlock)
175         if err == nil {
176                 t.Error("Write should have failed")
177         }
178 }
179
180 func TestUnixVolumeReadonly(t *testing.T) {
181         v := NewTestableUnixVolume(t, false, true)
182         defer v.Teardown()
183
184         v.PutRaw(TestHash, TestBlock)
185
186         buf := make([]byte, BlockSize)
187         _, err := v.Get(context.Background(), TestHash, buf)
188         if err != nil {
189                 t.Errorf("got err %v, expected nil", err)
190         }
191
192         err = v.Put(context.Background(), TestHash, TestBlock)
193         if err != MethodDisabledError {
194                 t.Errorf("got err %v, expected MethodDisabledError", err)
195         }
196
197         err = v.Touch(TestHash)
198         if err != MethodDisabledError {
199                 t.Errorf("got err %v, expected MethodDisabledError", err)
200         }
201
202         err = v.Trash(TestHash)
203         if err != MethodDisabledError {
204                 t.Errorf("got err %v, expected MethodDisabledError", err)
205         }
206 }
207
208 func TestIsFull(t *testing.T) {
209         v := NewTestableUnixVolume(t, false, false)
210         defer v.Teardown()
211
212         fullPath := v.Root + "/full"
213         now := fmt.Sprintf("%d", time.Now().Unix())
214         os.Symlink(now, fullPath)
215         if !v.IsFull() {
216                 t.Errorf("%s: claims not to be full", v)
217         }
218         os.Remove(fullPath)
219
220         // Test with an expired /full link.
221         expired := fmt.Sprintf("%d", time.Now().Unix()-3605)
222         os.Symlink(expired, fullPath)
223         if v.IsFull() {
224                 t.Errorf("%s: should no longer be full", v)
225         }
226 }
227
228 func TestNodeStatus(t *testing.T) {
229         v := NewTestableUnixVolume(t, false, false)
230         defer v.Teardown()
231
232         // Get node status and make a basic sanity check.
233         volinfo := v.Status()
234         if volinfo.MountPoint != v.Root {
235                 t.Errorf("GetNodeStatus mount_point %s, expected %s", volinfo.MountPoint, v.Root)
236         }
237         if volinfo.DeviceNum == 0 {
238                 t.Errorf("uninitialized device_num in %v", volinfo)
239         }
240         if volinfo.BytesFree == 0 {
241                 t.Errorf("uninitialized bytes_free in %v", volinfo)
242         }
243         if volinfo.BytesUsed == 0 {
244                 t.Errorf("uninitialized bytes_used in %v", volinfo)
245         }
246 }
247
248 func TestUnixVolumeGetFuncWorkerError(t *testing.T) {
249         v := NewTestableUnixVolume(t, false, false)
250         defer v.Teardown()
251
252         v.Put(context.Background(), TestHash, TestBlock)
253         mockErr := errors.New("Mock error")
254         err := v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error {
255                 return mockErr
256         })
257         if err != mockErr {
258                 t.Errorf("Got %v, expected %v", err, mockErr)
259         }
260 }
261
262 func TestUnixVolumeGetFuncFileError(t *testing.T) {
263         v := NewTestableUnixVolume(t, false, false)
264         defer v.Teardown()
265
266         funcCalled := false
267         err := v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error {
268                 funcCalled = true
269                 return nil
270         })
271         if err == nil {
272                 t.Errorf("Expected error opening non-existent file")
273         }
274         if funcCalled {
275                 t.Errorf("Worker func should not have been called")
276         }
277 }
278
279 func TestUnixVolumeGetFuncWorkerWaitsOnMutex(t *testing.T) {
280         v := NewTestableUnixVolume(t, false, false)
281         defer v.Teardown()
282
283         v.Put(context.Background(), TestHash, TestBlock)
284
285         mtx := NewMockMutex()
286         v.locker = mtx
287
288         funcCalled := make(chan struct{})
289         go v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error {
290                 funcCalled <- struct{}{}
291                 return nil
292         })
293         select {
294         case mtx.AllowLock <- struct{}{}:
295         case <-funcCalled:
296                 t.Fatal("Function was called before mutex was acquired")
297         case <-time.After(5 * time.Second):
298                 t.Fatal("Timed out before mutex was acquired")
299         }
300         select {
301         case <-funcCalled:
302         case mtx.AllowUnlock <- struct{}{}:
303                 t.Fatal("Mutex was released before function was called")
304         case <-time.After(5 * time.Second):
305                 t.Fatal("Timed out waiting for funcCalled")
306         }
307         select {
308         case mtx.AllowUnlock <- struct{}{}:
309         case <-time.After(5 * time.Second):
310                 t.Fatal("Timed out waiting for getFunc() to release mutex")
311         }
312 }
313
314 func TestUnixVolumeCompare(t *testing.T) {
315         v := NewTestableUnixVolume(t, false, false)
316         defer v.Teardown()
317
318         v.Put(context.Background(), TestHash, TestBlock)
319         err := v.Compare(context.Background(), TestHash, TestBlock)
320         if err != nil {
321                 t.Errorf("Got err %q, expected nil", err)
322         }
323
324         err = v.Compare(context.Background(), TestHash, []byte("baddata"))
325         if err != CollisionError {
326                 t.Errorf("Got err %q, expected %q", err, CollisionError)
327         }
328
329         v.Put(context.Background(), TestHash, []byte("baddata"))
330         err = v.Compare(context.Background(), TestHash, TestBlock)
331         if err != DiskHashError {
332                 t.Errorf("Got err %q, expected %q", err, DiskHashError)
333         }
334
335         p := fmt.Sprintf("%s/%s/%s", v.Root, TestHash[:3], TestHash)
336         os.Chmod(p, 000)
337         err = v.Compare(context.Background(), TestHash, TestBlock)
338         if err == nil || strings.Index(err.Error(), "permission denied") < 0 {
339                 t.Errorf("Got err %q, expected %q", err, "permission denied")
340         }
341 }
342
343 func TestUnixVolumeContextCancelPut(t *testing.T) {
344         v := NewTestableUnixVolume(t, true, false)
345         defer v.Teardown()
346         v.locker.Lock()
347         ctx, cancel := context.WithCancel(context.Background())
348         go func() {
349                 time.Sleep(50 * time.Millisecond)
350                 cancel()
351                 time.Sleep(50 * time.Millisecond)
352                 v.locker.Unlock()
353         }()
354         err := v.Put(ctx, TestHash, TestBlock)
355         if err != context.Canceled {
356                 t.Errorf("Put() returned %s -- expected short read / canceled", err)
357         }
358 }
359
360 func TestUnixVolumeContextCancelGet(t *testing.T) {
361         v := NewTestableUnixVolume(t, false, false)
362         defer v.Teardown()
363         bpath := v.blockPath(TestHash)
364         v.PutRaw(TestHash, TestBlock)
365         os.Remove(bpath)
366         err := syscall.Mkfifo(bpath, 0600)
367         if err != nil {
368                 t.Fatalf("Mkfifo %s: %s", bpath, err)
369         }
370         defer os.Remove(bpath)
371         ctx, cancel := context.WithCancel(context.Background())
372         go func() {
373                 time.Sleep(50 * time.Millisecond)
374                 cancel()
375         }()
376         buf := make([]byte, len(TestBlock))
377         n, err := v.Get(ctx, TestHash, buf)
378         if n == len(TestBlock) || err != context.Canceled {
379                 t.Errorf("Get() returned %d, %s -- expected short read / canceled", n, err)
380         }
381 }
382
383 var _ = check.Suite(&UnixVolumeSuite{})
384
385 type UnixVolumeSuite struct {
386         volume *TestableUnixVolume
387 }
388
389 func (s *UnixVolumeSuite) TearDownTest(c *check.C) {
390         if s.volume != nil {
391                 s.volume.Teardown()
392         }
393 }
394
395 func (s *UnixVolumeSuite) TestStats(c *check.C) {
396         s.volume = NewTestableUnixVolume(c, false, false)
397         stats := func() string {
398                 buf, err := json.Marshal(s.volume.InternalStats())
399                 c.Check(err, check.IsNil)
400                 return string(buf)
401         }
402
403         c.Check(stats(), check.Matches, `.*"StatOps":0,.*`)
404         c.Check(stats(), check.Matches, `.*"Errors":0,.*`)
405
406         loc := "acbd18db4cc2f85cedef654fccc4a4d8"
407         _, err := s.volume.Get(context.Background(), loc, make([]byte, 3))
408         c.Check(err, check.NotNil)
409         c.Check(stats(), check.Matches, `.*"StatOps":[^0],.*`)
410         c.Check(stats(), check.Matches, `.*"Errors":[^0],.*`)
411         c.Check(stats(), check.Matches, `.*"\*os\.PathError":[^0].*`)
412         c.Check(stats(), check.Matches, `.*"InBytes":0,.*`)
413         c.Check(stats(), check.Matches, `.*"OpenOps":0,.*`)
414         c.Check(stats(), check.Matches, `.*"CreateOps":0,.*`)
415
416         err = s.volume.Put(context.Background(), loc, []byte("foo"))
417         c.Check(err, check.IsNil)
418         c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
419         c.Check(stats(), check.Matches, `.*"CreateOps":1,.*`)
420         c.Check(stats(), check.Matches, `.*"OpenOps":0,.*`)
421         c.Check(stats(), check.Matches, `.*"UtimesOps":0,.*`)
422
423         err = s.volume.Touch(loc)
424         c.Check(err, check.IsNil)
425         c.Check(stats(), check.Matches, `.*"FlockOps":1,.*`)
426         c.Check(stats(), check.Matches, `.*"OpenOps":1,.*`)
427         c.Check(stats(), check.Matches, `.*"UtimesOps":1,.*`)
428
429         _, err = s.volume.Get(context.Background(), loc, make([]byte, 3))
430         c.Check(err, check.IsNil)
431         err = s.volume.Compare(context.Background(), loc, []byte("foo"))
432         c.Check(err, check.IsNil)
433         c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
434         c.Check(stats(), check.Matches, `.*"OpenOps":3,.*`)
435
436         err = s.volume.Trash(loc)
437         c.Check(err, check.IsNil)
438         c.Check(stats(), check.Matches, `.*"FlockOps":2,.*`)
439 }
440
441 func (s *UnixVolumeSuite) TestConfig(c *check.C) {
442         var cfg Config
443         err := yaml.Unmarshal([]byte(`
444 Volumes:
445   - Type: Directory
446     StorageClasses: ["class_a", "class_b"]
447 `), &cfg)
448
449         c.Check(err, check.IsNil)
450         c.Check(cfg.Volumes[0].GetStorageClasses(), check.DeepEquals, []string{"class_a", "class_b"})
451 }