Fix repositories.get_all_permissions, add tests. closes #3546
[arvados.git] / services / keep / src / keep / keep_test.go
1 package main
2
3 import (
4         "bytes"
5         "fmt"
6         "io/ioutil"
7         "os"
8         "path"
9         "regexp"
10         "testing"
11 )
12
13 var TEST_BLOCK = []byte("The quick brown fox jumps over the lazy dog.")
14 var TEST_HASH = "e4d909c290d0fb1ca068ffaddf22cbd0"
15 var TEST_HASH_PUT_RESPONSE = "e4d909c290d0fb1ca068ffaddf22cbd0+44\n"
16
17 var TEST_BLOCK_2 = []byte("Pack my box with five dozen liquor jugs.")
18 var TEST_HASH_2 = "f15ac516f788aec4f30932ffb6395c39"
19
20 var TEST_BLOCK_3 = []byte("Now is the time for all good men to come to the aid of their country.")
21 var TEST_HASH_3 = "eed29bbffbc2dbe5e5ee0bb71888e61f"
22
23 // BAD_BLOCK is used to test collisions and corruption.
24 // It must not match any test hashes.
25 var BAD_BLOCK = []byte("The magic words are squeamish ossifrage.")
26
27 // TODO(twp): Tests still to be written
28 //
29 //   * TestPutBlockFull
30 //       - test that PutBlock returns 503 Full if the filesystem is full.
31 //         (must mock FreeDiskSpace or Statfs? use a tmpfs?)
32 //
33 //   * TestPutBlockWriteErr
34 //       - test the behavior when Write returns an error.
35 //           - Possible solutions: use a small tmpfs and a high
36 //             MIN_FREE_KILOBYTES to trick PutBlock into attempting
37 //             to write a block larger than the amount of space left
38 //           - use an interface to mock ioutil.TempFile with a File
39 //             object that always returns an error on write
40 //
41 // ========================================
42 // GetBlock tests.
43 // ========================================
44
45 // TestGetBlock
46 //     Test that simple block reads succeed.
47 //
48 func TestGetBlock(t *testing.T) {
49         defer teardown()
50
51         // Prepare two test Keep volumes. Our block is stored on the second volume.
52         KeepVM = MakeTestVolumeManager(2)
53         defer func() { KeepVM.Quit() }()
54
55         vols := KeepVM.Volumes()
56         if err := vols[1].Put(TEST_HASH, TEST_BLOCK); err != nil {
57                 t.Error(err)
58         }
59
60         // Check that GetBlock returns success.
61         result, err := GetBlock(TEST_HASH)
62         if err != nil {
63                 t.Errorf("GetBlock error: %s", err)
64         }
65         if fmt.Sprint(result) != fmt.Sprint(TEST_BLOCK) {
66                 t.Errorf("expected %s, got %s", TEST_BLOCK, result)
67         }
68 }
69
70 // TestGetBlockMissing
71 //     GetBlock must return an error when the block is not found.
72 //
73 func TestGetBlockMissing(t *testing.T) {
74         defer teardown()
75
76         // Create two empty test Keep volumes.
77         KeepVM = MakeTestVolumeManager(2)
78         defer func() { KeepVM.Quit() }()
79
80         // Check that GetBlock returns failure.
81         result, err := GetBlock(TEST_HASH)
82         if err != NotFoundError {
83                 t.Errorf("Expected NotFoundError, got %v", result)
84         }
85 }
86
87 // TestGetBlockCorrupt
88 //     GetBlock must return an error when a corrupted block is requested
89 //     (the contents of the file do not checksum to its hash).
90 //
91 func TestGetBlockCorrupt(t *testing.T) {
92         defer teardown()
93
94         // Create two test Keep volumes and store a corrupt block in one.
95         KeepVM = MakeTestVolumeManager(2)
96         defer func() { KeepVM.Quit() }()
97
98         vols := KeepVM.Volumes()
99         vols[0].Put(TEST_HASH, BAD_BLOCK)
100
101         // Check that GetBlock returns failure.
102         result, err := GetBlock(TEST_HASH)
103         if err != DiskHashError {
104                 t.Errorf("Expected DiskHashError, got %v (buf: %v)", err, result)
105         }
106 }
107
108 // ========================================
109 // PutBlock tests
110 // ========================================
111
112 // TestPutBlockOK
113 //     PutBlock can perform a simple block write and returns success.
114 //
115 func TestPutBlockOK(t *testing.T) {
116         defer teardown()
117
118         // Create two test Keep volumes.
119         KeepVM = MakeTestVolumeManager(2)
120         defer func() { KeepVM.Quit() }()
121
122         // Check that PutBlock stores the data as expected.
123         if err := PutBlock(TEST_BLOCK, TEST_HASH); err != nil {
124                 t.Fatalf("PutBlock: %v", err)
125         }
126
127         vols := KeepVM.Volumes()
128         result, err := vols[0].Get(TEST_HASH)
129         if err != nil {
130                 t.Fatalf("Volume #0 Get returned error: %v", err)
131         }
132         if string(result) != string(TEST_BLOCK) {
133                 t.Fatalf("PutBlock stored '%s', Get retrieved '%s'",
134                         string(TEST_BLOCK), string(result))
135         }
136 }
137
138 // TestPutBlockOneVol
139 //     PutBlock still returns success even when only one of the known
140 //     volumes is online.
141 //
142 func TestPutBlockOneVol(t *testing.T) {
143         defer teardown()
144
145         // Create two test Keep volumes, but cripple one of them.
146         KeepVM = MakeTestVolumeManager(2)
147         defer func() { KeepVM.Quit() }()
148
149         vols := KeepVM.Volumes()
150         vols[0].(*MockVolume).Bad = true
151
152         // Check that PutBlock stores the data as expected.
153         if err := PutBlock(TEST_BLOCK, TEST_HASH); err != nil {
154                 t.Fatalf("PutBlock: %v", err)
155         }
156
157         result, err := GetBlock(TEST_HASH)
158         if err != nil {
159                 t.Fatalf("GetBlock: %v", err)
160         }
161         if string(result) != string(TEST_BLOCK) {
162                 t.Error("PutBlock/GetBlock mismatch")
163                 t.Fatalf("PutBlock stored '%s', GetBlock retrieved '%s'",
164                         string(TEST_BLOCK), string(result))
165         }
166 }
167
168 // TestPutBlockMD5Fail
169 //     Check that PutBlock returns an error if passed a block and hash that
170 //     do not match.
171 //
172 func TestPutBlockMD5Fail(t *testing.T) {
173         defer teardown()
174
175         // Create two test Keep volumes.
176         KeepVM = MakeTestVolumeManager(2)
177         defer func() { KeepVM.Quit() }()
178
179         // Check that PutBlock returns the expected error when the hash does
180         // not match the block.
181         if err := PutBlock(BAD_BLOCK, TEST_HASH); err != RequestHashError {
182                 t.Error("Expected RequestHashError, got %v", err)
183         }
184
185         // Confirm that GetBlock fails to return anything.
186         if result, err := GetBlock(TEST_HASH); err != NotFoundError {
187                 t.Errorf("GetBlock succeeded after a corrupt block store (result = %s, err = %v)",
188                         string(result), err)
189         }
190 }
191
192 // TestPutBlockCorrupt
193 //     PutBlock should overwrite corrupt blocks on disk when given
194 //     a PUT request with a good block.
195 //
196 func TestPutBlockCorrupt(t *testing.T) {
197         defer teardown()
198
199         // Create two test Keep volumes.
200         KeepVM = MakeTestVolumeManager(2)
201         defer func() { KeepVM.Quit() }()
202
203         // Store a corrupted block under TEST_HASH.
204         vols := KeepVM.Volumes()
205         vols[0].Put(TEST_HASH, BAD_BLOCK)
206         if err := PutBlock(TEST_BLOCK, TEST_HASH); err != nil {
207                 t.Errorf("PutBlock: %v", err)
208         }
209
210         // The block on disk should now match TEST_BLOCK.
211         if block, err := GetBlock(TEST_HASH); err != nil {
212                 t.Errorf("GetBlock: %v", err)
213         } else if bytes.Compare(block, TEST_BLOCK) != 0 {
214                 t.Errorf("GetBlock returned: '%s'", string(block))
215         }
216 }
217
218 // PutBlockCollision
219 //     PutBlock returns a 400 Collision error when attempting to
220 //     store a block that collides with another block on disk.
221 //
222 func TestPutBlockCollision(t *testing.T) {
223         defer teardown()
224
225         // These blocks both hash to the MD5 digest cee9a457e790cf20d4bdaa6d69f01e41.
226         var b1 = []byte("\x0e0eaU\x9a\xa7\x87\xd0\x0b\xc6\xf7\x0b\xbd\xfe4\x04\xcf\x03e\x9epO\x854\xc0\x0f\xfbe\x9cL\x87@\xcc\x94/\xeb-\xa1\x15\xa3\xf4\x15\\\xbb\x86\x07Is\x86em}\x1f4\xa4 Y\xd7\x8fZ\x8d\xd1\xef")
227         var b2 = []byte("\x0e0eaU\x9a\xa7\x87\xd0\x0b\xc6\xf7\x0b\xbd\xfe4\x04\xcf\x03e\x9etO\x854\xc0\x0f\xfbe\x9cL\x87@\xcc\x94/\xeb-\xa1\x15\xa3\xf4\x15\xdc\xbb\x86\x07Is\x86em}\x1f4\xa4 Y\xd7\x8fZ\x8d\xd1\xef")
228         var locator = "cee9a457e790cf20d4bdaa6d69f01e41"
229
230         // Prepare two test Keep volumes.
231         KeepVM = MakeTestVolumeManager(2)
232         defer func() { KeepVM.Quit() }()
233
234         // Store one block, then attempt to store the other. Confirm that
235         // PutBlock reported a CollisionError.
236         if err := PutBlock(b1, locator); err != nil {
237                 t.Error(err)
238         }
239         if err := PutBlock(b2, locator); err == nil {
240                 t.Error("PutBlock did not report a collision")
241         } else if err != CollisionError {
242                 t.Errorf("PutBlock returned %v", err)
243         }
244 }
245
246 // ========================================
247 // FindKeepVolumes tests.
248 // ========================================
249
250 // TestFindKeepVolumes
251 //     Confirms that FindKeepVolumes finds tmpfs volumes with "/keep"
252 //     directories at the top level.
253 //
254 func TestFindKeepVolumes(t *testing.T) {
255         var tempVols [2]string
256         var err error
257
258         defer func() {
259                 for _, path := range tempVols {
260                         os.RemoveAll(path)
261                 }
262         }()
263
264         // Create two directories suitable for using as keep volumes.
265         for i := range tempVols {
266                 if tempVols[i], err = ioutil.TempDir("", "findvol"); err != nil {
267                         t.Fatal(err)
268                 }
269                 tempVols[i] = tempVols[i] + "/keep"
270                 if err = os.Mkdir(tempVols[i], 0755); err != nil {
271                         t.Fatal(err)
272                 }
273         }
274
275         // Set up a bogus PROC_MOUNTS file.
276         if f, err := ioutil.TempFile("", "keeptest"); err == nil {
277                 for _, vol := range tempVols {
278                         fmt.Fprintf(f, "tmpfs %s tmpfs opts\n", path.Dir(vol))
279                 }
280                 f.Close()
281                 PROC_MOUNTS = f.Name()
282
283                 // Check that FindKeepVolumes finds the temp volumes.
284                 resultVols := FindKeepVolumes()
285                 if len(tempVols) != len(resultVols) {
286                         t.Fatalf("set up %d volumes, FindKeepVolumes found %d\n",
287                                 len(tempVols), len(resultVols))
288                 }
289                 for i := range tempVols {
290                         if tempVols[i] != resultVols[i] {
291                                 t.Errorf("FindKeepVolumes returned %s, expected %s\n",
292                                         resultVols[i], tempVols[i])
293                         }
294                 }
295
296                 os.Remove(f.Name())
297         }
298 }
299
300 // TestFindKeepVolumesFail
301 //     When no Keep volumes are present, FindKeepVolumes returns an empty slice.
302 //
303 func TestFindKeepVolumesFail(t *testing.T) {
304         defer teardown()
305
306         // Set up a bogus PROC_MOUNTS file with no Keep vols.
307         if f, err := ioutil.TempFile("", "keeptest"); err == nil {
308                 fmt.Fprintln(f, "rootfs / rootfs opts 0 0")
309                 fmt.Fprintln(f, "sysfs /sys sysfs opts 0 0")
310                 fmt.Fprintln(f, "proc /proc proc opts 0 0")
311                 fmt.Fprintln(f, "udev /dev devtmpfs opts 0 0")
312                 fmt.Fprintln(f, "devpts /dev/pts devpts opts 0 0")
313                 f.Close()
314                 PROC_MOUNTS = f.Name()
315
316                 // Check that FindKeepVolumes returns an empty array.
317                 resultVols := FindKeepVolumes()
318                 if len(resultVols) != 0 {
319                         t.Fatalf("FindKeepVolumes returned %v", resultVols)
320                 }
321
322                 os.Remove(PROC_MOUNTS)
323         }
324 }
325
326 // TestIndex
327 //     Test an /index request.
328 func TestIndex(t *testing.T) {
329         defer teardown()
330
331         // Set up Keep volumes and populate them.
332         // Include multiple blocks on different volumes, and
333         // some metadata files.
334         KeepVM = MakeTestVolumeManager(2)
335         defer func() { KeepVM.Quit() }()
336
337         vols := KeepVM.Volumes()
338         vols[0].Put(TEST_HASH, TEST_BLOCK)
339         vols[1].Put(TEST_HASH_2, TEST_BLOCK_2)
340         vols[0].Put(TEST_HASH_3, TEST_BLOCK_3)
341         vols[0].Put(TEST_HASH+".meta", []byte("metadata"))
342         vols[1].Put(TEST_HASH_2+".meta", []byte("metadata"))
343
344         index := vols[0].Index("") + vols[1].Index("")
345         expected := `^` + TEST_HASH + `\+\d+ \d+\n` +
346                 TEST_HASH_3 + `\+\d+ \d+\n` +
347                 TEST_HASH_2 + `\+\d+ \d+\n$`
348
349         match, err := regexp.MatchString(expected, index)
350         if err == nil {
351                 if !match {
352                         t.Errorf("IndexLocators returned:\n%s", index)
353                 }
354         } else {
355                 t.Errorf("regexp.MatchString: %s", err)
356         }
357 }
358
359 // TestNodeStatus
360 //     Test that GetNodeStatus returns valid info about available volumes.
361 //
362 //     TODO(twp): set up appropriate interfaces to permit more rigorous
363 //     testing.
364 //
365 func TestNodeStatus(t *testing.T) {
366         defer teardown()
367
368         // Set up test Keep volumes with some blocks.
369         KeepVM = MakeTestVolumeManager(2)
370         defer func() { KeepVM.Quit() }()
371
372         vols := KeepVM.Volumes()
373         vols[0].Put(TEST_HASH, TEST_BLOCK)
374         vols[1].Put(TEST_HASH_2, TEST_BLOCK_2)
375
376         // Get node status and make a basic sanity check.
377         st := GetNodeStatus()
378         for i := range vols {
379                 volinfo := st.Volumes[i]
380                 mtp := volinfo.MountPoint
381                 if mtp != "/bogo" {
382                         t.Errorf("GetNodeStatus mount_point %s, expected /bogo", mtp)
383                 }
384                 if volinfo.DeviceNum == 0 {
385                         t.Errorf("uninitialized device_num in %v", volinfo)
386                 }
387                 if volinfo.BytesFree == 0 {
388                         t.Errorf("uninitialized bytes_free in %v", volinfo)
389                 }
390                 if volinfo.BytesUsed == 0 {
391                         t.Errorf("uninitialized bytes_used in %v", volinfo)
392                 }
393         }
394 }
395
396 // ========================================
397 // Helper functions for unit tests.
398 // ========================================
399
400 // MakeTestVolumeManager
401 //     Creates and returns a RRVolumeManager with the specified number
402 //     of MockVolumes.
403 //
404 func MakeTestVolumeManager(num_volumes int) VolumeManager {
405         vols := make([]Volume, num_volumes)
406         for i := range vols {
407                 vols[i] = CreateMockVolume()
408         }
409         return MakeRRVolumeManager(vols)
410 }
411
412 // teardown
413 //     Cleanup to perform after each test.
414 //
415 func teardown() {
416         data_manager_token = ""
417         enforce_permissions = false
418         PermissionSecret = nil
419         KeepVM = nil
420 }