11645: Add support for StorageClasses in volume config.
[arvados.git] / services / keepstore / keepstore_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "context"
10         "fmt"
11         "io/ioutil"
12         "os"
13         "path"
14         "regexp"
15         "sort"
16         "strings"
17         "testing"
18
19         "git.curoverse.com/arvados.git/sdk/go/arvadostest"
20 )
21
22 var TestBlock = []byte("The quick brown fox jumps over the lazy dog.")
23 var TestHash = "e4d909c290d0fb1ca068ffaddf22cbd0"
24 var TestHashPutResp = "e4d909c290d0fb1ca068ffaddf22cbd0+44\n"
25
26 var TestBlock2 = []byte("Pack my box with five dozen liquor jugs.")
27 var TestHash2 = "f15ac516f788aec4f30932ffb6395c39"
28
29 var TestBlock3 = []byte("Now is the time for all good men to come to the aid of their country.")
30 var TestHash3 = "eed29bbffbc2dbe5e5ee0bb71888e61f"
31
32 // BadBlock is used to test collisions and corruption.
33 // It must not match any test hashes.
34 var BadBlock = []byte("The magic words are squeamish ossifrage.")
35
36 // Empty block
37 var EmptyHash = "d41d8cd98f00b204e9800998ecf8427e"
38 var EmptyBlock = []byte("")
39
40 // TODO(twp): Tests still to be written
41 //
42 //   * TestPutBlockFull
43 //       - test that PutBlock returns 503 Full if the filesystem is full.
44 //         (must mock FreeDiskSpace or Statfs? use a tmpfs?)
45 //
46 //   * TestPutBlockWriteErr
47 //       - test the behavior when Write returns an error.
48 //           - Possible solutions: use a small tmpfs and a high
49 //             MIN_FREE_KILOBYTES to trick PutBlock into attempting
50 //             to write a block larger than the amount of space left
51 //           - use an interface to mock ioutil.TempFile with a File
52 //             object that always returns an error on write
53 //
54 // ========================================
55 // GetBlock tests.
56 // ========================================
57
58 // TestGetBlock
59 //     Test that simple block reads succeed.
60 //
61 func TestGetBlock(t *testing.T) {
62         defer teardown()
63
64         // Prepare two test Keep volumes. Our block is stored on the second volume.
65         KeepVM = MakeTestVolumeManager(2)
66         defer KeepVM.Close()
67
68         vols := KeepVM.AllReadable()
69         if err := vols[1].Put(context.Background(), TestHash, TestBlock); err != nil {
70                 t.Error(err)
71         }
72
73         // Check that GetBlock returns success.
74         buf := make([]byte, BlockSize)
75         size, err := GetBlock(context.Background(), TestHash, buf, nil)
76         if err != nil {
77                 t.Errorf("GetBlock error: %s", err)
78         }
79         if bytes.Compare(buf[:size], TestBlock) != 0 {
80                 t.Errorf("got %v, expected %v", buf[:size], TestBlock)
81         }
82 }
83
84 // TestGetBlockMissing
85 //     GetBlock must return an error when the block is not found.
86 //
87 func TestGetBlockMissing(t *testing.T) {
88         defer teardown()
89
90         // Create two empty test Keep volumes.
91         KeepVM = MakeTestVolumeManager(2)
92         defer KeepVM.Close()
93
94         // Check that GetBlock returns failure.
95         buf := make([]byte, BlockSize)
96         size, err := GetBlock(context.Background(), TestHash, buf, nil)
97         if err != NotFoundError {
98                 t.Errorf("Expected NotFoundError, got %v, err %v", buf[:size], err)
99         }
100 }
101
102 // TestGetBlockCorrupt
103 //     GetBlock must return an error when a corrupted block is requested
104 //     (the contents of the file do not checksum to its hash).
105 //
106 func TestGetBlockCorrupt(t *testing.T) {
107         defer teardown()
108
109         // Create two test Keep volumes and store a corrupt block in one.
110         KeepVM = MakeTestVolumeManager(2)
111         defer KeepVM.Close()
112
113         vols := KeepVM.AllReadable()
114         vols[0].Put(context.Background(), TestHash, BadBlock)
115
116         // Check that GetBlock returns failure.
117         buf := make([]byte, BlockSize)
118         size, err := GetBlock(context.Background(), TestHash, buf, nil)
119         if err != DiskHashError {
120                 t.Errorf("Expected DiskHashError, got %v (buf: %v)", err, buf[:size])
121         }
122 }
123
124 // ========================================
125 // PutBlock tests
126 // ========================================
127
128 // TestPutBlockOK
129 //     PutBlock can perform a simple block write and returns success.
130 //
131 func TestPutBlockOK(t *testing.T) {
132         defer teardown()
133
134         // Create two test Keep volumes.
135         KeepVM = MakeTestVolumeManager(2)
136         defer KeepVM.Close()
137
138         // Check that PutBlock stores the data as expected.
139         if n, err := PutBlock(context.Background(), TestBlock, TestHash); err != nil || n < 1 {
140                 t.Fatalf("PutBlock: n %d err %v", n, err)
141         }
142
143         vols := KeepVM.AllReadable()
144         buf := make([]byte, BlockSize)
145         n, err := vols[1].Get(context.Background(), TestHash, buf)
146         if err != nil {
147                 t.Fatalf("Volume #0 Get returned error: %v", err)
148         }
149         if string(buf[:n]) != string(TestBlock) {
150                 t.Fatalf("PutBlock stored '%s', Get retrieved '%s'",
151                         string(TestBlock), string(buf[:n]))
152         }
153 }
154
155 // TestPutBlockOneVol
156 //     PutBlock still returns success even when only one of the known
157 //     volumes is online.
158 //
159 func TestPutBlockOneVol(t *testing.T) {
160         defer teardown()
161
162         // Create two test Keep volumes, but cripple one of them.
163         KeepVM = MakeTestVolumeManager(2)
164         defer KeepVM.Close()
165
166         vols := KeepVM.AllWritable()
167         vols[0].(*MockVolume).Bad = true
168
169         // Check that PutBlock stores the data as expected.
170         if n, err := PutBlock(context.Background(), TestBlock, TestHash); err != nil || n < 1 {
171                 t.Fatalf("PutBlock: n %d err %v", n, err)
172         }
173
174         buf := make([]byte, BlockSize)
175         size, err := GetBlock(context.Background(), TestHash, buf, nil)
176         if err != nil {
177                 t.Fatalf("GetBlock: %v", err)
178         }
179         if bytes.Compare(buf[:size], TestBlock) != 0 {
180                 t.Fatalf("PutBlock stored %+q, GetBlock retrieved %+q",
181                         TestBlock, buf[:size])
182         }
183 }
184
185 // TestPutBlockMD5Fail
186 //     Check that PutBlock returns an error if passed a block and hash that
187 //     do not match.
188 //
189 func TestPutBlockMD5Fail(t *testing.T) {
190         defer teardown()
191
192         // Create two test Keep volumes.
193         KeepVM = MakeTestVolumeManager(2)
194         defer KeepVM.Close()
195
196         // Check that PutBlock returns the expected error when the hash does
197         // not match the block.
198         if _, err := PutBlock(context.Background(), BadBlock, TestHash); err != RequestHashError {
199                 t.Errorf("Expected RequestHashError, got %v", err)
200         }
201
202         // Confirm that GetBlock fails to return anything.
203         if result, err := GetBlock(context.Background(), TestHash, make([]byte, BlockSize), nil); err != NotFoundError {
204                 t.Errorf("GetBlock succeeded after a corrupt block store (result = %s, err = %v)",
205                         string(result), err)
206         }
207 }
208
209 // TestPutBlockCorrupt
210 //     PutBlock should overwrite corrupt blocks on disk when given
211 //     a PUT request with a good block.
212 //
213 func TestPutBlockCorrupt(t *testing.T) {
214         defer teardown()
215
216         // Create two test Keep volumes.
217         KeepVM = MakeTestVolumeManager(2)
218         defer KeepVM.Close()
219
220         // Store a corrupted block under TestHash.
221         vols := KeepVM.AllWritable()
222         vols[0].Put(context.Background(), TestHash, BadBlock)
223         if n, err := PutBlock(context.Background(), TestBlock, TestHash); err != nil || n < 1 {
224                 t.Errorf("PutBlock: n %d err %v", n, err)
225         }
226
227         // The block on disk should now match TestBlock.
228         buf := make([]byte, BlockSize)
229         if size, err := GetBlock(context.Background(), TestHash, buf, nil); err != nil {
230                 t.Errorf("GetBlock: %v", err)
231         } else if bytes.Compare(buf[:size], TestBlock) != 0 {
232                 t.Errorf("Got %+q, expected %+q", buf[:size], TestBlock)
233         }
234 }
235
236 // TestPutBlockCollision
237 //     PutBlock returns a 400 Collision error when attempting to
238 //     store a block that collides with another block on disk.
239 //
240 func TestPutBlockCollision(t *testing.T) {
241         defer teardown()
242
243         // These blocks both hash to the MD5 digest cee9a457e790cf20d4bdaa6d69f01e41.
244         b1 := arvadostest.MD5CollisionData[0]
245         b2 := arvadostest.MD5CollisionData[1]
246         locator := arvadostest.MD5CollisionMD5
247
248         // Prepare two test Keep volumes.
249         KeepVM = MakeTestVolumeManager(2)
250         defer KeepVM.Close()
251
252         // Store one block, then attempt to store the other. Confirm that
253         // PutBlock reported a CollisionError.
254         if _, err := PutBlock(context.Background(), b1, locator); err != nil {
255                 t.Error(err)
256         }
257         if _, err := PutBlock(context.Background(), b2, locator); err == nil {
258                 t.Error("PutBlock did not report a collision")
259         } else if err != CollisionError {
260                 t.Errorf("PutBlock returned %v", err)
261         }
262 }
263
264 // TestPutBlockTouchFails
265 //     When PutBlock is asked to PUT an existing block, but cannot
266 //     modify the timestamp, it should write a second block.
267 //
268 func TestPutBlockTouchFails(t *testing.T) {
269         defer teardown()
270
271         // Prepare two test Keep volumes.
272         KeepVM = MakeTestVolumeManager(2)
273         defer KeepVM.Close()
274         vols := KeepVM.AllWritable()
275
276         // Store a block and then make the underlying volume bad,
277         // so a subsequent attempt to update the file timestamp
278         // will fail.
279         vols[0].Put(context.Background(), TestHash, BadBlock)
280         oldMtime, err := vols[0].Mtime(TestHash)
281         if err != nil {
282                 t.Fatalf("vols[0].Mtime(%s): %s\n", TestHash, err)
283         }
284
285         // vols[0].Touch will fail on the next call, so the volume
286         // manager will store a copy on vols[1] instead.
287         vols[0].(*MockVolume).Touchable = false
288         if n, err := PutBlock(context.Background(), TestBlock, TestHash); err != nil || n < 1 {
289                 t.Fatalf("PutBlock: n %d err %v", n, err)
290         }
291         vols[0].(*MockVolume).Touchable = true
292
293         // Now the mtime on the block on vols[0] should be unchanged, and
294         // there should be a copy of the block on vols[1].
295         newMtime, err := vols[0].Mtime(TestHash)
296         if err != nil {
297                 t.Fatalf("vols[0].Mtime(%s): %s\n", TestHash, err)
298         }
299         if !newMtime.Equal(oldMtime) {
300                 t.Errorf("mtime was changed on vols[0]:\noldMtime = %v\nnewMtime = %v\n",
301                         oldMtime, newMtime)
302         }
303         buf := make([]byte, BlockSize)
304         n, err := vols[1].Get(context.Background(), TestHash, buf)
305         if err != nil {
306                 t.Fatalf("vols[1]: %v", err)
307         }
308         if bytes.Compare(buf[:n], TestBlock) != 0 {
309                 t.Errorf("new block does not match test block\nnew block = %v\n", buf[:n])
310         }
311 }
312
313 func TestDiscoverTmpfs(t *testing.T) {
314         var tempVols [4]string
315         var err error
316
317         // Create some directories suitable for using as keep volumes.
318         for i := range tempVols {
319                 if tempVols[i], err = ioutil.TempDir("", "findvol"); err != nil {
320                         t.Fatal(err)
321                 }
322                 defer os.RemoveAll(tempVols[i])
323                 tempVols[i] = tempVols[i] + "/keep"
324                 if err = os.Mkdir(tempVols[i], 0755); err != nil {
325                         t.Fatal(err)
326                 }
327         }
328
329         // Set up a bogus ProcMounts file.
330         f, err := ioutil.TempFile("", "keeptest")
331         if err != nil {
332                 t.Fatal(err)
333         }
334         defer os.Remove(f.Name())
335         for i, vol := range tempVols {
336                 // Add readonly mount points at odd indexes.
337                 var opts string
338                 switch i % 2 {
339                 case 0:
340                         opts = "rw,nosuid,nodev,noexec"
341                 case 1:
342                         opts = "nosuid,nodev,noexec,ro"
343                 }
344                 fmt.Fprintf(f, "tmpfs %s tmpfs %s 0 0\n", path.Dir(vol), opts)
345         }
346         f.Close()
347         ProcMounts = f.Name()
348
349         cfg := &Config{}
350         added := (&unixVolumeAdder{cfg}).Discover()
351
352         if added != len(cfg.Volumes) {
353                 t.Errorf("Discover returned %d, but added %d volumes",
354                         added, len(cfg.Volumes))
355         }
356         if added != len(tempVols) {
357                 t.Errorf("Discover returned %d but we set up %d volumes",
358                         added, len(tempVols))
359         }
360         for i, tmpdir := range tempVols {
361                 if tmpdir != cfg.Volumes[i].(*UnixVolume).Root {
362                         t.Errorf("Discover returned %s, expected %s\n",
363                                 cfg.Volumes[i].(*UnixVolume).Root, tmpdir)
364                 }
365                 if expectReadonly := i%2 == 1; expectReadonly != cfg.Volumes[i].(*UnixVolume).ReadOnly {
366                         t.Errorf("Discover added %s with readonly=%v, should be %v",
367                                 tmpdir, !expectReadonly, expectReadonly)
368                 }
369         }
370 }
371
372 func TestDiscoverNone(t *testing.T) {
373         defer teardown()
374
375         // Set up a bogus ProcMounts file with no Keep vols.
376         f, err := ioutil.TempFile("", "keeptest")
377         if err != nil {
378                 t.Fatal(err)
379         }
380         defer os.Remove(f.Name())
381         fmt.Fprintln(f, "rootfs / rootfs opts 0 0")
382         fmt.Fprintln(f, "sysfs /sys sysfs opts 0 0")
383         fmt.Fprintln(f, "proc /proc proc opts 0 0")
384         fmt.Fprintln(f, "udev /dev devtmpfs opts 0 0")
385         fmt.Fprintln(f, "devpts /dev/pts devpts opts 0 0")
386         f.Close()
387         ProcMounts = f.Name()
388
389         cfg := &Config{}
390         added := (&unixVolumeAdder{cfg}).Discover()
391         if added != 0 || len(cfg.Volumes) != 0 {
392                 t.Fatalf("got %d, %v; expected 0, []", added, cfg.Volumes)
393         }
394 }
395
396 // TestIndex
397 //     Test an /index request.
398 func TestIndex(t *testing.T) {
399         defer teardown()
400
401         // Set up Keep volumes and populate them.
402         // Include multiple blocks on different volumes, and
403         // some metadata files.
404         KeepVM = MakeTestVolumeManager(2)
405         defer KeepVM.Close()
406
407         vols := KeepVM.AllReadable()
408         vols[0].Put(context.Background(), TestHash, TestBlock)
409         vols[1].Put(context.Background(), TestHash2, TestBlock2)
410         vols[0].Put(context.Background(), TestHash3, TestBlock3)
411         vols[0].Put(context.Background(), TestHash+".meta", []byte("metadata"))
412         vols[1].Put(context.Background(), TestHash2+".meta", []byte("metadata"))
413
414         buf := new(bytes.Buffer)
415         vols[0].IndexTo("", buf)
416         vols[1].IndexTo("", buf)
417         indexRows := strings.Split(string(buf.Bytes()), "\n")
418         sort.Strings(indexRows)
419         sortedIndex := strings.Join(indexRows, "\n")
420         expected := `^\n` + TestHash + `\+\d+ \d+\n` +
421                 TestHash3 + `\+\d+ \d+\n` +
422                 TestHash2 + `\+\d+ \d+$`
423
424         match, err := regexp.MatchString(expected, sortedIndex)
425         if err == nil {
426                 if !match {
427                         t.Errorf("IndexLocators returned:\n%s", string(buf.Bytes()))
428                 }
429         } else {
430                 t.Errorf("regexp.MatchString: %s", err)
431         }
432 }
433
434 // ========================================
435 // Helper functions for unit tests.
436 // ========================================
437
438 // MakeTestVolumeManager returns a RRVolumeManager with the specified
439 // number of MockVolumes.
440 func MakeTestVolumeManager(numVolumes int) VolumeManager {
441         vols := make([]Volume, numVolumes)
442         for i := range vols {
443                 vols[i] = CreateMockVolume()
444         }
445         return MakeRRVolumeManager(vols)
446 }
447
448 // teardown cleans up after each test.
449 func teardown() {
450         theConfig.systemAuthToken = ""
451         theConfig.RequireSignatures = false
452         theConfig.blobSigningKey = nil
453         KeepVM = nil
454 }