Merge branch '2449-keep-write-blocks' into 2449-keep-index-status-handlers
[arvados.git] / services / keep / keep_test.go
1 package main
2
3 import (
4         "bytes"
5         "fmt"
6         "io/ioutil"
7         "os"
8         "path"
9         "regexp"
10         "testing"
11 )
12
13 var TEST_BLOCK = []byte("The quick brown fox jumps over the lazy dog.")
14 var TEST_HASH = "e4d909c290d0fb1ca068ffaddf22cbd0"
15
16 var TEST_BLOCK_2 = []byte("Pack my box with five dozen liquor jugs.")
17 var TEST_HASH_2 = "f15ac516f788aec4f30932ffb6395c39"
18
19 var TEST_BLOCK_3 = []byte("Now is the time for all good men to come to the aid of their country.")
20 var TEST_HASH_3 = "eed29bbffbc2dbe5e5ee0bb71888e61f"
21
22 // BAD_BLOCK is used to test collisions and corruption.
23 // It must not match any test hashes.
24 var BAD_BLOCK = []byte("The magic words are squeamish ossifrage.")
25
26 // TODO(twp): Tests still to be written
27 //
28 //   * TestPutBlockFull
29 //       - test that PutBlock returns 503 Full if the filesystem is full.
30 //         (must mock FreeDiskSpace or Statfs? use a tmpfs?)
31 //
32 //   * TestPutBlockWriteErr
33 //       - test the behavior when Write returns an error.
34 //           - Possible solutions: use a small tmpfs and a high
35 //             MIN_FREE_KILOBYTES to trick PutBlock into attempting
36 //             to write a block larger than the amount of space left
37 //           - use an interface to mock ioutil.TempFile with a File
38 //             object that always returns an error on write
39 //
40 //   * TestNodeStatus
41 //       - test that GetNodeStatus returns a structure with expected
42 //         values: need to mock FreeDiskSpace or Statfs, or use a tmpfs
43 //
44 // ========================================
45 // GetBlock tests.
46 // ========================================
47
48 // TestGetBlock
49 //     Test that simple block reads succeed.
50 //
51 func TestGetBlock(t *testing.T) {
52         defer teardown()
53
54         // Prepare two test Keep volumes. Our block is stored on the second volume.
55         KeepVolumes = setup(t, 2)
56         store(t, KeepVolumes[1], TEST_HASH, TEST_BLOCK)
57
58         // Check that GetBlock returns success.
59         result, err := GetBlock(TEST_HASH)
60         if err != nil {
61                 t.Errorf("GetBlock error: %s", err)
62         }
63         if fmt.Sprint(result) != fmt.Sprint(TEST_BLOCK) {
64                 t.Errorf("expected %s, got %s", TEST_BLOCK, result)
65         }
66 }
67
68 // TestGetBlockMissing
69 //     GetBlock must return an error when the block is not found.
70 //
71 func TestGetBlockMissing(t *testing.T) {
72         defer teardown()
73
74         // Create two empty test Keep volumes.
75         KeepVolumes = setup(t, 2)
76
77         // Check that GetBlock returns failure.
78         result, err := GetBlock(TEST_HASH)
79         if err != NotFoundError {
80                 t.Errorf("Expected NotFoundError, got %v", result)
81         }
82 }
83
84 // TestGetBlockCorrupt
85 //     GetBlock must return an error when a corrupted block is requested
86 //     (the contents of the file do not checksum to its hash).
87 //
88 func TestGetBlockCorrupt(t *testing.T) {
89         defer teardown()
90
91         // Create two test Keep volumes and store a block in each of them,
92         // but the hash of the block does not match the filename.
93         KeepVolumes = setup(t, 2)
94         for _, vol := range KeepVolumes {
95                 store(t, vol, TEST_HASH, BAD_BLOCK)
96         }
97
98         // Check that GetBlock returns failure.
99         result, err := GetBlock(TEST_HASH)
100         if err != CorruptError {
101                 t.Errorf("Expected CorruptError, got %v", result)
102         }
103 }
104
105 // ========================================
106 // PutBlock tests
107 // ========================================
108
109 // TestPutBlockOK
110 //     PutBlock can perform a simple block write and returns success.
111 //
112 func TestPutBlockOK(t *testing.T) {
113         defer teardown()
114
115         // Create two test Keep volumes.
116         KeepVolumes = setup(t, 2)
117
118         // Check that PutBlock stores the data as expected.
119         if err := PutBlock(TEST_BLOCK, TEST_HASH); err != nil {
120                 t.Fatalf("PutBlock: %v", err)
121         }
122
123         result, err := GetBlock(TEST_HASH)
124         if err != nil {
125                 t.Fatalf("GetBlock returned error: %v", err)
126         }
127         if string(result) != string(TEST_BLOCK) {
128                 t.Error("PutBlock/GetBlock mismatch")
129                 t.Fatalf("PutBlock stored '%s', GetBlock retrieved '%s'",
130                         string(TEST_BLOCK), string(result))
131         }
132 }
133
134 // TestPutBlockOneVol
135 //     PutBlock still returns success even when only one of the known
136 //     volumes is online.
137 //
138 func TestPutBlockOneVol(t *testing.T) {
139         defer teardown()
140
141         // Create two test Keep volumes, but cripple one of them.
142         KeepVolumes = setup(t, 2)
143         os.Chmod(KeepVolumes[0], 000)
144
145         // Check that PutBlock stores the data as expected.
146         if err := PutBlock(TEST_BLOCK, TEST_HASH); err != nil {
147                 t.Fatalf("PutBlock: %v", err)
148         }
149
150         result, err := GetBlock(TEST_HASH)
151         if err != nil {
152                 t.Fatalf("GetBlock: %v", err)
153         }
154         if string(result) != string(TEST_BLOCK) {
155                 t.Error("PutBlock/GetBlock mismatch")
156                 t.Fatalf("PutBlock stored '%s', GetBlock retrieved '%s'",
157                         string(TEST_BLOCK), string(result))
158         }
159 }
160
161 // TestPutBlockMD5Fail
162 //     Check that PutBlock returns an error if passed a block and hash that
163 //     do not match.
164 //
165 func TestPutBlockMD5Fail(t *testing.T) {
166         defer teardown()
167
168         // Create two test Keep volumes.
169         KeepVolumes = setup(t, 2)
170
171         // Check that PutBlock returns the expected error when the hash does
172         // not match the block.
173         if err := PutBlock(BAD_BLOCK, TEST_HASH); err != MD5Error {
174                 t.Error("Expected MD5Error, got %v", err)
175         }
176
177         // Confirm that GetBlock fails to return anything.
178         if result, err := GetBlock(TEST_HASH); err != NotFoundError {
179                 t.Errorf("GetBlock succeeded after a corrupt block store (result = %s, err = %v)",
180                         string(result), err)
181         }
182 }
183
184 // TestPutBlockCorrupt
185 //     PutBlock should overwrite corrupt blocks on disk when given
186 //     a PUT request with a good block.
187 //
188 func TestPutBlockCorrupt(t *testing.T) {
189         defer teardown()
190
191         // Create two test Keep volumes.
192         KeepVolumes = setup(t, 2)
193
194         // Store a corrupted block under TEST_HASH.
195         store(t, KeepVolumes[0], TEST_HASH, BAD_BLOCK)
196         if err := PutBlock(TEST_BLOCK, TEST_HASH); err != nil {
197                 t.Errorf("PutBlock: %v", err)
198         }
199
200         // The block on disk should now match TEST_BLOCK.
201         if block, err := GetBlock(TEST_HASH); err != nil {
202                 t.Errorf("GetBlock: %v", err)
203         } else if bytes.Compare(block, TEST_BLOCK) != 0 {
204                 t.Errorf("GetBlock returned: '%s'", string(block))
205         }
206 }
207
208 // PutBlockCollision
209 //     PutBlock returns a 400 Collision error when attempting to
210 //     store a block that collides with another block on disk.
211 //
212 func TestPutBlockCollision(t *testing.T) {
213         defer teardown()
214
215         // These blocks both hash to the MD5 digest cee9a457e790cf20d4bdaa6d69f01e41.
216         var b1 = []byte("\x0e0eaU\x9a\xa7\x87\xd0\x0b\xc6\xf7\x0b\xbd\xfe4\x04\xcf\x03e\x9epO\x854\xc0\x0f\xfbe\x9cL\x87@\xcc\x94/\xeb-\xa1\x15\xa3\xf4\x15\\\xbb\x86\x07Is\x86em}\x1f4\xa4 Y\xd7\x8fZ\x8d\xd1\xef")
217         var b2 = []byte("\x0e0eaU\x9a\xa7\x87\xd0\x0b\xc6\xf7\x0b\xbd\xfe4\x04\xcf\x03e\x9etO\x854\xc0\x0f\xfbe\x9cL\x87@\xcc\x94/\xeb-\xa1\x15\xa3\xf4\x15\xdc\xbb\x86\x07Is\x86em}\x1f4\xa4 Y\xd7\x8fZ\x8d\xd1\xef")
218         var locator = "cee9a457e790cf20d4bdaa6d69f01e41"
219
220         // Prepare two test Keep volumes. Store one block,
221         // then attempt to store the other.
222         KeepVolumes = setup(t, 2)
223         store(t, KeepVolumes[1], locator, b1)
224
225         if err := PutBlock(b2, locator); err == nil {
226                 t.Error("PutBlock did not report a collision")
227         } else if err != CollisionError {
228                 t.Errorf("PutBlock returned %v", err)
229         }
230 }
231
232 // ========================================
233 // FindKeepVolumes tests.
234 // ========================================
235
236 // TestFindKeepVolumes
237 //     Confirms that FindKeepVolumes finds tmpfs volumes with "/keep"
238 //     directories at the top level.
239 //
240 func TestFindKeepVolumes(t *testing.T) {
241         defer teardown()
242
243         // Initialize two keep volumes.
244         var tempVols []string = setup(t, 2)
245
246         // Set up a bogus PROC_MOUNTS file.
247         if f, err := ioutil.TempFile("", "keeptest"); err == nil {
248                 for _, vol := range tempVols {
249                         fmt.Fprintf(f, "tmpfs %s tmpfs opts\n", path.Dir(vol))
250                 }
251                 f.Close()
252                 PROC_MOUNTS = f.Name()
253
254                 // Check that FindKeepVolumes finds the temp volumes.
255                 resultVols := FindKeepVolumes()
256                 if len(tempVols) != len(resultVols) {
257                         t.Fatalf("set up %d volumes, FindKeepVolumes found %d\n",
258                                 len(tempVols), len(resultVols))
259                 }
260                 for i := range tempVols {
261                         if tempVols[i] != resultVols[i] {
262                                 t.Errorf("FindKeepVolumes returned %s, expected %s\n",
263                                         resultVols[i], tempVols[i])
264                         }
265                 }
266
267                 os.Remove(f.Name())
268         }
269 }
270
271 // TestFindKeepVolumesFail
272 //     When no Keep volumes are present, FindKeepVolumes returns an empty slice.
273 //
274 func TestFindKeepVolumesFail(t *testing.T) {
275         defer teardown()
276
277         // Set up a bogus PROC_MOUNTS file with no Keep vols.
278         if f, err := ioutil.TempFile("", "keeptest"); err == nil {
279                 fmt.Fprintln(f, "rootfs / rootfs opts 0 0")
280                 fmt.Fprintln(f, "sysfs /sys sysfs opts 0 0")
281                 fmt.Fprintln(f, "proc /proc proc opts 0 0")
282                 fmt.Fprintln(f, "udev /dev devtmpfs opts 0 0")
283                 fmt.Fprintln(f, "devpts /dev/pts devpts opts 0 0")
284                 f.Close()
285                 PROC_MOUNTS = f.Name()
286
287                 // Check that FindKeepVolumes returns an empty array.
288                 resultVols := FindKeepVolumes()
289                 if len(resultVols) != 0 {
290                         t.Fatalf("FindKeepVolumes returned %v", resultVols)
291                 }
292
293                 os.Remove(PROC_MOUNTS)
294         }
295 }
296
297 // TestIndex
298 //     Test an /index request.
299 func TestIndex(t *testing.T) {
300         defer teardown()
301
302         // Set up Keep volumes and populate them.
303         KeepVolumes = setup(t, 2)
304         store(t, KeepVolumes[0], TEST_HASH, TEST_BLOCK)
305         store(t, KeepVolumes[1], TEST_HASH_2, TEST_BLOCK_2)
306         store(t, KeepVolumes[0], TEST_HASH_3, TEST_BLOCK_3)
307
308         index := IndexLocators("")
309         expected := `^` + TEST_HASH + `\+\d+ \d+\n` +
310                 TEST_HASH_3 + `\+\d+ \d+\n` +
311                 TEST_HASH_2 + `\+\d+ \d+\n$`
312
313         match, err := regexp.MatchString(expected, index)
314         if err == nil {
315                 if !match {
316                         t.Errorf("IndexLocators returned:\n-----\n%s-----\n", index)
317                 }
318         } else {
319                 t.Errorf("regexp.MatchString: %s", err)
320         }
321 }
322
323 // ========================================
324 // Helper functions for unit tests.
325 // ========================================
326
327 // setup
328 //     Create KeepVolumes for testing.
329 //     Returns a slice of pathnames to temporary Keep volumes.
330 //
331 func setup(t *testing.T, num_volumes int) []string {
332         vols := make([]string, num_volumes)
333         for i := range vols {
334                 if dir, err := ioutil.TempDir(os.TempDir(), "keeptest"); err == nil {
335                         vols[i] = dir + "/keep"
336                         os.Mkdir(vols[i], 0755)
337                 } else {
338                         t.Fatal(err)
339                 }
340         }
341         return vols
342 }
343
344 // teardown
345 //     Cleanup to perform after each test.
346 //
347 func teardown() {
348         for _, vol := range KeepVolumes {
349                 os.RemoveAll(path.Dir(vol))
350         }
351         KeepVolumes = nil
352 }
353
354 // store
355 //     Low-level code to write Keep blocks directly to disk for testing.
356 //
357 func store(t *testing.T, keepdir string, filename string, block []byte) {
358         blockdir := fmt.Sprintf("%s/%s", keepdir, filename[:3])
359         if err := os.MkdirAll(blockdir, 0755); err != nil {
360                 t.Fatal(err)
361         }
362
363         blockpath := fmt.Sprintf("%s/%s", blockdir, filename)
364         if f, err := os.Create(blockpath); err == nil {
365                 f.Write(block)
366                 f.Close()
367         } else {
368                 t.Fatal(err)
369         }
370 }