3a01476096be7fcd9d3a15758acb3e69d605289b
[arvados.git] / services / keepstore / keepstore_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepstore
6
7 import (
8         "bytes"
9         "context"
10         "crypto/md5"
11         "errors"
12         "fmt"
13         "io"
14         "net/http"
15         "os"
16         "sort"
17         "strings"
18         "sync"
19         "testing"
20         "time"
21
22         "git.arvados.org/arvados.git/lib/config"
23         "git.arvados.org/arvados.git/sdk/go/arvados"
24         "git.arvados.org/arvados.git/sdk/go/arvadostest"
25         "git.arvados.org/arvados.git/sdk/go/auth"
26         "git.arvados.org/arvados.git/sdk/go/ctxlog"
27         "github.com/prometheus/client_golang/prometheus"
28         . "gopkg.in/check.v1"
29 )
30
31 func TestGocheck(t *testing.T) {
32         TestingT(t)
33 }
34
35 const (
36         fooHash = "acbd18db4cc2f85cedef654fccc4a4d8"
37         barHash = "37b51d194a7513e45b56f6524f2d51f2"
38 )
39
40 var testServiceURL = func() arvados.URL {
41         return arvados.URL{Host: "localhost:12345", Scheme: "http"}
42 }()
43
44 func authContext(token string) context.Context {
45         return auth.NewContext(context.TODO(), &auth.Credentials{Tokens: []string{token}})
46 }
47
48 func testCluster(t TB) *arvados.Cluster {
49         cfg, err := config.NewLoader(bytes.NewBufferString("Clusters: {zzzzz: {}}"), ctxlog.TestLogger(t)).Load()
50         if err != nil {
51                 t.Fatal(err)
52         }
53         cluster, err := cfg.GetCluster("")
54         if err != nil {
55                 t.Fatal(err)
56         }
57         cluster.SystemRootToken = arvadostest.SystemRootToken
58         cluster.ManagementToken = arvadostest.ManagementToken
59         return cluster
60 }
61
62 func testKeepstore(t TB, cluster *arvados.Cluster, reg *prometheus.Registry) (*keepstore, context.CancelFunc) {
63         if reg == nil {
64                 reg = prometheus.NewRegistry()
65         }
66         ctx, cancel := context.WithCancel(context.Background())
67         ctx = ctxlog.Context(ctx, ctxlog.TestLogger(t))
68         ks, err := newKeepstore(ctx, cluster, cluster.SystemRootToken, reg, testServiceURL)
69         if err != nil {
70                 t.Fatal(err)
71         }
72         return ks, cancel
73 }
74
75 var _ = Suite(&keepstoreSuite{})
76
77 type keepstoreSuite struct {
78         cluster *arvados.Cluster
79 }
80
81 func (s *keepstoreSuite) SetUpTest(c *C) {
82         s.cluster = testCluster(c)
83         s.cluster.Volumes = map[string]arvados.Volume{
84                 "zzzzz-nyw5e-000000000000000": {Replication: 1, Driver: "stub"},
85                 "zzzzz-nyw5e-111111111111111": {Replication: 1, Driver: "stub"},
86         }
87 }
88
89 func (s *keepstoreSuite) TestBlockRead_ChecksumMismatch(c *C) {
90         ks, cancel := testKeepstore(c, s.cluster, nil)
91         defer cancel()
92
93         ctx := authContext(arvadostest.ActiveTokenV2)
94
95         fooHash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
96         err := ks.mountsW[0].BlockWrite(ctx, fooHash, []byte("bar"))
97         c.Assert(err, IsNil)
98
99         _, err = ks.BlockWrite(ctx, arvados.BlockWriteOptions{
100                 Hash: fooHash,
101                 Data: []byte("foo"),
102         })
103         c.Check(err, ErrorMatches, "hash collision")
104
105         buf := bytes.NewBuffer(nil)
106         _, err = ks.BlockRead(ctx, arvados.BlockReadOptions{
107                 Locator: ks.signLocator(arvadostest.ActiveTokenV2, fooHash+"+3"),
108                 WriteTo: buf,
109         })
110         c.Check(err, ErrorMatches, "checksum mismatch in stored data")
111         c.Check(buf.String(), Not(Equals), "foo")
112         c.Check(buf.Len() < 3, Equals, true)
113
114         err = ks.mountsW[1].BlockWrite(ctx, fooHash, []byte("foo"))
115         c.Assert(err, IsNil)
116
117         buf = bytes.NewBuffer(nil)
118         _, err = ks.BlockRead(ctx, arvados.BlockReadOptions{
119                 Locator: ks.signLocator(arvadostest.ActiveTokenV2, fooHash+"+3"),
120                 WriteTo: buf,
121         })
122         c.Check(err, ErrorMatches, "checksum mismatch in stored data")
123         c.Check(buf.Len() < 3, Equals, true)
124 }
125
126 func (s *keepstoreSuite) TestBlockReadWrite_SigningDisabled(c *C) {
127         origKey := s.cluster.Collections.BlobSigningKey
128         s.cluster.Collections.BlobSigning = false
129         s.cluster.Collections.BlobSigningKey = ""
130         ks, cancel := testKeepstore(c, s.cluster, nil)
131         defer cancel()
132
133         resp, err := ks.BlockWrite(authContext("abcde"), arvados.BlockWriteOptions{
134                 Hash: fooHash,
135                 Data: []byte("foo"),
136         })
137         c.Assert(err, IsNil)
138         c.Check(resp.Locator, Equals, fooHash+"+3")
139         locUnsigned := resp.Locator
140         ttl := time.Hour
141         locSigned := arvados.SignLocator(locUnsigned, arvadostest.ActiveTokenV2, time.Now().Add(ttl), ttl, []byte(origKey))
142         c.Assert(locSigned, Not(Equals), locUnsigned)
143
144         for _, locator := range []string{locUnsigned, locSigned} {
145                 for _, token := range []string{"", "xyzzy", arvadostest.ActiveTokenV2} {
146                         c.Logf("=== locator %q token %q", locator, token)
147                         ctx := authContext(token)
148                         buf := bytes.NewBuffer(nil)
149                         _, err := ks.BlockRead(ctx, arvados.BlockReadOptions{
150                                 Locator: locator,
151                                 WriteTo: buf,
152                         })
153                         c.Check(err, IsNil)
154                         c.Check(buf.String(), Equals, "foo")
155                 }
156         }
157 }
158
159 func (s *keepstoreSuite) TestBlockRead_OrderedByStorageClassPriority(c *C) {
160         s.cluster.Volumes = map[string]arvados.Volume{
161                 "zzzzz-nyw5e-111111111111111": {
162                         Driver:         "stub",
163                         Replication:    1,
164                         StorageClasses: map[string]bool{"class1": true}},
165                 "zzzzz-nyw5e-222222222222222": {
166                         Driver:         "stub",
167                         Replication:    1,
168                         StorageClasses: map[string]bool{"class2": true, "class3": true}},
169         }
170
171         // "foobar" is just some data that happens to result in
172         // rendezvous order {111, 222}
173         data := []byte("foobar")
174         hash := fmt.Sprintf("%x", md5.Sum(data))
175
176         for _, trial := range []struct {
177                 priority1 int // priority of class1, thus vol1
178                 priority2 int // priority of class2
179                 priority3 int // priority of class3 (vol2 priority will be max(priority2, priority3))
180                 expectLog string
181         }{
182                 {100, 50, 50, "111 read 385\n"},              // class1 has higher priority => try vol1 first, no need to try vol2
183                 {100, 100, 100, "111 read 385\n"},            // same priority, vol2 is first in rendezvous order => try vol1 first and succeed
184                 {66, 99, 33, "222 read 385\n111 read 385\n"}, // class2 has higher priority => try vol2 first, then try vol1
185                 {66, 33, 99, "222 read 385\n111 read 385\n"}, // class3 has highest priority => vol2 has highest => try vol2 first, then try vol1
186         } {
187                 c.Logf("=== %+v", trial)
188
189                 s.cluster.StorageClasses = map[string]arvados.StorageClassConfig{
190                         "class1": {Priority: trial.priority1},
191                         "class2": {Priority: trial.priority2},
192                         "class3": {Priority: trial.priority3},
193                 }
194                 ks, cancel := testKeepstore(c, s.cluster, nil)
195                 defer cancel()
196
197                 ctx := authContext(arvadostest.ActiveTokenV2)
198                 resp, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
199                         Hash:           hash,
200                         Data:           data,
201                         StorageClasses: []string{"class1"},
202                 })
203                 c.Assert(err, IsNil)
204
205                 // Combine logs into one. (We only want the logs from
206                 // the BlockRead below, not from BlockWrite above.)
207                 stubLog := &stubLog{}
208                 for _, mnt := range ks.mounts {
209                         mnt.volume.(*stubVolume).stubLog = stubLog
210                 }
211
212                 n, err := ks.BlockRead(ctx, arvados.BlockReadOptions{
213                         Locator: resp.Locator,
214                         WriteTo: io.Discard,
215                 })
216                 c.Assert(n, Equals, len(data))
217                 c.Assert(err, IsNil)
218                 c.Check(stubLog.String(), Equals, trial.expectLog)
219         }
220 }
221
222 func (s *keepstoreSuite) TestBlockWrite_NoWritableVolumes(c *C) {
223         for uuid, v := range s.cluster.Volumes {
224                 v.ReadOnly = true
225                 s.cluster.Volumes[uuid] = v
226         }
227         ks, cancel := testKeepstore(c, s.cluster, nil)
228         defer cancel()
229         for _, mnt := range ks.mounts {
230                 mnt.volume.(*stubVolume).blockWrite = func(context.Context, string, []byte) error {
231                         c.Error("volume BlockWrite called")
232                         return errors.New("fail")
233                 }
234         }
235         ctx := authContext(arvadostest.ActiveTokenV2)
236
237         _, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
238                 Hash: fooHash,
239                 Data: []byte("foo")})
240         c.Check(err, NotNil)
241         c.Check(err.(interface{ HTTPStatus() int }).HTTPStatus(), Equals, http.StatusInsufficientStorage)
242 }
243
244 func (s *keepstoreSuite) TestBlockWrite_MultipleStorageClasses(c *C) {
245         s.cluster.Volumes = map[string]arvados.Volume{
246                 "zzzzz-nyw5e-111111111111111": {
247                         Driver:         "stub",
248                         Replication:    1,
249                         StorageClasses: map[string]bool{"class1": true}},
250                 "zzzzz-nyw5e-121212121212121": {
251                         Driver:         "stub",
252                         Replication:    1,
253                         StorageClasses: map[string]bool{"class1": true, "class2": true}},
254                 "zzzzz-nyw5e-222222222222222": {
255                         Driver:         "stub",
256                         Replication:    1,
257                         StorageClasses: map[string]bool{"class2": true}},
258         }
259
260         // testData is a block that happens to have rendezvous order 111, 121, 222
261         testData := []byte("qux")
262         testHash := fmt.Sprintf("%x+%d", md5.Sum(testData), len(testData))
263
264         s.cluster.StorageClasses = map[string]arvados.StorageClassConfig{
265                 "class1": {},
266                 "class2": {},
267                 "class3": {},
268         }
269
270         ctx := authContext(arvadostest.ActiveTokenV2)
271         for idx, trial := range []struct {
272                 classes   string // desired classes
273                 expectLog string
274         }{
275                 {"class1", "" +
276                         "111 read d85\n" +
277                         "121 read d85\n" +
278                         "111 write d85\n" +
279                         "111 read d85\n" +
280                         "111 touch d85\n"},
281                 {"class2", "" +
282                         "121 read d85\n" + // write#1
283                         "222 read d85\n" +
284                         "121 write d85\n" +
285                         "121 read d85\n" + // write#2
286                         "121 touch d85\n"},
287                 {"class1,class2", "" +
288                         "111 read d85\n" + // write#1
289                         "121 read d85\n" +
290                         "222 read d85\n" +
291                         "121 write d85\n" +
292                         "111 write d85\n" +
293                         "111 read d85\n" + // write#2
294                         "111 touch d85\n" +
295                         "121 read d85\n" +
296                         "121 touch d85\n"},
297                 {"class1,class2,class404", "" +
298                         "111 read d85\n" + // write#1
299                         "121 read d85\n" +
300                         "222 read d85\n" +
301                         "121 write d85\n" +
302                         "111 write d85\n" +
303                         "111 read d85\n" + // write#2
304                         "111 touch d85\n" +
305                         "121 read d85\n" +
306                         "121 touch d85\n"},
307         } {
308                 c.Logf("=== %d: %+v", idx, trial)
309
310                 ks, cancel := testKeepstore(c, s.cluster, nil)
311                 defer cancel()
312                 stubLog := &stubLog{}
313                 for _, mnt := range ks.mounts {
314                         mnt.volume.(*stubVolume).stubLog = stubLog
315                 }
316
317                 // Check that we chose the right block data
318                 rvz := ks.rendezvous(testHash, ks.mountsW)
319                 c.Assert(rvz[0].UUID[24:], Equals, "111")
320                 c.Assert(rvz[1].UUID[24:], Equals, "121")
321                 c.Assert(rvz[2].UUID[24:], Equals, "222")
322
323                 for i := 0; i < 2; i++ {
324                         _, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
325                                 Hash:           testHash,
326                                 Data:           testData,
327                                 StorageClasses: strings.Split(trial.classes, ","),
328                         })
329                         c.Check(err, IsNil)
330                 }
331                 c.Check(stubLog.String(), Equals, trial.expectLog)
332         }
333 }
334
335 func (s *keepstoreSuite) TestBlockTrash(c *C) {
336         s.cluster.Volumes = map[string]arvados.Volume{
337                 "zzzzz-nyw5e-000000000000000": {Replication: 1, Driver: "stub"},
338                 "zzzzz-nyw5e-111111111111111": {Replication: 1, Driver: "stub"},
339                 "zzzzz-nyw5e-222222222222222": {Replication: 1, Driver: "stub", ReadOnly: true},
340                 "zzzzz-nyw5e-333333333333333": {Replication: 1, Driver: "stub", ReadOnly: true, AllowTrashWhenReadOnly: true},
341         }
342         ks, cancel := testKeepstore(c, s.cluster, nil)
343         defer cancel()
344
345         var vol []*stubVolume
346         for _, mount := range ks.mountsR {
347                 vol = append(vol, mount.volume.(*stubVolume))
348         }
349         sort.Slice(vol, func(i, j int) bool {
350                 return vol[i].params.UUID < vol[j].params.UUID
351         })
352
353         ctx := context.Background()
354         loc := fooHash + "+3"
355         tOld := time.Now().Add(-s.cluster.Collections.BlobSigningTTL.Duration() - time.Second)
356
357         clear := func() {
358                 for _, vol := range vol {
359                         err := vol.BlockTrash(fooHash)
360                         if !os.IsNotExist(err) {
361                                 c.Assert(err, IsNil)
362                         }
363                 }
364         }
365         writeit := func(volidx int) {
366                 err := vol[volidx].BlockWrite(ctx, fooHash, []byte("foo"))
367                 c.Assert(err, IsNil)
368                 err = vol[volidx].blockTouchWithTime(fooHash, tOld)
369                 c.Assert(err, IsNil)
370         }
371         trashit := func() error {
372                 return ks.BlockTrash(ctx, loc)
373         }
374         checkexists := func(volidx int) bool {
375                 _, err := vol[volidx].BlockRead(ctx, fooHash, io.Discard)
376                 if !os.IsNotExist(err) {
377                         c.Check(err, IsNil)
378                 }
379                 return err == nil
380         }
381
382         clear()
383         c.Check(trashit(), Equals, os.ErrNotExist)
384
385         // one old replica => trash it
386         clear()
387         writeit(0)
388         c.Check(trashit(), IsNil)
389         c.Check(checkexists(0), Equals, false)
390
391         // one old replica + one new replica => keep new, trash old
392         clear()
393         writeit(0)
394         writeit(1)
395         c.Check(vol[1].blockTouchWithTime(fooHash, time.Now()), IsNil)
396         c.Check(trashit(), IsNil)
397         c.Check(checkexists(0), Equals, false)
398         c.Check(checkexists(1), Equals, true)
399
400         // two old replicas => trash both
401         clear()
402         writeit(0)
403         writeit(1)
404         c.Check(trashit(), IsNil)
405         c.Check(checkexists(0), Equals, false)
406         c.Check(checkexists(1), Equals, false)
407
408         // four old replicas => trash all except readonly volume with
409         // AllowTrashWhenReadOnly==false
410         clear()
411         writeit(0)
412         writeit(1)
413         writeit(2)
414         writeit(3)
415         c.Check(trashit(), IsNil)
416         c.Check(checkexists(0), Equals, false)
417         c.Check(checkexists(1), Equals, false)
418         c.Check(checkexists(2), Equals, true)
419         c.Check(checkexists(3), Equals, false)
420
421         // two old replicas but one returns an error => return the
422         // only non-404 backend error
423         clear()
424         vol[0].blockTrash = func(hash string) error {
425                 return errors.New("fake error")
426         }
427         writeit(0)
428         writeit(3)
429         c.Check(trashit(), ErrorMatches, "fake error")
430         c.Check(checkexists(0), Equals, true)
431         c.Check(checkexists(1), Equals, false)
432         c.Check(checkexists(2), Equals, false)
433         c.Check(checkexists(3), Equals, false)
434 }
435
436 func (s *keepstoreSuite) TestBlockWrite_OnlyOneBuffer(c *C) {
437         s.cluster.API.MaxKeepBlobBuffers = 1
438         ks, cancel := testKeepstore(c, s.cluster, nil)
439         defer cancel()
440         ok := make(chan struct{})
441         go func() {
442                 defer close(ok)
443                 ctx := authContext(arvadostest.ActiveTokenV2)
444                 _, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
445                         Hash: fooHash,
446                         Data: []byte("foo")})
447                 c.Check(err, IsNil)
448         }()
449         select {
450         case <-ok:
451         case <-time.After(time.Second):
452                 c.Fatal("PUT deadlocks with MaxKeepBlobBuffers==1")
453         }
454 }
455
456 func (s *keepstoreSuite) TestBufferPoolLeak(c *C) {
457         s.cluster.API.MaxKeepBlobBuffers = 4
458         ks, cancel := testKeepstore(c, s.cluster, nil)
459         defer cancel()
460
461         ctx := authContext(arvadostest.ActiveTokenV2)
462         var wg sync.WaitGroup
463         for range make([]int, 20) {
464                 wg.Add(1)
465                 go func() {
466                         defer wg.Done()
467                         resp, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
468                                 Hash: fooHash,
469                                 Data: []byte("foo")})
470                         c.Check(err, IsNil)
471                         _, err = ks.BlockRead(ctx, arvados.BlockReadOptions{
472                                 Locator: resp.Locator,
473                                 WriteTo: io.Discard})
474                         c.Check(err, IsNil)
475                 }()
476         }
477         ok := make(chan struct{})
478         go func() {
479                 wg.Wait()
480                 close(ok)
481         }()
482         select {
483         case <-ok:
484         case <-time.After(time.Second):
485                 c.Fatal("read/write sequence deadlocks, likely buffer pool leak")
486         }
487 }
488
489 func (s *keepstoreSuite) TestPutStorageClasses(c *C) {
490         s.cluster.Volumes = map[string]arvados.Volume{
491                 "zzzzz-nyw5e-000000000000000": {Replication: 1, Driver: "stub"}, // "default" is implicit
492                 "zzzzz-nyw5e-111111111111111": {Replication: 1, Driver: "stub", StorageClasses: map[string]bool{"special": true, "extra": true}},
493                 "zzzzz-nyw5e-222222222222222": {Replication: 1, Driver: "stub", StorageClasses: map[string]bool{"readonly": true}, ReadOnly: true},
494         }
495         ks, cancel := testKeepstore(c, s.cluster, nil)
496         defer cancel()
497         ctx := authContext(arvadostest.ActiveTokenV2)
498
499         for _, trial := range []struct {
500                 ask            []string
501                 expectReplicas int
502                 expectClasses  map[string]int
503         }{
504                 {nil,
505                         1,
506                         map[string]int{"default": 1}},
507                 {[]string{},
508                         1,
509                         map[string]int{"default": 1}},
510                 {[]string{"default"},
511                         1,
512                         map[string]int{"default": 1}},
513                 {[]string{"default", "default"},
514                         1,
515                         map[string]int{"default": 1}},
516                 {[]string{"special"},
517                         1,
518                         map[string]int{"extra": 1, "special": 1}},
519                 {[]string{"special", "readonly"},
520                         1,
521                         map[string]int{"extra": 1, "special": 1}},
522                 {[]string{"special", "nonexistent"},
523                         1,
524                         map[string]int{"extra": 1, "special": 1}},
525                 {[]string{"extra", "special"},
526                         1,
527                         map[string]int{"extra": 1, "special": 1}},
528                 {[]string{"default", "special"},
529                         2,
530                         map[string]int{"default": 1, "extra": 1, "special": 1}},
531         } {
532                 c.Logf("success case %#v", trial)
533                 resp, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
534                         Hash:           fooHash,
535                         Data:           []byte("foo"),
536                         StorageClasses: trial.ask,
537                 })
538                 if !c.Check(err, IsNil) {
539                         continue
540                 }
541                 c.Check(resp.Replicas, Equals, trial.expectReplicas)
542                 if len(trial.expectClasses) == 0 {
543                         // any non-empty value is correct
544                         c.Check(resp.StorageClasses, Not(HasLen), 0)
545                 } else {
546                         c.Check(resp.StorageClasses, DeepEquals, trial.expectClasses)
547                 }
548         }
549
550         for _, ask := range [][]string{
551                 {"doesnotexist"},
552                 {"doesnotexist", "readonly"},
553                 {"readonly"},
554         } {
555                 c.Logf("failure case %s", ask)
556                 _, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
557                         Hash:           fooHash,
558                         Data:           []byte("foo"),
559                         StorageClasses: ask,
560                 })
561                 c.Check(err, NotNil)
562         }
563 }
564
565 func (s *keepstoreSuite) TestUntrashHandlerWithNoWritableVolumes(c *C) {
566         for uuid, v := range s.cluster.Volumes {
567                 v.ReadOnly = true
568                 s.cluster.Volumes[uuid] = v
569         }
570         ks, cancel := testKeepstore(c, s.cluster, nil)
571         defer cancel()
572
573         for _, mnt := range ks.mounts {
574                 err := mnt.BlockWrite(context.Background(), fooHash, []byte("foo"))
575                 c.Assert(err, IsNil)
576                 _, err = mnt.BlockRead(context.Background(), fooHash, io.Discard)
577                 c.Assert(err, IsNil)
578         }
579
580         err := ks.BlockUntrash(context.Background(), fooHash)
581         c.Check(os.IsNotExist(err), Equals, true)
582
583         for _, mnt := range ks.mounts {
584                 _, err := mnt.BlockRead(context.Background(), fooHash, io.Discard)
585                 c.Assert(err, IsNil)
586         }
587 }
588
589 func (s *keepstoreSuite) TestBlockWrite_SkipReadOnly(c *C) {
590         s.cluster.Volumes = map[string]arvados.Volume{
591                 "zzzzz-nyw5e-000000000000000": {Replication: 1, Driver: "stub"},
592                 "zzzzz-nyw5e-111111111111111": {Replication: 1, Driver: "stub", ReadOnly: true},
593                 "zzzzz-nyw5e-222222222222222": {Replication: 1, Driver: "stub", ReadOnly: true, AllowTrashWhenReadOnly: true},
594         }
595         ks, cancel := testKeepstore(c, s.cluster, nil)
596         defer cancel()
597         ctx := authContext(arvadostest.ActiveTokenV2)
598
599         for i := range make([]byte, 32) {
600                 data := []byte(fmt.Sprintf("block %d", i))
601                 _, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{Data: data})
602                 c.Assert(err, IsNil)
603         }
604         c.Check(ks.mounts["zzzzz-nyw5e-000000000000000"].volume.(*stubVolume).stubLog.String(), Matches, "(?ms).*write.*")
605         c.Check(ks.mounts["zzzzz-nyw5e-111111111111111"].volume.(*stubVolume).stubLog.String(), HasLen, 0)
606         c.Check(ks.mounts["zzzzz-nyw5e-222222222222222"].volume.(*stubVolume).stubLog.String(), HasLen, 0)
607 }
608
609 func (s *keepstoreSuite) TestParseLocator(c *C) {
610         for _, trial := range []struct {
611                 locator string
612                 ok      bool
613                 expect  locatorInfo
614         }{
615                 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
616                         ok: true},
617                 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+1234",
618                         ok: true, expect: locatorInfo{size: 1234}},
619                 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+1234+Abcdef@abcdef",
620                         ok: true, expect: locatorInfo{size: 1234, signed: true}},
621                 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+1234+Rzzzzz-abcdef",
622                         ok: true, expect: locatorInfo{size: 1234, remote: true}},
623                 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+12345+Zexample+Rzzzzz-abcdef",
624                         ok: true, expect: locatorInfo{size: 12345, remote: true}},
625                 // invalid: hash length != 32
626                 {locator: "",
627                         ok: false},
628                 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
629                         ok: false},
630                 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+1234",
631                         ok: false},
632                 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaabb",
633                         ok: false},
634                 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaabb+1234",
635                         ok: false},
636                 // invalid: first hint is not size
637                 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+Abcdef+1234",
638                         ok: false},
639         } {
640                 c.Logf("=== %s", trial.locator)
641                 li, err := parseLocator(trial.locator)
642                 if !trial.ok {
643                         c.Check(err, NotNil)
644                         continue
645                 }
646                 c.Check(err, IsNil)
647                 c.Check(li.hash, Equals, trial.locator[:32])
648                 c.Check(li.size, Equals, trial.expect.size)
649                 c.Check(li.signed, Equals, trial.expect.signed)
650                 c.Check(li.remote, Equals, trial.expect.remote)
651         }
652 }
653
654 func init() {
655         driver["stub"] = func(params newVolumeParams) (volume, error) {
656                 v := &stubVolume{
657                         params:  params,
658                         data:    make(map[string]stubData),
659                         stubLog: &stubLog{},
660                 }
661                 return v, nil
662         }
663 }
664
665 type stubLog struct {
666         sync.Mutex
667         bytes.Buffer
668 }
669
670 func (sl *stubLog) Printf(format string, args ...interface{}) {
671         if sl == nil {
672                 return
673         }
674         sl.Lock()
675         defer sl.Unlock()
676         fmt.Fprintf(sl, format+"\n", args...)
677 }
678
679 type stubData struct {
680         mtime time.Time
681         data  []byte
682         trash time.Time
683 }
684
685 type stubVolume struct {
686         params  newVolumeParams
687         data    map[string]stubData
688         stubLog *stubLog
689         mtx     sync.Mutex
690
691         // The following funcs enable tests to insert delays and
692         // failures. Each volume operation begins by calling the
693         // corresponding func (if non-nil). If the func returns an
694         // error, that error is returned to caller. Otherwise, the
695         // stub continues normally.
696         blockRead    func(ctx context.Context, hash string, writeTo io.Writer) (int, error)
697         blockWrite   func(ctx context.Context, hash string, data []byte) error
698         deviceID     func() string
699         blockTouch   func(hash string) error
700         blockTrash   func(hash string) error
701         blockUntrash func(hash string) error
702         index        func(ctx context.Context, prefix string, writeTo io.Writer) error
703         mtime        func(hash string) (time.Time, error)
704         emptyTrash   func()
705 }
706
707 func (v *stubVolume) log(op, hash string) {
708         // Note this intentionally crashes if UUID or hash is short --
709         // if keepstore ever does that, tests should fail.
710         v.stubLog.Printf("%s %s %s", v.params.UUID[24:27], op, hash[:3])
711 }
712
713 func (v *stubVolume) BlockRead(ctx context.Context, hash string, writeTo io.Writer) (int, error) {
714         v.log("read", hash)
715         if v.blockRead != nil {
716                 n, err := v.blockRead(ctx, hash, writeTo)
717                 if err != nil {
718                         return n, err
719                 }
720         }
721         v.mtx.Lock()
722         ent, ok := v.data[hash]
723         v.mtx.Unlock()
724         if !ok || !ent.trash.IsZero() {
725                 return 0, os.ErrNotExist
726         }
727         wrote := 0
728         for writesize := 1000; wrote < len(ent.data); writesize = writesize * 2 {
729                 data := ent.data[wrote:]
730                 if len(data) > writesize {
731                         data = data[:writesize]
732                 }
733                 n, err := writeTo.Write(data)
734                 wrote += n
735                 if err != nil {
736                         return wrote, err
737                 }
738         }
739         return wrote, nil
740 }
741
742 func (v *stubVolume) BlockWrite(ctx context.Context, hash string, data []byte) error {
743         v.log("write", hash)
744         if v.blockWrite != nil {
745                 if err := v.blockWrite(ctx, hash, data); err != nil {
746                         return err
747                 }
748         }
749         v.mtx.Lock()
750         defer v.mtx.Unlock()
751         v.data[hash] = stubData{
752                 mtime: time.Now(),
753                 data:  append([]byte(nil), data...),
754         }
755         return nil
756 }
757
758 func (v *stubVolume) DeviceID() string {
759         return fmt.Sprintf("%p", v)
760 }
761
762 func (v *stubVolume) BlockTouch(hash string) error {
763         v.log("touch", hash)
764         if v.blockTouch != nil {
765                 if err := v.blockTouch(hash); err != nil {
766                         return err
767                 }
768         }
769         v.mtx.Lock()
770         defer v.mtx.Unlock()
771         ent, ok := v.data[hash]
772         if !ok || !ent.trash.IsZero() {
773                 return os.ErrNotExist
774         }
775         ent.mtime = time.Now()
776         v.data[hash] = ent
777         return nil
778 }
779
780 // Set mtime to the (presumably old) specified time.
781 func (v *stubVolume) blockTouchWithTime(hash string, t time.Time) error {
782         v.log("touchwithtime", hash)
783         v.mtx.Lock()
784         defer v.mtx.Unlock()
785         ent, ok := v.data[hash]
786         if !ok {
787                 return os.ErrNotExist
788         }
789         ent.mtime = t
790         v.data[hash] = ent
791         return nil
792 }
793
794 func (v *stubVolume) BlockTrash(hash string) error {
795         v.log("trash", hash)
796         if v.blockTrash != nil {
797                 if err := v.blockTrash(hash); err != nil {
798                         return err
799                 }
800         }
801         v.mtx.Lock()
802         defer v.mtx.Unlock()
803         ent, ok := v.data[hash]
804         if !ok || !ent.trash.IsZero() {
805                 return os.ErrNotExist
806         }
807         ent.trash = time.Now().Add(v.params.Cluster.Collections.BlobTrashLifetime.Duration())
808         v.data[hash] = ent
809         return nil
810 }
811
812 func (v *stubVolume) BlockUntrash(hash string) error {
813         v.log("untrash", hash)
814         if v.blockUntrash != nil {
815                 if err := v.blockUntrash(hash); err != nil {
816                         return err
817                 }
818         }
819         v.mtx.Lock()
820         defer v.mtx.Unlock()
821         ent, ok := v.data[hash]
822         if !ok || ent.trash.IsZero() {
823                 return os.ErrNotExist
824         }
825         ent.trash = time.Time{}
826         v.data[hash] = ent
827         return nil
828 }
829
830 func (v *stubVolume) Index(ctx context.Context, prefix string, writeTo io.Writer) error {
831         v.stubLog.Printf("%s index %s", v.params.UUID, prefix)
832         if v.index != nil {
833                 if err := v.index(ctx, prefix, writeTo); err != nil {
834                         return err
835                 }
836         }
837         buf := &bytes.Buffer{}
838         v.mtx.Lock()
839         for hash, ent := range v.data {
840                 if ent.trash.IsZero() && strings.HasPrefix(hash, prefix) {
841                         fmt.Fprintf(buf, "%s+%d %d\n", hash, len(ent.data), ent.mtime.UnixNano())
842                 }
843         }
844         v.mtx.Unlock()
845         _, err := io.Copy(writeTo, buf)
846         return err
847 }
848
849 func (v *stubVolume) Mtime(hash string) (time.Time, error) {
850         v.log("mtime", hash)
851         if v.mtime != nil {
852                 if t, err := v.mtime(hash); err != nil {
853                         return t, err
854                 }
855         }
856         v.mtx.Lock()
857         defer v.mtx.Unlock()
858         ent, ok := v.data[hash]
859         if !ok || !ent.trash.IsZero() {
860                 return time.Time{}, os.ErrNotExist
861         }
862         return ent.mtime, nil
863 }
864
865 func (v *stubVolume) EmptyTrash() {
866         v.stubLog.Printf("%s emptytrash", v.params.UUID)
867         v.mtx.Lock()
868         defer v.mtx.Unlock()
869         for hash, ent := range v.data {
870                 if !ent.trash.IsZero() && time.Now().After(ent.trash) {
871                         delete(v.data, hash)
872                 }
873         }
874 }