1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
22 "git.arvados.org/arvados.git/lib/config"
23 "git.arvados.org/arvados.git/sdk/go/arvados"
24 "git.arvados.org/arvados.git/sdk/go/arvadostest"
25 "git.arvados.org/arvados.git/sdk/go/auth"
26 "git.arvados.org/arvados.git/sdk/go/ctxlog"
27 "github.com/prometheus/client_golang/prometheus"
// TestGocheck bridges gocheck suites into the standard "go test" runner.
// (Body elided in this excerpt -- presumably calls check.TestingT(t); confirm.)
31 func TestGocheck(t *testing.T) {
// fooHash is md5("foo"); barHash is md5("bar"). Used as well-known block
// locator hashes throughout these tests.
36 fooHash = "acbd18db4cc2f85cedef654fccc4a4d8"
37 barHash = "37b51d194a7513e45b56f6524f2d51f2"
// testServiceURL supplies a fixed keepstore service URL for tests that
// construct a keepstore without a real listener.
40 var testServiceURL = func() arvados.URL {
41 return arvados.URL{Host: "localhost:12345", Scheme: "http"}
// authContext returns a context carrying the given Arvados token as the
// caller's credentials.
44 func authContext(token string) context.Context {
45 return auth.NewContext(context.TODO(), &auth.Credentials{Tokens: []string{token}})
// testCluster loads a minimal single-cluster ("zzzzz") config and installs
// the test fixture tokens, returning the resulting cluster for use by tests.
48 func testCluster(t TB) *arvados.Cluster {
// Load a minimal YAML config; error handling elided in this excerpt.
49 cfg, err := config.NewLoader(bytes.NewBufferString("Clusters: {zzzzz: {}}"), ctxlog.TestLogger(t)).Load()
53 cluster, err := cfg.GetCluster("")
// Use the shared arvadostest fixture tokens so tests can authenticate.
57 cluster.SystemRootToken = arvadostest.SystemRootToken
58 cluster.ManagementToken = arvadostest.ManagementToken
// testKeepstore builds a keepstore instance for the given cluster config,
// using a test logger. Returns the keepstore and a CancelFunc that shuts down
// its context. A fresh prometheus registry is assigned to reg (presumably
// only when the caller passed nil -- the guard is elided in this excerpt).
62 func testKeepstore(t TB, cluster *arvados.Cluster, reg *prometheus.Registry) (*keepstore, context.CancelFunc) {
64 reg = prometheus.NewRegistry()
66 ctx, cancel := context.WithCancel(context.Background())
67 ctx = ctxlog.Context(ctx, ctxlog.TestLogger(t))
68 ks, err := newKeepstore(ctx, cluster, cluster.SystemRootToken, reg, testServiceURL)
// Register keepstoreSuite with gocheck.
75 var _ = Suite(&keepstoreSuite{})
// keepstoreSuite holds per-test state; cluster is rebuilt in SetUpTest so
// each test can freely mutate volume/storage-class config.
77 type keepstoreSuite struct {
78 cluster *arvados.Cluster
// SetUpTest resets the suite's cluster to a fresh test cluster with two
// writable stub volumes; individual tests override Volumes as needed.
81 func (s *keepstoreSuite) SetUpTest(c *C) {
82 s.cluster = testCluster(c)
83 s.cluster.Volumes = map[string]arvados.Volume{
84 "zzzzz-nyw5e-000000000000000": {Replication: 1, Driver: "stub"},
85 "zzzzz-nyw5e-111111111111111": {Replication: 1, Driver: "stub"},
// TestBlockRead_ChecksumMismatch verifies that (1) writing data whose hash
// conflicts with existing stored data is rejected as a "hash collision", and
// (2) BlockRead reports "checksum mismatch in stored data" -- and avoids
// returning the full (corrupt) block -- when a volume holds data that does
// not match the locator's hash.
89 func (s *keepstoreSuite) TestBlockRead_ChecksumMismatch(c *C) {
90 ks, cancel := testKeepstore(c, s.cluster, nil)
93 ctx := authContext(arvadostest.ActiveTokenV2)
// Plant corrupt data: store "bar" under md5("foo") directly on volume 0.
95 fooHash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
96 err := ks.mountsW[0].BlockWrite(ctx, fooHash, []byte("bar"))
99 _, err = ks.BlockWrite(ctx, arvados.BlockWriteOptions{
103 c.Check(err, ErrorMatches, "hash collision")
105 buf := bytes.NewBuffer(nil)
106 _, err = ks.BlockRead(ctx, arvados.BlockReadOptions{
107 Locator: ks.signLocator(arvadostest.ActiveTokenV2, fooHash+"+3"),
110 c.Check(err, ErrorMatches, "checksum mismatch in stored data")
// Caller must not receive the full corrupt block.
111 c.Check(buf.String(), Not(Equals), "foo")
112 c.Check(buf.Len() < 3, Equals, true)
// Even with a good copy on volume 1, the mismatch on volume 0 is still
// reported (checked again below with the same locator).
114 err = ks.mountsW[1].BlockWrite(ctx, fooHash, []byte("foo"))
117 buf = bytes.NewBuffer(nil)
118 _, err = ks.BlockRead(ctx, arvados.BlockReadOptions{
119 Locator: ks.signLocator(arvadostest.ActiveTokenV2, fooHash+"+3"),
122 c.Check(err, ErrorMatches, "checksum mismatch in stored data")
123 c.Check(buf.Len() < 3, Equals, true)
// TestBlockReadWrite_SigningDisabled verifies that with BlobSigning disabled,
// BlockWrite returns unsigned locators, and BlockRead accepts both signed and
// unsigned locators regardless of which token (even empty/bogus) is used.
126 func (s *keepstoreSuite) TestBlockReadWrite_SigningDisabled(c *C) {
// Keep the original key so we can still construct a *signed* locator below.
127 origKey := s.cluster.Collections.BlobSigningKey
128 s.cluster.Collections.BlobSigning = false
129 s.cluster.Collections.BlobSigningKey = ""
130 ks, cancel := testKeepstore(c, s.cluster, nil)
133 resp, err := ks.BlockWrite(authContext("abcde"), arvados.BlockWriteOptions{
// With signing disabled, the returned locator has no signature hint.
138 c.Check(resp.Locator, Equals, fooHash+"+3")
139 locUnsigned := resp.Locator
141 locSigned := arvados.SignLocator(locUnsigned, arvadostest.ActiveTokenV2, time.Now().Add(ttl), ttl, []byte(origKey))
142 c.Assert(locSigned, Not(Equals), locUnsigned)
// Every combination of locator form and token must succeed.
144 for _, locator := range []string{locUnsigned, locSigned} {
145 for _, token := range []string{"", "xyzzy", arvadostest.ActiveTokenV2} {
146 c.Logf("=== locator %q token %q", locator, token)
147 ctx := authContext(token)
148 buf := bytes.NewBuffer(nil)
149 _, err := ks.BlockRead(ctx, arvados.BlockReadOptions{
154 c.Check(buf.String(), Equals, "foo")
// TestBlockRead_OrderedByStorageClassPriority verifies that BlockRead tries
// volumes in order of their storage classes' configured Priority, falling
// back to rendezvous order for ties. The expected volume access sequence for
// each priority combination is asserted via the stub volumes' shared log.
159 func (s *keepstoreSuite) TestBlockRead_OrderedByStorageClassPriority(c *C) {
160 s.cluster.Volumes = map[string]arvados.Volume{
161 "zzzzz-nyw5e-111111111111111": {
164 StorageClasses: map[string]bool{"class1": true}},
165 "zzzzz-nyw5e-222222222222222": {
168 StorageClasses: map[string]bool{"class2": true, "class3": true}},
171 // "foobar" is just some data that happens to result in
172 // rendezvous order {111, 222}
173 data := []byte("foobar")
174 hash := fmt.Sprintf("%x", md5.Sum(data))
176 for _, trial := range []struct {
177 priority1 int // priority of class1, thus vol1
178 priority2 int // priority of class2
179 priority3 int // priority of class3 (vol2 priority will be max(priority2, priority3))
182 {100, 50, 50, "111 read 385\n"}, // class1 has higher priority => try vol1 first, no need to try vol2
183 {100, 100, 100, "111 read 385\n"}, // same priority, vol2 is first in rendezvous order => try vol1 first and succeed
184 {66, 99, 33, "222 read 385\n111 read 385\n"}, // class2 has higher priority => try vol2 first, then try vol1
185 {66, 33, 99, "222 read 385\n111 read 385\n"}, // class3 has highest priority => vol2 has highest => try vol2 first, then try vol1
187 c.Logf("=== %+v", trial)
189 s.cluster.StorageClasses = map[string]arvados.StorageClassConfig{
190 "class1": {Priority: trial.priority1},
191 "class2": {Priority: trial.priority2},
192 "class3": {Priority: trial.priority3},
// Rebuild keepstore so the new priorities take effect.
194 ks, cancel := testKeepstore(c, s.cluster, nil)
197 ctx := authContext(arvadostest.ActiveTokenV2)
// Write only to class1, so the block lands on vol1 only.
198 resp, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
201 StorageClasses: []string{"class1"},
205 // Combine logs into one. (We only want the logs from
206 // the BlockRead below, not from BlockWrite above.)
207 stubLog := &stubLog{}
208 for _, mnt := range ks.mounts {
209 mnt.volume.(*stubVolume).stubLog = stubLog
212 n, err := ks.BlockRead(ctx, arvados.BlockReadOptions{
213 Locator: resp.Locator,
216 c.Assert(n, Equals, len(data))
218 c.Check(stubLog.String(), Equals, trial.expectLog)
// TestBlockWrite_NoWritableVolumes verifies that when every volume is
// read-only (the mutation of v is elided in this excerpt -- presumably
// setting ReadOnly=true), BlockWrite fails with 507 Insufficient Storage
// and never invokes any volume's BlockWrite.
222 func (s *keepstoreSuite) TestBlockWrite_NoWritableVolumes(c *C) {
223 for uuid, v := range s.cluster.Volumes {
225 s.cluster.Volumes[uuid] = v
227 ks, cancel := testKeepstore(c, s.cluster, nil)
// Trip the test if any volume-level write is attempted.
229 for _, mnt := range ks.mounts {
230 mnt.volume.(*stubVolume).blockWrite = func(context.Context, string, []byte) error {
231 c.Error("volume BlockWrite called")
232 return errors.New("fail")
235 ctx := authContext(arvadostest.ActiveTokenV2)
237 _, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
239 Data: []byte("foo")})
241 c.Check(err.(interface{ HTTPStatus() int }).HTTPStatus(), Equals, http.StatusInsufficientStorage)
// TestBlockWrite_MultipleStorageClasses verifies which volumes BlockWrite
// probes/writes when the requested storage classes are satisfiable by
// different volume subsets, asserting the exact volume operation sequence
// via the stub volumes' shared log (with a tolerated goroutine-ordering swap,
// see comment below).
244 func (s *keepstoreSuite) TestBlockWrite_MultipleStorageClasses(c *C) {
245 s.cluster.Volumes = map[string]arvados.Volume{
246 "zzzzz-nyw5e-111111111111111": {
249 StorageClasses: map[string]bool{"class1": true}},
250 "zzzzz-nyw5e-121212121212121": {
253 StorageClasses: map[string]bool{"class1": true, "class2": true}},
254 "zzzzz-nyw5e-222222222222222": {
257 StorageClasses: map[string]bool{"class2": true}},
260 // testData is a block that happens to have rendezvous order 111, 121, 222
261 testData := []byte("qux")
262 testHash := fmt.Sprintf("%x+%d", md5.Sum(testData), len(testData))
264 s.cluster.StorageClasses = map[string]arvados.StorageClassConfig{
270 ctx := authContext(arvadostest.ActiveTokenV2)
271 for idx, trial := range []struct {
272 classes string // desired classes
// Expected log entries; "read" lines are existence probes before writing.
282 "121 read d85\n" + // write#1
285 "121 read d85\n" + // write#2
287 {"class1,class2", "" +
288 "111 read d85\n" + // write#1
293 "111 read d85\n" + // write#2
// Unknown class names ("class404") are ignored for volume selection.
297 {"class1,class2,class404", "" +
298 "111 read d85\n" + // write#1
303 "111 read d85\n" + // write#2
308 c.Logf("=== %d: %+v", idx, trial)
310 ks, cancel := testKeepstore(c, s.cluster, nil)
312 stubLog := &stubLog{}
313 for _, mnt := range ks.mounts {
314 mnt.volume.(*stubVolume).stubLog = stubLog
317 // Check that we chose the right block data
318 rvz := ks.rendezvous(testHash, ks.mountsW)
319 c.Assert(rvz[0].UUID[24:], Equals, "111")
320 c.Assert(rvz[1].UUID[24:], Equals, "121")
321 c.Assert(rvz[2].UUID[24:], Equals, "222")
// Write twice: the second write exercises the already-stored path.
323 for i := 0; i < 2; i++ {
324 _, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
327 StorageClasses: strings.Split(trial.classes, ","),
331 // The "nextmnt" loop in BlockWrite first starts the
332 // goroutine that writes to mount 121, then the
333 // goroutine that writes to mount 111. Most of the
334 // time, mount 121 will log first, but occasionally
335 // mount 111 will log first. In that case we swap the
336 // log entries. (The order of the rest of the log
337 // entries is meaningful -- just not these two.)
338 gotLog := strings.Replace(stubLog.String(),
339 "111 write d85\n121 write d85\n",
340 "121 write d85\n111 write d85\n", 1)
341 c.Check(gotLog, Equals, trial.expectLog)
// TestBlockTrash exercises BlockTrash across writable, read-only, and
// read-only-but-trashable volumes: trashing old replicas, preserving
// new replicas and non-trashable read-only replicas, and propagating a
// non-404 backend error.
345 func (s *keepstoreSuite) TestBlockTrash(c *C) {
346 s.cluster.Volumes = map[string]arvados.Volume{
347 "zzzzz-nyw5e-000000000000000": {Replication: 1, Driver: "stub"},
348 "zzzzz-nyw5e-111111111111111": {Replication: 1, Driver: "stub"},
349 "zzzzz-nyw5e-222222222222222": {Replication: 1, Driver: "stub", ReadOnly: true},
350 "zzzzz-nyw5e-333333333333333": {Replication: 1, Driver: "stub", ReadOnly: true, AllowTrashWhenReadOnly: true},
352 ks, cancel := testKeepstore(c, s.cluster, nil)
// Collect stub volumes sorted by UUID so vol[i] matches volume i above.
355 var vol []*stubVolume
356 for _, mount := range ks.mountsR {
357 vol = append(vol, mount.volume.(*stubVolume))
359 sort.Slice(vol, func(i, j int) bool {
360 return vol[i].params.UUID < vol[j].params.UUID
363 ctx := context.Background()
364 loc := fooHash + "+3"
// tOld is older than the signing TTL, so replicas stamped with it are
// eligible for trashing.
365 tOld := time.Now().Add(-s.cluster.Collections.BlobSigningTTL.Duration() - time.Second)
// Start from a known-clean state on every volume.
368 for _, vol := range vol {
369 err := vol.BlockTrash(fooHash)
370 if !os.IsNotExist(err) {
// writeit stores "foo" on the given volume and backdates its mtime.
375 writeit := func(volidx int) {
376 err := vol[volidx].BlockWrite(ctx, fooHash, []byte("foo"))
378 err = vol[volidx].blockTouchWithTime(fooHash, tOld)
381 trashit := func() error {
382 return ks.BlockTrash(ctx, loc)
// checkexists reports whether the block is readable on the given volume.
384 checkexists := func(volidx int) bool {
385 err := vol[volidx].BlockRead(ctx, fooHash, brdiscard)
386 if !os.IsNotExist(err) {
// No replicas at all => not found.
393 c.Check(trashit(), Equals, os.ErrNotExist)
395 // one old replica => trash it
398 c.Check(trashit(), IsNil)
399 c.Check(checkexists(0), Equals, false)
401 // one old replica + one new replica => keep new, trash old
405 c.Check(vol[1].blockTouchWithTime(fooHash, time.Now()), IsNil)
406 c.Check(trashit(), IsNil)
407 c.Check(checkexists(0), Equals, false)
408 c.Check(checkexists(1), Equals, true)
410 // two old replicas => trash both
414 c.Check(trashit(), IsNil)
415 c.Check(checkexists(0), Equals, false)
416 c.Check(checkexists(1), Equals, false)
418 // four old replicas => trash all except readonly volume with
419 // AllowTrashWhenReadOnly==false
425 c.Check(trashit(), IsNil)
426 c.Check(checkexists(0), Equals, false)
427 c.Check(checkexists(1), Equals, false)
428 c.Check(checkexists(2), Equals, true)
429 c.Check(checkexists(3), Equals, false)
431 // two old replicas but one returns an error => return the
432 // only non-404 backend error
434 vol[0].blockTrash = func(hash string) error {
435 return errors.New("fake error")
439 c.Check(trashit(), ErrorMatches, "fake error")
440 c.Check(checkexists(0), Equals, true)
441 c.Check(checkexists(1), Equals, false)
442 c.Check(checkexists(2), Equals, false)
443 c.Check(checkexists(3), Equals, false)
// TestBlockWrite_OnlyOneBuffer verifies that a write completes (signaled via
// the ok channel) within a second when the buffer pool allows only a single
// buffer, i.e. BlockWrite does not deadlock waiting on a second buffer.
446 func (s *keepstoreSuite) TestBlockWrite_OnlyOneBuffer(c *C) {
447 s.cluster.API.MaxKeepBlobBuffers = 1
448 ks, cancel := testKeepstore(c, s.cluster, nil)
450 ok := make(chan struct{})
453 ctx := authContext(arvadostest.ActiveTokenV2)
454 _, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
456 Data: []byte("foo")})
461 case <-time.After(time.Second):
462 c.Fatal("PUT deadlocks with MaxKeepBlobBuffers==1")
// TestBufferPoolLeak runs 20 concurrent write+read sequences against a pool
// of 4 buffers; if any operation leaks a buffer, the sequence eventually
// deadlocks and the one-second timeout fails the test.
466 func (s *keepstoreSuite) TestBufferPoolLeak(c *C) {
467 s.cluster.API.MaxKeepBlobBuffers = 4
468 ks, cancel := testKeepstore(c, s.cluster, nil)
471 ctx := authContext(arvadostest.ActiveTokenV2)
472 var wg sync.WaitGroup
473 for range make([]int, 20) {
477 resp, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
479 Data: []byte("foo")})
481 _, err = ks.BlockRead(ctx, arvados.BlockReadOptions{
482 Locator: resp.Locator,
483 WriteTo: io.Discard})
// ok is closed (elided) once all goroutines finish; racing it against
// a timer detects the deadlock.
487 ok := make(chan struct{})
494 case <-time.After(time.Second):
495 c.Fatal("read/write sequence deadlocks, likely buffer pool leak")
// TestPutStorageClasses verifies the StorageClasses reported by BlockWrite
// for various requested-class combinations (deduplication, implicit
// "default", nonexistent classes, read-only class volumes), and that
// unsatisfiable class requests fail.
499 func (s *keepstoreSuite) TestPutStorageClasses(c *C) {
500 s.cluster.Volumes = map[string]arvados.Volume{
501 "zzzzz-nyw5e-000000000000000": {Replication: 1, Driver: "stub"}, // "default" is implicit
502 "zzzzz-nyw5e-111111111111111": {Replication: 1, Driver: "stub", StorageClasses: map[string]bool{"special": true, "extra": true}},
503 "zzzzz-nyw5e-222222222222222": {Replication: 1, Driver: "stub", StorageClasses: map[string]bool{"readonly": true}, ReadOnly: true},
505 ks, cancel := testKeepstore(c, s.cluster, nil)
507 ctx := authContext(arvadostest.ActiveTokenV2)
// Success trials: requested classes (ask, elided field) vs. expected
// replica count and reported classes.
509 for _, trial := range []struct {
512 expectClasses map[string]int
516 map[string]int{"default": 1}},
519 map[string]int{"default": 1}},
520 {[]string{"default"},
522 map[string]int{"default": 1}},
523 {[]string{"default", "default"},
525 map[string]int{"default": 1}},
526 {[]string{"special"},
528 map[string]int{"extra": 1, "special": 1}},
529 {[]string{"special", "readonly"},
531 map[string]int{"extra": 1, "special": 1}},
532 {[]string{"special", "nonexistent"},
534 map[string]int{"extra": 1, "special": 1}},
535 {[]string{"extra", "special"},
537 map[string]int{"extra": 1, "special": 1}},
538 {[]string{"default", "special"},
540 map[string]int{"default": 1, "extra": 1, "special": 1}},
542 c.Logf("success case %#v", trial)
543 resp, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
546 StorageClasses: trial.ask,
548 if !c.Check(err, IsNil) {
551 c.Check(resp.Replicas, Equals, trial.expectReplicas)
552 if len(trial.expectClasses) == 0 {
553 // any non-empty value is correct
554 c.Check(resp.StorageClasses, Not(HasLen), 0)
556 c.Check(resp.StorageClasses, DeepEquals, trial.expectClasses)
// Failure cases: classes with no writable volume must make the write fail.
560 for _, ask := range [][]string{
562 {"doesnotexist", "readonly"},
565 c.Logf("failure case %s", ask)
566 _, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
// TestUntrashHandlerWithNoWritableVolumes verifies BlockUntrash reports
// not-found when all volumes are unwritable (the mutation of v is elided in
// this excerpt -- presumably setting ReadOnly=true), and that no block
// appears on any volume afterwards.
575 func (s *keepstoreSuite) TestUntrashHandlerWithNoWritableVolumes(c *C) {
576 for uuid, v := range s.cluster.Volumes {
578 s.cluster.Volumes[uuid] = v
580 ks, cancel := testKeepstore(c, s.cluster, nil)
// Confirm writes don't succeed on any mount (assertions elided).
583 for _, mnt := range ks.mounts {
584 err := mnt.BlockWrite(context.Background(), fooHash, []byte("foo"))
586 err = mnt.BlockRead(context.Background(), fooHash, brdiscard)
590 err := ks.BlockUntrash(context.Background(), fooHash)
591 c.Check(os.IsNotExist(err), Equals, true)
593 for _, mnt := range ks.mounts {
594 err := mnt.BlockRead(context.Background(), fooHash, brdiscard)
// TestBlockWrite_SkipReadOnly verifies that BlockWrite only touches the
// writable volume: after 32 writes, the read-only volumes' stub logs are
// empty regardless of AllowTrashWhenReadOnly.
599 func (s *keepstoreSuite) TestBlockWrite_SkipReadOnly(c *C) {
600 s.cluster.Volumes = map[string]arvados.Volume{
601 "zzzzz-nyw5e-000000000000000": {Replication: 1, Driver: "stub"},
602 "zzzzz-nyw5e-111111111111111": {Replication: 1, Driver: "stub", ReadOnly: true},
603 "zzzzz-nyw5e-222222222222222": {Replication: 1, Driver: "stub", ReadOnly: true, AllowTrashWhenReadOnly: true},
605 ks, cancel := testKeepstore(c, s.cluster, nil)
607 ctx := authContext(arvadostest.ActiveTokenV2)
// 32 distinct blocks make it overwhelmingly likely every volume would be
// chosen by rendezvous order at least once, if it were eligible.
609 for i := range make([]byte, 32) {
610 data := []byte(fmt.Sprintf("block %d", i))
611 _, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{Data: data})
614 c.Check(ks.mounts["zzzzz-nyw5e-000000000000000"].volume.(*stubVolume).stubLog.String(), Matches, "(?ms).*write.*")
615 c.Check(ks.mounts["zzzzz-nyw5e-111111111111111"].volume.(*stubVolume).stubLog.String(), HasLen, 0)
616 c.Check(ks.mounts["zzzzz-nyw5e-222222222222222"].volume.(*stubVolume).stubLog.String(), HasLen, 0)
// TestGetLocatorInfo is a table test for getLocatorInfo: valid locators
// (bare hash, +size, signature hint, remote hint, arbitrary hints) populate
// hash/size/signed/remote; malformed locators (bad hash characters, wrong
// hash length, misplaced size hint, stray '+') must be rejected.
619 func (s *keepstoreSuite) TestGetLocatorInfo(c *C) {
620 for _, trial := range []struct {
625 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
627 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+1234",
628 ok: true, expect: locatorInfo{size: 1234}},
629 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+1234+Abcdef@abcdef",
630 ok: true, expect: locatorInfo{size: 1234, signed: true}},
631 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+1234+Rzzzzz-abcdef",
632 ok: true, expect: locatorInfo{size: 1234, remote: true}},
633 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+12345+Zexample+Rzzzzz-abcdef",
634 ok: true, expect: locatorInfo{size: 12345, remote: true}},
635 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+123456+👶🦈+Rzzzzz-abcdef",
636 ok: true, expect: locatorInfo{size: 123456, remote: true}},
637 // invalid: bad hash char
638 {locator: "aaaaaaaaaaaaaazaaaaaaaaaaaaaaaaa+1234",
640 {locator: "aaaaaaaaaaaaaaFaaaaaaaaaaaaaaaaa+1234",
642 {locator: "aaaaaaaaaaaaaa⛵aaaaaaaaaaaaaaaaa+1234",
644 // invalid: hash length != 32
647 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
649 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+1234",
651 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaabb",
653 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaabb+1234",
655 // invalid: first hint is not size
656 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+Abcdef+1234",
658 // invalid: leading/trailing/double +
659 {locator: "+aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+1234",
661 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+1234+",
663 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa++1234",
665 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+1234++Abcdef@abcdef",
668 c.Logf("=== %s", trial.locator)
669 li, err := getLocatorInfo(trial.locator)
// On success, the parsed hash is always the first 32 characters.
675 c.Check(li.hash, Equals, trial.locator[:32])
676 c.Check(li.size, Equals, trial.expect.size)
677 c.Check(li.signed, Equals, trial.expect.signed)
678 c.Check(li.remote, Equals, trial.expect.remote)
// Register the "stub" volume driver used by the tests above. The enclosing
// declaration (presumably func init) and the rest of the constructor are
// elided in this excerpt.
683 driver["stub"] = func(params newVolumeParams) (volume, error) {
686 data: make(map[string]stubData),
// stubLog collects volume operation log lines; tests compare its contents
// against expected operation sequences. (Fields elided in this excerpt.)
693 type stubLog struct {
// Printf appends a formatted line (with trailing newline) to the log.
698 func (sl *stubLog) Printf(format string, args ...interface{}) {
704 fmt.Fprintf(sl, format+"\n", args...)
// stubData is one stored block on a stubVolume. (Fields elided in this
// excerpt; usage below shows data []byte, mtime time.Time, trash time.Time.)
707 type stubData struct {
// stubVolume is an in-memory volume implementation used by the keepstore
// tests; blocks live in data, keyed by md5 hash.
713 type stubVolume struct {
714 params newVolumeParams
715 data map[string]stubData
719 // The following funcs enable tests to insert delays and
720 // failures. Each volume operation begins by calling the
721 // corresponding func (if non-nil). If the func returns an
722 // error, that error is returned to caller. Otherwise, the
723 // stub continues normally.
724 blockRead func(ctx context.Context, hash string, writeTo io.WriterAt) error
725 blockWrite func(ctx context.Context, hash string, data []byte) error
726 deviceID func() string
727 blockTouch func(hash string) error
728 blockTrash func(hash string) error
729 blockUntrash func(hash string) error
730 index func(ctx context.Context, prefix string, writeTo io.Writer) error
731 mtime func(hash string) (time.Time, error)
// log records an abbreviated operation entry ("<uuid-suffix> <op> <hash-prefix>").
735 func (v *stubVolume) log(op, hash string) {
736 // Note this intentionally crashes if UUID or hash is short --
737 // if keepstore ever does that, tests should fail.
738 v.stubLog.Printf("%s %s %s", v.params.UUID[24:27], op, hash[:3])
// BlockRead writes the stored block to writeTo. Returns os.ErrNotExist for
// missing or trashed blocks. The test hook blockRead, if set, runs first.
741 func (v *stubVolume) BlockRead(ctx context.Context, hash string, writeTo io.WriterAt) error {
743 if v.blockRead != nil {
744 err := v.blockRead(ctx, hash, writeTo)
750 ent, ok := v.data[hash]
752 if !ok || !ent.trash.IsZero() {
753 return os.ErrNotExist
// Deliver the data in chunks of doubling size (1000, 2000, ...) so callers'
// handling of multiple WriteAt calls gets exercised.
756 for writesize := 1000; wrote < len(ent.data); writesize = writesize * 2 {
757 data := ent.data[wrote:]
758 if len(data) > writesize {
759 data = data[:writesize]
761 n, err := writeTo.WriteAt(data, int64(wrote))
// BlockWrite stores a copy of data under hash. The test hook blockWrite, if
// set, can inject a failure first.
770 func (v *stubVolume) BlockWrite(ctx context.Context, hash string, data []byte) error {
772 if v.blockWrite != nil {
773 if err := v.blockWrite(ctx, hash, data); err != nil {
// Copy data so later mutation by the caller can't corrupt the stored block.
779 v.data[hash] = stubData{
781 data: append([]byte(nil), data...),
// DeviceID returns a per-instance identifier (the stub's own address).
786 func (v *stubVolume) DeviceID() string {
787 return fmt.Sprintf("%p", v)
// BlockTouch updates the block's mtime to now; os.ErrNotExist for missing or
// trashed blocks. The test hook blockTouch, if set, runs first.
790 func (v *stubVolume) BlockTouch(hash string) error {
792 if v.blockTouch != nil {
793 if err := v.blockTouch(hash); err != nil {
799 ent, ok := v.data[hash]
800 if !ok || !ent.trash.IsZero() {
801 return os.ErrNotExist
803 ent.mtime = time.Now()
808 // Set mtime to the (presumably old) specified time.
// Test-only helper: unlike BlockTouch, the caller chooses the timestamp.
809 func (v *stubVolume) blockTouchWithTime(hash string, t time.Time) error {
810 v.log("touchwithtime", hash)
813 ent, ok := v.data[hash]
815 return os.ErrNotExist
// BlockTrash marks the block as trashed until BlobTrashLifetime elapses;
// os.ErrNotExist for missing or already-trashed blocks. The test hook
// blockTrash, if set, runs first.
822 func (v *stubVolume) BlockTrash(hash string) error {
824 if v.blockTrash != nil {
825 if err := v.blockTrash(hash); err != nil {
831 ent, ok := v.data[hash]
832 if !ok || !ent.trash.IsZero() {
833 return os.ErrNotExist
// A non-zero trash time marks the block trashed; the value is when the
// trash becomes permanently deletable.
835 ent.trash = time.Now().Add(v.params.Cluster.Collections.BlobTrashLifetime.Duration())
// BlockUntrash clears the trash mark; os.ErrNotExist if the block is missing
// or not trashed. The test hook blockUntrash, if set, runs first.
840 func (v *stubVolume) BlockUntrash(hash string) error {
841 v.log("untrash", hash)
842 if v.blockUntrash != nil {
843 if err := v.blockUntrash(hash); err != nil {
849 ent, ok := v.data[hash]
850 if !ok || ent.trash.IsZero() {
851 return os.ErrNotExist
853 ent.trash = time.Time{}
// Index writes one "<hash>+<size> <mtime-nanos>" line per non-trashed block
// whose hash matches prefix. The test hook index, if set, runs first.
858 func (v *stubVolume) Index(ctx context.Context, prefix string, writeTo io.Writer) error {
859 v.stubLog.Printf("%s index %s", v.params.UUID, prefix)
861 if err := v.index(ctx, prefix, writeTo); err != nil {
// Accumulate in a buffer, then emit with a single Copy.
865 buf := &bytes.Buffer{}
867 for hash, ent := range v.data {
868 if ent.trash.IsZero() && strings.HasPrefix(hash, prefix) {
869 fmt.Fprintf(buf, "%s+%d %d\n", hash, len(ent.data), ent.mtime.UnixNano())
873 _, err := io.Copy(writeTo, buf)
// Mtime returns the block's modification time; os.ErrNotExist for missing or
// trashed blocks. The test hook mtime, if set, runs first.
877 func (v *stubVolume) Mtime(hash string) (time.Time, error) {
880 if t, err := v.mtime(hash); err != nil {
886 ent, ok := v.data[hash]
887 if !ok || !ent.trash.IsZero() {
888 return time.Time{}, os.ErrNotExist
890 return ent.mtime, nil
893 func (v *stubVolume) EmptyTrash() {
894 v.stubLog.Printf("%s emptytrash", v.params.UUID)
897 for hash, ent := range v.data {
898 if !ent.trash.IsZero() && time.Now().After(ent.trash) {