2a1c3d243ab922855b2bf6344f69631a78272662
[arvados.git] / services / keepstore / keepstore_test.go
1 package main
2
3 import (
4         "bytes"
5         "fmt"
6         "io/ioutil"
7         "os"
8         "path"
9         "regexp"
10         "sort"
11         "strings"
12         "testing"
13
14         "git.curoverse.com/arvados.git/sdk/go/arvadostest"
15 )
16
17 var TestBlock = []byte("The quick brown fox jumps over the lazy dog.")
18 var TestHash = "e4d909c290d0fb1ca068ffaddf22cbd0"
19 var TestHashPutResp = "e4d909c290d0fb1ca068ffaddf22cbd0+44\n"
20
21 var TestBlock2 = []byte("Pack my box with five dozen liquor jugs.")
22 var TestHash2 = "f15ac516f788aec4f30932ffb6395c39"
23
24 var TestBlock3 = []byte("Now is the time for all good men to come to the aid of their country.")
25 var TestHash3 = "eed29bbffbc2dbe5e5ee0bb71888e61f"
26
27 // BadBlock is used to test collisions and corruption.
28 // It must not match any test hashes.
29 var BadBlock = []byte("The magic words are squeamish ossifrage.")
30
31 // Empty block
32 var EmptyHash = "d41d8cd98f00b204e9800998ecf8427e"
33 var EmptyBlock = []byte("")
34
35 // TODO(twp): Tests still to be written
36 //
37 //   * TestPutBlockFull
38 //       - test that PutBlock returns 503 Full if the filesystem is full.
39 //         (must mock FreeDiskSpace or Statfs? use a tmpfs?)
40 //
41 //   * TestPutBlockWriteErr
42 //       - test the behavior when Write returns an error.
43 //           - Possible solutions: use a small tmpfs and a high
44 //             MIN_FREE_KILOBYTES to trick PutBlock into attempting
45 //             to write a block larger than the amount of space left
46 //           - use an interface to mock ioutil.TempFile with a File
47 //             object that always returns an error on write
48 //
49 // ========================================
50 // GetBlock tests.
51 // ========================================
52
53 // TestGetBlock
54 //     Test that simple block reads succeed.
55 //
56 func TestGetBlock(t *testing.T) {
57         defer teardown()
58
59         // Prepare two test Keep volumes. Our block is stored on the second volume.
60         KeepVM = MakeTestVolumeManager(2)
61         defer KeepVM.Close()
62
63         vols := KeepVM.AllReadable()
64         if err := vols[1].Put(TestHash, TestBlock); err != nil {
65                 t.Error(err)
66         }
67
68         // Check that GetBlock returns success.
69         result, err := GetBlock(TestHash)
70         if err != nil {
71                 t.Errorf("GetBlock error: %s", err)
72         }
73         if fmt.Sprint(result) != fmt.Sprint(TestBlock) {
74                 t.Errorf("expected %s, got %s", TestBlock, result)
75         }
76 }
77
78 // TestGetBlockMissing
79 //     GetBlock must return an error when the block is not found.
80 //
81 func TestGetBlockMissing(t *testing.T) {
82         defer teardown()
83
84         // Create two empty test Keep volumes.
85         KeepVM = MakeTestVolumeManager(2)
86         defer KeepVM.Close()
87
88         // Check that GetBlock returns failure.
89         result, err := GetBlock(TestHash)
90         if err != NotFoundError {
91                 t.Errorf("Expected NotFoundError, got %v", result)
92         }
93 }
94
95 // TestGetBlockCorrupt
96 //     GetBlock must return an error when a corrupted block is requested
97 //     (the contents of the file do not checksum to its hash).
98 //
99 func TestGetBlockCorrupt(t *testing.T) {
100         defer teardown()
101
102         // Create two test Keep volumes and store a corrupt block in one.
103         KeepVM = MakeTestVolumeManager(2)
104         defer KeepVM.Close()
105
106         vols := KeepVM.AllReadable()
107         vols[0].Put(TestHash, BadBlock)
108
109         // Check that GetBlock returns failure.
110         result, err := GetBlock(TestHash)
111         if err != DiskHashError {
112                 t.Errorf("Expected DiskHashError, got %v (buf: %v)", err, result)
113         }
114 }
115
116 // ========================================
117 // PutBlock tests
118 // ========================================
119
120 // TestPutBlockOK
121 //     PutBlock can perform a simple block write and returns success.
122 //
123 func TestPutBlockOK(t *testing.T) {
124         defer teardown()
125
126         // Create two test Keep volumes.
127         KeepVM = MakeTestVolumeManager(2)
128         defer KeepVM.Close()
129
130         // Check that PutBlock stores the data as expected.
131         if n, err := PutBlock(TestBlock, TestHash); err != nil || n < 1 {
132                 t.Fatalf("PutBlock: n %d err %v", n, err)
133         }
134
135         vols := KeepVM.AllReadable()
136         result, err := vols[1].Get(TestHash)
137         if err != nil {
138                 t.Fatalf("Volume #0 Get returned error: %v", err)
139         }
140         if string(result) != string(TestBlock) {
141                 t.Fatalf("PutBlock stored '%s', Get retrieved '%s'",
142                         string(TestBlock), string(result))
143         }
144 }
145
146 // TestPutBlockOneVol
147 //     PutBlock still returns success even when only one of the known
148 //     volumes is online.
149 //
150 func TestPutBlockOneVol(t *testing.T) {
151         defer teardown()
152
153         // Create two test Keep volumes, but cripple one of them.
154         KeepVM = MakeTestVolumeManager(2)
155         defer KeepVM.Close()
156
157         vols := KeepVM.AllWritable()
158         vols[0].(*MockVolume).Bad = true
159
160         // Check that PutBlock stores the data as expected.
161         if n, err := PutBlock(TestBlock, TestHash); err != nil || n < 1 {
162                 t.Fatalf("PutBlock: n %d err %v", n, err)
163         }
164
165         result, err := GetBlock(TestHash)
166         if err != nil {
167                 t.Fatalf("GetBlock: %v", err)
168         }
169         if string(result) != string(TestBlock) {
170                 t.Error("PutBlock/GetBlock mismatch")
171                 t.Fatalf("PutBlock stored '%s', GetBlock retrieved '%s'",
172                         string(TestBlock), string(result))
173         }
174 }
175
176 // TestPutBlockMD5Fail
177 //     Check that PutBlock returns an error if passed a block and hash that
178 //     do not match.
179 //
180 func TestPutBlockMD5Fail(t *testing.T) {
181         defer teardown()
182
183         // Create two test Keep volumes.
184         KeepVM = MakeTestVolumeManager(2)
185         defer KeepVM.Close()
186
187         // Check that PutBlock returns the expected error when the hash does
188         // not match the block.
189         if _, err := PutBlock(BadBlock, TestHash); err != RequestHashError {
190                 t.Errorf("Expected RequestHashError, got %v", err)
191         }
192
193         // Confirm that GetBlock fails to return anything.
194         if result, err := GetBlock(TestHash); err != NotFoundError {
195                 t.Errorf("GetBlock succeeded after a corrupt block store (result = %s, err = %v)",
196                         string(result), err)
197         }
198 }
199
200 // TestPutBlockCorrupt
201 //     PutBlock should overwrite corrupt blocks on disk when given
202 //     a PUT request with a good block.
203 //
204 func TestPutBlockCorrupt(t *testing.T) {
205         defer teardown()
206
207         // Create two test Keep volumes.
208         KeepVM = MakeTestVolumeManager(2)
209         defer KeepVM.Close()
210
211         // Store a corrupted block under TestHash.
212         vols := KeepVM.AllWritable()
213         vols[0].Put(TestHash, BadBlock)
214         if n, err := PutBlock(TestBlock, TestHash); err != nil || n < 1 {
215                 t.Errorf("PutBlock: n %d err %v", n, err)
216         }
217
218         // The block on disk should now match TestBlock.
219         if block, err := GetBlock(TestHash); err != nil {
220                 t.Errorf("GetBlock: %v", err)
221         } else if bytes.Compare(block, TestBlock) != 0 {
222                 t.Errorf("GetBlock returned: '%s'", string(block))
223         }
224 }
225
226 // TestPutBlockCollision
227 //     PutBlock returns a 400 Collision error when attempting to
228 //     store a block that collides with another block on disk.
229 //
230 func TestPutBlockCollision(t *testing.T) {
231         defer teardown()
232
233         // These blocks both hash to the MD5 digest cee9a457e790cf20d4bdaa6d69f01e41.
234         b1 := arvadostest.MD5CollisionData[0]
235         b2 := arvadostest.MD5CollisionData[1]
236         locator := arvadostest.MD5CollisionMD5
237
238         // Prepare two test Keep volumes.
239         KeepVM = MakeTestVolumeManager(2)
240         defer KeepVM.Close()
241
242         // Store one block, then attempt to store the other. Confirm that
243         // PutBlock reported a CollisionError.
244         if _, err := PutBlock(b1, locator); err != nil {
245                 t.Error(err)
246         }
247         if _, err := PutBlock(b2, locator); err == nil {
248                 t.Error("PutBlock did not report a collision")
249         } else if err != CollisionError {
250                 t.Errorf("PutBlock returned %v", err)
251         }
252 }
253
254 // TestPutBlockTouchFails
255 //     When PutBlock is asked to PUT an existing block, but cannot
256 //     modify the timestamp, it should write a second block.
257 //
258 func TestPutBlockTouchFails(t *testing.T) {
259         defer teardown()
260
261         // Prepare two test Keep volumes.
262         KeepVM = MakeTestVolumeManager(2)
263         defer KeepVM.Close()
264         vols := KeepVM.AllWritable()
265
266         // Store a block and then make the underlying volume bad,
267         // so a subsequent attempt to update the file timestamp
268         // will fail.
269         vols[0].Put(TestHash, BadBlock)
270         oldMtime, err := vols[0].Mtime(TestHash)
271         if err != nil {
272                 t.Fatalf("vols[0].Mtime(%s): %s\n", TestHash, err)
273         }
274
275         // vols[0].Touch will fail on the next call, so the volume
276         // manager will store a copy on vols[1] instead.
277         vols[0].(*MockVolume).Touchable = false
278         if n, err := PutBlock(TestBlock, TestHash); err != nil || n < 1 {
279                 t.Fatalf("PutBlock: n %d err %v", n, err)
280         }
281         vols[0].(*MockVolume).Touchable = true
282
283         // Now the mtime on the block on vols[0] should be unchanged, and
284         // there should be a copy of the block on vols[1].
285         newMtime, err := vols[0].Mtime(TestHash)
286         if err != nil {
287                 t.Fatalf("vols[0].Mtime(%s): %s\n", TestHash, err)
288         }
289         if !newMtime.Equal(oldMtime) {
290                 t.Errorf("mtime was changed on vols[0]:\noldMtime = %v\nnewMtime = %v\n",
291                         oldMtime, newMtime)
292         }
293         result, err := vols[1].Get(TestHash)
294         if err != nil {
295                 t.Fatalf("vols[1]: %v", err)
296         }
297         if bytes.Compare(result, TestBlock) != 0 {
298                 t.Errorf("new block does not match test block\nnew block = %v\n", result)
299         }
300 }
301
302 func TestDiscoverTmpfs(t *testing.T) {
303         var tempVols [4]string
304         var err error
305
306         // Create some directories suitable for using as keep volumes.
307         for i := range tempVols {
308                 if tempVols[i], err = ioutil.TempDir("", "findvol"); err != nil {
309                         t.Fatal(err)
310                 }
311                 defer os.RemoveAll(tempVols[i])
312                 tempVols[i] = tempVols[i] + "/keep"
313                 if err = os.Mkdir(tempVols[i], 0755); err != nil {
314                         t.Fatal(err)
315                 }
316         }
317
318         // Set up a bogus ProcMounts file.
319         f, err := ioutil.TempFile("", "keeptest")
320         if err != nil {
321                 t.Fatal(err)
322         }
323         defer os.Remove(f.Name())
324         for i, vol := range tempVols {
325                 // Add readonly mount points at odd indexes.
326                 var opts string
327                 switch i % 2 {
328                 case 0:
329                         opts = "rw,nosuid,nodev,noexec"
330                 case 1:
331                         opts = "nosuid,nodev,noexec,ro"
332                 }
333                 fmt.Fprintf(f, "tmpfs %s tmpfs %s 0 0\n", path.Dir(vol), opts)
334         }
335         f.Close()
336         ProcMounts = f.Name()
337
338         resultVols := volumeSet{}
339         added := (&unixVolumeAdder{&resultVols}).Discover()
340
341         if added != len(resultVols) {
342                 t.Errorf("Discover returned %d, but added %d volumes",
343                         added, len(resultVols))
344         }
345         if added != len(tempVols) {
346                 t.Errorf("Discover returned %d but we set up %d volumes",
347                         added, len(tempVols))
348         }
349         for i, tmpdir := range tempVols {
350                 if tmpdir != resultVols[i].(*UnixVolume).root {
351                         t.Errorf("Discover returned %s, expected %s\n",
352                                 resultVols[i].(*UnixVolume).root, tmpdir)
353                 }
354                 if expectReadonly := i%2 == 1; expectReadonly != resultVols[i].(*UnixVolume).readonly {
355                         t.Errorf("Discover added %s with readonly=%v, should be %v",
356                                 tmpdir, !expectReadonly, expectReadonly)
357                 }
358         }
359 }
360
361 func TestDiscoverNone(t *testing.T) {
362         defer teardown()
363
364         // Set up a bogus ProcMounts file with no Keep vols.
365         f, err := ioutil.TempFile("", "keeptest")
366         if err != nil {
367                 t.Fatal(err)
368         }
369         defer os.Remove(f.Name())
370         fmt.Fprintln(f, "rootfs / rootfs opts 0 0")
371         fmt.Fprintln(f, "sysfs /sys sysfs opts 0 0")
372         fmt.Fprintln(f, "proc /proc proc opts 0 0")
373         fmt.Fprintln(f, "udev /dev devtmpfs opts 0 0")
374         fmt.Fprintln(f, "devpts /dev/pts devpts opts 0 0")
375         f.Close()
376         ProcMounts = f.Name()
377
378         resultVols := volumeSet{}
379         added := (&unixVolumeAdder{&resultVols}).Discover()
380         if added != 0 || len(resultVols) != 0 {
381                 t.Fatalf("got %d, %v; expected 0, []", added, resultVols)
382         }
383 }
384
385 // TestIndex
386 //     Test an /index request.
387 func TestIndex(t *testing.T) {
388         defer teardown()
389
390         // Set up Keep volumes and populate them.
391         // Include multiple blocks on different volumes, and
392         // some metadata files.
393         KeepVM = MakeTestVolumeManager(2)
394         defer KeepVM.Close()
395
396         vols := KeepVM.AllReadable()
397         vols[0].Put(TestHash, TestBlock)
398         vols[1].Put(TestHash2, TestBlock2)
399         vols[0].Put(TestHash3, TestBlock3)
400         vols[0].Put(TestHash+".meta", []byte("metadata"))
401         vols[1].Put(TestHash2+".meta", []byte("metadata"))
402
403         buf := new(bytes.Buffer)
404         vols[0].IndexTo("", buf)
405         vols[1].IndexTo("", buf)
406         indexRows := strings.Split(string(buf.Bytes()), "\n")
407         sort.Strings(indexRows)
408         sortedIndex := strings.Join(indexRows, "\n")
409         expected := `^\n` + TestHash + `\+\d+ \d+\n` +
410                 TestHash3 + `\+\d+ \d+\n` +
411                 TestHash2 + `\+\d+ \d+$`
412
413         match, err := regexp.MatchString(expected, sortedIndex)
414         if err == nil {
415                 if !match {
416                         t.Errorf("IndexLocators returned:\n%s", string(buf.Bytes()))
417                 }
418         } else {
419                 t.Errorf("regexp.MatchString: %s", err)
420         }
421 }
422
423 // ========================================
424 // Helper functions for unit tests.
425 // ========================================
426
427 // MakeTestVolumeManager returns a RRVolumeManager with the specified
428 // number of MockVolumes.
429 func MakeTestVolumeManager(numVolumes int) VolumeManager {
430         vols := make([]Volume, numVolumes)
431         for i := range vols {
432                 vols[i] = CreateMockVolume()
433         }
434         return MakeRRVolumeManager(vols)
435 }
436
437 // teardown cleans up after each test.
438 func teardown() {
439         dataManagerToken = ""
440         enforcePermissions = false
441         PermissionSecret = nil
442         KeepVM = nil
443 }