Merge branch '22132-scheduling-status-log' refs #22132
[arvados.git] / services / keepstore / keepstore_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepstore
6
7 import (
8         "bytes"
9         "context"
10         "crypto/md5"
11         "errors"
12         "fmt"
13         "io"
14         "net/http"
15         "os"
16         "sort"
17         "strings"
18         "sync"
19         "testing"
20         "time"
21
22         "git.arvados.org/arvados.git/lib/config"
23         "git.arvados.org/arvados.git/sdk/go/arvados"
24         "git.arvados.org/arvados.git/sdk/go/arvadostest"
25         "git.arvados.org/arvados.git/sdk/go/auth"
26         "git.arvados.org/arvados.git/sdk/go/ctxlog"
27         "github.com/prometheus/client_golang/prometheus"
28         . "gopkg.in/check.v1"
29 )
30
31 func TestGocheck(t *testing.T) {
32         TestingT(t)
33 }
34
35 const (
36         fooHash = "acbd18db4cc2f85cedef654fccc4a4d8"
37         barHash = "37b51d194a7513e45b56f6524f2d51f2"
38 )
39
40 var testServiceURL = func() arvados.URL {
41         return arvados.URL{Host: "localhost:12345", Scheme: "http"}
42 }()
43
44 func authContext(token string) context.Context {
45         return auth.NewContext(context.TODO(), &auth.Credentials{Tokens: []string{token}})
46 }
47
48 func testCluster(t TB) *arvados.Cluster {
49         cfg, err := config.NewLoader(bytes.NewBufferString("Clusters: {zzzzz: {}}"), ctxlog.TestLogger(t)).Load()
50         if err != nil {
51                 t.Fatal(err)
52         }
53         cluster, err := cfg.GetCluster("")
54         if err != nil {
55                 t.Fatal(err)
56         }
57         cluster.SystemRootToken = arvadostest.SystemRootToken
58         cluster.ManagementToken = arvadostest.ManagementToken
59         return cluster
60 }
61
62 func testKeepstore(t TB, cluster *arvados.Cluster, reg *prometheus.Registry) (*keepstore, context.CancelFunc) {
63         if reg == nil {
64                 reg = prometheus.NewRegistry()
65         }
66         ctx, cancel := context.WithCancel(context.Background())
67         ctx = ctxlog.Context(ctx, ctxlog.TestLogger(t))
68         ks, err := newKeepstore(ctx, cluster, cluster.SystemRootToken, reg, testServiceURL)
69         if err != nil {
70                 t.Fatal(err)
71         }
72         return ks, cancel
73 }
74
75 var _ = Suite(&keepstoreSuite{})
76
77 type keepstoreSuite struct {
78         cluster *arvados.Cluster
79 }
80
81 func (s *keepstoreSuite) SetUpTest(c *C) {
82         s.cluster = testCluster(c)
83         s.cluster.Volumes = map[string]arvados.Volume{
84                 "zzzzz-nyw5e-000000000000000": {Replication: 1, Driver: "stub"},
85                 "zzzzz-nyw5e-111111111111111": {Replication: 1, Driver: "stub"},
86         }
87 }
88
89 func (s *keepstoreSuite) TestBlockRead_ChecksumMismatch(c *C) {
90         ks, cancel := testKeepstore(c, s.cluster, nil)
91         defer cancel()
92
93         ctx := authContext(arvadostest.ActiveTokenV2)
94
95         fooHash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
96         err := ks.mountsW[0].BlockWrite(ctx, fooHash, []byte("bar"))
97         c.Assert(err, IsNil)
98
99         _, err = ks.BlockWrite(ctx, arvados.BlockWriteOptions{
100                 Hash: fooHash,
101                 Data: []byte("foo"),
102         })
103         c.Check(err, ErrorMatches, "hash collision")
104
105         buf := bytes.NewBuffer(nil)
106         _, err = ks.BlockRead(ctx, arvados.BlockReadOptions{
107                 Locator: ks.signLocator(arvadostest.ActiveTokenV2, fooHash+"+3"),
108                 WriteTo: buf,
109         })
110         c.Check(err, ErrorMatches, "checksum mismatch in stored data")
111         c.Check(buf.String(), Not(Equals), "foo")
112         c.Check(buf.Len() < 3, Equals, true)
113
114         err = ks.mountsW[1].BlockWrite(ctx, fooHash, []byte("foo"))
115         c.Assert(err, IsNil)
116
117         buf = bytes.NewBuffer(nil)
118         _, err = ks.BlockRead(ctx, arvados.BlockReadOptions{
119                 Locator: ks.signLocator(arvadostest.ActiveTokenV2, fooHash+"+3"),
120                 WriteTo: buf,
121         })
122         c.Check(err, ErrorMatches, "checksum mismatch in stored data")
123         c.Check(buf.Len() < 3, Equals, true)
124 }
125
126 func (s *keepstoreSuite) TestBlockReadWrite_SigningDisabled(c *C) {
127         origKey := s.cluster.Collections.BlobSigningKey
128         s.cluster.Collections.BlobSigning = false
129         s.cluster.Collections.BlobSigningKey = ""
130         ks, cancel := testKeepstore(c, s.cluster, nil)
131         defer cancel()
132
133         resp, err := ks.BlockWrite(authContext("abcde"), arvados.BlockWriteOptions{
134                 Hash: fooHash,
135                 Data: []byte("foo"),
136         })
137         c.Assert(err, IsNil)
138         c.Check(resp.Locator, Equals, fooHash+"+3")
139         locUnsigned := resp.Locator
140         ttl := time.Hour
141         locSigned := arvados.SignLocator(locUnsigned, arvadostest.ActiveTokenV2, time.Now().Add(ttl), ttl, []byte(origKey))
142         c.Assert(locSigned, Not(Equals), locUnsigned)
143
144         for _, locator := range []string{locUnsigned, locSigned} {
145                 for _, token := range []string{"", "xyzzy", arvadostest.ActiveTokenV2} {
146                         c.Logf("=== locator %q token %q", locator, token)
147                         ctx := authContext(token)
148                         buf := bytes.NewBuffer(nil)
149                         _, err := ks.BlockRead(ctx, arvados.BlockReadOptions{
150                                 Locator: locator,
151                                 WriteTo: buf,
152                         })
153                         c.Check(err, IsNil)
154                         c.Check(buf.String(), Equals, "foo")
155                 }
156         }
157 }
158
159 func (s *keepstoreSuite) TestBlockRead_OrderedByStorageClassPriority(c *C) {
160         s.cluster.Volumes = map[string]arvados.Volume{
161                 "zzzzz-nyw5e-111111111111111": {
162                         Driver:         "stub",
163                         Replication:    1,
164                         StorageClasses: map[string]bool{"class1": true}},
165                 "zzzzz-nyw5e-222222222222222": {
166                         Driver:         "stub",
167                         Replication:    1,
168                         StorageClasses: map[string]bool{"class2": true, "class3": true}},
169         }
170
171         // "foobar" is just some data that happens to result in
172         // rendezvous order {111, 222}
173         data := []byte("foobar")
174         hash := fmt.Sprintf("%x", md5.Sum(data))
175
176         for _, trial := range []struct {
177                 priority1 int // priority of class1, thus vol1
178                 priority2 int // priority of class2
179                 priority3 int // priority of class3 (vol2 priority will be max(priority2, priority3))
180                 expectLog string
181         }{
182                 {100, 50, 50, "111 read 385\n"},              // class1 has higher priority => try vol1 first, no need to try vol2
183                 {100, 100, 100, "111 read 385\n"},            // same priority, vol2 is first in rendezvous order => try vol1 first and succeed
184                 {66, 99, 33, "222 read 385\n111 read 385\n"}, // class2 has higher priority => try vol2 first, then try vol1
185                 {66, 33, 99, "222 read 385\n111 read 385\n"}, // class3 has highest priority => vol2 has highest => try vol2 first, then try vol1
186         } {
187                 c.Logf("=== %+v", trial)
188
189                 s.cluster.StorageClasses = map[string]arvados.StorageClassConfig{
190                         "class1": {Priority: trial.priority1},
191                         "class2": {Priority: trial.priority2},
192                         "class3": {Priority: trial.priority3},
193                 }
194                 ks, cancel := testKeepstore(c, s.cluster, nil)
195                 defer cancel()
196
197                 ctx := authContext(arvadostest.ActiveTokenV2)
198                 resp, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
199                         Hash:           hash,
200                         Data:           data,
201                         StorageClasses: []string{"class1"},
202                 })
203                 c.Assert(err, IsNil)
204
205                 // Combine logs into one. (We only want the logs from
206                 // the BlockRead below, not from BlockWrite above.)
207                 stubLog := &stubLog{}
208                 for _, mnt := range ks.mounts {
209                         mnt.volume.(*stubVolume).stubLog = stubLog
210                 }
211
212                 n, err := ks.BlockRead(ctx, arvados.BlockReadOptions{
213                         Locator: resp.Locator,
214                         WriteTo: io.Discard,
215                 })
216                 c.Assert(n, Equals, len(data))
217                 c.Assert(err, IsNil)
218                 c.Check(stubLog.String(), Equals, trial.expectLog)
219         }
220 }
221
222 func (s *keepstoreSuite) TestBlockWrite_NoWritableVolumes(c *C) {
223         for uuid, v := range s.cluster.Volumes {
224                 v.ReadOnly = true
225                 s.cluster.Volumes[uuid] = v
226         }
227         ks, cancel := testKeepstore(c, s.cluster, nil)
228         defer cancel()
229         for _, mnt := range ks.mounts {
230                 mnt.volume.(*stubVolume).blockWrite = func(context.Context, string, []byte) error {
231                         c.Error("volume BlockWrite called")
232                         return errors.New("fail")
233                 }
234         }
235         ctx := authContext(arvadostest.ActiveTokenV2)
236
237         _, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
238                 Hash: fooHash,
239                 Data: []byte("foo")})
240         c.Check(err, NotNil)
241         c.Check(err.(interface{ HTTPStatus() int }).HTTPStatus(), Equals, http.StatusInsufficientStorage)
242 }
243
244 func (s *keepstoreSuite) TestBlockWrite_MultipleStorageClasses(c *C) {
245         s.cluster.Volumes = map[string]arvados.Volume{
246                 "zzzzz-nyw5e-111111111111111": {
247                         Driver:         "stub",
248                         Replication:    1,
249                         StorageClasses: map[string]bool{"class1": true}},
250                 "zzzzz-nyw5e-121212121212121": {
251                         Driver:         "stub",
252                         Replication:    1,
253                         StorageClasses: map[string]bool{"class1": true, "class2": true}},
254                 "zzzzz-nyw5e-222222222222222": {
255                         Driver:         "stub",
256                         Replication:    1,
257                         StorageClasses: map[string]bool{"class2": true}},
258         }
259
260         // testData is a block that happens to have rendezvous order 111, 121, 222
261         testData := []byte("qux")
262         testHash := fmt.Sprintf("%x+%d", md5.Sum(testData), len(testData))
263
264         s.cluster.StorageClasses = map[string]arvados.StorageClassConfig{
265                 "class1": {},
266                 "class2": {},
267                 "class3": {},
268         }
269
270         ctx := authContext(arvadostest.ActiveTokenV2)
271         for idx, trial := range []struct {
272                 classes   string // desired classes
273                 expectLog string
274         }{
275                 {"class1", "" +
276                         "111 read d85\n" +
277                         "121 read d85\n" +
278                         "111 write d85\n" +
279                         "111 read d85\n" +
280                         "111 touch d85\n"},
281                 {"class2", "" +
282                         "121 read d85\n" + // write#1
283                         "222 read d85\n" +
284                         "121 write d85\n" +
285                         "121 read d85\n" + // write#2
286                         "121 touch d85\n"},
287                 {"class1,class2", "" +
288                         "111 read d85\n" + // write#1
289                         "121 read d85\n" +
290                         "222 read d85\n" +
291                         "121 write d85\n" +
292                         "111 write d85\n" +
293                         "111 read d85\n" + // write#2
294                         "111 touch d85\n" +
295                         "121 read d85\n" +
296                         "121 touch d85\n"},
297                 {"class1,class2,class404", "" +
298                         "111 read d85\n" + // write#1
299                         "121 read d85\n" +
300                         "222 read d85\n" +
301                         "121 write d85\n" +
302                         "111 write d85\n" +
303                         "111 read d85\n" + // write#2
304                         "111 touch d85\n" +
305                         "121 read d85\n" +
306                         "121 touch d85\n"},
307         } {
308                 c.Logf("=== %d: %+v", idx, trial)
309
310                 ks, cancel := testKeepstore(c, s.cluster, nil)
311                 defer cancel()
312                 stubLog := &stubLog{}
313                 for _, mnt := range ks.mounts {
314                         mnt.volume.(*stubVolume).stubLog = stubLog
315                 }
316
317                 // Check that we chose the right block data
318                 rvz := ks.rendezvous(testHash, ks.mountsW)
319                 c.Assert(rvz[0].UUID[24:], Equals, "111")
320                 c.Assert(rvz[1].UUID[24:], Equals, "121")
321                 c.Assert(rvz[2].UUID[24:], Equals, "222")
322
323                 for i := 0; i < 2; i++ {
324                         _, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
325                                 Hash:           testHash,
326                                 Data:           testData,
327                                 StorageClasses: strings.Split(trial.classes, ","),
328                         })
329                         c.Check(err, IsNil)
330                 }
331                 // The "nextmnt" loop in BlockWrite first starts the
332                 // goroutine that writes to mount 121, then the
333                 // goroutine that writes to mount 111.  Most of the
334                 // time, mount 121 will log first, but occasionally
335                 // mount 111 will log first.  In that case we swap the
336                 // log entries. (The order of the rest of the log
337                 // entries is meaningful -- just not these two.)
338                 gotLog := strings.Replace(stubLog.String(),
339                         "111 write d85\n121 write d85\n",
340                         "121 write d85\n111 write d85\n", 1)
341                 c.Check(gotLog, Equals, trial.expectLog)
342         }
343 }
344
345 func (s *keepstoreSuite) TestBlockTrash(c *C) {
346         s.cluster.Volumes = map[string]arvados.Volume{
347                 "zzzzz-nyw5e-000000000000000": {Replication: 1, Driver: "stub"},
348                 "zzzzz-nyw5e-111111111111111": {Replication: 1, Driver: "stub"},
349                 "zzzzz-nyw5e-222222222222222": {Replication: 1, Driver: "stub", ReadOnly: true},
350                 "zzzzz-nyw5e-333333333333333": {Replication: 1, Driver: "stub", ReadOnly: true, AllowTrashWhenReadOnly: true},
351         }
352         ks, cancel := testKeepstore(c, s.cluster, nil)
353         defer cancel()
354
355         var vol []*stubVolume
356         for _, mount := range ks.mountsR {
357                 vol = append(vol, mount.volume.(*stubVolume))
358         }
359         sort.Slice(vol, func(i, j int) bool {
360                 return vol[i].params.UUID < vol[j].params.UUID
361         })
362
363         ctx := context.Background()
364         loc := fooHash + "+3"
365         tOld := time.Now().Add(-s.cluster.Collections.BlobSigningTTL.Duration() - time.Second)
366
367         clear := func() {
368                 for _, vol := range vol {
369                         err := vol.BlockTrash(fooHash)
370                         if !os.IsNotExist(err) {
371                                 c.Assert(err, IsNil)
372                         }
373                 }
374         }
375         writeit := func(volidx int) {
376                 err := vol[volidx].BlockWrite(ctx, fooHash, []byte("foo"))
377                 c.Assert(err, IsNil)
378                 err = vol[volidx].blockTouchWithTime(fooHash, tOld)
379                 c.Assert(err, IsNil)
380         }
381         trashit := func() error {
382                 return ks.BlockTrash(ctx, loc)
383         }
384         checkexists := func(volidx int) bool {
385                 err := vol[volidx].BlockRead(ctx, fooHash, brdiscard)
386                 if !os.IsNotExist(err) {
387                         c.Check(err, IsNil)
388                 }
389                 return err == nil
390         }
391
392         clear()
393         c.Check(trashit(), Equals, os.ErrNotExist)
394
395         // one old replica => trash it
396         clear()
397         writeit(0)
398         c.Check(trashit(), IsNil)
399         c.Check(checkexists(0), Equals, false)
400
401         // one old replica + one new replica => keep new, trash old
402         clear()
403         writeit(0)
404         writeit(1)
405         c.Check(vol[1].blockTouchWithTime(fooHash, time.Now()), IsNil)
406         c.Check(trashit(), IsNil)
407         c.Check(checkexists(0), Equals, false)
408         c.Check(checkexists(1), Equals, true)
409
410         // two old replicas => trash both
411         clear()
412         writeit(0)
413         writeit(1)
414         c.Check(trashit(), IsNil)
415         c.Check(checkexists(0), Equals, false)
416         c.Check(checkexists(1), Equals, false)
417
418         // four old replicas => trash all except readonly volume with
419         // AllowTrashWhenReadOnly==false
420         clear()
421         writeit(0)
422         writeit(1)
423         writeit(2)
424         writeit(3)
425         c.Check(trashit(), IsNil)
426         c.Check(checkexists(0), Equals, false)
427         c.Check(checkexists(1), Equals, false)
428         c.Check(checkexists(2), Equals, true)
429         c.Check(checkexists(3), Equals, false)
430
431         // two old replicas but one returns an error => return the
432         // only non-404 backend error
433         clear()
434         vol[0].blockTrash = func(hash string) error {
435                 return errors.New("fake error")
436         }
437         writeit(0)
438         writeit(3)
439         c.Check(trashit(), ErrorMatches, "fake error")
440         c.Check(checkexists(0), Equals, true)
441         c.Check(checkexists(1), Equals, false)
442         c.Check(checkexists(2), Equals, false)
443         c.Check(checkexists(3), Equals, false)
444 }
445
446 func (s *keepstoreSuite) TestBlockWrite_OnlyOneBuffer(c *C) {
447         s.cluster.API.MaxKeepBlobBuffers = 1
448         ks, cancel := testKeepstore(c, s.cluster, nil)
449         defer cancel()
450         ok := make(chan struct{})
451         go func() {
452                 defer close(ok)
453                 ctx := authContext(arvadostest.ActiveTokenV2)
454                 _, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
455                         Hash: fooHash,
456                         Data: []byte("foo")})
457                 c.Check(err, IsNil)
458         }()
459         select {
460         case <-ok:
461         case <-time.After(time.Second):
462                 c.Fatal("PUT deadlocks with MaxKeepBlobBuffers==1")
463         }
464 }
465
466 func (s *keepstoreSuite) TestBufferPoolLeak(c *C) {
467         s.cluster.API.MaxKeepBlobBuffers = 4
468         ks, cancel := testKeepstore(c, s.cluster, nil)
469         defer cancel()
470
471         ctx := authContext(arvadostest.ActiveTokenV2)
472         var wg sync.WaitGroup
473         for range make([]int, 20) {
474                 wg.Add(1)
475                 go func() {
476                         defer wg.Done()
477                         resp, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
478                                 Hash: fooHash,
479                                 Data: []byte("foo")})
480                         c.Check(err, IsNil)
481                         _, err = ks.BlockRead(ctx, arvados.BlockReadOptions{
482                                 Locator: resp.Locator,
483                                 WriteTo: io.Discard})
484                         c.Check(err, IsNil)
485                 }()
486         }
487         ok := make(chan struct{})
488         go func() {
489                 wg.Wait()
490                 close(ok)
491         }()
492         select {
493         case <-ok:
494         case <-time.After(time.Second):
495                 c.Fatal("read/write sequence deadlocks, likely buffer pool leak")
496         }
497 }
498
499 func (s *keepstoreSuite) TestPutStorageClasses(c *C) {
500         s.cluster.Volumes = map[string]arvados.Volume{
501                 "zzzzz-nyw5e-000000000000000": {Replication: 1, Driver: "stub"}, // "default" is implicit
502                 "zzzzz-nyw5e-111111111111111": {Replication: 1, Driver: "stub", StorageClasses: map[string]bool{"special": true, "extra": true}},
503                 "zzzzz-nyw5e-222222222222222": {Replication: 1, Driver: "stub", StorageClasses: map[string]bool{"readonly": true}, ReadOnly: true},
504         }
505         ks, cancel := testKeepstore(c, s.cluster, nil)
506         defer cancel()
507         ctx := authContext(arvadostest.ActiveTokenV2)
508
509         for _, trial := range []struct {
510                 ask            []string
511                 expectReplicas int
512                 expectClasses  map[string]int
513         }{
514                 {nil,
515                         1,
516                         map[string]int{"default": 1}},
517                 {[]string{},
518                         1,
519                         map[string]int{"default": 1}},
520                 {[]string{"default"},
521                         1,
522                         map[string]int{"default": 1}},
523                 {[]string{"default", "default"},
524                         1,
525                         map[string]int{"default": 1}},
526                 {[]string{"special"},
527                         1,
528                         map[string]int{"extra": 1, "special": 1}},
529                 {[]string{"special", "readonly"},
530                         1,
531                         map[string]int{"extra": 1, "special": 1}},
532                 {[]string{"special", "nonexistent"},
533                         1,
534                         map[string]int{"extra": 1, "special": 1}},
535                 {[]string{"extra", "special"},
536                         1,
537                         map[string]int{"extra": 1, "special": 1}},
538                 {[]string{"default", "special"},
539                         2,
540                         map[string]int{"default": 1, "extra": 1, "special": 1}},
541         } {
542                 c.Logf("success case %#v", trial)
543                 resp, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
544                         Hash:           fooHash,
545                         Data:           []byte("foo"),
546                         StorageClasses: trial.ask,
547                 })
548                 if !c.Check(err, IsNil) {
549                         continue
550                 }
551                 c.Check(resp.Replicas, Equals, trial.expectReplicas)
552                 if len(trial.expectClasses) == 0 {
553                         // any non-empty value is correct
554                         c.Check(resp.StorageClasses, Not(HasLen), 0)
555                 } else {
556                         c.Check(resp.StorageClasses, DeepEquals, trial.expectClasses)
557                 }
558         }
559
560         for _, ask := range [][]string{
561                 {"doesnotexist"},
562                 {"doesnotexist", "readonly"},
563                 {"readonly"},
564         } {
565                 c.Logf("failure case %s", ask)
566                 _, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{
567                         Hash:           fooHash,
568                         Data:           []byte("foo"),
569                         StorageClasses: ask,
570                 })
571                 c.Check(err, NotNil)
572         }
573 }
574
575 func (s *keepstoreSuite) TestUntrashHandlerWithNoWritableVolumes(c *C) {
576         for uuid, v := range s.cluster.Volumes {
577                 v.ReadOnly = true
578                 s.cluster.Volumes[uuid] = v
579         }
580         ks, cancel := testKeepstore(c, s.cluster, nil)
581         defer cancel()
582
583         for _, mnt := range ks.mounts {
584                 err := mnt.BlockWrite(context.Background(), fooHash, []byte("foo"))
585                 c.Assert(err, IsNil)
586                 err = mnt.BlockRead(context.Background(), fooHash, brdiscard)
587                 c.Assert(err, IsNil)
588         }
589
590         err := ks.BlockUntrash(context.Background(), fooHash)
591         c.Check(os.IsNotExist(err), Equals, true)
592
593         for _, mnt := range ks.mounts {
594                 err := mnt.BlockRead(context.Background(), fooHash, brdiscard)
595                 c.Assert(err, IsNil)
596         }
597 }
598
599 func (s *keepstoreSuite) TestBlockWrite_SkipReadOnly(c *C) {
600         s.cluster.Volumes = map[string]arvados.Volume{
601                 "zzzzz-nyw5e-000000000000000": {Replication: 1, Driver: "stub"},
602                 "zzzzz-nyw5e-111111111111111": {Replication: 1, Driver: "stub", ReadOnly: true},
603                 "zzzzz-nyw5e-222222222222222": {Replication: 1, Driver: "stub", ReadOnly: true, AllowTrashWhenReadOnly: true},
604         }
605         ks, cancel := testKeepstore(c, s.cluster, nil)
606         defer cancel()
607         ctx := authContext(arvadostest.ActiveTokenV2)
608
609         for i := range make([]byte, 32) {
610                 data := []byte(fmt.Sprintf("block %d", i))
611                 _, err := ks.BlockWrite(ctx, arvados.BlockWriteOptions{Data: data})
612                 c.Assert(err, IsNil)
613         }
614         c.Check(ks.mounts["zzzzz-nyw5e-000000000000000"].volume.(*stubVolume).stubLog.String(), Matches, "(?ms).*write.*")
615         c.Check(ks.mounts["zzzzz-nyw5e-111111111111111"].volume.(*stubVolume).stubLog.String(), HasLen, 0)
616         c.Check(ks.mounts["zzzzz-nyw5e-222222222222222"].volume.(*stubVolume).stubLog.String(), HasLen, 0)
617 }
618
619 func (s *keepstoreSuite) TestGetLocatorInfo(c *C) {
620         for _, trial := range []struct {
621                 locator string
622                 ok      bool
623                 expect  locatorInfo
624         }{
625                 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
626                         ok: true},
627                 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+1234",
628                         ok: true, expect: locatorInfo{size: 1234}},
629                 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+1234+Abcdef@abcdef",
630                         ok: true, expect: locatorInfo{size: 1234, signed: true}},
631                 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+1234+Rzzzzz-abcdef",
632                         ok: true, expect: locatorInfo{size: 1234, remote: true}},
633                 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+12345+Zexample+Rzzzzz-abcdef",
634                         ok: true, expect: locatorInfo{size: 12345, remote: true}},
635                 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+123456+👶🦈+Rzzzzz-abcdef",
636                         ok: true, expect: locatorInfo{size: 123456, remote: true}},
637                 // invalid: bad hash char
638                 {locator: "aaaaaaaaaaaaaazaaaaaaaaaaaaaaaaa+1234",
639                         ok: false},
640                 {locator: "aaaaaaaaaaaaaaFaaaaaaaaaaaaaaaaa+1234",
641                         ok: false},
642                 {locator: "aaaaaaaaaaaaaa⛵aaaaaaaaaaaaaaaaa+1234",
643                         ok: false},
644                 // invalid: hash length != 32
645                 {locator: "",
646                         ok: false},
647                 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
648                         ok: false},
649                 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+1234",
650                         ok: false},
651                 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaabb",
652                         ok: false},
653                 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaabb+1234",
654                         ok: false},
655                 // invalid: first hint is not size
656                 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+Abcdef+1234",
657                         ok: false},
658                 // invalid: leading/trailing/double +
659                 {locator: "+aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+1234",
660                         ok: false},
661                 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+1234+",
662                         ok: false},
663                 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa++1234",
664                         ok: false},
665                 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+1234++Abcdef@abcdef",
666                         ok: false},
667         } {
668                 c.Logf("=== %s", trial.locator)
669                 li, err := getLocatorInfo(trial.locator)
670                 if !trial.ok {
671                         c.Check(err, NotNil)
672                         continue
673                 }
674                 c.Check(err, IsNil)
675                 c.Check(li.hash, Equals, trial.locator[:32])
676                 c.Check(li.size, Equals, trial.expect.size)
677                 c.Check(li.signed, Equals, trial.expect.signed)
678                 c.Check(li.remote, Equals, trial.expect.remote)
679         }
680 }
681
682 func init() {
683         driver["stub"] = func(params newVolumeParams) (volume, error) {
684                 v := &stubVolume{
685                         params:  params,
686                         data:    make(map[string]stubData),
687                         stubLog: &stubLog{},
688                 }
689                 return v, nil
690         }
691 }
692
693 type stubLog struct {
694         sync.Mutex
695         bytes.Buffer
696 }
697
698 func (sl *stubLog) Printf(format string, args ...interface{}) {
699         if sl == nil {
700                 return
701         }
702         sl.Lock()
703         defer sl.Unlock()
704         fmt.Fprintf(sl, format+"\n", args...)
705 }
706
707 type stubData struct {
708         mtime time.Time
709         data  []byte
710         trash time.Time
711 }
712
713 type stubVolume struct {
714         params  newVolumeParams
715         data    map[string]stubData
716         stubLog *stubLog
717         mtx     sync.Mutex
718
719         // The following funcs enable tests to insert delays and
720         // failures. Each volume operation begins by calling the
721         // corresponding func (if non-nil). If the func returns an
722         // error, that error is returned to caller. Otherwise, the
723         // stub continues normally.
724         blockRead    func(ctx context.Context, hash string, writeTo io.WriterAt) error
725         blockWrite   func(ctx context.Context, hash string, data []byte) error
726         deviceID     func() string
727         blockTouch   func(hash string) error
728         blockTrash   func(hash string) error
729         blockUntrash func(hash string) error
730         index        func(ctx context.Context, prefix string, writeTo io.Writer) error
731         mtime        func(hash string) (time.Time, error)
732         emptyTrash   func()
733 }
734
735 func (v *stubVolume) log(op, hash string) {
736         // Note this intentionally crashes if UUID or hash is short --
737         // if keepstore ever does that, tests should fail.
738         v.stubLog.Printf("%s %s %s", v.params.UUID[24:27], op, hash[:3])
739 }
740
741 func (v *stubVolume) BlockRead(ctx context.Context, hash string, writeTo io.WriterAt) error {
742         v.log("read", hash)
743         if v.blockRead != nil {
744                 err := v.blockRead(ctx, hash, writeTo)
745                 if err != nil {
746                         return err
747                 }
748         }
749         v.mtx.Lock()
750         ent, ok := v.data[hash]
751         v.mtx.Unlock()
752         if !ok || !ent.trash.IsZero() {
753                 return os.ErrNotExist
754         }
755         wrote := 0
756         for writesize := 1000; wrote < len(ent.data); writesize = writesize * 2 {
757                 data := ent.data[wrote:]
758                 if len(data) > writesize {
759                         data = data[:writesize]
760                 }
761                 n, err := writeTo.WriteAt(data, int64(wrote))
762                 wrote += n
763                 if err != nil {
764                         return err
765                 }
766         }
767         return nil
768 }
769
770 func (v *stubVolume) BlockWrite(ctx context.Context, hash string, data []byte) error {
771         v.log("write", hash)
772         if v.blockWrite != nil {
773                 if err := v.blockWrite(ctx, hash, data); err != nil {
774                         return err
775                 }
776         }
777         v.mtx.Lock()
778         defer v.mtx.Unlock()
779         v.data[hash] = stubData{
780                 mtime: time.Now(),
781                 data:  append([]byte(nil), data...),
782         }
783         return nil
784 }
785
786 func (v *stubVolume) DeviceID() string {
787         return fmt.Sprintf("%p", v)
788 }
789
790 func (v *stubVolume) BlockTouch(hash string) error {
791         v.log("touch", hash)
792         if v.blockTouch != nil {
793                 if err := v.blockTouch(hash); err != nil {
794                         return err
795                 }
796         }
797         v.mtx.Lock()
798         defer v.mtx.Unlock()
799         ent, ok := v.data[hash]
800         if !ok || !ent.trash.IsZero() {
801                 return os.ErrNotExist
802         }
803         ent.mtime = time.Now()
804         v.data[hash] = ent
805         return nil
806 }
807
808 // Set mtime to the (presumably old) specified time.
809 func (v *stubVolume) blockTouchWithTime(hash string, t time.Time) error {
810         v.log("touchwithtime", hash)
811         v.mtx.Lock()
812         defer v.mtx.Unlock()
813         ent, ok := v.data[hash]
814         if !ok {
815                 return os.ErrNotExist
816         }
817         ent.mtime = t
818         v.data[hash] = ent
819         return nil
820 }
821
822 func (v *stubVolume) BlockTrash(hash string) error {
823         v.log("trash", hash)
824         if v.blockTrash != nil {
825                 if err := v.blockTrash(hash); err != nil {
826                         return err
827                 }
828         }
829         v.mtx.Lock()
830         defer v.mtx.Unlock()
831         ent, ok := v.data[hash]
832         if !ok || !ent.trash.IsZero() {
833                 return os.ErrNotExist
834         }
835         ent.trash = time.Now().Add(v.params.Cluster.Collections.BlobTrashLifetime.Duration())
836         v.data[hash] = ent
837         return nil
838 }
839
840 func (v *stubVolume) BlockUntrash(hash string) error {
841         v.log("untrash", hash)
842         if v.blockUntrash != nil {
843                 if err := v.blockUntrash(hash); err != nil {
844                         return err
845                 }
846         }
847         v.mtx.Lock()
848         defer v.mtx.Unlock()
849         ent, ok := v.data[hash]
850         if !ok || ent.trash.IsZero() {
851                 return os.ErrNotExist
852         }
853         ent.trash = time.Time{}
854         v.data[hash] = ent
855         return nil
856 }
857
858 func (v *stubVolume) Index(ctx context.Context, prefix string, writeTo io.Writer) error {
859         v.stubLog.Printf("%s index %s", v.params.UUID, prefix)
860         if v.index != nil {
861                 if err := v.index(ctx, prefix, writeTo); err != nil {
862                         return err
863                 }
864         }
865         buf := &bytes.Buffer{}
866         v.mtx.Lock()
867         for hash, ent := range v.data {
868                 if ent.trash.IsZero() && strings.HasPrefix(hash, prefix) {
869                         fmt.Fprintf(buf, "%s+%d %d\n", hash, len(ent.data), ent.mtime.UnixNano())
870                 }
871         }
872         v.mtx.Unlock()
873         _, err := io.Copy(writeTo, buf)
874         return err
875 }
876
877 func (v *stubVolume) Mtime(hash string) (time.Time, error) {
878         v.log("mtime", hash)
879         if v.mtime != nil {
880                 if t, err := v.mtime(hash); err != nil {
881                         return t, err
882                 }
883         }
884         v.mtx.Lock()
885         defer v.mtx.Unlock()
886         ent, ok := v.data[hash]
887         if !ok || !ent.trash.IsZero() {
888                 return time.Time{}, os.ErrNotExist
889         }
890         return ent.mtime, nil
891 }
892
893 func (v *stubVolume) EmptyTrash() {
894         v.stubLog.Printf("%s emptytrash", v.params.UUID)
895         v.mtx.Lock()
896         defer v.mtx.Unlock()
897         for hash, ent := range v.data {
898                 if !ent.trash.IsZero() && time.Now().After(ent.trash) {
899                         delete(v.data, hash)
900                 }
901         }
902 }