Merge branch '12934-cwl-conformance' refs #12934
[arvados.git] / services / keepstore / volume_unix_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "context"
10         "encoding/json"
11         "errors"
12         "fmt"
13         "io"
14         "io/ioutil"
15         "os"
16         "strings"
17         "sync"
18         "syscall"
19         "testing"
20         "time"
21
22         check "gopkg.in/check.v1"
23 )
24
25 type TestableUnixVolume struct {
26         UnixVolume
27         t TB
28 }
29
30 func NewTestableUnixVolume(t TB, serialize bool, readonly bool) *TestableUnixVolume {
31         d, err := ioutil.TempDir("", "volume_test")
32         if err != nil {
33                 t.Fatal(err)
34         }
35         var locker sync.Locker
36         if serialize {
37                 locker = &sync.Mutex{}
38         }
39         return &TestableUnixVolume{
40                 UnixVolume: UnixVolume{
41                         Root:     d,
42                         ReadOnly: readonly,
43                         locker:   locker,
44                 },
45                 t: t,
46         }
47 }
48
49 // PutRaw writes a Keep block directly into a UnixVolume, even if
50 // the volume is readonly.
51 func (v *TestableUnixVolume) PutRaw(locator string, data []byte) {
52         defer func(orig bool) {
53                 v.ReadOnly = orig
54         }(v.ReadOnly)
55         v.ReadOnly = false
56         err := v.Put(context.Background(), locator, data)
57         if err != nil {
58                 v.t.Fatal(err)
59         }
60 }
61
62 func (v *TestableUnixVolume) TouchWithDate(locator string, lastPut time.Time) {
63         err := syscall.Utime(v.blockPath(locator), &syscall.Utimbuf{lastPut.Unix(), lastPut.Unix()})
64         if err != nil {
65                 v.t.Fatal(err)
66         }
67 }
68
69 func (v *TestableUnixVolume) Teardown() {
70         if err := os.RemoveAll(v.Root); err != nil {
71                 v.t.Fatal(err)
72         }
73 }
74
75 // serialize = false; readonly = false
76 func TestUnixVolumeWithGenericTests(t *testing.T) {
77         DoGenericVolumeTests(t, func(t TB) TestableVolume {
78                 return NewTestableUnixVolume(t, false, false)
79         })
80 }
81
82 // serialize = false; readonly = true
83 func TestUnixVolumeWithGenericTestsReadOnly(t *testing.T) {
84         DoGenericVolumeTests(t, func(t TB) TestableVolume {
85                 return NewTestableUnixVolume(t, false, true)
86         })
87 }
88
89 // serialize = true; readonly = false
90 func TestUnixVolumeWithGenericTestsSerialized(t *testing.T) {
91         DoGenericVolumeTests(t, func(t TB) TestableVolume {
92                 return NewTestableUnixVolume(t, true, false)
93         })
94 }
95
96 // serialize = false; readonly = false
97 func TestUnixVolumeHandlersWithGenericVolumeTests(t *testing.T) {
98         DoHandlersWithGenericVolumeTests(t, func(t TB) (*RRVolumeManager, []TestableVolume) {
99                 vols := make([]Volume, 2)
100                 testableUnixVols := make([]TestableVolume, 2)
101
102                 for i := range vols {
103                         v := NewTestableUnixVolume(t, false, false)
104                         vols[i] = v
105                         testableUnixVols[i] = v
106                 }
107
108                 return MakeRRVolumeManager(vols), testableUnixVols
109         })
110 }
111
112 func TestReplicationDefault1(t *testing.T) {
113         v := &UnixVolume{
114                 Root:     "/",
115                 ReadOnly: true,
116         }
117         if err := v.Start(); err != nil {
118                 t.Error(err)
119         }
120         if got := v.Replication(); got != 1 {
121                 t.Errorf("Replication() returned %d, expected 1 if no config given", got)
122         }
123 }
124
125 func TestGetNotFound(t *testing.T) {
126         v := NewTestableUnixVolume(t, false, false)
127         defer v.Teardown()
128         v.Put(context.Background(), TestHash, TestBlock)
129
130         buf := make([]byte, BlockSize)
131         n, err := v.Get(context.Background(), TestHash2, buf)
132         switch {
133         case os.IsNotExist(err):
134                 break
135         case err == nil:
136                 t.Errorf("Read should have failed, returned %+q", buf[:n])
137         default:
138                 t.Errorf("Read expected ErrNotExist, got: %s", err)
139         }
140 }
141
142 func TestPut(t *testing.T) {
143         v := NewTestableUnixVolume(t, false, false)
144         defer v.Teardown()
145
146         err := v.Put(context.Background(), TestHash, TestBlock)
147         if err != nil {
148                 t.Error(err)
149         }
150         p := fmt.Sprintf("%s/%s/%s", v.Root, TestHash[:3], TestHash)
151         if buf, err := ioutil.ReadFile(p); err != nil {
152                 t.Error(err)
153         } else if bytes.Compare(buf, TestBlock) != 0 {
154                 t.Errorf("Write should have stored %s, did store %s",
155                         string(TestBlock), string(buf))
156         }
157 }
158
159 func TestPutBadVolume(t *testing.T) {
160         v := NewTestableUnixVolume(t, false, false)
161         defer v.Teardown()
162
163         os.Chmod(v.Root, 000)
164         err := v.Put(context.Background(), TestHash, TestBlock)
165         if err == nil {
166                 t.Error("Write should have failed")
167         }
168 }
169
170 func TestUnixVolumeReadonly(t *testing.T) {
171         v := NewTestableUnixVolume(t, false, true)
172         defer v.Teardown()
173
174         v.PutRaw(TestHash, TestBlock)
175
176         buf := make([]byte, BlockSize)
177         _, err := v.Get(context.Background(), TestHash, buf)
178         if err != nil {
179                 t.Errorf("got err %v, expected nil", err)
180         }
181
182         err = v.Put(context.Background(), TestHash, TestBlock)
183         if err != MethodDisabledError {
184                 t.Errorf("got err %v, expected MethodDisabledError", err)
185         }
186
187         err = v.Touch(TestHash)
188         if err != MethodDisabledError {
189                 t.Errorf("got err %v, expected MethodDisabledError", err)
190         }
191
192         err = v.Trash(TestHash)
193         if err != MethodDisabledError {
194                 t.Errorf("got err %v, expected MethodDisabledError", err)
195         }
196 }
197
198 func TestIsFull(t *testing.T) {
199         v := NewTestableUnixVolume(t, false, false)
200         defer v.Teardown()
201
202         fullPath := v.Root + "/full"
203         now := fmt.Sprintf("%d", time.Now().Unix())
204         os.Symlink(now, fullPath)
205         if !v.IsFull() {
206                 t.Errorf("%s: claims not to be full", v)
207         }
208         os.Remove(fullPath)
209
210         // Test with an expired /full link.
211         expired := fmt.Sprintf("%d", time.Now().Unix()-3605)
212         os.Symlink(expired, fullPath)
213         if v.IsFull() {
214                 t.Errorf("%s: should no longer be full", v)
215         }
216 }
217
218 func TestNodeStatus(t *testing.T) {
219         v := NewTestableUnixVolume(t, false, false)
220         defer v.Teardown()
221
222         // Get node status and make a basic sanity check.
223         volinfo := v.Status()
224         if volinfo.MountPoint != v.Root {
225                 t.Errorf("GetNodeStatus mount_point %s, expected %s", volinfo.MountPoint, v.Root)
226         }
227         if volinfo.DeviceNum == 0 {
228                 t.Errorf("uninitialized device_num in %v", volinfo)
229         }
230         if volinfo.BytesFree == 0 {
231                 t.Errorf("uninitialized bytes_free in %v", volinfo)
232         }
233         if volinfo.BytesUsed == 0 {
234                 t.Errorf("uninitialized bytes_used in %v", volinfo)
235         }
236 }
237
238 func TestUnixVolumeGetFuncWorkerError(t *testing.T) {
239         v := NewTestableUnixVolume(t, false, false)
240         defer v.Teardown()
241
242         v.Put(context.Background(), TestHash, TestBlock)
243         mockErr := errors.New("Mock error")
244         err := v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error {
245                 return mockErr
246         })
247         if err != mockErr {
248                 t.Errorf("Got %v, expected %v", err, mockErr)
249         }
250 }
251
252 func TestUnixVolumeGetFuncFileError(t *testing.T) {
253         v := NewTestableUnixVolume(t, false, false)
254         defer v.Teardown()
255
256         funcCalled := false
257         err := v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error {
258                 funcCalled = true
259                 return nil
260         })
261         if err == nil {
262                 t.Errorf("Expected error opening non-existent file")
263         }
264         if funcCalled {
265                 t.Errorf("Worker func should not have been called")
266         }
267 }
268
269 func TestUnixVolumeGetFuncWorkerWaitsOnMutex(t *testing.T) {
270         v := NewTestableUnixVolume(t, false, false)
271         defer v.Teardown()
272
273         v.Put(context.Background(), TestHash, TestBlock)
274
275         mtx := NewMockMutex()
276         v.locker = mtx
277
278         funcCalled := make(chan struct{})
279         go v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error {
280                 funcCalled <- struct{}{}
281                 return nil
282         })
283         select {
284         case mtx.AllowLock <- struct{}{}:
285         case <-funcCalled:
286                 t.Fatal("Function was called before mutex was acquired")
287         case <-time.After(5 * time.Second):
288                 t.Fatal("Timed out before mutex was acquired")
289         }
290         select {
291         case <-funcCalled:
292         case mtx.AllowUnlock <- struct{}{}:
293                 t.Fatal("Mutex was released before function was called")
294         case <-time.After(5 * time.Second):
295                 t.Fatal("Timed out waiting for funcCalled")
296         }
297         select {
298         case mtx.AllowUnlock <- struct{}{}:
299         case <-time.After(5 * time.Second):
300                 t.Fatal("Timed out waiting for getFunc() to release mutex")
301         }
302 }
303
304 func TestUnixVolumeCompare(t *testing.T) {
305         v := NewTestableUnixVolume(t, false, false)
306         defer v.Teardown()
307
308         v.Put(context.Background(), TestHash, TestBlock)
309         err := v.Compare(context.Background(), TestHash, TestBlock)
310         if err != nil {
311                 t.Errorf("Got err %q, expected nil", err)
312         }
313
314         err = v.Compare(context.Background(), TestHash, []byte("baddata"))
315         if err != CollisionError {
316                 t.Errorf("Got err %q, expected %q", err, CollisionError)
317         }
318
319         v.Put(context.Background(), TestHash, []byte("baddata"))
320         err = v.Compare(context.Background(), TestHash, TestBlock)
321         if err != DiskHashError {
322                 t.Errorf("Got err %q, expected %q", err, DiskHashError)
323         }
324
325         p := fmt.Sprintf("%s/%s/%s", v.Root, TestHash[:3], TestHash)
326         os.Chmod(p, 000)
327         err = v.Compare(context.Background(), TestHash, TestBlock)
328         if err == nil || strings.Index(err.Error(), "permission denied") < 0 {
329                 t.Errorf("Got err %q, expected %q", err, "permission denied")
330         }
331 }
332
333 func TestUnixVolumeContextCancelPut(t *testing.T) {
334         v := NewTestableUnixVolume(t, true, false)
335         defer v.Teardown()
336         v.locker.Lock()
337         ctx, cancel := context.WithCancel(context.Background())
338         go func() {
339                 time.Sleep(50 * time.Millisecond)
340                 cancel()
341                 time.Sleep(50 * time.Millisecond)
342                 v.locker.Unlock()
343         }()
344         err := v.Put(ctx, TestHash, TestBlock)
345         if err != context.Canceled {
346                 t.Errorf("Put() returned %s -- expected short read / canceled", err)
347         }
348 }
349
350 func TestUnixVolumeContextCancelGet(t *testing.T) {
351         v := NewTestableUnixVolume(t, false, false)
352         defer v.Teardown()
353         bpath := v.blockPath(TestHash)
354         v.PutRaw(TestHash, TestBlock)
355         os.Remove(bpath)
356         err := syscall.Mkfifo(bpath, 0600)
357         if err != nil {
358                 t.Fatalf("Mkfifo %s: %s", bpath, err)
359         }
360         defer os.Remove(bpath)
361         ctx, cancel := context.WithCancel(context.Background())
362         go func() {
363                 time.Sleep(50 * time.Millisecond)
364                 cancel()
365         }()
366         buf := make([]byte, len(TestBlock))
367         n, err := v.Get(ctx, TestHash, buf)
368         if n == len(TestBlock) || err != context.Canceled {
369                 t.Errorf("Get() returned %d, %s -- expected short read / canceled", n, err)
370         }
371 }
372
373 var _ = check.Suite(&UnixVolumeSuite{})
374
375 type UnixVolumeSuite struct {
376         volume *TestableUnixVolume
377 }
378
379 func (s *UnixVolumeSuite) TearDownTest(c *check.C) {
380         if s.volume != nil {
381                 s.volume.Teardown()
382         }
383 }
384
385 func (s *UnixVolumeSuite) TestStats(c *check.C) {
386         s.volume = NewTestableUnixVolume(c, false, false)
387         stats := func() string {
388                 buf, err := json.Marshal(s.volume.InternalStats())
389                 c.Check(err, check.IsNil)
390                 return string(buf)
391         }
392
393         c.Check(stats(), check.Matches, `.*"StatOps":0,.*`)
394         c.Check(stats(), check.Matches, `.*"Errors":0,.*`)
395
396         loc := "acbd18db4cc2f85cedef654fccc4a4d8"
397         _, err := s.volume.Get(context.Background(), loc, make([]byte, 3))
398         c.Check(err, check.NotNil)
399         c.Check(stats(), check.Matches, `.*"StatOps":[^0],.*`)
400         c.Check(stats(), check.Matches, `.*"Errors":[^0],.*`)
401         c.Check(stats(), check.Matches, `.*"\*os\.PathError":[^0].*`)
402         c.Check(stats(), check.Matches, `.*"InBytes":0,.*`)
403         c.Check(stats(), check.Matches, `.*"OpenOps":0,.*`)
404         c.Check(stats(), check.Matches, `.*"CreateOps":0,.*`)
405
406         err = s.volume.Put(context.Background(), loc, []byte("foo"))
407         c.Check(err, check.IsNil)
408         c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
409         c.Check(stats(), check.Matches, `.*"CreateOps":1,.*`)
410         c.Check(stats(), check.Matches, `.*"OpenOps":0,.*`)
411         c.Check(stats(), check.Matches, `.*"UtimesOps":0,.*`)
412
413         err = s.volume.Touch(loc)
414         c.Check(err, check.IsNil)
415         c.Check(stats(), check.Matches, `.*"FlockOps":1,.*`)
416         c.Check(stats(), check.Matches, `.*"OpenOps":1,.*`)
417         c.Check(stats(), check.Matches, `.*"UtimesOps":1,.*`)
418
419         _, err = s.volume.Get(context.Background(), loc, make([]byte, 3))
420         c.Check(err, check.IsNil)
421         err = s.volume.Compare(context.Background(), loc, []byte("foo"))
422         c.Check(err, check.IsNil)
423         c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
424         c.Check(stats(), check.Matches, `.*"OpenOps":3,.*`)
425
426         err = s.volume.Trash(loc)
427         c.Check(err, check.IsNil)
428         c.Check(stats(), check.Matches, `.*"FlockOps":2,.*`)
429 }