7310: adds git v2 style regex to match http 500 error
[arvados.git] / services / keepstore / volume_unix_test.go
1 package main
2
3 import (
4         "bytes"
5         "errors"
6         "fmt"
7         "io"
8         "io/ioutil"
9         "os"
10         "regexp"
11         "sort"
12         "strings"
13         "sync"
14         "syscall"
15         "testing"
16         "time"
17 )
18
19 type TestableUnixVolume struct {
20         UnixVolume
21         t *testing.T
22 }
23
24 func NewTestableUnixVolume(t *testing.T, serialize bool, readonly bool) *TestableUnixVolume {
25         d, err := ioutil.TempDir("", "volume_test")
26         if err != nil {
27                 t.Fatal(err)
28         }
29         var locker sync.Locker
30         if serialize {
31                 locker = &sync.Mutex{}
32         }
33         return &TestableUnixVolume{
34                 UnixVolume: UnixVolume{
35                         root:     d,
36                         locker:   locker,
37                         readonly: readonly,
38                 },
39                 t: t,
40         }
41 }
42
43 // PutRaw writes a Keep block directly into a UnixVolume, even if
44 // the volume is readonly.
45 func (v *TestableUnixVolume) PutRaw(locator string, data []byte) {
46         defer func(orig bool) {
47                 v.readonly = orig
48         }(v.readonly)
49         v.readonly = false
50         err := v.Put(locator, data)
51         if err != nil {
52                 v.t.Fatal(err)
53         }
54 }
55
56 func (v *TestableUnixVolume) TouchWithDate(locator string, lastPut time.Time) {
57         err := syscall.Utime(v.blockPath(locator), &syscall.Utimbuf{lastPut.Unix(), lastPut.Unix()})
58         if err != nil {
59                 v.t.Fatal(err)
60         }
61 }
62
63 func (v *TestableUnixVolume) Teardown() {
64         if err := os.RemoveAll(v.root); err != nil {
65                 v.t.Fatal(err)
66         }
67 }
68
69 func TestUnixVolumeWithGenericTests(t *testing.T) {
70         DoGenericVolumeTests(t, func(t *testing.T) TestableVolume {
71                 return NewTestableUnixVolume(t, false, false)
72         })
73 }
74
75 func TestGet(t *testing.T) {
76         v := NewTestableUnixVolume(t, false, false)
77         defer v.Teardown()
78         v.Put(TEST_HASH, TEST_BLOCK)
79
80         buf, err := v.Get(TEST_HASH)
81         if err != nil {
82                 t.Error(err)
83         }
84         if bytes.Compare(buf, TEST_BLOCK) != 0 {
85                 t.Errorf("expected %s, got %s", string(TEST_BLOCK), string(buf))
86         }
87 }
88
89 func TestGetNotFound(t *testing.T) {
90         v := NewTestableUnixVolume(t, false, false)
91         defer v.Teardown()
92         v.Put(TEST_HASH, TEST_BLOCK)
93
94         buf, err := v.Get(TEST_HASH_2)
95         switch {
96         case os.IsNotExist(err):
97                 break
98         case err == nil:
99                 t.Errorf("Read should have failed, returned %s", string(buf))
100         default:
101                 t.Errorf("Read expected ErrNotExist, got: %s", err)
102         }
103 }
104
105 func TestIndexTo(t *testing.T) {
106         v := NewTestableUnixVolume(t, false, false)
107         defer v.Teardown()
108
109         v.Put(TEST_HASH, TEST_BLOCK)
110         v.Put(TEST_HASH_2, TEST_BLOCK_2)
111         v.Put(TEST_HASH_3, TEST_BLOCK_3)
112
113         buf := new(bytes.Buffer)
114         v.IndexTo("", buf)
115         index_rows := strings.Split(string(buf.Bytes()), "\n")
116         sort.Strings(index_rows)
117         sorted_index := strings.Join(index_rows, "\n")
118         m, err := regexp.MatchString(
119                 `^\n`+TEST_HASH+`\+\d+ \d+\n`+
120                         TEST_HASH_3+`\+\d+ \d+\n`+
121                         TEST_HASH_2+`\+\d+ \d+$`,
122                 sorted_index)
123         if err != nil {
124                 t.Error(err)
125         } else if !m {
126                 t.Errorf("Got index %q for empty prefix", sorted_index)
127         }
128
129         for _, prefix := range []string{"f", "f15", "f15ac"} {
130                 buf = new(bytes.Buffer)
131                 v.IndexTo(prefix, buf)
132                 m, err := regexp.MatchString(`^`+TEST_HASH_2+`\+\d+ \d+\n$`, string(buf.Bytes()))
133                 if err != nil {
134                         t.Error(err)
135                 } else if !m {
136                         t.Errorf("Got index %q for prefix %q", string(buf.Bytes()), prefix)
137                 }
138         }
139 }
140
141 func TestPut(t *testing.T) {
142         v := NewTestableUnixVolume(t, false, false)
143         defer v.Teardown()
144
145         err := v.Put(TEST_HASH, TEST_BLOCK)
146         if err != nil {
147                 t.Error(err)
148         }
149         p := fmt.Sprintf("%s/%s/%s", v.root, TEST_HASH[:3], TEST_HASH)
150         if buf, err := ioutil.ReadFile(p); err != nil {
151                 t.Error(err)
152         } else if bytes.Compare(buf, TEST_BLOCK) != 0 {
153                 t.Errorf("Write should have stored %s, did store %s",
154                         string(TEST_BLOCK), string(buf))
155         }
156 }
157
158 func TestPutBadVolume(t *testing.T) {
159         v := NewTestableUnixVolume(t, false, false)
160         defer v.Teardown()
161
162         os.Chmod(v.root, 000)
163         err := v.Put(TEST_HASH, TEST_BLOCK)
164         if err == nil {
165                 t.Error("Write should have failed")
166         }
167 }
168
169 func TestUnixVolumeReadonly(t *testing.T) {
170         v := NewTestableUnixVolume(t, false, true)
171         defer v.Teardown()
172
173         v.PutRaw(TEST_HASH, TEST_BLOCK)
174
175         _, err := v.Get(TEST_HASH)
176         if err != nil {
177                 t.Errorf("got err %v, expected nil", err)
178         }
179
180         err = v.Put(TEST_HASH, TEST_BLOCK)
181         if err != MethodDisabledError {
182                 t.Errorf("got err %v, expected MethodDisabledError", err)
183         }
184
185         err = v.Touch(TEST_HASH)
186         if err != MethodDisabledError {
187                 t.Errorf("got err %v, expected MethodDisabledError", err)
188         }
189
190         err = v.Delete(TEST_HASH)
191         if err != MethodDisabledError {
192                 t.Errorf("got err %v, expected MethodDisabledError", err)
193         }
194 }
195
196 // TestPutTouch
197 //     Test that when applying PUT to a block that already exists,
198 //     the block's modification time is updated.
199 func TestPutTouch(t *testing.T) {
200         v := NewTestableUnixVolume(t, false, false)
201         defer v.Teardown()
202
203         if err := v.Put(TEST_HASH, TEST_BLOCK); err != nil {
204                 t.Error(err)
205         }
206
207         // We'll verify { t0 < threshold < t1 }, where t0 is the
208         // existing block's timestamp on disk before Put() and t1 is
209         // its timestamp after Put().
210         threshold := time.Now().Add(-time.Second)
211
212         // Set the stored block's mtime far enough in the past that we
213         // can see the difference between "timestamp didn't change"
214         // and "timestamp granularity is too low".
215         v.TouchWithDate(TEST_HASH, time.Now().Add(-20*time.Second))
216
217         // Make sure v.Mtime() agrees the above Utime really worked.
218         if t0, err := v.Mtime(TEST_HASH); err != nil || t0.IsZero() || !t0.Before(threshold) {
219                 t.Errorf("Setting mtime failed: %v, %v", t0, err)
220         }
221
222         // Write the same block again.
223         if err := v.Put(TEST_HASH, TEST_BLOCK); err != nil {
224                 t.Error(err)
225         }
226
227         // Verify threshold < t1
228         if t1, err := v.Mtime(TEST_HASH); err != nil {
229                 t.Error(err)
230         } else if t1.Before(threshold) {
231                 t.Errorf("t1 %v should be >= threshold %v after v.Put ", t1, threshold)
232         }
233 }
234
235 // Serialization tests: launch a bunch of concurrent
236 //
237 // TODO(twp): show that the underlying Read/Write operations executed
238 // serially and not concurrently. The easiest way to do this is
239 // probably to activate verbose or debug logging, capture log output
240 // and examine it to confirm that Reads and Writes did not overlap.
241 //
242 // TODO(twp): a proper test of I/O serialization requires that a
243 // second request start while the first one is still underway.
244 // Guaranteeing that the test behaves this way requires some tricky
245 // synchronization and mocking.  For now we'll just launch a bunch of
246 // requests simultaenously in goroutines and demonstrate that they
247 // return accurate results.
248 //
249 func TestGetSerialized(t *testing.T) {
250         // Create a volume with I/O serialization enabled.
251         v := NewTestableUnixVolume(t, true, false)
252         defer v.Teardown()
253
254         v.Put(TEST_HASH, TEST_BLOCK)
255         v.Put(TEST_HASH_2, TEST_BLOCK_2)
256         v.Put(TEST_HASH_3, TEST_BLOCK_3)
257
258         sem := make(chan int)
259         go func(sem chan int) {
260                 buf, err := v.Get(TEST_HASH)
261                 if err != nil {
262                         t.Errorf("err1: %v", err)
263                 }
264                 if bytes.Compare(buf, TEST_BLOCK) != 0 {
265                         t.Errorf("buf should be %s, is %s", string(TEST_BLOCK), string(buf))
266                 }
267                 sem <- 1
268         }(sem)
269
270         go func(sem chan int) {
271                 buf, err := v.Get(TEST_HASH_2)
272                 if err != nil {
273                         t.Errorf("err2: %v", err)
274                 }
275                 if bytes.Compare(buf, TEST_BLOCK_2) != 0 {
276                         t.Errorf("buf should be %s, is %s", string(TEST_BLOCK_2), string(buf))
277                 }
278                 sem <- 1
279         }(sem)
280
281         go func(sem chan int) {
282                 buf, err := v.Get(TEST_HASH_3)
283                 if err != nil {
284                         t.Errorf("err3: %v", err)
285                 }
286                 if bytes.Compare(buf, TEST_BLOCK_3) != 0 {
287                         t.Errorf("buf should be %s, is %s", string(TEST_BLOCK_3), string(buf))
288                 }
289                 sem <- 1
290         }(sem)
291
292         // Wait for all goroutines to finish
293         for done := 0; done < 3; {
294                 done += <-sem
295         }
296 }
297
298 func TestPutSerialized(t *testing.T) {
299         // Create a volume with I/O serialization enabled.
300         v := NewTestableUnixVolume(t, true, false)
301         defer v.Teardown()
302
303         sem := make(chan int)
304         go func(sem chan int) {
305                 err := v.Put(TEST_HASH, TEST_BLOCK)
306                 if err != nil {
307                         t.Errorf("err1: %v", err)
308                 }
309                 sem <- 1
310         }(sem)
311
312         go func(sem chan int) {
313                 err := v.Put(TEST_HASH_2, TEST_BLOCK_2)
314                 if err != nil {
315                         t.Errorf("err2: %v", err)
316                 }
317                 sem <- 1
318         }(sem)
319
320         go func(sem chan int) {
321                 err := v.Put(TEST_HASH_3, TEST_BLOCK_3)
322                 if err != nil {
323                         t.Errorf("err3: %v", err)
324                 }
325                 sem <- 1
326         }(sem)
327
328         // Wait for all goroutines to finish
329         for done := 0; done < 3; {
330                 done += <-sem
331         }
332
333         // Double check that we actually wrote the blocks we expected to write.
334         buf, err := v.Get(TEST_HASH)
335         if err != nil {
336                 t.Errorf("Get #1: %v", err)
337         }
338         if bytes.Compare(buf, TEST_BLOCK) != 0 {
339                 t.Errorf("Get #1: expected %s, got %s", string(TEST_BLOCK), string(buf))
340         }
341
342         buf, err = v.Get(TEST_HASH_2)
343         if err != nil {
344                 t.Errorf("Get #2: %v", err)
345         }
346         if bytes.Compare(buf, TEST_BLOCK_2) != 0 {
347                 t.Errorf("Get #2: expected %s, got %s", string(TEST_BLOCK_2), string(buf))
348         }
349
350         buf, err = v.Get(TEST_HASH_3)
351         if err != nil {
352                 t.Errorf("Get #3: %v", err)
353         }
354         if bytes.Compare(buf, TEST_BLOCK_3) != 0 {
355                 t.Errorf("Get #3: expected %s, got %s", string(TEST_BLOCK_3), string(buf))
356         }
357 }
358
359 func TestIsFull(t *testing.T) {
360         v := NewTestableUnixVolume(t, false, false)
361         defer v.Teardown()
362
363         full_path := v.root + "/full"
364         now := fmt.Sprintf("%d", time.Now().Unix())
365         os.Symlink(now, full_path)
366         if !v.IsFull() {
367                 t.Errorf("%s: claims not to be full", v)
368         }
369         os.Remove(full_path)
370
371         // Test with an expired /full link.
372         expired := fmt.Sprintf("%d", time.Now().Unix()-3605)
373         os.Symlink(expired, full_path)
374         if v.IsFull() {
375                 t.Errorf("%s: should no longer be full", v)
376         }
377 }
378
379 func TestNodeStatus(t *testing.T) {
380         v := NewTestableUnixVolume(t, false, false)
381         defer v.Teardown()
382
383         // Get node status and make a basic sanity check.
384         volinfo := v.Status()
385         if volinfo.MountPoint != v.root {
386                 t.Errorf("GetNodeStatus mount_point %s, expected %s", volinfo.MountPoint, v.root)
387         }
388         if volinfo.DeviceNum == 0 {
389                 t.Errorf("uninitialized device_num in %v", volinfo)
390         }
391         if volinfo.BytesFree == 0 {
392                 t.Errorf("uninitialized bytes_free in %v", volinfo)
393         }
394         if volinfo.BytesUsed == 0 {
395                 t.Errorf("uninitialized bytes_used in %v", volinfo)
396         }
397 }
398
399 func TestUnixVolumeGetFuncWorkerError(t *testing.T) {
400         v := NewTestableUnixVolume(t, false, false)
401         defer v.Teardown()
402
403         v.Put(TEST_HASH, TEST_BLOCK)
404         mockErr := errors.New("Mock error")
405         err := v.getFunc(v.blockPath(TEST_HASH), func(rdr io.Reader) error {
406                 return mockErr
407         })
408         if err != mockErr {
409                 t.Errorf("Got %v, expected %v", err, mockErr)
410         }
411 }
412
413 func TestUnixVolumeGetFuncFileError(t *testing.T) {
414         v := NewTestableUnixVolume(t, false, false)
415         defer v.Teardown()
416
417         funcCalled := false
418         err := v.getFunc(v.blockPath(TEST_HASH), func(rdr io.Reader) error {
419                 funcCalled = true
420                 return nil
421         })
422         if err == nil {
423                 t.Errorf("Expected error opening non-existent file")
424         }
425         if funcCalled {
426                 t.Errorf("Worker func should not have been called")
427         }
428 }
429
430 func TestUnixVolumeGetFuncWorkerWaitsOnMutex(t *testing.T) {
431         v := NewTestableUnixVolume(t, false, false)
432         defer v.Teardown()
433
434         v.Put(TEST_HASH, TEST_BLOCK)
435
436         mtx := NewMockMutex()
437         v.locker = mtx
438
439         funcCalled := make(chan struct{})
440         go v.getFunc(v.blockPath(TEST_HASH), func(rdr io.Reader) error {
441                 funcCalled <- struct{}{}
442                 return nil
443         })
444         select {
445         case mtx.AllowLock <- struct{}{}:
446         case <-funcCalled:
447                 t.Fatal("Function was called before mutex was acquired")
448         case <-time.After(5 * time.Second):
449                 t.Fatal("Timed out before mutex was acquired")
450         }
451         select {
452         case <-funcCalled:
453         case mtx.AllowUnlock <- struct{}{}:
454                 t.Fatal("Mutex was released before function was called")
455         case <-time.After(5 * time.Second):
456                 t.Fatal("Timed out waiting for funcCalled")
457         }
458         select {
459         case mtx.AllowUnlock <- struct{}{}:
460         case <-time.After(5 * time.Second):
461                 t.Fatal("Timed out waiting for getFunc() to release mutex")
462         }
463 }
464
465 func TestUnixVolumeCompare(t *testing.T) {
466         v := NewTestableUnixVolume(t, false, false)
467         defer v.Teardown()
468
469         v.Put(TEST_HASH, TEST_BLOCK)
470         err := v.Compare(TEST_HASH, TEST_BLOCK)
471         if err != nil {
472                 t.Errorf("Got err %q, expected nil", err)
473         }
474
475         err = v.Compare(TEST_HASH, []byte("baddata"))
476         if err != CollisionError {
477                 t.Errorf("Got err %q, expected %q", err, CollisionError)
478         }
479
480         v.Put(TEST_HASH, []byte("baddata"))
481         err = v.Compare(TEST_HASH, TEST_BLOCK)
482         if err != DiskHashError {
483                 t.Errorf("Got err %q, expected %q", err, DiskHashError)
484         }
485
486         p := fmt.Sprintf("%s/%s/%s", v.root, TEST_HASH[:3], TEST_HASH)
487         os.Chmod(p, 000)
488         err = v.Compare(TEST_HASH, TEST_BLOCK)
489         if err == nil || strings.Index(err.Error(), "permission denied") < 0 {
490                 t.Errorf("Got err %q, expected %q", err, "permission denied")
491         }
492 }