]> git.arvados.org - arvados.git/blob - sdk/go/arvados/fs_collection_test.go
Merge branch '22581-api-service-support' refs #22581
[arvados.git] / sdk / go / arvados / fs_collection_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package arvados
6
7 import (
8         "bytes"
9         "context"
10         "crypto/md5"
11         "errors"
12         "fmt"
13         "io"
14         "io/fs"
15         "io/ioutil"
16         "math/rand"
17         "net/http"
18         "os"
19         "os/exec"
20         "path/filepath"
21         "regexp"
22         "runtime"
23         "runtime/pprof"
24         "strings"
25         "sync"
26         "sync/atomic"
27         "testing"
28         "time"
29
30         check "gopkg.in/check.v1"
31 )
32
// Register CollectionFSSuite with gocheck so its Test* methods run
// under "go test".
var _ = check.Suite(&CollectionFSSuite{})
// keepClientStub is an in-memory stand-in for a Keep block store, used
// by the collection filesystem tests. Blocks are keyed by the
// 32-hex-digit md5 portion of their locators. The embedded RWMutex
// guards all mutable fields.
type keepClientStub struct {
	blocks      map[string][]byte          // md5 hash -> block content
	refreshable map[string]bool            // hashes LocalLocator will accept with only a +R (remote) signature
	cached      map[string]bool            // hashes reported as present for CheckCacheOnly reads
	reads       []string                   // locators from ReadAt() calls
	onWrite     func(bufcopy []byte) error // called from WriteBlock, before acquiring lock
	authToken   string                     // client's auth token (used for signing locators)
	sigkey      string                     // blob signing key
	sigttl      time.Duration              // blob signing ttl
	sync.RWMutex
}
46
// errStub404 is returned by the stub's read methods when the requested
// block is not present in kcs.blocks.
var errStub404 = errors.New("404 block not found")
// ReadAt records the read in kcs.reads (even on failure), verifies the
// locator's signature, then copies block data starting at offset off
// into p, returning the number of bytes copied.
func (kcs *keepClientStub) ReadAt(locator string, p []byte, off int) (int, error) {
	// Take the write lock only long enough to record the read, then
	// downgrade to a read lock for the block lookup.
	kcs.Lock()
	kcs.reads = append(kcs.reads, locator)
	kcs.Unlock()
	kcs.RLock()
	defer kcs.RUnlock()
	if err := VerifySignature(locator, kcs.authToken, kcs.sigttl, []byte(kcs.sigkey)); err != nil {
		return 0, err
	}
	// The first 32 characters of a locator are the block's md5 hash.
	buf := kcs.blocks[locator[:32]]
	if buf == nil {
		return 0, errStub404
	}
	return copy(p, buf[off:]), nil
}
64
// BlockRead records the read in kcs.reads, then either (a) with
// opts.CheckCacheOnly set, reports whether the block is in the stub's
// "cache" (nil vs. ErrNotCached) without transferring data, or (b)
// verifies the locator signature and streams the whole block to
// opts.WriteTo, returning the number of bytes written.
func (kcs *keepClientStub) BlockRead(_ context.Context, opts BlockReadOptions) (int, error) {
	kcs.Lock()
	kcs.reads = append(kcs.reads, opts.Locator)
	kcs.Unlock()
	kcs.RLock()
	defer kcs.RUnlock()
	if opts.CheckCacheOnly {
		if kcs.cached[opts.Locator[:32]] {
			return 0, nil
		} else {
			return 0, ErrNotCached
		}
	}
	if err := VerifySignature(opts.Locator, kcs.authToken, kcs.sigttl, []byte(kcs.sigkey)); err != nil {
		return 0, err
	}
	buf := kcs.blocks[opts.Locator[:32]]
	if buf == nil {
		return 0, errStub404
	}
	n, err := io.Copy(opts.WriteTo, bytes.NewReader(buf))
	return int(n), err
}
88
// BlockWrite stores a block in the stub and returns a locally signed
// locator for it. The data may arrive either in opts.Data or via
// opts.Reader (with length opts.DataSize). Only the "default" storage
// class is accepted. If kcs.onWrite is set it is called with a copy of
// the data, before the stub's lock is acquired, and may inject an
// error (tests use this to simulate write failures and races).
func (kcs *keepClientStub) BlockWrite(_ context.Context, opts BlockWriteOptions) (BlockWriteResponse, error) {
	var buf []byte
	if opts.Data == nil {
		buf = make([]byte, opts.DataSize)
		_, err := io.ReadFull(opts.Reader, buf)
		if err != nil {
			return BlockWriteResponse{}, err
		}
	} else {
		// Copy so later mutation of opts.Data by the caller cannot
		// affect the stored block.
		buf = append([]byte(nil), opts.Data...)
	}
	locator := SignLocator(fmt.Sprintf("%x+%d", md5.Sum(buf), len(buf)), kcs.authToken, time.Now().Add(kcs.sigttl), kcs.sigttl, []byte(kcs.sigkey))
	if kcs.onWrite != nil {
		err := kcs.onWrite(buf)
		if err != nil {
			return BlockWriteResponse{}, err
		}
	}
	for _, sc := range opts.StorageClasses {
		if sc != "default" {
			return BlockWriteResponse{}, fmt.Errorf("stub does not write storage class %q", sc)
		}
	}
	kcs.Lock()
	defer kcs.Unlock()
	kcs.blocks[locator[:32]] = buf
	return BlockWriteResponse{Locator: locator, Replicas: 1}, nil
}
117
// reRemoteSignature matches a locator's +A (local) or +R (remote)
// signature hint, so LocalLocator can strip it before re-signing.
var reRemoteSignature = regexp.MustCompile(`\+[AR][^+]*`)
// LocalLocator converts a possibly-remote locator to a locally signed
// one. A locator that already has a +A (local) signature is returned
// unchanged. A +R (remote) locator is accepted only if the block is
// present in kcs.blocks or its hash is marked refreshable; otherwise
// an error is returned. Any existing signature hint is stripped and
// the locator is re-signed with the stub's key.
func (kcs *keepClientStub) LocalLocator(locator string) (string, error) {
	if strings.Contains(locator, "+A") {
		return locator, nil
	}
	kcs.Lock()
	defer kcs.Unlock()
	if strings.Contains(locator, "+R") {
		if len(locator) < 32 {
			return "", fmt.Errorf("bad locator: %q", locator)
		}
		if _, ok := kcs.blocks[locator[:32]]; !ok && !kcs.refreshable[locator[:32]] {
			return "", fmt.Errorf("kcs.refreshable[%q]==false", locator)
		}
	}
	locator = reRemoteSignature.ReplaceAllLiteralString(locator, "")
	locator = SignLocator(locator, kcs.authToken, time.Now().Add(kcs.sigttl), kcs.sigttl, []byte(kcs.sigkey))
	return locator, nil
}
138
// CollectionFSSuite exercises CollectionFileSystem using fixture
// collections fetched from the API server and an in-memory
// keepClientStub for block storage.
type CollectionFSSuite struct {
	client *Client              // API client (active-user fixture token)
	coll   Collection           // fixture collection loaded in SetUpTest
	fs     CollectionFileSystem // filesystem view of coll, backed by kc
	kc     *keepClientStub      // stub block store, preloaded with "foobar"
}
145
// SetUpTest fetches the foo-and-bar-files fixture collection and
// mounts it as a fresh filesystem backed by a new keepClientStub whose
// only preloaded block is "foobar" (md5 3858f6...).
func (s *CollectionFSSuite) SetUpTest(c *check.C) {
	s.client = NewClientFromEnv()
	s.client.AuthToken = fixtureActiveToken
	err := s.client.RequestAndDecode(&s.coll, "GET", "arvados/v1/collections/"+fixtureFooAndBarFilesInDirUUID, nil, nil)
	c.Assert(err, check.IsNil)
	s.kc = &keepClientStub{
		blocks: map[string][]byte{
			"3858f62230ac3c915f300c664312c63f": []byte("foobar"),
		},
		sigkey:    fixtureBlobSigningKey,
		sigttl:    fixtureBlobSigningTTL,
		authToken: fixtureActiveToken,
	}
	s.fs, err = s.coll.FileSystem(s.client, s.kc)
	c.Assert(err, check.IsNil)
}
162
// TestSyncNonCanonicalManifest verifies that Sync() on an unmodified
// filesystem does not rewrite the collection: a manifest stored with
// non-canonical file ordering must survive a no-op Sync untouched.
func (s *CollectionFSSuite) TestSyncNonCanonicalManifest(c *check.C) {
	var coll Collection
	err := s.client.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+fixtureFooAndBarFilesInDirUUID, nil, nil)
	c.Assert(err, check.IsNil)
	// Swap the file tokens so the manifest is valid but not in the
	// canonical (sorted) order.
	mtxt := strings.Replace(coll.ManifestText, "3:3:bar 0:3:foo", "0:3:foo 3:3:bar", -1)
	c.Assert(mtxt, check.Not(check.Equals), coll.ManifestText)
	err = s.client.RequestAndDecode(&coll, "POST", "arvados/v1/collections", nil, map[string]interface{}{
		"collection": map[string]interface{}{
			"manifest_text": mtxt}})
	c.Assert(err, check.IsNil)
	// In order for the rest of the test to work as intended, the API server
	// needs to retain the file ordering we set manually. We check that here.
	// We can't check `mtxt == coll.ManifestText` because the API server
	// might've returned new block signatures if the GET and POST happened in
	// different seconds.
	expectPattern := `\./dir1 \S+ 0:3:foo 3:3:bar\n`
	c.Assert(coll.ManifestText, check.Matches, expectPattern)

	fs, err := coll.FileSystem(s.client, s.kc)
	c.Assert(err, check.IsNil)
	err = fs.Sync()
	c.Check(err, check.IsNil)

	// fs had no local changes, so Sync should not have saved
	// anything back to the API/database. (If it did, we would see
	// the manifest rewritten in canonical order.)
	var saved Collection
	err = s.client.RequestAndDecode(&saved, "GET", "arvados/v1/collections/"+coll.UUID, nil, nil)
	c.Assert(err, check.IsNil)
	c.Check(saved.ManifestText, check.Matches, expectPattern)
}
194
195 func (s *CollectionFSSuite) TestHttpFileSystemInterface(c *check.C) {
196         _, ok := s.fs.(http.FileSystem)
197         c.Check(ok, check.Equals, true)
198 }
199
200 func (s *CollectionFSSuite) TestUnattainableStorageClasses(c *check.C) {
201         fs, err := (&Collection{
202                 StorageClassesDesired: []string{"unobtainium"},
203         }).FileSystem(s.client, s.kc)
204         c.Assert(err, check.IsNil)
205
206         f, err := fs.OpenFile("/foo", os.O_CREATE|os.O_WRONLY, 0777)
207         c.Assert(err, check.IsNil)
208         _, err = f.Write([]byte("food"))
209         c.Assert(err, check.IsNil)
210         err = f.Close()
211         c.Assert(err, check.IsNil)
212         _, err = fs.MarshalManifest(".")
213         c.Assert(err, check.ErrorMatches, `.*stub does not write storage class \"unobtainium\"`)
214 }
215
216 func (s *CollectionFSSuite) TestColonInFilename(c *check.C) {
217         fs, err := (&Collection{
218                 ManifestText: "./foo:foo 3858f62230ac3c915f300c664312c63f+3 0:3:bar:bar\n",
219         }).FileSystem(s.client, s.kc)
220         c.Assert(err, check.IsNil)
221
222         f, err := fs.Open("/foo:foo")
223         c.Assert(err, check.IsNil)
224
225         fis, err := f.Readdir(0)
226         c.Check(err, check.IsNil)
227         c.Check(len(fis), check.Equals, 1)
228         c.Check(fis[0].Name(), check.Equals, "bar:bar")
229 }
230
231 func (s *CollectionFSSuite) TestReaddirFull(c *check.C) {
232         f, err := s.fs.Open("/dir1")
233         c.Assert(err, check.IsNil)
234
235         st, err := f.Stat()
236         c.Assert(err, check.IsNil)
237         c.Check(st.Size(), check.Equals, int64(2))
238         c.Check(st.IsDir(), check.Equals, true)
239
240         fis, err := f.Readdir(0)
241         c.Check(err, check.IsNil)
242         c.Check(len(fis), check.Equals, 2)
243         if len(fis) > 0 {
244                 c.Check(fis[0].Size(), check.Equals, int64(3))
245         }
246 }
247
// TestReaddirLimited verifies Readdir's cursor semantics when called
// with a positive count: successive calls page through the entries,
// and a call past the end returns io.EOF with no entries.
func (s *CollectionFSSuite) TestReaddirLimited(c *check.C) {
	f, err := s.fs.Open("./dir1")
	c.Assert(err, check.IsNil)

	// dir1 has exactly two entries; read them one at a time.
	fis, err := f.Readdir(1)
	c.Check(err, check.IsNil)
	c.Check(len(fis), check.Equals, 1)
	if len(fis) > 0 {
		c.Check(fis[0].Size(), check.Equals, int64(3))
	}

	fis, err = f.Readdir(1)
	c.Check(err, check.IsNil)
	c.Check(len(fis), check.Equals, 1)
	if len(fis) > 0 {
		c.Check(fis[0].Size(), check.Equals, int64(3))
	}

	// Third call: past the end, so io.EOF and no entries.
	fis, err = f.Readdir(1)
	c.Check(len(fis), check.Equals, 0)
	c.Check(err, check.NotNil)
	c.Check(err, check.Equals, io.EOF)

	// A fresh handle gets a fresh cursor; asking for more entries
	// than remain returns just the remainder, then io.EOF.
	f, err = s.fs.Open("dir1")
	c.Assert(err, check.IsNil)
	fis, err = f.Readdir(1)
	c.Check(len(fis), check.Equals, 1)
	c.Assert(err, check.IsNil)
	fis, err = f.Readdir(2)
	c.Check(len(fis), check.Equals, 1)
	c.Assert(err, check.IsNil)
	fis, err = f.Readdir(2)
	c.Check(len(fis), check.Equals, 0)
	c.Assert(err, check.Equals, io.EOF)
}
283
284 func (s *CollectionFSSuite) TestPathMunge(c *check.C) {
285         for _, path := range []string{".", "/", "./", "///", "/../", "/./.."} {
286                 f, err := s.fs.Open(path)
287                 c.Assert(err, check.IsNil)
288
289                 st, err := f.Stat()
290                 c.Assert(err, check.IsNil)
291                 c.Check(st.Size(), check.Equals, int64(1))
292                 c.Check(st.IsDir(), check.Equals, true)
293         }
294         for _, path := range []string{"/dir1", "dir1", "./dir1", "///dir1//.//", "../dir1/../dir1/"} {
295                 c.Logf("%q", path)
296                 f, err := s.fs.Open(path)
297                 c.Assert(err, check.IsNil)
298
299                 st, err := f.Stat()
300                 c.Assert(err, check.IsNil)
301                 c.Check(st.Size(), check.Equals, int64(2))
302                 c.Check(st.IsDir(), check.Equals, true)
303         }
304 }
305
306 func (s *CollectionFSSuite) TestNotExist(c *check.C) {
307         for _, path := range []string{"/no", "no", "./no", "n/o", "/n/o"} {
308                 f, err := s.fs.Open(path)
309                 c.Assert(f, check.IsNil)
310                 c.Assert(err, check.NotNil)
311                 c.Assert(os.IsNotExist(err), check.Equals, true)
312         }
313 }
314
315 func (s *CollectionFSSuite) TestReadOnlyFile(c *check.C) {
316         f, err := s.fs.OpenFile("/dir1/foo", os.O_RDONLY, 0)
317         c.Assert(err, check.IsNil)
318         st, err := f.Stat()
319         c.Assert(err, check.IsNil)
320         c.Check(st.Size(), check.Equals, int64(3))
321         n, err := f.Write([]byte("bar"))
322         c.Check(n, check.Equals, 0)
323         c.Check(err, check.Equals, ErrReadOnlyFile)
324 }
325
// TestCreateFile verifies file creation semantics: O_CREATE creates a
// new empty file, O_EXCL fails if it already exists, reopening without
// O_CREATE finds the written content, and the filename (containing a
// space) is escaped as \040 in the marshaled manifest.
func (s *CollectionFSSuite) TestCreateFile(c *check.C) {
	f, err := s.fs.OpenFile("/new-file 1", os.O_RDWR|os.O_CREATE, 0)
	c.Assert(err, check.IsNil)
	st, err := f.Stat()
	c.Assert(err, check.IsNil)
	c.Check(st.Size(), check.Equals, int64(0))

	n, err := f.Write([]byte("bar"))
	c.Check(n, check.Equals, 3)
	c.Check(err, check.IsNil)

	c.Check(f.Close(), check.IsNil)

	// O_EXCL on an existing file must fail.
	f, err = s.fs.OpenFile("/new-file 1", os.O_RDWR|os.O_CREATE|os.O_EXCL, 0)
	c.Check(f, check.IsNil)
	c.Assert(err, check.NotNil)

	f, err = s.fs.OpenFile("/new-file 1", os.O_RDWR, 0)
	c.Assert(err, check.IsNil)
	st, err = f.Stat()
	c.Assert(err, check.IsNil)
	c.Check(st.Size(), check.Equals, int64(3))

	c.Check(f.Close(), check.IsNil)

	m, err := s.fs.MarshalManifest(".")
	c.Assert(err, check.IsNil)
	// "new-file 1" appears as new-file\0401 (space escaped as \040).
	c.Check(m, check.Matches, `. 37b51d194a7513e45b56f6524f2d51f2\+3\+\S+ 0:3:new-file\\0401\n./dir1 .* 3:3:bar 0:3:foo\n`)
}
355
// TestReadWriteFile exercises mixed read/write/seek/truncate activity
// on one file through two independent handles (f and f2), with
// maxBlockSize shrunk to 8 so operations span multiple blocks. It
// checks segment splitting/merging (via the filenode's segment count)
// and the final marshaled manifest.
func (s *CollectionFSSuite) TestReadWriteFile(c *check.C) {
	maxBlockSize = 8
	defer func() { maxBlockSize = 1 << 26 }()

	f, err := s.fs.OpenFile("/dir1/foo", os.O_RDWR, 0)
	c.Assert(err, check.IsNil)
	defer f.Close()
	st, err := f.Stat()
	c.Assert(err, check.IsNil)
	c.Check(st.Size(), check.Equals, int64(3))

	// Second handle on the same file: must observe f's writes but
	// keep its own independent file position.
	f2, err := s.fs.OpenFile("/dir1/foo", os.O_RDWR, 0)
	c.Assert(err, check.IsNil)
	defer f2.Close()

	buf := make([]byte, 64)
	n, err := f.Read(buf)
	c.Check(n, check.Equals, 3)
	c.Check(err, check.Equals, io.EOF)
	c.Check(string(buf[:3]), check.DeepEquals, "foo")

	pos, err := f.Seek(-2, io.SeekCurrent)
	c.Check(pos, check.Equals, int64(1))
	c.Check(err, check.IsNil)

	// Split a storedExtent in two, and insert a memExtent
	n, err = f.Write([]byte("*"))
	c.Check(n, check.Equals, 1)
	c.Check(err, check.IsNil)

	pos, err = f.Seek(0, io.SeekCurrent)
	c.Check(pos, check.Equals, int64(2))
	c.Check(err, check.IsNil)

	pos, err = f.Seek(0, io.SeekStart)
	c.Check(pos, check.Equals, int64(0))
	c.Check(err, check.IsNil)

	rbuf, err := ioutil.ReadAll(f)
	c.Check(len(rbuf), check.Equals, 3)
	c.Check(err, check.IsNil)
	c.Check(string(rbuf), check.Equals, "f*o")

	// Write multiple blocks in one call (17 bytes > maxBlockSize=8).
	f.Seek(1, io.SeekStart)
	n, err = f.Write([]byte("0123456789abcdefg"))
	c.Check(n, check.Equals, 17)
	c.Check(err, check.IsNil)
	pos, err = f.Seek(0, io.SeekCurrent)
	c.Check(pos, check.Equals, int64(18))
	c.Check(err, check.IsNil)
	pos, err = f.Seek(-18, io.SeekCurrent)
	c.Check(pos, check.Equals, int64(0))
	c.Check(err, check.IsNil)
	n, err = io.ReadFull(f, buf)
	c.Check(n, check.Equals, 18)
	c.Check(err, check.Equals, io.ErrUnexpectedEOF)
	c.Check(string(buf[:n]), check.Equals, "f0123456789abcdefg")

	buf2, err := ioutil.ReadAll(f2)
	c.Check(err, check.IsNil)
	c.Check(string(buf2), check.Equals, "f0123456789abcdefg")

	// truncate to current size
	err = f.Truncate(18)
	c.Check(err, check.IsNil)
	f2.Seek(0, io.SeekStart)
	buf2, err = ioutil.ReadAll(f2)
	c.Check(err, check.IsNil)
	c.Check(string(buf2), check.Equals, "f0123456789abcdefg")

	// shrink to zero some data
	f.Truncate(15)
	f2.Seek(0, io.SeekStart)
	buf2, err = ioutil.ReadAll(f2)
	c.Check(err, check.IsNil)
	c.Check(string(buf2), check.Equals, "f0123456789abcd")

	// grow to partial block/extent: the new tail reads as zeros
	f.Truncate(20)
	f2.Seek(0, io.SeekStart)
	buf2, err = ioutil.ReadAll(f2)
	c.Check(err, check.IsNil)
	c.Check(string(buf2), check.Equals, "f0123456789abcd\x00\x00\x00\x00\x00")

	f.Truncate(0)
	f2.Seek(0, io.SeekStart)
	f2.Write([]byte("12345678abcdefghijkl"))

	// grow to block/extent boundary: 64 bytes / 8-byte blocks = 8 segments
	f.Truncate(64)
	f2.Seek(0, io.SeekStart)
	buf2, err = ioutil.ReadAll(f2)
	c.Check(err, check.IsNil)
	c.Check(len(buf2), check.Equals, 64)
	c.Check(len(f.(*filehandle).inode.(*filenode).segments), check.Equals, 8)

	// shrink to block/extent boundary
	err = f.Truncate(32)
	c.Check(err, check.IsNil)
	f2.Seek(0, io.SeekStart)
	buf2, err = ioutil.ReadAll(f2)
	c.Check(err, check.IsNil)
	c.Check(len(buf2), check.Equals, 32)
	c.Check(len(f.(*filehandle).inode.(*filenode).segments), check.Equals, 4)

	// shrink to partial block/extent
	err = f.Truncate(15)
	c.Check(err, check.IsNil)
	f2.Seek(0, io.SeekStart)
	buf2, err = ioutil.ReadAll(f2)
	c.Check(err, check.IsNil)
	c.Check(string(buf2), check.Equals, "12345678abcdefg")
	c.Check(len(f.(*filehandle).inode.(*filenode).segments), check.Equals, 2)

	// Force flush to ensure the block "12345678" gets stored, so
	// we know what to expect in the final manifest below.
	_, err = s.fs.MarshalManifest(".")
	c.Check(err, check.IsNil)

	// Truncate to size=3 while f2's ptr is at 15
	err = f.Truncate(3)
	c.Check(err, check.IsNil)
	buf2, err = ioutil.ReadAll(f2)
	c.Check(err, check.IsNil)
	c.Check(string(buf2), check.Equals, "")
	f2.Seek(0, io.SeekStart)
	buf2, err = ioutil.ReadAll(f2)
	c.Check(err, check.IsNil)
	c.Check(string(buf2), check.Equals, "123")
	c.Check(len(f.(*filehandle).inode.(*filenode).segments), check.Equals, 1)

	m, err := s.fs.MarshalManifest(".")
	c.Check(err, check.IsNil)
	// Strip +A signatures so the expected manifest can be a literal.
	m = regexp.MustCompile(`\+A[^\+ ]+`).ReplaceAllLiteralString(m, "")
	c.Check(m, check.Equals, "./dir1 3858f62230ac3c915f300c664312c63f+6 25d55ad283aa400af464c76d713c07ad+8 3:3:bar 6:3:foo\n")
	c.Check(s.fs.Size(), check.Equals, int64(6))
}
494
// TestSeekSparse verifies sparse-file behavior: seeking past EOF does
// not by itself grow the file, but a subsequent Write (even of zero
// bytes) extends it to the current position, with the gap reading as
// zeros.
func (s *CollectionFSSuite) TestSeekSparse(c *check.C) {
	fs, err := (&Collection{}).FileSystem(s.client, s.kc)
	c.Assert(err, check.IsNil)
	f, err := fs.OpenFile("test", os.O_CREATE|os.O_RDWR, 0755)
	c.Assert(err, check.IsNil)
	defer f.Close()

	// checkSize asserts the size as seen both through the existing
	// handle and through a freshly opened one.
	checkSize := func(size int64) {
		fi, err := f.Stat()
		c.Assert(err, check.IsNil)
		c.Check(fi.Size(), check.Equals, size)

		f, err := fs.OpenFile("test", os.O_CREATE|os.O_RDWR, 0755)
		c.Assert(err, check.IsNil)
		defer f.Close()
		fi, err = f.Stat()
		c.Check(err, check.IsNil)
		c.Check(fi.Size(), check.Equals, size)
		pos, err := f.Seek(0, io.SeekEnd)
		c.Check(err, check.IsNil)
		c.Check(pos, check.Equals, size)
	}

	// Seek past EOF: size unchanged until something is written.
	f.Seek(2, io.SeekEnd)
	checkSize(0)
	f.Write([]byte{1})
	checkSize(3)

	// Even an empty write extends the file to the seek position.
	f.Seek(2, io.SeekCurrent)
	checkSize(3)
	f.Write([]byte{})
	checkSize(5)

	// Reading past EOF returns io.EOF and does not grow the file.
	f.Seek(8, io.SeekStart)
	checkSize(5)
	n, err := f.Read(make([]byte, 1))
	c.Check(n, check.Equals, 0)
	c.Check(err, check.Equals, io.EOF)
	checkSize(5)
	f.Write([]byte{1, 2, 3})
	checkSize(11)
}
537
// TestMarshalCopiesRemoteBlocks verifies that MarshalManifest converts
// remote (+R) block signatures to local (+A) ones via LocalLocator,
// failing when a remote block is not refreshable and succeeding once
// the stub marks it refreshable.
func (s *CollectionFSSuite) TestMarshalCopiesRemoteBlocks(c *check.C) {
	foo := "foo"
	bar := "bar"
	hash := map[string]string{
		foo: fmt.Sprintf("%x", md5.Sum([]byte(foo))),
		bar: fmt.Sprintf("%x", md5.Sum([]byte(bar))),
	}

	// The foo block is remote and not refreshable, so marshaling
	// must fail.
	fs, err := (&Collection{
		ManifestText: ". " + hash[foo] + "+3+Rzaaaa-foo@bab " + hash[bar] + "+3+A12345@ffffff 0:2:fo.txt 2:4:obar.txt\n",
	}).FileSystem(s.client, s.kc)
	c.Assert(err, check.IsNil)
	manifest, err := fs.MarshalManifest(".")
	c.Check(manifest, check.Equals, "")
	c.Check(err, check.NotNil)

	s.kc.refreshable = map[string]bool{hash[bar]: true}

	// With bar refreshable, both a remote-signed and a local-signed
	// bar locator should marshal to all-local (+A) signatures.
	for _, sigIn := range []string{"Rzaaaa-foo@bab", "A12345@abcde"} {
		fs, err = (&Collection{
			ManifestText: ". " + hash[foo] + "+3+A12345@fffff " + hash[bar] + "+3+" + sigIn + " 0:2:fo.txt 2:4:obar.txt\n",
		}).FileSystem(s.client, s.kc)
		c.Assert(err, check.IsNil)
		manifest, err := fs.MarshalManifest(".")
		c.Check(err, check.IsNil)
		// Both blocks should now have +A signatures.
		c.Check(manifest, check.Matches, `.*\+A.* .*\+A.*\n`)
		c.Check(manifest, check.Not(check.Matches), `.*\+R.*\n`)
	}
}
568
569 func (s *CollectionFSSuite) TestMarshalSmallBlocks(c *check.C) {
570         maxBlockSize = 8
571         defer func() { maxBlockSize = 1 << 26 }()
572
573         var err error
574         s.fs, err = (&Collection{}).FileSystem(s.client, s.kc)
575         c.Assert(err, check.IsNil)
576         for _, name := range []string{"foo", "bar", "baz"} {
577                 f, err := s.fs.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0)
578                 c.Assert(err, check.IsNil)
579                 f.Write([]byte(name))
580                 f.Close()
581         }
582
583         m, err := s.fs.MarshalManifest(".")
584         c.Check(err, check.IsNil)
585         m = regexp.MustCompile(`\+A[^\+ ]+`).ReplaceAllLiteralString(m, "")
586         c.Check(m, check.Equals, ". c3c23db5285662ef7172373df0003206+6 acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:bar 3:3:baz 6:3:foo\n")
587 }
588
// TestMkdir exercises directory creation and its interactions with
// files: Mkdir under a missing parent fails, a name can't be both a
// file and a directory, and deleting one allows creating the other.
func (s *CollectionFSSuite) TestMkdir(c *check.C) {
	// Parent "foo" doesn't exist yet.
	err := s.fs.Mkdir("foo/bar", 0755)
	c.Check(err, check.Equals, os.ErrNotExist)

	f, err := s.fs.OpenFile("foo/bar", os.O_CREATE, 0)
	c.Check(err, check.Equals, os.ErrNotExist)

	err = s.fs.Mkdir("foo", 0755)
	c.Check(err, check.IsNil)

	f, err = s.fs.OpenFile("foo/bar", os.O_CREATE|os.O_WRONLY, 0)
	c.Check(err, check.IsNil)
	if err == nil {
		defer f.Close()
		f.Write([]byte("foo"))
	}

	// mkdir fails if a file already exists with that name
	err = s.fs.Mkdir("foo/bar", 0755)
	c.Check(err, check.NotNil)

	err = s.fs.Remove("foo/bar")
	c.Check(err, check.IsNil)

	// mkdir succeeds after the file is deleted
	err = s.fs.Mkdir("foo/bar", 0755)
	c.Check(err, check.IsNil)

	// creating a file in a nonexistent subdir should still fail
	f, err = s.fs.OpenFile("foo/bar/baz/foo.txt", os.O_CREATE|os.O_WRONLY, 0)
	c.Check(err, check.Equals, os.ErrNotExist)

	f, err = s.fs.OpenFile("foo/bar/foo.txt", os.O_CREATE|os.O_WRONLY, 0)
	c.Check(err, check.IsNil)
	if err == nil {
		defer f.Close()
		f.Write([]byte("foo"))
	}

	// creating foo/bar as a regular file should fail
	f, err = s.fs.OpenFile("foo/bar", os.O_CREATE|os.O_EXCL, 0)
	c.Check(err, check.NotNil)

	// creating foo/bar as a directory should fail
	f, err = s.fs.OpenFile("foo/bar", os.O_CREATE|os.O_EXCL, os.ModeDir)
	c.Check(err, check.NotNil)
	err = s.fs.Mkdir("foo/bar", 0755)
	c.Check(err, check.NotNil)

	m, err := s.fs.MarshalManifest(".")
	c.Check(err, check.IsNil)
	// Strip +A signatures so the expected manifest can be a literal.
	m = regexp.MustCompile(`\+A[^\+ ]+`).ReplaceAllLiteralString(m, "")
	c.Check(m, check.Equals, "./dir1 3858f62230ac3c915f300c664312c63f+6 3:3:bar 0:3:foo\n./foo/bar acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt\n")
}
643
// TestConcurrentWriters stress-tests the filesystem with 128
// goroutines doing random marshal/truncate/seek/write/read operations
// on the same file, to shake out races and deadlocks (run with -race).
// It only asserts that individual operations don't error; the final
// content is just logged.
func (s *CollectionFSSuite) TestConcurrentWriters(c *check.C) {
	if testing.Short() {
		c.Skip("slow")
	}

	maxBlockSize = 8
	defer func() { maxBlockSize = 1 << 26 }()

	var wg sync.WaitGroup
	for n := 0; n < 128; n++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			f, err := s.fs.OpenFile("/dir1/foo", os.O_RDWR, 0)
			c.Assert(err, check.IsNil)
			defer f.Close()
			for i := 0; i < 1024; i++ {
				// Pick a random operation: occasional marshal,
				// otherwise truncate/seek/write/read with equal odds.
				r := rand.Uint32()
				switch {
				case r%11 == 0:
					_, err := s.fs.MarshalManifest(".")
					c.Check(err, check.IsNil)
				case r&3 == 0:
					f.Truncate(int64(rand.Intn(64)))
				case r&3 == 1:
					f.Seek(int64(rand.Intn(64)), io.SeekStart)
				case r&3 == 2:
					_, err := f.Write([]byte("beep boop"))
					c.Check(err, check.IsNil)
				case r&3 == 3:
					_, err := ioutil.ReadAll(f)
					c.Check(err, check.IsNil)
				}
			}
		}()
	}
	wg.Wait()

	f, err := s.fs.OpenFile("/dir1/foo", os.O_RDWR, 0)
	c.Assert(err, check.IsNil)
	defer f.Close()
	buf, err := ioutil.ReadAll(f)
	c.Check(err, check.IsNil)
	c.Logf("after lots of random r/w/seek/trunc, buf is %q", buf)
}
689
// TestRandomWrites runs 256 goroutines, each doing 256 rounds of
// random truncate+write on its own file while maintaining an in-memory
// model ("expect") of what the file should contain, and checks the
// file matches the model after every round. Afterwards it verifies
// per-file memory accounting and the root directory entry count.
func (s *CollectionFSSuite) TestRandomWrites(c *check.C) {
	maxBlockSize = 40
	defer func() { maxBlockSize = 1 << 26 }()

	var err error
	s.fs, err = (&Collection{}).FileSystem(s.client, s.kc)
	c.Assert(err, check.IsNil)

	const nfiles = 256
	const ngoroutines = 256

	var wg sync.WaitGroup
	for n := 0; n < ngoroutines; n++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			expect := make([]byte, 0, 64)
			wbytes := []byte("there's no simple explanation for anything important that any of us do")
			f, err := s.fs.OpenFile(fmt.Sprintf("random-%d", n), os.O_RDWR|os.O_CREATE|os.O_EXCL, 0)
			c.Assert(err, check.IsNil)
			defer f.Close()
			for i := 0; i < nfiles; i++ {
				// Pick a random truncate size, write offset, and
				// write length for this round.
				trunc := rand.Intn(65)
				woff := rand.Intn(trunc + 1)
				wbytes = wbytes[:rand.Intn(64-woff+1)]
				// Zero the model's slack space beyond its current
				// length, mirroring what Truncate-grow produces.
				for buf, i := expect[:cap(expect)], len(expect); i < trunc; i++ {
					buf[i] = 0
				}
				expect = expect[:trunc]
				if trunc < woff+len(wbytes) {
					expect = expect[:woff+len(wbytes)]
				}
				copy(expect[woff:], wbytes)
				f.Truncate(int64(trunc))
				pos, err := f.Seek(int64(woff), io.SeekStart)
				c.Check(pos, check.Equals, int64(woff))
				c.Check(err, check.IsNil)
				n, err := f.Write(wbytes)
				c.Check(n, check.Equals, len(wbytes))
				c.Check(err, check.IsNil)
				pos, err = f.Seek(0, io.SeekStart)
				c.Check(pos, check.Equals, int64(0))
				c.Check(err, check.IsNil)
				buf, err := ioutil.ReadAll(f)
				c.Check(string(buf), check.Equals, string(expect))
				c.Check(err, check.IsNil)
			}
		}(n)
	}
	wg.Wait()

	// After background pruning settles, each file's in-memory size
	// should match its segment accounting.
	for n := 0; n < ngoroutines; n++ {
		f, err := s.fs.OpenFile(fmt.Sprintf("random-%d", n), os.O_RDONLY, 0)
		c.Assert(err, check.IsNil)
		f.(*filehandle).inode.(*filenode).waitPrune()
		s.checkMemSize(c, f)
		defer f.Close()
	}

	root, err := s.fs.Open("/")
	c.Assert(err, check.IsNil)
	defer root.Close()
	fi, err := root.Readdir(-1)
	c.Check(err, check.IsNil)
	c.Check(len(fi), check.Equals, nfiles)

	_, err = s.fs.MarshalManifest(".")
	c.Check(err, check.IsNil)
	// TODO: check manifest content
}
760
// TestRemove verifies Remove/RemoveAll semantics: "." and ".." are
// invalid targets, Remove refuses non-empty directories, a trailing
// slash is accepted, and RemoveAll is idempotent.
func (s *CollectionFSSuite) TestRemove(c *check.C) {
	fs, err := (&Collection{}).FileSystem(s.client, s.kc)
	c.Assert(err, check.IsNil)
	err = fs.Mkdir("dir0", 0755)
	c.Assert(err, check.IsNil)
	err = fs.Mkdir("dir1", 0755)
	c.Assert(err, check.IsNil)
	err = fs.Mkdir("dir1/dir2", 0755)
	c.Assert(err, check.IsNil)
	err = fs.Mkdir("dir1/dir3", 0755)
	c.Assert(err, check.IsNil)

	err = fs.Remove("dir0")
	c.Check(err, check.IsNil)
	err = fs.Remove("dir0")
	c.Check(err, check.Equals, os.ErrNotExist)

	// Removing "." or ".." is invalid.
	err = fs.Remove("dir1/dir2/.")
	c.Check(err, check.Equals, ErrInvalidArgument)
	err = fs.Remove("dir1/dir2/..")
	c.Check(err, check.Equals, ErrInvalidArgument)
	// Remove (unlike RemoveAll) refuses non-empty directories, even
	// when reached via a roundabout path.
	err = fs.Remove("dir1")
	c.Check(err, check.Equals, ErrDirectoryNotEmpty)
	err = fs.Remove("dir1/dir2/../../../dir1")
	c.Check(err, check.Equals, ErrDirectoryNotEmpty)
	err = fs.Remove("dir1/dir3/")
	c.Check(err, check.IsNil)
	// RemoveAll deletes recursively and succeeds even if the target
	// is already gone.
	err = fs.RemoveAll("dir1")
	c.Check(err, check.IsNil)
	err = fs.RemoveAll("dir1")
	c.Check(err, check.IsNil)
}
793
794 func (s *CollectionFSSuite) TestRenameError(c *check.C) {
795         fs, err := (&Collection{}).FileSystem(s.client, s.kc)
796         c.Assert(err, check.IsNil)
797         err = fs.Mkdir("first", 0755)
798         c.Assert(err, check.IsNil)
799         err = fs.Mkdir("first/second", 0755)
800         c.Assert(err, check.IsNil)
801         f, err := fs.OpenFile("first/second/file", os.O_CREATE|os.O_WRONLY, 0755)
802         c.Assert(err, check.IsNil)
803         f.Write([]byte{1, 2, 3, 4, 5})
804         f.Close()
805         err = fs.Rename("first", "first/second/third")
806         c.Check(err, check.Equals, ErrInvalidArgument)
807         err = fs.Rename("first", "first/third")
808         c.Check(err, check.Equals, ErrInvalidArgument)
809         err = fs.Rename("first/second", "second")
810         c.Check(err, check.IsNil)
811         f, err = fs.OpenFile("second/file", 0, 0)
812         c.Assert(err, check.IsNil)
813         data, err := ioutil.ReadAll(f)
814         c.Check(err, check.IsNil)
815         c.Check(data, check.DeepEquals, []byte{1, 2, 3, 4, 5})
816 }
817
818 func (s *CollectionFSSuite) TestRenameDirectory(c *check.C) {
819         fs, err := (&Collection{}).FileSystem(s.client, s.kc)
820         c.Assert(err, check.IsNil)
821         err = fs.Mkdir("foo", 0755)
822         c.Assert(err, check.IsNil)
823         err = fs.Mkdir("bar", 0755)
824         c.Assert(err, check.IsNil)
825         err = fs.Rename("bar", "baz")
826         c.Check(err, check.IsNil)
827         err = fs.Rename("foo", "baz")
828         c.Check(err, check.NotNil)
829         err = fs.Rename("foo", "baz/")
830         c.Check(err, check.IsNil)
831         err = fs.Rename("baz/foo", ".")
832         c.Check(err, check.Equals, ErrInvalidArgument)
833         err = fs.Rename("baz/foo/", ".")
834         c.Check(err, check.Equals, ErrInvalidArgument)
835 }
836
// TestRename runs many concurrent Rename calls -- both expected
// successes and expected failures -- across a 16x16 directory tree,
// then spot-checks that files ended up where they should.
func (s *CollectionFSSuite) TestRename(c *check.C) {
	fs, err := (&Collection{}).FileSystem(s.client, s.kc)
	c.Assert(err, check.IsNil)
	const (
		outer = 16
		inner = 16
	)
	// Build dir{i}/file{j} and dir{i}/dir{j}/file{j}, each containing
	// the 4 bytes "beep".
	for i := 0; i < outer; i++ {
		err = fs.Mkdir(fmt.Sprintf("dir%d", i), 0755)
		c.Assert(err, check.IsNil)
		for j := 0; j < inner; j++ {
			err = fs.Mkdir(fmt.Sprintf("dir%d/dir%d", i, j), 0755)
			c.Assert(err, check.IsNil)
			for _, fnm := range []string{
				fmt.Sprintf("dir%d/file%d", i, j),
				fmt.Sprintf("dir%d/dir%d/file%d", i, j, j),
			} {
				f, err := fs.OpenFile(fnm, os.O_CREATE|os.O_WRONLY, 0755)
				c.Assert(err, check.IsNil)
				_, err = f.Write([]byte("beep"))
				c.Assert(err, check.IsNil)
				f.Close()
			}
		}
	}
	var wg sync.WaitGroup
	for i := 0; i < outer; i++ {
		for j := 0; j < inner; j++ {
			// Successful rename: dir{i}/dir{j}/file{j} ->
			// dir{i}/newfile{inner-j-1}. Each (i,j) pair has a
			// distinct source and target.
			wg.Add(1)
			go func(i, j int) {
				defer wg.Done()
				oldname := fmt.Sprintf("dir%d/dir%d/file%d", i, j, j)
				newname := fmt.Sprintf("dir%d/newfile%d", i, inner-j-1)
				_, err := fs.Open(newname)
				c.Check(err, check.Equals, os.ErrNotExist)
				err = fs.Rename(oldname, newname)
				c.Check(err, check.IsNil)
				f, err := fs.Open(newname)
				c.Check(err, check.IsNil)
				f.Close()
			}(i, j)

			// Expected-failure cases, run concurrently with the
			// successful renames above.
			wg.Add(1)
			go func(i, j int) {
				defer wg.Done()
				// oldname does not exist
				err := fs.Rename(
					fmt.Sprintf("dir%d/dir%d/missing", i, j),
					fmt.Sprintf("dir%d/dir%d/file%d", outer-i-1, j, j))
				c.Check(err, check.ErrorMatches, `.*does not exist`)

				// newname parent dir does not exist
				err = fs.Rename(
					fmt.Sprintf("dir%d/dir%d", i, j),
					fmt.Sprintf("dir%d/missing/irrelevant", outer-i-1))
				c.Check(err, check.ErrorMatches, `.*does not exist`)

				// oldname parent dir is a file
				err = fs.Rename(
					fmt.Sprintf("dir%d/file%d/patherror", i, j),
					fmt.Sprintf("dir%d/irrelevant", i))
				c.Check(err, check.ErrorMatches, `.*not a directory`)

				// newname parent dir is a file
				err = fs.Rename(
					fmt.Sprintf("dir%d/dir%d/file%d", i, j, j),
					fmt.Sprintf("dir%d/file%d/patherror", i, inner-j-1))
				c.Check(err, check.ErrorMatches, `.*not a directory`)
			}(i, j)
		}
	}
	wg.Wait()

	// Spot check: newfile3 in dir1 came from dir1/dir12/file12
	// (inner-j-1 == 3 when j == 12) and still holds "beep".
	f, err := fs.OpenFile("dir1/newfile3", 0, 0)
	c.Assert(err, check.IsNil)
	c.Check(f.Size(), check.Equals, int64(4))
	buf, err := ioutil.ReadAll(f)
	c.Check(buf, check.DeepEquals, []byte("beep"))
	c.Check(err, check.IsNil)
	// The source of one of the successful renames must be gone.
	_, err = fs.Open("dir1/dir1/file1")
	c.Check(err, check.Equals, os.ErrNotExist)
}
919
// TestPersist writes files (with names that need manifest escaping)
// from concurrent goroutines, marshals the manifest while the file
// handles are still open, and verifies that a fresh filesystem built
// from that manifest reproduces the same content.
func (s *CollectionFSSuite) TestPersist(c *check.C) {
	// Shrink the block size so the 500-byte files span many blocks.
	maxBlockSize = 1024
	defer func() { maxBlockSize = 1 << 26 }()

	var err error
	s.fs, err = (&Collection{}).FileSystem(s.client, s.kc)
	c.Assert(err, check.IsNil)
	err = s.fs.Mkdir("d:r", 0755)
	c.Assert(err, check.IsNil)

	expect := map[string][]byte{}

	// File names include a space, a colon, and a backslash -- all of
	// which must survive the manifest round trip.
	var wg sync.WaitGroup
	for _, name := range []string{"random 1", "random:2", "random\\3", "d:r/random4"} {
		buf := make([]byte, 500)
		rand.Read(buf)
		expect[name] = buf

		f, err := s.fs.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0)
		c.Assert(err, check.IsNil)
		// Note: we don't close the file until after the test
		// is done. Writes to unclosed files should persist.
		defer f.Close()

		// Write each file 5 bytes at a time from its own goroutine.
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < len(buf); i += 5 {
				_, err := f.Write(buf[i : i+5])
				c.Assert(err, check.IsNil)
			}
		}()
	}
	wg.Wait()

	m, err := s.fs.MarshalManifest(".")
	c.Check(err, check.IsNil)
	c.Logf("%q", m)

	// Sanity-check the original filesystem's root listing.
	root, err := s.fs.Open("/")
	c.Assert(err, check.IsNil)
	defer root.Close()
	fi, err := root.Readdir(-1)
	c.Check(err, check.IsNil)
	c.Check(len(fi), check.Equals, 4)

	// Load the marshalled manifest into a brand-new filesystem.
	persisted, err := (&Collection{ManifestText: m}).FileSystem(s.client, s.kc)
	c.Assert(err, check.IsNil)

	root, err = persisted.Open("/")
	c.Assert(err, check.IsNil)
	defer root.Close()
	fi, err = root.Readdir(-1)
	c.Check(err, check.IsNil)
	c.Check(len(fi), check.Equals, 4)

	// Every file's content must match what was written.
	for name, content := range expect {
		c.Logf("read %q", name)
		f, err := persisted.Open(name)
		c.Assert(err, check.IsNil)
		defer f.Close()
		buf, err := ioutil.ReadAll(f)
		c.Check(err, check.IsNil)
		c.Check(buf, check.DeepEquals, content)
	}
}
986
987 func (s *CollectionFSSuite) TestPersistEmptyFilesAndDirs(c *check.C) {
988         var err error
989         s.fs, err = (&Collection{}).FileSystem(s.client, s.kc)
990         c.Assert(err, check.IsNil)
991         for _, name := range []string{"dir", "dir/zerodir", "empty", "not empty", "not empty/empty", "zero", "zero/zero"} {
992                 err = s.fs.Mkdir(name, 0755)
993                 c.Assert(err, check.IsNil)
994         }
995
996         expect := map[string][]byte{
997                 "0":                nil,
998                 "00":               {},
999                 "one":              {1},
1000                 "dir/0":            nil,
1001                 "dir/two":          {1, 2},
1002                 "dir/zero":         nil,
1003                 "dir/zerodir/zero": nil,
1004                 "zero/zero/zero":   nil,
1005         }
1006         for name, data := range expect {
1007                 f, err := s.fs.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0)
1008                 c.Assert(err, check.IsNil)
1009                 if data != nil {
1010                         _, err := f.Write(data)
1011                         c.Assert(err, check.IsNil)
1012                 }
1013                 f.Close()
1014         }
1015
1016         m, err := s.fs.MarshalManifest(".")
1017         c.Check(err, check.IsNil)
1018         c.Logf("%q", m)
1019
1020         persisted, err := (&Collection{ManifestText: m}).FileSystem(s.client, s.kc)
1021         c.Assert(err, check.IsNil)
1022
1023         for name, data := range expect {
1024                 _, err = persisted.Open("bogus-" + name)
1025                 c.Check(err, check.NotNil)
1026
1027                 f, err := persisted.Open(name)
1028                 c.Assert(err, check.IsNil)
1029
1030                 if data == nil {
1031                         data = []byte{}
1032                 }
1033                 buf, err := ioutil.ReadAll(f)
1034                 c.Check(err, check.IsNil)
1035                 c.Check(buf, check.DeepEquals, data)
1036         }
1037
1038         expectDir := map[string]int{
1039                 "empty":           0,
1040                 "not empty":       1,
1041                 "not empty/empty": 0,
1042         }
1043         for name, expectLen := range expectDir {
1044                 _, err := persisted.Open(name + "/bogus")
1045                 c.Check(err, check.NotNil)
1046
1047                 d, err := persisted.Open(name)
1048                 defer d.Close()
1049                 c.Check(err, check.IsNil)
1050                 fi, err := d.Readdir(-1)
1051                 c.Check(err, check.IsNil)
1052                 c.Check(fi, check.HasLen, expectLen)
1053         }
1054 }
1055
// TestOpenFileFlags checks OpenFile's handling of the os.O_* flag
// combinations: missing files, O_CREATE|O_RDONLY, O_RDWR, O_TRUNC,
// O_APPEND positioning, O_WRONLY read rejection, unsupported flags,
// and contradictory flag combinations.
func (s *CollectionFSSuite) TestOpenFileFlags(c *check.C) {
	fs, err := (&Collection{}).FileSystem(s.client, s.kc)
	c.Assert(err, check.IsNil)

	// O_WRONLY without O_CREATE fails on a missing file.
	f, err := fs.OpenFile("missing", os.O_WRONLY, 0)
	c.Check(f, check.IsNil)
	c.Check(err, check.ErrorMatches, `file does not exist`)

	// O_CREATE|O_RDONLY creates the file but rejects writes.
	f, err = fs.OpenFile("new", os.O_CREATE|os.O_RDONLY, 0)
	c.Assert(err, check.IsNil)
	defer f.Close()
	n, err := f.Write([]byte{1, 2, 3})
	c.Check(n, check.Equals, 0)
	c.Check(err, check.ErrorMatches, `read-only file`)
	n, err = f.Read(make([]byte, 1))
	c.Check(n, check.Equals, 0)
	c.Check(err, check.Equals, io.EOF)
	// Reopening with O_RDWR allows writing.
	f, err = fs.OpenFile("new", os.O_RDWR, 0)
	c.Assert(err, check.IsNil)
	defer f.Close()
	_, err = f.Write([]byte{4, 5, 6})
	c.Check(err, check.IsNil)
	fi, err := f.Stat()
	c.Assert(err, check.IsNil)
	c.Check(fi.Size(), check.Equals, int64(3))

	// O_TRUNC empties the existing file on open.
	f, err = fs.OpenFile("new", os.O_TRUNC|os.O_RDWR, 0)
	c.Assert(err, check.IsNil)
	defer f.Close()
	pos, err := f.Seek(0, io.SeekEnd)
	c.Check(pos, check.Equals, int64(0))
	c.Check(err, check.IsNil)
	fi, err = f.Stat()
	c.Assert(err, check.IsNil)
	c.Check(fi.Size(), check.Equals, int64(0))
	fs.Remove("new")

	// O_APPEND: reads honor the seek position, but every write goes
	// to the end of the file regardless of position.
	buf := make([]byte, 64)
	f, err = fs.OpenFile("append", os.O_EXCL|os.O_CREATE|os.O_RDWR|os.O_APPEND, 0)
	c.Assert(err, check.IsNil)
	f.Write([]byte{1, 2, 3})
	f.Seek(0, io.SeekStart)
	n, _ = f.Read(buf[:1])
	c.Check(n, check.Equals, 1)
	c.Check(buf[:1], check.DeepEquals, []byte{1})
	pos, err = f.Seek(0, io.SeekCurrent)
	c.Assert(err, check.IsNil)
	c.Check(pos, check.Equals, int64(1))
	// Writing at position 1 still appends; position jumps to EOF (6).
	f.Write([]byte{4, 5, 6})
	pos, err = f.Seek(0, io.SeekCurrent)
	c.Assert(err, check.IsNil)
	c.Check(pos, check.Equals, int64(6))
	f.Seek(0, io.SeekStart)
	n, err = f.Read(buf)
	c.Check(buf[:n], check.DeepEquals, []byte{1, 2, 3, 4, 5, 6})
	c.Check(err, check.Equals, io.EOF)
	f.Close()

	// Reopening with O_APPEND starts at position 0 for reads; the
	// next write still lands at EOF.
	f, err = fs.OpenFile("append", os.O_RDWR|os.O_APPEND, 0)
	c.Assert(err, check.IsNil)
	pos, err = f.Seek(0, io.SeekCurrent)
	c.Check(pos, check.Equals, int64(0))
	c.Check(err, check.IsNil)
	f.Read(buf[:3])
	pos, _ = f.Seek(0, io.SeekCurrent)
	c.Check(pos, check.Equals, int64(3))
	f.Write([]byte{7, 8, 9})
	pos, err = f.Seek(0, io.SeekCurrent)
	c.Check(err, check.IsNil)
	c.Check(pos, check.Equals, int64(9))
	f.Close()

	// O_WRONLY permits writes and seeks but rejects reads.
	f, err = fs.OpenFile("wronly", os.O_CREATE|os.O_WRONLY, 0)
	c.Assert(err, check.IsNil)
	n, err = f.Write([]byte{3, 2, 1})
	c.Check(n, check.Equals, 3)
	c.Check(err, check.IsNil)
	pos, _ = f.Seek(0, io.SeekCurrent)
	c.Check(pos, check.Equals, int64(3))
	pos, _ = f.Seek(0, io.SeekStart)
	c.Check(pos, check.Equals, int64(0))
	n, err = f.Read(buf)
	c.Check(n, check.Equals, 0)
	c.Check(err, check.ErrorMatches, `.*O_WRONLY.*`)
	f, err = fs.OpenFile("wronly", os.O_RDONLY, 0)
	c.Assert(err, check.IsNil)
	n, _ = f.Read(buf)
	c.Check(buf[:n], check.DeepEquals, []byte{3, 2, 1})

	// O_SYNC is not supported by this filesystem.
	f, err = fs.OpenFile("unsupported", os.O_CREATE|os.O_SYNC, 0)
	c.Check(f, check.IsNil)
	c.Check(err, check.NotNil)

	// O_RDWR and O_WRONLY together are contradictory.
	f, err = fs.OpenFile("append", os.O_RDWR|os.O_WRONLY, 0)
	c.Check(f, check.IsNil)
	c.Check(err, check.ErrorMatches, `invalid flag.*`)
}
1153
// TestFlushFullBlocksWritingLongFile writes a 50000-byte file with a
// 1024-byte block size and 2 concurrent writers, using an onWrite
// hook that stalls the first block until blocks 2 and 3 have made
// progress -- verifying that full blocks are flushed concurrently
// (but never with more than concurrentWriters in flight) while the
// file is still being written.
func (s *CollectionFSSuite) TestFlushFullBlocksWritingLongFile(c *check.C) {
	// Restore the package-level tunables when the test ends.
	defer func(cw, mbs int) {
		concurrentWriters = cw
		maxBlockSize = mbs
	}(concurrentWriters, maxBlockSize)
	concurrentWriters = 2
	maxBlockSize = 1024

	proceed := make(chan struct{})
	var started, concurrent int32
	blk2done := false
	s.kc.onWrite = func([]byte) error {
		atomic.AddInt32(&concurrent, 1)
		switch atomic.AddInt32(&started, 1) {
		case 1:
			// Wait until block 2 starts and finishes, and block 3 starts
			select {
			case <-proceed:
				c.Check(blk2done, check.Equals, true)
			case <-time.After(time.Second):
				c.Error("timed out")
			}
		case 2:
			time.Sleep(time.Millisecond)
			blk2done = true
		case 3:
			close(proceed)
		default:
			time.Sleep(time.Millisecond)
		}
		// At no point may the number of in-flight writes reach
		// concurrentWriters+1.
		c.Check(atomic.AddInt32(&concurrent, -1) < int32(concurrentWriters), check.Equals, true)
		return nil
	}

	fs, err := (&Collection{}).FileSystem(s.client, s.kc)
	c.Assert(err, check.IsNil)
	f, err := fs.OpenFile("50K", os.O_WRONLY|os.O_CREATE, 0)
	c.Assert(err, check.IsNil)
	defer f.Close()

	data := make([]byte, 500)
	rand.Read(data)

	// 100 x 500 bytes = 50000 bytes = ~49 full 1024-byte blocks.
	for i := 0; i < 100; i++ {
		n, err := f.Write(data)
		c.Assert(n, check.Equals, len(data))
		c.Assert(err, check.IsNil)
	}

	// currentMemExtents lists the indexes of segments still held in
	// memory (not yet flushed to storage).
	currentMemExtents := func() (memExtents []int) {
		for idx, e := range f.(*filehandle).inode.(*filenode).segments {
			switch e.(type) {
			case *memSegment:
				memExtents = append(memExtents, idx)
			}
		}
		return
	}
	// After background flushing settles, only the final short segment
	// should remain in memory...
	f.(*filehandle).inode.(*filenode).waitPrune()
	c.Check(currentMemExtents(), check.HasLen, 1)

	// ...and MarshalManifest flushes that one too.
	m, err := fs.MarshalManifest(".")
	c.Check(m, check.Matches, `[^:]* 0:50000:50K\n`)
	c.Check(err, check.IsNil)
	c.Check(currentMemExtents(), check.HasLen, 0)
}
1220
// Ensure blocks get flushed to disk if a lot of data is written to
// small files/directories without calling sync().
//
// Write ~512KiB into each of 256 top-level dirs (two files per dir
// share one bytes.Buffer, and the first io.Copy drains it, so only
// file0 carries data), calling Flush() on every 8th dir. Ensure
// fs.MemorySize() never exceeds 16MiB (1<<24).
func (s *CollectionFSSuite) TestFlushAll(c *check.C) {
	fs, err := (&Collection{}).FileSystem(s.client, s.kc)
	c.Assert(err, check.IsNil)

	s.kc.onWrite = func([]byte) error {
		// discard flushed data -- otherwise the stub will use
		// unlimited memory
		time.Sleep(time.Millisecond)
		s.kc.Lock()
		defer s.kc.Unlock()
		s.kc.blocks = map[string][]byte{}
		return nil
	}
	for i := 0; i < 256; i++ {
		// 512KiB of zeros plus a short distinguishing suffix.
		buf := bytes.NewBuffer(make([]byte, 524288))
		fmt.Fprintf(buf, "test file in dir%d", i)

		dir := fmt.Sprintf("dir%d", i)
		fs.Mkdir(dir, 0755)
		for j := 0; j < 2; j++ {
			f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, j), os.O_WRONLY|os.O_CREATE, 0)
			c.Assert(err, check.IsNil)
			defer f.Close()
			_, err = io.Copy(f, buf)
			c.Assert(err, check.IsNil)
		}

		if i%8 == 0 {
			fs.Flush("", true)
		}

		// Unflushed data must stay under the 16MiB cap at all times.
		size := fs.MemorySize()
		if !c.Check(size <= 1<<24, check.Equals, true) {
			c.Logf("at dir%d fs.MemorySize()=%d", i, size)
			return
		}
	}
}
1266
// Ensure short blocks at the end of a stream don't get flushed by
// Flush(false).
//
// Write 67x 1MiB files to each of 8 dirs, and check that 8 full 64MiB
// blocks have been flushed while 8x 3MiB is still buffered in memory.
func (s *CollectionFSSuite) TestFlushFullBlocksOnly(c *check.C) {
	fs, err := (&Collection{}).FileSystem(s.client, s.kc)
	c.Assert(err, check.IsNil)

	// Track the total number of bytes handed to the stub keep client.
	var flushed int64
	s.kc.onWrite = func(p []byte) error {
		atomic.AddInt64(&flushed, int64(p)|0)
		return nil
	}

	nDirs := int64(8)
	nFiles := int64(67)
	megabyte := make([]byte, 1<<20)
	for i := int64(0); i < nDirs; i++ {
		dir := fmt.Sprintf("dir%d", i)
		fs.Mkdir(dir, 0755)
		for j := int64(0); j < nFiles; j++ {
			f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, j), os.O_WRONLY|os.O_CREATE, 0)
			c.Assert(err, check.IsNil)
			defer f.Close()
			_, err = f.Write(megabyte)
			c.Assert(err, check.IsNil)
		}
	}
	// Expected bookkeeping overhead: 64 bytes per inode (files, dirs,
	// and root).
	inodebytes := int64((nDirs*(nFiles+1) + 1) * 64)
	c.Check(fs.MemorySize(), check.Equals, nDirs*nFiles*(1<<20+64)+inodebytes)
	c.Check(flushed, check.Equals, int64(0))

	// waitForFlush polls MemorySize (up to 5s) until the expected
	// amount of data has left memory, then checks both counters.
	waitForFlush := func(expectUnflushed, expectFlushed int64) {
		for deadline := time.Now().Add(5 * time.Second); fs.MemorySize() > expectUnflushed && time.Now().Before(deadline); time.Sleep(10 * time.Millisecond) {
		}
		c.Check(fs.MemorySize(), check.Equals, expectUnflushed)
		c.Check(flushed, check.Equals, expectFlushed)
	}

	// Nothing flushed yet
	waitForFlush(nDirs*nFiles*(1<<20+64)+inodebytes, 0)

	// Flushing a non-empty dir "/" is non-recursive and there are
	// no top-level files, so this has no effect
	fs.Flush("/", false)
	waitForFlush(nDirs*nFiles*(1<<20+64)+inodebytes, 0)

	// Flush the full block in dir0
	fs.Flush("dir0", false)
	bigloclen := int64(32 + 9 + 51 + 64) // md5 + "+" + "67xxxxxx" + "+Axxxxxx..." + 64 (see (storedSegment)memorySize)
	waitForFlush((nDirs*nFiles-64)*(1<<20+64)+inodebytes+bigloclen*64, 64<<20)

	err = fs.Flush("dir-does-not-exist", false)
	c.Check(err, check.NotNil)

	// Flush full blocks in all dirs
	fs.Flush("", false)
	waitForFlush(nDirs*3*(1<<20+64)+inodebytes+bigloclen*64*nDirs, nDirs*64<<20)

	// Flush non-full blocks, too
	fs.Flush("", true)
	smallloclen := int64(32 + 8 + 51 + 64) // md5 + "+" + "3xxxxxx" + "+Axxxxxx..." + 64 (see (storedSegment)memorySize)
	waitForFlush(inodebytes+bigloclen*64*nDirs+smallloclen*3*nDirs, nDirs*67<<20)
}
1332
// Even when writing lots of files/dirs from different goroutines, as
// long as Flush(dir,false) is called after writing each file,
// unflushed data should be limited to one full block per
// concurrentWriter, plus one nearly-full block at the end of each
// dir/stream.
func (s *CollectionFSSuite) TestMaxUnflushed(c *check.C) {
	nDirs := int64(8)
	maxUnflushed := (int64(concurrentWriters) + nDirs) << 26

	fs, err := (&Collection{}).FileSystem(s.client, s.kc)
	c.Assert(err, check.IsNil)

	release := make(chan struct{})
	timeout := make(chan struct{})
	time.AfterFunc(10*time.Second, func() { close(timeout) })
	var putCount, concurrency int64
	// unflushed counts bytes written to the fs but not yet handed to
	// the keep stub; incremented by the writers below, decremented
	// here as each block write completes.
	var unflushed int64
	s.kc.onWrite = func(p []byte) error {
		defer atomic.AddInt64(&unflushed, -int64(len(p)))
		cur := atomic.AddInt64(&concurrency, 1)
		defer atomic.AddInt64(&concurrency, -1)
		pc := atomic.AddInt64(&putCount, 1)
		if pc < int64(concurrentWriters) {
			// Block until we reach concurrentWriters, to
			// make sure we're really accepting concurrent
			// writes.
			select {
			case <-release:
			case <-timeout:
				c.Error("timeout")
			}
		} else if pc == int64(concurrentWriters) {
			// Unblock the first N-1 PUT reqs.
			close(release)
		}
		// Concurrency and unflushed-bytes invariants hold on every
		// block write.
		c.Assert(cur <= int64(concurrentWriters), check.Equals, true)
		c.Assert(atomic.LoadInt64(&unflushed) <= maxUnflushed, check.Equals, true)
		return nil
	}

	// One goroutine per dir; inside each, one goroutine per file,
	// each of which writes 1MiB and then requests a full-block flush.
	var owg sync.WaitGroup
	megabyte := make([]byte, 1<<20)
	for i := int64(0); i < nDirs; i++ {
		dir := fmt.Sprintf("dir%d", i)
		fs.Mkdir(dir, 0755)
		owg.Add(1)
		go func() {
			defer owg.Done()
			defer fs.Flush(dir, true)
			var iwg sync.WaitGroup
			defer iwg.Wait()
			for j := 0; j < 67; j++ {
				iwg.Add(1)
				go func(j int) {
					defer iwg.Done()
					f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, j), os.O_WRONLY|os.O_CREATE, 0)
					c.Assert(err, check.IsNil)
					defer f.Close()
					n, err := f.Write(megabyte)
					c.Assert(err, check.IsNil)
					atomic.AddInt64(&unflushed, int64(n))
					fs.Flush(dir, false)
				}(j)
			}
		}()
	}
	owg.Wait()
	fs.Flush("", true)
}
1402
1403 func (s *CollectionFSSuite) TestFlushStress(c *check.C) {
1404         done := false
1405         defer func() { done = true }()
1406         time.AfterFunc(10*time.Second, func() {
1407                 if !done {
1408                         pprof.Lookup("goroutine").WriteTo(os.Stderr, 1)
1409                         panic("timeout")
1410                 }
1411         })
1412
1413         wrote := 0
1414         s.kc.onWrite = func(p []byte) error {
1415                 s.kc.Lock()
1416                 s.kc.blocks = map[string][]byte{}
1417                 wrote++
1418                 defer c.Logf("wrote block %d, %d bytes", wrote, len(p))
1419                 s.kc.Unlock()
1420                 time.Sleep(20 * time.Millisecond)
1421                 return nil
1422         }
1423
1424         fs, err := (&Collection{}).FileSystem(s.client, s.kc)
1425         c.Assert(err, check.IsNil)
1426
1427         data := make([]byte, 1<<20)
1428         for i := 0; i < 3; i++ {
1429                 dir := fmt.Sprintf("dir%d", i)
1430                 fs.Mkdir(dir, 0755)
1431                 for j := 0; j < 200; j++ {
1432                         data[0] = byte(j)
1433                         f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, j), os.O_WRONLY|os.O_CREATE, 0)
1434                         c.Assert(err, check.IsNil)
1435                         _, err = f.Write(data)
1436                         c.Assert(err, check.IsNil)
1437                         f.Close()
1438                         fs.Flush(dir, false)
1439                 }
1440                 _, err := fs.MarshalManifest(".")
1441                 c.Check(err, check.IsNil)
1442         }
1443 }
1444
1445 func (s *CollectionFSSuite) TestFlushShort(c *check.C) {
1446         s.kc.onWrite = func([]byte) error {
1447                 s.kc.Lock()
1448                 s.kc.blocks = map[string][]byte{}
1449                 s.kc.Unlock()
1450                 return nil
1451         }
1452         fs, err := (&Collection{}).FileSystem(s.client, s.kc)
1453         c.Assert(err, check.IsNil)
1454         for _, blocksize := range []int{8, 1000000} {
1455                 dir := fmt.Sprintf("dir%d", blocksize)
1456                 err = fs.Mkdir(dir, 0755)
1457                 c.Assert(err, check.IsNil)
1458                 data := make([]byte, blocksize)
1459                 for i := 0; i < 100; i++ {
1460                         f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, i), os.O_WRONLY|os.O_CREATE, 0)
1461                         c.Assert(err, check.IsNil)
1462                         _, err = f.Write(data)
1463                         c.Assert(err, check.IsNil)
1464                         f.Close()
1465                         fs.Flush(dir, false)
1466                 }
1467                 fs.Flush(dir, true)
1468                 _, err := fs.MarshalManifest(".")
1469                 c.Check(err, check.IsNil)
1470         }
1471 }
1472
1473 func (s *CollectionFSSuite) TestBrokenManifests(c *check.C) {
1474         for _, txt := range []string{
1475                 "\n",
1476                 ".\n",
1477                 ". \n",
1478                 ". d41d8cd98f00b204e9800998ecf8427e+0\n",
1479                 ". d41d8cd98f00b204e9800998ecf8427e+0 \n",
1480                 ". 0:0:foo\n",
1481                 ".  0:0:foo\n",
1482                 ". 0:0:foo 0:0:bar\n",
1483                 ". d41d8cd98f00b204e9800998ecf8427e 0:0:foo\n",
1484                 ". d41d8cd98f00b204e9800998ecf8427e+0 :0:0:foo\n",
1485                 ". d41d8cd98f00b204e9800998ecf8427e+0 foo:0:foo\n",
1486                 ". d41d8cd98f00b204e9800998ecf8427e+0 0:foo:foo\n",
1487                 ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:foo 1:1:bar\n",
1488                 ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:\\056\n",
1489                 ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:\\056\\057\\056\n",
1490                 ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:.\n",
1491                 ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:..\n",
1492                 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:..\n",
1493                 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo/..\n",
1494                 ". d41d8cd98f00b204e9800998ecf8427e+1 0:0:foo\n./foo d41d8cd98f00b204e9800998ecf8427e+1 0:0:bar\n",
1495                 "./foo d41d8cd98f00b204e9800998ecf8427e+1 0:0:bar\n. d41d8cd98f00b204e9800998ecf8427e+1 0:0:foo\n",
1496         } {
1497                 c.Logf("<-%q", txt)
1498                 fs, err := (&Collection{ManifestText: txt}).FileSystem(s.client, s.kc)
1499                 c.Check(fs, check.IsNil)
1500                 c.Logf("-> %s", err)
1501                 c.Check(err, check.NotNil)
1502         }
1503 }
1504
1505 func (s *CollectionFSSuite) TestEdgeCaseManifests(c *check.C) {
1506         for _, txt := range []string{
1507                 "",
1508                 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo\n",
1509                 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:...\n",
1510                 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:. 0:0:. 0:0:\\056 0:0:\\056\n",
1511                 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo/. 0:0:. 0:0:foo\\057bar\\057\\056\n",
1512                 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo 0:0:foo 0:0:bar\n",
1513                 ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo/bar\n./foo d41d8cd98f00b204e9800998ecf8427e+0 0:0:bar\n",
1514         } {
1515                 c.Logf("<-%q", txt)
1516                 fs, err := (&Collection{ManifestText: txt}).FileSystem(s.client, s.kc)
1517                 c.Check(err, check.IsNil)
1518                 c.Check(fs, check.NotNil)
1519         }
1520 }
1521
// fakeLocator holds ten syntactically valid block locators for use in
// tests: entry i is the md5 of i zero bytes with size hint "+i", and
// the odd-numbered entries carry two extra hint fields.
var fakeLocator = func() []string {
	var locs []string
	for i := 0; i < 10; i++ {
		loc := fmt.Sprintf("%x+%d", md5.Sum(make([]byte, i)), i)
		if i%2 == 1 {
			loc += "+Awhatever+Zotherhints"
		}
		locs = append(locs, loc)
	}
	return locs
}()
1532
1533 func (s *CollectionFSSuite) TestReplaceSegments_HappyPath(c *check.C) {
1534         fs, err := (&Collection{
1535                 ManifestText: ". " + fakeLocator[1] + " " + fakeLocator[2] + " 0:3:file3\n",
1536         }).FileSystem(nil, &keepClientStub{})
1537         c.Assert(err, check.IsNil)
1538         changed, err := fs.ReplaceSegments(map[BlockSegment]BlockSegment{
1539                 BlockSegment{fakeLocator[1], 0, 1}: BlockSegment{fakeLocator[3], 0, 1},
1540                 BlockSegment{fakeLocator[2], 0, 2}: BlockSegment{fakeLocator[3], 1, 2},
1541         })
1542         c.Check(changed, check.Equals, true)
1543         c.Check(err, check.IsNil)
1544         mtxt, err := fs.MarshalManifest(".")
1545         c.Check(err, check.IsNil)
1546         c.Check(mtxt, check.Equals, ". "+fakeLocator[3]+" 0:3:file3\n")
1547 }
1548
1549 func (s *CollectionFSSuite) TestReplaceSegments_InvalidOffset(c *check.C) {
1550         origtxt := ". " + fakeLocator[1] + " " + fakeLocator[2] + " 0:3:file3\n"
1551         fs, err := (&Collection{
1552                 ManifestText: origtxt,
1553         }).FileSystem(nil, &keepClientStub{})
1554         c.Assert(err, check.IsNil)
1555         changed, err := fs.ReplaceSegments(map[BlockSegment]BlockSegment{
1556                 BlockSegment{fakeLocator[1], 0, 1}: BlockSegment{fakeLocator[3], 0, 1},
1557                 BlockSegment{fakeLocator[2], 0, 2}: BlockSegment{fakeLocator[3], 2, 2},
1558         })
1559         c.Check(changed, check.Equals, false)
1560         c.Check(err, check.ErrorMatches, `invalid replacement: offset 2 \+ length 2 > block size 3`)
1561         mtxt, err := fs.MarshalManifest(".")
1562         c.Check(err, check.IsNil)
1563         c.Check(mtxt, check.Equals, origtxt)
1564 }
1565
1566 func (s *CollectionFSSuite) TestReplaceSegments_LengthMismatch(c *check.C) {
1567         origtxt := ". " + fakeLocator[1] + " " + fakeLocator[2] + " 0:3:file3\n"
1568         fs, err := (&Collection{
1569                 ManifestText: origtxt,
1570         }).FileSystem(nil, &keepClientStub{})
1571         c.Assert(err, check.IsNil)
1572         changed, err := fs.ReplaceSegments(map[BlockSegment]BlockSegment{
1573                 BlockSegment{fakeLocator[1], 0, 1}: BlockSegment{fakeLocator[3], 0, 1},
1574                 BlockSegment{fakeLocator[2], 0, 2}: BlockSegment{fakeLocator[3], 0, 3},
1575         })
1576         c.Check(changed, check.Equals, false)
1577         c.Check(err, check.ErrorMatches, `mismatched length: replacing segment length 2 with segment length 3`)
1578         mtxt, err := fs.MarshalManifest(".")
1579         c.Check(err, check.IsNil)
1580         c.Check(mtxt, check.Equals, origtxt)
1581 }
1582
1583 func (s *CollectionFSSuite) TestReplaceSegments_SkipUnreferenced(c *check.C) {
1584         fs, err := (&Collection{
1585                 ManifestText: ". " + fakeLocator[1] + " " + fakeLocator[2] + " " + fakeLocator[3] + " 0:6:file6\n",
1586         }).FileSystem(nil, &keepClientStub{})
1587         c.Assert(err, check.IsNil)
1588         changed, err := fs.ReplaceSegments(map[BlockSegment]BlockSegment{
1589                 BlockSegment{fakeLocator[1], 0, 1}: BlockSegment{fakeLocator[4], 0, 1}, // skipped because [5] unref
1590                 BlockSegment{fakeLocator[2], 0, 2}: BlockSegment{fakeLocator[4], 1, 2}, // skipped because [5] unref
1591                 BlockSegment{fakeLocator[5], 0, 2}: BlockSegment{fakeLocator[4], 1, 2}, // [5] unreferenced in orig manifest
1592                 BlockSegment{fakeLocator[3], 0, 3}: BlockSegment{fakeLocator[6], 3, 3}, // applied
1593         })
1594         c.Check(changed, check.Equals, true)
1595         c.Check(err, check.IsNil)
1596         mtxt, err := fs.MarshalManifest(".")
1597         c.Check(err, check.IsNil)
1598         c.Check(mtxt, check.Equals, ". "+fakeLocator[1]+" "+fakeLocator[2]+" "+fakeLocator[6]+" 0:3:file6 6:3:file6\n")
1599 }
1600
1601 func (s *CollectionFSSuite) TestReplaceSegments_SkipIncompleteSegment(c *check.C) {
1602         origtxt := ". " + fakeLocator[2] + " " + fakeLocator[3] + " 0:5:file5\n"
1603         fs, err := (&Collection{
1604                 ManifestText: origtxt,
1605         }).FileSystem(nil, &keepClientStub{})
1606         c.Assert(err, check.IsNil)
1607         changed, err := fs.ReplaceSegments(map[BlockSegment]BlockSegment{
1608                 BlockSegment{fakeLocator[2], 0, 1}: BlockSegment{fakeLocator[4], 0, 1}, // length=1 does not match the length=2 segment
1609         })
1610         c.Check(changed, check.Equals, false)
1611         c.Check(err, check.IsNil)
1612         mtxt, err := fs.MarshalManifest(".")
1613         c.Check(err, check.IsNil)
1614         c.Check(mtxt, check.Equals, origtxt)
1615 }
1616
1617 func (s *CollectionFSSuite) testPlanRepack(c *check.C, opts RepackOptions, manifest string, expectPlan [][]storedSegment) {
1618         fs, err := (&Collection{ManifestText: manifest}).FileSystem(nil, s.kc)
1619         c.Assert(err, check.IsNil)
1620         cfs := fs.(*collectionFileSystem)
1621         repl, err := cfs.planRepack(context.Background(), opts, cfs.root.(*dirnode))
1622         c.Assert(err, check.IsNil)
1623
1624         // we always expect kc==cfs, so we fill this in instead of
1625         // requiring each test case to repeat it
1626         for _, pp := range expectPlan {
1627                 for i := range pp {
1628                         pp[i].kc = cfs
1629                 }
1630         }
1631         c.Check(repl, check.DeepEquals, expectPlan)
1632 }
1633
1634 func (s *CollectionFSSuite) TestPlanRepack_2x32M(c *check.C) {
1635         s.testPlanRepack(c,
1636                 RepackOptions{Full: true},
1637                 ". aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+32000000 bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb+32000000 0:64000000:file\n",
1638                 [][]storedSegment{
1639                         {
1640                                 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+32000000", size: 32000000, length: 32000000, offset: 0},
1641                                 {locator: "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb+32000000", size: 32000000, length: 32000000, offset: 0},
1642                         },
1643                 })
1644 }
1645
1646 func (s *CollectionFSSuite) TestPlanRepack_2x32M_Cached(c *check.C) {
1647         s.kc.cached = map[string]bool{
1648                 "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa": true,
1649                 "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb": true,
1650         }
1651         s.testPlanRepack(c,
1652                 RepackOptions{Full: true, CachedOnly: true},
1653                 ". aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+32000000 bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb+32000000 0:64000000:file\n",
1654                 [][]storedSegment{
1655                         {
1656                                 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+32000000", size: 32000000, length: 32000000, offset: 0},
1657                                 {locator: "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb+32000000", size: 32000000, length: 32000000, offset: 0},
1658                         },
1659                 })
1660 }
1661
1662 func (s *CollectionFSSuite) TestPlanRepack_2x32M_OneCached(c *check.C) {
1663         s.kc.cached = map[string]bool{
1664                 "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa": true,
1665         }
1666         s.testPlanRepack(c,
1667                 RepackOptions{Full: true, CachedOnly: true},
1668                 ". aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+32000000 bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb+32000000 0:64000000:file\n",
1669                 nil)
1670 }
1671
1672 func (s *CollectionFSSuite) TestPlanRepack_3x32M_TwoCached(c *check.C) {
1673         s.kc.cached = map[string]bool{
1674                 "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa": true,
1675                 "cccccccccccccccccccccccccccccccc": true,
1676         }
1677         s.testPlanRepack(c,
1678                 RepackOptions{Full: true, CachedOnly: true},
1679                 ". aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+32000000 bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb+32000000 cccccccccccccccccccccccccccccccc+32000000 0:96000000:file\n",
1680                 [][]storedSegment{
1681                         {
1682                                 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+32000000", size: 32000000, length: 32000000, offset: 0},
1683                                 {locator: "cccccccccccccccccccccccccccccccc+32000000", size: 32000000, length: 32000000, offset: 0},
1684                         },
1685                 })
1686 }
1687
1688 func (s *CollectionFSSuite) TestPlanRepack_2x32Mi(c *check.C) {
1689         s.testPlanRepack(c,
1690                 RepackOptions{Full: true},
1691                 ". aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+33554432 bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb+33554432 0:67108864:file\n",
1692                 nil)
1693 }
1694
1695 func (s *CollectionFSSuite) TestPlanRepack_2x32MiMinus1(c *check.C) {
1696         s.testPlanRepack(c,
1697                 RepackOptions{Full: true},
1698                 ". aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+33554431 bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb+33554431 0:67108862:file\n",
1699                 [][]storedSegment{
1700                         {
1701                                 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+33554431", size: 33554431, length: 33554431, offset: 0},
1702                                 {locator: "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb+33554431", size: 33554431, length: 33554431, offset: 0},
1703                         },
1704                 })
1705 }
1706
1707 func (s *CollectionFSSuite) TestPlanRepack_3x32M(c *check.C) {
1708         s.testPlanRepack(c,
1709                 RepackOptions{Full: true},
1710                 ". aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+32000000 bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb+32000000 cccccccccccccccccccccccccccccccc+32000000 0:96000000:file\n",
1711                 [][]storedSegment{
1712                         {
1713                                 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+32000000", size: 32000000, length: 32000000, offset: 0},
1714                                 {locator: "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb+32000000", size: 32000000, length: 32000000, offset: 0},
1715                         },
1716                 })
1717 }
1718
1719 func (s *CollectionFSSuite) TestPlanRepack_3x42M(c *check.C) {
1720         // Each block is more than half full, so do nothing.
1721         s.testPlanRepack(c,
1722                 RepackOptions{Full: true},
1723                 ". aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+42000000 bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb+42000000 cccccccccccccccccccccccccccccccc+42000000 0:126000000:file\n",
1724                 nil)
1725 }
1726
1727 func (s *CollectionFSSuite) TestPlanRepack_Premature(c *check.C) {
1728         // Repacking would reduce to one block, but it would still be
1729         // too short to be worthwhile, so do nothing.
1730         s.testPlanRepack(c,
1731                 RepackOptions{Full: true},
1732                 ". aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+123 bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb+123 cccccccccccccccccccccccccccccccc+123 0:369:file\n",
1733                 nil)
1734 }
1735
1736 func (s *CollectionFSSuite) TestPlanRepack_4x22M_NonAdjacent(c *check.C) {
1737         // Repack the first three 22M blocks into one 66M block.
1738         // Don't touch the 44M blocks or the final 22M block.
1739         s.testPlanRepack(c,
1740                 RepackOptions{Full: true},
1741                 ". aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+22000000 bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb+44000000 cccccccccccccccccccccccccccccccc+22000000 dddddddddddddddddddddddddddddddd+44000000 eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee+22000000 ffffffffffffffffffffffffffffffff+44000000 00000000000000000000000000000000+22000000 0:220000000:file\n",
1742                 [][]storedSegment{
1743                         {
1744                                 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+22000000", size: 22000000, length: 22000000, offset: 0},
1745                                 {locator: "cccccccccccccccccccccccccccccccc+22000000", size: 22000000, length: 22000000, offset: 0},
1746                                 {locator: "eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee+22000000", size: 22000000, length: 22000000, offset: 0},
1747                         },
1748                 })
1749 }
1750
1751 func (s *CollectionFSSuite) TestPlanRepack_2x22M_DuplicateBlock(c *check.C) {
1752         // Repack a+b+c, not a+b+a.
1753         s.testPlanRepack(c,
1754                 RepackOptions{Full: true},
1755                 ". aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+22000000 bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb+22000000 aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+22000000 0:66000000:file\n"+
1756                         "./dir cccccccccccccccccccccccccccccccc+22000000 0:22000000:file\n",
1757                 [][]storedSegment{
1758                         {
1759                                 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+22000000", size: 22000000, length: 22000000, offset: 0},
1760                                 {locator: "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb+22000000", size: 22000000, length: 22000000, offset: 0},
1761                                 {locator: "cccccccccccccccccccccccccccccccc+22000000", size: 22000000, length: 22000000, offset: 0},
1762                         },
1763                 })
1764 }
1765
1766 func (s *CollectionFSSuite) TestPlanRepack_2x22M_DuplicateBlock_TooShort(c *check.C) {
1767         // Repacking a+b would not meet the 32MiB threshold.
1768         s.testPlanRepack(c,
1769                 RepackOptions{Full: true},
1770                 ". aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+22000000 bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb+1 aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+22000000 0:44000001:file\n",
1771                 nil)
1772 }
1773
1774 func (s *CollectionFSSuite) TestPlanRepack_SiblingsTogether(c *check.C) {
1775         // Pack sibling files' ("a" and "c") segments together before
1776         // other subdirs ("b/b"), even though subdir "b" sorts between
1777         // "a" and "c".
1778         s.testPlanRepack(c,
1779                 RepackOptions{Full: true},
1780                 ". aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+15000000 cccccccccccccccccccccccccccccccc+15000000 0:15000000:a 15000000:15000000:c\n"+
1781                         "./b bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb+15000000 0:15000000:b\n",
1782                 [][]storedSegment{
1783                         {
1784                                 {locator: "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+15000000", size: 15000000, length: 15000000, offset: 0},
1785                                 {locator: "cccccccccccccccccccccccccccccccc+15000000", size: 15000000, length: 15000000, offset: 0},
1786                                 {locator: "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb+15000000", size: 15000000, length: 15000000, offset: 0},
1787                         },
1788                 })
1789 }
1790
// TestRepackData exercises repackData directly with a table of merge
// plans, checking the number of replacement mappings returned, the
// error cases, and (for successful trials) that the repacked blocks
// contain the original segments' data.
func (s *CollectionFSSuite) TestRepackData(c *check.C) {
	fs, err := (&Collection{}).FileSystem(nil, s.kc)
	c.Assert(err, check.IsNil)
	cfs := fs.(*collectionFileSystem)

	// testSegmentNum -> locator of the stub block already written
	// for that segment, so each distinct N is written only once.
	testBlockWritten := make(map[int]string)
	// testSegment(N) returns an N-byte segment of a block
	// containing repeated byte N%256.  The segment's offset
	// within the block is N/1000000 (*).  The block also has
	// N/1000000 null bytes following the segment(*).
	//
	// If N=404, the block is not readable.
	//
	// (*) ...unless that would result in an oversize block.
	testSegment := func(testSegmentNum int) storedSegment {
		length := testSegmentNum
		offset := testSegmentNum / 1000000 // leading null padding, per (*)
		if offset+length > maxBlockSize {
			offset = 0
		}
		size := testSegmentNum + offset // length plus leading padding
		if size+offset <= maxBlockSize {
			size += offset // trailing null padding, per (*)
		}
		if _, stored := testBlockWritten[testSegmentNum]; !stored {
			data := make([]byte, size)
			// NOTE(review): this fills data[0:length], not
			// data[offset:offset+length] as the comment above
			// implies.  Harmless for current trials (offset is
			// zero for every N that reaches the content check,
			// and nonzero-offset trials expect errors), but
			// confirm before relying on the documented layout.
			for b := range data[offset : offset+length] {
				data[b] = byte(testSegmentNum & 0xff)
			}
			resp, err := s.kc.BlockWrite(context.Background(), BlockWriteOptions{Data: data})
			c.Assert(err, check.IsNil)
			testBlockWritten[testSegmentNum] = resp.Locator
			if testSegmentNum == 404 {
				// Make the block unreadable by removing it
				// from the stub's store after signing.
				delete(s.kc.blocks, resp.Locator[:32])
			}
		}
		return storedSegment{
			kc:      cfs,
			locator: testBlockWritten[testSegmentNum],
			size:    size,
			length:  length,
			offset:  offset,
		}
	}
	for trialIndex, trial := range []struct {
		label string
		// "input" here has the same shape as repackData's
		// [][]storedSegment argument, but uses int N as
		// shorthand for testSegment(N).
		input              [][]int
		onWrite            func([]byte) error
		expectRepackedLen  int
		expectErrorMatches string
	}{
		{
			label:             "one {3 blocks to 1} merge",
			input:             [][]int{{1, 2, 3}},
			expectRepackedLen: 3,
		},
		{
			label:             "two {3 blocks to 1} merges",
			input:             [][]int{{1, 2, 3}, {4, 5, 6}},
			expectRepackedLen: 6,
		},
		{
			label:             "merge two {3 blocks to 1} merges",
			input:             [][]int{{1, 2, 3}, {4, 5, 6}},
			expectRepackedLen: 6,
		},
		{
			label:             "no-op",
			input:             nil,
			expectRepackedLen: 0,
		},
		{
			label:             "merge 3 blocks plus a zero-length segment -- not expected to be used, but should work",
			input:             [][]int{{1, 2, 0, 3}},
			expectRepackedLen: 4,
		},
		{
			label:             "merge a single segment -- not expected to be used, but should work",
			input:             [][]int{{12345}},
			expectRepackedLen: 1,
		},
		{
			label:             "merge a single empty segment -- not expected to be used, but should work",
			input:             [][]int{{0}},
			expectRepackedLen: 1,
		},
		{
			label:             "merge zero segments -- not expected to be used, but should work",
			input:             [][]int{{}},
			expectRepackedLen: 0,
		},
		{
			label:             "merge same orig segment into two different replacements -- not expected to be used, but should work",
			input:             [][]int{{1, 22, 3}, {4, 22, 6}},
			expectRepackedLen: 5,
		},
		{
			label:             "identical merges -- not expected to be used, but should work",
			input:             [][]int{{11, 22, 33}, {11, 22, 33}},
			expectRepackedLen: 3,
		},
		{
			label:              "read error on first segment",
			input:              [][]int{{404, 2, 3}},
			expectRepackedLen:  0,
			expectErrorMatches: "404 block not found",
		},
		{
			label:              "read error on second segment",
			input:              [][]int{{1, 404, 3}},
			expectErrorMatches: "404 block not found",
		},
		{
			label:              "read error on last segment",
			input:              [][]int{{1, 2, 404}},
			expectErrorMatches: "404 block not found",
		},
		{
			label:              "merge does not fit in one block",
			input:              [][]int{{50000000, 20000000}},
			expectErrorMatches: "combined length 70000000 would exceed maximum block size 67108864",
		},
		{
			label:              "write error",
			input:              [][]int{{1, 2, 3}},
			onWrite:            func(p []byte) error { return errors.New("stub write error") },
			expectErrorMatches: "stub write error",
		},
	} {
		c.Logf("trial %d: %s", trialIndex, trial.label)
		// Expand the int shorthand into real storedSegments,
		// writing the needed stub blocks as a side effect.
		var input [][]storedSegment
		for _, seglist := range trial.input {
			var segments []storedSegment
			for _, segnum := range seglist {
				segments = append(segments, testSegment(segnum))
			}
			input = append(input, segments)
		}
		s.kc.onWrite = trial.onWrite
		repacked, err := cfs.repackData(context.Background(), input)
		if trial.expectErrorMatches != "" {
			c.Check(err, check.ErrorMatches, trial.expectErrorMatches)
			continue
		}
		c.Assert(err, check.IsNil)
		c.Check(repacked, check.HasLen, trial.expectRepackedLen)
		// Read each original segment back through its
		// replacement mapping and check the content byte.
		for _, origSegments := range input {
			// NOTE(review): replLocator is never reassigned, so
			// the Errorf below always reports it as "" -- it
			// probably should be repacked[origBlock].Locator.
			replLocator := ""
			for _, origSegment := range origSegments {
				origBlock := BlockSegment{
					Locator: stripAllHints(origSegment.locator),
					Length:  origSegment.length,
					Offset:  origSegment.offset,
				}
				buf := make([]byte, origSegment.size)
				n, err := cfs.ReadAt(repacked[origBlock].Locator, buf, repacked[origBlock].Offset)
				c.Assert(err, check.IsNil)
				c.Check(n, check.Equals, len(buf))
				expectContent := byte(origSegment.length & 0xff)
				for segoffset, b := range buf {
					if b != expectContent {
						c.Errorf("content mismatch: origSegment.locator %s -> replLocator %s offset %d: byte %d is %d, expected %d", origSegment.locator, replLocator, repacked[origBlock].Offset, segoffset, b, expectContent)
						break
					}
				}
			}
		}
	}
}
1963
// dataToWrite is one queued file write for the repack cost tests:
// the collection path to write, plus a function that produces the
// data, so large payloads can be generated/read lazily at write time.
type dataToWrite struct {
	path string        // file path within the test collection
	data func() []byte // returns the bytes to write
}
1968
// dataToWrite_SourceTree returns up to maxfiles write entries
// mirroring the regular files tracked by git in this source tree
// (the repository root is assumed to be three directories up).
func dataToWrite_SourceTree(c *check.C, maxfiles int) (writes []dataToWrite) {
	gitdir, err := filepath.Abs("../../..")
	c.Assert(err, check.IsNil)
	infs := os.DirFS(gitdir)
	buf, err := exec.Command("git", "-C", gitdir, "ls-files").CombinedOutput()
	c.Assert(err, check.IsNil, check.Commentf("%s", buf))
	for _, path := range bytes.Split(buf, []byte("\n")) {
		// Shadow with a per-iteration string so the closure
		// below captures the right path.
		path := string(path)
		if path == "" ||
			strings.HasPrefix(path, "tools/arvbox/lib/arvbox/docker/service") &&
				strings.HasSuffix(path, "/run") {
			// dangling symlink
			continue
		}
		fi, err := fs.Stat(infs, path)
		c.Assert(err, check.IsNil)
		if fi.IsDir() || fi.Mode()&os.ModeSymlink != 0 {
			// skip non-regular files
			continue
		}
		writes = append(writes, dataToWrite{
			path: path,
			data: func() []byte {
				// Read lazily, at write time.
				data, err := fs.ReadFile(infs, path)
				c.Assert(err, check.IsNil)
				return data
			},
		})
		if len(writes) >= maxfiles {
			break
		}
	}
	return
}
2002
2003 func dataToWrite_ConstantSizeFilesInDirs(c *check.C, ndirs, nfiles, filesize, chunksize int) (writes []dataToWrite) {
2004         for chunk := 0; chunk == 0 || (chunksize > 0 && chunk < (filesize+chunksize-1)/chunksize); chunk++ {
2005                 for i := 0; i < nfiles; i++ {
2006                         datasize := filesize
2007                         if chunksize > 0 {
2008                                 datasize = chunksize
2009                                 if remain := filesize - chunk*chunksize; remain < chunksize {
2010                                         datasize = remain
2011                                 }
2012                         }
2013                         data := make([]byte, datasize)
2014                         copy(data, []byte(fmt.Sprintf("%d chunk %d", i, chunk)))
2015                         writes = append(writes, dataToWrite{
2016                                 path: fmt.Sprintf("dir%d/file%d", i*ndirs/nfiles, i),
2017                                 data: func() []byte { return data },
2018                         })
2019                 }
2020         }
2021         return
2022 }
2023
// enableRepackCharts enables the slower repack cost tests (see
// skipMostRepackCostTests) when ARVADOS_TEST_REPACK_CHARTS is set.
var enableRepackCharts = os.Getenv("ARVADOS_TEST_REPACK_CHARTS") != ""
2025
2026 func (s *CollectionFSSuite) skipMostRepackCostTests(c *check.C) {
2027         if !enableRepackCharts {
2028                 c.Skip("Set ARVADOS_TEST_REPACK_CHARTS to run more cost tests and generate data for charts like https://dev.arvados.org/issues/22320#note-14")
2029         }
2030 }
2031
// TestRepackCost_SourceTree_Part measures repack cost for a 500-file
// subset of the source tree.  Unlike the other cost tests, it runs
// unconditionally (it does not call skipMostRepackCostTests).
func (s *CollectionFSSuite) TestRepackCost_SourceTree_Part(c *check.C) {
	s.testRepackCost(c, dataToWrite_SourceTree(c, 500), 40)
}
2035
// TestRepackCost_SourceTree measures repack cost for (effectively)
// the entire source tree; skipped unless charts are enabled.
func (s *CollectionFSSuite) TestRepackCost_SourceTree(c *check.C) {
	s.skipMostRepackCostTests(c)
	s.testRepackCost(c, dataToWrite_SourceTree(c, 99999), 50)
}
2040
// TestRepackCost_1000x_1M_Files: 1000 files of 1 MB each, written
// whole, across 10 dirs; skipped unless charts are enabled.
func (s *CollectionFSSuite) TestRepackCost_1000x_1M_Files(c *check.C) {
	s.skipMostRepackCostTests(c)
	s.testRepackCost(c, dataToWrite_ConstantSizeFilesInDirs(c, 10, 1000, 1000000, 0), 80)
}
2045
// TestRepackCost_100x_8M_Files: 100 files of 8 MB each, written
// whole, across 10 dirs; skipped unless charts are enabled.
func (s *CollectionFSSuite) TestRepackCost_100x_8M_Files(c *check.C) {
	s.skipMostRepackCostTests(c)
	s.testRepackCost(c, dataToWrite_ConstantSizeFilesInDirs(c, 10, 100, 8000000, 0), 20)
}
2050
// TestRepackCost_100x_8M_Files_1M_Chunks: 100 files of 8 MB each,
// written in interleaved 1 MB chunks; skipped unless charts are
// enabled.
func (s *CollectionFSSuite) TestRepackCost_100x_8M_Files_1M_Chunks(c *check.C) {
	s.skipMostRepackCostTests(c)
	s.testRepackCost(c, dataToWrite_ConstantSizeFilesInDirs(c, 10, 100, 8000000, 1000000), 50)
}
2055
// TestRepackCost_100x_10M_Files_1M_Chunks: 100 files of 10 MB each,
// written in interleaved 1 MB chunks; skipped unless charts are
// enabled.
func (s *CollectionFSSuite) TestRepackCost_100x_10M_Files_1M_Chunks(c *check.C) {
	s.skipMostRepackCostTests(c)
	s.testRepackCost(c, dataToWrite_ConstantSizeFilesInDirs(c, 10, 100, 10000000, 1000000), 80)
}
2060
// TestRepackCost_100x_10M_Files: 100 files of 10 MB each, written
// whole, across 10 dirs; skipped unless charts are enabled.
func (s *CollectionFSSuite) TestRepackCost_100x_10M_Files(c *check.C) {
	s.skipMostRepackCostTests(c)
	s.testRepackCost(c, dataToWrite_ConstantSizeFilesInDirs(c, 10, 100, 10000000, 0), 100)
}
2065
2066 func (s *CollectionFSSuite) testRepackCost(c *check.C, writes []dataToWrite, maxBlocks int) {
2067         s.kc.blocks = make(map[string][]byte)
2068         testfs, err := (&Collection{}).FileSystem(nil, s.kc)
2069         c.Assert(err, check.IsNil)
2070         cfs := testfs.(*collectionFileSystem)
2071         dirsCreated := make(map[string]bool)
2072         bytesContent := 0
2073         bytesWritten := func() (n int) {
2074                 s.kc.Lock()
2075                 defer s.kc.Unlock()
2076                 for _, data := range s.kc.blocks {
2077                         n += len(data)
2078                 }
2079                 return
2080         }
2081         blocksInManifest := func() int {
2082                 blocks := make(map[string]bool)
2083                 cfs.fileSystem.root.(*dirnode).walkSegments(func(s segment) segment {
2084                         blocks[s.(storedSegment).blockSegment().StripAllHints().Locator] = true
2085                         return s
2086                 })
2087                 return len(blocks)
2088         }
2089         tRepackNoop := time.Duration(0)
2090         nRepackNoop := 0
2091         tRepackTotal := time.Duration(0)
2092         nRepackTotal := 0
2093         filesWritten := make(map[string]bool)
2094         stats := bytes.NewBuffer(nil)
2095         fmt.Fprint(stats, "writes\tfiles\tbytes_in_files\tblocks\tbytes_written_backend\tn_repacked\tn_repack_noop\tseconds_repacking\n")
2096         for writeIndex, write := range writes {
2097                 for i, c := range write.path {
2098                         if c == '/' && !dirsCreated[write.path[:i]] {
2099                                 testfs.Mkdir(write.path[:i], 0700)
2100                                 dirsCreated[write.path[:i]] = true
2101                         }
2102                 }
2103                 f, err := testfs.OpenFile(write.path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0700)
2104                 c.Assert(err, check.IsNil)
2105                 filesWritten[write.path] = true
2106                 data := write.data()
2107                 _, err = f.Write(data)
2108                 c.Assert(err, check.IsNil)
2109                 err = f.Close()
2110                 c.Assert(err, check.IsNil)
2111                 bytesContent += len(data)
2112
2113                 _, err = cfs.MarshalManifest("")
2114                 c.Assert(err, check.IsNil)
2115                 t0 := time.Now()
2116                 n, err := cfs.Repack(context.Background(), RepackOptions{})
2117                 c.Assert(err, check.IsNil)
2118                 tRepack := time.Since(t0)
2119                 tRepackTotal += tRepack
2120                 nRepackTotal++
2121
2122                 if n == 0 {
2123                         tRepackNoop += tRepack
2124                         nRepackNoop++
2125                 } else if bytesWritten()/4 > bytesContent {
2126                         // Rewriting data >4x on average means
2127                         // something is terribly wrong -- give up now
2128                         // instead of going OOM.
2129                         c.Logf("something is terribly wrong -- bytesWritten %d >> bytesContent %d", bytesWritten(), bytesContent)
2130                         c.FailNow()
2131                 }
2132                 fmt.Fprintf(stats, "%d\t%d\t%d\t%d\t%d\t%d\t%d\t%.06f\n", writeIndex+1, len(filesWritten), bytesContent, blocksInManifest(), bytesWritten(), nRepackTotal-nRepackNoop, nRepackNoop, tRepackTotal.Seconds())
2133         }
2134         c.Check(err, check.IsNil)
2135         c.Check(blocksInManifest() <= maxBlocks, check.Equals, true, check.Commentf("expect %d <= %d", blocksInManifest(), maxBlocks))
2136
2137         c.Logf("writes %d files %d bytesContent %d bytesWritten %d bytesRewritten %d blocksInManifest %d", len(writes), len(filesWritten), bytesContent, bytesWritten(), bytesWritten()-bytesContent, blocksInManifest())
2138         c.Logf("spent %v on %d Repack calls, average %v per call", tRepackTotal, nRepackTotal, tRepackTotal/time.Duration(nRepackTotal))
2139         c.Logf("spent %v on %d Repack calls that had no effect, average %v per call", tRepackNoop, nRepackNoop, tRepackNoop/time.Duration(nRepackNoop))
2140
2141         if enableRepackCharts {
2142                 // write stats to tmp/{testname}_stats.tsv
2143                 err = os.Mkdir("tmp", 0777)
2144                 if !os.IsExist(err) {
2145                         c.Check(err, check.IsNil)
2146                 }
2147                 err = os.WriteFile("tmp/"+c.TestName()+"_stats.tsv", stats.Bytes(), 0666)
2148                 c.Check(err, check.IsNil)
2149         }
2150 }
2151
2152 func (s *CollectionFSSuite) TestSnapshotSplice(c *check.C) {
2153         filedata1 := "hello snapshot+splice world\n"
2154         fs, err := (&Collection{}).FileSystem(s.client, s.kc)
2155         c.Assert(err, check.IsNil)
2156         {
2157                 f, err := fs.OpenFile("file1", os.O_CREATE|os.O_RDWR, 0700)
2158                 c.Assert(err, check.IsNil)
2159                 _, err = f.Write([]byte(filedata1))
2160                 c.Assert(err, check.IsNil)
2161                 err = f.Close()
2162                 c.Assert(err, check.IsNil)
2163         }
2164
2165         snap, err := Snapshot(fs, "/")
2166         c.Assert(err, check.IsNil)
2167         err = Splice(fs, "dir1", snap)
2168         c.Assert(err, check.IsNil)
2169         f, err := fs.Open("dir1/file1")
2170         c.Assert(err, check.IsNil)
2171         buf, err := io.ReadAll(f)
2172         c.Assert(err, check.IsNil)
2173         c.Check(string(buf), check.Equals, filedata1)
2174 }
2175
2176 func (s *CollectionFSSuite) TestRefreshSignatures(c *check.C) {
2177         filedata1 := "hello refresh signatures world\n"
2178         fs, err := (&Collection{}).FileSystem(s.client, s.kc)
2179         c.Assert(err, check.IsNil)
2180         fs.Mkdir("d1", 0700)
2181         f, err := fs.OpenFile("d1/file1", os.O_CREATE|os.O_RDWR, 0700)
2182         c.Assert(err, check.IsNil)
2183         _, err = f.Write([]byte(filedata1))
2184         c.Assert(err, check.IsNil)
2185         err = f.Close()
2186         c.Assert(err, check.IsNil)
2187
2188         filedata2 := "hello refresh signatures universe\n"
2189         fs.Mkdir("d2", 0700)
2190         f, err = fs.OpenFile("d2/file2", os.O_CREATE|os.O_RDWR, 0700)
2191         c.Assert(err, check.IsNil)
2192         _, err = f.Write([]byte(filedata2))
2193         c.Assert(err, check.IsNil)
2194         err = f.Close()
2195         c.Assert(err, check.IsNil)
2196         txt, err := fs.MarshalManifest(".")
2197         c.Assert(err, check.IsNil)
2198         var saved Collection
2199         err = s.client.RequestAndDecode(&saved, "POST", "arvados/v1/collections", nil, map[string]interface{}{
2200                 "select": []string{"manifest_text", "uuid", "portable_data_hash"},
2201                 "collection": map[string]interface{}{
2202                         "manifest_text": txt,
2203                 },
2204         })
2205         c.Assert(err, check.IsNil)
2206
2207         // Update signatures synchronously if they are already expired
2208         // when Read() is called.
2209         {
2210                 saved.ManifestText = SignManifest(saved.ManifestText, s.kc.authToken, time.Now().Add(-2*time.Second), s.kc.sigttl, []byte(s.kc.sigkey))
2211                 fs, err := saved.FileSystem(s.client, s.kc)
2212                 c.Assert(err, check.IsNil)
2213                 f, err := fs.OpenFile("d1/file1", os.O_RDONLY, 0)
2214                 c.Assert(err, check.IsNil)
2215                 buf, err := ioutil.ReadAll(f)
2216                 c.Check(err, check.IsNil)
2217                 c.Check(string(buf), check.Equals, filedata1)
2218         }
2219
2220         // Update signatures asynchronously if we're more than half
2221         // way to TTL when Read() is called.
2222         {
2223                 exp := time.Now().Add(2 * time.Minute)
2224                 saved.ManifestText = SignManifest(saved.ManifestText, s.kc.authToken, exp, s.kc.sigttl, []byte(s.kc.sigkey))
2225                 fs, err := saved.FileSystem(s.client, s.kc)
2226                 c.Assert(err, check.IsNil)
2227                 f1, err := fs.OpenFile("d1/file1", os.O_RDONLY, 0)
2228                 c.Assert(err, check.IsNil)
2229                 f2, err := fs.OpenFile("d2/file2", os.O_RDONLY, 0)
2230                 c.Assert(err, check.IsNil)
2231                 buf, err := ioutil.ReadAll(f1)
2232                 c.Check(err, check.IsNil)
2233                 c.Check(string(buf), check.Equals, filedata1)
2234
2235                 // Ensure fs treats the 2-minute TTL as less than half
2236                 // the server's signing TTL. If we don't do this,
2237                 // collectionfs will guess the signature is fresh,
2238                 // i.e., signing TTL is 2 minutes, and won't do an
2239                 // async refresh.
2240                 fs.(*collectionFileSystem).guessSignatureTTL = time.Hour
2241
2242                 refreshed := false
2243                 for deadline := time.Now().Add(time.Second * 10); time.Now().Before(deadline) && !refreshed; time.Sleep(time.Second / 10) {
2244                         _, err = f1.Seek(0, io.SeekStart)
2245                         c.Assert(err, check.IsNil)
2246                         buf, err = ioutil.ReadAll(f1)
2247                         c.Assert(err, check.IsNil)
2248                         c.Assert(string(buf), check.Equals, filedata1)
2249                         loc := s.kc.reads[len(s.kc.reads)-1]
2250                         t, err := signatureExpiryTime(loc)
2251                         c.Assert(err, check.IsNil)
2252                         c.Logf("last read block %s had signature expiry time %v", loc, t)
2253                         if t.Sub(time.Now()) > time.Hour {
2254                                 refreshed = true
2255                         }
2256                 }
2257                 c.Check(refreshed, check.Equals, true)
2258
2259                 // Second locator should have been updated at the same
2260                 // time.
2261                 buf, err = ioutil.ReadAll(f2)
2262                 c.Assert(err, check.IsNil)
2263                 c.Assert(string(buf), check.Equals, filedata2)
2264                 loc := s.kc.reads[len(s.kc.reads)-1]
2265                 c.Check(loc, check.Not(check.Equals), s.kc.reads[len(s.kc.reads)-2])
2266                 t, err := signatureExpiryTime(s.kc.reads[len(s.kc.reads)-1])
2267                 c.Assert(err, check.IsNil)
2268                 c.Logf("last read block %s had signature expiry time %v", loc, t)
2269                 c.Check(t.Sub(time.Now()) > time.Hour, check.Equals, true)
2270         }
2271 }
2272
// bigmanifest is a synthetic manifest with 2000 streams, each
// containing 100 copies of the same block locator and 2000 file
// entries, used by the manifest-parsing benchmark below.
var bigmanifest = func() string {
	var b bytes.Buffer
	for stream := 0; stream < 2000; stream++ {
		fmt.Fprintf(&b, "./dir%d", stream)
		for blk := 0; blk < 100; blk++ {
			b.WriteString(" d41d8cd98f00b204e9800998ecf8427e+99999")
		}
		for file := 0; file < 2000; file++ {
			fmt.Fprintf(&b, " 1200000:300000:file%d", file)
		}
		b.WriteByte('\n')
	}
	return b.String()
}()
2287
2288 func (s *CollectionFSSuite) BenchmarkParseManifest(c *check.C) {
2289         DebugLocksPanicMode = false
2290         c.Logf("test manifest is %d bytes", len(bigmanifest))
2291         for i := 0; i < c.N; i++ {
2292                 fs, err := (&Collection{ManifestText: bigmanifest}).FileSystem(s.client, s.kc)
2293                 c.Check(err, check.IsNil)
2294                 c.Check(fs, check.NotNil)
2295         }
2296 }
2297
2298 func (s *CollectionFSSuite) checkMemSize(c *check.C, f File) {
2299         fn := f.(*filehandle).inode.(*filenode)
2300         var memsize int64
2301         for _, seg := range fn.segments {
2302                 if e, ok := seg.(*memSegment); ok {
2303                         memsize += int64(len(e.buf))
2304                 }
2305         }
2306         c.Check(fn.memsize, check.Equals, memsize)
2307 }
2308
// CollectionFSUnitSuite holds collection filesystem tests that do not
// use the CollectionFSSuite fixtures.
type CollectionFSUnitSuite struct{}

