]> git.arvados.org - arvados.git/blob - services/keepstore/keepstore_test.go
Restart Docker after updating daemon.json
[arvados.git] / services / keepstore / keepstore_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepstore
6
7 import (
8         "bytes"
9         "context"
10         "crypto/md5"
11         "errors"
12         "fmt"
13         "io"
14         "net/http"
15         "os"
16         "sort"
17         "strings"
18         "sync"
19         "testing"
20         "time"
21
22         "git.arvados.org/arvados.git/lib/boot"
23         "git.arvados.org/arvados.git/lib/config"
24         "git.arvados.org/arvados.git/lib/service"
25         "git.arvados.org/arvados.git/sdk/go/arvados"
26         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
27         "git.arvados.org/arvados.git/sdk/go/arvadostest"
28         "git.arvados.org/arvados.git/sdk/go/auth"
29         "git.arvados.org/arvados.git/sdk/go/ctxlog"
30         "git.arvados.org/arvados.git/sdk/go/keepclient"
31         "github.com/prometheus/client_golang/prometheus"
32         . "gopkg.in/check.v1"
33 )
34
35 func TestGocheck(t *testing.T) {
36         TestingT(t)
37 }
38
39 const (
40         fooHash = "acbd18db4cc2f85cedef654fccc4a4d8"
41         barHash = "37b51d194a7513e45b56f6524f2d51f2"
42 )
43
44 var testServiceURL = func() arvados.URL {
45         return arvados.URL{Host: "localhost:12345", Scheme: "http"}
46 }()
47
48 func authContext(token string) context.Context {
49         return auth.NewContext(context.TODO(), &auth.Credentials{Tokens: []string{token}})
50 }
51
52 func testCluster(t TB) *arvados.Cluster {
53         cfg, err := config.NewLoader(bytes.NewBufferString("Clusters: {zzzzz: {}}"), ctxlog.TestLogger(t)).Load()
54         if err != nil {
55                 t.Fatal(err)
56         }
57         cluster, err := cfg.GetCluster("")
58         if err != nil {
59                 t.Fatal(err)
60         }
61         cluster.SystemRootToken = arvadostest.SystemRootToken
62         cluster.ManagementToken = arvadostest.ManagementToken
63         return cluster
64 }
65
66 func testKeepstore(t TB, cluster *arvados.Cluster, reg *prometheus.Registry) (*keepstore, context.CancelFunc) {
67         if reg == nil {
68                 reg = prometheus.NewRegistry()
69         }
70         ctx, cancel := context.WithCancel(context.Background())
71         ctx = ctxlog.Context(ctx, ctxlog.TestLogger(t))
72         ks, err := newKeepstore(ctx, cluster, cluster.SystemRootToken, reg, testServiceURL)
73         if err != nil {
74                 t.Fatal(err)
75         }
76         return ks, cancel
77 }
78
79 var _ = Suite(&keepstoreSuite{})
80
81 type keepstoreSuite struct {
82         cluster *arvados.Cluster
83 }
84
85 func (s *keepstoreSuite) SetUpTest(c *C) {
86         s.cluster = testCluster(c)
87         s.cluster.Volumes = map[string]arvados.Volume{
88                 "zzzzz-nyw5e-000000000000000": {Replication: 1, Driver: "stub"},
89                 "zzzzz-nyw5e-111111111111111": {Replication: 1, Driver: "stub"},
90         }
91 }
92
93 func (s *keepstoreSuite) TestBlockRead_ChecksumMismatch(c *C) {
94         ks, cancel := testKeepstore(c, s.cluster, nil)
95         defer cancel()
96
97         ctx := authContext(arvadostest.ActiveTokenV2)
98
99         fooHash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
100         err := ks.mountsW[0].BlockWrite(ctx, fooHash, []byte("bar"))
101         c.Assert(err, IsNil)
102
103         _, err = ks.BlockWrite(ctx, arvados.BlockWriteOptions{
104                 Hash: fooHash,
105                 Data: []byte("foo"),
106         })
107         c.Check(err, ErrorMatches, "hash collision")
108
109         buf := bytes.NewBuffer(nil)
110         _, err = ks.BlockRead(ctx, arvados.BlockReadOptions{
111                 Locator: ks.signLocator(arvadostest.ActiveTokenV2, fooHash+"+3"),
112                 WriteTo: buf,
113         })
114         c.Check(err, ErrorMatches, "checksum mismatch in stored data")
115         c.Check(buf.String(), Not(Equals), "foo")
116         c.Check(buf.Len() < 3, Equals, true)
117
118         err = ks.mountsW[1].BlockWrite(ctx, fooHash, []byte("foo"))
119         c.Assert(err, IsNil)
120
121         buf = bytes.NewBuffer(nil)
122         _, err = ks.BlockRead(ctx, arvados.BlockReadOptions{
123                 Locator: ks.signLocator(arvadostest.ActiveTokenV2, fooHash+"+3"),
124                 WriteTo: buf,
125         })
126         c.Check(err, ErrorMatches, "checksum mismatch in stored data")
127         c.Check(buf.Len() < 3, Equals, true)
128 }
129
130 func (s *keepstoreSuite) TestBlockReadWrite_SigningDisabled(c *C) {
131         origKey := s.cluster.Collections.BlobSigningKey
132         s.cluster.Collections.BlobSigning = false
133         s.cluster.Collections.BlobSigningKey = ""
134         ks, cancel := testKeepstore(c, s.cluster, nil)
135         defer cancel()
136
137         resp, err := ks.BlockWrite(authContext("abcde"), arvados.BlockWriteOptions{
138                 Hash: fooHash,
139                 Data: []byte("foo"),
140         })
141         c.Assert(err, IsNil)
142         c.Check(resp.Locator, Equals, fooHash+"+3")
143         locUnsigned := resp.Locator
144         ttl := time.Hour
145         locSigned := arvados.SignLocator(locUnsigned, arvadostest.ActiveTokenV2, time.Now().Add(ttl), ttl, []byte(origKey))
146         c.Assert(locSigned, Not(Equals), locUnsigned)
147
148         for _, locator := range []string{locUnsigned, locSigned} {
149                 for _, token := range []string{"", "xyzzy", arvadostest.ActiveTokenV2} {
150                         c.Logf("=== locator %q token %q", locator, token)
151                         ctx := authContext(token)
152                         buf := bytes.NewBuffer(nil)
153                         _, err := ks.BlockRead(ctx, arvados.BlockReadOptions{
154                                 Locator: locator,
155                                 WriteTo: buf,
156                         })
157                         c.Check(err, IsNil)
158                         c.Check(buf.String(), Equals, "foo")
159                 }
160         }
161 }
162
163 func (s *keepstoreSuite) TestBlockRead_OrderedByStorageClassPriority(c *C) {
164         s.cluster.Volumes = map[string]arvados.Volume{
165                 "zzzzz-nyw5e-111111111111111": {
166                         Driver:         "stub",
167                         Replication:    1,
168                         StorageClasses: map[string]bool{"class1": true}},
169                 "zzzzz-nyw5e-222222222222222": {
170                         Driver:         "stub",
171                         Replication:    1,
172                         StorageClasses: map[string]bool{"class2": true, "class3": true}},
173         }
174
175         // "foobar" is just some data that happens to result in
176         // rendezvous order {111, 222}
177         data := []byte("foobar")
178         hash := fmt.Sprintf("%x", md5.Sum(data))
179
180         for _, trial := range []struct {
181                 priority1 int // priority of class1, thus vol1
182                 priority2 int // priority of class2
183                 priority3 int // priority of class3 (vol2 priority will be max(priority2, priority3))
184                 expectLog string
185         }{
186                 {100, 50, 50, "111 read 385\n"},              // class1 has higher priority => try vol1 first, no need to try vol2
187                 {100, 100, 100, "111 read 385\n"},            // same priority, vol2 is first in rendezvous order => try vol1 first and succeed
188                 {66, 99, 33, "222 read 385\n111 read 385\n"}, // class2 has higher priority => try vol2 first, then try vol1
189                 {66, 33, 99, "222 read 385\n111 read 385\n"}, // class3 has highest priority => vol2 has highest => try vol2 first, then try vol1
190         } {
191                 c.Logf("=== %+v", trial)
192
193                 s.cluster.StorageClasses = map[string]arvados.StorageClassConfig{
194                         "class1": {Priority: trial.priority1},
195                         "class2": {Priority: trial.priority2},
196                         "class3": {Priority: trial.priority3},
197                 }
198                 ks, cancel := testKeepstore(c, s.cluster, nil)
199                 defer cancel()
200
201                 ctx := authContext(arvadostest.ActiveTokenV2)
202                 resp, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
203                         Hash:           hash,
204                         Data:           data,
205                         StorageClasses: []string{"class1"},
206                 })
207                 c.Assert(err, IsNil)
208
209                 // Combine logs into one. (We only want the logs from
210                 // the BlockRead below, not from BlockWrite above.)
211                 stubLog := &stubLog{}
212                 for _, mnt := range ks.mounts {
213                         mnt.volume.(*stubVolume).stubLog = stubLog
214                 }
215
216                 n, err := ks.BlockRead(ctx, arvados.BlockReadOptions{
217                         Locator: resp.Locator,
218                         WriteTo: io.Discard,
219                 })
220                 c.Assert(n, Equals, len(data))
221                 c.Assert(err, IsNil)
222                 c.Check(stubLog.String(), Equals, trial.expectLog)
223         }
224 }
225
226 // Ensure BlockRead(..., {CheckCacheOnly: true}) always returns
227 // ErrNotCached.
228 //
229 // There is currently (Arvados 3.1 / March 2025) no way for an
230 // incoming http request to set that field anyway, because nothing
231 // accesses a cache via http.  But if/when it does, keepstore's
232 // BlockRead is expected to behave correctly.
233 func (s *keepstoreSuite) TestBlockRead_CheckCacheOnly(c *C) {
234         ks, cancel := testKeepstore(c, s.cluster, nil)
235         defer cancel()
236
237         ctx := authContext(arvadostest.ActiveTokenV2)
238
239         data := []byte("foo")
240         hash := fmt.Sprintf("%x", md5.Sum(data))
241         resp, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
242                 Hash: hash,
243                 Data: data,
244         })
245         c.Assert(err, IsNil)
246
247         n, err := ks.BlockRead(ctx, arvados.BlockReadOptions{
248                 Locator: resp.Locator,
249                 WriteTo: io.Discard,
250         })
251         c.Assert(n, Equals, 3)
252         c.Assert(err, IsNil)
253
254         // Block exists -> ErrNotCached
255         n, err = ks.BlockRead(ctx, arvados.BlockReadOptions{
256                 Locator:        resp.Locator,
257                 WriteTo:        io.Discard,
258                 CheckCacheOnly: true,
259         })
260         c.Check(n, Equals, 0)
261         c.Check(err, Equals, arvados.ErrNotCached)
262
263         // Block does not exist -> ErrNotCached
264         n, err = ks.BlockRead(ctx, arvados.BlockReadOptions{
265                 Locator:        ks.signLocator(arvadostest.ActiveTokenV2, "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb+3"),
266                 WriteTo:        io.Discard,
267                 CheckCacheOnly: true,
268         })
269         c.Check(n, Equals, 0)
270         c.Check(err, Equals, arvados.ErrNotCached)
271 }
272
273 func (s *keepstoreSuite) TestBlockWrite_NoWritableVolumes(c *C) {
274         for uuid, v := range s.cluster.Volumes {
275                 v.ReadOnly = true
276                 s.cluster.Volumes[uuid] = v
277         }
278         ks, cancel := testKeepstore(c, s.cluster, nil)
279         defer cancel()
280         for _, mnt := range ks.mounts {
281                 mnt.volume.(*stubVolume).blockWrite = func(context.Context, string, []byte) error {
282                         c.Error("volume BlockWrite called")
283                         return errors.New("fail")
284                 }
285         }
286         ctx := authContext(arvadostest.ActiveTokenV2)
287
288         _, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
289                 Hash: fooHash,
290                 Data: []byte("foo")})
291         c.Check(err, NotNil)
292         c.Check(err.(interface{ HTTPStatus() int }).HTTPStatus(), Equals, http.StatusInsufficientStorage)
293 }
294
295 func (s *keepstoreSuite) TestBlockWrite_MultipleStorageClasses(c *C) {
296         s.cluster.Volumes = map[string]arvados.Volume{
297                 "zzzzz-nyw5e-111111111111111": {
298                         Driver:         "stub",
299                         Replication:    1,
300                         StorageClasses: map[string]bool{"class1": true}},
301                 "zzzzz-nyw5e-121212121212121": {
302                         Driver:         "stub",
303                         Replication:    1,
304                         StorageClasses: map[string]bool{"class1": true, "class2": true}},
305                 "zzzzz-nyw5e-222222222222222": {
306                         Driver:         "stub",
307                         Replication:    1,
308                         StorageClasses: map[string]bool{"class2": true}},
309         }
310
311         // testData is a block that happens to have rendezvous order 111, 121, 222
312         testData := []byte("qux")
313         testHash := fmt.Sprintf("%x+%d", md5.Sum(testData), len(testData))
314
315         s.cluster.StorageClasses = map[string]arvados.StorageClassConfig{
316                 "class1": {},
317                 "class2": {},
318                 "class3": {},
319         }
320
321         ctx := authContext(arvadostest.ActiveTokenV2)
322         for idx, trial := range []struct {
323                 classes   string // desired classes
324                 expectLog string
325         }{
326                 {"class1", "" +
327                         "111 read d85\n" +
328                         "121 read d85\n" +
329                         "111 write d85\n" +
330                         "111 read d85\n" +
331                         "111 touch d85\n"},
332                 {"class2", "" +
333                         "121 read d85\n" + // write#1
334                         "222 read d85\n" +
335                         "121 write d85\n" +
336                         "121 read d85\n" + // write#2
337                         "121 touch d85\n"},
338                 {"class1,class2", "" +
339                         "111 read d85\n" + // write#1
340                         "121 read d85\n" +
341                         "222 read d85\n" +
342                         "121 write d85\n" +
343                         "111 write d85\n" +
344                         "111 read d85\n" + // write#2
345                         "111 touch d85\n" +
346                         "121 read d85\n" +
347                         "121 touch d85\n"},
348                 {"class1,class2,class404", "" +
349                         "111 read d85\n" + // write#1
350                         "121 read d85\n" +
351                         "222 read d85\n" +
352                         "121 write d85\n" +
353                         "111 write d85\n" +
354                         "111 read d85\n" + // write#2
355                         "111 touch d85\n" +
356                         "121 read d85\n" +
357                         "121 touch d85\n"},
358         } {
359                 c.Logf("=== %d: %+v", idx, trial)
360
361                 ks, cancel := testKeepstore(c, s.cluster, nil)
362                 defer cancel()
363                 stubLog := &stubLog{}
364                 for _, mnt := range ks.mounts {
365                         mnt.volume.(*stubVolume).stubLog = stubLog
366                 }
367
368                 // Check that we chose the right block data
369                 rvz := ks.rendezvous(testHash, ks.mountsW)
370                 c.Assert(rvz[0].UUID[24:], Equals, "111")
371                 c.Assert(rvz[1].UUID[24:], Equals, "121")
372                 c.Assert(rvz[2].UUID[24:], Equals, "222")
373
374                 for i := 0; i < 2; i++ {
375                         _, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
376                                 Hash:           testHash,
377                                 Data:           testData,
378                                 StorageClasses: strings.Split(trial.classes, ","),
379                         })
380                         c.Check(err, IsNil)
381                 }
382                 // The "nextmnt" loop in BlockWrite first starts the
383                 // goroutine that writes to mount 121, then the
384                 // goroutine that writes to mount 111.  Most of the
385                 // time, mount 121 will log first, but occasionally
386                 // mount 111 will log first.  In that case we swap the
387                 // log entries. (The order of the rest of the log
388                 // entries is meaningful -- just not these two.)
389                 gotLog := strings.Replace(stubLog.String(),
390                         "111 write d85\n121 write d85\n",
391                         "121 write d85\n111 write d85\n", 1)
392                 c.Check(gotLog, Equals, trial.expectLog)
393         }
394 }
395
396 func (s *keepstoreSuite) TestBlockTrash(c *C) {
397         s.cluster.Volumes = map[string]arvados.Volume{
398                 "zzzzz-nyw5e-000000000000000": {Replication: 1, Driver: "stub"},
399                 "zzzzz-nyw5e-111111111111111": {Replication: 1, Driver: "stub"},
400                 "zzzzz-nyw5e-222222222222222": {Replication: 1, Driver: "stub", ReadOnly: true},
401                 "zzzzz-nyw5e-333333333333333": {Replication: 1, Driver: "stub", ReadOnly: true, AllowTrashWhenReadOnly: true},
402         }
403         ks, cancel := testKeepstore(c, s.cluster, nil)
404         defer cancel()
405
406         var vol []*stubVolume
407         for _, mount := range ks.mountsR {
408                 vol = append(vol, mount.volume.(*stubVolume))
409         }
410         sort.Slice(vol, func(i, j int) bool {
411                 return vol[i].params.UUID < vol[j].params.UUID
412         })
413
414         ctx := context.Background()
415         loc := fooHash + "+3"
416         tOld := time.Now().Add(-s.cluster.Collections.BlobSigningTTL.Duration() - time.Second)
417
418         clear := func() {
419                 for _, vol := range vol {
420                         err := vol.BlockTrash(fooHash)
421                         if !os.IsNotExist(err) {
422                                 c.Assert(err, IsNil)
423                         }
424                 }
425         }
426         writeit := func(volidx int) {
427                 err := vol[volidx].BlockWrite(ctx, fooHash, []byte("foo"))
428                 c.Assert(err, IsNil)
429                 err = vol[volidx].blockTouchWithTime(fooHash, tOld)
430                 c.Assert(err, IsNil)
431         }
432         trashit := func() error {
433                 return ks.BlockTrash(ctx, loc)
434         }
435         checkexists := func(volidx int) bool {
436                 err := vol[volidx].BlockRead(ctx, fooHash, brdiscard)
437                 if !os.IsNotExist(err) {
438                         c.Check(err, IsNil)
439                 }
440                 return err == nil
441         }
442
443         clear()
444         c.Check(trashit(), Equals, os.ErrNotExist)
445
446         // one old replica => trash it
447         clear()
448         writeit(0)
449         c.Check(trashit(), IsNil)
450         c.Check(checkexists(0), Equals, false)
451
452         // one old replica + one new replica => keep new, trash old
453         clear()
454         writeit(0)
455         writeit(1)
456         c.Check(vol[1].blockTouchWithTime(fooHash, time.Now()), IsNil)
457         c.Check(trashit(), IsNil)
458         c.Check(checkexists(0), Equals, false)
459         c.Check(checkexists(1), Equals, true)
460
461         // two old replicas => trash both
462         clear()
463         writeit(0)
464         writeit(1)
465         c.Check(trashit(), IsNil)
466         c.Check(checkexists(0), Equals, false)
467         c.Check(checkexists(1), Equals, false)
468
469         // four old replicas => trash all except readonly volume with
470         // AllowTrashWhenReadOnly==false
471         clear()
472         writeit(0)
473         writeit(1)
474         writeit(2)
475         writeit(3)
476         c.Check(trashit(), IsNil)
477         c.Check(checkexists(0), Equals, false)
478         c.Check(checkexists(1), Equals, false)
479         c.Check(checkexists(2), Equals, true)
480         c.Check(checkexists(3), Equals, false)
481
482         // two old replicas but one returns an error => return the
483         // only non-404 backend error
484         clear()
485         vol[0].blockTrash = func(hash string) error {
486                 return errors.New("fake error")
487         }
488         writeit(0)
489         writeit(3)
490         c.Check(trashit(), ErrorMatches, "fake error")
491         c.Check(checkexists(0), Equals, true)
492         c.Check(checkexists(1), Equals, false)
493         c.Check(checkexists(2), Equals, false)
494         c.Check(checkexists(3), Equals, false)
495 }
496
497 func (s *keepstoreSuite) TestBlockWrite_OnlyOneBuffer(c *C) {
498         s.cluster.API.MaxKeepBlobBuffers = 1
499         ks, cancel := testKeepstore(c, s.cluster, nil)
500         defer cancel()
501         ok := make(chan struct{})
502         go func() {
503                 defer close(ok)
504                 ctx := authContext(arvadostest.ActiveTokenV2)
505                 _, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
506                         Hash: fooHash,
507                         Data: []byte("foo")})
508                 c.Check(err, IsNil)
509         }()
510         select {
511         case <-ok:
512         case <-time.After(time.Second):
513                 c.Fatal("PUT deadlocks with MaxKeepBlobBuffers==1")
514         }
515 }
516
517 func (s *keepstoreSuite) TestBufferPoolLeak(c *C) {
518         s.cluster.API.MaxKeepBlobBuffers = 4
519         ks, cancel := testKeepstore(c, s.cluster, nil)
520         defer cancel()
521
522         ctx := authContext(arvadostest.ActiveTokenV2)
523         var wg sync.WaitGroup
524         for range make([]int, 20) {
525                 wg.Add(1)
526                 go func() {
527                         defer wg.Done()
528                         resp, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
529                                 Hash: fooHash,
530                                 Data: []byte("foo")})
531                         c.Check(err, IsNil)
532                         _, err = ks.BlockRead(ctx, arvados.BlockReadOptions{
533                                 Locator: resp.Locator,
534                                 WriteTo: io.Discard})
535                         c.Check(err, IsNil)
536                 }()
537         }
538         ok := make(chan struct{})
539         go func() {
540                 wg.Wait()
541                 close(ok)
542         }()
543         select {
544         case <-ok:
545         case <-time.After(time.Second):
546                 c.Fatal("read/write sequence deadlocks, likely buffer pool leak")
547         }
548 }
549
550 func (s *keepstoreSuite) TestPutStorageClasses(c *C) {
551         s.cluster.Volumes = map[string]arvados.Volume{
552                 "zzzzz-nyw5e-000000000000000": {Replication: 1, Driver: "stub"}, // "default" is implicit
553                 "zzzzz-nyw5e-111111111111111": {Replication: 1, Driver: "stub", StorageClasses: map[string]bool{"special": true, "extra": true}},
554                 "zzzzz-nyw5e-222222222222222": {Replication: 1, Driver: "stub", StorageClasses: map[string]bool{"readonly": true}, ReadOnly: true},
555         }
556         ks, cancel := testKeepstore(c, s.cluster, nil)
557         defer cancel()
558         ctx := authContext(arvadostest.ActiveTokenV2)
559
560         for _, trial := range []struct {
561                 ask            []string
562                 expectReplicas int
563                 expectClasses  map[string]int
564         }{
565                 {nil,
566                         1,
567                         map[string]int{"default": 1}},
568                 {[]string{},
569                         1,
570                         map[string]int{"default": 1}},
571                 {[]string{"default"},
572                         1,
573                         map[string]int{"default": 1}},
574                 {[]string{"default", "default"},
575                         1,
576                         map[string]int{"default": 1}},
577                 {[]string{"special"},
578                         1,
579                         map[string]int{"extra": 1, "special": 1}},
580                 {[]string{"special", "readonly"},
581                         1,
582                         map[string]int{"extra": 1, "special": 1}},
583                 {[]string{"special", "nonexistent"},
584                         1,
585                         map[string]int{"extra": 1, "special": 1}},
586                 {[]string{"extra", "special"},
587                         1,
588                         map[string]int{"extra": 1, "special": 1}},
589                 {[]string{"default", "special"},
590                         2,
591                         map[string]int{"default": 1, "extra": 1, "special": 1}},
592         } {
593                 c.Logf("success case %#v", trial)
594                 resp, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
595                         Hash:           fooHash,
596                         Data:           []byte("foo"),
597                         StorageClasses: trial.ask,
598                 })
599                 if !c.Check(err, IsNil) {
600                         continue
601                 }
602                 c.Check(resp.Replicas, Equals, trial.expectReplicas)
603                 if len(trial.expectClasses) == 0 {
604                         // any non-empty value is correct
605                         c.Check(resp.StorageClasses, Not(HasLen), 0)
606                 } else {
607                         c.Check(resp.StorageClasses, DeepEquals, trial.expectClasses)
608                 }
609         }
610
611         for _, ask := range [][]string{
612                 {"doesnotexist"},
613                 {"doesnotexist", "readonly"},
614                 {"readonly"},
615         } {
616                 c.Logf("failure case %s", ask)
617                 _, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
618                         Hash:           fooHash,
619                         Data:           []byte("foo"),
620                         StorageClasses: ask,
621                 })
622                 c.Check(err, NotNil)
623         }
624 }
625
626 func (s *keepstoreSuite) TestUntrashHandlerWithNoWritableVolumes(c *C) {
627         for uuid, v := range s.cluster.Volumes {
628                 v.ReadOnly = true
629                 s.cluster.Volumes[uuid] = v
630         }
631         ks, cancel := testKeepstore(c, s.cluster, nil)
632         defer cancel()
633
634         for _, mnt := range ks.mounts {
635                 err := mnt.BlockWrite(context.Background(), fooHash, []byte("foo"))
636                 c.Assert(err, IsNil)
637                 err = mnt.BlockRead(context.Background(), fooHash, brdiscard)
638                 c.Assert(err, IsNil)
639         }
640
641         err := ks.BlockUntrash(context.Background(), fooHash)
642         c.Check(os.IsNotExist(err), Equals, true)
643
644         for _, mnt := range ks.mounts {
645                 err := mnt.BlockRead(context.Background(), fooHash, brdiscard)
646                 c.Assert(err, IsNil)
647         }
648 }
649
650 func (s *keepstoreSuite) TestBlockWrite_SkipReadOnly(c *C) {
651         s.cluster.Volumes = map[string]arvados.Volume{
652                 "zzzzz-nyw5e-000000000000000": {Replication: 1, Driver: "stub"},
653                 "zzzzz-nyw5e-111111111111111": {Replication: 1, Driver: "stub", ReadOnly: true},
654                 "zzzzz-nyw5e-222222222222222": {Replication: 1, Driver: "stub", ReadOnly: true, AllowTrashWhenReadOnly: true},
655         }
656         ks, cancel := testKeepstore(c, s.cluster, nil)
657         defer cancel()
658         ctx := authContext(arvadostest.ActiveTokenV2)
659
660         for i := range make([]byte, 32) {
661                 data := []byte(fmt.Sprintf("block %d", i))
662                 _, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{Data: data})
663                 c.Assert(err, IsNil)
664         }
665         c.Check(ks.mounts["zzzzz-nyw5e-000000000000000"].volume.(*stubVolume).stubLog.String(), Matches, "(?ms).*write.*")
666         c.Check(ks.mounts["zzzzz-nyw5e-111111111111111"].volume.(*stubVolume).stubLog.String(), HasLen, 0)
667         c.Check(ks.mounts["zzzzz-nyw5e-222222222222222"].volume.(*stubVolume).stubLog.String(), HasLen, 0)
668 }
669
670 func (s *keepstoreSuite) TestGetLocatorInfo(c *C) {
671         for _, trial := range []struct {
672                 locator string
673                 ok      bool
674                 expect  locatorInfo
675         }{
676                 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
677                         ok: true},
678                 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+1234",
679                         ok: true, expect: locatorInfo{size: 1234}},
680                 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+1234+Abcdef@abcdef",
681                         ok: true, expect: locatorInfo{size: 1234, signed: true}},
682                 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+1234+Rzzzzz-abcdef",
683                         ok: true, expect: locatorInfo{size: 1234, remote: true}},
684                 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+12345+Zexample+Rzzzzz-abcdef",
685                         ok: true, expect: locatorInfo{size: 12345, remote: true}},
686                 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+123456+👶🦈+Rzzzzz-abcdef",
687                         ok: true, expect: locatorInfo{size: 123456, remote: true}},
688                 // invalid: bad hash char
689                 {locator: "aaaaaaaaaaaaaazaaaaaaaaaaaaaaaaa+1234",
690                         ok: false},
691                 {locator: "aaaaaaaaaaaaaaFaaaaaaaaaaaaaaaaa+1234",
692                         ok: false},
693                 {locator: "aaaaaaaaaaaaaa⛵aaaaaaaaaaaaaaaaa+1234",
694                         ok: false},
695                 // invalid: hash length != 32
696                 {locator: "",
697                         ok: false},
698                 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
699                         ok: false},
700                 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+1234",
701                         ok: false},
702                 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaabb",
703                         ok: false},
704                 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaabb+1234",
705                         ok: false},
706                 // invalid: first hint is not size
707                 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+Abcdef+1234",
708                         ok: false},
709                 // invalid: leading/trailing/double +
710                 {locator: "+aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+1234",
711                         ok: false},
712                 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+1234+",
713                         ok: false},
714                 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa++1234",
715                         ok: false},
716                 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+1234++Abcdef@abcdef",
717                         ok: false},
718         } {
719                 c.Logf("=== %s", trial.locator)
720                 li, err := getLocatorInfo(trial.locator)
721                 if !trial.ok {
722                         c.Check(err, NotNil)
723                         continue
724                 }
725                 c.Check(err, IsNil)
726                 c.Check(li.hash, Equals, trial.locator[:32])
727                 c.Check(li.size, Equals, trial.expect.size)
728                 c.Check(li.signed, Equals, trial.expect.signed)
729                 c.Check(li.remote, Equals, trial.expect.remote)
730         }
731 }
732
733 func (s *keepstoreSuite) TestTimeout(c *C) {
734         var rtr *router
735         port, err := boot.AvailablePort("")
736         c.Assert(err, IsNil)
737         go service.Command(arvados.ServiceNameKeepstore, func(ctx context.Context, cluster *arvados.Cluster, token string, reg *prometheus.Registry) service.Handler {
738                 rtr = newHandlerOrErrorHandler(ctx, cluster, token, reg).(*router)
739                 return rtr
740         }).RunCommand("keepstore", []string{"-config", "-"}, strings.NewReader(`
741 Clusters:
742  zzzzz:
743   SystemRootToken: abcdefg
744   Services:
745    Keepstore:
746     InternalURLs:
747      "http://127.0.1.123:`+port+`": {}
748   API:
749    RequestTimeout: 500ms
750    KeepServiceRequestTimeout: 15s
751   Collections:
752    BlobSigningKey: abcdefg
753   Volumes:
754    zzzzz-nyw5e-000000000000000:
755     Replication: 1
756     Driver: stub
757 `), ctxlog.LogWriter(c.Log), ctxlog.LogWriter(c.Log))
758         for deadline := time.Now().Add(5 * time.Second); rtr == nil; {
759                 if time.Now().After(deadline) {
760                         c.Error("timed out waiting for service.Command to call newHandler func")
761                         c.FailNow()
762                 }
763         }
764         stubvol := rtr.keepstore.mountsW[0].volume.(*stubVolume)
765         err = stubvol.BlockWrite(context.Background(), "acbd18db4cc2f85cedef654fccc4a4d8", []byte("foo"))
766         c.Assert(err, IsNil)
767         ac, err := arvados.NewClientFromConfig(rtr.keepstore.cluster)
768         ac.AuthToken = "abcdefg"
769         c.Assert(err, IsNil)
770         ac.KeepServiceURIs = []string{"http://127.0.1.123:" + port}
771         arv, err := arvadosclient.New(ac)
772         c.Assert(err, IsNil)
773         kc := keepclient.New(arv)
774         kc.Want_replicas = 1
775
776         // Wait for service to come up
777         for deadline := time.Now().Add(time.Second); ; time.Sleep(time.Second / 10) {
778                 _, err := kc.BlockWrite(context.Background(), arvados.BlockWriteOptions{
779                         Data: []byte("foo"),
780                 })
781                 if err == nil {
782                         break
783                 }
784                 if time.Now().After(deadline) {
785                         c.Errorf("timed out waiting for BlockWrite(`foo`) to succeed: %s", err)
786                         c.FailNow()
787                 }
788         }
789
790         // Induce a write delay longer than RequestTimeout, and check
791         // that the request fails.
792         t0 := time.Now()
793         stubvol.blockWrite = func(context.Context, string, []byte) error { time.Sleep(time.Second); return nil }
794         _, err = kc.BlockWrite(context.Background(), arvados.BlockWriteOptions{
795                 Data: []byte("bar"),
796         })
797         c.Check(err, ErrorMatches, `Could not write .* 499 .*`)
798         duration := time.Since(t0)
799         c.Logf("write request duration %s", duration)
800         c.Check(duration > 500*time.Millisecond, Equals, true)
801
802         // Induce an index delay, and check that the request does
803         // *not* fail (because index is exempt from RequestTimeout).
804         t0 = time.Now()
805         stubvol.index = func(context.Context, string, io.Writer) error { time.Sleep(time.Second); return nil }
806         var uuid string
807         for uuid = range kc.LocalRoots() {
808         }
809         rdr, err := kc.GetIndex(uuid, "")
810         c.Assert(err, IsNil)
811         _, err = io.ReadAll(rdr)
812         c.Check(err, IsNil)
813         duration = time.Since(t0)
814         c.Logf("index request duration %s", duration)
815         c.Check(duration > time.Second, Equals, true)
816 }
817
818 func init() {
819         driver["stub"] = func(params newVolumeParams) (volume, error) {
820                 v := &stubVolume{
821                         params:  params,
822                         data:    make(map[string]stubData),
823                         stubLog: &stubLog{},
824                 }
825                 return v, nil
826         }
827 }
828
829 type stubLog struct {
830         sync.Mutex
831         bytes.Buffer
832 }
833
834 func (sl *stubLog) Printf(format string, args ...interface{}) {
835         if sl == nil {
836                 return
837         }
838         sl.Lock()
839         defer sl.Unlock()
840         fmt.Fprintf(sl, format+"\n", args...)
841 }
842
843 type stubData struct {
844         mtime time.Time
845         data  []byte
846         trash time.Time
847 }
848
849 type stubVolume struct {
850         params  newVolumeParams
851         data    map[string]stubData
852         stubLog *stubLog
853         mtx     sync.Mutex
854
855         // The following funcs enable tests to insert delays and
856         // failures. Each volume operation begins by calling the
857         // corresponding func (if non-nil). If the func returns an
858         // error, that error is returned to caller. Otherwise, the
859         // stub continues normally.
860         blockRead    func(ctx context.Context, hash string, writeTo io.WriterAt) error
861         blockWrite   func(ctx context.Context, hash string, data []byte) error
862         deviceID     func() string
863         blockTouch   func(hash string) error
864         blockTrash   func(hash string) error
865         blockUntrash func(hash string) error
866         index        func(ctx context.Context, prefix string, writeTo io.Writer) error
867         mtime        func(hash string) (time.Time, error)
868         emptyTrash   func()
869 }
870
871 func (v *stubVolume) log(op, hash string) {
872         // Note this intentionally crashes if UUID or hash is short --
873         // if keepstore ever does that, tests should fail.
874         v.stubLog.Printf("%s %s %s", v.params.UUID[24:27], op, hash[:3])
875 }
876
877 func (v *stubVolume) BlockRead(ctx context.Context, hash string, writeTo io.WriterAt) error {
878         v.log("read", hash)
879         if v.blockRead != nil {
880                 err := v.blockRead(ctx, hash, writeTo)
881                 if err != nil {
882                         return err
883                 }
884         }
885         v.mtx.Lock()
886         ent, ok := v.data[hash]
887         v.mtx.Unlock()
888         if !ok || !ent.trash.IsZero() {
889                 return os.ErrNotExist
890         }
891         wrote := 0
892         for writesize := 1000; wrote < len(ent.data); writesize = writesize * 2 {
893                 data := ent.data[wrote:]
894                 if len(data) > writesize {
895                         data = data[:writesize]
896                 }
897                 n, err := writeTo.WriteAt(data, int64(wrote))
898                 wrote += n
899                 if err != nil {
900                         return err
901                 }
902         }
903         return nil
904 }
905
906 func (v *stubVolume) BlockWrite(ctx context.Context, hash string, data []byte) error {
907         v.log("write", hash)
908         if v.blockWrite != nil {
909                 if err := v.blockWrite(ctx, hash, data); err != nil {
910                         return err
911                 }
912         }
913         v.mtx.Lock()
914         defer v.mtx.Unlock()
915         v.data[hash] = stubData{
916                 mtime: time.Now(),
917                 data:  append([]byte(nil), data...),
918         }
919         return nil
920 }
921
922 func (v *stubVolume) DeviceID() string {
923         return fmt.Sprintf("%p", v)
924 }
925
926 func (v *stubVolume) BlockTouch(hash string) error {
927         v.log("touch", hash)
928         if v.blockTouch != nil {
929                 if err := v.blockTouch(hash); err != nil {
930                         return err
931                 }
932         }
933         v.mtx.Lock()
934         defer v.mtx.Unlock()
935         ent, ok := v.data[hash]
936         if !ok || !ent.trash.IsZero() {
937                 return os.ErrNotExist
938         }
939         ent.mtime = time.Now()
940         v.data[hash] = ent
941         return nil
942 }
943
944 // Set mtime to the (presumably old) specified time.
945 func (v *stubVolume) blockTouchWithTime(hash string, t time.Time) error {
946         v.log("touchwithtime", hash)
947         v.mtx.Lock()
948         defer v.mtx.Unlock()
949         ent, ok := v.data[hash]
950         if !ok {
951                 return os.ErrNotExist
952         }
953         ent.mtime = t
954         v.data[hash] = ent
955         return nil
956 }
957
958 func (v *stubVolume) BlockTrash(hash string) error {
959         v.log("trash", hash)
960         if v.blockTrash != nil {
961                 if err := v.blockTrash(hash); err != nil {
962                         return err
963                 }
964         }
965         v.mtx.Lock()
966         defer v.mtx.Unlock()
967         ent, ok := v.data[hash]
968         if !ok || !ent.trash.IsZero() {
969                 return os.ErrNotExist
970         }
971         ent.trash = time.Now().Add(v.params.Cluster.Collections.BlobTrashLifetime.Duration())
972         v.data[hash] = ent
973         return nil
974 }
975
976 func (v *stubVolume) BlockUntrash(hash string) error {
977         v.log("untrash", hash)
978         if v.blockUntrash != nil {
979                 if err := v.blockUntrash(hash); err != nil {
980                         return err
981                 }
982         }
983         v.mtx.Lock()
984         defer v.mtx.Unlock()
985         ent, ok := v.data[hash]
986         if !ok || ent.trash.IsZero() {
987                 return os.ErrNotExist
988         }
989         ent.trash = time.Time{}
990         v.data[hash] = ent
991         return nil
992 }
993
994 func (v *stubVolume) Index(ctx context.Context, prefix string, writeTo io.Writer) error {
995         v.stubLog.Printf("%s index %s", v.params.UUID, prefix)
996         if v.index != nil {
997                 if err := v.index(ctx, prefix, writeTo); err != nil {
998                         return err
999                 }
1000         }
1001         buf := &bytes.Buffer{}
1002         v.mtx.Lock()
1003         for hash, ent := range v.data {
1004                 if ent.trash.IsZero() && strings.HasPrefix(hash, prefix) {
1005                         fmt.Fprintf(buf, "%s+%d %d\n", hash, len(ent.data), ent.mtime.UnixNano())
1006                 }
1007         }
1008         v.mtx.Unlock()
1009         if err := ctx.Err(); err != nil {
1010                 return err
1011         }
1012         _, err := io.Copy(writeTo, buf)
1013         return err
1014 }
1015
1016 func (v *stubVolume) Mtime(hash string) (time.Time, error) {
1017         v.log("mtime", hash)
1018         if v.mtime != nil {
1019                 if t, err := v.mtime(hash); err != nil {
1020                         return t, err
1021                 }
1022         }
1023         v.mtx.Lock()
1024         defer v.mtx.Unlock()
1025         ent, ok := v.data[hash]
1026         if !ok || !ent.trash.IsZero() {
1027                 return time.Time{}, os.ErrNotExist
1028         }
1029         return ent.mtime, nil
1030 }
1031
1032 func (v *stubVolume) EmptyTrash() {
1033         v.stubLog.Printf("%s emptytrash", v.params.UUID)
1034         v.mtx.Lock()
1035         defer v.mtx.Unlock()
1036         for hash, ent := range v.data {
1037                 if !ent.trash.IsZero() && time.Now().After(ent.trash) {
1038                         delete(v.data, hash)
1039                 }
1040         }
1041 }