2960: Refactor keepstore into a streaming server.
diff --git a/services/keepstore/keepstore_test.go b/services/keepstore/keepstore_test.go
index e1d1dc5cb3cf2eb6ed3bf0b1da0a18b154f03328..3a01476096be7fcd9d3a15758acb3e69d605289b 100644
--- a/services/keepstore/keepstore_test.go
+++ b/services/keepstore/keepstore_test.go
-package main
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package keepstore
 
 import (
        "bytes"
        "context"
+       "crypto/md5"
+       "errors"
        "fmt"
-       "io/ioutil"
+       "io"
+       "net/http"
        "os"
-       "path"
-       "regexp"
        "sort"
        "strings"
+       "sync"
        "testing"
-
-       "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+       "time"
+
+       "git.arvados.org/arvados.git/lib/config"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/arvadostest"
+       "git.arvados.org/arvados.git/sdk/go/auth"
+       "git.arvados.org/arvados.git/sdk/go/ctxlog"
+       "github.com/prometheus/client_golang/prometheus"
+       . "gopkg.in/check.v1"
 )
 
-var TestBlock = []byte("The quick brown fox jumps over the lazy dog.")
-var TestHash = "e4d909c290d0fb1ca068ffaddf22cbd0"
-var TestHashPutResp = "e4d909c290d0fb1ca068ffaddf22cbd0+44\n"
-
-var TestBlock2 = []byte("Pack my box with five dozen liquor jugs.")
-var TestHash2 = "f15ac516f788aec4f30932ffb6395c39"
-
-var TestBlock3 = []byte("Now is the time for all good men to come to the aid of their country.")
-var TestHash3 = "eed29bbffbc2dbe5e5ee0bb71888e61f"
-
-// BadBlock is used to test collisions and corruption.
-// It must not match any test hashes.
-var BadBlock = []byte("The magic words are squeamish ossifrage.")
-
-// Empty block
-var EmptyHash = "d41d8cd98f00b204e9800998ecf8427e"
-var EmptyBlock = []byte("")
+func TestGocheck(t *testing.T) {
+       TestingT(t)
+}
 
-// TODO(twp): Tests still to be written
-//
-//   * TestPutBlockFull
-//       - test that PutBlock returns 503 Full if the filesystem is full.
-//         (must mock FreeDiskSpace or Statfs? use a tmpfs?)
-//
-//   * TestPutBlockWriteErr
-//       - test the behavior when Write returns an error.
-//           - Possible solutions: use a small tmpfs and a high
-//             MIN_FREE_KILOBYTES to trick PutBlock into attempting
-//             to write a block larger than the amount of space left
-//           - use an interface to mock ioutil.TempFile with a File
-//             object that always returns an error on write
-//
-// ========================================
-// GetBlock tests.
-// ========================================
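+// fooHash and barHash are the MD5 digests of "foo" and "bar", used as
+// block hashes throughout these tests.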
+const (
+       fooHash = "acbd18db4cc2f85cedef654fccc4a4d8"
+       barHash = "37b51d194a7513e45b56f6524f2d51f2"
+)
 
-// TestGetBlock
-//     Test that simple block reads succeed.
-//
-func TestGetBlock(t *testing.T) {
-       defer teardown()
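+// testServiceURL is the advertised URL used when constructing the test
+// keepstore. These tests call keepstore methods directly, so nothing
+// needs to listen on it.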
+var testServiceURL = arvados.URL{Host: "localhost:12345", Scheme: "http"}
 
-       // Prepare two test Keep volumes. Our block is stored on the second volume.
-       KeepVM = MakeTestVolumeManager(2)
-       defer KeepVM.Close()
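+// authContext returns a context carrying the given token, as auth
+// middleware would attach it to an incoming request's context.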
+func authContext(token string) context.Context {
+       return auth.NewContext(context.TODO(), &auth.Credentials{Tokens: []string{token}})
+}
 
-       vols := KeepVM.AllReadable()
-       if err := vols[1].Put(context.Background(), TestHash, TestBlock); err != nil {
-               t.Error(err)
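+// testCluster loads a minimal single-cluster config and fills in the
+// standard test tokens.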
+func testCluster(t TB) *arvados.Cluster {
+       cfg, err := config.NewLoader(bytes.NewBufferString("Clusters: {zzzzz: {}}"), ctxlog.TestLogger(t)).Load()
+       if err != nil {
+               t.Fatal(err)
        }
-
-       // Check that GetBlock returns success.
-       buf := make([]byte, BlockSize)
-       size, err := GetBlock(context.Background(), TestHash, buf, nil)
+       cluster, err := cfg.GetCluster("")
        if err != nil {
-               t.Errorf("GetBlock error: %s", err)
+               t.Fatal(err)
        }
-       if bytes.Compare(buf[:size], TestBlock) != 0 {
-               t.Errorf("got %v, expected %v", buf[:size], TestBlock)
+       cluster.SystemRootToken = arvadostest.SystemRootToken
+       cluster.ManagementToken = arvadostest.ManagementToken
+       return cluster
+}
+
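+// testKeepstore constructs a keepstore from the given cluster config.
+// The returned cancel func cancels the context passed to newKeepstore.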
+func testKeepstore(t TB, cluster *arvados.Cluster, reg *prometheus.Registry) (*keepstore, context.CancelFunc) {
+       if reg == nil {
+               reg = prometheus.NewRegistry()
        }
+       ctx, cancel := context.WithCancel(context.Background())
+       ctx = ctxlog.Context(ctx, ctxlog.TestLogger(t))
+       ks, err := newKeepstore(ctx, cluster, cluster.SystemRootToken, reg, testServiceURL)
+       if err != nil {
+               t.Fatal(err)
+       }
+       return ks, cancel
 }
 
-// TestGetBlockMissing
-//     GetBlock must return an error when the block is not found.
-//
-func TestGetBlockMissing(t *testing.T) {
-       defer teardown()
+var _ = Suite(&keepstoreSuite{})
 
-       // Create two empty test Keep volumes.
-       KeepVM = MakeTestVolumeManager(2)
-       defer KeepVM.Close()
+type keepstoreSuite struct {
+       cluster *arvados.Cluster
+}
 
-       // Check that GetBlock returns failure.
-       buf := make([]byte, BlockSize)
-       size, err := GetBlock(context.Background(), TestHash, buf, nil)
-       if err != NotFoundError {
-               t.Errorf("Expected NotFoundError, got %v, err %v", buf[:size], err)
+func (s *keepstoreSuite) SetUpTest(c *C) {
+       s.cluster = testCluster(c)
+       s.cluster.Volumes = map[string]arvados.Volume{
+               "zzzzz-nyw5e-000000000000000": {Replication: 1, Driver: "stub"},
+               "zzzzz-nyw5e-111111111111111": {Replication: 1, Driver: "stub"},
        }
 }
 
-// TestGetBlockCorrupt
-//     GetBlock must return an error when a corrupted block is requested
-//     (the contents of the file do not checksum to its hash).
-//
-func TestGetBlockCorrupt(t *testing.T) {
-       defer teardown()
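+// TestBlockRead_ChecksumMismatch verifies that BlockRead refuses to
+// return stored data whose MD5 does not match its locator, and that
+// BlockWrite reports a hash collision rather than overwriting it.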
+func (s *keepstoreSuite) TestBlockRead_ChecksumMismatch(c *C) {
+       ks, cancel := testKeepstore(c, s.cluster, nil)
+       defer cancel()
+
+       ctx := authContext(arvadostest.ActiveTokenV2)
+
+       fooHash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+       err := ks.mountsW[0].BlockWrite(ctx, fooHash, []byte("bar"))
+       c.Assert(err, IsNil)
+
+       _, err = ks.BlockWrite(ctx, arvados.BlockWriteOptions{
+               Hash: fooHash,
+               Data: []byte("foo"),
+       })
+       c.Check(err, ErrorMatches, "hash collision")
+
+       buf := bytes.NewBuffer(nil)
+       _, err = ks.BlockRead(ctx, arvados.BlockReadOptions{
+               Locator: ks.signLocator(arvadostest.ActiveTokenV2, fooHash+"+3"),
+               WriteTo: buf,
+       })
+       c.Check(err, ErrorMatches, "checksum mismatch in stored data")
+       c.Check(buf.String(), Not(Equals), "foo")
+       c.Check(buf.Len() < 3, Equals, true)
+
+       err = ks.mountsW[1].BlockWrite(ctx, fooHash, []byte("foo"))
+       c.Assert(err, IsNil)
+
+       buf = bytes.NewBuffer(nil)
+       _, err = ks.BlockRead(ctx, arvados.BlockReadOptions{
+               Locator: ks.signLocator(arvadostest.ActiveTokenV2, fooHash+"+3"),
+               WriteTo: buf,
+       })
+       c.Check(err, ErrorMatches, "checksum mismatch in stored data")
+       c.Check(buf.Len() < 3, Equals, true)
+}
 
-       // Create two test Keep volumes and store a corrupt block in one.
-       KeepVM = MakeTestVolumeManager(2)
-       defer KeepVM.Close()
+func (s *keepstoreSuite) TestBlockReadWrite_SigningDisabled(c *C) {
+       origKey := s.cluster.Collections.BlobSigningKey
+       s.cluster.Collections.BlobSigning = false
+       s.cluster.Collections.BlobSigningKey = ""
+       ks, cancel := testKeepstore(c, s.cluster, nil)
+       defer cancel()
+
+       resp, err := ks.BlockWrite(authContext("abcde"), arvados.BlockWriteOptions{
+               Hash: fooHash,
+               Data: []byte("foo"),
+       })
+       c.Assert(err, IsNil)
+       c.Check(resp.Locator, Equals, fooHash+"+3")
+       locUnsigned := resp.Locator
+       ttl := time.Hour
+       locSigned := arvados.SignLocator(locUnsigned, arvadostest.ActiveTokenV2, time.Now().Add(ttl), ttl, []byte(origKey))
+       c.Assert(locSigned, Not(Equals), locUnsigned)
+
+       for _, locator := range []string{locUnsigned, locSigned} {
+               for _, token := range []string{"", "xyzzy", arvadostest.ActiveTokenV2} {
+                       c.Logf("=== locator %q token %q", locator, token)
+                       ctx := authContext(token)
+                       buf := bytes.NewBuffer(nil)
+                       _, err := ks.BlockRead(ctx, arvados.BlockReadOptions{
+                               Locator: locator,
+                               WriteTo: buf,
+                       })
+                       c.Check(err, IsNil)
+                       c.Check(buf.String(), Equals, "foo")
+               }
+       }
+}
 
-       vols := KeepVM.AllReadable()
-       vols[0].Put(context.Background(), TestHash, BadBlock)
+func (s *keepstoreSuite) TestBlockRead_OrderedByStorageClassPriority(c *C) {
+       s.cluster.Volumes = map[string]arvados.Volume{
+               "zzzzz-nyw5e-111111111111111": {
+                       Driver:         "stub",
+                       Replication:    1,
+                       StorageClasses: map[string]bool{"class1": true}},
+               "zzzzz-nyw5e-222222222222222": {
+                       Driver:         "stub",
+                       Replication:    1,
+                       StorageClasses: map[string]bool{"class2": true, "class3": true}},
+       }
+
+       // "foobar" is just some data that happens to result in
+       // rendezvous order {111, 222}
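+       // (Rendezvous order is the deterministic, hash-keyed volume
+       // ordering computed by ks.rendezvous.)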
+       data := []byte("foobar")
+       hash := fmt.Sprintf("%x", md5.Sum(data))
+
+       for _, trial := range []struct {
+               priority1 int // priority of class1, thus vol1
+               priority2 int // priority of class2
+               priority3 int // priority of class3 (vol2 priority will be max(priority2, priority3))
+               expectLog string
+       }{
+               {100, 50, 50, "111 read 385\n"},              // class1 has higher priority => try vol1 first, no need to try vol2
+               {100, 100, 100, "111 read 385\n"},            // same priority, vol2 is first in rendezvous order => try vol1 first and succeed
+               {66, 99, 33, "222 read 385\n111 read 385\n"}, // class2 has higher priority => try vol2 first, then try vol1
+               {66, 33, 99, "222 read 385\n111 read 385\n"}, // class3 has highest priority => vol2 has highest => try vol2 first, then try vol1
+       } {
+               c.Logf("=== %+v", trial)
+
+               s.cluster.StorageClasses = map[string]arvados.StorageClassConfig{
+                       "class1": {Priority: trial.priority1},
+                       "class2": {Priority: trial.priority2},
+                       "class3": {Priority: trial.priority3},
+               }
+               ks, cancel := testKeepstore(c, s.cluster, nil)
+               defer cancel()
+
+               ctx := authContext(arvadostest.ActiveTokenV2)
+               resp, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
+                       Hash:           hash,
+                       Data:           data,
+                       StorageClasses: []string{"class1"},
+               })
+               c.Assert(err, IsNil)
+
+               // Combine logs into one. (We only want the logs from
+               // the BlockRead below, not from BlockWrite above.)
+               stubLog := &stubLog{}
+               for _, mnt := range ks.mounts {
+                       mnt.volume.(*stubVolume).stubLog = stubLog
+               }
 
-       // Check that GetBlock returns failure.
-       buf := make([]byte, BlockSize)
-       size, err := GetBlock(context.Background(), TestHash, buf, nil)
-       if err != DiskHashError {
-               t.Errorf("Expected DiskHashError, got %v (buf: %v)", err, buf[:size])
+               n, err := ks.BlockRead(ctx, arvados.BlockReadOptions{
+                       Locator: resp.Locator,
+                       WriteTo: io.Discard,
+               })
+               c.Assert(n, Equals, len(data))
+               c.Assert(err, IsNil)
+               c.Check(stubLog.String(), Equals, trial.expectLog)
        }
 }
 
-// ========================================
-// PutBlock tests
-// ========================================
+func (s *keepstoreSuite) TestBlockWrite_NoWritableVolumes(c *C) {
+       for uuid, v := range s.cluster.Volumes {
+               v.ReadOnly = true
+               s.cluster.Volumes[uuid] = v
+       }
+       ks, cancel := testKeepstore(c, s.cluster, nil)
+       defer cancel()
+       for _, mnt := range ks.mounts {
+               mnt.volume.(*stubVolume).blockWrite = func(context.Context, string, []byte) error {
+                       c.Error("volume BlockWrite called")
+                       return errors.New("fail")
+               }
+       }
+       ctx := authContext(arvadostest.ActiveTokenV2)
 
-// TestPutBlockOK
-//     PutBlock can perform a simple block write and returns success.
-//
-func TestPutBlockOK(t *testing.T) {
-       defer teardown()
+       _, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
+               Hash: fooHash,
+               Data: []byte("foo")})
+       c.Check(err, NotNil)
+       c.Check(err.(interface{ HTTPStatus() int }).HTTPStatus(), Equals, http.StatusInsufficientStorage)
+}
 
-       // Create two test Keep volumes.
-       KeepVM = MakeTestVolumeManager(2)
-       defer KeepVM.Close()
+func (s *keepstoreSuite) TestBlockWrite_MultipleStorageClasses(c *C) {
+       s.cluster.Volumes = map[string]arvados.Volume{
+               "zzzzz-nyw5e-111111111111111": {
+                       Driver:         "stub",
+                       Replication:    1,
+                       StorageClasses: map[string]bool{"class1": true}},
+               "zzzzz-nyw5e-121212121212121": {
+                       Driver:         "stub",
+                       Replication:    1,
+                       StorageClasses: map[string]bool{"class1": true, "class2": true}},
+               "zzzzz-nyw5e-222222222222222": {
+                       Driver:         "stub",
+                       Replication:    1,
+                       StorageClasses: map[string]bool{"class2": true}},
+       }
+
+       // testData is a block that happens to have rendezvous order 111, 121, 222
+       testData := []byte("qux")
+       testHash := fmt.Sprintf("%x+%d", md5.Sum(testData), len(testData))
+
+       s.cluster.StorageClasses = map[string]arvados.StorageClassConfig{
+               "class1": {},
+               "class2": {},
+               "class3": {},
+       }
+
+       ctx := authContext(arvadostest.ActiveTokenV2)
+       for idx, trial := range []struct {
+               classes   string // desired classes
+               expectLog string
+       }{
+               {"class1", "" +
+                       "111 read d85\n" +
+                       "121 read d85\n" +
+                       "111 write d85\n" +
+                       "111 read d85\n" +
+                       "111 touch d85\n"},
+               {"class2", "" +
+                       "121 read d85\n" + // write#1
+                       "222 read d85\n" +
+                       "121 write d85\n" +
+                       "121 read d85\n" + // write#2
+                       "121 touch d85\n"},
+               {"class1,class2", "" +
+                       "111 read d85\n" + // write#1
+                       "121 read d85\n" +
+                       "222 read d85\n" +
+                       "121 write d85\n" +
+                       "111 write d85\n" +
+                       "111 read d85\n" + // write#2
+                       "111 touch d85\n" +
+                       "121 read d85\n" +
+                       "121 touch d85\n"},
+               {"class1,class2,class404", "" +
+                       "111 read d85\n" + // write#1
+                       "121 read d85\n" +
+                       "222 read d85\n" +
+                       "121 write d85\n" +
+                       "111 write d85\n" +
+                       "111 read d85\n" + // write#2
+                       "111 touch d85\n" +
+                       "121 read d85\n" +
+                       "121 touch d85\n"},
+       } {
+               c.Logf("=== %d: %+v", idx, trial)
+
+               ks, cancel := testKeepstore(c, s.cluster, nil)
+               defer cancel()
+               stubLog := &stubLog{}
+               for _, mnt := range ks.mounts {
+                       mnt.volume.(*stubVolume).stubLog = stubLog
+               }
 
-       // Check that PutBlock stores the data as expected.
-       if n, err := PutBlock(context.Background(), TestBlock, TestHash); err != nil || n < 1 {
-               t.Fatalf("PutBlock: n %d err %v", n, err)
+               // Check that testData really does produce the expected
+               // rendezvous order among these volumes
+               rvz := ks.rendezvous(testHash, ks.mountsW)
+               c.Assert(rvz[0].UUID[24:], Equals, "111")
+               c.Assert(rvz[1].UUID[24:], Equals, "121")
+               c.Assert(rvz[2].UUID[24:], Equals, "222")
+
+               for i := 0; i < 2; i++ {
+                       _, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
+                               Hash:           testHash,
+                               Data:           testData,
+                               StorageClasses: strings.Split(trial.classes, ","),
+                       })
+                       c.Check(err, IsNil)
+               }
+               c.Check(stubLog.String(), Equals, trial.expectLog)
        }
+}
 
-       vols := KeepVM.AllReadable()
-       buf := make([]byte, BlockSize)
-       n, err := vols[1].Get(context.Background(), TestHash, buf)
-       if err != nil {
-               t.Fatalf("Volume #0 Get returned error: %v", err)
+func (s *keepstoreSuite) TestBlockTrash(c *C) {
+       s.cluster.Volumes = map[string]arvados.Volume{
+               "zzzzz-nyw5e-000000000000000": {Replication: 1, Driver: "stub"},
+               "zzzzz-nyw5e-111111111111111": {Replication: 1, Driver: "stub"},
+               "zzzzz-nyw5e-222222222222222": {Replication: 1, Driver: "stub", ReadOnly: true},
+               "zzzzz-nyw5e-333333333333333": {Replication: 1, Driver: "stub", ReadOnly: true, AllowTrashWhenReadOnly: true},
+       }
+       ks, cancel := testKeepstore(c, s.cluster, nil)
+       defer cancel()
+
+       var vol []*stubVolume
+       for _, mount := range ks.mountsR {
+               vol = append(vol, mount.volume.(*stubVolume))
+       }
+       sort.Slice(vol, func(i, j int) bool {
+               return vol[i].params.UUID < vol[j].params.UUID
+       })
+
+       ctx := context.Background()
+       loc := fooHash + "+3"
+       tOld := time.Now().Add(-s.cluster.Collections.BlobSigningTTL.Duration() - time.Second)
+
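+       // Helpers: clear removes foo from every volume; writeit stores an
+       // old (signature-expired) copy on the given volume; trashit calls
+       // BlockTrash; checkexists reports whether a volume still has a
+       // readable copy.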
+       clear := func() {
+               for _, vol := range vol {
+                       err := vol.BlockTrash(fooHash)
+                       if !os.IsNotExist(err) {
+                               c.Assert(err, IsNil)
+                       }
+               }
        }
-       if string(buf[:n]) != string(TestBlock) {
-               t.Fatalf("PutBlock stored '%s', Get retrieved '%s'",
-                       string(TestBlock), string(buf[:n]))
+       writeit := func(volidx int) {
+               err := vol[volidx].BlockWrite(ctx, fooHash, []byte("foo"))
+               c.Assert(err, IsNil)
+               err = vol[volidx].blockTouchWithTime(fooHash, tOld)
+               c.Assert(err, IsNil)
        }
+       trashit := func() error {
+               return ks.BlockTrash(ctx, loc)
+       }
+       checkexists := func(volidx int) bool {
+               _, err := vol[volidx].BlockRead(ctx, fooHash, io.Discard)
+               if !os.IsNotExist(err) {
+                       c.Check(err, IsNil)
+               }
+               return err == nil
+       }
+
+       clear()
+       c.Check(trashit(), Equals, os.ErrNotExist)
+
+       // one old replica => trash it
+       clear()
+       writeit(0)
+       c.Check(trashit(), IsNil)
+       c.Check(checkexists(0), Equals, false)
+
+       // one old replica + one new replica => keep new, trash old
+       clear()
+       writeit(0)
+       writeit(1)
+       c.Check(vol[1].blockTouchWithTime(fooHash, time.Now()), IsNil)
+       c.Check(trashit(), IsNil)
+       c.Check(checkexists(0), Equals, false)
+       c.Check(checkexists(1), Equals, true)
+
+       // two old replicas => trash both
+       clear()
+       writeit(0)
+       writeit(1)
+       c.Check(trashit(), IsNil)
+       c.Check(checkexists(0), Equals, false)
+       c.Check(checkexists(1), Equals, false)
+
+       // four old replicas => trash all except readonly volume with
+       // AllowTrashWhenReadOnly==false
+       clear()
+       writeit(0)
+       writeit(1)
+       writeit(2)
+       writeit(3)
+       c.Check(trashit(), IsNil)
+       c.Check(checkexists(0), Equals, false)
+       c.Check(checkexists(1), Equals, false)
+       c.Check(checkexists(2), Equals, true)
+       c.Check(checkexists(3), Equals, false)
+
+       // two old replicas but one returns an error => return the
+       // only non-404 backend error
+       clear()
+       vol[0].blockTrash = func(hash string) error {
+               return errors.New("fake error")
+       }
+       writeit(0)
+       writeit(3)
+       c.Check(trashit(), ErrorMatches, "fake error")
+       c.Check(checkexists(0), Equals, true)
+       c.Check(checkexists(1), Equals, false)
+       c.Check(checkexists(2), Equals, false)
+       c.Check(checkexists(3), Equals, false)
 }
 
-// TestPutBlockOneVol
-//     PutBlock still returns success even when only one of the known
-//     volumes is online.
-//
-func TestPutBlockOneVol(t *testing.T) {
-       defer teardown()
-
-       // Create two test Keep volumes, but cripple one of them.
-       KeepVM = MakeTestVolumeManager(2)
-       defer KeepVM.Close()
-
-       vols := KeepVM.AllWritable()
-       vols[0].(*MockVolume).Bad = true
-
-       // Check that PutBlock stores the data as expected.
-       if n, err := PutBlock(context.Background(), TestBlock, TestHash); err != nil || n < 1 {
-               t.Fatalf("PutBlock: n %d err %v", n, err)
+func (s *keepstoreSuite) TestBlockWrite_OnlyOneBuffer(c *C) {
+       s.cluster.API.MaxKeepBlobBuffers = 1
+       ks, cancel := testKeepstore(c, s.cluster, nil)
+       defer cancel()
+       ok := make(chan struct{})
+       go func() {
+               defer close(ok)
+               ctx := authContext(arvadostest.ActiveTokenV2)
+               _, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
+                       Hash: fooHash,
+                       Data: []byte("foo")})
+               c.Check(err, IsNil)
+       }()
+       select {
+       case <-ok:
+       case <-time.After(time.Second):
+               c.Fatal("PUT deadlocks with MaxKeepBlobBuffers==1")
        }
+}
 
-       buf := make([]byte, BlockSize)
-       size, err := GetBlock(context.Background(), TestHash, buf, nil)
-       if err != nil {
-               t.Fatalf("GetBlock: %v", err)
-       }
-       if bytes.Compare(buf[:size], TestBlock) != 0 {
-               t.Fatalf("PutBlock stored %+q, GetBlock retrieved %+q",
-                       TestBlock, buf[:size])
+func (s *keepstoreSuite) TestBufferPoolLeak(c *C) {
+       s.cluster.API.MaxKeepBlobBuffers = 4
+       ks, cancel := testKeepstore(c, s.cluster, nil)
+       defer cancel()
+
+       ctx := authContext(arvadostest.ActiveTokenV2)
+       var wg sync.WaitGroup
+       for range make([]int, 20) {
+               wg.Add(1)
+               go func() {
+                       defer wg.Done()
+                       resp, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
+                               Hash: fooHash,
+                               Data: []byte("foo")})
+                       c.Check(err, IsNil)
+                       _, err = ks.BlockRead(ctx, arvados.BlockReadOptions{
+                               Locator: resp.Locator,
+                               WriteTo: io.Discard})
+                       c.Check(err, IsNil)
+               }()
+       }
+       ok := make(chan struct{})
+       go func() {
+               wg.Wait()
+               close(ok)
+       }()
+       select {
+       case <-ok:
+       case <-time.After(time.Second):
+               c.Fatal("read/write sequence deadlocks, likely buffer pool leak")
        }
 }
 
-// TestPutBlockMD5Fail
-//     Check that PutBlock returns an error if passed a block and hash that
-//     do not match.
-//
-func TestPutBlockMD5Fail(t *testing.T) {
-       defer teardown()
+func (s *keepstoreSuite) TestPutStorageClasses(c *C) {
+       s.cluster.Volumes = map[string]arvados.Volume{
+               "zzzzz-nyw5e-000000000000000": {Replication: 1, Driver: "stub"}, // "default" is implicit
+               "zzzzz-nyw5e-111111111111111": {Replication: 1, Driver: "stub", StorageClasses: map[string]bool{"special": true, "extra": true}},
+               "zzzzz-nyw5e-222222222222222": {Replication: 1, Driver: "stub", StorageClasses: map[string]bool{"readonly": true}, ReadOnly: true},
+       }
+       ks, cancel := testKeepstore(c, s.cluster, nil)
+       defer cancel()
+       ctx := authContext(arvadostest.ActiveTokenV2)
+
+       for _, trial := range []struct {
+               ask            []string
+               expectReplicas int
+               expectClasses  map[string]int
+       }{
+               {nil,
+                       1,
+                       map[string]int{"default": 1}},
+               {[]string{},
+                       1,
+                       map[string]int{"default": 1}},
+               {[]string{"default"},
+                       1,
+                       map[string]int{"default": 1}},
+               {[]string{"default", "default"},
+                       1,
+                       map[string]int{"default": 1}},
+               {[]string{"special"},
+                       1,
+                       map[string]int{"extra": 1, "special": 1}},
+               {[]string{"special", "readonly"},
+                       1,
+                       map[string]int{"extra": 1, "special": 1}},
+               {[]string{"special", "nonexistent"},
+                       1,
+                       map[string]int{"extra": 1, "special": 1}},
+               {[]string{"extra", "special"},
+                       1,
+                       map[string]int{"extra": 1, "special": 1}},
+               {[]string{"default", "special"},
+                       2,
+                       map[string]int{"default": 1, "extra": 1, "special": 1}},
+       } {
+               c.Logf("success case %#v", trial)
+               resp, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
+                       Hash:           fooHash,
+                       Data:           []byte("foo"),
+                       StorageClasses: trial.ask,
+               })
+               if !c.Check(err, IsNil) {
+                       continue
+               }
+               c.Check(resp.Replicas, Equals, trial.expectReplicas)
+               if len(trial.expectClasses) == 0 {
+                       // any non-empty value is correct
+                       c.Check(resp.StorageClasses, Not(HasLen), 0)
+               } else {
+                       c.Check(resp.StorageClasses, DeepEquals, trial.expectClasses)
+               }
+       }
 
-       // Create two test Keep volumes.
-       KeepVM = MakeTestVolumeManager(2)
-       defer KeepVM.Close()
+       for _, ask := range [][]string{
+               {"doesnotexist"},
+               {"doesnotexist", "readonly"},
+               {"readonly"},
+       } {
+               c.Logf("failure case %s", ask)
+               _, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
+                       Hash:           fooHash,
+                       Data:           []byte("foo"),
+                       StorageClasses: ask,
+               })
+               c.Check(err, NotNil)
+       }
+}
 
-       // Check that PutBlock returns the expected error when the hash does
-       // not match the block.
-       if _, err := PutBlock(context.Background(), BadBlock, TestHash); err != RequestHashError {
-               t.Errorf("Expected RequestHashError, got %v", err)
+func (s *keepstoreSuite) TestUntrashHandlerWithNoWritableVolumes(c *C) {
+       for uuid, v := range s.cluster.Volumes {
+               v.ReadOnly = true
+               s.cluster.Volumes[uuid] = v
        }
+       ks, cancel := testKeepstore(c, s.cluster, nil)
+       defer cancel()
 
-       // Confirm that GetBlock fails to return anything.
-       if result, err := GetBlock(context.Background(), TestHash, make([]byte, BlockSize), nil); err != NotFoundError {
-               t.Errorf("GetBlock succeeded after a corrupt block store (result = %s, err = %v)",
-                       string(result), err)
+       for _, mnt := range ks.mounts {
+               err := mnt.BlockWrite(context.Background(), fooHash, []byte("foo"))
+               c.Assert(err, IsNil)
+               _, err = mnt.BlockRead(context.Background(), fooHash, io.Discard)
+               c.Assert(err, IsNil)
        }
-}
 
-// TestPutBlockCorrupt
-//     PutBlock should overwrite corrupt blocks on disk when given
-//     a PUT request with a good block.
-//
-func TestPutBlockCorrupt(t *testing.T) {
-       defer teardown()
+       err := ks.BlockUntrash(context.Background(), fooHash)
+       c.Check(os.IsNotExist(err), Equals, true)
 
-       // Create two test Keep volumes.
-       KeepVM = MakeTestVolumeManager(2)
-       defer KeepVM.Close()
+       for _, mnt := range ks.mounts {
+               _, err := mnt.BlockRead(context.Background(), fooHash, io.Discard)
+               c.Assert(err, IsNil)
+       }
+}
 
-       // Store a corrupted block under TestHash.
-       vols := KeepVM.AllWritable()
-       vols[0].Put(context.Background(), TestHash, BadBlock)
-       if n, err := PutBlock(context.Background(), TestBlock, TestHash); err != nil || n < 1 {
-               t.Errorf("PutBlock: n %d err %v", n, err)
+func (s *keepstoreSuite) TestBlockWrite_SkipReadOnly(c *C) {
+       s.cluster.Volumes = map[string]arvados.Volume{
+               "zzzzz-nyw5e-000000000000000": {Replication: 1, Driver: "stub"},
+               "zzzzz-nyw5e-111111111111111": {Replication: 1, Driver: "stub", ReadOnly: true},
+               "zzzzz-nyw5e-222222222222222": {Replication: 1, Driver: "stub", ReadOnly: true, AllowTrashWhenReadOnly: true},
        }
+       ks, cancel := testKeepstore(c, s.cluster, nil)
+       defer cancel()
+       ctx := authContext(arvadostest.ActiveTokenV2)
 
-       // The block on disk should now match TestBlock.
-       buf := make([]byte, BlockSize)
-       if size, err := GetBlock(context.Background(), TestHash, buf, nil); err != nil {
-               t.Errorf("GetBlock: %v", err)
-       } else if bytes.Compare(buf[:size], TestBlock) != 0 {
-               t.Errorf("Got %+q, expected %+q", buf[:size], TestBlock)
+       for i := range make([]byte, 32) {
+               data := []byte(fmt.Sprintf("block %d", i))
+               _, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{Data: data})
+               c.Assert(err, IsNil)
        }
+       c.Check(ks.mounts["zzzzz-nyw5e-000000000000000"].volume.(*stubVolume).stubLog.String(), Matches, "(?ms).*write.*")
+       c.Check(ks.mounts["zzzzz-nyw5e-111111111111111"].volume.(*stubVolume).stubLog.String(), HasLen, 0)
+       c.Check(ks.mounts["zzzzz-nyw5e-222222222222222"].volume.(*stubVolume).stubLog.String(), HasLen, 0)
 }
 
-// TestPutBlockCollision
-//     PutBlock returns a 400 Collision error when attempting to
-//     store a block that collides with another block on disk.
-//
-func TestPutBlockCollision(t *testing.T) {
-       defer teardown()
+func (s *keepstoreSuite) TestParseLocator(c *C) {
+       for _, trial := range []struct {
+               locator string
+               ok      bool
+               expect  locatorInfo
+       }{
+               {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
+                       ok: true},
+               {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+1234",
+                       ok: true, expect: locatorInfo{size: 1234}},
+               {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+1234+Abcdef@abcdef",
+                       ok: true, expect: locatorInfo{size: 1234, signed: true}},
+               {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+1234+Rzzzzz-abcdef",
+                       ok: true, expect: locatorInfo{size: 1234, remote: true}},
+               {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+12345+Zexample+Rzzzzz-abcdef",
+                       ok: true, expect: locatorInfo{size: 12345, remote: true}},
+               // invalid: hash length != 32
+               {locator: "",
+                       ok: false},
+               {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
+                       ok: false},
+               {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+1234",
+                       ok: false},
+               {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaabb",
+                       ok: false},
+               {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaabb+1234",
+                       ok: false},
+               // invalid: first hint is not size
+               {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+Abcdef+1234",
+                       ok: false},
+       } {
+               c.Logf("=== %s", trial.locator)
+               li, err := parseLocator(trial.locator)
+               if !trial.ok {
+                       c.Check(err, NotNil)
+                       continue
+               }
+               c.Check(err, IsNil)
+               c.Check(li.hash, Equals, trial.locator[:32])
+               c.Check(li.size, Equals, trial.expect.size)
+               c.Check(li.signed, Equals, trial.expect.signed)
+               c.Check(li.remote, Equals, trial.expect.remote)
+       }
+}
 
-       // These blocks both hash to the MD5 digest cee9a457e790cf20d4bdaa6d69f01e41.
-       b1 := arvadostest.MD5CollisionData[0]
-       b2 := arvadostest.MD5CollisionData[1]
-       locator := arvadostest.MD5CollisionMD5
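+// init registers a "stub" volume driver: an in-memory volume that logs
+// every operation to a stubLog so tests can assert on operation order.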
+func init() {
+       driver["stub"] = func(params newVolumeParams) (volume, error) {
+               v := &stubVolume{
+                       params:  params,
+                       data:    make(map[string]stubData),
+                       stubLog: &stubLog{},
+               }
+               return v, nil
+       }
+}
 
-       // Prepare two test Keep volumes.
-       KeepVM = MakeTestVolumeManager(2)
-       defer KeepVM.Close()
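+// stubLog collects one line per volume operation, in the order the
+// operations occur; Printf is safe to call from multiple goroutines.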
+type stubLog struct {
+       sync.Mutex
+       bytes.Buffer
+}
 
-       // Store one block, then attempt to store the other. Confirm that
-       // PutBlock reported a CollisionError.
-       if _, err := PutBlock(context.Background(), b1, locator); err != nil {
-               t.Error(err)
-       }
-       if _, err := PutBlock(context.Background(), b2, locator); err == nil {
-               t.Error("PutBlock did not report a collision")
-       } else if err != CollisionError {
-               t.Errorf("PutBlock returned %v", err)
+func (sl *stubLog) Printf(format string, args ...interface{}) {
+       if sl == nil {
+               return
        }
+       sl.Lock()
+       defer sl.Unlock()
+       fmt.Fprintf(sl, format+"\n", args...)
 }
 
-// TestPutBlockTouchFails
-//     When PutBlock is asked to PUT an existing block, but cannot
-//     modify the timestamp, it should write a second block.
-//
-func TestPutBlockTouchFails(t *testing.T) {
-       defer teardown()
-
-       // Prepare two test Keep volumes.
-       KeepVM = MakeTestVolumeManager(2)
-       defer KeepVM.Close()
-       vols := KeepVM.AllWritable()
-
-       // Store a block and then make the underlying volume bad,
-       // so a subsequent attempt to update the file timestamp
-       // will fail.
-       vols[0].Put(context.Background(), TestHash, BadBlock)
-       oldMtime, err := vols[0].Mtime(TestHash)
-       if err != nil {
-               t.Fatalf("vols[0].Mtime(%s): %s\n", TestHash, err)
-       }
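+// stubData is one stored block: its payload, mtime, and, if trashed,
+// the time after which EmptyTrash may delete it.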
+type stubData struct {
+       mtime time.Time
+       data  []byte
+       trash time.Time
+}
 
-       // vols[0].Touch will fail on the next call, so the volume
-       // manager will store a copy on vols[1] instead.
-       vols[0].(*MockVolume).Touchable = false
-       if n, err := PutBlock(context.Background(), TestBlock, TestHash); err != nil || n < 1 {
-               t.Fatalf("PutBlock: n %d err %v", n, err)
-       }
-       vols[0].(*MockVolume).Touchable = true
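+// stubVolume is the in-memory volume implementation behind the "stub"
+// driver. Trashed blocks remain in data with a nonzero trash time until
+// EmptyTrash deletes them.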
+type stubVolume struct {
+       params  newVolumeParams
+       data    map[string]stubData
+       stubLog *stubLog
+       mtx     sync.Mutex
+
+       // The following funcs let tests inject delays and failures.
+       // Each volume operation begins by calling the corresponding
+       // func (if non-nil). If the func returns an error, that error
+       // is returned to the caller; otherwise the stub continues
+       // normally.
+       blockRead    func(ctx context.Context, hash string, writeTo io.Writer) (int, error)
+       blockWrite   func(ctx context.Context, hash string, data []byte) error
+       deviceID     func() string
+       blockTouch   func(hash string) error
+       blockTrash   func(hash string) error
+       blockUntrash func(hash string) error
+       index        func(ctx context.Context, prefix string, writeTo io.Writer) error
+       mtime        func(hash string) (time.Time, error)
+       emptyTrash   func()
+}
 
-       // Now the mtime on the block on vols[0] should be unchanged, and
-       // there should be a copy of the block on vols[1].
-       newMtime, err := vols[0].Mtime(TestHash)
-       if err != nil {
-               t.Fatalf("vols[0].Mtime(%s): %s\n", TestHash, err)
+func (v *stubVolume) log(op, hash string) {
+       // Note the slice expressions intentionally panic if the UUID or
+       // hash is too short -- if keepstore ever passes one, tests
+       // should fail.
+       v.stubLog.Printf("%s %s %s", v.params.UUID[24:27], op, hash[:3])
+}
+
+func (v *stubVolume) BlockRead(ctx context.Context, hash string, writeTo io.Writer) (int, error) {
+       v.log("read", hash)
+       if v.blockRead != nil {
+               n, err := v.blockRead(ctx, hash, writeTo)
+               if err != nil {
+                       return n, err
+               }
        }
-       if !newMtime.Equal(oldMtime) {
-               t.Errorf("mtime was changed on vols[0]:\noldMtime = %v\nnewMtime = %v\n",
-                       oldMtime, newMtime)
+       v.mtx.Lock()
+       ent, ok := v.data[hash]
+       v.mtx.Unlock()
+       if !ok || !ent.trash.IsZero() {
+               return 0, os.ErrNotExist
+       }
+       wrote := 0
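+       // Write the data in chunks of doubling size, so callers see
+       // multiple short writes as a streaming backend would produce.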
+       for writesize := 1000; wrote < len(ent.data); writesize = writesize * 2 {
+               data := ent.data[wrote:]
+               if len(data) > writesize {
+                       data = data[:writesize]
+               }
+               n, err := writeTo.Write(data)
+               wrote += n
+               if err != nil {
+                       return wrote, err
+               }
        }
-       buf := make([]byte, BlockSize)
-       n, err := vols[1].Get(context.Background(), TestHash, buf)
-       if err != nil {
-               t.Fatalf("vols[1]: %v", err)
+       return wrote, nil
+}
+
+func (v *stubVolume) BlockWrite(ctx context.Context, hash string, data []byte) error {
+       v.log("write", hash)
+       if v.blockWrite != nil {
+               if err := v.blockWrite(ctx, hash, data); err != nil {
+                       return err
+               }
        }
-       if bytes.Compare(buf[:n], TestBlock) != 0 {
-               t.Errorf("new block does not match test block\nnew block = %v\n", buf[:n])
+       v.mtx.Lock()
+       defer v.mtx.Unlock()
+       v.data[hash] = stubData{
+               mtime: time.Now(),
+               data:  append([]byte(nil), data...),
        }
+       return nil
 }
 
-func TestDiscoverTmpfs(t *testing.T) {
-       var tempVols [4]string
-       var err error
+func (v *stubVolume) DeviceID() string {
+       return fmt.Sprintf("%p", v)
+}
 
-       // Create some directories suitable for using as keep volumes.
-       for i := range tempVols {
-               if tempVols[i], err = ioutil.TempDir("", "findvol"); err != nil {
-                       t.Fatal(err)
-               }
-               defer os.RemoveAll(tempVols[i])
-               tempVols[i] = tempVols[i] + "/keep"
-               if err = os.Mkdir(tempVols[i], 0755); err != nil {
-                       t.Fatal(err)
+func (v *stubVolume) BlockTouch(hash string) error {
+       v.log("touch", hash)
+       if v.blockTouch != nil {
+               if err := v.blockTouch(hash); err != nil {
+                       return err
                }
        }
-
-       // Set up a bogus ProcMounts file.
-       f, err := ioutil.TempFile("", "keeptest")
-       if err != nil {
-               t.Fatal(err)
-       }
-       defer os.Remove(f.Name())
-       for i, vol := range tempVols {
-               // Add readonly mount points at odd indexes.
-               var opts string
-               switch i % 2 {
-               case 0:
-                       opts = "rw,nosuid,nodev,noexec"
-               case 1:
-                       opts = "nosuid,nodev,noexec,ro"
-               }
-               fmt.Fprintf(f, "tmpfs %s tmpfs %s 0 0\n", path.Dir(vol), opts)
+       v.mtx.Lock()
+       defer v.mtx.Unlock()
+       ent, ok := v.data[hash]
+       if !ok || !ent.trash.IsZero() {
+               return os.ErrNotExist
        }
-       f.Close()
-       ProcMounts = f.Name()
+       ent.mtime = time.Now()
+       v.data[hash] = ent
+       return nil
+}
 
-       cfg := &Config{}
-       added := (&unixVolumeAdder{cfg}).Discover()
+// blockTouchWithTime sets the block's stored mtime to the specified
+// (typically old) time.
+func (v *stubVolume) blockTouchWithTime(hash string, t time.Time) error {
+       v.log("touchwithtime", hash)
+       v.mtx.Lock()
+       defer v.mtx.Unlock()
+       ent, ok := v.data[hash]
+       if !ok {
+               return os.ErrNotExist
+       }
+       ent.mtime = t
+       v.data[hash] = ent
+       return nil
+}
 
-       if added != len(cfg.Volumes) {
-               t.Errorf("Discover returned %d, but added %d volumes",
-                       added, len(cfg.Volumes))
+func (v *stubVolume) BlockTrash(hash string) error {
+       v.log("trash", hash)
+       if v.blockTrash != nil {
+               if err := v.blockTrash(hash); err != nil {
+                       return err
+               }
        }
-       if added != len(tempVols) {
-               t.Errorf("Discover returned %d but we set up %d volumes",
-                       added, len(tempVols))
+       v.mtx.Lock()
+       defer v.mtx.Unlock()
+       ent, ok := v.data[hash]
+       if !ok || !ent.trash.IsZero() {
+               return os.ErrNotExist
        }
-       for i, tmpdir := range tempVols {
-               if tmpdir != cfg.Volumes[i].(*UnixVolume).Root {
-                       t.Errorf("Discover returned %s, expected %s\n",
-                               cfg.Volumes[i].(*UnixVolume).Root, tmpdir)
-               }
-               if expectReadonly := i%2 == 1; expectReadonly != cfg.Volumes[i].(*UnixVolume).ReadOnly {
-                       t.Errorf("Discover added %s with readonly=%v, should be %v",
-                               tmpdir, !expectReadonly, expectReadonly)
+       ent.trash = time.Now().Add(v.params.Cluster.Collections.BlobTrashLifetime.Duration())
+       v.data[hash] = ent
+       return nil
+}
+
+func (v *stubVolume) BlockUntrash(hash string) error {
+       v.log("untrash", hash)
+       if v.blockUntrash != nil {
+               if err := v.blockUntrash(hash); err != nil {
+                       return err
                }
        }
+       v.mtx.Lock()
+       defer v.mtx.Unlock()
+       ent, ok := v.data[hash]
+       if !ok || ent.trash.IsZero() {
+               return os.ErrNotExist
+       }
+       ent.trash = time.Time{}
+       v.data[hash] = ent
+       return nil
 }
 
-func TestDiscoverNone(t *testing.T) {
-       defer teardown()
-
-       // Set up a bogus ProcMounts file with no Keep vols.
-       f, err := ioutil.TempFile("", "keeptest")
-       if err != nil {
-               t.Fatal(err)
+func (v *stubVolume) Index(ctx context.Context, prefix string, writeTo io.Writer) error {
+       v.stubLog.Printf("%s index %s", v.params.UUID, prefix)
+       if v.index != nil {
+               if err := v.index(ctx, prefix, writeTo); err != nil {
+                       return err
+               }
        }
-       defer os.Remove(f.Name())
-       fmt.Fprintln(f, "rootfs / rootfs opts 0 0")
-       fmt.Fprintln(f, "sysfs /sys sysfs opts 0 0")
-       fmt.Fprintln(f, "proc /proc proc opts 0 0")
-       fmt.Fprintln(f, "udev /dev devtmpfs opts 0 0")
-       fmt.Fprintln(f, "devpts /dev/pts devpts opts 0 0")
-       f.Close()
-       ProcMounts = f.Name()
-
-       cfg := &Config{}
-       added := (&unixVolumeAdder{cfg}).Discover()
-       if added != 0 || len(cfg.Volumes) != 0 {
-               t.Fatalf("got %d, %v; expected 0, []", added, cfg.Volumes)
-       }
-}
-
-// TestIndex
-//     Test an /index request.
-func TestIndex(t *testing.T) {
-       defer teardown()
-
-       // Set up Keep volumes and populate them.
-       // Include multiple blocks on different volumes, and
-       // some metadata files.
-       KeepVM = MakeTestVolumeManager(2)
-       defer KeepVM.Close()
-
-       vols := KeepVM.AllReadable()
-       vols[0].Put(context.Background(), TestHash, TestBlock)
-       vols[1].Put(context.Background(), TestHash2, TestBlock2)
-       vols[0].Put(context.Background(), TestHash3, TestBlock3)
-       vols[0].Put(context.Background(), TestHash+".meta", []byte("metadata"))
-       vols[1].Put(context.Background(), TestHash2+".meta", []byte("metadata"))
-
-       buf := new(bytes.Buffer)
-       vols[0].IndexTo("", buf)
-       vols[1].IndexTo("", buf)
-       indexRows := strings.Split(string(buf.Bytes()), "\n")
-       sort.Strings(indexRows)
-       sortedIndex := strings.Join(indexRows, "\n")
-       expected := `^\n` + TestHash + `\+\d+ \d+\n` +
-               TestHash3 + `\+\d+ \d+\n` +
-               TestHash2 + `\+\d+ \d+$`
-
-       match, err := regexp.MatchString(expected, sortedIndex)
-       if err == nil {
-               if !match {
-                       t.Errorf("IndexLocators returned:\n%s", string(buf.Bytes()))
+       buf := &bytes.Buffer{}
+       v.mtx.Lock()
+       for hash, ent := range v.data {
+               if ent.trash.IsZero() && strings.HasPrefix(hash, prefix) {
+                       fmt.Fprintf(buf, "%s+%d %d\n", hash, len(ent.data), ent.mtime.UnixNano())
                }
-       } else {
-               t.Errorf("regexp.MatchString: %s", err)
        }
+       v.mtx.Unlock()
+       _, err := io.Copy(writeTo, buf)
+       return err
 }
 
-// ========================================
-// Helper functions for unit tests.
-// ========================================
-
-// MakeTestVolumeManager returns a RRVolumeManager with the specified
-// number of MockVolumes.
-func MakeTestVolumeManager(numVolumes int) VolumeManager {
-       vols := make([]Volume, numVolumes)
-       for i := range vols {
-               vols[i] = CreateMockVolume()
+func (v *stubVolume) Mtime(hash string) (time.Time, error) {
+       v.log("mtime", hash)
+       if v.mtime != nil {
+               if t, err := v.mtime(hash); err != nil {
+                       return t, err
+               }
        }
-       return MakeRRVolumeManager(vols)
+       v.mtx.Lock()
+       defer v.mtx.Unlock()
+       ent, ok := v.data[hash]
+       if !ok || !ent.trash.IsZero() {
+               return time.Time{}, os.ErrNotExist
+       }
+       return ent.mtime, nil
 }
 
-// teardown cleans up after each test.
-func teardown() {
-       theConfig.systemAuthToken = ""
-       theConfig.RequireSignatures = false
-       theConfig.blobSigningKey = nil
-       KeepVM = nil
+func (v *stubVolume) EmptyTrash() {
+       v.stubLog.Printf("%s emptytrash", v.params.UUID)
+       v.mtx.Lock()
+       defer v.mtx.Unlock()
+       for hash, ent := range v.data {
+               if !ent.trash.IsZero() && time.Now().After(ent.trash) {
+                       delete(v.data, hash)
+               }
+       }
 }