Added -serialize flag.
[arvados.git] / services / keep / src / keep / keep_test.go
1 package main
2
3 import (
4         "bytes"
5         "fmt"
6         "io/ioutil"
7         "os"
8         "path"
9         "regexp"
10         "testing"
11 )
12
13 var TEST_BLOCK = []byte("The quick brown fox jumps over the lazy dog.")
14 var TEST_HASH = "e4d909c290d0fb1ca068ffaddf22cbd0"
15
16 var TEST_BLOCK_2 = []byte("Pack my box with five dozen liquor jugs.")
17 var TEST_HASH_2 = "f15ac516f788aec4f30932ffb6395c39"
18
19 var TEST_BLOCK_3 = []byte("Now is the time for all good men to come to the aid of their country.")
20 var TEST_HASH_3 = "eed29bbffbc2dbe5e5ee0bb71888e61f"
21
22 // BAD_BLOCK is used to test collisions and corruption.
23 // It must not match any test hashes.
24 var BAD_BLOCK = []byte("The magic words are squeamish ossifrage.")
25
26 // TODO(twp): Tests still to be written
27 //
28 //   * TestPutBlockFull
29 //       - test that PutBlock returns 503 Full if the filesystem is full.
30 //         (must mock FreeDiskSpace or Statfs? use a tmpfs?)
31 //
32 //   * TestPutBlockWriteErr
33 //       - test the behavior when Write returns an error.
34 //           - Possible solutions: use a small tmpfs and a high
35 //             MIN_FREE_KILOBYTES to trick PutBlock into attempting
36 //             to write a block larger than the amount of space left
37 //           - use an interface to mock ioutil.TempFile with a File
38 //             object that always returns an error on write
39 //
40 // TODO(twp): Make these tests less dependent on being able to access
41 // the UnixVolume root field.
42 //
43 // ========================================
44 // GetBlock tests.
45 // ========================================
46
47 // TestGetBlock
48 //     Test that simple block reads succeed.
49 //
50 func TestGetBlock(t *testing.T) {
51         defer teardown()
52
53         // Prepare two test Keep volumes. Our block is stored on the second volume.
54         KeepVolumes = setup(t, 2)
55         store(t, KeepVolumes[1], TEST_HASH, TEST_BLOCK)
56
57         // Check that GetBlock returns success.
58         result, err := GetBlock(TEST_HASH)
59         if err != nil {
60                 t.Errorf("GetBlock error: %s", err)
61         }
62         if fmt.Sprint(result) != fmt.Sprint(TEST_BLOCK) {
63                 t.Errorf("expected %s, got %s", TEST_BLOCK, result)
64         }
65 }
66
67 // TestGetBlockMissing
68 //     GetBlock must return an error when the block is not found.
69 //
70 func TestGetBlockMissing(t *testing.T) {
71         defer teardown()
72
73         // Create two empty test Keep volumes.
74         KeepVolumes = setup(t, 2)
75
76         // Check that GetBlock returns failure.
77         result, err := GetBlock(TEST_HASH)
78         if err != NotFoundError {
79                 t.Errorf("Expected NotFoundError, got %v", result)
80         }
81 }
82
83 // TestGetBlockCorrupt
84 //     GetBlock must return an error when a corrupted block is requested
85 //     (the contents of the file do not checksum to its hash).
86 //
87 func TestGetBlockCorrupt(t *testing.T) {
88         defer teardown()
89
90         // Create two test Keep volumes and store a block in each of them,
91         // but the hash of the block does not match the filename.
92         KeepVolumes = setup(t, 2)
93         for _, vol := range KeepVolumes {
94                 store(t, vol, TEST_HASH, BAD_BLOCK)
95         }
96
97         // Check that GetBlock returns failure.
98         result, err := GetBlock(TEST_HASH)
99         if err != CorruptError {
100                 t.Errorf("Expected CorruptError, got %v (buf: %v)", err, result)
101         }
102 }
103
104 // ========================================
105 // PutBlock tests
106 // ========================================
107
108 // TestPutBlockOK
109 //     PutBlock can perform a simple block write and returns success.
110 //
111 func TestPutBlockOK(t *testing.T) {
112         defer teardown()
113
114         // Create two test Keep volumes.
115         KeepVolumes = setup(t, 2)
116
117         // Check that PutBlock stores the data as expected.
118         if err := PutBlock(TEST_BLOCK, TEST_HASH); err != nil {
119                 t.Fatalf("PutBlock: %v", err)
120         }
121
122         result, err := GetBlock(TEST_HASH)
123         if err != nil {
124                 t.Fatalf("GetBlock returned error: %v", err)
125         }
126         if string(result) != string(TEST_BLOCK) {
127                 t.Error("PutBlock/GetBlock mismatch")
128                 t.Fatalf("PutBlock stored '%s', GetBlock retrieved '%s'",
129                         string(TEST_BLOCK), string(result))
130         }
131 }
132
133 // TestPutBlockOneVol
134 //     PutBlock still returns success even when only one of the known
135 //     volumes is online.
136 //
137 func TestPutBlockOneVol(t *testing.T) {
138         defer teardown()
139
140         // Create two test Keep volumes, but cripple one of them.
141         KeepVolumes = setup(t, 2)
142         os.Chmod(KeepVolumes[0].(*UnixVolume).root, 000)
143
144         // Check that PutBlock stores the data as expected.
145         if err := PutBlock(TEST_BLOCK, TEST_HASH); err != nil {
146                 t.Fatalf("PutBlock: %v", err)
147         }
148
149         result, err := GetBlock(TEST_HASH)
150         if err != nil {
151                 t.Fatalf("GetBlock: %v", err)
152         }
153         if string(result) != string(TEST_BLOCK) {
154                 t.Error("PutBlock/GetBlock mismatch")
155                 t.Fatalf("PutBlock stored '%s', GetBlock retrieved '%s'",
156                         string(TEST_BLOCK), string(result))
157         }
158 }
159
160 // TestPutBlockMD5Fail
161 //     Check that PutBlock returns an error if passed a block and hash that
162 //     do not match.
163 //
164 func TestPutBlockMD5Fail(t *testing.T) {
165         defer teardown()
166
167         // Create two test Keep volumes.
168         KeepVolumes = setup(t, 2)
169
170         // Check that PutBlock returns the expected error when the hash does
171         // not match the block.
172         if err := PutBlock(BAD_BLOCK, TEST_HASH); err != MD5Error {
173                 t.Error("Expected MD5Error, got %v", err)
174         }
175
176         // Confirm that GetBlock fails to return anything.
177         if result, err := GetBlock(TEST_HASH); err != NotFoundError {
178                 t.Errorf("GetBlock succeeded after a corrupt block store (result = %s, err = %v)",
179                         string(result), err)
180         }
181 }
182
183 // TestPutBlockCorrupt
184 //     PutBlock should overwrite corrupt blocks on disk when given
185 //     a PUT request with a good block.
186 //
187 func TestPutBlockCorrupt(t *testing.T) {
188         defer teardown()
189
190         // Create two test Keep volumes.
191         KeepVolumes = setup(t, 2)
192
193         // Store a corrupted block under TEST_HASH.
194         store(t, KeepVolumes[0], TEST_HASH, BAD_BLOCK)
195         if err := PutBlock(TEST_BLOCK, TEST_HASH); err != nil {
196                 t.Errorf("PutBlock: %v", err)
197         }
198
199         // The block on disk should now match TEST_BLOCK.
200         if block, err := GetBlock(TEST_HASH); err != nil {
201                 t.Errorf("GetBlock: %v", err)
202         } else if bytes.Compare(block, TEST_BLOCK) != 0 {
203                 t.Errorf("GetBlock returned: '%s'", string(block))
204         }
205 }
206
207 // PutBlockCollision
208 //     PutBlock returns a 400 Collision error when attempting to
209 //     store a block that collides with another block on disk.
210 //
211 func TestPutBlockCollision(t *testing.T) {
212         defer teardown()
213
214         // These blocks both hash to the MD5 digest cee9a457e790cf20d4bdaa6d69f01e41.
215         var b1 = []byte("\x0e0eaU\x9a\xa7\x87\xd0\x0b\xc6\xf7\x0b\xbd\xfe4\x04\xcf\x03e\x9epO\x854\xc0\x0f\xfbe\x9cL\x87@\xcc\x94/\xeb-\xa1\x15\xa3\xf4\x15\\\xbb\x86\x07Is\x86em}\x1f4\xa4 Y\xd7\x8fZ\x8d\xd1\xef")
216         var b2 = []byte("\x0e0eaU\x9a\xa7\x87\xd0\x0b\xc6\xf7\x0b\xbd\xfe4\x04\xcf\x03e\x9etO\x854\xc0\x0f\xfbe\x9cL\x87@\xcc\x94/\xeb-\xa1\x15\xa3\xf4\x15\xdc\xbb\x86\x07Is\x86em}\x1f4\xa4 Y\xd7\x8fZ\x8d\xd1\xef")
217         var locator = "cee9a457e790cf20d4bdaa6d69f01e41"
218
219         // Prepare two test Keep volumes. Store one block,
220         // then attempt to store the other.
221         KeepVolumes = setup(t, 2)
222         store(t, KeepVolumes[1], locator, b1)
223
224         if err := PutBlock(b2, locator); err == nil {
225                 t.Error("PutBlock did not report a collision")
226         } else if err != CollisionError {
227                 t.Errorf("PutBlock returned %v", err)
228         }
229 }
230
231 // ========================================
232 // FindKeepVolumes tests.
233 // ========================================
234
235 // TestFindKeepVolumes
236 //     Confirms that FindKeepVolumes finds tmpfs volumes with "/keep"
237 //     directories at the top level.
238 //
239 func TestFindKeepVolumes(t *testing.T) {
240         defer teardown()
241
242         // Initialize two keep volumes.
243         var tempVols []Volume = setup(t, 2)
244
245         // Set up a bogus PROC_MOUNTS file.
246         if f, err := ioutil.TempFile("", "keeptest"); err == nil {
247                 for _, vol := range tempVols {
248                         fmt.Fprintf(f, "tmpfs %s tmpfs opts\n", path.Dir(vol.(*UnixVolume).root))
249                 }
250                 f.Close()
251                 PROC_MOUNTS = f.Name()
252
253                 // Check that FindKeepVolumes finds the temp volumes.
254                 resultVols := FindKeepVolumes()
255                 if len(tempVols) != len(resultVols) {
256                         t.Fatalf("set up %d volumes, FindKeepVolumes found %d\n",
257                                 len(tempVols), len(resultVols))
258                 }
259                 for i := range tempVols {
260                         tempVolRoot := tempVols[i].(*UnixVolume).root
261                         if tempVolRoot != resultVols[i] {
262                                 t.Errorf("FindKeepVolumes returned %s, expected %s\n",
263                                         tempVolRoot, tempVols[i])
264                         }
265                 }
266
267                 os.Remove(f.Name())
268         }
269 }
270
271 // TestFindKeepVolumesFail
272 //     When no Keep volumes are present, FindKeepVolumes returns an empty slice.
273 //
274 func TestFindKeepVolumesFail(t *testing.T) {
275         defer teardown()
276
277         // Set up a bogus PROC_MOUNTS file with no Keep vols.
278         if f, err := ioutil.TempFile("", "keeptest"); err == nil {
279                 fmt.Fprintln(f, "rootfs / rootfs opts 0 0")
280                 fmt.Fprintln(f, "sysfs /sys sysfs opts 0 0")
281                 fmt.Fprintln(f, "proc /proc proc opts 0 0")
282                 fmt.Fprintln(f, "udev /dev devtmpfs opts 0 0")
283                 fmt.Fprintln(f, "devpts /dev/pts devpts opts 0 0")
284                 f.Close()
285                 PROC_MOUNTS = f.Name()
286
287                 // Check that FindKeepVolumes returns an empty array.
288                 resultVols := FindKeepVolumes()
289                 if len(resultVols) != 0 {
290                         t.Fatalf("FindKeepVolumes returned %v", resultVols)
291                 }
292
293                 os.Remove(PROC_MOUNTS)
294         }
295 }
296
297 // TestIndex
298 //     Test an /index request.
299 func TestIndex(t *testing.T) {
300         defer teardown()
301
302         // Set up Keep volumes and populate them.
303         // Include multiple blocks on different volumes, and
304         // some metadata files.
305         KeepVolumes = setup(t, 2)
306         store(t, KeepVolumes[0], TEST_HASH, TEST_BLOCK)
307         store(t, KeepVolumes[1], TEST_HASH_2, TEST_BLOCK_2)
308         store(t, KeepVolumes[0], TEST_HASH_3, TEST_BLOCK_3)
309         store(t, KeepVolumes[0], TEST_HASH+".meta", []byte("metadata"))
310         store(t, KeepVolumes[1], TEST_HASH_2+".meta", []byte("metadata"))
311
312         index := KeepVolumes[0].Index("") + KeepVolumes[1].Index("")
313         expected := `^` + TEST_HASH + `\+\d+ \d+\n` +
314                 TEST_HASH_3 + `\+\d+ \d+\n` +
315                 TEST_HASH_2 + `\+\d+ \d+\n$`
316
317         match, err := regexp.MatchString(expected, index)
318         if err == nil {
319                 if !match {
320                         t.Errorf("IndexLocators returned:\n-----\n%s-----\n", index)
321                 }
322         } else {
323                 t.Errorf("regexp.MatchString: %s", err)
324         }
325 }
326
327 // TestNodeStatus
328 //     Test that GetNodeStatus returns valid info about available volumes.
329 //
330 //     TODO(twp): set up appropriate interfaces to permit more rigorous
331 //     testing.
332 //
333 func TestNodeStatus(t *testing.T) {
334         defer teardown()
335
336         // Set up test Keep volumes.
337         KeepVolumes = setup(t, 2)
338
339         // Get node status and make a basic sanity check.
340         st := GetNodeStatus()
341         for i, vol := range KeepVolumes {
342                 volinfo := st.Volumes[i]
343                 mtp := volinfo.MountPoint
344                 if mtp != vol.(*UnixVolume).root {
345                         t.Errorf("GetNodeStatus mount_point %s != KeepVolume %s", mtp, vol)
346                 }
347                 if volinfo.DeviceNum == 0 {
348                         t.Errorf("uninitialized device_num in %v", volinfo)
349                 }
350                 if volinfo.BytesFree == 0 {
351                         t.Errorf("uninitialized bytes_free in %v", volinfo)
352                 }
353                 if volinfo.BytesUsed == 0 {
354                         t.Errorf("uninitialized bytes_used in %v", volinfo)
355                 }
356         }
357 }
358
359 // ========================================
360 // Helper functions for unit tests.
361 // ========================================
362
363 // setup
364 //     Create KeepVolumes for testing.
365 //     Returns a slice of pathnames to temporary Keep volumes.
366 //
367 func setup(t *testing.T, num_volumes int) []Volume {
368         vols := make([]Volume, num_volumes)
369         for i := range vols {
370                 if dir, err := ioutil.TempDir(os.TempDir(), "keeptest"); err == nil {
371                         root := dir + "/keep"
372                         vols[i] = &UnixVolume{root, nil}
373                         os.Mkdir(root, 0755)
374                 } else {
375                         t.Fatal(err)
376                 }
377         }
378         return vols
379 }
380
381 // teardown
382 //     Cleanup to perform after each test.
383 //
384 func teardown() {
385         for _, vol := range KeepVolumes {
386                 os.RemoveAll(path.Dir(vol.(*UnixVolume).root))
387         }
388         KeepVolumes = nil
389 }
390
391 // store
392 //     Low-level code to write Keep blocks directly to disk for testing.
393 //     Note: works only on UnixVolumes.
394 //
395 func store(t *testing.T, vol Volume, filename string, block []byte) {
396         blockdir := fmt.Sprintf("%s/%s", vol.(*UnixVolume).root, filename[:3])
397         if err := os.MkdirAll(blockdir, 0755); err != nil {
398                 t.Fatal(err)
399         }
400
401         blockpath := fmt.Sprintf("%s/%s", blockdir, filename)
402         if f, err := os.Create(blockpath); err == nil {
403                 f.Write(block)
404                 f.Close()
405         } else {
406                 t.Fatal(err)
407         }
408 }