15112: Stop processing collections on first error.
[arvados.git] / services / keepstore / keepstore_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "context"
10         "errors"
11         "fmt"
12         "io/ioutil"
13         "os"
14         "path"
15         "regexp"
16         "sort"
17         "strings"
18         "testing"
19
20         "git.curoverse.com/arvados.git/sdk/go/arvadostest"
21 )
22
23 var TestBlock = []byte("The quick brown fox jumps over the lazy dog.")
24 var TestHash = "e4d909c290d0fb1ca068ffaddf22cbd0"
25 var TestHashPutResp = "e4d909c290d0fb1ca068ffaddf22cbd0+44\n"
26
27 var TestBlock2 = []byte("Pack my box with five dozen liquor jugs.")
28 var TestHash2 = "f15ac516f788aec4f30932ffb6395c39"
29
30 var TestBlock3 = []byte("Now is the time for all good men to come to the aid of their country.")
31 var TestHash3 = "eed29bbffbc2dbe5e5ee0bb71888e61f"
32
33 // BadBlock is used to test collisions and corruption.
34 // It must not match any test hashes.
35 var BadBlock = []byte("The magic words are squeamish ossifrage.")
36
37 // Empty block
38 var EmptyHash = "d41d8cd98f00b204e9800998ecf8427e"
39 var EmptyBlock = []byte("")
40
41 // TODO(twp): Tests still to be written
42 //
43 //   * TestPutBlockFull
44 //       - test that PutBlock returns 503 Full if the filesystem is full.
45 //         (must mock FreeDiskSpace or Statfs? use a tmpfs?)
46 //
47 //   * TestPutBlockWriteErr
48 //       - test the behavior when Write returns an error.
49 //           - Possible solutions: use a small tmpfs and a high
50 //             MIN_FREE_KILOBYTES to trick PutBlock into attempting
51 //             to write a block larger than the amount of space left
52 //           - use an interface to mock ioutil.TempFile with a File
53 //             object that always returns an error on write
54 //
55 // ========================================
56 // GetBlock tests.
57 // ========================================
58
59 // TestGetBlock
60 //     Test that simple block reads succeed.
61 //
62 func TestGetBlock(t *testing.T) {
63         defer teardown()
64
65         // Prepare two test Keep volumes. Our block is stored on the second volume.
66         KeepVM = MakeTestVolumeManager(2)
67         defer KeepVM.Close()
68
69         vols := KeepVM.AllReadable()
70         if err := vols[1].Put(context.Background(), TestHash, TestBlock); err != nil {
71                 t.Error(err)
72         }
73
74         // Check that GetBlock returns success.
75         buf := make([]byte, BlockSize)
76         size, err := GetBlock(context.Background(), TestHash, buf, nil)
77         if err != nil {
78                 t.Errorf("GetBlock error: %s", err)
79         }
80         if bytes.Compare(buf[:size], TestBlock) != 0 {
81                 t.Errorf("got %v, expected %v", buf[:size], TestBlock)
82         }
83 }
84
85 // TestGetBlockMissing
86 //     GetBlock must return an error when the block is not found.
87 //
88 func TestGetBlockMissing(t *testing.T) {
89         defer teardown()
90
91         // Create two empty test Keep volumes.
92         KeepVM = MakeTestVolumeManager(2)
93         defer KeepVM.Close()
94
95         // Check that GetBlock returns failure.
96         buf := make([]byte, BlockSize)
97         size, err := GetBlock(context.Background(), TestHash, buf, nil)
98         if err != NotFoundError {
99                 t.Errorf("Expected NotFoundError, got %v, err %v", buf[:size], err)
100         }
101 }
102
103 // TestGetBlockCorrupt
104 //     GetBlock must return an error when a corrupted block is requested
105 //     (the contents of the file do not checksum to its hash).
106 //
107 func TestGetBlockCorrupt(t *testing.T) {
108         defer teardown()
109
110         // Create two test Keep volumes and store a corrupt block in one.
111         KeepVM = MakeTestVolumeManager(2)
112         defer KeepVM.Close()
113
114         vols := KeepVM.AllReadable()
115         vols[0].Put(context.Background(), TestHash, BadBlock)
116
117         // Check that GetBlock returns failure.
118         buf := make([]byte, BlockSize)
119         size, err := GetBlock(context.Background(), TestHash, buf, nil)
120         if err != DiskHashError {
121                 t.Errorf("Expected DiskHashError, got %v (buf: %v)", err, buf[:size])
122         }
123 }
124
125 // ========================================
126 // PutBlock tests
127 // ========================================
128
129 // TestPutBlockOK
130 //     PutBlock can perform a simple block write and returns success.
131 //
132 func TestPutBlockOK(t *testing.T) {
133         defer teardown()
134
135         // Create two test Keep volumes.
136         KeepVM = MakeTestVolumeManager(2)
137         defer KeepVM.Close()
138
139         // Check that PutBlock stores the data as expected.
140         if n, err := PutBlock(context.Background(), TestBlock, TestHash); err != nil || n < 1 {
141                 t.Fatalf("PutBlock: n %d err %v", n, err)
142         }
143
144         vols := KeepVM.AllReadable()
145         buf := make([]byte, BlockSize)
146         n, err := vols[1].Get(context.Background(), TestHash, buf)
147         if err != nil {
148                 t.Fatalf("Volume #0 Get returned error: %v", err)
149         }
150         if string(buf[:n]) != string(TestBlock) {
151                 t.Fatalf("PutBlock stored '%s', Get retrieved '%s'",
152                         string(TestBlock), string(buf[:n]))
153         }
154 }
155
156 // TestPutBlockOneVol
157 //     PutBlock still returns success even when only one of the known
158 //     volumes is online.
159 //
160 func TestPutBlockOneVol(t *testing.T) {
161         defer teardown()
162
163         // Create two test Keep volumes, but cripple one of them.
164         KeepVM = MakeTestVolumeManager(2)
165         defer KeepVM.Close()
166
167         vols := KeepVM.AllWritable()
168         vols[0].(*MockVolume).Bad = true
169         vols[0].(*MockVolume).BadVolumeError = errors.New("Bad volume")
170
171         // Check that PutBlock stores the data as expected.
172         if n, err := PutBlock(context.Background(), TestBlock, TestHash); err != nil || n < 1 {
173                 t.Fatalf("PutBlock: n %d err %v", n, err)
174         }
175
176         buf := make([]byte, BlockSize)
177         size, err := GetBlock(context.Background(), TestHash, buf, nil)
178         if err != nil {
179                 t.Fatalf("GetBlock: %v", err)
180         }
181         if bytes.Compare(buf[:size], TestBlock) != 0 {
182                 t.Fatalf("PutBlock stored %+q, GetBlock retrieved %+q",
183                         TestBlock, buf[:size])
184         }
185 }
186
187 // TestPutBlockMD5Fail
188 //     Check that PutBlock returns an error if passed a block and hash that
189 //     do not match.
190 //
191 func TestPutBlockMD5Fail(t *testing.T) {
192         defer teardown()
193
194         // Create two test Keep volumes.
195         KeepVM = MakeTestVolumeManager(2)
196         defer KeepVM.Close()
197
198         // Check that PutBlock returns the expected error when the hash does
199         // not match the block.
200         if _, err := PutBlock(context.Background(), BadBlock, TestHash); err != RequestHashError {
201                 t.Errorf("Expected RequestHashError, got %v", err)
202         }
203
204         // Confirm that GetBlock fails to return anything.
205         if result, err := GetBlock(context.Background(), TestHash, make([]byte, BlockSize), nil); err != NotFoundError {
206                 t.Errorf("GetBlock succeeded after a corrupt block store (result = %s, err = %v)",
207                         string(result), err)
208         }
209 }
210
211 // TestPutBlockCorrupt
212 //     PutBlock should overwrite corrupt blocks on disk when given
213 //     a PUT request with a good block.
214 //
215 func TestPutBlockCorrupt(t *testing.T) {
216         defer teardown()
217
218         // Create two test Keep volumes.
219         KeepVM = MakeTestVolumeManager(2)
220         defer KeepVM.Close()
221
222         // Store a corrupted block under TestHash.
223         vols := KeepVM.AllWritable()
224         vols[0].Put(context.Background(), TestHash, BadBlock)
225         if n, err := PutBlock(context.Background(), TestBlock, TestHash); err != nil || n < 1 {
226                 t.Errorf("PutBlock: n %d err %v", n, err)
227         }
228
229         // The block on disk should now match TestBlock.
230         buf := make([]byte, BlockSize)
231         if size, err := GetBlock(context.Background(), TestHash, buf, nil); err != nil {
232                 t.Errorf("GetBlock: %v", err)
233         } else if bytes.Compare(buf[:size], TestBlock) != 0 {
234                 t.Errorf("Got %+q, expected %+q", buf[:size], TestBlock)
235         }
236 }
237
238 // TestPutBlockCollision
239 //     PutBlock returns a 400 Collision error when attempting to
240 //     store a block that collides with another block on disk.
241 //
242 func TestPutBlockCollision(t *testing.T) {
243         defer teardown()
244
245         // These blocks both hash to the MD5 digest cee9a457e790cf20d4bdaa6d69f01e41.
246         b1 := arvadostest.MD5CollisionData[0]
247         b2 := arvadostest.MD5CollisionData[1]
248         locator := arvadostest.MD5CollisionMD5
249
250         // Prepare two test Keep volumes.
251         KeepVM = MakeTestVolumeManager(2)
252         defer KeepVM.Close()
253
254         // Store one block, then attempt to store the other. Confirm that
255         // PutBlock reported a CollisionError.
256         if _, err := PutBlock(context.Background(), b1, locator); err != nil {
257                 t.Error(err)
258         }
259         if _, err := PutBlock(context.Background(), b2, locator); err == nil {
260                 t.Error("PutBlock did not report a collision")
261         } else if err != CollisionError {
262                 t.Errorf("PutBlock returned %v", err)
263         }
264 }
265
266 // TestPutBlockTouchFails
267 //     When PutBlock is asked to PUT an existing block, but cannot
268 //     modify the timestamp, it should write a second block.
269 //
270 func TestPutBlockTouchFails(t *testing.T) {
271         defer teardown()
272
273         // Prepare two test Keep volumes.
274         KeepVM = MakeTestVolumeManager(2)
275         defer KeepVM.Close()
276         vols := KeepVM.AllWritable()
277
278         // Store a block and then make the underlying volume bad,
279         // so a subsequent attempt to update the file timestamp
280         // will fail.
281         vols[0].Put(context.Background(), TestHash, BadBlock)
282         oldMtime, err := vols[0].Mtime(TestHash)
283         if err != nil {
284                 t.Fatalf("vols[0].Mtime(%s): %s\n", TestHash, err)
285         }
286
287         // vols[0].Touch will fail on the next call, so the volume
288         // manager will store a copy on vols[1] instead.
289         vols[0].(*MockVolume).Touchable = false
290         if n, err := PutBlock(context.Background(), TestBlock, TestHash); err != nil || n < 1 {
291                 t.Fatalf("PutBlock: n %d err %v", n, err)
292         }
293         vols[0].(*MockVolume).Touchable = true
294
295         // Now the mtime on the block on vols[0] should be unchanged, and
296         // there should be a copy of the block on vols[1].
297         newMtime, err := vols[0].Mtime(TestHash)
298         if err != nil {
299                 t.Fatalf("vols[0].Mtime(%s): %s\n", TestHash, err)
300         }
301         if !newMtime.Equal(oldMtime) {
302                 t.Errorf("mtime was changed on vols[0]:\noldMtime = %v\nnewMtime = %v\n",
303                         oldMtime, newMtime)
304         }
305         buf := make([]byte, BlockSize)
306         n, err := vols[1].Get(context.Background(), TestHash, buf)
307         if err != nil {
308                 t.Fatalf("vols[1]: %v", err)
309         }
310         if bytes.Compare(buf[:n], TestBlock) != 0 {
311                 t.Errorf("new block does not match test block\nnew block = %v\n", buf[:n])
312         }
313 }
314
315 func TestDiscoverTmpfs(t *testing.T) {
316         var tempVols [4]string
317         var err error
318
319         // Create some directories suitable for using as keep volumes.
320         for i := range tempVols {
321                 if tempVols[i], err = ioutil.TempDir("", "findvol"); err != nil {
322                         t.Fatal(err)
323                 }
324                 defer os.RemoveAll(tempVols[i])
325                 tempVols[i] = tempVols[i] + "/keep"
326                 if err = os.Mkdir(tempVols[i], 0755); err != nil {
327                         t.Fatal(err)
328                 }
329         }
330
331         // Set up a bogus ProcMounts file.
332         f, err := ioutil.TempFile("", "keeptest")
333         if err != nil {
334                 t.Fatal(err)
335         }
336         defer os.Remove(f.Name())
337         for i, vol := range tempVols {
338                 // Add readonly mount points at odd indexes.
339                 var opts string
340                 switch i % 2 {
341                 case 0:
342                         opts = "rw,nosuid,nodev,noexec"
343                 case 1:
344                         opts = "nosuid,nodev,noexec,ro"
345                 }
346                 fmt.Fprintf(f, "tmpfs %s tmpfs %s 0 0\n", path.Dir(vol), opts)
347         }
348         f.Close()
349         ProcMounts = f.Name()
350
351         cfg := &Config{}
352         added := (&unixVolumeAdder{cfg}).Discover()
353
354         if added != len(cfg.Volumes) {
355                 t.Errorf("Discover returned %d, but added %d volumes",
356                         added, len(cfg.Volumes))
357         }
358         if added != len(tempVols) {
359                 t.Errorf("Discover returned %d but we set up %d volumes",
360                         added, len(tempVols))
361         }
362         for i, tmpdir := range tempVols {
363                 if tmpdir != cfg.Volumes[i].(*UnixVolume).Root {
364                         t.Errorf("Discover returned %s, expected %s\n",
365                                 cfg.Volumes[i].(*UnixVolume).Root, tmpdir)
366                 }
367                 if expectReadonly := i%2 == 1; expectReadonly != cfg.Volumes[i].(*UnixVolume).ReadOnly {
368                         t.Errorf("Discover added %s with readonly=%v, should be %v",
369                                 tmpdir, !expectReadonly, expectReadonly)
370                 }
371         }
372 }
373
374 func TestDiscoverNone(t *testing.T) {
375         defer teardown()
376
377         // Set up a bogus ProcMounts file with no Keep vols.
378         f, err := ioutil.TempFile("", "keeptest")
379         if err != nil {
380                 t.Fatal(err)
381         }
382         defer os.Remove(f.Name())
383         fmt.Fprintln(f, "rootfs / rootfs opts 0 0")
384         fmt.Fprintln(f, "sysfs /sys sysfs opts 0 0")
385         fmt.Fprintln(f, "proc /proc proc opts 0 0")
386         fmt.Fprintln(f, "udev /dev devtmpfs opts 0 0")
387         fmt.Fprintln(f, "devpts /dev/pts devpts opts 0 0")
388         f.Close()
389         ProcMounts = f.Name()
390
391         cfg := &Config{}
392         added := (&unixVolumeAdder{cfg}).Discover()
393         if added != 0 || len(cfg.Volumes) != 0 {
394                 t.Fatalf("got %d, %v; expected 0, []", added, cfg.Volumes)
395         }
396 }
397
398 // TestIndex
399 //     Test an /index request.
400 func TestIndex(t *testing.T) {
401         defer teardown()
402
403         // Set up Keep volumes and populate them.
404         // Include multiple blocks on different volumes, and
405         // some metadata files.
406         KeepVM = MakeTestVolumeManager(2)
407         defer KeepVM.Close()
408
409         vols := KeepVM.AllReadable()
410         vols[0].Put(context.Background(), TestHash, TestBlock)
411         vols[1].Put(context.Background(), TestHash2, TestBlock2)
412         vols[0].Put(context.Background(), TestHash3, TestBlock3)
413         vols[0].Put(context.Background(), TestHash+".meta", []byte("metadata"))
414         vols[1].Put(context.Background(), TestHash2+".meta", []byte("metadata"))
415
416         buf := new(bytes.Buffer)
417         vols[0].IndexTo("", buf)
418         vols[1].IndexTo("", buf)
419         indexRows := strings.Split(string(buf.Bytes()), "\n")
420         sort.Strings(indexRows)
421         sortedIndex := strings.Join(indexRows, "\n")
422         expected := `^\n` + TestHash + `\+\d+ \d+\n` +
423                 TestHash3 + `\+\d+ \d+\n` +
424                 TestHash2 + `\+\d+ \d+$`
425
426         match, err := regexp.MatchString(expected, sortedIndex)
427         if err == nil {
428                 if !match {
429                         t.Errorf("IndexLocators returned:\n%s", string(buf.Bytes()))
430                 }
431         } else {
432                 t.Errorf("regexp.MatchString: %s", err)
433         }
434 }
435
436 // ========================================
437 // Helper functions for unit tests.
438 // ========================================
439
440 // MakeTestVolumeManager returns a RRVolumeManager with the specified
441 // number of MockVolumes.
442 func MakeTestVolumeManager(numVolumes int) VolumeManager {
443         vols := make([]Volume, numVolumes)
444         for i := range vols {
445                 vols[i] = CreateMockVolume()
446         }
447         return MakeRRVolumeManager(vols)
448 }
449
450 // teardown cleans up after each test.
451 func teardown() {
452         theConfig.systemAuthToken = ""
453         theConfig.RequireSignatures = false
454         theConfig.blobSigningKey = nil
455         KeepVM = nil
456 }