1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
22 "git.arvados.org/arvados.git/lib/boot"
23 "git.arvados.org/arvados.git/lib/config"
24 "git.arvados.org/arvados.git/lib/service"
25 "git.arvados.org/arvados.git/sdk/go/arvados"
26 "git.arvados.org/arvados.git/sdk/go/arvadosclient"
27 "git.arvados.org/arvados.git/sdk/go/arvadostest"
28 "git.arvados.org/arvados.git/sdk/go/auth"
29 "git.arvados.org/arvados.git/sdk/go/ctxlog"
30 "git.arvados.org/arvados.git/sdk/go/keepclient"
31 "github.com/prometheus/client_golang/prometheus"
// TestGocheck hands control to the gocheck framework.
// NOTE(review): excerpt — body elided (embedded line numbering skips); presumably calls TestingT(t).
35 func TestGocheck(t *testing.T) {
// fooHash and barHash are the MD5 digests of "foo" and "bar",
// used as block hashes throughout these tests.
40 fooHash = "acbd18db4cc2f85cedef654fccc4a4d8"
41 barHash = "37b51d194a7513e45b56f6524f2d51f2"
// testServiceURL supplies a fixed service URL for keepstore instances
// created by the tests.
44 var testServiceURL = func() arvados.URL {
45 return arvados.URL{Host: "localhost:12345", Scheme: "http"}
// authContext returns a context carrying the given token as the
// request credentials.
48 func authContext(token string) context.Context {
49 return auth.NewContext(context.TODO(), &auth.Credentials{Tokens: []string{token}})
// testCluster loads a minimal one-cluster ("zzzzz") configuration and
// installs well-known test tokens.
// NOTE(review): excerpt — the error checks after Load()/GetCluster() and
// the return statement are elided here (original numbering skips).
52 func testCluster(t TB) *arvados.Cluster {
53 cfg, err := config.NewLoader(bytes.NewBufferString("Clusters: {zzzzz: {}}"), ctxlog.TestLogger(t)).Load()
57 cluster, err := cfg.GetCluster("")
61 cluster.SystemRootToken = arvadostest.SystemRootToken
62 cluster.ManagementToken = arvadostest.ManagementToken
// testKeepstore builds a keepstore for the given cluster config, using a
// cancelable context with a test logger attached. Callers must invoke the
// returned CancelFunc when done.
// NOTE(review): excerpt — the registry assignment below is presumably
// guarded by an elided "if reg == nil" check; the error check and return
// are also elided.
66 func testKeepstore(t TB, cluster *arvados.Cluster, reg *prometheus.Registry) (*keepstore, context.CancelFunc) {
68 reg = prometheus.NewRegistry()
70 ctx, cancel := context.WithCancel(context.Background())
71 ctx = ctxlog.Context(ctx, ctxlog.TestLogger(t))
72 ks, err := newKeepstore(ctx, cluster, cluster.SystemRootToken, reg, testServiceURL)
// Register keepstoreSuite with gocheck.
79 var _ = Suite(&keepstoreSuite{})
// keepstoreSuite exercises keepstore behavior against stub volumes.
81 type keepstoreSuite struct {
82 cluster *arvados.Cluster
// SetUpTest gives each test a fresh cluster config with two writable
// stub volumes.
85 func (s *keepstoreSuite) SetUpTest(c *C) {
86 s.cluster = testCluster(c)
87 s.cluster.Volumes = map[string]arvados.Volume{
88 "zzzzz-nyw5e-000000000000000": {Replication: 1, Driver: "stub"},
89 "zzzzz-nyw5e-111111111111111": {Replication: 1, Driver: "stub"},
// TestBlockRead_ChecksumMismatch verifies that when a volume holds data
// whose MD5 does not match the locator hash, BlockRead reports a checksum
// mismatch and does not deliver a full copy of the corrupt data.
93 func (s *keepstoreSuite) TestBlockRead_ChecksumMismatch(c *C) {
94 ks, cancel := testKeepstore(c, s.cluster, nil)
97 ctx := authContext(arvadostest.ActiveTokenV2)
// Plant "bar" content under the hash of "foo" on the first writable mount.
99 fooHash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
100 err := ks.mountsW[0].BlockWrite(ctx, fooHash, []byte("bar"))
// Writing the real "foo" through keepstore now collides with the bad copy.
103 _, err = ks.BlockWrite(ctx, arvados.BlockWriteOptions{
107 c.Check(err, ErrorMatches, "hash collision")
109 buf := bytes.NewBuffer(nil)
110 _, err = ks.BlockRead(ctx, arvados.BlockReadOptions{
111 Locator: ks.signLocator(arvadostest.ActiveTokenV2, fooHash+"+3"),
114 c.Check(err, ErrorMatches, "checksum mismatch in stored data")
115 c.Check(buf.String(), Not(Equals), "foo")
116 c.Check(buf.Len() < 3, Equals, true)
// Even with a good copy on the second mount, the mismatch is still
// reported (per the checks below).
118 err = ks.mountsW[1].BlockWrite(ctx, fooHash, []byte("foo"))
121 buf = bytes.NewBuffer(nil)
122 _, err = ks.BlockRead(ctx, arvados.BlockReadOptions{
123 Locator: ks.signLocator(arvadostest.ActiveTokenV2, fooHash+"+3"),
126 c.Check(err, ErrorMatches, "checksum mismatch in stored data")
127 c.Check(buf.Len() < 3, Equals, true)
// TestBlockReadWrite_SigningDisabled verifies that with BlobSigning turned
// off, blocks can be written and read back with any token (even empty or
// bogus), and both signed and unsigned locators are accepted.
130 func (s *keepstoreSuite) TestBlockReadWrite_SigningDisabled(c *C) {
131 origKey := s.cluster.Collections.BlobSigningKey
132 s.cluster.Collections.BlobSigning = false
133 s.cluster.Collections.BlobSigningKey = ""
134 ks, cancel := testKeepstore(c, s.cluster, nil)
137 resp, err := ks.BlockWrite(authContext("abcde"), arvados.BlockWriteOptions{
// With signing disabled, the returned locator carries no signature hint.
142 c.Check(resp.Locator, Equals, fooHash+"+3")
143 locUnsigned := resp.Locator
// Sign a copy with the original key so we can confirm a signed locator
// still works too.
145 locSigned := arvados.SignLocator(locUnsigned, arvadostest.ActiveTokenV2, time.Now().Add(ttl), ttl, []byte(origKey))
146 c.Assert(locSigned, Not(Equals), locUnsigned)
148 for _, locator := range []string{locUnsigned, locSigned} {
149 for _, token := range []string{"", "xyzzy", arvadostest.ActiveTokenV2} {
150 c.Logf("=== locator %q token %q", locator, token)
151 ctx := authContext(token)
152 buf := bytes.NewBuffer(nil)
153 _, err := ks.BlockRead(ctx, arvados.BlockReadOptions{
158 c.Check(buf.String(), Equals, "foo")
// TestBlockRead_OrderedByStorageClassPriority verifies that BlockRead
// probes volumes in order of storage class priority, falling back to
// rendezvous order when priorities tie. The block is written only to
// class1 (vol 111), so trials that probe vol 222 first must fall through
// to vol 111, as reflected in the expected stub logs.
163 func (s *keepstoreSuite) TestBlockRead_OrderedByStorageClassPriority(c *C) {
164 s.cluster.Volumes = map[string]arvados.Volume{
165 "zzzzz-nyw5e-111111111111111": {
168 StorageClasses: map[string]bool{"class1": true}},
169 "zzzzz-nyw5e-222222222222222": {
172 StorageClasses: map[string]bool{"class2": true, "class3": true}},
175 // "foobar" is just some data that happens to result in
176 // rendezvous order {111, 222}
177 data := []byte("foobar")
178 hash := fmt.Sprintf("%x", md5.Sum(data))
180 for _, trial := range []struct {
181 priority1 int // priority of class1, thus vol1
182 priority2 int // priority of class2
183 priority3 int // priority of class3 (vol2 priority will be max(priority2, priority3))
186 {100, 50, 50, "111 read 385\n"}, // class1 has higher priority => try vol1 first, no need to try vol2
187 {100, 100, 100, "111 read 385\n"}, // same priority, vol1 is first in rendezvous order => try vol1 first and succeed
188 {66, 99, 33, "222 read 385\n111 read 385\n"}, // class2 has higher priority => try vol2 first, then try vol1
189 {66, 33, 99, "222 read 385\n111 read 385\n"}, // class3 has highest priority => vol2 has highest => try vol2 first, then try vol1
191 c.Logf("=== %+v", trial)
193 s.cluster.StorageClasses = map[string]arvados.StorageClassConfig{
194 "class1": {Priority: trial.priority1},
195 "class2": {Priority: trial.priority2},
196 "class3": {Priority: trial.priority3},
198 ks, cancel := testKeepstore(c, s.cluster, nil)
201 ctx := authContext(arvadostest.ActiveTokenV2)
// Write to class1 only, so the block lands on vol 111 alone.
202 resp, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
205 StorageClasses: []string{"class1"},
209 // Combine logs into one. (We only want the logs from
210 // the BlockRead below, not from BlockWrite above.)
211 stubLog := &stubLog{}
212 for _, mnt := range ks.mounts {
213 mnt.volume.(*stubVolume).stubLog = stubLog
216 n, err := ks.BlockRead(ctx, arvados.BlockReadOptions{
217 Locator: resp.Locator,
220 c.Assert(n, Equals, len(data))
222 c.Check(stubLog.String(), Equals, trial.expectLog)
226 // Ensure BlockRead(..., {CheckCacheOnly: true}) always returns
// arvados.ErrNotCached — keepstore itself has no cache, so both the
// "block exists" and "block missing" cases below expect that error.
229 // There is currently (Arvados 3.1 / March 2025) no way for an
230 // incoming http request to set that field anyway, because nothing
231 // accesses a cache via http. But if/when it does, keepstore's
232 // BlockRead is expected to behave correctly.
233 func (s *keepstoreSuite) TestBlockRead_CheckCacheOnly(c *C) {
234 ks, cancel := testKeepstore(c, s.cluster, nil)
237 ctx := authContext(arvadostest.ActiveTokenV2)
239 data := []byte("foo")
240 hash := fmt.Sprintf("%x", md5.Sum(data))
241 resp, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
// Ordinary read works and returns the full 3 bytes.
247 n, err := ks.BlockRead(ctx, arvados.BlockReadOptions{
248 Locator: resp.Locator,
251 c.Assert(n, Equals, 3)
254 // Block exists -> ErrNotCached
255 n, err = ks.BlockRead(ctx, arvados.BlockReadOptions{
256 Locator: resp.Locator,
258 CheckCacheOnly: true,
260 c.Check(n, Equals, 0)
261 c.Check(err, Equals, arvados.ErrNotCached)
263 // Block does not exist -> ErrNotCached
264 n, err = ks.BlockRead(ctx, arvados.BlockReadOptions{
265 Locator: ks.signLocator(arvadostest.ActiveTokenV2, "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb+3"),
267 CheckCacheOnly: true,
269 c.Check(n, Equals, 0)
270 c.Check(err, Equals, arvados.ErrNotCached)
// TestBlockWrite_NoWritableVolumes verifies that when every volume is
// unwritable, BlockWrite fails with 507 Insufficient Storage and never
// calls any volume's BlockWrite.
// NOTE(review): excerpt — the loop body line that marks each volume
// read-only (between the range header and the map reassignment) is elided.
273 func (s *keepstoreSuite) TestBlockWrite_NoWritableVolumes(c *C) {
274 for uuid, v := range s.cluster.Volumes {
276 s.cluster.Volumes[uuid] = v
278 ks, cancel := testKeepstore(c, s.cluster, nil)
// Fail the test outright if any backend write is attempted.
280 for _, mnt := range ks.mounts {
281 mnt.volume.(*stubVolume).blockWrite = func(context.Context, string, []byte) error {
282 c.Error("volume BlockWrite called")
283 return errors.New("fail")
286 ctx := authContext(arvadostest.ActiveTokenV2)
288 _, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
290 Data: []byte("foo")})
292 c.Check(err.(interface{ HTTPStatus() int }).HTTPStatus(), Equals, http.StatusInsufficientStorage)
// TestBlockWrite_MultipleStorageClasses verifies which volumes BlockWrite
// probes and writes to when the requested storage classes span several
// volumes, by comparing the stub volumes' operation logs against expected
// transcripts. ("d85" is the first three hex digits of md5("qux"), per the
// stub log format.)
295 func (s *keepstoreSuite) TestBlockWrite_MultipleStorageClasses(c *C) {
296 s.cluster.Volumes = map[string]arvados.Volume{
297 "zzzzz-nyw5e-111111111111111": {
300 StorageClasses: map[string]bool{"class1": true}},
301 "zzzzz-nyw5e-121212121212121": {
304 StorageClasses: map[string]bool{"class1": true, "class2": true}},
305 "zzzzz-nyw5e-222222222222222": {
308 StorageClasses: map[string]bool{"class2": true}},
311 // testData is a block that happens to have rendezvous order 111, 121, 222
312 testData := []byte("qux")
313 testHash := fmt.Sprintf("%x+%d", md5.Sum(testData), len(testData))
315 s.cluster.StorageClasses = map[string]arvados.StorageClassConfig{
321 ctx := authContext(arvadostest.ActiveTokenV2)
322 for idx, trial := range []struct {
323 classes string // desired classes
333 "121 read d85\n" + // write#1
336 "121 read d85\n" + // write#2
338 {"class1,class2", "" +
339 "111 read d85\n" + // write#1
344 "111 read d85\n" + // write#2
348 {"class1,class2,class404", "" +
349 "111 read d85\n" + // write#1
354 "111 read d85\n" + // write#2
359 c.Logf("=== %d: %+v", idx, trial)
361 ks, cancel := testKeepstore(c, s.cluster, nil)
// Funnel all mounts' logs into one shared stubLog so ordering across
// volumes can be asserted.
363 stubLog := &stubLog{}
364 for _, mnt := range ks.mounts {
365 mnt.volume.(*stubVolume).stubLog = stubLog
368 // Check that we chose the right block data
369 rvz := ks.rendezvous(testHash, ks.mountsW)
370 c.Assert(rvz[0].UUID[24:], Equals, "111")
371 c.Assert(rvz[1].UUID[24:], Equals, "121")
372 c.Assert(rvz[2].UUID[24:], Equals, "222")
// Write twice: the second write exercises the "already stored" paths.
374 for i := 0; i < 2; i++ {
375 _, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
378 StorageClasses: strings.Split(trial.classes, ","),
382 // The "nextmnt" loop in BlockWrite first starts the
383 // goroutine that writes to mount 121, then the
384 // goroutine that writes to mount 111. Most of the
385 // time, mount 121 will log first, but occasionally
386 // mount 111 will log first. In that case we swap the
387 // log entries. (The order of the rest of the log
388 // entries is meaningful -- just not these two.)
389 gotLog := strings.Replace(stubLog.String(),
390 "111 write d85\n121 write d85\n",
391 "121 write d85\n111 write d85\n", 1)
392 c.Check(gotLog, Equals, trial.expectLog)
// TestBlockTrash verifies BlockTrash semantics across replicas of varying
// age and across read-only volumes: only sufficiently old replicas are
// trashed, read-only volumes are skipped unless AllowTrashWhenReadOnly is
// set, and a non-404 backend error is propagated to the caller.
// NOTE(review): excerpt — several writeit(...) setup calls and error
// checks between the labelled scenarios are elided.
396 func (s *keepstoreSuite) TestBlockTrash(c *C) {
397 s.cluster.Volumes = map[string]arvados.Volume{
398 "zzzzz-nyw5e-000000000000000": {Replication: 1, Driver: "stub"},
399 "zzzzz-nyw5e-111111111111111": {Replication: 1, Driver: "stub"},
400 "zzzzz-nyw5e-222222222222222": {Replication: 1, Driver: "stub", ReadOnly: true},
401 "zzzzz-nyw5e-333333333333333": {Replication: 1, Driver: "stub", ReadOnly: true, AllowTrashWhenReadOnly: true},
403 ks, cancel := testKeepstore(c, s.cluster, nil)
// Collect the stub volumes sorted by UUID so vol[0..3] match the
// 000/111/222/333 suffixes above.
406 var vol []*stubVolume
407 for _, mount := range ks.mountsR {
408 vol = append(vol, mount.volume.(*stubVolume))
410 sort.Slice(vol, func(i, j int) bool {
411 return vol[i].params.UUID < vol[j].params.UUID
414 ctx := context.Background()
415 loc := fooHash + "+3"
// tOld is older than the signing TTL, i.e. old enough to be trashable.
416 tOld := time.Now().Add(-s.cluster.Collections.BlobSigningTTL.Duration() - time.Second)
419 for _, vol := range vol {
420 err := vol.BlockTrash(fooHash)
421 if !os.IsNotExist(err) {
// writeit stores "foo" on the given volume and backdates its mtime.
426 writeit := func(volidx int) {
427 err := vol[volidx].BlockWrite(ctx, fooHash, []byte("foo"))
429 err = vol[volidx].blockTouchWithTime(fooHash, tOld)
432 trashit := func() error {
433 return ks.BlockTrash(ctx, loc)
435 checkexists := func(volidx int) bool {
436 err := vol[volidx].BlockRead(ctx, fooHash, brdiscard)
437 if !os.IsNotExist(err) {
// no replicas anywhere => not found
444 c.Check(trashit(), Equals, os.ErrNotExist)
446 // one old replica => trash it
449 c.Check(trashit(), IsNil)
450 c.Check(checkexists(0), Equals, false)
452 // one old replica + one new replica => keep new, trash old
456 c.Check(vol[1].blockTouchWithTime(fooHash, time.Now()), IsNil)
457 c.Check(trashit(), IsNil)
458 c.Check(checkexists(0), Equals, false)
459 c.Check(checkexists(1), Equals, true)
461 // two old replicas => trash both
465 c.Check(trashit(), IsNil)
466 c.Check(checkexists(0), Equals, false)
467 c.Check(checkexists(1), Equals, false)
469 // four old replicas => trash all except readonly volume with
470 // AllowTrashWhenReadOnly==false
476 c.Check(trashit(), IsNil)
477 c.Check(checkexists(0), Equals, false)
478 c.Check(checkexists(1), Equals, false)
479 c.Check(checkexists(2), Equals, true)
480 c.Check(checkexists(3), Equals, false)
482 // two old replicas but one returns an error => return the
483 // only non-404 backend error
485 vol[0].blockTrash = func(hash string) error {
486 return errors.New("fake error")
490 c.Check(trashit(), ErrorMatches, "fake error")
491 c.Check(checkexists(0), Equals, true)
492 c.Check(checkexists(1), Equals, false)
493 c.Check(checkexists(2), Equals, false)
494 c.Check(checkexists(3), Equals, false)
// TestBlockWrite_OnlyOneBuffer verifies that a write completes (does not
// deadlock) when the buffer pool is limited to a single buffer.
// NOTE(review): excerpt — the goroutine launch and select wrapping the
// BlockWrite, plus the success case of the select, are elided.
497 func (s *keepstoreSuite) TestBlockWrite_OnlyOneBuffer(c *C) {
498 s.cluster.API.MaxKeepBlobBuffers = 1
499 ks, cancel := testKeepstore(c, s.cluster, nil)
501 ok := make(chan struct{})
504 ctx := authContext(arvadostest.ActiveTokenV2)
505 _, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
507 Data: []byte("foo")})
512 case <-time.After(time.Second):
513 c.Fatal("PUT deadlocks with MaxKeepBlobBuffers==1")
// TestBufferPoolLeak runs 20 write+read sequences against a 4-buffer pool;
// if buffers leak, the sequence eventually blocks and the one-second
// timeout below fails the test.
// NOTE(review): excerpt — the goroutine wrapper around the loop body and
// the wg.Wait()/select plumbing are elided.
517 func (s *keepstoreSuite) TestBufferPoolLeak(c *C) {
518 s.cluster.API.MaxKeepBlobBuffers = 4
519 ks, cancel := testKeepstore(c, s.cluster, nil)
522 ctx := authContext(arvadostest.ActiveTokenV2)
523 var wg sync.WaitGroup
524 for range make([]int, 20) {
528 resp, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
530 Data: []byte("foo")})
532 _, err = ks.BlockRead(ctx, arvados.BlockReadOptions{
533 Locator: resp.Locator,
534 WriteTo: io.Discard})
538 ok := make(chan struct{})
545 case <-time.After(time.Second):
546 c.Fatal("read/write sequence deadlocks, likely buffer pool leak")
// TestPutStorageClasses verifies the storage-class accounting returned by
// BlockWrite: which classes are reported satisfied for various requested
// class lists, and that unsatisfiable requests fail.
550 func (s *keepstoreSuite) TestPutStorageClasses(c *C) {
551 s.cluster.Volumes = map[string]arvados.Volume{
552 "zzzzz-nyw5e-000000000000000": {Replication: 1, Driver: "stub"}, // "default" is implicit
553 "zzzzz-nyw5e-111111111111111": {Replication: 1, Driver: "stub", StorageClasses: map[string]bool{"special": true, "extra": true}},
554 "zzzzz-nyw5e-222222222222222": {Replication: 1, Driver: "stub", StorageClasses: map[string]bool{"readonly": true}, ReadOnly: true},
556 ks, cancel := testKeepstore(c, s.cluster, nil)
558 ctx := authContext(arvadostest.ActiveTokenV2)
// Success cases: requested classes -> expected response classes.
560 for _, trial := range []struct {
563 expectClasses map[string]int
567 map[string]int{"default": 1}},
570 map[string]int{"default": 1}},
571 {[]string{"default"},
573 map[string]int{"default": 1}},
574 {[]string{"default", "default"},
576 map[string]int{"default": 1}},
577 {[]string{"special"},
579 map[string]int{"extra": 1, "special": 1}},
580 {[]string{"special", "readonly"},
582 map[string]int{"extra": 1, "special": 1}},
583 {[]string{"special", "nonexistent"},
585 map[string]int{"extra": 1, "special": 1}},
586 {[]string{"extra", "special"},
588 map[string]int{"extra": 1, "special": 1}},
589 {[]string{"default", "special"},
591 map[string]int{"default": 1, "extra": 1, "special": 1}},
593 c.Logf("success case %#v", trial)
594 resp, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
597 StorageClasses: trial.ask,
599 if !c.Check(err, IsNil) {
602 c.Check(resp.Replicas, Equals, trial.expectReplicas)
603 if len(trial.expectClasses) == 0 {
604 // any non-empty value is correct
605 c.Check(resp.StorageClasses, Not(HasLen), 0)
607 c.Check(resp.StorageClasses, DeepEquals, trial.expectClasses)
// Failure cases: class lists no writable volume can satisfy.
611 for _, ask := range [][]string{
613 {"doesnotexist", "readonly"},
616 c.Logf("failure case %s", ask)
617 _, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
// TestUntrashHandlerWithNoWritableVolumes verifies that BlockUntrash
// reports not-found when no volume is writable.
// NOTE(review): excerpt — the loop line that flips each volume to
// read-only, and the error assertions after each call, are elided.
626 func (s *keepstoreSuite) TestUntrashHandlerWithNoWritableVolumes(c *C) {
627 for uuid, v := range s.cluster.Volumes {
629 s.cluster.Volumes[uuid] = v
631 ks, cancel := testKeepstore(c, s.cluster, nil)
634 for _, mnt := range ks.mounts {
635 err := mnt.BlockWrite(context.Background(), fooHash, []byte("foo"))
637 err = mnt.BlockRead(context.Background(), fooHash, brdiscard)
641 err := ks.BlockUntrash(context.Background(), fooHash)
642 c.Check(os.IsNotExist(err), Equals, true)
644 for _, mnt := range ks.mounts {
645 err := mnt.BlockRead(context.Background(), fooHash, brdiscard)
// TestBlockWrite_SkipReadOnly verifies that writes land only on the
// writable volume: after 32 writes, the read-only volumes' stub logs are
// empty regardless of their AllowTrashWhenReadOnly setting.
650 func (s *keepstoreSuite) TestBlockWrite_SkipReadOnly(c *C) {
651 s.cluster.Volumes = map[string]arvados.Volume{
652 "zzzzz-nyw5e-000000000000000": {Replication: 1, Driver: "stub"},
653 "zzzzz-nyw5e-111111111111111": {Replication: 1, Driver: "stub", ReadOnly: true},
654 "zzzzz-nyw5e-222222222222222": {Replication: 1, Driver: "stub", ReadOnly: true, AllowTrashWhenReadOnly: true},
656 ks, cancel := testKeepstore(c, s.cluster, nil)
658 ctx := authContext(arvadostest.ActiveTokenV2)
// 32 distinct blocks, so rendezvous ordering varies across writes.
660 for i := range make([]byte, 32) {
661 data := []byte(fmt.Sprintf("block %d", i))
662 _, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{Data: data})
665 c.Check(ks.mounts["zzzzz-nyw5e-000000000000000"].volume.(*stubVolume).stubLog.String(), Matches, "(?ms).*write.*")
666 c.Check(ks.mounts["zzzzz-nyw5e-111111111111111"].volume.(*stubVolume).stubLog.String(), HasLen, 0)
667 c.Check(ks.mounts["zzzzz-nyw5e-222222222222222"].volume.(*stubVolume).stubLog.String(), HasLen, 0)
// TestGetLocatorInfo is a table test for getLocatorInfo: valid locators
// yield the expected size/signed/remote flags; malformed locators (bad
// hash characters, wrong hash length, misplaced hints, stray "+") fail.
// NOTE(review): excerpt — the ok==false assertion branch between the Logf
// and the field checks is elided.
670 func (s *keepstoreSuite) TestGetLocatorInfo(c *C) {
671 for _, trial := range []struct {
676 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
678 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+1234",
679 ok: true, expect: locatorInfo{size: 1234}},
680 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+1234+Abcdef@abcdef",
681 ok: true, expect: locatorInfo{size: 1234, signed: true}},
682 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+1234+Rzzzzz-abcdef",
683 ok: true, expect: locatorInfo{size: 1234, remote: true}},
684 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+12345+Zexample+Rzzzzz-abcdef",
685 ok: true, expect: locatorInfo{size: 12345, remote: true}},
686 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+123456+👶🦈+Rzzzzz-abcdef",
687 ok: true, expect: locatorInfo{size: 123456, remote: true}},
688 // invalid: bad hash char
689 {locator: "aaaaaaaaaaaaaazaaaaaaaaaaaaaaaaa+1234",
691 {locator: "aaaaaaaaaaaaaaFaaaaaaaaaaaaaaaaa+1234",
693 {locator: "aaaaaaaaaaaaaa⛵aaaaaaaaaaaaaaaaa+1234",
695 // invalid: hash length != 32
698 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
700 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+1234",
702 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaabb",
704 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaabb+1234",
706 // invalid: first hint is not size
707 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+Abcdef+1234",
709 // invalid: leading/trailing/double +
710 {locator: "+aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+1234",
712 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+1234+",
714 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa++1234",
716 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+1234++Abcdef@abcdef",
719 c.Logf("=== %s", trial.locator)
720 li, err := getLocatorInfo(trial.locator)
726 c.Check(li.hash, Equals, trial.locator[:32])
727 c.Check(li.size, Equals, trial.expect.size)
728 c.Check(li.signed, Equals, trial.expect.signed)
729 c.Check(li.remote, Equals, trial.expect.remote)
// TestTimeout boots a real keepstore service via service.Command and
// checks, through a keepclient, that (a) a slow volume write trips the
// 500ms RequestTimeout (client sees a 499-style failure), and (b) index
// requests are exempt from RequestTimeout and complete despite a slow
// backend. NOTE(review): excerpt — many error checks, the t0 assignments,
// and parts of the inline config are elided.
733 func (s *keepstoreSuite) TestTimeout(c *C) {
735 port, err := boot.AvailablePort("")
// Run the service in the background; capture the router so the test can
// reach into its keepstore and stub volume.
737 go service.Command(arvados.ServiceNameKeepstore, func(ctx context.Context, cluster *arvados.Cluster, token string, reg *prometheus.Registry) service.Handler {
738 rtr = newHandlerOrErrorHandler(ctx, cluster, token, reg).(*router)
740 }).RunCommand("keepstore", []string{"-config", "-"}, strings.NewReader(`
743 SystemRootToken: abcdefg
747 "http://127.0.1.123:`+port+`": {}
749 RequestTimeout: 500ms
750 KeepServiceRequestTimeout: 15s
752 BlobSigningKey: abcdefg
754 zzzzz-nyw5e-000000000000000:
757 `), ctxlog.LogWriter(c.Log), ctxlog.LogWriter(c.Log))
758 for deadline := time.Now().Add(5 * time.Second); rtr == nil; {
759 if time.Now().After(deadline) {
760 c.Error("timed out waiting for service.Command to call newHandler func")
764 stubvol := rtr.keepstore.mountsW[0].volume.(*stubVolume)
765 err = stubvol.BlockWrite(context.Background(), "acbd18db4cc2f85cedef654fccc4a4d8", []byte("foo"))
// Build a keepclient pointed at the service under test.
767 ac, err := arvados.NewClientFromConfig(rtr.keepstore.cluster)
768 ac.AuthToken = "abcdefg"
770 ac.KeepServiceURIs = []string{"http://127.0.1.123:" + port}
771 arv, err := arvadosclient.New(ac)
773 kc := keepclient.New(arv)
776 // Wait for service to come up
777 for deadline := time.Now().Add(time.Second); ; time.Sleep(time.Second / 10) {
778 _, err := kc.BlockWrite(context.Background(), arvados.BlockWriteOptions{
784 if time.Now().After(deadline) {
785 c.Errorf("timed out waiting for BlockWrite(`foo`) to succeed: %s", err)
790 // Induce a write delay longer than RequestTimeout, and check
791 // that the request fails.
793 stubvol.blockWrite = func(context.Context, string, []byte) error { time.Sleep(time.Second); return nil }
794 _, err = kc.BlockWrite(context.Background(), arvados.BlockWriteOptions{
797 c.Check(err, ErrorMatches, `Could not write .* 499 .*`)
798 duration := time.Since(t0)
799 c.Logf("write request duration %s", duration)
800 c.Check(duration > 500*time.Millisecond, Equals, true)
802 // Induce an index delay, and check that the request does
803 // *not* fail (because index is exempt from RequestTimeout).
805 stubvol.index = func(context.Context, string, io.Writer) error { time.Sleep(time.Second); return nil }
807 for uuid = range kc.LocalRoots() {
809 rdr, err := kc.GetIndex(uuid, "")
811 _, err = io.ReadAll(rdr)
813 duration = time.Since(t0)
814 c.Logf("index request duration %s", duration)
815 c.Check(duration > time.Second, Equals, true)
// Register the "stub" volume driver used by all tests above.
// NOTE(review): excerpt — the enclosing func (presumably init()) and the
// rest of the stubVolume literal are elided.
819 driver["stub"] = func(params newVolumeParams) (volume, error) {
822 data: make(map[string]stubData),
// stubLog collects one line per stub-volume operation so tests can assert
// on the exact sequence of backend calls.
829 type stubLog struct {
// Printf appends a formatted line to the log.
834 func (sl *stubLog) Printf(format string, args ...interface{}) {
840 fmt.Fprintf(sl, format+"\n", args...)
// stubData is one stored block in a stubVolume. NOTE(review): excerpt —
// fields elided; the methods below read/write ent.data, ent.mtime, and
// ent.trash.
843 type stubData struct {
// stubVolume is an in-memory volume implementation whose operations can be
// intercepted per-test via the hook funcs below, and which records every
// operation to stubLog.
849 type stubVolume struct {
850 params newVolumeParams
851 data map[string]stubData
855 // The following funcs enable tests to insert delays and
856 // failures. Each volume operation begins by calling the
857 // corresponding func (if non-nil). If the func returns an
858 // error, that error is returned to caller. Otherwise, the
859 // stub continues normally.
860 blockRead func(ctx context.Context, hash string, writeTo io.WriterAt) error
861 blockWrite func(ctx context.Context, hash string, data []byte) error
862 deviceID func() string
863 blockTouch func(hash string) error
864 blockTrash func(hash string) error
865 blockUntrash func(hash string) error
866 index func(ctx context.Context, prefix string, writeTo io.Writer) error
867 mtime func(hash string) (time.Time, error)
// log records an operation as "<uuid-suffix> <op> <hash-prefix>", the
// format asserted on by the expectLog tables above.
871 func (v *stubVolume) log(op, hash string) {
872 // Note this intentionally crashes if UUID or hash is short --
873 // if keepstore ever does that, tests should fail.
874 v.stubLog.Printf("%s %s %s", v.params.UUID[24:27], op, hash[:3])
// BlockRead copies the stored block to writeTo in exponentially growing
// chunks (1000, 2000, 4000, ... bytes). Trashed or missing blocks yield
// os.ErrNotExist. The v.blockRead hook, if set, runs first.
877 func (v *stubVolume) BlockRead(ctx context.Context, hash string, writeTo io.WriterAt) error {
879 if v.blockRead != nil {
880 err := v.blockRead(ctx, hash, writeTo)
886 ent, ok := v.data[hash]
888 if !ok || !ent.trash.IsZero() {
889 return os.ErrNotExist
892 for writesize := 1000; wrote < len(ent.data); writesize = writesize * 2 {
893 data := ent.data[wrote:]
894 if len(data) > writesize {
895 data = data[:writesize]
897 n, err := writeTo.WriteAt(data, int64(wrote))
// BlockWrite stores a private copy of data under hash. The v.blockWrite
// hook, if set, runs first and can abort the write.
906 func (v *stubVolume) BlockWrite(ctx context.Context, hash string, data []byte) error {
908 if v.blockWrite != nil {
909 if err := v.blockWrite(ctx, hash, data); err != nil {
915 v.data[hash] = stubData{
// Copy the caller's slice so later mutations don't affect stored data.
917 data: append([]byte(nil), data...),
// DeviceID returns a per-instance identifier (the volume's address).
922 func (v *stubVolume) DeviceID() string {
923 return fmt.Sprintf("%p", v)
// BlockTouch updates the block's mtime to now; trashed or missing blocks
// yield os.ErrNotExist. The v.blockTouch hook, if set, runs first.
926 func (v *stubVolume) BlockTouch(hash string) error {
928 if v.blockTouch != nil {
929 if err := v.blockTouch(hash); err != nil {
935 ent, ok := v.data[hash]
936 if !ok || !ent.trash.IsZero() {
937 return os.ErrNotExist
939 ent.mtime = time.Now()
944 // Set mtime to the (presumably old) specified time.
// Test-only helper used to backdate replicas (e.g. in TestBlockTrash).
945 func (v *stubVolume) blockTouchWithTime(hash string, t time.Time) error {
946 v.log("touchwithtime", hash)
949 ent, ok := v.data[hash]
951 return os.ErrNotExist
// BlockTrash marks the block as trash, to be kept until the cluster's
// BlobTrashLifetime elapses. Already-trashed or missing blocks yield
// os.ErrNotExist. The v.blockTrash hook, if set, runs first.
958 func (v *stubVolume) BlockTrash(hash string) error {
960 if v.blockTrash != nil {
961 if err := v.blockTrash(hash); err != nil {
967 ent, ok := v.data[hash]
968 if !ok || !ent.trash.IsZero() {
969 return os.ErrNotExist
971 ent.trash = time.Now().Add(v.params.Cluster.Collections.BlobTrashLifetime.Duration())
// BlockUntrash clears the trash mark on a trashed block; non-trashed or
// missing blocks yield os.ErrNotExist. The v.blockUntrash hook, if set,
// runs first.
976 func (v *stubVolume) BlockUntrash(hash string) error {
977 v.log("untrash", hash)
978 if v.blockUntrash != nil {
979 if err := v.blockUntrash(hash); err != nil {
985 ent, ok := v.data[hash]
986 if !ok || ent.trash.IsZero() {
987 return os.ErrNotExist
989 ent.trash = time.Time{}
// Index writes one "<hash>+<size> <mtime-nanos>" line per non-trashed
// block whose hash starts with prefix, buffering everything before the
// final copy. The v.index hook, if set, runs first (used by TestTimeout
// to inject delays). Honors context cancellation before writing output.
994 func (v *stubVolume) Index(ctx context.Context, prefix string, writeTo io.Writer) error {
995 v.stubLog.Printf("%s index %s", v.params.UUID, prefix)
997 if err := v.index(ctx, prefix, writeTo); err != nil {
1001 buf := &bytes.Buffer{}
1003 for hash, ent := range v.data {
1004 if ent.trash.IsZero() && strings.HasPrefix(hash, prefix) {
1005 fmt.Fprintf(buf, "%s+%d %d\n", hash, len(ent.data), ent.mtime.UnixNano())
1009 if err := ctx.Err(); err != nil {
1012 _, err := io.Copy(writeTo, buf)
// Mtime returns the block's modification time; trashed or missing blocks
// yield os.ErrNotExist. The v.mtime hook, if set, runs first. Access to
// v.data is guarded by v.mtx (Lock call elided in this excerpt).
1016 func (v *stubVolume) Mtime(hash string) (time.Time, error) {
1017 v.log("mtime", hash)
1019 if t, err := v.mtime(hash); err != nil {
1024 defer v.mtx.Unlock()
1025 ent, ok := v.data[hash]
1026 if !ok || !ent.trash.IsZero() {
1027 return time.Time{}, os.ErrNotExist
1029 return ent.mtime, nil
1032 func (v *stubVolume) EmptyTrash() {
1033 v.stubLog.Printf("%s emptytrash", v.params.UUID)
1035 defer v.mtx.Unlock()
1036 for hash, ent := range v.data {
1037 if !ent.trash.IsZero() && time.Now().After(ent.trash) {
1038 delete(v.data, hash)