Merge branch '2449-keep-write-blocks' into 2449-keep-index-status-handlers
[arvados.git] / services / keep / keep_test.go
1 package main
2
3 import (
4         "bytes"
5         "fmt"
6         "io/ioutil"
7         "os"
8         "path"
9         "regexp"
10         "testing"
11 )
12
13 var TEST_BLOCK = []byte("The quick brown fox jumps over the lazy dog.")
14 var TEST_HASH = "e4d909c290d0fb1ca068ffaddf22cbd0"
15
16 var TEST_BLOCK_2 = []byte("Pack my box with five dozen liquor jugs.")
17 var TEST_HASH_2 = "f15ac516f788aec4f30932ffb6395c39"
18
19 var TEST_BLOCK_3 = []byte("Now is the time for all good men to come to the aid of their country.")
20 var TEST_HASH_3 = "eed29bbffbc2dbe5e5ee0bb71888e61f"
21
22 // BAD_BLOCK is used to test collisions and corruption.
23 // It must not match any test hashes.
24 var BAD_BLOCK = []byte("The magic words are squeamish ossifrage.")
25
26 // TODO(twp): Tests still to be written
27 //
28 //   * PutBlockFull
29 //       - test that PutBlock returns 503 Full if the filesystem is full.
30 //         (must mock FreeDiskSpace or Statfs? use a tmpfs?)
31 //
32 //   * PutBlockWriteErr
33 //       - test the behavior when Write returns an error.
34 //           - Possible solutions: use a small tmpfs and a high
35 //             MIN_FREE_KILOBYTES to trick PutBlock into attempting
36 //             to write a block larger than the amount of space left
37 //           - use an interface to mock ioutil.TempFile with a File
38 //             object that always returns an error on write
39 //
40 // ========================================
41 // GetBlock tests.
42 // ========================================
43
44 // TestGetBlock
45 //     Test that simple block reads succeed.
46 //
47 func TestGetBlock(t *testing.T) {
48         defer teardown()
49
50         // Prepare two test Keep volumes. Our block is stored on the second volume.
51         KeepVolumes = setup(t, 2)
52         store(t, KeepVolumes[1], TEST_HASH, TEST_BLOCK)
53
54         // Check that GetBlock returns success.
55         result, err := GetBlock(TEST_HASH)
56         if err != nil {
57                 t.Errorf("GetBlock error: %s", err)
58         }
59         if fmt.Sprint(result) != fmt.Sprint(TEST_BLOCK) {
60                 t.Errorf("expected %s, got %s", TEST_BLOCK, result)
61         }
62 }
63
64 // TestGetBlockMissing
65 //     GetBlock must return an error when the block is not found.
66 //
67 func TestGetBlockMissing(t *testing.T) {
68         defer teardown()
69
70         // Create two empty test Keep volumes.
71         KeepVolumes = setup(t, 2)
72
73         // Check that GetBlock returns failure.
74         result, err := GetBlock(TEST_HASH)
75         if err == nil {
76                 t.Errorf("GetBlock incorrectly returned success: ", result)
77         } else {
78                 ke := err.(*KeepError)
79                 if ke.HTTPCode != ErrNotFound {
80                         t.Errorf("GetBlock: %v", ke)
81                 }
82         }
83 }
84
85 // TestGetBlockCorrupt
86 //     GetBlock must return an error when a corrupted block is requested
87 //     (the contents of the file do not checksum to its hash).
88 //
89 func TestGetBlockCorrupt(t *testing.T) {
90         defer teardown()
91
92         // Create two test Keep volumes and store a block in each of them,
93         // but the hash of the block does not match the filename.
94         KeepVolumes = setup(t, 2)
95         for _, vol := range KeepVolumes {
96                 store(t, vol, TEST_HASH, BAD_BLOCK)
97         }
98
99         // Check that GetBlock returns failure.
100         result, err := GetBlock(TEST_HASH)
101         if err == nil {
102                 t.Errorf("GetBlock incorrectly returned success: %s", result)
103         }
104 }
105
106 // ========================================
107 // PutBlock tests
108 // ========================================
109
110 // TestPutBlockOK
111 //     PutBlock can perform a simple block write and returns success.
112 //
113 func TestPutBlockOK(t *testing.T) {
114         defer teardown()
115
116         // Create two test Keep volumes.
117         KeepVolumes = setup(t, 2)
118
119         // Check that PutBlock stores the data as expected.
120         if err := PutBlock(TEST_BLOCK, TEST_HASH); err != nil {
121                 t.Fatalf("PutBlock: %v", err)
122         }
123
124         result, err := GetBlock(TEST_HASH)
125         if err != nil {
126                 t.Fatalf("GetBlock: %s", err.Error())
127         }
128         if string(result) != string(TEST_BLOCK) {
129                 t.Error("PutBlock/GetBlock mismatch")
130                 t.Fatalf("PutBlock stored '%s', GetBlock retrieved '%s'",
131                         string(TEST_BLOCK), string(result))
132         }
133 }
134
135 // TestPutBlockOneVol
136 //     PutBlock still returns success even when only one of the known
137 //     volumes is online.
138 //
139 func TestPutBlockOneVol(t *testing.T) {
140         defer teardown()
141
142         // Create two test Keep volumes, but cripple one of them.
143         KeepVolumes = setup(t, 2)
144         os.Chmod(KeepVolumes[0], 000)
145
146         // Check that PutBlock stores the data as expected.
147         if err := PutBlock(TEST_BLOCK, TEST_HASH); err != nil {
148                 t.Fatalf("PutBlock: %v", err)
149         }
150
151         result, err := GetBlock(TEST_HASH)
152         if err != nil {
153                 t.Fatalf("GetBlock: %s", err.Error())
154         }
155         if string(result) != string(TEST_BLOCK) {
156                 t.Error("PutBlock/GetBlock mismatch")
157                 t.Fatalf("PutBlock stored '%s', GetBlock retrieved '%s'",
158                         string(TEST_BLOCK), string(result))
159         }
160 }
161
162 // TestPutBlockMD5Fail
163 //     Check that PutBlock returns an error if passed a block and hash that
164 //     do not match.
165 //
166 func TestPutBlockMD5Fail(t *testing.T) {
167         defer teardown()
168
169         // Create two test Keep volumes.
170         KeepVolumes = setup(t, 2)
171
172         // Check that PutBlock returns the expected error when the hash does
173         // not match the block.
174         if err := PutBlock(BAD_BLOCK, TEST_HASH); err == nil {
175                 t.Error("PutBlock succeeded despite a block mismatch")
176         } else {
177                 ke := err.(*KeepError)
178                 if ke.HTTPCode != ErrMD5Fail {
179                         t.Errorf("PutBlock returned the wrong error (%v)", ke)
180                 }
181         }
182
183         // Confirm that GetBlock fails to return anything.
184         if result, err := GetBlock(TEST_HASH); err == nil {
185                 t.Errorf("GetBlock succeded after a corrupt block store, returned '%s'",
186                         string(result))
187         }
188 }
189
190 // TestPutBlockCorrupt
191 //     PutBlock should overwrite corrupt blocks on disk when given
192 //     a PUT request with a good block.
193 //
194 func TestPutBlockCorrupt(t *testing.T) {
195         defer teardown()
196
197         // Create two test Keep volumes.
198         KeepVolumes = setup(t, 2)
199
200         // Store a corrupted block under TEST_HASH.
201         store(t, KeepVolumes[0], TEST_HASH, BAD_BLOCK)
202         if err := PutBlock(TEST_BLOCK, TEST_HASH); err != nil {
203                 t.Errorf("PutBlock: %v", err)
204         }
205
206         // The block on disk should now match TEST_BLOCK.
207         if block, err := GetBlock(TEST_HASH); err != nil {
208                 t.Errorf("GetBlock: %v", err)
209         } else if bytes.Compare(block, TEST_BLOCK) != 0 {
210                 t.Errorf("GetBlock returned: '%s'", string(block))
211         }
212 }
213
214 // PutBlockCollision
215 //     PutBlock returns a 400 Collision error when attempting to
216 //     store a block that collides with another block on disk.
217 //
218 func TestPutBlockCollision(t *testing.T) {
219         defer teardown()
220
221         // These blocks both hash to the MD5 digest cee9a457e790cf20d4bdaa6d69f01e41.
222         var b1 = []byte("\x0e0eaU\x9a\xa7\x87\xd0\x0b\xc6\xf7\x0b\xbd\xfe4\x04\xcf\x03e\x9epO\x854\xc0\x0f\xfbe\x9cL\x87@\xcc\x94/\xeb-\xa1\x15\xa3\xf4\x15\\\xbb\x86\x07Is\x86em}\x1f4\xa4 Y\xd7\x8fZ\x8d\xd1\xef")
223         var b2 = []byte("\x0e0eaU\x9a\xa7\x87\xd0\x0b\xc6\xf7\x0b\xbd\xfe4\x04\xcf\x03e\x9etO\x854\xc0\x0f\xfbe\x9cL\x87@\xcc\x94/\xeb-\xa1\x15\xa3\xf4\x15\xdc\xbb\x86\x07Is\x86em}\x1f4\xa4 Y\xd7\x8fZ\x8d\xd1\xef")
224         var locator = "cee9a457e790cf20d4bdaa6d69f01e41"
225
226         // Prepare two test Keep volumes. Store one block,
227         // then attempt to store the other.
228         KeepVolumes = setup(t, 2)
229         store(t, KeepVolumes[1], locator, b1)
230
231         if err := PutBlock(b2, locator); err == nil {
232                 t.Error("PutBlock did not report a collision")
233         } else if err.(*KeepError).HTTPCode != ErrCollision {
234                 t.Errorf("PutBlock returned %v", err)
235         }
236 }
237
238 // ========================================
239 // FindKeepVolumes tests.
240 // ========================================
241
242 // TestFindKeepVolumes
243 //     Confirms that FindKeepVolumes finds tmpfs volumes with "/keep"
244 //     directories at the top level.
245 //
246 func TestFindKeepVolumes(t *testing.T) {
247         defer teardown()
248
249         // Initialize two keep volumes.
250         var tempVols []string = setup(t, 2)
251
252         // Set up a bogus PROC_MOUNTS file.
253         if f, err := ioutil.TempFile("", "keeptest"); err == nil {
254                 for _, vol := range tempVols {
255                         fmt.Fprintf(f, "tmpfs %s tmpfs opts\n", path.Dir(vol))
256                 }
257                 f.Close()
258                 PROC_MOUNTS = f.Name()
259
260                 // Check that FindKeepVolumes finds the temp volumes.
261                 resultVols := FindKeepVolumes()
262                 if len(tempVols) != len(resultVols) {
263                         t.Fatalf("set up %d volumes, FindKeepVolumes found %d\n",
264                                 len(tempVols), len(resultVols))
265                 }
266                 for i := range tempVols {
267                         if tempVols[i] != resultVols[i] {
268                                 t.Errorf("FindKeepVolumes returned %s, expected %s\n",
269                                         resultVols[i], tempVols[i])
270                         }
271                 }
272
273                 os.Remove(f.Name())
274         }
275 }
276
277 // TestFindKeepVolumesFail
278 //     When no Keep volumes are present, FindKeepVolumes returns an empty slice.
279 //
280 func TestFindKeepVolumesFail(t *testing.T) {
281         defer teardown()
282
283         // Set up a bogus PROC_MOUNTS file with no Keep vols.
284         if f, err := ioutil.TempFile("", "keeptest"); err == nil {
285                 fmt.Fprintln(f, "rootfs / rootfs opts 0 0")
286                 fmt.Fprintln(f, "sysfs /sys sysfs opts 0 0")
287                 fmt.Fprintln(f, "proc /proc proc opts 0 0")
288                 fmt.Fprintln(f, "udev /dev devtmpfs opts 0 0")
289                 fmt.Fprintln(f, "devpts /dev/pts devpts opts 0 0")
290                 f.Close()
291                 PROC_MOUNTS = f.Name()
292
293                 // Check that FindKeepVolumes returns an empty array.
294                 resultVols := FindKeepVolumes()
295                 if len(resultVols) != 0 {
296                         t.Fatalf("FindKeepVolumes returned %v", resultVols)
297                 }
298
299                 os.Remove(PROC_MOUNTS)
300         }
301 }
302
303 // TestIndex
304 //     Test an /index request.
305 func TestIndex(t *testing.T) {
306         defer teardown()
307
308         // Set up Keep volumes and populate them.
309         KeepVolumes = setup(t, 2)
310         store(t, KeepVolumes[0], TEST_HASH, TEST_BLOCK)
311         store(t, KeepVolumes[1], TEST_HASH_2, TEST_BLOCK_2)
312         store(t, KeepVolumes[0], TEST_HASH_3, TEST_BLOCK_3)
313
314         index := IndexLocators("")
315         expected := `^` + TEST_HASH + `\+\d+ \d+\n` +
316                 TEST_HASH_3 + `\+\d+ \d+\n` +
317                 TEST_HASH_2 + `\+\d+ \d+\n$`
318
319         match, err := regexp.MatchString(expected, index)
320         if err == nil {
321                 if !match {
322                         t.Errorf("IndexLocators returned:\n-----\n%s-----\n", index)
323                 }
324         } else {
325                 t.Errorf("regexp.MatchString: %s", err)
326         }
327 }
328
329 // ========================================
330 // Helper functions for unit tests.
331 // ========================================
332
333 // setup
334 //     Create KeepVolumes for testing.
335 //     Returns a slice of pathnames to temporary Keep volumes.
336 //
337 func setup(t *testing.T, num_volumes int) []string {
338         vols := make([]string, num_volumes)
339         for i := range vols {
340                 if dir, err := ioutil.TempDir(os.TempDir(), "keeptest"); err == nil {
341                         vols[i] = dir + "/keep"
342                         os.Mkdir(vols[i], 0755)
343                 } else {
344                         t.Fatal(err)
345                 }
346         }
347         return vols
348 }
349
350 // teardown
351 //     Cleanup to perform after each test.
352 //
353 func teardown() {
354         for _, vol := range KeepVolumes {
355                 os.RemoveAll(path.Dir(vol))
356         }
357         KeepVolumes = nil
358 }
359
360 // store
361 //     Low-level code to write Keep blocks directly to disk for testing.
362 //
363 func store(t *testing.T, keepdir string, filename string, block []byte) {
364         blockdir := fmt.Sprintf("%s/%s", keepdir, filename[:3])
365         if err := os.MkdirAll(blockdir, 0755); err != nil {
366                 t.Fatal(err)
367         }
368
369         blockpath := fmt.Sprintf("%s/%s", blockdir, filename)
370         if f, err := os.Create(blockpath); err == nil {
371                 f.Write(block)
372                 f.Close()
373         } else {
374                 t.Fatal(err)
375         }
376 }