Merge branch '13937-keepstore-prometheus'
[arvados.git] / services / keepstore / unix_volume_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "context"
10         "encoding/json"
11         "errors"
12         "fmt"
13         "io"
14         "io/ioutil"
15         "os"
16         "strings"
17         "sync"
18         "syscall"
19         "testing"
20         "time"
21
22         "github.com/ghodss/yaml"
23         "github.com/prometheus/client_golang/prometheus"
24         check "gopkg.in/check.v1"
25 )
26
27 type TestableUnixVolume struct {
28         UnixVolume
29         t TB
30 }
31
32 func NewTestableUnixVolume(t TB, serialize bool, readonly bool) *TestableUnixVolume {
33         d, err := ioutil.TempDir("", "volume_test")
34         if err != nil {
35                 t.Fatal(err)
36         }
37         var locker sync.Locker
38         if serialize {
39                 locker = &sync.Mutex{}
40         }
41         return &TestableUnixVolume{
42                 UnixVolume: UnixVolume{
43                         Root:     d,
44                         ReadOnly: readonly,
45                         locker:   locker,
46                 },
47                 t: t,
48         }
49 }
50
51 // PutRaw writes a Keep block directly into a UnixVolume, even if
52 // the volume is readonly.
53 func (v *TestableUnixVolume) PutRaw(locator string, data []byte) {
54         defer func(orig bool) {
55                 v.ReadOnly = orig
56         }(v.ReadOnly)
57         v.ReadOnly = false
58         err := v.Put(context.Background(), locator, data)
59         if err != nil {
60                 v.t.Fatal(err)
61         }
62 }
63
64 func (v *TestableUnixVolume) TouchWithDate(locator string, lastPut time.Time) {
65         err := syscall.Utime(v.blockPath(locator), &syscall.Utimbuf{lastPut.Unix(), lastPut.Unix()})
66         if err != nil {
67                 v.t.Fatal(err)
68         }
69 }
70
71 func (v *TestableUnixVolume) Teardown() {
72         if err := os.RemoveAll(v.Root); err != nil {
73                 v.t.Fatal(err)
74         }
75 }
76
77 func (v *TestableUnixVolume) ReadWriteOperationLabelValues() (r, w string) {
78         return "open", "create"
79 }
80
81 // serialize = false; readonly = false
82 func TestUnixVolumeWithGenericTests(t *testing.T) {
83         DoGenericVolumeTests(t, func(t TB) TestableVolume {
84                 return NewTestableUnixVolume(t, false, false)
85         })
86 }
87
88 // serialize = false; readonly = true
89 func TestUnixVolumeWithGenericTestsReadOnly(t *testing.T) {
90         DoGenericVolumeTests(t, func(t TB) TestableVolume {
91                 return NewTestableUnixVolume(t, false, true)
92         })
93 }
94
95 // serialize = true; readonly = false
96 func TestUnixVolumeWithGenericTestsSerialized(t *testing.T) {
97         DoGenericVolumeTests(t, func(t TB) TestableVolume {
98                 return NewTestableUnixVolume(t, true, false)
99         })
100 }
101
102 // serialize = false; readonly = false
103 func TestUnixVolumeHandlersWithGenericVolumeTests(t *testing.T) {
104         DoHandlersWithGenericVolumeTests(t, func(t TB) (*RRVolumeManager, []TestableVolume) {
105                 vols := make([]Volume, 2)
106                 testableUnixVols := make([]TestableVolume, 2)
107
108                 for i := range vols {
109                         v := NewTestableUnixVolume(t, false, false)
110                         vols[i] = v
111                         testableUnixVols[i] = v
112                 }
113
114                 return MakeRRVolumeManager(vols), testableUnixVols
115         })
116 }
117
118 func TestReplicationDefault1(t *testing.T) {
119         v := &UnixVolume{
120                 Root:     "/",
121                 ReadOnly: true,
122         }
123         metrics := newVolumeMetricsVecs(prometheus.NewRegistry())
124         if err := v.Start(metrics); err != nil {
125                 t.Error(err)
126         }
127         if got := v.Replication(); got != 1 {
128                 t.Errorf("Replication() returned %d, expected 1 if no config given", got)
129         }
130 }
131
132 func TestGetNotFound(t *testing.T) {
133         v := NewTestableUnixVolume(t, false, false)
134         defer v.Teardown()
135         v.Put(context.Background(), TestHash, TestBlock)
136
137         buf := make([]byte, BlockSize)
138         n, err := v.Get(context.Background(), TestHash2, buf)
139         switch {
140         case os.IsNotExist(err):
141                 break
142         case err == nil:
143                 t.Errorf("Read should have failed, returned %+q", buf[:n])
144         default:
145                 t.Errorf("Read expected ErrNotExist, got: %s", err)
146         }
147 }
148
149 func TestPut(t *testing.T) {
150         v := NewTestableUnixVolume(t, false, false)
151         defer v.Teardown()
152
153         err := v.Put(context.Background(), TestHash, TestBlock)
154         if err != nil {
155                 t.Error(err)
156         }
157         p := fmt.Sprintf("%s/%s/%s", v.Root, TestHash[:3], TestHash)
158         if buf, err := ioutil.ReadFile(p); err != nil {
159                 t.Error(err)
160         } else if bytes.Compare(buf, TestBlock) != 0 {
161                 t.Errorf("Write should have stored %s, did store %s",
162                         string(TestBlock), string(buf))
163         }
164 }
165
166 func TestPutBadVolume(t *testing.T) {
167         v := NewTestableUnixVolume(t, false, false)
168         defer v.Teardown()
169
170         os.Chmod(v.Root, 000)
171         err := v.Put(context.Background(), TestHash, TestBlock)
172         if err == nil {
173                 t.Error("Write should have failed")
174         }
175 }
176
177 func TestUnixVolumeReadonly(t *testing.T) {
178         v := NewTestableUnixVolume(t, false, true)
179         defer v.Teardown()
180
181         v.PutRaw(TestHash, TestBlock)
182
183         buf := make([]byte, BlockSize)
184         _, err := v.Get(context.Background(), TestHash, buf)
185         if err != nil {
186                 t.Errorf("got err %v, expected nil", err)
187         }
188
189         err = v.Put(context.Background(), TestHash, TestBlock)
190         if err != MethodDisabledError {
191                 t.Errorf("got err %v, expected MethodDisabledError", err)
192         }
193
194         err = v.Touch(TestHash)
195         if err != MethodDisabledError {
196                 t.Errorf("got err %v, expected MethodDisabledError", err)
197         }
198
199         err = v.Trash(TestHash)
200         if err != MethodDisabledError {
201                 t.Errorf("got err %v, expected MethodDisabledError", err)
202         }
203 }
204
205 func TestIsFull(t *testing.T) {
206         v := NewTestableUnixVolume(t, false, false)
207         defer v.Teardown()
208
209         fullPath := v.Root + "/full"
210         now := fmt.Sprintf("%d", time.Now().Unix())
211         os.Symlink(now, fullPath)
212         if !v.IsFull() {
213                 t.Errorf("%s: claims not to be full", v)
214         }
215         os.Remove(fullPath)
216
217         // Test with an expired /full link.
218         expired := fmt.Sprintf("%d", time.Now().Unix()-3605)
219         os.Symlink(expired, fullPath)
220         if v.IsFull() {
221                 t.Errorf("%s: should no longer be full", v)
222         }
223 }
224
225 func TestNodeStatus(t *testing.T) {
226         v := NewTestableUnixVolume(t, false, false)
227         defer v.Teardown()
228
229         // Get node status and make a basic sanity check.
230         volinfo := v.Status()
231         if volinfo.MountPoint != v.Root {
232                 t.Errorf("GetNodeStatus mount_point %s, expected %s", volinfo.MountPoint, v.Root)
233         }
234         if volinfo.DeviceNum == 0 {
235                 t.Errorf("uninitialized device_num in %v", volinfo)
236         }
237         if volinfo.BytesFree == 0 {
238                 t.Errorf("uninitialized bytes_free in %v", volinfo)
239         }
240         if volinfo.BytesUsed == 0 {
241                 t.Errorf("uninitialized bytes_used in %v", volinfo)
242         }
243 }
244
245 func TestUnixVolumeGetFuncWorkerError(t *testing.T) {
246         v := NewTestableUnixVolume(t, false, false)
247         defer v.Teardown()
248
249         v.Put(context.Background(), TestHash, TestBlock)
250         mockErr := errors.New("Mock error")
251         err := v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error {
252                 return mockErr
253         })
254         if err != mockErr {
255                 t.Errorf("Got %v, expected %v", err, mockErr)
256         }
257 }
258
259 func TestUnixVolumeGetFuncFileError(t *testing.T) {
260         v := NewTestableUnixVolume(t, false, false)
261         defer v.Teardown()
262
263         funcCalled := false
264         err := v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error {
265                 funcCalled = true
266                 return nil
267         })
268         if err == nil {
269                 t.Errorf("Expected error opening non-existent file")
270         }
271         if funcCalled {
272                 t.Errorf("Worker func should not have been called")
273         }
274 }
275
276 func TestUnixVolumeGetFuncWorkerWaitsOnMutex(t *testing.T) {
277         v := NewTestableUnixVolume(t, false, false)
278         defer v.Teardown()
279
280         v.Put(context.Background(), TestHash, TestBlock)
281
282         mtx := NewMockMutex()
283         v.locker = mtx
284
285         funcCalled := make(chan struct{})
286         go v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error {
287                 funcCalled <- struct{}{}
288                 return nil
289         })
290         select {
291         case mtx.AllowLock <- struct{}{}:
292         case <-funcCalled:
293                 t.Fatal("Function was called before mutex was acquired")
294         case <-time.After(5 * time.Second):
295                 t.Fatal("Timed out before mutex was acquired")
296         }
297         select {
298         case <-funcCalled:
299         case mtx.AllowUnlock <- struct{}{}:
300                 t.Fatal("Mutex was released before function was called")
301         case <-time.After(5 * time.Second):
302                 t.Fatal("Timed out waiting for funcCalled")
303         }
304         select {
305         case mtx.AllowUnlock <- struct{}{}:
306         case <-time.After(5 * time.Second):
307                 t.Fatal("Timed out waiting for getFunc() to release mutex")
308         }
309 }
310
311 func TestUnixVolumeCompare(t *testing.T) {
312         v := NewTestableUnixVolume(t, false, false)
313         defer v.Teardown()
314
315         v.Put(context.Background(), TestHash, TestBlock)
316         err := v.Compare(context.Background(), TestHash, TestBlock)
317         if err != nil {
318                 t.Errorf("Got err %q, expected nil", err)
319         }
320
321         err = v.Compare(context.Background(), TestHash, []byte("baddata"))
322         if err != CollisionError {
323                 t.Errorf("Got err %q, expected %q", err, CollisionError)
324         }
325
326         v.Put(context.Background(), TestHash, []byte("baddata"))
327         err = v.Compare(context.Background(), TestHash, TestBlock)
328         if err != DiskHashError {
329                 t.Errorf("Got err %q, expected %q", err, DiskHashError)
330         }
331
332         p := fmt.Sprintf("%s/%s/%s", v.Root, TestHash[:3], TestHash)
333         os.Chmod(p, 000)
334         err = v.Compare(context.Background(), TestHash, TestBlock)
335         if err == nil || strings.Index(err.Error(), "permission denied") < 0 {
336                 t.Errorf("Got err %q, expected %q", err, "permission denied")
337         }
338 }
339
340 func TestUnixVolumeContextCancelPut(t *testing.T) {
341         v := NewTestableUnixVolume(t, true, false)
342         defer v.Teardown()
343         v.locker.Lock()
344         ctx, cancel := context.WithCancel(context.Background())
345         go func() {
346                 time.Sleep(50 * time.Millisecond)
347                 cancel()
348                 time.Sleep(50 * time.Millisecond)
349                 v.locker.Unlock()
350         }()
351         err := v.Put(ctx, TestHash, TestBlock)
352         if err != context.Canceled {
353                 t.Errorf("Put() returned %s -- expected short read / canceled", err)
354         }
355 }
356
357 func TestUnixVolumeContextCancelGet(t *testing.T) {
358         v := NewTestableUnixVolume(t, false, false)
359         defer v.Teardown()
360         bpath := v.blockPath(TestHash)
361         v.PutRaw(TestHash, TestBlock)
362         os.Remove(bpath)
363         err := syscall.Mkfifo(bpath, 0600)
364         if err != nil {
365                 t.Fatalf("Mkfifo %s: %s", bpath, err)
366         }
367         defer os.Remove(bpath)
368         ctx, cancel := context.WithCancel(context.Background())
369         go func() {
370                 time.Sleep(50 * time.Millisecond)
371                 cancel()
372         }()
373         buf := make([]byte, len(TestBlock))
374         n, err := v.Get(ctx, TestHash, buf)
375         if n == len(TestBlock) || err != context.Canceled {
376                 t.Errorf("Get() returned %d, %s -- expected short read / canceled", n, err)
377         }
378 }
379
380 var _ = check.Suite(&UnixVolumeSuite{})
381
382 type UnixVolumeSuite struct {
383         volume *TestableUnixVolume
384 }
385
386 func (s *UnixVolumeSuite) TearDownTest(c *check.C) {
387         if s.volume != nil {
388                 s.volume.Teardown()
389         }
390 }
391
392 func (s *UnixVolumeSuite) TestStats(c *check.C) {
393         s.volume = NewTestableUnixVolume(c, false, false)
394         stats := func() string {
395                 buf, err := json.Marshal(s.volume.InternalStats())
396                 c.Check(err, check.IsNil)
397                 return string(buf)
398         }
399
400         c.Check(stats(), check.Matches, `.*"StatOps":0,.*`)
401         c.Check(stats(), check.Matches, `.*"Errors":0,.*`)
402
403         loc := "acbd18db4cc2f85cedef654fccc4a4d8"
404         _, err := s.volume.Get(context.Background(), loc, make([]byte, 3))
405         c.Check(err, check.NotNil)
406         c.Check(stats(), check.Matches, `.*"StatOps":[^0],.*`)
407         c.Check(stats(), check.Matches, `.*"Errors":[^0],.*`)
408         c.Check(stats(), check.Matches, `.*"\*os\.PathError":[^0].*`)
409         c.Check(stats(), check.Matches, `.*"InBytes":0,.*`)
410         c.Check(stats(), check.Matches, `.*"OpenOps":0,.*`)
411         c.Check(stats(), check.Matches, `.*"CreateOps":0,.*`)
412
413         err = s.volume.Put(context.Background(), loc, []byte("foo"))
414         c.Check(err, check.IsNil)
415         c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
416         c.Check(stats(), check.Matches, `.*"CreateOps":1,.*`)
417         c.Check(stats(), check.Matches, `.*"OpenOps":0,.*`)
418         c.Check(stats(), check.Matches, `.*"UtimesOps":0,.*`)
419
420         err = s.volume.Touch(loc)
421         c.Check(err, check.IsNil)
422         c.Check(stats(), check.Matches, `.*"FlockOps":1,.*`)
423         c.Check(stats(), check.Matches, `.*"OpenOps":1,.*`)
424         c.Check(stats(), check.Matches, `.*"UtimesOps":1,.*`)
425
426         _, err = s.volume.Get(context.Background(), loc, make([]byte, 3))
427         c.Check(err, check.IsNil)
428         err = s.volume.Compare(context.Background(), loc, []byte("foo"))
429         c.Check(err, check.IsNil)
430         c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
431         c.Check(stats(), check.Matches, `.*"OpenOps":3,.*`)
432
433         err = s.volume.Trash(loc)
434         c.Check(err, check.IsNil)
435         c.Check(stats(), check.Matches, `.*"FlockOps":2,.*`)
436 }
437
438 func (s *UnixVolumeSuite) TestConfig(c *check.C) {
439         var cfg Config
440         err := yaml.Unmarshal([]byte(`
441 Volumes:
442   - Type: Directory
443     StorageClasses: ["class_a", "class_b"]
444 `), &cfg)
445
446         c.Check(err, check.IsNil)
447         c.Check(cfg.Volumes[0].GetStorageClasses(), check.DeepEquals, []string{"class_a", "class_b"})
448 }