10903: support need_transaction for job and pipeline_instance cancel methods.
[arvados.git] / services / keepstore / keepstore_test.go
1 package main
2
3 import (
4         "bytes"
5         "context"
6         "fmt"
7         "io/ioutil"
8         "os"
9         "path"
10         "regexp"
11         "sort"
12         "strings"
13         "testing"
14
15         "git.curoverse.com/arvados.git/sdk/go/arvadostest"
16 )
17
18 var TestBlock = []byte("The quick brown fox jumps over the lazy dog.")
19 var TestHash = "e4d909c290d0fb1ca068ffaddf22cbd0"
20 var TestHashPutResp = "e4d909c290d0fb1ca068ffaddf22cbd0+44\n"
21
22 var TestBlock2 = []byte("Pack my box with five dozen liquor jugs.")
23 var TestHash2 = "f15ac516f788aec4f30932ffb6395c39"
24
25 var TestBlock3 = []byte("Now is the time for all good men to come to the aid of their country.")
26 var TestHash3 = "eed29bbffbc2dbe5e5ee0bb71888e61f"
27
28 // BadBlock is used to test collisions and corruption.
29 // It must not match any test hashes.
30 var BadBlock = []byte("The magic words are squeamish ossifrage.")
31
32 // Empty block
33 var EmptyHash = "d41d8cd98f00b204e9800998ecf8427e"
34 var EmptyBlock = []byte("")
35
36 // TODO(twp): Tests still to be written
37 //
38 //   * TestPutBlockFull
39 //       - test that PutBlock returns 503 Full if the filesystem is full.
40 //         (must mock FreeDiskSpace or Statfs? use a tmpfs?)
41 //
42 //   * TestPutBlockWriteErr
43 //       - test the behavior when Write returns an error.
44 //           - Possible solutions: use a small tmpfs and a high
45 //             MIN_FREE_KILOBYTES to trick PutBlock into attempting
46 //             to write a block larger than the amount of space left
47 //           - use an interface to mock ioutil.TempFile with a File
48 //             object that always returns an error on write
49 //
50 // ========================================
51 // GetBlock tests.
52 // ========================================
53
54 // TestGetBlock
55 //     Test that simple block reads succeed.
56 //
57 func TestGetBlock(t *testing.T) {
58         defer teardown()
59
60         // Prepare two test Keep volumes. Our block is stored on the second volume.
61         KeepVM = MakeTestVolumeManager(2)
62         defer KeepVM.Close()
63
64         vols := KeepVM.AllReadable()
65         if err := vols[1].Put(context.Background(), TestHash, TestBlock); err != nil {
66                 t.Error(err)
67         }
68
69         // Check that GetBlock returns success.
70         buf := make([]byte, BlockSize)
71         size, err := GetBlock(context.Background(), TestHash, buf, nil)
72         if err != nil {
73                 t.Errorf("GetBlock error: %s", err)
74         }
75         if bytes.Compare(buf[:size], TestBlock) != 0 {
76                 t.Errorf("got %v, expected %v", buf[:size], TestBlock)
77         }
78 }
79
80 // TestGetBlockMissing
81 //     GetBlock must return an error when the block is not found.
82 //
83 func TestGetBlockMissing(t *testing.T) {
84         defer teardown()
85
86         // Create two empty test Keep volumes.
87         KeepVM = MakeTestVolumeManager(2)
88         defer KeepVM.Close()
89
90         // Check that GetBlock returns failure.
91         buf := make([]byte, BlockSize)
92         size, err := GetBlock(context.Background(), TestHash, buf, nil)
93         if err != NotFoundError {
94                 t.Errorf("Expected NotFoundError, got %v, err %v", buf[:size], err)
95         }
96 }
97
98 // TestGetBlockCorrupt
99 //     GetBlock must return an error when a corrupted block is requested
100 //     (the contents of the file do not checksum to its hash).
101 //
102 func TestGetBlockCorrupt(t *testing.T) {
103         defer teardown()
104
105         // Create two test Keep volumes and store a corrupt block in one.
106         KeepVM = MakeTestVolumeManager(2)
107         defer KeepVM.Close()
108
109         vols := KeepVM.AllReadable()
110         vols[0].Put(context.Background(), TestHash, BadBlock)
111
112         // Check that GetBlock returns failure.
113         buf := make([]byte, BlockSize)
114         size, err := GetBlock(context.Background(), TestHash, buf, nil)
115         if err != DiskHashError {
116                 t.Errorf("Expected DiskHashError, got %v (buf: %v)", err, buf[:size])
117         }
118 }
119
120 // ========================================
121 // PutBlock tests
122 // ========================================
123
124 // TestPutBlockOK
125 //     PutBlock can perform a simple block write and returns success.
126 //
127 func TestPutBlockOK(t *testing.T) {
128         defer teardown()
129
130         // Create two test Keep volumes.
131         KeepVM = MakeTestVolumeManager(2)
132         defer KeepVM.Close()
133
134         // Check that PutBlock stores the data as expected.
135         if n, err := PutBlock(context.Background(), TestBlock, TestHash); err != nil || n < 1 {
136                 t.Fatalf("PutBlock: n %d err %v", n, err)
137         }
138
139         vols := KeepVM.AllReadable()
140         buf := make([]byte, BlockSize)
141         n, err := vols[1].Get(context.Background(), TestHash, buf)
142         if err != nil {
143                 t.Fatalf("Volume #0 Get returned error: %v", err)
144         }
145         if string(buf[:n]) != string(TestBlock) {
146                 t.Fatalf("PutBlock stored '%s', Get retrieved '%s'",
147                         string(TestBlock), string(buf[:n]))
148         }
149 }
150
151 // TestPutBlockOneVol
152 //     PutBlock still returns success even when only one of the known
153 //     volumes is online.
154 //
155 func TestPutBlockOneVol(t *testing.T) {
156         defer teardown()
157
158         // Create two test Keep volumes, but cripple one of them.
159         KeepVM = MakeTestVolumeManager(2)
160         defer KeepVM.Close()
161
162         vols := KeepVM.AllWritable()
163         vols[0].(*MockVolume).Bad = true
164
165         // Check that PutBlock stores the data as expected.
166         if n, err := PutBlock(context.Background(), TestBlock, TestHash); err != nil || n < 1 {
167                 t.Fatalf("PutBlock: n %d err %v", n, err)
168         }
169
170         buf := make([]byte, BlockSize)
171         size, err := GetBlock(context.Background(), TestHash, buf, nil)
172         if err != nil {
173                 t.Fatalf("GetBlock: %v", err)
174         }
175         if bytes.Compare(buf[:size], TestBlock) != 0 {
176                 t.Fatalf("PutBlock stored %+q, GetBlock retrieved %+q",
177                         TestBlock, buf[:size])
178         }
179 }
180
181 // TestPutBlockMD5Fail
182 //     Check that PutBlock returns an error if passed a block and hash that
183 //     do not match.
184 //
185 func TestPutBlockMD5Fail(t *testing.T) {
186         defer teardown()
187
188         // Create two test Keep volumes.
189         KeepVM = MakeTestVolumeManager(2)
190         defer KeepVM.Close()
191
192         // Check that PutBlock returns the expected error when the hash does
193         // not match the block.
194         if _, err := PutBlock(context.Background(), BadBlock, TestHash); err != RequestHashError {
195                 t.Errorf("Expected RequestHashError, got %v", err)
196         }
197
198         // Confirm that GetBlock fails to return anything.
199         if result, err := GetBlock(context.Background(), TestHash, make([]byte, BlockSize), nil); err != NotFoundError {
200                 t.Errorf("GetBlock succeeded after a corrupt block store (result = %s, err = %v)",
201                         string(result), err)
202         }
203 }
204
205 // TestPutBlockCorrupt
206 //     PutBlock should overwrite corrupt blocks on disk when given
207 //     a PUT request with a good block.
208 //
209 func TestPutBlockCorrupt(t *testing.T) {
210         defer teardown()
211
212         // Create two test Keep volumes.
213         KeepVM = MakeTestVolumeManager(2)
214         defer KeepVM.Close()
215
216         // Store a corrupted block under TestHash.
217         vols := KeepVM.AllWritable()
218         vols[0].Put(context.Background(), TestHash, BadBlock)
219         if n, err := PutBlock(context.Background(), TestBlock, TestHash); err != nil || n < 1 {
220                 t.Errorf("PutBlock: n %d err %v", n, err)
221         }
222
223         // The block on disk should now match TestBlock.
224         buf := make([]byte, BlockSize)
225         if size, err := GetBlock(context.Background(), TestHash, buf, nil); err != nil {
226                 t.Errorf("GetBlock: %v", err)
227         } else if bytes.Compare(buf[:size], TestBlock) != 0 {
228                 t.Errorf("Got %+q, expected %+q", buf[:size], TestBlock)
229         }
230 }
231
232 // TestPutBlockCollision
233 //     PutBlock returns a 400 Collision error when attempting to
234 //     store a block that collides with another block on disk.
235 //
236 func TestPutBlockCollision(t *testing.T) {
237         defer teardown()
238
239         // These blocks both hash to the MD5 digest cee9a457e790cf20d4bdaa6d69f01e41.
240         b1 := arvadostest.MD5CollisionData[0]
241         b2 := arvadostest.MD5CollisionData[1]
242         locator := arvadostest.MD5CollisionMD5
243
244         // Prepare two test Keep volumes.
245         KeepVM = MakeTestVolumeManager(2)
246         defer KeepVM.Close()
247
248         // Store one block, then attempt to store the other. Confirm that
249         // PutBlock reported a CollisionError.
250         if _, err := PutBlock(context.Background(), b1, locator); err != nil {
251                 t.Error(err)
252         }
253         if _, err := PutBlock(context.Background(), b2, locator); err == nil {
254                 t.Error("PutBlock did not report a collision")
255         } else if err != CollisionError {
256                 t.Errorf("PutBlock returned %v", err)
257         }
258 }
259
260 // TestPutBlockTouchFails
261 //     When PutBlock is asked to PUT an existing block, but cannot
262 //     modify the timestamp, it should write a second block.
263 //
264 func TestPutBlockTouchFails(t *testing.T) {
265         defer teardown()
266
267         // Prepare two test Keep volumes.
268         KeepVM = MakeTestVolumeManager(2)
269         defer KeepVM.Close()
270         vols := KeepVM.AllWritable()
271
272         // Store a block and then make the underlying volume bad,
273         // so a subsequent attempt to update the file timestamp
274         // will fail.
275         vols[0].Put(context.Background(), TestHash, BadBlock)
276         oldMtime, err := vols[0].Mtime(TestHash)
277         if err != nil {
278                 t.Fatalf("vols[0].Mtime(%s): %s\n", TestHash, err)
279         }
280
281         // vols[0].Touch will fail on the next call, so the volume
282         // manager will store a copy on vols[1] instead.
283         vols[0].(*MockVolume).Touchable = false
284         if n, err := PutBlock(context.Background(), TestBlock, TestHash); err != nil || n < 1 {
285                 t.Fatalf("PutBlock: n %d err %v", n, err)
286         }
287         vols[0].(*MockVolume).Touchable = true
288
289         // Now the mtime on the block on vols[0] should be unchanged, and
290         // there should be a copy of the block on vols[1].
291         newMtime, err := vols[0].Mtime(TestHash)
292         if err != nil {
293                 t.Fatalf("vols[0].Mtime(%s): %s\n", TestHash, err)
294         }
295         if !newMtime.Equal(oldMtime) {
296                 t.Errorf("mtime was changed on vols[0]:\noldMtime = %v\nnewMtime = %v\n",
297                         oldMtime, newMtime)
298         }
299         buf := make([]byte, BlockSize)
300         n, err := vols[1].Get(context.Background(), TestHash, buf)
301         if err != nil {
302                 t.Fatalf("vols[1]: %v", err)
303         }
304         if bytes.Compare(buf[:n], TestBlock) != 0 {
305                 t.Errorf("new block does not match test block\nnew block = %v\n", buf[:n])
306         }
307 }
308
309 func TestDiscoverTmpfs(t *testing.T) {
310         var tempVols [4]string
311         var err error
312
313         // Create some directories suitable for using as keep volumes.
314         for i := range tempVols {
315                 if tempVols[i], err = ioutil.TempDir("", "findvol"); err != nil {
316                         t.Fatal(err)
317                 }
318                 defer os.RemoveAll(tempVols[i])
319                 tempVols[i] = tempVols[i] + "/keep"
320                 if err = os.Mkdir(tempVols[i], 0755); err != nil {
321                         t.Fatal(err)
322                 }
323         }
324
325         // Set up a bogus ProcMounts file.
326         f, err := ioutil.TempFile("", "keeptest")
327         if err != nil {
328                 t.Fatal(err)
329         }
330         defer os.Remove(f.Name())
331         for i, vol := range tempVols {
332                 // Add readonly mount points at odd indexes.
333                 var opts string
334                 switch i % 2 {
335                 case 0:
336                         opts = "rw,nosuid,nodev,noexec"
337                 case 1:
338                         opts = "nosuid,nodev,noexec,ro"
339                 }
340                 fmt.Fprintf(f, "tmpfs %s tmpfs %s 0 0\n", path.Dir(vol), opts)
341         }
342         f.Close()
343         ProcMounts = f.Name()
344
345         cfg := &Config{}
346         added := (&unixVolumeAdder{cfg}).Discover()
347
348         if added != len(cfg.Volumes) {
349                 t.Errorf("Discover returned %d, but added %d volumes",
350                         added, len(cfg.Volumes))
351         }
352         if added != len(tempVols) {
353                 t.Errorf("Discover returned %d but we set up %d volumes",
354                         added, len(tempVols))
355         }
356         for i, tmpdir := range tempVols {
357                 if tmpdir != cfg.Volumes[i].(*UnixVolume).Root {
358                         t.Errorf("Discover returned %s, expected %s\n",
359                                 cfg.Volumes[i].(*UnixVolume).Root, tmpdir)
360                 }
361                 if expectReadonly := i%2 == 1; expectReadonly != cfg.Volumes[i].(*UnixVolume).ReadOnly {
362                         t.Errorf("Discover added %s with readonly=%v, should be %v",
363                                 tmpdir, !expectReadonly, expectReadonly)
364                 }
365         }
366 }
367
368 func TestDiscoverNone(t *testing.T) {
369         defer teardown()
370
371         // Set up a bogus ProcMounts file with no Keep vols.
372         f, err := ioutil.TempFile("", "keeptest")
373         if err != nil {
374                 t.Fatal(err)
375         }
376         defer os.Remove(f.Name())
377         fmt.Fprintln(f, "rootfs / rootfs opts 0 0")
378         fmt.Fprintln(f, "sysfs /sys sysfs opts 0 0")
379         fmt.Fprintln(f, "proc /proc proc opts 0 0")
380         fmt.Fprintln(f, "udev /dev devtmpfs opts 0 0")
381         fmt.Fprintln(f, "devpts /dev/pts devpts opts 0 0")
382         f.Close()
383         ProcMounts = f.Name()
384
385         cfg := &Config{}
386         added := (&unixVolumeAdder{cfg}).Discover()
387         if added != 0 || len(cfg.Volumes) != 0 {
388                 t.Fatalf("got %d, %v; expected 0, []", added, cfg.Volumes)
389         }
390 }
391
392 // TestIndex
393 //     Test an /index request.
394 func TestIndex(t *testing.T) {
395         defer teardown()
396
397         // Set up Keep volumes and populate them.
398         // Include multiple blocks on different volumes, and
399         // some metadata files.
400         KeepVM = MakeTestVolumeManager(2)
401         defer KeepVM.Close()
402
403         vols := KeepVM.AllReadable()
404         vols[0].Put(context.Background(), TestHash, TestBlock)
405         vols[1].Put(context.Background(), TestHash2, TestBlock2)
406         vols[0].Put(context.Background(), TestHash3, TestBlock3)
407         vols[0].Put(context.Background(), TestHash+".meta", []byte("metadata"))
408         vols[1].Put(context.Background(), TestHash2+".meta", []byte("metadata"))
409
410         buf := new(bytes.Buffer)
411         vols[0].IndexTo("", buf)
412         vols[1].IndexTo("", buf)
413         indexRows := strings.Split(string(buf.Bytes()), "\n")
414         sort.Strings(indexRows)
415         sortedIndex := strings.Join(indexRows, "\n")
416         expected := `^\n` + TestHash + `\+\d+ \d+\n` +
417                 TestHash3 + `\+\d+ \d+\n` +
418                 TestHash2 + `\+\d+ \d+$`
419
420         match, err := regexp.MatchString(expected, sortedIndex)
421         if err == nil {
422                 if !match {
423                         t.Errorf("IndexLocators returned:\n%s", string(buf.Bytes()))
424                 }
425         } else {
426                 t.Errorf("regexp.MatchString: %s", err)
427         }
428 }
429
430 // ========================================
431 // Helper functions for unit tests.
432 // ========================================
433
434 // MakeTestVolumeManager returns a RRVolumeManager with the specified
435 // number of MockVolumes.
436 func MakeTestVolumeManager(numVolumes int) VolumeManager {
437         vols := make([]Volume, numVolumes)
438         for i := range vols {
439                 vols[i] = CreateMockVolume()
440         }
441         return MakeRRVolumeManager(vols)
442 }
443
444 // teardown cleans up after each test.
445 func teardown() {
446         theConfig.systemAuthToken = ""
447         theConfig.RequireSignatures = false
448         theConfig.blobSigningKey = nil
449         KeepVM = nil
450 }