1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
22 "git.arvados.org/arvados.git/lib/config"
23 "git.arvados.org/arvados.git/sdk/go/arvados"
24 "git.arvados.org/arvados.git/sdk/go/arvadostest"
25 "git.arvados.org/arvados.git/sdk/go/auth"
26 "git.arvados.org/arvados.git/sdk/go/ctxlog"
27 "github.com/prometheus/client_golang/prometheus"
// TestGocheck hooks the gocheck suites in this package into the
// standard "go test" runner.
func TestGocheck(t *testing.T) {
	// Well-known MD5 hashes of the literal block contents "foo" and
	// "bar", used as locators throughout these tests.
	fooHash = "acbd18db4cc2f85cedef654fccc4a4d8"
	barHash = "37b51d194a7513e45b56f6524f2d51f2"
// testServiceURL is the fixed advertised URL of the keepstore service
// under test.
var testServiceURL = func() arvados.URL {
	return arvados.URL{Host: "localhost:12345", Scheme: "http"}
// authContext returns a context carrying the given Arvados API token
// as the caller's credentials.
func authContext(token string) context.Context {
	return auth.NewContext(context.TODO(), &auth.Credentials{Tokens: []string{token}})
// testCluster loads a minimal single-cluster ("zzzzz") configuration
// and fills in the fixture system root and management tokens.
func testCluster(t TB) *arvados.Cluster {
	cfg, err := config.NewLoader(bytes.NewBufferString("Clusters: {zzzzz: {}}"), ctxlog.TestLogger(t)).Load()
	cluster, err := cfg.GetCluster("")
	cluster.SystemRootToken = arvadostest.SystemRootToken
	cluster.ManagementToken = arvadostest.ManagementToken
// testKeepstore constructs a keepstore for the given cluster config,
// registering its metrics with reg, and returns it along with a
// CancelFunc that shuts it down.
func testKeepstore(t TB, cluster *arvados.Cluster, reg *prometheus.Registry) (*keepstore, context.CancelFunc) {
	// NOTE(review): assumes this assignment is guarded by a nil check
	// on the caller-supplied reg -- confirm.
	reg = prometheus.NewRegistry()
	ctx, cancel := context.WithCancel(context.Background())
	ctx = ctxlog.Context(ctx, ctxlog.TestLogger(t))
	ks, err := newKeepstore(ctx, cluster, cluster.SystemRootToken, reg, testServiceURL)
var _ = Suite(&keepstoreSuite{})

// keepstoreSuite holds the per-test cluster configuration for the
// keepstore gocheck suite; SetUpTest resets it before each test.
type keepstoreSuite struct {
	cluster *arvados.Cluster
// SetUpTest gives each test a fresh cluster config with two writable
// stub-driver volumes.
func (s *keepstoreSuite) SetUpTest(c *C) {
	s.cluster = testCluster(c)
	s.cluster.Volumes = map[string]arvados.Volume{
		"zzzzz-nyw5e-000000000000000": {Replication: 1, Driver: "stub"},
		"zzzzz-nyw5e-111111111111111": {Replication: 1, Driver: "stub"},
// TestBlockRead_ChecksumMismatch verifies that BlockRead detects
// stored data whose MD5 does not match the requested locator, and
// that fewer than the full 3 bytes of the corrupt data reach the
// client.
func (s *keepstoreSuite) TestBlockRead_ChecksumMismatch(c *C) {
	ks, cancel := testKeepstore(c, s.cluster, nil)

	ctx := authContext(arvadostest.ActiveTokenV2)

	// Store "bar" directly on a volume under the hash of "foo",
	// bypassing keepstore's own write-path verification.
	fooHash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
	err := ks.mountsW[0].BlockWrite(ctx, fooHash, []byte("bar"))

	// Writing "foo" through keepstore now collides with the corrupt copy.
	_, err = ks.BlockWrite(ctx, arvados.BlockWriteOptions{
	c.Check(err, ErrorMatches, "hash collision")

	buf := bytes.NewBuffer(nil)
	_, err = ks.BlockRead(ctx, arvados.BlockReadOptions{
		Locator: ks.signLocator(arvadostest.ActiveTokenV2, fooHash+"+3"),
	c.Check(err, ErrorMatches, "checksum mismatch in stored data")
	c.Check(buf.String(), Not(Equals), "foo")
	c.Check(buf.Len() < 3, Equals, true)

	// Even with a good copy on the other volume, the mismatch is
	// still reported -- presumably the corrupt replica is tried
	// first; confirm read ordering.
	err = ks.mountsW[1].BlockWrite(ctx, fooHash, []byte("foo"))

	buf = bytes.NewBuffer(nil)
	_, err = ks.BlockRead(ctx, arvados.BlockReadOptions{
		Locator: ks.signLocator(arvadostest.ActiveTokenV2, fooHash+"+3"),
	c.Check(err, ErrorMatches, "checksum mismatch in stored data")
	c.Check(buf.Len() < 3, Equals, true)
// TestBlockReadWrite_SigningDisabled verifies that with blob signing
// turned off, blocks can be read back with any token (or none), using
// either the unsigned locator or one signed with the old key.
func (s *keepstoreSuite) TestBlockReadWrite_SigningDisabled(c *C) {
	origKey := s.cluster.Collections.BlobSigningKey
	s.cluster.Collections.BlobSigning = false
	s.cluster.Collections.BlobSigningKey = ""
	ks, cancel := testKeepstore(c, s.cluster, nil)

	resp, err := ks.BlockWrite(authContext("abcde"), arvados.BlockWriteOptions{
	// With signing disabled, the returned locator carries no signature.
	c.Check(resp.Locator, Equals, fooHash+"+3")
	locUnsigned := resp.Locator
	// Sign with the original key so we can also confirm that a
	// signed locator is tolerated.
	locSigned := arvados.SignLocator(locUnsigned, arvadostest.ActiveTokenV2, time.Now().Add(ttl), ttl, []byte(origKey))
	c.Assert(locSigned, Not(Equals), locUnsigned)

	for _, locator := range []string{locUnsigned, locSigned} {
		for _, token := range []string{"", "xyzzy", arvadostest.ActiveTokenV2} {
			c.Logf("=== locator %q token %q", locator, token)
			ctx := authContext(token)
			buf := bytes.NewBuffer(nil)
			_, err := ks.BlockRead(ctx, arvados.BlockReadOptions{
			c.Check(buf.String(), Equals, "foo")
// TestBlockRead_OrderedByStorageClassPriority verifies that BlockRead
// probes volumes in order of their storage classes' configured
// Priority (falling back to rendezvous order), as recorded by the
// per-volume stub logs.
func (s *keepstoreSuite) TestBlockRead_OrderedByStorageClassPriority(c *C) {
	s.cluster.Volumes = map[string]arvados.Volume{
		"zzzzz-nyw5e-111111111111111": {
			StorageClasses: map[string]bool{"class1": true}},
		"zzzzz-nyw5e-222222222222222": {
			StorageClasses: map[string]bool{"class2": true, "class3": true}},

	// "foobar" is just some data that happens to result in
	// rendezvous order {111, 222}
	data := []byte("foobar")
	hash := fmt.Sprintf("%x", md5.Sum(data))

	for _, trial := range []struct {
		priority1 int // priority of class1, thus vol1
		priority2 int // priority of class2
		priority3 int // priority of class3 (vol2 priority will be max(priority2, priority3))
		{100, 50, 50, "111 read 385\n"},              // class1 has higher priority => try vol1 first, no need to try vol2
		{100, 100, 100, "111 read 385\n"},            // same priority, vol2 is first in rendezvous order => try vol1 first and succeed
		{66, 99, 33, "222 read 385\n111 read 385\n"}, // class2 has higher priority => try vol2 first, then try vol1
		{66, 33, 99, "222 read 385\n111 read 385\n"}, // class3 has highest priority => vol2 has highest => try vol2 first, then try vol1
		c.Logf("=== %+v", trial)

		s.cluster.StorageClasses = map[string]arvados.StorageClassConfig{
			"class1": {Priority: trial.priority1},
			"class2": {Priority: trial.priority2},
			"class3": {Priority: trial.priority3},
		ks, cancel := testKeepstore(c, s.cluster, nil)

		ctx := authContext(arvadostest.ActiveTokenV2)
		resp, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
			StorageClasses: []string{"class1"},

		// Combine logs into one. (We only want the logs from
		// the BlockRead below, not from BlockWrite above.)
		stubLog := &stubLog{}
		for _, mnt := range ks.mounts {
			mnt.volume.(*stubVolume).stubLog = stubLog

		n, err := ks.BlockRead(ctx, arvados.BlockReadOptions{
			Locator: resp.Locator,
		c.Assert(n, Equals, len(data))
		c.Check(stubLog.String(), Equals, trial.expectLog)
// TestBlockWrite_NoWritableVolumes verifies that when no volume is
// writable, BlockWrite fails with 507 Insufficient Storage and never
// calls any volume's write method.
func (s *keepstoreSuite) TestBlockWrite_NoWritableVolumes(c *C) {
	// NOTE(review): presumably each volume is marked ReadOnly inside
	// this loop -- confirm.
	for uuid, v := range s.cluster.Volumes {
		s.cluster.Volumes[uuid] = v
	ks, cancel := testKeepstore(c, s.cluster, nil)

	// Fail the test outright if any volume-level write is attempted.
	for _, mnt := range ks.mounts {
		mnt.volume.(*stubVolume).blockWrite = func(context.Context, string, []byte) error {
			c.Error("volume BlockWrite called")
			return errors.New("fail")

	ctx := authContext(arvadostest.ActiveTokenV2)

	_, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
		Data: []byte("foo")})
	c.Check(err.(interface{ HTTPStatus() int }).HTTPStatus(), Equals, http.StatusInsufficientStorage)
// TestBlockWrite_MultipleStorageClasses verifies which volumes
// BlockWrite probes and writes, and in what order, for various
// combinations of requested storage classes; the expected sequence is
// encoded in each trial's expectLog.
func (s *keepstoreSuite) TestBlockWrite_MultipleStorageClasses(c *C) {
	s.cluster.Volumes = map[string]arvados.Volume{
		"zzzzz-nyw5e-111111111111111": {
			StorageClasses: map[string]bool{"class1": true}},
		"zzzzz-nyw5e-121212121212121": {
			StorageClasses: map[string]bool{"class1": true, "class2": true}},
		"zzzzz-nyw5e-222222222222222": {
			StorageClasses: map[string]bool{"class2": true}},

	// testData is a block that happens to have rendezvous order 111, 121, 222
	testData := []byte("qux")
	testHash := fmt.Sprintf("%x+%d", md5.Sum(testData), len(testData))

	s.cluster.StorageClasses = map[string]arvados.StorageClassConfig{

	ctx := authContext(arvadostest.ActiveTokenV2)
	for idx, trial := range []struct {
		classes string // desired classes
			"121 read d85\n" + // write#1
			"121 read d85\n" + // write#2
		{"class1,class2", "" +
			"111 read d85\n" + // write#1
			"111 read d85\n" + // write#2
		{"class1,class2,class404", "" +
			"111 read d85\n" + // write#1
			"111 read d85\n" + // write#2
		c.Logf("=== %d: %+v", idx, trial)

		ks, cancel := testKeepstore(c, s.cluster, nil)

		stubLog := &stubLog{}
		for _, mnt := range ks.mounts {
			mnt.volume.(*stubVolume).stubLog = stubLog

		// Check that we chose the right block data
		rvz := ks.rendezvous(testHash, ks.mountsW)
		c.Assert(rvz[0].UUID[24:], Equals, "111")
		c.Assert(rvz[1].UUID[24:], Equals, "121")
		c.Assert(rvz[2].UUID[24:], Equals, "222")

		// Write the same block twice; expectLog distinguishes the
		// operations attributed to write#1 and write#2.
		for i := 0; i < 2; i++ {
			_, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
				StorageClasses: strings.Split(trial.classes, ","),

		c.Check(stubLog.String(), Equals, trial.expectLog)
// TestBlockTrash verifies BlockTrash across combinations of old/new
// replicas and read-only volumes (with and without
// AllowTrashWhenReadOnly), plus propagation of a backend error.
func (s *keepstoreSuite) TestBlockTrash(c *C) {
	s.cluster.Volumes = map[string]arvados.Volume{
		"zzzzz-nyw5e-000000000000000": {Replication: 1, Driver: "stub"},
		"zzzzz-nyw5e-111111111111111": {Replication: 1, Driver: "stub"},
		"zzzzz-nyw5e-222222222222222": {Replication: 1, Driver: "stub", ReadOnly: true},
		"zzzzz-nyw5e-333333333333333": {Replication: 1, Driver: "stub", ReadOnly: true, AllowTrashWhenReadOnly: true},
	ks, cancel := testKeepstore(c, s.cluster, nil)

	// Sort the stub volumes by UUID so vol[0..3] correspond to the
	// 000/111/222/333 volumes declared above.
	var vol []*stubVolume
	for _, mount := range ks.mountsR {
		vol = append(vol, mount.volume.(*stubVolume))
	sort.Slice(vol, func(i, j int) bool {
		return vol[i].params.UUID < vol[j].params.UUID

	ctx := context.Background()
	loc := fooHash + "+3"
	// tOld predates the signing TTL, making replicas stamped with it
	// old enough to trash.
	tOld := time.Now().Add(-s.cluster.Collections.BlobSigningTTL.Duration() - time.Second)

	// Nothing written yet: per-volume trash should report not-found.
	for _, vol := range vol {
		err := vol.BlockTrash(fooHash)
		if !os.IsNotExist(err) {

	// writeit stores "foo" on vol[volidx] with the old timestamp.
	writeit := func(volidx int) {
		err := vol[volidx].BlockWrite(ctx, fooHash, []byte("foo"))
		err = vol[volidx].blockTouchWithTime(fooHash, tOld)

	// trashit trashes the block through the keepstore API.
	trashit := func() error {
		return ks.BlockTrash(ctx, loc)

	// checkexists reports whether vol[volidx] still has a readable copy.
	checkexists := func(volidx int) bool {
		err := vol[volidx].BlockRead(ctx, fooHash, brdiscard)
		if !os.IsNotExist(err) {

	c.Check(trashit(), Equals, os.ErrNotExist)

	// one old replica => trash it
	c.Check(trashit(), IsNil)
	c.Check(checkexists(0), Equals, false)

	// one old replica + one new replica => keep new, trash old
	c.Check(vol[1].blockTouchWithTime(fooHash, time.Now()), IsNil)
	c.Check(trashit(), IsNil)
	c.Check(checkexists(0), Equals, false)
	c.Check(checkexists(1), Equals, true)

	// two old replicas => trash both
	c.Check(trashit(), IsNil)
	c.Check(checkexists(0), Equals, false)
	c.Check(checkexists(1), Equals, false)

	// four old replicas => trash all except readonly volume with
	// AllowTrashWhenReadOnly==false
	c.Check(trashit(), IsNil)
	c.Check(checkexists(0), Equals, false)
	c.Check(checkexists(1), Equals, false)
	c.Check(checkexists(2), Equals, true)
	c.Check(checkexists(3), Equals, false)

	// two old replicas but one returns an error => return the
	// only non-404 backend error
	vol[0].blockTrash = func(hash string) error {
		return errors.New("fake error")
	c.Check(trashit(), ErrorMatches, "fake error")
	c.Check(checkexists(0), Equals, true)
	c.Check(checkexists(1), Equals, false)
	c.Check(checkexists(2), Equals, false)
	c.Check(checkexists(3), Equals, false)
// TestBlockWrite_OnlyOneBuffer verifies that a single write completes
// within a second when the buffer pool is limited to one buffer,
// i.e., the write path does not deadlock waiting for a second buffer.
func (s *keepstoreSuite) TestBlockWrite_OnlyOneBuffer(c *C) {
	s.cluster.API.MaxKeepBlobBuffers = 1
	ks, cancel := testKeepstore(c, s.cluster, nil)
	// ok is closed/signaled when the write finishes.
	ok := make(chan struct{})
	ctx := authContext(arvadostest.ActiveTokenV2)
	_, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
		Data: []byte("foo")})
	case <-time.After(time.Second):
		c.Fatal("PUT deadlocks with MaxKeepBlobBuffers==1")
// TestBufferPoolLeak runs 20 write+read cycles against a 4-buffer
// pool; if any operation fails to return its buffer, the sequence
// eventually blocks and the one-second timeout fails the test.
func (s *keepstoreSuite) TestBufferPoolLeak(c *C) {
	s.cluster.API.MaxKeepBlobBuffers = 4
	ks, cancel := testKeepstore(c, s.cluster, nil)

	ctx := authContext(arvadostest.ActiveTokenV2)
	var wg sync.WaitGroup
	for range make([]int, 20) {
		resp, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
			Data: []byte("foo")})
		_, err = ks.BlockRead(ctx, arvados.BlockReadOptions{
			Locator: resp.Locator,
			WriteTo: io.Discard})
	// ok is signaled when all cycles have completed.
	ok := make(chan struct{})
	case <-time.After(time.Second):
		c.Fatal("read/write sequence deadlocks, likely buffer pool leak")
// TestPutStorageClasses verifies the replica count and storage-class
// map reported by BlockWrite for various requested class
// combinations, and that requests satisfiable only by a read-only or
// nonexistent class fail.
func (s *keepstoreSuite) TestPutStorageClasses(c *C) {
	s.cluster.Volumes = map[string]arvados.Volume{
		"zzzzz-nyw5e-000000000000000": {Replication: 1, Driver: "stub"}, // "default" is implicit
		"zzzzz-nyw5e-111111111111111": {Replication: 1, Driver: "stub", StorageClasses: map[string]bool{"special": true, "extra": true}},
		"zzzzz-nyw5e-222222222222222": {Replication: 1, Driver: "stub", StorageClasses: map[string]bool{"readonly": true}, ReadOnly: true},
	ks, cancel := testKeepstore(c, s.cluster, nil)

	ctx := authContext(arvadostest.ActiveTokenV2)

	for _, trial := range []struct {
		expectClasses map[string]int
			map[string]int{"default": 1}},
			map[string]int{"default": 1}},
		{[]string{"default"},
			map[string]int{"default": 1}},
		{[]string{"default", "default"},
			map[string]int{"default": 1}},
		{[]string{"special"},
			map[string]int{"extra": 1, "special": 1}},
		{[]string{"special", "readonly"},
			map[string]int{"extra": 1, "special": 1}},
		{[]string{"special", "nonexistent"},
			map[string]int{"extra": 1, "special": 1}},
		{[]string{"extra", "special"},
			map[string]int{"extra": 1, "special": 1}},
		{[]string{"default", "special"},
			map[string]int{"default": 1, "extra": 1, "special": 1}},
		c.Logf("success case %#v", trial)
		resp, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
			StorageClasses: trial.ask,
		if !c.Check(err, IsNil) {
		c.Check(resp.Replicas, Equals, trial.expectReplicas)
		if len(trial.expectClasses) == 0 {
			// any non-empty value is correct
			c.Check(resp.StorageClasses, Not(HasLen), 0)
		c.Check(resp.StorageClasses, DeepEquals, trial.expectClasses)

	// Class combinations that cannot be satisfied by a writable
	// volume are expected to fail.
	for _, ask := range [][]string{
		{"doesnotexist", "readonly"},
		c.Logf("failure case %s", ask)
		_, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
// TestUntrashHandlerWithNoWritableVolumes verifies that BlockUntrash
// reports not-found when every volume is read-only, and that no block
// becomes readable as a side effect.
func (s *keepstoreSuite) TestUntrashHandlerWithNoWritableVolumes(c *C) {
	// NOTE(review): presumably each volume is marked ReadOnly inside
	// this loop -- confirm.
	for uuid, v := range s.cluster.Volumes {
		s.cluster.Volumes[uuid] = v
	ks, cancel := testKeepstore(c, s.cluster, nil)

	for _, mnt := range ks.mounts {
		err := mnt.BlockWrite(context.Background(), fooHash, []byte("foo"))
		err = mnt.BlockRead(context.Background(), fooHash, brdiscard)

	err := ks.BlockUntrash(context.Background(), fooHash)
	c.Check(os.IsNotExist(err), Equals, true)

	for _, mnt := range ks.mounts {
		err := mnt.BlockRead(context.Background(), fooHash, brdiscard)
// TestBlockWrite_SkipReadOnly verifies that writes reach only the
// writable volume: the two read-only volumes' operation logs stay
// empty.
func (s *keepstoreSuite) TestBlockWrite_SkipReadOnly(c *C) {
	s.cluster.Volumes = map[string]arvados.Volume{
		"zzzzz-nyw5e-000000000000000": {Replication: 1, Driver: "stub"},
		"zzzzz-nyw5e-111111111111111": {Replication: 1, Driver: "stub", ReadOnly: true},
		"zzzzz-nyw5e-222222222222222": {Replication: 1, Driver: "stub", ReadOnly: true, AllowTrashWhenReadOnly: true},
	ks, cancel := testKeepstore(c, s.cluster, nil)

	ctx := authContext(arvadostest.ActiveTokenV2)

	// Write 32 distinct blocks.
	for i := range make([]byte, 32) {
		data := []byte(fmt.Sprintf("block %d", i))
		_, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{Data: data})
	c.Check(ks.mounts["zzzzz-nyw5e-000000000000000"].volume.(*stubVolume).stubLog.String(), Matches, "(?ms).*write.*")
	c.Check(ks.mounts["zzzzz-nyw5e-111111111111111"].volume.(*stubVolume).stubLog.String(), HasLen, 0)
	c.Check(ks.mounts["zzzzz-nyw5e-222222222222222"].volume.(*stubVolume).stubLog.String(), HasLen, 0)
// TestGetLocatorInfo exercises getLocatorInfo over valid and invalid
// locator strings, checking the parsed hash, size, signed, and remote
// fields, and that malformed locators are rejected.
func (s *keepstoreSuite) TestGetLocatorInfo(c *C) {
	for _, trial := range []struct {
		{locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
		{locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+1234",
			ok: true, expect: locatorInfo{size: 1234}},
		{locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+1234+Abcdef@abcdef",
			ok: true, expect: locatorInfo{size: 1234, signed: true}},
		{locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+1234+Rzzzzz-abcdef",
			ok: true, expect: locatorInfo{size: 1234, remote: true}},
		{locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+12345+Zexample+Rzzzzz-abcdef",
			ok: true, expect: locatorInfo{size: 12345, remote: true}},
		{locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+123456+👶🦈+Rzzzzz-abcdef",
			ok: true, expect: locatorInfo{size: 123456, remote: true}},
		// invalid: bad hash char
		{locator: "aaaaaaaaaaaaaazaaaaaaaaaaaaaaaaa+1234",
		{locator: "aaaaaaaaaaaaaaFaaaaaaaaaaaaaaaaa+1234",
		{locator: "aaaaaaaaaaaaaa⛵aaaaaaaaaaaaaaaaa+1234",
		// invalid: hash length != 32
		{locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
		{locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+1234",
		{locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaabb",
		{locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaabb+1234",
		// invalid: first hint is not size
		{locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+Abcdef+1234",
		// invalid: leading/trailing/double +
		{locator: "+aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+1234",
		{locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+1234+",
		{locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa++1234",
		{locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+1234++Abcdef@abcdef",
		c.Logf("=== %s", trial.locator)
		li, err := getLocatorInfo(trial.locator)
		c.Check(li.hash, Equals, trial.locator[:32])
		c.Check(li.size, Equals, trial.expect.size)
		c.Check(li.signed, Equals, trial.expect.signed)
		c.Check(li.remote, Equals, trial.expect.remote)
// Register the in-memory "stub" volume driver used throughout these
// tests.
driver["stub"] = func(params newVolumeParams) (volume, error) {
		data: make(map[string]stubData),
// stubLog accumulates log lines written by stub volumes so tests can
// assert on the exact sequence of volume operations.
type stubLog struct {

// Printf appends a newline-terminated formatted entry to the log.
func (sl *stubLog) Printf(format string, args ...interface{}) {
	fmt.Fprintf(sl, format+"\n", args...)
// stubData is one stored block: its content plus mtime and trash
// timestamps.
type stubData struct {

// stubVolume is an in-memory volume implementation used by these
// tests; operations are recorded in stubLog.
type stubVolume struct {
	params newVolumeParams
	data   map[string]stubData

	// The following funcs enable tests to insert delays and
	// failures. Each volume operation begins by calling the
	// corresponding func (if non-nil). If the func returns an
	// error, that error is returned to caller. Otherwise, the
	// stub continues normally.
	blockRead    func(ctx context.Context, hash string, writeTo io.WriterAt) error
	blockWrite   func(ctx context.Context, hash string, data []byte) error
	deviceID     func() string
	blockTouch   func(hash string) error
	blockTrash   func(hash string) error
	blockUntrash func(hash string) error
	index        func(ctx context.Context, prefix string, writeTo io.Writer) error
	mtime        func(hash string) (time.Time, error)
// log records "UUU op hhh" (volume UUID suffix, operation name, hash
// prefix) in the shared stubLog.
func (v *stubVolume) log(op, hash string) {
	// Note this intentionally crashes if UUID or hash is short --
	// if keepstore ever does that, tests should fail.
	v.stubLog.Printf("%s %s %s", v.params.UUID[24:27], op, hash[:3])
// BlockRead writes the stored content of hash to writeTo, honoring
// the optional blockRead failure hook.  Missing or trashed blocks
// report os.ErrNotExist.  Data is delivered in progressively larger
// chunks (1000, 2000, 4000, ... bytes) to exercise callers' handling
// of partial writes.
func (v *stubVolume) BlockRead(ctx context.Context, hash string, writeTo io.WriterAt) error {
	if v.blockRead != nil {
		err := v.blockRead(ctx, hash, writeTo)
	ent, ok := v.data[hash]
	if !ok || !ent.trash.IsZero() {
		return os.ErrNotExist
	for writesize := 1000; wrote < len(ent.data); writesize = writesize * 2 {
		data := ent.data[wrote:]
		if len(data) > writesize {
			data = data[:writesize]
		n, err := writeTo.WriteAt(data, int64(wrote))
// BlockWrite stores an independent copy of data under hash, honoring
// the optional blockWrite failure hook.
func (v *stubVolume) BlockWrite(ctx context.Context, hash string, data []byte) error {
	if v.blockWrite != nil {
		if err := v.blockWrite(ctx, hash, data); err != nil {
	v.data[hash] = stubData{
		// Copy so later mutation of the caller's slice cannot affect
		// the stored block.
		data: append([]byte(nil), data...),
// DeviceID returns a per-instance identifier (the stub's own pointer
// address).
func (v *stubVolume) DeviceID() string {
	return fmt.Sprintf("%p", v)
// BlockTouch updates the stored block's mtime to now, honoring the
// optional blockTouch failure hook.  Missing or trashed blocks report
// os.ErrNotExist.
func (v *stubVolume) BlockTouch(hash string) error {
	if v.blockTouch != nil {
		if err := v.blockTouch(hash); err != nil {
	ent, ok := v.data[hash]
	if !ok || !ent.trash.IsZero() {
		return os.ErrNotExist
	ent.mtime = time.Now()
// Set mtime to the (presumably old) specified time.
// Test-only helper; missing blocks report os.ErrNotExist.
func (v *stubVolume) blockTouchWithTime(hash string, t time.Time) error {
	v.log("touchwithtime", hash)
	ent, ok := v.data[hash]
		return os.ErrNotExist
// BlockTrash marks the block as trash, recoverable until
// BlobTrashLifetime has elapsed.  Honors the optional blockTrash
// failure hook; missing or already-trashed blocks report
// os.ErrNotExist.
func (v *stubVolume) BlockTrash(hash string) error {
	if v.blockTrash != nil {
		if err := v.blockTrash(hash); err != nil {
	ent, ok := v.data[hash]
	if !ok || !ent.trash.IsZero() {
		return os.ErrNotExist
	ent.trash = time.Now().Add(v.params.Cluster.Collections.BlobTrashLifetime.Duration())
// BlockUntrash clears the block's trash timestamp, making it readable
// again.  Honors the optional blockUntrash failure hook; blocks that
// are missing or not trashed report os.ErrNotExist.
func (v *stubVolume) BlockUntrash(hash string) error {
	v.log("untrash", hash)
	if v.blockUntrash != nil {
		if err := v.blockUntrash(hash); err != nil {
	ent, ok := v.data[hash]
	if !ok || ent.trash.IsZero() {
		return os.ErrNotExist
	ent.trash = time.Time{}
// Index writes one "hash+size mtime-nanoseconds" line per non-trashed
// block whose hash starts with prefix.  Honors the optional index
// hook.
func (v *stubVolume) Index(ctx context.Context, prefix string, writeTo io.Writer) error {
	v.stubLog.Printf("%s index %s", v.params.UUID, prefix)
	if err := v.index(ctx, prefix, writeTo); err != nil {
	// Accumulate in a buffer so writeTo sees a single Copy.
	buf := &bytes.Buffer{}
	for hash, ent := range v.data {
		if ent.trash.IsZero() && strings.HasPrefix(hash, prefix) {
			fmt.Fprintf(buf, "%s+%d %d\n", hash, len(ent.data), ent.mtime.UnixNano())
	_, err := io.Copy(writeTo, buf)
// Mtime returns the block's last-modification time, honoring the
// optional mtime hook.  Missing or trashed blocks report
// os.ErrNotExist.
func (v *stubVolume) Mtime(hash string) (time.Time, error) {
	if t, err := v.mtime(hash); err != nil {
	ent, ok := v.data[hash]
	if !ok || !ent.trash.IsZero() {
		return time.Time{}, os.ErrNotExist
	return ent.mtime, nil
883 func (v *stubVolume) EmptyTrash() {
884 v.stubLog.Printf("%s emptytrash", v.params.UUID)
887 for hash, ent := range v.data {
888 if !ent.trash.IsZero() && time.Now().After(ent.trash) {