Merge branch '5748-max-buffers' refs #5748
[arvados.git] / services / keepstore / keepstore_test.go
1 package main
2
3 import (
4         "bytes"
5         "fmt"
6         "io/ioutil"
7         "os"
8         "path"
9         "regexp"
10         "sort"
11         "strings"
12         "testing"
13 )
14
15 var TEST_BLOCK = []byte("The quick brown fox jumps over the lazy dog.")
16 var TEST_HASH = "e4d909c290d0fb1ca068ffaddf22cbd0"
17 var TEST_HASH_PUT_RESPONSE = "e4d909c290d0fb1ca068ffaddf22cbd0+44\n"
18
19 var TEST_BLOCK_2 = []byte("Pack my box with five dozen liquor jugs.")
20 var TEST_HASH_2 = "f15ac516f788aec4f30932ffb6395c39"
21
22 var TEST_BLOCK_3 = []byte("Now is the time for all good men to come to the aid of their country.")
23 var TEST_HASH_3 = "eed29bbffbc2dbe5e5ee0bb71888e61f"
24
25 // BAD_BLOCK is used to test collisions and corruption.
26 // It must not match any test hashes.
27 var BAD_BLOCK = []byte("The magic words are squeamish ossifrage.")
28
29 // TODO(twp): Tests still to be written
30 //
31 //   * TestPutBlockFull
32 //       - test that PutBlock returns 503 Full if the filesystem is full.
33 //         (must mock FreeDiskSpace or Statfs? use a tmpfs?)
34 //
35 //   * TestPutBlockWriteErr
36 //       - test the behavior when Write returns an error.
37 //           - Possible solutions: use a small tmpfs and a high
38 //             MIN_FREE_KILOBYTES to trick PutBlock into attempting
39 //             to write a block larger than the amount of space left
40 //           - use an interface to mock ioutil.TempFile with a File
41 //             object that always returns an error on write
42 //
43 // ========================================
44 // GetBlock tests.
45 // ========================================
46
47 // TestGetBlock
48 //     Test that simple block reads succeed.
49 //
50 func TestGetBlock(t *testing.T) {
51         defer teardown()
52
53         // Prepare two test Keep volumes. Our block is stored on the second volume.
54         KeepVM = MakeTestVolumeManager(2)
55         defer KeepVM.Close()
56
57         vols := KeepVM.AllReadable()
58         if err := vols[1].Put(TEST_HASH, TEST_BLOCK); err != nil {
59                 t.Error(err)
60         }
61
62         // Check that GetBlock returns success.
63         result, err := GetBlock(TEST_HASH, false)
64         if err != nil {
65                 t.Errorf("GetBlock error: %s", err)
66         }
67         if fmt.Sprint(result) != fmt.Sprint(TEST_BLOCK) {
68                 t.Errorf("expected %s, got %s", TEST_BLOCK, result)
69         }
70 }
71
72 // TestGetBlockMissing
73 //     GetBlock must return an error when the block is not found.
74 //
75 func TestGetBlockMissing(t *testing.T) {
76         defer teardown()
77
78         // Create two empty test Keep volumes.
79         KeepVM = MakeTestVolumeManager(2)
80         defer KeepVM.Close()
81
82         // Check that GetBlock returns failure.
83         result, err := GetBlock(TEST_HASH, false)
84         if err != NotFoundError {
85                 t.Errorf("Expected NotFoundError, got %v", result)
86         }
87 }
88
89 // TestGetBlockCorrupt
90 //     GetBlock must return an error when a corrupted block is requested
91 //     (the contents of the file do not checksum to its hash).
92 //
93 func TestGetBlockCorrupt(t *testing.T) {
94         defer teardown()
95
96         // Create two test Keep volumes and store a corrupt block in one.
97         KeepVM = MakeTestVolumeManager(2)
98         defer KeepVM.Close()
99
100         vols := KeepVM.AllReadable()
101         vols[0].Put(TEST_HASH, BAD_BLOCK)
102
103         // Check that GetBlock returns failure.
104         result, err := GetBlock(TEST_HASH, false)
105         if err != DiskHashError {
106                 t.Errorf("Expected DiskHashError, got %v (buf: %v)", err, result)
107         }
108 }
109
110 // ========================================
111 // PutBlock tests
112 // ========================================
113
114 // TestPutBlockOK
115 //     PutBlock can perform a simple block write and returns success.
116 //
117 func TestPutBlockOK(t *testing.T) {
118         defer teardown()
119
120         // Create two test Keep volumes.
121         KeepVM = MakeTestVolumeManager(2)
122         defer KeepVM.Close()
123
124         // Check that PutBlock stores the data as expected.
125         if err := PutBlock(TEST_BLOCK, TEST_HASH); err != nil {
126                 t.Fatalf("PutBlock: %v", err)
127         }
128
129         vols := KeepVM.AllReadable()
130         result, err := vols[1].Get(TEST_HASH)
131         if err != nil {
132                 t.Fatalf("Volume #0 Get returned error: %v", err)
133         }
134         if string(result) != string(TEST_BLOCK) {
135                 t.Fatalf("PutBlock stored '%s', Get retrieved '%s'",
136                         string(TEST_BLOCK), string(result))
137         }
138 }
139
140 // TestPutBlockOneVol
141 //     PutBlock still returns success even when only one of the known
142 //     volumes is online.
143 //
144 func TestPutBlockOneVol(t *testing.T) {
145         defer teardown()
146
147         // Create two test Keep volumes, but cripple one of them.
148         KeepVM = MakeTestVolumeManager(2)
149         defer KeepVM.Close()
150
151         vols := KeepVM.AllWritable()
152         vols[0].(*MockVolume).Bad = true
153
154         // Check that PutBlock stores the data as expected.
155         if err := PutBlock(TEST_BLOCK, TEST_HASH); err != nil {
156                 t.Fatalf("PutBlock: %v", err)
157         }
158
159         result, err := GetBlock(TEST_HASH, false)
160         if err != nil {
161                 t.Fatalf("GetBlock: %v", err)
162         }
163         if string(result) != string(TEST_BLOCK) {
164                 t.Error("PutBlock/GetBlock mismatch")
165                 t.Fatalf("PutBlock stored '%s', GetBlock retrieved '%s'",
166                         string(TEST_BLOCK), string(result))
167         }
168 }
169
170 // TestPutBlockMD5Fail
171 //     Check that PutBlock returns an error if passed a block and hash that
172 //     do not match.
173 //
174 func TestPutBlockMD5Fail(t *testing.T) {
175         defer teardown()
176
177         // Create two test Keep volumes.
178         KeepVM = MakeTestVolumeManager(2)
179         defer KeepVM.Close()
180
181         // Check that PutBlock returns the expected error when the hash does
182         // not match the block.
183         if err := PutBlock(BAD_BLOCK, TEST_HASH); err != RequestHashError {
184                 t.Error("Expected RequestHashError, got %v", err)
185         }
186
187         // Confirm that GetBlock fails to return anything.
188         if result, err := GetBlock(TEST_HASH, false); err != NotFoundError {
189                 t.Errorf("GetBlock succeeded after a corrupt block store (result = %s, err = %v)",
190                         string(result), err)
191         }
192 }
193
194 // TestPutBlockCorrupt
195 //     PutBlock should overwrite corrupt blocks on disk when given
196 //     a PUT request with a good block.
197 //
198 func TestPutBlockCorrupt(t *testing.T) {
199         defer teardown()
200
201         // Create two test Keep volumes.
202         KeepVM = MakeTestVolumeManager(2)
203         defer KeepVM.Close()
204
205         // Store a corrupted block under TEST_HASH.
206         vols := KeepVM.AllWritable()
207         vols[0].Put(TEST_HASH, BAD_BLOCK)
208         if err := PutBlock(TEST_BLOCK, TEST_HASH); err != nil {
209                 t.Errorf("PutBlock: %v", err)
210         }
211
212         // The block on disk should now match TEST_BLOCK.
213         if block, err := GetBlock(TEST_HASH, false); err != nil {
214                 t.Errorf("GetBlock: %v", err)
215         } else if bytes.Compare(block, TEST_BLOCK) != 0 {
216                 t.Errorf("GetBlock returned: '%s'", string(block))
217         }
218 }
219
220 // TestPutBlockCollision
221 //     PutBlock returns a 400 Collision error when attempting to
222 //     store a block that collides with another block on disk.
223 //
224 func TestPutBlockCollision(t *testing.T) {
225         defer teardown()
226
227         // These blocks both hash to the MD5 digest cee9a457e790cf20d4bdaa6d69f01e41.
228         var b1 = []byte("\x0e0eaU\x9a\xa7\x87\xd0\x0b\xc6\xf7\x0b\xbd\xfe4\x04\xcf\x03e\x9epO\x854\xc0\x0f\xfbe\x9cL\x87@\xcc\x94/\xeb-\xa1\x15\xa3\xf4\x15\\\xbb\x86\x07Is\x86em}\x1f4\xa4 Y\xd7\x8fZ\x8d\xd1\xef")
229         var b2 = []byte("\x0e0eaU\x9a\xa7\x87\xd0\x0b\xc6\xf7\x0b\xbd\xfe4\x04\xcf\x03e\x9etO\x854\xc0\x0f\xfbe\x9cL\x87@\xcc\x94/\xeb-\xa1\x15\xa3\xf4\x15\xdc\xbb\x86\x07Is\x86em}\x1f4\xa4 Y\xd7\x8fZ\x8d\xd1\xef")
230         var locator = "cee9a457e790cf20d4bdaa6d69f01e41"
231
232         // Prepare two test Keep volumes.
233         KeepVM = MakeTestVolumeManager(2)
234         defer KeepVM.Close()
235
236         // Store one block, then attempt to store the other. Confirm that
237         // PutBlock reported a CollisionError.
238         if err := PutBlock(b1, locator); err != nil {
239                 t.Error(err)
240         }
241         if err := PutBlock(b2, locator); err == nil {
242                 t.Error("PutBlock did not report a collision")
243         } else if err != CollisionError {
244                 t.Errorf("PutBlock returned %v", err)
245         }
246 }
247
248 // TestPutBlockTouchFails
249 //     When PutBlock is asked to PUT an existing block, but cannot
250 //     modify the timestamp, it should write a second block.
251 //
252 func TestPutBlockTouchFails(t *testing.T) {
253         defer teardown()
254
255         // Prepare two test Keep volumes.
256         KeepVM = MakeTestVolumeManager(2)
257         defer KeepVM.Close()
258         vols := KeepVM.AllWritable()
259
260         // Store a block and then make the underlying volume bad,
261         // so a subsequent attempt to update the file timestamp
262         // will fail.
263         vols[0].Put(TEST_HASH, BAD_BLOCK)
264         old_mtime, err := vols[0].Mtime(TEST_HASH)
265         if err != nil {
266                 t.Fatalf("vols[0].Mtime(%s): %s\n", TEST_HASH, err)
267         }
268
269         // vols[0].Touch will fail on the next call, so the volume
270         // manager will store a copy on vols[1] instead.
271         vols[0].(*MockVolume).Touchable = false
272         if err := PutBlock(TEST_BLOCK, TEST_HASH); err != nil {
273                 t.Fatalf("PutBlock: %v", err)
274         }
275         vols[0].(*MockVolume).Touchable = true
276
277         // Now the mtime on the block on vols[0] should be unchanged, and
278         // there should be a copy of the block on vols[1].
279         new_mtime, err := vols[0].Mtime(TEST_HASH)
280         if err != nil {
281                 t.Fatalf("vols[0].Mtime(%s): %s\n", TEST_HASH, err)
282         }
283         if !new_mtime.Equal(old_mtime) {
284                 t.Errorf("mtime was changed on vols[0]:\nold_mtime = %v\nnew_mtime = %v\n",
285                         old_mtime, new_mtime)
286         }
287         result, err := vols[1].Get(TEST_HASH)
288         if err != nil {
289                 t.Fatalf("vols[1]: %v", err)
290         }
291         if bytes.Compare(result, TEST_BLOCK) != 0 {
292                 t.Errorf("new block does not match test block\nnew block = %v\n", result)
293         }
294 }
295
296 func TestDiscoverTmpfs(t *testing.T) {
297         var tempVols [4]string
298         var err error
299
300         // Create some directories suitable for using as keep volumes.
301         for i := range tempVols {
302                 if tempVols[i], err = ioutil.TempDir("", "findvol"); err != nil {
303                         t.Fatal(err)
304                 }
305                 defer os.RemoveAll(tempVols[i])
306                 tempVols[i] = tempVols[i] + "/keep"
307                 if err = os.Mkdir(tempVols[i], 0755); err != nil {
308                         t.Fatal(err)
309                 }
310         }
311
312         // Set up a bogus PROC_MOUNTS file.
313         f, err := ioutil.TempFile("", "keeptest")
314         if err != nil {
315                 t.Fatal(err)
316         }
317         defer os.Remove(f.Name())
318         for i, vol := range tempVols {
319                 // Add readonly mount points at odd indexes.
320                 var opts string
321                 switch i % 2 {
322                 case 0:
323                         opts = "rw,nosuid,nodev,noexec"
324                 case 1:
325                         opts = "nosuid,nodev,noexec,ro"
326                 }
327                 fmt.Fprintf(f, "tmpfs %s tmpfs %s 0 0\n", path.Dir(vol), opts)
328         }
329         f.Close()
330         PROC_MOUNTS = f.Name()
331
332         var resultVols volumeSet
333         added := resultVols.Discover()
334
335         if added != len(resultVols) {
336                 t.Errorf("Discover returned %d, but added %d volumes",
337                         added, len(resultVols))
338         }
339         if added != len(tempVols) {
340                 t.Errorf("Discover returned %d but we set up %d volumes",
341                         added, len(tempVols))
342         }
343         for i, tmpdir := range tempVols {
344                 if tmpdir != resultVols[i].(*UnixVolume).root {
345                         t.Errorf("Discover returned %s, expected %s\n",
346                                 resultVols[i].(*UnixVolume).root, tmpdir)
347                 }
348                 if expectReadonly := i%2 == 1; expectReadonly != resultVols[i].(*UnixVolume).readonly {
349                         t.Errorf("Discover added %s with readonly=%v, should be %v",
350                                 tmpdir, !expectReadonly, expectReadonly)
351                 }
352         }
353 }
354
355 func TestDiscoverNone(t *testing.T) {
356         defer teardown()
357
358         // Set up a bogus PROC_MOUNTS file with no Keep vols.
359         f, err := ioutil.TempFile("", "keeptest")
360         if err != nil {
361                 t.Fatal(err)
362         }
363         defer os.Remove(f.Name())
364         fmt.Fprintln(f, "rootfs / rootfs opts 0 0")
365         fmt.Fprintln(f, "sysfs /sys sysfs opts 0 0")
366         fmt.Fprintln(f, "proc /proc proc opts 0 0")
367         fmt.Fprintln(f, "udev /dev devtmpfs opts 0 0")
368         fmt.Fprintln(f, "devpts /dev/pts devpts opts 0 0")
369         f.Close()
370         PROC_MOUNTS = f.Name()
371
372         var resultVols volumeSet
373         added := resultVols.Discover()
374         if added != 0 || len(resultVols) != 0 {
375                 t.Fatalf("got %d, %v; expected 0, []", added, resultVols)
376         }
377 }
378
379 // TestIndex
380 //     Test an /index request.
381 func TestIndex(t *testing.T) {
382         defer teardown()
383
384         // Set up Keep volumes and populate them.
385         // Include multiple blocks on different volumes, and
386         // some metadata files.
387         KeepVM = MakeTestVolumeManager(2)
388         defer KeepVM.Close()
389
390         vols := KeepVM.AllReadable()
391         vols[0].Put(TEST_HASH, TEST_BLOCK)
392         vols[1].Put(TEST_HASH_2, TEST_BLOCK_2)
393         vols[0].Put(TEST_HASH_3, TEST_BLOCK_3)
394         vols[0].Put(TEST_HASH+".meta", []byte("metadata"))
395         vols[1].Put(TEST_HASH_2+".meta", []byte("metadata"))
396
397         buf := new(bytes.Buffer)
398         vols[0].IndexTo("", buf)
399         vols[1].IndexTo("", buf)
400         index_rows := strings.Split(string(buf.Bytes()), "\n")
401         sort.Strings(index_rows)
402         sorted_index := strings.Join(index_rows, "\n")
403         expected := `^\n` + TEST_HASH + `\+\d+ \d+\n` +
404                 TEST_HASH_3 + `\+\d+ \d+\n` +
405                 TEST_HASH_2 + `\+\d+ \d+$`
406
407         match, err := regexp.MatchString(expected, sorted_index)
408         if err == nil {
409                 if !match {
410                         t.Errorf("IndexLocators returned:\n%s", string(buf.Bytes()))
411                 }
412         } else {
413                 t.Errorf("regexp.MatchString: %s", err)
414         }
415 }
416
417 // TestNodeStatus
418 //     Test that GetNodeStatus returns valid info about available volumes.
419 //
420 //     TODO(twp): set up appropriate interfaces to permit more rigorous
421 //     testing.
422 //
423 func TestNodeStatus(t *testing.T) {
424         defer teardown()
425
426         // Set up test Keep volumes with some blocks.
427         KeepVM = MakeTestVolumeManager(2)
428         defer KeepVM.Close()
429
430         vols := KeepVM.AllReadable()
431         vols[0].Put(TEST_HASH, TEST_BLOCK)
432         vols[1].Put(TEST_HASH_2, TEST_BLOCK_2)
433
434         // Get node status and make a basic sanity check.
435         st := GetNodeStatus()
436         for i := range vols {
437                 volinfo := st.Volumes[i]
438                 mtp := volinfo.MountPoint
439                 if mtp != "/bogo" {
440                         t.Errorf("GetNodeStatus mount_point %s, expected /bogo", mtp)
441                 }
442                 if volinfo.DeviceNum == 0 {
443                         t.Errorf("uninitialized device_num in %v", volinfo)
444                 }
445                 if volinfo.BytesFree == 0 {
446                         t.Errorf("uninitialized bytes_free in %v", volinfo)
447                 }
448                 if volinfo.BytesUsed == 0 {
449                         t.Errorf("uninitialized bytes_used in %v", volinfo)
450                 }
451         }
452 }
453
454 // ========================================
455 // Helper functions for unit tests.
456 // ========================================
457
458 // MakeTestVolumeManager returns a RRVolumeManager with the specified
459 // number of MockVolumes.
460 func MakeTestVolumeManager(num_volumes int) VolumeManager {
461         vols := make([]Volume, num_volumes)
462         for i := range vols {
463                 vols[i] = CreateMockVolume()
464         }
465         return MakeRRVolumeManager(vols)
466 }
467
468 // teardown cleans up after each test.
469 func teardown() {
470         data_manager_token = ""
471         enforce_permissions = false
472         PermissionSecret = nil
473         KeepVM = nil
474 }