10813: Merge branch 'master' into 10813-arv-put-six-threads
[arvados.git] / services / keepstore / volume_unix_test.go
1 package main
2
3 import (
4         "bytes"
5         "context"
6         "encoding/json"
7         "errors"
8         "fmt"
9         "io"
10         "io/ioutil"
11         "os"
12         "strings"
13         "sync"
14         "syscall"
15         "testing"
16         "time"
17
18         check "gopkg.in/check.v1"
19 )
20
21 type TestableUnixVolume struct {
22         UnixVolume
23         t TB
24 }
25
26 func NewTestableUnixVolume(t TB, serialize bool, readonly bool) *TestableUnixVolume {
27         d, err := ioutil.TempDir("", "volume_test")
28         if err != nil {
29                 t.Fatal(err)
30         }
31         var locker sync.Locker
32         if serialize {
33                 locker = &sync.Mutex{}
34         }
35         return &TestableUnixVolume{
36                 UnixVolume: UnixVolume{
37                         Root:     d,
38                         ReadOnly: readonly,
39                         locker:   locker,
40                 },
41                 t: t,
42         }
43 }
44
45 // PutRaw writes a Keep block directly into a UnixVolume, even if
46 // the volume is readonly.
47 func (v *TestableUnixVolume) PutRaw(locator string, data []byte) {
48         defer func(orig bool) {
49                 v.ReadOnly = orig
50         }(v.ReadOnly)
51         v.ReadOnly = false
52         err := v.Put(context.Background(), locator, data)
53         if err != nil {
54                 v.t.Fatal(err)
55         }
56 }
57
58 func (v *TestableUnixVolume) TouchWithDate(locator string, lastPut time.Time) {
59         err := syscall.Utime(v.blockPath(locator), &syscall.Utimbuf{lastPut.Unix(), lastPut.Unix()})
60         if err != nil {
61                 v.t.Fatal(err)
62         }
63 }
64
65 func (v *TestableUnixVolume) Teardown() {
66         if err := os.RemoveAll(v.Root); err != nil {
67                 v.t.Fatal(err)
68         }
69 }
70
71 // serialize = false; readonly = false
72 func TestUnixVolumeWithGenericTests(t *testing.T) {
73         DoGenericVolumeTests(t, func(t TB) TestableVolume {
74                 return NewTestableUnixVolume(t, false, false)
75         })
76 }
77
78 // serialize = false; readonly = true
79 func TestUnixVolumeWithGenericTestsReadOnly(t *testing.T) {
80         DoGenericVolumeTests(t, func(t TB) TestableVolume {
81                 return NewTestableUnixVolume(t, false, true)
82         })
83 }
84
85 // serialize = true; readonly = false
86 func TestUnixVolumeWithGenericTestsSerialized(t *testing.T) {
87         DoGenericVolumeTests(t, func(t TB) TestableVolume {
88                 return NewTestableUnixVolume(t, true, false)
89         })
90 }
91
92 // serialize = false; readonly = false
93 func TestUnixVolumeHandlersWithGenericVolumeTests(t *testing.T) {
94         DoHandlersWithGenericVolumeTests(t, func(t TB) (*RRVolumeManager, []TestableVolume) {
95                 vols := make([]Volume, 2)
96                 testableUnixVols := make([]TestableVolume, 2)
97
98                 for i := range vols {
99                         v := NewTestableUnixVolume(t, false, false)
100                         vols[i] = v
101                         testableUnixVols[i] = v
102                 }
103
104                 return MakeRRVolumeManager(vols), testableUnixVols
105         })
106 }
107
108 func TestReplicationDefault1(t *testing.T) {
109         v := &UnixVolume{
110                 Root:     "/",
111                 ReadOnly: true,
112         }
113         if err := v.Start(); err != nil {
114                 t.Error(err)
115         }
116         if got := v.Replication(); got != 1 {
117                 t.Errorf("Replication() returned %d, expected 1 if no config given", got)
118         }
119 }
120
121 func TestGetNotFound(t *testing.T) {
122         v := NewTestableUnixVolume(t, false, false)
123         defer v.Teardown()
124         v.Put(context.Background(), TestHash, TestBlock)
125
126         buf := make([]byte, BlockSize)
127         n, err := v.Get(context.Background(), TestHash2, buf)
128         switch {
129         case os.IsNotExist(err):
130                 break
131         case err == nil:
132                 t.Errorf("Read should have failed, returned %+q", buf[:n])
133         default:
134                 t.Errorf("Read expected ErrNotExist, got: %s", err)
135         }
136 }
137
138 func TestPut(t *testing.T) {
139         v := NewTestableUnixVolume(t, false, false)
140         defer v.Teardown()
141
142         err := v.Put(context.Background(), TestHash, TestBlock)
143         if err != nil {
144                 t.Error(err)
145         }
146         p := fmt.Sprintf("%s/%s/%s", v.Root, TestHash[:3], TestHash)
147         if buf, err := ioutil.ReadFile(p); err != nil {
148                 t.Error(err)
149         } else if bytes.Compare(buf, TestBlock) != 0 {
150                 t.Errorf("Write should have stored %s, did store %s",
151                         string(TestBlock), string(buf))
152         }
153 }
154
155 func TestPutBadVolume(t *testing.T) {
156         v := NewTestableUnixVolume(t, false, false)
157         defer v.Teardown()
158
159         os.Chmod(v.Root, 000)
160         err := v.Put(context.Background(), TestHash, TestBlock)
161         if err == nil {
162                 t.Error("Write should have failed")
163         }
164 }
165
166 func TestUnixVolumeReadonly(t *testing.T) {
167         v := NewTestableUnixVolume(t, false, true)
168         defer v.Teardown()
169
170         v.PutRaw(TestHash, TestBlock)
171
172         buf := make([]byte, BlockSize)
173         _, err := v.Get(context.Background(), TestHash, buf)
174         if err != nil {
175                 t.Errorf("got err %v, expected nil", err)
176         }
177
178         err = v.Put(context.Background(), TestHash, TestBlock)
179         if err != MethodDisabledError {
180                 t.Errorf("got err %v, expected MethodDisabledError", err)
181         }
182
183         err = v.Touch(TestHash)
184         if err != MethodDisabledError {
185                 t.Errorf("got err %v, expected MethodDisabledError", err)
186         }
187
188         err = v.Trash(TestHash)
189         if err != MethodDisabledError {
190                 t.Errorf("got err %v, expected MethodDisabledError", err)
191         }
192 }
193
194 func TestIsFull(t *testing.T) {
195         v := NewTestableUnixVolume(t, false, false)
196         defer v.Teardown()
197
198         fullPath := v.Root + "/full"
199         now := fmt.Sprintf("%d", time.Now().Unix())
200         os.Symlink(now, fullPath)
201         if !v.IsFull() {
202                 t.Errorf("%s: claims not to be full", v)
203         }
204         os.Remove(fullPath)
205
206         // Test with an expired /full link.
207         expired := fmt.Sprintf("%d", time.Now().Unix()-3605)
208         os.Symlink(expired, fullPath)
209         if v.IsFull() {
210                 t.Errorf("%s: should no longer be full", v)
211         }
212 }
213
214 func TestNodeStatus(t *testing.T) {
215         v := NewTestableUnixVolume(t, false, false)
216         defer v.Teardown()
217
218         // Get node status and make a basic sanity check.
219         volinfo := v.Status()
220         if volinfo.MountPoint != v.Root {
221                 t.Errorf("GetNodeStatus mount_point %s, expected %s", volinfo.MountPoint, v.Root)
222         }
223         if volinfo.DeviceNum == 0 {
224                 t.Errorf("uninitialized device_num in %v", volinfo)
225         }
226         if volinfo.BytesFree == 0 {
227                 t.Errorf("uninitialized bytes_free in %v", volinfo)
228         }
229         if volinfo.BytesUsed == 0 {
230                 t.Errorf("uninitialized bytes_used in %v", volinfo)
231         }
232 }
233
234 func TestUnixVolumeGetFuncWorkerError(t *testing.T) {
235         v := NewTestableUnixVolume(t, false, false)
236         defer v.Teardown()
237
238         v.Put(context.Background(), TestHash, TestBlock)
239         mockErr := errors.New("Mock error")
240         err := v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error {
241                 return mockErr
242         })
243         if err != mockErr {
244                 t.Errorf("Got %v, expected %v", err, mockErr)
245         }
246 }
247
248 func TestUnixVolumeGetFuncFileError(t *testing.T) {
249         v := NewTestableUnixVolume(t, false, false)
250         defer v.Teardown()
251
252         funcCalled := false
253         err := v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error {
254                 funcCalled = true
255                 return nil
256         })
257         if err == nil {
258                 t.Errorf("Expected error opening non-existent file")
259         }
260         if funcCalled {
261                 t.Errorf("Worker func should not have been called")
262         }
263 }
264
265 func TestUnixVolumeGetFuncWorkerWaitsOnMutex(t *testing.T) {
266         v := NewTestableUnixVolume(t, false, false)
267         defer v.Teardown()
268
269         v.Put(context.Background(), TestHash, TestBlock)
270
271         mtx := NewMockMutex()
272         v.locker = mtx
273
274         funcCalled := make(chan struct{})
275         go v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error {
276                 funcCalled <- struct{}{}
277                 return nil
278         })
279         select {
280         case mtx.AllowLock <- struct{}{}:
281         case <-funcCalled:
282                 t.Fatal("Function was called before mutex was acquired")
283         case <-time.After(5 * time.Second):
284                 t.Fatal("Timed out before mutex was acquired")
285         }
286         select {
287         case <-funcCalled:
288         case mtx.AllowUnlock <- struct{}{}:
289                 t.Fatal("Mutex was released before function was called")
290         case <-time.After(5 * time.Second):
291                 t.Fatal("Timed out waiting for funcCalled")
292         }
293         select {
294         case mtx.AllowUnlock <- struct{}{}:
295         case <-time.After(5 * time.Second):
296                 t.Fatal("Timed out waiting for getFunc() to release mutex")
297         }
298 }
299
300 func TestUnixVolumeCompare(t *testing.T) {
301         v := NewTestableUnixVolume(t, false, false)
302         defer v.Teardown()
303
304         v.Put(context.Background(), TestHash, TestBlock)
305         err := v.Compare(context.Background(), TestHash, TestBlock)
306         if err != nil {
307                 t.Errorf("Got err %q, expected nil", err)
308         }
309
310         err = v.Compare(context.Background(), TestHash, []byte("baddata"))
311         if err != CollisionError {
312                 t.Errorf("Got err %q, expected %q", err, CollisionError)
313         }
314
315         v.Put(context.Background(), TestHash, []byte("baddata"))
316         err = v.Compare(context.Background(), TestHash, TestBlock)
317         if err != DiskHashError {
318                 t.Errorf("Got err %q, expected %q", err, DiskHashError)
319         }
320
321         p := fmt.Sprintf("%s/%s/%s", v.Root, TestHash[:3], TestHash)
322         os.Chmod(p, 000)
323         err = v.Compare(context.Background(), TestHash, TestBlock)
324         if err == nil || strings.Index(err.Error(), "permission denied") < 0 {
325                 t.Errorf("Got err %q, expected %q", err, "permission denied")
326         }
327 }
328
329 func TestUnixVolumeContextCancelPut(t *testing.T) {
330         v := NewTestableUnixVolume(t, true, false)
331         defer v.Teardown()
332         v.locker.Lock()
333         ctx, cancel := context.WithCancel(context.Background())
334         go func() {
335                 time.Sleep(50 * time.Millisecond)
336                 cancel()
337                 time.Sleep(50 * time.Millisecond)
338                 v.locker.Unlock()
339         }()
340         err := v.Put(ctx, TestHash, TestBlock)
341         if err != context.Canceled {
342                 t.Errorf("Put() returned %s -- expected short read / canceled", err)
343         }
344 }
345
346 func TestUnixVolumeContextCancelGet(t *testing.T) {
347         v := NewTestableUnixVolume(t, false, false)
348         defer v.Teardown()
349         bpath := v.blockPath(TestHash)
350         v.PutRaw(TestHash, TestBlock)
351         os.Remove(bpath)
352         err := syscall.Mkfifo(bpath, 0600)
353         if err != nil {
354                 t.Fatalf("Mkfifo %s: %s", bpath, err)
355         }
356         defer os.Remove(bpath)
357         ctx, cancel := context.WithCancel(context.Background())
358         go func() {
359                 time.Sleep(50 * time.Millisecond)
360                 cancel()
361         }()
362         buf := make([]byte, len(TestBlock))
363         n, err := v.Get(ctx, TestHash, buf)
364         if n == len(TestBlock) || err != context.Canceled {
365                 t.Errorf("Get() returned %d, %s -- expected short read / canceled", n, err)
366         }
367 }
368
369 var _ = check.Suite(&UnixVolumeSuite{})
370
371 type UnixVolumeSuite struct {
372         volume *TestableUnixVolume
373 }
374
375 func (s *UnixVolumeSuite) TearDownTest(c *check.C) {
376         if s.volume != nil {
377                 s.volume.Teardown()
378         }
379 }
380
381 func (s *UnixVolumeSuite) TestStats(c *check.C) {
382         s.volume = NewTestableUnixVolume(c, false, false)
383         stats := func() string {
384                 buf, err := json.Marshal(s.volume.InternalStats())
385                 c.Check(err, check.IsNil)
386                 return string(buf)
387         }
388
389         c.Check(stats(), check.Matches, `.*"StatOps":0,.*`)
390         c.Check(stats(), check.Matches, `.*"Errors":0,.*`)
391
392         loc := "acbd18db4cc2f85cedef654fccc4a4d8"
393         _, err := s.volume.Get(context.Background(), loc, make([]byte, 3))
394         c.Check(err, check.NotNil)
395         c.Check(stats(), check.Matches, `.*"StatOps":[^0],.*`)
396         c.Check(stats(), check.Matches, `.*"Errors":[^0],.*`)
397         c.Check(stats(), check.Matches, `.*"\*os\.PathError":[^0].*`)
398         c.Check(stats(), check.Matches, `.*"InBytes":0,.*`)
399         c.Check(stats(), check.Matches, `.*"OpenOps":0,.*`)
400         c.Check(stats(), check.Matches, `.*"CreateOps":0,.*`)
401
402         err = s.volume.Put(context.Background(), loc, []byte("foo"))
403         c.Check(err, check.IsNil)
404         c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
405         c.Check(stats(), check.Matches, `.*"CreateOps":1,.*`)
406         c.Check(stats(), check.Matches, `.*"OpenOps":0,.*`)
407         c.Check(stats(), check.Matches, `.*"UtimesOps":0,.*`)
408
409         err = s.volume.Touch(loc)
410         c.Check(err, check.IsNil)
411         c.Check(stats(), check.Matches, `.*"FlockOps":1,.*`)
412         c.Check(stats(), check.Matches, `.*"OpenOps":1,.*`)
413         c.Check(stats(), check.Matches, `.*"UtimesOps":1,.*`)
414
415         _, err = s.volume.Get(context.Background(), loc, make([]byte, 3))
416         c.Check(err, check.IsNil)
417         err = s.volume.Compare(context.Background(), loc, []byte("foo"))
418         c.Check(err, check.IsNil)
419         c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
420         c.Check(stats(), check.Matches, `.*"OpenOps":3,.*`)
421
422         err = s.volume.Trash(loc)
423         c.Check(err, check.IsNil)
424         c.Check(stats(), check.Matches, `.*"FlockOps":2,.*`)
425 }