7121: Add collisionOrCorrupt->DiskHashError test cases where the mismatched data...
[arvados.git] / services / keepstore / volume_unix_test.go
1 package main
2
3 import (
4         "bytes"
5         "errors"
6         "fmt"
7         "io"
8         "io/ioutil"
9         "os"
10         "regexp"
11         "sort"
12         "strings"
13         "sync"
14         "syscall"
15         "testing"
16         "time"
17 )
18
19 func TempUnixVolume(t *testing.T, serialize bool, readonly bool) *UnixVolume {
20         d, err := ioutil.TempDir("", "volume_test")
21         if err != nil {
22                 t.Fatal(err)
23         }
24         var locker sync.Locker
25         if serialize {
26                 locker = &sync.Mutex{}
27         }
28         return &UnixVolume{
29                 root:     d,
30                 locker:   locker,
31                 readonly: readonly,
32         }
33 }
34
35 func _teardown(v *UnixVolume) {
36         os.RemoveAll(v.root)
37 }
38
39 // _store writes a Keep block directly into a UnixVolume, bypassing
40 // the overhead and safeguards of Put(). Useful for storing bogus data
41 // and isolating unit tests from Put() behavior.
42 func _store(t *testing.T, vol *UnixVolume, filename string, block []byte) {
43         blockdir := fmt.Sprintf("%s/%s", vol.root, filename[:3])
44         if err := os.MkdirAll(blockdir, 0755); err != nil {
45                 t.Fatal(err)
46         }
47
48         blockpath := fmt.Sprintf("%s/%s", blockdir, filename)
49         if f, err := os.Create(blockpath); err == nil {
50                 f.Write(block)
51                 f.Close()
52         } else {
53                 t.Fatal(err)
54         }
55 }
56
57 func TestGet(t *testing.T) {
58         v := TempUnixVolume(t, false, false)
59         defer _teardown(v)
60         _store(t, v, TEST_HASH, TEST_BLOCK)
61
62         buf, err := v.Get(TEST_HASH)
63         if err != nil {
64                 t.Error(err)
65         }
66         if bytes.Compare(buf, TEST_BLOCK) != 0 {
67                 t.Errorf("expected %s, got %s", string(TEST_BLOCK), string(buf))
68         }
69 }
70
71 func TestGetNotFound(t *testing.T) {
72         v := TempUnixVolume(t, false, false)
73         defer _teardown(v)
74         _store(t, v, TEST_HASH, TEST_BLOCK)
75
76         buf, err := v.Get(TEST_HASH_2)
77         switch {
78         case os.IsNotExist(err):
79                 break
80         case err == nil:
81                 t.Errorf("Read should have failed, returned %s", string(buf))
82         default:
83                 t.Errorf("Read expected ErrNotExist, got: %s", err)
84         }
85 }
86
87 func TestIndexTo(t *testing.T) {
88         v := TempUnixVolume(t, false, false)
89         defer _teardown(v)
90
91         _store(t, v, TEST_HASH, TEST_BLOCK)
92         _store(t, v, TEST_HASH_2, TEST_BLOCK_2)
93         _store(t, v, TEST_HASH_3, TEST_BLOCK_3)
94
95         buf := new(bytes.Buffer)
96         v.IndexTo("", buf)
97         index_rows := strings.Split(string(buf.Bytes()), "\n")
98         sort.Strings(index_rows)
99         sorted_index := strings.Join(index_rows, "\n")
100         m, err := regexp.MatchString(
101                 `^\n`+TEST_HASH+`\+\d+ \d+\n`+
102                         TEST_HASH_3+`\+\d+ \d+\n`+
103                         TEST_HASH_2+`\+\d+ \d+$`,
104                 sorted_index)
105         if err != nil {
106                 t.Error(err)
107         } else if !m {
108                 t.Errorf("Got index %q for empty prefix", sorted_index)
109         }
110
111         for _, prefix := range []string{"f", "f15", "f15ac"} {
112                 buf = new(bytes.Buffer)
113                 v.IndexTo(prefix, buf)
114                 m, err := regexp.MatchString(`^`+TEST_HASH_2+`\+\d+ \d+\n$`, string(buf.Bytes()))
115                 if err != nil {
116                         t.Error(err)
117                 } else if !m {
118                         t.Errorf("Got index %q for prefix %q", string(buf.Bytes()), prefix)
119                 }
120         }
121 }
122
123 func TestPut(t *testing.T) {
124         v := TempUnixVolume(t, false, false)
125         defer _teardown(v)
126
127         err := v.Put(TEST_HASH, TEST_BLOCK)
128         if err != nil {
129                 t.Error(err)
130         }
131         p := fmt.Sprintf("%s/%s/%s", v.root, TEST_HASH[:3], TEST_HASH)
132         if buf, err := ioutil.ReadFile(p); err != nil {
133                 t.Error(err)
134         } else if bytes.Compare(buf, TEST_BLOCK) != 0 {
135                 t.Errorf("Write should have stored %s, did store %s",
136                         string(TEST_BLOCK), string(buf))
137         }
138 }
139
140 func TestPutBadVolume(t *testing.T) {
141         v := TempUnixVolume(t, false, false)
142         defer _teardown(v)
143
144         os.Chmod(v.root, 000)
145         err := v.Put(TEST_HASH, TEST_BLOCK)
146         if err == nil {
147                 t.Error("Write should have failed")
148         }
149 }
150
151 func TestUnixVolumeReadonly(t *testing.T) {
152         v := TempUnixVolume(t, false, false)
153         defer _teardown(v)
154
155         // First write something before marking readonly
156         err := v.Put(TEST_HASH, TEST_BLOCK)
157         if err != nil {
158                 t.Error("got err %v, expected nil", err)
159         }
160
161         v.readonly = true
162
163         _, err = v.Get(TEST_HASH)
164         if err != nil {
165                 t.Error("got err %v, expected nil", err)
166         }
167
168         err = v.Put(TEST_HASH, TEST_BLOCK)
169         if err != MethodDisabledError {
170                 t.Error("got err %v, expected MethodDisabledError", err)
171         }
172
173         err = v.Touch(TEST_HASH)
174         if err != MethodDisabledError {
175                 t.Error("got err %v, expected MethodDisabledError", err)
176         }
177
178         err = v.Delete(TEST_HASH)
179         if err != MethodDisabledError {
180                 t.Error("got err %v, expected MethodDisabledError", err)
181         }
182 }
183
184 // TestPutTouch
185 //     Test that when applying PUT to a block that already exists,
186 //     the block's modification time is updated.
187 func TestPutTouch(t *testing.T) {
188         v := TempUnixVolume(t, false, false)
189         defer _teardown(v)
190
191         if err := v.Put(TEST_HASH, TEST_BLOCK); err != nil {
192                 t.Error(err)
193         }
194
195         // We'll verify { t0 < threshold < t1 }, where t0 is the
196         // existing block's timestamp on disk before Put() and t1 is
197         // its timestamp after Put().
198         threshold := time.Now().Add(-time.Second)
199
200         // Set the stored block's mtime far enough in the past that we
201         // can see the difference between "timestamp didn't change"
202         // and "timestamp granularity is too low".
203         {
204                 oldtime := time.Now().Add(-20 * time.Second).Unix()
205                 if err := syscall.Utime(v.blockPath(TEST_HASH),
206                         &syscall.Utimbuf{oldtime, oldtime}); err != nil {
207                         t.Error(err)
208                 }
209
210                 // Make sure v.Mtime() agrees the above Utime really worked.
211                 if t0, err := v.Mtime(TEST_HASH); err != nil || t0.IsZero() || !t0.Before(threshold) {
212                         t.Errorf("Setting mtime failed: %v, %v", t0, err)
213                 }
214         }
215
216         // Write the same block again.
217         if err := v.Put(TEST_HASH, TEST_BLOCK); err != nil {
218                 t.Error(err)
219         }
220
221         // Verify threshold < t1
222         t1, err := v.Mtime(TEST_HASH)
223         if err != nil {
224                 t.Error(err)
225         }
226         if t1.Before(threshold) {
227                 t.Errorf("t1 %v must be >= threshold %v after v.Put ",
228                         t1, threshold)
229         }
230 }
231
232 // Serialization tests: launch a bunch of concurrent
233 //
234 // TODO(twp): show that the underlying Read/Write operations executed
235 // serially and not concurrently. The easiest way to do this is
236 // probably to activate verbose or debug logging, capture log output
237 // and examine it to confirm that Reads and Writes did not overlap.
238 //
239 // TODO(twp): a proper test of I/O serialization requires that a
240 // second request start while the first one is still underway.
241 // Guaranteeing that the test behaves this way requires some tricky
242 // synchronization and mocking.  For now we'll just launch a bunch of
243 // requests simultaenously in goroutines and demonstrate that they
244 // return accurate results.
245 //
246 func TestGetSerialized(t *testing.T) {
247         // Create a volume with I/O serialization enabled.
248         v := TempUnixVolume(t, true, false)
249         defer _teardown(v)
250
251         _store(t, v, TEST_HASH, TEST_BLOCK)
252         _store(t, v, TEST_HASH_2, TEST_BLOCK_2)
253         _store(t, v, TEST_HASH_3, TEST_BLOCK_3)
254
255         sem := make(chan int)
256         go func(sem chan int) {
257                 buf, err := v.Get(TEST_HASH)
258                 if err != nil {
259                         t.Errorf("err1: %v", err)
260                 }
261                 if bytes.Compare(buf, TEST_BLOCK) != 0 {
262                         t.Errorf("buf should be %s, is %s", string(TEST_BLOCK), string(buf))
263                 }
264                 sem <- 1
265         }(sem)
266
267         go func(sem chan int) {
268                 buf, err := v.Get(TEST_HASH_2)
269                 if err != nil {
270                         t.Errorf("err2: %v", err)
271                 }
272                 if bytes.Compare(buf, TEST_BLOCK_2) != 0 {
273                         t.Errorf("buf should be %s, is %s", string(TEST_BLOCK_2), string(buf))
274                 }
275                 sem <- 1
276         }(sem)
277
278         go func(sem chan int) {
279                 buf, err := v.Get(TEST_HASH_3)
280                 if err != nil {
281                         t.Errorf("err3: %v", err)
282                 }
283                 if bytes.Compare(buf, TEST_BLOCK_3) != 0 {
284                         t.Errorf("buf should be %s, is %s", string(TEST_BLOCK_3), string(buf))
285                 }
286                 sem <- 1
287         }(sem)
288
289         // Wait for all goroutines to finish
290         for done := 0; done < 3; {
291                 done += <-sem
292         }
293 }
294
295 func TestPutSerialized(t *testing.T) {
296         // Create a volume with I/O serialization enabled.
297         v := TempUnixVolume(t, true, false)
298         defer _teardown(v)
299
300         sem := make(chan int)
301         go func(sem chan int) {
302                 err := v.Put(TEST_HASH, TEST_BLOCK)
303                 if err != nil {
304                         t.Errorf("err1: %v", err)
305                 }
306                 sem <- 1
307         }(sem)
308
309         go func(sem chan int) {
310                 err := v.Put(TEST_HASH_2, TEST_BLOCK_2)
311                 if err != nil {
312                         t.Errorf("err2: %v", err)
313                 }
314                 sem <- 1
315         }(sem)
316
317         go func(sem chan int) {
318                 err := v.Put(TEST_HASH_3, TEST_BLOCK_3)
319                 if err != nil {
320                         t.Errorf("err3: %v", err)
321                 }
322                 sem <- 1
323         }(sem)
324
325         // Wait for all goroutines to finish
326         for done := 0; done < 3; {
327                 done += <-sem
328         }
329
330         // Double check that we actually wrote the blocks we expected to write.
331         buf, err := v.Get(TEST_HASH)
332         if err != nil {
333                 t.Errorf("Get #1: %v", err)
334         }
335         if bytes.Compare(buf, TEST_BLOCK) != 0 {
336                 t.Errorf("Get #1: expected %s, got %s", string(TEST_BLOCK), string(buf))
337         }
338
339         buf, err = v.Get(TEST_HASH_2)
340         if err != nil {
341                 t.Errorf("Get #2: %v", err)
342         }
343         if bytes.Compare(buf, TEST_BLOCK_2) != 0 {
344                 t.Errorf("Get #2: expected %s, got %s", string(TEST_BLOCK_2), string(buf))
345         }
346
347         buf, err = v.Get(TEST_HASH_3)
348         if err != nil {
349                 t.Errorf("Get #3: %v", err)
350         }
351         if bytes.Compare(buf, TEST_BLOCK_3) != 0 {
352                 t.Errorf("Get #3: expected %s, got %s", string(TEST_BLOCK_3), string(buf))
353         }
354 }
355
356 func TestIsFull(t *testing.T) {
357         v := TempUnixVolume(t, false, false)
358         defer _teardown(v)
359
360         full_path := v.root + "/full"
361         now := fmt.Sprintf("%d", time.Now().Unix())
362         os.Symlink(now, full_path)
363         if !v.IsFull() {
364                 t.Errorf("%s: claims not to be full", v)
365         }
366         os.Remove(full_path)
367
368         // Test with an expired /full link.
369         expired := fmt.Sprintf("%d", time.Now().Unix()-3605)
370         os.Symlink(expired, full_path)
371         if v.IsFull() {
372                 t.Errorf("%s: should no longer be full", v)
373         }
374 }
375
376 func TestNodeStatus(t *testing.T) {
377         v := TempUnixVolume(t, false, false)
378         defer _teardown(v)
379
380         // Get node status and make a basic sanity check.
381         volinfo := v.Status()
382         if volinfo.MountPoint != v.root {
383                 t.Errorf("GetNodeStatus mount_point %s, expected %s", volinfo.MountPoint, v.root)
384         }
385         if volinfo.DeviceNum == 0 {
386                 t.Errorf("uninitialized device_num in %v", volinfo)
387         }
388         if volinfo.BytesFree == 0 {
389                 t.Errorf("uninitialized bytes_free in %v", volinfo)
390         }
391         if volinfo.BytesUsed == 0 {
392                 t.Errorf("uninitialized bytes_used in %v", volinfo)
393         }
394 }
395
396 func TestUnixVolumeGetFuncWorkerError(t *testing.T) {
397         v := TempUnixVolume(t, false, false)
398         defer _teardown(v)
399
400         v.Put(TEST_HASH, TEST_BLOCK)
401         mockErr := errors.New("Mock error")
402         err := v.getFunc(v.blockPath(TEST_HASH), func(rdr io.Reader) error {
403                 return mockErr
404         })
405         if err != mockErr {
406                 t.Errorf("Got %v, expected %v", err, mockErr)
407         }
408 }
409
410 func TestUnixVolumeGetFuncFileError(t *testing.T) {
411         v := TempUnixVolume(t, false, false)
412         defer _teardown(v)
413
414         funcCalled := false
415         err := v.getFunc(v.blockPath(TEST_HASH), func(rdr io.Reader) error {
416                 funcCalled = true
417                 return nil
418         })
419         if err == nil {
420                 t.Errorf("Expected error opening non-existent file")
421         }
422         if funcCalled {
423                 t.Errorf("Worker func should not have been called")
424         }
425 }
426
427 func TestUnixVolumeGetFuncWorkerWaitsOnMutex(t *testing.T) {
428         v := TempUnixVolume(t, false, false)
429         defer _teardown(v)
430
431         v.Put(TEST_HASH, TEST_BLOCK)
432
433         mtx := NewMockMutex()
434         v.locker = mtx
435
436         funcCalled := make(chan struct{})
437         go v.getFunc(v.blockPath(TEST_HASH), func(rdr io.Reader) error {
438                 funcCalled <- struct{}{}
439                 return nil
440         })
441         select {
442         case mtx.AllowLock <- struct{}{}:
443         case <-funcCalled:
444                 t.Fatal("Function was called before mutex was acquired")
445         case <-time.After(5 * time.Second):
446                 t.Fatal("Timed out before mutex was acquired")
447         }
448         select {
449         case <-funcCalled:
450         case mtx.AllowUnlock <- struct{}{}:
451                 t.Fatal("Mutex was released before function was called")
452         case <-time.After(5 * time.Second):
453                 t.Fatal("Timed out waiting for funcCalled")
454         }
455         select {
456         case mtx.AllowUnlock <- struct{}{}:
457         case <-time.After(5 * time.Second):
458                 t.Fatal("Timed out waiting for getFunc() to release mutex")
459         }
460 }
461
462 func TestUnixVolumeCompare(t *testing.T) {
463         v := TempUnixVolume(t, false, false)
464         defer _teardown(v)
465
466         v.Put(TEST_HASH, TEST_BLOCK)
467         err := v.Compare(TEST_HASH, TEST_BLOCK)
468         if err != nil {
469                 t.Errorf("Got err %q, expected nil", err)
470         }
471
472         err = v.Compare(TEST_HASH, []byte("baddata"))
473         if err != CollisionError {
474                 t.Errorf("Got err %q, expected %q", err, CollisionError)
475         }
476
477         _store(t, v, TEST_HASH, []byte("baddata"))
478         err = v.Compare(TEST_HASH, TEST_BLOCK)
479         if err != DiskHashError {
480                 t.Errorf("Got err %q, expected %q", err, DiskHashError)
481         }
482
483         p := fmt.Sprintf("%s/%s/%s", v.root, TEST_HASH[:3], TEST_HASH)
484         os.Chmod(p, 000)
485         err = v.Compare(TEST_HASH, TEST_BLOCK)
486         if err == nil || strings.Index(err.Error(), "permission denied") < 0 {
487                 t.Errorf("Got err %q, expected %q", err, "permission denied")
488         }
489 }