Merge branch 'pr/25'
[arvados.git] / services / keepstore / keepstore_test.go
1 package main
2
3 import (
4         "bytes"
5         "fmt"
6         "io/ioutil"
7         "os"
8         "path"
9         "regexp"
10         "sort"
11         "strings"
12         "testing"
13 )
14
15 var TestBlock = []byte("The quick brown fox jumps over the lazy dog.")
16 var TestHash = "e4d909c290d0fb1ca068ffaddf22cbd0"
17 var TestHashPutResp = "e4d909c290d0fb1ca068ffaddf22cbd0+44\n"
18
19 var TestBlock2 = []byte("Pack my box with five dozen liquor jugs.")
20 var TestHash2 = "f15ac516f788aec4f30932ffb6395c39"
21
22 var TestBlock3 = []byte("Now is the time for all good men to come to the aid of their country.")
23 var TestHash3 = "eed29bbffbc2dbe5e5ee0bb71888e61f"
24
25 // BadBlock is used to test collisions and corruption.
26 // It must not match any test hashes.
27 var BadBlock = []byte("The magic words are squeamish ossifrage.")
28
29 // Empty block
30 var EmptyHash = "d41d8cd98f00b204e9800998ecf8427e"
31 var EmptyBlock = []byte("")
32
33 // TODO(twp): Tests still to be written
34 //
35 //   * TestPutBlockFull
36 //       - test that PutBlock returns 503 Full if the filesystem is full.
37 //         (must mock FreeDiskSpace or Statfs? use a tmpfs?)
38 //
39 //   * TestPutBlockWriteErr
40 //       - test the behavior when Write returns an error.
41 //           - Possible solutions: use a small tmpfs and a high
42 //             MIN_FREE_KILOBYTES to trick PutBlock into attempting
43 //             to write a block larger than the amount of space left
44 //           - use an interface to mock ioutil.TempFile with a File
45 //             object that always returns an error on write
46 //
47 // ========================================
48 // GetBlock tests.
49 // ========================================
50
51 // TestGetBlock
52 //     Test that simple block reads succeed.
53 //
54 func TestGetBlock(t *testing.T) {
55         defer teardown()
56
57         // Prepare two test Keep volumes. Our block is stored on the second volume.
58         KeepVM = MakeTestVolumeManager(2)
59         defer KeepVM.Close()
60
61         vols := KeepVM.AllReadable()
62         if err := vols[1].Put(TestHash, TestBlock); err != nil {
63                 t.Error(err)
64         }
65
66         // Check that GetBlock returns success.
67         result, err := GetBlock(TestHash)
68         if err != nil {
69                 t.Errorf("GetBlock error: %s", err)
70         }
71         if fmt.Sprint(result) != fmt.Sprint(TestBlock) {
72                 t.Errorf("expected %s, got %s", TestBlock, result)
73         }
74 }
75
76 // TestGetBlockMissing
77 //     GetBlock must return an error when the block is not found.
78 //
79 func TestGetBlockMissing(t *testing.T) {
80         defer teardown()
81
82         // Create two empty test Keep volumes.
83         KeepVM = MakeTestVolumeManager(2)
84         defer KeepVM.Close()
85
86         // Check that GetBlock returns failure.
87         result, err := GetBlock(TestHash)
88         if err != NotFoundError {
89                 t.Errorf("Expected NotFoundError, got %v", result)
90         }
91 }
92
93 // TestGetBlockCorrupt
94 //     GetBlock must return an error when a corrupted block is requested
95 //     (the contents of the file do not checksum to its hash).
96 //
97 func TestGetBlockCorrupt(t *testing.T) {
98         defer teardown()
99
100         // Create two test Keep volumes and store a corrupt block in one.
101         KeepVM = MakeTestVolumeManager(2)
102         defer KeepVM.Close()
103
104         vols := KeepVM.AllReadable()
105         vols[0].Put(TestHash, BadBlock)
106
107         // Check that GetBlock returns failure.
108         result, err := GetBlock(TestHash)
109         if err != DiskHashError {
110                 t.Errorf("Expected DiskHashError, got %v (buf: %v)", err, result)
111         }
112 }
113
114 // ========================================
115 // PutBlock tests
116 // ========================================
117
118 // TestPutBlockOK
119 //     PutBlock can perform a simple block write and returns success.
120 //
121 func TestPutBlockOK(t *testing.T) {
122         defer teardown()
123
124         // Create two test Keep volumes.
125         KeepVM = MakeTestVolumeManager(2)
126         defer KeepVM.Close()
127
128         // Check that PutBlock stores the data as expected.
129         if n, err := PutBlock(TestBlock, TestHash); err != nil || n < 1 {
130                 t.Fatalf("PutBlock: n %d err %v", n, err)
131         }
132
133         vols := KeepVM.AllReadable()
134         result, err := vols[1].Get(TestHash)
135         if err != nil {
136                 t.Fatalf("Volume #0 Get returned error: %v", err)
137         }
138         if string(result) != string(TestBlock) {
139                 t.Fatalf("PutBlock stored '%s', Get retrieved '%s'",
140                         string(TestBlock), string(result))
141         }
142 }
143
144 // TestPutBlockOneVol
145 //     PutBlock still returns success even when only one of the known
146 //     volumes is online.
147 //
148 func TestPutBlockOneVol(t *testing.T) {
149         defer teardown()
150
151         // Create two test Keep volumes, but cripple one of them.
152         KeepVM = MakeTestVolumeManager(2)
153         defer KeepVM.Close()
154
155         vols := KeepVM.AllWritable()
156         vols[0].(*MockVolume).Bad = true
157
158         // Check that PutBlock stores the data as expected.
159         if n, err := PutBlock(TestBlock, TestHash); err != nil || n < 1 {
160                 t.Fatalf("PutBlock: n %d err %v", n, err)
161         }
162
163         result, err := GetBlock(TestHash)
164         if err != nil {
165                 t.Fatalf("GetBlock: %v", err)
166         }
167         if string(result) != string(TestBlock) {
168                 t.Error("PutBlock/GetBlock mismatch")
169                 t.Fatalf("PutBlock stored '%s', GetBlock retrieved '%s'",
170                         string(TestBlock), string(result))
171         }
172 }
173
174 // TestPutBlockMD5Fail
175 //     Check that PutBlock returns an error if passed a block and hash that
176 //     do not match.
177 //
178 func TestPutBlockMD5Fail(t *testing.T) {
179         defer teardown()
180
181         // Create two test Keep volumes.
182         KeepVM = MakeTestVolumeManager(2)
183         defer KeepVM.Close()
184
185         // Check that PutBlock returns the expected error when the hash does
186         // not match the block.
187         if _, err := PutBlock(BadBlock, TestHash); err != RequestHashError {
188                 t.Errorf("Expected RequestHashError, got %v", err)
189         }
190
191         // Confirm that GetBlock fails to return anything.
192         if result, err := GetBlock(TestHash); err != NotFoundError {
193                 t.Errorf("GetBlock succeeded after a corrupt block store (result = %s, err = %v)",
194                         string(result), err)
195         }
196 }
197
198 // TestPutBlockCorrupt
199 //     PutBlock should overwrite corrupt blocks on disk when given
200 //     a PUT request with a good block.
201 //
202 func TestPutBlockCorrupt(t *testing.T) {
203         defer teardown()
204
205         // Create two test Keep volumes.
206         KeepVM = MakeTestVolumeManager(2)
207         defer KeepVM.Close()
208
209         // Store a corrupted block under TestHash.
210         vols := KeepVM.AllWritable()
211         vols[0].Put(TestHash, BadBlock)
212         if n, err := PutBlock(TestBlock, TestHash); err != nil || n < 1 {
213                 t.Errorf("PutBlock: n %d err %v", n, err)
214         }
215
216         // The block on disk should now match TestBlock.
217         if block, err := GetBlock(TestHash); err != nil {
218                 t.Errorf("GetBlock: %v", err)
219         } else if bytes.Compare(block, TestBlock) != 0 {
220                 t.Errorf("GetBlock returned: '%s'", string(block))
221         }
222 }
223
224 // TestPutBlockCollision
225 //     PutBlock returns a 400 Collision error when attempting to
226 //     store a block that collides with another block on disk.
227 //
228 func TestPutBlockCollision(t *testing.T) {
229         defer teardown()
230
231         // These blocks both hash to the MD5 digest cee9a457e790cf20d4bdaa6d69f01e41.
232         var b1 = []byte("\x0e0eaU\x9a\xa7\x87\xd0\x0b\xc6\xf7\x0b\xbd\xfe4\x04\xcf\x03e\x9epO\x854\xc0\x0f\xfbe\x9cL\x87@\xcc\x94/\xeb-\xa1\x15\xa3\xf4\x15\\\xbb\x86\x07Is\x86em}\x1f4\xa4 Y\xd7\x8fZ\x8d\xd1\xef")
233         var b2 = []byte("\x0e0eaU\x9a\xa7\x87\xd0\x0b\xc6\xf7\x0b\xbd\xfe4\x04\xcf\x03e\x9etO\x854\xc0\x0f\xfbe\x9cL\x87@\xcc\x94/\xeb-\xa1\x15\xa3\xf4\x15\xdc\xbb\x86\x07Is\x86em}\x1f4\xa4 Y\xd7\x8fZ\x8d\xd1\xef")
234         var locator = "cee9a457e790cf20d4bdaa6d69f01e41"
235
236         // Prepare two test Keep volumes.
237         KeepVM = MakeTestVolumeManager(2)
238         defer KeepVM.Close()
239
240         // Store one block, then attempt to store the other. Confirm that
241         // PutBlock reported a CollisionError.
242         if _, err := PutBlock(b1, locator); err != nil {
243                 t.Error(err)
244         }
245         if _, err := PutBlock(b2, locator); err == nil {
246                 t.Error("PutBlock did not report a collision")
247         } else if err != CollisionError {
248                 t.Errorf("PutBlock returned %v", err)
249         }
250 }
251
252 // TestPutBlockTouchFails
253 //     When PutBlock is asked to PUT an existing block, but cannot
254 //     modify the timestamp, it should write a second block.
255 //
256 func TestPutBlockTouchFails(t *testing.T) {
257         defer teardown()
258
259         // Prepare two test Keep volumes.
260         KeepVM = MakeTestVolumeManager(2)
261         defer KeepVM.Close()
262         vols := KeepVM.AllWritable()
263
264         // Store a block and then make the underlying volume bad,
265         // so a subsequent attempt to update the file timestamp
266         // will fail.
267         vols[0].Put(TestHash, BadBlock)
268         oldMtime, err := vols[0].Mtime(TestHash)
269         if err != nil {
270                 t.Fatalf("vols[0].Mtime(%s): %s\n", TestHash, err)
271         }
272
273         // vols[0].Touch will fail on the next call, so the volume
274         // manager will store a copy on vols[1] instead.
275         vols[0].(*MockVolume).Touchable = false
276         if n, err := PutBlock(TestBlock, TestHash); err != nil || n < 1 {
277                 t.Fatalf("PutBlock: n %d err %v", n, err)
278         }
279         vols[0].(*MockVolume).Touchable = true
280
281         // Now the mtime on the block on vols[0] should be unchanged, and
282         // there should be a copy of the block on vols[1].
283         newMtime, err := vols[0].Mtime(TestHash)
284         if err != nil {
285                 t.Fatalf("vols[0].Mtime(%s): %s\n", TestHash, err)
286         }
287         if !newMtime.Equal(oldMtime) {
288                 t.Errorf("mtime was changed on vols[0]:\noldMtime = %v\nnewMtime = %v\n",
289                         oldMtime, newMtime)
290         }
291         result, err := vols[1].Get(TestHash)
292         if err != nil {
293                 t.Fatalf("vols[1]: %v", err)
294         }
295         if bytes.Compare(result, TestBlock) != 0 {
296                 t.Errorf("new block does not match test block\nnew block = %v\n", result)
297         }
298 }
299
300 func TestDiscoverTmpfs(t *testing.T) {
301         var tempVols [4]string
302         var err error
303
304         // Create some directories suitable for using as keep volumes.
305         for i := range tempVols {
306                 if tempVols[i], err = ioutil.TempDir("", "findvol"); err != nil {
307                         t.Fatal(err)
308                 }
309                 defer os.RemoveAll(tempVols[i])
310                 tempVols[i] = tempVols[i] + "/keep"
311                 if err = os.Mkdir(tempVols[i], 0755); err != nil {
312                         t.Fatal(err)
313                 }
314         }
315
316         // Set up a bogus ProcMounts file.
317         f, err := ioutil.TempFile("", "keeptest")
318         if err != nil {
319                 t.Fatal(err)
320         }
321         defer os.Remove(f.Name())
322         for i, vol := range tempVols {
323                 // Add readonly mount points at odd indexes.
324                 var opts string
325                 switch i % 2 {
326                 case 0:
327                         opts = "rw,nosuid,nodev,noexec"
328                 case 1:
329                         opts = "nosuid,nodev,noexec,ro"
330                 }
331                 fmt.Fprintf(f, "tmpfs %s tmpfs %s 0 0\n", path.Dir(vol), opts)
332         }
333         f.Close()
334         ProcMounts = f.Name()
335
336         resultVols := volumeSet{}
337         added := (&unixVolumeAdder{&resultVols}).Discover()
338
339         if added != len(resultVols) {
340                 t.Errorf("Discover returned %d, but added %d volumes",
341                         added, len(resultVols))
342         }
343         if added != len(tempVols) {
344                 t.Errorf("Discover returned %d but we set up %d volumes",
345                         added, len(tempVols))
346         }
347         for i, tmpdir := range tempVols {
348                 if tmpdir != resultVols[i].(*UnixVolume).root {
349                         t.Errorf("Discover returned %s, expected %s\n",
350                                 resultVols[i].(*UnixVolume).root, tmpdir)
351                 }
352                 if expectReadonly := i%2 == 1; expectReadonly != resultVols[i].(*UnixVolume).readonly {
353                         t.Errorf("Discover added %s with readonly=%v, should be %v",
354                                 tmpdir, !expectReadonly, expectReadonly)
355                 }
356         }
357 }
358
359 func TestDiscoverNone(t *testing.T) {
360         defer teardown()
361
362         // Set up a bogus ProcMounts file with no Keep vols.
363         f, err := ioutil.TempFile("", "keeptest")
364         if err != nil {
365                 t.Fatal(err)
366         }
367         defer os.Remove(f.Name())
368         fmt.Fprintln(f, "rootfs / rootfs opts 0 0")
369         fmt.Fprintln(f, "sysfs /sys sysfs opts 0 0")
370         fmt.Fprintln(f, "proc /proc proc opts 0 0")
371         fmt.Fprintln(f, "udev /dev devtmpfs opts 0 0")
372         fmt.Fprintln(f, "devpts /dev/pts devpts opts 0 0")
373         f.Close()
374         ProcMounts = f.Name()
375
376         resultVols := volumeSet{}
377         added := (&unixVolumeAdder{&resultVols}).Discover()
378         if added != 0 || len(resultVols) != 0 {
379                 t.Fatalf("got %d, %v; expected 0, []", added, resultVols)
380         }
381 }
382
383 // TestIndex
384 //     Test an /index request.
385 func TestIndex(t *testing.T) {
386         defer teardown()
387
388         // Set up Keep volumes and populate them.
389         // Include multiple blocks on different volumes, and
390         // some metadata files.
391         KeepVM = MakeTestVolumeManager(2)
392         defer KeepVM.Close()
393
394         vols := KeepVM.AllReadable()
395         vols[0].Put(TestHash, TestBlock)
396         vols[1].Put(TestHash2, TestBlock2)
397         vols[0].Put(TestHash3, TestBlock3)
398         vols[0].Put(TestHash+".meta", []byte("metadata"))
399         vols[1].Put(TestHash2+".meta", []byte("metadata"))
400
401         buf := new(bytes.Buffer)
402         vols[0].IndexTo("", buf)
403         vols[1].IndexTo("", buf)
404         indexRows := strings.Split(string(buf.Bytes()), "\n")
405         sort.Strings(indexRows)
406         sortedIndex := strings.Join(indexRows, "\n")
407         expected := `^\n` + TestHash + `\+\d+ \d+\n` +
408                 TestHash3 + `\+\d+ \d+\n` +
409                 TestHash2 + `\+\d+ \d+$`
410
411         match, err := regexp.MatchString(expected, sortedIndex)
412         if err == nil {
413                 if !match {
414                         t.Errorf("IndexLocators returned:\n%s", string(buf.Bytes()))
415                 }
416         } else {
417                 t.Errorf("regexp.MatchString: %s", err)
418         }
419 }
420
421 // ========================================
422 // Helper functions for unit tests.
423 // ========================================
424
425 // MakeTestVolumeManager returns a RRVolumeManager with the specified
426 // number of MockVolumes.
427 func MakeTestVolumeManager(numVolumes int) VolumeManager {
428         vols := make([]Volume, numVolumes)
429         for i := range vols {
430                 vols[i] = CreateMockVolume()
431         }
432         return MakeRRVolumeManager(vols)
433 }
434
435 // teardown cleans up after each test.
436 func teardown() {
437         dataManagerToken = ""
438         enforcePermissions = false
439         PermissionSecret = nil
440         KeepVM = nil
441 }