13937: Fixes file naming inconsistency between storage drivers.
[arvados.git] / services / keepstore / unix_volume_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "context"
10         "encoding/json"
11         "errors"
12         "fmt"
13         "io"
14         "io/ioutil"
15         "os"
16         "strings"
17         "sync"
18         "syscall"
19         "testing"
20         "time"
21
22         "github.com/ghodss/yaml"
23         "github.com/prometheus/client_golang/prometheus"
24         check "gopkg.in/check.v1"
25 )
26
27 type TestableUnixVolume struct {
28         UnixVolume
29         t TB
30 }
31
32 func NewTestableUnixVolume(t TB, serialize bool, readonly bool) *TestableUnixVolume {
33         d, err := ioutil.TempDir("", "volume_test")
34         if err != nil {
35                 t.Fatal(err)
36         }
37         var locker sync.Locker
38         if serialize {
39                 locker = &sync.Mutex{}
40         }
41         return &TestableUnixVolume{
42                 UnixVolume: UnixVolume{
43                         Root:     d,
44                         ReadOnly: readonly,
45                         locker:   locker,
46                 },
47                 t: t,
48         }
49 }
50
51 // PutRaw writes a Keep block directly into a UnixVolume, even if
52 // the volume is readonly.
53 func (v *TestableUnixVolume) PutRaw(locator string, data []byte) {
54         defer func(orig bool) {
55                 v.ReadOnly = orig
56         }(v.ReadOnly)
57         v.ReadOnly = false
58         err := v.Put(context.Background(), locator, data)
59         if err != nil {
60                 v.t.Fatal(err)
61         }
62 }
63
64 func (v *TestableUnixVolume) TouchWithDate(locator string, lastPut time.Time) {
65         err := syscall.Utime(v.blockPath(locator), &syscall.Utimbuf{lastPut.Unix(), lastPut.Unix()})
66         if err != nil {
67                 v.t.Fatal(err)
68         }
69 }
70
71 func (v *TestableUnixVolume) Teardown() {
72         if err := os.RemoveAll(v.Root); err != nil {
73                 v.t.Fatal(err)
74         }
75 }
76
77 // serialize = false; readonly = false
78 func TestUnixVolumeWithGenericTests(t *testing.T) {
79         DoGenericVolumeTests(t, func(t TB) TestableVolume {
80                 return NewTestableUnixVolume(t, false, false)
81         })
82 }
83
84 // serialize = false; readonly = true
85 func TestUnixVolumeWithGenericTestsReadOnly(t *testing.T) {
86         DoGenericVolumeTests(t, func(t TB) TestableVolume {
87                 return NewTestableUnixVolume(t, false, true)
88         })
89 }
90
91 // serialize = true; readonly = false
92 func TestUnixVolumeWithGenericTestsSerialized(t *testing.T) {
93         DoGenericVolumeTests(t, func(t TB) TestableVolume {
94                 return NewTestableUnixVolume(t, true, false)
95         })
96 }
97
98 // serialize = false; readonly = false
99 func TestUnixVolumeHandlersWithGenericVolumeTests(t *testing.T) {
100         DoHandlersWithGenericVolumeTests(t, func(t TB) (*RRVolumeManager, []TestableVolume) {
101                 vols := make([]Volume, 2)
102                 testableUnixVols := make([]TestableVolume, 2)
103
104                 for i := range vols {
105                         v := NewTestableUnixVolume(t, false, false)
106                         vols[i] = v
107                         testableUnixVols[i] = v
108                 }
109
110                 return MakeRRVolumeManager(vols), testableUnixVols
111         })
112 }
113
114 func TestReplicationDefault1(t *testing.T) {
115         v := &UnixVolume{
116                 Root:     "/",
117                 ReadOnly: true,
118         }
119         metrics := newVolumeMetricsVecs(prometheus.NewRegistry())
120         if err := v.Start(metrics.opsCounters, metrics.errCounters, metrics.ioBytes); err != nil {
121                 t.Error(err)
122         }
123         if got := v.Replication(); got != 1 {
124                 t.Errorf("Replication() returned %d, expected 1 if no config given", got)
125         }
126 }
127
128 func TestGetNotFound(t *testing.T) {
129         v := NewTestableUnixVolume(t, false, false)
130         defer v.Teardown()
131         v.Put(context.Background(), TestHash, TestBlock)
132
133         buf := make([]byte, BlockSize)
134         n, err := v.Get(context.Background(), TestHash2, buf)
135         switch {
136         case os.IsNotExist(err):
137                 break
138         case err == nil:
139                 t.Errorf("Read should have failed, returned %+q", buf[:n])
140         default:
141                 t.Errorf("Read expected ErrNotExist, got: %s", err)
142         }
143 }
144
145 func TestPut(t *testing.T) {
146         v := NewTestableUnixVolume(t, false, false)
147         defer v.Teardown()
148
149         err := v.Put(context.Background(), TestHash, TestBlock)
150         if err != nil {
151                 t.Error(err)
152         }
153         p := fmt.Sprintf("%s/%s/%s", v.Root, TestHash[:3], TestHash)
154         if buf, err := ioutil.ReadFile(p); err != nil {
155                 t.Error(err)
156         } else if bytes.Compare(buf, TestBlock) != 0 {
157                 t.Errorf("Write should have stored %s, did store %s",
158                         string(TestBlock), string(buf))
159         }
160 }
161
162 func TestPutBadVolume(t *testing.T) {
163         v := NewTestableUnixVolume(t, false, false)
164         defer v.Teardown()
165
166         os.Chmod(v.Root, 000)
167         err := v.Put(context.Background(), TestHash, TestBlock)
168         if err == nil {
169                 t.Error("Write should have failed")
170         }
171 }
172
173 func TestUnixVolumeReadonly(t *testing.T) {
174         v := NewTestableUnixVolume(t, false, true)
175         defer v.Teardown()
176
177         v.PutRaw(TestHash, TestBlock)
178
179         buf := make([]byte, BlockSize)
180         _, err := v.Get(context.Background(), TestHash, buf)
181         if err != nil {
182                 t.Errorf("got err %v, expected nil", err)
183         }
184
185         err = v.Put(context.Background(), TestHash, TestBlock)
186         if err != MethodDisabledError {
187                 t.Errorf("got err %v, expected MethodDisabledError", err)
188         }
189
190         err = v.Touch(TestHash)
191         if err != MethodDisabledError {
192                 t.Errorf("got err %v, expected MethodDisabledError", err)
193         }
194
195         err = v.Trash(TestHash)
196         if err != MethodDisabledError {
197                 t.Errorf("got err %v, expected MethodDisabledError", err)
198         }
199 }
200
201 func TestIsFull(t *testing.T) {
202         v := NewTestableUnixVolume(t, false, false)
203         defer v.Teardown()
204
205         fullPath := v.Root + "/full"
206         now := fmt.Sprintf("%d", time.Now().Unix())
207         os.Symlink(now, fullPath)
208         if !v.IsFull() {
209                 t.Errorf("%s: claims not to be full", v)
210         }
211         os.Remove(fullPath)
212
213         // Test with an expired /full link.
214         expired := fmt.Sprintf("%d", time.Now().Unix()-3605)
215         os.Symlink(expired, fullPath)
216         if v.IsFull() {
217                 t.Errorf("%s: should no longer be full", v)
218         }
219 }
220
221 func TestNodeStatus(t *testing.T) {
222         v := NewTestableUnixVolume(t, false, false)
223         defer v.Teardown()
224
225         // Get node status and make a basic sanity check.
226         volinfo := v.Status()
227         if volinfo.MountPoint != v.Root {
228                 t.Errorf("GetNodeStatus mount_point %s, expected %s", volinfo.MountPoint, v.Root)
229         }
230         if volinfo.DeviceNum == 0 {
231                 t.Errorf("uninitialized device_num in %v", volinfo)
232         }
233         if volinfo.BytesFree == 0 {
234                 t.Errorf("uninitialized bytes_free in %v", volinfo)
235         }
236         if volinfo.BytesUsed == 0 {
237                 t.Errorf("uninitialized bytes_used in %v", volinfo)
238         }
239 }
240
241 func TestUnixVolumeGetFuncWorkerError(t *testing.T) {
242         v := NewTestableUnixVolume(t, false, false)
243         defer v.Teardown()
244
245         v.Put(context.Background(), TestHash, TestBlock)
246         mockErr := errors.New("Mock error")
247         err := v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error {
248                 return mockErr
249         })
250         if err != mockErr {
251                 t.Errorf("Got %v, expected %v", err, mockErr)
252         }
253 }
254
255 func TestUnixVolumeGetFuncFileError(t *testing.T) {
256         v := NewTestableUnixVolume(t, false, false)
257         defer v.Teardown()
258
259         funcCalled := false
260         err := v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error {
261                 funcCalled = true
262                 return nil
263         })
264         if err == nil {
265                 t.Errorf("Expected error opening non-existent file")
266         }
267         if funcCalled {
268                 t.Errorf("Worker func should not have been called")
269         }
270 }
271
272 func TestUnixVolumeGetFuncWorkerWaitsOnMutex(t *testing.T) {
273         v := NewTestableUnixVolume(t, false, false)
274         defer v.Teardown()
275
276         v.Put(context.Background(), TestHash, TestBlock)
277
278         mtx := NewMockMutex()
279         v.locker = mtx
280
281         funcCalled := make(chan struct{})
282         go v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error {
283                 funcCalled <- struct{}{}
284                 return nil
285         })
286         select {
287         case mtx.AllowLock <- struct{}{}:
288         case <-funcCalled:
289                 t.Fatal("Function was called before mutex was acquired")
290         case <-time.After(5 * time.Second):
291                 t.Fatal("Timed out before mutex was acquired")
292         }
293         select {
294         case <-funcCalled:
295         case mtx.AllowUnlock <- struct{}{}:
296                 t.Fatal("Mutex was released before function was called")
297         case <-time.After(5 * time.Second):
298                 t.Fatal("Timed out waiting for funcCalled")
299         }
300         select {
301         case mtx.AllowUnlock <- struct{}{}:
302         case <-time.After(5 * time.Second):
303                 t.Fatal("Timed out waiting for getFunc() to release mutex")
304         }
305 }
306
307 func TestUnixVolumeCompare(t *testing.T) {
308         v := NewTestableUnixVolume(t, false, false)
309         defer v.Teardown()
310
311         v.Put(context.Background(), TestHash, TestBlock)
312         err := v.Compare(context.Background(), TestHash, TestBlock)
313         if err != nil {
314                 t.Errorf("Got err %q, expected nil", err)
315         }
316
317         err = v.Compare(context.Background(), TestHash, []byte("baddata"))
318         if err != CollisionError {
319                 t.Errorf("Got err %q, expected %q", err, CollisionError)
320         }
321
322         v.Put(context.Background(), TestHash, []byte("baddata"))
323         err = v.Compare(context.Background(), TestHash, TestBlock)
324         if err != DiskHashError {
325                 t.Errorf("Got err %q, expected %q", err, DiskHashError)
326         }
327
328         p := fmt.Sprintf("%s/%s/%s", v.Root, TestHash[:3], TestHash)
329         os.Chmod(p, 000)
330         err = v.Compare(context.Background(), TestHash, TestBlock)
331         if err == nil || strings.Index(err.Error(), "permission denied") < 0 {
332                 t.Errorf("Got err %q, expected %q", err, "permission denied")
333         }
334 }
335
336 func TestUnixVolumeContextCancelPut(t *testing.T) {
337         v := NewTestableUnixVolume(t, true, false)
338         defer v.Teardown()
339         v.locker.Lock()
340         ctx, cancel := context.WithCancel(context.Background())
341         go func() {
342                 time.Sleep(50 * time.Millisecond)
343                 cancel()
344                 time.Sleep(50 * time.Millisecond)
345                 v.locker.Unlock()
346         }()
347         err := v.Put(ctx, TestHash, TestBlock)
348         if err != context.Canceled {
349                 t.Errorf("Put() returned %s -- expected short read / canceled", err)
350         }
351 }
352
353 func TestUnixVolumeContextCancelGet(t *testing.T) {
354         v := NewTestableUnixVolume(t, false, false)
355         defer v.Teardown()
356         bpath := v.blockPath(TestHash)
357         v.PutRaw(TestHash, TestBlock)
358         os.Remove(bpath)
359         err := syscall.Mkfifo(bpath, 0600)
360         if err != nil {
361                 t.Fatalf("Mkfifo %s: %s", bpath, err)
362         }
363         defer os.Remove(bpath)
364         ctx, cancel := context.WithCancel(context.Background())
365         go func() {
366                 time.Sleep(50 * time.Millisecond)
367                 cancel()
368         }()
369         buf := make([]byte, len(TestBlock))
370         n, err := v.Get(ctx, TestHash, buf)
371         if n == len(TestBlock) || err != context.Canceled {
372                 t.Errorf("Get() returned %d, %s -- expected short read / canceled", n, err)
373         }
374 }
375
376 var _ = check.Suite(&UnixVolumeSuite{})
377
378 type UnixVolumeSuite struct {
379         volume *TestableUnixVolume
380 }
381
382 func (s *UnixVolumeSuite) TearDownTest(c *check.C) {
383         if s.volume != nil {
384                 s.volume.Teardown()
385         }
386 }
387
388 func (s *UnixVolumeSuite) TestStats(c *check.C) {
389         s.volume = NewTestableUnixVolume(c, false, false)
390         stats := func() string {
391                 buf, err := json.Marshal(s.volume.InternalStats())
392                 c.Check(err, check.IsNil)
393                 return string(buf)
394         }
395
396         c.Check(stats(), check.Matches, `.*"StatOps":0,.*`)
397         c.Check(stats(), check.Matches, `.*"Errors":0,.*`)
398
399         loc := "acbd18db4cc2f85cedef654fccc4a4d8"
400         _, err := s.volume.Get(context.Background(), loc, make([]byte, 3))
401         c.Check(err, check.NotNil)
402         c.Check(stats(), check.Matches, `.*"StatOps":[^0],.*`)
403         c.Check(stats(), check.Matches, `.*"Errors":[^0],.*`)
404         c.Check(stats(), check.Matches, `.*"\*os\.PathError":[^0].*`)
405         c.Check(stats(), check.Matches, `.*"InBytes":0,.*`)
406         c.Check(stats(), check.Matches, `.*"OpenOps":0,.*`)
407         c.Check(stats(), check.Matches, `.*"CreateOps":0,.*`)
408
409         err = s.volume.Put(context.Background(), loc, []byte("foo"))
410         c.Check(err, check.IsNil)
411         c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
412         c.Check(stats(), check.Matches, `.*"CreateOps":1,.*`)
413         c.Check(stats(), check.Matches, `.*"OpenOps":0,.*`)
414         c.Check(stats(), check.Matches, `.*"UtimesOps":0,.*`)
415
416         err = s.volume.Touch(loc)
417         c.Check(err, check.IsNil)
418         c.Check(stats(), check.Matches, `.*"FlockOps":1,.*`)
419         c.Check(stats(), check.Matches, `.*"OpenOps":1,.*`)
420         c.Check(stats(), check.Matches, `.*"UtimesOps":1,.*`)
421
422         _, err = s.volume.Get(context.Background(), loc, make([]byte, 3))
423         c.Check(err, check.IsNil)
424         err = s.volume.Compare(context.Background(), loc, []byte("foo"))
425         c.Check(err, check.IsNil)
426         c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
427         c.Check(stats(), check.Matches, `.*"OpenOps":3,.*`)
428
429         err = s.volume.Trash(loc)
430         c.Check(err, check.IsNil)
431         c.Check(stats(), check.Matches, `.*"FlockOps":2,.*`)
432 }
433
434 func (s *UnixVolumeSuite) TestConfig(c *check.C) {
435         var cfg Config
436         err := yaml.Unmarshal([]byte(`
437 Volumes:
438   - Type: Directory
439     StorageClasses: ["class_a", "class_b"]
440 `), &cfg)
441
442         c.Check(err, check.IsNil)
443         c.Check(cfg.Volumes[0].GetStorageClasses(), check.DeepEquals, []string{"class_a", "class_b"})
444 }