21842: added keep typing
[arvados.git] / services / keepstore / keepstore_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepstore
6
7 import (
8         "bytes"
9         "context"
10         "crypto/md5"
11         "errors"
12         "fmt"
13         "io"
14         "net/http"
15         "os"
16         "sort"
17         "strings"
18         "sync"
19         "testing"
20         "time"
21
22         "git.arvados.org/arvados.git/lib/config"
23         "git.arvados.org/arvados.git/sdk/go/arvados"
24         "git.arvados.org/arvados.git/sdk/go/arvadostest"
25         "git.arvados.org/arvados.git/sdk/go/auth"
26         "git.arvados.org/arvados.git/sdk/go/ctxlog"
27         "github.com/prometheus/client_golang/prometheus"
28         . "gopkg.in/check.v1"
29 )
30
31 func TestGocheck(t *testing.T) {
32         TestingT(t)
33 }
34
35 const (
36         fooHash = "acbd18db4cc2f85cedef654fccc4a4d8"
37         barHash = "37b51d194a7513e45b56f6524f2d51f2"
38 )
39
40 var testServiceURL = func() arvados.URL {
41         return arvados.URL{Host: "localhost:12345", Scheme: "http"}
42 }()
43
44 func authContext(token string) context.Context {
45         return auth.NewContext(context.TODO(), &auth.Credentials{Tokens: []string{token}})
46 }
47
48 func testCluster(t TB) *arvados.Cluster {
49         cfg, err := config.NewLoader(bytes.NewBufferString("Clusters: {zzzzz: {}}"), ctxlog.TestLogger(t)).Load()
50         if err != nil {
51                 t.Fatal(err)
52         }
53         cluster, err := cfg.GetCluster("")
54         if err != nil {
55                 t.Fatal(err)
56         }
57         cluster.SystemRootToken = arvadostest.SystemRootToken
58         cluster.ManagementToken = arvadostest.ManagementToken
59         return cluster
60 }
61
62 func testKeepstore(t TB, cluster *arvados.Cluster, reg *prometheus.Registry) (*keepstore, context.CancelFunc) {
63         if reg == nil {
64                 reg = prometheus.NewRegistry()
65         }
66         ctx, cancel := context.WithCancel(context.Background())
67         ctx = ctxlog.Context(ctx, ctxlog.TestLogger(t))
68         ks, err := newKeepstore(ctx, cluster, cluster.SystemRootToken, reg, testServiceURL)
69         if err != nil {
70                 t.Fatal(err)
71         }
72         return ks, cancel
73 }
74
75 var _ = Suite(&keepstoreSuite{})
76
77 type keepstoreSuite struct {
78         cluster *arvados.Cluster
79 }
80
81 func (s *keepstoreSuite) SetUpTest(c *C) {
82         s.cluster = testCluster(c)
83         s.cluster.Volumes = map[string]arvados.Volume{
84                 "zzzzz-nyw5e-000000000000000": {Replication: 1, Driver: "stub"},
85                 "zzzzz-nyw5e-111111111111111": {Replication: 1, Driver: "stub"},
86         }
87 }
88
89 func (s *keepstoreSuite) TestBlockRead_ChecksumMismatch(c *C) {
90         ks, cancel := testKeepstore(c, s.cluster, nil)
91         defer cancel()
92
93         ctx := authContext(arvadostest.ActiveTokenV2)
94
95         fooHash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
96         err := ks.mountsW[0].BlockWrite(ctx, fooHash, []byte("bar"))
97         c.Assert(err, IsNil)
98
99         _, err = ks.BlockWrite(ctx, arvados.BlockWriteOptions{
100                 Hash: fooHash,
101                 Data: []byte("foo"),
102         })
103         c.Check(err, ErrorMatches, "hash collision")
104
105         buf := bytes.NewBuffer(nil)
106         _, err = ks.BlockRead(ctx, arvados.BlockReadOptions{
107                 Locator: ks.signLocator(arvadostest.ActiveTokenV2, fooHash+"+3"),
108                 WriteTo: buf,
109         })
110         c.Check(err, ErrorMatches, "checksum mismatch in stored data")
111         c.Check(buf.String(), Not(Equals), "foo")
112         c.Check(buf.Len() < 3, Equals, true)
113
114         err = ks.mountsW[1].BlockWrite(ctx, fooHash, []byte("foo"))
115         c.Assert(err, IsNil)
116
117         buf = bytes.NewBuffer(nil)
118         _, err = ks.BlockRead(ctx, arvados.BlockReadOptions{
119                 Locator: ks.signLocator(arvadostest.ActiveTokenV2, fooHash+"+3"),
120                 WriteTo: buf,
121         })
122         c.Check(err, ErrorMatches, "checksum mismatch in stored data")
123         c.Check(buf.Len() < 3, Equals, true)
124 }
125
126 func (s *keepstoreSuite) TestBlockReadWrite_SigningDisabled(c *C) {
127         origKey := s.cluster.Collections.BlobSigningKey
128         s.cluster.Collections.BlobSigning = false
129         s.cluster.Collections.BlobSigningKey = ""
130         ks, cancel := testKeepstore(c, s.cluster, nil)
131         defer cancel()
132
133         resp, err := ks.BlockWrite(authContext("abcde"), arvados.BlockWriteOptions{
134                 Hash: fooHash,
135                 Data: []byte("foo"),
136         })
137         c.Assert(err, IsNil)
138         c.Check(resp.Locator, Equals, fooHash+"+3")
139         locUnsigned := resp.Locator
140         ttl := time.Hour
141         locSigned := arvados.SignLocator(locUnsigned, arvadostest.ActiveTokenV2, time.Now().Add(ttl), ttl, []byte(origKey))
142         c.Assert(locSigned, Not(Equals), locUnsigned)
143
144         for _, locator := range []string{locUnsigned, locSigned} {
145                 for _, token := range []string{"", "xyzzy", arvadostest.ActiveTokenV2} {
146                         c.Logf("=== locator %q token %q", locator, token)
147                         ctx := authContext(token)
148                         buf := bytes.NewBuffer(nil)
149                         _, err := ks.BlockRead(ctx, arvados.BlockReadOptions{
150                                 Locator: locator,
151                                 WriteTo: buf,
152                         })
153                         c.Check(err, IsNil)
154                         c.Check(buf.String(), Equals, "foo")
155                 }
156         }
157 }
158
159 func (s *keepstoreSuite) TestBlockRead_OrderedByStorageClassPriority(c *C) {
160         s.cluster.Volumes = map[string]arvados.Volume{
161                 "zzzzz-nyw5e-111111111111111": {
162                         Driver:         "stub",
163                         Replication:    1,
164                         StorageClasses: map[string]bool{"class1": true}},
165                 "zzzzz-nyw5e-222222222222222": {
166                         Driver:         "stub",
167                         Replication:    1,
168                         StorageClasses: map[string]bool{"class2": true, "class3": true}},
169         }
170
171         // "foobar" is just some data that happens to result in
172         // rendezvous order {111, 222}
173         data := []byte("foobar")
174         hash := fmt.Sprintf("%x", md5.Sum(data))
175
176         for _, trial := range []struct {
177                 priority1 int // priority of class1, thus vol1
178                 priority2 int // priority of class2
179                 priority3 int // priority of class3 (vol2 priority will be max(priority2, priority3))
180                 expectLog string
181         }{
182                 {100, 50, 50, "111 read 385\n"},              // class1 has higher priority => try vol1 first, no need to try vol2
183                 {100, 100, 100, "111 read 385\n"},            // same priority, vol2 is first in rendezvous order => try vol1 first and succeed
184                 {66, 99, 33, "222 read 385\n111 read 385\n"}, // class2 has higher priority => try vol2 first, then try vol1
185                 {66, 33, 99, "222 read 385\n111 read 385\n"}, // class3 has highest priority => vol2 has highest => try vol2 first, then try vol1
186         } {
187                 c.Logf("=== %+v", trial)
188
189                 s.cluster.StorageClasses = map[string]arvados.StorageClassConfig{
190                         "class1": {Priority: trial.priority1},
191                         "class2": {Priority: trial.priority2},
192                         "class3": {Priority: trial.priority3},
193                 }
194                 ks, cancel := testKeepstore(c, s.cluster, nil)
195                 defer cancel()
196
197                 ctx := authContext(arvadostest.ActiveTokenV2)
198                 resp, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
199                         Hash:           hash,
200                         Data:           data,
201                         StorageClasses: []string{"class1"},
202                 })
203                 c.Assert(err, IsNil)
204
205                 // Combine logs into one. (We only want the logs from
206                 // the BlockRead below, not from BlockWrite above.)
207                 stubLog := &stubLog{}
208                 for _, mnt := range ks.mounts {
209                         mnt.volume.(*stubVolume).stubLog = stubLog
210                 }
211
212                 n, err := ks.BlockRead(ctx, arvados.BlockReadOptions{
213                         Locator: resp.Locator,
214                         WriteTo: io.Discard,
215                 })
216                 c.Assert(n, Equals, len(data))
217                 c.Assert(err, IsNil)
218                 c.Check(stubLog.String(), Equals, trial.expectLog)
219         }
220 }
221
222 func (s *keepstoreSuite) TestBlockWrite_NoWritableVolumes(c *C) {
223         for uuid, v := range s.cluster.Volumes {
224                 v.ReadOnly = true
225                 s.cluster.Volumes[uuid] = v
226         }
227         ks, cancel := testKeepstore(c, s.cluster, nil)
228         defer cancel()
229         for _, mnt := range ks.mounts {
230                 mnt.volume.(*stubVolume).blockWrite = func(context.Context, string, []byte) error {
231                         c.Error("volume BlockWrite called")
232                         return errors.New("fail")
233                 }
234         }
235         ctx := authContext(arvadostest.ActiveTokenV2)
236
237         _, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
238                 Hash: fooHash,
239                 Data: []byte("foo")})
240         c.Check(err, NotNil)
241         c.Check(err.(interface{ HTTPStatus() int }).HTTPStatus(), Equals, http.StatusInsufficientStorage)
242 }
243
244 func (s *keepstoreSuite) TestBlockWrite_MultipleStorageClasses(c *C) {
245         s.cluster.Volumes = map[string]arvados.Volume{
246                 "zzzzz-nyw5e-111111111111111": {
247                         Driver:         "stub",
248                         Replication:    1,
249                         StorageClasses: map[string]bool{"class1": true}},
250                 "zzzzz-nyw5e-121212121212121": {
251                         Driver:         "stub",
252                         Replication:    1,
253                         StorageClasses: map[string]bool{"class1": true, "class2": true}},
254                 "zzzzz-nyw5e-222222222222222": {
255                         Driver:         "stub",
256                         Replication:    1,
257                         StorageClasses: map[string]bool{"class2": true}},
258         }
259
260         // testData is a block that happens to have rendezvous order 111, 121, 222
261         testData := []byte("qux")
262         testHash := fmt.Sprintf("%x+%d", md5.Sum(testData), len(testData))
263
264         s.cluster.StorageClasses = map[string]arvados.StorageClassConfig{
265                 "class1": {},
266                 "class2": {},
267                 "class3": {},
268         }
269
270         ctx := authContext(arvadostest.ActiveTokenV2)
271         for idx, trial := range []struct {
272                 classes   string // desired classes
273                 expectLog string
274         }{
275                 {"class1", "" +
276                         "111 read d85\n" +
277                         "121 read d85\n" +
278                         "111 write d85\n" +
279                         "111 read d85\n" +
280                         "111 touch d85\n"},
281                 {"class2", "" +
282                         "121 read d85\n" + // write#1
283                         "222 read d85\n" +
284                         "121 write d85\n" +
285                         "121 read d85\n" + // write#2
286                         "121 touch d85\n"},
287                 {"class1,class2", "" +
288                         "111 read d85\n" + // write#1
289                         "121 read d85\n" +
290                         "222 read d85\n" +
291                         "121 write d85\n" +
292                         "111 write d85\n" +
293                         "111 read d85\n" + // write#2
294                         "111 touch d85\n" +
295                         "121 read d85\n" +
296                         "121 touch d85\n"},
297                 {"class1,class2,class404", "" +
298                         "111 read d85\n" + // write#1
299                         "121 read d85\n" +
300                         "222 read d85\n" +
301                         "121 write d85\n" +
302                         "111 write d85\n" +
303                         "111 read d85\n" + // write#2
304                         "111 touch d85\n" +
305                         "121 read d85\n" +
306                         "121 touch d85\n"},
307         } {
308                 c.Logf("=== %d: %+v", idx, trial)
309
310                 ks, cancel := testKeepstore(c, s.cluster, nil)
311                 defer cancel()
312                 stubLog := &stubLog{}
313                 for _, mnt := range ks.mounts {
314                         mnt.volume.(*stubVolume).stubLog = stubLog
315                 }
316
317                 // Check that we chose the right block data
318                 rvz := ks.rendezvous(testHash, ks.mountsW)
319                 c.Assert(rvz[0].UUID[24:], Equals, "111")
320                 c.Assert(rvz[1].UUID[24:], Equals, "121")
321                 c.Assert(rvz[2].UUID[24:], Equals, "222")
322
323                 for i := 0; i < 2; i++ {
324                         _, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
325                                 Hash:           testHash,
326                                 Data:           testData,
327                                 StorageClasses: strings.Split(trial.classes, ","),
328                         })
329                         c.Check(err, IsNil)
330                 }
331                 c.Check(stubLog.String(), Equals, trial.expectLog)
332         }
333 }
334
335 func (s *keepstoreSuite) TestBlockTrash(c *C) {
336         s.cluster.Volumes = map[string]arvados.Volume{
337                 "zzzzz-nyw5e-000000000000000": {Replication: 1, Driver: "stub"},
338                 "zzzzz-nyw5e-111111111111111": {Replication: 1, Driver: "stub"},
339                 "zzzzz-nyw5e-222222222222222": {Replication: 1, Driver: "stub", ReadOnly: true},
340                 "zzzzz-nyw5e-333333333333333": {Replication: 1, Driver: "stub", ReadOnly: true, AllowTrashWhenReadOnly: true},
341         }
342         ks, cancel := testKeepstore(c, s.cluster, nil)
343         defer cancel()
344
345         var vol []*stubVolume
346         for _, mount := range ks.mountsR {
347                 vol = append(vol, mount.volume.(*stubVolume))
348         }
349         sort.Slice(vol, func(i, j int) bool {
350                 return vol[i].params.UUID < vol[j].params.UUID
351         })
352
353         ctx := context.Background()
354         loc := fooHash + "+3"
355         tOld := time.Now().Add(-s.cluster.Collections.BlobSigningTTL.Duration() - time.Second)
356
357         clear := func() {
358                 for _, vol := range vol {
359                         err := vol.BlockTrash(fooHash)
360                         if !os.IsNotExist(err) {
361                                 c.Assert(err, IsNil)
362                         }
363                 }
364         }
365         writeit := func(volidx int) {
366                 err := vol[volidx].BlockWrite(ctx, fooHash, []byte("foo"))
367                 c.Assert(err, IsNil)
368                 err = vol[volidx].blockTouchWithTime(fooHash, tOld)
369                 c.Assert(err, IsNil)
370         }
371         trashit := func() error {
372                 return ks.BlockTrash(ctx, loc)
373         }
374         checkexists := func(volidx int) bool {
375                 err := vol[volidx].BlockRead(ctx, fooHash, brdiscard)
376                 if !os.IsNotExist(err) {
377                         c.Check(err, IsNil)
378                 }
379                 return err == nil
380         }
381
382         clear()
383         c.Check(trashit(), Equals, os.ErrNotExist)
384
385         // one old replica => trash it
386         clear()
387         writeit(0)
388         c.Check(trashit(), IsNil)
389         c.Check(checkexists(0), Equals, false)
390
391         // one old replica + one new replica => keep new, trash old
392         clear()
393         writeit(0)
394         writeit(1)
395         c.Check(vol[1].blockTouchWithTime(fooHash, time.Now()), IsNil)
396         c.Check(trashit(), IsNil)
397         c.Check(checkexists(0), Equals, false)
398         c.Check(checkexists(1), Equals, true)
399
400         // two old replicas => trash both
401         clear()
402         writeit(0)
403         writeit(1)
404         c.Check(trashit(), IsNil)
405         c.Check(checkexists(0), Equals, false)
406         c.Check(checkexists(1), Equals, false)
407
408         // four old replicas => trash all except readonly volume with
409         // AllowTrashWhenReadOnly==false
410         clear()
411         writeit(0)
412         writeit(1)
413         writeit(2)
414         writeit(3)
415         c.Check(trashit(), IsNil)
416         c.Check(checkexists(0), Equals, false)
417         c.Check(checkexists(1), Equals, false)
418         c.Check(checkexists(2), Equals, true)
419         c.Check(checkexists(3), Equals, false)
420
421         // two old replicas but one returns an error => return the
422         // only non-404 backend error
423         clear()
424         vol[0].blockTrash = func(hash string) error {
425                 return errors.New("fake error")
426         }
427         writeit(0)
428         writeit(3)
429         c.Check(trashit(), ErrorMatches, "fake error")
430         c.Check(checkexists(0), Equals, true)
431         c.Check(checkexists(1), Equals, false)
432         c.Check(checkexists(2), Equals, false)
433         c.Check(checkexists(3), Equals, false)
434 }
435
436 func (s *keepstoreSuite) TestBlockWrite_OnlyOneBuffer(c *C) {
437         s.cluster.API.MaxKeepBlobBuffers = 1
438         ks, cancel := testKeepstore(c, s.cluster, nil)
439         defer cancel()
440         ok := make(chan struct{})
441         go func() {
442                 defer close(ok)
443                 ctx := authContext(arvadostest.ActiveTokenV2)
444                 _, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
445                         Hash: fooHash,
446                         Data: []byte("foo")})
447                 c.Check(err, IsNil)
448         }()
449         select {
450         case <-ok:
451         case <-time.After(time.Second):
452                 c.Fatal("PUT deadlocks with MaxKeepBlobBuffers==1")
453         }
454 }
455
456 func (s *keepstoreSuite) TestBufferPoolLeak(c *C) {
457         s.cluster.API.MaxKeepBlobBuffers = 4
458         ks, cancel := testKeepstore(c, s.cluster, nil)
459         defer cancel()
460
461         ctx := authContext(arvadostest.ActiveTokenV2)
462         var wg sync.WaitGroup
463         for range make([]int, 20) {
464                 wg.Add(1)
465                 go func() {
466                         defer wg.Done()
467                         resp, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
468                                 Hash: fooHash,
469                                 Data: []byte("foo")})
470                         c.Check(err, IsNil)
471                         _, err = ks.BlockRead(ctx, arvados.BlockReadOptions{
472                                 Locator: resp.Locator,
473                                 WriteTo: io.Discard})
474                         c.Check(err, IsNil)
475                 }()
476         }
477         ok := make(chan struct{})
478         go func() {
479                 wg.Wait()
480                 close(ok)
481         }()
482         select {
483         case <-ok:
484         case <-time.After(time.Second):
485                 c.Fatal("read/write sequence deadlocks, likely buffer pool leak")
486         }
487 }
488
489 func (s *keepstoreSuite) TestPutStorageClasses(c *C) {
490         s.cluster.Volumes = map[string]arvados.Volume{
491                 "zzzzz-nyw5e-000000000000000": {Replication: 1, Driver: "stub"}, // "default" is implicit
492                 "zzzzz-nyw5e-111111111111111": {Replication: 1, Driver: "stub", StorageClasses: map[string]bool{"special": true, "extra": true}},
493                 "zzzzz-nyw5e-222222222222222": {Replication: 1, Driver: "stub", StorageClasses: map[string]bool{"readonly": true}, ReadOnly: true},
494         }
495         ks, cancel := testKeepstore(c, s.cluster, nil)
496         defer cancel()
497         ctx := authContext(arvadostest.ActiveTokenV2)
498
499         for _, trial := range []struct {
500                 ask            []string
501                 expectReplicas int
502                 expectClasses  map[string]int
503         }{
504                 {nil,
505                         1,
506                         map[string]int{"default": 1}},
507                 {[]string{},
508                         1,
509                         map[string]int{"default": 1}},
510                 {[]string{"default"},
511                         1,
512                         map[string]int{"default": 1}},
513                 {[]string{"default", "default"},
514                         1,
515                         map[string]int{"default": 1}},
516                 {[]string{"special"},
517                         1,
518                         map[string]int{"extra": 1, "special": 1}},
519                 {[]string{"special", "readonly"},
520                         1,
521                         map[string]int{"extra": 1, "special": 1}},
522                 {[]string{"special", "nonexistent"},
523                         1,
524                         map[string]int{"extra": 1, "special": 1}},
525                 {[]string{"extra", "special"},
526                         1,
527                         map[string]int{"extra": 1, "special": 1}},
528                 {[]string{"default", "special"},
529                         2,
530                         map[string]int{"default": 1, "extra": 1, "special": 1}},
531         } {
532                 c.Logf("success case %#v", trial)
533                 resp, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
534                         Hash:           fooHash,
535                         Data:           []byte("foo"),
536                         StorageClasses: trial.ask,
537                 })
538                 if !c.Check(err, IsNil) {
539                         continue
540                 }
541                 c.Check(resp.Replicas, Equals, trial.expectReplicas)
542                 if len(trial.expectClasses) == 0 {
543                         // any non-empty value is correct
544                         c.Check(resp.StorageClasses, Not(HasLen), 0)
545                 } else {
546                         c.Check(resp.StorageClasses, DeepEquals, trial.expectClasses)
547                 }
548         }
549
550         for _, ask := range [][]string{
551                 {"doesnotexist"},
552                 {"doesnotexist", "readonly"},
553                 {"readonly"},
554         } {
555                 c.Logf("failure case %s", ask)
556                 _, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
557                         Hash:           fooHash,
558                         Data:           []byte("foo"),
559                         StorageClasses: ask,
560                 })
561                 c.Check(err, NotNil)
562         }
563 }
564
565 func (s *keepstoreSuite) TestUntrashHandlerWithNoWritableVolumes(c *C) {
566         for uuid, v := range s.cluster.Volumes {
567                 v.ReadOnly = true
568                 s.cluster.Volumes[uuid] = v
569         }
570         ks, cancel := testKeepstore(c, s.cluster, nil)
571         defer cancel()
572
573         for _, mnt := range ks.mounts {
574                 err := mnt.BlockWrite(context.Background(), fooHash, []byte("foo"))
575                 c.Assert(err, IsNil)
576                 err = mnt.BlockRead(context.Background(), fooHash, brdiscard)
577                 c.Assert(err, IsNil)
578         }
579
580         err := ks.BlockUntrash(context.Background(), fooHash)
581         c.Check(os.IsNotExist(err), Equals, true)
582
583         for _, mnt := range ks.mounts {
584                 err := mnt.BlockRead(context.Background(), fooHash, brdiscard)
585                 c.Assert(err, IsNil)
586         }
587 }
588
589 func (s *keepstoreSuite) TestBlockWrite_SkipReadOnly(c *C) {
590         s.cluster.Volumes = map[string]arvados.Volume{
591                 "zzzzz-nyw5e-000000000000000": {Replication: 1, Driver: "stub"},
592                 "zzzzz-nyw5e-111111111111111": {Replication: 1, Driver: "stub", ReadOnly: true},
593                 "zzzzz-nyw5e-222222222222222": {Replication: 1, Driver: "stub", ReadOnly: true, AllowTrashWhenReadOnly: true},
594         }
595         ks, cancel := testKeepstore(c, s.cluster, nil)
596         defer cancel()
597         ctx := authContext(arvadostest.ActiveTokenV2)
598
599         for i := range make([]byte, 32) {
600                 data := []byte(fmt.Sprintf("block %d", i))
601                 _, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{Data: data})
602                 c.Assert(err, IsNil)
603         }
604         c.Check(ks.mounts["zzzzz-nyw5e-000000000000000"].volume.(*stubVolume).stubLog.String(), Matches, "(?ms).*write.*")
605         c.Check(ks.mounts["zzzzz-nyw5e-111111111111111"].volume.(*stubVolume).stubLog.String(), HasLen, 0)
606         c.Check(ks.mounts["zzzzz-nyw5e-222222222222222"].volume.(*stubVolume).stubLog.String(), HasLen, 0)
607 }
608
609 func (s *keepstoreSuite) TestGetLocatorInfo(c *C) {
610         for _, trial := range []struct {
611                 locator string
612                 ok      bool
613                 expect  locatorInfo
614         }{
615                 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
616                         ok: true},
617                 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+1234",
618                         ok: true, expect: locatorInfo{size: 1234}},
619                 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+1234+Abcdef@abcdef",
620                         ok: true, expect: locatorInfo{size: 1234, signed: true}},
621                 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+1234+Rzzzzz-abcdef",
622                         ok: true, expect: locatorInfo{size: 1234, remote: true}},
623                 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+12345+Zexample+Rzzzzz-abcdef",
624                         ok: true, expect: locatorInfo{size: 12345, remote: true}},
625                 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+123456+👶🦈+Rzzzzz-abcdef",
626                         ok: true, expect: locatorInfo{size: 123456, remote: true}},
627                 // invalid: bad hash char
628                 {locator: "aaaaaaaaaaaaaazaaaaaaaaaaaaaaaaa+1234",
629                         ok: false},
630                 {locator: "aaaaaaaaaaaaaaFaaaaaaaaaaaaaaaaa+1234",
631                         ok: false},
632                 {locator: "aaaaaaaaaaaaaa⛵aaaaaaaaaaaaaaaaa+1234",
633                         ok: false},
634                 // invalid: hash length != 32
635                 {locator: "",
636                         ok: false},
637                 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
638                         ok: false},
639                 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+1234",
640                         ok: false},
641                 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaabb",
642                         ok: false},
643                 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaabb+1234",
644                         ok: false},
645                 // invalid: first hint is not size
646                 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+Abcdef+1234",
647                         ok: false},
648                 // invalid: leading/trailing/double +
649                 {locator: "+aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+1234",
650                         ok: false},
651                 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+1234+",
652                         ok: false},
653                 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa++1234",
654                         ok: false},
655                 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+1234++Abcdef@abcdef",
656                         ok: false},
657         } {
658                 c.Logf("=== %s", trial.locator)
659                 li, err := getLocatorInfo(trial.locator)
660                 if !trial.ok {
661                         c.Check(err, NotNil)
662                         continue
663                 }
664                 c.Check(err, IsNil)
665                 c.Check(li.hash, Equals, trial.locator[:32])
666                 c.Check(li.size, Equals, trial.expect.size)
667                 c.Check(li.signed, Equals, trial.expect.signed)
668                 c.Check(li.remote, Equals, trial.expect.remote)
669         }
670 }
671
672 func init() {
673         driver["stub"] = func(params newVolumeParams) (volume, error) {
674                 v := &stubVolume{
675                         params:  params,
676                         data:    make(map[string]stubData),
677                         stubLog: &stubLog{},
678                 }
679                 return v, nil
680         }
681 }
682
683 type stubLog struct {
684         sync.Mutex
685         bytes.Buffer
686 }
687
688 func (sl *stubLog) Printf(format string, args ...interface{}) {
689         if sl == nil {
690                 return
691         }
692         sl.Lock()
693         defer sl.Unlock()
694         fmt.Fprintf(sl, format+"\n", args...)
695 }
696
697 type stubData struct {
698         mtime time.Time
699         data  []byte
700         trash time.Time
701 }
702
703 type stubVolume struct {
704         params  newVolumeParams
705         data    map[string]stubData
706         stubLog *stubLog
707         mtx     sync.Mutex
708
709         // The following funcs enable tests to insert delays and
710         // failures. Each volume operation begins by calling the
711         // corresponding func (if non-nil). If the func returns an
712         // error, that error is returned to caller. Otherwise, the
713         // stub continues normally.
714         blockRead    func(ctx context.Context, hash string, writeTo io.WriterAt) error
715         blockWrite   func(ctx context.Context, hash string, data []byte) error
716         deviceID     func() string
717         blockTouch   func(hash string) error
718         blockTrash   func(hash string) error
719         blockUntrash func(hash string) error
720         index        func(ctx context.Context, prefix string, writeTo io.Writer) error
721         mtime        func(hash string) (time.Time, error)
722         emptyTrash   func()
723 }
724
725 func (v *stubVolume) log(op, hash string) {
726         // Note this intentionally crashes if UUID or hash is short --
727         // if keepstore ever does that, tests should fail.
728         v.stubLog.Printf("%s %s %s", v.params.UUID[24:27], op, hash[:3])
729 }
730
731 func (v *stubVolume) BlockRead(ctx context.Context, hash string, writeTo io.WriterAt) error {
732         v.log("read", hash)
733         if v.blockRead != nil {
734                 err := v.blockRead(ctx, hash, writeTo)
735                 if err != nil {
736                         return err
737                 }
738         }
739         v.mtx.Lock()
740         ent, ok := v.data[hash]
741         v.mtx.Unlock()
742         if !ok || !ent.trash.IsZero() {
743                 return os.ErrNotExist
744         }
745         wrote := 0
746         for writesize := 1000; wrote < len(ent.data); writesize = writesize * 2 {
747                 data := ent.data[wrote:]
748                 if len(data) > writesize {
749                         data = data[:writesize]
750                 }
751                 n, err := writeTo.WriteAt(data, int64(wrote))
752                 wrote += n
753                 if err != nil {
754                         return err
755                 }
756         }
757         return nil
758 }
759
760 func (v *stubVolume) BlockWrite(ctx context.Context, hash string, data []byte) error {
761         v.log("write", hash)
762         if v.blockWrite != nil {
763                 if err := v.blockWrite(ctx, hash, data); err != nil {
764                         return err
765                 }
766         }
767         v.mtx.Lock()
768         defer v.mtx.Unlock()
769         v.data[hash] = stubData{
770                 mtime: time.Now(),
771                 data:  append([]byte(nil), data...),
772         }
773         return nil
774 }
775
776 func (v *stubVolume) DeviceID() string {
777         return fmt.Sprintf("%p", v)
778 }
779
780 func (v *stubVolume) BlockTouch(hash string) error {
781         v.log("touch", hash)
782         if v.blockTouch != nil {
783                 if err := v.blockTouch(hash); err != nil {
784                         return err
785                 }
786         }
787         v.mtx.Lock()
788         defer v.mtx.Unlock()
789         ent, ok := v.data[hash]
790         if !ok || !ent.trash.IsZero() {
791                 return os.ErrNotExist
792         }
793         ent.mtime = time.Now()
794         v.data[hash] = ent
795         return nil
796 }
797
798 // Set mtime to the (presumably old) specified time.
799 func (v *stubVolume) blockTouchWithTime(hash string, t time.Time) error {
800         v.log("touchwithtime", hash)
801         v.mtx.Lock()
802         defer v.mtx.Unlock()
803         ent, ok := v.data[hash]
804         if !ok {
805                 return os.ErrNotExist
806         }
807         ent.mtime = t
808         v.data[hash] = ent
809         return nil
810 }
811
812 func (v *stubVolume) BlockTrash(hash string) error {
813         v.log("trash", hash)
814         if v.blockTrash != nil {
815                 if err := v.blockTrash(hash); err != nil {
816                         return err
817                 }
818         }
819         v.mtx.Lock()
820         defer v.mtx.Unlock()
821         ent, ok := v.data[hash]
822         if !ok || !ent.trash.IsZero() {
823                 return os.ErrNotExist
824         }
825         ent.trash = time.Now().Add(v.params.Cluster.Collections.BlobTrashLifetime.Duration())
826         v.data[hash] = ent
827         return nil
828 }
829
830 func (v *stubVolume) BlockUntrash(hash string) error {
831         v.log("untrash", hash)
832         if v.blockUntrash != nil {
833                 if err := v.blockUntrash(hash); err != nil {
834                         return err
835                 }
836         }
837         v.mtx.Lock()
838         defer v.mtx.Unlock()
839         ent, ok := v.data[hash]
840         if !ok || ent.trash.IsZero() {
841                 return os.ErrNotExist
842         }
843         ent.trash = time.Time{}
844         v.data[hash] = ent
845         return nil
846 }
847
848 func (v *stubVolume) Index(ctx context.Context, prefix string, writeTo io.Writer) error {
849         v.stubLog.Printf("%s index %s", v.params.UUID, prefix)
850         if v.index != nil {
851                 if err := v.index(ctx, prefix, writeTo); err != nil {
852                         return err
853                 }
854         }
855         buf := &bytes.Buffer{}
856         v.mtx.Lock()
857         for hash, ent := range v.data {
858                 if ent.trash.IsZero() && strings.HasPrefix(hash, prefix) {
859                         fmt.Fprintf(buf, "%s+%d %d\n", hash, len(ent.data), ent.mtime.UnixNano())
860                 }
861         }
862         v.mtx.Unlock()
863         _, err := io.Copy(writeTo, buf)
864         return err
865 }
866
867 func (v *stubVolume) Mtime(hash string) (time.Time, error) {
868         v.log("mtime", hash)
869         if v.mtime != nil {
870                 if t, err := v.mtime(hash); err != nil {
871                         return t, err
872                 }
873         }
874         v.mtx.Lock()
875         defer v.mtx.Unlock()
876         ent, ok := v.data[hash]
877         if !ok || !ent.trash.IsZero() {
878                 return time.Time{}, os.ErrNotExist
879         }
880         return ent.mtime, nil
881 }
882
883 func (v *stubVolume) EmptyTrash() {
884         v.stubLog.Printf("%s emptytrash", v.params.UUID)
885         v.mtx.Lock()
886         defer v.mtx.Unlock()
887         for hash, ent := range v.data {
888                 if !ent.trash.IsZero() && time.Now().After(ent.trash) {
889                         delete(v.data, hash)
890                 }
891         }
892 }