Merge branch '2449-keep-write-blocks' into 2449-keep-index-status-handlers
[arvados.git] / services / keep / keep_test.go
1 package main
2
3 import (
4         "bytes"
5         "fmt"
6         "io/ioutil"
7         "os"
8         "path"
9         "regexp"
10         "testing"
11 )
12
13 var TEST_BLOCK = []byte("The quick brown fox jumps over the lazy dog.")
14 var TEST_HASH = "e4d909c290d0fb1ca068ffaddf22cbd0"
15
16 var TEST_BLOCK_2 = []byte("Pack my box with five dozen liquor jugs.")
17 var TEST_HASH_2 = "f15ac516f788aec4f30932ffb6395c39"
18
19 var TEST_BLOCK_3 = []byte("Now is the time for all good men to come to the aid of their country.")
20 var TEST_HASH_3 = "eed29bbffbc2dbe5e5ee0bb71888e61f"
21
22 // BAD_BLOCK is used to test collisions and corruption.
23 // It must not match any test hashes.
24 var BAD_BLOCK = []byte("The magic words are squeamish ossifrage.")
25
26 // TODO(twp): Tests still to be written
27 //
28 //   * PutBlockCollision
29 //       - test that PutBlock(BLOCK, HASH) reports a collision. HASH must
30 //         be present in Keep and identify a block which sums to HASH but
31 //         which does not match BLOCK. (Requires an interface to mock MD5.)
32 //
33 //   * PutBlockFull
34 //       - test that PutBlock returns 503 Full if the filesystem is full.
35 //         (must mock FreeDiskSpace or Statfs? use a tmpfs?)
36
37 // ========================================
38 // GetBlock tests.
39 // ========================================
40
41 // TestGetBlock
42 //     Test that simple block reads succeed.
43 //
44 func TestGetBlock(t *testing.T) {
45         defer teardown()
46
47         // Prepare two test Keep volumes. Our block is stored on the second volume.
48         KeepVolumes = setup(t, 2)
49         store(t, KeepVolumes[1], TEST_HASH, TEST_BLOCK)
50
51         // Check that GetBlock returns success.
52         result, err := GetBlock(TEST_HASH)
53         if err != nil {
54                 t.Errorf("GetBlock error: %s", err)
55         }
56         if fmt.Sprint(result) != fmt.Sprint(TEST_BLOCK) {
57                 t.Errorf("expected %s, got %s", TEST_BLOCK, result)
58         }
59 }
60
61 // TestGetBlockMissing
62 //     GetBlock must return an error when the block is not found.
63 //
64 func TestGetBlockMissing(t *testing.T) {
65         defer teardown()
66
67         // Create two empty test Keep volumes.
68         KeepVolumes = setup(t, 2)
69
70         // Check that GetBlock returns failure.
71         result, err := GetBlock(TEST_HASH)
72         if err == nil {
73                 t.Errorf("GetBlock incorrectly returned success: ", result)
74         } else {
75                 ke := err.(*KeepError)
76                 if ke.HTTPCode != ErrNotFound {
77                         t.Errorf("GetBlock: %v", ke)
78                 }
79         }
80 }
81
82 // TestGetBlockCorrupt
83 //     GetBlock must return an error when a corrupted block is requested
84 //     (the contents of the file do not checksum to its hash).
85 //
86 func TestGetBlockCorrupt(t *testing.T) {
87         defer teardown()
88
89         // Create two test Keep volumes and store a block in each of them,
90         // but the hash of the block does not match the filename.
91         KeepVolumes = setup(t, 2)
92         for _, vol := range KeepVolumes {
93                 store(t, vol, TEST_HASH, BAD_BLOCK)
94         }
95
96         // Check that GetBlock returns failure.
97         result, err := GetBlock(TEST_HASH)
98         if err == nil {
99                 t.Errorf("GetBlock incorrectly returned success: %s", result)
100         }
101 }
102
103 // ========================================
104 // PutBlock tests
105 // ========================================
106
107 // TestPutBlockOK
108 //     PutBlock can perform a simple block write and returns success.
109 //
110 func TestPutBlockOK(t *testing.T) {
111         defer teardown()
112
113         // Create two test Keep volumes.
114         KeepVolumes = setup(t, 2)
115
116         // Check that PutBlock stores the data as expected.
117         if err := PutBlock(TEST_BLOCK, TEST_HASH); err != nil {
118                 t.Fatalf("PutBlock: %v", err)
119         }
120
121         result, err := GetBlock(TEST_HASH)
122         if err != nil {
123                 t.Fatalf("GetBlock: %s", err.Error())
124         }
125         if string(result) != string(TEST_BLOCK) {
126                 t.Error("PutBlock/GetBlock mismatch")
127                 t.Fatalf("PutBlock stored '%s', GetBlock retrieved '%s'",
128                         string(TEST_BLOCK), string(result))
129         }
130 }
131
132 // TestPutBlockOneVol
133 //     PutBlock still returns success even when only one of the known
134 //     volumes is online.
135 //
136 func TestPutBlockOneVol(t *testing.T) {
137         defer teardown()
138
139         // Create two test Keep volumes, but cripple one of them.
140         KeepVolumes = setup(t, 2)
141         os.Chmod(KeepVolumes[0], 000)
142
143         // Check that PutBlock stores the data as expected.
144         if err := PutBlock(TEST_BLOCK, TEST_HASH); err != nil {
145                 t.Fatalf("PutBlock: %v", err)
146         }
147
148         result, err := GetBlock(TEST_HASH)
149         if err != nil {
150                 t.Fatalf("GetBlock: %s", err.Error())
151         }
152         if string(result) != string(TEST_BLOCK) {
153                 t.Error("PutBlock/GetBlock mismatch")
154                 t.Fatalf("PutBlock stored '%s', GetBlock retrieved '%s'",
155                         string(TEST_BLOCK), string(result))
156         }
157 }
158
159 // TestPutBlockMD5Fail
160 //     Check that PutBlock returns an error if passed a block and hash that
161 //     do not match.
162 //
163 func TestPutBlockMD5Fail(t *testing.T) {
164         defer teardown()
165
166         // Create two test Keep volumes.
167         KeepVolumes = setup(t, 2)
168
169         // Check that PutBlock returns the expected error when the hash does
170         // not match the block.
171         if err := PutBlock(BAD_BLOCK, TEST_HASH); err == nil {
172                 t.Error("PutBlock succeeded despite a block mismatch")
173         } else {
174                 ke := err.(*KeepError)
175                 if ke.HTTPCode != ErrMD5Fail {
176                         t.Errorf("PutBlock returned the wrong error (%v)", ke)
177                 }
178         }
179
180         // Confirm that GetBlock fails to return anything.
181         if result, err := GetBlock(TEST_HASH); err == nil {
182                 t.Errorf("GetBlock succeded after a corrupt block store, returned '%s'",
183                         string(result))
184         }
185 }
186
187 // TestPutBlockCorrupt
188 //     PutBlock should overwrite corrupt blocks on disk when given
189 //     a PUT request with a good block.
190 //
191 func TestPutBlockCorrupt(t *testing.T) {
192         defer teardown()
193
194         // Create two test Keep volumes.
195         KeepVolumes = setup(t, 2)
196
197         // Store a corrupted block under TEST_HASH.
198         store(t, KeepVolumes[0], TEST_HASH, BAD_BLOCK)
199         if err := PutBlock(TEST_BLOCK, TEST_HASH); err != nil {
200                 t.Errorf("PutBlock: %v", err)
201         }
202
203         // The block on disk should now match TEST_BLOCK.
204         if block, err := GetBlock(TEST_HASH); err != nil {
205                 t.Errorf("GetBlock: %v", err)
206         } else if bytes.Compare(block, TEST_BLOCK) != 0 {
207                 t.Errorf("GetBlock returned: '%s'", string(block))
208         }
209 }
210
211 // ========================================
212 // FindKeepVolumes tests.
213 // ========================================
214
215 // TestFindKeepVolumes
216 //     Confirms that FindKeepVolumes finds tmpfs volumes with "/keep"
217 //     directories at the top level.
218 //
219 func TestFindKeepVolumes(t *testing.T) {
220         defer teardown()
221
222         // Initialize two keep volumes.
223         var tempVols []string = setup(t, 2)
224
225         // Set up a bogus PROC_MOUNTS file.
226         if f, err := ioutil.TempFile("", "keeptest"); err == nil {
227                 for _, vol := range tempVols {
228                         fmt.Fprintf(f, "tmpfs %s tmpfs opts\n", path.Dir(vol))
229                 }
230                 f.Close()
231                 PROC_MOUNTS = f.Name()
232
233                 // Check that FindKeepVolumes finds the temp volumes.
234                 resultVols := FindKeepVolumes()
235                 if len(tempVols) != len(resultVols) {
236                         t.Fatalf("set up %d volumes, FindKeepVolumes found %d\n",
237                                 len(tempVols), len(resultVols))
238                 }
239                 for i := range tempVols {
240                         if tempVols[i] != resultVols[i] {
241                                 t.Errorf("FindKeepVolumes returned %s, expected %s\n",
242                                         resultVols[i], tempVols[i])
243                         }
244                 }
245
246                 os.Remove(f.Name())
247         }
248 }
249
250 // TestFindKeepVolumesFail
251 //     When no Keep volumes are present, FindKeepVolumes returns an empty slice.
252 //
253 func TestFindKeepVolumesFail(t *testing.T) {
254         defer teardown()
255
256         // Set up a bogus PROC_MOUNTS file with no Keep vols.
257         if f, err := ioutil.TempFile("", "keeptest"); err == nil {
258                 fmt.Fprintln(f, "rootfs / rootfs opts 0 0")
259                 fmt.Fprintln(f, "sysfs /sys sysfs opts 0 0")
260                 fmt.Fprintln(f, "proc /proc proc opts 0 0")
261                 fmt.Fprintln(f, "udev /dev devtmpfs opts 0 0")
262                 fmt.Fprintln(f, "devpts /dev/pts devpts opts 0 0")
263                 f.Close()
264                 PROC_MOUNTS = f.Name()
265
266                 // Check that FindKeepVolumes returns an empty array.
267                 resultVols := FindKeepVolumes()
268                 if len(resultVols) != 0 {
269                         t.Fatalf("FindKeepVolumes returned %v", resultVols)
270                 }
271
272                 os.Remove(PROC_MOUNTS)
273         }
274 }
275
276 // TestIndex
277 //     Test an /index request.
278 func TestIndex(t *testing.T) {
279         defer teardown()
280
281         // Set up Keep volumes and populate them.
282         KeepVolumes = setup(t, 2)
283         store(t, KeepVolumes[0], TEST_HASH, TEST_BLOCK)
284         store(t, KeepVolumes[1], TEST_HASH_2, TEST_BLOCK_2)
285         store(t, KeepVolumes[0], TEST_HASH_3, TEST_BLOCK_3)
286
287         index := IndexLocators("")
288         expected := `^` + TEST_HASH + `\+\d+ \d+\n` +
289                 TEST_HASH_3 + `\+\d+ \d+\n` +
290                 TEST_HASH_2 + `\+\d+ \d+\n$`
291
292         match, err := regexp.MatchString(expected, index)
293         if err == nil {
294                 if !match {
295                         t.Errorf("IndexLocators returned:\n-----\n%s-----\n", index)
296                 }
297         } else {
298                 t.Errorf("regexp.MatchString: %s", err)
299         }
300 }
301
302 // ========================================
303 // Helper functions for unit tests.
304 // ========================================
305
306 // setup
307 //     Create KeepVolumes for testing.
308 //     Returns a slice of pathnames to temporary Keep volumes.
309 //
310 func setup(t *testing.T, num_volumes int) []string {
311         vols := make([]string, num_volumes)
312         for i := range vols {
313                 if dir, err := ioutil.TempDir(os.TempDir(), "keeptest"); err == nil {
314                         vols[i] = dir + "/keep"
315                         os.Mkdir(vols[i], 0755)
316                 } else {
317                         t.Fatal(err)
318                 }
319         }
320         return vols
321 }
322
323 // teardown
324 //     Cleanup to perform after each test.
325 //
326 func teardown() {
327         for _, vol := range KeepVolumes {
328                 os.RemoveAll(path.Dir(vol))
329         }
330         KeepVolumes = nil
331 }
332
333 // store
334 //     Low-level code to write Keep blocks directly to disk for testing.
335 //
336 func store(t *testing.T, keepdir string, filename string, block []byte) {
337         blockdir := fmt.Sprintf("%s/%s", keepdir, filename[:3])
338         if err := os.MkdirAll(blockdir, 0755); err != nil {
339                 t.Fatal(err)
340         }
341
342         blockpath := fmt.Sprintf("%s/%s", blockdir, filename)
343         if f, err := os.Create(blockpath); err == nil {
344                 f.Write(block)
345                 f.Close()
346         } else {
347                 t.Fatal(err)
348         }
349 }