12085: Idle node times tracking, with tests.
[arvados.git] / services / keepstore / volume_unix_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "context"
10         "encoding/json"
11         "errors"
12         "fmt"
13         "io"
14         "io/ioutil"
15         "os"
16         "strings"
17         "sync"
18         "syscall"
19         "testing"
20         "time"
21
22         "github.com/ghodss/yaml"
23         check "gopkg.in/check.v1"
24 )
25
26 type TestableUnixVolume struct {
27         UnixVolume
28         t TB
29 }
30
31 func NewTestableUnixVolume(t TB, serialize bool, readonly bool) *TestableUnixVolume {
32         d, err := ioutil.TempDir("", "volume_test")
33         if err != nil {
34                 t.Fatal(err)
35         }
36         var locker sync.Locker
37         if serialize {
38                 locker = &sync.Mutex{}
39         }
40         return &TestableUnixVolume{
41                 UnixVolume: UnixVolume{
42                         Root:     d,
43                         ReadOnly: readonly,
44                         locker:   locker,
45                 },
46                 t: t,
47         }
48 }
49
50 // PutRaw writes a Keep block directly into a UnixVolume, even if
51 // the volume is readonly.
52 func (v *TestableUnixVolume) PutRaw(locator string, data []byte) {
53         defer func(orig bool) {
54                 v.ReadOnly = orig
55         }(v.ReadOnly)
56         v.ReadOnly = false
57         err := v.Put(context.Background(), locator, data)
58         if err != nil {
59                 v.t.Fatal(err)
60         }
61 }
62
63 func (v *TestableUnixVolume) TouchWithDate(locator string, lastPut time.Time) {
64         err := syscall.Utime(v.blockPath(locator), &syscall.Utimbuf{lastPut.Unix(), lastPut.Unix()})
65         if err != nil {
66                 v.t.Fatal(err)
67         }
68 }
69
70 func (v *TestableUnixVolume) Teardown() {
71         if err := os.RemoveAll(v.Root); err != nil {
72                 v.t.Fatal(err)
73         }
74 }
75
76 // serialize = false; readonly = false
77 func TestUnixVolumeWithGenericTests(t *testing.T) {
78         DoGenericVolumeTests(t, func(t TB) TestableVolume {
79                 return NewTestableUnixVolume(t, false, false)
80         })
81 }
82
83 // serialize = false; readonly = true
84 func TestUnixVolumeWithGenericTestsReadOnly(t *testing.T) {
85         DoGenericVolumeTests(t, func(t TB) TestableVolume {
86                 return NewTestableUnixVolume(t, false, true)
87         })
88 }
89
90 // serialize = true; readonly = false
91 func TestUnixVolumeWithGenericTestsSerialized(t *testing.T) {
92         DoGenericVolumeTests(t, func(t TB) TestableVolume {
93                 return NewTestableUnixVolume(t, true, false)
94         })
95 }
96
97 // serialize = false; readonly = false
98 func TestUnixVolumeHandlersWithGenericVolumeTests(t *testing.T) {
99         DoHandlersWithGenericVolumeTests(t, func(t TB) (*RRVolumeManager, []TestableVolume) {
100                 vols := make([]Volume, 2)
101                 testableUnixVols := make([]TestableVolume, 2)
102
103                 for i := range vols {
104                         v := NewTestableUnixVolume(t, false, false)
105                         vols[i] = v
106                         testableUnixVols[i] = v
107                 }
108
109                 return MakeRRVolumeManager(vols), testableUnixVols
110         })
111 }
112
113 func TestReplicationDefault1(t *testing.T) {
114         v := &UnixVolume{
115                 Root:     "/",
116                 ReadOnly: true,
117         }
118         if err := v.Start(); err != nil {
119                 t.Error(err)
120         }
121         if got := v.Replication(); got != 1 {
122                 t.Errorf("Replication() returned %d, expected 1 if no config given", got)
123         }
124 }
125
126 func TestGetNotFound(t *testing.T) {
127         v := NewTestableUnixVolume(t, false, false)
128         defer v.Teardown()
129         v.Put(context.Background(), TestHash, TestBlock)
130
131         buf := make([]byte, BlockSize)
132         n, err := v.Get(context.Background(), TestHash2, buf)
133         switch {
134         case os.IsNotExist(err):
135                 break
136         case err == nil:
137                 t.Errorf("Read should have failed, returned %+q", buf[:n])
138         default:
139                 t.Errorf("Read expected ErrNotExist, got: %s", err)
140         }
141 }
142
143 func TestPut(t *testing.T) {
144         v := NewTestableUnixVolume(t, false, false)
145         defer v.Teardown()
146
147         err := v.Put(context.Background(), TestHash, TestBlock)
148         if err != nil {
149                 t.Error(err)
150         }
151         p := fmt.Sprintf("%s/%s/%s", v.Root, TestHash[:3], TestHash)
152         if buf, err := ioutil.ReadFile(p); err != nil {
153                 t.Error(err)
154         } else if bytes.Compare(buf, TestBlock) != 0 {
155                 t.Errorf("Write should have stored %s, did store %s",
156                         string(TestBlock), string(buf))
157         }
158 }
159
160 func TestPutBadVolume(t *testing.T) {
161         v := NewTestableUnixVolume(t, false, false)
162         defer v.Teardown()
163
164         os.Chmod(v.Root, 000)
165         err := v.Put(context.Background(), TestHash, TestBlock)
166         if err == nil {
167                 t.Error("Write should have failed")
168         }
169 }
170
171 func TestUnixVolumeReadonly(t *testing.T) {
172         v := NewTestableUnixVolume(t, false, true)
173         defer v.Teardown()
174
175         v.PutRaw(TestHash, TestBlock)
176
177         buf := make([]byte, BlockSize)
178         _, err := v.Get(context.Background(), TestHash, buf)
179         if err != nil {
180                 t.Errorf("got err %v, expected nil", err)
181         }
182
183         err = v.Put(context.Background(), TestHash, TestBlock)
184         if err != MethodDisabledError {
185                 t.Errorf("got err %v, expected MethodDisabledError", err)
186         }
187
188         err = v.Touch(TestHash)
189         if err != MethodDisabledError {
190                 t.Errorf("got err %v, expected MethodDisabledError", err)
191         }
192
193         err = v.Trash(TestHash)
194         if err != MethodDisabledError {
195                 t.Errorf("got err %v, expected MethodDisabledError", err)
196         }
197 }
198
199 func TestIsFull(t *testing.T) {
200         v := NewTestableUnixVolume(t, false, false)
201         defer v.Teardown()
202
203         fullPath := v.Root + "/full"
204         now := fmt.Sprintf("%d", time.Now().Unix())
205         os.Symlink(now, fullPath)
206         if !v.IsFull() {
207                 t.Errorf("%s: claims not to be full", v)
208         }
209         os.Remove(fullPath)
210
211         // Test with an expired /full link.
212         expired := fmt.Sprintf("%d", time.Now().Unix()-3605)
213         os.Symlink(expired, fullPath)
214         if v.IsFull() {
215                 t.Errorf("%s: should no longer be full", v)
216         }
217 }
218
219 func TestNodeStatus(t *testing.T) {
220         v := NewTestableUnixVolume(t, false, false)
221         defer v.Teardown()
222
223         // Get node status and make a basic sanity check.
224         volinfo := v.Status()
225         if volinfo.MountPoint != v.Root {
226                 t.Errorf("GetNodeStatus mount_point %s, expected %s", volinfo.MountPoint, v.Root)
227         }
228         if volinfo.DeviceNum == 0 {
229                 t.Errorf("uninitialized device_num in %v", volinfo)
230         }
231         if volinfo.BytesFree == 0 {
232                 t.Errorf("uninitialized bytes_free in %v", volinfo)
233         }
234         if volinfo.BytesUsed == 0 {
235                 t.Errorf("uninitialized bytes_used in %v", volinfo)
236         }
237 }
238
239 func TestUnixVolumeGetFuncWorkerError(t *testing.T) {
240         v := NewTestableUnixVolume(t, false, false)
241         defer v.Teardown()
242
243         v.Put(context.Background(), TestHash, TestBlock)
244         mockErr := errors.New("Mock error")
245         err := v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error {
246                 return mockErr
247         })
248         if err != mockErr {
249                 t.Errorf("Got %v, expected %v", err, mockErr)
250         }
251 }
252
253 func TestUnixVolumeGetFuncFileError(t *testing.T) {
254         v := NewTestableUnixVolume(t, false, false)
255         defer v.Teardown()
256
257         funcCalled := false
258         err := v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error {
259                 funcCalled = true
260                 return nil
261         })
262         if err == nil {
263                 t.Errorf("Expected error opening non-existent file")
264         }
265         if funcCalled {
266                 t.Errorf("Worker func should not have been called")
267         }
268 }
269
270 func TestUnixVolumeGetFuncWorkerWaitsOnMutex(t *testing.T) {
271         v := NewTestableUnixVolume(t, false, false)
272         defer v.Teardown()
273
274         v.Put(context.Background(), TestHash, TestBlock)
275
276         mtx := NewMockMutex()
277         v.locker = mtx
278
279         funcCalled := make(chan struct{})
280         go v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error {
281                 funcCalled <- struct{}{}
282                 return nil
283         })
284         select {
285         case mtx.AllowLock <- struct{}{}:
286         case <-funcCalled:
287                 t.Fatal("Function was called before mutex was acquired")
288         case <-time.After(5 * time.Second):
289                 t.Fatal("Timed out before mutex was acquired")
290         }
291         select {
292         case <-funcCalled:
293         case mtx.AllowUnlock <- struct{}{}:
294                 t.Fatal("Mutex was released before function was called")
295         case <-time.After(5 * time.Second):
296                 t.Fatal("Timed out waiting for funcCalled")
297         }
298         select {
299         case mtx.AllowUnlock <- struct{}{}:
300         case <-time.After(5 * time.Second):
301                 t.Fatal("Timed out waiting for getFunc() to release mutex")
302         }
303 }
304
305 func TestUnixVolumeCompare(t *testing.T) {
306         v := NewTestableUnixVolume(t, false, false)
307         defer v.Teardown()
308
309         v.Put(context.Background(), TestHash, TestBlock)
310         err := v.Compare(context.Background(), TestHash, TestBlock)
311         if err != nil {
312                 t.Errorf("Got err %q, expected nil", err)
313         }
314
315         err = v.Compare(context.Background(), TestHash, []byte("baddata"))
316         if err != CollisionError {
317                 t.Errorf("Got err %q, expected %q", err, CollisionError)
318         }
319
320         v.Put(context.Background(), TestHash, []byte("baddata"))
321         err = v.Compare(context.Background(), TestHash, TestBlock)
322         if err != DiskHashError {
323                 t.Errorf("Got err %q, expected %q", err, DiskHashError)
324         }
325
326         p := fmt.Sprintf("%s/%s/%s", v.Root, TestHash[:3], TestHash)
327         os.Chmod(p, 000)
328         err = v.Compare(context.Background(), TestHash, TestBlock)
329         if err == nil || strings.Index(err.Error(), "permission denied") < 0 {
330                 t.Errorf("Got err %q, expected %q", err, "permission denied")
331         }
332 }
333
334 func TestUnixVolumeContextCancelPut(t *testing.T) {
335         v := NewTestableUnixVolume(t, true, false)
336         defer v.Teardown()
337         v.locker.Lock()
338         ctx, cancel := context.WithCancel(context.Background())
339         go func() {
340                 time.Sleep(50 * time.Millisecond)
341                 cancel()
342                 time.Sleep(50 * time.Millisecond)
343                 v.locker.Unlock()
344         }()
345         err := v.Put(ctx, TestHash, TestBlock)
346         if err != context.Canceled {
347                 t.Errorf("Put() returned %s -- expected short read / canceled", err)
348         }
349 }
350
351 func TestUnixVolumeContextCancelGet(t *testing.T) {
352         v := NewTestableUnixVolume(t, false, false)
353         defer v.Teardown()
354         bpath := v.blockPath(TestHash)
355         v.PutRaw(TestHash, TestBlock)
356         os.Remove(bpath)
357         err := syscall.Mkfifo(bpath, 0600)
358         if err != nil {
359                 t.Fatalf("Mkfifo %s: %s", bpath, err)
360         }
361         defer os.Remove(bpath)
362         ctx, cancel := context.WithCancel(context.Background())
363         go func() {
364                 time.Sleep(50 * time.Millisecond)
365                 cancel()
366         }()
367         buf := make([]byte, len(TestBlock))
368         n, err := v.Get(ctx, TestHash, buf)
369         if n == len(TestBlock) || err != context.Canceled {
370                 t.Errorf("Get() returned %d, %s -- expected short read / canceled", n, err)
371         }
372 }
373
374 var _ = check.Suite(&UnixVolumeSuite{})
375
376 type UnixVolumeSuite struct {
377         volume *TestableUnixVolume
378 }
379
380 func (s *UnixVolumeSuite) TearDownTest(c *check.C) {
381         if s.volume != nil {
382                 s.volume.Teardown()
383         }
384 }
385
386 func (s *UnixVolumeSuite) TestStats(c *check.C) {
387         s.volume = NewTestableUnixVolume(c, false, false)
388         stats := func() string {
389                 buf, err := json.Marshal(s.volume.InternalStats())
390                 c.Check(err, check.IsNil)
391                 return string(buf)
392         }
393
394         c.Check(stats(), check.Matches, `.*"StatOps":0,.*`)
395         c.Check(stats(), check.Matches, `.*"Errors":0,.*`)
396
397         loc := "acbd18db4cc2f85cedef654fccc4a4d8"
398         _, err := s.volume.Get(context.Background(), loc, make([]byte, 3))
399         c.Check(err, check.NotNil)
400         c.Check(stats(), check.Matches, `.*"StatOps":[^0],.*`)
401         c.Check(stats(), check.Matches, `.*"Errors":[^0],.*`)
402         c.Check(stats(), check.Matches, `.*"\*os\.PathError":[^0].*`)
403         c.Check(stats(), check.Matches, `.*"InBytes":0,.*`)
404         c.Check(stats(), check.Matches, `.*"OpenOps":0,.*`)
405         c.Check(stats(), check.Matches, `.*"CreateOps":0,.*`)
406
407         err = s.volume.Put(context.Background(), loc, []byte("foo"))
408         c.Check(err, check.IsNil)
409         c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
410         c.Check(stats(), check.Matches, `.*"CreateOps":1,.*`)
411         c.Check(stats(), check.Matches, `.*"OpenOps":0,.*`)
412         c.Check(stats(), check.Matches, `.*"UtimesOps":0,.*`)
413
414         err = s.volume.Touch(loc)
415         c.Check(err, check.IsNil)
416         c.Check(stats(), check.Matches, `.*"FlockOps":1,.*`)
417         c.Check(stats(), check.Matches, `.*"OpenOps":1,.*`)
418         c.Check(stats(), check.Matches, `.*"UtimesOps":1,.*`)
419
420         _, err = s.volume.Get(context.Background(), loc, make([]byte, 3))
421         c.Check(err, check.IsNil)
422         err = s.volume.Compare(context.Background(), loc, []byte("foo"))
423         c.Check(err, check.IsNil)
424         c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
425         c.Check(stats(), check.Matches, `.*"OpenOps":3,.*`)
426
427         err = s.volume.Trash(loc)
428         c.Check(err, check.IsNil)
429         c.Check(stats(), check.Matches, `.*"FlockOps":2,.*`)
430 }
431
432 func (s *UnixVolumeSuite) TestConfig(c *check.C) {
433         var cfg Config
434         err := yaml.Unmarshal([]byte(`
435 Volumes:
436   - Type: Directory
437     StorageClasses: ["class_a", "class_b"]
438 `), &cfg)
439
440         c.Check(err, check.IsNil)
441         c.Check(cfg.Volumes[0].GetStorageClasses(), check.DeepEquals, []string{"class_a", "class_b"})
442 }