9068: Move buffer allocation from volumes to GetBlockHandler.
[arvados.git] / services / keepstore / keepstore_test.go
1 package main
2
3 import (
4         "bytes"
5         "fmt"
6         "io/ioutil"
7         "os"
8         "path"
9         "regexp"
10         "sort"
11         "strings"
12         "testing"
13
14         "git.curoverse.com/arvados.git/sdk/go/arvadostest"
15 )
16
17 var TestBlock = []byte("The quick brown fox jumps over the lazy dog.")
18 var TestHash = "e4d909c290d0fb1ca068ffaddf22cbd0"
19 var TestHashPutResp = "e4d909c290d0fb1ca068ffaddf22cbd0+44\n"
20
21 var TestBlock2 = []byte("Pack my box with five dozen liquor jugs.")
22 var TestHash2 = "f15ac516f788aec4f30932ffb6395c39"
23
24 var TestBlock3 = []byte("Now is the time for all good men to come to the aid of their country.")
25 var TestHash3 = "eed29bbffbc2dbe5e5ee0bb71888e61f"
26
27 // BadBlock is used to test collisions and corruption.
28 // It must not match any test hashes.
29 var BadBlock = []byte("The magic words are squeamish ossifrage.")
30
31 // Empty block
32 var EmptyHash = "d41d8cd98f00b204e9800998ecf8427e"
33 var EmptyBlock = []byte("")
34
35 // TODO(twp): Tests still to be written
36 //
37 //   * TestPutBlockFull
38 //       - test that PutBlock returns 503 Full if the filesystem is full.
39 //         (must mock FreeDiskSpace or Statfs? use a tmpfs?)
40 //
41 //   * TestPutBlockWriteErr
42 //       - test the behavior when Write returns an error.
43 //           - Possible solutions: use a small tmpfs and a high
44 //             MIN_FREE_KILOBYTES to trick PutBlock into attempting
45 //             to write a block larger than the amount of space left
46 //           - use an interface to mock ioutil.TempFile with a File
47 //             object that always returns an error on write
48 //
49 // ========================================
50 // GetBlock tests.
51 // ========================================
52
53 // TestGetBlock
54 //     Test that simple block reads succeed.
55 //
56 func TestGetBlock(t *testing.T) {
57         defer teardown()
58
59         // Prepare two test Keep volumes. Our block is stored on the second volume.
60         KeepVM = MakeTestVolumeManager(2)
61         defer KeepVM.Close()
62
63         vols := KeepVM.AllReadable()
64         if err := vols[1].Put(TestHash, TestBlock); err != nil {
65                 t.Error(err)
66         }
67
68         // Check that GetBlock returns success.
69         buf := make([]byte, BlockSize)
70         size, err := GetBlock(TestHash, buf, nil)
71         if err != nil {
72                 t.Errorf("GetBlock error: %s", err)
73         }
74         if bytes.Compare(buf[:size], TestBlock) != 0 {
75                 t.Errorf("got %v, expected %v", buf[:size], TestBlock)
76         }
77 }
78
79 // TestGetBlockMissing
80 //     GetBlock must return an error when the block is not found.
81 //
82 func TestGetBlockMissing(t *testing.T) {
83         defer teardown()
84
85         // Create two empty test Keep volumes.
86         KeepVM = MakeTestVolumeManager(2)
87         defer KeepVM.Close()
88
89         // Check that GetBlock returns failure.
90         buf := make([]byte, BlockSize)
91         size, err := GetBlock(TestHash, buf, nil)
92         if err != NotFoundError {
93                 t.Errorf("Expected NotFoundError, got %v, err %v", buf[:size], err)
94         }
95 }
96
97 // TestGetBlockCorrupt
98 //     GetBlock must return an error when a corrupted block is requested
99 //     (the contents of the file do not checksum to its hash).
100 //
101 func TestGetBlockCorrupt(t *testing.T) {
102         defer teardown()
103
104         // Create two test Keep volumes and store a corrupt block in one.
105         KeepVM = MakeTestVolumeManager(2)
106         defer KeepVM.Close()
107
108         vols := KeepVM.AllReadable()
109         vols[0].Put(TestHash, BadBlock)
110
111         // Check that GetBlock returns failure.
112         buf := make([]byte, BlockSize)
113         size, err := GetBlock(TestHash, buf, nil)
114         if err != DiskHashError {
115                 t.Errorf("Expected DiskHashError, got %v (buf: %v)", err, buf[:size])
116         }
117 }
118
119 // ========================================
120 // PutBlock tests
121 // ========================================
122
123 // TestPutBlockOK
124 //     PutBlock can perform a simple block write and returns success.
125 //
126 func TestPutBlockOK(t *testing.T) {
127         defer teardown()
128
129         // Create two test Keep volumes.
130         KeepVM = MakeTestVolumeManager(2)
131         defer KeepVM.Close()
132
133         // Check that PutBlock stores the data as expected.
134         if n, err := PutBlock(TestBlock, TestHash); err != nil || n < 1 {
135                 t.Fatalf("PutBlock: n %d err %v", n, err)
136         }
137
138         vols := KeepVM.AllReadable()
139         buf := make([]byte, BlockSize)
140         n, err := vols[1].Get(TestHash, buf)
141         if err != nil {
142                 t.Fatalf("Volume #0 Get returned error: %v", err)
143         }
144         if string(buf[:n]) != string(TestBlock) {
145                 t.Fatalf("PutBlock stored '%s', Get retrieved '%s'",
146                         string(TestBlock), string(buf[:n]))
147         }
148 }
149
150 // TestPutBlockOneVol
151 //     PutBlock still returns success even when only one of the known
152 //     volumes is online.
153 //
154 func TestPutBlockOneVol(t *testing.T) {
155         defer teardown()
156
157         // Create two test Keep volumes, but cripple one of them.
158         KeepVM = MakeTestVolumeManager(2)
159         defer KeepVM.Close()
160
161         vols := KeepVM.AllWritable()
162         vols[0].(*MockVolume).Bad = true
163
164         // Check that PutBlock stores the data as expected.
165         if n, err := PutBlock(TestBlock, TestHash); err != nil || n < 1 {
166                 t.Fatalf("PutBlock: n %d err %v", n, err)
167         }
168
169         buf := make([]byte, BlockSize)
170         size, err := GetBlock(TestHash, buf, nil)
171         if err != nil {
172                 t.Fatalf("GetBlock: %v", err)
173         }
174         if bytes.Compare(buf[:size], TestBlock) != 0 {
175                 t.Fatalf("PutBlock stored %+q, GetBlock retrieved %+q",
176                         TestBlock, buf[:size])
177         }
178 }
179
180 // TestPutBlockMD5Fail
181 //     Check that PutBlock returns an error if passed a block and hash that
182 //     do not match.
183 //
184 func TestPutBlockMD5Fail(t *testing.T) {
185         defer teardown()
186
187         // Create two test Keep volumes.
188         KeepVM = MakeTestVolumeManager(2)
189         defer KeepVM.Close()
190
191         // Check that PutBlock returns the expected error when the hash does
192         // not match the block.
193         if _, err := PutBlock(BadBlock, TestHash); err != RequestHashError {
194                 t.Errorf("Expected RequestHashError, got %v", err)
195         }
196
197         // Confirm that GetBlock fails to return anything.
198         if result, err := GetBlock(TestHash, make([]byte, BlockSize), nil); err != NotFoundError {
199                 t.Errorf("GetBlock succeeded after a corrupt block store (result = %s, err = %v)",
200                         string(result), err)
201         }
202 }
203
204 // TestPutBlockCorrupt
205 //     PutBlock should overwrite corrupt blocks on disk when given
206 //     a PUT request with a good block.
207 //
208 func TestPutBlockCorrupt(t *testing.T) {
209         defer teardown()
210
211         // Create two test Keep volumes.
212         KeepVM = MakeTestVolumeManager(2)
213         defer KeepVM.Close()
214
215         // Store a corrupted block under TestHash.
216         vols := KeepVM.AllWritable()
217         vols[0].Put(TestHash, BadBlock)
218         if n, err := PutBlock(TestBlock, TestHash); err != nil || n < 1 {
219                 t.Errorf("PutBlock: n %d err %v", n, err)
220         }
221
222         // The block on disk should now match TestBlock.
223         buf := make([]byte, BlockSize)
224         if size, err := GetBlock(TestHash, buf, nil); err != nil {
225                 t.Errorf("GetBlock: %v", err)
226         } else if bytes.Compare(buf[:size], TestBlock) != 0 {
227                 t.Errorf("Got %+q, expected %+q", buf[:size], TestBlock)
228         }
229 }
230
231 // TestPutBlockCollision
232 //     PutBlock returns a 400 Collision error when attempting to
233 //     store a block that collides with another block on disk.
234 //
235 func TestPutBlockCollision(t *testing.T) {
236         defer teardown()
237
238         // These blocks both hash to the MD5 digest cee9a457e790cf20d4bdaa6d69f01e41.
239         b1 := arvadostest.MD5CollisionData[0]
240         b2 := arvadostest.MD5CollisionData[1]
241         locator := arvadostest.MD5CollisionMD5
242
243         // Prepare two test Keep volumes.
244         KeepVM = MakeTestVolumeManager(2)
245         defer KeepVM.Close()
246
247         // Store one block, then attempt to store the other. Confirm that
248         // PutBlock reported a CollisionError.
249         if _, err := PutBlock(b1, locator); err != nil {
250                 t.Error(err)
251         }
252         if _, err := PutBlock(b2, locator); err == nil {
253                 t.Error("PutBlock did not report a collision")
254         } else if err != CollisionError {
255                 t.Errorf("PutBlock returned %v", err)
256         }
257 }
258
259 // TestPutBlockTouchFails
260 //     When PutBlock is asked to PUT an existing block, but cannot
261 //     modify the timestamp, it should write a second block.
262 //
263 func TestPutBlockTouchFails(t *testing.T) {
264         defer teardown()
265
266         // Prepare two test Keep volumes.
267         KeepVM = MakeTestVolumeManager(2)
268         defer KeepVM.Close()
269         vols := KeepVM.AllWritable()
270
271         // Store a block and then make the underlying volume bad,
272         // so a subsequent attempt to update the file timestamp
273         // will fail.
274         vols[0].Put(TestHash, BadBlock)
275         oldMtime, err := vols[0].Mtime(TestHash)
276         if err != nil {
277                 t.Fatalf("vols[0].Mtime(%s): %s\n", TestHash, err)
278         }
279
280         // vols[0].Touch will fail on the next call, so the volume
281         // manager will store a copy on vols[1] instead.
282         vols[0].(*MockVolume).Touchable = false
283         if n, err := PutBlock(TestBlock, TestHash); err != nil || n < 1 {
284                 t.Fatalf("PutBlock: n %d err %v", n, err)
285         }
286         vols[0].(*MockVolume).Touchable = true
287
288         // Now the mtime on the block on vols[0] should be unchanged, and
289         // there should be a copy of the block on vols[1].
290         newMtime, err := vols[0].Mtime(TestHash)
291         if err != nil {
292                 t.Fatalf("vols[0].Mtime(%s): %s\n", TestHash, err)
293         }
294         if !newMtime.Equal(oldMtime) {
295                 t.Errorf("mtime was changed on vols[0]:\noldMtime = %v\nnewMtime = %v\n",
296                         oldMtime, newMtime)
297         }
298         buf := make([]byte, BlockSize)
299         n, err := vols[1].Get(TestHash, buf)
300         if err != nil {
301                 t.Fatalf("vols[1]: %v", err)
302         }
303         if bytes.Compare(buf[:n], TestBlock) != 0 {
304                 t.Errorf("new block does not match test block\nnew block = %v\n", buf[:n])
305         }
306 }
307
308 func TestDiscoverTmpfs(t *testing.T) {
309         var tempVols [4]string
310         var err error
311
312         // Create some directories suitable for using as keep volumes.
313         for i := range tempVols {
314                 if tempVols[i], err = ioutil.TempDir("", "findvol"); err != nil {
315                         t.Fatal(err)
316                 }
317                 defer os.RemoveAll(tempVols[i])
318                 tempVols[i] = tempVols[i] + "/keep"
319                 if err = os.Mkdir(tempVols[i], 0755); err != nil {
320                         t.Fatal(err)
321                 }
322         }
323
324         // Set up a bogus ProcMounts file.
325         f, err := ioutil.TempFile("", "keeptest")
326         if err != nil {
327                 t.Fatal(err)
328         }
329         defer os.Remove(f.Name())
330         for i, vol := range tempVols {
331                 // Add readonly mount points at odd indexes.
332                 var opts string
333                 switch i % 2 {
334                 case 0:
335                         opts = "rw,nosuid,nodev,noexec"
336                 case 1:
337                         opts = "nosuid,nodev,noexec,ro"
338                 }
339                 fmt.Fprintf(f, "tmpfs %s tmpfs %s 0 0\n", path.Dir(vol), opts)
340         }
341         f.Close()
342         ProcMounts = f.Name()
343
344         resultVols := volumeSet{}
345         added := (&unixVolumeAdder{&resultVols}).Discover()
346
347         if added != len(resultVols) {
348                 t.Errorf("Discover returned %d, but added %d volumes",
349                         added, len(resultVols))
350         }
351         if added != len(tempVols) {
352                 t.Errorf("Discover returned %d but we set up %d volumes",
353                         added, len(tempVols))
354         }
355         for i, tmpdir := range tempVols {
356                 if tmpdir != resultVols[i].(*UnixVolume).root {
357                         t.Errorf("Discover returned %s, expected %s\n",
358                                 resultVols[i].(*UnixVolume).root, tmpdir)
359                 }
360                 if expectReadonly := i%2 == 1; expectReadonly != resultVols[i].(*UnixVolume).readonly {
361                         t.Errorf("Discover added %s with readonly=%v, should be %v",
362                                 tmpdir, !expectReadonly, expectReadonly)
363                 }
364         }
365 }
366
367 func TestDiscoverNone(t *testing.T) {
368         defer teardown()
369
370         // Set up a bogus ProcMounts file with no Keep vols.
371         f, err := ioutil.TempFile("", "keeptest")
372         if err != nil {
373                 t.Fatal(err)
374         }
375         defer os.Remove(f.Name())
376         fmt.Fprintln(f, "rootfs / rootfs opts 0 0")
377         fmt.Fprintln(f, "sysfs /sys sysfs opts 0 0")
378         fmt.Fprintln(f, "proc /proc proc opts 0 0")
379         fmt.Fprintln(f, "udev /dev devtmpfs opts 0 0")
380         fmt.Fprintln(f, "devpts /dev/pts devpts opts 0 0")
381         f.Close()
382         ProcMounts = f.Name()
383
384         resultVols := volumeSet{}
385         added := (&unixVolumeAdder{&resultVols}).Discover()
386         if added != 0 || len(resultVols) != 0 {
387                 t.Fatalf("got %d, %v; expected 0, []", added, resultVols)
388         }
389 }
390
391 // TestIndex
392 //     Test an /index request.
393 func TestIndex(t *testing.T) {
394         defer teardown()
395
396         // Set up Keep volumes and populate them.
397         // Include multiple blocks on different volumes, and
398         // some metadata files.
399         KeepVM = MakeTestVolumeManager(2)
400         defer KeepVM.Close()
401
402         vols := KeepVM.AllReadable()
403         vols[0].Put(TestHash, TestBlock)
404         vols[1].Put(TestHash2, TestBlock2)
405         vols[0].Put(TestHash3, TestBlock3)
406         vols[0].Put(TestHash+".meta", []byte("metadata"))
407         vols[1].Put(TestHash2+".meta", []byte("metadata"))
408
409         buf := new(bytes.Buffer)
410         vols[0].IndexTo("", buf)
411         vols[1].IndexTo("", buf)
412         indexRows := strings.Split(string(buf.Bytes()), "\n")
413         sort.Strings(indexRows)
414         sortedIndex := strings.Join(indexRows, "\n")
415         expected := `^\n` + TestHash + `\+\d+ \d+\n` +
416                 TestHash3 + `\+\d+ \d+\n` +
417                 TestHash2 + `\+\d+ \d+$`
418
419         match, err := regexp.MatchString(expected, sortedIndex)
420         if err == nil {
421                 if !match {
422                         t.Errorf("IndexLocators returned:\n%s", string(buf.Bytes()))
423                 }
424         } else {
425                 t.Errorf("regexp.MatchString: %s", err)
426         }
427 }
428
429 // ========================================
430 // Helper functions for unit tests.
431 // ========================================
432
433 // MakeTestVolumeManager returns a RRVolumeManager with the specified
434 // number of MockVolumes.
435 func MakeTestVolumeManager(numVolumes int) VolumeManager {
436         vols := make([]Volume, numVolumes)
437         for i := range vols {
438                 vols[i] = CreateMockVolume()
439         }
440         return MakeRRVolumeManager(vols)
441 }
442
443 // teardown cleans up after each test.
444 func teardown() {
445         dataManagerToken = ""
446         enforcePermissions = false
447         PermissionSecret = nil
448         KeepVM = nil
449 }