2809: Merge branch 'master' into 2809-workbench-rails4 refs #2809
[arvados.git] / services / keep / src / keep / keep_test.go
1 package main
2
3 import (
4         "bytes"
5         "fmt"
6         "io/ioutil"
7         "os"
8         "path"
9         "regexp"
10         "testing"
11 )
12
13 var TEST_BLOCK = []byte("The quick brown fox jumps over the lazy dog.")
14 var TEST_HASH = "e4d909c290d0fb1ca068ffaddf22cbd0"
15
16 var TEST_BLOCK_2 = []byte("Pack my box with five dozen liquor jugs.")
17 var TEST_HASH_2 = "f15ac516f788aec4f30932ffb6395c39"
18
19 var TEST_BLOCK_3 = []byte("Now is the time for all good men to come to the aid of their country.")
20 var TEST_HASH_3 = "eed29bbffbc2dbe5e5ee0bb71888e61f"
21
22 // BAD_BLOCK is used to test collisions and corruption.
23 // It must not match any test hashes.
24 var BAD_BLOCK = []byte("The magic words are squeamish ossifrage.")
25
26 // TODO(twp): Tests still to be written
27 //
28 //   * TestPutBlockFull
29 //       - test that PutBlock returns 503 Full if the filesystem is full.
30 //         (must mock FreeDiskSpace or Statfs? use a tmpfs?)
31 //
32 //   * TestPutBlockWriteErr
33 //       - test the behavior when Write returns an error.
34 //           - Possible solutions: use a small tmpfs and a high
35 //             MIN_FREE_KILOBYTES to trick PutBlock into attempting
36 //             to write a block larger than the amount of space left
37 //           - use an interface to mock ioutil.TempFile with a File
38 //             object that always returns an error on write
39 //
40 // ========================================
41 // GetBlock tests.
42 // ========================================
43
44 // TestGetBlock
45 //     Test that simple block reads succeed.
46 //
47 func TestGetBlock(t *testing.T) {
48         defer teardown()
49
50         // Prepare two test Keep volumes. Our block is stored on the second volume.
51         KeepVM = MakeTestVolumeManager(2)
52         defer func() { KeepVM.Quit() }()
53
54         vols := KeepVM.Volumes()
55         if err := vols[1].Put(TEST_HASH, TEST_BLOCK); err != nil {
56                 t.Error(err)
57         }
58
59         // Check that GetBlock returns success.
60         result, err := GetBlock(TEST_HASH)
61         if err != nil {
62                 t.Errorf("GetBlock error: %s", err)
63         }
64         if fmt.Sprint(result) != fmt.Sprint(TEST_BLOCK) {
65                 t.Errorf("expected %s, got %s", TEST_BLOCK, result)
66         }
67 }
68
69 // TestGetBlockMissing
70 //     GetBlock must return an error when the block is not found.
71 //
72 func TestGetBlockMissing(t *testing.T) {
73         defer teardown()
74
75         // Create two empty test Keep volumes.
76         KeepVM = MakeTestVolumeManager(2)
77         defer func() { KeepVM.Quit() }()
78
79         // Check that GetBlock returns failure.
80         result, err := GetBlock(TEST_HASH)
81         if err != NotFoundError {
82                 t.Errorf("Expected NotFoundError, got %v", result)
83         }
84 }
85
86 // TestGetBlockCorrupt
87 //     GetBlock must return an error when a corrupted block is requested
88 //     (the contents of the file do not checksum to its hash).
89 //
90 func TestGetBlockCorrupt(t *testing.T) {
91         defer teardown()
92
93         // Create two test Keep volumes and store a corrupt block in one.
94         KeepVM = MakeTestVolumeManager(2)
95         defer func() { KeepVM.Quit() }()
96
97         vols := KeepVM.Volumes()
98         vols[0].Put(TEST_HASH, BAD_BLOCK)
99
100         // Check that GetBlock returns failure.
101         result, err := GetBlock(TEST_HASH)
102         if err != CorruptError {
103                 t.Errorf("Expected CorruptError, got %v (buf: %v)", err, result)
104         }
105 }
106
107 // ========================================
108 // PutBlock tests
109 // ========================================
110
111 // TestPutBlockOK
112 //     PutBlock can perform a simple block write and returns success.
113 //
114 func TestPutBlockOK(t *testing.T) {
115         defer teardown()
116
117         // Create two test Keep volumes.
118         KeepVM = MakeTestVolumeManager(2)
119         defer func() { KeepVM.Quit() }()
120
121         // Check that PutBlock stores the data as expected.
122         if err := PutBlock(TEST_BLOCK, TEST_HASH); err != nil {
123                 t.Fatalf("PutBlock: %v", err)
124         }
125
126         vols := KeepVM.Volumes()
127         result, err := vols[0].Get(TEST_HASH)
128         if err != nil {
129                 t.Fatalf("Volume #0 Get returned error: %v", err)
130         }
131         if string(result) != string(TEST_BLOCK) {
132                 t.Fatalf("PutBlock stored '%s', Get retrieved '%s'",
133                         string(TEST_BLOCK), string(result))
134         }
135 }
136
137 // TestPutBlockOneVol
138 //     PutBlock still returns success even when only one of the known
139 //     volumes is online.
140 //
141 func TestPutBlockOneVol(t *testing.T) {
142         defer teardown()
143
144         // Create two test Keep volumes, but cripple one of them.
145         KeepVM = MakeTestVolumeManager(2)
146         defer func() { KeepVM.Quit() }()
147
148         vols := KeepVM.Volumes()
149         vols[0].(*MockVolume).Bad = true
150
151         // Check that PutBlock stores the data as expected.
152         if err := PutBlock(TEST_BLOCK, TEST_HASH); err != nil {
153                 t.Fatalf("PutBlock: %v", err)
154         }
155
156         result, err := GetBlock(TEST_HASH)
157         if err != nil {
158                 t.Fatalf("GetBlock: %v", err)
159         }
160         if string(result) != string(TEST_BLOCK) {
161                 t.Error("PutBlock/GetBlock mismatch")
162                 t.Fatalf("PutBlock stored '%s', GetBlock retrieved '%s'",
163                         string(TEST_BLOCK), string(result))
164         }
165 }
166
167 // TestPutBlockMD5Fail
168 //     Check that PutBlock returns an error if passed a block and hash that
169 //     do not match.
170 //
171 func TestPutBlockMD5Fail(t *testing.T) {
172         defer teardown()
173
174         // Create two test Keep volumes.
175         KeepVM = MakeTestVolumeManager(2)
176         defer func() { KeepVM.Quit() }()
177
178         // Check that PutBlock returns the expected error when the hash does
179         // not match the block.
180         if err := PutBlock(BAD_BLOCK, TEST_HASH); err != MD5Error {
181                 t.Error("Expected MD5Error, got %v", err)
182         }
183
184         // Confirm that GetBlock fails to return anything.
185         if result, err := GetBlock(TEST_HASH); err != NotFoundError {
186                 t.Errorf("GetBlock succeeded after a corrupt block store (result = %s, err = %v)",
187                         string(result), err)
188         }
189 }
190
191 // TestPutBlockCorrupt
192 //     PutBlock should overwrite corrupt blocks on disk when given
193 //     a PUT request with a good block.
194 //
195 func TestPutBlockCorrupt(t *testing.T) {
196         defer teardown()
197
198         // Create two test Keep volumes.
199         KeepVM = MakeTestVolumeManager(2)
200         defer func() { KeepVM.Quit() }()
201
202         // Store a corrupted block under TEST_HASH.
203         vols := KeepVM.Volumes()
204         vols[0].Put(TEST_HASH, BAD_BLOCK)
205         if err := PutBlock(TEST_BLOCK, TEST_HASH); err != nil {
206                 t.Errorf("PutBlock: %v", err)
207         }
208
209         // The block on disk should now match TEST_BLOCK.
210         if block, err := GetBlock(TEST_HASH); err != nil {
211                 t.Errorf("GetBlock: %v", err)
212         } else if bytes.Compare(block, TEST_BLOCK) != 0 {
213                 t.Errorf("GetBlock returned: '%s'", string(block))
214         }
215 }
216
217 // PutBlockCollision
218 //     PutBlock returns a 400 Collision error when attempting to
219 //     store a block that collides with another block on disk.
220 //
221 func TestPutBlockCollision(t *testing.T) {
222         defer teardown()
223
224         // These blocks both hash to the MD5 digest cee9a457e790cf20d4bdaa6d69f01e41.
225         var b1 = []byte("\x0e0eaU\x9a\xa7\x87\xd0\x0b\xc6\xf7\x0b\xbd\xfe4\x04\xcf\x03e\x9epO\x854\xc0\x0f\xfbe\x9cL\x87@\xcc\x94/\xeb-\xa1\x15\xa3\xf4\x15\\\xbb\x86\x07Is\x86em}\x1f4\xa4 Y\xd7\x8fZ\x8d\xd1\xef")
226         var b2 = []byte("\x0e0eaU\x9a\xa7\x87\xd0\x0b\xc6\xf7\x0b\xbd\xfe4\x04\xcf\x03e\x9etO\x854\xc0\x0f\xfbe\x9cL\x87@\xcc\x94/\xeb-\xa1\x15\xa3\xf4\x15\xdc\xbb\x86\x07Is\x86em}\x1f4\xa4 Y\xd7\x8fZ\x8d\xd1\xef")
227         var locator = "cee9a457e790cf20d4bdaa6d69f01e41"
228
229         // Prepare two test Keep volumes.
230         KeepVM = MakeTestVolumeManager(2)
231         defer func() { KeepVM.Quit() }()
232
233         // Store one block, then attempt to store the other. Confirm that
234         // PutBlock reported a CollisionError.
235         if err := PutBlock(b1, locator); err != nil {
236                 t.Error(err)
237         }
238         if err := PutBlock(b2, locator); err == nil {
239                 t.Error("PutBlock did not report a collision")
240         } else if err != CollisionError {
241                 t.Errorf("PutBlock returned %v", err)
242         }
243 }
244
245 // ========================================
246 // FindKeepVolumes tests.
247 // ========================================
248
249 // TestFindKeepVolumes
250 //     Confirms that FindKeepVolumes finds tmpfs volumes with "/keep"
251 //     directories at the top level.
252 //
253 func TestFindKeepVolumes(t *testing.T) {
254         var tempVols [2]string
255         var err error
256
257         defer func() {
258                 for _, path := range tempVols {
259                         os.RemoveAll(path)
260                 }
261         }()
262
263         // Create two directories suitable for using as keep volumes.
264         for i := range tempVols {
265                 if tempVols[i], err = ioutil.TempDir("", "findvol"); err != nil {
266                         t.Fatal(err)
267                 }
268                 tempVols[i] = tempVols[i] + "/keep"
269                 if err = os.Mkdir(tempVols[i], 0755); err != nil {
270                         t.Fatal(err)
271                 }
272         }
273
274         // Set up a bogus PROC_MOUNTS file.
275         if f, err := ioutil.TempFile("", "keeptest"); err == nil {
276                 for _, vol := range tempVols {
277                         fmt.Fprintf(f, "tmpfs %s tmpfs opts\n", path.Dir(vol))
278                 }
279                 f.Close()
280                 PROC_MOUNTS = f.Name()
281
282                 // Check that FindKeepVolumes finds the temp volumes.
283                 resultVols := FindKeepVolumes()
284                 if len(tempVols) != len(resultVols) {
285                         t.Fatalf("set up %d volumes, FindKeepVolumes found %d\n",
286                                 len(tempVols), len(resultVols))
287                 }
288                 for i := range tempVols {
289                         if tempVols[i] != resultVols[i] {
290                                 t.Errorf("FindKeepVolumes returned %s, expected %s\n",
291                                         resultVols[i], tempVols[i])
292                         }
293                 }
294
295                 os.Remove(f.Name())
296         }
297 }
298
299 // TestFindKeepVolumesFail
300 //     When no Keep volumes are present, FindKeepVolumes returns an empty slice.
301 //
302 func TestFindKeepVolumesFail(t *testing.T) {
303         defer teardown()
304
305         // Set up a bogus PROC_MOUNTS file with no Keep vols.
306         if f, err := ioutil.TempFile("", "keeptest"); err == nil {
307                 fmt.Fprintln(f, "rootfs / rootfs opts 0 0")
308                 fmt.Fprintln(f, "sysfs /sys sysfs opts 0 0")
309                 fmt.Fprintln(f, "proc /proc proc opts 0 0")
310                 fmt.Fprintln(f, "udev /dev devtmpfs opts 0 0")
311                 fmt.Fprintln(f, "devpts /dev/pts devpts opts 0 0")
312                 f.Close()
313                 PROC_MOUNTS = f.Name()
314
315                 // Check that FindKeepVolumes returns an empty array.
316                 resultVols := FindKeepVolumes()
317                 if len(resultVols) != 0 {
318                         t.Fatalf("FindKeepVolumes returned %v", resultVols)
319                 }
320
321                 os.Remove(PROC_MOUNTS)
322         }
323 }
324
325 // TestIndex
326 //     Test an /index request.
327 func TestIndex(t *testing.T) {
328         defer teardown()
329
330         // Set up Keep volumes and populate them.
331         // Include multiple blocks on different volumes, and
332         // some metadata files.
333         KeepVM = MakeTestVolumeManager(2)
334         defer func() { KeepVM.Quit() }()
335
336         vols := KeepVM.Volumes()
337         vols[0].Put(TEST_HASH, TEST_BLOCK)
338         vols[1].Put(TEST_HASH_2, TEST_BLOCK_2)
339         vols[0].Put(TEST_HASH_3, TEST_BLOCK_3)
340         vols[0].Put(TEST_HASH+".meta", []byte("metadata"))
341         vols[1].Put(TEST_HASH_2+".meta", []byte("metadata"))
342
343         index := vols[0].Index("") + vols[1].Index("")
344         expected := `^` + TEST_HASH + `\+\d+ \d+\n` +
345                 TEST_HASH_3 + `\+\d+ \d+\n` +
346                 TEST_HASH_2 + `\+\d+ \d+\n$`
347
348         match, err := regexp.MatchString(expected, index)
349         if err == nil {
350                 if !match {
351                         t.Errorf("IndexLocators returned:\n%s", index)
352                 }
353         } else {
354                 t.Errorf("regexp.MatchString: %s", err)
355         }
356 }
357
358 // TestNodeStatus
359 //     Test that GetNodeStatus returns valid info about available volumes.
360 //
361 //     TODO(twp): set up appropriate interfaces to permit more rigorous
362 //     testing.
363 //
364 func TestNodeStatus(t *testing.T) {
365         defer teardown()
366
367         // Set up test Keep volumes with some blocks.
368         KeepVM = MakeTestVolumeManager(2)
369         defer func() { KeepVM.Quit() }()
370
371         vols := KeepVM.Volumes()
372         vols[0].Put(TEST_HASH, TEST_BLOCK)
373         vols[1].Put(TEST_HASH_2, TEST_BLOCK_2)
374
375         // Get node status and make a basic sanity check.
376         st := GetNodeStatus()
377         for i := range vols {
378                 volinfo := st.Volumes[i]
379                 mtp := volinfo.MountPoint
380                 if mtp != "/bogo" {
381                         t.Errorf("GetNodeStatus mount_point %s, expected /bogo", mtp)
382                 }
383                 if volinfo.DeviceNum == 0 {
384                         t.Errorf("uninitialized device_num in %v", volinfo)
385                 }
386                 if volinfo.BytesFree == 0 {
387                         t.Errorf("uninitialized bytes_free in %v", volinfo)
388                 }
389                 if volinfo.BytesUsed == 0 {
390                         t.Errorf("uninitialized bytes_used in %v", volinfo)
391                 }
392         }
393 }
394
395 // ========================================
396 // Helper functions for unit tests.
397 // ========================================
398
399 // MakeTestVolumeManager
400 //     Creates and returns a RRVolumeManager with the specified number
401 //     of MockVolumes.
402 //
403 func MakeTestVolumeManager(num_volumes int) VolumeManager {
404         vols := make([]Volume, num_volumes)
405         for i := range vols {
406                 vols[i] = CreateMockVolume()
407         }
408         return MakeRRVolumeManager(vols)
409 }
410
411 // teardown
412 //     Cleanup to perform after each test.
413 //
414 func teardown() {
415         data_manager_token = ""
416         enforce_permissions = false
417         PermissionSecret = nil
418         KeepVM = nil
419 }