7121: Replace Get(loc,true) with CompareAndTouch(). Add Compare method to Volume...
[arvados.git] / services / keepstore / volume_unix_test.go
1 package main
2
3 import (
4         "bytes"
5         "errors"
6         "fmt"
7         "io"
8         "io/ioutil"
9         "os"
10         "regexp"
11         "sort"
12         "strings"
13         "syscall"
14         "testing"
15         "time"
16 )
17
18 func TempUnixVolume(t *testing.T, serialize bool, readonly bool) *UnixVolume {
19         d, err := ioutil.TempDir("", "volume_test")
20         if err != nil {
21                 t.Fatal(err)
22         }
23         return &UnixVolume{
24                 root:      d,
25                 serialize: serialize,
26                 readonly:  readonly,
27         }
28 }
29
30 func _teardown(v *UnixVolume) {
31         os.RemoveAll(v.root)
32 }
33
34 // _store writes a Keep block directly into a UnixVolume, bypassing
35 // the overhead and safeguards of Put(). Useful for storing bogus data
36 // and isolating unit tests from Put() behavior.
37 func _store(t *testing.T, vol *UnixVolume, filename string, block []byte) {
38         blockdir := fmt.Sprintf("%s/%s", vol.root, filename[:3])
39         if err := os.MkdirAll(blockdir, 0755); err != nil {
40                 t.Fatal(err)
41         }
42
43         blockpath := fmt.Sprintf("%s/%s", blockdir, filename)
44         if f, err := os.Create(blockpath); err == nil {
45                 f.Write(block)
46                 f.Close()
47         } else {
48                 t.Fatal(err)
49         }
50 }
51
52 func TestGet(t *testing.T) {
53         v := TempUnixVolume(t, false, false)
54         defer _teardown(v)
55         _store(t, v, TEST_HASH, TEST_BLOCK)
56
57         buf, err := v.Get(TEST_HASH)
58         if err != nil {
59                 t.Error(err)
60         }
61         if bytes.Compare(buf, TEST_BLOCK) != 0 {
62                 t.Errorf("expected %s, got %s", string(TEST_BLOCK), string(buf))
63         }
64 }
65
66 func TestGetNotFound(t *testing.T) {
67         v := TempUnixVolume(t, false, false)
68         defer _teardown(v)
69         _store(t, v, TEST_HASH, TEST_BLOCK)
70
71         buf, err := v.Get(TEST_HASH_2)
72         switch {
73         case os.IsNotExist(err):
74                 break
75         case err == nil:
76                 t.Errorf("Read should have failed, returned %s", string(buf))
77         default:
78                 t.Errorf("Read expected ErrNotExist, got: %s", err)
79         }
80 }
81
82 func TestIndexTo(t *testing.T) {
83         v := TempUnixVolume(t, false, false)
84         defer _teardown(v)
85
86         _store(t, v, TEST_HASH, TEST_BLOCK)
87         _store(t, v, TEST_HASH_2, TEST_BLOCK_2)
88         _store(t, v, TEST_HASH_3, TEST_BLOCK_3)
89
90         buf := new(bytes.Buffer)
91         v.IndexTo("", buf)
92         index_rows := strings.Split(string(buf.Bytes()), "\n")
93         sort.Strings(index_rows)
94         sorted_index := strings.Join(index_rows, "\n")
95         m, err := regexp.MatchString(
96                 `^\n`+TEST_HASH+`\+\d+ \d+\n`+
97                         TEST_HASH_3+`\+\d+ \d+\n`+
98                         TEST_HASH_2+`\+\d+ \d+$`,
99                 sorted_index)
100         if err != nil {
101                 t.Error(err)
102         } else if !m {
103                 t.Errorf("Got index %q for empty prefix", sorted_index)
104         }
105
106         for _, prefix := range []string{"f", "f15", "f15ac"} {
107                 buf = new(bytes.Buffer)
108                 v.IndexTo(prefix, buf)
109                 m, err := regexp.MatchString(`^`+TEST_HASH_2+`\+\d+ \d+\n$`, string(buf.Bytes()))
110                 if err != nil {
111                         t.Error(err)
112                 } else if !m {
113                         t.Errorf("Got index %q for prefix %q", string(buf.Bytes()), prefix)
114                 }
115         }
116 }
117
118 func TestPut(t *testing.T) {
119         v := TempUnixVolume(t, false, false)
120         defer _teardown(v)
121
122         err := v.Put(TEST_HASH, TEST_BLOCK)
123         if err != nil {
124                 t.Error(err)
125         }
126         p := fmt.Sprintf("%s/%s/%s", v.root, TEST_HASH[:3], TEST_HASH)
127         if buf, err := ioutil.ReadFile(p); err != nil {
128                 t.Error(err)
129         } else if bytes.Compare(buf, TEST_BLOCK) != 0 {
130                 t.Errorf("Write should have stored %s, did store %s",
131                         string(TEST_BLOCK), string(buf))
132         }
133 }
134
135 func TestPutBadVolume(t *testing.T) {
136         v := TempUnixVolume(t, false, false)
137         defer _teardown(v)
138
139         os.Chmod(v.root, 000)
140         err := v.Put(TEST_HASH, TEST_BLOCK)
141         if err == nil {
142                 t.Error("Write should have failed")
143         }
144 }
145
146 func TestUnixVolumeReadonly(t *testing.T) {
147         v := TempUnixVolume(t, false, false)
148         defer _teardown(v)
149
150         // First write something before marking readonly
151         err := v.Put(TEST_HASH, TEST_BLOCK)
152         if err != nil {
153                 t.Error("got err %v, expected nil", err)
154         }
155
156         v.readonly = true
157
158         _, err = v.Get(TEST_HASH)
159         if err != nil {
160                 t.Error("got err %v, expected nil", err)
161         }
162
163         err = v.Put(TEST_HASH, TEST_BLOCK)
164         if err != MethodDisabledError {
165                 t.Error("got err %v, expected MethodDisabledError", err)
166         }
167
168         err = v.Touch(TEST_HASH)
169         if err != MethodDisabledError {
170                 t.Error("got err %v, expected MethodDisabledError", err)
171         }
172
173         err = v.Delete(TEST_HASH)
174         if err != MethodDisabledError {
175                 t.Error("got err %v, expected MethodDisabledError", err)
176         }
177 }
178
179 // TestPutTouch
180 //     Test that when applying PUT to a block that already exists,
181 //     the block's modification time is updated.
182 func TestPutTouch(t *testing.T) {
183         v := TempUnixVolume(t, false, false)
184         defer _teardown(v)
185
186         if err := v.Put(TEST_HASH, TEST_BLOCK); err != nil {
187                 t.Error(err)
188         }
189
190         // We'll verify { t0 < threshold < t1 }, where t0 is the
191         // existing block's timestamp on disk before Put() and t1 is
192         // its timestamp after Put().
193         threshold := time.Now().Add(-time.Second)
194
195         // Set the stored block's mtime far enough in the past that we
196         // can see the difference between "timestamp didn't change"
197         // and "timestamp granularity is too low".
198         {
199                 oldtime := time.Now().Add(-20 * time.Second).Unix()
200                 if err := syscall.Utime(v.blockPath(TEST_HASH),
201                         &syscall.Utimbuf{oldtime, oldtime}); err != nil {
202                         t.Error(err)
203                 }
204
205                 // Make sure v.Mtime() agrees the above Utime really worked.
206                 if t0, err := v.Mtime(TEST_HASH); err != nil || t0.IsZero() || !t0.Before(threshold) {
207                         t.Errorf("Setting mtime failed: %v, %v", t0, err)
208                 }
209         }
210
211         // Write the same block again.
212         if err := v.Put(TEST_HASH, TEST_BLOCK); err != nil {
213                 t.Error(err)
214         }
215
216         // Verify threshold < t1
217         t1, err := v.Mtime(TEST_HASH)
218         if err != nil {
219                 t.Error(err)
220         }
221         if t1.Before(threshold) {
222                 t.Errorf("t1 %v must be >= threshold %v after v.Put ",
223                         t1, threshold)
224         }
225 }
226
227 // Serialization tests: launch a bunch of concurrent
228 //
229 // TODO(twp): show that the underlying Read/Write operations executed
230 // serially and not concurrently. The easiest way to do this is
231 // probably to activate verbose or debug logging, capture log output
232 // and examine it to confirm that Reads and Writes did not overlap.
233 //
234 // TODO(twp): a proper test of I/O serialization requires that a
235 // second request start while the first one is still underway.
236 // Guaranteeing that the test behaves this way requires some tricky
237 // synchronization and mocking.  For now we'll just launch a bunch of
238 // requests simultaenously in goroutines and demonstrate that they
239 // return accurate results.
240 //
241 func TestGetSerialized(t *testing.T) {
242         // Create a volume with I/O serialization enabled.
243         v := TempUnixVolume(t, true, false)
244         defer _teardown(v)
245
246         _store(t, v, TEST_HASH, TEST_BLOCK)
247         _store(t, v, TEST_HASH_2, TEST_BLOCK_2)
248         _store(t, v, TEST_HASH_3, TEST_BLOCK_3)
249
250         sem := make(chan int)
251         go func(sem chan int) {
252                 buf, err := v.Get(TEST_HASH)
253                 if err != nil {
254                         t.Errorf("err1: %v", err)
255                 }
256                 if bytes.Compare(buf, TEST_BLOCK) != 0 {
257                         t.Errorf("buf should be %s, is %s", string(TEST_BLOCK), string(buf))
258                 }
259                 sem <- 1
260         }(sem)
261
262         go func(sem chan int) {
263                 buf, err := v.Get(TEST_HASH_2)
264                 if err != nil {
265                         t.Errorf("err2: %v", err)
266                 }
267                 if bytes.Compare(buf, TEST_BLOCK_2) != 0 {
268                         t.Errorf("buf should be %s, is %s", string(TEST_BLOCK_2), string(buf))
269                 }
270                 sem <- 1
271         }(sem)
272
273         go func(sem chan int) {
274                 buf, err := v.Get(TEST_HASH_3)
275                 if err != nil {
276                         t.Errorf("err3: %v", err)
277                 }
278                 if bytes.Compare(buf, TEST_BLOCK_3) != 0 {
279                         t.Errorf("buf should be %s, is %s", string(TEST_BLOCK_3), string(buf))
280                 }
281                 sem <- 1
282         }(sem)
283
284         // Wait for all goroutines to finish
285         for done := 0; done < 3; {
286                 done += <-sem
287         }
288 }
289
290 func TestPutSerialized(t *testing.T) {
291         // Create a volume with I/O serialization enabled.
292         v := TempUnixVolume(t, true, false)
293         defer _teardown(v)
294
295         sem := make(chan int)
296         go func(sem chan int) {
297                 err := v.Put(TEST_HASH, TEST_BLOCK)
298                 if err != nil {
299                         t.Errorf("err1: %v", err)
300                 }
301                 sem <- 1
302         }(sem)
303
304         go func(sem chan int) {
305                 err := v.Put(TEST_HASH_2, TEST_BLOCK_2)
306                 if err != nil {
307                         t.Errorf("err2: %v", err)
308                 }
309                 sem <- 1
310         }(sem)
311
312         go func(sem chan int) {
313                 err := v.Put(TEST_HASH_3, TEST_BLOCK_3)
314                 if err != nil {
315                         t.Errorf("err3: %v", err)
316                 }
317                 sem <- 1
318         }(sem)
319
320         // Wait for all goroutines to finish
321         for done := 0; done < 3; {
322                 done += <-sem
323         }
324
325         // Double check that we actually wrote the blocks we expected to write.
326         buf, err := v.Get(TEST_HASH)
327         if err != nil {
328                 t.Errorf("Get #1: %v", err)
329         }
330         if bytes.Compare(buf, TEST_BLOCK) != 0 {
331                 t.Errorf("Get #1: expected %s, got %s", string(TEST_BLOCK), string(buf))
332         }
333
334         buf, err = v.Get(TEST_HASH_2)
335         if err != nil {
336                 t.Errorf("Get #2: %v", err)
337         }
338         if bytes.Compare(buf, TEST_BLOCK_2) != 0 {
339                 t.Errorf("Get #2: expected %s, got %s", string(TEST_BLOCK_2), string(buf))
340         }
341
342         buf, err = v.Get(TEST_HASH_3)
343         if err != nil {
344                 t.Errorf("Get #3: %v", err)
345         }
346         if bytes.Compare(buf, TEST_BLOCK_3) != 0 {
347                 t.Errorf("Get #3: expected %s, got %s", string(TEST_BLOCK_3), string(buf))
348         }
349 }
350
351 func TestIsFull(t *testing.T) {
352         v := TempUnixVolume(t, false, false)
353         defer _teardown(v)
354
355         full_path := v.root + "/full"
356         now := fmt.Sprintf("%d", time.Now().Unix())
357         os.Symlink(now, full_path)
358         if !v.IsFull() {
359                 t.Errorf("%s: claims not to be full", v)
360         }
361         os.Remove(full_path)
362
363         // Test with an expired /full link.
364         expired := fmt.Sprintf("%d", time.Now().Unix()-3605)
365         os.Symlink(expired, full_path)
366         if v.IsFull() {
367                 t.Errorf("%s: should no longer be full", v)
368         }
369 }
370
371 func TestNodeStatus(t *testing.T) {
372         v := TempUnixVolume(t, false, false)
373         defer _teardown(v)
374
375         // Get node status and make a basic sanity check.
376         volinfo := v.Status()
377         if volinfo.MountPoint != v.root {
378                 t.Errorf("GetNodeStatus mount_point %s, expected %s", volinfo.MountPoint, v.root)
379         }
380         if volinfo.DeviceNum == 0 {
381                 t.Errorf("uninitialized device_num in %v", volinfo)
382         }
383         if volinfo.BytesFree == 0 {
384                 t.Errorf("uninitialized bytes_free in %v", volinfo)
385         }
386         if volinfo.BytesUsed == 0 {
387                 t.Errorf("uninitialized bytes_used in %v", volinfo)
388         }
389 }
390
391 func TestUnixVolumeGetFuncWorkerError(t *testing.T) {
392         v := TempUnixVolume(t, false, false)
393         defer _teardown(v)
394
395         v.Put(TEST_HASH, TEST_BLOCK)
396         mockErr := errors.New("Mock error")
397         err := v.getFunc(v.blockPath(TEST_HASH), func(rdr io.Reader) error {
398                 return mockErr
399         })
400         if err != mockErr {
401                 t.Errorf("Got %v, expected %v", err, mockErr)
402         }
403 }
404
405 func TestUnixVolumeGetFuncFileError(t *testing.T) {
406         v := TempUnixVolume(t, false, false)
407         defer _teardown(v)
408
409         funcCalled := false
410         err := v.getFunc(v.blockPath(TEST_HASH), func(rdr io.Reader) error {
411                 funcCalled = true
412                 return nil
413         })
414         if err == nil {
415                 t.Errorf("Expected error opening non-existent file")
416         }
417         if funcCalled {
418                 t.Errorf("Worker func should not have been called")
419         }
420 }
421
422 func TestUnixVolumeGetFuncWorkerWaitsOnMutex(t *testing.T) {
423         v := TempUnixVolume(t, true, false)
424         defer _teardown(v)
425
426         v.mutex.Lock()
427         locked := true
428         go func() {
429                 // TODO(TC): Don't rely on Sleep. Mock the mutex instead?
430                 time.Sleep(10 * time.Millisecond)
431                 locked = false
432                 v.mutex.Unlock()
433         }()
434         v.getFunc(v.blockPath(TEST_HASH), func(rdr io.Reader) error {
435                 if locked {
436                         t.Errorf("Worker func called before serialize lock was obtained")
437                 }
438                 return nil
439         })
440 }
441
442 func TestUnixVolumeCompare(t *testing.T) {
443         v := TempUnixVolume(t, false, false)
444         defer _teardown(v)
445
446         v.Put(TEST_HASH, TEST_BLOCK)
447         err := v.Compare(TEST_HASH, TEST_BLOCK)
448         if err != nil {
449                 t.Errorf("Got err %q, expected nil", err)
450         }
451
452         err = v.Compare(TEST_HASH, []byte("baddata"))
453         if err != CollisionError {
454                 t.Errorf("Got err %q, expected %q", err, CollisionError)
455         }
456
457         _store(t, v, TEST_HASH, []byte("baddata"))
458         err = v.Compare(TEST_HASH, TEST_BLOCK)
459         if err != DiskHashError {
460                 t.Errorf("Got err %q, expected %q", err, DiskHashError)
461         }
462
463         p := fmt.Sprintf("%s/%s/%s", v.root, TEST_HASH[:3], TEST_HASH)
464         os.Chmod(p, 000)
465         err = v.Compare(TEST_HASH, TEST_BLOCK)
466         if err == nil || strings.Index(err.Error(), "permission denied") < 0 {
467                 t.Errorf("Got err %q, expected %q", err, "permission denied")
468         }
469 }