1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
22 "git.arvados.org/arvados.git/lib/config"
23 "git.arvados.org/arvados.git/sdk/go/arvados"
24 "git.arvados.org/arvados.git/sdk/go/arvadostest"
25 "git.arvados.org/arvados.git/sdk/go/auth"
26 "git.arvados.org/arvados.git/sdk/go/ctxlog"
27 "github.com/prometheus/client_golang/prometheus"
31 func TestGocheck(t *testing.T) {
36 fooHash = "acbd18db4cc2f85cedef654fccc4a4d8" // MD5 hex digest of "foo"
37 barHash = "37b51d194a7513e45b56f6524f2d51f2" // MD5 hex digest of "bar"
// testServiceURL supplies the fixed service URL, http://localhost:12345,
// that testKeepstore hands to newKeepstore.
40 var testServiceURL = func() arvados.URL {
41 return arvados.URL{Host: "localhost:12345", Scheme: "http"}
// authContext returns a context carrying auth.Credentials whose only
// token is the given token.
44 func authContext(token string) context.Context {
45 return auth.NewContext(context.TODO(), &auth.Credentials{Tokens: []string{token}})
// testCluster loads a configuration that contains a single, otherwise
// empty, cluster named "zzzzz", then sets that cluster's SystemRootToken
// and ManagementToken to the arvadostest fixture values.
48 func testCluster(t TB) *arvados.Cluster {
49 cfg, err := config.NewLoader(bytes.NewBufferString("Clusters: {zzzzz: {}}"), ctxlog.TestLogger(t)).Load()
// An empty cluster ID is passed here; the config defines only "zzzzz".
53 cluster, err := cfg.GetCluster("")
57 cluster.SystemRootToken = arvadostest.SystemRootToken
58 cluster.ManagementToken = arvadostest.ManagementToken
// testKeepstore calls newKeepstore with the given cluster, the cluster's
// SystemRootToken, the metrics registry, and testServiceURL. The context
// it passes is cancellable and carries the test logger. The declared
// results are the keepstore and a context.CancelFunc.
62 func testKeepstore(t TB, cluster *arvados.Cluster, reg *prometheus.Registry) (*keepstore, context.CancelFunc) {
// NOTE(review): presumably only done when the caller passed a nil
// registry (callers in this file pass nil) — confirm.
64 reg = prometheus.NewRegistry()
66 ctx, cancel := context.WithCancel(context.Background())
67 ctx = ctxlog.Context(ctx, ctxlog.TestLogger(t))
68 ks, err := newKeepstore(ctx, cluster, cluster.SystemRootToken, reg, testServiceURL)
// Register keepstoreSuite as a test suite via Suite.
75 var _ = Suite(&keepstoreSuite{})
// keepstoreSuite groups the keepstore tests in this file. Its cluster
// field is assigned by SetUpTest and then adjusted by individual tests
// before they call testKeepstore.
77 type keepstoreSuite struct {
78 cluster *arvados.Cluster
// SetUpTest gives the suite a fresh testCluster configured with two
// stub-driver volumes, each with Replication 1.
81 func (s *keepstoreSuite) SetUpTest(c *C) {
82 s.cluster = testCluster(c)
83 s.cluster.Volumes = map[string]arvados.Volume{
84 "zzzzz-nyw5e-000000000000000": {Replication: 1, Driver: "stub"},
85 "zzzzz-nyw5e-111111111111111": {Replication: 1, Driver: "stub"},
// TestBlockRead_ChecksumMismatch checks that corrupt stored data is
// reported as an error: BlockWrite is expected to fail with "hash
// collision" and BlockRead with "checksum mismatch in stored data", and
// the reader's buffer must end up with fewer than 3 bytes.
89 func (s *keepstoreSuite) TestBlockRead_ChecksumMismatch(c *C) {
90 ks, cancel := testKeepstore(c, s.cluster, nil)
93 ctx := authContext(arvadostest.ActiveTokenV2)
95 fooHash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
// Plant corrupt data: store "bar" under the hash of "foo" directly on
// the first writable mount.
96 err := ks.mountsW[0].BlockWrite(ctx, fooHash, []byte("bar"))
99 _, err = ks.BlockWrite(ctx, arvados.BlockWriteOptions{
103 c.Check(err, ErrorMatches, "hash collision")
105 buf := bytes.NewBuffer(nil)
106 _, err = ks.BlockRead(ctx, arvados.BlockReadOptions{
107 Locator: ks.signLocator(arvadostest.ActiveTokenV2, fooHash+"+3"),
110 c.Check(err, ErrorMatches, "checksum mismatch in stored data")
111 c.Check(buf.String(), Not(Equals), "foo")
112 c.Check(buf.Len() < 3, Equals, true)
// Add a good copy of "foo" on the second writable mount. The read below
// is still expected to report a checksum mismatch.
114 err = ks.mountsW[1].BlockWrite(ctx, fooHash, []byte("foo"))
117 buf = bytes.NewBuffer(nil)
118 _, err = ks.BlockRead(ctx, arvados.BlockReadOptions{
119 Locator: ks.signLocator(arvadostest.ActiveTokenV2, fooHash+"+3"),
122 c.Check(err, ErrorMatches, "checksum mismatch in stored data")
123 c.Check(buf.Len() < 3, Equals, true)
// TestBlockReadWrite_SigningDisabled runs keepstore with BlobSigning off
// and an empty BlobSigningKey. The write is expected to return the
// unsigned locator fooHash+"+3", and reads are expected to yield "foo"
// for every combination of {unsigned, signed} locator and {empty,
// arbitrary, active} token.
126 func (s *keepstoreSuite) TestBlockReadWrite_SigningDisabled(c *C) {
// Remember the configured key so a signed locator can still be built below.
127 origKey := s.cluster.Collections.BlobSigningKey
128 s.cluster.Collections.BlobSigning = false
129 s.cluster.Collections.BlobSigningKey = ""
130 ks, cancel := testKeepstore(c, s.cluster, nil)
133 resp, err := ks.BlockWrite(authContext("abcde"), arvados.BlockWriteOptions{
138 c.Check(resp.Locator, Equals, fooHash+"+3")
139 locUnsigned := resp.Locator
// Sign the locator with the original key; it must differ from the
// unsigned form.
141 locSigned := arvados.SignLocator(locUnsigned, arvadostest.ActiveTokenV2, time.Now().Add(ttl), ttl, []byte(origKey))
142 c.Assert(locSigned, Not(Equals), locUnsigned)
144 for _, locator := range []string{locUnsigned, locSigned} {
145 for _, token := range []string{"", "xyzzy", arvadostest.ActiveTokenV2} {
146 c.Logf("=== locator %q token %q", locator, token)
147 ctx := authContext(token)
148 buf := bytes.NewBuffer(nil)
149 _, err := ks.BlockRead(ctx, arvados.BlockReadOptions{
154 c.Check(buf.String(), Equals, "foo")
// TestBlockRead_OrderedByStorageClassPriority configures two volumes,
// vol1 ("...111", class1) and vol2 ("...222", class2 and class3). For
// each set of class priorities it writes a block requesting class1,
// reads it back, and compares the stub volumes' combined log with the
// expected sequence of volume reads.
159 func (s *keepstoreSuite) TestBlockRead_OrderedByStorageClassPriority(c *C) {
160 s.cluster.Volumes = map[string]arvados.Volume{
161 "zzzzz-nyw5e-111111111111111": {
164 StorageClasses: map[string]bool{"class1": true}},
165 "zzzzz-nyw5e-222222222222222": {
168 StorageClasses: map[string]bool{"class2": true, "class3": true}},
171 // "foobar" is just some data that happens to result in
172 // rendezvous order {111, 222}
173 data := []byte("foobar")
174 hash := fmt.Sprintf("%x", md5.Sum(data))
176 for _, trial := range []struct {
177 priority1 int // priority of class1, thus vol1
178 priority2 int // priority of class2
179 priority3 int // priority of class3 (vol2 priority will be max(priority2, priority3))
182 {100, 50, 50, "111 read 385\n"}, // class1 has higher priority => try vol1 first, no need to try vol2
183 {100, 100, 100, "111 read 385\n"}, // same priority, vol1 is first in rendezvous order => try vol1 first and succeed
184 {66, 99, 33, "222 read 385\n111 read 385\n"}, // class2 has higher priority => try vol2 first, then try vol1
185 {66, 33, 99, "222 read 385\n111 read 385\n"}, // class3 has highest priority => vol2 has highest => try vol2 first, then try vol1
187 c.Logf("=== %+v", trial)
// Apply this trial's priorities before starting a new keepstore.
189 s.cluster.StorageClasses = map[string]arvados.StorageClassConfig{
190 "class1": {Priority: trial.priority1},
191 "class2": {Priority: trial.priority2},
192 "class3": {Priority: trial.priority3},
194 ks, cancel := testKeepstore(c, s.cluster, nil)
197 ctx := authContext(arvadostest.ActiveTokenV2)
198 resp, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
201 StorageClasses: []string{"class1"},
205 // Combine logs into one. (We only want the logs from
206 // the BlockRead below, not from BlockWrite above.)
207 stubLog := &stubLog{}
208 for _, mnt := range ks.mounts {
209 mnt.volume.(*stubVolume).stubLog = stubLog
212 n, err := ks.BlockRead(ctx, arvados.BlockReadOptions{
213 Locator: resp.Locator,
216 c.Assert(n, Equals, len(data))
218 c.Check(stubLog.String(), Equals, trial.expectLog)
// TestBlockWrite_NoWritableVolumes expects ks.BlockWrite to fail with an
// error whose HTTPStatus is 507 (Insufficient Storage), without any stub
// volume's BlockWrite being invoked.
222 func (s *keepstoreSuite) TestBlockWrite_NoWritableVolumes(c *C) {
// Rewrite every configured volume entry in place.
// NOTE(review): presumably this marks each volume read-only, per the
// test name — confirm.
223 for uuid, v := range s.cluster.Volumes {
225 s.cluster.Volumes[uuid] = v
227 ks, cancel := testKeepstore(c, s.cluster, nil)
// Any call that reaches a stub volume's blockWrite hook is flagged as a
// test error.
229 for _, mnt := range ks.mounts {
230 mnt.volume.(*stubVolume).blockWrite = func(context.Context, string, []byte) error {
231 c.Error("volume BlockWrite called")
232 return errors.New("fail")
235 ctx := authContext(arvadostest.ActiveTokenV2)
237 _, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
239 Data: []byte("foo")})
241 c.Check(err.(interface{ HTTPStatus() int }).HTTPStatus(), Equals, http.StatusInsufficientStorage)
// TestBlockWrite_MultipleStorageClasses configures three volumes:
// "...111" (class1), "...121" (class1 and class2) and "...222" (class2).
// For each requested class list it writes the same block twice and
// compares the stub volumes' combined log with the expected sequence of
// volume operations.
244 func (s *keepstoreSuite) TestBlockWrite_MultipleStorageClasses(c *C) {
245 s.cluster.Volumes = map[string]arvados.Volume{
246 "zzzzz-nyw5e-111111111111111": {
249 StorageClasses: map[string]bool{"class1": true}},
250 "zzzzz-nyw5e-121212121212121": {
253 StorageClasses: map[string]bool{"class1": true, "class2": true}},
254 "zzzzz-nyw5e-222222222222222": {
257 StorageClasses: map[string]bool{"class2": true}},
260 // testData is a block that happens to have rendezvous order 111, 121, 222
261 testData := []byte("qux")
// testHash has the form "<md5 hex>+<size>".
262 testHash := fmt.Sprintf("%x+%d", md5.Sum(testData), len(testData))
264 s.cluster.StorageClasses = map[string]arvados.StorageClassConfig{
270 ctx := authContext(arvadostest.ActiveTokenV2)
271 for idx, trial := range []struct {
272 classes string // desired classes
282 "121 read d85\n" + // write#1
285 "121 read d85\n" + // write#2
287 {"class1,class2", "" +
288 "111 read d85\n" + // write#1
293 "111 read d85\n" + // write#2
297 {"class1,class2,class404", "" +
298 "111 read d85\n" + // write#1
303 "111 read d85\n" + // write#2
308 c.Logf("=== %d: %+v", idx, trial)
310 ks, cancel := testKeepstore(c, s.cluster, nil)
// Point every mount at one shared log so operations across volumes are
// recorded in a single sequence.
312 stubLog := &stubLog{}
313 for _, mnt := range ks.mounts {
314 mnt.volume.(*stubVolume).stubLog = stubLog
317 // Check that we chose the right block data
318 rvz := ks.rendezvous(testHash, ks.mountsW)
319 c.Assert(rvz[0].UUID[24:], Equals, "111")
320 c.Assert(rvz[1].UUID[24:], Equals, "121")
321 c.Assert(rvz[2].UUID[24:], Equals, "222")
// Write twice; the expected log distinguishes write#1 from write#2.
323 for i := 0; i < 2; i++ {
324 _, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
327 StorageClasses: strings.Split(trial.classes, ","),
331 c.Check(stubLog.String(), Equals, trial.expectLog)
// TestBlockTrash exercises ks.BlockTrash against four stub volumes: two
// writable, one read-only, and one read-only with
// AllowTrashWhenReadOnly. After each scenario it checks which volumes
// still hold the block.
335 func (s *keepstoreSuite) TestBlockTrash(c *C) {
336 s.cluster.Volumes = map[string]arvados.Volume{
337 "zzzzz-nyw5e-000000000000000": {Replication: 1, Driver: "stub"},
338 "zzzzz-nyw5e-111111111111111": {Replication: 1, Driver: "stub"},
339 "zzzzz-nyw5e-222222222222222": {Replication: 1, Driver: "stub", ReadOnly: true},
340 "zzzzz-nyw5e-333333333333333": {Replication: 1, Driver: "stub", ReadOnly: true, AllowTrashWhenReadOnly: true},
342 ks, cancel := testKeepstore(c, s.cluster, nil)
// Collect the stub volumes behind ks.mountsR and sort them by UUID, so
// vol[i] is the volume whose UUID ends in the digit i repeated.
345 var vol []*stubVolume
346 for _, mount := range ks.mountsR {
347 vol = append(vol, mount.volume.(*stubVolume))
349 sort.Slice(vol, func(i, j int) bool {
350 return vol[i].params.UUID < vol[j].params.UUID
353 ctx := context.Background()
354 loc := fooHash + "+3"
// tOld is one second older than the blob signing TTL.
355 tOld := time.Now().Add(-s.cluster.Collections.BlobSigningTTL.Duration() - time.Second)
358 for _, vol := range vol {
359 err := vol.BlockTrash(fooHash)
360 if !os.IsNotExist(err) {
// writeit stores "foo" on vol[volidx] and then sets its mtime to tOld.
365 writeit := func(volidx int) {
366 err := vol[volidx].BlockWrite(ctx, fooHash, []byte("foo"))
368 err = vol[volidx].blockTouchWithTime(fooHash, tOld)
// trashit asks keepstore (not an individual volume) to trash the block.
371 trashit := func() error {
372 return ks.BlockTrash(ctx, loc)
// checkexists reads the block directly from vol[volidx], discarding the data.
374 checkexists := func(volidx int) bool {
375 err := vol[volidx].BlockRead(ctx, fooHash, brdiscard)
376 if !os.IsNotExist(err) {
383 c.Check(trashit(), Equals, os.ErrNotExist)
385 // one old replica => trash it
388 c.Check(trashit(), IsNil)
389 c.Check(checkexists(0), Equals, false)
391 // one old replica + one new replica => keep new, trash old
395 c.Check(vol[1].blockTouchWithTime(fooHash, time.Now()), IsNil)
396 c.Check(trashit(), IsNil)
397 c.Check(checkexists(0), Equals, false)
398 c.Check(checkexists(1), Equals, true)
400 // two old replicas => trash both
404 c.Check(trashit(), IsNil)
405 c.Check(checkexists(0), Equals, false)
406 c.Check(checkexists(1), Equals, false)
408 // four old replicas => trash all except readonly volume with
409 // AllowTrashWhenReadOnly==false
415 c.Check(trashit(), IsNil)
416 c.Check(checkexists(0), Equals, false)
417 c.Check(checkexists(1), Equals, false)
418 c.Check(checkexists(2), Equals, true)
419 c.Check(checkexists(3), Equals, false)
421 // two old replicas but one returns an error => return the
422 // only non-404 backend error
424 vol[0].blockTrash = func(hash string) error {
425 return errors.New("fake error")
429 c.Check(trashit(), ErrorMatches, "fake error")
430 c.Check(checkexists(0), Equals, true)
431 c.Check(checkexists(1), Equals, false)
432 c.Check(checkexists(2), Equals, false)
433 c.Check(checkexists(3), Equals, false)
// TestBlockWrite_OnlyOneBuffer sets MaxKeepBlobBuffers to 1 and writes a
// block. The test fails with a deadlock message if the one-second
// timeout fires first.
436 func (s *keepstoreSuite) TestBlockWrite_OnlyOneBuffer(c *C) {
437 s.cluster.API.MaxKeepBlobBuffers = 1
438 ks, cancel := testKeepstore(c, s.cluster, nil)
// NOTE(review): ok is presumably signalled once the write has finished —
// confirm.
440 ok := make(chan struct{})
443 ctx := authContext(arvadostest.ActiveTokenV2)
444 _, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
446 Data: []byte("foo")})
451 case <-time.After(time.Second):
452 c.Fatal("PUT deadlocks with MaxKeepBlobBuffers==1")
// TestBufferPoolLeak sets MaxKeepBlobBuffers to 4 and performs 20
// write-then-read sequences, with each read discarding its output. The
// test fails if the one-second timeout fires, which it attributes to a
// likely buffer pool leak.
456 func (s *keepstoreSuite) TestBufferPoolLeak(c *C) {
457 s.cluster.API.MaxKeepBlobBuffers = 4
458 ks, cancel := testKeepstore(c, s.cluster, nil)
461 ctx := authContext(arvadostest.ActiveTokenV2)
// NOTE(review): the iterations presumably run concurrently and are
// tracked by wg — confirm.
462 var wg sync.WaitGroup
463 for range make([]int, 20) {
467 resp, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
469 Data: []byte("foo")})
471 _, err = ks.BlockRead(ctx, arvados.BlockReadOptions{
472 Locator: resp.Locator,
473 WriteTo: io.Discard})
477 ok := make(chan struct{})
484 case <-time.After(time.Second):
485 c.Fatal("read/write sequence deadlocks, likely buffer pool leak")
// TestPutStorageClasses writes blocks while requesting various storage
// class lists. In the success cases it checks the reported replica count
// and per-class confirmations in the response; it then runs a set of
// class lists that are treated as failure cases.
489 func (s *keepstoreSuite) TestPutStorageClasses(c *C) {
490 s.cluster.Volumes = map[string]arvados.Volume{
491 "zzzzz-nyw5e-000000000000000": {Replication: 1, Driver: "stub"}, // "default" is implicit
492 "zzzzz-nyw5e-111111111111111": {Replication: 1, Driver: "stub", StorageClasses: map[string]bool{"special": true, "extra": true}},
493 "zzzzz-nyw5e-222222222222222": {Replication: 1, Driver: "stub", StorageClasses: map[string]bool{"readonly": true}, ReadOnly: true},
495 ks, cancel := testKeepstore(c, s.cluster, nil)
497 ctx := authContext(arvadostest.ActiveTokenV2)
499 for _, trial := range []struct {
502 expectClasses map[string]int
506 map[string]int{"default": 1}},
509 map[string]int{"default": 1}},
510 {[]string{"default"},
512 map[string]int{"default": 1}},
513 {[]string{"default", "default"},
515 map[string]int{"default": 1}},
516 {[]string{"special"},
518 map[string]int{"extra": 1, "special": 1}},
519 {[]string{"special", "readonly"},
521 map[string]int{"extra": 1, "special": 1}},
522 {[]string{"special", "nonexistent"},
524 map[string]int{"extra": 1, "special": 1}},
525 {[]string{"extra", "special"},
527 map[string]int{"extra": 1, "special": 1}},
528 {[]string{"default", "special"},
530 map[string]int{"default": 1, "extra": 1, "special": 1}},
532 c.Logf("success case %#v", trial)
533 resp, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
536 StorageClasses: trial.ask,
538 if !c.Check(err, IsNil) {
541 c.Check(resp.Replicas, Equals, trial.expectReplicas)
// A trial with no expected classes only requires a non-empty result.
542 if len(trial.expectClasses) == 0 {
543 // any non-empty value is correct
544 c.Check(resp.StorageClasses, Not(HasLen), 0)
546 c.Check(resp.StorageClasses, DeepEquals, trial.expectClasses)
550 for _, ask := range [][]string{
552 {"doesnotexist", "readonly"},
555 c.Logf("failure case %s", ask)
556 _, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
// TestUntrashHandlerWithNoWritableVolumes writes and reads a block
// directly on every mount, then expects ks.BlockUntrash to return a
// not-exist error, and finally reads the block from every mount again.
565 func (s *keepstoreSuite) TestUntrashHandlerWithNoWritableVolumes(c *C) {
// Rewrite every configured volume entry in place.
// NOTE(review): presumably this marks each volume read-only, per the
// test name — confirm.
566 for uuid, v := range s.cluster.Volumes {
568 s.cluster.Volumes[uuid] = v
570 ks, cancel := testKeepstore(c, s.cluster, nil)
// Populate each mount directly, bypassing ks.BlockWrite.
573 for _, mnt := range ks.mounts {
574 err := mnt.BlockWrite(context.Background(), fooHash, []byte("foo"))
576 err = mnt.BlockRead(context.Background(), fooHash, brdiscard)
580 err := ks.BlockUntrash(context.Background(), fooHash)
581 c.Check(os.IsNotExist(err), Equals, true)
583 for _, mnt := range ks.mounts {
584 err := mnt.BlockRead(context.Background(), fooHash, brdiscard)
// TestBlockWrite_SkipReadOnly writes 32 distinct blocks to a keepstore
// that has one writable volume and two read-only volumes (one of them
// with AllowTrashWhenReadOnly). The writable volume's log must mention a
// write, and both read-only volumes' logs must be empty.
589 func (s *keepstoreSuite) TestBlockWrite_SkipReadOnly(c *C) {
590 s.cluster.Volumes = map[string]arvados.Volume{
591 "zzzzz-nyw5e-000000000000000": {Replication: 1, Driver: "stub"},
592 "zzzzz-nyw5e-111111111111111": {Replication: 1, Driver: "stub", ReadOnly: true},
593 "zzzzz-nyw5e-222222222222222": {Replication: 1, Driver: "stub", ReadOnly: true, AllowTrashWhenReadOnly: true},
595 ks, cancel := testKeepstore(c, s.cluster, nil)
597 ctx := authContext(arvadostest.ActiveTokenV2)
// Each iteration writes different content ("block 0" .. "block 31").
599 for i := range make([]byte, 32) {
600 data := []byte(fmt.Sprintf("block %d", i))
601 _, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{Data: data})
604 c.Check(ks.mounts["zzzzz-nyw5e-000000000000000"].volume.(*stubVolume).stubLog.String(), Matches, "(?ms).*write.*")
605 c.Check(ks.mounts["zzzzz-nyw5e-111111111111111"].volume.(*stubVolume).stubLog.String(), HasLen, 0)
606 c.Check(ks.mounts["zzzzz-nyw5e-222222222222222"].volume.(*stubVolume).stubLog.String(), HasLen, 0)
// TestParseLocator runs getLocatorInfo over a table of locators. The
// table includes bare hashes, hashes with a size hint, and locators with
// signature ("+A...") and remote ("+R...") hints, plus invalid inputs.
// The parsed hash, size, signed and remote fields are compared with the
// expectations.
609 func (s *keepstoreSuite) TestParseLocator(c *C) {
610 for _, trial := range []struct {
615 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
617 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+1234",
618 ok: true, expect: locatorInfo{size: 1234}},
619 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+1234+Abcdef@abcdef",
620 ok: true, expect: locatorInfo{size: 1234, signed: true}},
621 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+1234+Rzzzzz-abcdef",
622 ok: true, expect: locatorInfo{size: 1234, remote: true}},
623 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+12345+Zexample+Rzzzzz-abcdef",
624 ok: true, expect: locatorInfo{size: 12345, remote: true}},
625 // invalid: hash length != 32
628 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
630 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+1234",
632 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaabb",
634 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaabb+1234",
636 // invalid: first hint is not size
637 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+Abcdef+1234",
640 c.Logf("=== %s", trial.locator)
641 li, err := getLocatorInfo(trial.locator)
// The expected hash is the first 32 characters of the locator.
647 c.Check(li.hash, Equals, trial.locator[:32])
648 c.Check(li.size, Equals, trial.expect.size)
649 c.Check(li.signed, Equals, trial.expect.signed)
650 c.Check(li.remote, Equals, trial.expect.remote)
// Register a volume factory under the driver name "stub", the name used
// by the Volumes configs in these tests. The volume it builds starts
// with an empty data map.
655 driver["stub"] = func(params newVolumeParams) (volume, error) {
658 data: make(map[string]stubData),
665 type stubLog struct {
// Printf appends the formatted message, followed by a newline, to the log.
670 func (sl *stubLog) Printf(format string, args ...interface{}) {
676 fmt.Fprintf(sl, format+"\n", args...)
679 type stubData struct {
// stubVolume is the in-memory volume used by these tests. Blocks are
// kept in the data map, keyed by hash. Tests may set the optional hook
// funcs below to inject delays or failures into individual operations.
685 type stubVolume struct {
686 params newVolumeParams
687 data map[string]stubData
691 // The following funcs enable tests to insert delays and
692 // failures. Each volume operation begins by calling the
693 // corresponding func (if non-nil). If the func returns an
694 // error, that error is returned to caller. Otherwise, the
695 // stub continues normally.
696 blockRead func(ctx context.Context, hash string, writeTo io.WriterAt) error
697 blockWrite func(ctx context.Context, hash string, data []byte) error
698 deviceID func() string
699 blockTouch func(hash string) error
700 blockTrash func(hash string) error
701 blockUntrash func(hash string) error
702 index func(ctx context.Context, prefix string, writeTo io.Writer) error
703 mtime func(hash string) (time.Time, error)
// log appends a line of the form "<vol> <op> <hash>" to the volume's
// stubLog, where <vol> is characters 24-26 of the volume UUID and <hash>
// is the first three characters of hash.
707 func (v *stubVolume) log(op, hash string) {
708 // Note this intentionally crashes if UUID or hash is short --
709 // if keepstore ever does that, tests should fail.
710 v.stubLog.Printf("%s %s %s", v.params.UUID[24:27], op, hash[:3])
// BlockRead writes the stored data for hash to writeTo. If the blockRead
// hook is set it is called first. A block that is absent or trashed
// yields os.ErrNotExist. The data is delivered through successive
// WriteAt calls whose chunk size starts at 1000 bytes and doubles each
// time.
713 func (v *stubVolume) BlockRead(ctx context.Context, hash string, writeTo io.WriterAt) error {
715 if v.blockRead != nil {
716 err := v.blockRead(ctx, hash, writeTo)
722 ent, ok := v.data[hash]
724 if !ok || !ent.trash.IsZero() {
725 return os.ErrNotExist
728 for writesize := 1000; wrote < len(ent.data); writesize = writesize * 2 {
// Take the unwritten remainder, capped at the current chunk size.
729 data := ent.data[wrote:]
730 if len(data) > writesize {
731 data = data[:writesize]
733 n, err := writeTo.WriteAt(data, int64(wrote))
// BlockWrite stores data under hash. If the blockWrite hook is set it is
// called first. The stored entry holds its own copy of data, so later
// changes to the caller's slice do not affect it.
742 func (v *stubVolume) BlockWrite(ctx context.Context, hash string, data []byte) error {
744 if v.blockWrite != nil {
745 if err := v.blockWrite(ctx, hash, data); err != nil {
751 v.data[hash] = stubData{
753 data: append([]byte(nil), data...),
// DeviceID returns the stub's own pointer value formatted with %p, so
// each stubVolume instance reports a distinct ID.
// NOTE(review): the deviceID hook field on stubVolume is not consulted
// here, unlike the other operations' hooks — confirm whether that is
// intended.
758 func (v *stubVolume) DeviceID() string {
759 return fmt.Sprintf("%p", v)
// BlockTouch updates the block's mtime to the current time. If the
// blockTouch hook is set it is called first. A block that is absent or
// trashed yields os.ErrNotExist.
762 func (v *stubVolume) BlockTouch(hash string) error {
764 if v.blockTouch != nil {
765 if err := v.blockTouch(hash); err != nil {
771 ent, ok := v.data[hash]
772 if !ok || !ent.trash.IsZero() {
773 return os.ErrNotExist
775 ent.mtime = time.Now()
780 // blockTouchWithTime sets the block's mtime to the (presumably old) specified time; it can return os.ErrNotExist.
781 func (v *stubVolume) blockTouchWithTime(hash string, t time.Time) error {
782 v.log("touchwithtime", hash)
785 ent, ok := v.data[hash]
787 return os.ErrNotExist
// BlockTrash marks the block as trashed. If the blockTrash hook is set
// it is called first. A block that is absent or already trashed yields
// os.ErrNotExist. The entry's trash time is set to now plus the
// cluster's BlobTrashLifetime.
794 func (v *stubVolume) BlockTrash(hash string) error {
796 if v.blockTrash != nil {
797 if err := v.blockTrash(hash); err != nil {
803 ent, ok := v.data[hash]
804 if !ok || !ent.trash.IsZero() {
805 return os.ErrNotExist
807 ent.trash = time.Now().Add(v.params.Cluster.Collections.BlobTrashLifetime.Duration())
// BlockUntrash clears the block's trash time. The call is logged, then
// the blockUntrash hook is called if set. A block that is absent or not
// currently trashed yields os.ErrNotExist.
812 func (v *stubVolume) BlockUntrash(hash string) error {
813 v.log("untrash", hash)
814 if v.blockUntrash != nil {
815 if err := v.blockUntrash(hash); err != nil {
821 ent, ok := v.data[hash]
// Note the inverted trash test: only a trashed block can be untrashed.
822 if !ok || ent.trash.IsZero() {
823 return os.ErrNotExist
825 ent.trash = time.Time{}
// Index writes one line per non-trashed block whose hash starts with
// prefix, in the form "<hash>+<size> <mtime in Unix nanoseconds>". The
// lines are built in a buffer and then copied to writeTo. The call is
// logged with the full volume UUID, and the index hook is invoked before
// the listing is built.
830 func (v *stubVolume) Index(ctx context.Context, prefix string, writeTo io.Writer) error {
831 v.stubLog.Printf("%s index %s", v.params.UUID, prefix)
833 if err := v.index(ctx, prefix, writeTo); err != nil {
837 buf := &bytes.Buffer{}
// Map iteration order is unspecified, so lines come out in no fixed order.
839 for hash, ent := range v.data {
840 if ent.trash.IsZero() && strings.HasPrefix(hash, prefix) {
841 fmt.Fprintf(buf, "%s+%d %d\n", hash, len(ent.data), ent.mtime.UnixNano())
845 _, err := io.Copy(writeTo, buf)
// Mtime returns the stored mtime for hash. The mtime hook is consulted
// first. A block that is absent or trashed yields a zero time and
// os.ErrNotExist.
849 func (v *stubVolume) Mtime(hash string) (time.Time, error) {
852 if t, err := v.mtime(hash); err != nil {
858 ent, ok := v.data[hash]
859 if !ok || !ent.trash.IsZero() {
860 return time.Time{}, os.ErrNotExist
862 return ent.mtime, nil
865 func (v *stubVolume) EmptyTrash() {
866 v.stubLog.Printf("%s emptytrash", v.params.UUID)
869 for hash, ent := range v.data {
870 if !ent.trash.IsZero() && time.Now().After(ent.trash) {