// Register the suite with gocheck.
var _ = check.Suite(&CollectionFSUnitSuite{})
2312
// TestLargeManifest_ManyFiles loads a manifest with 512 dirs x 512
// single-block files (256K files total); expect ~2 seconds to load.
func (s *CollectionFSUnitSuite) TestLargeManifest_ManyFiles(c *check.C) {
	if testing.Short() {
		c.Skip("slow")
	}
	s.testLargeManifest(c, 512, 512, 1, 0)
}
2320
// TestLargeManifest_LargeFiles loads a manifest with one dir of 800
// files, each spanning 1000 blocks, written as contiguous ranges
// (interleaveChunk=0).
func (s *CollectionFSUnitSuite) TestLargeManifest_LargeFiles(c *check.C) {
	if testing.Short() {
		c.Skip("slow")
	}
	s.testLargeManifest(c, 1, 800, 1000, 0)
}
2327
2328 func (s *CollectionFSUnitSuite) TestLargeManifest_InterleavedFiles(c *check.C) {
2329         if testing.Short() {
2330                 c.Skip("slow")
2331         }
2332         // Timing figures here are from a dev host, (0)->(1)->(2)->(3)
2333         // (0) no optimizations (main branch commit ea697fb1e8)
2334         // (1) resolve streampos->blkidx with binary search
2335         // (2) ...and rewrite PortableDataHash() without regexp
2336         // (3) ...and use fnodeCache in loadManifest
2337         s.testLargeManifest(c, 1, 800, 100, 4<<20) // 127s    -> 12s  -> 2.5s -> 1.5s
2338         s.testLargeManifest(c, 1, 50, 1000, 4<<20) // 44s     -> 10s  -> 1.5s -> 0.8s
2339         s.testLargeManifest(c, 1, 200, 100, 4<<20) // 13s     -> 4s   -> 0.6s -> 0.3s
2340         s.testLargeManifest(c, 1, 200, 150, 4<<20) // 26s     -> 4s   -> 1s   -> 0.5s
2341         s.testLargeManifest(c, 1, 200, 200, 4<<20) // 38s     -> 6s   -> 1.3s -> 0.7s
2342         s.testLargeManifest(c, 1, 200, 225, 4<<20) // 46s     -> 7s   -> 1.5s -> 1s
2343         s.testLargeManifest(c, 1, 400, 400, 4<<20) // 477s    -> 24s  -> 5s   -> 3s
2344         // s.testLargeManifest(c, 1, 800, 1000, 4<<20) // timeout -> 186s -> 28s  -> 17s
2345 }
2346
2347 func (s *CollectionFSUnitSuite) testLargeManifest(c *check.C, dirCount, filesPerDir, blocksPerFile, interleaveChunk int) {
2348         t0 := time.Now()
2349         const blksize = 1 << 26
2350         c.Logf("%s building manifest with dirCount=%d filesPerDir=%d blocksPerFile=%d", time.Now(), dirCount, filesPerDir, blocksPerFile)
2351         mb := bytes.NewBuffer(make([]byte, 0, 40000000))
2352         blkid := 0
2353         for i := 0; i < dirCount; i++ {
2354                 fmt.Fprintf(mb, "./dir%d", i)
2355                 for j := 0; j < filesPerDir; j++ {
2356                         for k := 0; k < blocksPerFile; k++ {
2357                                 blkid++
2358                                 fmt.Fprintf(mb, " %032x+%d+A%040x@%08x", blkid, blksize, blkid, blkid)
2359                         }
2360                 }
2361                 for j := 0; j < filesPerDir; j++ {
2362                         if interleaveChunk == 0 {
2363                                 fmt.Fprintf(mb, " %d:%d:dir%d/file%d", (filesPerDir-j-1)*blocksPerFile*blksize, blocksPerFile*blksize, j, j)
2364                                 continue
2365                         }
2366                         for todo := int64(blocksPerFile) * int64(blksize); todo > 0; todo -= int64(interleaveChunk) {
2367                                 size := int64(interleaveChunk)
2368                                 if size > todo {
2369                                         size = todo
2370                                 }
2371                                 offset := rand.Int63n(int64(blocksPerFile)*int64(blksize)*int64(filesPerDir) - size)
2372                                 fmt.Fprintf(mb, " %d:%d:dir%d/file%d", offset, size, j, j)
2373                         }
2374                 }
2375                 mb.Write([]byte{'\n'})
2376         }
2377         coll := Collection{ManifestText: mb.String()}
2378         c.Logf("%s built manifest size=%d", time.Now(), mb.Len())
2379
2380         var memstats runtime.MemStats
2381         runtime.ReadMemStats(&memstats)
2382         c.Logf("%s Alloc=%d Sys=%d", time.Now(), memstats.Alloc, memstats.Sys)
2383
2384         f, err := coll.FileSystem(NewClientFromEnv(), &keepClientStub{})
2385         c.Check(err, check.IsNil)
2386         c.Logf("%s loaded", time.Now())
2387         c.Check(f.Size(), check.Equals, int64(dirCount*filesPerDir*blocksPerFile*blksize))
2388
2389         // Stat() and OpenFile() each file. This mimics the behavior
2390         // of webdav propfind, which opens each file even when just
2391         // listing directory entries.
2392         for i := 0; i < dirCount; i++ {
2393                 for j := 0; j < filesPerDir; j++ {
2394                         fnm := fmt.Sprintf("./dir%d/dir%d/file%d", i, j, j)
2395                         fi, err := f.Stat(fnm)
2396                         c.Assert(err, check.IsNil)
2397                         c.Check(fi.IsDir(), check.Equals, false)
2398                         f, err := f.OpenFile(fnm, os.O_RDONLY, 0)
2399                         c.Assert(err, check.IsNil)
2400                         f.Close()
2401                 }
2402         }
2403         c.Logf("%s OpenFile() x %d", time.Now(), dirCount*filesPerDir)
2404
2405         runtime.ReadMemStats(&memstats)
2406         c.Logf("%s Alloc=%d Sys=%d", time.Now(), memstats.Alloc, memstats.Sys)
2407         c.Logf("%s MemorySize=%d", time.Now(), f.MemorySize())
2408         c.Logf("%s ... test duration %s", time.Now(), time.Now().Sub(t0))
2409 }
2410
// Gocheck boilerplate: hooks the registered gocheck suites into
// the standard "go test" runner.
func Test(t *testing.T) {
	check.TestingT(t)
}